// arrow_array/record_batch.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! A two-dimensional batch of column-oriented data with a defined
//! [schema](arrow_schema::Schema).

use crate::cast::AsArray;
use crate::{Array, ArrayRef, StructArray, new_empty_array};
use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema, SchemaBuilder, SchemaRef};
use std::ops::Index;
use std::sync::Arc;
26
27/// Trait for types that can read `RecordBatch`'s.
28///
29/// To create from an iterator, see [RecordBatchIterator].
30pub trait RecordBatchReader: Iterator<Item = Result<RecordBatch, ArrowError>> {
31    /// Returns the schema of this `RecordBatchReader`.
32    ///
33    /// Implementation of this trait should guarantee that all `RecordBatch`'s returned by this
34    /// reader should have the same schema as returned from this method.
35    fn schema(&self) -> SchemaRef;
36}
37
38impl<R: RecordBatchReader + ?Sized> RecordBatchReader for Box<R> {
39    fn schema(&self) -> SchemaRef {
40        self.as_ref().schema()
41    }
42}
43
44/// Trait for types that can write `RecordBatch`'s.
45pub trait RecordBatchWriter {
46    /// Write a single batch to the writer.
47    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError>;
48
49    /// Write footer or termination data, then mark the writer as done.
50    fn close(self) -> Result<(), ArrowError>;
51}
52
/// Creates an array from a literal slice of values,
/// suitable for rapid testing and development.
///
/// Example:
///
/// ```rust
///
/// use arrow_array::create_array;
///
/// let array = create_array!(Int32, [1, 2, 3, 4, 5]);
/// let array = create_array!(Utf8, [Some("a"), Some("b"), None, Some("e")]);
/// ```
/// Support for limited data types is available. The macro will return a compile error if an unsupported data type is used.
/// Presently supported data types are:
/// - `Boolean`, `Null`
/// - `Decimal32`, `Decimal64`, `Decimal128`, `Decimal256`
/// - `Float16`, `Float32`, `Float64`
/// - `Int8`, `Int16`, `Int32`, `Int64`
/// - `UInt8`, `UInt16`, `UInt32`, `UInt64`
/// - `IntervalDayTime`, `IntervalYearMonth`
/// - `Second`, `Millisecond`, `Microsecond`, `Nanosecond`
/// - `Second32`, `Millisecond32`, `Microsecond64`, `Nanosecond64`
/// - `DurationSecond`, `DurationMillisecond`, `DurationMicrosecond`, `DurationNanosecond`
/// - `TimestampSecond`, `TimestampMillisecond`, `TimestampMicrosecond`, `TimestampNanosecond`
/// - `Utf8`, `Utf8View`, `LargeUtf8`, `Binary`, `LargeBinary`
#[macro_export]
macro_rules! create_array {
    // `@from` is used for those types that have a common method `<type>::from`
    (@from Boolean) => { $crate::BooleanArray };
    (@from Int8) => { $crate::Int8Array };
    (@from Int16) => { $crate::Int16Array };
    (@from Int32) => { $crate::Int32Array };
    (@from Int64) => { $crate::Int64Array };
    (@from UInt8) => { $crate::UInt8Array };
    (@from UInt16) => { $crate::UInt16Array };
    (@from UInt32) => { $crate::UInt32Array };
    (@from UInt64) => { $crate::UInt64Array };
    (@from Float16) => { $crate::Float16Array };
    (@from Float32) => { $crate::Float32Array };
    (@from Float64) => { $crate::Float64Array };
    (@from Utf8) => { $crate::StringArray };
    (@from Utf8View) => { $crate::StringViewArray };
    (@from LargeUtf8) => { $crate::LargeStringArray };
    (@from IntervalDayTime) => { $crate::IntervalDayTimeArray };
    (@from IntervalYearMonth) => { $crate::IntervalYearMonthArray };
    (@from Second) => { $crate::TimestampSecondArray };
    (@from Millisecond) => { $crate::TimestampMillisecondArray };
    (@from Microsecond) => { $crate::TimestampMicrosecondArray };
    (@from Nanosecond) => { $crate::TimestampNanosecondArray };
    (@from Second32) => { $crate::Time32SecondArray };
    (@from Millisecond32) => { $crate::Time32MillisecondArray };
    (@from Microsecond64) => { $crate::Time64MicrosecondArray };
    // Fixed: the nanosecond Time64 alias is `Time64NanosecondArray`;
    // `Time64Nanosecond64Array` does not exist and made this arm uncompilable.
    (@from Nanosecond64) => { $crate::Time64NanosecondArray };
    (@from DurationSecond) => { $crate::DurationSecondArray };
    (@from DurationMillisecond) => { $crate::DurationMillisecondArray };
    (@from DurationMicrosecond) => { $crate::DurationMicrosecondArray };
    (@from DurationNanosecond) => { $crate::DurationNanosecondArray };
    (@from Decimal32) => { $crate::Decimal32Array };
    (@from Decimal64) => { $crate::Decimal64Array };
    (@from Decimal128) => { $crate::Decimal128Array };
    (@from Decimal256) => { $crate::Decimal256Array };
    (@from TimestampSecond) => { $crate::TimestampSecondArray };
    (@from TimestampMillisecond) => { $crate::TimestampMillisecondArray };
    (@from TimestampMicrosecond) => { $crate::TimestampMicrosecondArray };
    (@from TimestampNanosecond) => { $crate::TimestampNanosecondArray };

    // Any other identifier is unsupported: surface a readable compile error.
    (@from $ty: ident) => {
        compile_error!(concat!("Unsupported data type: ", stringify!($ty)))
    };

    // Null arrays carry only a length, no values.
    (Null, $size: expr) => {
        std::sync::Arc::new($crate::NullArray::new($size))
    };

    // Binary types build via `from_vec` rather than `From`.
    (Binary, [$($values: expr),*]) => {
        std::sync::Arc::new($crate::BinaryArray::from_vec(vec![$($values),*]))
    };

    (LargeBinary, [$($values: expr),*]) => {
        std::sync::Arc::new($crate::LargeBinaryArray::from_vec(vec![$($values),*]))
    };

    ($ty: tt, [$($values: expr),*]) => {
        std::sync::Arc::new(<$crate::create_array!(@from $ty)>::from(vec![$($values),*]))
    };

    (Binary, $values: expr) => {
        std::sync::Arc::new($crate::BinaryArray::from_vec($values))
    };

    (LargeBinary, $values: expr) => {
        std::sync::Arc::new($crate::LargeBinaryArray::from_vec($values))
    };

    ($ty: tt, $values: expr) => {
        std::sync::Arc::new(<$crate::create_array!(@from $ty)>::from($values))
    };
}
151
/// Creates a record batch from literal slice of values, suitable for rapid
/// testing and development.
///
/// Example:
///
/// ```rust
/// use arrow_array::record_batch;
/// use arrow_schema;
///
/// let batch = record_batch!(
///     ("a", Int32, [1, 2, 3]),
///     ("b", Float64, [Some(4.0), None, Some(5.0)]),
///     ("c", Utf8, ["alpha", "beta", "gamma"])
/// );
/// ```
///
/// Variables and expressions are also supported:
///
/// ```rust
/// use arrow_array::record_batch;
///
/// let values = vec![1, 2, 3];
/// let batch = record_batch!(
///     ("a", Int32, values),
///     ("b", Float64, vec![Some(4.0), None, Some(5.0)])
/// );
/// ```
/// Due to limitation of [`create_array!`] macro, support for limited data types is available.
#[macro_export]
macro_rules! record_batch {
    ($(($field: expr, $type: ident, $($value_tokens: tt)+)),*) => {
        {
            // Every field is declared nullable so both plain values and
            // `Option`s are accepted in the value lists.
            let schema = std::sync::Arc::new(arrow_schema::Schema::new(vec![
                $(
                    arrow_schema::Field::new($field, arrow_schema::DataType::$type, true),
                )*
            ]));

            // Build one array per column via `create_array!` and validate the
            // result through the normal `try_new` path.
            let columns = vec![$(
                $crate::create_array!($type, $($value_tokens)+),
            )*];

            $crate::RecordBatch::try_new(schema, columns)
        }
    };
}
199
/// A two-dimensional batch of column-oriented data with a defined
/// [schema](arrow_schema::Schema).
///
/// A `RecordBatch` is a two-dimensional dataset of a number of
/// contiguous arrays, each the same length.
/// A record batch has a schema which must match its arrays’
/// datatypes.
///
/// Record batches are a convenient unit of work for various
/// serialization and computation functions, possibly incremental.
///
/// Use the [`record_batch!`] macro to create a [`RecordBatch`] from
/// literal slice of values, useful for rapid prototyping and testing.
///
/// Example:
/// ```rust
/// use arrow_array::record_batch;
/// let batch = record_batch!(
///     ("a", Int32, [1, 2, 3]),
///     ("b", Float64, [Some(4.0), None, Some(5.0)]),
///     ("c", Utf8, ["alpha", "beta", "gamma"])
/// );
/// ```
#[derive(Clone, Debug, PartialEq)]
pub struct RecordBatch {
    // Describes `columns`: one field per column, datatypes matching.
    schema: SchemaRef,
    // The column data; every array has exactly `row_count` rows.
    columns: Vec<Arc<dyn Array>>,

