1mod stream;
28pub use stream::*;
29
30use arrow_select::concat;
31
32use flatbuffers::{VectorIter, VerifierOptions};
33use std::collections::{HashMap, VecDeque};
34use std::fmt;
35use std::io::{BufReader, Read, Seek, SeekFrom};
36use std::sync::Arc;
37
38use arrow_array::*;
39use arrow_buffer::{
40 ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, ScalarBuffer,
41};
42use arrow_data::{ArrayData, ArrayDataBuilder, UnsafeFlag};
43use arrow_schema::*;
44
45use crate::compression::{CompressionCodec, DecompressionContext};
46use crate::r#gen::Message::{self};
47use crate::{Block, CONTINUATION_MARKER, FieldNode, MetadataVersion};
48use DataType::*;
49
50fn read_buffer(
60 buf: &crate::Buffer,
61 a_data: &Buffer,
62 compression_codec: Option<CompressionCodec>,
63 decompression_context: &mut DecompressionContext,
64) -> Result<Buffer, ArrowError> {
65 let start_offset = buf.offset() as usize;
66 let buf_data = a_data.slice_with_length(start_offset, buf.length() as usize);
67 match (buf_data.is_empty(), compression_codec) {
69 (true, _) | (_, None) => Ok(buf_data),
70 (false, Some(decompressor)) => {
71 decompressor.decompress_to_buffer(&buf_data, decompression_context)
72 }
73 }
74}
75impl RecordBatchDecoder<'_> {
76 fn create_array(
89 &mut self,
90 field: &Field,
91 variadic_counts: &mut VecDeque<i64>,
92 ) -> Result<ArrayRef, ArrowError> {
93 let data_type = field.data_type();
94 match data_type {
95 Utf8 | Binary | LargeBinary | LargeUtf8 => {
96 let field_node = self.next_node(field)?;
97 let buffers = [
98 self.next_buffer()?,
99 self.next_buffer()?,
100 self.next_buffer()?,
101 ];
102 self.create_primitive_array(field_node, data_type, &buffers)
103 }
104 BinaryView | Utf8View => {
105 let count = variadic_counts
106 .pop_front()
107 .ok_or(ArrowError::IpcError(format!(
108 "Missing variadic count for {data_type} column"
109 )))?;
110 let count = count + 2; let buffers = (0..count)
112 .map(|_| self.next_buffer())
113 .collect::<Result<Vec<_>, _>>()?;
114 let field_node = self.next_node(field)?;
115 self.create_primitive_array(field_node, data_type, &buffers)
116 }
117 FixedSizeBinary(_) => {
118 let field_node = self.next_node(field)?;
119 let buffers = [self.next_buffer()?, self.next_buffer()?];
120 self.create_primitive_array(field_node, data_type, &buffers)
121 }
122 List(list_field) | LargeList(list_field) | Map(list_field, _) => {
123 let list_node = self.next_node(field)?;
124 let list_buffers = [self.next_buffer()?, self.next_buffer()?];
125 let values = self.create_array(list_field, variadic_counts)?;
126 self.create_list_array(list_node, data_type, &list_buffers, values)
127 }
128 ListView(list_field) | LargeListView(list_field) => {
129 let list_node = self.next_node(field)?;
130 let list_buffers = [
131 self.next_buffer()?, self.next_buffer()?, self.next_buffer()?, ];
135 let values = self.create_array(list_field, variadic_counts)?;
136 self.create_list_view_array(list_node, data_type, &list_buffers, values)
137 }
138 FixedSizeList(list_field, _) => {
139 let list_node = self.next_node(field)?;
140 let list_buffers = [self.next_buffer()?];
141 let values = self.create_array(list_field, variadic_counts)?;
142 self.create_list_array(list_node, data_type, &list_buffers, values)
143 }
144 Struct(struct_fields) => {
145 let struct_node = self.next_node(field)?;
146 let null_buffer = self.next_buffer()?;
147
148 let mut struct_arrays = vec![];
150 for struct_field in struct_fields {
153 let child = self.create_array(struct_field, variadic_counts)?;
154 struct_arrays.push(child);
155 }
156 self.create_struct_array(struct_node, null_buffer, struct_fields, struct_arrays)
157 }
158 RunEndEncoded(run_ends_field, values_field) => {
159 let run_node = self.next_node(field)?;
160 let run_ends = self.create_array(run_ends_field, variadic_counts)?;
161 let values = self.create_array(values_field, variadic_counts)?;
162
163 let run_array_length = run_node.length() as usize;
164 let builder = ArrayData::builder(data_type.clone())
165 .len(run_array_length)
166 .offset(0)
167 .add_child_data(run_ends.into_data())
168 .add_child_data(values.into_data())
169 .null_count(run_node.null_count() as usize);
170
171 self.create_array_from_builder(builder)
172 }
173 Dictionary(_, _) => {
175 let index_node = self.next_node(field)?;
176 let index_buffers = [self.next_buffer()?, self.next_buffer()?];
177
178 #[allow(deprecated)]
179 let dict_id = field.dict_id().ok_or_else(|| {
180 ArrowError::ParseError(format!("Field {field} does not have dict id"))
181 })?;
182
183 let value_array = match self.dictionaries_by_id.get(&dict_id) {
184 Some(array) => array.clone(),
185 None => {
186 if let Dictionary(_, value_type) = data_type {
190 arrow_array::new_empty_array(value_type.as_ref())
191 } else {
192 unreachable!()
193 }
194 }
195 };
196
197 self.create_dictionary_array(index_node, data_type, &index_buffers, value_array)
198 }
199 Union(fields, mode) => {
200 let union_node = self.next_node(field)?;
201 let len = union_node.length() as usize;
202
203 if self.version < MetadataVersion::V5 {
206 self.next_buffer()?;
207 }
208
209 let type_ids: ScalarBuffer<i8> =
210 self.next_buffer()?.slice_with_length(0, len).into();
211
212 let value_offsets = match mode {
213 UnionMode::Dense => {
214 let offsets: ScalarBuffer<i32> =
215 self.next_buffer()?.slice_with_length(0, len * 4).into();
216 Some(offsets)
217 }
218 UnionMode::Sparse => None,
219 };
220
221 let mut children = Vec::with_capacity(fields.len());
222
223 for (_id, field) in fields.iter() {
224 let child = self.create_array(field, variadic_counts)?;
225 children.push(child);
226 }
227
228 let array = if self.skip_validation.get() {
229 unsafe {
231 UnionArray::new_unchecked(fields.clone(), type_ids, value_offsets, children)
232 }
233 } else {
234 UnionArray::try_new(fields.clone(), type_ids, value_offsets, children)?
235 };
236 Ok(Arc::new(array))
237 }
238 Null => {
239 let node = self.next_node(field)?;
240 let length = node.length();
241 let null_count = node.null_count();
242
243 if length != null_count {
244 return Err(ArrowError::SchemaError(format!(
245 "Field {field} of NullArray has unequal null_count {null_count} and len {length}"
246 )));
247 }
248
249 let builder = ArrayData::builder(data_type.clone())
250 .len(length as usize)
251 .offset(0);
252 self.create_array_from_builder(builder)
253 }
254 _ => {
255 let field_node = self.next_node(field)?;
256 let buffers = [self.next_buffer()?, self.next_buffer()?];
257 self.create_primitive_array(field_node, data_type, &buffers)
258 }
259 }
260 }
261
262 fn create_primitive_array(
265 &self,
266 field_node: &FieldNode,
267 data_type: &DataType,
268 buffers: &[Buffer],
269 ) -> Result<ArrayRef, ArrowError> {
270 let length = field_node.length() as usize;
271 let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
272 let mut builder = match data_type {
273 Utf8 | Binary | LargeBinary | LargeUtf8 => {
274 ArrayData::builder(data_type.clone())
276 .len(length)
277 .buffers(buffers[1..3].to_vec())
278 .null_bit_buffer(null_buffer)
279 }
280 BinaryView | Utf8View => ArrayData::builder(data_type.clone())
281 .len(length)
282 .buffers(buffers[1..].to_vec())
283 .null_bit_buffer(null_buffer),
284 _ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => {
285 ArrayData::builder(data_type.clone())
287 .len(length)
288 .add_buffer(buffers[1].clone())
289 .null_bit_buffer(null_buffer)
290 }
291 t => unreachable!("Data type {:?} either unsupported or not primitive", t),
292 };
293
294 builder = builder.null_count(field_node.null_count() as usize);
295
296 self.create_array_from_builder(builder)
297 }
298
299 fn create_array_from_builder(&self, builder: ArrayDataBuilder) -> Result<ArrayRef, ArrowError> {
301 let mut builder = builder.align_buffers(!self.require_alignment);
302 if self.skip_validation.get() {
303 unsafe { builder = builder.skip_validation(true) }
305 };
306 Ok(make_array(builder.build()?))
307 }
308
309 fn create_list_array(
312 &self,
313 field_node: &FieldNode,
314 data_type: &DataType,
315 buffers: &[Buffer],
316 child_array: ArrayRef,
317 ) -> Result<ArrayRef, ArrowError> {
318 let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
319 let length = field_node.length() as usize;
320 let child_data = child_array.into_data();
321 let mut builder = match data_type {
322 List(_) | LargeList(_) | Map(_, _) => ArrayData::builder(data_type.clone())
323 .len(length)
324 .add_buffer(buffers[1].clone())
325 .add_child_data(child_data)
326 .null_bit_buffer(null_buffer),
327
328 FixedSizeList(_, _) => ArrayData::builder(data_type.clone())
329 .len(length)
330 .add_child_data(child_data)
331 .null_bit_buffer(null_buffer),
332
333 _ => unreachable!("Cannot create list or map array from {:?}", data_type),
334 };
335
336 builder = builder.null_count(field_node.null_count() as usize);
337
338 self.create_array_from_builder(builder)
339 }
340
341 fn create_list_view_array(
342 &self,
343 field_node: &FieldNode,
344 data_type: &DataType,
345 buffers: &[Buffer],
346 child_array: ArrayRef,
347 ) -> Result<ArrayRef, ArrowError> {
348 assert!(matches!(data_type, ListView(_) | LargeListView(_)));
349
350 let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
351 let length = field_node.length() as usize;
352 let child_data = child_array.into_data();
353
354 self.create_array_from_builder(
355 ArrayData::builder(data_type.clone())
356 .len(length)
357 .add_buffer(buffers[1].clone()) .add_buffer(buffers[2].clone()) .add_child_data(child_data)
360 .null_bit_buffer(null_buffer)
361 .null_count(field_node.null_count() as usize),
362 )
363 }
364
365 fn create_struct_array(
366 &self,
367 struct_node: &FieldNode,
368 null_buffer: Buffer,
369 struct_fields: &Fields,
370 struct_arrays: Vec<ArrayRef>,
371 ) -> Result<ArrayRef, ArrowError> {
372 let null_count = struct_node.null_count() as usize;
373 let len = struct_node.length() as usize;
374 let skip_validation = self.skip_validation.get();
375
376 let nulls = if null_count > 0 {
377 let validity_buffer = BooleanBuffer::new(null_buffer, 0, len);
378 let null_buffer = if skip_validation {
379 unsafe { NullBuffer::new_unchecked(validity_buffer, null_count) }
381 } else {
382 let null_buffer = NullBuffer::new(validity_buffer);
383
384 if null_buffer.null_count() != null_count {
385 return Err(ArrowError::InvalidArgumentError(format!(
386 "null_count value ({}) doesn't match actual number of nulls in array ({})",
387 null_count,
388 null_buffer.null_count()
389 )));
390 }
391
392 null_buffer
393 };
394
395 Some(null_buffer)
396 } else {
397 None
398 };
399 if struct_arrays.is_empty() {
400 return Ok(Arc::new(StructArray::new_empty_fields(len, nulls)));
403 }
404
405 let struct_array = if skip_validation {
406 unsafe { StructArray::new_unchecked(struct_fields.clone(), struct_arrays, nulls) }
408 } else {
409 StructArray::try_new(struct_fields.clone(), struct_arrays, nulls)?
410 };
411
412 Ok(Arc::new(struct_array))
413 }
414
415 fn create_dictionary_array(
418 &self,
419 field_node: &FieldNode,
420 data_type: &DataType,
421 buffers: &[Buffer],
422 value_array: ArrayRef,
423 ) -> Result<ArrayRef, ArrowError> {
424 if let Dictionary(_, _) = *data_type {
425 let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
426 let builder = ArrayData::builder(data_type.clone())
427 .len(field_node.length() as usize)
428 .add_buffer(buffers[1].clone())
429 .add_child_data(value_array.into_data())
430 .null_bit_buffer(null_buffer)
431 .null_count(field_node.null_count() as usize);
432 self.create_array_from_builder(builder)
433 } else {
434 unreachable!("Cannot create dictionary array from {:?}", data_type)
435 }
436 }
437}
438
/// Decodes one IPC `RecordBatch` message (flatbuffer metadata plus body) into a [`RecordBatch`].
///
/// Field nodes and buffer descriptors are consumed sequentially from the metadata while the
/// schema is walked depth first.
pub struct RecordBatchDecoder<'a> {
    /// The flatbuffer `RecordBatch` metadata being decoded.
    batch: crate::RecordBatch<'a>,
    /// Schema describing the columns of the batch.
    schema: SchemaRef,
    /// Dictionaries read so far, keyed by dictionary id.
    dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
    /// Codec for the body buffers, if the batch is compressed.
    compression: Option<CompressionCodec>,
    /// State passed to the codec on every decompression.
    decompression_context: DecompressionContext,
    /// Metadata version of the message; affects the union layout.
    version: MetadataVersion,
    /// The message body that buffer offsets are relative to.
    data: &'a Buffer,
    /// Remaining field nodes, consumed in schema order.
    nodes: VectorIter<'a, FieldNode>,
    /// Remaining buffer descriptors, consumed in schema order.
    buffers: VectorIter<'a, crate::Buffer>,
    /// Optional column indices to decode; other columns are skipped.
    projection: Option<&'a [usize]>,
    /// When false, buffers are realigned (`align_buffers`) before arrays are built.
    require_alignment: bool,
    /// When set, array validation is skipped (set via an `unsafe` API).
    skip_validation: UnsafeFlag,
}
474
475impl<'a> RecordBatchDecoder<'a> {
476 fn try_new(
478 buf: &'a Buffer,
479 batch: crate::RecordBatch<'a>,
480 schema: SchemaRef,
481 dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
482 metadata: &'a MetadataVersion,
483 ) -> Result<Self, ArrowError> {
484 let buffers = batch.buffers().ok_or_else(|| {
485 ArrowError::IpcError("Unable to get buffers from IPC RecordBatch".to_string())
486 })?;
487 let field_nodes = batch.nodes().ok_or_else(|| {
488 ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string())
489 })?;
490
491 let batch_compression = batch.compression();
492 let compression = batch_compression
493 .map(|batch_compression| batch_compression.codec().try_into())
494 .transpose()?;
495
496 Ok(Self {
497 batch,
498 schema,
499 dictionaries_by_id,
500 compression,
501 decompression_context: DecompressionContext::new(),
502 version: *metadata,
503 data: buf,
504 nodes: field_nodes.iter(),
505 buffers: buffers.iter(),
506 projection: None,
507 require_alignment: false,
508 skip_validation: UnsafeFlag::new(),
509 })
510 }
511
512 pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self {
517 self.projection = projection;
518 self
519 }
520
521 pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
527 self.require_alignment = require_alignment;
528 self
529 }
530
531 pub(crate) fn with_skip_validation(mut self, skip_validation: UnsafeFlag) -> Self {
543 self.skip_validation = skip_validation;
544 self
545 }
546
547 fn read_record_batch(mut self) -> Result<RecordBatch, ArrowError> {
549 let mut variadic_counts: VecDeque<i64> = self
550 .batch
551 .variadicBufferCounts()
552 .into_iter()
553 .flatten()
554 .collect();
555
556 let options = RecordBatchOptions::new().with_row_count(Some(self.batch.length() as usize));
557
558 let schema = Arc::clone(&self.schema);
559 if let Some(projection) = self.projection {
560 let mut arrays = vec![];
561 for (idx, field) in schema.fields().iter().enumerate() {
563 let mut child = None;
565 for (proj_idx, projected_idx) in projection.iter().enumerate() {
566 if *projected_idx == idx {
567 if child.is_none() {
568 child = Some(self.create_array(field, &mut variadic_counts)?);
569 }
570
571 arrays.push((proj_idx, child.as_ref().unwrap().clone()));
573 }
574 }
575
576 if child.is_none() {
577 self.skip_field(field, &mut variadic_counts)?;
578 }
579 }
580
581 arrays.sort_by_key(|t| t.0);
582
583 let schema = Arc::new(schema.project(projection)?);
584 let columns = arrays.into_iter().map(|t| t.1).collect::<Vec<_>>();
585
586 if self.skip_validation.get() {
587 unsafe {
589 Ok(RecordBatch::new_unchecked(
590 schema,
591 columns,
592 self.batch.length() as usize,
593 ))
594 }
595 } else {
596 assert!(variadic_counts.is_empty());
597 RecordBatch::try_new_with_options(schema, columns, &options)
598 }
599 } else {
600 let mut children = vec![];
601 for field in schema.fields() {
603 let child = self.create_array(field, &mut variadic_counts)?;
604 children.push(child);
605 }
606
607 if self.skip_validation.get() {
608 unsafe {
610 Ok(RecordBatch::new_unchecked(
611 schema,
612 children,
613 self.batch.length() as usize,
614 ))
615 }
616 } else {
617 assert!(variadic_counts.is_empty());
618 RecordBatch::try_new_with_options(schema, children, &options)
619 }
620 }
621 }
622
623 fn next_buffer(&mut self) -> Result<Buffer, ArrowError> {
624 let buffer = self.buffers.next().ok_or_else(|| {
625 ArrowError::IpcError("Buffer count mismatched with metadata".to_string())
626 })?;
627 read_buffer(
628 buffer,
629 self.data,
630 self.compression,
631 &mut self.decompression_context,
632 )
633 }
634
635 fn skip_buffer(&mut self) {
636 self.buffers.next().unwrap();
637 }
638
639 fn next_node(&mut self, field: &Field) -> Result<&'a FieldNode, ArrowError> {
640 self.nodes.next().ok_or_else(|| {
641 ArrowError::SchemaError(format!(
642 "Invalid data for schema. {field} refers to node not found in schema",
643 ))
644 })
645 }
646
647 fn skip_field(
648 &mut self,
649 field: &Field,
650 variadic_count: &mut VecDeque<i64>,
651 ) -> Result<(), ArrowError> {
652 self.next_node(field)?;
653
654 match field.data_type() {
655 Utf8 | Binary | LargeBinary | LargeUtf8 => {
656 for _ in 0..3 {
657 self.skip_buffer()
658 }
659 }
660 Utf8View | BinaryView => {
661 let count = variadic_count
662 .pop_front()
663 .ok_or(ArrowError::IpcError(format!(
664 "Missing variadic count for {} column",
665 field.data_type()
666 )))?;
667 let count = count + 2; for _i in 0..count {
669 self.skip_buffer()
670 }
671 }
672 FixedSizeBinary(_) => {
673 self.skip_buffer();
674 self.skip_buffer();
675 }
676 List(list_field) | LargeList(list_field) | Map(list_field, _) => {
677 self.skip_buffer();
678 self.skip_buffer();
679 self.skip_field(list_field, variadic_count)?;
680 }
681 ListView(list_field) | LargeListView(list_field) => {
682 self.skip_buffer(); self.skip_buffer(); self.skip_buffer(); self.skip_field(list_field, variadic_count)?;
686 }
687 FixedSizeList(list_field, _) => {
688 self.skip_buffer();
689 self.skip_field(list_field, variadic_count)?;
690 }
691 Struct(struct_fields) => {
692 self.skip_buffer();
693
694 for struct_field in struct_fields {
696 self.skip_field(struct_field, variadic_count)?
697 }
698 }
699 RunEndEncoded(run_ends_field, values_field) => {
700 self.skip_field(run_ends_field, variadic_count)?;
701 self.skip_field(values_field, variadic_count)?;
702 }
703 Dictionary(_, _) => {
704 self.skip_buffer(); self.skip_buffer(); }
707 Union(fields, mode) => {
708 if self.version < MetadataVersion::V5 {
709 self.skip_buffer(); }
711 self.skip_buffer(); match mode {
714 UnionMode::Dense => self.skip_buffer(), UnionMode::Sparse => {}
716 };
717
718 for (_, field) in fields.iter() {
719 self.skip_field(field, variadic_count)?
720 }
721 }
722 Null => {}
724
725 Boolean
727 | Int8
728 | Int16
729 | Int32
730 | Int64
731 | UInt8
732 | UInt16
733 | UInt32
734 | UInt64
735 | Float16
736 | Float32
737 | Float64
738 | Timestamp(_, _)
739 | Date32
740 | Date64
741 | Time32(_)
742 | Time64(_)
743 | Duration(_)
744 | Interval(_)
745 | Decimal32(_, _)
746 | Decimal64(_, _)
747 | Decimal128(_, _)
748 | Decimal256(_, _) => {
749 self.skip_buffer();
750 self.skip_buffer();
751 }
752 };
753 Ok(())
754 }
755}
756
757pub fn read_record_batch(
768 buf: &Buffer,
769 batch: crate::RecordBatch,
770 schema: SchemaRef,
771 dictionaries_by_id: &HashMap<i64, ArrayRef>,
772 projection: Option<&[usize]>,
773 metadata: &MetadataVersion,
774) -> Result<RecordBatch, ArrowError> {
775 RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)?
776 .with_projection(projection)
777 .with_require_alignment(false)
778 .read_record_batch()
779}
780
781pub fn read_dictionary(
784 buf: &Buffer,
785 batch: crate::DictionaryBatch,
786 schema: &Schema,
787 dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
788 metadata: &MetadataVersion,
789) -> Result<(), ArrowError> {
790 read_dictionary_impl(
791 buf,
792 batch,
793 schema,
794 dictionaries_by_id,
795 metadata,
796 false,
797 UnsafeFlag::new(),
798 )
799}
800
801fn read_dictionary_impl(
802 buf: &Buffer,
803 batch: crate::DictionaryBatch,
804 schema: &Schema,
805 dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
806 metadata: &MetadataVersion,
807 require_alignment: bool,
808 skip_validation: UnsafeFlag,
809) -> Result<(), ArrowError> {
810 let id = batch.id();
811
812 let dictionary_values = get_dictionary_values(
813 buf,
814 batch,
815 schema,
816 dictionaries_by_id,
817 metadata,
818 require_alignment,
819 skip_validation,
820 )?;
821
822 update_dictionaries(dictionaries_by_id, batch.isDelta(), id, dictionary_values)?;
823
824 Ok(())
825}
826
827fn update_dictionaries(
836 dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
837 is_delta: bool,
838 dict_id: i64,
839 dict_values: ArrayRef,
840) -> Result<(), ArrowError> {
841 if !is_delta {
842 dictionaries_by_id.insert(dict_id, dict_values.clone());
846 return Ok(());
847 }
848
849 let existing = dictionaries_by_id.get(&dict_id).ok_or_else(|| {
850 ArrowError::InvalidArgumentError(format!(
851 "No existing dictionary for delta dictionary with id '{dict_id}'"
852 ))
853 })?;
854
855 let combined = concat::concat(&[existing, &dict_values]).map_err(|e| {
856 ArrowError::InvalidArgumentError(format!("Failed to concat delta dictionary: {e}"))
857 })?;
858
859 dictionaries_by_id.insert(dict_id, combined);
860
861 Ok(())
862}
863
864fn get_dictionary_values(
868 buf: &Buffer,
869 batch: crate::DictionaryBatch,
870 schema: &Schema,
871 dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
872 metadata: &MetadataVersion,
873 require_alignment: bool,
874 skip_validation: UnsafeFlag,
875) -> Result<ArrayRef, ArrowError> {
876 let id = batch.id();
877 #[allow(deprecated)]
878 let fields_using_this_dictionary = schema.fields_with_dict_id(id);
879 let first_field = fields_using_this_dictionary.first().ok_or_else(|| {
880 ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
881 })?;
882
883 let dictionary_values: ArrayRef = match first_field.data_type() {
887 DataType::Dictionary(_, value_type) => {
888 let value = value_type.as_ref().clone();
890 let schema = Schema::new(vec![Field::new("", value, true)]);
891 let record_batch = RecordBatchDecoder::try_new(
893 buf,
894 batch.data().unwrap(),
895 Arc::new(schema),
896 dictionaries_by_id,
897 metadata,
898 )?
899 .with_require_alignment(require_alignment)
900 .with_skip_validation(skip_validation)
901 .read_record_batch()?;
902
903 Some(record_batch.column(0).clone())
904 }
905 _ => None,
906 }
907 .ok_or_else(|| {
908 ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
909 })?;
910
911 Ok(dictionary_values)
912}
913
914fn read_block<R: Read + Seek>(mut reader: R, block: &Block) -> Result<Buffer, ArrowError> {
916 reader.seek(SeekFrom::Start(block.offset() as u64))?;
917 let body_len = block.bodyLength().to_usize().unwrap();
918 let metadata_len = block.metaDataLength().to_usize().unwrap();
919 let total_len = body_len.checked_add(metadata_len).unwrap();
920
921 let mut buf = MutableBuffer::from_len_zeroed(total_len);
922 reader.read_exact(&mut buf)?;
923 Ok(buf.into())
924}
925
926fn parse_message(buf: &[u8]) -> Result<Message::Message<'_>, ArrowError> {
930 let buf = match buf[..4] == CONTINUATION_MARKER {
931 true => &buf[8..],
932 false => &buf[4..],
933 };
934 crate::root_as_message(buf)
935 .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))
936}
937
938pub fn read_footer_length(buf: [u8; 10]) -> Result<usize, ArrowError> {
942 if buf[4..] != super::ARROW_MAGIC {
943 return Err(ArrowError::ParseError(
944 "Arrow file does not contain correct footer".to_string(),
945 ));
946 }
947
948 let footer_len = i32::from_le_bytes(buf[..4].try_into().unwrap());
950 footer_len
951 .try_into()
952 .map_err(|_| ArrowError::ParseError(format!("Invalid footer length: {footer_len}")))
953}
954
/// Decodes dictionaries and record batches of an Arrow IPC file from the raw bytes of its
/// blocks.
#[derive(Debug)]
pub struct FileDecoder {
    /// Schema of the file.
    schema: SchemaRef,
    /// Dictionaries read so far, keyed by dictionary id.
    dictionaries: HashMap<i64, ArrayRef>,
    /// Expected metadata version of every message (`V1` disables the version check).
    version: MetadataVersion,
    /// Optional column indices to decode.
    projection: Option<Vec<usize>>,
    /// When false, buffers are realigned (`align_buffers`) before arrays are built.
    require_alignment: bool,
    /// Whether array validation is skipped (enabled only through an `unsafe` setter).
    skip_validation: UnsafeFlag,
}
1028
1029impl FileDecoder {
1030 pub fn new(schema: SchemaRef, version: MetadataVersion) -> Self {
1032 Self {
1033 schema,
1034 version,
1035 dictionaries: Default::default(),
1036 projection: None,
1037 require_alignment: false,
1038 skip_validation: UnsafeFlag::new(),
1039 }
1040 }
1041
1042 pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
1044 self.projection = Some(projection);
1045 self
1046 }
1047
1048 pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
1061 self.require_alignment = require_alignment;
1062 self
1063 }
1064
1065 pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
1076 unsafe { self.skip_validation.set(skip_validation) };
1077 self
1078 }
1079
1080 fn read_message<'a>(&self, buf: &'a [u8]) -> Result<Message::Message<'a>, ArrowError> {
1081 let message = parse_message(buf)?;
1082
1083 if self.version != MetadataVersion::V1 && message.version() != self.version {
1085 return Err(ArrowError::IpcError(
1086 "Could not read IPC message as metadata versions mismatch".to_string(),
1087 ));
1088 }
1089 Ok(message)
1090 }
1091
1092 pub fn read_dictionary(&mut self, block: &Block, buf: &Buffer) -> Result<(), ArrowError> {
1094 let message = self.read_message(buf)?;
1095 match message.header_type() {
1096 crate::MessageHeader::DictionaryBatch => {
1097 let batch = message.header_as_dictionary_batch().unwrap();
1098 read_dictionary_impl(
1099 &buf.slice(block.metaDataLength() as _),
1100 batch,
1101 &self.schema,
1102 &mut self.dictionaries,
1103 &message.version(),
1104 self.require_alignment,
1105 self.skip_validation.clone(),
1106 )
1107 }
1108 t => Err(ArrowError::ParseError(format!(
1109 "Expecting DictionaryBatch in dictionary blocks, found {t:?}."
1110 ))),
1111 }
1112 }
1113
1114 pub fn read_record_batch(
1116 &self,
1117 block: &Block,
1118 buf: &Buffer,
1119 ) -> Result<Option<RecordBatch>, ArrowError> {
1120 let message = self.read_message(buf)?;
1121 match message.header_type() {
1122 crate::MessageHeader::Schema => Err(ArrowError::IpcError(
1123 "Not expecting a schema when messages are read".to_string(),
1124 )),
1125 crate::MessageHeader::RecordBatch => {
1126 let batch = message.header_as_record_batch().ok_or_else(|| {
1127 ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
1128 })?;
1129 RecordBatchDecoder::try_new(
1131 &buf.slice(block.metaDataLength() as _),
1132 batch,
1133 self.schema.clone(),
1134 &self.dictionaries,
1135 &message.version(),
1136 )?
1137 .with_projection(self.projection.as_deref())
1138 .with_require_alignment(self.require_alignment)
1139 .with_skip_validation(self.skip_validation.clone())
1140 .read_record_batch()
1141 .map(Some)
1142 }
1143 crate::MessageHeader::NONE => Ok(None),
1144 t => Err(ArrowError::InvalidArgumentError(format!(
1145 "Reading types other than record batches not yet supported, unable to read {t:?}"
1146 ))),
1147 }
1148 }
1149}
1150
/// Builder for [`FileReader`] with an optional projection and configurable limits for
/// verifying the file footer flatbuffer.
#[derive(Debug)]
pub struct FileReaderBuilder {
    /// Optional column indices to decode.
    projection: Option<Vec<usize>>,
    /// Maximum number of flatbuffer tables accepted when verifying the footer.
    max_footer_fb_tables: usize,
    /// Maximum flatbuffer nesting depth accepted when verifying the footer.
    max_footer_fb_depth: usize,
}
1161
1162impl Default for FileReaderBuilder {
1163 fn default() -> Self {
1164 let verifier_options = VerifierOptions::default();
1165 Self {
1166 max_footer_fb_tables: verifier_options.max_tables,
1167 max_footer_fb_depth: verifier_options.max_depth,
1168 projection: None,
1169 }
1170 }
1171}
1172
1173impl FileReaderBuilder {
1174 pub fn new() -> Self {
1178 Self::default()
1179 }
1180
1181 pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
1183 self.projection = Some(projection);
1184 self
1185 }
1186
1187 pub fn with_max_footer_fb_tables(mut self, max_footer_fb_tables: usize) -> Self {
1200 self.max_footer_fb_tables = max_footer_fb_tables;
1201 self
1202 }
1203
1204 pub fn with_max_footer_fb_depth(mut self, max_footer_fb_depth: usize) -> Self {
1217 self.max_footer_fb_depth = max_footer_fb_depth;
1218 self
1219 }
1220
1221 pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<FileReader<R>, ArrowError> {
1223 let mut buffer = [0; 10];
1225 reader.seek(SeekFrom::End(-10))?;
1226 reader.read_exact(&mut buffer)?;
1227
1228 let footer_len = read_footer_length(buffer)?;
1229
1230 let mut footer_data = vec![0; footer_len];
1232 reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
1233 reader.read_exact(&mut footer_data)?;
1234
1235 let verifier_options = VerifierOptions {
1236 max_tables: self.max_footer_fb_tables,
1237 max_depth: self.max_footer_fb_depth,
1238 ..Default::default()
1239 };
1240 let footer = crate::root_as_footer_with_opts(&verifier_options, &footer_data[..]).map_err(
1241 |err| ArrowError::ParseError(format!("Unable to get root as footer: {err:?}")),
1242 )?;
1243
1244 let blocks = footer.recordBatches().ok_or_else(|| {
1245 ArrowError::ParseError("Unable to get record batches from IPC Footer".to_string())
1246 })?;
1247
1248 let total_blocks = blocks.len();
1249
1250 let ipc_schema = footer.schema().unwrap();
1251 if !ipc_schema.endianness().equals_to_target_endianness() {
1252 return Err(ArrowError::IpcError(
1253 "the endianness of the source system does not match the endianness of the target system.".to_owned()
1254 ));
1255 }
1256
1257 let schema = crate::convert::fb_to_schema(ipc_schema);
1258
1259 let mut custom_metadata = HashMap::new();
1260 if let Some(fb_custom_metadata) = footer.custom_metadata() {
1261 for kv in fb_custom_metadata.into_iter() {
1262 custom_metadata.insert(
1263 kv.key().unwrap().to_string(),
1264 kv.value().unwrap().to_string(),
1265 );
1266 }
1267 }
1268
1269 let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());
1270 if let Some(projection) = self.projection {
1271 decoder = decoder.with_projection(projection)
1272 }
1273
1274 if let Some(dictionaries) = footer.dictionaries() {
1276 for block in dictionaries {
1277 let buf = read_block(&mut reader, block)?;
1278 decoder.read_dictionary(block, &buf)?;
1279 }
1280 }
1281
1282 Ok(FileReader {
1283 reader,
1284 blocks: blocks.iter().copied().collect(),
1285 current_block: 0,
1286 total_blocks,
1287 decoder,
1288 custom_metadata,
1289 })
1290 }
1291}
1292
/// Reads record batches from an Arrow IPC file, one footer block at a time.
pub struct FileReader<R> {
    /// The underlying seekable reader.
    reader: R,

