1use std::cmp::min;
28use std::collections::HashMap;
29use std::io::{BufWriter, Write};
30use std::mem::size_of;
31use std::sync::Arc;
32
33use flatbuffers::FlatBufferBuilder;
34
35use arrow_array::builder::BufferBuilder;
36use arrow_array::cast::*;
37use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
38use arrow_array::*;
39use arrow_buffer::bit_util;
40use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ToByteSlice};
41use arrow_data::{ArrayData, ArrayDataBuilder, BufferSpec, layout};
42use arrow_schema::*;
43
44use crate::CONTINUATION_MARKER;
45use crate::compression::CompressionCodec;
46pub use crate::compression::CompressionContext;
47use crate::convert::IpcSchemaEncoder;
48
/// Options that control how Arrow IPC data is written.
///
/// Values are created with [`IpcWriteOptions::try_new`] or `Default::default()` and then
/// refined through the `try_with_*` / `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct IpcWriteOptions {
    /// Byte alignment used when padding written data; `try_new` accepts only 8, 16, 32 or 64.
    alignment: u8,
    /// Whether to write the legacy IPC format; `try_new` rejects this for metadata version 5.
    write_legacy_ipc_format: bool,
    /// Metadata version stamped on every message and on the file footer.
    metadata_version: crate::MetadataVersion,
    /// Compression codec requested for batch bodies; `None` means no codec is configured.
    batch_compression_type: Option<crate::CompressionType>,
    /// Optional compression level passed to the codec when one is configured.
    batch_compression_level: Option<i32>,
    /// How a dictionary that changed between batches is emitted again.
    dictionary_handling: DictionaryHandling,
}
74
/// The bytes of one body buffer, ready to be written to the output.
enum EncodedBuffer {
    /// An Arrow [`Buffer`] kept as-is.
    Raw(Buffer),
    /// Owned bytes; presumably the output of a compression codec (see variant name).
    Compressed(Vec<u8>),
}
86
87impl EncodedBuffer {
88 fn as_slice(&self) -> &[u8] {
89 match self {
90 EncodedBuffer::Raw(b) => b.as_slice(),
91 EncodedBuffer::Compressed(v) => v.as_slice(),
92 }
93 }
94
95 fn len(&self) -> usize {
96 match self {
97 EncodedBuffer::Raw(b) => b.len(),
98 EncodedBuffer::Compressed(v) => v.len(),
99 }
100 }
101}
/// Accumulates the flatbuffer descriptors of a batch while its arrays are written.
#[derive(Default)]
struct IpcMetadataBuilder {
    /// Field nodes; later serialized as the `nodes` vector of the record batch.
    nodes: Vec<crate::FieldNode>,
    /// Buffer descriptors; later serialized as the `buffers` vector of the record batch.
    buffers: Vec<crate::Buffer>,
}
111
/// Destination for the encoded body buffers of a batch.
enum IpcBodySink<'a> {
    /// Copy every buffer, followed by its padding, into one contiguous byte vector.
    Write(&'a mut Vec<u8>),
    /// Keep every buffer as-is (padding is not stored) so the caller can write it later.
    Collect(&'a mut Vec<EncodedBuffer>),
}
122impl<'a> IpcBodySink<'a> {
123 pub fn write(&mut self, pad_len: usize, buffer: EncodedBuffer) {
125 match self {
126 IpcBodySink::Write(vec) => {
127 vec.extend_from_slice(buffer.as_slice());
128 vec.extend_from_slice(&PADDING[..pad_len]);
129 }
130 IpcBodySink::Collect(vec) => {
131 vec.push(buffer);
132 }
133 }
134 }
135}
136
/// Sizes reported by `IpcDataGenerator::write` after writing one record batch.
struct IpcWriteMetadata {
    /// One entry per dictionary message written ahead of the batch, as returned by
    /// `write_message`; `FileWriter::write` reads each pair as (header length, body length).
    dictionary_block_sizes: Vec<(usize, usize)>,
    /// Length of the record batch header including its prefix and alignment padding.
    padded_header_len: usize,
    /// Length of the record batch body including the trailing alignment padding.
    body_len: usize,
}
150
151impl IpcWriteOptions {
152 pub fn try_with_compression(
157 mut self,
158 batch_compression_type: Option<crate::CompressionType>,
159 ) -> Result<Self, ArrowError> {
160 self.batch_compression_type = batch_compression_type;
161
162 if self.batch_compression_type.is_some()
163 && self.metadata_version < crate::MetadataVersion::V5
164 {
165 return Err(ArrowError::InvalidArgumentError(
166 "Compression only supported in metadata v5 and above".to_string(),
167 ));
168 }
169 Ok(self)
170 }
171
172 pub fn try_with_compression_level(
177 mut self,
178 batch_compression_level: Option<i32>,
179 ) -> Result<Self, ArrowError> {
180 self.batch_compression_level = batch_compression_level;
181
182 if self.batch_compression_level.is_some()
183 && self.metadata_version < crate::MetadataVersion::V5
184 {
185 return Err(ArrowError::InvalidArgumentError(
186 "Compression only supported in metadata v5 and above".to_string(),
187 ));
188 }
189
190 match (self.batch_compression_type, self.batch_compression_level) {
191 (Some(crate::CompressionType::ZSTD), Some(level)) => {
192 return self.check_zstd_level(level);
193 }
194 (Some(crate::CompressionType::LZ4_FRAME), Some(_)) => {
195 return Err(ArrowError::InvalidArgumentError(
196 "LZ4 Frame compression does not support configurable compression levels"
197 .to_string(),
198 ));
199 }
200 _ => {}
201 }
202
203 Ok(self)
204 }
205
206 #[cfg(not(feature = "zstd"))]
207 fn check_zstd_level(self, _level: i32) -> Result<Self, ArrowError> {
208 Err(ArrowError::InvalidArgumentError(
209 "zstd IPC compression requires the zstd feature".to_string(),
210 ))
211 }
212
213 #[cfg(feature = "zstd")]
214 fn check_zstd_level(self, level: i32) -> Result<Self, ArrowError> {
215 let range = zstd::compression_level_range();
216 if !range.contains(&(level as zstd::zstd_safe::CompressionLevel)) {
217 return Err(ArrowError::InvalidArgumentError(format!(
218 "ZSTD compression level must be between {} and {}, got {}",
219 range.start(),
220 range.end(),
221 level,
222 )));
223 }
224
225 Ok(self)
226 }
227
228 pub fn try_new(
230 alignment: usize,
231 write_legacy_ipc_format: bool,
232 metadata_version: crate::MetadataVersion,
233 ) -> Result<Self, ArrowError> {
234 let is_alignment_valid =
235 alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
236 if !is_alignment_valid {
237 return Err(ArrowError::InvalidArgumentError(
238 "Alignment should be 8, 16, 32, or 64.".to_string(),
239 ));
240 }
241 let alignment: u8 = u8::try_from(alignment).expect("range already checked");
242 match metadata_version {
243 crate::MetadataVersion::V1
244 | crate::MetadataVersion::V2
245 | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
246 "Writing IPC metadata version 3 and lower not supported".to_string(),
247 )),
248 #[allow(deprecated)]
249 crate::MetadataVersion::V4 => Ok(Self {
250 alignment,
251 write_legacy_ipc_format,
252 metadata_version,
253 batch_compression_type: None,
254 batch_compression_level: None,
255 dictionary_handling: DictionaryHandling::default(),
256 }),
257 crate::MetadataVersion::V5 => {
258 if write_legacy_ipc_format {
259 Err(ArrowError::InvalidArgumentError(
260 "Legacy IPC format only supported on metadata version 4".to_string(),
261 ))
262 } else {
263 Ok(Self {
264 alignment,
265 write_legacy_ipc_format,
266 metadata_version,
267 batch_compression_type: None,
268 batch_compression_level: None,
269 dictionary_handling: DictionaryHandling::default(),
270 })
271 }
272 }
273 z => Err(ArrowError::InvalidArgumentError(format!(
274 "Unsupported crate::MetadataVersion {z:?}"
275 ))),
276 }
277 }
278
279 pub fn with_dictionary_handling(mut self, dictionary_handling: DictionaryHandling) -> Self {
281 self.dictionary_handling = dictionary_handling;
282 self
283 }
284}
285
impl Default for IpcWriteOptions {
    /// Returns options with 64-byte alignment, the non-legacy format, metadata version V5,
    /// no compression, and the default [`DictionaryHandling`].
    fn default() -> Self {
        Self {
            alignment: 64,
            write_legacy_ipc_format: false,
            metadata_version: crate::MetadataVersion::V5,
            batch_compression_type: None,
            batch_compression_level: None,
            dictionary_handling: DictionaryHandling::default(),
        }
    }
}
298
/// Encoder that turns schemas, dictionaries and record batches into IPC messages.
///
/// The type has no fields; all state (dictionary tracker, write options, compression
/// context) is passed into its methods by the caller.
#[derive(Debug, Default)]
pub struct IpcDataGenerator {}
334
335impl IpcDataGenerator {
336 pub fn schema_to_bytes_with_dictionary_tracker(
339 &self,
340 schema: &Schema,
341 dictionary_tracker: &mut DictionaryTracker,
342 write_options: &IpcWriteOptions,
343 ) -> EncodedData {
344 let mut fbb = FlatBufferBuilder::new();
345 let schema = {
346 let fb = IpcSchemaEncoder::new()
347 .with_dictionary_tracker(dictionary_tracker)
348 .schema_to_fb_offset(&mut fbb, schema);
349 fb.as_union_value()
350 };
351
352 let mut message = crate::MessageBuilder::new(&mut fbb);
353 message.add_version(write_options.metadata_version);
354 message.add_header_type(crate::MessageHeader::Schema);
355 message.add_bodyLength(0);
356 message.add_header(schema);
357 let data = message.finish();
359 fbb.finish(data, None);
360
361 let data = fbb.finished_data();
362 EncodedData {
363 ipc_message: data.to_vec(),
364 arrow_data: vec![],
365 }
366 }
367
368 fn _encode_dictionaries<I: Iterator<Item = i64>>(
369 &self,
370 column: &ArrayRef,
371 encoded_dictionaries: &mut Vec<EncodedData>,
372 dictionary_tracker: &mut DictionaryTracker,
373 write_options: &IpcWriteOptions,
374 dict_id: &mut I,
375 compression_context: &mut CompressionContext,
376 ) -> Result<(), ArrowError> {
377 match column.data_type() {
378 DataType::Struct(fields) => {
379 let s = as_struct_array(column);
380 for (field, column) in fields.iter().zip(s.columns()) {
381 self.encode_dictionaries(
382 field,
383 column,
384 encoded_dictionaries,
385 dictionary_tracker,
386 write_options,
387 dict_id,
388 compression_context,
389 )?;
390 }
391 }
392 DataType::RunEndEncoded(_, values) => {
393 let data = column.to_data();
394 if data.child_data().len() != 2 {
395 return Err(ArrowError::InvalidArgumentError(format!(
396 "The run encoded array should have exactly two child arrays. Found {}",
397 data.child_data().len()
398 )));
399 }
400 let values_array = make_array(data.child_data()[1].clone());
403 self.encode_dictionaries(
404 values,
405 &values_array,
406 encoded_dictionaries,
407 dictionary_tracker,
408 write_options,
409 dict_id,
410 compression_context,
411 )?;
412 }
413 DataType::List(field) => {
414 let list = as_list_array(column);
415 self.encode_dictionaries(
416 field,
417 list.values(),
418 encoded_dictionaries,
419 dictionary_tracker,
420 write_options,
421 dict_id,
422 compression_context,
423 )?;
424 }
425 DataType::LargeList(field) => {
426 let list = as_large_list_array(column);
427 self.encode_dictionaries(
428 field,
429 list.values(),
430 encoded_dictionaries,
431 dictionary_tracker,
432 write_options,
433 dict_id,
434 compression_context,
435 )?;
436 }
437 DataType::ListView(field) => {
438 let list = column.as_list_view::<i32>();
439 self.encode_dictionaries(
440 field,
441 list.values(),
442 encoded_dictionaries,
443 dictionary_tracker,
444 write_options,
445 dict_id,
446 compression_context,
447 )?;
448 }
449 DataType::LargeListView(field) => {
450 let list = column.as_list_view::<i64>();
451 self.encode_dictionaries(
452 field,
453 list.values(),
454 encoded_dictionaries,
455 dictionary_tracker,
456 write_options,
457 dict_id,
458 compression_context,
459 )?;
460 }
461 DataType::FixedSizeList(field, _) => {
462 let list = column
463 .as_any()
464 .downcast_ref::<FixedSizeListArray>()
465 .expect("Unable to downcast to fixed size list array");
466 self.encode_dictionaries(
467 field,
468 list.values(),
469 encoded_dictionaries,
470 dictionary_tracker,
471 write_options,
472 dict_id,
473 compression_context,
474 )?;
475 }
476 DataType::Map(field, _) => {
477 let map_array = as_map_array(column);
478
479 let (keys, values) = match field.data_type() {
480 DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
481 _ => panic!("Incorrect field data type {:?}", field.data_type()),
482 };
483
484 self.encode_dictionaries(
486 keys,
487 map_array.keys(),
488 encoded_dictionaries,
489 dictionary_tracker,
490 write_options,
491 dict_id,
492 compression_context,
493 )?;
494
495 self.encode_dictionaries(
497 values,
498 map_array.values(),
499 encoded_dictionaries,
500 dictionary_tracker,
501 write_options,
502 dict_id,
503 compression_context,
504 )?;
505 }
506 DataType::Union(fields, _) => {
507 let union = as_union_array(column);
508 for (type_id, field) in fields.iter() {
509 let column = union.child(type_id);
510 self.encode_dictionaries(
511 field,
512 column,
513 encoded_dictionaries,
514 dictionary_tracker,
515 write_options,
516 dict_id,
517 compression_context,
518 )?;
519 }
520 }
521 _ => (),
522 }
523
524 Ok(())
525 }
526
527 #[allow(clippy::too_many_arguments)]
528 fn encode_dictionaries<I: Iterator<Item = i64>>(
529 &self,
530 field: &Field,
531 column: &ArrayRef,
532 encoded_dictionaries: &mut Vec<EncodedData>,
533 dictionary_tracker: &mut DictionaryTracker,
534 write_options: &IpcWriteOptions,
535 dict_id_seq: &mut I,
536 compression_context: &mut CompressionContext,
537 ) -> Result<(), ArrowError> {
538 match column.data_type() {
539 DataType::Dictionary(_key_type, _value_type) => {
540 let dict_data = column.to_data();
541 let dict_values = &dict_data.child_data()[0];
542
543 let values = make_array(dict_data.child_data()[0].clone());
544
545 self._encode_dictionaries(
546 &values,
547 encoded_dictionaries,
548 dictionary_tracker,
549 write_options,
550 dict_id_seq,
551 compression_context,
552 )?;
553
554 let dict_id = dict_id_seq.next().ok_or_else(|| {
558 ArrowError::IpcError(format!(
559 "no dict id for field {:?}: field.data_type={:?}, column.data_type={:?}",
560 field.name(),
561 field.data_type(),
562 column.data_type()
563 ))
564 })?;
565
566 match dictionary_tracker.insert_column(
567 dict_id,
568 column,
569 write_options.dictionary_handling,
570 )? {
571 DictionaryUpdate::None => {}
572 DictionaryUpdate::New | DictionaryUpdate::Replaced => {
573 encoded_dictionaries.push(self.dictionary_batch_to_bytes(
574 dict_id,
575 dict_values,
576 write_options,
577 false,
578 compression_context,
579 )?);
580 }
581 DictionaryUpdate::Delta(data) => {
582 encoded_dictionaries.push(self.dictionary_batch_to_bytes(
583 dict_id,
584 &data,
585 write_options,
586 true,
587 compression_context,
588 )?);
589 }
590 }
591 }
592 _ => self._encode_dictionaries(
593 column,
594 encoded_dictionaries,
595 dictionary_tracker,
596 write_options,
597 dict_id_seq,
598 compression_context,
599 )?,
600 }
601
602 Ok(())
603 }
604
605 pub fn encode(
609 &self,
610 batch: &RecordBatch,
611 dictionary_tracker: &mut DictionaryTracker,
612 write_options: &IpcWriteOptions,
613 compression_context: &mut CompressionContext,
614 ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
615 let encoded_dictionaries = self.encode_all_dicts(
616 batch,
617 dictionary_tracker,
618 write_options,
619 compression_context,
620 )?;
621 let mut arrow_data = Vec::new();
622 let (ipc_message, _, tail_pad) = self.record_batch_to_bytes(
623 batch,
624 write_options,
625 compression_context,
626 &mut IpcBodySink::Write(&mut arrow_data),
627 )?;
628 arrow_data.extend_from_slice(&PADDING[..tail_pad]);
629 Ok((
630 encoded_dictionaries,
631 EncodedData {
632 ipc_message,
633 arrow_data,
634 },
635 ))
636 }
637
638 fn encode_all_dicts(
640 &self,
641 batch: &RecordBatch,
642 dictionary_tracker: &mut DictionaryTracker,
643 write_options: &IpcWriteOptions,
644 compression_context: &mut CompressionContext,
645 ) -> Result<Vec<EncodedData>, ArrowError> {
646 let schema = batch.schema();
647 let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());
648 let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();
649 for (i, field) in schema.fields().iter().enumerate() {
650 self.encode_dictionaries(
651 field,
652 batch.column(i),
653 &mut encoded_dictionaries,
654 dictionary_tracker,
655 write_options,
656 &mut dict_id,
657 compression_context,
658 )?;
659 }
660 Ok(encoded_dictionaries)
661 }
662
663 fn write<W: Write>(
667 &self,
668 batch: &RecordBatch,
669 dictionary_tracker: &mut DictionaryTracker,
670 write_options: &IpcWriteOptions,
671 compression_context: &mut CompressionContext,
672 writer: &mut W,
673 ) -> Result<IpcWriteMetadata, ArrowError> {
674 let encoded_dictionaries = self.encode_all_dicts(
675 batch,
676 dictionary_tracker,
677 write_options,
678 compression_context,
679 )?;
680
681 let mut dictionary_block_sizes = Vec::with_capacity(encoded_dictionaries.len());
682 for dict in encoded_dictionaries {
683 dictionary_block_sizes.push(write_message(&mut *writer, dict, write_options)?);
684 }
685
686 let capacity = batch
687 .columns()
688 .iter()
689 .map(|a| estimate_encoded_buffer_count(a.data_type()))
690 .sum();
691 let mut encoded_buffers: Vec<EncodedBuffer> = Vec::with_capacity(capacity);
692 let (ipc_message, body_len, tail_pad) = self.record_batch_to_bytes(
693 batch,
694 write_options,
695 compression_context,
696 &mut IpcBodySink::Collect(&mut encoded_buffers),
697 )?;
698
699 let alignment = write_options.alignment;
700 let a = usize::from(alignment - 1);
701 let prefix_size = if write_options.write_legacy_ipc_format {
702 4
703 } else {
704 8
705 };
706 let aligned_size = (ipc_message.len() + prefix_size + a) & !a;
707 write_continuation(
708 &mut *writer,
709 write_options,
710 (aligned_size - prefix_size) as i32,
711 )?;
712 writer.write_all(&ipc_message)?;
713 writer.write_all(&PADDING[..aligned_size - ipc_message.len() - prefix_size])?;
714 for enc in &encoded_buffers {
715 writer.write_all(enc.as_slice())?;
716 writer.write_all(&PADDING[..pad_to_alignment(alignment, enc.len())])?;
717 }
718 writer.write_all(&PADDING[..tail_pad])?;
719
720 Ok(IpcWriteMetadata {
721 dictionary_block_sizes,
722 padded_header_len: aligned_size,
723 body_len,
724 })
725 }
726
727 #[deprecated(since = "57.0.0", note = "Use `encode` instead")]
731 pub fn encoded_batch(
732 &self,
733 batch: &RecordBatch,
734 dictionary_tracker: &mut DictionaryTracker,
735 write_options: &IpcWriteOptions,
736 ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
737 self.encode(
738 batch,
739 dictionary_tracker,
740 write_options,
741 &mut Default::default(),
742 )
743 }
744
745 fn record_batch_to_bytes(
751 &self,
752 batch: &RecordBatch,
753 write_options: &IpcWriteOptions,
754 compression_context: &mut CompressionContext,
755 sink: &mut IpcBodySink<'_>,
756 ) -> Result<(Vec<u8>, usize, usize), ArrowError> {
757 let mut fbb = FlatBufferBuilder::new();
758
759 let batch_compression_type = write_options.batch_compression_type;
760
761 let compression = batch_compression_type.map(|batch_compression_type| {
762 let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
763 c.add_method(crate::BodyCompressionMethod::BUFFER);
764 c.add_codec(batch_compression_type);
765 c.finish()
766 });
767
768 let batch_compression_level = write_options.batch_compression_level;
769 let compression_codec: Option<CompressionCodec> = batch_compression_type
770 .map(|compression_type| match batch_compression_level {
771 Some(level) => {
772 CompressionCodec::try_new_with_compression_level(compression_type, level)
773 }
774 None => compression_type.try_into(),
775 })
776 .transpose()?;
777
778 let alignment = write_options.alignment;
779 let mut variadic_buffer_counts = vec![];
780 let mut meta = IpcMetadataBuilder::default();
781 let mut offset = 0i64;
782
783 for array in batch.columns() {
784 let array_data = array.to_data();
785 offset = write_array_data(
786 &array_data,
787 &mut meta,
788 sink,
789 offset,
790 compression_codec,
791 compression_context,
792 write_options,
793 )?;
794 append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
795 }
796
797 let tail_pad = pad_to_alignment(alignment, offset as usize);
798 let body_len = offset as usize + tail_pad;
799
800 let buffers = fbb.create_vector(&meta.buffers);
801 let nodes = fbb.create_vector(&meta.nodes);
802 let variadic_buffer = if variadic_buffer_counts.is_empty() {
803 None
804 } else {
805 Some(fbb.create_vector(&variadic_buffer_counts))
806 };
807
808 let root = {
809 let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
810 batch_builder.add_length(batch.num_rows() as i64);
811 batch_builder.add_nodes(nodes);
812 batch_builder.add_buffers(buffers);
813 if let Some(c) = compression {
814 batch_builder.add_compression(c);
815 }
816 if let Some(v) = variadic_buffer {
817 batch_builder.add_variadicBufferCounts(v);
818 }
819 batch_builder.finish().as_union_value()
820 };
821 let mut message = crate::MessageBuilder::new(&mut fbb);
823 message.add_version(write_options.metadata_version);
824 message.add_header_type(crate::MessageHeader::RecordBatch);
825 message.add_bodyLength(body_len as i64);
826 message.add_header(root);
827 let root = message.finish();
828 fbb.finish(root, None);
829
830 Ok((fbb.finished_data().to_vec(), body_len, tail_pad))
831 }
832
833 fn dictionary_batch_to_bytes(
836 &self,
837 dict_id: i64,
838 array_data: &ArrayData,
839 write_options: &IpcWriteOptions,
840 is_delta: bool,
841 compression_context: &mut CompressionContext,
842 ) -> Result<EncodedData, ArrowError> {
843 let mut fbb = FlatBufferBuilder::new();
844
845 let mut arrow_data: Vec<u8> = vec![];
846
847 let batch_compression_type = write_options.batch_compression_type;
849
850 let compression = batch_compression_type.map(|batch_compression_type| {
851 let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
852 c.add_method(crate::BodyCompressionMethod::BUFFER);
853 c.add_codec(batch_compression_type);
854 c.finish()
855 });
856
857 let batch_compression_level = write_options.batch_compression_level;
858 let compression_codec: Option<CompressionCodec> = batch_compression_type
859 .map(|batch_compression_type| match batch_compression_level {
860 Some(level) => {
861 CompressionCodec::try_new_with_compression_level(batch_compression_type, level)
862 }
863 None => batch_compression_type.try_into(),
864 })
865 .transpose()?;
866
867 let alignment = write_options.alignment;
868 let mut meta = IpcMetadataBuilder::default();
869 let mut sink = IpcBodySink::Write(&mut arrow_data);
870 let offset = write_array_data(
871 array_data,
872 &mut meta,
873 &mut sink,
874 0,
875 compression_codec,
876 compression_context,
877 write_options,
878 )?;
879
880 let mut variadic_buffer_counts = vec![];
881 append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);
882
883 let tail_pad = pad_to_alignment(alignment, offset as usize);
885 let body_len = offset as usize + tail_pad;
886 arrow_data.extend_from_slice(&PADDING[..tail_pad]);
887
888 let buffers = fbb.create_vector(&meta.buffers);
890 let nodes = fbb.create_vector(&meta.nodes);
891 let variadic_buffer = if variadic_buffer_counts.is_empty() {
892 None
893 } else {
894 Some(fbb.create_vector(&variadic_buffer_counts))
895 };
896
897 let root = {
898 let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
899 batch_builder.add_length(array_data.len() as i64);
900 batch_builder.add_nodes(nodes);
901 batch_builder.add_buffers(buffers);
902 if let Some(c) = compression {
903 batch_builder.add_compression(c);
904 }
905 if let Some(v) = variadic_buffer {
906 batch_builder.add_variadicBufferCounts(v);
907 }
908 batch_builder.finish()
909 };
910
911 let root = {
912 let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb);
913 batch_builder.add_id(dict_id);
914 batch_builder.add_data(root);
915 batch_builder.add_isDelta(is_delta);
916 batch_builder.finish().as_union_value()
917 };
918
919 let root = {
920 let mut message_builder = crate::MessageBuilder::new(&mut fbb);
921 message_builder.add_version(write_options.metadata_version);
922 message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
923 message_builder.add_bodyLength(body_len as i64);
924 message_builder.add_header(root);
925 message_builder.finish()
926 };
927
928 fbb.finish(root, None);
929 let finished_data = fbb.finished_data();
930
931 Ok(EncodedData {
932 ipc_message: finished_data.to_vec(),
933 arrow_data,
934 })
935 }
936}
937
938fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
939 match array.data_type() {
940 DataType::BinaryView | DataType::Utf8View => {
941 counts.push(array.buffers().len() as i64 - 1);
944 }
945 DataType::Dictionary(_, _) => {
946 }
949 _ => {
950 for child in array.child_data() {
951 append_variadic_buffer_counts(counts, child)
952 }
953 }
954 }
955}
956
957pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
958 match arr.data_type() {
959 DataType::RunEndEncoded(k, _) => match k.data_type() {
960 DataType::Int16 => {
961 Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
962 }
963 DataType::Int32 => {
964 Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
965 }
966 DataType::Int64 => {
967 Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
968 }
969 d => unreachable!("Unexpected data type {d}"),
970 },
971 d => Err(ArrowError::InvalidArgumentError(format!(
972 "The given array is not a run array. Data type of given array: {d}"
973 ))),
974 }
975}
976
/// Returns a run array equivalent to `run_array` whose run ends start at offset zero.
///
/// If the run ends already have offset 0 and their maximum value equals their length,
/// the input is returned unchanged. Otherwise new run-end and value children are built
/// that cover only the physical range the array refers to.
fn into_zero_offset_run_array<R: RunEndIndexType>(
    run_array: RunArray<R>,
) -> Result<RunArray<R>, ArrowError> {
    let run_ends = run_array.run_ends();
    // Fast path: nothing to rebuild.
    if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
        return Ok(run_array);
    }

    // First physical run that the array refers to.
    let start_physical_index = run_ends.get_start_physical_index();

    // Last physical run that the array refers to (inclusive, hence the `+ 1` below).
    let end_physical_index = run_ends.get_end_physical_index();

    let physical_length = end_physical_index - start_physical_index + 1;

    // Rebase every run end, except the last, by subtracting the logical offset.
    let offset = R::Native::usize_as(run_ends.offset());
    let mut builder = BufferBuilder::<R::Native>::new(physical_length);
    for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
        builder.append(run_end_value.sub_wrapping(offset));
    }
    // The final run end is set to the logical length of the array.
    builder.append(R::Native::from_usize(run_array.len()).unwrap());
    // SAFETY (NOTE(review): confirm): the buffer built above holds exactly
    // `physical_length` values of `R::Native`, matching the declared type and length.
    let new_run_ends = unsafe {
        ArrayDataBuilder::new(R::DATA_TYPE)
            .len(physical_length)
            .add_buffer(builder.finish())
            .build_unchecked()
    };

    // Keep only the values that belong to the retained physical runs.
    let new_values = run_array
        .values()
        .slice(start_physical_index, physical_length)
        .into_data();

    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
        .len(run_array.len())
        .add_child_data(new_run_ends)
        .add_child_data(new_values);
    // SAFETY (NOTE(review): confirm): both children have `physical_length` entries and the
    // last run end equals `run_array.len()`, which is the declared length.
    let array_data = unsafe {
        builder.build_unchecked()
    };
    Ok(array_data.into())
}
1028
/// Controls how a dictionary that grew between batches is sent again.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DictionaryHandling {
    /// Send the complete dictionary again (the default).
    #[default]
    Resend,
    /// When the new dictionary extends the previously written one, send only the
    /// appended values.
    Delta,
}
1042
/// Outcome of registering a dictionary with [`DictionaryTracker::insert_column`].
#[derive(Debug, Clone)]
pub enum DictionaryUpdate {
    /// The dictionary matches what was already written; nothing needs to be sent.
    None,
    /// No dictionary had been recorded for this id before.
    New,
    /// The previously recorded dictionary was replaced by a different one.
    Replaced,
    /// The dictionary grew; the payload holds only the newly appended values.
    Delta(ArrayData),
}
1056
/// Keeps track of the dictionaries that have already been written, keyed by dictionary id.
#[derive(Debug)]
pub struct DictionaryTracker {
    /// Last array data recorded for each dictionary id.
    written: HashMap<i64, ArrayData>,
    /// Dictionary ids handed out so far, in allocation order.
    dict_ids: Vec<i64>,
    /// When `true`, replacing an already written dictionary is reported as an error.
    error_on_replacement: bool,
}
1069
1070impl DictionaryTracker {
1071 pub fn new(error_on_replacement: bool) -> Self {
1077 #[allow(deprecated)]
1078 Self {
1079 written: HashMap::new(),
1080 dict_ids: Vec::new(),
1081 error_on_replacement,
1082 }
1083 }
1084
1085 pub fn next_dict_id(&mut self) -> i64 {
1087 let next = self
1088 .dict_ids
1089 .last()
1090 .copied()
1091 .map(|i| i + 1)
1092 .unwrap_or_default();
1093
1094 self.dict_ids.push(next);
1095 next
1096 }
1097
1098 pub fn dict_id(&mut self) -> &[i64] {
1101 &self.dict_ids
1102 }
1103
1104 #[deprecated(since = "56.1.0", note = "Use `insert_column` instead")]
1114 pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
1115 let dict_data = column.to_data();
1116 let dict_values = &dict_data.child_data()[0];
1117
1118 if let Some(last) = self.written.get(&dict_id) {
1120 if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
1121 return Ok(false);
1123 }
1124 if self.error_on_replacement {
1125 if last.child_data()[0] == *dict_values {
1127 return Ok(false);
1129 }
1130 return Err(ArrowError::InvalidArgumentError(
1131 "Dictionary replacement detected when writing IPC file format. \
1132 Arrow IPC files only support a single dictionary for a given field \
1133 across all batches."
1134 .to_string(),
1135 ));
1136 }
1137 }
1138
1139 self.written.insert(dict_id, dict_data);
1140 Ok(true)
1141 }
1142
1143 pub fn insert_column(
1159 &mut self,
1160 dict_id: i64,
1161 column: &ArrayRef,
1162 dict_handling: DictionaryHandling,
1163 ) -> Result<DictionaryUpdate, ArrowError> {
1164 let new_data = column.to_data();
1165 let new_values = &new_data.child_data()[0];
1166
1167 let Some(old) = self.written.get(&dict_id) else {
1169 self.written.insert(dict_id, new_data);
1170 return Ok(DictionaryUpdate::New);
1171 };
1172
1173 let old_values = &old.child_data()[0];
1176 if ArrayData::ptr_eq(old_values, new_values) {
1177 return Ok(DictionaryUpdate::None);
1178 }
1179
1180 let comparison = compare_dictionaries(old_values, new_values);
1182 if matches!(comparison, DictionaryComparison::Equal) {
1183 return Ok(DictionaryUpdate::None);
1184 }
1185
1186 const REPLACEMENT_ERROR: &str = "Dictionary replacement detected when writing IPC file format. \
1187 Arrow IPC files only support a single dictionary for a given field \
1188 across all batches.";
1189
1190 match comparison {
1191 DictionaryComparison::NotEqual => {
1192 if self.error_on_replacement {
1193 return Err(ArrowError::InvalidArgumentError(
1194 REPLACEMENT_ERROR.to_string(),
1195 ));
1196 }
1197
1198 self.written.insert(dict_id, new_data);
1199 Ok(DictionaryUpdate::Replaced)
1200 }
1201 DictionaryComparison::Delta => match dict_handling {
1202 DictionaryHandling::Resend => {
1203 if self.error_on_replacement {
1204 return Err(ArrowError::InvalidArgumentError(
1205 REPLACEMENT_ERROR.to_string(),
1206 ));
1207 }
1208
1209 self.written.insert(dict_id, new_data);
1210 Ok(DictionaryUpdate::Replaced)
1211 }
1212 DictionaryHandling::Delta => {
1213 let delta =
1214 new_values.slice(old_values.len(), new_values.len() - old_values.len());
1215 self.written.insert(dict_id, new_data);
1216 Ok(DictionaryUpdate::Delta(delta))
1217 }
1218 },
1219 DictionaryComparison::Equal => unreachable!("Already checked equal case"),
1220 }
1221 }
1222
1223 pub fn clear(&mut self) {
1229 self.dict_ids.clear();
1230 self.written.clear();
1231 }
1232}
1233
/// Relationship between a previously written dictionary and a new one.
#[derive(Debug, Clone)]
enum DictionaryComparison {
    /// The dictionaries differ and the new one does not extend the old one.
    NotEqual,
    /// Both dictionaries hold the same values.
    Equal,
    /// The new dictionary is longer and starts with all values of the old one.
    Delta,
}
1245
1246fn compare_dictionaries(old: &ArrayData, new: &ArrayData) -> DictionaryComparison {
1248 let existing_len = old.len();
1250 let new_len = new.len();
1251 if existing_len == new_len {
1252 if *old == *new {
1253 return DictionaryComparison::Equal;
1254 } else {
1255 return DictionaryComparison::NotEqual;
1256 }
1257 }
1258
1259 if new_len < existing_len {
1261 return DictionaryComparison::NotEqual;
1262 }
1263
1264 if new.slice(0, existing_len) == *old {
1266 return DictionaryComparison::Delta;
1267 }
1268
1269 DictionaryComparison::NotEqual
1270}
1271
/// Writer for the Arrow IPC file format.
pub struct FileWriter<W> {
    /// Destination that receives all bytes.
    writer: W,
    /// Options applied to every message written.
    write_options: IpcWriteOptions,
    /// Schema written at the start of the file and again in the footer.
    schema: SchemaRef,
    /// Number of bytes written so far; used as the offset of the next block.
    block_offsets: usize,
    /// Footer entries for the dictionary messages written so far.
    dictionary_blocks: Vec<crate::Block>,
    /// Footer entries for the record batch messages written so far.
    record_blocks: Vec<crate::Block>,
    /// Set once the footer has been written; further writes are rejected.
    finished: bool,
    /// Tracks the dictionaries already written; created with `error_on_replacement = true`.
    dictionary_tracker: DictionaryTracker,
    /// User-defined key/value metadata stored in the footer.
    custom_metadata: HashMap<String, String>,

