1mod stream;
28pub use stream::*;
29
30use arrow_select::concat;
31
32use flatbuffers::{VectorIter, VerifierOptions};
33use std::collections::{HashMap, VecDeque};
34use std::fmt;
35use std::io::{BufReader, Read, Seek, SeekFrom};
36use std::sync::Arc;
37
38use arrow_array::*;
39use arrow_buffer::{
40 ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, ScalarBuffer,
41};
42use arrow_data::{ArrayData, ArrayDataBuilder, UnsafeFlag};
43use arrow_schema::*;
44
45use crate::compression::{CompressionCodec, DecompressionContext};
46use crate::r#gen::Message::{self};
47use crate::{Block, CONTINUATION_MARKER, FieldNode, MetadataVersion};
48use DataType::*;
49
50fn read_buffer(
60 buf: &crate::Buffer,
61 a_data: &Buffer,
62 compression_codec: Option<CompressionCodec>,
63 decompression_context: &mut DecompressionContext,
64) -> Result<Buffer, ArrowError> {
65 let start_offset = buf.offset() as usize;
66 let buf_data = a_data.slice_with_length(start_offset, buf.length() as usize);
67 match (buf_data.is_empty(), compression_codec) {
69 (true, _) | (_, None) => Ok(buf_data),
70 (false, Some(decompressor)) => {
71 decompressor.decompress_to_buffer(&buf_data, decompression_context)
72 }
73 }
74}
impl RecordBatchDecoder<'_> {
    /// Decode the next column described by `field`, consuming field nodes and
    /// buffer descriptors from this decoder's iterators.
    ///
    /// NOTE(review): the number and order of `next_node`/`next_buffer` calls
    /// per data type must stay in lock-step with `skip_field`; changing one
    /// without the other desynchronizes the buffer stream.
    fn create_array(
        &mut self,
        field: &Field,
        variadic_counts: &mut VecDeque<i64>,
    ) -> Result<ArrayRef, ArrowError> {
        let data_type = field.data_type();
        match data_type {
            Utf8 | Binary | LargeBinary | LargeUtf8 => {
                let field_node = self.next_node(field)?;
                // [validity, offsets, values]
                let buffers = [
                    self.next_buffer()?,
                    self.next_buffer()?,
                    self.next_buffer()?,
                ];
                self.create_primitive_array(field_node, data_type, &buffers)
            }
            BinaryView | Utf8View => {
                let count = variadic_counts
                    .pop_front()
                    .ok_or(ArrowError::IpcError(format!(
                        "Missing variadic count for {data_type} column"
                    )))?;
                // +2: the validity and views buffers precede the variadic
                // data buffers counted by the message header.
                let count = count + 2;
                let buffers = (0..count)
                    .map(|_| self.next_buffer())
                    .collect::<Result<Vec<_>, _>>()?;
                let field_node = self.next_node(field)?;
                self.create_primitive_array(field_node, data_type, &buffers)
            }
            FixedSizeBinary(_) => {
                let field_node = self.next_node(field)?;
                // [validity, values]
                let buffers = [self.next_buffer()?, self.next_buffer()?];
                self.create_primitive_array(field_node, data_type, &buffers)
            }
            List(list_field) | LargeList(list_field) | Map(list_field, _) => {
                let list_node = self.next_node(field)?;
                // [validity, offsets]
                let list_buffers = [self.next_buffer()?, self.next_buffer()?];
                let values = self.create_array(list_field, variadic_counts)?;
                self.create_list_array(list_node, data_type, &list_buffers, values)
            }
            ListView(list_field) | LargeListView(list_field) => {
                let list_node = self.next_node(field)?;
                // [validity, offsets, sizes]
                let list_buffers = [
                    self.next_buffer()?,
                    self.next_buffer()?,
                    self.next_buffer()?,
                ];
                let values = self.create_array(list_field, variadic_counts)?;
                self.create_list_view_array(list_node, data_type, &list_buffers, values)
            }
            FixedSizeList(list_field, _) => {
                let list_node = self.next_node(field)?;
                // Fixed-size lists carry no offsets buffer, only validity.
                let list_buffers = [self.next_buffer()?];
                let values = self.create_array(list_field, variadic_counts)?;
                self.create_list_array(list_node, data_type, &list_buffers, values)
            }
            Struct(struct_fields) => {
                let struct_node = self.next_node(field)?;
                let null_buffer = self.next_buffer()?;

                // Decode all children in declaration order.
                let mut struct_arrays = vec![];
                for struct_field in struct_fields {
                    let child = self.create_array(struct_field, variadic_counts)?;
                    struct_arrays.push(child);
                }
                self.create_struct_array(struct_node, null_buffer, struct_fields, struct_arrays)
            }
            RunEndEncoded(run_ends_field, values_field) => {
                let run_node = self.next_node(field)?;
                let run_ends = self.create_array(run_ends_field, variadic_counts)?;
                let values = self.create_array(values_field, variadic_counts)?;

                let run_array_length = run_node.length() as usize;
                let builder = ArrayData::builder(data_type.clone())
                    .len(run_array_length)
                    .offset(0)
                    .add_child_data(run_ends.into_data())
                    .add_child_data(values.into_data())
                    .null_count(run_node.null_count() as usize);

                self.create_array_from_builder(builder)
            }
            Dictionary(_, _) => {
                let index_node = self.next_node(field)?;
                // [validity, keys]
                let index_buffers = [self.next_buffer()?, self.next_buffer()?];

                #[allow(deprecated)]
                let dict_id = field.dict_id().ok_or_else(|| {
                    ArrowError::ParseError(format!("Field {field} does not have dict id"))
                })?;

                let value_array = match self.dictionaries_by_id.get(&dict_id) {
                    Some(array) => array.clone(),
                    None => {
                        // No dictionary batch decoded yet for this id; fall
                        // back to an empty values array.
                        if let Dictionary(_, value_type) = data_type {
                            arrow_array::new_empty_array(value_type.as_ref())
                        } else {
                            unreachable!()
                        }
                    }
                };

                self.create_dictionary_array(index_node, data_type, &index_buffers, value_array)
            }
            Union(fields, mode) => {
                let union_node = self.next_node(field)?;
                let len = union_node.length() as usize;

                // Pre-V5 messages carried an (ignored) union validity buffer.
                if self.version < MetadataVersion::V5 {
                    self.next_buffer()?;
                }

                let type_ids: ScalarBuffer<i8> =
                    self.next_buffer()?.slice_with_length(0, len).into();

                let value_offsets = match mode {
                    UnionMode::Dense => {
                        // 4 bytes per i32 offset.
                        let offsets: ScalarBuffer<i32> =
                            self.next_buffer()?.slice_with_length(0, len * 4).into();
                        Some(offsets)
                    }
                    UnionMode::Sparse => None,
                };

                let mut children = Vec::with_capacity(fields.len());

                for (_id, field) in fields.iter() {
                    let child = self.create_array(field, variadic_counts)?;
                    children.push(child);
                }

                let array = if self.skip_validation.get() {
                    // SAFETY: caller opted in via the unsafe
                    // skip-validation API and vouches for the input.
                    unsafe {
                        UnionArray::new_unchecked(fields.clone(), type_ids, value_offsets, children)
                    }
                } else {
                    UnionArray::try_new(fields.clone(), type_ids, value_offsets, children)?
                };
                Ok(Arc::new(array))
            }
            Null => {
                let node = self.next_node(field)?;
                let length = node.length();
                let null_count = node.null_count();

                // In a NullArray every slot is null by definition.
                if length != null_count {
                    return Err(ArrowError::SchemaError(format!(
                        "Field {field} of NullArray has unequal null_count {null_count} and len {length}"
                    )));
                }

                // Null arrays consume no buffers.
                let builder = ArrayData::builder(data_type.clone())
                    .len(length as usize)
                    .offset(0);
                self.create_array_from_builder(builder)
            }
            _ => {
                // All remaining primitive-like types: [validity, values].
                let field_node = self.next_node(field)?;
                let buffers = [self.next_buffer()?, self.next_buffer()?];
                self.create_primitive_array(field_node, data_type, &buffers)
            }
        }
    }

    /// Build a primitive-like array (including byte arrays and views) from
    /// `buffers`, where `buffers[0]` is the validity bitmap.
    fn create_primitive_array(
        &self,
        field_node: &FieldNode,
        data_type: &DataType,
        buffers: &[Buffer],
    ) -> Result<ArrayRef, ArrowError> {
        let length = field_node.length() as usize;
        // Only attach the validity bitmap when nulls are actually present.
        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
        let mut builder = match data_type {
            Utf8 | Binary | LargeBinary | LargeUtf8 => {
                // [offsets, values]
                ArrayData::builder(data_type.clone())
                    .len(length)
                    .buffers(buffers[1..3].to_vec())
                    .null_bit_buffer(null_buffer)
            }
            BinaryView | Utf8View => ArrayData::builder(data_type.clone())
                .len(length)
                // [views, data0, data1, ...] — variable number of buffers.
                .buffers(buffers[1..].to_vec())
                .null_bit_buffer(null_buffer),
            _ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => {
                // Single values buffer.
                ArrayData::builder(data_type.clone())
                    .len(length)
                    .add_buffer(buffers[1].clone())
                    .null_bit_buffer(null_buffer)
            }
            t => unreachable!("Data type {:?} either unsupported or not primitive", t),
        };

        builder = builder.null_count(field_node.null_count() as usize);

        self.create_array_from_builder(builder)
    }

    /// Finish an [`ArrayDataBuilder`], realigning buffers unless strict
    /// alignment was requested, and honoring the skip-validation flag.
    fn create_array_from_builder(&self, builder: ArrayDataBuilder) -> Result<ArrayRef, ArrowError> {
        let mut builder = builder.align_buffers(!self.require_alignment);
        if self.skip_validation.get() {
            // SAFETY: caller opted in via the unsafe skip-validation API.
            unsafe { builder = builder.skip_validation(true) }
        };
        Ok(make_array(builder.build()?))
    }

    /// Build a list-like array (list, large list, map, fixed-size list) from
    /// its buffers and an already-decoded child array.
    fn create_list_array(
        &self,
        field_node: &FieldNode,
        data_type: &DataType,
        buffers: &[Buffer],
        child_array: ArrayRef,
    ) -> Result<ArrayRef, ArrowError> {
        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
        let length = field_node.length() as usize;
        let child_data = child_array.into_data();
        let mut builder = match data_type {
            // [validity, offsets] + child
            List(_) | LargeList(_) | Map(_, _) => ArrayData::builder(data_type.clone())
                .len(length)
                .add_buffer(buffers[1].clone())
                .add_child_data(child_data)
                .null_bit_buffer(null_buffer),

            // [validity] + child — no offsets buffer.
            FixedSizeList(_, _) => ArrayData::builder(data_type.clone())
                .len(length)
                .add_child_data(child_data)
                .null_bit_buffer(null_buffer),

            _ => unreachable!("Cannot create list or map array from {:?}", data_type),
        };

        builder = builder.null_count(field_node.null_count() as usize);

        self.create_array_from_builder(builder)
    }

    /// Build a list-view array from `[validity, offsets, sizes]` buffers and
    /// an already-decoded child array.
    fn create_list_view_array(
        &self,
        field_node: &FieldNode,
        data_type: &DataType,
        buffers: &[Buffer],
        child_array: ArrayRef,
    ) -> Result<ArrayRef, ArrowError> {
        assert!(matches!(data_type, ListView(_) | LargeListView(_)));

        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
        let length = field_node.length() as usize;
        let child_data = child_array.into_data();

        self.create_array_from_builder(
            ArrayData::builder(data_type.clone())
                .len(length)
                .add_buffer(buffers[1].clone())
                .add_buffer(buffers[2].clone())
                .add_child_data(child_data)
                .null_bit_buffer(null_buffer)
                .null_count(field_node.null_count() as usize),
        )
    }

    /// Assemble a [`StructArray`] from already-decoded child arrays and the
    /// struct-level validity buffer.
    fn create_struct_array(
        &self,
        struct_node: &FieldNode,
        null_buffer: Buffer,
        struct_fields: &Fields,
        struct_arrays: Vec<ArrayRef>,
    ) -> Result<ArrayRef, ArrowError> {
        let null_count = struct_node.null_count() as usize;
        let len = struct_node.length() as usize;
        let skip_validation = self.skip_validation.get();

        let nulls = if null_count > 0 {
            let validity_buffer = BooleanBuffer::new(null_buffer, 0, len);
            let null_buffer = if skip_validation {
                // SAFETY: caller opted in via the unsafe skip-validation
                // API; the declared null count is trusted.
                unsafe { NullBuffer::new_unchecked(validity_buffer, null_count) }
            } else {
                let null_buffer = NullBuffer::new(validity_buffer);

                // Cross-check the declared null count against the bitmap.
                if null_buffer.null_count() != null_count {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "null_count value ({}) doesn't match actual number of nulls in array ({})",
                        null_count,
                        null_buffer.null_count()
                    )));
                }

                null_buffer
            };

            Some(null_buffer)
        } else {
            None
        };
        // A struct with no child fields still carries a length and validity.
        if struct_arrays.is_empty() {
            return Ok(Arc::new(StructArray::new_empty_fields(len, nulls)));
        }

        let struct_array = if skip_validation {
            // SAFETY: caller opted in via the unsafe skip-validation API.
            unsafe { StructArray::new_unchecked(struct_fields.clone(), struct_arrays, nulls) }
        } else {
            StructArray::try_new(struct_fields.clone(), struct_arrays, nulls)?
        };

        Ok(Arc::new(struct_array))
    }

    /// Build a dictionary array from its `[validity, keys]` buffers and the
    /// (possibly empty) previously-decoded values array.
    fn create_dictionary_array(
        &self,
        field_node: &FieldNode,
        data_type: &DataType,
        buffers: &[Buffer],
        value_array: ArrayRef,
    ) -> Result<ArrayRef, ArrowError> {
        if let Dictionary(_, _) = *data_type {
            let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
            let builder = ArrayData::builder(data_type.clone())
                .len(field_node.length() as usize)
                .add_buffer(buffers[1].clone())
                .add_child_data(value_array.into_data())
                .null_bit_buffer(null_buffer)
                .null_count(field_node.null_count() as usize);
            self.create_array_from_builder(builder)
        } else {
            unreachable!("Cannot create dictionary array from {:?}", data_type)
        }
    }
}
438
/// Incrementally decodes the columns of a single IPC `RecordBatch` message.
pub struct RecordBatchDecoder<'a> {
    /// The flatbuffers `RecordBatch` header being decoded.
    batch: crate::RecordBatch<'a>,
    /// Schema of the batch being decoded.
    schema: SchemaRef,
    /// Previously decoded dictionaries, keyed by dictionary id.
    dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
    /// Optional compression codec declared in the message header.
    compression: Option<CompressionCodec>,
    /// Reusable decompressor state, shared across buffers of this batch.
    decompression_context: DecompressionContext,
    /// IPC metadata version of the message.
    version: MetadataVersion,
    /// The message body containing the flattened column buffers.
    data: &'a Buffer,
    /// Iterator over the field nodes of the header (one per array).
    nodes: VectorIter<'a, FieldNode>,
    /// Iterator over the buffer descriptors of the header.
    buffers: VectorIter<'a, crate::Buffer>,
    /// Optional projection: indices of top-level columns to decode.
    projection: Option<&'a [usize]>,
    /// When true, buffers are not realigned by the decoder
    /// (see `create_array_from_builder`, which passes `!require_alignment`
    /// to `ArrayDataBuilder::align_buffers`).
    require_alignment: bool,
    /// When set (unsafe opt-in), skips Arrow data validation.
    skip_validation: UnsafeFlag,
}
474
impl<'a> RecordBatchDecoder<'a> {
    /// Create a decoder for a single record batch message.
    ///
    /// `buf` is the message body; `batch` is the decoded flatbuffers header.
    /// Fails when the header lacks buffer or node lists, or declares an
    /// unsupported compression codec.
    fn try_new(
        buf: &'a Buffer,
        batch: crate::RecordBatch<'a>,
        schema: SchemaRef,
        dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
        metadata: &'a MetadataVersion,
    ) -> Result<Self, ArrowError> {
        let buffers = batch.buffers().ok_or_else(|| {
            ArrowError::IpcError("Unable to get buffers from IPC RecordBatch".to_string())
        })?;
        let field_nodes = batch.nodes().ok_or_else(|| {
            ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string())
        })?;

