1use crate::cast::AsArray;
22use crate::{Array, ArrayRef, StructArray, new_empty_array};
23use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema, SchemaBuilder, SchemaRef};
24use std::ops::Index;
25use std::sync::Arc;
26
/// Trait for types that can read `RecordBatch`'s.
///
/// Implementors are fallible iterators of [`RecordBatch`]; every batch yielded
/// is expected to match the schema returned by [`Self::schema`].
pub trait RecordBatchReader: Iterator<Item = Result<RecordBatch, ArrowError>> {
    /// Returns the schema of the record batches produced by this reader.
    fn schema(&self) -> SchemaRef;
}
37
38impl<R: RecordBatchReader + ?Sized> RecordBatchReader for Box<R> {
39 fn schema(&self) -> SchemaRef {
40 self.as_ref().schema()
41 }
42}
43
/// Trait for types that can write `RecordBatch`'s.
pub trait RecordBatchWriter {
    /// Writes a single batch to the writer.
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError>;

    /// Finalizes the output (e.g. footers, flushes) and consumes the writer.
    fn close(self) -> Result<(), ArrowError>;
}
52
/// Creates an [`Arc`](std::sync::Arc)'d array from a data-type shorthand and
/// a set of values.
///
/// The first argument selects the data type (e.g. `Int32`, `Utf8`, `Binary`);
/// the second is either a bracketed list of values (`[v1, v2, ...]`) or an
/// expression evaluating to a `Vec` of values. `Null` instead takes a length.
///
/// Unsupported type shorthands produce a compile-time error via the fallback
/// `@from` arm.
#[macro_export]
macro_rules! create_array {
    // Internal `@from` arms: map a data-type shorthand to its array type.
    (@from Boolean) => { $crate::BooleanArray };
    (@from Int8) => { $crate::Int8Array };
    (@from Int16) => { $crate::Int16Array };
    (@from Int32) => { $crate::Int32Array };
    (@from Int64) => { $crate::Int64Array };
    (@from UInt8) => { $crate::UInt8Array };
    (@from UInt16) => { $crate::UInt16Array };
    (@from UInt32) => { $crate::UInt32Array };
    (@from UInt64) => { $crate::UInt64Array };
    (@from Float16) => { $crate::Float16Array };
    (@from Float32) => { $crate::Float32Array };
    (@from Float64) => { $crate::Float64Array };
    (@from Utf8) => { $crate::StringArray };
    (@from Utf8View) => { $crate::StringViewArray };
    (@from LargeUtf8) => { $crate::LargeStringArray };
    (@from IntervalDayTime) => { $crate::IntervalDayTimeArray };
    (@from IntervalYearMonth) => { $crate::IntervalYearMonthArray };
    (@from Second) => { $crate::TimestampSecondArray };
    (@from Millisecond) => { $crate::TimestampMillisecondArray };
    (@from Microsecond) => { $crate::TimestampMicrosecondArray };
    (@from Nanosecond) => { $crate::TimestampNanosecondArray };
    (@from Second32) => { $crate::Time32SecondArray };
    (@from Millisecond32) => { $crate::Time32MillisecondArray };
    (@from Microsecond64) => { $crate::Time64MicrosecondArray };
    // Fixed: previously expanded to the non-existent `Time64Nanosecond64Array`,
    // which made `create_array!(Nanosecond64, ...)` fail to compile.
    (@from Nanosecond64) => { $crate::Time64NanosecondArray };
    (@from DurationSecond) => { $crate::DurationSecondArray };
    (@from DurationMillisecond) => { $crate::DurationMillisecondArray };
    (@from DurationMicrosecond) => { $crate::DurationMicrosecondArray };
    (@from DurationNanosecond) => { $crate::DurationNanosecondArray };
    (@from Decimal32) => { $crate::Decimal32Array };
    (@from Decimal64) => { $crate::Decimal64Array };
    (@from Decimal128) => { $crate::Decimal128Array };
    (@from Decimal256) => { $crate::Decimal256Array };
    (@from TimestampSecond) => { $crate::TimestampSecondArray };
    (@from TimestampMillisecond) => { $crate::TimestampMillisecondArray };
    (@from TimestampMicrosecond) => { $crate::TimestampMicrosecondArray };
    (@from TimestampNanosecond) => { $crate::TimestampNanosecondArray };

    // Fallback: reject unknown shorthands at compile time with a clear message.
    (@from $ty: ident) => {
        compile_error!(concat!("Unsupported data type: ", stringify!($ty)))
    };

    // `Null` arrays have no values, only a length.
    (Null, $size: expr) => {
        std::sync::Arc::new($crate::NullArray::new($size))
    };

    // Binary types use `from_vec` rather than `From<Vec<_>>`.
    (Binary, [$($values: expr),*]) => {
        std::sync::Arc::new($crate::BinaryArray::from_vec(vec![$($values),*]))
    };

    (LargeBinary, [$($values: expr),*]) => {
        std::sync::Arc::new($crate::LargeBinaryArray::from_vec(vec![$($values),*]))
    };

    // Bracketed value list for all other types.
    ($ty: tt, [$($values: expr),*]) => {
        std::sync::Arc::new(<$crate::create_array!(@from $ty)>::from(vec![$($values),*]))
    };

    (Binary, $values: expr) => {
        std::sync::Arc::new($crate::BinaryArray::from_vec($values))
    };

    (LargeBinary, $values: expr) => {
        std::sync::Arc::new($crate::LargeBinaryArray::from_vec($values))
    };

    // Arbitrary expression evaluating to a `Vec` of values.
    ($ty: tt, $values: expr) => {
        std::sync::Arc::new(<$crate::create_array!(@from $ty)>::from($values))
    };
}
151
/// Creates a [`RecordBatch`] from column declarations of the form
/// `(name, DataType, values...)`.
///
/// Every field is created as nullable; the values are built with
/// [`create_array!`]. Evaluates to `Result<RecordBatch, ArrowError>`.
#[macro_export]
macro_rules! record_batch {
    ($(($name: expr, $type: ident, $($values: tt)+)),*) => {
        {
            // One nullable field per declared column.
            let schema = std::sync::Arc::new(arrow_schema::Schema::new(vec![
                $(
                    arrow_schema::Field::new($name, arrow_schema::DataType::$type, true),
                )*
            ]));

            // Build the columns and validate them against the schema.
            $crate::RecordBatch::try_new(
                schema,
                vec![$(
                    $crate::create_array!($type, $($values)+),
                )*]
            )
        }
    };
}
199
/// A two-dimensional batch of column-oriented data with a defined schema.
#[derive(Clone, Debug, PartialEq)]
pub struct RecordBatch {
    /// The schema describing the batch's columns.
    schema: SchemaRef,
    /// The columns, one per field in `schema`.
    columns: Vec<Arc<dyn Array>>,

