1mod stream;
28pub use stream::*;
29
30use arrow_select::concat;
31
32use flatbuffers::{VectorIter, VerifierOptions};
33use std::collections::{HashMap, VecDeque};
34use std::fmt;
35use std::io::{BufReader, Read, Seek, SeekFrom};
36use std::sync::Arc;
37
38use arrow_array::*;
39use arrow_buffer::{
40 ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, ScalarBuffer,
41};
42use arrow_data::{ArrayData, ArrayDataBuilder, UnsafeFlag};
43use arrow_schema::*;
44
45use crate::compression::CompressionCodec;
46use crate::r#gen::Message::{self};
47use crate::{Block, CONTINUATION_MARKER, FieldNode, MetadataVersion};
48use DataType::*;
49
50fn read_buffer(
60 buf: &crate::Buffer,
61 a_data: &Buffer,
62 compression_codec: Option<CompressionCodec>,
63) -> Result<Buffer, ArrowError> {
64 let start_offset = buf.offset() as usize;
65 let buf_data = a_data.slice_with_length(start_offset, buf.length() as usize);
66 match (buf_data.is_empty(), compression_codec) {
68 (true, _) | (_, None) => Ok(buf_data),
69 (false, Some(decompressor)) => decompressor.decompress_to_buffer(&buf_data),
70 }
71}
72impl RecordBatchDecoder<'_> {
73 fn create_array(
86 &mut self,
87 field: &Field,
88 variadic_counts: &mut VecDeque<i64>,
89 ) -> Result<ArrayRef, ArrowError> {
90 let data_type = field.data_type();
91 match data_type {
92 Utf8 | Binary | LargeBinary | LargeUtf8 => {
93 let field_node = self.next_node(field)?;
94 let buffers = [
95 self.next_buffer()?,
96 self.next_buffer()?,
97 self.next_buffer()?,
98 ];
99 self.create_primitive_array(field_node, data_type, &buffers)
100 }
101 BinaryView | Utf8View => {
102 let count = variadic_counts
103 .pop_front()
104 .ok_or(ArrowError::IpcError(format!(
105 "Missing variadic count for {data_type} column"
106 )))?;
107 let count = count + 2; let buffers = (0..count)
109 .map(|_| self.next_buffer())
110 .collect::<Result<Vec<_>, _>>()?;
111 let field_node = self.next_node(field)?;
112 self.create_primitive_array(field_node, data_type, &buffers)
113 }
114 FixedSizeBinary(_) => {
115 let field_node = self.next_node(field)?;
116 let buffers = [self.next_buffer()?, self.next_buffer()?];
117 self.create_primitive_array(field_node, data_type, &buffers)
118 }
119 List(list_field) | LargeList(list_field) | Map(list_field, _) => {
120 let list_node = self.next_node(field)?;
121 let list_buffers = [self.next_buffer()?, self.next_buffer()?];
122 let values = self.create_array(list_field, variadic_counts)?;
123 self.create_list_array(list_node, data_type, &list_buffers, values)
124 }
125 ListView(list_field) | LargeListView(list_field) => {
126 let list_node = self.next_node(field)?;
127 let list_buffers = [
128 self.next_buffer()?, self.next_buffer()?, self.next_buffer()?, ];
132 let values = self.create_array(list_field, variadic_counts)?;
133 self.create_list_view_array(list_node, data_type, &list_buffers, values)
134 }
135 FixedSizeList(list_field, _) => {
136 let list_node = self.next_node(field)?;
137 let list_buffers = [self.next_buffer()?];
138 let values = self.create_array(list_field, variadic_counts)?;
139 self.create_list_array(list_node, data_type, &list_buffers, values)
140 }
141 Struct(struct_fields) => {
142 let struct_node = self.next_node(field)?;
143 let null_buffer = self.next_buffer()?;
144
145 let mut struct_arrays = vec![];
147 for struct_field in struct_fields {
150 let child = self.create_array(struct_field, variadic_counts)?;
151 struct_arrays.push(child);
152 }
153 self.create_struct_array(struct_node, null_buffer, struct_fields, struct_arrays)
154 }
155 RunEndEncoded(run_ends_field, values_field) => {
156 let run_node = self.next_node(field)?;
157 let run_ends = self.create_array(run_ends_field, variadic_counts)?;
158 let values = self.create_array(values_field, variadic_counts)?;
159
160 let run_array_length = run_node.length() as usize;
161 let builder = ArrayData::builder(data_type.clone())
162 .len(run_array_length)
163 .offset(0)
164 .add_child_data(run_ends.into_data())
165 .add_child_data(values.into_data())
166 .null_count(run_node.null_count() as usize);
167
168 self.create_array_from_builder(builder)
169 }
170 Dictionary(_, _) => {
172 let index_node = self.next_node(field)?;
173 let index_buffers = [self.next_buffer()?, self.next_buffer()?];
174
175 #[allow(deprecated)]
176 let dict_id = field.dict_id().ok_or_else(|| {
177 ArrowError::ParseError(format!("Field {field} does not have dict id"))
178 })?;
179
180 let value_array = self.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
181 ArrowError::ParseError(format!(
182 "Cannot find a dictionary batch with dict id: {dict_id}"
183 ))
184 })?;
185
186 self.create_dictionary_array(
187 index_node,
188 data_type,
189 &index_buffers,
190 value_array.clone(),
191 )
192 }
193 Union(fields, mode) => {
194 let union_node = self.next_node(field)?;
195 let len = union_node.length() as usize;
196
197 if self.version < MetadataVersion::V5 {
200 self.next_buffer()?;
201 }
202
203 let type_ids: ScalarBuffer<i8> =
204 self.next_buffer()?.slice_with_length(0, len).into();
205
206 let value_offsets = match mode {
207 UnionMode::Dense => {
208 let offsets: ScalarBuffer<i32> =
209 self.next_buffer()?.slice_with_length(0, len * 4).into();
210 Some(offsets)
211 }
212 UnionMode::Sparse => None,
213 };
214
215 let mut children = Vec::with_capacity(fields.len());
216
217 for (_id, field) in fields.iter() {
218 let child = self.create_array(field, variadic_counts)?;
219 children.push(child);
220 }
221
222 let array = if self.skip_validation.get() {
223 unsafe {
225 UnionArray::new_unchecked(fields.clone(), type_ids, value_offsets, children)
226 }
227 } else {
228 UnionArray::try_new(fields.clone(), type_ids, value_offsets, children)?
229 };
230 Ok(Arc::new(array))
231 }
232 Null => {
233 let node = self.next_node(field)?;
234 let length = node.length();
235 let null_count = node.null_count();
236
237 if length != null_count {
238 return Err(ArrowError::SchemaError(format!(
239 "Field {field} of NullArray has unequal null_count {null_count} and len {length}"
240 )));
241 }
242
243 let builder = ArrayData::builder(data_type.clone())
244 .len(length as usize)
245 .offset(0);
246 self.create_array_from_builder(builder)
247 }
248 _ => {
249 let field_node = self.next_node(field)?;
250 let buffers = [self.next_buffer()?, self.next_buffer()?];
251 self.create_primitive_array(field_node, data_type, &buffers)
252 }
253 }
254 }
255
256 fn create_primitive_array(
259 &self,
260 field_node: &FieldNode,
261 data_type: &DataType,
262 buffers: &[Buffer],
263 ) -> Result<ArrayRef, ArrowError> {
264 let length = field_node.length() as usize;
265 let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
266 let mut builder = match data_type {
267 Utf8 | Binary | LargeBinary | LargeUtf8 => {
268 ArrayData::builder(data_type.clone())
270 .len(length)
271 .buffers(buffers[1..3].to_vec())
272 .null_bit_buffer(null_buffer)
273 }
274 BinaryView | Utf8View => ArrayData::builder(data_type.clone())
275 .len(length)
276 .buffers(buffers[1..].to_vec())
277 .null_bit_buffer(null_buffer),
278 _ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => {
279 ArrayData::builder(data_type.clone())
281 .len(length)
282 .add_buffer(buffers[1].clone())
283 .null_bit_buffer(null_buffer)
284 }
285 t => unreachable!("Data type {:?} either unsupported or not primitive", t),
286 };
287
288 builder = builder.null_count(field_node.null_count() as usize);
289
290 self.create_array_from_builder(builder)
291 }
292
293 fn create_array_from_builder(&self, builder: ArrayDataBuilder) -> Result<ArrayRef, ArrowError> {
295 let mut builder = builder.align_buffers(!self.require_alignment);
296 if self.skip_validation.get() {
297 unsafe { builder = builder.skip_validation(true) }
299 };
300 Ok(make_array(builder.build()?))
301 }
302
303 fn create_list_array(
306 &self,
307 field_node: &FieldNode,
308 data_type: &DataType,
309 buffers: &[Buffer],
310 child_array: ArrayRef,
311 ) -> Result<ArrayRef, ArrowError> {
312 let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
313 let length = field_node.length() as usize;
314 let child_data = child_array.into_data();
315 let mut builder = match data_type {
316 List(_) | LargeList(_) | Map(_, _) => ArrayData::builder(data_type.clone())
317 .len(length)
318 .add_buffer(buffers[1].clone())
319 .add_child_data(child_data)
320 .null_bit_buffer(null_buffer),
321
322 FixedSizeList(_, _) => ArrayData::builder(data_type.clone())
323 .len(length)
324 .add_child_data(child_data)
325 .null_bit_buffer(null_buffer),
326
327 _ => unreachable!("Cannot create list or map array from {:?}", data_type),
328 };
329
330 builder = builder.null_count(field_node.null_count() as usize);
331
332 self.create_array_from_builder(builder)
333 }
334
335 fn create_list_view_array(
336 &self,
337 field_node: &FieldNode,
338 data_type: &DataType,
339 buffers: &[Buffer],
340 child_array: ArrayRef,
341 ) -> Result<ArrayRef, ArrowError> {
342 assert!(matches!(data_type, ListView(_) | LargeListView(_)));
343
344 let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
345 let length = field_node.length() as usize;
346 let child_data = child_array.into_data();
347
348 self.create_array_from_builder(
349 ArrayData::builder(data_type.clone())
350 .len(length)
351 .add_buffer(buffers[1].clone()) .add_buffer(buffers[2].clone()) .add_child_data(child_data)
354 .null_bit_buffer(null_buffer)
355 .null_count(field_node.null_count() as usize),
356 )
357 }
358
359 fn create_struct_array(
360 &self,
361 struct_node: &FieldNode,
362 null_buffer: Buffer,
363 struct_fields: &Fields,
364 struct_arrays: Vec<ArrayRef>,
365 ) -> Result<ArrayRef, ArrowError> {
366 let null_count = struct_node.null_count() as usize;
367 let len = struct_node.length() as usize;
368 let skip_validation = self.skip_validation.get();
369
370 let nulls = if null_count > 0 {
371 let validity_buffer = BooleanBuffer::new(null_buffer, 0, len);
372 let null_buffer = if skip_validation {
373 unsafe { NullBuffer::new_unchecked(validity_buffer, null_count) }
375 } else {
376 let null_buffer = NullBuffer::new(validity_buffer);
377
378 if null_buffer.null_count() != null_count {
379 return Err(ArrowError::InvalidArgumentError(format!(
380 "null_count value ({}) doesn't match actual number of nulls in array ({})",
381 null_count,
382 null_buffer.null_count()
383 )));
384 }
385
386 null_buffer
387 };
388
389 Some(null_buffer)
390 } else {
391 None
392 };
393 if struct_arrays.is_empty() {
394 return Ok(Arc::new(StructArray::new_empty_fields(len, nulls)));
397 }
398
399 let struct_array = if skip_validation {
400 unsafe { StructArray::new_unchecked(struct_fields.clone(), struct_arrays, nulls) }
402 } else {
403 StructArray::try_new(struct_fields.clone(), struct_arrays, nulls)?
404 };
405
406 Ok(Arc::new(struct_array))
407 }
408
409 fn create_dictionary_array(
412 &self,
413 field_node: &FieldNode,
414 data_type: &DataType,
415 buffers: &[Buffer],
416 value_array: ArrayRef,
417 ) -> Result<ArrayRef, ArrowError> {
418 if let Dictionary(_, _) = *data_type {
419 let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
420 let builder = ArrayData::builder(data_type.clone())
421 .len(field_node.length() as usize)
422 .add_buffer(buffers[1].clone())
423 .add_child_data(value_array.into_data())
424 .null_bit_buffer(null_buffer)
425 .null_count(field_node.null_count() as usize);
426 self.create_array_from_builder(builder)
427 } else {
428 unreachable!("Cannot create dictionary array from {:?}", data_type)
429 }
430 }
431}
432
/// State for decoding one IPC record batch into arrays.
///
/// Field nodes and buffer descriptors are consumed from the `nodes` and
/// `buffers` iterators as each column is decoded or skipped.
pub struct RecordBatchDecoder<'a> {
    /// The flatbuffer record batch being decoded.
    batch: crate::RecordBatch<'a>,
    /// Schema whose fields drive the decoding.
    schema: SchemaRef,
    /// Dictionary values, looked up by dictionary id for dictionary columns.
    dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
    /// Codec taken from the batch's compression setting, if it has one.
    compression: Option<CompressionCodec>,
    /// Metadata version of the message; consulted when decoding unions.
    version: MetadataVersion,
    /// The bytes that buffer descriptors point into.
    data: &'a Buffer,
    /// Remaining field nodes of the batch.
    nodes: VectorIter<'a, FieldNode>,
    /// Remaining buffer descriptors of the batch.
    buffers: VectorIter<'a, crate::Buffer>,
    /// Indices of the schema fields to decode; `None` decodes every field.
    projection: Option<&'a [usize]>,
    /// When `false`, array builders are asked to re-align buffers.
    require_alignment: bool,
    /// When set, arrays and batches are built without validation.
    skip_validation: UnsafeFlag,
}
466
467impl<'a> RecordBatchDecoder<'a> {
468 fn try_new(
470 buf: &'a Buffer,
471 batch: crate::RecordBatch<'a>,
472 schema: SchemaRef,
473 dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
474 metadata: &'a MetadataVersion,
475 ) -> Result<Self, ArrowError> {
476 let buffers = batch.buffers().ok_or_else(|| {
477 ArrowError::IpcError("Unable to get buffers from IPC RecordBatch".to_string())
478 })?;
479 let field_nodes = batch.nodes().ok_or_else(|| {
480 ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string())
481 })?;
482
483 let batch_compression = batch.compression();
484 let compression = batch_compression
485 .map(|batch_compression| batch_compression.codec().try_into())
486 .transpose()?;
487
488 Ok(Self {
489 batch,
490 schema,
491 dictionaries_by_id,
492 compression,
493 version: *metadata,
494 data: buf,
495 nodes: field_nodes.iter(),
496 buffers: buffers.iter(),
497 projection: None,
498 require_alignment: false,
499 skip_validation: UnsafeFlag::new(),
500 })
501 }
502
503 pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self {
508 self.projection = projection;
509 self
510 }
511
512 pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
518 self.require_alignment = require_alignment;
519 self
520 }
521
522 pub(crate) fn with_skip_validation(mut self, skip_validation: UnsafeFlag) -> Self {
534 self.skip_validation = skip_validation;
535 self
536 }
537
538 fn read_record_batch(mut self) -> Result<RecordBatch, ArrowError> {
540 let mut variadic_counts: VecDeque<i64> = self
541 .batch
542 .variadicBufferCounts()
543 .into_iter()
544 .flatten()
545 .collect();
546
547 let options = RecordBatchOptions::new().with_row_count(Some(self.batch.length() as usize));
548
549 let schema = Arc::clone(&self.schema);
550 if let Some(projection) = self.projection {
551 let mut arrays = vec![];
552 for (idx, field) in schema.fields().iter().enumerate() {
554 if let Some(proj_idx) = projection.iter().position(|p| p == &idx) {
556 let child = self.create_array(field, &mut variadic_counts)?;
557 arrays.push((proj_idx, child));
558 } else {
559 self.skip_field(field, &mut variadic_counts)?;
560 }
561 }
562
563 arrays.sort_by_key(|t| t.0);
564
565 let schema = Arc::new(schema.project(projection)?);
566 let columns = arrays.into_iter().map(|t| t.1).collect::<Vec<_>>();
567
568 if self.skip_validation.get() {
569 unsafe {
571 Ok(RecordBatch::new_unchecked(
572 schema,
573 columns,
574 self.batch.length() as usize,
575 ))
576 }
577 } else {
578 assert!(variadic_counts.is_empty());
579 RecordBatch::try_new_with_options(schema, columns, &options)
580 }
581 } else {
582 let mut children = vec![];
583 for field in schema.fields() {
585 let child = self.create_array(field, &mut variadic_counts)?;
586 children.push(child);
587 }
588
589 if self.skip_validation.get() {
590 unsafe {
592 Ok(RecordBatch::new_unchecked(
593 schema,
594 children,
595 self.batch.length() as usize,
596 ))
597 }
598 } else {
599 assert!(variadic_counts.is_empty());
600 RecordBatch::try_new_with_options(schema, children, &options)
601 }
602 }
603 }
604
605 fn next_buffer(&mut self) -> Result<Buffer, ArrowError> {
606 let buffer = self.buffers.next().ok_or_else(|| {
607 ArrowError::IpcError("Buffer count mismatched with metadata".to_string())
608 })?;
609 read_buffer(buffer, self.data, self.compression)
610 }
611
612 fn skip_buffer(&mut self) {
613 self.buffers.next().unwrap();
614 }
615
616 fn next_node(&mut self, field: &Field) -> Result<&'a FieldNode, ArrowError> {
617 self.nodes.next().ok_or_else(|| {
618 ArrowError::SchemaError(format!(
619 "Invalid data for schema. {field} refers to node not found in schema",
620 ))
621 })
622 }
623
624 fn skip_field(
625 &mut self,
626 field: &Field,
627 variadic_count: &mut VecDeque<i64>,
628 ) -> Result<(), ArrowError> {
629 self.next_node(field)?;
630
631 match field.data_type() {
632 Utf8 | Binary | LargeBinary | LargeUtf8 => {
633 for _ in 0..3 {
634 self.skip_buffer()
635 }
636 }
637 Utf8View | BinaryView => {
638 let count = variadic_count
639 .pop_front()
640 .ok_or(ArrowError::IpcError(format!(
641 "Missing variadic count for {} column",
642 field.data_type()
643 )))?;
644 let count = count + 2; for _i in 0..count {
646 self.skip_buffer()
647 }
648 }
649 FixedSizeBinary(_) => {
650 self.skip_buffer();
651 self.skip_buffer();
652 }
653 List(list_field) | LargeList(list_field) | Map(list_field, _) => {
654 self.skip_buffer();
655 self.skip_buffer();
656 self.skip_field(list_field, variadic_count)?;
657 }
658 FixedSizeList(list_field, _) => {
659 self.skip_buffer();
660 self.skip_field(list_field, variadic_count)?;
661 }
662 Struct(struct_fields) => {
663 self.skip_buffer();
664
665 for struct_field in struct_fields {
667 self.skip_field(struct_field, variadic_count)?
668 }
669 }
670 RunEndEncoded(run_ends_field, values_field) => {
671 self.skip_field(run_ends_field, variadic_count)?;
672 self.skip_field(values_field, variadic_count)?;
673 }
674 Dictionary(_, _) => {
675 self.skip_buffer(); self.skip_buffer(); }
678 Union(fields, mode) => {
679 self.skip_buffer(); match mode {
682 UnionMode::Dense => self.skip_buffer(),
683 UnionMode::Sparse => {}
684 };
685
686 for (_, field) in fields.iter() {
687 self.skip_field(field, variadic_count)?
688 }
689 }
690 Null => {} _ => {
692 self.skip_buffer();
693 self.skip_buffer();
694 }
695 };
696 Ok(())
697 }
698}
699
700pub fn read_record_batch(
711 buf: &Buffer,
712 batch: crate::RecordBatch,
713 schema: SchemaRef,
714 dictionaries_by_id: &HashMap<i64, ArrayRef>,
715 projection: Option<&[usize]>,
716 metadata: &MetadataVersion,
717) -> Result<RecordBatch, ArrowError> {
718 RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)?
719 .with_projection(projection)
720 .with_require_alignment(false)
721 .read_record_batch()
722}
723
724pub fn read_dictionary(
727 buf: &Buffer,
728 batch: crate::DictionaryBatch,
729 schema: &Schema,
730 dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
731 metadata: &MetadataVersion,
732) -> Result<(), ArrowError> {
733 read_dictionary_impl(
734 buf,
735 batch,
736 schema,
737 dictionaries_by_id,
738 metadata,
739 false,
740 UnsafeFlag::new(),
741 )
742}
743
744fn read_dictionary_impl(
745 buf: &Buffer,
746 batch: crate::DictionaryBatch,
747 schema: &Schema,
748 dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
749 metadata: &MetadataVersion,
750 require_alignment: bool,
751 skip_validation: UnsafeFlag,
752) -> Result<(), ArrowError> {
753 let id = batch.id();
754
755 let dictionary_values = get_dictionary_values(
756 buf,
757 batch,
758 schema,
759 dictionaries_by_id,
760 metadata,
761 require_alignment,
762 skip_validation,
763 )?;
764
765 update_dictionaries(dictionaries_by_id, batch.isDelta(), id, dictionary_values)?;
766
767 Ok(())
768}
769
770fn update_dictionaries(
779 dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
780 is_delta: bool,
781 dict_id: i64,
782 dict_values: ArrayRef,
783) -> Result<(), ArrowError> {
784 if !is_delta {
785 dictionaries_by_id.insert(dict_id, dict_values.clone());
789 return Ok(());
790 }
791
792 let existing = dictionaries_by_id.get(&dict_id).ok_or_else(|| {
793 ArrowError::InvalidArgumentError(format!(
794 "No existing dictionary for delta dictionary with id '{dict_id}'"
795 ))
796 })?;
797
798 let combined = concat::concat(&[existing, &dict_values]).map_err(|e| {
799 ArrowError::InvalidArgumentError(format!("Failed to concat delta dictionary: {e}"))
800 })?;
801
802 dictionaries_by_id.insert(dict_id, combined);
803
804 Ok(())
805}
806
807fn get_dictionary_values(
811 buf: &Buffer,
812 batch: crate::DictionaryBatch,
813 schema: &Schema,
814 dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
815 metadata: &MetadataVersion,
816 require_alignment: bool,
817 skip_validation: UnsafeFlag,
818) -> Result<ArrayRef, ArrowError> {
819 let id = batch.id();
820 #[allow(deprecated)]
821 let fields_using_this_dictionary = schema.fields_with_dict_id(id);
822 let first_field = fields_using_this_dictionary.first().ok_or_else(|| {
823 ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
824 })?;
825
826 let dictionary_values: ArrayRef = match first_field.data_type() {
830 DataType::Dictionary(_, value_type) => {
831 let value = value_type.as_ref().clone();
833 let schema = Schema::new(vec![Field::new("", value, true)]);
834 let record_batch = RecordBatchDecoder::try_new(
836 buf,
837 batch.data().unwrap(),
838 Arc::new(schema),
839 dictionaries_by_id,
840 metadata,
841 )?
842 .with_require_alignment(require_alignment)
843 .with_skip_validation(skip_validation)
844 .read_record_batch()?;
845
846 Some(record_batch.column(0).clone())
847 }
848 _ => None,
849 }
850 .ok_or_else(|| {
851 ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
852 })?;
853
854 Ok(dictionary_values)
855}
856
857fn read_block<R: Read + Seek>(mut reader: R, block: &Block) -> Result<Buffer, ArrowError> {
859 reader.seek(SeekFrom::Start(block.offset() as u64))?;
860 let body_len = block.bodyLength().to_usize().unwrap();
861 let metadata_len = block.metaDataLength().to_usize().unwrap();
862 let total_len = body_len.checked_add(metadata_len).unwrap();
863
864 let mut buf = MutableBuffer::from_len_zeroed(total_len);
865 reader.read_exact(&mut buf)?;
866 Ok(buf.into())
867}
868
869fn parse_message(buf: &[u8]) -> Result<Message::Message<'_>, ArrowError> {
873 let buf = match buf[..4] == CONTINUATION_MARKER {
874 true => &buf[8..],
875 false => &buf[4..],
876 };
877 crate::root_as_message(buf)
878 .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))
879}
880
881pub fn read_footer_length(buf: [u8; 10]) -> Result<usize, ArrowError> {
885 if buf[4..] != super::ARROW_MAGIC {
886 return Err(ArrowError::ParseError(
887 "Arrow file does not contain correct footer".to_string(),
888 ));
889 }
890
891 let footer_len = i32::from_le_bytes(buf[..4].try_into().unwrap());
893 footer_len
894 .try_into()
895 .map_err(|_| ArrowError::ParseError(format!("Invalid footer length: {footer_len}")))
896}
897
/// Decodes the dictionary and record batch blocks of an Arrow IPC file.
///
/// Dictionaries read through [`FileDecoder::read_dictionary`] are retained and
/// used when decoding subsequent record batches.
#[derive(Debug)]
pub struct FileDecoder {
    /// Schema used to decode record batches.
    schema: SchemaRef,
    /// Dictionary values read so far, keyed by dictionary id.
    dictionaries: HashMap<i64, ArrayRef>,
    /// Metadata version that messages are checked against.
    version: MetadataVersion,
    /// Indices of the schema fields to decode; `None` decodes every field.
    projection: Option<Vec<usize>>,
    /// Forwarded to the record batch decoder; see `with_require_alignment`.
    require_alignment: bool,
    /// When set, decoded data is not validated.
    skip_validation: UnsafeFlag,
}
971
972impl FileDecoder {
973 pub fn new(schema: SchemaRef, version: MetadataVersion) -> Self {
975 Self {
976 schema,
977 version,
978 dictionaries: Default::default(),
979 projection: None,
980 require_alignment: false,
981 skip_validation: UnsafeFlag::new(),
982 }
983 }
984
985 pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
987 self.projection = Some(projection);
988 self
989 }
990
991 pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
1004 self.require_alignment = require_alignment;
1005 self
1006 }
1007
1008 pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
1019 unsafe { self.skip_validation.set(skip_validation) };
1020 self
1021 }
1022
1023 fn read_message<'a>(&self, buf: &'a [u8]) -> Result<Message::Message<'a>, ArrowError> {
1024 let message = parse_message(buf)?;
1025
1026 if self.version != MetadataVersion::V1 && message.version() != self.version {
1028 return Err(ArrowError::IpcError(
1029 "Could not read IPC message as metadata versions mismatch".to_string(),
1030 ));
1031 }
1032 Ok(message)
1033 }
1034
1035 pub fn read_dictionary(&mut self, block: &Block, buf: &Buffer) -> Result<(), ArrowError> {
1037 let message = self.read_message(buf)?;
1038 match message.header_type() {
1039 crate::MessageHeader::DictionaryBatch => {
1040 let batch = message.header_as_dictionary_batch().unwrap();
1041 read_dictionary_impl(
1042 &buf.slice(block.metaDataLength() as _),
1043 batch,
1044 &self.schema,
1045 &mut self.dictionaries,
1046 &message.version(),
1047 self.require_alignment,
1048 self.skip_validation.clone(),
1049 )
1050 }
1051 t => Err(ArrowError::ParseError(format!(
1052 "Expecting DictionaryBatch in dictionary blocks, found {t:?}."
1053 ))),
1054 }
1055 }
1056
1057 pub fn read_record_batch(
1059 &self,
1060 block: &Block,
1061 buf: &Buffer,
1062 ) -> Result<Option<RecordBatch>, ArrowError> {
1063 let message = self.read_message(buf)?;
1064 match message.header_type() {
1065 crate::MessageHeader::Schema => Err(ArrowError::IpcError(
1066 "Not expecting a schema when messages are read".to_string(),
1067 )),
1068 crate::MessageHeader::RecordBatch => {
1069 let batch = message.header_as_record_batch().ok_or_else(|| {
1070 ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
1071 })?;
1072 RecordBatchDecoder::try_new(
1074 &buf.slice(block.metaDataLength() as _),
1075 batch,
1076 self.schema.clone(),
1077 &self.dictionaries,
1078 &message.version(),
1079 )?
1080 .with_projection(self.projection.as_deref())
1081 .with_require_alignment(self.require_alignment)
1082 .with_skip_validation(self.skip_validation.clone())
1083 .read_record_batch()
1084 .map(Some)
1085 }
1086 crate::MessageHeader::NONE => Ok(None),
1087 t => Err(ArrowError::InvalidArgumentError(format!(
1088 "Reading types other than record batches not yet supported, unable to read {t:?}"
1089 ))),
1090 }
1091 }
1092}
1093
/// Builder for a [`FileReader`], holding the projection and the limits applied
/// when verifying the file footer.
#[derive(Debug)]
pub struct FileReaderBuilder {
    /// Indices of the schema fields to read; `None` reads every field.
    projection: Option<Vec<usize>>,
    /// Passed as `max_tables` to the flatbuffer verifier for the footer.
    max_footer_fb_tables: usize,
    /// Passed as `max_depth` to the flatbuffer verifier for the footer.
    max_footer_fb_depth: usize,
}
1104
1105impl Default for FileReaderBuilder {
1106 fn default() -> Self {
1107 let verifier_options = VerifierOptions::default();
1108 Self {
1109 max_footer_fb_tables: verifier_options.max_tables,
1110 max_footer_fb_depth: verifier_options.max_depth,
1111 projection: None,
1112 }
1113 }
1114}
1115
1116impl FileReaderBuilder {
1117 pub fn new() -> Self {
1121 Self::default()
1122 }
1123
1124 pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
1126 self.projection = Some(projection);
1127 self
1128 }
1129
1130 pub fn with_max_footer_fb_tables(mut self, max_footer_fb_tables: usize) -> Self {
1143 self.max_footer_fb_tables = max_footer_fb_tables;
1144 self
1145 }
1146
1147 pub fn with_max_footer_fb_depth(mut self, max_footer_fb_depth: usize) -> Self {
1160 self.max_footer_fb_depth = max_footer_fb_depth;
1161 self
1162 }
1163
1164 pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<FileReader<R>, ArrowError> {
1166 let mut buffer = [0; 10];
1168 reader.seek(SeekFrom::End(-10))?;
1169 reader.read_exact(&mut buffer)?;
1170
1171 let footer_len = read_footer_length(buffer)?;
1172
1173 let mut footer_data = vec![0; footer_len];
1175 reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
1176 reader.read_exact(&mut footer_data)?;
1177
1178 let verifier_options = VerifierOptions {
1179 max_tables: self.max_footer_fb_tables,
1180 max_depth: self.max_footer_fb_depth,
1181 ..Default::default()
1182 };
1183 let footer = crate::root_as_footer_with_opts(&verifier_options, &footer_data[..]).map_err(
1184 |err| ArrowError::ParseError(format!("Unable to get root as footer: {err:?}")),
1185 )?;
1186
1187 let blocks = footer.recordBatches().ok_or_else(|| {
1188 ArrowError::ParseError("Unable to get record batches from IPC Footer".to_string())
1189 })?;
1190
1191 let total_blocks = blocks.len();
1192
1193 let ipc_schema = footer.schema().unwrap();
1194 if !ipc_schema.endianness().equals_to_target_endianness() {
1195 return Err(ArrowError::IpcError(
1196 "the endianness of the source system does not match the endianness of the target system.".to_owned()
1197 ));
1198 }
1199
1200 let schema = crate::convert::fb_to_schema(ipc_schema);
1201
1202 let mut custom_metadata = HashMap::new();
1203 if let Some(fb_custom_metadata) = footer.custom_metadata() {
1204 for kv in fb_custom_metadata.into_iter() {
1205 custom_metadata.insert(
1206 kv.key().unwrap().to_string(),
1207 kv.value().unwrap().to_string(),
1208 );
1209 }
1210 }
1211
1212 let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());
1213 if let Some(projection) = self.projection {
1214 decoder = decoder.with_projection(projection)
1215 }
1216
1217 if let Some(dictionaries) = footer.dictionaries() {
1219 for block in dictionaries {
1220 let buf = read_block(&mut reader, block)?;
1221 decoder.read_dictionary(block, &buf)?;
1222 }
1223 }
1224
1225 Ok(FileReader {
1226 reader,
1227 blocks: blocks.iter().copied().collect(),
1228 current_block: 0,
1229 total_blocks,
1230 decoder,
1231 custom_metadata,
1232 })
1233 }
1234}
1235
/// Reader for the record batches of an Arrow IPC file.
///
/// Iterating yields the file's record batches in block order, starting at the
/// current block.
pub struct FileReader<R> {
    /// Source that blocks are read from.
    reader: R,

