arrow_pyarrow/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Pass Arrow objects from and to PyArrow, using Arrow's
19//! [C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html)
20//! and [pyo3](https://docs.rs/pyo3/latest/pyo3/).
21//!
22//! For underlying implementation, see the [ffi] module.
23//!
24//! One can use these to write Python functions that take and return PyArrow
25//! objects, with automatic conversion to corresponding arrow-rs types.
26//!
27//! ```ignore
28//! #[pyfunction]
29//! fn double_array(array: PyArrowType<ArrayData>) -> PyResult<PyArrowType<ArrayData>> {
30//!     let array = array.0; // Extract from PyArrowType wrapper
31//!     let array: Arc<dyn Array> = make_array(array); // Convert ArrayData to ArrayRef
32//!     let array: &Int32Array = array.as_any().downcast_ref()
33//!         .ok_or_else(|| PyValueError::new_err("expected int32 array"))?;
34//!     let array: Int32Array = array.iter().map(|x| x.map(|x| x * 2)).collect();
35//!     Ok(PyArrowType(array.into_data()))
36//! }
37//! ```
38//!
39//! | pyarrow type                | arrow-rs type                                                      |
40//! |-----------------------------|--------------------------------------------------------------------|
41//! | `pyarrow.DataType`          | [DataType]                                                         |
42//! | `pyarrow.Field`             | [Field]                                                            |
43//! | `pyarrow.Schema`            | [Schema]                                                           |
44//! | `pyarrow.Array`             | [ArrayData]                                                        |
45//! | `pyarrow.RecordBatch`       | [RecordBatch]                                                      |
46//! | `pyarrow.RecordBatchReader` | [ArrowArrayStreamReader] / `Box<dyn RecordBatchReader + Send>` (1) |
47//!
48//! (1) `pyarrow.RecordBatchReader` can be imported as [ArrowArrayStreamReader]. Either
49//! [ArrowArrayStreamReader] or `Box<dyn RecordBatchReader + Send>` can be exported
50//! as `pyarrow.RecordBatchReader`. (`Box<dyn RecordBatchReader + Send>` is typically
51//! easier to create.)
52//!
53//! PyArrow has the notion of chunked arrays and tables, but arrow-rs doesn't
54//! have these same concepts. A chunked table is instead represented with
55//! `Vec<RecordBatch>`. A `pyarrow.Table` can be imported to Rust by calling
56//! [pyarrow.Table.to_reader()](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_reader)
57//! and then importing the reader as an [ArrowArrayStreamReader].
58
59use std::convert::{From, TryFrom};
60use std::ptr::{addr_of, addr_of_mut};
61use std::sync::Arc;
62
63use arrow_array::ffi;
64use arrow_array::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
65use arrow_array::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
66use arrow_array::{
67    make_array, RecordBatch, RecordBatchIterator, RecordBatchOptions, RecordBatchReader,
68    StructArray,
69};
70use arrow_data::ArrayData;
71use arrow_schema::{ArrowError, DataType, Field, Schema};
72use pyo3::exceptions::{PyTypeError, PyValueError};
73use pyo3::ffi::Py_uintptr_t;
74use pyo3::import_exception;
75use pyo3::prelude::*;
76use pyo3::pybacked::PyBackedStr;
77use pyo3::types::{PyCapsule, PyList, PyTuple};
78
79import_exception!(pyarrow, ArrowException);
80/// Represents an exception raised by PyArrow.
81pub type PyArrowException = ArrowException;
82
83fn to_py_err(err: ArrowError) -> PyErr {
84    PyArrowException::new_err(err.to_string())
85}
86
87/// Trait for converting Python objects to arrow-rs types.
88pub trait FromPyArrow: Sized {
89    /// Convert a Python object to an arrow-rs type.
90    ///
91    /// Takes a GIL-bound value from Python and returns a result with the arrow-rs type.
92    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self>;
93}
94
95/// Create a new PyArrow object from a arrow-rs type.
96pub trait ToPyArrow {
97    /// Convert the implemented type into a Python object without consuming it.
98    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject>;
99}
100
101/// Convert an arrow-rs type into a PyArrow object.
102pub trait IntoPyArrow {
103    /// Convert the implemented type into a Python object while consuming it.
104    fn into_pyarrow(self, py: Python) -> PyResult<PyObject>;
105}
106
107impl<T: ToPyArrow> IntoPyArrow for T {
108    fn into_pyarrow(self, py: Python) -> PyResult<PyObject> {
109        self.to_pyarrow(py)
110    }
111}
112
113fn validate_class(expected: &str, value: &Bound<PyAny>) -> PyResult<()> {
114    let pyarrow = PyModule::import(value.py(), "pyarrow")?;
115    let class = pyarrow.getattr(expected)?;
116    if !value.is_instance(&class)? {
117        let expected_module = class.getattr("__module__")?.extract::<PyBackedStr>()?;
118        let expected_name = class.getattr("__name__")?.extract::<PyBackedStr>()?;
119        let found_class = value.get_type();
120        let found_module = found_class
121            .getattr("__module__")?
122            .extract::<PyBackedStr>()?;
123        let found_name = found_class.getattr("__name__")?.extract::<PyBackedStr>()?;
124        return Err(PyTypeError::new_err(format!(
125            "Expected instance of {expected_module}.{expected_name}, got {found_module}.{found_name}",
126        )));
127    }
128    Ok(())
129}
130
131fn validate_pycapsule(capsule: &Bound<PyCapsule>, name: &str) -> PyResult<()> {
132    let capsule_name = capsule.name()?;
133    if capsule_name.is_none() {
134        return Err(PyValueError::new_err(
135            "Expected schema PyCapsule to have name set.",
136        ));
137    }
138
139    let capsule_name = capsule_name.unwrap().to_str()?;
140    if capsule_name != name {
141        return Err(PyValueError::new_err(format!(
142            "Expected name '{name}' in PyCapsule, instead got '{capsule_name}'",
143        )));
144    }
145
146    Ok(())
147}
148
149impl FromPyArrow for DataType {
150    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
151        // Newer versions of PyArrow as well as other libraries with Arrow data implement this
152        // method, so prefer it over _export_to_c.
153        // See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
154        if value.hasattr("__arrow_c_schema__")? {
155            let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
156            let capsule = capsule.downcast::<PyCapsule>()?;
157            validate_pycapsule(capsule, "arrow_schema")?;
158
159            let schema_ptr = unsafe { capsule.reference::<FFI_ArrowSchema>() };
160            let dtype = DataType::try_from(schema_ptr).map_err(to_py_err)?;
161            return Ok(dtype);
162        }
163
164        validate_class("DataType", value)?;
165
166        let c_schema = FFI_ArrowSchema::empty();
167        let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
168        value.call_method1("_export_to_c", (c_schema_ptr as Py_uintptr_t,))?;
169        let dtype = DataType::try_from(&c_schema).map_err(to_py_err)?;
170        Ok(dtype)
171    }
172}
173
174impl ToPyArrow for DataType {
175    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
176        let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
177        let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
178        let module = py.import("pyarrow")?;
179        let class = module.getattr("DataType")?;
180        let dtype = class.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
181        Ok(dtype.into())
182    }
183}
184
185impl FromPyArrow for Field {
186    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
187        // Newer versions of PyArrow as well as other libraries with Arrow data implement this
188        // method, so prefer it over _export_to_c.
189        // See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
190        if value.hasattr("__arrow_c_schema__")? {
191            let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
192            let capsule = capsule.downcast::<PyCapsule>()?;
193            validate_pycapsule(capsule, "arrow_schema")?;
194
195            let schema_ptr = unsafe { capsule.reference::<FFI_ArrowSchema>() };
196            let field = Field::try_from(schema_ptr).map_err(to_py_err)?;
197            return Ok(field);
198        }
199
200        validate_class("Field", value)?;
201
202        let c_schema = FFI_ArrowSchema::empty();
203        let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
204        value.call_method1("_export_to_c", (c_schema_ptr as Py_uintptr_t,))?;
205        let field = Field::try_from(&c_schema).map_err(to_py_err)?;
206        Ok(field)
207    }
208}
209
210impl ToPyArrow for Field {
211    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
212        let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
213        let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
214        let module = py.import("pyarrow")?;
215        let class = module.getattr("Field")?;
216        let dtype = class.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
217        Ok(dtype.into())
218    }
219}
220
221impl FromPyArrow for Schema {
222    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
223        // Newer versions of PyArrow as well as other libraries with Arrow data implement this
224        // method, so prefer it over _export_to_c.
225        // See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
226        if value.hasattr("__arrow_c_schema__")? {
227            let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
228            let capsule = capsule.downcast::<PyCapsule>()?;
229            validate_pycapsule(capsule, "arrow_schema")?;
230
231            let schema_ptr = unsafe { capsule.reference::<FFI_ArrowSchema>() };
232            let schema = Schema::try_from(schema_ptr).map_err(to_py_err)?;
233            return Ok(schema);
234        }
235
236        validate_class("Schema", value)?;
237
238        let c_schema = FFI_ArrowSchema::empty();
239        let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
240        value.call_method1("_export_to_c", (c_schema_ptr as Py_uintptr_t,))?;
241        let schema = Schema::try_from(&c_schema).map_err(to_py_err)?;
242        Ok(schema)
243    }
244}
245
246impl ToPyArrow for Schema {
247    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
248        let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
249        let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
250        let module = py.import("pyarrow")?;
251        let class = module.getattr("Schema")?;
252        let schema = class.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
253        Ok(schema.into())
254    }
255}
256
257impl FromPyArrow for ArrayData {
258    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
259        // Newer versions of PyArrow as well as other libraries with Arrow data implement this
260        // method, so prefer it over _export_to_c.
261        // See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
262        if value.hasattr("__arrow_c_array__")? {
263            let tuple = value.getattr("__arrow_c_array__")?.call0()?;
264
265            if !tuple.is_instance_of::<PyTuple>() {
266                return Err(PyTypeError::new_err(
267                    "Expected __arrow_c_array__ to return a tuple.",
268                ));
269            }
270
271            let schema_capsule = tuple.get_item(0)?;
272            let schema_capsule = schema_capsule.downcast::<PyCapsule>()?;
273            let array_capsule = tuple.get_item(1)?;
274            let array_capsule = array_capsule.downcast::<PyCapsule>()?;
275
276            validate_pycapsule(schema_capsule, "arrow_schema")?;
277            validate_pycapsule(array_capsule, "arrow_array")?;
278
279            let schema_ptr = unsafe { schema_capsule.reference::<FFI_ArrowSchema>() };
280            let array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer() as _) };
281            return unsafe { ffi::from_ffi(array, schema_ptr) }.map_err(to_py_err);
282        }
283
284        validate_class("Array", value)?;
285
286        // prepare a pointer to receive the Array struct
287        let mut array = FFI_ArrowArray::empty();
288        let mut schema = FFI_ArrowSchema::empty();
289
290        // make the conversion through PyArrow's private API
291        // this changes the pointer's memory and is thus unsafe.
292        // In particular, `_export_to_c` can go out of bounds
293        value.call_method1(
294            "_export_to_c",
295            (
296                addr_of_mut!(array) as Py_uintptr_t,
297                addr_of_mut!(schema) as Py_uintptr_t,
298            ),
299        )?;
300
301        unsafe { ffi::from_ffi(array, &schema) }.map_err(to_py_err)
302    }
303}
304
305impl ToPyArrow for ArrayData {
306    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
307        let array = FFI_ArrowArray::new(self);
308        let schema = FFI_ArrowSchema::try_from(self.data_type()).map_err(to_py_err)?;
309
310        let module = py.import("pyarrow")?;
311        let class = module.getattr("Array")?;
312        let array = class.call_method1(
313            "_import_from_c",
314            (
315                addr_of!(array) as Py_uintptr_t,
316                addr_of!(schema) as Py_uintptr_t,
317            ),
318        )?;
319        Ok(array.unbind())
320    }
321}
322
323impl<T: FromPyArrow> FromPyArrow for Vec<T> {
324    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
325        let list = value.downcast::<PyList>()?;
326        list.iter().map(|x| T::from_pyarrow_bound(&x)).collect()
327    }
328}
329
330impl<T: ToPyArrow> ToPyArrow for Vec<T> {
331    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
332        let values = self
333            .iter()
334            .map(|v| v.to_pyarrow(py))
335            .collect::<PyResult<Vec<_>>>()?;
336        Ok(PyList::new(py, values)?.unbind().into())
337    }
338}
339
340impl FromPyArrow for RecordBatch {
341    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
342        // Newer versions of PyArrow as well as other libraries with Arrow data implement this
343        // method, so prefer it over _export_to_c.
344        // See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
345        if value.hasattr("__arrow_c_array__")? {
346            let tuple = value.getattr("__arrow_c_array__")?.call0()?;
347
348            if !tuple.is_instance_of::<PyTuple>() {
349                return Err(PyTypeError::new_err(
350                    "Expected __arrow_c_array__ to return a tuple.",
351                ));
352            }
353
354            let schema_capsule = tuple.get_item(0)?;
355            let schema_capsule = schema_capsule.downcast::<PyCapsule>()?;
356            let array_capsule = tuple.get_item(1)?;
357            let array_capsule = array_capsule.downcast::<PyCapsule>()?;
358
359            validate_pycapsule(schema_capsule, "arrow_schema")?;
360            validate_pycapsule(array_capsule, "arrow_array")?;
361
362            let schema_ptr = unsafe { schema_capsule.reference::<FFI_ArrowSchema>() };
363            let ffi_array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer().cast()) };
364            let mut array_data =
365                unsafe { ffi::from_ffi(ffi_array, schema_ptr) }.map_err(to_py_err)?;
366            if !matches!(array_data.data_type(), DataType::Struct(_)) {
367                return Err(PyTypeError::new_err(
368                    "Expected Struct type from __arrow_c_array.",
369                ));
370            }
371            let options = RecordBatchOptions::default().with_row_count(Some(array_data.len()));
372            // Ensure data is aligned (by potentially copying the buffers).
373            // This is needed because some python code (for example the
374            // python flight client) produces unaligned buffers
375            // See https://github.com/apache/arrow/issues/43552 for details
376            array_data.align_buffers();
377            let array = StructArray::from(array_data);
378            // StructArray does not embed metadata from schema. We need to override
379            // the output schema with the schema from the capsule.
380            let schema = Arc::new(Schema::try_from(schema_ptr).map_err(to_py_err)?);
381            let (_fields, columns, nulls) = array.into_parts();
382            assert_eq!(
383                nulls.map(|n| n.null_count()).unwrap_or_default(),
384                0,
385                "Cannot convert nullable StructArray to RecordBatch, see StructArray documentation"
386            );
387            return RecordBatch::try_new_with_options(schema, columns, &options).map_err(to_py_err);
388        }
389
390        validate_class("RecordBatch", value)?;
391        // TODO(kszucs): implement the FFI conversions in arrow-rs for RecordBatches
392        let schema = value.getattr("schema")?;
393        let schema = Arc::new(Schema::from_pyarrow_bound(&schema)?);
394
395        let arrays = value.getattr("columns")?;
396        let arrays = arrays
397            .downcast::<PyList>()?
398            .iter()
399            .map(|a| Ok(make_array(ArrayData::from_pyarrow_bound(&a)?)))
400            .collect::<PyResult<_>>()?;
401
402        let row_count = value
403            .getattr("num_rows")
404            .ok()
405            .and_then(|x| x.extract().ok());
406        let options = RecordBatchOptions::default().with_row_count(row_count);
407
408        let batch =
409            RecordBatch::try_new_with_options(schema, arrays, &options).map_err(to_py_err)?;
410        Ok(batch)
411    }
412}
413
414impl ToPyArrow for RecordBatch {
415    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
416        // Workaround apache/arrow#37669 by returning RecordBatchIterator
417        let reader = RecordBatchIterator::new(vec![Ok(self.clone())], self.schema());
418        let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);
419        let py_reader = reader.into_pyarrow(py)?;
420        py_reader.call_method0(py, "read_next_batch")
421    }
422}
423
424/// Supports conversion from `pyarrow.RecordBatchReader` to [ArrowArrayStreamReader].
425impl FromPyArrow for ArrowArrayStreamReader {
426    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
427        // Newer versions of PyArrow as well as other libraries with Arrow data implement this
428        // method, so prefer it over _export_to_c.
429        // See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
430        if value.hasattr("__arrow_c_stream__")? {
431            let capsule = value.getattr("__arrow_c_stream__")?.call0()?;
432            let capsule = capsule.downcast::<PyCapsule>()?;
433            validate_pycapsule(capsule, "arrow_array_stream")?;
434
435            let stream = unsafe { FFI_ArrowArrayStream::from_raw(capsule.pointer() as _) };
436
437            let stream_reader = ArrowArrayStreamReader::try_new(stream)
438                .map_err(|err| PyValueError::new_err(err.to_string()))?;
439
440            return Ok(stream_reader);
441        }
442
443        validate_class("RecordBatchReader", value)?;
444
445        // prepare a pointer to receive the stream struct
446        let mut stream = FFI_ArrowArrayStream::empty();
447        let stream_ptr = &mut stream as *mut FFI_ArrowArrayStream;
448
449        // make the conversion through PyArrow's private API
450        // this changes the pointer's memory and is thus unsafe.
451        // In particular, `_export_to_c` can go out of bounds
452        let args = PyTuple::new(value.py(), [stream_ptr as Py_uintptr_t])?;
453        value.call_method1("_export_to_c", args)?;
454
455        let stream_reader = ArrowArrayStreamReader::try_new(stream)
456            .map_err(|err| PyValueError::new_err(err.to_string()))?;
457
458        Ok(stream_reader)
459    }
460}
461
462/// Convert a [`RecordBatchReader`] into a `pyarrow.RecordBatchReader`.
463impl IntoPyArrow for Box<dyn RecordBatchReader + Send> {
464    // We can't implement `ToPyArrow` for `T: RecordBatchReader + Send` because
465    // there is already a blanket implementation for `T: ToPyArrow`.
466    fn into_pyarrow(self, py: Python) -> PyResult<PyObject> {
467        let mut stream = FFI_ArrowArrayStream::new(self);
468
469        let stream_ptr = (&mut stream) as *mut FFI_ArrowArrayStream;
470        let module = py.import("pyarrow")?;
471        let class = module.getattr("RecordBatchReader")?;
472        let args = PyTuple::new(py, [stream_ptr as Py_uintptr_t])?;
473        let reader = class.call_method1("_import_from_c", args)?;
474
475        Ok(PyObject::from(reader))
476    }
477}
478
479/// Convert a [`ArrowArrayStreamReader`] into a `pyarrow.RecordBatchReader`.
480impl IntoPyArrow for ArrowArrayStreamReader {
481    fn into_pyarrow(self, py: Python) -> PyResult<PyObject> {
482        let boxed: Box<dyn RecordBatchReader + Send> = Box::new(self);
483        boxed.into_pyarrow(py)
484    }
485}
486
/// A newtype wrapper for types implementing [`FromPyArrow`] or [`IntoPyArrow`].
///
/// When wrapped around a type `T: FromPyArrow`, it implements [`FromPyObject`], so it
/// can be used as a `#[pyfunction]` argument that accepts PyArrow objects. When wrapped
/// around a `T: IntoPyArrow`, it implements [`IntoPyObject`], so it can be returned to
/// Python.
#[derive(Debug)]
pub struct PyArrowType<T>(pub T);
494
495impl<'source, T: FromPyArrow> FromPyObject<'source> for PyArrowType<T> {
496    fn extract_bound(value: &Bound<'source, PyAny>) -> PyResult<Self> {
497        Ok(Self(T::from_pyarrow_bound(value)?))
498    }
499}
500
501impl<'py, T: IntoPyArrow> IntoPyObject<'py> for PyArrowType<T> {
502    type Target = PyAny;
503
504    type Output = Bound<'py, Self::Target>;
505
506    type Error = PyErr;
507
508    fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, PyErr> {
509        match self.0.into_pyarrow(py) {
510            Ok(obj) => Result::Ok(obj.into_bound(py)),
511            Err(err) => Result::Err(err),
512        }
513    }
514}
515
516impl<T> From<T> for PyArrowType<T> {
517    fn from(s: T) -> Self {
518        Self(s)
519    }
520}