1use std::convert::{From, TryFrom};
63use std::ptr::{addr_of, addr_of_mut};
64use std::sync::Arc;
65
66use arrow_array::ffi;
67use arrow_array::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
68use arrow_array::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
69use arrow_array::{
70 RecordBatch, RecordBatchIterator, RecordBatchOptions, RecordBatchReader, StructArray,
71 make_array,
72};
73use arrow_data::ArrayData;
74use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef};
75use pyo3::exceptions::{PyTypeError, PyValueError};
76use pyo3::ffi::Py_uintptr_t;
77use pyo3::prelude::*;
78use pyo3::pybacked::PyBackedStr;
79use pyo3::types::{PyCapsule, PyDict, PyList, PyTuple};
80use pyo3::{import_exception, intern};
81
// Bind Python's `pyarrow.ArrowException` as a Rust exception type, so Arrow
// conversion failures surface in Python as pyarrow's own exception class.
import_exception!(pyarrow, ArrowException);
/// Public alias for the imported `pyarrow.ArrowException` type.
pub type PyArrowException = ArrowException;
85
86fn to_py_err(err: ArrowError) -> PyErr {
87 PyArrowException::new_err(err.to_string())
88}
89
/// Conversion from a Python object into a Rust Arrow value.
pub trait FromPyArrow: Sized {
    /// Extracts `Self` from the given Python object.
    ///
    /// Returns a `PyErr` when the object is not of a supported kind or its
    /// Arrow data cannot be imported.
    fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self>;
}
97
/// Conversion of a borrowed Rust Arrow value into the equivalent pyarrow object.
pub trait ToPyArrow {
    /// Builds a new Python object representing `self`, bound to `py`.
    fn to_pyarrow<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>>;
}
103
/// Conversion that consumes a Rust value to produce a pyarrow object.
///
/// Every [`ToPyArrow`] type gets this via a blanket implementation. Types that
/// can only be exported by value (e.g. record batch readers) implement it directly.
pub trait IntoPyArrow {
    /// Consumes `self` and returns the corresponding Python object, bound to `py`.
    fn into_pyarrow<'py>(self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>>;
}
109
110impl<T: ToPyArrow> IntoPyArrow for T {
111 fn into_pyarrow<'py>(self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
112 self.to_pyarrow(py)
113 }
114}
115
116fn validate_class(expected: &str, value: &Bound<PyAny>) -> PyResult<()> {
117 let pyarrow = PyModule::import(value.py(), "pyarrow")?;
118 let class = pyarrow.getattr(expected)?;
119 if !value.is_instance(&class)? {
120 let expected_module = class.getattr("__module__")?.extract::<PyBackedStr>()?;
121 let expected_name = class.getattr("__name__")?.extract::<PyBackedStr>()?;
122 let found_class = value.get_type();
123 let found_module = found_class
124 .getattr("__module__")?
125 .extract::<PyBackedStr>()?;
126 let found_name = found_class.getattr("__name__")?.extract::<PyBackedStr>()?;
127 return Err(PyTypeError::new_err(format!(
128 "Expected instance of {expected_module}.{expected_name}, got {found_module}.{found_name}",
129 )));
130 }
131 Ok(())
132}
133
134fn validate_pycapsule(capsule: &Bound<PyCapsule>, name: &str) -> PyResult<()> {
135 let capsule_name = capsule.name()?;
136 if capsule_name.is_none() {
137 return Err(PyValueError::new_err(
138 "Expected schema PyCapsule to have name set.",
139 ));
140 }
141
142 let capsule_name = capsule_name.unwrap().to_str()?;
143 if capsule_name != name {
144 return Err(PyValueError::new_err(format!(
145 "Expected name '{name}' in PyCapsule, instead got '{capsule_name}'",
146 )));
147 }
148
149 Ok(())
150}
151
152impl FromPyArrow for DataType {
153 fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
154 if value.hasattr("__arrow_c_schema__")? {
158 let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
159 let capsule = capsule.downcast::<PyCapsule>()?;
160 validate_pycapsule(capsule, "arrow_schema")?;
161
162 let schema_ptr = unsafe { capsule.reference::<FFI_ArrowSchema>() };
163 let dtype = DataType::try_from(schema_ptr).map_err(to_py_err)?;
164 return Ok(dtype);
165 }
166
167 validate_class("DataType", value)?;
168
169 let c_schema = FFI_ArrowSchema::empty();
170 let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
171 value.call_method1("_export_to_c", (c_schema_ptr as Py_uintptr_t,))?;
172 let dtype = DataType::try_from(&c_schema).map_err(to_py_err)?;
173 Ok(dtype)
174 }
175}
176
177impl ToPyArrow for DataType {
178 fn to_pyarrow<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
179 let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
180 let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
181 let module = py.import("pyarrow")?;
182 let class = module.getattr("DataType")?;
183 let dtype = class.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
184 Ok(dtype)
185 }
186}
187
188impl FromPyArrow for Field {
189 fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
190 if value.hasattr("__arrow_c_schema__")? {
194 let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
195 let capsule = capsule.downcast::<PyCapsule>()?;
196 validate_pycapsule(capsule, "arrow_schema")?;
197
198 let schema_ptr = unsafe { capsule.reference::<FFI_ArrowSchema>() };
199 let field = Field::try_from(schema_ptr).map_err(to_py_err)?;
200 return Ok(field);
201 }
202
203 validate_class("Field", value)?;
204
205 let c_schema = FFI_ArrowSchema::empty();
206 let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
207 value.call_method1("_export_to_c", (c_schema_ptr as Py_uintptr_t,))?;
208 let field = Field::try_from(&c_schema).map_err(to_py_err)?;
209 Ok(field)
210 }
211}
212
213impl ToPyArrow for Field {
214 fn to_pyarrow<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
215 let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
216 let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
217 let module = py.import("pyarrow")?;
218 let class = module.getattr("Field")?;
219 let dtype = class.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
220 Ok(dtype)
221 }
222}
223
224impl FromPyArrow for Schema {
225 fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
226 if value.hasattr("__arrow_c_schema__")? {
230 let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
231 let capsule = capsule.downcast::<PyCapsule>()?;
232 validate_pycapsule(capsule, "arrow_schema")?;
233
234 let schema_ptr = unsafe { capsule.reference::<FFI_ArrowSchema>() };
235 let schema = Schema::try_from(schema_ptr).map_err(to_py_err)?;
236 return Ok(schema);
237 }
238
239 validate_class("Schema", value)?;
240
241 let c_schema = FFI_ArrowSchema::empty();
242 let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
243 value.call_method1("_export_to_c", (c_schema_ptr as Py_uintptr_t,))?;
244 let schema = Schema::try_from(&c_schema).map_err(to_py_err)?;
245 Ok(schema)
246 }
247}
248
249impl ToPyArrow for Schema {
250 fn to_pyarrow<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
251 let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
252 let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
253 let module = py.import("pyarrow")?;
254 let class = module.getattr("Schema")?;
255 let schema = class.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
256 Ok(schema)
257 }
258}
259
260impl FromPyArrow for ArrayData {
261 fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
262 if value.hasattr("__arrow_c_array__")? {
266 let tuple = value.getattr("__arrow_c_array__")?.call0()?;
267
268 if !tuple.is_instance_of::<PyTuple>() {
269 return Err(PyTypeError::new_err(
270 "Expected __arrow_c_array__ to return a tuple.",
271 ));
272 }
273
274 let schema_capsule = tuple.get_item(0)?;
275 let schema_capsule = schema_capsule.downcast::<PyCapsule>()?;
276 let array_capsule = tuple.get_item(1)?;
277 let array_capsule = array_capsule.downcast::<PyCapsule>()?;
278
279 validate_pycapsule(schema_capsule, "arrow_schema")?;
280 validate_pycapsule(array_capsule, "arrow_array")?;
281
282 let schema_ptr = unsafe { schema_capsule.reference::<FFI_ArrowSchema>() };
283 let array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer() as _) };
284 return unsafe { ffi::from_ffi(array, schema_ptr) }.map_err(to_py_err);
285 }
286
287 validate_class("Array", value)?;
288
289 let mut array = FFI_ArrowArray::empty();
291 let mut schema = FFI_ArrowSchema::empty();
292
293 value.call_method1(
297 "_export_to_c",
298 (
299 addr_of_mut!(array) as Py_uintptr_t,
300 addr_of_mut!(schema) as Py_uintptr_t,
301 ),
302 )?;
303
304 unsafe { ffi::from_ffi(array, &schema) }.map_err(to_py_err)
305 }
306}
307
308impl ToPyArrow for ArrayData {
309 fn to_pyarrow<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
310 let array = FFI_ArrowArray::new(self);
311 let schema = FFI_ArrowSchema::try_from(self.data_type()).map_err(to_py_err)?;
312
313 let module = py.import("pyarrow")?;
314 let class = module.getattr("Array")?;
315 let array = class.call_method1(
316 "_import_from_c",
317 (
318 addr_of!(array) as Py_uintptr_t,
319 addr_of!(schema) as Py_uintptr_t,
320 ),
321 )?;
322 Ok(array)
323 }
324}
325
326impl<T: FromPyArrow> FromPyArrow for Vec<T> {
327 fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
328 let list = value.downcast::<PyList>()?;
329 list.iter().map(|x| T::from_pyarrow_bound(&x)).collect()
330 }
331}
332
333impl<T: ToPyArrow> ToPyArrow for Vec<T> {
334 fn to_pyarrow<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
335 let values = self
336 .iter()
337 .map(|v| v.to_pyarrow(py))
338 .collect::<PyResult<Vec<_>>>()?;
339 Ok(PyList::new(py, values)?.into_any())
340 }
341}
342
343impl FromPyArrow for RecordBatch {
344 fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
345 if value.hasattr("__arrow_c_array__")? {
349 let tuple = value.getattr("__arrow_c_array__")?.call0()?;
350
351 if !tuple.is_instance_of::<PyTuple>() {
352 return Err(PyTypeError::new_err(
353 "Expected __arrow_c_array__ to return a tuple.",
354 ));
355 }
356
357 let schema_capsule = tuple.get_item(0)?;
358 let schema_capsule = schema_capsule.downcast::<PyCapsule>()?;
359 let array_capsule = tuple.get_item(1)?;
360 let array_capsule = array_capsule.downcast::<PyCapsule>()?;
361
362 validate_pycapsule(schema_capsule, "arrow_schema")?;
363 validate_pycapsule(array_capsule, "arrow_array")?;
364
365 let schema_ptr = unsafe { schema_capsule.reference::<FFI_ArrowSchema>() };
366 let ffi_array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer().cast()) };
367 let mut array_data =
368 unsafe { ffi::from_ffi(ffi_array, schema_ptr) }.map_err(to_py_err)?;
369 if !matches!(array_data.data_type(), DataType::Struct(_)) {
370 return Err(PyTypeError::new_err(
371 "Expected Struct type from __arrow_c_array.",
372 ));
373 }
374 let options = RecordBatchOptions::default().with_row_count(Some(array_data.len()));
375 array_data.align_buffers();
380 let array = StructArray::from(array_data);
381 let schema = Arc::new(Schema::try_from(schema_ptr).map_err(to_py_err)?);
384 let (_fields, columns, nulls) = array.into_parts();
385 assert_eq!(
386 nulls.map(|n| n.null_count()).unwrap_or_default(),
387 0,
388 "Cannot convert nullable StructArray to RecordBatch, see StructArray documentation"
389 );
390 return RecordBatch::try_new_with_options(schema, columns, &options).map_err(to_py_err);
391 }
392
393 validate_class("RecordBatch", value)?;
394 let schema = value.getattr("schema")?;
396 let schema = Arc::new(Schema::from_pyarrow_bound(&schema)?);
397
398 let arrays = value.getattr("columns")?;
399 let arrays = arrays
400 .downcast::<PyList>()?
401 .iter()
402 .map(|a| Ok(make_array(ArrayData::from_pyarrow_bound(&a)?)))
403 .collect::<PyResult<_>>()?;
404
405 let row_count = value
406 .getattr("num_rows")
407 .ok()
408 .and_then(|x| x.extract().ok());
409 let options = RecordBatchOptions::default().with_row_count(row_count);
410
411 let batch =
412 RecordBatch::try_new_with_options(schema, arrays, &options).map_err(to_py_err)?;
413 Ok(batch)
414 }
415}
416
417impl ToPyArrow for RecordBatch {
418 fn to_pyarrow<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
419 let reader = RecordBatchIterator::new(vec![Ok(self.clone())], self.schema());
421 let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);
422 let py_reader = reader.into_pyarrow(py)?;
423 py_reader.call_method0("read_next_batch")
424 }
425}
426
427impl FromPyArrow for ArrowArrayStreamReader {
429 fn from_pyarrow_bound(value: &Bound<PyAny>) -> PyResult<Self> {
430 if value.hasattr("__arrow_c_stream__")? {
434 let capsule = value.getattr("__arrow_c_stream__")?.call0()?;
435 let capsule = capsule.downcast::<PyCapsule>()?;
436 validate_pycapsule(capsule, "arrow_array_stream")?;
437
438 let stream = unsafe { FFI_ArrowArrayStream::from_raw(capsule.pointer() as _) };
439
440 let stream_reader = ArrowArrayStreamReader::try_new(stream)
441 .map_err(|err| PyValueError::new_err(err.to_string()))?;
442
443 return Ok(stream_reader);
444 }
445
446 validate_class("RecordBatchReader", value)?;
447
448 let mut stream = FFI_ArrowArrayStream::empty();
450 let stream_ptr = &mut stream as *mut FFI_ArrowArrayStream;
451
452 let args = PyTuple::new(value.py(), [stream_ptr as Py_uintptr_t])?;
456 value.call_method1("_export_to_c", args)?;
457
458 let stream_reader = ArrowArrayStreamReader::try_new(stream)
459 .map_err(|err| PyValueError::new_err(err.to_string()))?;
460
461 Ok(stream_reader)
462 }
463}
464
465impl IntoPyArrow for Box<dyn RecordBatchReader + Send> {
467 fn into_pyarrow<'py>(self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
470 let mut stream = FFI_ArrowArrayStream::new(self);
471
472 let stream_ptr = (&mut stream) as *mut FFI_ArrowArrayStream;
473 let module = py.import("pyarrow")?;
474 let class = module.getattr("RecordBatchReader")?;
475 let args = PyTuple::new(py, [stream_ptr as Py_uintptr_t])?;
476 let reader = class.call_method1("_import_from_c", args)?;
477
478 Ok(reader)
479 }
480}
481
482impl IntoPyArrow for ArrowArrayStreamReader {
484 fn into_pyarrow<'py>(self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
485 let boxed: Box<dyn RecordBatchReader + Send> = Box::new(self);
486 boxed.into_pyarrow(py)
487 }
488}
489
490#[derive(Clone)]
508pub struct Table {
509 record_batches: Vec<RecordBatch>,
510 schema: SchemaRef,
511}
512
513impl Table {
514 pub fn try_new(
515 record_batches: Vec<RecordBatch>,
516 schema: SchemaRef,
517 ) -> Result<Self, ArrowError> {
518 for record_batch in &record_batches {
519 if schema != record_batch.schema() {
520 return Err(ArrowError::SchemaError(format!(
521 "All record batches must have the same schema. \
522 Expected schema: {:?}, got schema: {:?}",
523 schema,
524 record_batch.schema()
525 )));
526 }
527 }
528 Ok(Self {
529 record_batches,
530 schema,
531 })
532 }
533
534 pub fn record_batches(&self) -> &[RecordBatch] {
535 &self.record_batches
536 }
537
538 pub fn schema(&self) -> SchemaRef {
539 self.schema.clone()
540 }
541
542 pub fn into_inner(self) -> (Vec<RecordBatch>, SchemaRef) {
543 (self.record_batches, self.schema)
544 }
545}
546
547impl TryFrom<Box<dyn RecordBatchReader>> for Table {
548 type Error = ArrowError;
549
550 fn try_from(value: Box<dyn RecordBatchReader>) -> Result<Self, ArrowError> {
551 let schema = value.schema();
552 let batches = value.collect::<Result<Vec<_>, _>>()?;
553 Self::try_new(batches, schema)
554 }
555}
556
557impl FromPyArrow for Table {
559 fn from_pyarrow_bound(ob: &Bound<PyAny>) -> PyResult<Self> {
560 let reader: Box<dyn RecordBatchReader> =
561 Box::new(ArrowArrayStreamReader::from_pyarrow_bound(ob)?);
562 Self::try_from(reader).map_err(|err| PyErr::new::<PyValueError, _>(err.to_string()))
563 }
564}
565
566impl IntoPyArrow for Table {
568 fn into_pyarrow(self, py: Python) -> PyResult<Bound<PyAny>> {
569 let module = py.import(intern!(py, "pyarrow"))?;
570 let class = module.getattr(intern!(py, "Table"))?;
571
572 let py_batches = PyList::new(py, self.record_batches.into_iter().map(PyArrowType))?;
573 let py_schema = PyArrowType(Arc::unwrap_or_clone(self.schema));
574
575 let kwargs = PyDict::new(py);
576 kwargs.set_item("schema", py_schema)?;
577
578 let reader = class.call_method("from_batches", (py_batches,), Some(&kwargs))?;
579
580 Ok(reader)
581 }
582}
583
/// Newtype wrapper that adapts this module's pyarrow conversion traits to PyO3.
///
/// `PyArrowType<T>` implements `FromPyObject` for any `T: FromPyArrow` and
/// `IntoPyObject` for any `T: IntoPyArrow`. It can therefore be used directly as
/// a `#[pyfunction]` argument or return type.
#[derive(Debug)]
pub struct PyArrowType<T>(pub T);
591
592impl<'source, T: FromPyArrow> FromPyObject<'source> for PyArrowType<T> {
593 fn extract_bound(value: &Bound<'source, PyAny>) -> PyResult<Self> {
594 Ok(Self(T::from_pyarrow_bound(value)?))
595 }
596}
597
598impl<'py, T: IntoPyArrow> IntoPyObject<'py> for PyArrowType<T> {
599 type Target = PyAny;
600
601 type Output = Bound<'py, Self::Target>;
602
603 type Error = PyErr;
604
605 fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, PyErr> {
606 self.0.into_pyarrow(py)
607 }
608}
609
610impl<T> From<T> for PyArrowType<T> {
611 fn from(s: T) -> Self {
612 Self(s)
613 }
614}