arrow_pyarrow/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Pass Arrow objects from and to PyArrow, using Arrow's
19//! [C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html)
20//! and [pyo3](https://docs.rs/pyo3/latest/pyo3/).
21//!
22//! For underlying implementation, see the [ffi] module.
23//!
24//! One can use these to write Python functions that take and return PyArrow
25//! objects, with automatic conversion to corresponding arrow-rs types.
26//!
27//! ```ignore
28//! #[pyfunction]
29//! fn double_array(array: PyArrowType<ArrayData>) -> PyResult<PyArrowType<ArrayData>> {
30//!     let array = array.0; // Extract from PyArrowType wrapper
31//!     let array: Arc<dyn Array> = make_array(array); // Convert ArrayData to ArrayRef
32//!     let array: &Int32Array = array.as_any().downcast_ref()
33//!         .ok_or_else(|| PyValueError::new_err("expected int32 array"))?;
34//!     let array: Int32Array = array.iter().map(|x| x.map(|x| x * 2)).collect();
35//!     Ok(PyArrowType(array.into_data()))
36//! }
37//! ```
38//!
39//! | pyarrow type                | arrow-rs type                                                      |
40//! |-----------------------------|--------------------------------------------------------------------|
41//! | `pyarrow.DataType`          | [DataType]                                                         |
42//! | `pyarrow.Field`             | [Field]                                                            |
43//! | `pyarrow.Schema`            | [Schema]                                                           |
44//! | `pyarrow.Array`             | [ArrayData]                                                        |
45//! | `pyarrow.RecordBatch`       | [RecordBatch]                                                      |
46//! | `pyarrow.RecordBatchReader` | [ArrowArrayStreamReader] / `Box<dyn RecordBatchReader + Send>` (1) |
47//! | `pyarrow.Table`             | [Table] (2)                                                        |
48//!
49//! (1) `pyarrow.RecordBatchReader` can be imported as [ArrowArrayStreamReader]. Either
50//! [ArrowArrayStreamReader] or `Box<dyn RecordBatchReader + Send>` can be exported
51//! as `pyarrow.RecordBatchReader`. (`Box<dyn RecordBatchReader + Send>` is typically
52//! easier to create.)
53//!
//! (2) Although arrow-rs offers [Table], a convenience wrapper for [pyarrow.Table](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html)
55//! that internally holds `Vec<RecordBatch>`, it is meant primarily for use cases where you already
56//! have `Vec<RecordBatch>` on the Rust side and want to export that in bulk as a `pyarrow.Table`.
57//! In general, it is recommended to use streaming approaches instead of dealing with data in bulk.
58//! For example, a `pyarrow.Table` (or any other object that implements the ArrayStream PyCapsule
59//! interface) can be imported to Rust through `PyArrowType<ArrowArrayStreamReader>` instead of
60//! forcing eager reading into `Vec<RecordBatch>`.
61
62use std::convert::{From, TryFrom};
63use std::ptr::{addr_of, addr_of_mut};
64use std::sync::Arc;
65
66use arrow_array::ffi;
67use arrow_array::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
68use arrow_array::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
69use arrow_array::{
70    RecordBatch, RecordBatchIterator, RecordBatchOptions, RecordBatchReader, StructArray,
71    make_array,
72};
73use arrow_data::ArrayData;
74use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef};
75use pyo3::exceptions::{PyTypeError, PyValueError};
76use pyo3::ffi::Py_uintptr_t;
77use pyo3::prelude::*;
78use pyo3::pybacked::PyBackedStr;
79use pyo3::types::{PyCapsule, PyDict, PyList, PyTuple};
80use pyo3::{import_exception, intern};
81
// Declare `ArrowException` from the Python `pyarrow` module via pyo3's `import_exception!`
// so it can be constructed from Rust (see `to_py_err`).
import_exception!(pyarrow, ArrowException);
/// Represents an exception raised by PyArrow.
///
/// This is an alias for the `ArrowException` type declared by the `import_exception!`
/// invocation above.
pub type PyArrowException = ArrowException;
85
86fn to_py_err(err: ArrowError) -> PyErr {
87    PyArrowException::new_err(err.to_string())
88}
89
/// Trait for converting Python objects to arrow-rs types.
///
/// Wrapping an implementor in [`PyArrowType`] makes it usable wherever pyo3 expects a
/// [`FromPyObject`] type.
pub trait FromPyArrow: Sized {
    /// Convert a Python object to an arrow-rs type.
    ///
    /// Takes a GIL-bound value from Python and returns a result with the arrow-rs type,
    /// or a Python error when the conversion fails.
    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self>;
}
97
/// Create a new PyArrow object from an arrow-rs type.
///
/// Every implementor also gets [`IntoPyArrow`] through a blanket implementation.
pub trait ToPyArrow {
    /// Convert the implemented type into a Python object without consuming it.
    fn to_pyarrow<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>>;
}
103
/// Convert an arrow-rs type into a PyArrow object.
///
/// Unlike [`ToPyArrow`], the conversion takes `self` by value.
pub trait IntoPyArrow {
    /// Convert the implemented type into a Python object while consuming it.
    fn into_pyarrow<'py>(self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>>;
}
109
110impl<T: ToPyArrow> IntoPyArrow for T {
111    fn into_pyarrow<'py>(self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
112        self.to_pyarrow(py)
113    }
114}
115
116fn validate_class(expected: &str, value: &Bound<PyAny>) -> PyResult<()> {
117    let pyarrow = PyModule::import(value.py(), "pyarrow")?;
118    let class = pyarrow.getattr(expected)?;
119    if !value.is_instance(&class)? {
120        let expected_module = class.getattr("__module__")?.extract::<PyBackedStr>()?;
121        let expected_name = class.getattr("__name__")?.extract::<PyBackedStr>()?;
122        let found_class = value.get_type();
123        let found_module = found_class
124            .getattr("__module__")?
125            .extract::<PyBackedStr>()?;
126        let found_name = found_class.getattr("__name__")?.extract::<PyBackedStr>()?;
127        return Err(PyTypeError::new_err(format!(
128            "Expected instance of {expected_module}.{expected_name}, got {found_module}.{found_name}",
129        )));
130    }
131    Ok(())
132}
133
134fn validate_pycapsule(capsule: &Bound<PyCapsule>, name: &str) -> PyResult<()> {
135    let capsule_name = capsule.name()?;
136    if capsule_name.is_none() {
137        return Err(PyValueError::new_err(
138            "Expected schema PyCapsule to have name set.",
139        ));
140    }
141
142    let capsule_name = capsule_name.unwrap().to_str()?;
143    if capsule_name != name {
144        return Err(PyValueError::new_err(format!(
145            "Expected name '{name}' in PyCapsule, instead got '{capsule_name}'",
146        )));
147    }
148
149    Ok(())
150}
151
152impl FromPyArrow for DataType {
153    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
154        // Newer versions of PyArrow as well as other libraries with Arrow data implement this
155        // method, so prefer it over _export_to_c.
156        // See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
157        if value.hasattr("__arrow_c_schema__")? {
158            let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
159            let capsule = capsule.downcast::<PyCapsule>()?;
160            validate_pycapsule(capsule, "arrow_schema")?;
161
162            let schema_ptr = unsafe { capsule.reference::<FFI_ArrowSchema>() };
163            let dtype = DataType::try_from(schema_ptr).map_err(to_py_err)?;
164            return Ok(dtype);
165        }
166
167        validate_class("DataType", value)?;
168
169        let c_schema = FFI_ArrowSchema::empty();
170        let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
171        value.call_method1("_export_to_c", (c_schema_ptr as Py_uintptr_t,))?;
172        let dtype = DataType::try_from(&c_schema).map_err(to_py_err)?;
173        Ok(dtype)
174    }
175}
176
177impl ToPyArrow for DataType {
178    fn to_pyarrow<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
179        let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
180        let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
181        let module = py.import("pyarrow")?;
182        let class = module.getattr("DataType")?;
183        let dtype = class.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
184        Ok(dtype)
185    }
186}
187
188impl FromPyArrow for Field {
189    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
190        // Newer versions of PyArrow as well as other libraries with Arrow data implement this
191        // method, so prefer it over _export_to_c.
192        // See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
193        if value.hasattr("__arrow_c_schema__")? {
194            let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
195            let capsule = capsule.downcast::<PyCapsule>()?;
196            validate_pycapsule(capsule, "arrow_schema")?;
197
198            let schema_ptr = unsafe { capsule.reference::<FFI_ArrowSchema>() };
199            let field = Field::try_from(schema_ptr).map_err(to_py_err)?;
200            return Ok(field);
201        }
202
203        validate_class("Field", value)?;
204
205        let c_schema = FFI_ArrowSchema::empty();
206        let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
207        value.call_method1("_export_to_c", (c_schema_ptr as Py_uintptr_t,))?;
208        let field = Field::try_from(&c_schema).map_err(to_py_err)?;
209        Ok(field)
210    }
211}
212
213impl ToPyArrow for Field {
214    fn to_pyarrow<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
215        let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
216        let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
217        let module = py.import("pyarrow")?;
218        let class = module.getattr("Field")?;
219        let dtype = class.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
220        Ok(dtype)
221    }
222}
223
224impl FromPyArrow for Schema {
225    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
226        // Newer versions of PyArrow as well as other libraries with Arrow data implement this
227        // method, so prefer it over _export_to_c.
228        // See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
229        if value.hasattr("__arrow_c_schema__")? {
230            let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
231            let capsule = capsule.downcast::<PyCapsule>()?;
232            validate_pycapsule(capsule, "arrow_schema")?;
233
234            let schema_ptr = unsafe { capsule.reference::<FFI_ArrowSchema>() };
235            let schema = Schema::try_from(schema_ptr).map_err(to_py_err)?;
236            return Ok(schema);
237        }
238
239        validate_class("Schema", value)?;
240
241        let c_schema = FFI_ArrowSchema::empty();
242        let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
243        value.call_method1("_export_to_c", (c_schema_ptr as Py_uintptr_t,))?;
244        let schema = Schema::try_from(&c_schema).map_err(to_py_err)?;
245        Ok(schema)
246    }
247}
248
249impl ToPyArrow for Schema {
250    fn to_pyarrow<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
251        let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
252        let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
253        let module = py.import("pyarrow")?;
254        let class = module.getattr("Schema")?;
255        let schema = class.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
256        Ok(schema)
257    }
258}
259
260impl FromPyArrow for ArrayData {
261    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
262        // Newer versions of PyArrow as well as other libraries with Arrow data implement this
263        // method, so prefer it over _export_to_c.
264        // See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
265        if value.hasattr("__arrow_c_array__")? {
266            let tuple = value.getattr("__arrow_c_array__")?.call0()?;
267
268            if !tuple.is_instance_of::<PyTuple>() {
269                return Err(PyTypeError::new_err(
270                    "Expected __arrow_c_array__ to return a tuple.",
271                ));
272            }
273
274            let schema_capsule = tuple.get_item(0)?;
275            let schema_capsule = schema_capsule.downcast::<PyCapsule>()?;
276            let array_capsule = tuple.get_item(1)?;
277            let array_capsule = array_capsule.downcast::<PyCapsule>()?;
278
279            validate_pycapsule(schema_capsule, "arrow_schema")?;
280            validate_pycapsule(array_capsule, "arrow_array")?;
281
282            let schema_ptr = unsafe { schema_capsule.reference::<FFI_ArrowSchema>() };
283            let array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer() as _) };
284            return unsafe { ffi::from_ffi(array, schema_ptr) }.map_err(to_py_err);
285        }
286
287        validate_class("Array", value)?;
288
289        // prepare a pointer to receive the Array struct
290        let mut array = FFI_ArrowArray::empty();
291        let mut schema = FFI_ArrowSchema::empty();
292
293        // make the conversion through PyArrow's private API
294        // this changes the pointer's memory and is thus unsafe.
295        // In particular, `_export_to_c` can go out of bounds
296        value.call_method1(
297            "_export_to_c",
298            (
299                addr_of_mut!(array) as Py_uintptr_t,
300                addr_of_mut!(schema) as Py_uintptr_t,
301            ),
302        )?;
303
304        unsafe { ffi::from_ffi(array, &schema) }.map_err(to_py_err)
305    }
306}
307
308impl ToPyArrow for ArrayData {
309    fn to_pyarrow<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
310        let array = FFI_ArrowArray::new(self);
311        let schema = FFI_ArrowSchema::try_from(self.data_type()).map_err(to_py_err)?;
312
313        let module = py.import("pyarrow")?;
314        let class = module.getattr("Array")?;
315        let array = class.call_method1(
316            "_import_from_c",
317            (
318                addr_of!(array) as Py_uintptr_t,
319                addr_of!(schema) as Py_uintptr_t,
320            ),
321        )?;
322        Ok(array)
323    }
324}
325
326impl<T: FromPyArrow> FromPyArrow for Vec<T> {
327    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
328        let list = value.downcast::<PyList>()?;
329        list.iter().map(|x| T::from_pyarrow_bound(&x)).collect()
330    }
331}
332
333impl<T: ToPyArrow> ToPyArrow for Vec<T> {
334    fn to_pyarrow<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
335        let values = self
336            .iter()
337            .map(|v| v.to_pyarrow(py))
338            .collect::<PyResult<Vec<_>>>()?;
339        Ok(PyList::new(py, values)?.into_any())
340    }
341}
342
343impl FromPyArrow for RecordBatch {
344    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
345        // Newer versions of PyArrow as well as other libraries with Arrow data implement this
346        // method, so prefer it over _export_to_c.
347        // See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
348        if value.hasattr("__arrow_c_array__")? {
349            let tuple = value.getattr("__arrow_c_array__")?.call0()?;
350
351            if !tuple.is_instance_of::<PyTuple>() {
352                return Err(PyTypeError::new_err(
353                    "Expected __arrow_c_array__ to return a tuple.",
354                ));
355            }
356
357            let schema_capsule = tuple.get_item(0)?;
358            let schema_capsule = schema_capsule.downcast::<PyCapsule>()?;
359            let array_capsule = tuple.get_item(1)?;
360            let array_capsule = array_capsule.downcast::<PyCapsule>()?;
361
362            validate_pycapsule(schema_capsule, "arrow_schema")?;
363            validate_pycapsule(array_capsule, "arrow_array")?;
364
365            let schema_ptr = unsafe { schema_capsule.reference::<FFI_ArrowSchema>() };
366            let ffi_array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer().cast()) };
367            let mut array_data =
368                unsafe { ffi::from_ffi(ffi_array, schema_ptr) }.map_err(to_py_err)?;
369            if !matches!(array_data.data_type(), DataType::Struct(_)) {
370                return Err(PyTypeError::new_err(
371                    "Expected Struct type from __arrow_c_array.",
372                ));
373            }
374            let options = RecordBatchOptions::default().with_row_count(Some(array_data.len()));
375            // Ensure data is aligned (by potentially copying the buffers).
376            // This is needed because some python code (for example the
377            // python flight client) produces unaligned buffers
378            // See https://github.com/apache/arrow/issues/43552 for details
379            array_data.align_buffers();
380            let array = StructArray::from(array_data);
381            // StructArray does not embed metadata from schema. We need to override
382            // the output schema with the schema from the capsule.
383            let schema = Arc::new(Schema::try_from(schema_ptr).map_err(to_py_err)?);
384            let (_fields, columns, nulls) = array.into_parts();
385            assert_eq!(
386                nulls.map(|n| n.null_count()).unwrap_or_default(),
387                0,
388                "Cannot convert nullable StructArray to RecordBatch, see StructArray documentation"
389            );
390            return RecordBatch::try_new_with_options(schema, columns, &options).map_err(to_py_err);
391        }
392
393        validate_class("RecordBatch", value)?;
394        // TODO(kszucs): implement the FFI conversions in arrow-rs for RecordBatches
395        let schema = value.getattr("schema")?;
396        let schema = Arc::new(Schema::from_pyarrow_bound(&schema)?);
397
398        let arrays = value.getattr("columns")?;
399        let arrays = arrays
400            .downcast::<PyList>()?
401            .iter()
402            .map(|a| Ok(make_array(ArrayData::from_pyarrow_bound(&a)?)))
403            .collect::<PyResult<_>>()?;
404
405        let row_count = value
406            .getattr("num_rows")
407            .ok()
408            .and_then(|x| x.extract().ok());
409        let options = RecordBatchOptions::default().with_row_count(row_count);
410
411        let batch =
412            RecordBatch::try_new_with_options(schema, arrays, &options).map_err(to_py_err)?;
413        Ok(batch)
414    }
415}
416
417impl ToPyArrow for RecordBatch {
418    fn to_pyarrow<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
419        // Workaround apache/arrow#37669 by returning RecordBatchIterator
420        let reader = RecordBatchIterator::new(vec![Ok(self.clone())], self.schema());
421        let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);
422        let py_reader = reader.into_pyarrow(py)?;
423        py_reader.call_method0("read_next_batch")
424    }
425}
426
427/// Supports conversion from `pyarrow.RecordBatchReader` to [ArrowArrayStreamReader].
428impl FromPyArrow for ArrowArrayStreamReader {
429    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
430        // Newer versions of PyArrow as well as other libraries with Arrow data implement this
431        // method, so prefer it over _export_to_c.
432        // See https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
433        if value.hasattr("__arrow_c_stream__")? {
434            let capsule = value.getattr("__arrow_c_stream__")?.call0()?;
435            let capsule = capsule.downcast::<PyCapsule>()?;
436            validate_pycapsule(capsule, "arrow_array_stream")?;
437
438            let stream = unsafe { FFI_ArrowArrayStream::from_raw(capsule.pointer() as _) };
439
440            let stream_reader = ArrowArrayStreamReader::try_new(stream)
441                .map_err(|err| PyValueError::new_err(err.to_string()))?;
442
443            return Ok(stream_reader);
444        }
445
446        validate_class("RecordBatchReader", value)?;
447
448        // prepare a pointer to receive the stream struct
449        let mut stream = FFI_ArrowArrayStream::empty();
450        let stream_ptr = &mut stream as *mut FFI_ArrowArrayStream;
451
452        // make the conversion through PyArrow's private API
453        // this changes the pointer's memory and is thus unsafe.
454        // In particular, `_export_to_c` can go out of bounds
455        let args = PyTuple::new(value.py(), [stream_ptr as Py_uintptr_t])?;
456        value.call_method1("_export_to_c", args)?;
457
458        let stream_reader = ArrowArrayStreamReader::try_new(stream)
459            .map_err(|err| PyValueError::new_err(err.to_string()))?;
460
461        Ok(stream_reader)
462    }
463}
464
465/// Convert a [`RecordBatchReader`] into a `pyarrow.RecordBatchReader`.
466impl IntoPyArrow for Box<dyn RecordBatchReader + Send> {
467    // We can't implement `ToPyArrow` for `T: RecordBatchReader + Send` because
468    // there is already a blanket implementation for `T: ToPyArrow`.
469    fn into_pyarrow<'py>(self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
470        let mut stream = FFI_ArrowArrayStream::new(self);
471
472        let stream_ptr = (&mut stream) as *mut FFI_ArrowArrayStream;
473        let module = py.import("pyarrow")?;
474        let class = module.getattr("RecordBatchReader")?;
475        let args = PyTuple::new(py, [stream_ptr as Py_uintptr_t])?;
476        let reader = class.call_method1("_import_from_c", args)?;
477
478        Ok(reader)
479    }
480}
481
482/// Convert a [`ArrowArrayStreamReader`] into a `pyarrow.RecordBatchReader`.
483impl IntoPyArrow for ArrowArrayStreamReader {
484    fn into_pyarrow<'py>(self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
485        let boxed: Box<dyn RecordBatchReader + Send> = Box::new(self);
486        boxed.into_pyarrow(py)
487    }
488}
489
/// This is a convenience wrapper around `Vec<RecordBatch>` that tries to simplify conversion from
/// and to `pyarrow.Table`.
///
/// This could be used in circumstances where you either want to consume a `pyarrow.Table` directly
/// (although technically, since `pyarrow.Table` implements the ArrayStream PyCapsule
/// interface, one could also consume a `PyArrowType<ArrowArrayStreamReader>` instead) or, more
/// importantly, where one wants to export a `pyarrow.Table` from a `Vec<RecordBatch>` from the Rust
/// side.
///
/// ```ignore
/// #[pyfunction]
/// fn return_table(...) -> PyResult<PyArrowType<Table>> {
///     let batches: Vec<RecordBatch>;
///     let schema: SchemaRef;
///     Ok(PyArrowType(Table::try_new(batches, schema).map_err(|err| err.into_py_err(py))?))
/// }
/// ```
#[derive(Clone)]
pub struct Table {
    // Batches making up the table; `Table::try_new` rejects any batch whose schema
    // differs from `schema`.
    record_batches: Vec<RecordBatch>,
    // Schema the batches are checked against in `Table::try_new`.
    schema: SchemaRef,
}
512
513impl Table {
514    pub fn try_new(
515        record_batches: Vec<RecordBatch>,
516        schema: SchemaRef,
517    ) -> Result<Self, ArrowError> {
518        for record_batch in &record_batches {
519            if schema != record_batch.schema() {
520                return Err(ArrowError::SchemaError(format!(
521                    "All record batches must have the same schema. \
522                         Expected schema: {:?}, got schema: {:?}",
523                    schema,
524                    record_batch.schema()
525                )));
526            }
527        }
528        Ok(Self {
529            record_batches,
530            schema,
531        })
532    }
533
534    pub fn record_batches(&self) -> &[RecordBatch] {
535        &self.record_batches
536    }
537
538    pub fn schema(&self) -> SchemaRef {
539        self.schema.clone()
540    }
541
542    pub fn into_inner(self) -> (Vec<RecordBatch>, SchemaRef) {
543        (self.record_batches, self.schema)
544    }
545}
546
547impl TryFrom<Box<dyn RecordBatchReader>> for Table {
548    type Error = ArrowError;
549
550    fn try_from(value: Box<dyn RecordBatchReader>) -> Result<Self, ArrowError> {
551        let schema = value.schema();
552        let batches = value.collect::<Result<Vec<_>, _>>()?;
553        Self::try_new(batches, schema)
554    }
555}
556
557/// Convert a `pyarrow.Table` (or any other ArrowArrayStream compliant object) into [`Table`]
558impl FromPyArrow for Table {
559    fn from_pyarrow_bound(ob: &Bound<PyAny>) -> PyResult<Self> {
560        let reader: Box<dyn RecordBatchReader> =
561            Box::new(ArrowArrayStreamReader::from_pyarrow_bound(ob)?);
562        Self::try_from(reader).map_err(|err| PyErr::new::<PyValueError, _>(err.to_string()))
563    }
564}
565
566/// Convert a [`Table`] into `pyarrow.Table`.
567impl IntoPyArrow for Table {
568    fn into_pyarrow(self, py: Python) -> PyResult<Bound<PyAny>> {
569        let module = py.import(intern!(py, "pyarrow"))?;
570        let class = module.getattr(intern!(py, "Table"))?;
571
572        let py_batches = PyList::new(py, self.record_batches.into_iter().map(PyArrowType))?;
573        let py_schema = PyArrowType(Arc::unwrap_or_clone(self.schema));
574
575        let kwargs = PyDict::new(py);
576        kwargs.set_item("schema", py_schema)?;
577
578        let reader = class.call_method("from_batches", (py_batches,), Some(&kwargs))?;
579
580        Ok(reader)
581    }
582}
583
/// A newtype wrapper for types implementing [`FromPyArrow`] or [`IntoPyArrow`].
///
/// When wrapped around a type `T: FromPyArrow`, it
/// implements [`FromPyObject`] for the PyArrow objects. When wrapped around a
/// `T: IntoPyArrow`, it implements [`IntoPyObject`] for the wrapped type.
///
/// The wrapped value is the public field `0`.
#[derive(Debug)]
pub struct PyArrowType<T>(pub T);
591
592impl<'source, T: FromPyArrow> FromPyObject<'source> for PyArrowType<T> {
593    fn extract_bound(value: &Bound<'source, PyAny>) -> PyResult<Self> {
594        Ok(Self(T::from_pyarrow_bound(value)?))
595    }
596}
597
598impl<'py, T: IntoPyArrow> IntoPyObject<'py> for PyArrowType<T> {
599    type Target = PyAny;
600
601    type Output = Bound<'py, Self::Target>;
602
603    type Error = PyErr;
604
605    fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, PyErr> {
606        self.0.into_pyarrow(py)
607    }
608}
609
610impl<T> From<T> for PyArrowType<T> {
611    fn from(s: T) -> Self {
612        Self(s)
613    }
614}