        let batch_compression = batch.compression();
        let compression = batch_compression
            .map(|batch_compression| batch_compression.codec().try_into())
            .transpose()?;

        Ok(Self {
            batch,
            schema,
            dictionaries_by_id,
            compression,
            decompression_context: DecompressionContext::new(),
            version: *metadata,
            data: buf,
            nodes: field_nodes.iter(),
            buffers: buffers.iter(),
            projection: None,
            require_alignment: false,
            skip_validation: UnsafeFlag::new(),
        })
    }

    /// Set an optional projection of top-level column indices.
    pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self {
        self.projection = projection;
        self
    }

    /// Require input buffers to be aligned rather than realigning them
    /// (forwarded to `ArrayDataBuilder::align_buffers` as its negation).
    pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
        self.require_alignment = require_alignment;
        self
    }

    /// Install the (unsafe, caller-vouched) skip-validation flag.
    pub(crate) fn with_skip_validation(mut self, skip_validation: UnsafeFlag) -> Self {
        self.skip_validation = skip_validation;
        self
    }

    /// Decode all (or the projected subset of) columns into a
    /// [`RecordBatch`], consuming the decoder.
    fn read_record_batch(mut self) -> Result<RecordBatch, ArrowError> {
        // One variadic buffer count per BinaryView/Utf8View column.
        let mut variadic_counts: VecDeque<i64> = self
            .batch
            .variadicBufferCounts()
            .into_iter()
            .flatten()
            .collect();

        let options = RecordBatchOptions::new().with_row_count(Some(self.batch.length() as usize));

        let schema = Arc::clone(&self.schema);
        if let Some(projection) = self.projection {
            let mut arrays = vec![];
            // Walk every field in schema order: decode projected columns and
            // skip the rest (their nodes/buffers must still be consumed to
            // keep the stream in sync).
            for (idx, field) in schema.fields().iter().enumerate() {
                if let Some(proj_idx) = projection.iter().position(|p| p == &idx) {
                    let child = self.create_array(field, &mut variadic_counts)?;
                    arrays.push((proj_idx, child));
                } else {
                    self.skip_field(field, &mut variadic_counts)?;
                }
            }

            // Restore the caller-requested projection order.
            arrays.sort_by_key(|t| t.0);

            let schema = Arc::new(schema.project(projection)?);
            let columns = arrays.into_iter().map(|t| t.1).collect::<Vec<_>>();

            if self.skip_validation.get() {
                // SAFETY: caller opted in via the unsafe skip-validation API.
                unsafe {
                    Ok(RecordBatch::new_unchecked(
                        schema,
                        columns,
                        self.batch.length() as usize,
                    ))
                }
            } else {
                assert!(variadic_counts.is_empty());
                RecordBatch::try_new_with_options(schema, columns, &options)
            }
        } else {
            let mut children = vec![];
            // No projection: decode every column in schema order.
            for field in schema.fields() {
                let child = self.create_array(field, &mut variadic_counts)?;
                children.push(child);
            }

            if self.skip_validation.get() {
                // SAFETY: caller opted in via the unsafe skip-validation API.
                unsafe {
                    Ok(RecordBatch::new_unchecked(
                        schema,
                        children,
                        self.batch.length() as usize,
                    ))
                }
            } else {
                assert!(variadic_counts.is_empty());
                RecordBatch::try_new_with_options(schema, children, &options)
            }
        }
    }

    /// Consume the next buffer descriptor and materialize (and, if needed,
    /// decompress) the corresponding slice of the message body.
    fn next_buffer(&mut self) -> Result<Buffer, ArrowError> {
        let buffer = self.buffers.next().ok_or_else(|| {
            ArrowError::IpcError("Buffer count mismatched with metadata".to_string())
        })?;
        read_buffer(
            buffer,
            self.data,
            self.compression,
            &mut self.decompression_context,
        )
    }

    /// Discard the next buffer descriptor (used when skipping columns).
    fn skip_buffer(&mut self) {
        self.buffers.next().unwrap();
    }

    /// Consume the next field node, erroring if the message ran out of nodes.
    fn next_node(&mut self, field: &Field) -> Result<&'a FieldNode, ArrowError> {
        self.nodes.next().ok_or_else(|| {
            ArrowError::SchemaError(format!(
                "Invalid data for schema. {field} refers to node not found in schema",
            ))
        })
    }

    /// Consume (without decoding) the nodes and buffers belonging to `field`.
    ///
    /// NOTE(review): must consume exactly the same nodes/buffers per type as
    /// `create_array`; keep the two in sync. The `Union` arm does not skip
    /// the extra pre-V5 validity buffer that `create_array` consumes —
    /// confirm whether projecting around a union in a V4 file is handled.
    fn skip_field(
        &mut self,
        field: &Field,
        variadic_count: &mut VecDeque<i64>,
    ) -> Result<(), ArrowError> {
        self.next_node(field)?;

        match field.data_type() {
            Utf8 | Binary | LargeBinary | LargeUtf8 => {
                // validity, offsets, values
                for _ in 0..3 {
                    self.skip_buffer()
                }
            }
            Utf8View | BinaryView => {
                let count = variadic_count
                    .pop_front()
                    .ok_or(ArrowError::IpcError(format!(
                        "Missing variadic count for {} column",
                        field.data_type()
                    )))?;
                // +2 for the validity and views buffers.
                let count = count + 2;
                for _i in 0..count {
                    self.skip_buffer()
                }
            }
            FixedSizeBinary(_) => {
                self.skip_buffer();
                self.skip_buffer();
            }
            List(list_field) | LargeList(list_field) | Map(list_field, _) => {
                self.skip_buffer();
                self.skip_buffer();
                self.skip_field(list_field, variadic_count)?;
            }
            ListView(list_field) | LargeListView(list_field) => {
                self.skip_buffer();
                self.skip_buffer();
                self.skip_buffer();
                self.skip_field(list_field, variadic_count)?;
            }
            FixedSizeList(list_field, _) => {
                self.skip_buffer();
                self.skip_field(list_field, variadic_count)?;
            }
            Struct(struct_fields) => {
                self.skip_buffer();

                for struct_field in struct_fields {
                    self.skip_field(struct_field, variadic_count)?
                }
            }
            RunEndEncoded(run_ends_field, values_field) => {
                self.skip_field(run_ends_field, variadic_count)?;
                self.skip_field(values_field, variadic_count)?;
            }
            Dictionary(_, _) => {
                // validity and keys
                self.skip_buffer();
                self.skip_buffer();
            }
            Union(fields, mode) => {
                // type ids (plus offsets for dense unions)
                self.skip_buffer();
                match mode {
                    UnionMode::Dense => self.skip_buffer(),
                    UnionMode::Sparse => {}
                };

                for (_, field) in fields.iter() {
                    self.skip_field(field, variadic_count)?
                }
            }
            // Null arrays consume no buffers.
            Null => {}

            Boolean
            | Int8
            | Int16
            | Int32
            | Int64
            | UInt8
            | UInt16
            | UInt32
            | UInt64
            | Float16
            | Float32
            | Float64
            | Timestamp(_, _)
            | Date32
            | Date64
            | Time32(_)
            | Time64(_)
            | Duration(_)
            | Interval(_)
            | Decimal32(_, _)
            | Decimal64(_, _)
            | Decimal128(_, _)
            | Decimal256(_, _) => {
                // validity and values
                self.skip_buffer();
                self.skip_buffer();
            }
        };
        Ok(())
    }
}
744
745pub fn read_record_batch(
756 buf: &Buffer,
757 batch: crate::RecordBatch,
758 schema: SchemaRef,
759 dictionaries_by_id: &HashMap<i64, ArrayRef>,
760 projection: Option<&[usize]>,
761 metadata: &MetadataVersion,
762) -> Result<RecordBatch, ArrowError> {
763 RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)?
764 .with_projection(projection)
765 .with_require_alignment(false)
766 .read_record_batch()
767}
768
769pub fn read_dictionary(
772 buf: &Buffer,
773 batch: crate::DictionaryBatch,
774 schema: &Schema,
775 dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
776 metadata: &MetadataVersion,
777) -> Result<(), ArrowError> {
778 read_dictionary_impl(
779 buf,
780 batch,
781 schema,
782 dictionaries_by_id,
783 metadata,
784 false,
785 UnsafeFlag::new(),
786 )
787}
788
789fn read_dictionary_impl(
790 buf: &Buffer,
791 batch: crate::DictionaryBatch,
792 schema: &Schema,
793 dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
794 metadata: &MetadataVersion,
795 require_alignment: bool,
796 skip_validation: UnsafeFlag,
797) -> Result<(), ArrowError> {
798 let id = batch.id();
799
800 let dictionary_values = get_dictionary_values(
801 buf,
802 batch,
803 schema,
804 dictionaries_by_id,
805 metadata,
806 require_alignment,
807 skip_validation,
808 )?;
809
810 update_dictionaries(dictionaries_by_id, batch.isDelta(), id, dictionary_values)?;
811
812 Ok(())
813}
814
815fn update_dictionaries(
824 dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
825 is_delta: bool,
826 dict_id: i64,
827 dict_values: ArrayRef,
828) -> Result<(), ArrowError> {
829 if !is_delta {
830 dictionaries_by_id.insert(dict_id, dict_values.clone());
834 return Ok(());
835 }
836
837 let existing = dictionaries_by_id.get(&dict_id).ok_or_else(|| {
838 ArrowError::InvalidArgumentError(format!(
839 "No existing dictionary for delta dictionary with id '{dict_id}'"
840 ))
841 })?;
842
843 let combined = concat::concat(&[existing, &dict_values]).map_err(|e| {
844 ArrowError::InvalidArgumentError(format!("Failed to concat delta dictionary: {e}"))
845 })?;
846
847 dictionaries_by_id.insert(dict_id, combined);
848
849 Ok(())
850}
851
852fn get_dictionary_values(
856 buf: &Buffer,
857 batch: crate::DictionaryBatch,
858 schema: &Schema,
859 dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
860 metadata: &MetadataVersion,
861 require_alignment: bool,
862 skip_validation: UnsafeFlag,
863) -> Result<ArrayRef, ArrowError> {
864 let id = batch.id();
865 #[allow(deprecated)]
866 let fields_using_this_dictionary = schema.fields_with_dict_id(id);
867 let first_field = fields_using_this_dictionary.first().ok_or_else(|| {
868 ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
869 })?;
870
871 let dictionary_values: ArrayRef = match first_field.data_type() {
875 DataType::Dictionary(_, value_type) => {
876 let value = value_type.as_ref().clone();
878 let schema = Schema::new(vec![Field::new("", value, true)]);
879 let record_batch = RecordBatchDecoder::try_new(
881 buf,
882 batch.data().unwrap(),
883 Arc::new(schema),
884 dictionaries_by_id,
885 metadata,
886 )?
887 .with_require_alignment(require_alignment)
888 .with_skip_validation(skip_validation)
889 .read_record_batch()?;
890
891 Some(record_batch.column(0).clone())
892 }
893 _ => None,
894 }
895 .ok_or_else(|| {
896 ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
897 })?;
898
899 Ok(dictionary_values)
900}
901
902fn read_block<R: Read + Seek>(mut reader: R, block: &Block) -> Result<Buffer, ArrowError> {
904 reader.seek(SeekFrom::Start(block.offset() as u64))?;
905 let body_len = block.bodyLength().to_usize().unwrap();
906 let metadata_len = block.metaDataLength().to_usize().unwrap();
907 let total_len = body_len.checked_add(metadata_len).unwrap();
908
909 let mut buf = MutableBuffer::from_len_zeroed(total_len);
910 reader.read_exact(&mut buf)?;
911 Ok(buf.into())
912}
913
914fn parse_message(buf: &[u8]) -> Result<Message::Message<'_>, ArrowError> {
918 let buf = match buf[..4] == CONTINUATION_MARKER {
919 true => &buf[8..],
920 false => &buf[4..],
921 };
922 crate::root_as_message(buf)
923 .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))
924}
925
926pub fn read_footer_length(buf: [u8; 10]) -> Result<usize, ArrowError> {
930 if buf[4..] != super::ARROW_MAGIC {
931 return Err(ArrowError::ParseError(
932 "Arrow file does not contain correct footer".to_string(),
933 ));
934 }
935
936 let footer_len = i32::from_le_bytes(buf[..4].try_into().unwrap());
938 footer_len
939 .try_into()
940 .map_err(|_| ArrowError::ParseError(format!("Invalid footer length: {footer_len}")))
941}
942
/// Decodes the dictionary and record-batch messages of an Arrow IPC file,
/// given the schema and metadata version from the file footer.
#[derive(Debug)]
pub struct FileDecoder {
    /// Schema from the file footer.
    schema: SchemaRef,
    /// Dictionaries decoded so far, keyed by dictionary id.
    dictionaries: HashMap<i64, ArrayRef>,
    /// IPC metadata version from the footer.
    version: MetadataVersion,
    /// Optional projection of top-level column indices.
    projection: Option<Vec<usize>>,
    /// When true, buffers are not realigned during decoding.
    require_alignment: bool,
    /// When set (unsafe opt-in), skips Arrow data validation.
    skip_validation: UnsafeFlag,
}
1016
impl FileDecoder {
    /// Create a new [`FileDecoder`] for the given schema and metadata version.
    pub fn new(schema: SchemaRef, version: MetadataVersion) -> Self {
        Self {
            schema,
            version,
            dictionaries: Default::default(),
            projection: None,
            require_alignment: false,
            skip_validation: UnsafeFlag::new(),
        }
    }