    /// The number of rows in this RecordBatch
    ///
    /// This is stored separately from the columns to handle the case of no columns
    row_count: usize,
}
233
234impl RecordBatch {
235    /// Creates a `RecordBatch` from a schema and columns.
236    ///
237    /// Expects the following:
238    ///
239    ///  * `!columns.is_empty()`
240    ///  * `schema.fields.len() == columns.len()`
241    ///  * `schema.fields[i].data_type() == columns[i].data_type()`
242    ///  * `columns[i].len() == columns[j].len()`
243    ///
244    /// If the conditions are not met, an error is returned.
245    ///
246    /// # Example
247    ///
248    /// ```
249    /// # use std::sync::Arc;
250    /// # use arrow_array::{Int32Array, RecordBatch};
251    /// # use arrow_schema::{DataType, Field, Schema};
252    ///
253    /// let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
254    /// let schema = Schema::new(vec![
255    ///     Field::new("id", DataType::Int32, false)
256    /// ]);
257    ///
258    /// let batch = RecordBatch::try_new(
259    ///     Arc::new(schema),
260    ///     vec![Arc::new(id_array)]
261    /// ).unwrap();
262    /// ```
263    pub fn try_new(schema: SchemaRef, columns: Vec<ArrayRef>) -> Result<Self, ArrowError> {
264        let options = RecordBatchOptions::new();
265        Self::try_new_impl(schema, columns, &options)
266    }
267
268    /// Creates a `RecordBatch` from a schema and columns, without validation.
269    ///
270    /// See [`Self::try_new`] for the checked version.
271    ///
272    /// # Safety
273    ///
274    /// Expects the following:
275    ///
276    ///  * `schema.fields.len() == columns.len()`
277    ///  * `schema.fields[i].data_type() == columns[i].data_type()`
278    ///  * `columns[i].len() == row_count`
279    ///
280    /// Note: if the schema does not match the underlying data exactly, it can lead to undefined
281    /// behavior, for example, via conversion to a `StructArray`, which in turn could lead
282    /// to incorrect access.
283    pub unsafe fn new_unchecked(
284        schema: SchemaRef,
285        columns: Vec<Arc<dyn Array>>,
286        row_count: usize,
287    ) -> Self {
288        Self {
289            schema,
290            columns,
291            row_count,
292        }
293    }
294
295    /// Creates a `RecordBatch` from a schema and columns, with additional options,
296    /// such as whether to strictly validate field names.
297    ///
298    /// See [`RecordBatch::try_new`] for the expected conditions.
299    pub fn try_new_with_options(
300        schema: SchemaRef,
301        columns: Vec<ArrayRef>,
302        options: &RecordBatchOptions,
303    ) -> Result<Self, ArrowError> {
304        Self::try_new_impl(schema, columns, options)
305    }
306
307    /// Creates a new empty [`RecordBatch`].
308    pub fn new_empty(schema: SchemaRef) -> Self {
309        let columns = schema
310            .fields()
311            .iter()
312            .map(|field| new_empty_array(field.data_type()))
313            .collect();
314
315        RecordBatch {
316            schema,
317            columns,
318            row_count: 0,
319        }
320    }
321
322    /// Validate the schema and columns using [`RecordBatchOptions`]. Returns an error
323    /// if any validation check fails, otherwise returns the created [`Self`]
324    fn try_new_impl(
325        schema: SchemaRef,
326        columns: Vec<ArrayRef>,
327        options: &RecordBatchOptions,
328    ) -> Result<Self, ArrowError> {
329        // check that number of fields in schema match column length
330        if schema.fields().len() != columns.len() {
331            return Err(ArrowError::InvalidArgumentError(format!(
332                "number of columns({}) must match number of fields({}) in schema",
333                columns.len(),
334                schema.fields().len(),
335            )));
336        }
337
338        let row_count = options
339            .row_count
340            .or_else(|| columns.first().map(|col| col.len()))
341            .ok_or_else(|| {
342                ArrowError::InvalidArgumentError(
343                    "must either specify a row count or at least one column".to_string(),
344                )
345            })?;
346
347        for (c, f) in columns.iter().zip(&schema.fields) {
348            if !f.is_nullable() && c.null_count() > 0 {
349                return Err(ArrowError::InvalidArgumentError(format!(
350                    "Column '{}' is declared as non-nullable but contains null values",
351                    f.name()
352                )));
353            }
354        }
355
356        // check that all columns have the same row count
357        if columns.iter().any(|c| c.len() != row_count) {
358            let err = match options.row_count {
359                Some(_) => "all columns in a record batch must have the specified row count",
360                None => "all columns in a record batch must have the same length",
361            };
362            return Err(ArrowError::InvalidArgumentError(err.to_string()));
363        }
364
365        // function for comparing column type and field type
366        // return true if 2 types are not matched
367        let type_not_match = if options.match_field_names {
368            |(_, (col_type, field_type)): &(usize, (&DataType, &DataType))| col_type != field_type
369        } else {
370            |(_, (col_type, field_type)): &(usize, (&DataType, &DataType))| {
371                !col_type.equals_datatype(field_type)
372            }
373        };
374
375        // check that all columns match the schema
376        let not_match = columns
377            .iter()
378            .zip(schema.fields().iter())
379            .map(|(col, field)| (col.data_type(), field.data_type()))
380            .enumerate()
381            .find(type_not_match);
382
383        if let Some((i, (col_type, field_type))) = not_match {
384            return Err(ArrowError::InvalidArgumentError(format!(
385                "column types must match schema types, expected {field_type} but found {col_type} at column index {i}"
386            )));
387        }
388
389        Ok(RecordBatch {
390            schema,
391            columns,
392            row_count,
393        })
394    }
395
396    /// Return the schema, columns and row count of this [`RecordBatch`]
397    pub fn into_parts(self) -> (SchemaRef, Vec<ArrayRef>, usize) {
398        (self.schema, self.columns, self.row_count)
399    }
400
401    /// Override the schema of this [`RecordBatch`]
402    ///
403    /// Returns an error if `schema` is not a superset of the current schema
404    /// as determined by [`Schema::contains`]
405    ///
406    /// See also [`Self::schema_metadata_mut`].
407    pub fn with_schema(self, schema: SchemaRef) -> Result<Self, ArrowError> {
408        if !schema.contains(self.schema.as_ref()) {
409            return Err(ArrowError::SchemaError(format!(
410                "target schema is not superset of current schema target={schema} current={}",
411                self.schema
412            )));
413        }
414
415        Ok(Self {
416            schema,
417            columns: self.columns,
418            row_count: self.row_count,
419        })
420    }
421
422    /// Returns the [`Schema`] of the record batch.
423    pub fn schema(&self) -> SchemaRef {
424        self.schema.clone()
425    }
426
427    /// Returns a reference to the [`Schema`] of the record batch.
428    pub fn schema_ref(&self) -> &SchemaRef {
429        &self.schema
430    }
431
432    /// Mutable access to the metadata of the schema.
433    ///
434    /// This allows you to modify [`Schema::metadata`] of [`Self::schema`] in a convenient and fast way.
435    ///
436    /// Note this will clone the entire underlying `Schema` object if it is currently shared
437    ///
438    /// # Example
439    /// ```
440    /// # use std::sync::Arc;
441    /// # use arrow_array::{record_batch, RecordBatch};
442    /// let mut batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
443    /// // Initially, the metadata is empty
444    /// assert!(batch.schema().metadata().get("key").is_none());
445    /// // Insert a key-value pair into the metadata
446    /// batch.schema_metadata_mut().insert("key".into(), "value".into());
447    /// assert_eq!(batch.schema().metadata().get("key"), Some(&String::from("value")));
448    /// ```
449    pub fn schema_metadata_mut(&mut self) -> &mut std::collections::HashMap<String, String> {
450        let schema = Arc::make_mut(&mut self.schema);
451        &mut schema.metadata
452    }
453
454    /// Projects the schema onto the specified columns
455    pub fn project(&self, indices: &[usize]) -> Result<RecordBatch, ArrowError> {
456        let projected_schema = self.schema.project(indices)?;
457        let batch_fields = indices
458            .iter()
459            .map(|f| {
460                self.columns.get(*f).cloned().ok_or_else(|| {
461                    ArrowError::SchemaError(format!(
462                        "project index {} out of bounds, max field {}",
463                        f,
464                        self.columns.len()
465                    ))
466                })
467            })
468            .collect::<Result<Vec<_>, _>>()?;
469
470        unsafe {
471            // Since we're starting from a valid RecordBatch and project
472            // creates a strict subset of the original, there's no need to
473            // redo the validation checks in `try_new_with_options`.
474            Ok(RecordBatch::new_unchecked(
475                SchemaRef::new(projected_schema),
476                batch_fields,
477                self.row_count,
478            ))
479        }
480    }
481
    /// Normalize a semi-structured [`RecordBatch`] into a flat table.
    ///
    /// Nested [`Field`]s will generate names separated by `separator`, up to a depth of `max_level`
    /// (unlimited if `None`).
    ///
    /// e.g. given a [`RecordBatch`] with schema:
    ///
    /// ```text
    ///     "foo": StructArray<"bar": Utf8>
    /// ```
    ///
    /// A separator of `"."` would generate a batch with the schema:
    ///
    /// ```text
    ///     "foo.bar": Utf8
    /// ```
    ///
    /// Note that giving a depth of `Some(0)` to `max_level` is the same as passing in `None`;
    /// it will be treated as unlimited.
    ///
    /// # Example
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int64Array, StringArray, StructArray, RecordBatch};
    /// # use arrow_schema::{DataType, Field, Fields, Schema};
    /// #
    /// let animals: ArrayRef = Arc::new(StringArray::from(vec!["Parrot", ""]));
    /// let n_legs: ArrayRef = Arc::new(Int64Array::from(vec![Some(2), Some(4)]));
    ///
    /// let animals_field = Arc::new(Field::new("animals", DataType::Utf8, true));
    /// let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, true));
    ///
    /// let a = Arc::new(StructArray::from(vec![
    ///     (animals_field.clone(), Arc::new(animals.clone()) as ArrayRef),
    ///     (n_legs_field.clone(), Arc::new(n_legs.clone()) as ArrayRef),
    /// ]));
    ///
    /// let schema = Schema::new(vec![
    ///     Field::new(
    ///         "a",
    ///         DataType::Struct(Fields::from(vec![animals_field, n_legs_field])),
    ///         false,
    ///     )
    /// ]);
    ///
    /// let normalized = RecordBatch::try_new(Arc::new(schema), vec![a])
    ///     .expect("valid conversion")
    ///     .normalize(".", None)
    ///     .expect("valid normalization");
    ///
    /// let expected = RecordBatch::try_from_iter_with_nullable(vec![
    ///     ("a.animals", animals.clone(), true),
    ///     ("a.n_legs", n_legs.clone(), true),
    /// ])
    /// .expect("valid conversion");
    ///
    /// assert_eq!(expected, normalized);
    /// ```
    pub fn normalize(&self, separator: &str, max_level: Option<usize>) -> Result<Self, ArrowError> {
        // `None` and `Some(0)` both mean "no depth limit".
        let max_level = match max_level.unwrap_or(usize::MAX) {
            0 => usize::MAX,
            val => val,
        };
        // Depth-first traversal stack of (depth, column, flattened name, field).
        // Entries are pushed in reverse so that popping preserves the
        // original left-to-right column order in the output.
        let mut stack: Vec<(usize, ArrayRef, String, FieldRef)> = self
            .columns
            .iter()
            .zip(self.schema.fields())
            .rev()
            .map(|(c, f)| (0, c.clone(), f.name().clone(), Arc::clone(f)))
            .collect();
        let mut columns: Vec<ArrayRef> = Vec::new();
        let mut fields: Vec<FieldRef> = Vec::new();

        while let Some((depth, c, name, field_ref)) = stack.pop() {
            match field_ref.data_type() {
                // A struct within the depth budget: flatten its children back
                // onto the stack (again reversed to keep output order).
                DataType::Struct(_) if depth < max_level => {
                    let (flat_fields, flat_cols) = c.as_struct().flatten();
                    for (cff, fff) in flat_cols.into_iter().zip(flat_fields.iter()).rev() {
                        let child_name = format!("{name}{separator}{}", fff.name());
                        stack.push((depth + 1, cff, child_name, Arc::clone(fff)))
                    }
                }
                // A leaf (or a struct past `max_level`): emit it under its
                // accumulated separator-joined name.
                _ => {
                    let updated_field =
                        Field::new(name, field_ref.data_type().clone(), field_ref.is_nullable());
                    columns.push(c);
                    fields.push(Arc::new(updated_field));
                }
            }
        }
        RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
    }