    /// Encoder used to produce the IPC messages.
    data_gen: IpcDataGenerator,

    /// Compression state passed to the encoder for every batch.
    compression_context: CompressionContext,
}
1318
1319impl<W: Write> FileWriter<BufWriter<W>> {
1320 pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1324 Self::try_new(BufWriter::new(writer), schema)
1325 }
1326}
1327
1328impl<W: Write> FileWriter<W> {
1329 pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1337 let write_options = IpcWriteOptions::default();
1338 Self::try_new_with_options(writer, schema, write_options)
1339 }
1340
1341 pub fn try_new_with_options(
1349 mut writer: W,
1350 schema: &Schema,
1351 write_options: IpcWriteOptions,
1352 ) -> Result<Self, ArrowError> {
1353 let data_gen = IpcDataGenerator::default();
1354 let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
1356 let header_size = super::ARROW_MAGIC.len() + pad_len;
1357 writer.write_all(&super::ARROW_MAGIC)?;
1358 writer.write_all(&PADDING[..pad_len])?;
1359 let mut dictionary_tracker = DictionaryTracker::new(true);
1361 let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1362 schema,
1363 &mut dictionary_tracker,
1364 &write_options,
1365 );
1366 let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
1367 Ok(Self {
1368 writer,
1369 write_options,
1370 schema: Arc::new(schema.clone()),
1371 block_offsets: meta + data + header_size,
1372 dictionary_blocks: vec![],
1373 record_blocks: vec![],
1374 finished: false,
1375 dictionary_tracker,
1376 custom_metadata: HashMap::new(),
1377 data_gen,
1378 compression_context: CompressionContext::default(),
1379 })
1380 }
1381
1382 pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
1384 self.custom_metadata.insert(key.into(), value.into());
1385 }
1386
1387 pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1389 if self.finished {
1390 return Err(ArrowError::IpcError(
1391 "Cannot write record batch to file writer as it is closed".to_string(),
1392 ));
1393 }
1394
1395 let meta = self.data_gen.write(
1396 batch,
1397 &mut self.dictionary_tracker,
1398 &self.write_options,
1399 &mut self.compression_context,
1400 &mut self.writer,
1401 )?;
1402
1403 for (header_len, body_len) in meta.dictionary_block_sizes {
1404 let block = crate::Block::new(
1405 self.block_offsets as i64,
1406 header_len as i32,
1407 body_len as i64,
1408 );
1409 self.dictionary_blocks.push(block);
1410 self.block_offsets += header_len + body_len;
1411 }
1412
1413 let block = crate::Block::new(
1415 self.block_offsets as i64,
1416 meta.padded_header_len as i32,
1417 meta.body_len as i64,
1418 );
1419 self.record_blocks.push(block);
1420 self.block_offsets += meta.padded_header_len + meta.body_len;
1421 Ok(())
1422 }
1423
1424 pub fn finish(&mut self) -> Result<(), ArrowError> {
1426 if self.finished {
1427 return Err(ArrowError::IpcError(
1428 "Cannot write footer to file writer as it is closed".to_string(),
1429 ));
1430 }
1431
1432 write_continuation(&mut self.writer, &self.write_options, 0)?;
1434
1435 let mut fbb = FlatBufferBuilder::new();
1436 let dictionaries = fbb.create_vector(&self.dictionary_blocks);
1437 let record_batches = fbb.create_vector(&self.record_blocks);
1438
1439 self.dictionary_tracker.clear();
1441 let schema = IpcSchemaEncoder::new()
1442 .with_dictionary_tracker(&mut self.dictionary_tracker)
1443 .schema_to_fb_offset(&mut fbb, &self.schema);
1444 let fb_custom_metadata = (!self.custom_metadata.is_empty())
1445 .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));
1446
1447 let root = {
1448 let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
1449 footer_builder.add_version(self.write_options.metadata_version);
1450 footer_builder.add_schema(schema);
1451 footer_builder.add_dictionaries(dictionaries);
1452 footer_builder.add_recordBatches(record_batches);
1453 if let Some(fb_custom_metadata) = fb_custom_metadata {
1454 footer_builder.add_custom_metadata(fb_custom_metadata);
1455 }
1456 footer_builder.finish()
1457 };
1458 fbb.finish(root, None);
1459 let footer_data = fbb.finished_data();
1460 self.writer.write_all(footer_data)?;
1461 self.writer
1462 .write_all(&(footer_data.len() as i32).to_le_bytes())?;
1463 self.writer.write_all(&super::ARROW_MAGIC)?;
1464 self.writer.flush()?;
1465 self.finished = true;
1466
1467 Ok(())
1468 }
1469
/// Returns a reference to the schema this writer was created with.
pub fn schema(&self) -> &SchemaRef {
    &self.schema
}
1474
/// Returns a shared reference to the underlying writer.
pub fn get_ref(&self) -> &W {
    &self.writer
}
1479
/// Returns a mutable reference to the underlying writer.
///
/// NOTE(review): bytes written directly through this reference are not
/// accounted for in the block offsets tracked by `write` — presumably
/// callers should avoid writing through it; confirm against upstream docs.
pub fn get_mut(&mut self) -> &mut W {
    &mut self.writer
}
1486
1487 pub fn flush(&mut self) -> Result<(), ArrowError> {
1491 self.writer.flush()?;
1492 Ok(())
1493 }
1494
1495 pub fn into_inner(mut self) -> Result<W, ArrowError> {
1504 if !self.finished {
1505 self.finish()?;
1507 }
1508 Ok(self.writer)
1509 }
1510}
1511
/// [`RecordBatchWriter`] implementation that delegates to the inherent
/// `FileWriter` methods.
impl<W: Write> RecordBatchWriter for FileWriter<W> {
    /// Writes `batch` by forwarding to the inherent `FileWriter::write`
    /// (inherent methods take precedence over trait methods in method-call
    /// resolution, so this does not recurse).
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    /// Consumes the writer and finishes the file by calling `finish`.
    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}