    /// Decoder holding the schema, the dictionaries and the decoding options.
    decoder: FileDecoder,

    /// Record batch blocks copied from the file footer.
    blocks: Vec<Block>,

    /// Index into `blocks` of the next block to read.
    current_block: usize,

    /// Number of record batch blocks listed in the footer.
    total_blocks: usize,

    /// Custom metadata key/value pairs taken from the footer.
    custom_metadata: HashMap<String, String>,
}
1301
1302impl<R> fmt::Debug for FileReader<R> {
1303 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1304 f.debug_struct("FileReader<R>")
1305 .field("decoder", &self.decoder)
1306 .field("blocks", &self.blocks)
1307 .field("current_block", &self.current_block)
1308 .field("total_blocks", &self.total_blocks)
1309 .finish_non_exhaustive()
1310 }
1311}
1312
1313impl<R: Read + Seek> FileReader<BufReader<R>> {
1314 pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1318 Self::try_new(BufReader::new(reader), projection)
1319 }
1320}
1321
1322impl<R: Read + Seek> FileReader<R> {
1323 pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1334 let builder = FileReaderBuilder {
1335 projection,
1336 ..Default::default()
1337 };
1338 builder.build(reader)
1339 }
1340
1341 pub fn custom_metadata(&self) -> &HashMap<String, String> {
1343 &self.custom_metadata
1344 }
1345
1346 pub fn num_batches(&self) -> usize {
1348 self.total_blocks
1349 }
1350
1351 pub fn schema(&self) -> SchemaRef {
1353 self.decoder.schema.clone()
1354 }
1355
1356 pub fn set_index(&mut self, index: usize) -> Result<(), ArrowError> {
1360 if index >= self.total_blocks {
1361 Err(ArrowError::InvalidArgumentError(format!(
1362 "Cannot set batch to index {} from {} total batches",
1363 index, self.total_blocks
1364 )))
1365 } else {
1366 self.current_block = index;
1367 Ok(())
1368 }
1369 }
1370
1371 fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
1372 let block = &self.blocks[self.current_block];
1373 self.current_block += 1;
1374
1375 let buffer = read_block(&mut self.reader, block)?;
1377 self.decoder.read_record_batch(block, &buffer)
1378 }
1379
1380 pub fn get_ref(&self) -> &R {
1384 &self.reader
1385 }
1386
1387 pub fn get_mut(&mut self) -> &mut R {
1391 &mut self.reader
1392 }
1393
1394 pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
1400 self.decoder = unsafe { self.decoder.with_skip_validation(skip_validation) };
1401 self
1402 }
1403}
1404
1405impl<R: Read + Seek> Iterator for FileReader<R> {
1406 type Item = Result<RecordBatch, ArrowError>;
1407
1408 fn next(&mut self) -> Option<Self::Item> {
1409 if self.current_block < self.total_blocks {
1411 self.maybe_next().transpose()
1412 } else {
1413 None
1414 }
1415 }
1416}
1417
1418impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
1419 fn schema(&self) -> SchemaRef {
1420 self.schema()
1421 }
1422}
1423
/// Reader for record batches in the Arrow IPC stream format.
pub struct StreamReader<R> {
    /// Reads the individual messages from the underlying source.
    reader: MessageReader<R>,