    /// Specify a projection (indices of the top-level columns to decode).
    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
        self.projection = Some(projection);
        self
    }

    /// Require input buffers to be aligned rather than copying to realign
    /// them (forwarded to `RecordBatchDecoder::with_require_alignment`).
    pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
        self.require_alignment = require_alignment;
        self
    }

    /// Skip validation of decoded array data.
    ///
    /// # Safety
    /// The caller must guarantee the input is valid Arrow IPC data;
    /// otherwise decoding invalid input is undefined behavior.
    pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
        // SAFETY: contract delegated to this method's caller.
        unsafe { self.skip_validation.set(skip_validation) };
        self
    }

    /// Parse the flatbuffer message in `buf` and check its metadata version.
    ///
    /// A decoder created with `MetadataVersion::V1` accepts any message
    /// version; otherwise the versions must match exactly.
    fn read_message<'a>(&self, buf: &'a [u8]) -> Result<Message::Message<'a>, ArrowError> {
        let message = parse_message(buf)?;

        if self.version != MetadataVersion::V1 && message.version() != self.version {
            return Err(ArrowError::IpcError(
                "Could not read IPC message as metadata versions mismatch".to_string(),
            ));
        }
        Ok(message)
    }

    /// Decode the dictionary batch in `block`/`buf`, storing the result for
    /// use by subsequently decoded record batches.
    pub fn read_dictionary(&mut self, block: &Block, buf: &Buffer) -> Result<(), ArrowError> {
        let message = self.read_message(buf)?;
        match message.header_type() {
            crate::MessageHeader::DictionaryBatch => {
                let batch = message.header_as_dictionary_batch().unwrap();
                read_dictionary_impl(
                    // The body follows the metadata within the block.
                    &buf.slice(block.metaDataLength() as _),
                    batch,
                    &self.schema,
                    &mut self.dictionaries,
                    &message.version(),
                    self.require_alignment,
                    self.skip_validation.clone(),
                )
            }
            t => Err(ArrowError::ParseError(format!(
                "Expecting DictionaryBatch in dictionary blocks, found {t:?}."
            ))),
        }
    }

    /// Decode the record batch in `block`/`buf`, returning `Ok(None)` for a
    /// `NONE`-header (empty) message.
    pub fn read_record_batch(
        &self,
        block: &Block,
        buf: &Buffer,
    ) -> Result<Option<RecordBatch>, ArrowError> {
        let message = self.read_message(buf)?;
        match message.header_type() {
            crate::MessageHeader::Schema => Err(ArrowError::IpcError(
                "Not expecting a schema when messages are read".to_string(),
            )),
            crate::MessageHeader::RecordBatch => {
                let batch = message.header_as_record_batch().ok_or_else(|| {
                    ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
                })?;
                RecordBatchDecoder::try_new(
                    // The body follows the metadata within the block.
                    &buf.slice(block.metaDataLength() as _),
                    batch,
                    self.schema.clone(),
                    &self.dictionaries,
                    &message.version(),
                )?
                .with_projection(self.projection.as_deref())
                .with_require_alignment(self.require_alignment)
                .with_skip_validation(self.skip_validation.clone())
                .read_record_batch()
                .map(Some)
            }
            crate::MessageHeader::NONE => Ok(None),
            t => Err(ArrowError::InvalidArgumentError(format!(
                "Reading types other than record batches not yet supported, unable to read {t:?}"
            ))),
        }
    }
}
1138
/// Builder for [`FileReader`] with configurable options.
#[derive(Debug)]
pub struct FileReaderBuilder {
    /// Optional projection of top-level column indices.
    projection: Option<Vec<usize>>,
    /// Flatbuffer verifier limit: maximum tables allowed in the footer.
    max_footer_fb_tables: usize,
    /// Flatbuffer verifier limit: maximum nesting depth allowed in the footer.
    max_footer_fb_depth: usize,
}
1149
1150impl Default for FileReaderBuilder {
1151 fn default() -> Self {
1152 let verifier_options = VerifierOptions::default();
1153 Self {
1154 max_footer_fb_tables: verifier_options.max_tables,
1155 max_footer_fb_depth: verifier_options.max_depth,
1156 projection: None,
1157 }
1158 }
1159}
1160
impl FileReaderBuilder {
    /// Create a builder with default settings.
    pub fn new() -> Self {
        Self::default()
    }

    /// Project only the given top-level column indices.
    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
        self.projection = Some(projection);
        self
    }

    /// Set the maximum number of flatbuffer tables permitted when verifying
    /// the file footer (bounds work on corrupt or hostile footers).
    pub fn with_max_footer_fb_tables(mut self, max_footer_fb_tables: usize) -> Self {
        self.max_footer_fb_tables = max_footer_fb_tables;
        self
    }

    /// Set the maximum flatbuffer nesting depth permitted when verifying the
    /// file footer.
    pub fn with_max_footer_fb_depth(mut self, max_footer_fb_depth: usize) -> Self {
        self.max_footer_fb_depth = max_footer_fb_depth;
        self
    }

    /// Open `reader` as an Arrow IPC file: validate the trailer, parse the
    /// footer, check endianness, and pre-load all dictionary blocks.
    pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<FileReader<R>, ArrowError> {
        // The file ends with a 4-byte footer length + the 6-byte magic.
        let mut buffer = [0; 10];
        reader.seek(SeekFrom::End(-10))?;
        reader.read_exact(&mut buffer)?;

        let footer_len = read_footer_length(buffer)?;

        // The footer flatbuffer sits immediately before the trailer.
        let mut footer_data = vec![0; footer_len];
        reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
        reader.read_exact(&mut footer_data)?;

        // Verify with caller-configurable limits.
        let verifier_options = VerifierOptions {
            max_tables: self.max_footer_fb_tables,
            max_depth: self.max_footer_fb_depth,
            ..Default::default()
        };
        let footer = crate::root_as_footer_with_opts(&verifier_options, &footer_data[..]).map_err(
            |err| ArrowError::ParseError(format!("Unable to get root as footer: {err:?}")),
        )?;

        let blocks = footer.recordBatches().ok_or_else(|| {
            ArrowError::ParseError("Unable to get record batches from IPC Footer".to_string())
        })?;

        let total_blocks = blocks.len();

        let ipc_schema = footer.schema().unwrap();
        // Reading non-native-endian files is not supported.
        if !ipc_schema.endianness().equals_to_target_endianness() {
            return Err(ArrowError::IpcError(
                "the endianness of the source system does not match the endianness of the target system.".to_owned()
            ));
        }

        let schema = crate::convert::fb_to_schema(ipc_schema);

        // Collect footer key-value metadata, if present.
        let mut custom_metadata = HashMap::new();
        if let Some(fb_custom_metadata) = footer.custom_metadata() {
            for kv in fb_custom_metadata.into_iter() {
                custom_metadata.insert(
                    kv.key().unwrap().to_string(),
                    kv.value().unwrap().to_string(),
                );
            }
        }

        let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());
        if let Some(projection) = self.projection {
            decoder = decoder.with_projection(projection)
        }

        // Dictionaries must be decoded up front, before any record batch
        // that references them.
        if let Some(dictionaries) = footer.dictionaries() {
            for block in dictionaries {
                let buf = read_block(&mut reader, block)?;
                decoder.read_dictionary(block, &buf)?;
            }
        }

        Ok(FileReader {
            reader,
            blocks: blocks.iter().copied().collect(),
            current_block: 0,
            total_blocks,
            decoder,
            custom_metadata,
        })
    }
}
1280
/// Reads Arrow [`RecordBatch`]es from an IPC file format source.
pub struct FileReader<R> {
    /// The underlying (seekable) byte source.
    reader: R,

    /// Decoder state: schema, dictionaries, projection, validation flags.
    decoder: FileDecoder,

    /// Byte ranges of the record-batch messages, from the file footer.
    blocks: Vec<Block>,

    /// Index of the next block to read.
    current_block: usize,

    /// Total number of record-batch blocks in the file.
    total_blocks: usize,

    /// Key-value metadata from the file footer.
    custom_metadata: HashMap<String, String>,
}
1346
1347impl<R> fmt::Debug for FileReader<R> {
1348 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1349 f.debug_struct("FileReader<R>")
1350 .field("decoder", &self.decoder)
1351 .field("blocks", &self.blocks)
1352 .field("current_block", &self.current_block)
1353 .field("total_blocks", &self.total_blocks)
1354 .finish_non_exhaustive()
1355 }
1356}
1357
1358impl<R: Read + Seek> FileReader<BufReader<R>> {
1359 pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1363 Self::try_new(BufReader::new(reader), projection)
1364 }
1365}
1366
1367impl<R: Read + Seek> FileReader<R> {
1368 pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1379 let builder = FileReaderBuilder {
1380 projection,
1381 ..Default::default()
1382 };
1383 builder.build(reader)
1384 }
1385
1386 pub fn custom_metadata(&self) -> &HashMap<String, String> {
1388 &self.custom_metadata
1389 }
1390
1391 pub fn num_batches(&self) -> usize {
1393 self.total_blocks
1394 }
1395
1396 pub fn schema(&self) -> SchemaRef {
1398 self.decoder.schema.clone()
1399 }
1400
1401 pub fn set_index(&mut self, index: usize) -> Result<(), ArrowError> {
1405 if index >= self.total_blocks {
1406 Err(ArrowError::InvalidArgumentError(format!(
1407 "Cannot set batch to index {} from {} total batches",
1408 index, self.total_blocks
1409 )))
1410 } else {
1411 self.current_block = index;
1412 Ok(())
1413 }
1414 }
1415
1416 fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
1417 let block = &self.blocks[self.current_block];
1418 self.current_block += 1;
1419
1420 let buffer = read_block(&mut self.reader, block)?;
1422 self.decoder.read_record_batch(block, &buffer)
1423 }
1424
1425 pub fn get_ref(&self) -> &R {
1429 &self.reader
1430 }
1431
1432 pub fn get_mut(&mut self) -> &mut R {
1436 &mut self.reader
1437 }
1438
1439 pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
1445 self.decoder = unsafe { self.decoder.with_skip_validation(skip_validation) };
1446 self
1447 }
1448}
1449
1450impl<R: Read + Seek> Iterator for FileReader<R> {
1451 type Item = Result<RecordBatch, ArrowError>;
1452
1453 fn next(&mut self) -> Option<Self::Item> {
1454 if self.current_block < self.total_blocks {
1456 self.maybe_next().transpose()
1457 } else {
1458 None
1459 }
1460 }
1461}
1462
1463impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
1464 fn schema(&self) -> SchemaRef {
1465 self.schema()
1466 }
1467}
1468
/// Reads Arrow record batches incrementally from an Arrow IPC stream.
pub struct StreamReader<R> {
    /// Low-level reader producing length-prefixed IPC messages from the stream.
    reader: MessageReader<R>,

    /// Schema decoded from the stream's leading schema message.
    schema: SchemaRef,

    /// Dictionary values decoded so far, keyed by dictionary id.
    dictionaries_by_id: HashMap<i64, ArrayRef>,

    /// Set once end-of-stream (or EOF) has been observed; reads then yield `None`.
    finished: bool,

    /// Optional projection: the selected column indices and the projected schema.
    projection: Option<(Vec<usize>, Schema)>,