    /// Decodes the dictionary and record batch messages.
    decoder: FileDecoder,

    /// Locations of the record batch blocks, copied from the footer.
    blocks: Vec<Block>,

    /// Index of the next block to read.
    current_block: usize,

    /// Total number of record batch blocks.
    total_blocks: usize,

    /// Custom key/value metadata from the footer.
    custom_metadata: HashMap<String, String>,
}
1358
1359impl<R> fmt::Debug for FileReader<R> {
1360 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1361 f.debug_struct("FileReader<R>")
1362 .field("decoder", &self.decoder)
1363 .field("blocks", &self.blocks)
1364 .field("current_block", &self.current_block)
1365 .field("total_blocks", &self.total_blocks)
1366 .finish_non_exhaustive()
1367 }
1368}
1369
1370impl<R: Read + Seek> FileReader<BufReader<R>> {
1371 pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1375 Self::try_new(BufReader::new(reader), projection)
1376 }
1377}
1378
1379impl<R: Read + Seek> FileReader<R> {
1380 pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1391 let builder = FileReaderBuilder {
1392 projection,
1393 ..Default::default()
1394 };
1395 builder.build(reader)
1396 }
1397
1398 pub fn custom_metadata(&self) -> &HashMap<String, String> {
1400 &self.custom_metadata
1401 }
1402
1403 pub fn num_batches(&self) -> usize {
1405 self.total_blocks
1406 }
1407
1408 pub fn schema(&self) -> SchemaRef {
1410 self.decoder.schema.clone()
1411 }
1412
1413 pub fn set_index(&mut self, index: usize) -> Result<(), ArrowError> {
1417 if index >= self.total_blocks {
1418 Err(ArrowError::InvalidArgumentError(format!(
1419 "Cannot set batch to index {} from {} total batches",
1420 index, self.total_blocks
1421 )))
1422 } else {
1423 self.current_block = index;
1424 Ok(())
1425 }
1426 }
1427
1428 fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
1429 let block = &self.blocks[self.current_block];
1430 self.current_block += 1;
1431
1432 let buffer = read_block(&mut self.reader, block)?;
1434 self.decoder.read_record_batch(block, &buffer)
1435 }
1436
1437 pub fn get_ref(&self) -> &R {
1441 &self.reader
1442 }
1443
1444 pub fn get_mut(&mut self) -> &mut R {
1448 &mut self.reader
1449 }
1450
1451 pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
1457 self.decoder = unsafe { self.decoder.with_skip_validation(skip_validation) };
1458 self
1459 }
1460}
1461
1462impl<R: Read + Seek> Iterator for FileReader<R> {
1463 type Item = Result<RecordBatch, ArrowError>;
1464
1465 fn next(&mut self) -> Option<Self::Item> {
1466 if self.current_block < self.total_blocks {
1468 self.maybe_next().transpose()
1469 } else {
1470 None
1471 }
1472 }
1473}
1474
1475impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
1476 fn schema(&self) -> SchemaRef {
1477 self.schema()
1478 }
1479}
1480
/// Reads record batches from an Arrow IPC *stream* (the file format is
/// handled by [`FileReader`]).
///
/// The first message of the stream must be a schema. Dictionary batches that
/// follow are accumulated internally, and record batches are yielded through
/// the [`Iterator`] implementation.
pub struct StreamReader<R> {
    /// Reads length-prefixed messages (flatbuffer metadata plus body) from
    /// the underlying reader.
    reader: MessageReader<R>,