575
576    /// Returns the number of columns in the record batch.
577    ///
578    /// # Example
579    ///
580    /// ```
581    /// # use std::sync::Arc;
582    /// # use arrow_array::{Int32Array, RecordBatch};
583    /// # use arrow_schema::{DataType, Field, Schema};
584    ///
585    /// let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
586    /// let schema = Schema::new(vec![
587    ///     Field::new("id", DataType::Int32, false)
588    /// ]);
589    ///
590    /// let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(id_array)]).unwrap();
591    ///
592    /// assert_eq!(batch.num_columns(), 1);
593    /// ```
594    pub fn num_columns(&self) -> usize {
595        self.columns.len()
596    }
597
598    /// Returns the number of rows in each column.
599    ///
600    /// # Example
601    ///
602    /// ```
603    /// # use std::sync::Arc;
604    /// # use arrow_array::{Int32Array, RecordBatch};
605    /// # use arrow_schema::{DataType, Field, Schema};
606    ///
607    /// let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
608    /// let schema = Schema::new(vec![
609    ///     Field::new("id", DataType::Int32, false)
610    /// ]);
611    ///
612    /// let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(id_array)]).unwrap();
613    ///
614    /// assert_eq!(batch.num_rows(), 5);
615    /// ```
616    pub fn num_rows(&self) -> usize {
617        self.row_count
618    }
619
620    /// Get a reference to a column's array by index.
621    ///
622    /// # Panics
623    ///
624    /// Panics if `index` is outside of `0..num_columns`.
625    pub fn column(&self, index: usize) -> &ArrayRef {
626        &self.columns[index]
627    }
628
629    /// Get a reference to a column's array by name.
630    pub fn column_by_name(&self, name: &str) -> Option<&ArrayRef> {
631        self.schema()
632            .column_with_name(name)
633            .map(|(index, _)| &self.columns[index])
634    }
635
636    /// Get a reference to all columns in the record batch.
637    pub fn columns(&self) -> &[ArrayRef] {
638        &self.columns[..]
639    }
640
641    /// Remove column by index and return it.
642    ///
643    /// Return the `ArrayRef` if the column is removed.
644    ///
645    /// # Panics
646    ///
647    /// Panics if `index`` out of bounds.
648    ///
649    /// # Example
650    ///
651    /// ```
652    /// use std::sync::Arc;
653    /// use arrow_array::{BooleanArray, Int32Array, RecordBatch};
654    /// use arrow_schema::{DataType, Field, Schema};
655    /// let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
656    /// let bool_array = BooleanArray::from(vec![true, false, false, true, true]);
657    /// let schema = Schema::new(vec![
658    ///     Field::new("id", DataType::Int32, false),
659    ///     Field::new("bool", DataType::Boolean, false),
660    /// ]);
661    ///
662    /// let mut batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(id_array), Arc::new(bool_array)]).unwrap();
663    ///
664    /// let removed_column = batch.remove_column(0);
665    /// assert_eq!(removed_column.as_any().downcast_ref::<Int32Array>().unwrap(), &Int32Array::from(vec![1, 2, 3, 4, 5]));
666    /// assert_eq!(batch.num_columns(), 1);
667    /// ```
668    pub fn remove_column(&mut self, index: usize) -> ArrayRef {
669        let mut builder = SchemaBuilder::from(self.schema.as_ref());
670        builder.remove(index);
671        self.schema = Arc::new(builder.finish());
672        self.columns.remove(index)
673    }
674
675    /// Return a new RecordBatch where each column is sliced
676    /// according to `offset` and `length`
677    ///
678    /// # Panics
679    ///
680    /// Panics if `offset` with `length` is greater than column length.
681    pub fn slice(&self, offset: usize, length: usize) -> RecordBatch {
682        assert!((offset + length) <= self.num_rows());
683
684        let columns = self
685            .columns()
686            .iter()
687            .map(|column| column.slice(offset, length))
688            .collect();
689
690        Self {
691            schema: self.schema.clone(),
692            columns,
693            row_count: length,
694        }
695    }
696
697    /// Create a `RecordBatch` from an iterable list of pairs of the
698    /// form `(field_name, array)`, with the same requirements on
699    /// fields and arrays as [`RecordBatch::try_new`]. This method is
700    /// often used to create a single `RecordBatch` from arrays,
701    /// e.g. for testing.
702    ///
703    /// The resulting schema is marked as nullable for each column if
704    /// the array for that column is has any nulls. To explicitly
705    /// specify nullibility, use [`RecordBatch::try_from_iter_with_nullable`]
706    ///
707    /// Example:
708    /// ```
709    /// # use std::sync::Arc;
710    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
711    ///
712    /// let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
713    /// let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b"]));
714    ///
715    /// let record_batch = RecordBatch::try_from_iter(vec![
716    ///   ("a", a),
717    ///   ("b", b),
718    /// ]);
719    /// ```
720    /// Another way to quickly create a [`RecordBatch`] is to use the [`record_batch!`] macro,
721    /// which is particularly helpful for rapid prototyping and testing.
722    ///
723    /// Example:
724    ///
725    /// ```rust
726    /// use arrow_array::record_batch;
727    /// let batch = record_batch!(
728    ///     ("a", Int32, [1, 2, 3]),
729    ///     ("b", Float64, [Some(4.0), None, Some(5.0)]),
730    ///     ("c", Utf8, ["alpha", "beta", "gamma"])
731    /// );
732    /// ```
733    pub fn try_from_iter<I, F>(value: I) -> Result<Self, ArrowError>
734    where
735        I: IntoIterator<Item = (F, ArrayRef)>,
736        F: AsRef<str>,
737    {
738        // TODO: implement `TryFrom` trait, once
739        // https://github.com/rust-lang/rust/issues/50133 is no longer an
740        // issue
741        let iter = value.into_iter().map(|(field_name, array)| {
742            let nullable = array.null_count() > 0;
743            (field_name, array, nullable)
744        });
745
746        Self::try_from_iter_with_nullable(iter)
747    }
748
749    /// Create a `RecordBatch` from an iterable list of tuples of the
750    /// form `(field_name, array, nullable)`, with the same requirements on
751    /// fields and arrays as [`RecordBatch::try_new`]. This method is often
752    /// used to create a single `RecordBatch` from arrays, e.g. for
753    /// testing.
754    ///
755    /// Example:
756    /// ```
757    /// # use std::sync::Arc;
758    /// # use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
759    ///
760    /// let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
761    /// let b: ArrayRef = Arc::new(StringArray::from(vec![Some("a"), Some("b")]));
762    ///
763    /// // Note neither `a` nor `b` has any actual nulls, but we mark
764    /// // b an nullable
765    /// let record_batch = RecordBatch::try_from_iter_with_nullable(vec![
766    ///   ("a", a, false),
767    ///   ("b", b, true),
768    /// ]);
769    /// ```
770    pub fn try_from_iter_with_nullable<I, F>(value: I) -> Result<Self, ArrowError>
771    where
772        I: IntoIterator<Item = (F, ArrayRef, bool)>,
773        F: AsRef<str>,
774    {
775        let iter = value.into_iter();
776        let capacity = iter.size_hint().0;
777        let mut schema = SchemaBuilder::with_capacity(capacity);
778        let mut columns = Vec::with_capacity(capacity);
779
780        for (field_name, array, nullable) in iter {
781            let field_name = field_name.as_ref();
782            schema.push(Field::new(field_name, array.data_type().clone(), nullable));
783            columns.push(array);
784        }
785
786        let schema = Arc::new(schema.finish());
787        RecordBatch::try_new(schema, columns)
788    }
789
790    /// Registers all buffers in this record batch with the provided [`MemoryPool`].
791    ///
792    /// This claims memory for all columns in the batch by calling [`Array::claim`]
793    /// on each column.
794    ///
795    /// [`MemoryPool`]: arrow_buffer::MemoryPool
796    /// [`Array::claim`]: crate::Array::claim
797    #[cfg(feature = "pool")]
798    pub fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) {
799        for column in self.columns() {
800            column.claim(pool);
801        }
802    }
803
804    /// Returns the total number of bytes of memory occupied physically by this batch.
805    ///
806    /// Note that this does not always correspond to the exact memory usage of a
807    /// `RecordBatch` (might overestimate), since multiple columns can share the same
808    /// buffers or slices thereof, the memory used by the shared buffers might be
809    /// counted multiple times.
810    pub fn get_array_memory_size(&self) -> usize {
811        self.columns()
812            .iter()
813            .map(|array| array.get_array_memory_size())
814            .sum()
815    }
816}
817
/// Options that control the behaviour used when creating a [`RecordBatch`].
#[derive(Debug)]
#[non_exhaustive]
pub struct RecordBatchOptions {
    /// Match field names of structs and lists. If set to `true`, the names
    /// of nested fields in the columns must exactly match those in the schema.
    pub match_field_names: bool,

