use bytes::Bytes;
use std::io::{Read, Write};
use std::iter::Peekable;
use std::slice::Iter;
use std::sync::{Arc, Mutex};
use std::vec::IntoIter;

use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};

use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};

use crate::arrow::ArrowSchemaConverter;
use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
use crate::column::page_encryption::PageEncryptor;
use crate::column::writer::encoder::ColumnValueEncoder;
use crate::column::writer::{
    ColumnCloseResult, ColumnWriter, GenericColumnWriter, get_column_writer,
};
use crate::data_type::{ByteArray, FixedLenByteArray};
#[cfg(feature = "encryption")]
use crate::encryption::encrypt::FileEncryptor;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
use levels::{ArrayLevels, calculate_array_levels};

mod byte_array;
mod levels;

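/// Encodes [`RecordBatch`]es into a Parquet file, buffering rows in memory and
/// flushing a row group whenever `max_row_group_size` is reached (see [`Self::write`]).
///
/// The snippet below is an illustrative usage sketch reconstructed from the public
/// API in this module (not taken from the original documentation); it writes a
/// single batch to an in-memory buffer:
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow_array::{Int32Array, RecordBatch};
/// use arrow_schema::{DataType, Field, Schema};
/// use parquet::arrow::ArrowWriter;
///
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
/// let batch = RecordBatch::try_new(
///     schema.clone(),
///     vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
/// )?;
///
/// let mut buffer = Vec::new();
/// let mut writer = ArrowWriter::try_new(&mut buffer, schema, None)?;
/// writer.write(&batch)?;
/// writer.close()?; // writes the footer and returns the parquet metadata
/// ```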
pub struct ArrowWriter<W: Write> {
    /// Underlying Parquet file writer
    writer: SerializedFileWriter<W>,

    /// The in-progress row group, if any
    in_progress: Option<ArrowRowGroupWriter>,

    /// The Arrow schema of the batches being written
    arrow_schema: SchemaRef,

    /// Creates a new row group writer each time a row group is started
    row_group_writer_factory: ArrowRowGroupWriterFactory,

    /// Maximum number of rows to buffer before flushing a row group
    max_row_group_size: usize,
}

impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let buffered_memory = self.in_progress_size();
        f.debug_struct("ArrowWriter")
            .field("writer", &self.writer)
            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
            .field("in_progress_rows", &self.in_progress_rows())
            .field("arrow_schema", &self.arrow_schema)
            .field("max_row_group_size", &self.max_row_group_size)
            .finish()
    }
}

205impl<W: Write + Send> ArrowWriter<W> {
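    /// Tries to create a new [`ArrowWriter`] that writes batches with the given
    /// Arrow schema to `writer`, using `props` (or the default [`WriterProperties`]
    /// when `None`).
    ///
    /// Illustrative sketch only (assumes a `file` and `schema` are already in scope)
    /// showing how custom properties can be supplied:
    ///
    /// ```ignore
    /// let props = WriterProperties::builder()
    ///     .set_max_row_group_size(64 * 1024)
    ///     .build();
    /// let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
    /// ```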
206 pub fn try_new(
212 writer: W,
213 arrow_schema: SchemaRef,
214 props: Option<WriterProperties>,
215 ) -> Result<Self> {
216 let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
217 Self::try_new_with_options(writer, arrow_schema, options)
218 }
219
220 pub fn try_new_with_options(
226 writer: W,
227 arrow_schema: SchemaRef,
228 options: ArrowWriterOptions,
229 ) -> Result<Self> {
230 let mut props = options.properties;
231
232 let schema = if let Some(parquet_schema) = options.schema_descr {
233 parquet_schema.clone()
234 } else {
235 let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
236 if let Some(schema_root) = &options.schema_root {
237 converter = converter.schema_root(schema_root);
238 }
239
240 converter.convert(&arrow_schema)?
241 };
242
243 if !options.skip_arrow_metadata {
244 add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
246 }
247
248 let max_row_group_size = props.max_row_group_size();
249
250 let props_ptr = Arc::new(props);
251 let file_writer =
252 SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
253
254 let row_group_writer_factory =
255 ArrowRowGroupWriterFactory::new(&file_writer, schema, arrow_schema.clone(), props_ptr);
256
257 Ok(Self {
258 writer: file_writer,
259 in_progress: None,
260 arrow_schema,
261 row_group_writer_factory,
262 max_row_group_size,
263 })
264 }
265
266 pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
268 self.writer.flushed_row_groups()
269 }
270
271 pub fn memory_size(&self) -> usize {
276 match &self.in_progress {
277 Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
278 None => 0,
279 }
280 }
281
282 pub fn in_progress_size(&self) -> usize {
289 match &self.in_progress {
290 Some(in_progress) => in_progress
291 .writers
292 .iter()
293 .map(|x| x.get_estimated_total_bytes())
294 .sum(),
295 None => 0,
296 }
297 }
298
299 pub fn in_progress_rows(&self) -> usize {
301 self.in_progress
302 .as_ref()
303 .map(|x| x.buffered_rows)
304 .unwrap_or_default()
305 }
306
307 pub fn bytes_written(&self) -> usize {
309 self.writer.bytes_written()
310 }
311
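    /// Enqueues the provided [`RecordBatch`] to be written.
    ///
    /// Rows are buffered in the current in-progress row group; if adding the batch
    /// would exceed `max_row_group_size`, the batch is split and a row group is
    /// flushed automatically once the limit is reached. Empty batches are ignored.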
312 pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
320 if batch.num_rows() == 0 {
321 return Ok(());
322 }
323
324 let in_progress = match &mut self.in_progress {
325 Some(in_progress) => in_progress,
326 x => x.insert(
327 self.row_group_writer_factory
328 .create_row_group_writer(self.writer.flushed_row_groups().len())?,
329 ),
330 };
331
332 if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size {
334 let to_write = self.max_row_group_size - in_progress.buffered_rows;
335 let a = batch.slice(0, to_write);
336 let b = batch.slice(to_write, batch.num_rows() - to_write);
337 self.write(&a)?;
338 return self.write(&b);
339 }
340
341 in_progress.write(batch)?;
342
343 if in_progress.buffered_rows >= self.max_row_group_size {
344 self.flush()?
345 }
346 Ok(())
347 }
348
349 pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
354 self.writer.write_all(buf)
355 }
356
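    /// Flushes any buffered rows into a new row group, writing it to the underlying
    /// file. Does nothing if there is no in-progress row group.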
357 pub fn flush(&mut self) -> Result<()> {
359 let in_progress = match self.in_progress.take() {
360 Some(in_progress) => in_progress,
361 None => return Ok(()),
362 };
363
364 let mut row_group_writer = self.writer.next_row_group()?;
365 for chunk in in_progress.close()? {
366 chunk.append_to_row_group(&mut row_group_writer)?;
367 }
368 row_group_writer.close()?;
369 Ok(())
370 }
371
372 pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
376 self.writer.append_key_value_metadata(kv_metadata)
377 }
378
379 pub fn inner(&self) -> &W {
381 self.writer.inner()
382 }
383
384 pub fn inner_mut(&mut self) -> &mut W {
393 self.writer.inner_mut()
394 }
395
396 pub fn into_inner(mut self) -> Result<W> {
398 self.flush()?;
399 self.writer.into_inner()
400 }
401
402 pub fn finish(&mut self) -> Result<ParquetMetaData> {
408 self.flush()?;
409 self.writer.finish()
410 }
411
412 pub fn close(mut self) -> Result<ParquetMetaData> {
414 self.finish()
415 }
416
417 #[deprecated(since = "56.2.0", note = "Use into_serialized_writer instead")]
419 pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
420 self.flush()?;
421 let in_progress = self
422 .row_group_writer_factory
423 .create_row_group_writer(self.writer.flushed_row_groups().len())?;
424 Ok(in_progress.writers)
425 }
426
427 #[deprecated(since = "56.2.0", note = "Use into_serialized_writer instead")]
429 pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
430 let mut row_group_writer = self.writer.next_row_group()?;
431 for chunk in chunks {
432 chunk.append_to_row_group(&mut row_group_writer)?;
433 }
434 row_group_writer.close()?;
435 Ok(())
436 }
437
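    /// Consumes this writer, flushing any in-progress row group, and returns the
    /// underlying [`SerializedFileWriter`] together with an
    /// [`ArrowRowGroupWriterFactory`] for encoding further row groups manually.
    ///
    /// The sketch below is illustrative only: it assumes `writer`, `schema` and
    /// `batch` are in scope, that nothing was flushed before (row group index 0),
    /// and it encodes a single additional row group on one thread. The same factory
    /// can equally be used to encode leaf columns in parallel:
    ///
    /// ```ignore
    /// let (mut file_writer, factory) = writer.into_serialized_writer()?;
    ///
    /// // One ArrowColumnWriter per leaf parquet column
    /// let mut column_writers = factory.create_column_writers(0)?;
    /// let mut next = column_writers.iter_mut();
    /// for (field, column) in schema.fields().iter().zip(batch.columns()) {
    ///     for leaf in compute_leaves(field.as_ref(), column)? {
    ///         next.next().unwrap().write(&leaf)?;
    ///     }
    /// }
    ///
    /// // Append the encoded chunks as a new row group
    /// let mut row_group = file_writer.next_row_group()?;
    /// for writer in column_writers {
    ///     writer.close()?.append_to_row_group(&mut row_group)?;
    /// }
    /// row_group.close()?;
    /// file_writer.close()?;
    /// ```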
438 pub fn into_serialized_writer(
441 mut self,
442 ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
443 self.flush()?;
444 Ok((self.writer, self.row_group_writer_factory))
445 }
446}
447
448impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
449 fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
450 self.write(batch).map_err(|e| e.into())
451 }
452
453 fn close(self) -> std::result::Result<(), ArrowError> {
454 self.close()?;
455 Ok(())
456 }
457}
458
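/// Additional configuration for constructing an [`ArrowWriter`] via
/// [`ArrowWriter::try_new_with_options`].
///
/// Illustrative sketch (assumes `buffer` and `schema` are in scope):
///
/// ```ignore
/// let options = ArrowWriterOptions::new()
///     .with_properties(WriterProperties::default())
///     .with_skip_arrow_metadata(true);
/// let mut writer = ArrowWriter::try_new_with_options(buffer, schema, options)?;
/// ```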
#[derive(Debug, Clone, Default)]
pub struct ArrowWriterOptions {
    properties: WriterProperties,
    skip_arrow_metadata: bool,
    schema_root: Option<String>,
    schema_descr: Option<SchemaDescriptor>,
}

impl ArrowWriterOptions {
    /// Creates a new [`ArrowWriterOptions`] with default settings
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the [`WriterProperties`] used when writing the file
    pub fn with_properties(self, properties: WriterProperties) -> Self {
        Self { properties, ..self }
    }

    /// Skips embedding the encoded Arrow schema in the file metadata (defaults to `false`)
    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

    /// Overrides the name of the root parquet schema element
    pub fn with_schema_root(self, schema_root: String) -> Self {
        Self {
            schema_root: Some(schema_root),
            ..self
        }
    }

    /// Uses the provided parquet [`SchemaDescriptor`] instead of deriving one from the Arrow schema
    pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
        Self {
            schema_descr: Some(schema_descr),
            ..self
        }
    }
}

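/// The in-memory representation of an encoded column chunk: the pages produced by
/// [`ArrowPageWriter`] are buffered here and later replayed through the
/// [`ChunkReader`] implementation when the chunk is appended to a row group.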
#[derive(Default)]
struct ArrowColumnChunkData {
    length: usize,
    data: Vec<Bytes>,
}

impl Length for ArrowColumnChunkData {
    fn len(&self) -> u64 {
        self.length as _
    }
}

impl ChunkReader for ArrowColumnChunkData {
    type T = ArrowColumnChunkReader;

    fn get_read(&self, start: u64) -> Result<Self::T> {
        // The buffered chunk is always replayed from the beginning
        assert_eq!(start, 0);
        Ok(ArrowColumnChunkReader(
            self.data.clone().into_iter().peekable(),
        ))
    }

    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
        unimplemented!()
    }
}

struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);

impl Read for ArrowColumnChunkReader {
    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
        // Skip any exhausted buffers and find the next one with data to copy
        let buffer = loop {
            match self.0.peek_mut() {
                Some(b) if b.is_empty() => {
                    self.0.next();
                    continue;
                }
                Some(b) => break b,
                None => return Ok(0),
            }
        };

        let len = buffer.len().min(out.len());
        let b = buffer.split_to(len);
        out[..len].copy_from_slice(&b);
        Ok(len)
    }
}
565
566type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
571
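/// A [`PageWriter`] that appends compressed pages (and their serialized thrift
/// headers) to an in-memory [`SharedColumnChunk`] instead of an output file,
/// optionally encrypting them when the `encryption` feature is enabled.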
572#[derive(Default)]
573struct ArrowPageWriter {
574 buffer: SharedColumnChunk,
575 #[cfg(feature = "encryption")]
576 page_encryptor: Option<PageEncryptor>,
577}
578
579impl ArrowPageWriter {
580 #[cfg(feature = "encryption")]
581 pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
582 self.page_encryptor = page_encryptor;
583 self
584 }
585
586 #[cfg(feature = "encryption")]
587 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
588 self.page_encryptor.as_mut()
589 }
590
591 #[cfg(not(feature = "encryption"))]
592 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
593 None
594 }
595}
596
597impl PageWriter for ArrowPageWriter {
598 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
599 let page = match self.page_encryptor_mut() {
600 Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
601 None => page,
602 };
603
604 let page_header = page.to_thrift_header()?;
605 let header = {
606 let mut header = Vec::with_capacity(1024);
607
608 match self.page_encryptor_mut() {
609 Some(page_encryptor) => {
610 page_encryptor.encrypt_page_header(&page_header, &mut header)?;
611 if page.compressed_page().is_data_page() {
612 page_encryptor.increment_page();
613 }
614 }
615 None => {
616 let mut protocol = ThriftCompactOutputProtocol::new(&mut header);
617 page_header.write_thrift(&mut protocol)?;
618 }
619 };
620
621 Bytes::from(header)
622 };
623
624 let mut buf = self.buffer.try_lock().unwrap();
625
626 let data = page.compressed_page().buffer().clone();
627 let compressed_size = data.len() + header.len();
628
629 let mut spec = PageWriteSpec::new();
630 spec.page_type = page.page_type();
631 spec.num_values = page.num_values();
632 spec.uncompressed_size = page.uncompressed_size() + header.len();
633 spec.offset = buf.length as u64;
634 spec.compressed_size = compressed_size;
635 spec.bytes_written = compressed_size as u64;
636
637 buf.length += compressed_size;
638 buf.data.push(header);
639 buf.data.push(data);
640
641 Ok(spec)
642 }
643
644 fn close(&mut self) -> Result<()> {
645 Ok(())
646 }
647}
648
649#[derive(Debug)]
651pub struct ArrowLeafColumn(ArrayLevels);
652
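/// Computes the [`ArrowLeafColumn`] for each leaf (primitive) parquet column of
/// `array`, capturing the definition and repetition levels derived from `field`.
/// Nested types such as structs, lists and maps produce one entry per leaf.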
653pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
658 let levels = calculate_array_levels(array, field)?;
659 Ok(levels.into_iter().map(ArrowLeafColumn).collect())
660}
661
662pub struct ArrowColumnChunk {
664 data: ArrowColumnChunkData,
665 close: ColumnCloseResult,
666}
667
668impl std::fmt::Debug for ArrowColumnChunk {
669 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
670 f.debug_struct("ArrowColumnChunk")
671 .field("length", &self.data.length)
672 .finish_non_exhaustive()
673 }
674}
675
676impl ArrowColumnChunk {
677 pub fn append_to_row_group<W: Write + Send>(
679 self,
680 writer: &mut SerializedRowGroupWriter<'_, W>,
681 ) -> Result<()> {
682 writer.append_column(&self.data, self.close)
683 }
684}
685
686pub struct ArrowColumnWriter {
780 writer: ArrowColumnWriterImpl,
781 chunk: SharedColumnChunk,
782}
783
784impl std::fmt::Debug for ArrowColumnWriter {
785 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
786 f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
787 }
788}
789
790enum ArrowColumnWriterImpl {
791 ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
792 Column(ColumnWriter<'static>),
793}
794
795impl ArrowColumnWriter {
796 pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
798 match &mut self.writer {
799 ArrowColumnWriterImpl::Column(c) => {
800 write_leaf(c, &col.0)?;
801 }
802 ArrowColumnWriterImpl::ByteArray(c) => {
803 write_primitive(c, col.0.array().as_ref(), &col.0)?;
804 }
805 }
806 Ok(())
807 }
808
809 pub fn close(self) -> Result<ArrowColumnChunk> {
811 let close = match self.writer {
812 ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
813 ArrowColumnWriterImpl::Column(c) => c.close()?,
814 };
815 let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
816 let data = chunk.into_inner().unwrap();
817 Ok(ArrowColumnChunk { data, close })
818 }
819
820 pub fn memory_size(&self) -> usize {
831 match &self.writer {
832 ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
833 ArrowColumnWriterImpl::Column(c) => c.memory_size(),
834 }
835 }
836
837 pub fn get_estimated_total_bytes(&self) -> usize {
845 match &self.writer {
846 ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
847 ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
848 }
849 }
850}
851
852struct ArrowRowGroupWriter {
854 writers: Vec<ArrowColumnWriter>,
855 schema: SchemaRef,
856 buffered_rows: usize,
857}
858
859impl ArrowRowGroupWriter {
860 fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
861 Self {
862 writers,
863 schema: arrow.clone(),
864 buffered_rows: 0,
865 }
866 }
867
868 fn write(&mut self, batch: &RecordBatch) -> Result<()> {
869 self.buffered_rows += batch.num_rows();
870 let mut writers = self.writers.iter_mut();
871 for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
872 for leaf in compute_leaves(field.as_ref(), column)? {
873 writers.next().unwrap().write(&leaf)?
874 }
875 }
876 Ok(())
877 }
878
879 fn close(self) -> Result<Vec<ArrowColumnChunk>> {
880 self.writers
881 .into_iter()
882 .map(|writer| writer.close())
883 .collect()
884 }
885}
886
887pub struct ArrowRowGroupWriterFactory {
889 schema: SchemaDescriptor,
890 arrow_schema: SchemaRef,
891 props: WriterPropertiesPtr,
892 #[cfg(feature = "encryption")]
893 file_encryptor: Option<Arc<FileEncryptor>>,
894}
895
896impl ArrowRowGroupWriterFactory {
897 #[cfg(feature = "encryption")]
898 fn new<W: Write + Send>(
899 file_writer: &SerializedFileWriter<W>,
900 schema: SchemaDescriptor,
901 arrow_schema: SchemaRef,
902 props: WriterPropertiesPtr,
903 ) -> Self {
904 Self {
905 schema,
906 arrow_schema,
907 props,
908 file_encryptor: file_writer.file_encryptor(),
909 }
910 }
911
912 #[cfg(not(feature = "encryption"))]
913 fn new<W: Write + Send>(
914 _file_writer: &SerializedFileWriter<W>,
915 schema: SchemaDescriptor,
916 arrow_schema: SchemaRef,
917 props: WriterPropertiesPtr,
918 ) -> Self {
919 Self {
920 schema,
921 arrow_schema,
922 props,
923 }
924 }
925
926 #[cfg(feature = "encryption")]
927 fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
928 let writers = get_column_writers_with_encryptor(
929 &self.schema,
930 &self.props,
931 &self.arrow_schema,
932 self.file_encryptor.clone(),
933 row_group_index,
934 )?;
935 Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
936 }
937
938 #[cfg(not(feature = "encryption"))]
939 fn create_row_group_writer(&self, _row_group_index: usize) -> Result<ArrowRowGroupWriter> {
940 let writers = get_column_writers(&self.schema, &self.props, &self.arrow_schema)?;
941 Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
942 }
943
944 pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
946 let rg_writer = self.create_row_group_writer(row_group_index)?;
947 Ok(rg_writer.writers)
948 }
949}
950
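/// Returns one [`ArrowColumnWriter`] per leaf column of the provided parquet
/// schema, in schema order. This is the entry point of the lower-level encoding
/// API: callers feed each writer the leaves produced by [`compute_leaves`] and
/// append the resulting chunks to a [`SerializedRowGroupWriter`].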
951pub fn get_column_writers(
953 parquet: &SchemaDescriptor,
954 props: &WriterPropertiesPtr,
955 arrow: &SchemaRef,
956) -> Result<Vec<ArrowColumnWriter>> {
957 let mut writers = Vec::with_capacity(arrow.fields.len());
958 let mut leaves = parquet.columns().iter();
959 let column_factory = ArrowColumnWriterFactory::new();
960 for field in &arrow.fields {
961 column_factory.get_arrow_column_writer(
962 field.data_type(),
963 props,
964 &mut leaves,
965 &mut writers,
966 )?;
967 }
968 Ok(writers)
969}
970
971#[cfg(feature = "encryption")]
973fn get_column_writers_with_encryptor(
974 parquet: &SchemaDescriptor,
975 props: &WriterPropertiesPtr,
976 arrow: &SchemaRef,
977 file_encryptor: Option<Arc<FileEncryptor>>,
978 row_group_index: usize,
979) -> Result<Vec<ArrowColumnWriter>> {
980 let mut writers = Vec::with_capacity(arrow.fields.len());
981 let mut leaves = parquet.columns().iter();
982 let column_factory =
983 ArrowColumnWriterFactory::new().with_file_encryptor(row_group_index, file_encryptor);
984 for field in &arrow.fields {
985 column_factory.get_arrow_column_writer(
986 field.data_type(),
987 props,
988 &mut leaves,
989 &mut writers,
990 )?;
991 }
992 Ok(writers)
993}
994
995struct ArrowColumnWriterFactory {
997 #[cfg(feature = "encryption")]
998 row_group_index: usize,
999 #[cfg(feature = "encryption")]
1000 file_encryptor: Option<Arc<FileEncryptor>>,
1001}
1002
1003impl ArrowColumnWriterFactory {
1004 pub fn new() -> Self {
1005 Self {
1006 #[cfg(feature = "encryption")]
1007 row_group_index: 0,
1008 #[cfg(feature = "encryption")]
1009 file_encryptor: None,
1010 }
1011 }
1012
1013 #[cfg(feature = "encryption")]
1014 pub fn with_file_encryptor(
1015 mut self,
1016 row_group_index: usize,
1017 file_encryptor: Option<Arc<FileEncryptor>>,
1018 ) -> Self {
1019 self.row_group_index = row_group_index;
1020 self.file_encryptor = file_encryptor;
1021 self
1022 }
1023
1024 #[cfg(feature = "encryption")]
1025 fn create_page_writer(
1026 &self,
1027 column_descriptor: &ColumnDescPtr,
1028 column_index: usize,
1029 ) -> Result<Box<ArrowPageWriter>> {
1030 let column_path = column_descriptor.path().string();
1031 let page_encryptor = PageEncryptor::create_if_column_encrypted(
1032 &self.file_encryptor,
1033 self.row_group_index,
1034 column_index,
1035 &column_path,
1036 )?;
1037 Ok(Box::new(
1038 ArrowPageWriter::default().with_encryptor(page_encryptor),
1039 ))
1040 }
1041
1042 #[cfg(not(feature = "encryption"))]
1043 fn create_page_writer(
1044 &self,
1045 _column_descriptor: &ColumnDescPtr,
1046 _column_index: usize,
1047 ) -> Result<Box<ArrowPageWriter>> {
1048 Ok(Box::<ArrowPageWriter>::default())
1049 }
1050
1051 fn get_arrow_column_writer(
1054 &self,
1055 data_type: &ArrowDataType,
1056 props: &WriterPropertiesPtr,
1057 leaves: &mut Iter<'_, ColumnDescPtr>,
1058 out: &mut Vec<ArrowColumnWriter>,
1059 ) -> Result<()> {
1060 let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1062 let page_writer = self.create_page_writer(desc, out.len())?;
1063 let chunk = page_writer.buffer.clone();
1064 let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
1065 Ok(ArrowColumnWriter {
1066 chunk,
1067 writer: ArrowColumnWriterImpl::Column(writer),
1068 })
1069 };
1070
1071 let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1073 let page_writer = self.create_page_writer(desc, out.len())?;
1074 let chunk = page_writer.buffer.clone();
1075 let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
1076 Ok(ArrowColumnWriter {
1077 chunk,
1078 writer: ArrowColumnWriterImpl::ByteArray(writer),
1079 })
1080 };
1081
1082 match data_type {
1083 _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
1084 ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
1085 out.push(col(leaves.next().unwrap())?)
1086 }
1087 ArrowDataType::LargeBinary
1088 | ArrowDataType::Binary
1089 | ArrowDataType::Utf8
1090 | ArrowDataType::LargeUtf8
1091 | ArrowDataType::BinaryView
1092 | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
1093 ArrowDataType::List(f)
1094 | ArrowDataType::LargeList(f)
1095 | ArrowDataType::FixedSizeList(f, _) => {
1096 self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
1097 }
1098 ArrowDataType::Struct(fields) => {
1099 for field in fields {
1100 self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
1101 }
1102 }
1103 ArrowDataType::Map(f, _) => match f.data_type() {
1104 ArrowDataType::Struct(f) => {
1105 self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
1106 self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
1107 }
1108 _ => unreachable!("invalid map type"),
1109 },
1110 ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1111 ArrowDataType::Utf8
1112 | ArrowDataType::LargeUtf8
1113 | ArrowDataType::Binary
1114 | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
1115 ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
1116 out.push(bytes(leaves.next().unwrap())?)
1117 }
1118 ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
1119 _ => out.push(col(leaves.next().unwrap())?),
1120 },
1121 _ => {
1122 return Err(ParquetError::NYI(format!(
1123 "Attempting to write an Arrow type {data_type} to parquet that is not yet implemented"
1124 )));
1125 }
1126 }
1127 Ok(())
1128 }
1129}
1130
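/// Writes a single leaf column, dispatching on the parquet physical type of the
/// column writer and casting or narrowing the Arrow values as required (for
/// example, `Date64` is rebased to days before being written as `INT32`, and
/// unsigned integers are reinterpreted as their signed counterparts).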
1131fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
1132 let column = levels.array().as_ref();
1133 let indices = levels.non_null_indices();
1134 match writer {
1135 ColumnWriter::Int32ColumnWriter(typed) => {
1136 match column.data_type() {
1137 ArrowDataType::Date64 => {
1138 let array = arrow_cast::cast(column, &ArrowDataType::Date32)?;
1140 let array = arrow_cast::cast(&array, &ArrowDataType::Int32)?;
1141
1142 let array = array.as_primitive::<Int32Type>();
1143 write_primitive(typed, array.values(), levels)
1144 }
1145 ArrowDataType::UInt32 => {
1146 let values = column.as_primitive::<UInt32Type>().values();
1147 let array = values.inner().typed_data::<i32>();
1150 write_primitive(typed, array, levels)
1151 }
1152 ArrowDataType::Decimal32(_, _) => {
1153 let array = column
1154 .as_primitive::<Decimal32Type>()
1155 .unary::<_, Int32Type>(|v| v);
1156 write_primitive(typed, array.values(), levels)
1157 }
1158 ArrowDataType::Decimal64(_, _) => {
1159 let array = column
1161 .as_primitive::<Decimal64Type>()
1162 .unary::<_, Int32Type>(|v| v as i32);
1163 write_primitive(typed, array.values(), levels)
1164 }
1165 ArrowDataType::Decimal128(_, _) => {
1166 let array = column
1168 .as_primitive::<Decimal128Type>()
1169 .unary::<_, Int32Type>(|v| v as i32);
1170 write_primitive(typed, array.values(), levels)
1171 }
1172 ArrowDataType::Decimal256(_, _) => {
1173 let array = column
1175 .as_primitive::<Decimal256Type>()
1176 .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1177 write_primitive(typed, array.values(), levels)
1178 }
1179 ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1180 ArrowDataType::Decimal32(_, _) => {
1181 let array = arrow_cast::cast(column, value_type)?;
1182 let array = array
1183 .as_primitive::<Decimal32Type>()
1184 .unary::<_, Int32Type>(|v| v);
1185 write_primitive(typed, array.values(), levels)
1186 }
1187 ArrowDataType::Decimal64(_, _) => {
1188 let array = arrow_cast::cast(column, value_type)?;
1189 let array = array
1190 .as_primitive::<Decimal64Type>()
1191 .unary::<_, Int32Type>(|v| v as i32);
1192 write_primitive(typed, array.values(), levels)
1193 }
1194 ArrowDataType::Decimal128(_, _) => {
1195 let array = arrow_cast::cast(column, value_type)?;
1196 let array = array
1197 .as_primitive::<Decimal128Type>()
1198 .unary::<_, Int32Type>(|v| v as i32);
1199 write_primitive(typed, array.values(), levels)
1200 }
1201 ArrowDataType::Decimal256(_, _) => {
1202 let array = arrow_cast::cast(column, value_type)?;
1203 let array = array
1204 .as_primitive::<Decimal256Type>()
1205 .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1206 write_primitive(typed, array.values(), levels)
1207 }
1208 _ => {
1209 let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
1210 let array = array.as_primitive::<Int32Type>();
1211 write_primitive(typed, array.values(), levels)
1212 }
1213 },
1214 _ => {
1215 let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
1216 let array = array.as_primitive::<Int32Type>();
1217 write_primitive(typed, array.values(), levels)
1218 }
1219 }
1220 }
1221 ColumnWriter::BoolColumnWriter(typed) => {
1222 let array = column.as_boolean();
1223 typed.write_batch(
1224 get_bool_array_slice(array, indices).as_slice(),
1225 levels.def_levels(),
1226 levels.rep_levels(),
1227 )
1228 }
1229 ColumnWriter::Int64ColumnWriter(typed) => {
1230 match column.data_type() {
1231 ArrowDataType::Date64 => {
1232 let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1233
1234 let array = array.as_primitive::<Int64Type>();
1235 write_primitive(typed, array.values(), levels)
1236 }
1237 ArrowDataType::Int64 => {
1238 let array = column.as_primitive::<Int64Type>();
1239 write_primitive(typed, array.values(), levels)
1240 }
1241 ArrowDataType::UInt64 => {
1242 let values = column.as_primitive::<UInt64Type>().values();
1243 let array = values.inner().typed_data::<i64>();
1246 write_primitive(typed, array, levels)
1247 }
1248 ArrowDataType::Decimal64(_, _) => {
1249 let array = column
1250 .as_primitive::<Decimal64Type>()
1251 .unary::<_, Int64Type>(|v| v);
1252 write_primitive(typed, array.values(), levels)
1253 }
1254 ArrowDataType::Decimal128(_, _) => {
1255 let array = column
1257 .as_primitive::<Decimal128Type>()
1258 .unary::<_, Int64Type>(|v| v as i64);
1259 write_primitive(typed, array.values(), levels)
1260 }
1261 ArrowDataType::Decimal256(_, _) => {
1262 let array = column
1264 .as_primitive::<Decimal256Type>()
1265 .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1266 write_primitive(typed, array.values(), levels)
1267 }
1268 ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1269 ArrowDataType::Decimal64(_, _) => {
1270 let array = arrow_cast::cast(column, value_type)?;
1271 let array = array
1272 .as_primitive::<Decimal64Type>()
1273 .unary::<_, Int64Type>(|v| v);
1274 write_primitive(typed, array.values(), levels)
1275 }
1276 ArrowDataType::Decimal128(_, _) => {
1277 let array = arrow_cast::cast(column, value_type)?;
1278 let array = array
1279 .as_primitive::<Decimal128Type>()
1280 .unary::<_, Int64Type>(|v| v as i64);
1281 write_primitive(typed, array.values(), levels)
1282 }
1283 ArrowDataType::Decimal256(_, _) => {
1284 let array = arrow_cast::cast(column, value_type)?;
1285 let array = array
1286 .as_primitive::<Decimal256Type>()
1287 .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1288 write_primitive(typed, array.values(), levels)
1289 }
1290 _ => {
1291 let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1292 let array = array.as_primitive::<Int64Type>();
1293 write_primitive(typed, array.values(), levels)
1294 }
1295 },
1296 _ => {
1297 let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1298 let array = array.as_primitive::<Int64Type>();
1299 write_primitive(typed, array.values(), levels)
1300 }
1301 }
1302 }
1303 ColumnWriter::Int96ColumnWriter(_typed) => {
1304 unreachable!("Currently unreachable because data type not supported")
1305 }
1306 ColumnWriter::FloatColumnWriter(typed) => {
1307 let array = column.as_primitive::<Float32Type>();
1308 write_primitive(typed, array.values(), levels)
1309 }
1310 ColumnWriter::DoubleColumnWriter(typed) => {
1311 let array = column.as_primitive::<Float64Type>();
1312 write_primitive(typed, array.values(), levels)
1313 }
1314 ColumnWriter::ByteArrayColumnWriter(_) => {
1315 unreachable!("should use ByteArrayWriter")
1316 }
1317 ColumnWriter::FixedLenByteArrayColumnWriter(typed) => {
1318 let bytes = match column.data_type() {
1319 ArrowDataType::Interval(interval_unit) => match interval_unit {
1320 IntervalUnit::YearMonth => {
1321 let array = column
1322 .as_any()
1323 .downcast_ref::<arrow_array::IntervalYearMonthArray>()
1324 .unwrap();
1325 get_interval_ym_array_slice(array, indices)
1326 }
1327 IntervalUnit::DayTime => {
1328 let array = column
1329 .as_any()
1330 .downcast_ref::<arrow_array::IntervalDayTimeArray>()
1331 .unwrap();
1332 get_interval_dt_array_slice(array, indices)
1333 }
1334 _ => {
1335 return Err(ParquetError::NYI(format!(
1336 "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
1337 )));
1338 }
1339 },
1340 ArrowDataType::FixedSizeBinary(_) => {
1341 let array = column
1342 .as_any()
1343 .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
1344 .unwrap();
1345 get_fsb_array_slice(array, indices)
1346 }
1347 ArrowDataType::Decimal32(_, _) => {
1348 let array = column.as_primitive::<Decimal32Type>();
1349 get_decimal_32_array_slice(array, indices)
1350 }
1351 ArrowDataType::Decimal64(_, _) => {
1352 let array = column.as_primitive::<Decimal64Type>();
1353 get_decimal_64_array_slice(array, indices)
1354 }
1355 ArrowDataType::Decimal128(_, _) => {
1356 let array = column.as_primitive::<Decimal128Type>();
1357 get_decimal_128_array_slice(array, indices)
1358 }
1359 ArrowDataType::Decimal256(_, _) => {
1360 let array = column
1361 .as_any()
1362 .downcast_ref::<arrow_array::Decimal256Array>()
1363 .unwrap();
1364 get_decimal_256_array_slice(array, indices)
1365 }
1366 ArrowDataType::Float16 => {
1367 let array = column.as_primitive::<Float16Type>();
1368 get_float_16_array_slice(array, indices)
1369 }
1370 _ => {
1371 return Err(ParquetError::NYI(
1372 "Attempting to write an Arrow type that is not yet implemented".to_string(),
1373 ));
1374 }
1375 };
1376 typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
1377 }
1378 }
1379}
1380
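/// Writes a slice of primitive values through the column writer, passing the
/// pre-computed non-null indices so the encoder only reads valid slots, along
/// with the definition and repetition levels from the [`ArrayLevels`].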
1381fn write_primitive<E: ColumnValueEncoder>(
1382 writer: &mut GenericColumnWriter<E>,
1383 values: &E::Values,
1384 levels: &ArrayLevels,
1385) -> Result<usize> {
1386 writer.write_batch_internal(
1387 values,
1388 Some(levels.non_null_indices()),
1389 levels.def_levels(),
1390 levels.rep_levels(),
1391 None,
1392 None,
1393 None,
1394 )
1395}
1396
1397fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
1398 let mut values = Vec::with_capacity(indices.len());
1399 for i in indices {
1400 values.push(array.value(*i))
1401 }
1402 values
1403}
1404
1405fn get_interval_ym_array_slice(
1408 array: &arrow_array::IntervalYearMonthArray,
1409 indices: &[usize],
1410) -> Vec<FixedLenByteArray> {
1411 let mut values = Vec::with_capacity(indices.len());
1412 for i in indices {
1413 let mut value = array.value(*i).to_le_bytes().to_vec();
1414 let mut suffix = vec![0; 8];
1415 value.append(&mut suffix);
1416 values.push(FixedLenByteArray::from(ByteArray::from(value)))
1417 }
1418 values
1419}
1420
1421fn get_interval_dt_array_slice(
1424 array: &arrow_array::IntervalDayTimeArray,
1425 indices: &[usize],
1426) -> Vec<FixedLenByteArray> {
1427 let mut values = Vec::with_capacity(indices.len());
1428 for i in indices {
1429 let mut out = [0; 12];
1430 let value = array.value(*i);
1431 out[4..8].copy_from_slice(&value.days.to_le_bytes());
1432 out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1433 values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1434 }
1435 values
1436}
1437
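/// The decimal helpers below convert each value to its big-endian byte
/// representation and truncate it to the minimal width implied by the array's
/// precision (via `decimal_length_from_precision`). For instance, a `Decimal32`
/// value with precision 5 needs only 3 bytes, so the leading `4 - 3 = 1` byte is
/// dropped before building the `FixedLenByteArray`.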
1438fn get_decimal_32_array_slice(
1439 array: &arrow_array::Decimal32Array,
1440 indices: &[usize],
1441) -> Vec<FixedLenByteArray> {
1442 let mut values = Vec::with_capacity(indices.len());
1443 let size = decimal_length_from_precision(array.precision());
1444 for i in indices {
1445 let as_be_bytes = array.value(*i).to_be_bytes();
1446 let resized_value = as_be_bytes[(4 - size)..].to_vec();
1447 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1448 }
1449 values
1450}
1451
1452fn get_decimal_64_array_slice(
1453 array: &arrow_array::Decimal64Array,
1454 indices: &[usize],
1455) -> Vec<FixedLenByteArray> {
1456 let mut values = Vec::with_capacity(indices.len());
1457 let size = decimal_length_from_precision(array.precision());
1458 for i in indices {
1459 let as_be_bytes = array.value(*i).to_be_bytes();
1460 let resized_value = as_be_bytes[(8 - size)..].to_vec();
1461 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1462 }
1463 values
1464}
1465
1466fn get_decimal_128_array_slice(
1467 array: &arrow_array::Decimal128Array,
1468 indices: &[usize],
1469) -> Vec<FixedLenByteArray> {
1470 let mut values = Vec::with_capacity(indices.len());
1471 let size = decimal_length_from_precision(array.precision());
1472 for i in indices {
1473 let as_be_bytes = array.value(*i).to_be_bytes();
1474 let resized_value = as_be_bytes[(16 - size)..].to_vec();
1475 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1476 }
1477 values
1478}
1479
1480fn get_decimal_256_array_slice(
1481 array: &arrow_array::Decimal256Array,
1482 indices: &[usize],
1483) -> Vec<FixedLenByteArray> {
1484 let mut values = Vec::with_capacity(indices.len());
1485 let size = decimal_length_from_precision(array.precision());
1486 for i in indices {
1487 let as_be_bytes = array.value(*i).to_be_bytes();
1488 let resized_value = as_be_bytes[(32 - size)..].to_vec();
1489 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1490 }
1491 values
1492}
1493
1494fn get_float_16_array_slice(
1495 array: &arrow_array::Float16Array,
1496 indices: &[usize],
1497) -> Vec<FixedLenByteArray> {
1498 let mut values = Vec::with_capacity(indices.len());
1499 for i in indices {
1500 let value = array.value(*i).to_le_bytes().to_vec();
1501 values.push(FixedLenByteArray::from(ByteArray::from(value)));
1502 }
1503 values
1504}
1505
1506fn get_fsb_array_slice(
1507 array: &arrow_array::FixedSizeBinaryArray,
1508 indices: &[usize],
1509) -> Vec<FixedLenByteArray> {
1510 let mut values = Vec::with_capacity(indices.len());
1511 for i in indices {
1512 let value = array.value(*i).to_vec();
1513 values.push(FixedLenByteArray::from(ByteArray::from(value)))
1514 }
1515 values
1516}
1517
1518#[cfg(test)]
1519mod tests {
1520 use super::*;
1521
1522 use std::fs::File;
1523
1524 use crate::arrow::ARROW_SCHEMA_META_KEY;
1525 use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1526 use crate::column::page::{Page, PageReader};
1527 use crate::file::metadata::thrift_gen::PageHeader;
1528 use crate::file::page_index::column_index::ColumnIndexMetaData;
1529 use crate::file::reader::SerializedPageReader;
1530 use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
1531 use crate::schema::types::{ColumnPath, Type};
1532 use arrow::datatypes::ToByteSlice;
1533 use arrow::datatypes::{DataType, Schema};
1534 use arrow::error::Result as ArrowResult;
1535 use arrow::util::data_gen::create_random_array;
1536 use arrow::util::pretty::pretty_format_batches;
1537 use arrow::{array::*, buffer::Buffer};
1538 use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer, i256};
1539 use arrow_schema::Fields;
1540 use half::f16;
1541 use num_traits::{FromPrimitive, ToPrimitive};
1542 use tempfile::tempfile;
1543
1544 use crate::basic::Encoding;
1545 use crate::data_type::AsBytes;
1546 use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
1547 use crate::file::properties::{
1548 BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1549 };
1550 use crate::file::serialized_reader::ReadOptionsBuilder;
1551 use crate::file::{
1552 reader::{FileReader, SerializedFileReader},
1553 statistics::Statistics,
1554 };
1555
1556 #[test]
1557 fn arrow_writer() {
1558 let schema = Schema::new(vec![
1560 Field::new("a", DataType::Int32, false),
1561 Field::new("b", DataType::Int32, true),
1562 ]);
1563
1564 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1566 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1567
1568 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1570
1571 roundtrip(batch, Some(SMALL_SIZE / 2));
1572 }
1573
1574 fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1575 let mut buffer = vec![];
1576
1577 let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
1578 writer.write(expected_batch).unwrap();
1579 writer.close().unwrap();
1580
1581 buffer
1582 }
1583
1584 fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1585 let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
1586 writer.write(expected_batch).unwrap();
1587 writer.into_inner().unwrap()
1588 }
1589
1590 #[test]
1591 fn roundtrip_bytes() {
1592 let schema = Arc::new(Schema::new(vec![
1594 Field::new("a", DataType::Int32, false),
1595 Field::new("b", DataType::Int32, true),
1596 ]));
1597
1598 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1600 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1601
1602 let expected_batch =
1604 RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
1605
1606 for buffer in [
1607 get_bytes_after_close(schema.clone(), &expected_batch),
1608 get_bytes_by_into_inner(schema, &expected_batch),
1609 ] {
1610 let cursor = Bytes::from(buffer);
1611 let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
1612
1613 let actual_batch = record_batch_reader
1614 .next()
1615 .expect("No batch found")
1616 .expect("Unable to get batch");
1617
1618 assert_eq!(expected_batch.schema(), actual_batch.schema());
1619 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1620 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1621 for i in 0..expected_batch.num_columns() {
1622 let expected_data = expected_batch.column(i).to_data();
1623 let actual_data = actual_batch.column(i).to_data();
1624
1625 assert_eq!(expected_data, actual_data);
1626 }
1627 }
1628 }
1629
1630 #[test]
1631 fn arrow_writer_non_null() {
1632 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1634
1635 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1637
1638 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1640
1641 roundtrip(batch, Some(SMALL_SIZE / 2));
1642 }
1643
1644 #[test]
1645 fn arrow_writer_list() {
1646 let schema = Schema::new(vec![Field::new(
1648 "a",
1649 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1650 true,
1651 )]);
1652
1653 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1655
1656 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1659
1660 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1662 DataType::Int32,
1663 false,
1664 ))))
1665 .len(5)
1666 .add_buffer(a_value_offsets)
1667 .add_child_data(a_values.into_data())
1668 .null_bit_buffer(Some(Buffer::from([0b00011011])))
1669 .build()
1670 .unwrap();
1671 let a = ListArray::from(a_list_data);
1672
1673 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1675
1676 assert_eq!(batch.column(0).null_count(), 1);
1677
1678 roundtrip(batch, None);
1681 }
1682
1683 #[test]
1684 fn arrow_writer_list_non_null() {
1685 let schema = Schema::new(vec![Field::new(
1687 "a",
1688 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1689 false,
1690 )]);
1691
1692 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1694
1695 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1698
1699 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1701 DataType::Int32,
1702 false,
1703 ))))
1704 .len(5)
1705 .add_buffer(a_value_offsets)
1706 .add_child_data(a_values.into_data())
1707 .build()
1708 .unwrap();
1709 let a = ListArray::from(a_list_data);
1710
1711 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1713
1714 assert_eq!(batch.column(0).null_count(), 0);
1717
1718 roundtrip(batch, None);
1719 }
1720
1721 #[test]
1722 fn arrow_writer_binary() {
1723 let string_field = Field::new("a", DataType::Utf8, false);
1724 let binary_field = Field::new("b", DataType::Binary, false);
1725 let schema = Schema::new(vec![string_field, binary_field]);
1726
1727 let raw_string_values = vec!["foo", "bar", "baz", "quux"];
1728 let raw_binary_values = [
1729 b"foo".to_vec(),
1730 b"bar".to_vec(),
1731 b"baz".to_vec(),
1732 b"quux".to_vec(),
1733 ];
1734 let raw_binary_value_refs = raw_binary_values
1735 .iter()
1736 .map(|x| x.as_slice())
1737 .collect::<Vec<_>>();
1738
1739 let string_values = StringArray::from(raw_string_values.clone());
1740 let binary_values = BinaryArray::from(raw_binary_value_refs);
1741 let batch = RecordBatch::try_new(
1742 Arc::new(schema),
1743 vec![Arc::new(string_values), Arc::new(binary_values)],
1744 )
1745 .unwrap();
1746
1747 roundtrip(batch, Some(SMALL_SIZE / 2));
1748 }
1749
1750 #[test]
1751 fn arrow_writer_binary_view() {
1752 let string_field = Field::new("a", DataType::Utf8View, false);
1753 let binary_field = Field::new("b", DataType::BinaryView, false);
1754 let nullable_string_field = Field::new("a", DataType::Utf8View, true);
1755 let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
1756
1757 let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
1758 let raw_binary_values = vec![
1759 b"foo".to_vec(),
1760 b"bar".to_vec(),
1761 b"large payload over 12 bytes".to_vec(),
1762 b"lulu".to_vec(),
1763 ];
1764 let nullable_string_values =
1765 vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
1766
1767 let string_view_values = StringViewArray::from(raw_string_values);
1768 let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
1769 let nullable_string_view_values = StringViewArray::from(nullable_string_values);
1770 let batch = RecordBatch::try_new(
1771 Arc::new(schema),
1772 vec![
1773 Arc::new(string_view_values),
1774 Arc::new(binary_view_values),
1775 Arc::new(nullable_string_view_values),
1776 ],
1777 )
1778 .unwrap();
1779
1780 roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1781 roundtrip(batch, None);
1782 }
1783
1784 fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
1785 let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
1786 let schema = Schema::new(vec![decimal_field]);
1787
1788 let decimal_values = vec![10_000, 50_000, 0, -100]
1789 .into_iter()
1790 .map(Some)
1791 .collect::<Decimal128Array>()
1792 .with_precision_and_scale(precision, scale)
1793 .unwrap();
1794
1795 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
1796 }
1797
1798 #[test]
1799 fn arrow_writer_decimal() {
1800 let batch_int32_decimal = get_decimal_batch(5, 2);
1802 roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
1803 let batch_int64_decimal = get_decimal_batch(12, 2);
1805 roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
1806 let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
1808 roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
1809 }
1810
1811 #[test]
1812 fn arrow_writer_complex() {
1813 let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1815 let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1816 let struct_field_g = Arc::new(Field::new_list(
1817 "g",
1818 Field::new_list_field(DataType::Int16, true),
1819 false,
1820 ));
1821 let struct_field_h = Arc::new(Field::new_list(
1822 "h",
1823 Field::new_list_field(DataType::Int16, false),
1824 true,
1825 ));
1826 let struct_field_e = Arc::new(Field::new_struct(
1827 "e",
1828 vec![
1829 struct_field_f.clone(),
1830 struct_field_g.clone(),
1831 struct_field_h.clone(),
1832 ],
1833 false,
1834 ));
1835 let schema = Schema::new(vec![
1836 Field::new("a", DataType::Int32, false),
1837 Field::new("b", DataType::Int32, true),
1838 Field::new_struct(
1839 "c",
1840 vec![struct_field_d.clone(), struct_field_e.clone()],
1841 false,
1842 ),
1843 ]);
1844
1845 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1847 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1848 let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1849 let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1850
1851 let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1852
1853 let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1856
1857 let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1859 .len(5)
1860 .add_buffer(g_value_offsets.clone())
1861 .add_child_data(g_value.to_data())
1862 .build()
1863 .unwrap();
1864 let g = ListArray::from(g_list_data);
1865 let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
1867 .len(5)
1868 .add_buffer(g_value_offsets)
1869 .add_child_data(g_value.to_data())
1870 .null_bit_buffer(Some(Buffer::from([0b00011011])))
1871 .build()
1872 .unwrap();
1873 let h = ListArray::from(h_list_data);
1874
1875 let e = StructArray::from(vec![
1876 (struct_field_f, Arc::new(f) as ArrayRef),
1877 (struct_field_g, Arc::new(g) as ArrayRef),
1878 (struct_field_h, Arc::new(h) as ArrayRef),
1879 ]);
1880
1881 let c = StructArray::from(vec![
1882 (struct_field_d, Arc::new(d) as ArrayRef),
1883 (struct_field_e, Arc::new(e) as ArrayRef),
1884 ]);
1885
1886 let batch = RecordBatch::try_new(
1888 Arc::new(schema),
1889 vec![Arc::new(a), Arc::new(b), Arc::new(c)],
1890 )
1891 .unwrap();
1892
1893 roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1894 roundtrip(batch, Some(SMALL_SIZE / 3));
1895 }
1896
1897 #[test]
1898 fn arrow_writer_complex_mixed() {
1899 let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
1904 let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
1905 let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
1906 let schema = Schema::new(vec![Field::new(
1907 "some_nested_object",
1908 DataType::Struct(Fields::from(vec![
1909 offset_field.clone(),
1910 partition_field.clone(),
1911 topic_field.clone(),
1912 ])),
1913 false,
1914 )]);
1915
1916 let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1918 let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1919 let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
1920
1921 let some_nested_object = StructArray::from(vec![
1922 (offset_field, Arc::new(offset) as ArrayRef),
1923 (partition_field, Arc::new(partition) as ArrayRef),
1924 (topic_field, Arc::new(topic) as ArrayRef),
1925 ]);
1926
1927 let batch =
1929 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1930
1931 roundtrip(batch, Some(SMALL_SIZE / 2));
1932 }
1933
1934 #[test]
1935 fn arrow_writer_map() {
1936 let json_content = r#"
1938 {"stocks":{"long": "$AAA", "short": "$BBB"}}
1939 {"stocks":{"long": null, "long": "$CCC", "short": null}}
1940 {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1941 "#;
1942 let entries_struct_type = DataType::Struct(Fields::from(vec![
1943 Field::new("key", DataType::Utf8, false),
1944 Field::new("value", DataType::Utf8, true),
1945 ]));
1946 let stocks_field = Field::new(
1947 "stocks",
1948 DataType::Map(
1949 Arc::new(Field::new("entries", entries_struct_type, false)),
1950 false,
1951 ),
1952 true,
1953 );
1954 let schema = Arc::new(Schema::new(vec![stocks_field]));
1955 let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1956 let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1957
1958 let batch = reader.next().unwrap().unwrap();
1959 roundtrip(batch, None);
1960 }
1961
1962 #[test]
1963 fn arrow_writer_2_level_struct() {
1964 let field_c = Field::new("c", DataType::Int32, true);
1966 let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
1967 let type_a = DataType::Struct(vec![field_b.clone()].into());
1968 let field_a = Field::new("a", type_a, true);
1969 let schema = Schema::new(vec![field_a.clone()]);
1970
1971 let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
1973 let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
1974 .len(6)
1975 .null_bit_buffer(Some(Buffer::from([0b00100111])))
1976 .add_child_data(c.into_data())
1977 .build()
1978 .unwrap();
1979 let b = StructArray::from(b_data);
1980 let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
1981 .len(6)
1982 .null_bit_buffer(Some(Buffer::from([0b00101111])))
1983 .add_child_data(b.into_data())
1984 .build()
1985 .unwrap();
1986 let a = StructArray::from(a_data);
1987
1988 assert_eq!(a.null_count(), 1);
1989 assert_eq!(a.column(0).null_count(), 2);
1990
1991 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1993
1994 roundtrip(batch, Some(SMALL_SIZE / 2));
1995 }
1996
1997 #[test]
1998 fn arrow_writer_2_level_struct_non_null() {
1999 let field_c = Field::new("c", DataType::Int32, false);
2001 let type_b = DataType::Struct(vec![field_c].into());
2002 let field_b = Field::new("b", type_b.clone(), false);
2003 let type_a = DataType::Struct(vec![field_b].into());
2004 let field_a = Field::new("a", type_a.clone(), false);
2005 let schema = Schema::new(vec![field_a]);
2006
2007 let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2009 let b_data = ArrayDataBuilder::new(type_b)
2010 .len(6)
2011 .add_child_data(c.into_data())
2012 .build()
2013 .unwrap();
2014 let b = StructArray::from(b_data);
2015 let a_data = ArrayDataBuilder::new(type_a)
2016 .len(6)
2017 .add_child_data(b.into_data())
2018 .build()
2019 .unwrap();
2020 let a = StructArray::from(a_data);
2021
2022 assert_eq!(a.null_count(), 0);
2023 assert_eq!(a.column(0).null_count(), 0);
2024
2025 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2027
2028 roundtrip(batch, Some(SMALL_SIZE / 2));
2029 }
2030
2031 #[test]
2032 fn arrow_writer_2_level_struct_mixed_null() {
2033 let field_c = Field::new("c", DataType::Int32, false);
2035 let type_b = DataType::Struct(vec![field_c].into());
2036 let field_b = Field::new("b", type_b.clone(), true);
2037 let type_a = DataType::Struct(vec![field_b].into());
2038 let field_a = Field::new("a", type_a.clone(), false);
2039 let schema = Schema::new(vec![field_a]);
2040
2041 let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2043 let b_data = ArrayDataBuilder::new(type_b)
2044 .len(6)
2045 .null_bit_buffer(Some(Buffer::from([0b00100111])))
2046 .add_child_data(c.into_data())
2047 .build()
2048 .unwrap();
2049 let b = StructArray::from(b_data);
2050 let a_data = ArrayDataBuilder::new(type_a)
2052 .len(6)
2053 .add_child_data(b.into_data())
2054 .build()
2055 .unwrap();
2056 let a = StructArray::from(a_data);
2057
2058 assert_eq!(a.null_count(), 0);
2059 assert_eq!(a.column(0).null_count(), 2);
2060
2061 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2063
2064 roundtrip(batch, Some(SMALL_SIZE / 2));
2065 }
2066
2067 #[test]
2068 fn arrow_writer_2_level_struct_mixed_null_2() {
2069 let field_c = Field::new("c", DataType::Int32, false);
2071 let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
2072 let field_e = Field::new(
2073 "e",
2074 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2075 false,
2076 );
2077
2078 let field_b = Field::new(
2079 "b",
2080 DataType::Struct(vec![field_c, field_d, field_e].into()),
2081 false,
2082 );
2083 let type_a = DataType::Struct(vec![field_b.clone()].into());
2084 let field_a = Field::new("a", type_a, true);
2085 let schema = Schema::new(vec![field_a.clone()]);
2086
2087 let c = Int32Array::from_iter_values(0..6);
2089 let d = FixedSizeBinaryArray::try_from_iter(
2090 ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
2091 )
2092 .expect("four byte values");
2093 let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
2094 let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2095 .len(6)
2096 .add_child_data(c.into_data())
2097 .add_child_data(d.into_data())
2098 .add_child_data(e.into_data())
2099 .build()
2100 .unwrap();
2101 let b = StructArray::from(b_data);
2102 let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2103 .len(6)
2104 .null_bit_buffer(Some(Buffer::from([0b00100101])))
2105 .add_child_data(b.into_data())
2106 .build()
2107 .unwrap();
2108 let a = StructArray::from(a_data);
2109
2110 assert_eq!(a.null_count(), 3);
2111 assert_eq!(a.column(0).null_count(), 0);
2112
2113 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2115
2116 roundtrip(batch, Some(SMALL_SIZE / 2));
2117 }
2118
2119 #[test]
2120 fn test_fixed_size_binary_in_dict() {
2121 fn test_fixed_size_binary_in_dict_inner<K>()
2122 where
2123 K: ArrowDictionaryKeyType,
2124 K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
2125 <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
2126 {
2127 let field = Field::new(
2128 "a",
2129 DataType::Dictionary(
2130 Box::new(K::DATA_TYPE),
2131 Box::new(DataType::FixedSizeBinary(4)),
2132 ),
2133 false,
2134 );
2135 let schema = Schema::new(vec![field]);
2136
2137 let keys: Vec<K::Native> = vec![
2138 K::Native::try_from(0u8).unwrap(),
2139 K::Native::try_from(0u8).unwrap(),
2140 K::Native::try_from(1u8).unwrap(),
2141 ];
2142 let keys = PrimitiveArray::<K>::from_iter_values(keys);
2143 let values = FixedSizeBinaryArray::try_from_iter(
2144 vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
2145 )
2146 .unwrap();
2147
2148 let data = DictionaryArray::<K>::new(keys, Arc::new(values));
2149 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
2150 roundtrip(batch, None);
2151 }
2152
2153 test_fixed_size_binary_in_dict_inner::<UInt8Type>();
2154 test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2155 test_fixed_size_binary_in_dict_inner::<UInt32Type>();
        test_fixed_size_binary_in_dict_inner::<UInt64Type>();
2157 test_fixed_size_binary_in_dict_inner::<Int8Type>();
2158 test_fixed_size_binary_in_dict_inner::<Int16Type>();
2159 test_fixed_size_binary_in_dict_inner::<Int32Type>();
2160 test_fixed_size_binary_in_dict_inner::<Int64Type>();
2161 }
2162
2163 #[test]
2164 fn test_empty_dict() {
2165 let struct_fields = Fields::from(vec![Field::new(
2166 "dict",
2167 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2168 false,
2169 )]);
2170
2171 let schema = Schema::new(vec![Field::new_struct(
2172 "struct",
2173 struct_fields.clone(),
2174 true,
2175 )]);
2176 let dictionary = Arc::new(DictionaryArray::new(
2177 Int32Array::new_null(5),
2178 Arc::new(StringArray::new_null(0)),
2179 ));
2180
2181 let s = StructArray::new(
2182 struct_fields,
2183 vec![dictionary],
2184 Some(NullBuffer::new_null(5)),
2185 );
2186
2187 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
2188 roundtrip(batch, None);
2189 }
2190 #[test]
2191 fn arrow_writer_page_size() {
2192 let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
2193
2194 let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
2195
2196 for i in 0..10 {
2198 let value = i
2199 .to_string()
2200 .repeat(10)
2201 .chars()
2202 .take(10)
2203 .collect::<String>();
2204
2205 builder.append_value(value);
2206 }
2207
2208 let array = Arc::new(builder.finish());
2209
2210 let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
2211
2212 let file = tempfile::tempfile().unwrap();
2213
2214 let props = WriterProperties::builder()
2216 .set_data_page_size_limit(1)
2217 .set_dictionary_page_size_limit(1)
2218 .set_write_batch_size(1)
2219 .build();
2220
2221 let mut writer =
2222 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
2223 .expect("Unable to write file");
2224 writer.write(&batch).unwrap();
2225 writer.close().unwrap();
2226
2227 let options = ReadOptionsBuilder::new().with_page_index().build();
2228 let reader =
2229 SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();
2230
2231 let column = reader.metadata().row_group(0).columns();
2232
2233 assert_eq!(column.len(), 1);
2234
2235 assert!(
2238 column[0].dictionary_page_offset().is_some(),
2239 "Expected a dictionary page"
2240 );
2241
2242 assert!(reader.metadata().offset_index().is_some());
2243 let offset_indexes = &reader.metadata().offset_index().unwrap()[0];
2244
2245 let page_locations = offset_indexes[0].page_locations.clone();
2246
2247 assert_eq!(
2250 page_locations.len(),
2251 10,
2252 "Expected 10 pages but got {page_locations:#?}"
2253 );
2254 }
2255
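    // NaN values in Float16/Float32/Float64 columns should roundtrip without error.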
2256 #[test]
2257 fn arrow_writer_float_nans() {
2258 let f16_field = Field::new("a", DataType::Float16, false);
2259 let f32_field = Field::new("b", DataType::Float32, false);
2260 let f64_field = Field::new("c", DataType::Float64, false);
2261 let schema = Schema::new(vec![f16_field, f32_field, f64_field]);
2262
2263 let f16_values = (0..MEDIUM_SIZE)
2264 .map(|i| {
2265 Some(if i % 2 == 0 {
2266 f16::NAN
2267 } else {
2268 f16::from_f32(i as f32)
2269 })
2270 })
2271 .collect::<Float16Array>();
2272
2273 let f32_values = (0..MEDIUM_SIZE)
2274 .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
2275 .collect::<Float32Array>();
2276
2277 let f64_values = (0..MEDIUM_SIZE)
2278 .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
2279 .collect::<Float64Array>();
2280
2281 let batch = RecordBatch::try_new(
2282 Arc::new(schema),
2283 vec![
2284 Arc::new(f16_values),
2285 Arc::new(f32_values),
2286 Arc::new(f64_values),
2287 ],
2288 )
2289 .unwrap();
2290
2291 roundtrip(batch, None);
2292 }
2293
2294 const SMALL_SIZE: usize = 7;
2295 const MEDIUM_SIZE: usize = 63;
2296
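    // Writes `expected_batch` with both writer versions, asserts the data reads back
    // identically, and returns the encoded files for further inspection.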
2297 fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<Bytes> {
2298 let mut files = vec![];
2299 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2300 let mut props = WriterProperties::builder().set_writer_version(version);
2301
2302 if let Some(size) = max_row_group_size {
2303 props = props.set_max_row_group_size(size)
2304 }
2305
2306 let props = props.build();
2307 files.push(roundtrip_opts(&expected_batch, props))
2308 }
2309 files
2310 }
2311
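    // Writes a single batch with the given properties, reads it back, and runs `validate`
    // against the expected and actual column data.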
2312 fn roundtrip_opts_with_array_validation<F>(
2316 expected_batch: &RecordBatch,
2317 props: WriterProperties,
2318 validate: F,
2319 ) -> Bytes
2320 where
2321 F: Fn(&ArrayData, &ArrayData),
2322 {
2323 let mut file = vec![];
2324
2325 let mut writer = ArrowWriter::try_new(&mut file, expected_batch.schema(), Some(props))
2326 .expect("Unable to write file");
2327 writer.write(expected_batch).unwrap();
2328 writer.close().unwrap();
2329
2330 let file = Bytes::from(file);
2331 let mut record_batch_reader =
2332 ParquetRecordBatchReader::try_new(file.clone(), 1024).unwrap();
2333
2334 let actual_batch = record_batch_reader
2335 .next()
2336 .expect("No batch found")
2337 .expect("Unable to get batch");
2338
2339 assert_eq!(expected_batch.schema(), actual_batch.schema());
2340 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2341 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2342 for i in 0..expected_batch.num_columns() {
2343 let expected_data = expected_batch.column(i).to_data();
2344 let actual_data = actual_batch.column(i).to_data();
2345 validate(&expected_data, &actual_data);
2346 }
2347
2348 file
2349 }
2350
2351 fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> Bytes {
2352 roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
2353 a.validate_full().expect("valid expected data");
2354 b.validate_full().expect("valid actual data");
2355 assert_eq!(a, b)
2356 })
2357 }
2358
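    // Options controlling the single-column roundtrip helpers below.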
2359 struct RoundTripOptions {
2360 values: ArrayRef,
2361 schema: SchemaRef,
2362 bloom_filter: bool,
2363 bloom_filter_position: BloomFilterPosition,
2364 }
2365
2366 impl RoundTripOptions {
2367 fn new(values: ArrayRef, nullable: bool) -> Self {
2368 let data_type = values.data_type().clone();
2369 let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
2370 Self {
2371 values,
2372 schema: Arc::new(schema),
2373 bloom_filter: false,
2374 bloom_filter_position: BloomFilterPosition::AfterRowGroup,
2375 }
2376 }
2377 }
2378
2379 fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<Bytes> {
2380 one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
2381 }
2382
2383 fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<Bytes> {
2384 let mut options = RoundTripOptions::new(values, false);
2385 options.schema = schema;
2386 one_column_roundtrip_with_options(options)
2387 }
2388
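    // Exercises a single column across a matrix of dictionary settings, encodings, writer
    // versions and row group sizes.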
2389 fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<Bytes> {
2390 let RoundTripOptions {
2391 values,
2392 schema,
2393 bloom_filter,
2394 bloom_filter_position,
2395 } = options;
2396
2397 let encodings = match values.data_type() {
2398 DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
2399 vec![
2400 Encoding::PLAIN,
2401 Encoding::DELTA_BYTE_ARRAY,
2402 Encoding::DELTA_LENGTH_BYTE_ARRAY,
2403 ]
2404 }
2405 DataType::Int64
2406 | DataType::Int32
2407 | DataType::Int16
2408 | DataType::Int8
2409 | DataType::UInt64
2410 | DataType::UInt32
2411 | DataType::UInt16
2412 | DataType::UInt8 => vec![
2413 Encoding::PLAIN,
2414 Encoding::DELTA_BINARY_PACKED,
2415 Encoding::BYTE_STREAM_SPLIT,
2416 ],
2417 DataType::Float32 | DataType::Float64 => {
2418 vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
2419 }
2420 _ => vec![Encoding::PLAIN],
2421 };
2422
2423 let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2424
2425 let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2426
2427 let mut files = vec![];
2428 for dictionary_size in [0, 1, 1024] {
2429 for encoding in &encodings {
2430 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2431 for row_group_size in row_group_sizes {
2432 let props = WriterProperties::builder()
2433 .set_writer_version(version)
2434 .set_max_row_group_size(row_group_size)
2435 .set_dictionary_enabled(dictionary_size != 0)
2436 .set_dictionary_page_size_limit(dictionary_size.max(1))
2437 .set_encoding(*encoding)
2438 .set_bloom_filter_enabled(bloom_filter)
2439 .set_bloom_filter_position(bloom_filter_position)
2440 .build();
2441
2442 files.push(roundtrip_opts(&expected_batch, props))
2443 }
2444 }
2445 }
2446 }
2447 files
2448 }
2449
2450 fn values_required<A, I>(iter: I) -> Vec<Bytes>
2451 where
2452 A: From<Vec<I::Item>> + Array + 'static,
2453 I: IntoIterator,
2454 {
2455 let raw_values: Vec<_> = iter.into_iter().collect();
2456 let values = Arc::new(A::from(raw_values));
2457 one_column_roundtrip(values, false)
2458 }
2459
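    // Like `values_required`, but nulls out every even-indexed slot.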
2460 fn values_optional<A, I>(iter: I) -> Vec<Bytes>
2461 where
2462 A: From<Vec<Option<I::Item>>> + Array + 'static,
2463 I: IntoIterator,
2464 {
2465 let optional_raw_values: Vec<_> = iter
2466 .into_iter()
2467 .enumerate()
2468 .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
2469 .collect();
2470 let optional_values = Arc::new(A::from(optional_raw_values));
2471 one_column_roundtrip(optional_values, true)
2472 }
2473
2474 fn required_and_optional<A, I>(iter: I)
2475 where
2476 A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
2477 I: IntoIterator + Clone,
2478 {
2479 values_required::<A, I>(iter.clone());
2480 values_optional::<A, I>(iter);
2481 }
2482
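    // Reads the first file back with bloom filter reading enabled and asserts that every
    // `positive_values` entry hits some row group's filter and no `negative_values` entry does.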
2483 fn check_bloom_filter<T: AsBytes>(
2484 files: Vec<Bytes>,
2485 file_column: String,
2486 positive_values: Vec<T>,
2487 negative_values: Vec<T>,
2488 ) {
2489 files.into_iter().take(1).for_each(|file| {
2490 let file_reader = SerializedFileReader::new_with_options(
2491 file,
2492 ReadOptionsBuilder::new()
2493 .with_reader_properties(
2494 ReaderProperties::builder()
2495 .set_read_bloom_filter(true)
2496 .build(),
2497 )
2498 .build(),
2499 )
2500 .expect("Unable to open file as Parquet");
2501 let metadata = file_reader.metadata();
2502
2503 let mut bloom_filters: Vec<_> = vec![];
2505 for (ri, row_group) in metadata.row_groups().iter().enumerate() {
2506 if let Some((column_index, _)) = row_group
2507 .columns()
2508 .iter()
2509 .enumerate()
2510 .find(|(_, column)| column.column_path().string() == file_column)
2511 {
2512 let row_group_reader = file_reader
2513 .get_row_group(ri)
2514 .expect("Unable to read row group");
2515 if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
2516 bloom_filters.push(sbbf.clone());
2517 } else {
2518 panic!("No bloom filter for column named {file_column} found");
2519 }
2520 } else {
2521 panic!("No column named {file_column} found");
2522 }
2523 }
2524
2525 positive_values.iter().for_each(|value| {
2526 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2527 assert!(
2528 found.is_some(),
2529                 "Value {:?} should be in bloom filter",
2530                 value.as_bytes()
2531 );
2532 });
2533
2534 negative_values.iter().for_each(|value| {
2535 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2536 assert!(
2537 found.is_none(),
2538                 "Value {:?} should not be in bloom filter",
2539                 value.as_bytes()
2540 );
2541 });
2542 });
2543 }
2544
2545 #[test]
2546 fn all_null_primitive_single_column() {
2547 let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
2548 one_column_roundtrip(values, true);
2549 }
2550 #[test]
2551 fn null_single_column() {
2552 let values = Arc::new(NullArray::new(SMALL_SIZE));
2553 one_column_roundtrip(values, true);
2554 }
2556
2557 #[test]
2558 fn bool_single_column() {
2559 required_and_optional::<BooleanArray, _>(
2560 [true, false].iter().cycle().copied().take(SMALL_SIZE),
2561 );
2562 }
2563
2564 #[test]
2565 fn bool_large_single_column() {
2566 let values = Arc::new(
2567 [None, Some(true), Some(false)]
2568 .iter()
2569 .cycle()
2570 .copied()
2571 .take(200_000)
2572 .collect::<BooleanArray>(),
2573 );
2574 let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
2575 let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2576 let file = tempfile::tempfile().unwrap();
2577
2578 let mut writer =
2579 ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
2580 .expect("Unable to write file");
2581 writer.write(&expected_batch).unwrap();
2582 writer.close().unwrap();
2583 }
2584
2585 #[test]
2586 fn check_page_offset_index_with_nan() {
2587 let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
2588 let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
2589 let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2590
2591 let mut out = Vec::with_capacity(1024);
2592 let mut writer =
2593 ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
2594 writer.write(&batch).unwrap();
2595 let file_meta_data = writer.close().unwrap();
2596 for row_group in file_meta_data.row_groups() {
2597 for column in row_group.columns() {
2598 assert!(column.offset_index_offset().is_some());
2599 assert!(column.offset_index_length().is_some());
2600 assert!(column.column_index_offset().is_none());
2601 assert!(column.column_index_length().is_none());
2602 }
2603 }
2604 }
2605
2606 #[test]
2607 fn i8_single_column() {
2608 required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
2609 }
2610
2611 #[test]
2612 fn i16_single_column() {
2613 required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
2614 }
2615
2616 #[test]
2617 fn i32_single_column() {
2618 required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
2619 }
2620
2621 #[test]
2622 fn i64_single_column() {
2623 required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
2624 }
2625
2626 #[test]
2627 fn u8_single_column() {
2628 required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
2629 }
2630
2631 #[test]
2632 fn u16_single_column() {
2633 required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
2634 }
2635
2636 #[test]
2637 fn u32_single_column() {
2638 required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
2639 }
2640
2641 #[test]
2642 fn u64_single_column() {
2643 required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
2644 }
2645
2646 #[test]
2647 fn f32_single_column() {
2648 required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
2649 }
2650
2651 #[test]
2652 fn f64_single_column() {
2653 required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
2654 }
2655
2656 #[test]
2661 fn timestamp_second_single_column() {
2662 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2663 let values = Arc::new(TimestampSecondArray::from(raw_values));
2664
2665 one_column_roundtrip(values, false);
2666 }
2667
2668 #[test]
2669 fn timestamp_millisecond_single_column() {
2670 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2671 let values = Arc::new(TimestampMillisecondArray::from(raw_values));
2672
2673 one_column_roundtrip(values, false);
2674 }
2675
2676 #[test]
2677 fn timestamp_microsecond_single_column() {
2678 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2679 let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
2680
2681 one_column_roundtrip(values, false);
2682 }
2683
2684 #[test]
2685 fn timestamp_nanosecond_single_column() {
2686 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2687 let values = Arc::new(TimestampNanosecondArray::from(raw_values));
2688
2689 one_column_roundtrip(values, false);
2690 }
2691
2692 #[test]
2693 fn date32_single_column() {
2694 required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
2695 }
2696
2697 #[test]
2698 fn date64_single_column() {
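        // Date64 values are milliseconds at midnight, i.e. multiples of 86_400_000.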
2699 required_and_optional::<Date64Array, _>(
2701 (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
2702 );
2703 }
2704
2705 #[test]
2706 fn time32_second_single_column() {
2707 required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
2708 }
2709
2710 #[test]
2711 fn time32_millisecond_single_column() {
2712 required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
2713 }
2714
2715 #[test]
2716 fn time64_microsecond_single_column() {
2717 required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
2718 }
2719
2720 #[test]
2721 fn time64_nanosecond_single_column() {
2722 required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
2723 }
2724
2725 #[test]
2726 fn duration_second_single_column() {
2727 required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
2728 }
2729
2730 #[test]
2731 fn duration_millisecond_single_column() {
2732 required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
2733 }
2734
2735 #[test]
2736 fn duration_microsecond_single_column() {
2737 required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
2738 }
2739
2740 #[test]
2741 fn duration_nanosecond_single_column() {
2742 required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
2743 }
2744
2745 #[test]
2746 fn interval_year_month_single_column() {
2747 required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
2748 }
2749
2750 #[test]
2751 fn interval_day_time_single_column() {
2752 required_and_optional::<IntervalDayTimeArray, _>(vec![
2753 IntervalDayTime::new(0, 1),
2754 IntervalDayTime::new(0, 3),
2755 IntervalDayTime::new(3, -2),
2756 IntervalDayTime::new(-200, 4),
2757 ]);
2758 }
2759
2760 #[test]
2761 #[should_panic(
2762 expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
2763 )]
2764 fn interval_month_day_nano_single_column() {
2765 required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
2766 IntervalMonthDayNano::new(0, 1, 5),
2767 IntervalMonthDayNano::new(0, 3, 2),
2768 IntervalMonthDayNano::new(3, -2, -5),
2769 IntervalMonthDayNano::new(-200, 4, -1),
2770 ]);
2771 }
2772
2773 #[test]
2774 fn binary_single_column() {
2775 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2776 let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2777 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2778
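        // BinaryArray can't be built from Vec<Option<_>>, so only the required path is tested.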
2779 values_required::<BinaryArray, _>(many_vecs_iter);
2781 }
2782
2783 #[test]
2784 fn binary_view_single_column() {
2785 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2786 let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2787 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2788
2789 values_required::<BinaryViewArray, _>(many_vecs_iter);
2791 }
2792
2793 #[test]
2794 fn i32_column_bloom_filter_at_end() {
2795 let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2796 let mut options = RoundTripOptions::new(array, false);
2797 options.bloom_filter = true;
2798 options.bloom_filter_position = BloomFilterPosition::End;
2799
2800 let files = one_column_roundtrip_with_options(options);
2801 check_bloom_filter(
2802 files,
2803 "col".to_string(),
2804 (0..SMALL_SIZE as i32).collect(),
2805 (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2806 );
2807 }
2808
2809 #[test]
2810 fn i32_column_bloom_filter() {
2811 let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2812 let mut options = RoundTripOptions::new(array, false);
2813 options.bloom_filter = true;
2814
2815 let files = one_column_roundtrip_with_options(options);
2816 check_bloom_filter(
2817 files,
2818 "col".to_string(),
2819 (0..SMALL_SIZE as i32).collect(),
2820 (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2821 );
2822 }
2823
2824 #[test]
2825 fn binary_column_bloom_filter() {
2826 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2827 let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2828 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2829
2830 let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
2831 let mut options = RoundTripOptions::new(array, false);
2832 options.bloom_filter = true;
2833
2834 let files = one_column_roundtrip_with_options(options);
2835 check_bloom_filter(
2836 files,
2837 "col".to_string(),
2838 many_vecs,
2839 vec![vec![(SMALL_SIZE + 1) as u8]],
2840 );
2841 }
2842
2843 #[test]
2844 fn empty_string_null_column_bloom_filter() {
2845 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2846 let raw_strs = raw_values.iter().map(|s| s.as_str());
2847
2848 let array = Arc::new(StringArray::from_iter_values(raw_strs));
2849 let mut options = RoundTripOptions::new(array, false);
2850 options.bloom_filter = true;
2851
2852 let files = one_column_roundtrip_with_options(options);
2853
2854 let optional_raw_values: Vec<_> = raw_values
2855 .iter()
2856 .enumerate()
2857 .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
2858 .collect();
2859 check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
2861 }
2862
2863 #[test]
2864 fn large_binary_single_column() {
2865 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2866 let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2867 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2868
2869 values_required::<LargeBinaryArray, _>(many_vecs_iter);
2871 }
2872
2873 #[test]
2874 fn fixed_size_binary_single_column() {
2875 let mut builder = FixedSizeBinaryBuilder::new(4);
2876 builder.append_value(b"0123").unwrap();
2877 builder.append_null();
2878 builder.append_value(b"8910").unwrap();
2879 builder.append_value(b"1112").unwrap();
2880 let array = Arc::new(builder.finish());
2881
2882 one_column_roundtrip(array, true);
2883 }
2884
2885 #[test]
2886 fn string_single_column() {
2887 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2888 let raw_strs = raw_values.iter().map(|s| s.as_str());
2889
2890 required_and_optional::<StringArray, _>(raw_strs);
2891 }
2892
2893 #[test]
2894 fn large_string_single_column() {
2895 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2896 let raw_strs = raw_values.iter().map(|s| s.as_str());
2897
2898 required_and_optional::<LargeStringArray, _>(raw_strs);
2899 }
2900
2901 #[test]
2902 fn string_view_single_column() {
2903 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2904 let raw_strs = raw_values.iter().map(|s| s.as_str());
2905
2906 required_and_optional::<StringViewArray, _>(raw_strs);
2907 }
2908
2909 #[test]
2910 fn null_list_single_column() {
2911 let null_field = Field::new_list_field(DataType::Null, true);
2912 let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
2913
2914 let schema = Schema::new(vec![list_field]);
2915
2916 let a_values = NullArray::new(2);
2918 let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
2919 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2920 DataType::Null,
2921 true,
2922 ))))
2923 .len(3)
2924 .add_buffer(a_value_offsets)
2925 .null_bit_buffer(Some(Buffer::from([0b00000101])))
2926 .add_child_data(a_values.into_data())
2927 .build()
2928 .unwrap();
2929
2930 let a = ListArray::from(a_list_data);
2931
2932 assert!(a.is_valid(0));
2933 assert!(!a.is_valid(1));
2934 assert!(a.is_valid(2));
2935
2936 assert_eq!(a.value(0).len(), 0);
2937 assert_eq!(a.value(2).len(), 2);
2938 assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
2939
2940 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2941 roundtrip(batch, None);
2942 }
2943
2944 #[test]
2945 fn list_single_column() {
2946 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2947 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2948 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2949 DataType::Int32,
2950 false,
2951 ))))
2952 .len(5)
2953 .add_buffer(a_value_offsets)
2954 .null_bit_buffer(Some(Buffer::from([0b00011011])))
2955 .add_child_data(a_values.into_data())
2956 .build()
2957 .unwrap();
2958
2959 assert_eq!(a_list_data.null_count(), 1);
2960
2961 let a = ListArray::from(a_list_data);
2962 let values = Arc::new(a);
2963
2964 one_column_roundtrip(values, true);
2965 }
2966
2967 #[test]
2968 fn large_list_single_column() {
2969 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2970 let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
2971 let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
2972 "large_item",
2973 DataType::Int32,
2974 true,
2975 ))))
2976 .len(5)
2977 .add_buffer(a_value_offsets)
2978 .add_child_data(a_values.into_data())
2979 .null_bit_buffer(Some(Buffer::from([0b00011011])))
2980 .build()
2981 .unwrap();
2982
2983 assert_eq!(a_list_data.null_count(), 1);
2985
2986 let a = LargeListArray::from(a_list_data);
2987 let values = Arc::new(a);
2988
2989 one_column_roundtrip(values, true);
2990 }
2991
2992 #[test]
2993 fn list_nested_nulls() {
2994 use arrow::datatypes::Int32Type;
2995 let data = vec![
2996 Some(vec![Some(1)]),
2997 Some(vec![Some(2), Some(3)]),
2998 None,
2999 Some(vec![Some(4), Some(5), None]),
3000 Some(vec![None]),
3001 Some(vec![Some(6), Some(7)]),
3002 ];
3003
3004 let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
3005 one_column_roundtrip(Arc::new(list), true);
3006
3007 let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
3008 one_column_roundtrip(Arc::new(list), true);
3009 }
3010
3011 #[test]
3012 fn struct_single_column() {
3013 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3014 let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
3015 let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
3016
3017 let values = Arc::new(s);
3018 one_column_roundtrip(values, false);
3019 }
3020
3021 #[test]
3022 fn list_and_map_coerced_names() {
3023 let list_field =
3025 Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
3026 let map_field = Field::new_map(
3027 "my_map",
3028 "entries",
3029 Field::new("keys", DataType::Int32, false),
3030 Field::new("values", DataType::Int32, true),
3031 false,
3032 true,
3033 );
3034
3035 let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
3036 let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
3037
3038 let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
3039
3040 let props = Some(WriterProperties::builder().set_coerce_types(true).build());
3042 let file = tempfile::tempfile().unwrap();
3043 let mut writer =
3044 ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
3045
3046 let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
3047 writer.write(&batch).unwrap();
3048 let file_metadata = writer.close().unwrap();
3049
3050 let schema = file_metadata.file_metadata().schema();
3051 let list_field = &schema.get_fields()[0].get_fields()[0];
3053 assert_eq!(list_field.get_fields()[0].name(), "element");
3054
3055 let map_field = &schema.get_fields()[1].get_fields()[0];
3056 assert_eq!(map_field.name(), "key_value");
3058 assert_eq!(map_field.get_fields()[0].name(), "key");
3060 assert_eq!(map_field.get_fields()[1].name(), "value");
3062
3063 let reader = SerializedFileReader::new(file).unwrap();
3065 let file_schema = reader.metadata().file_metadata().schema();
3066 let fields = file_schema.get_fields();
3067 let list_field = &fields[0].get_fields()[0];
3068 assert_eq!(list_field.get_fields()[0].name(), "element");
3069 let map_field = &fields[1].get_fields()[0];
3070 assert_eq!(map_field.name(), "key_value");
3071 assert_eq!(map_field.get_fields()[0].name(), "key");
3072 assert_eq!(map_field.get_fields()[1].name(), "value");
3073 }
3074
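    // With dictionary encoding disabled and a tiny data page size limit, the writer must
    // flush data pages mid-column; the delta byte array encodings should still roundtrip.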
3075 #[test]
3076 fn fallback_flush_data_page() {
3077 let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
3079 let values = Arc::new(StringArray::from(raw_values));
3080 let encodings = vec![
3081 Encoding::DELTA_BYTE_ARRAY,
3082 Encoding::DELTA_LENGTH_BYTE_ARRAY,
3083 ];
3084 let data_type = values.data_type().clone();
3085 let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
3086 let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3087
3088 let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
3089 let data_page_size_limit: usize = 32;
3090 let write_batch_size: usize = 16;
3091
3092 for encoding in &encodings {
3093 for row_group_size in row_group_sizes {
3094 let props = WriterProperties::builder()
3095 .set_writer_version(WriterVersion::PARQUET_2_0)
3096 .set_max_row_group_size(row_group_size)
3097 .set_dictionary_enabled(false)
3098 .set_encoding(*encoding)
3099 .set_data_page_size_limit(data_page_size_limit)
3100 .set_write_batch_size(write_batch_size)
3101 .build();
3102
3103 roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
3104 let string_array_a = StringArray::from(a.clone());
3105 let string_array_b = StringArray::from(b.clone());
3106 let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
3107 let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
3108 assert_eq!(
3109 vec_a, vec_b,
3110 "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
3111 );
3112 });
3113 }
3114 }
3115 }
3116
3117 #[test]
3118 fn arrow_writer_string_dictionary() {
3119 #[allow(deprecated)]
3121 let schema = Arc::new(Schema::new(vec![Field::new_dict(
3122 "dictionary",
3123 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3124 true,
3125 42,
3126 true,
3127 )]));
3128
3129 let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3131 .iter()
3132 .copied()
3133 .collect();
3134
3135 one_column_roundtrip_with_schema(Arc::new(d), schema);
3137 }
3138
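    // Batches whose columns differ from the writer schema only in their string/binary or
    // dictionary representation should be accepted and read back using the first batch's type.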
3139 #[test]
3140 fn arrow_writer_test_type_compatibility() {
3141 fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
3142 where
3143 T1: Array + 'static,
3144 T2: Array + 'static,
3145 {
3146 let schema1 = Arc::new(Schema::new(vec![Field::new(
3147 "a",
3148 array1.data_type().clone(),
3149 false,
3150 )]));
3151
3152 let file = tempfile().unwrap();
3153 let mut writer =
3154 ArrowWriter::try_new(file.try_clone().unwrap(), schema1.clone(), None).unwrap();
3155
3156 let rb1 = RecordBatch::try_new(schema1.clone(), vec![Arc::new(array1)]).unwrap();
3157 writer.write(&rb1).unwrap();
3158
3159 let schema2 = Arc::new(Schema::new(vec![Field::new(
3160 "a",
3161 array2.data_type().clone(),
3162 false,
3163 )]));
3164 let rb2 = RecordBatch::try_new(schema2, vec![Arc::new(array2)]).unwrap();
3165 writer.write(&rb2).unwrap();
3166
3167 writer.close().unwrap();
3168
3169 let mut record_batch_reader =
3170 ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
3171 let actual_batch = record_batch_reader.next().unwrap().unwrap();
3172
3173 let expected_batch =
3174 RecordBatch::try_new(schema1, vec![Arc::new(expected_result)]).unwrap();
3175 assert_eq!(actual_batch, expected_batch);
3176 }
3177
3178 ensure_compatible_write(
3181 DictionaryArray::new(
3182 UInt8Array::from_iter_values(vec![0]),
3183 Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3184 ),
3185 StringArray::from_iter_values(vec!["barquet"]),
3186 DictionaryArray::new(
3187 UInt8Array::from_iter_values(vec![0, 1]),
3188 Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3189 ),
3190 );
3191
3192 ensure_compatible_write(
3193 StringArray::from_iter_values(vec!["parquet"]),
3194 DictionaryArray::new(
3195 UInt8Array::from_iter_values(vec![0]),
3196 Arc::new(StringArray::from_iter_values(vec!["barquet"])),
3197 ),
3198 StringArray::from_iter_values(vec!["parquet", "barquet"]),
3199 );
3200
3201 ensure_compatible_write(
3204 DictionaryArray::new(
3205 UInt8Array::from_iter_values(vec![0]),
3206 Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3207 ),
3208 DictionaryArray::new(
3209 UInt16Array::from_iter_values(vec![0]),
3210 Arc::new(StringArray::from_iter_values(vec!["barquet"])),
3211 ),
3212 DictionaryArray::new(
3213 UInt8Array::from_iter_values(vec![0, 1]),
3214 Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3215 ),
3216 );
3217
3218 ensure_compatible_write(
3220 DictionaryArray::new(
3221 UInt8Array::from_iter_values(vec![0]),
3222 Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3223 ),
3224 DictionaryArray::new(
3225 UInt8Array::from_iter_values(vec![0]),
3226 Arc::new(LargeStringArray::from_iter_values(vec!["barquet"])),
3227 ),
3228 DictionaryArray::new(
3229 UInt8Array::from_iter_values(vec![0, 1]),
3230 Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3231 ),
3232 );
3233
3234 ensure_compatible_write(
3236 DictionaryArray::new(
3237 UInt8Array::from_iter_values(vec![0]),
3238 Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3239 ),
3240 LargeStringArray::from_iter_values(vec!["barquet"]),
3241 DictionaryArray::new(
3242 UInt8Array::from_iter_values(vec![0, 1]),
3243 Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3244 ),
3245 );
3246
3247 ensure_compatible_write(
3250 StringArray::from_iter_values(vec!["parquet"]),
3251 LargeStringArray::from_iter_values(vec!["barquet"]),
3252 StringArray::from_iter_values(vec!["parquet", "barquet"]),
3253 );
3254
3255 ensure_compatible_write(
3256 LargeStringArray::from_iter_values(vec!["parquet"]),
3257 StringArray::from_iter_values(vec!["barquet"]),
3258 LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
3259 );
3260
3261 ensure_compatible_write(
3262 StringArray::from_iter_values(vec!["parquet"]),
3263 StringViewArray::from_iter_values(vec!["barquet"]),
3264 StringArray::from_iter_values(vec!["parquet", "barquet"]),
3265 );
3266
3267 ensure_compatible_write(
3268 StringViewArray::from_iter_values(vec!["parquet"]),
3269 StringArray::from_iter_values(vec!["barquet"]),
3270 StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
3271 );
3272
3273 ensure_compatible_write(
3274 LargeStringArray::from_iter_values(vec!["parquet"]),
3275 StringViewArray::from_iter_values(vec!["barquet"]),
3276 LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
3277 );
3278
3279 ensure_compatible_write(
3280 StringViewArray::from_iter_values(vec!["parquet"]),
3281 LargeStringArray::from_iter_values(vec!["barquet"]),
3282 StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
3283 );
3284
3285 ensure_compatible_write(
3288 BinaryArray::from_iter_values(vec![b"parquet"]),
3289 LargeBinaryArray::from_iter_values(vec![b"barquet"]),
3290 BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3291 );
3292
3293 ensure_compatible_write(
3294 LargeBinaryArray::from_iter_values(vec![b"parquet"]),
3295 BinaryArray::from_iter_values(vec![b"barquet"]),
3296 LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3297 );
3298
3299 ensure_compatible_write(
3300 BinaryArray::from_iter_values(vec![b"parquet"]),
3301 BinaryViewArray::from_iter_values(vec![b"barquet"]),
3302 BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3303 );
3304
3305 ensure_compatible_write(
3306 BinaryViewArray::from_iter_values(vec![b"parquet"]),
3307 BinaryArray::from_iter_values(vec![b"barquet"]),
3308 BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
3309 );
3310
3311 ensure_compatible_write(
3312 BinaryViewArray::from_iter_values(vec![b"parquet"]),
3313 LargeBinaryArray::from_iter_values(vec![b"barquet"]),
3314 BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
3315 );
3316
3317 ensure_compatible_write(
3318 LargeBinaryArray::from_iter_values(vec![b"parquet"]),
3319 BinaryViewArray::from_iter_values(vec![b"barquet"]),
3320 LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3321 );
3322 }
3323
3324 #[test]
3325 fn arrow_writer_primitive_dictionary() {
3326 #[allow(deprecated)]
3328 let schema = Arc::new(Schema::new(vec![Field::new_dict(
3329 "dictionary",
3330 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
3331 true,
3332 42,
3333 true,
3334 )]));
3335
3336 let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
3338 builder.append(12345678).unwrap();
3339 builder.append_null();
3340 builder.append(22345678).unwrap();
3341 builder.append(12345678).unwrap();
3342 let d = builder.finish();
3343
3344 one_column_roundtrip_with_schema(Arc::new(d), schema);
3345 }
3346
3347 #[test]
3348 fn arrow_writer_decimal32_dictionary() {
3349 let integers = vec![12345, 56789, 34567];
3350
3351 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3352
3353 let values = Decimal32Array::from(integers.clone())
3354 .with_precision_and_scale(5, 2)
3355 .unwrap();
3356
3357 let array = DictionaryArray::new(keys, Arc::new(values));
3358 one_column_roundtrip(Arc::new(array.clone()), true);
3359
3360 let values = Decimal32Array::from(integers)
3361 .with_precision_and_scale(9, 2)
3362 .unwrap();
3363
3364 let array = array.with_values(Arc::new(values));
3365 one_column_roundtrip(Arc::new(array), true);
3366 }
3367
3368 #[test]
3369 fn arrow_writer_decimal64_dictionary() {
3370 let integers = vec![12345, 56789, 34567];
3371
3372 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3373
3374 let values = Decimal64Array::from(integers.clone())
3375 .with_precision_and_scale(5, 2)
3376 .unwrap();
3377
3378 let array = DictionaryArray::new(keys, Arc::new(values));
3379 one_column_roundtrip(Arc::new(array.clone()), true);
3380
3381 let values = Decimal64Array::from(integers)
3382 .with_precision_and_scale(12, 2)
3383 .unwrap();
3384
3385 let array = array.with_values(Arc::new(values));
3386 one_column_roundtrip(Arc::new(array), true);
3387 }
3388
3389 #[test]
3390 fn arrow_writer_decimal128_dictionary() {
3391 let integers = vec![12345, 56789, 34567];
3392
3393 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3394
3395 let values = Decimal128Array::from(integers.clone())
3396 .with_precision_and_scale(5, 2)
3397 .unwrap();
3398
3399 let array = DictionaryArray::new(keys, Arc::new(values));
3400 one_column_roundtrip(Arc::new(array.clone()), true);
3401
3402 let values = Decimal128Array::from(integers)
3403 .with_precision_and_scale(12, 2)
3404 .unwrap();
3405
3406 let array = array.with_values(Arc::new(values));
3407 one_column_roundtrip(Arc::new(array), true);
3408 }
3409
3410 #[test]
3411 fn arrow_writer_decimal256_dictionary() {
3412 let integers = vec![
3413 i256::from_i128(12345),
3414 i256::from_i128(56789),
3415 i256::from_i128(34567),
3416 ];
3417
3418 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3419
3420 let values = Decimal256Array::from(integers.clone())
3421 .with_precision_and_scale(5, 2)
3422 .unwrap();
3423
3424 let array = DictionaryArray::new(keys, Arc::new(values));
3425 one_column_roundtrip(Arc::new(array.clone()), true);
3426
3427 let values = Decimal256Array::from(integers)
3428 .with_precision_and_scale(12, 2)
3429 .unwrap();
3430
3431 let array = array.with_values(Arc::new(values));
3432 one_column_roundtrip(Arc::new(array), true);
3433 }
3434
3435 #[test]
3436 fn arrow_writer_string_dictionary_unsigned_index() {
3437 #[allow(deprecated)]
3439 let schema = Arc::new(Schema::new(vec![Field::new_dict(
3440 "dictionary",
3441 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3442 true,
3443 42,
3444 true,
3445 )]));
3446
3447 let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3449 .iter()
3450 .copied()
3451 .collect();
3452
3453 one_column_roundtrip_with_schema(Arc::new(d), schema);
3454 }
3455
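    // Unsigned 32-bit values are stored as physical INT32; the chunk statistics must still
    // reflect the unsigned min/max of each row group.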
3456 #[test]
3457 fn u32_min_max() {
3458 let src = [
3460 u32::MIN,
3461 u32::MIN + 1,
3462 (i32::MAX as u32) - 1,
3463 i32::MAX as u32,
3464 (i32::MAX as u32) + 1,
3465 u32::MAX - 1,
3466 u32::MAX,
3467 ];
3468 let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
3469 let files = one_column_roundtrip(values, false);
3470
3471 for file in files {
3472 let reader = SerializedFileReader::new(file).unwrap();
3474 let metadata = reader.metadata();
3475
3476 let mut row_offset = 0;
3477 for row_group in metadata.row_groups() {
3478 assert_eq!(row_group.num_columns(), 1);
3479 let column = row_group.column(0);
3480
3481 let num_values = column.num_values() as usize;
3482 let src_slice = &src[row_offset..row_offset + num_values];
3483 row_offset += column.num_values() as usize;
3484
3485 let stats = column.statistics().unwrap();
3486 if let Statistics::Int32(stats) = stats {
3487 assert_eq!(
3488 *stats.min_opt().unwrap() as u32,
3489 *src_slice.iter().min().unwrap()
3490 );
3491 assert_eq!(
3492 *stats.max_opt().unwrap() as u32,
3493 *src_slice.iter().max().unwrap()
3494 );
3495 } else {
3496 panic!("Statistics::Int32 missing")
3497 }
3498 }
3499 }
3500 }
3501
3502 #[test]
3503 fn u64_min_max() {
3504 let src = [
3506 u64::MIN,
3507 u64::MIN + 1,
3508 (i64::MAX as u64) - 1,
3509 i64::MAX as u64,
3510 (i64::MAX as u64) + 1,
3511 u64::MAX - 1,
3512 u64::MAX,
3513 ];
3514 let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
3515 let files = one_column_roundtrip(values, false);
3516
3517 for file in files {
3518 let reader = SerializedFileReader::new(file).unwrap();
3520 let metadata = reader.metadata();
3521
3522 let mut row_offset = 0;
3523 for row_group in metadata.row_groups() {
3524 assert_eq!(row_group.num_columns(), 1);
3525 let column = row_group.column(0);
3526
3527 let num_values = column.num_values() as usize;
3528 let src_slice = &src[row_offset..row_offset + num_values];
3529 row_offset += column.num_values() as usize;
3530
3531 let stats = column.statistics().unwrap();
3532 if let Statistics::Int64(stats) = stats {
3533 assert_eq!(
3534 *stats.min_opt().unwrap() as u64,
3535 *src_slice.iter().min().unwrap()
3536 );
3537 assert_eq!(
3538 *stats.max_opt().unwrap() as u64,
3539 *src_slice.iter().max().unwrap()
3540 );
3541 } else {
3542 panic!("Statistics::Int64 missing")
3543 }
3544 }
3545 }
3546 }
3547
3548 #[test]
3549 fn statistics_null_counts_only_nulls() {
3550 let values = Arc::new(UInt64Array::from(vec![None, None]));
3552 let files = one_column_roundtrip(values, true);
3553
3554 for file in files {
3555 let reader = SerializedFileReader::new(file).unwrap();
3557 let metadata = reader.metadata();
3558 assert_eq!(metadata.num_row_groups(), 1);
3559 let row_group = metadata.row_group(0);
3560 assert_eq!(row_group.num_columns(), 1);
3561 let column = row_group.column(0);
3562 let stats = column.statistics().unwrap();
3563 assert_eq!(stats.null_count_opt(), Some(2));
3564 }
3565 }
3566
3567 #[test]
3568 fn test_list_of_struct_roundtrip() {
3569 let int_field = Field::new("a", DataType::Int32, true);
3571 let int_field2 = Field::new("b", DataType::Int32, true);
3572
3573 let int_builder = Int32Builder::with_capacity(10);
3574 let int_builder2 = Int32Builder::with_capacity(10);
3575
3576 let struct_builder = StructBuilder::new(
3577 vec![int_field, int_field2],
3578 vec![Box::new(int_builder), Box::new(int_builder2)],
3579 );
3580 let mut list_builder = ListBuilder::new(struct_builder);
3581
3582 let values = list_builder.values();
3587 values
3588 .field_builder::<Int32Builder>(0)
3589 .unwrap()
3590 .append_value(1);
3591 values
3592 .field_builder::<Int32Builder>(1)
3593 .unwrap()
3594 .append_value(2);
3595 values.append(true);
3596 list_builder.append(true);
3597
3598 list_builder.append(true);
3600
3601 list_builder.append(false);
3603
3604 let values = list_builder.values();
3606 values
3607 .field_builder::<Int32Builder>(0)
3608 .unwrap()
3609 .append_null();
3610 values
3611 .field_builder::<Int32Builder>(1)
3612 .unwrap()
3613 .append_null();
3614 values.append(false);
3615 values
3616 .field_builder::<Int32Builder>(0)
3617 .unwrap()
3618 .append_null();
3619 values
3620 .field_builder::<Int32Builder>(1)
3621 .unwrap()
3622 .append_null();
3623 values.append(false);
3624 list_builder.append(true);
3625
3626 let values = list_builder.values();
3628 values
3629 .field_builder::<Int32Builder>(0)
3630 .unwrap()
3631 .append_null();
3632 values
3633 .field_builder::<Int32Builder>(1)
3634 .unwrap()
3635 .append_value(3);
3636 values.append(true);
3637 list_builder.append(true);
3638
3639 let values = list_builder.values();
3641 values
3642 .field_builder::<Int32Builder>(0)
3643 .unwrap()
3644 .append_value(2);
3645 values
3646 .field_builder::<Int32Builder>(1)
3647 .unwrap()
3648 .append_null();
3649 values.append(true);
3650 list_builder.append(true);
3651
3652 let array = Arc::new(list_builder.finish());
3653
3654 one_column_roundtrip(array, true);
3655 }
3656
3657 fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
3658 metadata.row_groups().iter().map(|x| x.num_rows()).collect()
3659 }
3660
3661 #[test]
3662 fn test_aggregates_records() {
3663 let arrays = [
3664 Int32Array::from((0..100).collect::<Vec<_>>()),
3665 Int32Array::from((0..50).collect::<Vec<_>>()),
3666 Int32Array::from((200..500).collect::<Vec<_>>()),
3667 ];
3668
3669 let schema = Arc::new(Schema::new(vec![Field::new(
3670 "int",
3671 ArrowDataType::Int32,
3672 false,
3673 )]));
3674
3675 let file = tempfile::tempfile().unwrap();
3676
3677 let props = WriterProperties::builder()
3678 .set_max_row_group_size(200)
3679 .build();
3680
3681 let mut writer =
3682 ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
3683
3684 for array in arrays {
3685 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
3686 writer.write(&batch).unwrap();
3687 }
3688
3689 writer.close().unwrap();
3690
3691 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3692 assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
3693
3694 let batches = builder
3695 .with_batch_size(100)
3696 .build()
3697 .unwrap()
3698 .collect::<ArrowResult<Vec<_>>>()
3699 .unwrap();
3700
3701 assert_eq!(batches.len(), 5);
3702 assert!(batches.iter().all(|x| x.num_columns() == 1));
3703
3704 let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3705
3706 assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
3707
3708 let values: Vec<_> = batches
3709 .iter()
3710 .flat_map(|x| {
3711 x.column(0)
3712 .as_any()
3713 .downcast_ref::<Int32Array>()
3714 .unwrap()
3715 .values()
3716 .iter()
3717 .cloned()
3718 })
3719 .collect();
3720
3721 let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
3722 assert_eq!(&values, &expected_values)
3723 }
3724
3725 #[test]
3726 fn complex_aggregate() {
3727 let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
3729 let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
3730 let struct_a = Arc::new(Field::new(
3731 "struct_a",
3732 DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
3733 true,
3734 ));
3735
3736 let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
3737 let struct_b = Arc::new(Field::new(
3738 "struct_b",
3739 DataType::Struct(vec![list_a.clone()].into()),
3740 false,
3741 ));
3742
3743 let schema = Arc::new(Schema::new(vec![struct_b]));
3744
3745 let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
3747 let field_b_array =
3748 Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
3749
3750 let struct_a_array = StructArray::from(vec![
3751 (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
3752 (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
3753 ]);
3754
3755 let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3756 .len(5)
3757 .add_buffer(Buffer::from_iter(vec![
3758 0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
3759 ]))
3760 .null_bit_buffer(Some(Buffer::from_iter(vec![
3761 true, false, true, false, true,
3762 ])))
3763 .child_data(vec![struct_a_array.into_data()])
3764 .build()
3765 .unwrap();
3766
3767 let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3768 let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
3769
3770 let batch1 =
3771 RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3772 .unwrap();
3773
3774 let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
3775 let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
3776
3777 let struct_a_array = StructArray::from(vec![
3778 (field_a, Arc::new(field_a_array) as ArrayRef),
3779 (field_b, Arc::new(field_b_array) as ArrayRef),
3780 ]);
3781
3782 let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3783 .len(2)
3784 .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
3785 .child_data(vec![struct_a_array.into_data()])
3786 .build()
3787 .unwrap();
3788
3789 let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3790 let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
3791
3792 let batch2 =
3793 RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3794 .unwrap();
3795
3796 let batches = &[batch1, batch2];
3797
3798 let expected = r#"
3801 +-------------------------------------------------------------------------------------------------------+
3802 | struct_b |
3803 +-------------------------------------------------------------------------------------------------------+
3804 | {list: [{leaf_a: 1, leaf_b: 1}]} |
3805 | {list: } |
3806 | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]} |
3807 | {list: } |
3808 | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]} |
3809 | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
3810 | {list: [{leaf_a: 10, leaf_b: }]} |
3811 +-------------------------------------------------------------------------------------------------------+
3812 "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
3813
3814 let actual = pretty_format_batches(batches).unwrap().to_string();
3815 assert_eq!(actual, expected);
3816
3817 let file = tempfile::tempfile().unwrap();
3819 let props = WriterProperties::builder()
3820 .set_max_row_group_size(6)
3821 .build();
3822
3823 let mut writer =
3824 ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
3825
3826 for batch in batches {
3827 writer.write(batch).unwrap();
3828 }
3829 writer.close().unwrap();
3830
3831 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
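        // 5 rows from batch1 + 2 rows from batch2 with a max row group size of 6
        // => row groups of 6 and 1 rows.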
3836 assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);
3837
3838 let batches = builder
3839 .with_batch_size(2)
3840 .build()
3841 .unwrap()
3842 .collect::<ArrowResult<Vec<_>>>()
3843 .unwrap();
3844
3845 assert_eq!(batches.len(), 4);
3846 let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3847 assert_eq!(&batch_counts, &[2, 2, 2, 1]);
3848
3849 let actual = pretty_format_batches(&batches).unwrap().to_string();
3850 assert_eq!(actual, expected);
3851 }
3852
3853 #[test]
3854 fn test_arrow_writer_metadata() {
3855 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3856 let file_schema = batch_schema.clone().with_metadata(
3857 vec![("foo".to_string(), "bar".to_string())]
3858 .into_iter()
3859 .collect(),
3860 );
3861
3862 let batch = RecordBatch::try_new(
3863 Arc::new(batch_schema),
3864 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3865 )
3866 .unwrap();
3867
3868 let mut buf = Vec::with_capacity(1024);
3869 let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
3870 writer.write(&batch).unwrap();
3871 writer.close().unwrap();
3872 }
3873
3874 #[test]
3875 fn test_arrow_writer_nullable() {
3876 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3877 let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
3878 let file_schema = Arc::new(file_schema);
3879
3880 let batch = RecordBatch::try_new(
3881 Arc::new(batch_schema),
3882 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3883 )
3884 .unwrap();
3885
3886 let mut buf = Vec::with_capacity(1024);
3887 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3888 writer.write(&batch).unwrap();
3889 writer.close().unwrap();
3890
3891 let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
3892 let back = read.next().unwrap().unwrap();
3893 assert_eq!(back.schema(), file_schema);
3894 assert_ne!(back.schema(), batch.schema());
3895 assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
3896 }
3897
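    // in_progress_size/in_progress_rows and memory_size should grow as batches are buffered
    // and reset to zero after an explicit flush.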
3898 #[test]
3899 fn in_progress_accounting() {
3900 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3902
3903 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3905
3906 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3908
3909 let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
3910
3911 assert_eq!(writer.in_progress_size(), 0);
3913 assert_eq!(writer.in_progress_rows(), 0);
3914 assert_eq!(writer.memory_size(), 0);
3915         assert_eq!(writer.bytes_written(), 4); // only the "PAR1" header has been written
3916         writer.write(&batch).unwrap();
3917
3918 let initial_size = writer.in_progress_size();
3920 assert!(initial_size > 0);
3921 assert_eq!(writer.in_progress_rows(), 5);
3922 let initial_memory = writer.memory_size();
3923 assert!(initial_memory > 0);
3924 assert!(
3926 initial_size <= initial_memory,
3927 "{initial_size} <= {initial_memory}"
3928 );
3929
3930 writer.write(&batch).unwrap();
3932 assert!(writer.in_progress_size() > initial_size);
3933 assert_eq!(writer.in_progress_rows(), 10);
3934 assert!(writer.memory_size() > initial_memory);
3935 assert!(
3936 writer.in_progress_size() <= writer.memory_size(),
3937 "in_progress_size {} <= memory_size {}",
3938 writer.in_progress_size(),
3939 writer.memory_size()
3940 );
3941
3942 let pre_flush_bytes_written = writer.bytes_written();
3944 writer.flush().unwrap();
3945 assert_eq!(writer.in_progress_size(), 0);
3946 assert_eq!(writer.memory_size(), 0);
3947 assert!(writer.bytes_written() > pre_flush_bytes_written);
3948
3949 writer.close().unwrap();
3950 }
3951
3952 #[test]
3953 fn test_writer_all_null() {
3954 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3955 let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
3956 let batch = RecordBatch::try_from_iter(vec![
3957 ("a", Arc::new(a) as ArrayRef),
3958 ("b", Arc::new(b) as ArrayRef),
3959 ])
3960 .unwrap();
3961
3962 let mut buf = Vec::with_capacity(1024);
3963 let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
3964 writer.write(&batch).unwrap();
3965 writer.close().unwrap();
3966
3967 let bytes = Bytes::from(buf);
3968 let options = ReadOptionsBuilder::new().with_page_index().build();
3969 let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
3970 let index = reader.metadata().offset_index().unwrap();
3971
3972 assert_eq!(index.len(), 1);
3973         assert_eq!(index[0].len(), 2); // two columns
3974         assert_eq!(index[0][0].page_locations().len(), 1); // one page per column
3975         assert_eq!(index[0][1].page_locations().len(), 1);
3976     }
3977
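    // With statistics disabled globally but enabled at page level for column "a", only
    // column "a" should get a column index and chunk statistics.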
3978 #[test]
3979 fn test_disabled_statistics_with_page() {
3980 let file_schema = Schema::new(vec![
3981 Field::new("a", DataType::Utf8, true),
3982 Field::new("b", DataType::Utf8, true),
3983 ]);
3984 let file_schema = Arc::new(file_schema);
3985
3986 let batch = RecordBatch::try_new(
3987 file_schema.clone(),
3988 vec![
3989 Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3990 Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3991 ],
3992 )
3993 .unwrap();
3994
3995 let props = WriterProperties::builder()
3996 .set_statistics_enabled(EnabledStatistics::None)
3997 .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
3998 .build();
3999
4000 let mut buf = Vec::with_capacity(1024);
4001 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4002 writer.write(&batch).unwrap();
4003
4004 let metadata = writer.close().unwrap();
4005 assert_eq!(metadata.num_row_groups(), 1);
4006 let row_group = metadata.row_group(0);
4007 assert_eq!(row_group.num_columns(), 2);
4008 assert!(row_group.column(0).offset_index_offset().is_some());
4010 assert!(row_group.column(0).column_index_offset().is_some());
4011 assert!(row_group.column(1).offset_index_offset().is_some());
4013 assert!(row_group.column(1).column_index_offset().is_none());
4014
4015 let options = ReadOptionsBuilder::new().with_page_index().build();
4016 let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4017
4018 let row_group = reader.get_row_group(0).unwrap();
4019 let a_col = row_group.metadata().column(0);
4020 let b_col = row_group.metadata().column(1);
4021
4022 if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4024 let min = byte_array_stats.min_opt().unwrap();
4025 let max = byte_array_stats.max_opt().unwrap();
4026
4027 assert_eq!(min.as_bytes(), b"a");
4028 assert_eq!(max.as_bytes(), b"d");
4029 } else {
4030 panic!("expecting Statistics::ByteArray");
4031 }
4032
4033 assert!(b_col.statistics().is_none());
4035
4036 let offset_index = reader.metadata().offset_index().unwrap();
4037         assert_eq!(offset_index.len(), 1); // one row group
4038         assert_eq!(offset_index[0].len(), 2); // two columns
4039
4040         let column_index = reader.metadata().column_index().unwrap();
4041         assert_eq!(column_index.len(), 1); // one row group
4042         assert_eq!(column_index[0].len(), 2); // two columns
4043
4044         let a_idx = &column_index[0][0];
4045 assert!(
4046 matches!(a_idx, ColumnIndexMetaData::BYTE_ARRAY(_)),
4047 "{a_idx:?}"
4048 );
4049 let b_idx = &column_index[0][1];
4050 assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
4051 }
4052
    #[test]
    fn test_disabled_statistics_with_chunk() {
        let file_schema = Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
        ]);
        let file_schema = Arc::new(file_schema);

        let batch = RecordBatch::try_new(
            file_schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
            ],
        )
        .unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
            .build();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
        writer.write(&batch).unwrap();

        let metadata = writer.close().unwrap();
        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        assert_eq!(row_group.num_columns(), 2);
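        // Chunk-level statistics never produce a column index, so neither column has one;
        // offset indexes are still written for both.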
        assert!(row_group.column(0).offset_index_offset().is_some());
        assert!(row_group.column(0).column_index_offset().is_none());
        assert!(row_group.column(1).offset_index_offset().is_some());
        assert!(row_group.column(1).column_index_offset().is_none());

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();

        let row_group = reader.get_row_group(0).unwrap();
        let a_col = row_group.metadata().column(0);
        let b_col = row_group.metadata().column(1);
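        // Chunk-level statistics for "a" are still recorded in the column chunk metadata.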
        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
            let min = byte_array_stats.min_opt().unwrap();
            let max = byte_array_stats.max_opt().unwrap();

            assert_eq!(min.as_bytes(), b"a");
            assert_eq!(max.as_bytes(), b"d");
        } else {
            panic!("expecting Statistics::ByteArray");
        }

        assert!(b_col.statistics().is_none());
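        // Without page-level statistics the column index is NONE for both columns.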
        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1);
        assert_eq!(column_index[0].len(), 2);

        let a_idx = &column_index[0][0];
        assert!(matches!(a_idx, ColumnIndexMetaData::NONE), "{a_idx:?}");
        let b_idx = &column_index[0][1];
        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
    }

    #[test]
    fn test_arrow_writer_skip_metadata() {
        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
        let file_schema = Arc::new(batch_schema.clone());

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();
        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);

        let mut buf = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let bytes = Bytes::from(buf);
        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
        assert_eq!(file_schema, *reader_builder.schema());
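        // With skip_arrow_metadata set, the serialized Arrow schema must not be embedded
        // in the file's key-value metadata.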
        if let Some(key_value_metadata) = reader_builder
            .metadata()
            .file_metadata()
            .key_value_metadata()
        {
            assert!(
                !key_value_metadata
                    .iter()
                    .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY)
            );
        }
    }

    #[test]
    fn test_arrow_writer_explicit_schema() {
        let batch_schema = Arc::new(Schema::new(vec![Field::new(
            "integers",
            DataType::Int32,
            true,
        )]));
        let parquet_schema = Type::group_type_builder("root")
            .with_fields(vec![Type::primitive_type_builder(
                "integers",
                crate::basic::Type::INT64,
            )
            .build()
            .unwrap()
            .into()])
            .build()
            .unwrap();
        let parquet_schema_descr = SchemaDescriptor::new(parquet_schema.into());

        let batch = RecordBatch::try_new(
            batch_schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let explicit_schema_options =
            ArrowWriterOptions::new().with_parquet_schema(parquet_schema_descr);
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new_with_options(
            &mut buf,
            batch_schema.clone(),
            explicit_schema_options,
        )
        .unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

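        // The explicit Parquet schema declares INT64, so without embedded Arrow metadata
        // the file reads back as Int64 and the written Int32 values are widened.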
        let bytes = Bytes::from(buf);
        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();

        let expected_schema = Arc::new(Schema::new(vec![Field::new(
            "integers",
            DataType::Int64,
            true,
        )]));
        assert_eq!(reader_builder.schema(), &expected_schema);

        let batches = reader_builder
            .build()
            .unwrap()
            .collect::<Result<Vec<_>, ArrowError>>()
            .unwrap();
        assert_eq!(batches.len(), 1);

        let expected_batch = RecordBatch::try_new(
            expected_schema.clone(),
            vec![Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();
        assert_eq!(batches[0], expected_batch);
    }

    #[test]
    fn mismatched_schemas() {
        let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
        let file_schema = Arc::new(Schema::new(vec![Field::new(
            "temperature",
            DataType::Float64,
            false,
        )]));

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();

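        // Writing a batch whose column name and type differ from the file schema must fail.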
        let err = writer.write(&batch).unwrap_err().to_string();
        assert_eq!(
            err,
            "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
        );
    }

    #[test]
    fn test_roundtrip_empty_schema() {
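        // A batch with no columns must carry an explicit row count.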
        let empty_batch = RecordBatch::try_new_with_options(
            Arc::new(Schema::empty()),
            vec![],
            &RecordBatchOptions::default().with_row_count(Some(0)),
        )
        .unwrap();

        let mut parquet_bytes: Vec<u8> = Vec::new();
        let mut writer =
            ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
        writer.write(&empty_batch).unwrap();
        writer.close().unwrap();

        let bytes = Bytes::from(parquet_bytes);
        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
        assert_eq!(reader.schema(), &empty_batch.schema());
        let batches: Vec<_> = reader
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();
        assert_eq!(batches.len(), 0);
    }

    #[test]
    fn test_page_stats_not_written_by_default() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let schema = Schema::new(vec![string_field]);
        let raw_string_values = vec!["Blart Versenwald III"];
        let string_values = StringArray::from(raw_string_values.clone());
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let file = roundtrip_opts(&batch, props);

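        // The first page header immediately follows the 4-byte `PAR1` magic at the start
        // of the file; it should carry no statistics by default.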
        let first_page = &file[4..];
        let mut prot = ThriftSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;

        assert!(stats.is_none());
    }

    #[test]
    fn test_page_stats_when_enabled() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let schema = Schema::new(vec![string_field]);
        let raw_string_values = vec!["Blart Versenwald III", "Andrew Lamb"];
        let string_values = StringArray::from(raw_string_values.clone());
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_write_page_header_statistics(true)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let file = roundtrip_opts(&batch, props);

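        // Read the first data page header (after the 4-byte `PAR1` magic) and check that
        // exact min/max statistics were written.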
        let first_page = &file[4..];
        let mut prot = ThriftSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;

        let stats = stats.unwrap();
        assert!(stats.is_max_value_exact.unwrap());
        assert!(stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
    }

    #[test]
    fn test_page_stats_truncation() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let binary_field = Field::new("b", DataType::Binary, false);
        let schema = Schema::new(vec![string_field, binary_field]);

        let raw_string_values = vec!["Blart Versenwald III"];
        let raw_binary_values = [b"Blart Versenwald III".to_vec()];
        let raw_binary_value_refs = raw_binary_values
            .iter()
            .map(|x| x.as_slice())
            .collect::<Vec<_>>();

        let string_values = StringArray::from(raw_string_values.clone());
        let binary_values = BinaryArray::from(raw_binary_value_refs);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(string_values), Arc::new(binary_values)],
        )
        .unwrap();

        let props = WriterProperties::builder()
            .set_statistics_truncate_length(Some(2))
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_write_page_header_statistics(true)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let file = roundtrip_opts(&batch, props);

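        // Inspect the first data page header (after the 4-byte `PAR1` magic): truncated
        // statistics must be flagged as inexact.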
        let first_page = &file[4..];
        let mut prot = ThriftSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;
        assert!(stats.is_some());
        let stats = stats.unwrap();
        assert!(!stats.is_max_value_exact.unwrap());
        assert!(!stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());

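        // `prot` is positioned just past the first page header, so skipping
        // `compressed_page_size` bytes of page data lands on the next page header.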
        let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..];
        let mut prot = ThriftSliceInputProtocol::new(second_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;
        assert!(stats.is_some());
        let stats = stats.unwrap();
        assert!(!stats.is_max_value_exact.unwrap());
        assert!(!stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
    }

    #[test]
    fn test_page_encoding_statistics_roundtrip() {
        let batch_schema = Schema::new(vec![Field::new(
            "int32",
            arrow_schema::DataType::Int32,
            false,
        )]);

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut file: File = tempfile::tempfile().unwrap();
        let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
        writer.write(&batch).unwrap();
        let file_metadata = writer.close().unwrap();

        assert_eq!(file_metadata.num_row_groups(), 1);
        assert_eq!(file_metadata.row_group(0).num_columns(), 1);
        assert!(
            file_metadata
                .row_group(0)
                .column(0)
                .page_encoding_stats()
                .is_some()
        );
        let chunk_page_stats = file_metadata
            .row_group(0)
            .column(0)
            .page_encoding_stats()
            .unwrap();

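        // Re-open the file and verify the page encoding stats read back from the column
        // chunk metadata match those reported by the writer.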
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rowgroup = reader.get_row_group(0).expect("row group missing");
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.page_encoding_stats().is_some());
        let file_page_stats = column.page_encoding_stats().unwrap();
        assert_eq!(chunk_page_stats, file_page_stats);
    }

    #[test]
    fn test_different_dict_page_size_limit() {
        let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
        let schema = Arc::new(Schema::new(vec![
            Field::new("col0", arrow_schema::DataType::Int64, false),
            Field::new("col1", arrow_schema::DataType::Int64, false),
        ]));
        let batch =
            arrow_array::RecordBatch::try_new(schema.clone(), vec![array.clone(), array]).unwrap();

        let props = WriterProperties::builder()
            .set_dictionary_page_size_limit(1024 * 1024)
            .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
            .build();
        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
        writer.write(&batch).unwrap();
        let data = Bytes::from(writer.into_inner().unwrap());

        let mut metadata = ParquetMetaDataReader::new();
        metadata.try_parse(&data).unwrap();
        let metadata = metadata.finish().unwrap();
        let col0_meta = metadata.row_group(0).column(0);
        let col1_meta = metadata.row_group(0).column(1);

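        // Read the first page of each column chunk (its dictionary page) and check its
        // size against the configured per-column limit.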
        let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
            let mut reader =
                SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
            let page = reader.get_next_page().unwrap().unwrap();
            match page {
                Page::DictionaryPage { buf, .. } => buf.len(),
                _ => panic!("expected DictionaryPage"),
            }
        };

        assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
        assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
    }
}