use std::cmp::min;
use std::collections::HashMap;
use std::io::{BufWriter, Write};
use std::mem::size_of;
use std::sync::Arc;

use flatbuffers::FlatBufferBuilder;

use arrow_array::builder::BufferBuilder;
use arrow_array::cast::*;
use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
use arrow_array::*;
use arrow_buffer::bit_util;
use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
use arrow_data::{ArrayData, ArrayDataBuilder, BufferSpec, layout};
use arrow_schema::*;

use crate::CONTINUATION_MARKER;
use crate::compression::CompressionCodec;
pub use crate::compression::CompressionContext;
use crate::convert::IpcSchemaEncoder;
/// IPC write options used to control the behaviour of the [`IpcDataGenerator`]
#[derive(Debug, Clone)]
pub struct IpcWriteOptions {
    /// Pad memory buffers so each starts on this alignment; must be 8, 16, 32, or 64
    /// (validated in [`IpcWriteOptions::try_new`]).
    alignment: u8,
    /// Whether to write the pre-0.15.0 "legacy" IPC format, which omits the continuation
    /// marker; only valid with metadata version V4.
    write_legacy_ipc_format: bool,
    /// The IPC metadata version to write; only V4 and V5 are supported.
    metadata_version: crate::MetadataVersion,
    /// Optional compression applied to record batch bodies; requires metadata V5 or later.
    batch_compression_type: Option<crate::CompressionType>,
    /// Controls how modified dictionaries are re-sent across record batches.
    dictionary_handling: DictionaryHandling,
}
72
impl IpcWriteOptions {
    /// Configure whether the IPC record batch body buffers are compressed.
    ///
    /// Compression requires metadata version V5 or above.
    pub fn try_with_compression(
        mut self,
        batch_compression_type: Option<crate::CompressionType>,
    ) -> Result<Self, ArrowError> {
        self.batch_compression_type = batch_compression_type;

        if self.batch_compression_type.is_some()
            && self.metadata_version < crate::MetadataVersion::V5
        {
            return Err(ArrowError::InvalidArgumentError(
                "Compression only supported in metadata v5 and above".to_string(),
            ));
        }
        Ok(self)
    }

    /// Try to create [`IpcWriteOptions`], validating the alignment and the combination of
    /// metadata version and legacy format flag.
    pub fn try_new(
        alignment: usize,
        write_legacy_ipc_format: bool,
        metadata_version: crate::MetadataVersion,
    ) -> Result<Self, ArrowError> {
        let is_alignment_valid =
            alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
        if !is_alignment_valid {
            return Err(ArrowError::InvalidArgumentError(
                "Alignment should be 8, 16, 32, or 64.".to_string(),
            ));
        }
        let alignment: u8 = u8::try_from(alignment).expect("range already checked");
        match metadata_version {
            crate::MetadataVersion::V1
            | crate::MetadataVersion::V2
            | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
                "Writing IPC metadata version 3 and lower not supported".to_string(),
            )),
            #[allow(deprecated)]
            crate::MetadataVersion::V4 => Ok(Self {
                alignment,
                write_legacy_ipc_format,
                metadata_version,
                batch_compression_type: None,
                dictionary_handling: DictionaryHandling::default(),
            }),
            crate::MetadataVersion::V5 => {
                if write_legacy_ipc_format {
                    Err(ArrowError::InvalidArgumentError(
                        "Legacy IPC format only supported on metadata version 4".to_string(),
                    ))
                } else {
                    Ok(Self {
                        alignment,
                        write_legacy_ipc_format,
                        metadata_version,
                        batch_compression_type: None,
                        dictionary_handling: DictionaryHandling::default(),
                    })
                }
            }
            z => Err(ArrowError::InvalidArgumentError(format!(
                "Unsupported crate::MetadataVersion {z:?}"
            ))),
        }
    }

    /// Set how dictionaries that change between record batches are handled
    /// (see [`DictionaryHandling`]).
    pub fn with_dictionary_handling(mut self, dictionary_handling: DictionaryHandling) -> Self {
        self.dictionary_handling = dictionary_handling;
        self
    }
}
148
impl Default for IpcWriteOptions {
    fn default() -> Self {
        Self {
            alignment: 64,
            write_legacy_ipc_format: false,
            metadata_version: crate::MetadataVersion::V5,
            batch_compression_type: None,
            dictionary_handling: DictionaryHandling::default(),
        }
    }
}
160
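/// Handles low level details of encoding [`Schema`] and [`RecordBatch`] data into the
/// Arrow IPC format (flatbuffer control messages plus Arrow buffer bodies).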
#[derive(Debug, Default)]
pub struct IpcDataGenerator {}
196
impl IpcDataGenerator {
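    /// Converts a schema to an IPC message (a `Schema` flatbuffer with an empty body),
    /// registering any dictionary-encoded fields with `dictionary_tracker` so their
    /// dictionaries can be tracked when batches are encoded later.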
198 pub fn schema_to_bytes_with_dictionary_tracker(
201 &self,
202 schema: &Schema,
203 dictionary_tracker: &mut DictionaryTracker,
204 write_options: &IpcWriteOptions,
205 ) -> EncodedData {
206 let mut fbb = FlatBufferBuilder::new();
207 let schema = {
208 let fb = IpcSchemaEncoder::new()
209 .with_dictionary_tracker(dictionary_tracker)
210 .schema_to_fb_offset(&mut fbb, schema);
211 fb.as_union_value()
212 };
213
214 let mut message = crate::MessageBuilder::new(&mut fbb);
215 message.add_version(write_options.metadata_version);
216 message.add_header_type(crate::MessageHeader::Schema);
217 message.add_bodyLength(0);
218 message.add_header(schema);
219 let data = message.finish();
221 fbb.finish(data, None);
222
223 let data = fbb.finished_data();
224 EncodedData {
225 ipc_message: data.to_vec(),
226 arrow_data: vec![],
227 }
228 }
229
230 fn _encode_dictionaries<I: Iterator<Item = i64>>(
231 &self,
232 column: &ArrayRef,
233 encoded_dictionaries: &mut Vec<EncodedData>,
234 dictionary_tracker: &mut DictionaryTracker,
235 write_options: &IpcWriteOptions,
236 dict_id: &mut I,
237 compression_context: &mut CompressionContext,
238 ) -> Result<(), ArrowError> {
239 match column.data_type() {
240 DataType::Struct(fields) => {
241 let s = as_struct_array(column);
242 for (field, column) in fields.iter().zip(s.columns()) {
243 self.encode_dictionaries(
244 field,
245 column,
246 encoded_dictionaries,
247 dictionary_tracker,
248 write_options,
249 dict_id,
250 compression_context,
251 )?;
252 }
253 }
254 DataType::RunEndEncoded(_, values) => {
255 let data = column.to_data();
256 if data.child_data().len() != 2 {
257 return Err(ArrowError::InvalidArgumentError(format!(
258 "The run encoded array should have exactly two child arrays. Found {}",
259 data.child_data().len()
260 )));
261 }
262 let values_array = make_array(data.child_data()[1].clone());
265 self.encode_dictionaries(
266 values,
267 &values_array,
268 encoded_dictionaries,
269 dictionary_tracker,
270 write_options,
271 dict_id,
272 compression_context,
273 )?;
274 }
275 DataType::List(field) => {
276 let list = as_list_array(column);
277 self.encode_dictionaries(
278 field,
279 list.values(),
280 encoded_dictionaries,
281 dictionary_tracker,
282 write_options,
283 dict_id,
284 compression_context,
285 )?;
286 }
287 DataType::LargeList(field) => {
288 let list = as_large_list_array(column);
289 self.encode_dictionaries(
290 field,
291 list.values(),
292 encoded_dictionaries,
293 dictionary_tracker,
294 write_options,
295 dict_id,
296 compression_context,
297 )?;
298 }
299 DataType::FixedSizeList(field, _) => {
300 let list = column
301 .as_any()
302 .downcast_ref::<FixedSizeListArray>()
303 .expect("Unable to downcast to fixed size list array");
304 self.encode_dictionaries(
305 field,
306 list.values(),
307 encoded_dictionaries,
308 dictionary_tracker,
309 write_options,
310 dict_id,
311 compression_context,
312 )?;
313 }
314 DataType::Map(field, _) => {
315 let map_array = as_map_array(column);
316
317 let (keys, values) = match field.data_type() {
318 DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
319 _ => panic!("Incorrect field data type {:?}", field.data_type()),
320 };
321
322 self.encode_dictionaries(
324 keys,
325 map_array.keys(),
326 encoded_dictionaries,
327 dictionary_tracker,
328 write_options,
329 dict_id,
330 compression_context,
331 )?;
332
333 self.encode_dictionaries(
335 values,
336 map_array.values(),
337 encoded_dictionaries,
338 dictionary_tracker,
339 write_options,
340 dict_id,
341 compression_context,
342 )?;
343 }
344 DataType::Union(fields, _) => {
345 let union = as_union_array(column);
346 for (type_id, field) in fields.iter() {
347 let column = union.child(type_id);
348 self.encode_dictionaries(
349 field,
350 column,
351 encoded_dictionaries,
352 dictionary_tracker,
353 write_options,
354 dict_id,
355 compression_context,
356 )?;
357 }
358 }
359 _ => (),
360 }
361
362 Ok(())
363 }
364
365 #[allow(clippy::too_many_arguments)]
366 fn encode_dictionaries<I: Iterator<Item = i64>>(
367 &self,
368 field: &Field,
369 column: &ArrayRef,
370 encoded_dictionaries: &mut Vec<EncodedData>,
371 dictionary_tracker: &mut DictionaryTracker,
372 write_options: &IpcWriteOptions,
373 dict_id_seq: &mut I,
374 compression_context: &mut CompressionContext,
375 ) -> Result<(), ArrowError> {
376 match column.data_type() {
377 DataType::Dictionary(_key_type, _value_type) => {
378 let dict_data = column.to_data();
379 let dict_values = &dict_data.child_data()[0];
380
381 let values = make_array(dict_data.child_data()[0].clone());
382
383 self._encode_dictionaries(
384 &values,
385 encoded_dictionaries,
386 dictionary_tracker,
387 write_options,
388 dict_id_seq,
389 compression_context,
390 )?;
391
392 let dict_id = dict_id_seq.next().ok_or_else(|| {
396 ArrowError::IpcError(format!("no dict id for field {}", field.name()))
397 })?;
398
399 match dictionary_tracker.insert_column(
400 dict_id,
401 column,
402 write_options.dictionary_handling,
403 )? {
404 DictionaryUpdate::None => {}
405 DictionaryUpdate::New | DictionaryUpdate::Replaced => {
406 encoded_dictionaries.push(self.dictionary_batch_to_bytes(
407 dict_id,
408 dict_values,
409 write_options,
410 false,
411 compression_context,
412 )?);
413 }
414 DictionaryUpdate::Delta(data) => {
415 encoded_dictionaries.push(self.dictionary_batch_to_bytes(
416 dict_id,
417 &data,
418 write_options,
419 true,
420 compression_context,
421 )?);
422 }
423 }
424 }
425 _ => self._encode_dictionaries(
426 column,
427 encoded_dictionaries,
428 dictionary_tracker,
429 write_options,
430 dict_id_seq,
431 compression_context,
432 )?,
433 }
434
435 Ok(())
436 }
437
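    /// Encodes `batch` into one [`EncodedData`] message for every dictionary that must be
    /// (re)sent, plus one for the record batch itself.
    ///
    /// The `dictionary_tracker` determines whether each dictionary is new, unchanged,
    /// replaced, or extended (delta), and `compression_context` carries reusable state
    /// for the optional body compression.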
438 pub fn encode(
442 &self,
443 batch: &RecordBatch,
444 dictionary_tracker: &mut DictionaryTracker,
445 write_options: &IpcWriteOptions,
446 compression_context: &mut CompressionContext,
447 ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
448 let schema = batch.schema();
449 let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());
450
451 let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();
452
453 for (i, field) in schema.fields().iter().enumerate() {
454 let column = batch.column(i);
455 self.encode_dictionaries(
456 field,
457 column,
458 &mut encoded_dictionaries,
459 dictionary_tracker,
460 write_options,
461 &mut dict_id,
462 compression_context,
463 )?;
464 }
465
466 let encoded_message =
467 self.record_batch_to_bytes(batch, write_options, compression_context)?;
468 Ok((encoded_dictionaries, encoded_message))
469 }
470
471 #[deprecated(since = "57.0.0", note = "Use `encode` instead")]
475 pub fn encoded_batch(
476 &self,
477 batch: &RecordBatch,
478 dictionary_tracker: &mut DictionaryTracker,
479 write_options: &IpcWriteOptions,
480 ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
481 self.encode(
482 batch,
483 dictionary_tracker,
484 write_options,
485 &mut Default::default(),
486 )
487 }
488
489 fn record_batch_to_bytes(
492 &self,
493 batch: &RecordBatch,
494 write_options: &IpcWriteOptions,
495 compression_context: &mut CompressionContext,
496 ) -> Result<EncodedData, ArrowError> {
497 let mut fbb = FlatBufferBuilder::new();
498
499 let mut nodes: Vec<crate::FieldNode> = vec![];
500 let mut buffers: Vec<crate::Buffer> = vec![];
501 let mut arrow_data: Vec<u8> = vec![];
502 let mut offset = 0;
503
504 let batch_compression_type = write_options.batch_compression_type;
506
507 let compression = batch_compression_type.map(|batch_compression_type| {
508 let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
509 c.add_method(crate::BodyCompressionMethod::BUFFER);
510 c.add_codec(batch_compression_type);
511 c.finish()
512 });
513
514 let compression_codec: Option<CompressionCodec> =
515 batch_compression_type.map(TryInto::try_into).transpose()?;
516
517 let mut variadic_buffer_counts = vec![];
518
519 for array in batch.columns() {
520 let array_data = array.to_data();
521 offset = write_array_data(
522 &array_data,
523 &mut buffers,
524 &mut arrow_data,
525 &mut nodes,
526 offset,
527 array.len(),
528 array.null_count(),
529 compression_codec,
530 compression_context,
531 write_options,
532 )?;
533
534 append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
535 }
536 let len = arrow_data.len();
538 let pad_len = pad_to_alignment(write_options.alignment, len);
539 arrow_data.extend_from_slice(&PADDING[..pad_len]);
540
541 let buffers = fbb.create_vector(&buffers);
543 let nodes = fbb.create_vector(&nodes);
544 let variadic_buffer = if variadic_buffer_counts.is_empty() {
545 None
546 } else {
547 Some(fbb.create_vector(&variadic_buffer_counts))
548 };
549
550 let root = {
551 let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
552 batch_builder.add_length(batch.num_rows() as i64);
553 batch_builder.add_nodes(nodes);
554 batch_builder.add_buffers(buffers);
555 if let Some(c) = compression {
556 batch_builder.add_compression(c);
557 }
558
559 if let Some(v) = variadic_buffer {
560 batch_builder.add_variadicBufferCounts(v);
561 }
562 let b = batch_builder.finish();
563 b.as_union_value()
564 };
565 let mut message = crate::MessageBuilder::new(&mut fbb);
567 message.add_version(write_options.metadata_version);
568 message.add_header_type(crate::MessageHeader::RecordBatch);
569 message.add_bodyLength(arrow_data.len() as i64);
570 message.add_header(root);
571 let root = message.finish();
572 fbb.finish(root, None);
573 let finished_data = fbb.finished_data();
574
575 Ok(EncodedData {
576 ipc_message: finished_data.to_vec(),
577 arrow_data,
578 })
579 }
580
581 fn dictionary_batch_to_bytes(
584 &self,
585 dict_id: i64,
586 array_data: &ArrayData,
587 write_options: &IpcWriteOptions,
588 is_delta: bool,
589 compression_context: &mut CompressionContext,
590 ) -> Result<EncodedData, ArrowError> {
591 let mut fbb = FlatBufferBuilder::new();
592
593 let mut nodes: Vec<crate::FieldNode> = vec![];
594 let mut buffers: Vec<crate::Buffer> = vec![];
595 let mut arrow_data: Vec<u8> = vec![];
596
597 let batch_compression_type = write_options.batch_compression_type;
599
600 let compression = batch_compression_type.map(|batch_compression_type| {
601 let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
602 c.add_method(crate::BodyCompressionMethod::BUFFER);
603 c.add_codec(batch_compression_type);
604 c.finish()
605 });
606
607 let compression_codec: Option<CompressionCodec> = batch_compression_type
608 .map(|batch_compression_type| batch_compression_type.try_into())
609 .transpose()?;
610
611 write_array_data(
612 array_data,
613 &mut buffers,
614 &mut arrow_data,
615 &mut nodes,
616 0,
617 array_data.len(),
618 array_data.null_count(),
619 compression_codec,
620 compression_context,
621 write_options,
622 )?;
623
624 let mut variadic_buffer_counts = vec![];
625 append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);
626
627 let len = arrow_data.len();
629 let pad_len = pad_to_alignment(write_options.alignment, len);
630 arrow_data.extend_from_slice(&PADDING[..pad_len]);
631
632 let buffers = fbb.create_vector(&buffers);
634 let nodes = fbb.create_vector(&nodes);
635 let variadic_buffer = if variadic_buffer_counts.is_empty() {
636 None
637 } else {
638 Some(fbb.create_vector(&variadic_buffer_counts))
639 };
640
641 let root = {
642 let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
643 batch_builder.add_length(array_data.len() as i64);
644 batch_builder.add_nodes(nodes);
645 batch_builder.add_buffers(buffers);
646 if let Some(c) = compression {
647 batch_builder.add_compression(c);
648 }
649 if let Some(v) = variadic_buffer {
650 batch_builder.add_variadicBufferCounts(v);
651 }
652 batch_builder.finish()
653 };
654
655 let root = {
656 let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb);
657 batch_builder.add_id(dict_id);
658 batch_builder.add_data(root);
659 batch_builder.add_isDelta(is_delta);
660 batch_builder.finish().as_union_value()
661 };
662
663 let root = {
664 let mut message_builder = crate::MessageBuilder::new(&mut fbb);
665 message_builder.add_version(write_options.metadata_version);
666 message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
667 message_builder.add_bodyLength(arrow_data.len() as i64);
668 message_builder.add_header(root);
669 message_builder.finish()
670 };
671
672 fbb.finish(root, None);
673 let finished_data = fbb.finished_data();
674
675 Ok(EncodedData {
676 ipc_message: finished_data.to_vec(),
677 arrow_data,
678 })
679 }
680}
681
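/// Recursively records, for every `BinaryView`/`Utf8View` array in `array`, the number of
/// variadic data buffers (all buffers except the fixed-size views buffer).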
682fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
683 match array.data_type() {
684 DataType::BinaryView | DataType::Utf8View => {
685 counts.push(array.buffers().len() as i64 - 1);
688 }
        DataType::Dictionary(_, _) => {
            // Do nothing: a dictionary's values are written separately as a dictionary
            // batch, so they do not contribute to this array's variadic buffer counts.
        }
693 _ => {
694 for child in array.child_data() {
695 append_variadic_buffer_counts(counts, child)
696 }
697 }
698 }
699}
700
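/// Rewrites a possibly sliced [`DataType::RunEndEncoded`] [`ArrayData`] into an equivalent
/// array with zero offset, so its buffers can be written out directly.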
701pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
702 match arr.data_type() {
703 DataType::RunEndEncoded(k, _) => match k.data_type() {
704 DataType::Int16 => {
705 Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
706 }
707 DataType::Int32 => {
708 Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
709 }
710 DataType::Int64 => {
711 Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
712 }
713 d => unreachable!("Unexpected data type {d}"),
714 },
715 d => Err(ArrowError::InvalidArgumentError(format!(
716 "The given array is not a run array. Data type of given array: {d}"
717 ))),
718 }
719}
720
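/// Returns a [`RunArray`] with zero logical offset: run ends are rebased to start at zero
/// and both run ends and values are truncated to the physical runs covered by the slice.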
721fn into_zero_offset_run_array<R: RunEndIndexType>(
724 run_array: RunArray<R>,
725) -> Result<RunArray<R>, ArrowError> {
726 let run_ends = run_array.run_ends();
727 if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
728 return Ok(run_array);
729 }
730
731 let start_physical_index = run_ends.get_start_physical_index();
733
734 let end_physical_index = run_ends.get_end_physical_index();
736
737 let physical_length = end_physical_index - start_physical_index + 1;
738
739 let offset = R::Native::usize_as(run_ends.offset());
741 let mut builder = BufferBuilder::<R::Native>::new(physical_length);
742 for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
743 builder.append(run_end_value.sub_wrapping(offset));
744 }
745 builder.append(R::Native::from_usize(run_array.len()).unwrap());
746 let new_run_ends = unsafe {
747 ArrayDataBuilder::new(R::DATA_TYPE)
750 .len(physical_length)
751 .add_buffer(builder.finish())
752 .build_unchecked()
753 };
754
755 let new_values = run_array
757 .values()
758 .slice(start_physical_index, physical_length)
759 .into_data();
760
761 let builder = ArrayDataBuilder::new(run_array.data_type().clone())
762 .len(run_array.len())
763 .add_child_data(new_run_ends)
764 .add_child_data(new_values);
    // SAFETY: the builder is populated with run ends and values re-sliced consistently
    // from a valid `RunArray` of the same logical length.
    let array_data = unsafe { builder.build_unchecked() };
770 Ok(array_data.into())
771}
772
/// Controls how the writer behaves when the values of a dictionary-encoded column change
/// between record batches.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DictionaryHandling {
    /// Re-send the full dictionary whenever its values change (the default).
    #[default]
    Resend,
    /// If the new dictionary merely appends values to the previous one, send only the
    /// appended values as a delta dictionary batch; otherwise re-send the full dictionary.
    Delta,
}
786
/// Describes what changed for a tracked dictionary after a call to
/// [`DictionaryTracker::insert_column`].
#[derive(Debug, Clone)]
pub enum DictionaryUpdate {
    /// The dictionary was already tracked and its values are unchanged.
    None,
    /// The dictionary was not previously tracked; the full values must be written.
    New,
    /// The dictionary values were replaced; the full new values must be written.
    Replaced,
    /// The dictionary grew; contains only the newly appended values.
    Delta(ArrayData),
}
800
/// Keeps track of dictionaries that have been written, to avoid re-emitting the same
/// dictionary and to detect deltas and replacements between batches.
#[derive(Debug)]
pub struct DictionaryTracker {
    /// The most recently written dictionary data, keyed by dictionary id.
    written: HashMap<i64, ArrayData>,
    /// Dictionary ids handed out so far, in order.
    dict_ids: Vec<i64>,
    /// If true, error instead of replacing an already written dictionary.
    error_on_replacement: bool,
}
812
813impl DictionaryTracker {
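    /// Create a new [`DictionaryTracker`].
    ///
    /// If `error_on_replacement` is true, attempting to replace an already written
    /// dictionary returns an error, as the IPC file format only supports a single
    /// dictionary per field across all batches.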
814 pub fn new(error_on_replacement: bool) -> Self {
820 #[allow(deprecated)]
821 Self {
822 written: HashMap::new(),
823 dict_ids: Vec::new(),
824 error_on_replacement,
825 }
826 }
827
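    /// Record and return the next dictionary id (one greater than the largest id handed
    /// out so far, starting at 0).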
828 pub fn next_dict_id(&mut self) -> i64 {
830 let next = self
831 .dict_ids
832 .last()
833 .copied()
834 .map(|i| i + 1)
835 .unwrap_or_default();
836
837 self.dict_ids.push(next);
838 next
839 }
840
    /// Returns the dictionary ids that have been handed out by this tracker so far.
    pub fn dict_id(&mut self) -> &[i64] {
        &self.dict_ids
    }
846
847 #[deprecated(since = "56.1.0", note = "Use `insert_column` instead")]
857 pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
858 let dict_data = column.to_data();
859 let dict_values = &dict_data.child_data()[0];
860
861 if let Some(last) = self.written.get(&dict_id) {
863 if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
864 return Ok(false);
866 }
867 if self.error_on_replacement {
868 if last.child_data()[0] == *dict_values {
870 return Ok(false);
872 }
873 return Err(ArrowError::InvalidArgumentError(
874 "Dictionary replacement detected when writing IPC file format. \
875 Arrow IPC files only support a single dictionary for a given field \
876 across all batches."
877 .to_string(),
878 ));
879 }
880 }
881
882 self.written.insert(dict_id, dict_data);
883 Ok(true)
884 }
885
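    /// Track the dictionary values of `column` under `dict_id`, returning a
    /// [`DictionaryUpdate`] describing whether the values are new, unchanged, replaced,
    /// or a delta over the previously written values.
    ///
    /// Returns an error if the tracker was created with `error_on_replacement` and the
    /// values changed in a way that would require replacing an already written dictionary.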
886 pub fn insert_column(
902 &mut self,
903 dict_id: i64,
904 column: &ArrayRef,
905 dict_handling: DictionaryHandling,
906 ) -> Result<DictionaryUpdate, ArrowError> {
907 let new_data = column.to_data();
908 let new_values = &new_data.child_data()[0];
909
910 let Some(old) = self.written.get(&dict_id) else {
912 self.written.insert(dict_id, new_data);
913 return Ok(DictionaryUpdate::New);
914 };
915
916 let old_values = &old.child_data()[0];
919 if ArrayData::ptr_eq(old_values, new_values) {
920 return Ok(DictionaryUpdate::None);
921 }
922
923 let comparison = compare_dictionaries(old_values, new_values);
925 if matches!(comparison, DictionaryComparison::Equal) {
926 return Ok(DictionaryUpdate::None);
927 }
928
929 const REPLACEMENT_ERROR: &str = "Dictionary replacement detected when writing IPC file format. \
930 Arrow IPC files only support a single dictionary for a given field \
931 across all batches.";
932
933 match comparison {
934 DictionaryComparison::NotEqual => {
935 if self.error_on_replacement {
936 return Err(ArrowError::InvalidArgumentError(
937 REPLACEMENT_ERROR.to_string(),
938 ));
939 }
940
941 self.written.insert(dict_id, new_data);
942 Ok(DictionaryUpdate::Replaced)
943 }
944 DictionaryComparison::Delta => match dict_handling {
945 DictionaryHandling::Resend => {
946 if self.error_on_replacement {
947 return Err(ArrowError::InvalidArgumentError(
948 REPLACEMENT_ERROR.to_string(),
949 ));
950 }
951
952 self.written.insert(dict_id, new_data);
953 Ok(DictionaryUpdate::Replaced)
954 }
955 DictionaryHandling::Delta => {
956 let delta =
957 new_values.slice(old_values.len(), new_values.len() - old_values.len());
958 self.written.insert(dict_id, new_data);
959 Ok(DictionaryUpdate::Delta(delta))
960 }
961 },
962 DictionaryComparison::Equal => unreachable!("Already checked equal case"),
963 }
964 }
965}
966
/// The result of comparing a previously written dictionary with its new values.
#[derive(Debug, Clone)]
enum DictionaryComparison {
    /// The values differ and the new dictionary is not an extension of the old one.
    NotEqual,
    /// The values are identical.
    Equal,
    /// The new dictionary starts with the old values and appends more.
    Delta,
}

/// Compare the previously written dictionary values (`old`) with the new values (`new`)
/// and classify the change.
fn compare_dictionaries(old: &ArrayData, new: &ArrayData) -> DictionaryComparison {
    let existing_len = old.len();
    let new_len = new.len();
    if existing_len == new_len {
        if *old == *new {
            return DictionaryComparison::Equal;
        } else {
            return DictionaryComparison::NotEqual;
        }
    }

    // A shorter dictionary can never be a delta of a longer one.
    if new_len < existing_len {
        return DictionaryComparison::NotEqual;
    }

    // The new dictionary is longer; it is a delta only if it starts with the old values.
    if new.slice(0, existing_len) == *old {
        return DictionaryComparison::Delta;
    }

    DictionaryComparison::NotEqual
}
1004
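/// Arrow File Writer: writes record batches in the Arrow IPC File format, including the
/// footer needed for random access.
///
/// A minimal usage sketch, mirroring the `serialize_file` helper in the tests below
/// (the `write_ipc_file` name is only illustrative):
///
/// ```ignore
/// use arrow_array::RecordBatch;
/// use arrow_ipc::writer::FileWriter;
///
/// fn write_ipc_file(batch: &RecordBatch) -> Vec<u8> {
///     let mut writer = FileWriter::try_new(vec![], batch.schema_ref()).unwrap();
///     writer.write(batch).unwrap();
///     writer.finish().unwrap();
///     writer.into_inner().unwrap()
/// }
/// ```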
pub struct FileWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// The schema of the file, written in the header and footer
    schema: SchemaRef,
    /// The current byte offset from the start of the file, where the next block will start
    block_offsets: usize,
    /// Dictionary blocks that will be written as part of the IPC footer
    dictionary_blocks: Vec<crate::Block>,
    /// Record blocks that will be written as part of the IPC footer
    record_blocks: Vec<crate::Block>,
    /// Whether the footer has been written, and the writer is finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,
    /// User-provided key/value metadata written into the footer
    custom_metadata: HashMap<String, String>,

    data_gen: IpcDataGenerator,
    /// Reusable state for compressing body buffers
    compression_context: CompressionContext,
}
1051
1052impl<W: Write> FileWriter<BufWriter<W>> {
1053 pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1057 Self::try_new(BufWriter::new(writer), schema)
1058 }
1059}
1060
1061impl<W: Write> FileWriter<W> {
1062 pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1070 let write_options = IpcWriteOptions::default();
1071 Self::try_new_with_options(writer, schema, write_options)
1072 }
1073
1074 pub fn try_new_with_options(
1082 mut writer: W,
1083 schema: &Schema,
1084 write_options: IpcWriteOptions,
1085 ) -> Result<Self, ArrowError> {
1086 let data_gen = IpcDataGenerator::default();
1087 let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
1089 let header_size = super::ARROW_MAGIC.len() + pad_len;
1090 writer.write_all(&super::ARROW_MAGIC)?;
1091 writer.write_all(&PADDING[..pad_len])?;
1092 let mut dictionary_tracker = DictionaryTracker::new(true);
1094 let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1095 schema,
1096 &mut dictionary_tracker,
1097 &write_options,
1098 );
1099 let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
1100 Ok(Self {
1101 writer,
1102 write_options,
1103 schema: Arc::new(schema.clone()),
1104 block_offsets: meta + data + header_size,
1105 dictionary_blocks: vec![],
1106 record_blocks: vec![],
1107 finished: false,
1108 dictionary_tracker,
1109 custom_metadata: HashMap::new(),
1110 data_gen,
1111 compression_context: CompressionContext::default(),
1112 })
1113 }
1114
1115 pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
1117 self.custom_metadata.insert(key.into(), value.into());
1118 }
1119
1120 pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1122 if self.finished {
1123 return Err(ArrowError::IpcError(
1124 "Cannot write record batch to file writer as it is closed".to_string(),
1125 ));
1126 }
1127
1128 let (encoded_dictionaries, encoded_message) = self.data_gen.encode(
1129 batch,
1130 &mut self.dictionary_tracker,
1131 &self.write_options,
1132 &mut self.compression_context,
1133 )?;
1134
1135 for encoded_dictionary in encoded_dictionaries {
1136 let (meta, data) =
1137 write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
1138
1139 let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
1140 self.dictionary_blocks.push(block);
1141 self.block_offsets += meta + data;
1142 }
1143
1144 let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?;
1145
        let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
1152 self.record_blocks.push(block);
1153 self.block_offsets += meta + data;
1154 Ok(())
1155 }
1156
1157 pub fn finish(&mut self) -> Result<(), ArrowError> {
1159 if self.finished {
1160 return Err(ArrowError::IpcError(
1161 "Cannot write footer to file writer as it is closed".to_string(),
1162 ));
1163 }
1164
1165 write_continuation(&mut self.writer, &self.write_options, 0)?;
1167
1168 let mut fbb = FlatBufferBuilder::new();
1169 let dictionaries = fbb.create_vector(&self.dictionary_blocks);
1170 let record_batches = fbb.create_vector(&self.record_blocks);
1171 let mut dictionary_tracker = DictionaryTracker::new(true);
1172 let schema = IpcSchemaEncoder::new()
1173 .with_dictionary_tracker(&mut dictionary_tracker)
1174 .schema_to_fb_offset(&mut fbb, &self.schema);
1175 let fb_custom_metadata = (!self.custom_metadata.is_empty())
1176 .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));
1177
1178 let root = {
1179 let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
1180 footer_builder.add_version(self.write_options.metadata_version);
1181 footer_builder.add_schema(schema);
1182 footer_builder.add_dictionaries(dictionaries);
1183 footer_builder.add_recordBatches(record_batches);
1184 if let Some(fb_custom_metadata) = fb_custom_metadata {
1185 footer_builder.add_custom_metadata(fb_custom_metadata);
1186 }
1187 footer_builder.finish()
1188 };
1189 fbb.finish(root, None);
1190 let footer_data = fbb.finished_data();
1191 self.writer.write_all(footer_data)?;
1192 self.writer
1193 .write_all(&(footer_data.len() as i32).to_le_bytes())?;
1194 self.writer.write_all(&super::ARROW_MAGIC)?;
1195 self.writer.flush()?;
1196 self.finished = true;
1197
1198 Ok(())
1199 }
1200
1201 pub fn schema(&self) -> &SchemaRef {
1203 &self.schema
1204 }
1205
1206 pub fn get_ref(&self) -> &W {
1208 &self.writer
1209 }
1210
1211 pub fn get_mut(&mut self) -> &mut W {
1215 &mut self.writer
1216 }
1217
1218 pub fn flush(&mut self) -> Result<(), ArrowError> {
1222 self.writer.flush()?;
1223 Ok(())
1224 }
1225
1226 pub fn into_inner(mut self) -> Result<W, ArrowError> {
1235 if !self.finished {
1236 self.finish()?;
1238 }
1239 Ok(self.writer)
1240 }
1241}
1242
1243impl<W: Write> RecordBatchWriter for FileWriter<W> {
1244 fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1245 self.write(batch)
1246 }
1247
1248 fn close(mut self) -> Result<(), ArrowError> {
1249 self.finish()
1250 }
1251}
1252
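/// Arrow Stream Writer: writes record batches in the Arrow IPC Streaming format, ending
/// with an end-of-stream marker when finished.
///
/// A minimal usage sketch, mirroring the `serialize_stream` helper in the tests below
/// (the `write_ipc_stream` name is only illustrative):
///
/// ```ignore
/// use arrow_array::RecordBatch;
/// use arrow_ipc::writer::StreamWriter;
///
/// fn write_ipc_stream(batch: &RecordBatch) -> Vec<u8> {
///     let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).unwrap();
///     writer.write(batch).unwrap();
///     writer.finish().unwrap();
///     writer.into_inner().unwrap()
/// }
/// ```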
pub struct StreamWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// Whether the stream has been finished (the end-of-stream marker written)
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,

    data_gen: IpcDataGenerator,
    /// Reusable state for compressing body buffers
    compression_context: CompressionContext,
}
1340
1341impl<W: Write> StreamWriter<BufWriter<W>> {
1342 pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1346 Self::try_new(BufWriter::new(writer), schema)
1347 }
1348}
1349
1350impl<W: Write> StreamWriter<W> {
1351 pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
1359 let write_options = IpcWriteOptions::default();
1360 Self::try_new_with_options(writer, schema, write_options)
1361 }
1362
1363 pub fn try_new_with_options(
1369 mut writer: W,
1370 schema: &Schema,
1371 write_options: IpcWriteOptions,
1372 ) -> Result<Self, ArrowError> {
1373 let data_gen = IpcDataGenerator::default();
1374 let mut dictionary_tracker = DictionaryTracker::new(false);
1375
1376 let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
1378 schema,
1379 &mut dictionary_tracker,
1380 &write_options,
1381 );
1382 write_message(&mut writer, encoded_message, &write_options)?;
1383 Ok(Self {
1384 writer,
1385 write_options,
1386 finished: false,
1387 dictionary_tracker,
1388 data_gen,
1389 compression_context: CompressionContext::default(),
1390 })
1391 }
1392
1393 pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1395 if self.finished {
1396 return Err(ArrowError::IpcError(
1397 "Cannot write record batch to stream writer as it is closed".to_string(),
1398 ));
1399 }
1400
1401 let (encoded_dictionaries, encoded_message) = self
1402 .data_gen
1403 .encode(
1404 batch,
1405 &mut self.dictionary_tracker,
1406 &self.write_options,
1407 &mut self.compression_context,
1408 )
1409 .expect("StreamWriter is configured to not error on dictionary replacement");
1410
1411 for encoded_dictionary in encoded_dictionaries {
1412 write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
1413 }
1414
1415 write_message(&mut self.writer, encoded_message, &self.write_options)?;
1416 Ok(())
1417 }
1418
1419 pub fn finish(&mut self) -> Result<(), ArrowError> {
1421 if self.finished {
1422 return Err(ArrowError::IpcError(
1423 "Cannot write footer to stream writer as it is closed".to_string(),
1424 ));
1425 }
1426
1427 write_continuation(&mut self.writer, &self.write_options, 0)?;
1428
1429 self.finished = true;
1430
1431 Ok(())
1432 }
1433
1434 pub fn get_ref(&self) -> &W {
1436 &self.writer
1437 }
1438
1439 pub fn get_mut(&mut self) -> &mut W {
1443 &mut self.writer
1444 }
1445
1446 pub fn flush(&mut self) -> Result<(), ArrowError> {
1450 self.writer.flush()?;
1451 Ok(())
1452 }
1453
1454 pub fn into_inner(mut self) -> Result<W, ArrowError> {
1492 if !self.finished {
1493 self.finish()?;
1495 }
1496 Ok(self.writer)
1497 }
1498}
1499
1500impl<W: Write> RecordBatchWriter for StreamWriter<W> {
1501 fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
1502 self.write(batch)
1503 }
1504
1505 fn close(mut self) -> Result<(), ArrowError> {
1506 self.finish()
1507 }
1508}
1509
/// An encoded IPC message: flatbuffer metadata plus the Arrow buffer body written after it.
pub struct EncodedData {
    /// The encoded flatbuffer `Message` metadata
    pub ipc_message: Vec<u8>,
    /// Arrow buffers to be written; empty for schema messages
    pub arrow_data: Vec<u8>,
}
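
/// Write an [`EncodedData`] message: the length-prefixed flatbuffer metadata followed by the
/// body buffers, returning the number of metadata and body bytes written (both padded to the
/// configured alignment).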
pub fn write_message<W: Write>(
    mut writer: W,
    encoded: EncodedData,
    write_options: &IpcWriteOptions,
) -> Result<(usize, usize), ArrowError> {
1523 let arrow_data_len = encoded.arrow_data.len();
1524 if arrow_data_len % usize::from(write_options.alignment) != 0 {
1525 return Err(ArrowError::MemoryError(
1526 "Arrow data not aligned".to_string(),
1527 ));
1528 }
1529
1530 let a = usize::from(write_options.alignment - 1);
1531 let buffer = encoded.ipc_message;
1532 let flatbuf_size = buffer.len();
1533 let prefix_size = if write_options.write_legacy_ipc_format {
1534 4
1535 } else {
1536 8
1537 };
1538 let aligned_size = (flatbuf_size + prefix_size + a) & !a;
1539 let padding_bytes = aligned_size - flatbuf_size - prefix_size;
1540
1541 write_continuation(
1542 &mut writer,
1543 write_options,
1544 (aligned_size - prefix_size) as i32,
1545 )?;
1546
1547 if flatbuf_size > 0 {
1549 writer.write_all(&buffer)?;
1550 }
1551 writer.write_all(&PADDING[..padding_bytes])?;
1553
1554 let body_len = if arrow_data_len > 0 {
1556 write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
1557 } else {
1558 0
1559 };
1560
1561 Ok((aligned_size, body_len))
1562}
1563
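/// Write the message body `data` followed by zero padding up to `alignment`, returning the
/// total number of bytes written.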
1564fn write_body_buffers<W: Write>(
1565 mut writer: W,
1566 data: &[u8],
1567 alignment: u8,
1568) -> Result<usize, ArrowError> {
1569 let len = data.len();
1570 let pad_len = pad_to_alignment(alignment, len);
1571 let total_len = len + pad_len;
1572
1573 writer.write_all(data)?;
1575 if pad_len > 0 {
1576 writer.write_all(&PADDING[..pad_len])?;
1577 }
1578
1579 writer.flush()?;
1580 Ok(total_len)
1581}
1582
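/// Write an IPC continuation marker (unless writing the legacy V4 format) followed by
/// `total_len` as a little-endian `i32`.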
1583fn write_continuation<W: Write>(
1586 mut writer: W,
1587 write_options: &IpcWriteOptions,
1588 total_len: i32,
1589) -> Result<usize, ArrowError> {
1590 let mut written = 8;
1591
1592 match write_options.metadata_version {
1594 crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
1595 unreachable!("Options with the metadata version cannot be created")
1596 }
1597 crate::MetadataVersion::V4 => {
1598 if !write_options.write_legacy_ipc_format {
1599 writer.write_all(&CONTINUATION_MARKER)?;
1601 written = 4;
1602 }
1603 writer.write_all(&total_len.to_le_bytes()[..])?;
1604 }
1605 crate::MetadataVersion::V5 => {
1606 writer.write_all(&CONTINUATION_MARKER)?;
1608 writer.write_all(&total_len.to_le_bytes()[..])?;
1609 }
1610 z => panic!("Unsupported crate::MetadataVersion {z:?}"),
1611 };
1612
1613 writer.flush()?;
1614
1615 Ok(written)
1616}
1617
/// Whether the IPC format writes a validity bitmap for arrays of this data type.
///
/// With metadata V5 and later, Null, Union, and RunEndEncoded arrays have no validity
/// buffer; before V5 only Null arrays omit it.
fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
    if write_options.metadata_version < crate::MetadataVersion::V5 {
        !matches!(data_type, DataType::Null)
    } else {
        !matches!(
            data_type,
            DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
        )
    }
}
1631
1632#[inline]
1634fn buffer_need_truncate(
1635 array_offset: usize,
1636 buffer: &Buffer,
1637 spec: &BufferSpec,
1638 min_length: usize,
1639) -> bool {
1640 spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
1641}
1642
1643#[inline]
1645fn get_buffer_element_width(spec: &BufferSpec) -> usize {
1646 match spec {
1647 BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
1648 _ => 0,
1649 }
1650}
1651
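/// Rebase the offsets of a sliced variable-length array so the first offset is zero,
/// returning the new offsets buffer along with the start position and length of the
/// referenced values.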
1652fn reencode_offsets<O: OffsetSizeTrait>(
1655 offsets: &Buffer,
1656 data: &ArrayData,
1657) -> (Buffer, usize, usize) {
1658 let offsets_slice: &[O] = offsets.typed_data::<O>();
1659 let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];
1660
1661 let start_offset = offset_slice.first().unwrap();
1662 let end_offset = offset_slice.last().unwrap();
1663
1664 let offsets = match start_offset.as_usize() {
1665 0 => {
1666 let size = size_of::<O>();
1667 offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
1668 }
1669 _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
1670 };
1671
1672 let start_offset = start_offset.as_usize();
1673 let end_offset = end_offset.as_usize();
1674
1675 (offsets, start_offset, end_offset - start_offset)
1676}
1677
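/// Returns the offsets and values buffers for a `Binary`/`Utf8` array, truncated to the
/// array's slice so that no unreferenced bytes are written.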
1678fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
1684 if data.is_empty() {
1685 return (MutableBuffer::new(0).into(), MutableBuffer::new(0).into());
1686 }
1687
1688 let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1689 let values = data.buffers()[1].slice_with_length(original_start_offset, len);
1690 (offsets, values)
1691}
1692
1693fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
1696 if data.is_empty() {
1697 return (
1698 MutableBuffer::new(0).into(),
1699 data.child_data()[0].slice(0, 0),
1700 );
1701 }
1702
1703 let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
1704 let child_data = data.child_data()[0].slice(original_start_offset, len);
1705 (offsets, child_data)
1706}
1707
1708fn get_list_view_array_buffers<O: OffsetSizeTrait>(
1714 data: &ArrayData,
1715) -> (Buffer, Buffer, ArrayData) {
1716 if data.is_empty() {
1717 return (
1718 MutableBuffer::new(0).into(),
1719 MutableBuffer::new(0).into(),
1720 data.child_data()[0].slice(0, 0),
1721 );
1722 }
1723
1724 let offsets = &data.buffers()[0];
1725 let sizes = &data.buffers()[1];
1726
1727 let element_size = std::mem::size_of::<O>();
1728 let offsets_slice =
1729 offsets.slice_with_length(data.offset() * element_size, data.len() * element_size);
1730 let sizes_slice =
1731 sizes.slice_with_length(data.offset() * element_size, data.len() * element_size);
1732
1733 let child_data = data.child_data()[0].clone();
1734
1735 (offsets_slice, sizes_slice, child_data)
1736}
1737
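/// Returns the first buffer of a fixed-width array, sliced to the rows covered by the
/// array's offset and length when truncation is required.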
1738fn get_or_truncate_buffer(array_data: &ArrayData) -> &[u8] {
1745 let buffer = &array_data.buffers()[0];
1746 let layout = layout(array_data.data_type());
1747 let spec = &layout.buffers[0];
1748
1749 let byte_width = get_buffer_element_width(spec);
1750 let min_length = array_data.len() * byte_width;
1751 if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
1752 let byte_offset = array_data.offset() * byte_width;
1753 let buffer_length = min(min_length, buffer.len() - byte_offset);
1754 &buffer.as_slice()[byte_offset..(byte_offset + buffer_length)]
1755 } else {
1756 buffer.as_slice()
1757 }
1758}
1759
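/// Write an array's field node, validity bitmap, and data buffers into `buffers` /
/// `arrow_data`, recursing into child arrays, and return the updated body offset.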
1760#[allow(clippy::too_many_arguments)]
1762fn write_array_data(
1763 array_data: &ArrayData,
1764 buffers: &mut Vec<crate::Buffer>,
1765 arrow_data: &mut Vec<u8>,
1766 nodes: &mut Vec<crate::FieldNode>,
1767 offset: i64,
1768 num_rows: usize,
1769 null_count: usize,
1770 compression_codec: Option<CompressionCodec>,
1771 compression_context: &mut CompressionContext,
1772 write_options: &IpcWriteOptions,
1773) -> Result<i64, ArrowError> {
1774 let mut offset = offset;
1775 if !matches!(array_data.data_type(), DataType::Null) {
1776 nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
1777 } else {
1778 nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
1781 }
1782 if has_validity_bitmap(array_data.data_type(), write_options) {
1783 let null_buffer = match array_data.nulls() {
1785 None => {
1786 let num_bytes = bit_util::ceil(num_rows, 8);
1788 let buffer = MutableBuffer::new(num_bytes);
1789 let buffer = buffer.with_bitset(num_bytes, true);
1790 buffer.into()
1791 }
1792 Some(buffer) => buffer.inner().sliced(),
1793 };
1794
1795 offset = write_buffer(
1796 null_buffer.as_slice(),
1797 buffers,
1798 arrow_data,
1799 offset,
1800 compression_codec,
1801 compression_context,
1802 write_options.alignment,
1803 )?;
1804 }
1805
1806 let data_type = array_data.data_type();
1807 if matches!(data_type, DataType::Binary | DataType::Utf8) {
1808 let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
1809 for buffer in [offsets, values] {
1810 offset = write_buffer(
1811 buffer.as_slice(),
1812 buffers,
1813 arrow_data,
1814 offset,
1815 compression_codec,
1816 compression_context,
1817 write_options.alignment,
1818 )?;
1819 }
1820 } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
1821 let views = get_or_truncate_buffer(array_data);
1828 offset = write_buffer(
1829 views,
1830 buffers,
1831 arrow_data,
1832 offset,
1833 compression_codec,
1834 compression_context,
1835 write_options.alignment,
1836 )?;
1837
1838 for buffer in array_data.buffers().iter().skip(1) {
1839 offset = write_buffer(
1840 buffer.as_slice(),
1841 buffers,
1842 arrow_data,
1843 offset,
1844 compression_codec,
1845 compression_context,
1846 write_options.alignment,
1847 )?;
1848 }
1849 } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
1850 let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
1851 for buffer in [offsets, values] {
1852 offset = write_buffer(
1853 buffer.as_slice(),
1854 buffers,
1855 arrow_data,
1856 offset,
1857 compression_codec,
1858 compression_context,
1859 write_options.alignment,
1860 )?;
1861 }
1862 } else if DataType::is_numeric(data_type)
1863 || DataType::is_temporal(data_type)
1864 || matches!(
1865 array_data.data_type(),
1866 DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
1867 )
1868 {
1869 assert_eq!(array_data.buffers().len(), 1);
1871
1872 let buffer = get_or_truncate_buffer(array_data);
1873 offset = write_buffer(
1874 buffer,
1875 buffers,
1876 arrow_data,
1877 offset,
1878 compression_codec,
1879 compression_context,
1880 write_options.alignment,
1881 )?;
1882 } else if matches!(data_type, DataType::Boolean) {
1883 assert_eq!(array_data.buffers().len(), 1);
1886
1887 let buffer = &array_data.buffers()[0];
1888 let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
1889 offset = write_buffer(
1890 &buffer,
1891 buffers,
1892 arrow_data,
1893 offset,
1894 compression_codec,
1895 compression_context,
1896 write_options.alignment,
1897 )?;
1898 } else if matches!(
1899 data_type,
1900 DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
1901 ) {
1902 assert_eq!(array_data.buffers().len(), 1);
1903 assert_eq!(array_data.child_data().len(), 1);
1904
1905 let (offsets, sliced_child_data) = match data_type {
1907 DataType::List(_) => get_list_array_buffers::<i32>(array_data),
1908 DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
1909 DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
1910 _ => unreachable!(),
1911 };
1912 offset = write_buffer(
1913 offsets.as_slice(),
1914 buffers,
1915 arrow_data,
1916 offset,
1917 compression_codec,
1918 compression_context,
1919 write_options.alignment,
1920 )?;
1921 offset = write_array_data(
1922 &sliced_child_data,
1923 buffers,
1924 arrow_data,
1925 nodes,
1926 offset,
1927 sliced_child_data.len(),
1928 sliced_child_data.null_count(),
1929 compression_codec,
1930 compression_context,
1931 write_options,
1932 )?;
1933 return Ok(offset);
1934 } else if matches!(
1935 data_type,
1936 DataType::ListView(_) | DataType::LargeListView(_)
1937 ) {
        assert_eq!(array_data.buffers().len(), 2);
        assert_eq!(array_data.child_data().len(), 1);
1940
1941 let (offsets, sizes, child_data) = match data_type {
1942 DataType::ListView(_) => get_list_view_array_buffers::<i32>(array_data),
1943 DataType::LargeListView(_) => get_list_view_array_buffers::<i64>(array_data),
1944 _ => unreachable!(),
1945 };
1946
1947 offset = write_buffer(
1948 offsets.as_slice(),
1949 buffers,
1950 arrow_data,
1951 offset,
1952 compression_codec,
1953 compression_context,
1954 write_options.alignment,
1955 )?;
1956
1957 offset = write_buffer(
1958 sizes.as_slice(),
1959 buffers,
1960 arrow_data,
1961 offset,
1962 compression_codec,
1963 compression_context,
1964 write_options.alignment,
1965 )?;
1966
1967 offset = write_array_data(
1968 &child_data,
1969 buffers,
1970 arrow_data,
1971 nodes,
1972 offset,
1973 child_data.len(),
1974 child_data.null_count(),
1975 compression_codec,
1976 compression_context,
1977 write_options,
1978 )?;
1979 return Ok(offset);
1980 } else if let DataType::FixedSizeList(_, fixed_size) = data_type {
1981 assert_eq!(array_data.child_data().len(), 1);
1982 let fixed_size = *fixed_size as usize;
1983
1984 let child_offset = array_data.offset() * fixed_size;
1985 let child_length = array_data.len() * fixed_size;
1986 let child_data = array_data.child_data()[0].slice(child_offset, child_length);
1987
1988 offset = write_array_data(
1989 &child_data,
1990 buffers,
1991 arrow_data,
1992 nodes,
1993 offset,
1994 child_data.len(),
1995 child_data.null_count(),
1996 compression_codec,
1997 compression_context,
1998 write_options,
1999 )?;
2000 return Ok(offset);
2001 } else {
2002 for buffer in array_data.buffers() {
2003 offset = write_buffer(
2004 buffer,
2005 buffers,
2006 arrow_data,
2007 offset,
2008 compression_codec,
2009 compression_context,
2010 write_options.alignment,
2011 )?;
2012 }
2013 }
2014
2015 match array_data.data_type() {
2016 DataType::Dictionary(_, _) => {}
2017 DataType::RunEndEncoded(_, _) => {
2018 let arr = unslice_run_array(array_data.clone())?;
2020 for data_ref in arr.child_data() {
2022 offset = write_array_data(
2024 data_ref,
2025 buffers,
2026 arrow_data,
2027 nodes,
2028 offset,
2029 data_ref.len(),
2030 data_ref.null_count(),
2031 compression_codec,
2032 compression_context,
2033 write_options,
2034 )?;
2035 }
2036 }
2037 _ => {
2038 for data_ref in array_data.child_data() {
2040 offset = write_array_data(
2042 data_ref,
2043 buffers,
2044 arrow_data,
2045 nodes,
2046 offset,
2047 data_ref.len(),
2048 data_ref.null_count(),
2049 compression_codec,
2050 compression_context,
2051 write_options,
2052 )?;
2053 }
2054 }
2055 }
2056 Ok(offset)
2057}
2058
/// Write a buffer into `arrow_data`, compressing it if a codec is provided, and record its
/// offset and (possibly compressed) length in `buffers`. Returns the offset after padding.
fn write_buffer(
    buffer: &[u8],
    buffers: &mut Vec<crate::Buffer>,
    arrow_data: &mut Vec<u8>,
    offset: i64,
    compression_codec: Option<CompressionCodec>,
    compression_context: &mut CompressionContext,
    alignment: u8,
) -> Result<i64, ArrowError> {
    let len: i64 = match compression_codec {
        Some(compressor) => compressor.compress_to_vec(buffer, arrow_data, compression_context)?,
        None => {
            arrow_data.extend_from_slice(buffer);
            buffer.len()
        }
    }
    .try_into()
    .map_err(|e| {
        ArrowError::InvalidArgumentError(format!("Could not convert compressed size to i64: {e}"))
    })?;

    // Record the buffer's position and length, then pad the body to the alignment.
    buffers.push(crate::Buffer::new(offset, len));
    let pad_len = pad_to_alignment(alignment, len as usize);
    arrow_data.extend_from_slice(&PADDING[..pad_len]);

    Ok(offset + len + (pad_len as i64))
}
2100
/// A buffer of zeros used to write alignment padding.
const PADDING: [u8; 64] = [0; 64];

/// Calculate the number of bytes needed to pad `len` to a multiple of `alignment`
/// (which must be a power of two).
#[inline]
fn pad_to_alignment(alignment: u8, len: usize) -> usize {
    let a = usize::from(alignment - 1);
    ((len + a) & !a) - len
}
2109
2110#[cfg(test)]
2111mod tests {
2112 use std::hash::Hasher;
2113 use std::io::Cursor;
2114 use std::io::Seek;
2115
2116 use arrow_array::builder::FixedSizeListBuilder;
2117 use arrow_array::builder::Float32Builder;
2118 use arrow_array::builder::Int64Builder;
2119 use arrow_array::builder::MapBuilder;
2120 use arrow_array::builder::StringViewBuilder;
2121 use arrow_array::builder::UnionBuilder;
2122 use arrow_array::builder::{
2123 GenericListBuilder, GenericListViewBuilder, ListBuilder, StringBuilder,
2124 };
2125 use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
2126 use arrow_array::types::*;
2127 use arrow_buffer::ScalarBuffer;
2128
2129 use crate::MetadataVersion;
2130 use crate::convert::fb_to_schema;
2131 use crate::reader::*;
2132 use crate::root_as_footer;
2133
2134 use super::*;
2135
2136 fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
2137 let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
2138 writer.write(rb).unwrap();
2139 writer.finish().unwrap();
2140 writer.into_inner().unwrap()
2141 }
2142
2143 fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
2144 let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
2145 reader.next().unwrap().unwrap()
2146 }
2147
2148 fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
2149 const IPC_ALIGNMENT: usize = 8;
2153
2154 let mut stream_writer = StreamWriter::try_new_with_options(
2155 vec![],
2156 record.schema_ref(),
2157 IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
2158 )
2159 .unwrap();
2160 stream_writer.write(record).unwrap();
2161 stream_writer.finish().unwrap();
2162 stream_writer.into_inner().unwrap()
2163 }
2164
2165 fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
2166 let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
2167 stream_reader.next().unwrap().unwrap()
2168 }
2169
2170 #[test]
2171 #[cfg(feature = "lz4")]
2172 fn test_write_empty_record_batch_lz4_compression() {
2173 let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2174 let values: Vec<Option<i32>> = vec![];
2175 let array = Int32Array::from(values);
2176 let record_batch =
2177 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2178
2179 let mut file = tempfile::tempfile().unwrap();
2180
2181 {
2182 let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2183 .unwrap()
2184 .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2185 .unwrap();
2186
2187 let mut writer =
2188 FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2189 writer.write(&record_batch).unwrap();
2190 writer.finish().unwrap();
2191 }
2192 file.rewind().unwrap();
2193 {
2194 let reader = FileReader::try_new(file, None).unwrap();
2196 for read_batch in reader {
2197 read_batch
2198 .unwrap()
2199 .columns()
2200 .iter()
2201 .zip(record_batch.columns())
2202 .for_each(|(a, b)| {
2203 assert_eq!(a.data_type(), b.data_type());
2204 assert_eq!(a.len(), b.len());
2205 assert_eq!(a.null_count(), b.null_count());
2206 });
2207 }
2208 }
2209 }
2210
2211 #[test]
2212 #[cfg(feature = "lz4")]
2213 fn test_write_file_with_lz4_compression() {
2214 let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2215 let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2216 let array = Int32Array::from(values);
2217 let record_batch =
2218 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2219
2220 let mut file = tempfile::tempfile().unwrap();
2221 {
2222 let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2223 .unwrap()
2224 .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
2225 .unwrap();
2226
2227 let mut writer =
2228 FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2229 writer.write(&record_batch).unwrap();
2230 writer.finish().unwrap();
2231 }
2232 file.rewind().unwrap();
2233 {
2234 let reader = FileReader::try_new(file, None).unwrap();
2236 for read_batch in reader {
2237 read_batch
2238 .unwrap()
2239 .columns()
2240 .iter()
2241 .zip(record_batch.columns())
2242 .for_each(|(a, b)| {
2243 assert_eq!(a.data_type(), b.data_type());
2244 assert_eq!(a.len(), b.len());
2245 assert_eq!(a.null_count(), b.null_count());
2246 });
2247 }
2248 }
2249 }
2250
2251 #[test]
2252 #[cfg(feature = "zstd")]
2253 fn test_write_file_with_zstd_compression() {
2254 let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
2255 let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
2256 let array = Int32Array::from(values);
2257 let record_batch =
2258 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
2259 let mut file = tempfile::tempfile().unwrap();
2260 {
2261 let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
2262 .unwrap()
2263 .try_with_compression(Some(crate::CompressionType::ZSTD))
2264 .unwrap();
2265
2266 let mut writer =
2267 FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
2268 writer.write(&record_batch).unwrap();
2269 writer.finish().unwrap();
2270 }
2271 file.rewind().unwrap();
2272 {
2273 let reader = FileReader::try_new(file, None).unwrap();
2275 for read_batch in reader {
2276 read_batch
2277 .unwrap()
2278 .columns()
2279 .iter()
2280 .zip(record_batch.columns())
2281 .for_each(|(a, b)| {
2282 assert_eq!(a.data_type(), b.data_type());
2283 assert_eq!(a.len(), b.len());
2284 assert_eq!(a.null_count(), b.null_count());
2285 });
2286 }
2287 }
2288 }
2289
2290 #[test]
2291 fn test_write_file() {
2292 let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
2293 let values: Vec<Option<u32>> = vec![
2294 Some(999),
2295 None,
2296 Some(235),
2297 Some(123),
2298 None,
2299 None,
2300 None,
2301 None,
2302 None,
2303 ];
2304 let array1 = UInt32Array::from(values);
2305 let batch =
2306 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
2307 .unwrap();
2308 let mut file = tempfile::tempfile().unwrap();
2309 {
2310 let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2311
2312 writer.write(&batch).unwrap();
2313 writer.finish().unwrap();
2314 }
2315 file.rewind().unwrap();
2316
2317 {
2318 let mut reader = FileReader::try_new(file, None).unwrap();
2319 while let Some(Ok(read_batch)) = reader.next() {
2320 read_batch
2321 .columns()
2322 .iter()
2323 .zip(batch.columns())
2324 .for_each(|(a, b)| {
2325 assert_eq!(a.data_type(), b.data_type());
2326 assert_eq!(a.len(), b.len());
2327 assert_eq!(a.null_count(), b.null_count());
2328 });
2329 }
2330 }
2331 }
2332
2333 fn write_null_file(options: IpcWriteOptions) {
2334 let schema = Schema::new(vec![
2335 Field::new("nulls", DataType::Null, true),
2336 Field::new("int32s", DataType::Int32, false),
2337 Field::new("nulls2", DataType::Null, true),
2338 Field::new("f64s", DataType::Float64, false),
2339 ]);
2340 let array1 = NullArray::new(32);
2341 let array2 = Int32Array::from(vec![1; 32]);
2342 let array3 = NullArray::new(32);
2343 let array4 = Float64Array::from(vec![f64::NAN; 32]);
2344 let batch = RecordBatch::try_new(
2345 Arc::new(schema.clone()),
2346 vec![
2347 Arc::new(array1) as ArrayRef,
2348 Arc::new(array2) as ArrayRef,
2349 Arc::new(array3) as ArrayRef,
2350 Arc::new(array4) as ArrayRef,
2351 ],
2352 )
2353 .unwrap();
2354 let mut file = tempfile::tempfile().unwrap();
2355 {
2356 let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2357
2358 writer.write(&batch).unwrap();
2359 writer.finish().unwrap();
2360 }
2361
2362 file.rewind().unwrap();
2363
2364 {
2365 let reader = FileReader::try_new(file, None).unwrap();
2366 reader.for_each(|maybe_batch| {
2367 maybe_batch
2368 .unwrap()
2369 .columns()
2370 .iter()
2371 .zip(batch.columns())
2372 .for_each(|(a, b)| {
2373 assert_eq!(a.data_type(), b.data_type());
2374 assert_eq!(a.len(), b.len());
2375 assert_eq!(a.null_count(), b.null_count());
2376 });
2377 });
2378 }
2379 }
2380 #[test]
2381 fn test_write_null_file_v4() {
2382 write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2383 write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
2384 write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap());
2385 write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
2386 }
2387
2388 #[test]
2389 fn test_write_null_file_v5() {
2390 write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2391 write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap());
2392 }
2393
2394 #[test]
2395 fn track_union_nested_dict() {
2396 let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2397
2398 let array = Arc::new(inner) as ArrayRef;
2399
2400 #[allow(deprecated)]
2402 let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 0, false);
2403 let union_fields = [(0, Arc::new(dctfield))].into_iter().collect();
2404
2405 let types = [0, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
2406 let offsets = [0, 1, 2].into_iter().collect::<ScalarBuffer<i32>>();
2407
2408 let union = UnionArray::try_new(union_fields, types, Some(offsets), vec![array]).unwrap();
2409
2410 let schema = Arc::new(Schema::new(vec![Field::new(
2411 "union",
2412 union.data_type().clone(),
2413 false,
2414 )]));
2415
2416 let r#gen = IpcDataGenerator::default();
2417 let mut dict_tracker = DictionaryTracker::new(false);
2418 r#gen.schema_to_bytes_with_dictionary_tracker(
2419 &schema,
2420 &mut dict_tracker,
2421 &IpcWriteOptions::default(),
2422 );
2423
2424 let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();
2425
2426 r#gen
2427 .encode(
2428 &batch,
2429 &mut dict_tracker,
2430 &Default::default(),
2431 &mut Default::default(),
2432 )
2433 .unwrap();
2434
2435 assert!(dict_tracker.written.contains_key(&0));
2438 }
2439
2440 #[test]
2441 fn track_struct_nested_dict() {
2442 let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
2443
2444 let array = Arc::new(inner) as ArrayRef;
2445
2446 #[allow(deprecated)]
2448 let dctfield = Arc::new(Field::new_dict(
2449 "dict",
2450 array.data_type().clone(),
2451 false,
2452 2,
2453 false,
2454 ));
2455
2456 let s = StructArray::from(vec![(dctfield, array)]);
2457 let struct_array = Arc::new(s) as ArrayRef;
2458
2459 let schema = Arc::new(Schema::new(vec![Field::new(
2460 "struct",
2461 struct_array.data_type().clone(),
2462 false,
2463 )]));
2464
2465 let r#gen = IpcDataGenerator::default();
2466 let mut dict_tracker = DictionaryTracker::new(false);
2467 r#gen.schema_to_bytes_with_dictionary_tracker(
2468 &schema,
2469 &mut dict_tracker,
2470 &IpcWriteOptions::default(),
2471 );
2472
2473 let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();
2474
2475 r#gen
2476 .encode(
2477 &batch,
2478 &mut dict_tracker,
2479 &Default::default(),
2480 &mut Default::default(),
2481 )
2482 .unwrap();
2483
2484 assert!(dict_tracker.written.contains_key(&0));
2485 }
2486
2487 fn write_union_file(options: IpcWriteOptions) {
2488 let schema = Schema::new(vec![Field::new_union(
2489 "union",
2490 vec![0, 1],
2491 vec![
2492 Field::new("a", DataType::Int32, false),
2493 Field::new("c", DataType::Float64, false),
2494 ],
2495 UnionMode::Sparse,
2496 )]);
2497 let mut builder = UnionBuilder::with_capacity_sparse(5);
2498 builder.append::<Int32Type>("a", 1).unwrap();
2499 builder.append_null::<Int32Type>("a").unwrap();
2500 builder.append::<Float64Type>("c", 3.0).unwrap();
2501 builder.append_null::<Float64Type>("c").unwrap();
2502 builder.append::<Int32Type>("a", 4).unwrap();
2503 let union = builder.build().unwrap();
2504
2505 let batch =
2506 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
2507 .unwrap();
2508
2509 let mut file = tempfile::tempfile().unwrap();
2510 {
2511 let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
2512
2513 writer.write(&batch).unwrap();
2514 writer.finish().unwrap();
2515 }
2516 file.rewind().unwrap();
2517
2518 {
2519 let reader = FileReader::try_new(file, None).unwrap();
2520 reader.for_each(|maybe_batch| {
2521 maybe_batch
2522 .unwrap()
2523 .columns()
2524 .iter()
2525 .zip(batch.columns())
2526 .for_each(|(a, b)| {
2527 assert_eq!(a.data_type(), b.data_type());
2528 assert_eq!(a.len(), b.len());
2529 assert_eq!(a.null_count(), b.null_count());
2530 });
2531 });
2532 }
2533 }
2534
2535 #[test]
2536 fn test_write_union_file_v4_v5() {
2537 write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
2538 write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
2539 }
2540
2541 #[test]
2542 fn test_write_view_types() {
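        // Strings longer than 12 bytes cannot be inlined in a view and must be
        // stored in a separate data buffer, which is the case this test exercises.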
2543 const LONG_TEST_STRING: &str =
2544 "This is a long string to make sure binary view array handles it";
2545 let schema = Schema::new(vec![
2546 Field::new("field1", DataType::BinaryView, true),
2547 Field::new("field2", DataType::Utf8View, true),
2548 ]);
2549 let values: Vec<Option<&[u8]>> = vec![
2550 Some(b"foo"),
2551 Some(b"bar"),
2552 Some(LONG_TEST_STRING.as_bytes()),
2553 ];
2554 let binary_array = BinaryViewArray::from_iter(values);
2555 let utf8_array =
2556 StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
2557 let record_batch = RecordBatch::try_new(
2558 Arc::new(schema.clone()),
2559 vec![Arc::new(binary_array), Arc::new(utf8_array)],
2560 )
2561 .unwrap();
2562
2563 let mut file = tempfile::tempfile().unwrap();
2564 {
2565 let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
2566 writer.write(&record_batch).unwrap();
2567 writer.finish().unwrap();
2568 }
2569 file.rewind().unwrap();
2570 {
2571 let mut reader = FileReader::try_new(&file, None).unwrap();
2572 let read_batch = reader.next().unwrap().unwrap();
2573 read_batch
2574 .columns()
2575 .iter()
2576 .zip(record_batch.columns())
2577 .for_each(|(a, b)| {
2578 assert_eq!(a, b);
2579 });
2580 }
2581 file.rewind().unwrap();
2582 {
2583 let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
2584 let read_batch = reader.next().unwrap().unwrap();
2585 assert_eq!(read_batch.num_columns(), 1);
2586 let read_array = read_batch.column(0);
2587 let write_array = record_batch.column(0);
2588 assert_eq!(read_array, write_array);
2589 }
2590 }
2591
2592 #[test]
2593 fn truncate_ipc_record_batch() {
2594 fn create_batch(rows: usize) -> RecordBatch {
2595 let schema = Schema::new(vec![
2596 Field::new("a", DataType::Int32, false),
2597 Field::new("b", DataType::Utf8, false),
2598 ]);
2599
2600 let a = Int32Array::from_iter_values(0..rows as i32);
2601 let b = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));
2602
2603 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2604 }
2605
2606 let big_record_batch = create_batch(65536);
2607
2608 let length = 5;
2609 let small_record_batch = create_batch(length);
2610
2611 let offset = 2;
2612 let record_batch_slice = big_record_batch.slice(offset, length);
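        // Serializing the 5-row slice of the 65536-row batch must truncate the
        // buffers: its encoded size should match a batch built from only 5 rows.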
2613 assert!(
2614 serialize_stream(&big_record_batch).len() > serialize_stream(&small_record_batch).len()
2615 );
2616 assert_eq!(
2617 serialize_stream(&small_record_batch).len(),
2618 serialize_stream(&record_batch_slice).len()
2619 );
2620
2621 assert_eq!(
2622 deserialize_stream(serialize_stream(&record_batch_slice)),
2623 record_batch_slice
2624 );
2625 }
2626
2627 #[test]
2628 fn truncate_ipc_record_batch_with_nulls() {
2629 fn create_batch() -> RecordBatch {
2630 let schema = Schema::new(vec![
2631 Field::new("a", DataType::Int32, true),
2632 Field::new("b", DataType::Utf8, true),
2633 ]);
2634
2635 let a = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
2636 let b = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);
2637
2638 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2639 }
2640
2641 let record_batch = create_batch();
2642 let record_batch_slice = record_batch.slice(1, 2);
2643 let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2644
2645 assert!(
2646 serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2647 );
2648
2649 assert!(deserialized_batch.column(0).is_null(0));
2650 assert!(deserialized_batch.column(0).is_valid(1));
2651 assert!(deserialized_batch.column(1).is_valid(0));
2652 assert!(deserialized_batch.column(1).is_valid(1));
2653
2654 assert_eq!(record_batch_slice, deserialized_batch);
2655 }
2656
2657 #[test]
2658 fn truncate_ipc_dictionary_array() {
2659 fn create_batch() -> RecordBatch {
2660 let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
2661 .into_iter()
2662 .collect();
2663 let keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2664
2665 let array = DictionaryArray::new(keys, Arc::new(values));
2666
2667 let schema = Schema::new(vec![Field::new("dict", array.data_type().clone(), true)]);
2668
2669 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
2670 }
2671
2672 let record_batch = create_batch();
2673 let record_batch_slice = record_batch.slice(1, 2);
2674 let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2675
2676 assert!(
2677 serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2678 );
2679
2680 assert!(deserialized_batch.column(0).is_valid(0));
2681 assert!(deserialized_batch.column(0).is_null(1));
2682
2683 assert_eq!(record_batch_slice, deserialized_batch);
2684 }
2685
2686 #[test]
2687 fn truncate_ipc_struct_array() {
2688 fn create_batch() -> RecordBatch {
2689 let strings: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
2690 .into_iter()
2691 .collect();
2692 let ints: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2693
2694 let struct_array = StructArray::from(vec![
2695 (
2696 Arc::new(Field::new("s", DataType::Utf8, true)),
2697 Arc::new(strings) as ArrayRef,
2698 ),
2699 (
2700 Arc::new(Field::new("c", DataType::Int32, true)),
2701 Arc::new(ints) as ArrayRef,
2702 ),
2703 ]);
2704
2705 let schema = Schema::new(vec![Field::new(
2706 "struct_array",
2707 struct_array.data_type().clone(),
2708 true,
2709 )]);
2710
2711 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)]).unwrap()
2712 }
2713
2714 let record_batch = create_batch();
2715 let record_batch_slice = record_batch.slice(1, 2);
2716 let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2717
2718 assert!(
2719 serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2720 );
2721
2722 let structs = deserialized_batch
2723 .column(0)
2724 .as_any()
2725 .downcast_ref::<StructArray>()
2726 .unwrap();
2727
2728 assert!(structs.column(0).is_null(0));
2729 assert!(structs.column(0).is_valid(1));
2730 assert!(structs.column(1).is_valid(0));
2731 assert!(structs.column(1).is_null(1));
2732 assert_eq!(record_batch_slice, deserialized_batch);
2733 }
2734
2735 #[test]
2736 fn truncate_ipc_string_array_with_all_empty_string() {
2737 fn create_batch() -> RecordBatch {
2738 let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
2739 let a = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
2740 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap()
2741 }
2742
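        // Every value is an empty string, so truncation only affects the offsets
        // and validity buffers; the value buffer stays empty either way.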
2743 let record_batch = create_batch();
2744 let record_batch_slice = record_batch.slice(0, 1);
2745 let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2746
2747 assert!(
2748 serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2749 );
2750 assert_eq!(record_batch_slice, deserialized_batch);
2751 }
2752
2753 #[test]
2754 fn test_stream_writer_writes_array_slice() {
2755 let array = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
2756 assert_eq!(
2757 vec![Some(1), Some(2), Some(3)],
2758 array.iter().collect::<Vec<_>>()
2759 );
2760
2761 let sliced = array.slice(1, 2);
2762 assert_eq!(vec![Some(2), Some(3)], sliced.iter().collect::<Vec<_>>());
2763
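        // The column written below has a non-zero offset; after the round trip the
        // reader should only see the two sliced values.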
2764 let batch = RecordBatch::try_new(
2765 Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])),
2766 vec![Arc::new(sliced)],
2767 )
2768 .expect("new batch");
2769
2770 let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
2771 writer.write(&batch).expect("write");
2772 let outbuf = writer.into_inner().expect("inner");
2773
2774 let mut reader = StreamReader::try_new(&outbuf[..], None).expect("new reader");
2775 let read_batch = reader.next().unwrap().expect("read batch");
2776
2777 let read_array: &UInt32Array = read_batch.column(0).as_primitive();
2778 assert_eq!(
2779 vec![Some(2), Some(3)],
2780 read_array.iter().collect::<Vec<_>>()
2781 );
2782 }
2783
2784 #[test]
2785 fn test_large_slice_uint32() {
2786 ensure_roundtrip(Arc::new(UInt32Array::from_iter(
2787 (0..8000).map(|i| if i % 2 == 0 { Some(i) } else { None }),
2788 )));
2789 }
2790
2791 #[test]
2792 fn test_large_slice_string() {
2793 let strings: Vec<_> = (0..8000)
2794 .map(|i| {
2795 if i % 2 == 0 {
2796 Some(format!("value{i}"))
2797 } else {
2798 None
2799 }
2800 })
2801 .collect();
2802
2803 ensure_roundtrip(Arc::new(StringArray::from(strings)));
2804 }
2805
2806 #[test]
2807 fn test_large_slice_string_list() {
2808 let mut ls = ListBuilder::new(StringBuilder::new());
2809
2810 let mut s = String::new();
2811 for row_number in 0..8000 {
2812 if row_number % 2 == 0 {
2813 for list_element in 0..1000 {
2814 s.clear();
2815 use std::fmt::Write;
2816 write!(&mut s, "value{row_number}-{list_element}").unwrap();
2817 ls.values().append_value(&s);
2818 }
2819 ls.append(true)
2820 } else {
                ls.append(false);
            }
        }
2824
2825 ensure_roundtrip(Arc::new(ls.finish()));
2826 }
2827
2828 #[test]
2829 fn test_large_slice_string_list_of_lists() {
2830 let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));
2834
2835 for _ in 0..4000 {
2836 ls.values().append(true);
2837 ls.append(true)
2838 }
2839
2840 let mut s = String::new();
2841 for row_number in 0..4000 {
2842 if row_number % 2 == 0 {
2843 for list_element in 0..1000 {
2844 s.clear();
2845 use std::fmt::Write;
2846 write!(&mut s, "value{row_number}-{list_element}").unwrap();
2847 ls.values().values().append_value(&s);
2848 }
2849 ls.values().append(true);
2850 ls.append(true)
2851 } else {
                ls.append(false);
            }
        }
2855
2856 ensure_roundtrip(Arc::new(ls.finish()));
2857 }
2858
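    // Drops the first row of `array`, writes the sliced batch with both the stream
    // and the file writer, and checks that it reads back unchanged from each.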
2859 fn ensure_roundtrip(array: ArrayRef) {
2861 let num_rows = array.len();
2862 let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
2863 let sliced_batch = orig_batch.slice(1, num_rows - 1);
2865
2866 let schema = orig_batch.schema();
2867 let stream_data = {
2868 let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
2869 writer.write(&sliced_batch).unwrap();
2870 writer.into_inner().unwrap()
2871 };
2872 let read_batch = {
2873 let projection = None;
2874 let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
2875 reader
2876 .next()
2877 .expect("expect no errors reading batch")
2878 .expect("expect batch")
2879 };
2880 assert_eq!(sliced_batch, read_batch);
2881
2882 let file_data = {
2883 let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
2884 writer.write(&sliced_batch).unwrap();
2885 writer.into_inner().unwrap().into_inner().unwrap()
2886 };
2887 let read_batch = {
2888 let projection = None;
2889 let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
2890 reader
2891 .next()
2892 .expect("expect no errors reading batch")
2893 .expect("expect batch")
2894 };
2895 assert_eq!(sliced_batch, read_batch);
2897 }
2899
2900 #[test]
2901 fn encode_bools_slice() {
2902 assert_bool_roundtrip([true, false], 1, 1);
2904
2905 assert_bool_roundtrip(
2907 [
2908 true, false, true, true, false, false, true, true, true, false, false, false, true,
2909 true, true, true, false, false, false, false, true, true, true, true, true, false,
2910 false, false, false, false,
2911 ],
2912 13,
2913 17,
2914 );
2915
2916 assert_bool_roundtrip(
2918 [
2919 true, false, true, true, false, false, true, true, true, false, false, false,
2920 ],
2921 8,
2922 2,
2923 );
2924
2925 assert_bool_roundtrip(
2927 [
2928 true, false, true, true, false, false, true, true, true, false, false, false, true,
2929 true, true, true, true, false, false, false, false, false,
2930 ],
2931 8,
2932 8,
2933 );
2934 }
2935
2936 fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
2937 let val_bool_field = Field::new("val", DataType::Boolean, false);
2938
2939 let schema = Arc::new(Schema::new(vec![val_bool_field]));
2940
2941 let bools = BooleanArray::from(bools.to_vec());
2942
2943 let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
2944 let batch = batch.slice(offset, length);
2945
2946 let data = serialize_stream(&batch);
2947 let batch2 = deserialize_stream(data);
2948 assert_eq!(batch, batch2);
2949 }
2950
2951 #[test]
2952 fn test_run_array_unslice() {
2953 let total_len = 80;
2954 let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
2955 let repeats: Vec<usize> = vec![3, 4, 1, 2];
2956 let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
2957 for ix in 0_usize..32 {
2958 let repeat: usize = repeats[ix % repeats.len()];
2959 let val: Option<i32> = vals[ix % vals.len()];
2960 input_array.resize(input_array.len() + repeat, val);
2961 }
2962
2963 let mut builder =
2965 PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
2966 builder.extend(input_array.iter().copied());
2967 let run_array = builder.finish();
2968
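        // Take increasingly long slices from the front and the back of the run array
        // and check that converting them to zero-offset run arrays preserves the values.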
2969 for slice_len in 1..=total_len {
2971 let sliced_run_array: RunArray<Int16Type> =
2973 run_array.slice(0, slice_len).into_data().into();
2974
2975 let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
2977 let typed = unsliced_run_array
2978 .downcast::<PrimitiveArray<Int32Type>>()
2979 .unwrap();
2980 let expected: Vec<Option<i32>> = input_array.iter().take(slice_len).copied().collect();
2981 let actual: Vec<Option<i32>> = typed.into_iter().collect();
2982 assert_eq!(expected, actual);
2983
2984 let sliced_run_array: RunArray<Int16Type> = run_array
2986 .slice(total_len - slice_len, slice_len)
2987 .into_data()
2988 .into();
2989
2990 let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
2992 let typed = unsliced_run_array
2993 .downcast::<PrimitiveArray<Int32Type>>()
2994 .unwrap();
2995 let expected: Vec<Option<i32>> = input_array
2996 .iter()
2997 .skip(total_len - slice_len)
2998 .copied()
2999 .collect();
3000 let actual: Vec<Option<i32>> = typed.into_iter().collect();
3001 assert_eq!(expected, actual);
3002 }
3003 }
3004
3005 fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3006 let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());
3007
3008 for i in 0..100_000 {
3009 for value in [i, i, i] {
3010 ls.values().append_value(value);
3011 }
3012 ls.append(true)
3013 }
3014
3015 ls.finish()
3016 }
3017
3018 fn generate_utf8view_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3019 let mut ls = GenericListBuilder::<O, _>::new(StringViewBuilder::new());
3020
3021 for i in 0..100_000 {
3022 for value in [
3023 format!("value{}", i),
3024 format!("value{}", i),
3025 format!("value{}", i),
3026 ] {
3027 ls.values().append_value(&value);
3028 }
3029 ls.append(true)
3030 }
3031
3032 ls.finish()
3033 }
3034
3035 fn generate_string_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3036 let mut ls = GenericListBuilder::<O, _>::new(StringBuilder::new());
3037
3038 for i in 0..100_000 {
3039 for value in [
3040 format!("value{}", i),
3041 format!("value{}", i),
3042 format!("value{}", i),
3043 ] {
3044 ls.values().append_value(&value);
3045 }
3046 ls.append(true)
3047 }
3048
3049 ls.finish()
3050 }
3051
3052 fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
3053 let mut ls =
3054 GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
3055
3056 for _i in 0..10_000 {
3057 for j in 0..10 {
3058 for value in [j, j, j, j] {
3059 ls.values().values().append_value(value);
3060 }
3061 ls.values().append(true)
3062 }
3063 ls.append(true);
3064 }
3065
3066 ls.finish()
3067 }
3068
3069 fn generate_nested_list_data_starting_at_zero<O: OffsetSizeTrait>() -> GenericListArray<O> {
3070 let mut ls =
3071 GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));
3072
3073 for _i in 0..999 {
3074 ls.values().append(true);
3075 ls.append(true);
3076 }
3077
3078 for j in 0..10 {
3079 for value in [j, j, j, j] {
3080 ls.values().values().append_value(value);
3081 }
3082 ls.values().append(true)
3083 }
3084 ls.append(true);
3085
3086 for i in 0..9_000 {
3087 for j in 0..10 {
3088 for value in [i + j, i + j, i + j, i + j] {
3089 ls.values().values().append_value(value);
3090 }
3091 ls.values().append(true)
3092 }
3093 ls.append(true);
3094 }
3095
3096 ls.finish()
3097 }
3098
3099 fn generate_map_array_data() -> MapArray {
3100 let keys_builder = UInt32Builder::new();
3101 let values_builder = UInt32Builder::new();
3102
3103 let mut builder = MapBuilder::new(None, keys_builder, values_builder);
3104
3105 for i in 0..100_000 {
3106 for _j in 0..3 {
3107 builder.keys().append_value(i);
3108 builder.values().append_value(i * 2);
3109 }
3110 builder.append(true).unwrap();
3111 }
3112
3113 builder.finish()
3114 }
3115
3116 #[test]
3117 fn reencode_offsets_when_first_offset_is_not_zero() {
3118 let original_list = generate_list_data::<i32>();
3119 let original_data = original_list.into_data();
3120 let slice_data = original_data.slice(75, 7);
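        // Each list holds 3 values, so the slice starting at list 75 begins at value
        // offset 225 and the 7 re-encoded lists span 21 values.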
3121 let (new_offsets, original_start, length) =
3122 reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
3123 assert_eq!(
3124 vec![0, 3, 6, 9, 12, 15, 18, 21],
3125 new_offsets.typed_data::<i32>()
3126 );
3127 assert_eq!(225, original_start);
3128 assert_eq!(21, length);
3129 }
3130
3131 #[test]
3132 fn reencode_offsets_when_first_offset_is_zero() {
3133 let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
3134 ls.append(true);
3136 ls.values().append_value(35);
3137 ls.values().append_value(42);
3138 ls.append(true);
3139 let original_list = ls.finish();
3140 let original_data = original_list.into_data();
3141
3142 let slice_data = original_data.slice(1, 1);
3143 let (new_offsets, original_start, length) =
3144 reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
3145 assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
3146 assert_eq!(0, original_start);
3147 assert_eq!(2, length);
3148 }
3149
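    // Slices a single row out of `in_batch`, checks that the serialized slice is at
    // least `expected_size_factor` times smaller than the full batch, and that both
    // the full batch and the slice survive a file round trip.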
3150 fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
3153 let in_sliced = in_batch.slice(999, 1);
3155
3156 let bytes_batch = serialize_file(&in_batch);
3157 let bytes_sliced = serialize_file(&in_sliced);
3158
3159 assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));
3161
3162 let out_batch = deserialize_file(bytes_batch);
3164 assert_eq!(in_batch, out_batch);
3165
3166 let out_sliced = deserialize_file(bytes_sliced);
3167 assert_eq!(in_sliced, out_sliced);
3168 }
3169
3170 #[test]
3171 fn encode_lists() {
3172 let val_inner = Field::new_list_field(DataType::UInt32, true);
3173 let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
3174 let schema = Arc::new(Schema::new(vec![val_list_field]));
3175
3176 let values = Arc::new(generate_list_data::<i32>());
3177
3178 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3179 roundtrip_ensure_sliced_smaller(in_batch, 1000);
3180 }
3181
3182 #[test]
3183 fn encode_empty_list() {
3184 let val_inner = Field::new_list_field(DataType::UInt32, true);
3185 let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
3186 let schema = Arc::new(Schema::new(vec![val_list_field]));
3187
3188 let values = Arc::new(generate_list_data::<i32>());
3189
3190 let in_batch = RecordBatch::try_new(schema, vec![values])
3191 .unwrap()
3192 .slice(999, 0);
3193 let out_batch = deserialize_file(serialize_file(&in_batch));
3194 assert_eq!(in_batch, out_batch);
3195 }
3196
3197 #[test]
3198 fn encode_large_lists() {
3199 let val_inner = Field::new_list_field(DataType::UInt32, true);
3200 let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3201 let schema = Arc::new(Schema::new(vec![val_list_field]));
3202
3203 let values = Arc::new(generate_list_data::<i64>());
3204
3205 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3208 roundtrip_ensure_sliced_smaller(in_batch, 1000);
3209 }
3210
3211 #[test]
3212 fn encode_large_lists_non_zero_offset() {
3213 let val_inner = Field::new_list_field(DataType::UInt32, true);
3214 let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3215 let schema = Arc::new(Schema::new(vec![val_list_field]));
3216
3217 let values = Arc::new(generate_list_data::<i64>());
3218
3219 check_sliced_list_array(schema, values);
3220 }
3221
3222 #[test]
3223 fn encode_large_lists_string_non_zero_offset() {
3224 let val_inner = Field::new_list_field(DataType::Utf8, true);
3225 let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3226 let schema = Arc::new(Schema::new(vec![val_list_field]));
3227
3228 let values = Arc::new(generate_string_list_data::<i64>());
3229
3230 check_sliced_list_array(schema, values);
3231 }
3232
3233 #[test]
3234 fn encode_large_list_string_view_non_zero_offset() {
3235 let val_inner = Field::new_list_field(DataType::Utf8View, true);
3236 let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
3237 let schema = Arc::new(Schema::new(vec![val_list_field]));
3238
3239 let values = Arc::new(generate_utf8view_list_data::<i64>());
3240
3241 check_sliced_list_array(schema, values);
3242 }
3243
3244 fn check_sliced_list_array(schema: Arc<Schema>, values: Arc<GenericListArray<i64>>) {
3245 for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3246 let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3247 .unwrap()
3248 .slice(offset, len);
3249 let out_batch = deserialize_file(serialize_file(&in_batch));
3250 assert_eq!(in_batch, out_batch);
3251 }
3252 }
3253
3254 #[test]
3255 fn encode_nested_lists() {
3256 let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
3257 let inner_list_field = Arc::new(Field::new_list_field(DataType::List(inner_int), true));
3258 let list_field = Field::new("val", DataType::List(inner_list_field), true);
3259 let schema = Arc::new(Schema::new(vec![list_field]));
3260
3261 let values = Arc::new(generate_nested_list_data::<i32>());
3262
3263 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3264 roundtrip_ensure_sliced_smaller(in_batch, 1000);
3265 }
3266
3267 #[test]
3268 fn encode_nested_lists_starting_at_zero() {
3269 let inner_int = Arc::new(Field::new("item", DataType::UInt32, true));
3270 let inner_list_field = Arc::new(Field::new("item", DataType::List(inner_int), true));
3271 let list_field = Field::new("val", DataType::List(inner_list_field), true);
3272 let schema = Arc::new(Schema::new(vec![list_field]));
3273
3274 let values = Arc::new(generate_nested_list_data_starting_at_zero::<i32>());
3275
3276 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3277 roundtrip_ensure_sliced_smaller(in_batch, 1);
3278 }
3279
3280 #[test]
3281 fn encode_map_array() {
3282 let keys = Arc::new(Field::new("keys", DataType::UInt32, false));
3283 let values = Arc::new(Field::new("values", DataType::UInt32, true));
3284 let map_field = Field::new_map("map", "entries", keys, values, false, true);
3285 let schema = Arc::new(Schema::new(vec![map_field]));
3286
3287 let values = Arc::new(generate_map_array_data());
3288
3289 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3290 roundtrip_ensure_sliced_smaller(in_batch, 1000);
3291 }
3292
3293 fn generate_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
3294 let mut builder = GenericListViewBuilder::<O, _>::new(UInt32Builder::new());
3295
3296 for i in 0u32..100_000 {
3297 if i.is_multiple_of(10_000) {
3298 builder.append(false);
3299 continue;
3300 }
3301 for value in [i, i, i] {
3302 builder.values().append_value(value);
3303 }
3304 builder.append(true);
3305 }
3306
3307 builder.finish()
3308 }
3309
3310 #[test]
3311 fn encode_list_view_arrays() {
3312 let val_inner = Field::new_list_field(DataType::UInt32, true);
3313 let val_field = Field::new("val", DataType::ListView(Arc::new(val_inner)), true);
3314 let schema = Arc::new(Schema::new(vec![val_field]));
3315
3316 let values = Arc::new(generate_list_view_data::<i32>());
3317
3318 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3319 let out_batch = deserialize_file(serialize_file(&in_batch));
3320 assert_eq!(in_batch, out_batch);
3321 }
3322
3323 #[test]
3324 fn encode_large_list_view_arrays() {
3325 let val_inner = Field::new_list_field(DataType::UInt32, true);
3326 let val_field = Field::new("val", DataType::LargeListView(Arc::new(val_inner)), true);
3327 let schema = Arc::new(Schema::new(vec![val_field]));
3328
3329 let values = Arc::new(generate_list_view_data::<i64>());
3330
3331 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3332 let out_batch = deserialize_file(serialize_file(&in_batch));
3333 assert_eq!(in_batch, out_batch);
3334 }
3335
3336 #[test]
3337 fn check_sliced_list_view_array() {
3338 let inner = Field::new_list_field(DataType::UInt32, true);
3339 let field = Field::new("val", DataType::ListView(Arc::new(inner)), true);
3340 let schema = Arc::new(Schema::new(vec![field]));
3341 let values = Arc::new(generate_list_view_data::<i32>());
3342
3343 for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3344 let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3345 .unwrap()
3346 .slice(offset, len);
3347 let out_batch = deserialize_file(serialize_file(&in_batch));
3348 assert_eq!(in_batch, out_batch);
3349 }
3350 }
3351
3352 #[test]
3353 fn check_sliced_large_list_view_array() {
3354 let inner = Field::new_list_field(DataType::UInt32, true);
3355 let field = Field::new("val", DataType::LargeListView(Arc::new(inner)), true);
3356 let schema = Arc::new(Schema::new(vec![field]));
3357 let values = Arc::new(generate_list_view_data::<i64>());
3358
3359 for (offset, len) in [(999, 1), (0, 13), (47, 12), (values.len() - 13, 13)] {
3360 let in_batch = RecordBatch::try_new(schema.clone(), vec![values.clone()])
3361 .unwrap()
3362 .slice(offset, len);
3363 let out_batch = deserialize_file(serialize_file(&in_batch));
3364 assert_eq!(in_batch, out_batch);
3365 }
3366 }
3367
3368 fn generate_nested_list_view_data<O: OffsetSizeTrait>() -> GenericListViewArray<O> {
3369 let inner_builder = UInt32Builder::new();
3370 let middle_builder = GenericListViewBuilder::<O, _>::new(inner_builder);
3371 let mut outer_builder = GenericListViewBuilder::<O, _>::new(middle_builder);
3372
3373 for i in 0u32..10_000 {
3374 if i.is_multiple_of(1_000) {
3375 outer_builder.append(false);
3376 continue;
3377 }
3378
3379 for _ in 0..3 {
3380 for value in [i, i + 1, i + 2] {
3381 outer_builder.values().values().append_value(value);
3382 }
3383 outer_builder.values().append(true);
3384 }
3385 outer_builder.append(true);
3386 }
3387
3388 outer_builder.finish()
3389 }
3390
3391 #[test]
3392 fn encode_nested_list_views() {
3393 let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
3394 let inner_list_field = Arc::new(Field::new_list_field(DataType::ListView(inner_int), true));
3395 let list_field = Field::new("val", DataType::ListView(inner_list_field), true);
3396 let schema = Arc::new(Schema::new(vec![list_field]));
3397
3398 let values = Arc::new(generate_nested_list_view_data::<i32>());
3399
3400 let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3401 let out_batch = deserialize_file(serialize_file(&in_batch));
3402 assert_eq!(in_batch, out_batch);
3403 }
3404
3405 #[test]
3406 fn test_decimal128_alignment16_is_sufficient() {
3407 const IPC_ALIGNMENT: usize = 16;
3408
3409 for num_cols in [1, 2, 3, 17, 50, 73, 99] {
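            // Derive a row count from the column count so each iteration produces a
            // different buffer layout within the encoded record batch.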
            let num_rows = (num_cols * 7 + 11) % 100;
            let mut fields = Vec::new();
3417 let mut arrays = Vec::new();
3418 for i in 0..num_cols {
3419 let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
3420 let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
3421 fields.push(field);
3422 arrays.push(Arc::new(array) as Arc<dyn Array>);
3423 }
3424 let schema = Schema::new(fields);
3425 let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();
3426
3427 let mut writer = FileWriter::try_new_with_options(
3428 Vec::new(),
3429 batch.schema_ref(),
3430 IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
3431 )
3432 .unwrap();
3433 writer.write(&batch).unwrap();
3434 writer.finish().unwrap();
3435
3436 let out: Vec<u8> = writer.into_inner().unwrap();
3437
3438 let buffer = Buffer::from_vec(out);
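            // The file ends with a 4-byte footer length followed by the 6-byte
            // `ARROW1` magic, so the footer metadata sits 10 bytes from the end.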
3439 let trailer_start = buffer.len() - 10;
3440 let footer_len =
3441 read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
3442 let footer =
3443 root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
3444
3445 let schema = fb_to_schema(footer.schema().unwrap());
3446
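            // `require_alignment` makes the decoder error on misaligned buffers rather
            // than fixing them up, so this verifies that 16-byte IPC alignment is
            // sufficient for Decimal128 data.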
3447 let decoder =
3450 FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
3451
3452 let batches = footer.recordBatches().unwrap();
3453
3454 let block = batches.get(0);
3455 let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
3456 let data = buffer.slice_with_length(block.offset() as _, block_len);
3457
3458 let batch2 = decoder.read_record_batch(block, &data).unwrap().unwrap();
3459
3460 assert_eq!(batch, batch2);
3461 }
3462 }
3463
3464 #[test]
3465 fn test_decimal128_alignment8_is_unaligned() {
3466 const IPC_ALIGNMENT: usize = 8;
3467
3468 let num_cols = 2;
3469 let num_rows = 1;
3470
3471 let mut fields = Vec::new();
3472 let mut arrays = Vec::new();
3473 for i in 0..num_cols {
3474 let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
3475 let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
3476 fields.push(field);
3477 arrays.push(Arc::new(array) as Arc<dyn Array>);
3478 }
3479 let schema = Schema::new(fields);
3480 let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();
3481
3482 let mut writer = FileWriter::try_new_with_options(
3483 Vec::new(),
3484 batch.schema_ref(),
3485 IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
3486 )
3487 .unwrap();
3488 writer.write(&batch).unwrap();
3489 writer.finish().unwrap();
3490
3491 let out: Vec<u8> = writer.into_inner().unwrap();
3492
3493 let buffer = Buffer::from_vec(out);
3494 let trailer_start = buffer.len() - 10;
3495 let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
3496 let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
3497 let schema = fb_to_schema(footer.schema().unwrap());
3498
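        // With only 8-byte IPC alignment the 16-byte Decimal128 values can land on a
        // misaligned boundary; `require_alignment` should surface this as an error.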
3499 let decoder =
3502 FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);
3503
3504 let batches = footer.recordBatches().unwrap();
3505
3506 let block = batches.get(0);
3507 let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
3508 let data = buffer.slice_with_length(block.offset() as _, block_len);
3509
3510 let result = decoder.read_record_batch(block, &data);
3511
3512 let error = result.unwrap_err();
3513 assert_eq!(
3514 error.to_string(),
3515 "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
3516 offset from expected alignment of 16 by 8"
3517 );
3518 }
3519
3520 #[test]
3521 fn test_flush() {
3522 let num_cols = 2;
3525 let mut fields = Vec::new();
3526 let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
3527 for i in 0..num_cols {
3528 let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
3529 fields.push(field);
3530 }
3531 let schema = Schema::new(fields);
3532 let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
3533 let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
3534 let mut stream_writer =
3535 StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
3536 .unwrap();
3537 let mut file_writer =
3538 FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();
3539
3540 let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
3541 let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
3542 stream_writer.flush().unwrap();
3543 file_writer.flush().unwrap();
3544 let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
3545 let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
3546 let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
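        // `into_inner` finishes the stream, appending the 8-byte end-of-stream marker,
        // so the bytes seen at flush time are 8 fewer than the final output. The file
        // writer additionally starts with the 8-byte `ARROW1` magic (6 bytes plus
        // padding), hence 8 more bytes than the stream at the same point.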
3547 let expected_stream_flushed_bytes = stream_out.len() - 8;
3551 let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;
3554
3555 assert!(
3556 stream_bytes_written_on_new < stream_bytes_written_on_flush,
3557 "this test makes no sense if flush is not actually required"
3558 );
3559 assert!(
3560 file_bytes_written_on_new < file_bytes_written_on_flush,
3561 "this test makes no sense if flush is not actually required"
3562 );
3563 assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
3564 assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
3565 }
3566
3567 #[test]
3568 fn test_roundtrip_list_of_fixed_list() -> Result<(), ArrowError> {
3569 let l1_type =
3570 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, false)), 3);
3571 let l2_type = DataType::List(Arc::new(Field::new("item", l1_type.clone(), false)));
3572
3573 let l0_builder = Float32Builder::new();
3574 let l1_builder = FixedSizeListBuilder::new(l0_builder, 3).with_field(Arc::new(Field::new(
3575 "item",
3576 DataType::Float32,
3577 false,
3578 )));
3579 let mut l2_builder =
3580 ListBuilder::new(l1_builder).with_field(Arc::new(Field::new("item", l1_type, false)));
3581
3582 for point in [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]] {
3583 l2_builder.values().values().append_value(point[0]);
3584 l2_builder.values().values().append_value(point[1]);
3585 l2_builder.values().values().append_value(point[2]);
3586
3587 l2_builder.values().append(true);
3588 }
3589 l2_builder.append(true);
3590
3591 let point = [10., 11., 12.];
3592 l2_builder.values().values().append_value(point[0]);
3593 l2_builder.values().values().append_value(point[1]);
3594 l2_builder.values().values().append_value(point[2]);
3595
3596 l2_builder.values().append(true);
3597 l2_builder.append(true);
3598
3599 let array = Arc::new(l2_builder.finish()) as ArrayRef;
3600
3601 let schema = Arc::new(Schema::new_with_metadata(
3602 vec![Field::new("points", l2_type, false)],
3603 HashMap::default(),
3604 ));
3605
3606 test_slices(&array, &schema, 0, 1)?;
3609 test_slices(&array, &schema, 0, 2)?;
3610 test_slices(&array, &schema, 1, 1)?;
3611
3612 Ok(())
3613 }
3614
3615 #[test]
3616 fn test_roundtrip_list_of_fixed_list_w_nulls() -> Result<(), ArrowError> {
3617 let l0_builder = Float32Builder::new();
3618 let l1_builder = FixedSizeListBuilder::new(l0_builder, 3);
3619 let mut l2_builder = ListBuilder::new(l1_builder);
3620
3621 for point in [
3622 [Some(1.0), Some(2.0), None],
3623 [Some(4.0), Some(5.0), Some(6.0)],
3624 [None, Some(8.0), Some(9.0)],
3625 ] {
3626 for p in point {
3627 match p {
3628 Some(p) => l2_builder.values().values().append_value(p),
3629 None => l2_builder.values().values().append_null(),
3630 }
3631 }
3632
3633 l2_builder.values().append(true);
3634 }
3635 l2_builder.append(true);
3636
3637 let point = [Some(10.), None, None];
3638 for p in point {
3639 match p {
3640 Some(p) => l2_builder.values().values().append_value(p),
3641 None => l2_builder.values().values().append_null(),
3642 }
3643 }
3644
3645 l2_builder.values().append(true);
3646 l2_builder.append(true);
3647
3648 let array = Arc::new(l2_builder.finish()) as ArrayRef;
3649
3650 let schema = Arc::new(Schema::new_with_metadata(
3651 vec![Field::new(
3652 "points",
3653 DataType::List(Arc::new(Field::new(
3654 "item",
3655 DataType::FixedSizeList(
3656 Arc::new(Field::new("item", DataType::Float32, true)),
3657 3,
3658 ),
3659 true,
3660 ))),
3661 true,
3662 )],
3663 HashMap::default(),
3664 ));
3665
3666 test_slices(&array, &schema, 0, 1)?;
3669 test_slices(&array, &schema, 0, 2)?;
3670 test_slices(&array, &schema, 1, 1)?;
3671
3672 Ok(())
3673 }
3674
3675 fn test_slices(
3676 parent_array: &ArrayRef,
3677 schema: &SchemaRef,
3678 offset: usize,
3679 length: usize,
3680 ) -> Result<(), ArrowError> {
3681 let subarray = parent_array.slice(offset, length);
3682 let original_batch = RecordBatch::try_new(schema.clone(), vec![subarray])?;
3683
3684 let mut bytes = Vec::new();
3685 let mut writer = StreamWriter::try_new(&mut bytes, schema)?;
3686 writer.write(&original_batch)?;
3687 writer.finish()?;
3688
3689 let mut cursor = std::io::Cursor::new(bytes);
3690 let mut reader = StreamReader::try_new(&mut cursor, None)?;
3691 let returned_batch = reader.next().unwrap()?;
3692
3693 assert_eq!(original_batch, returned_batch);
3694
3695 Ok(())
3696 }
3697
3698 #[test]
3699 fn test_roundtrip_fixed_list() -> Result<(), ArrowError> {
3700 let int_builder = Int64Builder::new();
3701 let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3)
3702 .with_field(Arc::new(Field::new("item", DataType::Int64, false)));
3703
3704 for point in [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]] {
3705 fixed_list_builder.values().append_value(point[0]);
3706 fixed_list_builder.values().append_value(point[1]);
3707 fixed_list_builder.values().append_value(point[2]);
3708
3709 fixed_list_builder.append(true);
3710 }
3711
3712 let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;
3713
3714 let schema = Arc::new(Schema::new_with_metadata(
3715 vec![Field::new(
3716 "points",
3717 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, false)), 3),
3718 false,
3719 )],
3720 HashMap::default(),
3721 ));
3722
3723 test_slices(&array, &schema, 0, 4)?;
3726 test_slices(&array, &schema, 0, 2)?;
3727 test_slices(&array, &schema, 1, 3)?;
3728 test_slices(&array, &schema, 2, 1)?;
3729
3730 Ok(())
3731 }
3732
3733 #[test]
3734 fn test_roundtrip_fixed_list_w_nulls() -> Result<(), ArrowError> {
3735 let int_builder = Int64Builder::new();
3736 let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3);
3737
3738 for point in [
3739 [Some(1), Some(2), None],
3740 [Some(4), Some(5), Some(6)],
3741 [None, Some(8), Some(9)],
3742 [Some(10), None, None],
3743 ] {
3744 for p in point {
3745 match p {
3746 Some(p) => fixed_list_builder.values().append_value(p),
3747 None => fixed_list_builder.values().append_null(),
3748 }
3749 }
3750
3751 fixed_list_builder.append(true);
3752 }
3753
3754 let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;
3755
3756 let schema = Arc::new(Schema::new_with_metadata(
3757 vec![Field::new(
3758 "points",
3759 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 3),
3760 true,
3761 )],
3762 HashMap::default(),
3763 ));
3764
3765 test_slices(&array, &schema, 0, 4)?;
3768 test_slices(&array, &schema, 0, 2)?;
3769 test_slices(&array, &schema, 1, 3)?;
3770 test_slices(&array, &schema, 2, 1)?;
3771
3772 Ok(())
3773 }
3774
3775 #[test]
3776 fn test_metadata_encoding_ordering() {
3777 fn create_hash() -> u64 {
3778 let metadata: HashMap<String, String> = [
                ("a", "1"),
                ("b", "2"),
                ("c", "3"),
                ("d", "4"),
                ("e", "5"),
            ]
3785 .into_iter()
3786 .map(|(k, v)| (k.to_owned(), v.to_owned()))
3787 .collect();
3788
3789 let schema = Arc::new(
3791 Schema::new(vec![
3792 Field::new("a", DataType::Int64, true).with_metadata(metadata.clone()),
3793 ])
3794 .with_metadata(metadata)
3795 .clone(),
3796 );
3797 let batch = RecordBatch::new_empty(schema.clone());
3798
3799 let mut bytes = Vec::new();
3800 let mut w = StreamWriter::try_new(&mut bytes, batch.schema_ref()).unwrap();
3801 w.write(&batch).unwrap();
3802 w.finish().unwrap();
3803
3804 let mut h = std::hash::DefaultHasher::new();
3805 h.write(&bytes);
3806 h.finish()
3807 }
3808
3809 let expected = create_hash();
3810
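        // HashMap iteration order is not deterministic, so hashing the encoded bytes
        // repeatedly checks that schema and field metadata are written in a stable order.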
3811 let all_passed = (0..20).all(|_| create_hash() == expected);
3816 assert!(all_passed);
3817 }
3818}