1use std::cmp::min;
28use std::collections::HashMap;
29use std::io::{BufWriter, Write};
30use std::mem::size_of;
31use std::sync::Arc;
32
33use flatbuffers::FlatBufferBuilder;
34
35use arrow_array::builder::BufferBuilder;
36use arrow_array::cast::*;
37use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
38use arrow_array::*;
39use arrow_buffer::bit_util;
40use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ToByteSlice};
41use arrow_data::{ArrayData, ArrayDataBuilder, BufferSpec, layout};
42use arrow_schema::*;
43
44use crate::CONTINUATION_MARKER;
45use crate::compression::CompressionCodec;
46pub use crate::compression::CompressionContext;
47use crate::convert::IpcSchemaEncoder;
48
49#[derive(Debug, Clone)]
51pub struct IpcWriteOptions {
52 alignment: u8,
55 write_legacy_ipc_format: bool,
57 metadata_version: crate::MetadataVersion,
66 batch_compression_type: Option<crate::CompressionType>,
69 dictionary_handling: DictionaryHandling,
71}
72
73enum EncodedBuffer {
79 Raw(Buffer),
81 Compressed(Vec<u8>),
83}
84
85impl EncodedBuffer {
86 fn as_slice(&self) -> &[u8] {
87 match self {
88 EncodedBuffer::Raw(b) => b.as_slice(),
89 EncodedBuffer::Compressed(v) => v.as_slice(),
90 }
91 }
92
93 fn len(&self) -> usize {
94 match self {
95 EncodedBuffer::Raw(b) => b.len(),
96 EncodedBuffer::Compressed(v) => v.len(),
97 }
98 }
99}
100#[derive(Default)]
105struct IpcMetadataBuilder {
106 nodes: Vec<crate::FieldNode>,
107 buffers: Vec<crate::Buffer>,
108}
109
110enum IpcBodySink<'a> {
115 Write(&'a mut Vec<u8>),
117 Collect(&'a mut Vec<EncodedBuffer>),
119}
120impl<'a> IpcBodySink<'a> {
121 pub fn write(&mut self, pad_len: usize, buffer: EncodedBuffer) {
123 match self {
124 IpcBodySink::Write(vec) => {
125 vec.extend_from_slice(buffer.as_slice());
126 vec.extend_from_slice(&PADDING[..pad_len]);
127 }
128 IpcBodySink::Collect(vec) => {
129 vec.push(buffer);
130 }
131 }
132 }
133}
134
/// Sizes reported after streaming one record batch (and its dictionaries) to a
/// writer, used by the file writer to build footer blocks.
struct IpcWriteMetadata {
    /// `(header_len, body_len)` of each dictionary message written before the batch.
    dictionary_block_sizes: Vec<(usize, usize)>,
    /// Record batch header length, including the length prefix and padding.
    padded_header_len: usize,
    /// Record batch body length, including trailing padding.
    body_len: usize,
}
148
149impl IpcWriteOptions {
150 pub fn try_with_compression(
155 mut self,
156 batch_compression_type: Option<crate::CompressionType>,
157 ) -> Result<Self, ArrowError> {
158 self.batch_compression_type = batch_compression_type;
159
160 if self.batch_compression_type.is_some()
161 && self.metadata_version < crate::MetadataVersion::V5
162 {
163 return Err(ArrowError::InvalidArgumentError(
164 "Compression only supported in metadata v5 and above".to_string(),
165 ));
166 }
167 Ok(self)
168 }
169 pub fn try_new(
171 alignment: usize,
172 write_legacy_ipc_format: bool,
173 metadata_version: crate::MetadataVersion,
174 ) -> Result<Self, ArrowError> {
175 let is_alignment_valid =
176 alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
177 if !is_alignment_valid {
178 return Err(ArrowError::InvalidArgumentError(
179 "Alignment should be 8, 16, 32, or 64.".to_string(),
180 ));
181 }
182 let alignment: u8 = u8::try_from(alignment).expect("range already checked");
183 match metadata_version {
184 crate::MetadataVersion::V1
185 | crate::MetadataVersion::V2
186 | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
187 "Writing IPC metadata version 3 and lower not supported".to_string(),
188 )),
189 #[allow(deprecated)]
190 crate::MetadataVersion::V4 => Ok(Self {
191 alignment,
192 write_legacy_ipc_format,
193 metadata_version,
194 batch_compression_type: None,
195 dictionary_handling: DictionaryHandling::default(),
196 }),
197 crate::MetadataVersion::V5 => {
198 if write_legacy_ipc_format {
199 Err(ArrowError::InvalidArgumentError(
200 "Legacy IPC format only supported on metadata version 4".to_string(),
201 ))
202 } else {
203 Ok(Self {
204 alignment,
205 write_legacy_ipc_format,
206 metadata_version,
207 batch_compression_type: None,
208 dictionary_handling: DictionaryHandling::default(),
209 })
210 }
211 }
212 z => Err(ArrowError::InvalidArgumentError(format!(
213 "Unsupported crate::MetadataVersion {z:?}"
214 ))),
215 }
216 }
217
218 pub fn with_dictionary_handling(mut self, dictionary_handling: DictionaryHandling) -> Self {
220 self.dictionary_handling = dictionary_handling;
221 self
222 }
223}
224
225impl Default for IpcWriteOptions {
226 fn default() -> Self {
227 Self {
228 alignment: 64,
229 write_legacy_ipc_format: false,
230 metadata_version: crate::MetadataVersion::V5,
231 batch_compression_type: None,
232 dictionary_handling: DictionaryHandling::default(),
233 }
234 }
235}
236
/// Field-less encoder that turns schemas, record batches and dictionaries into
/// Arrow IPC messages; all state lives in the arguments passed to its methods.
#[derive(Default, Debug)]
pub struct IpcDataGenerator {}
272
273impl IpcDataGenerator {
274 pub fn schema_to_bytes_with_dictionary_tracker(
277 &self,
278 schema: &Schema,
279 dictionary_tracker: &mut DictionaryTracker,
280 write_options: &IpcWriteOptions,
281 ) -> EncodedData {
282 let mut fbb = FlatBufferBuilder::new();
283 let schema = {
284 let fb = IpcSchemaEncoder::new()
285 .with_dictionary_tracker(dictionary_tracker)
286 .schema_to_fb_offset(&mut fbb, schema);
287 fb.as_union_value()
288 };
289
290 let mut message = crate::MessageBuilder::new(&mut fbb);
291 message.add_version(write_options.metadata_version);
292 message.add_header_type(crate::MessageHeader::Schema);
293 message.add_bodyLength(0);
294 message.add_header(schema);
295 let data = message.finish();
297 fbb.finish(data, None);
298
299 let data = fbb.finished_data();
300 EncodedData {
301 ipc_message: data.to_vec(),
302 arrow_data: vec![],
303 }
304 }
305
306 fn _encode_dictionaries<I: Iterator<Item = i64>>(
307 &self,
308 column: &ArrayRef,
309 encoded_dictionaries: &mut Vec<EncodedData>,
310 dictionary_tracker: &mut DictionaryTracker,
311 write_options: &IpcWriteOptions,
312 dict_id: &mut I,
313 compression_context: &mut CompressionContext,
314 ) -> Result<(), ArrowError> {
315 match column.data_type() {
316 DataType::Struct(fields) => {
317 let s = as_struct_array(column);
318 for (field, column) in fields.iter().zip(s.columns()) {
319 self.encode_dictionaries(
320 field,
321 column,
322 encoded_dictionaries,
323 dictionary_tracker,
324 write_options,
325 dict_id,
326 compression_context,
327 )?;
328 }
329 }
330 DataType::RunEndEncoded(_, values) => {
331 let data = column.to_data();
332 if data.child_data().len() != 2 {
333 return Err(ArrowError::InvalidArgumentError(format!(
334 "The run encoded array should have exactly two child arrays. Found {}",
335 data.child_data().len()
336 )));
337 }
338 let values_array = make_array(data.child_data()[1].clone());
341 self.encode_dictionaries(
342 values,
343 &values_array,
344 encoded_dictionaries,
345 dictionary_tracker,
346 write_options,
347 dict_id,
348 compression_context,
349 )?;
350 }
351 DataType::List(field) => {
352 let list = as_list_array(column);
353 self.encode_dictionaries(
354 field,
355 list.values(),
356 encoded_dictionaries,
357 dictionary_tracker,
358 write_options,
359 dict_id,
360 compression_context,
361 )?;
362 }
363 DataType::LargeList(field) => {
364 let list = as_large_list_array(column);
365 self.encode_dictionaries(
366 field,
367 list.values(),
368 encoded_dictionaries,
369 dictionary_tracker,
370 write_options,
371 dict_id,
372 compression_context,
373 )?;
374 }
375 DataType::ListView(field) => {
376 let list = column.as_list_view::<i32>();
377 self.encode_dictionaries(
378 field,
379 list.values(),
380 encoded_dictionaries,
381 dictionary_tracker,
382 write_options,
383 dict_id,
384 compression_context,
385 )?;
386 }
387 DataType::LargeListView(field) => {
388 let list = column.as_list_view::<i64>();
389 self.encode_dictionaries(
390 field,
391 list.values(),
392 encoded_dictionaries,
393 dictionary_tracker,
394 write_options,
395 dict_id,
396 compression_context,
397 )?;
398 }
399 DataType::FixedSizeList(field, _) => {
400 let list = column
401 .as_any()
402 .downcast_ref::<FixedSizeListArray>()
403 .expect("Unable to downcast to fixed size list array");
404 self.encode_dictionaries(
405 field,
406 list.values(),
407 encoded_dictionaries,
408 dictionary_tracker,
409 write_options,
410 dict_id,
411 compression_context,
412 )?;
413 }
414 DataType::Map(field, _) => {
415 let map_array = as_map_array(column);
416
417 let (keys, values) = match field.data_type() {
418 DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
419 _ => panic!("Incorrect field data type {:?}", field.data_type()),
420 };
421
422 self.encode_dictionaries(
424 keys,
425 map_array.keys(),
426 encoded_dictionaries,
427 dictionary_tracker,
428 write_options,
429 dict_id,
430 compression_context,
431 )?;
432
433 self.encode_dictionaries(
435 values,
436 map_array.values(),
437 encoded_dictionaries,
438 dictionary_tracker,
439 write_options,
440 dict_id,
441 compression_context,
442 )?;
443 }
444 DataType::Union(fields, _) => {
445 let union = as_union_array(column);
446 for (type_id, field) in fields.iter() {
447 let column = union.child(type_id);
448 self.encode_dictionaries(
449 field,
450 column,
451 encoded_dictionaries,
452 dictionary_tracker,
453 write_options,
454 dict_id,
455 compression_context,
456 )?;
457 }
458 }
459 _ => (),
460 }
461
462 Ok(())
463 }
464
/// Encodes the dictionary backing `column` (if it is dictionary-typed) plus any
/// dictionaries nested inside it, appending messages to `encoded_dictionaries`.
///
/// For a dictionary column, dictionaries nested in its values are handled
/// first. Then the next id is drawn from `dict_id_seq`, and the tracker decides
/// whether nothing, a full dictionary batch, or a delta batch is emitted.
/// Non-dictionary columns are descended into via `_encode_dictionaries`.
///
/// # Errors
/// Returns [`ArrowError::IpcError`] if `dict_id_seq` has no id left for this
/// field. Propagates tracker (e.g. replacement) and encoding errors.
#[allow(clippy::too_many_arguments)]
fn encode_dictionaries<I: Iterator<Item = i64>>(
    &self,
    field: &Field,
    column: &ArrayRef,
    encoded_dictionaries: &mut Vec<EncodedData>,
    dictionary_tracker: &mut DictionaryTracker,
    write_options: &IpcWriteOptions,
    dict_id_seq: &mut I,
    compression_context: &mut CompressionContext,
) -> Result<(), ArrowError> {
    match column.data_type() {
        DataType::Dictionary(_key_type, _value_type) => {
            let dict_data = column.to_data();
            // Child 0 of a dictionary array holds its values.
            let dict_values = &dict_data.child_data()[0];

            let values = make_array(dict_data.child_data()[0].clone());

            // Nested dictionaries inside the values consume ids before this one.
            self._encode_dictionaries(
                &values,
                encoded_dictionaries,
                dictionary_tracker,
                write_options,
                dict_id_seq,
                compression_context,
            )?;

            let dict_id = dict_id_seq.next().ok_or_else(|| {
                ArrowError::IpcError(format!(
                    "no dict id for field {:?}: field.data_type={:?}, column.data_type={:?}",
                    field.name(),
                    field.data_type(),
                    column.data_type()
                ))
            })?;

            match dictionary_tracker.insert_column(
                dict_id,
                column,
                write_options.dictionary_handling,
            )? {
                // Already written and unchanged: nothing to send.
                DictionaryUpdate::None => {}
                // Send the full dictionary values.
                DictionaryUpdate::New | DictionaryUpdate::Replaced => {
                    encoded_dictionaries.push(self.dictionary_batch_to_bytes(
                        dict_id,
                        dict_values,
                        write_options,
                        false,
                        compression_context,
                    )?);
                }
                // Send only the appended values, flagged as a delta.
                DictionaryUpdate::Delta(data) => {
                    encoded_dictionaries.push(self.dictionary_batch_to_bytes(
                        dict_id,
                        &data,
                        write_options,
                        true,
                        compression_context,
                    )?);
                }
            }
        }
        _ => self._encode_dictionaries(
            column,
            encoded_dictionaries,
            dictionary_tracker,
            write_options,
            dict_id_seq,
            compression_context,
        )?,
    }

    Ok(())
}
542
543 pub fn encode(
547 &self,
548 batch: &RecordBatch,
549 dictionary_tracker: &mut DictionaryTracker,
550 write_options: &IpcWriteOptions,
551 compression_context: &mut CompressionContext,
552 ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
553 let encoded_dictionaries = self.encode_all_dicts(
554 batch,
555 dictionary_tracker,
556 write_options,
557 compression_context,
558 )?;
559 let mut arrow_data = Vec::new();
560 let (ipc_message, _, tail_pad) = self.record_batch_to_bytes(
561 batch,
562 write_options,
563 compression_context,
564 &mut IpcBodySink::Write(&mut arrow_data),
565 )?;
566 arrow_data.extend_from_slice(&PADDING[..tail_pad]);
567 Ok((
568 encoded_dictionaries,
569 EncodedData {
570 ipc_message,
571 arrow_data,
572 },
573 ))
574 }
575
576 fn encode_all_dicts(
578 &self,
579 batch: &RecordBatch,
580 dictionary_tracker: &mut DictionaryTracker,
581 write_options: &IpcWriteOptions,
582 compression_context: &mut CompressionContext,
583 ) -> Result<Vec<EncodedData>, ArrowError> {
584 let schema = batch.schema();
585 let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());
586 let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();
587 for (i, field) in schema.fields().iter().enumerate() {
588 self.encode_dictionaries(
589 field,
590 batch.column(i),
591 &mut encoded_dictionaries,
592 dictionary_tracker,
593 write_options,
594 &mut dict_id,
595 compression_context,
596 )?;
597 }
598 Ok(encoded_dictionaries)
599 }
600
/// Encodes `batch` (and any dictionaries it needs) and streams the messages
/// straight to `writer`, returning the sizes needed for file footer blocks.
///
/// Dictionary messages are written first via `write_message`. The record
/// batch body buffers are collected unpadded, rather than copied into one
/// contiguous vector, and written one by one after the header.
///
/// # Errors
/// Propagates encoding errors and any I/O error from `writer`.
fn write<W: Write>(
    &self,
    batch: &RecordBatch,
    dictionary_tracker: &mut DictionaryTracker,
    write_options: &IpcWriteOptions,
    compression_context: &mut CompressionContext,
    writer: &mut W,
) -> Result<IpcWriteMetadata, ArrowError> {
    let encoded_dictionaries = self.encode_all_dicts(
        batch,
        dictionary_tracker,
        write_options,
        compression_context,
    )?;

    // Dictionaries go out ahead of the batch that references them.
    let mut dictionary_block_sizes = Vec::with_capacity(encoded_dictionaries.len());
    for dict in encoded_dictionaries {
        dictionary_block_sizes.push(write_message(&mut *writer, dict, write_options)?);
    }

    // Pre-size the buffer list from a per-column estimate.
    let capacity = batch
        .columns()
        .iter()
        .map(|a| estimate_encoded_buffer_count(a.data_type()))
        .sum();
    let mut encoded_buffers: Vec<EncodedBuffer> = Vec::with_capacity(capacity);
    let (ipc_message, body_len, tail_pad) = self.record_batch_to_bytes(
        batch,
        write_options,
        compression_context,
        &mut IpcBodySink::Collect(&mut encoded_buffers),
    )?;

    // Round prefix + flatbuffer up to the alignment. Valid alignments are
    // powers of two (8..=64), so `& !a` clears the low bits.
    let alignment = write_options.alignment;
    let a = usize::from(alignment - 1);
    let prefix_size = if write_options.write_legacy_ipc_format {
        4
    } else {
        8
    };
    let aligned_size = (ipc_message.len() + prefix_size + a) & !a;
    // Length handed to `write_continuation`: flatbuffer plus its padding,
    // excluding the prefix itself.
    write_continuation(
        &mut *writer,
        write_options,
        (aligned_size - prefix_size) as i32,
    )?;
    writer.write_all(&ipc_message)?;
    writer.write_all(&PADDING[..aligned_size - ipc_message.len() - prefix_size])?;
    // `Collect` stores buffers without padding, so padding is re-derived here.
    // NOTE(review): assumes `write_array_data` advanced offsets by this same
    // per-buffer padding — confirm.
    for enc in &encoded_buffers {
        writer.write_all(enc.as_slice())?;
        writer.write_all(&PADDING[..pad_to_alignment(alignment, enc.len())])?;
    }
    writer.write_all(&PADDING[..tail_pad])?;

    Ok(IpcWriteMetadata {
        dictionary_block_sizes,
        padded_header_len: aligned_size,
        body_len,
    })
}
664
665 #[deprecated(since = "57.0.0", note = "Use `encode` instead")]
669 pub fn encoded_batch(
670 &self,
671 batch: &RecordBatch,
672 dictionary_tracker: &mut DictionaryTracker,
673 write_options: &IpcWriteOptions,
674 ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
675 self.encode(
676 batch,
677 dictionary_tracker,
678 write_options,
679 &mut Default::default(),
680 )
681 }
682
/// Builds the flatbuffer `RecordBatch` message for `batch`, sending every body
/// buffer to `sink`.
///
/// Returns `(message_bytes, body_len, tail_pad)`. `body_len` already includes
/// `tail_pad` trailing padding bytes, which this function does not emit; the
/// caller must append them after the body.
///
/// # Errors
/// Fails if the configured compression type cannot be turned into a codec,
/// or if writing any column fails.
fn record_batch_to_bytes(
    &self,
    batch: &RecordBatch,
    write_options: &IpcWriteOptions,
    compression_context: &mut CompressionContext,
    sink: &mut IpcBodySink<'_>,
) -> Result<(Vec<u8>, usize, usize), ArrowError> {
    let mut fbb = FlatBufferBuilder::new();

    let batch_compression_type = write_options.batch_compression_type;

    // Built up front so its offset can be attached to the RecordBatch table below.
    let compression = batch_compression_type.map(|batch_compression_type| {
        let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
        c.add_method(crate::BodyCompressionMethod::BUFFER);
        c.add_codec(batch_compression_type);
        c.finish()
    });

    let compression_codec: Option<CompressionCodec> =
        batch_compression_type.map(TryInto::try_into).transpose()?;

    let alignment = write_options.alignment;
    let mut variadic_buffer_counts = vec![];
    let mut meta = IpcMetadataBuilder::default();
    // Running byte offset within the message body.
    let mut offset = 0i64;

    for array in batch.columns() {
        let array_data = array.to_data();
        offset = write_array_data(
            &array_data,
            &mut meta,
            sink,
            offset,
            compression_codec,
            compression_context,
            write_options,
        )?;
        append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
    }

    // Pad the whole body to the alignment; the padding bytes are the caller's job.
    let tail_pad = pad_to_alignment(alignment, offset as usize);
    let body_len = offset as usize + tail_pad;

    let buffers = fbb.create_vector(&meta.buffers);
    let nodes = fbb.create_vector(&meta.nodes);
    // Only emit the variadic counts vector when some column has view types.
    let variadic_buffer = if variadic_buffer_counts.is_empty() {
        None
    } else {
        Some(fbb.create_vector(&variadic_buffer_counts))
    };

    let root = {
        let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
        batch_builder.add_length(batch.num_rows() as i64);
        batch_builder.add_nodes(nodes);
        batch_builder.add_buffers(buffers);
        if let Some(c) = compression {
            batch_builder.add_compression(c);
        }
        if let Some(v) = variadic_buffer {
            batch_builder.add_variadicBufferCounts(v);
        }
        batch_builder.finish().as_union_value()
    };
    let mut message = crate::MessageBuilder::new(&mut fbb);
    message.add_version(write_options.metadata_version);
    message.add_header_type(crate::MessageHeader::RecordBatch);
    message.add_bodyLength(body_len as i64);
    message.add_header(root);
    let root = message.finish();
    fbb.finish(root, None);

    Ok((fbb.finished_data().to_vec(), body_len, tail_pad))
}
763
/// Encodes `array_data` (dictionary values) as a `DictionaryBatch` message
/// with id `dict_id`.
///
/// `is_delta` is recorded in the message's `isDelta` flag. Unlike
/// `record_batch_to_bytes`, the returned body already includes its trailing
/// alignment padding.
///
/// # Errors
/// Fails if the configured compression type cannot be turned into a codec,
/// or if writing the values fails.
fn dictionary_batch_to_bytes(
    &self,
    dict_id: i64,
    array_data: &ArrayData,
    write_options: &IpcWriteOptions,
    is_delta: bool,
    compression_context: &mut CompressionContext,
) -> Result<EncodedData, ArrowError> {
    let mut fbb = FlatBufferBuilder::new();

    let mut arrow_data: Vec<u8> = vec![];

    let batch_compression_type = write_options.batch_compression_type;

    // Built up front so its offset can be attached to the RecordBatch table below.
    let compression = batch_compression_type.map(|batch_compression_type| {
        let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
        c.add_method(crate::BodyCompressionMethod::BUFFER);
        c.add_codec(batch_compression_type);
        c.finish()
    });

    let compression_codec: Option<CompressionCodec> = batch_compression_type
        .map(|batch_compression_type| batch_compression_type.try_into())
        .transpose()?;

    let alignment = write_options.alignment;
    let mut meta = IpcMetadataBuilder::default();
    // The values are written, with per-buffer padding, straight into `arrow_data`.
    let mut sink = IpcBodySink::Write(&mut arrow_data);
    let offset = write_array_data(
        array_data,
        &mut meta,
        &mut sink,
        0,
        compression_codec,
        compression_context,
        write_options,
    )?;

    let mut variadic_buffer_counts = vec![];
    append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);

    // Pad the whole body to the alignment and append that padding here.
    let tail_pad = pad_to_alignment(alignment, offset as usize);
    let body_len = offset as usize + tail_pad;
    arrow_data.extend_from_slice(&PADDING[..tail_pad]);

    let buffers = fbb.create_vector(&meta.buffers);
    let nodes = fbb.create_vector(&meta.nodes);
    let variadic_buffer = if variadic_buffer_counts.is_empty() {
        None
    } else {
        Some(fbb.create_vector(&variadic_buffer_counts))
    };

    // The dictionary values are wrapped in a RecordBatch table...
    let root = {
        let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
        batch_builder.add_length(array_data.len() as i64);
        batch_builder.add_nodes(nodes);
        batch_builder.add_buffers(buffers);
        if let Some(c) = compression {
            batch_builder.add_compression(c);
        }
        if let Some(v) = variadic_buffer {
            batch_builder.add_variadicBufferCounts(v);
        }
        batch_builder.finish()
    };

    // ...which is wrapped in a DictionaryBatch carrying the id and delta flag...
    let root = {
        let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb);
        batch_builder.add_id(dict_id);
        batch_builder.add_data(root);
        batch_builder.add_isDelta(is_delta);
        batch_builder.finish().as_union_value()
    };

    // ...which becomes the header of the top-level Message.
    let root = {
        let mut message_builder = crate::MessageBuilder::new(&mut fbb);
        message_builder.add_version(write_options.metadata_version);
        message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
        message_builder.add_bodyLength(body_len as i64);
        message_builder.add_header(root);
        message_builder.finish()
    };

    fbb.finish(root, None);
    let finished_data = fbb.finished_data();

    Ok(EncodedData {
        ipc_message: finished_data.to_vec(),
        arrow_data,
    })
}
861}
862
863fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
864 match array.data_type() {
865 DataType::BinaryView | DataType::Utf8View => {
866 counts.push(array.buffers().len() as i64 - 1);
869 }
870 DataType::Dictionary(_, _) => {
871 }
874 _ => {
875 for child in array.child_data() {
876 append_variadic_buffer_counts(counts, child)
877 }
878 }
879 }
880}
881
882pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
883 match arr.data_type() {
884 DataType::RunEndEncoded(k, _) => match k.data_type() {
885 DataType::Int16 => {
886 Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
887 }
888 DataType::Int32 => {
889 Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
890 }
891 DataType::Int64 => {
892 Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
893 }
894 d => unreachable!("Unexpected data type {d}"),
895 },
896 d => Err(ArrowError::InvalidArgumentError(format!(
897 "The given array is not a run array. Data type of given array: {d}"
898 ))),
899 }
900}
901
/// Rebuilds a (possibly sliced) run array so its run ends start at logical
/// offset zero and cover exactly `run_array.len()` elements.
///
/// The input is returned unchanged when it is not sliced. Otherwise only the
/// physical runs from `get_start_physical_index()` to
/// `get_end_physical_index()` (inclusive) are kept:
/// - the run ends are shifted down by the slice offset;
/// - the last run end is set to the array length;
/// - the values are sliced to the same physical range.
fn into_zero_offset_run_array<R: RunEndIndexType>(
    run_array: RunArray<R>,
) -> Result<RunArray<R>, ArrowError> {
    let run_ends = run_array.run_ends();
    // Already unsliced: no offset, and (presumably) the last run end equals the length.
    if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
        return Ok(run_array);
    }

    // First physical run overlapping the slice.
    let start_physical_index = run_ends.get_start_physical_index();

    // Last physical run overlapping the slice (inclusive, hence the `+ 1` below).
    let end_physical_index = run_ends.get_end_physical_index();

    let physical_length = end_physical_index - start_physical_index + 1;

    // Shift every run end except the last one down by the logical slice offset.
    let offset = R::Native::usize_as(run_ends.offset());
    let mut builder = BufferBuilder::<R::Native>::new(physical_length);
    for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
        builder.append(run_end_value.sub_wrapping(offset));
    }
    // The final run is cut off exactly at the sliced length.
    builder.append(R::Native::from_usize(run_array.len()).unwrap());
    // SAFETY: relies on the physical indices bracketing the runs that overlap
    // the slice, so the shifted run ends are positive, strictly increasing and
    // end at `run_array.len()`, with `physical_length` entries of `R::DATA_TYPE`.
    // NOTE(review): invariant taken from the index helpers, not re-checked here.
    let new_run_ends = unsafe {
        ArrayDataBuilder::new(R::DATA_TYPE)
            .len(physical_length)
            .add_buffer(builder.finish())
            .build_unchecked()
    };

    // Keep only the values of the retained physical runs.
    let new_values = run_array
        .values()
        .slice(start_physical_index, physical_length)
        .into_data();

    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
        .len(run_array.len())
        .add_child_data(new_run_ends)
        .add_child_data(new_values);
    // SAFETY: run ends and values were built above with matching physical
    // lengths and the original run array's data type; validation is skipped.
    let array_data = unsafe {
        builder.build_unchecked()
    };
    Ok(array_data.into())
}
953
/// Strategy for emitting a dictionary whose values changed since it was last
/// written.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum DictionaryHandling {
    /// Resend the complete dictionary whenever it changes (the default).
    #[default]
    Resend,
    /// When the new dictionary only appends values to the previous one, send
    /// just the appended values as a delta dictionary batch.
    Delta,
}
967
968#[derive(Debug, Clone)]
970pub enum DictionaryUpdate {
971 None,
974 New,
976 Replaced,
978 Delta(ArrayData),
980}
981
982#[derive(Debug)]
988pub struct DictionaryTracker {
989 written: HashMap<i64, ArrayData>,
991 dict_ids: Vec<i64>,
992 error_on_replacement: bool,
993}
994
995impl DictionaryTracker {
996 pub fn new(error_on_replacement: bool) -> Self {
1002 #[allow(deprecated)]
1003 Self {
1004 written: HashMap::new(),
1005 dict_ids: Vec::new(),
1006 error_on_replacement,
1007 }
1008 }
1009
1010 pub fn next_dict_id(&mut self) -> i64 {
1012 let next = self
1013 .dict_ids
1014 .last()
1015 .copied()
1016 .map(|i| i + 1)
1017 .unwrap_or_default();
1018
1019 self.dict_ids.push(next);
1020 next
1021 }
1022
1023 pub fn dict_id(&mut self) -> &[i64] {
1026 &self.dict_ids
1027 }
1028
1029 #[deprecated(since = "56.1.0", note = "Use `insert_column` instead")]
1039 pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
1040 let dict_data = column.to_data();
1041 let dict_values = &dict_data.child_data()[0];
1042
1043 if let Some(last) = self.written.get(&dict_id) {
1045 if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
1046 return Ok(false);
1048 }
1049 if self.error_on_replacement {
1050 if last.child_data()[0] == *dict_values {
1052 return Ok(false);
1054 }
1055 return Err(ArrowError::InvalidArgumentError(
1056 "Dictionary replacement detected when writing IPC file format. \
1057 Arrow IPC files only support a single dictionary for a given field \
1058 across all batches."
1059 .to_string(),
1060 ));
1061 }
1062 }
1063
1064 self.written.insert(dict_id, dict_data);
1065 Ok(true)
1066 }
1067
1068 pub fn insert_column(
1084 &mut self,
1085 dict_id: i64,
1086 column: &ArrayRef,
1087 dict_handling: DictionaryHandling,
1088 ) -> Result<DictionaryUpdate, ArrowError> {
1089 let new_data = column.to_data();
1090 let new_values = &new_data.child_data()[0];
1091
1092 let Some(old) = self.written.get(&dict_id) else {
1094 self.written.insert(dict_id, new_data);
1095 return Ok(DictionaryUpdate::New);
1096 };
1097
1098 let old_values = &old.child_data()[0];
1101 if ArrayData::ptr_eq(old_values, new_values) {
1102 return Ok(DictionaryUpdate::None);
1103 }
1104
1105 let comparison = compare_dictionaries(old_values, new_values);
1107 if matches!(comparison, DictionaryComparison::Equal) {
1108 return Ok(DictionaryUpdate::None);
1109 }
1110
1111 const REPLACEMENT_ERROR: &str = "Dictionary replacement detected when writing IPC file format. \
1112 Arrow IPC files only support a single dictionary for a given field \
1113 across all batches.";
1114
1115 match comparison {
1116 DictionaryComparison::NotEqual => {
1117 if self.error_on_replacement {
1118 return Err(ArrowError::InvalidArgumentError(
1119 REPLACEMENT_ERROR.to_string(),
1120 ));
1121 }
1122
1123 self.written.insert(dict_id, new_data);
1124 Ok(DictionaryUpdate::Replaced)
1125 }
1126 DictionaryComparison::Delta => match dict_handling {
1127 DictionaryHandling::Resend => {
1128 if self.error_on_replacement {
1129 return Err(ArrowError::InvalidArgumentError(
1130 REPLACEMENT_ERROR.to_string(),
1131 ));
1132 }
1133
1134 self.written.insert(dict_id, new_data);
1135 Ok(DictionaryUpdate::Replaced)
1136 }
1137 DictionaryHandling::Delta => {
1138 let delta =
1139 new_values.slice(old_values.len(), new_values.len() - old_values.len());
1140 self.written.insert(dict_id, new_data);
1141 Ok(DictionaryUpdate::Delta(delta))
1142 }
1143 },
1144 DictionaryComparison::Equal => unreachable!("Already checked equal case"),
1145 }
1146 }
1147
1148 pub fn clear(&mut self) {
1154 self.dict_ids.clear();
1155 self.written.clear();
1156 }
1157}
1158
/// How a new set of dictionary values relates to the previously written one.
#[derive(Clone, Debug)]
enum DictionaryComparison {
    /// Neither equal nor an extension of the old values.
    NotEqual,
    /// Same length and logically equal.
    Equal,
    /// Longer, and begins with exactly the old values.
    Delta,
}
1170
1171fn compare_dictionaries(old: &ArrayData, new: &ArrayData) -> DictionaryComparison {
1173 let existing_len = old.len();
1175 let new_len = new.len();
1176 if existing_len == new_len {
1177 if *old == *new {
1178 return DictionaryComparison::Equal;
1179 } else {
1180 return DictionaryComparison::NotEqual;
1181 }
1182 }
1183
1184 if new_len < existing_len {
1186 return DictionaryComparison::NotEqual;
1187 }
1188
1189 if new.slice(0, existing_len) == *old {
1191 return DictionaryComparison::Delta;
1192 }
1193
1194 DictionaryComparison::NotEqual
1195}
1196
1197pub struct FileWriter<W> {
1220 writer: W,
1222 write_options: IpcWriteOptions,
1224 schema: SchemaRef,
1226 block_offsets: usize,
1228 dictionary_blocks: Vec<crate::Block>,
1230 record_blocks: Vec<crate::Block>,
1232 finished: bool,
1234 dictionary_tracker: DictionaryTracker,
1236 custom_metadata: HashMap<String, String>,
1238
1239 data_gen: IpcDataGenerator,
1240
1241 compression_context: CompressionContext,
1242}
1243
1244impl<W: Write> FileWriter<BufWriter<W>> {
1245 pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1249 Self::try_new(BufWriter::new(writer), schema)
1250 }
1251}
1252
1253impl<W: Write> FileWriter<W> {
1254 pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1262 let write_options = IpcWriteOptions::default();
1263 Self::try_new_with_options(writer, schema, write_options)
1264 }
1265
1266 pub fn try_new_with_options(
1274 mut writer: W,
1275 schema: &Schema,
1276 write_options: IpcWriteOptions,
1277 ) -> Result<Self, ArrowError> {
1278 let data_gen = IpcDataGenerator::default();
1279 let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
1281 let header_size = super::ARROW_MAGIC.len() + pad_len;
1282 writer.write_all(&super::ARROW_MAGIC)?;
1283 writer.write_all(&PADDING[..pad_len])?;
1284 let mut dictionary_tracker = DictionaryTracker::new(true);
1286 let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1287 schema,
1288 &mut dictionary_tracker,
1289 &write_options,
1290 );
1291 let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
1292 Ok(Self {
1293 writer,
1294 write_options,
1295 schema: Arc::new(schema.clone()),
1296 block_offsets: meta + data + header_size,
1297 dictionary_blocks: vec![],
1298 record_blocks: vec![],
1299 finished: false,
1300 dictionary_tracker,
1301 custom_metadata: HashMap::new(),
1302 data_gen,
1303 compression_context: CompressionContext::default(),
1304 })
1305 }
1306
1307 pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
1309 self.custom_metadata.insert(key.into(), value.into());
1310 }
1311
1312 pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1314 if self.finished {
1315 return Err(ArrowError::IpcError(
1316 "Cannot write record batch to file writer as it is closed".to_string(),
1317 ));
1318 }
1319
1320 let meta = self.data_gen.write(
1321 batch,
1322 &mut self.dictionary_tracker,
1323 &self.write_options,
1324 &mut self.compression_context,
1325 &mut self.writer,
1326 )?;
1327
1328 for (header_len, body_len) in meta.dictionary_block_sizes {
1329 let block = crate::Block::new(
1330 self.block_offsets as i64,
1331 header_len as i32,
1332 body_len as i64,
1333 );
1334 self.dictionary_blocks.push(block);
1335 self.block_offsets += header_len + body_len;
1336 }
1337
1338 let block = crate::Block::new(
1340 self.block_offsets as i64,
1341 meta.padded_header_len as i32,
1342 meta.body_len as i64,
1343 );
1344 self.record_blocks.push(block);
1345 self.block_offsets += meta.padded_header_len + meta.body_len;
1346 Ok(())
1347 }
1348
/// Writes the zero-length end marker and the file footer, then flushes.
///
/// The footer records the schema, the dictionary and record batch blocks
/// collected by `write`, and any custom metadata. It is followed by its own
/// length and the trailing magic bytes. Afterwards the writer is marked
/// finished, and further `write` or `finish` calls return errors.
///
/// # Errors
/// Returns [`ArrowError::IpcError`] if already finished, and propagates I/O errors.
pub fn finish(&mut self) -> Result<(), ArrowError> {
    if self.finished {
        return Err(ArrowError::IpcError(
            "Cannot write footer to file writer as it is closed".to_string(),
        ));
    }

    // Zero-length message prefix, presumably the end-of-stream marker.
    write_continuation(&mut self.writer, &self.write_options, 0)?;

    let mut fbb = FlatBufferBuilder::new();
    let dictionaries = fbb.create_vector(&self.dictionary_blocks);
    let record_batches = fbb.create_vector(&self.record_blocks);

    // All dictionaries are already written; reset the tracker before re-encoding
    // the schema (presumably so ids are assigned as in the header schema).
    self.dictionary_tracker.clear();
    let schema = IpcSchemaEncoder::new()
        .with_dictionary_tracker(&mut self.dictionary_tracker)
        .schema_to_fb_offset(&mut fbb, &self.schema);
    // Custom metadata is only emitted when at least one entry exists.
    let fb_custom_metadata = (!self.custom_metadata.is_empty())
        .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));

    let root = {
        let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
        footer_builder.add_version(self.write_options.metadata_version);
        footer_builder.add_schema(schema);
        footer_builder.add_dictionaries(dictionaries);
        footer_builder.add_recordBatches(record_batches);
        if let Some(fb_custom_metadata) = fb_custom_metadata {
            footer_builder.add_custom_metadata(fb_custom_metadata);
        }
        footer_builder.finish()
    };
    fbb.finish(root, None);
    let footer_data = fbb.finished_data();
    // Layout: footer flatbuffer, its little-endian i32 length, then the magic.
    self.writer.write_all(footer_data)?;
    self.writer
        .write_all(&(footer_data.len() as i32).to_le_bytes())?;
    self.writer.write_all(&super::ARROW_MAGIC)?;
    self.writer.flush()?;
    self.finished = true;

    Ok(())
}
1394
1395 pub fn schema(&self) -> &SchemaRef {
1397 &self.schema
1398 }
1399
1400 pub fn get_ref(&self) -> &W {
1402 &self.writer
1403 }
1404
1405 pub fn get_mut(&mut self) -> &mut W {
1409 &mut self.writer
1410 }
1411
1412 pub fn flush(&mut self) -> Result<(), ArrowError> {
1416 self.writer.flush()?;
1417 Ok(())
1418 }
1419
1420 pub fn into_inner(mut self) -> Result<W, ArrowError> {
1429 if !self.finished {
1430 self.finish()?;
1432 }
1433 Ok(self.writer)
1434 }
1435}
1436
1437impl<W: Write> RecordBatchWriter for FileWriter<W> {
1438 fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1439 self.write(batch)
1440 }
1441
1442 fn close(mut self) -> Result<(), ArrowError> {
1443 self.finish()
1444 }
1445}
1446
/// Writes record batches to `W` in the Arrow IPC streaming format.
///
/// The schema message is written when the writer is constructed. Each `write`
/// appends one encoded batch, and `finish` appends the end-of-stream marker.
pub struct StreamWriter<W> {
    /// Destination for every encoded IPC message.
    writer: W,
    /// Settings for alignment, metadata version, legacy framing, compression
    /// and dictionary handling.
    write_options: IpcWriteOptions,
    /// Set by `finish`; once true, `write` and `finish` return errors.
    finished: bool,
    /// Dictionary state threaded through schema encoding and every batch encode.
    dictionary_tracker: DictionaryTracker,

