use std::cmp::min;
use std::collections::HashMap;
use std::io::{BufWriter, Write};
use std::mem::size_of;
use std::sync::Arc;

use flatbuffers::FlatBufferBuilder;

use arrow_array::builder::BufferBuilder;
use arrow_array::cast::*;
use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType};
use arrow_array::*;
use arrow_buffer::bit_util;
use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
use arrow_data::{layout, ArrayData, ArrayDataBuilder, BufferSpec};
use arrow_schema::*;

use crate::compression::CompressionCodec;
use crate::convert::IpcSchemaEncoder;
use crate::CONTINUATION_MARKER;

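/// Options controlling how Arrow IPC data is written: buffer alignment, metadata
/// version, the legacy message framing flag, and optional body compression.
///
/// A minimal usage sketch (not from the original source; it only calls constructors
/// defined below, and the `lz4` feature is assumed for the compression type):
///
/// ```ignore
/// use arrow_ipc::writer::IpcWriteOptions;
/// use arrow_ipc::{CompressionType, MetadataVersion};
///
/// // 8-byte alignment, non-legacy framing, metadata V5, LZ4-compressed bodies.
/// let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?
///     .try_with_compression(Some(CompressionType::LZ4_FRAME))?;
/// ```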
#[derive(Debug, Clone)]
pub struct IpcWriteOptions {
    /// Write padding after memory buffers up to this multiple of bytes.
    /// Must be 8, 16, 32, or 64 - defaults to 64.
    alignment: u8,
    /// Whether to write the pre-0.15.0 "legacy" IPC message framing (only valid with metadata V4).
    write_legacy_ipc_format: bool,
    /// The IPC metadata version to write; only V4 and above are supported.
    metadata_version: crate::MetadataVersion,
    /// Optional compression applied to record batch bodies; requires metadata V5 or above.
    batch_compression_type: Option<crate::CompressionType>,
}

impl IpcWriteOptions {
    /// Configures compression for the writer, erroring if the metadata version
    /// is older than V5 (which does not support compression).
    pub fn try_with_compression(
        mut self,
        batch_compression_type: Option<crate::CompressionType>,
    ) -> Result<Self, ArrowError> {
        self.batch_compression_type = batch_compression_type;

        if self.batch_compression_type.is_some()
            && self.metadata_version < crate::MetadataVersion::V5
        {
            return Err(ArrowError::InvalidArgumentError(
                "Compression only supported in metadata v5 and above".to_string(),
            ));
        }
        Ok(self)
    }

    /// Try to create IpcWriteOptions, checking for incompatible settings.
    pub fn try_new(
        alignment: usize,
        write_legacy_ipc_format: bool,
        metadata_version: crate::MetadataVersion,
    ) -> Result<Self, ArrowError> {
        let is_alignment_valid =
            alignment == 8 || alignment == 16 || alignment == 32 || alignment == 64;
        if !is_alignment_valid {
            return Err(ArrowError::InvalidArgumentError(
                "Alignment should be 8, 16, 32, or 64.".to_string(),
            ));
        }
        let alignment: u8 = u8::try_from(alignment).expect("range already checked");
        match metadata_version {
            crate::MetadataVersion::V1
            | crate::MetadataVersion::V2
            | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
                "Writing IPC metadata version 3 and lower not supported".to_string(),
            )),
            #[allow(deprecated)]
            crate::MetadataVersion::V4 => Ok(Self {
                alignment,
                write_legacy_ipc_format,
                metadata_version,
                batch_compression_type: None,
            }),
            crate::MetadataVersion::V5 => {
                if write_legacy_ipc_format {
                    Err(ArrowError::InvalidArgumentError(
                        "Legacy IPC format only supported on metadata version 4".to_string(),
                    ))
                } else {
                    Ok(Self {
                        alignment,
                        write_legacy_ipc_format,
                        metadata_version,
                        batch_compression_type: None,
                    })
                }
            }
            z => Err(ArrowError::InvalidArgumentError(format!(
                "Unsupported crate::MetadataVersion {z:?}"
            ))),
        }
    }
}

impl Default for IpcWriteOptions {
    fn default() -> Self {
        Self {
            alignment: 64,
            write_legacy_ipc_format: false,
            metadata_version: crate::MetadataVersion::V5,
            batch_compression_type: None,
        }
    }
}

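/// Handles the low level details of encoding a [`Schema`] and [`RecordBatch`] into the
/// Arrow IPC format (flatbuffer message headers plus body bytes).
///
/// A minimal usage sketch (not from the original source; it assumes a `batch: RecordBatch`
/// is in scope and only calls methods defined below):
///
/// ```ignore
/// use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
///
/// let data_gen = IpcDataGenerator::default();
/// let options = IpcWriteOptions::default();
/// let mut tracker = DictionaryTracker::new(true);
///
/// // Encode the schema first, then each batch along with any dictionaries it needs;
/// // the resulting `EncodedData` values are framed for output by `write_message`.
/// let schema_msg =
///     data_gen.schema_to_bytes_with_dictionary_tracker(batch.schema_ref(), &mut tracker, &options);
/// let (dictionaries, batch_msg) = data_gen.encoded_batch(&batch, &mut tracker, &options)?;
/// ```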
#[derive(Debug, Default)]
pub struct IpcDataGenerator {}

impl IpcDataGenerator {
    /// Encodes `schema` as an IPC Schema message, assigning dictionary ids through
    /// `dictionary_tracker`, and returns the flatbuffer bytes as [`EncodedData`]
    /// (with an empty body).
    pub fn schema_to_bytes_with_dictionary_tracker(
        &self,
        schema: &Schema,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> EncodedData {
        let mut fbb = FlatBufferBuilder::new();
        let schema = {
            let fb = IpcSchemaEncoder::new()
                .with_dictionary_tracker(dictionary_tracker)
                .schema_to_fb_offset(&mut fbb, schema);
            fb.as_union_value()
        };

        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::Schema);
        message.add_bodyLength(0);
        message.add_header(schema);
        let data = message.finish();
        fbb.finish(data, None);

        let data = fbb.finished_data();
        EncodedData {
            ipc_message: data.to_vec(),
            arrow_data: vec![],
        }
    }

    fn _encode_dictionaries<I: Iterator<Item = i64>>(
        &self,
        column: &ArrayRef,
        encoded_dictionaries: &mut Vec<EncodedData>,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        dict_id: &mut I,
    ) -> Result<(), ArrowError> {
        match column.data_type() {
            DataType::Struct(fields) => {
                let s = as_struct_array(column);
                for (field, column) in fields.iter().zip(s.columns()) {
                    self.encode_dictionaries(
                        field,
                        column,
                        encoded_dictionaries,
                        dictionary_tracker,
                        write_options,
                        dict_id,
                    )?;
                }
            }
            DataType::RunEndEncoded(_, values) => {
                let data = column.to_data();
                if data.child_data().len() != 2 {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "The run encoded array should have exactly two child arrays. Found {}",
                        data.child_data().len()
                    )));
                }
                // Only the values child (index 1) can contain dictionaries;
                // the run ends child (index 0) is always a plain integer array.
                let values_array = make_array(data.child_data()[1].clone());
                self.encode_dictionaries(
                    values,
                    &values_array,
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::List(field) => {
                let list = as_list_array(column);
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::LargeList(field) => {
                let list = as_large_list_array(column);
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::FixedSizeList(field, _) => {
                let list = column
                    .as_any()
                    .downcast_ref::<FixedSizeListArray>()
                    .expect("Unable to downcast to fixed size list array");
                self.encode_dictionaries(
                    field,
                    list.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::Map(field, _) => {
                let map_array = as_map_array(column);

                let (keys, values) = match field.data_type() {
                    DataType::Struct(fields) if fields.len() == 2 => (&fields[0], &fields[1]),
                    _ => panic!("Incorrect field data type {:?}", field.data_type()),
                };

                // keys
                self.encode_dictionaries(
                    keys,
                    map_array.keys(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;

                // values
                self.encode_dictionaries(
                    values,
                    map_array.values(),
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id,
                )?;
            }
            DataType::Union(fields, _) => {
                let union = as_union_array(column);
                for (type_id, field) in fields.iter() {
                    let column = union.child(type_id);
                    self.encode_dictionaries(
                        field,
                        column,
                        encoded_dictionaries,
                        dictionary_tracker,
                        write_options,
                        dict_id,
                    )?;
                }
            }
            _ => (),
        }

        Ok(())
    }

    fn encode_dictionaries<I: Iterator<Item = i64>>(
        &self,
        field: &Field,
        column: &ArrayRef,
        encoded_dictionaries: &mut Vec<EncodedData>,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
        dict_id_seq: &mut I,
    ) -> Result<(), ArrowError> {
        match column.data_type() {
            DataType::Dictionary(_key_type, _value_type) => {
                let dict_data = column.to_data();
                let dict_values = &dict_data.child_data()[0];

                let values = make_array(dict_data.child_data()[0].clone());

                // Encode any dictionaries nested inside the dictionary values first.
                self._encode_dictionaries(
                    &values,
                    encoded_dictionaries,
                    dictionary_tracker,
                    write_options,
                    dict_id_seq,
                )?;

                // Take the next dictionary id assigned to this field when the schema was encoded.
                let dict_id = dict_id_seq.next().ok_or_else(|| {
                    ArrowError::IpcError(format!("no dict id for field {}", field.name()))
                })?;

                let emit = dictionary_tracker.insert(dict_id, column)?;

                if emit {
                    encoded_dictionaries.push(self.dictionary_batch_to_bytes(
                        dict_id,
                        dict_values,
                        write_options,
                    )?);
                }
            }
            _ => self._encode_dictionaries(
                column,
                encoded_dictionaries,
                dictionary_tracker,
                write_options,
                dict_id_seq,
            )?,
        }

        Ok(())
    }

    /// Encodes a batch into a number of [`EncodedData`] items: one per dictionary that
    /// must be emitted, plus the record batch itself. The [`DictionaryTracker`] keeps
    /// track of dictionaries that have already been written so they are only sent once.
    pub fn encoded_batch(
        &self,
        batch: &RecordBatch,
        dictionary_tracker: &mut DictionaryTracker,
        write_options: &IpcWriteOptions,
    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
        let schema = batch.schema();
        let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());

        let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();

        for (i, field) in schema.fields().iter().enumerate() {
            let column = batch.column(i);
            self.encode_dictionaries(
                field,
                column,
                &mut encoded_dictionaries,
                dictionary_tracker,
                write_options,
                &mut dict_id,
            )?;
        }

        let encoded_message = self.record_batch_to_bytes(batch, write_options)?;
        Ok((encoded_dictionaries, encoded_message))
    }

    /// Write a `RecordBatch` into two sets of bytes: one for the message header
    /// (`crate::Message`) and one for the batch's body data.
    fn record_batch_to_bytes(
        &self,
        batch: &RecordBatch,
        write_options: &IpcWriteOptions,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];
        let mut offset = 0;

        // get the type of compression
        let batch_compression_type = write_options.batch_compression_type;

        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        let compression_codec: Option<CompressionCodec> =
            batch_compression_type.map(TryInto::try_into).transpose()?;

        let mut variadic_buffer_counts = vec![];

        for array in batch.columns() {
            let array_data = array.to_data();
            offset = write_array_data(
                &array_data,
                &mut buffers,
                &mut arrow_data,
                &mut nodes,
                offset,
                array.len(),
                array.null_count(),
                compression_codec,
                write_options,
            )?;

            append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
        }
        // pad the tail of the body data
        let len = arrow_data.len();
        let pad_len = pad_to_alignment(write_options.alignment, len);
        arrow_data.extend_from_slice(&PADDING[..pad_len]);

        // write data
        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(batch.num_rows() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }

            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            let b = batch_builder.finish();
            b.as_union_value()
        };
        // create the crate::Message
        let mut message = crate::MessageBuilder::new(&mut fbb);
        message.add_version(write_options.metadata_version);
        message.add_header_type(crate::MessageHeader::RecordBatch);
        message.add_bodyLength(arrow_data.len() as i64);
        message.add_header(root);
        let root = message.finish();
        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }

    /// Write dictionary values into two sets of bytes: one for the message header
    /// (`crate::Message`) and one for the dictionary's body data.
    fn dictionary_batch_to_bytes(
        &self,
        dict_id: i64,
        array_data: &ArrayData,
        write_options: &IpcWriteOptions,
    ) -> Result<EncodedData, ArrowError> {
        let mut fbb = FlatBufferBuilder::new();

        let mut nodes: Vec<crate::FieldNode> = vec![];
        let mut buffers: Vec<crate::Buffer> = vec![];
        let mut arrow_data: Vec<u8> = vec![];

        // get the type of compression
        let batch_compression_type = write_options.batch_compression_type;

        let compression = batch_compression_type.map(|batch_compression_type| {
            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
            c.add_method(crate::BodyCompressionMethod::BUFFER);
            c.add_codec(batch_compression_type);
            c.finish()
        });

        let compression_codec: Option<CompressionCodec> = batch_compression_type
            .map(|batch_compression_type| batch_compression_type.try_into())
            .transpose()?;

        write_array_data(
            array_data,
            &mut buffers,
            &mut arrow_data,
            &mut nodes,
            0,
            array_data.len(),
            array_data.null_count(),
            compression_codec,
            write_options,
        )?;

        let mut variadic_buffer_counts = vec![];
        append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);

        // pad the tail of the body data
        let len = arrow_data.len();
        let pad_len = pad_to_alignment(write_options.alignment, len);
        arrow_data.extend_from_slice(&PADDING[..pad_len]);

        // write data
        let buffers = fbb.create_vector(&buffers);
        let nodes = fbb.create_vector(&nodes);
        let variadic_buffer = if variadic_buffer_counts.is_empty() {
            None
        } else {
            Some(fbb.create_vector(&variadic_buffer_counts))
        };

        let root = {
            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
            batch_builder.add_length(array_data.len() as i64);
            batch_builder.add_nodes(nodes);
            batch_builder.add_buffers(buffers);
            if let Some(c) = compression {
                batch_builder.add_compression(c);
            }
            if let Some(v) = variadic_buffer {
                batch_builder.add_variadicBufferCounts(v);
            }
            batch_builder.finish()
        };

        let root = {
            let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb);
            batch_builder.add_id(dict_id);
            batch_builder.add_data(root);
            batch_builder.finish().as_union_value()
        };

        let root = {
            let mut message_builder = crate::MessageBuilder::new(&mut fbb);
            message_builder.add_version(write_options.metadata_version);
            message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
            message_builder.add_bodyLength(arrow_data.len() as i64);
            message_builder.add_header(root);
            message_builder.finish()
        };

        fbb.finish(root, None);
        let finished_data = fbb.finished_data();

        Ok(EncodedData {
            ipc_message: finished_data.to_vec(),
            arrow_data,
        })
    }
}

fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
    match array.data_type() {
        DataType::BinaryView | DataType::Utf8View => {
            // All buffers after the first (the views buffer) are variadic data buffers.
            counts.push(array.buffers().len() as i64 - 1);
        }
        DataType::Dictionary(_, _) => {
            // Do nothing: dictionary values are encoded separately as dictionary batches,
            // so they contribute no variadic buffer counts to this record batch.
        }
        _ => {
            for child in array.child_data() {
                append_variadic_buffer_counts(counts, child)
            }
        }
    }
}

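/// Rewrites a [`DataType::RunEndEncoded`] `ArrayData` so that it carries no offset,
/// re-encoding the run ends relative to the sliced range. Returns an error if the
/// input is not a run array.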
pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
    match arr.data_type() {
        DataType::RunEndEncoded(k, _) => match k.data_type() {
            DataType::Int16 => {
                Ok(into_zero_offset_run_array(RunArray::<Int16Type>::from(arr))?.into_data())
            }
            DataType::Int32 => {
                Ok(into_zero_offset_run_array(RunArray::<Int32Type>::from(arr))?.into_data())
            }
            DataType::Int64 => {
                Ok(into_zero_offset_run_array(RunArray::<Int64Type>::from(arr))?.into_data())
            }
            d => unreachable!("Unexpected data type {d}"),
        },
        d => Err(ArrowError::InvalidArgumentError(format!(
            "The given array is not a run array. Data type of given array: {d}"
        ))),
    }
}

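/// Returns a [`RunArray`] logically equal to `run_array` but with offset 0: the run ends
/// are trimmed to the sliced range and shifted so they are relative to the start of the slice.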
fn into_zero_offset_run_array<R: RunEndIndexType>(
    run_array: RunArray<R>,
) -> Result<RunArray<R>, ArrowError> {
    let run_ends = run_array.run_ends();
    if run_ends.offset() == 0 && run_ends.max_value() == run_ends.len() {
        return Ok(run_array);
    }

    // The first physical index of the original run_ends covered by the slice.
    let start_physical_index = run_ends.get_start_physical_index();

    // The last physical index of the original run_ends covered by the slice.
    let end_physical_index = run_ends.get_end_physical_index();

    let physical_length = end_physical_index - start_physical_index + 1;

    // Build the new run_ends array by subtracting the offset from each run end.
    let offset = R::Native::usize_as(run_ends.offset());
    let mut builder = BufferBuilder::<R::Native>::new(physical_length);
    for run_end_value in &run_ends.values()[start_physical_index..end_physical_index] {
        builder.append(run_end_value.sub_wrapping(offset));
    }
    builder.append(R::Native::from_usize(run_array.len()).unwrap());
    let new_run_ends = unsafe {
        // SAFETY: the loop above builds a valid run_ends array, so validation can be skipped.
        ArrayDataBuilder::new(R::DATA_TYPE)
            .len(physical_length)
            .add_buffer(builder.finish())
            .build_unchecked()
    };

    // Build the new values by slicing to the same physical range.
    let new_values = run_array
        .values()
        .slice(start_physical_index, physical_length)
        .into_data();

    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
        .len(run_array.len())
        .add_child_data(new_run_ends)
        .add_child_data(new_values);
    let array_data = unsafe {
        // SAFETY: the children above form a valid run array, so validation can be skipped.
        builder.build_unchecked()
    };
    Ok(array_data.into())
}

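/// Keeps track of the dictionaries that have already been written to an IPC stream or
/// file, so each dictionary batch is only emitted when its values are first seen (or
/// replaced).
///
/// [`FileWriter`] constructs its tracker with `error_on_replacement = true`, since the
/// IPC file format only supports a single dictionary per field across all batches,
/// while [`StreamWriter`] uses `false` and simply re-emits a replacement dictionary.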
#[derive(Debug)]
pub struct DictionaryTracker {
    written: HashMap<i64, ArrayData>,
    dict_ids: Vec<i64>,
    error_on_replacement: bool,
}

impl DictionaryTracker {
    /// Create a new [`DictionaryTracker`].
    ///
    /// If `error_on_replacement` is `true`, attempting to replace an already written
    /// dictionary with different values returns an error.
    pub fn new(error_on_replacement: bool) -> Self {
        #[allow(deprecated)]
        Self {
            written: HashMap::new(),
            dict_ids: Vec::new(),
            error_on_replacement,
        }
    }

    /// Record and return the next dictionary id (one past the largest id issued so far).
    pub fn next_dict_id(&mut self) -> i64 {
        let next = self
            .dict_ids
            .last()
            .copied()
            .map(|i| i + 1)
            .unwrap_or_default();

        self.dict_ids.push(next);
        next
    }

    /// Return the dictionary ids issued so far, in the order they were issued.
    pub fn dict_id(&mut self) -> &[i64] {
        &self.dict_ids
    }

    /// Keep track of the dictionary with the given id and values. Returns `Ok(false)`
    /// if an identical dictionary was already written (no need to emit it again),
    /// `Ok(true)` if the dictionary should be emitted, and an error if a different
    /// dictionary was already written and this tracker errors on replacement.
    pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool, ArrowError> {
        let dict_data = column.to_data();
        let dict_values = &dict_data.child_data()[0];

        // If a dictionary with this id was already written, check whether it changed.
        if let Some(last) = self.written.get(&dict_id) {
            if ArrayData::ptr_eq(&last.child_data()[0], dict_values) {
                // Same underlying data => no need to emit it again.
                return Ok(false);
            }
            if self.error_on_replacement {
                // Fall back to a logical comparison before reporting a replacement.
                if last.child_data()[0] == *dict_values {
                    return Ok(false);
                }
                return Err(ArrowError::InvalidArgumentError(
                    "Dictionary replacement detected when writing IPC file format. \
                     Arrow IPC files only support a single dictionary for a given field \
                     across all batches."
                        .to_string(),
                ));
            }
        }

        self.written.insert(dict_id, dict_data);
        Ok(true)
    }
}

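/// Writer for the Arrow IPC file format: magic bytes, a schema message, dictionary and
/// record batch messages, and a footer indexing the written blocks, terminated by the
/// footer length and the magic bytes again.
///
/// A minimal usage sketch (not from the original source; `batch: RecordBatch` and the
/// output path are assumed):
///
/// ```ignore
/// use arrow_ipc::writer::FileWriter;
///
/// let file = std::fs::File::create("data.arrow")?;
/// let mut writer = FileWriter::try_new(file, &batch.schema())?;
/// writer.write(&batch)?;
/// writer.finish()?; // writes the footer; required before the file can be read
/// ```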
pub struct FileWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// A reference to the schema, used in validating record batches
    schema: SchemaRef,
    /// The number of bytes written so far, used as the offset for the next block
    block_offsets: usize,
    /// Dictionary blocks that will be written as part of the IPC footer
    dictionary_blocks: Vec<crate::Block>,
    /// Record blocks that will be written as part of the IPC footer
    record_blocks: Vec<crate::Block>,
    /// Whether the footer has been written, and the writer is finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,
    /// User level customized metadata
    custom_metadata: HashMap<String, String>,

    data_gen: IpcDataGenerator,
}

impl<W: Write> FileWriter<BufWriter<W>> {
    /// Try to create a new file writer with the writer wrapped in a `BufWriter`.
    ///
    /// See [`FileWriter::try_new`] for an unbuffered version.
    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        Self::try_new(BufWriter::new(writer), schema)
    }
}

impl<W: Write> FileWriter<W> {
    /// Try to create a new writer, with the schema written as part of the header.
    ///
    /// The created writer is not buffered; for better performance either wrap the
    /// writer in a `BufWriter` or use [`FileWriter::try_new_buffered`].
    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        let write_options = IpcWriteOptions::default();
        Self::try_new_with_options(writer, schema, write_options)
    }

    /// Try to create a new writer with the given [`IpcWriteOptions`].
    pub fn try_new_with_options(
        mut writer: W,
        schema: &Schema,
        write_options: IpcWriteOptions,
    ) -> Result<Self, ArrowError> {
        let data_gen = IpcDataGenerator::default();
        // write the magic bytes, padded to the configured alignment
        let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
        let header_size = super::ARROW_MAGIC.len() + pad_len;
        writer.write_all(&super::ARROW_MAGIC)?;
        writer.write_all(&PADDING[..pad_len])?;
        // write the schema, tracking the bytes written so far
        let mut dictionary_tracker = DictionaryTracker::new(true);
        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
            schema,
            &mut dictionary_tracker,
            &write_options,
        );
        let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
        Ok(Self {
            writer,
            write_options,
            schema: Arc::new(schema.clone()),
            block_offsets: meta + data + header_size,
            dictionary_blocks: vec![],
            record_blocks: vec![],
            finished: false,
            dictionary_tracker,
            custom_metadata: HashMap::new(),
            data_gen,
        })
    }

    /// Adds a key-value pair to the [`FileWriter`]'s custom metadata.
    pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        self.custom_metadata.insert(key.into(), value.into());
    }

    /// Write a record batch to the file.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write record batch to file writer as it is closed".to_string(),
            ));
        }

        let (encoded_dictionaries, encoded_message) = self.data_gen.encoded_batch(
            batch,
            &mut self.dictionary_tracker,
            &self.write_options,
        )?;

        for encoded_dictionary in encoded_dictionaries {
            let (meta, data) =
                write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;

            let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
            self.dictionary_blocks.push(block);
            self.block_offsets += meta + data;
        }

        let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?;
        // record the location of this batch for the footer index
        let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
        self.record_blocks.push(block);
        self.block_offsets += meta + data;
        Ok(())
    }

    /// Write the footer and trailing magic bytes, and mark the writer as finished.
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write footer to file writer as it is closed".to_string(),
            ));
        }

        // write EOS
        write_continuation(&mut self.writer, &self.write_options, 0)?;

        let mut fbb = FlatBufferBuilder::new();
        let dictionaries = fbb.create_vector(&self.dictionary_blocks);
        let record_batches = fbb.create_vector(&self.record_blocks);
        let mut dictionary_tracker = DictionaryTracker::new(true);
        let schema = IpcSchemaEncoder::new()
            .with_dictionary_tracker(&mut dictionary_tracker)
            .schema_to_fb_offset(&mut fbb, &self.schema);
        let fb_custom_metadata = (!self.custom_metadata.is_empty())
            .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));

        let root = {
            let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
            footer_builder.add_version(self.write_options.metadata_version);
            footer_builder.add_schema(schema);
            footer_builder.add_dictionaries(dictionaries);
            footer_builder.add_recordBatches(record_batches);
            if let Some(fb_custom_metadata) = fb_custom_metadata {
                footer_builder.add_custom_metadata(fb_custom_metadata);
            }
            footer_builder.finish()
        };
        fbb.finish(root, None);
        let footer_data = fbb.finished_data();
        self.writer.write_all(footer_data)?;
        self.writer
            .write_all(&(footer_data.len() as i32).to_le_bytes())?;
        self.writer.write_all(&super::ARROW_MAGIC)?;
        self.writer.flush()?;
        self.finished = true;

        Ok(())
    }

    /// Returns the arrow [`SchemaRef`] for this arrow file.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Gets a reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Flush the underlying writer.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }

    /// Unwraps the underlying writer, finishing the file first if it has not yet been finished.
    pub fn into_inner(mut self) -> Result<W, ArrowError> {
        if !self.finished {
            // `finish` flushes the writer.
            self.finish()?;
        }
        Ok(self.writer)
    }
}

impl<W: Write> RecordBatchWriter for FileWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}

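/// Writer for the Arrow IPC streaming format: a schema message followed by dictionary
/// and record batch messages, terminated by an end-of-stream marker when
/// [`StreamWriter::finish`] is called.
///
/// A minimal usage sketch (not from the original source; it assumes a `batch: RecordBatch`
/// is in scope and collects the stream into an in-memory buffer):
///
/// ```ignore
/// use arrow_ipc::writer::StreamWriter;
///
/// let mut writer = StreamWriter::try_new(Vec::new(), &batch.schema())?;
/// writer.write(&batch)?;
/// writer.finish()?;
/// let bytes: Vec<u8> = writer.into_inner()?;
/// ```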
pub struct StreamWriter<W> {
    /// The object to write to
    writer: W,
    /// IPC write options
    write_options: IpcWriteOptions,
    /// Whether the stream has been finished
    finished: bool,
    /// Keeps track of dictionaries that have been written
    dictionary_tracker: DictionaryTracker,

    data_gen: IpcDataGenerator,
}

impl<W: Write> StreamWriter<BufWriter<W>> {
    /// Try to create a new stream writer with the writer wrapped in a `BufWriter`.
    ///
    /// See [`StreamWriter::try_new`] for an unbuffered version.
    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        Self::try_new(BufWriter::new(writer), schema)
    }
}

impl<W: Write> StreamWriter<W> {
    /// Try to create a new writer, with the schema written as part of the header.
    ///
    /// The created writer is not buffered; for better performance either wrap the
    /// writer in a `BufWriter` or use [`StreamWriter::try_new_buffered`].
    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        let write_options = IpcWriteOptions::default();
        Self::try_new_with_options(writer, schema, write_options)
    }

    /// Try to create a new writer with the given [`IpcWriteOptions`].
    pub fn try_new_with_options(
        mut writer: W,
        schema: &Schema,
        write_options: IpcWriteOptions,
    ) -> Result<Self, ArrowError> {
        let data_gen = IpcDataGenerator::default();
        let mut dictionary_tracker = DictionaryTracker::new(false);

        // write the schema as the stream header
        let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
            schema,
            &mut dictionary_tracker,
            &write_options,
        );
        write_message(&mut writer, encoded_message, &write_options)?;
        Ok(Self {
            writer,
            write_options,
            finished: false,
            dictionary_tracker,
            data_gen,
        })
    }

    /// Write a record batch to the stream.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write record batch to stream writer as it is closed".to_string(),
            ));
        }

        let (encoded_dictionaries, encoded_message) = self
            .data_gen
            .encoded_batch(batch, &mut self.dictionary_tracker, &self.write_options)
            .expect("StreamWriter is configured to not error on dictionary replacement");

        for encoded_dictionary in encoded_dictionaries {
            write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
        }

        write_message(&mut self.writer, encoded_message, &self.write_options)?;
        Ok(())
    }

    /// Write the end-of-stream continuation bytes, and mark the stream as finished.
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        if self.finished {
            return Err(ArrowError::IpcError(
                "Cannot write footer to stream writer as it is closed".to_string(),
            ));
        }

        write_continuation(&mut self.writer, &self.write_options, 0)?;

        self.finished = true;

        Ok(())
    }

    /// Gets a reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Flush the underlying writer.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }

    /// Unwraps the underlying writer, finishing the stream first if it has not yet been finished.
    pub fn into_inner(mut self) -> Result<W, ArrowError> {
        if !self.finished {
            // `finish` flushes the writer.
            self.finish()?;
        }
        Ok(self.writer)
    }
}

impl<W: Write> RecordBatchWriter for StreamWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch)
    }

    fn close(mut self) -> Result<(), ArrowError> {
        self.finish()
    }
}

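/// An encoded IPC message: the flatbuffer-encoded header bytes plus the (possibly
/// compressed) Arrow body buffers that accompany it.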
pub struct EncodedData {
    /// An encoded `crate::Message`
    pub ipc_message: Vec<u8>,
    /// Arrow body buffers to be written; empty for schema messages
    pub arrow_data: Vec<u8>,
}

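/// Write a message's header and body to `writer` using the framing selected by `write_options`
/// (legacy 4-byte or continuation-marker 8-byte length prefix). Returns the number of bytes
/// written for the metadata (including prefix and padding) and for the body, respectively.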
pub fn write_message<W: Write>(
    mut writer: W,
    encoded: EncodedData,
    write_options: &IpcWriteOptions,
) -> Result<(usize, usize), ArrowError> {
    let arrow_data_len = encoded.arrow_data.len();
    if arrow_data_len % usize::from(write_options.alignment) != 0 {
        return Err(ArrowError::MemoryError(
            "Arrow data not aligned".to_string(),
        ));
    }

    let a = usize::from(write_options.alignment - 1);
    let buffer = encoded.ipc_message;
    let flatbuf_size = buffer.len();
    let prefix_size = if write_options.write_legacy_ipc_format {
        4
    } else {
        8
    };
    let aligned_size = (flatbuf_size + prefix_size + a) & !a;
    let padding_bytes = aligned_size - flatbuf_size - prefix_size;

    write_continuation(
        &mut writer,
        write_options,
        (aligned_size - prefix_size) as i32,
    )?;

    // write the flatbuffer header
    if flatbuf_size > 0 {
        writer.write_all(&buffer)?;
    }
    // write padding
    writer.write_all(&PADDING[..padding_bytes])?;

    // write the arrow body data
    let body_len = if arrow_data_len > 0 {
        write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
    } else {
        0
    };

    Ok((aligned_size, body_len))
}

fn write_body_buffers<W: Write>(
    mut writer: W,
    data: &[u8],
    alignment: u8,
) -> Result<usize, ArrowError> {
    let len = data.len();
    let pad_len = pad_to_alignment(alignment, len);
    let total_len = len + pad_len;

    writer.write_all(data)?;
    if pad_len > 0 {
        writer.write_all(&PADDING[..pad_len])?;
    }

    writer.flush()?;
    Ok(total_len)
}

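/// Write the continuation marker (for the non-legacy format) followed by the message length
/// prefix, then flush the writer.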
fn write_continuation<W: Write>(
    mut writer: W,
    write_options: &IpcWriteOptions,
    total_len: i32,
) -> Result<usize, ArrowError> {
    let mut written = 8;

    // the metadata version determines whether continuation markers should be added
    match write_options.metadata_version {
        crate::MetadataVersion::V1 | crate::MetadataVersion::V2 | crate::MetadataVersion::V3 => {
            unreachable!("Options with the metadata version cannot be created")
        }
        crate::MetadataVersion::V4 => {
            if !write_options.write_legacy_ipc_format {
                // non-legacy framing: prefix with the continuation marker
                writer.write_all(&CONTINUATION_MARKER)?;
                written = 4;
            }
            writer.write_all(&total_len.to_le_bytes()[..])?;
        }
        crate::MetadataVersion::V5 => {
            // write the continuation marker and message length
            writer.write_all(&CONTINUATION_MARKER)?;
            writer.write_all(&total_len.to_le_bytes()[..])?;
        }
        z => panic!("Unsupported crate::MetadataVersion {z:?}"),
    };

    writer.flush()?;

    Ok(written)
}

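/// Whether the IPC layout for `data_type` includes a validity bitmap: Null arrays never have
/// one, and under metadata V5 neither do Union or RunEndEncoded arrays.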
fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
    if write_options.metadata_version < crate::MetadataVersion::V5 {
        !matches!(data_type, DataType::Null)
    } else {
        !matches!(
            data_type,
            DataType::Null | DataType::Union(_, _) | DataType::RunEndEncoded(_, _)
        )
    }
}

#[inline]
fn buffer_need_truncate(
    array_offset: usize,
    buffer: &Buffer,
    spec: &BufferSpec,
    min_length: usize,
) -> bool {
    spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
}

#[inline]
fn get_buffer_element_width(spec: &BufferSpec) -> usize {
    match spec {
        BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
        _ => 0,
    }
}

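/// Common functionality for re-encoding offsets so they start at zero. Returns the new offsets
/// buffer along with the original start offset and length, for use in slicing child data.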
fn reencode_offsets<O: OffsetSizeTrait>(
    offsets: &Buffer,
    data: &ArrayData,
) -> (Buffer, usize, usize) {
    let offsets_slice: &[O] = offsets.typed_data::<O>();
    let offset_slice = &offsets_slice[data.offset()..data.offset() + data.len() + 1];

    let start_offset = offset_slice.first().unwrap();
    let end_offset = offset_slice.last().unwrap();

    let offsets = match start_offset.as_usize() {
        0 => {
            let size = size_of::<O>();
            offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
        }
        _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
    };

    let start_offset = start_offset.as_usize();
    let end_offset = end_offset.as_usize();

    (offsets, start_offset, end_offset - start_offset)
}

/// Returns the offsets and values [`Buffer`]s for a byte array with offset type `O`.
///
/// This re-encodes the offsets if they do not start at `0` and slices the values buffer
/// accordingly, so data that has been sliced away is not encoded.
fn get_byte_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Buffer) {
    if data.is_empty() {
        return (MutableBuffer::new(0).into(), MutableBuffer::new(0).into());
    }

    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
    let values = data.buffers()[1].slice_with_length(original_start_offset, len);
    (offsets, values)
}

/// Similar logic to [`get_byte_array_buffers()`], but slices the child array instead
/// of a values buffer.
fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, ArrayData) {
    if data.is_empty() {
        return (
            MutableBuffer::new(0).into(),
            data.child_data()[0].slice(0, 0),
        );
    }

    let (offsets, original_start_offset, len) = reencode_offsets::<O>(&data.buffers()[0], data);
    let child_data = data.child_data()[0].slice(original_start_offset, len);
    (offsets, child_data)
}

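/// Recursively appends the field node, validity bitmap, and data buffers for `array_data`
/// (and its children) to `buffers` / `arrow_data`, returning the updated body offset.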
#[allow(clippy::too_many_arguments)]
fn write_array_data(
    array_data: &ArrayData,
    buffers: &mut Vec<crate::Buffer>,
    arrow_data: &mut Vec<u8>,
    nodes: &mut Vec<crate::FieldNode>,
    offset: i64,
    num_rows: usize,
    null_count: usize,
    compression_codec: Option<CompressionCodec>,
    write_options: &IpcWriteOptions,
) -> Result<i64, ArrowError> {
    let mut offset = offset;
    if !matches!(array_data.data_type(), DataType::Null) {
        nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
    } else {
        // Every value in a NullArray is null, so the field node reports all rows as null.
        nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
    }
    if has_validity_bitmap(array_data.data_type(), write_options) {
        // write the null buffer if one exists
        let null_buffer = match array_data.nulls() {
            None => {
                // create a buffer and fill it with valid bits
                let num_bytes = bit_util::ceil(num_rows, 8);
                let buffer = MutableBuffer::new(num_bytes);
                let buffer = buffer.with_bitset(num_bytes, true);
                buffer.into()
            }
            Some(buffer) => buffer.inner().sliced(),
        };

        offset = write_buffer(
            null_buffer.as_slice(),
            buffers,
            arrow_data,
            offset,
            compression_codec,
            write_options.alignment,
        )?;
    }

    let data_type = array_data.data_type();
    if matches!(data_type, DataType::Binary | DataType::Utf8) {
        let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
        for buffer in [offsets, values] {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                write_options.alignment,
            )?;
        }
    } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
        // View arrays are written with all of their data buffers as-is; pruning buffers
        // that are no longer referenced after slicing is non-trivial, so no truncation
        // is attempted here.
        for buffer in array_data.buffers() {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                write_options.alignment,
            )?;
        }
    } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
        let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
        for buffer in [offsets, values] {
            offset = write_buffer(
                buffer.as_slice(),
                buffers,
                arrow_data,
                offset,
                compression_codec,
                write_options.alignment,
            )?;
        }
    } else if DataType::is_numeric(data_type)
        || DataType::is_temporal(data_type)
        || matches!(
            array_data.data_type(),
            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
        )
    {
        // Truncate the buffer to the sliced range where possible.
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = &array_data.buffers()[0];
        let layout = layout(data_type);
        let spec = &layout.buffers[0];

        let byte_width = get_buffer_element_width(spec);
        let min_length = array_data.len() * byte_width;
        let buffer_slice = if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
            let byte_offset = array_data.offset() * byte_width;
            let buffer_length = min(min_length, buffer.len() - byte_offset);
            &buffer.as_slice()[byte_offset..(byte_offset + buffer_length)]
        } else {
            buffer.as_slice()
        };
        offset = write_buffer(
            buffer_slice,
            buffers,
            arrow_data,
            offset,
            compression_codec,
            write_options.alignment,
        )?;
    } else if matches!(data_type, DataType::Boolean) {
        // Booleans are bit-packed, so a sliced array may not start on a byte boundary;
        // bit_slice shifts the bits as needed.
        assert_eq!(array_data.buffers().len(), 1);

        let buffer = &array_data.buffers()[0];
        let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
        offset = write_buffer(
            &buffer,
            buffers,
            arrow_data,
            offset,
            compression_codec,
            write_options.alignment,
        )?;
    } else if matches!(
        data_type,
        DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _)
    ) {
        assert_eq!(array_data.buffers().len(), 1);
        assert_eq!(array_data.child_data().len(), 1);

        // Truncate the offsets and the child data to avoid writing unnecessary data.
        let (offsets, sliced_child_data) = match data_type {
            DataType::List(_) => get_list_array_buffers::<i32>(array_data),
            DataType::Map(_, _) => get_list_array_buffers::<i32>(array_data),
            DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
            _ => unreachable!(),
        };
        offset = write_buffer(
            offsets.as_slice(),
            buffers,
            arrow_data,
            offset,
            compression_codec,
            write_options.alignment,
        )?;
        offset = write_array_data(
            &sliced_child_data,
            buffers,
            arrow_data,
            nodes,
            offset,
            sliced_child_data.len(),
            sliced_child_data.null_count(),
            compression_codec,
            write_options,
        )?;
        return Ok(offset);
    } else if let DataType::FixedSizeList(_, fixed_size) = data_type {
        assert_eq!(array_data.child_data().len(), 1);
        let fixed_size = *fixed_size as usize;

        let child_offset = array_data.offset() * fixed_size;
        let child_length = array_data.len() * fixed_size;
        let child_data = array_data.child_data()[0].slice(child_offset, child_length);

        offset = write_array_data(
            &child_data,
            buffers,
            arrow_data,
            nodes,
            offset,
            child_data.len(),
            child_data.null_count(),
            compression_codec,
            write_options,
        )?;
        return Ok(offset);
    } else {
        for buffer in array_data.buffers() {
            offset = write_buffer(
                buffer,
                buffers,
                arrow_data,
                offset,
                compression_codec,
                write_options.alignment,
            )?;
        }
    }

    match array_data.data_type() {
        DataType::Dictionary(_, _) => {}
        DataType::RunEndEncoded(_, _) => {
            // unslice the run encoded array before writing its children
            let arr = unslice_run_array(array_data.clone())?;
            // recursively write out nested structures
            for data_ref in arr.child_data() {
                offset = write_array_data(
                    data_ref,
                    buffers,
                    arrow_data,
                    nodes,
                    offset,
                    data_ref.len(),
                    data_ref.null_count(),
                    compression_codec,
                    write_options,
                )?;
            }
        }
        _ => {
            // recursively write out nested structures
            for data_ref in array_data.child_data() {
                offset = write_array_data(
                    data_ref,
                    buffers,
                    arrow_data,
                    nodes,
                    offset,
                    data_ref.len(),
                    data_ref.null_count(),
                    compression_codec,
                    write_options,
                )?;
            }
        }
    }
    Ok(offset)
}

/// Write a buffer into `arrow_data` (compressing it first if a codec is given), record its
/// offset and length in `buffers`, pad to `alignment`, and return the new body offset.
fn write_buffer(
    buffer: &[u8],                    // input
    buffers: &mut Vec<crate::Buffer>, // output buffer descriptors
    arrow_data: &mut Vec<u8>,         // output stream
    offset: i64,                      // current output stream offset
    compression_codec: Option<CompressionCodec>,
    alignment: u8,
) -> Result<i64, ArrowError> {
    let len: i64 = match compression_codec {
        Some(compressor) => compressor.compress_to_vec(buffer, arrow_data)?,
        None => {
            arrow_data.extend_from_slice(buffer);
            buffer.len()
        }
    }
    .try_into()
    .map_err(|e| {
        ArrowError::InvalidArgumentError(format!("Could not convert compressed size to i64: {e}"))
    })?;

    // make a new index entry
    buffers.push(crate::Buffer::new(offset, len));
    // pad so the next offset stays aligned
    let pad_len = pad_to_alignment(alignment, len as usize);
    arrow_data.extend_from_slice(&PADDING[..pad_len]);

    Ok(offset + len + (pad_len as i64))
}

const PADDING: [u8; 64] = [0; 64];

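/// Returns the number of padding bytes needed to round `len` up to a multiple of `alignment`
/// (which must be a power of two).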
#[inline]
fn pad_to_alignment(alignment: u8, len: usize) -> usize {
    let a = usize::from(alignment - 1);
    ((len + a) & !a) - len
}

#[cfg(test)]
mod tests {
    use std::hash::Hasher;
    use std::io::Cursor;
    use std::io::Seek;

    use arrow_array::builder::FixedSizeListBuilder;
    use arrow_array::builder::Float32Builder;
    use arrow_array::builder::Int64Builder;
    use arrow_array::builder::MapBuilder;
    use arrow_array::builder::UnionBuilder;
    use arrow_array::builder::{GenericListBuilder, ListBuilder, StringBuilder};
    use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
    use arrow_array::types::*;
    use arrow_buffer::ScalarBuffer;

    use crate::convert::fb_to_schema;
    use crate::reader::*;
    use crate::root_as_footer;
    use crate::MetadataVersion;

    use super::*;

    fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
        let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
        writer.write(rb).unwrap();
        writer.finish().unwrap();
        writer.into_inner().unwrap()
    }

    fn deserialize_file(bytes: Vec<u8>) -> RecordBatch {
        let mut reader = FileReader::try_new(Cursor::new(bytes), None).unwrap();
        reader.next().unwrap().unwrap()
    }

    fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
        // Write the stream with 8-byte alignment (the minimum allowed) rather than the
        // 64-byte default.
        const IPC_ALIGNMENT: usize = 8;

        let mut stream_writer = StreamWriter::try_new_with_options(
            vec![],
            record.schema_ref(),
            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
        )
        .unwrap();
        stream_writer.write(record).unwrap();
        stream_writer.finish().unwrap();
        stream_writer.into_inner().unwrap()
    }

    fn deserialize_stream(bytes: Vec<u8>) -> RecordBatch {
        let mut stream_reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
        stream_reader.next().unwrap().unwrap()
    }

    #[test]
    #[cfg(feature = "lz4")]
    fn test_write_empty_record_batch_lz4_compression() {
        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
        let values: Vec<Option<i32>> = vec![];
        let array = Int32Array::from(values);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();

        let mut file = tempfile::tempfile().unwrap();

        {
            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
                .unwrap()
                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
                .unwrap();

            let mut writer =
                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            let reader = FileReader::try_new(file, None).unwrap();
            for read_batch in reader {
                read_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(record_batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }

    #[test]
    #[cfg(feature = "lz4")]
    fn test_write_file_with_lz4_compression() {
        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
        let array = Int32Array::from(values);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();

        let mut file = tempfile::tempfile().unwrap();
        {
            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
                .unwrap()
                .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
                .unwrap();

            let mut writer =
                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            let reader = FileReader::try_new(file, None).unwrap();
            for read_batch in reader {
                read_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(record_batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }

    #[test]
    #[cfg(feature = "zstd")]
    fn test_write_file_with_zstd_compression() {
        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
        let array = Int32Array::from(values);
        let record_batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap();
        let mut file = tempfile::tempfile().unwrap();
        {
            let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
                .unwrap()
                .try_with_compression(Some(crate::CompressionType::ZSTD))
                .unwrap();

            let mut writer =
                FileWriter::try_new_with_options(&mut file, &schema, write_option).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            let reader = FileReader::try_new(file, None).unwrap();
            for read_batch in reader {
                read_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(record_batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }

    #[test]
    fn test_write_file() {
        let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
        let values: Vec<Option<u32>> = vec![
            Some(999),
            None,
            Some(235),
            Some(123),
            None,
            None,
            None,
            None,
            None,
        ];
        let array1 = UInt32Array::from(values);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array1) as ArrayRef])
                .unwrap();
        let mut file = tempfile::tempfile().unwrap();
        {
            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();

            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();

        {
            let mut reader = FileReader::try_new(file, None).unwrap();
            while let Some(Ok(read_batch)) = reader.next() {
                read_batch
                    .columns()
                    .iter()
                    .zip(batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            }
        }
    }

    fn write_null_file(options: IpcWriteOptions) {
        let schema = Schema::new(vec![
            Field::new("nulls", DataType::Null, true),
            Field::new("int32s", DataType::Int32, false),
            Field::new("nulls2", DataType::Null, true),
            Field::new("f64s", DataType::Float64, false),
        ]);
        let array1 = NullArray::new(32);
        let array2 = Int32Array::from(vec![1; 32]);
        let array3 = NullArray::new(32);
        let array4 = Float64Array::from(vec![f64::NAN; 32]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(array1) as ArrayRef,
                Arc::new(array2) as ArrayRef,
                Arc::new(array3) as ArrayRef,
                Arc::new(array4) as ArrayRef,
            ],
        )
        .unwrap();
        let mut file = tempfile::tempfile().unwrap();
        {
            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();

            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }

        file.rewind().unwrap();

        {
            let reader = FileReader::try_new(file, None).unwrap();
            reader.for_each(|maybe_batch| {
                maybe_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            });
        }
    }

    #[test]
    fn test_write_null_file_v4() {
        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
        write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap());
        write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
    }

    #[test]
    fn test_write_null_file_v5() {
        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
        write_null_file(IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap());
    }

    #[test]
    fn track_union_nested_dict() {
        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

        let array = Arc::new(inner) as ArrayRef;

        #[allow(deprecated)]
        let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 0, false);
        let union_fields = [(0, Arc::new(dctfield))].into_iter().collect();

        let types = [0, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
        let offsets = [0, 1, 2].into_iter().collect::<ScalarBuffer<i32>>();

        let union = UnionArray::try_new(union_fields, types, Some(offsets), vec![array]).unwrap();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "union",
            union.data_type().clone(),
            false,
        )]));

        let gen = IpcDataGenerator {};
        let mut dict_tracker = DictionaryTracker::new(false);
        gen.schema_to_bytes_with_dictionary_tracker(
            &schema,
            &mut dict_tracker,
            &IpcWriteOptions::default(),
        );

        let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap();

        gen.encoded_batch(&batch, &mut dict_tracker, &Default::default())
            .unwrap();

        // Dictionary ids are assigned by the tracker (via the schema encoder) rather than
        // taken from the field, so the dictionary is tracked under id 0.
        assert!(dict_tracker.written.contains_key(&0));
    }

    #[test]
    fn track_struct_nested_dict() {
        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();

        let array = Arc::new(inner) as ArrayRef;

        #[allow(deprecated)]
        let dctfield = Arc::new(Field::new_dict(
            "dict",
            array.data_type().clone(),
            false,
            2,
            false,
        ));

        let s = StructArray::from(vec![(dctfield, array)]);
        let struct_array = Arc::new(s) as ArrayRef;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "struct",
            struct_array.data_type().clone(),
            false,
        )]));

        let gen = IpcDataGenerator {};
        let mut dict_tracker = DictionaryTracker::new(false);
        gen.schema_to_bytes_with_dictionary_tracker(
            &schema,
            &mut dict_tracker,
            &IpcWriteOptions::default(),
        );

        let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap();

        gen.encoded_batch(&batch, &mut dict_tracker, &Default::default())
            .unwrap();

        assert!(dict_tracker.written.contains_key(&0));
    }

    fn write_union_file(options: IpcWriteOptions) {
        let schema = Schema::new(vec![Field::new_union(
            "union",
            vec![0, 1],
            vec![
                Field::new("a", DataType::Int32, false),
                Field::new("c", DataType::Float64, false),
            ],
            UnionMode::Sparse,
        )]);
        let mut builder = UnionBuilder::with_capacity_sparse(5);
        builder.append::<Int32Type>("a", 1).unwrap();
        builder.append_null::<Int32Type>("a").unwrap();
        builder.append::<Float64Type>("c", 3.0).unwrap();
        builder.append_null::<Float64Type>("c").unwrap();
        builder.append::<Int32Type>("a", 4).unwrap();
        let union = builder.build().unwrap();

        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(union) as ArrayRef])
                .unwrap();

        let mut file = tempfile::tempfile().unwrap();
        {
            let mut writer = FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();

            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();

        {
            let reader = FileReader::try_new(file, None).unwrap();
            reader.for_each(|maybe_batch| {
                maybe_batch
                    .unwrap()
                    .columns()
                    .iter()
                    .zip(batch.columns())
                    .for_each(|(a, b)| {
                        assert_eq!(a.data_type(), b.data_type());
                        assert_eq!(a.len(), b.len());
                        assert_eq!(a.null_count(), b.null_count());
                    });
            });
        }
    }

    #[test]
    fn test_write_union_file_v4_v5() {
        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
        write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
    }

    #[test]
    fn test_write_view_types() {
        const LONG_TEST_STRING: &str =
            "This is a long string to make sure binary view array handles it";
        let schema = Schema::new(vec![
            Field::new("field1", DataType::BinaryView, true),
            Field::new("field2", DataType::Utf8View, true),
        ]);
        let values: Vec<Option<&[u8]>> = vec![
            Some(b"foo"),
            Some(b"bar"),
            Some(LONG_TEST_STRING.as_bytes()),
        ];
        let binary_array = BinaryViewArray::from_iter(values);
        let utf8_array =
            StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
        let record_batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(binary_array), Arc::new(utf8_array)],
        )
        .unwrap();

        let mut file = tempfile::tempfile().unwrap();
        {
            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
            writer.write(&record_batch).unwrap();
            writer.finish().unwrap();
        }
        file.rewind().unwrap();
        {
            let mut reader = FileReader::try_new(&file, None).unwrap();
            let read_batch = reader.next().unwrap().unwrap();
            read_batch
                .columns()
                .iter()
                .zip(record_batch.columns())
                .for_each(|(a, b)| {
                    assert_eq!(a, b);
                });
        }
        file.rewind().unwrap();
        {
            let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
            let read_batch = reader.next().unwrap().unwrap();
            assert_eq!(read_batch.num_columns(), 1);
            let read_array = read_batch.column(0);
            let write_array = record_batch.column(0);
            assert_eq!(read_array, write_array);
        }
    }

    #[test]
    fn truncate_ipc_record_batch() {
        fn create_batch(rows: usize) -> RecordBatch {
            let schema = Schema::new(vec![
                Field::new("a", DataType::Int32, false),
                Field::new("b", DataType::Utf8, false),
            ]);

            let a = Int32Array::from_iter_values(0..rows as i32);
            let b = StringArray::from_iter_values((0..rows).map(|i| i.to_string()));

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
        }

        let big_record_batch = create_batch(65536);

        let length = 5;
        let small_record_batch = create_batch(length);

        let offset = 2;
        let record_batch_slice = big_record_batch.slice(offset, length);
        assert!(
            serialize_stream(&big_record_batch).len() > serialize_stream(&small_record_batch).len()
        );
        assert_eq!(
            serialize_stream(&small_record_batch).len(),
            serialize_stream(&record_batch_slice).len()
        );

        assert_eq!(
            deserialize_stream(serialize_stream(&record_batch_slice)),
            record_batch_slice
        );
    }

2226 #[test]
2227 fn truncate_ipc_record_batch_with_nulls() {
2228 fn create_batch() -> RecordBatch {
2229 let schema = Schema::new(vec![
2230 Field::new("a", DataType::Int32, true),
2231 Field::new("b", DataType::Utf8, true),
2232 ]);
2233
2234 let a = Int32Array::from(vec![Some(1), None, Some(1), None, Some(1)]);
2235 let b = StringArray::from(vec![None, Some("a"), Some("a"), None, Some("a")]);
2236
2237 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap()
2238 }
2239
2240 let record_batch = create_batch();
2241 let record_batch_slice = record_batch.slice(1, 2);
2242 let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2243
2244 assert!(
2245 serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2246 );
2247
2248 assert!(deserialized_batch.column(0).is_null(0));
2249 assert!(deserialized_batch.column(0).is_valid(1));
2250 assert!(deserialized_batch.column(1).is_valid(0));
2251 assert!(deserialized_batch.column(1).is_valid(1));
2252
2253 assert_eq!(record_batch_slice, deserialized_batch);
2254 }
2255
2256 #[test]
2257 fn truncate_ipc_dictionary_array() {
2258 fn create_batch() -> RecordBatch {
2259 let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
2260 .into_iter()
2261 .collect();
2262 let keys: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();
2263
2264 let array = DictionaryArray::new(keys, Arc::new(values));
2265
2266 let schema = Schema::new(vec![Field::new("dict", array.data_type().clone(), true)]);
2267
2268 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
2269 }
2270
2271 let record_batch = create_batch();
2272 let record_batch_slice = record_batch.slice(1, 2);
2273 let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));
2274
2275 assert!(
2276 serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
2277 );
2278
2279 assert!(deserialized_batch.column(0).is_valid(0));
2280 assert!(deserialized_batch.column(0).is_null(1));
2281
2282 assert_eq!(record_batch_slice, deserialized_batch);
2283 }
2284
    #[test]
    fn truncate_ipc_struct_array() {
        fn create_batch() -> RecordBatch {
            let strings: StringArray = [Some("foo"), None, Some("bar"), Some("baz")]
                .into_iter()
                .collect();
            let ints: Int32Array = [Some(0), Some(2), None, Some(1)].into_iter().collect();

            let struct_array = StructArray::from(vec![
                (
                    Arc::new(Field::new("s", DataType::Utf8, true)),
                    Arc::new(strings) as ArrayRef,
                ),
                (
                    Arc::new(Field::new("c", DataType::Int32, true)),
                    Arc::new(ints) as ArrayRef,
                ),
            ]);

            let schema = Schema::new(vec![Field::new(
                "struct_array",
                struct_array.data_type().clone(),
                true,
            )]);

            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(struct_array)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(1, 2);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );

        let structs = deserialized_batch
            .column(0)
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap();

        assert!(structs.column(0).is_null(0));
        assert!(structs.column(0).is_valid(1));
        assert!(structs.column(1).is_valid(0));
        assert!(structs.column(1).is_null(1));
        assert_eq!(record_batch_slice, deserialized_batch);
    }

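    // Edge case: every string is empty, so the value buffer has zero length and all
    // offsets are equal; truncation must still produce a valid (and smaller) stream.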
    #[test]
    fn truncate_ipc_string_array_with_all_empty_string() {
        fn create_batch() -> RecordBatch {
            let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
            let a = StringArray::from(vec![Some(""), Some(""), Some(""), Some(""), Some("")]);
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap()
        }

        let record_batch = create_batch();
        let record_batch_slice = record_batch.slice(0, 1);
        let deserialized_batch = deserialize_stream(serialize_stream(&record_batch_slice));

        assert!(
            serialize_stream(&record_batch).len() > serialize_stream(&record_batch_slice).len()
        );
        assert_eq!(record_batch_slice, deserialized_batch);
    }

    #[test]
    fn test_stream_writer_writes_array_slice() {
        let array = UInt32Array::from(vec![Some(1), Some(2), Some(3)]);
        assert_eq!(
            vec![Some(1), Some(2), Some(3)],
            array.iter().collect::<Vec<_>>()
        );

        let sliced = array.slice(1, 2);
        assert_eq!(vec![Some(2), Some(3)], sliced.iter().collect::<Vec<_>>());

        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, true)])),
            vec![Arc::new(sliced)],
        )
        .expect("new batch");

        let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
        writer.write(&batch).expect("write");
        let outbuf = writer.into_inner().expect("inner");

        let mut reader = StreamReader::try_new(&outbuf[..], None).expect("new reader");
        let read_batch = reader.next().unwrap().expect("read batch");

        let read_array: &UInt32Array = read_batch.column(0).as_primitive();
        assert_eq!(
            vec![Some(2), Some(3)],
            read_array.iter().collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_large_slice_uint32() {
        ensure_roundtrip(Arc::new(UInt32Array::from_iter((0..8000).map(|i| {
            if i % 2 == 0 {
                Some(i)
            } else {
                None
            }
        }))));
    }

    #[test]
    fn test_large_slice_string() {
        let strings: Vec<_> = (0..8000)
            .map(|i| {
                if i % 2 == 0 {
                    Some(format!("value{i}"))
                } else {
                    None
                }
            })
            .collect();

        ensure_roundtrip(Arc::new(StringArray::from(strings)));
    }

    #[test]
    fn test_large_slice_string_list() {
        let mut ls = ListBuilder::new(StringBuilder::new());

        let mut s = String::new();
        for row_number in 0..8000 {
            if row_number % 2 == 0 {
                for list_element in 0..1000 {
                    s.clear();
                    use std::fmt::Write;
                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
                    ls.values().append_value(&s);
                }
                ls.append(true)
            } else {
                ls.append(false);
            }
        }

        ensure_roundtrip(Arc::new(ls.finish()));
    }

    #[test]
    fn test_large_slice_string_list_of_lists() {
        let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));

        for _ in 0..4000 {
            ls.values().append(true);
            ls.append(true)
        }

        let mut s = String::new();
        for row_number in 0..4000 {
            if row_number % 2 == 0 {
                for list_element in 0..1000 {
                    s.clear();
                    use std::fmt::Write;
                    write!(&mut s, "value{row_number}-{list_element}").unwrap();
                    ls.values().values().append_value(&s);
                }
                ls.values().append(true);
                ls.append(true)
            } else {
                ls.append(false);
            }
        }

        ensure_roundtrip(Arc::new(ls.finish()));
    }

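    // Writes a slice (starting at offset 1) of the given array through both the
    // stream writer and the file writer, then asserts the batch read back equals the
    // slice. Shared by the test_large_slice_* tests above.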
    fn ensure_roundtrip(array: ArrayRef) {
        let num_rows = array.len();
        let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
        let sliced_batch = orig_batch.slice(1, num_rows - 1);

        let schema = orig_batch.schema();
        let stream_data = {
            let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
            writer.write(&sliced_batch).unwrap();
            writer.into_inner().unwrap()
        };
        let read_batch = {
            let projection = None;
            let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
            reader
                .next()
                .expect("expect no errors reading batch")
                .expect("expect batch")
        };
        assert_eq!(sliced_batch, read_batch);

        let file_data = {
            let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
            writer.write(&sliced_batch).unwrap();
            writer.into_inner().unwrap().into_inner().unwrap()
        };
        let read_batch = {
            let projection = None;
            let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
            reader
                .next()
                .expect("expect no errors reading batch")
                .expect("expect batch")
        };
        assert_eq!(sliced_batch, read_batch);
    }

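    // Boolean buffers are bit-packed, so a slice can start and end mid-byte; these
    // cases use offsets and lengths that are not byte-aligned to exercise that path.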
    #[test]
    fn encode_bools_slice() {
        assert_bool_roundtrip([true, false], 1, 1);

        assert_bool_roundtrip(
            [
                true, false, true, true, false, false, true, true, true, false, false, false, true,
                true, true, true, false, false, false, false, true, true, true, true, true, false,
                false, false, false, false,
            ],
            13,
            17,
        );

        assert_bool_roundtrip(
            [
                true, false, true, true, false, false, true, true, true, false, false, false,
            ],
            8,
            2,
        );

        assert_bool_roundtrip(
            [
                true, false, true, true, false, false, true, true, true, false, false, false, true,
                true, true, true, true, false, false, false, false, false,
            ],
            8,
            8,
        );
    }

    fn assert_bool_roundtrip<const N: usize>(bools: [bool; N], offset: usize, length: usize) {
        let val_bool_field = Field::new("val", DataType::Boolean, false);

        let schema = Arc::new(Schema::new(vec![val_bool_field]));

        let bools = BooleanArray::from(bools.to_vec());

        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(bools)]).unwrap();
        let batch = batch.slice(offset, length);

        let data = serialize_stream(&batch);
        let batch2 = deserialize_stream(data);
        assert_eq!(batch, batch2);
    }

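    // Slices a run-end-encoded array at every possible length, from both the front
    // and the back, and checks that into_zero_offset_run_array re-encodes it with a
    // zero offset while preserving the logical values.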
    #[test]
    fn test_run_array_unslice() {
        let total_len = 80;
        let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
        let repeats: Vec<usize> = vec![3, 4, 1, 2];
        let mut input_array: Vec<Option<i32>> = Vec::with_capacity(total_len);
        for ix in 0_usize..32 {
            let repeat: usize = repeats[ix % repeats.len()];
            let val: Option<i32> = vals[ix % vals.len()];
            input_array.resize(input_array.len() + repeat, val);
        }

        let mut builder =
            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
        builder.extend(input_array.iter().copied());
        let run_array = builder.finish();

        for slice_len in 1..=total_len {
            let sliced_run_array: RunArray<Int16Type> =
                run_array.slice(0, slice_len).into_data().into();

            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
            let typed = unsliced_run_array
                .downcast::<PrimitiveArray<Int32Type>>()
                .unwrap();
            let expected: Vec<Option<i32>> = input_array.iter().take(slice_len).copied().collect();
            let actual: Vec<Option<i32>> = typed.into_iter().collect();
            assert_eq!(expected, actual);

            let sliced_run_array: RunArray<Int16Type> = run_array
                .slice(total_len - slice_len, slice_len)
                .into_data()
                .into();

            let unsliced_run_array = into_zero_offset_run_array(sliced_run_array).unwrap();
            let typed = unsliced_run_array
                .downcast::<PrimitiveArray<Int32Type>>()
                .unwrap();
            let expected: Vec<Option<i32>> = input_array
                .iter()
                .skip(total_len - slice_len)
                .copied()
                .collect();
            let actual: Vec<Option<i32>> = typed.into_iter().collect();
            assert_eq!(expected, actual);
        }
    }

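    // The helpers below build large list, nested list, and map arrays so the size
    // assertions in the encode_* tests have plenty of data to truncate.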
    fn generate_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
        let mut ls = GenericListBuilder::<O, _>::new(UInt32Builder::new());

        for i in 0..100_000 {
            for value in [i, i, i] {
                ls.values().append_value(value);
            }
            ls.append(true)
        }

        ls.finish()
    }

    fn generate_nested_list_data<O: OffsetSizeTrait>() -> GenericListArray<O> {
        let mut ls =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));

        for _i in 0..10_000 {
            for j in 0..10 {
                for value in [j, j, j, j] {
                    ls.values().values().append_value(value);
                }
                ls.values().append(true)
            }
            ls.append(true);
        }

        ls.finish()
    }

    fn generate_nested_list_data_starting_at_zero<O: OffsetSizeTrait>() -> GenericListArray<O> {
        let mut ls =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(UInt32Builder::new()));

        for _i in 0..999 {
            ls.values().append(true);
            ls.append(true);
        }

        for j in 0..10 {
            for value in [j, j, j, j] {
                ls.values().values().append_value(value);
            }
            ls.values().append(true)
        }
        ls.append(true);

        for i in 0..9_000 {
            for j in 0..10 {
                for value in [i + j, i + j, i + j, i + j] {
                    ls.values().values().append_value(value);
                }
                ls.values().append(true)
            }
            ls.append(true);
        }

        ls.finish()
    }

    fn generate_map_array_data() -> MapArray {
        let keys_builder = UInt32Builder::new();
        let values_builder = UInt32Builder::new();

        let mut builder = MapBuilder::new(None, keys_builder, values_builder);

        for i in 0..100_000 {
            for _j in 0..3 {
                builder.keys().append_value(i);
                builder.values().append_value(i * 2);
            }
            builder.append(true).unwrap();
        }

        builder.finish()
    }

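    // reencode_offsets should rebase the offsets of a non-zero-offset slice so they
    // start at 0, and report where the child data begins (start 225 = 75 lists of 3
    // values) and how many child values it spans (21 = 7 lists of 3 values).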
    #[test]
    fn reencode_offsets_when_first_offset_is_not_zero() {
        let original_list = generate_list_data::<i32>();
        let original_data = original_list.into_data();
        let slice_data = original_data.slice(75, 7);
        let (new_offsets, original_start, length) =
            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
        assert_eq!(
            vec![0, 3, 6, 9, 12, 15, 18, 21],
            new_offsets.typed_data::<i32>()
        );
        assert_eq!(225, original_start);
        assert_eq!(21, length);
    }

    #[test]
    fn reencode_offsets_when_first_offset_is_zero() {
        let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
        ls.append(true);
        ls.values().append_value(35);
        ls.values().append_value(42);
        ls.append(true);
        let original_list = ls.finish();
        let original_data = original_list.into_data();

        let slice_data = original_data.slice(1, 1);
        let (new_offsets, original_start, length) =
            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
        assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
        assert_eq!(0, original_start);
        assert_eq!(2, length);
    }

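    // Serializes both the full batch and a single-row slice to the file format,
    // requires the slice to be at least `expected_size_factor` times smaller, and
    // checks that both deserialize back to their originals.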
    fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {
        let in_sliced = in_batch.slice(999, 1);

        let bytes_batch = serialize_file(&in_batch);
        let bytes_sliced = serialize_file(&in_sliced);

        assert!(bytes_sliced.len() < (bytes_batch.len() / expected_size_factor));

        let out_batch = deserialize_file(bytes_batch);
        assert_eq!(in_batch, out_batch);

        let out_sliced = deserialize_file(bytes_sliced);
        assert_eq!(in_sliced, out_sliced);
    }

    #[test]
    fn encode_lists() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_list_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    #[test]
    fn encode_empty_list() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_list_field = Field::new("val", DataType::List(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_list_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values])
            .unwrap()
            .slice(999, 0);
        let out_batch = deserialize_file(serialize_file(&in_batch));
        assert_eq!(in_batch, out_batch);
    }

    #[test]
    fn encode_large_lists() {
        let val_inner = Field::new_list_field(DataType::UInt32, true);
        let val_list_field = Field::new("val", DataType::LargeList(Arc::new(val_inner)), false);
        let schema = Arc::new(Schema::new(vec![val_list_field]));

        let values = Arc::new(generate_list_data::<i64>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    #[test]
    fn encode_nested_lists() {
        let inner_int = Arc::new(Field::new_list_field(DataType::UInt32, true));
        let inner_list_field = Arc::new(Field::new_list_field(DataType::List(inner_int), true));
        let list_field = Field::new("val", DataType::List(inner_list_field), true);
        let schema = Arc::new(Schema::new(vec![list_field]));

        let values = Arc::new(generate_nested_list_data::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

    #[test]
    fn encode_nested_lists_starting_at_zero() {
        let inner_int = Arc::new(Field::new("item", DataType::UInt32, true));
        let inner_list_field = Arc::new(Field::new("item", DataType::List(inner_int), true));
        let list_field = Field::new("val", DataType::List(inner_list_field), true);
        let schema = Arc::new(Schema::new(vec![list_field]));

        let values = Arc::new(generate_nested_list_data_starting_at_zero::<i32>());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1);
    }

    #[test]
    fn encode_map_array() {
        let keys = Arc::new(Field::new("keys", DataType::UInt32, false));
        let values = Arc::new(Field::new("values", DataType::UInt32, true));
        let map_field = Field::new_map("map", "entries", keys, values, false, true);
        let schema = Arc::new(Schema::new(vec![map_field]));

        let values = Arc::new(generate_map_array_data());

        let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
        roundtrip_ensure_sliced_smaller(in_batch, 1000);
    }

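    // Decimal128 buffers must sit on a 16-byte boundary when the reader requires
    // aligned buffers (see the error message in the next test), so writing with
    // alignment = 16 should be decodable by a FileDecoder that sets
    // with_require_alignment(true), across varying column/row counts.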
    #[test]
    fn test_decimal128_alignment16_is_sufficient() {
        const IPC_ALIGNMENT: usize = 16;

        for num_cols in [1, 2, 3, 17, 50, 73, 99] {
            let num_rows = (num_cols * 7 + 11) % 100;
            let mut fields = Vec::new();
            let mut arrays = Vec::new();
            for i in 0..num_cols {
                let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
                let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
                fields.push(field);
                arrays.push(Arc::new(array) as Arc<dyn Array>);
            }
            let schema = Schema::new(fields);
            let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();

            let mut writer = FileWriter::try_new_with_options(
                Vec::new(),
                batch.schema_ref(),
                IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
            )
            .unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();

            let out: Vec<u8> = writer.into_inner().unwrap();

            let buffer = Buffer::from_vec(out);
            let trailer_start = buffer.len() - 10;
            let footer_len =
                read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
            let footer =
                root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();

            let schema = fb_to_schema(footer.schema().unwrap());

            let decoder =
                FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);

            let batches = footer.recordBatches().unwrap();

            let block = batches.get(0);
            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
            let data = buffer.slice_with_length(block.offset() as _, block_len);

            let batch2 = decoder.read_record_batch(block, &data).unwrap().unwrap();

            assert_eq!(batch, batch2);
        }
    }

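    // Counterpart to the test above: with alignment = 8 a Decimal128 buffer can
    // presumably land on an 8-byte (not 16-byte) boundary, and a strict FileDecoder
    // is expected to reject it with the misalignment error asserted below.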
    #[test]
    fn test_decimal128_alignment8_is_unaligned() {
        const IPC_ALIGNMENT: usize = 8;

        let num_cols = 2;
        let num_rows = 1;

        let mut fields = Vec::new();
        let mut arrays = Vec::new();
        for i in 0..num_cols {
            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
            let array = Decimal128Array::from(vec![num_cols as i128; num_rows]);
            fields.push(field);
            arrays.push(Arc::new(array) as Arc<dyn Array>);
        }
        let schema = Schema::new(fields);
        let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();

        let mut writer = FileWriter::try_new_with_options(
            Vec::new(),
            batch.schema_ref(),
            IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
        )
        .unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();

        let out: Vec<u8> = writer.into_inner().unwrap();

        let buffer = Buffer::from_vec(out);
        let trailer_start = buffer.len() - 10;
        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();
        let schema = fb_to_schema(footer.schema().unwrap());

        let decoder =
            FileDecoder::new(Arc::new(schema), footer.version()).with_require_alignment(true);

        let batches = footer.recordBatches().unwrap();

        let block = batches.get(0);
        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
        let data = buffer.slice_with_length(block.offset() as _, block_len);

        let result = decoder.read_record_batch(block, &data);

        let error = result.unwrap_err();
        assert_eq!(
            error.to_string(),
            "Invalid argument error: Misaligned buffers[0] in array of type Decimal128(38, 10), \
            offset from expected alignment of 16 by 8"
        );
    }

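    // flush() on both writers should push everything written so far through the
    // inner BufWriter. The expected counts below presumably subtract the 8 bytes of
    // end-of-stream marker that are only written on finish, and add back the 8-byte
    // "ARROW1" magic-plus-padding preamble that only the file format writes up front.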
    #[test]
    fn test_flush() {
        let num_cols = 2;
        let mut fields = Vec::new();
        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
        for i in 0..num_cols {
            let field = Field::new(format!("col_{i}"), DataType::Decimal128(38, 10), true);
            fields.push(field);
        }
        let schema = Schema::new(fields);
        let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
        let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
        let mut stream_writer =
            StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
                .unwrap();
        let mut file_writer =
            FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();

        let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
        let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
        stream_writer.flush().unwrap();
        file_writer.flush().unwrap();
        let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
        let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
        let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
        let expected_stream_flushed_bytes = stream_out.len() - 8;
        let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;

        assert!(
            stream_bytes_written_on_new < stream_bytes_written_on_flush,
            "this test makes no sense if flush is not actually required"
        );
        assert!(
            file_bytes_written_on_new < file_bytes_written_on_flush,
            "this test makes no sense if flush is not actually required"
        );
        assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
        assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
    }

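    // Round-trips a List<FixedSizeList<Float32, 3>> ("list of points") through the
    // stream format, including slices that start past the first list entry.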
    #[test]
    fn test_roundtrip_list_of_fixed_list() -> Result<(), ArrowError> {
        let l1_type =
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, false)), 3);
        let l2_type = DataType::List(Arc::new(Field::new("item", l1_type.clone(), false)));

        let l0_builder = Float32Builder::new();
        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3).with_field(Arc::new(Field::new(
            "item",
            DataType::Float32,
            false,
        )));
        let mut l2_builder =
            ListBuilder::new(l1_builder).with_field(Arc::new(Field::new("item", l1_type, false)));

        for point in [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]] {
            l2_builder.values().values().append_value(point[0]);
            l2_builder.values().values().append_value(point[1]);
            l2_builder.values().values().append_value(point[2]);

            l2_builder.values().append(true);
        }
        l2_builder.append(true);

        let point = [10., 11., 12.];
        l2_builder.values().values().append_value(point[0]);
        l2_builder.values().values().append_value(point[1]);
        l2_builder.values().values().append_value(point[2]);

        l2_builder.values().append(true);
        l2_builder.append(true);

        let array = Arc::new(l2_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new("points", l2_type, false)],
            HashMap::default(),
        ));

        test_slices(&array, &schema, 0, 1)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 1)?;

        Ok(())
    }

    #[test]
    fn test_roundtrip_list_of_fixed_list_w_nulls() -> Result<(), ArrowError> {
        let l0_builder = Float32Builder::new();
        let l1_builder = FixedSizeListBuilder::new(l0_builder, 3);
        let mut l2_builder = ListBuilder::new(l1_builder);

        for point in [
            [Some(1.0), Some(2.0), None],
            [Some(4.0), Some(5.0), Some(6.0)],
            [None, Some(8.0), Some(9.0)],
        ] {
            for p in point {
                match p {
                    Some(p) => l2_builder.values().values().append_value(p),
                    None => l2_builder.values().values().append_null(),
                }
            }

            l2_builder.values().append(true);
        }
        l2_builder.append(true);

        let point = [Some(10.), None, None];
        for p in point {
            match p {
                Some(p) => l2_builder.values().values().append_value(p),
                None => l2_builder.values().values().append_null(),
            }
        }

        l2_builder.values().append(true);
        l2_builder.append(true);

        let array = Arc::new(l2_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new(
                "points",
                DataType::List(Arc::new(Field::new(
                    "item",
                    DataType::FixedSizeList(
                        Arc::new(Field::new("item", DataType::Float32, true)),
                        3,
                    ),
                    true,
                ))),
                true,
            )],
            HashMap::default(),
        ));

        test_slices(&array, &schema, 0, 1)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 1)?;

        Ok(())
    }

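    // Shared helper: wraps a slice of `parent_array` in a single-column batch,
    // writes it to an in-memory stream, reads it back, and asserts equality.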
    fn test_slices(
        parent_array: &ArrayRef,
        schema: &SchemaRef,
        offset: usize,
        length: usize,
    ) -> Result<(), ArrowError> {
        let subarray = parent_array.slice(offset, length);
        let original_batch = RecordBatch::try_new(schema.clone(), vec![subarray])?;

        let mut bytes = Vec::new();
        let mut writer = StreamWriter::try_new(&mut bytes, schema)?;
        writer.write(&original_batch)?;
        writer.finish()?;

        let mut cursor = std::io::Cursor::new(bytes);
        let mut reader = StreamReader::try_new(&mut cursor, None)?;
        let returned_batch = reader.next().unwrap()?;

        assert_eq!(original_batch, returned_batch);

        Ok(())
    }

    #[test]
    fn test_roundtrip_fixed_list() -> Result<(), ArrowError> {
        let int_builder = Int64Builder::new();
        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3)
            .with_field(Arc::new(Field::new("item", DataType::Int64, false)));

        for point in [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]] {
            fixed_list_builder.values().append_value(point[0]);
            fixed_list_builder.values().append_value(point[1]);
            fixed_list_builder.values().append_value(point[2]);

            fixed_list_builder.append(true);
        }

        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new(
                "points",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, false)), 3),
                false,
            )],
            HashMap::default(),
        ));

        test_slices(&array, &schema, 0, 4)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 3)?;
        test_slices(&array, &schema, 2, 1)?;

        Ok(())
    }

    #[test]
    fn test_roundtrip_fixed_list_w_nulls() -> Result<(), ArrowError> {
        let int_builder = Int64Builder::new();
        let mut fixed_list_builder = FixedSizeListBuilder::new(int_builder, 3);

        for point in [
            [Some(1), Some(2), None],
            [Some(4), Some(5), Some(6)],
            [None, Some(8), Some(9)],
            [Some(10), None, None],
        ] {
            for p in point {
                match p {
                    Some(p) => fixed_list_builder.values().append_value(p),
                    None => fixed_list_builder.values().append_null(),
                }
            }

            fixed_list_builder.append(true);
        }

        let array = Arc::new(fixed_list_builder.finish()) as ArrayRef;

        let schema = Arc::new(Schema::new_with_metadata(
            vec![Field::new(
                "points",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int64, true)), 3),
                true,
            )],
            HashMap::default(),
        ));

        test_slices(&array, &schema, 0, 4)?;
        test_slices(&array, &schema, 0, 2)?;
        test_slices(&array, &schema, 1, 3)?;
        test_slices(&array, &schema, 2, 1)?;

        Ok(())
    }

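    // Schema and field metadata live in a HashMap, whose iteration order is not
    // stable; hashing 20 independently written streams and comparing against the
    // first checks that the writer encodes metadata in a deterministic order.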
    #[test]
    fn test_metadata_encoding_ordering() {
        fn create_hash() -> u64 {
            let metadata: HashMap<String, String> = [
                ("a", "1"),
                ("b", "2"),
                ("c", "3"),
                ("d", "4"),
                ("e", "5"),
            ]
            .into_iter()
            .map(|(k, v)| (k.to_owned(), v.to_owned()))
            .collect();

            let schema = Arc::new(
                Schema::new(vec![
                    Field::new("a", DataType::Int64, true).with_metadata(metadata.clone())
                ])
                .with_metadata(metadata)
                .clone(),
            );
            let batch = RecordBatch::new_empty(schema.clone());

            let mut bytes = Vec::new();
            let mut w = StreamWriter::try_new(&mut bytes, batch.schema_ref()).unwrap();
            w.write(&batch).unwrap();
            w.finish().unwrap();

            let mut h = std::hash::DefaultHasher::new();
            h.write(&bytes);
            h.finish()
        }

        let expected = create_hash();

        let all_passed = (0..20).all(|_| create_hash() == expected);
        assert!(all_passed);
    }
}