    /// When set (via `with_skip_validation`), decoded data is not validated.
    skip_validation: UnsafeFlag,
}
1527
1528impl<R> fmt::Debug for StreamReader<R> {
1529 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
1530 f.debug_struct("StreamReader<R>")
1531 .field("reader", &"R")
1532 .field("schema", &self.schema)
1533 .field("dictionaries_by_id", &self.dictionaries_by_id)
1534 .field("finished", &self.finished)
1535 .field("projection", &self.projection)
1536 .finish()
1537 }
1538}
1539
1540impl<R: Read> StreamReader<BufReader<R>> {
1541 pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1545 Self::try_new(BufReader::new(reader), projection)
1546 }
1547}
1548
impl<R: Read> StreamReader<R> {
    /// Creates a stream reader by consuming the leading schema message.
    ///
    /// # Errors
    /// Fails if the stream is empty, the first message is not a schema, or the
    /// schema cannot be parsed; `projection` indices are validated against the
    /// schema via `Schema::project`.
    pub fn try_new(
        reader: R,
        projection: Option<Vec<usize>>,
    ) -> Result<StreamReader<R>, ArrowError> {
        let mut msg_reader = MessageReader::new(reader);
        let message = msg_reader.maybe_next()?;
        let Some((message, _)) = message else {
            return Err(ArrowError::IpcError(
                "Expected schema message, found empty stream.".to_string(),
            ));
        };

        // The Arrow IPC stream format requires the first message to be a schema.
        if message.header_type() != Message::MessageHeader::Schema {
            return Err(ArrowError::IpcError(format!(
                "Expected a schema as the first message in the stream, got: {:?}",
                message.header_type()
            )));
        }

        let schema = message.header_as_schema().ok_or_else(|| {
            ArrowError::ParseError("Failed to parse schema from message header".to_string())
        })?;
        let schema = crate::convert::fb_to_schema(schema);

        // Dictionaries are populated lazily as dictionary batches arrive.
        let dictionaries_by_id = HashMap::new();

        // Precompute the projected schema so each batch read is cheap.
        let projection = match projection {
            Some(projection_indices) => {
                let schema = schema.project(&projection_indices)?;
                Some((projection_indices, schema))
            }
            _ => None,
        };

        Ok(Self {
            reader: msg_reader,
            schema: Arc::new(schema),
            finished: false,
            dictionaries_by_id,
            projection,
            skip_validation: UnsafeFlag::new(),
        })
    }

    /// Deprecated alias for [`Self::try_new`].
    #[deprecated(since = "53.0.0", note = "use `try_new` instead")]
    pub fn try_new_unbuffered(
        reader: R,
        projection: Option<Vec<usize>>,
    ) -> Result<Self, ArrowError> {
        Self::try_new(reader, projection)
    }

    /// Returns the (unprojected) schema of the stream.
    pub fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Returns true once the end-of-stream marker has been seen.
    pub fn is_finished(&self) -> bool {
        self.finished
    }

    /// Reads messages until the next record batch, transparently applying any
    /// intervening dictionary batches; returns `Ok(None)` at end of stream.
    fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        if self.finished {
            return Ok(None);
        }

        loop {
            let message = self.next_ipc_message()?;
            let Some(message) = message else {
                // End-of-stream: remember it so later calls return None cheaply.
                self.finished = true;
                return Ok(None);
            };

            match message {
                IpcMessage::Schema(_) => {
                    // Only one schema message is allowed, at the start.
                    return Err(ArrowError::IpcError(
                        "Expected a record batch, but found a schema".to_string(),
                    ));
                }
                IpcMessage::RecordBatch(record_batch) => {
                    return Ok(Some(record_batch));
                }
                IpcMessage::DictionaryBatch { .. } => {
                    // Dictionary state was already updated in next_ipc_message;
                    // keep scanning for the next record batch.
                    continue;
                }
            };
        }
    }

    /// Reads and decodes the next IPC message of any kind.
    ///
    /// Dictionary batches update `self.dictionaries_by_id` as a side effect
    /// before being returned. Returns `Ok(None)` at end of stream.
    pub(crate) fn next_ipc_message(&mut self) -> Result<Option<IpcMessage>, ArrowError> {
        let message = self.reader.maybe_next()?;
        let Some((message, body)) = message else {
            return Ok(None);
        };

        let ipc_message = match message.header_type() {
            Message::MessageHeader::Schema => {
                let schema = message.header_as_schema().ok_or_else(|| {
                    ArrowError::ParseError("Failed to parse schema from message header".to_string())
                })?;
                let arrow_schema = crate::convert::fb_to_schema(schema);
                IpcMessage::Schema(arrow_schema)
            }
            Message::MessageHeader::RecordBatch => {
                let batch = message.header_as_record_batch().ok_or_else(|| {
                    ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
                })?;

                let version = message.version();
                let schema = self.schema.clone();
                // Decode against the current dictionary state; streams are not
                // guaranteed to be aligned, hence require_alignment = false.
                let record_batch = RecordBatchDecoder::try_new(
                    &body.into(),
                    batch,
                    schema,
                    &self.dictionaries_by_id,
                    &version,
                )?
                .with_projection(self.projection.as_ref().map(|x| x.0.as_ref()))
                .with_require_alignment(false)
                .with_skip_validation(self.skip_validation.clone())
                .read_record_batch()?;
                IpcMessage::RecordBatch(record_batch)
            }
            Message::MessageHeader::DictionaryBatch => {
                let dict = message.header_as_dictionary_batch().ok_or_else(|| {
                    ArrowError::ParseError(
                        "Failed to parse dictionary batch from message header".to_string(),
                    )
                })?;

                let version = message.version();
                let dict_values = get_dictionary_values(
                    &body.into(),
                    dict,
                    &self.schema,
                    &mut self.dictionaries_by_id,
                    &version,
                    false,
                    self.skip_validation.clone(),
                )?;

                // Register (or delta-append) the values under the dictionary id.
                update_dictionaries(
                    &mut self.dictionaries_by_id,
                    dict.isDelta(),
                    dict.id(),
                    dict_values.clone(),
                )?;

                IpcMessage::DictionaryBatch {
                    id: dict.id(),
                    is_delta: (dict.isDelta()),
                    values: (dict_values),
                }
            }
            x => {
                return Err(ArrowError::ParseError(format!(
                    "Unsupported message header type in IPC stream: '{x:?}'"
                )));
            }
        };

        Ok(Some(ipc_message))
    }

    /// Returns a shared reference to the underlying reader.
    pub fn get_ref(&self) -> &R {
        self.reader.inner()
    }

    /// Returns an exclusive reference to the underlying reader.
    pub fn get_mut(&mut self) -> &mut R {
        self.reader.inner_mut()
    }

    /// Enables or disables skipping validation of decoded data.
    ///
    /// # Safety
    /// Only sound when the input is trusted, well-formed Arrow IPC data.
    pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
        unsafe { self.skip_validation.set(skip_validation) };
        self
    }
}
1762
1763impl<R: Read> Iterator for StreamReader<R> {
1764 type Item = Result<RecordBatch, ArrowError>;
1765
1766 fn next(&mut self) -> Option<Self::Item> {
1767 self.maybe_next().transpose()
1768 }
1769}
1770
1771impl<R: Read> RecordBatchReader for StreamReader<R> {
1772 fn schema(&self) -> SchemaRef {
1773 self.schema.clone()
1774 }
1775}
1776
/// A single decoded message from an Arrow IPC stream.
#[derive(Debug)]
#[allow(dead_code)]
pub(crate) enum IpcMessage {
    /// A schema message, converted to an Arrow [`arrow_schema::Schema`].
    Schema(arrow_schema::Schema),
    /// A fully decoded record batch.
    RecordBatch(RecordBatch),
    /// A dictionary batch: its id, whether it is a delta, and the decoded values.
    DictionaryBatch {
        id: i64,
        is_delta: bool,
        values: ArrayRef,
    },
}
1793
/// Reads length-prefixed flatbuffer IPC messages (and their bodies) from `reader`.
struct MessageReader<R> {
    reader: R,
    // Scratch buffer that holds the most recent message's flatbuffer metadata;
    // reused across calls to avoid reallocating per message.
    buf: Vec<u8>,
}
1800
1801impl<R: Read> MessageReader<R> {
1802 fn new(reader: R) -> Self {
1803 Self {
1804 reader,
1805 buf: Vec::new(),
1806 }
1807 }
1808
1809 fn maybe_next(&mut self) -> Result<Option<(Message::Message<'_>, MutableBuffer)>, ArrowError> {
1820 let meta_len = self.read_meta_len()?;
1821 let Some(meta_len) = meta_len else {
1822 return Ok(None);
1823 };
1824
1825 self.buf.resize(meta_len, 0);
1826 self.reader.read_exact(&mut self.buf)?;
1827
1828 let message = crate::root_as_message(self.buf.as_slice()).map_err(|err| {
1829 ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
1830 })?;
1831
1832 let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
1833 self.reader.read_exact(&mut buf)?;
1834
1835 Ok(Some((message, buf)))
1836 }
1837
1838 fn inner_mut(&mut self) -> &mut R {
1840 &mut self.reader
1841 }
1842
1843 fn inner(&self) -> &R {
1845 &self.reader
1846 }
1847
1848 pub fn read_meta_len(&mut self) -> Result<Option<usize>, ArrowError> {
1857 let mut meta_len: [u8; 4] = [0; 4];
1858 match self.reader.read_exact(&mut meta_len) {
1859 Ok(_) => {}
1860 Err(e) => {
1861 return if e.kind() == std::io::ErrorKind::UnexpectedEof {
1862 Ok(None)
1866 } else {
1867 Err(ArrowError::from(e))
1868 };
1869 }
1870 };
1871
1872 let meta_len = {
1873 if meta_len == CONTINUATION_MARKER {
1876 self.reader.read_exact(&mut meta_len)?;
1877 }
1878
1879 i32::from_le_bytes(meta_len)
1880 };
1881
1882 if meta_len == 0 {
1883 return Ok(None);
1884 }
1885
1886 let meta_len = usize::try_from(meta_len)
1887 .map_err(|_| ArrowError::ParseError(format!("Invalid metadata length: {meta_len}")))?;
1888
1889 Ok(Some(meta_len))
1890 }
1891}
1892
1893#[cfg(test)]
1894mod tests {
1895 use std::io::Cursor;
1896
1897 use crate::convert::fb_to_schema;
1898 use crate::writer::{
1899 DictionaryTracker, IpcDataGenerator, IpcWriteOptions, unslice_run_array, write_message,
1900 };
1901
1902 use super::*;
1903
1904 use crate::{root_as_footer, root_as_message, size_prefixed_root_as_message};
1905 use arrow_array::builder::{PrimitiveRunBuilder, UnionBuilder};
1906 use arrow_array::types::*;
1907 use arrow_buffer::{NullBuffer, OffsetBuffer};
1908 use arrow_data::ArrayDataBuilder;
1909
    /// Builds a 14-column schema mixing primitive, nested, union, run-encoded
    /// and dictionary types, used by the projection tests.
    fn create_test_projection_schema() -> Schema {
        let list_data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));

        let fixed_size_list_data_type =
            DataType::FixedSizeList(Arc::new(Field::new_list_field(DataType::Int32, false)), 3);

        let union_fields = UnionFields::from_fields(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Float64, false),
        ]);

        let union_data_type = DataType::Union(union_fields, UnionMode::Dense);

        let struct_fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new_list("list", Field::new_list_field(DataType::Int8, true), false),
        ]);
        let struct_data_type = DataType::Struct(struct_fields);

        let run_encoded_data_type = DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", DataType::Int16, false)),
            Arc::new(Field::new("values", DataType::Int32, true)),
        );

        // Field positions matter: the projection tests index into this list.
        Schema::new(vec![
            Field::new("f0", DataType::UInt32, false),
            Field::new("f1", DataType::Utf8, false),
            Field::new("f2", DataType::Boolean, false),
            Field::new("f3", union_data_type, true),
            Field::new("f4", DataType::Null, true),
            Field::new("f5", DataType::Float64, true),
            Field::new("f6", list_data_type, false),
            Field::new("f7", DataType::FixedSizeBinary(3), true),
            Field::new("f8", fixed_size_list_data_type, false),
            Field::new("f9", struct_data_type, false),
            Field::new("f10", run_encoded_data_type, false),
            Field::new("f11", DataType::Boolean, false),
            Field::new_dictionary("f12", DataType::Int8, DataType::Utf8, false),
            Field::new("f13", DataType::Utf8, false),
        ])
    }
