arrow/
pyarrow.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Pass Arrow objects from and to PyArrow, using Arrow's
//! [C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html)
//! and [pyo3](https://docs.rs/pyo3/latest/pyo3/).
//!
//! For the underlying implementation, see the [ffi] module.
//!
//! One can use these to write Python functions that take and return PyArrow
//! objects, with automatic conversion to corresponding arrow-rs types.
//!
//! ```ignore
//! #[pyfunction]
//! fn double_array(array: PyArrowType<ArrayData>) -> PyResult<PyArrowType<ArrayData>> {
//!     let array = array.0; // Extract from PyArrowType wrapper
//!     let array: Arc<dyn Array> = make_array(array); // Convert ArrayData to ArrayRef
//!     let array: &Int32Array = array.as_any().downcast_ref()
//!         .ok_or_else(|| PyValueError::new_err("expected int32 array"))?;
//!     let array: Int32Array = array.iter().map(|x| x.map(|x| x * 2)).collect();
//!     Ok(PyArrowType(array.into_data()))
//! }
//! ```
//!
//! | pyarrow type                | arrow-rs type                                                      |
//! |-----------------------------|--------------------------------------------------------------------|
//! | `pyarrow.DataType`          | [DataType]                                                         |
//! | `pyarrow.Field`             | [Field]                                                            |
//! | `pyarrow.Schema`            | [Schema]                                                           |
//! | `pyarrow.Array`             | [ArrayData]                                                        |
//! | `pyarrow.RecordBatch`       | [RecordBatch]                                                      |
//! | `pyarrow.RecordBatchReader` | [ArrowArrayStreamReader] / `Box<dyn RecordBatchReader + Send>` (1) |
//!
//! (1) `pyarrow.RecordBatchReader` can be imported as [ArrowArrayStreamReader]. Either
//! [ArrowArrayStreamReader] or `Box<dyn RecordBatchReader + Send>` can be exported
//! as `pyarrow.RecordBatchReader`. (`Box<dyn RecordBatchReader + Send>` is typically
//! easier to create.)
//!
//! When importing, objects that implement the
//! [Arrow PyCapsule interface](https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html)
//! (`__arrow_c_schema__`, `__arrow_c_array__`, `__arrow_c_stream__`) are preferred, so
//! Arrow data from libraries other than PyArrow is accepted as well.
//!
//! PyArrow has the notion of chunked arrays and tables, but arrow-rs doesn't
//! have these same concepts. A chunked table is instead represented with
//! `Vec<RecordBatch>`. A `pyarrow.Table` can be imported to Rust by calling
//! [pyarrow.Table.to_reader()](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_reader)
//! and then importing the reader as an [ArrowArrayStreamReader].
58
59use std::convert::{From, TryFrom};
60use std::ptr::{addr_of, addr_of_mut};
61use std::sync::Arc;
62
63use arrow_array::{RecordBatchIterator, RecordBatchOptions, RecordBatchReader, StructArray};
64use pyo3::exceptions::{PyTypeError, PyValueError};
65use pyo3::ffi::Py_uintptr_t;
66use pyo3::import_exception;
67use pyo3::prelude::*;
68use pyo3::pybacked::PyBackedStr;
69use pyo3::types::{PyCapsule, PyList, PyTuple};
70
71use crate::array::{make_array, ArrayData};
72use crate::datatypes::{DataType, Field, Schema};
73use crate::error::ArrowError;
74use crate::ffi;
75use crate::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
76use crate::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
77use crate::record_batch::RecordBatch;
78
79import_exception!(pyarrow, ArrowException);
80/// Represents an exception raised by PyArrow.
81pub type PyArrowException = ArrowException;
82
83fn to_py_err(err: ArrowError) -> PyErr {
84    PyArrowException::new_err(err.to_string())
85}
86
87/// Trait for converting Python objects to arrow-rs types.
88pub trait FromPyArrow: Sized {
89    /// Convert a Python object to an arrow-rs type.
90    ///
91    /// Takes a GIL-bound value from Python and returns a result with the arrow-rs type.
92    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self>;
93}
94
95/// Create a new PyArrow object from a arrow-rs type.
96pub trait ToPyArrow {
97    /// Convert the implemented type into a Python object without consuming it.
98    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject>;
99}
100
101/// Convert an arrow-rs type into a PyArrow object.
102pub trait IntoPyArrow {
103    /// Convert the implemented type into a Python object while consuming it.
104    fn into_pyarrow(self, py: Python) -> PyResult<PyObject>;
105}
106
107impl<T: ToPyArrow> IntoPyArrow for T {
108    fn into_pyarrow(self, py: Python) -> PyResult<PyObject> {
109        self.to_pyarrow(py)
110    }
111}
112
113fn validate_class(expected: &str, value: &Bound<PyAny>) -> PyResult<()> {
114    let pyarrow = PyModule::import(value.py(), "pyarrow")?;
115    let class = pyarrow.getattr(expected)?;
116    if !value.is_instance(&class)? {
117        let expected_module = class.getattr("__module__")?.extract::<PyBackedStr>()?;
118        let expected_name = class.getattr("__name__")?.extract::<PyBackedStr>()?;
119        let found_class = value.get_type();
120        let found_module = found_class
121            .getattr("__module__")?
122            .extract::<PyBackedStr>()?;
123        let found_name = found_class.getattr("__name__")?.extract::<PyBackedStr>()?;
124        return Err(PyTypeError::new_err(format!(
125            "Expected instance of {}.{}, got {}.{}",
126            expected_module, expected_name, found_module, found_name
127        )));
128    }
129    Ok(())
130}
131
132fn validate_pycapsule(capsule: &Bound<PyCapsule>, name: &str) -> PyResult<()> {
133    let capsule_name = capsule.name()?;
134    if capsule_name.is_none() {
135        return Err(PyValueError::new_err(
136            "Expected schema PyCapsule to have name set.",
137        ));
138    }
139
140    let capsule_name = capsule_name.unwrap().to_str()?;
141    if capsule_name != name {
142        return Err(PyValueError::new_err(format!(
143            "Expected name '{}' in PyCapsule, instead got '{}'",
144            name, capsule_name
145        )));
146    }
147
148    Ok(())
149}
150
151impl FromPyArrow for DataType {
152    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
153        // Newer versions of PyArrow as well as other libraries with Arrow data implement this
154        // method, so prefer it over _export_to_c.
155        // See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
156        if value.hasattr("__arrow_c_schema__")? {
157            let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
158            let capsule = capsule.downcast::<PyCapsule>()?;
159            validate_pycapsule(capsule, "arrow_schema")?;
160
161            let schema_ptr = unsafe { capsule.reference::<FFI_ArrowSchema>() };
162            let dtype = DataType::try_from(schema_ptr).map_err(to_py_err)?;
163            return Ok(dtype);
164        }
165
166        validate_class("DataType", value)?;
167
168        let c_schema = FFI_ArrowSchema::empty();
169        let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
170        value.call_method1("_export_to_c", (c_schema_ptr as Py_uintptr_t,))?;
171        let dtype = DataType::try_from(&c_schema).map_err(to_py_err)?;
172        Ok(dtype)
173    }
174}
175
176impl ToPyArrow for DataType {
177    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
178        let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
179        let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
180        let module = py.import("pyarrow")?;
181        let class = module.getattr("DataType")?;
182        let dtype = class.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
183        Ok(dtype.into())
184    }
185}
186
187impl FromPyArrow for Field {
188    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
189        // Newer versions of PyArrow as well as other libraries with Arrow data implement this
190        // method, so prefer it over _export_to_c.
191        // See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
192        if value.hasattr("__arrow_c_schema__")? {
193            let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
194            let capsule = capsule.downcast::<PyCapsule>()?;
195            validate_pycapsule(capsule, "arrow_schema")?;
196
197            let schema_ptr = unsafe { capsule.reference::<FFI_ArrowSchema>() };
198            let field = Field::try_from(schema_ptr).map_err(to_py_err)?;
199            return Ok(field);
200        }
201
202        validate_class("Field", value)?;
203
204        let c_schema = FFI_ArrowSchema::empty();
205        let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
206        value.call_method1("_export_to_c", (c_schema_ptr as Py_uintptr_t,))?;
207        let field = Field::try_from(&c_schema).map_err(to_py_err)?;
208        Ok(field)
209    }
210}
211
212impl ToPyArrow for Field {
213    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
214        let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
215        let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
216        let module = py.import("pyarrow")?;
217        let class = module.getattr("Field")?;
218        let dtype = class.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
219        Ok(dtype.into())
220    }
221}
222
223impl FromPyArrow for Schema {
224    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
225        // Newer versions of PyArrow as well as other libraries with Arrow data implement this
226        // method, so prefer it over _export_to_c.
227        // See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
228        if value.hasattr("__arrow_c_schema__")? {
229            let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
230            let capsule = capsule.downcast::<PyCapsule>()?;
231            validate_pycapsule(capsule, "arrow_schema")?;
232
233            let schema_ptr = unsafe { capsule.reference::<FFI_ArrowSchema>() };
234            let schema = Schema::try_from(schema_ptr).map_err(to_py_err)?;
235            return Ok(schema);
236        }
237
238        validate_class("Schema", value)?;
239
240        let c_schema = FFI_ArrowSchema::empty();
241        let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
242        value.call_method1("_export_to_c", (c_schema_ptr as Py_uintptr_t,))?;
243        let schema = Schema::try_from(&c_schema).map_err(to_py_err)?;
244        Ok(schema)
245    }
246}
247
248impl ToPyArrow for Schema {
249    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
250        let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
251        let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
252        let module = py.import("pyarrow")?;
253        let class = module.getattr("Schema")?;
254        let schema = class.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
255        Ok(schema.into())
256    }
257}
258
259impl FromPyArrow for ArrayData {
260    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
261        // Newer versions of PyArrow as well as other libraries with Arrow data implement this
262        // method, so prefer it over _export_to_c.
263        // See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
264        if value.hasattr("__arrow_c_array__")? {
265            let tuple = value.getattr("__arrow_c_array__")?.call0()?;
266
267            if !tuple.is_instance_of::<PyTuple>() {
268                return Err(PyTypeError::new_err(
269                    "Expected __arrow_c_array__ to return a tuple.",
270                ));
271            }
272
273            let schema_capsule = tuple.get_item(0)?;
274            let schema_capsule = schema_capsule.downcast::<PyCapsule>()?;
275            let array_capsule = tuple.get_item(1)?;
276            let array_capsule = array_capsule.downcast::<PyCapsule>()?;
277
278            validate_pycapsule(schema_capsule, "arrow_schema")?;
279            validate_pycapsule(array_capsule, "arrow_array")?;
280
281            let schema_ptr = unsafe { schema_capsule.reference::<FFI_ArrowSchema>() };
282            let array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer() as _) };
283            return unsafe { ffi::from_ffi(array, schema_ptr) }.map_err(to_py_err);
284        }
285
286        validate_class("Array", value)?;
287
288        // prepare a pointer to receive the Array struct
289        let mut array = FFI_ArrowArray::empty();
290        let mut schema = FFI_ArrowSchema::empty();
291
292        // make the conversion through PyArrow's private API
293        // this changes the pointer's memory and is thus unsafe.
294        // In particular, `_export_to_c` can go out of bounds
295        value.call_method1(
296            "_export_to_c",
297            (
298                addr_of_mut!(array) as Py_uintptr_t,
299                addr_of_mut!(schema) as Py_uintptr_t,
300            ),
301        )?;
302
303        unsafe { ffi::from_ffi(array, &schema) }.map_err(to_py_err)
304    }
305}
306
307impl ToPyArrow for ArrayData {
308    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
309        let array = FFI_ArrowArray::new(self);
310        let schema = FFI_ArrowSchema::try_from(self.data_type()).map_err(to_py_err)?;
311
312        let module = py.import("pyarrow")?;
313        let class = module.getattr("Array")?;
314        let array = class.call_method1(
315            "_import_from_c",
316            (
317                addr_of!(array) as Py_uintptr_t,
318                addr_of!(schema) as Py_uintptr_t,
319            ),
320        )?;
321        Ok(array.unbind())
322    }
323}
324
325impl<T: FromPyArrow> FromPyArrow for Vec<T> {
326    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
327        let list = value.downcast::<PyList>()?;
328        list.iter().map(|x| T::from_pyarrow_bound(&x)).collect()
329    }
330}
331
332impl<T: ToPyArrow> ToPyArrow for Vec<T> {
333    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
334        let values = self
335            .iter()
336            .map(|v| v.to_pyarrow(py))
337            .collect::<PyResult<Vec<_>>>()?;
338        Ok(PyList::new(py, values)?.unbind().into())
339    }
340}
341
342impl FromPyArrow for RecordBatch {
343    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
344        // Newer versions of PyArrow as well as other libraries with Arrow data implement this
345        // method, so prefer it over _export_to_c.
346        // See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
347        if value.hasattr("__arrow_c_array__")? {
348            let tuple = value.getattr("__arrow_c_array__")?.call0()?;
349
350            if !tuple.is_instance_of::<PyTuple>() {
351                return Err(PyTypeError::new_err(
352                    "Expected __arrow_c_array__ to return a tuple.",
353                ));
354            }
355
356            let schema_capsule = tuple.get_item(0)?;
357            let schema_capsule = schema_capsule.downcast::<PyCapsule>()?;
358            let array_capsule = tuple.get_item(1)?;
359            let array_capsule = array_capsule.downcast::<PyCapsule>()?;
360
361            validate_pycapsule(schema_capsule, "arrow_schema")?;
362            validate_pycapsule(array_capsule, "arrow_array")?;
363
364            let schema_ptr = unsafe { schema_capsule.reference::<FFI_ArrowSchema>() };
365            let ffi_array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer().cast()) };
366            let mut array_data =
367                unsafe { ffi::from_ffi(ffi_array, schema_ptr) }.map_err(to_py_err)?;
368            if !matches!(array_data.data_type(), DataType::Struct(_)) {
369                return Err(PyTypeError::new_err(
370                    "Expected Struct type from __arrow_c_array.",
371                ));
372            }
373            let options = RecordBatchOptions::default().with_row_count(Some(array_data.len()));
374            // Ensure data is aligned (by potentially copying the buffers).
375            // This is needed because some python code (for example the
376            // python flight client) produces unaligned buffers
377            // See https://github.com/apache/arrow/issues/43552 for details
378            array_data.align_buffers();
379            let array = StructArray::from(array_data);
380            // StructArray does not embed metadata from schema. We need to override
381            // the output schema with the schema from the capsule.
382            let schema = Arc::new(Schema::try_from(schema_ptr).map_err(to_py_err)?);
383            let (_fields, columns, nulls) = array.into_parts();
384            assert_eq!(
385                nulls.map(|n| n.null_count()).unwrap_or_default(),
386                0,
387                "Cannot convert nullable StructArray to RecordBatch, see StructArray documentation"
388            );
389            return RecordBatch::try_new_with_options(schema, columns, &options).map_err(to_py_err);
390        }
391
392        validate_class("RecordBatch", value)?;
393        // TODO(kszucs): implement the FFI conversions in arrow-rs for RecordBatches
394        let schema = value.getattr("schema")?;
395        let schema = Arc::new(Schema::from_pyarrow_bound(&schema)?);
396
397        let arrays = value.getattr("columns")?;
398        let arrays = arrays
399            .downcast::<PyList>()?
400            .iter()
401            .map(|a| Ok(make_array(ArrayData::from_pyarrow_bound(&a)?)))
402            .collect::<PyResult<_>>()?;
403
404        let row_count = value
405            .getattr("num_rows")
406            .ok()
407            .and_then(|x| x.extract().ok());
408        let options = RecordBatchOptions::default().with_row_count(row_count);
409
410        let batch =
411            RecordBatch::try_new_with_options(schema, arrays, &options).map_err(to_py_err)?;
412        Ok(batch)
413    }
414}
415
416impl ToPyArrow for RecordBatch {
417    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
418        // Workaround apache/arrow#37669 by returning RecordBatchIterator
419        let reader = RecordBatchIterator::new(vec![Ok(self.clone())], self.schema());
420        let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);
421        let py_reader = reader.into_pyarrow(py)?;
422        py_reader.call_method0(py, "read_next_batch")
423    }
424}
425
426/// Supports conversion from `pyarrow.RecordBatchReader` to [ArrowArrayStreamReader].
427impl FromPyArrow for ArrowArrayStreamReader {
428    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
429        // Newer versions of PyArrow as well as other libraries with Arrow data implement this
430        // method, so prefer it over _export_to_c.
431        // See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
432        if value.hasattr("__arrow_c_stream__")? {
433            let capsule = value.getattr("__arrow_c_stream__")?.call0()?;
434            let capsule = capsule.downcast::<PyCapsule>()?;
435            validate_pycapsule(capsule, "arrow_array_stream")?;
436
437            let stream = unsafe { FFI_ArrowArrayStream::from_raw(capsule.pointer() as _) };
438
439            let stream_reader = ArrowArrayStreamReader::try_new(stream)
440                .map_err(|err| PyValueError::new_err(err.to_string()))?;
441
442            return Ok(stream_reader);
443        }
444
445        validate_class("RecordBatchReader", value)?;
446
447        // prepare a pointer to receive the stream struct
448        let mut stream = FFI_ArrowArrayStream::empty();
449        let stream_ptr = &mut stream as *mut FFI_ArrowArrayStream;
450
451        // make the conversion through PyArrow's private API
452        // this changes the pointer's memory and is thus unsafe.
453        // In particular, `_export_to_c` can go out of bounds
454        let args = PyTuple::new(value.py(), [stream_ptr as Py_uintptr_t])?;
455        value.call_method1("_export_to_c", args)?;
456
457        let stream_reader = ArrowArrayStreamReader::try_new(stream)
458            .map_err(|err| PyValueError::new_err(err.to_string()))?;
459
460        Ok(stream_reader)
461    }
462}
463
464/// Convert a [`RecordBatchReader`] into a `pyarrow.RecordBatchReader`.
465impl IntoPyArrow for Box<dyn RecordBatchReader + Send> {
466    // We can't implement `ToPyArrow` for `T: RecordBatchReader + Send` because
467    // there is already a blanket implementation for `T: ToPyArrow`.
468    fn into_pyarrow(self, py: Python) -> PyResult<PyObject> {
469        let mut stream = FFI_ArrowArrayStream::new(self);
470
471        let stream_ptr = (&mut stream) as *mut FFI_ArrowArrayStream;
472        let module = py.import("pyarrow")?;
473        let class = module.getattr("RecordBatchReader")?;
474        let args = PyTuple::new(py, [stream_ptr as Py_uintptr_t])?;
475        let reader = class.call_method1("_import_from_c", args)?;
476
477        Ok(PyObject::from(reader))
478    }
479}
480
481/// Convert a [`ArrowArrayStreamReader`] into a `pyarrow.RecordBatchReader`.
482impl IntoPyArrow for ArrowArrayStreamReader {
483    fn into_pyarrow(self, py: Python) -> PyResult<PyObject> {
484        let boxed: Box<dyn RecordBatchReader + Send> = Box::new(self);
485        boxed.into_pyarrow(py)
486    }
487}
488
/// A newtype wrapper for types implementing [`FromPyArrow`] or [`IntoPyArrow`].
///
/// When wrapped around a type `T: FromPyArrow`, it implements [`FromPyObject`]
/// for the PyArrow objects. When wrapped around a `T: IntoPyArrow`, it
/// implements [`IntoPyObject`], so it can be returned directly from a
/// `#[pyfunction]`.
///
/// The wrapped value is the public field `.0`; any `T` can be wrapped with
/// [`From`]/[`Into`].
#[derive(Debug)]
pub struct PyArrowType<T>(pub T);
496
497impl<'source, T: FromPyArrow> FromPyObject<'source> for PyArrowType<T> {
498    fn extract_bound(value: &Bound<'source, PyAny>) -> PyResult<Self> {
499        Ok(Self(T::from_pyarrow_bound(value)?))
500    }
501}
502
503impl<'py, T: IntoPyArrow> IntoPyObject<'py> for PyArrowType<T> {
504    type Target = PyAny;
505
506    type Output = Bound<'py, Self::Target>;
507
508    type Error = PyErr;
509
510    fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, PyErr> {
511        match self.0.into_pyarrow(py) {
512            Ok(obj) => Result::Ok(obj.into_bound(py)),
513            Err(err) => Result::Err(err),
514        }
515    }
516}
517
518impl<T> From<T> for PyArrowType<T> {
519    fn from(s: T) -> Self {
520        Self(s)
521    }
522}