use std::cmp::min;
use std::collections::HashMap;
use std::io::{BufWriter, Write};
use std::mem::size_of;
use std::sync::Arc;

use flatbuffers::FlatBufferBuilder;

use arrow_array::builder::BufferBuilder;
use arrow_array::cast::*;
use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
use arrow_array::*;
use arrow_buffer::bit_util;
use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
use arrow_data::{ArrayData, ArrayDataBuilder, BufferSpec, layout};
use arrow_schema::*;

use crate::CONTINUATION_MARKER;
use crate::compression::CompressionCodec;
pub use crate::compression::CompressionContext;
use crate::convert::IpcSchemaEncoder;

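/// Options for how to write IPC messages: alignment, metadata version,
/// compression, and dictionary handling.
///
/// A minimal usage sketch (hedged: assumes this crate is consumed as
/// `arrow_ipc`, as is typical; adapt the paths to your build):
///
/// ```ignore
/// use arrow_ipc::writer::IpcWriteOptions;
///
/// // 8-byte alignment, modern (non-legacy) framing, metadata V5,
/// // with LZ4 buffer compression. Compression requires V5 or later.
/// let options = IpcWriteOptions::try_new(8, false, arrow_ipc::MetadataVersion::V5)?
///     .try_with_compression(Some(arrow_ipc::CompressionType::LZ4_FRAME))?;
/// ```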
#[derive(Debug, Clone)]
pub struct IpcWriteOptions {
    /// Write padding after memory buffers up to this multiple of bytes.
    /// Must be 8, 16, 32, or 64 - defaults to 64.
    alignment: u8,
    /// The legacy format is for releases before 0.15.0, and uses metadata V4.
    write_legacy_ipc_format: bool,
    /// The metadata version to write. The Rust implementation supports V4+.
    metadata_version: crate::MetadataVersion,
    /// Compression, if desired. Only supported when `metadata_version` is V5 or above.
    batch_compression_type: Option<crate::CompressionType>,
    /// How dictionaries should be handled across batches (resend or delta).
    dictionary_handling: DictionaryHandling,
}

impl IpcWriteOptions {
    /// Configures compression when writing IPC files.
    ///
    /// Returns an error if compression is requested with a metadata version
    /// older than V5.
    pub fn try_with_compression(
        mut self,
        batch_compression_type: Option<crate::CompressionType>,
    ) -> Result<Self, ArrowError> {
        self.batch_compression_type = batch_compression_type;

        if self.batch_compression_type.is_some()
            && self.metadata_version < crate::MetadataVersion::V5
        {
            return Err(ArrowError::InvalidArgumentError(
                "Compression only supported in metadata v5 and above".to_string(),
            ));
        }
        Ok(self)
    }

    /// Try to create `IpcWriteOptions`, checking for incompatible settings.
    pub fn try_new(
        alignment: usize,
        write_legacy_ipc_format: bool,
        metadata_version: crate::MetadataVersion,
    ) -> Result<Self, ArrowError> {
        let is_alignment_valid =
            alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
        if !is_alignment_valid {
            return Err(ArrowError::InvalidArgumentError(
                "Alignment should be 8, 16, 32, or 64.".to_string(),
            ));
        }
        let alignment: u8 = u8::try_from(alignment).expect("range already checked");
        match metadata_version {
            crate::MetadataVersion::V1
            | crate::MetadataVersion::V2
            | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
                "Writing IPC metadata version 3 and lower not supported".to_string(),
            )),
            #[allow(deprecated)]
            crate::MetadataVersion::V4 => Ok(Self {
                alignment,
                write_legacy_ipc_format,
                metadata_version,
                batch_compression_type: None,
                dictionary_handling: DictionaryHandling::default(),
            }),
            crate::MetadataVersion::V5 => {
                if write_legacy_ipc_format {
                    Err(ArrowError::InvalidArgumentError(
                        "Legacy IPC format only supported on metadata version 4".to_string(),
                    ))
                } else {
                    Ok(Self {
                        alignment,
                        write_legacy_ipc_format,
                        metadata_version,
                        batch_compression_type: None,
                        dictionary_handling: DictionaryHandling::default(),
                    })
                }
            }
            z => Err(ArrowError::InvalidArgumentError(format!(
                "Unsupported crate::MetadataVersion {z:?}"
            ))),
        }
    }

    /// Sets the [`DictionaryHandling`] to use when encoding dictionaries.
    pub fn with_dictionary_handling(mut self, dictionary_handling: DictionaryHandling) -> Self {
        self.dictionary_handling = dictionary_handling;
        self
    }
}

impl Default for IpcWriteOptions {
    fn default() -> Self {
        Self {
            alignment: 64,
            write_legacy_ipc_format: false,
            metadata_version: crate::MetadataVersion::V5,
            batch_compression_type: None,
            dictionary_handling: DictionaryHandling::default(),
        }
    }
}

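/// Handles the low-level details of encoding schemas, record batches, and
/// dictionaries into the Arrow IPC format.
///
/// A rough sketch of manual encoding (hedged: error handling elided, paths
/// assumed from typical crate usage; `schema` and `batch` are yours):
///
/// ```ignore
/// use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
///
/// let generator = IpcDataGenerator::default();
/// let mut tracker = DictionaryTracker::new(false);
/// let options = IpcWriteOptions::default();
/// let schema_msg =
///     generator.schema_to_bytes_with_dictionary_tracker(&schema, &mut tracker, &options);
/// let (dictionaries, batch_msg) =
///     generator.encode(&batch, &mut tracker, &options, &mut Default::default())?;
/// ```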
#[derive(Debug, Default)]
pub struct IpcDataGenerator {}

impl IpcDataGenerator {
    /// Converts a schema to an encoded IPC message, registering any dictionary
    /// fields with the provided `dictionary_tracker`.
    pub fn schema_to_bytes_with_dictionary_tracker(
        &self,
        schema: &Schema,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> EncodedData {
        let mut fbb = FlatBufferBuilder::new();
        let schema = {
            let fb = IpcSchemaEncoder::new()
                .with_dictionary_tracker(dictionary_tracker)
                .schema_to_fb_offset(&mut fbb, schema);
            fb.as_union_value()
        };

        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::Schema);
        message.add_bodyLength(0);
        message.add_header(schema);
        let data = message.finish();
        fbb.finish(data, None);

        let data = fbb.finished_data();
        EncodedData {
            ipc_message: data.to_vec(),
            arrow_data: vec![],
        }
    }

    /// Recursively encodes any dictionaries found in the children of `column`.
    fn _encode_dictionaries<I: Iterator<Item = i64>>(
        &self,
        column: &ArrayRef,
        encoded_dictionaries: &mut Vec<EncodedData>,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        dict_id: &mut I,
        compression_context: &mut CompressionContext,
    ) -> Result<(), ArrowError> {
        match column.data_type() {
            DataType::Struct(fields) => {
                let s = as_struct_array(column);
                for (field, column) in fields.iter().zip(s.columns()) {
                    self.encode_dictionaries(
                        field,
                        column,
                        encoded_dictionaries,
                        dictionary_tracker,
                        write_options,
                        dict_id,
                        compression_context,
                    )?;
                }
            }
            DataType::RunEndEncoded(_, values) => {
                let data = column.to_data();
                if data.child_data().len() != 2 {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "The run encoded array should have exactly two child arrays. Found {}",
                        data.child_data().len()
                    )));
                }
                // The run_ends array is the first child; the values array is the second.
                let values_array = make_array(data.child_data()[1].clone());
                self.encode_dictionaries(
                    values,
                    &values_array,
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;
            }
            DataType::List(field) => {
                let list = as_list_array(column);
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;
            }
            DataType::LargeList(field) => {
                let list = as_large_list_array(column);
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;
            }
            DataType::FixedSizeList(field, _) => {
                let list = column
                    .as_any()
                    .downcast_ref::<FixedSizeListArray>()
                    .expect("Unable to downcast to fixed size list array");
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;
            }
            DataType::Map(field, _) => {
                let map_array = as_map_array(column);

                let (keys, values) = match field.data_type() {
                    DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
                    _ => panic!("Incorrect field data type {:?}", field.data_type()),
                };

                // keys
                self.encode_dictionaries(
                    keys,
                    map_array.keys(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;

                // values
                self.encode_dictionaries(
                    values,
                    map_array.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                    compression_context,
                )?;
            }
            DataType::Union(fields, _) => {
                let union = as_union_array(column);
                for (type_id, field) in fields.iter() {
                    let column = union.child(type_id);
                    self.encode_dictionaries(
                        field,
                        column,
                        encoded_dictionaries,
                        dictionary_tracker,
                        write_options,
                        dict_id,
                        compression_context,
                    )?;
                }
            }
            _ => (),
        }

        Ok(())
    }

    #[allow(clippy::too_many_arguments)]
    fn encode_dictionaries<I: Iterator<Item = i64>>(
        &self,
        field: &Field,
        column: &ArrayRef,
        encoded_dictionaries: &mut Vec<EncodedData>,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        dict_id_seq: &mut I,
        compression_context: &mut CompressionContext,
    ) -> Result<(), ArrowError> {
        match column.data_type() {
            DataType::Dictionary(_key_type, _value_type) => {
                let dict_data = column.to_data();
                let dict_values = &dict_data.child_data()[0];

                let values = make_array(dict_data.child_data()[0].clone());

                self._encode_dictionaries(
                    &values,
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id_seq,
                    compression_context,
                )?;

                // Take the next id from the sequence only after encoding any
                // nested dictionaries, as ids are assigned depth-first.
                let dict_id = dict_id_seq.next().ok_or_else(|| {
                    ArrowError::IpcError(format!("no dict id for field {}", field.name()))
                })?;

                match dictionary_tracker.insert_column(
                    dict_id,
                    column,
                    write_options.dictionary_handling,
                )? {
                    DictionaryUpdate::None => {}
                    DictionaryUpdate::New | DictionaryUpdate::Replaced => {
                        encoded_dictionaries.push(self.dictionary_batch_to_bytes(
                            dict_id,
                            dict_values,
                            write_options,
                            false,
                            compression_context,
                        )?);
                    }
                    DictionaryUpdate::Delta(data) => {
                        encoded_dictionaries.push(self.dictionary_batch_to_bytes(
                            dict_id,
                            &data,
                            write_options,
                            true,
                            compression_context,
                        )?);
                    }
                }
            }
            _ => self._encode_dictionaries(
                column,
                encoded_dictionaries,
                dictionary_tracker,
                write_options,
                dict_id_seq,
                compression_context,
            )?,
        }

        Ok(())
    }

    /// Encodes a batch to a number of [EncodedData] items (dictionary batches
    /// plus the record batch). The [DictionaryTracker] keeps track of
    /// dictionaries that have already been sent, so each is only sent once.
    pub fn encode(
        &self,
        batch: &RecordBatch,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        compression_context: &mut CompressionContext,
    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
        let schema = batch.schema();
        let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());

        let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();

        for (i, field) in schema.fields().iter().enumerate() {
            let column = batch.column(i);
            self.encode_dictionaries(
                field,
                column,
                &mut encoded_dictionaries,
                dictionary_tracker,
                write_options,
                &mut dict_id,
                compression_context,
            )?;
        }

        let encoded_message =
            self.record_batch_to_bytes(batch, write_options, compression_context)?;
        Ok((encoded_dictionaries, encoded_message))
    }

    /// Deprecated alias for [`IpcDataGenerator::encode`] that uses a fresh
    /// [`CompressionContext`] for each call.
    #[deprecated(since = "57.0.0", note = "Use `encode` instead")]
    pub fn encoded_batch(
        &self,
        batch: &RecordBatch,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
        self.encode(
            batch,
            dictionary_tracker,
            write_options,
            &mut Default::default(),
        )
    }

    /// Write a `RecordBatch` into two sets of bytes, one for the header
    /// (crate::Message) and the other for the batch's data.
    fn record_batch_to_bytes(
        &self,
        batch: &RecordBatch,
        write_options: &IpcWriteOptions,
        compression_context: &mut CompressionContext,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];
        let mut offset = 0;

        // get the type of compression
        let batch_compression_type = write_options.batch_compression_type;

        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        let compression_codec: Option<CompressionCodec> =
            batch_compression_type.map(TryInto::try_into).transpose()?;

        let mut variadic_buffer_counts = vec![];

        for array in batch.columns() {
            let array_data = array.to_data();
            offset = write_array_data(
                &array_data,
                &mut buffers,
                &mut arrow_data,
                &mut nodes,
                offset,
                array.len(),
                array.null_count(),
                compression_codec,
                compression_context,
                write_options,
            )?;

            append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
        }
        // pad the tail of the body data
        let len = arrow_data.len();
        let pad_len = pad_to_alignment(write_options.alignment, len);
        arrow_data.extend_from_slice(&PADDING[..pad_len]);

        // write data
        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(batch.num_rows() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }

            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            let b = batch_builder.finish();
            b.as_union_value()
        };
        // create a crate::Message
        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::RecordBatch);
        message.add_bodyLength(arrow_data.len() as i64);
        message.add_header(root);
        let root = message.finish();
        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }

    /// Write dictionary values into two sets of bytes, one for the header
    /// (crate::Message) and the other for the data.
    fn dictionary_batch_to_bytes(
        &self,
        dict_id: i64,
        array_data: &ArrayData,
        write_options: &IpcWriteOptions,
        is_delta: bool,
        compression_context: &mut CompressionContext,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];

        // get the type of compression
        let batch_compression_type = write_options.batch_compression_type;

        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        let compression_codec: Option<CompressionCodec> = batch_compression_type
            .map(|batch_compression_type| batch_compression_type.try_into())
            .transpose()?;

        write_array_data(
            array_data,
            &mut buffers,
            &mut arrow_data,
            &mut nodes,
            0,
            array_data.len(),
            array_data.null_count(),
            compression_codec,
            compression_context,
            write_options,
        )?;

        let mut variadic_buffer_counts = vec![];
        append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);

        // pad the tail of the body data
        let len = arrow_data.len();
        let pad_len = pad_to_alignment(write_options.alignment, len);
        arrow_data.extend_from_slice(&PADDING[..pad_len]);

        // write data
        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(array_data.len() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }
            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            batch_builder.finish()
        };

        let root = {
            let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb);
            batch_builder.add_id(dict_id);
            batch_builder.add_data(root);
            batch_builder.add_isDelta(is_delta);
            batch_builder.finish().as_union_value()
        };

        let root = {
            let mut message_builder = crate::MessageBuilder::new(&mut fbb);
            message_builder.add_version(write_options.metadata_version);
            message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
            message_builder.add_bodyLength(arrow_data.len() as i64);
            message_builder.add_header(root);
            message_builder.finish()
        };

        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }
}

fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
    match array.data_type() {
        DataType::BinaryView | DataType::Utf8View => {
            // View arrays have a views buffer followed by a variable number of
            // data buffers; only the data buffers are "variadic", so exclude
            // the views buffer from the count.
            counts.push(array.buffers().len() as i64 - 1);
        }
        DataType::Dictionary(_, _) => {
            // Do nothing; dictionary values are written out as part of their
            // own dictionary batch.
        }
        _ => {
            for child in array.child_data() {
                append_variadic_buffer_counts(counts, child)
            }
        }
    }
}

pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
    match arr.data_type() {
        DataType::RunEndEncoded(k, _) => match k.data_type() {
            DataType::Int16 => {
                Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
            }
            DataType::Int32 => {
                Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
            }
            DataType::Int64 => {
                Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
            }
            d => unreachable!("Unexpected data type {d}"),
        },
        d => Err(ArrowError::InvalidArgumentError(format!(
            "The given array is not a run array. Data type of given array: {d}"
        ))),
    }
}

/// Returns a `RunArray` with zero offset and a length matching the last value
/// in its run_ends array.
fn into_zero_offset_run_array<R: RunEndIndexType>(
    run_array: RunArray<R>,
) -> Result<RunArray<R>, ArrowError> {
    let run_ends = run_array.run_ends();
    if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
        return Ok(run_array);
    }

    // The physical index of the original run_ends array from which the run array is sliced.
    let start_physical_index = run_ends.get_start_physical_index();

    // The physical index of the original run_ends array until which the run array is sliced.
    let end_physical_index = run_ends.get_end_physical_index();

    let physical_length = end_physical_index - start_physical_index + 1;

    // Build a new run_ends array by subtracting the offset from each run end.
    let offset = R::Native::usize_as(run_ends.offset());
    let mut builder = BufferBuilder::<R::Native>::new(physical_length);
    for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
        builder.append(run_end_value.sub_wrapping(offset));
    }
    builder.append(R::Native::from_usize(run_array.len()).unwrap());
    let new_run_ends = unsafe {
        // Safety:
        // The loop above builds a valid run_ends array, so validation can be skipped.
        ArrayDataBuilder::new(R::DATA_TYPE)
            .len(physical_length)
            .add_buffer(builder.finish())
            .build_unchecked()
    };

    // Build new values by slicing the physical indices.
    let new_values = run_array
        .values()
        .slice(start_physical_index, physical_length)
        .into_data();

    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
        .len(run_array.len())
        .add_child_data(new_run_ends)
        .add_child_data(new_values);
    let array_data = unsafe {
        // Safety:
        // This function builds a valid run array, so validation can be skipped.
        builder.build_unchecked()
    };
    Ok(array_data.into())
}

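/// Controls how dictionaries are handled when their values change between
/// record batches written to the same stream.
///
/// A sketch of opting into delta dictionaries (hedged: illustrative only):
///
/// ```ignore
/// use arrow_ipc::writer::{DictionaryHandling, IpcWriteOptions};
///
/// let options =
///     IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);
/// ```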
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DictionaryHandling {
    /// Re-send the entire dictionary whenever its values change.
    Resend,
    /// When a new dictionary extends the previous one, send only the appended
    /// values as a delta dictionary batch.
    Delta,
}

impl Default for DictionaryHandling {
    fn default() -> Self {
        Self::Resend
    }
}

/// Describes the result of registering a dictionary with a [`DictionaryTracker`].
#[derive(Debug, Clone)]
pub enum DictionaryUpdate {
    /// The dictionary was unchanged; no dictionary batch needs to be written.
    None,
    /// The dictionary was seen for the first time.
    New,
    /// The dictionary was replaced with entirely new data.
    Replaced,
    /// The dictionary was extended; contains only the appended values.
    Delta(ArrayData),
}

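/// Keeps track of the dictionaries written to an IPC stream or file,
/// detecting whether a dictionary must be re-sent, sent as a delta, or not
/// sent at all.
///
/// An illustrative sketch (hedged: the id and array are made up):
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow_array::{types::Int32Type, ArrayRef, DictionaryArray};
/// use arrow_ipc::writer::{DictionaryHandling, DictionaryTracker, DictionaryUpdate};
///
/// let mut tracker = DictionaryTracker::new(false);
/// let dict: DictionaryArray<Int32Type> = vec!["a", "b"].into_iter().collect();
/// let column = Arc::new(dict) as ArrayRef;
/// // The first registration of an id is always reported as `New`.
/// assert!(matches!(
///     tracker.insert_column(0, &column, DictionaryHandling::Delta)?,
///     DictionaryUpdate::New
/// ));
/// ```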
#[derive(Debug)]
pub struct DictionaryTracker {
    written: HashMap<i64, ArrayData>,
    dict_ids: Vec<i64>,
    error_on_replacement: bool,
}

impl DictionaryTracker {
    /// Create a new [`DictionaryTracker`].
    ///
    /// If `error_on_replacement` is true, an error is generated when an
    /// update to an existing dictionary is detected.
    pub fn new(error_on_replacement: bool) -> Self {
        #[allow(deprecated)]
        Self {
            written: HashMap::new(),
            dict_ids: Vec::new(),
            error_on_replacement,
        }
    }

    /// Record and return the next dictionary id.
    pub fn next_dict_id(&mut self) -> i64 {
        let next = self
            .dict_ids
            .last()
            .copied()
            .map(|i| i + 1)
            .unwrap_or_default();

        self.dict_ids.push(next);
        next
    }

    /// Return the sequence of dictionary ids recorded so far.
    pub fn dict_id(&mut self) -> &[i64] {
        &self.dict_ids
    }

    /// Keep track of the dictionary with the given ID and values.
    ///
    /// Returns `Ok(false)` if this ID was already written with the same data,
    /// `Ok(true)` if the dictionary was newly inserted (or replaced when
    /// replacement is allowed), and an error if a replacement is detected
    /// while this tracker errors on replacement.
    #[deprecated(since = "56.1.0", note = "Use `insert_column` instead")]
    pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
        let dict_data = column.to_data();
        let dict_values = &dict_data.child_data()[0];

        // If a dictionary with this id was already written, check if it is the same.
        if let Some(last) = self.written.get(&dict_id) {
            if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
                // Same dictionary values => no need to write it again.
                return Ok(false);
            }
            if self.error_on_replacement {
                // Fall back to a logical comparison before erroring.
                if last.child_data()[0] == *dict_values {
                    return Ok(false);
                }
                return Err(ArrowError::InvalidArgumentError(
                    "Dictionary replacement detected when writing IPC file format. \
                     Arrow IPC files only support a single dictionary for a given field \
                     across all batches."
                        .to_string(),
                ));
            }
        }

        self.written.insert(dict_id, dict_data);
        Ok(true)
    }

    /// Keep track of the dictionary with the given ID and values, returning
    /// what (if anything) must be written to the stream:
    ///
    /// * [`DictionaryUpdate::New`] if this ID has not been seen before;
    /// * [`DictionaryUpdate::None`] if the data is identical to what was last written;
    /// * [`DictionaryUpdate::Delta`] (containing only the appended values) if the
    ///   new values extend the old ones and `dict_handling` is [`DictionaryHandling::Delta`];
    /// * [`DictionaryUpdate::Replaced`] otherwise, or an error if this tracker
    ///   errors on replacement.
    pub fn insert_column(
        &mut self,
        dict_id: i64,
        column: &ArrayRef,
        dict_handling: DictionaryHandling,
    ) -> Result<DictionaryUpdate, ArrowError> {
        let new_data = column.to_data();
        let new_values = &new_data.child_data()[0];

        // Never seen this dictionary before: record it and signal a full write.
        let Some(old) = self.written.get(&dict_id) else {
            self.written.insert(dict_id, new_data);
            return Ok(DictionaryUpdate::New);
        };

        // Fast path: pointer equality means the values are unchanged.
        let old_values = &old.child_data()[0];
        if ArrayData::ptr_eq(old_values, new_values) {
            return Ok(DictionaryUpdate::None);
        }

        // Slow path: logically compare the old and new values.
        let comparison = compare_dictionaries(old_values, new_values);
        if matches!(comparison, DictionaryComparison::Equal) {
            return Ok(DictionaryUpdate::None);
        }

        const REPLACEMENT_ERROR: &str =
            "Dictionary replacement detected when writing IPC file format. \
             Arrow IPC files only support a single dictionary for a given field \
             across all batches.";

        match comparison {
            DictionaryComparison::NotEqual => {
                if self.error_on_replacement {
                    return Err(ArrowError::InvalidArgumentError(
                        REPLACEMENT_ERROR.to_string(),
                    ));
                }

                self.written.insert(dict_id, new_data);
                Ok(DictionaryUpdate::Replaced)
            }
            DictionaryComparison::Delta => match dict_handling {
                DictionaryHandling::Resend => {
                    if self.error_on_replacement {
                        return Err(ArrowError::InvalidArgumentError(
                            REPLACEMENT_ERROR.to_string(),
                        ));
                    }

                    self.written.insert(dict_id, new_data);
                    Ok(DictionaryUpdate::Replaced)
                }
                DictionaryHandling::Delta => {
                    let delta =
                        new_values.slice(old_values.len(), new_values.len() - old_values.len());
                    self.written.insert(dict_id, new_data);
                    Ok(DictionaryUpdate::Delta(delta))
                }
            },
            DictionaryComparison::Equal => unreachable!("Already checked equal case"),
        }
    }
}

/// The result of logically comparing two dictionary value arrays.
#[derive(Debug, Clone)]
enum DictionaryComparison {
    /// The arrays differ in a way that cannot be expressed as a delta.
    NotEqual,
    /// The arrays are logically equal.
    Equal,
    /// The new array starts with the old array's values and appends more.
    Delta,
}

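/// Logically compares the `new` dictionary values against the `old` ones.
/// For example, if `old` is `["a", "b"]` then `["a", "b"]` is `Equal`,
/// `["a", "b", "c"]` is `Delta`, and `["a", "x"]` (or anything shorter than
/// `old`) is `NotEqual`.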
fn compare_dictionaries(old: &ArrayData, new: &ArrayData) -> DictionaryComparison {
    // Only arrays of equal length can be logically equal.
    let existing_len = old.len();
    let new_len = new.len();
    if existing_len == new_len {
        if *old == *new {
            return DictionaryComparison::Equal;
        } else {
            return DictionaryComparison::NotEqual;
        }
    }

    // A shorter dictionary can never be a delta of a longer one.
    if new_len < existing_len {
        return DictionaryComparison::NotEqual;
    }

    // The new dictionary is a delta iff it starts with the old values.
    if new.slice(0, existing_len) == *old {
        return DictionaryComparison::Delta;
    }

    DictionaryComparison::NotEqual
}

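/// Writer for the Arrow IPC [File Format]: a header and record batches,
/// followed by a footer with the offsets of every block.
///
/// A minimal sketch (hedged: assumes the usual `arrow_ipc` paths; `batch` is
/// a [`RecordBatch`] of yours):
///
/// ```ignore
/// use arrow_ipc::writer::FileWriter;
///
/// let mut writer = FileWriter::try_new(Vec::new(), batch.schema_ref())?;
/// writer.write(&batch)?;
/// writer.finish()?;
/// let bytes = writer.into_inner()?;
/// ```
///
/// [File Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format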
pub struct FileWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// A reference to the schema, used in validating record batches
    schema: SchemaRef,
    /// The number of bytes written so far, used as the offset for the next block
    block_offsets: usize,
    /// Dictionary blocks that will be written as part of the IPC footer
    dictionary_blocks: Vec<crate::Block>,
    /// Record blocks that will be written as part of the IPC footer
    record_blocks: Vec<crate::Block>,
    /// Whether the footer has been written, and the writer is finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,
    /// User-level customized metadata
    custom_metadata: HashMap<String, String>,

    data_gen: IpcDataGenerator,

    compression_context: CompressionContext,
}

impl<W: Write> FileWriter<BufWriter<W>> {
    /// Try to create a new file writer with the writer wrapped in a `BufWriter`.
    ///
    /// See [`FileWriter::try_new`] for an unbuffered version.
    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        Self::try_new(BufWriter::new(writer), schema)
    }
}

impl<W: Write> FileWriter<W> {
    /// Try to create a new writer, with the schema written as part of the header.
    ///
    /// Note the created writer is not buffered. See also
    /// [`FileWriter::try_new_buffered`].
    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        let write_options = IpcWriteOptions::default();
        Self::try_new_with_options(writer, schema, write_options)
    }

    /// Try to create a new writer with the specified [`IpcWriteOptions`].
    pub fn try_new_with_options(
        mut writer: W,
        schema: &Schema,
        write_options: IpcWriteOptions,
    ) -> Result<Self, ArrowError> {
        let data_gen = IpcDataGenerator::default();
        // write magic to header aligned on an alignment boundary
        let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
        let header_size = super::ARROW_MAGIC.len() + pad_len;
        writer.write_all(&super::ARROW_MAGIC)?;
        writer.write_all(&PADDING[..pad_len])?;
        // write the schema, set the written bytes to the schema + header
        let mut dictionary_tracker = DictionaryTracker::new(true);
        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
            schema,
            &mut dictionary_tracker,
            &write_options,
        );
        let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
        Ok(Self {
            writer,
            write_options,
            schema: Arc::new(schema.clone()),
            block_offsets: meta + data + header_size,
            dictionary_blocks: vec![],
            record_blocks: vec![],
            finished: false,
            dictionary_tracker,
            custom_metadata: HashMap::new(),
            data_gen,
            compression_context: CompressionContext::default(),
        })
    }

    /// Adds a key-value pair to the custom metadata written in the footer.
    pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        self.custom_metadata.insert(key.into(), value.into());
    }

    /// Write a record batch to the file.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write record batch to file writer as it is closed".to_string(),
            ));
        }

        let (encoded_dictionaries, encoded_message) = self.data_gen.encode(
            batch,
            &mut self.dictionary_tracker,
            &self.write_options,
            &mut self.compression_context,
        )?;

        for encoded_dictionary in encoded_dictionaries {
            let (meta, data) =
                write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;

            let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
            self.dictionary_blocks.push(block);
            self.block_offsets += meta + data;
        }

        let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?;
        // add a record block for the footer
        let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
        self.record_blocks.push(block);
        self.block_offsets += meta + data;
        Ok(())
    }

    /// Write footer and closing tag, then mark the writer as done.
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write footer to file writer as it is closed".to_string(),
            ));
        }

        // write EOS
        write_continuation(&mut self.writer, &self.write_options, 0)?;

        let mut fbb = FlatBufferBuilder::new();
        let dictionaries = fbb.create_vector(&self.dictionary_blocks);
        let record_batches = fbb.create_vector(&self.record_blocks);
        let mut dictionary_tracker = DictionaryTracker::new(true);
        let schema = IpcSchemaEncoder::new()
            .with_dictionary_tracker(&mut dictionary_tracker)
            .schema_to_fb_offset(&mut fbb, &self.schema);
        let fb_custom_metadata = (!self.custom_metadata.is_empty())
            .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));

        let root = {
            let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
            footer_builder.add_version(self.write_options.metadata_version);
            footer_builder.add_schema(schema);
            footer_builder.add_dictionaries(dictionaries);
            footer_builder.add_recordBatches(record_batches);
            if let Some(fb_custom_metadata) = fb_custom_metadata {
                footer_builder.add_custom_metadata(fb_custom_metadata);
            }
            footer_builder.finish()
        };
        fbb.finish(root, None);
        let footer_data = fbb.finished_data();
        self.writer.write_all(footer_data)?;
        self.writer
            .write_all(&(footer_data.len() as i32).to_le_bytes())?;
        self.writer.write_all(&super::ARROW_MAGIC)?;
        self.writer.flush()?;
        self.finished = true;

        Ok(())
    }

    /// Returns the arrow [`SchemaRef`] for this arrow file.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Gets a reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Flush the underlying writer.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }

    /// Unwraps the underlying writer.
    ///
    /// The writer is finished (and thereby flushed) first if necessary.
    /// Returns an error if [`Self::finish`] fails.
    pub fn into_inner(mut self) -> Result<W, ArrowError> {
        if !self.finished {
            // `finish` flushes the writer.
            self.finish()?;
        }
        Ok(self.writer)
    }
}

impl<W: Write> RecordBatchWriter for FileWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}

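/// Writer for the Arrow IPC [Streaming Format]: a schema message followed by
/// dictionary and record batch messages, with no footer.
///
/// A minimal sketch (hedged: same assumptions as the [`FileWriter`] example):
///
/// ```ignore
/// use arrow_ipc::writer::StreamWriter;
///
/// let mut writer = StreamWriter::try_new(Vec::new(), batch.schema_ref())?;
/// writer.write(&batch)?;
/// writer.finish()?;
/// let bytes = writer.into_inner()?;
/// ```
///
/// [Streaming Format]: https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format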
pub struct StreamWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// Whether the stream has been finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,

    data_gen: IpcDataGenerator,

    compression_context: CompressionContext,
}

impl<W: Write> StreamWriter<BufWriter<W>> {
    /// Try to create a new stream writer with the writer wrapped in a `BufWriter`.
    ///
    /// See [`StreamWriter::try_new`] for an unbuffered version.
    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        Self::try_new(BufWriter::new(writer), schema)
    }
}

impl<W: Write> StreamWriter<W> {
    /// Try to create a new writer, with the schema written as part of the header.
    ///
    /// Note the created writer is not buffered. See also
    /// [`StreamWriter::try_new_buffered`].
    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        let write_options = IpcWriteOptions::default();
        Self::try_new_with_options(writer, schema, write_options)
    }

    /// Try to create a new writer with the specified [`IpcWriteOptions`].
    pub fn try_new_with_options(
        mut writer: W,
        schema: &Schema,
        write_options: IpcWriteOptions,
    ) -> Result<Self, ArrowError> {
        let data_gen = IpcDataGenerator::default();
        let mut dictionary_tracker = DictionaryTracker::new(false);

        // write the schema, set the written bytes to the schema
        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
            schema,
            &mut dictionary_tracker,
            &write_options,
        );
        write_message(&mut writer, encoded_message, &write_options)?;
        Ok(Self {
            writer,
            write_options,
            finished: false,
            dictionary_tracker,
            data_gen,
            compression_context: CompressionContext::default(),
        })
    }

    /// Write a record batch to the stream.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write record batch to stream writer as it is closed".to_string(),
            ));
        }

        let (encoded_dictionaries, encoded_message) = self
            .data_gen
            .encode(
                batch,
                &mut self.dictionary_tracker,
                &self.write_options,
                &mut self.compression_context,
            )
            .expect("StreamWriter is configured to not error on dictionary replacement");

        for encoded_dictionary in encoded_dictionaries {
            write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
        }

        write_message(&mut self.writer, encoded_message, &self.write_options)?;
        Ok(())
    }

    /// Write continuation bytes, and mark the stream as done.
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write footer to stream writer as it is closed".to_string(),
            ));
        }

        write_continuation(&mut self.writer, &self.write_options, 0)?;

        self.finished = true;

        Ok(())
    }

    /// Gets a reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Flush the underlying writer.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }

    /// Unwraps the underlying writer.
    ///
    /// The writer is finished (and thereby flushed) first if necessary.
    /// Returns an error if [`Self::finish`] fails.
    pub fn into_inner(mut self) -> Result<W, ArrowError> {
        if !self.finished {
            // `finish` flushes.
            self.finish()?;
        }
        Ok(self.writer)
    }
}

impl<W: Write> RecordBatchWriter for StreamWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}

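/// Stores the encoded forms of a message: the flatbuffer header
/// (`ipc_message`) and the accompanying Arrow body data, if any.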
pub struct EncodedData {
    /// An encoded crate::Message
    pub ipc_message: Vec<u8>,
    /// Arrow buffers to be written; should be an empty vec for schema messages
    pub arrow_data: Vec<u8>,
}

/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
pub fn write_message<W: Write>(
    mut writer: W,
    encoded: EncodedData,
    write_options: &IpcWriteOptions,
) -> Result<(usize, usize), ArrowError> {
    let arrow_data_len = encoded.arrow_data.len();
    if arrow_data_len % usize::from(write_options.alignment) != 0 {
        return Err(ArrowError::MemoryError(
            "Arrow data not aligned".to_string(),
        ));
    }

    let a = usize::from(write_options.alignment - 1);
    let buffer = encoded.ipc_message;
    let flatbuf_size = buffer.len();
    let prefix_size = if write_options.write_legacy_ipc_format {
        4
    } else {
        8
    };
    let aligned_size = (flatbuf_size + prefix_size + a) & !a;
    let padding_bytes = aligned_size - flatbuf_size - prefix_size;

    write_continuation(
        &mut writer,
        write_options,
        (aligned_size - prefix_size) as i32,
    )?;

    // write the flatbuf
    if flatbuf_size > 0 {
        writer.write_all(&buffer)?;
    }
    // write padding
    writer.write_all(&PADDING[..padding_bytes])?;

    // write arrow data
    let body_len = if arrow_data_len > 0 {
        write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
    } else {
        0
    };

    Ok((aligned_size, body_len))
}

fn write_body_buffers<W: Write>(
    mut writer: W,
    data: &[u8],
    alignment: u8,
) -> Result<usize, ArrowError> {
    let len = data.len();
    let pad_len = pad_to_alignment(alignment, len);
    let total_len = len + pad_len;

    // write body buffer
    writer.write_all(data)?;
    if pad_len > 0 {
        writer.write_all(&PADDING[..pad_len])?;
    }

    writer.flush()?;
    Ok(total_len)
}

/// Write a continuation prefix for a message, writing the message size before
/// the message itself, and return the number of prefix bytes written.
fn write_continuation<W: Write>(
    mut writer: W,
    write_options: &IpcWriteOptions,
    total_len: i32,
) -> Result<usize, ArrowError> {
    let mut written = 8;

    // the version of the writer determines whether continuation markers should be added
    match write_options.metadata_version {
        crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
            unreachable!("Options with the metadata version cannot be created")
        }
        crate::MetadataVersion::V4 => {
            if !write_options.write_legacy_ipc_format {
                // v0.15.0 format: continuation marker then message length
                writer.write_all(&CONTINUATION_MARKER)?;
            } else {
                // the legacy format prefixes the message with its length only
                written = 4;
            }
            writer.write_all(&total_len.to_le_bytes()[..])?;
        }
        crate::MetadataVersion::V5 => {
            // write continuation marker and message length
            writer.write_all(&CONTINUATION_MARKER)?;
            writer.write_all(&total_len.to_le_bytes()[..])?;
        }
        z => panic!("Unsupported crate::MetadataVersion {z:?}"),
    };

    writer.flush()?;

    Ok(written)
}

/// In V4, only null types have no validity bitmap.
/// In V5 and later, null, union, and run-end encoded types have no validity bitmap.
fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
    if write_options.metadata_version < crate::MetadataVersion::V5 {
        !matches!(data_type, DataType::Null)
    } else {
        !matches!(
            data_type,
            DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
        )
    }
}

/// Whether the buffer needs to be truncated before writing.
#[inline]
fn buffer_need_truncate(
    array_offset: usize,
    buffer: &Buffer,
    spec: &BufferSpec,
    min_length: usize,
) -> bool {
    spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
}

/// Returns the byte width for a fixed-width buffer spec, or 0 otherwise.
#[inline]
fn get_buffer_element_width(spec: &BufferSpec) -> usize {
    match spec {
        BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
        _ => 0,
    }
}

/// Common functionality for re-encoding offsets. Returns the new offsets, as
/// well as the original start offset and length, for use in slicing child data.
fn reencode_offsets<O: OffsetSizeTrait>(
    offsets: &Buffer,
    data: &ArrayData,
) -> (Buffer, usize, usize) {
    let offsets_slice: &[O] = offsets.typed_data::<O>();
    let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];

    let start_offset = offset_slice.first().unwrap();
    let end_offset = offset_slice.last().unwrap();

    let offsets = match start_offset.as_usize() {
        0 => {
            let size = size_of::<O>();
            offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
        }
        _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
    };

    let start_offset = start_offset.as_usize();
    let end_offset = end_offset.as_usize();

    (offsets, start_offset, end_offset - start_offset)
}

/// Returns the offsets and values [`Buffer`]s for a byte array with offset type `O`.
///
/// In particular, this handles re-encoding the offsets if they don't start at `0`,
/// slicing the values buffer as appropriate. This reduces the encoded size of
/// sliced arrays, as values that have been sliced away are not encoded.
fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
    if data.is_empty() {
        return (MutableBuffer::new(0).into(), MutableBuffer::new(0).into());
    }

    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
    let values = data.buffers()[1].slice_with_length(original_start_offset, len);
    (offsets, values)
}

/// Similar logic as [`get_byte_array_buffers()`] but slices the child array
/// instead of a values buffer.
fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
    if data.is_empty() {
        return (
            MutableBuffer::new(0).into(),
            data.child_data()[0].slice(0, 0),
        );
    }

    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
    let child_data = data.child_data()[0].slice(original_start_offset, len);
    (offsets, child_data)
}

/// Write array data to a vector of bytes.
#[allow(clippy::too_many_arguments)]
fn write_array_data(
    array_data: &ArrayData,
    buffers: &mut Vec<crate::Buffer>,
    arrow_data: &mut Vec<u8>,
    nodes: &mut Vec<crate::FieldNode>,
    offset: i64,
    num_rows: usize,
    null_count: usize,
    compression_codec: Option<CompressionCodec>,
    compression_context: &mut CompressionContext,
    write_options: &IpcWriteOptions,
) -> Result<i64, ArrowError> {
    let mut offset = offset;
    if !matches!(array_data.data_type(), DataType::Null) {
        nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
    } else {
        // NullArrays are entirely null, so report every row as null
        // regardless of the `null_count` that was passed in.
        nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
    }
    if has_validity_bitmap(array_data.data_type(), write_options) {
        // write null buffer if it exists
        let null_buffer = match array_data.nulls() {
            None => {
                // create a buffer and fill it with valid bits
                let num_bytes = bit_util::ceil(num_rows, 8);
                let buffer = MutableBuffer::new(num_bytes);
                let buffer = buffer.with_bitset(num_bytes, true);
                buffer.into()
            }
            Some(buffer) => buffer.inner().sliced(),
        };

        offset = write_buffer(
            null_buffer.as_slice(),
            buffers,
            arrow_data,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;
    }

    let data_type = array_data.data_type();
    if matches!(data_type, DataType::Binary | DataType::Utf8) {
        let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
        for buffer in [offsets, values] {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                compression_context,
                write_options.alignment,
            )?;
        }
    } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
        // View types have a views buffer plus a variable number of data
        // buffers that the views index into directly, so all buffers are
        // written as-is, without truncation.
        for buffer in array_data.buffers() {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                compression_context,
                write_options.alignment,
            )?;
        }
    } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
        let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
        for buffer in [offsets, values] {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                compression_context,
                write_options.alignment,
            )?;
        }
    } else if DataType::is_numeric(data_type)
        || DataType::is_temporal(data_type)
        || matches!(
            array_data.data_type(),
            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
        )
    {
        // Truncate the fixed-width values buffer to the sliced region.
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = &array_data.buffers()[0];
        let layout = layout(data_type);
        let spec = &layout.buffers[0];

        let byte_width = get_buffer_element_width(spec);
        let min_length = array_data.len() * byte_width;
        let buffer_slice = if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
            let byte_offset = array_data.offset() * byte_width;
            let buffer_length = min(min_length, buffer.len() - byte_offset);
            &buffer.as_slice()[byte_offset..(byte_offset + buffer_length)]
        } else {
            buffer.as_slice()
        };
        offset = write_buffer(
            buffer_slice,
            buffers,
            arrow_data,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;
    } else if matches!(data_type, DataType::Boolean) {
        // Bools are special because the payload (1 bit) is smaller than the
        // physical container elements (bytes); slice the bit buffer instead.
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = &array_data.buffers()[0];
        let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
        offset = write_buffer(
            &buffer,
            buffers,
            arrow_data,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;
    } else if matches!(
        data_type,
        DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
    ) {
        assert_eq!(array_data.buffers().len(), 1);
        assert_eq!(array_data.child_data().len(), 1);

        // Truncate the offsets and the child data to avoid writing unnecessary data.
        let (offsets, sliced_child_data) = match data_type {
            DataType::List(_) => get_list_array_buffers::<i32>(array_data),
            DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
            DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
            _ => unreachable!(),
        };
        offset = write_buffer(
            offsets.as_slice(),
            buffers,
            arrow_data,
            offset,
            compression_codec,
            compression_context,
            write_options.alignment,
        )?;
        offset = write_array_data(
            &sliced_child_data,
            buffers,
            arrow_data,
            nodes,
            offset,
            sliced_child_data.len(),
            sliced_child_data.null_count(),
            compression_codec,
            compression_context,
            write_options,
        )?;
        return Ok(offset);
    } else if let DataType::FixedSizeList(_, fixed_size) = data_type {
        assert_eq!(array_data.child_data().len(), 1);
        let fixed_size = *fixed_size as usize;

        let child_offset = array_data.offset() * fixed_size;
        let child_length = array_data.len() * fixed_size;
        let child_data = array_data.child_data()[0].slice(child_offset, child_length);

        offset = write_array_data(
            &child_data,
            buffers,
            arrow_data,
            nodes,
            offset,
            child_data.len(),
            child_data.null_count(),
            compression_codec,
            compression_context,
            write_options,
        )?;
        return Ok(offset);
    } else {
        for buffer in array_data.buffers() {
            offset = write_buffer(
                buffer,
                buffers,
                arrow_data,
                offset,
                compression_codec,
                compression_context,
                write_options.alignment,
            )?;
        }
    }

    match array_data.data_type() {
        DataType::Dictionary(_, _) => {}
        DataType::RunEndEncoded(_, _) => {
            // unslice the run encoded array before writing its children
            let arr = unslice_run_array(array_data.clone())?;
            // recursively write out nested structures
            for data_ref in arr.child_data() {
                offset = write_array_data(
                    data_ref,
                    buffers,
                    arrow_data,
                    nodes,
                    offset,
                    data_ref.len(),
                    data_ref.null_count(),
                    compression_codec,
                    compression_context,
                    write_options,
                )?;
            }
        }
        _ => {
            // recursively write out nested structures
            for data_ref in array_data.child_data() {
                offset = write_array_data(
                    data_ref,
                    buffers,
                    arrow_data,
                    nodes,
                    offset,
                    data_ref.len(),
                    data_ref.null_count(),
                    compression_codec,
                    compression_context,
                    write_options,
                )?;
            }
        }
    }
    Ok(offset)
}

/// Write a buffer into `arrow_data` (compressing it first if a codec is
/// provided), record its offset and length in `buffers`, and return the new
/// offset into `arrow_data` after padding to `alignment`.
fn write_buffer(
    buffer: &[u8],                                // input
    buffers: &mut Vec<crate::Buffer>,             // output buffer descriptors
    arrow_data: &mut Vec<u8>,                     // output stream
    offset: i64,                                  // current output stream offset
    compression_codec: Option<CompressionCodec>,
    compression_context: &mut CompressionContext,
    alignment: u8,
) -> Result<i64, ArrowError> {
    let len: i64 = match compression_codec {
        Some(compressor) => compressor.compress_to_vec(buffer, arrow_data, compression_context)?,
        None => {
            arrow_data.extend_from_slice(buffer);
            buffer.len()
        }
    }
    .try_into()
    .map_err(|e| {
        ArrowError::InvalidArgumentError(format!("Could not convert compressed size to i64: {e}"))
    })?;

    // make a new index entry
    buffers.push(crate::Buffer::new(offset, len));
    // pad so the next offset is aligned
    let pad_len = pad_to_alignment(alignment, len as usize);
    arrow_data.extend_from_slice(&PADDING[..pad_len]);

    Ok(offset + len + (pad_len as i64))
}

const PADDING: [u8; 64] = [0; 64];

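/// Returns the number of padding bytes needed to round `len` up to a multiple
/// of `alignment` (a power of two). For example, `pad_to_alignment(8, 5)` is
/// `3`, and `pad_to_alignment(8, 16)` is `0`.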
#[inline]
fn pad_to_alignment(alignment: u8, len: usize) -> usize {
    let a = usize::from(alignment - 1);
    ((len + a) & !a) - len
}

#[cfg(test)]
mod tests {
    use std::hash::Hasher;
    use std::io::Cursor;
    use std::io::Seek;

    use arrow_array::builder::FixedSizeListBuilder;
    use arrow_array::builder::Float32Builder;
    use arrow_array::builder::Int64Builder;
    use arrow_array::builder::MapBuilder;
    use arrow_array::builder::UnionBuilder;
    use arrow_array::builder::{GenericListBuilder, ListBuilder, StringBuilder};
    use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
    use arrow_array::types::*;
    use arrow_buffer::ScalarBuffer;

    use crate::MetadataVersion;
    use crate::convert::fb_to_schema;
    use crate::reader::*;
    use crate::root_as_footer;

    use super::*;

    fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
        let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
        writer.write(rb).unwrap();
        writer.finish().unwrap();
        writer.into_inner().unwrap()
    }

    fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
        let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
        reader.next().unwrap().unwrap()
    }

    fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
        // Use an 8-byte alignment, rather than the default of 64, to keep the
        // serialized streams compact for the size comparisons in these tests.
        const IPC_ALIGNMENT: usize = 8;

        let mut stream_writer = StreamWriter::try_new_with_options(
            vec![],
            record.schema_ref(),
            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
        )
        .unwrap();
        stream_writer.write(record).unwrap();
        stream_writer.finish().unwrap();
        stream_writer.into_inner().unwrap()
    }

    fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
        let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
        stream_reader.next().unwrap().unwrap()
    }

    #[test]
    #[cfg(feature = "lz4")]
    fn test_write_empty_record_batch_lz4_compression() {
        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
        let values: Vec<Option<i32>> = vec![];
        let array = Int32Array::from(values);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();

        let mut file = tempfile::tempfile().unwrap();

        {
            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
                .unwrap()
                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
                .unwrap();

            let mut writer =
                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            // read the file back and compare to the written batch
            let reader = FileReader::try_new(file, None).unwrap();
            for read_batch in reader {
                read_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(record_batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }

    #[test]
    #[cfg(feature = "lz4")]
    fn test_write_file_with_lz4_compression() {
        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
        let array = Int32Array::from(values);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();

        let mut file = tempfile::tempfile().unwrap();
        {
            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
                .unwrap()
                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
                .unwrap();

            let mut writer =
                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            // read the file back and compare to the written batch
            let reader = FileReader::try_new(file, None).unwrap();
            for read_batch in reader {
                read_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(record_batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }

    #[test]
    #[cfg(feature = "zstd")]
    fn test_write_file_with_zstd_compression() {
        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
        let array = Int32Array::from(values);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
        let mut file = tempfile::tempfile().unwrap();
        {
            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
                .unwrap()
                .try_with_compression(Some(crate::CompressionType::ZSTD))
                .unwrap();

            let mut writer =
                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            // read the file back and compare to the written batch
            let reader = FileReader::try_new(file, None).unwrap();
            for read_batch in reader {
                read_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(record_batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }

    #[test]
    fn test_write_file() {
        let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
        let values: Vec<Option<u32>> = vec![
            Some(999),
            None,
            Some(235),
            Some(123),
            None,
            None,
            None,
            None,
            None,
        ];
        let array1 = UInt32Array::from(values);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
                .unwrap();
        let mut file = tempfile::tempfile().unwrap();
        {
            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();

            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();

        {
            let mut reader = FileReader::try_new(file, None).unwrap();
            while let Some(Ok(read_batch)) = reader.next() {
                read_batch
                    .columns()
                    .iter()
                    .zip(batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }

    fn write_null_file(options: IpcWriteOptions) {
        let schema = Schema::new(vec![
            Field::new("nulls", DataType::Null, true),
            Field::new("int32s", DataType::Int32, false),
            Field::new("nulls2", DataType::Null, true),
            Field::new("f64s", DataType::Float64, false),
        ]);
        let array1 = NullArray::new(32);
        let array2 = Int32Array::from(vec![1; 32]);
        let array3 = NullArray::new(32);
        let array4 = Float64Array::from(vec![f64::NAN; 32]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(array1) as ArrayRef,
                Arc::new(array2) as ArrayRef,
                Arc::new(array3) as ArrayRef,
                Arc::new(array4) as ArrayRef,
            ],
        )
        .unwrap();
        let mut file = tempfile::tempfile().unwrap();
        {
            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();

            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }

        file.rewind().unwrap();

        {
            let reader = FileReader::try_new(file, None).unwrap();
            reader.for_each(|maybe_batch| {
                maybe_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            });
        }
    }

    #[test]
    fn test_write_null_file_v4() {
        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
        write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap());
        write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
    }

    #[test]
    fn test_write_null_file_v5() {
        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap());
    }

    #[test]
    fn track_union_nested_dict() {
        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

        let array = Arc::new(inner) as ArrayRef;

        // A legacy-style field that declares a dictionary id up front.
        #[allow(deprecated)]
        let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 0, false);
        let union_fields = [(0, Arc::new(dctfield))].into_iter().collect();

        let types = [0, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
        let offsets = [0, 1, 2].into_iter().collect::<ScalarBuffer<i32>>();

        let union = UnionArray::try_new(union_fields, types, Some(offsets), vec![array]).unwrap();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "union",
            union.data_type().clone(),
            false,
        )]));

        let r#gen = IpcDataGenerator::default();
        let mut dict_tracker = DictionaryTracker::new(false);
        r#gen.schema_to_bytes_with_dictionary_tracker(
            &schema,
            &mut dict_tracker,
            &IpcWriteOptions::default(),
        );

        let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();

        r#gen
            .encode(
                &batch,
                &mut dict_tracker,
                &Default::default(),
                &mut Default::default(),
            )
            .unwrap();

        // The tracker assigns its own dictionary ids while encoding the schema,
        // so the nested dictionary is tracked under id 0.
        assert!(dict_tracker.written.contains_key(&0));
    }

    #[test]
    fn track_struct_nested_dict() {
        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

        let array = Arc::new(inner) as ArrayRef;

        // The legacy dict id on the field (2) is ignored by the tracker.
        #[allow(deprecated)]
        let dctfield = Arc::new(Field::new_dict(
            "dict",
            array.data_type().clone(),
            false,
            2,
            false,
        ));

        let s = StructArray::from(vec![(dctfield, array)]);
        let struct_array = Arc::new(s) as ArrayRef;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "struct",
            struct_array.data_type().clone(),
            false,
        )]));

        let r#gen = IpcDataGenerator::default();
        let mut dict_tracker = DictionaryTracker::new(false);
        r#gen.schema_to_bytes_with_dictionary_tracker(
            &schema,
            &mut dict_tracker,
            &IpcWriteOptions::default(),
        );

        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();

        r#gen
            .encode(
                &batch,
                &mut dict_tracker,
                &Default::default(),
                &mut Default::default(),
            )
            .unwrap();

        assert!(dict_tracker.written.contains_key(&0));
    }

    fn write_union_file(options: IpcWriteOptions) {
        let schema = Schema::new(vec![Field::new_union(
            "union",
            vec![0, 1],
            vec![
                Field::new("a", DataType::Int32, false),
                Field::new("c", DataType::Float64, false),
            ],
            UnionMode::Sparse,
        )]);
        let mut builder = UnionBuilder::with_capacity_sparse(5);
        builder.append::<Int32Type>("a", 1).unwrap();
        builder.append_null::<Int32Type>("a").unwrap();
        builder.append::<Float64Type>("c", 3.0).unwrap();
        builder.append_null::<Float64Type>("c").unwrap();
        builder.append::<Int32Type>("a", 4).unwrap();
        let union = builder.build().unwrap();

        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
                .unwrap();

        let mut file = tempfile::tempfile().unwrap();
        {
            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();

            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();

        {
            let reader = FileReader::try_new(file, None).unwrap();
            reader.for_each(|maybe_batch| {
                maybe_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            });
        }
    }

    #[test]
    fn test_write_union_file_v4_v5() {
        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
    }
2445
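    // View arrays (Utf8View/BinaryView) keep short strings inline and long
    // strings in separate data buffers; both cases must survive a file
    // round-trip, with and without a column projection.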
    #[test]
    fn test_write_view_types() {
        const LONG_TEST_STRING: &str =
            "This is a long string to make sure binary view array handles it";
        let schema = Schema::new(vec![
            Field::new("field1", DataType::BinaryView, true),
            Field::new("field2", DataType::Utf8View, true),
        ]);
        let values: Vec<Option<&[u8]>> = vec![
            Some(b"foo"),
            Some(b"bar"),
            Some(LONG_TEST_STRING.as_bytes()),
        ];
        let binary_array = BinaryViewArray::from_iter(values);
        let utf8_array =
            StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
        let record_batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(binary_array), Arc::new(utf8_array)],
        )
        .unwrap();

        let mut file = tempfile::tempfile().unwrap();
        {
            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            let mut reader = FileReader::try_new(&file, None).unwrap();
            let read_batch = reader.next().unwrap().unwrap();
            read_batch
                .columns()
                .iter()
                .zip(record_batch.columns())
                .for_each(|(a, b)| {
                    assert_eq!(a, b);
                });
        }
        file.rewind().unwrap();
        {
            let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
            let read_batch = reader.next().unwrap().unwrap();
            assert_eq!(read_batch.num_columns(), 1);
            let read_array = read_batch.column(0);
            let write_array = record_batch.column(0);
            assert_eq!(read_array, write_array);
        }
    }

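    // Writing a slice must truncate the underlying buffers: a 5-row slice of
    // a 65536-row batch has to serialize to exactly as many bytes as a batch
    // built from those 5 rows alone.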
    #[test]
    fn truncate_ipc_record_batch() {
        fn create_batch(rows: usize) -> RecordBatch {
            let schema = Schema::new(vec![
                Field::new("a", DataType::Int32, false),
                Field::new("b", DataType::Utf8, false),
            ]);

            let a = Int32Array::from_iter_values(0..rows as i32);
            let b = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
        }

        let big_record_batch = create_batch(65536);

        let length = 5;
        let small_record_batch = create_batch(length);

        let offset = 2;
        let record_batch_slice = big_record_batch.slice(offset, length);
        assert!(
            serialize_stream(&big_record_batch).len() > serialize_stream(&small_record_batch).len()
        );
        assert_eq!(
            serialize_stream(&small_record_batch).len(),
            serialize_stream(&record_batch_slice).len()
        );

        assert_eq!(
            deserialize_stream(serialize_stream(&record_batch_slice)),
            record_batch_slice
        );
    }

    #[test]
    fn truncate_ipc_record_batch_with_nulls() {
        fn create_batch() -> RecordBatch {
            let schema = Schema::new(vec![
                Field::new("a", DataType::Int32, true),
                Field::new("b", DataType::Utf8, true),
            ]);

            let a = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
            let b = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(1, 2);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );

        assert!(deserialized_batch.column(0).is_null(0));
        assert!(deserialized_batch.column(0).is_valid(1));
        assert!(deserialized_batch.column(1).is_valid(0));
        assert!(deserialized_batch.column(1).is_valid(1));

        assert_eq!(record_batch_slice, deserialized_batch);
    }

    #[test]
    fn truncate_ipc_dictionary_array() {
        fn create_batch() -> RecordBatch {
            let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
                .into_iter()
                .collect();
            let keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();

            let array = DictionaryArray::new(keys, Arc::new(values));

            let schema = Schema::new(vec![Field::new("dict", array.data_type().clone(), true)]);

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(1, 2);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );

        assert!(deserialized_batch.column(0).is_valid(0));
        assert!(deserialized_batch.column(0).is_null(1));

        assert_eq!(record_batch_slice, deserialized_batch);
    }

    #[test]
    fn truncate_ipc_struct_array() {
        fn create_batch() -> RecordBatch {
            let strings: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
                .into_iter()
                .collect();
            let ints: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();

            let struct_array = StructArray::from(vec![
                (
                    Arc::new(Field::new("s", DataType::Utf8, true)),
                    Arc::new(strings) as ArrayRef,
                ),
                (
                    Arc::new(Field::new("c", DataType::Int32, true)),
                    Arc::new(ints) as ArrayRef,
                ),
            ]);

            let schema = Schema::new(vec![Field::new(
                "struct_array",
                struct_array.data_type().clone(),
                true,
            )]);

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(1, 2);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );

        let structs = deserialized_batch
            .column(0)
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap();

        assert!(structs.column(0).is_null(0));
        assert!(structs.column(0).is_valid(1));
        assert!(structs.column(1).is_valid(0));
        assert!(structs.column(1).is_null(1));
        assert_eq!(record_batch_slice, deserialized_batch);
    }

    #[test]
    fn truncate_ipc_string_array_with_all_empty_string() {
        fn create_batch() -> RecordBatch {
            let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
            let a = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(0, 1);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );
        assert_eq!(record_batch_slice, deserialized_batch);
    }

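    // A batch wrapping a sliced array (offset 1, length 2) must round-trip
    // through the stream writer carrying only the selected values.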
    #[test]
    fn test_stream_writer_writes_array_slice() {
        let array = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
        assert_eq!(
            vec![Some(1), Some(2), Some(3)],
            array.iter().collect::<Vec<_>>()
        );

        let sliced = array.slice(1, 2);
        assert_eq!(vec![Some(2), Some(3)], sliced.iter().collect::<Vec<_>>());

        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])),
            vec![Arc::new(sliced)],
        )
        .expect("new batch");

        let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
        writer.write(&batch).expect("write");
        let outbuf = writer.into_inner().expect("inner");

        let mut reader = StreamReader::try_new(&outbuf[..], None).expect("new reader");
        let read_batch = reader.next().unwrap().expect("read batch");

        let read_array: &UInt32Array = read_batch.column(0).as_primitive();
        assert_eq!(
            vec![Some(2), Some(3)],
            read_array.iter().collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_large_slice_uint32() {
        ensure_roundtrip(Arc::new(UInt32Array::from_iter(
            (0..8000).map(|i| if i % 2 == 0 { Some(i) } else { None }),
        )));
    }

    #[test]
    fn test_large_slice_string() {
        let strings: Vec<_> = (0..8000)
            .map(|i| {
                if i % 2 == 0 {
                    Some(format!("value{i}"))
                } else {
                    None
                }
            })
            .collect();

        ensure_roundtrip(Arc::new(StringArray::from(strings)));
    }

    #[test]
    fn test_large_slice_string_list() {
        let mut ls = ListBuilder::new(StringBuilder::new());

        let mut s = String::new();
        for row_number in 0..8000 {
            if row_number % 2 == 0 {
                for list_element in 0..1000 {
                    s.clear();
                    use std::fmt::Write;
                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
                    ls.values().append_value(&s);
                }
                ls.append(true)
            } else {
                ls.append(false);
            }
        }

        ensure_roundtrip(Arc::new(ls.finish()));
    }

    #[test]
    fn test_large_slice_string_list_of_lists() {
        let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));

        for _ in 0..4000 {
            ls.values().append(true);
            ls.append(true)
        }

        let mut s = String::new();
        for row_number in 0..4000 {
            if row_number % 2 == 0 {
                for list_element in 0..1000 {
                    s.clear();
                    use std::fmt::Write;
                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
                    ls.values().values().append_value(&s);
                }
                ls.values().append(true);
                ls.append(true)
            } else {
                ls.append(false);
            }
        }

        ensure_roundtrip(Arc::new(ls.finish()));
    }

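    // Writes a slice (offset 1) of the given array through both the stream
    // and the file writer and checks the batch read back equals the slice.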
    fn ensure_roundtrip(array: ArrayRef) {
        let num_rows = array.len();
        let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
        let sliced_batch = orig_batch.slice(1, num_rows - 1);

        let schema = orig_batch.schema();
        let stream_data = {
            let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
            writer.write(&sliced_batch).unwrap();
            writer.into_inner().unwrap()
        };
        let read_batch = {
            let projection = None;
            let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
            reader
                .next()
                .expect("expect no errors reading batch")
                .expect("expect batch")
        };
        assert_eq!(sliced_batch, read_batch);

        let file_data = {
            let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
            writer.write(&sliced_batch).unwrap();
            writer.into_inner().unwrap().into_inner().unwrap()
        };
        let read_batch = {
            let projection = None;
            let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
            reader
                .next()
                .expect("expect no errors reading batch")
                .expect("expect batch")
        };
        assert_eq!(sliced_batch, read_batch);
    }

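    // Boolean values are bit-packed, so slicing at offsets that are not a
    // multiple of 8 exercises the bit-level truncation paths in the writer.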
    #[test]
    fn encode_bools_slice() {
        assert_bool_roundtrip([true, false], 1, 1);

        assert_bool_roundtrip(
            [
                true, false, true, true, false, false, true, true, true, false, false, false, true,
                true, true, true, false, false, false, false, true, true, true, true, true, false,
                false, false, false, false,
            ],
            13,
            17,
        );

        assert_bool_roundtrip(
            [
                true, false, true, true, false, false, true, true, true, false, false, false,
            ],
            8,
            2,
        );

        assert_bool_roundtrip(
            [
                true, false, true, true, false, false, true, true, true, false, false, false, true,
                true, true, true, true, false, false, false, false, false,
            ],
            8,
            8,
        );
    }

    fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
        let val_bool_field = Field::new("val", DataType::Boolean, false);

        let schema = Arc::new(Schema::new(vec![val_bool_field]));

        let bools = BooleanArray::from(bools.to_vec());

        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
        let batch = batch.slice(offset, length);

        let data = serialize_stream(&batch);
        let batch2 = deserialize_stream(data);
        assert_eq!(batch, batch2);
    }

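    // `into_zero_offset_run_array` re-encodes a sliced RunArray so its run
    // ends are relative to offset zero; every prefix and suffix slice must
    // still decode to the original logical values.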
    #[test]
    fn test_run_array_unslice() {
        let total_len = 80;
        let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
        let repeats: Vec<usize> = vec![3, 4, 1, 2];
        let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
        for ix in 0_usize..32 {
            let repeat: usize = repeats[ix % repeats.len()];
            let val: Option<i32> = vals[ix % vals.len()];
            input_array.resize(input_array.len() + repeat, val);
        }

        let mut builder =
            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
        builder.extend(input_array.iter().copied());
        let run_array = builder.finish();

        for slice_len in 1..=total_len {
            let sliced_run_array: RunArray<Int16Type> =
                run_array.slice(0, slice_len).into_data().into();

            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
            let typed = unsliced_run_array
                .downcast::<PrimitiveArray<Int32Type>>()
                .unwrap();
            let expected: Vec<Option<i32>> = input_array.iter().take(slice_len).copied().collect();
            let actual: Vec<Option<i32>> = typed.into_iter().collect();
            assert_eq!(expected, actual);

            let sliced_run_array: RunArray<Int16Type> = run_array
                .slice(total_len - slice_len, slice_len)
                .into_data()
                .into();

            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
            let typed = unsliced_run_array
                .downcast::<PrimitiveArray<Int32Type>>()
                .unwrap();
            let expected: Vec<Option<i32>> = input_array
                .iter()
                .skip(total_len - slice_len)
                .copied()
                .collect();
            let actual: Vec<Option<i32>> = typed.into_iter().collect();
            assert_eq!(expected, actual);
        }
    }

    fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
        let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());

        for i in 0..100_000 {
            for value in [i, i, i] {
                ls.values().append_value(value);
            }
            ls.append(true)
        }

        ls.finish()
    }

    fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
        let mut ls =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));

        for _i in 0..10_000 {
            for j in 0..10 {
                for value in [j, j, j, j] {
                    ls.values().values().append_value(value);
                }
                ls.values().append(true)
            }
            ls.append(true);
        }

        ls.finish()
    }

    fn generate_nested_list_data_starting_at_zero<O: OffsetSizeTrait>() -> GenericListArray<O> {
        let mut ls =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));

        for _i in 0..999 {
            ls.values().append(true);
            ls.append(true);
        }

        for j in 0..10 {
            for value in [j, j, j, j] {
                ls.values().values().append_value(value);
            }
            ls.values().append(true)
        }
        ls.append(true);

        for i in 0..9_000 {
            for j in 0..10 {
                for value in [i + j, i + j, i + j, i + j] {
                    ls.values().values().append_value(value);
                }
                ls.values().append(true)
            }
            ls.append(true);
        }

        ls.finish()
    }

    fn generate_map_array_data() -> MapArray {
        let keys_builder = UInt32Builder::new();
        let values_builder = UInt32Builder::new();

        let mut builder = MapBuilder::new(None, keys_builder, values_builder);

        for i in 0..100_000 {
            for _j in 0..3 {
                builder.keys().append_value(i);
                builder.values().append_value(i * 2);
            }
            builder.append(true).unwrap();
        }

        builder.finish()
    }

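    // `reencode_offsets` rebases a sliced offset buffer to start at zero and
    // reports the start and length of the value range still referenced: rows
    // 75..82 of 3-element lists map to values 225..246 with offsets 0, 3, ..., 21.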
    #[test]
    fn reencode_offsets_when_first_offset_is_not_zero() {
        let original_list = generate_list_data::<i32>();
        let original_data = original_list.into_data();
        let slice_data = original_data.slice(75, 7);
        let (new_offsets, original_start, length) =
            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
        assert_eq!(
            vec![0, 3, 6, 9, 12, 15, 18, 21],
            new_offsets.typed_data::<i32>()
        );
        assert_eq!(225, original_start);
        assert_eq!(21, length);
    }

    #[test]
    fn reencode_offsets_when_first_offset_is_zero() {
        let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
        ls.append(true);
        ls.values().append_value(35);
        ls.values().append_value(42);
        ls.append(true);
        let original_list = ls.finish();
        let original_data = original_list.into_data();

        let slice_data = original_data.slice(1, 1);
        let (new_offsets, original_start, length) =
            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
        assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
        assert_eq!(0, original_start);
        assert_eq!(2, length);
    }

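    // Serializes a one-row slice of `in_batch` and asserts it is at least
    // `expected_size_factor` times smaller than the full batch, then verifies
    // both forms still round-trip through the file format.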
    fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
        let in_sliced = in_batch.slice(999, 1);

        let bytes_batch = serialize_file(&in_batch);
        let bytes_sliced = serialize_file(&in_sliced);

        assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));

        let out_batch = deserialize_file(bytes_batch);
        assert_eq!(in_batch, out_batch);

        let out_sliced = deserialize_file(bytes_sliced);
        assert_eq!(in_sliced, out_sliced);
    }

    #[test]
    fn encode_lists() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_list_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    #[test]
    fn encode_empty_list() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_list_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values])
            .unwrap()
            .slice(999, 0);
        let out_batch = deserialize_file(serialize_file(&in_batch));
        assert_eq!(in_batch, out_batch);
    }

    #[test]
    fn encode_large_lists() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_list_data::<i64>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    #[test]
    fn encode_nested_lists() {
        let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
        let inner_list_field = Arc::new(Field::new_list_field(DataType::List(inner_int), true));
        let list_field = Field::new("val", DataType::List(inner_list_field), true);
        let schema = Arc::new(Schema::new(vec![list_field]));

        let values = Arc::new(generate_nested_list_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    #[test]
    fn encode_nested_lists_starting_at_zero() {
        let inner_int = Arc::new(Field::new("item", DataType::UInt32, true));
        let inner_list_field = Arc::new(Field::new("item", DataType::List(inner_int), true));
        let list_field = Field::new("val", DataType::List(inner_list_field), true);
        let schema = Arc::new(Schema::new(vec![list_field]));

        let values = Arc::new(generate_nested_list_data_starting_at_zero::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1);
    }

    #[test]
    fn encode_map_array() {
        let keys = Arc::new(Field::new("keys", DataType::UInt32, false));
        let values = Arc::new(Field::new("values", DataType::UInt32, true));
        let map_field = Field::new_map("map", "entries", keys, values, false, true);
        let schema = Arc::new(Schema::new(vec![map_field]));

        let values = Arc::new(generate_map_array_data());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

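    // Decimal128 values are 16 bytes wide, so their buffers need 16-byte
    // alignment for zero-copy decoding (see the error asserted in the next
    // test). Writing with alignment 16 must keep every batch readable by a
    // decoder configured with `with_require_alignment(true)`.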
    #[test]
    fn test_decimal128_alignment16_is_sufficient() {
        const IPC_ALIGNMENT: usize = 16;

        for num_cols in [1, 2, 3, 17, 50, 73, 99] {
            let num_rows = (num_cols * 7 + 11) % 100;

            let mut fields = Vec::new();
            let mut arrays = Vec::new();
            for i in 0..num_cols {
                let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
                let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
                fields.push(field);
                arrays.push(Arc::new(array) as Arc<dyn Array>);
            }
            let schema = Schema::new(fields);
            let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();

            let mut writer = FileWriter::try_new_with_options(
                Vec::new(),
                batch.schema_ref(),
                IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
            )
            .unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();

            let out: Vec<u8> = writer.into_inner().unwrap();

            let buffer = Buffer::from_vec(out);
            let trailer_start = buffer.len() - 10;
            let footer_len =
                read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
            let footer =
                root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();

            let schema = fb_to_schema(footer.schema().unwrap());

            let decoder =
                FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);

            let batches = footer.recordBatches().unwrap();

            let block = batches.get(0);
            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
            let data = buffer.slice_with_length(block.offset() as _, block_len);

            let batch2 = decoder.read_record_batch(block, &data).unwrap().unwrap();

            assert_eq!(batch, batch2);
        }
    }

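    // With alignment 8 a Decimal128 buffer can end up on an 8-byte boundary;
    // a decoder that requires alignment must reject it instead of silently
    // copying the data.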
    #[test]
    fn test_decimal128_alignment8_is_unaligned() {
        const IPC_ALIGNMENT: usize = 8;

        let num_cols = 2;
        let num_rows = 1;

        let mut fields = Vec::new();
        let mut arrays = Vec::new();
        for i in 0..num_cols {
            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
            let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
            fields.push(field);
            arrays.push(Arc::new(array) as Arc<dyn Array>);
        }
        let schema = Schema::new(fields);
        let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();

        let mut writer = FileWriter::try_new_with_options(
            Vec::new(),
            batch.schema_ref(),
            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
        )
        .unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();

        let out: Vec<u8> = writer.into_inner().unwrap();

        let buffer = Buffer::from_vec(out);
        let trailer_start = buffer.len() - 10;
        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
        let schema = fb_to_schema(footer.schema().unwrap());

        let decoder =
            FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);

        let batches = footer.recordBatches().unwrap();

        let block = batches.get(0);
        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
        let data = buffer.slice_with_length(block.offset() as _, block_len);

        let result = decoder.read_record_batch(block, &data);

        let error = result.unwrap_err();
        assert_eq!(
            error.to_string(),
            "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
             offset from expected alignment of 16 by 8"
        );
    }

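    // `flush` must push everything written so far through the wrapped
    // `BufWriter` for both the stream writer and the file writer.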
    #[test]
    fn test_flush() {
        let num_cols = 2;
        let mut fields = Vec::new();
        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
        for i in 0..num_cols {
            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
            fields.push(field);
        }
        let schema = Schema::new(fields);
        let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
        let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
        let mut stream_writer =
            StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
                .unwrap();
        let mut file_writer =
            FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();

        let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
        let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
        stream_writer.flush().unwrap();
        file_writer.flush().unwrap();
        let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
        let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
        let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
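        // `into_inner` finishes the stream, which appends the 8-byte
        // end-of-stream marker (continuation marker plus a zero length), so
        // the bytes seen at flush time are the total output minus 8. The file
        // writer has 8 more bytes at that point: its leading "ARROW1" magic
        // plus two padding bytes.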
        let expected_stream_flushed_bytes = stream_out.len() - 8;
        let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;

        assert!(
            stream_bytes_written_on_new < stream_bytes_written_on_flush,
            "this test makes no sense if flush is not actually required"
        );
        assert!(
            file_bytes_written_on_new < file_bytes_written_on_flush,
            "this test makes no sense if flush is not actually required"
        );
        assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
        assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
    }

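    // Round-trips a List<FixedSizeList<Float32, 3>> column sliced at various
    // offsets; fixed-size lists have no offset buffer of their own, so the
    // writer must derive the child range from the parent slice.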
    #[test]
    fn test_roundtrip_list_of_fixed_list() -> Result<(), ArrowError> {
        let l1_type =
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, false)), 3);
        let l2_type = DataType::List(Arc::new(Field::new("item", l1_type.clone(), false)));

        let l0_builder = Float32Builder::new();
        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3).with_field(Arc::new(Field::new(
            "item",
            DataType::Float32,
            false,
        )));
        let mut l2_builder =
            ListBuilder::new(l1_builder).with_field(Arc::new(Field::new("item", l1_type, false)));

        for point in [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]] {
            l2_builder.values().values().append_value(point[0]);
            l2_builder.values().values().append_value(point[1]);
            l2_builder.values().values().append_value(point[2]);

            l2_builder.values().append(true);
        }
        l2_builder.append(true);

        let point = [10., 11., 12.];
        l2_builder.values().values().append_value(point[0]);
        l2_builder.values().values().append_value(point[1]);
        l2_builder.values().values().append_value(point[2]);

        l2_builder.values().append(true);
        l2_builder.append(true);

        let array = Arc::new(l2_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new("points", l2_type, false)],
            HashMap::default(),
        ));

        test_slices(&array, &schema, 0, 1)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 1)?;

        Ok(())
    }

    #[test]
    fn test_roundtrip_list_of_fixed_list_w_nulls() -> Result<(), ArrowError> {
        let l0_builder = Float32Builder::new();
        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3);
        let mut l2_builder = ListBuilder::new(l1_builder);

        for point in [
            [Some(1.0), Some(2.0), None],
            [Some(4.0), Some(5.0), Some(6.0)],
            [None, Some(8.0), Some(9.0)],
        ] {
            for p in point {
                match p {
                    Some(p) => l2_builder.values().values().append_value(p),
                    None => l2_builder.values().values().append_null(),
                }
            }

            l2_builder.values().append(true);
        }
        l2_builder.append(true);

        let point = [Some(10.), None, None];
        for p in point {
            match p {
                Some(p) => l2_builder.values().values().append_value(p),
                None => l2_builder.values().values().append_null(),
            }
        }

        l2_builder.values().append(true);
        l2_builder.append(true);

        let array = Arc::new(l2_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new(
                "points",
                DataType::List(Arc::new(Field::new(
                    "item",
                    DataType::FixedSizeList(
                        Arc::new(Field::new("item", DataType::Float32, true)),
                        3,
                    ),
                    true,
                ))),
                true,
            )],
            HashMap::default(),
        ));

        test_slices(&array, &schema, 0, 1)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 1)?;

        Ok(())
    }

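    // Slices `parent_array` at the given offset and length, writes the result
    // as a stream, reads it back, and asserts the two batches are equal.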
    fn test_slices(
        parent_array: &ArrayRef,
        schema: &SchemaRef,
        offset: usize,
        length: usize,
    ) -> Result<(), ArrowError> {
        let subarray = parent_array.slice(offset, length);
        let original_batch = RecordBatch::try_new(schema.clone(), vec![subarray])?;

        let mut bytes = Vec::new();
        let mut writer = StreamWriter::try_new(&mut bytes, schema)?;
        writer.write(&original_batch)?;
        writer.finish()?;

        let mut cursor = std::io::Cursor::new(bytes);
        let mut reader = StreamReader::try_new(&mut cursor, None)?;
        let returned_batch = reader.next().unwrap()?;

        assert_eq!(original_batch, returned_batch);

        Ok(())
    }

    #[test]
    fn test_roundtrip_fixed_list() -> Result<(), ArrowError> {
        let int_builder = Int64Builder::new();
        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3)
            .with_field(Arc::new(Field::new("item", DataType::Int64, false)));

        for point in [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]] {
            fixed_list_builder.values().append_value(point[0]);
            fixed_list_builder.values().append_value(point[1]);
            fixed_list_builder.values().append_value(point[2]);

            fixed_list_builder.append(true);
        }

        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new(
                "points",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, false)), 3),
                false,
            )],
            HashMap::default(),
        ));

        test_slices(&array, &schema, 0, 4)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 3)?;
        test_slices(&array, &schema, 2, 1)?;

        Ok(())
    }

    #[test]
    fn test_roundtrip_fixed_list_w_nulls() -> Result<(), ArrowError> {
        let int_builder = Int64Builder::new();
        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3);

        for point in [
            [Some(1), Some(2), None],
            [Some(4), Some(5), Some(6)],
            [None, Some(8), Some(9)],
            [Some(10), None, None],
        ] {
            for p in point {
                match p {
                    Some(p) => fixed_list_builder.values().append_value(p),
                    None => fixed_list_builder.values().append_null(),
                }
            }

            fixed_list_builder.append(true);
        }

        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new(
                "points",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 3),
                true,
            )],
            HashMap::default(),
        ));

        test_slices(&array, &schema, 0, 4)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 3)?;
        test_slices(&array, &schema, 2, 1)?;

        Ok(())
    }

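    // HashMap iteration order is not deterministic, so the writer must emit
    // schema and field metadata in a stable order for the serialized bytes to
    // be reproducible: hashing 20 independent encodings must give one value.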
    #[test]
    fn test_metadata_encoding_ordering() {
        fn create_hash() -> u64 {
            let metadata: HashMap<String, String> = [
                ("a", "1"),
                ("b", "2"),
                ("c", "3"),
                ("d", "4"),
                ("e", "5"),
            ]
            .into_iter()
            .map(|(k, v)| (k.to_owned(), v.to_owned()))
            .collect();

            let schema = Arc::new(
                Schema::new(vec![
                    Field::new("a", DataType::Int64, true).with_metadata(metadata.clone()),
                ])
                .with_metadata(metadata),
            );
            let batch = RecordBatch::new_empty(schema.clone());

            let mut bytes = Vec::new();
            let mut w = StreamWriter::try_new(&mut bytes, batch.schema_ref()).unwrap();
            w.write(&batch).unwrap();
            w.finish().unwrap();

            let mut h = std::hash::DefaultHasher::new();
            h.write(&bytes);
            h.finish()
        }

        let expected = create_hash();

        let all_passed = (0..20).all(|_| create_hash() == expected);
        assert!(all_passed);
    }
}