1953
    /// Builds a 3-row batch matching `create_test_projection_schema`, with one
    /// array per schema field (f0..f13).
    fn create_test_projection_batch_data(schema: &Schema) -> RecordBatch {
        let array0 = UInt32Array::from(vec![1, 2, 3]);
        let array1 = StringArray::from(vec!["foo", "bar", "baz"]);
        let array2 = BooleanArray::from(vec![true, false, true]);

        // f3: dense union with one int, one float, and one null float entry.
        let mut union_builder = UnionBuilder::new_dense();
        union_builder.append::<Int32Type>("a", 1).unwrap();
        union_builder.append::<Float64Type>("b", 10.1).unwrap();
        union_builder.append_null::<Float64Type>("b").unwrap();
        let array3 = union_builder.build().unwrap();

        let array4 = NullArray::new(3);
        let array5 = Float64Array::from(vec![Some(1.1), None, Some(3.3)]);
        let array6_values = vec![
            Some(vec![Some(10), Some(10), Some(10)]),
            Some(vec![Some(20), Some(20), Some(20)]),
            Some(vec![Some(30), Some(30)]),
        ];
        let array6 = ListArray::from_iter_primitive::<Int32Type, _, _>(array6_values);
        let array7_values = vec![vec![11, 12, 13], vec![22, 23, 24], vec![33, 34, 35]];
        let array7 = FixedSizeBinaryArray::try_from_iter(array7_values.into_iter()).unwrap();

        // f8: fixed-size list of 3, built from 9 child values.
        let array8_values = ArrayData::builder(DataType::Int32)
            .len(9)
            .add_buffer(Buffer::from_slice_ref([40, 41, 42, 43, 44, 45, 46, 47, 48]))
            .build()
            .unwrap();
        let array8_data = ArrayData::builder(schema.field(8).data_type().clone())
            .len(3)
            .add_child_data(array8_values)
            .build()
            .unwrap();
        let array8 = FixedSizeListArray::from(array8_data);

        // f9: struct with an id column and a list-of-int8 column.
        let array9_id: ArrayRef = Arc::new(Int32Array::from(vec![1001, 1002, 1003]));
        let array9_list: ArrayRef =
            Arc::new(ListArray::from_iter_primitive::<Int8Type, _, _>(vec![
                Some(vec![Some(-10)]),
                Some(vec![Some(-20), Some(-20), Some(-20)]),
                Some(vec![Some(-30)]),
            ]));
        let array9 = ArrayDataBuilder::new(schema.field(9).data_type().clone())
            .add_child_data(array9_id.into_data())
            .add_child_data(array9_list.into_data())
            .len(3)
            .build()
            .unwrap();
        let array9 = StructArray::from(array9);

        // f10: run-end encoded int column with nulls.
        let array10_input = vec![Some(1_i32), None, None];
        let mut array10_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
        array10_builder.extend(array10_input);
        let array10 = array10_builder.finish();

        let array11 = BooleanArray::from(vec![false, false, true]);

        // f12: dictionary-encoded strings.
        let array12_values = StringArray::from(vec!["x", "yy", "zzz"]);
        let array12_keys = Int8Array::from_iter_values([1, 1, 2]);
        let array12 = DictionaryArray::new(array12_keys, Arc::new(array12_values));

        let array13 = StringArray::from(vec!["a", "bb", "ccc"]);

        RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(array0),
                Arc::new(array1),
                Arc::new(array2),
                Arc::new(array3),
                Arc::new(array4),
                Arc::new(array5),
                Arc::new(array6),
                Arc::new(array7),
                Arc::new(array8),
                Arc::new(array9),
                Arc::new(array10),
                Arc::new(array11),
                Arc::new(array12),
                Arc::new(array13),
            ],
        )
        .unwrap()
    }
2039
2040 #[test]
2041 fn test_negative_meta_len_start_stream() {
2042 let bytes = i32::to_le_bytes(-1);
2043 let mut buf = vec![];
2044 buf.extend(CONTINUATION_MARKER);
2045 buf.extend(bytes);
2046
2047 let reader_err = StreamReader::try_new(Cursor::new(buf), None).err();
2048 assert!(reader_err.is_some());
2049 assert_eq!(
2050 reader_err.unwrap().to_string(),
2051 "Parser error: Invalid metadata length: -1"
2052 );
2053 }
2054
    #[test]
    fn test_negative_meta_len_mid_stream() {
        // Write one valid batch, then append a continuation marker followed by
        // a negative metadata length; the error should surface only when the
        // reader reaches the corrupted message.
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let mut buf = Vec::new();
        {
            let mut writer = crate::writer::StreamWriter::try_new(&mut buf, &schema).unwrap();
            let batch =
                RecordBatch::try_new(Arc::new(schema), vec![Arc::new(Int32Array::from(vec![1]))])
                    .unwrap();
            writer.write(&batch).unwrap();
        }

        let bytes = i32::to_le_bytes(-1);
        buf.extend(CONTINUATION_MARKER);
        buf.extend(bytes);

        let mut reader = StreamReader::try_new(Cursor::new(buf), None).unwrap();
        // First read: the valid batch decodes fine.
        assert!(reader.maybe_next().is_ok());
        // Second read: hits the corrupted length prefix.
        let batch_err = reader.maybe_next().err();
        assert!(batch_err.is_some());
        assert_eq!(
            batch_err.unwrap().to_string(),
            "Parser error: Invalid metadata length: -1"
        );
    }
2082
    #[test]
    fn test_missing_buffer_metadata_error() {
        use crate::r#gen::Message::*;
        use flatbuffers::FlatBufferBuilder;

        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, true)]));

        // Hand-build RecordBatch metadata that advertises only one buffer,
        // while a nullable Int32 column needs two (validity + values).
        let mut fbb = FlatBufferBuilder::new();
        let nodes = fbb.create_vector(&[FieldNode::new(2, 0)]);
        let buffers = fbb.create_vector(&[crate::Buffer::new(0, 8)]);
        let batch_offset = RecordBatch::create(
            &mut fbb,
            &RecordBatchArgs {
                length: 2,
                nodes: Some(nodes),
                buffers: Some(buffers),
                compression: None,
                variadicBufferCounts: None,
            },
        );
        fbb.finish_minimal(batch_offset);
        let batch_bytes = fbb.finished_data().to_vec();
        let batch = flatbuffers::root::<RecordBatch>(&batch_bytes).unwrap();

        let data_buffer = Buffer::from(vec![0u8; 8]);
        let dictionaries: HashMap<i64, ArrayRef> = HashMap::new();
        let metadata = MetadataVersion::V5;

        let decoder = RecordBatchDecoder::try_new(
            &data_buffer,
            batch,
            schema.clone(),
            &dictionaries,
            &metadata,
        )
        .unwrap();

        let result = decoder.read_record_batch();

        // Decoding must fail with a clear error rather than panic.
        match result {
            Err(ArrowError::IpcError(msg)) => {
                assert_eq!(msg, "Buffer count mismatched with metadata");
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }
2131
    #[test]
    fn test_read_legacy_empty_list_without_offsets_buffer() {
        use crate::r#gen::Message::*;
        use flatbuffers::FlatBufferBuilder;

        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "items",
            Field::new_list_field(DataType::Int32, true),
            true,
        )]));

        // Hand-build a zero-row batch where every buffer (including the list
        // offsets) is zero-length, as some legacy writers produce.
        let mut fbb = FlatBufferBuilder::new();
        let nodes = fbb.create_vector(&[
            FieldNode::new(0, 0), // list node
            FieldNode::new(0, 0), // child values node
        ]);
        let buffers = fbb.create_vector(&[
            crate::Buffer::new(0, 0), // list validity
            crate::Buffer::new(0, 0), // list offsets (empty — the case under test)
            crate::Buffer::new(0, 0), // child validity
            crate::Buffer::new(0, 0), // child values
        ]);
        let batch_offset = RecordBatch::create(
            &mut fbb,
            &RecordBatchArgs {
                length: 0,
                nodes: Some(nodes),
                buffers: Some(buffers),
                compression: None,
                variadicBufferCounts: None,
            },
        );
        fbb.finish_minimal(batch_offset);
        let batch_bytes = fbb.finished_data().to_vec();
        let batch = flatbuffers::root::<RecordBatch>(&batch_bytes).unwrap();

        let body = Buffer::from(Vec::<u8>::new());
        let dictionaries: HashMap<i64, ArrayRef> = HashMap::new();
        let metadata = MetadataVersion::V5;

        let decoder =
            RecordBatchDecoder::try_new(&body, batch, schema.clone(), &dictionaries, &metadata)
                .unwrap();

        // The empty list must decode cleanly despite the missing offsets data.
        let read_batch = decoder.read_record_batch().unwrap();
        assert_eq!(read_batch.num_rows(), 0);

        let list = read_batch
            .column(0)
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        assert_eq!(list.len(), 0);
        assert_eq!(list.values().len(), 0);
    }
2190
    #[test]
    fn test_read_legacy_empty_utf8_and_binary_without_offsets_buffer() {
        use crate::r#gen::Message::*;
        use flatbuffers::FlatBufferBuilder;

        let schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, true),
            Field::new("payload", DataType::Binary, true),
        ]));

        // Zero-row batch with all-empty buffers (including offsets), as some
        // legacy writers produce; three buffers per variable-length column.
        let mut fbb = FlatBufferBuilder::new();
        let nodes = fbb.create_vector(&[
            FieldNode::new(0, 0), // utf8 node
            FieldNode::new(0, 0), // binary node
        ]);
        let buffers = fbb.create_vector(&[
            crate::Buffer::new(0, 0), // utf8 validity
            crate::Buffer::new(0, 0), // utf8 offsets (empty — the case under test)
            crate::Buffer::new(0, 0), // utf8 values
            crate::Buffer::new(0, 0), // binary validity
            crate::Buffer::new(0, 0), // binary offsets (empty — the case under test)
            crate::Buffer::new(0, 0), // binary values
        ]);
        let batch_offset = RecordBatch::create(
            &mut fbb,
            &RecordBatchArgs {
                length: 0,
                nodes: Some(nodes),
                buffers: Some(buffers),
                compression: None,
                variadicBufferCounts: None,
            },
        );
        fbb.finish_minimal(batch_offset);
        let batch_bytes = fbb.finished_data().to_vec();
        let batch = flatbuffers::root::<RecordBatch>(&batch_bytes).unwrap();

        let body = Buffer::from(Vec::<u8>::new());
        let dictionaries: HashMap<i64, ArrayRef> = HashMap::new();
        let metadata = MetadataVersion::V5;

        let decoder =
            RecordBatchDecoder::try_new(&body, batch, schema.clone(), &dictionaries, &metadata)
                .unwrap();

        let read_batch = decoder.read_record_batch().unwrap();
        assert_eq!(read_batch.num_rows(), 0);

        // Each column should be normalized to a single zero offset.
        let utf8 = read_batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(utf8.len(), 0);
        assert_eq!(utf8.value_offsets(), [0]);

        let binary = read_batch
            .column(1)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        assert_eq!(binary.len(), 0);
        assert_eq!(binary.value_offsets(), [0]);
    }
2258
    #[test]
    fn test_projection_array_values() {
        // Schema with a wide variety of nested/encoded column types.
        let schema = create_test_projection_schema();

        let batch = create_test_projection_batch_data(&schema);

        // Write the batch to an in-memory IPC file.
        let mut buf = Vec::new();
        {
            let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }

        // Read back each single-column projection and compare to the source.
        // NOTE(review): only indices 0..12 are covered, skipping the trailing
        // dictionary (f12) and utf8 (f13) columns — confirm this is intentional.
        for index in 0..12 {
            let projection = vec![index];
            let reader = FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(projection));
            let read_batch = reader.unwrap().next().unwrap().unwrap();
            let projected_column = read_batch.column(0);
            let expected_column = batch.column(index);

            assert_eq!(projected_column.as_ref(), expected_column.as_ref());
        }

        {
            // A multi-column, out-of-order projection.
            let reader =
                FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(vec![3, 2, 1]));
            let read_batch = reader.unwrap().next().unwrap().unwrap();
            let expected_batch = batch.project(&[3, 2, 1]).unwrap();
            assert_eq!(read_batch, expected_batch);
        }
    }
2296
    #[test]
    fn test_arrow_single_float_row() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Float32, false),
            Field::new("b", DataType::Float32, false),
            Field::new("c", DataType::Int32, false),
            Field::new("d", DataType::Int32, false),
        ]);
        let arrays = vec![
            Arc::new(Float32Array::from(vec![1.23])) as ArrayRef,
            Arc::new(Float32Array::from(vec![-6.50])) as ArrayRef,
            Arc::new(Int32Array::from(vec![2])) as ArrayRef,
            Arc::new(Int32Array::from(vec![1])) as ArrayRef,
        ];
        let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays).unwrap();
        // Write a one-row stream to a real temp file, then read it back twice.
        let mut file = tempfile::tempfile().unwrap();
        let mut stream_writer = crate::writer::StreamWriter::try_new(&mut file, &schema).unwrap();
        stream_writer.write(&batch).unwrap();
        stream_writer.finish().unwrap();

        drop(stream_writer);

        file.rewind().unwrap();

        // First pass: no projection; the float values must round-trip non-zero.
        let reader = StreamReader::try_new(&mut file, None).unwrap();

        reader.for_each(|batch| {
            let batch = batch.unwrap();
            assert!(
                batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Float32Array>()
                    .unwrap()
                    .value(0)
                    != 0.0
            );
            assert!(
                batch
                    .column(1)
                    .as_any()
                    .downcast_ref::<Float32Array>()
                    .unwrap()
                    .value(0)
                    != 0.0
            );
        });

        file.rewind().unwrap();

        // Second pass: project columns 0 and 3 and check the projected schema.
        let reader = StreamReader::try_new(file, Some(vec![0, 3])).unwrap();

        reader.for_each(|batch| {
            let batch = batch.unwrap();
            assert_eq!(batch.schema().fields().len(), 2);
            assert_eq!(batch.schema().fields()[0].data_type(), &DataType::Float32);
            assert_eq!(batch.schema().fields()[1].data_type(), &DataType::Int32);
        });
    }
2359
2360 fn write_ipc(rb: &RecordBatch) -> Vec<u8> {
2362 let mut buf = Vec::new();
2363 let mut writer = crate::writer::FileWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
2364 writer.write(rb).unwrap();
2365 writer.finish().unwrap();
2366 buf
2367 }
2368
2369 fn read_ipc(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2371 let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None)?;
2372 reader.next().unwrap()
2373 }
2374
2375 fn read_ipc_skip_validation(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2378 let mut reader = unsafe {
2379 FileReader::try_new(std::io::Cursor::new(buf), None)?.with_skip_validation(true)
2380 };
2381 reader.next().unwrap()
2382 }
2383
2384 fn roundtrip_ipc(rb: &RecordBatch) -> RecordBatch {
2385 let buf = write_ipc(rb);
2386 read_ipc(&buf).unwrap()
2387 }
2388
2389 fn read_ipc_with_decoder(buf: Vec<u8>) -> Result<RecordBatch, ArrowError> {
2392 read_ipc_with_decoder_inner(buf, false)
2393 }
2394
2395 fn read_ipc_with_decoder_skip_validation(buf: Vec<u8>) -> Result<RecordBatch, ArrowError> {
2398 read_ipc_with_decoder_inner(buf, true)
2399 }
2400
    /// Decodes a single-batch IPC file manually: parse the footer, replay any
    /// dictionaries, then decode the one record batch.
    fn read_ipc_with_decoder_inner(
        buf: Vec<u8>,
        skip_validation: bool,
    ) -> Result<RecordBatch, ArrowError> {
        let buffer = Buffer::from_vec(buf);
        // The file ends with: footer, 4-byte footer length, 6-byte magic.
        let trailer_start = buffer.len() - 10;
        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap())?;
        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start])
            .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid footer: {e}")))?;

        let schema = fb_to_schema(footer.schema().unwrap());

        let mut decoder = unsafe {
            FileDecoder::new(Arc::new(schema), footer.version())
                .with_skip_validation(skip_validation)
        };
        // Dictionaries must be loaded before the batches that reference them.
        for block in footer.dictionaries().iter().flatten() {
            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
            let data = buffer.slice_with_length(block.offset() as _, block_len);
            decoder.read_dictionary(block, &data)?
        }

        let batches = footer.recordBatches().unwrap();
        // This helper only supports files containing exactly one batch.
        assert_eq!(batches.len(), 1);

        let block = batches.get(0);
        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
        let data = buffer.slice_with_length(block.offset() as _, block_len);
        Ok(decoder.read_record_batch(block, &data)?.unwrap())
    }