    /// Optional row count, useful for specifying a row count for a RecordBatch with no columns
    pub row_count: Option<usize>,
}
828
829impl RecordBatchOptions {
830    /// Creates a new `RecordBatchOptions`
831    pub fn new() -> Self {
832        Self {
833            match_field_names: true,
834            row_count: None,
835        }
836    }
837    /// Sets the row_count of RecordBatchOptions and returns self
838    pub fn with_row_count(mut self, row_count: Option<usize>) -> Self {
839        self.row_count = row_count;
840        self
841    }
842    /// Sets the match_field_names of RecordBatchOptions and returns self
843    pub fn with_match_field_names(mut self, match_field_names: bool) -> Self {
844        self.match_field_names = match_field_names;
845        self
846    }
847}
848impl Default for RecordBatchOptions {
849    fn default() -> Self {
850        Self::new()
851    }
852}
853impl From<StructArray> for RecordBatch {
854    fn from(value: StructArray) -> Self {
855        let row_count = value.len();
856        let (fields, columns, nulls) = value.into_parts();
857        assert_eq!(
858            nulls.map(|n| n.null_count()).unwrap_or_default(),
859            0,
860            "Cannot convert nullable StructArray to RecordBatch, see StructArray documentation"
861        );
862
863        RecordBatch {
864            schema: Arc::new(Schema::new(fields)),
865            row_count,
866            columns,
867        }
868    }
869}
870
871impl From<&StructArray> for RecordBatch {
872    fn from(struct_array: &StructArray) -> Self {
873        struct_array.clone().into()
874    }
875}
876
877impl Index<&str> for RecordBatch {
878    type Output = ArrayRef;
879
880    /// Get a reference to a column's array by name.
881    ///
882    /// # Panics
883    ///
884    /// Panics if the name is not in the schema.
885    fn index(&self, name: &str) -> &Self::Output {
886        self.column_by_name(name).unwrap()
887    }
888}
889
/// Generic implementation of [RecordBatchReader] that wraps an iterator.
///
/// # Example
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray, RecordBatchIterator, RecordBatchReader};
/// #
/// let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
/// let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b"]));
///
/// let record_batch = RecordBatch::try_from_iter(vec![
///   ("a", a),
///   ("b", b),
/// ]).unwrap();
///
/// let batches: Vec<RecordBatch> = vec![record_batch.clone(), record_batch.clone()];
///
/// let mut reader = RecordBatchIterator::new(batches.into_iter().map(Ok), record_batch.schema());
///
/// assert_eq!(reader.schema(), record_batch.schema());
/// assert_eq!(reader.next().unwrap().unwrap(), record_batch);
/// # assert_eq!(reader.next().unwrap().unwrap(), record_batch);
/// # assert!(reader.next().is_none());
/// ```
pub struct RecordBatchIterator<I>
where
    I: IntoIterator<Item = Result<RecordBatch, ArrowError>>,
{
    /// The wrapped iterator of batch results
    inner: I::IntoIter,
    /// Schema reported via [RecordBatchReader::schema]; yielded batches are
    /// expected to conform to it (not enforced here — supplied by the caller)
    inner_schema: SchemaRef,
}
922
923impl<I> RecordBatchIterator<I>
924where
925    I: IntoIterator<Item = Result<RecordBatch, ArrowError>>,
926{
927    /// Create a new [RecordBatchIterator].
928    ///
929    /// If `iter` is an infallible iterator, use `.map(Ok)`.
930    pub fn new(iter: I, schema: SchemaRef) -> Self {
931        Self {
932            inner: iter.into_iter(),
933            inner_schema: schema,
934        }
935    }
936}
937
impl<I> Iterator for RecordBatchIterator<I>
where
    I: IntoIterator<Item = Result<RecordBatch, ArrowError>>,
{
    type Item = I::Item;

    // Pure delegation to the wrapped iterator.
    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }

    // Forward the inner size hint so `collect` can pre-allocate accurately.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}
