1mod stream;
28pub use stream::*;
29
30use arrow_select::concat;
31
32use flatbuffers::{VectorIter, VerifierOptions};
33use std::collections::{HashMap, VecDeque};
34use std::fmt;
35use std::io::{BufReader, Read, Seek, SeekFrom};
36use std::sync::Arc;
37
38use arrow_array::*;
39use arrow_buffer::{
40 ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, ScalarBuffer,
41};
42use arrow_data::{ArrayData, ArrayDataBuilder, UnsafeFlag};
43use arrow_schema::*;
44
45use crate::compression::CompressionCodec;
46use crate::r#gen::Message::{self};
47use crate::{Block, CONTINUATION_MARKER, FieldNode, MetadataVersion};
48use DataType::*;
49
50fn read_buffer(
60 buf: &crate::Buffer,
61 a_data: &Buffer,
62 compression_codec: Option<CompressionCodec>,
63) -> Result<Buffer, ArrowError> {
64 let start_offset = buf.offset() as usize;
65 let buf_data = a_data.slice_with_length(start_offset, buf.length() as usize);
66 match (buf_data.is_empty(), compression_codec) {
68 (true, _) | (_, None) => Ok(buf_data),
69 (false, Some(decompressor)) => decompressor.decompress_to_buffer(&buf_data),
70 }
71}
72impl RecordBatchDecoder<'_> {
73 fn create_array(
86 &mut self,
87 field: &Field,
88 variadic_counts: &mut VecDeque<i64>,
89 ) -> Result<ArrayRef, ArrowError> {
90 let data_type = field.data_type();
91 match data_type {
92 Utf8 | Binary | LargeBinary | LargeUtf8 => {
93 let field_node = self.next_node(field)?;
94 let buffers = [
95 self.next_buffer()?,
96 self.next_buffer()?,
97 self.next_buffer()?,
98 ];
99 self.create_primitive_array(field_node, data_type, &buffers)
100 }
101 BinaryView | Utf8View => {
102 let count = variadic_counts
103 .pop_front()
104 .ok_or(ArrowError::IpcError(format!(
105 "Missing variadic count for {data_type} column"
106 )))?;
107 let count = count + 2; let buffers = (0..count)
109 .map(|_| self.next_buffer())
110 .collect::<Result<Vec<_>, _>>()?;
111 let field_node = self.next_node(field)?;
112 self.create_primitive_array(field_node, data_type, &buffers)
113 }
114 FixedSizeBinary(_) => {
115 let field_node = self.next_node(field)?;
116 let buffers = [self.next_buffer()?, self.next_buffer()?];
117 self.create_primitive_array(field_node, data_type, &buffers)
118 }
119 List(list_field) | LargeList(list_field) | Map(list_field, _) => {
120 let list_node = self.next_node(field)?;
121 let list_buffers = [self.next_buffer()?, self.next_buffer()?];
122 let values = self.create_array(list_field, variadic_counts)?;
123 self.create_list_array(list_node, data_type, &list_buffers, values)
124 }
125 FixedSizeList(list_field, _) => {
126 let list_node = self.next_node(field)?;
127 let list_buffers = [self.next_buffer()?];
128 let values = self.create_array(list_field, variadic_counts)?;
129 self.create_list_array(list_node, data_type, &list_buffers, values)
130 }
131 Struct(struct_fields) => {
132 let struct_node = self.next_node(field)?;
133 let null_buffer = self.next_buffer()?;
134
135 let mut struct_arrays = vec![];
137 for struct_field in struct_fields {
140 let child = self.create_array(struct_field, variadic_counts)?;
141 struct_arrays.push(child);
142 }
143 self.create_struct_array(struct_node, null_buffer, struct_fields, struct_arrays)
144 }
145 RunEndEncoded(run_ends_field, values_field) => {
146 let run_node = self.next_node(field)?;
147 let run_ends = self.create_array(run_ends_field, variadic_counts)?;
148 let values = self.create_array(values_field, variadic_counts)?;
149
150 let run_array_length = run_node.length() as usize;
151 let builder = ArrayData::builder(data_type.clone())
152 .len(run_array_length)
153 .offset(0)
154 .add_child_data(run_ends.into_data())
155 .add_child_data(values.into_data())
156 .null_count(run_node.null_count() as usize);
157
158 self.create_array_from_builder(builder)
159 }
160 Dictionary(_, _) => {
162 let index_node = self.next_node(field)?;
163 let index_buffers = [self.next_buffer()?, self.next_buffer()?];
164
165 #[allow(deprecated)]
166 let dict_id = field.dict_id().ok_or_else(|| {
167 ArrowError::ParseError(format!("Field {field} does not have dict id"))
168 })?;
169
170 let value_array = self.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
171 ArrowError::ParseError(format!(
172 "Cannot find a dictionary batch with dict id: {dict_id}"
173 ))
174 })?;
175
176 self.create_dictionary_array(
177 index_node,
178 data_type,
179 &index_buffers,
180 value_array.clone(),
181 )
182 }
183 Union(fields, mode) => {
184 let union_node = self.next_node(field)?;
185 let len = union_node.length() as usize;
186
187 if self.version < MetadataVersion::V5 {
190 self.next_buffer()?;
191 }
192
193 let type_ids: ScalarBuffer<i8> =
194 self.next_buffer()?.slice_with_length(0, len).into();
195
196 let value_offsets = match mode {
197 UnionMode::Dense => {
198 let offsets: ScalarBuffer<i32> =
199 self.next_buffer()?.slice_with_length(0, len * 4).into();
200 Some(offsets)
201 }
202 UnionMode::Sparse => None,
203 };
204
205 let mut children = Vec::with_capacity(fields.len());
206
207 for (_id, field) in fields.iter() {
208 let child = self.create_array(field, variadic_counts)?;
209 children.push(child);
210 }
211
212 let array = if self.skip_validation.get() {
213 unsafe {
215 UnionArray::new_unchecked(fields.clone(), type_ids, value_offsets, children)
216 }
217 } else {
218 UnionArray::try_new(fields.clone(), type_ids, value_offsets, children)?
219 };
220 Ok(Arc::new(array))
221 }
222 Null => {
223 let node = self.next_node(field)?;
224 let length = node.length();
225 let null_count = node.null_count();
226
227 if length != null_count {
228 return Err(ArrowError::SchemaError(format!(
229 "Field {field} of NullArray has unequal null_count {null_count} and len {length}"
230 )));
231 }
232
233 let builder = ArrayData::builder(data_type.clone())
234 .len(length as usize)
235 .offset(0);
236 self.create_array_from_builder(builder)
237 }
238 _ => {
239 let field_node = self.next_node(field)?;
240 let buffers = [self.next_buffer()?, self.next_buffer()?];
241 self.create_primitive_array(field_node, data_type, &buffers)
242 }
243 }
244 }
245
246 fn create_primitive_array(
249 &self,
250 field_node: &FieldNode,
251 data_type: &DataType,
252 buffers: &[Buffer],
253 ) -> Result<ArrayRef, ArrowError> {
254 let length = field_node.length() as usize;
255 let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
256 let mut builder = match data_type {
257 Utf8 | Binary | LargeBinary | LargeUtf8 => {
258 ArrayData::builder(data_type.clone())
260 .len(length)
261 .buffers(buffers[1..3].to_vec())
262 .null_bit_buffer(null_buffer)
263 }
264 BinaryView | Utf8View => ArrayData::builder(data_type.clone())
265 .len(length)
266 .buffers(buffers[1..].to_vec())
267 .null_bit_buffer(null_buffer),
268 _ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => {
269 ArrayData::builder(data_type.clone())
271 .len(length)
272 .add_buffer(buffers[1].clone())
273 .null_bit_buffer(null_buffer)
274 }
275 t => unreachable!("Data type {:?} either unsupported or not primitive", t),
276 };
277
278 builder = builder.null_count(field_node.null_count() as usize);
279
280 self.create_array_from_builder(builder)
281 }
282
283 fn create_array_from_builder(&self, builder: ArrayDataBuilder) -> Result<ArrayRef, ArrowError> {
285 let mut builder = builder.align_buffers(!self.require_alignment);
286 if self.skip_validation.get() {
287 unsafe { builder = builder.skip_validation(true) }
289 };
290 Ok(make_array(builder.build()?))
291 }
292
293 fn create_list_array(
296 &self,
297 field_node: &FieldNode,
298 data_type: &DataType,
299 buffers: &[Buffer],
300 child_array: ArrayRef,
301 ) -> Result<ArrayRef, ArrowError> {
302 let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
303 let length = field_node.length() as usize;
304 let child_data = child_array.into_data();
305 let mut builder = match data_type {
306 List(_) | LargeList(_) | Map(_, _) => ArrayData::builder(data_type.clone())
307 .len(length)
308 .add_buffer(buffers[1].clone())
309 .add_child_data(child_data)
310 .null_bit_buffer(null_buffer),
311
312 FixedSizeList(_, _) => ArrayData::builder(data_type.clone())
313 .len(length)
314 .add_child_data(child_data)
315 .null_bit_buffer(null_buffer),
316
317 _ => unreachable!("Cannot create list or map array from {:?}", data_type),
318 };
319
320 builder = builder.null_count(field_node.null_count() as usize);
321
322 self.create_array_from_builder(builder)
323 }
324
325 fn create_struct_array(
326 &self,
327 struct_node: &FieldNode,
328 null_buffer: Buffer,
329 struct_fields: &Fields,
330 struct_arrays: Vec<ArrayRef>,
331 ) -> Result<ArrayRef, ArrowError> {
332 let null_count = struct_node.null_count() as usize;
333 let len = struct_node.length() as usize;
334 let skip_validation = self.skip_validation.get();
335
336 let nulls = if null_count > 0 {
337 let validity_buffer = BooleanBuffer::new(null_buffer, 0, len);
338 let null_buffer = if skip_validation {
339 unsafe { NullBuffer::new_unchecked(validity_buffer, null_count) }
341 } else {
342 let null_buffer = NullBuffer::new(validity_buffer);
343
344 if null_buffer.null_count() != null_count {
345 return Err(ArrowError::InvalidArgumentError(format!(
346 "null_count value ({}) doesn't match actual number of nulls in array ({})",
347 null_count,
348 null_buffer.null_count()
349 )));
350 }
351
352 null_buffer
353 };
354
355 Some(null_buffer)
356 } else {
357 None
358 };
359 if struct_arrays.is_empty() {
360 return Ok(Arc::new(StructArray::new_empty_fields(len, nulls)));
363 }
364
365 let struct_array = if skip_validation {
366 unsafe { StructArray::new_unchecked(struct_fields.clone(), struct_arrays, nulls) }
368 } else {
369 StructArray::try_new(struct_fields.clone(), struct_arrays, nulls)?
370 };
371
372 Ok(Arc::new(struct_array))
373 }
374
375 fn create_dictionary_array(
378 &self,
379 field_node: &FieldNode,
380 data_type: &DataType,
381 buffers: &[Buffer],
382 value_array: ArrayRef,
383 ) -> Result<ArrayRef, ArrowError> {
384 if let Dictionary(_, _) = *data_type {
385 let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
386 let builder = ArrayData::builder(data_type.clone())
387 .len(field_node.length() as usize)
388 .add_buffer(buffers[1].clone())
389 .add_child_data(value_array.into_data())
390 .null_bit_buffer(null_buffer)
391 .null_count(field_node.null_count() as usize);
392 self.create_array_from_builder(builder)
393 } else {
394 unreachable!("Cannot create dictionary array from {:?}", data_type)
395 }
396 }
397}
398
399pub struct RecordBatchDecoder<'a> {
405 batch: crate::RecordBatch<'a>,
407 schema: SchemaRef,
409 dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
411 compression: Option<CompressionCodec>,
413 version: MetadataVersion,
415 data: &'a Buffer,
417 nodes: VectorIter<'a, FieldNode>,
419 buffers: VectorIter<'a, crate::Buffer>,
421 projection: Option<&'a [usize]>,
424 require_alignment: bool,
427 skip_validation: UnsafeFlag,
431}
432
433impl<'a> RecordBatchDecoder<'a> {
434 fn try_new(
436 buf: &'a Buffer,
437 batch: crate::RecordBatch<'a>,
438 schema: SchemaRef,
439 dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
440 metadata: &'a MetadataVersion,
441 ) -> Result<Self, ArrowError> {
442 let buffers = batch.buffers().ok_or_else(|| {
443 ArrowError::IpcError("Unable to get buffers from IPC RecordBatch".to_string())
444 })?;
445 let field_nodes = batch.nodes().ok_or_else(|| {
446 ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string())
447 })?;
448
449 let batch_compression = batch.compression();
450 let compression = batch_compression
451 .map(|batch_compression| batch_compression.codec().try_into())
452 .transpose()?;
453
454 Ok(Self {
455 batch,
456 schema,
457 dictionaries_by_id,
458 compression,
459 version: *metadata,
460 data: buf,
461 nodes: field_nodes.iter(),
462 buffers: buffers.iter(),
463 projection: None,
464 require_alignment: false,
465 skip_validation: UnsafeFlag::new(),
466 })
467 }
468
469 pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self {
474 self.projection = projection;
475 self
476 }
477
478 pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
484 self.require_alignment = require_alignment;
485 self
486 }
487
488 pub(crate) fn with_skip_validation(mut self, skip_validation: UnsafeFlag) -> Self {
500 self.skip_validation = skip_validation;
501 self
502 }
503
504 fn read_record_batch(mut self) -> Result<RecordBatch, ArrowError> {
506 let mut variadic_counts: VecDeque<i64> = self
507 .batch
508 .variadicBufferCounts()
509 .into_iter()
510 .flatten()
511 .collect();
512
513 let options = RecordBatchOptions::new().with_row_count(Some(self.batch.length() as usize));
514
515 let schema = Arc::clone(&self.schema);
516 if let Some(projection) = self.projection {
517 let mut arrays = vec![];
518 for (idx, field) in schema.fields().iter().enumerate() {
520 if let Some(proj_idx) = projection.iter().position(|p| p == &idx) {
522 let child = self.create_array(field, &mut variadic_counts)?;
523 arrays.push((proj_idx, child));
524 } else {
525 self.skip_field(field, &mut variadic_counts)?;
526 }
527 }
528
529 arrays.sort_by_key(|t| t.0);
530
531 let schema = Arc::new(schema.project(projection)?);
532 let columns = arrays.into_iter().map(|t| t.1).collect::<Vec<_>>();
533
534 if self.skip_validation.get() {
535 unsafe {
537 Ok(RecordBatch::new_unchecked(
538 schema,
539 columns,
540 self.batch.length() as usize,
541 ))
542 }
543 } else {
544 assert!(variadic_counts.is_empty());
545 RecordBatch::try_new_with_options(schema, columns, &options)
546 }
547 } else {
548 let mut children = vec![];
549 for field in schema.fields() {
551 let child = self.create_array(field, &mut variadic_counts)?;
552 children.push(child);
553 }
554
555 if self.skip_validation.get() {
556 unsafe {
558 Ok(RecordBatch::new_unchecked(
559 schema,
560 children,
561 self.batch.length() as usize,
562 ))
563 }
564 } else {
565 assert!(variadic_counts.is_empty());
566 RecordBatch::try_new_with_options(schema, children, &options)
567 }
568 }
569 }
570
571 fn next_buffer(&mut self) -> Result<Buffer, ArrowError> {
572 let buffer = self.buffers.next().ok_or_else(|| {
573 ArrowError::IpcError("Buffer count mismatched with metadata".to_string())
574 })?;
575 read_buffer(buffer, self.data, self.compression)
576 }
577
578 fn skip_buffer(&mut self) {
579 self.buffers.next().unwrap();
580 }
581
582 fn next_node(&mut self, field: &Field) -> Result<&'a FieldNode, ArrowError> {
583 self.nodes.next().ok_or_else(|| {
584 ArrowError::SchemaError(format!(
585 "Invalid data for schema. {field} refers to node not found in schema",
586 ))
587 })
588 }
589
590 fn skip_field(
591 &mut self,
592 field: &Field,
593 variadic_count: &mut VecDeque<i64>,
594 ) -> Result<(), ArrowError> {
595 self.next_node(field)?;
596
597 match field.data_type() {
598 Utf8 | Binary | LargeBinary | LargeUtf8 => {
599 for _ in 0..3 {
600 self.skip_buffer()
601 }
602 }
603 Utf8View | BinaryView => {
604 let count = variadic_count
605 .pop_front()
606 .ok_or(ArrowError::IpcError(format!(
607 "Missing variadic count for {} column",
608 field.data_type()
609 )))?;
610 let count = count + 2; for _i in 0..count {
612 self.skip_buffer()
613 }
614 }
615 FixedSizeBinary(_) => {
616 self.skip_buffer();
617 self.skip_buffer();
618 }
619 List(list_field) | LargeList(list_field) | Map(list_field, _) => {
620 self.skip_buffer();
621 self.skip_buffer();
622 self.skip_field(list_field, variadic_count)?;
623 }
624 FixedSizeList(list_field, _) => {
625 self.skip_buffer();
626 self.skip_field(list_field, variadic_count)?;
627 }
628 Struct(struct_fields) => {
629 self.skip_buffer();
630
631 for struct_field in struct_fields {
633 self.skip_field(struct_field, variadic_count)?
634 }
635 }
636 RunEndEncoded(run_ends_field, values_field) => {
637 self.skip_field(run_ends_field, variadic_count)?;
638 self.skip_field(values_field, variadic_count)?;
639 }
640 Dictionary(_, _) => {
641 self.skip_buffer(); self.skip_buffer(); }
644 Union(fields, mode) => {
645 self.skip_buffer(); match mode {
648 UnionMode::Dense => self.skip_buffer(),
649 UnionMode::Sparse => {}
650 };
651
652 for (_, field) in fields.iter() {
653 self.skip_field(field, variadic_count)?
654 }
655 }
656 Null => {} _ => {
658 self.skip_buffer();
659 self.skip_buffer();
660 }
661 };
662 Ok(())
663 }
664}
665
666pub fn read_record_batch(
677 buf: &Buffer,
678 batch: crate::RecordBatch,
679 schema: SchemaRef,
680 dictionaries_by_id: &HashMap<i64, ArrayRef>,
681 projection: Option<&[usize]>,
682 metadata: &MetadataVersion,
683) -> Result<RecordBatch, ArrowError> {
684 RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)?
685 .with_projection(projection)
686 .with_require_alignment(false)
687 .read_record_batch()
688}
689
690pub fn read_dictionary(
693 buf: &Buffer,
694 batch: crate::DictionaryBatch,
695 schema: &Schema,
696 dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
697 metadata: &MetadataVersion,
698) -> Result<(), ArrowError> {
699 read_dictionary_impl(
700 buf,
701 batch,
702 schema,
703 dictionaries_by_id,
704 metadata,
705 false,
706 UnsafeFlag::new(),
707 )
708}
709
710fn read_dictionary_impl(
711 buf: &Buffer,
712 batch: crate::DictionaryBatch,
713 schema: &Schema,
714 dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
715 metadata: &MetadataVersion,
716 require_alignment: bool,
717 skip_validation: UnsafeFlag,
718) -> Result<(), ArrowError> {
719 let id = batch.id();
720
721 let dictionary_values = get_dictionary_values(
722 buf,
723 batch,
724 schema,
725 dictionaries_by_id,
726 metadata,
727 require_alignment,
728 skip_validation,
729 )?;
730
731 update_dictionaries(dictionaries_by_id, batch.isDelta(), id, dictionary_values)?;
732
733 Ok(())
734}
735
736fn update_dictionaries(
745 dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
746 is_delta: bool,
747 dict_id: i64,
748 dict_values: ArrayRef,
749) -> Result<(), ArrowError> {
750 if !is_delta {
751 dictionaries_by_id.insert(dict_id, dict_values.clone());
755 return Ok(());
756 }
757
758 let existing = dictionaries_by_id.get(&dict_id).ok_or_else(|| {
759 ArrowError::InvalidArgumentError(format!(
760 "No existing dictionary for delta dictionary with id '{dict_id}'"
761 ))
762 })?;
763
764 let combined = concat::concat(&[existing, &dict_values]).map_err(|e| {
765 ArrowError::InvalidArgumentError(format!("Failed to concat delta dictionary: {e}"))
766 })?;
767
768 dictionaries_by_id.insert(dict_id, combined);
769
770 Ok(())
771}
772
773fn get_dictionary_values(
777 buf: &Buffer,
778 batch: crate::DictionaryBatch,
779 schema: &Schema,
780 dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
781 metadata: &MetadataVersion,
782 require_alignment: bool,
783 skip_validation: UnsafeFlag,
784) -> Result<ArrayRef, ArrowError> {
785 let id = batch.id();
786 #[allow(deprecated)]
787 let fields_using_this_dictionary = schema.fields_with_dict_id(id);
788 let first_field = fields_using_this_dictionary.first().ok_or_else(|| {
789 ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
790 })?;
791
792 let dictionary_values: ArrayRef = match first_field.data_type() {
796 DataType::Dictionary(_, value_type) => {
797 let value = value_type.as_ref().clone();
799 let schema = Schema::new(vec![Field::new("", value, true)]);
800 let record_batch = RecordBatchDecoder::try_new(
802 buf,
803 batch.data().unwrap(),
804 Arc::new(schema),
805 dictionaries_by_id,
806 metadata,
807 )?
808 .with_require_alignment(require_alignment)
809 .with_skip_validation(skip_validation)
810 .read_record_batch()?;
811
812 Some(record_batch.column(0).clone())
813 }
814 _ => None,
815 }
816 .ok_or_else(|| {
817 ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
818 })?;
819
820 Ok(dictionary_values)
821}
822
823fn read_block<R: Read + Seek>(mut reader: R, block: &Block) -> Result<Buffer, ArrowError> {
825 reader.seek(SeekFrom::Start(block.offset() as u64))?;
826 let body_len = block.bodyLength().to_usize().unwrap();
827 let metadata_len = block.metaDataLength().to_usize().unwrap();
828 let total_len = body_len.checked_add(metadata_len).unwrap();
829
830 let mut buf = MutableBuffer::from_len_zeroed(total_len);
831 reader.read_exact(&mut buf)?;
832 Ok(buf.into())
833}
834
835fn parse_message(buf: &[u8]) -> Result<Message::Message<'_>, ArrowError> {
839 let buf = match buf[..4] == CONTINUATION_MARKER {
840 true => &buf[8..],
841 false => &buf[4..],
842 };
843 crate::root_as_message(buf)
844 .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))
845}
846
847pub fn read_footer_length(buf: [u8; 10]) -> Result<usize, ArrowError> {
851 if buf[4..] != super::ARROW_MAGIC {
852 return Err(ArrowError::ParseError(
853 "Arrow file does not contain correct footer".to_string(),
854 ));
855 }
856
857 let footer_len = i32::from_le_bytes(buf[..4].try_into().unwrap());
859 footer_len
860 .try_into()
861 .map_err(|_| ArrowError::ParseError(format!("Invalid footer length: {footer_len}")))
862}
863
/// Low-level decoder for the blocks of an Arrow IPC file.
///
/// Dictionary blocks are decoded with `read_dictionary` and accumulated. Record
/// batch blocks are then decoded with `read_record_batch` using those dictionaries.
#[derive(Debug)]
pub struct FileDecoder {
    /// Schema used to decode every dictionary and record batch.
    schema: SchemaRef,
    /// Dictionaries decoded so far, keyed by dictionary id.
    dictionaries: HashMap<i64, ArrayRef>,
    /// Expected metadata version. Messages with a different version are rejected,
    /// unless this is `MetadataVersion::V1`.
    version: MetadataVersion,
    /// Indices of the schema columns to decode; `None` decodes every column.
    projection: Option<Vec<usize>>,
    /// When `true`, misaligned buffers are not realigned by copying.
    require_alignment: bool,
    /// When set, decoded data is not validated.
    skip_validation: UnsafeFlag,
}
937
938impl FileDecoder {
939 pub fn new(schema: SchemaRef, version: MetadataVersion) -> Self {
941 Self {
942 schema,
943 version,
944 dictionaries: Default::default(),
945 projection: None,
946 require_alignment: false,
947 skip_validation: UnsafeFlag::new(),
948 }
949 }
950
951 pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
953 self.projection = Some(projection);
954 self
955 }
956
957 pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
970 self.require_alignment = require_alignment;
971 self
972 }
973
974 pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
985 unsafe { self.skip_validation.set(skip_validation) };
986 self
987 }
988
989 fn read_message<'a>(&self, buf: &'a [u8]) -> Result<Message::Message<'a>, ArrowError> {
990 let message = parse_message(buf)?;
991
992 if self.version != MetadataVersion::V1 && message.version() != self.version {
994 return Err(ArrowError::IpcError(
995 "Could not read IPC message as metadata versions mismatch".to_string(),
996 ));
997 }
998 Ok(message)
999 }
1000
1001 pub fn read_dictionary(&mut self, block: &Block, buf: &Buffer) -> Result<(), ArrowError> {
1003 let message = self.read_message(buf)?;
1004 match message.header_type() {
1005 crate::MessageHeader::DictionaryBatch => {
1006 let batch = message.header_as_dictionary_batch().unwrap();
1007 read_dictionary_impl(
1008 &buf.slice(block.metaDataLength() as _),
1009 batch,
1010 &self.schema,
1011 &mut self.dictionaries,
1012 &message.version(),
1013 self.require_alignment,
1014 self.skip_validation.clone(),
1015 )
1016 }
1017 t => Err(ArrowError::ParseError(format!(
1018 "Expecting DictionaryBatch in dictionary blocks, found {t:?}."
1019 ))),
1020 }
1021 }
1022
1023 pub fn read_record_batch(
1025 &self,
1026 block: &Block,
1027 buf: &Buffer,
1028 ) -> Result<Option<RecordBatch>, ArrowError> {
1029 let message = self.read_message(buf)?;
1030 match message.header_type() {
1031 crate::MessageHeader::Schema => Err(ArrowError::IpcError(
1032 "Not expecting a schema when messages are read".to_string(),
1033 )),
1034 crate::MessageHeader::RecordBatch => {
1035 let batch = message.header_as_record_batch().ok_or_else(|| {
1036 ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
1037 })?;
1038 RecordBatchDecoder::try_new(
1040 &buf.slice(block.metaDataLength() as _),
1041 batch,
1042 self.schema.clone(),
1043 &self.dictionaries,
1044 &message.version(),
1045 )?
1046 .with_projection(self.projection.as_deref())
1047 .with_require_alignment(self.require_alignment)
1048 .with_skip_validation(self.skip_validation.clone())
1049 .read_record_batch()
1050 .map(Some)
1051 }
1052 crate::MessageHeader::NONE => Ok(None),
1053 t => Err(ArrowError::InvalidArgumentError(format!(
1054 "Reading types other than record batches not yet supported, unable to read {t:?}"
1055 ))),
1056 }
1057 }
1058}
1059
/// Builder for [`FileReader`] with a configurable projection and footer verification limits.
#[derive(Debug)]
pub struct FileReaderBuilder {
    /// Indices of the columns to read; `None` reads every column.
    projection: Option<Vec<usize>>,
    /// Maximum number of flatbuffers tables accepted when verifying the footer.
    max_footer_fb_tables: usize,
    /// Maximum flatbuffers nesting depth accepted when verifying the footer.
    max_footer_fb_depth: usize,
}
1070
1071impl Default for FileReaderBuilder {
1072 fn default() -> Self {
1073 let verifier_options = VerifierOptions::default();
1074 Self {
1075 max_footer_fb_tables: verifier_options.max_tables,
1076 max_footer_fb_depth: verifier_options.max_depth,
1077 projection: None,
1078 }
1079 }
1080}
1081
1082impl FileReaderBuilder {
1083 pub fn new() -> Self {
1087 Self::default()
1088 }
1089
1090 pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
1092 self.projection = Some(projection);
1093 self
1094 }
1095
1096 pub fn with_max_footer_fb_tables(mut self, max_footer_fb_tables: usize) -> Self {
1109 self.max_footer_fb_tables = max_footer_fb_tables;
1110 self
1111 }
1112
1113 pub fn with_max_footer_fb_depth(mut self, max_footer_fb_depth: usize) -> Self {
1126 self.max_footer_fb_depth = max_footer_fb_depth;
1127 self
1128 }
1129
1130 pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<FileReader<R>, ArrowError> {
1132 let mut buffer = [0; 10];
1134 reader.seek(SeekFrom::End(-10))?;
1135 reader.read_exact(&mut buffer)?;
1136
1137 let footer_len = read_footer_length(buffer)?;
1138
1139 let mut footer_data = vec![0; footer_len];
1141 reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
1142 reader.read_exact(&mut footer_data)?;
1143
1144 let verifier_options = VerifierOptions {
1145 max_tables: self.max_footer_fb_tables,
1146 max_depth: self.max_footer_fb_depth,
1147 ..Default::default()
1148 };
1149 let footer = crate::root_as_footer_with_opts(&verifier_options, &footer_data[..]).map_err(
1150 |err| ArrowError::ParseError(format!("Unable to get root as footer: {err:?}")),
1151 )?;
1152
1153 let blocks = footer.recordBatches().ok_or_else(|| {
1154 ArrowError::ParseError("Unable to get record batches from IPC Footer".to_string())
1155 })?;
1156
1157 let total_blocks = blocks.len();
1158
1159 let ipc_schema = footer.schema().unwrap();
1160 if !ipc_schema.endianness().equals_to_target_endianness() {
1161 return Err(ArrowError::IpcError(
1162 "the endianness of the source system does not match the endianness of the target system.".to_owned()
1163 ));
1164 }
1165
1166 let schema = crate::convert::fb_to_schema(ipc_schema);
1167
1168 let mut custom_metadata = HashMap::new();
1169 if let Some(fb_custom_metadata) = footer.custom_metadata() {
1170 for kv in fb_custom_metadata.into_iter() {
1171 custom_metadata.insert(
1172 kv.key().unwrap().to_string(),
1173 kv.value().unwrap().to_string(),
1174 );
1175 }
1176 }
1177
1178 let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());
1179 if let Some(projection) = self.projection {
1180 decoder = decoder.with_projection(projection)
1181 }
1182
1183 if let Some(dictionaries) = footer.dictionaries() {
1185 for block in dictionaries {
1186 let buf = read_block(&mut reader, block)?;
1187 decoder.read_dictionary(block, &buf)?;
1188 }
1189 }
1190
1191 Ok(FileReader {
1192 reader,
1193 blocks: blocks.iter().copied().collect(),
1194 current_block: 0,
1195 total_blocks,
1196 decoder,
1197 custom_metadata,
1198 })
1199 }
1200}
1201
1202pub struct FileReader<R> {
1247 reader: R,
1249
1250 decoder: FileDecoder,
1252
1253 blocks: Vec<Block>,
1257
1258 current_block: usize,
1260
1261 total_blocks: usize,
1263
1264 custom_metadata: HashMap<String, String>,
1266}
1267
1268impl<R> fmt::Debug for FileReader<R> {
1269 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1270 f.debug_struct("FileReader<R>")
1271 .field("decoder", &self.decoder)
1272 .field("blocks", &self.blocks)
1273 .field("current_block", &self.current_block)
1274 .field("total_blocks", &self.total_blocks)
1275 .finish_non_exhaustive()
1276 }
1277}
1278
1279impl<R: Read + Seek> FileReader<BufReader<R>> {
1280 pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1284 Self::try_new(BufReader::new(reader), projection)
1285 }
1286}
1287
1288impl<R: Read + Seek> FileReader<R> {
1289 pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1300 let builder = FileReaderBuilder {
1301 projection,
1302 ..Default::default()
1303 };
1304 builder.build(reader)
1305 }
1306
1307 pub fn custom_metadata(&self) -> &HashMap<String, String> {
1309 &self.custom_metadata
1310 }
1311
1312 pub fn num_batches(&self) -> usize {
1314 self.total_blocks
1315 }
1316
1317 pub fn schema(&self) -> SchemaRef {
1319 self.decoder.schema.clone()
1320 }
1321
1322 pub fn set_index(&mut self, index: usize) -> Result<(), ArrowError> {
1326 if index >= self.total_blocks {
1327 Err(ArrowError::InvalidArgumentError(format!(
1328 "Cannot set batch to index {} from {} total batches",
1329 index, self.total_blocks
1330 )))
1331 } else {
1332 self.current_block = index;
1333 Ok(())
1334 }
1335 }
1336
1337 fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
1338 let block = &self.blocks[self.current_block];
1339 self.current_block += 1;
1340
1341 let buffer = read_block(&mut self.reader, block)?;
1343 self.decoder.read_record_batch(block, &buffer)
1344 }
1345
1346 pub fn get_ref(&self) -> &R {
1350 &self.reader
1351 }
1352
1353 pub fn get_mut(&mut self) -> &mut R {
1357 &mut self.reader
1358 }
1359
1360 pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
1366 self.decoder = unsafe { self.decoder.with_skip_validation(skip_validation) };
1367 self
1368 }
1369}
1370
1371impl<R: Read + Seek> Iterator for FileReader<R> {
1372 type Item = Result<RecordBatch, ArrowError>;
1373
1374 fn next(&mut self) -> Option<Self::Item> {
1375 if self.current_block < self.total_blocks {
1377 self.maybe_next().transpose()
1378 } else {
1379 None
1380 }
1381 }
1382}
1383
1384impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
1385 fn schema(&self) -> SchemaRef {
1386 self.schema()
1387 }
1388}
1389
1390pub struct StreamReader<R> {
1424 reader: MessageReader<R>,
1426
1427 schema: SchemaRef,
1429
1430 dictionaries_by_id: HashMap<i64, ArrayRef>,
1434
1435 finished: bool,
1439
1440 projection: Option<(Vec<usize>, Schema)>,
1442
1443 skip_validation: UnsafeFlag,
1447}
1448
1449impl<R> fmt::Debug for StreamReader<R> {
1450 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
1451 f.debug_struct("StreamReader<R>")
1452 .field("reader", &"R")
1453 .field("schema", &self.schema)
1454 .field("dictionaries_by_id", &self.dictionaries_by_id)
1455 .field("finished", &self.finished)
1456 .field("projection", &self.projection)
1457 .finish()
1458 }
1459}
1460
1461impl<R: Read> StreamReader<BufReader<R>> {
1462 pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
1466 Self::try_new(BufReader::new(reader), projection)
1467 }
1468}
1469
1470impl<R: Read> StreamReader<R> {
1471 pub fn try_new(
1483 reader: R,
1484 projection: Option<Vec<usize>>,
1485 ) -> Result<StreamReader<R>, ArrowError> {
1486 let mut msg_reader = MessageReader::new(reader);
1487 let message = msg_reader.maybe_next()?;
1488 let Some((message, _)) = message else {
1489 return Err(ArrowError::IpcError(
1490 "Expected schema message, found empty stream.".to_string(),
1491 ));
1492 };
1493
1494 if message.header_type() != Message::MessageHeader::Schema {
1495 return Err(ArrowError::IpcError(format!(
1496 "Expected a schema as the first message in the stream, got: {:?}",
1497 message.header_type()
1498 )));
1499 }
1500
1501 let schema = message.header_as_schema().ok_or_else(|| {
1502 ArrowError::ParseError("Failed to parse schema from message header".to_string())
1503 })?;
1504 let schema = crate::convert::fb_to_schema(schema);
1505
1506 let dictionaries_by_id = HashMap::new();
1508
1509 let projection = match projection {
1510 Some(projection_indices) => {
1511 let schema = schema.project(&projection_indices)?;
1512 Some((projection_indices, schema))
1513 }
1514 _ => None,
1515 };
1516
1517 Ok(Self {
1518 reader: msg_reader,
1519 schema: Arc::new(schema),
1520 finished: false,
1521 dictionaries_by_id,
1522 projection,
1523 skip_validation: UnsafeFlag::new(),
1524 })
1525 }
1526
1527 #[deprecated(since = "53.0.0", note = "use `try_new` instead")]
1529 pub fn try_new_unbuffered(
1530 reader: R,
1531 projection: Option<Vec<usize>>,
1532 ) -> Result<Self, ArrowError> {
1533 Self::try_new(reader, projection)
1534 }
1535
1536 pub fn schema(&self) -> SchemaRef {
1538 self.schema.clone()
1539 }
1540
1541 pub fn is_finished(&self) -> bool {
1543 self.finished
1544 }
1545
1546 fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
1547 if self.finished {
1548 return Ok(None);
1549 }
1550
1551 loop {
1553 let message = self.next_ipc_message()?;
1554 let Some(message) = message else {
1555 self.finished = true;
1557 return Ok(None);
1558 };
1559
1560 match message {
1561 IpcMessage::Schema(_) => {
1562 return Err(ArrowError::IpcError(
1563 "Expected a record batch, but found a schema".to_string(),
1564 ));
1565 }
1566 IpcMessage::RecordBatch(record_batch) => {
1567 return Ok(Some(record_batch));
1568 }
1569 IpcMessage::DictionaryBatch { .. } => {
1570 continue;
1571 }
1572 };
1573 }
1574 }
1575
1576 pub(crate) fn next_ipc_message(&mut self) -> Result<Option<IpcMessage>, ArrowError> {
1584 let message = self.reader.maybe_next()?;
1585 let Some((message, body)) = message else {
1586 return Ok(None);
1588 };
1589
1590 let ipc_message = match message.header_type() {
1591 Message::MessageHeader::Schema => {
1592 let schema = message.header_as_schema().ok_or_else(|| {
1593 ArrowError::ParseError("Failed to parse schema from message header".to_string())
1594 })?;
1595 let arrow_schema = crate::convert::fb_to_schema(schema);
1596 IpcMessage::Schema(arrow_schema)
1597 }
1598 Message::MessageHeader::RecordBatch => {
1599 let batch = message.header_as_record_batch().ok_or_else(|| {
1600 ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
1601 })?;
1602
1603 let version = message.version();
1604 let schema = self.schema.clone();
1605 let record_batch = RecordBatchDecoder::try_new(
1606 &body.into(),
1607 batch,
1608 schema,
1609 &self.dictionaries_by_id,
1610 &version,
1611 )?
1612 .with_projection(self.projection.as_ref().map(|x| x.0.as_ref()))
1613 .with_require_alignment(false)
1614 .with_skip_validation(self.skip_validation.clone())
1615 .read_record_batch()?;
1616 IpcMessage::RecordBatch(record_batch)
1617 }
1618 Message::MessageHeader::DictionaryBatch => {
1619 let dict = message.header_as_dictionary_batch().ok_or_else(|| {
1620 ArrowError::ParseError(
1621 "Failed to parse dictionary batch from message header".to_string(),
1622 )
1623 })?;
1624
1625 let version = message.version();
1626 let dict_values = get_dictionary_values(
1627 &body.into(),
1628 dict,
1629 &self.schema,
1630 &mut self.dictionaries_by_id,
1631 &version,
1632 false,
1633 self.skip_validation.clone(),
1634 )?;
1635
1636 update_dictionaries(
1637 &mut self.dictionaries_by_id,
1638 dict.isDelta(),
1639 dict.id(),
1640 dict_values.clone(),
1641 )?;
1642
1643 IpcMessage::DictionaryBatch {
1644 id: dict.id(),
1645 is_delta: (dict.isDelta()),
1646 values: (dict_values),
1647 }
1648 }
1649 x => {
1650 return Err(ArrowError::ParseError(format!(
1651 "Unsupported message header type in IPC stream: '{x:?}'"
1652 )));
1653 }
1654 };
1655
1656 Ok(Some(ipc_message))
1657 }
1658
1659 pub fn get_ref(&self) -> &R {
1663 self.reader.inner()
1664 }
1665
1666 pub fn get_mut(&mut self) -> &mut R {
1670 self.reader.inner_mut()
1671 }
1672
1673 pub unsafe fn with_skip_validation(mut self, skip_validation: bool) -> Self {
1679 unsafe { self.skip_validation.set(skip_validation) };
1680 self
1681 }
1682}
1683
1684impl<R: Read> Iterator for StreamReader<R> {
1685 type Item = Result<RecordBatch, ArrowError>;
1686
1687 fn next(&mut self) -> Option<Self::Item> {
1688 self.maybe_next().transpose()
1689 }
1690}
1691
1692impl<R: Read> RecordBatchReader for StreamReader<R> {
1693 fn schema(&self) -> SchemaRef {
1694 self.schema.clone()
1695 }
1696}
1697
/// One decoded message from an Arrow IPC stream, as returned by
/// `StreamReader::next_ipc_message`.
#[derive(Debug)]
// Some payloads are never read by crate code (for example, `StreamReader::maybe_next`
// discards schema and dictionary contents), so the dead-code lint is silenced.
#[allow(dead_code)]
pub(crate) enum IpcMessage {
    /// A schema message.
    Schema(arrow_schema::Schema),
    /// A decoded record batch.
    RecordBatch(RecordBatch),
    /// A dictionary batch. By the time this is returned, the values have already
    /// been applied to the reader's dictionaries.
    DictionaryBatch {
        /// The dictionary id from the message header.
        id: i64,
        /// The `isDelta` flag from the message header.
        is_delta: bool,
        /// The decoded dictionary values.
        values: ArrayRef,
    },
}
1714
/// Reads length-prefixed IPC messages (flatbuffer metadata followed by a body)
/// from an underlying reader.
struct MessageReader<R> {
    /// The underlying byte source.
    reader: R,
    /// Holds the metadata bytes of the most recently read message; resized and
    /// reused for each message.
    buf: Vec<u8>,
}
1721
1722impl<R: Read> MessageReader<R> {
1723 fn new(reader: R) -> Self {
1724 Self {
1725 reader,
1726 buf: Vec::new(),
1727 }
1728 }
1729
1730 fn maybe_next(&mut self) -> Result<Option<(Message::Message<'_>, MutableBuffer)>, ArrowError> {
1741 let meta_len = self.read_meta_len()?;
1742 let Some(meta_len) = meta_len else {
1743 return Ok(None);
1744 };
1745
1746 self.buf.resize(meta_len, 0);
1747 self.reader.read_exact(&mut self.buf)?;
1748
1749 let message = crate::root_as_message(self.buf.as_slice()).map_err(|err| {
1750 ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
1751 })?;
1752
1753 let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
1754 self.reader.read_exact(&mut buf)?;
1755
1756 Ok(Some((message, buf)))
1757 }
1758
1759 fn inner_mut(&mut self) -> &mut R {
1761 &mut self.reader
1762 }
1763
1764 fn inner(&self) -> &R {
1766 &self.reader
1767 }
1768
1769 pub fn read_meta_len(&mut self) -> Result<Option<usize>, ArrowError> {
1778 let mut meta_len: [u8; 4] = [0; 4];
1779 match self.reader.read_exact(&mut meta_len) {
1780 Ok(_) => {}
1781 Err(e) => {
1782 return if e.kind() == std::io::ErrorKind::UnexpectedEof {
1783 Ok(None)
1787 } else {
1788 Err(ArrowError::from(e))
1789 };
1790 }
1791 };
1792
1793 let meta_len = {
1794 if meta_len == CONTINUATION_MARKER {
1797 self.reader.read_exact(&mut meta_len)?;
1798 }
1799
1800 i32::from_le_bytes(meta_len)
1801 };
1802
1803 if meta_len == 0 {
1804 return Ok(None);
1805 }
1806
1807 let meta_len = usize::try_from(meta_len)
1808 .map_err(|_| ArrowError::ParseError(format!("Invalid metadata length: {meta_len}")))?;
1809
1810 Ok(Some(meta_len))
1811 }
1812}
1813
1814#[cfg(test)]
1815mod tests {
1816 use std::io::Cursor;
1817
1818 use crate::convert::fb_to_schema;
1819 use crate::writer::{
1820 DictionaryTracker, IpcDataGenerator, IpcWriteOptions, unslice_run_array, write_message,
1821 };
1822
1823 use super::*;
1824
1825 use crate::{root_as_footer, root_as_message, size_prefixed_root_as_message};
1826 use arrow_array::builder::{PrimitiveRunBuilder, UnionBuilder};
1827 use arrow_array::types::*;
1828 use arrow_buffer::{NullBuffer, OffsetBuffer};
1829 use arrow_data::ArrayDataBuilder;
1830
1831 fn create_test_projection_schema() -> Schema {
1832 let list_data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
1834
1835 let fixed_size_list_data_type =
1836 DataType::FixedSizeList(Arc::new(Field::new_list_field(DataType::Int32, false)), 3);
1837
1838 let union_fields = UnionFields::new(
1839 vec![0, 1],
1840 vec![
1841 Field::new("a", DataType::Int32, false),
1842 Field::new("b", DataType::Float64, false),
1843 ],
1844 );
1845
1846 let union_data_type = DataType::Union(union_fields, UnionMode::Dense);
1847
1848 let struct_fields = Fields::from(vec![
1849 Field::new("id", DataType::Int32, false),
1850 Field::new_list("list", Field::new_list_field(DataType::Int8, true), false),
1851 ]);
1852 let struct_data_type = DataType::Struct(struct_fields);
1853
1854 let run_encoded_data_type = DataType::RunEndEncoded(
1855 Arc::new(Field::new("run_ends", DataType::Int16, false)),
1856 Arc::new(Field::new("values", DataType::Int32, true)),
1857 );
1858
1859 Schema::new(vec![
1861 Field::new("f0", DataType::UInt32, false),
1862 Field::new("f1", DataType::Utf8, false),
1863 Field::new("f2", DataType::Boolean, false),
1864 Field::new("f3", union_data_type, true),
1865 Field::new("f4", DataType::Null, true),
1866 Field::new("f5", DataType::Float64, true),
1867 Field::new("f6", list_data_type, false),
1868 Field::new("f7", DataType::FixedSizeBinary(3), true),
1869 Field::new("f8", fixed_size_list_data_type, false),
1870 Field::new("f9", struct_data_type, false),
1871 Field::new("f10", run_encoded_data_type, false),
1872 Field::new("f11", DataType::Boolean, false),
1873 Field::new_dictionary("f12", DataType::Int8, DataType::Utf8, false),
1874 Field::new("f13", DataType::Utf8, false),
1875 ])
1876 }
1877
    /// Builds a 3-row batch with one array per field `f0`..`f13`.
    ///
    /// `schema` is expected to be the schema returned by
    /// `create_test_projection_schema`; the data types of `f8` and `f9` are read
    /// from it.
    fn create_test_projection_batch_data(schema: &Schema) -> RecordBatch {
        // f0: UInt32
        let array0 = UInt32Array::from(vec![1, 2, 3]);
        // f1: Utf8, f2: Boolean
        let array1 = StringArray::from(vec!["foo", "bar", "baz"]);
        let array2 = BooleanArray::from(vec![true, false, true]);

        // f3: dense union holding an Int32, a Float64 and a null
        let mut union_builder = UnionBuilder::new_dense();
        union_builder.append::<Int32Type>("a", 1).unwrap();
        union_builder.append::<Float64Type>("b", 10.1).unwrap();
        union_builder.append_null::<Float64Type>("b").unwrap();
        let array3 = union_builder.build().unwrap();

        // f4: Null, f5: nullable Float64
        let array4 = NullArray::new(3);
        let array5 = Float64Array::from(vec![Some(1.1), None, Some(3.3)]);
        // f6: List<Int32> with lists of differing length
        let array6_values = vec![
            Some(vec![Some(10), Some(10), Some(10)]),
            Some(vec![Some(20), Some(20), Some(20)]),
            Some(vec![Some(30), Some(30)]),
        ];
        let array6 = ListArray::from_iter_primitive::<Int32Type, _, _>(array6_values);
        // f7: FixedSizeBinary(3)
        let array7_values = vec![vec![11, 12, 13], vec![22, 23, 24], vec![33, 34, 35]];
        let array7 = FixedSizeBinaryArray::try_from_iter(array7_values.into_iter()).unwrap();

        // f8: FixedSizeList of 3 Int32s over 9 child values; the data type comes
        // from the schema so it matches the field exactly.
        let array8_values = ArrayData::builder(DataType::Int32)
            .len(9)
            .add_buffer(Buffer::from_slice_ref([40, 41, 42, 43, 44, 45, 46, 47, 48]))
            .build()
            .unwrap();
        let array8_data = ArrayData::builder(schema.field(8).data_type().clone())
            .len(3)
            .add_child_data(array8_values)
            .build()
            .unwrap();
        let array8 = FixedSizeListArray::from(array8_data);

        // f9: struct of an Int32 `id` and a List<Int8> `list`
        let array9_id: ArrayRef = Arc::new(Int32Array::from(vec![1001, 1002, 1003]));
        let array9_list: ArrayRef =
            Arc::new(ListArray::from_iter_primitive::<Int8Type, _, _>(vec![
                Some(vec![Some(-10)]),
                Some(vec![Some(-20), Some(-20), Some(-20)]),
                Some(vec![Some(-30)]),
            ]));
        let array9 = ArrayDataBuilder::new(schema.field(9).data_type().clone())
            .add_child_data(array9_id.into_data())
            .add_child_data(array9_list.into_data())
            .len(3)
            .build()
            .unwrap();
        let array9: ArrayRef = Arc::new(StructArray::from(array9));

        // f10: run-end encoded Int32 (Int16 run ends) with a trailing null run
        let array10_input = vec![Some(1_i32), None, None];
        let mut array10_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
        array10_builder.extend(array10_input);
        let array10 = array10_builder.finish();

        // f11: Boolean
        let array11 = BooleanArray::from(vec![false, false, true]);

        // f12: Dictionary<Int8, Utf8>; value "x" at index 0 is never referenced
        let array12_values = StringArray::from(vec!["x", "yy", "zzz"]);
        let array12_keys = Int8Array::from_iter_values([1, 1, 2]);
        let array12 = DictionaryArray::new(array12_keys, Arc::new(array12_values));

        // f13: Utf8
        let array13 = StringArray::from(vec!["a", "bb", "ccc"]);

        RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(array0),
                Arc::new(array1),
                Arc::new(array2),
                Arc::new(array3),
                Arc::new(array4),
                Arc::new(array5),
                Arc::new(array6),
                Arc::new(array7),
                Arc::new(array8),
                Arc::new(array9),
                Arc::new(array10),
                Arc::new(array11),
                Arc::new(array12),
                Arc::new(array13),
            ],
        )
        .unwrap()
    }