1521
/// Writer that emits a schema message followed by record batch messages to `W`.
pub struct StreamWriter<W> {
    /// The destination every message is written to.
    writer: W,
    /// Options applied to every message written by this writer.
    write_options: IpcWriteOptions,
    /// Set once `finish` has run; later `write`/`finish` calls are rejected.
    finished: bool,
    /// Dictionary state shared between the schema encoding and each batch write.
    dictionary_tracker: DictionaryTracker,

    /// Generator that encodes the schema and the record batches.
    data_gen: IpcDataGenerator,

    /// Compression state handed to the data generator on every batch write.
    compression_context: CompressionContext,
}
1609
1610impl<W: Write> StreamWriter<BufWriter<W>> {
1611 pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1615 Self::try_new(BufWriter::new(writer), schema)
1616 }
1617}
1618
1619impl<W: Write> StreamWriter<W> {
1620 pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1628 let write_options = IpcWriteOptions::default();
1629 Self::try_new_with_options(writer, schema, write_options)
1630 }
1631
1632 pub fn try_new_with_options(
1638 mut writer: W,
1639 schema: &Schema,
1640 write_options: IpcWriteOptions,
1641 ) -> Result<Self, ArrowError> {
1642 let data_gen = IpcDataGenerator::default();
1643 let mut dictionary_tracker = DictionaryTracker::new(false);
1644
1645 let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1647 schema,
1648 &mut dictionary_tracker,
1649 &write_options,
1650 );
1651 write_message(&mut writer, encoded_message, &write_options)?;
1652 Ok(Self {
1653 writer,
1654 write_options,
1655 finished: false,
1656 dictionary_tracker,
1657 data_gen,
1658 compression_context: CompressionContext::default(),
1659 })
1660 }
1661
1662 pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1664 if self.finished {
1665 return Err(ArrowError::IpcError(
1666 "Cannot write record batch to stream writer as it is closed".to_string(),
1667 ));
1668 }
1669
1670 self.data_gen.write(
1671 batch,
1672 &mut self.dictionary_tracker,
1673 &self.write_options,
1674 &mut self.compression_context,
1675 &mut self.writer,
1676 )?;
1677 Ok(())
1678 }
1679
1680 pub fn finish(&mut self) -> Result<(), ArrowError> {
1682 if self.finished {
1683 return Err(ArrowError::IpcError(
1684 "Cannot write footer to stream writer as it is closed".to_string(),
1685 ));
1686 }
1687
1688 write_continuation(&mut self.writer, &self.write_options, 0)?;
1689 self.writer.flush()?;
1690
1691 self.finished = true;
1692
1693 Ok(())
1694 }
1695
1696 pub fn get_ref(&self) -> &W {
1698 &self.writer
1699 }
1700
1701 pub fn get_mut(&mut self) -> &mut W {
1705 &mut self.writer
1706 }
1707
1708 pub fn flush(&mut self) -> Result<(), ArrowError> {
1712 self.writer.flush()?;
1713 Ok(())
1714 }
1715
1716 pub fn into_inner(mut self) -> Result<W, ArrowError> {
1754 if !self.finished {
1755 self.finish()?;
1757 }
1758 Ok(self.writer)
1759 }
1760}
1761
/// [`RecordBatchWriter`] implementation that delegates to the inherent
/// `StreamWriter` methods.
impl<W: Write> RecordBatchWriter for StreamWriter<W> {
    /// Writes `batch` by forwarding to the inherent `StreamWriter::write`
    /// (inherent methods take precedence over trait methods in method-call
    /// resolution, so this does not recurse).
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    /// Consumes the writer and terminates the stream by calling `finish`.
    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}