    /// Schema decoded from the first message of the stream.
    schema: SchemaRef,

    /// Dictionary values keyed by dictionary id; empty when the reader is created.
    dictionaries_by_id: HashMap<i64, ArrayRef>,

    /// Once set, no further batches are returned.
    finished: bool,

    /// Projected field indices together with the projected schema, if a
    /// projection was requested.
    projection: Option<(Vec<usize>, Schema)>,

    /// When set, decoded data is not validated.
    skip_validation: UnsafeFlag,
}
1482
1483impl<R> fmt::Debug for StreamReader<R> {
1484 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
1485 f.debug_struct("StreamReader<R>")
1486 .field("reader", &"R")
1487 .field("schema", &self.schema)
1488 .field("dictionaries_by_id", &self.dictionaries_by_id)
1489 .field("finished", &self.finished)
1490 .field("projection", &self.projection)
1491 .finish()
1492 }
1493}
1494
1495impl<R: Read> StreamReader<BufReader<R>> {
1496 pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1500 Self::try_new(BufReader::new(reader), projection)
1501 }
1502}
1503
1504impl<R: Read> StreamReader<R> {
1505 pub fn try_new(
1517 reader: R,
1518 projection: Option<Vec<usize>>,
1519 ) -> Result<StreamReader<R>, ArrowError> {
1520 let mut msg_reader = MessageReader::new(reader);
1521 let message = msg_reader.maybe_next()?;
1522 let Some((message, _)) = message else {
1523 return Err(ArrowError::IpcError(
1524 "Expected schema message, found empty stream.".to_string(),
1525 ));
1526 };
1527
1528 if message.header_type() != Message::MessageHeader::Schema {
1529 return Err(ArrowError::IpcError(format!(
1530 "Expected a schema as the first message in the stream, got: {:?}",
1531 message.header_type()
1532 )));
1533 }
1534
1535 let schema = message.header_as_schema().ok_or_else(|| {
1536 ArrowError::ParseError("Failed to parse schema from message header".to_string())
1537 })?;
1538 let schema = crate::convert::fb_to_schema(schema);
1539
1540 let dictionaries_by_id = HashMap::new();
1542
1543 let projection = match projection {
1544 Some(projection_indices) => {
1545 let schema = schema.project(&projection_indices)?;
1546 Some((projection_indices, schema))
1547 }
1548 _ => None,
1549 };
1550
1551 Ok(Self {
1552 reader: msg_reader,
1553 schema: Arc::new(schema),
1554 finished: false,
1555 dictionaries_by_id,
1556 projection,
1557 skip_validation: UnsafeFlag::new(),
1558 })
1559 }
1560
1561 #[deprecated(since = "53.0.0", note = "use `try_new` instead")]
1563 pub fn try_new_unbuffered(
1564 reader: R,
1565 projection: Option<Vec<usize>>,
1566 ) -> Result<Self, ArrowError> {
1567 Self::try_new(reader, projection)
1568 }
1569
1570 pub fn schema(&self) -> SchemaRef {
1572 self.schema.clone()
1573 }
1574
1575 pub fn is_finished(&self) -> bool {
1577 self.finished
1578 }
1579
1580 fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
1581 if self.finished {
1582 return Ok(None);
1583 }
1584
1585 loop {
1587 let message = self.next_ipc_message()?;
1588 let Some(message) = message else {
1589 self.finished = true;
1591 return Ok(None);
1592 };
1593
1594 match message {
1595 IpcMessage::Schema(_) => {
1596 return Err(ArrowError::IpcError(
1597 "Expected a record batch, but found a schema".to_string(),
1598 ));
1599 }
1600 IpcMessage::RecordBatch(record_batch) => {
1601 return Ok(Some(record_batch));
1602 }
1603 IpcMessage::DictionaryBatch { .. } => {
1604 continue;
1605 }
1606 };
1607 }
1608 }
1609
1610 pub(crate) fn next_ipc_message(&mut self) -> Result<Option<IpcMessage>, ArrowError> {
1618 let message = self.reader.maybe_next()?;
1619 let Some((message, body)) = message else {
1620 return Ok(None);
1622 };
1623
1624 let ipc_message = match message.header_type() {
1625 Message::MessageHeader::Schema => {
1626 let schema = message.header_as_schema().ok_or_else(|| {
1627 ArrowError::ParseError("Failed to parse schema from message header".to_string())
1628 })?;
1629 let arrow_schema = crate::convert::fb_to_schema(schema);
1630 IpcMessage::Schema(arrow_schema)
1631 }
1632 Message::MessageHeader::RecordBatch => {
1633 let batch = message.header_as_record_batch().ok_or_else(|| {
1634 ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
1635 })?;
1636
1637 let version = message.version();
1638 let schema = self.schema.clone();
1639 let record_batch = RecordBatchDecoder::try_new(
1640 &body.into(),
1641 batch,
1642 schema,
1643 &self.dictionaries_by_id,
1644 &version,
1645 )?
1646 .with_projection(self.projection.as_ref().map(|x| x.0.as_ref()))
1647 .with_require_alignment(false)
1648 .with_skip_validation(self.skip_validation.clone())
1649 .read_record_batch()?;
1650 IpcMessage::RecordBatch(record_batch)
1651 }
1652 Message::MessageHeader::DictionaryBatch => {
1653 let dict = message.header_as_dictionary_batch().ok_or_else(|| {
1654 ArrowError::ParseError(
1655 "Failed to parse dictionary batch from message header".to_string(),
1656 )
1657 })?;
1658
1659 let version = message.version();
1660 let dict_values = get_dictionary_values(
1661 &body.into(),
1662 dict,
1663 &self.schema,
1664 &mut self.dictionaries_by_id,
1665 &version,
1666 false,
1667 self.skip_validation.clone(),
1668 )?;
1669
1670 update_dictionaries(
1671 &mut self.dictionaries_by_id,
1672 dict.isDelta(),
1673 dict.id(),
1674 dict_values.clone(),
1675 )?;
1676
1677 IpcMessage::DictionaryBatch {
1678 id: dict.id(),
1679 is_delta: (dict.isDelta()),
1680 values: (dict_values),
1681 }
1682 }
1683 x => {
1684 return Err(ArrowError::ParseError(format!(
1685 "Unsupported message header type in IPC stream: '{x:?}'"
1686 )));
1687 }
1688 };
1689
1690 Ok(Some(ipc_message))
1691 }
1692
1693 pub fn get_ref(&self) -> &R {
1697 self.reader.inner()
1698 }
1699
1700 pub fn get_mut(&mut self) -> &mut R {
1704 self.reader.inner_mut()
1705 }
1706
1707 pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
1713 unsafe { self.skip_validation.set(skip_validation) };
1714 self
1715 }
1716}
1717
1718impl<R: Read> Iterator for StreamReader<R> {
1719 type Item = Result<RecordBatch, ArrowError>;
1720
1721 fn next(&mut self) -> Option<Self::Item> {
1722 self.maybe_next().transpose()
1723 }
1724}
1725
1726impl<R: Read> RecordBatchReader for StreamReader<R> {
1727 fn schema(&self) -> SchemaRef {
1728 self.schema.clone()
1729 }
1730}
1731
1732#[derive(Debug)]
1738#[allow(dead_code)]
1739pub(crate) enum IpcMessage {
1740 Schema(arrow_schema::Schema),
1741 RecordBatch(RecordBatch),
1742 DictionaryBatch {
1743 id: i64,
1744 is_delta: bool,
1745 values: ArrayRef,
1746 },
1747}
1748
/// Reads length-prefixed IPC messages (metadata followed by a body) from an
/// underlying reader.
struct MessageReader<R> {
    /// The reader that messages are pulled from.
    reader: R,
    /// Scratch buffer holding the metadata bytes of the most recent message;
    /// it is resized and refilled on every read.
    buf: Vec<u8>,
}
1755
1756impl<R: Read> MessageReader<R> {
1757 fn new(reader: R) -> Self {
1758 Self {
1759 reader,
1760 buf: Vec::new(),
1761 }
1762 }
1763
1764 fn maybe_next(&mut self) -> Result<Option<(Message::Message<'_>, MutableBuffer)>, ArrowError> {
1775 let meta_len = self.read_meta_len()?;
1776 let Some(meta_len) = meta_len else {
1777 return Ok(None);
1778 };
1779
1780 self.buf.resize(meta_len, 0);
1781 self.reader.read_exact(&mut self.buf)?;
1782
1783 let message = crate::root_as_message(self.buf.as_slice()).map_err(|err| {
1784 ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
1785 })?;
1786
1787 let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
1788 self.reader.read_exact(&mut buf)?;
1789
1790 Ok(Some((message, buf)))
1791 }
1792
1793 fn inner_mut(&mut self) -> &mut R {
1795 &mut self.reader
1796 }
1797
1798 fn inner(&self) -> &R {
1800 &self.reader
1801 }
1802
1803 pub fn read_meta_len(&mut self) -> Result<Option<usize>, ArrowError> {
1812 let mut meta_len: [u8; 4] = [0; 4];
1813 match self.reader.read_exact(&mut meta_len) {
1814 Ok(_) => {}
1815 Err(e) => {
1816 return if e.kind() == std::io::ErrorKind::UnexpectedEof {
1817 Ok(None)
1821 } else {
1822 Err(ArrowError::from(e))
1823 };
1824 }
1825 };
1826
1827 let meta_len = {
1828 if meta_len == CONTINUATION_MARKER {
1831 self.reader.read_exact(&mut meta_len)?;
1832 }
1833
1834 i32::from_le_bytes(meta_len)
1835 };
1836
1837 if meta_len == 0 {
1838 return Ok(None);
1839 }
1840
1841 let meta_len = usize::try_from(meta_len)
1842 .map_err(|_| ArrowError::ParseError(format!("Invalid metadata length: {meta_len}")))?;
1843
1844 Ok(Some(meta_len))
1845 }
1846}
1847
1848#[cfg(test)]
1849mod tests {
1850 use std::io::Cursor;
1851
1852 use crate::convert::fb_to_schema;
1853 use crate::writer::{
1854 DictionaryTracker, IpcDataGenerator, IpcWriteOptions, unslice_run_array, write_message,
1855 };
1856
1857 use super::*;
1858
1859 use crate::{root_as_footer, root_as_message, size_prefixed_root_as_message};
1860 use arrow_array::builder::{PrimitiveRunBuilder, UnionBuilder};
1861 use arrow_array::types::*;
1862 use arrow_buffer::{NullBuffer, OffsetBuffer};
1863 use arrow_data::ArrayDataBuilder;
1864
1865 fn create_test_projection_schema() -> Schema {
1866 let list_data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
1868
1869 let fixed_size_list_data_type =
1870 DataType::FixedSizeList(Arc::new(Field::new_list_field(DataType::Int32, false)), 3);
1871
1872 let union_fields = UnionFields::from_fields(vec![
1873 Field::new("a", DataType::Int32, false),
1874 Field::new("b", DataType::Float64, false),
1875 ]);
1876
1877 let union_data_type = DataType::Union(union_fields, UnionMode::Dense);
1878
1879 let struct_fields = Fields::from(vec![
1880 Field::new("id", DataType::Int32, false),
1881 Field::new_list("list", Field::new_list_field(DataType::Int8, true), false),
1882 ]);
1883 let struct_data_type = DataType::Struct(struct_fields);
1884
1885 let run_encoded_data_type = DataType::RunEndEncoded(
1886 Arc::new(Field::new("run_ends", DataType::Int16, false)),
1887 Arc::new(Field::new("values", DataType::Int32, true)),
1888 );
1889
1890 Schema::new(vec![
1892 Field::new("f0", DataType::UInt32, false),
1893 Field::new("f1", DataType::Utf8, false),
1894 Field::new("f2", DataType::Boolean, false),
1895 Field::new("f3", union_data_type, true),
1896 Field::new("f4", DataType::Null, true),
1897 Field::new("f5", DataType::Float64, true),
1898 Field::new("f6", list_data_type, false),
1899 Field::new("f7", DataType::FixedSizeBinary(3), true),
1900 Field::new("f8", fixed_size_list_data_type, false),
1901 Field::new("f9", struct_data_type, false),
1902 Field::new("f10", run_encoded_data_type, false),
1903 Field::new("f11", DataType::Boolean, false),
1904 Field::new_dictionary("f12", DataType::Int8, DataType::Utf8, false),
1905 Field::new("f13", DataType::Utf8, false),
1906 ])
1907 }
1908
1909 fn create_test_projection_batch_data(schema: &Schema) -> RecordBatch {
1910 let array0 = UInt32Array::from(vec![1, 2, 3]);
1912 let array1 = StringArray::from(vec!["foo", "bar", "baz"]);
1913 let array2 = BooleanArray::from(vec![true, false, true]);
1914
1915 let mut union_builder = UnionBuilder::new_dense();
1916 union_builder.append::<Int32Type>("a", 1).unwrap();
1917 union_builder.append::<Float64Type>("b", 10.1).unwrap();
1918 union_builder.append_null::<Float64Type>("b").unwrap();
1919 let array3 = union_builder.build().unwrap();
1920
1921 let array4 = NullArray::new(3);
1922 let array5 = Float64Array::from(vec![Some(1.1), None, Some(3.3)]);
1923 let array6_values = vec![
1924 Some(vec![Some(10), Some(10), Some(10)]),
1925 Some(vec![Some(20), Some(20), Some(20)]),
1926 Some(vec![Some(30), Some(30)]),
1927 ];
1928 let array6 = ListArray::from_iter_primitive::<Int32Type, _, _>(array6_values);
1929 let array7_values = vec![vec![11, 12, 13], vec![22, 23, 24], vec![33, 34, 35]];
1930 let array7 = FixedSizeBinaryArray::try_from_iter(array7_values.into_iter()).unwrap();
1931
1932 let array8_values = ArrayData::builder(DataType::Int32)
1933 .len(9)
1934 .add_buffer(Buffer::from_slice_ref([40, 41, 42, 43, 44, 45, 46, 47, 48]))
1935 .build()
1936 .unwrap();
1937 let array8_data = ArrayData::builder(schema.field(8).data_type().clone())
1938 .len(3)
1939 .add_child_data(array8_values)
1940 .build()
1941 .unwrap();
1942 let array8 = FixedSizeListArray::from(array8_data);
1943
1944 let array9_id: ArrayRef = Arc::new(Int32Array::from(vec![1001, 1002, 1003]));
1945 let array9_list: ArrayRef =
1946 Arc::new(ListArray::from_iter_primitive::<Int8Type, _, _>(vec![
1947 Some(vec![Some(-10)]),
1948 Some(vec![Some(-20), Some(-20), Some(-20)]),
1949 Some(vec![Some(-30)]),
1950 ]));
1951 let array9 = ArrayDataBuilder::new(schema.field(9).data_type().clone())
1952 .add_child_data(array9_id.into_data())
1953 .add_child_data(array9_list.into_data())
1954 .len(3)
1955 .build()
1956 .unwrap();
1957 let array9: ArrayRef = Arc::new(StructArray::from(array9));
1958
1959 let array10_input = vec![Some(1_i32), None, None];
1960 let mut array10_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
1961 array10_builder.extend(array10_input);
1962 let array10 = array10_builder.finish();
1963
1964 let array11 = BooleanArray::from(vec![false, false, true]);
1965
1966 let array12_values = StringArray::from(vec!["x", "yy", "zzz"]);
1967 let array12_keys = Int8Array::from_iter_values([1, 1, 2]);
1968 let array12 = DictionaryArray::new(array12_keys, Arc::new(array12_values));
1969
1970 let array13 = StringArray::from(vec!["a", "bb", "ccc"]);
1971
1972 RecordBatch::try_new(
1974 Arc::new(schema.clone()),
1975 vec![
1976 Arc::new(array0),
1977 Arc::new(array1),
1978 Arc::new(array2),
1979 Arc::new(array3),
1980 Arc::new(array4),
1981 Arc::new(array5),
1982 Arc::new(array6),
1983 Arc::new(array7),
1984 Arc::new(array8),
1985 Arc::new(array9),
1986 Arc::new(array10),
1987 Arc::new(array11),
1988 Arc::new(array12),
1989 Arc::new(array13),
1990 ],
1991 )
1992 .unwrap()
1993 }
1994
1995 #[test]
1996 fn test_negative_meta_len_start_stream() {
1997 let bytes = i32::to_le_bytes(-1);
1998 let mut buf = vec![];
1999 buf.extend(CONTINUATION_MARKER);
2000 buf.extend(bytes);
2001
2002 let reader_err = StreamReader::try_new(Cursor::new(buf), None).err();
2003 assert!(reader_err.is_some());
2004 assert_eq!(
2005 reader_err.unwrap().to_string(),
2006 "Parser error: Invalid metadata length: -1"
2007 );
2008 }
2009
2010 #[test]
2011 fn test_negative_meta_len_mid_stream() {
2012 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2013 let mut buf = Vec::new();
2014 {
2015 let mut writer = crate::writer::StreamWriter::try_new(&mut buf, &schema).unwrap();
2016 let batch =
2017 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(Int32Array::from(vec![1]))])
2018 .unwrap();
2019 writer.write(&batch).unwrap();
2020 }
2021
2022 let bytes = i32::to_le_bytes(-1);
2023 buf.extend(CONTINUATION_MARKER);
2024 buf.extend(bytes);
2025
2026 let mut reader = StreamReader::try_new(Cursor::new(buf), None).unwrap();
2027 assert!(reader.maybe_next().is_ok());
2029 let batch_err = reader.maybe_next().err();
2031 assert!(batch_err.is_some());
2032 assert_eq!(
2033 batch_err.unwrap().to_string(),
2034 "Parser error: Invalid metadata length: -1"
2035 );
2036 }
2037
2038 #[test]
2039 fn test_missing_buffer_metadata_error() {
2040 use crate::r#gen::Message::*;
2041 use flatbuffers::FlatBufferBuilder;
2042
2043 let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, true)]));
2044
2045 let mut fbb = FlatBufferBuilder::new();
2048 let nodes = fbb.create_vector(&[FieldNode::new(2, 0)]);
2049 let buffers = fbb.create_vector(&[crate::Buffer::new(0, 8)]);
2050 let batch_offset = RecordBatch::create(
2051 &mut fbb,
2052 &RecordBatchArgs {
2053 length: 2,
2054 nodes: Some(nodes),
2055 buffers: Some(buffers),
2056 compression: None,
2057 variadicBufferCounts: None,
2058 },
2059 );
2060 fbb.finish_minimal(batch_offset);
2061 let batch_bytes = fbb.finished_data().to_vec();
2062 let batch = flatbuffers::root::<RecordBatch>(&batch_bytes).unwrap();
2063
2064 let data_buffer = Buffer::from(vec![0u8; 8]);
2065 let dictionaries: HashMap<i64, ArrayRef> = HashMap::new();
2066 let metadata = MetadataVersion::V5;
2067
2068 let decoder = RecordBatchDecoder::try_new(
2069 &data_buffer,
2070 batch,
2071 schema.clone(),
2072 &dictionaries,
2073 &metadata,
2074 )
2075 .unwrap();
2076
2077 let result = decoder.read_record_batch();
2078
2079 match result {
2080 Err(ArrowError::IpcError(msg)) => {
2081 assert_eq!(msg, "Buffer count mismatched with metadata");
2082 }
2083 other => panic!("unexpected error: {other:?}"),
2084 }
2085 }
2086
2087 #[test]
2088 fn test_projection_array_values() {
2089 let schema = create_test_projection_schema();
2091
2092 let batch = create_test_projection_batch_data(&schema);
2094
2095 let mut buf = Vec::new();
2097 {
2098 let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
2099 writer.write(&batch).unwrap();
2100 writer.finish().unwrap();
2101 }
2102
2103 for index in 0..12 {
2105 let projection = vec![index];
2106 let reader = FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(projection));
2107 let read_batch = reader.unwrap().next().unwrap().unwrap();
2108 let projected_column = read_batch.column(0);
2109 let expected_column = batch.column(index);
2110
2111 assert_eq!(projected_column.as_ref(), expected_column.as_ref());
2113 }
2114
2115 {
2116 let reader =
2118 FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(vec![3, 2, 1]));
2119 let read_batch = reader.unwrap().next().unwrap().unwrap();
2120 let expected_batch = batch.project(&[3, 2, 1]).unwrap();
2121 assert_eq!(read_batch, expected_batch);
2122 }
2123 }
2124
2125 #[test]
2126 fn test_arrow_single_float_row() {
2127 let schema = Schema::new(vec![
2128 Field::new("a", DataType::Float32, false),
2129 Field::new("b", DataType::Float32, false),
2130 Field::new("c", DataType::Int32, false),
2131 Field::new("d", DataType::Int32, false),
2132 ]);
2133 let arrays = vec![
2134 Arc::new(Float32Array::from(vec![1.23])) as ArrayRef,
2135 Arc::new(Float32Array::from(vec![-6.50])) as ArrayRef,
2136 Arc::new(Int32Array::from(vec![2])) as ArrayRef,
2137 Arc::new(Int32Array::from(vec![1])) as ArrayRef,
2138 ];
2139 let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays).unwrap();
2140 let mut file = tempfile::tempfile().unwrap();
2142 let mut stream_writer = crate::writer::StreamWriter::try_new(&mut file, &schema).unwrap();
2143 stream_writer.write(&batch).unwrap();
2144 stream_writer.finish().unwrap();
2145
2146 drop(stream_writer);
2147
2148 file.rewind().unwrap();
2149
2150 let reader = StreamReader::try_new(&mut file, None).unwrap();
2152
2153 reader.for_each(|batch| {
2154 let batch = batch.unwrap();
2155 assert!(
2156 batch
2157 .column(0)
2158 .as_any()
2159 .downcast_ref::<Float32Array>()
2160 .unwrap()
2161 .value(0)
2162 != 0.0
2163 );
2164 assert!(
2165 batch
2166 .column(1)
2167 .as_any()
2168 .downcast_ref::<Float32Array>()
2169 .unwrap()
2170 .value(0)
2171 != 0.0
2172 );
2173 });
2174
2175 file.rewind().unwrap();
2176
2177 let reader = StreamReader::try_new(file, Some(vec![0, 3])).unwrap();
2179
2180 reader.for_each(|batch| {
2181 let batch = batch.unwrap();
2182 assert_eq!(batch.schema().fields().len(), 2);
2183 assert_eq!(batch.schema().fields()[0].data_type(), &DataType::Float32);
2184 assert_eq!(batch.schema().fields()[1].data_type(), &DataType::Int32);
2185 });
2186 }
2187
2188 fn write_ipc(rb: &RecordBatch) -> Vec<u8> {
2190 let mut buf = Vec::new();
2191 let mut writer = crate::writer::FileWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
2192 writer.write(rb).unwrap();
2193 writer.finish().unwrap();
2194 buf
2195 }
2196
2197 fn read_ipc(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2199 let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None)?;
2200 reader.next().unwrap()
2201 }
2202
2203 fn read_ipc_skip_validation(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2206 let mut reader = unsafe {
2207 FileReader::try_new(std::io::Cursor::new(buf), None)?.with_skip_validation(true)
2208 };
2209 reader.next().unwrap()
2210 }
2211
2212 fn roundtrip_ipc(rb: &RecordBatch) -> RecordBatch {
2213 let buf = write_ipc(rb);
2214 read_ipc(&buf).unwrap()
2215 }
2216
2217 fn read_ipc_with_decoder(buf: Vec<u8>) -> Result<RecordBatch, ArrowError> {
2220 read_ipc_with_decoder_inner(buf, false)
2221 }
2222
2223 fn read_ipc_with_decoder_skip_validation(buf: Vec<u8>) -> Result<RecordBatch, ArrowError> {
2226 read_ipc_with_decoder_inner(buf, true)
2227 }
2228
2229 fn read_ipc_with_decoder_inner(
2230 buf: Vec<u8>,
2231 skip_validation: bool,
2232 ) -> Result<RecordBatch, ArrowError> {
2233 let buffer = Buffer::from_vec(buf);
2234 let trailer_start = buffer.len() - 10;
2235 let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap())?;
2236 let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start])
2237 .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid footer: {e}")))?;
2238
2239 let schema = fb_to_schema(footer.schema().unwrap());
2240
2241 let mut decoder = unsafe {
2242 FileDecoder::new(Arc::new(schema), footer.version())
2243 .with_skip_validation(skip_validation)
2244 };
2245 for block in footer.dictionaries().iter().flatten() {
2247 let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
2248 let data = buffer.slice_with_length(block.offset() as _, block_len);
2249 decoder.read_dictionary(block, &data)?
2250 }
2251
2252 let batches = footer.recordBatches().unwrap();
2254 assert_eq!(batches.len(), 1); let block = batches.get(0);
2257 let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
2258 let data = buffer.slice_with_length(block.offset() as _, block_len);
2259 Ok(decoder.read_record_batch(block, &data)?.unwrap())
2260 }
2261
2262 fn write_stream(rb: &RecordBatch) -> Vec<u8> {
2264 let mut buf = Vec::new();
2265 let mut writer = crate::writer::StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
2266 writer.write(rb).unwrap();
2267 writer.finish().unwrap();
2268 buf
2269 }
2270
2271 fn read_stream(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2273 let mut reader = StreamReader::try_new(std::io::Cursor::new(buf), None)?;
2274 reader.next().unwrap()
2275 }
2276
2277 fn read_stream_skip_validation(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2280 let mut reader = unsafe {
2281 StreamReader::try_new(std::io::Cursor::new(buf), None)?.with_skip_validation(true)
2282 };
2283 reader.next().unwrap()
2284 }
2285
2286 fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
2287 let buf = write_stream(rb);
2288 read_stream(&buf).unwrap()
2289 }
2290
2291 #[test]
2292 fn test_roundtrip_with_custom_metadata() {
2293 let schema = Schema::new(vec![Field::new("dummy", DataType::Float64, false)]);
2294 let mut buf = Vec::new();
2295 let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
2296 let mut test_metadata = HashMap::new();
2297 test_metadata.insert("abc".to_string(), "abc".to_string());
2298 test_metadata.insert("def".to_string(), "def".to_string());
2299 for (k, v) in &test_metadata {
2300 writer.write_metadata(k, v);
2301 }
2302 writer.finish().unwrap();
2303 drop(writer);
2304
2305 let reader = crate::reader::FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
2306 assert_eq!(reader.custom_metadata(), &test_metadata);
2307 }
2308
2309 #[test]
2310 fn test_roundtrip_nested_dict() {
2311 let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2312
2313 let array = Arc::new(inner) as ArrayRef;
2314
2315 let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));
2316
2317 let s = StructArray::from(vec![(dctfield, array)]);
2318 let struct_array = Arc::new(s) as ArrayRef;
2319
2320 let schema = Arc::new(Schema::new(vec![Field::new(
2321 "struct",
2322 struct_array.data_type().clone(),
2323 false,
2324 )]));
2325
2326 let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2327
2328 assert_eq!(batch, roundtrip_ipc(&batch));
2329 }
2330
2331 #[test]
2332 fn test_roundtrip_nested_dict_no_preserve_dict_id() {
2333 let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2334
2335 let array = Arc::new(inner) as ArrayRef;
2336
2337 let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));
2338
2339 let s = StructArray::from(vec![(dctfield, array)]);
2340 let struct_array = Arc::new(s) as ArrayRef;
2341
2342 let schema = Arc::new(Schema::new(vec![Field::new(
2343 "struct",
2344 struct_array.data_type().clone(),
2345 false,
2346 )]));
2347
2348 let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2349
2350 let mut buf = Vec::new();
2351 let mut writer = crate::writer::FileWriter::try_new_with_options(
2352 &mut buf,
2353 batch.schema_ref(),
2354 IpcWriteOptions::default(),
2355 )
2356 .unwrap();
2357 writer.write(&batch).unwrap();
2358 writer.finish().unwrap();
2359 drop(writer);
2360
2361 let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
2362
2363 assert_eq!(batch, reader.next().unwrap().unwrap());
2364 }
2365
2366 fn check_union_with_builder(mut builder: UnionBuilder) {
2367 builder.append::<Int32Type>("a", 1).unwrap();
2368 builder.append_null::<Int32Type>("a").unwrap();
2369 builder.append::<Float64Type>("c", 3.0).unwrap();
2370 builder.append::<Int32Type>("a", 4).unwrap();
2371 builder.append::<Int64Type>("d", 11).unwrap();
2372 let union = builder.build().unwrap();
2373
2374 let schema = Arc::new(Schema::new(vec![Field::new(
2375 "union",
2376 union.data_type().clone(),
2377 false,
2378 )]));
2379
2380 let union_array = Arc::new(union) as ArrayRef;
2381
2382 let rb = RecordBatch::try_new(schema, vec![union_array]).unwrap();
2383 let rb2 = roundtrip_ipc(&rb);
2384 assert_eq!(rb.schema(), rb2.schema());
2387 assert_eq!(rb.num_columns(), rb2.num_columns());
2388 assert_eq!(rb.num_rows(), rb2.num_rows());
2389 let union1 = rb.column(0);
2390 let union2 = rb2.column(0);
2391
2392 assert_eq!(union1, union2);
2393 }
2394
2395 #[test]
2396 fn test_roundtrip_dense_union() {
2397 check_union_with_builder(UnionBuilder::new_dense());
2398 }
2399
2400 #[test]
2401 fn test_roundtrip_sparse_union() {
2402 check_union_with_builder(UnionBuilder::new_sparse());
2403 }
2404
2405 #[test]
2406 fn test_roundtrip_struct_empty_fields() {
2407 let nulls = NullBuffer::from(&[true, true, false]);
2408 let rb = RecordBatch::try_from_iter([(
2409 "",
2410 Arc::new(StructArray::new_empty_fields(nulls.len(), Some(nulls))) as _,
2411 )])
2412 .unwrap();
2413 let rb2 = roundtrip_ipc(&rb);
2414 assert_eq!(rb, rb2);
2415 }
2416
2417 #[test]
2418 fn test_roundtrip_stream_run_array_sliced() {
2419 let run_array_1: Int32RunArray = vec!["a", "a", "a", "b", "b", "c", "c", "c"]
2420 .into_iter()
2421 .collect();
2422 let run_array_1_sliced = run_array_1.slice(2, 5);
2423
2424 let run_array_2_inupt = vec![Some(1_i32), None, None, Some(2), Some(2)];
2425 let mut run_array_2_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
2426 run_array_2_builder.extend(run_array_2_inupt);
2427 let run_array_2 = run_array_2_builder.finish();
2428
2429 let schema = Arc::new(Schema::new(vec![
2430 Field::new(
2431 "run_array_1_sliced",
2432 run_array_1_sliced.data_type().clone(),
2433 false,
2434 ),
2435 Field::new("run_array_2", run_array_2.data_type().clone(), false),
2436 ]));
2437 let input_batch = RecordBatch::try_new(
2438 schema,
2439 vec![Arc::new(run_array_1_sliced.clone()), Arc::new(run_array_2)],
2440 )
2441 .unwrap();
2442 let output_batch = roundtrip_ipc_stream(&input_batch);
2443
2444 assert_eq!(input_batch.column(1), output_batch.column(1));
2448
2449 let run_array_1_unsliced = unslice_run_array(run_array_1_sliced.into_data()).unwrap();
2450 assert_eq!(run_array_1_unsliced, output_batch.column(0).into_data());
2451 }
2452
2453 #[test]
2454 fn test_roundtrip_stream_nested_dict() {
2455 let xs = vec!["AA", "BB", "AA", "CC", "BB"];
2456 let dict = Arc::new(
2457 xs.clone()
2458 .into_iter()
2459 .collect::<DictionaryArray<Int8Type>>(),
2460 );
2461 let string_array: ArrayRef = Arc::new(StringArray::from(xs.clone()));
2462 let struct_array = StructArray::from(vec![
2463 (
2464 Arc::new(Field::new("f2.1", DataType::Utf8, false)),
2465 string_array,
2466 ),
2467 (
2468 Arc::new(Field::new("f2.2_struct", dict.data_type().clone(), false)),
2469 dict.clone() as ArrayRef,
2470 ),
2471 ]);
2472 let schema = Arc::new(Schema::new(vec![
2473 Field::new("f1_string", DataType::Utf8, false),
2474 Field::new("f2_struct", struct_array.data_type().clone(), false),
2475 ]));
2476 let input_batch = RecordBatch::try_new(
2477 schema,
2478 vec![
2479 Arc::new(StringArray::from(xs.clone())),
2480 Arc::new(struct_array),
2481 ],
2482 )
2483 .unwrap();
2484 let output_batch = roundtrip_ipc_stream(&input_batch);
2485 assert_eq!(input_batch, output_batch);
2486 }
2487
2488 #[test]
2489 fn test_roundtrip_stream_nested_dict_of_map_of_dict() {
2490 let values = StringArray::from(vec![Some("a"), None, Some("b"), Some("c")]);
2491 let values = Arc::new(values) as ArrayRef;
2492 let value_dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 3, 1]);
2493 let value_dict_array = DictionaryArray::new(value_dict_keys, values.clone());
2494
2495 let key_dict_keys = Int8Array::from_iter_values([0, 0, 2, 1, 1, 3]);
2496 let key_dict_array = DictionaryArray::new(key_dict_keys, values);
2497
2498 #[allow(deprecated)]
2499 let keys_field = Arc::new(Field::new_dict(
2500 "keys",
2501 DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2502 true, 1,
2504 false,
2505 ));
2506 #[allow(deprecated)]
2507 let values_field = Arc::new(Field::new_dict(
2508 "values",
2509 DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2510 true,
2511 2,
2512 false,
2513 ));
2514 let entry_struct = StructArray::from(vec![
2515 (keys_field, make_array(key_dict_array.into_data())),
2516 (values_field, make_array(value_dict_array.into_data())),
2517 ]);
2518 let map_data_type = DataType::Map(
2519 Arc::new(Field::new(
2520 "entries",
2521 entry_struct.data_type().clone(),
2522 false,
2523 )),
2524 false,
2525 );
2526
2527 let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 6]);
2528 let map_data = ArrayData::builder(map_data_type)
2529 .len(3)
2530 .add_buffer(entry_offsets)
2531 .add_child_data(entry_struct.into_data())
2532 .build()
2533 .unwrap();
2534 let map_array = MapArray::from(map_data);
2535
2536 let dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 2, 1]);
2537 let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));
2538
2539 let schema = Arc::new(Schema::new(vec![Field::new(
2540 "f1",
2541 dict_dict_array.data_type().clone(),
2542 false,
2543 )]));
2544 let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2545 let output_batch = roundtrip_ipc_stream(&input_batch);
2546 assert_eq!(input_batch, output_batch);
2547 }
2548
2549 fn test_roundtrip_stream_dict_of_list_of_dict_impl<
2550 OffsetSize: OffsetSizeTrait,
2551 U: ArrowNativeType,
2552 >(
2553 list_data_type: DataType,
2554 offsets: &[U; 5],
2555 ) {
2556 let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
2557 let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
2558 let dict_array = DictionaryArray::new(keys, Arc::new(values));
2559 let dict_data = dict_array.to_data();
2560
2561 let value_offsets = Buffer::from_slice_ref(offsets);
2562
2563 let list_data = ArrayData::builder(list_data_type)
2564 .len(4)
2565 .add_buffer(value_offsets)
2566 .add_child_data(dict_data)
2567 .build()
2568 .unwrap();
2569 let list_array = GenericListArray::<OffsetSize>::from(list_data);
2570
2571 let keys_for_dict = Int8Array::from_iter_values([0, 3, 0, 1, 1, 2, 0, 1, 3]);
2572 let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));
2573
2574 let schema = Arc::new(Schema::new(vec![Field::new(
2575 "f1",
2576 dict_dict_array.data_type().clone(),
2577 false,
2578 )]));
2579 let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2580 let output_batch = roundtrip_ipc_stream(&input_batch);
2581 assert_eq!(input_batch, output_batch);
2582 }
2583
2584 #[test]
2585 fn test_roundtrip_stream_dict_of_list_of_dict() {
2586 #[allow(deprecated)]
2588 let list_data_type = DataType::List(Arc::new(Field::new_dict(
2589 "item",
2590 DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2591 true,
2592 1,
2593 false,
2594 )));
2595 let offsets: &[i32; 5] = &[0, 2, 4, 4, 6];
2596 test_roundtrip_stream_dict_of_list_of_dict_impl::<i32, i32>(list_data_type, offsets);
2597
2598 #[allow(deprecated)]
2600 let list_data_type = DataType::LargeList(Arc::new(Field::new_dict(
2601 "item",
2602 DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2603 true,
2604 1,
2605 false,
2606 )));
2607 let offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
2608 test_roundtrip_stream_dict_of_list_of_dict_impl::<i64, i64>(list_data_type, offsets);
2609 }
2610
/// Round-trips a dictionary whose values are a fixed-size list of
/// dictionary-encoded strings through the IPC stream format.
#[test]
fn test_roundtrip_stream_dict_of_fixed_size_list_of_dict() {
    // Innermost layer: nine dictionary-encoded strings (three lists of three items).
    let inner_values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
    let inner_keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3, 1, 2]);
    let inner_dict = DictionaryArray::new(inner_keys, Arc::new(inner_values));
    let inner_dict_data = inner_dict.into_data();

    // Middle layer: a fixed-size list (width 3) over the inner dictionary.
    #[allow(deprecated)]
    let item_field = Arc::new(Field::new_dict(
        "item",
        DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
        true,
        1,
        false,
    ));
    let fixed_list_type = DataType::FixedSizeList(item_field, 3);
    let fixed_list_data = ArrayData::builder(fixed_list_type)
        .len(3)
        .add_child_data(inner_dict_data)
        .build()
        .unwrap();
    let fixed_list = FixedSizeListArray::from(fixed_list_data);

    // Outer layer: a dictionary whose values are the fixed-size lists.
    let outer_keys = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
    let outer_dict = DictionaryArray::new(outer_keys, Arc::new(fixed_list));

    let column_field = Field::new("f1", outer_dict.data_type().clone(), false);
    let schema = Arc::new(Schema::new(vec![column_field]));
    let input_batch = RecordBatch::try_new(schema, vec![Arc::new(outer_dict)]).unwrap();

    let output_batch = roundtrip_ipc_stream(&input_batch);
    assert_eq!(input_batch, output_batch);
}
2648
// A string well over 12 bytes long, shared by the view-type round-trip tests below.
// NOTE(review): presumably long enough that view arrays cannot inline it and must
// reference a data buffer instead — confirm against the Arrow view layout.
const LONG_TEST_STRING: &str =
    "This is a long string to make sure binary view array handles it";