1963
1964 #[test]
1965 fn test_negative_meta_len_start_stream() {
1966 let bytes = i32::to_le_bytes(-1);
1967 let mut buf = vec![];
1968 buf.extend(CONTINUATION_MARKER);
1969 buf.extend(bytes);
1970
1971 let reader_err = StreamReader::try_new(Cursor::new(buf), None).err();
1972 assert!(reader_err.is_some());
1973 assert_eq!(
1974 reader_err.unwrap().to_string(),
1975 "Parser error: Invalid metadata length: -1"
1976 );
1977 }
1978
1979 #[test]
1980 fn test_negative_meta_len_mid_stream() {
1981 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1982 let mut buf = Vec::new();
1983 {
1984 let mut writer = crate::writer::StreamWriter::try_new(&mut buf, &schema).unwrap();
1985 let batch =
1986 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(Int32Array::from(vec![1]))])
1987 .unwrap();
1988 writer.write(&batch).unwrap();
1989 }
1990
1991 let bytes = i32::to_le_bytes(-1);
1992 buf.extend(CONTINUATION_MARKER);
1993 buf.extend(bytes);
1994
1995 let mut reader = StreamReader::try_new(Cursor::new(buf), None).unwrap();
1996 assert!(reader.maybe_next().is_ok());
1998 let batch_err = reader.maybe_next().err();
2000 assert!(batch_err.is_some());
2001 assert_eq!(
2002 batch_err.unwrap().to_string(),
2003 "Parser error: Invalid metadata length: -1"
2004 );
2005 }
2006
    /// Decoding a record batch whose metadata lists fewer buffers than its column
    /// needs must fail with the IPC error "Buffer count mismatched with metadata".
    #[test]
    fn test_missing_buffer_metadata_error() {
        // Brings the flatbuffer `RecordBatch` / `RecordBatchArgs` into this
        // function's scope; `RecordBatch` below refers to the flatbuffer type.
        use crate::r#gen::Message::*;
        use flatbuffers::FlatBufferBuilder;

        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, true)]));

        // Metadata for one 2-row column with no nulls but only a single buffer
        // entry. Per the Arrow columnar layout a primitive column has a validity
        // buffer and a values buffer, so one entry is too few.
        let mut fbb = FlatBufferBuilder::new();
        let nodes = fbb.create_vector(&[FieldNode::new(2, 0)]);
        let buffers = fbb.create_vector(&[crate::Buffer::new(0, 8)]);
        let batch_offset = RecordBatch::create(
            &mut fbb,
            &RecordBatchArgs {
                length: 2,
                nodes: Some(nodes),
                buffers: Some(buffers),
                compression: None,
                variadicBufferCounts: None,
            },
        );
        fbb.finish_minimal(batch_offset);
        let batch_bytes = fbb.finished_data().to_vec();
        let batch = flatbuffers::root::<RecordBatch>(&batch_bytes).unwrap();

        // 8 bytes of body data, enough for the single declared buffer.
        let data_buffer = Buffer::from(vec![0u8; 8]);
        let dictionaries: HashMap<i64, ArrayRef> = HashMap::new();
        let metadata = MetadataVersion::V5;

        let decoder = RecordBatchDecoder::try_new(
            &data_buffer,
            batch,
            schema.clone(),
            &dictionaries,
            &metadata,
        )
        .unwrap();

        let result = decoder.read_record_batch();

        match result {
            Err(ArrowError::IpcError(msg)) => {
                assert_eq!(msg, "Buffer count mismatched with metadata");
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }
2055
2056 #[test]
2057 fn test_projection_array_values() {
2058 let schema = create_test_projection_schema();
2060
2061 let batch = create_test_projection_batch_data(&schema);
2063
2064 let mut buf = Vec::new();
2066 {
2067 let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
2068 writer.write(&batch).unwrap();
2069 writer.finish().unwrap();
2070 }
2071
2072 for index in 0..12 {
2074 let projection = vec![index];
2075 let reader = FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(projection));
2076 let read_batch = reader.unwrap().next().unwrap().unwrap();
2077 let projected_column = read_batch.column(0);
2078 let expected_column = batch.column(index);
2079
2080 assert_eq!(projected_column.as_ref(), expected_column.as_ref());
2082 }
2083
2084 {
2085 let reader =
2087 FileReader::try_new(std::io::Cursor::new(buf.clone()), Some(vec![3, 2, 1]));
2088 let read_batch = reader.unwrap().next().unwrap().unwrap();
2089 let expected_batch = batch.project(&[3, 2, 1]).unwrap();
2090 assert_eq!(read_batch, expected_batch);
2091 }
2092 }
2093
2094 #[test]
2095 fn test_arrow_single_float_row() {
2096 let schema = Schema::new(vec![
2097 Field::new("a", DataType::Float32, false),
2098 Field::new("b", DataType::Float32, false),
2099 Field::new("c", DataType::Int32, false),
2100 Field::new("d", DataType::Int32, false),
2101 ]);
2102 let arrays = vec![
2103 Arc::new(Float32Array::from(vec![1.23])) as ArrayRef,
2104 Arc::new(Float32Array::from(vec![-6.50])) as ArrayRef,
2105 Arc::new(Int32Array::from(vec![2])) as ArrayRef,
2106 Arc::new(Int32Array::from(vec![1])) as ArrayRef,
2107 ];
2108 let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays).unwrap();
2109 let mut file = tempfile::tempfile().unwrap();
2111 let mut stream_writer = crate::writer::StreamWriter::try_new(&mut file, &schema).unwrap();
2112 stream_writer.write(&batch).unwrap();
2113 stream_writer.finish().unwrap();
2114
2115 drop(stream_writer);
2116
2117 file.rewind().unwrap();
2118
2119 let reader = StreamReader::try_new(&mut file, None).unwrap();
2121
2122 reader.for_each(|batch| {
2123 let batch = batch.unwrap();
2124 assert!(
2125 batch
2126 .column(0)
2127 .as_any()
2128 .downcast_ref::<Float32Array>()
2129 .unwrap()
2130 .value(0)
2131 != 0.0
2132 );
2133 assert!(
2134 batch
2135 .column(1)
2136 .as_any()
2137 .downcast_ref::<Float32Array>()
2138 .unwrap()
2139 .value(0)
2140 != 0.0
2141 );
2142 });
2143
2144 file.rewind().unwrap();
2145
2146 let reader = StreamReader::try_new(file, Some(vec![0, 3])).unwrap();
2148
2149 reader.for_each(|batch| {
2150 let batch = batch.unwrap();
2151 assert_eq!(batch.schema().fields().len(), 2);
2152 assert_eq!(batch.schema().fields()[0].data_type(), &DataType::Float32);
2153 assert_eq!(batch.schema().fields()[1].data_type(), &DataType::Int32);
2154 });
2155 }
2156
2157 fn write_ipc(rb: &RecordBatch) -> Vec<u8> {
2159 let mut buf = Vec::new();
2160 let mut writer = crate::writer::FileWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
2161 writer.write(rb).unwrap();
2162 writer.finish().unwrap();
2163 buf
2164 }
2165
2166 fn read_ipc(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2168 let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None)?;
2169 reader.next().unwrap()
2170 }
2171
2172 fn read_ipc_skip_validation(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2175 let mut reader = unsafe {
2176 FileReader::try_new(std::io::Cursor::new(buf), None)?.with_skip_validation(true)
2177 };
2178 reader.next().unwrap()
2179 }
2180
2181 fn roundtrip_ipc(rb: &RecordBatch) -> RecordBatch {
2182 let buf = write_ipc(rb);
2183 read_ipc(&buf).unwrap()
2184 }
2185
2186 fn read_ipc_with_decoder(buf: Vec<u8>) -> Result<RecordBatch, ArrowError> {
2189 read_ipc_with_decoder_inner(buf, false)
2190 }
2191
2192 fn read_ipc_with_decoder_skip_validation(buf: Vec<u8>) -> Result<RecordBatch, ArrowError> {
2195 read_ipc_with_decoder_inner(buf, true)
2196 }
2197
    /// Decodes an in-memory IPC file with `FileDecoder` and returns its only
    /// record batch. All dictionaries listed in the footer are read first.
    ///
    /// Panics (via `assert_eq!`) if the footer does not list exactly one record
    /// batch.
    fn read_ipc_with_decoder_inner(
        buf: Vec<u8>,
        skip_validation: bool,
    ) -> Result<RecordBatch, ArrowError> {
        let buffer = Buffer::from_vec(buf);
        // The file ends with a 10-byte trailer: the 4-byte footer length followed
        // by the 6-byte `ARROW1` magic.
        let trailer_start = buffer.len() - 10;
        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap())?;
        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start])
            .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid footer: {e}")))?;

        let schema = fb_to_schema(footer.schema().unwrap());

        // SAFETY: test helper; `skip_validation` is only set by callers that
        // deliberately exercise the unvalidated read path.
        let mut decoder = unsafe {
            FileDecoder::new(Arc::new(schema), footer.version())
                .with_skip_validation(skip_validation)
        };
        // Each block covers the message metadata followed by its body.
        for block in footer.dictionaries().iter().flatten() {
            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
            let data = buffer.slice_with_length(block.offset() as _, block_len);
            decoder.read_dictionary(block, &data)?
        }

        let batches = footer.recordBatches().unwrap();
        // This helper only supports files containing a single record batch.
        assert_eq!(batches.len(), 1);
        let block = batches.get(0);
        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
        let data = buffer.slice_with_length(block.offset() as _, block_len);
        Ok(decoder.read_record_batch(block, &data)?.unwrap())
    }