1771
/// An encoded message split into its metadata and body parts, as consumed
/// by [`write_message`].
pub struct EncodedData {
    /// The message metadata bytes; `write_message` writes these right after
    /// the length prefix.
    pub ipc_message: Vec<u8>,
    /// The message body bytes; `write_message` writes these after the padded
    /// metadata and requires their length to be a multiple of the alignment.
    pub arrow_data: Vec<u8>,
}
1779pub fn write_message<W: Write>(
1781 mut writer: W,
1782 encoded: EncodedData,
1783 write_options: &IpcWriteOptions,
1784) -> Result<(usize, usize), ArrowError> {
1785 let arrow_data_len = encoded.arrow_data.len();
1786 if arrow_data_len % usize::from(write_options.alignment) != 0 {
1787 return Err(ArrowError::MemoryError(
1788 "Arrow data not aligned".to_string(),
1789 ));
1790 }
1791
1792 let a = usize::from(write_options.alignment - 1);
1793 let buffer = encoded.ipc_message;
1794 let flatbuf_size = buffer.len();
1795 let prefix_size = if write_options.write_legacy_ipc_format {
1796 4
1797 } else {
1798 8
1799 };
1800 let aligned_size = (flatbuf_size + prefix_size + a) & !a;
1801 let padding_bytes = aligned_size - flatbuf_size - prefix_size;
1802
1803 write_continuation(
1804 &mut writer,
1805 write_options,
1806 (aligned_size - prefix_size) as i32,
1807 )?;
1808
1809 if flatbuf_size > 0 {
1811 writer.write_all(&buffer)?;
1812 }
1813 writer.write_all(&PADDING[..padding_bytes])?;
1815
1816 let body_len = if arrow_data_len > 0 {
1818 write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
1819 } else {
1820 0
1821 };
1822
1823 Ok((aligned_size, body_len))
1824}
1825
1826fn write_body_buffers<W: Write>(
1827 mut writer: W,
1828 data: &[u8],
1829 alignment: u8,
1830) -> Result<usize, ArrowError> {
1831 let len = data.len();
1832 let pad_len = pad_to_alignment(alignment, len);
1833 let total_len = len + pad_len;
1834
1835 writer.write_all(data)?;
1837 if pad_len > 0 {
1838 writer.write_all(&PADDING[..pad_len])?;
1839 }
1840
1841 Ok(total_len)
1842}
1843
1844fn write_continuation<W: Write>(
1847 mut writer: W,
1848 write_options: &IpcWriteOptions,
1849 total_len: i32,
1850) -> Result<usize, ArrowError> {
1851 let mut written = 8;
1852
1853 match write_options.metadata_version {
1855 crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
1856 unreachable!("Options with the metadata version cannot be created")
1857 }
1858 crate::MetadataVersion::V4 => {
1859 if !write_options.write_legacy_ipc_format {
1860 writer.write_all(&CONTINUATION_MARKER)?;
1862 written = 4;
1863 }
1864 writer.write_all(&total_len.to_le_bytes()[..])?;
1865 }
1866 crate::MetadataVersion::V5 => {
1867 writer.write_all(&CONTINUATION_MARKER)?;
1869 writer.write_all(&total_len.to_le_bytes()[..])?;
1870 }
1871 z => panic!("Unsupported crate::MetadataVersion {z:?}"),
1872 };
1873
1874 Ok(written)
1875}
1876
1877fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
1881 if write_options.metadata_version < crate::MetadataVersion::V5 {
1882 !matches!(data_type, DataType::Null)
1883 } else {
1884 !matches!(
1885 data_type,
1886 DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
1887 )
1888 }
1889}
1890
1891#[inline]
1893fn buffer_need_truncate(
1894 array_offset: usize,
1895 buffer: &Buffer,
1896 spec: &BufferSpec,
1897 min_length: usize,
1898) -> bool {
1899 spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
1900}
1901
1902#[inline]
1904fn get_buffer_element_width(spec: &BufferSpec) -> usize {
1905 match spec {
1906 BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
1907 _ => 0,
1908 }
1909}
1910
1911fn reencode_offsets<O: OffsetSizeTrait>(
1914 offsets: &Buffer,
1915 data: &ArrayData,
1916) -> (Buffer, usize, usize) {
1917 let offsets_slice: &[O] = offsets.typed_data::<O>();
1918 let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];
1919
1920 let start_offset = offset_slice.first().unwrap();
1921 let end_offset = offset_slice.last().unwrap();
1922
1923 let offsets = match start_offset.as_usize() {
1924 0 => {
1925 let size = size_of::<O>();
1926 offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
1927 }
1928 _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
1929 };
1930
1931 let start_offset = start_offset.as_usize();
1932 let end_offset = end_offset.as_usize();
1933
1934 (offsets, start_offset, end_offset - start_offset)
1935}
1936
1937fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
1943 if data.is_empty() {
1944 let mut offsets = MutableBuffer::new(size_of::<O>());
1947 offsets.extend_from_slice(O::usize_as(0).to_byte_slice());
1948 return (offsets.into(), MutableBuffer::new(0).into());
1949 }
1950
1951 let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1952 let values = data.buffers()[1].slice_with_length(original_start_offset, len);
1953 (offsets, values)
1954}
1955
1956fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
1959 if data.is_empty() {
1960 let mut offsets = MutableBuffer::new(size_of::<O>());
1963 offsets.extend_from_slice(O::usize_as(0).to_byte_slice());
1964 return (offsets.into(), data.child_data()[0].slice(0, 0));
1965 }
1966
1967 let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1968 let child_data = data.child_data()[0].slice(original_start_offset, len);
1969 (offsets, child_data)
1970}
1971
1972fn get_list_view_array_buffers<O: OffsetSizeTrait>(
1978 data: &ArrayData,
1979) -> (Buffer, Buffer, ArrayData) {
1980 if data.is_empty() {
1981 return (
1982 MutableBuffer::new(0).into(),
1983 MutableBuffer::new(0).into(),
1984 data.child_data()[0].slice(0, 0),
1985 );
1986 }
1987
1988 let offsets = &data.buffers()[0];
1989 let sizes = &data.buffers()[1];
1990
1991 let element_size = std::mem::size_of::<O>();
1992 let offsets_slice =
1993 offsets.slice_with_length(data.offset() * element_size, data.len() * element_size);
1994 let sizes_slice =
1995 sizes.slice_with_length(data.offset() * element_size, data.len() * element_size);
1996
1997 let child_data = data.child_data()[0].clone();
1998
1999 (offsets_slice, sizes_slice, child_data)
2000}
2001
2002fn get_or_truncate_buffer(array_data: &ArrayData) -> Buffer {
2009 let buffer = &array_data.buffers()[0];
2010 let layout = layout(array_data.data_type());
2011 let spec = &layout.buffers[0];
2012
2013 let byte_width = get_buffer_element_width(spec);
2014 let min_length = array_data.len() * byte_width;
2015 if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
2016 let byte_offset = array_data.offset() * byte_width;
2017 let buffer_length = min(min_length, buffer.len() - byte_offset);
2018 buffer.slice_with_length(byte_offset, buffer_length)
2019 } else {
2020 buffer.clone()
2021 }
2022}
2023
/// Encodes one array (and, recursively, its children) into the message.
///
/// A field node is pushed onto `meta` for the array, then each of its
/// buffers is passed through [`encode_sink_buffer`], which records the
/// buffer's position in `meta` and hands its bytes to `sink`.
///
/// `offset` is the body offset at which the first buffer of this array
/// starts; the returned value is the offset just past the last buffer
/// written, padding included.
///
/// # Errors
///
/// Returns any error raised while encoding a buffer or un-slicing a
/// run-end encoded array.
fn write_array_data(
    array_data: &ArrayData,
    meta: &mut IpcMetadataBuilder,
    sink: &mut IpcBodySink<'_>,
    offset: i64,
    compression_codec: Option<CompressionCodec>,
    compression_context: &mut CompressionContext,
    write_options: &IpcWriteOptions,
) -> Result<i64, ArrowError> {
    let mut offset = offset;
    let num_rows = array_data.len();
    // Field node: (length, null count). For `Null` arrays every row counts as null.
    if !matches!(array_data.data_type(), DataType::Null) {
        meta.nodes.push(crate::FieldNode::new(
            num_rows as i64,
            array_data.null_count() as i64,
        ));
    } else {
        meta.nodes
            .push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
    }
    if has_validity_bitmap(array_data.data_type(), write_options) {
        // Validity buffer: use the array's own when present, otherwise
        // synthesize one with every bit set.
        let null_buffer = match array_data.nulls() {
            None => {
                let num_bytes = bit_util::ceil(num_rows, 8);
                let buffer = MutableBuffer::new(num_bytes);
                let buffer = buffer.with_bitset(num_bytes, true);
                buffer.into()
            }
            // NOTE(review): `sliced()` presumably yields a bitmap that starts
            // at the array's first row — confirm against arrow_buffer docs.
            Some(buffer) => buffer.inner().sliced(),
        };

        offset = encode_sink_buffer(
            null_buffer,
            meta,
            sink,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;
    }

    let data_type = array_data.data_type();
    if matches!(data_type, DataType::Binary | DataType::Utf8) {
        // 32-bit offsets followed by the referenced value bytes.
        let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
        for buffer in [offsets, values] {
            offset = encode_sink_buffer(
                buffer,
                meta,
                sink,
                offset,
                compression_codec,
                compression_context,
                write_options.alignment,
            )?;
        }
    } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
        // The views buffer is truncated to the logical window; the remaining
        // buffers are written as they are.
        let views = get_or_truncate_buffer(array_data);
        offset = encode_sink_buffer(
            views,
            meta,
            sink,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;

        for buffer in array_data.buffers().iter().skip(1) {
            offset = encode_sink_buffer(
                buffer.clone(),
                meta,
                sink,
                offset,
                compression_codec,
                compression_context,
                write_options.alignment,
            )?;
        }
    } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
        // 64-bit offsets followed by the referenced value bytes.
        let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
        for buffer in [offsets, values] {
            offset = encode_sink_buffer(
                buffer,
                meta,
                sink,
                offset,
                compression_codec,
                compression_context,
                write_options.alignment,
            )?;
        }
    } else if DataType::is_numeric(data_type)
        || DataType::is_temporal(data_type)
        || matches!(
            array_data.data_type(),
            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
        )
    {
        // Single values buffer, truncated to the logical window when needed.
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = get_or_truncate_buffer(array_data);
        offset = encode_sink_buffer(
            buffer,
            meta,
            sink,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;
    } else if matches!(data_type, DataType::Boolean) {
        // Bit-packed values: slice at bit granularity rather than byte granularity.
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = &array_data.buffers()[0];
        let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
        offset = encode_sink_buffer(
            buffer,
            meta,
            sink,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;
    } else if matches!(
        data_type,
        DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
    ) {
        assert_eq!(array_data.buffers().len(), 1);
        assert_eq!(array_data.child_data().len(), 1);

        // Offsets are rebased and the child is sliced to match; maps use
        // 32-bit offsets like `List`.
        let (offsets, sliced_child_data) = match data_type {
            DataType::List(_) => get_list_array_buffers::<i32>(array_data),
            DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
            DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
            _ => unreachable!(),
        };
        offset = encode_sink_buffer(
            offsets,
            meta,
            sink,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;
        offset = write_array_data(
            &sliced_child_data,
            meta,
            sink,
            offset,
            compression_codec,
            compression_context,
            write_options,
        )?;
        // The child has been written; skip the generic child loop below.
        return Ok(offset);
    } else if matches!(
        data_type,
        DataType::ListView(_) | DataType::LargeListView(_)
    ) {
        assert_eq!(array_data.buffers().len(), 2);
        assert_eq!(array_data.child_data().len(), 1);

        let (offsets, sizes, child_data) = match data_type {
            DataType::ListView(_) => get_list_view_array_buffers::<i32>(array_data),
            DataType::LargeListView(_) => get_list_view_array_buffers::<i64>(array_data),
            _ => unreachable!(),
        };

        // Offsets buffer first, then the sizes buffer, then the child.
        offset = encode_sink_buffer(
            offsets,
            meta,
            sink,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;
        offset = encode_sink_buffer(
            sizes,
            meta,
            sink,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;

        offset = write_array_data(
            &child_data,
            meta,
            sink,
            offset,
            compression_codec,
            compression_context,
            write_options,
        )?;
        // The child has been written; skip the generic child loop below.
        return Ok(offset);
    } else if let DataType::FixedSizeList(_, fixed_size) = data_type {
        assert_eq!(array_data.child_data().len(), 1);
        let fixed_size = *fixed_size as usize;

        // Each list row owns `fixed_size` child rows, so the child window
        // follows from the parent's offset and length.
        let child_offset = array_data.offset() * fixed_size;
        let child_length = array_data.len() * fixed_size;
        let child_data = array_data.child_data()[0].slice(child_offset, child_length);

        offset = write_array_data(
            &child_data,
            meta,
            sink,
            offset,
            compression_codec,
            compression_context,
            write_options,
        )?;
        // The child has been written; skip the generic child loop below.
        return Ok(offset);
    } else {
        // Every remaining type: write whatever buffers the array has, unchanged.
        for buffer in array_data.buffers() {
            offset = encode_sink_buffer(
                buffer.clone(),
                meta,
                sink,
                offset,
                compression_codec,
                compression_context,
                write_options.alignment,
            )?;
        }
    }

    // Children of the types that did not return early above.
    match array_data.data_type() {
        // Dictionary children are not written as part of the array itself.
        DataType::Dictionary(_, _) => {}
        DataType::RunEndEncoded(_, _) => {
            // Run-end encoded arrays are un-sliced first; the children of
            // the resulting array are then written.
            let arr = unslice_run_array(array_data.clone())?;
            for data_ref in arr.child_data() {
                offset = write_array_data(
                    data_ref,
                    meta,
                    sink,
                    offset,
                    compression_codec,
                    compression_context,
                    write_options,
                )?;
            }
        }
        _ => {
            for data_ref in array_data.child_data() {
                offset = write_array_data(
                    data_ref,
                    meta,
                    sink,
                    offset,
                    compression_codec,
                    compression_context,
                    write_options,
                )?;
            }
        }
    }
    Ok(offset)
}
2310
2311fn encode_sink_buffer(
2325 buffer: Buffer,
2326 ipc_meta_data: &mut IpcMetadataBuilder,
2327 sink: &mut IpcBodySink<'_>,
2328 offset: i64,
2329 compression_codec: Option<CompressionCodec>,
2330 compression_context: &mut CompressionContext,
2331 alignment: u8,
2332) -> Result<i64, ArrowError> {
2333 let (encoded, len) = match compression_codec {
2334 None => {
2335 let len = buffer.len() as i64;
2336 (EncodedBuffer::Raw(buffer), len)
2337 }
2338 Some(codec) => {
2339 let mut scratch = Vec::new();
2340 let written =
2341 codec.compress_to_vec(buffer.as_slice(), &mut scratch, compression_context)?;
2342 let len = i64::try_from(written)
2343 .map_err(|e| ArrowError::InvalidArgumentError(format!("{e}")))?;
2344 (EncodedBuffer::Compressed(scratch), len)
2345 }
2346 };
2347
2348 let pad_len = pad_to_alignment(alignment, len as usize);
2349 sink.write(pad_len, encoded);
2350 ipc_meta_data.buffers.push(crate::Buffer::new(offset, len));
2351 Ok(offset + len + pad_len as i64)
2352}
2353
/// Source of zero bytes for padding. Callers write a prefix slice of this
/// array, so a single padding run can be at most 64 bytes long.
const PADDING: [u8; 64] = [0; 64];
2355
2356#[inline]
2362fn estimate_encoded_buffer_count(dt: &DataType) -> usize {
2363 match dt {
2364 DataType::Null => 0,
2365
2366 DataType::Binary | DataType::Utf8 | DataType::LargeBinary | DataType::LargeUtf8 => 3,
2367
2368 DataType::BinaryView | DataType::Utf8View => 3,
2369
2370 DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => {
2371 2 + estimate_encoded_buffer_count(f.data_type())
2372 }
2373
2374 DataType::ListView(f) | DataType::LargeListView(f) => {
2375 3 + estimate_encoded_buffer_count(f.data_type())
2376 }
2377
2378 DataType::FixedSizeList(f, _) => 1 + estimate_encoded_buffer_count(f.data_type()),
2379
2380 DataType::Struct(fields) => {
2381 1 + fields
2382 .iter()
2383 .map(|f| estimate_encoded_buffer_count(f.data_type()))
2384 .sum::<usize>()
2385 }
2386
2387 DataType::Dictionary(_, _) => 2,
2389
2390 DataType::Union(fields, UnionMode::Sparse) => {
2391 1 + fields
2392 .iter()
2393 .map(|(_, f)| estimate_encoded_buffer_count(f.data_type()))
2394 .sum::<usize>()
2395 }
2396 DataType::Union(fields, UnionMode::Dense) => {
2397 2 + fields
2398 .iter()
2399 .map(|(_, f)| estimate_encoded_buffer_count(f.data_type()))
2400 .sum::<usize>()
2401 }
2402
2403 DataType::RunEndEncoded(run_ends, values) => {
2404 estimate_encoded_buffer_count(run_ends.data_type())
2405 + estimate_encoded_buffer_count(values.data_type())
2406 }
2407 _ => 2,
2409 }
2410}
2411
/// Returns how many bytes must follow `len` bytes to reach the next
/// multiple of `alignment`.
///
/// The computation uses a bit mask, so it is only meaningful when
/// `alignment` is a non-zero power of two.
#[inline]
fn pad_to_alignment(alignment: u8, len: usize) -> usize {
    let mask = usize::from(alignment - 1);
    let rounded_up = (len + mask) & !mask;
    rounded_up - len
}
2418
2419#[cfg(test)]
2420mod tests {
2421 use std::hash::Hasher;
2422 use std::io::Cursor;
2423 use std::io::Seek;
2424
2425 use arrow_array::builder::FixedSizeListBuilder;
2426 use arrow_array::builder::Float32Builder;
2427 use arrow_array::builder::Int64Builder;
2428 use arrow_array::builder::MapBuilder;
2429 use arrow_array::builder::StringViewBuilder;
2430 use arrow_array::builder::UnionBuilder;
2431 use arrow_array::builder::{
2432 GenericListBuilder, GenericListViewBuilder, ListBuilder, StringBuilder,
2433 };
2434 use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
2435 use arrow_array::types::*;
2436 use arrow_buffer::ScalarBuffer;
2437
2438 use crate::MetadataVersion;
2439 use crate::convert::fb_to_schema;
2440 use crate::reader::*;
2441 use crate::root_as_footer;
2442
2443 use super::*;
2444
2445 fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
2446 let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
2447 writer.write(rb).unwrap();
2448 writer.finish().unwrap();
2449 writer.into_inner().unwrap()
2450 }
2451
2452 fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
2453 let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
2454 reader.next().unwrap().unwrap()
2455 }
2456
2457 fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
2458 const IPC_ALIGNMENT: usize = 8;
2462
2463 let mut stream_writer = StreamWriter::try_new_with_options(
2464 vec![],
2465 record.schema_ref(),
2466 IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
2467 )
2468 .unwrap();
2469 stream_writer.write(record).unwrap();
2470 stream_writer.finish().unwrap();
2471 stream_writer.into_inner().unwrap()
2472 }
2473
2474 fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
2475 let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
2476 stream_reader.next().unwrap().unwrap()
2477 }
2478
/// Round-trips an empty Int32 batch through an LZ4-compressed file and
/// compares type, length and null count of every column read back.
#[test]
#[cfg(feature = "lz4")]
fn test_write_empty_record_batch_lz4_compression() {
    let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
    let no_values: Vec<Option<i32>> = vec![];
    let column = Int32Array::from(no_values);
    let expected =
        RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(column)]).unwrap();

    let mut file = tempfile::tempfile().unwrap();

    // Scope the writer so its borrow of `file` ends before rewinding.
    {
        let options = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
            .unwrap()
            .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
            .unwrap();

        let mut file_writer =
            FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
        file_writer.write(&expected).unwrap();
        file_writer.finish().unwrap();
    }
    file.rewind().unwrap();

    for read_batch in FileReader::try_new(file, None).unwrap() {
        let actual = read_batch.unwrap();
        for (a, b) in actual.columns().iter().zip(expected.columns()) {
            assert_eq!(a.data_type(), b.data_type());
            assert_eq!(a.len(), b.len());
            assert_eq!(a.null_count(), b.null_count());
        }
    }
}
2519
/// Round-trips a two-row Int32 batch through an LZ4-compressed file and
/// compares type, length and null count of every column read back.
#[test]
#[cfg(feature = "lz4")]
fn test_write_file_with_lz4_compression() {
    let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
    let column = Int32Array::from(vec![Some(12), Some(1)]);
    let expected =
        RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(column)]).unwrap();

    let mut file = tempfile::tempfile().unwrap();
    // Scope the writer so its borrow of `file` ends before rewinding.
    {
        let options = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
            .unwrap()
            .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
            .unwrap();

        let mut file_writer =
            FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
        file_writer.write(&expected).unwrap();
        file_writer.finish().unwrap();
    }
    file.rewind().unwrap();

    for read_batch in FileReader::try_new(file, None).unwrap() {
        let actual = read_batch.unwrap();
        for (a, b) in actual.columns().iter().zip(expected.columns()) {
            assert_eq!(a.data_type(), b.data_type());
            assert_eq!(a.len(), b.len());
            assert_eq!(a.null_count(), b.null_count());
        }
    }
}
2559
/// Round-trips a two-row Int32 batch through a ZSTD-compressed file
/// (compression level 1) and compares type, length and null count of every
/// column read back.
#[test]
#[cfg(feature = "zstd")]
fn test_write_file_with_zstd_compression() {
    let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
    let column = Int32Array::from(vec![Some(12), Some(1)]);
    let expected =
        RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(column)]).unwrap();

    let mut file = tempfile::tempfile().unwrap();
    // Scope the writer so its borrow of `file` ends before rewinding.
    {
        let options = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
            .unwrap()
            .try_with_compression(Some(crate::CompressionType::ZSTD))
            .unwrap()
            .try_with_compression_level(Some(1))
            .unwrap();

        let mut file_writer =
            FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
        file_writer.write(&expected).unwrap();
        file_writer.finish().unwrap();
    }
    file.rewind().unwrap();

    for read_batch in FileReader::try_new(file, None).unwrap() {
        let actual = read_batch.unwrap();
        for (a, b) in actual.columns().iter().zip(expected.columns()) {
            assert_eq!(a.data_type(), b.data_type());
            assert_eq!(a.len(), b.len());
            assert_eq!(a.null_count(), b.null_count());
        }
    }
}
2600
/// Round-trips a nullable UInt32 batch through an uncompressed file and
/// compares type, length and null count of every column read back.
///
/// Read errors are unwrapped and the number of batches read is asserted, so
/// the test cannot pass without having compared anything.
#[test]
fn test_write_file() {
    let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
    let values: Vec<Option<u32>> = vec![
        Some(999),
        None,
        Some(235),
        Some(123),
        None,
        None,
        None,
        None,
        None,
    ];
    let array1 = UInt32Array::from(values);
    let batch =
        RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
            .unwrap();
    let mut file = tempfile::tempfile().unwrap();
    // Scope the writer so its borrow of `file` ends before rewinding.
    {
        let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();

        writer.write(&batch).unwrap();
        writer.finish().unwrap();
    }
    file.rewind().unwrap();

    let reader = FileReader::try_new(file, None).unwrap();
    let mut batches_read = 0;
    for read_batch in reader {
        // Unwrap so that a decode error fails the test instead of silently
        // ending the loop.
        let read_batch = read_batch.unwrap();
        batches_read += 1;
        for (a, b) in read_batch.columns().iter().zip(batch.columns()) {
            assert_eq!(a.data_type(), b.data_type());
            assert_eq!(a.len(), b.len());
            assert_eq!(a.null_count(), b.null_count());
        }
    }
    // Exactly one batch was written above.
    assert_eq!(batches_read, 1);
}
2643
/// An empty `Utf8` array must still produce one zero `i32` offset and an
/// empty values buffer.
#[test]
fn test_empty_utf8_ipc_writes_nonempty_offsets_buffer() {
    let empty = StringArray::from(Vec::<String>::new());
    assert_eq!(empty.len(), 0);

    let data = empty.to_data();
    let (offsets, values) = get_byte_array_buffers::<i32>(&data);

    assert_eq!(
        offsets.len(),
        std::mem::size_of::<i32>(),
        "offsets buffer should contain one zero i32 offset"
    );
    assert_eq!(values.len(), 0, "values buffer should remain empty");
}
2657
/// An empty `LargeUtf8` array must still produce one zero `i64` offset and
/// an empty values buffer.
#[test]
fn test_empty_large_utf8_ipc_writes_nonempty_offsets_buffer() {
    let empty = LargeStringArray::from(Vec::<String>::new());
    assert_eq!(empty.len(), 0);

    let data = empty.to_data();
    let (offsets, values) = get_byte_array_buffers::<i64>(&data);

    assert_eq!(
        offsets.len(),
        std::mem::size_of::<i64>(),
        "offsets buffer should contain one zero i64 offset"
    );
    assert_eq!(values.len(), 0, "values buffer should remain empty");
}
2671
/// An empty `List` array must still produce one zero `i32` offset and an
/// empty child.
#[test]
fn test_empty_list_ipc_writes_nonempty_offsets_buffer() {
    let empty = GenericListBuilder::<i32, _>::new(UInt32Builder::new()).finish();
    assert_eq!(empty.len(), 0);

    let data = empty.to_data();
    let (offsets, child_data) = get_list_array_buffers::<i32>(&data);

    assert_eq!(
        offsets.len(),
        std::mem::size_of::<i32>(),
        "offsets buffer should contain one zero i32 offset"
    );
    assert_eq!(child_data.len(), 0, "child data should remain empty");
}
2685
/// An empty `LargeList` array must still produce one zero `i64` offset and
/// an empty child.
#[test]
fn test_empty_large_list_ipc_writes_nonempty_offsets_buffer() {
    let empty = GenericListBuilder::<i64, _>::new(UInt32Builder::new()).finish();
    assert_eq!(empty.len(), 0);

    let data = empty.to_data();
    let (offsets, child_data) = get_list_array_buffers::<i64>(&data);

    assert_eq!(
        offsets.len(),
        std::mem::size_of::<i64>(),
        "offsets buffer should contain one zero i64 offset"
    );
    assert_eq!(child_data.len(), 0, "child data should remain empty");
}
2699
2700 fn write_null_file(options: IpcWriteOptions) {
2701 let schema = Schema::new(vec![
2702 Field::new("nulls", DataType::Null, true),
2703 Field::new("int32s", DataType::Int32, false),
2704 Field::new("nulls2", DataType::Null, true),
2705 Field::new("f64s", DataType::Float64, false),
2706 ]);
2707 let array1 = NullArray::new(32);
2708 let array2 = Int32Array::from(vec![1; 32]);
2709 let array3 = NullArray::new(32);
2710 let array4 = Float64Array::from(vec![f64::NAN; 32]);
2711 let batch = RecordBatch::try_new(
2712 Arc::new(schema.clone()),
2713 vec![
2714 Arc::new(array1) as ArrayRef,
2715 Arc::new(array2) as ArrayRef,
2716 Arc::new(array3) as ArrayRef,
2717 Arc::new(array4) as ArrayRef,
2718 ],
2719 )
2720 .unwrap();
2721 let mut file = tempfile::tempfile().unwrap();
2722 {
2723 let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2724
2725 writer.write(&batch).unwrap();
2726 writer.finish().unwrap();
2727 }
2728
2729 file.rewind().unwrap();
2730
2731 {
2732 let reader = FileReader::try_new(file, None).unwrap();
2733 reader.for_each(|maybe_batch| {
2734 maybe_batch
2735 .unwrap()
2736 .columns()
2737 .iter()
2738 .zip(batch.columns())
2739 .for_each(|(a, b)| {
2740 assert_eq!(a.data_type(), b.data_type());
2741 assert_eq!(a.len(), b.len());
2742 assert_eq!(a.null_count(), b.null_count());
2743 });
2744 });
2745 }
2746 }
/// Exercises `write_null_file` for metadata version `V4` with both
/// alignments and both the current and the legacy prefix format.
#[test]
fn test_write_null_file_v4() {
    for alignment in [8, 64] {
        for legacy_format in [false, true] {
            let options =
                IpcWriteOptions::try_new(alignment, legacy_format, MetadataVersion::V4).unwrap();
            write_null_file(options);
        }
    }
}
2754
/// Exercises `write_null_file` for metadata version `V5` with both alignments.
#[test]
fn test_write_null_file_v5() {
    for alignment in [8, 64] {
        let options = IpcWriteOptions::try_new(alignment, false, MetadataVersion::V5).unwrap();
        write_null_file(options);
    }
}
2760
/// Encodes a dense union whose only child is a dictionary array and checks that the
/// dictionary tracker recorded a dictionary under id 0.
#[test]
fn track_union_nested_dict() {
    let dict_values: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
    let dict_array: ArrayRef = Arc::new(dict_values);

    #[allow(deprecated)]
    let dict_field = Field::new_dict("dict", dict_array.data_type().clone(), false, 0, false);
    let union_fields = [(0, Arc::new(dict_field))].into_iter().collect();

    let type_ids: ScalarBuffer<i8> = [0, 0, 0].into_iter().collect();
    let value_offsets: ScalarBuffer<i32> = [0, 1, 2].into_iter().collect();

    let union_array =
        UnionArray::try_new(union_fields, type_ids, Some(value_offsets), vec![dict_array])
            .unwrap();

    let union_field = Field::new("union", union_array.data_type().clone(), false);
    let schema = Arc::new(Schema::new(vec![union_field]));

    let generator = IpcDataGenerator::default();
    let mut dict_tracker = DictionaryTracker::new(false);
    // Encode the schema first with the same tracker that is used for the batch below.
    generator.schema_to_bytes_with_dictionary_tracker(
        &schema,
        &mut dict_tracker,
        &IpcWriteOptions::default(),
    );

    let batch = RecordBatch::try_new(schema, vec![Arc::new(union_array)]).unwrap();
    generator
        .encode(
            &batch,
            &mut dict_tracker,
            &Default::default(),
            &mut Default::default(),
        )
        .unwrap();

    assert!(dict_tracker.written.contains_key(&0));
}
2806
/// Encodes a struct whose only child is a dictionary array and checks that the
/// dictionary tracker recorded a dictionary under id 0.
#[test]
fn track_struct_nested_dict() {
    let dict_values: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
    let dict_array: ArrayRef = Arc::new(dict_values);

    // NOTE(review): the field declares dictionary id 2 while the assertion below looks
    // for key 0, so the tracker presumably assigns its own ids — confirm.
    #[allow(deprecated)]
    let dict_field = Arc::new(Field::new_dict(
        "dict",
        dict_array.data_type().clone(),
        false,
        2,
        false,
    ));

    let struct_array: ArrayRef = Arc::new(StructArray::from(vec![(dict_field, dict_array)]));

    let struct_field = Field::new("struct", struct_array.data_type().clone(), false);
    let schema = Arc::new(Schema::new(vec![struct_field]));

    let generator = IpcDataGenerator::default();
    let mut dict_tracker = DictionaryTracker::new(false);
    // Encode the schema first with the same tracker that is used for the batch below.
    generator.schema_to_bytes_with_dictionary_tracker(
        &schema,
        &mut dict_tracker,
        &IpcWriteOptions::default(),
    );

    let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
    generator
        .encode(
            &batch,
            &mut dict_tracker,
            &Default::default(),
            &mut Default::default(),
        )
        .unwrap();

    assert!(dict_tracker.written.contains_key(&0));
}
2853
2854 fn write_union_file(options: IpcWriteOptions) {
2855 let schema = Schema::new(vec![Field::new_union(
2856 "union",
2857 vec![0, 1],
2858 vec![
2859 Field::new("a", DataType::Int32, false),
2860 Field::new("c", DataType::Float64, false),
2861 ],
2862 UnionMode::Sparse,
2863 )]);
2864 let mut builder = UnionBuilder::with_capacity_sparse(5);
2865 builder.append::<Int32Type>("a", 1).unwrap();
2866 builder.append_null::<Int32Type>("a").unwrap();
2867 builder.append::<Float64Type>("c", 3.0).unwrap();
2868 builder.append_null::<Float64Type>("c").unwrap();
2869 builder.append::<Int32Type>("a", 4).unwrap();
2870 let union = builder.build().unwrap();
2871
2872 let batch =
2873 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
2874 .unwrap();
2875
2876 let mut file = tempfile::tempfile().unwrap();
2877 {
2878 let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2879
2880 writer.write(&batch).unwrap();
2881 writer.finish().unwrap();
2882 }
2883 file.rewind().unwrap();
2884
2885 {
2886 let reader = FileReader::try_new(file, None).unwrap();
2887 reader.for_each(|maybe_batch| {
2888 maybe_batch
2889 .unwrap()
2890 .columns()
2891 .iter()
2892 .zip(batch.columns())
2893 .for_each(|(a, b)| {
2894 assert_eq!(a.data_type(), b.data_type());
2895 assert_eq!(a.len(), b.len());
2896 assert_eq!(a.null_count(), b.null_count());
2897 });
2898 });
2899 }
2900 }
2901
/// Runs `write_union_file` with alignment 8 for metadata versions V4 and V5.
#[test]
fn test_write_union_file_v4_v5() {
    for version in [MetadataVersion::V4, MetadataVersion::V5] {
        write_union_file(IpcWriteOptions::try_new(8, false, version).unwrap());
    }
}
2907
/// Writes `BinaryView` and `Utf8View` columns to an IPC file and reads them back, first
/// without a projection and then projecting only the first column.
#[test]
fn test_write_view_types() {
    const LONG_TEST_STRING: &str =
        "This is a long string to make sure binary view array handles it";

    let schema = Schema::new(vec![
        Field::new("field1", DataType::BinaryView, true),
        Field::new("field2", DataType::Utf8View, true),
    ]);

    let binary_values: Vec<Option<&[u8]>> = vec![
        Some(b"foo"),
        Some(b"bar"),
        Some(LONG_TEST_STRING.as_bytes()),
    ];
    let binary_column = BinaryViewArray::from_iter(binary_values);
    let string_column =
        StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
    let written = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![Arc::new(binary_column), Arc::new(string_column)],
    )
    .unwrap();

    let mut file = tempfile::tempfile().unwrap();
    {
        let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
        writer.write(&written).unwrap();
        writer.finish().unwrap();
    }

    // Full read: every column must equal what was written.
    file.rewind().unwrap();
    {
        let mut reader = FileReader::try_new(&file, None).unwrap();
        let read_batch = reader.next().unwrap().unwrap();
        for (actual, expected) in read_batch.columns().iter().zip(written.columns()) {
            assert_eq!(actual, expected);
        }
    }

    // Projected read: only column 0 is requested.
    file.rewind().unwrap();
    {
        let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
        let read_batch = reader.next().unwrap().unwrap();
        assert_eq!(read_batch.num_columns(), 1);
        assert_eq!(read_batch.column(0), written.column(0));
    }
}
2958
/// A five-row slice of a large batch must serialize to the same number of bytes as a
/// freshly built five-row batch, and must survive a stream round trip unchanged.
#[test]
fn truncate_ipc_record_batch() {
    fn create_batch(rows: usize) -> RecordBatch {
        let ints = Int32Array::from_iter_values(0..rows as i32);
        let strings = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Utf8, false),
        ]);

        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(ints), Arc::new(strings)]).unwrap()
    }

    let slice_offset = 2;
    let slice_len = 5;

    let full_batch = create_batch(65536);
    let reference_batch = create_batch(slice_len);
    let sliced_batch = full_batch.slice(slice_offset, slice_len);

    let full_size = serialize_stream(&full_batch).len();
    let reference_size = serialize_stream(&reference_batch).len();
    let sliced_bytes = serialize_stream(&sliced_batch);

    assert!(full_size > reference_size);
    assert_eq!(reference_size, sliced_bytes.len());
    assert_eq!(deserialize_stream(sliced_bytes), sliced_batch);
}
2993
/// Slicing a nullable batch must shrink its serialized size and keep the validity of
/// the sliced rows intact across a stream round trip.
#[test]
fn truncate_ipc_record_batch_with_nulls() {
    fn create_batch() -> RecordBatch {
        let ints = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
        let strings = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]);

        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(ints), Arc::new(strings)]).unwrap()
    }

    let full_batch = create_batch();
    let sliced_batch = full_batch.slice(1, 2);
    let roundtripped = deserialize_stream(serialize_stream(&sliced_batch));

    let full_size = serialize_stream(&full_batch).len();
    let sliced_size = serialize_stream(&sliced_batch).len();
    assert!(full_size > sliced_size);

    // Rows 1..3 of the original: column "a" is [None, Some(1)], column "b" is
    // [Some("a"), Some("a")].
    let ints = roundtripped.column(0);
    let strings = roundtripped.column(1);
    assert!(ints.is_null(0));
    assert!(ints.is_valid(1));
    assert!(strings.is_valid(0));
    assert!(strings.is_valid(1));

    assert_eq!(sliced_batch, roundtripped);
}
3023
/// Slicing a dictionary-encoded batch must shrink its serialized size and keep key
/// validity intact across a stream round trip.
#[test]
fn truncate_ipc_dictionary_array() {
    fn create_batch() -> RecordBatch {
        let dict_values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
            .into_iter()
            .collect();
        let dict_keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
        let dict_array = DictionaryArray::new(dict_keys, Arc::new(dict_values));

        let field = Field::new("dict", dict_array.data_type().clone(), true);
        RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![Arc::new(dict_array)])
            .unwrap()
    }

    let full_batch = create_batch();
    let sliced_batch = full_batch.slice(1, 2);
    let roundtripped = deserialize_stream(serialize_stream(&sliced_batch));

    let full_size = serialize_stream(&full_batch).len();
    let sliced_size = serialize_stream(&sliced_batch).len();
    assert!(full_size > sliced_size);

    // Keys 1..3 of the original are [Some(2), None].
    let dict_column = roundtripped.column(0);
    assert!(dict_column.is_valid(0));
    assert!(dict_column.is_null(1));

    assert_eq!(sliced_batch, roundtripped);
}
3052
/// Slicing a batch holding a struct column must shrink its serialized size and keep
/// the validity of both child columns intact across a stream round trip.
#[test]
fn truncate_ipc_struct_array() {
    fn create_batch() -> RecordBatch {
        let strings: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
            .into_iter()
            .collect();
        let ints: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();

        let string_field = Arc::new(Field::new("s", DataType::Utf8, true));
        let int_field = Arc::new(Field::new("c", DataType::Int32, true));
        let struct_array = StructArray::from(vec![
            (string_field, Arc::new(strings) as ArrayRef),
            (int_field, Arc::new(ints) as ArrayRef),
        ]);

        let field = Field::new("struct_array", struct_array.data_type().clone(), true);
        RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![Arc::new(struct_array)])
            .unwrap()
    }

    let full_batch = create_batch();
    let sliced_batch = full_batch.slice(1, 2);
    let roundtripped = deserialize_stream(serialize_stream(&sliced_batch));

    let full_size = serialize_stream(&full_batch).len();
    let sliced_size = serialize_stream(&sliced_batch).len();
    assert!(full_size > sliced_size);

    let structs = roundtripped
        .column(0)
        .as_any()
        .downcast_ref::<StructArray>()
        .unwrap();

    // Rows 1..3 of the original: "s" is [None, Some("bar")], "c" is [Some(2), None].
    let string_child = structs.column(0);
    let int_child = structs.column(1);
    assert!(string_child.is_null(0));
    assert!(string_child.is_valid(1));
    assert!(int_child.is_valid(0));
    assert!(int_child.is_null(1));
    assert_eq!(sliced_batch, roundtripped);
}
3101
/// A one-row slice of a string column made only of empty strings must serialize
/// smaller than the full column and survive a stream round trip.
#[test]
fn truncate_ipc_string_array_with_all_empty_string() {
    fn create_batch() -> RecordBatch {
        let empties = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(empties)]).unwrap()
    }

    let full_batch = create_batch();
    let sliced_batch = full_batch.slice(0, 1);
    let roundtripped = deserialize_stream(serialize_stream(&sliced_batch));

    let full_size = serialize_stream(&full_batch).len();
    let sliced_size = serialize_stream(&sliced_batch).len();
    assert!(full_size > sliced_size);
    assert_eq!(sliced_batch, roundtripped);
}
3119
/// Writes a sliced `UInt32Array` through `StreamWriter` and checks that only the
/// sliced values come back from `StreamReader`.
#[test]
fn test_stream_writer_writes_array_slice() {
    let array = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
    let all_values: Vec<Option<u32>> = array.iter().collect();
    assert_eq!(vec![Some(1), Some(2), Some(3)], all_values);

    let sliced = array.slice(1, 2);
    let sliced_values: Vec<Option<u32>> = sliced.iter().collect();
    assert_eq!(vec![Some(2), Some(3)], sliced_values);

    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(sliced)]).expect("new batch");

    let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
    writer.write(&batch).expect("write");
    let outbuf = writer.into_inner().expect("inner");

    let mut reader = StreamReader::try_new(&outbuf[..], None).expect("new reader");
    let read_batch = reader.next().unwrap().expect("read batch");

    let read_array: &UInt32Array = read_batch.column(0).as_primitive();
    let read_values: Vec<Option<u32>> = read_array.iter().collect();
    assert_eq!(vec![Some(2), Some(3)], read_values);
}
3150
/// Round-trips 8000 `UInt32` rows in which every odd-indexed row is null.
#[test]
fn test_large_slice_uint32() {
    ensure_roundtrip(Arc::new(UInt32Array::from_iter(
        (0..8000).map(|i| (i % 2 == 0).then_some(i)),
    )));
}
3157
/// Round-trips 8000 string rows in which every odd-indexed row is null.
#[test]
fn test_large_slice_string() {
    let strings: Vec<Option<String>> = (0..8000)
        .map(|i| (i % 2 == 0).then(|| format!("value{i}")))
        .collect();

    ensure_roundtrip(Arc::new(StringArray::from(strings)));
}
3172
/// Round-trips 8000 list rows: even rows hold 1000 strings each, odd rows are null.
#[test]
fn test_large_slice_string_list() {
    use std::fmt::Write;

    let mut builder = ListBuilder::new(StringBuilder::new());

    // Reused for every element to avoid allocating a fresh string each time.
    let mut scratch = String::new();
    for row_number in 0..8000 {
        let is_valid = row_number % 2 == 0;
        if is_valid {
            for list_element in 0..1000 {
                scratch.clear();
                write!(scratch, "value{row_number}-{list_element}").unwrap();
                builder.values().append_value(&scratch);
            }
        }
        builder.append(is_valid);
    }

    ensure_roundtrip(Arc::new(builder.finish()));
}
3194
/// Round-trips a list-of-lists column: 4000 rows each holding one empty inner list,
/// followed by 4000 rows alternating between one 1000-string inner list and null.
#[test]
fn test_large_slice_string_list_of_lists() {
    use std::fmt::Write;

    let mut builder = ListBuilder::new(ListBuilder::new(StringBuilder::new()));

    for _ in 0..4000 {
        builder.values().append(true);
        builder.append(true);
    }

    // Reused for every element to avoid allocating a fresh string each time.
    let mut scratch = String::new();
    for row_number in 0..4000 {
        let is_valid = row_number % 2 == 0;
        if is_valid {
            for list_element in 0..1000 {
                scratch.clear();
                write!(scratch, "value{row_number}-{list_element}").unwrap();
                builder.values().values().append_value(&scratch);
            }
            builder.values().append(true);
        }
        builder.append(is_valid);
    }

    ensure_roundtrip(Arc::new(builder.finish()));
}
3225
3226 fn ensure_roundtrip(array: ArrayRef) {
3228 let num_rows = array.len();
3229 let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
3230 let sliced_batch = orig_batch.slice(1, num_rows - 1);
3232
3233 let schema = orig_batch.schema();
3234 let stream_data = {
3235 let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
3236 writer.write(&sliced_batch).unwrap();
3237 writer.into_inner().unwrap()
3238 };
3239 let read_batch = {
3240 let projection = None;
3241 let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
3242 reader
3243 .next()
3244 .expect("expect no errors reading batch")
3245 .expect("expect batch")
3246 };
3247 assert_eq!(sliced_batch, read_batch);
3248
3249 let file_data = {
3250 let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
3251 writer.write(&sliced_batch).unwrap();
3252 writer.into_inner().unwrap().into_inner().unwrap()
3253 };
3254 let read_batch = {
3255 let projection = None;
3256 let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
3257 reader
3258 .next()
3259 .expect("expect no errors reading batch")
3260 .expect("expect batch")
3261 };
3262 assert_eq!(sliced_batch, read_batch);
3263
3264 }
3266
/// Round-trips boolean slices taken at several offset/length combinations.
#[test]
fn encode_bools_slice() {
    // Smallest case: keep only the second of two values.
    assert_bool_roundtrip([true, false], 1, 1);

    // Offset 13 and length 17: neither is a multiple of 8.
    let thirty_values = [
        true, false, true, true, false, false, true, true, true, false, false, false, true,
        true, true, true, false, false, false, false, true, true, true, true, true, false,
        false, false, false, false,
    ];
    assert_bool_roundtrip(thirty_values, 13, 17);

    // Offset 8 with a length shorter than 8.
    let twelve_values = [
        true, false, true, true, false, false, true, true, true, false, false, false,
    ];
    assert_bool_roundtrip(twelve_values, 8, 2);

    // Offset 8 with a length of exactly 8.
    let twenty_two_values = [
        true, false, true, true, false, false, true, true, true, false, false, false, true,
        true, true, true, true, false, false, false, false, false,
    ];
    assert_bool_roundtrip(twenty_two_values, 8, 8);
}
3302
3303 fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
3304 let val_bool_field = Field::new("val", DataType::Boolean, false);
3305
3306 let schema = Arc::new(Schema::new(vec![val_bool_field]));
3307
3308 let bools = BooleanArray::from(bools.to_vec());
3309
3310 let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
3311 let batch = batch.slice(offset, length);
3312
3313 let data = serialize_stream(&batch);
3314 let batch2 = deserialize_stream(data);
3315 assert_eq!(batch, batch2);
3316 }
3317
/// For every slice length, checks that `into_zero_offset_run_array` reproduces the
/// logical values of a run array sliced from the front and sliced from the back.
#[test]
fn test_run_array_unslice() {
    let total_len = 80;
    let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
    let repeats: Vec<usize> = vec![3, 4, 1, 2];

    // 32 runs, cycling independently through `vals` and `repeats`; the run lengths
    // add up to `total_len`.
    let mut logical_values: Vec<Option<i32>> = Vec::with_capacity(total_len);
    for (val, repeat) in vals.iter().cycle().zip(repeats.iter().cycle()).take(32) {
        logical_values.resize(logical_values.len() + *repeat, *val);
    }

    let mut builder =
        PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(logical_values.len());
    builder.extend(logical_values.iter().copied());
    let run_array = builder.finish();

    // Slices the run array, rebuilds it at offset zero and compares its logical values
    // with the matching window of `logical_values`.
    let assert_unsliced_matches = |offset: usize, len: usize| {
        let sliced: RunArray<Int16Type> = run_array.slice(offset, len).into_data().into();
        let unsliced = into_zero_offset_run_array(sliced).unwrap();
        let typed = unsliced
            .downcast::<PrimitiveArray<Int32Type>>()
            .unwrap();

        let expected: Vec<Option<i32>> = logical_values
            .iter()
            .skip(offset)
            .take(len)
            .copied()
            .collect();
        let actual: Vec<Option<i32>> = typed.into_iter().collect();
        assert_eq!(expected, actual);
    };

    for slice_len in 1..=total_len {
        // Slice anchored at the start of the array.
        assert_unsliced_matches(0, slice_len);
        // Slice anchored at the end of the array.
        assert_unsliced_matches(total_len - slice_len, slice_len);
    }
}
3371
3372 fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3373 let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());
3374
3375 for i in 0..100_000 {
3376 for value in [i, i, i] {
3377 ls.values().append_value(value);
3378 }
3379 ls.append(true)
3380 }
3381
3382 ls.finish()
3383 }
3384
3385 fn generate_utf8view_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3386 let mut ls = GenericListBuilder::<O, _>::new(StringViewBuilder::new());
3387
3388 for i in 0..100_000 {
3389 for value in [
3390 format!("value{}", i),
3391 format!("value{}", i),
3392 format!("value{}", i),
3393 ] {
3394 ls.values().append_value(&value);
3395 }
3396 ls.append(true)
3397 }
3398
3399 ls.finish()
3400 }
3401
3402 fn generate_string_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3403 let mut ls = GenericListBuilder::<O, _>::new(StringBuilder::new());
3404
3405 for i in 0..100_000 {
3406 for value in [
3407 format!("value{}", i),
3408 format!("value{}", i),
3409 format!("value{}", i),
3410 ] {
3411 ls.values().append_value(&value);
3412 }
3413 ls.append(true)
3414 }
3415
3416 ls.finish()
3417 }
3418
3419 fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3420 let mut ls =
3421 GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
3422
3423 for _i in 0..10_000 {
3424 for j in 0..10 {
3425 for value in [j, j, j, j] {
3426 ls.values().values().append_value(value);
3427 }
3428 ls.values().append(true)
3429 }
3430 ls.append(true);
3431 }
3432
3433 ls.finish()
3434 }
3435
3436 fn generate_nested_list_data_starting_at_zero<O: OffsetSizeTrait>() -> GenericListArray<O> {
3437 let mut ls =
3438 GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
3439
3440 for _i in 0..999 {
3441 ls.values().append(true);
3442 ls.append(true);
3443 }
3444
3445 for j in 0..10 {
3446 for value in [j, j, j, j] {
3447 ls.values().values().append_value(value);
3448 }
3449 ls.values().append(true)
3450 }
3451 ls.append(true);
3452
3453 for i in 0..9_000 {
3454 for j in 0..10 {
3455 for value in [i + j, i + j, i + j, i + j] {
3456 ls.values().values().append_value(value);
3457 }
3458 ls.values().append(true)
3459 }
3460 ls.append(true);
3461 }
3462
3463 ls.finish()
3464 }
3465
3466 fn generate_map_array_data() -> MapArray {
3467 let keys_builder = UInt32Builder::new();
3468 let values_builder = UInt32Builder::new();
3469
3470 let mut builder = MapBuilder::new(None, keys_builder, values_builder);
3471
3472 for i in 0..100_000 {
3473 for _j in 0..3 {
3474 builder.keys().append_value(i);
3475 builder.values().append_value(i * 2);
3476 }
3477 builder.append(true).unwrap();
3478 }
3479
3480 builder.finish()
3481 }
3482
/// Rows 75..82 of a list array with three values per row: the re-encoded offsets must
/// start at zero, with the original start reported as 225 and the length as 21.
#[test]
fn reencode_offsets_when_first_offset_is_not_zero() {
    let sliced = generate_list_data::<i32>().into_data().slice(75, 7);

    let (new_offsets, original_start, length) =
        reencode_offsets::<i32>(&sliced.buffers()[0], &sliced);

    assert_eq!(
        vec![0, 3, 6, 9, 12, 15, 18, 21],
        new_offsets.typed_data::<i32>()
    );
    assert_eq!(225, original_start);
    assert_eq!(21, length);
}
3497
/// Slices away a leading empty list so the remaining row's values still begin at
/// offset zero; the re-encoded offsets must be `[0, 2]` with an original start of 0.
#[test]
fn reencode_offsets_when_first_offset_is_zero() {
    let mut builder = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
    // Row 0 is an empty list.
    builder.append(true);
    // Row 1 holds two values.
    builder.values().append_value(35);
    builder.values().append_value(42);
    builder.append(true);

    let sliced = builder.finish().into_data().slice(1, 1);

    let (new_offsets, original_start, length) =
        reencode_offsets::<i32>(&sliced.buffers()[0], &sliced);

    assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
    assert_eq!(0, original_start);
    assert_eq!(2, length);
}
3516
3517 fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
3520 let in_sliced = in_batch.slice(999, 1);
3522
3523 let bytes_batch = serialize_file(&in_batch);
3524 let bytes_sliced = serialize_file(&in_sliced);
3525
3526 assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));
3528
3529 let out_batch = deserialize_file(bytes_batch);
3531 assert_eq!(in_batch, out_batch);
3532
3533 let out_sliced = deserialize_file(bytes_sliced);
3534 assert_eq!(in_sliced, out_sliced);
3535 }
3536
/// A one-row slice of a `List<UInt32>` batch must serialize more than 1000 times
/// smaller than the full batch.
#[test]
fn encode_lists() {
    let item_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let list_field = Field::new("val", DataType::List(item_field), false);
    let schema = Arc::new(Schema::new(vec![list_field]));

    let column = Arc::new(generate_list_data::<i32>());
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    roundtrip_ensure_sliced_smaller(batch, 1000);
}
3548
/// A zero-length slice of a `List<UInt32>` batch must survive a file round trip.
#[test]
fn encode_empty_list() {
    let item_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let list_field = Field::new("val", DataType::List(item_field), false);
    let schema = Arc::new(Schema::new(vec![list_field]));

    let column = Arc::new(generate_list_data::<i32>());
    let full_batch = RecordBatch::try_new(schema, vec![column]).unwrap();
    let empty_slice = full_batch.slice(999, 0);

    let roundtripped = deserialize_file(serialize_file(&empty_slice));
    assert_eq!(empty_slice, roundtripped);
}
3563
/// A one-row slice of a `LargeList<UInt32>` batch must serialize more than 1000 times
/// smaller than the full batch.
#[test]
fn encode_large_lists() {
    let item_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let list_field = Field::new("val", DataType::LargeList(item_field), false);
    let schema = Arc::new(Schema::new(vec![list_field]));

    let column = Arc::new(generate_list_data::<i64>());
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    roundtrip_ensure_sliced_smaller(batch, 1000);
}
3577
/// Round-trips several slices of a `LargeList<UInt32>` column.
#[test]
fn encode_large_lists_non_zero_offset() {
    let item_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let list_field = Field::new("val", DataType::LargeList(item_field), false);
    let schema = Arc::new(Schema::new(vec![list_field]));

    check_sliced_list_array(schema, Arc::new(generate_list_data::<i64>()));
}
3588
/// Round-trips several slices of a `LargeList<Utf8>` column.
#[test]
fn encode_large_lists_string_non_zero_offset() {
    let item_field = Arc::new(Field::new_list_field(DataType::Utf8, true));
    let list_field = Field::new("val", DataType::LargeList(item_field), false);
    let schema = Arc::new(Schema::new(vec![list_field]));

    check_sliced_list_array(schema, Arc::new(generate_string_list_data::<i64>()));
}
3599
/// Round-trips several slices of a `LargeList<Utf8View>` column.
#[test]
fn encode_large_list_string_view_non_zero_offset() {
    let item_field = Arc::new(Field::new_list_field(DataType::Utf8View, true));
    let list_field = Field::new("val", DataType::LargeList(item_field), false);
    let schema = Arc::new(Schema::new(vec![list_field]));

    check_sliced_list_array(schema, Arc::new(generate_utf8view_list_data::<i64>()));
}
3610
3611 fn check_sliced_list_array(schema: Arc<Schema>, values: Arc<GenericListArray<i64>>) {
3612 for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3613 let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3614 .unwrap()
3615 .slice(offset, len);
3616 let out_batch = deserialize_file(serialize_file(&in_batch));
3617 assert_eq!(in_batch, out_batch);
3618 }
3619 }
3620
/// A one-row slice of a `List<List<UInt32>>` batch must serialize more than 1000 times
/// smaller than the full batch.
#[test]
fn encode_nested_lists() {
    let leaf_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let inner_field = Arc::new(Field::new_list_field(DataType::List(leaf_field), true));
    let outer_field = Field::new("val", DataType::List(inner_field), true);
    let schema = Arc::new(Schema::new(vec![outer_field]));

    let column = Arc::new(generate_nested_list_data::<i32>());
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    roundtrip_ensure_sliced_smaller(batch, 1000);
}
3633
/// Round-trips nested list data whose first 999 rows add no leaf values; with a size
/// factor of 1 the slice only has to be strictly smaller than the full batch.
#[test]
fn encode_nested_lists_starting_at_zero() {
    let leaf_field = Arc::new(Field::new("item", DataType::UInt32, true));
    let inner_field = Arc::new(Field::new("item", DataType::List(leaf_field), true));
    let outer_field = Field::new("val", DataType::List(inner_field), true);
    let schema = Arc::new(Schema::new(vec![outer_field]));

    let column = Arc::new(generate_nested_list_data_starting_at_zero::<i32>());
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    roundtrip_ensure_sliced_smaller(batch, 1);
}
3646
/// A one-row slice of a map batch must serialize more than 1000 times smaller than
/// the full batch.
#[test]
fn encode_map_array() {
    let key_field = Arc::new(Field::new("keys", DataType::UInt32, false));
    let value_field = Arc::new(Field::new("values", DataType::UInt32, true));
    let map_field = Field::new_map("map", "entries", key_field, value_field, false, true);
    let schema = Arc::new(Schema::new(vec![map_field]));

    let column = Arc::new(generate_map_array_data());
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    roundtrip_ensure_sliced_smaller(batch, 1000);
}
3659
3660 fn generate_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
3661 let mut builder = GenericListViewBuilder::<O, _>::new(UInt32Builder::new());
3662
3663 for i in 0u32..100_000 {
3664 if i.is_multiple_of(10_000) {
3665 builder.append(false);
3666 continue;
3667 }
3668 for value in [i, i, i] {
3669 builder.values().append_value(value);
3670 }
3671 builder.append(true);
3672 }
3673
3674 builder.finish()
3675 }
3676
/// Round-trips a full `ListView<UInt32>` batch through the IPC file format.
#[test]
fn encode_list_view_arrays() {
    let item_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let view_field = Field::new("val", DataType::ListView(item_field), true);
    let schema = Arc::new(Schema::new(vec![view_field]));

    let column = Arc::new(generate_list_view_data::<i32>());
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let roundtripped = deserialize_file(serialize_file(&batch));
    assert_eq!(batch, roundtripped);
}
3689
/// Round-trips a full `LargeListView<UInt32>` batch through the IPC file format.
#[test]
fn encode_large_list_view_arrays() {
    let item_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let view_field = Field::new("val", DataType::LargeListView(item_field), true);
    let schema = Arc::new(Schema::new(vec![view_field]));

    let column = Arc::new(generate_list_view_data::<i64>());
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let roundtripped = deserialize_file(serialize_file(&batch));
    assert_eq!(batch, roundtripped);
}
3702
/// Round-trips four slices of a `ListView<UInt32>` column: a single row at 999, the
/// first 13 rows, 12 rows starting at 47, and the last 13 rows.
#[test]
fn check_sliced_list_view_array() {
    let item_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let view_field = Field::new("val", DataType::ListView(item_field), true);
    let schema = Arc::new(Schema::new(vec![view_field]));

    let column = Arc::new(generate_list_view_data::<i32>());
    let tail_offset = column.len() - 13;
    let full_batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    for (offset, len) in [(999, 1), (0, 13), (47, 12), (tail_offset, 13)] {
        let sliced = full_batch.slice(offset, len);
        let roundtripped = deserialize_file(serialize_file(&sliced));
        assert_eq!(sliced, roundtripped);
    }
}
3718
/// Round-trips four slices of a `LargeListView<UInt32>` column: a single row at 999,
/// the first 13 rows, 12 rows starting at 47, and the last 13 rows.
#[test]
fn check_sliced_large_list_view_array() {
    let item_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let view_field = Field::new("val", DataType::LargeListView(item_field), true);
    let schema = Arc::new(Schema::new(vec![view_field]));

    let column = Arc::new(generate_list_view_data::<i64>());
    let tail_offset = column.len() - 13;
    let full_batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    for (offset, len) in [(999, 1), (0, 13), (47, 12), (tail_offset, 13)] {
        let sliced = full_batch.slice(offset, len);
        let roundtripped = deserialize_file(serialize_file(&sliced));
        assert_eq!(sliced, roundtripped);
    }
}
3734
3735 fn generate_nested_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
3736 let inner_builder = UInt32Builder::new();
3737 let middle_builder = GenericListViewBuilder::<O, _>::new(inner_builder);
3738 let mut outer_builder = GenericListViewBuilder::<O, _>::new(middle_builder);
3739
3740 for i in 0u32..10_000 {
3741 if i.is_multiple_of(1_000) {
3742 outer_builder.append(false);
3743 continue;
3744 }
3745
3746 for _ in 0..3 {
3747 for value in [i, i + 1, i + 2] {
3748 outer_builder.values().values().append_value(value);
3749 }
3750 outer_builder.values().append(true);
3751 }
3752 outer_builder.append(true);
3753 }
3754
3755 outer_builder.finish()
3756 }
3757
/// Round-trips a full `ListView<ListView<UInt32>>` batch through the IPC file format.
#[test]
fn encode_nested_list_views() {
    let leaf_field = Arc::new(Field::new_list_field(DataType::UInt32, true));
    let inner_field = Arc::new(Field::new_list_field(DataType::ListView(leaf_field), true));
    let outer_field = Field::new("val", DataType::ListView(inner_field), true);
    let schema = Arc::new(Schema::new(vec![outer_field]));

    let column = Arc::new(generate_nested_list_view_data::<i32>());
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let roundtripped = deserialize_file(serialize_file(&batch));
    assert_eq!(batch, roundtripped);
}
3771
3772 fn test_roundtrip_list_view_of_dict_impl<OffsetSize: OffsetSizeTrait, U: ArrowNativeType>(
3773 list_data_type: DataType,
3774 offsets: &[U; 5],
3775 sizes: &[U; 4],
3776 ) {
3777 let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3778 let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
3779 let dict_array = DictionaryArray::new(keys, Arc::new(values));
3780 let dict_data = dict_array.to_data();
3781
3782 let value_offsets = Buffer::from_slice_ref(offsets);
3783 let value_sizes = Buffer::from_slice_ref(sizes);
3784
3785 let list_data = ArrayData::builder(list_data_type)
3786 .len(4)
3787 .add_buffer(value_offsets)
3788 .add_buffer(value_sizes)
3789 .add_child_data(dict_data)
3790 .build()
3791 .unwrap();
3792 let list_view_array = GenericListViewArray::<OffsetSize>::from(list_data);
3793
3794 let schema = Arc::new(Schema::new(vec![Field::new(
3795 "f1",
3796 list_view_array.data_type().clone(),
3797 false,
3798 )]));
3799 let input_batch = RecordBatch::try_new(schema, vec![Arc::new(list_view_array)]).unwrap();
3800
3801 let output_batch = deserialize_file(serialize_file(&input_batch));
3802 assert_eq!(input_batch, output_batch);
3803
3804 let output_batch = deserialize_stream(serialize_stream(&input_batch));
3805 assert_eq!(input_batch, output_batch);
3806 }
3807
/// Round-trips a `ListView` (32-bit offsets) whose items are
/// dictionary-encoded strings, with the item field declared with dict id 1.
#[test]
fn test_roundtrip_list_view_of_dict() {
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    // `Field::new_dict` is deprecated but is the only way to pin a dict id here.
    #[allow(deprecated)]
    let item_field = Field::new_dict("item", dict_type, true, 1, false);

    test_roundtrip_list_view_of_dict_impl::<i32, i32>(
        DataType::ListView(Arc::new(item_field)),
        &[0, 2, 4, 4, 7],
        &[2, 2, 0, 3],
    );
}
3822
/// Round-trips a `LargeListView` (64-bit offsets) whose items are
/// dictionary-encoded strings, with the item field declared with dict id 2.
#[test]
fn test_roundtrip_large_list_view_of_dict() {
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    // `Field::new_dict` is deprecated but is the only way to pin a dict id here.
    #[allow(deprecated)]
    let item_field = Field::new_dict("item", dict_type, true, 2, false);

    test_roundtrip_list_view_of_dict_impl::<i64, i64>(
        DataType::LargeListView(Arc::new(item_field)),
        &[0, 2, 4, 4, 7],
        &[2, 2, 0, 3],
    );
}
3837
/// Round-trips a *sliced* `ListView` of dictionary-encoded strings.
///
/// A 6-row list-view is built over a 12-key dictionary child; rows `1..5`
/// are then sliced out and sent through both the file and stream helpers.
#[test]
fn test_roundtrip_sliced_list_view_of_dict() {
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    // `Field::new_dict` is deprecated but is the only way to pin a dict id here.
    #[allow(deprecated)]
    let item_field = Field::new_dict("item", dict_type, true, 3, false);

    let dictionary = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
    let indices = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2, 1, 0, 3, 2, 1]);
    let child = DictionaryArray::new(indices, Arc::new(dictionary)).to_data();

    let offsets: [i32; 7] = [0, 2, 4, 4, 7, 9, 12];
    let sizes: [i32; 6] = [2, 2, 0, 3, 2, 3];

    let list_data = ArrayData::builder(DataType::ListView(Arc::new(item_field)))
        .len(6)
        .add_buffer(Buffer::from_slice_ref(&offsets))
        .add_buffer(Buffer::from_slice_ref(&sizes))
        .add_child_data(child)
        .build()
        .unwrap();
    let list_view = GenericListViewArray::<i32>::from(list_data);

    let field = Field::new("f1", list_view.data_type().clone(), false);
    let column: ArrayRef = Arc::new(list_view);
    let full_batch =
        RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column]).unwrap();

    // Only the middle four rows are written.
    let sliced_batch = full_batch.slice(1, 4);

    let from_file = deserialize_file(serialize_file(&sliced_batch));
    assert_eq!(sliced_batch, from_file);

    let from_stream = deserialize_stream(serialize_stream(&sliced_batch));
    assert_eq!(sliced_batch, from_stream);
}
3883
/// Round-trips a dense union with two variants: a dictionary-encoded string
/// child (type id 0, dict id 1) and a plain `Int32` child (type id 1).
#[test]
fn test_roundtrip_dense_union_of_dict() {
    // Children.
    let dictionary = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
    let indices = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
    let dict_child: ArrayRef = Arc::new(DictionaryArray::new(indices, Arc::new(dictionary)));
    let int_child: ArrayRef = Arc::new(Int32Array::from(vec![100, 200]));

    // Union field definitions.
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    #[allow(deprecated)]
    let dict_field = Arc::new(Field::new_dict("dict", dict_type, true, 1, false));
    let int_field = Arc::new(Field::new("int", DataType::Int32, false));
    let union_fields = UnionFields::try_new(vec![0, 1], vec![dict_field, int_field]).unwrap();

    let schema = Arc::new(Schema::new(vec![Field::new(
        "union",
        DataType::Union(union_fields.clone(), UnionMode::Dense),
        false,
    )]));

    // Per-row variant selector and the index into the selected child.
    let type_ids = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);
    let value_offsets = ScalarBuffer::from(vec![0i32, 1, 0, 2, 1, 3, 4]);

    let union = UnionArray::try_new(
        union_fields,
        type_ids,
        Some(value_offsets),
        vec![dict_child, int_child],
    )
    .unwrap();

    let column: ArrayRef = Arc::new(union);
    let input_batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let from_file = deserialize_file(serialize_file(&input_batch));
    assert_eq!(input_batch, from_file);

    let from_stream = deserialize_stream(serialize_stream(&input_batch));
    assert_eq!(input_batch, from_stream);
}
3927
/// Round-trips a sparse union with two variants: a dictionary-encoded string
/// child (type id 0, dict id 2) and a plain `Int32` child (type id 1).
///
/// No offsets buffer is passed, and both children have 7 entries, the same
/// as the number of type ids.
#[test]
fn test_roundtrip_sparse_union_of_dict() {
    // Children.
    let dictionary = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
    let indices = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
    let dict_child: ArrayRef = Arc::new(DictionaryArray::new(indices, Arc::new(dictionary)));
    let int_child: ArrayRef = Arc::new(Int32Array::from(vec![0, 0, 100, 0, 200, 0, 0]));

    // Union field definitions.
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    #[allow(deprecated)]
    let dict_field = Arc::new(Field::new_dict("dict", dict_type, true, 2, false));
    let int_field = Arc::new(Field::new("int", DataType::Int32, false));
    let union_fields = UnionFields::try_new(vec![0, 1], vec![dict_field, int_field]).unwrap();

    let schema = Arc::new(Schema::new(vec![Field::new(
        "union",
        DataType::Union(union_fields.clone(), UnionMode::Sparse),
        false,
    )]));

    let type_ids = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);

    let union =
        UnionArray::try_new(union_fields, type_ids, None, vec![dict_child, int_child]).unwrap();

    let column: ArrayRef = Arc::new(union);
    let input_batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let from_file = deserialize_file(serialize_file(&input_batch));
    assert_eq!(input_batch, from_file);

    let from_stream = deserialize_stream(serialize_stream(&input_batch));
    assert_eq!(input_batch, from_stream);
}
3970
/// Round-trips a 3-row map whose keys are dictionary-encoded strings and
/// whose values are plain `Int32`s.
///
/// The map's declared entries field uses `Field::new_dict` (dict id 1) for
/// the key, while the struct array itself is built with a plain `Field::new`
/// key field; that asymmetry is kept exactly as in the original setup.
#[test]
fn test_roundtrip_map_with_dict_keys() {
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

    // Six entries in total, two per map row.
    let key_dictionary = StringArray::from(vec!["key_a", "key_b", "key_c"]);
    let key_indices = Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]);
    let dict_keys: ArrayRef =
        Arc::new(DictionaryArray::new(key_indices, Arc::new(key_dictionary)));
    let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]));

    #[allow(deprecated)]
    let declared_key_field = Field::new_dict("key", dict_type.clone(), false, 1, false);
    let declared_value_field = Field::new("value", DataType::Int32, true);
    let entries_field = Arc::new(Field::new(
        "entries",
        DataType::Struct(vec![declared_key_field, declared_value_field].into()),
        false,
    ));

    let entries = StructArray::from(vec![
        (Arc::new(Field::new("key", dict_type, false)), dict_keys),
        (
            Arc::new(Field::new("value", DataType::Int32, true)),
            values,
        ),
    ]);

    let map_data = ArrayData::builder(DataType::Map(entries_field, false))
        .len(3)
        .add_buffer(Buffer::from_slice_ref([0i32, 2, 4, 6]))
        .add_child_data(entries.into_data())
        .build()
        .unwrap();
    let map_array = MapArray::from(map_data);

    let field = Field::new("map", map_array.data_type().clone(), false);
    let column: ArrayRef = Arc::new(map_array);
    let input_batch =
        RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column]).unwrap();

    let from_file = deserialize_file(serialize_file(&input_batch));
    assert_eq!(input_batch, from_file);

    let from_stream = deserialize_stream(serialize_stream(&input_batch));
    assert_eq!(input_batch, from_stream);
}
4038
/// Round-trips a 3-row map whose keys are plain strings and whose values are
/// dictionary-encoded strings.
///
/// The map's declared entries field uses `Field::new_dict` (dict id 2) for
/// the value, while the struct array itself is built with a plain
/// `Field::new` value field; that asymmetry is kept exactly as in the
/// original setup.
#[test]
fn test_roundtrip_map_with_dict_values() {
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

    // Six entries in total, two per map row.
    let keys: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e", "f"]));
    let value_dictionary = StringArray::from(vec!["val_x", "val_y", "val_z"]);
    let value_indices = Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]);
    let dict_values: ArrayRef = Arc::new(DictionaryArray::new(
        value_indices,
        Arc::new(value_dictionary),
    ));

    let declared_key_field = Field::new("key", DataType::Utf8, false);
    #[allow(deprecated)]
    let declared_value_field = Field::new_dict("value", dict_type.clone(), true, 2, false);
    let entries_field = Arc::new(Field::new(
        "entries",
        DataType::Struct(vec![declared_key_field, declared_value_field].into()),
        false,
    ));

    let entries = StructArray::from(vec![
        (Arc::new(Field::new("key", DataType::Utf8, false)), keys),
        (Arc::new(Field::new("value", dict_type, true)), dict_values),
    ]);

    let map_data = ArrayData::builder(DataType::Map(entries_field, false))
        .len(3)
        .add_buffer(Buffer::from_slice_ref([0i32, 2, 4, 6]))
        .add_child_data(entries.into_data())
        .build()
        .unwrap();
    let map_array = MapArray::from(map_data);

    let field = Field::new("map", map_array.data_type().clone(), false);
    let column: ArrayRef = Arc::new(map_array);
    let input_batch =
        RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column]).unwrap();

    let from_file = deserialize_file(serialize_file(&input_batch));
    assert_eq!(input_batch, from_file);

    let from_stream = deserialize_stream(serialize_stream(&input_batch));
    assert_eq!(input_batch, from_stream);
}
4106
/// Writes `Decimal128` batches with an IPC alignment of 16 and checks that a
/// decoder with `with_require_alignment(true)` reads them back unchanged.
///
/// Several column counts are tried; the row count is derived from the column
/// count so that the batch shape varies between iterations.
#[test]
fn test_decimal128_alignment16_is_sufficient() {
    const IPC_ALIGNMENT: usize = 16;

    for num_cols in [1, 2, 3, 17, 50, 73, 99] {
        let num_rows = (num_cols * 7 + 11) % 100;

        let fields: Vec<Field> = (0..num_cols)
            .map(|i| Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true))
            .collect();
        let columns: Vec<Arc<dyn Array>> = (0..num_cols)
            .map(|_| {
                Arc::new(Decimal128Array::from(vec![num_cols as i128; num_rows]))
                    as Arc<dyn Array>
            })
            .collect();
        let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), columns).unwrap();

        // Serialize a single batch into an in-memory IPC file.
        let options = IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap();
        let mut writer =
            FileWriter::try_new_with_options(Vec::new(), batch.schema_ref(), options).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        let out: Vec<u8> = writer.into_inner().unwrap();

        // Locate and parse the footer: the last 10 bytes are handed to
        // `read_footer_length`, and the footer sits immediately before them.
        let buffer = Buffer::from_vec(out);
        let trailer_start = buffer.len() - 10;
        let footer_len =
            read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();

        let decoded_schema = fb_to_schema(footer.schema().unwrap());
        let decoder = FileDecoder::new(Arc::new(decoded_schema), footer.version())
            .with_require_alignment(true);

        // Decode the first (and only) record batch block.
        let block = footer.recordBatches().unwrap().get(0);
        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
        let data = buffer.slice_with_length(block.offset() as _, block_len);

        let batch2 = decoder.read_record_batch(block, &data).unwrap().unwrap();

        assert_eq!(batch, batch2);
    }
}
4165
/// Writes a two-column, one-row `Decimal128` batch with an IPC alignment of
/// 8 and checks that a decoder with `with_require_alignment(true)` rejects
/// it with the expected "Misaligned buffers" error.
#[test]
fn test_decimal128_alignment8_is_unaligned() {
    const IPC_ALIGNMENT: usize = 8;

    let num_cols = 2;
    let num_rows = 1;

    let fields: Vec<Field> = (0..num_cols)
        .map(|i| Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true))
        .collect();
    let columns: Vec<Arc<dyn Array>> = (0..num_cols)
        .map(|_| {
            Arc::new(Decimal128Array::from(vec![num_cols as i128; num_rows])) as Arc<dyn Array>
        })
        .collect();
    let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), columns).unwrap();

    // Serialize a single batch into an in-memory IPC file.
    let options = IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap();
    let mut writer =
        FileWriter::try_new_with_options(Vec::new(), batch.schema_ref(), options).unwrap();
    writer.write(&batch).unwrap();
    writer.finish().unwrap();
    let out: Vec<u8> = writer.into_inner().unwrap();

    // Locate and parse the footer: the last 10 bytes are handed to
    // `read_footer_length`, and the footer sits immediately before them.
    let buffer = Buffer::from_vec(out);
    let trailer_start = buffer.len() - 10;
    let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
    let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();

    let decoded_schema = fb_to_schema(footer.schema().unwrap());
    let decoder =
        FileDecoder::new(Arc::new(decoded_schema), footer.version()).with_require_alignment(true);

    let block = footer.recordBatches().unwrap().get(0);
    let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
    let data = buffer.slice_with_length(block.offset() as _, block_len);

    // Decoding must fail rather than silently accept the buffer.
    let message = decoder
        .read_record_batch(block, &data)
        .unwrap_err()
        .to_string();
    assert_eq!(
        message,
        "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
         offset from expected alignment of 16 by 8"
    );
}
4221
/// Checks that `flush()` on the stream and file writers pushes bytes held in
/// the wrapped `BufWriter` through to the underlying `Vec<u8>`.
///
/// The inner vector length is sampled right after construction and again
/// after `flush()`. The post-flush lengths are compared against values
/// derived from the fully finished stream output.
#[test]
fn test_flush() {
    let num_cols = 2;
    let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
    let fields: Vec<Field> = (0..num_cols)
        .map(|i| Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true))
        .collect();
    let schema = Schema::new(fields);

    let mut stream_writer = StreamWriter::try_new_with_options(
        BufWriter::with_capacity(1024, Vec::new()),
        &schema,
        options.clone(),
    )
    .unwrap();
    let mut file_writer = FileWriter::try_new_with_options(
        BufWriter::with_capacity(1024, Vec::new()),
        &schema,
        options,
    )
    .unwrap();

    // Bytes that have reached the innermost Vec before any explicit flush.
    let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
    let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();

    stream_writer.flush().unwrap();
    file_writer.flush().unwrap();

    // Bytes that have reached the innermost Vec after the flush.
    let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
    let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();

    // NOTE(review): the 8-byte adjustments presumably account for the trailer
    // that `into_inner` appends to the stream and for the leading file
    // header respectively - confirm against the writer implementation.
    let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
    let expected_stream_flushed_bytes = stream_out.len() - 8;
    let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;

    assert!(
        stream_bytes_written_on_new < stream_bytes_written_on_flush,
        "this test makes no sense if flush is not actually required"
    );
    assert!(
        file_bytes_written_on_new < file_bytes_written_on_flush,
        "this test makes no sense if flush is not actually required"
    );
    assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
    assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
}
4268
/// Round-trips slices of a non-nullable `List<FixedSizeList<Float32, 3>>`
/// column through the stream writer/reader via `test_slices`.
///
/// The array has two rows: the first holds three points, the second one.
#[test]
fn test_roundtrip_list_of_fixed_list() -> Result<(), ArrowError> {
    let coordinate_field = Arc::new(Field::new("item", DataType::Float32, false));
    let point_type = DataType::FixedSizeList(coordinate_field.clone(), 3);
    let point_field = Arc::new(Field::new("item", point_type, false));
    let list_type = DataType::List(point_field.clone());

    let point_builder =
        FixedSizeListBuilder::new(Float32Builder::new(), 3).with_field(coordinate_field);
    let mut list_builder = ListBuilder::new(point_builder).with_field(point_field);

    // One entry per list row; each row is a sequence of 3-component points.
    let rows: [&[[f32; 3]]; 2] = [
        &[[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]],
        &[[10., 11., 12.]],
    ];
    for row in rows {
        for point in row {
            for &coordinate in point {
                list_builder.values().values().append_value(coordinate);
            }
            list_builder.values().append(true);
        }
        list_builder.append(true);
    }

    let array = Arc::new(list_builder.finish()) as ArrayRef;

    let schema = Arc::new(Schema::new_with_metadata(
        vec![Field::new("points", list_type, false)],
        HashMap::default(),
    ));

    for (offset, length) in [(0, 1), (0, 2), (1, 1)] {
        test_slices(&array, &schema, offset, length)?;
    }

    Ok(())
}
4316
/// Round-trips slices of a nullable `List<FixedSizeList<Float32, 3>>` column
/// whose innermost values contain nulls, via `test_slices`.
///
/// The array has two rows: the first holds three points, the second one.
#[test]
fn test_roundtrip_list_of_fixed_list_w_nulls() -> Result<(), ArrowError> {
    let mut list_builder = ListBuilder::new(FixedSizeListBuilder::new(Float32Builder::new(), 3));

    // One entry per list row; `None` marks a null coordinate.
    let rows: [&[[Option<f32>; 3]]; 2] = [
        &[
            [Some(1.0), Some(2.0), None],
            [Some(4.0), Some(5.0), Some(6.0)],
            [None, Some(8.0), Some(9.0)],
        ],
        &[[Some(10.), None, None]],
    ];
    for row in rows {
        for point in row {
            for &coordinate in point {
                list_builder.values().values().append_option(coordinate);
            }
            list_builder.values().append(true);
        }
        list_builder.append(true);
    }

    let array = Arc::new(list_builder.finish()) as ArrayRef;

    let coordinate_field = Arc::new(Field::new("item", DataType::Float32, true));
    let point_field = Arc::new(Field::new(
        "item",
        DataType::FixedSizeList(coordinate_field, 3),
        true,
    ));
    let schema = Arc::new(Schema::new_with_metadata(
        vec![Field::new("points", DataType::List(point_field), true)],
        HashMap::default(),
    ));

    for (offset, length) in [(0, 1), (0, 2), (1, 1)] {
        test_slices(&array, &schema, offset, length)?;
    }

    Ok(())
}
4376
4377 fn test_slices(
4378 parent_array: &ArrayRef,
4379 schema: &SchemaRef,
4380 offset: usize,
4381 length: usize,
4382 ) -> Result<(), ArrowError> {
4383 let subarray = parent_array.slice(offset, length);
4384 let original_batch = RecordBatch::try_new(schema.clone(), vec![subarray])?;
4385
4386 let mut bytes = Vec::new();
4387 let mut writer = StreamWriter::try_new(&mut bytes, schema)?;
4388 writer.write(&original_batch)?;
4389 writer.finish()?;
4390
4391 let mut cursor = std::io::Cursor::new(bytes);
4392 let mut reader = StreamReader::try_new(&mut cursor, None)?;
4393 let returned_batch = reader.next().unwrap()?;
4394
4395 assert_eq!(original_batch, returned_batch);
4396
4397 Ok(())
4398 }
4399
/// Round-trips slices of a non-nullable `FixedSizeList<Int64, 3>` column
/// with four rows through the stream writer/reader via `test_slices`.
#[test]
fn test_roundtrip_fixed_list() -> Result<(), ArrowError> {
    let item_field = Arc::new(Field::new("item", DataType::Int64, false));
    let mut builder =
        FixedSizeListBuilder::new(Int64Builder::new(), 3).with_field(item_field.clone());

    let points: [[i64; 3]; 4] = [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]];
    for point in points {
        for coordinate in point {
            builder.values().append_value(coordinate);
        }
        builder.append(true);
    }

    let array = Arc::new(builder.finish()) as ArrayRef;

    let schema = Arc::new(Schema::new_with_metadata(
        vec![Field::new(
            "points",
            DataType::FixedSizeList(item_field, 3),
            false,
        )],
        HashMap::default(),
    ));

    for (offset, length) in [(0, 4), (0, 2), (1, 3), (2, 1)] {
        test_slices(&array, &schema, offset, length)?;
    }

    Ok(())
}
4434
/// Round-trips slices of a nullable `FixedSizeList<Int64, 3>` column whose
/// values contain nulls, through the stream writer/reader via `test_slices`.
#[test]
fn test_roundtrip_fixed_list_w_nulls() -> Result<(), ArrowError> {
    let mut builder = FixedSizeListBuilder::new(Int64Builder::new(), 3);

    // `None` marks a null element inside an otherwise valid list row.
    let points: [[Option<i64>; 3]; 4] = [
        [Some(1), Some(2), None],
        [Some(4), Some(5), Some(6)],
        [None, Some(8), Some(9)],
        [Some(10), None, None],
    ];
    for point in points {
        for coordinate in point {
            builder.values().append_option(coordinate);
        }
        builder.append(true);
    }

    let array = Arc::new(builder.finish()) as ArrayRef;

    let item_field = Arc::new(Field::new("item", DataType::Int64, true));
    let schema = Arc::new(Schema::new_with_metadata(
        vec![Field::new(
            "points",
            DataType::FixedSizeList(item_field, 3),
            true,
        )],
        HashMap::default(),
    ));

    for (offset, length) in [(0, 4), (0, 2), (1, 3), (2, 1)] {
        test_slices(&array, &schema, offset, length)?;
    }

    Ok(())
}
4476
/// Checks that schema and field metadata are encoded deterministically.
///
/// `HashMap` iteration order is unspecified, so two maps with the same
/// entries may yield them in different orders. The test encodes an empty
/// batch whose schema and field both carry the same five-entry metadata map,
/// hashes the resulting bytes, and requires 20 further encodings to produce
/// the same hash.
#[test]
fn test_metadata_encoding_ordering() {
    /// Encodes an empty batch into an in-memory stream and returns a hash of
    /// the produced bytes. A fresh metadata map is built on every call.
    fn create_hash() -> u64 {
        let metadata: HashMap<String, String> = [
            ("a", "1"),
            ("b", "2"),
            ("c", "3"),
            ("d", "4"),
            ("e", "5"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        // The schema is built once and moved straight into the batch; no
        // intermediate clones are needed.
        let schema = Arc::new(
            Schema::new(vec![
                Field::new("a", DataType::Int64, true).with_metadata(metadata.clone()),
            ])
            .with_metadata(metadata),
        );
        let batch = RecordBatch::new_empty(schema);

        let mut bytes = Vec::new();
        let mut w = StreamWriter::try_new(&mut bytes, batch.schema_ref()).unwrap();
        w.write(&batch).unwrap();
        w.finish().unwrap();

        let mut h = std::hash::DefaultHasher::new();
        h.write(&bytes);
        h.finish()
    }

    let expected = create_hash();

    // Ordering differences are probabilistic, so repeat the encoding several
    // times; report both hashes and the attempt number on the first mismatch.
    for attempt in 0..20 {
        assert_eq!(
            create_hash(),
            expected,
            "encoded bytes differ from the first encoding on attempt {attempt}"
        );
    }
}
4520
/// Writes two independent single-batch streams with the same
/// `DictionaryTracker`, calling `clear()` on the tracker in between, and
/// checks that each stream can be read back on its own.
///
/// NOTE(review): presumably this guards against the tracker suppressing the
/// dictionary message in the second stream - confirm against
/// `DictionaryTracker::clear`.
#[test]
fn test_dictionary_tracker_reset() {
    let data_gen = IpcDataGenerator::default();
    let mut dictionary_tracker = DictionaryTracker::new(false);
    let writer_options = IpcWriteOptions::default();
    let mut compression_ctx = CompressionContext::default();

    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
        false,
    )]));

    // Encodes schema header, dictionary messages and the record batch, in
    // that order, into a fresh byte vector.
    let mut write_single_batch_stream =
        |batch: RecordBatch, dict_tracker: &mut DictionaryTracker| -> Vec<u8> {
            let mut out = Vec::new();

            let header = data_gen.schema_to_bytes_with_dictionary_tracker(
                &schema,
                dict_tracker,
                &writer_options,
            );
            let _ = write_message(&mut out, header, &writer_options).unwrap();

            let (dictionaries, record_batch) = data_gen
                .encode(&batch, dict_tracker, &writer_options, &mut compression_ctx)
                .unwrap();
            for message in dictionaries
                .into_iter()
                .chain(std::iter::once(record_batch))
            {
                let _ = write_message(&mut out, message, &writer_options).unwrap();
            }

            out
        };

    // Both batches are identical: one row pointing at the single dictionary
    // value "a".
    let single_value_batch = || {
        let column: ArrayRef = Arc::new(DictionaryArray::new(
            UInt8Array::from_iter_values([0]),
            Arc::new(StringArray::from_iter_values(["a"])),
        ));
        RecordBatch::try_new(schema.clone(), vec![column]).unwrap()
    };

    // Reads the first record batch out of an encoded stream.
    let read_first_batch = |bytes: Vec<u8>| -> RecordBatch {
        let mut reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
        reader.next().unwrap().unwrap()
    };

    let batch1 = single_value_batch();
    let buffer = write_single_batch_stream(batch1.clone(), &mut dictionary_tracker);
    let read_batch = read_first_batch(buffer);
    assert_eq!(read_batch, batch1);

    dictionary_tracker.clear();

    let batch2 = single_value_batch();
    let buffer = write_single_batch_stream(batch2.clone(), &mut dictionary_tracker);
    let read_batch = read_first_batch(buffer);
    assert_eq!(read_batch, batch2);
}
4589}