    /// The number of rows in this batch.
    ///
    /// Stored separately from the columns so a batch with zero columns can
    /// still carry a row count.
    row_count: usize,
}
233
impl RecordBatch {
    /// Creates a `RecordBatch` from a schema and columns.
    ///
    /// # Errors
    ///
    /// Returns an error if the column count does not match the schema's field
    /// count, if any column's type does not match its field, if a non-nullable
    /// field's column contains nulls, or if the columns have differing lengths
    /// (see [`Self::try_new_impl`] for the checks performed).
    pub fn try_new(schema: SchemaRef, columns: Vec<ArrayRef>) -> Result<Self, ArrowError> {
        let options = RecordBatchOptions::new();
        Self::try_new_impl(schema, columns, &options)
    }

    /// Creates a `RecordBatch` from a schema and columns without validation.
    ///
    /// # Safety
    ///
    /// The caller must uphold the invariants checked by [`Self::try_new`]:
    /// the columns match the schema's fields in number and type, and every
    /// column has exactly `row_count` rows.
    pub unsafe fn new_unchecked(
        schema: SchemaRef,
        columns: Vec<Arc<dyn Array>>,
        row_count: usize,
    ) -> Self {
        Self {
            schema,
            columns,
            row_count,
        }
    }

    /// Creates a `RecordBatch` from a schema and columns with additional
    /// [`RecordBatchOptions`], e.g. to relax field-name matching or to supply
    /// an explicit row count for a zero-column batch.
    pub fn try_new_with_options(
        schema: SchemaRef,
        columns: Vec<ArrayRef>,
        options: &RecordBatchOptions,
    ) -> Result<Self, ArrowError> {
        Self::try_new_impl(schema, columns, options)
    }

    /// Creates a new empty (zero-row) [`RecordBatch`] with the given schema,
    /// one empty array per field.
    pub fn new_empty(schema: SchemaRef) -> Self {
        let columns = schema
            .fields()
            .iter()
            .map(|field| new_empty_array(field.data_type()))
            .collect();

        RecordBatch {
            schema,
            columns,
            row_count: 0,
        }
    }