2433
2434 fn write_stream(rb: &RecordBatch) -> Vec<u8> {
2436 let mut buf = Vec::new();
2437 let mut writer = crate::writer::StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
2438 writer.write(rb).unwrap();
2439 writer.finish().unwrap();
2440 buf
2441 }
2442
2443 fn read_stream(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2445 let mut reader = StreamReader::try_new(std::io::Cursor::new(buf), None)?;
2446 reader.next().unwrap()
2447 }
2448
2449 fn read_stream_skip_validation(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2452 let mut reader = unsafe {
2453 StreamReader::try_new(std::io::Cursor::new(buf), None)?.with_skip_validation(true)
2454 };
2455 reader.next().unwrap()
2456 }
2457
2458 fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
2459 let buf = write_stream(rb);
2460 read_stream(&buf).unwrap()
2461 }
2462
2463 #[test]
2464 fn test_roundtrip_with_custom_metadata() {
2465 let schema = Schema::new(vec![Field::new("dummy", DataType::Float64, false)]);
2466 let mut buf = Vec::new();
2467 let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
2468 let mut test_metadata = HashMap::new();
2469 test_metadata.insert("abc".to_string(), "abc".to_string());
2470 test_metadata.insert("def".to_string(), "def".to_string());
2471 for (k, v) in &test_metadata {
2472 writer.write_metadata(k, v);
2473 }
2474 writer.finish().unwrap();
2475 drop(writer);
2476
2477 let reader = crate::reader::FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
2478 assert_eq!(reader.custom_metadata(), &test_metadata);
2479 }
2480
2481 #[test]
2482 fn test_roundtrip_nested_dict() {
2483 let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2484
2485 let array = Arc::new(inner) as ArrayRef;
2486
2487 let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));
2488
2489 let s = StructArray::from(vec![(dctfield, array)]);
2490 let struct_array = Arc::new(s) as ArrayRef;
2491
2492 let schema = Arc::new(Schema::new(vec![Field::new(
2493 "struct",
2494 struct_array.data_type().clone(),
2495 false,
2496 )]));
2497
2498 let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2499
2500 assert_eq!(batch, roundtrip_ipc(&batch));
2501 }
2502
    #[test]
    fn test_roundtrip_nested_dict_no_preserve_dict_id() {
        // Same nested-dictionary shape as test_roundtrip_nested_dict, but
        // written with explicit default IpcWriteOptions.
        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

        let array = Arc::new(inner) as ArrayRef;

        let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));

        let s = StructArray::from(vec![(dctfield, array)]);
        let struct_array = Arc::new(s) as ArrayRef;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "struct",
            struct_array.data_type().clone(),
            false,
        )]));

        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();

        let mut buf = Vec::new();
        let mut writer = crate::writer::FileWriter::try_new_with_options(
            &mut buf,
            batch.schema_ref(),
            IpcWriteOptions::default(),
        )
        .unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        drop(writer);

        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();

        assert_eq!(batch, reader.next().unwrap().unwrap());
    }
2537
    /// Appends a fixed sequence of values (including a null) to `builder`,
    /// round-trips the resulting union column through IPC, and asserts the
    /// schema, shape and data are preserved. Shared by the dense and sparse
    /// union tests.
    fn check_union_with_builder(mut builder: UnionBuilder) {
        builder.append::<Int32Type>("a", 1).unwrap();
        builder.append_null::<Int32Type>("a").unwrap();
        builder.append::<Float64Type>("c", 3.0).unwrap();
        builder.append::<Int32Type>("a", 4).unwrap();
        builder.append::<Int64Type>("d", 11).unwrap();
        let union = builder.build().unwrap();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "union",
            union.data_type().clone(),
            false,
        )]));

        let union_array = Arc::new(union) as ArrayRef;

        let rb = RecordBatch::try_new(schema, vec![union_array]).unwrap();
        let rb2 = roundtrip_ipc(&rb);
        assert_eq!(rb.schema(), rb2.schema());
        assert_eq!(rb.num_columns(), rb2.num_columns());
        assert_eq!(rb.num_rows(), rb2.num_rows());
        let union1 = rb.column(0);
        let union2 = rb2.column(0);

        assert_eq!(union1, union2);
    }
2566
    #[test]
    fn test_roundtrip_dense_union() {
        // Dense union layout round-trips through IPC.
        check_union_with_builder(UnionBuilder::new_dense());
    }
2571
    #[test]
    fn test_roundtrip_sparse_union() {
        // Sparse union layout round-trips through IPC.
        check_union_with_builder(UnionBuilder::new_sparse());
    }
2576
2577 #[test]
2578 fn test_roundtrip_struct_empty_fields() {
2579 let nulls = NullBuffer::from(&[true, true, false]);
2580 let rb = RecordBatch::try_from_iter([(
2581 "",
2582 Arc::new(StructArray::new_empty_fields(nulls.len(), Some(nulls))) as _,
2583 )])
2584 .unwrap();
2585 let rb2 = roundtrip_ipc(&rb);
2586 assert_eq!(rb, rb2);
2587 }
2588
    #[test]
    fn test_roundtrip_stream_run_array_sliced() {
        // Column 0: a sliced run-end-encoded array; column 1: an unsliced one.
        let run_array_1: Int32RunArray = vec!["a", "a", "a", "b", "b", "c", "c", "c"]
            .into_iter()
            .collect();
        let run_array_1_sliced = run_array_1.slice(2, 5);

        let run_array_2_inupt = vec![Some(1_i32), None, None, Some(2), Some(2)];
        let mut run_array_2_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
        run_array_2_builder.extend(run_array_2_inupt);
        let run_array_2 = run_array_2_builder.finish();

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "run_array_1_sliced",
                run_array_1_sliced.data_type().clone(),
                false,
            ),
            Field::new("run_array_2", run_array_2.data_type().clone(), false),
        ]));
        let input_batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(run_array_1_sliced.clone()), Arc::new(run_array_2)],
        )
        .unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);

        // The unsliced column should come back identical.
        assert_eq!(input_batch.column(1), output_batch.column(1));

        // The sliced column is materialized ("unsliced") by the writer, so
        // compare against the unsliced form of the input.
        let run_array_1_unsliced = unslice_run_array(run_array_1_sliced.into_data()).unwrap();
        assert_eq!(run_array_1_unsliced, output_batch.column(0).into_data());
    }
2624
    #[test]
    fn test_roundtrip_stream_nested_dict() {
        // A struct column containing both a plain string child and a
        // dictionary-encoded child must round-trip through the stream format.
        let xs = vec!["AA", "BB", "AA", "CC", "BB"];
        let dict = Arc::new(
            xs.clone()
                .into_iter()
                .collect::<DictionaryArray<Int8Type>>(),
        );
        let string_array: ArrayRef = Arc::new(StringArray::from(xs.clone()));
        let struct_array = StructArray::from(vec![
            (
                Arc::new(Field::new("f2.1", DataType::Utf8, false)),
                string_array,
            ),
            (
                Arc::new(Field::new("f2.2_struct", dict.data_type().clone(), false)),
                dict.clone() as ArrayRef,
            ),
        ]);
        let schema = Arc::new(Schema::new(vec![
            Field::new("f1_string", DataType::Utf8, false),
            Field::new("f2_struct", struct_array.data_type().clone(), false),
        ]));
        let input_batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(xs.clone())),
                Arc::new(struct_array),
            ],
        )
        .unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }
2659
    /// Round-trips (via the IPC stream format) a dictionary-encoded column
    /// whose values are maps, where both the map keys and the map values are
    /// themselves dictionary-encoded strings.
    #[test]
    fn test_roundtrip_stream_nested_dict_of_map_of_dict() {
        let values = StringArray::from(vec![Some("a"), None, Some("b"), Some("c")]);
        let values = Arc::new(values) as ArrayRef;
        let value_dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 3, 1]);
        let value_dict_array = DictionaryArray::new(value_dict_keys, values.clone());

        let key_dict_keys = Int8Array::from_iter_values([0, 0, 2, 1, 1, 3]);
        let key_dict_array = DictionaryArray::new(key_dict_keys, values);

        // Explicit dictionary ids (1 and 2) are deprecated but exercised here
        // deliberately to cover the legacy code path.
        #[allow(deprecated)]
        let keys_field = Arc::new(Field::new_dict(
            "keys",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
            true,
            1,
            false,
        ));
        #[allow(deprecated)]
        let values_field = Arc::new(Field::new_dict(
            "values",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
            true,
            2,
            false,
        ));
        let entry_struct = StructArray::from(vec![
            (keys_field, make_array(key_dict_array.into_data())),
            (values_field, make_array(value_dict_array.into_data())),
        ]);
        let map_data_type = DataType::Map(
            Arc::new(Field::new(
                "entries",
                entry_struct.data_type().clone(),
                false,
            )),
            false,
        );

        // Three map entries covering entry ranges [0,2), [2,4), [4,6).
        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 6]);
        let map_data = ArrayData::builder(map_data_type)
            .len(3)
            .add_buffer(entry_offsets)
            .add_child_data(entry_struct.into_data())
            .build()
            .unwrap();
        let map_array = MapArray::from(map_data);

        // Outermost layer: dictionary-encode the map column itself.
        let dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 2, 1]);
        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }
2720
    /// Shared body for the list-of-dict round-trip tests below: builds a
    /// `DictionaryArray` whose values are a (Large)List of dictionary-encoded
    /// strings, streams it through IPC, and checks it reads back equal.
    ///
    /// `list_data_type` supplies the concrete List/LargeList type and
    /// `offsets` the matching i32/i64 offset values (4 lists => 5 offsets).
    fn test_roundtrip_stream_dict_of_list_of_dict_impl<
        OffsetSize: OffsetSizeTrait,
        U: ArrowNativeType,
    >(
        list_data_type: DataType,
        offsets: &[U; 5],
    ) {
        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));
        let dict_data = dict_array.to_data();

        let value_offsets = Buffer::from_slice_ref(offsets);

        let list_data = ArrayData::builder(list_data_type)
            .len(4)
            .add_buffer(value_offsets)
            .add_child_data(dict_data)
            .build()
            .unwrap();
        let list_array = GenericListArray::<OffsetSize>::from(list_data);

        // Outermost layer: dictionary-encode the list column itself.
        let keys_for_dict = Int8Array::from_iter_values([0, 3, 0, 1, 1, 2, 0, 1, 3]);
        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }
2755
2756 #[test]
2757 fn test_roundtrip_stream_dict_of_list_of_dict() {
2758 #[allow(deprecated)]
2760 let list_data_type = DataType::List(Arc::new(Field::new_dict(
2761 "item",
2762 DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2763 true,
2764 1,
2765 false,
2766 )));
2767 let offsets: &[i32; 5] = &[0, 2, 4, 4, 6];
2768 test_roundtrip_stream_dict_of_list_of_dict_impl::<i32, i32>(list_data_type, offsets);
2769
2770 #[allow(deprecated)]
2772 let list_data_type = DataType::LargeList(Arc::new(Field::new_dict(
2773 "item",
2774 DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2775 true,
2776 1,
2777 false,
2778 )));
2779 let offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
2780 test_roundtrip_stream_dict_of_list_of_dict_impl::<i64, i64>(list_data_type, offsets);
2781 }
2782
    /// Round-trips a dictionary column whose values are a `FixedSizeList`
    /// (width 3) of dictionary-encoded strings, via the IPC stream format.
    #[test]
    fn test_roundtrip_stream_dict_of_fixed_size_list_of_dict() {
        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3, 1, 2]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));
        let dict_data = dict_array.into_data();

        #[allow(deprecated)]
        let list_data_type = DataType::FixedSizeList(
            Arc::new(Field::new_dict(
                "item",
                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
                true,
                1,
                false,
            )),
            3,
        );
        // 9 child values / width 3 => 3 lists; FixedSizeList needs no offsets.
        let list_data = ArrayData::builder(list_data_type)
            .len(3)
            .add_child_data(dict_data)
            .build()
            .unwrap();
        let list_array = FixedSizeListArray::from(list_data);

        // Outermost layer: dictionary-encode the fixed-size-list column.
        let keys_for_dict = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }
2820
    /// Longer than 12 bytes, so view arrays cannot inline it into the view
    /// struct and must reference an out-of-line data buffer.
    const LONG_TEST_STRING: &str =
        "This is a long string to make sure binary view array handles it";