    /// Schema taken from the stream's leading schema message. Any projection
    /// is stored separately in `projection`.
    schema: SchemaRef,

    /// Dictionaries received so far, keyed by dictionary id. Consulted when
    /// decoding record batches.
    dictionaries_by_id: HashMap<i64, ArrayRef>,

    /// Set once the end of the stream has been reached. Later reads return
    /// `None` without touching the underlying reader.
    finished: bool,

    /// Optional projection: the selected column indices together with the
    /// schema projected to those indices.
    projection: Option<(Vec<usize>, Schema)>,

    /// Whether decoding skips validation. Changed only through the `unsafe`
    /// `with_skip_validation`.
    skip_validation: UnsafeFlag,
}
1539
1540impl<R> fmt::Debug for StreamReader<R> {
1541 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
1542 f.debug_struct("StreamReader<R>")
1543 .field("reader", &"R")
1544 .field("schema", &self.schema)
1545 .field("dictionaries_by_id", &self.dictionaries_by_id)
1546 .field("finished", &self.finished)
1547 .field("projection", &self.projection)
1548 .finish()
1549 }
1550}
1551
1552impl<R: Read> StreamReader<BufReader<R>> {
1553 pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1557 Self::try_new(BufReader::new(reader), projection)
1558 }
1559}
1560
1561impl<R: Read> StreamReader<R> {
1562 pub fn try_new(
1574 reader: R,
1575 projection: Option<Vec<usize>>,
1576 ) -> Result<StreamReader<R>, ArrowError> {
1577 let mut msg_reader = MessageReader::new(reader);
1578 let message = msg_reader.maybe_next()?;
1579 let Some((message, _)) = message else {
1580 return Err(ArrowError::IpcError(
1581 "Expected schema message, found empty stream.".to_string(),
1582 ));
1583 };
1584
1585 if message.header_type() != Message::MessageHeader::Schema {
1586 return Err(ArrowError::IpcError(format!(
1587 "Expected a schema as the first message in the stream, got: {:?}",
1588 message.header_type()
1589 )));
1590 }
1591
1592 let schema = message.header_as_schema().ok_or_else(|| {
1593 ArrowError::ParseError("Failed to parse schema from message header".to_string())
1594 })?;
1595 let schema = crate::convert::fb_to_schema(schema);
1596
1597 let dictionaries_by_id = HashMap::new();
1599
1600 let projection = match projection {
1601 Some(projection_indices) => {
1602 let schema = schema.project(&projection_indices)?;
1603 Some((projection_indices, schema))
1604 }
1605 _ => None,
1606 };
1607
1608 Ok(Self {
1609 reader: msg_reader,
1610 schema: Arc::new(schema),
1611 finished: false,
1612 dictionaries_by_id,
1613 projection,
1614 skip_validation: UnsafeFlag::new(),
1615 })
1616 }
1617
1618 #[deprecated(since = "53.0.0", note = "use `try_new` instead")]
1620 pub fn try_new_unbuffered(
1621 reader: R,
1622 projection: Option<Vec<usize>>,
1623 ) -> Result<Self, ArrowError> {
1624 Self::try_new(reader, projection)
1625 }
1626
1627 pub fn schema(&self) -> SchemaRef {
1629 self.schema.clone()
1630 }
1631
1632 pub fn is_finished(&self) -> bool {
1634 self.finished
1635 }
1636
1637 fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
1638 if self.finished {
1639 return Ok(None);
1640 }
1641
1642 loop {
1644 let message = self.next_ipc_message()?;
1645 let Some(message) = message else {
1646 self.finished = true;
1648 return Ok(None);
1649 };
1650
1651 match message {
1652 IpcMessage::Schema(_) => {
1653 return Err(ArrowError::IpcError(
1654 "Expected a record batch, but found a schema".to_string(),
1655 ));
1656 }
1657 IpcMessage::RecordBatch(record_batch) => {
1658 return Ok(Some(record_batch));
1659 }
1660 IpcMessage::DictionaryBatch { .. } => {
1661 continue;
1662 }
1663 };
1664 }
1665 }
1666
1667 pub(crate) fn next_ipc_message(&mut self) -> Result<Option<IpcMessage>, ArrowError> {
1675 let message = self.reader.maybe_next()?;
1676 let Some((message, body)) = message else {
1677 return Ok(None);
1679 };
1680
1681 let ipc_message = match message.header_type() {
1682 Message::MessageHeader::Schema => {
1683 let schema = message.header_as_schema().ok_or_else(|| {
1684 ArrowError::ParseError("Failed to parse schema from message header".to_string())
1685 })?;
1686 let arrow_schema = crate::convert::fb_to_schema(schema);
1687 IpcMessage::Schema(arrow_schema)
1688 }
1689 Message::MessageHeader::RecordBatch => {
1690 let batch = message.header_as_record_batch().ok_or_else(|| {
1691 ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
1692 })?;
1693
1694 let version = message.version();
1695 let schema = self.schema.clone();
1696 let record_batch = RecordBatchDecoder::try_new(
1697 &body.into(),
1698 batch,
1699 schema,
1700 &self.dictionaries_by_id,
1701 &version,
1702 )?
1703 .with_projection(self.projection.as_ref().map(|x| x.0.as_ref()))
1704 .with_require_alignment(false)
1705 .with_skip_validation(self.skip_validation.clone())
1706 .read_record_batch()?;
1707 IpcMessage::RecordBatch(record_batch)
1708 }
1709 Message::MessageHeader::DictionaryBatch => {
1710 let dict = message.header_as_dictionary_batch().ok_or_else(|| {
1711 ArrowError::ParseError(
1712 "Failed to parse dictionary batch from message header".to_string(),
1713 )
1714 })?;
1715
1716 let version = message.version();
1717 let dict_values = get_dictionary_values(
1718 &body.into(),
1719 dict,
1720 &self.schema,
1721 &mut self.dictionaries_by_id,
1722 &version,
1723 false,
1724 self.skip_validation.clone(),
1725 )?;
1726
1727 update_dictionaries(
1728 &mut self.dictionaries_by_id,
1729 dict.isDelta(),
1730 dict.id(),
1731 dict_values.clone(),
1732 )?;
1733
1734 IpcMessage::DictionaryBatch {
1735 id: dict.id(),
1736 is_delta: (dict.isDelta()),
1737 values: (dict_values),
1738 }
1739 }
1740 x => {
1741 return Err(ArrowError::ParseError(format!(
1742 "Unsupported message header type in IPC stream: '{x:?}'"
1743 )));
1744 }
1745 };
1746
1747 Ok(Some(ipc_message))
1748 }
1749
1750 pub fn get_ref(&self) -> &R {
1754 self.reader.inner()
1755 }
1756
1757 pub fn get_mut(&mut self) -> &mut R {
1761 self.reader.inner_mut()
1762 }
1763
1764 pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
1770 unsafe { self.skip_validation.set(skip_validation) };
1771 self
1772 }
1773}
1774
1775impl<R: Read> Iterator for StreamReader<R> {
1776 type Item = Result<RecordBatch, ArrowError>;
1777
1778 fn next(&mut self) -> Option<Self::Item> {
1779 self.maybe_next().transpose()
1780 }
1781}
1782
1783impl<R: Read> RecordBatchReader for StreamReader<R> {
1784 fn schema(&self) -> SchemaRef {
1785 self.schema.clone()
1786 }
1787}
1788
/// One decoded message from an Arrow IPC stream, as returned by
/// `StreamReader::next_ipc_message`.
#[derive(Debug)]
// NOTE(review): `StreamReader::maybe_next` in this file ignores the `Schema`
// payload and every `DictionaryBatch` field, which is presumably why
// `dead_code` is allowed here; confirm whether other crate code reads them.
#[allow(dead_code)]
pub(crate) enum IpcMessage {
    /// A schema message, converted from its flatbuffer form.
    Schema(arrow_schema::Schema),
    /// A decoded record batch, with the reader's projection (if any) applied.
    RecordBatch(RecordBatch),
    /// A dictionary batch. Its values have already been passed to
    /// `update_dictionaries` by the time this variant is returned.
    DictionaryBatch {
        /// The dictionary id from the message header.
        id: i64,
        /// The message's `isDelta` flag.
        is_delta: bool,
        /// The dictionary values decoded from the message body.
        values: ArrayRef,
    },
}
1805
/// Reads length-prefixed IPC messages (flatbuffer metadata followed by a
/// body) from an underlying [`Read`] implementation.
struct MessageReader<R> {
    /// The wrapped byte source.
    reader: R,
    /// Scratch space for the most recent message's flatbuffer metadata. It is
    /// resized and reused for every message, and the parsed `Message` returned
    /// by `maybe_next` borrows from it.
    buf: Vec<u8>,
}
1812
1813impl<R: Read> MessageReader<R> {
1814 fn new(reader: R) -> Self {
1815 Self {
1816 reader,
1817 buf: Vec::new(),
1818 }
1819 }
1820
1821 fn maybe_next(&mut self) -> Result<Option<(Message::Message<'_>, MutableBuffer)>, ArrowError> {
1832 let meta_len = self.read_meta_len()?;
1833 let Some(meta_len) = meta_len else {
1834 return Ok(None);
1835 };
1836
1837 self.buf.resize(meta_len, 0);
1838 self.reader.read_exact(&mut self.buf)?;
1839
1840 let message = crate::root_as_message(self.buf.as_slice()).map_err(|err| {
1841 ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
1842 })?;
1843
1844 let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
1845 self.reader.read_exact(&mut buf)?;
1846
1847 Ok(Some((message, buf)))
1848 }
1849
1850 fn inner_mut(&mut self) -> &mut R {
1852 &mut self.reader
1853 }
1854
1855 fn inner(&self) -> &R {
1857 &self.reader
1858 }
1859
1860 pub fn read_meta_len(&mut self) -> Result<Option<usize>, ArrowError> {
1869 let mut meta_len: [u8; 4] = [0; 4];
1870 match self.reader.read_exact(&mut meta_len) {
1871 Ok(_) => {}
1872 Err(e) => {
1873 return if e.kind() == std::io::ErrorKind::UnexpectedEof {
1874 Ok(None)
1878 } else {
1879 Err(ArrowError::from(e))
1880 };
1881 }
1882 };
1883
1884 let meta_len = {
1885 if meta_len == CONTINUATION_MARKER {
1888 self.reader.read_exact(&mut meta_len)?;
1889 }
1890
1891 i32::from_le_bytes(meta_len)
1892 };
1893
1894 if meta_len == 0 {
1895 return Ok(None);
1896 }
1897
1898 let meta_len = usize::try_from(meta_len)
1899 .map_err(|_| ArrowError::ParseError(format!("Invalid metadata length: {meta_len}")))?;
1900
1901 Ok(Some(meta_len))
1902 }
1903}
1904
1905#[cfg(test)]
1906mod tests {
1907 use std::io::Cursor;
1908
1909 use crate::convert::fb_to_schema;
1910 use crate::writer::{
1911 DictionaryTracker, IpcDataGenerator, IpcWriteOptions, unslice_run_array, write_message,
1912 };
1913
1914 use super::*;
1915
1916 use crate::{root_as_footer, root_as_message, size_prefixed_root_as_message};
1917 use arrow_array::builder::{PrimitiveRunBuilder, UnionBuilder};
1918 use arrow_array::types::*;
1919 use arrow_buffer::{NullBuffer, OffsetBuffer};
1920 use arrow_data::ArrayDataBuilder;
1921
1922 fn create_test_projection_schema() -> Schema {
1923 let list_data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
1925
1926 let fixed_size_list_data_type =
1927 DataType::FixedSizeList(Arc::new(Field::new_list_field(DataType::Int32, false)), 3);
1928
1929 let union_fields = UnionFields::from_fields(vec![
1930 Field::new("a", DataType::Int32, false),
1931 Field::new("b", DataType::Float64, false),
1932 ]);
1933
1934 let union_data_type = DataType::Union(union_fields, UnionMode::Dense);
1935
1936 let struct_fields = Fields::from(vec![
1937 Field::new("id", DataType::Int32, false),
1938 Field::new_list("list", Field::new_list_field(DataType::Int8, true), false),
1939 ]);
1940 let struct_data_type = DataType::Struct(struct_fields);
1941
1942 let run_encoded_data_type = DataType::RunEndEncoded(
1943 Arc::new(Field::new("run_ends", DataType::Int16, false)),
1944 Arc::new(Field::new("values", DataType::Int32, true)),
1945 );
1946
1947 Schema::new(vec![
1949 Field::new("f0", DataType::UInt32, false),
1950 Field::new("f1", DataType::Utf8, false),
1951 Field::new("f2", DataType::Boolean, false),
1952 Field::new("f3", union_data_type, true),
1953 Field::new("f4", DataType::Null, true),
1954 Field::new("f5", DataType::Float64, true),
1955 Field::new("f6", list_data_type, false),
1956 Field::new("f7", DataType::FixedSizeBinary(3), true),
1957 Field::new("f8", fixed_size_list_data_type, false),
1958 Field::new("f9", struct_data_type, false),
1959 Field::new("f10", run_encoded_data_type, false),
1960 Field::new("f11", DataType::Boolean, false),
1961 Field::new_dictionary("f12", DataType::Int8, DataType::Utf8, false),
1962 Field::new("f13", DataType::Utf8, false),
1963 ])
1964 }
1965
1966 fn create_test_projection_batch_data(schema: &Schema) -> RecordBatch {
1967 let array0 = UInt32Array::from(vec![1, 2, 3]);
1969 let array1 = StringArray::from(vec!["foo", "bar", "baz"]);
1970 let array2 = BooleanArray::from(vec![true, false, true]);
1971
1972 let mut union_builder = UnionBuilder::new_dense();
1973 union_builder.append::<Int32Type>("a", 1).unwrap();
1974 union_builder.append::<Float64Type>("b", 10.1).unwrap();
1975 union_builder.append_null::<Float64Type>("b").unwrap();
1976 let array3 = union_builder.build().unwrap();
1977
1978 let array4 = NullArray::new(3);
1979 let array5 = Float64Array::from(vec![Some(1.1), None, Some(3.3)]);
1980 let array6_values = vec![
1981 Some(vec![Some(10), Some(10), Some(10)]),
1982 Some(vec![Some(20), Some(20), Some(20)]),
1983 Some(vec![Some(30), Some(30)]),
1984 ];
1985 let array6 = ListArray::from_iter_primitive::<Int32Type, _, _>(array6_values);
1986 let array7_values = vec![vec![11, 12, 13], vec![22, 23, 24], vec![33, 34, 35]];
1987 let array7 = FixedSizeBinaryArray::try_from_iter(array7_values.into_iter()).unwrap();
1988
1989 let array8_values = ArrayData::builder(DataType::Int32)
1990 .len(9)
1991 .add_buffer(Buffer::from_slice_ref([40, 41, 42, 43, 44, 45, 46, 47, 48]))
1992 .build()
1993 .unwrap();
1994 let array8_data = ArrayData::builder(schema.field(8).data_type().clone())
1995 .len(3)
1996 .add_child_data(array8_values)
1997 .build()
1998 .unwrap();
1999 let array8 = FixedSizeListArray::from(array8_data);
2000
2001 let array9_id: ArrayRef = Arc::new(Int32Array::from(vec![1001, 1002, 1003]));
2002 let array9_list: ArrayRef =
2003 Arc::new(ListArray::from_iter_primitive::<Int8Type, _, _>(vec![
2004 Some(vec![Some(-10)]),
2005 Some(vec![Some(-20), Some(-20), Some(-20)]),
2006 Some(vec![Some(-30)]),
2007 ]));
2008 let array9 = ArrayDataBuilder::new(schema.field(9).data_type().clone())
2009 .add_child_data(array9_id.into_data())
2010 .add_child_data(array9_list.into_data())
2011 .len(3)
2012 .build()
2013 .unwrap();
2014 let array9 = StructArray::from(array9);
2015
2016 let array10_input = vec![Some(1_i32), None, None];
2017 let mut array10_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
2018 array10_builder.extend(array10_input);
2019 let array10 = array10_builder.finish();
2020
2021 let array11 = BooleanArray::from(vec![false, false, true]);
2022
2023 let array12_values = StringArray::from(vec!["x", "yy", "zzz"]);
2024 let array12_keys = Int8Array::from_iter_values([1, 1, 2]);
2025 let array12 = DictionaryArray::new(array12_keys, Arc::new(array12_values));
2026
2027 let array13 = StringArray::from(vec!["a", "bb", "ccc"]);
2028
2029 RecordBatch::try_new(
2031 Arc::new(schema.clone()),
2032 vec![
2033 Arc::new(array0),
2034 Arc::new(array1),
2035 Arc::new(array2),
2036 Arc::new(array3),
2037 Arc::new(array4),
2038 Arc::new(array5),
2039 Arc::new(array6),
2040 Arc::new(array7),
2041 Arc::new(array8),
2042 Arc::new(array9),
2043 Arc::new(array10),
2044 Arc::new(array11),
2045 Arc::new(array12),
2046 Arc::new(array13),
2047 ],
2048 )
2049 .unwrap()
2050 }
2051
2052 #[test]
2053 fn test_negative_meta_len_start_stream() {
2054 let bytes = i32::to_le_bytes(-1);
2055 let mut buf = vec![];
2056 buf.extend(CONTINUATION_MARKER);
2057 buf.extend(bytes);
2058
2059 let reader_err = StreamReader::try_new(Cursor::new(buf), None).err();
2060 assert!(reader_err.is_some());
2061 assert_eq!(
2062 reader_err.unwrap().to_string(),
2063 "Parser error: Invalid metadata length: -1"
2064 );
2065 }
2066
2067 #[test]
2068 fn test_negative_meta_len_mid_stream() {
2069 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2070 let mut buf = Vec::new();
2071 {
2072 let mut writer = crate::writer::StreamWriter::try_new(&mut buf, &schema).unwrap();
2073 let batch =
2074 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(Int32Array::from(vec![1]))])
2075 .unwrap();
2076 writer.write(&batch).unwrap();
2077 }
2078
2079 let bytes = i32::to_le_bytes(-1);
2080 buf.extend(CONTINUATION_MARKER);
2081 buf.extend(bytes);
2082
2083 let mut reader = StreamReader::try_new(Cursor::new(buf), None).unwrap();
2084 assert!(reader.maybe_next().is_ok());
2086 let batch_err = reader.maybe_next().err();
2088 assert!(batch_err.is_some());
2089 assert_eq!(
2090 batch_err.unwrap().to_string(),
2091 "Parser error: Invalid metadata length: -1"
2092 );
2093 }
2094
2095 #[test]
2096 fn test_missing_buffer_metadata_error() {
2097 use crate::r#gen::Message::*;
2098 use flatbuffers::FlatBufferBuilder;
2099
2100 let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, true)]));
2101
2102 let mut fbb = FlatBufferBuilder::new();
2105 let nodes = fbb.create_vector(&[FieldNode::new(2, 0)]);
2106 let buffers = fbb.create_vector(&[crate::Buffer::new(0, 8)]);
2107 let batch_offset = RecordBatch::create(
2108 &mut fbb,
2109 &RecordBatchArgs {
2110 length: 2,
2111 nodes: Some(nodes),
2112 buffers: Some(buffers),
2113 compression: None,
2114 variadicBufferCounts: None,
2115 },
2116 );
2117 fbb.finish_minimal(batch_offset);
2118 let batch_bytes = fbb.finished_data().to_vec();
2119 let batch = flatbuffers::root::<RecordBatch>(&batch_bytes).unwrap();
2120
2121 let data_buffer = Buffer::from(vec![0u8; 8]);
2122 let dictionaries: HashMap<i64, ArrayRef> = HashMap::new();
2123 let metadata = MetadataVersion::V5;
2124
2125 let decoder = RecordBatchDecoder::try_new(
2126 &data_buffer,
2127 batch,
2128 schema.clone(),
2129 &dictionaries,
2130 &metadata,
2131 )
2132 .unwrap();
2133
2134 let result = decoder.read_record_batch();
2135
2136 match result {
2137 Err(ArrowError::IpcError(msg)) => {
2138 assert_eq!(msg, "Buffer count mismatched with metadata");
2139 }
2140 other => panic!("unexpected error: {other:?}"),
2141 }
2142 }
2143
2144 #[test]
2146 fn test_read_legacy_empty_list_without_offsets_buffer() {
2147 use crate::r#gen::Message::*;
2148 use flatbuffers::FlatBufferBuilder;
2149
2150 let schema = Arc::new(Schema::new(vec![Field::new_list(
2151 "items",
2152 Field::new_list_field(DataType::Int32, true),
2153 true,
2154 )]));
2155
2156 let mut fbb = FlatBufferBuilder::new();
2159 let nodes = fbb.create_vector(&[
2160 FieldNode::new(0, 0), FieldNode::new(0, 0), ]);
2163 let buffers = fbb.create_vector(&[
2164 crate::Buffer::new(0, 0), crate::Buffer::new(0, 0), crate::Buffer::new(0, 0), crate::Buffer::new(0, 0), ]);
2169 let batch_offset = RecordBatch::create(
2170 &mut fbb,
2171 &RecordBatchArgs {
2172 length: 0,
2173 nodes: Some(nodes),
2174 buffers: Some(buffers),
2175 compression: None,
2176 variadicBufferCounts: None,
2177 },
2178 );
2179 fbb.finish_minimal(batch_offset);
2180 let batch_bytes = fbb.finished_data().to_vec();
2181 let batch = flatbuffers::root::<RecordBatch>(&batch_bytes).unwrap();
2182
2183 let body = Buffer::from(Vec::<u8>::new());
2184 let dictionaries: HashMap<i64, ArrayRef> = HashMap::new();
2185 let metadata = MetadataVersion::V5;
2186
2187 let decoder =
2188 RecordBatchDecoder::try_new(&body, batch, schema.clone(), &dictionaries, &metadata)
2189 .unwrap();
2190
2191 let read_batch = decoder.read_record_batch().unwrap();
2192 assert_eq!(read_batch.num_rows(), 0);
2193
2194 let list = read_batch
2195 .column(0)
2196 .as_any()
2197 .downcast_ref::<ListArray>()
2198 .unwrap();
2199 assert_eq!(list.len(), 0);
2200 assert_eq!(list.values().len(), 0);
2201 }
2202
2203 #[test]
2205 fn test_read_legacy_empty_utf8_and_binary_without_offsets_buffer() {
2206 use crate::r#gen::Message::*;
2207 use flatbuffers::FlatBufferBuilder;
2208
2209 let schema = Arc::new(Schema::new(vec![
2210 Field::new("name", DataType::Utf8, true),
2211 Field::new("payload", DataType::Binary, true),
2212 ]));
2213
2214 let mut fbb = FlatBufferBuilder::new();
2217 let nodes = fbb.create_vector(&[
2218 FieldNode::new(0, 0), FieldNode::new(0, 0), ]);
2221 let buffers = fbb.create_vector(&[
2222 crate::Buffer::new(0, 0), crate::Buffer::new(0, 0), crate::Buffer::new(0, 0), crate::Buffer::new(0, 0), crate::Buffer::new(0, 0), crate::Buffer::new(0, 0), ]);
2229 let batch_offset = RecordBatch::create(
2230 &mut fbb,
2231 &RecordBatchArgs {
2232 length: 0,
2233 nodes: Some(nodes),
2234 buffers: Some(buffers),
2235 compression: None,
2236 variadicBufferCounts: None,
2237 },
2238 );
2239 fbb.finish_minimal(batch_offset);
2240 let batch_bytes = fbb.finished_data().to_vec();
2241 let batch = flatbuffers::root::<RecordBatch>(&batch_bytes).unwrap();
2242
2243 let body = Buffer::from(Vec::<u8>::new());
2244 let dictionaries: HashMap<i64, ArrayRef> = HashMap::new();
2245 let metadata = MetadataVersion::V5;
2246
2247 let decoder =
2248 RecordBatchDecoder::try_new(&body, batch, schema.clone(), &dictionaries, &metadata)
2249 .unwrap();
2250
2251 let read_batch = decoder.read_record_batch().unwrap();
2252 assert_eq!(read_batch.num_rows(), 0);
2253
2254 let utf8 = read_batch
2255 .column(0)
2256 .as_any()
2257 .downcast_ref::<StringArray>()
2258 .unwrap();
2259 assert_eq!(utf8.len(), 0);
2260 assert_eq!(utf8.value_offsets(), [0]);
2261
2262 let binary = read_batch
2263 .column(1)
2264 .as_any()
2265 .downcast_ref::<BinaryArray>()
2266 .unwrap();
2267 assert_eq!(binary.len(), 0);
2268 assert_eq!(binary.value_offsets(), [0]);
2269 }
2270
2271 #[test]
2272 fn test_projection_array_values() {
2273 let schema = create_test_projection_schema();
2275
2276 let batch = create_test_projection_batch_data(&schema);
2278
2279 let mut buf = Vec::new();
2281 {
2282 let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
2283 writer.write(&batch).unwrap();
2284 writer.finish().unwrap();
2285 }
2286
2287 for index in 0..12 {
2289 let projection = vec![index];
2290 let reader = FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(projection));
2291 let read_batch = reader.unwrap().next().unwrap().unwrap();
2292 let projected_column = read_batch.column(0);
2293 let expected_column = batch.column(index);
2294
2295 assert_eq!(projected_column.as_ref(), expected_column.as_ref());
2297 }
2298
2299 {
2300 let reader =
2302 FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(vec![3, 2, 1]));
2303 let read_batch = reader.unwrap().next().unwrap().unwrap();
2304 let expected_batch = batch.project(&[3, 2, 1]).unwrap();
2305 assert_eq!(read_batch, expected_batch);
2306 }
2307 }
2308
2309 #[test]
2310 fn test_projection_duplicate_indices() {
2311 let schema = create_test_projection_schema();
2312 let batch = create_test_projection_batch_data(&schema);
2313
2314 let mut buf = Vec::new();
2316 {
2317 let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
2318 writer.write(&batch).unwrap();
2319 writer.finish().unwrap();
2320 }
2321
2322 for projection in [vec![1, 1], vec![2, 0, 2]] {
2324 let reader =
2325 FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(projection.clone()));
2326 let read_batch = reader.unwrap().next().unwrap().unwrap();
2327
2328 let expected_batch = batch.project(&projection).unwrap();
2329 assert_eq!(read_batch, expected_batch);
2330 }
2331 }
2332
2333 #[test]
2334 fn test_arrow_single_float_row() {
2335 let schema = Schema::new(vec![
2336 Field::new("a", DataType::Float32, false),
2337 Field::new("b", DataType::Float32, false),
2338 Field::new("c", DataType::Int32, false),
2339 Field::new("d", DataType::Int32, false),
2340 ]);
2341 let arrays = vec![
2342 Arc::new(Float32Array::from(vec![1.23])) as ArrayRef,
2343 Arc::new(Float32Array::from(vec![-6.50])) as ArrayRef,
2344 Arc::new(Int32Array::from(vec![2])) as ArrayRef,
2345 Arc::new(Int32Array::from(vec![1])) as ArrayRef,
2346 ];
2347 let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays).unwrap();
2348 let mut file = tempfile::tempfile().unwrap();
2350 let mut stream_writer = crate::writer::StreamWriter::try_new(&mut file, &schema).unwrap();
2351 stream_writer.write(&batch).unwrap();
2352 stream_writer.finish().unwrap();
2353
2354 drop(stream_writer);
2355
2356 file.rewind().unwrap();
2357
2358 let reader = StreamReader::try_new(&mut file, None).unwrap();
2360
2361 reader.for_each(|batch| {
2362 let batch = batch.unwrap();
2363 assert!(
2364 batch
2365 .column(0)
2366 .as_any()
2367 .downcast_ref::<Float32Array>()
2368 .unwrap()
2369 .value(0)
2370 != 0.0
2371 );
2372 assert!(
2373 batch
2374 .column(1)
2375 .as_any()
2376 .downcast_ref::<Float32Array>()
2377 .unwrap()
2378 .value(0)
2379 != 0.0
2380 );
2381 });
2382
2383 file.rewind().unwrap();
2384
2385 let reader = StreamReader::try_new(file, Some(vec![0, 3])).unwrap();
2387
2388 reader.for_each(|batch| {
2389 let batch = batch.unwrap();
2390 assert_eq!(batch.schema().fields().len(), 2);
2391 assert_eq!(batch.schema().fields()[0].data_type(), &DataType::Float32);
2392 assert_eq!(batch.schema().fields()[1].data_type(), &DataType::Int32);
2393 });
2394 }
2395
2396 fn write_ipc(rb: &RecordBatch) -> Vec<u8> {
2398 let mut buf = Vec::new();
2399 let mut writer = crate::writer::FileWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
2400 writer.write(rb).unwrap();
2401 writer.finish().unwrap();
2402 buf
2403 }
2404
2405 fn read_ipc(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2407 let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None)?;
2408 reader.next().unwrap()
2409 }
2410
2411 fn read_ipc_skip_validation(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2414 let mut reader = unsafe {
2415 FileReader::try_new(std::io::Cursor::new(buf), None)?.with_skip_validation(true)
2416 };
2417 reader.next().unwrap()
2418 }
2419
2420 fn roundtrip_ipc(rb: &RecordBatch) -> RecordBatch {
2421 let buf = write_ipc(rb);
2422 read_ipc(&buf).unwrap()
2423 }
2424
2425 fn read_ipc_with_decoder(buf: Vec<u8>) -> Result<RecordBatch, ArrowError> {
2428 read_ipc_with_decoder_inner(buf, false)
2429 }
2430
2431 fn read_ipc_with_decoder_skip_validation(buf: Vec<u8>) -> Result<RecordBatch, ArrowError> {
2434 read_ipc_with_decoder_inner(buf, true)
2435 }
2436
2437 fn read_ipc_with_decoder_inner(
2438 buf: Vec<u8>,
2439 skip_validation: bool,
2440 ) -> Result<RecordBatch, ArrowError> {
2441 let buffer = Buffer::from_vec(buf);
2442 let trailer_start = buffer.len() - 10;
2443 let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap())?;
2444 let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start])
2445 .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid footer: {e}")))?;
2446
2447 let schema = fb_to_schema(footer.schema().unwrap());
2448
2449 let mut decoder = unsafe {
2450 FileDecoder::new(Arc::new(schema), footer.version())
2451 .with_skip_validation(skip_validation)
2452 };
2453 for block in footer.dictionaries().iter().flatten() {
2455 let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
2456 let data = buffer.slice_with_length(block.offset() as _, block_len);
2457 decoder.read_dictionary(block, &data)?
2458 }
2459
2460 let batches = footer.recordBatches().unwrap();
2462 assert_eq!(batches.len(), 1); let block = batches.get(0);
2465 let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
2466 let data = buffer.slice_with_length(block.offset() as _, block_len);
2467 Ok(decoder.read_record_batch(block, &data)?.unwrap())
2468 }
2469
2470 fn write_stream(rb: &RecordBatch) -> Vec<u8> {
2472 let mut buf = Vec::new();
2473 let mut writer = crate::writer::StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
2474 writer.write(rb).unwrap();
2475 writer.finish().unwrap();
2476 buf
2477 }
2478
2479 fn read_stream(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2481 let mut reader = StreamReader::try_new(std::io::Cursor::new(buf), None)?;
2482 reader.next().unwrap()
2483 }
2484
2485 fn read_stream_skip_validation(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2488 let mut reader = unsafe {
2489 StreamReader::try_new(std::io::Cursor::new(buf), None)?.with_skip_validation(true)
2490 };
2491 reader.next().unwrap()
2492 }
2493
2494 fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
2495 let buf = write_stream(rb);
2496 read_stream(&buf).unwrap()
2497 }
2498
2499 #[test]
2500 fn test_roundtrip_with_custom_metadata() {
2501 let schema = Schema::new(vec![Field::new("dummy", DataType::Float64, false)]);
2502 let mut buf = Vec::new();
2503 let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
2504 let mut test_metadata = HashMap::new();
2505 test_metadata.insert("abc".to_string(), "abc".to_string());
2506 test_metadata.insert("def".to_string(), "def".to_string());
2507 for (k, v) in &test_metadata {
2508 writer.write_metadata(k, v);
2509 }
2510 writer.finish().unwrap();
2511 drop(writer);
2512
2513 let reader = crate::reader::FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
2514 assert_eq!(reader.custom_metadata(), &test_metadata);
2515 }
2516
2517 #[test]
2518 fn test_roundtrip_nested_dict() {
2519 let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2520
2521 let array = Arc::new(inner) as ArrayRef;
2522
2523 let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));
2524
2525 let s = StructArray::from(vec![(dctfield, array)]);
2526 let struct_array = Arc::new(s) as ArrayRef;
2527
2528 let schema = Arc::new(Schema::new(vec![Field::new(
2529 "struct",
2530 struct_array.data_type().clone(),
2531 false,
2532 )]));
2533
2534 let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2535
2536 assert_eq!(batch, roundtrip_ipc(&batch));
2537 }
2538
2539 #[test]
2540 fn test_roundtrip_nested_dict_no_preserve_dict_id() {
2541 let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2542
2543 let array = Arc::new(inner) as ArrayRef;
2544
2545 let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));
2546
2547 let s = StructArray::from(vec![(dctfield, array)]);
2548 let struct_array = Arc::new(s) as ArrayRef;
2549
2550 let schema = Arc::new(Schema::new(vec![Field::new(
2551 "struct",
2552 struct_array.data_type().clone(),
2553 false,
2554 )]));
2555
2556 let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2557
2558 let mut buf = Vec::new();
2559 let mut writer = crate::writer::FileWriter::try_new_with_options(
2560 &mut buf,
2561 batch.schema_ref(),
2562 IpcWriteOptions::default(),
2563 )
2564 .unwrap();
2565 writer.write(&batch).unwrap();
2566 writer.finish().unwrap();
2567 drop(writer);
2568
2569 let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
2570
2571 assert_eq!(batch, reader.next().unwrap().unwrap());
2572 }
2573
2574 fn check_union_with_builder(mut builder: UnionBuilder) {
2575 builder.append::<Int32Type>("a", 1).unwrap();
2576 builder.append_null::<Int32Type>("a").unwrap();
2577 builder.append::<Float64Type>("c", 3.0).unwrap();
2578 builder.append::<Int32Type>("a", 4).unwrap();
2579 builder.append::<Int64Type>("d", 11).unwrap();
2580 let union = builder.build().unwrap();
2581
2582 let schema = Arc::new(Schema::new(vec![Field::new(
2583 "union",
2584 union.data_type().clone(),
2585 false,
2586 )]));
2587
2588 let union_array = Arc::new(union) as ArrayRef;
2589
2590 let rb = RecordBatch::try_new(schema, vec![union_array]).unwrap();
2591 let rb2 = roundtrip_ipc(&rb);
2592 assert_eq!(rb.schema(), rb2.schema());
2595 assert_eq!(rb.num_columns(), rb2.num_columns());
2596 assert_eq!(rb.num_rows(), rb2.num_rows());
2597 let union1 = rb.column(0);
2598 let union2 = rb2.column(0);
2599
2600 assert_eq!(union1, union2);
2601 }
2602
2603 #[test]
2604 fn test_roundtrip_dense_union() {
2605 check_union_with_builder(UnionBuilder::new_dense());
2606 }
2607
2608 #[test]
2609 fn test_roundtrip_sparse_union() {
2610 check_union_with_builder(UnionBuilder::new_sparse());
2611 }
2612
2613 #[test]
2614 fn test_roundtrip_struct_empty_fields() {
2615 let nulls = NullBuffer::from(&[true, true, false]);
2616 let rb = RecordBatch::try_from_iter([(
2617 "",
2618 Arc::new(StructArray::new_empty_fields(nulls.len(), Some(nulls))) as _,
2619 )])
2620 .unwrap();
2621 let rb2 = roundtrip_ipc(&rb);
2622 assert_eq!(rb, rb2);
2623 }
2624
2625 #[test]
2626 fn test_roundtrip_stream_run_array_sliced() {
2627 let run_array_1: Int32RunArray = vec!["a", "a", "a", "b", "b", "c", "c", "c"]
2628 .into_iter()
2629 .collect();
2630 let run_array_1_sliced = run_array_1.slice(2, 5);
2631
2632 let run_array_2_inupt = vec![Some(1_i32), None, None, Some(2), Some(2)];
2633 let mut run_array_2_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
2634 run_array_2_builder.extend(run_array_2_inupt);
2635 let run_array_2 = run_array_2_builder.finish();
2636
2637 let schema = Arc::new(Schema::new(vec![
2638 Field::new(
2639 "run_array_1_sliced",
2640 run_array_1_sliced.data_type().clone(),
2641 false,
2642 ),
2643 Field::new("run_array_2", run_array_2.data_type().clone(), false),
2644 ]));
2645 let input_batch = RecordBatch::try_new(
2646 schema,
2647 vec![Arc::new(run_array_1_sliced.clone()), Arc::new(run_array_2)],
2648 )
2649 .unwrap();
2650 let output_batch = roundtrip_ipc_stream(&input_batch);
2651
2652 assert_eq!(input_batch.column(1), output_batch.column(1));
2656
2657 let run_array_1_unsliced = unslice_run_array(run_array_1_sliced.into_data()).unwrap();
2658 assert_eq!(run_array_1_unsliced, output_batch.column(0).into_data());
2659 }
2660
2661 #[test]
2662 fn test_roundtrip_stream_nested_dict() {
2663 let xs = vec!["AA", "BB", "AA", "CC", "BB"];
2664 let dict = Arc::new(
2665 xs.clone()
2666 .into_iter()
2667 .collect::<DictionaryArray<Int8Type>>(),
2668 );
2669 let string_array: ArrayRef = Arc::new(StringArray::from(xs.clone()));
2670 let struct_array = StructArray::from(vec![
2671 (
2672 Arc::new(Field::new("f2.1", DataType::Utf8, false)),
2673 string_array,
2674 ),
2675 (
2676 Arc::new(Field::new("f2.2_struct", dict.data_type().clone(), false)),
2677 dict.clone() as ArrayRef,
2678 ),
2679 ]);
2680 let schema = Arc::new(Schema::new(vec![
2681 Field::new("f1_string", DataType::Utf8, false),
2682 Field::new("f2_struct", struct_array.data_type().clone(), false),
2683 ]));
2684 let input_batch = RecordBatch::try_new(
2685 schema,
2686 vec![
2687 Arc::new(StringArray::from(xs.clone())),
2688 Arc::new(struct_array),
2689 ],
2690 )
2691 .unwrap();
2692 let output_batch = roundtrip_ipc_stream(&input_batch);
2693 assert_eq!(input_batch, output_batch);
2694 }
2695
2696 #[test]
2697 fn test_roundtrip_stream_nested_dict_of_map_of_dict() {
2698 let values = StringArray::from(vec![Some("a"), None, Some("b"), Some("c")]);
2699 let values = Arc::new(values) as ArrayRef;
2700 let value_dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 3, 1]);
2701 let value_dict_array = DictionaryArray::new(value_dict_keys, values.clone());
2702
2703 let key_dict_keys = Int8Array::from_iter_values([0, 0, 2, 1, 1, 3]);
2704 let key_dict_array = DictionaryArray::new(key_dict_keys, values);
2705
2706 #[allow(deprecated)]
2707 let keys_field = Arc::new(Field::new_dict(
2708 "keys",
2709 DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2710 true, 1,
2712 false,
2713 ));
2714 #[allow(deprecated)]
2715 let values_field = Arc::new(Field::new_dict(
2716 "values",
2717 DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2718 true,
2719 2,
2720 false,
2721 ));
2722 let entry_struct = StructArray::from(vec![
2723 (keys_field, make_array(key_dict_array.into_data())),
2724 (values_field, make_array(value_dict_array.into_data())),
2725 ]);
2726 let map_data_type = DataType::Map(
2727 Arc::new(Field::new(
2728 "entries",
2729 entry_struct.data_type().clone(),
2730 false,
2731 )),
2732 false,
2733 );
2734
2735 let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 6]);
2736 let map_data = ArrayData::builder(map_data_type)
2737 .len(3)
2738 .add_buffer(entry_offsets)
2739 .add_child_data(entry_struct.into_data())
2740 .build()
2741 .unwrap();
2742 let map_array = MapArray::from(map_data);
2743
2744 let dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 2, 1]);
2745 let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));
2746
2747 let schema = Arc::new(Schema::new(vec![Field::new(
2748 "f1",
2749 dict_dict_array.data_type().clone(),
2750 false,
2751 )]));
2752 let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2753 let output_batch = roundtrip_ipc_stream(&input_batch);
2754 assert_eq!(input_batch, output_batch);
2755 }
2756
2757 fn test_roundtrip_stream_dict_of_list_of_dict_impl<
2758 OffsetSize: OffsetSizeTrait,
2759 U: ArrowNativeType,
2760 >(
2761 list_data_type: DataType,
2762 offsets: &[U; 5],
2763 ) {
2764 let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
2765 let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
2766 let dict_array = DictionaryArray::new(keys, Arc::new(values));
2767 let dict_data = dict_array.to_data();
2768
2769 let value_offsets = Buffer::from_slice_ref(offsets);
2770
2771 let list_data = ArrayData::builder(list_data_type)
2772 .len(4)
2773 .add_buffer(value_offsets)
2774 .add_child_data(dict_data)
2775 .build()
2776 .unwrap();
2777 let list_array = GenericListArray::<OffsetSize>::from(list_data);
2778
2779 let keys_for_dict = Int8Array::from_iter_values([0, 3, 0, 1, 1, 2, 0, 1, 3]);
2780 let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));
2781
2782 let schema = Arc::new(Schema::new(vec![Field::new(
2783 "f1",
2784 dict_dict_array.data_type().clone(),
2785 false,
2786 )]));
2787 let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2788 let output_batch = roundtrip_ipc_stream(&input_batch);
2789 assert_eq!(input_batch, output_batch);
2790 }
2791
2792 #[test]
2793 fn test_roundtrip_stream_dict_of_list_of_dict() {
2794 #[allow(deprecated)]
2796 let list_data_type = DataType::List(Arc::new(Field::new_dict(
2797 "item",
2798 DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2799 true,
2800 1,
2801 false,
2802 )));
2803 let offsets: &[i32; 5] = &[0, 2, 4, 4, 6];
2804 test_roundtrip_stream_dict_of_list_of_dict_impl::<i32, i32>(list_data_type, offsets);
2805
2806 #[allow(deprecated)]
2808 let list_data_type = DataType::LargeList(Arc::new(Field::new_dict(
2809 "item",
2810 DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2811 true,
2812 1,
2813 false,
2814 )));
2815 let offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
2816 test_roundtrip_stream_dict_of_list_of_dict_impl::<i64, i64>(list_data_type, offsets);
2817 }
2818
2819 #[test]
2820 fn test_roundtrip_stream_dict_of_fixed_size_list_of_dict() {
2821 let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
2822 let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3, 1, 2]);
2823 let dict_array = DictionaryArray::new(keys, Arc::new(values));
2824 let dict_data = dict_array.into_data();
2825
2826 #[allow(deprecated)]
2827 let list_data_type = DataType::FixedSizeList(
2828 Arc::new(Field::new_dict(
2829 "item",
2830 DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2831 true,
2832 1,
2833 false,
2834 )),
2835 3,
2836 );
2837 let list_data = ArrayData::builder(list_data_type)
2838 .len(3)
2839 .add_child_data(dict_data)
2840 .build()
2841 .unwrap();
2842 let list_array = FixedSizeListArray::from(list_data);
2843
2844 let keys_for_dict = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
2845 let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));
2846
2847 let schema = Arc::new(Schema::new(vec![Field::new(
2848 "f1",
2849 dict_dict_array.data_type().clone(),
2850 false,
2851 )]));
2852 let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2853 let output_batch = roundtrip_ipc_stream(&input_batch);
2854 assert_eq!(input_batch, output_batch);
2855 }
2856
2857 const LONG_TEST_STRING: &str =
2858 "This is a long string to make sure binary view array handles it";
2859
2860 #[test]
2861 fn test_roundtrip_view_types() {
2862 let schema = Schema::new(vec![
2863 Field::new("field_1", DataType::BinaryView, true),
2864 Field::new("field_2", DataType::Utf8, true),
2865 Field::new("field_3", DataType::Utf8View, true),
2866 ]);
2867 let bin_values: Vec<Option<&[u8]>> = vec![
2868 Some(b"foo"),
2869 None,
2870 Some(b"bar"),
2871 Some(LONG_TEST_STRING.as_bytes()),
2872 ];
2873 let utf8_values: Vec<Option<&str>> =
2874 vec![Some("foo"), None, Some("bar"), Some(LONG_TEST_STRING)];
2875 let bin_view_array = BinaryViewArray::from_iter(bin_values);
2876 let utf8_array = StringArray::from_iter(utf8_values.iter());
2877 let utf8_view_array = StringViewArray::from_iter(utf8_values);
2878 let record_batch = RecordBatch::try_new(
2879 Arc::new(schema.clone()),
2880 vec![
2881 Arc::new(bin_view_array),
2882 Arc::new(utf8_array),
2883 Arc::new(utf8_view_array),
2884 ],
2885 )
2886 .unwrap();
2887
2888 assert_eq!(record_batch, roundtrip_ipc(&record_batch));
2889 assert_eq!(record_batch, roundtrip_ipc_stream(&record_batch));
2890
2891 let sliced_batch = record_batch.slice(1, 2);
2892 assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
2893 assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
2894 }
2895
2896 #[test]
2897 fn test_roundtrip_view_types_nested_dict() {
2898 let bin_values: Vec<Option<&[u8]>> = vec![
2899 Some(b"foo"),
2900 None,
2901 Some(b"bar"),
2902 Some(LONG_TEST_STRING.as_bytes()),
2903 Some(b"field"),
2904 ];
2905 let utf8_values: Vec<Option<&str>> = vec![
2906 Some("foo"),
2907 None,
2908 Some("bar"),
2909 Some(LONG_TEST_STRING),
2910 Some("field"),
2911 ];
2912 let bin_view_array = Arc::new(BinaryViewArray::from_iter(bin_values));
2913 let utf8_view_array = Arc::new(StringViewArray::from_iter(utf8_values));
2914
2915 let key_dict_keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
2916 let key_dict_array = DictionaryArray::new(key_dict_keys, utf8_view_array.clone());
2917 #[allow(deprecated)]
2918 let keys_field = Arc::new(Field::new_dict(
2919 "keys",
2920 DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8View)),
2921 true,
2922 1,
2923 false,
2924 ));
2925
2926 let value_dict_keys = Int8Array::from_iter_values([0, 3, 0, 1, 2, 0, 1]);
2927 let value_dict_array = DictionaryArray::new(value_dict_keys, bin_view_array);
2928 #[allow(deprecated)]
2929 let values_field = Arc::new(Field::new_dict(
2930 "values",
2931 DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::BinaryView)),
2932 true,
2933 2,
2934 false,
2935 ));
2936 let entry_struct = StructArray::from(vec![
2937 (keys_field, make_array(key_dict_array.into_data())),
2938 (values_field, make_array(value_dict_array.into_data())),
2939 ]);
2940
2941 let map_data_type = DataType::Map(
2942 Arc::new(Field::new(
2943 "entries",
2944 entry_struct.data_type().clone(),
2945 false,
2946 )),
2947 false,
2948 );
2949 let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 7]);
2950 let map_data = ArrayData::builder(map_data_type)
2951 .len(3)
2952 .add_buffer(entry_offsets)
2953 .add_child_data(entry_struct.into_data())
2954 .build()
2955 .unwrap();
2956 let map_array = MapArray::from(map_data);
2957
2958 let dict_keys = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
2959 let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));
2960 let schema = Arc::new(Schema::new(vec![Field::new(
2961 "f1",
2962 dict_dict_array.data_type().clone(),
2963 false,
2964 )]));
2965 let batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2966 assert_eq!(batch, roundtrip_ipc(&batch));
2967 assert_eq!(batch, roundtrip_ipc_stream(&batch));
2968
2969 let sliced_batch = batch.slice(1, 2);
2970 assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
2971 assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
2972 }
2973
2974 #[test]
2975 fn test_no_columns_batch() {
2976 let schema = Arc::new(Schema::empty());
2977 let options = RecordBatchOptions::new()
2978 .with_match_field_names(true)
2979 .with_row_count(Some(10));
2980 let input_batch = RecordBatch::try_new_with_options(schema, vec![], &options).unwrap();
2981 let output_batch = roundtrip_ipc_stream(&input_batch);
2982 assert_eq!(input_batch, output_batch);
2983 }
2984
2985 #[test]
2986 fn test_unaligned() {
2987 let batch = RecordBatch::try_from_iter(vec![(
2988 "i32",
2989 Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
2990 )])
2991 .unwrap();
2992
2993 let r#gen = IpcDataGenerator {};
2994 let mut dict_tracker = DictionaryTracker::new(false);
2995 let (_, encoded) = r#gen
2996 .encode(
2997 &batch,
2998 &mut dict_tracker,
2999 &Default::default(),
3000 &mut Default::default(),
3001 )
3002 .unwrap();
3003
3004 let message = root_as_message(&encoded.ipc_message).unwrap();
3005
3006 let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
3008 buffer.push(0_u8);
3009 buffer.extend_from_slice(&encoded.arrow_data);
3010 let b = Buffer::from(buffer).slice(1);
3011 assert_ne!(b.as_ptr().align_offset(8), 0);
3012
3013 let ipc_batch = message.header_as_record_batch().unwrap();
3014 let roundtrip = RecordBatchDecoder::try_new(
3015 &b,
3016 ipc_batch,
3017 batch.schema(),
3018 &Default::default(),
3019 &message.version(),
3020 )
3021 .unwrap()
3022 .with_require_alignment(false)
3023 .read_record_batch()
3024 .unwrap();
3025 assert_eq!(batch, roundtrip);
3026 }
3027
3028 #[test]
3029 fn test_unaligned_throws_error_with_require_alignment() {
3030 let batch = RecordBatch::try_from_iter(vec![(
3031 "i32",
3032 Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
3033 )])
3034 .unwrap();
3035
3036 let r#gen = IpcDataGenerator {};
3037 let mut dict_tracker = DictionaryTracker::new(false);
3038 let (_, encoded) = r#gen
3039 .encode(
3040 &batch,
3041 &mut dict_tracker,
3042 &Default::default(),
3043 &mut Default::default(),
3044 )
3045 .unwrap();
3046
3047 let message = root_as_message(&encoded.ipc_message).unwrap();
3048
3049 let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
3051 buffer.push(0_u8);
3052 buffer.extend_from_slice(&encoded.arrow_data);
3053 let b = Buffer::from(buffer).slice(1);
3054 assert_ne!(b.as_ptr().align_offset(8), 0);
3055
3056 let ipc_batch = message.header_as_record_batch().unwrap();
3057 let result = RecordBatchDecoder::try_new(
3058 &b,
3059 ipc_batch,
3060 batch.schema(),
3061 &Default::default(),
3062 &message.version(),
3063 )
3064 .unwrap()
3065 .with_require_alignment(true)
3066 .read_record_batch();
3067
3068 let error = result.unwrap_err();
3069 assert_eq!(
3070 error.to_string(),
3071 "Invalid argument error: Misaligned buffers[0] in array of type Int32, \
3072 offset from expected alignment of 4 by 1"
3073 );
3074 }
3075
3076 #[test]
3077 fn test_file_with_massive_column_count() {
3078 let limit = 600_000;
3080
3081 let fields = (0..limit)
3082 .map(|i| Field::new(format!("{i}"), DataType::Boolean, false))
3083 .collect::<Vec<_>>();
3084 let schema = Arc::new(Schema::new(fields));
3085 let batch = RecordBatch::new_empty(schema);
3086
3087 let mut buf = Vec::new();
3088 let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
3089 writer.write(&batch).unwrap();
3090 writer.finish().unwrap();
3091 drop(writer);
3092
3093 let mut reader = FileReaderBuilder::new()
3094 .with_max_footer_fb_tables(1_500_000)
3095 .build(std::io::Cursor::new(buf))
3096 .unwrap();
3097 let roundtrip_batch = reader.next().unwrap().unwrap();
3098
3099 assert_eq!(batch, roundtrip_batch);
3100 }
3101
3102 #[test]
3103 fn test_file_with_deeply_nested_columns() {
3104 let limit = 61;
3106
3107 let fields = (0..limit).fold(
3108 vec![Field::new("leaf", DataType::Boolean, false)],
3109 |field, index| vec![Field::new_struct(format!("{index}"), field, false)],
3110 );
3111 let schema = Arc::new(Schema::new(fields));
3112 let batch = RecordBatch::new_empty(schema);
3113
3114 let mut buf = Vec::new();
3115 let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
3116 writer.write(&batch).unwrap();
3117 writer.finish().unwrap();
3118 drop(writer);
3119
3120 let mut reader = FileReaderBuilder::new()
3121 .with_max_footer_fb_depth(65)
3122 .build(std::io::Cursor::new(buf))
3123 .unwrap();
3124 let roundtrip_batch = reader.next().unwrap().unwrap();
3125
3126 assert_eq!(batch, roundtrip_batch);
3127 }
3128
3129 #[test]
3130 fn test_invalid_struct_array_ipc_read_errors() {
3131 let a_field = Field::new("a", DataType::Int32, false);
3132 let b_field = Field::new("b", DataType::Int32, false);
3133 let struct_fields = Fields::from(vec![a_field.clone(), b_field.clone()]);
3134
3135 let a_array_data = ArrayData::builder(a_field.data_type().clone())
3136 .len(4)
3137 .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
3138 .build()
3139 .unwrap();
3140 let b_array_data = ArrayData::builder(b_field.data_type().clone())
3141 .len(3)
3142 .add_buffer(Buffer::from_slice_ref([5, 6, 7]))
3143 .build()
3144 .unwrap();
3145
3146 let invalid_struct_arr = unsafe {
3147 StructArray::new_unchecked(
3148 struct_fields,
3149 vec![make_array(a_array_data), make_array(b_array_data)],
3150 None,
3151 )
3152 };
3153
3154 expect_ipc_validation_error(
3155 Arc::new(invalid_struct_arr),
3156 "Invalid argument error: Incorrect array length for StructArray field \"b\", expected 4 got 3",
3157 );
3158 }
3159
3160 #[test]
3161 fn test_invalid_nested_array_ipc_read_errors() {
3162 let a_field = Field::new("a", DataType::Int32, false);
3164 let b_field = Field::new("b", DataType::Utf8, false);
3165
3166 let schema = Arc::new(Schema::new(vec![Field::new_struct(
3167 "s",
3168 vec![a_field.clone(), b_field.clone()],
3169 false,
3170 )]));
3171
3172 let a_array_data = ArrayData::builder(a_field.data_type().clone())
3173 .len(4)
3174 .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
3175 .build()
3176 .unwrap();
3177 let b_array_data = {
3179 let valid: &[u8] = b" ";
3180 let mut invalid = vec![];
3181 invalid.extend_from_slice(b"ValidString");
3182 invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3183 let binary_array =
3184 BinaryArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
3185 let array = unsafe {
3186 StringArray::new_unchecked(
3187 binary_array.offsets().clone(),
3188 binary_array.values().clone(),
3189 binary_array.nulls().cloned(),
3190 )
3191 };
3192 array.into_data()
3193 };
3194 let struct_data_type = schema.field(0).data_type();
3195
3196 let invalid_struct_arr = unsafe {
3197 make_array(
3198 ArrayData::builder(struct_data_type.clone())
3199 .len(4)
3200 .add_child_data(a_array_data)
3201 .add_child_data(b_array_data)
3202 .build_unchecked(),
3203 )
3204 };
3205 expect_ipc_validation_error(
3206 invalid_struct_arr,
3207 "Invalid argument error: Invalid UTF8 sequence at string index 3 (3..18): invalid utf-8 sequence of 1 bytes from index 11",
3208 );
3209 }
3210
3211 #[test]
3212 fn test_same_dict_id_without_preserve() {
3213 let batch = RecordBatch::try_new(
3214 Arc::new(Schema::new(
3215 ["a", "b"]
3216 .iter()
3217 .map(|name| {
3218 #[allow(deprecated)]
3219 Field::new_dict(
3220 name.to_string(),
3221 DataType::Dictionary(
3222 Box::new(DataType::Int32),
3223 Box::new(DataType::Utf8),
3224 ),
3225 true,
3226 0,
3227 false,
3228 )
3229 })
3230 .collect::<Vec<Field>>(),
3231 )),
3232 vec![
3233 Arc::new(
3234 vec![Some("c"), Some("d")]
3235 .into_iter()
3236 .collect::<DictionaryArray<Int32Type>>(),
3237 ) as ArrayRef,
3238 Arc::new(
3239 vec![Some("e"), Some("f")]
3240 .into_iter()
3241 .collect::<DictionaryArray<Int32Type>>(),
3242 ) as ArrayRef,
3243 ],
3244 )
3245 .expect("Failed to create RecordBatch");
3246
3247 let mut buf = vec![];
3249 {
3250 let mut writer = crate::writer::StreamWriter::try_new_with_options(
3251 &mut buf,
3252 batch.schema().as_ref(),
3253 crate::writer::IpcWriteOptions::default(),
3254 )
3255 .expect("Failed to create StreamWriter");
3256 writer.write(&batch).expect("Failed to write RecordBatch");
3257 writer.finish().expect("Failed to finish StreamWriter");
3258 }
3259
3260 StreamReader::try_new(std::io::Cursor::new(buf), None)
3261 .expect("Failed to create StreamReader")
3262 .for_each(|decoded_batch| {
3263 assert_eq!(decoded_batch.expect("Failed to read RecordBatch"), batch);
3264 });
3265 }
3266
3267 #[test]
3268 fn test_validation_of_invalid_list_array() {
3269 let array = unsafe {
3271 let values = Int32Array::from(vec![1, 2, 3]);
3272 let bad_offsets = ScalarBuffer::<i32>::from(vec![0, 2, 4, 2]); let offsets = OffsetBuffer::new_unchecked(bad_offsets); let field = Field::new_list_field(DataType::Int32, true);
3275 let nulls = None;
3276 ListArray::new(Arc::new(field), offsets, Arc::new(values), nulls)
3277 };
3278
3279 expect_ipc_validation_error(
3280 Arc::new(array),
3281 "Invalid argument error: Offset invariant failure: offset at position 2 out of bounds: 4 > 2",
3282 );
3283 }
3284
3285 #[test]
3286 fn test_validation_of_invalid_string_array() {
3287 let valid: &[u8] = b" ";
3288 let mut invalid = vec![];
3289 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3290 invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3291 let binary_array = BinaryArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
3292 let array = unsafe {
3295 StringArray::new_unchecked(
3296 binary_array.offsets().clone(),
3297 binary_array.values().clone(),
3298 binary_array.nulls().cloned(),
3299 )
3300 };
3301 expect_ipc_validation_error(
3302 Arc::new(array),
3303 "Invalid argument error: Invalid UTF8 sequence at string index 3 (3..45): invalid utf-8 sequence of 1 bytes from index 38",
3304 );
3305 }
3306
3307 #[test]
3308 fn test_validation_of_invalid_string_view_array() {
3309 let valid: &[u8] = b" ";
3310 let mut invalid = vec![];
3311 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3312 invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3313 let binary_view_array =
3314 BinaryViewArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
3315 let array = unsafe {
3318 StringViewArray::new_unchecked(
3319 binary_view_array.views().clone(),
3320 binary_view_array.data_buffers().to_vec(),
3321 binary_view_array.nulls().cloned(),
3322 )
3323 };
3324 expect_ipc_validation_error(
3325 Arc::new(array),
3326 "Invalid argument error: Encountered non-UTF-8 data at index 3: invalid utf-8 sequence of 1 bytes from index 38",
3327 );
3328 }
3329
3330 #[test]
3333 fn test_validation_of_invalid_dictionary_array() {
3334 let array = unsafe {
3335 let values = StringArray::from_iter_values(["a", "b", "c"]);
3336 let keys = Int32Array::from(vec![1, 200]); DictionaryArray::new_unchecked(keys, Arc::new(values))
3338 };
3339
3340 expect_ipc_validation_error(
3341 Arc::new(array),
3342 "Invalid argument error: Value at position 1 out of bounds: 200 (should be in [0, 2])",
3343 );
3344 }
3345
3346 #[test]
3347 fn test_validation_of_invalid_union_array() {
3348 let array = unsafe {
3349 let fields = UnionFields::try_new(
3350 vec![1, 3], vec![
3352 Field::new("a", DataType::Int32, false),
3353 Field::new("b", DataType::Utf8, false),
3354 ],
3355 )
3356 .unwrap();
3357 let type_ids = ScalarBuffer::from(vec![1i8, 2, 3]); let offsets = None;
3359 let children: Vec<ArrayRef> = vec![
3360 Arc::new(Int32Array::from(vec![10, 20, 30])),
3361 Arc::new(StringArray::from(vec![Some("a"), Some("b"), Some("c")])),
3362 ];
3363
3364 UnionArray::new_unchecked(fields, type_ids, offsets, children)
3365 };
3366
3367 expect_ipc_validation_error(
3368 Arc::new(array),
3369 "Invalid argument error: Type Ids values must match one of the field type ids",
3370 );
3371 }
3372
3373 const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3376
3377 fn expect_ipc_validation_error(array: ArrayRef, expected_err: &str) {
3379 let rb = RecordBatch::try_from_iter([("a", array)]).unwrap();
3380
3381 let buf = write_stream(&rb); read_stream_skip_validation(&buf).unwrap();
3384 let err = read_stream(&buf).unwrap_err();
3385 assert_eq!(err.to_string(), expected_err);
3386
3387 let buf = write_ipc(&rb); read_ipc_skip_validation(&buf).unwrap();
3390 let err = read_ipc(&buf).unwrap_err();
3391 assert_eq!(err.to_string(), expected_err);
3392
3393 read_ipc_with_decoder_skip_validation(buf.clone()).unwrap();
3395 let err = read_ipc_with_decoder(buf).unwrap_err();
3396 assert_eq!(err.to_string(), expected_err);
3397 }
3398
3399 #[test]
3400 fn test_roundtrip_schema() {
3401 let schema = Schema::new(vec![
3402 Field::new(
3403 "a",
3404 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
3405 false,
3406 ),
3407 Field::new(
3408 "b",
3409 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
3410 false,
3411 ),
3412 ]);
3413
3414 let options = IpcWriteOptions::default();
3415 let data_gen = IpcDataGenerator::default();
3416 let mut dict_tracker = DictionaryTracker::new(false);
3417 let encoded_data =
3418 data_gen.schema_to_bytes_with_dictionary_tracker(&schema, &mut dict_tracker, &options);
3419 let mut schema_bytes = vec![];
3420 write_message(&mut schema_bytes, encoded_data, &options).expect("write_message");
3421
3422 let begin_offset: usize = if schema_bytes[0..4].eq(&CONTINUATION_MARKER) {
3423 4
3424 } else {
3425 0
3426 };
3427
3428 size_prefixed_root_as_message(&schema_bytes[begin_offset..])
3429 .expect_err("size_prefixed_root_as_message");
3430
3431 let msg = parse_message(&schema_bytes).expect("parse_message");
3432 let ipc_schema = msg.header_as_schema().expect("header_as_schema");
3433 let new_schema = fb_to_schema(ipc_schema);
3434
3435 assert_eq!(schema, new_schema);
3436 }
3437
3438 #[test]
3439 fn test_negative_meta_len() {
3440 let bytes = i32::to_le_bytes(-1);
3441 let mut buf = vec![];
3442 buf.extend(CONTINUATION_MARKER);
3443 buf.extend(bytes);
3444
3445 let reader = StreamReader::try_new(Cursor::new(buf), None);
3446 assert!(reader.is_err());
3447 }
3448
3449 #[test]
3455 fn test_read_null_dict_without_dictionary_batch() {
3456 let keys = Int32Array::new_null(4);
3458 let values: ArrayRef = new_empty_array(&DataType::Utf8);
3459 let dict_array = DictionaryArray::new(keys, values);
3460
3461 let schema = Arc::new(Schema::new(vec![Field::new(
3462 "d",
3463 dict_array.data_type().clone(),
3464 true,
3465 )]));
3466 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(dict_array)]).unwrap();
3467
3468 let full_stream = write_stream(&batch);
3470
3471 let mut stripped = Vec::new();
3475 let mut cursor = Cursor::new(&full_stream);
3476 loop {
3477 let mut header = [0u8; 4];
3480 if cursor.read_exact(&mut header).is_err() {
3481 break;
3482 }
3483 if header == CONTINUATION_MARKER && cursor.read_exact(&mut header).is_err() {
3484 break;
3485 }
3486 let meta_len = u32::from_le_bytes(header) as usize;
3487 if meta_len == 0 {
3488 stripped.extend_from_slice(&CONTINUATION_MARKER);
3490 stripped.extend_from_slice(&0u32.to_le_bytes());
3491 break;
3492 }
3493 let mut meta_buf = vec![0u8; meta_len];
3494 cursor.read_exact(&mut meta_buf).unwrap();
3495
3496 let message = root_as_message(&meta_buf).unwrap();
3497 let body_len = message.bodyLength() as usize;
3498 let mut body_buf = vec![0u8; body_len];
3499 cursor.read_exact(&mut body_buf).unwrap();
3500
3501 if message.header_type() == crate::MessageHeader::DictionaryBatch {
3502 continue;
3505 }
3506 stripped.extend_from_slice(&CONTINUATION_MARKER);
3507 stripped.extend_from_slice(&(meta_len as u32).to_le_bytes());
3508 stripped.extend_from_slice(&meta_buf);
3509 stripped.extend_from_slice(&body_buf);
3510 }
3511
3512 let result = read_stream(&stripped).unwrap();
3514 assert_eq!(result.num_rows(), 4);
3515 assert_eq!(result.num_columns(), 1);
3516
3517 let col = result.column(0);
3518 assert_eq!(col.null_count(), 4);
3519 assert_eq!(col.len(), 4);
3520 assert!(matches!(col.data_type(), DataType::Dictionary(_, _)));
3522 }
3523
3524 #[test]
3527 fn test_projection_skip_list_view() {
3528 use crate::reader::FileReader;
3529 use crate::writer::FileWriter;
3530 use arrow_array::{
3531 GenericListViewArray, Int32Array, RecordBatch,
3532 builder::{GenericListViewBuilder, UInt32Builder},
3533 };
3534 use arrow_schema::{DataType, Field, Schema};
3535 use std::sync::Arc;
3536
3537 let mut builder = GenericListViewBuilder::<i32, _>::new(UInt32Builder::new());
3539
3540 builder.values().append_value(1);
3541 builder.values().append_value(2);
3542 builder.append(true);
3543
3544 builder.append(false);
3545
3546 builder.values().append_value(3);
3547 builder.values().append_value(4);
3548 builder.append(true);
3549
3550 let list_view: GenericListViewArray<i32> = builder.finish();
3551
3552 let values = Int32Array::from(vec![10, 20, 30]);
3554
3555 let schema = Arc::new(Schema::new(vec![
3557 Field::new("a", list_view.data_type().clone(), true),
3558 Field::new("b", DataType::Int32, false),
3559 ]));
3560 let batch =
3562 RecordBatch::try_new(schema, vec![Arc::new(list_view), Arc::new(values.clone())])
3563 .unwrap();
3564
3565 let mut buf = Vec::new();
3567 {
3568 let mut writer = FileWriter::try_new(&mut buf, &batch.schema()).unwrap();
3569 writer.write(&batch).unwrap();
3570 writer.finish().unwrap();
3571 }
3572
3573 let mut reader = FileReader::try_new(std::io::Cursor::new(buf), Some(vec![1])).unwrap();
3575 let read_batch = reader.next().unwrap().unwrap();
3576
3577 assert_eq!(read_batch.num_columns(), 1);
3579 assert_eq!(read_batch.column(0).as_ref(), &values);
3580 }
3581
3582 #[test]
3585 fn test_projection_skip_union_v4() {
3586 use crate::MetadataVersion;
3587 use crate::reader::FileReader;
3588 use crate::writer::{FileWriter, IpcWriteOptions};
3589 use arrow_array::{
3590 ArrayRef, Int32Array, RecordBatch, builder::UnionBuilder, types::Int32Type,
3591 };
3592 use arrow_schema::{DataType, Field, Schema};
3593 use std::sync::Arc;
3594
3595 let mut builder = UnionBuilder::new_dense();
3597 builder.append::<Int32Type>("a", 1).unwrap();
3598 builder.append::<Int32Type>("a", 2).unwrap();
3599 builder.append::<Int32Type>("a", 3).unwrap();
3600 let union = builder.build().unwrap();
3601
3602 let values = Int32Array::from(vec![10, 20, 30]);
3604
3605 let schema = Arc::new(Schema::new(vec![
3607 Field::new("union", union.data_type().clone(), false),
3608 Field::new("values", DataType::Int32, false),
3609 ]));
3610
3611 let batch = RecordBatch::try_new(
3613 schema,
3614 vec![Arc::new(union) as ArrayRef, Arc::new(values.clone())],
3615 )
3616 .unwrap();
3617
3618 let mut buf = Vec::new();
3620 {
3621 let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap();
3622 let mut writer =
3623 FileWriter::try_new_with_options(&mut buf, &batch.schema(), options).unwrap();
3624 writer.write(&batch).unwrap();
3625 writer.finish().unwrap();
3626 }
3627 let mut reader = FileReader::try_new(std::io::Cursor::new(buf), Some(vec![1])).unwrap();
3629 let read_batch = reader.next().unwrap().unwrap();
3630
3631 assert_eq!(read_batch.num_columns(), 1);
3633 assert_eq!(read_batch.column(0).as_ref(), &values);
3634 }
3635
3636 #[test]
3640 fn test_projection_skip_fixed_width_types() {
3641 use std::sync::Arc;
3642
3643 use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, make_array};
3644 use arrow_buffer::Buffer;
3645 use arrow_data::ArrayData;
3646 use arrow_schema::{DataType, Field, IntervalUnit, Schema, TimeUnit};
3647
3648 use crate::reader::FileReader;
3649 use crate::writer::FileWriter;
3650
3651 fn make_array_for_type(data_type: DataType) -> ArrayRef {
3653 let len = 3;
3654
3655 if matches!(data_type, DataType::Boolean) {
3656 return Arc::new(BooleanArray::from(vec![true, false, true]));
3657 }
3658
3659 let width = data_type.primitive_width().unwrap();
3660 let data = ArrayData::builder(data_type)
3661 .len(len)
3662 .add_buffer(Buffer::from(vec![0_u8; len * width]))
3663 .build()
3664 .unwrap();
3665
3666 make_array(data)
3667 }
3668
3669 let data_types = vec![
3671 DataType::Boolean,
3672 DataType::Int8,
3673 DataType::Int16,
3674 DataType::Int32,
3675 DataType::Int64,
3676 DataType::UInt8,
3677 DataType::UInt16,
3678 DataType::UInt32,
3679 DataType::UInt64,
3680 DataType::Float16,
3681 DataType::Float32,
3682 DataType::Float64,
3683 DataType::Timestamp(TimeUnit::Second, None),
3684 DataType::Date32,
3685 DataType::Date64,
3686 DataType::Time32(TimeUnit::Second),
3687 DataType::Time64(TimeUnit::Microsecond),
3688 DataType::Duration(TimeUnit::Second),
3689 DataType::Interval(IntervalUnit::YearMonth),
3690 DataType::Interval(IntervalUnit::DayTime),
3691 DataType::Interval(IntervalUnit::MonthDayNano),
3692 DataType::Decimal32(9, 2),
3693 DataType::Decimal64(18, 2),
3694 DataType::Decimal128(38, 2),
3695 DataType::Decimal256(76, 2),
3696 ];
3697
3698 for data_type in data_types {
3703 let skipped = make_array_for_type(data_type.clone());
3704 let values = Int32Array::from(vec![10, 20, 30]);
3705
3706 let schema = Arc::new(Schema::new(vec![
3707 Field::new("skipped", data_type, false),
3708 Field::new("values", DataType::Int32, false),
3709 ]));
3710
3711 let batch =
3712 RecordBatch::try_new(schema, vec![skipped, Arc::new(values.clone())]).unwrap();
3713
3714 let mut buf = Vec::new();
3716 {
3717 let mut writer = FileWriter::try_new(&mut buf, &batch.schema()).unwrap();
3718 writer.write(&batch).unwrap();
3719 writer.finish().unwrap();
3720 }
3721
3722 let mut reader = FileReader::try_new(std::io::Cursor::new(buf), Some(vec![1])).unwrap();
3724 let read_batch = reader.next().unwrap().unwrap();
3725
3726 assert_eq!(read_batch.num_columns(), 1);
3728 assert_eq!(read_batch.column(0).as_ref(), &values);
3729 }
3730 }
3731}