use std::cmp::min;
use std::collections::HashMap;
use std::io::{BufWriter, Write};
use std::mem::size_of;
use std::sync::Arc;

use flatbuffers::FlatBufferBuilder;

use arrow_array::builder::BufferBuilder;
use arrow_array::cast::*;
use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
use arrow_array::*;
use arrow_buffer::bit_util;
use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
use arrow_data::{ArrayData, ArrayDataBuilder, BufferSpec, layout};
use arrow_schema::*;

use crate::CONTINUATION_MARKER;
use crate::compression::CompressionCodec;
pub use crate::compression::CompressionContext;
use crate::convert::IpcSchemaEncoder;

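/// Options controlling how IPC messages are written: buffer alignment,
/// metadata version, optional body compression, and dictionary handling.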
#[derive(Debug, Clone)]
pub struct IpcWriteOptions {
    /// Write padding after memory buffers up to this multiple of bytes.
    /// Must be 8, 16, 32, or 64 - defaults to 64.
    alignment: u8,
    /// The legacy format is for releases before 0.15.0, and uses metadata V4.
    write_legacy_ipc_format: bool,
    /// The metadata version to write. Only V4 and V5 are supported.
    metadata_version: crate::MetadataVersion,
    /// Compression, if desired, of the message body. Requires metadata V5 or later.
    batch_compression_type: Option<crate::CompressionType>,
    /// How dictionaries are handled when they change between batches.
    dictionary_handling: DictionaryHandling,
}

impl IpcWriteOptions {
    /// Configure compression of the message body. Compression requires
    /// metadata version V5 or later.
    pub fn try_with_compression(
        mut self,
        batch_compression_type: Option<crate::CompressionType>,
    ) -> Result<Self, ArrowError> {
        self.batch_compression_type = batch_compression_type;

        if self.batch_compression_type.is_some()
            && self.metadata_version < crate::MetadataVersion::V5
        {
            return Err(ArrowError::InvalidArgumentError(
                "Compression only supported in metadata v5 and above".to_string(),
            ));
        }
        Ok(self)
    }

    /// Try to create [`IpcWriteOptions`], checking for incompatible settings.
    pub fn try_new(
        alignment: usize,
        write_legacy_ipc_format: bool,
        metadata_version: crate::MetadataVersion,
    ) -> Result<Self, ArrowError> {
        let is_alignment_valid =
            alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
        if !is_alignment_valid {
            return Err(ArrowError::InvalidArgumentError(
                "Alignment should be 8, 16, 32, or 64.".to_string(),
            ));
        }
        let alignment: u8 = u8::try_from(alignment).expect("range already checked");
        match metadata_version {
            crate::MetadataVersion::V1
            | crate::MetadataVersion::V2
            | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
                "Writing IPC metadata version 3 and lower not supported".to_string(),
            )),
            #[allow(deprecated)]
            crate::MetadataVersion::V4 => Ok(Self {
                alignment,
                write_legacy_ipc_format,
                metadata_version,
                batch_compression_type: None,
                dictionary_handling: DictionaryHandling::default(),
            }),
            crate::MetadataVersion::V5 => {
                if write_legacy_ipc_format {
                    Err(ArrowError::InvalidArgumentError(
                        "Legacy IPC format only supported on metadata version 4".to_string(),
                    ))
                } else {
                    Ok(Self {
                        alignment,
                        write_legacy_ipc_format,
                        metadata_version,
                        batch_compression_type: None,
                        dictionary_handling: DictionaryHandling::default(),
                    })
                }
            }
            z => Err(ArrowError::InvalidArgumentError(format!(
                "Unsupported crate::MetadataVersion {z:?}"
            ))),
        }
    }

    /// Set how dictionaries are handled when writing. See [`DictionaryHandling`].
    pub fn with_dictionary_handling(mut self, dictionary_handling: DictionaryHandling) -> Self {
        self.dictionary_handling = dictionary_handling;
        self
    }
}

impl Default for IpcWriteOptions {
    fn default() -> Self {
        Self {
            alignment: 64,
            write_legacy_ipc_format: false,
            metadata_version: crate::MetadataVersion::V5,
            batch_compression_type: None,
            dictionary_handling: DictionaryHandling::default(),
        }
    }
}

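/// Handles the low level details of encoding [`Schema`]s, [`RecordBatch`]es,
/// and dictionaries into Arrow IPC messages.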
#[derive(Debug, Default)]
pub struct IpcDataGenerator {}

impl IpcDataGenerator {
    /// Converts a schema to an IPC message along with `dictionary_tracker`,
    /// returning the encoded bytes. Schema messages have an empty body.
    pub fn schema_to_bytes_with_dictionary_tracker(
        &self,
        schema: &Schema,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> EncodedData {
        let mut fbb = FlatBufferBuilder::new();
        let schema = {
            let fb = IpcSchemaEncoder::new()
                .with_dictionary_tracker(dictionary_tracker)
                .schema_to_fb_offset(&mut fbb, schema);
            fb.as_union_value()
        };

        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::Schema);
        message.add_bodyLength(0);
        message.add_header(schema);
        let data = message.finish();
        fbb.finish(data, None);

        let data = fbb.finished_data();
        EncodedData {
            ipc_message: data.to_vec(),
            arrow_data: vec![],
        }
    }

    fn _encode_dictionaries<I: Iterator<Item = i64>>(
        &self,
        column: &ArrayRef,
        encoded_dictionaries: &mut Vec<EncodedData>,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        dict_id: &mut I,
        compression_context: &mut CompressionContext,
    ) -> Result<(), ArrowError> {
        match column.data_type() {
            DataType::Struct(fields) => {
                let s = as_struct_array(column);
                for (field, column) in fields.iter().zip(s.columns()) {
                    self.encode_dictionaries(
                        field,
                        column,
                        encoded_dictionaries,
                        dictionary_tracker,
                        write_options,
                        dict_id,
                        compression_context,
                    )?;
                }
            }
            DataType::RunEndEncoded(_, values) => {
                let data = column.to_data();
                if data.child_data().len() != 2 {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "The run encoded array should have exactly two child arrays. Found {}",
                        data.child_data().len()
                    )));
                }
                // The run_ends array is the first child; the values array is the second
                let values_array = make_array(data.child_data()[1].clone());
                self.encode_dictionaries(
                    values,
                    &values_array,
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;
            }
            DataType::List(field) => {
                let list = as_list_array(column);
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;
            }
            DataType::LargeList(field) => {
                let list = as_large_list_array(column);
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;
            }
            DataType::ListView(field) => {
                let list = column.as_list_view::<i32>();
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;
            }
            DataType::LargeListView(field) => {
                let list = column.as_list_view::<i64>();
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;
            }
            DataType::FixedSizeList(field, _) => {
                let list = column
                    .as_any()
                    .downcast_ref::<FixedSizeListArray>()
                    .expect("Unable to downcast to fixed size list array");
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;
            }
            DataType::Map(field, _) => {
                let map_array = as_map_array(column);

                let (keys, values) = match field.data_type() {
                    DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
                    _ => panic!("Incorrect field data type {:?}", field.data_type()),
                };

                // keys
                self.encode_dictionaries(
                    keys,
                    map_array.keys(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;

                // values
                self.encode_dictionaries(
                    values,
                    map_array.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;
            }
            DataType::Union(fields, _) => {
                let union = as_union_array(column);
                for (type_id, field) in fields.iter() {
                    let column = union.child(type_id);
                    self.encode_dictionaries(
                        field,
                        column,
                        encoded_dictionaries,
                        dictionary_tracker,
                        write_options,
                        dict_id,
                        compression_context,
                    )?;
                }
            }
            _ => (),
        }

        Ok(())
    }

    #[allow(clippy::too_many_arguments)]
    fn encode_dictionaries<I: Iterator<Item = i64>>(
        &self,
        field: &Field,
        column: &ArrayRef,
        encoded_dictionaries: &mut Vec<EncodedData>,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        dict_id_seq: &mut I,
        compression_context: &mut CompressionContext,
    ) -> Result<(), ArrowError> {
        match column.data_type() {
            DataType::Dictionary(_key_type, _value_type) => {
                let dict_data = column.to_data();
                let dict_values = &dict_data.child_data()[0];

                let values = make_array(dict_data.child_data()[0].clone());

                self._encode_dictionaries(
                    &values,
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id_seq,
                    compression_context,
                )?;

                // Take the dict id for this field only after encoding any nested
                // dictionaries above, since ids are assigned depth-first.
                let dict_id = dict_id_seq.next().ok_or_else(|| {
                    ArrowError::IpcError(format!("no dict id for field {}", field.name()))
                })?;

                match dictionary_tracker.insert_column(
                    dict_id,
                    column,
                    write_options.dictionary_handling,
                )? {
                    DictionaryUpdate::None => {}
                    DictionaryUpdate::New | DictionaryUpdate::Replaced => {
                        encoded_dictionaries.push(self.dictionary_batch_to_bytes(
                            dict_id,
                            dict_values,
                            write_options,
                            false,
                            compression_context,
                        )?);
                    }
                    DictionaryUpdate::Delta(data) => {
                        encoded_dictionaries.push(self.dictionary_batch_to_bytes(
                            dict_id,
                            &data,
                            write_options,
                            true,
                            compression_context,
                        )?);
                    }
                }
            }
            _ => self._encode_dictionaries(
                column,
                encoded_dictionaries,
                dictionary_tracker,
                write_options,
                dict_id_seq,
                compression_context,
            )?,
        }

        Ok(())
    }
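    /// Encodes a batch into one [`EncodedData`] per dictionary message that
    /// needs to be sent, plus one for the record batch itself. The
    /// [`DictionaryTracker`] decides which dictionaries must be (re)sent.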
    pub fn encode(
        &self,
        batch: &RecordBatch,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        compression_context: &mut CompressionContext,
    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
        let schema = batch.schema();
        let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());

        let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();

        for (i, field) in schema.fields().iter().enumerate() {
            let column = batch.column(i);
            self.encode_dictionaries(
                field,
                column,
                &mut encoded_dictionaries,
                dictionary_tracker,
                write_options,
                &mut dict_id,
                compression_context,
            )?;
        }

        let encoded_message =
            self.record_batch_to_bytes(batch, write_options, compression_context)?;
        Ok((encoded_dictionaries, encoded_message))
    }

    #[deprecated(since = "57.0.0", note = "Use `encode` instead")]
    pub fn encoded_batch(
        &self,
        batch: &RecordBatch,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
        self.encode(
            batch,
            dictionary_tracker,
            write_options,
            &mut Default::default(),
        )
    }

    /// Encode a `RecordBatch` as an IPC message: a flatbuffer header plus the
    /// body bytes holding the buffers.
    fn record_batch_to_bytes(
        &self,
        batch: &RecordBatch,
        write_options: &IpcWriteOptions,
        compression_context: &mut CompressionContext,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];
        let mut offset = 0;

        let batch_compression_type = write_options.batch_compression_type;

        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        let compression_codec: Option<CompressionCodec> =
            batch_compression_type.map(TryInto::try_into).transpose()?;

        let mut variadic_buffer_counts = vec![];

        for array in batch.columns() {
            let array_data = array.to_data();
            offset = write_array_data(
                &array_data,
                &mut buffers,
                &mut arrow_data,
                &mut nodes,
                offset,
                array.len(),
                array.null_count(),
                compression_codec,
                compression_context,
                write_options,
            )?;

            append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
        }
        // pad the tail of the body data
        let len = arrow_data.len();
        let pad_len = pad_to_alignment(write_options.alignment, len);
        arrow_data.extend_from_slice(&PADDING[..pad_len]);

        // write data
        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(batch.num_rows() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }

            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            let b = batch_builder.finish();
            b.as_union_value()
        };
        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::RecordBatch);
        message.add_bodyLength(arrow_data.len() as i64);
        message.add_header(root);
        let root = message.finish();
        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }

    /// Encode dictionary values as a DictionaryBatch IPC message: a flatbuffer
    /// header plus the body bytes holding the buffers.
    fn dictionary_batch_to_bytes(
        &self,
        dict_id: i64,
        array_data: &ArrayData,
        write_options: &IpcWriteOptions,
        is_delta: bool,
        compression_context: &mut CompressionContext,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];

        let batch_compression_type = write_options.batch_compression_type;

        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        let compression_codec: Option<CompressionCodec> = batch_compression_type
            .map(|batch_compression_type| batch_compression_type.try_into())
            .transpose()?;

        write_array_data(
            array_data,
            &mut buffers,
            &mut arrow_data,
            &mut nodes,
            0,
            array_data.len(),
            array_data.null_count(),
            compression_codec,
            compression_context,
            write_options,
        )?;

        let mut variadic_buffer_counts = vec![];
        append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);

        // pad the tail of the body data
        let len = arrow_data.len();
        let pad_len = pad_to_alignment(write_options.alignment, len);
        arrow_data.extend_from_slice(&PADDING[..pad_len]);

        // write data
        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(array_data.len() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }
            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            batch_builder.finish()
        };

        let root = {
            let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb);
            batch_builder.add_id(dict_id);
            batch_builder.add_data(root);
            batch_builder.add_isDelta(is_delta);
            batch_builder.finish().as_union_value()
        };

        let root = {
            let mut message_builder = crate::MessageBuilder::new(&mut fbb);
            message_builder.add_version(write_options.metadata_version);
            message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
            message_builder.add_bodyLength(arrow_data.len() as i64);
            message_builder.add_header(root);
            message_builder.finish()
        };

        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }
}

fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
    match array.data_type() {
        DataType::BinaryView | DataType::Utf8View => {
            // All buffers after the first (views) buffer are variadic data buffers
            counts.push(array.buffers().len() as i64 - 1);
        }
        DataType::Dictionary(_, _) => {
            // Dictionary values are encoded separately as dictionary batches,
            // so do not recurse into the child data here
        }
        _ => {
            for child in array.child_data() {
                append_variadic_buffer_counts(counts, child)
            }
        }
    }
}

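/// Rebuild a possibly sliced run-end-encoded array into an equivalent array
/// with zero offset, so it can be written out directly.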
pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
    match arr.data_type() {
        DataType::RunEndEncoded(k, _) => match k.data_type() {
            DataType::Int16 => {
                Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
            }
            DataType::Int32 => {
                Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
            }
            DataType::Int64 => {
                Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
            }
            d => unreachable!("Unexpected data type {d}"),
        },
        d => Err(ArrowError::InvalidArgumentError(format!(
            "The given array is not a run array. Data type of given array: {d}"
        ))),
    }
}

fn into_zero_offset_run_array<R: RunEndIndexType>(
    run_array: RunArray<R>,
) -> Result<RunArray<R>, ArrowError> {
    let run_ends = run_array.run_ends();
    if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
        return Ok(run_array);
    }

    // The physical index at which the slice's logical window begins
    let start_physical_index = run_ends.get_start_physical_index();

    // The physical index at which the slice's logical window ends
    let end_physical_index = run_ends.get_end_physical_index();

    let physical_length = end_physical_index - start_physical_index + 1;

    // Build new run ends by shifting each run end down by the array's offset
    let offset = R::Native::usize_as(run_ends.offset());
    let mut builder = BufferBuilder::<R::Native>::new(physical_length);
    for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
        builder.append(run_end_value.sub_wrapping(offset));
    }
    builder.append(R::Native::from_usize(run_array.len()).unwrap());
    let new_run_ends = unsafe {
        // SAFETY: the run ends built above are monotonically increasing and
        // within range, so validation can be skipped
        ArrayDataBuilder::new(R::DATA_TYPE)
            .len(physical_length)
            .add_buffer(builder.finish())
            .build_unchecked()
    };

    // Slice the values to the physical window
    let new_values = run_array
        .values()
        .slice(start_physical_index, physical_length)
        .into_data();

    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
        .len(run_array.len())
        .add_child_data(new_run_ends)
        .add_child_data(new_values);
    let array_data = unsafe {
        // SAFETY: the child data built above forms a valid run array
        builder.build_unchecked()
    };
    Ok(array_data.into())
}

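/// Controls how dictionaries are handled when they change between record
/// batches written to the same stream.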
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DictionaryHandling {
    /// Re-send the entire dictionary whenever its values change (default).
    #[default]
    Resend,
    /// When a dictionary grows by appending values, send only the appended
    /// values as a delta dictionary batch.
    Delta,
}

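/// Describes what, if anything, must be written for a dictionary after it is
/// inserted into a [`DictionaryTracker`].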
#[derive(Debug, Clone)]
pub enum DictionaryUpdate {
    /// The dictionary was unchanged; nothing needs to be written.
    None,
    /// The dictionary was seen for the first time.
    New,
    /// The dictionary was replaced with a new one.
    Replaced,
    /// The dictionary grew; the contained data holds the appended values.
    Delta(ArrayData),
}

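/// Keeps track of dictionaries that have been written, to avoid emitting the
/// same dictionary multiple times.
///
/// Can optionally error on detecting a dictionary replacement, which the IPC
/// file format requires: Arrow IPC files only support a single dictionary for
/// a given field across all batches.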
#[derive(Debug)]
pub struct DictionaryTracker {
    written: HashMap<i64, ArrayData>,
    dict_ids: Vec<i64>,
    error_on_replacement: bool,
}

impl DictionaryTracker {
    /// Create a new [`DictionaryTracker`].
    ///
    /// If `error_on_replacement` is true, an error is generated when an update
    /// to an existing dictionary is detected.
    pub fn new(error_on_replacement: bool) -> Self {
        #[allow(deprecated)]
        Self {
            written: HashMap::new(),
            dict_ids: Vec::new(),
            error_on_replacement,
        }
    }

    /// Record and return the next dictionary id.
    pub fn next_dict_id(&mut self) -> i64 {
        let next = self
            .dict_ids
            .last()
            .copied()
            .map(|i| i + 1)
            .unwrap_or_default();

        self.dict_ids.push(next);
        next
    }

    /// Return the sequence of dictionary ids assigned so far.
    pub fn dict_id(&mut self) -> &[i64] {
        &self.dict_ids
    }

    #[deprecated(since = "56.1.0", note = "Use `insert_column` instead")]
    pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
        let dict_data = column.to_data();
        let dict_values = &dict_data.child_data()[0];

        // If a dictionary with this id was already emitted, check if it is the same
        if let Some(last) = self.written.get(&dict_id) {
            if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
                // Same underlying values => no need to emit it again
                return Ok(false);
            }
            if self.error_on_replacement {
                // Fall back to a logical comparison before erroring
                if last.child_data()[0] == *dict_values {
                    return Ok(false);
                }
                return Err(ArrowError::InvalidArgumentError(
                    "Dictionary replacement detected when writing IPC file format. \
                     Arrow IPC files only support a single dictionary for a given field \
                     across all batches."
                        .to_string(),
                ));
            }
        }

        self.written.insert(dict_id, dict_data);
        Ok(true)
    }

    /// Insert a dictionary column into the tracker, returning what must be
    /// written for it according to `dict_handling`.
    pub fn insert_column(
        &mut self,
        dict_id: i64,
        column: &ArrayRef,
        dict_handling: DictionaryHandling,
    ) -> Result<DictionaryUpdate, ArrowError> {
        let new_data = column.to_data();
        let new_values = &new_data.child_data()[0];

        // First time seeing this dictionary: track it and emit it in full
        let Some(old) = self.written.get(&dict_id) else {
            self.written.insert(dict_id, new_data);
            return Ok(DictionaryUpdate::New);
        };

        // Fast path: identical underlying buffers mean the dictionary is unchanged
        let old_values = &old.child_data()[0];
        if ArrayData::ptr_eq(old_values, new_values) {
            return Ok(DictionaryUpdate::None);
        }

        let comparison = compare_dictionaries(old_values, new_values);
        if matches!(comparison, DictionaryComparison::Equal) {
            return Ok(DictionaryUpdate::None);
        }

        const REPLACEMENT_ERROR: &str =
            "Dictionary replacement detected when writing IPC file format. \
             Arrow IPC files only support a single dictionary for a given field \
             across all batches.";

        match comparison {
            DictionaryComparison::NotEqual => {
                if self.error_on_replacement {
                    return Err(ArrowError::InvalidArgumentError(
                        REPLACEMENT_ERROR.to_string(),
                    ));
                }

                self.written.insert(dict_id, new_data);
                Ok(DictionaryUpdate::Replaced)
            }
            DictionaryComparison::Delta => match dict_handling {
                DictionaryHandling::Resend => {
                    if self.error_on_replacement {
                        return Err(ArrowError::InvalidArgumentError(
                            REPLACEMENT_ERROR.to_string(),
                        ));
                    }

                    self.written.insert(dict_id, new_data);
                    Ok(DictionaryUpdate::Replaced)
                }
                DictionaryHandling::Delta => {
                    let delta =
                        new_values.slice(old_values.len(), new_values.len() - old_values.len());
                    self.written.insert(dict_id, new_data);
                    Ok(DictionaryUpdate::Delta(delta))
                }
            },
            DictionaryComparison::Equal => unreachable!("Already checked equal case"),
        }
    }

    /// Clear the tracked dictionaries and ids.
    pub fn clear(&mut self) {
        self.dict_ids.clear();
        self.written.clear();
    }
}

/// The result of comparing two dictionary value arrays.
#[derive(Debug, Clone)]
enum DictionaryComparison {
    /// The dictionaries differ, and the new one is not an extension of the old.
    NotEqual,
    /// The dictionaries are logically equal.
    Equal,
    /// The new dictionary is the old one plus appended values; only the
    /// appended suffix needs to be sent.
    Delta,
}

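/// Compare the old and new dictionary values, classifying whether the new
/// dictionary is equal, a delta over the old one, or neither.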
fn compare_dictionaries(old: &ArrayData, new: &ArrayData) -> DictionaryComparison {
    let existing_len = old.len();
    let new_len = new.len();
    if existing_len == new_len {
        if *old == *new {
            return DictionaryComparison::Equal;
        } else {
            return DictionaryComparison::NotEqual;
        }
    }

    // A dictionary can only be a delta if it grew
    if new_len < existing_len {
        return DictionaryComparison::NotEqual;
    }

    // A delta must contain the old dictionary as a prefix
    if new.slice(0, existing_len) == *old {
        return DictionaryComparison::Delta;
    }

    DictionaryComparison::NotEqual
}

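/// Arrow File Writer
///
/// Writes Arrow [`RecordBatch`]es in the
/// [IPC File Format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format).
///
/// A minimal usage sketch, assuming `batch` is a [`RecordBatch`] already in scope:
///
/// ```ignore
/// // write an in-memory IPC file from an existing `batch`
/// let mut writer = FileWriter::try_new(vec![], batch.schema_ref())?;
/// writer.write(&batch)?;
/// writer.finish()?;
/// let bytes: Vec<u8> = writer.into_inner()?;
/// ```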
pub struct FileWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// A reference to the schema, used in validating record batches
    schema: SchemaRef,
    /// The number of bytes written so far, used as the offset for the next block
    block_offsets: usize,
    /// Dictionary blocks that will be written as part of the IPC footer
    dictionary_blocks: Vec<crate::Block>,
    /// Record blocks that will be written as part of the IPC footer
    record_blocks: Vec<crate::Block>,
    /// Whether the footer has been written, and the writer is finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,
    /// User-level custom metadata
    custom_metadata: HashMap<String, String>,

    data_gen: IpcDataGenerator,

    compression_context: CompressionContext,
}

impl<W: Write> FileWriter<BufWriter<W>> {
    /// Try to create a new file writer with the writer wrapped in a `BufWriter`.
    ///
    /// See [`FileWriter::try_new`] for an unbuffered version.
    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        Self::try_new(BufWriter::new(writer), schema)
    }
}

impl<W: Write> FileWriter<W> {
    /// Try to create a new writer, with the schema written as part of the header.
    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        let write_options = IpcWriteOptions::default();
        Self::try_new_with_options(writer, schema, write_options)
    }

    /// Try to create a new writer with the provided [`IpcWriteOptions`].
    pub fn try_new_with_options(
        mut writer: W,
        schema: &Schema,
        write_options: IpcWriteOptions,
    ) -> Result<Self, ArrowError> {
        let data_gen = IpcDataGenerator::default();
        // write magic to the header, padded to the alignment boundary
        let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
        let header_size = super::ARROW_MAGIC.len() + pad_len;
        writer.write_all(&super::ARROW_MAGIC)?;
        writer.write_all(&PADDING[..pad_len])?;
        // write the schema, tracking the written bytes as schema + header
        let mut dictionary_tracker = DictionaryTracker::new(true);
        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
            schema,
            &mut dictionary_tracker,
            &write_options,
        );
        let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
        Ok(Self {
            writer,
            write_options,
            schema: Arc::new(schema.clone()),
            block_offsets: meta + data + header_size,
            dictionary_blocks: vec![],
            record_blocks: vec![],
            finished: false,
            dictionary_tracker,
            custom_metadata: HashMap::new(),
            data_gen,
            compression_context: CompressionContext::default(),
        })
    }

    /// Add a key-value pair to the [`FileWriter`]'s custom metadata.
    pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        self.custom_metadata.insert(key.into(), value.into());
    }

    /// Write a record batch to the file.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write record batch to file writer as it is closed".to_string(),
            ));
        }

        let (encoded_dictionaries, encoded_message) = self.data_gen.encode(
            batch,
            &mut self.dictionary_tracker,
            &self.write_options,
            &mut self.compression_context,
        )?;

        for encoded_dictionary in encoded_dictionaries {
            let (meta, data) =
                write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;

            let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
            self.dictionary_blocks.push(block);
            self.block_offsets += meta + data;
        }

        let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?;

        // record the block so it can be referenced from the footer
        let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
        self.record_blocks.push(block);
        self.block_offsets += meta + data;
        Ok(())
    }

    /// Write the footer and closing magic, then mark the writer as finished.
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write footer to file writer as it is closed".to_string(),
            ));
        }

        // write EOS
        write_continuation(&mut self.writer, &self.write_options, 0)?;

        let mut fbb = FlatBufferBuilder::new();
        let dictionaries = fbb.create_vector(&self.dictionary_blocks);
        let record_batches = fbb.create_vector(&self.record_blocks);

        // Clear the dictionary tracker so the footer schema is encoded with the
        // same dictionary ids as the schema written in the header
        self.dictionary_tracker.clear();
        let schema = IpcSchemaEncoder::new()
            .with_dictionary_tracker(&mut self.dictionary_tracker)
            .schema_to_fb_offset(&mut fbb, &self.schema);
        let fb_custom_metadata = (!self.custom_metadata.is_empty())
            .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));

        let root = {
            let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
            footer_builder.add_version(self.write_options.metadata_version);
            footer_builder.add_schema(schema);
            footer_builder.add_dictionaries(dictionaries);
            footer_builder.add_recordBatches(record_batches);
            if let Some(fb_custom_metadata) = fb_custom_metadata {
                footer_builder.add_custom_metadata(fb_custom_metadata);
            }
            footer_builder.finish()
        };
        fbb.finish(root, None);
        let footer_data = fbb.finished_data();
        self.writer.write_all(footer_data)?;
        self.writer
            .write_all(&(footer_data.len() as i32).to_le_bytes())?;
        self.writer.write_all(&super::ARROW_MAGIC)?;
        self.writer.flush()?;
        self.finished = true;

        Ok(())
    }

    /// Returns the arrow [`SchemaRef`] for this arrow file.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Gets a reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Gets a mutable reference to the underlying writer.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Flush the underlying writer.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }

    /// Unwraps the underlying writer, finishing the file first if it is not
    /// already finished.
    pub fn into_inner(mut self) -> Result<W, ArrowError> {
        if !self.finished {
            self.finish()?;
        }
        Ok(self.writer)
    }
}

impl<W: Write> RecordBatchWriter for FileWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}

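/// Arrow Stream Writer
///
/// Writes Arrow [`RecordBatch`]es in the
/// [IPC Streaming Format](https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format).
///
/// A minimal usage sketch, assuming `batch` is a [`RecordBatch`] already in scope:
///
/// ```ignore
/// // write an in-memory IPC stream from an existing `batch`
/// let mut writer = StreamWriter::try_new(vec![], batch.schema_ref())?;
/// writer.write(&batch)?;
/// writer.finish()?;
/// let bytes: Vec<u8> = writer.into_inner()?;
/// ```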
pub struct StreamWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// Whether the stream has been finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,

    data_gen: IpcDataGenerator,

    compression_context: CompressionContext,
}

impl<W: Write> StreamWriter<BufWriter<W>> {
    /// Try to create a new stream writer with the writer wrapped in a `BufWriter`.
    ///
    /// See [`StreamWriter::try_new`] for an unbuffered version.
    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        Self::try_new(BufWriter::new(writer), schema)
    }
}

impl<W: Write> StreamWriter<W> {
    /// Try to create a new writer, with the schema written as part of the header.
    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        let write_options = IpcWriteOptions::default();
        Self::try_new_with_options(writer, schema, write_options)
    }

    /// Try to create a new writer with the provided [`IpcWriteOptions`].
    pub fn try_new_with_options(
        mut writer: W,
        schema: &Schema,
        write_options: IpcWriteOptions,
    ) -> Result<Self, ArrowError> {
        let data_gen = IpcDataGenerator::default();
        let mut dictionary_tracker = DictionaryTracker::new(false);

        // write the schema as the stream header
        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
            schema,
            &mut dictionary_tracker,
            &write_options,
        );
        write_message(&mut writer, encoded_message, &write_options)?;
        Ok(Self {
            writer,
            write_options,
            finished: false,
            dictionary_tracker,
            data_gen,
            compression_context: CompressionContext::default(),
        })
    }

    /// Write a record batch to the stream.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write record batch to stream writer as it is closed".to_string(),
            ));
        }

        let (encoded_dictionaries, encoded_message) = self
            .data_gen
            .encode(
                batch,
                &mut self.dictionary_tracker,
                &self.write_options,
                &mut self.compression_context,
            )
            .expect("StreamWriter is configured to not error on dictionary replacement");

        for encoded_dictionary in encoded_dictionaries {
            write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
        }

        write_message(&mut self.writer, encoded_message, &self.write_options)?;
        Ok(())
    }

    /// Write the end-of-stream marker and mark the stream as finished.
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write footer to stream writer as it is closed".to_string(),
            ));
        }

        write_continuation(&mut self.writer, &self.write_options, 0)?;

        self.finished = true;

        Ok(())
    }

    /// Gets a reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Gets a mutable reference to the underlying writer.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Flush the underlying writer.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }

    /// Unwraps the underlying writer, finishing the stream first if it is not
    /// already finished.
    pub fn into_inner(mut self) -> Result<W, ArrowError> {
        if !self.finished {
            self.finish()?;
        }
        Ok(self.writer)
    }
}

impl<W: Write> RecordBatchWriter for StreamWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}

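/// An encoded IPC message: the flatbuffer-encoded header plus the (possibly
/// empty) body bytes.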
pub struct EncodedData {
    /// An encoded crate::Message
    pub ipc_message: Vec<u8>,
    /// Arrow buffers to be written; an empty vec for schema messages
    pub arrow_data: Vec<u8>,
}
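
/// Write a message's IPC header and body, returning the number of bytes
/// written for each.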
pub fn write_message<W: Write>(
    mut writer: W,
    encoded: EncodedData,
    write_options: &IpcWriteOptions,
) -> Result<(usize, usize), ArrowError> {
    let arrow_data_len = encoded.arrow_data.len();
    if arrow_data_len % usize::from(write_options.alignment) != 0 {
        return Err(ArrowError::MemoryError(
            "Arrow data not aligned".to_string(),
        ));
    }

    let a = usize::from(write_options.alignment - 1);
    let buffer = encoded.ipc_message;
    let flatbuf_size = buffer.len();
    let prefix_size = if write_options.write_legacy_ipc_format {
        4
    } else {
        8
    };
    let aligned_size = (flatbuf_size + prefix_size + a) & !a;
    let padding_bytes = aligned_size - flatbuf_size - prefix_size;

    write_continuation(
        &mut writer,
        write_options,
        (aligned_size - prefix_size) as i32,
    )?;

    // write the flatbuf
    if flatbuf_size > 0 {
        writer.write_all(&buffer)?;
    }
    // write padding
    writer.write_all(&PADDING[..padding_bytes])?;

    // write the arrow data
    let body_len = if arrow_data_len > 0 {
        write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
    } else {
        0
    };

    Ok((aligned_size, body_len))
}

fn write_body_buffers<W: Write>(
    mut writer: W,
    data: &[u8],
    alignment: u8,
) -> Result<usize, ArrowError> {
    let len = data.len();
    let pad_len = pad_to_alignment(alignment, len);
    let total_len = len + pad_len;

    writer.write_all(data)?;
    if pad_len > 0 {
        writer.write_all(&PADDING[..pad_len])?;
    }

    writer.flush()?;
    Ok(total_len)
}

/// Write the continuation marker and message-length prefix for a message,
/// returning the number of prefix bytes reported.
fn write_continuation<W: Write>(
    mut writer: W,
    write_options: &IpcWriteOptions,
    total_len: i32,
) -> Result<usize, ArrowError> {
    let mut written = 8;

    // the metadata version determines whether a continuation marker is added
    match write_options.metadata_version {
        crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
            unreachable!("Options with the metadata version cannot be created")
        }
        crate::MetadataVersion::V4 => {
            if !write_options.write_legacy_ipc_format {
                writer.write_all(&CONTINUATION_MARKER)?;
                written = 4;
            }
            writer.write_all(&total_len.to_le_bytes()[..])?;
        }
        crate::MetadataVersion::V5 => {
            writer.write_all(&CONTINUATION_MARKER)?;
            writer.write_all(&total_len.to_le_bytes()[..])?;
        }
        z => panic!("Unsupported crate::MetadataVersion {z:?}"),
    };

    writer.flush()?;

    Ok(written)
}

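/// In V4, only the Null type has no validity bitmap.
/// In V5 and later, Null, Union, and RunEndEncoded types have no validity bitmap.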
fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
    if write_options.metadata_version < crate::MetadataVersion::V5 {
        !matches!(data_type, DataType::Null)
    } else {
        !matches!(
            data_type,
            DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
        )
    }
}

/// Whether the buffer needs to be truncated before being written out.
#[inline]
fn buffer_need_truncate(
    array_offset: usize,
    buffer: &Buffer,
    spec: &BufferSpec,
    min_length: usize,
) -> bool {
    spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
}

/// Returns the byte width for a fixed-width buffer spec, and 0 otherwise.
#[inline]
fn get_buffer_element_width(spec: &BufferSpec) -> usize {
    match spec {
        BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
        _ => 0,
    }
}

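/// Rebase an offsets buffer so that its first offset is zero, returning the
/// new offsets buffer together with the start offset and length (in elements)
/// of the values it references.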
fn reencode_offsets<O: OffsetSizeTrait>(
    offsets: &Buffer,
    data: &ArrayData,
) -> (Buffer, usize, usize) {
    let offsets_slice: &[O] = offsets.typed_data::<O>();
    let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];

    let start_offset = offset_slice.first().unwrap();
    let end_offset = offset_slice.last().unwrap();

    let offsets = match start_offset.as_usize() {
        0 => {
            let size = size_of::<O>();
            offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
        }
        _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
    };

    let start_offset = start_offset.as_usize();
    let end_offset = end_offset.as_usize();

    (offsets, start_offset, end_offset - start_offset)
}

/// Returns the rebased offsets [`Buffer`] and correspondingly sliced values
/// [`Buffer`] for a byte array with offset type `O`.
fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
    if data.is_empty() {
        return (MutableBuffer::new(0).into(), MutableBuffer::new(0).into());
    }

    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
    let values = data.buffers()[1].slice_with_length(original_start_offset, len);
    (offsets, values)
}

/// Similar to [`get_byte_array_buffers`], but slices the child [`ArrayData`]
/// instead of a values buffer.
fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
    if data.is_empty() {
        return (
            MutableBuffer::new(0).into(),
            data.child_data()[0].slice(0, 0),
        );
    }

    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
    let child_data = data.child_data()[0].slice(original_start_offset, len);
    (offsets, child_data)
}

/// Returns the offsets and sizes buffers sliced to the array's window, along
/// with the child data. ListView offsets need not be ordered, so the child
/// array is passed through unsliced.
fn get_list_view_array_buffers<O: OffsetSizeTrait>(
    data: &ArrayData,
) -> (Buffer, Buffer, ArrayData) {
    if data.is_empty() {
        return (
            MutableBuffer::new(0).into(),
            MutableBuffer::new(0).into(),
            data.child_data()[0].slice(0, 0),
        );
    }

    let offsets = &data.buffers()[0];
    let sizes = &data.buffers()[1];

    let element_size = std::mem::size_of::<O>();
    let offsets_slice =
        offsets.slice_with_length(data.offset() * element_size, data.len() * element_size);
    let sizes_slice =
        sizes.slice_with_length(data.offset() * element_size, data.len() * element_size);

    let child_data = data.child_data()[0].clone();

    (offsets_slice, sizes_slice, child_data)
}

/// Returns the minimal slice of the first buffer required by the array,
/// truncating any bytes outside `offset..offset + len` for fixed-width layouts.
fn get_or_truncate_buffer(array_data: &ArrayData) -> &[u8] {
    let buffer = &array_data.buffers()[0];
    let layout = layout(array_data.data_type());
    let spec = &layout.buffers[0];

    let byte_width = get_buffer_element_width(spec);
    let min_length = array_data.len() * byte_width;
    if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
        let byte_offset = array_data.offset() * byte_width;
        let buffer_length = min(min_length, buffer.len() - byte_offset);
        &buffer.as_slice()[byte_offset..(byte_offset + buffer_length)]
    } else {
        buffer.as_slice()
    }
}

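/// Write `array_data` to `arrow_data`, appending a field node to `nodes` and a
/// buffer descriptor to `buffers` for each buffer written, and returning the
/// updated body offset.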
#[allow(clippy::too_many_arguments)]
fn write_array_data(
    array_data: &ArrayData,
    buffers: &mut Vec<crate::Buffer>,
    arrow_data: &mut Vec<u8>,
    nodes: &mut Vec<crate::FieldNode>,
    offset: i64,
    num_rows: usize,
    null_count: usize,
    compression_codec: Option<CompressionCodec>,
    compression_context: &mut CompressionContext,
    write_options: &IpcWriteOptions,
) -> Result<i64, ArrowError> {
    let mut offset = offset;
    if !matches!(array_data.data_type(), DataType::Null) {
        nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
    } else {
        // In a NullArray every element is null, so the field node's null count
        // is the row count
        nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
    }
    if has_validity_bitmap(array_data.data_type(), write_options) {
        // write the null buffer if it exists
        let null_buffer = match array_data.nulls() {
            None => {
                // create a buffer and fill it with valid bits
                let num_bytes = bit_util::ceil(num_rows, 8);
                let buffer = MutableBuffer::new(num_bytes);
                let buffer = buffer.with_bitset(num_bytes, true);
                buffer.into()
            }
            Some(buffer) => buffer.inner().sliced(),
        };

        offset = write_buffer(
            null_buffer.as_slice(),
            buffers,
            arrow_data,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;
    }

    let data_type = array_data.data_type();
    if matches!(data_type, DataType::Binary | DataType::Utf8) {
        let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
        for buffer in [offsets, values] {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                compression_context,
                write_options.alignment,
            )?;
        }
    } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
        // The views buffer can be truncated like any fixed-width buffer, but
        // the variadic data buffers are written through as-is: proving that no
        // view references a pruned data buffer is non-trivial
        let views = get_or_truncate_buffer(array_data);
        offset = write_buffer(
            views,
            buffers,
            arrow_data,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;

        for buffer in array_data.buffers().iter().skip(1) {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                compression_context,
                write_options.alignment,
            )?;
        }
    } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
        let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
        for buffer in [offsets, values] {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                compression_context,
                write_options.alignment,
            )?;
        }
    } else if DataType::is_numeric(data_type)
        || DataType::is_temporal(data_type)
        || matches!(
            array_data.data_type(),
            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
        )
    {
        // truncate the buffer to the sliced window before writing
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = get_or_truncate_buffer(array_data);
        offset = write_buffer(
            buffer,
            buffers,
            arrow_data,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;
    } else if matches!(data_type, DataType::Boolean) {
        // Booleans are bit-packed, so the buffer must be sliced at bit
        // granularity rather than by byte width
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = &array_data.buffers()[0];
        let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
        offset = write_buffer(
            &buffer,
            buffers,
            arrow_data,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;
    } else if matches!(
        data_type,
        DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
    ) {
        assert_eq!(array_data.buffers().len(), 1);
        assert_eq!(array_data.child_data().len(), 1);

        // rebase the offsets and slice the child data accordingly
        let (offsets, sliced_child_data) = match data_type {
            DataType::List(_) => get_list_array_buffers::<i32>(array_data),
            DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
            DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
            _ => unreachable!(),
        };
        offset = write_buffer(
            offsets.as_slice(),
            buffers,
            arrow_data,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;
        offset = write_array_data(
            &sliced_child_data,
            buffers,
            arrow_data,
            nodes,
            offset,
            sliced_child_data.len(),
            sliced_child_data.null_count(),
            compression_codec,
            compression_context,
            write_options,
        )?;
        return Ok(offset);
    } else if matches!(
        data_type,
        DataType::ListView(_) | DataType::LargeListView(_)
    ) {
        assert_eq!(array_data.buffers().len(), 2);
        assert_eq!(array_data.child_data().len(), 1);

        let (offsets, sizes, child_data) = match data_type {
            DataType::ListView(_) => get_list_view_array_buffers::<i32>(array_data),
            DataType::LargeListView(_) => get_list_view_array_buffers::<i64>(array_data),
            _ => unreachable!(),
        };

        offset = write_buffer(
            offsets.as_slice(),
            buffers,
            arrow_data,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;

        offset = write_buffer(
            sizes.as_slice(),
            buffers,
            arrow_data,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;

        offset = write_array_data(
            &child_data,
            buffers,
            arrow_data,
            nodes,
            offset,
            child_data.len(),
            child_data.null_count(),
            compression_codec,
            compression_context,
            write_options,
        )?;
        return Ok(offset);
    } else if let DataType::FixedSizeList(_, fixed_size) = data_type {
        assert_eq!(array_data.child_data().len(), 1);
        let fixed_size = *fixed_size as usize;

        let child_offset = array_data.offset() * fixed_size;
        let child_length = array_data.len() * fixed_size;
        let child_data = array_data.child_data()[0].slice(child_offset, child_length);

        offset = write_array_data(
            &child_data,
            buffers,
            arrow_data,
            nodes,
            offset,
            child_data.len(),
            child_data.null_count(),
            compression_codec,
            compression_context,
            write_options,
        )?;
        return Ok(offset);
    } else {
        for buffer in array_data.buffers() {
            offset = write_buffer(
                buffer,
                buffers,
                arrow_data,
                offset,
                compression_codec,
                compression_context,
                write_options.alignment,
            )?;
        }
    }

    match array_data.data_type() {
        DataType::Dictionary(_, _) => {}
        DataType::RunEndEncoded(_, _) => {
            // unslice the run-end-encoded array before writing its children
            let arr = unslice_run_array(array_data.clone())?;
            // recursively write out nested structures
            for data_ref in arr.child_data() {
                offset = write_array_data(
                    data_ref,
                    buffers,
                    arrow_data,
                    nodes,
                    offset,
                    data_ref.len(),
                    data_ref.null_count(),
                    compression_codec,
                    compression_context,
                    write_options,
                )?;
            }
        }
        _ => {
            // recursively write out nested structures
            for data_ref in array_data.child_data() {
                offset = write_array_data(
                    data_ref,
                    buffers,
                    arrow_data,
                    nodes,
                    offset,
                    data_ref.len(),
                    data_ref.null_count(),
                    compression_codec,
                    compression_context,
                    write_options,
                )?;
            }
        }
    }
    Ok(offset)
}

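/// Write a buffer into `arrow_data`, compressing it first if a codec is
/// provided, then pad to the alignment boundary. Records the buffer's offset
/// and length in `buffers` and returns the new body offset.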
fn write_buffer(
    buffer: &[u8],
    buffers: &mut Vec<crate::Buffer>,
    arrow_data: &mut Vec<u8>,
    offset: i64,
    compression_codec: Option<CompressionCodec>,
    compression_context: &mut CompressionContext,
    alignment: u8,
) -> Result<i64, ArrowError> {
    let len: i64 = match compression_codec {
        Some(compressor) => compressor.compress_to_vec(buffer, arrow_data, compression_context)?,
        None => {
            arrow_data.extend_from_slice(buffer);
            buffer.len()
        }
    }
    .try_into()
    .map_err(|e| {
        ArrowError::InvalidArgumentError(format!("Could not convert compressed size to i64: {e}"))
    })?;

    // record where this buffer starts in the body
    buffers.push(crate::Buffer::new(offset, len));
    // pad to the alignment boundary
    let pad_len = pad_to_alignment(alignment, len as usize);
    arrow_data.extend_from_slice(&PADDING[..pad_len]);

    Ok(offset + len + (pad_len as i64))
}

const PADDING: [u8; 64] = [0; 64];

/// Calculate the number of padding bytes needed to round `len` up to the
/// nearest multiple of `alignment`.
#[inline]
fn pad_to_alignment(alignment: u8, len: usize) -> usize {
    let a = usize::from(alignment - 1);
    ((len + a) & !a) - len
}

#[cfg(test)]
mod tests {
    use std::hash::Hasher;
    use std::io::Cursor;
    use std::io::Seek;

    use arrow_array::builder::FixedSizeListBuilder;
    use arrow_array::builder::Float32Builder;
    use arrow_array::builder::Int64Builder;
    use arrow_array::builder::MapBuilder;
    use arrow_array::builder::StringViewBuilder;
    use arrow_array::builder::UnionBuilder;
    use arrow_array::builder::{
        GenericListBuilder, GenericListViewBuilder, ListBuilder, StringBuilder,
    };
    use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
    use arrow_array::types::*;
    use arrow_buffer::ScalarBuffer;

    use crate::MetadataVersion;
    use crate::convert::fb_to_schema;
    use crate::reader::*;
    use crate::root_as_footer;

    use super::*;

    fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
        let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
        writer.write(rb).unwrap();
        writer.finish().unwrap();
        writer.into_inner().unwrap()
    }

    fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
        let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
        reader.next().unwrap().unwrap()
    }

    fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
        const IPC_ALIGNMENT: usize = 8;

        let mut stream_writer = StreamWriter::try_new_with_options(
            vec![],
            record.schema_ref(),
            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
        )
        .unwrap();
        stream_writer.write(record).unwrap();
        stream_writer.finish().unwrap();
        stream_writer.into_inner().unwrap()
    }

    fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
        let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
        stream_reader.next().unwrap().unwrap()
    }

    #[test]
    #[cfg(feature = "lz4")]
    fn test_write_empty_record_batch_lz4_compression() {
        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
        let values: Vec<Option<i32>> = vec![];
        let array = Int32Array::from(values);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();

        let mut file = tempfile::tempfile().unwrap();

        {
            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
                .unwrap()
                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
                .unwrap();

            let mut writer =
                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            let reader = FileReader::try_new(file, None).unwrap();
            for read_batch in reader {
                read_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(record_batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }

    #[test]
    #[cfg(feature = "lz4")]
    fn test_write_file_with_lz4_compression() {
        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
        let array = Int32Array::from(values);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();

        let mut file = tempfile::tempfile().unwrap();
        {
            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
                .unwrap()
                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
                .unwrap();

            let mut writer =
                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            let reader = FileReader::try_new(file, None).unwrap();
            for read_batch in reader {
                read_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(record_batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }

    #[test]
    #[cfg(feature = "zstd")]
    fn test_write_file_with_zstd_compression() {
        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
        let array = Int32Array::from(values);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
        let mut file = tempfile::tempfile().unwrap();
        {
            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
                .unwrap()
                .try_with_compression(Some(crate::CompressionType::ZSTD))
                .unwrap();

            let mut writer =
                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            let reader = FileReader::try_new(file, None).unwrap();
            for read_batch in reader {
                read_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(record_batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }

    #[test]
    fn test_write_file() {
        let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
        let values: Vec<Option<u32>> = vec![
            Some(999),
            None,
            Some(235),
            Some(123),
            None,
            None,
            None,
            None,
            None,
        ];
        let array1 = UInt32Array::from(values);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
                .unwrap();
        let mut file = tempfile::tempfile().unwrap();
        {
            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();

            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();

        {
            let mut reader = FileReader::try_new(file, None).unwrap();
            while let Some(Ok(read_batch)) = reader.next() {
                read_batch
                    .columns()
                    .iter()
                    .zip(batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }

    fn write_null_file(options: IpcWriteOptions) {
        let schema = Schema::new(vec![
            Field::new("nulls", DataType::Null, true),
            Field::new("int32s", DataType::Int32, false),
            Field::new("nulls2", DataType::Null, true),
            Field::new("f64s", DataType::Float64, false),
        ]);
        let array1 = NullArray::new(32);
        let array2 = Int32Array::from(vec![1; 32]);
        let array3 = NullArray::new(32);
        let array4 = Float64Array::from(vec![f64::NAN; 32]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(array1) as ArrayRef,
                Arc::new(array2) as ArrayRef,
                Arc::new(array3) as ArrayRef,
                Arc::new(array4) as ArrayRef,
            ],
        )
        .unwrap();
        let mut file = tempfile::tempfile().unwrap();
        {
            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();

            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }

        file.rewind().unwrap();

        {
            let reader = FileReader::try_new(file, None).unwrap();
            reader.for_each(|maybe_batch| {
                maybe_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            });
        }
    }

    #[test]
    fn test_write_null_file_v4() {
        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
        write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap());
        write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
    }

    #[test]
    fn test_write_null_file_v5() {
        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap());
    }

    #[test]
    fn track_union_nested_dict() {
        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

        let array = Arc::new(inner) as ArrayRef;

        #[allow(deprecated)]
        let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 0, false);
        let union_fields = [(0, Arc::new(dctfield))].into_iter().collect();

        let types = [0, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
        let offsets = [0, 1, 2].into_iter().collect::<ScalarBuffer<i32>>();

        let union = UnionArray::try_new(union_fields, types, Some(offsets), vec![array]).unwrap();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "union",
            union.data_type().clone(),
            false,
        )]));

        let r#gen = IpcDataGenerator::default();
        let mut dict_tracker = DictionaryTracker::new(false);
        r#gen.schema_to_bytes_with_dictionary_tracker(
            &schema,
            &mut dict_tracker,
            &IpcWriteOptions::default(),
        );

        let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();

        r#gen
            .encode(
                &batch,
                &mut dict_tracker,
                &Default::default(),
                &mut Default::default(),
            )
            .unwrap();

        assert!(dict_tracker.written.contains_key(&0));
    }

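    // Same scenario as `track_union_nested_dict`, but with the dictionary
    // nested inside a struct child instead of a union child.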
    #[test]
    fn track_struct_nested_dict() {
        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

        let array = Arc::new(inner) as ArrayRef;

        #[allow(deprecated)]
        let dctfield = Arc::new(Field::new_dict(
            "dict",
            array.data_type().clone(),
            false,
            2,
            false,
        ));

        let s = StructArray::from(vec![(dctfield, array)]);
        let struct_array = Arc::new(s) as ArrayRef;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "struct",
            struct_array.data_type().clone(),
            false,
        )]));

        let r#gen = IpcDataGenerator::default();
        let mut dict_tracker = DictionaryTracker::new(false);
        r#gen.schema_to_bytes_with_dictionary_tracker(
            &schema,
            &mut dict_tracker,
            &IpcWriteOptions::default(),
        );

        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();

        r#gen
            .encode(
                &batch,
                &mut dict_tracker,
                &Default::default(),
                &mut Default::default(),
            )
            .unwrap();

        assert!(dict_tracker.written.contains_key(&0));
    }

    fn write_union_file(options: IpcWriteOptions) {
        let schema = Schema::new(vec![Field::new_union(
            "union",
            vec![0, 1],
            vec![
                Field::new("a", DataType::Int32, false),
                Field::new("c", DataType::Float64, false),
            ],
            UnionMode::Sparse,
        )]);
        let mut builder = UnionBuilder::with_capacity_sparse(5);
        builder.append::<Int32Type>("a", 1).unwrap();
        builder.append_null::<Int32Type>("a").unwrap();
        builder.append::<Float64Type>("c", 3.0).unwrap();
        builder.append_null::<Float64Type>("c").unwrap();
        builder.append::<Int32Type>("a", 4).unwrap();
        let union = builder.build().unwrap();

        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
                .unwrap();

        let mut file = tempfile::tempfile().unwrap();
        {
            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();

            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();

        {
            let reader = FileReader::try_new(file, None).unwrap();
            reader.for_each(|maybe_batch| {
                maybe_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            });
        }
    }

    #[test]
    fn test_write_union_file_v4_v5() {
        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
    }

    #[test]
    fn test_write_view_types() {
        const LONG_TEST_STRING: &str =
            "This is a long string to make sure binary view array handles it";
        let schema = Schema::new(vec![
            Field::new("field1", DataType::BinaryView, true),
            Field::new("field2", DataType::Utf8View, true),
        ]);
        let values: Vec<Option<&[u8]>> = vec![
            Some(b"foo"),
            Some(b"bar"),
            Some(LONG_TEST_STRING.as_bytes()),
        ];
        let binary_array = BinaryViewArray::from_iter(values);
        let utf8_array =
            StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
        let record_batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(binary_array), Arc::new(utf8_array)],
        )
        .unwrap();

        let mut file = tempfile::tempfile().unwrap();
        {
            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            let mut reader = FileReader::try_new(&file, None).unwrap();
            let read_batch = reader.next().unwrap().unwrap();
            read_batch
                .columns()
                .iter()
                .zip(record_batch.columns())
                .for_each(|(a, b)| {
                    assert_eq!(a, b);
                });
        }
        file.rewind().unwrap();
        {
            let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
            let read_batch = reader.next().unwrap().unwrap();
            assert_eq!(read_batch.num_columns(), 1);
            let read_array = read_batch.column(0);
            let write_array = record_batch.column(0);
            assert_eq!(read_array, write_array);
        }
    }

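    // The writer truncates buffers to the sliced portion of a batch instead
    // of serializing the entire backing arrays: a 5-row slice of a 65536-row
    // batch must serialize to the same number of bytes as a freshly built
    // 5-row batch, and still round-trip losslessly.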
    #[test]
    fn truncate_ipc_record_batch() {
        fn create_batch(rows: usize) -> RecordBatch {
            let schema = Schema::new(vec![
                Field::new("a", DataType::Int32, false),
                Field::new("b", DataType::Utf8, false),
            ]);

            let a = Int32Array::from_iter_values(0..rows as i32);
            let b = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
        }

        let big_record_batch = create_batch(65536);

        let length = 5;
        let small_record_batch = create_batch(length);

        let offset = 2;
        let record_batch_slice = big_record_batch.slice(offset, length);
        assert!(
            serialize_stream(&big_record_batch).len() > serialize_stream(&small_record_batch).len()
        );
        assert_eq!(
            serialize_stream(&small_record_batch).len(),
            serialize_stream(&record_batch_slice).len()
        );

        assert_eq!(
            deserialize_stream(serialize_stream(&record_batch_slice)),
            record_batch_slice
        );
    }

    #[test]
    fn truncate_ipc_record_batch_with_nulls() {
        fn create_batch() -> RecordBatch {
            let schema = Schema::new(vec![
                Field::new("a", DataType::Int32, true),
                Field::new("b", DataType::Utf8, true),
            ]);

            let a = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
            let b = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(1, 2);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );

        assert!(deserialized_batch.column(0).is_null(0));
        assert!(deserialized_batch.column(0).is_valid(1));
        assert!(deserialized_batch.column(1).is_valid(0));
        assert!(deserialized_batch.column(1).is_valid(1));

        assert_eq!(record_batch_slice, deserialized_batch);
    }

    #[test]
    fn truncate_ipc_dictionary_array() {
        fn create_batch() -> RecordBatch {
            let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
                .into_iter()
                .collect();
            let keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();

            let array = DictionaryArray::new(keys, Arc::new(values));

            let schema = Schema::new(vec![Field::new("dict", array.data_type().clone(), true)]);

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(1, 2);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );

        assert!(deserialized_batch.column(0).is_valid(0));
        assert!(deserialized_batch.column(0).is_null(1));

        assert_eq!(record_batch_slice, deserialized_batch);
    }

    #[test]
    fn truncate_ipc_struct_array() {
        fn create_batch() -> RecordBatch {
            let strings: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
                .into_iter()
                .collect();
            let ints: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();

            let struct_array = StructArray::from(vec![
                (
                    Arc::new(Field::new("s", DataType::Utf8, true)),
                    Arc::new(strings) as ArrayRef,
                ),
                (
                    Arc::new(Field::new("c", DataType::Int32, true)),
                    Arc::new(ints) as ArrayRef,
                ),
            ]);

            let schema = Schema::new(vec![Field::new(
                "struct_array",
                struct_array.data_type().clone(),
                true,
            )]);

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(1, 2);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );

        let structs = deserialized_batch
            .column(0)
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap();

        assert!(structs.column(0).is_null(0));
        assert!(structs.column(0).is_valid(1));
        assert!(structs.column(1).is_valid(0));
        assert!(structs.column(1).is_null(1));
        assert_eq!(record_batch_slice, deserialized_batch);
    }

    #[test]
    fn truncate_ipc_string_array_with_all_empty_string() {
        fn create_batch() -> RecordBatch {
            let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
            let a = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(0, 1);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );
        assert_eq!(record_batch_slice, deserialized_batch);
    }

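    // A batch built from a sliced array must serialize only the sliced values
    // (here `[2, 3]`), not the full backing buffer of the parent array.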
    #[test]
    fn test_stream_writer_writes_array_slice() {
        let array = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
        assert_eq!(
            vec![Some(1), Some(2), Some(3)],
            array.iter().collect::<Vec<_>>()
        );

        let sliced = array.slice(1, 2);
        assert_eq!(vec![Some(2), Some(3)], sliced.iter().collect::<Vec<_>>());

        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])),
            vec![Arc::new(sliced)],
        )
        .expect("new batch");

        let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
        writer.write(&batch).expect("write");
        let outbuf = writer.into_inner().expect("inner");

        let mut reader = StreamReader::try_new(&outbuf[..], None).expect("new reader");
        let read_batch = reader.next().unwrap().expect("read batch");

        let read_array: &UInt32Array = read_batch.column(0).as_primitive();
        assert_eq!(
            vec![Some(2), Some(3)],
            read_array.iter().collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_large_slice_uint32() {
        ensure_roundtrip(Arc::new(UInt32Array::from_iter(
            (0..8000).map(|i| if i % 2 == 0 { Some(i) } else { None }),
        )));
    }

    #[test]
    fn test_large_slice_string() {
        let strings: Vec<_> = (0..8000)
            .map(|i| {
                if i % 2 == 0 {
                    Some(format!("value{i}"))
                } else {
                    None
                }
            })
            .collect();

        ensure_roundtrip(Arc::new(StringArray::from(strings)));
    }

    #[test]
    fn test_large_slice_string_list() {
        let mut ls = ListBuilder::new(StringBuilder::new());

        let mut s = String::new();
        for row_number in 0..8000 {
            if row_number % 2 == 0 {
                for list_element in 0..1000 {
                    s.clear();
                    use std::fmt::Write;
                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
                    ls.values().append_value(&s);
                }
                ls.append(true)
            } else {
                ls.append(false);
            }
        }

        ensure_roundtrip(Arc::new(ls.finish()));
    }

    #[test]
    fn test_large_slice_string_list_of_lists() {
        let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));

        for _ in 0..4000 {
            ls.values().append(true);
            ls.append(true)
        }

        let mut s = String::new();
        for row_number in 0..4000 {
            if row_number % 2 == 0 {
                for list_element in 0..1000 {
                    s.clear();
                    use std::fmt::Write;
                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
                    ls.values().values().append_value(&s);
                }
                ls.values().append(true);
                ls.append(true)
            } else {
                ls.append(false);
            }
        }

        ensure_roundtrip(Arc::new(ls.finish()));
    }

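    /// Round-trips a 1-offset slice of `array` through both the stream and
    /// the file format, asserting the batch read back equals the sliced
    /// input.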
    fn ensure_roundtrip(array: ArrayRef) {
        let num_rows = array.len();
        let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
        let sliced_batch = orig_batch.slice(1, num_rows - 1);

        let schema = orig_batch.schema();
        let stream_data = {
            let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
            writer.write(&sliced_batch).unwrap();
            writer.into_inner().unwrap()
        };
        let read_batch = {
            let projection = None;
            let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
            reader
                .next()
                .expect("expect no errors reading batch")
                .expect("expect batch")
        };
        assert_eq!(sliced_batch, read_batch);

        let file_data = {
            let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
            writer.write(&sliced_batch).unwrap();
            writer.into_inner().unwrap().into_inner().unwrap()
        };
        let read_batch = {
            let projection = None;
            let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
            reader
                .next()
                .expect("expect no errors reading batch")
                .expect("expect batch")
        };
        assert_eq!(sliced_batch, read_batch);
    }

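    // Boolean arrays are bit-packed, so slices that start at a
    // non-byte-aligned offset (such as 1 or 13 below) exercise the writer's
    // bitmap repacking when buffers are truncated for serialization.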
    #[test]
    fn encode_bools_slice() {
        assert_bool_roundtrip([true, false], 1, 1);

        assert_bool_roundtrip(
            [
                true, false, true, true, false, false, true, true, true, false, false, false, true,
                true, true, true, false, false, false, false, true, true, true, true, true, false,
                false, false, false, false,
            ],
            13,
            17,
        );

        assert_bool_roundtrip(
            [
                true, false, true, true, false, false, true, true, true, false, false, false,
            ],
            8,
            2,
        );

        assert_bool_roundtrip(
            [
                true, false, true, true, false, false, true, true, true, false, false, false, true,
                true, true, true, true, false, false, false, false, false,
            ],
            8,
            8,
        );
    }

    fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
        let val_bool_field = Field::new("val", DataType::Boolean, false);

        let schema = Arc::new(Schema::new(vec![val_bool_field]));

        let bools = BooleanArray::from(bools.to_vec());

        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
        let batch = batch.slice(offset, length);

        let data = serialize_stream(&batch);
        let batch2 = deserialize_stream(data);
        assert_eq!(batch, batch2);
    }

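    // Slicing a `RunArray` leaves a logical offset into its run ends;
    // `into_zero_offset_run_array` should rewrite every prefix and suffix
    // slice into an equivalent array whose run ends start from zero.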
    #[test]
    fn test_run_array_unslice() {
        let total_len = 80;
        let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
        let repeats: Vec<usize> = vec![3, 4, 1, 2];
        let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
        for ix in 0_usize..32 {
            let repeat: usize = repeats[ix % repeats.len()];
            let val: Option<i32> = vals[ix % vals.len()];
            input_array.resize(input_array.len() + repeat, val);
        }

        let mut builder =
            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
        builder.extend(input_array.iter().copied());
        let run_array = builder.finish();

        for slice_len in 1..=total_len {
            let sliced_run_array: RunArray<Int16Type> =
                run_array.slice(0, slice_len).into_data().into();

            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
            let typed = unsliced_run_array
                .downcast::<PrimitiveArray<Int32Type>>()
                .unwrap();
            let expected: Vec<Option<i32>> = input_array.iter().take(slice_len).copied().collect();
            let actual: Vec<Option<i32>> = typed.into_iter().collect();
            assert_eq!(expected, actual);

            let sliced_run_array: RunArray<Int16Type> = run_array
                .slice(total_len - slice_len, slice_len)
                .into_data()
                .into();

            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
            let typed = unsliced_run_array
                .downcast::<PrimitiveArray<Int32Type>>()
                .unwrap();
            let expected: Vec<Option<i32>> = input_array
                .iter()
                .skip(total_len - slice_len)
                .copied()
                .collect();
            let actual: Vec<Option<i32>> = typed.into_iter().collect();
            assert_eq!(expected, actual);
        }
    }

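    /// Builds a 100_000-row list array whose `i`-th row is `[i, i, i]`; the
    /// generators below follow the same shape for other value types.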
    fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
        let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());

        for i in 0..100_000 {
            for value in [i, i, i] {
                ls.values().append_value(value);
            }
            ls.append(true)
        }

        ls.finish()
    }

    fn generate_utf8view_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
        let mut ls = GenericListBuilder::<O, _>::new(StringViewBuilder::new());

        for i in 0..100_000 {
            for value in [
                format!("value{}", i),
                format!("value{}", i),
                format!("value{}", i),
            ] {
                ls.values().append_value(&value);
            }
            ls.append(true)
        }

        ls.finish()
    }

    fn generate_string_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
        let mut ls = GenericListBuilder::<O, _>::new(StringBuilder::new());

        for i in 0..100_000 {
            for value in [
                format!("value{}", i),
                format!("value{}", i),
                format!("value{}", i),
            ] {
                ls.values().append_value(&value);
            }
            ls.append(true)
        }

        ls.finish()
    }

    fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
        let mut ls =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));

        for _i in 0..10_000 {
            for j in 0..10 {
                for value in [j, j, j, j] {
                    ls.values().values().append_value(value);
                }
                ls.values().append(true)
            }
            ls.append(true);
        }

        ls.finish()
    }

    fn generate_nested_list_data_starting_at_zero<O: OffsetSizeTrait>() -> GenericListArray<O> {
        let mut ls =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));

        for _i in 0..999 {
            ls.values().append(true);
            ls.append(true);
        }

        for j in 0..10 {
            for value in [j, j, j, j] {
                ls.values().values().append_value(value);
            }
            ls.values().append(true)
        }
        ls.append(true);

        for i in 0..9_000 {
            for j in 0..10 {
                for value in [i + j, i + j, i + j, i + j] {
                    ls.values().values().append_value(value);
                }
                ls.values().append(true)
            }
            ls.append(true);
        }

        ls.finish()
    }

    fn generate_map_array_data() -> MapArray {
        let keys_builder = UInt32Builder::new();
        let values_builder = UInt32Builder::new();

        let mut builder = MapBuilder::new(None, keys_builder, values_builder);

        for i in 0..100_000 {
            for _j in 0..3 {
                builder.keys().append_value(i);
                builder.values().append_value(i * 2);
            }
            builder.append(true).unwrap();
        }

        builder.finish()
    }

    #[test]
    fn reencode_offsets_when_first_offset_is_not_zero() {
        let original_list = generate_list_data::<i32>();
        let original_data = original_list.into_data();
        let slice_data = original_data.slice(75, 7);
        let (new_offsets, original_start, length) =
            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
        assert_eq!(
            vec![0, 3, 6, 9, 12, 15, 18, 21],
            new_offsets.typed_data::<i32>()
        );
        assert_eq!(225, original_start);
        assert_eq!(21, length);
    }

    #[test]
    fn reencode_offsets_when_first_offset_is_zero() {
        let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
        ls.append(true);
        ls.values().append_value(35);
        ls.values().append_value(42);
        ls.append(true);
        let original_list = ls.finish();
        let original_data = original_list.into_data();

        let slice_data = original_data.slice(1, 1);
        let (new_offsets, original_start, length) =
            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
        assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
        assert_eq!(0, original_start);
        assert_eq!(2, length);
    }

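    /// Serializes `in_batch` and a single-row slice of it, asserts the slice
    /// is more than `expected_size_factor` times smaller on the wire, and
    /// checks that both round-trip losslessly through the file format.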
    fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
        let in_sliced = in_batch.slice(999, 1);

        let bytes_batch = serialize_file(&in_batch);
        let bytes_sliced = serialize_file(&in_sliced);

        assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));

        let out_batch = deserialize_file(bytes_batch);
        assert_eq!(in_batch, out_batch);

        let out_sliced = deserialize_file(bytes_sliced);
        assert_eq!(in_sliced, out_sliced);
    }

    #[test]
    fn encode_lists() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_list_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    #[test]
    fn encode_empty_list() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_list_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values])
            .unwrap()
            .slice(999, 0);
        let out_batch = deserialize_file(serialize_file(&in_batch));
        assert_eq!(in_batch, out_batch);
    }

    #[test]
    fn encode_large_lists() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_list_data::<i64>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    #[test]
    fn encode_large_lists_non_zero_offset() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_list_data::<i64>());

        check_sliced_list_array(schema, values);
    }

    #[test]
    fn encode_large_lists_string_non_zero_offset() {
        let val_inner = Field::new_list_field(DataType::Utf8, true);
        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_string_list_data::<i64>());

        check_sliced_list_array(schema, values);
    }

    #[test]
    fn encode_large_list_string_view_non_zero_offset() {
        let val_inner = Field::new_list_field(DataType::Utf8View, true);
        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_utf8view_list_data::<i64>());

        check_sliced_list_array(schema, values);
    }

    fn check_sliced_list_array(schema: Arc<Schema>, values: Arc<GenericListArray<i64>>) {
        for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
            let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
                .unwrap()
                .slice(offset, len);
            let out_batch = deserialize_file(serialize_file(&in_batch));
            assert_eq!(in_batch, out_batch);
        }
    }

    #[test]
    fn encode_nested_lists() {
        let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
        let inner_list_field = Arc::new(Field::new_list_field(DataType::List(inner_int), true));
        let list_field = Field::new("val", DataType::List(inner_list_field), true);
        let schema = Arc::new(Schema::new(vec![list_field]));

        let values = Arc::new(generate_nested_list_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    #[test]
    fn encode_nested_lists_starting_at_zero() {
        let inner_int = Arc::new(Field::new("item", DataType::UInt32, true));
        let inner_list_field = Arc::new(Field::new("item", DataType::List(inner_int), true));
        let list_field = Field::new("val", DataType::List(inner_list_field), true);
        let schema = Arc::new(Schema::new(vec![list_field]));

        let values = Arc::new(generate_nested_list_data_starting_at_zero::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1);
    }

    #[test]
    fn encode_map_array() {
        let keys = Arc::new(Field::new("keys", DataType::UInt32, false));
        let values = Arc::new(Field::new("values", DataType::UInt32, true));
        let map_field = Field::new_map("map", "entries", keys, values, false, true);
        let schema = Arc::new(Schema::new(vec![map_field]));

        let values = Arc::new(generate_map_array_data());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    fn generate_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
        let mut builder = GenericListViewBuilder::<O, _>::new(UInt32Builder::new());

        for i in 0u32..100_000 {
            if i.is_multiple_of(10_000) {
                builder.append(false);
                continue;
            }
            for value in [i, i, i] {
                builder.values().append_value(value);
            }
            builder.append(true);
        }

        builder.finish()
    }

    #[test]
    fn encode_list_view_arrays() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_field = Field::new("val", DataType::ListView(Arc::new(val_inner)), true);
        let schema = Arc::new(Schema::new(vec![val_field]));

        let values = Arc::new(generate_list_view_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        let out_batch = deserialize_file(serialize_file(&in_batch));
        assert_eq!(in_batch, out_batch);
    }

    #[test]
    fn encode_large_list_view_arrays() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_field = Field::new("val", DataType::LargeListView(Arc::new(val_inner)), true);
        let schema = Arc::new(Schema::new(vec![val_field]));

        let values = Arc::new(generate_list_view_data::<i64>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        let out_batch = deserialize_file(serialize_file(&in_batch));
        assert_eq!(in_batch, out_batch);
    }

    #[test]
    fn check_sliced_list_view_array() {
        let inner = Field::new_list_field(DataType::UInt32, true);
        let field = Field::new("val", DataType::ListView(Arc::new(inner)), true);
        let schema = Arc::new(Schema::new(vec![field]));
        let values = Arc::new(generate_list_view_data::<i32>());

        for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
            let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
                .unwrap()
                .slice(offset, len);
            let out_batch = deserialize_file(serialize_file(&in_batch));
            assert_eq!(in_batch, out_batch);
        }
    }

    #[test]
    fn check_sliced_large_list_view_array() {
        let inner = Field::new_list_field(DataType::UInt32, true);
        let field = Field::new("val", DataType::LargeListView(Arc::new(inner)), true);
        let schema = Arc::new(Schema::new(vec![field]));
        let values = Arc::new(generate_list_view_data::<i64>());

        for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
            let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
                .unwrap()
                .slice(offset, len);
            let out_batch = deserialize_file(serialize_file(&in_batch));
            assert_eq!(in_batch, out_batch);
        }
    }

    fn generate_nested_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
        let inner_builder = UInt32Builder::new();
        let middle_builder = GenericListViewBuilder::<O, _>::new(inner_builder);
        let mut outer_builder = GenericListViewBuilder::<O, _>::new(middle_builder);

        for i in 0u32..10_000 {
            if i.is_multiple_of(1_000) {
                outer_builder.append(false);
                continue;
            }

            for _ in 0..3 {
                for value in [i, i + 1, i + 2] {
                    outer_builder.values().values().append_value(value);
                }
                outer_builder.values().append(true);
            }
            outer_builder.append(true);
        }

        outer_builder.finish()
    }

    #[test]
    fn encode_nested_list_views() {
        let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
        let inner_list_field = Arc::new(Field::new_list_field(DataType::ListView(inner_int), true));
        let list_field = Field::new("val", DataType::ListView(inner_list_field), true);
        let schema = Arc::new(Schema::new(vec![list_field]));

        let values = Arc::new(generate_nested_list_view_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        let out_batch = deserialize_file(serialize_file(&in_batch));
        assert_eq!(in_batch, out_batch);
    }

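    /// Builds a list-view array of dictionary-encoded strings directly from
    /// raw offset and size buffers, then round-trips it through both the
    /// file and stream formats.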
    fn test_roundtrip_list_view_of_dict_impl<OffsetSize: OffsetSizeTrait, U: ArrowNativeType>(
        list_data_type: DataType,
        offsets: &[U; 5],
        sizes: &[U; 4],
    ) {
        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));
        let dict_data = dict_array.to_data();

        let value_offsets = Buffer::from_slice_ref(offsets);
        let value_sizes = Buffer::from_slice_ref(sizes);

        let list_data = ArrayData::builder(list_data_type)
            .len(4)
            .add_buffer(value_offsets)
            .add_buffer(value_sizes)
            .add_child_data(dict_data)
            .build()
            .unwrap();
        let list_view_array = GenericListViewArray::<OffsetSize>::from(list_data);

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            list_view_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(list_view_array)]).unwrap();

        let output_batch = deserialize_file(serialize_file(&input_batch));
        assert_eq!(input_batch, output_batch);

        let output_batch = deserialize_stream(serialize_stream(&input_batch));
        assert_eq!(input_batch, output_batch);
    }

    #[test]
    fn test_roundtrip_list_view_of_dict() {
        #[allow(deprecated)]
        let list_data_type = DataType::ListView(Arc::new(Field::new_dict(
            "item",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            true,
            1,
            false,
        )));
        let offsets: &[i32; 5] = &[0, 2, 4, 4, 7];
        let sizes: &[i32; 4] = &[2, 2, 0, 3];
        test_roundtrip_list_view_of_dict_impl::<i32, i32>(list_data_type, offsets, sizes);
    }

    #[test]
    fn test_roundtrip_large_list_view_of_dict() {
        #[allow(deprecated)]
        let list_data_type = DataType::LargeListView(Arc::new(Field::new_dict(
            "item",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            true,
            2,
            false,
        )));
        let offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
        let sizes: &[i64; 4] = &[2, 2, 0, 3];
        test_roundtrip_list_view_of_dict_impl::<i64, i64>(list_data_type, offsets, sizes);
    }

    #[test]
    fn test_roundtrip_sliced_list_view_of_dict() {
        #[allow(deprecated)]
        let list_data_type = DataType::ListView(Arc::new(Field::new_dict(
            "item",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            true,
            3,
            false,
        )));

        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2, 1, 0, 3, 2, 1]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));
        let dict_data = dict_array.to_data();

        let offsets: &[i32; 7] = &[0, 2, 4, 4, 7, 9, 12];
        let sizes: &[i32; 6] = &[2, 2, 0, 3, 2, 3];
        let value_offsets = Buffer::from_slice_ref(offsets);
        let value_sizes = Buffer::from_slice_ref(sizes);

        let list_data = ArrayData::builder(list_data_type)
            .len(6)
            .add_buffer(value_offsets)
            .add_buffer(value_sizes)
            .add_child_data(dict_data)
            .build()
            .unwrap();
        let list_view_array = GenericListViewArray::<i32>::from(list_data);

        let schema = Arc::new(Schema::new(vec![Field::new(
            "f1",
            list_view_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(list_view_array)]).unwrap();

        let sliced_batch = input_batch.slice(1, 4);

        let output_batch = deserialize_file(serialize_file(&sliced_batch));
        assert_eq!(sliced_batch, output_batch);

        let output_batch = deserialize_stream(serialize_stream(&sliced_batch));
        assert_eq!(sliced_batch, output_batch);
    }

    #[test]
    fn test_roundtrip_dense_union_of_dict() {
        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));

        #[allow(deprecated)]
        let dict_field = Arc::new(Field::new_dict(
            "dict",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            true,
            1,
            false,
        ));
        let int_field = Arc::new(Field::new("int", DataType::Int32, false));
        let union_fields = UnionFields::try_new(vec![0, 1], vec![dict_field, int_field]).unwrap();

        let types = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);
        let offsets = ScalarBuffer::from(vec![0i32, 1, 0, 2, 1, 3, 4]);

        let int_array = Int32Array::from(vec![100, 200]);

        let union = UnionArray::try_new(
            union_fields.clone(),
            types,
            Some(offsets),
            vec![Arc::new(dict_array), Arc::new(int_array)],
        )
        .unwrap();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "union",
            DataType::Union(union_fields, UnionMode::Dense),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();

        let output_batch = deserialize_file(serialize_file(&input_batch));
        assert_eq!(input_batch, output_batch);

        let output_batch = deserialize_stream(serialize_stream(&input_batch));
        assert_eq!(input_batch, output_batch);
    }

    #[test]
    fn test_roundtrip_sparse_union_of_dict() {
        let values = StringArray::from(vec![Some("alpha"), None, Some("beta"), Some("gamma")]);
        let keys = Int32Array::from_iter_values([0, 0, 1, 2, 3, 0, 2]);
        let dict_array = DictionaryArray::new(keys, Arc::new(values));

        #[allow(deprecated)]
        let dict_field = Arc::new(Field::new_dict(
            "dict",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            true,
            2,
            false,
        ));
        let int_field = Arc::new(Field::new("int", DataType::Int32, false));
        let union_fields = UnionFields::try_new(vec![0, 1], vec![dict_field, int_field]).unwrap();

        let types = ScalarBuffer::from(vec![0i8, 0, 1, 0, 1, 0, 0]);

        let int_array = Int32Array::from(vec![0, 0, 100, 0, 200, 0, 0]);

        let union = UnionArray::try_new(
            union_fields.clone(),
            types,
            None,
            vec![Arc::new(dict_array), Arc::new(int_array)],
        )
        .unwrap();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "union",
            DataType::Union(union_fields, UnionMode::Sparse),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();

        let output_batch = deserialize_file(serialize_file(&input_batch));
        assert_eq!(input_batch, output_batch);

        let output_batch = deserialize_stream(serialize_stream(&input_batch));
        assert_eq!(input_batch, output_batch);
    }

    #[test]
    fn test_roundtrip_map_with_dict_keys() {
        let key_values = StringArray::from(vec!["key_a", "key_b", "key_c"]);
        let keys = Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]);
        let dict_keys = DictionaryArray::new(keys, Arc::new(key_values));

        let values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);

        #[allow(deprecated)]
        let entries_field = Arc::new(Field::new(
            "entries",
            DataType::Struct(
                vec![
                    Field::new_dict(
                        "key",
                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                        false,
                        1,
                        false,
                    ),
                    Field::new("value", DataType::Int32, true),
                ]
                .into(),
            ),
            false,
        ));

        let entries = StructArray::from(vec![
            (
                Arc::new(Field::new(
                    "key",
                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                    false,
                )),
                Arc::new(dict_keys) as ArrayRef,
            ),
            (
                Arc::new(Field::new("value", DataType::Int32, true)),
                Arc::new(values) as ArrayRef,
            ),
        ]);

        let offsets = Buffer::from_slice_ref([0i32, 2, 4, 6]);

        let map_data = ArrayData::builder(DataType::Map(entries_field, false))
            .len(3)
            .add_buffer(offsets)
            .add_child_data(entries.into_data())
            .build()
            .unwrap();
        let map_array = MapArray::from(map_data);

        let schema = Arc::new(Schema::new(vec![Field::new(
            "map",
            map_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(map_array)]).unwrap();

        let output_batch = deserialize_file(serialize_file(&input_batch));
        assert_eq!(input_batch, output_batch);

        let output_batch = deserialize_stream(serialize_stream(&input_batch));
        assert_eq!(input_batch, output_batch);
    }

    #[test]
    fn test_roundtrip_map_with_dict_values() {
        let keys = StringArray::from(vec!["a", "b", "c", "d", "e", "f"]);

        let value_values = StringArray::from(vec!["val_x", "val_y", "val_z"]);
        let value_keys = Int32Array::from_iter_values([0, 1, 2, 0, 1, 0]);
        let dict_values = DictionaryArray::new(value_keys, Arc::new(value_values));

        #[allow(deprecated)]
        let entries_field = Arc::new(Field::new(
            "entries",
            DataType::Struct(
                vec![
                    Field::new("key", DataType::Utf8, false),
                    Field::new_dict(
                        "value",
                        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                        true,
                        2,
                        false,
                    ),
                ]
                .into(),
            ),
            false,
        ));

        let entries = StructArray::from(vec![
            (
                Arc::new(Field::new("key", DataType::Utf8, false)),
                Arc::new(keys) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "value",
                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                    true,
                )),
                Arc::new(dict_values) as ArrayRef,
            ),
        ]);

        let offsets = Buffer::from_slice_ref([0i32, 2, 4, 6]);

        let map_data = ArrayData::builder(DataType::Map(entries_field, false))
            .len(3)
            .add_buffer(offsets)
            .add_child_data(entries.into_data())
            .build()
            .unwrap();
        let map_array = MapArray::from(map_data);

        let schema = Arc::new(Schema::new(vec![Field::new(
            "map",
            map_array.data_type().clone(),
            false,
        )]));
        let input_batch = RecordBatch::try_new(schema, vec![Arc::new(map_array)]).unwrap();

        let output_batch = deserialize_file(serialize_file(&input_batch));
        assert_eq!(input_batch, output_batch);

        let output_batch = deserialize_stream(serialize_stream(&input_batch));
        assert_eq!(input_batch, output_batch);
    }

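    // A `FileDecoder` created with `with_require_alignment(true)` rejects
    // buffers that are not properly aligned for their type. Decimal128 data
    // expects 16-byte alignment, so writing with `alignment = 16` must yield
    // files the strict decoder can read in place, for any column/row count.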
    #[test]
    fn test_decimal128_alignment16_is_sufficient() {
        const IPC_ALIGNMENT: usize = 16;

        for num_cols in [1, 2, 3, 17, 50, 73, 99] {
            let num_rows = (num_cols * 7 + 11) % 100;

            let mut fields = Vec::new();
            let mut arrays = Vec::new();
            for i in 0..num_cols {
                let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
                let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
                fields.push(field);
                arrays.push(Arc::new(array) as Arc<dyn Array>);
            }
            let schema = Schema::new(fields);
            let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();

            let mut writer = FileWriter::try_new_with_options(
                Vec::new(),
                batch.schema_ref(),
                IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
            )
            .unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();

            let out: Vec<u8> = writer.into_inner().unwrap();

            let buffer = Buffer::from_vec(out);
            let trailer_start = buffer.len() - 10;
            let footer_len =
                read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
            let footer =
                root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();

            let schema = fb_to_schema(footer.schema().unwrap());

            let decoder =
                FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);

            let batches = footer.recordBatches().unwrap();

            let block = batches.get(0);
            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
            let data = buffer.slice_with_length(block.offset() as _, block_len);

            let batch2 = decoder.read_record_batch(block, &data).unwrap().unwrap();

            assert_eq!(batch, batch2);
        }
    }

    #[test]
    fn test_decimal128_alignment8_is_unaligned() {
        const IPC_ALIGNMENT: usize = 8;

        let num_cols = 2;
        let num_rows = 1;

        let mut fields = Vec::new();
        let mut arrays = Vec::new();
        for i in 0..num_cols {
            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
            let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
            fields.push(field);
            arrays.push(Arc::new(array) as Arc<dyn Array>);
        }
        let schema = Schema::new(fields);
        let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();

        let mut writer = FileWriter::try_new_with_options(
            Vec::new(),
            batch.schema_ref(),
            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
        )
        .unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();

        let out: Vec<u8> = writer.into_inner().unwrap();

        let buffer = Buffer::from_vec(out);
        let trailer_start = buffer.len() - 10;
        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
        let schema = fb_to_schema(footer.schema().unwrap());

        let decoder =
            FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);

        let batches = footer.recordBatches().unwrap();

        let block = batches.get(0);
        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
        let data = buffer.slice_with_length(block.offset() as _, block_len);

        let result = decoder.read_record_batch(block, &data);

        let error = result.unwrap_err();
        assert_eq!(
            error.to_string(),
            "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
             offset from expected alignment of 16 by 8"
        );
    }

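    // `flush` should push everything buffered so far through the inner
    // `BufWriter`. The expected sizes below rely on two format details:
    // after the last flush, finishing a stream appends an 8-byte
    // end-of-stream marker, and at flush time the file writer has written
    // 8 bytes more than the stream writer (its leading magic preamble).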
    #[test]
    fn test_flush() {
        let num_cols = 2;
        let mut fields = Vec::new();
        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
        for i in 0..num_cols {
            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
            fields.push(field);
        }
        let schema = Schema::new(fields);
        let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
        let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
        let mut stream_writer =
            StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
                .unwrap();
        let mut file_writer =
            FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();

        let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
        let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
        stream_writer.flush().unwrap();
        file_writer.flush().unwrap();
        let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
        let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
        let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
        let expected_stream_flushed_bytes = stream_out.len() - 8;
        let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;

        assert!(
            stream_bytes_written_on_new < stream_bytes_written_on_flush,
            "this test makes no sense if flush is not actually required"
        );
        assert!(
            file_bytes_written_on_new < file_bytes_written_on_flush,
            "this test makes no sense if flush is not actually required"
        );
        assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
        assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
    }

    #[test]
    fn test_roundtrip_list_of_fixed_list() -> Result<(), ArrowError> {
        let l1_type =
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, false)), 3);
        let l2_type = DataType::List(Arc::new(Field::new("item", l1_type.clone(), false)));

        let l0_builder = Float32Builder::new();
        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3).with_field(Arc::new(Field::new(
            "item",
            DataType::Float32,
            false,
        )));
        let mut l2_builder =
            ListBuilder::new(l1_builder).with_field(Arc::new(Field::new("item", l1_type, false)));

        for point in [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]] {
            l2_builder.values().values().append_value(point[0]);
            l2_builder.values().values().append_value(point[1]);
            l2_builder.values().values().append_value(point[2]);

            l2_builder.values().append(true);
        }
        l2_builder.append(true);

        let point = [10., 11., 12.];
        l2_builder.values().values().append_value(point[0]);
        l2_builder.values().values().append_value(point[1]);
        l2_builder.values().values().append_value(point[2]);

        l2_builder.values().append(true);
        l2_builder.append(true);

        let array = Arc::new(l2_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new("points", l2_type, false)],
            HashMap::default(),
        ));

        test_slices(&array, &schema, 0, 1)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 1)?;

        Ok(())
    }

    #[test]
    fn test_roundtrip_list_of_fixed_list_w_nulls() -> Result<(), ArrowError> {
        let l0_builder = Float32Builder::new();
        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3);
        let mut l2_builder = ListBuilder::new(l1_builder);

        for point in [
            [Some(1.0), Some(2.0), None],
            [Some(4.0), Some(5.0), Some(6.0)],
            [None, Some(8.0), Some(9.0)],
        ] {
            for p in point {
                match p {
                    Some(p) => l2_builder.values().values().append_value(p),
                    None => l2_builder.values().values().append_null(),
                }
            }

            l2_builder.values().append(true);
        }
        l2_builder.append(true);

        let point = [Some(10.), None, None];
        for p in point {
            match p {
                Some(p) => l2_builder.values().values().append_value(p),
                None => l2_builder.values().values().append_null(),
            }
        }

        l2_builder.values().append(true);
        l2_builder.append(true);

        let array = Arc::new(l2_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new(
                "points",
                DataType::List(Arc::new(Field::new(
                    "item",
                    DataType::FixedSizeList(
                        Arc::new(Field::new("item", DataType::Float32, true)),
                        3,
                    ),
                    true,
                ))),
                true,
            )],
            HashMap::default(),
        ));

        test_slices(&array, &schema, 0, 1)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 1)?;

        Ok(())
    }

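    /// Round-trips `parent_array.slice(offset, length)` as a single-column
    /// batch through the stream format and asserts the batch survives intact.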
    fn test_slices(
        parent_array: &ArrayRef,
        schema: &SchemaRef,
        offset: usize,
        length: usize,
    ) -> Result<(), ArrowError> {
        let subarray = parent_array.slice(offset, length);
        let original_batch = RecordBatch::try_new(schema.clone(), vec![subarray])?;

        let mut bytes = Vec::new();
        let mut writer = StreamWriter::try_new(&mut bytes, schema)?;
        writer.write(&original_batch)?;
        writer.finish()?;

        let mut cursor = std::io::Cursor::new(bytes);
        let mut reader = StreamReader::try_new(&mut cursor, None)?;
        let returned_batch = reader.next().unwrap()?;

        assert_eq!(original_batch, returned_batch);

        Ok(())
    }

    #[test]
    fn test_roundtrip_fixed_list() -> Result<(), ArrowError> {
        let int_builder = Int64Builder::new();
        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3)
            .with_field(Arc::new(Field::new("item", DataType::Int64, false)));

        for point in [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]] {
            fixed_list_builder.values().append_value(point[0]);
            fixed_list_builder.values().append_value(point[1]);
            fixed_list_builder.values().append_value(point[2]);

            fixed_list_builder.append(true);
        }

        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new(
                "points",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, false)), 3),
                false,
            )],
            HashMap::default(),
        ));

        test_slices(&array, &schema, 0, 4)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 3)?;
        test_slices(&array, &schema, 2, 1)?;

        Ok(())
    }

    #[test]
    fn test_roundtrip_fixed_list_w_nulls() -> Result<(), ArrowError> {
        let int_builder = Int64Builder::new();
        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3);

        for point in [
            [Some(1), Some(2), None],
            [Some(4), Some(5), Some(6)],
            [None, Some(8), Some(9)],
            [Some(10), None, None],
        ] {
            for p in point {
                match p {
                    Some(p) => fixed_list_builder.values().append_value(p),
                    None => fixed_list_builder.values().append_null(),
                }
            }

            fixed_list_builder.append(true);
        }

        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new(
                "points",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 3),
                true,
            )],
            HashMap::default(),
        ));

        test_slices(&array, &schema, 0, 4)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 3)?;
        test_slices(&array, &schema, 2, 1)?;

        Ok(())
    }

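    // `HashMap` iteration order is not deterministic, so the writer must
    // stabilize the ordering of metadata entries when encoding a schema;
    // otherwise repeated encodings of the same schema would hash differently.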
    #[test]
    fn test_metadata_encoding_ordering() {
        fn create_hash() -> u64 {
            let metadata: HashMap<String, String> = [
                ("a", "1"),
                ("b", "2"),
                ("c", "3"),
                ("d", "4"),
                ("e", "5"),
            ]
            .into_iter()
            .map(|(k, v)| (k.to_owned(), v.to_owned()))
            .collect();

            let schema = Arc::new(
                Schema::new(vec![
                    Field::new("a", DataType::Int64, true).with_metadata(metadata.clone()),
                ])
                .with_metadata(metadata)
                .clone(),
            );
            let batch = RecordBatch::new_empty(schema.clone());

            let mut bytes = Vec::new();
            let mut w = StreamWriter::try_new(&mut bytes, batch.schema_ref()).unwrap();
            w.write(&batch).unwrap();
            w.finish().unwrap();

            let mut h = std::hash::DefaultHasher::new();
            h.write(&bytes);
            h.finish()
        }

        let expected = create_hash();

        let all_passed = (0..20).all(|_| create_hash() == expected);
        assert!(all_passed);
    }

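    // After `DictionaryTracker::clear`, previously written dictionaries are
    // forgotten, so a second, independent stream re-emits its dictionary
    // batches and can be decoded by a fresh reader on its own.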
    #[test]
    fn test_dictionary_tracker_reset() {
        let data_gen = IpcDataGenerator::default();
        let mut dictionary_tracker = DictionaryTracker::new(false);
        let writer_options = IpcWriteOptions::default();
        let mut compression_ctx = CompressionContext::default();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
            false,
        )]));

        let mut write_single_batch_stream =
            |batch: RecordBatch, dict_tracker: &mut DictionaryTracker| -> Vec<u8> {
                let mut buffer = Vec::new();

                let stream_header = data_gen.schema_to_bytes_with_dictionary_tracker(
                    &schema,
                    dict_tracker,
                    &writer_options,
                );
                _ = write_message(&mut buffer, stream_header, &writer_options).unwrap();

                let (encoded_dicts, encoded_batch) = data_gen
                    .encode(&batch, dict_tracker, &writer_options, &mut compression_ctx)
                    .unwrap();
                for encoded_dict in encoded_dicts {
                    _ = write_message(&mut buffer, encoded_dict, &writer_options).unwrap();
                }
                _ = write_message(&mut buffer, encoded_batch, &writer_options).unwrap();

                buffer
            };

        let batch1 = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(DictionaryArray::new(
                UInt8Array::from_iter_values([0]),
                Arc::new(StringArray::from_iter_values(["a"])),
            ))],
        )
        .unwrap();
        let buffer = write_single_batch_stream(batch1.clone(), &mut dictionary_tracker);

        let mut reader = StreamReader::try_new(Cursor::new(buffer), None).unwrap();
        let read_batch = reader.next().unwrap().unwrap();
        assert_eq!(read_batch, batch1);

        dictionary_tracker.clear();

        let batch2 = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(DictionaryArray::new(
                UInt8Array::from_iter_values([0]),
                Arc::new(StringArray::from_iter_values(["a"])),
            ))],
        )
        .unwrap();
        let buffer = write_single_batch_stream(batch2.clone(), &mut dictionary_tracker);
        let mut reader = StreamReader::try_new(Cursor::new(buffer), None).unwrap();
        let read_batch = reader.next().unwrap().unwrap();
        assert_eq!(read_batch, batch2);
    }
}