    /// Validates the schema/column invariants and constructs the batch.
    fn try_new_impl(
        schema: SchemaRef,
        columns: Vec<ArrayRef>,
        options: &RecordBatchOptions,
    ) -> Result<Self, ArrowError> {
        // The number of columns must match the number of schema fields.
        if schema.fields().len() != columns.len() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "number of columns({}) must match number of fields({}) in schema",
                columns.len(),
                schema.fields().len(),
            )));
        }

        // An explicit row count takes precedence; otherwise infer it from the
        // first column. A batch with no columns must specify a row count.
        let row_count = options
            .row_count
            .or_else(|| columns.first().map(|col| col.len()))
            .ok_or_else(|| {
                ArrowError::InvalidArgumentError(
                    "must either specify a row count or at least one column".to_string(),
                )
            })?;

        // A column for a non-nullable field must not contain any nulls.
        for (c, f) in columns.iter().zip(&schema.fields) {
            if !f.is_nullable() && c.null_count() > 0 {
                return Err(ArrowError::InvalidArgumentError(format!(
                    "Column '{}' is declared as non-nullable but contains null values",
                    f.name()
                )));
            }
        }

        // Every column must have exactly `row_count` rows.
        if columns.iter().any(|c| c.len() != row_count) {
            let err = match options.row_count {
                Some(_) => "all columns in a record batch must have the specified row count",
                None => "all columns in a record batch must have the same length",
            };
            return Err(ArrowError::InvalidArgumentError(err.to_string()));
        }

        // Select the type comparison: exact equality, or equality that
        // ignores nested field names, depending on `match_field_names`.
        let type_not_match = if options.match_field_names {
            |(_, (col_type, field_type)): &(usize, (&DataType, &DataType))| col_type != field_type
        } else {
            |(_, (col_type, field_type)): &(usize, (&DataType, &DataType))| {
                !col_type.equals_datatype(field_type)
            }
        };

        // Find the first column whose type disagrees with its schema field.
        let not_match = columns
            .iter()
            .zip(schema.fields().iter())
            .map(|(col, field)| (col.data_type(), field.data_type()))
            .enumerate()
            .find(type_not_match);

        if let Some((i, (col_type, field_type))) = not_match {
            return Err(ArrowError::InvalidArgumentError(format!(
                "column types must match schema types, expected {field_type} but found {col_type} at column index {i}"
            )));
        }

        Ok(RecordBatch {
            schema,
            columns,
            row_count,
        })
    }

    /// Returns the schema, columns and row count of this [`RecordBatch`],
    /// consuming it.
    pub fn into_parts(self) -> (SchemaRef, Vec<ArrayRef>, usize) {
        (self.schema, self.columns, self.row_count)
    }

    /// Overrides the schema of this [`RecordBatch`].
    ///
    /// # Errors
    ///
    /// Returns an error if `schema` is not a superset of the current schema
    /// as determined by [`Schema::contains`].
    pub fn with_schema(self, schema: SchemaRef) -> Result<Self, ArrowError> {
        if !schema.contains(self.schema.as_ref()) {
            return Err(ArrowError::SchemaError(format!(
                "target schema is not superset of current schema target={schema} current={}",
                self.schema
            )));
        }

        Ok(Self {
            schema,
            columns: self.columns,
            row_count: self.row_count,
        })
    }

    /// Returns the schema of the record batch (cheap `Arc` clone).
    pub fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Returns a reference to the schema of the record batch.
    pub fn schema_ref(&self) -> &SchemaRef {
        &self.schema
    }

    /// Returns a mutable reference to the schema's metadata.
    ///
    /// NOTE(review): `Arc::make_mut` clones the `Schema` if it is shared, so
    /// other holders of the original `SchemaRef` are unaffected by edits.
    pub fn schema_metadata_mut(&mut self) -> &mut std::collections::HashMap<String, String> {
        let schema = Arc::make_mut(&mut self.schema);
        &mut schema.metadata
    }

    /// Projects the schema and columns onto the specified column indices.
    ///
    /// # Errors
    ///
    /// Returns an error if any index is out of bounds.
    pub fn project(&self, indices: &[usize]) -> Result<RecordBatch, ArrowError> {
        let projected_schema = self.schema.project(indices)?;
        let batch_fields = indices
            .iter()
            .map(|f| {
                self.columns.get(*f).cloned().ok_or_else(|| {
                    ArrowError::SchemaError(format!(
                        "project index {} out of bounds, max field {}",
                        f,
                        self.columns.len()
                    ))
                })
            })
            .collect::<Result<Vec<_>, _>>()?;

        // SAFETY: the projected columns were taken from a valid batch and the
        // projected schema was derived from the same indices, so types match
        // and every column retains the original row count.
        unsafe {
            Ok(RecordBatch::new_unchecked(
                SchemaRef::new(projected_schema),
                batch_fields,
                self.row_count,
            ))
        }
    }

    /// Flattens struct columns, joining nested field names with `separator`,
    /// up to `max_level` levels deep (`None` or `Some(0)` means unlimited).
    pub fn normalize(&self, separator: &str, max_level: Option<usize>) -> Result<Self, ArrowError> {
        // Treat 0 (and None) as "no depth limit".
        let max_level = match max_level.unwrap_or(usize::MAX) {
            0 => usize::MAX,
            val => val,
        };
        // Depth-first walk; columns are pushed in reverse so popping the
        // stack preserves left-to-right column order.
        let mut stack: Vec<(usize, ArrayRef, String, FieldRef)> = self
            .columns
            .iter()
            .zip(self.schema.fields())
            .rev()
            .map(|(c, f)| (0, c.clone(), f.name().clone(), Arc::clone(f)))
            .collect();
        let mut columns: Vec<ArrayRef> = Vec::new();
        let mut fields: Vec<FieldRef> = Vec::new();

        while let Some((depth, c, name, field_ref)) = stack.pop() {
            match field_ref.data_type() {
                // Structs within the depth limit are flattened into their
                // children, with names prefixed by the parent's name.
                DataType::Struct(_) if depth < max_level => {
                    let (flat_fields, flat_cols) = c.as_struct().flatten();
                    for (cff, fff) in flat_cols.into_iter().zip(flat_fields.iter()).rev() {
                        let child_name = format!("{name}{separator}{}", fff.name());
                        stack.push((depth + 1, cff, child_name, Arc::clone(fff)))
                    }
                }
                // Leaf (or depth-limited) columns are emitted as-is, under
                // their accumulated dotted name.
                _ => {
                    let updated_field =
                        Field::new(name, field_ref.data_type().clone(), field_ref.is_nullable());
                    columns.push(c);
                    fields.push(Arc::new(updated_field));
                }
            }
        }
        RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
    }

    /// Returns the number of columns in the record batch.
    pub fn num_columns(&self) -> usize {
        self.columns.len()
    }

    /// Returns the number of rows in the record batch.
    pub fn num_rows(&self) -> usize {
        self.row_count
    }

    /// Gets a reference to the column at `index`.
    ///
    /// # Panics
    ///
    /// Panics if `index` is out of bounds.
    pub fn column(&self, index: usize) -> &ArrayRef {
        &self.columns[index]
    }

    /// Gets a reference to the column with the given field name, if present.
    pub fn column_by_name(&self, name: &str) -> Option<&ArrayRef> {
        self.schema()
            .column_with_name(name)
            .map(|(index, _)| &self.columns[index])
    }

    /// Gets a reference to all columns in the record batch.
    pub fn columns(&self) -> &[ArrayRef] {
        &self.columns[..]
    }

    /// Removes the column at `index`, updating the schema, and returns it.
    ///
    /// # Panics
    ///
    /// Panics if `index` is out of bounds.
    pub fn remove_column(&mut self, index: usize) -> ArrayRef {
        let mut builder = SchemaBuilder::from(self.schema.as_ref());
        builder.remove(index);
        self.schema = Arc::new(builder.finish());
        self.columns.remove(index)
    }

    /// Returns a zero-copy slice of this batch, delegating to each column's
    /// `slice`.
    ///
    /// # Panics
    ///
    /// Panics if `offset + length > num_rows`.
    pub fn slice(&self, offset: usize, length: usize) -> RecordBatch {
        assert!((offset + length) <= self.num_rows());

        let columns = self
            .columns()
            .iter()
            .map(|column| column.slice(offset, length))
            .collect();

        Self {
            schema: self.schema.clone(),
            columns,
            row_count: length,
        }
    }

    /// Creates a `RecordBatch` from `(field_name, array)` pairs.
    ///
    /// Each field's nullability is inferred from whether its array currently
    /// contains nulls; use [`Self::try_from_iter_with_nullable`] to set it
    /// explicitly.
    pub fn try_from_iter<I, F>(value: I) -> Result<Self, ArrowError>
    where
        I: IntoIterator<Item = (F, ArrayRef)>,
        F: AsRef<str>,
    {
        let iter = value.into_iter().map(|(field_name, array)| {
            // Infer nullability from the data rather than assuming `true`.
            let nullable = array.null_count() > 0;
            (field_name, array, nullable)
        });

        Self::try_from_iter_with_nullable(iter)
    }

    /// Creates a `RecordBatch` from `(field_name, array, nullable)` triples.
    pub fn try_from_iter_with_nullable<I, F>(value: I) -> Result<Self, ArrowError>
    where
        I: IntoIterator<Item = (F, ArrayRef, bool)>,
        F: AsRef<str>,
    {
        let iter = value.into_iter();
        // Pre-size from the iterator's lower bound to avoid regrowth.
        let capacity = iter.size_hint().0;
        let mut schema = SchemaBuilder::with_capacity(capacity);
        let mut columns = Vec::with_capacity(capacity);

        for (field_name, array, nullable) in iter {
            let field_name = field_name.as_ref();
            schema.push(Field::new(field_name, array.data_type().clone(), nullable));
            columns.push(array);
        }

        let schema = Arc::new(schema.finish());
        RecordBatch::try_new(schema, columns)
    }

    /// Registers this batch's memory with `pool` by claiming each column.
    #[cfg(feature = "pool")]
    pub fn claim(&self, pool: &dyn arrow_buffer::MemoryPool) {
        for column in self.columns() {
            column.claim(pool);
        }
    }

    /// Returns the total number of bytes of memory occupied physically by all
    /// columns of this batch.
    ///
    /// NOTE(review): columns sharing buffers may be double-counted — this is a
    /// sum over per-array sizes, not a deduplicated total.
    pub fn get_array_memory_size(&self) -> usize {
        self.columns()
            .iter()
            .map(|array| array.get_array_memory_size())
            .sum()
    }
}
817
/// Options that control the behaviour of [`RecordBatch::try_new_with_options`].
#[derive(Debug)]
#[non_exhaustive]
pub struct RecordBatchOptions {
    /// If `true`, require exact type equality between columns and fields
    /// (including nested field names); if `false`, nested names may differ.
    pub match_field_names: bool,

