1use std::cmp::min;
28use std::collections::HashMap;
29use std::io::{BufWriter, Write};
30use std::mem::size_of;
31use std::sync::Arc;
32
33use flatbuffers::FlatBufferBuilder;
34
35use arrow_array::builder::BufferBuilder;
36use arrow_array::cast::*;
37use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
38use arrow_array::*;
39use arrow_buffer::bit_util;
40use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ToByteSlice};
41use arrow_data::{ArrayData, ArrayDataBuilder, BufferSpec, layout};
42use arrow_schema::*;
43
44use crate::CONTINUATION_MARKER;
45use crate::compression::CompressionCodec;
46#[expect(deprecated)]
47pub use crate::compression::{CompressionContext, IpcWriteContext};
48use crate::convert::IpcSchemaEncoder;
49
/// Options that control how IPC messages are framed, versioned and compressed.
#[derive(Debug, Clone)]
pub struct IpcWriteOptions {
    /// Byte boundary that headers and body buffers are padded to.
    /// `try_new` only accepts 8, 16, 32 or 64.
    alignment: u8,
    /// Selects the legacy framing (a 4-byte instead of an 8-byte message prefix in
    /// `IpcDataGenerator::write`). `try_new` rejects it for metadata version 5.
    write_legacy_ipc_format: bool,
    /// Metadata version stamped on every message built with these options.
    metadata_version: crate::MetadataVersion,
    /// Codec applied to body buffers, or `None` to write them uncompressed.
    batch_compression_type: Option<crate::CompressionType>,
    /// Optional compression level handed to the codec; `None` uses the codec's own
    /// construction path without a level.
    batch_compression_level: Option<i32>,
    /// How a dictionary that grew between batches is re-emitted.
    dictionary_handling: DictionaryHandling,
}
75
/// One body buffer in the form it will be written to the output.
enum EncodedBuffer {
    /// Bytes held in an Arrow `Buffer`.
    Raw(Buffer),
    /// Owned bytes — presumably the output of a compression codec; the producer is
    /// outside this view, TODO confirm.
    Compressed(Vec<u8>),
}
87
88impl EncodedBuffer {
89 fn as_slice(&self) -> &[u8] {
90 match self {
91 EncodedBuffer::Raw(b) => b.as_slice(),
92 EncodedBuffer::Compressed(v) => v.as_slice(),
93 }
94 }
95
96 fn len(&self) -> usize {
97 match self {
98 EncodedBuffer::Raw(b) => b.len(),
99 EncodedBuffer::Compressed(v) => v.len(),
100 }
101 }
102}
/// Accumulates the field nodes and buffer descriptors that are later serialized
/// into the flatbuffer header of a record batch (via `create_vector`).
#[derive(Default)]
struct IpcMetadataBuilder {
    /// Field nodes collected while array data is written.
    nodes: Vec<crate::FieldNode>,
    /// Buffer descriptors collected while array data is written.
    buffers: Vec<crate::Buffer>,
}
112
/// Destination for the body buffers produced while encoding array data.
enum IpcBodySink<'a> {
    /// Copy each buffer, followed by its padding, into a contiguous byte vector.
    Write(&'a mut Vec<u8>),
    /// Keep each buffer as-is so the caller can write it (and its padding) later.
    Collect(&'a mut Vec<EncodedBuffer>),
}
123impl<'a> IpcBodySink<'a> {
124 pub fn write(&mut self, pad_len: usize, buffer: EncodedBuffer) {
126 match self {
127 IpcBodySink::Write(vec) => {
128 vec.extend_from_slice(buffer.as_slice());
129 vec.extend_from_slice(&PADDING[..pad_len]);
130 }
131 IpcBodySink::Collect(vec) => {
132 vec.push(buffer);
133 }
134 }
135 }
136}
137
/// Sizes reported by `IpcDataGenerator::write` after a batch has been written.
struct IpcWriteMetadata {
    /// One `(header_len, body_len)` pair per dictionary message written ahead of the batch.
    dictionary_block_sizes: Vec<(usize, usize)>,
    /// Length of the record batch header including its prefix and alignment padding.
    padded_header_len: usize,
    /// Length of the record batch body including the trailing padding.
    body_len: usize,
}
151
152impl IpcWriteOptions {
153 pub fn try_with_compression(
158 mut self,
159 batch_compression_type: Option<crate::CompressionType>,
160 ) -> Result<Self, ArrowError> {
161 self.batch_compression_type = batch_compression_type;
162
163 if self.batch_compression_type.is_some()
164 && self.metadata_version < crate::MetadataVersion::V5
165 {
166 return Err(ArrowError::InvalidArgumentError(
167 "Compression only supported in metadata v5 and above".to_string(),
168 ));
169 }
170 Ok(self)
171 }
172
173 pub fn try_with_compression_level(
178 mut self,
179 batch_compression_level: Option<i32>,
180 ) -> Result<Self, ArrowError> {
181 self.batch_compression_level = batch_compression_level;
182
183 if self.batch_compression_level.is_some()
184 && self.metadata_version < crate::MetadataVersion::V5
185 {
186 return Err(ArrowError::InvalidArgumentError(
187 "Compression only supported in metadata v5 and above".to_string(),
188 ));
189 }
190
191 match (self.batch_compression_type, self.batch_compression_level) {
192 (Some(crate::CompressionType::ZSTD), Some(level)) => {
193 return self.check_zstd_level(level);
194 }
195 (Some(crate::CompressionType::LZ4_FRAME), Some(_)) => {
196 return Err(ArrowError::InvalidArgumentError(
197 "LZ4 Frame compression does not support configurable compression levels"
198 .to_string(),
199 ));
200 }
201 _ => {}
202 }
203
204 Ok(self)
205 }
206
207 #[cfg(not(feature = "zstd"))]
208 fn check_zstd_level(self, _level: i32) -> Result<Self, ArrowError> {
209 Err(ArrowError::InvalidArgumentError(
210 "zstd IPC compression requires the zstd feature".to_string(),
211 ))
212 }
213
214 #[cfg(feature = "zstd")]
215 fn check_zstd_level(self, level: i32) -> Result<Self, ArrowError> {
216 let range = zstd::compression_level_range();
217 if !range.contains(&(level as zstd::zstd_safe::CompressionLevel)) {
218 return Err(ArrowError::InvalidArgumentError(format!(
219 "ZSTD compression level must be between {} and {}, got {}",
220 range.start(),
221 range.end(),
222 level,
223 )));
224 }
225
226 Ok(self)
227 }
228
229 pub fn try_new(
231 alignment: usize,
232 write_legacy_ipc_format: bool,
233 metadata_version: crate::MetadataVersion,
234 ) -> Result<Self, ArrowError> {
235 let is_alignment_valid =
236 alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
237 if !is_alignment_valid {
238 return Err(ArrowError::InvalidArgumentError(
239 "Alignment should be 8, 16, 32, or 64.".to_string(),
240 ));
241 }
242 let alignment: u8 = u8::try_from(alignment).expect("range already checked");
243 match metadata_version {
244 crate::MetadataVersion::V1
245 | crate::MetadataVersion::V2
246 | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
247 "Writing IPC metadata version 3 and lower not supported".to_string(),
248 )),
249 #[allow(deprecated)]
250 crate::MetadataVersion::V4 => Ok(Self {
251 alignment,
252 write_legacy_ipc_format,
253 metadata_version,
254 batch_compression_type: None,
255 batch_compression_level: None,
256 dictionary_handling: DictionaryHandling::default(),
257 }),
258 crate::MetadataVersion::V5 => {
259 if write_legacy_ipc_format {
260 Err(ArrowError::InvalidArgumentError(
261 "Legacy IPC format only supported on metadata version 4".to_string(),
262 ))
263 } else {
264 Ok(Self {
265 alignment,
266 write_legacy_ipc_format,
267 metadata_version,
268 batch_compression_type: None,
269 batch_compression_level: None,
270 dictionary_handling: DictionaryHandling::default(),
271 })
272 }
273 }
274 z => Err(ArrowError::InvalidArgumentError(format!(
275 "Unsupported crate::MetadataVersion {z:?}"
276 ))),
277 }
278 }
279
280 pub fn with_dictionary_handling(mut self, dictionary_handling: DictionaryHandling) -> Self {
282 self.dictionary_handling = dictionary_handling;
283 self
284 }
285}
286
287impl Default for IpcWriteOptions {
288 fn default() -> Self {
289 Self {
290 alignment: 64,
291 write_legacy_ipc_format: false,
292 metadata_version: crate::MetadataVersion::V5,
293 batch_compression_type: None,
294 batch_compression_level: None,
295 dictionary_handling: DictionaryHandling::default(),
296 }
297 }
298}
299
/// Stateless helper that turns schemas, record batches and dictionaries into
/// encoded IPC messages; all state lives in the arguments passed to its methods.
#[derive(Debug, Default)]
pub struct IpcDataGenerator {}
335
336impl IpcDataGenerator {
    /// Serializes `schema` into a standalone IPC `Schema` message.
    ///
    /// The schema is encoded by an `IpcSchemaEncoder` that is handed
    /// `dictionary_tracker`, and the message carries the metadata version from
    /// `write_options`. The returned `EncodedData` has an empty body.
    pub fn schema_to_bytes_with_dictionary_tracker(
        &self,
        schema: &Schema,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> EncodedData {
        let mut fbb = FlatBufferBuilder::new();
        // The schema table must be finished before the enclosing message is started.
        let schema = {
            let fb = IpcSchemaEncoder::new()
                .with_dictionary_tracker(dictionary_tracker)
                .schema_to_fb_offset(&mut fbb, schema);
            fb.as_union_value()
        };

        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::Schema);
        // A schema message has no body.
        message.add_bodyLength(0);
        message.add_header(schema);
        let data = message.finish();
        fbb.finish(data, None);

        let data = fbb.finished_data();
        EncodedData {
            ipc_message: data.to_vec(),
            arrow_data: vec![],
        }
    }
368
369 fn _encode_dictionaries<I: Iterator<Item = i64>>(
370 &self,
371 column: &ArrayRef,
372 encoded_dictionaries: &mut Vec<EncodedData>,
373 dictionary_tracker: &mut DictionaryTracker,
374 write_options: &IpcWriteOptions,
375 dict_id: &mut I,
376 ipc_write_context: &mut IpcWriteContext,
377 ) -> Result<(), ArrowError> {
378 match column.data_type() {
379 DataType::Struct(fields) => {
380 let s = as_struct_array(column);
381 for (field, column) in fields.iter().zip(s.columns()) {
382 self.encode_dictionaries(
383 field,
384 column,
385 encoded_dictionaries,
386 dictionary_tracker,
387 write_options,
388 dict_id,
389 ipc_write_context,
390 )?;
391 }
392 }
393 DataType::RunEndEncoded(_, values) => {
394 let data = column.to_data();
395 if data.child_data().len() != 2 {
396 return Err(ArrowError::InvalidArgumentError(format!(
397 "The run encoded array should have exactly two child arrays. Found {}",
398 data.child_data().len()
399 )));
400 }
401 let values_array = make_array(data.child_data()[1].clone());
404 self.encode_dictionaries(
405 values,
406 &values_array,
407 encoded_dictionaries,
408 dictionary_tracker,
409 write_options,
410 dict_id,
411 ipc_write_context,
412 )?;
413 }
414 DataType::List(field) => {
415 let list = as_list_array(column);
416 self.encode_dictionaries(
417 field,
418 list.values(),
419 encoded_dictionaries,
420 dictionary_tracker,
421 write_options,
422 dict_id,
423 ipc_write_context,
424 )?;
425 }
426 DataType::LargeList(field) => {
427 let list = as_large_list_array(column);
428 self.encode_dictionaries(
429 field,
430 list.values(),
431 encoded_dictionaries,
432 dictionary_tracker,
433 write_options,
434 dict_id,
435 ipc_write_context,
436 )?;
437 }
438 DataType::ListView(field) => {
439 let list = column.as_list_view::<i32>();
440 self.encode_dictionaries(
441 field,
442 list.values(),
443 encoded_dictionaries,
444 dictionary_tracker,
445 write_options,
446 dict_id,
447 ipc_write_context,
448 )?;
449 }
450 DataType::LargeListView(field) => {
451 let list = column.as_list_view::<i64>();
452 self.encode_dictionaries(
453 field,
454 list.values(),
455 encoded_dictionaries,
456 dictionary_tracker,
457 write_options,
458 dict_id,
459 ipc_write_context,
460 )?;
461 }
462 DataType::FixedSizeList(field, _) => {
463 let list = column
464 .as_any()
465 .downcast_ref::<FixedSizeListArray>()
466 .expect("Unable to downcast to fixed size list array");
467 self.encode_dictionaries(
468 field,
469 list.values(),
470 encoded_dictionaries,
471 dictionary_tracker,
472 write_options,
473 dict_id,
474 ipc_write_context,
475 )?;
476 }
477 DataType::Map(field, _) => {
478 let map_array = as_map_array(column);
479
480 let (keys, values) = match field.data_type() {
481 DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
482 _ => panic!("Incorrect field data type {:?}", field.data_type()),
483 };
484
485 self.encode_dictionaries(
487 keys,
488 map_array.keys(),
489 encoded_dictionaries,
490 dictionary_tracker,
491 write_options,
492 dict_id,
493 ipc_write_context,
494 )?;
495
496 self.encode_dictionaries(
498 values,
499 map_array.values(),
500 encoded_dictionaries,
501 dictionary_tracker,
502 write_options,
503 dict_id,
504 ipc_write_context,
505 )?;
506 }
507 DataType::Union(fields, _) => {
508 let union = as_union_array(column);
509 for (type_id, field) in fields.iter() {
510 let column = union.child(type_id);
511 self.encode_dictionaries(
512 field,
513 column,
514 encoded_dictionaries,
515 dictionary_tracker,
516 write_options,
517 dict_id,
518 ipc_write_context,
519 )?;
520 }
521 }
522 _ => (),
523 }
524
525 Ok(())
526 }
527
528 #[allow(clippy::too_many_arguments)]
529 fn encode_dictionaries<I: Iterator<Item = i64>>(
530 &self,
531 field: &Field,
532 column: &ArrayRef,
533 encoded_dictionaries: &mut Vec<EncodedData>,
534 dictionary_tracker: &mut DictionaryTracker,
535 write_options: &IpcWriteOptions,
536 dict_id_seq: &mut I,
537 ipc_write_context: &mut IpcWriteContext,
538 ) -> Result<(), ArrowError> {
539 match column.data_type() {
540 DataType::Dictionary(_key_type, value_type) => {
541 if matches!(value_type.as_ref(), DataType::Dictionary(_, _)) {
542 return Err(ArrowError::InvalidArgumentError(format!(
543 "Arrow IPC field metadata cannot encode direct dictionary-of-dictionary values for field {:?}",
544 field.name()
545 )));
546 }
547
548 let dict_data = column.to_data();
549 let dict_values = &dict_data.child_data()[0];
550
551 let values = make_array(dict_data.child_data()[0].clone());
552
553 self._encode_dictionaries(
554 &values,
555 encoded_dictionaries,
556 dictionary_tracker,
557 write_options,
558 dict_id_seq,
559 ipc_write_context,
560 )?;
561
562 let dict_id = dict_id_seq.next().ok_or_else(|| {
566 ArrowError::IpcError(format!(
567 "no dict id for field {:?}: field.data_type={:?}, column.data_type={:?}",
568 field.name(),
569 field.data_type(),
570 column.data_type()
571 ))
572 })?;
573
574 match dictionary_tracker.insert_column(
575 dict_id,
576 column,
577 write_options.dictionary_handling,
578 )? {
579 DictionaryUpdate::None => {}
580 DictionaryUpdate::New | DictionaryUpdate::Replaced => {
581 encoded_dictionaries.push(self.dictionary_batch_to_bytes(
582 dict_id,
583 dict_values,
584 write_options,
585 false,
586 ipc_write_context,
587 )?);
588 }
589 DictionaryUpdate::Delta(data) => {
590 encoded_dictionaries.push(self.dictionary_batch_to_bytes(
591 dict_id,
592 &data,
593 write_options,
594 true,
595 ipc_write_context,
596 )?);
597 }
598 }
599 }
600 _ => self._encode_dictionaries(
601 column,
602 encoded_dictionaries,
603 dictionary_tracker,
604 write_options,
605 dict_id_seq,
606 ipc_write_context,
607 )?,
608 }
609
610 Ok(())
611 }
612
613 pub fn encode(
617 &self,
618 batch: &RecordBatch,
619 dictionary_tracker: &mut DictionaryTracker,
620 write_options: &IpcWriteOptions,
621 ipc_write_context: &mut IpcWriteContext,
622 ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
623 let encoded_dictionaries =
624 self.encode_all_dicts(batch, dictionary_tracker, write_options, ipc_write_context)?;
625 let mut arrow_data = Vec::new();
626 let (ipc_message, _, tail_pad) = self.record_batch_to_bytes(
627 batch,
628 write_options,
629 ipc_write_context,
630 &mut IpcBodySink::Write(&mut arrow_data),
631 )?;
632 arrow_data.extend_from_slice(&PADDING[..tail_pad]);
633 Ok((
634 encoded_dictionaries,
635 EncodedData {
636 ipc_message,
637 arrow_data,
638 },
639 ))
640 }
641
642 fn encode_all_dicts(
644 &self,
645 batch: &RecordBatch,
646 dictionary_tracker: &mut DictionaryTracker,
647 write_options: &IpcWriteOptions,
648 ipc_write_context: &mut IpcWriteContext,
649 ) -> Result<Vec<EncodedData>, ArrowError> {
650 let schema = batch.schema();
651 let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());
652 let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();
653 for (i, field) in schema.fields().iter().enumerate() {
654 self.encode_dictionaries(
655 field,
656 batch.column(i),
657 &mut encoded_dictionaries,
658 dictionary_tracker,
659 write_options,
660 &mut dict_id,
661 ipc_write_context,
662 )?;
663 }
664 Ok(encoded_dictionaries)
665 }
666
667 fn write<W: Write>(
671 &self,
672 batch: &RecordBatch,
673 dictionary_tracker: &mut DictionaryTracker,
674 write_options: &IpcWriteOptions,
675 ipc_write_context: &mut IpcWriteContext,
676 writer: &mut W,
677 ) -> Result<IpcWriteMetadata, ArrowError> {
678 let encoded_dictionaries =
679 self.encode_all_dicts(batch, dictionary_tracker, write_options, ipc_write_context)?;
680
681 let mut dictionary_block_sizes = Vec::with_capacity(encoded_dictionaries.len());
682 for dict in encoded_dictionaries {
683 dictionary_block_sizes.push(write_message(&mut *writer, dict, write_options)?);
684 }
685
686 let capacity = batch
687 .columns()
688 .iter()
689 .map(|a| estimate_encoded_buffer_count(a.data_type()))
690 .sum();
691 let mut encoded_buffers: Vec<EncodedBuffer> = Vec::with_capacity(capacity);
692 let (ipc_message, body_len, tail_pad) = self.record_batch_to_bytes(
693 batch,
694 write_options,
695 ipc_write_context,
696 &mut IpcBodySink::Collect(&mut encoded_buffers),
697 )?;
698
699 let alignment = write_options.alignment;
700 let a = usize::from(alignment - 1);
701 let prefix_size = if write_options.write_legacy_ipc_format {
702 4
703 } else {
704 8
705 };
706 let aligned_size = (ipc_message.len() + prefix_size + a) & !a;
707 write_continuation(
708 &mut *writer,
709 write_options,
710 (aligned_size - prefix_size) as i32,
711 )?;
712 writer.write_all(&ipc_message)?;
713 writer.write_all(&PADDING[..aligned_size - ipc_message.len() - prefix_size])?;
714 for enc in &encoded_buffers {
715 writer.write_all(enc.as_slice())?;
716 writer.write_all(&PADDING[..pad_to_alignment(alignment, enc.len())])?;
717 }
718 writer.write_all(&PADDING[..tail_pad])?;
719
720 Ok(IpcWriteMetadata {
721 dictionary_block_sizes,
722 padded_header_len: aligned_size,
723 body_len,
724 })
725 }
726
727 #[deprecated(since = "57.0.0", note = "Use `encode` instead")]
731 pub fn encoded_batch(
732 &self,
733 batch: &RecordBatch,
734 dictionary_tracker: &mut DictionaryTracker,
735 write_options: &IpcWriteOptions,
736 ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
737 self.encode(
738 batch,
739 dictionary_tracker,
740 write_options,
741 &mut Default::default(),
742 )
743 }
744
745 fn record_batch_to_bytes(
751 &self,
752 batch: &RecordBatch,
753 write_options: &IpcWriteOptions,
754 ipc_write_context: &mut IpcWriteContext,
755 sink: &mut IpcBodySink<'_>,
756 ) -> Result<(Vec<u8>, usize, usize), ArrowError> {
757 let batch_compression_type = write_options.batch_compression_type;
758
759 let compression = batch_compression_type.map(|batch_compression_type| {
760 let fbb = ipc_write_context.mut_fbb();
761 let mut c = crate::BodyCompressionBuilder::new(fbb);
762 c.add_method(crate::BodyCompressionMethod::BUFFER);
763 c.add_codec(batch_compression_type);
764 c.finish()
765 });
766
767 let batch_compression_level = write_options.batch_compression_level;
768 let compression_codec: Option<CompressionCodec> = batch_compression_type
769 .map(|compression_type| match batch_compression_level {
770 Some(level) => {
771 CompressionCodec::try_new_with_compression_level(compression_type, level)
772 }
773 None => compression_type.try_into(),
774 })
775 .transpose()?;
776
777 let alignment = write_options.alignment;
778 let mut variadic_buffer_counts = vec![];
779 let mut meta = IpcMetadataBuilder::default();
780 let mut offset = 0i64;
781
782 for array in batch.columns() {
783 let array_data = array.to_data();
784 offset = write_array_data(
785 &array_data,
786 &mut meta,
787 sink,
788 offset,
789 compression_codec,
790 ipc_write_context,
791 write_options,
792 )?;
793 append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
794 }
795
796 let tail_pad = pad_to_alignment(alignment, offset as usize);
797 let body_len = offset as usize + tail_pad;
798
799 let fbb = ipc_write_context.mut_fbb();
800 let buffers = fbb.create_vector(&meta.buffers);
801 let nodes = fbb.create_vector(&meta.nodes);
802 let variadic_buffer = if variadic_buffer_counts.is_empty() {
803 None
804 } else {
805 Some(fbb.create_vector(&variadic_buffer_counts))
806 };
807
808 let root = {
809 let mut batch_builder = crate::RecordBatchBuilder::new(fbb);
810 batch_builder.add_length(batch.num_rows() as i64);
811 batch_builder.add_nodes(nodes);
812 batch_builder.add_buffers(buffers);
813 if let Some(c) = compression {
814 batch_builder.add_compression(c);
815 }
816 if let Some(v) = variadic_buffer {
817 batch_builder.add_variadicBufferCounts(v);
818 }
819 batch_builder.finish().as_union_value()
820 };
821 let mut message = crate::MessageBuilder::new(fbb);
822 message.add_version(write_options.metadata_version);
823 message.add_header_type(crate::MessageHeader::RecordBatch);
824 message.add_bodyLength(body_len as i64);
825 message.add_header(root);
826 let root = message.finish();
827 fbb.finish(root, None);
828
829 let ipc_message = fbb.finished_data().to_vec();
830 fbb.reset();
831 Ok((ipc_message, body_len, tail_pad))
832 }
833
834 fn dictionary_batch_to_bytes(
837 &self,
838 dict_id: i64,
839 array_data: &ArrayData,
840 write_options: &IpcWriteOptions,
841 is_delta: bool,
842 ipc_write_context: &mut IpcWriteContext,
843 ) -> Result<EncodedData, ArrowError> {
844 let mut arrow_data: Vec<u8> = vec![];
845
846 let batch_compression_type = write_options.batch_compression_type;
848
849 let compression = batch_compression_type.map(|batch_compression_type| {
850 let fbb = ipc_write_context.mut_fbb();
851 let mut c = crate::BodyCompressionBuilder::new(fbb);
852 c.add_method(crate::BodyCompressionMethod::BUFFER);
853 c.add_codec(batch_compression_type);
854 c.finish()
855 });
856
857 let batch_compression_level = write_options.batch_compression_level;
858 let compression_codec: Option<CompressionCodec> = batch_compression_type
859 .map(|batch_compression_type| match batch_compression_level {
860 Some(level) => {
861 CompressionCodec::try_new_with_compression_level(batch_compression_type, level)
862 }
863 None => batch_compression_type.try_into(),
864 })
865 .transpose()?;
866
867 let alignment = write_options.alignment;
868 let mut meta = IpcMetadataBuilder::default();
869 let mut sink = IpcBodySink::Write(&mut arrow_data);
870 let offset = write_array_data(
871 array_data,
872 &mut meta,
873 &mut sink,
874 0,
875 compression_codec,
876 ipc_write_context,
877 write_options,
878 )?;
879
880 let mut variadic_buffer_counts = vec![];
881 append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);
882
883 let tail_pad = pad_to_alignment(alignment, offset as usize);
885 let body_len = offset as usize + tail_pad;
886 arrow_data.extend_from_slice(&PADDING[..tail_pad]);
887
888 let fbb = ipc_write_context.mut_fbb();
889 let buffers = fbb.create_vector(&meta.buffers);
890 let nodes = fbb.create_vector(&meta.nodes);
891 let variadic_buffer = if variadic_buffer_counts.is_empty() {
892 None
893 } else {
894 Some(fbb.create_vector(&variadic_buffer_counts))
895 };
896
897 let root = {
898 let mut batch_builder = crate::RecordBatchBuilder::new(fbb);
899 batch_builder.add_length(array_data.len() as i64);
900 batch_builder.add_nodes(nodes);
901 batch_builder.add_buffers(buffers);
902 if let Some(c) = compression {
903 batch_builder.add_compression(c);
904 }
905 if let Some(v) = variadic_buffer {
906 batch_builder.add_variadicBufferCounts(v);
907 }
908 batch_builder.finish()
909 };
910
911 let root = {
912 let mut batch_builder = crate::DictionaryBatchBuilder::new(fbb);
913 batch_builder.add_id(dict_id);
914 batch_builder.add_data(root);
915 batch_builder.add_isDelta(is_delta);
916 batch_builder.finish().as_union_value()
917 };
918
919 let root = {
920 let mut message_builder = crate::MessageBuilder::new(fbb);
921 message_builder.add_version(write_options.metadata_version);
922 message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
923 message_builder.add_bodyLength(body_len as i64);
924 message_builder.add_header(root);
925 message_builder.finish()
926 };
927
928 fbb.finish(root, None);
929 let ipc_message = fbb.finished_data().to_vec();
930 fbb.reset();
931
932 Ok(EncodedData {
933 ipc_message,
934 arrow_data,
935 })
936 }
937}
938
939fn ensure_supported_ipc_schema(schema: &Schema) -> Result<(), ArrowError> {
940 schema
941 .fields()
942 .iter()
943 .try_for_each(|field| ensure_supported_ipc_data_type(field.name(), field.data_type()))
944}
945
946fn ensure_supported_ipc_data_type(
947 field_name: &str,
948 data_type: &DataType,
949) -> Result<(), ArrowError> {
950 match data_type {
951 DataType::Dictionary(_, value_type)
952 if matches!(value_type.as_ref(), DataType::Dictionary(_, _)) =>
953 {
954 Err(ArrowError::InvalidArgumentError(format!(
955 "Arrow IPC field metadata cannot encode direct dictionary-of-dictionary values for field {field_name:?}"
956 )))
957 }
958 DataType::Dictionary(_, value_type) => {
959 ensure_supported_ipc_data_type(field_name, value_type)
960 }
961 DataType::Struct(fields) => fields
962 .iter()
963 .try_for_each(|field| ensure_supported_ipc_data_type(field.name(), field.data_type())),
964 DataType::RunEndEncoded(_, field)
965 | DataType::List(field)
966 | DataType::LargeList(field)
967 | DataType::ListView(field)
968 | DataType::LargeListView(field)
969 | DataType::FixedSizeList(field, _)
970 | DataType::Map(field, _) => {
971 ensure_supported_ipc_data_type(field.name(), field.data_type())
972 }
973 DataType::Union(fields, _) => fields.iter().try_for_each(|(_, field)| {
974 ensure_supported_ipc_data_type(field.name(), field.data_type())
975 }),
976 _ => Ok(()),
977 }
978}
979
980fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
981 match array.data_type() {
982 DataType::BinaryView | DataType::Utf8View => {
983 counts.push(array.buffers().len() as i64 - 1);
986 }
987 DataType::Dictionary(_, _) => {
988 }
991 _ => {
992 for child in array.child_data() {
993 append_variadic_buffer_counts(counts, child)
994 }
995 }
996 }
997}
998
999pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
1000 match arr.data_type() {
1001 DataType::RunEndEncoded(k, _) => match k.data_type() {
1002 DataType::Int16 => {
1003 Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
1004 }
1005 DataType::Int32 => {
1006 Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
1007 }
1008 DataType::Int64 => {
1009 Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
1010 }
1011 d => unreachable!("Unexpected data type {d}"),
1012 },
1013 d => Err(ArrowError::InvalidArgumentError(format!(
1014 "The given array is not a run array. Data type of given array: {d}"
1015 ))),
1016 }
1017}
1018
1019fn into_zero_offset_run_array<R: RunEndIndexType>(
1022 run_array: RunArray<R>,
1023) -> Result<RunArray<R>, ArrowError> {
1024 let run_ends = run_array.run_ends();
1025 if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
1026 return Ok(run_array);
1027 }
1028
1029 let start_physical_index = run_ends.get_start_physical_index();
1031
1032 let end_physical_index = run_ends.get_end_physical_index();
1034
1035 let physical_length = end_physical_index - start_physical_index + 1;
1036
1037 let offset = R::Native::usize_as(run_ends.offset());
1039 let mut builder = BufferBuilder::<R::Native>::new(physical_length);
1040 for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
1041 builder.append(run_end_value.sub_wrapping(offset));
1042 }
1043 builder.append(R::Native::from_usize(run_array.len()).unwrap());
1044 let new_run_ends = unsafe {
1045 ArrayDataBuilder::new(R::DATA_TYPE)
1048 .len(physical_length)
1049 .add_buffer(builder.finish())
1050 .build_unchecked()
1051 };
1052
1053 let new_values = run_array
1055 .values()
1056 .slice(start_physical_index, physical_length)
1057 .into_data();
1058
1059 let builder = ArrayDataBuilder::new(run_array.data_type().clone())
1060 .len(run_array.len())
1061 .add_child_data(new_run_ends)
1062 .add_child_data(new_values);
1063 let array_data = unsafe {
1064 builder.build_unchecked()
1067 };
1068 Ok(array_data.into())
1069}
1070
/// Strategy used by `DictionaryTracker::insert_column` when a dictionary has only
/// had values appended since it was last written.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DictionaryHandling {
    /// Send the whole dictionary again (reported as `DictionaryUpdate::Replaced`), or
    /// fail if the tracker was created with `error_on_replacement`.
    #[default]
    Resend,
    /// Send only the appended values (reported as `DictionaryUpdate::Delta`).
    Delta,
}
1084
/// Outcome of `DictionaryTracker::insert_column`, telling the writer what to emit.
#[derive(Debug, Clone)]
pub enum DictionaryUpdate {
    /// The dictionary is unchanged; nothing has to be written.
    None,
    /// The dictionary id was not tracked before; the full dictionary must be written.
    New,
    /// The tracked dictionary was replaced; the full dictionary must be written again.
    Replaced,
    /// Only values were appended; carries the appended values to write as a delta.
    Delta(ArrayData),
}
1098
/// Remembers which dictionaries have been written so that unchanged ones are not
/// sent again.
#[derive(Debug)]
pub struct DictionaryTracker {
    /// Array data of the dictionary column most recently recorded for each dictionary id.
    written: HashMap<i64, ArrayData>,
    /// Dictionary ids handed out by `next_dict_id`, in the order they were assigned.
    dict_ids: Vec<i64>,
    /// When `true`, replacing an already written dictionary is an error.
    error_on_replacement: bool,
}
1111
1112impl DictionaryTracker {
1113 pub fn new(error_on_replacement: bool) -> Self {
1119 #[allow(deprecated)]
1120 Self {
1121 written: HashMap::new(),
1122 dict_ids: Vec::new(),
1123 error_on_replacement,
1124 }
1125 }
1126
1127 pub fn next_dict_id(&mut self) -> i64 {
1129 let next = self
1130 .dict_ids
1131 .last()
1132 .copied()
1133 .map(|i| i + 1)
1134 .unwrap_or_default();
1135
1136 self.dict_ids.push(next);
1137 next
1138 }
1139
1140 pub fn dict_id(&mut self) -> &[i64] {
1143 &self.dict_ids
1144 }
1145
1146 #[deprecated(since = "56.1.0", note = "Use `insert_column` instead")]
1156 pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
1157 let dict_data = column.to_data();
1158 let dict_values = &dict_data.child_data()[0];
1159
1160 if let Some(last) = self.written.get(&dict_id) {
1162 if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
1163 return Ok(false);
1165 }
1166 if self.error_on_replacement {
1167 if last.child_data()[0] == *dict_values {
1169 return Ok(false);
1171 }
1172 return Err(ArrowError::InvalidArgumentError(
1173 "Dictionary replacement detected when writing IPC file format. \
1174 Arrow IPC files only support a single dictionary for a given field \
1175 across all batches."
1176 .to_string(),
1177 ));
1178 }
1179 }
1180
1181 self.written.insert(dict_id, dict_data);
1182 Ok(true)
1183 }
1184
1185 pub fn insert_column(
1201 &mut self,
1202 dict_id: i64,
1203 column: &ArrayRef,
1204 dict_handling: DictionaryHandling,
1205 ) -> Result<DictionaryUpdate, ArrowError> {
1206 let new_data = column.to_data();
1207 let new_values = &new_data.child_data()[0];
1208
1209 let Some(old) = self.written.get(&dict_id) else {
1211 self.written.insert(dict_id, new_data);
1212 return Ok(DictionaryUpdate::New);
1213 };
1214
1215 let old_values = &old.child_data()[0];
1218 if ArrayData::ptr_eq(old_values, new_values) {
1219 return Ok(DictionaryUpdate::None);
1220 }
1221
1222 let comparison = compare_dictionaries(old_values, new_values);
1224 if matches!(comparison, DictionaryComparison::Equal) {
1225 return Ok(DictionaryUpdate::None);
1226 }
1227
1228 const REPLACEMENT_ERROR: &str = "Dictionary replacement detected when writing IPC file format. \
1229 Arrow IPC files only support a single dictionary for a given field \
1230 across all batches.";
1231
1232 match comparison {
1233 DictionaryComparison::NotEqual => {
1234 if self.error_on_replacement {
1235 return Err(ArrowError::InvalidArgumentError(
1236 REPLACEMENT_ERROR.to_string(),
1237 ));
1238 }
1239
1240 self.written.insert(dict_id, new_data);
1241 Ok(DictionaryUpdate::Replaced)
1242 }
1243 DictionaryComparison::Delta => match dict_handling {
1244 DictionaryHandling::Resend => {
1245 if self.error_on_replacement {
1246 return Err(ArrowError::InvalidArgumentError(
1247 REPLACEMENT_ERROR.to_string(),
1248 ));
1249 }
1250
1251 self.written.insert(dict_id, new_data);
1252 Ok(DictionaryUpdate::Replaced)
1253 }
1254 DictionaryHandling::Delta => {
1255 let delta =
1256 new_values.slice(old_values.len(), new_values.len() - old_values.len());
1257 self.written.insert(dict_id, new_data);
1258 Ok(DictionaryUpdate::Delta(delta))
1259 }
1260 },
1261 DictionaryComparison::Equal => unreachable!("Already checked equal case"),
1262 }
1263 }
1264
1265 pub fn clear(&mut self) {
1271 self.dict_ids.clear();
1272 self.written.clear();
1273 }
1274}
1275
/// Relationship between a previously written dictionary and a new one, as computed
/// by `compare_dictionaries`.
#[derive(Debug, Clone)]
enum DictionaryComparison {
    /// The new dictionary is neither equal to nor an extension of the old one.
    NotEqual,
    /// Both dictionaries have the same length and compare equal.
    Equal,
    /// The new dictionary is longer and its leading values equal the old dictionary.
    Delta,
}
1287
1288fn compare_dictionaries(old: &ArrayData, new: &ArrayData) -> DictionaryComparison {
1290 let existing_len = old.len();
1292 let new_len = new.len();
1293 if existing_len == new_len {
1294 if *old == *new {
1295 return DictionaryComparison::Equal;
1296 } else {
1297 return DictionaryComparison::NotEqual;
1298 }
1299 }
1300
1301 if new_len < existing_len {
1303 return DictionaryComparison::NotEqual;
1304 }
1305
1306 if new.slice(0, existing_len) == *old {
1308 return DictionaryComparison::Delta;
1309 }
1310
1311 DictionaryComparison::NotEqual
1312}
1313
/// Writer for the Arrow IPC file format on top of any `W`.
pub struct FileWriter<W> {
    /// Destination the file is written to.
    writer: W,
    /// Options applied to every message written.
    write_options: IpcWriteOptions,
    /// Schema the writer was created with.
    schema: SchemaRef,
    /// Running byte offset into the file at which the next block will start.
    block_offsets: usize,
    /// Blocks recorded for the dictionary messages written so far.
    dictionary_blocks: Vec<crate::Block>,
    /// Blocks recorded for the record batch messages written so far.
    record_blocks: Vec<crate::Block>,
    /// Once set, further calls to `write` are rejected.
    finished: bool,
    /// Tracks dictionaries already written; created with `error_on_replacement = true`.
    dictionary_tracker: DictionaryTracker,
    /// Key/value pairs registered through `write_metadata`.
    custom_metadata: HashMap<String, String>,