952
953impl<I> RecordBatchReader for RecordBatchIterator<I>
954where
955    I: IntoIterator<Item = Result<RecordBatch, ArrowError>>,
956{
957    fn schema(&self) -> SchemaRef {
958        self.inner_schema.clone()
959    }
960}
961
962#[cfg(test)]
963mod tests {
964    use super::*;
965    use crate::{
966        BooleanArray, Int8Array, Int32Array, Int64Array, ListArray, StringArray, StringViewArray,
967    };
968    use arrow_buffer::{Buffer, NullBuffer, ToByteSlice};
969    use arrow_data::{ArrayData, ArrayDataBuilder};
970    use arrow_schema::Fields;
971    use std::collections::HashMap;
972
    // Basic construction: schema + matching columns succeeds.
    #[test]
    fn create_record_batch() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Utf8, false),
        ]);

        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = StringArray::from(vec!["a", "b", "c", "d", "e"]);

        let record_batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
        check_batch(record_batch, 5)
    }
987
    // Construction with a Utf8View (StringViewArray) column.
    #[test]
    fn create_string_view_record_batch() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Utf8View, false),
        ]);

        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = StringViewArray::from(vec!["a", "b", "c", "d", "e"]);

        let record_batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        assert_eq!(5, record_batch.num_rows());
        assert_eq!(2, record_batch.num_columns());
        assert_eq!(&DataType::Int32, record_batch.schema().field(0).data_type());
        assert_eq!(
            &DataType::Utf8View,
            record_batch.schema().field(1).data_type()
        );
        assert_eq!(5, record_batch.column(0).len());
        assert_eq!(5, record_batch.column(1).len());
    }
1011
    // The record_batch! macro accepts variables (not just literals) for
    // Binary / LargeBinary columns.
    #[test]
    fn create_binary_record_batch_from_variables() {
        let binary_values = vec![b"a".as_slice()];
        let large_binary_values = vec![b"xxx".as_slice()];

        let record_batch = record_batch!(
            ("a", Binary, binary_values),
            ("b", LargeBinary, large_binary_values)
        )
        .unwrap();

        assert_eq!(1, record_batch.num_rows());
        assert_eq!(2, record_batch.num_columns());
        assert_eq!(
            &DataType::Binary,
            record_batch.schema().field(0).data_type()
        );
        assert_eq!(
            &DataType::LargeBinary,
            record_batch.schema().field(1).data_type()
        );

        let binary = record_batch.column(0).as_binary::<i32>();
        assert_eq!(b"a", binary.value(0));

        let large_binary = record_batch.column(1).as_binary::<i64>();
        assert_eq!(b"xxx", large_binary.value(0));
    }
1040
    // Pins get_array_memory_size to an exact value so accidental growth in
    // per-array overhead is caught; update the constant deliberately if the
    // memory layout changes.
    #[test]
    fn byte_size_should_not_regress() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Utf8, false),
        ]);

        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = StringArray::from(vec!["a", "b", "c", "d", "e"]);

        let record_batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
        assert_eq!(record_batch.get_array_memory_size(), 364);
    }
1055
    // Shared helper: asserts the standard two-column (Int32, Utf8) batch
    // shape used by several tests in this module.
    fn check_batch(record_batch: RecordBatch, num_rows: usize) {
        assert_eq!(num_rows, record_batch.num_rows());
        assert_eq!(2, record_batch.num_columns());
        assert_eq!(&DataType::Int32, record_batch.schema().field(0).data_type());
        assert_eq!(&DataType::Utf8, record_batch.schema().field(1).data_type());
        assert_eq!(num_rows, record_batch.column(0).len());
        assert_eq!(num_rows, record_batch.column(1).len());
    }
1064
    // Valid slices (in-range, zero-length) work; an out-of-bounds
    // offset+length panics (hence should_panic).
    #[test]
    #[should_panic(expected = "assertion failed: (offset + length) <= self.num_rows()")]
    fn create_record_batch_slice() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Utf8, false),
        ]);
        let expected_schema = schema.clone();

        let a = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
        let b = StringArray::from(vec!["a", "b", "c", "d", "e", "f", "h", "i"]);

        let record_batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        let offset = 2;
        let length = 5;
        let record_batch_slice = record_batch.slice(offset, length);

        assert_eq!(record_batch_slice.schema().as_ref(), &expected_schema);
        check_batch(record_batch_slice, 5);

        let offset = 2;
        let length = 0;
        let record_batch_slice = record_batch.slice(offset, length);

        assert_eq!(record_batch_slice.schema().as_ref(), &expected_schema);
        check_batch(record_batch_slice, 0);

        let offset = 2;
        let length = 10;
        let _record_batch_slice = record_batch.slice(offset, length);
    }
1098
    // Slicing an empty batch: (0, 0) is fine, any non-empty range panics.
    #[test]
    #[should_panic(expected = "assertion failed: (offset + length) <= self.num_rows()")]
    fn create_record_batch_slice_empty_batch() {
        let schema = Schema::empty();

        let record_batch = RecordBatch::new_empty(Arc::new(schema));

        let offset = 0;
        let length = 0;
        let record_batch_slice = record_batch.slice(offset, length);
        assert_eq!(0, record_batch_slice.schema().fields().len());

        let offset = 1;
        let length = 2;
        let _record_batch_slice = record_batch.slice(offset, length);
    }
