1use std::convert::{From, TryFrom};
60use std::ptr::{addr_of, addr_of_mut};
61use std::sync::Arc;
62
63use arrow_array::{RecordBatchIterator, RecordBatchOptions, RecordBatchReader, StructArray};
64use pyo3::exceptions::{PyTypeError, PyValueError};
65use pyo3::ffi::Py_uintptr_t;
66use pyo3::import_exception;
67use pyo3::prelude::*;
68use pyo3::pybacked::PyBackedStr;
69use pyo3::types::{PyCapsule, PyList, PyTuple};
70
71use crate::array::{make_array, ArrayData};
72use crate::datatypes::{DataType, Field, Schema};
73use crate::error::ArrowError;
74use crate::ffi;
75use crate::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
76use crate::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
77use crate::record_batch::RecordBatch;
78
// Bind the Python exception class `ArrowException` from the `pyarrow` module
// to a Rust type so that it can be raised from this crate.
import_exception!(pyarrow, ArrowException);
/// Alias for the `pyarrow.ArrowException` exception type bound above.
pub type PyArrowException = ArrowException;
82
83fn to_py_err(err: ArrowError) -> PyErr {
84 PyArrowException::new_err(err.to_string())
85}
86
/// Conversion of a Python object into a Rust value.
///
/// The implementations in this file accept either a pyarrow object or an
/// object exposing the matching `__arrow_c_*__` method.
pub trait FromPyArrow: Sized {
    /// Builds `Self` from the given bound Python object, returning a Python
    /// error when the object cannot be converted.
    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self>;
}
94
/// Conversion of a borrowed Rust value into a Python object.
pub trait ToPyArrow {
    /// Produces a Python object from `self` without consuming it.
    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject>;
}
100
/// Conversion of an owned Rust value into a Python object.
///
/// Every [`ToPyArrow`] type gets this trait through a blanket implementation;
/// it is implemented directly for types that must be consumed to be exported.
pub trait IntoPyArrow {
    /// Produces a Python object from `self`, consuming it.
    fn into_pyarrow(self, py: Python) -> PyResult<PyObject>;
}
106
107impl<T: ToPyArrow> IntoPyArrow for T {
108 fn into_pyarrow(self, py: Python) -> PyResult<PyObject> {
109 self.to_pyarrow(py)
110 }
111}
112
113fn validate_class(expected: &str, value: &Bound<PyAny>) -> PyResult<()> {
114 let pyarrow = PyModule::import(value.py(), "pyarrow")?;
115 let class = pyarrow.getattr(expected)?;
116 if !value.is_instance(&class)? {
117 let expected_module = class.getattr("__module__")?.extract::<PyBackedStr>()?;
118 let expected_name = class.getattr("__name__")?.extract::<PyBackedStr>()?;
119 let found_class = value.get_type();
120 let found_module = found_class
121 .getattr("__module__")?
122 .extract::<PyBackedStr>()?;
123 let found_name = found_class.getattr("__name__")?.extract::<PyBackedStr>()?;
124 return Err(PyTypeError::new_err(format!(
125 "Expected instance of {}.{}, got {}.{}",
126 expected_module, expected_name, found_module, found_name
127 )));
128 }
129 Ok(())
130}
131
132fn validate_pycapsule(capsule: &Bound<PyCapsule>, name: &str) -> PyResult<()> {
133 let capsule_name = capsule.name()?;
134 if capsule_name.is_none() {
135 return Err(PyValueError::new_err(
136 "Expected schema PyCapsule to have name set.",
137 ));
138 }
139
140 let capsule_name = capsule_name.unwrap().to_str()?;
141 if capsule_name != name {
142 return Err(PyValueError::new_err(format!(
143 "Expected name '{}' in PyCapsule, instead got '{}'",
144 name, capsule_name
145 )));
146 }
147
148 Ok(())
149}
150
151impl FromPyArrow for DataType {
152 fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
153 if value.hasattr("__arrow_c_schema__")? {
157 let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
158 let capsule = capsule.downcast::<PyCapsule>()?;
159 validate_pycapsule(capsule, "arrow_schema")?;
160
161 let schema_ptr = unsafe { capsule.reference::<FFI_ArrowSchema>() };
162 let dtype = DataType::try_from(schema_ptr).map_err(to_py_err)?;
163 return Ok(dtype);
164 }
165
166 validate_class("DataType", value)?;
167
168 let c_schema = FFI_ArrowSchema::empty();
169 let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
170 value.call_method1("_export_to_c", (c_schema_ptr as Py_uintptr_t,))?;
171 let dtype = DataType::try_from(&c_schema).map_err(to_py_err)?;
172 Ok(dtype)
173 }
174}
175
176impl ToPyArrow for DataType {
177 fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
178 let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
179 let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
180 let module = py.import("pyarrow")?;
181 let class = module.getattr("DataType")?;
182 let dtype = class.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
183 Ok(dtype.into())
184 }
185}
186
187impl FromPyArrow for Field {
188 fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
189 if value.hasattr("__arrow_c_schema__")? {
193 let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
194 let capsule = capsule.downcast::<PyCapsule>()?;
195 validate_pycapsule(capsule, "arrow_schema")?;
196
197 let schema_ptr = unsafe { capsule.reference::<FFI_ArrowSchema>() };
198 let field = Field::try_from(schema_ptr).map_err(to_py_err)?;
199 return Ok(field);
200 }
201
202 validate_class("Field", value)?;
203
204 let c_schema = FFI_ArrowSchema::empty();
205 let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
206 value.call_method1("_export_to_c", (c_schema_ptr as Py_uintptr_t,))?;
207 let field = Field::try_from(&c_schema).map_err(to_py_err)?;
208 Ok(field)
209 }
210}
211
212impl ToPyArrow for Field {
213 fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
214 let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
215 let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
216 let module = py.import("pyarrow")?;
217 let class = module.getattr("Field")?;
218 let dtype = class.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
219 Ok(dtype.into())
220 }
221}
222
223impl FromPyArrow for Schema {
224 fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
225 if value.hasattr("__arrow_c_schema__")? {
229 let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
230 let capsule = capsule.downcast::<PyCapsule>()?;
231 validate_pycapsule(capsule, "arrow_schema")?;
232
233 let schema_ptr = unsafe { capsule.reference::<FFI_ArrowSchema>() };
234 let schema = Schema::try_from(schema_ptr).map_err(to_py_err)?;
235 return Ok(schema);
236 }
237
238 validate_class("Schema", value)?;
239
240 let c_schema = FFI_ArrowSchema::empty();
241 let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
242 value.call_method1("_export_to_c", (c_schema_ptr as Py_uintptr_t,))?;
243 let schema = Schema::try_from(&c_schema).map_err(to_py_err)?;
244 Ok(schema)
245 }
246}
247
248impl ToPyArrow for Schema {
249 fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
250 let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
251 let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
252 let module = py.import("pyarrow")?;
253 let class = module.getattr("Schema")?;
254 let schema = class.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
255 Ok(schema.into())
256 }
257}
258
259impl FromPyArrow for ArrayData {
260 fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
261 if value.hasattr("__arrow_c_array__")? {
265 let tuple = value.getattr("__arrow_c_array__")?.call0()?;
266
267 if !tuple.is_instance_of::<PyTuple>() {
268 return Err(PyTypeError::new_err(
269 "Expected __arrow_c_array__ to return a tuple.",
270 ));
271 }
272
273 let schema_capsule = tuple.get_item(0)?;
274 let schema_capsule = schema_capsule.downcast::<PyCapsule>()?;
275 let array_capsule = tuple.get_item(1)?;
276 let array_capsule = array_capsule.downcast::<PyCapsule>()?;
277
278 validate_pycapsule(schema_capsule, "arrow_schema")?;
279 validate_pycapsule(array_capsule, "arrow_array")?;
280
281 let schema_ptr = unsafe { schema_capsule.reference::<FFI_ArrowSchema>() };
282 let array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer() as _) };
283 return unsafe { ffi::from_ffi(array, schema_ptr) }.map_err(to_py_err);
284 }
285
286 validate_class("Array", value)?;
287
288 let mut array = FFI_ArrowArray::empty();
290 let mut schema = FFI_ArrowSchema::empty();
291
292 value.call_method1(
296 "_export_to_c",
297 (
298 addr_of_mut!(array) as Py_uintptr_t,
299 addr_of_mut!(schema) as Py_uintptr_t,
300 ),
301 )?;
302
303 unsafe { ffi::from_ffi(array, &schema) }.map_err(to_py_err)
304 }
305}
306
307impl ToPyArrow for ArrayData {
308 fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
309 let array = FFI_ArrowArray::new(self);
310 let schema = FFI_ArrowSchema::try_from(self.data_type()).map_err(to_py_err)?;
311
312 let module = py.import("pyarrow")?;
313 let class = module.getattr("Array")?;
314 let array = class.call_method1(
315 "_import_from_c",
316 (
317 addr_of!(array) as Py_uintptr_t,
318 addr_of!(schema) as Py_uintptr_t,
319 ),
320 )?;
321 Ok(array.unbind())
322 }
323}
324
325impl<T: FromPyArrow> FromPyArrow for Vec<T> {
326 fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
327 let list = value.downcast::<PyList>()?;
328 list.iter().map(|x| T::from_pyarrow_bound(&x)).collect()
329 }
330}
331
332impl<T: ToPyArrow> ToPyArrow for Vec<T> {
333 fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
334 let values = self
335 .iter()
336 .map(|v| v.to_pyarrow(py))
337 .collect::<PyResult<Vec<_>>>()?;
338 Ok(PyList::new(py, values)?.unbind().into())
339 }
340}
341
342impl FromPyArrow for RecordBatch {
343 fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
344 if value.hasattr("__arrow_c_array__")? {
348 let tuple = value.getattr("__arrow_c_array__")?.call0()?;
349
350 if !tuple.is_instance_of::<PyTuple>() {
351 return Err(PyTypeError::new_err(
352 "Expected __arrow_c_array__ to return a tuple.",
353 ));
354 }
355
356 let schema_capsule = tuple.get_item(0)?;
357 let schema_capsule = schema_capsule.downcast::<PyCapsule>()?;
358 let array_capsule = tuple.get_item(1)?;
359 let array_capsule = array_capsule.downcast::<PyCapsule>()?;
360
361 validate_pycapsule(schema_capsule, "arrow_schema")?;
362 validate_pycapsule(array_capsule, "arrow_array")?;
363
364 let schema_ptr = unsafe { schema_capsule.reference::<FFI_ArrowSchema>() };
365 let ffi_array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer().cast()) };
366 let mut array_data =
367 unsafe { ffi::from_ffi(ffi_array, schema_ptr) }.map_err(to_py_err)?;
368 if !matches!(array_data.data_type(), DataType::Struct(_)) {
369 return Err(PyTypeError::new_err(
370 "Expected Struct type from __arrow_c_array.",
371 ));
372 }
373 let options = RecordBatchOptions::default().with_row_count(Some(array_data.len()));
374 array_data.align_buffers();
379 let array = StructArray::from(array_data);
380 let schema = Arc::new(Schema::try_from(schema_ptr).map_err(to_py_err)?);
383 let (_fields, columns, nulls) = array.into_parts();
384 assert_eq!(
385 nulls.map(|n| n.null_count()).unwrap_or_default(),
386 0,
387 "Cannot convert nullable StructArray to RecordBatch, see StructArray documentation"
388 );
389 return RecordBatch::try_new_with_options(schema, columns, &options).map_err(to_py_err);
390 }
391
392 validate_class("RecordBatch", value)?;
393 let schema = value.getattr("schema")?;
395 let schema = Arc::new(Schema::from_pyarrow_bound(&schema)?);
396
397 let arrays = value.getattr("columns")?;
398 let arrays = arrays
399 .downcast::<PyList>()?
400 .iter()
401 .map(|a| Ok(make_array(ArrayData::from_pyarrow_bound(&a)?)))
402 .collect::<PyResult<_>>()?;
403
404 let row_count = value
405 .getattr("num_rows")
406 .ok()
407 .and_then(|x| x.extract().ok());
408 let options = RecordBatchOptions::default().with_row_count(row_count);
409
410 let batch =
411 RecordBatch::try_new_with_options(schema, arrays, &options).map_err(to_py_err)?;
412 Ok(batch)
413 }
414}
415
416impl ToPyArrow for RecordBatch {
417 fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
418 let reader = RecordBatchIterator::new(vec![Ok(self.clone())], self.schema());
420 let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);
421 let py_reader = reader.into_pyarrow(py)?;
422 py_reader.call_method0(py, "read_next_batch")
423 }
424}
425
426impl FromPyArrow for ArrowArrayStreamReader {
428 fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
429 if value.hasattr("__arrow_c_stream__")? {
433 let capsule = value.getattr("__arrow_c_stream__")?.call0()?;
434 let capsule = capsule.downcast::<PyCapsule>()?;
435 validate_pycapsule(capsule, "arrow_array_stream")?;
436
437 let stream = unsafe { FFI_ArrowArrayStream::from_raw(capsule.pointer() as _) };
438
439 let stream_reader = ArrowArrayStreamReader::try_new(stream)
440 .map_err(|err| PyValueError::new_err(err.to_string()))?;
441
442 return Ok(stream_reader);
443 }
444
445 validate_class("RecordBatchReader", value)?;
446
447 let mut stream = FFI_ArrowArrayStream::empty();
449 let stream_ptr = &mut stream as *mut FFI_ArrowArrayStream;
450
451 let args = PyTuple::new(value.py(), [stream_ptr as Py_uintptr_t])?;
455 value.call_method1("_export_to_c", args)?;
456
457 let stream_reader = ArrowArrayStreamReader::try_new(stream)
458 .map_err(|err| PyValueError::new_err(err.to_string()))?;
459
460 Ok(stream_reader)
461 }
462}
463
464impl IntoPyArrow for Box<dyn RecordBatchReader + Send> {
466 fn into_pyarrow(self, py: Python) -> PyResult<PyObject> {
469 let mut stream = FFI_ArrowArrayStream::new(self);
470
471 let stream_ptr = (&mut stream) as *mut FFI_ArrowArrayStream;
472 let module = py.import("pyarrow")?;
473 let class = module.getattr("RecordBatchReader")?;
474 let args = PyTuple::new(py, [stream_ptr as Py_uintptr_t])?;
475 let reader = class.call_method1("_import_from_c", args)?;
476
477 Ok(PyObject::from(reader))
478 }
479}
480
481impl IntoPyArrow for ArrowArrayStreamReader {
483 fn into_pyarrow(self, py: Python) -> PyResult<PyObject> {
484 let boxed: Box<dyn RecordBatchReader + Send> = Box::new(self);
485 boxed.into_pyarrow(py)
486 }
487}
488
/// Newtype wrapper that lets a value cross the Python boundary.
///
/// It can be extracted from a Python object when `T` implements
/// [`FromPyArrow`] and converted into a Python object when `T` implements
/// [`IntoPyArrow`]. The wrapped value is the public field `0`.
#[derive(Debug)]
pub struct PyArrowType<T>(pub T);
496
497impl<'source, T: FromPyArrow> FromPyObject<'source> for PyArrowType<T> {
498 fn extract_bound(value: &Bound<'source, PyAny>) -> PyResult<Self> {
499 Ok(Self(T::from_pyarrow_bound(value)?))
500 }
501}
502
503impl<'py, T: IntoPyArrow> IntoPyObject<'py> for PyArrowType<T> {
504 type Target = PyAny;
505
506 type Output = Bound<'py, Self::Target>;
507
508 type Error = PyErr;
509
510 fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, PyErr> {
511 match self.0.into_pyarrow(py) {
512 Ok(obj) => Result::Ok(obj.into_bound(py)),
513 Err(err) => Result::Err(err),
514 }
515 }
516}
517
518impl<T> From<T> for PyArrowType<T> {
519 fn from(s: T) -> Self {
520 Self(s)
521 }
522}