    /// Encoder used to produce the schema, dictionary and record batch messages.
    data_gen: IpcDataGenerator,

    /// Reusable state passed to the encoder on every write.
    ipc_write_context: IpcWriteContext,
}
1360
1361impl<W: Write> FileWriter<BufWriter<W>> {
1362 pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1366 Self::try_new(BufWriter::new(writer), schema)
1367 }
1368}
1369
1370impl<W: Write> FileWriter<W> {
1371 pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1379 let write_options = IpcWriteOptions::default();
1380 Self::try_new_with_options(writer, schema, write_options)
1381 }
1382
1383 pub fn try_new_with_options(
1391 mut writer: W,
1392 schema: &Schema,
1393 write_options: IpcWriteOptions,
1394 ) -> Result<Self, ArrowError> {
1395 ensure_supported_ipc_schema(schema)?;
1396
1397 let data_gen = IpcDataGenerator::default();
1398 let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
1400 let header_size = super::ARROW_MAGIC.len() + pad_len;
1401 writer.write_all(&super::ARROW_MAGIC)?;
1402 writer.write_all(&PADDING[..pad_len])?;
1403 let mut dictionary_tracker = DictionaryTracker::new(true);
1405 let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1406 schema,
1407 &mut dictionary_tracker,
1408 &write_options,
1409 );
1410 let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
1411 Ok(Self {
1412 writer,
1413 write_options,
1414 schema: Arc::new(schema.clone()),
1415 block_offsets: meta + data + header_size,
1416 dictionary_blocks: vec![],
1417 record_blocks: vec![],
1418 finished: false,
1419 dictionary_tracker,
1420 custom_metadata: HashMap::new(),
1421 data_gen,
1422 ipc_write_context: IpcWriteContext::default(),
1423 })
1424 }
1425
1426 pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
1428 self.custom_metadata.insert(key.into(), value.into());
1429 }
1430
1431 pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1433 if self.finished {
1434 return Err(ArrowError::IpcError(
1435 "Cannot write record batch to file writer as it is closed".to_string(),
1436 ));
1437 }
1438
1439 let meta = self.data_gen.write(
1440 batch,
1441 &mut self.dictionary_tracker,
1442 &self.write_options,
1443 &mut self.ipc_write_context,
1444 &mut self.writer,
1445 )?;
1446
1447 for (header_len, body_len) in meta.dictionary_block_sizes {
1448 let block = crate::Block::new(
1449 self.block_offsets as i64,
1450 header_len as i32,
1451 body_len as i64,
1452 );
1453 self.dictionary_blocks.push(block);
1454 self.block_offsets += header_len + body_len;
1455 }
1456
1457 let block = crate::Block::new(
1459 self.block_offsets as i64,
1460 meta.padded_header_len as i32,
1461 meta.body_len as i64,
1462 );
1463 self.record_blocks.push(block);
1464 self.block_offsets += meta.padded_header_len + meta.body_len;
1465 Ok(())
1466 }
1467
1468 pub fn finish(&mut self) -> Result<(), ArrowError> {
1470 if self.finished {
1471 return Err(ArrowError::IpcError(
1472 "Cannot write footer to file writer as it is closed".to_string(),
1473 ));
1474 }
1475
1476 write_continuation(&mut self.writer, &self.write_options, 0)?;
1478
1479 let mut fbb = FlatBufferBuilder::new();
1480 let dictionaries = fbb.create_vector(&self.dictionary_blocks);
1481 let record_batches = fbb.create_vector(&self.record_blocks);
1482
1483 self.dictionary_tracker.clear();
1485 let schema = IpcSchemaEncoder::new()
1486 .with_dictionary_tracker(&mut self.dictionary_tracker)
1487 .schema_to_fb_offset(&mut fbb, &self.schema);
1488 let fb_custom_metadata = (!self.custom_metadata.is_empty())
1489 .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));
1490
1491 let root = {
1492 let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
1493 footer_builder.add_version(self.write_options.metadata_version);
1494 footer_builder.add_schema(schema);
1495 footer_builder.add_dictionaries(dictionaries);
1496 footer_builder.add_recordBatches(record_batches);
1497 if let Some(fb_custom_metadata) = fb_custom_metadata {
1498 footer_builder.add_custom_metadata(fb_custom_metadata);
1499 }
1500 footer_builder.finish()
1501 };
1502 fbb.finish(root, None);
1503 let footer_data = fbb.finished_data();
1504 self.writer.write_all(footer_data)?;
1505 self.writer
1506 .write_all(&(footer_data.len() as i32).to_le_bytes())?;
1507 self.writer.write_all(&super::ARROW_MAGIC)?;
1508 self.writer.flush()?;
1509 self.finished = true;
1510
1511 Ok(())
1512 }
1513
1514 pub fn schema(&self) -> &SchemaRef {
1516 &self.schema
1517 }
1518
1519 pub fn get_ref(&self) -> &W {
1521 &self.writer
1522 }
1523
1524 pub fn get_mut(&mut self) -> &mut W {
1528 &mut self.writer
1529 }
1530
1531 pub fn flush(&mut self) -> Result<(), ArrowError> {
1535 self.writer.flush()?;
1536 Ok(())
1537 }
1538
1539 pub fn into_inner(mut self) -> Result<W, ArrowError> {
1548 if !self.finished {
1549 self.finish()?;
1551 }
1552 Ok(self.writer)
1553 }
1554}
1555
1556impl<W: Write> RecordBatchWriter for FileWriter<W> {
1557 fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1558 self.write(batch)
1559 }
1560
1561 fn close(mut self) -> Result<(), ArrowError> {
1562 self.finish()
1563 }
1564}
1565
1566pub struct StreamWriter<W> {
1640 writer: W,
1642 write_options: IpcWriteOptions,
1644 finished: bool,
1646 dictionary_tracker: DictionaryTracker,
1648
1649 data_gen: IpcDataGenerator,
1650
1651 ipc_write_context: IpcWriteContext,
1652}
1653
1654impl<W: Write> StreamWriter<BufWriter<W>> {
1655 pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1659 Self::try_new(BufWriter::new(writer), schema)
1660 }
1661}
1662
1663impl<W: Write> StreamWriter<W> {
1664 pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1672 let write_options = IpcWriteOptions::default();
1673 Self::try_new_with_options(writer, schema, write_options)
1674 }
1675
1676 pub fn try_new_with_options(
1682 mut writer: W,
1683 schema: &Schema,
1684 write_options: IpcWriteOptions,
1685 ) -> Result<Self, ArrowError> {
1686 ensure_supported_ipc_schema(schema)?;
1687
1688 let data_gen = IpcDataGenerator::default();
1689 let mut dictionary_tracker = DictionaryTracker::new(false);
1690
1691 let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1693 schema,
1694 &mut dictionary_tracker,
1695 &write_options,
1696 );
1697 write_message(&mut writer, encoded_message, &write_options)?;
1698 Ok(Self {
1699 writer,
1700 write_options,
1701 finished: false,
1702 dictionary_tracker,
1703 data_gen,
1704 ipc_write_context: IpcWriteContext::default(),
1705 })
1706 }
1707
1708 pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1710 if self.finished {
1711 return Err(ArrowError::IpcError(
1712 "Cannot write record batch to stream writer as it is closed".to_string(),
1713 ));
1714 }
1715
1716 self.data_gen.write(
1717 batch,
1718 &mut self.dictionary_tracker,
1719 &self.write_options,
1720 &mut self.ipc_write_context,
1721 &mut self.writer,
1722 )?;
1723 Ok(())
1724 }
1725
1726 pub fn finish(&mut self) -> Result<(), ArrowError> {
1728 if self.finished {
1729 return Err(ArrowError::IpcError(
1730 "Cannot write footer to stream writer as it is closed".to_string(),
1731 ));
1732 }
1733
1734 write_continuation(&mut self.writer, &self.write_options, 0)?;
1735 self.writer.flush()?;
1736
1737 self.finished = true;
1738
1739 Ok(())
1740 }
1741
1742 pub fn get_ref(&self) -> &W {
1744 &self.writer
1745 }
1746
1747 pub fn get_mut(&mut self) -> &mut W {
1751 &mut self.writer
1752 }
1753
1754 pub fn flush(&mut self) -> Result<(), ArrowError> {
1758 self.writer.flush()?;
1759 Ok(())
1760 }
1761
1762 pub fn into_inner(mut self) -> Result<W, ArrowError> {
1800 if !self.finished {
1801 self.finish()?;
1803 }
1804 Ok(self.writer)
1805 }
1806}
1807
1808impl<W: Write> RecordBatchWriter for StreamWriter<W> {
1809 fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1810 self.write(batch)
1811 }
1812
1813 fn close(mut self) -> Result<(), ArrowError> {
1814 self.finish()
1815 }
1816}
1817
/// An encoded IPC message, split into its metadata part and its body.
///
/// `write_message` writes `ipc_message` first (length-prefixed and padded)
/// and `arrow_data` after it.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct EncodedData {
    /// Bytes of the message metadata.
    pub ipc_message: Vec<u8>,
    /// Bytes of the message body; may be empty.
    pub arrow_data: Vec<u8>,
}
1825pub fn write_message<W: Write>(
1827 mut writer: W,
1828 encoded: EncodedData,
1829 write_options: &IpcWriteOptions,
1830) -> Result<(usize, usize), ArrowError> {
1831 let arrow_data_len = encoded.arrow_data.len();
1832 if arrow_data_len % usize::from(write_options.alignment) != 0 {
1833 return Err(ArrowError::MemoryError(
1834 "Arrow data not aligned".to_string(),
1835 ));
1836 }
1837
1838 let a = usize::from(write_options.alignment - 1);
1839 let buffer = encoded.ipc_message;
1840 let flatbuf_size = buffer.len();
1841 let prefix_size = if write_options.write_legacy_ipc_format {
1842 4
1843 } else {
1844 8
1845 };
1846 let aligned_size = (flatbuf_size + prefix_size + a) & !a;
1847 let padding_bytes = aligned_size - flatbuf_size - prefix_size;
1848
1849 write_continuation(
1850 &mut writer,
1851 write_options,
1852 (aligned_size - prefix_size) as i32,
1853 )?;
1854
1855 if flatbuf_size > 0 {
1857 writer.write_all(&buffer)?;
1858 }
1859 writer.write_all(&PADDING[..padding_bytes])?;
1861
1862 let body_len = if arrow_data_len > 0 {
1864 write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
1865 } else {
1866 0
1867 };
1868
1869 Ok((aligned_size, body_len))
1870}
1871
1872fn write_body_buffers<W: Write>(
1873 mut writer: W,
1874 data: &[u8],
1875 alignment: u8,
1876) -> Result<usize, ArrowError> {
1877 let len = data.len();
1878 let pad_len = pad_to_alignment(alignment, len);
1879 let total_len = len + pad_len;
1880
1881 writer.write_all(data)?;
1883 if pad_len > 0 {
1884 writer.write_all(&PADDING[..pad_len])?;
1885 }
1886
1887 Ok(total_len)
1888}
1889
1890fn write_continuation<W: Write>(
1893 mut writer: W,
1894 write_options: &IpcWriteOptions,
1895 total_len: i32,
1896) -> Result<usize, ArrowError> {
1897 let mut written = 8;
1898
1899 match write_options.metadata_version {
1901 crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
1902 unreachable!("Options with the metadata version cannot be created")
1903 }
1904 crate::MetadataVersion::V4 => {
1905 if !write_options.write_legacy_ipc_format {
1906 writer.write_all(&CONTINUATION_MARKER)?;
1908 written = 4;
1909 }
1910 writer.write_all(&total_len.to_le_bytes()[..])?;
1911 }
1912 crate::MetadataVersion::V5 => {
1913 writer.write_all(&CONTINUATION_MARKER)?;
1915 writer.write_all(&total_len.to_le_bytes()[..])?;
1916 }
1917 z => panic!("Unsupported crate::MetadataVersion {z:?}"),
1918 };
1919
1920 Ok(written)
1921}
1922
1923fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
1927 if write_options.metadata_version < crate::MetadataVersion::V5 {
1928 !matches!(data_type, DataType::Null)
1929 } else {
1930 !matches!(
1931 data_type,
1932 DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
1933 )
1934 }
1935}
1936
1937#[inline]
1939fn buffer_need_truncate(
1940 array_offset: usize,
1941 buffer: &Buffer,
1942 spec: &BufferSpec,
1943 min_length: usize,
1944) -> bool {
1945 spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
1946}
1947
1948#[inline]
1950fn get_buffer_element_width(spec: &BufferSpec) -> usize {
1951 match spec {
1952 BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
1953 _ => 0,
1954 }
1955}
1956
1957fn reencode_offsets<O: OffsetSizeTrait>(
1960 offsets: &Buffer,
1961 data: &ArrayData,
1962) -> (Buffer, usize, usize) {
1963 let offsets_slice: &[O] = offsets.typed_data::<O>();
1964 let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];
1965
1966 let start_offset = offset_slice.first().unwrap();
1967 let end_offset = offset_slice.last().unwrap();
1968
1969 let offsets = match start_offset.as_usize() {
1970 0 => {
1971 let size = size_of::<O>();
1972 offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
1973 }
1974 _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
1975 };
1976
1977 let start_offset = start_offset.as_usize();
1978 let end_offset = end_offset.as_usize();
1979
1980 (offsets, start_offset, end_offset - start_offset)
1981}
1982
1983fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
1989 if data.is_empty() {
1990 let mut offsets = MutableBuffer::new(size_of::<O>());
1993 offsets.extend_from_slice(O::usize_as(0).to_byte_slice());
1994 return (offsets.into(), MutableBuffer::new(0).into());
1995 }
1996
1997 let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1998 let values = data.buffers()[1].slice_with_length(original_start_offset, len);
1999 (offsets, values)
2000}
2001
2002fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
2005 if data.is_empty() {
2006 let mut offsets = MutableBuffer::new(size_of::<O>());
2009 offsets.extend_from_slice(O::usize_as(0).to_byte_slice());
2010 return (offsets.into(), data.child_data()[0].slice(0, 0));
2011 }
2012
2013 let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
2014 let child_data = data.child_data()[0].slice(original_start_offset, len);
2015 (offsets, child_data)
2016}
2017
2018fn get_list_view_array_buffers<O: OffsetSizeTrait>(
2024 data: &ArrayData,
2025) -> (Buffer, Buffer, ArrayData) {
2026 if data.is_empty() {
2027 return (
2028 MutableBuffer::new(0).into(),
2029 MutableBuffer::new(0).into(),
2030 data.child_data()[0].slice(0, 0),
2031 );
2032 }
2033
2034 let offsets = &data.buffers()[0];
2035 let sizes = &data.buffers()[1];
2036
2037 let element_size = std::mem::size_of::<O>();
2038 let offsets_slice =
2039 offsets.slice_with_length(data.offset() * element_size, data.len() * element_size);
2040 let sizes_slice =
2041 sizes.slice_with_length(data.offset() * element_size, data.len() * element_size);
2042
2043 let child_data = data.child_data()[0].clone();
2044
2045 (offsets_slice, sizes_slice, child_data)
2046}
2047
2048fn get_or_truncate_buffer(array_data: &ArrayData) -> Buffer {
2055 let buffer = &array_data.buffers()[0];
2056 let layout = layout(array_data.data_type());
2057 let spec = &layout.buffers[0];
2058
2059 let byte_width = get_buffer_element_width(spec);
2060 let min_length = array_data.len() * byte_width;
2061 if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
2062 let byte_offset = array_data.offset() * byte_width;
2063 let buffer_length = min(min_length, buffer.len() - byte_offset);
2064 buffer.slice_with_length(byte_offset, buffer_length)
2065 } else {
2066 buffer.clone()
2067 }
2068}
2069
2070fn write_array_data(
2076 array_data: &ArrayData,
2077 meta: &mut IpcMetadataBuilder,
2078 sink: &mut IpcBodySink<'_>,
2079 offset: i64,
2080 compression_codec: Option<CompressionCodec>,
2081 ipc_write_context: &mut IpcWriteContext,
2082 write_options: &IpcWriteOptions,
2083) -> Result<i64, ArrowError> {
2084 let mut offset = offset;
2085 let num_rows = array_data.len();
2086 if !matches!(array_data.data_type(), DataType::Null) {
2087 meta.nodes.push(crate::FieldNode::new(
2088 num_rows as i64,
2089 array_data.null_count() as i64,
2090 ));
2091 } else {
2092 meta.nodes
2094 .push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
2095 }
2096 if has_validity_bitmap(array_data.data_type(), write_options) {
2097 let null_buffer = match array_data.nulls() {
2099 None => {
2100 let num_bytes = bit_util::ceil(num_rows, 8);
2102 let buffer = MutableBuffer::new(num_bytes);
2103 let buffer = buffer.with_bitset(num_bytes, true);
2104 buffer.into()
2105 }
2106 Some(buffer) => buffer.inner().sliced(),
2107 };
2108
2109 offset = encode_sink_buffer(
2110 null_buffer,
2111 meta,
2112 sink,
2113 offset,
2114 compression_codec,
2115 ipc_write_context,
2116 write_options.alignment,
2117 )?;
2118 }
2119
2120 let data_type = array_data.data_type();
2121 if matches!(data_type, DataType::Binary | DataType::Utf8) {
2122 let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
2123 for buffer in [offsets, values] {
2124 offset = encode_sink_buffer(
2125 buffer,
2126 meta,
2127 sink,
2128 offset,
2129 compression_codec,
2130 ipc_write_context,
2131 write_options.alignment,
2132 )?;
2133 }
2134 } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
2135 let views = get_or_truncate_buffer(array_data);
2142 offset = encode_sink_buffer(
2143 views,
2144 meta,
2145 sink,
2146 offset,
2147 compression_codec,
2148 ipc_write_context,
2149 write_options.alignment,
2150 )?;
2151
2152 for buffer in array_data.buffers().iter().skip(1) {
2153 offset = encode_sink_buffer(
2154 buffer.clone(),
2155 meta,
2156 sink,
2157 offset,
2158 compression_codec,
2159 ipc_write_context,
2160 write_options.alignment,
2161 )?;
2162 }
2163 } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
2164 let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
2165 for buffer in [offsets, values] {
2166 offset = encode_sink_buffer(
2167 buffer,
2168 meta,
2169 sink,
2170 offset,
2171 compression_codec,
2172 ipc_write_context,
2173 write_options.alignment,
2174 )?;
2175 }
2176 } else if DataType::is_numeric(data_type)
2177 || DataType::is_temporal(data_type)
2178 || matches!(
2179 array_data.data_type(),
2180 DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
2181 )
2182 {
2183 assert_eq!(array_data.buffers().len(), 1);
2185
2186 let buffer = get_or_truncate_buffer(array_data);
2187 offset = encode_sink_buffer(
2188 buffer,
2189 meta,
2190 sink,
2191 offset,
2192 compression_codec,
2193 ipc_write_context,
2194 write_options.alignment,
2195 )?;
2196 } else if matches!(data_type, DataType::Boolean) {
2197 assert_eq!(array_data.buffers().len(), 1);
2200
2201 let buffer = &array_data.buffers()[0];
2202 let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
2203 offset = encode_sink_buffer(
2204 buffer,
2205 meta,
2206 sink,
2207 offset,
2208 compression_codec,
2209 ipc_write_context,
2210 write_options.alignment,
2211 )?;
2212 } else if matches!(
2213 data_type,
2214 DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
2215 ) {
2216 assert_eq!(array_data.buffers().len(), 1);
2217 assert_eq!(array_data.child_data().len(), 1);
2218
2219 let (offsets, sliced_child_data) = match data_type {
2221 DataType::List(_) => get_list_array_buffers::<i32>(array_data),
2222 DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
2223 DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
2224 _ => unreachable!(),
2225 };
2226 offset = encode_sink_buffer(
2227 offsets,
2228 meta,
2229 sink,
2230 offset,
2231 compression_codec,
2232 ipc_write_context,
2233 write_options.alignment,
2234 )?;
2235 offset = write_array_data(
2236 &sliced_child_data,
2237 meta,
2238 sink,
2239 offset,
2240 compression_codec,
2241 ipc_write_context,
2242 write_options,
2243 )?;
2244 return Ok(offset);
2245 } else if matches!(
2246 data_type,
2247 DataType::ListView(_) | DataType::LargeListView(_)
2248 ) {
2249 assert_eq!(array_data.buffers().len(), 2); assert_eq!(array_data.child_data().len(), 1);
2251
2252 let (offsets, sizes, child_data) = match data_type {
2253 DataType::ListView(_) => get_list_view_array_buffers::<i32>(array_data),
2254 DataType::LargeListView(_) => get_list_view_array_buffers::<i64>(array_data),
2255 _ => unreachable!(),
2256 };
2257
2258 offset = encode_sink_buffer(
2259 offsets,
2260 meta,
2261 sink,
2262 offset,
2263 compression_codec,
2264 ipc_write_context,
2265 write_options.alignment,
2266 )?;
2267 offset = encode_sink_buffer(
2268 sizes,
2269 meta,
2270 sink,
2271 offset,
2272 compression_codec,
2273 ipc_write_context,
2274 write_options.alignment,
2275 )?;
2276
2277 offset = write_array_data(
2278 &child_data,
2279 meta,
2280 sink,
2281 offset,
2282 compression_codec,
2283 ipc_write_context,
2284 write_options,
2285 )?;
2286 return Ok(offset);
2287 } else if let DataType::FixedSizeList(_, fixed_size) = data_type {
2288 assert_eq!(array_data.child_data().len(), 1);
2289 let fixed_size = *fixed_size as usize;
2290
2291 let child_offset = array_data.offset() * fixed_size;
2292 let child_length = array_data.len() * fixed_size;
2293 let child_data = array_data.child_data()[0].slice(child_offset, child_length);
2294
2295 offset = write_array_data(
2296 &child_data,
2297 meta,
2298 sink,
2299 offset,
2300 compression_codec,
2301 ipc_write_context,
2302 write_options,
2303 )?;
2304 return Ok(offset);
2305 } else {
2306 for buffer in array_data.buffers() {
2307 offset = encode_sink_buffer(
2308 buffer.clone(),
2309 meta,
2310 sink,
2311 offset,
2312 compression_codec,
2313 ipc_write_context,
2314 write_options.alignment,
2315 )?;
2316 }
2317 }
2318
2319 match array_data.data_type() {
2320 DataType::Dictionary(_, _) => {}
2321 DataType::RunEndEncoded(_, _) => {
2322 let arr = unslice_run_array(array_data.clone())?;
2324 for data_ref in arr.child_data() {
2326 offset = write_array_data(
2328 data_ref,
2329 meta,
2330 sink,
2331 offset,
2332 compression_codec,
2333 ipc_write_context,
2334 write_options,
2335 )?;
2336 }
2337 }
2338 _ => {
2339 for data_ref in array_data.child_data() {
2341 offset = write_array_data(
2343 data_ref,
2344 meta,
2345 sink,
2346 offset,
2347 compression_codec,
2348 ipc_write_context,
2349 write_options,
2350 )?;
2351 }
2352 }
2353 }
2354 Ok(offset)
2355}
2356
2357fn encode_sink_buffer(
2371 buffer: Buffer,
2372 ipc_meta_data: &mut IpcMetadataBuilder,
2373 sink: &mut IpcBodySink<'_>,
2374 offset: i64,
2375 compression_codec: Option<CompressionCodec>,
2376 ipc_write_context: &mut IpcWriteContext,
2377 alignment: u8,
2378) -> Result<i64, ArrowError> {
2379 let (encoded, len) = match compression_codec {
2380 None => {
2381 let len = buffer.len() as i64;
2382 (EncodedBuffer::Raw(buffer), len)
2383 }
2384 Some(codec) => {
2385 let mut scratch = Vec::new();
2386 let written =
2387 codec.compress_to_vec(buffer.as_slice(), &mut scratch, ipc_write_context)?;
2388 let len = i64::try_from(written)
2389 .map_err(|e| ArrowError::InvalidArgumentError(format!("{e}")))?;
2390 (EncodedBuffer::Compressed(scratch), len)
2391 }
2392 };
2393
2394 let pad_len = pad_to_alignment(alignment, len as usize);
2395 sink.write(pad_len, encoded);
2396 ipc_meta_data.buffers.push(crate::Buffer::new(offset, len));
2397 Ok(offset + len + pad_len as i64)
2398}
2399
/// Source of zero bytes used to pad written data up to an alignment
/// boundary; callers slice off as many bytes as they need.
const PADDING: [u8; 64] = [0_u8; 64];
2401
2402#[inline]
2408fn estimate_encoded_buffer_count(dt: &DataType) -> usize {
2409 match dt {
2410 DataType::Null => 0,
2411
2412 DataType::Binary | DataType::Utf8 | DataType::LargeBinary | DataType::LargeUtf8 => 3,
2413
2414 DataType::BinaryView | DataType::Utf8View => 3,
2415
2416 DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => {
2417 2 + estimate_encoded_buffer_count(f.data_type())
2418 }
2419
2420 DataType::ListView(f) | DataType::LargeListView(f) => {
2421 3 + estimate_encoded_buffer_count(f.data_type())
2422 }
2423
2424 DataType::FixedSizeList(f, _) => 1 + estimate_encoded_buffer_count(f.data_type()),
2425
2426 DataType::Struct(fields) => {
2427 1 + fields
2428 .iter()
2429 .map(|f| estimate_encoded_buffer_count(f.data_type()))
2430 .sum::<usize>()
2431 }
2432
2433 DataType::Dictionary(_, _) => 2,
2435
2436 DataType::Union(fields, UnionMode::Sparse) => {
2437 1 + fields
2438 .iter()
2439 .map(|(_, f)| estimate_encoded_buffer_count(f.data_type()))
2440 .sum::<usize>()
2441 }
2442 DataType::Union(fields, UnionMode::Dense) => {
2443 2 + fields
2444 .iter()
2445 .map(|(_, f)| estimate_encoded_buffer_count(f.data_type()))
2446 .sum::<usize>()
2447 }
2448
2449 DataType::RunEndEncoded(run_ends, values) => {
2450 estimate_encoded_buffer_count(run_ends.data_type())
2451 + estimate_encoded_buffer_count(values.data_type())
2452 }
2453 _ => 2,
2455 }
2456}
2457
/// Returns the number of padding bytes needed to round `len` up to the next
/// multiple of `alignment`.
///
/// The bit-mask arithmetic assumes `alignment` is a non-zero power of two.
#[inline]
fn pad_to_alignment(alignment: u8, len: usize) -> usize {
    let mask = usize::from(alignment - 1);
    let rounded_up = (len + mask) & !mask;
    rounded_up - len
}
2464
2465#[cfg(test)]
2466mod tests {
2467 use std::hash::Hasher;
2468 use std::io::Cursor;
2469 use std::io::Seek;
2470
2471 use arrow_array::builder::FixedSizeListBuilder;
2472 use arrow_array::builder::Float32Builder;
2473 use arrow_array::builder::Int64Builder;
2474 use arrow_array::builder::MapBuilder;
2475 use arrow_array::builder::StringViewBuilder;
2476 use arrow_array::builder::UnionBuilder;
2477 use arrow_array::builder::{
2478 GenericListBuilder, GenericListViewBuilder, ListBuilder, StringBuilder,
2479 };
2480 use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
2481 use arrow_array::types::*;
2482 use arrow_buffer::ScalarBuffer;
2483
2484 use crate::MetadataVersion;
2485 use crate::convert::fb_to_schema;
2486 use crate::reader::*;
2487 use crate::root_as_footer;
2488
2489 use super::*;
2490
2491 fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
2492 let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
2493 writer.write(rb).unwrap();
2494 writer.finish().unwrap();
2495 writer.into_inner().unwrap()
2496 }
2497
2498 fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
2499 let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
2500 reader.next().unwrap().unwrap()
2501 }
2502
2503 fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
2504 const IPC_ALIGNMENT: usize = 8;
2508
2509 let mut stream_writer = StreamWriter::try_new_with_options(
2510 vec![],
2511 record.schema_ref(),
2512 IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
2513 )
2514 .unwrap();
2515 stream_writer.write(record).unwrap();
2516 stream_writer.finish().unwrap();
2517 stream_writer.into_inner().unwrap()
2518 }
2519
2520 fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
2521 let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
2522 stream_reader.next().unwrap().unwrap()
2523 }
2524
2525 #[test]
2526 #[cfg(feature = "lz4")]
2527 fn test_write_empty_record_batch_lz4_compression() {
2528 let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2529 let values: Vec<Option<i32>> = vec![];
2530 let array = Int32Array::from(values);
2531 let record_batch =
2532 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2533
2534 let mut file = tempfile::tempfile().unwrap();
2535
2536 {
2537 let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2538 .unwrap()
2539 .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2540 .unwrap();
2541
2542 let mut writer =
2543 FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2544 writer.write(&record_batch).unwrap();
2545 writer.finish().unwrap();
2546 }
2547 file.rewind().unwrap();
2548 {
2549 let reader = FileReader::try_new(file, None).unwrap();
2551 for read_batch in reader {
2552 read_batch
2553 .unwrap()
2554 .columns()
2555 .iter()
2556 .zip(record_batch.columns())
2557 .for_each(|(a, b)| {
2558 assert_eq!(a.data_type(), b.data_type());
2559 assert_eq!(a.len(), b.len());
2560 assert_eq!(a.null_count(), b.null_count());
2561 });
2562 }
2563 }
2564 }
2565
2566 #[test]
2567 #[cfg(feature = "lz4")]
2568 fn test_write_file_with_lz4_compression() {
2569 let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2570 let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2571 let array = Int32Array::from(values);
2572 let record_batch =
2573 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2574
2575 let mut file = tempfile::tempfile().unwrap();
2576 {
2577 let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2578 .unwrap()
2579 .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2580 .unwrap();
2581
2582 let mut writer =
2583 FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2584 writer.write(&record_batch).unwrap();
2585 writer.finish().unwrap();
2586 }
2587 file.rewind().unwrap();
2588 {
2589 let reader = FileReader::try_new(file, None).unwrap();
2591 for read_batch in reader {
2592 read_batch
2593 .unwrap()
2594 .columns()
2595 .iter()
2596 .zip(record_batch.columns())
2597 .for_each(|(a, b)| {
2598 assert_eq!(a.data_type(), b.data_type());
2599 assert_eq!(a.len(), b.len());
2600 assert_eq!(a.null_count(), b.null_count());
2601 });
2602 }
2603 }
2604 }
2605
2606 #[test]
2607 #[cfg(feature = "zstd")]
2608 fn test_write_file_with_zstd_compression() {
2609 let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2610 let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2611 let array = Int32Array::from(values);
2612 let record_batch =
2613 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2614 let mut file = tempfile::tempfile().unwrap();
2615 {
2616 let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2617 .unwrap()
2618 .try_with_compression(Some(crate::CompressionType::ZSTD))
2619 .unwrap()
2620 .try_with_compression_level(Some(1))
2621 .unwrap();
2622
2623 let mut writer =
2624 FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2625 writer.write(&record_batch).unwrap();
2626 writer.finish().unwrap();
2627 }
2628 file.rewind().unwrap();
2629 {
2630 let reader = FileReader::try_new(file, None).unwrap();
2632 for read_batch in reader {
2633 read_batch
2634 .unwrap()
2635 .columns()
2636 .iter()
2637 .zip(record_batch.columns())
2638 .for_each(|(a, b)| {
2639 assert_eq!(a.data_type(), b.data_type());
2640 assert_eq!(a.len(), b.len());
2641 assert_eq!(a.null_count(), b.null_count());
2642 });
2643 }
2644 }
2645 }
2646
2647 #[test]
2648 fn test_write_file() {
2649 let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
2650 let values: Vec<Option<u32>> = vec![
2651 Some(999),
2652 None,
2653 Some(235),
2654 Some(123),
2655 None,
2656 None,
2657 None,
2658 None,
2659 None,
2660 ];
2661 let array1 = UInt32Array::from(values);
2662 let batch =
2663 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
2664 .unwrap();
2665 let mut file = tempfile::tempfile().unwrap();
2666 {
2667 let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2668
2669 writer.write(&batch).unwrap();
2670 writer.finish().unwrap();
2671 }
2672 file.rewind().unwrap();
2673
2674 {
2675 let mut reader = FileReader::try_new(file, None).unwrap();
2676 while let Some(Ok(read_batch)) = reader.next() {
2677 read_batch
2678 .columns()
2679 .iter()
2680 .zip(batch.columns())
2681 .for_each(|(a, b)| {
2682 assert_eq!(a.data_type(), b.data_type());
2683 assert_eq!(a.len(), b.len());
2684 assert_eq!(a.null_count(), b.null_count());
2685 });
2686 }
2687 }
2688 }
2689
2690 #[test]
2691 fn test_empty_utf8_ipc_writes_nonempty_offsets_buffer() {
2692 let name = StringArray::from(Vec::<String>::new());
2693 let (offsets, values) = get_byte_array_buffers::<i32>(&name.to_data());
2694
2695 assert_eq!(name.len(), 0);
2696 assert_eq!(
2697 offsets.len(),
2698 std::mem::size_of::<i32>(),
2699 "offsets buffer should contain one zero i32 offset"
2700 );
2701 assert_eq!(values.len(), 0, "values buffer should remain empty");
2702 }
2703
2704 #[test]
2705 fn test_empty_large_utf8_ipc_writes_nonempty_offsets_buffer() {
2706 let name = LargeStringArray::from(Vec::<String>::new());
2707 let (offsets, values) = get_byte_array_buffers::<i64>(&name.to_data());
2708
2709 assert_eq!(name.len(), 0);
2710 assert_eq!(
2711 offsets.len(),
2712 std::mem::size_of::<i64>(),
2713 "offsets buffer should contain one zero i64 offset"
2714 );
2715 assert_eq!(values.len(), 0, "values buffer should remain empty");
2716 }
2717
2718 #[test]
2719 fn test_empty_list_ipc_writes_nonempty_offsets_buffer() {
2720 let list = GenericListBuilder::<i32, _>::new(UInt32Builder::new()).finish();
2721 let (offsets, child_data) = get_list_array_buffers::<i32>(&list.to_data());
2722
2723 assert_eq!(list.len(), 0);
2724 assert_eq!(
2725 offsets.len(),
2726 std::mem::size_of::<i32>(),
2727 "offsets buffer should contain one zero i32 offset"
2728 );
2729 assert_eq!(child_data.len(), 0, "child data should remain empty");
2730 }
2731
2732 #[test]
2733 fn test_empty_large_list_ipc_writes_nonempty_offsets_buffer() {
2734 let list = GenericListBuilder::<i64, _>::new(UInt32Builder::new()).finish();
2735 let (offsets, child_data) = get_list_array_buffers::<i64>(&list.to_data());
2736
2737 assert_eq!(list.len(), 0);
2738 assert_eq!(
2739 offsets.len(),
2740 std::mem::size_of::<i64>(),
2741 "offsets buffer should contain one zero i64 offset"
2742 );
2743 assert_eq!(child_data.len(), 0, "child data should remain empty");
2744 }
2745
2746 fn write_null_file(options: IpcWriteOptions) {
2747 let schema = Schema::new(vec![
2748 Field::new("nulls", DataType::Null, true),
2749 Field::new("int32s", DataType::Int32, false),
2750 Field::new("nulls2", DataType::Null, true),
2751 Field::new("f64s", DataType::Float64, false),
2752 ]);
2753 let array1 = NullArray::new(32);
2754 let array2 = Int32Array::from(vec![1; 32]);
2755 let array3 = NullArray::new(32);
2756 let array4 = Float64Array::from(vec![f64::NAN; 32]);
2757 let batch = RecordBatch::try_new(
2758 Arc::new(schema.clone()),
2759 vec![
2760 Arc::new(array1) as ArrayRef,
2761 Arc::new(array2) as ArrayRef,
2762 Arc::new(array3) as ArrayRef,
2763 Arc::new(array4) as ArrayRef,
2764 ],
2765 )
2766 .unwrap();
2767 let mut file = tempfile::tempfile().unwrap();
2768 {
2769 let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2770
2771 writer.write(&batch).unwrap();
2772 writer.finish().unwrap();
2773 }
2774
2775 file.rewind().unwrap();
2776
2777 {
2778 let reader = FileReader::try_new(file, None).unwrap();
2779 reader.for_each(|maybe_batch| {
2780 maybe_batch
2781 .unwrap()
2782 .columns()
2783 .iter()
2784 .zip(batch.columns())
2785 .for_each(|(a, b)| {
2786 assert_eq!(a.data_type(), b.data_type());
2787 assert_eq!(a.len(), b.len());
2788 assert_eq!(a.null_count(), b.null_count());
2789 });
2790 });
2791 }
2792 }
/// Runs `write_null_file` with metadata V4 for alignments 8 and 64, each with the
/// legacy IPC format flag off and on.
#[test]
fn test_write_null_file_v4() {
    for alignment in [8, 64] {
        for write_legacy_ipc_format in [false, true] {
            let options =
                IpcWriteOptions::try_new(alignment, write_legacy_ipc_format, MetadataVersion::V4)
                    .unwrap();
            write_null_file(options);
        }
    }
}
2800
/// Runs `write_null_file` with metadata V5 (legacy format off) for alignments 8 and 64.
#[test]
fn test_write_null_file_v5() {
    for alignment in [8, 64] {
        let options = IpcWriteOptions::try_new(alignment, false, MetadataVersion::V5).unwrap();
        write_null_file(options);
    }
}
2806
/// Encodes a dense union whose only child is a dictionary array and checks that the
/// dictionary tracker's `written` map ends up with an entry for id 0.
#[test]
fn track_union_nested_dict() {
    let dictionary: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
    let child: ArrayRef = Arc::new(dictionary);

    // `Field::new_dict` is deprecated, hence the `allow`.
    #[allow(deprecated)]
    let dict_field = Field::new_dict("dict", child.data_type().clone(), false, 0, false);
    let union_fields = [(0, Arc::new(dict_field))].into_iter().collect();

    let type_ids = [0, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
    let value_offsets = [0, 1, 2].into_iter().collect::<ScalarBuffer<i32>>();

    let union =
        UnionArray::try_new(union_fields, type_ids, Some(value_offsets), vec![child]).unwrap();

    let union_field = Field::new("union", union.data_type().clone(), false);
    let schema = Arc::new(Schema::new(vec![union_field]));

    // Encode the schema and then one batch through the same tracker.
    let generator = IpcDataGenerator::default();
    let mut tracker = DictionaryTracker::new(false);
    generator.schema_to_bytes_with_dictionary_tracker(
        &schema,
        &mut tracker,
        &IpcWriteOptions::default(),
    );

    let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();

    generator
        .encode(
            &batch,
            &mut tracker,
            &Default::default(),
            &mut Default::default(),
        )
        .unwrap();

    assert!(tracker.written.contains_key(&0));
}
2852
/// Encodes a struct whose only child is a dictionary array and checks that the
/// dictionary tracker's `written` map ends up with an entry for id 0.
#[test]
fn track_struct_nested_dict() {
    let dictionary: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
    let child: ArrayRef = Arc::new(dictionary);

    // `Field::new_dict` is deprecated, hence the `allow`.
    // NOTE(review): the field is created with id 2 while the assertion below checks
    // key 0 — presumably the tracker assigns its own ids; confirm against `DictionaryTracker`.
    #[allow(deprecated)]
    let dict_field = Arc::new(Field::new_dict(
        "dict",
        child.data_type().clone(),
        false,
        2,
        false,
    ));

    let struct_array: ArrayRef = Arc::new(StructArray::from(vec![(dict_field, child)]));

    let struct_field = Field::new("struct", struct_array.data_type().clone(), false);
    let schema = Arc::new(Schema::new(vec![struct_field]));

    // Encode the schema and then one batch through the same tracker.
    let generator = IpcDataGenerator::default();
    let mut tracker = DictionaryTracker::new(false);
    generator.schema_to_bytes_with_dictionary_tracker(
        &schema,
        &mut tracker,
        &IpcWriteOptions::default(),
    );

    let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();

    generator
        .encode(
            &batch,
            &mut tracker,
            &Default::default(),
            &mut Default::default(),
        )
        .unwrap();

    assert!(tracker.written.contains_key(&0));
}
2899
2900 fn write_union_file(options: IpcWriteOptions) {
2901 let schema = Schema::new(vec![Field::new_union(
2902 "union",
2903 vec![0, 1],
2904 vec![
2905 Field::new("a", DataType::Int32, false),
2906 Field::new("c", DataType::Float64, false),
2907 ],
2908 UnionMode::Sparse,
2909 )]);
2910 let mut builder = UnionBuilder::with_capacity_sparse(5);
2911 builder.append::<Int32Type>("a", 1).unwrap();
2912 builder.append_null::<Int32Type>("a").unwrap();
2913 builder.append::<Float64Type>("c", 3.0).unwrap();
2914 builder.append_null::<Float64Type>("c").unwrap();
2915 builder.append::<Int32Type>("a", 4).unwrap();
2916 let union = builder.build().unwrap();
2917
2918 let batch =
2919 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
2920 .unwrap();
2921
2922 let mut file = tempfile::tempfile().unwrap();
2923 {
2924 let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2925
2926 writer.write(&batch).unwrap();
2927 writer.finish().unwrap();
2928 }
2929 file.rewind().unwrap();
2930
2931 {
2932 let reader = FileReader::try_new(file, None).unwrap();
2933 reader.for_each(|maybe_batch| {
2934 maybe_batch
2935 .unwrap()
2936 .columns()
2937 .iter()
2938 .zip(batch.columns())
2939 .for_each(|(a, b)| {
2940 assert_eq!(a.data_type(), b.data_type());
2941 assert_eq!(a.len(), b.len());
2942 assert_eq!(a.null_count(), b.null_count());
2943 });
2944 });
2945 }
2946 }
2947
/// Runs `write_union_file` with alignment 8 and the legacy format off, once for
/// metadata V4 and once for V5.
#[test]
fn test_write_union_file_v4_v5() {
    for metadata_version in [MetadataVersion::V4, MetadataVersion::V5] {
        let options = IpcWriteOptions::try_new(8, false, metadata_version).unwrap();
        write_union_file(options);
    }
}
2953
/// Writes a batch with a `BinaryView` and a `Utf8View` column to a temporary IPC file and
/// reads it back twice: once in full, and once projecting only column 0.
#[test]
fn test_write_view_types() {
    const LONG_TEST_STRING: &str =
        "This is a long string to make sure binary view array handles it";

    let schema = Schema::new(vec![
        Field::new("field1", DataType::BinaryView, true),
        Field::new("field2", DataType::Utf8View, true),
    ]);

    let binary_values: Vec<Option<&[u8]>> = vec![
        Some(b"foo"),
        Some(b"bar"),
        Some(LONG_TEST_STRING.as_bytes()),
    ];
    let binary_column = BinaryViewArray::from_iter(binary_values);
    let utf8_column =
        StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
    let written_batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![Arc::new(binary_column), Arc::new(utf8_column)],
    )
    .unwrap();

    let mut file = tempfile::tempfile().unwrap();
    {
        let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
        writer.write(&written_batch).unwrap();
        writer.finish().unwrap();
    }

    // Full read: every column must equal what was written.
    file.rewind().unwrap();
    {
        let mut reader = FileReader::try_new(&file, None).unwrap();
        let read_batch = reader.next().unwrap().unwrap();
        for (read_column, written_column) in
            read_batch.columns().iter().zip(written_batch.columns())
        {
            assert_eq!(read_column, written_column);
        }
    }

    // Projected read: only the first column is returned.
    file.rewind().unwrap();
    {
        let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
        let read_batch = reader.next().unwrap().unwrap();
        assert_eq!(read_batch.num_columns(), 1);
        assert_eq!(read_batch.column(0), written_batch.column(0));
    }
}
3004
/// A 5-row slice of a 65536-row batch must serialize to the same number of bytes as a
/// freshly built 5-row batch, and must survive a stream round trip unchanged.
#[test]
fn truncate_ipc_record_batch() {
    fn create_batch(rows: usize) -> RecordBatch {
        let fields = vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Utf8, false),
        ];

        let ints = Int32Array::from_iter_values(0..rows as i32);
        let strings = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));

        RecordBatch::try_new(
            Arc::new(Schema::new(fields)),
            vec![Arc::new(ints), Arc::new(strings)],
        )
        .unwrap()
    }

    let length = 5;
    let offset = 2;

    let big_record_batch = create_batch(65536);
    let small_record_batch = create_batch(length);
    let record_batch_slice = big_record_batch.slice(offset, length);

    let big_len = serialize_stream(&big_record_batch).len();
    let small_len = serialize_stream(&small_record_batch).len();
    let slice_bytes = serialize_stream(&record_batch_slice);

    assert!(big_len > small_len);
    assert_eq!(small_len, slice_bytes.len());

    assert_eq!(deserialize_stream(slice_bytes), record_batch_slice);
}
3039
/// Slices rows 1..3 out of a nullable batch, round-trips the slice through the stream
/// format, and checks the validity of each value as well as overall equality.
#[test]
fn truncate_ipc_record_batch_with_nulls() {
    fn create_batch() -> RecordBatch {
        let fields = vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ];

        let ints = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
        let strings = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);

        RecordBatch::try_new(
            Arc::new(Schema::new(fields)),
            vec![Arc::new(ints), Arc::new(strings)],
        )
        .unwrap()
    }

    let full_batch = create_batch();
    let sliced_batch = full_batch.slice(1, 2);
    let roundtripped = deserialize_stream(serialize_stream(&sliced_batch));

    let full_len = serialize_stream(&full_batch).len();
    let sliced_len = serialize_stream(&sliced_batch).len();
    assert!(full_len > sliced_len);

    // Column "a" rows 1..3 are [None, Some(1)]; column "b" rows 1..3 are both `Some`.
    let ints = roundtripped.column(0);
    let strings = roundtripped.column(1);
    assert!(ints.is_null(0));
    assert!(ints.is_valid(1));
    assert!(strings.is_valid(0));
    assert!(strings.is_valid(1));

    assert_eq!(sliced_batch, roundtripped);
}
3069
/// Slices rows 1..3 out of a dictionary-encoded batch, round-trips the slice through the
/// stream format, and checks key validity as well as overall equality.
#[test]
fn truncate_ipc_dictionary_array() {
    fn create_batch() -> RecordBatch {
        let dictionary_values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
            .into_iter()
            .collect();
        let dictionary_keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();

        let dictionary = DictionaryArray::new(dictionary_keys, Arc::new(dictionary_values));

        let field = Field::new("dict", dictionary.data_type().clone(), true);

        RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![Arc::new(dictionary)])
            .unwrap()
    }

    let full_batch = create_batch();
    let sliced_batch = full_batch.slice(1, 2);
    let roundtripped = deserialize_stream(serialize_stream(&sliced_batch));

    let full_len = serialize_stream(&full_batch).len();
    let sliced_len = serialize_stream(&sliced_batch).len();
    assert!(full_len > sliced_len);

    // Keys 1..3 are [Some(2), None].
    let dict_column = roundtripped.column(0);
    assert!(dict_column.is_valid(0));
    assert!(dict_column.is_null(1));

    assert_eq!(sliced_batch, roundtripped);
}
3098
/// Slices rows 1..3 out of a struct batch with a string child and an int child,
/// round-trips the slice through the stream format, and checks child validity as well as
/// overall equality.
#[test]
fn truncate_ipc_struct_array() {
    fn create_batch() -> RecordBatch {
        let string_child: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
            .into_iter()
            .collect();
        let int_child: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();

        let string_field = Arc::new(Field::new("s", DataType::Utf8, true));
        let int_field = Arc::new(Field::new("c", DataType::Int32, true));
        let struct_array = StructArray::from(vec![
            (string_field, Arc::new(string_child) as ArrayRef),
            (int_field, Arc::new(int_child) as ArrayRef),
        ]);

        let field = Field::new("struct_array", struct_array.data_type().clone(), true);

        RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![Arc::new(struct_array)])
            .unwrap()
    }

    let full_batch = create_batch();
    let sliced_batch = full_batch.slice(1, 2);
    let roundtripped = deserialize_stream(serialize_stream(&sliced_batch));

    let full_len = serialize_stream(&full_batch).len();
    let sliced_len = serialize_stream(&sliced_batch).len();
    assert!(full_len > sliced_len);

    let structs = roundtripped
        .column(0)
        .as_any()
        .downcast_ref::<StructArray>()
        .unwrap();

    // Rows 1..3: strings are [None, Some("bar")], ints are [Some(2), None].
    let strings = structs.column(0);
    let ints = structs.column(1);
    assert!(strings.is_null(0));
    assert!(strings.is_valid(1));
    assert!(ints.is_valid(0));
    assert!(ints.is_null(1));
    assert_eq!(sliced_batch, roundtripped);
}
3147
/// Takes the first row of a string column made only of empty strings and checks that the
/// slice serializes smaller than the full batch and round-trips unchanged.
#[test]
fn truncate_ipc_string_array_with_all_empty_string() {
    fn create_batch() -> RecordBatch {
        let field = Field::new("a", DataType::Utf8, true);
        let empties = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
        RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![Arc::new(empties)])
            .unwrap()
    }

    let full_batch = create_batch();
    let sliced_batch = full_batch.slice(0, 1);
    let roundtripped = deserialize_stream(serialize_stream(&sliced_batch));

    let full_len = serialize_stream(&full_batch).len();
    let sliced_len = serialize_stream(&sliced_batch).len();
    assert!(full_len > sliced_len);
    assert_eq!(sliced_batch, roundtripped);
}
3165
/// Writes a batch holding a sliced `UInt32` array through `StreamWriter` and checks that
/// the reader sees only the sliced values.
#[test]
fn test_stream_writer_writes_array_slice() {
    let full_array = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
    let full_values = full_array.iter().collect::<Vec<_>>();
    assert_eq!(vec![Some(1), Some(2), Some(3)], full_values);

    let tail = full_array.slice(1, 2);
    let tail_values = tail.iter().collect::<Vec<_>>();
    assert_eq!(vec![Some(2), Some(3)], tail_values);

    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(tail)]).expect("new batch");

    let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
    writer.write(&batch).expect("write");
    let encoded = writer.into_inner().expect("inner");

    let mut reader = StreamReader::try_new(&encoded[..], None).expect("new reader");
    let decoded_batch = reader.next().unwrap().expect("read batch");

    let decoded_array: &UInt32Array = decoded_batch.column(0).as_primitive();
    let decoded_values = decoded_array.iter().collect::<Vec<_>>();
    assert_eq!(vec![Some(2), Some(3)], decoded_values);
}
3196
/// Round-trips an 8000-element `UInt32` array in which every odd index is null.
#[test]
fn test_large_slice_uint32() {
    let values = (0..8000).map(|i| (i % 2 == 0).then_some(i));
    ensure_roundtrip(Arc::new(UInt32Array::from_iter(values)));
}
3203
/// Round-trips an 8000-element string array in which every odd index is null.
#[test]
fn test_large_slice_string() {
    let strings: Vec<_> = (0..8000)
        .map(|i| (i % 2 == 0).then(|| format!("value{i}")))
        .collect();

    ensure_roundtrip(Arc::new(StringArray::from(strings)));
}
3218
/// Round-trips an 8000-row list-of-strings array: even rows hold 1000 strings each, odd
/// rows are null.
#[test]
fn test_large_slice_string_list() {
    use std::fmt::Write;

    let mut ls = ListBuilder::new(StringBuilder::new());

    // Reused across iterations to avoid allocating a new `String` per element.
    let mut scratch = String::new();
    for row_number in 0..8000 {
        if row_number % 2 != 0 {
            ls.append(false);
            continue;
        }
        for list_element in 0..1000 {
            scratch.clear();
            write!(&mut scratch, "value{row_number}-{list_element}").unwrap();
            ls.values().append_value(&scratch);
        }
        ls.append(true);
    }

    ensure_roundtrip(Arc::new(ls.finish()));
}
3240
/// Round-trips a list-of-lists-of-strings array: the first 4000 rows each hold one empty
/// inner list; of the next 4000 rows, even ones hold one 1000-string inner list and odd
/// ones are null.
#[test]
fn test_large_slice_string_list_of_lists() {
    use std::fmt::Write;

    let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));

    // Leading rows: one empty inner list per outer row.
    for _ in 0..4000 {
        ls.values().append(true);
        ls.append(true);
    }

    // Reused across iterations to avoid allocating a new `String` per element.
    let mut scratch = String::new();
    for row_number in 0..4000 {
        if row_number % 2 != 0 {
            ls.append(false);
            continue;
        }
        for list_element in 0..1000 {
            scratch.clear();
            write!(&mut scratch, "value{row_number}-{list_element}").unwrap();
            ls.values().values().append_value(&scratch);
        }
        ls.values().append(true);
        ls.append(true);
    }

    ensure_roundtrip(Arc::new(ls.finish()));
}
3271
3272 fn ensure_roundtrip(array: ArrayRef) {
3274 let num_rows = array.len();
3275 let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
3276 let sliced_batch = orig_batch.slice(1, num_rows - 1);
3278
3279 let schema = orig_batch.schema();
3280 let stream_data = {
3281 let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
3282 writer.write(&sliced_batch).unwrap();
3283 writer.into_inner().unwrap()
3284 };
3285 let read_batch = {
3286 let projection = None;
3287 let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
3288 reader
3289 .next()
3290 .expect("expect no errors reading batch")
3291 .expect("expect batch")
3292 };
3293 assert_eq!(sliced_batch, read_batch);
3294
3295 let file_data = {
3296 let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
3297 writer.write(&sliced_batch).unwrap();
3298 writer.into_inner().unwrap().into_inner().unwrap()
3299 };
3300 let read_batch = {
3301 let projection = None;
3302 let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
3303 reader
3304 .next()
3305 .expect("expect no errors reading batch")
3306 .expect("expect batch")
3307 };
3308 assert_eq!(sliced_batch, read_batch);
3309
3310 }
3312
/// Round-trips boolean batches sliced at several offset/length combinations.
#[test]
fn encode_bools_slice() {
    // Two values, keeping only the second.
    assert_bool_roundtrip([true, false], 1, 1);

    // 30 values; offset 13 is not a multiple of 8 and the slice runs to the end.
    let thirty_values = [
        true, false, true, true, false, false, true, true, true, false, false, false, true,
        true, true, true, false, false, false, false, true, true, true, true, true, false,
        false, false, false, false,
    ];
    assert_bool_roundtrip(thirty_values, 13, 17);

    // 12 values; offset 8 is a multiple of 8, the slice ends at index 10.
    let twelve_values = [
        true, false, true, true, false, false, true, true, true, false, false, false,
    ];
    assert_bool_roundtrip(twelve_values, 8, 2);

    // 22 values; both the offset (8) and the end (16) are multiples of 8.
    let twenty_two_values = [
        true, false, true, true, false, false, true, true, true, false, false, false, true,
        true, true, true, true, false, false, false, false, false,
    ];
    assert_bool_roundtrip(twenty_two_values, 8, 8);
}
3348
3349 fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
3350 let val_bool_field = Field::new("val", DataType::Boolean, false);
3351
3352 let schema = Arc::new(Schema::new(vec![val_bool_field]));
3353
3354 let bools = BooleanArray::from(bools.to_vec());
3355
3356 let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
3357 let batch = batch.slice(offset, length);
3358
3359 let data = serialize_stream(&batch);
3360 let batch2 = deserialize_stream(data);
3361 assert_eq!(batch, batch2);
3362 }
3363
/// Builds an 80-element run-end encoded array and, for every slice length, checks that
/// `into_zero_offset_run_array` reproduces the expected logical values for both the
/// slice taken from the front and the slice taken from the back.
#[test]
fn test_run_array_unslice() {
    let total_len = 80;
    let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
    let repeats: Vec<usize> = vec![3, 4, 1, 2];

    // 32 runs whose lengths cycle through `repeats` (8 cycles of 10 values = 80 values).
    let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
    for ix in 0_usize..32 {
        let run_len = repeats[ix % repeats.len()];
        let run_value = vals[ix % vals.len()];
        input_array.extend(std::iter::repeat(run_value).take(run_len));
    }

    let mut builder =
        PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
    builder.extend(input_array.iter().copied());
    let run_array = builder.finish();

    for slice_len in 1..=total_len {
        // First the slice anchored at the start, then the one anchored at the end.
        for slice_offset in [0, total_len - slice_len] {
            let sliced_run_array: RunArray<Int16Type> = run_array
                .slice(slice_offset, slice_len)
                .into_data()
                .into();

            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
            let typed = unsliced_run_array
                .downcast::<PrimitiveArray<Int32Type>>()
                .unwrap();

            let expected: Vec<Option<i32>> = input_array
                .iter()
                .skip(slice_offset)
                .take(slice_len)
                .copied()
                .collect();
            let actual: Vec<Option<i32>> = typed.into_iter().collect();
            assert_eq!(expected, actual);
        }
    }
}
3417
3418 fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3419 let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());
3420
3421 for i in 0..100_000 {
3422 for value in [i, i, i] {
3423 ls.values().append_value(value);
3424 }
3425 ls.append(true)
3426 }
3427
3428 ls.finish()
3429 }
3430
3431 fn generate_utf8view_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3432 let mut ls = GenericListBuilder::<O, _>::new(StringViewBuilder::new());
3433
3434 for i in 0..100_000 {
3435 for value in [
3436 format!("value{}", i),
3437 format!("value{}", i),
3438 format!("value{}", i),
3439 ] {
3440 ls.values().append_value(&value);
3441 }
3442 ls.append(true)
3443 }
3444
3445 ls.finish()
3446 }
3447
3448 fn generate_string_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3449 let mut ls = GenericListBuilder::<O, _>::new(StringBuilder::new());
3450
3451 for i in 0..100_000 {
3452 for value in [
3453 format!("value{}", i),
3454 format!("value{}", i),
3455 format!("value{}", i),
3456 ] {
3457 ls.values().append_value(&value);
3458 }
3459 ls.append(true)
3460 }
3461
3462 ls.finish()
3463 }
3464
3465 fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3466 let mut ls =
3467 GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
3468
3469 for _i in 0..10_000 {
3470 for j in 0..10 {
3471 for value in [j, j, j, j] {
3472 ls.values().values().append_value(value);
3473 }
3474 ls.values().append(true)
3475 }
3476 ls.append(true);
3477 }
3478
3479 ls.finish()
3480 }
3481
3482 fn generate_nested_list_data_starting_at_zero<O: OffsetSizeTrait>() -> GenericListArray<O> {
3483 let mut ls =
3484 GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
3485
3486 for _i in 0..999 {
3487 ls.values().append(true);
3488 ls.append(true);
3489 }
3490
3491 for j in 0..10 {
3492 for value in [j, j, j, j] {
3493 ls.values().values().append_value(value);
3494 }
3495 ls.values().append(true)
3496 }
3497 ls.append(true);
3498
3499 for i in 0..9_000 {
3500 for j in 0..10 {
3501 for value in [i + j, i + j, i + j, i + j] {
3502 ls.values().values().append_value(value);
3503 }
3504 ls.values().append(true)
3505 }
3506 ls.append(true);
3507 }
3508
3509 ls.finish()
3510 }
3511
3512 fn generate_map_array_data() -> MapArray {
3513 let keys_builder = UInt32Builder::new();
3514 let values_builder = UInt32Builder::new();
3515
3516 let mut builder = MapBuilder::new(None, keys_builder, values_builder);
3517
3518 for i in 0..100_000 {
3519 for _j in 0..3 {
3520 builder.keys().append_value(i);
3521 builder.values().append_value(i * 2);
3522 }
3523 builder.append(true).unwrap();
3524 }
3525
3526 builder.finish()
3527 }
3528
/// Slicing rows 75..82 of a list array with three values per row: `reencode_offsets`
/// must return offsets rebased to zero, a start of 225 and a length of 21.
#[test]
fn reencode_offsets_when_first_offset_is_not_zero() {
    let list_data = generate_list_data::<i32>().into_data();
    let sliced = list_data.slice(75, 7);

    let (new_offsets, original_start, length) =
        reencode_offsets::<i32>(&sliced.buffers()[0], &sliced);

    // Seven rows of three values each: 0, 3, ..., 21.
    let expected_offsets: Vec<i32> = (0..=21).step_by(3).collect();
    assert_eq!(expected_offsets, new_offsets.typed_data::<i32>());
    assert_eq!(225, original_start);
    assert_eq!(21, length);
}
3543
/// For the list `[[], [35, 42]]` sliced to its second row, `reencode_offsets` must return
/// offsets `[0, 2]`, a start of 0 and a length of 2.
#[test]
fn reencode_offsets_when_first_offset_is_zero() {
    let mut list_builder = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
    // Row 0: empty list.
    list_builder.append(true);
    // Row 1: [35, 42].
    for value in [35, 42] {
        list_builder.values().append_value(value);
    }
    list_builder.append(true);
    let list_data = list_builder.finish().into_data();

    let sliced = list_data.slice(1, 1);
    let (new_offsets, original_start, length) =
        reencode_offsets::<i32>(&sliced.buffers()[0], &sliced);

    assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
    assert_eq!(0, original_start);
    assert_eq!(2, length);
}
3562
3563 fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
3566 let in_sliced = in_batch.slice(999, 1);
3568
3569 let bytes_batch = serialize_file(&in_batch);
3570 let bytes_sliced = serialize_file(&in_sliced);
3571
3572 assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));
3574
3575 let out_batch = deserialize_file(bytes_batch);
3577 assert_eq!(in_batch, out_batch);
3578
3579 let out_sliced = deserialize_file(bytes_sliced);
3580 assert_eq!(in_sliced, out_sliced);
3581 }
3582
/// Round-trips a `List<UInt32>` batch and expects its one-row slice to serialize at
/// least 1000x smaller.
#[test]
fn encode_lists() {
    let item_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let list_field = Field::new("val", DataType::List(item_field), false);
    let schema = Arc::new(Schema::new(vec![list_field]));

    let list_array = Arc::new(generate_list_data::<i32>());
    let in_batch = RecordBatch::try_new(schema, vec![list_array]).unwrap();

    roundtrip_ensure_sliced_smaller(in_batch, 1000);
}
3594
/// Round-trips a zero-length slice (offset 999) of a `List<UInt32>` batch through the
/// file format.
#[test]
fn encode_empty_list() {
    let item_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let list_field = Field::new("val", DataType::List(item_field), false);
    let schema = Arc::new(Schema::new(vec![list_field]));

    let list_array = Arc::new(generate_list_data::<i32>());
    let full_batch = RecordBatch::try_new(schema, vec![list_array]).unwrap();
    let in_batch = full_batch.slice(999, 0);

    let encoded = serialize_file(&in_batch);
    let out_batch = deserialize_file(encoded);
    assert_eq!(in_batch, out_batch);
}
3609
/// Round-trips a `LargeList<UInt32>` batch and expects its one-row slice to serialize at
/// least 1000x smaller.
#[test]
fn encode_large_lists() {
    let item_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let list_field = Field::new("val", DataType::LargeList(item_field), false);
    let schema = Arc::new(Schema::new(vec![list_field]));

    let list_array = Arc::new(generate_list_data::<i64>());
    let in_batch = RecordBatch::try_new(schema, vec![list_array]).unwrap();

    roundtrip_ensure_sliced_smaller(in_batch, 1000);
}
3623
/// Runs `check_sliced_list_array` on a `LargeList<UInt32>` column.
#[test]
fn encode_large_lists_non_zero_offset() {
    let item_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let list_field = Field::new("val", DataType::LargeList(item_field), false);
    let schema = Arc::new(Schema::new(vec![list_field]));

    let list_array = Arc::new(generate_list_data::<i64>());
    check_sliced_list_array(schema, list_array);
}
3634
/// Runs `check_sliced_list_array` on a `LargeList<Utf8>` column.
#[test]
fn encode_large_lists_string_non_zero_offset() {
    let item_field = Arc::new(Field::new_list_field(DataType::Utf8, true));
    let list_field = Field::new("val", DataType::LargeList(item_field), false);
    let schema = Arc::new(Schema::new(vec![list_field]));

    let list_array = Arc::new(generate_string_list_data::<i64>());
    check_sliced_list_array(schema, list_array);
}
3645
/// Runs `check_sliced_list_array` on a `LargeList<Utf8View>` column.
#[test]
fn encode_large_list_string_view_non_zero_offset() {
    let item_field = Arc::new(Field::new_list_field(DataType::Utf8View, true));
    let list_field = Field::new("val", DataType::LargeList(item_field), false);
    let schema = Arc::new(Schema::new(vec![list_field]));

    let list_array = Arc::new(generate_utf8view_list_data::<i64>());
    check_sliced_list_array(schema, list_array);
}
3656
3657 fn check_sliced_list_array(schema: Arc<Schema>, values: Arc<GenericListArray<i64>>) {
3658 for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3659 let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3660 .unwrap()
3661 .slice(offset, len);
3662 let out_batch = deserialize_file(serialize_file(&in_batch));
3663 assert_eq!(in_batch, out_batch);
3664 }
3665 }
3666
/// Round-trips a `List<List<UInt32>>` batch and expects its one-row slice to serialize at
/// least 1000x smaller.
#[test]
fn encode_nested_lists() {
    let leaf_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let middle_field = Arc::new(Field::new_list_field(DataType::List(leaf_field), true));
    let outer_field = Field::new("val", DataType::List(middle_field), true);
    let schema = Arc::new(Schema::new(vec![outer_field]));

    let nested_array = Arc::new(generate_nested_list_data::<i32>());
    let in_batch = RecordBatch::try_new(schema, vec![nested_array]).unwrap();

    roundtrip_ensure_sliced_smaller(in_batch, 1000);
}
3679
/// Round-trips the `generate_nested_list_data_starting_at_zero` data; with a size factor
/// of 1 the one-row slice only has to serialize smaller than the full batch.
#[test]
fn encode_nested_lists_starting_at_zero() {
    let leaf_field = Arc::new(Field::new("item", DataType::UInt32, true));
    let middle_field = Arc::new(Field::new("item", DataType::List(leaf_field), true));
    let outer_field = Field::new("val", DataType::List(middle_field), true);
    let schema = Arc::new(Schema::new(vec![outer_field]));

    let nested_array = Arc::new(generate_nested_list_data_starting_at_zero::<i32>());
    let in_batch = RecordBatch::try_new(schema, vec![nested_array]).unwrap();

    roundtrip_ensure_sliced_smaller(in_batch, 1);
}
3692
/// Round-trips a `UInt32 -> UInt32` map batch and expects its one-row slice to serialize
/// at least 1000x smaller.
#[test]
fn encode_map_array() {
    let key_field = Arc::new(Field::new("keys", DataType::UInt32, false));
    let value_field = Arc::new(Field::new("values", DataType::UInt32, true));
    let map_field = Field::new_map("map", "entries", key_field, value_field, false, true);
    let schema = Arc::new(Schema::new(vec![map_field]));

    let map_array = Arc::new(generate_map_array_data());
    let in_batch = RecordBatch::try_new(schema, vec![map_array]).unwrap();

    roundtrip_ensure_sliced_smaller(in_batch, 1000);
}
3705
3706 fn generate_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
3707 let mut builder = GenericListViewBuilder::<O, _>::new(UInt32Builder::new());
3708
3709 for i in 0u32..100_000 {
3710 if i.is_multiple_of(10_000) {
3711 builder.append(false);
3712 continue;
3713 }
3714 for value in [i, i, i] {
3715 builder.values().append_value(value);
3716 }
3717 builder.append(true);
3718 }
3719
3720 builder.finish()
3721 }
3722
/// Round-trips a full `ListView<UInt32>` batch through the file format.
#[test]
fn encode_list_view_arrays() {
    let item_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let view_field = Field::new("val", DataType::ListView(item_field), true);
    let schema = Arc::new(Schema::new(vec![view_field]));

    let view_array = Arc::new(generate_list_view_data::<i32>());
    let in_batch = RecordBatch::try_new(schema, vec![view_array]).unwrap();

    let encoded = serialize_file(&in_batch);
    let out_batch = deserialize_file(encoded);
    assert_eq!(in_batch, out_batch);
}
3735
/// Round-trips a full `LargeListView<UInt32>` batch through the file format.
#[test]
fn encode_large_list_view_arrays() {
    let item_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let view_field = Field::new("val", DataType::LargeListView(item_field), true);
    let schema = Arc::new(Schema::new(vec![view_field]));

    let view_array = Arc::new(generate_list_view_data::<i64>());
    let in_batch = RecordBatch::try_new(schema, vec![view_array]).unwrap();

    let encoded = serialize_file(&in_batch);
    let out_batch = deserialize_file(encoded);
    assert_eq!(in_batch, out_batch);
}
3748
/// Slices a `ListView<UInt32>` batch at several (offset, length) windows, including one
/// covering the last 13 rows, and asserts each slice is unchanged after a file-format
/// round trip.
#[test]
fn check_sliced_list_view_array() {
    let item_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let view_field = Field::new("val", DataType::ListView(item_field), true);
    let schema = Arc::new(Schema::new(vec![view_field]));
    let values = Arc::new(generate_list_view_data::<i32>());

    let tail_start = values.len() - 13;
    let windows = [(999, 1), (0, 13), (47, 12), (tail_start, 13)];

    for (offset, len) in windows {
        let full_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()]).unwrap();
        let in_batch = full_batch.slice(offset, len);

        let encoded = serialize_file(&in_batch);
        let out_batch = deserialize_file(encoded);
        assert_eq!(in_batch, out_batch);
    }
}
3764
/// Slices a `LargeListView<UInt32>` batch at several (offset, length) windows, including
/// one covering the last 13 rows, and asserts each slice is unchanged after a file-format
/// round trip.
#[test]
fn check_sliced_large_list_view_array() {
    let item_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let view_field = Field::new("val", DataType::LargeListView(item_field), true);
    let schema = Arc::new(Schema::new(vec![view_field]));
    let values = Arc::new(generate_list_view_data::<i64>());

    let tail_start = values.len() - 13;
    let windows = [(999, 1), (0, 13), (47, 12), (tail_start, 13)];

    for (offset, len) in windows {
        let full_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()]).unwrap();
        let in_batch = full_batch.slice(offset, len);

        let encoded = serialize_file(&in_batch);
        let out_batch = deserialize_file(encoded);
        assert_eq!(in_batch, out_batch);
    }
}
3780
3781 fn generate_nested_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
3782 let inner_builder = UInt32Builder::new();
3783 let middle_builder = GenericListViewBuilder::<O, _>::new(inner_builder);
3784 let mut outer_builder = GenericListViewBuilder::<O, _>::new(middle_builder);
3785
3786 for i in 0u32..10_000 {
3787 if i.is_multiple_of(1_000) {
3788 outer_builder.append(false);
3789 continue;
3790 }
3791
3792 for _ in 0..3 {
3793 for value in [i, i + 1, i + 2] {
3794 outer_builder.values().values().append_value(value);
3795 }
3796 outer_builder.values().append(true);
3797 }
3798 outer_builder.append(true);
3799 }
3800
3801 outer_builder.finish()
3802 }
3803
/// Round-trips a `ListView<ListView<UInt32>>` column produced by
/// `generate_nested_list_view_data` through the file serialization helpers.
#[test]
fn encode_nested_list_views() {
    let leaf = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let middle = Arc::new(Field::new_list_field(DataType::ListView(leaf), true));
    let schema = Arc::new(Schema::new(vec![Field::new(
        "val",
        DataType::ListView(middle),
        true,
    )]));

    let column = Arc::new(generate_nested_list_view_data::<i32>());
    let original = RecordBatch::try_new(schema, vec![column]).unwrap();

    let encoded = serialize_file(&original);
    let decoded = deserialize_file(encoded);
    assert_eq!(original, decoded);
}
3817
3818 fn test_roundtrip_list_view_of_dict_impl<OffsetSize: OffsetSizeTrait, U: ArrowNativeType>(
3819 list_data_type: DataType,
3820 offsets: &[U; 5],
3821 sizes: &[U; 4],
3822 ) {
3823 let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3824 let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
3825 let dict_array = DictionaryArray::new(keys, Arc::new(values));
3826 let dict_data = dict_array.to_data();
3827
3828 let value_offsets = Buffer::from_slice_ref(offsets);
3829 let value_sizes = Buffer::from_slice_ref(sizes);
3830
3831 let list_data = ArrayData::builder(list_data_type)
3832 .len(4)
3833 .add_buffer(value_offsets)
3834 .add_buffer(value_sizes)
3835 .add_child_data(dict_data)
3836 .build()
3837 .unwrap();
3838 let list_view_array = GenericListViewArray::<OffsetSize>::from(list_data);
3839
3840 let schema = Arc::new(Schema::new(vec![Field::new(
3841 "f1",
3842 list_view_array.data_type().clone(),
3843 false,
3844 )]));
3845 let input_batch = RecordBatch::try_new(schema, vec![Arc::new(list_view_array)]).unwrap();
3846
3847 let output_batch = deserialize_file(serialize_file(&input_batch));
3848 assert_eq!(input_batch, output_batch);
3849
3850 let output_batch = deserialize_stream(serialize_stream(&input_batch));
3851 assert_eq!(input_batch, output_batch);
3852 }
3853
/// Runs the list-view-of-dictionary round trip with 32-bit offsets and sizes.
#[test]
fn test_roundtrip_list_view_of_dict() {
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    // `Field::new_dict` is the call this `allow(deprecated)` is here for.
    #[allow(deprecated)]
    let item = Field::new_dict("item", dict_type, true, 1, false);
    let list_data_type = DataType::ListView(Arc::new(item));

    test_roundtrip_list_view_of_dict_impl::<i32, i32>(
        list_data_type,
        &[0, 2, 4, 4, 7],
        &[2, 2, 0, 3],
    );
}
3868
/// Runs the list-view-of-dictionary round trip with 64-bit offsets and sizes.
#[test]
fn test_roundtrip_large_list_view_of_dict() {
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    // `Field::new_dict` is the call this `allow(deprecated)` is here for.
    #[allow(deprecated)]
    let item = Field::new_dict("item", dict_type, true, 2, false);
    let list_data_type = DataType::LargeListView(Arc::new(item));

    test_roundtrip_list_view_of_dict_impl::<i64, i64>(
        list_data_type,
        &[0, 2, 4, 4, 7],
        &[2, 2, 0, 3],
    );
}
3883
/// Builds a six-row `ListView` over a twelve-key dictionary child, slices rows `1..5`
/// out of the batch, and checks the slice survives both file and stream round trips.
#[test]
fn test_roundtrip_sliced_list_view_of_dict() {
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    // `Field::new_dict` is the call this `allow(deprecated)` is here for.
    #[allow(deprecated)]
    let item = Field::new_dict("item", dict_type, true, 3, false);
    let list_data_type = DataType::ListView(Arc::new(item));

    let dictionary = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
    let indices = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2, 1, 0, 3, 2, 1]);
    let child = DictionaryArray::new(indices, Arc::new(dictionary)).into_data();

    let offsets: [i32; 7] = [0, 2, 4, 4, 7, 9, 12];
    let sizes: [i32; 6] = [2, 2, 0, 3, 2, 3];

    let list_data = ArrayData::builder(list_data_type)
        .len(6)
        .add_buffer(Buffer::from_slice_ref(&offsets))
        .add_buffer(Buffer::from_slice_ref(&sizes))
        .add_child_data(child)
        .build()
        .unwrap();
    let list_view = GenericListViewArray::<i32>::from(list_data);

    let field = Field::new("f1", list_view.data_type().clone(), false);
    let schema = Arc::new(Schema::new(vec![field]));
    let full_batch = RecordBatch::try_new(schema, vec![Arc::new(list_view)]).unwrap();

    let sliced = full_batch.slice(1, 4);

    let from_file = deserialize_file(serialize_file(&sliced));
    assert_eq!(sliced, from_file);

    let from_stream = deserialize_stream(serialize_stream(&sliced));
    assert_eq!(sliced, from_stream);
}
3929
/// Round-trips a dense union whose first child is a dictionary-encoded string array
/// and whose second child is a plain `Int32` array, via both file and stream helpers.
#[test]
fn test_roundtrip_dense_union_of_dict() {
    let dictionary = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
    let indices = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
    let dict_child: ArrayRef = Arc::new(DictionaryArray::new(indices, Arc::new(dictionary)));
    let int_child: ArrayRef = Arc::new(Int32Array::from(vec![100, 200]));

    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    // `Field::new_dict` is the call this `allow(deprecated)` is here for.
    #[allow(deprecated)]
    let dict_field = Arc::new(Field::new_dict("dict", dict_type, true, 1, false));
    let int_field = Arc::new(Field::new("int", DataType::Int32, false));
    let union_fields = UnionFields::try_new(vec![0, 1], vec![dict_field, int_field]).unwrap();

    // Dense layout: one type id and one child offset per row.
    let type_ids = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);
    let child_offsets = ScalarBuffer::from(vec![0i32, 1, 0, 2, 1, 3, 4]);

    let union = UnionArray::try_new(
        union_fields.clone(),
        type_ids,
        Some(child_offsets),
        vec![dict_child, int_child],
    )
    .unwrap();

    let union_type = DataType::Union(union_fields, UnionMode::Dense);
    let schema = Arc::new(Schema::new(vec![Field::new("union", union_type, false)]));
    let original = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();

    let from_file = deserialize_file(serialize_file(&original));
    assert_eq!(original, from_file);

    let from_stream = deserialize_stream(serialize_stream(&original));
    assert_eq!(original, from_stream);
}
3973
/// Round-trips a sparse union whose first child is a dictionary-encoded string array
/// and whose second child is a plain `Int32` array, via both file and stream helpers.
#[test]
fn test_roundtrip_sparse_union_of_dict() {
    let dictionary = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
    let indices = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
    let dict_child: ArrayRef = Arc::new(DictionaryArray::new(indices, Arc::new(dictionary)));
    // Sparse layout: no offsets buffer is passed, and both children have seven rows.
    let int_child: ArrayRef = Arc::new(Int32Array::from(vec![0, 0, 100, 0, 200, 0, 0]));

    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    // `Field::new_dict` is the call this `allow(deprecated)` is here for.
    #[allow(deprecated)]
    let dict_field = Arc::new(Field::new_dict("dict", dict_type, true, 2, false));
    let int_field = Arc::new(Field::new("int", DataType::Int32, false));
    let union_fields = UnionFields::try_new(vec![0, 1], vec![dict_field, int_field]).unwrap();

    let type_ids = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);

    let union = UnionArray::try_new(
        union_fields.clone(),
        type_ids,
        None,
        vec![dict_child, int_child],
    )
    .unwrap();

    let union_type = DataType::Union(union_fields, UnionMode::Sparse);
    let schema = Arc::new(Schema::new(vec![Field::new("union", union_type, false)]));
    let original = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();

    let from_file = deserialize_file(serialize_file(&original));
    assert_eq!(original, from_file);

    let from_stream = deserialize_stream(serialize_stream(&original));
    assert_eq!(original, from_stream);
}
4016
/// Round-trips a three-row map whose keys are dictionary-encoded strings and whose
/// values are plain `Int32`, via both file and stream helpers.
#[test]
fn test_roundtrip_map_with_dict_keys() {
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

    // Six map entries in total: three distinct key strings referenced by index.
    let key_dictionary = StringArray::from(vec!["key_a", "key_b", "key_c"]);
    let key_indices = Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]);
    let dict_keys: ArrayRef = Arc::new(DictionaryArray::new(
        key_indices,
        Arc::new(key_dictionary),
    ));
    let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]));

    // `Field::new_dict` is the call this `allow(deprecated)` is here for.
    #[allow(deprecated)]
    let key_field_with_id = Field::new_dict("key", dict_type.clone(), false, 1, false);
    let value_field = Field::new("value", DataType::Int32, true);
    let entries_field = Arc::new(Field::new(
        "entries",
        DataType::Struct(vec![key_field_with_id, value_field.clone()].into()),
        false,
    ));

    // The struct array's own key field is built with plain `Field::new`.
    let entries = StructArray::from(vec![
        (Arc::new(Field::new("key", dict_type, false)), dict_keys),
        (Arc::new(value_field), values),
    ]);

    // Three maps of two entries each.
    let map_data = ArrayData::builder(DataType::Map(entries_field, false))
        .len(3)
        .add_buffer(Buffer::from_slice_ref([0i32, 2, 4, 6]))
        .add_child_data(entries.into_data())
        .build()
        .unwrap();
    let map_array = MapArray::from(map_data);

    let field = Field::new("map", map_array.data_type().clone(), false);
    let schema = Arc::new(Schema::new(vec![field]));
    let original = RecordBatch::try_new(schema, vec![Arc::new(map_array)]).unwrap();

    let from_file = deserialize_file(serialize_file(&original));
    assert_eq!(original, from_file);

    let from_stream = deserialize_stream(serialize_stream(&original));
    assert_eq!(original, from_stream);
}
4084
/// Round-trips a three-row map whose keys are plain strings and whose values are
/// dictionary-encoded strings, via both file and stream helpers.
#[test]
fn test_roundtrip_map_with_dict_values() {
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

    let keys: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e", "f"]));

    // Six map entries in total: three distinct value strings referenced by index.
    let value_dictionary = StringArray::from(vec!["val_x", "val_y", "val_z"]);
    let value_indices = Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]);
    let dict_values: ArrayRef = Arc::new(DictionaryArray::new(
        value_indices,
        Arc::new(value_dictionary),
    ));

    let key_field = Field::new("key", DataType::Utf8, false);
    // `Field::new_dict` is the call this `allow(deprecated)` is here for.
    #[allow(deprecated)]
    let value_field_with_id = Field::new_dict("value", dict_type.clone(), true, 2, false);
    let entries_field = Arc::new(Field::new(
        "entries",
        DataType::Struct(vec![key_field.clone(), value_field_with_id].into()),
        false,
    ));

    // The struct array's own value field is built with plain `Field::new`.
    let entries = StructArray::from(vec![
        (Arc::new(key_field), keys),
        (Arc::new(Field::new("value", dict_type, true)), dict_values),
    ]);

    // Three maps of two entries each.
    let map_data = ArrayData::builder(DataType::Map(entries_field, false))
        .len(3)
        .add_buffer(Buffer::from_slice_ref([0i32, 2, 4, 6]))
        .add_child_data(entries.into_data())
        .build()
        .unwrap();
    let map_array = MapArray::from(map_data);

    let field = Field::new("map", map_array.data_type().clone(), false);
    let schema = Arc::new(Schema::new(vec![field]));
    let original = RecordBatch::try_new(schema, vec![Arc::new(map_array)]).unwrap();

    let from_file = deserialize_file(serialize_file(&original));
    assert_eq!(original, from_file);

    let from_stream = deserialize_stream(serialize_stream(&original));
    assert_eq!(original, from_stream);
}
4152
/// Writes `Decimal128` batches of varying width with a 16-byte IPC alignment and
/// checks that a `FileDecoder` with `with_require_alignment(true)` reads the first
/// record batch back unchanged.
#[test]
fn test_decimal128_alignment16_is_sufficient() {
    const IPC_ALIGNMENT: usize = 16;

    for num_cols in [1, 2, 3, 17, 50, 73, 99] {
        // Vary the row count together with the column count.
        let num_rows = (num_cols * 7 + 11) % 100;

        let fields: Vec<Field> = (0..num_cols)
            .map(|i| Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true))
            .collect();
        let arrays: Vec<Arc<dyn Array>> = (0..num_cols)
            .map(|_| {
                Arc::new(Decimal128Array::from(vec![num_cols as i128; num_rows]))
                    as Arc<dyn Array>
            })
            .collect();
        let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays).unwrap();

        let options = IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap();
        let mut writer =
            FileWriter::try_new_with_options(Vec::new(), batch.schema_ref(), options).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        let file_bytes: Vec<u8> = writer.into_inner().unwrap();

        // The last 10 bytes are handed to `read_footer_length`; the footer itself
        // sits immediately before them.
        let buffer = Buffer::from_vec(file_bytes);
        let trailer_start = buffer.len() - 10;
        let footer_len =
            read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
        let footer_start = trailer_start - footer_len;
        let footer = root_as_footer(&buffer[footer_start..trailer_start]).unwrap();

        let decoded_schema = fb_to_schema(footer.schema().unwrap());
        let decoder = FileDecoder::new(Arc::new(decoded_schema), footer.version())
            .with_require_alignment(true);

        let block = footer.recordBatches().unwrap().get(0);
        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
        let block_bytes = buffer.slice_with_length(block.offset() as _, block_len);

        let decoded = decoder
            .read_record_batch(block, &block_bytes)
            .unwrap()
            .unwrap();

        assert_eq!(batch, decoded);
    }
}
4211
/// Writes a two-column `Decimal128` batch with an 8-byte IPC alignment and checks
/// that a `FileDecoder` with `with_require_alignment(true)` rejects it with the
/// expected misalignment error.
#[test]
fn test_decimal128_alignment8_is_unaligned() {
    const IPC_ALIGNMENT: usize = 8;

    let num_cols = 2;
    let num_rows = 1;

    let fields: Vec<Field> = (0..num_cols)
        .map(|i| Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true))
        .collect();
    let arrays: Vec<Arc<dyn Array>> = (0..num_cols)
        .map(|_| {
            Arc::new(Decimal128Array::from(vec![num_cols as i128; num_rows])) as Arc<dyn Array>
        })
        .collect();
    let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays).unwrap();

    let options = IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap();
    let mut writer =
        FileWriter::try_new_with_options(Vec::new(), batch.schema_ref(), options).unwrap();
    writer.write(&batch).unwrap();
    writer.finish().unwrap();
    let file_bytes: Vec<u8> = writer.into_inner().unwrap();

    // The last 10 bytes are handed to `read_footer_length`; the footer itself sits
    // immediately before them.
    let buffer = Buffer::from_vec(file_bytes);
    let trailer_start = buffer.len() - 10;
    let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
    let footer_start = trailer_start - footer_len;
    let footer = root_as_footer(&buffer[footer_start..trailer_start]).unwrap();

    let decoded_schema = fb_to_schema(footer.schema().unwrap());
    let decoder =
        FileDecoder::new(Arc::new(decoded_schema), footer.version()).with_require_alignment(true);

    let block = footer.recordBatches().unwrap().get(0);
    let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
    let block_bytes = buffer.slice_with_length(block.offset() as _, block_len);

    let error = decoder
        .read_record_batch(block, &block_bytes)
        .unwrap_err();
    assert_eq!(
        error.to_string(),
        "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
         offset from expected alignment of 16 by 8"
    );
}
4267
/// Checks that `flush()` on the stream and file writers pushes the bytes held by the
/// wrapping `BufWriter` into the underlying `Vec<u8>`.
///
/// The expected flushed sizes are derived from the finished stream output.
/// NOTE(review): the `- 8` is presumably the end-of-stream marker that
/// `into_inner` adds, and the `+ 8` the file format's leading magic plus padding —
/// confirm.
#[test]
fn test_flush() {
    let num_cols = 2;
    let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
    let fields: Vec<Field> = (0..num_cols)
        .map(|i| Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true))
        .collect();
    let schema = Schema::new(fields);

    let mut stream_writer = StreamWriter::try_new_with_options(
        BufWriter::with_capacity(1024, Vec::new()),
        &schema,
        options.clone(),
    )
    .unwrap();
    let mut file_writer = FileWriter::try_new_with_options(
        BufWriter::with_capacity(1024, Vec::new()),
        &schema,
        options,
    )
    .unwrap();

    // Bytes that reached the inner `Vec` before any explicit flush.
    let stream_len_before = stream_writer.get_ref().get_ref().len();
    let file_len_before = file_writer.get_ref().get_ref().len();

    stream_writer.flush().unwrap();
    file_writer.flush().unwrap();

    // Bytes that reached the inner `Vec` after the explicit flush.
    let stream_len_after = stream_writer.get_ref().get_ref().len();
    let file_len_after = file_writer.get_ref().get_ref().len();

    let finished_stream = stream_writer.into_inner().unwrap().into_inner().unwrap();
    let expected_stream_len = finished_stream.len() - 8;
    let expected_file_len = expected_stream_len + 8;

    assert!(
        stream_len_before < stream_len_after,
        "this test makes no sense if flush is not actually required"
    );
    assert!(
        file_len_before < file_len_after,
        "this test makes no sense if flush is not actually required"
    );
    assert_eq!(stream_len_after, expected_stream_len);
    assert_eq!(file_len_after, expected_file_len);
}
4314
/// Round-trips slices of a non-nullable `List<FixedSizeList<Float32, 3>>` column.
///
/// The array has two outer rows: the first holds three points, the second one point.
#[test]
fn test_roundtrip_list_of_fixed_list() -> Result<(), ArrowError> {
    let coordinate_field = Arc::new(Field::new("item", DataType::Float32, false));
    let l1_type = DataType::FixedSizeList(coordinate_field.clone(), 3);
    let l2_type = DataType::List(Arc::new(Field::new("item", l1_type.clone(), false)));

    let l1_builder =
        FixedSizeListBuilder::new(Float32Builder::new(), 3).with_field(coordinate_field);
    let mut l2_builder =
        ListBuilder::new(l1_builder).with_field(Arc::new(Field::new("item", l1_type, false)));

    // One entry per outer list row; each inner array is one fixed-size point.
    let rows: [&[[f32; 3]]; 2] = [
        &[[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]],
        &[[10., 11., 12.]],
    ];
    for row in rows {
        for point in row {
            for coordinate in point {
                l2_builder.values().values().append_value(*coordinate);
            }
            l2_builder.values().append(true);
        }
        l2_builder.append(true);
    }

    let array = Arc::new(l2_builder.finish()) as ArrayRef;

    let schema = Arc::new(Schema::new_with_metadata(
        vec![Field::new("points", l2_type, false)],
        HashMap::default(),
    ));

    for (offset, length) in [(0, 1), (0, 2), (1, 1)] {
        test_slices(&array, &schema, offset, length)?;
    }

    Ok(())
}
4362
/// Round-trips slices of a nullable `List<FixedSizeList<Float32, 3>>` column whose
/// points contain null coordinates.
///
/// The array has two outer rows: the first holds three points, the second one point.
#[test]
fn test_roundtrip_list_of_fixed_list_w_nulls() -> Result<(), ArrowError> {
    let mut l2_builder = ListBuilder::new(FixedSizeListBuilder::new(Float32Builder::new(), 3));

    // One entry per outer list row; `None` marks a null coordinate.
    let rows: [&[[Option<f32>; 3]]; 2] = [
        &[
            [Some(1.0), Some(2.0), None],
            [Some(4.0), Some(5.0), Some(6.0)],
            [None, Some(8.0), Some(9.0)],
        ],
        &[[Some(10.), None, None]],
    ];
    for row in rows {
        for point in row {
            for coordinate in point {
                l2_builder.values().values().append_option(*coordinate);
            }
            l2_builder.values().append(true);
        }
        l2_builder.append(true);
    }

    let array = Arc::new(l2_builder.finish()) as ArrayRef;

    let coordinate_field = Arc::new(Field::new("item", DataType::Float32, true));
    let point_field = Arc::new(Field::new(
        "item",
        DataType::FixedSizeList(coordinate_field, 3),
        true,
    ));
    let schema = Arc::new(Schema::new_with_metadata(
        vec![Field::new("points", DataType::List(point_field), true)],
        HashMap::default(),
    ));

    for (offset, length) in [(0, 1), (0, 2), (1, 1)] {
        test_slices(&array, &schema, offset, length)?;
    }

    Ok(())
}
4422
4423 fn test_slices(
4424 parent_array: &ArrayRef,
4425 schema: &SchemaRef,
4426 offset: usize,
4427 length: usize,
4428 ) -> Result<(), ArrowError> {
4429 let subarray = parent_array.slice(offset, length);
4430 let original_batch = RecordBatch::try_new(schema.clone(), vec![subarray])?;
4431
4432 let mut bytes = Vec::new();
4433 let mut writer = StreamWriter::try_new(&mut bytes, schema)?;
4434 writer.write(&original_batch)?;
4435 writer.finish()?;
4436
4437 let mut cursor = std::io::Cursor::new(bytes);
4438 let mut reader = StreamReader::try_new(&mut cursor, None)?;
4439 let returned_batch = reader.next().unwrap()?;
4440
4441 assert_eq!(original_batch, returned_batch);
4442
4443 Ok(())
4444 }
4445
/// Round-trips several slices of a four-row, non-nullable `FixedSizeList<Int64, 3>`
/// column through the stream writer and reader.
#[test]
fn test_roundtrip_fixed_list() -> Result<(), ArrowError> {
    let item_field = Arc::new(Field::new("item", DataType::Int64, false));
    let mut builder =
        FixedSizeListBuilder::new(Int64Builder::new(), 3).with_field(item_field.clone());

    for point in [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]] {
        for coordinate in point {
            builder.values().append_value(coordinate);
        }
        builder.append(true);
    }

    let array = Arc::new(builder.finish()) as ArrayRef;

    let schema = Arc::new(Schema::new_with_metadata(
        vec![Field::new(
            "points",
            DataType::FixedSizeList(item_field, 3),
            false,
        )],
        HashMap::default(),
    ));

    for (offset, length) in [(0, 4), (0, 2), (1, 3), (2, 1)] {
        test_slices(&array, &schema, offset, length)?;
    }

    Ok(())
}
4480
/// Round-trips several slices of a four-row, nullable `FixedSizeList<Int64, 3>`
/// column whose points contain null coordinates.
#[test]
fn test_roundtrip_fixed_list_w_nulls() -> Result<(), ArrowError> {
    let mut builder = FixedSizeListBuilder::new(Int64Builder::new(), 3);

    // `None` marks a null coordinate; every list entry itself is valid.
    let points = [
        [Some(1), Some(2), None],
        [Some(4), Some(5), Some(6)],
        [None, Some(8), Some(9)],
        [Some(10), None, None],
    ];
    for point in points {
        for coordinate in point {
            builder.values().append_option(coordinate);
        }
        builder.append(true);
    }

    let array = Arc::new(builder.finish()) as ArrayRef;

    let item_field = Arc::new(Field::new("item", DataType::Int64, true));
    let schema = Arc::new(Schema::new_with_metadata(
        vec![Field::new(
            "points",
            DataType::FixedSizeList(item_field, 3),
            true,
        )],
        HashMap::default(),
    ));

    for (offset, length) in [(0, 4), (0, 2), (1, 3), (2, 1)] {
        test_slices(&array, &schema, offset, length)?;
    }

    Ok(())
}
4522
/// Checks that encoding a schema with `HashMap` metadata yields the same bytes on
/// every run.
///
/// Each call to `create_hash` builds a fresh metadata map, writes an empty batch
/// with `StreamWriter`, and hashes the output bytes. The test fails if any of 20
/// repeated encodings hashes differently from the first.
#[test]
fn test_metadata_encoding_ordering() {
    /// Encodes a schema carrying five metadata entries at both field and schema
    /// level, and returns a hash of the resulting stream bytes.
    fn create_hash() -> u64 {
        let metadata: HashMap<String, String> =
            [("a", "1"), ("b", "2"), ("c", "3"), ("d", "4"), ("e", "5")]
                .into_iter()
                .map(|(k, v)| (k.to_owned(), v.to_owned()))
                .collect();

        // The field gets a copy of the map; the schema takes the original by value.
        let schema = Arc::new(
            Schema::new(vec![
                Field::new("a", DataType::Int64, true).with_metadata(metadata.clone()),
            ])
            .with_metadata(metadata),
        );
        let batch = RecordBatch::new_empty(schema);

        let mut bytes = Vec::new();
        let mut w = StreamWriter::try_new(&mut bytes, batch.schema_ref()).unwrap();
        w.write(&batch).unwrap();
        w.finish().unwrap();

        let mut h = std::hash::DefaultHasher::new();
        h.write(&bytes);
        h.finish()
    }

    let expected = create_hash();

    // Repeat so that a varying `HashMap` iteration order would show up as a mismatch.
    for attempt in 0..20 {
        assert_eq!(
            create_hash(),
            expected,
            "encoded bytes differed from the first encoding on attempt {attempt}"
        );
    }
}
4566
/// Writes two independent single-batch streams with the same `DictionaryTracker`,
/// calling `clear()` between them, and checks that each stream reads back on its
/// own.
///
/// NOTE(review): presumably without the `clear()` the tracker would treat the
/// dictionary as already sent and the second stream would lack it — confirm.
#[test]
fn test_dictionary_tracker_reset() {
    let data_gen = IpcDataGenerator::default();
    let mut dictionary_tracker = DictionaryTracker::new(false);
    let writer_options = IpcWriteOptions::default();
    let mut compression_ctx = IpcWriteContext::default();

    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
        false,
    )]));

    // Both streams carry the same one-row dictionary batch.
    let single_row_batch = || {
        RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(DictionaryArray::new(
                UInt8Array::from_iter_values([0]),
                Arc::new(StringArray::from_iter_values(["a"])),
            ))],
        )
        .unwrap()
    };

    // Encodes schema header, dictionary messages, then the batch into a new buffer.
    let mut write_single_batch_stream =
        |batch: &RecordBatch, dict_tracker: &mut DictionaryTracker| -> Vec<u8> {
            let mut stream = Vec::new();

            let header = data_gen.schema_to_bytes_with_dictionary_tracker(
                &schema,
                dict_tracker,
                &writer_options,
            );
            let _ = write_message(&mut stream, header, &writer_options).unwrap();

            let (dictionaries, encoded_batch) = data_gen
                .encode(batch, dict_tracker, &writer_options, &mut compression_ctx)
                .unwrap();
            for dictionary in dictionaries {
                let _ = write_message(&mut stream, dictionary, &writer_options).unwrap();
            }
            let _ = write_message(&mut stream, encoded_batch, &writer_options).unwrap();

            stream
        };

    let batch1 = single_row_batch();
    let first_stream = write_single_batch_stream(&batch1, &mut dictionary_tracker);

    let mut reader = StreamReader::try_new(Cursor::new(first_stream), None).unwrap();
    let read_batch = reader.next().unwrap().unwrap();
    assert_eq!(read_batch, batch1);

    // Reset the tracker before starting the second, unrelated stream.
    dictionary_tracker.clear();

    let batch2 = single_row_batch();
    let second_stream = write_single_batch_stream(&batch2, &mut dictionary_tracker);

    let mut reader = StreamReader::try_new(Cursor::new(second_stream), None).unwrap();
    let read_batch = reader.next().unwrap().unwrap();
    assert_eq!(read_batch, batch2);
}
4635}