1115
    // try_from_iter infers nullability from the data: column "a" contains a
    // null so its field becomes nullable, "b" does not.
    #[test]
    fn create_record_batch_try_from_iter() {
        let a: ArrayRef = Arc::new(Int32Array::from(vec![
            Some(1),
            Some(2),
            None,
            Some(4),
            Some(5),
        ]));
        let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));

        let record_batch =
            RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).expect("valid conversion");

        let expected_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, false),
        ]);
        assert_eq!(record_batch.schema().as_ref(), &expected_schema);
        check_batch(record_batch, 5);
    }
1137
    // try_from_iter_with_nullable uses the caller's nullability flags, even
    // when the data itself has no nulls.
    #[test]
    fn create_record_batch_try_from_iter_with_nullable() {
        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
        let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));

        // Note there are no nulls in a or b, but we specify that b is nullable
        let record_batch =
            RecordBatch::try_from_iter_with_nullable(vec![("a", a, false), ("b", b, true)])
                .expect("valid conversion");

        let expected_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Utf8, true),
        ]);
        assert_eq!(record_batch.schema().as_ref(), &expected_schema);
        check_batch(record_batch, 5);
    }
1155
    // Column whose data type disagrees with the schema is rejected with a
    // descriptive error.
    #[test]
    fn create_record_batch_schema_mismatch() {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

        let a = Int64Array::from(vec![1, 2, 3, 4, 5]);

        let err = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid argument error: column types must match schema types, expected Int32 but found Int64 at column index 0"
        );
    }
1168
    // Nested field names that differ from the schema ("aa1" vs "a1") fail
    // validation unless match_field_names is disabled via RecordBatchOptions.
    #[test]
    fn create_record_batch_field_name_mismatch() {
        let fields = vec![
            Field::new("a1", DataType::Int32, false),
            Field::new_list("a2", Field::new_list_field(DataType::Int8, false), false),
        ];
        let schema = Arc::new(Schema::new(vec![Field::new_struct("a", fields, true)]));

        let a1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
        let a2_child = Int8Array::from(vec![1, 2, 3, 4]);
        let a2 = ArrayDataBuilder::new(DataType::List(Arc::new(Field::new(
            "array",
            DataType::Int8,
            false,
        ))))
        .add_child_data(a2_child.into_data())
        .len(2)
        .add_buffer(Buffer::from([0i32, 3, 4].to_byte_slice()))
        .build()
        .unwrap();
        let a2: ArrayRef = Arc::new(ListArray::from(a2));
        let a = ArrayDataBuilder::new(DataType::Struct(Fields::from(vec![
            Field::new("aa1", DataType::Int32, false),
            Field::new("a2", a2.data_type().clone(), false),
        ])))
        .add_child_data(a1.into_data())
        .add_child_data(a2.into_data())
        .len(2)
        .build()
        .unwrap();
        let a: ArrayRef = Arc::new(StructArray::from(a));

        // creating the batch with field name validation should fail
        let batch = RecordBatch::try_new(schema.clone(), vec![a.clone()]);
        assert!(batch.is_err());

        // creating the batch without field name validation should pass
        let options = RecordBatchOptions {
            match_field_names: false,
            row_count: None,
        };
        let batch = RecordBatch::try_new_with_options(schema, vec![a], &options);
        assert!(batch.is_ok());
    }
1213
    // More columns than schema fields is rejected.
    #[test]
    fn create_record_batch_record_mismatch() {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = Int32Array::from(vec![1, 2, 3, 4, 5]);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]);
        assert!(batch.is_err());
    }
1224
    // From<&StructArray>: children become columns, fields become the schema.
    #[test]
    fn create_record_batch_from_struct_array() {
        let boolean = Arc::new(BooleanArray::from(vec![false, false, true, true]));
        let int = Arc::new(Int32Array::from(vec![42, 28, 19, 31]));
        let struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("b", DataType::Boolean, false)),
                boolean.clone() as ArrayRef,
            ),
            (
                Arc::new(Field::new("c", DataType::Int32, false)),
                int.clone() as ArrayRef,
            ),
        ]);

        let batch = RecordBatch::from(&struct_array);
        assert_eq!(2, batch.num_columns());
        assert_eq!(4, batch.num_rows());
        assert_eq!(
            struct_array.data_type(),
            &DataType::Struct(batch.schema().fields().clone())
        );
        assert_eq!(batch.column(0).as_ref(), boolean.as_ref());
        assert_eq!(batch.column(1).as_ref(), int.as_ref());
    }
1250
    // Two independently-built batches with identical schema and data compare equal.
    #[test]
    fn record_batch_equality() {
        let id_arr1 = Int32Array::from(vec![1, 2, 3, 4]);
        let val_arr1 = Int32Array::from(vec![5, 6, 7, 8]);
        let schema1 = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("val", DataType::Int32, false),
        ]);

        let id_arr2 = Int32Array::from(vec![1, 2, 3, 4]);
        let val_arr2 = Int32Array::from(vec![5, 6, 7, 8]);
        let schema2 = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("val", DataType::Int32, false),
        ]);

        let batch1 = RecordBatch::try_new(
            Arc::new(schema1),
            vec![Arc::new(id_arr1), Arc::new(val_arr1)],
        )
        .unwrap();

        let batch2 = RecordBatch::try_new(
            Arc::new(schema2),
            vec![Arc::new(id_arr2), Arc::new(val_arr2)],
        )
        .unwrap();

        assert_eq!(batch1, batch2);
    }
1281
    /// validates if the record batch can be accessed using `column_name` as index i.e. `record_batch["column_name"]`
    #[test]
    fn record_batch_index_access() {
        let id_arr = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
        let val_arr = Arc::new(Int32Array::from(vec![5, 6, 7, 8]));
        let schema1 = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("val", DataType::Int32, false),
        ]);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema1), vec![id_arr.clone(), val_arr.clone()]).unwrap();

        // Index<&str> returns the same Arc'd arrays that were passed in
        assert_eq!(record_batch["id"].as_ref(), id_arr.as_ref());
        assert_eq!(record_batch["val"].as_ref(), val_arr.as_ref());
    }
1297
    // Same schemas, different column values => batches compare unequal.
    #[test]
    fn record_batch_vals_ne() {
        let id_arr1 = Int32Array::from(vec![1, 2, 3, 4]);
        let val_arr1 = Int32Array::from(vec![5, 6, 7, 8]);
        let schema1 = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("val", DataType::Int32, false),
        ]);

        let id_arr2 = Int32Array::from(vec![1, 2, 3, 4]);
        let val_arr2 = Int32Array::from(vec![1, 2, 3, 4]);
        let schema2 = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("val", DataType::Int32, false),
        ]);

        let batch1 = RecordBatch::try_new(
            Arc::new(schema1),
            vec![Arc::new(id_arr1), Arc::new(val_arr1)],
        )
        .unwrap();

        let batch2 = RecordBatch::try_new(
            Arc::new(schema2),
            vec![Arc::new(id_arr2), Arc::new(val_arr2)],
        )
        .unwrap();

        assert_ne!(batch1, batch2);
    }
1328
    // Same data, different field names ("val" vs "num") => unequal.
    #[test]
    fn record_batch_column_names_ne() {
        let id_arr1 = Int32Array::from(vec![1, 2, 3, 4]);
        let val_arr1 = Int32Array::from(vec![5, 6, 7, 8]);
        let schema1 = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("val", DataType::Int32, false),
        ]);

        let id_arr2 = Int32Array::from(vec![1, 2, 3, 4]);
        let val_arr2 = Int32Array::from(vec![5, 6, 7, 8]);
        let schema2 = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("num", DataType::Int32, false),
        ]);

        let batch1 = RecordBatch::try_new(
            Arc::new(schema1),
            vec![Arc::new(id_arr1), Arc::new(val_arr1)],
        )
        .unwrap();

        let batch2 = RecordBatch::try_new(
            Arc::new(schema2),
            vec![Arc::new(id_arr2), Arc::new(val_arr2)],
        )
        .unwrap();

        assert_ne!(batch1, batch2);
    }