2823
2824 #[test]
2825 fn test_roundtrip_view_types() {
2826 let schema = Schema::new(vec![
2827 Field::new("field_1", DataType::BinaryView, true),
2828 Field::new("field_2", DataType::Utf8, true),
2829 Field::new("field_3", DataType::Utf8View, true),
2830 ]);
2831 let bin_values: Vec<Option<&[u8]>> = vec![
2832 Some(b"foo"),
2833 None,
2834 Some(b"bar"),
2835 Some(LONG_TEST_STRING.as_bytes()),
2836 ];
2837 let utf8_values: Vec<Option<&str>> =
2838 vec![Some("foo"), None, Some("bar"), Some(LONG_TEST_STRING)];
2839 let bin_view_array = BinaryViewArray::from_iter(bin_values);
2840 let utf8_array = StringArray::from_iter(utf8_values.iter());
2841 let utf8_view_array = StringViewArray::from_iter(utf8_values);
2842 let record_batch = RecordBatch::try_new(
2843 Arc::new(schema.clone()),
2844 vec![
2845 Arc::new(bin_view_array),
2846 Arc::new(utf8_array),
2847 Arc::new(utf8_view_array),
2848 ],
2849 )
2850 .unwrap();
2851
2852 assert_eq!(record_batch, roundtrip_ipc(&record_batch));
2853 assert_eq!(record_batch, roundtrip_ipc_stream(&record_batch));
2854
2855 let sliced_batch = record_batch.slice(1, 2);
2856 assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
2857 assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
2858 }
2859
    /// Round-trips view types (`Utf8View`/`BinaryView`) nested as dictionary
    /// values inside a map inside an outer dictionary, through both IPC
    /// formats, including a sliced batch.
    #[test]
    fn test_roundtrip_view_types_nested_dict() {
        let bin_values: Vec<Option<&[u8]>> = vec![
            Some(b"foo"),
            None,
            Some(b"bar"),
            Some(LONG_TEST_STRING.as_bytes()),
            Some(b"field"),
        ];
        let utf8_values: Vec<Option<&str>> = vec![
            Some("foo"),
            None,
            Some("bar"),
            Some(LONG_TEST_STRING),
            Some("field"),
        ];
        let bin_view_array = Arc::new(BinaryViewArray::from_iter(bin_values));
        let utf8_view_array = Arc::new(StringViewArray::from_iter(utf8_values));

        // Map keys: dictionary of Utf8View values.
        let key_dict_keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
        let key_dict_array = DictionaryArray::new(key_dict_keys, utf8_view_array.clone());
        #[allow(deprecated)]
        let keys_field = Arc::new(Field::new_dict(
            "keys",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8View)),
            true,
            1,
            false,
        ));

        // Map values: dictionary of BinaryView values.
        let value_dict_keys = Int8Array::from_iter_values([0, 3, 0, 1, 2, 0, 1]);
        let value_dict_array = DictionaryArray::new(value_dict_keys, bin_view_array);
        #[allow(deprecated)]
        let values_field = Arc::new(Field::new_dict(
            "values",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::BinaryView)),
            true,
            2,
            false,
        ));
        let entry_struct = StructArray::from(vec![
            (keys_field, make_array(key_dict_array.into_data())),
            (values_field, make_array(value_dict_array.into_data())),
        ]);

        let map_data_type = DataType::Map(
            Arc::new(Field::new(
                "entries",
                entry_struct.data_type().clone(),
                false,
            )),
            false,
        );
        // Three map entries covering entry ranges [0,2), [2,4), [4,7).
        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 7]);
        let map_data = ArrayData::builder(map_data_type)
            .len(3)
            .add_buffer(entry_offsets)
            .add_child_data(entry_struct.into_data())
            .build()
            .unwrap();
        let map_array = MapArray::from(map_data);

        // Outermost layer: dictionary-encode the map column itself.
        let dict_keys = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));
        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        assert_eq!(batch, roundtrip_ipc(&batch));
        assert_eq!(batch, roundtrip_ipc_stream(&batch));

        // Slicing introduces a non-zero offset; must still round-trip.
        let sliced_batch = batch.slice(1, 2);
        assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
        assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
    }
2937
2938 #[test]
2939 fn test_no_columns_batch() {
2940 let schema = Arc::new(Schema::empty());
2941 let options = RecordBatchOptions::new()
2942 .with_match_field_names(true)
2943 .with_row_count(Some(10));
2944 let input_batch = RecordBatch::try_new_with_options(schema, vec![], &options).unwrap();
2945 let output_batch = roundtrip_ipc_stream(&input_batch);
2946 assert_eq!(input_batch, output_batch);
2947 }
2948
    /// Decoding a record batch whose body buffer is deliberately offset from
    /// 8-byte alignment must still succeed when `require_alignment` is false.
    #[test]
    fn test_unaligned() {
        let batch = RecordBatch::try_from_iter(vec![(
            "i32",
            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
        )])
        .unwrap();

        let r#gen = IpcDataGenerator {};
        let mut dict_tracker = DictionaryTracker::new(false);
        let (_, encoded) = r#gen
            .encode(
                &batch,
                &mut dict_tracker,
                &Default::default(),
                &mut Default::default(),
            )
            .unwrap();

        let message = root_as_message(&encoded.ipc_message).unwrap();

        // Prepend one byte then slice it off: the resulting buffer's data
        // pointer is shifted by 1 and therefore misaligned.
        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
        buffer.push(0_u8);
        buffer.extend_from_slice(&encoded.arrow_data);
        let b = Buffer::from(buffer).slice(1);
        assert_ne!(b.as_ptr().align_offset(8), 0);

        let ipc_batch = message.header_as_record_batch().unwrap();
        let roundtrip = RecordBatchDecoder::try_new(
            &b,
            ipc_batch,
            batch.schema(),
            &Default::default(),
            &message.version(),
        )
        .unwrap()
        .with_require_alignment(false)
        .read_record_batch()
        .unwrap();
        assert_eq!(batch, roundtrip);
    }
2991
    /// With `require_alignment(true)`, decoding a misaligned body buffer must
    /// fail with a descriptive error instead of copying to realign.
    #[test]
    fn test_unaligned_throws_error_with_require_alignment() {
        let batch = RecordBatch::try_from_iter(vec![(
            "i32",
            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
        )])
        .unwrap();

        let r#gen = IpcDataGenerator {};
        let mut dict_tracker = DictionaryTracker::new(false);
        let (_, encoded) = r#gen
            .encode(
                &batch,
                &mut dict_tracker,
                &Default::default(),
                &mut Default::default(),
            )
            .unwrap();

        let message = root_as_message(&encoded.ipc_message).unwrap();

        // Prepend one byte then slice it off to force a misaligned pointer.
        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
        buffer.push(0_u8);
        buffer.extend_from_slice(&encoded.arrow_data);
        let b = Buffer::from(buffer).slice(1);
        assert_ne!(b.as_ptr().align_offset(8), 0);

        let ipc_batch = message.header_as_record_batch().unwrap();
        let result = RecordBatchDecoder::try_new(
            &b,
            ipc_batch,
            batch.schema(),
            &Default::default(),
            &message.version(),
        )
        .unwrap()
        .with_require_alignment(true)
        .read_record_batch();

        let error = result.unwrap_err();
        assert_eq!(
            error.to_string(),
            "Invalid argument error: Misaligned buffers[0] in array of type Int32, \
             offset from expected alignment of 4 by 1"
        );
    }
3039
3040 #[test]
3041 fn test_file_with_massive_column_count() {
3042 let limit = 600_000;
3044
3045 let fields = (0..limit)
3046 .map(|i| Field::new(format!("{i}"), DataType::Boolean, false))
3047 .collect::<Vec<_>>();
3048 let schema = Arc::new(Schema::new(fields));
3049 let batch = RecordBatch::new_empty(schema);
3050
3051 let mut buf = Vec::new();
3052 let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
3053 writer.write(&batch).unwrap();
3054 writer.finish().unwrap();
3055 drop(writer);
3056
3057 let mut reader = FileReaderBuilder::new()
3058 .with_max_footer_fb_tables(1_500_000)
3059 .build(std::io::Cursor::new(buf))
3060 .unwrap();
3061 let roundtrip_batch = reader.next().unwrap().unwrap();
3062
3063 assert_eq!(batch, roundtrip_batch);
3064 }
3065
3066 #[test]
3067 fn test_file_with_deeply_nested_columns() {
3068 let limit = 61;
3070
3071 let fields = (0..limit).fold(
3072 vec![Field::new("leaf", DataType::Boolean, false)],
3073 |field, index| vec![Field::new_struct(format!("{index}"), field, false)],
3074 );
3075 let schema = Arc::new(Schema::new(fields));
3076 let batch = RecordBatch::new_empty(schema);
3077
3078 let mut buf = Vec::new();
3079 let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
3080 writer.write(&batch).unwrap();
3081 writer.finish().unwrap();
3082 drop(writer);
3083
3084 let mut reader = FileReaderBuilder::new()
3085 .with_max_footer_fb_depth(65)
3086 .build(std::io::Cursor::new(buf))
3087 .unwrap();
3088 let roundtrip_batch = reader.next().unwrap().unwrap();
3089
3090 assert_eq!(batch, roundtrip_batch);
3091 }
3092
    /// A struct whose children have mismatched lengths (4 vs 3) must fail IPC
    /// validation on read with a clear error message.
    #[test]
    fn test_invalid_struct_array_ipc_read_errors() {
        let a_field = Field::new("a", DataType::Int32, false);
        let b_field = Field::new("b", DataType::Int32, false);
        let struct_fields = Fields::from(vec![a_field.clone(), b_field.clone()]);

        let a_array_data = ArrayData::builder(a_field.data_type().clone())
            .len(4)
            .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
            .build()
            .unwrap();
        // Deliberately shorter than its sibling: 3 values instead of 4.
        let b_array_data = ArrayData::builder(b_field.data_type().clone())
            .len(3)
            .add_buffer(Buffer::from_slice_ref([5, 6, 7]))
            .build()
            .unwrap();

        // SAFETY-wise this is intentionally unsound test input: bypass
        // validation so the invalid struct can be written out.
        let invalid_struct_arr = unsafe {
            StructArray::new_unchecked(
                struct_fields,
                vec![make_array(a_array_data), make_array(b_array_data)],
                None,
            )
        };

        expect_ipc_validation_error(
            Arc::new(invalid_struct_arr),
            "Invalid argument error: Incorrect array length for StructArray field \"b\", expected 4 got 3",
        );
    }
3123
    /// Invalid data nested inside a struct (a Utf8 child containing non-UTF-8
    /// bytes) must be caught by IPC validation when the batch is read back.
    #[test]
    fn test_invalid_nested_array_ipc_read_errors() {
        let a_field = Field::new("a", DataType::Int32, false);
        let b_field = Field::new("b", DataType::Utf8, false);

        let schema = Arc::new(Schema::new(vec![Field::new_struct(
            "s",
            vec![a_field.clone(), b_field.clone()],
            false,
        )]));

        let a_array_data = ArrayData::builder(a_field.data_type().clone())
            .len(4)
            .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
            .build()
            .unwrap();
        // Build a "StringArray" whose last value contains invalid UTF-8 by
        // constructing valid binary data and reinterpreting it unchecked.
        let b_array_data = {
            let valid: &[u8] = b" ";
            let mut invalid = vec![];
            invalid.extend_from_slice(b"ValidString");
            invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
            let binary_array =
                BinaryArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
            let array = unsafe {
                StringArray::new_unchecked(
                    binary_array.offsets().clone(),
                    binary_array.values().clone(),
                    binary_array.nulls().cloned(),
                )
            };
            array.into_data()
        };
        let struct_data_type = schema.field(0).data_type();

        // Assemble the struct without validation so the bad child survives.
        let invalid_struct_arr = unsafe {
            make_array(
                ArrayData::builder(struct_data_type.clone())
                    .len(4)
                    .add_child_data(a_array_data)
                    .add_child_data(b_array_data)
                    .build_unchecked(),
            )
        };
        expect_ipc_validation_error(
            invalid_struct_arr,
            "Invalid argument error: Invalid UTF8 sequence at string index 3 (3..18): invalid utf-8 sequence of 1 bytes from index 11",
        );
    }
3174
    /// Two dictionary columns declared with the same explicit dictionary id
    /// must still round-trip through the stream writer/reader when dict-id
    /// preservation is not enabled (ids are reassigned by the tracker).
    #[test]
    fn test_same_dict_id_without_preserve() {
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(
                ["a", "b"]
                    .iter()
                    .map(|name| {
                        // Both fields deliberately share dict id 0.
                        #[allow(deprecated)]
                        Field::new_dict(
                            name.to_string(),
                            DataType::Dictionary(
                                Box::new(DataType::Int32),
                                Box::new(DataType::Utf8),
                            ),
                            true,
                            0,
                            false,
                        )
                    })
                    .collect::<Vec<Field>>(),
            )),
            vec![
                Arc::new(
                    vec![Some("c"), Some("d")]
                        .into_iter()
                        .collect::<DictionaryArray<Int32Type>>(),
                ) as ArrayRef,
                Arc::new(
                    vec![Some("e"), Some("f")]
                        .into_iter()
                        .collect::<DictionaryArray<Int32Type>>(),
                ) as ArrayRef,
            ],
        )
        .expect("Failed to create RecordBatch");

        // Write the batch to an in-memory IPC stream.
        let mut buf = vec![];
        {
            let mut writer = crate::writer::StreamWriter::try_new_with_options(
                &mut buf,
                batch.schema().as_ref(),
                crate::writer::IpcWriteOptions::default(),
            )
            .expect("Failed to create StreamWriter");
            writer.write(&batch).expect("Failed to write RecordBatch");
            writer.finish().expect("Failed to finish StreamWriter");
        }

        // Read it back and verify every decoded batch matches the input.
        StreamReader::try_new(std::io::Cursor::new(buf), None)
            .expect("Failed to create StreamReader")
            .for_each(|decoded_batch| {
                assert_eq!(decoded_batch.expect("Failed to read RecordBatch"), batch);
            });
    }