2230
2231 fn write_stream(rb: &RecordBatch) -> Vec<u8> {
2233 let mut buf = Vec::new();
2234 let mut writer = crate::writer::StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
2235 writer.write(rb).unwrap();
2236 writer.finish().unwrap();
2237 buf
2238 }
2239
2240 fn read_stream(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2242 let mut reader = StreamReader::try_new(std::io::Cursor::new(buf), None)?;
2243 reader.next().unwrap()
2244 }
2245
2246 fn read_stream_skip_validation(buf: &[u8]) -> Result<RecordBatch, ArrowError> {
2249 let mut reader = unsafe {
2250 StreamReader::try_new(std::io::Cursor::new(buf), None)?.with_skip_validation(true)
2251 };
2252 reader.next().unwrap()
2253 }
2254
2255 fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
2256 let buf = write_stream(rb);
2257 read_stream(&buf).unwrap()
2258 }
2259
2260 #[test]
2261 fn test_roundtrip_with_custom_metadata() {
2262 let schema = Schema::new(vec![Field::new("dummy", DataType::Float64, false)]);
2263 let mut buf = Vec::new();
2264 let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
2265 let mut test_metadata = HashMap::new();
2266 test_metadata.insert("abc".to_string(), "abc".to_string());
2267 test_metadata.insert("def".to_string(), "def".to_string());
2268 for (k, v) in &test_metadata {
2269 writer.write_metadata(k, v);
2270 }
2271 writer.finish().unwrap();
2272 drop(writer);
2273
2274 let reader = crate::reader::FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
2275 assert_eq!(reader.custom_metadata(), &test_metadata);
2276 }
2277
2278 #[test]
2279 fn test_roundtrip_nested_dict() {
2280 let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2281
2282 let array = Arc::new(inner) as ArrayRef;
2283
2284 let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));
2285
2286 let s = StructArray::from(vec![(dctfield, array)]);
2287 let struct_array = Arc::new(s) as ArrayRef;
2288
2289 let schema = Arc::new(Schema::new(vec![Field::new(
2290 "struct",
2291 struct_array.data_type().clone(),
2292 false,
2293 )]));
2294
2295 let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2296
2297 assert_eq!(batch, roundtrip_ipc(&batch));
2298 }
2299
2300 #[test]
2301 fn test_roundtrip_nested_dict_no_preserve_dict_id() {
2302 let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2303
2304 let array = Arc::new(inner) as ArrayRef;
2305
2306 let dctfield = Arc::new(Field::new("dict", array.data_type().clone(), false));
2307
2308 let s = StructArray::from(vec![(dctfield, array)]);
2309 let struct_array = Arc::new(s) as ArrayRef;
2310
2311 let schema = Arc::new(Schema::new(vec![Field::new(
2312 "struct",
2313 struct_array.data_type().clone(),
2314 false,
2315 )]));
2316
2317 let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2318
2319 let mut buf = Vec::new();
2320 let mut writer = crate::writer::FileWriter::try_new_with_options(
2321 &mut buf,
2322 batch.schema_ref(),
2323 IpcWriteOptions::default(),
2324 )
2325 .unwrap();
2326 writer.write(&batch).unwrap();
2327 writer.finish().unwrap();
2328 drop(writer);
2329
2330 let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
2331
2332 assert_eq!(batch, reader.next().unwrap().unwrap());
2333 }
2334
2335 fn check_union_with_builder(mut builder: UnionBuilder) {
2336 builder.append::<Int32Type>("a", 1).unwrap();
2337 builder.append_null::<Int32Type>("a").unwrap();
2338 builder.append::<Float64Type>("c", 3.0).unwrap();
2339 builder.append::<Int32Type>("a", 4).unwrap();
2340 builder.append::<Int64Type>("d", 11).unwrap();
2341 let union = builder.build().unwrap();
2342
2343 let schema = Arc::new(Schema::new(vec![Field::new(
2344 "union",
2345 union.data_type().clone(),
2346 false,
2347 )]));
2348
2349 let union_array = Arc::new(union) as ArrayRef;
2350
2351 let rb = RecordBatch::try_new(schema, vec![union_array]).unwrap();
2352 let rb2 = roundtrip_ipc(&rb);
2353 assert_eq!(rb.schema(), rb2.schema());
2356 assert_eq!(rb.num_columns(), rb2.num_columns());
2357 assert_eq!(rb.num_rows(), rb2.num_rows());
2358 let union1 = rb.column(0);
2359 let union2 = rb2.column(0);
2360
2361 assert_eq!(union1, union2);
2362 }
2363
2364 #[test]
2365 fn test_roundtrip_dense_union() {
2366 check_union_with_builder(UnionBuilder::new_dense());
2367 }
2368
2369 #[test]
2370 fn test_roundtrip_sparse_union() {
2371 check_union_with_builder(UnionBuilder::new_sparse());
2372 }
2373
2374 #[test]
2375 fn test_roundtrip_struct_empty_fields() {
2376 let nulls = NullBuffer::from(&[true, true, false]);
2377 let rb = RecordBatch::try_from_iter([(
2378 "",
2379 Arc::new(StructArray::new_empty_fields(nulls.len(), Some(nulls))) as _,
2380 )])
2381 .unwrap();
2382 let rb2 = roundtrip_ipc(&rb);
2383 assert_eq!(rb, rb2);
2384 }
2385
2386 #[test]
2387 fn test_roundtrip_stream_run_array_sliced() {
2388 let run_array_1: Int32RunArray = vec!["a", "a", "a", "b", "b", "c", "c", "c"]
2389 .into_iter()
2390 .collect();
2391 let run_array_1_sliced = run_array_1.slice(2, 5);
2392
2393 let run_array_2_inupt = vec![Some(1_i32), None, None, Some(2), Some(2)];
2394 let mut run_array_2_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
2395 run_array_2_builder.extend(run_array_2_inupt);
2396 let run_array_2 = run_array_2_builder.finish();
2397
2398 let schema = Arc::new(Schema::new(vec![
2399 Field::new(
2400 "run_array_1_sliced",
2401 run_array_1_sliced.data_type().clone(),
2402 false,
2403 ),
2404 Field::new("run_array_2", run_array_2.data_type().clone(), false),
2405 ]));
2406 let input_batch = RecordBatch::try_new(
2407 schema,
2408 vec![Arc::new(run_array_1_sliced.clone()), Arc::new(run_array_2)],
2409 )
2410 .unwrap();
2411 let output_batch = roundtrip_ipc_stream(&input_batch);
2412
2413 assert_eq!(input_batch.column(1), output_batch.column(1));
2417
2418 let run_array_1_unsliced = unslice_run_array(run_array_1_sliced.into_data()).unwrap();
2419 assert_eq!(run_array_1_unsliced, output_batch.column(0).into_data());
2420 }
2421
2422 #[test]
2423 fn test_roundtrip_stream_nested_dict() {
2424 let xs = vec!["AA", "BB", "AA", "CC", "BB"];
2425 let dict = Arc::new(
2426 xs.clone()
2427 .into_iter()
2428 .collect::<DictionaryArray<Int8Type>>(),
2429 );
2430 let string_array: ArrayRef = Arc::new(StringArray::from(xs.clone()));
2431 let struct_array = StructArray::from(vec![
2432 (
2433 Arc::new(Field::new("f2.1", DataType::Utf8, false)),
2434 string_array,
2435 ),
2436 (
2437 Arc::new(Field::new("f2.2_struct", dict.data_type().clone(), false)),
2438 dict.clone() as ArrayRef,
2439 ),
2440 ]);
2441 let schema = Arc::new(Schema::new(vec![
2442 Field::new("f1_string", DataType::Utf8, false),
2443 Field::new("f2_struct", struct_array.data_type().clone(), false),
2444 ]));
2445 let input_batch = RecordBatch::try_new(
2446 schema,
2447 vec![
2448 Arc::new(StringArray::from(xs.clone())),
2449 Arc::new(struct_array),
2450 ],
2451 )
2452 .unwrap();
2453 let output_batch = roundtrip_ipc_stream(&input_batch);
2454 assert_eq!(input_batch, output_batch);
2455 }
2456
    /// Round-trips, through the IPC stream format, a dictionary whose values are a
    /// map whose keys and values are themselves dictionary-encoded.
    #[test]
    fn test_roundtrip_stream_nested_dict_of_map_of_dict() {
        // Utf8 values shared by the key dictionary and the value dictionary.
        let values = StringArray::from(vec![Some("a"), None, Some("b"), Some("c")]);
        let values = Arc::new(values) as ArrayRef;
        let value_dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 3, 1]);
        let value_dict_array = DictionaryArray::new(value_dict_keys, values.clone());

        let key_dict_keys = Int8Array::from_iter_values([0, 0, 2, 1, 1, 3]);
        let key_dict_array = DictionaryArray::new(key_dict_keys, values);

        // `Field::new_dict` is deprecated; it is used here to give the key and value
        // fields explicit dictionary ids (1 and 2).
        #[allow(deprecated)]
        let keys_field = Arc::new(Field::new_dict(
            "keys",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
            true, // nullable
            1,    // dictionary id
            false,
        ));
        #[allow(deprecated)]
        let values_field = Arc::new(Field::new_dict(
            "values",
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
            true,
            2,
            false,
        ));
        // Six (key, value) entries...
        let entry_struct = StructArray::from(vec![
            (keys_field, make_array(key_dict_array.into_data())),
            (values_field, make_array(value_dict_array.into_data())),
        ]);
        let map_data_type = DataType::Map(
            Arc::new(Field::new(
                "entries",
                entry_struct.data_type().clone(),
                false,
            )),
            false,
        );

        // ...grouped into three maps of two entries each.
        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 6]);
        let map_data = ArrayData::builder(map_data_type)
            .len(3)
            .add_buffer(entry_offsets)
            .add_child_data(entry_struct.into_data())
            .build()
            .unwrap();
        let map_array = MapArray::from(map_data);

        // Outer dictionary: six keys indexing into the three maps.
        let dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 2, 1]);
        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }
2517
    /// Round-trips, through the IPC stream format, a dictionary whose values are a
    /// 4-slot list array of type `list_data_type` over a `Dictionary<Int8, Utf8>`
    /// child with 7 keys.
    ///
    /// `offsets` are the 5 list offsets. They must stay within the 7 child keys,
    /// and their element type `U` presumably matches `OffsetSize`'s native type
    /// (TODO confirm; callers pass `i32`/`i32` and `i64`/`i64`).
    fn test_roundtrip_stream_dict_of_list_of_dict_impl<
        OffsetSize: OffsetSizeTrait,
        U: ArrowNativeType,
    >(
        list_data_type: DataType,
        offsets: &[U; 5],
    ) {
        // Inner dictionary: 7 keys over 4 values, two of which are null.
        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));
        let dict_data = dict_array.to_data();

        let value_offsets = Buffer::from_slice_ref(offsets);

        let list_data = ArrayData::builder(list_data_type)
            .len(4)
            .add_buffer(value_offsets)
            .add_child_data(dict_data)
            .build()
            .unwrap();
        let list_array = GenericListArray::<OffsetSize>::from(list_data);

        // Outer dictionary: 9 keys indexing into the 4 lists.
        let keys_for_dict = Int8Array::from_iter_values([0, 3, 0, 1, 1, 2, 0, 1, 3]);
        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }
2552
2553 #[test]
2554 fn test_roundtrip_stream_dict_of_list_of_dict() {
2555 #[allow(deprecated)]
2557 let list_data_type = DataType::List(Arc::new(Field::new_dict(
2558 "item",
2559 DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2560 true,
2561 1,
2562 false,
2563 )));
2564 let offsets: &[i32; 5] = &[0, 2, 4, 4, 6];
2565 test_roundtrip_stream_dict_of_list_of_dict_impl::<i32, i32>(list_data_type, offsets);
2566
2567 #[allow(deprecated)]
2569 let list_data_type = DataType::LargeList(Arc::new(Field::new_dict(
2570 "item",
2571 DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
2572 true,
2573 1,
2574 false,
2575 )));
2576 let offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
2577 test_roundtrip_stream_dict_of_list_of_dict_impl::<i64, i64>(list_data_type, offsets);
2578 }
2579
    /// Round-trips, through the IPC stream format, a dictionary whose values are a
    /// `FixedSizeList` (size 3) of `Dictionary<Int8, Utf8>` items.
    #[test]
    fn test_roundtrip_stream_dict_of_fixed_size_list_of_dict() {
        // Inner dictionary: 9 keys (3 lists x 3 items) over 4 values.
        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3, 1, 2]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));
        let dict_data = dict_array.into_data();

        // `Field::new_dict` is deprecated; the allow keeps this test warning-free.
        #[allow(deprecated)]
        let list_data_type = DataType::FixedSizeList(
            Arc::new(Field::new_dict(
                "item",
                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
                true,
                1,
                false,
            )),
            3,
        );
        let list_data = ArrayData::builder(list_data_type)
            .len(3)
            .add_child_data(dict_data)
            .build()
            .unwrap();
        let list_array = FixedSizeListArray::from(list_data);

        // Outer dictionary: 9 keys indexing into the 3 lists.
        let keys_for_dict = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
        let dict_dict_array = DictionaryArray::new(keys_for_dict, Arc::new(list_array));

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            dict_dict_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
        let output_batch = roundtrip_ipc_stream(&input_batch);
        assert_eq!(input_batch, output_batch);
    }