2651
/// Round-trips a batch mixing `BinaryView`, `Utf8` and `Utf8View` columns through
/// both the IPC file and IPC stream formats, for the full batch and for a slice.
#[test]
fn test_roundtrip_view_types() {
    let bin_values: Vec<Option<&[u8]>> = vec![
        Some(b"foo"),
        None,
        Some(b"bar"),
        Some(LONG_TEST_STRING.as_bytes()),
    ];
    let utf8_values: Vec<Option<&str>> =
        vec![Some("foo"), None, Some("bar"), Some(LONG_TEST_STRING)];

    let schema = Schema::new(vec![
        Field::new("field_1", DataType::BinaryView, true),
        Field::new("field_2", DataType::Utf8, true),
        Field::new("field_3", DataType::Utf8View, true),
    ]);
    let columns: Vec<ArrayRef> = vec![
        Arc::new(BinaryViewArray::from_iter(bin_values)),
        Arc::new(StringArray::from_iter(utf8_values.iter())),
        Arc::new(StringViewArray::from_iter(utf8_values)),
    ];
    let record_batch = RecordBatch::try_new(Arc::new(schema), columns).unwrap();

    // Rows 1..3 of the batch, to cover arrays that do not start at row 0.
    let sliced_batch = record_batch.slice(1, 2);

    for batch in [&record_batch, &sliced_batch] {
        assert_eq!(*batch, roundtrip_ipc(batch));
        assert_eq!(*batch, roundtrip_ipc_stream(batch));
    }
}
2687
/// Round-trips a dictionary of maps whose keys and values are themselves
/// dictionaries over view arrays, through both IPC formats, whole and sliced.
#[test]
fn test_roundtrip_view_types_nested_dict() {
    let bin_values: Vec<Option<&[u8]>> = vec![
        Some(b"foo"),
        None,
        Some(b"bar"),
        Some(LONG_TEST_STRING.as_bytes()),
        Some(b"field"),
    ];
    let utf8_values: Vec<Option<&str>> = vec![
        Some("foo"),
        None,
        Some("bar"),
        Some(LONG_TEST_STRING),
        Some("field"),
    ];
    let bin_view_array = Arc::new(BinaryViewArray::from_iter(bin_values));
    let utf8_view_array = Arc::new(StringViewArray::from_iter(utf8_values));

    // Map keys: Int8 dictionary over the Utf8View values (dict id 1).
    #[allow(deprecated)]
    let keys_field = Arc::new(Field::new_dict(
        "keys",
        DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8View)),
        true,
        1,
        false,
    ));
    let key_indices = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
    let keys_dict = DictionaryArray::new(key_indices, utf8_view_array);
    let keys_column = make_array(keys_dict.into_data());

    // Map values: Int8 dictionary over the BinaryView values (dict id 2).
    #[allow(deprecated)]
    let values_field = Arc::new(Field::new_dict(
        "values",
        DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::BinaryView)),
        true,
        2,
        false,
    ));
    let value_indices = Int8Array::from_iter_values([0, 3, 0, 1, 2, 0, 1]);
    let values_dict = DictionaryArray::new(value_indices, bin_view_array);
    let values_column = make_array(values_dict.into_data());

    let entry_struct = StructArray::from(vec![
        (keys_field, keys_column),
        (values_field, values_column),
    ]);

    // Three map rows covering the seven entries: [0, 2), [2, 4), [4, 7).
    let entries_field = Arc::new(Field::new(
        "entries",
        entry_struct.data_type().clone(),
        false,
    ));
    let map_data = ArrayData::builder(DataType::Map(entries_field, false))
        .len(3)
        .add_buffer(Buffer::from_slice_ref([0, 2, 4, 7]))
        .add_child_data(entry_struct.into_data())
        .build()
        .unwrap();
    let map_array = MapArray::from(map_data);

    // Outermost layer: a dictionary whose values are the map rows.
    let outer_keys = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
    let outer_dict = DictionaryArray::new(outer_keys, Arc::new(map_array));
    let column_field = Field::new("f1", outer_dict.data_type().clone(), false);
    let schema = Arc::new(Schema::new(vec![column_field]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(outer_dict)]).unwrap();

    assert_eq!(batch, roundtrip_ipc(&batch));
    assert_eq!(batch, roundtrip_ipc_stream(&batch));

    let sliced_batch = batch.slice(1, 2);
    assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
    assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
}
2765
/// Round-trips a batch that has rows but no columns through the IPC stream format.
#[test]
fn test_no_columns_batch() {
    // The batch has no columns, so its 10 rows are declared through the options.
    let options = RecordBatchOptions::new()
        .with_match_field_names(true)
        .with_row_count(Some(10));
    let empty_schema = Arc::new(Schema::empty());
    let input_batch =
        RecordBatch::try_new_with_options(empty_schema, vec![], &options).unwrap();

    assert_eq!(input_batch, roundtrip_ipc_stream(&input_batch));
}
2776
/// Decodes a record batch from a deliberately misaligned body buffer with
/// alignment checking disabled and expects the original batch back.
#[test]
fn test_unaligned() {
    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let batch = RecordBatch::try_from_iter(vec![("i32", column)]).unwrap();

    let data_gen = IpcDataGenerator {};
    let mut dict_tracker = DictionaryTracker::new(false);
    let (_, encoded) = data_gen
        .encode(
            &batch,
            &mut dict_tracker,
            &Default::default(),
            &mut Default::default(),
        )
        .unwrap();

    let message = root_as_message(&encoded.ipc_message).unwrap();

    // Prepend one padding byte and slice it off again so the body starts one
    // byte past the start of the allocation.
    let mut padded = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
    padded.push(0_u8);
    padded.extend_from_slice(&encoded.arrow_data);
    let misaligned_body = Buffer::from(padded).slice(1);
    assert_ne!(misaligned_body.as_ptr().align_offset(8), 0);

    let ipc_batch = message.header_as_record_batch().unwrap();
    let decoder = RecordBatchDecoder::try_new(
        &misaligned_body,
        ipc_batch,
        batch.schema(),
        &Default::default(),
        &message.version(),
    )
    .unwrap()
    .with_require_alignment(false);

    let roundtrip = decoder.read_record_batch().unwrap();
    assert_eq!(batch, roundtrip);
}
2819
/// Decodes a record batch from a deliberately misaligned body buffer with
/// alignment checking enabled and expects a misalignment error.
#[test]
fn test_unaligned_throws_error_with_require_alignment() {
    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let batch = RecordBatch::try_from_iter(vec![("i32", column)]).unwrap();

    let data_gen = IpcDataGenerator {};
    let mut dict_tracker = DictionaryTracker::new(false);
    let (_, encoded) = data_gen
        .encode(
            &batch,
            &mut dict_tracker,
            &Default::default(),
            &mut Default::default(),
        )
        .unwrap();

    let message = root_as_message(&encoded.ipc_message).unwrap();

    // Prepend one padding byte and slice it off again so the body starts one
    // byte past the start of the allocation.
    let mut padded = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
    padded.push(0_u8);
    padded.extend_from_slice(&encoded.arrow_data);
    let misaligned_body = Buffer::from(padded).slice(1);
    assert_ne!(misaligned_body.as_ptr().align_offset(8), 0);

    let ipc_batch = message.header_as_record_batch().unwrap();
    let decoder = RecordBatchDecoder::try_new(
        &misaligned_body,
        ipc_batch,
        batch.schema(),
        &Default::default(),
        &message.version(),
    )
    .unwrap()
    .with_require_alignment(true);

    let error = decoder.read_record_batch().unwrap_err();
    assert_eq!(
        error.to_string(),
        "Invalid argument error: Misaligned buffers[0] in array of type Int32, \
         offset from expected alignment of 4 by 1"
    );
}
2867
/// Writes and re-reads an IPC file with 600,000 columns, using a reader whose
/// footer flatbuffer table limit has been raised to 1,500,000.
#[test]
fn test_file_with_massive_column_count() {
    let limit = 600_000;

    let fields: Vec<_> = (0..limit)
        .map(|i| Field::new(format!("{i}"), DataType::Boolean, false))
        .collect();
    let batch = RecordBatch::new_empty(Arc::new(Schema::new(fields)));

    let mut buf = Vec::new();
    {
        // Scope the writer so its borrow of `buf` ends before the buffer is read.
        let mut writer =
            crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
    }

    let cursor = std::io::Cursor::new(buf);
    let mut reader = FileReaderBuilder::new()
        .with_max_footer_fb_tables(1_500_000)
        .build(cursor)
        .unwrap();
    let roundtrip_batch = reader.next().unwrap().unwrap();

    assert_eq!(batch, roundtrip_batch);
}
2893
/// Writes and re-reads an IPC file whose single column is a boolean leaf wrapped
/// in 61 struct levels, using a reader whose footer depth limit is raised to 65.
#[test]
fn test_file_with_deeply_nested_columns() {
    let limit = 61;

    // Start from the leaf and wrap it in one more struct per iteration.
    let mut fields = vec![Field::new("leaf", DataType::Boolean, false)];
    for index in 0..limit {
        fields = vec![Field::new_struct(format!("{index}"), fields, false)];
    }
    let batch = RecordBatch::new_empty(Arc::new(Schema::new(fields)));

    let mut buf = Vec::new();
    {
        // Scope the writer so its borrow of `buf` ends before the buffer is read.
        let mut writer =
            crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
    }

    let cursor = std::io::Cursor::new(buf);
    let mut reader = FileReaderBuilder::new()
        .with_max_footer_fb_depth(65)
        .build(cursor)
        .unwrap();
    let roundtrip_batch = reader.next().unwrap().unwrap();

    assert_eq!(batch, roundtrip_batch);
}
2920
/// A struct array whose children have different lengths must be rejected by the
/// validating IPC readers.
#[test]
fn test_invalid_struct_array_ipc_read_errors() {
    let a_field = Field::new("a", DataType::Int32, false);
    let b_field = Field::new("b", DataType::Int32, false);

    // Child "a" holds four rows while child "b" holds only three.
    let four_rows = ArrayData::builder(a_field.data_type().clone())
        .len(4)
        .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
        .build()
        .unwrap();
    let three_rows = ArrayData::builder(b_field.data_type().clone())
        .len(3)
        .add_buffer(Buffer::from_slice_ref([5, 6, 7]))
        .build()
        .unwrap();
    let children = vec![make_array(four_rows), make_array(three_rows)];
    let struct_fields = Fields::from(vec![a_field, b_field]);

    // The unchecked constructor is used on purpose: the mismatched child lengths
    // are exactly what the reader is expected to detect.
    let invalid_struct_arr =
        unsafe { StructArray::new_unchecked(struct_fields, children, None) };

    expect_ipc_validation_error(
        Arc::new(invalid_struct_arr),
        "Invalid argument error: Incorrect array length for StructArray field \"b\", expected 4 got 3",
    );
}
2951
/// A struct array containing a string child with invalid UTF-8 must be rejected
/// by the validating IPC readers, i.e. validation reaches nested children.
#[test]
fn test_invalid_nested_array_ipc_read_errors() {
    let a_field = Field::new("a", DataType::Int32, false);
    let b_field = Field::new("b", DataType::Utf8, false);

    let schema = Arc::new(Schema::new(vec![Field::new_struct(
        "s",
        vec![a_field.clone(), b_field.clone()],
        false,
    )]));

    let a_array_data = ArrayData::builder(a_field.data_type().clone())
        .len(4)
        .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
        .build()
        .unwrap();
    let b_array_data = {
        // `valid` must be exactly three bytes: the expected error below reports the
        // invalid string as occupying bytes 3..18 (3 + 11 valid + 4 invalid bytes).
        let valid: &[u8] = b"   ";
        let mut invalid = vec![];
        invalid.extend_from_slice(b"ValidString");
        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
        let binary_array =
            BinaryArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
        // Reinterpret the binary buffers as a string array without UTF-8 checks;
        // producing an invalid array is the point of this test.
        let array = unsafe {
            StringArray::new_unchecked(
                binary_array.offsets().clone(),
                binary_array.values().clone(),
                binary_array.nulls().cloned(),
            )
        };
        array.into_data()
    };
    let struct_data_type = schema.field(0).data_type();

    // Built unchecked so the invalid string child is accepted when constructing.
    let invalid_struct_arr = unsafe {
        make_array(
            ArrayData::builder(struct_data_type.clone())
                .len(4)
                .add_child_data(a_array_data)
                .add_child_data(b_array_data)
                .build_unchecked(),
        )
    };
    expect_ipc_validation_error(
        Arc::new(invalid_struct_arr),
        "Invalid argument error: Invalid UTF8 sequence at string index 3 (3..18): invalid utf-8 sequence of 1 bytes from index 11",
    );
}
3002
/// Two dictionary columns that declare the same dict id must still round-trip
/// through the IPC stream format when written with default write options.
#[test]
fn test_same_dict_id_without_preserve() {
    // Both fields are created with dict id 0.
    let fields = ["a", "b"]
        .iter()
        .map(|name| {
            #[allow(deprecated)]
            Field::new_dict(
                name.to_string(),
                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                true,
                0,
                false,
            )
        })
        .collect::<Vec<Field>>();

    let batch = RecordBatch::try_new(
        Arc::new(Schema::new(fields)),
        vec![
            Arc::new(
                vec![Some("c"), Some("d")]
                    .into_iter()
                    .collect::<DictionaryArray<Int32Type>>(),
            ) as ArrayRef,
            Arc::new(
                vec![Some("e"), Some("f")]
                    .into_iter()
                    .collect::<DictionaryArray<Int32Type>>(),
            ) as ArrayRef,
        ],
    )
    .expect("Failed to create RecordBatch");

    let mut buf = vec![];
    {
        // Scope the writer so its borrow of `buf` ends before the buffer is read.
        let mut writer = crate::writer::StreamWriter::try_new_with_options(
            &mut buf,
            batch.schema().as_ref(),
            crate::writer::IpcWriteOptions::default(),
        )
        .expect("Failed to create StreamWriter");
        writer.write(&batch).expect("Failed to write RecordBatch");
        writer.finish().expect("Failed to finish StreamWriter");
    }

    // Collect every decoded batch so that a reader yielding nothing fails the test
    // instead of passing vacuously.
    let decoded: Vec<RecordBatch> = StreamReader::try_new(std::io::Cursor::new(buf), None)
        .expect("Failed to create StreamReader")
        .map(|decoded_batch| decoded_batch.expect("Failed to read RecordBatch"))
        .collect();
    assert_eq!(decoded, vec![batch]);
}
3058
/// A list array whose offsets decrease must be rejected by the validating IPC readers.
#[test]
fn test_validation_of_invalid_list_array() {
    let values = Int32Array::from(vec![1, 2, 3]);
    let item_field = Field::new_list_field(DataType::Int32, true);

    // The final offset (2) goes backwards from the previous one (4).
    let decreasing_offsets = ScalarBuffer::<i32>::from(vec![0, 2, 4, 2]);
    // The unchecked constructor is used on purpose to obtain the invalid offsets.
    let offsets = unsafe { OffsetBuffer::new_unchecked(decreasing_offsets) };

    let array = ListArray::new(Arc::new(item_field), offsets, Arc::new(values), None);

    expect_ipc_validation_error(
        Arc::new(array),
        "Invalid argument error: Offset invariant failure: offset at position 2 out of bounds: 4 > 2",
    );
}
3076
/// A string array containing invalid UTF-8 must be rejected by the validating
/// IPC readers.
#[test]
fn test_validation_of_invalid_string_array() {
    // `valid` must be exactly three bytes: the expected error below reports the
    // invalid string as occupying bytes 3..45 (3 + 38 valid + 4 invalid bytes).
    let valid: &[u8] = b"   ";
    let mut invalid = vec![];
    invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
    invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
    let binary_array = BinaryArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
    // Reinterpret the binary buffers as a string array without UTF-8 checks;
    // producing an invalid array is the point of this test.
    let array = unsafe {
        StringArray::new_unchecked(
            binary_array.offsets().clone(),
            binary_array.values().clone(),
            binary_array.nulls().cloned(),
        )
    };
    expect_ipc_validation_error(
        Arc::new(array),
        "Invalid argument error: Invalid UTF8 sequence at string index 3 (3..45): invalid utf-8 sequence of 1 bytes from index 38",
    );
}
3098
/// A string view array containing invalid UTF-8 must be rejected by the
/// validating IPC readers.
#[test]
fn test_validation_of_invalid_string_view_array() {
    let valid: &[u8] = b" ";

    // 38 bytes of valid text followed by the invalid byte sequence.
    let mut invalid = Vec::new();
    invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
    invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);

    let rows = vec![None, Some(valid), None, Some(&invalid)];
    let binary_view_array = BinaryViewArray::from_iter(rows);

    // Reinterpret the binary view buffers as a string view array without UTF-8
    // checks; producing an invalid array is the point of this test.
    let array = unsafe {
        StringViewArray::new_unchecked(
            binary_view_array.views().clone(),
            binary_view_array.data_buffers().to_vec(),
            binary_view_array.nulls().cloned(),
        )
    };

    expect_ipc_validation_error(
        Arc::new(array),
        "Invalid argument error: Encountered non-UTF-8 data at index 3: invalid utf-8 sequence of 1 bytes from index 38",
    );
}
3121
/// A dictionary array with a key outside the values range must be rejected by
/// the validating IPC readers.
#[test]
fn test_validation_of_invalid_dictionary_array() {
    let values = StringArray::from_iter_values(["a", "b", "c"]);
    // Key 200 points far past the three dictionary values.
    let keys = Int32Array::from(vec![1, 200]);

    // The unchecked constructor is used on purpose to obtain the invalid array.
    let array = unsafe { DictionaryArray::new_unchecked(keys, Arc::new(values)) };

    expect_ipc_validation_error(
        Arc::new(array),
        "Invalid argument error: Value at position 1 out of bounds: 200 (should be in [0, 2])",
    );
}
3137
/// A union array containing a type id that matches no field must be rejected by
/// the validating IPC readers.
#[test]
fn test_validation_of_invalid_union_array() {
    // Only type ids 1 and 3 are declared.
    let fields = UnionFields::try_new(
        vec![1, 3],
        vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Utf8, false),
        ],
    )
    .unwrap();

    // Type id 2 is not one of the declared ids.
    let type_ids = ScalarBuffer::from(vec![1i8, 2, 3]);
    let children: Vec<ArrayRef> = vec![
        Arc::new(Int32Array::from(vec![10, 20, 30])),
        Arc::new(StringArray::from(vec![Some("a"), Some("b"), Some("c")])),
    ];

    // The unchecked constructor is used on purpose to obtain the invalid array;
    // no offsets buffer is supplied.
    let array = unsafe { UnionArray::new_unchecked(fields, type_ids, None, children) };

    expect_ipc_validation_error(
        Arc::new(array),
        "Invalid argument error: Type Ids values must match one of the field type ids",
    );
}
3164
/// Byte sequence that is invalid UTF-8 from its very first byte: `0xa0` is a
/// continuation byte with no preceding lead byte. The validation tests append it
/// to otherwise valid text to build strings that must fail UTF-8 validation.
const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3168
3169 fn expect_ipc_validation_error(array: ArrayRef, expected_err: &str) {
3171 let rb = RecordBatch::try_from_iter([("a", array)]).unwrap();
3172
3173 let buf = write_stream(&rb); read_stream_skip_validation(&buf).unwrap();
3176 let err = read_stream(&buf).unwrap_err();
3177 assert_eq!(err.to_string(), expected_err);
3178
3179 let buf = write_ipc(&rb); read_ipc_skip_validation(&buf).unwrap();
3182 let err = read_ipc(&buf).unwrap_err();
3183 assert_eq!(err.to_string(), expected_err);
3184
3185 read_ipc_with_decoder_skip_validation(buf.clone()).unwrap();
3187 let err = read_ipc_with_decoder(buf).unwrap_err();
3188 assert_eq!(err.to_string(), expected_err);
3189 }
3190
/// Encodes a schema with two dictionary fields as an IPC message and checks that
/// `parse_message` recovers an equal schema from the written bytes.
#[test]
fn test_roundtrip_schema() {
    let dict_type =
        || DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8));
    let schema = Schema::new(vec![
        Field::new("a", dict_type(), false),
        Field::new("b", dict_type(), false),
    ]);

    let options = IpcWriteOptions::default();
    let mut dict_tracker = DictionaryTracker::new(false);
    let encoded_data = IpcDataGenerator::default().schema_to_bytes_with_dictionary_tracker(
        &schema,
        &mut dict_tracker,
        &options,
    );
    let mut schema_bytes: Vec<u8> = Vec::new();
    write_message(&mut schema_bytes, encoded_data, &options).expect("write_message");

    // Skip the continuation marker when the written message starts with one.
    let has_marker = schema_bytes[0..4] == CONTINUATION_MARKER;
    let begin_offset: usize = if has_marker { 4 } else { 0 };

    // Parsing the remainder as a size-prefixed flatbuffer message is expected to fail.
    size_prefixed_root_as_message(&schema_bytes[begin_offset..])
        .expect_err("size_prefixed_root_as_message");

    let msg = parse_message(&schema_bytes).expect("parse_message");
    let ipc_schema = msg.header_as_schema().expect("header_as_schema");
    let new_schema = fb_to_schema(ipc_schema);

    assert_eq!(schema, new_schema);
}
3229
/// A stream whose metadata length prefix is negative must make `StreamReader`
/// construction fail.
#[test]
fn test_negative_meta_len() {
    // Continuation marker followed by a little-endian length of -1.
    let mut buf = Vec::new();
    buf.extend_from_slice(&CONTINUATION_MARKER);
    buf.extend_from_slice(&(-1_i32).to_le_bytes());

    let reader = StreamReader::try_new(Cursor::new(buf), None);
    assert!(reader.is_err());
}
3240}