1use std::cmp::min;
28use std::collections::HashMap;
29use std::io::{BufWriter, Write};
30use std::mem::size_of;
31use std::sync::Arc;
32
33use flatbuffers::FlatBufferBuilder;
34
35use arrow_array::builder::BufferBuilder;
36use arrow_array::cast::*;
37use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
38use arrow_array::*;
39use arrow_buffer::bit_util;
40use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ToByteSlice};
41use arrow_data::{ArrayData, ArrayDataBuilder, BufferSpec, layout};
42use arrow_schema::*;
43
44use crate::CONTINUATION_MARKER;
45use crate::compression::CompressionCodec;
46#[expect(deprecated)]
47pub use crate::compression::{CompressionContext, IpcWriteContext};
48use crate::convert::IpcSchemaEncoder;
49
50#[derive(Debug, Clone)]
52pub struct IpcWriteOptions {
53 alignment: u8,
56 write_legacy_ipc_format: bool,
58 metadata_version: crate::MetadataVersion,
67 batch_compression_type: Option<crate::CompressionType>,
70 batch_compression_level: Option<i32>,
72 dictionary_handling: DictionaryHandling,
74}
75
76enum EncodedBuffer {
82 Raw(Buffer),
84 Compressed(Vec<u8>),
86}
87
88impl EncodedBuffer {
89 fn as_slice(&self) -> &[u8] {
90 match self {
91 EncodedBuffer::Raw(b) => b.as_slice(),
92 EncodedBuffer::Compressed(v) => v.as_slice(),
93 }
94 }
95
96 fn len(&self) -> usize {
97 match self {
98 EncodedBuffer::Raw(b) => b.len(),
99 EncodedBuffer::Compressed(v) => v.len(),
100 }
101 }
102}
103#[derive(Default)]
108struct IpcMetadataBuilder {
109 nodes: Vec<crate::FieldNode>,
110 buffers: Vec<crate::Buffer>,
111}
112
113enum IpcBodySink<'a> {
118 Write(&'a mut Vec<u8>),
120 Collect(&'a mut Vec<EncodedBuffer>),
122}
123impl<'a> IpcBodySink<'a> {
124 pub fn write(&mut self, pad_len: usize, buffer: EncodedBuffer) {
126 match self {
127 IpcBodySink::Write(vec) => {
128 vec.extend_from_slice(buffer.as_slice());
129 vec.extend_from_slice(&PADDING[..pad_len]);
130 }
131 IpcBodySink::Collect(vec) => {
132 vec.push(buffer);
133 }
134 }
135 }
136}
137
/// Sizes reported after a record batch (and any dictionaries it triggered) has been written.
struct IpcWriteMetadata {
    /// One `(header_len, body_len)` pair per dictionary message written before the batch.
    dictionary_block_sizes: Vec<(usize, usize)>,
    /// Length of the record batch header including its prefix and alignment padding.
    padded_header_len: usize,
    /// Length of the record batch body including trailing padding.
    body_len: usize,
}
151
152impl IpcWriteOptions {
153 pub fn try_with_compression(
158 mut self,
159 batch_compression_type: Option<crate::CompressionType>,
160 ) -> Result<Self, ArrowError> {
161 self.batch_compression_type = batch_compression_type;
162
163 if self.batch_compression_type.is_some()
164 && self.metadata_version < crate::MetadataVersion::V5
165 {
166 return Err(ArrowError::InvalidArgumentError(
167 "Compression only supported in metadata v5 and above".to_string(),
168 ));
169 }
170 Ok(self)
171 }
172
173 pub fn try_with_compression_level(
178 mut self,
179 batch_compression_level: Option<i32>,
180 ) -> Result<Self, ArrowError> {
181 self.batch_compression_level = batch_compression_level;
182
183 if self.batch_compression_level.is_some()
184 && self.metadata_version < crate::MetadataVersion::V5
185 {
186 return Err(ArrowError::InvalidArgumentError(
187 "Compression only supported in metadata v5 and above".to_string(),
188 ));
189 }
190
191 match (self.batch_compression_type, self.batch_compression_level) {
192 (Some(crate::CompressionType::ZSTD), Some(level)) => {
193 return self.check_zstd_level(level);
194 }
195 (Some(crate::CompressionType::LZ4_FRAME), Some(_)) => {
196 return Err(ArrowError::InvalidArgumentError(
197 "LZ4 Frame compression does not support configurable compression levels"
198 .to_string(),
199 ));
200 }
201 _ => {}
202 }
203
204 Ok(self)
205 }
206
207 #[cfg(not(feature = "zstd"))]
208 fn check_zstd_level(self, _level: i32) -> Result<Self, ArrowError> {
209 Err(ArrowError::InvalidArgumentError(
210 "zstd IPC compression requires the zstd feature".to_string(),
211 ))
212 }
213
214 #[cfg(feature = "zstd")]
215 fn check_zstd_level(self, level: i32) -> Result<Self, ArrowError> {
216 let range = zstd::compression_level_range();
217 if !range.contains(&(level as zstd::zstd_safe::CompressionLevel)) {
218 return Err(ArrowError::InvalidArgumentError(format!(
219 "ZSTD compression level must be between {} and {}, got {}",
220 range.start(),
221 range.end(),
222 level,
223 )));
224 }
225
226 Ok(self)
227 }
228
229 pub fn try_new(
231 alignment: usize,
232 write_legacy_ipc_format: bool,
233 metadata_version: crate::MetadataVersion,
234 ) -> Result<Self, ArrowError> {
235 let is_alignment_valid =
236 alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
237 if !is_alignment_valid {
238 return Err(ArrowError::InvalidArgumentError(
239 "Alignment should be 8, 16, 32, or 64.".to_string(),
240 ));
241 }
242 let alignment: u8 = u8::try_from(alignment).expect("range already checked");
243 match metadata_version {
244 crate::MetadataVersion::V1
245 | crate::MetadataVersion::V2
246 | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
247 "Writing IPC metadata version 3 and lower not supported".to_string(),
248 )),
249 #[allow(deprecated)]
250 crate::MetadataVersion::V4 => Ok(Self {
251 alignment,
252 write_legacy_ipc_format,
253 metadata_version,
254 batch_compression_type: None,
255 batch_compression_level: None,
256 dictionary_handling: DictionaryHandling::default(),
257 }),
258 crate::MetadataVersion::V5 => {
259 if write_legacy_ipc_format {
260 Err(ArrowError::InvalidArgumentError(
261 "Legacy IPC format only supported on metadata version 4".to_string(),
262 ))
263 } else {
264 Ok(Self {
265 alignment,
266 write_legacy_ipc_format,
267 metadata_version,
268 batch_compression_type: None,
269 batch_compression_level: None,
270 dictionary_handling: DictionaryHandling::default(),
271 })
272 }
273 }
274 z => Err(ArrowError::InvalidArgumentError(format!(
275 "Unsupported crate::MetadataVersion {z:?}"
276 ))),
277 }
278 }
279
280 pub fn with_dictionary_handling(mut self, dictionary_handling: DictionaryHandling) -> Self {
282 self.dictionary_handling = dictionary_handling;
283 self
284 }
285}
286
287impl Default for IpcWriteOptions {
288 fn default() -> Self {
289 Self {
290 alignment: 64,
291 write_legacy_ipc_format: false,
292 metadata_version: crate::MetadataVersion::V5,
293 batch_compression_type: None,
294 batch_compression_level: None,
295 dictionary_handling: DictionaryHandling::default(),
296 }
297 }
298}
299
/// Stateless encoder that turns schemas, record batches and dictionaries into IPC messages.
///
/// All state needed while encoding (dictionary tracker, write options, write context) is passed
/// to the individual methods.
#[derive(Debug, Default)]
pub struct IpcDataGenerator {}
335
336impl IpcDataGenerator {
337 pub fn schema_to_bytes_with_dictionary_tracker(
340 &self,
341 schema: &Schema,
342 dictionary_tracker: &mut DictionaryTracker,
343 write_options: &IpcWriteOptions,
344 ) -> EncodedData {
345 let mut fbb = FlatBufferBuilder::new();
346 let schema = {
347 let fb = IpcSchemaEncoder::new()
348 .with_dictionary_tracker(dictionary_tracker)
349 .schema_to_fb_offset(&mut fbb, schema);
350 fb.as_union_value()
351 };
352
353 let mut message = crate::MessageBuilder::new(&mut fbb);
354 message.add_version(write_options.metadata_version);
355 message.add_header_type(crate::MessageHeader::Schema);
356 message.add_bodyLength(0);
357 message.add_header(schema);
358 let data = message.finish();
360 fbb.finish(data, None);
361
362 let data = fbb.finished_data();
363 EncodedData {
364 ipc_message: data.to_vec(),
365 arrow_data: vec![],
366 }
367 }
368
369 fn _encode_dictionaries<I: Iterator<Item = i64>>(
370 &self,
371 column: &ArrayRef,
372 encoded_dictionaries: &mut Vec<EncodedData>,
373 dictionary_tracker: &mut DictionaryTracker,
374 write_options: &IpcWriteOptions,
375 dict_id: &mut I,
376 ipc_write_context: &mut IpcWriteContext,
377 ) -> Result<(), ArrowError> {
378 match column.data_type() {
379 DataType::Struct(fields) => {
380 let s = as_struct_array(column);
381 for (field, column) in fields.iter().zip(s.columns()) {
382 self.encode_dictionaries(
383 field,
384 column,
385 encoded_dictionaries,
386 dictionary_tracker,
387 write_options,
388 dict_id,
389 ipc_write_context,
390 )?;
391 }
392 }
393 DataType::RunEndEncoded(_, values) => {
394 let data = column.to_data();
395 if data.child_data().len() != 2 {
396 return Err(ArrowError::InvalidArgumentError(format!(
397 "The run encoded array should have exactly two child arrays. Found {}",
398 data.child_data().len()
399 )));
400 }
401 let values_array = make_array(data.child_data()[1].clone());
404 self.encode_dictionaries(
405 values,
406 &values_array,
407 encoded_dictionaries,
408 dictionary_tracker,
409 write_options,
410 dict_id,
411 ipc_write_context,
412 )?;
413 }
414 DataType::List(field) => {
415 let list = as_list_array(column);
416 self.encode_dictionaries(
417 field,
418 list.values(),
419 encoded_dictionaries,
420 dictionary_tracker,
421 write_options,
422 dict_id,
423 ipc_write_context,
424 )?;
425 }
426 DataType::LargeList(field) => {
427 let list = as_large_list_array(column);
428 self.encode_dictionaries(
429 field,
430 list.values(),
431 encoded_dictionaries,
432 dictionary_tracker,
433 write_options,
434 dict_id,
435 ipc_write_context,
436 )?;
437 }
438 DataType::ListView(field) => {
439 let list = column.as_list_view::<i32>();
440 self.encode_dictionaries(
441 field,
442 list.values(),
443 encoded_dictionaries,
444 dictionary_tracker,
445 write_options,
446 dict_id,
447 ipc_write_context,
448 )?;
449 }
450 DataType::LargeListView(field) => {
451 let list = column.as_list_view::<i64>();
452 self.encode_dictionaries(
453 field,
454 list.values(),
455 encoded_dictionaries,
456 dictionary_tracker,
457 write_options,
458 dict_id,
459 ipc_write_context,
460 )?;
461 }
462 DataType::FixedSizeList(field, _) => {
463 let list = column
464 .as_any()
465 .downcast_ref::<FixedSizeListArray>()
466 .expect("Unable to downcast to fixed size list array");
467 self.encode_dictionaries(
468 field,
469 list.values(),
470 encoded_dictionaries,
471 dictionary_tracker,
472 write_options,
473 dict_id,
474 ipc_write_context,
475 )?;
476 }
477 DataType::Map(field, _) => {
478 let map_array = as_map_array(column);
479
480 let (keys, values) = match field.data_type() {
481 DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
482 _ => panic!("Incorrect field data type {:?}", field.data_type()),
483 };
484
485 self.encode_dictionaries(
487 keys,
488 map_array.keys(),
489 encoded_dictionaries,
490 dictionary_tracker,
491 write_options,
492 dict_id,
493 ipc_write_context,
494 )?;
495
496 self.encode_dictionaries(
498 values,
499 map_array.values(),
500 encoded_dictionaries,
501 dictionary_tracker,
502 write_options,
503 dict_id,
504 ipc_write_context,
505 )?;
506 }
507 DataType::Union(fields, _) => {
508 let union = as_union_array(column);
509 for (type_id, field) in fields.iter() {
510 let column = union.child(type_id);
511 self.encode_dictionaries(
512 field,
513 column,
514 encoded_dictionaries,
515 dictionary_tracker,
516 write_options,
517 dict_id,
518 ipc_write_context,
519 )?;
520 }
521 }
522 _ => (),
523 }
524
525 Ok(())
526 }
527
528 #[allow(clippy::too_many_arguments)]
529 fn encode_dictionaries<I: Iterator<Item = i64>>(
530 &self,
531 field: &Field,
532 column: &ArrayRef,
533 encoded_dictionaries: &mut Vec<EncodedData>,
534 dictionary_tracker: &mut DictionaryTracker,
535 write_options: &IpcWriteOptions,
536 dict_id_seq: &mut I,
537 ipc_write_context: &mut IpcWriteContext,
538 ) -> Result<(), ArrowError> {
539 match column.data_type() {
540 DataType::Dictionary(_key_type, _value_type) => {
541 let dict_data = column.to_data();
542 let dict_values = &dict_data.child_data()[0];
543
544 let values = make_array(dict_data.child_data()[0].clone());
545
546 self._encode_dictionaries(
547 &values,
548 encoded_dictionaries,
549 dictionary_tracker,
550 write_options,
551 dict_id_seq,
552 ipc_write_context,
553 )?;
554
555 let dict_id = dict_id_seq.next().ok_or_else(|| {
559 ArrowError::IpcError(format!(
560 "no dict id for field {:?}: field.data_type={:?}, column.data_type={:?}",
561 field.name(),
562 field.data_type(),
563 column.data_type()
564 ))
565 })?;
566
567 match dictionary_tracker.insert_column(
568 dict_id,
569 column,
570 write_options.dictionary_handling,
571 )? {
572 DictionaryUpdate::None => {}
573 DictionaryUpdate::New | DictionaryUpdate::Replaced => {
574 encoded_dictionaries.push(self.dictionary_batch_to_bytes(
575 dict_id,
576 dict_values,
577 write_options,
578 false,
579 ipc_write_context,
580 )?);
581 }
582 DictionaryUpdate::Delta(data) => {
583 encoded_dictionaries.push(self.dictionary_batch_to_bytes(
584 dict_id,
585 &data,
586 write_options,
587 true,
588 ipc_write_context,
589 )?);
590 }
591 }
592 }
593 _ => self._encode_dictionaries(
594 column,
595 encoded_dictionaries,
596 dictionary_tracker,
597 write_options,
598 dict_id_seq,
599 ipc_write_context,
600 )?,
601 }
602
603 Ok(())
604 }
605
606 pub fn encode(
610 &self,
611 batch: &RecordBatch,
612 dictionary_tracker: &mut DictionaryTracker,
613 write_options: &IpcWriteOptions,
614 ipc_write_context: &mut IpcWriteContext,
615 ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
616 let encoded_dictionaries =
617 self.encode_all_dicts(batch, dictionary_tracker, write_options, ipc_write_context)?;
618 let mut arrow_data = Vec::new();
619 let (ipc_message, _, tail_pad) = self.record_batch_to_bytes(
620 batch,
621 write_options,
622 ipc_write_context,
623 &mut IpcBodySink::Write(&mut arrow_data),
624 )?;
625 arrow_data.extend_from_slice(&PADDING[..tail_pad]);
626 Ok((
627 encoded_dictionaries,
628 EncodedData {
629 ipc_message,
630 arrow_data,
631 },
632 ))
633 }
634
635 fn encode_all_dicts(
637 &self,
638 batch: &RecordBatch,
639 dictionary_tracker: &mut DictionaryTracker,
640 write_options: &IpcWriteOptions,
641 ipc_write_context: &mut IpcWriteContext,
642 ) -> Result<Vec<EncodedData>, ArrowError> {
643 let schema = batch.schema();
644 let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());
645 let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();
646 for (i, field) in schema.fields().iter().enumerate() {
647 self.encode_dictionaries(
648 field,
649 batch.column(i),
650 &mut encoded_dictionaries,
651 dictionary_tracker,
652 write_options,
653 &mut dict_id,
654 ipc_write_context,
655 )?;
656 }
657 Ok(encoded_dictionaries)
658 }
659
660 fn write<W: Write>(
664 &self,
665 batch: &RecordBatch,
666 dictionary_tracker: &mut DictionaryTracker,
667 write_options: &IpcWriteOptions,
668 ipc_write_context: &mut IpcWriteContext,
669 writer: &mut W,
670 ) -> Result<IpcWriteMetadata, ArrowError> {
671 let encoded_dictionaries =
672 self.encode_all_dicts(batch, dictionary_tracker, write_options, ipc_write_context)?;
673
674 let mut dictionary_block_sizes = Vec::with_capacity(encoded_dictionaries.len());
675 for dict in encoded_dictionaries {
676 dictionary_block_sizes.push(write_message(&mut *writer, dict, write_options)?);
677 }
678
679 let capacity = batch
680 .columns()
681 .iter()
682 .map(|a| estimate_encoded_buffer_count(a.data_type()))
683 .sum();
684 let mut encoded_buffers: Vec<EncodedBuffer> = Vec::with_capacity(capacity);
685 let (ipc_message, body_len, tail_pad) = self.record_batch_to_bytes(
686 batch,
687 write_options,
688 ipc_write_context,
689 &mut IpcBodySink::Collect(&mut encoded_buffers),
690 )?;
691
692 let alignment = write_options.alignment;
693 let a = usize::from(alignment - 1);
694 let prefix_size = if write_options.write_legacy_ipc_format {
695 4
696 } else {
697 8
698 };
699 let aligned_size = (ipc_message.len() + prefix_size + a) & !a;
700 write_continuation(
701 &mut *writer,
702 write_options,
703 (aligned_size - prefix_size) as i32,
704 )?;
705 writer.write_all(&ipc_message)?;
706 writer.write_all(&PADDING[..aligned_size - ipc_message.len() - prefix_size])?;
707 for enc in &encoded_buffers {
708 writer.write_all(enc.as_slice())?;
709 writer.write_all(&PADDING[..pad_to_alignment(alignment, enc.len())])?;
710 }
711 writer.write_all(&PADDING[..tail_pad])?;
712
713 Ok(IpcWriteMetadata {
714 dictionary_block_sizes,
715 padded_header_len: aligned_size,
716 body_len,
717 })
718 }
719
720 #[deprecated(since = "57.0.0", note = "Use `encode` instead")]
724 pub fn encoded_batch(
725 &self,
726 batch: &RecordBatch,
727 dictionary_tracker: &mut DictionaryTracker,
728 write_options: &IpcWriteOptions,
729 ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
730 self.encode(
731 batch,
732 dictionary_tracker,
733 write_options,
734 &mut Default::default(),
735 )
736 }
737
738 fn record_batch_to_bytes(
744 &self,
745 batch: &RecordBatch,
746 write_options: &IpcWriteOptions,
747 ipc_write_context: &mut IpcWriteContext,
748 sink: &mut IpcBodySink<'_>,
749 ) -> Result<(Vec<u8>, usize, usize), ArrowError> {
750 let batch_compression_type = write_options.batch_compression_type;
751
752 let compression = batch_compression_type.map(|batch_compression_type| {
753 let fbb = ipc_write_context.mut_fbb();
754 let mut c = crate::BodyCompressionBuilder::new(fbb);
755 c.add_method(crate::BodyCompressionMethod::BUFFER);
756 c.add_codec(batch_compression_type);
757 c.finish()
758 });
759
760 let batch_compression_level = write_options.batch_compression_level;
761 let compression_codec: Option<CompressionCodec> = batch_compression_type
762 .map(|compression_type| match batch_compression_level {
763 Some(level) => {
764 CompressionCodec::try_new_with_compression_level(compression_type, level)
765 }
766 None => compression_type.try_into(),
767 })
768 .transpose()?;
769
770 let alignment = write_options.alignment;
771 let mut variadic_buffer_counts = vec![];
772 let mut meta = IpcMetadataBuilder::default();
773 let mut offset = 0i64;
774
775 for array in batch.columns() {
776 let array_data = array.to_data();
777 offset = write_array_data(
778 &array_data,
779 &mut meta,
780 sink,
781 offset,
782 compression_codec,
783 ipc_write_context,
784 write_options,
785 )?;
786 append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
787 }
788
789 let tail_pad = pad_to_alignment(alignment, offset as usize);
790 let body_len = offset as usize + tail_pad;
791
792 let fbb = ipc_write_context.mut_fbb();
793 let buffers = fbb.create_vector(&meta.buffers);
794 let nodes = fbb.create_vector(&meta.nodes);
795 let variadic_buffer = if variadic_buffer_counts.is_empty() {
796 None
797 } else {
798 Some(fbb.create_vector(&variadic_buffer_counts))
799 };
800
801 let root = {
802 let mut batch_builder = crate::RecordBatchBuilder::new(fbb);
803 batch_builder.add_length(batch.num_rows() as i64);
804 batch_builder.add_nodes(nodes);
805 batch_builder.add_buffers(buffers);
806 if let Some(c) = compression {
807 batch_builder.add_compression(c);
808 }
809 if let Some(v) = variadic_buffer {
810 batch_builder.add_variadicBufferCounts(v);
811 }
812 batch_builder.finish().as_union_value()
813 };
814 let mut message = crate::MessageBuilder::new(fbb);
815 message.add_version(write_options.metadata_version);
816 message.add_header_type(crate::MessageHeader::RecordBatch);
817 message.add_bodyLength(body_len as i64);
818 message.add_header(root);
819 let root = message.finish();
820 fbb.finish(root, None);
821
822 let ipc_message = fbb.finished_data().to_vec();
823 fbb.reset();
824 Ok((ipc_message, body_len, tail_pad))
825 }
826
827 fn dictionary_batch_to_bytes(
830 &self,
831 dict_id: i64,
832 array_data: &ArrayData,
833 write_options: &IpcWriteOptions,
834 is_delta: bool,
835 ipc_write_context: &mut IpcWriteContext,
836 ) -> Result<EncodedData, ArrowError> {
837 let mut arrow_data: Vec<u8> = vec![];
838
839 let batch_compression_type = write_options.batch_compression_type;
841
842 let compression = batch_compression_type.map(|batch_compression_type| {
843 let fbb = ipc_write_context.mut_fbb();
844 let mut c = crate::BodyCompressionBuilder::new(fbb);
845 c.add_method(crate::BodyCompressionMethod::BUFFER);
846 c.add_codec(batch_compression_type);
847 c.finish()
848 });
849
850 let batch_compression_level = write_options.batch_compression_level;
851 let compression_codec: Option<CompressionCodec> = batch_compression_type
852 .map(|batch_compression_type| match batch_compression_level {
853 Some(level) => {
854 CompressionCodec::try_new_with_compression_level(batch_compression_type, level)
855 }
856 None => batch_compression_type.try_into(),
857 })
858 .transpose()?;
859
860 let alignment = write_options.alignment;
861 let mut meta = IpcMetadataBuilder::default();
862 let mut sink = IpcBodySink::Write(&mut arrow_data);
863 let offset = write_array_data(
864 array_data,
865 &mut meta,
866 &mut sink,
867 0,
868 compression_codec,
869 ipc_write_context,
870 write_options,
871 )?;
872
873 let mut variadic_buffer_counts = vec![];
874 append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);
875
876 let tail_pad = pad_to_alignment(alignment, offset as usize);
878 let body_len = offset as usize + tail_pad;
879 arrow_data.extend_from_slice(&PADDING[..tail_pad]);
880
881 let fbb = ipc_write_context.mut_fbb();
882 let buffers = fbb.create_vector(&meta.buffers);
883 let nodes = fbb.create_vector(&meta.nodes);
884 let variadic_buffer = if variadic_buffer_counts.is_empty() {
885 None
886 } else {
887 Some(fbb.create_vector(&variadic_buffer_counts))
888 };
889
890 let root = {
891 let mut batch_builder = crate::RecordBatchBuilder::new(fbb);
892 batch_builder.add_length(array_data.len() as i64);
893 batch_builder.add_nodes(nodes);
894 batch_builder.add_buffers(buffers);
895 if let Some(c) = compression {
896 batch_builder.add_compression(c);
897 }
898 if let Some(v) = variadic_buffer {
899 batch_builder.add_variadicBufferCounts(v);
900 }
901 batch_builder.finish()
902 };
903
904 let root = {
905 let mut batch_builder = crate::DictionaryBatchBuilder::new(fbb);
906 batch_builder.add_id(dict_id);
907 batch_builder.add_data(root);
908 batch_builder.add_isDelta(is_delta);
909 batch_builder.finish().as_union_value()
910 };
911
912 let root = {
913 let mut message_builder = crate::MessageBuilder::new(fbb);
914 message_builder.add_version(write_options.metadata_version);
915 message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
916 message_builder.add_bodyLength(body_len as i64);
917 message_builder.add_header(root);
918 message_builder.finish()
919 };
920
921 fbb.finish(root, None);
922 let ipc_message = fbb.finished_data().to_vec();
923 fbb.reset();
924
925 Ok(EncodedData {
926 ipc_message,
927 arrow_data,
928 })
929 }
930}
931
932fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
933 match array.data_type() {
934 DataType::BinaryView | DataType::Utf8View => {
935 counts.push(array.buffers().len() as i64 - 1);
938 }
939 DataType::Dictionary(_, _) => {
940 }
943 _ => {
944 for child in array.child_data() {
945 append_variadic_buffer_counts(counts, child)
946 }
947 }
948 }
949}
950
951pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
952 match arr.data_type() {
953 DataType::RunEndEncoded(k, _) => match k.data_type() {
954 DataType::Int16 => {
955 Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
956 }
957 DataType::Int32 => {
958 Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
959 }
960 DataType::Int64 => {
961 Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
962 }
963 d => unreachable!("Unexpected data type {d}"),
964 },
965 d => Err(ArrowError::InvalidArgumentError(format!(
966 "The given array is not a run array. Data type of given array: {d}"
967 ))),
968 }
969}
970
971fn into_zero_offset_run_array<R: RunEndIndexType>(
974 run_array: RunArray<R>,
975) -> Result<RunArray<R>, ArrowError> {
976 let run_ends = run_array.run_ends();
977 if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
978 return Ok(run_array);
979 }
980
981 let start_physical_index = run_ends.get_start_physical_index();
983
984 let end_physical_index = run_ends.get_end_physical_index();
986
987 let physical_length = end_physical_index - start_physical_index + 1;
988
989 let offset = R::Native::usize_as(run_ends.offset());
991 let mut builder = BufferBuilder::<R::Native>::new(physical_length);
992 for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
993 builder.append(run_end_value.sub_wrapping(offset));
994 }
995 builder.append(R::Native::from_usize(run_array.len()).unwrap());
996 let new_run_ends = unsafe {
997 ArrayDataBuilder::new(R::DATA_TYPE)
1000 .len(physical_length)
1001 .add_buffer(builder.finish())
1002 .build_unchecked()
1003 };
1004
1005 let new_values = run_array
1007 .values()
1008 .slice(start_physical_index, physical_length)
1009 .into_data();
1010
1011 let builder = ArrayDataBuilder::new(run_array.data_type().clone())
1012 .len(run_array.len())
1013 .add_child_data(new_run_ends)
1014 .add_child_data(new_values);
1015 let array_data = unsafe {
1016 builder.build_unchecked()
1019 };
1020 Ok(array_data.into())
1021}
1022
/// Policy for a dictionary that has grown by appending values to one that was already written.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DictionaryHandling {
    /// Treat the grown dictionary as a replacement and send it again in full (the default).
    #[default]
    Resend,
    /// Send only the newly appended values as a delta.
    Delta,
}
1036
1037#[derive(Debug, Clone)]
1039pub enum DictionaryUpdate {
1040 None,
1043 New,
1045 Replaced,
1047 Delta(ArrayData),
1049}
1050
1051#[derive(Debug)]
1057pub struct DictionaryTracker {
1058 written: HashMap<i64, ArrayData>,
1060 dict_ids: Vec<i64>,
1061 error_on_replacement: bool,
1062}
1063
1064impl DictionaryTracker {
1065 pub fn new(error_on_replacement: bool) -> Self {
1071 #[allow(deprecated)]
1072 Self {
1073 written: HashMap::new(),
1074 dict_ids: Vec::new(),
1075 error_on_replacement,
1076 }
1077 }
1078
1079 pub fn next_dict_id(&mut self) -> i64 {
1081 let next = self
1082 .dict_ids
1083 .last()
1084 .copied()
1085 .map(|i| i + 1)
1086 .unwrap_or_default();
1087
1088 self.dict_ids.push(next);
1089 next
1090 }
1091
1092 pub fn dict_id(&mut self) -> &[i64] {
1095 &self.dict_ids
1096 }
1097
1098 #[deprecated(since = "56.1.0", note = "Use `insert_column` instead")]
1108 pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
1109 let dict_data = column.to_data();
1110 let dict_values = &dict_data.child_data()[0];
1111
1112 if let Some(last) = self.written.get(&dict_id) {
1114 if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
1115 return Ok(false);
1117 }
1118 if self.error_on_replacement {
1119 if last.child_data()[0] == *dict_values {
1121 return Ok(false);
1123 }
1124 return Err(ArrowError::InvalidArgumentError(
1125 "Dictionary replacement detected when writing IPC file format. \
1126 Arrow IPC files only support a single dictionary for a given field \
1127 across all batches."
1128 .to_string(),
1129 ));
1130 }
1131 }
1132
1133 self.written.insert(dict_id, dict_data);
1134 Ok(true)
1135 }
1136
1137 pub fn insert_column(
1153 &mut self,
1154 dict_id: i64,
1155 column: &ArrayRef,
1156 dict_handling: DictionaryHandling,
1157 ) -> Result<DictionaryUpdate, ArrowError> {
1158 let new_data = column.to_data();
1159 let new_values = &new_data.child_data()[0];
1160
1161 let Some(old) = self.written.get(&dict_id) else {
1163 self.written.insert(dict_id, new_data);
1164 return Ok(DictionaryUpdate::New);
1165 };
1166
1167 let old_values = &old.child_data()[0];
1170 if ArrayData::ptr_eq(old_values, new_values) {
1171 return Ok(DictionaryUpdate::None);
1172 }
1173
1174 let comparison = compare_dictionaries(old_values, new_values);
1176 if matches!(comparison, DictionaryComparison::Equal) {
1177 return Ok(DictionaryUpdate::None);
1178 }
1179
1180 const REPLACEMENT_ERROR: &str = "Dictionary replacement detected when writing IPC file format. \
1181 Arrow IPC files only support a single dictionary for a given field \
1182 across all batches.";
1183
1184 match comparison {
1185 DictionaryComparison::NotEqual => {
1186 if self.error_on_replacement {
1187 return Err(ArrowError::InvalidArgumentError(
1188 REPLACEMENT_ERROR.to_string(),
1189 ));
1190 }
1191
1192 self.written.insert(dict_id, new_data);
1193 Ok(DictionaryUpdate::Replaced)
1194 }
1195 DictionaryComparison::Delta => match dict_handling {
1196 DictionaryHandling::Resend => {
1197 if self.error_on_replacement {
1198 return Err(ArrowError::InvalidArgumentError(
1199 REPLACEMENT_ERROR.to_string(),
1200 ));
1201 }
1202
1203 self.written.insert(dict_id, new_data);
1204 Ok(DictionaryUpdate::Replaced)
1205 }
1206 DictionaryHandling::Delta => {
1207 let delta =
1208 new_values.slice(old_values.len(), new_values.len() - old_values.len());
1209 self.written.insert(dict_id, new_data);
1210 Ok(DictionaryUpdate::Delta(delta))
1211 }
1212 },
1213 DictionaryComparison::Equal => unreachable!("Already checked equal case"),
1214 }
1215 }
1216
1217 pub fn clear(&mut self) {
1223 self.dict_ids.clear();
1224 self.written.clear();
1225 }
1226}
1227
/// Relationship between a previously written dictionary and a new one.
#[derive(Debug, Clone)]
enum DictionaryComparison {
    /// The new dictionary neither equals nor extends the old one.
    NotEqual,
    /// Both dictionaries hold the same values.
    Equal,
    /// The new dictionary starts with all of the old values and adds more after them.
    Delta,
}
1239
1240fn compare_dictionaries(old: &ArrayData, new: &ArrayData) -> DictionaryComparison {
1242 let existing_len = old.len();
1244 let new_len = new.len();
1245 if existing_len == new_len {
1246 if *old == *new {
1247 return DictionaryComparison::Equal;
1248 } else {
1249 return DictionaryComparison::NotEqual;
1250 }
1251 }
1252
1253 if new_len < existing_len {
1255 return DictionaryComparison::NotEqual;
1256 }
1257
1258 if new.slice(0, existing_len) == *old {
1260 return DictionaryComparison::Delta;
1261 }
1262
1263 DictionaryComparison::NotEqual
1264}
1265
1266pub struct FileWriter<W> {
1289 writer: W,
1291 write_options: IpcWriteOptions,
1293 schema: SchemaRef,
1295 block_offsets: usize,
1297 dictionary_blocks: Vec<crate::Block>,
1299 record_blocks: Vec<crate::Block>,
1301 finished: bool,
1303 dictionary_tracker: DictionaryTracker,
1305 custom_metadata: HashMap<String, String>,
1307
1308 data_gen: IpcDataGenerator,
1309
1310 ipc_write_context: IpcWriteContext,
1311}
1312
1313impl<W: Write> FileWriter<BufWriter<W>> {
1314 pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1318 Self::try_new(BufWriter::new(writer), schema)
1319 }
1320}
1321
1322impl<W: Write> FileWriter<W> {
1323 pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1331 let write_options = IpcWriteOptions::default();
1332 Self::try_new_with_options(writer, schema, write_options)
1333 }
1334
1335 pub fn try_new_with_options(
1343 mut writer: W,
1344 schema: &Schema,
1345 write_options: IpcWriteOptions,
1346 ) -> Result<Self, ArrowError> {
1347 let data_gen = IpcDataGenerator::default();
1348 let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
1350 let header_size = super::ARROW_MAGIC.len() + pad_len;
1351 writer.write_all(&super::ARROW_MAGIC)?;
1352 writer.write_all(&PADDING[..pad_len])?;
1353 let mut dictionary_tracker = DictionaryTracker::new(true);
1355 let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1356 schema,
1357 &mut dictionary_tracker,
1358 &write_options,
1359 );
1360 let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
1361 Ok(Self {
1362 writer,
1363 write_options,
1364 schema: Arc::new(schema.clone()),
1365 block_offsets: meta + data + header_size,
1366 dictionary_blocks: vec![],
1367 record_blocks: vec![],
1368 finished: false,
1369 dictionary_tracker,
1370 custom_metadata: HashMap::new(),
1371 data_gen,
1372 ipc_write_context: IpcWriteContext::default(),
1373 })
1374 }
1375
1376 pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
1378 self.custom_metadata.insert(key.into(), value.into());
1379 }
1380
1381 pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1383 if self.finished {
1384 return Err(ArrowError::IpcError(
1385 "Cannot write record batch to file writer as it is closed".to_string(),
1386 ));
1387 }
1388
1389 let meta = self.data_gen.write(
1390 batch,
1391 &mut self.dictionary_tracker,
1392 &self.write_options,
1393 &mut self.ipc_write_context,
1394 &mut self.writer,
1395 )?;
1396
1397 for (header_len, body_len) in meta.dictionary_block_sizes {
1398 let block = crate::Block::new(
1399 self.block_offsets as i64,
1400 header_len as i32,
1401 body_len as i64,
1402 );
1403 self.dictionary_blocks.push(block);
1404 self.block_offsets += header_len + body_len;
1405 }
1406
1407 let block = crate::Block::new(
1409 self.block_offsets as i64,
1410 meta.padded_header_len as i32,
1411 meta.body_len as i64,
1412 );
1413 self.record_blocks.push(block);
1414 self.block_offsets += meta.padded_header_len + meta.body_len;
1415 Ok(())
1416 }
1417
    /// Writes the end-of-stream marker and the file footer, flushes the underlying
    /// writer, and marks this writer as finished.
    ///
    /// # Errors
    ///
    /// Returns [`ArrowError::IpcError`] if the writer has already been finished, and
    /// propagates any I/O error from the underlying writer.
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write footer to file writer as it is closed".to_string(),
            ));
        }

        // A continuation prefix carrying length 0 terminates the sequence of messages.
        write_continuation(&mut self.writer, &self.write_options, 0)?;

        let mut fbb = FlatBufferBuilder::new();
        // Block vectors are created before the footer table is started.
        let dictionaries = fbb.create_vector(&self.dictionary_blocks);
        let record_batches = fbb.create_vector(&self.record_blocks);

        // Reset the tracker before handing it to the schema encoder for the footer schema.
        self.dictionary_tracker.clear();
        let schema = IpcSchemaEncoder::new()
            .with_dictionary_tracker(&mut self.dictionary_tracker)
            .schema_to_fb_offset(&mut fbb, &self.schema);
        // Custom metadata is only serialized when at least one entry was recorded.
        let fb_custom_metadata = (!self.custom_metadata.is_empty())
            .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));

        let root = {
            let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
            footer_builder.add_version(self.write_options.metadata_version);
            footer_builder.add_schema(schema);
            footer_builder.add_dictionaries(dictionaries);
            footer_builder.add_recordBatches(record_batches);
            if let Some(fb_custom_metadata) = fb_custom_metadata {
                footer_builder.add_custom_metadata(fb_custom_metadata);
            }
            footer_builder.finish()
        };
        fbb.finish(root, None);
        let footer_data = fbb.finished_data();
        // Trailer layout: footer bytes, footer length as little-endian i32, magic bytes.
        self.writer.write_all(footer_data)?;
        self.writer
            .write_all(&(footer_data.len() as i32).to_le_bytes())?;
        self.writer.write_all(&super::ARROW_MAGIC)?;
        self.writer.flush()?;
        self.finished = true;

        Ok(())
    }
