use bytes::Bytes;
use std::io::{Read, Write};
use std::iter::Peekable;
use std::slice::Iter;
use std::sync::{Arc, Mutex};
use std::vec::IntoIter;
use thrift::protocol::TCompactOutputProtocol;

use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};

use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};

use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
use crate::arrow::ArrowSchemaConverter;
use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
use crate::column::writer::encoder::ColumnValueEncoder;
use crate::column::writer::{
    get_column_writer, ColumnCloseResult, ColumnWriter, GenericColumnWriter,
};
use crate::data_type::{ByteArray, FixedLenByteArray};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{KeyValue, RowGroupMetaData};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
use crate::thrift::TSerializable;
use levels::{calculate_array_levels, ArrayLevels};

mod byte_array;
mod levels;

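/// Writes Arrow [`RecordBatch`]es to a Parquet file, buffering rows in memory
/// and flushing them as a new row group whenever `max_row_group_size` rows have
/// accumulated. [`ArrowWriter::close`] flushes any remaining rows and writes the
/// file footer.
///
/// A minimal usage sketch, writing to an in-memory `Vec<u8>` (the column name
/// and values are illustrative only):
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow_array::{ArrayRef, Int64Array, RecordBatch};
/// use parquet::arrow::arrow_writer::ArrowWriter;
///
/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
/// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
///
/// let mut buffer = Vec::new();
/// let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
/// writer.write(&batch).unwrap();
/// writer.close().unwrap();
/// // `buffer` now contains a complete Parquet file
/// ```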
pub struct ArrowWriter<W: Write> {
    /// Underlying Parquet writer
    writer: SerializedFileWriter<W>,

    /// The in-progress row group, if any
    in_progress: Option<ArrowRowGroupWriter>,

    /// A copy of the Arrow schema of the batches being written
    arrow_schema: SchemaRef,

    /// The maximum number of rows to buffer per row group
    max_row_group_size: usize,
}
142
143impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
144 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145 let buffered_memory = self.in_progress_size();
146 f.debug_struct("ArrowWriter")
147 .field("writer", &self.writer)
148 .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
149 .field("in_progress_rows", &self.in_progress_rows())
150 .field("arrow_schema", &self.arrow_schema)
151 .field("max_row_group_size", &self.max_row_group_size)
152 .finish()
153 }
154}
155
156impl<W: Write + Send> ArrowWriter<W> {
157 pub fn try_new(
163 writer: W,
164 arrow_schema: SchemaRef,
165 props: Option<WriterProperties>,
166 ) -> Result<Self> {
167 let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
168 Self::try_new_with_options(writer, arrow_schema, options)
169 }
170
171 pub fn try_new_with_options(
177 writer: W,
178 arrow_schema: SchemaRef,
179 options: ArrowWriterOptions,
180 ) -> Result<Self> {
181 let mut props = options.properties;
182 let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
183 if let Some(schema_root) = &options.schema_root {
184 converter = converter.schema_root(schema_root);
185 }
186 let schema = converter.convert(&arrow_schema)?;
187 if !options.skip_arrow_metadata {
188 add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
190 }
191
192 let max_row_group_size = props.max_row_group_size();
193
194 let file_writer =
195 SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::new(props))?;
196
197 Ok(Self {
198 writer: file_writer,
199 in_progress: None,
200 arrow_schema,
201 max_row_group_size,
202 })
203 }
204
205 pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
207 self.writer.flushed_row_groups()
208 }
209
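    /// Estimated memory usage, in bytes, of the buffered (unflushed) row group
    /// data, or 0 if there is no in-progress row group.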
210 pub fn memory_size(&self) -> usize {
215 match &self.in_progress {
216 Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
217 None => 0,
218 }
219 }
220
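    /// Anticipated encoded size, in bytes, of the in-progress row group once
    /// written to pages. Unlike [`Self::memory_size`], this estimates the
    /// encoded output rather than the in-memory footprint, so the two can
    /// differ significantly.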
221 pub fn in_progress_size(&self) -> usize {
228 match &self.in_progress {
229 Some(in_progress) => in_progress
230 .writers
231 .iter()
232 .map(|x| x.get_estimated_total_bytes())
233 .sum(),
234 None => 0,
235 }
236 }
237
238 pub fn in_progress_rows(&self) -> usize {
240 self.in_progress
241 .as_ref()
242 .map(|x| x.buffered_rows)
243 .unwrap_or_default()
244 }
245
246 pub fn bytes_written(&self) -> usize {
248 self.writer.bytes_written()
249 }
250
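    /// Writes the provided [`RecordBatch`], buffering it in the in-progress row
    /// group. Once `max_row_group_size` rows have been buffered, the row group
    /// is flushed automatically; batches that would exceed the limit are split
    /// across row groups. Empty batches are ignored.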
251 pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
259 if batch.num_rows() == 0 {
260 return Ok(());
261 }
262
263 let in_progress = match &mut self.in_progress {
264 Some(in_progress) => in_progress,
265 x => x.insert(ArrowRowGroupWriter::new(
266 self.writer.schema_descr(),
267 self.writer.properties(),
268 &self.arrow_schema,
269 )?),
270 };
271
272 if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size {
274 let to_write = self.max_row_group_size - in_progress.buffered_rows;
275 let a = batch.slice(0, to_write);
276 let b = batch.slice(to_write, batch.num_rows() - to_write);
277 self.write(&a)?;
278 return self.write(&b);
279 }
280
281 in_progress.write(batch)?;
282
283 if in_progress.buffered_rows >= self.max_row_group_size {
284 self.flush()?
285 }
286 Ok(())
287 }
288
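    /// Flushes the in-progress row group, if any, writing it out as a new row
    /// group. Does nothing if no rows are buffered.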
289 pub fn flush(&mut self) -> Result<()> {
291 let in_progress = match self.in_progress.take() {
292 Some(in_progress) => in_progress,
293 None => return Ok(()),
294 };
295
296 let mut row_group_writer = self.writer.next_row_group()?;
297 for chunk in in_progress.close()? {
298 chunk.append_to_row_group(&mut row_group_writer)?;
299 }
300 row_group_writer.close()?;
301 Ok(())
302 }
303
304 pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
308 self.writer.append_key_value_metadata(kv_metadata)
309 }
310
311 pub fn inner(&self) -> &W {
313 self.writer.inner()
314 }
315
316 pub fn inner_mut(&mut self) -> &mut W {
321 self.writer.inner_mut()
322 }
323
324 pub fn into_inner(mut self) -> Result<W> {
326 self.flush()?;
327 self.writer.into_inner()
328 }
329
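    /// Flushes any buffered rows and writes the Parquet footer, returning the
    /// file metadata. Unlike [`Self::close`], this takes `&mut self` rather
    /// than consuming the writer, but the writer should not be used to write
    /// further batches afterwards.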
330 pub fn finish(&mut self) -> Result<crate::format::FileMetaData> {
336 self.flush()?;
337 self.writer.finish()
338 }
339
340 pub fn close(mut self) -> Result<crate::format::FileMetaData> {
342 self.finish()
343 }
344}
345
346impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
347 fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
348 self.write(batch).map_err(|e| e.into())
349 }
350
351 fn close(self) -> std::result::Result<(), ArrowError> {
352 self.close()?;
353 Ok(())
354 }
355}
356
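/// Configuration options for [`ArrowWriter`], used with
/// [`ArrowWriter::try_new_with_options`].
///
/// A small sketch of building options (the values are illustrative only):
///
/// ```ignore
/// use parquet::arrow::arrow_writer::ArrowWriterOptions;
/// use parquet::file::properties::WriterProperties;
///
/// let options = ArrowWriterOptions::new()
///     .with_properties(WriterProperties::builder().build())
///     .with_skip_arrow_metadata(true)
///     .with_schema_root("custom_root".to_string());
/// ```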
#[derive(Debug, Clone, Default)]
pub struct ArrowWriterOptions {
    properties: WriterProperties,
    skip_arrow_metadata: bool,
    schema_root: Option<String>,
}
366
367impl ArrowWriterOptions {
368 pub fn new() -> Self {
370 Self::default()
371 }
372
373 pub fn with_properties(self, properties: WriterProperties) -> Self {
375 Self { properties, ..self }
376 }
377
378 pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
385 Self {
386 skip_arrow_metadata,
387 ..self
388 }
389 }
390
391 pub fn with_schema_root(self, schema_root: String) -> Self {
393 Self {
394 schema_root: Some(schema_root),
395 ..self
396 }
397 }
398}
399
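/// The in-memory data for a column chunk: its total length and the buffers
/// (page headers and page data) that make it up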
400#[derive(Default)]
402struct ArrowColumnChunkData {
403 length: usize,
404 data: Vec<Bytes>,
405}
406
407impl Length for ArrowColumnChunkData {
408 fn len(&self) -> u64 {
409 self.length as _
410 }
411}
412
413impl ChunkReader for ArrowColumnChunkData {
414 type T = ArrowColumnChunkReader;
415
416 fn get_read(&self, start: u64) -> Result<Self::T> {
        assert_eq!(start, 0);
        Ok(ArrowColumnChunkReader(
419 self.data.clone().into_iter().peekable(),
420 ))
421 }
422
423 fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
424 unimplemented!()
425 }
426}
427
428struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
430
431impl Read for ArrowColumnChunkReader {
432 fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
433 let buffer = loop {
434 match self.0.peek_mut() {
435 Some(b) if b.is_empty() => {
436 self.0.next();
437 continue;
438 }
439 Some(b) => break b,
440 None => return Ok(0),
441 }
442 };
443
444 let len = buffer.len().min(out.len());
445 let b = buffer.split_to(len);
446 out[..len].copy_from_slice(&b);
447 Ok(len)
448 }
449}
450
451type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
456
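/// A [`PageWriter`] that buffers compressed pages and their thrift-encoded
/// headers into an in-memory [`SharedColumnChunk`] rather than writing them to
/// a file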
457#[derive(Default)]
458struct ArrowPageWriter {
459 buffer: SharedColumnChunk,
460}
461
462impl PageWriter for ArrowPageWriter {
463 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
464 let mut buf = self.buffer.try_lock().unwrap();
465 let page_header = page.to_thrift_header();
466 let header = {
467 let mut header = Vec::with_capacity(1024);
468 let mut protocol = TCompactOutputProtocol::new(&mut header);
469 page_header.write_to_out_protocol(&mut protocol)?;
470 Bytes::from(header)
471 };
472
473 let data = page.compressed_page().buffer().clone();
474 let compressed_size = data.len() + header.len();
475
476 let mut spec = PageWriteSpec::new();
477 spec.page_type = page.page_type();
478 spec.num_values = page.num_values();
479 spec.uncompressed_size = page.uncompressed_size() + header.len();
480 spec.offset = buf.length as u64;
481 spec.compressed_size = compressed_size;
482 spec.bytes_written = compressed_size as u64;
483
484 buf.length += compressed_size;
485 buf.data.push(header);
486 buf.data.push(data);
487
488 Ok(spec)
489 }
490
491 fn close(&mut self) -> Result<()> {
492 Ok(())
493 }
494}
495
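/// The data for a single leaf (primitive) column, together with its computed
/// definition and repetition levels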
496#[derive(Debug)]
498pub struct ArrowLeafColumn(ArrayLevels);
499
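/// Computes the [`ArrowLeafColumn`]s for a potentially nested [`ArrayRef`],
/// returning one entry per parquet leaf column derived from `field`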
500pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
502 let levels = calculate_array_levels(array, field)?;
503 Ok(levels.into_iter().map(ArrowLeafColumn).collect())
504}
505
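/// The encoded data and metadata for a closed column chunk, produced by
/// [`ArrowColumnWriter::close`] and ready to be appended to a row group via
/// [`Self::append_to_row_group`]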
506pub struct ArrowColumnChunk {
508 data: ArrowColumnChunkData,
509 close: ColumnCloseResult,
510}
511
512impl std::fmt::Debug for ArrowColumnChunk {
513 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
514 f.debug_struct("ArrowColumnChunk")
515 .field("length", &self.data.length)
516 .finish_non_exhaustive()
517 }
518}
519
520impl ArrowColumnChunk {
521 pub fn append_to_row_group<W: Write + Send>(
523 self,
524 writer: &mut SerializedRowGroupWriter<'_, W>,
525 ) -> Result<()> {
526 writer.append_column(&self.data, self.close)
527 }
528}
529
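/// Encodes [`ArrowLeafColumn`]s into the pages of a single column chunk.
///
/// Together with [`get_column_writers`], [`compute_leaves`] and
/// [`ArrowColumnChunk::append_to_row_group`], this allows each column of a row
/// group to be encoded independently (e.g. on separate threads) before being
/// appended to a shared [`SerializedFileWriter`]. A rough, single-threaded
/// sketch of how the pieces fit together, assuming a flat schema with a single
/// `Int32` column (unwraps for brevity):
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
/// use arrow_schema::{DataType, Field, Schema};
/// use parquet::arrow::arrow_writer::{compute_leaves, get_column_writers};
/// use parquet::arrow::ArrowSchemaConverter;
/// use parquet::file::properties::WriterProperties;
/// use parquet::file::writer::SerializedFileWriter;
///
/// let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
/// let props = Arc::new(WriterProperties::default());
/// let parquet_schema = ArrowSchemaConverter::new()
///     .with_coerce_types(props.coerce_types())
///     .convert(&schema)
///     .unwrap();
///
/// // One `ArrowColumnWriter` per parquet leaf column
/// let mut writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
///
/// // Encode a batch, feeding the leaves of each Arrow column to its writer.
/// // Note: with a nested schema a single Arrow column may map to several leaves.
/// let col = Arc::new(Int32Array::from_iter_values(0..10)) as ArrayRef;
/// let batch = RecordBatch::try_from_iter([("i32", col)]).unwrap();
/// for ((field, column), writer) in schema
///     .fields()
///     .iter()
///     .zip(batch.columns())
///     .zip(&mut writers)
/// {
///     for leaf in compute_leaves(field, column).unwrap() {
///         writer.write(&leaf).unwrap();
///     }
/// }
///
/// // Append the encoded chunks as a row group of a `SerializedFileWriter`
/// let mut file_writer =
///     SerializedFileWriter::new(Vec::new(), parquet_schema.root_schema_ptr(), props).unwrap();
/// let mut row_group = file_writer.next_row_group().unwrap();
/// for writer in writers {
///     writer.close().unwrap().append_to_row_group(&mut row_group).unwrap();
/// }
/// row_group.close().unwrap();
/// file_writer.close().unwrap();
/// ```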
pub struct ArrowColumnWriter {
    writer: ArrowColumnWriterImpl,
    chunk: SharedColumnChunk,
}

impl std::fmt::Debug for ArrowColumnWriter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
    }
}

enum ArrowColumnWriterImpl {
    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
    Column(ColumnWriter<'static>),
}
624
625impl ArrowColumnWriter {
626 pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
628 match &mut self.writer {
629 ArrowColumnWriterImpl::Column(c) => {
630 write_leaf(c, &col.0)?;
631 }
632 ArrowColumnWriterImpl::ByteArray(c) => {
633 write_primitive(c, col.0.array().as_ref(), &col.0)?;
634 }
635 }
636 Ok(())
637 }
638
639 pub fn close(self) -> Result<ArrowColumnChunk> {
641 let close = match self.writer {
642 ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
643 ArrowColumnWriterImpl::Column(c) => c.close()?,
644 };
645 let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
646 let data = chunk.into_inner().unwrap();
647 Ok(ArrowColumnChunk { data, close })
648 }
649
650 pub fn memory_size(&self) -> usize {
661 match &self.writer {
662 ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
663 ArrowColumnWriterImpl::Column(c) => c.memory_size(),
664 }
665 }
666
667 pub fn get_estimated_total_bytes(&self) -> usize {
675 match &self.writer {
676 ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
677 ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
678 }
679 }
680}
681
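/// Buffers the leaves of a single in-progress row group, with one
/// [`ArrowColumnWriter`] per parquet leaf column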
682struct ArrowRowGroupWriter {
684 writers: Vec<ArrowColumnWriter>,
685 schema: SchemaRef,
686 buffered_rows: usize,
687}
688
689impl ArrowRowGroupWriter {
690 fn new(
691 parquet: &SchemaDescriptor,
692 props: &WriterPropertiesPtr,
693 arrow: &SchemaRef,
694 ) -> Result<Self> {
695 let writers = get_column_writers(parquet, props, arrow)?;
696 Ok(Self {
697 writers,
698 schema: arrow.clone(),
699 buffered_rows: 0,
700 })
701 }
702
703 fn write(&mut self, batch: &RecordBatch) -> Result<()> {
704 self.buffered_rows += batch.num_rows();
705 let mut writers = self.writers.iter_mut();
706 for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
707 for leaf in compute_leaves(field.as_ref(), column)? {
708 writers.next().unwrap().write(&leaf)?
709 }
710 }
711 Ok(())
712 }
713
714 fn close(self) -> Result<Vec<ArrowColumnChunk>> {
715 self.writers
716 .into_iter()
717 .map(|writer| writer.close())
718 .collect()
719 }
720}
721
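/// Returns an [`ArrowColumnWriter`] for each leaf column of the provided
/// parquet schema, in the same order as the columns of `parquet`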
722pub fn get_column_writers(
724 parquet: &SchemaDescriptor,
725 props: &WriterPropertiesPtr,
726 arrow: &SchemaRef,
727) -> Result<Vec<ArrowColumnWriter>> {
728 let mut writers = Vec::with_capacity(arrow.fields.len());
729 let mut leaves = parquet.columns().iter();
730 for field in &arrow.fields {
731 get_arrow_column_writer(field.data_type(), props, &mut leaves, &mut writers)?;
732 }
733 Ok(writers)
734}
735
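/// Recursively descends `data_type`, consuming leaf columns from `leaves` and
/// appending a writer for each to `out`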
736fn get_arrow_column_writer(
738 data_type: &ArrowDataType,
739 props: &WriterPropertiesPtr,
740 leaves: &mut Iter<'_, ColumnDescPtr>,
741 out: &mut Vec<ArrowColumnWriter>,
742) -> Result<()> {
743 let col = |desc: &ColumnDescPtr| {
744 let page_writer = Box::<ArrowPageWriter>::default();
745 let chunk = page_writer.buffer.clone();
746 let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
747 ArrowColumnWriter {
748 chunk,
749 writer: ArrowColumnWriterImpl::Column(writer),
750 }
751 };
752
753 let bytes = |desc: &ColumnDescPtr| {
754 let page_writer = Box::<ArrowPageWriter>::default();
755 let chunk = page_writer.buffer.clone();
756 let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
757 ArrowColumnWriter {
758 chunk,
759 writer: ArrowColumnWriterImpl::ByteArray(writer),
760 }
761 };
762
763 match data_type {
764 _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())),
765 ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => out.push(col(leaves.next().unwrap())),
766 ArrowDataType::LargeBinary
767 | ArrowDataType::Binary
768 | ArrowDataType::Utf8
769 | ArrowDataType::LargeUtf8
770 | ArrowDataType::BinaryView
771 | ArrowDataType::Utf8View => {
772 out.push(bytes(leaves.next().unwrap()))
773 }
774 ArrowDataType::List(f)
775 | ArrowDataType::LargeList(f)
776 | ArrowDataType::FixedSizeList(f, _) => {
777 get_arrow_column_writer(f.data_type(), props, leaves, out)?
778 }
779 ArrowDataType::Struct(fields) => {
780 for field in fields {
781 get_arrow_column_writer(field.data_type(), props, leaves, out)?
782 }
783 }
784 ArrowDataType::Map(f, _) => match f.data_type() {
785 ArrowDataType::Struct(f) => {
786 get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
787 get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
788 }
789 _ => unreachable!("invalid map type"),
790 }
791 ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
792 ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Binary | ArrowDataType::LargeBinary => {
793 out.push(bytes(leaves.next().unwrap()))
794 }
795 ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
796 out.push(bytes(leaves.next().unwrap()))
797 }
798 _ => {
799 out.push(col(leaves.next().unwrap()))
800 }
801 }
802 _ => return Err(ParquetError::NYI(
803 format!(
804 "Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"
805 )
806 ))
807 }
808 Ok(())
809}
810
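/// Dispatches `levels` to the appropriately typed physical column writer,
/// casting the Arrow data to the corresponding parquet physical type where
/// necessary (e.g. `Date64` and decimals to `INT32`/`INT64`)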
811fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
812 let column = levels.array().as_ref();
813 let indices = levels.non_null_indices();
814 match writer {
815 ColumnWriter::Int32ColumnWriter(ref mut typed) => {
816 match column.data_type() {
817 ArrowDataType::Date64 => {
818 let array = arrow_cast::cast(column, &ArrowDataType::Date32)?;
820 let array = arrow_cast::cast(&array, &ArrowDataType::Int32)?;
821
822 let array = array.as_primitive::<Int32Type>();
823 write_primitive(typed, array.values(), levels)
824 }
825 ArrowDataType::UInt32 => {
826 let values = column.as_primitive::<UInt32Type>().values();
827 let array = values.inner().typed_data::<i32>();
830 write_primitive(typed, array, levels)
831 }
832 ArrowDataType::Decimal128(_, _) => {
833 let array = column
835 .as_primitive::<Decimal128Type>()
836 .unary::<_, Int32Type>(|v| v as i32);
837 write_primitive(typed, array.values(), levels)
838 }
839 ArrowDataType::Decimal256(_, _) => {
840 let array = column
842 .as_primitive::<Decimal256Type>()
843 .unary::<_, Int32Type>(|v| v.as_i128() as i32);
844 write_primitive(typed, array.values(), levels)
845 }
846 ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
847 ArrowDataType::Decimal128(_, _) => {
848 let array = arrow_cast::cast(column, value_type)?;
849 let array = array
850 .as_primitive::<Decimal128Type>()
851 .unary::<_, Int32Type>(|v| v as i32);
852 write_primitive(typed, array.values(), levels)
853 }
854 ArrowDataType::Decimal256(_, _) => {
855 let array = arrow_cast::cast(column, value_type)?;
856 let array = array
857 .as_primitive::<Decimal256Type>()
858 .unary::<_, Int32Type>(|v| v.as_i128() as i32);
859 write_primitive(typed, array.values(), levels)
860 }
861 _ => {
862 let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
863 let array = array.as_primitive::<Int32Type>();
864 write_primitive(typed, array.values(), levels)
865 }
866 },
867 _ => {
868 let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
869 let array = array.as_primitive::<Int32Type>();
870 write_primitive(typed, array.values(), levels)
871 }
872 }
873 }
874 ColumnWriter::BoolColumnWriter(ref mut typed) => {
875 let array = column.as_boolean();
876 typed.write_batch(
877 get_bool_array_slice(array, indices).as_slice(),
878 levels.def_levels(),
879 levels.rep_levels(),
880 )
881 }
882 ColumnWriter::Int64ColumnWriter(ref mut typed) => {
883 match column.data_type() {
884 ArrowDataType::Date64 => {
885 let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
886
887 let array = array.as_primitive::<Int64Type>();
888 write_primitive(typed, array.values(), levels)
889 }
890 ArrowDataType::Int64 => {
891 let array = column.as_primitive::<Int64Type>();
892 write_primitive(typed, array.values(), levels)
893 }
894 ArrowDataType::UInt64 => {
895 let values = column.as_primitive::<UInt64Type>().values();
896 let array = values.inner().typed_data::<i64>();
899 write_primitive(typed, array, levels)
900 }
901 ArrowDataType::Decimal128(_, _) => {
902 let array = column
904 .as_primitive::<Decimal128Type>()
905 .unary::<_, Int64Type>(|v| v as i64);
906 write_primitive(typed, array.values(), levels)
907 }
908 ArrowDataType::Decimal256(_, _) => {
909 let array = column
911 .as_primitive::<Decimal256Type>()
912 .unary::<_, Int64Type>(|v| v.as_i128() as i64);
913 write_primitive(typed, array.values(), levels)
914 }
915 ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
916 ArrowDataType::Decimal128(_, _) => {
917 let array = arrow_cast::cast(column, value_type)?;
918 let array = array
919 .as_primitive::<Decimal128Type>()
920 .unary::<_, Int64Type>(|v| v as i64);
921 write_primitive(typed, array.values(), levels)
922 }
923 ArrowDataType::Decimal256(_, _) => {
924 let array = arrow_cast::cast(column, value_type)?;
925 let array = array
926 .as_primitive::<Decimal256Type>()
927 .unary::<_, Int64Type>(|v| v.as_i128() as i64);
928 write_primitive(typed, array.values(), levels)
929 }
930 _ => {
931 let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
932 let array = array.as_primitive::<Int64Type>();
933 write_primitive(typed, array.values(), levels)
934 }
935 },
936 _ => {
937 let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
938 let array = array.as_primitive::<Int64Type>();
939 write_primitive(typed, array.values(), levels)
940 }
941 }
942 }
943 ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
944 unreachable!("Currently unreachable because data type not supported")
945 }
946 ColumnWriter::FloatColumnWriter(ref mut typed) => {
947 let array = column.as_primitive::<Float32Type>();
948 write_primitive(typed, array.values(), levels)
949 }
950 ColumnWriter::DoubleColumnWriter(ref mut typed) => {
951 let array = column.as_primitive::<Float64Type>();
952 write_primitive(typed, array.values(), levels)
953 }
954 ColumnWriter::ByteArrayColumnWriter(_) => {
955 unreachable!("should use ByteArrayWriter")
956 }
957 ColumnWriter::FixedLenByteArrayColumnWriter(ref mut typed) => {
958 let bytes = match column.data_type() {
959 ArrowDataType::Interval(interval_unit) => match interval_unit {
960 IntervalUnit::YearMonth => {
961 let array = column
962 .as_any()
963 .downcast_ref::<arrow_array::IntervalYearMonthArray>()
964 .unwrap();
965 get_interval_ym_array_slice(array, indices)
966 }
967 IntervalUnit::DayTime => {
968 let array = column
969 .as_any()
970 .downcast_ref::<arrow_array::IntervalDayTimeArray>()
971 .unwrap();
972 get_interval_dt_array_slice(array, indices)
973 }
974 _ => {
975 return Err(ParquetError::NYI(
976 format!(
977 "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
978 )
979 ));
980 }
981 },
982 ArrowDataType::FixedSizeBinary(_) => {
983 let array = column
984 .as_any()
985 .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
986 .unwrap();
987 get_fsb_array_slice(array, indices)
988 }
989 ArrowDataType::Decimal128(_, _) => {
990 let array = column.as_primitive::<Decimal128Type>();
991 get_decimal_128_array_slice(array, indices)
992 }
993 ArrowDataType::Decimal256(_, _) => {
994 let array = column
995 .as_any()
996 .downcast_ref::<arrow_array::Decimal256Array>()
997 .unwrap();
998 get_decimal_256_array_slice(array, indices)
999 }
1000 ArrowDataType::Float16 => {
1001 let array = column.as_primitive::<Float16Type>();
1002 get_float_16_array_slice(array, indices)
1003 }
1004 _ => {
1005 return Err(ParquetError::NYI(
1006 "Attempting to write an Arrow type that is not yet implemented".to_string(),
1007 ));
1008 }
1009 };
1010 typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
1011 }
1012 }
1013}
1014
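/// Writes the non-null values of a primitive column along with its definition
/// and repetition levels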
1015fn write_primitive<E: ColumnValueEncoder>(
1016 writer: &mut GenericColumnWriter<E>,
1017 values: &E::Values,
1018 levels: &ArrayLevels,
1019) -> Result<usize> {
1020 writer.write_batch_internal(
1021 values,
1022 Some(levels.non_null_indices()),
1023 levels.def_levels(),
1024 levels.rep_levels(),
1025 None,
1026 None,
1027 None,
1028 )
1029}
1030
1031fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
1032 let mut values = Vec::with_capacity(indices.len());
1033 for i in indices {
1034 values.push(array.value(*i))
1035 }
1036 values
1037}
1038
1039fn get_interval_ym_array_slice(
1042 array: &arrow_array::IntervalYearMonthArray,
1043 indices: &[usize],
1044) -> Vec<FixedLenByteArray> {
1045 let mut values = Vec::with_capacity(indices.len());
1046 for i in indices {
1047 let mut value = array.value(*i).to_le_bytes().to_vec();
1048 let mut suffix = vec![0; 8];
1049 value.append(&mut suffix);
1050 values.push(FixedLenByteArray::from(ByteArray::from(value)))
1051 }
1052 values
1053}
1054
1055fn get_interval_dt_array_slice(
1058 array: &arrow_array::IntervalDayTimeArray,
1059 indices: &[usize],
1060) -> Vec<FixedLenByteArray> {
1061 let mut values = Vec::with_capacity(indices.len());
1062 for i in indices {
1063 let mut out = [0; 12];
1064 let value = array.value(*i);
1065 out[4..8].copy_from_slice(&value.days.to_le_bytes());
1066 out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1067 values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1068 }
1069 values
1070}
1071
1072fn get_decimal_128_array_slice(
1073 array: &arrow_array::Decimal128Array,
1074 indices: &[usize],
1075) -> Vec<FixedLenByteArray> {
1076 let mut values = Vec::with_capacity(indices.len());
1077 let size = decimal_length_from_precision(array.precision());
1078 for i in indices {
1079 let as_be_bytes = array.value(*i).to_be_bytes();
1080 let resized_value = as_be_bytes[(16 - size)..].to_vec();
1081 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1082 }
1083 values
1084}
1085
1086fn get_decimal_256_array_slice(
1087 array: &arrow_array::Decimal256Array,
1088 indices: &[usize],
1089) -> Vec<FixedLenByteArray> {
1090 let mut values = Vec::with_capacity(indices.len());
1091 let size = decimal_length_from_precision(array.precision());
1092 for i in indices {
1093 let as_be_bytes = array.value(*i).to_be_bytes();
1094 let resized_value = as_be_bytes[(32 - size)..].to_vec();
1095 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1096 }
1097 values
1098}
1099
1100fn get_float_16_array_slice(
1101 array: &arrow_array::Float16Array,
1102 indices: &[usize],
1103) -> Vec<FixedLenByteArray> {
1104 let mut values = Vec::with_capacity(indices.len());
1105 for i in indices {
1106 let value = array.value(*i).to_le_bytes().to_vec();
1107 values.push(FixedLenByteArray::from(ByteArray::from(value)));
1108 }
1109 values
1110}
1111
1112fn get_fsb_array_slice(
1113 array: &arrow_array::FixedSizeBinaryArray,
1114 indices: &[usize],
1115) -> Vec<FixedLenByteArray> {
1116 let mut values = Vec::with_capacity(indices.len());
1117 for i in indices {
1118 let value = array.value(*i).to_vec();
1119 values.push(FixedLenByteArray::from(ByteArray::from(value)))
1120 }
1121 values
1122}
1123
1124#[cfg(test)]
1125mod tests {
1126 use super::*;
1127
1128 use std::fs::File;
1129
1130 use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1131 use crate::arrow::ARROW_SCHEMA_META_KEY;
1132 use arrow::datatypes::ToByteSlice;
1133 use arrow::datatypes::{DataType, Schema};
1134 use arrow::error::Result as ArrowResult;
1135 use arrow::util::data_gen::create_random_array;
1136 use arrow::util::pretty::pretty_format_batches;
1137 use arrow::{array::*, buffer::Buffer};
1138 use arrow_buffer::{i256, IntervalDayTime, IntervalMonthDayNano, NullBuffer};
1139 use arrow_schema::Fields;
1140 use half::f16;
1141
1142 use crate::basic::Encoding;
1143 use crate::data_type::AsBytes;
1144 use crate::file::metadata::ParquetMetaData;
1145 use crate::file::page_index::index::Index;
1146 use crate::file::page_index::index_reader::read_offset_indexes;
1147 use crate::file::properties::{
1148 BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1149 };
1150 use crate::file::serialized_reader::ReadOptionsBuilder;
1151 use crate::file::{
1152 reader::{FileReader, SerializedFileReader},
1153 statistics::Statistics,
1154 };
1155
1156 #[test]
1157 fn arrow_writer() {
1158 let schema = Schema::new(vec![
1160 Field::new("a", DataType::Int32, false),
1161 Field::new("b", DataType::Int32, true),
1162 ]);
1163
1164 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1166 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1167
1168 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1170
1171 roundtrip(batch, Some(SMALL_SIZE / 2));
1172 }
1173
1174 fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1175 let mut buffer = vec![];
1176
1177 let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
1178 writer.write(expected_batch).unwrap();
1179 writer.close().unwrap();
1180
1181 buffer
1182 }
1183
1184 fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1185 let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
1186 writer.write(expected_batch).unwrap();
1187 writer.into_inner().unwrap()
1188 }
1189
1190 #[test]
1191 fn roundtrip_bytes() {
1192 let schema = Arc::new(Schema::new(vec![
1194 Field::new("a", DataType::Int32, false),
1195 Field::new("b", DataType::Int32, true),
1196 ]));
1197
1198 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1200 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1201
1202 let expected_batch =
1204 RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
1205
1206 for buffer in [
1207 get_bytes_after_close(schema.clone(), &expected_batch),
1208 get_bytes_by_into_inner(schema, &expected_batch),
1209 ] {
1210 let cursor = Bytes::from(buffer);
1211 let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
1212
1213 let actual_batch = record_batch_reader
1214 .next()
1215 .expect("No batch found")
1216 .expect("Unable to get batch");
1217
1218 assert_eq!(expected_batch.schema(), actual_batch.schema());
1219 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1220 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1221 for i in 0..expected_batch.num_columns() {
1222 let expected_data = expected_batch.column(i).to_data();
1223 let actual_data = actual_batch.column(i).to_data();
1224
1225 assert_eq!(expected_data, actual_data);
1226 }
1227 }
1228 }
1229
1230 #[test]
1231 fn arrow_writer_non_null() {
1232 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1234
1235 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1237
1238 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1240
1241 roundtrip(batch, Some(SMALL_SIZE / 2));
1242 }
1243
1244 #[test]
1245 fn arrow_writer_list() {
1246 let schema = Schema::new(vec![Field::new(
1248 "a",
1249 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1250 true,
1251 )]);
1252
1253 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1255
1256 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1259
1260 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1262 DataType::Int32,
1263 false,
1264 ))))
1265 .len(5)
1266 .add_buffer(a_value_offsets)
1267 .add_child_data(a_values.into_data())
1268 .null_bit_buffer(Some(Buffer::from([0b00011011])))
1269 .build()
1270 .unwrap();
1271 let a = ListArray::from(a_list_data);
1272
1273 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1275
1276 assert_eq!(batch.column(0).null_count(), 1);
1277
1278 roundtrip(batch, None);
1281 }
1282
1283 #[test]
1284 fn arrow_writer_list_non_null() {
1285 let schema = Schema::new(vec![Field::new(
1287 "a",
1288 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1289 false,
1290 )]);
1291
1292 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1294
1295 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1298
1299 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1301 DataType::Int32,
1302 false,
1303 ))))
1304 .len(5)
1305 .add_buffer(a_value_offsets)
1306 .add_child_data(a_values.into_data())
1307 .build()
1308 .unwrap();
1309 let a = ListArray::from(a_list_data);
1310
1311 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1313
1314 assert_eq!(batch.column(0).null_count(), 0);
1317
1318 roundtrip(batch, None);
1319 }
1320
1321 #[test]
1322 fn arrow_writer_binary() {
1323 let string_field = Field::new("a", DataType::Utf8, false);
1324 let binary_field = Field::new("b", DataType::Binary, false);
1325 let schema = Schema::new(vec![string_field, binary_field]);
1326
1327 let raw_string_values = vec!["foo", "bar", "baz", "quux"];
1328 let raw_binary_values = [
1329 b"foo".to_vec(),
1330 b"bar".to_vec(),
1331 b"baz".to_vec(),
1332 b"quux".to_vec(),
1333 ];
1334 let raw_binary_value_refs = raw_binary_values
1335 .iter()
1336 .map(|x| x.as_slice())
1337 .collect::<Vec<_>>();
1338
1339 let string_values = StringArray::from(raw_string_values.clone());
1340 let binary_values = BinaryArray::from(raw_binary_value_refs);
1341 let batch = RecordBatch::try_new(
1342 Arc::new(schema),
1343 vec![Arc::new(string_values), Arc::new(binary_values)],
1344 )
1345 .unwrap();
1346
1347 roundtrip(batch, Some(SMALL_SIZE / 2));
1348 }
1349
1350 #[test]
1351 fn arrow_writer_binary_view() {
1352 let string_field = Field::new("a", DataType::Utf8View, false);
1353 let binary_field = Field::new("b", DataType::BinaryView, false);
1354 let nullable_string_field = Field::new("a", DataType::Utf8View, true);
1355 let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
1356
1357 let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
1358 let raw_binary_values = vec![
1359 b"foo".to_vec(),
1360 b"bar".to_vec(),
1361 b"large payload over 12 bytes".to_vec(),
1362 b"lulu".to_vec(),
1363 ];
1364 let nullable_string_values =
1365 vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
1366
1367 let string_view_values = StringViewArray::from(raw_string_values);
1368 let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
1369 let nullable_string_view_values = StringViewArray::from(nullable_string_values);
1370 let batch = RecordBatch::try_new(
1371 Arc::new(schema),
1372 vec![
1373 Arc::new(string_view_values),
1374 Arc::new(binary_view_values),
1375 Arc::new(nullable_string_view_values),
1376 ],
1377 )
1378 .unwrap();
1379
1380 roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1381 roundtrip(batch, None);
1382 }
1383
1384 fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
1385 let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
1386 let schema = Schema::new(vec![decimal_field]);
1387
1388 let decimal_values = vec![10_000, 50_000, 0, -100]
1389 .into_iter()
1390 .map(Some)
1391 .collect::<Decimal128Array>()
1392 .with_precision_and_scale(precision, scale)
1393 .unwrap();
1394
1395 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
1396 }
1397
1398 #[test]
1399 fn arrow_writer_decimal() {
1400 let batch_int32_decimal = get_decimal_batch(5, 2);
1402 roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
1403 let batch_int64_decimal = get_decimal_batch(12, 2);
1405 roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
1406 let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
1408 roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
1409 }
1410
1411 #[test]
1412 fn arrow_writer_complex() {
1413 let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1415 let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1416 let struct_field_g = Arc::new(Field::new_list(
1417 "g",
1418 Field::new_list_field(DataType::Int16, true),
1419 false,
1420 ));
1421 let struct_field_h = Arc::new(Field::new_list(
1422 "h",
1423 Field::new_list_field(DataType::Int16, false),
1424 true,
1425 ));
1426 let struct_field_e = Arc::new(Field::new_struct(
1427 "e",
1428 vec![
1429 struct_field_f.clone(),
1430 struct_field_g.clone(),
1431 struct_field_h.clone(),
1432 ],
1433 false,
1434 ));
1435 let schema = Schema::new(vec![
1436 Field::new("a", DataType::Int32, false),
1437 Field::new("b", DataType::Int32, true),
1438 Field::new_struct(
1439 "c",
1440 vec![struct_field_d.clone(), struct_field_e.clone()],
1441 false,
1442 ),
1443 ]);
1444
1445 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1447 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1448 let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1449 let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1450
1451 let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1452
1453 let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1456
1457 let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1459 .len(5)
1460 .add_buffer(g_value_offsets.clone())
1461 .add_child_data(g_value.to_data())
1462 .build()
1463 .unwrap();
1464 let g = ListArray::from(g_list_data);
1465 let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
1467 .len(5)
1468 .add_buffer(g_value_offsets)
1469 .add_child_data(g_value.to_data())
1470 .null_bit_buffer(Some(Buffer::from([0b00011011])))
1471 .build()
1472 .unwrap();
1473 let h = ListArray::from(h_list_data);
1474
1475 let e = StructArray::from(vec![
1476 (struct_field_f, Arc::new(f) as ArrayRef),
1477 (struct_field_g, Arc::new(g) as ArrayRef),
1478 (struct_field_h, Arc::new(h) as ArrayRef),
1479 ]);
1480
1481 let c = StructArray::from(vec![
1482 (struct_field_d, Arc::new(d) as ArrayRef),
1483 (struct_field_e, Arc::new(e) as ArrayRef),
1484 ]);
1485
1486 let batch = RecordBatch::try_new(
1488 Arc::new(schema),
1489 vec![Arc::new(a), Arc::new(b), Arc::new(c)],
1490 )
1491 .unwrap();
1492
1493 roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1494 roundtrip(batch, Some(SMALL_SIZE / 3));
1495 }
1496
1497 #[test]
1498 fn arrow_writer_complex_mixed() {
1499 let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
1504 let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
1505 let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
1506 let schema = Schema::new(vec![Field::new(
1507 "some_nested_object",
1508 DataType::Struct(Fields::from(vec![
1509 offset_field.clone(),
1510 partition_field.clone(),
1511 topic_field.clone(),
1512 ])),
1513 false,
1514 )]);
1515
1516 let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1518 let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1519 let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
1520
1521 let some_nested_object = StructArray::from(vec![
1522 (offset_field, Arc::new(offset) as ArrayRef),
1523 (partition_field, Arc::new(partition) as ArrayRef),
1524 (topic_field, Arc::new(topic) as ArrayRef),
1525 ]);
1526
1527 let batch =
1529 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1530
1531 roundtrip(batch, Some(SMALL_SIZE / 2));
1532 }
1533
1534 #[test]
1535 fn arrow_writer_map() {
1536 let json_content = r#"
1538 {"stocks":{"long": "$AAA", "short": "$BBB"}}
1539 {"stocks":{"long": null, "long": "$CCC", "short": null}}
1540 {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1541 "#;
1542 let entries_struct_type = DataType::Struct(Fields::from(vec![
1543 Field::new("key", DataType::Utf8, false),
1544 Field::new("value", DataType::Utf8, true),
1545 ]));
1546 let stocks_field = Field::new(
1547 "stocks",
1548 DataType::Map(
1549 Arc::new(Field::new("entries", entries_struct_type, false)),
1550 false,
1551 ),
1552 true,
1553 );
1554 let schema = Arc::new(Schema::new(vec![stocks_field]));
1555 let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1556 let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1557
1558 let batch = reader.next().unwrap().unwrap();
1559 roundtrip(batch, None);
1560 }
1561
1562 #[test]
1563 fn arrow_writer_2_level_struct() {
1564 let field_c = Field::new("c", DataType::Int32, true);
1566 let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
1567 let type_a = DataType::Struct(vec![field_b.clone()].into());
1568 let field_a = Field::new("a", type_a, true);
1569 let schema = Schema::new(vec![field_a.clone()]);
1570
1571 let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
1573 let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
1574 .len(6)
1575 .null_bit_buffer(Some(Buffer::from([0b00100111])))
1576 .add_child_data(c.into_data())
1577 .build()
1578 .unwrap();
1579 let b = StructArray::from(b_data);
1580 let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
1581 .len(6)
1582 .null_bit_buffer(Some(Buffer::from([0b00101111])))
1583 .add_child_data(b.into_data())
1584 .build()
1585 .unwrap();
1586 let a = StructArray::from(a_data);
1587
1588 assert_eq!(a.null_count(), 1);
1589 assert_eq!(a.column(0).null_count(), 2);
1590
1591 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1593
1594 roundtrip(batch, Some(SMALL_SIZE / 2));
1595 }
1596
1597 #[test]
1598 fn arrow_writer_2_level_struct_non_null() {
1599 let field_c = Field::new("c", DataType::Int32, false);
1601 let type_b = DataType::Struct(vec![field_c].into());
1602 let field_b = Field::new("b", type_b.clone(), false);
1603 let type_a = DataType::Struct(vec![field_b].into());
1604 let field_a = Field::new("a", type_a.clone(), false);
1605 let schema = Schema::new(vec![field_a]);
1606
1607 let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
1609 let b_data = ArrayDataBuilder::new(type_b)
1610 .len(6)
1611 .add_child_data(c.into_data())
1612 .build()
1613 .unwrap();
1614 let b = StructArray::from(b_data);
1615 let a_data = ArrayDataBuilder::new(type_a)
1616 .len(6)
1617 .add_child_data(b.into_data())
1618 .build()
1619 .unwrap();
1620 let a = StructArray::from(a_data);
1621
1622 assert_eq!(a.null_count(), 0);
1623 assert_eq!(a.column(0).null_count(), 0);
1624
1625 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1627
1628 roundtrip(batch, Some(SMALL_SIZE / 2));
1629 }
1630
1631 #[test]
1632 fn arrow_writer_2_level_struct_mixed_null() {
1633 let field_c = Field::new("c", DataType::Int32, false);
1635 let type_b = DataType::Struct(vec![field_c].into());
1636 let field_b = Field::new("b", type_b.clone(), true);
1637 let type_a = DataType::Struct(vec![field_b].into());
1638 let field_a = Field::new("a", type_a.clone(), false);
1639 let schema = Schema::new(vec![field_a]);
1640
1641 let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
1643 let b_data = ArrayDataBuilder::new(type_b)
1644 .len(6)
1645 .null_bit_buffer(Some(Buffer::from([0b00100111])))
1646 .add_child_data(c.into_data())
1647 .build()
1648 .unwrap();
1649 let b = StructArray::from(b_data);
1650 let a_data = ArrayDataBuilder::new(type_a)
1652 .len(6)
1653 .add_child_data(b.into_data())
1654 .build()
1655 .unwrap();
1656 let a = StructArray::from(a_data);
1657
1658 assert_eq!(a.null_count(), 0);
1659 assert_eq!(a.column(0).null_count(), 2);
1660
1661 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1663
1664 roundtrip(batch, Some(SMALL_SIZE / 2));
1665 }
1666
1667 #[test]
1668 fn arrow_writer_2_level_struct_mixed_null_2() {
1669 let field_c = Field::new("c", DataType::Int32, false);
1671 let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
1672 let field_e = Field::new(
1673 "e",
1674 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1675 false,
1676 );
1677
1678 let field_b = Field::new(
1679 "b",
1680 DataType::Struct(vec![field_c, field_d, field_e].into()),
1681 false,
1682 );
1683 let type_a = DataType::Struct(vec![field_b.clone()].into());
1684 let field_a = Field::new("a", type_a, true);
1685 let schema = Schema::new(vec![field_a.clone()]);
1686
1687 let c = Int32Array::from_iter_values(0..6);
1689 let d = FixedSizeBinaryArray::try_from_iter(
1690 ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
1691 )
1692 .expect("four byte values");
1693 let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
1694 let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
1695 .len(6)
1696 .add_child_data(c.into_data())
1697 .add_child_data(d.into_data())
1698 .add_child_data(e.into_data())
1699 .build()
1700 .unwrap();
1701 let b = StructArray::from(b_data);
1702 let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
1703 .len(6)
1704 .null_bit_buffer(Some(Buffer::from([0b00100101])))
1705 .add_child_data(b.into_data())
1706 .build()
1707 .unwrap();
1708 let a = StructArray::from(a_data);
1709
1710 assert_eq!(a.null_count(), 3);
1711 assert_eq!(a.column(0).null_count(), 0);
1712
1713 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1715
1716 roundtrip(batch, Some(SMALL_SIZE / 2));
1717 }
1718
1719 #[test]
1720 fn test_empty_dict() {
1721 let struct_fields = Fields::from(vec![Field::new(
1722 "dict",
1723 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1724 false,
1725 )]);
1726
1727 let schema = Schema::new(vec![Field::new_struct(
1728 "struct",
1729 struct_fields.clone(),
1730 true,
1731 )]);
1732 let dictionary = Arc::new(DictionaryArray::new(
1733 Int32Array::new_null(5),
1734 Arc::new(StringArray::new_null(0)),
1735 ));
1736
1737 let s = StructArray::new(
1738 struct_fields,
1739 vec![dictionary],
1740 Some(NullBuffer::new_null(5)),
1741 );
1742
1743 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
1744 roundtrip(batch, None);
1745 }
1746 #[test]
1747 fn arrow_writer_page_size() {
1748 let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
1749
1750 let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
1751
1752 for i in 0..10 {
1754 let value = i
1755 .to_string()
1756 .repeat(10)
1757 .chars()
1758 .take(10)
1759 .collect::<String>();
1760
1761 builder.append_value(value);
1762 }
1763
1764 let array = Arc::new(builder.finish());
1765
1766 let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1767
1768 let file = tempfile::tempfile().unwrap();
1769
1770 let props = WriterProperties::builder()
1772 .set_data_page_size_limit(1)
1773 .set_dictionary_page_size_limit(1)
1774 .set_write_batch_size(1)
1775 .build();
1776
1777 let mut writer =
1778 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
1779 .expect("Unable to write file");
1780 writer.write(&batch).unwrap();
1781 writer.close().unwrap();
1782
1783 let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
1784
1785 let column = reader.metadata().row_group(0).columns();
1786
1787 assert_eq!(column.len(), 1);
1788
1789 assert!(
1792 column[0].dictionary_page_offset().is_some(),
1793 "Expected a dictionary page"
1794 );
1795
1796 let offset_indexes = read_offset_indexes(&file, column).unwrap().unwrap();
1797
1798 let page_locations = offset_indexes[0].page_locations.clone();
1799
1800 assert_eq!(
1803 page_locations.len(),
1804 10,
1805 "Expected 9 pages but got {page_locations:#?}"
1806 );
1807 }
1808
1809 #[test]
1810 fn arrow_writer_float_nans() {
1811 let f16_field = Field::new("a", DataType::Float16, false);
1812 let f32_field = Field::new("b", DataType::Float32, false);
1813 let f64_field = Field::new("c", DataType::Float64, false);
1814 let schema = Schema::new(vec![f16_field, f32_field, f64_field]);
1815
1816 let f16_values = (0..MEDIUM_SIZE)
1817 .map(|i| {
1818 Some(if i % 2 == 0 {
1819 f16::NAN
1820 } else {
1821 f16::from_f32(i as f32)
1822 })
1823 })
1824 .collect::<Float16Array>();
1825
1826 let f32_values = (0..MEDIUM_SIZE)
1827 .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
1828 .collect::<Float32Array>();
1829
1830 let f64_values = (0..MEDIUM_SIZE)
1831 .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
1832 .collect::<Float64Array>();
1833
1834 let batch = RecordBatch::try_new(
1835 Arc::new(schema),
1836 vec![
1837 Arc::new(f16_values),
1838 Arc::new(f32_values),
1839 Arc::new(f64_values),
1840 ],
1841 )
1842 .unwrap();
1843
1844 roundtrip(batch, None);
1845 }
1846
1847 const SMALL_SIZE: usize = 7;
1848 const MEDIUM_SIZE: usize = 63;
1849
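    /// Round-trips `expected_batch` through a temporary file with both writer
    /// versions, verifying the data read back matches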
1850 fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<File> {
1851 let mut files = vec![];
1852 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
1853 let mut props = WriterProperties::builder().set_writer_version(version);
1854
1855 if let Some(size) = max_row_group_size {
1856 props = props.set_max_row_group_size(size)
1857 }
1858
1859 let props = props.build();
1860 files.push(roundtrip_opts(&expected_batch, props))
1861 }
1862 files
1863 }
1864
1865 fn roundtrip_opts_with_array_validation<F>(
1866 expected_batch: &RecordBatch,
1867 props: WriterProperties,
1868 validate: F,
1869 ) -> File
1870 where
1871 F: Fn(&ArrayData, &ArrayData),
1872 {
1873 let file = tempfile::tempfile().unwrap();
1874
1875 let mut writer = ArrowWriter::try_new(
1876 file.try_clone().unwrap(),
1877 expected_batch.schema(),
1878 Some(props),
1879 )
1880 .expect("Unable to write file");
1881 writer.write(expected_batch).unwrap();
1882 writer.close().unwrap();
1883
1884 let mut record_batch_reader =
1885 ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
1886
1887 let actual_batch = record_batch_reader
1888 .next()
1889 .expect("No batch found")
1890 .expect("Unable to get batch");
1891
1892 assert_eq!(expected_batch.schema(), actual_batch.schema());
1893 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1894 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1895 for i in 0..expected_batch.num_columns() {
1896 let expected_data = expected_batch.column(i).to_data();
1897 let actual_data = actual_batch.column(i).to_data();
1898 validate(&expected_data, &actual_data);
1899 }
1900
1901 file
1902 }
1903
1904 fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> File {
1905 roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
1906 a.validate_full().expect("valid expected data");
1907 b.validate_full().expect("valid actual data");
1908 assert_eq!(a, b)
1909 })
1910 }
1911
1912 struct RoundTripOptions {
1913 values: ArrayRef,
1914 schema: SchemaRef,
1915 bloom_filter: bool,
1916 bloom_filter_position: BloomFilterPosition,
1917 }
1918
1919 impl RoundTripOptions {
1920 fn new(values: ArrayRef, nullable: bool) -> Self {
1921 let data_type = values.data_type().clone();
1922 let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
1923 Self {
1924 values,
1925 schema: Arc::new(schema),
1926 bloom_filter: false,
1927 bloom_filter_position: BloomFilterPosition::AfterRowGroup,
1928 }
1929 }
1930 }
1931
1932 fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<File> {
1933 one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
1934 }
1935
1936 fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<File> {
1937 let mut options = RoundTripOptions::new(values, false);
1938 options.schema = schema;
1939 one_column_roundtrip_with_options(options)
1940 }
1941
1942 fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<File> {
1943 let RoundTripOptions {
1944 values,
1945 schema,
1946 bloom_filter,
1947 bloom_filter_position,
1948 } = options;
1949
1950 let encodings = match values.data_type() {
1951 DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
1952 vec![
1953 Encoding::PLAIN,
1954 Encoding::DELTA_BYTE_ARRAY,
1955 Encoding::DELTA_LENGTH_BYTE_ARRAY,
1956 ]
1957 }
1958 DataType::Int64
1959 | DataType::Int32
1960 | DataType::Int16
1961 | DataType::Int8
1962 | DataType::UInt64
1963 | DataType::UInt32
1964 | DataType::UInt16
1965 | DataType::UInt8 => vec![
1966 Encoding::PLAIN,
1967 Encoding::DELTA_BINARY_PACKED,
1968 Encoding::BYTE_STREAM_SPLIT,
1969 ],
1970 DataType::Float32 | DataType::Float64 => {
1971 vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
1972 }
1973 _ => vec![Encoding::PLAIN],
1974 };
1975
1976 let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
1977
1978 let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
1979
1980 let mut files = vec![];
1981 for dictionary_size in [0, 1, 1024] {
1982 for encoding in &encodings {
1983 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
1984 for row_group_size in row_group_sizes {
1985 let props = WriterProperties::builder()
1986 .set_writer_version(version)
1987 .set_max_row_group_size(row_group_size)
1988 .set_dictionary_enabled(dictionary_size != 0)
1989 .set_dictionary_page_size_limit(dictionary_size.max(1))
1990 .set_encoding(*encoding)
1991 .set_bloom_filter_enabled(bloom_filter)
1992 .set_bloom_filter_position(bloom_filter_position)
1993 .build();
1994
1995 files.push(roundtrip_opts(&expected_batch, props))
1996 }
1997 }
1998 }
1999 }
2000 files
2001 }
2002
2003 fn values_required<A, I>(iter: I) -> Vec<File>
2004 where
2005 A: From<Vec<I::Item>> + Array + 'static,
2006 I: IntoIterator,
2007 {
2008 let raw_values: Vec<_> = iter.into_iter().collect();
2009 let values = Arc::new(A::from(raw_values));
2010 one_column_roundtrip(values, false)
2011 }
2012
2013 fn values_optional<A, I>(iter: I) -> Vec<File>
2014 where
2015 A: From<Vec<Option<I::Item>>> + Array + 'static,
2016 I: IntoIterator,
2017 {
2018 let optional_raw_values: Vec<_> = iter
2019 .into_iter()
2020 .enumerate()
2021 .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
2022 .collect();
2023 let optional_values = Arc::new(A::from(optional_raw_values));
2024 one_column_roundtrip(optional_values, true)
2025 }
2026
2027 fn required_and_optional<A, I>(iter: I)
2028 where
2029 A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
2030 I: IntoIterator + Clone,
2031 {
2032 values_required::<A, I>(iter.clone());
2033 values_optional::<A, I>(iter);
2034 }
2035
2036 fn check_bloom_filter<T: AsBytes>(
2037 files: Vec<File>,
2038 file_column: String,
2039 positive_values: Vec<T>,
2040 negative_values: Vec<T>,
2041 ) {
2042 files.into_iter().take(1).for_each(|file| {
2043 let file_reader = SerializedFileReader::new_with_options(
2044 file,
2045 ReadOptionsBuilder::new()
2046 .with_reader_properties(
2047 ReaderProperties::builder()
2048 .set_read_bloom_filter(true)
2049 .build(),
2050 )
2051 .build(),
2052 )
2053 .expect("Unable to open file as Parquet");
2054 let metadata = file_reader.metadata();
2055
2056 let mut bloom_filters: Vec<_> = vec![];
2058 for (ri, row_group) in metadata.row_groups().iter().enumerate() {
2059 if let Some((column_index, _)) = row_group
2060 .columns()
2061 .iter()
2062 .enumerate()
2063 .find(|(_, column)| column.column_path().string() == file_column)
2064 {
2065 let row_group_reader = file_reader
2066 .get_row_group(ri)
2067 .expect("Unable to read row group");
2068 if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
2069 bloom_filters.push(sbbf.clone());
2070 } else {
2071 panic!("No bloom filter for column named {file_column} found");
2072 }
2073 } else {
2074 panic!("No column named {file_column} found");
2075 }
2076 }
2077
2078 positive_values.iter().for_each(|value| {
2079 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2080 assert!(
2081 found.is_some(),
2082 "{}",
2083 format!("Value {:?} should be in bloom filter", value.as_bytes())
2084 );
2085 });
2086
2087 negative_values.iter().for_each(|value| {
2088 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2089 assert!(
2090 found.is_none(),
2091 "{}",
2092 format!("Value {:?} should not be in bloom filter", value.as_bytes())
2093 );
2094 });
2095 });
2096 }
2097
2098 #[test]
2099 fn all_null_primitive_single_column() {
2100 let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
2101 one_column_roundtrip(values, true);
2102 }
2103 #[test]
2104 fn null_single_column() {
2105 let values = Arc::new(NullArray::new(SMALL_SIZE));
2106 one_column_roundtrip(values, true);
2107 }
2109
2110 #[test]
2111 fn bool_single_column() {
2112 required_and_optional::<BooleanArray, _>(
2113 [true, false].iter().cycle().copied().take(SMALL_SIZE),
2114 );
2115 }
2116
2117 #[test]
2118 fn bool_large_single_column() {
2119 let values = Arc::new(
2120 [None, Some(true), Some(false)]
2121 .iter()
2122 .cycle()
2123 .copied()
2124 .take(200_000)
2125 .collect::<BooleanArray>(),
2126 );
2127 let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
2128 let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2129 let file = tempfile::tempfile().unwrap();
2130
2131 let mut writer =
2132 ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
2133 .expect("Unable to write file");
2134 writer.write(&expected_batch).unwrap();
2135 writer.close().unwrap();
2136 }
2137
2138 #[test]
2139 fn check_page_offset_index_with_nan() {
2140 let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
2141 let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
2142 let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2143
2144 let mut out = Vec::with_capacity(1024);
2145 let mut writer =
2146 ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
2147 writer.write(&batch).unwrap();
2148 let file_meta_data = writer.close().unwrap();
2149 for row_group in file_meta_data.row_groups {
2150 for column in row_group.columns {
2151 assert!(column.offset_index_offset.is_some());
2152 assert!(column.offset_index_length.is_some());
2153 assert!(column.column_index_offset.is_none());
2154 assert!(column.column_index_length.is_none());
2155 }
2156 }
2157 }
2158
2159 #[test]
2160 fn i8_single_column() {
2161 required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
2162 }
2163
2164 #[test]
2165 fn i16_single_column() {
2166 required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
2167 }
2168
2169 #[test]
2170 fn i32_single_column() {
2171 required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
2172 }
2173
2174 #[test]
2175 fn i64_single_column() {
2176 required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
2177 }
2178
2179 #[test]
2180 fn u8_single_column() {
2181 required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
2182 }
2183
2184 #[test]
2185 fn u16_single_column() {
2186 required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
2187 }
2188
2189 #[test]
2190 fn u32_single_column() {
2191 required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
2192 }
2193
2194 #[test]
2195 fn u64_single_column() {
2196 required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
2197 }
2198
2199 #[test]
2200 fn f32_single_column() {
2201 required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
2202 }
2203
2204 #[test]
2205 fn f64_single_column() {
2206 required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
2207 }
2208
2209 #[test]
2214 fn timestamp_second_single_column() {
2215 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2216 let values = Arc::new(TimestampSecondArray::from(raw_values));
2217
2218 one_column_roundtrip(values, false);
2219 }
2220
2221 #[test]
2222 fn timestamp_millisecond_single_column() {
2223 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2224 let values = Arc::new(TimestampMillisecondArray::from(raw_values));
2225
2226 one_column_roundtrip(values, false);
2227 }
2228
2229 #[test]
2230 fn timestamp_microsecond_single_column() {
2231 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2232 let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
2233
2234 one_column_roundtrip(values, false);
2235 }
2236
2237 #[test]
2238 fn timestamp_nanosecond_single_column() {
2239 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2240 let values = Arc::new(TimestampNanosecondArray::from(raw_values));
2241
2242 one_column_roundtrip(values, false);
2243 }
2244
2245 #[test]
2246 fn date32_single_column() {
2247 required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
2248 }
2249
2250 #[test]
2251 fn date64_single_column() {
2252 required_and_optional::<Date64Array, _>(
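2253 // Date64 is milliseconds since the epoch; step in whole days (86_400_000 ms)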
2254 (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
2255 );
2256 }
2257
2258 #[test]
2259 fn time32_second_single_column() {
2260 required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
2261 }
2262
2263 #[test]
2264 fn time32_millisecond_single_column() {
2265 required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
2266 }
2267
2268 #[test]
2269 fn time64_microsecond_single_column() {
2270 required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
2271 }
2272
2273 #[test]
2274 fn time64_nanosecond_single_column() {
2275 required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
2276 }
2277
2278 #[test]
2279 #[should_panic(expected = "Converting Duration to parquet not supported")]
2280 fn duration_second_single_column() {
2281 required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
2282 }
2283
2284 #[test]
2285 #[should_panic(expected = "Converting Duration to parquet not supported")]
2286 fn duration_millisecond_single_column() {
2287 required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
2288 }
2289
2290 #[test]
2291 #[should_panic(expected = "Converting Duration to parquet not supported")]
2292 fn duration_microsecond_single_column() {
2293 required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
2294 }
2295
2296 #[test]
2297 #[should_panic(expected = "Converting Duration to parquet not supported")]
2298 fn duration_nanosecond_single_column() {
2299 required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
2300 }
2301
2302 #[test]
2303 fn interval_year_month_single_column() {
2304 required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
2305 }
2306
2307 #[test]
2308 fn interval_day_time_single_column() {
2309 required_and_optional::<IntervalDayTimeArray, _>(vec![
2310 IntervalDayTime::new(0, 1),
2311 IntervalDayTime::new(0, 3),
2312 IntervalDayTime::new(3, -2),
2313 IntervalDayTime::new(-200, 4),
2314 ]);
2315 }
2316
2317 #[test]
2318 #[should_panic(
2319 expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
2320 )]
2321 fn interval_month_day_nano_single_column() {
2322 required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
2323 IntervalMonthDayNano::new(0, 1, 5),
2324 IntervalMonthDayNano::new(0, 3, 2),
2325 IntervalMonthDayNano::new(3, -2, -5),
2326 IntervalMonthDayNano::new(-200, 4, -1),
2327 ]);
2328 }
2329
2330 #[test]
2331 fn binary_single_column() {
2332 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2333 let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2334 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2335
2336 values_required::<BinaryArray, _>(many_vecs_iter);
2338 }
2339
2340 #[test]
2341 fn binary_view_single_column() {
2342 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2343 let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2344 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2345
2346 values_required::<BinaryViewArray, _>(many_vecs_iter);
2348 }
2349
2350 #[test]
2351 fn i32_column_bloom_filter_at_end() {
2352 let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2353 let mut options = RoundTripOptions::new(array, false);
2354 options.bloom_filter = true;
2355 options.bloom_filter_position = BloomFilterPosition::End;
2356
2357 let files = one_column_roundtrip_with_options(options);
2358 check_bloom_filter(
2359 files,
2360 "col".to_string(),
2361 (0..SMALL_SIZE as i32).collect(),
2362 (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2363 );
2364 }
2365
2366 #[test]
2367 fn i32_column_bloom_filter() {
2368 let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2369 let mut options = RoundTripOptions::new(array, false);
2370 options.bloom_filter = true;
2371
2372 let files = one_column_roundtrip_with_options(options);
2373 check_bloom_filter(
2374 files,
2375 "col".to_string(),
2376 (0..SMALL_SIZE as i32).collect(),
2377 (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2378 );
2379 }
2380
2381 #[test]
2382 fn binary_column_bloom_filter() {
2383 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2384 let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2385 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2386
2387 let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
2388 let mut options = RoundTripOptions::new(array, false);
2389 options.bloom_filter = true;
2390
2391 let files = one_column_roundtrip_with_options(options);
2392 check_bloom_filter(
2393 files,
2394 "col".to_string(),
2395 many_vecs,
2396 vec![vec![(SMALL_SIZE + 1) as u8]],
2397 );
2398 }
2399
2400 #[test]
2401 fn empty_string_null_column_bloom_filter() {
2402 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2403 let raw_strs = raw_values.iter().map(|s| s.as_str());
2404
2405 let array = Arc::new(StringArray::from_iter_values(raw_strs));
2406 let mut options = RoundTripOptions::new(array, false);
2407 options.bloom_filter = true;
2408
2409 let files = one_column_roundtrip_with_options(options);
2410
2411 let optional_raw_values: Vec<_> = raw_values
2412 .iter()
2413 .enumerate()
2414 .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
2415 .collect();
2416 check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
2418 }
2419
2420 #[test]
2421 fn large_binary_single_column() {
2422 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2423 let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2424 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2425
2426 values_required::<LargeBinaryArray, _>(many_vecs_iter);
2428 }
2429
2430 #[test]
2431 fn fixed_size_binary_single_column() {
2432 let mut builder = FixedSizeBinaryBuilder::new(4);
2433 builder.append_value(b"0123").unwrap();
2434 builder.append_null();
2435 builder.append_value(b"8910").unwrap();
2436 builder.append_value(b"1112").unwrap();
2437 let array = Arc::new(builder.finish());
2438
2439 one_column_roundtrip(array, true);
2440 }
2441
2442 #[test]
2443 fn string_single_column() {
2444 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2445 let raw_strs = raw_values.iter().map(|s| s.as_str());
2446
2447 required_and_optional::<StringArray, _>(raw_strs);
2448 }
2449
2450 #[test]
2451 fn large_string_single_column() {
2452 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2453 let raw_strs = raw_values.iter().map(|s| s.as_str());
2454
2455 required_and_optional::<LargeStringArray, _>(raw_strs);
2456 }
2457
2458 #[test]
2459 fn string_view_single_column() {
2460 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2461 let raw_strs = raw_values.iter().map(|s| s.as_str());
2462
2463 required_and_optional::<StringViewArray, _>(raw_strs);
2464 }
2465
2466 #[test]
2467 fn null_list_single_column() {
2468 let null_field = Field::new_list_field(DataType::Null, true);
2469 let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
2470
2471 let schema = Schema::new(vec![list_field]);
2472
2473 let a_values = NullArray::new(2);
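2474 // build a list column equivalent to [[], null, [null, null]]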
2475 let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
2476 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2477 DataType::Null,
2478 true,
2479 ))))
2480 .len(3)
2481 .add_buffer(a_value_offsets)
2482 .null_bit_buffer(Some(Buffer::from([0b00000101])))
2483 .add_child_data(a_values.into_data())
2484 .build()
2485 .unwrap();
2486
2487 let a = ListArray::from(a_list_data);
2488
2489 assert!(a.is_valid(0));
2490 assert!(!a.is_valid(1));
2491 assert!(a.is_valid(2));
2492
2493 assert_eq!(a.value(0).len(), 0);
2494 assert_eq!(a.value(2).len(), 2);
2495 assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
2496
2497 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2498 roundtrip(batch, None);
2499 }
2500
2501 #[test]
2502 fn list_single_column() {
2503 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2504 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2505 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2506 DataType::Int32,
2507 false,
2508 ))))
2509 .len(5)
2510 .add_buffer(a_value_offsets)
2511 .null_bit_buffer(Some(Buffer::from([0b00011011])))
2512 .add_child_data(a_values.into_data())
2513 .build()
2514 .unwrap();
2515
2516 assert_eq!(a_list_data.null_count(), 1);
2517
2518 let a = ListArray::from(a_list_data);
2519 let values = Arc::new(a);
2520
2521 one_column_roundtrip(values, true);
2522 }
2523
2524 #[test]
2525 fn large_list_single_column() {
2526 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2527 let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
2528 let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
2529 "large_item",
2530 DataType::Int32,
2531 true,
2532 ))))
2533 .len(5)
2534 .add_buffer(a_value_offsets)
2535 .add_child_data(a_values.into_data())
2536 .null_bit_buffer(Some(Buffer::from([0b00011011])))
2537 .build()
2538 .unwrap();
2539
2540 assert_eq!(a_list_data.null_count(), 1);
2542
2543 let a = LargeListArray::from(a_list_data);
2544 let values = Arc::new(a);
2545
2546 one_column_roundtrip(values, true);
2547 }
2548
2549 #[test]
2550 fn list_nested_nulls() {
2551 use arrow::datatypes::Int32Type;
2552 let data = vec![
2553 Some(vec![Some(1)]),
2554 Some(vec![Some(2), Some(3)]),
2555 None,
2556 Some(vec![Some(4), Some(5), None]),
2557 Some(vec![None]),
2558 Some(vec![Some(6), Some(7)]),
2559 ];
2560
2561 let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
2562 one_column_roundtrip(Arc::new(list), true);
2563
2564 let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
2565 one_column_roundtrip(Arc::new(list), true);
2566 }
2567
2568 #[test]
2569 fn struct_single_column() {
2570 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2571 let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
2572 let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
2573
2574 let values = Arc::new(s);
2575 one_column_roundtrip(values, false);
2576 }
2577
2578 #[test]
2579 fn list_and_map_coerced_names() {
2580 let list_field =
2582 Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
2583 let map_field = Field::new_map(
2584 "my_map",
2585 "entries",
2586 Field::new("keys", DataType::Int32, false),
2587 Field::new("values", DataType::Int32, true),
2588 false,
2589 true,
2590 );
2591
2592 let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
2593 let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
2594
2595 let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
2596
2597 let props = Some(WriterProperties::builder().set_coerce_types(true).build());
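2598 // with coerce_types the Parquet schema uses the spec names "element" and "key_value"/"key"/"value"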
2599 let file = tempfile::tempfile().unwrap();
2600 let mut writer =
2601 ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
2602
2603 let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
2604 writer.write(&batch).unwrap();
2605 let file_metadata = writer.close().unwrap();
2606
2607 assert_eq!(file_metadata.schema[3].name, "element");
2609 assert_eq!(file_metadata.schema[5].name, "key_value");
2611 assert_eq!(file_metadata.schema[6].name, "key");
2613 assert_eq!(file_metadata.schema[7].name, "value");
2615
2616 let reader = SerializedFileReader::new(file).unwrap();
2618 let file_schema = reader.metadata().file_metadata().schema();
2619 let fields = file_schema.get_fields();
2620 let list_field = &fields[0].get_fields()[0];
2621 assert_eq!(list_field.get_fields()[0].name(), "element");
2622 let map_field = &fields[1].get_fields()[0];
2623 assert_eq!(map_field.name(), "key_value");
2624 assert_eq!(map_field.get_fields()[0].name(), "key");
2625 assert_eq!(map_field.get_fields()[1].name(), "value");
2626 }
2627
2628 #[test]
2629 fn fallback_flush_data_page() {
2630 let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
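2631 // enough string data to overflow the tiny data page size limit configured below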
2632 let values = Arc::new(StringArray::from(raw_values));
2633 let encodings = vec![
2634 Encoding::DELTA_BYTE_ARRAY,
2635 Encoding::DELTA_LENGTH_BYTE_ARRAY,
2636 ];
2637 let data_type = values.data_type().clone();
2638 let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
2639 let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2640
2641 let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2642 let data_page_size_limit: usize = 32;
2643 let write_batch_size: usize = 16;
2644
2645 for encoding in &encodings {
2646 for row_group_size in row_group_sizes {
2647 let props = WriterProperties::builder()
2648 .set_writer_version(WriterVersion::PARQUET_2_0)
2649 .set_max_row_group_size(row_group_size)
2650 .set_dictionary_enabled(false)
2651 .set_encoding(*encoding)
2652 .set_data_page_size_limit(data_page_size_limit)
2653 .set_write_batch_size(write_batch_size)
2654 .build();
2655
2656 roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
2657 let string_array_a = StringArray::from(a.clone());
2658 let string_array_b = StringArray::from(b.clone());
2659 let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
2660 let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
2661 assert_eq!(
2662 vec_a, vec_b,
2663 "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
2664 );
2665 });
2666 }
2667 }
2668 }
2669
2670 #[test]
2671 fn arrow_writer_string_dictionary() {
2672 #[allow(deprecated)]
2674 let schema = Arc::new(Schema::new(vec![Field::new_dict(
2675 "dictionary",
2676 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2677 true,
2678 42,
2679 true,
2680 )]));
2681
2682 let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
2684 .iter()
2685 .copied()
2686 .collect();
2687
2688 one_column_roundtrip_with_schema(Arc::new(d), schema);
2690 }
2691
2692 #[test]
2693 fn arrow_writer_primitive_dictionary() {
2694 #[allow(deprecated)]
2696 let schema = Arc::new(Schema::new(vec![Field::new_dict(
2697 "dictionary",
2698 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
2699 true,
2700 42,
2701 true,
2702 )]));
2703
2704 let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
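2705 // dictionary-encoded UInt32 values with a null and a repeated key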
2706 builder.append(12345678).unwrap();
2707 builder.append_null();
2708 builder.append(22345678).unwrap();
2709 builder.append(12345678).unwrap();
2710 let d = builder.finish();
2711
2712 one_column_roundtrip_with_schema(Arc::new(d), schema);
2713 }
2714
2715 #[test]
2716 fn arrow_writer_decimal128_dictionary() {
2717 let integers = vec![12345, 56789, 34567];
2718
2719 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
2720
2721 let values = Decimal128Array::from(integers.clone())
2722 .with_precision_and_scale(5, 2)
2723 .unwrap();
2724
2725 let array = DictionaryArray::new(keys, Arc::new(values));
2726 one_column_roundtrip(Arc::new(array.clone()), true);
2727
2728 let values = Decimal128Array::from(integers)
2729 .with_precision_and_scale(12, 2)
2730 .unwrap();
2731
2732 let array = array.with_values(Arc::new(values));
2733 one_column_roundtrip(Arc::new(array), true);
2734 }
2735
2736 #[test]
2737 fn arrow_writer_decimal256_dictionary() {
2738 let integers = vec![
2739 i256::from_i128(12345),
2740 i256::from_i128(56789),
2741 i256::from_i128(34567),
2742 ];
2743
2744 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
2745
2746 let values = Decimal256Array::from(integers.clone())
2747 .with_precision_and_scale(5, 2)
2748 .unwrap();
2749
2750 let array = DictionaryArray::new(keys, Arc::new(values));
2751 one_column_roundtrip(Arc::new(array.clone()), true);
2752
2753 let values = Decimal256Array::from(integers)
2754 .with_precision_and_scale(12, 2)
2755 .unwrap();
2756
2757 let array = array.with_values(Arc::new(values));
2758 one_column_roundtrip(Arc::new(array), true);
2759 }
2760
2761 #[test]
2762 fn arrow_writer_string_dictionary_unsigned_index() {
2763 #[allow(deprecated)]
2765 let schema = Arc::new(Schema::new(vec![Field::new_dict(
2766 "dictionary",
2767 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
2768 true,
2769 42,
2770 true,
2771 )]));
2772
2773 let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
2775 .iter()
2776 .copied()
2777 .collect();
2778
2779 one_column_roundtrip_with_schema(Arc::new(d), schema);
2780 }
2781
2782 #[test]
2783 fn u32_min_max() {
2784 let src = [
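2785 // cover the full u32 range, including values above i32::MAX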
2786 u32::MIN,
2787 u32::MIN + 1,
2788 (i32::MAX as u32) - 1,
2789 i32::MAX as u32,
2790 (i32::MAX as u32) + 1,
2791 u32::MAX - 1,
2792 u32::MAX,
2793 ];
2794 let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
2795 let files = one_column_roundtrip(values, false);
2796
2797 for file in files {
2798 let reader = SerializedFileReader::new(file).unwrap();
2800 let metadata = reader.metadata();
2801
2802 let mut row_offset = 0;
2803 for row_group in metadata.row_groups() {
2804 assert_eq!(row_group.num_columns(), 1);
2805 let column = row_group.column(0);
2806
2807 let num_values = column.num_values() as usize;
2808 let src_slice = &src[row_offset..row_offset + num_values];
2809 row_offset += column.num_values() as usize;
2810
2811 let stats = column.statistics().unwrap();
2812 if let Statistics::Int32(stats) = stats {
2813 assert_eq!(
2814 *stats.min_opt().unwrap() as u32,
2815 *src_slice.iter().min().unwrap()
2816 );
2817 assert_eq!(
2818 *stats.max_opt().unwrap() as u32,
2819 *src_slice.iter().max().unwrap()
2820 );
2821 } else {
2822 panic!("Statistics::Int32 missing")
2823 }
2824 }
2825 }
2826 }
2827
2828 #[test]
2829 fn u64_min_max() {
2830 let src = [
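2831 // cover the full u64 range, including values above i64::MAX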
2832 u64::MIN,
2833 u64::MIN + 1,
2834 (i64::MAX as u64) - 1,
2835 i64::MAX as u64,
2836 (i64::MAX as u64) + 1,
2837 u64::MAX - 1,
2838 u64::MAX,
2839 ];
2840 let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
2841 let files = one_column_roundtrip(values, false);
2842
2843 for file in files {
2844 let reader = SerializedFileReader::new(file).unwrap();
2846 let metadata = reader.metadata();
2847
2848 let mut row_offset = 0;
2849 for row_group in metadata.row_groups() {
2850 assert_eq!(row_group.num_columns(), 1);
2851 let column = row_group.column(0);
2852
2853 let num_values = column.num_values() as usize;
2854 let src_slice = &src[row_offset..row_offset + num_values];
2855 row_offset += column.num_values() as usize;
2856
2857 let stats = column.statistics().unwrap();
2858 if let Statistics::Int64(stats) = stats {
2859 assert_eq!(
2860 *stats.min_opt().unwrap() as u64,
2861 *src_slice.iter().min().unwrap()
2862 );
2863 assert_eq!(
2864 *stats.max_opt().unwrap() as u64,
2865 *src_slice.iter().max().unwrap()
2866 );
2867 } else {
2868 panic!("Statistics::Int64 missing")
2869 }
2870 }
2871 }
2872 }
2873
2874 #[test]
2875 fn statistics_null_counts_only_nulls() {
2876 let values = Arc::new(UInt64Array::from(vec![None, None]));
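2877 // an all-null column should still report its null count in the statistics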
2878 let files = one_column_roundtrip(values, true);
2879
2880 for file in files {
2881 let reader = SerializedFileReader::new(file).unwrap();
2883 let metadata = reader.metadata();
2884 assert_eq!(metadata.num_row_groups(), 1);
2885 let row_group = metadata.row_group(0);
2886 assert_eq!(row_group.num_columns(), 1);
2887 let column = row_group.column(0);
2888 let stats = column.statistics().unwrap();
2889 assert_eq!(stats.null_count_opt(), Some(2));
2890 }
2891 }
2892
2893 #[test]
2894 fn test_list_of_struct_roundtrip() {
2895 let int_field = Field::new("a", DataType::Int32, true);
2897 let int_field2 = Field::new("b", DataType::Int32, true);
2898
2899 let int_builder = Int32Builder::with_capacity(10);
2900 let int_builder2 = Int32Builder::with_capacity(10);
2901
2902 let struct_builder = StructBuilder::new(
2903 vec![int_field, int_field2],
2904 vec![Box::new(int_builder), Box::new(int_builder2)],
2905 );
2906 let mut list_builder = ListBuilder::new(struct_builder);
2907
2908 let values = list_builder.values();
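2909 // rows written: [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]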
2913 values
2914 .field_builder::<Int32Builder>(0)
2915 .unwrap()
2916 .append_value(1);
2917 values
2918 .field_builder::<Int32Builder>(1)
2919 .unwrap()
2920 .append_value(2);
2921 values.append(true);
2922 list_builder.append(true);
2923
2924 list_builder.append(true);
2926
2927 list_builder.append(false);
2929
2930 let values = list_builder.values();
2932 values
2933 .field_builder::<Int32Builder>(0)
2934 .unwrap()
2935 .append_null();
2936 values
2937 .field_builder::<Int32Builder>(1)
2938 .unwrap()
2939 .append_null();
2940 values.append(false);
2941 values
2942 .field_builder::<Int32Builder>(0)
2943 .unwrap()
2944 .append_null();
2945 values
2946 .field_builder::<Int32Builder>(1)
2947 .unwrap()
2948 .append_null();
2949 values.append(false);
2950 list_builder.append(true);
2951
2952 let values = list_builder.values();
2954 values
2955 .field_builder::<Int32Builder>(0)
2956 .unwrap()
2957 .append_null();
2958 values
2959 .field_builder::<Int32Builder>(1)
2960 .unwrap()
2961 .append_value(3);
2962 values.append(true);
2963 list_builder.append(true);
2964
2965 let values = list_builder.values();
2967 values
2968 .field_builder::<Int32Builder>(0)
2969 .unwrap()
2970 .append_value(2);
2971 values
2972 .field_builder::<Int32Builder>(1)
2973 .unwrap()
2974 .append_null();
2975 values.append(true);
2976 list_builder.append(true);
2977
2978 let array = Arc::new(list_builder.finish());
2979
2980 one_column_roundtrip(array, true);
2981 }
2982
2983 fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
2984 metadata.row_groups().iter().map(|x| x.num_rows()).collect()
2985 }
2986
2987 #[test]
2988 fn test_aggregates_records() {
2989 let arrays = [
2990 Int32Array::from((0..100).collect::<Vec<_>>()),
2991 Int32Array::from((0..50).collect::<Vec<_>>()),
2992 Int32Array::from((200..500).collect::<Vec<_>>()),
2993 ];
2994
2995 let schema = Arc::new(Schema::new(vec![Field::new(
2996 "int",
2997 ArrowDataType::Int32,
2998 false,
2999 )]));
3000
3001 let file = tempfile::tempfile().unwrap();
3002
3003 let props = WriterProperties::builder()
3004 .set_max_row_group_size(200)
3005 .build();
3006
3007 let mut writer =
3008 ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
3009
3010 for array in arrays {
3011 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
3012 writer.write(&batch).unwrap();
3013 }
3014
3015 writer.close().unwrap();
3016
3017 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3018 assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
3019
3020 let batches = builder
3021 .with_batch_size(100)
3022 .build()
3023 .unwrap()
3024 .collect::<ArrowResult<Vec<_>>>()
3025 .unwrap();
3026
3027 assert_eq!(batches.len(), 5);
3028 assert!(batches.iter().all(|x| x.num_columns() == 1));
3029
3030 let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3031
3032 assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
3033
3034 let values: Vec<_> = batches
3035 .iter()
3036 .flat_map(|x| {
3037 x.column(0)
3038 .as_any()
3039 .downcast_ref::<Int32Array>()
3040 .unwrap()
3041 .values()
3042 .iter()
3043 .cloned()
3044 })
3045 .collect();
3046
3047 let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
3048 assert_eq!(&values, &expected_values)
3049 }
3050
3051 #[test]
3052 fn complex_aggregate() {
3053 let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
3055 let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
3056 let struct_a = Arc::new(Field::new(
3057 "struct_a",
3058 DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
3059 true,
3060 ));
3061
3062 let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
3063 let struct_b = Arc::new(Field::new(
3064 "struct_b",
3065 DataType::Struct(vec![list_a.clone()].into()),
3066 false,
3067 ));
3068
3069 let schema = Arc::new(Schema::new(vec![struct_b]));
3070
3071 let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
3073 let field_b_array =
3074 Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
3075
3076 let struct_a_array = StructArray::from(vec![
3077 (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
3078 (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
3079 ]);
3080
3081 let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3082 .len(5)
3083 .add_buffer(Buffer::from_iter(vec![
3084 0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
3085 ]))
3086 .null_bit_buffer(Some(Buffer::from_iter(vec![
3087 true, false, true, false, true,
3088 ])))
3089 .child_data(vec![struct_a_array.into_data()])
3090 .build()
3091 .unwrap();
3092
3093 let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3094 let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
3095
3096 let batch1 =
3097 RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3098 .unwrap();
3099
3100 let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
3101 let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
3102
3103 let struct_a_array = StructArray::from(vec![
3104 (field_a, Arc::new(field_a_array) as ArrayRef),
3105 (field_b, Arc::new(field_b_array) as ArrayRef),
3106 ]);
3107
3108 let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3109 .len(2)
3110 .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
3111 .child_data(vec![struct_a_array.into_data()])
3112 .build()
3113 .unwrap();
3114
3115 let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3116 let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
3117
3118 let batch2 =
3119 RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3120 .unwrap();
3121
3122 let batches = &[batch1, batch2];
3123
3124 let expected = r#"
3127 +-------------------------------------------------------------------------------------------------------+
3128 | struct_b |
3129 +-------------------------------------------------------------------------------------------------------+
3130 | {list: [{leaf_a: 1, leaf_b: 1}]} |
3131 | {list: } |
3132 | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]} |
3133 | {list: } |
3134 | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]} |
3135 | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
3136 | {list: [{leaf_a: 10, leaf_b: }]} |
3137 +-------------------------------------------------------------------------------------------------------+
3138 "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
3139
3140 let actual = pretty_format_batches(batches).unwrap().to_string();
3141 assert_eq!(actual, expected);
3142
3143 let file = tempfile::tempfile().unwrap();
3145 let props = WriterProperties::builder()
3146 .set_max_row_group_size(6)
3147 .build();
3148
3149 let mut writer =
3150 ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
3151
3152 for batch in batches {
3153 writer.write(batch).unwrap();
3154 }
3155 writer.close().unwrap();
3156
3157 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
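3158 // 5 rows in the first batch and 2 in the second; with max_row_group_size = 6
3159 // the writer flushes row groups of 6 and 1 rows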
3162 assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);
3163
3164 let batches = builder
3165 .with_batch_size(2)
3166 .build()
3167 .unwrap()
3168 .collect::<ArrowResult<Vec<_>>>()
3169 .unwrap();
3170
3171 assert_eq!(batches.len(), 4);
3172 let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3173 assert_eq!(&batch_counts, &[2, 2, 2, 1]);
3174
3175 let actual = pretty_format_batches(&batches).unwrap().to_string();
3176 assert_eq!(actual, expected);
3177 }
3178
3179 #[test]
3180 fn test_arrow_writer_metadata() {
3181 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3182 let file_schema = batch_schema.clone().with_metadata(
3183 vec![("foo".to_string(), "bar".to_string())]
3184 .into_iter()
3185 .collect(),
3186 );
3187
3188 let batch = RecordBatch::try_new(
3189 Arc::new(batch_schema),
3190 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3191 )
3192 .unwrap();
3193
3194 let mut buf = Vec::with_capacity(1024);
3195 let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
3196 writer.write(&batch).unwrap();
3197 writer.close().unwrap();
3198 }
3199
3200 #[test]
3201 fn test_arrow_writer_nullable() {
3202 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3203 let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
3204 let file_schema = Arc::new(file_schema);
3205
3206 let batch = RecordBatch::try_new(
3207 Arc::new(batch_schema),
3208 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3209 )
3210 .unwrap();
3211
3212 let mut buf = Vec::with_capacity(1024);
3213 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3214 writer.write(&batch).unwrap();
3215 writer.close().unwrap();
3216
3217 let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
3218 let back = read.next().unwrap().unwrap();
3219 assert_eq!(back.schema(), file_schema);
3220 assert_ne!(back.schema(), batch.schema());
3221 assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
3222 }
3223
3224 #[test]
3225 fn in_progress_accounting() {
3226 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3228
3229 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3231
3232 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3234
3235 let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
3236
3237 assert_eq!(writer.in_progress_size(), 0);
3239 assert_eq!(writer.in_progress_rows(), 0);
3240 assert_eq!(writer.memory_size(), 0);
3241 assert_eq!(writer.bytes_written(), 4); // just the 4-byte Parquet magic so far
3242 writer.write(&batch).unwrap();
3243
3244 let initial_size = writer.in_progress_size();
3246 assert!(initial_size > 0);
3247 assert_eq!(writer.in_progress_rows(), 5);
3248 let initial_memory = writer.memory_size();
3249 assert!(initial_memory > 0);
3250 assert!(
3252 initial_size <= initial_memory,
3253 "{initial_size} <= {initial_memory}"
3254 );
3255
3256 writer.write(&batch).unwrap();
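3257 // a second batch grows the buffered row count and memory usage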
3258 assert!(writer.in_progress_size() > initial_size);
3259 assert_eq!(writer.in_progress_rows(), 10);
3260 assert!(writer.memory_size() > initial_memory);
3261 assert!(
3262 writer.in_progress_size() <= writer.memory_size(),
3263 "in_progress_size {} <= memory_size {}",
3264 writer.in_progress_size(),
3265 writer.memory_size()
3266 );
3267
3268 let pre_flush_bytes_written = writer.bytes_written();
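3269 // flushing writes out the in-progress row group and clears the buffers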
3270 writer.flush().unwrap();
3271 assert_eq!(writer.in_progress_size(), 0);
3272 assert_eq!(writer.memory_size(), 0);
3273 assert!(writer.bytes_written() > pre_flush_bytes_written);
3274
3275 writer.close().unwrap();
3276 }
3277
3278 #[test]
3279 fn test_writer_all_null() {
3280 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3281 let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
3282 let batch = RecordBatch::try_from_iter(vec![
3283 ("a", Arc::new(a) as ArrayRef),
3284 ("b", Arc::new(b) as ArrayRef),
3285 ])
3286 .unwrap();
3287
3288 let mut buf = Vec::with_capacity(1024);
3289 let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
3290 writer.write(&batch).unwrap();
3291 writer.close().unwrap();
3292
3293 let bytes = Bytes::from(buf);
3294 let options = ReadOptionsBuilder::new().with_page_index().build();
3295 let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
3296 let index = reader.metadata().offset_index().unwrap();
3297
3298 assert_eq!(index.len(), 1);
3299 assert_eq!(index[0].len(), 2); // 2 columns
3300 assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
3301 assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
3302 }
3303
3304 #[test]
3305 fn test_disabled_statistics_with_page() {
3306 let file_schema = Schema::new(vec![
3307 Field::new("a", DataType::Utf8, true),
3308 Field::new("b", DataType::Utf8, true),
3309 ]);
3310 let file_schema = Arc::new(file_schema);
3311
3312 let batch = RecordBatch::try_new(
3313 file_schema.clone(),
3314 vec![
3315 Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3316 Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3317 ],
3318 )
3319 .unwrap();
3320
3321 let props = WriterProperties::builder()
3322 .set_statistics_enabled(EnabledStatistics::None)
3323 .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
3324 .build();
3325
3326 let mut buf = Vec::with_capacity(1024);
3327 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3328 writer.write(&batch).unwrap();
3329
3330 let metadata = writer.close().unwrap();
3331 assert_eq!(metadata.row_groups.len(), 1);
3332 let row_group = &metadata.row_groups[0];
3333 assert_eq!(row_group.columns.len(), 2);
3334 assert!(row_group.columns[0].offset_index_offset.is_some());
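3335 // "a" has page-level statistics enabled, so a column index is written for it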
3336 assert!(row_group.columns[0].column_index_offset.is_some());
3337 assert!(row_group.columns[1].offset_index_offset.is_some());
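3338 // statistics are disabled for "b", so it gets no column index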
3339 assert!(row_group.columns[1].column_index_offset.is_none());
3340
3341 let options = ReadOptionsBuilder::new().with_page_index().build();
3342 let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3343
3344 let row_group = reader.get_row_group(0).unwrap();
3345 let a_col = row_group.metadata().column(0);
3346 let b_col = row_group.metadata().column(1);
3347
3348 if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3350 let min = byte_array_stats.min_opt().unwrap();
3351 let max = byte_array_stats.max_opt().unwrap();
3352
3353 assert_eq!(min.as_bytes(), b"a");
3354 assert_eq!(max.as_bytes(), b"d");
3355 } else {
3356 panic!("expecting Statistics::ByteArray");
3357 }
3358
3359 assert!(b_col.statistics().is_none());
3361
3362 let offset_index = reader.metadata().offset_index().unwrap();
3363 assert_eq!(offset_index.len(), 1); // 1 row group
3364 assert_eq!(offset_index[0].len(), 2); // 2 columns
3365
3366 let column_index = reader.metadata().column_index().unwrap();
3367 assert_eq!(column_index.len(), 1); // 1 row group
3368 assert_eq!(column_index[0].len(), 2); // 2 columns
3369
3370 let a_idx = &column_index[0][0];
3371 assert!(matches!(a_idx, Index::BYTE_ARRAY(_)), "{a_idx:?}");
3372 let b_idx = &column_index[0][1];
3373 assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3374 }
3375
3376 #[test]
3377 fn test_disabled_statistics_with_chunk() {
3378 let file_schema = Schema::new(vec![
3379 Field::new("a", DataType::Utf8, true),
3380 Field::new("b", DataType::Utf8, true),
3381 ]);
3382 let file_schema = Arc::new(file_schema);
3383
3384 let batch = RecordBatch::try_new(
3385 file_schema.clone(),
3386 vec![
3387 Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3388 Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3389 ],
3390 )
3391 .unwrap();
3392
3393 let props = WriterProperties::builder()
3394 .set_statistics_enabled(EnabledStatistics::None)
3395 .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
3396 .build();
3397
3398 let mut buf = Vec::with_capacity(1024);
3399 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3400 writer.write(&batch).unwrap();
3401
3402 let metadata = writer.close().unwrap();
3403 assert_eq!(metadata.row_groups.len(), 1);
3404 let row_group = &metadata.row_groups[0];
3405 assert_eq!(row_group.columns.len(), 2);
3406 assert!(row_group.columns[0].offset_index_offset.is_some());
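3407 // chunk-level statistics for "a" do not produce a page-level column index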
3408 assert!(row_group.columns[0].column_index_offset.is_none());
3409 assert!(row_group.columns[1].offset_index_offset.is_some());
3411 assert!(row_group.columns[1].column_index_offset.is_none());
3412
3413 let options = ReadOptionsBuilder::new().with_page_index().build();
3414 let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3415
3416 let row_group = reader.get_row_group(0).unwrap();
3417 let a_col = row_group.metadata().column(0);
3418 let b_col = row_group.metadata().column(1);
3419
3420 if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3422 let min = byte_array_stats.min_opt().unwrap();
3423 let max = byte_array_stats.max_opt().unwrap();
3424
3425 assert_eq!(min.as_bytes(), b"a");
3426 assert_eq!(max.as_bytes(), b"d");
3427 } else {
3428 panic!("expecting Statistics::ByteArray");
3429 }
3430
3431 assert!(b_col.statistics().is_none());
3433
3434 let column_index = reader.metadata().column_index().unwrap();
3435 assert_eq!(column_index.len(), 1); // 1 row group
3436 assert_eq!(column_index[0].len(), 2); // 2 columns
3437
3438 let a_idx = &column_index[0][0];
3439 assert!(matches!(a_idx, Index::NONE), "{a_idx:?}");
3440 let b_idx = &column_index[0][1];
3441 assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3442 }
3443
3444 #[test]
3445 fn test_arrow_writer_skip_metadata() {
3446 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3447 let file_schema = Arc::new(batch_schema.clone());
3448
3449 let batch = RecordBatch::try_new(
3450 Arc::new(batch_schema),
3451 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3452 )
3453 .unwrap();
3454 let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
3455
3456 let mut buf = Vec::with_capacity(1024);
3457 let mut writer =
3458 ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
3459 writer.write(&batch).unwrap();
3460 writer.close().unwrap();
3461
3462 let bytes = Bytes::from(buf);
3463 let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
3464 assert_eq!(file_schema, *reader_builder.schema());
3465 if let Some(key_value_metadata) = reader_builder
3466 .metadata()
3467 .file_metadata()
3468 .key_value_metadata()
3469 {
3470 assert!(!key_value_metadata
3471 .iter()
3472 .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY));
3473 }
3474 }
3475
3476 #[test]
3477 fn mismatched_schemas() {
3478 let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
3479 let file_schema = Arc::new(Schema::new(vec![Field::new(
3480 "temperature",
3481 DataType::Float64,
3482 false,
3483 )]));
3484
3485 let batch = RecordBatch::try_new(
3486 Arc::new(batch_schema),
3487 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3488 )
3489 .unwrap();
3490
3491 let mut buf = Vec::with_capacity(1024);
3492 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3493
3494 let err = writer.write(&batch).unwrap_err().to_string();
3495 assert_eq!(
3496 err,
3497 "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
3498 );
3499 }
3500
3501 #[test]
3502 fn test_roundtrip_empty_schema() {
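3503 // a schema with no columns and zero rows should still round-trip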
3504 let empty_batch = RecordBatch::try_new_with_options(
3506 Arc::new(Schema::empty()),
3507 vec![],
3508 &RecordBatchOptions::default().with_row_count(Some(0)),
3509 )
3510 .unwrap();
3511
3512 let mut parquet_bytes: Vec<u8> = Vec::new();
3514 let mut writer =
3515 ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
3516 writer.write(&empty_batch).unwrap();
3517 writer.close().unwrap();
3518
3519 let bytes = Bytes::from(parquet_bytes);
3521 let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
3522 assert_eq!(reader.schema(), &empty_batch.schema());
3523 let batches: Vec<_> = reader
3524 .build()
3525 .unwrap()
3526 .collect::<ArrowResult<Vec<_>>>()
3527 .unwrap();
3528 assert_eq!(batches.len(), 0);
3529 }
3530}