use std::cmp::min;
use std::collections::HashMap;
use std::io::{BufWriter, Write};
use std::mem::size_of;
use std::sync::Arc;

use flatbuffers::FlatBufferBuilder;

use arrow_array::builder::BufferBuilder;
use arrow_array::cast::*;
use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
use arrow_array::*;
use arrow_buffer::bit_util;
use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
use arrow_data::{layout, ArrayData, ArrayDataBuilder, BufferSpec};
use arrow_schema::*;

use crate::compression::CompressionCodec;
use crate::convert::IpcSchemaEncoder;
use crate::CONTINUATION_MARKER;

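/// Options for controlling how IPC data is written: buffer alignment, legacy
/// format support, metadata version, and optional body compression.
///
/// A minimal usage sketch (hedged: assumes the `lz4` feature is enabled for
/// `CompressionType::LZ4_FRAME`, mirroring the compression tests below):
///
/// ```ignore
/// let options = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)?
///     .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))?;
/// ```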
#[derive(Debug, Clone)]
pub struct IpcWriteOptions {
    /// Write padding after memory buffers to this multiple of bytes.
    /// Must be 8, 16, 32, or 64 - defaults to 64.
    alignment: u8,
    /// The legacy format is for releases before 0.15.0, and uses metadata V4.
    write_legacy_ipc_format: bool,
    /// The metadata version to write. The Rust IPC writer supports V4+.
    metadata_version: crate::MetadataVersion,
    /// Compression, if desired. Only supported when `metadata_version` is V5 or above.
    batch_compression_type: Option<crate::CompressionType>,
    /// Flag indicating whether the writer should preserve the dictionary IDs defined in the
    /// schema or generate unique dictionary IDs internally during encoding.
    #[deprecated(
        since = "54.0.0",
        note = "The ability to preserve dictionary IDs will be removed; with it, all fields related to it will be removed as well."
    )]
    preserve_dict_id: bool,
}

impl IpcWriteOptions {
    /// Configures compression when writing IPC files.
    ///
    /// Returns an error if compression is requested for a metadata version
    /// older than V5.
    pub fn try_with_compression(
        mut self,
        batch_compression_type: Option<crate::CompressionType>,
    ) -> Result<Self, ArrowError> {
        self.batch_compression_type = batch_compression_type;

        if self.batch_compression_type.is_some()
            && self.metadata_version < crate::MetadataVersion::V5
        {
            return Err(ArrowError::InvalidArgumentError(
                "Compression only supported in metadata v5 and above".to_string(),
            ));
        }
        Ok(self)
    }

    /// Try to create IpcWriteOptions, checking for incompatible settings.
    pub fn try_new(
        alignment: usize,
        write_legacy_ipc_format: bool,
        metadata_version: crate::MetadataVersion,
    ) -> Result<Self, ArrowError> {
        let is_alignment_valid =
            alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
        if !is_alignment_valid {
            return Err(ArrowError::InvalidArgumentError(
                "Alignment should be 8, 16, 32, or 64.".to_string(),
            ));
        }
        let alignment: u8 = u8::try_from(alignment).expect("range already checked");
        match metadata_version {
            crate::MetadataVersion::V1
            | crate::MetadataVersion::V2
            | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
                "Writing IPC metadata version 3 and lower not supported".to_string(),
            )),
            #[allow(deprecated)]
            crate::MetadataVersion::V4 => Ok(Self {
                alignment,
                write_legacy_ipc_format,
                metadata_version,
                batch_compression_type: None,
                preserve_dict_id: false,
            }),
            crate::MetadataVersion::V5 => {
                if write_legacy_ipc_format {
                    Err(ArrowError::InvalidArgumentError(
                        "Legacy IPC format only supported on metadata version 4".to_string(),
                    ))
                } else {
                    #[allow(deprecated)]
                    Ok(Self {
                        alignment,
                        write_legacy_ipc_format,
                        metadata_version,
                        batch_compression_type: None,
                        preserve_dict_id: false,
                    })
                }
            }
            z => Err(ArrowError::InvalidArgumentError(format!(
                "Unsupported crate::MetadataVersion {z:?}"
            ))),
        }
    }

    /// Returns `true` if the writer will preserve the dictionary IDs defined
    /// in the schema.
    #[deprecated(
        since = "54.0.0",
        note = "The ability to preserve dictionary IDs will be removed; with it, all functions related to it will be removed as well."
    )]
    pub fn preserve_dict_id(&self) -> bool {
        #[allow(deprecated)]
        self.preserve_dict_id
    }

    /// Sets whether the IPC writer should preserve the dictionary IDs in the
    /// schema or generate unique dictionary IDs internally during encoding.
    ///
    /// If this option is true, the application must handle assigning IDs
    /// to the dictionary batches in order to encode them correctly.
    #[deprecated(
        since = "54.0.0",
        note = "The ability to preserve dictionary IDs will be removed; with it, all functions related to it will be removed as well."
    )]
    #[allow(deprecated)]
    pub fn with_preserve_dict_id(mut self, preserve_dict_id: bool) -> Self {
        self.preserve_dict_id = preserve_dict_id;
        self
    }
}

impl Default for IpcWriteOptions {
    fn default() -> Self {
        #[allow(deprecated)]
        Self {
            alignment: 64,
            write_legacy_ipc_format: false,
            metadata_version: crate::MetadataVersion::V5,
            batch_compression_type: None,
            preserve_dict_id: false,
        }
    }
}

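/// Handles the low level details of encoding [`Schema`]s and [`RecordBatch`]es
/// into the Arrow IPC format.
///
/// A hedged sketch of direct use (most callers should go through `FileWriter`
/// or `StreamWriter` instead; `schema` and `batch` are assumed to exist):
///
/// ```ignore
/// let data_gen = IpcDataGenerator::default();
/// let mut tracker = DictionaryTracker::new(false);
/// let options = IpcWriteOptions::default();
/// let schema_msg =
///     data_gen.schema_to_bytes_with_dictionary_tracker(&schema, &mut tracker, &options);
/// let (dictionaries, batch_msg) = data_gen.encoded_batch(&batch, &mut tracker, &options)?;
/// ```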
#[derive(Debug, Default)]
pub struct IpcDataGenerator {}

impl IpcDataGenerator {
    /// Converts a schema to an IPC message, emitting dictionary IDs into
    /// `dictionary_tracker`, and returns the encoded flatbuffer as [`EncodedData`].
    pub fn schema_to_bytes_with_dictionary_tracker(
        &self,
        schema: &Schema,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> EncodedData {
        let mut fbb = FlatBufferBuilder::new();
        let schema = {
            let fb = IpcSchemaEncoder::new()
                .with_dictionary_tracker(dictionary_tracker)
                .schema_to_fb_offset(&mut fbb, schema);
            fb.as_union_value()
        };

        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::Schema);
        message.add_bodyLength(0);
        message.add_header(schema);
        let data = message.finish();
        fbb.finish(data, None);

        let data = fbb.finished_data();
        EncodedData {
            ipc_message: data.to_vec(),
            arrow_data: vec![],
        }
    }

    /// Converts a schema to an IPC message and returns the encoded flatbuffer.
    #[deprecated(
        since = "54.0.0",
        note = "Use `schema_to_bytes_with_dictionary_tracker` instead. This function will take the signature of `schema_to_bytes_with_dictionary_tracker` in the next release."
    )]
    pub fn schema_to_bytes(&self, schema: &Schema, write_options: &IpcWriteOptions) -> EncodedData {
        let mut fbb = FlatBufferBuilder::new();
        let schema = {
            #[allow(deprecated)]
            let fb = crate::convert::schema_to_fb_offset(&mut fbb, schema);
            fb.as_union_value()
        };

        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::Schema);
        message.add_bodyLength(0);
        message.add_header(schema);
        let data = message.finish();
        fbb.finish(data, None);

        let data = fbb.finished_data();
        EncodedData {
            ipc_message: data.to_vec(),
            arrow_data: vec![],
        }
    }

    /// Recursively encodes dictionaries found in the children of `column`,
    /// without treating `column` itself as a dictionary.
    fn _encode_dictionaries<I: Iterator<Item = i64>>(
        &self,
        column: &ArrayRef,
        encoded_dictionaries: &mut Vec<EncodedData>,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        dict_id: &mut I,
    ) -> Result<(), ArrowError> {
        match column.data_type() {
            DataType::Struct(fields) => {
                let s = as_struct_array(column);
                for (field, column) in fields.iter().zip(s.columns()) {
                    self.encode_dictionaries(
                        field,
                        column,
                        encoded_dictionaries,
                        dictionary_tracker,
                        write_options,
                        dict_id,
                    )?;
                }
            }
            DataType::RunEndEncoded(_, values) => {
                let data = column.to_data();
                if data.child_data().len() != 2 {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "The run encoded array should have exactly two child arrays. Found {}",
                        data.child_data().len()
                    )));
                }
                // The run_ends array is not expected to be dictionary encoded,
                // so only handle the values array.
                let values_array = make_array(data.child_data()[1].clone());
                self.encode_dictionaries(
                    values,
                    &values_array,
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::List(field) => {
                let list = as_list_array(column);
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::LargeList(field) => {
                let list = as_large_list_array(column);
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::FixedSizeList(field, _) => {
                let list = column
                    .as_any()
                    .downcast_ref::<FixedSizeListArray>()
                    .expect("Unable to downcast to fixed size list array");
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::Map(field, _) => {
                let map_array = as_map_array(column);

                let (keys, values) = match field.data_type() {
                    DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
                    _ => panic!("Incorrect field data type {:?}", field.data_type()),
                };

                // keys
                self.encode_dictionaries(
                    keys,
                    map_array.keys(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;

                // values
                self.encode_dictionaries(
                    values,
                    map_array.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::Union(fields, _) => {
                let union = as_union_array(column);
                for (type_id, field) in fields.iter() {
                    let column = union.child(type_id);
                    self.encode_dictionaries(
                        field,
                        column,
                        encoded_dictionaries,
                        dictionary_tracker,
                        write_options,
                        dict_id,
                    )?;
                }
            }
            _ => (),
        }

        Ok(())
    }

    /// Encodes `column` as a dictionary batch if it is dictionary encoded
    /// (recursing into its values first), tracking emitted dictionaries so
    /// each is only sent once.
    fn encode_dictionaries<I: Iterator<Item = i64>>(
        &self,
        field: &Field,
        column: &ArrayRef,
        encoded_dictionaries: &mut Vec<EncodedData>,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        dict_id_seq: &mut I,
    ) -> Result<(), ArrowError> {
        match column.data_type() {
            DataType::Dictionary(_key_type, _value_type) => {
                let dict_data = column.to_data();
                let dict_values = &dict_data.child_data()[0];

                let values = make_array(dict_data.child_data()[0].clone());

                self._encode_dictionaries(
                    &values,
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id_seq,
                )?;

                // Take the dict_id from the sequence only at this point: the ID
                // sequence is assigned depth-first, so nested dictionaries must take
                // their IDs first. Fall back to the field's (deprecated) dict_id.
                #[allow(deprecated)]
                let dict_id = dict_id_seq
                    .next()
                    .or_else(|| field.dict_id())
                    .ok_or_else(|| {
                        ArrowError::IpcError(format!("no dict id for field {}", field.name()))
                    })?;

                let emit = dictionary_tracker.insert(dict_id, column)?;

                if emit {
                    encoded_dictionaries.push(self.dictionary_batch_to_bytes(
                        dict_id,
                        dict_values,
                        write_options,
                    )?);
                }
            }
            _ => self._encode_dictionaries(
                column,
                encoded_dictionaries,
                dictionary_tracker,
                write_options,
                dict_id_seq,
            )?,
        }

        Ok(())
    }

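    /// Encodes a batch into one [`EncodedData`] per dictionary to send plus one
    /// for the record batch itself, tracking dictionaries so each is sent once.
    ///
    /// A hedged sketch (assumes `data_gen`, `batch`, `tracker`, and `options`
    /// are in scope):
    ///
    /// ```ignore
    /// let (dictionaries, record) = data_gen.encoded_batch(&batch, &mut tracker, &options)?;
    /// // dictionary messages must be written before the record batch message
    /// ```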
    pub fn encoded_batch(
        &self,
        batch: &RecordBatch,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
        let schema = batch.schema();
        let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());

        let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();

        for (i, field) in schema.fields().iter().enumerate() {
            let column = batch.column(i);
            self.encode_dictionaries(
                field,
                column,
                &mut encoded_dictionaries,
                dictionary_tracker,
                write_options,
                &mut dict_id,
            )?;
        }

        let encoded_message = self.record_batch_to_bytes(batch, write_options)?;
        Ok((encoded_dictionaries, encoded_message))
    }

    /// Write a `RecordBatch` into two sets of bytes, one for the header
    /// (crate::Message) and the other for the batch's data.
    fn record_batch_to_bytes(
        &self,
        batch: &RecordBatch,
        write_options: &IpcWriteOptions,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];
        let mut offset = 0;

        // get the type of compression
        let batch_compression_type = write_options.batch_compression_type;

        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        let compression_codec: Option<CompressionCodec> =
            batch_compression_type.map(TryInto::try_into).transpose()?;

        let mut variadic_buffer_counts = vec![];

        for array in batch.columns() {
            let array_data = array.to_data();
            offset = write_array_data(
                &array_data,
                &mut buffers,
                &mut arrow_data,
                &mut nodes,
                offset,
                array.len(),
                array.null_count(),
                compression_codec,
                write_options,
            )?;

            append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
        }
        // pad the tail of body data
        let len = arrow_data.len();
        let pad_len = pad_to_alignment(write_options.alignment, len);
        arrow_data.extend_from_slice(&PADDING[..pad_len]);

        // write data
        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(batch.num_rows() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }

            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            let b = batch_builder.finish();
            b.as_union_value()
        };

        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::RecordBatch);
        message.add_bodyLength(arrow_data.len() as i64);
        message.add_header(root);
        let root = message.finish();
        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }

    /// Write dictionary values into two sets of bytes, one for the header
    /// (crate::Message) and the other for the data.
    fn dictionary_batch_to_bytes(
        &self,
        dict_id: i64,
        array_data: &ArrayData,
        write_options: &IpcWriteOptions,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];

        // get the type of compression
        let batch_compression_type = write_options.batch_compression_type;

        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        let compression_codec: Option<CompressionCodec> = batch_compression_type
            .map(|batch_compression_type| batch_compression_type.try_into())
            .transpose()?;

        write_array_data(
            array_data,
            &mut buffers,
            &mut arrow_data,
            &mut nodes,
            0,
            array_data.len(),
            array_data.null_count(),
            compression_codec,
            write_options,
        )?;

        let mut variadic_buffer_counts = vec![];
        append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);

        // pad the tail of body data
        let len = arrow_data.len();
        let pad_len = pad_to_alignment(write_options.alignment, len);
        arrow_data.extend_from_slice(&PADDING[..pad_len]);

        // write data
        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(array_data.len() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }
            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            batch_builder.finish()
        };

        let root = {
            let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb);
            batch_builder.add_id(dict_id);
            batch_builder.add_data(root);
            batch_builder.finish().as_union_value()
        };

        let root = {
            let mut message_builder = crate::MessageBuilder::new(&mut fbb);
            message_builder.add_version(write_options.metadata_version);
            message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
            message_builder.add_bodyLength(arrow_data.len() as i64);
            message_builder.add_header(root);
            message_builder.finish()
        };

        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }
}

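/// Appends, for every `BinaryView`/`Utf8View` array found (recursing into
/// children), the number of variadic data buffers: the buffer count minus the
/// fixed views buffer. The null buffer is tracked separately and not counted.
///
/// For example (a hedged illustration): a string view array backed by one views
/// buffer and two data buffers contributes a count of 2.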
fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
    match array.data_type() {
        DataType::BinaryView | DataType::Utf8View => {
            // The count only includes the variadic data buffers, not the views
            // buffer (or the null buffer, which is stored separately).
            counts.push(array.buffers().len() as i64 - 1);
        }
        DataType::Dictionary(_, _) => {
            // Do nothing: dictionary types are handled in `encode_dictionaries`.
        }
        _ => {
            for child in array.child_data() {
                append_variadic_buffer_counts(counts, child)
            }
        }
    }
}

/// Returns a run array equivalent to `arr` but with zero offset,
/// re-encoding the run ends if the input is sliced.
pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
    match arr.data_type() {
        DataType::RunEndEncoded(k, _) => match k.data_type() {
            DataType::Int16 => {
                Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
            }
            DataType::Int32 => {
                Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
            }
            DataType::Int64 => {
                Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
            }
            d => unreachable!("Unexpected data type {d}"),
        },
        d => Err(ArrowError::InvalidArgumentError(format!(
            "The given array is not a run array. Data type of given array: {d}"
        ))),
    }
}

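/// Rebases a possibly sliced [`RunArray`] so that it starts at offset zero,
/// re-encoding the run ends relative to the slice.
///
/// Worked example (hedged illustration): for run ends `[2, 4, 6]` sliced with
/// offset 2 and length 3, the physical range is indices 1..=2 and the new run
/// ends become `[2, 3]` (i.e. `4 - 2`, then the slice length).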
fn into_zero_offset_run_array<R: RunEndIndexType>(
    run_array: RunArray<R>,
) -> Result<RunArray<R>, ArrowError> {
    let run_ends = run_array.run_ends();
    if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
        return Ok(run_array);
    }

    // The physical index of the original run_ends array from which the slice starts.
    let start_physical_index = run_ends.get_start_physical_index();

    // The physical index of the original run_ends array at which the slice ends.
    let end_physical_index = run_ends.get_end_physical_index();

    let physical_length = end_physical_index - start_physical_index + 1;

    // Build a new run_ends array by subtracting the offset from the run ends.
    let offset = R::Native::usize_as(run_ends.offset());
    let mut builder = BufferBuilder::<R::Native>::new(physical_length);
    for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
        builder.append(run_end_value.sub_wrapping(offset));
    }
    builder.append(R::Native::from_usize(run_array.len()).unwrap());
    let new_run_ends = unsafe {
        // Safety: this function builds a valid run_ends array and hence
        // validation can be skipped.
        ArrayDataBuilder::new(R::DATA_TYPE)
            .len(physical_length)
            .add_buffer(builder.finish())
            .build_unchecked()
    };

    // Build the new values by slicing the physical indices.
    let new_values = run_array
        .values()
        .slice(start_physical_index, physical_length)
        .into_data();

    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
        .len(run_array.len())
        .add_child_data(new_run_ends)
        .add_child_data(new_values);
    let array_data = unsafe {
        // Safety: this function builds a valid run array and hence
        // validation can be skipped.
        builder.build_unchecked()
    };
    Ok(array_data.into())
}

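/// Keeps track of dictionaries written to the IPC stream, so each dictionary
/// is emitted once and replacements can be detected (or rejected).
///
/// A hedged sketch of the replacement check (`dict_column` is assumed to be a
/// dictionary-typed `ArrayRef`):
///
/// ```ignore
/// let mut tracker = DictionaryTracker::new(true); // error on replacement
/// assert!(tracker.insert(0, &dict_column)?);      // first sight: emit
/// assert!(!tracker.insert(0, &dict_column)?);     // same data: skip
/// ```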
#[derive(Debug)]
pub struct DictionaryTracker {
    written: HashMap<i64, ArrayData>,
    dict_ids: Vec<i64>,
    error_on_replacement: bool,
    #[deprecated(
        since = "54.0.0",
        note = "The ability to preserve dictionary IDs will be removed; with it, all fields related to it will be removed as well."
    )]
    preserve_dict_id: bool,
}

impl DictionaryTracker {
    /// Create a new [`DictionaryTracker`].
    ///
    /// If `error_on_replacement` is true, an error will be generated if an
    /// update to an existing dictionary is attempted.
    pub fn new(error_on_replacement: bool) -> Self {
        #[allow(deprecated)]
        Self {
            written: HashMap::new(),
            dict_ids: Vec::new(),
            error_on_replacement,
            preserve_dict_id: false,
        }
    }

    /// Create a new [`DictionaryTracker`], optionally preserving the
    /// dictionary IDs defined in the schema.
    #[deprecated(
        since = "54.0.0",
        note = "The ability to preserve dictionary IDs will be removed; with it, all functions related to it will be removed as well."
    )]
    pub fn new_with_preserve_dict_id(error_on_replacement: bool, preserve_dict_id: bool) -> Self {
        #[allow(deprecated)]
        Self {
            written: HashMap::new(),
            dict_ids: Vec::new(),
            error_on_replacement,
            preserve_dict_id,
        }
    }

    /// Set the dictionary ID for `field`, either from the field itself (when
    /// preserving dictionary IDs) or as one more than the last assigned ID.
    ///
    /// Returns the new dictionary ID.
    #[deprecated(
        since = "54.0.0",
        note = "The ability to preserve dictionary IDs will be removed; with it, all functions related to it will be removed as well."
    )]
    pub fn set_dict_id(&mut self, field: &Field) -> i64 {
        #[allow(deprecated)]
        let next = if self.preserve_dict_id {
            #[allow(deprecated)]
            field.dict_id().expect("no dict_id in field")
        } else {
            self.dict_ids
                .last()
                .copied()
                .map(|i| i + 1)
                .unwrap_or_default()
        };

        self.dict_ids.push(next);
        next
    }

    /// Returns the sequence of dictionary IDs assigned so far.
    pub fn dict_id(&mut self) -> &[i64] {
        &self.dict_ids
    }

    /// Keep track of the dictionary with the given ID and values. Behavior:
    ///
    /// * If this ID has been written already with the same data, return `Ok(false)`
    ///   to indicate the dictionary does not need to be emitted again.
    /// * If this ID has been written already with different data and this tracker
    ///   is configured to error on replacement, return an error.
    /// * Otherwise, return `Ok(true)` to indicate the dictionary was just inserted
    ///   and should be emitted.
    pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
        let dict_data = column.to_data();
        let dict_values = &dict_data.child_data()[0];

        // If a dictionary with this id was already emitted, check if it was the same.
        if let Some(last) = self.written.get(&dict_id) {
            if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
                // Same dictionary values => no need to emit it again
                return Ok(false);
            }
            if self.error_on_replacement {
                // If error on replacement perform a logical comparison
                if last.child_data()[0] == *dict_values {
                    // Same dictionary values => no need to emit it again
                    return Ok(false);
                }
                return Err(ArrowError::InvalidArgumentError(
                    "Dictionary replacement detected when writing IPC file format. \
                     Arrow IPC files only support a single dictionary for a given field \
                     across all batches."
                        .to_string(),
                ));
            }
        }

        self.written.insert(dict_id, dict_data);
        Ok(true)
    }
}

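/// Arrow File Writer: writes [`RecordBatch`]es in the Arrow IPC file format,
/// with a footer that enables random access to each batch.
///
/// A hedged usage sketch (mirrors `serialize_file` in the tests below):
///
/// ```ignore
/// let mut writer = FileWriter::try_new(Vec::new(), &schema)?;
/// writer.write(&batch)?;
/// writer.finish()?;
/// let bytes = writer.into_inner()?;
/// ```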
pub struct FileWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// A reference to the schema, used in validating record batches
    schema: SchemaRef,
    /// The number of bytes between each block of bytes, as an offset for random access
    block_offsets: usize,
    /// Dictionary blocks that will be written as part of the IPC footer
    dictionary_blocks: Vec<crate::Block>,
    /// Record blocks that will be written as part of the IPC footer
    record_blocks: Vec<crate::Block>,
    /// Whether the writer footer has been written, and the writer is finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,
    /// User level customized metadata
    custom_metadata: HashMap<String, String>,

    data_gen: IpcDataGenerator,
}

impl<W: Write> FileWriter<BufWriter<W>> {
    /// Try to create a new file writer with the writer wrapped in a `BufWriter`.
    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        Self::try_new(BufWriter::new(writer), schema)
    }
}

impl<W: Write> FileWriter<W> {
    /// Try to create a new writer, with the schema written as part of the header.
    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        let write_options = IpcWriteOptions::default();
        Self::try_new_with_options(writer, schema, write_options)
    }

    /// Try to create a new writer with [`IpcWriteOptions`].
    pub fn try_new_with_options(
        mut writer: W,
        schema: &Schema,
        write_options: IpcWriteOptions,
    ) -> Result<Self, ArrowError> {
        let data_gen = IpcDataGenerator::default();
        // write magic to header aligned on alignment boundary
        let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
        let header_size = super::ARROW_MAGIC.len() + pad_len;
        writer.write_all(&super::ARROW_MAGIC)?;
        writer.write_all(&PADDING[..pad_len])?;
        // write the schema, set the written bytes to the schema + header
        #[allow(deprecated)]
        let preserve_dict_id = write_options.preserve_dict_id;
        #[allow(deprecated)]
        let mut dictionary_tracker =
            DictionaryTracker::new_with_preserve_dict_id(true, preserve_dict_id);
        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
            schema,
            &mut dictionary_tracker,
            &write_options,
        );
        let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
        Ok(Self {
            writer,
            write_options,
            schema: Arc::new(schema.clone()),
            block_offsets: meta + data + header_size,
            dictionary_blocks: vec![],
            record_blocks: vec![],
            finished: false,
            dictionary_tracker,
            custom_metadata: HashMap::new(),
            data_gen,
        })
    }

    /// Adds a key-value pair to the [`FileWriter`]'s custom metadata.
    pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        self.custom_metadata.insert(key.into(), value.into());
    }

    /// Write a record batch to the file.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write record batch to file writer as it is closed".to_string(),
            ));
        }

        let (encoded_dictionaries, encoded_message) = self.data_gen.encoded_batch(
            batch,
            &mut self.dictionary_tracker,
            &self.write_options,
        )?;

        for encoded_dictionary in encoded_dictionaries {
            let (meta, data) =
                write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;

            let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
            self.dictionary_blocks.push(block);
            self.block_offsets += meta + data;
        }

        let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?;
        // add a record block for the footer
        let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
        self.record_blocks.push(block);
        self.block_offsets += meta + data;
        Ok(())
    }

    /// Write footer and closing tag, then mark the writer as done.
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write footer to file writer as it is closed".to_string(),
            ));
        }

        // write EOS
        write_continuation(&mut self.writer, &self.write_options, 0)?;

        let mut fbb = FlatBufferBuilder::new();
        let dictionaries = fbb.create_vector(&self.dictionary_blocks);
        let record_batches = fbb.create_vector(&self.record_blocks);
        #[allow(deprecated)]
        let preserve_dict_id = self.write_options.preserve_dict_id;
        #[allow(deprecated)]
        let mut dictionary_tracker =
            DictionaryTracker::new_with_preserve_dict_id(true, preserve_dict_id);
        let schema = IpcSchemaEncoder::new()
            .with_dictionary_tracker(&mut dictionary_tracker)
            .schema_to_fb_offset(&mut fbb, &self.schema);
        let fb_custom_metadata = (!self.custom_metadata.is_empty())
            .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));

        let root = {
            let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
            footer_builder.add_version(self.write_options.metadata_version);
            footer_builder.add_schema(schema);
            footer_builder.add_dictionaries(dictionaries);
            footer_builder.add_recordBatches(record_batches);
            if let Some(fb_custom_metadata) = fb_custom_metadata {
                footer_builder.add_custom_metadata(fb_custom_metadata);
            }
            footer_builder.finish()
        };
        fbb.finish(root, None);
        let footer_data = fbb.finished_data();
        self.writer.write_all(footer_data)?;
        self.writer
            .write_all(&(footer_data.len() as i32).to_le_bytes())?;
        self.writer.write_all(&super::ARROW_MAGIC)?;
        self.writer.flush()?;
        self.finished = true;

        Ok(())
    }

    /// Returns the arrow [`SchemaRef`] for this arrow file.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Gets a reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Flush the underlying writer.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }

    /// Unwraps the underlying writer.
    ///
    /// The writer is flushed and the IPC footer is written if `finish` has not
    /// yet been called.
    pub fn into_inner(mut self) -> Result<W, ArrowError> {
        if !self.finished {
            // `finish` flushes the writer.
            self.finish()?;
        }
        Ok(self.writer)
    }
}

impl<W: Write> RecordBatchWriter for FileWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}

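/// Arrow Stream Writer: writes [`RecordBatch`]es in the Arrow IPC streaming
/// format, which has no footer and is terminated by an end-of-stream marker.
///
/// A hedged usage sketch (mirrors `serialize_stream` in the tests below):
///
/// ```ignore
/// let mut writer = StreamWriter::try_new(Vec::new(), &schema)?;
/// writer.write(&batch)?;
/// writer.finish()?;
/// let bytes = writer.into_inner()?;
/// ```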
pub struct StreamWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// Whether the stream has been finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,

    data_gen: IpcDataGenerator,
}

impl<W: Write> StreamWriter<BufWriter<W>> {
    /// Try to create a new stream writer with the writer wrapped in a `BufWriter`.
    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        Self::try_new(BufWriter::new(writer), schema)
    }
}

impl<W: Write> StreamWriter<W> {
    /// Try to create a new writer, with the schema written as part of the header.
    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        let write_options = IpcWriteOptions::default();
        Self::try_new_with_options(writer, schema, write_options)
    }

    /// Try to create a new writer with [`IpcWriteOptions`].
    pub fn try_new_with_options(
        mut writer: W,
        schema: &Schema,
        write_options: IpcWriteOptions,
    ) -> Result<Self, ArrowError> {
        let data_gen = IpcDataGenerator::default();
        #[allow(deprecated)]
        let preserve_dict_id = write_options.preserve_dict_id;
        #[allow(deprecated)]
        let mut dictionary_tracker =
            DictionaryTracker::new_with_preserve_dict_id(false, preserve_dict_id);

        // write the schema, set the written bytes to the schema
        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
            schema,
            &mut dictionary_tracker,
            &write_options,
        );
        write_message(&mut writer, encoded_message, &write_options)?;
        Ok(Self {
            writer,
            write_options,
            finished: false,
            dictionary_tracker,
            data_gen,
        })
    }

    /// Write a record batch to the stream.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write record batch to stream writer as it is closed".to_string(),
            ));
        }

        let (encoded_dictionaries, encoded_message) = self
            .data_gen
            .encoded_batch(batch, &mut self.dictionary_tracker, &self.write_options)
            .expect("StreamWriter is configured to not error on dictionary replacement");

        for encoded_dictionary in encoded_dictionaries {
            write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
        }

        write_message(&mut self.writer, encoded_message, &self.write_options)?;
        Ok(())
    }

    /// Write the end-of-stream marker, and mark the stream as finished.
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write footer to stream writer as it is closed".to_string(),
            ));
        }

        write_continuation(&mut self.writer, &self.write_options, 0)?;

        self.finished = true;

        Ok(())
    }

    /// Gets a reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Flush the underlying writer.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }

    /// Unwraps the underlying writer, finishing the stream first if it is not
    /// already finished.
    pub fn into_inner(mut self) -> Result<W, ArrowError> {
        if !self.finished {
            self.finish()?;
        }
        Ok(self.writer)
    }
}

impl<W: Write> RecordBatchWriter for StreamWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}

/// Stores the encoded data, which is an IPC message plus optional Arrow data.
pub struct EncodedData {
    /// An encoded crate::Message
    pub ipc_message: Vec<u8>,
    /// Arrow buffers to be written, should be an empty vec for schema messages
    pub arrow_data: Vec<u8>,
}
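
/// Writes a single encapsulated IPC message (metadata flatbuffer plus optional
/// body) to `writer`, returning `(aligned_metadata_len, body_len)` in bytes.
///
/// A sketch of the on-wire layout for the non-legacy format, per the Arrow
/// encapsulated message spec:
///
/// ```text
/// <0xFFFFFFFF continuation marker> <i32 little-endian metadata size>
/// <metadata flatbuffer> <padding to alignment> <message body>
/// ```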
pub fn write_message<W: Write>(
    mut writer: W,
    encoded: EncodedData,
    write_options: &IpcWriteOptions,
) -> Result<(usize, usize), ArrowError> {
    let arrow_data_len = encoded.arrow_data.len();
    if arrow_data_len % usize::from(write_options.alignment) != 0 {
        return Err(ArrowError::MemoryError(
            "Arrow data not aligned".to_string(),
        ));
    }

    let a = usize::from(write_options.alignment - 1);
    let buffer = encoded.ipc_message;
    let flatbuf_size = buffer.len();
    let prefix_size = if write_options.write_legacy_ipc_format {
        4
    } else {
        8
    };
    let aligned_size = (flatbuf_size + prefix_size + a) & !a;
    let padding_bytes = aligned_size - flatbuf_size - prefix_size;

    write_continuation(
        &mut writer,
        write_options,
        (aligned_size - prefix_size) as i32,
    )?;

    // write the flatbuf
    if flatbuf_size > 0 {
        writer.write_all(&buffer)?;
    }
    // write padding
    writer.write_all(&PADDING[..padding_bytes])?;

    // write arrow data
    let body_len = if arrow_data_len > 0 {
        write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
    } else {
        0
    };

    Ok((aligned_size, body_len))
}

/// Write the body data to the provided writer, padding it to `alignment` and
/// returning the number of bytes written (including padding).
fn write_body_buffers<W: Write>(
    mut writer: W,
    data: &[u8],
    alignment: u8,
) -> Result<usize, ArrowError> {
    let len = data.len();
    let pad_len = pad_to_alignment(alignment, len);
    let total_len = len + pad_len;

    // write body buffer
    writer.write_all(data)?;
    if pad_len > 0 {
        writer.write_all(&PADDING[..pad_len])?;
    }

    writer.flush()?;
    Ok(total_len)
}

/// Write the continuation marker (where applicable) and the message length
/// prefix, returning the number of prefix bytes written.
fn write_continuation<W: Write>(
    mut writer: W,
    write_options: &IpcWriteOptions,
    total_len: i32,
) -> Result<usize, ArrowError> {
    let mut written = 8;

    // the version of the writer determines whether a continuation marker should be written
    match write_options.metadata_version {
        crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
            unreachable!("Options with these metadata versions cannot be created")
        }
        crate::MetadataVersion::V4 => {
            if !write_options.write_legacy_ipc_format {
                // v0.15.0 format: continuation marker followed by the length
                writer.write_all(&CONTINUATION_MARKER)?;
            } else {
                // the legacy format writes only the 4-byte length
                written = 4;
            }
            writer.write_all(&total_len.to_le_bytes()[..])?;
        }
        crate::MetadataVersion::V5 => {
            // write continuation marker and message length
            writer.write_all(&CONTINUATION_MARKER)?;
            writer.write_all(&total_len.to_le_bytes()[..])?;
        }
        z => panic!("Unsupported crate::MetadataVersion {z:?}"),
    };

    writer.flush()?;

    Ok(written)
}

/// Whether the data type has a validity bitmap in the IPC format:
/// in V4 only the Null type has none; in V5 and later, the Null, Union and
/// RunEndEncoded types have none.
fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
    if write_options.metadata_version < crate::MetadataVersion::V5 {
        !matches!(data_type, DataType::Null)
    } else {
        !matches!(
            data_type,
            DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
        )
    }
}

/// Whether the buffer needs to be truncated before writing, because the array
/// is sliced (non-zero offset) or shorter than the buffer.
#[inline]
fn buffer_need_truncate(
    array_offset: usize,
    buffer: &Buffer,
    spec: &BufferSpec,
    min_length: usize,
) -> bool {
    spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
}

/// Returns the byte width for a fixed-width buffer spec, and 0 otherwise.
#[inline]
fn get_buffer_element_width(spec: &BufferSpec) -> usize {
    match spec {
        BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
        _ => 0,
    }
}

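/// Common functionality for re-encoding offsets so they start at zero. Returns
/// the new offsets buffer plus the original start offset and the sliced length,
/// for use in slicing the corresponding values or child data.
///
/// Worked example (hedged illustration): offsets `[0, 2, 5, 9]` with
/// `data.offset() == 1` and `data.len() == 2` select `[2, 5, 9]`, which
/// re-encode to `[0, 3, 7]` with start offset `2` and length `7`.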
fn reencode_offsets<O: OffsetSizeTrait>(
    offsets: &Buffer,
    data: &ArrayData,
) -> (Buffer, usize, usize) {
    let offsets_slice: &[O] = offsets.typed_data::<O>();
    let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];

    let start_offset = offset_slice.first().unwrap();
    let end_offset = offset_slice.last().unwrap();

    let offsets = match start_offset.as_usize() {
        0 => {
            let size = size_of::<O>();
            offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
        }
        _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
    };

    let start_offset = start_offset.as_usize();
    let end_offset = end_offset.as_usize();

    (offsets, start_offset, end_offset - start_offset)
}

/// Returns the offsets and values [`Buffer`]s for a ByteArray with offset type `O`.
///
/// In particular, this handles re-encoding the offsets if they don't start at `0`,
/// slicing the values buffer as appropriate. This helps reduce the encoded size of
/// sliced arrays, as values that have been sliced away are not encoded.
fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
    if data.is_empty() {
        return (MutableBuffer::new(0).into(), MutableBuffer::new(0).into());
    }

    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
    let values = data.buffers()[1].slice_with_length(original_start_offset, len);
    (offsets, values)
}

/// Similar logic as [`get_byte_array_buffers()`] but slices the child array
/// instead of a values buffer.
fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
    if data.is_empty() {
        return (
            MutableBuffer::new(0).into(),
            data.child_data()[0].slice(0, 0),
        );
    }

    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
    let child_data = data.child_data()[0].slice(original_start_offset, len);
    (offsets, child_data)
}

/// Write array data to a vector of bytes, appending the buffer descriptors to
/// `buffers` and field nodes to `nodes`, and returning the updated body offset.
#[allow(clippy::too_many_arguments)]
fn write_array_data(
    array_data: &ArrayData,
    buffers: &mut Vec<crate::Buffer>,
    arrow_data: &mut Vec<u8>,
    nodes: &mut Vec<crate::FieldNode>,
    offset: i64,
    num_rows: usize,
    null_count: usize,
    compression_codec: Option<CompressionCodec>,
    write_options: &IpcWriteOptions,
) -> Result<i64, ArrowError> {
    let mut offset = offset;
    if !matches!(array_data.data_type(), DataType::Null) {
        nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
    } else {
        // A NullArray has no validity bitmap, so the physical null count is 0;
        // in IPC, however, every slot is null, so record `num_rows` as the null count.
        nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
    }
    if has_validity_bitmap(array_data.data_type(), write_options) {
        // write null buffer if it exists
        let null_buffer = match array_data.nulls() {
            None => {
                // create a buffer and fill it with valid bits
                let num_bytes = bit_util::ceil(num_rows, 8);
                let buffer = MutableBuffer::new(num_bytes);
                let buffer = buffer.with_bitset(num_bytes, true);
                buffer.into()
            }
            Some(buffer) => buffer.inner().sliced(),
        };

        offset = write_buffer(
            null_buffer.as_slice(),
            buffers,
            arrow_data,
            offset,
            compression_codec,
            write_options.alignment,
        )?;
    }

    let data_type = array_data.data_type();
    if matches!(data_type, DataType::Binary | DataType::Utf8) {
        let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
        for buffer in [offsets, values] {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                write_options.alignment,
            )?;
        }
    } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
        // Slicing the views buffer is easy, but pruning unneeded data buffers is
        // nuanced, since it is hard to prove that no views reference a pruned
        // buffer. Serialize the raw arrays as given, without trying to optimize.
        for buffer in array_data.buffers() {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                write_options.alignment,
            )?;
        }
    } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
        let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
        for buffer in [offsets, values] {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                write_options.alignment,
            )?;
        }
    } else if DataType::is_numeric(data_type)
        || DataType::is_temporal(data_type)
        || matches!(
            array_data.data_type(),
            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
        )
    {
        // Truncate values
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = &array_data.buffers()[0];
        let layout = layout(data_type);
        let spec = &layout.buffers[0];

        let byte_width = get_buffer_element_width(spec);
        let min_length = array_data.len() * byte_width;
        let buffer_slice = if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
            let byte_offset = array_data.offset() * byte_width;
            let buffer_length = min(min_length, buffer.len() - byte_offset);
            &buffer.as_slice()[byte_offset..(byte_offset + buffer_length)]
        } else {
            buffer.as_slice()
        };
        offset = write_buffer(
            buffer_slice,
            buffers,
            arrow_data,
            offset,
            compression_codec,
            write_options.alignment,
        )?;
    } else if matches!(data_type, DataType::Boolean) {
        // Bools are special because the payload (1 bit) is smaller than the
        // physical container elements (bytes): the array data may not start at
        // a byte boundary, so bits may need to be shifted around.
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = &array_data.buffers()[0];
        let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
        offset = write_buffer(
            &buffer,
            buffers,
            arrow_data,
            offset,
            compression_codec,
            write_options.alignment,
        )?;
    } else if matches!(
        data_type,
        DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
    ) {
        assert_eq!(array_data.buffers().len(), 1);
        assert_eq!(array_data.child_data().len(), 1);

        // Truncate the offsets and the child data to avoid writing unnecessary data
        let (offsets, sliced_child_data) = match data_type {
            DataType::List(_) => get_list_array_buffers::<i32>(array_data),
            DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
            DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
            _ => unreachable!(),
        };
        offset = write_buffer(
            offsets.as_slice(),
            buffers,
            arrow_data,
            offset,
            compression_codec,
            write_options.alignment,
        )?;
        offset = write_array_data(
            &sliced_child_data,
            buffers,
            arrow_data,
            nodes,
            offset,
            sliced_child_data.len(),
            sliced_child_data.null_count(),
            compression_codec,
            write_options,
        )?;
        return Ok(offset);
    } else if let DataType::FixedSizeList(_, fixed_size) = data_type {
        assert_eq!(array_data.child_data().len(), 1);
        let fixed_size = *fixed_size as usize;

        let child_offset = array_data.offset() * fixed_size;
        let child_length = array_data.len() * fixed_size;
        let child_data = array_data.child_data()[0].slice(child_offset, child_length);

        offset = write_array_data(
            &child_data,
            buffers,
            arrow_data,
            nodes,
            offset,
            child_data.len(),
            child_data.null_count(),
            compression_codec,
            write_options,
        )?;
        return Ok(offset);
    } else {
        for buffer in array_data.buffers() {
            offset = write_buffer(
                buffer,
                buffers,
                arrow_data,
                offset,
                compression_codec,
                write_options.alignment,
            )?;
        }
    }

    match array_data.data_type() {
        DataType::Dictionary(_, _) => {}
        DataType::RunEndEncoded(_, _) => {
            // unslice the run encoded array
            let arr = unslice_run_array(array_data.clone())?;
            // recursively write out nested structures
            for data_ref in arr.child_data() {
                offset = write_array_data(
                    data_ref,
                    buffers,
                    arrow_data,
                    nodes,
                    offset,
                    data_ref.len(),
                    data_ref.null_count(),
                    compression_codec,
                    write_options,
                )?;
            }
        }
        _ => {
            // recursively write out nested structures
            for data_ref in array_data.child_data() {
                offset = write_array_data(
                    data_ref,
                    buffers,
                    arrow_data,
                    nodes,
                    offset,
                    data_ref.len(),
                    data_ref.null_count(),
                    compression_codec,
                    write_options,
                )?;
            }
        }
    }
    Ok(offset)
}

/// Write a buffer into `arrow_data`, appending its [`crate::Buffer`]
/// descriptor to `buffers`, and returning the new offset in `arrow_data`.
fn write_buffer(
    buffer: &[u8],                    // input
    buffers: &mut Vec<crate::Buffer>, // output: buffer descriptors
    arrow_data: &mut Vec<u8>,         // output: the body byte stream
    offset: i64,                      // current output stream offset
    compression_codec: Option<CompressionCodec>,
    alignment: u8,
) -> Result<i64, ArrowError> {
    let len: i64 = match compression_codec {
        Some(compressor) => compressor.compress_to_vec(buffer, arrow_data)?,
        None => {
            arrow_data.extend_from_slice(buffer);
            buffer.len()
        }
    }
    .try_into()
    .map_err(|e| {
        ArrowError::InvalidArgumentError(format!("Could not convert compressed size to i64: {e}"))
    })?;

    buffers.push(crate::Buffer::new(offset, len));
    // pad the output so the next offset is aligned
    let pad_len = pad_to_alignment(alignment, len as usize);
    arrow_data.extend_from_slice(&PADDING[..pad_len]);

    Ok(offset + len + (pad_len as i64))
}

/// 64 zero bytes used for padding output to the required alignment.
const PADDING: [u8; 64] = [0; 64];

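/// Calculates the number of padding bytes needed to round `len` up to a
/// multiple of `alignment` (which must be a power of two).
///
/// For example, `pad_to_alignment(8, 13)` returns `3`, since 16 is the next
/// multiple of 8.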
#[inline]
fn pad_to_alignment(alignment: u8, len: usize) -> usize {
    let a = usize::from(alignment - 1);
    ((len + a) & !a) - len
}

#[cfg(test)]
mod tests {
    use std::hash::Hasher;
    use std::io::Cursor;
    use std::io::Seek;

    use arrow_array::builder::FixedSizeListBuilder;
    use arrow_array::builder::Float32Builder;
    use arrow_array::builder::Int64Builder;
    use arrow_array::builder::MapBuilder;
    use arrow_array::builder::UnionBuilder;
    use arrow_array::builder::{GenericListBuilder, ListBuilder, StringBuilder};
    use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
    use arrow_array::types::*;
    use arrow_buffer::ScalarBuffer;

    use crate::convert::fb_to_schema;
    use crate::reader::*;
    use crate::root_as_footer;
    use crate::MetadataVersion;

    use super::*;

    fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
        let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
        writer.write(rb).unwrap();
        writer.finish().unwrap();
        writer.into_inner().unwrap()
    }

    fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
        let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
        reader.next().unwrap().unwrap()
    }

    fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
        // Use 8-byte alignment so that the exact size comparisons in the
        // truncation tests below are not masked by padding.
        const IPC_ALIGNMENT: usize = 8;

        let mut stream_writer = StreamWriter::try_new_with_options(
            vec![],
            record.schema_ref(),
            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
        )
        .unwrap();
        stream_writer.write(record).unwrap();
        stream_writer.finish().unwrap();
        stream_writer.into_inner().unwrap()
    }

    fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
        let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
        stream_reader.next().unwrap().unwrap()
    }
1911
1912 #[test]
1913 #[cfg(feature = "lz4")]
1914 fn test_write_empty_record_batch_lz4_compression() {
1915 let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
1916 let values: Vec<Option<i32>> = vec![];
1917 let array = Int32Array::from(values);
1918 let record_batch =
1919 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
1920
1921 let mut file = tempfile::tempfile().unwrap();
1922
1923 {
1924 let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
1925 .unwrap()
1926 .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
1927 .unwrap();
1928
1929 let mut writer =
1930 FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
1931 writer.write(&record_batch).unwrap();
1932 writer.finish().unwrap();
1933 }
1934 file.rewind().unwrap();
1935 {
1936 let reader = FileReader::try_new(file, None).unwrap();
1938 for read_batch in reader {
1939 read_batch
1940 .unwrap()
1941 .columns()
1942 .iter()
1943 .zip(record_batch.columns())
1944 .for_each(|(a, b)| {
1945 assert_eq!(a.data_type(), b.data_type());
1946 assert_eq!(a.len(), b.len());
1947 assert_eq!(a.null_count(), b.null_count());
1948 });
1949 }
1950 }
1951 }
1952
1953 #[test]
1954 #[cfg(feature = "lz4")]
1955 fn test_write_file_with_lz4_compression() {
1956 let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
1957 let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
1958 let array = Int32Array::from(values);
1959 let record_batch =
1960 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
1961
1962 let mut file = tempfile::tempfile().unwrap();
1963 {
1964 let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
1965 .unwrap()
1966 .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
1967 .unwrap();
1968
1969 let mut writer =
1970 FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
1971 writer.write(&record_batch).unwrap();
1972 writer.finish().unwrap();
1973 }
1974 file.rewind().unwrap();
1975 {
1976 let reader = FileReader::try_new(file, None).unwrap();
1978 for read_batch in reader {
1979 read_batch
1980 .unwrap()
1981 .columns()
1982 .iter()
1983 .zip(record_batch.columns())
1984 .for_each(|(a, b)| {
1985 assert_eq!(a.data_type(), b.data_type());
1986 assert_eq!(a.len(), b.len());
1987 assert_eq!(a.null_count(), b.null_count());
1988 });
1989 }
1990 }
1991 }
1992
1993 #[test]
1994 #[cfg(feature = "zstd")]
1995 fn test_write_file_with_zstd_compression() {
1996 let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
1997 let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
1998 let array = Int32Array::from(values);
1999 let record_batch =
2000 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2001 let mut file = tempfile::tempfile().unwrap();
2002 {
2003 let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2004 .unwrap()
2005 .try_with_compression(Some(crate::CompressionType::ZSTD))
2006 .unwrap();
2007
2008 let mut writer =
2009 FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2010 writer.write(&record_batch).unwrap();
2011 writer.finish().unwrap();
2012 }
2013 file.rewind().unwrap();
2014 {
2015 let reader = FileReader::try_new(file, None).unwrap();
2017 for read_batch in reader {
2018 read_batch
2019 .unwrap()
2020 .columns()
2021 .iter()
2022 .zip(record_batch.columns())
2023 .for_each(|(a, b)| {
2024 assert_eq!(a.data_type(), b.data_type());
2025 assert_eq!(a.len(), b.len());
2026 assert_eq!(a.null_count(), b.null_count());
2027 });
2028 }
2029 }
2030 }
2031
2032 #[test]
2033 fn test_write_file() {
2034 let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
2035 let values: Vec<Option<u32>> = vec![
2036 Some(999),
2037 None,
2038 Some(235),
2039 Some(123),
2040 None,
2041 None,
2042 None,
2043 None,
2044 None,
2045 ];
2046 let array1 = UInt32Array::from(values);
2047 let batch =
2048 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
2049 .unwrap();
2050 let mut file = tempfile::tempfile().unwrap();
2051 {
2052 let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2053
2054 writer.write(&batch).unwrap();
2055 writer.finish().unwrap();
2056 }
2057 file.rewind().unwrap();
2058
2059 {
2060 let mut reader = FileReader::try_new(file, None).unwrap();
2061 while let Some(Ok(read_batch)) = reader.next() {
2062 read_batch
2063 .columns()
2064 .iter()
2065 .zip(batch.columns())
2066 .for_each(|(a, b)| {
2067 assert_eq!(a.data_type(), b.data_type());
2068 assert_eq!(a.len(), b.len());
2069 assert_eq!(a.null_count(), b.null_count());
2070 });
2071 }
2072 }
2073 }
2074
2075 fn write_null_file(options: IpcWriteOptions) {
2076 let schema = Schema::new(vec![
2077 Field::new("nulls", DataType::Null, true),
2078 Field::new("int32s", DataType::Int32, false),
2079 Field::new("nulls2", DataType::Null, true),
2080 Field::new("f64s", DataType::Float64, false),
2081 ]);
2082 let array1 = NullArray::new(32);
2083 let array2 = Int32Array::from(vec![1; 32]);
2084 let array3 = NullArray::new(32);
2085 let array4 = Float64Array::from(vec![f64::NAN; 32]);
2086 let batch = RecordBatch::try_new(
2087 Arc::new(schema.clone()),
2088 vec![
2089 Arc::new(array1) as ArrayRef,
2090 Arc::new(array2) as ArrayRef,
2091 Arc::new(array3) as ArrayRef,
2092 Arc::new(array4) as ArrayRef,
2093 ],
2094 )
2095 .unwrap();
2096 let mut file = tempfile::tempfile().unwrap();
2097 {
2098 let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2099
2100 writer.write(&batch).unwrap();
2101 writer.finish().unwrap();
2102 }
2103
2104 file.rewind().unwrap();
2105
2106 {
2107 let reader = FileReader::try_new(file, None).unwrap();
2108 reader.for_each(|maybe_batch| {
2109 maybe_batch
2110 .unwrap()
2111 .columns()
2112 .iter()
2113 .zip(batch.columns())
2114 .for_each(|(a, b)| {
2115 assert_eq!(a.data_type(), b.data_type());
2116 assert_eq!(a.len(), b.len());
2117 assert_eq!(a.null_count(), b.null_count());
2118 });
2119 });
2120 }
2121 }
2122 #[test]
2123 fn test_write_null_file_v4() {
2124 write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2125 write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
2126 write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap());
2127 write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
2128 }
2129
2130 #[test]
2131 fn test_write_null_file_v5() {
2132 write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2133 write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap());
2134 }
2135
2136 #[test]
2137 fn track_union_nested_dict() {
2138 let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2139
2140 let array = Arc::new(inner) as ArrayRef;
2141
2142 #[allow(deprecated)]
2144 let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 2, false);
2145 let union_fields = [(0, Arc::new(dctfield))].into_iter().collect();
2146
2147 let types = [0, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
2148 let offsets = [0, 1, 2].into_iter().collect::<ScalarBuffer<i32>>();
2149
2150 let union = UnionArray::try_new(union_fields, types, Some(offsets), vec![array]).unwrap();
2151
2152 let schema = Arc::new(Schema::new(vec![Field::new(
2153 "union",
2154 union.data_type().clone(),
2155 false,
2156 )]));
2157
2158 let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
2159
2160 let gen = IpcDataGenerator {};
2161 #[allow(deprecated)]
2162 let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
2163 gen.encoded_batch(&batch, &mut dict_tracker, &Default::default())
2164 .unwrap();
2165
2166 assert!(dict_tracker.written.contains_key(&2));
2169 }
2170
2171 #[test]
2172 fn track_struct_nested_dict() {
2173 let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2174
2175 let array = Arc::new(inner) as ArrayRef;
2176
2177 #[allow(deprecated)]
2179 let dctfield = Arc::new(Field::new_dict(
2180 "dict",
2181 array.data_type().clone(),
2182 false,
2183 2,
2184 false,
2185 ));
2186
2187 let s = StructArray::from(vec![(dctfield, array)]);
2188 let struct_array = Arc::new(s) as ArrayRef;
2189
2190 let schema = Arc::new(Schema::new(vec![Field::new(
2191 "struct",
2192 struct_array.data_type().clone(),
2193 false,
2194 )]));
2195
2196 let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2197
2198 let gen = IpcDataGenerator {};
2199 #[allow(deprecated)]
2200 let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
2201 gen.encoded_batch(&batch, &mut dict_tracker, &Default::default())
2202 .unwrap();
2203
2204 assert!(dict_tracker.written.contains_key(&2));
2205 }
2206
2207 fn write_union_file(options: IpcWriteOptions) {
2208 let schema = Schema::new(vec![Field::new_union(
2209 "union",
2210 vec![0, 1],
2211 vec![
2212 Field::new("a", DataType::Int32, false),
2213 Field::new("c", DataType::Float64, false),
2214 ],
2215 UnionMode::Sparse,
2216 )]);
2217 let mut builder = UnionBuilder::with_capacity_sparse(5);
2218 builder.append::<Int32Type>("a", 1).unwrap();
2219 builder.append_null::<Int32Type>("a").unwrap();
2220 builder.append::<Float64Type>("c", 3.0).unwrap();
2221 builder.append_null::<Float64Type>("c").unwrap();
2222 builder.append::<Int32Type>("a", 4).unwrap();
2223 let union = builder.build().unwrap();
2224
2225 let batch =
2226 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
2227 .unwrap();
2228
2229 let mut file = tempfile::tempfile().unwrap();
2230 {
2231 let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2232
2233 writer.write(&batch).unwrap();
2234 writer.finish().unwrap();
2235 }
2236 file.rewind().unwrap();
2237
2238 {
2239 let reader = FileReader::try_new(file, None).unwrap();
2240 reader.for_each(|maybe_batch| {
2241 maybe_batch
2242 .unwrap()
2243 .columns()
2244 .iter()
2245 .zip(batch.columns())
2246 .for_each(|(a, b)| {
2247 assert_eq!(a.data_type(), b.data_type());
2248 assert_eq!(a.len(), b.len());
2249 assert_eq!(a.null_count(), b.null_count());
2250 });
2251 });
2252 }
2253 }
2254
2255 #[test]
2256 fn test_write_union_file_v4_v5() {
2257 write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2258 write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2259 }
2260
2261 #[test]
2262 fn test_write_view_types() {
2263 const LONG_TEST_STRING: &str =
2264 "This is a long string to make sure binary view array handles it";
2265 let schema = Schema::new(vec![
2266 Field::new("field1", DataType::BinaryView, true),
2267 Field::new("field2", DataType::Utf8View, true),
2268 ]);
2269 let values: Vec<Option<&[u8]>> = vec![
2270 Some(b"foo"),
2271 Some(b"bar"),
2272 Some(LONG_TEST_STRING.as_bytes()),
2273 ];
2274 let binary_array = BinaryViewArray::from_iter(values);
2275 let utf8_array =
2276 StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
2277 let record_batch = RecordBatch::try_new(
2278 Arc::new(schema.clone()),
2279 vec![Arc::new(binary_array), Arc::new(utf8_array)],
2280 )
2281 .unwrap();
2282
2283 let mut file = tempfile::tempfile().unwrap();
2284 {
2285 let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2286 writer.write(&record_batch).unwrap();
2287 writer.finish().unwrap();
2288 }
2289 file.rewind().unwrap();
2290 {
2291 let mut reader = FileReader::try_new(&file, None).unwrap();
2292 let read_batch = reader.next().unwrap().unwrap();
2293 read_batch
2294 .columns()
2295 .iter()
2296 .zip(record_batch.columns())
2297 .for_each(|(a, b)| {
2298 assert_eq!(a, b);
2299 });
2300 }
2301 file.rewind().unwrap();
2302 {
2303 let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
2304 let read_batch = reader.next().unwrap().unwrap();
2305 assert_eq!(read_batch.num_columns(), 1);
2306 let read_array = read_batch.column(0);
2307 let write_array = record_batch.column(0);
2308 assert_eq!(read_array, write_array);
2309 }
2310 }

    #[test]
    fn truncate_ipc_record_batch() {
        fn create_batch(rows: usize) -> RecordBatch {
            let schema = Schema::new(vec![
                Field::new("a", DataType::Int32, false),
                Field::new("b", DataType::Utf8, false),
            ]);

            let a = Int32Array::from_iter_values(0..rows as i32);
            let b = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
        }

        let big_record_batch = create_batch(65536);

        let length = 5;
        let small_record_batch = create_batch(length);

        let offset = 2;
        let record_batch_slice = big_record_batch.slice(offset, length);
        assert!(
            serialize_stream(&big_record_batch).len() > serialize_stream(&small_record_batch).len()
        );
        assert_eq!(
            serialize_stream(&small_record_batch).len(),
            serialize_stream(&record_batch_slice).len()
        );

        assert_eq!(
            deserialize_stream(serialize_stream(&record_batch_slice)),
            record_batch_slice
        );
    }

    #[test]
    fn truncate_ipc_record_batch_with_nulls() {
        fn create_batch() -> RecordBatch {
            let schema = Schema::new(vec![
                Field::new("a", DataType::Int32, true),
                Field::new("b", DataType::Utf8, true),
            ]);

            let a = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
            let b = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(1, 2);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );

        assert!(deserialized_batch.column(0).is_null(0));
        assert!(deserialized_batch.column(0).is_valid(1));
        assert!(deserialized_batch.column(1).is_valid(0));
        assert!(deserialized_batch.column(1).is_valid(1));

        assert_eq!(record_batch_slice, deserialized_batch);
    }

    #[test]
    fn truncate_ipc_dictionary_array() {
        fn create_batch() -> RecordBatch {
            let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
                .into_iter()
                .collect();
            let keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();

            let array = DictionaryArray::new(keys, Arc::new(values));

            let schema = Schema::new(vec![Field::new("dict", array.data_type().clone(), true)]);

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(1, 2);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );

        assert!(deserialized_batch.column(0).is_valid(0));
        assert!(deserialized_batch.column(0).is_null(1));

        assert_eq!(record_batch_slice, deserialized_batch);
    }

    #[test]
    fn truncate_ipc_struct_array() {
        fn create_batch() -> RecordBatch {
            let strings: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
                .into_iter()
                .collect();
            let ints: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();

            let struct_array = StructArray::from(vec![
                (
                    Arc::new(Field::new("s", DataType::Utf8, true)),
                    Arc::new(strings) as ArrayRef,
                ),
                (
                    Arc::new(Field::new("c", DataType::Int32, true)),
                    Arc::new(ints) as ArrayRef,
                ),
            ]);

            let schema = Schema::new(vec![Field::new(
                "struct_array",
                struct_array.data_type().clone(),
                true,
            )]);

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(1, 2);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );

        let structs = deserialized_batch
            .column(0)
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap();

        assert!(structs.column(0).is_null(0));
        assert!(structs.column(0).is_valid(1));
        assert!(structs.column(1).is_valid(0));
        assert!(structs.column(1).is_null(1));
        assert_eq!(record_batch_slice, deserialized_batch);
    }

    #[test]
    fn truncate_ipc_string_array_with_all_empty_string() {
        fn create_batch() -> RecordBatch {
            let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
            let a = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(0, 1);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );
        assert_eq!(record_batch_slice, deserialized_batch);
    }

    #[test]
    fn test_stream_writer_writes_array_slice() {
        let array = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
        assert_eq!(
            vec![Some(1), Some(2), Some(3)],
            array.iter().collect::<Vec<_>>()
        );

        let sliced = array.slice(1, 2);
        assert_eq!(vec![Some(2), Some(3)], sliced.iter().collect::<Vec<_>>());

        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])),
            vec![Arc::new(sliced)],
        )
        .expect("new batch");

        let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
        writer.write(&batch).expect("write");
        let outbuf = writer.into_inner().expect("inner");

        let mut reader = StreamReader::try_new(&outbuf[..], None).expect("new reader");
        let read_batch = reader.next().unwrap().expect("read batch");

        let read_array: &UInt32Array = read_batch.column(0).as_primitive();
        assert_eq!(
            vec![Some(2), Some(3)],
            read_array.iter().collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_large_slice_uint32() {
        ensure_roundtrip(Arc::new(UInt32Array::from_iter((0..8000).map(|i| {
            if i % 2 == 0 {
                Some(i)
            } else {
                None
            }
        }))));
    }

    #[test]
    fn test_large_slice_string() {
        let strings: Vec<_> = (0..8000)
            .map(|i| {
                if i % 2 == 0 {
                    Some(format!("value{}", i))
                } else {
                    None
                }
            })
            .collect();

        ensure_roundtrip(Arc::new(StringArray::from(strings)));
    }

    #[test]
    fn test_large_slice_string_list() {
        let mut ls = ListBuilder::new(StringBuilder::new());

        let mut s = String::new();
        for row_number in 0..8000 {
            if row_number % 2 == 0 {
                for list_element in 0..1000 {
                    s.clear();
                    use std::fmt::Write;
                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
                    ls.values().append_value(&s);
                }
                ls.append(true)
            } else {
                ls.append(false);
            }
        }

        ensure_roundtrip(Arc::new(ls.finish()));
    }

    #[test]
    fn test_large_slice_string_list_of_lists() {
        let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));

        for _ in 0..4000 {
            ls.values().append(true);
            ls.append(true)
        }

        let mut s = String::new();
        for row_number in 0..4000 {
            if row_number % 2 == 0 {
                for list_element in 0..1000 {
                    s.clear();
                    use std::fmt::Write;
                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
                    ls.values().values().append_value(&s);
                }
                ls.values().append(true);
                ls.append(true)
            } else {
                ls.append(false);
            }
        }

        ensure_roundtrip(Arc::new(ls.finish()));
    }
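    /// Round-trips `array`, sliced by one row, through both the stream and file
    /// writers and asserts the decoded batch matches the slice.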
    fn ensure_roundtrip(array: ArrayRef) {
        let num_rows = array.len();
        let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
        let sliced_batch = orig_batch.slice(1, num_rows - 1);

        let schema = orig_batch.schema();
        let stream_data = {
            let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
            writer.write(&sliced_batch).unwrap();
            writer.into_inner().unwrap()
        };
        let read_batch = {
            let projection = None;
            let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
            reader
                .next()
                .expect("expect no errors reading batch")
                .expect("expect batch")
        };
        assert_eq!(sliced_batch, read_batch);

        let file_data = {
            let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
            writer.write(&sliced_batch).unwrap();
            writer.into_inner().unwrap().into_inner().unwrap()
        };
        let read_batch = {
            let projection = None;
            let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
            reader
                .next()
                .expect("expect no errors reading batch")
                .expect("expect batch")
        };
        assert_eq!(sliced_batch, read_batch);
    }

    #[test]
    fn encode_bools_slice() {
        assert_bool_roundtrip([true, false], 1, 1);

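        // Both the offset and the end fall in the middle of a byte.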
        assert_bool_roundtrip(
            [
                true, false, true, true, false, false, true, true, true, false, false, false, true,
                true, true, true, false, false, false, false, true, true, true, true, true, false,
                false, false, false, false,
            ],
            13,
            17,
        );

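        // The offset lands on a byte boundary, the end does not.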
        assert_bool_roundtrip(
            [
                true, false, true, true, false, false, true, true, true, false, false, false,
            ],
            8,
            2,
        );

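        // Both the offset and the end land on byte boundaries.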
        assert_bool_roundtrip(
            [
                true, false, true, true, false, false, true, true, true, false, false, false, true,
                true, true, true, true, false, false, false, false, false,
            ],
            8,
            8,
        );
    }

    fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
        let val_bool_field = Field::new("val", DataType::Boolean, false);

        let schema = Arc::new(Schema::new(vec![val_bool_field]));

        let bools = BooleanArray::from(bools.to_vec());

        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
        let batch = batch.slice(offset, length);

        let data = serialize_stream(&batch);
        let batch2 = deserialize_stream(data);
        assert_eq!(batch, batch2);
    }

    #[test]
    fn test_run_array_unslice() {
        let total_len = 80;
        let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
        let repeats: Vec<usize> = vec![3, 4, 1, 2];
        let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
        for ix in 0_usize..32 {
            let repeat: usize = repeats[ix % repeats.len()];
            let val: Option<i32> = vals[ix % vals.len()];
            input_array.resize(input_array.len() + repeat, val);
        }

        let mut builder =
            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
        builder.extend(input_array.iter().copied());
        let run_array = builder.finish();

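        // Slice from the front and from the back at every possible length;
        // unslicing should rebase the run ends so the result matches the
        // corresponding slice of the logical input values.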
        for slice_len in 1..=total_len {
            let sliced_run_array: RunArray<Int16Type> =
                run_array.slice(0, slice_len).into_data().into();

            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
            let typed = unsliced_run_array
                .downcast::<PrimitiveArray<Int32Type>>()
                .unwrap();
            let expected: Vec<Option<i32>> = input_array.iter().take(slice_len).copied().collect();
            let actual: Vec<Option<i32>> = typed.into_iter().collect();
            assert_eq!(expected, actual);

            let sliced_run_array: RunArray<Int16Type> = run_array
                .slice(total_len - slice_len, slice_len)
                .into_data()
                .into();

            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
            let typed = unsliced_run_array
                .downcast::<PrimitiveArray<Int32Type>>()
                .unwrap();
            let expected: Vec<Option<i32>> = input_array
                .iter()
                .skip(total_len - slice_len)
                .copied()
                .collect();
            let actual: Vec<Option<i32>> = typed.into_iter().collect();
            assert_eq!(expected, actual);
        }
    }

    fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
        let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());

        for i in 0..100_000 {
            for value in [i, i, i] {
                ls.values().append_value(value);
            }
            ls.append(true)
        }

        ls.finish()
    }

    fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
        let mut ls =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));

        for _i in 0..10_000 {
            for j in 0..10 {
                for value in [j, j, j, j] {
                    ls.values().values().append_value(value);
                }
                ls.values().append(true)
            }
            ls.append(true);
        }

        ls.finish()
    }

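    /// Like [`generate_nested_list_data`], but the first 999 rows hold only empty
    /// inner lists, so the value offsets of a slice taken after them still begin
    /// at zero.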
    fn generate_nested_list_data_starting_at_zero<O: OffsetSizeTrait>() -> GenericListArray<O> {
        let mut ls =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));

        for _i in 0..999 {
            ls.values().append(true);
            ls.append(true);
        }

        for j in 0..10 {
            for value in [j, j, j, j] {
                ls.values().values().append_value(value);
            }
            ls.values().append(true)
        }
        ls.append(true);

        for i in 0..9_000 {
            for j in 0..10 {
                for value in [i + j, i + j, i + j, i + j] {
                    ls.values().values().append_value(value);
                }
                ls.values().append(true)
            }
            ls.append(true);
        }

        ls.finish()
    }

    fn generate_map_array_data() -> MapArray {
        let keys_builder = UInt32Builder::new();
        let values_builder = UInt32Builder::new();

        let mut builder = MapBuilder::new(None, keys_builder, values_builder);

        for i in 0..100_000 {
            for _j in 0..3 {
                builder.keys().append_value(i);
                builder.values().append_value(i * 2);
            }
            builder.append(true).unwrap();
        }

        builder.finish()
    }

    #[test]
    fn reencode_offsets_when_first_offset_is_not_zero() {
        let original_list = generate_list_data::<i32>();
        let original_data = original_list.into_data();
        let slice_data = original_data.slice(75, 7);
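        // Each list holds 3 values, so the 7 lists starting at row 75 cover
        // values 225..246; the offsets should be rebased to start at zero.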
        let (new_offsets, original_start, length) =
            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
        assert_eq!(
            vec![0, 3, 6, 9, 12, 15, 18, 21],
            new_offsets.typed_data::<i32>()
        );
        assert_eq!(225, original_start);
        assert_eq!(21, length);
    }

    #[test]
    fn reencode_offsets_when_first_offset_is_zero() {
        let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
        ls.append(true);
        ls.values().append_value(35);
        ls.values().append_value(42);
        ls.append(true);
        let original_list = ls.finish();
        let original_data = original_list.into_data();

        let slice_data = original_data.slice(1, 1);
        let (new_offsets, original_start, length) =
            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
        assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
        assert_eq!(0, original_start);
        assert_eq!(2, length);
    }

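    /// Slices `in_batch` down to a single row, checks the serialized slice is at
    /// least `expected_size_factor` times smaller than the full batch, and then
    /// verifies both batches roundtrip through the file format unchanged.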
    fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
        let in_sliced = in_batch.slice(999, 1);

        let bytes_batch = serialize_file(&in_batch);
        let bytes_sliced = serialize_file(&in_sliced);

        assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));

        let out_batch = deserialize_file(bytes_batch);
        assert_eq!(in_batch, out_batch);

        let out_sliced = deserialize_file(bytes_sliced);
        assert_eq!(in_sliced, out_sliced);
    }

    #[test]
    fn encode_lists() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_list_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    #[test]
    fn encode_empty_list() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_list_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values])
            .unwrap()
            .slice(999, 0);
        let out_batch = deserialize_file(serialize_file(&in_batch));
        assert_eq!(in_batch, out_batch);
    }

    #[test]
    fn encode_large_lists() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_list_data::<i64>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    #[test]
    fn encode_nested_lists() {
        let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
        let inner_list_field = Arc::new(Field::new_list_field(DataType::List(inner_int), true));
        let list_field = Field::new("val", DataType::List(inner_list_field), true);
        let schema = Arc::new(Schema::new(vec![list_field]));

        let values = Arc::new(generate_nested_list_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    #[test]
    fn encode_nested_lists_starting_at_zero() {
        let inner_int = Arc::new(Field::new("item", DataType::UInt32, true));
        let inner_list_field = Arc::new(Field::new("item", DataType::List(inner_int), true));
        let list_field = Field::new("val", DataType::List(inner_list_field), true);
        let schema = Arc::new(Schema::new(vec![list_field]));

        let values = Arc::new(generate_nested_list_data_starting_at_zero::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1);
    }

    #[test]
    fn encode_map_array() {
        let keys = Arc::new(Field::new("keys", DataType::UInt32, false));
        let values = Arc::new(Field::new("values", DataType::UInt32, true));
        let map_field = Field::new_map("map", "entries", keys, values, false, true);
        let schema = Arc::new(Schema::new(vec![map_field]));

        let values = Arc::new(generate_map_array_data());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    #[test]
    fn test_decimal128_alignment16_is_sufficient() {
        const IPC_ALIGNMENT: usize = 16;

        for num_cols in [1, 2, 3, 17, 50, 73, 99] {
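            // Derive the row count from the column count so buffer sizes, and
            // therefore their start offsets within the message body, vary across
            // iterations; a single fixed shape could mask an alignment bug.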
            let num_rows = (num_cols * 7 + 11) % 100;

            let mut fields = Vec::new();
            let mut arrays = Vec::new();
            for i in 0..num_cols {
                let field = Field::new(format!("col_{}", i), DataType::Decimal128(38, 10), true);
                let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
                fields.push(field);
                arrays.push(Arc::new(array) as Arc<dyn Array>);
            }
            let schema = Schema::new(fields);
            let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();

            let mut writer = FileWriter::try_new_with_options(
                Vec::new(),
                batch.schema_ref(),
                IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
            )
            .unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();

            let out: Vec<u8> = writer.into_inner().unwrap();

            let buffer = Buffer::from_vec(out);
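            // The file trailer is 10 bytes: a 4-byte footer length followed by
            // the 6-byte "ARROW1" magic.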
            let trailer_start = buffer.len() - 10;
            let footer_len =
                read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
            let footer =
                root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();

            let schema = fb_to_schema(footer.schema().unwrap());

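            // `require_alignment` makes the decoder reject misaligned buffers
            // rather than repairing them with a copy, so this checks that the
            // 16-byte alignment written above is genuinely sufficient.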
            let decoder =
                FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);

            let batches = footer.recordBatches().unwrap();

            let block = batches.get(0);
            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
            let data = buffer.slice_with_length(block.offset() as _, block_len);

            let batch2 = decoder.read_record_batch(block, &data).unwrap().unwrap();

            assert_eq!(batch, batch2);
        }
    }

    #[test]
    fn test_decimal128_alignment8_is_unaligned() {
        const IPC_ALIGNMENT: usize = 8;

        let num_cols = 2;
        let num_rows = 1;

        let mut fields = Vec::new();
        let mut arrays = Vec::new();
        for i in 0..num_cols {
            let field = Field::new(format!("col_{}", i), DataType::Decimal128(38, 10), true);
            let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
            fields.push(field);
            arrays.push(Arc::new(array) as Arc<dyn Array>);
        }
        let schema = Schema::new(fields);
        let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();

        let mut writer = FileWriter::try_new_with_options(
            Vec::new(),
            batch.schema_ref(),
            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
        )
        .unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();

        let out: Vec<u8> = writer.into_inner().unwrap();

        let buffer = Buffer::from_vec(out);
        let trailer_start = buffer.len() - 10;
        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();

        let schema = fb_to_schema(footer.schema().unwrap());

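        // Decimal128 values are 16-byte i128s, so a file written with only
        // 8-byte alignment can leave a buffer off its required alignment;
        // requiring alignment surfaces that as an error below.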
        let decoder =
            FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);

        let batches = footer.recordBatches().unwrap();

        let block = batches.get(0);
        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
        let data = buffer.slice_with_length(block.offset() as _, block_len);

        let result = decoder.read_record_batch(block, &data);

        let error = result.unwrap_err();
        assert_eq!(
            error.to_string(),
            "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
            offset from expected alignment of 16 by 8"
        );
    }

    #[test]
    fn test_flush() {
        let num_cols = 2;
        let mut fields = Vec::new();
        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
        for i in 0..num_cols {
            let field = Field::new(format!("col_{}", i), DataType::Decimal128(38, 10), true);
            fields.push(field);
        }
        let schema = Schema::new(fields);
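        // Wrap both writers in `BufWriter`s with enough capacity that nothing
        // should reach the inner `Vec` until `flush` is called explicitly.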
        let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
        let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
        let mut stream_writer =
            StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
                .unwrap();
        let mut file_writer =
            FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();

        let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
        let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
        stream_writer.flush().unwrap();
        file_writer.flush().unwrap();
        let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
        let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
        let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
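        // The full stream output should exceed the flushed byte count by exactly
        // the 8-byte end-of-stream marker (continuation bytes plus a zero length)
        // written when the stream is finished, while the file writer should have
        // flushed 8 bytes more because it starts with the "ARROW1" preamble plus
        // padding.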
        let expected_stream_flushed_bytes = stream_out.len() - 8;
        let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;

        assert!(
            stream_bytes_written_on_new < stream_bytes_written_on_flush,
            "this test makes no sense if flush is not actually required"
        );
        assert!(
            file_bytes_written_on_new < file_bytes_written_on_flush,
            "this test makes no sense if flush is not actually required"
        );
        assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
        assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
    }

    #[test]
    fn test_roundtrip_list_of_fixed_list() -> Result<(), ArrowError> {
        let l1_type =
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, false)), 3);
        let l2_type = DataType::List(Arc::new(Field::new("item", l1_type.clone(), false)));

        let l0_builder = Float32Builder::new();
        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3).with_field(Arc::new(Field::new(
            "item",
            DataType::Float32,
            false,
        )));
        let mut l2_builder =
            ListBuilder::new(l1_builder).with_field(Arc::new(Field::new("item", l1_type, false)));

        for point in [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]] {
            l2_builder.values().values().append_value(point[0]);
            l2_builder.values().values().append_value(point[1]);
            l2_builder.values().values().append_value(point[2]);

            l2_builder.values().append(true);
        }
        l2_builder.append(true);

        let point = [10., 11., 12.];
        l2_builder.values().values().append_value(point[0]);
        l2_builder.values().values().append_value(point[1]);
        l2_builder.values().values().append_value(point[2]);

        l2_builder.values().append(true);
        l2_builder.append(true);

        let array = Arc::new(l2_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new("points", l2_type, false)],
            HashMap::default(),
        ));

        test_slices(&array, &schema, 0, 1)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 1)?;

        Ok(())
    }

    #[test]
    fn test_roundtrip_list_of_fixed_list_w_nulls() -> Result<(), ArrowError> {
        let l0_builder = Float32Builder::new();
        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3);
        let mut l2_builder = ListBuilder::new(l1_builder);

        for point in [
            [Some(1.0), Some(2.0), None],
            [Some(4.0), Some(5.0), Some(6.0)],
            [None, Some(8.0), Some(9.0)],
        ] {
            for p in point {
                match p {
                    Some(p) => l2_builder.values().values().append_value(p),
                    None => l2_builder.values().values().append_null(),
                }
            }

            l2_builder.values().append(true);
        }
        l2_builder.append(true);

        let point = [Some(10.), None, None];
        for p in point {
            match p {
                Some(p) => l2_builder.values().values().append_value(p),
                None => l2_builder.values().values().append_null(),
            }
        }

        l2_builder.values().append(true);
        l2_builder.append(true);

        let array = Arc::new(l2_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new(
                "points",
                DataType::List(Arc::new(Field::new(
                    "item",
                    DataType::FixedSizeList(
                        Arc::new(Field::new("item", DataType::Float32, true)),
                        3,
                    ),
                    true,
                ))),
                true,
            )],
            HashMap::default(),
        ));

        test_slices(&array, &schema, 0, 1)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 1)?;

        Ok(())
    }

    fn test_slices(
        parent_array: &ArrayRef,
        schema: &SchemaRef,
        offset: usize,
        length: usize,
    ) -> Result<(), ArrowError> {
        let subarray = parent_array.slice(offset, length);
        let original_batch = RecordBatch::try_new(schema.clone(), vec![subarray])?;

        let mut bytes = Vec::new();
        let mut writer = StreamWriter::try_new(&mut bytes, schema)?;
        writer.write(&original_batch)?;
        writer.finish()?;

        let mut cursor = std::io::Cursor::new(bytes);
        let mut reader = StreamReader::try_new(&mut cursor, None)?;
        let returned_batch = reader.next().unwrap()?;

        assert_eq!(original_batch, returned_batch);

        Ok(())
    }

    #[test]
    fn test_roundtrip_fixed_list() -> Result<(), ArrowError> {
        let int_builder = Int64Builder::new();
        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3)
            .with_field(Arc::new(Field::new("item", DataType::Int64, false)));

        for point in [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]] {
            fixed_list_builder.values().append_value(point[0]);
            fixed_list_builder.values().append_value(point[1]);
            fixed_list_builder.values().append_value(point[2]);

            fixed_list_builder.append(true);
        }

        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new(
                "points",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, false)), 3),
                false,
            )],
            HashMap::default(),
        ));

        test_slices(&array, &schema, 0, 4)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 3)?;
        test_slices(&array, &schema, 2, 1)?;

        Ok(())
    }

    #[test]
    fn test_roundtrip_fixed_list_w_nulls() -> Result<(), ArrowError> {
        let int_builder = Int64Builder::new();
        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3);

        for point in [
            [Some(1), Some(2), None],
            [Some(4), Some(5), Some(6)],
            [None, Some(8), Some(9)],
            [Some(10), None, None],
        ] {
            for p in point {
                match p {
                    Some(p) => fixed_list_builder.values().append_value(p),
                    None => fixed_list_builder.values().append_null(),
                }
            }

            fixed_list_builder.append(true);
        }

        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new(
                "points",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 3),
                true,
            )],
            HashMap::default(),
        ));

        test_slices(&array, &schema, 0, 4)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 3)?;
        test_slices(&array, &schema, 2, 1)?;

        Ok(())
    }

    #[test]
    fn test_metadata_encoding_ordering() {
        fn create_hash() -> u64 {
            let metadata: HashMap<String, String> = [
                ("a", "1"),
                ("b", "2"),
                ("c", "3"),
                ("d", "4"),
                ("e", "5"),
            ]
            .into_iter()
            .map(|(k, v)| (k.to_owned(), v.to_owned()))
            .collect();

            let schema = Arc::new(
                Schema::new(vec![
                    Field::new("a", DataType::Int64, true).with_metadata(metadata.clone())
                ])
                .with_metadata(metadata),
            );
            let batch = RecordBatch::new_empty(schema.clone());

            let mut bytes = Vec::new();
            let mut w = StreamWriter::try_new(&mut bytes, batch.schema_ref()).unwrap();
            w.write(&batch).unwrap();
            w.finish().unwrap();

            let mut h = std::hash::DefaultHasher::new();
            h.write(&bytes);
            h.finish()
        }

        let expected = create_hash();

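        // `HashMap` iteration order varies from run to run, so encode the same
        // schema repeatedly and check the serialized bytes hash identically,
        // i.e. the metadata is written in a deterministic order.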
        let all_passed = (0..20).all(|_| create_hash() == expected);
        assert!(all_passed);
    }
}