1463
1464 pub fn schema(&self) -> &SchemaRef {
1466 &self.schema
1467 }
1468
1469 pub fn get_ref(&self) -> &W {
1471 &self.writer
1472 }
1473
1474 pub fn get_mut(&mut self) -> &mut W {
1478 &mut self.writer
1479 }
1480
1481 pub fn flush(&mut self) -> Result<(), ArrowError> {
1485 self.writer.flush()?;
1486 Ok(())
1487 }
1488
1489 pub fn into_inner(mut self) -> Result<W, ArrowError> {
1498 if !self.finished {
1499 self.finish()?;
1501 }
1502 Ok(self.writer)
1503 }
1504}
1505
/// [`RecordBatchWriter`] implementation that forwards to the inherent `FileWriter` methods.
impl<W: Write> RecordBatchWriter for FileWriter<W> {
    /// Writes `batch` by delegating to the inherent `write` method.
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        // Method-call syntax picks the inherent `FileWriter::write` over this trait
        // method, so this is a delegation rather than a recursive call.
        self.write(batch)
    }

    /// Finishes the file (end-of-stream marker and footer) and drops the writer.
    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}
1515
/// Writer that emits a schema message on construction, the encoded messages of each
/// record batch on `write`, and a zero-length continuation on `finish`.
pub struct StreamWriter<W> {
    /// The object the encoded messages are written to
    writer: W,
    /// IPC write options applied to every message
    write_options: IpcWriteOptions,
    /// Set once `finish` has completed; further writes are rejected afterwards
    finished: bool,
    /// Tracks dictionaries while encoding the schema and the record batches
    dictionary_tracker: DictionaryTracker,

