1use bytes::Bytes;
21use std::io::{Read, Write};
22use std::iter::Peekable;
23use std::slice::Iter;
24use std::sync::{Arc, Mutex};
25use std::vec::IntoIter;
26
27use arrow_array::cast::AsArray;
28use arrow_array::types::*;
29use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
30use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};
31
32use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
33
34use crate::arrow::ArrowSchemaConverter;
35use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
36use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
37use crate::column::page_encryption::PageEncryptor;
38use crate::column::writer::encoder::ColumnValueEncoder;
39use crate::column::writer::{
40 ColumnCloseResult, ColumnWriter, GenericColumnWriter, get_column_writer,
41};
42use crate::data_type::{ByteArray, FixedLenByteArray};
43#[cfg(feature = "encryption")]
44use crate::encryption::encrypt::FileEncryptor;
45use crate::errors::{ParquetError, Result};
46use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
47use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
48use crate::file::reader::{ChunkReader, Length};
49use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
50use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
51use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
52use levels::{ArrayLevels, calculate_array_levels};
53
54mod byte_array;
55mod levels;
56
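/// Encodes Arrow [`RecordBatch`]es into a Parquet file, buffering rows and
/// flushing a row group whenever `max_row_group_size` is reached.
///
/// A minimal usage sketch (assumes an in-memory `Vec<u8>` sink and an existing
/// `batch: RecordBatch`; error handling elided):
///
/// ```ignore
/// let mut writer = ArrowWriter::try_new(Vec::new(), batch.schema(), None)?;
/// writer.write(&batch)?; // may be called many times
/// let parquet_bytes = writer.into_inner()?; // flushes and returns the sink
/// ```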
pub struct ArrowWriter<W: Write> {
    /// Underlying Parquet writer
    writer: SerializedFileWriter<W>,

    /// The in-progress row group, if any
    in_progress: Option<ArrowRowGroupWriter>,

    /// A copy of the Arrow schema being written
    arrow_schema: SchemaRef,

    /// Creates the column writers for each new row group
    row_group_writer_factory: ArrowRowGroupWriterFactory,

    /// The maximum number of rows to buffer in a row group before flushing
    max_row_group_size: usize,
}
191
192impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
193 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194 let buffered_memory = self.in_progress_size();
195 f.debug_struct("ArrowWriter")
196 .field("writer", &self.writer)
197 .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
198 .field("in_progress_rows", &self.in_progress_rows())
199 .field("arrow_schema", &self.arrow_schema)
200 .field("max_row_group_size", &self.max_row_group_size)
201 .finish()
202 }
203}
204
205impl<W: Write + Send> ArrowWriter<W> {
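    /// Try to create a new Arrow writer that writes to `writer` with the Arrow
    /// schema `arrow_schema`, using default [`WriterProperties`] when `props` is `None`.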
206 pub fn try_new(
212 writer: W,
213 arrow_schema: SchemaRef,
214 props: Option<WriterProperties>,
215 ) -> Result<Self> {
216 let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
217 Self::try_new_with_options(writer, arrow_schema, options)
218 }
219
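    /// Try to create a new Arrow writer configured with [`ArrowWriterOptions`],
    /// which can supply writer properties, skip embedding the Arrow schema in the
    /// file metadata, override the schema root name, or provide a pre-converted
    /// Parquet [`SchemaDescriptor`].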
220 pub fn try_new_with_options(
226 writer: W,
227 arrow_schema: SchemaRef,
228 options: ArrowWriterOptions,
229 ) -> Result<Self> {
230 let mut props = options.properties;
231
232 let schema = if let Some(parquet_schema) = options.schema_descr {
233 parquet_schema.clone()
234 } else {
235 let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
236 if let Some(schema_root) = &options.schema_root {
237 converter = converter.schema_root(schema_root);
238 }
239
240 converter.convert(&arrow_schema)?
241 };
242
243 if !options.skip_arrow_metadata {
244 add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
246 }
247
248 let max_row_group_size = props.max_row_group_size();
249
250 let props_ptr = Arc::new(props);
251 let file_writer =
252 SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
253
254 let row_group_writer_factory =
255 ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());
256
257 Ok(Self {
258 writer: file_writer,
259 in_progress: None,
260 arrow_schema,
261 row_group_writer_factory,
262 max_row_group_size,
263 })
264 }
265
266 pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
268 self.writer.flushed_row_groups()
269 }
270
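    /// Returns an estimate of the bytes of memory currently used by the
    /// in-progress row group's column writers (0 if none is in progress); compare
    /// with [`Self::in_progress_size`], which estimates the encoded size instead.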
271 pub fn memory_size(&self) -> usize {
276 match &self.in_progress {
277 Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
278 None => 0,
279 }
280 }
281
282 pub fn in_progress_size(&self) -> usize {
289 match &self.in_progress {
290 Some(in_progress) => in_progress
291 .writers
292 .iter()
293 .map(|x| x.get_estimated_total_bytes())
294 .sum(),
295 None => 0,
296 }
297 }
298
299 pub fn in_progress_rows(&self) -> usize {
301 self.in_progress
302 .as_ref()
303 .map(|x| x.buffered_rows)
304 .unwrap_or_default()
305 }
306
307 pub fn bytes_written(&self) -> usize {
309 self.writer.bytes_written()
310 }
311
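    /// Buffers `batch` into the in-progress row group, starting a new row group if
    /// necessary. Batches are split so that a row group never exceeds
    /// `max_row_group_size` rows, and a full row group is flushed automatically.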
312 pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
320 if batch.num_rows() == 0 {
321 return Ok(());
322 }
323
324 let in_progress = match &mut self.in_progress {
325 Some(in_progress) => in_progress,
326 x => x.insert(
327 self.row_group_writer_factory
328 .create_row_group_writer(self.writer.flushed_row_groups().len())?,
329 ),
330 };
331
332 if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size {
334 let to_write = self.max_row_group_size - in_progress.buffered_rows;
335 let a = batch.slice(0, to_write);
336 let b = batch.slice(to_write, batch.num_rows() - to_write);
337 self.write(&a)?;
338 return self.write(&b);
339 }
340
341 in_progress.write(batch)?;
342
343 if in_progress.buffered_rows >= self.max_row_group_size {
344 self.flush()?
345 }
346 Ok(())
347 }
348
349 pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
354 self.writer.write_all(buf)
355 }
356
357 pub fn sync(&mut self) -> std::io::Result<()> {
359 self.writer.flush()
360 }
361
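    /// Flushes the in-progress row group, if any, to the underlying
    /// [`SerializedFileWriter`] as a new row group.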
362 pub fn flush(&mut self) -> Result<()> {
367 let in_progress = match self.in_progress.take() {
368 Some(in_progress) => in_progress,
369 None => return Ok(()),
370 };
371
372 let mut row_group_writer = self.writer.next_row_group()?;
373 for chunk in in_progress.close()? {
374 chunk.append_to_row_group(&mut row_group_writer)?;
375 }
376 row_group_writer.close()?;
377 Ok(())
378 }
379
380 pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
384 self.writer.append_key_value_metadata(kv_metadata)
385 }
386
387 pub fn inner(&self) -> &W {
389 self.writer.inner()
390 }
391
392 pub fn inner_mut(&mut self) -> &mut W {
401 self.writer.inner_mut()
402 }
403
404 pub fn into_inner(mut self) -> Result<W> {
406 self.flush()?;
407 self.writer.into_inner()
408 }
409
410 pub fn finish(&mut self) -> Result<ParquetMetaData> {
416 self.flush()?;
417 self.writer.finish()
418 }
419
420 pub fn close(mut self) -> Result<ParquetMetaData> {
422 self.finish()
423 }
424
425 #[deprecated(
427 since = "56.2.0",
428 note = "Use `ArrowRowGroupWriterFactory` instead, see `ArrowColumnWriter` for an example"
429 )]
430 pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
431 self.flush()?;
432 let in_progress = self
433 .row_group_writer_factory
434 .create_row_group_writer(self.writer.flushed_row_groups().len())?;
435 Ok(in_progress.writers)
436 }
437
438 #[deprecated(
440 since = "56.2.0",
441 note = "Use `SerializedFileWriter` directly instead, see `ArrowColumnWriter` for an example"
442 )]
443 pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
444 let mut row_group_writer = self.writer.next_row_group()?;
445 for chunk in chunks {
446 chunk.append_to_row_group(&mut row_group_writer)?;
447 }
448 row_group_writer.close()?;
449 Ok(())
450 }
451
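    /// Flushes any buffered data and returns the underlying [`SerializedFileWriter`]
    /// together with the [`ArrowRowGroupWriterFactory`], allowing further row groups
    /// to be encoded outside of this writer (for example, one column per thread).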
452 pub fn into_serialized_writer(
459 mut self,
460 ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
461 self.flush()?;
462 Ok((self.writer, self.row_group_writer_factory))
463 }
464}
465
466impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
467 fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
468 self.write(batch).map_err(|e| e.into())
469 }
470
471 fn close(self) -> std::result::Result<(), ArrowError> {
472 self.close()?;
473 Ok(())
474 }
475}
476
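/// Builder-style configuration for [`ArrowWriter::try_new_with_options`].
///
/// A hedged sketch of typical use (assumes `schema: SchemaRef` and a sink `w`;
/// error handling elided):
///
/// ```ignore
/// let options = ArrowWriterOptions::new()
///     .with_properties(WriterProperties::default())
///     .with_skip_arrow_metadata(true);
/// let writer = ArrowWriter::try_new_with_options(w, schema, options)?;
/// ```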
477#[derive(Debug, Clone, Default)]
481pub struct ArrowWriterOptions {
482 properties: WriterProperties,
483 skip_arrow_metadata: bool,
484 schema_root: Option<String>,
485 schema_descr: Option<SchemaDescriptor>,
486}
487
488impl ArrowWriterOptions {
489 pub fn new() -> Self {
491 Self::default()
492 }
493
494 pub fn with_properties(self, properties: WriterProperties) -> Self {
496 Self { properties, ..self }
497 }
498
499 pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
506 Self {
507 skip_arrow_metadata,
508 ..self
509 }
510 }
511
512 pub fn with_schema_root(self, schema_root: String) -> Self {
514 Self {
515 schema_root: Some(schema_root),
516 ..self
517 }
518 }
519
520 pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
526 Self {
527 schema_descr: Some(schema_descr),
528 ..self
529 }
530 }
531}
532
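/// The in-memory bytes of an encoded column chunk, stored as a list of [`Bytes`]
/// buffers. Implements [`Length`] and [`ChunkReader`] so the chunk can be copied
/// into a row group via `append_column`.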
533#[derive(Default)]
535struct ArrowColumnChunkData {
536 length: usize,
537 data: Vec<Bytes>,
538}
539
540impl Length for ArrowColumnChunkData {
541 fn len(&self) -> u64 {
542 self.length as _
543 }
544}
545
546impl ChunkReader for ArrowColumnChunkData {
547 type T = ArrowColumnChunkReader;
548
    fn get_read(&self, start: u64) -> Result<Self::T> {
        assert_eq!(start, 0);
        Ok(ArrowColumnChunkReader(
            self.data.clone().into_iter().peekable(),
        ))
    }
555
556 fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
557 unimplemented!()
558 }
559}
560
561struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
563
564impl Read for ArrowColumnChunkReader {
565 fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
566 let buffer = loop {
567 match self.0.peek_mut() {
568 Some(b) if b.is_empty() => {
569 self.0.next();
570 continue;
571 }
572 Some(b) => break b,
573 None => return Ok(0),
574 }
575 };
576
577 let len = buffer.len().min(out.len());
578 let b = buffer.split_to(len);
579 out[..len].copy_from_slice(&b);
580 Ok(len)
581 }
582}
583
584type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
589
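/// A [`PageWriter`] that serializes page headers and appends them, together with
/// the compressed page data, to a shared in-memory [`ArrowColumnChunkData`]
/// (encrypting both when the `encryption` feature is enabled and configured).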
590#[derive(Default)]
591struct ArrowPageWriter {
592 buffer: SharedColumnChunk,
593 #[cfg(feature = "encryption")]
594 page_encryptor: Option<PageEncryptor>,
595}
596
597impl ArrowPageWriter {
598 #[cfg(feature = "encryption")]
599 pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
600 self.page_encryptor = page_encryptor;
601 self
602 }
603
604 #[cfg(feature = "encryption")]
605 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
606 self.page_encryptor.as_mut()
607 }
608
609 #[cfg(not(feature = "encryption"))]
610 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
611 None
612 }
613}
614
615impl PageWriter for ArrowPageWriter {
616 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
617 let page = match self.page_encryptor_mut() {
618 Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
619 None => page,
620 };
621
622 let page_header = page.to_thrift_header()?;
623 let header = {
624 let mut header = Vec::with_capacity(1024);
625
626 match self.page_encryptor_mut() {
627 Some(page_encryptor) => {
628 page_encryptor.encrypt_page_header(&page_header, &mut header)?;
629 if page.compressed_page().is_data_page() {
630 page_encryptor.increment_page();
631 }
632 }
633 None => {
634 let mut protocol = ThriftCompactOutputProtocol::new(&mut header);
635 page_header.write_thrift(&mut protocol)?;
636 }
637 };
638
639 Bytes::from(header)
640 };
641
642 let mut buf = self.buffer.try_lock().unwrap();
643
644 let data = page.compressed_page().buffer().clone();
645 let compressed_size = data.len() + header.len();
646
647 let mut spec = PageWriteSpec::new();
648 spec.page_type = page.page_type();
649 spec.num_values = page.num_values();
650 spec.uncompressed_size = page.uncompressed_size() + header.len();
651 spec.offset = buf.length as u64;
652 spec.compressed_size = compressed_size;
653 spec.bytes_written = compressed_size as u64;
654
655 buf.length += compressed_size;
656 buf.data.push(header);
657 buf.data.push(data);
658
659 Ok(spec)
660 }
661
662 fn close(&mut self) -> Result<()> {
663 Ok(())
664 }
665}
666
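/// The data for a single leaf (primitive) column: the values array plus its
/// computed definition and repetition levels.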
667#[derive(Debug)]
669pub struct ArrowLeafColumn(ArrayLevels);
670
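/// Computes the [`ArrowLeafColumn`] leaves for `array` as it appears under
/// `field`; nested types such as structs, lists and maps produce one entry per
/// Parquet leaf column.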
671pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
676 let levels = calculate_array_levels(array, field)?;
677 Ok(levels.into_iter().map(ArrowLeafColumn).collect())
678}
679
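/// The encoded pages and close metadata for a completed column chunk, ready to be
/// appended to a row group with [`ArrowColumnChunk::append_to_row_group`].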
680pub struct ArrowColumnChunk {
682 data: ArrowColumnChunkData,
683 close: ColumnCloseResult,
684}
685
686impl std::fmt::Debug for ArrowColumnChunk {
687 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
688 f.debug_struct("ArrowColumnChunk")
689 .field("length", &self.data.length)
690 .finish_non_exhaustive()
691 }
692}
693
694impl ArrowColumnChunk {
695 pub fn append_to_row_group<W: Write + Send>(
697 self,
698 writer: &mut SerializedRowGroupWriter<'_, W>,
699 ) -> Result<()> {
700 writer.append_column(&self.data, self.close)
701 }
702}
703
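/// Encodes [`ArrowLeafColumn`]s into an in-memory [`ArrowColumnChunk`].
///
/// Together with [`ArrowRowGroupWriterFactory`] this allows columns to be encoded
/// independently (for example on separate threads), mirroring what [`ArrowWriter`]
/// does internally. A hedged, single-threaded sketch (assumes an existing
/// `file_writer: SerializedFileWriter<_>`, a matching `factory`, and a
/// `batch: RecordBatch`; error handling elided):
///
/// ```ignore
/// // One writer per Parquet leaf column, in schema order
/// let mut writers = factory.create_column_writers(0)?;
///
/// // Compute the leaf values and levels for every field in the batch
/// let schema = batch.schema();
/// let mut leaves = Vec::new();
/// for (field, column) in schema.fields().iter().zip(batch.columns()) {
///     leaves.extend(compute_leaves(field.as_ref(), column)?);
/// }
///
/// // Encode each leaf with its corresponding writer
/// for (writer, leaf) in writers.iter_mut().zip(&leaves) {
///     writer.write(leaf)?;
/// }
///
/// // Append the finished column chunks to a new row group
/// let mut row_group = file_writer.next_row_group()?;
/// for writer in writers {
///     writer.close()?.append_to_row_group(&mut row_group)?;
/// }
/// row_group.close()?;
/// ```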
704pub struct ArrowColumnWriter {
802 writer: ArrowColumnWriterImpl,
803 chunk: SharedColumnChunk,
804}
805
806impl std::fmt::Debug for ArrowColumnWriter {
807 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
808 f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
809 }
810}
811
812enum ArrowColumnWriterImpl {
813 ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
814 Column(ColumnWriter<'static>),
815}
816
817impl ArrowColumnWriter {
818 pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
820 match &mut self.writer {
821 ArrowColumnWriterImpl::Column(c) => {
822 write_leaf(c, &col.0)?;
823 }
824 ArrowColumnWriterImpl::ByteArray(c) => {
825 write_primitive(c, col.0.array().as_ref(), &col.0)?;
826 }
827 }
828 Ok(())
829 }
830
831 pub fn close(self) -> Result<ArrowColumnChunk> {
833 let close = match self.writer {
834 ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
835 ArrowColumnWriterImpl::Column(c) => c.close()?,
836 };
837 let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
838 let data = chunk.into_inner().unwrap();
839 Ok(ArrowColumnChunk { data, close })
840 }
841
842 pub fn memory_size(&self) -> usize {
853 match &self.writer {
854 ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
855 ArrowColumnWriterImpl::Column(c) => c.memory_size(),
856 }
857 }
858
859 pub fn get_estimated_total_bytes(&self) -> usize {
867 match &self.writer {
868 ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
869 ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
870 }
871 }
872}
873
874#[derive(Debug)]
881struct ArrowRowGroupWriter {
882 writers: Vec<ArrowColumnWriter>,
883 schema: SchemaRef,
884 buffered_rows: usize,
885}
886
887impl ArrowRowGroupWriter {
888 fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
889 Self {
890 writers,
891 schema: arrow.clone(),
892 buffered_rows: 0,
893 }
894 }
895
896 fn write(&mut self, batch: &RecordBatch) -> Result<()> {
897 self.buffered_rows += batch.num_rows();
898 let mut writers = self.writers.iter_mut();
899 for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
900 for leaf in compute_leaves(field.as_ref(), column)? {
901 writers.next().unwrap().write(&leaf)?
902 }
903 }
904 Ok(())
905 }
906
907 fn close(self) -> Result<Vec<ArrowColumnChunk>> {
908 self.writers
909 .into_iter()
910 .map(|writer| writer.close())
911 .collect()
912 }
913}
914
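/// Creates the [`ArrowColumnWriter`]s for each new row group, carrying the Parquet
/// schema, Arrow schema and writer properties (and, with the `encryption` feature,
/// the file encryptor) taken from a [`SerializedFileWriter`].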
915#[derive(Debug)]
920pub struct ArrowRowGroupWriterFactory {
921 schema: SchemaDescPtr,
922 arrow_schema: SchemaRef,
923 props: WriterPropertiesPtr,
924 #[cfg(feature = "encryption")]
925 file_encryptor: Option<Arc<FileEncryptor>>,
926}
927
928impl ArrowRowGroupWriterFactory {
929 pub fn new<W: Write + Send>(
931 file_writer: &SerializedFileWriter<W>,
932 arrow_schema: SchemaRef,
933 ) -> Self {
934 let schema = Arc::clone(file_writer.schema_descr_ptr());
935 let props = Arc::clone(file_writer.properties());
936 Self {
937 schema,
938 arrow_schema,
939 props,
940 #[cfg(feature = "encryption")]
941 file_encryptor: file_writer.file_encryptor(),
942 }
943 }
944
945 fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
946 let writers = self.create_column_writers(row_group_index)?;
947 Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
948 }
949
950 pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
952 let mut writers = Vec::with_capacity(self.arrow_schema.fields.len());
953 let mut leaves = self.schema.columns().iter();
954 let column_factory = self.column_writer_factory(row_group_index);
955 for field in &self.arrow_schema.fields {
956 column_factory.get_arrow_column_writer(
957 field.data_type(),
958 &self.props,
959 &mut leaves,
960 &mut writers,
961 )?;
962 }
963 Ok(writers)
964 }
965
966 #[cfg(feature = "encryption")]
967 fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
968 ArrowColumnWriterFactory::new()
969 .with_file_encryptor(row_group_idx, self.file_encryptor.clone())
970 }
971
972 #[cfg(not(feature = "encryption"))]
973 fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory {
974 ArrowColumnWriterFactory::new()
975 }
976}
977
978#[deprecated(since = "57.0.0", note = "Use `ArrowRowGroupWriterFactory` instead")]
980pub fn get_column_writers(
981 parquet: &SchemaDescriptor,
982 props: &WriterPropertiesPtr,
983 arrow: &SchemaRef,
984) -> Result<Vec<ArrowColumnWriter>> {
985 let mut writers = Vec::with_capacity(arrow.fields.len());
986 let mut leaves = parquet.columns().iter();
987 let column_factory = ArrowColumnWriterFactory::new();
988 for field in &arrow.fields {
989 column_factory.get_arrow_column_writer(
990 field.data_type(),
991 props,
992 &mut leaves,
993 &mut writers,
994 )?;
995 }
996 Ok(writers)
997}
998
999struct ArrowColumnWriterFactory {
1001 #[cfg(feature = "encryption")]
1002 row_group_index: usize,
1003 #[cfg(feature = "encryption")]
1004 file_encryptor: Option<Arc<FileEncryptor>>,
1005}
1006
1007impl ArrowColumnWriterFactory {
1008 pub fn new() -> Self {
1009 Self {
1010 #[cfg(feature = "encryption")]
1011 row_group_index: 0,
1012 #[cfg(feature = "encryption")]
1013 file_encryptor: None,
1014 }
1015 }
1016
1017 #[cfg(feature = "encryption")]
1018 pub fn with_file_encryptor(
1019 mut self,
1020 row_group_index: usize,
1021 file_encryptor: Option<Arc<FileEncryptor>>,
1022 ) -> Self {
1023 self.row_group_index = row_group_index;
1024 self.file_encryptor = file_encryptor;
1025 self
1026 }
1027
1028 #[cfg(feature = "encryption")]
1029 fn create_page_writer(
1030 &self,
1031 column_descriptor: &ColumnDescPtr,
1032 column_index: usize,
1033 ) -> Result<Box<ArrowPageWriter>> {
1034 let column_path = column_descriptor.path().string();
1035 let page_encryptor = PageEncryptor::create_if_column_encrypted(
1036 &self.file_encryptor,
1037 self.row_group_index,
1038 column_index,
1039 &column_path,
1040 )?;
1041 Ok(Box::new(
1042 ArrowPageWriter::default().with_encryptor(page_encryptor),
1043 ))
1044 }
1045
1046 #[cfg(not(feature = "encryption"))]
1047 fn create_page_writer(
1048 &self,
1049 _column_descriptor: &ColumnDescPtr,
1050 _column_index: usize,
1051 ) -> Result<Box<ArrowPageWriter>> {
1052 Ok(Box::<ArrowPageWriter>::default())
1053 }
1054
1055 fn get_arrow_column_writer(
1058 &self,
1059 data_type: &ArrowDataType,
1060 props: &WriterPropertiesPtr,
1061 leaves: &mut Iter<'_, ColumnDescPtr>,
1062 out: &mut Vec<ArrowColumnWriter>,
1063 ) -> Result<()> {
1064 let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1066 let page_writer = self.create_page_writer(desc, out.len())?;
1067 let chunk = page_writer.buffer.clone();
1068 let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
1069 Ok(ArrowColumnWriter {
1070 chunk,
1071 writer: ArrowColumnWriterImpl::Column(writer),
1072 })
1073 };
1074
1075 let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1077 let page_writer = self.create_page_writer(desc, out.len())?;
1078 let chunk = page_writer.buffer.clone();
1079 let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
1080 Ok(ArrowColumnWriter {
1081 chunk,
1082 writer: ArrowColumnWriterImpl::ByteArray(writer),
1083 })
1084 };
1085
1086 match data_type {
1087 _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
1088 ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
1089 out.push(col(leaves.next().unwrap())?)
1090 }
1091 ArrowDataType::LargeBinary
1092 | ArrowDataType::Binary
1093 | ArrowDataType::Utf8
1094 | ArrowDataType::LargeUtf8
1095 | ArrowDataType::BinaryView
1096 | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
1097 ArrowDataType::List(f)
1098 | ArrowDataType::LargeList(f)
1099 | ArrowDataType::FixedSizeList(f, _) => {
1100 self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
1101 }
1102 ArrowDataType::Struct(fields) => {
1103 for field in fields {
1104 self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
1105 }
1106 }
1107 ArrowDataType::Map(f, _) => match f.data_type() {
1108 ArrowDataType::Struct(f) => {
1109 self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
1110 self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
1111 }
1112 _ => unreachable!("invalid map type"),
1113 },
1114 ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1115 ArrowDataType::Utf8
1116 | ArrowDataType::LargeUtf8
1117 | ArrowDataType::Binary
1118 | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
1119 ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
1120 out.push(bytes(leaves.next().unwrap())?)
1121 }
1122 ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
1123 _ => out.push(col(leaves.next().unwrap())?),
1124 },
1125 _ => {
1126 return Err(ParquetError::NYI(format!(
1127 "Attempting to write an Arrow type {data_type} to parquet that is not yet implemented"
1128 )));
1129 }
1130 }
1131 Ok(())
1132 }
1133}
1134
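/// Writes a single leaf column through the appropriate typed [`ColumnWriter`],
/// casting Arrow representations (dates, unsigned integers, decimals, intervals,
/// float16, ...) into the physical type expected by Parquet.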
1135fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
1136 let column = levels.array().as_ref();
1137 let indices = levels.non_null_indices();
1138 match writer {
1139 ColumnWriter::Int32ColumnWriter(typed) => {
1140 match column.data_type() {
1141 ArrowDataType::Date64 => {
1142 let array = arrow_cast::cast(column, &ArrowDataType::Date32)?;
1144 let array = arrow_cast::cast(&array, &ArrowDataType::Int32)?;
1145
1146 let array = array.as_primitive::<Int32Type>();
1147 write_primitive(typed, array.values(), levels)
1148 }
1149 ArrowDataType::UInt32 => {
1150 let values = column.as_primitive::<UInt32Type>().values();
1151 let array = values.inner().typed_data::<i32>();
1154 write_primitive(typed, array, levels)
1155 }
1156 ArrowDataType::Decimal32(_, _) => {
1157 let array = column
1158 .as_primitive::<Decimal32Type>()
1159 .unary::<_, Int32Type>(|v| v);
1160 write_primitive(typed, array.values(), levels)
1161 }
1162 ArrowDataType::Decimal64(_, _) => {
1163 let array = column
1165 .as_primitive::<Decimal64Type>()
1166 .unary::<_, Int32Type>(|v| v as i32);
1167 write_primitive(typed, array.values(), levels)
1168 }
1169 ArrowDataType::Decimal128(_, _) => {
1170 let array = column
1172 .as_primitive::<Decimal128Type>()
1173 .unary::<_, Int32Type>(|v| v as i32);
1174 write_primitive(typed, array.values(), levels)
1175 }
1176 ArrowDataType::Decimal256(_, _) => {
1177 let array = column
1179 .as_primitive::<Decimal256Type>()
1180 .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1181 write_primitive(typed, array.values(), levels)
1182 }
1183 ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1184 ArrowDataType::Decimal32(_, _) => {
1185 let array = arrow_cast::cast(column, value_type)?;
1186 let array = array
1187 .as_primitive::<Decimal32Type>()
1188 .unary::<_, Int32Type>(|v| v);
1189 write_primitive(typed, array.values(), levels)
1190 }
1191 ArrowDataType::Decimal64(_, _) => {
1192 let array = arrow_cast::cast(column, value_type)?;
1193 let array = array
1194 .as_primitive::<Decimal64Type>()
1195 .unary::<_, Int32Type>(|v| v as i32);
1196 write_primitive(typed, array.values(), levels)
1197 }
1198 ArrowDataType::Decimal128(_, _) => {
1199 let array = arrow_cast::cast(column, value_type)?;
1200 let array = array
1201 .as_primitive::<Decimal128Type>()
1202 .unary::<_, Int32Type>(|v| v as i32);
1203 write_primitive(typed, array.values(), levels)
1204 }
1205 ArrowDataType::Decimal256(_, _) => {
1206 let array = arrow_cast::cast(column, value_type)?;
1207 let array = array
1208 .as_primitive::<Decimal256Type>()
1209 .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1210 write_primitive(typed, array.values(), levels)
1211 }
1212 _ => {
1213 let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
1214 let array = array.as_primitive::<Int32Type>();
1215 write_primitive(typed, array.values(), levels)
1216 }
1217 },
1218 _ => {
1219 let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
1220 let array = array.as_primitive::<Int32Type>();
1221 write_primitive(typed, array.values(), levels)
1222 }
1223 }
1224 }
1225 ColumnWriter::BoolColumnWriter(typed) => {
1226 let array = column.as_boolean();
1227 typed.write_batch(
1228 get_bool_array_slice(array, indices).as_slice(),
1229 levels.def_levels(),
1230 levels.rep_levels(),
1231 )
1232 }
1233 ColumnWriter::Int64ColumnWriter(typed) => {
1234 match column.data_type() {
1235 ArrowDataType::Date64 => {
1236 let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1237
1238 let array = array.as_primitive::<Int64Type>();
1239 write_primitive(typed, array.values(), levels)
1240 }
1241 ArrowDataType::Int64 => {
1242 let array = column.as_primitive::<Int64Type>();
1243 write_primitive(typed, array.values(), levels)
1244 }
1245 ArrowDataType::UInt64 => {
1246 let values = column.as_primitive::<UInt64Type>().values();
1247 let array = values.inner().typed_data::<i64>();
1250 write_primitive(typed, array, levels)
1251 }
1252 ArrowDataType::Decimal64(_, _) => {
1253 let array = column
1254 .as_primitive::<Decimal64Type>()
1255 .unary::<_, Int64Type>(|v| v);
1256 write_primitive(typed, array.values(), levels)
1257 }
1258 ArrowDataType::Decimal128(_, _) => {
1259 let array = column
1261 .as_primitive::<Decimal128Type>()
1262 .unary::<_, Int64Type>(|v| v as i64);
1263 write_primitive(typed, array.values(), levels)
1264 }
1265 ArrowDataType::Decimal256(_, _) => {
1266 let array = column
1268 .as_primitive::<Decimal256Type>()
1269 .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1270 write_primitive(typed, array.values(), levels)
1271 }
1272 ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1273 ArrowDataType::Decimal64(_, _) => {
1274 let array = arrow_cast::cast(column, value_type)?;
1275 let array = array
1276 .as_primitive::<Decimal64Type>()
1277 .unary::<_, Int64Type>(|v| v);
1278 write_primitive(typed, array.values(), levels)
1279 }
1280 ArrowDataType::Decimal128(_, _) => {
1281 let array = arrow_cast::cast(column, value_type)?;
1282 let array = array
1283 .as_primitive::<Decimal128Type>()
1284 .unary::<_, Int64Type>(|v| v as i64);
1285 write_primitive(typed, array.values(), levels)
1286 }
1287 ArrowDataType::Decimal256(_, _) => {
1288 let array = arrow_cast::cast(column, value_type)?;
1289 let array = array
1290 .as_primitive::<Decimal256Type>()
1291 .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1292 write_primitive(typed, array.values(), levels)
1293 }
1294 _ => {
1295 let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1296 let array = array.as_primitive::<Int64Type>();
1297 write_primitive(typed, array.values(), levels)
1298 }
1299 },
1300 _ => {
1301 let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1302 let array = array.as_primitive::<Int64Type>();
1303 write_primitive(typed, array.values(), levels)
1304 }
1305 }
1306 }
1307 ColumnWriter::Int96ColumnWriter(_typed) => {
1308 unreachable!("Currently unreachable because data type not supported")
1309 }
1310 ColumnWriter::FloatColumnWriter(typed) => {
1311 let array = column.as_primitive::<Float32Type>();
1312 write_primitive(typed, array.values(), levels)
1313 }
1314 ColumnWriter::DoubleColumnWriter(typed) => {
1315 let array = column.as_primitive::<Float64Type>();
1316 write_primitive(typed, array.values(), levels)
1317 }
1318 ColumnWriter::ByteArrayColumnWriter(_) => {
1319 unreachable!("should use ByteArrayWriter")
1320 }
1321 ColumnWriter::FixedLenByteArrayColumnWriter(typed) => {
1322 let bytes = match column.data_type() {
1323 ArrowDataType::Interval(interval_unit) => match interval_unit {
1324 IntervalUnit::YearMonth => {
1325 let array = column
1326 .as_any()
1327 .downcast_ref::<arrow_array::IntervalYearMonthArray>()
1328 .unwrap();
1329 get_interval_ym_array_slice(array, indices)
1330 }
1331 IntervalUnit::DayTime => {
1332 let array = column
1333 .as_any()
1334 .downcast_ref::<arrow_array::IntervalDayTimeArray>()
1335 .unwrap();
1336 get_interval_dt_array_slice(array, indices)
1337 }
1338 _ => {
1339 return Err(ParquetError::NYI(format!(
1340 "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
1341 )));
1342 }
1343 },
1344 ArrowDataType::FixedSizeBinary(_) => {
1345 let array = column
1346 .as_any()
1347 .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
1348 .unwrap();
1349 get_fsb_array_slice(array, indices)
1350 }
1351 ArrowDataType::Decimal32(_, _) => {
1352 let array = column.as_primitive::<Decimal32Type>();
1353 get_decimal_32_array_slice(array, indices)
1354 }
1355 ArrowDataType::Decimal64(_, _) => {
1356 let array = column.as_primitive::<Decimal64Type>();
1357 get_decimal_64_array_slice(array, indices)
1358 }
1359 ArrowDataType::Decimal128(_, _) => {
1360 let array = column.as_primitive::<Decimal128Type>();
1361 get_decimal_128_array_slice(array, indices)
1362 }
1363 ArrowDataType::Decimal256(_, _) => {
1364 let array = column
1365 .as_any()
1366 .downcast_ref::<arrow_array::Decimal256Array>()
1367 .unwrap();
1368 get_decimal_256_array_slice(array, indices)
1369 }
1370 ArrowDataType::Float16 => {
1371 let array = column.as_primitive::<Float16Type>();
1372 get_float_16_array_slice(array, indices)
1373 }
1374 _ => {
1375 return Err(ParquetError::NYI(
1376 "Attempting to write an Arrow type that is not yet implemented".to_string(),
1377 ));
1378 }
1379 };
1380 typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
1381 }
1382 }
1383}
1384
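/// Writes primitive `values` via `write_batch_internal`, passing the precomputed
/// non-null indices together with the definition and repetition levels.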
1385fn write_primitive<E: ColumnValueEncoder>(
1386 writer: &mut GenericColumnWriter<E>,
1387 values: &E::Values,
1388 levels: &ArrayLevels,
1389) -> Result<usize> {
1390 writer.write_batch_internal(
1391 values,
1392 Some(levels.non_null_indices()),
1393 levels.def_levels(),
1394 levels.rep_levels(),
1395 None,
1396 None,
1397 None,
1398 )
1399}
1400
1401fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
1402 let mut values = Vec::with_capacity(indices.len());
1403 for i in indices {
1404 values.push(array.value(*i))
1405 }
1406 values
1407}
1408
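// Parquet's INTERVAL logical type is a 12-byte little-endian (months, days,
// milliseconds) triple. A YearMonth value therefore stores its month count
// followed by eight zero bytes, while a DayTime value leaves the month bytes zero.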
1409fn get_interval_ym_array_slice(
1412 array: &arrow_array::IntervalYearMonthArray,
1413 indices: &[usize],
1414) -> Vec<FixedLenByteArray> {
1415 let mut values = Vec::with_capacity(indices.len());
1416 for i in indices {
1417 let mut value = array.value(*i).to_le_bytes().to_vec();
1418 let mut suffix = vec![0; 8];
1419 value.append(&mut suffix);
1420 values.push(FixedLenByteArray::from(ByteArray::from(value)))
1421 }
1422 values
1423}
1424
1425fn get_interval_dt_array_slice(
1428 array: &arrow_array::IntervalDayTimeArray,
1429 indices: &[usize],
1430) -> Vec<FixedLenByteArray> {
1431 let mut values = Vec::with_capacity(indices.len());
1432 for i in indices {
1433 let mut out = [0; 12];
1434 let value = array.value(*i);
1435 out[4..8].copy_from_slice(&value.days.to_le_bytes());
1436 out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1437 values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1438 }
1439 values
1440}
1441
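// The decimal helpers below emit each value as a big-endian byte string truncated
// to `decimal_length_from_precision(precision)` bytes, the minimal
// FIXED_LEN_BYTE_ARRAY width Parquet needs for the declared precision.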
1442fn get_decimal_32_array_slice(
1443 array: &arrow_array::Decimal32Array,
1444 indices: &[usize],
1445) -> Vec<FixedLenByteArray> {
1446 let mut values = Vec::with_capacity(indices.len());
1447 let size = decimal_length_from_precision(array.precision());
1448 for i in indices {
1449 let as_be_bytes = array.value(*i).to_be_bytes();
1450 let resized_value = as_be_bytes[(4 - size)..].to_vec();
1451 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1452 }
1453 values
1454}
1455
1456fn get_decimal_64_array_slice(
1457 array: &arrow_array::Decimal64Array,
1458 indices: &[usize],
1459) -> Vec<FixedLenByteArray> {
1460 let mut values = Vec::with_capacity(indices.len());
1461 let size = decimal_length_from_precision(array.precision());
1462 for i in indices {
1463 let as_be_bytes = array.value(*i).to_be_bytes();
1464 let resized_value = as_be_bytes[(8 - size)..].to_vec();
1465 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1466 }
1467 values
1468}
1469
1470fn get_decimal_128_array_slice(
1471 array: &arrow_array::Decimal128Array,
1472 indices: &[usize],
1473) -> Vec<FixedLenByteArray> {
1474 let mut values = Vec::with_capacity(indices.len());
1475 let size = decimal_length_from_precision(array.precision());
1476 for i in indices {
1477 let as_be_bytes = array.value(*i).to_be_bytes();
1478 let resized_value = as_be_bytes[(16 - size)..].to_vec();
1479 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1480 }
1481 values
1482}
1483
1484fn get_decimal_256_array_slice(
1485 array: &arrow_array::Decimal256Array,
1486 indices: &[usize],
1487) -> Vec<FixedLenByteArray> {
1488 let mut values = Vec::with_capacity(indices.len());
1489 let size = decimal_length_from_precision(array.precision());
1490 for i in indices {
1491 let as_be_bytes = array.value(*i).to_be_bytes();
1492 let resized_value = as_be_bytes[(32 - size)..].to_vec();
1493 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1494 }
1495 values
1496}
1497
1498fn get_float_16_array_slice(
1499 array: &arrow_array::Float16Array,
1500 indices: &[usize],
1501) -> Vec<FixedLenByteArray> {
1502 let mut values = Vec::with_capacity(indices.len());
1503 for i in indices {
1504 let value = array.value(*i).to_le_bytes().to_vec();
1505 values.push(FixedLenByteArray::from(ByteArray::from(value)));
1506 }
1507 values
1508}
1509
1510fn get_fsb_array_slice(
1511 array: &arrow_array::FixedSizeBinaryArray,
1512 indices: &[usize],
1513) -> Vec<FixedLenByteArray> {
1514 let mut values = Vec::with_capacity(indices.len());
1515 for i in indices {
1516 let value = array.value(*i).to_vec();
1517 values.push(FixedLenByteArray::from(ByteArray::from(value)))
1518 }
1519 values
1520}
1521
1522#[cfg(test)]
1523mod tests {
1524 use super::*;
1525
1526 use std::fs::File;
1527
1528 use crate::arrow::ARROW_SCHEMA_META_KEY;
1529 use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1530 use crate::column::page::{Page, PageReader};
1531 use crate::file::metadata::thrift::PageHeader;
1532 use crate::file::page_index::column_index::ColumnIndexMetaData;
1533 use crate::file::reader::SerializedPageReader;
1534 use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
1535 use crate::schema::types::{ColumnPath, Type};
1536 use arrow::datatypes::ToByteSlice;
1537 use arrow::datatypes::{DataType, Schema};
1538 use arrow::error::Result as ArrowResult;
1539 use arrow::util::data_gen::create_random_array;
1540 use arrow::util::pretty::pretty_format_batches;
1541 use arrow::{array::*, buffer::Buffer};
1542 use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer, i256};
1543 use arrow_schema::Fields;
1544 use half::f16;
1545 use num_traits::{FromPrimitive, ToPrimitive};
1546 use tempfile::tempfile;
1547
1548 use crate::basic::Encoding;
1549 use crate::data_type::AsBytes;
1550 use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
1551 use crate::file::properties::{
1552 BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1553 };
1554 use crate::file::serialized_reader::ReadOptionsBuilder;
1555 use crate::file::{
1556 reader::{FileReader, SerializedFileReader},
1557 statistics::Statistics,
1558 };
1559
1560 #[test]
1561 fn arrow_writer() {
1562 let schema = Schema::new(vec![
1564 Field::new("a", DataType::Int32, false),
1565 Field::new("b", DataType::Int32, true),
1566 ]);
1567
1568 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1570 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1571
1572 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1574
1575 roundtrip(batch, Some(SMALL_SIZE / 2));
1576 }
1577
1578 fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1579 let mut buffer = vec![];
1580
1581 let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
1582 writer.write(expected_batch).unwrap();
1583 writer.close().unwrap();
1584
1585 buffer
1586 }
1587
1588 fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1589 let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
1590 writer.write(expected_batch).unwrap();
1591 writer.into_inner().unwrap()
1592 }
1593
1594 #[test]
1595 fn roundtrip_bytes() {
1596 let schema = Arc::new(Schema::new(vec![
1598 Field::new("a", DataType::Int32, false),
1599 Field::new("b", DataType::Int32, true),
1600 ]));
1601
1602 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1604 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1605
1606 let expected_batch =
1608 RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
1609
1610 for buffer in [
1611 get_bytes_after_close(schema.clone(), &expected_batch),
1612 get_bytes_by_into_inner(schema, &expected_batch),
1613 ] {
1614 let cursor = Bytes::from(buffer);
1615 let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
1616
1617 let actual_batch = record_batch_reader
1618 .next()
1619 .expect("No batch found")
1620 .expect("Unable to get batch");
1621
1622 assert_eq!(expected_batch.schema(), actual_batch.schema());
1623 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1624 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1625 for i in 0..expected_batch.num_columns() {
1626 let expected_data = expected_batch.column(i).to_data();
1627 let actual_data = actual_batch.column(i).to_data();
1628
1629 assert_eq!(expected_data, actual_data);
1630 }
1631 }
1632 }
1633
1634 #[test]
1635 fn arrow_writer_non_null() {
1636 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1638
1639 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1641
1642 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1644
1645 roundtrip(batch, Some(SMALL_SIZE / 2));
1646 }
1647
1648 #[test]
1649 fn arrow_writer_list() {
1650 let schema = Schema::new(vec![Field::new(
1652 "a",
1653 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1654 true,
1655 )]);
1656
1657 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1659
1660 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1663
1664 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1666 DataType::Int32,
1667 false,
1668 ))))
1669 .len(5)
1670 .add_buffer(a_value_offsets)
1671 .add_child_data(a_values.into_data())
1672 .null_bit_buffer(Some(Buffer::from([0b00011011])))
1673 .build()
1674 .unwrap();
1675 let a = ListArray::from(a_list_data);
1676
1677 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1679
1680 assert_eq!(batch.column(0).null_count(), 1);
1681
1682 roundtrip(batch, None);
1685 }
1686
1687 #[test]
1688 fn arrow_writer_list_non_null() {
1689 let schema = Schema::new(vec![Field::new(
1691 "a",
1692 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1693 false,
1694 )]);
1695
1696 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1698
1699 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1702
1703 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1705 DataType::Int32,
1706 false,
1707 ))))
1708 .len(5)
1709 .add_buffer(a_value_offsets)
1710 .add_child_data(a_values.into_data())
1711 .build()
1712 .unwrap();
1713 let a = ListArray::from(a_list_data);
1714
1715 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1717
1718 assert_eq!(batch.column(0).null_count(), 0);
1721
1722 roundtrip(batch, None);
1723 }
1724
1725 #[test]
1726 fn arrow_writer_binary() {
1727 let string_field = Field::new("a", DataType::Utf8, false);
1728 let binary_field = Field::new("b", DataType::Binary, false);
1729 let schema = Schema::new(vec![string_field, binary_field]);
1730
1731 let raw_string_values = vec!["foo", "bar", "baz", "quux"];
1732 let raw_binary_values = [
1733 b"foo".to_vec(),
1734 b"bar".to_vec(),
1735 b"baz".to_vec(),
1736 b"quux".to_vec(),
1737 ];
1738 let raw_binary_value_refs = raw_binary_values
1739 .iter()
1740 .map(|x| x.as_slice())
1741 .collect::<Vec<_>>();
1742
1743 let string_values = StringArray::from(raw_string_values.clone());
1744 let binary_values = BinaryArray::from(raw_binary_value_refs);
1745 let batch = RecordBatch::try_new(
1746 Arc::new(schema),
1747 vec![Arc::new(string_values), Arc::new(binary_values)],
1748 )
1749 .unwrap();
1750
1751 roundtrip(batch, Some(SMALL_SIZE / 2));
1752 }
1753
1754 #[test]
1755 fn arrow_writer_binary_view() {
1756 let string_field = Field::new("a", DataType::Utf8View, false);
1757 let binary_field = Field::new("b", DataType::BinaryView, false);
1758 let nullable_string_field = Field::new("a", DataType::Utf8View, true);
1759 let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
1760
1761 let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
1762 let raw_binary_values = vec![
1763 b"foo".to_vec(),
1764 b"bar".to_vec(),
1765 b"large payload over 12 bytes".to_vec(),
1766 b"lulu".to_vec(),
1767 ];
1768 let nullable_string_values =
1769 vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
1770
1771 let string_view_values = StringViewArray::from(raw_string_values);
1772 let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
1773 let nullable_string_view_values = StringViewArray::from(nullable_string_values);
1774 let batch = RecordBatch::try_new(
1775 Arc::new(schema),
1776 vec![
1777 Arc::new(string_view_values),
1778 Arc::new(binary_view_values),
1779 Arc::new(nullable_string_view_values),
1780 ],
1781 )
1782 .unwrap();
1783
1784 roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1785 roundtrip(batch, None);
1786 }
1787
1788 fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
1789 let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
1790 let schema = Schema::new(vec![decimal_field]);
1791
1792 let decimal_values = vec![10_000, 50_000, 0, -100]
1793 .into_iter()
1794 .map(Some)
1795 .collect::<Decimal128Array>()
1796 .with_precision_and_scale(precision, scale)
1797 .unwrap();
1798
1799 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
1800 }
1801
1802 #[test]
1803 fn arrow_writer_decimal() {
1804 let batch_int32_decimal = get_decimal_batch(5, 2);
1806 roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
1807 let batch_int64_decimal = get_decimal_batch(12, 2);
1809 roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
1810 let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
1812 roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
1813 }
1814
1815 #[test]
1816 fn arrow_writer_complex() {
1817 let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1819 let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1820 let struct_field_g = Arc::new(Field::new_list(
1821 "g",
1822 Field::new_list_field(DataType::Int16, true),
1823 false,
1824 ));
1825 let struct_field_h = Arc::new(Field::new_list(
1826 "h",
1827 Field::new_list_field(DataType::Int16, false),
1828 true,
1829 ));
1830 let struct_field_e = Arc::new(Field::new_struct(
1831 "e",
1832 vec![
1833 struct_field_f.clone(),
1834 struct_field_g.clone(),
1835 struct_field_h.clone(),
1836 ],
1837 false,
1838 ));
1839 let schema = Schema::new(vec![
1840 Field::new("a", DataType::Int32, false),
1841 Field::new("b", DataType::Int32, true),
1842 Field::new_struct(
1843 "c",
1844 vec![struct_field_d.clone(), struct_field_e.clone()],
1845 false,
1846 ),
1847 ]);
1848
1849 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1851 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1852 let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1853 let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1854
1855 let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1856
1857 let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1860
1861 let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1863 .len(5)
1864 .add_buffer(g_value_offsets.clone())
1865 .add_child_data(g_value.to_data())
1866 .build()
1867 .unwrap();
1868 let g = ListArray::from(g_list_data);
1869 let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
1871 .len(5)
1872 .add_buffer(g_value_offsets)
1873 .add_child_data(g_value.to_data())
1874 .null_bit_buffer(Some(Buffer::from([0b00011011])))
1875 .build()
1876 .unwrap();
1877 let h = ListArray::from(h_list_data);
1878
1879 let e = StructArray::from(vec![
1880 (struct_field_f, Arc::new(f) as ArrayRef),
1881 (struct_field_g, Arc::new(g) as ArrayRef),
1882 (struct_field_h, Arc::new(h) as ArrayRef),
1883 ]);
1884
1885 let c = StructArray::from(vec![
1886 (struct_field_d, Arc::new(d) as ArrayRef),
1887 (struct_field_e, Arc::new(e) as ArrayRef),
1888 ]);
1889
1890 let batch = RecordBatch::try_new(
1892 Arc::new(schema),
1893 vec![Arc::new(a), Arc::new(b), Arc::new(c)],
1894 )
1895 .unwrap();
1896
1897 roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1898 roundtrip(batch, Some(SMALL_SIZE / 3));
1899 }
1900
1901 #[test]
1902 fn arrow_writer_complex_mixed() {
1903 let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
1908 let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
1909 let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
1910 let schema = Schema::new(vec![Field::new(
1911 "some_nested_object",
1912 DataType::Struct(Fields::from(vec![
1913 offset_field.clone(),
1914 partition_field.clone(),
1915 topic_field.clone(),
1916 ])),
1917 false,
1918 )]);
1919
1920 let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1922 let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1923 let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
1924
1925 let some_nested_object = StructArray::from(vec![
1926 (offset_field, Arc::new(offset) as ArrayRef),
1927 (partition_field, Arc::new(partition) as ArrayRef),
1928 (topic_field, Arc::new(topic) as ArrayRef),
1929 ]);
1930
1931 let batch =
1933 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1934
1935 roundtrip(batch, Some(SMALL_SIZE / 2));
1936 }
1937
1938 #[test]
1939 fn arrow_writer_map() {
1940 let json_content = r#"
1942 {"stocks":{"long": "$AAA", "short": "$BBB"}}
1943 {"stocks":{"long": null, "long": "$CCC", "short": null}}
1944 {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1945 "#;
1946 let entries_struct_type = DataType::Struct(Fields::from(vec![
1947 Field::new("key", DataType::Utf8, false),
1948 Field::new("value", DataType::Utf8, true),
1949 ]));
1950 let stocks_field = Field::new(
1951 "stocks",
1952 DataType::Map(
1953 Arc::new(Field::new("entries", entries_struct_type, false)),
1954 false,
1955 ),
1956 true,
1957 );
1958 let schema = Arc::new(Schema::new(vec![stocks_field]));
1959 let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1960 let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1961
1962 let batch = reader.next().unwrap().unwrap();
1963 roundtrip(batch, None);
1964 }
1965
1966 #[test]
1967 fn arrow_writer_2_level_struct() {
1968 let field_c = Field::new("c", DataType::Int32, true);
1970 let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
1971 let type_a = DataType::Struct(vec![field_b.clone()].into());
1972 let field_a = Field::new("a", type_a, true);
1973 let schema = Schema::new(vec![field_a.clone()]);
1974
1975 let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
1977 let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
1978 .len(6)
1979 .null_bit_buffer(Some(Buffer::from([0b00100111])))
1980 .add_child_data(c.into_data())
1981 .build()
1982 .unwrap();
1983 let b = StructArray::from(b_data);
1984 let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
1985 .len(6)
1986 .null_bit_buffer(Some(Buffer::from([0b00101111])))
1987 .add_child_data(b.into_data())
1988 .build()
1989 .unwrap();
1990 let a = StructArray::from(a_data);
1991
1992 assert_eq!(a.null_count(), 1);
1993 assert_eq!(a.column(0).null_count(), 2);
1994
1995 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1997
1998 roundtrip(batch, Some(SMALL_SIZE / 2));
1999 }
2000
2001 #[test]
2002 fn arrow_writer_2_level_struct_non_null() {
2003 let field_c = Field::new("c", DataType::Int32, false);
2005 let type_b = DataType::Struct(vec![field_c].into());
2006 let field_b = Field::new("b", type_b.clone(), false);
2007 let type_a = DataType::Struct(vec![field_b].into());
2008 let field_a = Field::new("a", type_a.clone(), false);
2009 let schema = Schema::new(vec![field_a]);
2010
2011 let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2013 let b_data = ArrayDataBuilder::new(type_b)
2014 .len(6)
2015 .add_child_data(c.into_data())
2016 .build()
2017 .unwrap();
2018 let b = StructArray::from(b_data);
2019 let a_data = ArrayDataBuilder::new(type_a)
2020 .len(6)
2021 .add_child_data(b.into_data())
2022 .build()
2023 .unwrap();
2024 let a = StructArray::from(a_data);
2025
2026 assert_eq!(a.null_count(), 0);
2027 assert_eq!(a.column(0).null_count(), 0);
2028
2029 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2031
2032 roundtrip(batch, Some(SMALL_SIZE / 2));
2033 }
2034
2035 #[test]
2036 fn arrow_writer_2_level_struct_mixed_null() {
2037 let field_c = Field::new("c", DataType::Int32, false);
2039 let type_b = DataType::Struct(vec![field_c].into());
2040 let field_b = Field::new("b", type_b.clone(), true);
2041 let type_a = DataType::Struct(vec![field_b].into());
2042 let field_a = Field::new("a", type_a.clone(), false);
2043 let schema = Schema::new(vec![field_a]);
2044
2045 let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2047 let b_data = ArrayDataBuilder::new(type_b)
2048 .len(6)
2049 .null_bit_buffer(Some(Buffer::from([0b00100111])))
2050 .add_child_data(c.into_data())
2051 .build()
2052 .unwrap();
2053 let b = StructArray::from(b_data);
2054 let a_data = ArrayDataBuilder::new(type_a)
2056 .len(6)
2057 .add_child_data(b.into_data())
2058 .build()
2059 .unwrap();
2060 let a = StructArray::from(a_data);
2061
2062 assert_eq!(a.null_count(), 0);
2063 assert_eq!(a.column(0).null_count(), 2);
2064
2065 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2067
2068 roundtrip(batch, Some(SMALL_SIZE / 2));
2069 }
2070
2071 #[test]
2072 fn arrow_writer_2_level_struct_mixed_null_2() {
2073 let field_c = Field::new("c", DataType::Int32, false);
2075 let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
2076 let field_e = Field::new(
2077 "e",
2078 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2079 false,
2080 );
2081
2082 let field_b = Field::new(
2083 "b",
2084 DataType::Struct(vec![field_c, field_d, field_e].into()),
2085 false,
2086 );
2087 let type_a = DataType::Struct(vec![field_b.clone()].into());
2088 let field_a = Field::new("a", type_a, true);
2089 let schema = Schema::new(vec![field_a.clone()]);
2090
2091 let c = Int32Array::from_iter_values(0..6);
2093 let d = FixedSizeBinaryArray::try_from_iter(
2094 ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
2095 )
2096 .expect("four byte values");
2097 let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
2098 let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2099 .len(6)
2100 .add_child_data(c.into_data())
2101 .add_child_data(d.into_data())
2102 .add_child_data(e.into_data())
2103 .build()
2104 .unwrap();
2105 let b = StructArray::from(b_data);
2106 let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2107 .len(6)
2108 .null_bit_buffer(Some(Buffer::from([0b00100101])))
2109 .add_child_data(b.into_data())
2110 .build()
2111 .unwrap();
2112 let a = StructArray::from(a_data);
2113
2114 assert_eq!(a.null_count(), 3);
2115 assert_eq!(a.column(0).null_count(), 0);
2116
2117 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2119
2120 roundtrip(batch, Some(SMALL_SIZE / 2));
2121 }
2122
2123 #[test]
2124 fn test_fixed_size_binary_in_dict() {
2125 fn test_fixed_size_binary_in_dict_inner<K>()
2126 where
2127 K: ArrowDictionaryKeyType,
2128 K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
2129 <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
2130 {
2131 let field = Field::new(
2132 "a",
2133 DataType::Dictionary(
2134 Box::new(K::DATA_TYPE),
2135 Box::new(DataType::FixedSizeBinary(4)),
2136 ),
2137 false,
2138 );
2139 let schema = Schema::new(vec![field]);
2140
2141 let keys: Vec<K::Native> = vec![
2142 K::Native::try_from(0u8).unwrap(),
2143 K::Native::try_from(0u8).unwrap(),
2144 K::Native::try_from(1u8).unwrap(),
2145 ];
2146 let keys = PrimitiveArray::<K>::from_iter_values(keys);
2147 let values = FixedSizeBinaryArray::try_from_iter(
2148 vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
2149 )
2150 .unwrap();
2151
2152 let data = DictionaryArray::<K>::new(keys, Arc::new(values));
2153 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
2154 roundtrip(batch, None);
2155 }
2156
2157 test_fixed_size_binary_in_dict_inner::<UInt8Type>();
2158 test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2159 test_fixed_size_binary_in_dict_inner::<UInt32Type>();
2160 test_fixed_size_binary_in_dict_inner::<UInt64Type>();
2161 test_fixed_size_binary_in_dict_inner::<Int8Type>();
2162 test_fixed_size_binary_in_dict_inner::<Int16Type>();
2163 test_fixed_size_binary_in_dict_inner::<Int32Type>();
2164 test_fixed_size_binary_in_dict_inner::<Int64Type>();
2165 }
2166
2167 #[test]
2168 fn test_empty_dict() {
2169 let struct_fields = Fields::from(vec![Field::new(
2170 "dict",
2171 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2172 false,
2173 )]);
2174
2175 let schema = Schema::new(vec![Field::new_struct(
2176 "struct",
2177 struct_fields.clone(),
2178 true,
2179 )]);
2180 let dictionary = Arc::new(DictionaryArray::new(
2181 Int32Array::new_null(5),
2182 Arc::new(StringArray::new_null(0)),
2183 ));
2184
2185 let s = StructArray::new(
2186 struct_fields,
2187 vec![dictionary],
2188 Some(NullBuffer::new_null(5)),
2189 );
2190
2191 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
2192 roundtrip(batch, None);
2193 }

    #[test]
2195 fn arrow_writer_page_size() {
2196 let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
2197
2198 let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
2199
2200 for i in 0..10 {
2202 let value = i
2203 .to_string()
2204 .repeat(10)
2205 .chars()
2206 .take(10)
2207 .collect::<String>();
2208
2209 builder.append_value(value);
2210 }
2211
2212 let array = Arc::new(builder.finish());
2213
2214 let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
2215
2216 let file = tempfile::tempfile().unwrap();
2217
2218 let props = WriterProperties::builder()
2220 .set_data_page_size_limit(1)
2221 .set_dictionary_page_size_limit(1)
2222 .set_write_batch_size(1)
2223 .build();
2224
2225 let mut writer =
2226 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
2227 .expect("Unable to write file");
2228 writer.write(&batch).unwrap();
2229 writer.close().unwrap();
2230
2231 let options = ReadOptionsBuilder::new().with_page_index().build();
2232 let reader =
2233 SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();
2234
2235 let column = reader.metadata().row_group(0).columns();
2236
2237 assert_eq!(column.len(), 1);
2238
2239 assert!(
2242 column[0].dictionary_page_offset().is_some(),
2243 "Expected a dictionary page"
2244 );
2245
2246 assert!(reader.metadata().offset_index().is_some());
2247 let offset_indexes = &reader.metadata().offset_index().unwrap()[0];
2248
2249 let page_locations = offset_indexes[0].page_locations.clone();
2250
2251 assert_eq!(
2254 page_locations.len(),
2255 10,
2256 "Expected 10 pages but got {page_locations:#?}"
2257 );
2258 }
2259
2260 #[test]
2261 fn arrow_writer_float_nans() {
2262 let f16_field = Field::new("a", DataType::Float16, false);
2263 let f32_field = Field::new("b", DataType::Float32, false);
2264 let f64_field = Field::new("c", DataType::Float64, false);
2265 let schema = Schema::new(vec![f16_field, f32_field, f64_field]);
2266
2267 let f16_values = (0..MEDIUM_SIZE)
2268 .map(|i| {
2269 Some(if i % 2 == 0 {
2270 f16::NAN
2271 } else {
2272 f16::from_f32(i as f32)
2273 })
2274 })
2275 .collect::<Float16Array>();
2276
2277 let f32_values = (0..MEDIUM_SIZE)
2278 .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
2279 .collect::<Float32Array>();
2280
2281 let f64_values = (0..MEDIUM_SIZE)
2282 .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
2283 .collect::<Float64Array>();
2284
2285 let batch = RecordBatch::try_new(
2286 Arc::new(schema),
2287 vec![
2288 Arc::new(f16_values),
2289 Arc::new(f32_values),
2290 Arc::new(f64_values),
2291 ],
2292 )
2293 .unwrap();
2294
2295 roundtrip(batch, None);
2296 }
2297
2298 const SMALL_SIZE: usize = 7;
2299 const MEDIUM_SIZE: usize = 63;
2300
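/// Writes `expected_batch` with both Parquet writer versions (1.0 and 2.0),
/// reads each file back and validates it, returning the encoded files.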
2301 fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<Bytes> {
2302 let mut files = vec![];
2303 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2304 let mut props = WriterProperties::builder().set_writer_version(version);
2305
2306 if let Some(size) = max_row_group_size {
2307 props = props.set_max_row_group_size(size)
2308 }
2309
2310 let props = props.build();
2311 files.push(roundtrip_opts(&expected_batch, props))
2312 }
2313 files
2314 }
2315
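/// Writes `expected_batch` with the given properties, reads it back, checks the
/// schema and shape, and runs `validate` on each (expected, actual) column pair.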
2316 fn roundtrip_opts_with_array_validation<F>(
2320 expected_batch: &RecordBatch,
2321 props: WriterProperties,
2322 validate: F,
2323 ) -> Bytes
2324 where
2325 F: Fn(&ArrayData, &ArrayData),
2326 {
2327 let mut file = vec![];
2328
2329 let mut writer = ArrowWriter::try_new(&mut file, expected_batch.schema(), Some(props))
2330 .expect("Unable to write file");
2331 writer.write(expected_batch).unwrap();
2332 writer.close().unwrap();
2333
2334 let file = Bytes::from(file);
2335 let mut record_batch_reader =
2336 ParquetRecordBatchReader::try_new(file.clone(), 1024).unwrap();
2337
2338 let actual_batch = record_batch_reader
2339 .next()
2340 .expect("No batch found")
2341 .expect("Unable to get batch");
2342
2343 assert_eq!(expected_batch.schema(), actual_batch.schema());
2344 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2345 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2346 for i in 0..expected_batch.num_columns() {
2347 let expected_data = expected_batch.column(i).to_data();
2348 let actual_data = actual_batch.column(i).to_data();
2349 validate(&expected_data, &actual_data);
2350 }
2351
2352 file
2353 }
2354
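/// Roundtrip with full `ArrayData` validation and exact equality of each column.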
2355 fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> Bytes {
2356 roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
2357 a.validate_full().expect("valid expected data");
2358 b.validate_full().expect("valid actual data");
2359 assert_eq!(a, b)
2360 })
2361 }
2362
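/// Options for the single-column roundtrip helpers below.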
2363 struct RoundTripOptions {
2364 values: ArrayRef,
2365 schema: SchemaRef,
2366 bloom_filter: bool,
2367 bloom_filter_position: BloomFilterPosition,
2368 }
2369
2370 impl RoundTripOptions {
2371 fn new(values: ArrayRef, nullable: bool) -> Self {
2372 let data_type = values.data_type().clone();
2373 let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
2374 Self {
2375 values,
2376 schema: Arc::new(schema),
2377 bloom_filter: false,
2378 bloom_filter_position: BloomFilterPosition::AfterRowGroup,
2379 }
2380 }
2381 }
2382
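/// Roundtrips a single column named "col" with the given nullability.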
2383 fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<Bytes> {
2384 one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
2385 }
2386
2387 fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<Bytes> {
2388 let mut options = RoundTripOptions::new(values, false);
2389 options.schema = schema;
2390 one_column_roundtrip_with_options(options)
2391 }
2392
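/// Roundtrips a single column across a matrix of dictionary settings, encodings,
/// writer versions, and row group sizes appropriate for the value type.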
2393 fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<Bytes> {
2394 let RoundTripOptions {
2395 values,
2396 schema,
2397 bloom_filter,
2398 bloom_filter_position,
2399 } = options;
2400
2401 let encodings = match values.data_type() {
2402 DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
2403 vec![
2404 Encoding::PLAIN,
2405 Encoding::DELTA_BYTE_ARRAY,
2406 Encoding::DELTA_LENGTH_BYTE_ARRAY,
2407 ]
2408 }
2409 DataType::Int64
2410 | DataType::Int32
2411 | DataType::Int16
2412 | DataType::Int8
2413 | DataType::UInt64
2414 | DataType::UInt32
2415 | DataType::UInt16
2416 | DataType::UInt8 => vec![
2417 Encoding::PLAIN,
2418 Encoding::DELTA_BINARY_PACKED,
2419 Encoding::BYTE_STREAM_SPLIT,
2420 ],
2421 DataType::Float32 | DataType::Float64 => {
2422 vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
2423 }
2424 _ => vec![Encoding::PLAIN],
2425 };
2426
2427 let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2428
2429 let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2430
2431 let mut files = vec![];
2432 for dictionary_size in [0, 1, 1024] {
2433 for encoding in &encodings {
2434 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2435 for row_group_size in row_group_sizes {
2436 let props = WriterProperties::builder()
2437 .set_writer_version(version)
2438 .set_max_row_group_size(row_group_size)
2439 .set_dictionary_enabled(dictionary_size != 0)
2440 .set_dictionary_page_size_limit(dictionary_size.max(1))
2441 .set_encoding(*encoding)
2442 .set_bloom_filter_enabled(bloom_filter)
2443 .set_bloom_filter_position(bloom_filter_position)
2444 .build();
2445
2446 files.push(roundtrip_opts(&expected_batch, props))
2447 }
2448 }
2449 }
2450 }
2451 files
2452 }
2453
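/// Roundtrips the iterator values as a required (non-nullable) column.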
2454 fn values_required<A, I>(iter: I) -> Vec<Bytes>
2455 where
2456 A: From<Vec<I::Item>> + Array + 'static,
2457 I: IntoIterator,
2458 {
2459 let raw_values: Vec<_> = iter.into_iter().collect();
2460 let values = Arc::new(A::from(raw_values));
2461 one_column_roundtrip(values, false)
2462 }
2463
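/// Roundtrips the iterator values as a nullable column, replacing every other value with null.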
2464 fn values_optional<A, I>(iter: I) -> Vec<Bytes>
2465 where
2466 A: From<Vec<Option<I::Item>>> + Array + 'static,
2467 I: IntoIterator,
2468 {
2469 let optional_raw_values: Vec<_> = iter
2470 .into_iter()
2471 .enumerate()
2472 .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
2473 .collect();
2474 let optional_values = Arc::new(A::from(optional_raw_values));
2475 one_column_roundtrip(optional_values, true)
2476 }
2477
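/// Runs both the required and the optional roundtrip for the same values.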
2478 fn required_and_optional<A, I>(iter: I)
2479 where
2480 A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
2481 I: IntoIterator + Clone,
2482 {
2483 values_required::<A, I>(iter.clone());
2484 values_optional::<A, I>(iter);
2485 }
2486
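/// Reads the first file back with bloom filter reading enabled and asserts that
/// every positive value is reported as present and every negative value as absent.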
2487 fn check_bloom_filter<T: AsBytes>(
2488 files: Vec<Bytes>,
2489 file_column: String,
2490 positive_values: Vec<T>,
2491 negative_values: Vec<T>,
2492 ) {
2493 files.into_iter().take(1).for_each(|file| {
2494 let file_reader = SerializedFileReader::new_with_options(
2495 file,
2496 ReadOptionsBuilder::new()
2497 .with_reader_properties(
2498 ReaderProperties::builder()
2499 .set_read_bloom_filter(true)
2500 .build(),
2501 )
2502 .build(),
2503 )
2504 .expect("Unable to open file as Parquet");
2505 let metadata = file_reader.metadata();
2506
2507 let mut bloom_filters: Vec<_> = vec![];
2509 for (ri, row_group) in metadata.row_groups().iter().enumerate() {
2510 if let Some((column_index, _)) = row_group
2511 .columns()
2512 .iter()
2513 .enumerate()
2514 .find(|(_, column)| column.column_path().string() == file_column)
2515 {
2516 let row_group_reader = file_reader
2517 .get_row_group(ri)
2518 .expect("Unable to read row group");
2519 if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
2520 bloom_filters.push(sbbf.clone());
2521 } else {
2522 panic!("No bloom filter for column named {file_column} found");
2523 }
2524 } else {
2525 panic!("No column named {file_column} found");
2526 }
2527 }
2528
2529 positive_values.iter().for_each(|value| {
2530 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2531 assert!(
2532 found.is_some(),
2533 "Value {:?} should be in bloom filter",
2534 value.as_bytes()
2535 );
2536 });
2537
2538 negative_values.iter().for_each(|value| {
2539 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2540 assert!(
2541 found.is_none(),
2542 "Value {:?} should not be in bloom filter",
2543 value.as_bytes()
2544 );
2545 });
2546 });
2547 }
2548
2549 #[test]
2550 fn all_null_primitive_single_column() {
2551 let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
2552 one_column_roundtrip(values, true);
2553 }

2554 #[test]
2555 fn null_single_column() {
2556 let values = Arc::new(NullArray::new(SMALL_SIZE));
2557 one_column_roundtrip(values, true);
2558 }
2560
2561 #[test]
2562 fn bool_single_column() {
2563 required_and_optional::<BooleanArray, _>(
2564 [true, false].iter().cycle().copied().take(SMALL_SIZE),
2565 );
2566 }
2567
2568 #[test]
2569 fn bool_large_single_column() {
2570 let values = Arc::new(
2571 [None, Some(true), Some(false)]
2572 .iter()
2573 .cycle()
2574 .copied()
2575 .take(200_000)
2576 .collect::<BooleanArray>(),
2577 );
2578 let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
2579 let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2580 let file = tempfile::tempfile().unwrap();
2581
2582 let mut writer =
2583 ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
2584 .expect("Unable to write file");
2585 writer.write(&expected_batch).unwrap();
2586 writer.close().unwrap();
2587 }
2588
2589 #[test]
2590 fn check_page_offset_index_with_nan() {
2591 let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
2592 let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
2593 let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2594
2595 let mut out = Vec::with_capacity(1024);
2596 let mut writer =
2597 ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
2598 writer.write(&batch).unwrap();
2599 let file_meta_data = writer.close().unwrap();
2600 for row_group in file_meta_data.row_groups() {
2601 for column in row_group.columns() {
2602 assert!(column.offset_index_offset().is_some());
2603 assert!(column.offset_index_length().is_some());
2604 assert!(column.column_index_offset().is_none());
2605 assert!(column.column_index_length().is_none());
2606 }
2607 }
2608 }
2609
2610 #[test]
2611 fn i8_single_column() {
2612 required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
2613 }
2614
2615 #[test]
2616 fn i16_single_column() {
2617 required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
2618 }
2619
2620 #[test]
2621 fn i32_single_column() {
2622 required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
2623 }
2624
2625 #[test]
2626 fn i64_single_column() {
2627 required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
2628 }
2629
2630 #[test]
2631 fn u8_single_column() {
2632 required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
2633 }
2634
2635 #[test]
2636 fn u16_single_column() {
2637 required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
2638 }
2639
2640 #[test]
2641 fn u32_single_column() {
2642 required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
2643 }
2644
2645 #[test]
2646 fn u64_single_column() {
2647 required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
2648 }
2649
2650 #[test]
2651 fn f32_single_column() {
2652 required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
2653 }
2654
2655 #[test]
2656 fn f64_single_column() {
2657 required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
2658 }
2659
2660 #[test]
2665 fn timestamp_second_single_column() {
2666 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2667 let values = Arc::new(TimestampSecondArray::from(raw_values));
2668
2669 one_column_roundtrip(values, false);
2670 }
2671
2672 #[test]
2673 fn timestamp_millisecond_single_column() {
2674 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2675 let values = Arc::new(TimestampMillisecondArray::from(raw_values));
2676
2677 one_column_roundtrip(values, false);
2678 }
2679
2680 #[test]
2681 fn timestamp_microsecond_single_column() {
2682 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2683 let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
2684
2685 one_column_roundtrip(values, false);
2686 }
2687
2688 #[test]
2689 fn timestamp_nanosecond_single_column() {
2690 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2691 let values = Arc::new(TimestampNanosecondArray::from(raw_values));
2692
2693 one_column_roundtrip(values, false);
2694 }
2695
2696 #[test]
2697 fn date32_single_column() {
2698 required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
2699 }
2700
2701 #[test]
2702 fn date64_single_column() {
2703 required_and_optional::<Date64Array, _>(
2705 (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
2706 );
2707 }
2708
2709 #[test]
2710 fn time32_second_single_column() {
2711 required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
2712 }
2713
2714 #[test]
2715 fn time32_millisecond_single_column() {
2716 required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
2717 }
2718
2719 #[test]
2720 fn time64_microsecond_single_column() {
2721 required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
2722 }
2723
2724 #[test]
2725 fn time64_nanosecond_single_column() {
2726 required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
2727 }
2728
2729 #[test]
2730 fn duration_second_single_column() {
2731 required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
2732 }
2733
2734 #[test]
2735 fn duration_millisecond_single_column() {
2736 required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
2737 }
2738
2739 #[test]
2740 fn duration_microsecond_single_column() {
2741 required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
2742 }
2743
2744 #[test]
2745 fn duration_nanosecond_single_column() {
2746 required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
2747 }
2748
2749 #[test]
2750 fn interval_year_month_single_column() {
2751 required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
2752 }
2753
2754 #[test]
2755 fn interval_day_time_single_column() {
2756 required_and_optional::<IntervalDayTimeArray, _>(vec![
2757 IntervalDayTime::new(0, 1),
2758 IntervalDayTime::new(0, 3),
2759 IntervalDayTime::new(3, -2),
2760 IntervalDayTime::new(-200, 4),
2761 ]);
2762 }
2763
2764 #[test]
2765 #[should_panic(
2766 expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
2767 )]
2768 fn interval_month_day_nano_single_column() {
2769 required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
2770 IntervalMonthDayNano::new(0, 1, 5),
2771 IntervalMonthDayNano::new(0, 3, 2),
2772 IntervalMonthDayNano::new(3, -2, -5),
2773 IntervalMonthDayNano::new(-200, 4, -1),
2774 ]);
2775 }
2776
2777 #[test]
2778 fn binary_single_column() {
2779 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2780 let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2781 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2782
2783 values_required::<BinaryArray, _>(many_vecs_iter);
2785 }
2786
2787 #[test]
2788 fn binary_view_single_column() {
2789 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2790 let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2791 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2792
2793 values_required::<BinaryViewArray, _>(many_vecs_iter);
2795 }
2796
2797 #[test]
2798 fn i32_column_bloom_filter_at_end() {
2799 let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2800 let mut options = RoundTripOptions::new(array, false);
2801 options.bloom_filter = true;
2802 options.bloom_filter_position = BloomFilterPosition::End;
2803
2804 let files = one_column_roundtrip_with_options(options);
2805 check_bloom_filter(
2806 files,
2807 "col".to_string(),
2808 (0..SMALL_SIZE as i32).collect(),
2809 (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2810 );
2811 }
2812
2813 #[test]
2814 fn i32_column_bloom_filter() {
2815 let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2816 let mut options = RoundTripOptions::new(array, false);
2817 options.bloom_filter = true;
2818
2819 let files = one_column_roundtrip_with_options(options);
2820 check_bloom_filter(
2821 files,
2822 "col".to_string(),
2823 (0..SMALL_SIZE as i32).collect(),
2824 (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2825 );
2826 }
2827
2828 #[test]
2829 fn binary_column_bloom_filter() {
2830 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2831 let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2832 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2833
2834 let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
2835 let mut options = RoundTripOptions::new(array, false);
2836 options.bloom_filter = true;
2837
2838 let files = one_column_roundtrip_with_options(options);
2839 check_bloom_filter(
2840 files,
2841 "col".to_string(),
2842 many_vecs,
2843 vec![vec![(SMALL_SIZE + 1) as u8]],
2844 );
2845 }
2846
2847 #[test]
2848 fn empty_string_null_column_bloom_filter() {
2849 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2850 let raw_strs = raw_values.iter().map(|s| s.as_str());
2851
2852 let array = Arc::new(StringArray::from_iter_values(raw_strs));
2853 let mut options = RoundTripOptions::new(array, false);
2854 options.bloom_filter = true;
2855
2856 let files = one_column_roundtrip_with_options(options);
2857
2858 let optional_raw_values: Vec<_> = raw_values
2859 .iter()
2860 .enumerate()
2861 .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
2862 .collect();
2863 check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
2865 }
2866
2867 #[test]
2868 fn large_binary_single_column() {
2869 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2870 let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2871 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2872
2873 values_required::<LargeBinaryArray, _>(many_vecs_iter);
2875 }
2876
2877 #[test]
2878 fn fixed_size_binary_single_column() {
2879 let mut builder = FixedSizeBinaryBuilder::new(4);
2880 builder.append_value(b"0123").unwrap();
2881 builder.append_null();
2882 builder.append_value(b"8910").unwrap();
2883 builder.append_value(b"1112").unwrap();
2884 let array = Arc::new(builder.finish());
2885
2886 one_column_roundtrip(array, true);
2887 }
2888
2889 #[test]
2890 fn string_single_column() {
2891 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2892 let raw_strs = raw_values.iter().map(|s| s.as_str());
2893
2894 required_and_optional::<StringArray, _>(raw_strs);
2895 }
2896
2897 #[test]
2898 fn large_string_single_column() {
2899 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2900 let raw_strs = raw_values.iter().map(|s| s.as_str());
2901
2902 required_and_optional::<LargeStringArray, _>(raw_strs);
2903 }
2904
2905 #[test]
2906 fn string_view_single_column() {
2907 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2908 let raw_strs = raw_values.iter().map(|s| s.as_str());
2909
2910 required_and_optional::<StringViewArray, _>(raw_strs);
2911 }
2912
2913 #[test]
2914 fn null_list_single_column() {
2915 let null_field = Field::new_list_field(DataType::Null, true);
2916 let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
2917
2918 let schema = Schema::new(vec![list_field]);
2919
2920 let a_values = NullArray::new(2);
2922 let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
2923 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2924 DataType::Null,
2925 true,
2926 ))))
2927 .len(3)
2928 .add_buffer(a_value_offsets)
2929 .null_bit_buffer(Some(Buffer::from([0b00000101])))
2930 .add_child_data(a_values.into_data())
2931 .build()
2932 .unwrap();
2933
2934 let a = ListArray::from(a_list_data);
2935
2936 assert!(a.is_valid(0));
2937 assert!(!a.is_valid(1));
2938 assert!(a.is_valid(2));
2939
2940 assert_eq!(a.value(0).len(), 0);
2941 assert_eq!(a.value(2).len(), 2);
2942 assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
2943
2944 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2945 roundtrip(batch, None);
2946 }
2947
2948 #[test]
2949 fn list_single_column() {
2950 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2951 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2952 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2953 DataType::Int32,
2954 false,
2955 ))))
2956 .len(5)
2957 .add_buffer(a_value_offsets)
2958 .null_bit_buffer(Some(Buffer::from([0b00011011])))
2959 .add_child_data(a_values.into_data())
2960 .build()
2961 .unwrap();
2962
2963 assert_eq!(a_list_data.null_count(), 1);
2964
2965 let a = ListArray::from(a_list_data);
2966 let values = Arc::new(a);
2967
2968 one_column_roundtrip(values, true);
2969 }
2970
2971 #[test]
2972 fn large_list_single_column() {
2973 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2974 let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
2975 let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
2976 "large_item",
2977 DataType::Int32,
2978 true,
2979 ))))
2980 .len(5)
2981 .add_buffer(a_value_offsets)
2982 .add_child_data(a_values.into_data())
2983 .null_bit_buffer(Some(Buffer::from([0b00011011])))
2984 .build()
2985 .unwrap();
2986
2987 assert_eq!(a_list_data.null_count(), 1);
2989
2990 let a = LargeListArray::from(a_list_data);
2991 let values = Arc::new(a);
2992
2993 one_column_roundtrip(values, true);
2994 }
2995
2996 #[test]
2997 fn list_nested_nulls() {
2998 use arrow::datatypes::Int32Type;
2999 let data = vec![
3000 Some(vec![Some(1)]),
3001 Some(vec![Some(2), Some(3)]),
3002 None,
3003 Some(vec![Some(4), Some(5), None]),
3004 Some(vec![None]),
3005 Some(vec![Some(6), Some(7)]),
3006 ];
3007
3008 let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
3009 one_column_roundtrip(Arc::new(list), true);
3010
3011 let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
3012 one_column_roundtrip(Arc::new(list), true);
3013 }
3014
3015 #[test]
3016 fn struct_single_column() {
3017 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3018 let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
3019 let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
3020
3021 let values = Arc::new(s);
3022 one_column_roundtrip(values, false);
3023 }
3024
3025 #[test]
3026 fn list_and_map_coerced_names() {
3027 let list_field =
3029 Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
3030 let map_field = Field::new_map(
3031 "my_map",
3032 "entries",
3033 Field::new("keys", DataType::Int32, false),
3034 Field::new("values", DataType::Int32, true),
3035 false,
3036 true,
3037 );
3038
3039 let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
3040 let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
3041
3042 let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
3043
3044 let props = Some(WriterProperties::builder().set_coerce_types(true).build());
3046 let file = tempfile::tempfile().unwrap();
3047 let mut writer =
3048 ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
3049
3050 let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
3051 writer.write(&batch).unwrap();
3052 let file_metadata = writer.close().unwrap();
3053
3054 let schema = file_metadata.file_metadata().schema();
3055 let list_field = &schema.get_fields()[0].get_fields()[0];
3057 assert_eq!(list_field.get_fields()[0].name(), "element");
3058
3059 let map_field = &schema.get_fields()[1].get_fields()[0];
3060 assert_eq!(map_field.name(), "key_value");
3062 assert_eq!(map_field.get_fields()[0].name(), "key");
3064 assert_eq!(map_field.get_fields()[1].name(), "value");
3066
3067 let reader = SerializedFileReader::new(file).unwrap();
3069 let file_schema = reader.metadata().file_metadata().schema();
3070 let fields = file_schema.get_fields();
3071 let list_field = &fields[0].get_fields()[0];
3072 assert_eq!(list_field.get_fields()[0].name(), "element");
3073 let map_field = &fields[1].get_fields()[0];
3074 assert_eq!(map_field.name(), "key_value");
3075 assert_eq!(map_field.get_fields()[0].name(), "key");
3076 assert_eq!(map_field.get_fields()[1].name(), "value");
3077 }
3078
3079 #[test]
3080 fn fallback_flush_data_page() {
3081 let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
3083 let values = Arc::new(StringArray::from(raw_values));
3084 let encodings = vec![
3085 Encoding::DELTA_BYTE_ARRAY,
3086 Encoding::DELTA_LENGTH_BYTE_ARRAY,
3087 ];
3088 let data_type = values.data_type().clone();
3089 let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
3090 let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3091
3092 let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
3093 let data_page_size_limit: usize = 32;
3094 let write_batch_size: usize = 16;
3095
3096 for encoding in &encodings {
3097 for row_group_size in row_group_sizes {
3098 let props = WriterProperties::builder()
3099 .set_writer_version(WriterVersion::PARQUET_2_0)
3100 .set_max_row_group_size(row_group_size)
3101 .set_dictionary_enabled(false)
3102 .set_encoding(*encoding)
3103 .set_data_page_size_limit(data_page_size_limit)
3104 .set_write_batch_size(write_batch_size)
3105 .build();
3106
3107 roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
3108 let string_array_a = StringArray::from(a.clone());
3109 let string_array_b = StringArray::from(b.clone());
3110 let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
3111 let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
3112 assert_eq!(
3113 vec_a, vec_b,
3114 "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
3115 );
3116 });
3117 }
3118 }
3119 }
3120
3121 #[test]
3122 fn arrow_writer_string_dictionary() {
3123 #[allow(deprecated)]
3125 let schema = Arc::new(Schema::new(vec![Field::new_dict(
3126 "dictionary",
3127 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3128 true,
3129 42,
3130 true,
3131 )]));
3132
3133 let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3135 .iter()
3136 .copied()
3137 .collect();
3138
3139 one_column_roundtrip_with_schema(Arc::new(d), schema);
3141 }
3142
3143 #[test]
3144 fn arrow_writer_test_type_compatibility() {
3145 fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
3146 where
3147 T1: Array + 'static,
3148 T2: Array + 'static,
3149 {
3150 let schema1 = Arc::new(Schema::new(vec![Field::new(
3151 "a",
3152 array1.data_type().clone(),
3153 false,
3154 )]));
3155
3156 let file = tempfile().unwrap();
3157 let mut writer =
3158 ArrowWriter::try_new(file.try_clone().unwrap(), schema1.clone(), None).unwrap();
3159
3160 let rb1 = RecordBatch::try_new(schema1.clone(), vec![Arc::new(array1)]).unwrap();
3161 writer.write(&rb1).unwrap();
3162
3163 let schema2 = Arc::new(Schema::new(vec![Field::new(
3164 "a",
3165 array2.data_type().clone(),
3166 false,
3167 )]));
3168 let rb2 = RecordBatch::try_new(schema2, vec![Arc::new(array2)]).unwrap();
3169 writer.write(&rb2).unwrap();
3170
3171 writer.close().unwrap();
3172
3173 let mut record_batch_reader =
3174 ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
3175 let actual_batch = record_batch_reader.next().unwrap().unwrap();
3176
3177 let expected_batch =
3178 RecordBatch::try_new(schema1, vec![Arc::new(expected_result)]).unwrap();
3179 assert_eq!(actual_batch, expected_batch);
3180 }
3181
3182 ensure_compatible_write(
3185 DictionaryArray::new(
3186 UInt8Array::from_iter_values(vec![0]),
3187 Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3188 ),
3189 StringArray::from_iter_values(vec!["barquet"]),
3190 DictionaryArray::new(
3191 UInt8Array::from_iter_values(vec![0, 1]),
3192 Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3193 ),
3194 );
3195
3196 ensure_compatible_write(
3197 StringArray::from_iter_values(vec!["parquet"]),
3198 DictionaryArray::new(
3199 UInt8Array::from_iter_values(vec![0]),
3200 Arc::new(StringArray::from_iter_values(vec!["barquet"])),
3201 ),
3202 StringArray::from_iter_values(vec!["parquet", "barquet"]),
3203 );
3204
3205 ensure_compatible_write(
3208 DictionaryArray::new(
3209 UInt8Array::from_iter_values(vec![0]),
3210 Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3211 ),
3212 DictionaryArray::new(
3213 UInt16Array::from_iter_values(vec![0]),
3214 Arc::new(StringArray::from_iter_values(vec!["barquet"])),
3215 ),
3216 DictionaryArray::new(
3217 UInt8Array::from_iter_values(vec![0, 1]),
3218 Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3219 ),
3220 );
3221
3222 ensure_compatible_write(
3224 DictionaryArray::new(
3225 UInt8Array::from_iter_values(vec![0]),
3226 Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3227 ),
3228 DictionaryArray::new(
3229 UInt8Array::from_iter_values(vec![0]),
3230 Arc::new(LargeStringArray::from_iter_values(vec!["barquet"])),
3231 ),
3232 DictionaryArray::new(
3233 UInt8Array::from_iter_values(vec![0, 1]),
3234 Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3235 ),
3236 );
3237
3238 ensure_compatible_write(
3240 DictionaryArray::new(
3241 UInt8Array::from_iter_values(vec![0]),
3242 Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3243 ),
3244 LargeStringArray::from_iter_values(vec!["barquet"]),
3245 DictionaryArray::new(
3246 UInt8Array::from_iter_values(vec![0, 1]),
3247 Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3248 ),
3249 );
3250
3251 ensure_compatible_write(
3254 StringArray::from_iter_values(vec!["parquet"]),
3255 LargeStringArray::from_iter_values(vec!["barquet"]),
3256 StringArray::from_iter_values(vec!["parquet", "barquet"]),
3257 );
3258
3259 ensure_compatible_write(
3260 LargeStringArray::from_iter_values(vec!["parquet"]),
3261 StringArray::from_iter_values(vec!["barquet"]),
3262 LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
3263 );
3264
3265 ensure_compatible_write(
3266 StringArray::from_iter_values(vec!["parquet"]),
3267 StringViewArray::from_iter_values(vec!["barquet"]),
3268 StringArray::from_iter_values(vec!["parquet", "barquet"]),
3269 );
3270
3271 ensure_compatible_write(
3272 StringViewArray::from_iter_values(vec!["parquet"]),
3273 StringArray::from_iter_values(vec!["barquet"]),
3274 StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
3275 );
3276
3277 ensure_compatible_write(
3278 LargeStringArray::from_iter_values(vec!["parquet"]),
3279 StringViewArray::from_iter_values(vec!["barquet"]),
3280 LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
3281 );
3282
3283 ensure_compatible_write(
3284 StringViewArray::from_iter_values(vec!["parquet"]),
3285 LargeStringArray::from_iter_values(vec!["barquet"]),
3286 StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
3287 );
3288
3289 ensure_compatible_write(
3292 BinaryArray::from_iter_values(vec![b"parquet"]),
3293 LargeBinaryArray::from_iter_values(vec![b"barquet"]),
3294 BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3295 );
3296
3297 ensure_compatible_write(
3298 LargeBinaryArray::from_iter_values(vec![b"parquet"]),
3299 BinaryArray::from_iter_values(vec![b"barquet"]),
3300 LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3301 );
3302
3303 ensure_compatible_write(
3304 BinaryArray::from_iter_values(vec![b"parquet"]),
3305 BinaryViewArray::from_iter_values(vec![b"barquet"]),
3306 BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3307 );
3308
3309 ensure_compatible_write(
3310 BinaryViewArray::from_iter_values(vec![b"parquet"]),
3311 BinaryArray::from_iter_values(vec![b"barquet"]),
3312 BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
3313 );
3314
3315 ensure_compatible_write(
3316 BinaryViewArray::from_iter_values(vec![b"parquet"]),
3317 LargeBinaryArray::from_iter_values(vec![b"barquet"]),
3318 BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
3319 );
3320
3321 ensure_compatible_write(
3322 LargeBinaryArray::from_iter_values(vec![b"parquet"]),
3323 BinaryViewArray::from_iter_values(vec![b"barquet"]),
3324 LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3325 );
3326 }
3327
3328 #[test]
3329 fn arrow_writer_primitive_dictionary() {
3330 #[allow(deprecated)]
3332 let schema = Arc::new(Schema::new(vec![Field::new_dict(
3333 "dictionary",
3334 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
3335 true,
3336 42,
3337 true,
3338 )]));
3339
3340 let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
3342 builder.append(12345678).unwrap();
3343 builder.append_null();
3344 builder.append(22345678).unwrap();
3345 builder.append(12345678).unwrap();
3346 let d = builder.finish();
3347
3348 one_column_roundtrip_with_schema(Arc::new(d), schema);
3349 }
3350
3351 #[test]
3352 fn arrow_writer_decimal32_dictionary() {
3353 let integers = vec![12345, 56789, 34567];
3354
3355 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3356
3357 let values = Decimal32Array::from(integers.clone())
3358 .with_precision_and_scale(5, 2)
3359 .unwrap();
3360
3361 let array = DictionaryArray::new(keys, Arc::new(values));
3362 one_column_roundtrip(Arc::new(array.clone()), true);
3363
3364 let values = Decimal32Array::from(integers)
3365 .with_precision_and_scale(9, 2)
3366 .unwrap();
3367
3368 let array = array.with_values(Arc::new(values));
3369 one_column_roundtrip(Arc::new(array), true);
3370 }
3371
3372 #[test]
3373 fn arrow_writer_decimal64_dictionary() {
3374 let integers = vec![12345, 56789, 34567];
3375
3376 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3377
3378 let values = Decimal64Array::from(integers.clone())
3379 .with_precision_and_scale(5, 2)
3380 .unwrap();
3381
3382 let array = DictionaryArray::new(keys, Arc::new(values));
3383 one_column_roundtrip(Arc::new(array.clone()), true);
3384
3385 let values = Decimal64Array::from(integers)
3386 .with_precision_and_scale(12, 2)
3387 .unwrap();
3388
3389 let array = array.with_values(Arc::new(values));
3390 one_column_roundtrip(Arc::new(array), true);
3391 }
3392
3393 #[test]
3394 fn arrow_writer_decimal128_dictionary() {
3395 let integers = vec![12345, 56789, 34567];
3396
3397 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3398
3399 let values = Decimal128Array::from(integers.clone())
3400 .with_precision_and_scale(5, 2)
3401 .unwrap();
3402
3403 let array = DictionaryArray::new(keys, Arc::new(values));
3404 one_column_roundtrip(Arc::new(array.clone()), true);
3405
3406 let values = Decimal128Array::from(integers)
3407 .with_precision_and_scale(12, 2)
3408 .unwrap();
3409
3410 let array = array.with_values(Arc::new(values));
3411 one_column_roundtrip(Arc::new(array), true);
3412 }
3413
3414 #[test]
3415 fn arrow_writer_decimal256_dictionary() {
3416 let integers = vec![
3417 i256::from_i128(12345),
3418 i256::from_i128(56789),
3419 i256::from_i128(34567),
3420 ];
3421
3422 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3423
3424 let values = Decimal256Array::from(integers.clone())
3425 .with_precision_and_scale(5, 2)
3426 .unwrap();
3427
3428 let array = DictionaryArray::new(keys, Arc::new(values));
3429 one_column_roundtrip(Arc::new(array.clone()), true);
3430
3431 let values = Decimal256Array::from(integers)
3432 .with_precision_and_scale(12, 2)
3433 .unwrap();
3434
3435 let array = array.with_values(Arc::new(values));
3436 one_column_roundtrip(Arc::new(array), true);
3437 }
3438
3439 #[test]
3440 fn arrow_writer_string_dictionary_unsigned_index() {
3441 #[allow(deprecated)]
3443 let schema = Arc::new(Schema::new(vec![Field::new_dict(
3444 "dictionary",
3445 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3446 true,
3447 42,
3448 true,
3449 )]));
3450
3451 let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3453 .iter()
3454 .copied()
3455 .collect();
3456
3457 one_column_roundtrip_with_schema(Arc::new(d), schema);
3458 }
3459
3460 #[test]
3461 fn u32_min_max() {
3462 let src = [
3464 u32::MIN,
3465 u32::MIN + 1,
3466 (i32::MAX as u32) - 1,
3467 i32::MAX as u32,
3468 (i32::MAX as u32) + 1,
3469 u32::MAX - 1,
3470 u32::MAX,
3471 ];
3472 let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
3473 let files = one_column_roundtrip(values, false);
3474
3475 for file in files {
3476 let reader = SerializedFileReader::new(file).unwrap();
3478 let metadata = reader.metadata();
3479
3480 let mut row_offset = 0;
3481 for row_group in metadata.row_groups() {
3482 assert_eq!(row_group.num_columns(), 1);
3483 let column = row_group.column(0);
3484
3485 let num_values = column.num_values() as usize;
3486 let src_slice = &src[row_offset..row_offset + num_values];
3487 row_offset += column.num_values() as usize;
3488
3489 let stats = column.statistics().unwrap();
3490 if let Statistics::Int32(stats) = stats {
3491 assert_eq!(
3492 *stats.min_opt().unwrap() as u32,
3493 *src_slice.iter().min().unwrap()
3494 );
3495 assert_eq!(
3496 *stats.max_opt().unwrap() as u32,
3497 *src_slice.iter().max().unwrap()
3498 );
3499 } else {
3500 panic!("Statistics::Int32 missing")
3501 }
3502 }
3503 }
3504 }
3505
3506 #[test]
3507 fn u64_min_max() {
3508 let src = [
3510 u64::MIN,
3511 u64::MIN + 1,
3512 (i64::MAX as u64) - 1,
3513 i64::MAX as u64,
3514 (i64::MAX as u64) + 1,
3515 u64::MAX - 1,
3516 u64::MAX,
3517 ];
3518 let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
3519 let files = one_column_roundtrip(values, false);
3520
3521 for file in files {
3522 let reader = SerializedFileReader::new(file).unwrap();
3524 let metadata = reader.metadata();
3525
3526 let mut row_offset = 0;
3527 for row_group in metadata.row_groups() {
3528 assert_eq!(row_group.num_columns(), 1);
3529 let column = row_group.column(0);
3530
3531 let num_values = column.num_values() as usize;
3532 let src_slice = &src[row_offset..row_offset + num_values];
3533 row_offset += column.num_values() as usize;
3534
3535 let stats = column.statistics().unwrap();
3536 if let Statistics::Int64(stats) = stats {
3537 assert_eq!(
3538 *stats.min_opt().unwrap() as u64,
3539 *src_slice.iter().min().unwrap()
3540 );
3541 assert_eq!(
3542 *stats.max_opt().unwrap() as u64,
3543 *src_slice.iter().max().unwrap()
3544 );
3545 } else {
3546 panic!("Statistics::Int64 missing")
3547 }
3548 }
3549 }
3550 }
3551
3552 #[test]
3553 fn statistics_null_counts_only_nulls() {
3554 let values = Arc::new(UInt64Array::from(vec![None, None]));
3556 let files = one_column_roundtrip(values, true);
3557
3558 for file in files {
3559 let reader = SerializedFileReader::new(file).unwrap();
3561 let metadata = reader.metadata();
3562 assert_eq!(metadata.num_row_groups(), 1);
3563 let row_group = metadata.row_group(0);
3564 assert_eq!(row_group.num_columns(), 1);
3565 let column = row_group.column(0);
3566 let stats = column.statistics().unwrap();
3567 assert_eq!(stats.null_count_opt(), Some(2));
3568 }
3569 }
3570
3571 #[test]
3572 fn test_list_of_struct_roundtrip() {
3573 let int_field = Field::new("a", DataType::Int32, true);
3575 let int_field2 = Field::new("b", DataType::Int32, true);
3576
3577 let int_builder = Int32Builder::with_capacity(10);
3578 let int_builder2 = Int32Builder::with_capacity(10);
3579
3580 let struct_builder = StructBuilder::new(
3581 vec![int_field, int_field2],
3582 vec![Box::new(int_builder), Box::new(int_builder2)],
3583 );
3584 let mut list_builder = ListBuilder::new(struct_builder);
3585
3586 let values = list_builder.values();
3591 values
3592 .field_builder::<Int32Builder>(0)
3593 .unwrap()
3594 .append_value(1);
3595 values
3596 .field_builder::<Int32Builder>(1)
3597 .unwrap()
3598 .append_value(2);
3599 values.append(true);
3600 list_builder.append(true);
3601
3602 list_builder.append(true);
3604
3605 list_builder.append(false);
3607
3608 let values = list_builder.values();
3610 values
3611 .field_builder::<Int32Builder>(0)
3612 .unwrap()
3613 .append_null();
3614 values
3615 .field_builder::<Int32Builder>(1)
3616 .unwrap()
3617 .append_null();
3618 values.append(false);
3619 values
3620 .field_builder::<Int32Builder>(0)
3621 .unwrap()
3622 .append_null();
3623 values
3624 .field_builder::<Int32Builder>(1)
3625 .unwrap()
3626 .append_null();
3627 values.append(false);
3628 list_builder.append(true);
3629
3630 let values = list_builder.values();
3632 values
3633 .field_builder::<Int32Builder>(0)
3634 .unwrap()
3635 .append_null();
3636 values
3637 .field_builder::<Int32Builder>(1)
3638 .unwrap()
3639 .append_value(3);
3640 values.append(true);
3641 list_builder.append(true);
3642
3643 let values = list_builder.values();
3645 values
3646 .field_builder::<Int32Builder>(0)
3647 .unwrap()
3648 .append_value(2);
3649 values
3650 .field_builder::<Int32Builder>(1)
3651 .unwrap()
3652 .append_null();
3653 values.append(true);
3654 list_builder.append(true);
3655
3656 let array = Arc::new(list_builder.finish());
3657
3658 one_column_roundtrip(array, true);
3659 }
3660
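/// Returns the number of rows in each row group of `metadata`.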
3661 fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
3662 metadata.row_groups().iter().map(|x| x.num_rows()).collect()
3663 }
3664
3665 #[test]
3666 fn test_aggregates_records() {
3667 let arrays = [
3668 Int32Array::from((0..100).collect::<Vec<_>>()),
3669 Int32Array::from((0..50).collect::<Vec<_>>()),
3670 Int32Array::from((200..500).collect::<Vec<_>>()),
3671 ];
3672
3673 let schema = Arc::new(Schema::new(vec![Field::new(
3674 "int",
3675 ArrowDataType::Int32,
3676 false,
3677 )]));
3678
3679 let file = tempfile::tempfile().unwrap();
3680
3681 let props = WriterProperties::builder()
3682 .set_max_row_group_size(200)
3683 .build();
3684
3685 let mut writer =
3686 ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
3687
3688 for array in arrays {
3689 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
3690 writer.write(&batch).unwrap();
3691 }
3692
3693 writer.close().unwrap();
3694
3695 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3696 assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
3697
3698 let batches = builder
3699 .with_batch_size(100)
3700 .build()
3701 .unwrap()
3702 .collect::<ArrowResult<Vec<_>>>()
3703 .unwrap();
3704
3705 assert_eq!(batches.len(), 5);
3706 assert!(batches.iter().all(|x| x.num_columns() == 1));
3707
3708 let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3709
3710 assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
3711
3712 let values: Vec<_> = batches
3713 .iter()
3714 .flat_map(|x| {
3715 x.column(0)
3716 .as_any()
3717 .downcast_ref::<Int32Array>()
3718 .unwrap()
3719 .values()
3720 .iter()
3721 .cloned()
3722 })
3723 .collect();
3724
3725 let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
3726 assert_eq!(&values, &expected_values)
3727 }
3728
3729 #[test]
3730 fn complex_aggregate() {
3731 let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
3733 let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
3734 let struct_a = Arc::new(Field::new(
3735 "struct_a",
3736 DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
3737 true,
3738 ));
3739
3740 let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
3741 let struct_b = Arc::new(Field::new(
3742 "struct_b",
3743 DataType::Struct(vec![list_a.clone()].into()),
3744 false,
3745 ));
3746
3747 let schema = Arc::new(Schema::new(vec![struct_b]));
3748
3749 let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
3751 let field_b_array =
3752 Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
3753
3754 let struct_a_array = StructArray::from(vec![
3755 (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
3756 (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
3757 ]);
3758
3759 let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3760 .len(5)
3761 .add_buffer(Buffer::from_iter(vec![
3762 0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
3763 ]))
3764 .null_bit_buffer(Some(Buffer::from_iter(vec![
3765 true, false, true, false, true,
3766 ])))
3767 .child_data(vec![struct_a_array.into_data()])
3768 .build()
3769 .unwrap();
3770
3771 let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3772 let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
3773
3774 let batch1 =
3775 RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3776 .unwrap();
3777
3778 let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
3779 let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
3780
3781 let struct_a_array = StructArray::from(vec![
3782 (field_a, Arc::new(field_a_array) as ArrayRef),
3783 (field_b, Arc::new(field_b_array) as ArrayRef),
3784 ]);
3785
3786 let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3787 .len(2)
3788 .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
3789 .child_data(vec![struct_a_array.into_data()])
3790 .build()
3791 .unwrap();
3792
3793 let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3794 let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
3795
3796 let batch2 =
3797 RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3798 .unwrap();
3799
3800 let batches = &[batch1, batch2];
3801
3802 let expected = r#"
3805 +-------------------------------------------------------------------------------------------------------+
3806 | struct_b |
3807 +-------------------------------------------------------------------------------------------------------+
3808 | {list: [{leaf_a: 1, leaf_b: 1}]} |
3809 | {list: } |
3810 | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]} |
3811 | {list: } |
3812 | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]} |
3813 | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
3814 | {list: [{leaf_a: 10, leaf_b: }]} |
3815 +-------------------------------------------------------------------------------------------------------+
3816 "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
3817
3818 let actual = pretty_format_batches(batches).unwrap().to_string();
3819 assert_eq!(actual, expected);
3820
3821 let file = tempfile::tempfile().unwrap();
3823 let props = WriterProperties::builder()
3824 .set_max_row_group_size(6)
3825 .build();
3826
3827 let mut writer =
3828 ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
3829
3830 for batch in batches {
3831 writer.write(batch).unwrap();
3832 }
3833 writer.close().unwrap();
3834
3835 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3840 assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);
3841
3842 let batches = builder
3843 .with_batch_size(2)
3844 .build()
3845 .unwrap()
3846 .collect::<ArrowResult<Vec<_>>>()
3847 .unwrap();
3848
3849 assert_eq!(batches.len(), 4);
3850 let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3851 assert_eq!(&batch_counts, &[2, 2, 2, 1]);
3852
3853 let actual = pretty_format_batches(&batches).unwrap().to_string();
3854 assert_eq!(actual, expected);
3855 }
3856
3857 #[test]
3858 fn test_arrow_writer_metadata() {
3859 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3860 let file_schema = batch_schema.clone().with_metadata(
3861 vec![("foo".to_string(), "bar".to_string())]
3862 .into_iter()
3863 .collect(),
3864 );
3865
3866 let batch = RecordBatch::try_new(
3867 Arc::new(batch_schema),
3868 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3869 )
3870 .unwrap();
3871
3872 let mut buf = Vec::with_capacity(1024);
3873 let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
3874 writer.write(&batch).unwrap();
3875 writer.close().unwrap();
3876 }
3877
3878 #[test]
3879 fn test_arrow_writer_nullable() {
3880 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3881 let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
3882 let file_schema = Arc::new(file_schema);
3883
3884 let batch = RecordBatch::try_new(
3885 Arc::new(batch_schema),
3886 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3887 )
3888 .unwrap();
3889
3890 let mut buf = Vec::with_capacity(1024);
3891 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3892 writer.write(&batch).unwrap();
3893 writer.close().unwrap();
3894
3895 let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
3896 let back = read.next().unwrap().unwrap();
3897 assert_eq!(back.schema(), file_schema);
3898 assert_ne!(back.schema(), batch.schema());
3899 assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
3900 }
3901
3902 #[test]
3903 fn in_progress_accounting() {
3904 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3906
3907 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3909
3910 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3912
3913 let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
3914
3915 assert_eq!(writer.in_progress_size(), 0);
3917 assert_eq!(writer.in_progress_rows(), 0);
3918 assert_eq!(writer.memory_size(), 0);
3919 assert_eq!(writer.bytes_written(), 4); // only the 4-byte "PAR1" magic has been written so far
3920 writer.write(&batch).unwrap();
3921
3922 let initial_size = writer.in_progress_size();
3924 assert!(initial_size > 0);
3925 assert_eq!(writer.in_progress_rows(), 5);
3926 let initial_memory = writer.memory_size();
3927 assert!(initial_memory > 0);
3928 assert!(
3930 initial_size <= initial_memory,
3931 "{initial_size} <= {initial_memory}"
3932 );
3933
3934 writer.write(&batch).unwrap();
3936 assert!(writer.in_progress_size() > initial_size);
3937 assert_eq!(writer.in_progress_rows(), 10);
3938 assert!(writer.memory_size() > initial_memory);
3939 assert!(
3940 writer.in_progress_size() <= writer.memory_size(),
3941 "in_progress_size {} <= memory_size {}",
3942 writer.in_progress_size(),
3943 writer.memory_size()
3944 );
3945
3946 let pre_flush_bytes_written = writer.bytes_written();
3948 writer.flush().unwrap();
3949 assert_eq!(writer.in_progress_size(), 0);
3950 assert_eq!(writer.memory_size(), 0);
3951 assert!(writer.bytes_written() > pre_flush_bytes_written);
3952
3953 writer.close().unwrap();
3954 }
3955
3956 #[test]
3957 fn test_writer_all_null() {
3958 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3959 let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
3960 let batch = RecordBatch::try_from_iter(vec![
3961 ("a", Arc::new(a) as ArrayRef),
3962 ("b", Arc::new(b) as ArrayRef),
3963 ])
3964 .unwrap();
3965
3966 let mut buf = Vec::with_capacity(1024);
3967 let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
3968 writer.write(&batch).unwrap();
3969 writer.close().unwrap();
3970
3971 let bytes = Bytes::from(buf);
3972 let options = ReadOptionsBuilder::new().with_page_index().build();
3973 let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
3974 let index = reader.metadata().offset_index().unwrap();
3975
3976 assert_eq!(index.len(), 1);
3977 assert_eq!(index[0].len(), 2); // 2 columns
3978 assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
3979 assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
3980 }
3981
3982 #[test]
3983 fn test_disabled_statistics_with_page() {
3984 let file_schema = Schema::new(vec![
3985 Field::new("a", DataType::Utf8, true),
3986 Field::new("b", DataType::Utf8, true),
3987 ]);
3988 let file_schema = Arc::new(file_schema);
3989
3990 let batch = RecordBatch::try_new(
3991 file_schema.clone(),
3992 vec![
3993 Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3994 Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3995 ],
3996 )
3997 .unwrap();
3998
3999 let props = WriterProperties::builder()
4000 .set_statistics_enabled(EnabledStatistics::None)
4001 .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
4002 .build();
4003
4004 let mut buf = Vec::with_capacity(1024);
4005 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4006 writer.write(&batch).unwrap();
4007
4008 let metadata = writer.close().unwrap();
4009 assert_eq!(metadata.num_row_groups(), 1);
4010 let row_group = metadata.row_group(0);
4011 assert_eq!(row_group.num_columns(), 2);
4012 assert!(row_group.column(0).offset_index_offset().is_some());
4014 assert!(row_group.column(0).column_index_offset().is_some());
4015 assert!(row_group.column(1).offset_index_offset().is_some());
4017 assert!(row_group.column(1).column_index_offset().is_none());
4018
4019 let options = ReadOptionsBuilder::new().with_page_index().build();
4020 let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4021
4022 let row_group = reader.get_row_group(0).unwrap();
4023 let a_col = row_group.metadata().column(0);
4024 let b_col = row_group.metadata().column(1);
4025
4026 if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4028 let min = byte_array_stats.min_opt().unwrap();
4029 let max = byte_array_stats.max_opt().unwrap();
4030
4031 assert_eq!(min.as_bytes(), b"a");
4032 assert_eq!(max.as_bytes(), b"d");
4033 } else {
4034 panic!("expecting Statistics::ByteArray");
4035 }
4036
4037 assert!(b_col.statistics().is_none());
4039
4040 let offset_index = reader.metadata().offset_index().unwrap();
4041 assert_eq!(offset_index.len(), 1); // 1 row group
4042 assert_eq!(offset_index[0].len(), 2); // 2 columns
4043
4044 let column_index = reader.metadata().column_index().unwrap();
4045 assert_eq!(column_index.len(), 1); // 1 row group
4046 assert_eq!(column_index[0].len(), 2); // 2 columns
4047
4048 let a_idx = &column_index[0][0];
4049 assert!(
4050 matches!(a_idx, ColumnIndexMetaData::BYTE_ARRAY(_)),
4051 "{a_idx:?}"
4052 );
4053 let b_idx = &column_index[0][1];
4054 assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
4055 }
4056
4057 #[test]
4058 fn test_disabled_statistics_with_chunk() {
4059 let file_schema = Schema::new(vec![
4060 Field::new("a", DataType::Utf8, true),
4061 Field::new("b", DataType::Utf8, true),
4062 ]);
4063 let file_schema = Arc::new(file_schema);
4064
4065 let batch = RecordBatch::try_new(
4066 file_schema.clone(),
4067 vec![
4068 Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
4069 Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
4070 ],
4071 )
4072 .unwrap();
4073
4074 let props = WriterProperties::builder()
4075 .set_statistics_enabled(EnabledStatistics::None)
4076 .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
4077 .build();
4078
4079 let mut buf = Vec::with_capacity(1024);
4080 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4081 writer.write(&batch).unwrap();
4082
4083 let metadata = writer.close().unwrap();
4084 assert_eq!(metadata.num_row_groups(), 1);
4085 let row_group = metadata.row_group(0);
4086 assert_eq!(row_group.num_columns(), 2);
4087 assert!(row_group.column(0).offset_index_offset().is_some());
4088 // with Chunk-level statistics, no column index is written for column "a"
4089 assert!(row_group.column(0).column_index_offset().is_none());
4090 assert!(row_group.column(1).offset_index_offset().is_some());
4091 // column "b" has statistics disabled entirely, so it also has no column index
4092 assert!(row_group.column(1).column_index_offset().is_none());
4093
4094 let options = ReadOptionsBuilder::new().with_page_index().build();
4095 let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4096
4097 let row_group = reader.get_row_group(0).unwrap();
4098 let a_col = row_group.metadata().column(0);
4099 let b_col = row_group.metadata().column(1);
4100
4101 if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4103 let min = byte_array_stats.min_opt().unwrap();
4104 let max = byte_array_stats.max_opt().unwrap();
4105
4106 assert_eq!(min.as_bytes(), b"a");
4107 assert_eq!(max.as_bytes(), b"d");
4108 } else {
4109 panic!("expecting Statistics::ByteArray");
4110 }
4111
4112 assert!(b_col.statistics().is_none());
4114
4115 let column_index = reader.metadata().column_index().unwrap();
4116 assert_eq!(column_index.len(), 1); // one row group
4117 assert_eq!(column_index[0].len(), 2); // two columns
4118
4119 let a_idx = &column_index[0][0];
4120 assert!(matches!(a_idx, ColumnIndexMetaData::NONE), "{a_idx:?}");
4121 let b_idx = &column_index[0][1];
4122 assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
4123 }
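
    // A minimal sketch (not part of the original tests) of how the per-column override
    // used in the two tests above resolves, assuming `WriterProperties::statistics_enabled`,
    // which consults the column-specific setting before falling back to the file-wide default.
    #[test]
    fn test_statistics_level_resolution_sketch() {
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
            .build();

        // column "a" uses its override, any other column uses the file-wide default
        assert_eq!(props.statistics_enabled(&"a".into()), EnabledStatistics::Page);
        assert_eq!(props.statistics_enabled(&"b".into()), EnabledStatistics::None);
    }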
4124
4125 #[test]
4126 fn test_arrow_writer_skip_metadata() {
4127 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4128 let file_schema = Arc::new(batch_schema.clone());
4129
4130 let batch = RecordBatch::try_new(
4131 Arc::new(batch_schema),
4132 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4133 )
4134 .unwrap();
4135 let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
4136
4137 let mut buf = Vec::with_capacity(1024);
4138 let mut writer =
4139 ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
4140 writer.write(&batch).unwrap();
4141 writer.close().unwrap();
4142
4143 let bytes = Bytes::from(buf);
4144 let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
4145 assert_eq!(file_schema, *reader_builder.schema());
4146 if let Some(key_value_metadata) = reader_builder
4147 .metadata()
4148 .file_metadata()
4149 .key_value_metadata()
4150 {
4151 assert!(
4152 !key_value_metadata
4153 .iter()
4154 .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY)
4155 );
4156 }
4157 }
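
    // A minimal sketch of the default behaviour the test above opts out of: without
    // `with_skip_arrow_metadata(true)`, the serialized Arrow schema is stored under
    // ARROW_SCHEMA_META_KEY in the file's key-value metadata. The test name is illustrative.
    #[test]
    fn test_arrow_writer_embeds_schema_by_default_sketch() {
        let schema = Arc::new(Schema::new(vec![Field::new("int32", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf)).unwrap();
        let key_value_metadata = reader_builder
            .metadata()
            .file_metadata()
            .key_value_metadata()
            .expect("key-value metadata should be present by default");
        assert!(
            key_value_metadata
                .iter()
                .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY)
        );
    }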
4158
4159 #[test]
4160 fn test_arrow_writer_explicit_schema() {
4161 let batch_schema = Arc::new(Schema::new(vec![Field::new(
4163 "integers",
4164 DataType::Int32,
4165 true,
4166 )]));
4167 let parquet_schema = Type::group_type_builder("root")
4168 .with_fields(vec![
4169 Type::primitive_type_builder("integers", crate::basic::Type::INT64)
4170 .build()
4171 .unwrap()
4172 .into(),
4173 ])
4174 .build()
4175 .unwrap();
4176 let parquet_schema_descr = SchemaDescriptor::new(parquet_schema.into());
4177
4178 let batch = RecordBatch::try_new(
4179 batch_schema.clone(),
4180 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4181 )
4182 .unwrap();
4183
4184 let explicit_schema_options =
4185 ArrowWriterOptions::new().with_parquet_schema(parquet_schema_descr);
4186 let mut buf = Vec::with_capacity(1024);
4187 let mut writer = ArrowWriter::try_new_with_options(
4188 &mut buf,
4189 batch_schema.clone(),
4190 explicit_schema_options,
4191 )
4192 .unwrap();
4193 writer.write(&batch).unwrap();
4194 writer.close().unwrap();
4195
4196 let bytes = Bytes::from(buf);
4197 let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
4198
4199 let expected_schema = Arc::new(Schema::new(vec![Field::new(
4200 "integers",
4201 DataType::Int64,
4202 true,
4203 )]));
4204 assert_eq!(reader_builder.schema(), &expected_schema);
4205
4206 let batches = reader_builder
4207 .build()
4208 .unwrap()
4209 .collect::<Result<Vec<_>, ArrowError>>()
4210 .unwrap();
4211 assert_eq!(batches.len(), 1);
4212
4213 let expected_batch = RecordBatch::try_new(
4214 expected_schema.clone(),
4215 vec![Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as _],
4216 )
4217 .unwrap();
4218 assert_eq!(batches[0], expected_batch);
4219 }
4220
4221 #[test]
4222 fn mismatched_schemas() {
4223 let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
4224 let file_schema = Arc::new(Schema::new(vec![Field::new(
4225 "temperature",
4226 DataType::Float64,
4227 false,
4228 )]));
4229
4230 let batch = RecordBatch::try_new(
4231 Arc::new(batch_schema),
4232 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4233 )
4234 .unwrap();
4235
4236 let mut buf = Vec::with_capacity(1024);
4237 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
4238
4239 let err = writer.write(&batch).unwrap_err().to_string();
4240 assert_eq!(
4241 err,
4242 "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
4243 );
4244 }
4245
4246 #[test]
4247 fn test_roundtrip_empty_schema() {
4249 let empty_batch = RecordBatch::try_new_with_options(
4251 Arc::new(Schema::empty()),
4252 vec![],
4253 &RecordBatchOptions::default().with_row_count(Some(0)),
4254 )
4255 .unwrap();
4256
4257 let mut parquet_bytes: Vec<u8> = Vec::new();
4259 let mut writer =
4260 ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
4261 writer.write(&empty_batch).unwrap();
4262 writer.close().unwrap();
4263
4264 let bytes = Bytes::from(parquet_bytes);
4266 let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
4267 assert_eq!(reader.schema(), &empty_batch.schema());
4268 let batches: Vec<_> = reader
4269 .build()
4270 .unwrap()
4271 .collect::<ArrowResult<Vec<_>>>()
4272 .unwrap();
4273 assert_eq!(batches.len(), 0);
4274 }
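
    // A minimal sketch, assuming `ArrowWriter::flush` (which closes the in-progress row
    // group early) and the `in_progress_rows`/`in_progress_size` accessors, of how buffered
    // data can be tracked between writes. The test name and exact assertions are illustrative.
    #[test]
    fn test_in_progress_accounting_sketch() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
        assert_eq!(writer.in_progress_rows(), 0);

        writer.write(&batch).unwrap();
        assert_eq!(writer.in_progress_rows(), 4);
        assert!(writer.in_progress_size() > 0);

        // flushing writes the buffered rows out as a row group and resets the counters
        writer.flush().unwrap();
        assert_eq!(writer.in_progress_rows(), 0);
        writer.close().unwrap();
    }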
4275
4276 #[test]
4277 fn test_page_stats_not_written_by_default() {
4278 let string_field = Field::new("a", DataType::Utf8, false);
4279 let schema = Schema::new(vec![string_field]);
4280 let raw_string_values = vec!["Blart Versenwald III"];
4281 let string_values = StringArray::from(raw_string_values.clone());
4282 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
4283
4284 let props = WriterProperties::builder()
4285 .set_statistics_enabled(EnabledStatistics::Page)
4286 .set_dictionary_enabled(false)
4287 .set_encoding(Encoding::PLAIN)
4288 .set_compression(crate::basic::Compression::UNCOMPRESSED)
4289 .build();
4290
4291 let file = roundtrip_opts(&batch, props);
4292
4293 let first_page = &file[4..];
4297 // decode the first page header, which follows the 4-byte "PAR1" magic skipped above
4298 let mut prot = ThriftSliceInputProtocol::new(first_page);
4299 let hdr = PageHeader::read_thrift(&mut prot).unwrap();
4300 let stats = hdr.data_page_header.unwrap().statistics;
4301
4302 assert!(stats.is_none());
4303 }
4304
4305 #[test]
4306 fn test_page_stats_when_enabled() {
4307 let string_field = Field::new("a", DataType::Utf8, false);
4308 let schema = Schema::new(vec![string_field]);
4309 let raw_string_values = vec!["Blart Versenwald III", "Andrew Lamb"];
4310 let string_values = StringArray::from(raw_string_values.clone());
4311 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
4312
4313 let props = WriterProperties::builder()
4314 .set_statistics_enabled(EnabledStatistics::Page)
4315 .set_dictionary_enabled(false)
4316 .set_encoding(Encoding::PLAIN)
4317 .set_write_page_header_statistics(true)
4318 .set_compression(crate::basic::Compression::UNCOMPRESSED)
4319 .build();
4320
4321 let file = roundtrip_opts(&batch, props);
4322
4323 let first_page = &file[4..];
4327 // decode the first page header, which follows the 4-byte "PAR1" magic skipped above
4328 let mut prot = ThriftSliceInputProtocol::new(first_page);
4329 let hdr = PageHeader::read_thrift(&mut prot).unwrap();
4330 let stats = hdr.data_page_header.unwrap().statistics;
4331
4332 let stats = stats.unwrap();
4333 assert!(stats.is_max_value_exact.unwrap());
4335 assert!(stats.is_min_value_exact.unwrap());
4336 assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
4337 assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
4338 }
4339
4340 #[test]
4341 fn test_page_stats_truncation() {
4342 let string_field = Field::new("a", DataType::Utf8, false);
4343 let binary_field = Field::new("b", DataType::Binary, false);
4344 let schema = Schema::new(vec![string_field, binary_field]);
4345
4346 let raw_string_values = vec!["Blart Versenwald III"];
4347 let raw_binary_values = [b"Blart Versenwald III".to_vec()];
4348 let raw_binary_value_refs = raw_binary_values
4349 .iter()
4350 .map(|x| x.as_slice())
4351 .collect::<Vec<_>>();
4352
4353 let string_values = StringArray::from(raw_string_values.clone());
4354 let binary_values = BinaryArray::from(raw_binary_value_refs);
4355 let batch = RecordBatch::try_new(
4356 Arc::new(schema),
4357 vec![Arc::new(string_values), Arc::new(binary_values)],
4358 )
4359 .unwrap();
4360
4361 let props = WriterProperties::builder()
4362 .set_statistics_truncate_length(Some(2))
4363 .set_dictionary_enabled(false)
4364 .set_encoding(Encoding::PLAIN)
4365 .set_write_page_header_statistics(true)
4366 .set_compression(crate::basic::Compression::UNCOMPRESSED)
4367 .build();
4368
4369 let file = roundtrip_opts(&batch, props);
4370
4371 let first_page = &file[4..];
4375 // decode the first page header, which follows the 4-byte "PAR1" magic skipped above
4376 let mut prot = ThriftSliceInputProtocol::new(first_page);
4377 let hdr = PageHeader::read_thrift(&mut prot).unwrap();
4378 let stats = hdr.data_page_header.unwrap().statistics;
4379 assert!(stats.is_some());
4380 let stats = stats.unwrap();
4381 assert!(!stats.is_max_value_exact.unwrap());
4383 assert!(!stats.is_min_value_exact.unwrap());
4384 assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
4385 assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
4386
4387 let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..];
4388 // decode the second column's page header, which follows the first page's data
4389 let mut prot = ThriftSliceInputProtocol::new(second_page);
4390 let hdr = PageHeader::read_thrift(&mut prot).unwrap();
4391 let stats = hdr.data_page_header.unwrap().statistics;
4392 assert!(stats.is_some());
4393 let stats = stats.unwrap();
4394 assert!(!stats.is_max_value_exact.unwrap());
4396 assert!(!stats.is_min_value_exact.unwrap());
4397 assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
4398 assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
4399 }
4400
4401 #[test]
4402 fn test_page_encoding_statistics_roundtrip() {
4403 let batch_schema = Schema::new(vec![Field::new(
4404 "int32",
4405 arrow_schema::DataType::Int32,
4406 false,
4407 )]);
4408
4409 let batch = RecordBatch::try_new(
4410 Arc::new(batch_schema.clone()),
4411 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4412 )
4413 .unwrap();
4414
4415 let mut file: File = tempfile::tempfile().unwrap();
4416 let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
4417 writer.write(&batch).unwrap();
4418 let file_metadata = writer.close().unwrap();
4419
4420 assert_eq!(file_metadata.num_row_groups(), 1);
4421 assert_eq!(file_metadata.row_group(0).num_columns(), 1);
4422 assert!(
4423 file_metadata
4424 .row_group(0)
4425 .column(0)
4426 .page_encoding_stats()
4427 .is_some()
4428 );
4429 let chunk_page_stats = file_metadata
4430 .row_group(0)
4431 .column(0)
4432 .page_encoding_stats()
4433 .unwrap();
4434
4435 let options = ReadOptionsBuilder::new().with_page_index().build();
4437 let reader = SerializedFileReader::new_with_options(file, options).unwrap();
4438
4439 let rowgroup = reader.get_row_group(0).expect("row group missing");
4440 assert_eq!(rowgroup.num_columns(), 1);
4441 let column = rowgroup.metadata().column(0);
4442 assert!(column.page_encoding_stats().is_some());
4443 let file_page_stats = column.page_encoding_stats().unwrap();
4444 assert_eq!(chunk_page_stats, file_page_stats);
4445 }
4446
4447 #[test]
4448 fn test_different_dict_page_size_limit() {
4449 let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
4450 let schema = Arc::new(Schema::new(vec![
4451 Field::new("col0", arrow_schema::DataType::Int64, false),
4452 Field::new("col1", arrow_schema::DataType::Int64, false),
4453 ]));
4454 let batch =
4455 arrow_array::RecordBatch::try_new(schema.clone(), vec![array.clone(), array]).unwrap();
4456
4457 let props = WriterProperties::builder()
4458 .set_dictionary_page_size_limit(1024 * 1024)
4459 .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
4460 .build();
4461 let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
4462 writer.write(&batch).unwrap();
4463 let data = Bytes::from(writer.into_inner().unwrap());
4464
4465 let mut metadata = ParquetMetaDataReader::new();
4466 metadata.try_parse(&data).unwrap();
4467 let metadata = metadata.finish().unwrap();
4468 let col0_meta = metadata.row_group(0).column(0);
4469 let col1_meta = metadata.row_group(0).column(1);
4470
4471 let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
4472 let mut reader =
4473 SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
4474 let page = reader.get_next_page().unwrap().unwrap();
4475 match page {
4476 Page::DictionaryPage { buf, .. } => buf.len(),
4477 _ => panic!("expected DictionaryPage"),
4478 }
4479 };
4480
4481 assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
4482 assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
4483 }
4484 }