1use std::cmp::min;
28use std::collections::HashMap;
29use std::io::{BufWriter, Write};
30use std::mem::size_of;
31use std::sync::Arc;
32
33use flatbuffers::FlatBufferBuilder;
34
35use arrow_array::builder::BufferBuilder;
36use arrow_array::cast::*;
37use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
38use arrow_array::*;
39use arrow_buffer::bit_util;
40use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ToByteSlice};
41use arrow_data::{ArrayData, ArrayDataBuilder, BufferSpec, layout};
42use arrow_schema::*;
43
44use crate::CONTINUATION_MARKER;
45use crate::compression::CompressionCodec;
46pub use crate::compression::CompressionContext;
47use crate::convert::IpcSchemaEncoder;
48
49#[derive(Debug, Clone)]
51pub struct IpcWriteOptions {
52 alignment: u8,
55 write_legacy_ipc_format: bool,
57 metadata_version: crate::MetadataVersion,
66 batch_compression_type: Option<crate::CompressionType>,
69 dictionary_handling: DictionaryHandling,
71}
72
73enum EncodedBuffer {
79 Raw(Buffer),
81 Compressed(Vec<u8>),
83}
84
85impl EncodedBuffer {
86 fn as_slice(&self) -> &[u8] {
87 match self {
88 EncodedBuffer::Raw(b) => b.as_slice(),
89 EncodedBuffer::Compressed(v) => v.as_slice(),
90 }
91 }
92
93 fn len(&self) -> usize {
94 match self {
95 EncodedBuffer::Raw(b) => b.len(),
96 EncodedBuffer::Compressed(v) => v.len(),
97 }
98 }
99}
100enum IpcBodySink<'a> {
102 Write(&'a mut Vec<u8>),
104 Collect(&'a mut Vec<EncodedBuffer>),
106}
107impl<'a> IpcBodySink<'a> {
108 pub fn write(&mut self, pad_len: usize, buffer: EncodedBuffer) {
110 match self {
111 IpcBodySink::Write(vec) => {
112 vec.extend_from_slice(buffer.as_slice());
113 vec.extend_from_slice(&PADDING[..pad_len]);
114 }
115 IpcBodySink::Collect(vec) => {
116 vec.push(buffer);
117 }
118 }
119 }
120}
121
/// Sizes reported after a record batch (and its dictionaries) has been written.
struct IpcWriteMetadata {
    /// One `(header length, body length)` pair per dictionary message written
    /// ahead of the record batch, in write order.
    dictionary_block_sizes: Vec<(usize, usize)>,
    /// Length of the record batch header including its prefix and alignment padding.
    padded_header_len: usize,
    /// Length of the record batch body including trailing padding.
    body_len: usize,
}
135
136impl IpcWriteOptions {
137 pub fn try_with_compression(
142 mut self,
143 batch_compression_type: Option<crate::CompressionType>,
144 ) -> Result<Self, ArrowError> {
145 self.batch_compression_type = batch_compression_type;
146
147 if self.batch_compression_type.is_some()
148 && self.metadata_version < crate::MetadataVersion::V5
149 {
150 return Err(ArrowError::InvalidArgumentError(
151 "Compression only supported in metadata v5 and above".to_string(),
152 ));
153 }
154 Ok(self)
155 }
156 pub fn try_new(
158 alignment: usize,
159 write_legacy_ipc_format: bool,
160 metadata_version: crate::MetadataVersion,
161 ) -> Result<Self, ArrowError> {
162 let is_alignment_valid =
163 alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
164 if !is_alignment_valid {
165 return Err(ArrowError::InvalidArgumentError(
166 "Alignment should be 8, 16, 32, or 64.".to_string(),
167 ));
168 }
169 let alignment: u8 = u8::try_from(alignment).expect("range already checked");
170 match metadata_version {
171 crate::MetadataVersion::V1
172 | crate::MetadataVersion::V2
173 | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
174 "Writing IPC metadata version 3 and lower not supported".to_string(),
175 )),
176 #[allow(deprecated)]
177 crate::MetadataVersion::V4 => Ok(Self {
178 alignment,
179 write_legacy_ipc_format,
180 metadata_version,
181 batch_compression_type: None,
182 dictionary_handling: DictionaryHandling::default(),
183 }),
184 crate::MetadataVersion::V5 => {
185 if write_legacy_ipc_format {
186 Err(ArrowError::InvalidArgumentError(
187 "Legacy IPC format only supported on metadata version 4".to_string(),
188 ))
189 } else {
190 Ok(Self {
191 alignment,
192 write_legacy_ipc_format,
193 metadata_version,
194 batch_compression_type: None,
195 dictionary_handling: DictionaryHandling::default(),
196 })
197 }
198 }
199 z => Err(ArrowError::InvalidArgumentError(format!(
200 "Unsupported crate::MetadataVersion {z:?}"
201 ))),
202 }
203 }
204
205 pub fn with_dictionary_handling(mut self, dictionary_handling: DictionaryHandling) -> Self {
207 self.dictionary_handling = dictionary_handling;
208 self
209 }
210}
211
212impl Default for IpcWriteOptions {
213 fn default() -> Self {
214 Self {
215 alignment: 64,
216 write_legacy_ipc_format: false,
217 metadata_version: crate::MetadataVersion::V5,
218 batch_compression_type: None,
219 dictionary_handling: DictionaryHandling::default(),
220 }
221 }
222}
223
/// Stateless encoder that turns schemas, dictionaries and record batches into
/// IPC messages. All state (dictionary tracking, compression context) is
/// passed in by the caller.
#[derive(Debug, Default)]
pub struct IpcDataGenerator {}
259
260impl IpcDataGenerator {
261 pub fn schema_to_bytes_with_dictionary_tracker(
264 &self,
265 schema: &Schema,
266 dictionary_tracker: &mut DictionaryTracker,
267 write_options: &IpcWriteOptions,
268 ) -> EncodedData {
269 let mut fbb = FlatBufferBuilder::new();
270 let schema = {
271 let fb = IpcSchemaEncoder::new()
272 .with_dictionary_tracker(dictionary_tracker)
273 .schema_to_fb_offset(&mut fbb, schema);
274 fb.as_union_value()
275 };
276
277 let mut message = crate::MessageBuilder::new(&mut fbb);
278 message.add_version(write_options.metadata_version);
279 message.add_header_type(crate::MessageHeader::Schema);
280 message.add_bodyLength(0);
281 message.add_header(schema);
282 let data = message.finish();
284 fbb.finish(data, None);
285
286 let data = fbb.finished_data();
287 EncodedData {
288 ipc_message: data.to_vec(),
289 arrow_data: vec![],
290 }
291 }
292
293 fn _encode_dictionaries<I: Iterator<Item = i64>>(
294 &self,
295 column: &ArrayRef,
296 encoded_dictionaries: &mut Vec<EncodedData>,
297 dictionary_tracker: &mut DictionaryTracker,
298 write_options: &IpcWriteOptions,
299 dict_id: &mut I,
300 compression_context: &mut CompressionContext,
301 ) -> Result<(), ArrowError> {
302 match column.data_type() {
303 DataType::Struct(fields) => {
304 let s = as_struct_array(column);
305 for (field, column) in fields.iter().zip(s.columns()) {
306 self.encode_dictionaries(
307 field,
308 column,
309 encoded_dictionaries,
310 dictionary_tracker,
311 write_options,
312 dict_id,
313 compression_context,
314 )?;
315 }
316 }
317 DataType::RunEndEncoded(_, values) => {
318 let data = column.to_data();
319 if data.child_data().len() != 2 {
320 return Err(ArrowError::InvalidArgumentError(format!(
321 "The run encoded array should have exactly two child arrays. Found {}",
322 data.child_data().len()
323 )));
324 }
325 let values_array = make_array(data.child_data()[1].clone());
328 self.encode_dictionaries(
329 values,
330 &values_array,
331 encoded_dictionaries,
332 dictionary_tracker,
333 write_options,
334 dict_id,
335 compression_context,
336 )?;
337 }
338 DataType::List(field) => {
339 let list = as_list_array(column);
340 self.encode_dictionaries(
341 field,
342 list.values(),
343 encoded_dictionaries,
344 dictionary_tracker,
345 write_options,
346 dict_id,
347 compression_context,
348 )?;
349 }
350 DataType::LargeList(field) => {
351 let list = as_large_list_array(column);
352 self.encode_dictionaries(
353 field,
354 list.values(),
355 encoded_dictionaries,
356 dictionary_tracker,
357 write_options,
358 dict_id,
359 compression_context,
360 )?;
361 }
362 DataType::ListView(field) => {
363 let list = column.as_list_view::<i32>();
364 self.encode_dictionaries(
365 field,
366 list.values(),
367 encoded_dictionaries,
368 dictionary_tracker,
369 write_options,
370 dict_id,
371 compression_context,
372 )?;
373 }
374 DataType::LargeListView(field) => {
375 let list = column.as_list_view::<i64>();
376 self.encode_dictionaries(
377 field,
378 list.values(),
379 encoded_dictionaries,
380 dictionary_tracker,
381 write_options,
382 dict_id,
383 compression_context,
384 )?;
385 }
386 DataType::FixedSizeList(field, _) => {
387 let list = column
388 .as_any()
389 .downcast_ref::<FixedSizeListArray>()
390 .expect("Unable to downcast to fixed size list array");
391 self.encode_dictionaries(
392 field,
393 list.values(),
394 encoded_dictionaries,
395 dictionary_tracker,
396 write_options,
397 dict_id,
398 compression_context,
399 )?;
400 }
401 DataType::Map(field, _) => {
402 let map_array = as_map_array(column);
403
404 let (keys, values) = match field.data_type() {
405 DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
406 _ => panic!("Incorrect field data type {:?}", field.data_type()),
407 };
408
409 self.encode_dictionaries(
411 keys,
412 map_array.keys(),
413 encoded_dictionaries,
414 dictionary_tracker,
415 write_options,
416 dict_id,
417 compression_context,
418 )?;
419
420 self.encode_dictionaries(
422 values,
423 map_array.values(),
424 encoded_dictionaries,
425 dictionary_tracker,
426 write_options,
427 dict_id,
428 compression_context,
429 )?;
430 }
431 DataType::Union(fields, _) => {
432 let union = as_union_array(column);
433 for (type_id, field) in fields.iter() {
434 let column = union.child(type_id);
435 self.encode_dictionaries(
436 field,
437 column,
438 encoded_dictionaries,
439 dictionary_tracker,
440 write_options,
441 dict_id,
442 compression_context,
443 )?;
444 }
445 }
446 _ => (),
447 }
448
449 Ok(())
450 }
451
452 #[allow(clippy::too_many_arguments)]
453 fn encode_dictionaries<I: Iterator<Item = i64>>(
454 &self,
455 field: &Field,
456 column: &ArrayRef,
457 encoded_dictionaries: &mut Vec<EncodedData>,
458 dictionary_tracker: &mut DictionaryTracker,
459 write_options: &IpcWriteOptions,
460 dict_id_seq: &mut I,
461 compression_context: &mut CompressionContext,
462 ) -> Result<(), ArrowError> {
463 match column.data_type() {
464 DataType::Dictionary(_key_type, _value_type) => {
465 let dict_data = column.to_data();
466 let dict_values = &dict_data.child_data()[0];
467
468 let values = make_array(dict_data.child_data()[0].clone());
469
470 self._encode_dictionaries(
471 &values,
472 encoded_dictionaries,
473 dictionary_tracker,
474 write_options,
475 dict_id_seq,
476 compression_context,
477 )?;
478
479 let dict_id = dict_id_seq.next().ok_or_else(|| {
483 ArrowError::IpcError(format!(
484 "no dict id for field {:?}: field.data_type={:?}, column.data_type={:?}",
485 field.name(),
486 field.data_type(),
487 column.data_type()
488 ))
489 })?;
490
491 match dictionary_tracker.insert_column(
492 dict_id,
493 column,
494 write_options.dictionary_handling,
495 )? {
496 DictionaryUpdate::None => {}
497 DictionaryUpdate::New | DictionaryUpdate::Replaced => {
498 encoded_dictionaries.push(self.dictionary_batch_to_bytes(
499 dict_id,
500 dict_values,
501 write_options,
502 false,
503 compression_context,
504 )?);
505 }
506 DictionaryUpdate::Delta(data) => {
507 encoded_dictionaries.push(self.dictionary_batch_to_bytes(
508 dict_id,
509 &data,
510 write_options,
511 true,
512 compression_context,
513 )?);
514 }
515 }
516 }
517 _ => self._encode_dictionaries(
518 column,
519 encoded_dictionaries,
520 dictionary_tracker,
521 write_options,
522 dict_id_seq,
523 compression_context,
524 )?,
525 }
526
527 Ok(())
528 }
529
530 pub fn encode(
534 &self,
535 batch: &RecordBatch,
536 dictionary_tracker: &mut DictionaryTracker,
537 write_options: &IpcWriteOptions,
538 compression_context: &mut CompressionContext,
539 ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
540 let encoded_dictionaries = self.encode_all_dicts(
541 batch,
542 dictionary_tracker,
543 write_options,
544 compression_context,
545 )?;
546 let mut arrow_data = Vec::new();
547 let (ipc_message, _, tail_pad) = self.record_batch_to_bytes(
548 batch,
549 write_options,
550 compression_context,
551 &mut IpcBodySink::Write(&mut arrow_data),
552 )?;
553 arrow_data.extend_from_slice(&PADDING[..tail_pad]);
554 Ok((
555 encoded_dictionaries,
556 EncodedData {
557 ipc_message,
558 arrow_data,
559 },
560 ))
561 }
562
563 fn encode_all_dicts(
565 &self,
566 batch: &RecordBatch,
567 dictionary_tracker: &mut DictionaryTracker,
568 write_options: &IpcWriteOptions,
569 compression_context: &mut CompressionContext,
570 ) -> Result<Vec<EncodedData>, ArrowError> {
571 let schema = batch.schema();
572 let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());
573 let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();
574 for (i, field) in schema.fields().iter().enumerate() {
575 self.encode_dictionaries(
576 field,
577 batch.column(i),
578 &mut encoded_dictionaries,
579 dictionary_tracker,
580 write_options,
581 &mut dict_id,
582 compression_context,
583 )?;
584 }
585 Ok(encoded_dictionaries)
586 }
587
588 fn write<W: Write>(
592 &self,
593 batch: &RecordBatch,
594 dictionary_tracker: &mut DictionaryTracker,
595 write_options: &IpcWriteOptions,
596 compression_context: &mut CompressionContext,
597 writer: &mut W,
598 ) -> Result<IpcWriteMetadata, ArrowError> {
599 let encoded_dictionaries = self.encode_all_dicts(
600 batch,
601 dictionary_tracker,
602 write_options,
603 compression_context,
604 )?;
605
606 let mut dictionary_block_sizes = Vec::with_capacity(encoded_dictionaries.len());
607 for dict in encoded_dictionaries {
608 dictionary_block_sizes.push(write_message(&mut *writer, dict, write_options)?);
609 }
610
611 let capacity = batch
612 .columns()
613 .iter()
614 .map(|a| estimate_encoded_buffer_count(a.data_type()))
615 .sum();
616 let mut encoded_buffers: Vec<EncodedBuffer> = Vec::with_capacity(capacity);
617 let (ipc_message, body_len, tail_pad) = self.record_batch_to_bytes(
618 batch,
619 write_options,
620 compression_context,
621 &mut IpcBodySink::Collect(&mut encoded_buffers),
622 )?;
623
624 let alignment = write_options.alignment;
625 let a = usize::from(alignment - 1);
626 let prefix_size = if write_options.write_legacy_ipc_format {
627 4
628 } else {
629 8
630 };
631 let aligned_size = (ipc_message.len() + prefix_size + a) & !a;
632 write_continuation(
633 &mut *writer,
634 write_options,
635 (aligned_size - prefix_size) as i32,
636 )?;
637 writer.write_all(&ipc_message)?;
638 writer.write_all(&PADDING[..aligned_size - ipc_message.len() - prefix_size])?;
639 for enc in &encoded_buffers {
640 writer.write_all(enc.as_slice())?;
641 writer.write_all(&PADDING[..pad_to_alignment(alignment, enc.len())])?;
642 }
643 writer.write_all(&PADDING[..tail_pad])?;
644
645 Ok(IpcWriteMetadata {
646 dictionary_block_sizes,
647 padded_header_len: aligned_size,
648 body_len,
649 })
650 }
651
652 #[deprecated(since = "57.0.0", note = "Use `encode` instead")]
656 pub fn encoded_batch(
657 &self,
658 batch: &RecordBatch,
659 dictionary_tracker: &mut DictionaryTracker,
660 write_options: &IpcWriteOptions,
661 ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
662 self.encode(
663 batch,
664 dictionary_tracker,
665 write_options,
666 &mut Default::default(),
667 )
668 }
669
670 fn record_batch_to_bytes(
676 &self,
677 batch: &RecordBatch,
678 write_options: &IpcWriteOptions,
679 compression_context: &mut CompressionContext,
680 sink: &mut IpcBodySink<'_>,
681 ) -> Result<(Vec<u8>, usize, usize), ArrowError> {
682 let mut fbb = FlatBufferBuilder::new();
683
684 let mut nodes: Vec<crate::FieldNode> = vec![];
685 let mut buffers: Vec<crate::Buffer> = vec![];
686
687 let batch_compression_type = write_options.batch_compression_type;
688
689 let compression = batch_compression_type.map(|batch_compression_type| {
690 let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
691 c.add_method(crate::BodyCompressionMethod::BUFFER);
692 c.add_codec(batch_compression_type);
693 c.finish()
694 });
695
696 let compression_codec: Option<CompressionCodec> =
697 batch_compression_type.map(TryInto::try_into).transpose()?;
698
699 let alignment = write_options.alignment;
700 let mut variadic_buffer_counts = vec![];
701 let mut offset = 0i64;
702
703 for array in batch.columns() {
704 let array_data = array.to_data();
705 offset = write_array_data(
706 &array_data,
707 &mut buffers,
708 sink,
709 &mut nodes,
710 offset,
711 array.len(),
712 array.null_count(),
713 compression_codec,
714 compression_context,
715 write_options,
716 )?;
717 append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
718 }
719
720 let tail_pad = pad_to_alignment(alignment, offset as usize);
721 let body_len = offset as usize + tail_pad;
722
723 let buffers = fbb.create_vector(&buffers);
724 let nodes = fbb.create_vector(&nodes);
725 let variadic_buffer = if variadic_buffer_counts.is_empty() {
726 None
727 } else {
728 Some(fbb.create_vector(&variadic_buffer_counts))
729 };
730
731 let root = {
732 let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
733 batch_builder.add_length(batch.num_rows() as i64);
734 batch_builder.add_nodes(nodes);
735 batch_builder.add_buffers(buffers);
736 if let Some(c) = compression {
737 batch_builder.add_compression(c);
738 }
739 if let Some(v) = variadic_buffer {
740 batch_builder.add_variadicBufferCounts(v);
741 }
742 batch_builder.finish().as_union_value()
743 };
744 let mut message = crate::MessageBuilder::new(&mut fbb);
746 message.add_version(write_options.metadata_version);
747 message.add_header_type(crate::MessageHeader::RecordBatch);
748 message.add_bodyLength(body_len as i64);
749 message.add_header(root);
750 let root = message.finish();
751 fbb.finish(root, None);
752
753 Ok((fbb.finished_data().to_vec(), body_len, tail_pad))
754 }
755
756 fn dictionary_batch_to_bytes(
759 &self,
760 dict_id: i64,
761 array_data: &ArrayData,
762 write_options: &IpcWriteOptions,
763 is_delta: bool,
764 compression_context: &mut CompressionContext,
765 ) -> Result<EncodedData, ArrowError> {
766 let mut fbb = FlatBufferBuilder::new();
767
768 let mut nodes: Vec<crate::FieldNode> = vec![];
769 let mut buffers: Vec<crate::Buffer> = vec![];
770 let mut arrow_data: Vec<u8> = vec![];
771
772 let batch_compression_type = write_options.batch_compression_type;
774
775 let compression = batch_compression_type.map(|batch_compression_type| {
776 let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
777 c.add_method(crate::BodyCompressionMethod::BUFFER);
778 c.add_codec(batch_compression_type);
779 c.finish()
780 });
781
782 let compression_codec: Option<CompressionCodec> = batch_compression_type
783 .map(|batch_compression_type| batch_compression_type.try_into())
784 .transpose()?;
785
786 let alignment = write_options.alignment;
787 let mut sink = IpcBodySink::Write(&mut arrow_data);
788 let offset = write_array_data(
789 array_data,
790 &mut buffers,
791 &mut sink,
792 &mut nodes,
793 0,
794 array_data.len(),
795 array_data.null_count(),
796 compression_codec,
797 compression_context,
798 write_options,
799 )?;
800
801 let mut variadic_buffer_counts = vec![];
802 append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);
803
804 let tail_pad = pad_to_alignment(alignment, offset as usize);
806 let body_len = offset as usize + tail_pad;
807 arrow_data.extend_from_slice(&PADDING[..tail_pad]);
808
809 let buffers = fbb.create_vector(&buffers);
811 let nodes = fbb.create_vector(&nodes);
812 let variadic_buffer = if variadic_buffer_counts.is_empty() {
813 None
814 } else {
815 Some(fbb.create_vector(&variadic_buffer_counts))
816 };
817
818 let root = {
819 let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
820 batch_builder.add_length(array_data.len() as i64);
821 batch_builder.add_nodes(nodes);
822 batch_builder.add_buffers(buffers);
823 if let Some(c) = compression {
824 batch_builder.add_compression(c);
825 }
826 if let Some(v) = variadic_buffer {
827 batch_builder.add_variadicBufferCounts(v);
828 }
829 batch_builder.finish()
830 };
831
832 let root = {
833 let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb);
834 batch_builder.add_id(dict_id);
835 batch_builder.add_data(root);
836 batch_builder.add_isDelta(is_delta);
837 batch_builder.finish().as_union_value()
838 };
839
840 let root = {
841 let mut message_builder = crate::MessageBuilder::new(&mut fbb);
842 message_builder.add_version(write_options.metadata_version);
843 message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
844 message_builder.add_bodyLength(body_len as i64);
845 message_builder.add_header(root);
846 message_builder.finish()
847 };
848
849 fbb.finish(root, None);
850 let finished_data = fbb.finished_data();
851
852 Ok(EncodedData {
853 ipc_message: finished_data.to_vec(),
854 arrow_data,
855 })
856 }
857}
858
859fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
860 match array.data_type() {
861 DataType::BinaryView | DataType::Utf8View => {
862 counts.push(array.buffers().len() as i64 - 1);
865 }
866 DataType::Dictionary(_, _) => {
867 }
870 _ => {
871 for child in array.child_data() {
872 append_variadic_buffer_counts(counts, child)
873 }
874 }
875 }
876}
877
878pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
879 match arr.data_type() {
880 DataType::RunEndEncoded(k, _) => match k.data_type() {
881 DataType::Int16 => {
882 Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
883 }
884 DataType::Int32 => {
885 Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
886 }
887 DataType::Int64 => {
888 Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
889 }
890 d => unreachable!("Unexpected data type {d}"),
891 },
892 d => Err(ArrowError::InvalidArgumentError(format!(
893 "The given array is not a run array. Data type of given array: {d}"
894 ))),
895 }
896}
897
898fn into_zero_offset_run_array<R: RunEndIndexType>(
901 run_array: RunArray<R>,
902) -> Result<RunArray<R>, ArrowError> {
903 let run_ends = run_array.run_ends();
904 if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
905 return Ok(run_array);
906 }
907
908 let start_physical_index = run_ends.get_start_physical_index();
910
911 let end_physical_index = run_ends.get_end_physical_index();
913
914 let physical_length = end_physical_index - start_physical_index + 1;
915
916 let offset = R::Native::usize_as(run_ends.offset());
918 let mut builder = BufferBuilder::<R::Native>::new(physical_length);
919 for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
920 builder.append(run_end_value.sub_wrapping(offset));
921 }
922 builder.append(R::Native::from_usize(run_array.len()).unwrap());
923 let new_run_ends = unsafe {
924 ArrayDataBuilder::new(R::DATA_TYPE)
927 .len(physical_length)
928 .add_buffer(builder.finish())
929 .build_unchecked()
930 };
931
932 let new_values = run_array
934 .values()
935 .slice(start_physical_index, physical_length)
936 .into_data();
937
938 let builder = ArrayDataBuilder::new(run_array.data_type().clone())
939 .len(run_array.len())
940 .add_child_data(new_run_ends)
941 .add_child_data(new_values);
942 let array_data = unsafe {
943 builder.build_unchecked()
946 };
947 Ok(array_data.into())
948}
949
/// How a writer reacts when a dictionary only gained new trailing values
/// since it was last written.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DictionaryHandling {
    /// Send the whole dictionary again, as a replacement (the default).
    #[default]
    Resend,
    /// Send only the newly appended values, as a delta dictionary.
    Delta,
}
963
964#[derive(Debug, Clone)]
966pub enum DictionaryUpdate {
967 None,
970 New,
972 Replaced,
974 Delta(ArrayData),
976}
977
978#[derive(Debug)]
984pub struct DictionaryTracker {
985 written: HashMap<i64, ArrayData>,
987 dict_ids: Vec<i64>,
988 error_on_replacement: bool,
989}
990
991impl DictionaryTracker {
992 pub fn new(error_on_replacement: bool) -> Self {
998 #[allow(deprecated)]
999 Self {
1000 written: HashMap::new(),
1001 dict_ids: Vec::new(),
1002 error_on_replacement,
1003 }
1004 }
1005
1006 pub fn next_dict_id(&mut self) -> i64 {
1008 let next = self
1009 .dict_ids
1010 .last()
1011 .copied()
1012 .map(|i| i + 1)
1013 .unwrap_or_default();
1014
1015 self.dict_ids.push(next);
1016 next
1017 }
1018
1019 pub fn dict_id(&mut self) -> &[i64] {
1022 &self.dict_ids
1023 }
1024
1025 #[deprecated(since = "56.1.0", note = "Use `insert_column` instead")]
1035 pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
1036 let dict_data = column.to_data();
1037 let dict_values = &dict_data.child_data()[0];
1038
1039 if let Some(last) = self.written.get(&dict_id) {
1041 if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
1042 return Ok(false);
1044 }
1045 if self.error_on_replacement {
1046 if last.child_data()[0] == *dict_values {
1048 return Ok(false);
1050 }
1051 return Err(ArrowError::InvalidArgumentError(
1052 "Dictionary replacement detected when writing IPC file format. \
1053 Arrow IPC files only support a single dictionary for a given field \
1054 across all batches."
1055 .to_string(),
1056 ));
1057 }
1058 }
1059
1060 self.written.insert(dict_id, dict_data);
1061 Ok(true)
1062 }
1063
1064 pub fn insert_column(
1080 &mut self,
1081 dict_id: i64,
1082 column: &ArrayRef,
1083 dict_handling: DictionaryHandling,
1084 ) -> Result<DictionaryUpdate, ArrowError> {
1085 let new_data = column.to_data();
1086 let new_values = &new_data.child_data()[0];
1087
1088 let Some(old) = self.written.get(&dict_id) else {
1090 self.written.insert(dict_id, new_data);
1091 return Ok(DictionaryUpdate::New);
1092 };
1093
1094 let old_values = &old.child_data()[0];
1097 if ArrayData::ptr_eq(old_values, new_values) {
1098 return Ok(DictionaryUpdate::None);
1099 }
1100
1101 let comparison = compare_dictionaries(old_values, new_values);
1103 if matches!(comparison, DictionaryComparison::Equal) {
1104 return Ok(DictionaryUpdate::None);
1105 }
1106
1107 const REPLACEMENT_ERROR: &str = "Dictionary replacement detected when writing IPC file format. \
1108 Arrow IPC files only support a single dictionary for a given field \
1109 across all batches.";
1110
1111 match comparison {
1112 DictionaryComparison::NotEqual => {
1113 if self.error_on_replacement {
1114 return Err(ArrowError::InvalidArgumentError(
1115 REPLACEMENT_ERROR.to_string(),
1116 ));
1117 }
1118
1119 self.written.insert(dict_id, new_data);
1120 Ok(DictionaryUpdate::Replaced)
1121 }
1122 DictionaryComparison::Delta => match dict_handling {
1123 DictionaryHandling::Resend => {
1124 if self.error_on_replacement {
1125 return Err(ArrowError::InvalidArgumentError(
1126 REPLACEMENT_ERROR.to_string(),
1127 ));
1128 }
1129
1130 self.written.insert(dict_id, new_data);
1131 Ok(DictionaryUpdate::Replaced)
1132 }
1133 DictionaryHandling::Delta => {
1134 let delta =
1135 new_values.slice(old_values.len(), new_values.len() - old_values.len());
1136 self.written.insert(dict_id, new_data);
1137 Ok(DictionaryUpdate::Delta(delta))
1138 }
1139 },
1140 DictionaryComparison::Equal => unreachable!("Already checked equal case"),
1141 }
1142 }
1143
1144 pub fn clear(&mut self) {
1150 self.dict_ids.clear();
1151 self.written.clear();
1152 }
1153}
1154
/// Outcome of comparing a previously written dictionary with a new one.
#[derive(Debug, Clone)]
enum DictionaryComparison {
    /// The dictionaries differ and the old one is not a prefix of the new one.
    NotEqual,
    /// Both dictionaries hold the same values.
    Equal,
    /// The new dictionary is the old one with extra values appended.
    Delta,
}
1166
1167fn compare_dictionaries(old: &ArrayData, new: &ArrayData) -> DictionaryComparison {
1169 let existing_len = old.len();
1171 let new_len = new.len();
1172 if existing_len == new_len {
1173 if *old == *new {
1174 return DictionaryComparison::Equal;
1175 } else {
1176 return DictionaryComparison::NotEqual;
1177 }
1178 }
1179
1180 if new_len < existing_len {
1182 return DictionaryComparison::NotEqual;
1183 }
1184
1185 if new.slice(0, existing_len) == *old {
1187 return DictionaryComparison::Delta;
1188 }
1189
1190 DictionaryComparison::NotEqual
1191}
1192
1193pub struct FileWriter<W> {
1216 writer: W,
1218 write_options: IpcWriteOptions,
1220 schema: SchemaRef,
1222 block_offsets: usize,
1224 dictionary_blocks: Vec<crate::Block>,
1226 record_blocks: Vec<crate::Block>,
1228 finished: bool,
1230 dictionary_tracker: DictionaryTracker,
1232 custom_metadata: HashMap<String, String>,
1234
1235 data_gen: IpcDataGenerator,
1236
1237 compression_context: CompressionContext,
1238}
1239
1240impl<W: Write> FileWriter<BufWriter<W>> {
1241 pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1245 Self::try_new(BufWriter::new(writer), schema)
1246 }
1247}
1248
1249impl<W: Write> FileWriter<W> {
1250 pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1258 let write_options = IpcWriteOptions::default();
1259 Self::try_new_with_options(writer, schema, write_options)
1260 }
1261
1262 pub fn try_new_with_options(
1270 mut writer: W,
1271 schema: &Schema,
1272 write_options: IpcWriteOptions,
1273 ) -> Result<Self, ArrowError> {
1274 let data_gen = IpcDataGenerator::default();
1275 let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
1277 let header_size = super::ARROW_MAGIC.len() + pad_len;
1278 writer.write_all(&super::ARROW_MAGIC)?;
1279 writer.write_all(&PADDING[..pad_len])?;
1280 let mut dictionary_tracker = DictionaryTracker::new(true);
1282 let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1283 schema,
1284 &mut dictionary_tracker,
1285 &write_options,
1286 );
1287 let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
1288 Ok(Self {
1289 writer,
1290 write_options,
1291 schema: Arc::new(schema.clone()),
1292 block_offsets: meta + data + header_size,
1293 dictionary_blocks: vec![],
1294 record_blocks: vec![],
1295 finished: false,
1296 dictionary_tracker,
1297 custom_metadata: HashMap::new(),
1298 data_gen,
1299 compression_context: CompressionContext::default(),
1300 })
1301 }
1302
1303 pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
1305 self.custom_metadata.insert(key.into(), value.into());
1306 }
1307
1308 pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1310 if self.finished {
1311 return Err(ArrowError::IpcError(
1312 "Cannot write record batch to file writer as it is closed".to_string(),
1313 ));
1314 }
1315
1316 let meta = self.data_gen.write(
1317 batch,
1318 &mut self.dictionary_tracker,
1319 &self.write_options,
1320 &mut self.compression_context,
1321 &mut self.writer,
1322 )?;
1323
1324 for (header_len, body_len) in meta.dictionary_block_sizes {
1325 let block = crate::Block::new(
1326 self.block_offsets as i64,
1327 header_len as i32,
1328 body_len as i64,
1329 );
1330 self.dictionary_blocks.push(block);
1331 self.block_offsets += header_len + body_len;
1332 }
1333
1334 let block = crate::Block::new(
1336 self.block_offsets as i64,
1337 meta.padded_header_len as i32,
1338 meta.body_len as i64,
1339 );
1340 self.record_blocks.push(block);
1341 self.block_offsets += meta.padded_header_len + meta.body_len;
1342 Ok(())
1343 }
1344
1345 pub fn finish(&mut self) -> Result<(), ArrowError> {
1347 if self.finished {
1348 return Err(ArrowError::IpcError(
1349 "Cannot write footer to file writer as it is closed".to_string(),
1350 ));
1351 }
1352
1353 write_continuation(&mut self.writer, &self.write_options, 0)?;
1355
1356 let mut fbb = FlatBufferBuilder::new();
1357 let dictionaries = fbb.create_vector(&self.dictionary_blocks);
1358 let record_batches = fbb.create_vector(&self.record_blocks);
1359
1360 self.dictionary_tracker.clear();
1362 let schema = IpcSchemaEncoder::new()
1363 .with_dictionary_tracker(&mut self.dictionary_tracker)
1364 .schema_to_fb_offset(&mut fbb, &self.schema);
1365 let fb_custom_metadata = (!self.custom_metadata.is_empty())
1366 .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));
1367
1368 let root = {
1369 let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
1370 footer_builder.add_version(self.write_options.metadata_version);
1371 footer_builder.add_schema(schema);
1372 footer_builder.add_dictionaries(dictionaries);
1373 footer_builder.add_recordBatches(record_batches);
1374 if let Some(fb_custom_metadata) = fb_custom_metadata {
1375 footer_builder.add_custom_metadata(fb_custom_metadata);
1376 }
1377 footer_builder.finish()
1378 };
1379 fbb.finish(root, None);
1380 let footer_data = fbb.finished_data();
1381 self.writer.write_all(footer_data)?;
1382 self.writer
1383 .write_all(&(footer_data.len() as i32).to_le_bytes())?;
1384 self.writer.write_all(&super::ARROW_MAGIC)?;
1385 self.writer.flush()?;
1386 self.finished = true;
1387
1388 Ok(())
1389 }
1390
1391 pub fn schema(&self) -> &SchemaRef {
1393 &self.schema
1394 }
1395
1396 pub fn get_ref(&self) -> &W {
1398 &self.writer
1399 }
1400
1401 pub fn get_mut(&mut self) -> &mut W {
1405 &mut self.writer
1406 }
1407
1408 pub fn flush(&mut self) -> Result<(), ArrowError> {
1412 self.writer.flush()?;
1413 Ok(())
1414 }
1415
1416 pub fn into_inner(mut self) -> Result<W, ArrowError> {
1425 if !self.finished {
1426 self.finish()?;
1428 }
1429 Ok(self.writer)
1430 }
1431}
1432
1433impl<W: Write> RecordBatchWriter for FileWriter<W> {
1434 fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1435 self.write(batch)
1436 }
1437
1438 fn close(mut self) -> Result<(), ArrowError> {
1439 self.finish()
1440 }
1441}
1442
1443pub struct StreamWriter<W> {
1517 writer: W,
1519 write_options: IpcWriteOptions,
1521 finished: bool,
1523 dictionary_tracker: DictionaryTracker,
1525
1526 data_gen: IpcDataGenerator,
1527
1528 compression_context: CompressionContext,
1529}
1530
1531impl<W: Write> StreamWriter<BufWriter<W>> {
1532 pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1536 Self::try_new(BufWriter::new(writer), schema)
1537 }
1538}
1539
1540impl<W: Write> StreamWriter<W> {
1541 pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1549 let write_options = IpcWriteOptions::default();
1550 Self::try_new_with_options(writer, schema, write_options)
1551 }
1552
1553 pub fn try_new_with_options(
1559 mut writer: W,
1560 schema: &Schema,
1561 write_options: IpcWriteOptions,
1562 ) -> Result<Self, ArrowError> {
1563 let data_gen = IpcDataGenerator::default();
1564 let mut dictionary_tracker = DictionaryTracker::new(false);
1565
1566 let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1568 schema,
1569 &mut dictionary_tracker,
1570 &write_options,
1571 );
1572 write_message(&mut writer, encoded_message, &write_options)?;
1573 Ok(Self {
1574 writer,
1575 write_options,
1576 finished: false,
1577 dictionary_tracker,
1578 data_gen,
1579 compression_context: CompressionContext::default(),
1580 })
1581 }
1582
1583 pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1585 if self.finished {
1586 return Err(ArrowError::IpcError(
1587 "Cannot write record batch to stream writer as it is closed".to_string(),
1588 ));
1589 }
1590
1591 self.data_gen.write(
1592 batch,
1593 &mut self.dictionary_tracker,
1594 &self.write_options,
1595 &mut self.compression_context,
1596 &mut self.writer,
1597 )?;
1598 Ok(())
1599 }
1600
1601 pub fn finish(&mut self) -> Result<(), ArrowError> {
1603 if self.finished {
1604 return Err(ArrowError::IpcError(
1605 "Cannot write footer to stream writer as it is closed".to_string(),
1606 ));
1607 }
1608
1609 write_continuation(&mut self.writer, &self.write_options, 0)?;
1610 self.writer.flush()?;
1611
1612 self.finished = true;
1613
1614 Ok(())
1615 }
1616
1617 pub fn get_ref(&self) -> &W {
1619 &self.writer
1620 }
1621
1622 pub fn get_mut(&mut self) -> &mut W {
1626 &mut self.writer
1627 }
1628
1629 pub fn flush(&mut self) -> Result<(), ArrowError> {
1633 self.writer.flush()?;
1634 Ok(())
1635 }
1636
1637 pub fn into_inner(mut self) -> Result<W, ArrowError> {
1675 if !self.finished {
1676 self.finish()?;
1678 }
1679 Ok(self.writer)
1680 }
1681}
1682
1683impl<W: Write> RecordBatchWriter for StreamWriter<W> {
1684 fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1685 self.write(batch)
1686 }
1687
1688 fn close(mut self) -> Result<(), ArrowError> {
1689 self.finish()
1690 }
1691}
1692
/// An encoded IPC message, split into its metadata and its body.
pub struct EncodedData {
    /// Encoded message metadata, written right after the length prefix.
    pub ipc_message: Vec<u8>,
    /// Message body bytes; may be empty.
    pub arrow_data: Vec<u8>,
}
1700pub fn write_message<W: Write>(
1702 mut writer: W,
1703 encoded: EncodedData,
1704 write_options: &IpcWriteOptions,
1705) -> Result<(usize, usize), ArrowError> {
1706 let arrow_data_len = encoded.arrow_data.len();
1707 if arrow_data_len % usize::from(write_options.alignment) != 0 {
1708 return Err(ArrowError::MemoryError(
1709 "Arrow data not aligned".to_string(),
1710 ));
1711 }
1712
1713 let a = usize::from(write_options.alignment - 1);
1714 let buffer = encoded.ipc_message;
1715 let flatbuf_size = buffer.len();
1716 let prefix_size = if write_options.write_legacy_ipc_format {
1717 4
1718 } else {
1719 8
1720 };
1721 let aligned_size = (flatbuf_size + prefix_size + a) & !a;
1722 let padding_bytes = aligned_size - flatbuf_size - prefix_size;
1723
1724 write_continuation(
1725 &mut writer,
1726 write_options,
1727 (aligned_size - prefix_size) as i32,
1728 )?;
1729
1730 if flatbuf_size > 0 {
1732 writer.write_all(&buffer)?;
1733 }
1734 writer.write_all(&PADDING[..padding_bytes])?;
1736
1737 let body_len = if arrow_data_len > 0 {
1739 write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
1740 } else {
1741 0
1742 };
1743
1744 Ok((aligned_size, body_len))
1745}
1746
1747fn write_body_buffers<W: Write>(
1748 mut writer: W,
1749 data: &[u8],
1750 alignment: u8,
1751) -> Result<usize, ArrowError> {
1752 let len = data.len();
1753 let pad_len = pad_to_alignment(alignment, len);
1754 let total_len = len + pad_len;
1755
1756 writer.write_all(data)?;
1758 if pad_len > 0 {
1759 writer.write_all(&PADDING[..pad_len])?;
1760 }
1761
1762 Ok(total_len)
1763}
1764
1765fn write_continuation<W: Write>(
1768 mut writer: W,
1769 write_options: &IpcWriteOptions,
1770 total_len: i32,
1771) -> Result<usize, ArrowError> {
1772 let mut written = 8;
1773
1774 match write_options.metadata_version {
1776 crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
1777 unreachable!("Options with the metadata version cannot be created")
1778 }
1779 crate::MetadataVersion::V4 => {
1780 if !write_options.write_legacy_ipc_format {
1781 writer.write_all(&CONTINUATION_MARKER)?;
1783 written = 4;
1784 }
1785 writer.write_all(&total_len.to_le_bytes()[..])?;
1786 }
1787 crate::MetadataVersion::V5 => {
1788 writer.write_all(&CONTINUATION_MARKER)?;
1790 writer.write_all(&total_len.to_le_bytes()[..])?;
1791 }
1792 z => panic!("Unsupported crate::MetadataVersion {z:?}"),
1793 };
1794
1795 Ok(written)
1796}
1797
1798fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
1802 if write_options.metadata_version < crate::MetadataVersion::V5 {
1803 !matches!(data_type, DataType::Null)
1804 } else {
1805 !matches!(
1806 data_type,
1807 DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
1808 )
1809 }
1810}
1811
1812#[inline]
1814fn buffer_need_truncate(
1815 array_offset: usize,
1816 buffer: &Buffer,
1817 spec: &BufferSpec,
1818 min_length: usize,
1819) -> bool {
1820 spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
1821}
1822
1823#[inline]
1825fn get_buffer_element_width(spec: &BufferSpec) -> usize {
1826 match spec {
1827 BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
1828 _ => 0,
1829 }
1830}
1831
1832fn reencode_offsets<O: OffsetSizeTrait>(
1835 offsets: &Buffer,
1836 data: &ArrayData,
1837) -> (Buffer, usize, usize) {
1838 let offsets_slice: &[O] = offsets.typed_data::<O>();
1839 let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];
1840
1841 let start_offset = offset_slice.first().unwrap();
1842 let end_offset = offset_slice.last().unwrap();
1843
1844 let offsets = match start_offset.as_usize() {
1845 0 => {
1846 let size = size_of::<O>();
1847 offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
1848 }
1849 _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
1850 };
1851
1852 let start_offset = start_offset.as_usize();
1853 let end_offset = end_offset.as_usize();
1854
1855 (offsets, start_offset, end_offset - start_offset)
1856}
1857
1858fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
1864 if data.is_empty() {
1865 let mut offsets = MutableBuffer::new(size_of::<O>());
1868 offsets.extend_from_slice(O::usize_as(0).to_byte_slice());
1869 return (offsets.into(), MutableBuffer::new(0).into());
1870 }
1871
1872 let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1873 let values = data.buffers()[1].slice_with_length(original_start_offset, len);
1874 (offsets, values)
1875}
1876
1877fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
1880 if data.is_empty() {
1881 let mut offsets = MutableBuffer::new(size_of::<O>());
1884 offsets.extend_from_slice(O::usize_as(0).to_byte_slice());
1885 return (offsets.into(), data.child_data()[0].slice(0, 0));
1886 }
1887
1888 let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1889 let child_data = data.child_data()[0].slice(original_start_offset, len);
1890 (offsets, child_data)
1891}
1892
1893fn get_list_view_array_buffers<O: OffsetSizeTrait>(
1899 data: &ArrayData,
1900) -> (Buffer, Buffer, ArrayData) {
1901 if data.is_empty() {
1902 return (
1903 MutableBuffer::new(0).into(),
1904 MutableBuffer::new(0).into(),
1905 data.child_data()[0].slice(0, 0),
1906 );
1907 }
1908
1909 let offsets = &data.buffers()[0];
1910 let sizes = &data.buffers()[1];
1911
1912 let element_size = std::mem::size_of::<O>();
1913 let offsets_slice =
1914 offsets.slice_with_length(data.offset() * element_size, data.len() * element_size);
1915 let sizes_slice =
1916 sizes.slice_with_length(data.offset() * element_size, data.len() * element_size);
1917
1918 let child_data = data.child_data()[0].clone();
1919
1920 (offsets_slice, sizes_slice, child_data)
1921}
1922
1923fn get_or_truncate_buffer(array_data: &ArrayData) -> Buffer {
1930 let buffer = &array_data.buffers()[0];
1931 let layout = layout(array_data.data_type());
1932 let spec = &layout.buffers[0];
1933
1934 let byte_width = get_buffer_element_width(spec);
1935 let min_length = array_data.len() * byte_width;
1936 if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
1937 let byte_offset = array_data.offset() * byte_width;
1938 let buffer_length = min(min_length, buffer.len() - byte_offset);
1939 buffer.slice_with_length(byte_offset, buffer_length)
1940 } else {
1941 buffer.clone()
1942 }
1943}
1944
1945#[allow(clippy::too_many_arguments)]
1952fn write_array_data(
1953 array_data: &ArrayData,
1954 buffers: &mut Vec<crate::Buffer>,
1955 sink: &mut IpcBodySink<'_>,
1956 nodes: &mut Vec<crate::FieldNode>,
1957 offset: i64,
1958 num_rows: usize,
1959 null_count: usize,
1960 compression_codec: Option<CompressionCodec>,
1961 compression_context: &mut CompressionContext,
1962 write_options: &IpcWriteOptions,
1963) -> Result<i64, ArrowError> {
1964 let mut offset = offset;
1965 if !matches!(array_data.data_type(), DataType::Null) {
1966 nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
1967 } else {
1968 nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
1971 }
1972 if has_validity_bitmap(array_data.data_type(), write_options) {
1973 let null_buffer = match array_data.nulls() {
1975 None => {
1976 let num_bytes = bit_util::ceil(num_rows, 8);
1978 let buffer = MutableBuffer::new(num_bytes);
1979 let buffer = buffer.with_bitset(num_bytes, true);
1980 buffer.into()
1981 }
1982 Some(buffer) => buffer.inner().sliced(),
1983 };
1984
1985 offset = encode_sink_buffer(
1986 null_buffer,
1987 buffers,
1988 sink,
1989 offset,
1990 compression_codec,
1991 compression_context,
1992 write_options.alignment,
1993 )?;
1994 }
1995
1996 let data_type = array_data.data_type();
1997 if matches!(data_type, DataType::Binary | DataType::Utf8) {
1998 let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
1999 for buffer in [offsets, values] {
2000 offset = encode_sink_buffer(
2001 buffer,
2002 buffers,
2003 sink,
2004 offset,
2005 compression_codec,
2006 compression_context,
2007 write_options.alignment,
2008 )?;
2009 }
2010 } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
2011 let views = get_or_truncate_buffer(array_data);
2018 offset = encode_sink_buffer(
2019 views,
2020 buffers,
2021 sink,
2022 offset,
2023 compression_codec,
2024 compression_context,
2025 write_options.alignment,
2026 )?;
2027
2028 for buffer in array_data.buffers().iter().skip(1) {
2029 offset = encode_sink_buffer(
2030 buffer.clone(),
2031 buffers,
2032 sink,
2033 offset,
2034 compression_codec,
2035 compression_context,
2036 write_options.alignment,
2037 )?;
2038 }
2039 } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
2040 let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
2041 for buffer in [offsets, values] {
2042 offset = encode_sink_buffer(
2043 buffer,
2044 buffers,
2045 sink,
2046 offset,
2047 compression_codec,
2048 compression_context,
2049 write_options.alignment,
2050 )?;
2051 }
2052 } else if DataType::is_numeric(data_type)
2053 || DataType::is_temporal(data_type)
2054 || matches!(
2055 array_data.data_type(),
2056 DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
2057 )
2058 {
2059 assert_eq!(array_data.buffers().len(), 1);
2061
2062 let buffer = get_or_truncate_buffer(array_data);
2063 offset = encode_sink_buffer(
2064 buffer,
2065 buffers,
2066 sink,
2067 offset,
2068 compression_codec,
2069 compression_context,
2070 write_options.alignment,
2071 )?;
2072 } else if matches!(data_type, DataType::Boolean) {
2073 assert_eq!(array_data.buffers().len(), 1);
2076
2077 let buffer = &array_data.buffers()[0];
2078 let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
2079 offset = encode_sink_buffer(
2080 buffer,
2081 buffers,
2082 sink,
2083 offset,
2084 compression_codec,
2085 compression_context,
2086 write_options.alignment,
2087 )?;
2088 } else if matches!(
2089 data_type,
2090 DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
2091 ) {
2092 assert_eq!(array_data.buffers().len(), 1);
2093 assert_eq!(array_data.child_data().len(), 1);
2094
2095 let (offsets, sliced_child_data) = match data_type {
2097 DataType::List(_) => get_list_array_buffers::<i32>(array_data),
2098 DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
2099 DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
2100 _ => unreachable!(),
2101 };
2102 offset = encode_sink_buffer(
2103 offsets,
2104 buffers,
2105 sink,
2106 offset,
2107 compression_codec,
2108 compression_context,
2109 write_options.alignment,
2110 )?;
2111 offset = write_array_data(
2112 &sliced_child_data,
2113 buffers,
2114 sink,
2115 nodes,
2116 offset,
2117 sliced_child_data.len(),
2118 sliced_child_data.null_count(),
2119 compression_codec,
2120 compression_context,
2121 write_options,
2122 )?;
2123 return Ok(offset);
2124 } else if matches!(
2125 data_type,
2126 DataType::ListView(_) | DataType::LargeListView(_)
2127 ) {
2128 assert_eq!(array_data.buffers().len(), 2); assert_eq!(array_data.child_data().len(), 1);
2130
2131 let (offsets, sizes, child_data) = match data_type {
2132 DataType::ListView(_) => get_list_view_array_buffers::<i32>(array_data),
2133 DataType::LargeListView(_) => get_list_view_array_buffers::<i64>(array_data),
2134 _ => unreachable!(),
2135 };
2136
2137 offset = encode_sink_buffer(
2138 offsets,
2139 buffers,
2140 sink,
2141 offset,
2142 compression_codec,
2143 compression_context,
2144 write_options.alignment,
2145 )?;
2146 offset = encode_sink_buffer(
2147 sizes,
2148 buffers,
2149 sink,
2150 offset,
2151 compression_codec,
2152 compression_context,
2153 write_options.alignment,
2154 )?;
2155
2156 offset = write_array_data(
2157 &child_data,
2158 buffers,
2159 sink,
2160 nodes,
2161 offset,
2162 child_data.len(),
2163 child_data.null_count(),
2164 compression_codec,
2165 compression_context,
2166 write_options,
2167 )?;
2168 return Ok(offset);
2169 } else if let DataType::FixedSizeList(_, fixed_size) = data_type {
2170 assert_eq!(array_data.child_data().len(), 1);
2171 let fixed_size = *fixed_size as usize;
2172
2173 let child_offset = array_data.offset() * fixed_size;
2174 let child_length = array_data.len() * fixed_size;
2175 let child_data = array_data.child_data()[0].slice(child_offset, child_length);
2176
2177 offset = write_array_data(
2178 &child_data,
2179 buffers,
2180 sink,
2181 nodes,
2182 offset,
2183 child_data.len(),
2184 child_data.null_count(),
2185 compression_codec,
2186 compression_context,
2187 write_options,
2188 )?;
2189 return Ok(offset);
2190 } else {
2191 for buffer in array_data.buffers() {
2192 offset = encode_sink_buffer(
2193 buffer.clone(),
2194 buffers,
2195 sink,
2196 offset,
2197 compression_codec,
2198 compression_context,
2199 write_options.alignment,
2200 )?;
2201 }
2202 }
2203
2204 match array_data.data_type() {
2205 DataType::Dictionary(_, _) => {}
2206 DataType::RunEndEncoded(_, _) => {
2207 let arr = unslice_run_array(array_data.clone())?;
2209 for data_ref in arr.child_data() {
2211 offset = write_array_data(
2213 data_ref,
2214 buffers,
2215 sink,
2216 nodes,
2217 offset,
2218 data_ref.len(),
2219 data_ref.null_count(),
2220 compression_codec,
2221 compression_context,
2222 write_options,
2223 )?;
2224 }
2225 }
2226 _ => {
2227 for data_ref in array_data.child_data() {
2229 offset = write_array_data(
2231 data_ref,
2232 buffers,
2233 sink,
2234 nodes,
2235 offset,
2236 data_ref.len(),
2237 data_ref.null_count(),
2238 compression_codec,
2239 compression_context,
2240 write_options,
2241 )?;
2242 }
2243 }
2244 }
2245 Ok(offset)
2246}
2247
2248fn encode_sink_buffer(
2262 buffer: Buffer,
2263 buffers: &mut Vec<crate::Buffer>,
2264 sink: &mut IpcBodySink<'_>,
2265 offset: i64,
2266 compression_codec: Option<CompressionCodec>,
2267 compression_context: &mut CompressionContext,
2268 alignment: u8,
2269) -> Result<i64, ArrowError> {
2270 let (encoded, len) = match compression_codec {
2271 None => {
2272 let len = buffer.len() as i64;
2273 (EncodedBuffer::Raw(buffer), len)
2274 }
2275 Some(codec) => {
2276 let mut scratch = Vec::new();
2277 let written =
2278 codec.compress_to_vec(buffer.as_slice(), &mut scratch, compression_context)?;
2279 let len = i64::try_from(written)
2280 .map_err(|e| ArrowError::InvalidArgumentError(format!("{e}")))?;
2281 (EncodedBuffer::Compressed(scratch), len)
2282 }
2283 };
2284
2285 let pad_len = pad_to_alignment(alignment, len as usize);
2286 sink.write(pad_len, encoded);
2287 buffers.push(crate::Buffer::new(offset, len));
2288 Ok(offset + len + pad_len as i64)
2289}
2290
/// Block of 64 zero bytes that padding slices are taken from.
const PADDING: [u8; 64] = [0_u8; 64];
2292
2293#[inline]
2299fn estimate_encoded_buffer_count(dt: &DataType) -> usize {
2300 match dt {
2301 DataType::Null => 0,
2302
2303 DataType::Binary | DataType::Utf8 | DataType::LargeBinary | DataType::LargeUtf8 => 3,
2304
2305 DataType::BinaryView | DataType::Utf8View => 3,
2306
2307 DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => {
2308 2 + estimate_encoded_buffer_count(f.data_type())
2309 }
2310
2311 DataType::ListView(f) | DataType::LargeListView(f) => {
2312 3 + estimate_encoded_buffer_count(f.data_type())
2313 }
2314
2315 DataType::FixedSizeList(f, _) => 1 + estimate_encoded_buffer_count(f.data_type()),
2316
2317 DataType::Struct(fields) => {
2318 1 + fields
2319 .iter()
2320 .map(|f| estimate_encoded_buffer_count(f.data_type()))
2321 .sum::<usize>()
2322 }
2323
2324 DataType::Dictionary(_, _) => 2,
2326
2327 DataType::Union(fields, UnionMode::Sparse) => {
2328 1 + fields
2329 .iter()
2330 .map(|(_, f)| estimate_encoded_buffer_count(f.data_type()))
2331 .sum::<usize>()
2332 }
2333 DataType::Union(fields, UnionMode::Dense) => {
2334 2 + fields
2335 .iter()
2336 .map(|(_, f)| estimate_encoded_buffer_count(f.data_type()))
2337 .sum::<usize>()
2338 }
2339
2340 DataType::RunEndEncoded(run_ends, values) => {
2341 estimate_encoded_buffer_count(run_ends.data_type())
2342 + estimate_encoded_buffer_count(values.data_type())
2343 }
2344 _ => 2,
2346 }
2347}
2348
#[inline]
/// Returns how many bytes must follow `len` bytes to reach the next multiple
/// of `alignment`; `0` when `len` is already aligned.
///
/// The computation is a bit mask, so it is only meaningful when `alignment`
/// is a power of two.
fn pad_to_alignment(alignment: u8, len: usize) -> usize {
    let mask = usize::from(alignment - 1);
    let rounded_up = (len + mask) & !mask;
    rounded_up - len
}
2355
2356#[cfg(test)]
2357mod tests {
2358 use std::hash::Hasher;
2359 use std::io::Cursor;
2360 use std::io::Seek;
2361
2362 use arrow_array::builder::FixedSizeListBuilder;
2363 use arrow_array::builder::Float32Builder;
2364 use arrow_array::builder::Int64Builder;
2365 use arrow_array::builder::MapBuilder;
2366 use arrow_array::builder::StringViewBuilder;
2367 use arrow_array::builder::UnionBuilder;
2368 use arrow_array::builder::{
2369 GenericListBuilder, GenericListViewBuilder, ListBuilder, StringBuilder,
2370 };
2371 use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
2372 use arrow_array::types::*;
2373 use arrow_buffer::ScalarBuffer;
2374
2375 use crate::MetadataVersion;
2376 use crate::convert::fb_to_schema;
2377 use crate::reader::*;
2378 use crate::root_as_footer;
2379
2380 use super::*;
2381
2382 fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
2383 let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
2384 writer.write(rb).unwrap();
2385 writer.finish().unwrap();
2386 writer.into_inner().unwrap()
2387 }
2388
2389 fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
2390 let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
2391 reader.next().unwrap().unwrap()
2392 }
2393
2394 fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
2395 const IPC_ALIGNMENT: usize = 8;
2399
2400 let mut stream_writer = StreamWriter::try_new_with_options(
2401 vec![],
2402 record.schema_ref(),
2403 IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
2404 )
2405 .unwrap();
2406 stream_writer.write(record).unwrap();
2407 stream_writer.finish().unwrap();
2408 stream_writer.into_inner().unwrap()
2409 }
2410
2411 fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
2412 let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
2413 stream_reader.next().unwrap().unwrap()
2414 }
2415
2416 #[test]
2417 #[cfg(feature = "lz4")]
2418 fn test_write_empty_record_batch_lz4_compression() {
2419 let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2420 let values: Vec<Option<i32>> = vec![];
2421 let array = Int32Array::from(values);
2422 let record_batch =
2423 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2424
2425 let mut file = tempfile::tempfile().unwrap();
2426
2427 {
2428 let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2429 .unwrap()
2430 .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2431 .unwrap();
2432
2433 let mut writer =
2434 FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2435 writer.write(&record_batch).unwrap();
2436 writer.finish().unwrap();
2437 }
2438 file.rewind().unwrap();
2439 {
2440 let reader = FileReader::try_new(file, None).unwrap();
2442 for read_batch in reader {
2443 read_batch
2444 .unwrap()
2445 .columns()
2446 .iter()
2447 .zip(record_batch.columns())
2448 .for_each(|(a, b)| {
2449 assert_eq!(a.data_type(), b.data_type());
2450 assert_eq!(a.len(), b.len());
2451 assert_eq!(a.null_count(), b.null_count());
2452 });
2453 }
2454 }
2455 }
2456
2457 #[test]
2458 #[cfg(feature = "lz4")]
2459 fn test_write_file_with_lz4_compression() {
2460 let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2461 let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2462 let array = Int32Array::from(values);
2463 let record_batch =
2464 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2465
2466 let mut file = tempfile::tempfile().unwrap();
2467 {
2468 let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2469 .unwrap()
2470 .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2471 .unwrap();
2472
2473 let mut writer =
2474 FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2475 writer.write(&record_batch).unwrap();
2476 writer.finish().unwrap();
2477 }
2478 file.rewind().unwrap();
2479 {
2480 let reader = FileReader::try_new(file, None).unwrap();
2482 for read_batch in reader {
2483 read_batch
2484 .unwrap()
2485 .columns()
2486 .iter()
2487 .zip(record_batch.columns())
2488 .for_each(|(a, b)| {
2489 assert_eq!(a.data_type(), b.data_type());
2490 assert_eq!(a.len(), b.len());
2491 assert_eq!(a.null_count(), b.null_count());
2492 });
2493 }
2494 }
2495 }
2496
2497 #[test]
2498 #[cfg(feature = "zstd")]
2499 fn test_write_file_with_zstd_compression() {
2500 let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2501 let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2502 let array = Int32Array::from(values);
2503 let record_batch =
2504 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2505 let mut file = tempfile::tempfile().unwrap();
2506 {
2507 let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2508 .unwrap()
2509 .try_with_compression(Some(crate::CompressionType::ZSTD))
2510 .unwrap();
2511
2512 let mut writer =
2513 FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2514 writer.write(&record_batch).unwrap();
2515 writer.finish().unwrap();
2516 }
2517 file.rewind().unwrap();
2518 {
2519 let reader = FileReader::try_new(file, None).unwrap();
2521 for read_batch in reader {
2522 read_batch
2523 .unwrap()
2524 .columns()
2525 .iter()
2526 .zip(record_batch.columns())
2527 .for_each(|(a, b)| {
2528 assert_eq!(a.data_type(), b.data_type());
2529 assert_eq!(a.len(), b.len());
2530 assert_eq!(a.null_count(), b.null_count());
2531 });
2532 }
2533 }
2534 }
2535
2536 #[test]
2537 fn test_write_file() {
2538 let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
2539 let values: Vec<Option<u32>> = vec![
2540 Some(999),
2541 None,
2542 Some(235),
2543 Some(123),
2544 None,
2545 None,
2546 None,
2547 None,
2548 None,
2549 ];
2550 let array1 = UInt32Array::from(values);
2551 let batch =
2552 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
2553 .unwrap();
2554 let mut file = tempfile::tempfile().unwrap();
2555 {
2556 let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2557
2558 writer.write(&batch).unwrap();
2559 writer.finish().unwrap();
2560 }
2561 file.rewind().unwrap();
2562
2563 {
2564 let mut reader = FileReader::try_new(file, None).unwrap();
2565 while let Some(Ok(read_batch)) = reader.next() {
2566 read_batch
2567 .columns()
2568 .iter()
2569 .zip(batch.columns())
2570 .for_each(|(a, b)| {
2571 assert_eq!(a.data_type(), b.data_type());
2572 assert_eq!(a.len(), b.len());
2573 assert_eq!(a.null_count(), b.null_count());
2574 });
2575 }
2576 }
2577 }
2578
2579 #[test]
2580 fn test_empty_utf8_ipc_writes_nonempty_offsets_buffer() {
2581 let name = StringArray::from(Vec::<String>::new());
2582 let (offsets, values) = get_byte_array_buffers::<i32>(&name.to_data());
2583
2584 assert_eq!(name.len(), 0);
2585 assert_eq!(
2586 offsets.len(),
2587 std::mem::size_of::<i32>(),
2588 "offsets buffer should contain one zero i32 offset"
2589 );
2590 assert_eq!(values.len(), 0, "values buffer should remain empty");
2591 }
2592
2593 #[test]
2594 fn test_empty_large_utf8_ipc_writes_nonempty_offsets_buffer() {
2595 let name = LargeStringArray::from(Vec::<String>::new());
2596 let (offsets, values) = get_byte_array_buffers::<i64>(&name.to_data());
2597
2598 assert_eq!(name.len(), 0);
2599 assert_eq!(
2600 offsets.len(),
2601 std::mem::size_of::<i64>(),
2602 "offsets buffer should contain one zero i64 offset"
2603 );
2604 assert_eq!(values.len(), 0, "values buffer should remain empty");
2605 }
2606
2607 #[test]
2608 fn test_empty_list_ipc_writes_nonempty_offsets_buffer() {
2609 let list = GenericListBuilder::<i32, _>::new(UInt32Builder::new()).finish();
2610 let (offsets, child_data) = get_list_array_buffers::<i32>(&list.to_data());
2611
2612 assert_eq!(list.len(), 0);
2613 assert_eq!(
2614 offsets.len(),
2615 std::mem::size_of::<i32>(),
2616 "offsets buffer should contain one zero i32 offset"
2617 );
2618 assert_eq!(child_data.len(), 0, "child data should remain empty");
2619 }
2620
2621 #[test]
2622 fn test_empty_large_list_ipc_writes_nonempty_offsets_buffer() {
2623 let list = GenericListBuilder::<i64, _>::new(UInt32Builder::new()).finish();
2624 let (offsets, child_data) = get_list_array_buffers::<i64>(&list.to_data());
2625
2626 assert_eq!(list.len(), 0);
2627 assert_eq!(
2628 offsets.len(),
2629 std::mem::size_of::<i64>(),
2630 "offsets buffer should contain one zero i64 offset"
2631 );
2632 assert_eq!(child_data.len(), 0, "child data should remain empty");
2633 }
2634
2635 fn write_null_file(options: IpcWriteOptions) {
2636 let schema = Schema::new(vec![
2637 Field::new("nulls", DataType::Null, true),
2638 Field::new("int32s", DataType::Int32, false),
2639 Field::new("nulls2", DataType::Null, true),
2640 Field::new("f64s", DataType::Float64, false),
2641 ]);
2642 let array1 = NullArray::new(32);
2643 let array2 = Int32Array::from(vec![1; 32]);
2644 let array3 = NullArray::new(32);
2645 let array4 = Float64Array::from(vec![f64::NAN; 32]);
2646 let batch = RecordBatch::try_new(
2647 Arc::new(schema.clone()),
2648 vec![
2649 Arc::new(array1) as ArrayRef,
2650 Arc::new(array2) as ArrayRef,
2651 Arc::new(array3) as ArrayRef,
2652 Arc::new(array4) as ArrayRef,
2653 ],
2654 )
2655 .unwrap();
2656 let mut file = tempfile::tempfile().unwrap();
2657 {
2658 let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2659
2660 writer.write(&batch).unwrap();
2661 writer.finish().unwrap();
2662 }
2663
2664 file.rewind().unwrap();
2665
2666 {
2667 let reader = FileReader::try_new(file, None).unwrap();
2668 reader.for_each(|maybe_batch| {
2669 maybe_batch
2670 .unwrap()
2671 .columns()
2672 .iter()
2673 .zip(batch.columns())
2674 .for_each(|(a, b)| {
2675 assert_eq!(a.data_type(), b.data_type());
2676 assert_eq!(a.len(), b.len());
2677 assert_eq!(a.null_count(), b.null_count());
2678 });
2679 });
2680 }
2681 }
2682 #[test]
2683 fn test_write_null_file_v4() {
2684 write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2685 write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
2686 write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap());
2687 write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
2688 }
2689
2690 #[test]
2691 fn test_write_null_file_v5() {
2692 write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2693 write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap());
2694 }
2695
2696 #[test]
2697 fn track_union_nested_dict() {
2698 let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2699
2700 let array = Arc::new(inner) as ArrayRef;
2701
2702 #[allow(deprecated)]
2704 let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 0, false);
2705 let union_fields = [(0, Arc::new(dctfield))].into_iter().collect();
2706
2707 let types = [0, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
2708 let offsets = [0, 1, 2].into_iter().collect::<ScalarBuffer<i32>>();
2709
2710 let union = UnionArray::try_new(union_fields, types, Some(offsets), vec![array]).unwrap();
2711
2712 let schema = Arc::new(Schema::new(vec![Field::new(
2713 "union",
2714 union.data_type().clone(),
2715 false,
2716 )]));
2717
2718 let r#gen = IpcDataGenerator::default();
2719 let mut dict_tracker = DictionaryTracker::new(false);
2720 r#gen.schema_to_bytes_with_dictionary_tracker(
2721 &schema,
2722 &mut dict_tracker,
2723 &IpcWriteOptions::default(),
2724 );
2725
2726 let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
2727
2728 r#gen
2729 .encode(
2730 &batch,
2731 &mut dict_tracker,
2732 &Default::default(),
2733 &mut Default::default(),
2734 )
2735 .unwrap();
2736
2737 assert!(dict_tracker.written.contains_key(&0));
2740 }
2741
2742 #[test]
2743 fn track_struct_nested_dict() {
2744 let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2745
2746 let array = Arc::new(inner) as ArrayRef;
2747
2748 #[allow(deprecated)]
2750 let dctfield = Arc::new(Field::new_dict(
2751 "dict",
2752 array.data_type().clone(),
2753 false,
2754 2,
2755 false,
2756 ));
2757
2758 let s = StructArray::from(vec![(dctfield, array)]);
2759 let struct_array = Arc::new(s) as ArrayRef;
2760
2761 let schema = Arc::new(Schema::new(vec![Field::new(
2762 "struct",
2763 struct_array.data_type().clone(),
2764 false,
2765 )]));
2766
2767 let r#gen = IpcDataGenerator::default();
2768 let mut dict_tracker = DictionaryTracker::new(false);
2769 r#gen.schema_to_bytes_with_dictionary_tracker(
2770 &schema,
2771 &mut dict_tracker,
2772 &IpcWriteOptions::default(),
2773 );
2774
2775 let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2776
2777 r#gen
2778 .encode(
2779 &batch,
2780 &mut dict_tracker,
2781 &Default::default(),
2782 &mut Default::default(),
2783 )
2784 .unwrap();
2785
2786 assert!(dict_tracker.written.contains_key(&0));
2787 }
2788
2789 fn write_union_file(options: IpcWriteOptions) {
2790 let schema = Schema::new(vec![Field::new_union(
2791 "union",
2792 vec![0, 1],
2793 vec![
2794 Field::new("a", DataType::Int32, false),
2795 Field::new("c", DataType::Float64, false),
2796 ],
2797 UnionMode::Sparse,
2798 )]);
2799 let mut builder = UnionBuilder::with_capacity_sparse(5);
2800 builder.append::<Int32Type>("a", 1).unwrap();
2801 builder.append_null::<Int32Type>("a").unwrap();
2802 builder.append::<Float64Type>("c", 3.0).unwrap();
2803 builder.append_null::<Float64Type>("c").unwrap();
2804 builder.append::<Int32Type>("a", 4).unwrap();
2805 let union = builder.build().unwrap();
2806
2807 let batch =
2808 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
2809 .unwrap();
2810
2811 let mut file = tempfile::tempfile().unwrap();
2812 {
2813 let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2814
2815 writer.write(&batch).unwrap();
2816 writer.finish().unwrap();
2817 }
2818 file.rewind().unwrap();
2819
2820 {
2821 let reader = FileReader::try_new(file, None).unwrap();
2822 reader.for_each(|maybe_batch| {
2823 maybe_batch
2824 .unwrap()
2825 .columns()
2826 .iter()
2827 .zip(batch.columns())
2828 .for_each(|(a, b)| {
2829 assert_eq!(a.data_type(), b.data_type());
2830 assert_eq!(a.len(), b.len());
2831 assert_eq!(a.null_count(), b.null_count());
2832 });
2833 });
2834 }
2835 }
2836
2837 #[test]
2838 fn test_write_union_file_v4_v5() {
2839 write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2840 write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2841 }
2842
2843 #[test]
2844 fn test_write_view_types() {
2845 const LONG_TEST_STRING: &str =
2846 "This is a long string to make sure binary view array handles it";
2847 let schema = Schema::new(vec![
2848 Field::new("field1", DataType::BinaryView, true),
2849 Field::new("field2", DataType::Utf8View, true),
2850 ]);
2851 let values: Vec<Option<&[u8]>> = vec![
2852 Some(b"foo"),
2853 Some(b"bar"),
2854 Some(LONG_TEST_STRING.as_bytes()),
2855 ];
2856 let binary_array = BinaryViewArray::from_iter(values);
2857 let utf8_array =
2858 StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
2859 let record_batch = RecordBatch::try_new(
2860 Arc::new(schema.clone()),
2861 vec![Arc::new(binary_array), Arc::new(utf8_array)],
2862 )
2863 .unwrap();
2864
2865 let mut file = tempfile::tempfile().unwrap();
2866 {
2867 let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2868 writer.write(&record_batch).unwrap();
2869 writer.finish().unwrap();
2870 }
2871 file.rewind().unwrap();
2872 {
2873 let mut reader = FileReader::try_new(&file, None).unwrap();
2874 let read_batch = reader.next().unwrap().unwrap();
2875 read_batch
2876 .columns()
2877 .iter()
2878 .zip(record_batch.columns())
2879 .for_each(|(a, b)| {
2880 assert_eq!(a, b);
2881 });
2882 }
2883 file.rewind().unwrap();
2884 {
2885 let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
2886 let read_batch = reader.next().unwrap().unwrap();
2887 assert_eq!(read_batch.num_columns(), 1);
2888 let read_array = read_batch.column(0);
2889 let write_array = record_batch.column(0);
2890 assert_eq!(read_array, write_array);
2891 }
2892 }
2893
2894 #[test]
2895 fn truncate_ipc_record_batch() {
2896 fn create_batch(rows: usize) -> RecordBatch {
2897 let schema = Schema::new(vec![
2898 Field::new("a", DataType::Int32, false),
2899 Field::new("b", DataType::Utf8, false),
2900 ]);
2901
2902 let a = Int32Array::from_iter_values(0..rows as i32);
2903 let b = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));
2904
2905 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2906 }
2907
2908 let big_record_batch = create_batch(65536);
2909
2910 let length = 5;
2911 let small_record_batch = create_batch(length);
2912
2913 let offset = 2;
2914 let record_batch_slice = big_record_batch.slice(offset, length);
2915 assert!(
2916 serialize_stream(&big_record_batch).len() > serialize_stream(&small_record_batch).len()
2917 );
2918 assert_eq!(
2919 serialize_stream(&small_record_batch).len(),
2920 serialize_stream(&record_batch_slice).len()
2921 );
2922
2923 assert_eq!(
2924 deserialize_stream(serialize_stream(&record_batch_slice)),
2925 record_batch_slice
2926 );
2927 }
2928
2929 #[test]
2930 fn truncate_ipc_record_batch_with_nulls() {
2931 fn create_batch() -> RecordBatch {
2932 let schema = Schema::new(vec![
2933 Field::new("a", DataType::Int32, true),
2934 Field::new("b", DataType::Utf8, true),
2935 ]);
2936
2937 let a = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
2938 let b = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);
2939
2940 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2941 }
2942
2943 let record_batch = create_batch();
2944 let record_batch_slice = record_batch.slice(1, 2);
2945 let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2946
2947 assert!(
2948 serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2949 );
2950
2951 assert!(deserialized_batch.column(0).is_null(0));
2952 assert!(deserialized_batch.column(0).is_valid(1));
2953 assert!(deserialized_batch.column(1).is_valid(0));
2954 assert!(deserialized_batch.column(1).is_valid(1));
2955
2956 assert_eq!(record_batch_slice, deserialized_batch);
2957 }
2958
2959 #[test]
2960 fn truncate_ipc_dictionary_array() {
2961 fn create_batch() -> RecordBatch {
2962 let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
2963 .into_iter()
2964 .collect();
2965 let keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2966
2967 let array = DictionaryArray::new(keys, Arc::new(values));
2968
2969 let schema = Schema::new(vec![Field::new("dict", array.data_type().clone(), true)]);
2970
2971 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
2972 }
2973
2974 let record_batch = create_batch();
2975 let record_batch_slice = record_batch.slice(1, 2);
2976 let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2977
2978 assert!(
2979 serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2980 );
2981
2982 assert!(deserialized_batch.column(0).is_valid(0));
2983 assert!(deserialized_batch.column(0).is_null(1));
2984
2985 assert_eq!(record_batch_slice, deserialized_batch);
2986 }
2987
2988 #[test]
2989 fn truncate_ipc_struct_array() {
2990 fn create_batch() -> RecordBatch {
2991 let strings: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
2992 .into_iter()
2993 .collect();
2994 let ints: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2995
2996 let struct_array = StructArray::from(vec![
2997 (
2998 Arc::new(Field::new("s", DataType::Utf8, true)),
2999 Arc::new(strings) as ArrayRef,
3000 ),
3001 (
3002 Arc::new(Field::new("c", DataType::Int32, true)),
3003 Arc::new(ints) as ArrayRef,
3004 ),
3005 ]);
3006
3007 let schema = Schema::new(vec![Field::new(
3008 "struct_array",
3009 struct_array.data_type().clone(),
3010 true,
3011 )]);
3012
3013 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)]).unwrap()
3014 }
3015
3016 let record_batch = create_batch();
3017 let record_batch_slice = record_batch.slice(1, 2);
3018 let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
3019
3020 assert!(
3021 serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
3022 );
3023
3024 let structs = deserialized_batch
3025 .column(0)
3026 .as_any()
3027 .downcast_ref::<StructArray>()
3028 .unwrap();
3029
3030 assert!(structs.column(0).is_null(0));
3031 assert!(structs.column(0).is_valid(1));
3032 assert!(structs.column(1).is_valid(0));
3033 assert!(structs.column(1).is_null(1));
3034 assert_eq!(record_batch_slice, deserialized_batch);
3035 }
3036
3037 #[test]
3038 fn truncate_ipc_string_array_with_all_empty_string() {
3039 fn create_batch() -> RecordBatch {
3040 let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
3041 let a = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
3042 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap()
3043 }
3044
3045 let record_batch = create_batch();
3046 let record_batch_slice = record_batch.slice(0, 1);
3047 let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
3048
3049 assert!(
3050 serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
3051 );
3052 assert_eq!(record_batch_slice, deserialized_batch);
3053 }
3054
3055 #[test]
3056 fn test_stream_writer_writes_array_slice() {
3057 let array = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
3058 assert_eq!(
3059 vec![Some(1), Some(2), Some(3)],
3060 array.iter().collect::<Vec<_>>()
3061 );
3062
3063 let sliced = array.slice(1, 2);
3064 assert_eq!(vec![Some(2), Some(3)], sliced.iter().collect::<Vec<_>>());
3065
3066 let batch = RecordBatch::try_new(
3067 Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])),
3068 vec![Arc::new(sliced)],
3069 )
3070 .expect("new batch");
3071
3072 let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
3073 writer.write(&batch).expect("write");
3074 let outbuf = writer.into_inner().expect("inner");
3075
3076 let mut reader = StreamReader::try_new(&outbuf[..], None).expect("new reader");
3077 let read_batch = reader.next().unwrap().expect("read batch");
3078
3079 let read_array: &UInt32Array = read_batch.column(0).as_primitive();
3080 assert_eq!(
3081 vec![Some(2), Some(3)],
3082 read_array.iter().collect::<Vec<_>>()
3083 );
3084 }
3085
3086 #[test]
3087 fn test_large_slice_uint32() {
3088 ensure_roundtrip(Arc::new(UInt32Array::from_iter(
3089 (0..8000).map(|i| if i % 2 == 0 { Some(i) } else { None }),
3090 )));
3091 }
3092
3093 #[test]
3094 fn test_large_slice_string() {
3095 let strings: Vec<_> = (0..8000)
3096 .map(|i| {
3097 if i % 2 == 0 {
3098 Some(format!("value{i}"))
3099 } else {
3100 None
3101 }
3102 })
3103 .collect();
3104
3105 ensure_roundtrip(Arc::new(StringArray::from(strings)));
3106 }
3107
3108 #[test]
3109 fn test_large_slice_string_list() {
3110 let mut ls = ListBuilder::new(StringBuilder::new());
3111
3112 let mut s = String::new();
3113 for row_number in 0..8000 {
3114 if row_number % 2 == 0 {
3115 for list_element in 0..1000 {
3116 s.clear();
3117 use std::fmt::Write;
3118 write!(&mut s, "value{row_number}-{list_element}").unwrap();
3119 ls.values().append_value(&s);
3120 }
3121 ls.append(true)
3122 } else {
3123 ls.append(false); }
3125 }
3126
3127 ensure_roundtrip(Arc::new(ls.finish()));
3128 }
3129
3130 #[test]
3131 fn test_large_slice_string_list_of_lists() {
3132 let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));
3136
3137 for _ in 0..4000 {
3138 ls.values().append(true);
3139 ls.append(true)
3140 }
3141
3142 let mut s = String::new();
3143 for row_number in 0..4000 {
3144 if row_number % 2 == 0 {
3145 for list_element in 0..1000 {
3146 s.clear();
3147 use std::fmt::Write;
3148 write!(&mut s, "value{row_number}-{list_element}").unwrap();
3149 ls.values().values().append_value(&s);
3150 }
3151 ls.values().append(true);
3152 ls.append(true)
3153 } else {
3154 ls.append(false); }
3156 }
3157
3158 ensure_roundtrip(Arc::new(ls.finish()));
3159 }
3160
3161 fn ensure_roundtrip(array: ArrayRef) {
3163 let num_rows = array.len();
3164 let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
3165 let sliced_batch = orig_batch.slice(1, num_rows - 1);
3167
3168 let schema = orig_batch.schema();
3169 let stream_data = {
3170 let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
3171 writer.write(&sliced_batch).unwrap();
3172 writer.into_inner().unwrap()
3173 };
3174 let read_batch = {
3175 let projection = None;
3176 let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
3177 reader
3178 .next()
3179 .expect("expect no errors reading batch")
3180 .expect("expect batch")
3181 };
3182 assert_eq!(sliced_batch, read_batch);
3183
3184 let file_data = {
3185 let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
3186 writer.write(&sliced_batch).unwrap();
3187 writer.into_inner().unwrap().into_inner().unwrap()
3188 };
3189 let read_batch = {
3190 let projection = None;
3191 let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
3192 reader
3193 .next()
3194 .expect("expect no errors reading batch")
3195 .expect("expect batch")
3196 };
3197 assert_eq!(sliced_batch, read_batch);
3198
3199 }
3201
3202 #[test]
3203 fn encode_bools_slice() {
3204 assert_bool_roundtrip([true, false], 1, 1);
3206
3207 assert_bool_roundtrip(
3209 [
3210 true, false, true, true, false, false, true, true, true, false, false, false, true,
3211 true, true, true, false, false, false, false, true, true, true, true, true, false,
3212 false, false, false, false,
3213 ],
3214 13,
3215 17,
3216 );
3217
3218 assert_bool_roundtrip(
3220 [
3221 true, false, true, true, false, false, true, true, true, false, false, false,
3222 ],
3223 8,
3224 2,
3225 );
3226
3227 assert_bool_roundtrip(
3229 [
3230 true, false, true, true, false, false, true, true, true, false, false, false, true,
3231 true, true, true, true, false, false, false, false, false,
3232 ],
3233 8,
3234 8,
3235 );
3236 }
3237
3238 fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
3239 let val_bool_field = Field::new("val", DataType::Boolean, false);
3240
3241 let schema = Arc::new(Schema::new(vec![val_bool_field]));
3242
3243 let bools = BooleanArray::from(bools.to_vec());
3244
3245 let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
3246 let batch = batch.slice(offset, length);
3247
3248 let data = serialize_stream(&batch);
3249 let batch2 = deserialize_stream(data);
3250 assert_eq!(batch, batch2);
3251 }
3252
3253 #[test]
3254 fn test_run_array_unslice() {
3255 let total_len = 80;
3256 let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
3257 let repeats: Vec<usize> = vec![3, 4, 1, 2];
3258 let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
3259 for ix in 0_usize..32 {
3260 let repeat: usize = repeats[ix % repeats.len()];
3261 let val: Option<i32> = vals[ix % vals.len()];
3262 input_array.resize(input_array.len() + repeat, val);
3263 }
3264
3265 let mut builder =
3267 PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
3268 builder.extend(input_array.iter().copied());
3269 let run_array = builder.finish();
3270
3271 for slice_len in 1..=total_len {
3273 let sliced_run_array: RunArray<Int16Type> =
3275 run_array.slice(0, slice_len).into_data().into();
3276
3277 let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
3279 let typed = unsliced_run_array
3280 .downcast::<PrimitiveArray<Int32Type>>()
3281 .unwrap();
3282 let expected: Vec<Option<i32>> = input_array.iter().take(slice_len).copied().collect();
3283 let actual: Vec<Option<i32>> = typed.into_iter().collect();
3284 assert_eq!(expected, actual);
3285
3286 let sliced_run_array: RunArray<Int16Type> = run_array
3288 .slice(total_len - slice_len, slice_len)
3289 .into_data()
3290 .into();
3291
3292 let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
3294 let typed = unsliced_run_array
3295 .downcast::<PrimitiveArray<Int32Type>>()
3296 .unwrap();
3297 let expected: Vec<Option<i32>> = input_array
3298 .iter()
3299 .skip(total_len - slice_len)
3300 .copied()
3301 .collect();
3302 let actual: Vec<Option<i32>> = typed.into_iter().collect();
3303 assert_eq!(expected, actual);
3304 }
3305 }
3306
3307 fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3308 let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());
3309
3310 for i in 0..100_000 {
3311 for value in [i, i, i] {
3312 ls.values().append_value(value);
3313 }
3314 ls.append(true)
3315 }
3316
3317 ls.finish()
3318 }
3319
3320 fn generate_utf8view_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3321 let mut ls = GenericListBuilder::<O, _>::new(StringViewBuilder::new());
3322
3323 for i in 0..100_000 {
3324 for value in [
3325 format!("value{}", i),
3326 format!("value{}", i),
3327 format!("value{}", i),
3328 ] {
3329 ls.values().append_value(&value);
3330 }
3331 ls.append(true)
3332 }
3333
3334 ls.finish()
3335 }
3336
3337 fn generate_string_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3338 let mut ls = GenericListBuilder::<O, _>::new(StringBuilder::new());
3339
3340 for i in 0..100_000 {
3341 for value in [
3342 format!("value{}", i),
3343 format!("value{}", i),
3344 format!("value{}", i),
3345 ] {
3346 ls.values().append_value(&value);
3347 }
3348 ls.append(true)
3349 }
3350
3351 ls.finish()
3352 }
3353
3354 fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3355 let mut ls =
3356 GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
3357
3358 for _i in 0..10_000 {
3359 for j in 0..10 {
3360 for value in [j, j, j, j] {
3361 ls.values().values().append_value(value);
3362 }
3363 ls.values().append(true)
3364 }
3365 ls.append(true);
3366 }
3367
3368 ls.finish()
3369 }
3370
3371 fn generate_nested_list_data_starting_at_zero<O: OffsetSizeTrait>() -> GenericListArray<O> {
3372 let mut ls =
3373 GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
3374
3375 for _i in 0..999 {
3376 ls.values().append(true);
3377 ls.append(true);
3378 }
3379
3380 for j in 0..10 {
3381 for value in [j, j, j, j] {
3382 ls.values().values().append_value(value);
3383 }
3384 ls.values().append(true)
3385 }
3386 ls.append(true);
3387
3388 for i in 0..9_000 {
3389 for j in 0..10 {
3390 for value in [i + j, i + j, i + j, i + j] {
3391 ls.values().values().append_value(value);
3392 }
3393 ls.values().append(true)
3394 }
3395 ls.append(true);
3396 }
3397
3398 ls.finish()
3399 }
3400
3401 fn generate_map_array_data() -> MapArray {
3402 let keys_builder = UInt32Builder::new();
3403 let values_builder = UInt32Builder::new();
3404
3405 let mut builder = MapBuilder::new(None, keys_builder, values_builder);
3406
3407 for i in 0..100_000 {
3408 for _j in 0..3 {
3409 builder.keys().append_value(i);
3410 builder.values().append_value(i * 2);
3411 }
3412 builder.append(true).unwrap();
3413 }
3414
3415 builder.finish()
3416 }
3417
3418 #[test]
3419 fn reencode_offsets_when_first_offset_is_not_zero() {
3420 let original_list = generate_list_data::<i32>();
3421 let original_data = original_list.into_data();
3422 let slice_data = original_data.slice(75, 7);
3423 let (new_offsets, original_start, length) =
3424 reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
3425 assert_eq!(
3426 vec![0, 3, 6, 9, 12, 15, 18, 21],
3427 new_offsets.typed_data::<i32>()
3428 );
3429 assert_eq!(225, original_start);
3430 assert_eq!(21, length);
3431 }
3432
3433 #[test]
3434 fn reencode_offsets_when_first_offset_is_zero() {
3435 let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
3436 ls.append(true);
3438 ls.values().append_value(35);
3439 ls.values().append_value(42);
3440 ls.append(true);
3441 let original_list = ls.finish();
3442 let original_data = original_list.into_data();
3443
3444 let slice_data = original_data.slice(1, 1);
3445 let (new_offsets, original_start, length) =
3446 reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
3447 assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
3448 assert_eq!(0, original_start);
3449 assert_eq!(2, length);
3450 }
3451
3452 fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
3455 let in_sliced = in_batch.slice(999, 1);
3457
3458 let bytes_batch = serialize_file(&in_batch);
3459 let bytes_sliced = serialize_file(&in_sliced);
3460
3461 assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));
3463
3464 let out_batch = deserialize_file(bytes_batch);
3466 assert_eq!(in_batch, out_batch);
3467
3468 let out_sliced = deserialize_file(bytes_sliced);
3469 assert_eq!(in_sliced, out_sliced);
3470 }
3471
3472 #[test]
3473 fn encode_lists() {
3474 let val_inner = Field::new_list_field(DataType::UInt32, true);
3475 let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
3476 let schema = Arc::new(Schema::new(vec![val_list_field]));
3477
3478 let values = Arc::new(generate_list_data::<i32>());
3479
3480 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3481 roundtrip_ensure_sliced_smaller(in_batch, 1000);
3482 }
3483
3484 #[test]
3485 fn encode_empty_list() {
3486 let val_inner = Field::new_list_field(DataType::UInt32, true);
3487 let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
3488 let schema = Arc::new(Schema::new(vec![val_list_field]));
3489
3490 let values = Arc::new(generate_list_data::<i32>());
3491
3492 let in_batch = RecordBatch::try_new(schema, vec![values])
3493 .unwrap()
3494 .slice(999, 0);
3495 let out_batch = deserialize_file(serialize_file(&in_batch));
3496 assert_eq!(in_batch, out_batch);
3497 }
3498
3499 #[test]
3500 fn encode_large_lists() {
3501 let val_inner = Field::new_list_field(DataType::UInt32, true);
3502 let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3503 let schema = Arc::new(Schema::new(vec![val_list_field]));
3504
3505 let values = Arc::new(generate_list_data::<i64>());
3506
3507 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3510 roundtrip_ensure_sliced_smaller(in_batch, 1000);
3511 }
3512
3513 #[test]
3514 fn encode_large_lists_non_zero_offset() {
3515 let val_inner = Field::new_list_field(DataType::UInt32, true);
3516 let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3517 let schema = Arc::new(Schema::new(vec![val_list_field]));
3518
3519 let values = Arc::new(generate_list_data::<i64>());
3520
3521 check_sliced_list_array(schema, values);
3522 }
3523
3524 #[test]
3525 fn encode_large_lists_string_non_zero_offset() {
3526 let val_inner = Field::new_list_field(DataType::Utf8, true);
3527 let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3528 let schema = Arc::new(Schema::new(vec![val_list_field]));
3529
3530 let values = Arc::new(generate_string_list_data::<i64>());
3531
3532 check_sliced_list_array(schema, values);
3533 }
3534
3535 #[test]
3536 fn encode_large_list_string_view_non_zero_offset() {
3537 let val_inner = Field::new_list_field(DataType::Utf8View, true);
3538 let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3539 let schema = Arc::new(Schema::new(vec![val_list_field]));
3540
3541 let values = Arc::new(generate_utf8view_list_data::<i64>());
3542
3543 check_sliced_list_array(schema, values);
3544 }
3545
3546 fn check_sliced_list_array(schema: Arc<Schema>, values: Arc<GenericListArray<i64>>) {
3547 for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3548 let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3549 .unwrap()
3550 .slice(offset, len);
3551 let out_batch = deserialize_file(serialize_file(&in_batch));
3552 assert_eq!(in_batch, out_batch);
3553 }
3554 }
3555
3556 #[test]
3557 fn encode_nested_lists() {
3558 let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
3559 let inner_list_field = Arc::new(Field::new_list_field(DataType::List(inner_int), true));
3560 let list_field = Field::new("val", DataType::List(inner_list_field), true);
3561 let schema = Arc::new(Schema::new(vec![list_field]));
3562
3563 let values = Arc::new(generate_nested_list_data::<i32>());
3564
3565 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3566 roundtrip_ensure_sliced_smaller(in_batch, 1000);
3567 }
3568
3569 #[test]
3570 fn encode_nested_lists_starting_at_zero() {
3571 let inner_int = Arc::new(Field::new("item", DataType::UInt32, true));
3572 let inner_list_field = Arc::new(Field::new("item", DataType::List(inner_int), true));
3573 let list_field = Field::new("val", DataType::List(inner_list_field), true);
3574 let schema = Arc::new(Schema::new(vec![list_field]));
3575
3576 let values = Arc::new(generate_nested_list_data_starting_at_zero::<i32>());
3577
3578 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3579 roundtrip_ensure_sliced_smaller(in_batch, 1);
3580 }
3581
3582 #[test]
3583 fn encode_map_array() {
3584 let keys = Arc::new(Field::new("keys", DataType::UInt32, false));
3585 let values = Arc::new(Field::new("values", DataType::UInt32, true));
3586 let map_field = Field::new_map("map", "entries", keys, values, false, true);
3587 let schema = Arc::new(Schema::new(vec![map_field]));
3588
3589 let values = Arc::new(generate_map_array_data());
3590
3591 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3592 roundtrip_ensure_sliced_smaller(in_batch, 1000);
3593 }
3594
3595 fn generate_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
3596 let mut builder = GenericListViewBuilder::<O, _>::new(UInt32Builder::new());
3597
3598 for i in 0u32..100_000 {
3599 if i.is_multiple_of(10_000) {
3600 builder.append(false);
3601 continue;
3602 }
3603 for value in [i, i, i] {
3604 builder.values().append_value(value);
3605 }
3606 builder.append(true);
3607 }
3608
3609 builder.finish()
3610 }
3611
3612 #[test]
3613 fn encode_list_view_arrays() {
3614 let val_inner = Field::new_list_field(DataType::UInt32, true);
3615 let val_field = Field::new("val", DataType::ListView(Arc::new(val_inner)), true);
3616 let schema = Arc::new(Schema::new(vec![val_field]));
3617
3618 let values = Arc::new(generate_list_view_data::<i32>());
3619
3620 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3621 let out_batch = deserialize_file(serialize_file(&in_batch));
3622 assert_eq!(in_batch, out_batch);
3623 }
3624
3625 #[test]
3626 fn encode_large_list_view_arrays() {
3627 let val_inner = Field::new_list_field(DataType::UInt32, true);
3628 let val_field = Field::new("val", DataType::LargeListView(Arc::new(val_inner)), true);
3629 let schema = Arc::new(Schema::new(vec![val_field]));
3630
3631 let values = Arc::new(generate_list_view_data::<i64>());
3632
3633 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3634 let out_batch = deserialize_file(serialize_file(&in_batch));
3635 assert_eq!(in_batch, out_batch);
3636 }
3637
3638 #[test]
3639 fn check_sliced_list_view_array() {
3640 let inner = Field::new_list_field(DataType::UInt32, true);
3641 let field = Field::new("val", DataType::ListView(Arc::new(inner)), true);
3642 let schema = Arc::new(Schema::new(vec![field]));
3643 let values = Arc::new(generate_list_view_data::<i32>());
3644
3645 for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3646 let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3647 .unwrap()
3648 .slice(offset, len);
3649 let out_batch = deserialize_file(serialize_file(&in_batch));
3650 assert_eq!(in_batch, out_batch);
3651 }
3652 }
3653
3654 #[test]
3655 fn check_sliced_large_list_view_array() {
3656 let inner = Field::new_list_field(DataType::UInt32, true);
3657 let field = Field::new("val", DataType::LargeListView(Arc::new(inner)), true);
3658 let schema = Arc::new(Schema::new(vec![field]));
3659 let values = Arc::new(generate_list_view_data::<i64>());
3660
3661 for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3662 let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3663 .unwrap()
3664 .slice(offset, len);
3665 let out_batch = deserialize_file(serialize_file(&in_batch));
3666 assert_eq!(in_batch, out_batch);
3667 }
3668 }
3669
3670 fn generate_nested_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
3671 let inner_builder = UInt32Builder::new();
3672 let middle_builder = GenericListViewBuilder::<O, _>::new(inner_builder);
3673 let mut outer_builder = GenericListViewBuilder::<O, _>::new(middle_builder);
3674
3675 for i in 0u32..10_000 {
3676 if i.is_multiple_of(1_000) {
3677 outer_builder.append(false);
3678 continue;
3679 }
3680
3681 for _ in 0..3 {
3682 for value in [i, i + 1, i + 2] {
3683 outer_builder.values().values().append_value(value);
3684 }
3685 outer_builder.values().append(true);
3686 }
3687 outer_builder.append(true);
3688 }
3689
3690 outer_builder.finish()
3691 }
3692
3693 #[test]
3694 fn encode_nested_list_views() {
3695 let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
3696 let inner_list_field = Arc::new(Field::new_list_field(DataType::ListView(inner_int), true));
3697 let list_field = Field::new("val", DataType::ListView(inner_list_field), true);
3698 let schema = Arc::new(Schema::new(vec![list_field]));
3699
3700 let values = Arc::new(generate_nested_list_view_data::<i32>());
3701
3702 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3703 let out_batch = deserialize_file(serialize_file(&in_batch));
3704 assert_eq!(in_batch, out_batch);
3705 }
3706
3707 fn test_roundtrip_list_view_of_dict_impl<OffsetSize: OffsetSizeTrait, U: ArrowNativeType>(
3708 list_data_type: DataType,
3709 offsets: &[U; 5],
3710 sizes: &[U; 4],
3711 ) {
3712 let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
3713 let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
3714 let dict_array = DictionaryArray::new(keys, Arc::new(values));
3715 let dict_data = dict_array.to_data();
3716
3717 let value_offsets = Buffer::from_slice_ref(offsets);
3718 let value_sizes = Buffer::from_slice_ref(sizes);
3719
3720 let list_data = ArrayData::builder(list_data_type)
3721 .len(4)
3722 .add_buffer(value_offsets)
3723 .add_buffer(value_sizes)
3724 .add_child_data(dict_data)
3725 .build()
3726 .unwrap();
3727 let list_view_array = GenericListViewArray::<OffsetSize>::from(list_data);
3728
3729 let schema = Arc::new(Schema::new(vec![Field::new(
3730 "f1",
3731 list_view_array.data_type().clone(),
3732 false,
3733 )]));
3734 let input_batch = RecordBatch::try_new(schema, vec![Arc::new(list_view_array)]).unwrap();
3735
3736 let output_batch = deserialize_file(serialize_file(&input_batch));
3737 assert_eq!(input_batch, output_batch);
3738
3739 let output_batch = deserialize_stream(serialize_stream(&input_batch));
3740 assert_eq!(input_batch, output_batch);
3741 }
3742
3743 #[test]
3744 fn test_roundtrip_list_view_of_dict() {
3745 #[allow(deprecated)]
3746 let list_data_type = DataType::ListView(Arc::new(Field::new_dict(
3747 "item",
3748 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3749 true,
3750 1,
3751 false,
3752 )));
3753 let offsets: &[i32; 5] = &[0, 2, 4, 4, 7];
3754 let sizes: &[i32; 4] = &[2, 2, 0, 3];
3755 test_roundtrip_list_view_of_dict_impl::<i32, i32>(list_data_type, offsets, sizes);
3756 }
3757
/// Round-trips a `LargeListView` (64-bit offsets) whose items are
/// dictionary-encoded strings registered under dictionary id 2.
#[test]
fn test_roundtrip_large_list_view_of_dict() {
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    // `Field::new_dict` is deprecated, but it is how this test pins an explicit dictionary id.
    #[allow(deprecated)]
    let item_field = Field::new_dict("item", dict_type, true, 2, false);

    test_roundtrip_list_view_of_dict_impl::<i64, i64>(
        DataType::LargeListView(Arc::new(item_field)),
        &[0, 2, 4, 4, 7],
        &[2, 2, 0, 3],
    );
}
3772
/// Round-trips a *sliced* batch holding a `ListView` of dictionary-encoded
/// strings (dictionary id 3) through both the IPC file and stream formats.
#[test]
fn test_roundtrip_sliced_list_view_of_dict() {
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    // `Field::new_dict` is deprecated, but it is how this test pins an explicit dictionary id.
    #[allow(deprecated)]
    let item_field = Field::new_dict("item", dict_type, true, 3, false);
    let list_data_type = DataType::ListView(Arc::new(item_field));

    // Child data: twelve Int32 keys into a four-entry string dictionary,
    // one entry of which is null.
    let dictionary = DictionaryArray::new(
        Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2, 1, 0, 3, 2, 1]),
        Arc::new(StringArray::from(vec![
            Some("alpha"),
            None,
            Some("beta"),
            Some("gamma"),
        ])),
    );

    let offsets: &[i32; 7] = &[0, 2, 4, 4, 7, 9, 12];
    let sizes: &[i32; 6] = &[2, 2, 0, 3, 2, 3];

    // Buffer order matters: offsets first, then sizes.
    let list_view = GenericListViewArray::<i32>::from(
        ArrayData::builder(list_data_type)
            .len(6)
            .add_buffer(Buffer::from_slice_ref(offsets))
            .add_buffer(Buffer::from_slice_ref(sizes))
            .add_child_data(dictionary.to_data())
            .build()
            .unwrap(),
    );

    let field = Field::new("f1", list_view.data_type().clone(), false);
    let input_batch =
        RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![Arc::new(list_view)])
            .unwrap();

    // Only rows 1..5 of the six-row batch are written.
    let sliced_batch = input_batch.slice(1, 4);

    let from_file = deserialize_file(serialize_file(&sliced_batch));
    assert_eq!(sliced_batch, from_file);

    let from_stream = deserialize_stream(serialize_stream(&sliced_batch));
    assert_eq!(sliced_batch, from_stream);
}
3818
/// Round-trips a dense union with a dictionary-encoded string child
/// (dictionary id 1) and an `Int32` child through both IPC formats.
#[test]
fn test_roundtrip_dense_union_of_dict() {
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    // `Field::new_dict` is deprecated, but it is how this test pins an explicit dictionary id.
    #[allow(deprecated)]
    let dict_field = Field::new_dict("dict", dict_type, true, 1, false);

    let union_fields = UnionFields::try_new(
        vec![0, 1],
        vec![
            Arc::new(dict_field),
            Arc::new(Field::new("int", DataType::Int32, false)),
        ],
    )
    .unwrap();
    let union_type = DataType::Union(union_fields.clone(), UnionMode::Dense);

    // Child for type id 0: seven keys into a four-entry string dictionary.
    let dict_child = DictionaryArray::new(
        Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]),
        Arc::new(StringArray::from(vec![
            Some("alpha"),
            None,
            Some("beta"),
            Some("gamma"),
        ])),
    );
    // Child for type id 1.
    let int_child = Int32Array::from(vec![100, 200]);

    // Per-slot type ids and the offsets into the selected child.
    let type_ids = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);
    let child_offsets = ScalarBuffer::from(vec![0i32, 1, 0, 2, 1, 3, 4]);

    let union = UnionArray::try_new(
        union_fields,
        type_ids,
        Some(child_offsets),
        vec![Arc::new(dict_child), Arc::new(int_child)],
    )
    .unwrap();

    let schema = Arc::new(Schema::new(vec![Field::new("union", union_type, false)]));
    let input_batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();

    let from_file = deserialize_file(serialize_file(&input_batch));
    assert_eq!(input_batch, from_file);

    let from_stream = deserialize_stream(serialize_stream(&input_batch));
    assert_eq!(input_batch, from_stream);
}
3862
/// Round-trips a sparse union with a dictionary-encoded string child
/// (dictionary id 2) and an `Int32` child through both IPC formats.
#[test]
fn test_roundtrip_sparse_union_of_dict() {
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    // `Field::new_dict` is deprecated, but it is how this test pins an explicit dictionary id.
    #[allow(deprecated)]
    let dict_field = Field::new_dict("dict", dict_type, true, 2, false);

    let union_fields = UnionFields::try_new(
        vec![0, 1],
        vec![
            Arc::new(dict_field),
            Arc::new(Field::new("int", DataType::Int32, false)),
        ],
    )
    .unwrap();
    let union_type = DataType::Union(union_fields.clone(), UnionMode::Sparse);

    // No offsets buffer is supplied, so both children have one slot per
    // union slot (seven each).
    let dict_child = DictionaryArray::new(
        Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]),
        Arc::new(StringArray::from(vec![
            Some("alpha"),
            None,
            Some("beta"),
            Some("gamma"),
        ])),
    );
    let int_child = Int32Array::from(vec![0, 0, 100, 0, 200, 0, 0]);

    let type_ids = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);

    let union = UnionArray::try_new(
        union_fields,
        type_ids,
        None,
        vec![Arc::new(dict_child), Arc::new(int_child)],
    )
    .unwrap();

    let schema = Arc::new(Schema::new(vec![Field::new("union", union_type, false)]));
    let input_batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();

    let from_file = deserialize_file(serialize_file(&input_batch));
    assert_eq!(input_batch, from_file);

    let from_stream = deserialize_stream(serialize_stream(&input_batch));
    assert_eq!(input_batch, from_stream);
}
3905
/// Round-trips a three-row map array whose *keys* are dictionary-encoded
/// strings (dictionary id 1) through both IPC formats.
#[test]
fn test_roundtrip_map_with_dict_keys() {
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

    // Six map keys drawn from a three-entry string dictionary.
    let dict_keys = DictionaryArray::new(
        Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]),
        Arc::new(StringArray::from(vec!["key_a", "key_b", "key_c"])),
    );
    let values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);

    // `Field::new_dict` is deprecated, but it is how this test pins an explicit dictionary id.
    #[allow(deprecated)]
    let key_field = Field::new_dict("key", dict_type.clone(), false, 1, false);
    let entries_field = Arc::new(Field::new(
        "entries",
        DataType::Struct(vec![key_field, Field::new("value", DataType::Int32, true)].into()),
        false,
    ));

    // The struct array itself is built with a plain (non-`new_dict`) key field.
    let entries = StructArray::from(vec![
        (
            Arc::new(Field::new("key", dict_type, false)),
            Arc::new(dict_keys) as ArrayRef,
        ),
        (
            Arc::new(Field::new("value", DataType::Int32, true)),
            Arc::new(values) as ArrayRef,
        ),
    ]);

    // Three maps of two entries each.
    let map_array = MapArray::from(
        ArrayData::builder(DataType::Map(entries_field, false))
            .len(3)
            .add_buffer(Buffer::from_slice_ref([0i32, 2, 4, 6]))
            .add_child_data(entries.into_data())
            .build()
            .unwrap(),
    );

    let field = Field::new("map", map_array.data_type().clone(), false);
    let input_batch =
        RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![Arc::new(map_array)])
            .unwrap();

    let from_file = deserialize_file(serialize_file(&input_batch));
    assert_eq!(input_batch, from_file);

    let from_stream = deserialize_stream(serialize_stream(&input_batch));
    assert_eq!(input_batch, from_stream);
}
3973
/// Round-trips a three-row map array whose *values* are dictionary-encoded
/// strings (dictionary id 2) through both IPC formats.
#[test]
fn test_roundtrip_map_with_dict_values() {
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

    let keys = StringArray::from(vec!["a", "b", "c", "d", "e", "f"]);
    // Six map values drawn from a three-entry string dictionary.
    let dict_values = DictionaryArray::new(
        Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]),
        Arc::new(StringArray::from(vec!["val_x", "val_y", "val_z"])),
    );

    // `Field::new_dict` is deprecated, but it is how this test pins an explicit dictionary id.
    #[allow(deprecated)]
    let value_field = Field::new_dict("value", dict_type.clone(), true, 2, false);
    let entries_field = Arc::new(Field::new(
        "entries",
        DataType::Struct(vec![Field::new("key", DataType::Utf8, false), value_field].into()),
        false,
    ));

    // The struct array itself is built with a plain (non-`new_dict`) value field.
    let entries = StructArray::from(vec![
        (
            Arc::new(Field::new("key", DataType::Utf8, false)),
            Arc::new(keys) as ArrayRef,
        ),
        (
            Arc::new(Field::new("value", dict_type, true)),
            Arc::new(dict_values) as ArrayRef,
        ),
    ]);

    // Three maps of two entries each.
    let map_array = MapArray::from(
        ArrayData::builder(DataType::Map(entries_field, false))
            .len(3)
            .add_buffer(Buffer::from_slice_ref([0i32, 2, 4, 6]))
            .add_child_data(entries.into_data())
            .build()
            .unwrap(),
    );

    let field = Field::new("map", map_array.data_type().clone(), false);
    let input_batch =
        RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![Arc::new(map_array)])
            .unwrap();

    let from_file = deserialize_file(serialize_file(&input_batch));
    assert_eq!(input_batch, from_file);

    let from_stream = deserialize_stream(serialize_stream(&input_batch));
    assert_eq!(input_batch, from_stream);
}
4041
/// Writes batches of `Decimal128` columns to an IPC file with 16-byte
/// alignment and checks that a decoder configured with
/// `with_require_alignment(true)` reads the first record batch back
/// unchanged, for a range of column counts and row counts.
#[test]
fn test_decimal128_alignment16_is_sufficient() {
    const IPC_ALIGNMENT: usize = 16;
    // The final 10 bytes of the file are what `read_footer_length` is given.
    const TRAILER_LEN: usize = 10;

    for num_cols in [1, 2, 3, 17, 50, 73, 99] {
        // Vary the row count with the column count, keeping it below 100.
        let num_rows: usize = (num_cols * 7 + 11) % 100;

        let (fields, columns): (Vec<Field>, Vec<ArrayRef>) = (0..num_cols)
            .map(|i| {
                let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
                let column = Decimal128Array::from(vec![num_cols as i128; num_rows]);
                (field, Arc::new(column) as ArrayRef)
            })
            .unzip();
        let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), columns).unwrap();

        let options = IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap();
        let mut writer =
            FileWriter::try_new_with_options(Vec::new(), batch.schema_ref(), options).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        let file_bytes: Vec<u8> = writer.into_inner().unwrap();

        // Locate and parse the footer at the end of the file.
        let buffer = Buffer::from_vec(file_bytes);
        let trailer_start = buffer.len() - TRAILER_LEN;
        let footer_len =
            read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
        let footer_start = trailer_start - footer_len;
        let footer = root_as_footer(&buffer[footer_start..trailer_start]).unwrap();

        let decoded_schema = fb_to_schema(footer.schema().unwrap());
        let decoder = FileDecoder::new(Arc::new(decoded_schema), footer.version())
            .with_require_alignment(true);

        // Decode the first (and only) record batch block listed in the footer.
        let block = footer.recordBatches().unwrap().get(0);
        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
        let block_bytes = buffer.slice_with_length(block.offset() as _, block_len);

        let decoded = decoder
            .read_record_batch(block, &block_bytes)
            .unwrap()
            .unwrap();

        assert_eq!(batch, decoded);
    }
}
4100
/// Writes two single-row `Decimal128` columns to an IPC file with only
/// 8-byte alignment and checks that a decoder configured with
/// `with_require_alignment(true)` rejects the record batch with a
/// "Misaligned buffers" error.
#[test]
fn test_decimal128_alignment8_is_unaligned() {
    const IPC_ALIGNMENT: usize = 8;
    const NUM_COLS: usize = 2;
    const NUM_ROWS: usize = 1;
    // The final 10 bytes of the file are what `read_footer_length` is given.
    const TRAILER_LEN: usize = 10;

    let (fields, columns): (Vec<Field>, Vec<ArrayRef>) = (0..NUM_COLS)
        .map(|i| {
            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
            let column = Decimal128Array::from(vec![NUM_COLS as i128; NUM_ROWS]);
            (field, Arc::new(column) as ArrayRef)
        })
        .unzip();
    let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), columns).unwrap();

    let options = IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap();
    let mut writer =
        FileWriter::try_new_with_options(Vec::new(), batch.schema_ref(), options).unwrap();
    writer.write(&batch).unwrap();
    writer.finish().unwrap();
    let file_bytes: Vec<u8> = writer.into_inner().unwrap();

    // Locate and parse the footer at the end of the file.
    let buffer = Buffer::from_vec(file_bytes);
    let trailer_start = buffer.len() - TRAILER_LEN;
    let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
    let footer_start = trailer_start - footer_len;
    let footer = root_as_footer(&buffer[footer_start..trailer_start]).unwrap();

    let decoded_schema = fb_to_schema(footer.schema().unwrap());
    let decoder =
        FileDecoder::new(Arc::new(decoded_schema), footer.version()).with_require_alignment(true);

    // Try to decode the first record batch block listed in the footer.
    let block = footer.recordBatches().unwrap().get(0);
    let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
    let block_bytes = buffer.slice_with_length(block.offset() as _, block_len);

    let error = decoder
        .read_record_batch(block, &block_bytes)
        .unwrap_err();
    assert_eq!(
        error.to_string(),
        "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
         offset from expected alignment of 16 by 8"
    );
}
4156
/// Checks that `flush` on the stream and file writers pushes the bytes
/// buffered in the wrapped `BufWriter` through to the underlying `Vec`.
#[test]
fn test_flush() {
    let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
    let schema = Schema::new(
        (0..2)
            .map(|i| Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true))
            .collect::<Vec<_>>(),
    );

    // Both writers sit on top of a 1 KiB `BufWriter`, so construction alone
    // may leave bytes buffered rather than in the `Vec`.
    let mut stream_writer = StreamWriter::try_new_with_options(
        BufWriter::with_capacity(1024, Vec::new()),
        &schema,
        options.clone(),
    )
    .unwrap();
    let mut file_writer = FileWriter::try_new_with_options(
        BufWriter::with_capacity(1024, Vec::new()),
        &schema,
        options,
    )
    .unwrap();

    let stream_len_before = stream_writer.get_ref().get_ref().len();
    let file_len_before = file_writer.get_ref().get_ref().len();

    stream_writer.flush().unwrap();
    file_writer.flush().unwrap();

    let stream_len_after = stream_writer.get_ref().get_ref().len();
    let file_len_after = file_writer.get_ref().get_ref().len();

    // NOTE(review): the two 8-byte adjustments below presumably account for
    // the end-of-stream marker appended when the stream writer is finished
    // by `into_inner`, and for the extra bytes a file writer emits ahead of
    // the schema -- confirm against the writer implementation.
    let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
    let expected_stream_len = stream_out.len() - 8;
    let expected_file_len = expected_stream_len + 8;

    assert!(
        stream_len_before < stream_len_after,
        "this test makes no sense if flush is not actually required"
    );
    assert!(
        file_len_before < file_len_after,
        "this test makes no sense if flush is not actually required"
    );
    assert_eq!(stream_len_after, expected_stream_len);
    assert_eq!(file_len_after, expected_file_len);
}
4203
/// Round-trips slices of a `List<FixedSizeList<Float32, 3>>` array (all
/// fields non-nullable) through the IPC stream format.
#[test]
fn test_roundtrip_list_of_fixed_list() -> Result<(), ArrowError> {
    let coord_field = Arc::new(Field::new("item", DataType::Float32, false));
    let l1_type = DataType::FixedSizeList(coord_field.clone(), 3);
    let l2_type = DataType::List(Arc::new(Field::new("item", l1_type.clone(), false)));

    let l1_builder = FixedSizeListBuilder::new(Float32Builder::new(), 3).with_field(coord_field);
    let mut l2_builder =
        ListBuilder::new(l1_builder).with_field(Arc::new(Field::new("item", l1_type, false)));

    // Two outer lists: the first holds three points, the second holds one.
    let lists: [&[[f32; 3]]; 2] = [
        &[[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]],
        &[[10., 11., 12.]],
    ];
    for list in lists {
        for point in list {
            for &coord in point {
                l2_builder.values().values().append_value(coord);
            }
            // Close the fixed-size point.
            l2_builder.values().append(true);
        }
        // Close the outer list.
        l2_builder.append(true);
    }

    let array = Arc::new(l2_builder.finish()) as ArrayRef;

    let schema = Arc::new(Schema::new_with_metadata(
        vec![Field::new("points", l2_type, false)],
        HashMap::default(),
    ));

    for (offset, length) in [(0, 1), (0, 2), (1, 1)] {
        test_slices(&array, &schema, offset, length)?;
    }

    Ok(())
}
4251
/// Round-trips slices of a nullable `List<FixedSizeList<Float32, 3>>` array
/// that contains null coordinates through the IPC stream format.
#[test]
fn test_roundtrip_list_of_fixed_list_w_nulls() -> Result<(), ArrowError> {
    let mut l2_builder =
        ListBuilder::new(FixedSizeListBuilder::new(Float32Builder::new(), 3));

    // Two outer lists: the first holds three points, the second holds one.
    let lists: [&[[Option<f32>; 3]]; 2] = [
        &[
            [Some(1.0), Some(2.0), None],
            [Some(4.0), Some(5.0), Some(6.0)],
            [None, Some(8.0), Some(9.0)],
        ],
        &[[Some(10.), None, None]],
    ];
    for list in lists {
        for point in list {
            for &coord in point {
                l2_builder.values().values().append_option(coord);
            }
            // Close the fixed-size point.
            l2_builder.values().append(true);
        }
        // Close the outer list.
        l2_builder.append(true);
    }

    let array = Arc::new(l2_builder.finish()) as ArrayRef;

    let point_type =
        DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 3);
    let points_type = DataType::List(Arc::new(Field::new("item", point_type, true)));
    let schema = Arc::new(Schema::new_with_metadata(
        vec![Field::new("points", points_type, true)],
        HashMap::default(),
    ));

    for (offset, length) in [(0, 1), (0, 2), (1, 1)] {
        test_slices(&array, &schema, offset, length)?;
    }

    Ok(())
}
4311
4312 fn test_slices(
4313 parent_array: &ArrayRef,
4314 schema: &SchemaRef,
4315 offset: usize,
4316 length: usize,
4317 ) -> Result<(), ArrowError> {
4318 let subarray = parent_array.slice(offset, length);
4319 let original_batch = RecordBatch::try_new(schema.clone(), vec![subarray])?;
4320
4321 let mut bytes = Vec::new();
4322 let mut writer = StreamWriter::try_new(&mut bytes, schema)?;
4323 writer.write(&original_batch)?;
4324 writer.finish()?;
4325
4326 let mut cursor = std::io::Cursor::new(bytes);
4327 let mut reader = StreamReader::try_new(&mut cursor, None)?;
4328 let returned_batch = reader.next().unwrap()?;
4329
4330 assert_eq!(original_batch, returned_batch);
4331
4332 Ok(())
4333 }
4334
/// Round-trips slices of a non-nullable `FixedSizeList<Int64, 3>` array
/// through the IPC stream format.
#[test]
fn test_roundtrip_fixed_list() -> Result<(), ArrowError> {
    let item_field = Arc::new(Field::new("item", DataType::Int64, false));
    let mut builder =
        FixedSizeListBuilder::new(Int64Builder::new(), 3).with_field(item_field.clone());

    for point in [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]] {
        for coord in point {
            builder.values().append_value(coord);
        }
        // Close the fixed-size point.
        builder.append(true);
    }

    let array = Arc::new(builder.finish()) as ArrayRef;

    let schema = Arc::new(Schema::new_with_metadata(
        vec![Field::new(
            "points",
            DataType::FixedSizeList(item_field, 3),
            false,
        )],
        HashMap::default(),
    ));

    for (offset, length) in [(0, 4), (0, 2), (1, 3), (2, 1)] {
        test_slices(&array, &schema, offset, length)?;
    }

    Ok(())
}
4369
/// Round-trips slices of a nullable `FixedSizeList<Int64, 3>` array that
/// contains null items through the IPC stream format.
#[test]
fn test_roundtrip_fixed_list_w_nulls() -> Result<(), ArrowError> {
    let mut builder = FixedSizeListBuilder::new(Int64Builder::new(), 3);

    let points = [
        [Some(1), Some(2), None],
        [Some(4), Some(5), Some(6)],
        [None, Some(8), Some(9)],
        [Some(10), None, None],
    ];
    for point in points {
        for coord in point {
            builder.values().append_option(coord);
        }
        // Close the fixed-size point.
        builder.append(true);
    }

    let array = Arc::new(builder.finish()) as ArrayRef;

    let points_type =
        DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 3);
    let schema = Arc::new(Schema::new_with_metadata(
        vec![Field::new("points", points_type, true)],
        HashMap::default(),
    ));

    for (offset, length) in [(0, 4), (0, 2), (1, 3), (2, 1)] {
        test_slices(&array, &schema, offset, length)?;
    }

    Ok(())
}
4411
/// Checks that encoding a schema carrying `HashMap` metadata produces
/// byte-identical output on every run.
#[test]
fn test_metadata_encoding_ordering() {
    /// Builds a fresh metadata map, attaches it to both a field and the
    /// schema, writes an empty batch as an IPC stream, and returns a hash of
    /// the bytes written.
    fn create_hash() -> u64 {
        let metadata: HashMap<String, String> = [
            ("a", "1"),
            ("b", "2"),
            ("c", "3"),
            ("d", "4"),
            ("e", "5"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        // The field needs its own copy of the map; the schema takes the original.
        let schema = Arc::new(
            Schema::new(vec![
                Field::new("a", DataType::Int64, true).with_metadata(metadata.clone()),
            ])
            .with_metadata(metadata),
        );
        let batch = RecordBatch::new_empty(schema);

        let mut bytes = Vec::new();
        let mut w = StreamWriter::try_new(&mut bytes, batch.schema_ref()).unwrap();
        w.write(&batch).unwrap();
        w.finish().unwrap();

        let mut h = std::hash::DefaultHasher::new();
        h.write(&bytes);
        h.finish()
    }

    let expected = create_hash();

    // Each call builds a new `HashMap`, whose iteration order is not
    // guaranteed to repeat, so a single comparison could pass by luck;
    // repeat it to make an order-dependent encoding very likely to show up.
    for attempt in 0..20 {
        assert_eq!(
            create_hash(),
            expected,
            "encoded bytes differed from the first encoding on attempt {attempt}"
        );
    }
}
4455
/// Checks that one `DictionaryTracker` can be reused to write a second,
/// independent stream after `clear()` is called on it.
#[test]
fn test_dictionary_tracker_reset() {
    let data_gen = IpcDataGenerator::default();
    let writer_options = IpcWriteOptions::default();
    let mut compression_ctx = CompressionContext::default();
    let mut dictionary_tracker = DictionaryTracker::new(false);

    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
        false,
    )]));

    // Encodes the schema message, any dictionary messages, and the batch
    // message for `batch` into a fresh byte buffer.
    let mut write_single_batch_stream =
        |batch: &RecordBatch, dict_tracker: &mut DictionaryTracker| -> Vec<u8> {
            let mut stream = Vec::new();

            let header = data_gen.schema_to_bytes_with_dictionary_tracker(
                &schema,
                dict_tracker,
                &writer_options,
            );
            let _ = write_message(&mut stream, header, &writer_options).unwrap();

            let (dictionaries, record_batch) = data_gen
                .encode(batch, dict_tracker, &writer_options, &mut compression_ctx)
                .unwrap();
            for dictionary in dictionaries {
                let _ = write_message(&mut stream, dictionary, &writer_options).unwrap();
            }
            let _ = write_message(&mut stream, record_batch, &writer_options).unwrap();

            stream
        };

    // Builds a one-row batch: key 0 into the single-entry dictionary ["a"].
    let single_value_batch = || {
        RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(DictionaryArray::new(
                UInt8Array::from_iter_values([0]),
                Arc::new(StringArray::from_iter_values(["a"])),
            ))],
        )
        .unwrap()
    };

    // Decodes the first batch of an encoded stream.
    let read_first_batch = |bytes: Vec<u8>| -> RecordBatch {
        let mut reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
        reader.next().unwrap().unwrap()
    };

    let batch1 = single_value_batch();
    let buffer = write_single_batch_stream(&batch1, &mut dictionary_tracker);
    assert_eq!(read_first_batch(buffer), batch1);

    // NOTE(review): presumably, without this reset the tracker would treat
    // the dictionary as already written and leave it out of the second
    // stream -- confirm against `DictionaryTracker`.
    dictionary_tracker.clear();

    // A second, identical batch written as a brand-new stream must still be readable.
    let batch2 = single_value_batch();
    let buffer = write_single_batch_stream(&batch2, &mut dictionary_tracker);
    assert_eq!(read_first_batch(buffer), batch2);
}
4524}