    /// Generator used to encode the schema and the record batches
    data_gen: IpcDataGenerator,

    /// Context passed to the data generator on every `write` call
    ipc_write_context: IpcWriteContext,
}
1603
1604impl<W: Write> StreamWriter<BufWriter<W>> {
1605 pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1609 Self::try_new(BufWriter::new(writer), schema)
1610 }
1611}
1612
1613impl<W: Write> StreamWriter<W> {
1614 pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1622 let write_options = IpcWriteOptions::default();
1623 Self::try_new_with_options(writer, schema, write_options)
1624 }
1625
1626 pub fn try_new_with_options(
1632 mut writer: W,
1633 schema: &Schema,
1634 write_options: IpcWriteOptions,
1635 ) -> Result<Self, ArrowError> {
1636 let data_gen = IpcDataGenerator::default();
1637 let mut dictionary_tracker = DictionaryTracker::new(false);
1638
1639 let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1641 schema,
1642 &mut dictionary_tracker,
1643 &write_options,
1644 );
1645 write_message(&mut writer, encoded_message, &write_options)?;
1646 Ok(Self {
1647 writer,
1648 write_options,
1649 finished: false,
1650 dictionary_tracker,
1651 data_gen,
1652 ipc_write_context: IpcWriteContext::default(),
1653 })
1654 }
1655
1656 pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1658 if self.finished {
1659 return Err(ArrowError::IpcError(
1660 "Cannot write record batch to stream writer as it is closed".to_string(),
1661 ));
1662 }
1663
1664 self.data_gen.write(
1665 batch,
1666 &mut self.dictionary_tracker,
1667 &self.write_options,
1668 &mut self.ipc_write_context,
1669 &mut self.writer,
1670 )?;
1671 Ok(())
1672 }
1673
1674 pub fn finish(&mut self) -> Result<(), ArrowError> {
1676 if self.finished {
1677 return Err(ArrowError::IpcError(
1678 "Cannot write footer to stream writer as it is closed".to_string(),
1679 ));
1680 }
1681
1682 write_continuation(&mut self.writer, &self.write_options, 0)?;
1683 self.writer.flush()?;
1684
1685 self.finished = true;
1686
1687 Ok(())
1688 }
1689
1690 pub fn get_ref(&self) -> &W {
1692 &self.writer
1693 }
1694
1695 pub fn get_mut(&mut self) -> &mut W {
1699 &mut self.writer
1700 }
1701
1702 pub fn flush(&mut self) -> Result<(), ArrowError> {
1706 self.writer.flush()?;
1707 Ok(())
1708 }
1709
1710 pub fn into_inner(mut self) -> Result<W, ArrowError> {
1748 if !self.finished {
1749 self.finish()?;
1751 }
1752 Ok(self.writer)
1753 }
1754}
1755
/// [`RecordBatchWriter`] implementation that forwards to the inherent `StreamWriter` methods.
impl<W: Write> RecordBatchWriter for StreamWriter<W> {
    /// Writes `batch` by delegating to the inherent `write` method.
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        // Method-call syntax picks the inherent `StreamWriter::write` over this trait
        // method, so this is a delegation rather than a recursive call.
        self.write(batch)
    }

    /// Finishes the stream (zero-length continuation plus flush) and drops the writer.
    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}