1359
    // Different column counts (2 vs 3) => unequal.
    #[test]
    fn record_batch_column_number_ne() {
        let id_arr1 = Int32Array::from(vec![1, 2, 3, 4]);
        let val_arr1 = Int32Array::from(vec![5, 6, 7, 8]);
        let schema1 = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("val", DataType::Int32, false),
        ]);

        let id_arr2 = Int32Array::from(vec![1, 2, 3, 4]);
        let val_arr2 = Int32Array::from(vec![5, 6, 7, 8]);
        let num_arr2 = Int32Array::from(vec![5, 6, 7, 8]);
        let schema2 = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("val", DataType::Int32, false),
            Field::new("num", DataType::Int32, false),
        ]);

        let batch1 = RecordBatch::try_new(
            Arc::new(schema1),
            vec![Arc::new(id_arr1), Arc::new(val_arr1)],
        )
        .unwrap();

        let batch2 = RecordBatch::try_new(
            Arc::new(schema2),
            vec![Arc::new(id_arr2), Arc::new(val_arr2), Arc::new(num_arr2)],
        )
        .unwrap();

        assert_ne!(batch1, batch2);
    }
1392
1393    #[test]
1394    fn record_batch_row_count_ne() {
1395        let id_arr1 = Int32Array::from(vec![1, 2, 3]);
1396        let val_arr1 = Int32Array::from(vec![5, 6, 7]);
1397        let schema1 = Schema::new(vec![
1398            Field::new("id", DataType::Int32, false),
1399            Field::new("val", DataType::Int32, false),
1400        ]);
1401
1402        let id_arr2 = Int32Array::from(vec![1, 2, 3, 4]);
1403        let val_arr2 = Int32Array::from(vec![5, 6, 7, 8]);
1404        let schema2 = Schema::new(vec![
1405            Field::new("id", DataType::Int32, false),
1406            Field::new("num", DataType::Int32, false),
1407        ]);
1408
1409        let batch1 = RecordBatch::try_new(
1410            Arc::new(schema1),
1411            vec![Arc::new(id_arr1), Arc::new(val_arr1)],
1412        )
1413        .unwrap();
1414
1415        let batch2 = RecordBatch::try_new(
1416            Arc::new(schema2),
1417            vec![Arc::new(id_arr2), Arc::new(val_arr2)],
1418        )
1419        .unwrap();
1420
1421        assert_ne!(batch1, batch2);
1422    }
1423
    // normalize flattens a one-level struct column into "a.child" columns;
    // also checks that depth Some(0) and None behave identically.
    #[test]
    fn normalize_simple() {
        let animals: ArrayRef = Arc::new(StringArray::from(vec!["Parrot", ""]));
        let n_legs: ArrayRef = Arc::new(Int64Array::from(vec![Some(2), Some(4)]));
        let year: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(2022)]));

        let animals_field = Arc::new(Field::new("animals", DataType::Utf8, true));
        let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, true));
        let year_field = Arc::new(Field::new("year", DataType::Int64, true));

        let a = Arc::new(StructArray::from(vec![
            (animals_field.clone(), Arc::new(animals.clone()) as ArrayRef),
            (n_legs_field.clone(), Arc::new(n_legs.clone()) as ArrayRef),
            (year_field.clone(), Arc::new(year.clone()) as ArrayRef),
        ]));

        let month = Arc::new(Int64Array::from(vec![Some(4), Some(6)]));

        let schema = Schema::new(vec![
            Field::new(
                "a",
                DataType::Struct(Fields::from(vec![animals_field, n_legs_field, year_field])),
                false,
            ),
            Field::new("month", DataType::Int64, true),
        ]);

        let normalized =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![a.clone(), month.clone()])
                .expect("valid conversion")
                .normalize(".", Some(0))
                .expect("valid normalization");

        let expected = RecordBatch::try_from_iter_with_nullable(vec![
            ("a.animals", animals.clone(), true),
            ("a.n_legs", n_legs.clone(), true),
            ("a.year", year.clone(), true),
            ("month", month.clone(), true),
        ])
        .expect("valid conversion");

        assert_eq!(expected, normalized);

        // check 0 and None have the same effect
        let normalized = RecordBatch::try_new(Arc::new(schema), vec![a, month.clone()])
            .expect("valid conversion")
            .normalize(".", None)
            .expect("valid normalization");

        assert_eq!(expected, normalized);
    }
1475
    // normalize on a two-level nested struct: depth Some(1) flattens only the
    // outer level ("!.1", "!.2"), depth None flattens fully ("!.1.a", ...).
    #[test]
    fn normalize_nested() {
        // Initialize schema
        let a = Arc::new(Field::new("a", DataType::Int64, true));
        let b = Arc::new(Field::new("b", DataType::Int64, false));
        let c = Arc::new(Field::new("c", DataType::Int64, true));

        let one = Arc::new(Field::new(
            "1",
            DataType::Struct(Fields::from(vec![a.clone(), b.clone(), c.clone()])),
            false,
        ));
        let two = Arc::new(Field::new(
            "2",
            DataType::Struct(Fields::from(vec![a.clone(), b.clone(), c.clone()])),
            true,
        ));

        let exclamation = Arc::new(Field::new(
            "!",
            DataType::Struct(Fields::from(vec![one.clone(), two.clone()])),
            false,
        ));

        let schema = Schema::new(vec![exclamation.clone()]);

        // Initialize fields
        let a_field = Int64Array::from(vec![Some(0), Some(1)]);
        let b_field = Int64Array::from(vec![Some(2), Some(3)]);
        let c_field = Int64Array::from(vec![None, Some(4)]);

        let one_field = StructArray::from(vec![
            (a.clone(), Arc::new(a_field.clone()) as ArrayRef),
            (b.clone(), Arc::new(b_field.clone()) as ArrayRef),
            (c.clone(), Arc::new(c_field.clone()) as ArrayRef),
        ]);
        let two_field = StructArray::from(vec![
            (a.clone(), Arc::new(a_field.clone()) as ArrayRef),
            (b.clone(), Arc::new(b_field.clone()) as ArrayRef),
            (c.clone(), Arc::new(c_field.clone()) as ArrayRef),
        ]);

        let exclamation_field = Arc::new(StructArray::from(vec![
            (one.clone(), Arc::new(one_field) as ArrayRef),
            (two.clone(), Arc::new(two_field) as ArrayRef),
        ]));

        // Normalize top level
        let normalized =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![exclamation_field.clone()])
                .expect("valid conversion")
                .normalize(".", Some(1))
                .expect("valid normalization");

        let expected = RecordBatch::try_from_iter_with_nullable(vec![
            (
                "!.1",
                Arc::new(StructArray::from(vec![
                    (a.clone(), Arc::new(a_field.clone()) as ArrayRef),
                    (b.clone(), Arc::new(b_field.clone()) as ArrayRef),
                    (c.clone(), Arc::new(c_field.clone()) as ArrayRef),
                ])) as ArrayRef,
                false,
            ),
            (
                "!.2",
                Arc::new(StructArray::from(vec![
                    (a.clone(), Arc::new(a_field.clone()) as ArrayRef),
                    (b.clone(), Arc::new(b_field.clone()) as ArrayRef),
                    (c.clone(), Arc::new(c_field.clone()) as ArrayRef),
                ])) as ArrayRef,
                true,
            ),
        ])
        .expect("valid conversion");

        assert_eq!(expected, normalized);

        // Normalize all levels
        let normalized = RecordBatch::try_new(Arc::new(schema), vec![exclamation_field])
            .expect("valid conversion")
            .normalize(".", None)
            .expect("valid normalization");

        let expected = RecordBatch::try_from_iter_with_nullable(vec![
            ("!.1.a", Arc::new(a_field.clone()) as ArrayRef, true),
            ("!.1.b", Arc::new(b_field.clone()) as ArrayRef, false),
            ("!.1.c", Arc::new(c_field.clone()) as ArrayRef, true),
            ("!.2.a", Arc::new(a_field.clone()) as ArrayRef, true),
            ("!.2.b", Arc::new(b_field.clone()) as ArrayRef, false),
            ("!.2.c", Arc::new(c_field.clone()) as ArrayRef, true),
        ])
        .expect("valid conversion");

        assert_eq!(expected, normalized);
    }