    /// Optional explicit row count, required for a batch with no columns.
    pub row_count: Option<usize>,
}
828
829impl RecordBatchOptions {
830 pub fn new() -> Self {
832 Self {
833 match_field_names: true,
834 row_count: None,
835 }
836 }
837 pub fn with_row_count(mut self, row_count: Option<usize>) -> Self {
839 self.row_count = row_count;
840 self
841 }
842 pub fn with_match_field_names(mut self, match_field_names: bool) -> Self {
844 self.match_field_names = match_field_names;
845 self
846 }
847}
848impl Default for RecordBatchOptions {
849 fn default() -> Self {
850 Self::new()
851 }
852}
impl From<StructArray> for RecordBatch {
    /// Converts a [`StructArray`] into a [`RecordBatch`], one column per
    /// child array.
    ///
    /// # Panics
    ///
    /// Panics if the `StructArray` contains top-level nulls, which a
    /// `RecordBatch` cannot represent.
    fn from(value: StructArray) -> Self {
        let row_count = value.len();
        let (fields, columns, nulls) = value.into_parts();
        assert_eq!(
            nulls.map(|n| n.null_count()).unwrap_or_default(),
            0,
            "Cannot convert nullable StructArray to RecordBatch, see StructArray documentation"
        );

        RecordBatch {
            schema: Arc::new(Schema::new(fields)),
            row_count,
            columns,
        }
    }
}
870
871impl From<&StructArray> for RecordBatch {
872 fn from(struct_array: &StructArray) -> Self {
873 struct_array.clone().into()
874 }
875}
876
impl Index<&str> for RecordBatch {
    type Output = ArrayRef;

    /// Gets a reference to a column's array by name.
    ///
    /// # Panics
    ///
    /// Panics if `name` is not a column in the schema; use
    /// [`RecordBatch::column_by_name`] for a non-panicking lookup.
    fn index(&self, name: &str) -> &Self::Output {
        self.column_by_name(name).unwrap()
    }
}
889
/// Wraps an iterator of [`RecordBatch`] results together with a schema,
/// allowing it to be used as a [`RecordBatchReader`].
pub struct RecordBatchIterator<I>
where
    I: IntoIterator<Item = Result<RecordBatch, ArrowError>>,
{
    /// The underlying batch iterator.
    inner: I::IntoIter,
    /// The schema reported by [`RecordBatchReader::schema`].
    inner_schema: SchemaRef,
}
922
923impl<I> RecordBatchIterator<I>
924where
925 I: IntoIterator<Item = Result<RecordBatch, ArrowError>>,
926{
927 pub fn new(iter: I, schema: SchemaRef) -> Self {
931 Self {
932 inner: iter.into_iter(),
933 inner_schema: schema,
934 }
935 }
936}
937
impl<I> Iterator for RecordBatchIterator<I>
where
    I: IntoIterator<Item = Result<RecordBatch, ArrowError>>,
{
    type Item = I::Item;

    /// Delegates to the wrapped iterator.
    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }

    /// Delegates to the wrapped iterator, preserving its size hint.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}