    /// Encodes the schema and record batches into IPC messages.
    data_gen: IpcDataGenerator,

    /// Compression state passed to every batch encode. Presumably kept here so
    /// codec state is reused across batches (TODO confirm against
    /// `CompressionContext`).
    compression_context: CompressionContext,
}
1534
1535impl<W: Write> StreamWriter<BufWriter<W>> {
1536 pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1540 Self::try_new(BufWriter::new(writer), schema)
1541 }
1542}
1543
1544impl<W: Write> StreamWriter<W> {
1545 pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1553 let write_options = IpcWriteOptions::default();
1554 Self::try_new_with_options(writer, schema, write_options)
1555 }
1556
1557 pub fn try_new_with_options(
1563 mut writer: W,
1564 schema: &Schema,
1565 write_options: IpcWriteOptions,
1566 ) -> Result<Self, ArrowError> {
1567 let data_gen = IpcDataGenerator::default();
1568 let mut dictionary_tracker = DictionaryTracker::new(false);
1569
1570 let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1572 schema,
1573 &mut dictionary_tracker,
1574 &write_options,
1575 );
1576 write_message(&mut writer, encoded_message, &write_options)?;
1577 Ok(Self {
1578 writer,
1579 write_options,
1580 finished: false,
1581 dictionary_tracker,
1582 data_gen,
1583 compression_context: CompressionContext::default(),
1584 })
1585 }
1586
1587 pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1589 if self.finished {
1590 return Err(ArrowError::IpcError(
1591 "Cannot write record batch to stream writer as it is closed".to_string(),
1592 ));
1593 }
1594
1595 self.data_gen.write(
1596 batch,
1597 &mut self.dictionary_tracker,
1598 &self.write_options,
1599 &mut self.compression_context,
1600 &mut self.writer,
1601 )?;
1602 Ok(())
1603 }
1604
1605 pub fn finish(&mut self) -> Result<(), ArrowError> {
1607 if self.finished {
1608 return Err(ArrowError::IpcError(
1609 "Cannot write footer to stream writer as it is closed".to_string(),
1610 ));
1611 }
1612
1613 write_continuation(&mut self.writer, &self.write_options, 0)?;
1614 self.writer.flush()?;
1615
1616 self.finished = true;
1617
1618 Ok(())
1619 }
1620
1621 pub fn get_ref(&self) -> &W {
1623 &self.writer
1624 }
1625
1626 pub fn get_mut(&mut self) -> &mut W {
1630 &mut self.writer
1631 }
1632
1633 pub fn flush(&mut self) -> Result<(), ArrowError> {
1637 self.writer.flush()?;
1638 Ok(())
1639 }
1640
1641 pub fn into_inner(mut self) -> Result<W, ArrowError> {
1679 if !self.finished {
1680 self.finish()?;
1682 }
1683 Ok(self.writer)
1684 }
1685}
1686
1687impl<W: Write> RecordBatchWriter for StreamWriter<W> {
1688 fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1689 self.write(batch)
1690 }
1691
1692 fn close(mut self) -> Result<(), ArrowError> {
1693 self.finish()
1694 }
1695}
1696
/// One encoded IPC message: the flatbuffer header plus its body bytes.
///
/// `write_message` writes `ipc_message` (padded to the configured alignment)
/// followed by `arrow_data`. It rejects a body whose length is not a multiple
/// of that alignment.
pub struct EncodedData {
    /// The encoded flatbuffer message header.
    pub ipc_message: Vec<u8>,
    /// The message body written after the header.
    pub arrow_data: Vec<u8>,
}
1704pub fn write_message<W: Write>(
1706 mut writer: W,
1707 encoded: EncodedData,
1708 write_options: &IpcWriteOptions,
1709) -> Result<(usize, usize), ArrowError> {
1710 let arrow_data_len = encoded.arrow_data.len();
1711 if arrow_data_len % usize::from(write_options.alignment) != 0 {
1712 return Err(ArrowError::MemoryError(
1713 "Arrow data not aligned".to_string(),
1714 ));
1715 }
1716
1717 let a = usize::from(write_options.alignment - 1);
1718 let buffer = encoded.ipc_message;
1719 let flatbuf_size = buffer.len();
1720 let prefix_size = if write_options.write_legacy_ipc_format {
1721 4
1722 } else {
1723 8
1724 };
1725 let aligned_size = (flatbuf_size + prefix_size + a) & !a;
1726 let padding_bytes = aligned_size - flatbuf_size - prefix_size;
1727
1728 write_continuation(
1729 &mut writer,
1730 write_options,
1731 (aligned_size - prefix_size) as i32,
1732 )?;
1733
1734 if flatbuf_size > 0 {
1736 writer.write_all(&buffer)?;
1737 }
1738 writer.write_all(&PADDING[..padding_bytes])?;
1740
1741 let body_len = if arrow_data_len > 0 {
1743 write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
1744 } else {
1745 0
1746 };
1747
1748 Ok((aligned_size, body_len))
1749}
1750
1751fn write_body_buffers<W: Write>(
1752 mut writer: W,
1753 data: &[u8],
1754 alignment: u8,
1755) -> Result<usize, ArrowError> {
1756 let len = data.len();
1757 let pad_len = pad_to_alignment(alignment, len);
1758 let total_len = len + pad_len;
1759
1760 writer.write_all(data)?;
1762 if pad_len > 0 {
1763 writer.write_all(&PADDING[..pad_len])?;
1764 }
1765
1766 Ok(total_len)
1767}
1768
1769fn write_continuation<W: Write>(
1772 mut writer: W,
1773 write_options: &IpcWriteOptions,
1774 total_len: i32,
1775) -> Result<usize, ArrowError> {
1776 let mut written = 8;
1777
1778 match write_options.metadata_version {
1780 crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
1781 unreachable!("Options with the metadata version cannot be created")
1782 }
1783 crate::MetadataVersion::V4 => {
1784 if !write_options.write_legacy_ipc_format {
1785 writer.write_all(&CONTINUATION_MARKER)?;
1787 written = 4;
1788 }
1789 writer.write_all(&total_len.to_le_bytes()[..])?;
1790 }
1791 crate::MetadataVersion::V5 => {
1792 writer.write_all(&CONTINUATION_MARKER)?;
1794 writer.write_all(&total_len.to_le_bytes()[..])?;
1795 }
1796 z => panic!("Unsupported crate::MetadataVersion {z:?}"),
1797 };
1798
1799 Ok(written)
1800}
1801
1802fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
1806 if write_options.metadata_version < crate::MetadataVersion::V5 {
1807 !matches!(data_type, DataType::Null)
1808 } else {
1809 !matches!(
1810 data_type,
1811 DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
1812 )
1813 }
1814}
1815
1816#[inline]
1818fn buffer_need_truncate(
1819 array_offset: usize,
1820 buffer: &Buffer,
1821 spec: &BufferSpec,
1822 min_length: usize,
1823) -> bool {
1824 spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
1825}
1826
1827#[inline]
1829fn get_buffer_element_width(spec: &BufferSpec) -> usize {
1830 match spec {
1831 BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
1832 _ => 0,
1833 }
1834}
1835
1836fn reencode_offsets<O: OffsetSizeTrait>(
1839 offsets: &Buffer,
1840 data: &ArrayData,
1841) -> (Buffer, usize, usize) {
1842 let offsets_slice: &[O] = offsets.typed_data::<O>();
1843 let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];
1844
1845 let start_offset = offset_slice.first().unwrap();
1846 let end_offset = offset_slice.last().unwrap();
1847
1848 let offsets = match start_offset.as_usize() {
1849 0 => {
1850 let size = size_of::<O>();
1851 offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
1852 }
1853 _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
1854 };
1855
1856 let start_offset = start_offset.as_usize();
1857 let end_offset = end_offset.as_usize();
1858
1859 (offsets, start_offset, end_offset - start_offset)
1860}
1861
1862fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
1868 if data.is_empty() {
1869 let mut offsets = MutableBuffer::new(size_of::<O>());
1872 offsets.extend_from_slice(O::usize_as(0).to_byte_slice());
1873 return (offsets.into(), MutableBuffer::new(0).into());
1874 }
1875
1876 let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1877 let values = data.buffers()[1].slice_with_length(original_start_offset, len);
1878 (offsets, values)
1879}
1880
1881fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
1884 if data.is_empty() {
1885 let mut offsets = MutableBuffer::new(size_of::<O>());
1888 offsets.extend_from_slice(O::usize_as(0).to_byte_slice());
1889 return (offsets.into(), data.child_data()[0].slice(0, 0));
1890 }
1891
1892 let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1893 let child_data = data.child_data()[0].slice(original_start_offset, len);
1894 (offsets, child_data)
1895}
1896
1897fn get_list_view_array_buffers<O: OffsetSizeTrait>(
1903 data: &ArrayData,
1904) -> (Buffer, Buffer, ArrayData) {
1905 if data.is_empty() {
1906 return (
1907 MutableBuffer::new(0).into(),
1908 MutableBuffer::new(0).into(),
1909 data.child_data()[0].slice(0, 0),
1910 );
1911 }
1912
1913 let offsets = &data.buffers()[0];
1914 let sizes = &data.buffers()[1];
1915
1916 let element_size = std::mem::size_of::<O>();
1917 let offsets_slice =
1918 offsets.slice_with_length(data.offset() * element_size, data.len() * element_size);
1919 let sizes_slice =
1920 sizes.slice_with_length(data.offset() * element_size, data.len() * element_size);
1921
1922 let child_data = data.child_data()[0].clone();
1923
1924 (offsets_slice, sizes_slice, child_data)
1925}
1926
1927fn get_or_truncate_buffer(array_data: &ArrayData) -> Buffer {
1934 let buffer = &array_data.buffers()[0];
1935 let layout = layout(array_data.data_type());
1936 let spec = &layout.buffers[0];
1937
1938 let byte_width = get_buffer_element_width(spec);
1939 let min_length = array_data.len() * byte_width;
1940 if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
1941 let byte_offset = array_data.offset() * byte_width;
1942 let buffer_length = min(min_length, buffer.len() - byte_offset);
1943 buffer.slice_with_length(byte_offset, buffer_length)
1944 } else {
1945 buffer.clone()
1946 }
1947}
1948
1949fn write_array_data(
1955 array_data: &ArrayData,
1956 meta: &mut IpcMetadataBuilder,
1957 sink: &mut IpcBodySink<'_>,
1958 offset: i64,
1959 compression_codec: Option<CompressionCodec>,
1960 compression_context: &mut CompressionContext,
1961 write_options: &IpcWriteOptions,
1962) -> Result<i64, ArrowError> {
1963 let mut offset = offset;
1964 let num_rows = array_data.len();
1965 if !matches!(array_data.data_type(), DataType::Null) {
1966 meta.nodes.push(crate::FieldNode::new(
1967 num_rows as i64,
1968 array_data.null_count() as i64,
1969 ));
1970 } else {
1971 meta.nodes
1973 .push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
1974 }
1975 if has_validity_bitmap(array_data.data_type(), write_options) {
1976 let null_buffer = match array_data.nulls() {
1978 None => {
1979 let num_bytes = bit_util::ceil(num_rows, 8);
1981 let buffer = MutableBuffer::new(num_bytes);
1982 let buffer = buffer.with_bitset(num_bytes, true);
1983 buffer.into()
1984 }
1985 Some(buffer) => buffer.inner().sliced(),
1986 };
1987
1988 offset = encode_sink_buffer(
1989 null_buffer,
1990 meta,
1991 sink,
1992 offset,
1993 compression_codec,
1994 compression_context,
1995 write_options.alignment,
1996 )?;
1997 }
1998
1999 let data_type = array_data.data_type();
2000 if matches!(data_type, DataType::Binary | DataType::Utf8) {
2001 let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
2002 for buffer in [offsets, values] {
2003 offset = encode_sink_buffer(
2004 buffer,
2005 meta,
2006 sink,
2007 offset,
2008 compression_codec,
2009 compression_context,
2010 write_options.alignment,
2011 )?;
2012 }
2013 } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
2014 let views = get_or_truncate_buffer(array_data);
2021 offset = encode_sink_buffer(
2022 views,
2023 meta,
2024 sink,
2025 offset,
2026 compression_codec,
2027 compression_context,
2028 write_options.alignment,
2029 )?;
2030
2031 for buffer in array_data.buffers().iter().skip(1) {
2032 offset = encode_sink_buffer(
2033 buffer.clone(),
2034 meta,
2035 sink,
2036 offset,
2037 compression_codec,
2038 compression_context,
2039 write_options.alignment,
2040 )?;
2041 }
2042 } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
2043 let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
2044 for buffer in [offsets, values] {
2045 offset = encode_sink_buffer(
2046 buffer,
2047 meta,
2048 sink,
2049 offset,
2050 compression_codec,
2051 compression_context,
2052 write_options.alignment,
2053 )?;
2054 }
2055 } else if DataType::is_numeric(data_type)
2056 || DataType::is_temporal(data_type)
2057 || matches!(
2058 array_data.data_type(),
2059 DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
2060 )
2061 {
2062 assert_eq!(array_data.buffers().len(), 1);
2064
2065 let buffer = get_or_truncate_buffer(array_data);
2066 offset = encode_sink_buffer(
2067 buffer,
2068 meta,
2069 sink,
2070 offset,
2071 compression_codec,
2072 compression_context,
2073 write_options.alignment,
2074 )?;
2075 } else if matches!(data_type, DataType::Boolean) {
2076 assert_eq!(array_data.buffers().len(), 1);
2079
2080 let buffer = &array_data.buffers()[0];
2081 let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
2082 offset = encode_sink_buffer(
2083 buffer,
2084 meta,
2085 sink,
2086 offset,
2087 compression_codec,
2088 compression_context,
2089 write_options.alignment,
2090 )?;
2091 } else if matches!(
2092 data_type,
2093 DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
2094 ) {
2095 assert_eq!(array_data.buffers().len(), 1);
2096 assert_eq!(array_data.child_data().len(), 1);
2097
2098 let (offsets, sliced_child_data) = match data_type {
2100 DataType::List(_) => get_list_array_buffers::<i32>(array_data),
2101 DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
2102 DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
2103 _ => unreachable!(),
2104 };
2105 offset = encode_sink_buffer(
2106 offsets,
2107 meta,
2108 sink,
2109 offset,
2110 compression_codec,
2111 compression_context,
2112 write_options.alignment,
2113 )?;
2114 offset = write_array_data(
2115 &sliced_child_data,
2116 meta,
2117 sink,
2118 offset,
2119 compression_codec,
2120 compression_context,
2121 write_options,
2122 )?;
2123 return Ok(offset);
2124 } else if matches!(
2125 data_type,
2126 DataType::ListView(_) | DataType::LargeListView(_)
2127 ) {
2128 assert_eq!(array_data.buffers().len(), 2); assert_eq!(array_data.child_data().len(), 1);
2130
2131 let (offsets, sizes, child_data) = match data_type {
2132 DataType::ListView(_) => get_list_view_array_buffers::<i32>(array_data),
2133 DataType::LargeListView(_) => get_list_view_array_buffers::<i64>(array_data),
2134 _ => unreachable!(),
2135 };
2136
2137 offset = encode_sink_buffer(
2138 offsets,
2139 meta,
2140 sink,
2141 offset,
2142 compression_codec,
2143 compression_context,
2144 write_options.alignment,
2145 )?;
2146 offset = encode_sink_buffer(
2147 sizes,
2148 meta,
2149 sink,
2150 offset,
2151 compression_codec,
2152 compression_context,
2153 write_options.alignment,
2154 )?;
2155
2156 offset = write_array_data(
2157 &child_data,
2158 meta,
2159 sink,
2160 offset,
2161 compression_codec,
2162 compression_context,
2163 write_options,
2164 )?;
2165 return Ok(offset);
2166 } else if let DataType::FixedSizeList(_, fixed_size) = data_type {
2167 assert_eq!(array_data.child_data().len(), 1);
2168 let fixed_size = *fixed_size as usize;
2169
2170 let child_offset = array_data.offset() * fixed_size;
2171 let child_length = array_data.len() * fixed_size;
2172 let child_data = array_data.child_data()[0].slice(child_offset, child_length);
2173
2174 offset = write_array_data(
2175 &child_data,
2176 meta,
2177 sink,
2178 offset,
2179 compression_codec,
2180 compression_context,
2181 write_options,
2182 )?;
2183 return Ok(offset);
2184 } else {
2185 for buffer in array_data.buffers() {
2186 offset = encode_sink_buffer(
2187 buffer.clone(),
2188 meta,
2189 sink,
2190 offset,
2191 compression_codec,
2192 compression_context,
2193 write_options.alignment,
2194 )?;
2195 }
2196 }
2197
2198 match array_data.data_type() {
2199 DataType::Dictionary(_, _) => {}
2200 DataType::RunEndEncoded(_, _) => {
2201 let arr = unslice_run_array(array_data.clone())?;
2203 for data_ref in arr.child_data() {
2205 offset = write_array_data(
2207 data_ref,
2208 meta,
2209 sink,
2210 offset,
2211 compression_codec,
2212 compression_context,
2213 write_options,
2214 )?;
2215 }
2216 }
2217 _ => {
2218 for data_ref in array_data.child_data() {
2220 offset = write_array_data(
2222 data_ref,
2223 meta,
2224 sink,
2225 offset,
2226 compression_codec,
2227 compression_context,
2228 write_options,
2229 )?;
2230 }
2231 }
2232 }
2233 Ok(offset)
2234}
2235
2236fn encode_sink_buffer(
2250 buffer: Buffer,
2251 ipc_meta_data: &mut IpcMetadataBuilder,
2252 sink: &mut IpcBodySink<'_>,
2253 offset: i64,
2254 compression_codec: Option<CompressionCodec>,
2255 compression_context: &mut CompressionContext,
2256 alignment: u8,
2257) -> Result<i64, ArrowError> {
2258 let (encoded, len) = match compression_codec {
2259 None => {
2260 let len = buffer.len() as i64;
2261 (EncodedBuffer::Raw(buffer), len)
2262 }
2263 Some(codec) => {
2264 let mut scratch = Vec::new();
2265 let written =
2266 codec.compress_to_vec(buffer.as_slice(), &mut scratch, compression_context)?;
2267 let len = i64::try_from(written)
2268 .map_err(|e| ArrowError::InvalidArgumentError(format!("{e}")))?;
2269 (EncodedBuffer::Compressed(scratch), len)
2270 }
2271 };
2272
2273 let pad_len = pad_to_alignment(alignment, len as usize);
2274 sink.write(pad_len, encoded);
2275 ipc_meta_data.buffers.push(crate::Buffer::new(offset, len));
2276 Ok(offset + len + pad_len as i64)
2277}
2278
/// Zero bytes sliced to pad message headers and body buffers to the IPC alignment.
///
/// Padding runs are taken as `&PADDING[..n]`, so no run may exceed 64 bytes.
/// Presumably this is guaranteed by the alignments `IpcWriteOptions` accepts
/// (TODO confirm).
const PADDING: [u8; 64] = [0u8; 64];
2280
2281#[inline]
2287fn estimate_encoded_buffer_count(dt: &DataType) -> usize {
2288 match dt {
2289 DataType::Null => 0,
2290
2291 DataType::Binary | DataType::Utf8 | DataType::LargeBinary | DataType::LargeUtf8 => 3,
2292
2293 DataType::BinaryView | DataType::Utf8View => 3,
2294
2295 DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => {
2296 2 + estimate_encoded_buffer_count(f.data_type())
2297 }
2298
2299 DataType::ListView(f) | DataType::LargeListView(f) => {
2300 3 + estimate_encoded_buffer_count(f.data_type())
2301 }
2302
2303 DataType::FixedSizeList(f, _) => 1 + estimate_encoded_buffer_count(f.data_type()),
2304
2305 DataType::Struct(fields) => {
2306 1 + fields
2307 .iter()
2308 .map(|f| estimate_encoded_buffer_count(f.data_type()))
2309 .sum::<usize>()
2310 }
2311
2312 DataType::Dictionary(_, _) => 2,
2314
2315 DataType::Union(fields, UnionMode::Sparse) => {
2316 1 + fields
2317 .iter()
2318 .map(|(_, f)| estimate_encoded_buffer_count(f.data_type()))
2319 .sum::<usize>()
2320 }
2321 DataType::Union(fields, UnionMode::Dense) => {
2322 2 + fields
2323 .iter()
2324 .map(|(_, f)| estimate_encoded_buffer_count(f.data_type()))
2325 .sum::<usize>()
2326 }
2327
2328 DataType::RunEndEncoded(run_ends, values) => {
2329 estimate_encoded_buffer_count(run_ends.data_type())
2330 + estimate_encoded_buffer_count(values.data_type())
2331 }
2332 _ => 2,
2334 }
2335}
2336
#[inline]
/// Number of zero bytes needed after `len` bytes to reach the next multiple of
/// `alignment`; `0` when `len` is already aligned.
///
/// Uses bit masking, so `alignment` is expected to be a non-zero power of two.
fn pad_to_alignment(alignment: u8, len: usize) -> usize {
    let mask = usize::from(alignment - 1);
    let aligned_len = (len + mask) & !mask;
    aligned_len - len
}
2343
2344#[cfg(test)]
2345mod tests {
2346 use std::hash::Hasher;
2347 use std::io::Cursor;
2348 use std::io::Seek;
2349
2350 use arrow_array::builder::FixedSizeListBuilder;
2351 use arrow_array::builder::Float32Builder;
2352 use arrow_array::builder::Int64Builder;
2353 use arrow_array::builder::MapBuilder;
2354 use arrow_array::builder::StringViewBuilder;
2355 use arrow_array::builder::UnionBuilder;
2356 use arrow_array::builder::{
2357 GenericListBuilder, GenericListViewBuilder, ListBuilder, StringBuilder,
2358 };
2359 use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
2360 use arrow_array::types::*;
2361 use arrow_buffer::ScalarBuffer;
2362
2363 use crate::MetadataVersion;
2364 use crate::convert::fb_to_schema;
2365 use crate::reader::*;
2366 use crate::root_as_footer;
2367
2368 use super::*;
2369
2370 fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
2371 let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
2372 writer.write(rb).unwrap();
2373 writer.finish().unwrap();
2374 writer.into_inner().unwrap()
2375 }
2376
2377 fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
2378 let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
2379 reader.next().unwrap().unwrap()
2380 }
2381
2382 fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
2383 const IPC_ALIGNMENT: usize = 8;
2387
2388 let mut stream_writer = StreamWriter::try_new_with_options(
2389 vec![],
2390 record.schema_ref(),
2391 IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
2392 )
2393 .unwrap();
2394 stream_writer.write(record).unwrap();
2395 stream_writer.finish().unwrap();
2396 stream_writer.into_inner().unwrap()
2397 }
2398
2399 fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
2400 let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
2401 stream_reader.next().unwrap().unwrap()
2402 }
2403
2404 #[test]
2405 #[cfg(feature = "lz4")]
2406 fn test_write_empty_record_batch_lz4_compression() {
2407 let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2408 let values: Vec<Option<i32>> = vec![];
2409 let array = Int32Array::from(values);
2410 let record_batch =
2411 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2412
2413 let mut file = tempfile::tempfile().unwrap();
2414
2415 {
2416 let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2417 .unwrap()
2418 .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2419 .unwrap();
2420
2421 let mut writer =
2422 FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2423 writer.write(&record_batch).unwrap();
2424 writer.finish().unwrap();
2425 }
2426 file.rewind().unwrap();
2427 {
2428 let reader = FileReader::try_new(file, None).unwrap();
2430 for read_batch in reader {
2431 read_batch
2432 .unwrap()
2433 .columns()
2434 .iter()
2435 .zip(record_batch.columns())
2436 .for_each(|(a, b)| {
2437 assert_eq!(a.data_type(), b.data_type());
2438 assert_eq!(a.len(), b.len());
2439 assert_eq!(a.null_count(), b.null_count());
2440 });
2441 }
2442 }
2443 }
2444
2445 #[test]
2446 #[cfg(feature = "lz4")]
2447 fn test_write_file_with_lz4_compression() {
2448 let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2449 let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2450 let array = Int32Array::from(values);
2451 let record_batch =
2452 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2453
2454 let mut file = tempfile::tempfile().unwrap();
2455 {
2456 let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2457 .unwrap()
2458 .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2459 .unwrap();
2460
2461 let mut writer =
2462 FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2463 writer.write(&record_batch).unwrap();
2464 writer.finish().unwrap();
2465 }
2466 file.rewind().unwrap();
2467 {
2468 let reader = FileReader::try_new(file, None).unwrap();
2470 for read_batch in reader {
2471 read_batch
2472 .unwrap()
2473 .columns()
2474 .iter()
2475 .zip(record_batch.columns())
2476 .for_each(|(a, b)| {
2477 assert_eq!(a.data_type(), b.data_type());
2478 assert_eq!(a.len(), b.len());
2479 assert_eq!(a.null_count(), b.null_count());
2480 });
2481 }
2482 }
2483 }
2484
2485 #[test]
2486 #[cfg(feature = "zstd")]
2487 fn test_write_file_with_zstd_compression() {
2488 let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2489 let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2490 let array = Int32Array::from(values);
2491 let record_batch =
2492 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2493 let mut file = tempfile::tempfile().unwrap();
2494 {
2495 let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2496 .unwrap()
2497 .try_with_compression(Some(crate::CompressionType::ZSTD))
2498 .unwrap();
2499
2500 let mut writer =
2501 FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2502 writer.write(&record_batch).unwrap();
2503 writer.finish().unwrap();
2504 }
2505 file.rewind().unwrap();
2506 {
2507 let reader = FileReader::try_new(file, None).unwrap();
2509 for read_batch in reader {
2510 read_batch
2511 .unwrap()
2512 .columns()
2513 .iter()
2514 .zip(record_batch.columns())
2515 .for_each(|(a, b)| {
2516 assert_eq!(a.data_type(), b.data_type());
2517 assert_eq!(a.len(), b.len());
2518 assert_eq!(a.null_count(), b.null_count());
2519 });
2520 }
2521 }
2522 }
2523
2524 #[test]
2525 fn test_write_file() {
2526 let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
2527 let values: Vec<Option<u32>> = vec![
2528 Some(999),
2529 None,
2530 Some(235),
2531 Some(123),
2532 None,
2533 None,
2534 None,
2535 None,
2536 None,
2537 ];
2538 let array1 = UInt32Array::from(values);
2539 let batch =
2540 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
2541 .unwrap();
2542 let mut file = tempfile::tempfile().unwrap();
2543 {
2544 let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2545
2546 writer.write(&batch).unwrap();
2547 writer.finish().unwrap();
2548 }
2549 file.rewind().unwrap();
2550
2551 {
2552 let mut reader = FileReader::try_new(file, None).unwrap();
2553 while let Some(Ok(read_batch)) = reader.next() {
2554 read_batch
2555 .columns()
2556 .iter()
2557 .zip(batch.columns())
2558 .for_each(|(a, b)| {
2559 assert_eq!(a.data_type(), b.data_type());
2560 assert_eq!(a.len(), b.len());
2561 assert_eq!(a.null_count(), b.null_count());
2562 });
2563 }
2564 }
2565 }
2566
2567 #[test]
2568 fn test_empty_utf8_ipc_writes_nonempty_offsets_buffer() {
2569 let name = StringArray::from(Vec::<String>::new());
2570 let (offsets, values) = get_byte_array_buffers::<i32>(&name.to_data());
2571
2572 assert_eq!(name.len(), 0);
2573 assert_eq!(
2574 offsets.len(),
2575 std::mem::size_of::<i32>(),
2576 "offsets buffer should contain one zero i32 offset"
2577 );
2578 assert_eq!(values.len(), 0, "values buffer should remain empty");
2579 }
2580
2581 #[test]
2582 fn test_empty_large_utf8_ipc_writes_nonempty_offsets_buffer() {
2583 let name = LargeStringArray::from(Vec::<String>::new());
2584 let (offsets, values) = get_byte_array_buffers::<i64>(&name.to_data());
2585
2586 assert_eq!(name.len(), 0);
2587 assert_eq!(
2588 offsets.len(),
2589 std::mem::size_of::<i64>(),
2590 "offsets buffer should contain one zero i64 offset"
2591 );
2592 assert_eq!(values.len(), 0, "values buffer should remain empty");
2593 }
2594
2595 #[test]
2596 fn test_empty_list_ipc_writes_nonempty_offsets_buffer() {
2597 let list = GenericListBuilder::<i32, _>::new(UInt32Builder::new()).finish();
2598 let (offsets, child_data) = get_list_array_buffers::<i32>(&list.to_data());
2599
2600 assert_eq!(list.len(), 0);
2601 assert_eq!(
2602 offsets.len(),
2603 std::mem::size_of::<i32>(),
2604 "offsets buffer should contain one zero i32 offset"
2605 );
2606 assert_eq!(child_data.len(), 0, "child data should remain empty");
2607 }
2608
2609 #[test]
2610 fn test_empty_large_list_ipc_writes_nonempty_offsets_buffer() {
2611 let list = GenericListBuilder::<i64, _>::new(UInt32Builder::new()).finish();
2612 let (offsets, child_data) = get_list_array_buffers::<i64>(&list.to_data());
2613
2614 assert_eq!(list.len(), 0);
2615 assert_eq!(
2616 offsets.len(),
2617 std::mem::size_of::<i64>(),
2618 "offsets buffer should contain one zero i64 offset"
2619 );
2620 assert_eq!(child_data.len(), 0, "child data should remain empty");
2621 }
2622
2623 fn write_null_file(options: IpcWriteOptions) {
2624 let schema = Schema::new(vec![
2625 Field::new("nulls", DataType::Null, true),
2626 Field::new("int32s", DataType::Int32, false),
2627 Field::new("nulls2", DataType::Null, true),
2628 Field::new("f64s", DataType::Float64, false),
2629 ]);
2630 let array1 = NullArray::new(32);
2631 let array2 = Int32Array::from(vec![1; 32]);
2632 let array3 = NullArray::new(32);
2633 let array4 = Float64Array::from(vec![f64::NAN; 32]);
2634 let batch = RecordBatch::try_new(
2635 Arc::new(schema.clone()),
2636 vec![
2637 Arc::new(array1) as ArrayRef,
2638 Arc::new(array2) as ArrayRef,
2639 Arc::new(array3) as ArrayRef,
2640 Arc::new(array4) as ArrayRef,
2641 ],
2642 )
2643 .unwrap();
2644 let mut file = tempfile::tempfile().unwrap();
2645 {
2646 let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2647
2648 writer.write(&batch).unwrap();
2649 writer.finish().unwrap();
2650 }
2651
2652 file.rewind().unwrap();
2653
2654 {
2655 let reader = FileReader::try_new(file, None).unwrap();
2656 reader.for_each(|maybe_batch| {
2657 maybe_batch
2658 .unwrap()
2659 .columns()
2660 .iter()
2661 .zip(batch.columns())
2662 .for_each(|(a, b)| {
2663 assert_eq!(a.data_type(), b.data_type());
2664 assert_eq!(a.len(), b.len());
2665 assert_eq!(a.null_count(), b.null_count());
2666 });
2667 });
2668 }
2669 }
2670 #[test]
2671 fn test_write_null_file_v4() {
2672 write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2673 write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
2674 write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap());
2675 write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
2676 }
2677
2678 #[test]
2679 fn test_write_null_file_v5() {
2680 write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2681 write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap());
2682 }
2683
2684 #[test]
2685 fn track_union_nested_dict() {
2686 let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2687
2688 let array = Arc::new(inner) as ArrayRef;
2689
2690 #[allow(deprecated)]
2692 let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 0, false);
2693 let union_fields = [(0, Arc::new(dctfield))].into_iter().collect();
2694
2695 let types = [0, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
2696 let offsets = [0, 1, 2].into_iter().collect::<ScalarBuffer<i32>>();
2697
2698 let union = UnionArray::try_new(union_fields, types, Some(offsets), vec![array]).unwrap();
2699
2700 let schema = Arc::new(Schema::new(vec![Field::new(
2701 "union",
2702 union.data_type().clone(),
2703 false,
2704 )]));
2705
2706 let r#gen = IpcDataGenerator::default();
2707 let mut dict_tracker = DictionaryTracker::new(false);
2708 r#gen.schema_to_bytes_with_dictionary_tracker(
2709 &schema,
2710 &mut dict_tracker,
2711 &IpcWriteOptions::default(),
2712 );
2713
2714 let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
2715
2716 r#gen
2717 .encode(
2718 &batch,
2719 &mut dict_tracker,
2720 &Default::default(),
2721 &mut Default::default(),
2722 )
2723 .unwrap();
2724
2725 assert!(dict_tracker.written.contains_key(&0));
2728 }
2729
2730 #[test]
2731 fn track_struct_nested_dict() {
2732 let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2733
2734 let array = Arc::new(inner) as ArrayRef;
2735
2736 #[allow(deprecated)]
2738 let dctfield = Arc::new(Field::new_dict(
2739 "dict",
2740 array.data_type().clone(),
2741 false,
2742 2,
2743 false,
2744 ));
2745
2746 let s = StructArray::from(vec![(dctfield, array)]);
2747 let struct_array = Arc::new(s) as ArrayRef;
2748
2749 let schema = Arc::new(Schema::new(vec![Field::new(
2750 "struct",
2751 struct_array.data_type().clone(),
2752 false,
2753 )]));
2754
2755 let r#gen = IpcDataGenerator::default();
2756 let mut dict_tracker = DictionaryTracker::new(false);
2757 r#gen.schema_to_bytes_with_dictionary_tracker(
2758 &schema,
2759 &mut dict_tracker,
2760 &IpcWriteOptions::default(),
2761 );
2762
2763 let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2764
2765 r#gen
2766 .encode(
2767 &batch,
2768 &mut dict_tracker,
2769 &Default::default(),
2770 &mut Default::default(),
2771 )
2772 .unwrap();
2773
2774 assert!(dict_tracker.written.contains_key(&0));
2775 }
2776
2777 fn write_union_file(options: IpcWriteOptions) {
2778 let schema = Schema::new(vec![Field::new_union(
2779 "union",
2780 vec![0, 1],
2781 vec![
2782 Field::new("a", DataType::Int32, false),
2783 Field::new("c", DataType::Float64, false),
2784 ],
2785 UnionMode::Sparse,
2786 )]);
2787 let mut builder = UnionBuilder::with_capacity_sparse(5);
2788 builder.append::<Int32Type>("a", 1).unwrap();
2789 builder.append_null::<Int32Type>("a").unwrap();
2790 builder.append::<Float64Type>("c", 3.0).unwrap();
2791 builder.append_null::<Float64Type>("c").unwrap();
2792 builder.append::<Int32Type>("a", 4).unwrap();
2793 let union = builder.build().unwrap();
2794
2795 let batch =
2796 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
2797 .unwrap();
2798
2799 let mut file = tempfile::tempfile().unwrap();
2800 {
2801 let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2802
2803 writer.write(&batch).unwrap();
2804 writer.finish().unwrap();
2805 }
2806 file.rewind().unwrap();
2807
2808 {
2809 let reader = FileReader::try_new(file, None).unwrap();
2810 reader.for_each(|maybe_batch| {
2811 maybe_batch
2812 .unwrap()
2813 .columns()
2814 .iter()
2815 .zip(batch.columns())
2816 .for_each(|(a, b)| {
2817 assert_eq!(a.data_type(), b.data_type());
2818 assert_eq!(a.len(), b.len());
2819 assert_eq!(a.null_count(), b.null_count());
2820 });
2821 });
2822 }
2823 }
2824
2825 #[test]
2826 fn test_write_union_file_v4_v5() {
2827 write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2828 write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2829 }
2830
2831 #[test]
2832 fn test_write_view_types() {
2833 const LONG_TEST_STRING: &str =
2834 "This is a long string to make sure binary view array handles it";
2835 let schema = Schema::new(vec![
2836 Field::new("field1", DataType::BinaryView, true),
2837 Field::new("field2", DataType::Utf8View, true),
2838 ]);
2839 let values: Vec<Option<&[u8]>> = vec![
2840 Some(b"foo"),
2841 Some(b"bar"),
2842 Some(LONG_TEST_STRING.as_bytes()),
2843 ];
2844 let binary_array = BinaryViewArray::from_iter(values);
2845 let utf8_array =
2846 StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
2847 let record_batch = RecordBatch::try_new(
2848 Arc::new(schema.clone()),
2849 vec![Arc::new(binary_array), Arc::new(utf8_array)],
2850 )
2851 .unwrap();
2852
2853 let mut file = tempfile::tempfile().unwrap();
2854 {
2855 let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2856 writer.write(&record_batch).unwrap();
2857 writer.finish().unwrap();
2858 }
2859 file.rewind().unwrap();
2860 {
2861 let mut reader = FileReader::try_new(&file, None).unwrap();
2862 let read_batch = reader.next().unwrap().unwrap();
2863 read_batch
2864 .columns()
2865 .iter()
2866 .zip(record_batch.columns())
2867 .for_each(|(a, b)| {
2868 assert_eq!(a, b);
2869 });
2870 }
2871 file.rewind().unwrap();
2872 {
2873 let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
2874 let read_batch = reader.next().unwrap().unwrap();
2875 assert_eq!(read_batch.num_columns(), 1);
2876 let read_array = read_batch.column(0);
2877 let write_array = record_batch.column(0);
2878 assert_eq!(read_array, write_array);
2879 }
2880 }
2881
2882 #[test]
2883 fn truncate_ipc_record_batch() {
2884 fn create_batch(rows: usize) -> RecordBatch {
2885 let schema = Schema::new(vec![
2886 Field::new("a", DataType::Int32, false),
2887 Field::new("b", DataType::Utf8, false),
2888 ]);
2889
2890 let a = Int32Array::from_iter_values(0..rows as i32);
2891 let b = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));
2892
2893 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2894 }
2895
2896 let big_record_batch = create_batch(65536);
2897
2898 let length = 5;
2899 let small_record_batch = create_batch(length);
2900
2901 let offset = 2;
2902 let record_batch_slice = big_record_batch.slice(offset, length);
2903 assert!(
2904 serialize_stream(&big_record_batch).len() > serialize_stream(&small_record_batch).len()
2905 );
2906 assert_eq!(
2907 serialize_stream(&small_record_batch).len(),
2908 serialize_stream(&record_batch_slice).len()
2909 );
2910
2911 assert_eq!(
2912 deserialize_stream(serialize_stream(&record_batch_slice)),
2913 record_batch_slice
2914 );
2915 }
2916
2917 #[test]
2918 fn truncate_ipc_record_batch_with_nulls() {
2919 fn create_batch() -> RecordBatch {
2920 let schema = Schema::new(vec![
2921 Field::new("a", DataType::Int32, true),
2922 Field::new("b", DataType::Utf8, true),
2923 ]);
2924
2925 let a = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
2926 let b = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);
2927
2928 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2929 }
2930
2931 let record_batch = create_batch();
2932 let record_batch_slice = record_batch.slice(1, 2);
2933 let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2934
2935 assert!(
2936 serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2937 );
2938
2939 assert!(deserialized_batch.column(0).is_null(0));
2940 assert!(deserialized_batch.column(0).is_valid(1));
2941 assert!(deserialized_batch.column(1).is_valid(0));
2942 assert!(deserialized_batch.column(1).is_valid(1));
2943
2944 assert_eq!(record_batch_slice, deserialized_batch);
2945 }
2946
2947 #[test]
2948 fn truncate_ipc_dictionary_array() {
2949 fn create_batch() -> RecordBatch {
2950 let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
2951 .into_iter()
2952 .collect();
2953 let keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2954
2955 let array = DictionaryArray::new(keys, Arc::new(values));
2956
2957 let schema = Schema::new(vec![Field::new("dict", array.data_type().clone(), true)]);
2958
2959 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
2960 }
2961
2962 let record_batch = create_batch();
2963 let record_batch_slice = record_batch.slice(1, 2);
2964 let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2965
2966 assert!(
2967 serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2968 );
2969
2970 assert!(deserialized_batch.column(0).is_valid(0));
2971 assert!(deserialized_batch.column(0).is_null(1));
2972
2973 assert_eq!(record_batch_slice, deserialized_batch);
2974 }
2975
2976 #[test]
2977 fn truncate_ipc_struct_array() {
2978 fn create_batch() -> RecordBatch {
2979 let strings: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
2980 .into_iter()
2981 .collect();
2982 let ints: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2983
2984 let struct_array = StructArray::from(vec![
2985 (
2986 Arc::new(Field::new("s", DataType::Utf8, true)),
2987 Arc::new(strings) as ArrayRef,
2988 ),
2989 (
2990 Arc::new(Field::new("c", DataType::Int32, true)),
2991 Arc::new(ints) as ArrayRef,
2992 ),
2993 ]);
2994
2995 let schema = Schema::new(vec![Field::new(
2996 "struct_array",
2997 struct_array.data_type().clone(),
2998 true,
2999 )]);
3000
3001 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)]).unwrap()
3002 }
3003
3004 let record_batch = create_batch();
3005 let record_batch_slice = record_batch.slice(1, 2);
3006 let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
3007
3008 assert!(
3009 serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
3010 );
3011
3012 let structs = deserialized_batch
3013 .column(0)
3014 .as_any()
3015 .downcast_ref::<StructArray>()
3016 .unwrap();
3017
3018 assert!(structs.column(0).is_null(0));
3019 assert!(structs.column(0).is_valid(1));
3020 assert!(structs.column(1).is_valid(0));
3021 assert!(structs.column(1).is_null(1));
3022 assert_eq!(record_batch_slice, deserialized_batch);
3023 }
3024
3025 #[test]
3026 fn truncate_ipc_string_array_with_all_empty_string() {
3027 fn create_batch() -> RecordBatch {
3028 let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
3029 let a = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
3030 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap()
3031 }
3032
3033 let record_batch = create_batch();
3034 let record_batch_slice = record_batch.slice(0, 1);
3035 let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
3036
3037 assert!(
3038 serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
3039 );
3040 assert_eq!(record_batch_slice, deserialized_batch);
3041 }
3042
3043 #[test]
3044 fn test_stream_writer_writes_array_slice() {
3045 let array = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
3046 assert_eq!(
3047 vec![Some(1), Some(2), Some(3)],
3048 array.iter().collect::<Vec<_>>()
3049 );
3050
3051 let sliced = array.slice(1, 2);
3052 assert_eq!(vec![Some(2), Some(3)], sliced.iter().collect::<Vec<_>>());
3053
3054 let batch = RecordBatch::try_new(
3055 Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])),
3056 vec![Arc::new(sliced)],
3057 )
3058 .expect("new batch");
3059
3060 let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
3061 writer.write(&batch).expect("write");
3062 let outbuf = writer.into_inner().expect("inner");
3063
3064 let mut reader = StreamReader::try_new(&outbuf[..], None).expect("new reader");
3065 let read_batch = reader.next().unwrap().expect("read batch");
3066
3067 let read_array: &UInt32Array = read_batch.column(0).as_primitive();
3068 assert_eq!(
3069 vec![Some(2), Some(3)],
3070 read_array.iter().collect::<Vec<_>>()
3071 );
3072 }
3073
3074 #[test]
3075 fn test_large_slice_uint32() {
3076 ensure_roundtrip(Arc::new(UInt32Array::from_iter(
3077 (0..8000).map(|i| if i % 2 == 0 { Some(i) } else { None }),
3078 )));
3079 }
3080
3081 #[test]
3082 fn test_large_slice_string() {
3083 let strings: Vec<_> = (0..8000)
3084 .map(|i| {
3085 if i % 2 == 0 {
3086 Some(format!("value{i}"))
3087 } else {
3088 None
3089 }
3090 })
3091 .collect();
3092
3093 ensure_roundtrip(Arc::new(StringArray::from(strings)));
3094 }
3095
3096 #[test]
3097 fn test_large_slice_string_list() {
3098 let mut ls = ListBuilder::new(StringBuilder::new());
3099
3100 let mut s = String::new();
3101 for row_number in 0..8000 {
3102 if row_number % 2 == 0 {
3103 for list_element in 0..1000 {
3104 s.clear();
3105 use std::fmt::Write;
3106 write!(&mut s, "value{row_number}-{list_element}").unwrap();
3107 ls.values().append_value(&s);
3108 }
3109 ls.append(true)
3110 } else {
3111 ls.append(false); }
3113 }
3114
3115 ensure_roundtrip(Arc::new(ls.finish()));
3116 }
3117
3118 #[test]
3119 fn test_large_slice_string_list_of_lists() {
3120 let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));
3124
3125 for _ in 0..4000 {
3126 ls.values().append(true);
3127 ls.append(true)
3128 }
3129
3130 let mut s = String::new();
3131 for row_number in 0..4000 {
3132 if row_number % 2 == 0 {
3133 for list_element in 0..1000 {
3134 s.clear();
3135 use std::fmt::Write;
3136 write!(&mut s, "value{row_number}-{list_element}").unwrap();
3137 ls.values().values().append_value(&s);
3138 }
3139 ls.values().append(true);
3140 ls.append(true)
3141 } else {
3142 ls.append(false); }
3144 }
3145
3146 ensure_roundtrip(Arc::new(ls.finish()));
3147 }
3148
3149 fn ensure_roundtrip(array: ArrayRef) {
3151 let num_rows = array.len();
3152 let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
3153 let sliced_batch = orig_batch.slice(1, num_rows - 1);
3155
3156 let schema = orig_batch.schema();
3157 let stream_data = {
3158 let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
3159 writer.write(&sliced_batch).unwrap();
3160 writer.into_inner().unwrap()
3161 };
3162 let read_batch = {
3163 let projection = None;
3164 let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
3165 reader
3166 .next()
3167 .expect("expect no errors reading batch")
3168 .expect("expect batch")
3169 };
3170 assert_eq!(sliced_batch, read_batch);
3171
3172 let file_data = {
3173 let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
3174 writer.write(&sliced_batch).unwrap();
3175 writer.into_inner().unwrap().into_inner().unwrap()
3176 };
3177 let read_batch = {
3178 let projection = None;
3179 let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
3180 reader
3181 .next()
3182 .expect("expect no errors reading batch")
3183 .expect("expect batch")
3184 };
3185 assert_eq!(sliced_batch, read_batch);
3186
3187 }
3189
3190 #[test]
3191 fn encode_bools_slice() {
3192 assert_bool_roundtrip([true, false], 1, 1);
3194
3195 assert_bool_roundtrip(
3197 [
3198 true, false, true, true, false, false, true, true, true, false, false, false, true,
3199 true, true, true, false, false, false, false, true, true, true, true, true, false,
3200 false, false, false, false,
3201 ],
3202 13,
3203 17,
3204 );
3205
3206 assert_bool_roundtrip(
3208 [
3209 true, false, true, true, false, false, true, true, true, false, false, false,
3210 ],
3211 8,
3212 2,
3213 );
3214
3215 assert_bool_roundtrip(
3217 [
3218 true, false, true, true, false, false, true, true, true, false, false, false, true,
3219 true, true, true, true, false, false, false, false, false,
3220 ],
3221 8,
3222 8,
3223 );
3224 }
3225
3226 fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
3227 let val_bool_field = Field::new("val", DataType::Boolean, false);
3228
3229 let schema = Arc::new(Schema::new(vec![val_bool_field]));
3230
3231 let bools = BooleanArray::from(bools.to_vec());
3232
3233 let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
3234 let batch = batch.slice(offset, length);
3235
3236 let data = serialize_stream(&batch);
3237 let batch2 = deserialize_stream(data);
3238 assert_eq!(batch, batch2);
3239 }
3240
3241 #[test]
3242 fn test_run_array_unslice() {
3243 let total_len = 80;
3244 let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
3245 let repeats: Vec<usize> = vec![3, 4, 1, 2];
3246 let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
3247 for ix in 0_usize..32 {
3248 let repeat: usize = repeats[ix % repeats.len()];
3249 let val: Option<i32> = vals[ix % vals.len()];
3250 input_array.resize(input_array.len() + repeat, val);
3251 }
3252
3253 let mut builder =
3255 PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
3256 builder.extend(input_array.iter().copied());
3257 let run_array = builder.finish();
3258
3259 for slice_len in 1..=total_len {
3261 let sliced_run_array: RunArray<Int16Type> =
3263 run_array.slice(0, slice_len).into_data().into();
3264
3265 let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
3267 let typed = unsliced_run_array
3268 .downcast::<PrimitiveArray<Int32Type>>()
3269 .unwrap();
3270 let expected: Vec<Option<i32>> = input_array.iter().take(slice_len).copied().collect();
3271 let actual: Vec<Option<i32>> = typed.into_iter().collect();
3272 assert_eq!(expected, actual);
3273
3274 let sliced_run_array: RunArray<Int16Type> = run_array
3276 .slice(total_len - slice_len, slice_len)
3277 .into_data()
3278 .into();
3279
3280 let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
3282 let typed = unsliced_run_array
3283 .downcast::<PrimitiveArray<Int32Type>>()
3284 .unwrap();
3285 let expected: Vec<Option<i32>> = input_array
3286 .iter()
3287 .skip(total_len - slice_len)
3288 .copied()
3289 .collect();
3290 let actual: Vec<Option<i32>> = typed.into_iter().collect();
3291 assert_eq!(expected, actual);
3292 }
3293 }
3294
3295 fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3296 let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());
3297
3298 for i in 0..100_000 {
3299 for value in [i, i, i] {
3300 ls.values().append_value(value);
3301 }
3302 ls.append(true)
3303 }
3304
3305 ls.finish()
3306 }
3307
3308 fn generate_utf8view_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3309 let mut ls = GenericListBuilder::<O, _>::new(StringViewBuilder::new());
3310
3311 for i in 0..100_000 {
3312 for value in [
3313 format!("value{}", i),
3314 format!("value{}", i),
3315 format!("value{}", i),
3316 ] {
3317 ls.values().append_value(&value);
3318 }
3319 ls.append(true)
3320 }
3321
3322 ls.finish()
3323 }
3324
3325 fn generate_string_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3326 let mut ls = GenericListBuilder::<O, _>::new(StringBuilder::new());
3327
3328 for i in 0..100_000 {
3329 for value in [
3330 format!("value{}", i),
3331 format!("value{}", i),
3332 format!("value{}", i),
3333 ] {
3334 ls.values().append_value(&value);
3335 }
3336 ls.append(true)
3337 }
3338
3339 ls.finish()
3340 }
3341
3342 fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3343 let mut ls =
3344 GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
3345
3346 for _i in 0..10_000 {
3347 for j in 0..10 {
3348 for value in [j, j, j, j] {
3349 ls.values().values().append_value(value);
3350 }
3351 ls.values().append(true)
3352 }
3353 ls.append(true);
3354 }
3355
3356 ls.finish()
3357 }
3358
3359 fn generate_nested_list_data_starting_at_zero<O: OffsetSizeTrait>() -> GenericListArray<O> {
3360 let mut ls =
3361 GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
3362
3363 for _i in 0..999 {
3364 ls.values().append(true);
3365 ls.append(true);
3366 }
3367
3368 for j in 0..10 {
3369 for value in [j, j, j, j] {
3370 ls.values().values().append_value(value);
3371 }
3372 ls.values().append(true)
3373 }
3374 ls.append(true);
3375
3376 for i in 0..9_000 {
3377 for j in 0..10 {
3378 for value in [i + j, i + j, i + j, i + j] {
3379 ls.values().values().append_value(value);
3380 }
3381 ls.values().append(true)
3382 }
3383 ls.append(true);
3384 }
3385
3386 ls.finish()
3387 }
3388
3389 fn generate_map_array_data() -> MapArray {
3390 let keys_builder = UInt32Builder::new();
3391 let values_builder = UInt32Builder::new();
3392
3393 let mut builder = MapBuilder::new(None, keys_builder, values_builder);
3394
3395 for i in 0..100_000 {
3396 for _j in 0..3 {
3397 builder.keys().append_value(i);
3398 builder.values().append_value(i * 2);
3399 }
3400 builder.append(true).unwrap();
3401 }
3402
3403 builder.finish()
3404 }
3405
3406 #[test]
3407 fn reencode_offsets_when_first_offset_is_not_zero() {
3408 let original_list = generate_list_data::<i32>();
3409 let original_data = original_list.into_data();
3410 let slice_data = original_data.slice(75, 7);
3411 let (new_offsets, original_start, length) =
3412 reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
3413 assert_eq!(
3414 vec![0, 3, 6, 9, 12, 15, 18, 21],
3415 new_offsets.typed_data::<i32>()
3416 );
3417 assert_eq!(225, original_start);
3418 assert_eq!(21, length);
3419 }
3420
3421 #[test]
3422 fn reencode_offsets_when_first_offset_is_zero() {
3423 let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
3424 ls.append(true);
3426 ls.values().append_value(35);
3427 ls.values().append_value(42);
3428 ls.append(true);
3429 let original_list = ls.finish();
3430 let original_data = original_list.into_data();
3431
3432 let slice_data = original_data.slice(1, 1);
3433 let (new_offsets, original_start, length) =
3434 reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
3435 assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
3436 assert_eq!(0, original_start);
3437 assert_eq!(2, length);
3438 }
3439
3440 fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
3443 let in_sliced = in_batch.slice(999, 1);
3445
3446 let bytes_batch = serialize_file(&in_batch);
3447 let bytes_sliced = serialize_file(&in_sliced);
3448
3449 assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));
3451
3452 let out_batch = deserialize_file(bytes_batch);
3454 assert_eq!(in_batch, out_batch);
3455
3456 let out_sliced = deserialize_file(bytes_sliced);
3457 assert_eq!(in_sliced, out_sliced);
3458 }
3459
3460 #[test]
3461 fn encode_lists() {
3462 let val_inner = Field::new_list_field(DataType::UInt32, true);
3463 let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
3464 let schema = Arc::new(Schema::new(vec![val_list_field]));
3465
3466 let values = Arc::new(generate_list_data::<i32>());
3467
3468 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3469 roundtrip_ensure_sliced_smaller(in_batch, 1000);
3470 }
3471
3472 #[test]
3473 fn encode_empty_list() {
3474 let val_inner = Field::new_list_field(DataType::UInt32, true);
3475 let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
3476 let schema = Arc::new(Schema::new(vec![val_list_field]));
3477
3478 let values = Arc::new(generate_list_data::<i32>());
3479
3480 let in_batch = RecordBatch::try_new(schema, vec![values])
3481 .unwrap()
3482 .slice(999, 0);
3483 let out_batch = deserialize_file(serialize_file(&in_batch));
3484 assert_eq!(in_batch, out_batch);
3485 }
3486
3487 #[test]
3488 fn encode_large_lists() {
3489 let val_inner = Field::new_list_field(DataType::UInt32, true);
3490 let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3491 let schema = Arc::new(Schema::new(vec![val_list_field]));
3492
3493 let values = Arc::new(generate_list_data::<i64>());
3494
3495 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3498 roundtrip_ensure_sliced_smaller(in_batch, 1000);
3499 }
3500
3501 #[test]
3502 fn encode_large_lists_non_zero_offset() {
3503 let val_inner = Field::new_list_field(DataType::UInt32, true);
3504 let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3505 let schema = Arc::new(Schema::new(vec![val_list_field]));
3506
3507 let values = Arc::new(generate_list_data::<i64>());
3508
3509 check_sliced_list_array(schema, values);
3510 }
3511
3512 #[test]
3513 fn encode_large_lists_string_non_zero_offset() {
3514 let val_inner = Field::new_list_field(DataType::Utf8, true);
3515 let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3516 let schema = Arc::new(Schema::new(vec![val_list_field]));
3517
3518 let values = Arc::new(generate_string_list_data::<i64>());
3519
3520 check_sliced_list_array(schema, values);
3521 }
3522
3523 #[test]
3524 fn encode_large_list_string_view_non_zero_offset() {
3525 let val_inner = Field::new_list_field(DataType::Utf8View, true);
3526 let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3527 let schema = Arc::new(Schema::new(vec![val_list_field]));
3528
3529 let values = Arc::new(generate_utf8view_list_data::<i64>());
3530
3531 check_sliced_list_array(schema, values);
3532 }
3533
3534 fn check_sliced_list_array(schema: Arc<Schema>, values: Arc<GenericListArray<i64>>) {
3535 for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3536 let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3537 .unwrap()
3538 .slice(offset, len);
3539 let out_batch = deserialize_file(serialize_file(&in_batch));
3540 assert_eq!(in_batch, out_batch);
3541 }
3542 }
3543
3544 #[test]
3545 fn encode_nested_lists() {
3546 let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
3547 let inner_list_field = Arc::new(Field::new_list_field(DataType::List(inner_int), true));
3548 let list_field = Field::new("val", DataType::List(inner_list_field), true);
3549 let schema = Arc::new(Schema::new(vec![list_field]));
3550
3551 let values = Arc::new(generate_nested_list_data::<i32>());
3552
3553 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3554 roundtrip_ensure_sliced_smaller(in_batch, 1000);
3555 }
3556
3557 #[test]
3558 fn encode_nested_lists_starting_at_zero() {
3559 let inner_int = Arc::new(Field::new("item", DataType::UInt32, true));
3560 let inner_list_field = Arc::new(Field::new("item", DataType::List(inner_int), true));
3561 let list_field = Field::new("val", DataType::List(inner_list_field), true);
3562 let schema = Arc::new(Schema::new(vec![list_field]));
3563
3564 let values = Arc::new(generate_nested_list_data_starting_at_zero::<i32>());
3565
3566 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3567 roundtrip_ensure_sliced_smaller(in_batch, 1);
3568 }
3569
3570 #[test]
3571 fn encode_map_array() {
3572 let keys = Arc::new(Field::new("keys", DataType::UInt32, false));
3573 let values = Arc::new(Field::new("values", DataType::UInt32, true));
3574 let map_field = Field::new_map("map", "entries", keys, values, false, true);
3575 let schema = Arc::new(Schema::new(vec![map_field]));
3576
3577 let values = Arc::new(generate_map_array_data());
3578
3579 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3580 roundtrip_ensure_sliced_smaller(in_batch, 1000);
3581 }
3582
3583 fn generate_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
3584 let mut builder = GenericListViewBuilder::<O, _>::new(UInt32Builder::new());
3585
3586 for i in 0u32..100_000 {
3587 if i.is_multiple_of(10_000) {
3588 builder.append(false);
3589 continue;
3590 }
3591 for value in [i, i, i] {
3592 builder.values().append_value(value);
3593 }
3594 builder.append(true);
3595 }
3596
3597 builder.finish()
3598 }
3599
3600 #[test]
3601 fn encode_list_view_arrays() {
3602 let val_inner = Field::new_list_field(DataType::UInt32, true);
3603 let val_field = Field::new("val", DataType::ListView(Arc::new(val_inner)), true);
3604 let schema = Arc::new(Schema::new(vec![val_field]));
3605
3606 let values = Arc::new(generate_list_view_data::<i32>());
3607
3608 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3609 let out_batch = deserialize_file(serialize_file(&in_batch));
3610 assert_eq!(in_batch, out_batch);
3611 }
3612
3613 #[test]
3614 fn encode_large_list_view_arrays() {
3615 let val_inner = Field::new_list_field(DataType::UInt32, true);
3616 let val_field = Field::new("val", DataType::LargeListView(Arc::new(val_inner)), true);
3617 let schema = Arc::new(Schema::new(vec![val_field]));
3618
3619 let values = Arc::new(generate_list_view_data::<i64>());
3620
3621 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3622 let out_batch = deserialize_file(serialize_file(&in_batch));
3623 assert_eq!(in_batch, out_batch);
3624 }
3625
3626 #[test]
3627 fn check_sliced_list_view_array() {
3628 let inner = Field::new_list_field(DataType::UInt32, true);
3629 let field = Field::new("val", DataType::ListView(Arc::new(inner)), true);
3630 let schema = Arc::new(Schema::new(vec![field]));
3631 let values = Arc::new(generate_list_view_data::<i32>());
3632
3633 for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3634 let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3635 .unwrap()
3636 .slice(offset, len);
3637 let out_batch = deserialize_file(serialize_file(&in_batch));
3638 assert_eq!(in_batch, out_batch);
3639 }
3640 }
3641
3642 #[test]
3643 fn check_sliced_large_list_view_array() {
3644 let inner = Field::new_list_field(DataType::UInt32, true);
3645 let field = Field::new("val", DataType::LargeListView(Arc::new(inner)), true);
3646 let schema = Arc::new(Schema::new(vec![field]));
3647 let values = Arc::new(generate_list_view_data::<i64>());
3648
3649 for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3650 let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3651 .unwrap()
3652 .slice(offset, len);
3653 let out_batch = deserialize_file(serialize_file(&in_batch));
3654 assert_eq!(in_batch, out_batch);
3655 }
3656 }
3657
3658 fn generate_nested_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
3659 let inner_builder = UInt32Builder::new();
3660 let middle_builder = GenericListViewBuilder::<O, _>::new(inner_builder);
3661 let mut outer_builder = GenericListViewBuilder::<O, _>::new(middle_builder);
3662
3663 for i in 0u32..10_000 {
3664 if i.is_multiple_of(1_000) {
3665 outer_builder.append(false);
3666 continue;
3667 }
3668
3669 for _ in 0..3 {
3670 for value in [i, i + 1, i + 2] {
3671 outer_builder.values().values().append_value(value);
3672 }
3673 outer_builder.values().append(true);
3674 }
3675 outer_builder.append(true);
3676 }
3677
3678 outer_builder.finish()
3679 }
3680
3681 #[test]
3682 fn encode_nested_list_views() {
3683 let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
3684 let inner_list_field = Arc::new(Field::new_list_field(DataType::ListView(inner_int), true));
3685 let list_field = Field::new("val", DataType::ListView(inner_list_field), true);
3686 let schema = Arc::new(Schema::new(vec![list_field]));
3687
3688 let values = Arc::new(generate_nested_list_view_data::<i32>());
3689
3690 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3691 let out_batch = deserialize_file(serialize_file(&in_batch));
3692 assert_eq!(in_batch, out_batch);
3693 }
3694
3695 fn test_roundtrip_list_view_of_dict_impl<OffsetSize: OffsetSizeTrait, U: ArrowNativeType>(
3696 list_data_type: DataType,
3697 offsets: &[U; 5],
3698 sizes: &[U; 4],
3699 ) {
3700 let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3701 let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
3702 let dict_array = DictionaryArray::new(keys, Arc::new(values));
3703 let dict_data = dict_array.to_data();
3704
3705 let value_offsets = Buffer::from_slice_ref(offsets);
3706 let value_sizes = Buffer::from_slice_ref(sizes);
3707
3708 let list_data = ArrayData::builder(list_data_type)
3709 .len(4)
3710 .add_buffer(value_offsets)
3711 .add_buffer(value_sizes)
3712 .add_child_data(dict_data)
3713 .build()
3714 .unwrap();
3715 let list_view_array = GenericListViewArray::<OffsetSize>::from(list_data);
3716
3717 let schema = Arc::new(Schema::new(vec![Field::new(
3718 "f1",
3719 list_view_array.data_type().clone(),
3720 false,
3721 )]));
3722 let input_batch = RecordBatch::try_new(schema, vec![Arc::new(list_view_array)]).unwrap();
3723
3724 let output_batch = deserialize_file(serialize_file(&input_batch));
3725 assert_eq!(input_batch, output_batch);
3726
3727 let output_batch = deserialize_stream(serialize_stream(&input_batch));
3728 assert_eq!(input_batch, output_batch);
3729 }
3730
3731 #[test]
3732 fn test_roundtrip_list_view_of_dict() {
3733 #[allow(deprecated)]
3734 let list_data_type = DataType::ListView(Arc::new(Field::new_dict(
3735 "item",
3736 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3737 true,
3738 1,
3739 false,
3740 )));
3741 let offsets: &[i32; 5] = &[0, 2, 4, 4, 7];
3742 let sizes: &[i32; 4] = &[2, 2, 0, 3];
3743 test_roundtrip_list_view_of_dict_impl::<i32, i32>(list_data_type, offsets, sizes);
3744 }
3745
3746 #[test]
3747 fn test_roundtrip_large_list_view_of_dict() {
3748 #[allow(deprecated)]
3749 let list_data_type = DataType::LargeListView(Arc::new(Field::new_dict(
3750 "item",
3751 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3752 true,
3753 2,
3754 false,
3755 )));
3756 let offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
3757 let sizes: &[i64; 4] = &[2, 2, 0, 3];
3758 test_roundtrip_list_view_of_dict_impl::<i64, i64>(list_data_type, offsets, sizes);
3759 }
3760
3761 #[test]
3762 fn test_roundtrip_sliced_list_view_of_dict() {
3763 #[allow(deprecated)]
3764 let list_data_type = DataType::ListView(Arc::new(Field::new_dict(
3765 "item",
3766 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3767 true,
3768 3,
3769 false,
3770 )));
3771
3772 let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3773 let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2, 1, 0, 3, 2, 1]);
3774 let dict_array = DictionaryArray::new(keys, Arc::new(values));
3775 let dict_data = dict_array.to_data();
3776
3777 let offsets: &[i32; 7] = &[0, 2, 4, 4, 7, 9, 12];
3778 let sizes: &[i32; 6] = &[2, 2, 0, 3, 2, 3];
3779 let value_offsets = Buffer::from_slice_ref(offsets);
3780 let value_sizes = Buffer::from_slice_ref(sizes);
3781
3782 let list_data = ArrayData::builder(list_data_type)
3783 .len(6)
3784 .add_buffer(value_offsets)
3785 .add_buffer(value_sizes)
3786 .add_child_data(dict_data)
3787 .build()
3788 .unwrap();
3789 let list_view_array = GenericListViewArray::<i32>::from(list_data);
3790
3791 let schema = Arc::new(Schema::new(vec![Field::new(
3792 "f1",
3793 list_view_array.data_type().clone(),
3794 false,
3795 )]));
3796 let input_batch = RecordBatch::try_new(schema, vec![Arc::new(list_view_array)]).unwrap();
3797
3798 let sliced_batch = input_batch.slice(1, 4);
3799
3800 let output_batch = deserialize_file(serialize_file(&sliced_batch));
3801 assert_eq!(sliced_batch, output_batch);
3802
3803 let output_batch = deserialize_stream(serialize_stream(&sliced_batch));
3804 assert_eq!(sliced_batch, output_batch);
3805 }
3806
3807 #[test]
3808 fn test_roundtrip_dense_union_of_dict() {
3809 let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3810 let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
3811 let dict_array = DictionaryArray::new(keys, Arc::new(values));
3812
3813 #[allow(deprecated)]
3814 let dict_field = Arc::new(Field::new_dict(
3815 "dict",
3816 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3817 true,
3818 1,
3819 false,
3820 ));
3821 let int_field = Arc::new(Field::new("int", DataType::Int32, false));
3822 let union_fields = UnionFields::try_new(vec![0, 1], vec![dict_field, int_field]).unwrap();
3823
3824 let types = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);
3825 let offsets = ScalarBuffer::from(vec![0i32, 1, 0, 2, 1, 3, 4]);
3826
3827 let int_array = Int32Array::from(vec![100, 200]);
3828
3829 let union = UnionArray::try_new(
3830 union_fields.clone(),
3831 types,
3832 Some(offsets),
3833 vec![Arc::new(dict_array), Arc::new(int_array)],
3834 )
3835 .unwrap();
3836
3837 let schema = Arc::new(Schema::new(vec![Field::new(
3838 "union",
3839 DataType::Union(union_fields, UnionMode::Dense),
3840 false,
3841 )]));
3842 let input_batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
3843
3844 let output_batch = deserialize_file(serialize_file(&input_batch));
3845 assert_eq!(input_batch, output_batch);
3846
3847 let output_batch = deserialize_stream(serialize_stream(&input_batch));
3848 assert_eq!(input_batch, output_batch);
3849 }
3850
3851 #[test]
3852 fn test_roundtrip_sparse_union_of_dict() {
3853 let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3854 let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
3855 let dict_array = DictionaryArray::new(keys, Arc::new(values));
3856
3857 #[allow(deprecated)]
3858 let dict_field = Arc::new(Field::new_dict(
3859 "dict",
3860 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3861 true,
3862 2,
3863 false,
3864 ));
3865 let int_field = Arc::new(Field::new("int", DataType::Int32, false));
3866 let union_fields = UnionFields::try_new(vec![0, 1], vec![dict_field, int_field]).unwrap();
3867
3868 let types = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);
3869
3870 let int_array = Int32Array::from(vec![0, 0, 100, 0, 200, 0, 0]);
3871
3872 let union = UnionArray::try_new(
3873 union_fields.clone(),
3874 types,
3875 None,
3876 vec![Arc::new(dict_array), Arc::new(int_array)],
3877 )
3878 .unwrap();
3879
3880 let schema = Arc::new(Schema::new(vec![Field::new(
3881 "union",
3882 DataType::Union(union_fields, UnionMode::Sparse),
3883 false,
3884 )]));
3885 let input_batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
3886
3887 let output_batch = deserialize_file(serialize_file(&input_batch));
3888 assert_eq!(input_batch, output_batch);
3889
3890 let output_batch = deserialize_stream(serialize_stream(&input_batch));
3891 assert_eq!(input_batch, output_batch);
3892 }
3893
3894 #[test]
3895 fn test_roundtrip_map_with_dict_keys() {
3896 let key_values = StringArray::from(vec!["key_a", "key_b", "key_c"]);
3899 let keys = Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]);
3900 let dict_keys = DictionaryArray::new(keys, Arc::new(key_values));
3901
3902 let values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
3903
3904 #[allow(deprecated)]
3905 let entries_field = Arc::new(Field::new(
3906 "entries",
3907 DataType::Struct(
3908 vec![
3909 Field::new_dict(
3910 "key",
3911 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3912 false,
3913 1,
3914 false,
3915 ),
3916 Field::new("value", DataType::Int32, true),
3917 ]
3918 .into(),
3919 ),
3920 false,
3921 ));
3922
3923 let entries = StructArray::from(vec![
3924 (
3925 Arc::new(Field::new(
3926 "key",
3927 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3928 false,
3929 )),
3930 Arc::new(dict_keys) as ArrayRef,
3931 ),
3932 (
3933 Arc::new(Field::new("value", DataType::Int32, true)),
3934 Arc::new(values) as ArrayRef,
3935 ),
3936 ]);
3937
3938 let offsets = Buffer::from_slice_ref([0i32, 2, 4, 6]);
3939
3940 let map_data = ArrayData::builder(DataType::Map(entries_field, false))
3941 .len(3)
3942 .add_buffer(offsets)
3943 .add_child_data(entries.into_data())
3944 .build()
3945 .unwrap();
3946 let map_array = MapArray::from(map_data);
3947
3948 let schema = Arc::new(Schema::new(vec![Field::new(
3949 "map",
3950 map_array.data_type().clone(),
3951 false,
3952 )]));
3953 let input_batch = RecordBatch::try_new(schema, vec![Arc::new(map_array)]).unwrap();
3954
3955 let output_batch = deserialize_file(serialize_file(&input_batch));
3956 assert_eq!(input_batch, output_batch);
3957
3958 let output_batch = deserialize_stream(serialize_stream(&input_batch));
3959 assert_eq!(input_batch, output_batch);
3960 }
3961
3962 #[test]
3963 fn test_roundtrip_map_with_dict_values() {
3964 let keys = StringArray::from(vec!["a", "b", "c", "d", "e", "f"]);
3967
3968 let value_values = StringArray::from(vec!["val_x", "val_y", "val_z"]);
3969 let value_keys = Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]);
3970 let dict_values = DictionaryArray::new(value_keys, Arc::new(value_values));
3971
3972 #[allow(deprecated)]
3973 let entries_field = Arc::new(Field::new(
3974 "entries",
3975 DataType::Struct(
3976 vec![
3977 Field::new("key", DataType::Utf8, false),
3978 Field::new_dict(
3979 "value",
3980 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3981 true,
3982 2,
3983 false,
3984 ),
3985 ]
3986 .into(),
3987 ),
3988 false,
3989 ));
3990
3991 let entries = StructArray::from(vec![
3992 (
3993 Arc::new(Field::new("key", DataType::Utf8, false)),
3994 Arc::new(keys) as ArrayRef,
3995 ),
3996 (
3997 Arc::new(Field::new(
3998 "value",
3999 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
4000 true,
4001 )),
4002 Arc::new(dict_values) as ArrayRef,
4003 ),
4004 ]);
4005
4006 let offsets = Buffer::from_slice_ref([0i32, 2, 4, 6]);
4007
4008 let map_data = ArrayData::builder(DataType::Map(entries_field, false))
4009 .len(3)
4010 .add_buffer(offsets)
4011 .add_child_data(entries.into_data())
4012 .build()
4013 .unwrap();
4014 let map_array = MapArray::from(map_data);
4015
4016 let schema = Arc::new(Schema::new(vec![Field::new(
4017 "map",
4018 map_array.data_type().clone(),
4019 false,
4020 )]));
4021 let input_batch = RecordBatch::try_new(schema, vec![Arc::new(map_array)]).unwrap();
4022
4023 let output_batch = deserialize_file(serialize_file(&input_batch));
4024 assert_eq!(input_batch, output_batch);
4025
4026 let output_batch = deserialize_stream(serialize_stream(&input_batch));
4027 assert_eq!(input_batch, output_batch);
4028 }
4029
4030 #[test]
4031 fn test_decimal128_alignment16_is_sufficient() {
4032 const IPC_ALIGNMENT: usize = 16;
4033
4034 for num_cols in [1, 2, 3, 17, 50, 73, 99] {
4039 let num_rows = (num_cols * 7 + 11) % 100; let mut fields = Vec::new();
4042 let mut arrays = Vec::new();
4043 for i in 0..num_cols {
4044 let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
4045 let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
4046 fields.push(field);
4047 arrays.push(Arc::new(array) as Arc<dyn Array>);
4048 }
4049 let schema = Schema::new(fields);
4050 let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();
4051
4052 let mut writer = FileWriter::try_new_with_options(
4053 Vec::new(),
4054 batch.schema_ref(),
4055 IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
4056 )
4057 .unwrap();
4058 writer.write(&batch).unwrap();
4059 writer.finish().unwrap();
4060
4061 let out: Vec<u8> = writer.into_inner().unwrap();
4062
4063 let buffer = Buffer::from_vec(out);
4064 let trailer_start = buffer.len() - 10;
4065 let footer_len =
4066 read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
4067 let footer =
4068 root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
4069
4070 let schema = fb_to_schema(footer.schema().unwrap());
4071
4072 let decoder =
4075 FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
4076
4077 let batches = footer.recordBatches().unwrap();
4078
4079 let block = batches.get(0);
4080 let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
4081 let data = buffer.slice_with_length(block.offset() as _, block_len);
4082
4083 let batch2 = decoder.read_record_batch(block, &data).unwrap().unwrap();
4084
4085 assert_eq!(batch, batch2);
4086 }
4087 }
4088
4089 #[test]
4090 fn test_decimal128_alignment8_is_unaligned() {
4091 const IPC_ALIGNMENT: usize = 8;
4092
4093 let num_cols = 2;
4094 let num_rows = 1;
4095
4096 let mut fields = Vec::new();
4097 let mut arrays = Vec::new();
4098 for i in 0..num_cols {
4099 let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
4100 let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
4101 fields.push(field);
4102 arrays.push(Arc::new(array) as Arc<dyn Array>);
4103 }
4104 let schema = Schema::new(fields);
4105 let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();
4106
4107 let mut writer = FileWriter::try_new_with_options(
4108 Vec::new(),
4109 batch.schema_ref(),
4110 IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
4111 )
4112 .unwrap();
4113 writer.write(&batch).unwrap();
4114 writer.finish().unwrap();
4115
4116 let out: Vec<u8> = writer.into_inner().unwrap();
4117
4118 let buffer = Buffer::from_vec(out);
4119 let trailer_start = buffer.len() - 10;
4120 let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
4121 let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
4122 let schema = fb_to_schema(footer.schema().unwrap());
4123
4124 let decoder =
4127 FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
4128
4129 let batches = footer.recordBatches().unwrap();
4130
4131 let block = batches.get(0);
4132 let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
4133 let data = buffer.slice_with_length(block.offset() as _, block_len);
4134
4135 let result = decoder.read_record_batch(block, &data);
4136
4137 let error = result.unwrap_err();
4138 assert_eq!(
4139 error.to_string(),
4140 "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
4141 offset from expected alignment of 16 by 8"
4142 );
4143 }
4144
4145 #[test]
4146 fn test_flush() {
4147 let num_cols = 2;
4150 let mut fields = Vec::new();
4151 let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
4152 for i in 0..num_cols {
4153 let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
4154 fields.push(field);
4155 }
4156 let schema = Schema::new(fields);
4157 let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
4158 let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
4159 let mut stream_writer =
4160 StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
4161 .unwrap();
4162 let mut file_writer =
4163 FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();
4164
4165 let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
4166 let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
4167 stream_writer.flush().unwrap();
4168 file_writer.flush().unwrap();
4169 let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
4170 let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
4171 let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
4172 let expected_stream_flushed_bytes = stream_out.len() - 8;
4176 let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;
4179
4180 assert!(
4181 stream_bytes_written_on_new < stream_bytes_written_on_flush,
4182 "this test makes no sense if flush is not actually required"
4183 );
4184 assert!(
4185 file_bytes_written_on_new < file_bytes_written_on_flush,
4186 "this test makes no sense if flush is not actually required"
4187 );
4188 assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
4189 assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
4190 }
4191
4192 #[test]
4193 fn test_roundtrip_list_of_fixed_list() -> Result<(), ArrowError> {
4194 let l1_type =
4195 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, false)), 3);
4196 let l2_type = DataType::List(Arc::new(Field::new("item", l1_type.clone(), false)));
4197
4198 let l0_builder = Float32Builder::new();
4199 let l1_builder = FixedSizeListBuilder::new(l0_builder, 3).with_field(Arc::new(Field::new(
4200 "item",
4201 DataType::Float32,
4202 false,
4203 )));
4204 let mut l2_builder =
4205 ListBuilder::new(l1_builder).with_field(Arc::new(Field::new("item", l1_type, false)));
4206
4207 for point in [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]] {
4208 l2_builder.values().values().append_value(point[0]);
4209 l2_builder.values().values().append_value(point[1]);
4210 l2_builder.values().values().append_value(point[2]);
4211
4212 l2_builder.values().append(true);
4213 }
4214 l2_builder.append(true);
4215
4216 let point = [10., 11., 12.];
4217 l2_builder.values().values().append_value(point[0]);
4218 l2_builder.values().values().append_value(point[1]);
4219 l2_builder.values().values().append_value(point[2]);
4220
4221 l2_builder.values().append(true);
4222 l2_builder.append(true);
4223
4224 let array = Arc::new(l2_builder.finish()) as ArrayRef;
4225
4226 let schema = Arc::new(Schema::new_with_metadata(
4227 vec![Field::new("points", l2_type, false)],
4228 HashMap::default(),
4229 ));
4230
4231 test_slices(&array, &schema, 0, 1)?;
4234 test_slices(&array, &schema, 0, 2)?;
4235 test_slices(&array, &schema, 1, 1)?;
4236
4237 Ok(())
4238 }
4239
4240 #[test]
4241 fn test_roundtrip_list_of_fixed_list_w_nulls() -> Result<(), ArrowError> {
4242 let l0_builder = Float32Builder::new();
4243 let l1_builder = FixedSizeListBuilder::new(l0_builder, 3);
4244 let mut l2_builder = ListBuilder::new(l1_builder);
4245
4246 for point in [
4247 [Some(1.0), Some(2.0), None],
4248 [Some(4.0), Some(5.0), Some(6.0)],
4249 [None, Some(8.0), Some(9.0)],
4250 ] {
4251 for p in point {
4252 match p {
4253 Some(p) => l2_builder.values().values().append_value(p),
4254 None => l2_builder.values().values().append_null(),
4255 }
4256 }
4257
4258 l2_builder.values().append(true);
4259 }
4260 l2_builder.append(true);
4261
4262 let point = [Some(10.), None, None];
4263 for p in point {
4264 match p {
4265 Some(p) => l2_builder.values().values().append_value(p),
4266 None => l2_builder.values().values().append_null(),
4267 }
4268 }
4269
4270 l2_builder.values().append(true);
4271 l2_builder.append(true);
4272
4273 let array = Arc::new(l2_builder.finish()) as ArrayRef;
4274
4275 let schema = Arc::new(Schema::new_with_metadata(
4276 vec![Field::new(
4277 "points",
4278 DataType::List(Arc::new(Field::new(
4279 "item",
4280 DataType::FixedSizeList(
4281 Arc::new(Field::new("item", DataType::Float32, true)),
4282 3,
4283 ),
4284 true,
4285 ))),
4286 true,
4287 )],
4288 HashMap::default(),
4289 ));
4290
4291 test_slices(&array, &schema, 0, 1)?;
4294 test_slices(&array, &schema, 0, 2)?;
4295 test_slices(&array, &schema, 1, 1)?;
4296
4297 Ok(())
4298 }
4299
4300 fn test_slices(
4301 parent_array: &ArrayRef,
4302 schema: &SchemaRef,
4303 offset: usize,
4304 length: usize,
4305 ) -> Result<(), ArrowError> {
4306 let subarray = parent_array.slice(offset, length);
4307 let original_batch = RecordBatch::try_new(schema.clone(), vec![subarray])?;
4308
4309 let mut bytes = Vec::new();
4310 let mut writer = StreamWriter::try_new(&mut bytes, schema)?;
4311 writer.write(&original_batch)?;
4312 writer.finish()?;
4313
4314 let mut cursor = std::io::Cursor::new(bytes);
4315 let mut reader = StreamReader::try_new(&mut cursor, None)?;
4316 let returned_batch = reader.next().unwrap()?;
4317
4318 assert_eq!(original_batch, returned_batch);
4319
4320 Ok(())
4321 }
4322
4323 #[test]
4324 fn test_roundtrip_fixed_list() -> Result<(), ArrowError> {
4325 let int_builder = Int64Builder::new();
4326 let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3)
4327 .with_field(Arc::new(Field::new("item", DataType::Int64, false)));
4328
4329 for point in [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]] {
4330 fixed_list_builder.values().append_value(point[0]);
4331 fixed_list_builder.values().append_value(point[1]);
4332 fixed_list_builder.values().append_value(point[2]);
4333
4334 fixed_list_builder.append(true);
4335 }
4336
4337 let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;
4338
4339 let schema = Arc::new(Schema::new_with_metadata(
4340 vec![Field::new(
4341 "points",
4342 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, false)), 3),
4343 false,
4344 )],
4345 HashMap::default(),
4346 ));
4347
4348 test_slices(&array, &schema, 0, 4)?;
4351 test_slices(&array, &schema, 0, 2)?;
4352 test_slices(&array, &schema, 1, 3)?;
4353 test_slices(&array, &schema, 2, 1)?;
4354
4355 Ok(())
4356 }
4357
4358 #[test]
4359 fn test_roundtrip_fixed_list_w_nulls() -> Result<(), ArrowError> {
4360 let int_builder = Int64Builder::new();
4361 let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3);
4362
4363 for point in [
4364 [Some(1), Some(2), None],
4365 [Some(4), Some(5), Some(6)],
4366 [None, Some(8), Some(9)],
4367 [Some(10), None, None],
4368 ] {
4369 for p in point {
4370 match p {
4371 Some(p) => fixed_list_builder.values().append_value(p),
4372 None => fixed_list_builder.values().append_null(),
4373 }
4374 }
4375
4376 fixed_list_builder.append(true);
4377 }
4378
4379 let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;
4380
4381 let schema = Arc::new(Schema::new_with_metadata(
4382 vec![Field::new(
4383 "points",
4384 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 3),
4385 true,
4386 )],
4387 HashMap::default(),
4388 ));
4389
4390 test_slices(&array, &schema, 0, 4)?;
4393 test_slices(&array, &schema, 0, 2)?;
4394 test_slices(&array, &schema, 1, 3)?;
4395 test_slices(&array, &schema, 2, 1)?;
4396
4397 Ok(())
4398 }
4399
4400 #[test]
4401 fn test_metadata_encoding_ordering() {
4402 fn create_hash() -> u64 {
4403 let metadata: HashMap<String, String> = [
4404 ("a", "1"), ("b", "2"), ("c", "3"), ("d", "4"), ("e", "5"), ]
4410 .into_iter()
4411 .map(|(k, v)| (k.to_owned(), v.to_owned()))
4412 .collect();
4413
4414 let schema = Arc::new(
4416 Schema::new(vec![
4417 Field::new("a", DataType::Int64, true).with_metadata(metadata.clone()),
4418 ])
4419 .with_metadata(metadata)
4420 .clone(),
4421 );
4422 let batch = RecordBatch::new_empty(schema.clone());
4423
4424 let mut bytes = Vec::new();
4425 let mut w = StreamWriter::try_new(&mut bytes, batch.schema_ref()).unwrap();
4426 w.write(&batch).unwrap();
4427 w.finish().unwrap();
4428
4429 let mut h = std::hash::DefaultHasher::new();
4430 h.write(&bytes);
4431 h.finish()
4432 }
4433
4434 let expected = create_hash();
4435
4436 let all_passed = (0..20).all(|_| create_hash() == expected);
4441 assert!(all_passed);
4442 }
4443
4444 #[test]
4445 fn test_dictionary_tracker_reset() {
4446 let data_gen = IpcDataGenerator::default();
4447 let mut dictionary_tracker = DictionaryTracker::new(false);
4448 let writer_options = IpcWriteOptions::default();
4449 let mut compression_ctx = CompressionContext::default();
4450
4451 let schema = Arc::new(Schema::new(vec![Field::new(
4452 "a",
4453 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
4454 false,
4455 )]));
4456
4457 let mut write_single_batch_stream =
4458 |batch: RecordBatch, dict_tracker: &mut DictionaryTracker| -> Vec<u8> {
4459 let mut buffer = Vec::new();
4460
4461 let stream_header = data_gen.schema_to_bytes_with_dictionary_tracker(
4463 &schema,
4464 dict_tracker,
4465 &writer_options,
4466 );
4467 _ = write_message(&mut buffer, stream_header, &writer_options).unwrap();
4468
4469 let (encoded_dicts, encoded_batch) = data_gen
4470 .encode(&batch, dict_tracker, &writer_options, &mut compression_ctx)
4471 .unwrap();
4472 for encoded_dict in encoded_dicts {
4473 _ = write_message(&mut buffer, encoded_dict, &writer_options).unwrap();
4474 }
4475 _ = write_message(&mut buffer, encoded_batch, &writer_options).unwrap();
4476
4477 buffer
4478 };
4479
4480 let batch1 = RecordBatch::try_new(
4481 schema.clone(),
4482 vec![Arc::new(DictionaryArray::new(
4483 UInt8Array::from_iter_values([0]),
4484 Arc::new(StringArray::from_iter_values(["a"])),
4485 ))],
4486 )
4487 .unwrap();
4488 let buffer = write_single_batch_stream(batch1.clone(), &mut dictionary_tracker);
4489
4490 let mut reader = StreamReader::try_new(Cursor::new(buffer), None).unwrap();
4492 let read_batch = reader.next().unwrap().unwrap();
4493 assert_eq!(read_batch, batch1);
4494
4495 dictionary_tracker.clear();
4497
4498 let batch2 = RecordBatch::try_new(
4500 schema.clone(),
4501 vec![Arc::new(DictionaryArray::new(
4502 UInt8Array::from_iter_values([0]),
4503 Arc::new(StringArray::from_iter_values(["a"])),
4504 ))],
4505 )
4506 .unwrap();
4507 let buffer = write_single_batch_stream(batch2.clone(), &mut dictionary_tracker);
4508 let mut reader = StreamReader::try_new(Cursor::new(buffer), None).unwrap();
4509 let read_batch = reader.next().unwrap().unwrap();
4510 assert_eq!(read_batch, batch2);
4511 }
4512}