use std::cmp::min;
use std::collections::HashMap;
use std::io::{BufWriter, Write};
use std::mem::size_of;
use std::sync::Arc;

use flatbuffers::FlatBufferBuilder;

use arrow_array::builder::BufferBuilder;
use arrow_array::cast::*;
use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
use arrow_array::*;
use arrow_buffer::bit_util;
use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
use arrow_data::{layout, ArrayData, ArrayDataBuilder, BufferSpec};
use arrow_schema::*;

use crate::compression::CompressionCodec;
use crate::convert::IpcSchemaEncoder;
use crate::CONTINUATION_MARKER;

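/// Options controlling how IPC data is written: buffer alignment, metadata
/// version, optional body compression, and dictionary handling.
///
/// A minimal usage sketch (assuming the crate is used as `arrow_ipc` and the
/// relevant compression feature is enabled):
///
/// ```ignore
/// use arrow_ipc::writer::IpcWriteOptions;
/// use arrow_ipc::{CompressionType, MetadataVersion};
///
/// let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?
///     .try_with_compression(Some(CompressionType::LZ4_FRAME))?;
/// ```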
#[derive(Debug, Clone)]
pub struct IpcWriteOptions {
    /// Write padding after memory buffers to this multiple of bytes.
    /// Must be 8, 16, 32, or 64 - defaults to 64.
    alignment: u8,
    /// Write the pre-0.15.0 IPC message format, which lacks the continuation
    /// marker (only valid with metadata version V4).
    write_legacy_ipc_format: bool,
    /// The metadata version to write. The Rust IPC writer supports V4 and above.
    metadata_version: crate::MetadataVersion,
    /// Compression, if desired. Only supported with metadata V5 and above.
    batch_compression_type: Option<crate::CompressionType>,
    /// How to handle updates to dictionaries between record batches.
    dictionary_handling: DictionaryHandling,
}

impl IpcWriteOptions {
    /// Configures compression when writing IPC files.
    ///
    /// Returns an error if compression is requested for a metadata version
    /// earlier than V5.
    pub fn try_with_compression(
        mut self,
        batch_compression_type: Option<crate::CompressionType>,
    ) -> Result<Self, ArrowError> {
        self.batch_compression_type = batch_compression_type;

        if self.batch_compression_type.is_some()
            && self.metadata_version < crate::MetadataVersion::V5
        {
            return Err(ArrowError::InvalidArgumentError(
                "Compression only supported in metadata v5 and above".to_string(),
            ));
        }
        Ok(self)
    }

    /// Try to create IpcWriteOptions, checking for incompatible settings.
    pub fn try_new(
        alignment: usize,
        write_legacy_ipc_format: bool,
        metadata_version: crate::MetadataVersion,
    ) -> Result<Self, ArrowError> {
        let is_alignment_valid =
            alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
        if !is_alignment_valid {
            return Err(ArrowError::InvalidArgumentError(
                "Alignment should be 8, 16, 32, or 64.".to_string(),
            ));
        }
        let alignment: u8 = u8::try_from(alignment).expect("range already checked");
        match metadata_version {
            crate::MetadataVersion::V1
            | crate::MetadataVersion::V2
            | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
                "Writing IPC metadata version 3 and lower not supported".to_string(),
            )),
            #[allow(deprecated)]
            crate::MetadataVersion::V4 => Ok(Self {
                alignment,
                write_legacy_ipc_format,
                metadata_version,
                batch_compression_type: None,
                dictionary_handling: DictionaryHandling::default(),
            }),
            crate::MetadataVersion::V5 => {
                if write_legacy_ipc_format {
                    Err(ArrowError::InvalidArgumentError(
                        "Legacy IPC format only supported on metadata version 4".to_string(),
                    ))
                } else {
                    Ok(Self {
                        alignment,
                        write_legacy_ipc_format,
                        metadata_version,
                        batch_compression_type: None,
                        dictionary_handling: DictionaryHandling::default(),
                    })
                }
            }
            z => Err(ArrowError::InvalidArgumentError(format!(
                "Unsupported crate::MetadataVersion {z:?}"
            ))),
        }
    }

    /// Set how dictionaries are handled when writing. See [`DictionaryHandling`].
    pub fn with_dictionary_handling(mut self, dictionary_handling: DictionaryHandling) -> Self {
        self.dictionary_handling = dictionary_handling;
        self
    }
}

impl Default for IpcWriteOptions {
    fn default() -> Self {
        Self {
            alignment: 64,
            write_legacy_ipc_format: false,
            metadata_version: crate::MetadataVersion::V5,
            batch_compression_type: None,
            dictionary_handling: DictionaryHandling::default(),
        }
    }
}

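/// Handles the low level details of encoding [`Schema`] and [`RecordBatch`]
/// into the Arrow IPC format.
///
/// A minimal sketch of encoding a batch by hand (`schema` and `batch` are
/// assumed to exist in scope):
///
/// ```ignore
/// let generator = IpcDataGenerator::default();
/// let mut tracker = DictionaryTracker::new(false);
/// let options = IpcWriteOptions::default();
///
/// // Encode the schema, then the dictionaries and record batch.
/// let encoded_schema =
///     generator.schema_to_bytes_with_dictionary_tracker(&schema, &mut tracker, &options);
/// let (encoded_dictionaries, encoded_batch) =
///     generator.encoded_batch(&batch, &mut tracker, &options)?;
/// ```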
#[derive(Debug, Default)]
pub struct IpcDataGenerator {}

impl IpcDataGenerator {
    /// Converts a schema to an IPC message, registering any dictionary fields
    /// with `dictionary_tracker`, and returns it encoded inside [`EncodedData`]
    /// as a flatbuffer.
    pub fn schema_to_bytes_with_dictionary_tracker(
        &self,
        schema: &Schema,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> EncodedData {
        let mut fbb = FlatBufferBuilder::new();
        let schema = {
            let fb = IpcSchemaEncoder::new()
                .with_dictionary_tracker(dictionary_tracker)
                .schema_to_fb_offset(&mut fbb, schema);
            fb.as_union_value()
        };

        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::Schema);
        message.add_bodyLength(0);
        message.add_header(schema);
        let data = message.finish();
        fbb.finish(data, None);

        let data = fbb.finished_data();
        EncodedData {
            ipc_message: data.to_vec(),
            arrow_data: vec![],
        }
    }

    /// Recursively encodes dictionaries nested inside `column`'s children.
    fn _encode_dictionaries<I: Iterator<Item = i64>>(
        &self,
        column: &ArrayRef,
        encoded_dictionaries: &mut Vec<EncodedData>,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        dict_id: &mut I,
    ) -> Result<(), ArrowError> {
        match column.data_type() {
            DataType::Struct(fields) => {
                let s = as_struct_array(column);
                for (field, column) in fields.iter().zip(s.columns()) {
                    self.encode_dictionaries(
                        field,
                        column,
                        encoded_dictionaries,
                        dictionary_tracker,
                        write_options,
                        dict_id,
                    )?;
                }
            }
            DataType::RunEndEncoded(_, values) => {
                let data = column.to_data();
                if data.child_data().len() != 2 {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "The run encoded array should have exactly two child arrays. Found {}",
                        data.child_data().len()
                    )));
                }
                // The run_ends array is not expected to be dictionary encoded,
                // so only the values array (child 1) is walked.
                let values_array = make_array(data.child_data()[1].clone());
                self.encode_dictionaries(
                    values,
                    &values_array,
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::List(field) => {
                let list = as_list_array(column);
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::LargeList(field) => {
                let list = as_large_list_array(column);
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::FixedSizeList(field, _) => {
                let list = column
                    .as_any()
                    .downcast_ref::<FixedSizeListArray>()
                    .expect("Unable to downcast to fixed size list array");
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::Map(field, _) => {
                let map_array = as_map_array(column);

                let (keys, values) = match field.data_type() {
                    DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
                    _ => panic!("Incorrect field data type {:?}", field.data_type()),
                };

                // Encode dictionaries nested in the map's keys.
                self.encode_dictionaries(
                    keys,
                    map_array.keys(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;

                // Encode dictionaries nested in the map's values.
                self.encode_dictionaries(
                    values,
                    map_array.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::Union(fields, _) => {
                let union = as_union_array(column);
                for (type_id, field) in fields.iter() {
                    let column = union.child(type_id);
                    self.encode_dictionaries(
                        field,
                        column,
                        encoded_dictionaries,
                        dictionary_tracker,
                        write_options,
                        dict_id,
                    )?;
                }
            }
            _ => (),
        }

        Ok(())
    }

    /// Encodes the dictionary (if any) for `column`, registering it with the
    /// tracker and emitting a full or delta dictionary batch as required.
    fn encode_dictionaries<I: Iterator<Item = i64>>(
        &self,
        field: &Field,
        column: &ArrayRef,
        encoded_dictionaries: &mut Vec<EncodedData>,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        dict_id_seq: &mut I,
    ) -> Result<(), ArrowError> {
        match column.data_type() {
            DataType::Dictionary(_key_type, _value_type) => {
                let dict_data = column.to_data();
                let dict_values = &dict_data.child_data()[0];

                let values = make_array(dict_data.child_data()[0].clone());

                // Encode any dictionaries nested inside the values first.
                self._encode_dictionaries(
                    &values,
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id_seq,
                )?;

                // Take the dict id assigned to this field, in depth-first order.
                let dict_id = dict_id_seq.next().ok_or_else(|| {
                    ArrowError::IpcError(format!("no dict id for field {}", field.name()))
                })?;

                match dictionary_tracker.insert_column(
                    dict_id,
                    column,
                    write_options.dictionary_handling,
                )? {
                    DictionaryUpdate::None => {}
                    DictionaryUpdate::New | DictionaryUpdate::Replaced => {
                        encoded_dictionaries.push(self.dictionary_batch_to_bytes(
                            dict_id,
                            dict_values,
                            write_options,
                            false,
                        )?);
                    }
                    DictionaryUpdate::Delta(data) => {
                        encoded_dictionaries.push(self.dictionary_batch_to_bytes(
                            dict_id,
                            &data,
                            write_options,
                            true,
                        )?);
                    }
                }
            }
            _ => self._encode_dictionaries(
                column,
                encoded_dictionaries,
                dictionary_tracker,
                write_options,
                dict_id_seq,
            )?,
        }

        Ok(())
    }

    /// Encodes a `RecordBatch` into one [`EncodedData`] message, preceded by
    /// any dictionary batches that need to be (re)sent for this batch.
    pub fn encoded_batch(
        &self,
        batch: &RecordBatch,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
        let schema = batch.schema();
        let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());

        let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();

        for (i, field) in schema.fields().iter().enumerate() {
            let column = batch.column(i);
            self.encode_dictionaries(
                field,
                column,
                &mut encoded_dictionaries,
                dictionary_tracker,
                write_options,
                &mut dict_id,
            )?;
        }

        let encoded_message = self.record_batch_to_bytes(batch, write_options)?;
        Ok((encoded_dictionaries, encoded_message))
    }

    /// Write a `RecordBatch` into two sets of bytes, one for the header
    /// (`ipc_message`) and the other for the batch's data (`arrow_data`).
    fn record_batch_to_bytes(
        &self,
        batch: &RecordBatch,
        write_options: &IpcWriteOptions,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];
        let mut offset = 0;

        // Get the type of compression.
        let batch_compression_type = write_options.batch_compression_type;

        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        let compression_codec: Option<CompressionCodec> =
            batch_compression_type.map(TryInto::try_into).transpose()?;

        let mut variadic_buffer_counts = vec![];

        for array in batch.columns() {
            let array_data = array.to_data();
            offset = write_array_data(
                &array_data,
                &mut buffers,
                &mut arrow_data,
                &mut nodes,
                offset,
                array.len(),
                array.null_count(),
                compression_codec,
                write_options,
            )?;

            append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
        }
        // Pad the tail of the body data.
        let len = arrow_data.len();
        let pad_len = pad_to_alignment(write_options.alignment, len);
        arrow_data.extend_from_slice(&PADDING[..pad_len]);

        // Write data.
        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(batch.num_rows() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }

            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            let b = batch_builder.finish();
            b.as_union_value()
        };
        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::RecordBatch);
        message.add_bodyLength(arrow_data.len() as i64);
        message.add_header(root);
        let root = message.finish();
        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }

    /// Write dictionary values into two sets of bytes, one for the header
    /// (`ipc_message`) and the other for the data (`arrow_data`).
    fn dictionary_batch_to_bytes(
        &self,
        dict_id: i64,
        array_data: &ArrayData,
        write_options: &IpcWriteOptions,
        is_delta: bool,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];

        // Get the type of compression.
        let batch_compression_type = write_options.batch_compression_type;

        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        let compression_codec: Option<CompressionCodec> = batch_compression_type
            .map(|batch_compression_type| batch_compression_type.try_into())
            .transpose()?;

        write_array_data(
            array_data,
            &mut buffers,
            &mut arrow_data,
            &mut nodes,
            0,
            array_data.len(),
            array_data.null_count(),
            compression_codec,
            write_options,
        )?;

        let mut variadic_buffer_counts = vec![];
        append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);

        // Pad the tail of the body data.
        let len = arrow_data.len();
        let pad_len = pad_to_alignment(write_options.alignment, len);
        arrow_data.extend_from_slice(&PADDING[..pad_len]);

        // Write data.
        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(array_data.len() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }
            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            batch_builder.finish()
        };

        let root = {
            let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb);
            batch_builder.add_id(dict_id);
            batch_builder.add_data(root);
            batch_builder.add_isDelta(is_delta);
            batch_builder.finish().as_union_value()
        };

        let root = {
            let mut message_builder = crate::MessageBuilder::new(&mut fbb);
            message_builder.add_version(write_options.metadata_version);
            message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
            message_builder.add_bodyLength(arrow_data.len() as i64);
            message_builder.add_header(root);
            message_builder.finish()
        };

        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }
}

fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
    match array.data_type() {
        DataType::BinaryView | DataType::Utf8View => {
            // The IPC format counts only the variadic data buffers, not the
            // fixed-size views buffer, hence the `- 1`.
            counts.push(array.buffers().len() as i64 - 1);
        }
        DataType::Dictionary(_, _) => {
            // Do nothing: dictionary values are written separately as
            // dictionary batches, and the keys have no variadic buffers.
        }
        _ => {
            for child in array.child_data() {
                append_variadic_buffer_counts(counts, child)
            }
        }
    }
}

/// Rebases a sliced run-end encoded array's `ArrayData` to zero offset,
/// re-encoding the run ends as needed. Errors if `arr` is not a run array.
pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
    match arr.data_type() {
        DataType::RunEndEncoded(k, _) => match k.data_type() {
            DataType::Int16 => {
                Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
            }
            DataType::Int32 => {
                Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
            }
            DataType::Int64 => {
                Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
            }
            d => unreachable!("Unexpected data type {d}"),
        },
        d => Err(ArrowError::InvalidArgumentError(format!(
            "The given array is not a run array. Data type of given array: {d}"
        ))),
    }
}

/// Returns a [`RunArray`] with zero offset whose run ends match its length,
/// re-encoding the run ends relative to the slice if necessary.
fn into_zero_offset_run_array<R: RunEndIndexType>(
    run_array: RunArray<R>,
) -> Result<RunArray<R>, ArrowError> {
    let run_ends = run_array.run_ends();
    if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
        return Ok(run_array);
    }

    // The physical index of the first run end within the logical slice.
    let start_physical_index = run_ends.get_start_physical_index();

    // The physical index of the last run end within the logical slice.
    let end_physical_index = run_ends.get_end_physical_index();

    let physical_length = end_physical_index - start_physical_index + 1;

    // Rebase the run ends so the slice starts at logical offset zero.
    let offset = R::Native::usize_as(run_ends.offset());
    let mut builder = BufferBuilder::<R::Native>::new(physical_length);
    for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
        builder.append(run_end_value.sub_wrapping(offset));
    }
    builder.append(R::Native::from_usize(run_array.len()).unwrap());
    let new_run_ends = unsafe {
        // Safety: the buffer above holds valid, monotonically increasing run
        // ends of the correct native type.
        ArrayDataBuilder::new(R::DATA_TYPE)
            .len(physical_length)
            .add_buffer(builder.finish())
            .build_unchecked()
    };

    // Slice the values to the physical range covered by the run ends.
    let new_values = run_array
        .values()
        .slice(start_physical_index, physical_length)
        .into_data();

    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
        .len(run_array.len())
        .add_child_data(new_run_ends)
        .add_child_data(new_values);
    let array_data = unsafe {
        // Safety: `new_run_ends` and `new_values` are consistent with
        // `run_array.len()` by construction.
        builder.build_unchecked()
    };
    Ok(array_data.into())
}

/// Controls how dictionaries are written to an IPC stream when their contents
/// change between record batches.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DictionaryHandling {
    /// Re-send the entire dictionary whenever its values change.
    Resend,
    /// When a new dictionary only appends values to the previous one, send just
    /// the appended values as a delta dictionary batch; otherwise re-send the
    /// whole dictionary.
    Delta,
}

impl Default for DictionaryHandling {
    fn default() -> Self {
        Self::Resend
    }
}

/// Describes what, if anything, must be written for a dictionary after it has
/// been registered with the [`DictionaryTracker`].
#[derive(Debug, Clone)]
pub enum DictionaryUpdate {
    /// The dictionary is unchanged; nothing needs to be written.
    None,
    /// The dictionary was seen for the first time; write it in full.
    New,
    /// The dictionary replaced a previous one; write it in full.
    Replaced,
    /// The dictionary appended values; write only the contained delta.
    Delta(ArrayData),
}

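/// Keeps track of the dictionaries that have been written, keyed by dict id,
/// so the writers can detect when a dictionary is new, replaced, or a delta.
///
/// A minimal sketch of direct use (`column` is an assumed `ArrayRef` holding
/// a dictionary array):
///
/// ```ignore
/// let mut tracker = DictionaryTracker::new(false);
/// let dict_id = tracker.next_dict_id();
/// match tracker.insert_column(dict_id, &column, DictionaryHandling::Resend)? {
///     DictionaryUpdate::None => { /* nothing to write */ }
///     update => { /* write a full or delta dictionary batch */ }
/// }
/// ```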
#[derive(Debug)]
pub struct DictionaryTracker {
    written: HashMap<i64, ArrayData>,
    dict_ids: Vec<i64>,
    error_on_replacement: bool,
}

impl DictionaryTracker {
    /// Create a new [`DictionaryTracker`].
    ///
    /// If `error_on_replacement` is true, an error is returned if a dictionary
    /// with the same id is replaced, as required by the IPC file format.
    pub fn new(error_on_replacement: bool) -> Self {
        #[allow(deprecated)]
        Self {
            written: HashMap::new(),
            dict_ids: Vec::new(),
            error_on_replacement,
        }
    }

    /// Returns the next dictionary id, recording it in the tracker's sequence.
    pub fn next_dict_id(&mut self) -> i64 {
        let next = self
            .dict_ids
            .last()
            .copied()
            .map(|i| i + 1)
            .unwrap_or_default();

        self.dict_ids.push(next);
        next
    }

    /// Returns the dictionary ids assigned so far, in assignment order.
    pub fn dict_id(&mut self) -> &[i64] {
        &self.dict_ids
    }

    #[deprecated(since = "56.1.0", note = "Use `insert_column` instead")]
    pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
        let dict_data = column.to_data();
        let dict_values = &dict_data.child_data()[0];

        // If a dictionary with this id was already written, check whether it changed.
        if let Some(last) = self.written.get(&dict_id) {
            if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
                // Same underlying data; nothing to do.
                return Ok(false);
            }
            if self.error_on_replacement {
                // If the dictionary values are equal by value, no replacement occurred.
                if last.child_data()[0] == *dict_values {
                    return Ok(false);
                }
                return Err(ArrowError::InvalidArgumentError(
                    "Dictionary replacement detected when writing IPC file format. \
                     Arrow IPC files only support a single dictionary for a given field \
                     across all batches."
                        .to_string(),
                ));
            }
        }

        self.written.insert(dict_id, dict_data);
        Ok(true)
    }

    /// Records a dictionary column under `dict_id` and reports what needs to
    /// be written for it.
    ///
    /// Returns [`DictionaryUpdate::New`] the first time an id is seen,
    /// [`DictionaryUpdate::None`] if the dictionary is unchanged,
    /// [`DictionaryUpdate::Delta`] if values were only appended and
    /// `dict_handling` is [`DictionaryHandling::Delta`], and
    /// [`DictionaryUpdate::Replaced`] otherwise. Errors if a replacement
    /// occurs while `error_on_replacement` is set.
    pub fn insert_column(
        &mut self,
        dict_id: i64,
        column: &ArrayRef,
        dict_handling: DictionaryHandling,
    ) -> Result<DictionaryUpdate, ArrowError> {
        let new_data = column.to_data();
        let new_values = &new_data.child_data()[0];

        // First time this dict id is seen: record and write it in full.
        let Some(old) = self.written.get(&dict_id) else {
            self.written.insert(dict_id, new_data);
            return Ok(DictionaryUpdate::New);
        };

        // Fast path: pointer equality means the dictionary cannot have changed.
        let old_values = &old.child_data()[0];
        if ArrayData::ptr_eq(old_values, new_values) {
            return Ok(DictionaryUpdate::None);
        }

        let comparison = compare_dictionaries(old_values, new_values);
        if matches!(comparison, DictionaryComparison::Equal) {
            return Ok(DictionaryUpdate::None);
        }

        const REPLACEMENT_ERROR: &str =
            "Dictionary replacement detected when writing IPC file format. \
             Arrow IPC files only support a single dictionary for a given field \
             across all batches.";

        match comparison {
            DictionaryComparison::NotEqual => {
                if self.error_on_replacement {
                    return Err(ArrowError::InvalidArgumentError(
                        REPLACEMENT_ERROR.to_string(),
                    ));
                }

                self.written.insert(dict_id, new_data);
                Ok(DictionaryUpdate::Replaced)
            }
            DictionaryComparison::Delta => match dict_handling {
                DictionaryHandling::Resend => {
                    if self.error_on_replacement {
                        return Err(ArrowError::InvalidArgumentError(
                            REPLACEMENT_ERROR.to_string(),
                        ));
                    }

                    self.written.insert(dict_id, new_data);
                    Ok(DictionaryUpdate::Replaced)
                }
                DictionaryHandling::Delta => {
                    let delta =
                        new_values.slice(old_values.len(), new_values.len() - old_values.len());
                    self.written.insert(dict_id, new_data);
                    Ok(DictionaryUpdate::Delta(delta))
                }
            },
            DictionaryComparison::Equal => unreachable!("Already checked equal case"),
        }
    }
}

/// The result of comparing a new dictionary against the previously written one.
#[derive(Debug, Clone)]
enum DictionaryComparison {
    /// The dictionaries differ and the new one is not an extension of the old.
    NotEqual,
    /// The dictionaries are equal by value.
    Equal,
    /// The new dictionary starts with the old one and only appends values.
    Delta,
}

/// Compares dictionary values to classify how they changed between batches.
fn compare_dictionaries(old: &ArrayData, new: &ArrayData) -> DictionaryComparison {
    let existing_len = old.len();
    let new_len = new.len();
    if existing_len == new_len {
        if *old == *new {
            return DictionaryComparison::Equal;
        } else {
            return DictionaryComparison::NotEqual;
        }
    }

    // A shorter dictionary can never be a delta of a longer one.
    if new_len < existing_len {
        return DictionaryComparison::NotEqual;
    }

    // If the new dictionary starts with the old one, the tail is a delta.
    if new.slice(0, existing_len) == *old {
        return DictionaryComparison::Delta;
    }

    DictionaryComparison::NotEqual
}

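/// Arrow File Writer: writes record batches in the Arrow IPC [File Format].
///
/// A minimal sketch of writing a file (`batch` is an assumed `RecordBatch`;
/// any `std::io::Write` target works):
///
/// ```ignore
/// let file = std::fs::File::create("data.arrow")?;
/// let mut writer = FileWriter::try_new_buffered(file, batch.schema_ref())?;
/// writer.write(&batch)?;
/// writer.finish()?;
/// ```
///
/// [File Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format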
pub struct FileWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// A reference to the schema, used in validating record batches
    schema: SchemaRef,
    /// The number of bytes written so far, used as the offset for new blocks
    block_offsets: usize,
    /// Dictionary blocks that will be written as part of the IPC footer
    dictionary_blocks: Vec<crate::Block>,
    /// Record blocks that will be written as part of the IPC footer
    record_blocks: Vec<crate::Block>,
    /// Whether the footer has been written, and the writer is finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,
    /// User level customized metadata
    custom_metadata: HashMap<String, String>,

    data_gen: IpcDataGenerator,
}

impl<W: Write> FileWriter<BufWriter<W>> {
    /// Try to create a new file writer with the writer wrapped in a `BufWriter`.
    ///
    /// See [`FileWriter::try_new`] for an unbuffered version.
    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        Self::try_new(BufWriter::new(writer), schema)
    }
}

impl<W: Write> FileWriter<W> {
    /// Try to create a new writer, with the schema written as part of the header.
    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        let write_options = IpcWriteOptions::default();
        Self::try_new_with_options(writer, schema, write_options)
    }

    /// Try to create a new writer with the given [`IpcWriteOptions`].
    pub fn try_new_with_options(
        mut writer: W,
        schema: &Schema,
        write_options: IpcWriteOptions,
    ) -> Result<Self, ArrowError> {
        let data_gen = IpcDataGenerator::default();
        // Write magic and padding to the start of the file.
        let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
        let header_size = super::ARROW_MAGIC.len() + pad_len;
        writer.write_all(&super::ARROW_MAGIC)?;
        writer.write_all(&PADDING[..pad_len])?;
        // Write the schema, tracking the written bytes as header + schema.
        let mut dictionary_tracker = DictionaryTracker::new(true);
        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
            schema,
            &mut dictionary_tracker,
            &write_options,
        );
        let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
        Ok(Self {
            writer,
            write_options,
            schema: Arc::new(schema.clone()),
            block_offsets: meta + data + header_size,
            dictionary_blocks: vec![],
            record_blocks: vec![],
            finished: false,
            dictionary_tracker,
            custom_metadata: HashMap::new(),
            data_gen,
        })
    }

    /// Adds a key-value pair to the [`FileWriter`]'s custom metadata.
    pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        self.custom_metadata.insert(key.into(), value.into());
    }

    /// Write a record batch to the file.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write record batch to file writer as it is closed".to_string(),
            ));
        }

        let (encoded_dictionaries, encoded_message) = self.data_gen.encoded_batch(
            batch,
            &mut self.dictionary_tracker,
            &self.write_options,
        )?;

        for encoded_dictionary in encoded_dictionaries {
            let (meta, data) =
                write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;

            let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
            self.dictionary_blocks.push(block);
            self.block_offsets += meta + data;
        }

        let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?;

        // Record the block for the footer.
        let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
        self.record_blocks.push(block);
        self.block_offsets += meta + data;
        Ok(())
    }

    /// Write the footer and closing tag, then mark the writer as done.
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write footer to file writer as it is closed".to_string(),
            ));
        }

        // Write EOS.
        write_continuation(&mut self.writer, &self.write_options, 0)?;

        let mut fbb = FlatBufferBuilder::new();
        let dictionaries = fbb.create_vector(&self.dictionary_blocks);
        let record_batches = fbb.create_vector(&self.record_blocks);
        let mut dictionary_tracker = DictionaryTracker::new(true);
        let schema = IpcSchemaEncoder::new()
            .with_dictionary_tracker(&mut dictionary_tracker)
            .schema_to_fb_offset(&mut fbb, &self.schema);
        let fb_custom_metadata = (!self.custom_metadata.is_empty())
            .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));

        let root = {
            let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
            footer_builder.add_version(self.write_options.metadata_version);
            footer_builder.add_schema(schema);
            footer_builder.add_dictionaries(dictionaries);
            footer_builder.add_recordBatches(record_batches);
            if let Some(fb_custom_metadata) = fb_custom_metadata {
                footer_builder.add_custom_metadata(fb_custom_metadata);
            }
            footer_builder.finish()
        };
        fbb.finish(root, None);
        let footer_data = fbb.finished_data();
        self.writer.write_all(footer_data)?;
        self.writer
            .write_all(&(footer_data.len() as i32).to_le_bytes())?;
        self.writer.write_all(&super::ARROW_MAGIC)?;
        self.writer.flush()?;
        self.finished = true;

        Ok(())
    }

    /// Returns the arrow [`SchemaRef`] for this arrow file.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Gets a reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Flush the underlying writer.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }

    /// Unwraps the underlying writer, finishing the file first if it has not
    /// yet been finished.
    pub fn into_inner(mut self) -> Result<W, ArrowError> {
        if !self.finished {
            // `finish` flushes the writer.
            self.finish()?;
        }
        Ok(self.writer)
    }
}

impl<W: Write> RecordBatchWriter for FileWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}

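/// Arrow Stream Writer: writes record batches in the Arrow IPC [Streaming Format].
///
/// Unlike [`FileWriter`], the stream format has no footer and does not support
/// random access, so no block offsets need to be tracked.
///
/// A minimal sketch of writing a stream (`batch` is an assumed `RecordBatch`):
///
/// ```ignore
/// let mut writer = StreamWriter::try_new(Vec::new(), batch.schema_ref())?;
/// writer.write(&batch)?;
/// writer.finish()?;
/// let bytes: Vec<u8> = writer.into_inner()?;
/// ```
///
/// [Streaming Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format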
pub struct StreamWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// Whether the stream has been finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,

    data_gen: IpcDataGenerator,
}

impl<W: Write> StreamWriter<BufWriter<W>> {
    /// Try to create a new stream writer with the writer wrapped in a `BufWriter`.
    ///
    /// See [`StreamWriter::try_new`] for an unbuffered version.
    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        Self::try_new(BufWriter::new(writer), schema)
    }
}

impl<W: Write> StreamWriter<W> {
    /// Try to create a new writer, with the schema written as part of the header.
    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        let write_options = IpcWriteOptions::default();
        Self::try_new_with_options(writer, schema, write_options)
    }

    /// Try to create a new writer with the given [`IpcWriteOptions`].
    pub fn try_new_with_options(
        mut writer: W,
        schema: &Schema,
        write_options: IpcWriteOptions,
    ) -> Result<Self, ArrowError> {
        let data_gen = IpcDataGenerator::default();
        let mut dictionary_tracker = DictionaryTracker::new(false);

        // Write the schema message as the stream header.
        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
            schema,
            &mut dictionary_tracker,
            &write_options,
        );
        write_message(&mut writer, encoded_message, &write_options)?;
        Ok(Self {
            writer,
            write_options,
            finished: false,
            dictionary_tracker,
            data_gen,
        })
    }

    /// Write a record batch to the stream.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write record batch to stream writer as it is closed".to_string(),
            ));
        }

        let (encoded_dictionaries, encoded_message) = self
            .data_gen
            .encoded_batch(batch, &mut self.dictionary_tracker, &self.write_options)
            .expect("StreamWriter is configured to not error on dictionary replacement");

        for encoded_dictionary in encoded_dictionaries {
            write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
        }

        write_message(&mut self.writer, encoded_message, &self.write_options)?;
        Ok(())
    }

    /// Write the continuation bytes, and mark the stream as done.
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write footer to stream writer as it is closed".to_string(),
            ));
        }

        write_continuation(&mut self.writer, &self.write_options, 0)?;

        self.finished = true;

        Ok(())
    }

    /// Gets a reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Flush the underlying writer.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }

    /// Unwraps the underlying writer, finishing the stream first if it has not
    /// yet been finished.
    pub fn into_inner(mut self) -> Result<W, ArrowError> {
        if !self.finished {
            // `finish` flushes the writer.
            self.finish()?;
        }
        Ok(self.writer)
    }
}

impl<W: Write> RecordBatchWriter for StreamWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}

/// Stores the encoded data, which is an IPC message in binary form.
pub struct EncodedData {
    /// An encoded `crate::Message`
    pub ipc_message: Vec<u8>,
    /// Arrow buffers to be written; should be an empty vec for schema messages
    pub arrow_data: Vec<u8>,
}
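
/// Write a message's IPC data and buffers, returning the metadata and buffer
/// data lengths written.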
pub fn write_message<W: Write>(
    mut writer: W,
    encoded: EncodedData,
    write_options: &IpcWriteOptions,
) -> Result<(usize, usize), ArrowError> {
    let arrow_data_len = encoded.arrow_data.len();
    if arrow_data_len % usize::from(write_options.alignment) != 0 {
        return Err(ArrowError::MemoryError(
            "Arrow data not aligned".to_string(),
        ));
    }

    let a = usize::from(write_options.alignment - 1);
    let buffer = encoded.ipc_message;
    let flatbuf_size = buffer.len();
    let prefix_size = if write_options.write_legacy_ipc_format {
        4
    } else {
        8
    };
    let aligned_size = (flatbuf_size + prefix_size + a) & !a;
    let padding_bytes = aligned_size - flatbuf_size - prefix_size;

    write_continuation(
        &mut writer,
        write_options,
        (aligned_size - prefix_size) as i32,
    )?;

    // Write the flatbuffer.
    if flatbuf_size > 0 {
        writer.write_all(&buffer)?;
    }
    // Write padding.
    writer.write_all(&PADDING[..padding_bytes])?;

    // Write the arrow data.
    let body_len = if arrow_data_len > 0 {
        write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
    } else {
        0
    };

    Ok((aligned_size, body_len))
}

fn write_body_buffers<W: Write>(
    mut writer: W,
    data: &[u8],
    alignment: u8,
) -> Result<usize, ArrowError> {
    let len = data.len();
    let pad_len = pad_to_alignment(alignment, len);
    let total_len = len + pad_len;

    // Write the body buffer followed by padding up to the alignment.
    writer.write_all(data)?;
    if pad_len > 0 {
        writer.write_all(&PADDING[..pad_len])?;
    }

    writer.flush()?;
    Ok(total_len)
}

/// Write a continuation marker (where the metadata version calls for one)
/// followed by the encapsulated message length.
fn write_continuation<W: Write>(
    mut writer: W,
    write_options: &IpcWriteOptions,
    total_len: i32,
) -> Result<usize, ArrowError> {
    let mut written = 8;

    // The metadata version determines whether a continuation marker is written.
    match write_options.metadata_version {
        crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
            unreachable!("Options with the metadata version cannot be created")
        }
        crate::MetadataVersion::V4 => {
            if !write_options.write_legacy_ipc_format {
                // Post-0.15.0 format: continuation marker before the length.
                writer.write_all(&CONTINUATION_MARKER)?;
                written = 4;
            }
            writer.write_all(&total_len.to_le_bytes()[..])?;
        }
        crate::MetadataVersion::V5 => {
            // Write continuation marker and message length.
            writer.write_all(&CONTINUATION_MARKER)?;
            writer.write_all(&total_len.to_le_bytes()[..])?;
        }
        z => panic!("Unsupported crate::MetadataVersion {z:?}"),
    };

    writer.flush()?;

    Ok(written)
}

/// Returns true if an array of the given `data_type` carries a validity
/// bitmap in the IPC body.
///
/// In V4 only `Null` arrays omit it; V5 additionally omits it for `Union` and
/// `RunEndEncoded`, which encode nullability through other means.
fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
    if write_options.metadata_version < crate::MetadataVersion::V5 {
        !matches!(data_type, DataType::Null)
    } else {
        !matches!(
            data_type,
            DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
        )
    }
}

/// Whether the buffer needs to be truncated before writing.
#[inline]
fn buffer_need_truncate(
    array_offset: usize,
    buffer: &Buffer,
    spec: &BufferSpec,
    min_length: usize,
) -> bool {
    spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
}

/// Returns the byte width of a fixed-width buffer, or 0 for variable-width buffers.
#[inline]
fn get_buffer_element_width(spec: &BufferSpec) -> usize {
    match spec {
        BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
        _ => 0,
    }
}

/// Rebases the value offsets of `data` to be zero-based, returning the new
/// offsets buffer along with the original start offset and the number of
/// values referenced.
fn reencode_offsets<O: OffsetSizeTrait>(
    offsets: &Buffer,
    data: &ArrayData,
) -> (Buffer, usize, usize) {
    let offsets_slice: &[O] = offsets.typed_data::<O>();
    let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];

    let start_offset = offset_slice.first().unwrap();
    let end_offset = offset_slice.last().unwrap();

    let offsets = match start_offset.as_usize() {
        0 => {
            // No rebasing needed; just slice the existing offsets buffer.
            let size = size_of::<O>();
            offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
        }
        _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
    };

    let start_offset = start_offset.as_usize();
    let end_offset = end_offset.as_usize();

    (offsets, start_offset, end_offset - start_offset)
}

/// Returns the offsets and values [`Buffer`]s for a byte array (e.g. `Utf8`,
/// `Binary`), re-encoding the offsets and slicing the values so that
/// sliced-away data is not written.
fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
    if data.is_empty() {
        return (MutableBuffer::new(0).into(), MutableBuffer::new(0).into());
    }

    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
    let values = data.buffers()[1].slice_with_length(original_start_offset, len);
    (offsets, values)
}

/// Similar logic to [`get_byte_array_buffers()`], but slices the child array
/// instead of a values buffer.
fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
    if data.is_empty() {
        return (
            MutableBuffer::new(0).into(),
            data.child_data()[0].slice(0, 0),
        );
    }

    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
    let child_data = data.child_data()[0].slice(original_start_offset, len);
    (offsets, child_data)
}

/// Write a field node and the array's buffers into `arrow_data`, recursing
/// into child arrays, and return the updated body offset.
#[allow(clippy::too_many_arguments)]
fn write_array_data(
    array_data: &ArrayData,
    buffers: &mut Vec<crate::Buffer>,
    arrow_data: &mut Vec<u8>,
    nodes: &mut Vec<crate::FieldNode>,
    offset: i64,
    num_rows: usize,
    null_count: usize,
    compression_codec: Option<CompressionCodec>,
    write_options: &IpcWriteOptions,
) -> Result<i64, ArrowError> {
    let mut offset = offset;
    if !matches!(array_data.data_type(), DataType::Null) {
        nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
    } else {
        // A NullArray's null count equals its length, so record the row count
        // as the null count rather than the passed-in value.
        nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
    }
    if has_validity_bitmap(array_data.data_type(), write_options) {
        // Write the null buffer if it exists, otherwise an all-valid bitmap.
        let null_buffer = match array_data.nulls() {
            None => {
                // Create a buffer and fill it with valid bits.
                let num_bytes = bit_util::ceil(num_rows, 8);
                let buffer = MutableBuffer::new(num_bytes);
                let buffer = buffer.with_bitset(num_bytes, true);
                buffer.into()
            }
            Some(buffer) => buffer.inner().sliced(),
        };

        offset = write_buffer(
            null_buffer.as_slice(),
            buffers,
            arrow_data,
            offset,
            compression_codec,
            write_options.alignment,
        )?;
    }

    let data_type = array_data.data_type();
    if matches!(data_type, DataType::Binary | DataType::Utf8) {
        let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
        for buffer in [offsets, values] {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                write_options.alignment,
            )?;
        }
    } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
        // Write all buffers of the view array as given: pruning unused data
        // buffers would require proving that no view references them, so no
        // truncation is attempted here.
        for buffer in array_data.buffers() {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                write_options.alignment,
            )?;
        }
    } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
        let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
        for buffer in [offsets, values] {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                write_options.alignment,
            )?;
        }
    } else if DataType::is_numeric(data_type)
        || DataType::is_temporal(data_type)
        || matches!(
            array_data.data_type(),
            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
        )
    {
        // Truncate the buffer to the window described by offset and length.
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = &array_data.buffers()[0];
        let layout = layout(data_type);
        let spec = &layout.buffers[0];

        let byte_width = get_buffer_element_width(spec);
        let min_length = array_data.len() * byte_width;
        let buffer_slice = if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
            let byte_offset = array_data.offset() * byte_width;
            let buffer_length = min(min_length, buffer.len() - byte_offset);
            &buffer.as_slice()[byte_offset..(byte_offset + buffer_length)]
        } else {
            buffer.as_slice()
        };
        offset = write_buffer(
            buffer_slice,
            buffers,
            arrow_data,
            offset,
            compression_codec,
            write_options.alignment,
        )?;
    } else if matches!(data_type, DataType::Boolean) {
        // Bools are special: the payload is 1 bit per value, so the buffer
        // must be bit-sliced rather than byte-sliced.
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = &array_data.buffers()[0];
        let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
        offset = write_buffer(
            &buffer,
            buffers,
            arrow_data,
            offset,
            compression_codec,
            write_options.alignment,
        )?;
    } else if matches!(
        data_type,
        DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
    ) {
        assert_eq!(array_data.buffers().len(), 1);
        assert_eq!(array_data.child_data().len(), 1);

        // Truncate the offsets and the child data to avoid writing unnecessary data.
        let (offsets, sliced_child_data) = match data_type {
            DataType::List(_) => get_list_array_buffers::<i32>(array_data),
            DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
            DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
            _ => unreachable!(),
        };
        offset = write_buffer(
            offsets.as_slice(),
            buffers,
            arrow_data,
            offset,
            compression_codec,
            write_options.alignment,
        )?;
        offset = write_array_data(
            &sliced_child_data,
            buffers,
            arrow_data,
            nodes,
            offset,
            sliced_child_data.len(),
            sliced_child_data.null_count(),
            compression_codec,
            write_options,
        )?;
        return Ok(offset);
    } else if let DataType::FixedSizeList(_, fixed_size) = data_type {
        assert_eq!(array_data.child_data().len(), 1);
        let fixed_size = *fixed_size as usize;

        let child_offset = array_data.offset() * fixed_size;
        let child_length = array_data.len() * fixed_size;
        let child_data = array_data.child_data()[0].slice(child_offset, child_length);

        offset = write_array_data(
            &child_data,
            buffers,
            arrow_data,
            nodes,
            offset,
            child_data.len(),
            child_data.null_count(),
            compression_codec,
            write_options,
        )?;
        return Ok(offset);
    } else {
        for buffer in array_data.buffers() {
            offset = write_buffer(
                buffer,
                buffers,
                arrow_data,
                offset,
                compression_codec,
                write_options.alignment,
            )?;
        }
    }

    match array_data.data_type() {
        DataType::Dictionary(_, _) => {
            // Dictionary values are written separately as dictionary batches;
            // only the keys buffer (already written above) belongs to this node.
        }
        DataType::RunEndEncoded(_, _) => {
            // Unslice the run encoded array so child lengths match the parent.
            let arr = unslice_run_array(array_data.clone())?;
            // Recursively write out nested structures.
            for data_ref in arr.child_data() {
                offset = write_array_data(
                    data_ref,
                    buffers,
                    arrow_data,
                    nodes,
                    offset,
                    data_ref.len(),
                    data_ref.null_count(),
                    compression_codec,
                    write_options,
                )?;
            }
        }
        _ => {
            // Recursively write out nested structures.
            for data_ref in array_data.child_data() {
                offset = write_array_data(
                    data_ref,
                    buffers,
                    arrow_data,
                    nodes,
                    offset,
                    data_ref.len(),
                    data_ref.null_count(),
                    compression_codec,
                    write_options,
                )?;
            }
        }
    }
    Ok(offset)
}

/// Write a buffer into `arrow_data` and add its [`crate::Buffer`] descriptor
/// to `buffers`, returning the updated body offset.
///
/// If a compression codec is given, the buffer is compressed before being
/// written, and padding is added as required by the protocol.
fn write_buffer(
    buffer: &[u8],                    // input
    buffers: &mut Vec<crate::Buffer>, // output buffer descriptors
    arrow_data: &mut Vec<u8>,         // output stream
    offset: i64,                      // current output stream offset
    compression_codec: Option<CompressionCodec>,
    alignment: u8,
) -> Result<i64, ArrowError> {
    let len: i64 = match compression_codec {
        Some(compressor) => compressor.compress_to_vec(buffer, arrow_data)?,
        None => {
            arrow_data.extend_from_slice(buffer);
            buffer.len()
        }
    }
    .try_into()
    .map_err(|e| {
        ArrowError::InvalidArgumentError(format!("Could not convert compressed size to i64: {e}"))
    })?;

    // Make a new index entry for the buffer.
    buffers.push(crate::Buffer::new(offset, len));
    // Pad so the next offset stays aligned.
    let pad_len = pad_to_alignment(alignment, len as usize);
    arrow_data.extend_from_slice(&PADDING[..pad_len]);

    Ok(offset + len + (pad_len as i64))
}

const PADDING: [u8; 64] = [0; 64];

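/// Returns the number of padding bytes needed to round `len` up to a multiple
/// of `alignment` (which must be a power of two).
///
/// For example, `pad_to_alignment(8, 5)` returns `3`.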
#[inline]
fn pad_to_alignment(alignment: u8, len: usize) -> usize {
    let a = usize::from(alignment - 1);
    ((len + a) & !a) - len
}

#[cfg(test)]
mod tests {
    use std::hash::Hasher;
    use std::io::Cursor;
    use std::io::Seek;

    use arrow_array::builder::FixedSizeListBuilder;
    use arrow_array::builder::Float32Builder;
    use arrow_array::builder::Int64Builder;
    use arrow_array::builder::MapBuilder;
    use arrow_array::builder::UnionBuilder;
    use arrow_array::builder::{GenericListBuilder, ListBuilder, StringBuilder};
    use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
    use arrow_array::types::*;
    use arrow_buffer::ScalarBuffer;

    use crate::convert::fb_to_schema;
    use crate::reader::*;
    use crate::root_as_footer;
    use crate::MetadataVersion;

    use super::*;

    fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
        let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
        writer.write(rb).unwrap();
        writer.finish().unwrap();
        writer.into_inner().unwrap()
    }

    fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
        let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
        reader.next().unwrap().unwrap()
    }

    fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
        // Use an 8-byte alignment so the `truncate_*` tests below can be
        // written compactly, without huge arrays spilling over the 64-byte
        // default alignment boundary.
        const IPC_ALIGNMENT: usize = 8;

        let mut stream_writer = StreamWriter::try_new_with_options(
            vec![],
            record.schema_ref(),
            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
        )
        .unwrap();
        stream_writer.write(record).unwrap();
        stream_writer.finish().unwrap();
        stream_writer.into_inner().unwrap()
    }

    fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
        let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
        stream_reader.next().unwrap().unwrap()
    }

    #[test]
    #[cfg(feature = "lz4")]
    fn test_write_empty_record_batch_lz4_compression() {
        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
        let values: Vec<Option<i32>> = vec![];
        let array = Int32Array::from(values);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();

        let mut file = tempfile::tempfile().unwrap();

        {
            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
                .unwrap()
                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
                .unwrap();

            let mut writer =
                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            let reader = FileReader::try_new(file, None).unwrap();
            for read_batch in reader {
                read_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(record_batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }

    #[test]
    #[cfg(feature = "lz4")]
    fn test_write_file_with_lz4_compression() {
        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
        let array = Int32Array::from(values);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();

        let mut file = tempfile::tempfile().unwrap();
        {
            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
                .unwrap()
                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
                .unwrap();

            let mut writer =
                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            let reader = FileReader::try_new(file, None).unwrap();
            for read_batch in reader {
                read_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(record_batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }

    #[test]
    #[cfg(feature = "zstd")]
    fn test_write_file_with_zstd_compression() {
        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
        let array = Int32Array::from(values);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
        let mut file = tempfile::tempfile().unwrap();
        {
            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
                .unwrap()
                .try_with_compression(Some(crate::CompressionType::ZSTD))
                .unwrap();

            let mut writer =
                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            let reader = FileReader::try_new(file, None).unwrap();
            for read_batch in reader {
                read_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(record_batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }

    #[test]
    fn test_write_file() {
        let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
        let values: Vec<Option<u32>> = vec![
            Some(999),
            None,
            Some(235),
            Some(123),
            None,
            None,
            None,
            None,
            None,
        ];
        let array1 = UInt32Array::from(values);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
                .unwrap();
        let mut file = tempfile::tempfile().unwrap();
        {
            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();

            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();

        {
            let mut reader = FileReader::try_new(file, None).unwrap();
            while let Some(Ok(read_batch)) = reader.next() {
                read_batch
                    .columns()
                    .iter()
                    .zip(batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }

    fn write_null_file(options: IpcWriteOptions) {
        let schema = Schema::new(vec![
            Field::new("nulls", DataType::Null, true),
            Field::new("int32s", DataType::Int32, false),
            Field::new("nulls2", DataType::Null, true),
            Field::new("f64s", DataType::Float64, false),
        ]);
        let array1 = NullArray::new(32);
        let array2 = Int32Array::from(vec![1; 32]);
        let array3 = NullArray::new(32);
        let array4 = Float64Array::from(vec![f64::NAN; 32]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(array1) as ArrayRef,
                Arc::new(array2) as ArrayRef,
                Arc::new(array3) as ArrayRef,
                Arc::new(array4) as ArrayRef,
            ],
        )
        .unwrap();
        let mut file = tempfile::tempfile().unwrap();
        {
            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();

            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }

        file.rewind().unwrap();

        {
            let reader = FileReader::try_new(file, None).unwrap();
            reader.for_each(|maybe_batch| {
                maybe_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            });
        }
    }

    #[test]
    fn test_write_null_file_v4() {
        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
        write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap());
        write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
    }

    #[test]
    fn test_write_null_file_v5() {
        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap());
    }

    #[test]
    fn track_union_nested_dict() {
        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

        let array = Arc::new(inner) as ArrayRef;

        #[allow(deprecated)]
        let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 0, false);
        let union_fields = [(0, Arc::new(dctfield))].into_iter().collect();

        let types = [0, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
        let offsets = [0, 1, 2].into_iter().collect::<ScalarBuffer<i32>>();

        let union = UnionArray::try_new(union_fields, types, Some(offsets), vec![array]).unwrap();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "union",
            union.data_type().clone(),
            false,
        )]));

        let gen = IpcDataGenerator {};
        let mut dict_tracker = DictionaryTracker::new(false);
        gen.schema_to_bytes_with_dictionary_tracker(
            &schema,
            &mut dict_tracker,
            &IpcWriteOptions::default(),
        );

        let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();

        gen.encoded_batch(&batch, &mut dict_tracker, &Default::default())
            .unwrap();

        // The tracker assigns dictionary ids itself, so the nested dictionary
        // is expected under id 0.
        assert!(dict_tracker.written.contains_key(&0));
    }

    #[test]
    fn track_struct_nested_dict() {
        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

        let array = Arc::new(inner) as ArrayRef;

        #[allow(deprecated)]
        let dctfield = Arc::new(Field::new_dict(
            "dict",
            array.data_type().clone(),
            false,
            2,
            false,
        ));

        let s = StructArray::from(vec![(dctfield, array)]);
        let struct_array = Arc::new(s) as ArrayRef;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "struct",
            struct_array.data_type().clone(),
            false,
        )]));

        let gen = IpcDataGenerator {};
        let mut dict_tracker = DictionaryTracker::new(false);
        gen.schema_to_bytes_with_dictionary_tracker(
            &schema,
            &mut dict_tracker,
            &IpcWriteOptions::default(),
        );

        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();

        gen.encoded_batch(&batch, &mut dict_tracker, &Default::default())
            .unwrap();

        assert!(dict_tracker.written.contains_key(&0));
    }
2311
2312 fn write_union_file(options: IpcWriteOptions) {
2313 let schema = Schema::new(vec![Field::new_union(
2314 "union",
2315 vec![0, 1],
2316 vec![
2317 Field::new("a", DataType::Int32, false),
2318 Field::new("c", DataType::Float64, false),
2319 ],
2320 UnionMode::Sparse,
2321 )]);
2322 let mut builder = UnionBuilder::with_capacity_sparse(5);
2323 builder.append::<Int32Type>("a", 1).unwrap();
2324 builder.append_null::<Int32Type>("a").unwrap();
2325 builder.append::<Float64Type>("c", 3.0).unwrap();
2326 builder.append_null::<Float64Type>("c").unwrap();
2327 builder.append::<Int32Type>("a", 4).unwrap();
2328 let union = builder.build().unwrap();
2329
2330 let batch =
2331 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
2332 .unwrap();
2333
2334 let mut file = tempfile::tempfile().unwrap();
2335 {
2336 let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2337
2338 writer.write(&batch).unwrap();
2339 writer.finish().unwrap();
2340 }
2341 file.rewind().unwrap();
2342
2343 {
2344 let reader = FileReader::try_new(file, None).unwrap();
2345 reader.for_each(|maybe_batch| {
2346 maybe_batch
2347 .unwrap()
2348 .columns()
2349 .iter()
2350 .zip(batch.columns())
2351 .for_each(|(a, b)| {
2352 assert_eq!(a.data_type(), b.data_type());
2353 assert_eq!(a.len(), b.len());
2354 assert_eq!(a.null_count(), b.null_count());
2355 });
2356 });
2357 }
2358 }
2359
2360 #[test]
2361 fn test_write_union_file_v4_v5() {
2362 write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2363 write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2364 }
2365
2366 #[test]
2367 fn test_write_view_types() {
2368 const LONG_TEST_STRING: &str =
2369 "This is a long string to make sure binary view array handles it";
2370 let schema = Schema::new(vec![
2371 Field::new("field1", DataType::BinaryView, true),
2372 Field::new("field2", DataType::Utf8View, true),
2373 ]);
2374 let values: Vec<Option<&[u8]>> = vec![
2375 Some(b"foo"),
2376 Some(b"bar"),
2377 Some(LONG_TEST_STRING.as_bytes()),
2378 ];
2379 let binary_array = BinaryViewArray::from_iter(values);
2380 let utf8_array =
2381 StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
2382 let record_batch = RecordBatch::try_new(
2383 Arc::new(schema.clone()),
2384 vec![Arc::new(binary_array), Arc::new(utf8_array)],
2385 )
2386 .unwrap();
2387
2388 let mut file = tempfile::tempfile().unwrap();
2389 {
2390 let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2391 writer.write(&record_batch).unwrap();
2392 writer.finish().unwrap();
2393 }
2394 file.rewind().unwrap();
2395 {
2396 let mut reader = FileReader::try_new(&file, None).unwrap();
2397 let read_batch = reader.next().unwrap().unwrap();
2398 read_batch
2399 .columns()
2400 .iter()
2401 .zip(record_batch.columns())
2402 .for_each(|(a, b)| {
2403 assert_eq!(a, b);
2404 });
2405 }
2406 file.rewind().unwrap();
2407 {
2408 let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
2409 let read_batch = reader.next().unwrap().unwrap();
2410 assert_eq!(read_batch.num_columns(), 1);
2411 let read_array = read_batch.column(0);
2412 let write_array = record_batch.column(0);
2413 assert_eq!(read_array, write_array);
2414 }
2415 }
2416
    #[test]
    fn truncate_ipc_record_batch() {
        fn create_batch(rows: usize) -> RecordBatch {
            let schema = Schema::new(vec![
                Field::new("a", DataType::Int32, false),
                Field::new("b", DataType::Utf8, false),
            ]);

            let a = Int32Array::from_iter_values(0..rows as i32);
            let b = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
        }

        let big_record_batch = create_batch(65536);

        let length = 5;
        let small_record_batch = create_batch(length);

        let offset = 2;
        let record_batch_slice = big_record_batch.slice(offset, length);
        assert!(
            serialize_stream(&big_record_batch).len() > serialize_stream(&small_record_batch).len()
        );
        assert_eq!(
            serialize_stream(&small_record_batch).len(),
            serialize_stream(&record_batch_slice).len()
        );

        assert_eq!(
            deserialize_stream(serialize_stream(&record_batch_slice)),
            record_batch_slice
        );
    }

    #[test]
    fn truncate_ipc_record_batch_with_nulls() {
        fn create_batch() -> RecordBatch {
            let schema = Schema::new(vec![
                Field::new("a", DataType::Int32, true),
                Field::new("b", DataType::Utf8, true),
            ]);

            let a = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
            let b = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(1, 2);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );

        assert!(deserialized_batch.column(0).is_null(0));
        assert!(deserialized_batch.column(0).is_valid(1));
        assert!(deserialized_batch.column(1).is_valid(0));
        assert!(deserialized_batch.column(1).is_valid(1));

        assert_eq!(record_batch_slice, deserialized_batch);
    }

    #[test]
    fn truncate_ipc_dictionary_array() {
        fn create_batch() -> RecordBatch {
            let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
                .into_iter()
                .collect();
            let keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();

            let array = DictionaryArray::new(keys, Arc::new(values));

            let schema = Schema::new(vec![Field::new("dict", array.data_type().clone(), true)]);

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(1, 2);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );

        assert!(deserialized_batch.column(0).is_valid(0));
        assert!(deserialized_batch.column(0).is_null(1));

        assert_eq!(record_batch_slice, deserialized_batch);
    }

    #[test]
    fn truncate_ipc_struct_array() {
        fn create_batch() -> RecordBatch {
            let strings: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
                .into_iter()
                .collect();
            let ints: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();

            let struct_array = StructArray::from(vec![
                (
                    Arc::new(Field::new("s", DataType::Utf8, true)),
                    Arc::new(strings) as ArrayRef,
                ),
                (
                    Arc::new(Field::new("c", DataType::Int32, true)),
                    Arc::new(ints) as ArrayRef,
                ),
            ]);

            let schema = Schema::new(vec![Field::new(
                "struct_array",
                struct_array.data_type().clone(),
                true,
            )]);

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(1, 2);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );

        let structs = deserialized_batch
            .column(0)
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap();

        assert!(structs.column(0).is_null(0));
        assert!(structs.column(0).is_valid(1));
        assert!(structs.column(1).is_valid(0));
        assert!(structs.column(1).is_null(1));
        assert_eq!(record_batch_slice, deserialized_batch);
    }

    #[test]
    fn truncate_ipc_string_array_with_all_empty_string() {
        fn create_batch() -> RecordBatch {
            let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
            let a = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(0, 1);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );
        assert_eq!(record_batch_slice, deserialized_batch);
    }

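    // Writing a batch backed by a sliced array should encode only the values
    // visible through the slice, not the whole backing array.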
    #[test]
    fn test_stream_writer_writes_array_slice() {
        let array = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
        assert_eq!(
            vec![Some(1), Some(2), Some(3)],
            array.iter().collect::<Vec<_>>()
        );

        let sliced = array.slice(1, 2);
        assert_eq!(vec![Some(2), Some(3)], sliced.iter().collect::<Vec<_>>());

        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])),
            vec![Arc::new(sliced)],
        )
        .expect("new batch");

        let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
        writer.write(&batch).expect("write");
        let outbuf = writer.into_inner().expect("inner");

        let mut reader = StreamReader::try_new(&outbuf[..], None).expect("new reader");
        let read_batch = reader.next().unwrap().expect("read batch");

        let read_array: &UInt32Array = read_batch.column(0).as_primitive();
        assert_eq!(
            vec![Some(2), Some(3)],
            read_array.iter().collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_large_slice_uint32() {
        ensure_roundtrip(Arc::new(UInt32Array::from_iter((0..8000).map(|i| {
            if i % 2 == 0 {
                Some(i)
            } else {
                None
            }
        }))));
    }

    #[test]
    fn test_large_slice_string() {
        let strings: Vec<_> = (0..8000)
            .map(|i| {
                if i % 2 == 0 {
                    Some(format!("value{i}"))
                } else {
                    None
                }
            })
            .collect();

        ensure_roundtrip(Arc::new(StringArray::from(strings)));
    }

    #[test]
    fn test_large_slice_string_list() {
        let mut ls = ListBuilder::new(StringBuilder::new());

        let mut s = String::new();
        for row_number in 0..8000 {
            if row_number % 2 == 0 {
                for list_element in 0..1000 {
                    s.clear();
                    use std::fmt::Write;
                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
                    ls.values().append_value(&s);
                }
                ls.append(true)
            } else {
                ls.append(false);
            }
        }

        ensure_roundtrip(Arc::new(ls.finish()));
    }

    #[test]
    fn test_large_slice_string_list_of_lists() {
        let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));

        // begin with 4000 rows of empty lists so later offsets are non-zero
        for _ in 0..4000 {
            ls.values().append(true);
            ls.append(true)
        }

        let mut s = String::new();
        for row_number in 0..4000 {
            if row_number % 2 == 0 {
                for list_element in 0..1000 {
                    s.clear();
                    use std::fmt::Write;
                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
                    ls.values().values().append_value(&s);
                }
                ls.values().append(true);
                ls.append(true)
            } else {
                ls.append(false);
            }
        }

        ensure_roundtrip(Arc::new(ls.finish()));
    }

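    // Writes a slice of `array` (dropping the first row) through both the
    // stream and the file writer, and asserts the batch reads back unchanged.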
    fn ensure_roundtrip(array: ArrayRef) {
        let num_rows = array.len();
        let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
        // drop the first row so the written batch has a non-zero offset
        let sliced_batch = orig_batch.slice(1, num_rows - 1);

        let schema = orig_batch.schema();
        let stream_data = {
            let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
            writer.write(&sliced_batch).unwrap();
            writer.into_inner().unwrap()
        };
        let read_batch = {
            let projection = None;
            let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
            reader
                .next()
                .expect("expect no errors reading batch")
                .expect("expect batch")
        };
        assert_eq!(sliced_batch, read_batch);

        let file_data = {
            let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
            writer.write(&sliced_batch).unwrap();
            writer.into_inner().unwrap().into_inner().unwrap()
        };
        let read_batch = {
            let projection = None;
            let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
            reader
                .next()
                .expect("expect no errors reading batch")
                .expect("expect batch")
        };
        assert_eq!(sliced_batch, read_batch);
    }

    #[test]
    fn encode_bools_slice() {
        // a single bit sliced out of the middle of a byte
        assert_bool_roundtrip([true, false], 1, 1);

        // slice crossing byte boundaries, with unaligned offset and length
        assert_bool_roundtrip(
            [
                true, false, true, true, false, false, true, true, true, false, false, false, true,
                true, true, true, false, false, false, false, true, true, true, true, true, false,
                false, false, false, false,
            ],
            13,
            17,
        );

        // byte-aligned offset with a length shorter than one byte
        assert_bool_roundtrip(
            [
                true, false, true, true, false, false, true, true, true, false, false, false,
            ],
            8,
            2,
        );

        // byte-aligned offset and length
        assert_bool_roundtrip(
            [
                true, false, true, true, false, false, true, true, true, false, false, false, true,
                true, true, true, true, false, false, false, false, false,
            ],
            8,
            8,
        );
    }

    fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
        let val_bool_field = Field::new("val", DataType::Boolean, false);

        let schema = Arc::new(Schema::new(vec![val_bool_field]));

        let bools = BooleanArray::from(bools.to_vec());

        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
        let batch = batch.slice(offset, length);

        let data = serialize_stream(&batch);
        let batch2 = deserialize_stream(data);
        assert_eq!(batch, batch2);
    }

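    // `into_zero_offset_run_array` re-encodes a sliced RunArray so it starts
    // at offset zero; check every prefix and suffix slice length.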
    #[test]
    fn test_run_array_unslice() {
        let total_len = 80;
        let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
        let repeats: Vec<usize> = vec![3, 4, 1, 2];
        let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
        for ix in 0_usize..32 {
            let repeat: usize = repeats[ix % repeats.len()];
            let val: Option<i32> = vals[ix % vals.len()];
            input_array.resize(input_array.len() + repeat, val);
        }

        // encode the input_array as a run array
        let mut builder =
            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
        builder.extend(input_array.iter().copied());
        let run_array = builder.finish();

        for slice_len in 1..=total_len {
            // prefix slice: offset = 0, length = slice_len
            let sliced_run_array: RunArray<Int16Type> =
                run_array.slice(0, slice_len).into_data().into();

            // re-encode to a zero-offset run array and compare values
            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
            let typed = unsliced_run_array
                .downcast::<PrimitiveArray<Int32Type>>()
                .unwrap();
            let expected: Vec<Option<i32>> = input_array.iter().take(slice_len).copied().collect();
            let actual: Vec<Option<i32>> = typed.into_iter().collect();
            assert_eq!(expected, actual);

            // suffix slice: offset = total_len - slice_len, length = slice_len
            let sliced_run_array: RunArray<Int16Type> = run_array
                .slice(total_len - slice_len, slice_len)
                .into_data()
                .into();

            // re-encode to a zero-offset run array and compare values
            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
            let typed = unsliced_run_array
                .downcast::<PrimitiveArray<Int32Type>>()
                .unwrap();
            let expected: Vec<Option<i32>> = input_array
                .iter()
                .skip(total_len - slice_len)
                .copied()
                .collect();
            let actual: Vec<Option<i32>> = typed.into_iter().collect();
            assert_eq!(expected, actual);
        }
    }

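    // Helpers producing large list/map arrays for the truncation and offset
    // re-encoding tests below.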
    fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
        let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());

        for i in 0..100_000 {
            for value in [i, i, i] {
                ls.values().append_value(value);
            }
            ls.append(true)
        }

        ls.finish()
    }

    fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
        let mut ls =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));

        for _i in 0..10_000 {
            for j in 0..10 {
                for value in [j, j, j, j] {
                    ls.values().values().append_value(value);
                }
                ls.values().append(true)
            }
            ls.append(true);
        }

        ls.finish()
    }

    fn generate_nested_list_data_starting_at_zero<O: OffsetSizeTrait>() -> GenericListArray<O> {
        let mut ls =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));

        for _i in 0..999 {
            ls.values().append(true);
            ls.append(true);
        }

        for j in 0..10 {
            for value in [j, j, j, j] {
                ls.values().values().append_value(value);
            }
            ls.values().append(true)
        }
        ls.append(true);

        for i in 0..9_000 {
            for j in 0..10 {
                for value in [i + j, i + j, i + j, i + j] {
                    ls.values().values().append_value(value);
                }
                ls.values().append(true)
            }
            ls.append(true);
        }

        ls.finish()
    }

    fn generate_map_array_data() -> MapArray {
        let keys_builder = UInt32Builder::new();
        let values_builder = UInt32Builder::new();

        let mut builder = MapBuilder::new(None, keys_builder, values_builder);

        for i in 0..100_000 {
            for _j in 0..3 {
                builder.keys().append_value(i);
                builder.values().append_value(i * 2);
            }
            builder.append(true).unwrap();
        }

        builder.finish()
    }

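    // `reencode_offsets` rebases a sliced offset buffer so it starts at zero
    // and returns the start and length of the referenced child values.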
    #[test]
    fn reencode_offsets_when_first_offset_is_not_zero() {
        let original_list = generate_list_data::<i32>();
        let original_data = original_list.into_data();
        let slice_data = original_data.slice(75, 7);
        let (new_offsets, original_start, length) =
            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
        assert_eq!(
            vec![0, 3, 6, 9, 12, 15, 18, 21],
            new_offsets.typed_data::<i32>()
        );
        assert_eq!(225, original_start);
        assert_eq!(21, length);
    }

    #[test]
    fn reencode_offsets_when_first_offset_is_zero() {
        let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
        // ls = [[], [35, 42]]
        ls.append(true);
        ls.values().append_value(35);
        ls.values().append_value(42);
        ls.append(true);
        let original_list = ls.finish();
        let original_data = original_list.into_data();

        let slice_data = original_data.slice(1, 1);
        let (new_offsets, original_start, length) =
            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
        assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
        assert_eq!(0, original_start);
        assert_eq!(2, length);
    }

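    // Serializes both the full batch and a one-row slice of it, asserting the
    // slice is more than `expected_size_factor` times smaller on the wire and
    // that both roundtrip losslessly.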
    fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
        // take a single-row selection out of the middle of the data
        let in_sliced = in_batch.slice(999, 1);

        let bytes_batch = serialize_file(&in_batch);
        let bytes_sliced = serialize_file(&in_sliced);

        // serializing the sliced batch should have pruned the unused data
        assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));

        // both batches should roundtrip intact
        let out_batch = deserialize_file(bytes_batch);
        assert_eq!(in_batch, out_batch);

        let out_sliced = deserialize_file(bytes_sliced);
        assert_eq!(in_sliced, out_sliced);
    }

    #[test]
    fn encode_lists() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_list_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    #[test]
    fn encode_empty_list() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_list_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values])
            .unwrap()
            .slice(999, 0);
        let out_batch = deserialize_file(serialize_file(&in_batch));
        assert_eq!(in_batch, out_batch);
    }

    #[test]
    fn encode_large_lists() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_list_data::<i64>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    #[test]
    fn encode_nested_lists() {
        let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
        let inner_list_field = Arc::new(Field::new_list_field(DataType::List(inner_int), true));
        let list_field = Field::new("val", DataType::List(inner_list_field), true);
        let schema = Arc::new(Schema::new(vec![list_field]));

        let values = Arc::new(generate_nested_list_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    #[test]
    fn encode_nested_lists_starting_at_zero() {
        let inner_int = Arc::new(Field::new("item", DataType::UInt32, true));
        let inner_list_field = Arc::new(Field::new("item", DataType::List(inner_int), true));
        let list_field = Field::new("val", DataType::List(inner_list_field), true);
        let schema = Arc::new(Schema::new(vec![list_field]));

        let values = Arc::new(generate_nested_list_data_starting_at_zero::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1);
    }

    #[test]
    fn encode_map_array() {
        let keys = Arc::new(Field::new("keys", DataType::UInt32, false));
        let values = Arc::new(Field::new("values", DataType::UInt32, true));
        let map_field = Field::new_map("map", "entries", keys, values, false, true);
        let schema = Arc::new(Schema::new(vec![map_field]));

        let values = Arc::new(generate_map_array_data());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

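    // Decimal128 values are 16 bytes wide, so buffers written with 8-byte IPC
    // alignment may be misaligned for zero-copy reads, while 16-byte alignment
    // is always sufficient. The two tests below verify both sides.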
    #[test]
    fn test_decimal128_alignment16_is_sufficient() {
        const IPC_ALIGNMENT: usize = 16;

        // test a variety of column counts so at least some batches would be
        // misaligned under a weaker alignment guarantee
        for num_cols in [1, 2, 3, 17, 50, 73, 99] {
            let num_rows = (num_cols * 7 + 11) % 100; // deterministic but varied row count

            let mut fields = Vec::new();
            let mut arrays = Vec::new();
            for i in 0..num_cols {
                let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
                let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
                fields.push(field);
                arrays.push(Arc::new(array) as Arc<dyn Array>);
            }
            let schema = Schema::new(fields);
            let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();

            let mut writer = FileWriter::try_new_with_options(
                Vec::new(),
                batch.schema_ref(),
                IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
            )
            .unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();

            let out: Vec<u8> = writer.into_inner().unwrap();

            let buffer = Buffer::from_vec(out);
            let trailer_start = buffer.len() - 10;
            let footer_len =
                read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
            let footer =
                root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();

            let schema = fb_to_schema(footer.schema().unwrap());

            // require aligned buffers: 16-byte alignment must be sufficient
            // for zero-copy reads of Decimal128 data
            let decoder =
                FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);

            let batches = footer.recordBatches().unwrap();

            let block = batches.get(0);
            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
            let data = buffer.slice_with_length(block.offset() as _, block_len);

            let batch2 = decoder.read_record_batch(block, &data).unwrap().unwrap();

            assert_eq!(batch, batch2);
        }
    }

    #[test]
    fn test_decimal128_alignment8_is_unaligned() {
        const IPC_ALIGNMENT: usize = 8;

        let num_cols = 2;
        let num_rows = 1;

        let mut fields = Vec::new();
        let mut arrays = Vec::new();
        for i in 0..num_cols {
            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
            let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
            fields.push(field);
            arrays.push(Arc::new(array) as Arc<dyn Array>);
        }
        let schema = Schema::new(fields);
        let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();

        let mut writer = FileWriter::try_new_with_options(
            Vec::new(),
            batch.schema_ref(),
            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
        )
        .unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();

        let out: Vec<u8> = writer.into_inner().unwrap();

        let buffer = Buffer::from_vec(out);
        let trailer_start = buffer.len() - 10;
        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
        let schema = fb_to_schema(footer.schema().unwrap());

        // with 8-byte alignment the Decimal128 buffers are not guaranteed to
        // be 16-byte aligned, so requiring alignment must fail
        let decoder =
            FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);

        let batches = footer.recordBatches().unwrap();

        let block = batches.get(0);
        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
        let data = buffer.slice_with_length(block.offset() as _, block_len);

        let result = decoder.read_record_batch(block, &data);

        let error = result.unwrap_err();
        assert_eq!(
            error.to_string(),
            "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
             offset from expected alignment of 16 by 8"
        );
    }

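    // Write a schema small enough to sit in the BufWriter's internal buffer,
    // then force it out with an explicit flush.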
    #[test]
    fn test_flush() {
        let num_cols = 2;
        let mut fields = Vec::new();
        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
        for i in 0..num_cols {
            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
            fields.push(field);
        }
        let schema = Schema::new(fields);
        let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
        let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
        let mut stream_writer =
            StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
                .unwrap();
        let mut file_writer =
            FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();

        let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
        let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
        stream_writer.flush().unwrap();
        file_writer.flush().unwrap();
        let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
        let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
        let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
        // finishing the stream appends the 8-byte end-of-stream marker
        // (continuation bytes plus a zero length); everything before that
        // should already have been flushed by the flush() call above
        let expected_stream_flushed_bytes = stream_out.len() - 8;
        // the file output additionally starts with the ARROW1 magic string
        // plus padding, another 8 bytes
        let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;

        assert!(
            stream_bytes_written_on_new < stream_bytes_written_on_flush,
            "this test makes no sense if flush is not actually required"
        );
        assert!(
            file_bytes_written_on_new < file_bytes_written_on_flush,
            "this test makes no sense if flush is not actually required"
        );
        assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
        assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
    }

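    // Roundtrip List<FixedSizeList<Float32, 3>> data, including slices that
    // start at a non-zero offset.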
    #[test]
    fn test_roundtrip_list_of_fixed_list() -> Result<(), ArrowError> {
        let l1_type =
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, false)), 3);
        let l2_type = DataType::List(Arc::new(Field::new("item", l1_type.clone(), false)));

        let l0_builder = Float32Builder::new();
        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3).with_field(Arc::new(Field::new(
            "item",
            DataType::Float32,
            false,
        )));
        let mut l2_builder =
            ListBuilder::new(l1_builder).with_field(Arc::new(Field::new("item", l1_type, false)));

        for point in [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]] {
            l2_builder.values().values().append_value(point[0]);
            l2_builder.values().values().append_value(point[1]);
            l2_builder.values().values().append_value(point[2]);

            l2_builder.values().append(true);
        }
        l2_builder.append(true);

        let point = [10., 11., 12.];
        l2_builder.values().values().append_value(point[0]);
        l2_builder.values().values().append_value(point[1]);
        l2_builder.values().values().append_value(point[2]);

        l2_builder.values().append(true);
        l2_builder.append(true);

        let array = Arc::new(l2_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new("points", l2_type, false)],
            HashMap::default(),
        ));

        // verify slices at a variety of offsets and lengths roundtrip
        test_slices(&array, &schema, 0, 1)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 1)?;

        Ok(())
    }

    #[test]
    fn test_roundtrip_list_of_fixed_list_w_nulls() -> Result<(), ArrowError> {
        let l0_builder = Float32Builder::new();
        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3);
        let mut l2_builder = ListBuilder::new(l1_builder);

        for point in [
            [Some(1.0), Some(2.0), None],
            [Some(4.0), Some(5.0), Some(6.0)],
            [None, Some(8.0), Some(9.0)],
        ] {
            for p in point {
                match p {
                    Some(p) => l2_builder.values().values().append_value(p),
                    None => l2_builder.values().values().append_null(),
                }
            }

            l2_builder.values().append(true);
        }
        l2_builder.append(true);

        let point = [Some(10.), None, None];
        for p in point {
            match p {
                Some(p) => l2_builder.values().values().append_value(p),
                None => l2_builder.values().values().append_null(),
            }
        }

        l2_builder.values().append(true);
        l2_builder.append(true);

        let array = Arc::new(l2_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new(
                "points",
                DataType::List(Arc::new(Field::new(
                    "item",
                    DataType::FixedSizeList(
                        Arc::new(Field::new("item", DataType::Float32, true)),
                        3,
                    ),
                    true,
                ))),
                true,
            )],
            HashMap::default(),
        ));

        // verify slices at a variety of offsets and lengths roundtrip
        test_slices(&array, &schema, 0, 1)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 1)?;

        Ok(())
    }

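    // Writes `parent_array.slice(offset, length)` as a single-column batch
    // through the stream format and asserts it reads back unchanged.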
    fn test_slices(
        parent_array: &ArrayRef,
        schema: &SchemaRef,
        offset: usize,
        length: usize,
    ) -> Result<(), ArrowError> {
        let subarray = parent_array.slice(offset, length);
        let original_batch = RecordBatch::try_new(schema.clone(), vec![subarray])?;

        let mut bytes = Vec::new();
        let mut writer = StreamWriter::try_new(&mut bytes, schema)?;
        writer.write(&original_batch)?;
        writer.finish()?;

        let mut cursor = std::io::Cursor::new(bytes);
        let mut reader = StreamReader::try_new(&mut cursor, None)?;
        let returned_batch = reader.next().unwrap()?;

        assert_eq!(original_batch, returned_batch);

        Ok(())
    }

    #[test]
    fn test_roundtrip_fixed_list() -> Result<(), ArrowError> {
        let int_builder = Int64Builder::new();
        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3)
            .with_field(Arc::new(Field::new("item", DataType::Int64, false)));

        for point in [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]] {
            fixed_list_builder.values().append_value(point[0]);
            fixed_list_builder.values().append_value(point[1]);
            fixed_list_builder.values().append_value(point[2]);

            fixed_list_builder.append(true);
        }

        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new(
                "points",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, false)), 3),
                false,
            )],
            HashMap::default(),
        ));

        // verify slices at a variety of offsets and lengths roundtrip
        test_slices(&array, &schema, 0, 4)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 3)?;
        test_slices(&array, &schema, 2, 1)?;

        Ok(())
    }

    #[test]
    fn test_roundtrip_fixed_list_w_nulls() -> Result<(), ArrowError> {
        let int_builder = Int64Builder::new();
        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3);

        for point in [
            [Some(1), Some(2), None],
            [Some(4), Some(5), Some(6)],
            [None, Some(8), Some(9)],
            [Some(10), None, None],
        ] {
            for p in point {
                match p {
                    Some(p) => fixed_list_builder.values().append_value(p),
                    None => fixed_list_builder.values().append_null(),
                }
            }

            fixed_list_builder.append(true);
        }

        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new(
                "points",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 3),
                true,
            )],
            HashMap::default(),
        ));

        // verify slices at a variety of offsets and lengths roundtrip
        test_slices(&array, &schema, 0, 4)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 3)?;
        test_slices(&array, &schema, 2, 1)?;

        Ok(())
    }

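    // Field and schema metadata are stored in HashMaps with unspecified
    // iteration order; the writer must nonetheless produce byte-identical
    // output for identical input.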
    #[test]
    fn test_metadata_encoding_ordering() {
        fn create_hash() -> u64 {
            let metadata: HashMap<String, String> = [
                ("a", "1"),
                ("b", "2"),
                ("c", "3"),
                ("d", "4"),
                ("e", "5"),
            ]
            .into_iter()
            .map(|(k, v)| (k.to_owned(), v.to_owned()))
            .collect();

            let schema = Arc::new(
                Schema::new(vec![
                    Field::new("a", DataType::Int64, true).with_metadata(metadata.clone())
                ])
                .with_metadata(metadata)
                .clone(),
            );
            let batch = RecordBatch::new_empty(schema.clone());

            let mut bytes = Vec::new();
            let mut w = StreamWriter::try_new(&mut bytes, batch.schema_ref()).unwrap();
            w.write(&batch).unwrap();
            w.finish().unwrap();

            let mut h = std::hash::DefaultHasher::new();
            h.write(&bytes);
            h.finish()
        }

        let expected = create_hash();

        // hashing the encoded output repeatedly should always yield the same value
        let all_passed = (0..20).all(|_| create_hash() == expected);
        assert!(all_passed);
    }
}