952
953impl<I> RecordBatchReader for RecordBatchIterator<I>
954where
955 I: IntoIterator<Item = Result<RecordBatch, ArrowError>>,
956{
957 fn schema(&self) -> SchemaRef {
958 self.inner_schema.clone()
959 }
960}
961
962#[cfg(test)]
963mod tests {
964 use super::*;
965 use crate::{
966 BooleanArray, Int8Array, Int32Array, Int64Array, ListArray, StringArray, StringViewArray,
967 };
968 use arrow_buffer::{Buffer, NullBuffer, ToByteSlice};
969 use arrow_data::{ArrayData, ArrayDataBuilder};
970 use arrow_schema::Fields;
971 use std::collections::HashMap;
972
973 #[test]
974 fn create_record_batch() {
975 let schema = Schema::new(vec![
976 Field::new("a", DataType::Int32, false),
977 Field::new("b", DataType::Utf8, false),
978 ]);
979
980 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
981 let b = StringArray::from(vec!["a", "b", "c", "d", "e"]);
982
983 let record_batch =
984 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
985 check_batch(record_batch, 5)
986 }
987
988 #[test]
989 fn create_string_view_record_batch() {
990 let schema = Schema::new(vec![
991 Field::new("a", DataType::Int32, false),
992 Field::new("b", DataType::Utf8View, false),
993 ]);
994
995 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
996 let b = StringViewArray::from(vec!["a", "b", "c", "d", "e"]);
997
998 let record_batch =
999 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1000
1001 assert_eq!(5, record_batch.num_rows());
1002 assert_eq!(2, record_batch.num_columns());
1003 assert_eq!(&DataType::Int32, record_batch.schema().field(0).data_type());
1004 assert_eq!(
1005 &DataType::Utf8View,
1006 record_batch.schema().field(1).data_type()
1007 );
1008 assert_eq!(5, record_batch.column(0).len());
1009 assert_eq!(5, record_batch.column(1).len());
1010 }
1011
1012 #[test]
1013 fn create_binary_record_batch_from_variables() {
1014 let binary_values = vec![b"a".as_slice()];
1015 let large_binary_values = vec![b"xxx".as_slice()];
1016
1017 let record_batch = record_batch!(
1018 ("a", Binary, binary_values),
1019 ("b", LargeBinary, large_binary_values)
1020 )
1021 .unwrap();
1022
1023 assert_eq!(1, record_batch.num_rows());
1024 assert_eq!(2, record_batch.num_columns());
1025 assert_eq!(
1026 &DataType::Binary,
1027 record_batch.schema().field(0).data_type()
1028 );
1029 assert_eq!(
1030 &DataType::LargeBinary,
1031 record_batch.schema().field(1).data_type()
1032 );
1033
1034 let binary = record_batch.column(0).as_binary::<i32>();
1035 assert_eq!(b"a", binary.value(0));
1036
1037 let large_binary = record_batch.column(1).as_binary::<i64>();
1038 assert_eq!(b"xxx", large_binary.value(0));
1039 }
1040
1041 #[test]
1042 fn byte_size_should_not_regress() {
1043 let schema = Schema::new(vec![
1044 Field::new("a", DataType::Int32, false),
1045 Field::new("b", DataType::Utf8, false),
1046 ]);
1047
1048 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1049 let b = StringArray::from(vec!["a", "b", "c", "d", "e"]);
1050
1051 let record_batch =
1052 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1053 assert_eq!(record_batch.get_array_memory_size(), 364);
1054 }
1055
1056 fn check_batch(record_batch: RecordBatch, num_rows: usize) {
1057 assert_eq!(num_rows, record_batch.num_rows());
1058 assert_eq!(2, record_batch.num_columns());
1059 assert_eq!(&DataType::Int32, record_batch.schema().field(0).data_type());
1060 assert_eq!(&DataType::Utf8, record_batch.schema().field(1).data_type());
1061 assert_eq!(num_rows, record_batch.column(0).len());
1062 assert_eq!(num_rows, record_batch.column(1).len());
1063 }
1064
1065 #[test]
1066 #[should_panic(expected = "assertion failed: (offset + length) <= self.num_rows()")]
1067 fn create_record_batch_slice() {
1068 let schema = Schema::new(vec![
1069 Field::new("a", DataType::Int32, false),
1070 Field::new("b", DataType::Utf8, false),
1071 ]);
1072 let expected_schema = schema.clone();
1073
1074 let a = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
1075 let b = StringArray::from(vec!["a", "b", "c", "d", "e", "f", "h", "i"]);
1076
1077 let record_batch =
1078 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1079
1080 let offset = 2;
1081 let length = 5;
1082 let record_batch_slice = record_batch.slice(offset, length);
1083
1084 assert_eq!(record_batch_slice.schema().as_ref(), &expected_schema);
1085 check_batch(record_batch_slice, 5);
1086
1087 let offset = 2;
1088 let length = 0;
1089 let record_batch_slice = record_batch.slice(offset, length);
1090
1091 assert_eq!(record_batch_slice.schema().as_ref(), &expected_schema);
1092 check_batch(record_batch_slice, 0);
1093
1094 let offset = 2;
1095 let length = 10;
1096 let _record_batch_slice = record_batch.slice(offset, length);
1097 }
1098
1099 #[test]
1100 #[should_panic(expected = "assertion failed: (offset + length) <= self.num_rows()")]
1101 fn create_record_batch_slice_empty_batch() {
1102 let schema = Schema::empty();
1103
1104 let record_batch = RecordBatch::new_empty(Arc::new(schema));
1105
1106 let offset = 0;
1107 let length = 0;
1108 let record_batch_slice = record_batch.slice(offset, length);
1109 assert_eq!(0, record_batch_slice.schema().fields().len());
1110
1111 let offset = 1;
1112 let length = 2;
1113 let _record_batch_slice = record_batch.slice(offset, length);
1114 }
1115
1116 #[test]
1117 fn create_record_batch_try_from_iter() {
1118 let a: ArrayRef = Arc::new(Int32Array::from(vec![
1119 Some(1),
1120 Some(2),
1121 None,
1122 Some(4),
1123 Some(5),
1124 ]));
1125 let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));
1126
1127 let record_batch =
1128 RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).expect("valid conversion");
1129
1130 let expected_schema = Schema::new(vec![
1131 Field::new("a", DataType::Int32, true),
1132 Field::new("b", DataType::Utf8, false),
1133 ]);
1134 assert_eq!(record_batch.schema().as_ref(), &expected_schema);
1135 check_batch(record_batch, 5);
1136 }
1137
1138 #[test]
1139 fn create_record_batch_try_from_iter_with_nullable() {
1140 let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
1141 let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));
1142
1143 let record_batch =
1145 RecordBatch::try_from_iter_with_nullable(vec![("a", a, false), ("b", b, true)])
1146 .expect("valid conversion");
1147
1148 let expected_schema = Schema::new(vec![
1149 Field::new("a", DataType::Int32, false),
1150 Field::new("b", DataType::Utf8, true),
1151 ]);
1152 assert_eq!(record_batch.schema().as_ref(), &expected_schema);
1153 check_batch(record_batch, 5);
1154 }
1155
1156 #[test]
1157 fn create_record_batch_schema_mismatch() {
1158 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1159
1160 let a = Int64Array::from(vec![1, 2, 3, 4, 5]);
1161
1162 let err = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap_err();
1163 assert_eq!(
1164 err.to_string(),
1165 "Invalid argument error: column types must match schema types, expected Int32 but found Int64 at column index 0"
1166 );
1167 }
1168
1169 #[test]
1170 fn create_record_batch_field_name_mismatch() {
1171 let fields = vec![
1172 Field::new("a1", DataType::Int32, false),
1173 Field::new_list("a2", Field::new_list_field(DataType::Int8, false), false),
1174 ];
1175 let schema = Arc::new(Schema::new(vec![Field::new_struct("a", fields, true)]));
1176
1177 let a1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
1178 let a2_child = Int8Array::from(vec![1, 2, 3, 4]);
1179 let a2 = ArrayDataBuilder::new(DataType::List(Arc::new(Field::new(
1180 "array",
1181 DataType::Int8,
1182 false,
1183 ))))
1184 .add_child_data(a2_child.into_data())
1185 .len(2)
1186 .add_buffer(Buffer::from([0i32, 3, 4].to_byte_slice()))
1187 .build()
1188 .unwrap();
1189 let a2: ArrayRef = Arc::new(ListArray::from(a2));
1190 let a = ArrayDataBuilder::new(DataType::Struct(Fields::from(vec![
1191 Field::new("aa1", DataType::Int32, false),
1192 Field::new("a2", a2.data_type().clone(), false),
1193 ])))
1194 .add_child_data(a1.into_data())
1195 .add_child_data(a2.into_data())
1196 .len(2)
1197 .build()
1198 .unwrap();
1199 let a: ArrayRef = Arc::new(StructArray::from(a));
1200
1201 let batch = RecordBatch::try_new(schema.clone(), vec![a.clone()]);
1203 assert!(batch.is_err());
1204
1205 let options = RecordBatchOptions {
1207 match_field_names: false,
1208 row_count: None,
1209 };
1210 let batch = RecordBatch::try_new_with_options(schema, vec![a], &options);
1211 assert!(batch.is_ok());
1212 }
1213
1214 #[test]
1215 fn create_record_batch_record_mismatch() {
1216 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1217
1218 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1219 let b = Int32Array::from(vec![1, 2, 3, 4, 5]);
1220
1221 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]);
1222 assert!(batch.is_err());
1223 }
1224
1225 #[test]
1226 fn create_record_batch_from_struct_array() {
1227 let boolean = Arc::new(BooleanArray::from(vec![false, false, true, true]));
1228 let int = Arc::new(Int32Array::from(vec![42, 28, 19, 31]));
1229 let struct_array = StructArray::from(vec![
1230 (
1231 Arc::new(Field::new("b", DataType::Boolean, false)),
1232 boolean.clone() as ArrayRef,
1233 ),
1234 (
1235 Arc::new(Field::new("c", DataType::Int32, false)),
1236 int.clone() as ArrayRef,
1237 ),
1238 ]);
1239
1240 let batch = RecordBatch::from(&struct_array);
1241 assert_eq!(2, batch.num_columns());
1242 assert_eq!(4, batch.num_rows());
1243 assert_eq!(
1244 struct_array.data_type(),
1245 &DataType::Struct(batch.schema().fields().clone())
1246 );
1247 assert_eq!(batch.column(0).as_ref(), boolean.as_ref());
1248 assert_eq!(batch.column(1).as_ref(), int.as_ref());
1249 }
1250
1251 #[test]
1252 fn record_batch_equality() {
1253 let id_arr1 = Int32Array::from(vec![1, 2, 3, 4]);
1254 let val_arr1 = Int32Array::from(vec![5, 6, 7, 8]);
1255 let schema1 = Schema::new(vec![
1256 Field::new("id", DataType::Int32, false),
1257 Field::new("val", DataType::Int32, false),
1258 ]);
1259
1260 let id_arr2 = Int32Array::from(vec![1, 2, 3, 4]);
1261 let val_arr2 = Int32Array::from(vec![5, 6, 7, 8]);
1262 let schema2 = Schema::new(vec![
1263 Field::new("id", DataType::Int32, false),
1264 Field::new("val", DataType::Int32, false),
1265 ]);
1266
1267 let batch1 = RecordBatch::try_new(
1268 Arc::new(schema1),
1269 vec![Arc::new(id_arr1), Arc::new(val_arr1)],
1270 )
1271 .unwrap();
1272
1273 let batch2 = RecordBatch::try_new(
1274 Arc::new(schema2),
1275 vec![Arc::new(id_arr2), Arc::new(val_arr2)],
1276 )
1277 .unwrap();
1278
1279 assert_eq!(batch1, batch2);
1280 }
1281
1282 #[test]
1284 fn record_batch_index_access() {
1285 let id_arr = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
1286 let val_arr = Arc::new(Int32Array::from(vec![5, 6, 7, 8]));
1287 let schema1 = Schema::new(vec![
1288 Field::new("id", DataType::Int32, false),
1289 Field::new("val", DataType::Int32, false),
1290 ]);
1291 let record_batch =
1292 RecordBatch::try_new(Arc::new(schema1), vec![id_arr.clone(), val_arr.clone()]).unwrap();
1293
1294 assert_eq!(record_batch["id"].as_ref(), id_arr.as_ref());
1295 assert_eq!(record_batch["val"].as_ref(), val_arr.as_ref());
1296 }
1297
1298 #[test]
1299 fn record_batch_vals_ne() {
1300 let id_arr1 = Int32Array::from(vec![1, 2, 3, 4]);
1301 let val_arr1 = Int32Array::from(vec![5, 6, 7, 8]);
1302 let schema1 = Schema::new(vec![
1303 Field::new("id", DataType::Int32, false),
1304 Field::new("val", DataType::Int32, false),
1305 ]);
1306
1307 let id_arr2 = Int32Array::from(vec![1, 2, 3, 4]);
1308 let val_arr2 = Int32Array::from(vec![1, 2, 3, 4]);
1309 let schema2 = Schema::new(vec![
1310 Field::new("id", DataType::Int32, false),
1311 Field::new("val", DataType::Int32, false),
1312 ]);
1313
1314 let batch1 = RecordBatch::try_new(
1315 Arc::new(schema1),
1316 vec![Arc::new(id_arr1), Arc::new(val_arr1)],
1317 )
1318 .unwrap();
1319
1320 let batch2 = RecordBatch::try_new(
1321 Arc::new(schema2),
1322 vec![Arc::new(id_arr2), Arc::new(val_arr2)],
1323 )
1324 .unwrap();
1325
1326 assert_ne!(batch1, batch2);
1327 }
1328
1329 #[test]
1330 fn record_batch_column_names_ne() {
1331 let id_arr1 = Int32Array::from(vec![1, 2, 3, 4]);
1332 let val_arr1 = Int32Array::from(vec![5, 6, 7, 8]);
1333 let schema1 = Schema::new(vec![
1334 Field::new("id", DataType::Int32, false),
1335 Field::new("val", DataType::Int32, false),
1336 ]);
1337
1338 let id_arr2 = Int32Array::from(vec![1, 2, 3, 4]);
1339 let val_arr2 = Int32Array::from(vec![5, 6, 7, 8]);
1340 let schema2 = Schema::new(vec![
1341 Field::new("id", DataType::Int32, false),
1342 Field::new("num", DataType::Int32, false),
1343 ]);
1344
1345 let batch1 = RecordBatch::try_new(
1346 Arc::new(schema1),
1347 vec![Arc::new(id_arr1), Arc::new(val_arr1)],
1348 )
1349 .unwrap();
1350
1351 let batch2 = RecordBatch::try_new(
1352 Arc::new(schema2),
1353 vec![Arc::new(id_arr2), Arc::new(val_arr2)],
1354 )
1355 .unwrap();
1356
1357 assert_ne!(batch1, batch2);
1358 }
1359
1360 #[test]
1361 fn record_batch_column_number_ne() {
1362 let id_arr1 = Int32Array::from(vec![1, 2, 3, 4]);
1363 let val_arr1 = Int32Array::from(vec![5, 6, 7, 8]);
1364 let schema1 = Schema::new(vec![
1365 Field::new("id", DataType::Int32, false),
1366 Field::new("val", DataType::Int32, false),
1367 ]);
1368
1369 let id_arr2 = Int32Array::from(vec![1, 2, 3, 4]);
1370 let val_arr2 = Int32Array::from(vec![5, 6, 7, 8]);
1371 let num_arr2 = Int32Array::from(vec![5, 6, 7, 8]);
1372 let schema2 = Schema::new(vec![
1373 Field::new("id", DataType::Int32, false),
1374 Field::new("val", DataType::Int32, false),
1375 Field::new("num", DataType::Int32, false),
1376 ]);
1377
1378 let batch1 = RecordBatch::try_new(
1379 Arc::new(schema1),
1380 vec![Arc::new(id_arr1), Arc::new(val_arr1)],
1381 )
1382 .unwrap();
1383
1384 let batch2 = RecordBatch::try_new(
1385 Arc::new(schema2),
1386 vec![Arc::new(id_arr2), Arc::new(val_arr2), Arc::new(num_arr2)],
1387 )
1388 .unwrap();
1389
1390 assert_ne!(batch1, batch2);
1391 }
1392
1393 #[test]
1394 fn record_batch_row_count_ne() {
1395 let id_arr1 = Int32Array::from(vec![1, 2, 3]);
1396 let val_arr1 = Int32Array::from(vec![5, 6, 7]);
1397 let schema1 = Schema::new(vec![
1398 Field::new("id", DataType::Int32, false),
1399 Field::new("val", DataType::Int32, false),
1400 ]);
1401
1402 let id_arr2 = Int32Array::from(vec![1, 2, 3, 4]);
1403 let val_arr2 = Int32Array::from(vec![5, 6, 7, 8]);
1404 let schema2 = Schema::new(vec![
1405 Field::new("id", DataType::Int32, false),
1406 Field::new("num", DataType::Int32, false),
1407 ]);
1408
1409 let batch1 = RecordBatch::try_new(
1410 Arc::new(schema1),
1411 vec![Arc::new(id_arr1), Arc::new(val_arr1)],
1412 )
1413 .unwrap();
1414
1415 let batch2 = RecordBatch::try_new(
1416 Arc::new(schema2),
1417 vec![Arc::new(id_arr2), Arc::new(val_arr2)],
1418 )
1419 .unwrap();
1420
1421 assert_ne!(batch1, batch2);
1422 }
1423
1424 #[test]
1425 fn normalize_simple() {
1426 let animals: ArrayRef = Arc::new(StringArray::from(vec!["Parrot", ""]));
1427 let n_legs: ArrayRef = Arc::new(Int64Array::from(vec![Some(2), Some(4)]));
1428 let year: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(2022)]));
1429
1430 let animals_field = Arc::new(Field::new("animals", DataType::Utf8, true));
1431 let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, true));
1432 let year_field = Arc::new(Field::new("year", DataType::Int64, true));
1433
1434 let a = Arc::new(StructArray::from(vec![
1435 (animals_field.clone(), Arc::new(animals.clone()) as ArrayRef),
1436 (n_legs_field.clone(), Arc::new(n_legs.clone()) as ArrayRef),
1437 (year_field.clone(), Arc::new(year.clone()) as ArrayRef),
1438 ]));
1439
1440 let month = Arc::new(Int64Array::from(vec![Some(4), Some(6)]));
1441
1442 let schema = Schema::new(vec![
1443 Field::new(
1444 "a",
1445 DataType::Struct(Fields::from(vec![animals_field, n_legs_field, year_field])),
1446 false,
1447 ),
1448 Field::new("month", DataType::Int64, true),
1449 ]);
1450
1451 let normalized =
1452 RecordBatch::try_new(Arc::new(schema.clone()), vec![a.clone(), month.clone()])
1453 .expect("valid conversion")
1454 .normalize(".", Some(0))
1455 .expect("valid normalization");
1456
1457 let expected = RecordBatch::try_from_iter_with_nullable(vec![
1458 ("a.animals", animals.clone(), true),
1459 ("a.n_legs", n_legs.clone(), true),
1460 ("a.year", year.clone(), true),
1461 ("month", month.clone(), true),
1462 ])
1463 .expect("valid conversion");
1464
1465 assert_eq!(expected, normalized);
1466
1467 let normalized = RecordBatch::try_new(Arc::new(schema), vec![a, month.clone()])
1469 .expect("valid conversion")
1470 .normalize(".", None)
1471 .expect("valid normalization");
1472
1473 assert_eq!(expected, normalized);
1474 }
1475
#[test]
fn normalize_nested() {
    // Leaf fields shared by both nested structs; "b" is the only non-nullable leaf.
    let a = Arc::new(Field::new("a", DataType::Int64, true));
    let b = Arc::new(Field::new("b", DataType::Int64, false));
    let c = Arc::new(Field::new("c", DataType::Int64, true));

    // Two struct fields "1" and "2" with identical children; only "2" is nullable.
    let one = Arc::new(Field::new(
        "1",
        DataType::Struct(Fields::from(vec![a.clone(), b.clone(), c.clone()])),
        false,
    ));
    let two = Arc::new(Field::new(
        "2",
        DataType::Struct(Fields::from(vec![a.clone(), b.clone(), c.clone()])),
        true,
    ));

    // Outer struct "!" wrapping both, giving two levels of nesting:
    // ! -> {1, 2} -> {a, b, c}.
    let exclamation = Arc::new(Field::new(
        "!",
        DataType::Struct(Fields::from(vec![one.clone(), two.clone()])),
        false,
    ));

    let schema = Schema::new(vec![exclamation.clone()]);

    // The same two-row data is reused for every leaf column.
    let a_field = Int64Array::from(vec![Some(0), Some(1)]);
    let b_field = Int64Array::from(vec![Some(2), Some(3)]);
    let c_field = Int64Array::from(vec![None, Some(4)]);

    let one_field = StructArray::from(vec![
        (a.clone(), Arc::new(a_field.clone()) as ArrayRef),
        (b.clone(), Arc::new(b_field.clone()) as ArrayRef),
        (c.clone(), Arc::new(c_field.clone()) as ArrayRef),
    ]);
    let two_field = StructArray::from(vec![
        (a.clone(), Arc::new(a_field.clone()) as ArrayRef),
        (b.clone(), Arc::new(b_field.clone()) as ArrayRef),
        (c.clone(), Arc::new(c_field.clone()) as ArrayRef),
    ]);

    let exclamation_field = Arc::new(StructArray::from(vec![
        (one.clone(), Arc::new(one_field) as ArrayRef),
        (two.clone(), Arc::new(two_field) as ArrayRef),
    ]));

    // Depth-limited normalization (max_level = Some(1)): only the outer struct
    // is flattened, leaving "!.1" and "!.2" as struct-typed columns.
    let normalized =
        RecordBatch::try_new(Arc::new(schema.clone()), vec![exclamation_field.clone()])
            .expect("valid conversion")
            .normalize(".", Some(1))
            .expect("valid normalization");

    // Expected: the children of "!" promoted one level, nullability taken from
    // the "1"/"2" field definitions above.
    let expected = RecordBatch::try_from_iter_with_nullable(vec![
        (
            "!.1",
            Arc::new(StructArray::from(vec![
                (a.clone(), Arc::new(a_field.clone()) as ArrayRef),
                (b.clone(), Arc::new(b_field.clone()) as ArrayRef),
                (c.clone(), Arc::new(c_field.clone()) as ArrayRef),
            ])) as ArrayRef,
            false,
        ),
        (
            "!.2",
            Arc::new(StructArray::from(vec![
                (a.clone(), Arc::new(a_field.clone()) as ArrayRef),
                (b.clone(), Arc::new(b_field.clone()) as ArrayRef),
                (c.clone(), Arc::new(c_field.clone()) as ArrayRef),
            ])) as ArrayRef,
            true,
        ),
    ])
    .expect("valid conversion");

    assert_eq!(expected, normalized);

    // Unlimited depth (None): fully flattened down to the leaf columns.
    let normalized = RecordBatch::try_new(Arc::new(schema), vec![exclamation_field])
        .expect("valid conversion")
        .normalize(".", None)
        .expect("valid normalization");

    let expected = RecordBatch::try_from_iter_with_nullable(vec![
        ("!.1.a", Arc::new(a_field.clone()) as ArrayRef, true),
        ("!.1.b", Arc::new(b_field.clone()) as ArrayRef, false),
        ("!.1.c", Arc::new(c_field.clone()) as ArrayRef, true),
        ("!.2.a", Arc::new(a_field.clone()) as ArrayRef, true),
        ("!.2.b", Arc::new(b_field.clone()) as ArrayRef, false),
        ("!.2.c", Arc::new(c_field.clone()) as ArrayRef, true),
    ])
    .expect("valid conversion");

    assert_eq!(expected, normalized);
}
1572
#[test]
fn normalize_empty() {
    // Schema with one struct column ("a") and one flat column ("month").
    let struct_children: Vec<FieldRef> = vec![
        Arc::new(Field::new("animals", DataType::Utf8, true)),
        Arc::new(Field::new("n_legs", DataType::Int64, true)),
        Arc::new(Field::new("year", DataType::Int64, true)),
    ];
    let schema = Schema::new(vec![
        Field::new("a", DataType::Struct(Fields::from(struct_children)), false),
        Field::new("month", DataType::Int64, true),
    ]);

    // Normalizing a zero-row batch must agree with normalizing the schema alone.
    let normalized = RecordBatch::new_empty(Arc::new(schema.clone()))
        .normalize(".", Some(0))
        .expect("valid normalization");

    let flat_schema = schema.normalize(".", Some(0)).expect("valid normalization");
    let expected = RecordBatch::new_empty(Arc::new(flat_schema));

    assert_eq!(expected, normalized);
}
1598
#[test]
fn project() {
    // Three columns; projecting [0, 2] keeps the first and last.
    let col_a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
    let col_b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"]));
    let col_c: ArrayRef = Arc::new(StringArray::from(vec!["d", "e", "f"]));

    let batch = RecordBatch::try_from_iter(vec![
        ("a", col_a.clone()),
        ("b", col_b),
        ("c", col_c.clone()),
    ])
    .expect("valid conversion");

    let expected =
        RecordBatch::try_from_iter(vec![("a", col_a), ("c", col_c)]).expect("valid conversion");

    assert_eq!(expected, batch.project(&[0, 2]).unwrap());
}
1614
#[test]
fn project_empty() {
    // Projecting to zero columns must preserve the row count via options.
    let values: ArrayRef = Arc::new(StringArray::from(vec!["d", "e", "f"]));
    let batch = RecordBatch::try_from_iter(vec![("c", values)]).expect("valid conversion");

    let opts = RecordBatchOptions {
        match_field_names: true,
        row_count: Some(3),
    };
    let expected = RecordBatch::try_new_with_options(Arc::new(Schema::empty()), vec![], &opts)
        .expect("valid conversion");

    assert_eq!(expected, batch.project(&[]).unwrap());
}
1634
#[test]
fn test_no_column_record_batch() {
    let schema = Arc::new(Schema::empty());

    // A zero-column batch with no explicit row count is rejected.
    let err = RecordBatch::try_new(schema.clone(), vec![]).unwrap_err();
    assert!(
        err.to_string()
            .contains("must either specify a row count or at least one column")
    );

    // Supplying a row count makes it valid.
    let options = RecordBatchOptions::new().with_row_count(Some(10));
    let ok = RecordBatch::try_new_with_options(schema.clone(), vec![], &options).unwrap();
    assert_eq!(ok.num_rows(), 10);

    // Slicing a column-less batch adjusts only the row count.
    let mid = ok.slice(2, 5);
    assert_eq!(mid.num_rows(), 5);

    let none = ok.slice(5, 0);
    assert_eq!(none.num_rows(), 0);

    assert_ne!(mid, none);
    assert_eq!(none, RecordBatch::new_empty(schema))
}
1659
#[test]
fn test_nulls_in_non_nullable_field() {
    // A null value in a column declared non-nullable must fail construction.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let values: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None]));
    let result = RecordBatch::try_new(schema, vec![values]);
    assert_eq!(
        "Invalid argument error: Column 'a' is declared as non-nullable but contains null values",
        format!("{}", result.err().unwrap())
    );
}
#[test]
fn test_record_batch_options() {
    // Builder-style setters must round-trip both option fields.
    let opts = RecordBatchOptions::new()
        .with_row_count(Some(20))
        .with_match_field_names(false);
    assert_eq!(opts.row_count, Some(20));
    assert!(!opts.match_field_names)
}
1680
#[test]
#[should_panic(expected = "Cannot convert nullable StructArray to RecordBatch")]
fn test_from_struct() {
    // An all-null StructArray carries a validity mask, and a nullable struct
    // cannot be converted into a RecordBatch.
    let fields = Fields::from(vec![Field::new("foo", DataType::Int32, false)]);
    let nulls = StructArray::from(ArrayData::new_null(&DataType::Struct(fields), 2));
    let _ = RecordBatch::from(nulls);
}
1691
#[test]
fn test_with_schema() {
    let required_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let nullable_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));

    let batch = RecordBatch::try_new(
        required_schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as _],
    )
    .unwrap();

    // Relaxing nullability (required -> nullable) is accepted...
    let batch = batch.with_schema(nullable_schema.clone()).unwrap();
    // ...but tightening it back is rejected.
    batch.clone().with_schema(required_schema).unwrap_err();

    // Adding schema metadata is accepted...
    let metadata = vec![("foo".to_string(), "bar".to_string())]
        .into_iter()
        .collect();
    let enriched = nullable_schema.as_ref().clone().with_metadata(metadata);
    let batch = batch.with_schema(Arc::new(enriched)).unwrap();

    // ...but dropping metadata is rejected.
    batch.with_schema(nullable_schema).unwrap_err();
}
1721
#[test]
fn test_boxed_reader() {
    // Box<dyn RecordBatchReader + Send> must itself satisfy RecordBatchReader
    // via the blanket impl, so it can be passed to generic consumers.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let reader = RecordBatchIterator::new(std::iter::empty(), schema);
    let boxed: Box<dyn RecordBatchReader + Send> = Box::new(reader);

    // This only compiles because Box<R: RecordBatchReader> implements the trait.
    fn get_size(reader: impl RecordBatchReader) -> usize {
        reader.size_hint().0
    }

    assert_eq!(get_size(boxed), 0);
}
1739
#[test]
fn test_remove_column_maintains_schema_metadata() {
    let ids = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let flags = BooleanArray::from(vec![true, false, false, true, true]);

    let metadata: HashMap<String, String> = [("foo".to_string(), "bar".to_string())]
        .into_iter()
        .collect();
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("bool", DataType::Boolean, false),
    ])
    .with_metadata(metadata);

    let mut batch =
        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(ids), Arc::new(flags)]).unwrap();

    // Removing a column must not discard schema-level metadata.
    let _removed = batch.remove_column(0);
    assert_eq!(batch.schema().metadata().len(), 1);
    assert_eq!(
        batch.schema().metadata().get("foo").unwrap().as_str(),
        "bar"
    );
}
1766
#[test]
fn test_normalize_nullable_struct() {
    // Parent struct is null at row 1 even though its child field "x" is
    // declared non-nullable.
    let child_values = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
    let validity = NullBuffer::new(arrow_buffer::BooleanBuffer::from(vec![true, false, true]));
    let x_field = Field::new("x", DataType::Int32, false);
    let struct_col = Arc::new(StructArray::new(
        Fields::from(vec![x_field.clone()]),
        vec![child_values],
        Some(validity),
    )) as ArrayRef;

    let schema = Schema::new(vec![Field::new(
        "s",
        DataType::Struct(Fields::from(vec![x_field])),
        true,
    )]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![struct_col]).unwrap();

    let flat = batch.normalize(".", None).unwrap();

    // The flattened column inherits nullability from the parent's validity:
    // row 1 is null even though "x" itself was non-nullable.
    assert_eq!(flat.num_columns(), 1);
    assert_eq!(flat.schema().field(0).name(), "s.x");
    assert!(flat.schema().field(0).is_nullable());
    let flattened = flat.column(0);
    assert!(flattened.is_valid(0));
    assert!(flattened.is_null(1));
    assert!(flattened.is_valid(2));
}
1795}