2617
2618 const LONG_TEST_STRING: &str =
2619 "This is a long string to make sure binary view array handles it";
2620
2621 #[test]
2622 fn test_roundtrip_view_types() {
2623 let schema = Schema::new(vec![
2624 Field::new("field_1", DataType::BinaryView, true),
2625 Field::new("field_2", DataType::Utf8, true),
2626 Field::new("field_3", DataType::Utf8View, true),
2627 ]);
2628 let bin_values: Vec<Option<&[u8]>> = vec![
2629 Some(b"foo"),
2630 None,
2631 Some(b"bar"),
2632 Some(LONG_TEST_STRING.as_bytes()),
2633 ];
2634 let utf8_values: Vec<Option<&str>> =
2635 vec![Some("foo"), None, Some("bar"), Some(LONG_TEST_STRING)];
2636 let bin_view_array = BinaryViewArray::from_iter(bin_values);
2637 let utf8_array = StringArray::from_iter(utf8_values.iter());
2638 let utf8_view_array = StringViewArray::from_iter(utf8_values);
2639 let record_batch = RecordBatch::try_new(
2640 Arc::new(schema.clone()),
2641 vec![
2642 Arc::new(bin_view_array),
2643 Arc::new(utf8_array),
2644 Arc::new(utf8_view_array),
2645 ],
2646 )
2647 .unwrap();
2648
2649 assert_eq!(record_batch, roundtrip_ipc(&record_batch));
2650 assert_eq!(record_batch, roundtrip_ipc_stream(&record_batch));
2651
2652 let sliced_batch = record_batch.slice(1, 2);
2653 assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
2654 assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
2655 }
2656
2657 #[test]
2658 fn test_roundtrip_view_types_nested_dict() {
2659 let bin_values: Vec<Option<&[u8]>> = vec![
2660 Some(b"foo"),
2661 None,
2662 Some(b"bar"),
2663 Some(LONG_TEST_STRING.as_bytes()),
2664 Some(b"field"),
2665 ];
2666 let utf8_values: Vec<Option<&str>> = vec![
2667 Some("foo"),
2668 None,
2669 Some("bar"),
2670 Some(LONG_TEST_STRING),
2671 Some("field"),
2672 ];
2673 let bin_view_array = Arc::new(BinaryViewArray::from_iter(bin_values));
2674 let utf8_view_array = Arc::new(StringViewArray::from_iter(utf8_values));
2675
2676 let key_dict_keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
2677 let key_dict_array = DictionaryArray::new(key_dict_keys, utf8_view_array.clone());
2678 #[allow(deprecated)]
2679 let keys_field = Arc::new(Field::new_dict(
2680 "keys",
2681 DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8View)),
2682 true,
2683 1,
2684 false,
2685 ));
2686
2687 let value_dict_keys = Int8Array::from_iter_values([0, 3, 0, 1, 2, 0, 1]);
2688 let value_dict_array = DictionaryArray::new(value_dict_keys, bin_view_array);
2689 #[allow(deprecated)]
2690 let values_field = Arc::new(Field::new_dict(
2691 "values",
2692 DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::BinaryView)),
2693 true,
2694 2,
2695 false,
2696 ));
2697 let entry_struct = StructArray::from(vec![
2698 (keys_field, make_array(key_dict_array.into_data())),
2699 (values_field, make_array(value_dict_array.into_data())),
2700 ]);
2701
2702 let map_data_type = DataType::Map(
2703 Arc::new(Field::new(
2704 "entries",
2705 entry_struct.data_type().clone(),
2706 false,
2707 )),
2708 false,
2709 );
2710 let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 7]);
2711 let map_data = ArrayData::builder(map_data_type)
2712 .len(3)
2713 .add_buffer(entry_offsets)
2714 .add_child_data(entry_struct.into_data())
2715 .build()
2716 .unwrap();
2717 let map_array = MapArray::from(map_data);
2718
2719 let dict_keys = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
2720 let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));
2721 let schema = Arc::new(Schema::new(vec![Field::new(
2722 "f1",
2723 dict_dict_array.data_type().clone(),
2724 false,
2725 )]));
2726 let batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
2727 assert_eq!(batch, roundtrip_ipc(&batch));
2728 assert_eq!(batch, roundtrip_ipc_stream(&batch));
2729
2730 let sliced_batch = batch.slice(1, 2);
2731 assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
2732 assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
2733 }
2734
2735 #[test]
2736 fn test_no_columns_batch() {
2737 let schema = Arc::new(Schema::empty());
2738 let options = RecordBatchOptions::new()
2739 .with_match_field_names(true)
2740 .with_row_count(Some(10));
2741 let input_batch = RecordBatch::try_new_with_options(schema, vec![], &options).unwrap();
2742 let output_batch = roundtrip_ipc_stream(&input_batch);
2743 assert_eq!(input_batch, output_batch);
2744 }
2745
2746 #[test]
2747 fn test_unaligned() {
2748 let batch = RecordBatch::try_from_iter(vec![(
2749 "i32",
2750 Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
2751 )])
2752 .unwrap();
2753
2754 let r#gen = IpcDataGenerator {};
2755 let mut dict_tracker = DictionaryTracker::new(false);
2756 let (_, encoded) = r#gen
2757 .encode(
2758 &batch,
2759 &mut dict_tracker,
2760 &Default::default(),
2761 &mut Default::default(),
2762 )
2763 .unwrap();
2764
2765 let message = root_as_message(&encoded.ipc_message).unwrap();
2766
2767 let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
2769 buffer.push(0_u8);
2770 buffer.extend_from_slice(&encoded.arrow_data);
2771 let b = Buffer::from(buffer).slice(1);
2772 assert_ne!(b.as_ptr().align_offset(8), 0);
2773
2774 let ipc_batch = message.header_as_record_batch().unwrap();
2775 let roundtrip = RecordBatchDecoder::try_new(
2776 &b,
2777 ipc_batch,
2778 batch.schema(),
2779 &Default::default(),
2780 &message.version(),
2781 )
2782 .unwrap()
2783 .with_require_alignment(false)
2784 .read_record_batch()
2785 .unwrap();
2786 assert_eq!(batch, roundtrip);
2787 }
2788
2789 #[test]
2790 fn test_unaligned_throws_error_with_require_alignment() {
2791 let batch = RecordBatch::try_from_iter(vec![(
2792 "i32",
2793 Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
2794 )])
2795 .unwrap();
2796
2797 let r#gen = IpcDataGenerator {};
2798 let mut dict_tracker = DictionaryTracker::new(false);
2799 let (_, encoded) = r#gen
2800 .encode(
2801 &batch,
2802 &mut dict_tracker,
2803 &Default::default(),
2804 &mut Default::default(),
2805 )
2806 .unwrap();
2807
2808 let message = root_as_message(&encoded.ipc_message).unwrap();
2809
2810 let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
2812 buffer.push(0_u8);
2813 buffer.extend_from_slice(&encoded.arrow_data);
2814 let b = Buffer::from(buffer).slice(1);
2815 assert_ne!(b.as_ptr().align_offset(8), 0);
2816
2817 let ipc_batch = message.header_as_record_batch().unwrap();
2818 let result = RecordBatchDecoder::try_new(
2819 &b,
2820 ipc_batch,
2821 batch.schema(),
2822 &Default::default(),
2823 &message.version(),
2824 )
2825 .unwrap()
2826 .with_require_alignment(true)
2827 .read_record_batch();
2828
2829 let error = result.unwrap_err();
2830 assert_eq!(
2831 error.to_string(),
2832 "Invalid argument error: Misaligned buffers[0] in array of type Int32, \
2833 offset from expected alignment of 4 by 1"
2834 );
2835 }
2836
2837 #[test]
2838 fn test_file_with_massive_column_count() {
2839 let limit = 600_000;
2841
2842 let fields = (0..limit)
2843 .map(|i| Field::new(format!("{i}"), DataType::Boolean, false))
2844 .collect::<Vec<_>>();
2845 let schema = Arc::new(Schema::new(fields));
2846 let batch = RecordBatch::new_empty(schema);
2847
2848 let mut buf = Vec::new();
2849 let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
2850 writer.write(&batch).unwrap();
2851 writer.finish().unwrap();
2852 drop(writer);
2853
2854 let mut reader = FileReaderBuilder::new()
2855 .with_max_footer_fb_tables(1_500_000)
2856 .build(std::io::Cursor::new(buf))
2857 .unwrap();
2858 let roundtrip_batch = reader.next().unwrap().unwrap();
2859
2860 assert_eq!(batch, roundtrip_batch);
2861 }
2862
2863 #[test]
2864 fn test_file_with_deeply_nested_columns() {
2865 let limit = 61;
2867
2868 let fields = (0..limit).fold(
2869 vec![Field::new("leaf", DataType::Boolean, false)],
2870 |field, index| vec![Field::new_struct(format!("{index}"), field, false)],
2871 );
2872 let schema = Arc::new(Schema::new(fields));
2873 let batch = RecordBatch::new_empty(schema);
2874
2875 let mut buf = Vec::new();
2876 let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
2877 writer.write(&batch).unwrap();
2878 writer.finish().unwrap();
2879 drop(writer);
2880
2881 let mut reader = FileReaderBuilder::new()
2882 .with_max_footer_fb_depth(65)
2883 .build(std::io::Cursor::new(buf))
2884 .unwrap();
2885 let roundtrip_batch = reader.next().unwrap().unwrap();
2886
2887 assert_eq!(batch, roundtrip_batch);
2888 }
2889
2890 #[test]
2891 fn test_invalid_struct_array_ipc_read_errors() {
2892 let a_field = Field::new("a", DataType::Int32, false);
2893 let b_field = Field::new("b", DataType::Int32, false);
2894 let struct_fields = Fields::from(vec![a_field.clone(), b_field.clone()]);
2895
2896 let a_array_data = ArrayData::builder(a_field.data_type().clone())
2897 .len(4)
2898 .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
2899 .build()
2900 .unwrap();
2901 let b_array_data = ArrayData::builder(b_field.data_type().clone())
2902 .len(3)
2903 .add_buffer(Buffer::from_slice_ref([5, 6, 7]))
2904 .build()
2905 .unwrap();
2906
2907 let invalid_struct_arr = unsafe {
2908 StructArray::new_unchecked(
2909 struct_fields,
2910 vec![make_array(a_array_data), make_array(b_array_data)],
2911 None,
2912 )
2913 };
2914
2915 expect_ipc_validation_error(
2916 Arc::new(invalid_struct_arr),
2917 "Invalid argument error: Incorrect array length for StructArray field \"b\", expected 4 got 3",
2918 );
2919 }
2920
2921 #[test]
2922 fn test_invalid_nested_array_ipc_read_errors() {
2923 let a_field = Field::new("a", DataType::Int32, false);
2925 let b_field = Field::new("b", DataType::Utf8, false);
2926
2927 let schema = Arc::new(Schema::new(vec![Field::new_struct(
2928 "s",
2929 vec![a_field.clone(), b_field.clone()],
2930 false,
2931 )]));
2932
2933 let a_array_data = ArrayData::builder(a_field.data_type().clone())
2934 .len(4)
2935 .add_buffer(Buffer::from_slice_ref([1, 2, 3, 4]))
2936 .build()
2937 .unwrap();
2938 let b_array_data = {
2940 let valid: &[u8] = b" ";
2941 let mut invalid = vec![];
2942 invalid.extend_from_slice(b"ValidString");
2943 invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
2944 let binary_array =
2945 BinaryArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
2946 let array = unsafe {
2947 StringArray::new_unchecked(
2948 binary_array.offsets().clone(),
2949 binary_array.values().clone(),
2950 binary_array.nulls().cloned(),
2951 )
2952 };
2953 array.into_data()
2954 };
2955 let struct_data_type = schema.field(0).data_type();
2956
2957 let invalid_struct_arr = unsafe {
2958 make_array(
2959 ArrayData::builder(struct_data_type.clone())
2960 .len(4)
2961 .add_child_data(a_array_data)
2962 .add_child_data(b_array_data)
2963 .build_unchecked(),
2964 )
2965 };
2966 expect_ipc_validation_error(
2967 Arc::new(invalid_struct_arr),
2968 "Invalid argument error: Invalid UTF8 sequence at string index 3 (3..18): invalid utf-8 sequence of 1 bytes from index 11",
2969 );
2970 }
2971
2972 #[test]
2973 fn test_same_dict_id_without_preserve() {
2974 let batch = RecordBatch::try_new(
2975 Arc::new(Schema::new(
2976 ["a", "b"]
2977 .iter()
2978 .map(|name| {
2979 #[allow(deprecated)]
2980 Field::new_dict(
2981 name.to_string(),
2982 DataType::Dictionary(
2983 Box::new(DataType::Int32),
2984 Box::new(DataType::Utf8),
2985 ),
2986 true,
2987 0,
2988 false,
2989 )
2990 })
2991 .collect::<Vec<Field>>(),
2992 )),
2993 vec![
2994 Arc::new(
2995 vec![Some("c"), Some("d")]
2996 .into_iter()
2997 .collect::<DictionaryArray<Int32Type>>(),
2998 ) as ArrayRef,
2999 Arc::new(
3000 vec![Some("e"), Some("f")]
3001 .into_iter()
3002 .collect::<DictionaryArray<Int32Type>>(),
3003 ) as ArrayRef,
3004 ],
3005 )
3006 .expect("Failed to create RecordBatch");
3007
3008 let mut buf = vec![];
3010 {
3011 let mut writer = crate::writer::StreamWriter::try_new_with_options(
3012 &mut buf,
3013 batch.schema().as_ref(),
3014 crate::writer::IpcWriteOptions::default(),
3015 )
3016 .expect("Failed to create StreamWriter");
3017 writer.write(&batch).expect("Failed to write RecordBatch");
3018 writer.finish().expect("Failed to finish StreamWriter");
3019 }
3020
3021 StreamReader::try_new(std::io::Cursor::new(buf), None)
3022 .expect("Failed to create StreamReader")
3023 .for_each(|decoded_batch| {
3024 assert_eq!(decoded_batch.expect("Failed to read RecordBatch"), batch);
3025 });
3026 }
3027
3028 #[test]
3029 fn test_validation_of_invalid_list_array() {
3030 let array = unsafe {
3032 let values = Int32Array::from(vec![1, 2, 3]);
3033 let bad_offsets = ScalarBuffer::<i32>::from(vec![0, 2, 4, 2]); let offsets = OffsetBuffer::new_unchecked(bad_offsets); let field = Field::new_list_field(DataType::Int32, true);
3036 let nulls = None;
3037 ListArray::new(Arc::new(field), offsets, Arc::new(values), nulls)
3038 };
3039
3040 expect_ipc_validation_error(
3041 Arc::new(array),
3042 "Invalid argument error: Offset invariant failure: offset at position 2 out of bounds: 4 > 2",
3043 );
3044 }
3045
3046 #[test]
3047 fn test_validation_of_invalid_string_array() {
3048 let valid: &[u8] = b" ";
3049 let mut invalid = vec![];
3050 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3051 invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3052 let binary_array = BinaryArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
3053 let array = unsafe {
3056 StringArray::new_unchecked(
3057 binary_array.offsets().clone(),
3058 binary_array.values().clone(),
3059 binary_array.nulls().cloned(),
3060 )
3061 };
3062 expect_ipc_validation_error(
3063 Arc::new(array),
3064 "Invalid argument error: Invalid UTF8 sequence at string index 3 (3..45): invalid utf-8 sequence of 1 bytes from index 38",
3065 );
3066 }
3067
3068 #[test]
3069 fn test_validation_of_invalid_string_view_array() {
3070 let valid: &[u8] = b" ";
3071 let mut invalid = vec![];
3072 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
3073 invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
3074 let binary_view_array =
3075 BinaryViewArray::from_iter(vec![None, Some(valid), None, Some(&invalid)]);
3076 let array = unsafe {
3079 StringViewArray::new_unchecked(
3080 binary_view_array.views().clone(),
3081 binary_view_array.data_buffers().to_vec(),
3082 binary_view_array.nulls().cloned(),
3083 )
3084 };
3085 expect_ipc_validation_error(
3086 Arc::new(array),
3087 "Invalid argument error: Encountered non-UTF-8 data at index 3: invalid utf-8 sequence of 1 bytes from index 38",
3088 );
3089 }
3090
3091 #[test]
3094 fn test_validation_of_invalid_dictionary_array() {
3095 let array = unsafe {
3096 let values = StringArray::from_iter_values(["a", "b", "c"]);
3097 let keys = Int32Array::from(vec![1, 200]); DictionaryArray::new_unchecked(keys, Arc::new(values))
3099 };
3100
3101 expect_ipc_validation_error(
3102 Arc::new(array),
3103 "Invalid argument error: Value at position 1 out of bounds: 200 (should be in [0, 2])",
3104 );
3105 }
3106
3107 #[test]
3108 fn test_validation_of_invalid_union_array() {
3109 let array = unsafe {
3110 let fields = UnionFields::new(
3111 vec![1, 3], vec![
3113 Field::new("a", DataType::Int32, false),
3114 Field::new("b", DataType::Utf8, false),
3115 ],
3116 );
3117 let type_ids = ScalarBuffer::from(vec![1i8, 2, 3]); let offsets = None;
3119 let children: Vec<ArrayRef> = vec![
3120 Arc::new(Int32Array::from(vec![10, 20, 30])),
3121 Arc::new(StringArray::from(vec![Some("a"), Some("b"), Some("c")])),
3122 ];
3123
3124 UnionArray::new_unchecked(fields, type_ids, offsets, children)
3125 };
3126
3127 expect_ipc_validation_error(
3128 Arc::new(array),
3129 "Invalid argument error: Type Ids values must match one of the field type ids",
3130 );
3131 }
3132
3133 const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
3136
3137 fn expect_ipc_validation_error(array: ArrayRef, expected_err: &str) {
3139 let rb = RecordBatch::try_from_iter([("a", array)]).unwrap();
3140
3141 let buf = write_stream(&rb); read_stream_skip_validation(&buf).unwrap();
3144 let err = read_stream(&buf).unwrap_err();
3145 assert_eq!(err.to_string(), expected_err);
3146
3147 let buf = write_ipc(&rb); read_ipc_skip_validation(&buf).unwrap();
3150 let err = read_ipc(&buf).unwrap_err();
3151 assert_eq!(err.to_string(), expected_err);
3152
3153 read_ipc_with_decoder_skip_validation(buf.clone()).unwrap();
3155 let err = read_ipc_with_decoder(buf).unwrap_err();
3156 assert_eq!(err.to_string(), expected_err);
3157 }
3158
3159 #[test]
3160 fn test_roundtrip_schema() {
3161 let schema = Schema::new(vec![
3162 Field::new(
3163 "a",
3164 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
3165 false,
3166 ),
3167 Field::new(
3168 "b",
3169 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
3170 false,
3171 ),
3172 ]);
3173
3174 let options = IpcWriteOptions::default();
3175 let data_gen = IpcDataGenerator::default();
3176 let mut dict_tracker = DictionaryTracker::new(false);
3177 let encoded_data =
3178 data_gen.schema_to_bytes_with_dictionary_tracker(&schema, &mut dict_tracker, &options);
3179 let mut schema_bytes = vec![];
3180 write_message(&mut schema_bytes, encoded_data, &options).expect("write_message");
3181
3182 let begin_offset: usize = if schema_bytes[0..4].eq(&CONTINUATION_MARKER) {
3183 4
3184 } else {
3185 0
3186 };
3187
3188 size_prefixed_root_as_message(&schema_bytes[begin_offset..])
3189 .expect_err("size_prefixed_root_as_message");
3190
3191 let msg = parse_message(&schema_bytes).expect("parse_message");
3192 let ipc_schema = msg.header_as_schema().expect("header_as_schema");
3193 let new_schema = fb_to_schema(ipc_schema);
3194
3195 assert_eq!(schema, new_schema);
3196 }
3197
3198 #[test]
3199 fn test_negative_meta_len() {
3200 let bytes = i32::to_le_bytes(-1);
3201 let mut buf = vec![];
3202 buf.extend(CONTINUATION_MARKER);
3203 buf.extend(bytes);
3204
3205 let reader = StreamReader::try_new(Cursor::new(buf), None);
3206 assert!(reader.is_err());
3207 }
3208}