1765
/// An encoded IPC message: the message bytes plus the body bytes that follow them.
pub struct EncodedData {
    /// Encoded message bytes, written right after the continuation prefix
    pub ipc_message: Vec<u8>,
    /// Body bytes written after the message; `write_message` writes no body when this
    /// is empty and rejects lengths that are not a multiple of the alignment
    pub arrow_data: Vec<u8>,
}
1773pub fn write_message<W: Write>(
1775 mut writer: W,
1776 encoded: EncodedData,
1777 write_options: &IpcWriteOptions,
1778) -> Result<(usize, usize), ArrowError> {
1779 let arrow_data_len = encoded.arrow_data.len();
1780 if arrow_data_len % usize::from(write_options.alignment) != 0 {
1781 return Err(ArrowError::MemoryError(
1782 "Arrow data not aligned".to_string(),
1783 ));
1784 }
1785
1786 let a = usize::from(write_options.alignment - 1);
1787 let buffer = encoded.ipc_message;
1788 let flatbuf_size = buffer.len();
1789 let prefix_size = if write_options.write_legacy_ipc_format {
1790 4
1791 } else {
1792 8
1793 };
1794 let aligned_size = (flatbuf_size + prefix_size + a) & !a;
1795 let padding_bytes = aligned_size - flatbuf_size - prefix_size;
1796
1797 write_continuation(
1798 &mut writer,
1799 write_options,
1800 (aligned_size - prefix_size) as i32,
1801 )?;
1802
1803 if flatbuf_size > 0 {
1805 writer.write_all(&buffer)?;
1806 }
1807 writer.write_all(&PADDING[..padding_bytes])?;
1809
1810 let body_len = if arrow_data_len > 0 {
1812 write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
1813 } else {
1814 0
1815 };
1816
1817 Ok((aligned_size, body_len))
1818}
1819
1820fn write_body_buffers<W: Write>(
1821 mut writer: W,
1822 data: &[u8],
1823 alignment: u8,
1824) -> Result<usize, ArrowError> {
1825 let len = data.len();
1826 let pad_len = pad_to_alignment(alignment, len);
1827 let total_len = len + pad_len;
1828
1829 writer.write_all(data)?;
1831 if pad_len > 0 {
1832 writer.write_all(&PADDING[..pad_len])?;
1833 }
1834
1835 Ok(total_len)
1836}
1837
1838fn write_continuation<W: Write>(
1841 mut writer: W,
1842 write_options: &IpcWriteOptions,
1843 total_len: i32,
1844) -> Result<usize, ArrowError> {
1845 let mut written = 8;
1846
1847 match write_options.metadata_version {
1849 crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
1850 unreachable!("Options with the metadata version cannot be created")
1851 }
1852 crate::MetadataVersion::V4 => {
1853 if !write_options.write_legacy_ipc_format {
1854 writer.write_all(&CONTINUATION_MARKER)?;
1856 written = 4;
1857 }
1858 writer.write_all(&total_len.to_le_bytes()[..])?;
1859 }
1860 crate::MetadataVersion::V5 => {
1861 writer.write_all(&CONTINUATION_MARKER)?;
1863 writer.write_all(&total_len.to_le_bytes()[..])?;
1864 }
1865 z => panic!("Unsupported crate::MetadataVersion {z:?}"),
1866 };
1867
1868 Ok(written)
1869}
1870
1871fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
1875 if write_options.metadata_version < crate::MetadataVersion::V5 {
1876 !matches!(data_type, DataType::Null)
1877 } else {
1878 !matches!(
1879 data_type,
1880 DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
1881 )
1882 }
1883}
1884
1885#[inline]
1887fn buffer_need_truncate(
1888 array_offset: usize,
1889 buffer: &Buffer,
1890 spec: &BufferSpec,
1891 min_length: usize,
1892) -> bool {
1893 spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
1894}
1895
1896#[inline]
1898fn get_buffer_element_width(spec: &BufferSpec) -> usize {
1899 match spec {
1900 BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
1901 _ => 0,
1902 }
1903}
1904
1905fn reencode_offsets<O: OffsetSizeTrait>(
1908 offsets: &Buffer,
1909 data: &ArrayData,
1910) -> (Buffer, usize, usize) {
1911 let offsets_slice: &[O] = offsets.typed_data::<O>();
1912 let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];
1913
1914 let start_offset = offset_slice.first().unwrap();
1915 let end_offset = offset_slice.last().unwrap();
1916
1917 let offsets = match start_offset.as_usize() {
1918 0 => {
1919 let size = size_of::<O>();
1920 offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
1921 }
1922 _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
1923 };
1924
1925 let start_offset = start_offset.as_usize();
1926 let end_offset = end_offset.as_usize();
1927
1928 (offsets, start_offset, end_offset - start_offset)
1929}
1930
1931fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
1937 if data.is_empty() {
1938 let mut offsets = MutableBuffer::new(size_of::<O>());
1941 offsets.extend_from_slice(O::usize_as(0).to_byte_slice());
1942 return (offsets.into(), MutableBuffer::new(0).into());
1943 }
1944
1945 let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1946 let values = data.buffers()[1].slice_with_length(original_start_offset, len);
1947 (offsets, values)
1948}
1949
1950fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
1953 if data.is_empty() {
1954 let mut offsets = MutableBuffer::new(size_of::<O>());
1957 offsets.extend_from_slice(O::usize_as(0).to_byte_slice());
1958 return (offsets.into(), data.child_data()[0].slice(0, 0));
1959 }
1960
1961 let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1962 let child_data = data.child_data()[0].slice(original_start_offset, len);
1963 (offsets, child_data)
1964}
1965
1966fn get_list_view_array_buffers<O: OffsetSizeTrait>(
1972 data: &ArrayData,
1973) -> (Buffer, Buffer, ArrayData) {
1974 if data.is_empty() {
1975 return (
1976 MutableBuffer::new(0).into(),
1977 MutableBuffer::new(0).into(),
1978 data.child_data()[0].slice(0, 0),
1979 );
1980 }
1981
1982 let offsets = &data.buffers()[0];
1983 let sizes = &data.buffers()[1];
1984
1985 let element_size = std::mem::size_of::<O>();
1986 let offsets_slice =
1987 offsets.slice_with_length(data.offset() * element_size, data.len() * element_size);
1988 let sizes_slice =
1989 sizes.slice_with_length(data.offset() * element_size, data.len() * element_size);
1990
1991 let child_data = data.child_data()[0].clone();
1992
1993 (offsets_slice, sizes_slice, child_data)
1994}
1995
1996fn get_or_truncate_buffer(array_data: &ArrayData) -> Buffer {
2003 let buffer = &array_data.buffers()[0];
2004 let layout = layout(array_data.data_type());
2005 let spec = &layout.buffers[0];
2006
2007 let byte_width = get_buffer_element_width(spec);
2008 let min_length = array_data.len() * byte_width;
2009 if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
2010 let byte_offset = array_data.offset() * byte_width;
2011 let buffer_length = min(min_length, buffer.len() - byte_offset);
2012 buffer.slice_with_length(byte_offset, buffer_length)
2013 } else {
2014 buffer.clone()
2015 }
2016}
2017
/// Encodes one array (and, recursively, its children) into the IPC body.
///
/// For `array_data` this pushes a field node onto `meta.nodes`, then passes each buffer
/// to `encode_sink_buffer`, which hands it to `sink` and records its position in
/// `meta.buffers`. `offset` is the body offset at which the first buffer of this array
/// starts; the returned value is the offset just past the last buffer written for this
/// array and its children.
///
/// # Errors
///
/// Propagates errors from `encode_sink_buffer` and from `unslice_run_array`.
///
/// # Panics
///
/// Panics if the array does not have the number of buffers or children asserted for
/// its data type.
fn write_array_data(
    array_data: &ArrayData,
    meta: &mut IpcMetadataBuilder,
    sink: &mut IpcBodySink<'_>,
    offset: i64,
    compression_codec: Option<CompressionCodec>,
    ipc_write_context: &mut IpcWriteContext,
    write_options: &IpcWriteOptions,
) -> Result<i64, ArrowError> {
    let mut offset = offset;
    let num_rows = array_data.len();
    if !matches!(array_data.data_type(), DataType::Null) {
        meta.nodes.push(crate::FieldNode::new(
            num_rows as i64,
            array_data.null_count() as i64,
        ));
    } else {
        // For a Null array the node reports every row as null instead of using
        // `array_data.null_count()`.
        meta.nodes
            .push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
    }
    if has_validity_bitmap(array_data.data_type(), write_options) {
        // Validity buffer: use the array's own null buffer when present.
        let null_buffer = match array_data.nulls() {
            None => {
                // No null buffer: build a bitmap of ceil(num_rows / 8) bytes via
                // `with_bitset(.., true)`.
                let num_bytes = bit_util::ceil(num_rows, 8);
                let buffer = MutableBuffer::new(num_bytes);
                let buffer = buffer.with_bitset(num_bytes, true);
                buffer.into()
            }
            Some(buffer) => buffer.inner().sliced(),
        };

        offset = encode_sink_buffer(
            null_buffer,
            meta,
            sink,
            offset,
            compression_codec,
            ipc_write_context,
            write_options.alignment,
        )?;
    }

    let data_type = array_data.data_type();
    if matches!(data_type, DataType::Binary | DataType::Utf8) {
        // 32-bit offsets: write the rebased offsets, then the referenced values.
        let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
        for buffer in [offsets, values] {
            offset = encode_sink_buffer(
                buffer,
                meta,
                sink,
                offset,
                compression_codec,
                ipc_write_context,
                write_options.alignment,
            )?;
        }
    } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
        // Only the first buffer (the views) is truncated; the remaining buffers are
        // written exactly as they are.
        let views = get_or_truncate_buffer(array_data);
        offset = encode_sink_buffer(
            views,
            meta,
            sink,
            offset,
            compression_codec,
            ipc_write_context,
            write_options.alignment,
        )?;

        for buffer in array_data.buffers().iter().skip(1) {
            offset = encode_sink_buffer(
                buffer.clone(),
                meta,
                sink,
                offset,
                compression_codec,
                ipc_write_context,
                write_options.alignment,
            )?;
        }
    } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
        // 64-bit offsets: write the rebased offsets, then the referenced values.
        let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
        for buffer in [offsets, values] {
            offset = encode_sink_buffer(
                buffer,
                meta,
                sink,
                offset,
                compression_codec,
                ipc_write_context,
                write_options.alignment,
            )?;
        }
    } else if DataType::is_numeric(data_type)
        || DataType::is_temporal(data_type)
        || matches!(
            array_data.data_type(),
            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
        )
    {
        // Single values buffer, truncated to the array's logical range when needed.
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = get_or_truncate_buffer(array_data);
        offset = encode_sink_buffer(
            buffer,
            meta,
            sink,
            offset,
            compression_codec,
            ipc_write_context,
            write_options.alignment,
        )?;
    } else if matches!(data_type, DataType::Boolean) {
        // The values buffer is sliced with `bit_slice`, using the element offset and
        // length of the array.
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = &array_data.buffers()[0];
        let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
        offset = encode_sink_buffer(
            buffer,
            meta,
            sink,
            offset,
            compression_codec,
            ipc_write_context,
            write_options.alignment,
        )?;
    } else if matches!(
        data_type,
        DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
    ) {
        assert_eq!(array_data.buffers().len(), 1);
        assert_eq!(array_data.child_data().len(), 1);

        // Offsets are rebased and the child is sliced to the referenced range.
        let (offsets, sliced_child_data) = match data_type {
            DataType::List(_) => get_list_array_buffers::<i32>(array_data),
            DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
            DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
            _ => unreachable!(),
        };
        offset = encode_sink_buffer(
            offsets,
            meta,
            sink,
            offset,
            compression_codec,
            ipc_write_context,
            write_options.alignment,
        )?;
        offset = write_array_data(
            &sliced_child_data,
            meta,
            sink,
            offset,
            compression_codec,
            ipc_write_context,
            write_options,
        )?;
        // The child has been written already; skip the generic child handling below.
        return Ok(offset);
    } else if matches!(
        data_type,
        DataType::ListView(_) | DataType::LargeListView(_)
    ) {
        // Two buffers are expected: offsets and sizes.
        assert_eq!(array_data.buffers().len(), 2);
        assert_eq!(array_data.child_data().len(), 1);

        let (offsets, sizes, child_data) = match data_type {
            DataType::ListView(_) => get_list_view_array_buffers::<i32>(array_data),
            DataType::LargeListView(_) => get_list_view_array_buffers::<i64>(array_data),
            _ => unreachable!(),
        };

        offset = encode_sink_buffer(
            offsets,
            meta,
            sink,
            offset,
            compression_codec,
            ipc_write_context,
            write_options.alignment,
        )?;
        offset = encode_sink_buffer(
            sizes,
            meta,
            sink,
            offset,
            compression_codec,
            ipc_write_context,
            write_options.alignment,
        )?;

        offset = write_array_data(
            &child_data,
            meta,
            sink,
            offset,
            compression_codec,
            ipc_write_context,
            write_options,
        )?;
        // The child has been written already; skip the generic child handling below.
        return Ok(offset);
    } else if let DataType::FixedSizeList(_, fixed_size) = data_type {
        assert_eq!(array_data.child_data().len(), 1);
        let fixed_size = *fixed_size as usize;

        // Each row owns `fixed_size` consecutive child values, so the child range is
        // the array's offset and length scaled by `fixed_size`.
        let child_offset = array_data.offset() * fixed_size;
        let child_length = array_data.len() * fixed_size;
        let child_data = array_data.child_data()[0].slice(child_offset, child_length);

        offset = write_array_data(
            &child_data,
            meta,
            sink,
            offset,
            compression_codec,
            ipc_write_context,
            write_options,
        )?;
        // The child has been written already; skip the generic child handling below.
        return Ok(offset);
    } else {
        // Every other type: write all buffers unchanged.
        for buffer in array_data.buffers() {
            offset = encode_sink_buffer(
                buffer.clone(),
                meta,
                sink,
                offset,
                compression_codec,
                ipc_write_context,
                write_options.alignment,
            )?;
        }
    }

    match array_data.data_type() {
        // No children are written for dictionary arrays here.
        DataType::Dictionary(_, _) => {}
        DataType::RunEndEncoded(_, _) => {
            // Write the children of the array returned by `unslice_run_array`.
            let arr = unslice_run_array(array_data.clone())?;
            for data_ref in arr.child_data() {
                offset = write_array_data(
                    data_ref,
                    meta,
                    sink,
                    offset,
                    compression_codec,
                    ipc_write_context,
                    write_options,
                )?;
            }
        }
        _ => {
            // Recursively write every child array.
            for data_ref in array_data.child_data() {
                offset = write_array_data(
                    data_ref,
                    meta,
                    sink,
                    offset,
                    compression_codec,
                    ipc_write_context,
                    write_options,
                )?;
            }
        }
    }
    Ok(offset)
}
2304
2305fn encode_sink_buffer(
2319 buffer: Buffer,
2320 ipc_meta_data: &mut IpcMetadataBuilder,
2321 sink: &mut IpcBodySink<'_>,
2322 offset: i64,
2323 compression_codec: Option<CompressionCodec>,
2324 ipc_write_context: &mut IpcWriteContext,
2325 alignment: u8,
2326) -> Result<i64, ArrowError> {
2327 let (encoded, len) = match compression_codec {
2328 None => {
2329 let len = buffer.len() as i64;
2330 (EncodedBuffer::Raw(buffer), len)
2331 }
2332 Some(codec) => {
2333 let mut scratch = Vec::new();
2334 let written =
2335 codec.compress_to_vec(buffer.as_slice(), &mut scratch, ipc_write_context)?;
2336 let len = i64::try_from(written)
2337 .map_err(|e| ArrowError::InvalidArgumentError(format!("{e}")))?;
2338 (EncodedBuffer::Compressed(scratch), len)
2339 }
2340 };
2341
2342 let pad_len = pad_to_alignment(alignment, len as usize);
2343 sink.write(pad_len, encoded);
2344 ipc_meta_data.buffers.push(crate::Buffer::new(offset, len));
2345 Ok(offset + len + pad_len as i64)
2346}
2347
/// Zero bytes used as alignment padding. Callers write `PADDING[..pad_len]`, so a
/// padding length greater than 64 would panic on the slice.
const PADDING: [u8; 64] = [0; 64];
2349
2350#[inline]
2356fn estimate_encoded_buffer_count(dt: &DataType) -> usize {
2357 match dt {
2358 DataType::Null => 0,
2359
2360 DataType::Binary | DataType::Utf8 | DataType::LargeBinary | DataType::LargeUtf8 => 3,
2361
2362 DataType::BinaryView | DataType::Utf8View => 3,
2363
2364 DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => {
2365 2 + estimate_encoded_buffer_count(f.data_type())
2366 }
2367
2368 DataType::ListView(f) | DataType::LargeListView(f) => {
2369 3 + estimate_encoded_buffer_count(f.data_type())
2370 }
2371
2372 DataType::FixedSizeList(f, _) => 1 + estimate_encoded_buffer_count(f.data_type()),
2373
2374 DataType::Struct(fields) => {
2375 1 + fields
2376 .iter()
2377 .map(|f| estimate_encoded_buffer_count(f.data_type()))
2378 .sum::<usize>()
2379 }
2380
2381 DataType::Dictionary(_, _) => 2,
2383
2384 DataType::Union(fields, UnionMode::Sparse) => {
2385 1 + fields
2386 .iter()
2387 .map(|(_, f)| estimate_encoded_buffer_count(f.data_type()))
2388 .sum::<usize>()
2389 }
2390 DataType::Union(fields, UnionMode::Dense) => {
2391 2 + fields
2392 .iter()
2393 .map(|(_, f)| estimate_encoded_buffer_count(f.data_type()))
2394 .sum::<usize>()
2395 }
2396
2397 DataType::RunEndEncoded(run_ends, values) => {
2398 estimate_encoded_buffer_count(run_ends.data_type())
2399 + estimate_encoded_buffer_count(values.data_type())
2400 }
2401 _ => 2,
2403 }
2404}
2405
/// Returns how many padding bytes must follow `len` bytes to reach the next multiple
/// of `alignment`.
///
/// The rounding uses a bit mask, so the result is only meaningful when `alignment` is
/// a power of two.
#[inline]
fn pad_to_alignment(alignment: u8, len: usize) -> usize {
    let mask = usize::from(alignment - 1);
    let rounded_up = (len + mask) & !mask;
    rounded_up - len
}
2412
2413#[cfg(test)]
2414mod tests {
2415 use std::hash::Hasher;
2416 use std::io::Cursor;
2417 use std::io::Seek;
2418
2419 use arrow_array::builder::FixedSizeListBuilder;
2420 use arrow_array::builder::Float32Builder;
2421 use arrow_array::builder::Int64Builder;
2422 use arrow_array::builder::MapBuilder;
2423 use arrow_array::builder::StringViewBuilder;
2424 use arrow_array::builder::UnionBuilder;
2425 use arrow_array::builder::{
2426 GenericListBuilder, GenericListViewBuilder, ListBuilder, StringBuilder,
2427 };
2428 use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
2429 use arrow_array::types::*;
2430 use arrow_buffer::ScalarBuffer;
2431
2432 use crate::MetadataVersion;
2433 use crate::convert::fb_to_schema;
2434 use crate::reader::*;
2435 use crate::root_as_footer;
2436
2437 use super::*;
2438
2439 fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
2440 let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
2441 writer.write(rb).unwrap();
2442 writer.finish().unwrap();
2443 writer.into_inner().unwrap()
2444 }
2445
2446 fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
2447 let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
2448 reader.next().unwrap().unwrap()
2449 }
2450
2451 fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
2452 const IPC_ALIGNMENT: usize = 8;
2456
2457 let mut stream_writer = StreamWriter::try_new_with_options(
2458 vec![],
2459 record.schema_ref(),
2460 IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
2461 )
2462 .unwrap();
2463 stream_writer.write(record).unwrap();
2464 stream_writer.finish().unwrap();
2465 stream_writer.into_inner().unwrap()
2466 }
2467
2468 fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
2469 let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
2470 stream_reader.next().unwrap().unwrap()
2471 }
2472
2473 #[test]
2474 #[cfg(feature = "lz4")]
2475 fn test_write_empty_record_batch_lz4_compression() {
2476 let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2477 let values: Vec<Option<i32>> = vec![];
2478 let array = Int32Array::from(values);
2479 let record_batch =
2480 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2481
2482 let mut file = tempfile::tempfile().unwrap();
2483
2484 {
2485 let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2486 .unwrap()
2487 .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2488 .unwrap();
2489
2490 let mut writer =
2491 FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2492 writer.write(&record_batch).unwrap();
2493 writer.finish().unwrap();
2494 }
2495 file.rewind().unwrap();
2496 {
2497 let reader = FileReader::try_new(file, None).unwrap();
2499 for read_batch in reader {
2500 read_batch
2501 .unwrap()
2502 .columns()
2503 .iter()
2504 .zip(record_batch.columns())
2505 .for_each(|(a, b)| {
2506 assert_eq!(a.data_type(), b.data_type());
2507 assert_eq!(a.len(), b.len());
2508 assert_eq!(a.null_count(), b.null_count());
2509 });
2510 }
2511 }
2512 }
2513
2514 #[test]
2515 #[cfg(feature = "lz4")]
2516 fn test_write_file_with_lz4_compression() {
2517 let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2518 let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2519 let array = Int32Array::from(values);
2520 let record_batch =
2521 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2522
2523 let mut file = tempfile::tempfile().unwrap();
2524 {
2525 let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2526 .unwrap()
2527 .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2528 .unwrap();
2529
2530 let mut writer =
2531 FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2532 writer.write(&record_batch).unwrap();
2533 writer.finish().unwrap();
2534 }
2535 file.rewind().unwrap();
2536 {
2537 let reader = FileReader::try_new(file, None).unwrap();
2539 for read_batch in reader {
2540 read_batch
2541 .unwrap()
2542 .columns()
2543 .iter()
2544 .zip(record_batch.columns())
2545 .for_each(|(a, b)| {
2546 assert_eq!(a.data_type(), b.data_type());
2547 assert_eq!(a.len(), b.len());
2548 assert_eq!(a.null_count(), b.null_count());
2549 });
2550 }
2551 }
2552 }
2553
2554 #[test]
2555 #[cfg(feature = "zstd")]
2556 fn test_write_file_with_zstd_compression() {
2557 let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2558 let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2559 let array = Int32Array::from(values);
2560 let record_batch =
2561 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2562 let mut file = tempfile::tempfile().unwrap();
2563 {
2564 let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2565 .unwrap()
2566 .try_with_compression(Some(crate::CompressionType::ZSTD))
2567 .unwrap()
2568 .try_with_compression_level(Some(1))
2569 .unwrap();
2570
2571 let mut writer =
2572 FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2573 writer.write(&record_batch).unwrap();
2574 writer.finish().unwrap();
2575 }
2576 file.rewind().unwrap();
2577 {
2578 let reader = FileReader::try_new(file, None).unwrap();
2580 for read_batch in reader {
2581 read_batch
2582 .unwrap()
2583 .columns()
2584 .iter()
2585 .zip(record_batch.columns())
2586 .for_each(|(a, b)| {
2587 assert_eq!(a.data_type(), b.data_type());
2588 assert_eq!(a.len(), b.len());
2589 assert_eq!(a.null_count(), b.null_count());
2590 });
2591 }
2592 }
2593 }
2594
2595 #[test]
2596 fn test_write_file() {
2597 let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
2598 let values: Vec<Option<u32>> = vec![
2599 Some(999),
2600 None,
2601 Some(235),
2602 Some(123),
2603 None,
2604 None,
2605 None,
2606 None,
2607 None,
2608 ];
2609 let array1 = UInt32Array::from(values);
2610 let batch =
2611 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
2612 .unwrap();
2613 let mut file = tempfile::tempfile().unwrap();
2614 {
2615 let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2616
2617 writer.write(&batch).unwrap();
2618 writer.finish().unwrap();
2619 }
2620 file.rewind().unwrap();
2621
2622 {
2623 let mut reader = FileReader::try_new(file, None).unwrap();
2624 while let Some(Ok(read_batch)) = reader.next() {
2625 read_batch
2626 .columns()
2627 .iter()
2628 .zip(batch.columns())
2629 .for_each(|(a, b)| {
2630 assert_eq!(a.data_type(), b.data_type());
2631 assert_eq!(a.len(), b.len());
2632 assert_eq!(a.null_count(), b.null_count());
2633 });
2634 }
2635 }
2636 }
2637
2638 #[test]
2639 fn test_empty_utf8_ipc_writes_nonempty_offsets_buffer() {
2640 let name = StringArray::from(Vec::<String>::new());
2641 let (offsets, values) = get_byte_array_buffers::<i32>(&name.to_data());
2642
2643 assert_eq!(name.len(), 0);
2644 assert_eq!(
2645 offsets.len(),
2646 std::mem::size_of::<i32>(),
2647 "offsets buffer should contain one zero i32 offset"
2648 );
2649 assert_eq!(values.len(), 0, "values buffer should remain empty");
2650 }
2651
2652 #[test]
2653 fn test_empty_large_utf8_ipc_writes_nonempty_offsets_buffer() {
2654 let name = LargeStringArray::from(Vec::<String>::new());
2655 let (offsets, values) = get_byte_array_buffers::<i64>(&name.to_data());
2656
2657 assert_eq!(name.len(), 0);
2658 assert_eq!(
2659 offsets.len(),
2660 std::mem::size_of::<i64>(),
2661 "offsets buffer should contain one zero i64 offset"
2662 );
2663 assert_eq!(values.len(), 0, "values buffer should remain empty");
2664 }
2665
2666 #[test]
2667 fn test_empty_list_ipc_writes_nonempty_offsets_buffer() {
2668 let list = GenericListBuilder::<i32, _>::new(UInt32Builder::new()).finish();
2669 let (offsets, child_data) = get_list_array_buffers::<i32>(&list.to_data());
2670
2671 assert_eq!(list.len(), 0);
2672 assert_eq!(
2673 offsets.len(),
2674 std::mem::size_of::<i32>(),
2675 "offsets buffer should contain one zero i32 offset"
2676 );
2677 assert_eq!(child_data.len(), 0, "child data should remain empty");
2678 }
2679
2680 #[test]
2681 fn test_empty_large_list_ipc_writes_nonempty_offsets_buffer() {
2682 let list = GenericListBuilder::<i64, _>::new(UInt32Builder::new()).finish();
2683 let (offsets, child_data) = get_list_array_buffers::<i64>(&list.to_data());
2684
2685 assert_eq!(list.len(), 0);
2686 assert_eq!(
2687 offsets.len(),
2688 std::mem::size_of::<i64>(),
2689 "offsets buffer should contain one zero i64 offset"
2690 );
2691 assert_eq!(child_data.len(), 0, "child data should remain empty");
2692 }
2693
2694 fn write_null_file(options: IpcWriteOptions) {
2695 let schema = Schema::new(vec![
2696 Field::new("nulls", DataType::Null, true),
2697 Field::new("int32s", DataType::Int32, false),
2698 Field::new("nulls2", DataType::Null, true),
2699 Field::new("f64s", DataType::Float64, false),
2700 ]);
2701 let array1 = NullArray::new(32);
2702 let array2 = Int32Array::from(vec![1; 32]);
2703 let array3 = NullArray::new(32);
2704 let array4 = Float64Array::from(vec![f64::NAN; 32]);
2705 let batch = RecordBatch::try_new(
2706 Arc::new(schema.clone()),
2707 vec![
2708 Arc::new(array1) as ArrayRef,
2709 Arc::new(array2) as ArrayRef,
2710 Arc::new(array3) as ArrayRef,
2711 Arc::new(array4) as ArrayRef,
2712 ],
2713 )
2714 .unwrap();
2715 let mut file = tempfile::tempfile().unwrap();
2716 {
2717 let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2718
2719 writer.write(&batch).unwrap();
2720 writer.finish().unwrap();
2721 }
2722
2723 file.rewind().unwrap();
2724
2725 {
2726 let reader = FileReader::try_new(file, None).unwrap();
2727 reader.for_each(|maybe_batch| {
2728 maybe_batch
2729 .unwrap()
2730 .columns()
2731 .iter()
2732 .zip(batch.columns())
2733 .for_each(|(a, b)| {
2734 assert_eq!(a.data_type(), b.data_type());
2735 assert_eq!(a.len(), b.len());
2736 assert_eq!(a.null_count(), b.null_count());
2737 });
2738 });
2739 }
2740 }
2741 #[test]
2742 fn test_write_null_file_v4() {
2743 write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2744 write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
2745 write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap());
2746 write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
2747 }
2748
/// Runs the null-column file roundtrip with metadata V5 for alignments 8 and 64.
#[test]
fn test_write_null_file_v5() {
    for alignment in [8, 64] {
        let options = IpcWriteOptions::try_new(alignment, false, MetadataVersion::V5).unwrap();
        write_null_file(options);
    }
}
2754
/// Encodes a batch whose dense union column wraps a dictionary array and
/// checks that the dictionary tracker recorded an entry under id 0.
#[test]
fn track_union_nested_dict() {
    let dictionary: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
    let dict_array: ArrayRef = Arc::new(dictionary);

    #[allow(deprecated)]
    let dict_field = Field::new_dict("dict", dict_array.data_type().clone(), false, 0, false);
    let union_fields = [(0, Arc::new(dict_field))].into_iter().collect();

    // All three slots select child 0 and point at consecutive child rows.
    let type_ids: ScalarBuffer<i8> = [0, 0, 0].into_iter().collect();
    let value_offsets: ScalarBuffer<i32> = [0, 1, 2].into_iter().collect();

    let union_array =
        UnionArray::try_new(union_fields, type_ids, Some(value_offsets), vec![dict_array])
            .unwrap();

    let union_field = Field::new("union", union_array.data_type().clone(), false);
    let schema = Arc::new(Schema::new(vec![union_field]));

    let generator = IpcDataGenerator::default();
    let mut dict_tracker = DictionaryTracker::new(false);
    generator.schema_to_bytes_with_dictionary_tracker(
        &schema,
        &mut dict_tracker,
        &IpcWriteOptions::default(),
    );

    let batch = RecordBatch::try_new(schema, vec![Arc::new(union_array)]).unwrap();

    generator
        .encode(
            &batch,
            &mut dict_tracker,
            &Default::default(),
            &mut Default::default(),
        )
        .unwrap();

    assert!(dict_tracker.written.contains_key(&0));
}
2800
/// Encodes a batch whose struct column wraps a dictionary array and checks
/// that the dictionary tracker recorded an entry under id 0.
///
/// NOTE(review): the field is declared with dict id 2 while the assertion
/// looks for key 0 — presumably the tracker assigns its own ids; confirm.
#[test]
fn track_struct_nested_dict() {
    let dictionary: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
    let dict_array: ArrayRef = Arc::new(dictionary);

    #[allow(deprecated)]
    let dict_field = Arc::new(Field::new_dict(
        "dict",
        dict_array.data_type().clone(),
        false,
        2,
        false,
    ));

    let struct_array: ArrayRef = Arc::new(StructArray::from(vec![(dict_field, dict_array)]));

    let struct_field = Field::new("struct", struct_array.data_type().clone(), false);
    let schema = Arc::new(Schema::new(vec![struct_field]));

    let generator = IpcDataGenerator::default();
    let mut dict_tracker = DictionaryTracker::new(false);
    generator.schema_to_bytes_with_dictionary_tracker(
        &schema,
        &mut dict_tracker,
        &IpcWriteOptions::default(),
    );

    let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();

    generator
        .encode(
            &batch,
            &mut dict_tracker,
            &Default::default(),
            &mut Default::default(),
        )
        .unwrap();

    assert!(dict_tracker.written.contains_key(&0));
}
2847
2848 fn write_union_file(options: IpcWriteOptions) {
2849 let schema = Schema::new(vec![Field::new_union(
2850 "union",
2851 vec![0, 1],
2852 vec![
2853 Field::new("a", DataType::Int32, false),
2854 Field::new("c", DataType::Float64, false),
2855 ],
2856 UnionMode::Sparse,
2857 )]);
2858 let mut builder = UnionBuilder::with_capacity_sparse(5);
2859 builder.append::<Int32Type>("a", 1).unwrap();
2860 builder.append_null::<Int32Type>("a").unwrap();
2861 builder.append::<Float64Type>("c", 3.0).unwrap();
2862 builder.append_null::<Float64Type>("c").unwrap();
2863 builder.append::<Int32Type>("a", 4).unwrap();
2864 let union = builder.build().unwrap();
2865
2866 let batch =
2867 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
2868 .unwrap();
2869
2870 let mut file = tempfile::tempfile().unwrap();
2871 {
2872 let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2873
2874 writer.write(&batch).unwrap();
2875 writer.finish().unwrap();
2876 }
2877 file.rewind().unwrap();
2878
2879 {
2880 let reader = FileReader::try_new(file, None).unwrap();
2881 reader.for_each(|maybe_batch| {
2882 maybe_batch
2883 .unwrap()
2884 .columns()
2885 .iter()
2886 .zip(batch.columns())
2887 .for_each(|(a, b)| {
2888 assert_eq!(a.data_type(), b.data_type());
2889 assert_eq!(a.len(), b.len());
2890 assert_eq!(a.null_count(), b.null_count());
2891 });
2892 });
2893 }
2894 }
2895
/// Runs the union file roundtrip with 8-byte alignment for metadata V4 and V5.
#[test]
fn test_write_union_file_v4_v5() {
    for version in [MetadataVersion::V4, MetadataVersion::V5] {
        let options = IpcWriteOptions::try_new(8, false, version).unwrap();
        write_union_file(options);
    }
}
2901
/// Writes `BinaryView` and `Utf8View` columns to an IPC file and reads them
/// back, first in full and then with a projection onto column 0.
#[test]
fn test_write_view_types() {
    const LONG_TEST_STRING: &str =
        "This is a long string to make sure binary view array handles it";

    let binary_values: Vec<Option<&[u8]>> = vec![
        Some(b"foo"),
        Some(b"bar"),
        Some(LONG_TEST_STRING.as_bytes()),
    ];
    let string_values = vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)];

    let binary_array = BinaryViewArray::from_iter(binary_values);
    let utf8_array = StringViewArray::from_iter(string_values);

    let schema = Schema::new(vec![
        Field::new("field1", DataType::BinaryView, true),
        Field::new("field2", DataType::Utf8View, true),
    ]);
    let record_batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![Arc::new(binary_array), Arc::new(utf8_array)],
    )
    .unwrap();

    let mut file = tempfile::tempfile().unwrap();
    {
        let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
        writer.write(&record_batch).unwrap();
        writer.finish().unwrap();
    }

    // Full read: every column must match what was written.
    file.rewind().unwrap();
    {
        let mut reader = FileReader::try_new(&file, None).unwrap();
        let read_batch = reader.next().unwrap().unwrap();
        for (actual, expected) in read_batch.columns().iter().zip(record_batch.columns()) {
            assert_eq!(actual, expected);
        }
    }

    // Projected read: only column 0 is requested.
    file.rewind().unwrap();
    {
        let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
        let projected = reader.next().unwrap().unwrap();
        assert_eq!(projected.num_columns(), 1);
        assert_eq!(projected.column(0), record_batch.column(0));
    }
}
2952
/// A 5-row slice of a 65536-row batch must serialize to the same number of
/// bytes as a freshly built 5-row batch, and must survive a stream roundtrip.
#[test]
fn truncate_ipc_record_batch() {
    fn create_batch(rows: usize) -> RecordBatch {
        let ints = Int32Array::from_iter_values(0..rows as i32);
        let strings = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));

        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Utf8, false),
        ]);

        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(ints), Arc::new(strings)]).unwrap()
    }

    let (offset, length) = (2, 5);

    let big_record_batch = create_batch(65536);
    let small_record_batch = create_batch(length);
    let record_batch_slice = big_record_batch.slice(offset, length);

    let big_len = serialize_stream(&big_record_batch).len();
    let small_len = serialize_stream(&small_record_batch).len();
    assert!(big_len > small_len);

    let slice_bytes = serialize_stream(&record_batch_slice);
    assert_eq!(small_len, slice_bytes.len());

    assert_eq!(deserialize_stream(slice_bytes), record_batch_slice);
}
2987
/// Slicing a nullable batch must shrink the serialized stream and keep the
/// validity of the sliced rows intact after a roundtrip.
#[test]
fn truncate_ipc_record_batch_with_nulls() {
    fn create_batch() -> RecordBatch {
        let ints = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
        let strings = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);

        let fields = vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ];

        RecordBatch::try_new(
            Arc::new(Schema::new(fields)),
            vec![Arc::new(ints), Arc::new(strings)],
        )
        .unwrap()
    }

    let full = create_batch();
    let sliced = full.slice(1, 2);
    let roundtripped = deserialize_stream(serialize_stream(&sliced));

    let full_len = serialize_stream(&full).len();
    let sliced_len = serialize_stream(&sliced).len();
    assert!(full_len > sliced_len);

    // Rows 1..3 of the source: column "a" is [None, Some], column "b" is [Some, Some].
    let ints = roundtripped.column(0);
    let strings = roundtripped.column(1);
    assert!(ints.is_null(0));
    assert!(ints.is_valid(1));
    assert!(strings.is_valid(0));
    assert!(strings.is_valid(1));

    assert_eq!(sliced, roundtripped);
}
3017
/// Slicing a dictionary-encoded batch must shrink the serialized stream and
/// keep the validity of the sliced keys intact after a roundtrip.
#[test]
fn truncate_ipc_dictionary_array() {
    fn create_batch() -> RecordBatch {
        let dict_values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
            .into_iter()
            .collect();
        let dict_keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();

        let dictionary = DictionaryArray::new(dict_keys, Arc::new(dict_values));
        let field = Field::new("dict", dictionary.data_type().clone(), true);

        RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![Arc::new(dictionary)])
            .unwrap()
    }

    let full = create_batch();
    let sliced = full.slice(1, 2);
    let roundtripped = deserialize_stream(serialize_stream(&sliced));

    let full_len = serialize_stream(&full).len();
    let sliced_len = serialize_stream(&sliced).len();
    assert!(full_len > sliced_len);

    // Keys 1..3 of the source are [Some(2), None].
    let column = roundtripped.column(0);
    assert!(column.is_valid(0));
    assert!(column.is_null(1));

    assert_eq!(sliced, roundtripped);
}
3046
/// Slicing a batch with a struct column must shrink the serialized stream and
/// keep the validity of both child columns intact after a roundtrip.
#[test]
fn truncate_ipc_struct_array() {
    fn create_batch() -> RecordBatch {
        let strings: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
            .into_iter()
            .collect();
        let ints: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();

        let string_child = (
            Arc::new(Field::new("s", DataType::Utf8, true)),
            Arc::new(strings) as ArrayRef,
        );
        let int_child = (
            Arc::new(Field::new("c", DataType::Int32, true)),
            Arc::new(ints) as ArrayRef,
        );
        let struct_array = StructArray::from(vec![string_child, int_child]);

        let field = Field::new("struct_array", struct_array.data_type().clone(), true);

        RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![Arc::new(struct_array)])
            .unwrap()
    }

    let full = create_batch();
    let sliced = full.slice(1, 2);
    let roundtripped = deserialize_stream(serialize_stream(&sliced));

    let full_len = serialize_stream(&full).len();
    let sliced_len = serialize_stream(&sliced).len();
    assert!(full_len > sliced_len);

    let structs = roundtripped
        .column(0)
        .as_any()
        .downcast_ref::<StructArray>()
        .unwrap();

    // Rows 1..3 of the source: "s" is [None, Some], "c" is [Some, None].
    let string_child = structs.column(0);
    let int_child = structs.column(1);
    assert!(string_child.is_null(0));
    assert!(string_child.is_valid(1));
    assert!(int_child.is_valid(0));
    assert!(int_child.is_null(1));
    assert_eq!(sliced, roundtripped);
}
3095
/// A one-row slice of a column holding only empty strings must still
/// serialize smaller than the full batch and survive a stream roundtrip.
#[test]
fn truncate_ipc_string_array_with_all_empty_string() {
    fn create_batch() -> RecordBatch {
        let empties = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
        let field = Field::new("a", DataType::Utf8, true);
        RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![Arc::new(empties)]).unwrap()
    }

    let full = create_batch();
    let sliced = full.slice(0, 1);
    let roundtripped = deserialize_stream(serialize_stream(&sliced));

    let full_len = serialize_stream(&full).len();
    let sliced_len = serialize_stream(&sliced).len();
    assert!(full_len > sliced_len);
    assert_eq!(sliced, roundtripped);
}
3113
/// Writes a sliced `UInt32Array` through `StreamWriter` and checks that the
/// reader yields only the sliced values.
#[test]
fn test_stream_writer_writes_array_slice() {
    let full = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
    let full_values: Vec<_> = full.iter().collect();
    assert_eq!(vec![Some(1), Some(2), Some(3)], full_values);

    let expected_tail = vec![Some(2), Some(3)];

    let tail = full.slice(1, 2);
    let tail_values: Vec<_> = tail.iter().collect();
    assert_eq!(expected_tail, tail_values);

    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(tail)]).expect("new batch");

    let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
    writer.write(&batch).expect("write");
    let outbuf = writer.into_inner().expect("inner");

    let mut reader = StreamReader::try_new(&outbuf[..], None).expect("new reader");
    let read_batch = reader.next().unwrap().expect("read batch");

    let read_array: &UInt32Array = read_batch.column(0).as_primitive();
    let read_values: Vec<_> = read_array.iter().collect();
    assert_eq!(expected_tail, read_values);
}
3144
/// Roundtrips an 8000-element `UInt32Array` in which every odd index is null.
#[test]
fn test_large_slice_uint32() {
    let array = UInt32Array::from_iter((0..8000).map(|i| (i % 2 == 0).then_some(i)));
    ensure_roundtrip(Arc::new(array));
}
3151
/// Roundtrips an 8000-element `StringArray` in which every odd index is null.
#[test]
fn test_large_slice_string() {
    let strings: Vec<_> = (0..8000)
        .map(|i| (i % 2 == 0).then(|| format!("value{i}")))
        .collect();

    ensure_roundtrip(Arc::new(StringArray::from(strings)));
}
3166
/// Roundtrips an 8000-row list-of-strings array: even rows hold 1000 strings
/// each, odd rows are null.
#[test]
fn test_large_slice_string_list() {
    // The enclosing module may already have another `Write` in scope; import anonymously.
    use std::fmt::Write as _;

    let mut ls = ListBuilder::new(StringBuilder::new());

    // One scratch buffer reused for every element.
    let mut text = String::new();
    for row_number in 0..8000 {
        let is_valid = row_number % 2 == 0;
        if is_valid {
            for list_element in 0..1000 {
                text.clear();
                write!(&mut text, "value{row_number}-{list_element}").unwrap();
                ls.values().append_value(&text);
            }
        }
        ls.append(is_valid);
    }

    ensure_roundtrip(Arc::new(ls.finish()));
}
3188
/// Roundtrips a list-of-lists-of-strings array: 4000 rows that each hold one
/// empty inner list, followed by 4000 rows alternating between a single
/// 1000-string inner list and null.
#[test]
fn test_large_slice_string_list_of_lists() {
    // The enclosing module may already have another `Write` in scope; import anonymously.
    use std::fmt::Write as _;

    let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));

    for _ in 0..4000 {
        ls.values().append(true);
        ls.append(true);
    }

    // One scratch buffer reused for every element.
    let mut text = String::new();
    for row_number in 0..4000 {
        let is_valid = row_number % 2 == 0;
        if is_valid {
            for list_element in 0..1000 {
                text.clear();
                write!(&mut text, "value{row_number}-{list_element}").unwrap();
                ls.values().values().append_value(&text);
            }
            ls.values().append(true);
        }
        ls.append(is_valid);
    }

    ensure_roundtrip(Arc::new(ls.finish()));
}
3219
3220 fn ensure_roundtrip(array: ArrayRef) {
3222 let num_rows = array.len();
3223 let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
3224 let sliced_batch = orig_batch.slice(1, num_rows - 1);
3226
3227 let schema = orig_batch.schema();
3228 let stream_data = {
3229 let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
3230 writer.write(&sliced_batch).unwrap();
3231 writer.into_inner().unwrap()
3232 };
3233 let read_batch = {
3234 let projection = None;
3235 let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
3236 reader
3237 .next()
3238 .expect("expect no errors reading batch")
3239 .expect("expect batch")
3240 };
3241 assert_eq!(sliced_batch, read_batch);
3242
3243 let file_data = {
3244 let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
3245 writer.write(&sliced_batch).unwrap();
3246 writer.into_inner().unwrap().into_inner().unwrap()
3247 };
3248 let read_batch = {
3249 let projection = None;
3250 let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
3251 reader
3252 .next()
3253 .expect("expect no errors reading batch")
3254 .expect("expect batch")
3255 };
3256 assert_eq!(sliced_batch, read_batch);
3257
3258 }
3260
/// Roundtrips boolean batches sliced at several offset/length combinations.
#[test]
fn encode_bools_slice() {
    // Smallest case: one value at offset 1.
    let two_values = [true, false];
    assert_bool_roundtrip(two_values, 1, 1);

    // Offset 13 is not a multiple of 8; the slice runs to the end of the array.
    let thirty_values = [
        true, false, true, true, false, false, true, true, true, false, false, false, true,
        true, true, true, false, false, false, false, true, true, true, true, true, false,
        false, false, false, false,
    ];
    assert_bool_roundtrip(thirty_values, 13, 17);

    // Offset 8 is a multiple of 8; the length is shorter than 8.
    let twelve_values = [
        true, false, true, true, false, false, true, true, true, false, false, false,
    ];
    assert_bool_roundtrip(twelve_values, 8, 2);

    // Offset 8 and length 8 are both multiples of 8.
    let twenty_two_values = [
        true, false, true, true, false, false, true, true, true, false, false, false, true,
        true, true, true, true, false, false, false, false, false,
    ];
    assert_bool_roundtrip(twenty_two_values, 8, 8);
}
3296
3297 fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
3298 let val_bool_field = Field::new("val", DataType::Boolean, false);
3299
3300 let schema = Arc::new(Schema::new(vec![val_bool_field]));
3301
3302 let bools = BooleanArray::from(bools.to_vec());
3303
3304 let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
3305 let batch = batch.slice(offset, length);
3306
3307 let data = serialize_stream(&batch);
3308 let batch2 = deserialize_stream(data);
3309 assert_eq!(batch, batch2);
3310 }
3311
/// Slices an 80-element run-end encoded array at every prefix and suffix
/// length and checks that `into_zero_offset_run_array` yields the expected
/// logical values.
#[test]
fn test_run_array_unslice() {
    let total_len = 80;
    let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
    let repeats: Vec<usize> = vec![3, 4, 1, 2];

    // 32 runs whose lengths cycle through `repeats` (3 + 4 + 1 + 2 = 10 per cycle, 8 cycles).
    let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
    for ix in 0_usize..32 {
        let run_len = repeats[ix % repeats.len()];
        let run_val = vals[ix % vals.len()];
        input_array.extend(std::iter::repeat(run_val).take(run_len));
    }

    let mut builder =
        PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
    builder.extend(input_array.iter().copied());
    let run_array = builder.finish();

    for slice_len in 1..=total_len {
        // First the prefix of length `slice_len`, then the suffix of the same length.
        for slice_start in [0, total_len - slice_len] {
            let sliced_run_array: RunArray<Int16Type> =
                run_array.slice(slice_start, slice_len).into_data().into();

            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
            let typed = unsliced_run_array
                .downcast::<PrimitiveArray<Int32Type>>()
                .unwrap();

            let expected: Vec<Option<i32>> = input_array
                .iter()
                .skip(slice_start)
                .take(slice_len)
                .copied()
                .collect();
            let actual: Vec<Option<i32>> = typed.into_iter().collect();
            assert_eq!(expected, actual);
        }
    }
}
3365
3366 fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3367 let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());
3368
3369 for i in 0..100_000 {
3370 for value in [i, i, i] {
3371 ls.values().append_value(value);
3372 }
3373 ls.append(true)
3374 }
3375
3376 ls.finish()
3377 }
3378
3379 fn generate_utf8view_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3380 let mut ls = GenericListBuilder::<O, _>::new(StringViewBuilder::new());
3381
3382 for i in 0..100_000 {
3383 for value in [
3384 format!("value{}", i),
3385 format!("value{}", i),
3386 format!("value{}", i),
3387 ] {
3388 ls.values().append_value(&value);
3389 }
3390 ls.append(true)
3391 }
3392
3393 ls.finish()
3394 }
3395
3396 fn generate_string_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3397 let mut ls = GenericListBuilder::<O, _>::new(StringBuilder::new());
3398
3399 for i in 0..100_000 {
3400 for value in [
3401 format!("value{}", i),
3402 format!("value{}", i),
3403 format!("value{}", i),
3404 ] {
3405 ls.values().append_value(&value);
3406 }
3407 ls.append(true)
3408 }
3409
3410 ls.finish()
3411 }
3412
3413 fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3414 let mut ls =
3415 GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
3416
3417 for _i in 0..10_000 {
3418 for j in 0..10 {
3419 for value in [j, j, j, j] {
3420 ls.values().values().append_value(value);
3421 }
3422 ls.values().append(true)
3423 }
3424 ls.append(true);
3425 }
3426
3427 ls.finish()
3428 }
3429
3430 fn generate_nested_list_data_starting_at_zero<O: OffsetSizeTrait>() -> GenericListArray<O> {
3431 let mut ls =
3432 GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
3433
3434 for _i in 0..999 {
3435 ls.values().append(true);
3436 ls.append(true);
3437 }
3438
3439 for j in 0..10 {
3440 for value in [j, j, j, j] {
3441 ls.values().values().append_value(value);
3442 }
3443 ls.values().append(true)
3444 }
3445 ls.append(true);
3446
3447 for i in 0..9_000 {
3448 for j in 0..10 {
3449 for value in [i + j, i + j, i + j, i + j] {
3450 ls.values().values().append_value(value);
3451 }
3452 ls.values().append(true)
3453 }
3454 ls.append(true);
3455 }
3456
3457 ls.finish()
3458 }
3459
3460 fn generate_map_array_data() -> MapArray {
3461 let keys_builder = UInt32Builder::new();
3462 let values_builder = UInt32Builder::new();
3463
3464 let mut builder = MapBuilder::new(None, keys_builder, values_builder);
3465
3466 for i in 0..100_000 {
3467 for _j in 0..3 {
3468 builder.keys().append_value(i);
3469 builder.values().append_value(i * 2);
3470 }
3471 builder.append(true).unwrap();
3472 }
3473
3474 builder.finish()
3475 }
3476
/// Slicing rows 75..82 of the three-values-per-row list must re-encode the
/// offsets to start at 0 and report the original start (225) and the number
/// of values covered (21).
#[test]
fn reencode_offsets_when_first_offset_is_not_zero() {
    let slice_data = generate_list_data::<i32>().into_data().slice(75, 7);

    let offsets_buffer = &slice_data.buffers()[0];
    let (new_offsets, original_start, length) =
        reencode_offsets::<i32>(offsets_buffer, &slice_data);

    let expected_offsets = vec![0, 3, 6, 9, 12, 15, 18, 21];
    assert_eq!(expected_offsets, new_offsets.typed_data::<i32>());
    assert_eq!(225, original_start);
    assert_eq!(21, length);
}
3491
/// When the sliced row is preceded only by an empty list, its first offset is
/// already 0: the offsets come back as `[0, 2]` with original start 0 and
/// length 2.
#[test]
fn reencode_offsets_when_first_offset_is_zero() {
    let mut list_builder = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
    // Row 0: empty list.
    list_builder.append(true);
    // Row 1: [35, 42].
    list_builder.values().append_value(35);
    list_builder.values().append_value(42);
    list_builder.append(true);

    let slice_data = list_builder.finish().into_data().slice(1, 1);

    let offsets_buffer = &slice_data.buffers()[0];
    let (new_offsets, original_start, length) =
        reencode_offsets::<i32>(offsets_buffer, &slice_data);

    assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
    assert_eq!(0, original_start);
    assert_eq!(2, length);
}
3510
3511 fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
3514 let in_sliced = in_batch.slice(999, 1);
3516
3517 let bytes_batch = serialize_file(&in_batch);
3518 let bytes_sliced = serialize_file(&in_sliced);
3519
3520 assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));
3522
3523 let out_batch = deserialize_file(bytes_batch);
3525 assert_eq!(in_batch, out_batch);
3526
3527 let out_sliced = deserialize_file(bytes_sliced);
3528 assert_eq!(in_sliced, out_sliced);
3529 }
3530
/// A one-row slice of a `List<UInt32>` batch must serialize more than 1000
/// times smaller than the full batch.
#[test]
fn encode_lists() {
    let item_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let list_field = Field::new("val", DataType::List(item_field), false);
    let schema = Arc::new(Schema::new(vec![list_field]));

    let list_array = Arc::new(generate_list_data::<i32>());
    let in_batch = RecordBatch::try_new(schema, vec![list_array]).unwrap();

    roundtrip_ensure_sliced_smaller(in_batch, 1000);
}
3542
/// A zero-length slice of a `List<UInt32>` batch must survive a file roundtrip.
#[test]
fn encode_empty_list() {
    let item_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let list_field = Field::new("val", DataType::List(item_field), false);
    let schema = Arc::new(Schema::new(vec![list_field]));

    let list_array = Arc::new(generate_list_data::<i32>());
    let full_batch = RecordBatch::try_new(schema, vec![list_array]).unwrap();
    let in_batch = full_batch.slice(999, 0);

    let out_batch = deserialize_file(serialize_file(&in_batch));
    assert_eq!(in_batch, out_batch);
}
3557
/// A one-row slice of a `LargeList<UInt32>` batch must serialize more than
/// 1000 times smaller than the full batch.
#[test]
fn encode_large_lists() {
    let item_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let list_field = Field::new("val", DataType::LargeList(item_field), false);
    let schema = Arc::new(Schema::new(vec![list_field]));

    let list_array = Arc::new(generate_list_data::<i64>());
    let in_batch = RecordBatch::try_new(schema, vec![list_array]).unwrap();

    roundtrip_ensure_sliced_smaller(in_batch, 1000);
}
3571
/// Roundtrips several slices of a `LargeList<UInt32>` column.
#[test]
fn encode_large_lists_non_zero_offset() {
    let item_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let list_field = Field::new("val", DataType::LargeList(item_field), false);
    let schema = Arc::new(Schema::new(vec![list_field]));

    let list_array = Arc::new(generate_list_data::<i64>());

    check_sliced_list_array(schema, list_array);
}
3582
/// Roundtrips several slices of a `LargeList<Utf8>` column.
#[test]
fn encode_large_lists_string_non_zero_offset() {
    let item_field = Arc::new(Field::new_list_field(DataType::Utf8, true));
    let list_field = Field::new("val", DataType::LargeList(item_field), false);
    let schema = Arc::new(Schema::new(vec![list_field]));

    let list_array = Arc::new(generate_string_list_data::<i64>());

    check_sliced_list_array(schema, list_array);
}
3593
/// Roundtrips several slices of a `LargeList<Utf8View>` column.
#[test]
fn encode_large_list_string_view_non_zero_offset() {
    let item_field = Arc::new(Field::new_list_field(DataType::Utf8View, true));
    let list_field = Field::new("val", DataType::LargeList(item_field), false);
    let schema = Arc::new(Schema::new(vec![list_field]));

    let list_array = Arc::new(generate_utf8view_list_data::<i64>());

    check_sliced_list_array(schema, list_array);
}
3604
3605 fn check_sliced_list_array(schema: Arc<Schema>, values: Arc<GenericListArray<i64>>) {
3606 for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3607 let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3608 .unwrap()
3609 .slice(offset, len);
3610 let out_batch = deserialize_file(serialize_file(&in_batch));
3611 assert_eq!(in_batch, out_batch);
3612 }
3613 }
3614
/// A one-row slice of a `List<List<UInt32>>` batch must serialize more than
/// 1000 times smaller than the full batch.
#[test]
fn encode_nested_lists() {
    let leaf_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let middle_field = Arc::new(Field::new_list_field(DataType::List(leaf_field), true));
    let outer_field = Field::new("val", DataType::List(middle_field), true);
    let schema = Arc::new(Schema::new(vec![outer_field]));

    let nested_array = Arc::new(generate_nested_list_data::<i32>());
    let in_batch = RecordBatch::try_new(schema, vec![nested_array]).unwrap();

    roundtrip_ensure_sliced_smaller(in_batch, 1000);
}
3627
/// Roundtrips the nested list data whose row 999 is the first to carry
/// values; with a size factor of 1 the slice only has to be smaller than the
/// full batch.
#[test]
fn encode_nested_lists_starting_at_zero() {
    let leaf_field = Arc::new(Field::new("item", DataType::UInt32, true));
    let middle_field = Arc::new(Field::new("item", DataType::List(leaf_field), true));
    let outer_field = Field::new("val", DataType::List(middle_field), true);
    let schema = Arc::new(Schema::new(vec![outer_field]));

    let nested_array = Arc::new(generate_nested_list_data_starting_at_zero::<i32>());
    let in_batch = RecordBatch::try_new(schema, vec![nested_array]).unwrap();

    roundtrip_ensure_sliced_smaller(in_batch, 1);
}
3640
/// A one-row slice of a `Map<UInt32, UInt32>` batch must serialize more than
/// 1000 times smaller than the full batch.
#[test]
fn encode_map_array() {
    let key_field = Arc::new(Field::new("keys", DataType::UInt32, false));
    let value_field = Arc::new(Field::new("values", DataType::UInt32, true));
    let map_field = Field::new_map("map", "entries", key_field, value_field, false, true);
    let schema = Arc::new(Schema::new(vec![map_field]));

    let map_array = Arc::new(generate_map_array_data());
    let in_batch = RecordBatch::try_new(schema, vec![map_array]).unwrap();

    roundtrip_ensure_sliced_smaller(in_batch, 1000);
}
3653
3654 fn generate_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
3655 let mut builder = GenericListViewBuilder::<O, _>::new(UInt32Builder::new());
3656
3657 for i in 0u32..100_000 {
3658 if i.is_multiple_of(10_000) {
3659 builder.append(false);
3660 continue;
3661 }
3662 for value in [i, i, i] {
3663 builder.values().append_value(value);
3664 }
3665 builder.append(true);
3666 }
3667
3668 builder.finish()
3669 }
3670
/// Roundtrips a full `ListView<UInt32>` batch through the IPC file format.
#[test]
fn encode_list_view_arrays() {
    let item_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let view_field = Field::new("val", DataType::ListView(item_field), true);
    let schema = Arc::new(Schema::new(vec![view_field]));

    let view_array = Arc::new(generate_list_view_data::<i32>());
    let in_batch = RecordBatch::try_new(schema, vec![view_array]).unwrap();

    let out_batch = deserialize_file(serialize_file(&in_batch));
    assert_eq!(in_batch, out_batch);
}
3683
/// Roundtrips a full `LargeListView<UInt32>` batch through the IPC file format.
#[test]
fn encode_large_list_view_arrays() {
    let item_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let view_field = Field::new("val", DataType::LargeListView(item_field), true);
    let schema = Arc::new(Schema::new(vec![view_field]));

    let view_array = Arc::new(generate_list_view_data::<i64>());
    let in_batch = RecordBatch::try_new(schema, vec![view_array]).unwrap();

    let out_batch = deserialize_file(serialize_file(&in_batch));
    assert_eq!(in_batch, out_batch);
}
3696
/// Roundtrips four slices of a `ListView<UInt32>` column (an interior single
/// row, the first 13 rows, an interior run of 12 rows, and the last 13 rows).
#[test]
fn check_sliced_list_view_array() {
    let item_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let view_field = Field::new("val", DataType::ListView(item_field), true);
    let schema = Arc::new(Schema::new(vec![view_field]));
    let values = Arc::new(generate_list_view_data::<i32>());

    let tail_start = values.len() - 13;
    let slices = [(999, 1), (0, 13), (47, 12), (tail_start, 13)];

    for (offset, len) in slices {
        let column: ArrayRef = values.clone();
        let in_batch = RecordBatch::try_new(schema.clone(), vec![column])
            .unwrap()
            .slice(offset, len);

        let out_batch = deserialize_file(serialize_file(&in_batch));
        assert_eq!(in_batch, out_batch);
    }
}
3712
/// Roundtrips four slices of a `LargeListView<UInt32>` column (an interior
/// single row, the first 13 rows, an interior run of 12 rows, and the last 13
/// rows).
#[test]
fn check_sliced_large_list_view_array() {
    let item_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let view_field = Field::new("val", DataType::LargeListView(item_field), true);
    let schema = Arc::new(Schema::new(vec![view_field]));
    let values = Arc::new(generate_list_view_data::<i64>());

    let tail_start = values.len() - 13;
    let slices = [(999, 1), (0, 13), (47, 12), (tail_start, 13)];

    for (offset, len) in slices {
        let column: ArrayRef = values.clone();
        let in_batch = RecordBatch::try_new(schema.clone(), vec![column])
            .unwrap()
            .slice(offset, len);

        let out_batch = deserialize_file(serialize_file(&in_batch));
        assert_eq!(in_batch, out_batch);
    }
}
3728
3729 fn generate_nested_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
3730 let inner_builder = UInt32Builder::new();
3731 let middle_builder = GenericListViewBuilder::<O, _>::new(inner_builder);
3732 let mut outer_builder = GenericListViewBuilder::<O, _>::new(middle_builder);
3733
3734 for i in 0u32..10_000 {
3735 if i.is_multiple_of(1_000) {
3736 outer_builder.append(false);
3737 continue;
3738 }
3739
3740 for _ in 0..3 {
3741 for value in [i, i + 1, i + 2] {
3742 outer_builder.values().values().append_value(value);
3743 }
3744 outer_builder.values().append(true);
3745 }
3746 outer_builder.append(true);
3747 }
3748
3749 outer_builder.finish()
3750 }
3751
/// Roundtrips a full `ListView<ListView<UInt32>>` batch through the IPC file
/// format.
#[test]
fn encode_nested_list_views() {
    let leaf_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let middle_field = Arc::new(Field::new_list_field(DataType::ListView(leaf_field), true));
    let outer_field = Field::new("val", DataType::ListView(middle_field), true);
    let schema = Arc::new(Schema::new(vec![outer_field]));

    let nested_array = Arc::new(generate_nested_list_view_data::<i32>());
    let in_batch = RecordBatch::try_new(schema, vec![nested_array]).unwrap();

    let out_batch = deserialize_file(serialize_file(&in_batch));
    assert_eq!(in_batch, out_batch);
}
3765
3766 fn test_roundtrip_list_view_of_dict_impl<OffsetSize: OffsetSizeTrait, U: ArrowNativeType>(
3767 list_data_type: DataType,
3768 offsets: &[U; 5],
3769 sizes: &[U; 4],
3770 ) {
3771 let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3772 let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
3773 let dict_array = DictionaryArray::new(keys, Arc::new(values));
3774 let dict_data = dict_array.to_data();
3775
3776 let value_offsets = Buffer::from_slice_ref(offsets);
3777 let value_sizes = Buffer::from_slice_ref(sizes);
3778
3779 let list_data = ArrayData::builder(list_data_type)
3780 .len(4)
3781 .add_buffer(value_offsets)
3782 .add_buffer(value_sizes)
3783 .add_child_data(dict_data)
3784 .build()
3785 .unwrap();
3786 let list_view_array = GenericListViewArray::<OffsetSize>::from(list_data);
3787
3788 let schema = Arc::new(Schema::new(vec![Field::new(
3789 "f1",
3790 list_view_array.data_type().clone(),
3791 false,
3792 )]));
3793 let input_batch = RecordBatch::try_new(schema, vec![Arc::new(list_view_array)]).unwrap();
3794
3795 let output_batch = deserialize_file(serialize_file(&input_batch));
3796 assert_eq!(input_batch, output_batch);
3797
3798 let output_batch = deserialize_stream(serialize_stream(&input_batch));
3799 assert_eq!(input_batch, output_batch);
3800 }
3801
/// Runs the list-view-of-dictionary round trip with 32-bit offsets and sizes.
#[test]
fn test_roundtrip_list_view_of_dict() {
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    // `Field::new_dict` is flagged as deprecated, hence the explicit allow.
    #[allow(deprecated)]
    let item_field = Field::new_dict("item", dict_type, true, 1, false);
    let list_data_type = DataType::ListView(Arc::new(item_field));

    test_roundtrip_list_view_of_dict_impl::<i32, i32>(
        list_data_type,
        &[0, 2, 4, 4, 7],
        &[2, 2, 0, 3],
    );
}
3816
/// Runs the list-view-of-dictionary round trip with 64-bit offsets and sizes.
#[test]
fn test_roundtrip_large_list_view_of_dict() {
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    // `Field::new_dict` is flagged as deprecated, hence the explicit allow.
    #[allow(deprecated)]
    let item_field = Field::new_dict("item", dict_type, true, 2, false);
    let list_data_type = DataType::LargeListView(Arc::new(item_field));

    test_roundtrip_list_view_of_dict_impl::<i64, i64>(
        list_data_type,
        &[0, 2, 4, 4, 7],
        &[2, 2, 0, 3],
    );
}
3831
/// Round-trips a slice (rows 1..5) of a six-row list-view-of-dictionary column
/// through both the IPC file format and the IPC stream format.
#[test]
fn test_roundtrip_sliced_list_view_of_dict() {
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    // `Field::new_dict` is flagged as deprecated, hence the explicit allow.
    #[allow(deprecated)]
    let item_field = Field::new_dict("item", dict_type, true, 3, false);
    let list_data_type = DataType::ListView(Arc::new(item_field));

    // Child array: twelve Int32 keys over a four-entry string dictionary.
    let dictionary = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
    let dict_keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2, 1, 0, 3, 2, 1]);
    let child_data = DictionaryArray::new(dict_keys, Arc::new(dictionary)).to_data();

    let offsets: [i32; 7] = [0, 2, 4, 4, 7, 9, 12];
    let sizes: [i32; 6] = [2, 2, 0, 3, 2, 3];
    let array_data = ArrayData::builder(list_data_type)
        .len(6)
        .add_buffer(Buffer::from_slice_ref(&offsets))
        .add_buffer(Buffer::from_slice_ref(&sizes))
        .add_child_data(child_data)
        .build()
        .unwrap();
    let column = GenericListViewArray::<i32>::from(array_data);

    let column_type = column.data_type().clone();
    let schema = Arc::new(Schema::new(vec![Field::new("f1", column_type, false)]));
    let input_batch = RecordBatch::try_new(schema, vec![Arc::new(column)]).unwrap();

    // Only the sliced view is serialized and compared.
    let sliced_batch = input_batch.slice(1, 4);

    let from_file = deserialize_file(serialize_file(&sliced_batch));
    assert_eq!(sliced_batch, from_file);

    let from_stream = deserialize_stream(serialize_stream(&sliced_batch));
    assert_eq!(sliced_batch, from_stream);
}
3877
/// Round-trips a dense union with a dictionary-encoded string child and an
/// `Int32` child through both the IPC file format and the IPC stream format.
#[test]
fn test_roundtrip_dense_union_of_dict() {
    // Child 0: dictionary-encoded strings. Child 1: plain Int32 values.
    let dictionary = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
    let dict_keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
    let dict_child = DictionaryArray::new(dict_keys, Arc::new(dictionary));
    let int_child = Int32Array::from(vec![100, 200]);

    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    // `Field::new_dict` is flagged as deprecated, hence the explicit allow.
    #[allow(deprecated)]
    let dict_field = Arc::new(Field::new_dict("dict", dict_type, true, 1, false));
    let int_field = Arc::new(Field::new("int", DataType::Int32, false));
    let union_fields = UnionFields::try_new(vec![0, 1], vec![dict_field, int_field]).unwrap();

    // One type id and one child offset per union slot.
    let type_ids = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);
    let child_offsets = ScalarBuffer::from(vec![0i32, 1, 0, 2, 1, 3, 4]);

    let union = UnionArray::try_new(
        union_fields.clone(),
        type_ids,
        Some(child_offsets),
        vec![Arc::new(dict_child), Arc::new(int_child)],
    )
    .unwrap();

    let union_type = DataType::Union(union_fields, UnionMode::Dense);
    let schema = Arc::new(Schema::new(vec![Field::new("union", union_type, false)]));
    let input_batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();

    let from_file = deserialize_file(serialize_file(&input_batch));
    assert_eq!(input_batch, from_file);

    let from_stream = deserialize_stream(serialize_stream(&input_batch));
    assert_eq!(input_batch, from_stream);
}
3921
/// Round-trips a sparse union with a dictionary-encoded string child and an
/// `Int32` child through both the IPC file format and the IPC stream format.
#[test]
fn test_roundtrip_sparse_union_of_dict() {
    // No offsets buffer is supplied, so both children have one entry per union slot (seven).
    let dictionary = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
    let dict_keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
    let dict_child = DictionaryArray::new(dict_keys, Arc::new(dictionary));
    let int_child = Int32Array::from(vec![0, 0, 100, 0, 200, 0, 0]);

    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    // `Field::new_dict` is flagged as deprecated, hence the explicit allow.
    #[allow(deprecated)]
    let dict_field = Arc::new(Field::new_dict("dict", dict_type, true, 2, false));
    let int_field = Arc::new(Field::new("int", DataType::Int32, false));
    let union_fields = UnionFields::try_new(vec![0, 1], vec![dict_field, int_field]).unwrap();

    let type_ids = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);

    let union = UnionArray::try_new(
        union_fields.clone(),
        type_ids,
        None,
        vec![Arc::new(dict_child), Arc::new(int_child)],
    )
    .unwrap();

    let union_type = DataType::Union(union_fields, UnionMode::Sparse);
    let schema = Arc::new(Schema::new(vec![Field::new("union", union_type, false)]));
    let input_batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();

    let from_file = deserialize_file(serialize_file(&input_batch));
    assert_eq!(input_batch, from_file);

    let from_stream = deserialize_stream(serialize_stream(&input_batch));
    assert_eq!(input_batch, from_stream);
}
3964
/// Round-trips a three-row map column whose keys are dictionary-encoded strings
/// through both the IPC file format and the IPC stream format.
#[test]
fn test_roundtrip_map_with_dict_keys() {
    // Six map entries: dictionary-encoded keys paired with Int32 values.
    let key_dictionary = StringArray::from(vec!["key_a", "key_b", "key_c"]);
    let key_indices = Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]);
    let dict_keys = DictionaryArray::new(key_indices, Arc::new(key_dictionary));
    let values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);

    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let value_field = Field::new("value", DataType::Int32, true);

    // The map's declared entries field builds the key with `Field::new_dict`
    // (flagged as deprecated, hence the allow).
    #[allow(deprecated)]
    let key_field = Field::new_dict("key", dict_type.clone(), false, 1, false);
    let entries_type = DataType::Struct(vec![key_field, value_field.clone()].into());
    let entries_field = Arc::new(Field::new("entries", entries_type, false));

    // The struct array itself is assembled with plain `Field::new` fields.
    let entries = StructArray::from(vec![
        (
            Arc::new(Field::new("key", dict_type, false)),
            Arc::new(dict_keys) as ArrayRef,
        ),
        (Arc::new(value_field), Arc::new(values) as ArrayRef),
    ]);

    // Three map rows of two entries each.
    let map_data = ArrayData::builder(DataType::Map(entries_field, false))
        .len(3)
        .add_buffer(Buffer::from_slice_ref([0i32, 2, 4, 6]))
        .add_child_data(entries.into_data())
        .build()
        .unwrap();
    let map_array = MapArray::from(map_data);

    let map_type = map_array.data_type().clone();
    let schema = Arc::new(Schema::new(vec![Field::new("map", map_type, false)]));
    let input_batch = RecordBatch::try_new(schema, vec![Arc::new(map_array)]).unwrap();

    let from_file = deserialize_file(serialize_file(&input_batch));
    assert_eq!(input_batch, from_file);

    let from_stream = deserialize_stream(serialize_stream(&input_batch));
    assert_eq!(input_batch, from_stream);
}
4032
/// Round-trips a three-row map column whose values are dictionary-encoded strings
/// through both the IPC file format and the IPC stream format.
#[test]
fn test_roundtrip_map_with_dict_values() {
    // Six map entries: plain string keys paired with dictionary-encoded values.
    let keys = StringArray::from(vec!["a", "b", "c", "d", "e", "f"]);
    let value_dictionary = StringArray::from(vec!["val_x", "val_y", "val_z"]);
    let value_indices = Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]);
    let dict_values = DictionaryArray::new(value_indices, Arc::new(value_dictionary));

    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let key_field = Field::new("key", DataType::Utf8, false);

    // The map's declared entries field builds the value with `Field::new_dict`
    // (flagged as deprecated, hence the allow).
    #[allow(deprecated)]
    let value_field = Field::new_dict("value", dict_type.clone(), true, 2, false);
    let entries_type = DataType::Struct(vec![key_field.clone(), value_field].into());
    let entries_field = Arc::new(Field::new("entries", entries_type, false));

    // The struct array itself is assembled with plain `Field::new` fields.
    let entries = StructArray::from(vec![
        (Arc::new(key_field), Arc::new(keys) as ArrayRef),
        (
            Arc::new(Field::new("value", dict_type, true)),
            Arc::new(dict_values) as ArrayRef,
        ),
    ]);

    // Three map rows of two entries each.
    let map_data = ArrayData::builder(DataType::Map(entries_field, false))
        .len(3)
        .add_buffer(Buffer::from_slice_ref([0i32, 2, 4, 6]))
        .add_child_data(entries.into_data())
        .build()
        .unwrap();
    let map_array = MapArray::from(map_data);

    let map_type = map_array.data_type().clone();
    let schema = Arc::new(Schema::new(vec![Field::new("map", map_type, false)]));
    let input_batch = RecordBatch::try_new(schema, vec![Arc::new(map_array)]).unwrap();

    let from_file = deserialize_file(serialize_file(&input_batch));
    assert_eq!(input_batch, from_file);

    let from_stream = deserialize_stream(serialize_stream(&input_batch));
    assert_eq!(input_batch, from_stream);
}
4100
/// Writes `Decimal128` batches of several widths with a 16-byte IPC alignment and
/// checks that a decoder created with `with_require_alignment(true)` reads the
/// first record batch of each file back unchanged.
#[test]
fn test_decimal128_alignment16_is_sufficient() {
    const IPC_ALIGNMENT: usize = 16;

    for num_cols in [1, 2, 3, 17, 50, 73, 99] {
        // The row count is derived from the column count.
        let num_rows = (num_cols * 7 + 11) % 100;

        let (fields, arrays): (Vec<Field>, Vec<Arc<dyn Array>>) = (0..num_cols)
            .map(|i| {
                let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
                let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
                (field, Arc::new(array) as Arc<dyn Array>)
            })
            .unzip();
        let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays).unwrap();

        let options = IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap();
        let mut writer =
            FileWriter::try_new_with_options(Vec::new(), batch.schema_ref(), options).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();

        let file_bytes: Vec<u8> = writer.into_inner().unwrap();
        let buffer = Buffer::from_vec(file_bytes);

        // The final 10 bytes go to `read_footer_length`; the footer sits right before them.
        let trailer_start = buffer.len() - 10;
        let footer_len =
            read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
        let footer_start = trailer_start - footer_len;
        let footer = root_as_footer(&buffer[footer_start..trailer_start]).unwrap();

        let decoded_schema = fb_to_schema(footer.schema().unwrap());
        let decoder = FileDecoder::new(Arc::new(decoded_schema), footer.version())
            .with_require_alignment(true);

        // Decode the first record batch block listed in the footer.
        let batches = footer.recordBatches().unwrap();
        let block = batches.get(0);
        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
        let data = buffer.slice_with_length(block.offset() as _, block_len);

        let batch2 = decoder.read_record_batch(block, &data).unwrap().unwrap();

        assert_eq!(batch, batch2);
    }
}
4159
/// Writes a two-column `Decimal128` batch with an 8-byte IPC alignment and checks
/// that a decoder created with `with_require_alignment(true)` rejects the first
/// record batch with the expected misalignment error.
#[test]
fn test_decimal128_alignment8_is_unaligned() {
    const IPC_ALIGNMENT: usize = 8;

    let num_cols = 2;
    let num_rows = 1;

    let (fields, arrays): (Vec<Field>, Vec<Arc<dyn Array>>) = (0..num_cols)
        .map(|i| {
            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
            let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
            (field, Arc::new(array) as Arc<dyn Array>)
        })
        .unzip();
    let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays).unwrap();

    let options = IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap();
    let mut writer =
        FileWriter::try_new_with_options(Vec::new(), batch.schema_ref(), options).unwrap();
    writer.write(&batch).unwrap();
    writer.finish().unwrap();

    let file_bytes: Vec<u8> = writer.into_inner().unwrap();
    let buffer = Buffer::from_vec(file_bytes);

    // The final 10 bytes go to `read_footer_length`; the footer sits right before them.
    let trailer_start = buffer.len() - 10;
    let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
    let footer_start = trailer_start - footer_len;
    let footer = root_as_footer(&buffer[footer_start..trailer_start]).unwrap();

    let decoded_schema = fb_to_schema(footer.schema().unwrap());
    let decoder =
        FileDecoder::new(Arc::new(decoded_schema), footer.version()).with_require_alignment(true);

    // Attempt to decode the first record batch block listed in the footer.
    let batches = footer.recordBatches().unwrap();
    let block = batches.get(0);
    let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
    let data = buffer.slice_with_length(block.offset() as _, block_len);

    let error = decoder.read_record_batch(block, &data).unwrap_err();
    assert_eq!(
        error.to_string(),
        "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
         offset from expected alignment of 16 by 8"
    );
}
4215
/// Checks that `flush` on a stream writer and on a file writer pushes the bytes
/// written at construction time through a `BufWriter` into the underlying `Vec`.
#[test]
fn test_flush() {
    let num_cols = 2;
    let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
    let fields: Vec<_> = (0..num_cols)
        .map(|i| Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true))
        .collect();
    let schema = Schema::new(fields);

    // Each writer sits on its own `BufWriter` with a 1024-byte capacity.
    let mut stream_writer = StreamWriter::try_new_with_options(
        BufWriter::with_capacity(1024, Vec::new()),
        &schema,
        options.clone(),
    )
    .unwrap();
    let mut file_writer = FileWriter::try_new_with_options(
        BufWriter::with_capacity(1024, Vec::new()),
        &schema,
        options,
    )
    .unwrap();

    // Length of the innermost `Vec` before and after the explicit flush.
    let stream_len_on_new = stream_writer.get_ref().get_ref().len();
    let file_len_on_new = file_writer.get_ref().get_ref().len();
    stream_writer.flush().unwrap();
    file_writer.flush().unwrap();
    let stream_len_on_flush = stream_writer.get_ref().get_ref().len();
    let file_len_on_flush = file_writer.get_ref().get_ref().len();

    // NOTE(review): the `- 8` presumably discounts bytes appended when the stream
    // writer is finalized by `into_inner`, and the `+ 8` presumably accounts for a
    // file-only header; confirm against the IPC format.
    let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
    let expected_stream_len = stream_out.len() - 8;
    let expected_file_len = expected_stream_len + 8;

    assert!(
        stream_len_on_new < stream_len_on_flush,
        "this test makes no sense if flush is not actually required"
    );
    assert!(
        file_len_on_new < file_len_on_flush,
        "this test makes no sense if flush is not actually required"
    );
    assert_eq!(stream_len_on_flush, expected_stream_len);
    assert_eq!(file_len_on_flush, expected_file_len);
}
4262
/// Round-trips slices of a non-nullable `List<FixedSizeList<Float32, 3>>` column
/// through the IPC stream format.
#[test]
fn test_roundtrip_list_of_fixed_list() -> Result<(), ArrowError> {
    // The same field definitions feed both the declared schema type and the builders.
    let float_item = Arc::new(Field::new("item", DataType::Float32, false));
    let l1_type = DataType::FixedSizeList(Arc::clone(&float_item), 3);
    let l1_item = Arc::new(Field::new("item", l1_type, false));
    let l2_type = DataType::List(Arc::clone(&l1_item));

    let l1_builder = FixedSizeListBuilder::new(Float32Builder::new(), 3).with_field(float_item);
    let mut l2_builder = ListBuilder::new(l1_builder).with_field(l1_item);

    // First outer list: three points.
    for point in [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]] {
        for coordinate in point {
            l2_builder.values().values().append_value(coordinate);
        }
        l2_builder.values().append(true);
    }
    l2_builder.append(true);

    // Second outer list: a single point.
    for coordinate in [10., 11., 12.] {
        l2_builder.values().values().append_value(coordinate);
    }
    l2_builder.values().append(true);
    l2_builder.append(true);

    let array = Arc::new(l2_builder.finish()) as ArrayRef;

    let schema = Arc::new(Schema::new_with_metadata(
        vec![Field::new("points", l2_type, false)],
        HashMap::default(),
    ));

    for (offset, length) in [(0, 1), (0, 2), (1, 1)] {
        test_slices(&array, &schema, offset, length)?;
    }

    Ok(())
}
4310
/// Round-trips slices of a nullable `List<FixedSizeList<Float32, 3>>` column whose
/// innermost values include nulls through the IPC stream format.
#[test]
fn test_roundtrip_list_of_fixed_list_w_nulls() -> Result<(), ArrowError> {
    let mut l2_builder = ListBuilder::new(FixedSizeListBuilder::new(Float32Builder::new(), 3));

    // First outer list: three points, some with missing coordinates.
    for point in [
        [Some(1.0), Some(2.0), None],
        [Some(4.0), Some(5.0), Some(6.0)],
        [None, Some(8.0), Some(9.0)],
    ] {
        for coordinate in point {
            l2_builder.values().values().append_option(coordinate);
        }
        l2_builder.values().append(true);
    }
    l2_builder.append(true);

    // Second outer list: a single point with two missing coordinates.
    for coordinate in [Some(10.), None, None] {
        l2_builder.values().values().append_option(coordinate);
    }
    l2_builder.values().append(true);
    l2_builder.append(true);

    let array = Arc::new(l2_builder.finish()) as ArrayRef;

    // Every level of the declared type is nullable.
    let point_type =
        DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 3);
    let points_type = DataType::List(Arc::new(Field::new("item", point_type, true)));
    let schema = Arc::new(Schema::new_with_metadata(
        vec![Field::new("points", points_type, true)],
        HashMap::default(),
    ));

    for (offset, length) in [(0, 1), (0, 2), (1, 1)] {
        test_slices(&array, &schema, offset, length)?;
    }

    Ok(())
}
4370
4371 fn test_slices(
4372 parent_array: &ArrayRef,
4373 schema: &SchemaRef,
4374 offset: usize,
4375 length: usize,
4376 ) -> Result<(), ArrowError> {
4377 let subarray = parent_array.slice(offset, length);
4378 let original_batch = RecordBatch::try_new(schema.clone(), vec![subarray])?;
4379
4380 let mut bytes = Vec::new();
4381 let mut writer = StreamWriter::try_new(&mut bytes, schema)?;
4382 writer.write(&original_batch)?;
4383 writer.finish()?;
4384
4385 let mut cursor = std::io::Cursor::new(bytes);
4386 let mut reader = StreamReader::try_new(&mut cursor, None)?;
4387 let returned_batch = reader.next().unwrap()?;
4388
4389 assert_eq!(original_batch, returned_batch);
4390
4391 Ok(())
4392 }
4393
/// Round-trips slices of a non-nullable `FixedSizeList<Int64, 3>` column through
/// the IPC stream format.
#[test]
fn test_roundtrip_fixed_list() -> Result<(), ArrowError> {
    // One field definition is shared by the builder and the declared schema type.
    let item_field = Arc::new(Field::new("item", DataType::Int64, false));
    let mut fixed_list_builder =
        FixedSizeListBuilder::new(Int64Builder::new(), 3).with_field(Arc::clone(&item_field));

    for point in [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]] {
        for component in point {
            fixed_list_builder.values().append_value(component);
        }
        fixed_list_builder.append(true);
    }

    let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;

    let points_type = DataType::FixedSizeList(item_field, 3);
    let schema = Arc::new(Schema::new_with_metadata(
        vec![Field::new("points", points_type, false)],
        HashMap::default(),
    ));

    for (offset, length) in [(0, 4), (0, 2), (1, 3), (2, 1)] {
        test_slices(&array, &schema, offset, length)?;
    }

    Ok(())
}
4428
/// Round-trips slices of a nullable `FixedSizeList<Int64, 3>` column whose values
/// include nulls through the IPC stream format.
#[test]
fn test_roundtrip_fixed_list_w_nulls() -> Result<(), ArrowError> {
    let mut fixed_list_builder = FixedSizeListBuilder::new(Int64Builder::new(), 3);

    for point in [
        [Some(1), Some(2), None],
        [Some(4), Some(5), Some(6)],
        [None, Some(8), Some(9)],
        [Some(10), None, None],
    ] {
        for component in point {
            fixed_list_builder.values().append_option(component);
        }
        fixed_list_builder.append(true);
    }

    let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;

    // Both the list and its items are declared nullable.
    let points_type =
        DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 3);
    let schema = Arc::new(Schema::new_with_metadata(
        vec![Field::new("points", points_type, true)],
        HashMap::default(),
    ));

    for (offset, length) in [(0, 4), (0, 2), (1, 3), (2, 1)] {
        test_slices(&array, &schema, offset, length)?;
    }

    Ok(())
}
4470
/// Checks that encoding a schema with `HashMap` metadata yields byte-identical
/// IPC output across repeated runs.
///
/// Each run builds a fresh `HashMap`, whose iteration order is not guaranteed to
/// match between instances, so any dependence on that order would show up as a
/// differing hash.
#[test]
fn test_metadata_encoding_ordering() {
    /// Serializes an empty batch whose schema and single field both carry the same
    /// five-entry metadata map, and returns a hash of the resulting stream bytes.
    fn create_hash() -> u64 {
        let metadata: HashMap<String, String> = [
            ("a", "1"),
            ("b", "2"),
            ("c", "3"),
            ("d", "4"),
            ("e", "5"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        // The field needs its own copy of the map; the schema takes ownership of the
        // original. No further clones are needed: `with_metadata` already returns an
        // owned schema, and the `Arc` is not used again after the batch is created.
        let field = Field::new("a", DataType::Int64, true).with_metadata(metadata.clone());
        let schema = Arc::new(Schema::new(vec![field]).with_metadata(metadata));
        let batch = RecordBatch::new_empty(schema);

        let mut bytes = Vec::new();
        let mut w = StreamWriter::try_new(&mut bytes, batch.schema_ref()).unwrap();
        w.write(&batch).unwrap();
        w.finish().unwrap();

        let mut h = std::hash::DefaultHasher::new();
        h.write(&bytes);
        h.finish()
    }

    let expected = create_hash();

    // Repeat the encoding and stop at the first run that differs from the baseline.
    for attempt in 0..20 {
        assert_eq!(
            create_hash(),
            expected,
            "IPC encoding of identical metadata differed on attempt {attempt}"
        );
    }
}
4514
/// Writes two independent single-batch IPC streams with the same
/// `DictionaryTracker`, calling `clear()` on the tracker in between, and checks
/// that each stream reads back to the batch that was written.
#[test]
fn test_dictionary_tracker_reset() {
    let data_gen = IpcDataGenerator::default();
    let writer_options = IpcWriteOptions::default();
    let mut dictionary_tracker = DictionaryTracker::new(false);
    let mut compression_ctx = IpcWriteContext::default();

    let dict_type = DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
    let schema = Arc::new(Schema::new(vec![Field::new("a", dict_type, false)]));

    // Encodes the schema message, the dictionary messages and the record batch
    // message, in that order, into a fresh buffer.
    let mut write_single_batch_stream =
        |batch: RecordBatch, dict_tracker: &mut DictionaryTracker| -> Vec<u8> {
            let mut stream = Vec::new();

            let header = data_gen.schema_to_bytes_with_dictionary_tracker(
                &schema,
                dict_tracker,
                &writer_options,
            );
            _ = write_message(&mut stream, header, &writer_options).unwrap();

            let (dictionaries, record_batch) = data_gen
                .encode(&batch, dict_tracker, &writer_options, &mut compression_ctx)
                .unwrap();
            for dictionary in dictionaries {
                _ = write_message(&mut stream, dictionary, &writer_options).unwrap();
            }
            _ = write_message(&mut stream, record_batch, &writer_options).unwrap();

            stream
        };

    // Both rounds use an identically constructed one-row dictionary batch.
    let make_batch = || {
        let keys = UInt8Array::from_iter_values([0]);
        let values = StringArray::from_iter_values(["a"]);
        RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(DictionaryArray::new(keys, Arc::new(values)))],
        )
        .unwrap()
    };

    let batch1 = make_batch();
    let buffer = write_single_batch_stream(batch1.clone(), &mut dictionary_tracker);
    let mut reader = StreamReader::try_new(Cursor::new(buffer), None).unwrap();
    let read_batch = reader.next().unwrap().unwrap();
    assert_eq!(read_batch, batch1);

    // NOTE(review): presumably without this reset the tracker would treat the
    // dictionary as already written and leave it out of the second stream; confirm.
    dictionary_tracker.clear();

    let batch2 = make_batch();
    let buffer = write_single_batch_stream(batch2.clone(), &mut dictionary_tracker);
    let mut reader = StreamReader::try_new(Cursor::new(buffer), None).unwrap();
    let read_batch = reader.next().unwrap().unwrap();
    assert_eq!(read_batch, batch2);
}
4583}