3230
    /// A list array whose offsets are not monotonically non-decreasing must
    /// fail IPC validation on read.
    #[test]
    fn test_validation_of_invalid_list_array() {
        // Bypass validation to construct an intentionally invalid array.
        let array = unsafe {
            let values = Int32Array::from(vec![1, 2, 3]);
            // Offsets decrease at the end (4 -> 2), violating the invariant.
            let bad_offsets = ScalarBuffer::<i32>::from(vec![0, 2, 4, 2]);
            let offsets = OffsetBuffer::new_unchecked(bad_offsets);
            let field = Field::new_list_field(DataType::Int32, true);
            let nulls = None;
            ListArray::new(Arc::new(field), offsets, Arc::new(values), nulls)
        };

        expect_ipc_validation_error(
            Arc::new(array),
            "Invalid argument error: Offset invariant failure: offset at position 2 out of bounds: 4 > 2",
        );
    }
3248
3249 #[test]
3250 fn test_validation_of_invalid_string_array() {
3251 let valid: &[u8] = b" ";
3252 let mut invalid = vec![];
3253 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3254 invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3255 let binary_array = BinaryArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
3256 let array = unsafe {
3259 StringArray::new_unchecked(
3260 binary_array.offsets().clone(),
3261 binary_array.values().clone(),
3262 binary_array.nulls().cloned(),
3263 )
3264 };
3265 expect_ipc_validation_error(
3266 Arc::new(array),
3267 "Invalid argument error: Invalid UTF8 sequence at string index 3 (3..45): invalid utf-8 sequence of 1 bytes from index 38",
3268 );
3269 }
3270
3271 #[test]
3272 fn test_validation_of_invalid_string_view_array() {
3273 let valid: &[u8] = b" ";
3274 let mut invalid = vec![];
3275 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3276 invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3277 let binary_view_array =
3278 BinaryViewArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
3279 let array = unsafe {
3282 StringViewArray::new_unchecked(
3283 binary_view_array.views().clone(),
3284 binary_view_array.data_buffers().to_vec(),
3285 binary_view_array.nulls().cloned(),
3286 )
3287 };
3288 expect_ipc_validation_error(
3289 Arc::new(array),
3290 "Invalid argument error: Encountered non-UTF-8 data at index 3: invalid utf-8 sequence of 1 bytes from index 38",
3291 );
3292 }
3293
3294 #[test]
3297 fn test_validation_of_invalid_dictionary_array() {
3298 let array = unsafe {
3299 let values = StringArray::from_iter_values(["a", "b", "c"]);
3300 let keys = Int32Array::from(vec![1, 200]); DictionaryArray::new_unchecked(keys, Arc::new(values))
3302 };
3303
3304 expect_ipc_validation_error(
3305 Arc::new(array),
3306 "Invalid argument error: Value at position 1 out of bounds: 200 (should be in [0, 2])",
3307 );
3308 }
3309
3310 #[test]
3311 fn test_validation_of_invalid_union_array() {
3312 let array = unsafe {
3313 let fields = UnionFields::try_new(
3314 vec![1, 3], vec![
3316 Field::new("a", DataType::Int32, false),
3317 Field::new("b", DataType::Utf8, false),
3318 ],
3319 )
3320 .unwrap();
3321 let type_ids = ScalarBuffer::from(vec![1i8, 2, 3]); let offsets = None;
3323 let children: Vec<ArrayRef> = vec![
3324 Arc::new(Int32Array::from(vec![10, 20, 30])),
3325 Arc::new(StringArray::from(vec![Some("a"), Some("b"), Some("c")])),
3326 ];
3327
3328 UnionArray::new_unchecked(fields, type_ids, offsets, children)
3329 };
3330
3331 expect_ipc_validation_error(
3332 Arc::new(array),
3333 "Invalid argument error: Type Ids values must match one of the field type ids",
3334 );
3335 }
3336
3337 const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3340
    /// Writes `array` through the stream, file, and decoder-based IPC paths
    /// and asserts that reading it back with validation enabled produces
    /// exactly `expected_err`, while skipping validation succeeds.
    fn expect_ipc_validation_error(array: ArrayRef, expected_err: &str) {
        let rb = RecordBatch::try_from_iter([("a", array)]).unwrap();

        // IPC stream format
        let buf = write_stream(&rb);
        read_stream_skip_validation(&buf).unwrap();
        let err = read_stream(&buf).unwrap_err();
        assert_eq!(err.to_string(), expected_err);

        // IPC file format
        let buf = write_ipc(&rb);
        read_ipc_skip_validation(&buf).unwrap();
        let err = read_ipc(&buf).unwrap_err();
        assert_eq!(err.to_string(), expected_err);

        // IPC file format via FileDecoder
        read_ipc_with_decoder_skip_validation(buf.clone()).unwrap();
        let err = read_ipc_with_decoder(buf).unwrap_err();
        assert_eq!(err.to_string(), expected_err);
    }
3362
3363 #[test]
3364 fn test_roundtrip_schema() {
3365 let schema = Schema::new(vec![
3366 Field::new(
3367 "a",
3368 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
3369 false,
3370 ),
3371 Field::new(
3372 "b",
3373 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
3374 false,
3375 ),
3376 ]);
3377
3378 let options = IpcWriteOptions::default();
3379 let data_gen = IpcDataGenerator::default();
3380 let mut dict_tracker = DictionaryTracker::new(false);
3381 let encoded_data =
3382 data_gen.schema_to_bytes_with_dictionary_tracker(&schema, &mut dict_tracker, &options);
3383 let mut schema_bytes = vec![];
3384 write_message(&mut schema_bytes, encoded_data, &options).expect("write_message");
3385
3386 let begin_offset: usize = if schema_bytes[0..4].eq(&CONTINUATION_MARKER) {
3387 4
3388 } else {
3389 0
3390 };
3391
3392 size_prefixed_root_as_message(&schema_bytes[begin_offset..])
3393 .expect_err("size_prefixed_root_as_message");
3394
3395 let msg = parse_message(&schema_bytes).expect("parse_message");
3396 let ipc_schema = msg.header_as_schema().expect("header_as_schema");
3397 let new_schema = fb_to_schema(ipc_schema);
3398
3399 assert_eq!(schema, new_schema);
3400 }
3401
3402 #[test]
3403 fn test_negative_meta_len() {
3404 let bytes = i32::to_le_bytes(-1);
3405 let mut buf = vec![];
3406 buf.extend(CONTINUATION_MARKER);
3407 buf.extend(bytes);
3408
3409 let reader = StreamReader::try_new(Cursor::new(buf), None);
3410 assert!(reader.is_err());
3411 }
3412
    /// An all-null dictionary column must be readable even when the stream
    /// contains no DictionaryBatch message at all (no key ever needs the
    /// values). The dictionary batch is stripped from a valid stream to
    /// simulate this.
    #[test]
    fn test_read_null_dict_without_dictionary_batch() {
        let keys = Int32Array::new_null(4);
        let values: ArrayRef = new_empty_array(&DataType::Utf8);
        let dict_array = DictionaryArray::new(keys, values);

        let schema = Arc::new(Schema::new(vec![Field::new(
            "d",
            dict_array.data_type().clone(),
            true,
        )]));
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(dict_array)]).unwrap();

        let full_stream = write_stream(&batch);

        // Re-assemble the stream message by message, dropping any
        // DictionaryBatch messages along the way.
        let mut stripped = Vec::new();
        let mut cursor = Cursor::new(&full_stream);
        loop {
            // Each message starts with an optional continuation marker
            // followed by a 4-byte little-endian metadata length.
            let mut header = [0u8; 4];
            if cursor.read_exact(&mut header).is_err() {
                break;
            }
            if header == CONTINUATION_MARKER && cursor.read_exact(&mut header).is_err() {
                break;
            }
            let meta_len = u32::from_le_bytes(header) as usize;
            if meta_len == 0 {
                // End-of-stream marker: emit it and stop.
                stripped.extend_from_slice(&CONTINUATION_MARKER);
                stripped.extend_from_slice(&0u32.to_le_bytes());
                break;
            }
            let mut meta_buf = vec![0u8; meta_len];
            cursor.read_exact(&mut meta_buf).unwrap();

            let message = root_as_message(&meta_buf).unwrap();
            let body_len = message.bodyLength() as usize;
            let mut body_buf = vec![0u8; body_len];
            cursor.read_exact(&mut body_buf).unwrap();

            if message.header_type() == crate::MessageHeader::DictionaryBatch {
                // Skip dictionary batches entirely.
                continue;
            }
            stripped.extend_from_slice(&CONTINUATION_MARKER);
            stripped.extend_from_slice(&(meta_len as u32).to_le_bytes());
            stripped.extend_from_slice(&meta_buf);
            stripped.extend_from_slice(&body_buf);
        }

        // The stripped stream must still decode to the all-null column.
        let result = read_stream(&stripped).unwrap();
        assert_eq!(result.num_rows(), 4);
        assert_eq!(result.num_columns(), 1);

        let col = result.column(0);
        assert_eq!(col.null_count(), 4);
        assert_eq!(col.len(), 4);
        assert!(matches!(col.data_type(), DataType::Dictionary(_, _)));
    }
3487
3488 #[test]
3491 fn test_projection_skip_list_view() {
3492 use crate::reader::FileReader;
3493 use crate::writer::FileWriter;
3494 use arrow_array::{
3495 GenericListViewArray, Int32Array, RecordBatch,
3496 builder::{GenericListViewBuilder, UInt32Builder},
3497 };
3498 use arrow_schema::{DataType, Field, Schema};
3499 use std::sync::Arc;
3500
3501 let mut builder = GenericListViewBuilder::<i32, _>::new(UInt32Builder::new());
3503
3504 builder.values().append_value(1);
3505 builder.values().append_value(2);
3506 builder.append(true);
3507
3508 builder.append(false);
3509
3510 builder.values().append_value(3);
3511 builder.values().append_value(4);
3512 builder.append(true);
3513
3514 let list_view: GenericListViewArray<i32> = builder.finish();
3515
3516 let values = Int32Array::from(vec![10, 20, 30]);
3518
3519 let schema = Arc::new(Schema::new(vec![
3521 Field::new("a", list_view.data_type().clone(), true),
3522 Field::new("b", DataType::Int32, false),
3523 ]));
3524 let batch =
3526 RecordBatch::try_new(schema, vec![Arc::new(list_view), Arc::new(values.clone())])
3527 .unwrap();
3528
3529 let mut buf = Vec::new();
3531 {
3532 let mut writer = FileWriter::try_new(&mut buf, &batch.schema()).unwrap();
3533 writer.write(&batch).unwrap();
3534 writer.finish().unwrap();
3535 }
3536
3537 let mut reader = FileReader::try_new(std::io::Cursor::new(buf), Some(vec![1])).unwrap();
3539 let read_batch = reader.next().unwrap().unwrap();
3540
3541 assert_eq!(read_batch.num_columns(), 1);
3543 assert_eq!(read_batch.column(0).as_ref(), &values);
3544 }
3545
3546 #[test]
3550 fn test_projection_skip_fixed_width_types() {
3551 use std::sync::Arc;
3552
3553 use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, make_array};
3554 use arrow_buffer::Buffer;
3555 use arrow_data::ArrayData;
3556 use arrow_schema::{DataType, Field, IntervalUnit, Schema, TimeUnit};
3557
3558 use crate::reader::FileReader;
3559 use crate::writer::FileWriter;
3560
3561 fn make_array_for_type(data_type: DataType) -> ArrayRef {
3563 let len = 3;
3564
3565 if matches!(data_type, DataType::Boolean) {
3566 return Arc::new(BooleanArray::from(vec![true, false, true]));
3567 }
3568
3569 let width = data_type.primitive_width().unwrap();
3570 let data = ArrayData::builder(data_type)
3571 .len(len)
3572 .add_buffer(Buffer::from(vec![0_u8; len * width]))
3573 .build()
3574 .unwrap();
3575
3576 make_array(data)
3577 }
3578
3579 let data_types = vec![
3581 DataType::Boolean,
3582 DataType::Int8,
3583 DataType::Int16,
3584 DataType::Int32,
3585 DataType::Int64,
3586 DataType::UInt8,
3587 DataType::UInt16,
3588 DataType::UInt32,
3589 DataType::UInt64,
3590 DataType::Float16,
3591 DataType::Float32,
3592 DataType::Float64,
3593 DataType::Timestamp(TimeUnit::Second, None),
3594 DataType::Date32,
3595 DataType::Date64,
3596 DataType::Time32(TimeUnit::Second),
3597 DataType::Time64(TimeUnit::Microsecond),
3598 DataType::Duration(TimeUnit::Second),
3599 DataType::Interval(IntervalUnit::YearMonth),
3600 DataType::Interval(IntervalUnit::DayTime),
3601 DataType::Interval(IntervalUnit::MonthDayNano),
3602 DataType::Decimal32(9, 2),
3603 DataType::Decimal64(18, 2),
3604 DataType::Decimal128(38, 2),
3605 DataType::Decimal256(76, 2),
3606 ];
3607
3608 for data_type in data_types {
3613 let skipped = make_array_for_type(data_type.clone());
3614 let values = Int32Array::from(vec![10, 20, 30]);
3615
3616 let schema = Arc::new(Schema::new(vec![
3617 Field::new("skipped", data_type, false),
3618 Field::new("values", DataType::Int32, false),
3619 ]));
3620
3621 let batch =
3622 RecordBatch::try_new(schema, vec![skipped, Arc::new(values.clone())]).unwrap();
3623
3624 let mut buf = Vec::new();
3626 {
3627 let mut writer = FileWriter::try_new(&mut buf, &batch.schema()).unwrap();
3628 writer.write(&batch).unwrap();
3629 writer.finish().unwrap();
3630 }
3631
3632 let mut reader = FileReader::try_new(std::io::Cursor::new(buf), Some(vec![1])).unwrap();
3634 let read_batch = reader.next().unwrap().unwrap();
3635
3636 assert_eq!(read_batch.num_columns(), 1);
3638 assert_eq!(read_batch.column(0).as_ref(), &values);
3639 }
3640 }
3641}