1572
    // Normalizing an empty batch agrees with normalizing the schema alone.
    #[test]
    fn normalize_empty() {
        let animals_field = Arc::new(Field::new("animals", DataType::Utf8, true));
        let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, true));
        let year_field = Arc::new(Field::new("year", DataType::Int64, true));

        let schema = Schema::new(vec![
            Field::new(
                "a",
                DataType::Struct(Fields::from(vec![animals_field, n_legs_field, year_field])),
                false,
            ),
            Field::new("month", DataType::Int64, true),
        ]);

        let normalized = RecordBatch::new_empty(Arc::new(schema.clone()))
            .normalize(".", Some(0))
            .expect("valid normalization");

        let expected = RecordBatch::new_empty(Arc::new(
            schema.normalize(".", Some(0)).expect("valid normalization"),
        ));

        assert_eq!(expected, normalized);
    }
1598
    // project selects columns by index, preserving their order and data.
    #[test]
    fn project() {
        let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
        let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"]));
        let c: ArrayRef = Arc::new(StringArray::from(vec!["d", "e", "f"]));

        let record_batch =
            RecordBatch::try_from_iter(vec![("a", a.clone()), ("b", b.clone()), ("c", c.clone())])
                .expect("valid conversion");

        let expected =
            RecordBatch::try_from_iter(vec![("a", a), ("c", c)]).expect("valid conversion");

        assert_eq!(expected, record_batch.project(&[0, 2]).unwrap());
    }
1614
    // Projecting zero columns keeps the row count (via RecordBatchOptions).
    #[test]
    fn project_empty() {
        let c: ArrayRef = Arc::new(StringArray::from(vec!["d", "e", "f"]));

        let record_batch =
            RecordBatch::try_from_iter(vec![("c", c.clone())]).expect("valid conversion");

        let expected = RecordBatch::try_new_with_options(
            Arc::new(Schema::empty()),
            vec![],
            &RecordBatchOptions {
                match_field_names: true,
                row_count: Some(3),
            },
        )
        .expect("valid conversion");

        assert_eq!(expected, record_batch.project(&[]).unwrap());
    }
1634
    // A batch with no columns requires an explicit row count; slicing such a
    // batch adjusts the row count only.
    #[test]
    fn test_no_column_record_batch() {
        let schema = Arc::new(Schema::empty());

        let err = RecordBatch::try_new(schema.clone(), vec![]).unwrap_err();
        assert!(
            err.to_string()
                .contains("must either specify a row count or at least one column")
        );

        let options = RecordBatchOptions::new().with_row_count(Some(10));

        let ok = RecordBatch::try_new_with_options(schema.clone(), vec![], &options).unwrap();
        assert_eq!(ok.num_rows(), 10);

        let a = ok.slice(2, 5);
        assert_eq!(a.num_rows(), 5);

        let b = ok.slice(5, 0);
        assert_eq!(b.num_rows(), 0);

        assert_ne!(a, b);
        assert_eq!(b, RecordBatch::new_empty(schema))
    }
1659
1660    #[test]
1661    fn test_nulls_in_non_nullable_field() {
1662        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1663        let maybe_batch = RecordBatch::try_new(
1664            schema,
1665            vec![Arc::new(Int32Array::from(vec![Some(1), None]))],
1666        );
1667        assert_eq!(
1668            "Invalid argument error: Column 'a' is declared as non-nullable but contains null values",
1669            format!("{}", maybe_batch.err().unwrap())
1670        );
1671    }
1672    #[test]
1673    fn test_record_batch_options() {
1674        let options = RecordBatchOptions::new()
1675            .with_match_field_names(false)
1676            .with_row_count(Some(20));
1677        assert!(!options.match_field_names);
1678        assert_eq!(options.row_count.unwrap(), 20)
1679    }
1680
1681    #[test]
1682    #[should_panic(expected = "Cannot convert nullable StructArray to RecordBatch")]
1683    fn test_from_struct() {
1684        let s = StructArray::from(ArrayData::new_null(
1685            // Note child is not nullable
1686            &DataType::Struct(vec![Field::new("foo", DataType::Int32, false)].into()),
1687            2,
1688        ));
1689        let _ = RecordBatch::from(s);
1690    }
1691
1692    #[test]
1693    fn test_with_schema() {
1694        let required_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1695        let required_schema = Arc::new(required_schema);
1696        let nullable_schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
1697        let nullable_schema = Arc::new(nullable_schema);
1698
1699        let batch = RecordBatch::try_new(
1700            required_schema.clone(),
1701            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as _],
1702        )
1703        .unwrap();
1704
1705        // Can add nullability
1706        let batch = batch.with_schema(nullable_schema.clone()).unwrap();
1707
1708        // Cannot remove nullability
1709        batch.clone().with_schema(required_schema).unwrap_err();
1710
1711        // Can add metadata
1712        let metadata = vec![("foo".to_string(), "bar".to_string())]
1713            .into_iter()
1714            .collect();
1715        let metadata_schema = nullable_schema.as_ref().clone().with_metadata(metadata);
1716        let batch = batch.with_schema(Arc::new(metadata_schema)).unwrap();
1717
1718        // Cannot remove metadata
1719        batch.with_schema(nullable_schema).unwrap_err();
1720    }
1721
1722    #[test]
1723    fn test_boxed_reader() {
1724        // Make sure we can pass a boxed reader to a function generic over
1725        // RecordBatchReader.
1726        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1727        let schema = Arc::new(schema);
1728
1729        let reader = RecordBatchIterator::new(std::iter::empty(), schema);
1730        let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);
1731
1732        fn get_size(reader: impl RecordBatchReader) -> usize {
1733            reader.size_hint().0
1734        }
1735
1736        let size = get_size(reader);
1737        assert_eq!(size, 0);
1738    }
1739
1740    #[test]
1741    fn test_remove_column_maintains_schema_metadata() {
1742        let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
1743        let bool_array = BooleanArray::from(vec![true, false, false, true, true]);
1744
1745        let mut metadata = HashMap::new();
1746        metadata.insert("foo".to_string(), "bar".to_string());
1747        let schema = Schema::new(vec![
1748            Field::new("id", DataType::Int32, false),
1749            Field::new("bool", DataType::Boolean, false),
1750        ])
1751        .with_metadata(metadata);
1752
1753        let mut batch = RecordBatch::try_new(
1754            Arc::new(schema),
1755            vec![Arc::new(id_array), Arc::new(bool_array)],
1756        )
1757        .unwrap();
1758
1759        let _removed_column = batch.remove_column(0);
1760        assert_eq!(batch.schema().metadata().len(), 1);
1761        assert_eq!(
1762            batch.schema().metadata().get("foo").unwrap().as_str(),
1763            "bar"
1764        );
1765    }
1766
1767    #[test]
1768    fn test_normalize_nullable_struct() {
1769        let child = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
1770        let struct_nulls =
1771            NullBuffer::new(arrow_buffer::BooleanBuffer::from(vec![true, false, true]));
1772        let struct_array = Arc::new(StructArray::new(
1773            Fields::from(vec![Field::new("x", DataType::Int32, false)]),
1774            vec![child],
1775            Some(struct_nulls),
1776        )) as ArrayRef;
1777
1778        let schema = Schema::new(vec![Field::new(
1779            "s",
1780            DataType::Struct(Fields::from(vec![Field::new("x", DataType::Int32, false)])),
1781            true,
1782        )]);
1783        let batch = RecordBatch::try_new(Arc::new(schema), vec![struct_array]).unwrap();
1784
1785        let normalized = batch.normalize(".", None).unwrap();
1786
1787        assert_eq!(normalized.num_columns(), 1);
1788        assert_eq!(normalized.schema().field(0).name(), "s.x");
1789        assert!(normalized.schema().field(0).is_nullable());
1790        let col = normalized.column(0);
1791        assert!(col.is_valid(0));
1792        assert!(col.is_null(1));
1793        assert!(col.is_valid(2));
1794    }
1795}