use bytes::Bytes;
use std::io::{Read, Write};
use std::iter::Peekable;
use std::slice::Iter;
use std::sync::{Arc, Mutex};
use std::vec::IntoIter;
use thrift::protocol::TCompactOutputProtocol;

use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};

use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};

use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
use crate::arrow::ArrowSchemaConverter;
use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
use crate::column::page_encryption::PageEncryptor;
use crate::column::writer::encoder::ColumnValueEncoder;
use crate::column::writer::{
    get_column_writer, ColumnCloseResult, ColumnWriter, GenericColumnWriter,
};
use crate::data_type::{ByteArray, FixedLenByteArray};
#[cfg(feature = "encryption")]
use crate::encryption::encrypt::FileEncryptor;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{KeyValue, RowGroupMetaData};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
use crate::thrift::TSerializable;
use levels::{calculate_array_levels, ArrayLevels};

mod byte_array;
mod levels;

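/// Encodes [`RecordBatch`]es into buffered row groups and writes them to a
/// Parquet byte stream via an inner [`SerializedFileWriter`].
///
/// A minimal usage sketch, writing a single batch to an in-memory buffer:
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
/// # use parquet::arrow::arrow_writer::ArrowWriter;
/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
/// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
///
/// let mut buffer = Vec::new();
/// let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
/// writer.write(&batch).unwrap();
/// writer.close().unwrap();
/// assert!(!buffer.is_empty());
/// ```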
pub struct ArrowWriter<W: Write> {
    /// Underlying Parquet writer
    writer: SerializedFileWriter<W>,

    /// The in-progress row group, if any
    in_progress: Option<ArrowRowGroupWriter>,

    /// A copy of the Arrow schema being written
    arrow_schema: SchemaRef,

    /// Creates new [`ArrowRowGroupWriter`] instances as required
    row_group_writer_factory: ArrowRowGroupWriterFactory,

    /// The maximum number of rows buffered in a row group
    max_row_group_size: usize,
}

impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let buffered_memory = self.in_progress_size();
        f.debug_struct("ArrowWriter")
            .field("writer", &self.writer)
            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
            .field("in_progress_rows", &self.in_progress_rows())
            .field("arrow_schema", &self.arrow_schema)
            .field("max_row_group_size", &self.max_row_group_size)
            .finish()
    }
}

impl<W: Write + Send> ArrowWriter<W> {
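    /// Tries to create a new [`ArrowWriter`] that writes batches matching
    /// `arrow_schema` to `writer`, using the provided [`WriterProperties`]
    /// (or their defaults) via [`ArrowWriterOptions`].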
    pub fn try_new(
        writer: W,
        arrow_schema: SchemaRef,
        props: Option<WriterProperties>,
    ) -> Result<Self> {
        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
        Self::try_new_with_options(writer, arrow_schema, options)
    }

    pub fn try_new_with_options(
        writer: W,
        arrow_schema: SchemaRef,
        options: ArrowWriterOptions,
    ) -> Result<Self> {
        let mut props = options.properties;
        let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
        if let Some(schema_root) = &options.schema_root {
            converter = converter.schema_root(schema_root);
        }
        let schema = converter.convert(&arrow_schema)?;
        if !options.skip_arrow_metadata {
            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
        }

        let max_row_group_size = props.max_row_group_size();

        let props_ptr = Arc::new(props);
        let file_writer =
            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;

        let row_group_writer_factory =
            ArrowRowGroupWriterFactory::new(&file_writer, schema, arrow_schema.clone(), props_ptr);

        Ok(Self {
            writer: file_writer,
            in_progress: None,
            arrow_schema,
            row_group_writer_factory,
            max_row_group_size,
        })
    }

    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        self.writer.flushed_row_groups()
    }

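    /// Returns an estimate of the memory currently retained by the in-progress
    /// row group, in bytes. Compare with [`Self::in_progress_size`], which
    /// instead estimates how many encoded bytes the in-progress row group will
    /// occupy once flushed.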
    pub fn memory_size(&self) -> usize {
        match &self.in_progress {
            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
            None => 0,
        }
    }

    pub fn in_progress_size(&self) -> usize {
        match &self.in_progress {
            Some(in_progress) => in_progress
                .writers
                .iter()
                .map(|x| x.get_estimated_total_bytes())
                .sum(),
            None => 0,
        }
    }

    pub fn in_progress_rows(&self) -> usize {
        self.in_progress
            .as_ref()
            .map(|x| x.buffered_rows)
            .unwrap_or_default()
    }

    pub fn bytes_written(&self) -> usize {
        self.writer.bytes_written()
    }

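    /// Buffers `batch` into the in-progress row group, creating one if needed.
    /// Batches are split so that no row group exceeds `max_row_group_size`
    /// rows, and a full row group is flushed automatically; empty batches are
    /// ignored.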
    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        let in_progress = match &mut self.in_progress {
            Some(in_progress) => in_progress,
            x => x.insert(
                self.row_group_writer_factory
                    .create_row_group_writer(self.writer.flushed_row_groups().len())?,
            ),
        };

        if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size {
            let to_write = self.max_row_group_size - in_progress.buffered_rows;
            let a = batch.slice(0, to_write);
            let b = batch.slice(to_write, batch.num_rows() - to_write);
            self.write(&a)?;
            return self.write(&b);
        }

        in_progress.write(batch)?;

        if in_progress.buffered_rows >= self.max_row_group_size {
            self.flush()?
        }
        Ok(())
    }

    pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
        self.writer.write_all(buf)
    }

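    /// Flushes the in-progress row group, if any, writing its column chunks to
    /// the underlying [`SerializedFileWriter`].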
    pub fn flush(&mut self) -> Result<()> {
        let in_progress = match self.in_progress.take() {
            Some(in_progress) => in_progress,
            None => return Ok(()),
        };

        let mut row_group_writer = self.writer.next_row_group()?;
        for chunk in in_progress.close()? {
            chunk.append_to_row_group(&mut row_group_writer)?;
        }
        row_group_writer.close()?;
        Ok(())
    }

    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
        self.writer.append_key_value_metadata(kv_metadata)
    }

    pub fn inner(&self) -> &W {
        self.writer.inner()
    }

    pub fn inner_mut(&mut self) -> &mut W {
        self.writer.inner_mut()
    }

    pub fn into_inner(mut self) -> Result<W> {
        self.flush()?;
        self.writer.into_inner()
    }

    pub fn finish(&mut self) -> Result<crate::format::FileMetaData> {
        self.flush()?;
        self.writer.finish()
    }

    pub fn close(mut self) -> Result<crate::format::FileMetaData> {
        self.finish()
    }

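    /// Flushes any in-progress row group and returns a fresh set of
    /// [`ArrowColumnWriter`]s, one per leaf column, allowing columns to be
    /// encoded independently (for example on separate threads). The resulting
    /// [`ArrowColumnChunk`]s can then be appended with
    /// [`Self::append_row_group`].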
    pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
        self.flush()?;
        let in_progress = self
            .row_group_writer_factory
            .create_row_group_writer(self.writer.flushed_row_groups().len())?;
        Ok(in_progress.writers)
    }

    pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
        let mut row_group_writer = self.writer.next_row_group()?;
        for chunk in chunks {
            chunk.append_to_row_group(&mut row_group_writer)?;
        }
        row_group_writer.close()?;
        Ok(())
    }
}

impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch).map_err(|e| e.into())
    }

    fn close(self) -> std::result::Result<(), ArrowError> {
        self.close()?;
        Ok(())
    }
}

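/// Builder-style options for [`ArrowWriter::try_new_with_options`].
///
/// A minimal sketch of configuring a writer that omits the embedded Arrow
/// schema metadata:
///
/// ```
/// # use parquet::arrow::arrow_writer::ArrowWriterOptions;
/// # use parquet::file::properties::WriterProperties;
/// let options = ArrowWriterOptions::new()
///     .with_properties(WriterProperties::default())
///     .with_skip_arrow_metadata(true);
/// ```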
#[derive(Debug, Clone, Default)]
pub struct ArrowWriterOptions {
    properties: WriterProperties,
    skip_arrow_metadata: bool,
    schema_root: Option<String>,
}

impl ArrowWriterOptions {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_properties(self, properties: WriterProperties) -> Self {
        Self { properties, ..self }
    }

    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

    pub fn with_schema_root(self, schema_root: String) -> Self {
        Self {
            schema_root: Some(schema_root),
            ..self
        }
    }
}

#[derive(Default)]
struct ArrowColumnChunkData {
    length: usize,
    data: Vec<Bytes>,
}

impl Length for ArrowColumnChunkData {
    fn len(&self) -> u64 {
        self.length as _
    }
}

impl ChunkReader for ArrowColumnChunkData {
    type T = ArrowColumnChunkReader;

    fn get_read(&self, start: u64) -> Result<Self::T> {
        assert_eq!(start, 0);
        Ok(ArrowColumnChunkReader(
            self.data.clone().into_iter().peekable(),
        ))
    }

    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
        unimplemented!()
    }
}

struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);

impl Read for ArrowColumnChunkReader {
    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
        let buffer = loop {
            match self.0.peek_mut() {
                Some(b) if b.is_empty() => {
                    self.0.next();
                    continue;
                }
                Some(b) => break b,
                None => return Ok(0),
            }
        };

        let len = buffer.len().min(out.len());
        let b = buffer.split_to(len);
        out[..len].copy_from_slice(&b);
        Ok(len)
    }
}

type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;

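/// A [`PageWriter`] that appends serialized page headers and compressed page
/// data to an in-memory [`SharedColumnChunk`] instead of writing to a file.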
#[derive(Default)]
struct ArrowPageWriter {
    buffer: SharedColumnChunk,
    #[cfg(feature = "encryption")]
    page_encryptor: Option<PageEncryptor>,
}

impl ArrowPageWriter {
    #[cfg(feature = "encryption")]
    pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
        self.page_encryptor = page_encryptor;
        self
    }

    #[cfg(feature = "encryption")]
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        self.page_encryptor.as_mut()
    }

    #[cfg(not(feature = "encryption"))]
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        None
    }
}

impl PageWriter for ArrowPageWriter {
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
        let page = match self.page_encryptor_mut() {
            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
            None => page,
        };

        let page_header = page.to_thrift_header()?;
        let header = {
            let mut header = Vec::with_capacity(1024);

            match self.page_encryptor_mut() {
                Some(page_encryptor) => {
                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
                    if page.compressed_page().is_data_page() {
                        page_encryptor.increment_page();
                    }
                }
                None => {
                    let mut protocol = TCompactOutputProtocol::new(&mut header);
                    page_header.write_to_out_protocol(&mut protocol)?;
                }
            };

            Bytes::from(header)
        };

        let mut buf = self.buffer.try_lock().unwrap();

        let data = page.compressed_page().buffer().clone();
        let compressed_size = data.len() + header.len();

        let mut spec = PageWriteSpec::new();
        spec.page_type = page.page_type();
        spec.num_values = page.num_values();
        spec.uncompressed_size = page.uncompressed_size() + header.len();
        spec.offset = buf.length as u64;
        spec.compressed_size = compressed_size;
        spec.bytes_written = compressed_size as u64;

        buf.length += compressed_size;
        buf.data.push(header);
        buf.data.push(data);

        Ok(spec)
    }

    fn close(&mut self) -> Result<()> {
        Ok(())
    }
}

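/// The data and repetition/definition levels of a single Parquet leaf column,
/// computed from an Arrow array by [`compute_leaves`].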
#[derive(Debug)]
pub struct ArrowLeafColumn(ArrayLevels);

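/// Computes the [`ArrowLeafColumn`]s for `array`: one entry per Parquet leaf
/// column that `field` maps to (nested types can map to several leaves).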
pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
    let levels = calculate_array_levels(array, field)?;
    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
}

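/// An encoded Parquet column chunk held in memory, produced by
/// [`ArrowColumnWriter::close`] and appended to a row group with
/// [`ArrowColumnChunk::append_to_row_group`].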
pub struct ArrowColumnChunk {
    data: ArrowColumnChunkData,
    close: ColumnCloseResult,
}

impl std::fmt::Debug for ArrowColumnChunk {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowColumnChunk")
            .field("length", &self.data.length)
            .finish_non_exhaustive()
    }
}

impl ArrowColumnChunk {
    pub fn append_to_row_group<W: Write + Send>(
        self,
        writer: &mut SerializedRowGroupWriter<'_, W>,
    ) -> Result<()> {
        writer.append_column(&self.data, self.close)
    }
}

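/// Encodes [`ArrowLeafColumn`]s into an in-memory [`ArrowColumnChunk`].
///
/// A minimal single-threaded sketch of the low-level API for a schema with a
/// single leaf column (the writers returned by [`get_column_writers`] can
/// equally be moved to separate threads):
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
/// # use parquet::arrow::ArrowSchemaConverter;
/// # use parquet::arrow::arrow_writer::{compute_leaves, get_column_writers};
/// # use parquet::file::properties::WriterProperties;
/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
/// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
/// let schema = batch.schema();
///
/// let props = Arc::new(WriterProperties::default());
/// let parquet_schema = ArrowSchemaConverter::new()
///     .with_coerce_types(props.coerce_types())
///     .convert(&schema)
///     .unwrap();
///
/// // One writer per leaf column; this schema has exactly one
/// let mut writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
/// let mut writer = writers.pop().unwrap();
///
/// for leaf in compute_leaves(schema.field(0), batch.column(0)).unwrap() {
///     writer.write(&leaf).unwrap();
/// }
/// let _chunk = writer.close().unwrap();
/// ```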
pub struct ArrowColumnWriter {
    writer: ArrowColumnWriterImpl,
    chunk: SharedColumnChunk,
}

impl std::fmt::Debug for ArrowColumnWriter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
    }
}

enum ArrowColumnWriterImpl {
    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
    Column(ColumnWriter<'static>),
}

impl ArrowColumnWriter {
    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
        match &mut self.writer {
            ArrowColumnWriterImpl::Column(c) => {
                write_leaf(c, &col.0)?;
            }
            ArrowColumnWriterImpl::ByteArray(c) => {
                write_primitive(c, col.0.array().as_ref(), &col.0)?;
            }
        }
        Ok(())
    }

    pub fn close(self) -> Result<ArrowColumnChunk> {
        let close = match self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
            ArrowColumnWriterImpl::Column(c) => c.close()?,
        };
        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
        let data = chunk.into_inner().unwrap();
        Ok(ArrowColumnChunk { data, close })
    }

    pub fn memory_size(&self) -> usize {
        match &self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
        }
    }

    pub fn get_estimated_total_bytes(&self) -> usize {
        match &self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
        }
    }
}

struct ArrowRowGroupWriter {
    writers: Vec<ArrowColumnWriter>,
    schema: SchemaRef,
    buffered_rows: usize,
}

impl ArrowRowGroupWriter {
    fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
        Self {
            writers,
            schema: arrow.clone(),
            buffered_rows: 0,
        }
    }

    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        self.buffered_rows += batch.num_rows();
        let mut writers = self.writers.iter_mut();
        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
            for leaf in compute_leaves(field.as_ref(), column)? {
                writers.next().unwrap().write(&leaf)?
            }
        }
        Ok(())
    }

    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
        self.writers
            .into_iter()
            .map(|writer| writer.close())
            .collect()
    }
}

struct ArrowRowGroupWriterFactory {
    schema: SchemaDescriptor,
    arrow_schema: SchemaRef,
    props: WriterPropertiesPtr,
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}

impl ArrowRowGroupWriterFactory {
    #[cfg(feature = "encryption")]
    fn new<W: Write + Send>(
        file_writer: &SerializedFileWriter<W>,
        schema: SchemaDescriptor,
        arrow_schema: SchemaRef,
        props: WriterPropertiesPtr,
    ) -> Self {
        Self {
            schema,
            arrow_schema,
            props,
            file_encryptor: file_writer.file_encryptor(),
        }
    }

    #[cfg(not(feature = "encryption"))]
    fn new<W: Write + Send>(
        _file_writer: &SerializedFileWriter<W>,
        schema: SchemaDescriptor,
        arrow_schema: SchemaRef,
        props: WriterPropertiesPtr,
    ) -> Self {
        Self {
            schema,
            arrow_schema,
            props,
        }
    }

    #[cfg(feature = "encryption")]
    fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
        let writers = get_column_writers_with_encryptor(
            &self.schema,
            &self.props,
            &self.arrow_schema,
            self.file_encryptor.clone(),
            row_group_index,
        )?;
        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
    }

    #[cfg(not(feature = "encryption"))]
    fn create_row_group_writer(&self, _row_group_index: usize) -> Result<ArrowRowGroupWriter> {
        let writers = get_column_writers(&self.schema, &self.props, &self.arrow_schema)?;
        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
    }
}

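/// Returns one [`ArrowColumnWriter`] per leaf column described by `parquet`,
/// in schema order, matching the fields of `arrow`.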
pub fn get_column_writers(
    parquet: &SchemaDescriptor,
    props: &WriterPropertiesPtr,
    arrow: &SchemaRef,
) -> Result<Vec<ArrowColumnWriter>> {
    let mut writers = Vec::with_capacity(arrow.fields.len());
    let mut leaves = parquet.columns().iter();
    let column_factory = ArrowColumnWriterFactory::new();
    for field in &arrow.fields {
        column_factory.get_arrow_column_writer(
            field.data_type(),
            props,
            &mut leaves,
            &mut writers,
        )?;
    }
    Ok(writers)
}

#[cfg(feature = "encryption")]
fn get_column_writers_with_encryptor(
    parquet: &SchemaDescriptor,
    props: &WriterPropertiesPtr,
    arrow: &SchemaRef,
    file_encryptor: Option<Arc<FileEncryptor>>,
    row_group_index: usize,
) -> Result<Vec<ArrowColumnWriter>> {
    let mut writers = Vec::with_capacity(arrow.fields.len());
    let mut leaves = parquet.columns().iter();
    let column_factory =
        ArrowColumnWriterFactory::new().with_file_encryptor(row_group_index, file_encryptor);
    for field in &arrow.fields {
        column_factory.get_arrow_column_writer(
            field.data_type(),
            props,
            &mut leaves,
            &mut writers,
        )?;
    }
    Ok(writers)
}

struct ArrowColumnWriterFactory {
    #[cfg(feature = "encryption")]
    row_group_index: usize,
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}

impl ArrowColumnWriterFactory {
    pub fn new() -> Self {
        Self {
            #[cfg(feature = "encryption")]
            row_group_index: 0,
            #[cfg(feature = "encryption")]
            file_encryptor: None,
        }
    }

    #[cfg(feature = "encryption")]
    pub fn with_file_encryptor(
        mut self,
        row_group_index: usize,
        file_encryptor: Option<Arc<FileEncryptor>>,
    ) -> Self {
        self.row_group_index = row_group_index;
        self.file_encryptor = file_encryptor;
        self
    }

    #[cfg(feature = "encryption")]
    fn create_page_writer(
        &self,
        column_descriptor: &ColumnDescPtr,
        column_index: usize,
    ) -> Result<Box<ArrowPageWriter>> {
        let column_path = column_descriptor.path().string();
        let page_encryptor = PageEncryptor::create_if_column_encrypted(
            &self.file_encryptor,
            self.row_group_index,
            column_index,
            &column_path,
        )?;
        Ok(Box::new(
            ArrowPageWriter::default().with_encryptor(page_encryptor),
        ))
    }

    #[cfg(not(feature = "encryption"))]
    fn create_page_writer(
        &self,
        _column_descriptor: &ColumnDescPtr,
        _column_index: usize,
    ) -> Result<Box<ArrowPageWriter>> {
        Ok(Box::<ArrowPageWriter>::default())
    }

    fn get_arrow_column_writer(
        &self,
        data_type: &ArrowDataType,
        props: &WriterPropertiesPtr,
        leaves: &mut Iter<'_, ColumnDescPtr>,
        out: &mut Vec<ArrowColumnWriter>,
    ) -> Result<()> {
        let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
            let page_writer = self.create_page_writer(desc, out.len())?;
            let chunk = page_writer.buffer.clone();
            let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
            Ok(ArrowColumnWriter {
                chunk,
                writer: ArrowColumnWriterImpl::Column(writer),
            })
        };

        let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
            let page_writer = self.create_page_writer(desc, out.len())?;
            let chunk = page_writer.buffer.clone();
            let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
            Ok(ArrowColumnWriter {
                chunk,
                writer: ArrowColumnWriterImpl::ByteArray(writer),
            })
        };

        match data_type {
            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
                out.push(col(leaves.next().unwrap())?)
            }
            ArrowDataType::LargeBinary
            | ArrowDataType::Binary
            | ArrowDataType::Utf8
            | ArrowDataType::LargeUtf8
            | ArrowDataType::BinaryView
            | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
            ArrowDataType::List(f)
            | ArrowDataType::LargeList(f)
            | ArrowDataType::FixedSizeList(f, _) => {
                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
            }
            ArrowDataType::Struct(fields) => {
                for field in fields {
                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
                }
            }
            ArrowDataType::Map(f, _) => match f.data_type() {
                ArrowDataType::Struct(f) => {
                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
                }
                _ => unreachable!("invalid map type"),
            },
            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                ArrowDataType::Utf8
                | ArrowDataType::LargeUtf8
                | ArrowDataType::Binary
                | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
                    out.push(bytes(leaves.next().unwrap())?)
                }
                ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
                _ => out.push(col(leaves.next().unwrap())?),
            },
            _ => {
                return Err(ParquetError::NYI(format!(
                    "Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"
                )))
            }
        }
        Ok(())
    }
}

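/// Writes the values of a single leaf array to `writer`, casting or
/// re-encoding them as required by the target Parquet physical type.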
fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
    let column = levels.array().as_ref();
    let indices = levels.non_null_indices();
    match writer {
        ColumnWriter::Int32ColumnWriter(ref mut typed) => {
            match column.data_type() {
                ArrowDataType::Date64 => {
                    // Cast to Date32 first, then reinterpret the days as Int32
                    let array = arrow_cast::cast(column, &ArrowDataType::Date32)?;
                    let array = arrow_cast::cast(&array, &ArrowDataType::Int32)?;

                    let array = array.as_primitive::<Int32Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt32 => {
                    let values = column.as_primitive::<UInt32Type>().values();
                    // Reinterpret the u32 values as i32, mapping the upper half of the
                    // u32 range onto the negative i32 range
                    let array = values.inner().typed_data::<i32>();
                    write_primitive(typed, array, levels)
                }
                ArrowDataType::Decimal32(_, _) => {
                    let array = column
                        .as_primitive::<Decimal32Type>()
                        .unary::<_, Int32Type>(|v| v);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal64(_, _) => {
                    let array = column
                        .as_primitive::<Decimal64Type>()
                        .unary::<_, Int32Type>(|v| v as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal128(_, _) => {
                    let array = column
                        .as_primitive::<Decimal128Type>()
                        .unary::<_, Int32Type>(|v| v as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal256(_, _) => {
                    let array = column
                        .as_primitive::<Decimal256Type>()
                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                    ArrowDataType::Decimal32(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal32Type>()
                            .unary::<_, Int32Type>(|v| v);
                        write_primitive(typed, array.values(), levels)
                    }
                    ArrowDataType::Decimal64(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal64Type>()
                            .unary::<_, Int32Type>(|v| v as i32);
                        write_primitive(typed, array.values(), levels)
                    }
                    ArrowDataType::Decimal128(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal128Type>()
                            .unary::<_, Int32Type>(|v| v as i32);
                        write_primitive(typed, array.values(), levels)
                    }
                    ArrowDataType::Decimal256(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal256Type>()
                            .unary::<_, Int32Type>(|v| v.as_i128() as i32);
                        write_primitive(typed, array.values(), levels)
                    }
                    _ => {
                        let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
                        let array = array.as_primitive::<Int32Type>();
                        write_primitive(typed, array.values(), levels)
                    }
                },
                _ => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
                    let array = array.as_primitive::<Int32Type>();
                    write_primitive(typed, array.values(), levels)
                }
            }
        }
        ColumnWriter::BoolColumnWriter(ref mut typed) => {
            let array = column.as_boolean();
            typed.write_batch(
                get_bool_array_slice(array, indices).as_slice(),
                levels.def_levels(),
                levels.rep_levels(),
            )
        }
        ColumnWriter::Int64ColumnWriter(ref mut typed) => {
            match column.data_type() {
                ArrowDataType::Date64 => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;

                    let array = array.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Int64 => {
                    let array = column.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt64 => {
                    let values = column.as_primitive::<UInt64Type>().values();
                    // Reinterpret the u64 values as i64, mapping the upper half of the
                    // u64 range onto the negative i64 range
                    let array = values.inner().typed_data::<i64>();
                    write_primitive(typed, array, levels)
                }
                ArrowDataType::Decimal64(_, _) => {
                    let array = column
                        .as_primitive::<Decimal64Type>()
                        .unary::<_, Int64Type>(|v| v);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal128(_, _) => {
                    let array = column
                        .as_primitive::<Decimal128Type>()
                        .unary::<_, Int64Type>(|v| v as i64);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal256(_, _) => {
                    let array = column
                        .as_primitive::<Decimal256Type>()
                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                    ArrowDataType::Decimal64(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal64Type>()
                            .unary::<_, Int64Type>(|v| v);
                        write_primitive(typed, array.values(), levels)
                    }
                    ArrowDataType::Decimal128(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal128Type>()
                            .unary::<_, Int64Type>(|v| v as i64);
                        write_primitive(typed, array.values(), levels)
                    }
                    ArrowDataType::Decimal256(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal256Type>()
                            .unary::<_, Int64Type>(|v| v.as_i128() as i64);
                        write_primitive(typed, array.values(), levels)
                    }
                    _ => {
                        let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
                        let array = array.as_primitive::<Int64Type>();
                        write_primitive(typed, array.values(), levels)
                    }
                },
                _ => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
                    let array = array.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
            }
        }
        ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
            unreachable!("Currently unreachable because data type not supported")
        }
        ColumnWriter::FloatColumnWriter(ref mut typed) => {
            let array = column.as_primitive::<Float32Type>();
            write_primitive(typed, array.values(), levels)
        }
        ColumnWriter::DoubleColumnWriter(ref mut typed) => {
            let array = column.as_primitive::<Float64Type>();
            write_primitive(typed, array.values(), levels)
        }
        ColumnWriter::ByteArrayColumnWriter(_) => {
            unreachable!("should use ByteArrayWriter")
        }
        ColumnWriter::FixedLenByteArrayColumnWriter(ref mut typed) => {
            let bytes = match column.data_type() {
                ArrowDataType::Interval(interval_unit) => match interval_unit {
                    IntervalUnit::YearMonth => {
                        let array = column
                            .as_any()
                            .downcast_ref::<arrow_array::IntervalYearMonthArray>()
                            .unwrap();
                        get_interval_ym_array_slice(array, indices)
                    }
                    IntervalUnit::DayTime => {
                        let array = column
                            .as_any()
                            .downcast_ref::<arrow_array::IntervalDayTimeArray>()
                            .unwrap();
                        get_interval_dt_array_slice(array, indices)
                    }
                    _ => {
                        return Err(ParquetError::NYI(
                            format!(
                                "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
                            )
                        ));
                    }
                },
                ArrowDataType::FixedSizeBinary(_) => {
                    let array = column
                        .as_any()
                        .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
                        .unwrap();
                    get_fsb_array_slice(array, indices)
                }
                ArrowDataType::Decimal32(_, _) => {
                    let array = column.as_primitive::<Decimal32Type>();
                    get_decimal_32_array_slice(array, indices)
                }
                ArrowDataType::Decimal64(_, _) => {
                    let array = column.as_primitive::<Decimal64Type>();
                    get_decimal_64_array_slice(array, indices)
                }
                ArrowDataType::Decimal128(_, _) => {
                    let array = column.as_primitive::<Decimal128Type>();
                    get_decimal_128_array_slice(array, indices)
                }
                ArrowDataType::Decimal256(_, _) => {
                    let array = column
                        .as_any()
                        .downcast_ref::<arrow_array::Decimal256Array>()
                        .unwrap();
                    get_decimal_256_array_slice(array, indices)
                }
                ArrowDataType::Float16 => {
                    let array = column.as_primitive::<Float16Type>();
                    get_float_16_array_slice(array, indices)
                }
                _ => {
                    return Err(ParquetError::NYI(
                        "Attempting to write an Arrow type that is not yet implemented".to_string(),
                    ));
                }
            };
            typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
        }
    }
}

fn write_primitive<E: ColumnValueEncoder>(
    writer: &mut GenericColumnWriter<E>,
    values: &E::Values,
    levels: &ArrayLevels,
) -> Result<usize> {
    writer.write_batch_internal(
        values,
        Some(levels.non_null_indices()),
        levels.def_levels(),
        levels.rep_levels(),
        None,
        None,
        None,
    )
}

fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        values.push(array.value(*i))
    }
    values
}

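/// Returns the year-month interval values at `indices` as 12-byte Parquet
/// INTERVAL values: 4 little-endian bytes of months followed by 8 zero bytes
/// for the unused days and milliseconds fields.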
fn get_interval_ym_array_slice(
    array: &arrow_array::IntervalYearMonthArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let mut value = array.value(*i).to_le_bytes().to_vec();
        let mut suffix = vec![0; 8];
        value.append(&mut suffix);
        values.push(FixedLenByteArray::from(ByteArray::from(value)))
    }
    values
}

fn get_interval_dt_array_slice(
    array: &arrow_array::IntervalDayTimeArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let mut out = [0; 12];
        let value = array.value(*i);
        out[4..8].copy_from_slice(&value.days.to_le_bytes());
        out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
        values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
    }
    values
}

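/// Returns the decimal values at `indices` as big-endian byte arrays truncated
/// to the minimal width implied by the array's precision (see
/// [`decimal_length_from_precision`]).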
fn get_decimal_32_array_slice(
    array: &arrow_array::Decimal32Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    let size = decimal_length_from_precision(array.precision());
    for i in indices {
        let as_be_bytes = array.value(*i).to_be_bytes();
        let resized_value = as_be_bytes[(4 - size)..].to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
    }
    values
}

fn get_decimal_64_array_slice(
    array: &arrow_array::Decimal64Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    let size = decimal_length_from_precision(array.precision());
    for i in indices {
        let as_be_bytes = array.value(*i).to_be_bytes();
        let resized_value = as_be_bytes[(8 - size)..].to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
    }
    values
}

fn get_decimal_128_array_slice(
    array: &arrow_array::Decimal128Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    let size = decimal_length_from_precision(array.precision());
    for i in indices {
        let as_be_bytes = array.value(*i).to_be_bytes();
        let resized_value = as_be_bytes[(16 - size)..].to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
    }
    values
}

fn get_decimal_256_array_slice(
    array: &arrow_array::Decimal256Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    let size = decimal_length_from_precision(array.precision());
    for i in indices {
        let as_be_bytes = array.value(*i).to_be_bytes();
        let resized_value = as_be_bytes[(32 - size)..].to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
    }
    values
}

fn get_float_16_array_slice(
    array: &arrow_array::Float16Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let value = array.value(*i).to_le_bytes().to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(value)));
    }
    values
}

fn get_fsb_array_slice(
    array: &arrow_array::FixedSizeBinaryArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let value = array.value(*i).to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(value)))
    }
    values
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::fs::File;
    use std::io::Seek;

    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
    use crate::arrow::ARROW_SCHEMA_META_KEY;
    use crate::column::page::{Page, PageReader};
    use crate::file::page_encoding_stats::PageEncodingStats;
    use crate::file::reader::SerializedPageReader;
    use crate::format::PageHeader;
    use crate::schema::types::ColumnPath;
    use crate::thrift::TCompactSliceInputProtocol;
    use arrow::datatypes::ToByteSlice;
    use arrow::datatypes::{DataType, Schema};
    use arrow::error::Result as ArrowResult;
    use arrow::util::data_gen::create_random_array;
    use arrow::util::pretty::pretty_format_batches;
    use arrow::{array::*, buffer::Buffer};
    use arrow_buffer::{i256, IntervalDayTime, IntervalMonthDayNano, NullBuffer};
    use arrow_schema::Fields;
    use half::f16;
    use num::{FromPrimitive, ToPrimitive};
    use tempfile::tempfile;

    use crate::basic::Encoding;
    use crate::data_type::AsBytes;
    use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
    use crate::file::page_index::index::Index;
    use crate::file::properties::{
        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
    };
    use crate::file::serialized_reader::ReadOptionsBuilder;
    use crate::file::{
        reader::{FileReader, SerializedFileReader},
        statistics::Statistics,
    };

    #[test]
    fn arrow_writer() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, true),
        ]);

        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
        let mut buffer = vec![];

        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
        writer.write(expected_batch).unwrap();
        writer.close().unwrap();

        buffer
    }

    fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
        let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
        writer.write(expected_batch).unwrap();
        writer.into_inner().unwrap()
    }

    #[test]
    fn roundtrip_bytes() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, true),
        ]));

        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);

        let expected_batch =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();

        for buffer in [
            get_bytes_after_close(schema.clone(), &expected_batch),
            get_bytes_by_into_inner(schema, &expected_batch),
        ] {
            let cursor = Bytes::from(buffer);
            let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();

            let actual_batch = record_batch_reader
                .next()
                .expect("No batch found")
                .expect("Unable to get batch");

            assert_eq!(expected_batch.schema(), actual_batch.schema());
            assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
            assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
            for i in 0..expected_batch.num_columns() {
                let expected_data = expected_batch.column(i).to_data();
                let actual_data = actual_batch.column(i).to_data();

                assert_eq!(expected_data, actual_data);
            }
        }
    }

    #[test]
    fn arrow_writer_non_null() {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn arrow_writer_list() {
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
            true,
        )]);

        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());

        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
            DataType::Int32,
            false,
        ))))
        .len(5)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        assert_eq!(batch.column(0).null_count(), 1);

        roundtrip(batch, None);
    }

    #[test]
    fn arrow_writer_list_non_null() {
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
            false,
        )]);

        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());

        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
            DataType::Int32,
            false,
        ))))
        .len(5)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        assert_eq!(batch.column(0).null_count(), 0);

        roundtrip(batch, None);
    }

    #[test]
    fn arrow_writer_binary() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let binary_field = Field::new("b", DataType::Binary, false);
        let schema = Schema::new(vec![string_field, binary_field]);

        let raw_string_values = vec!["foo", "bar", "baz", "quux"];
        let raw_binary_values = [
            b"foo".to_vec(),
            b"bar".to_vec(),
            b"baz".to_vec(),
            b"quux".to_vec(),
        ];
        let raw_binary_value_refs = raw_binary_values
            .iter()
            .map(|x| x.as_slice())
            .collect::<Vec<_>>();

        let string_values = StringArray::from(raw_string_values.clone());
        let binary_values = BinaryArray::from(raw_binary_value_refs);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(string_values), Arc::new(binary_values)],
        )
        .unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn arrow_writer_binary_view() {
        let string_field = Field::new("a", DataType::Utf8View, false);
        let binary_field = Field::new("b", DataType::BinaryView, false);
        let nullable_string_field = Field::new("a", DataType::Utf8View, true);
        let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);

        let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
        let raw_binary_values = vec![
            b"foo".to_vec(),
            b"bar".to_vec(),
            b"large payload over 12 bytes".to_vec(),
            b"lulu".to_vec(),
        ];
        let nullable_string_values =
            vec![Some("foo"), None, Some("large payload over 12 bytes"), None];

        let string_view_values = StringViewArray::from(raw_string_values);
        let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
        let nullable_string_view_values = StringViewArray::from(nullable_string_values);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![
                Arc::new(string_view_values),
                Arc::new(binary_view_values),
                Arc::new(nullable_string_view_values),
            ],
        )
        .unwrap();

        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
        roundtrip(batch, None);
    }

    fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
        let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
        let schema = Schema::new(vec![decimal_field]);

        let decimal_values = vec![10_000, 50_000, 0, -100]
            .into_iter()
            .map(Some)
            .collect::<Decimal128Array>()
            .with_precision_and_scale(precision, scale)
            .unwrap();

        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
    }

    #[test]
    fn arrow_writer_decimal() {
        let batch_int32_decimal = get_decimal_batch(5, 2);
        roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
        let batch_int64_decimal = get_decimal_batch(12, 2);
        roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
        let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
        roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn arrow_writer_complex() {
        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
        let struct_field_g = Arc::new(Field::new_list(
            "g",
            Field::new_list_field(DataType::Int16, true),
            false,
        ));
        let struct_field_h = Arc::new(Field::new_list(
            "h",
            Field::new_list_field(DataType::Int16, false),
            true,
        ));
        let struct_field_e = Arc::new(Field::new_struct(
            "e",
            vec![
                struct_field_f.clone(),
                struct_field_g.clone(),
                struct_field_h.clone(),
            ],
            false,
        ));
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, true),
            Field::new_struct(
                "c",
                vec![struct_field_d.clone(), struct_field_e.clone()],
                false,
            ),
        ]);

        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);

        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());

        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
            .len(5)
            .add_buffer(g_value_offsets.clone())
            .add_child_data(g_value.to_data())
            .build()
            .unwrap();
        let g = ListArray::from(g_list_data);
        let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
            .len(5)
            .add_buffer(g_value_offsets)
            .add_child_data(g_value.to_data())
            .null_bit_buffer(Some(Buffer::from([0b00011011])))
            .build()
            .unwrap();
        let h = ListArray::from(h_list_data);

        let e = StructArray::from(vec![
            (struct_field_f, Arc::new(f) as ArrayRef),
            (struct_field_g, Arc::new(g) as ArrayRef),
            (struct_field_h, Arc::new(h) as ArrayRef),
        ]);

        let c = StructArray::from(vec![
            (struct_field_d, Arc::new(d) as ArrayRef),
            (struct_field_e, Arc::new(e) as ArrayRef),
        ]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
        )
        .unwrap();

        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
        roundtrip(batch, Some(SMALL_SIZE / 3));
    }

    #[test]
    fn arrow_writer_complex_mixed() {
        let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
        let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
        let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
        let schema = Schema::new(vec![Field::new(
            "some_nested_object",
            DataType::Struct(Fields::from(vec![
                offset_field.clone(),
                partition_field.clone(),
                topic_field.clone(),
            ])),
            false,
        )]);

        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
        let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);

        let some_nested_object = StructArray::from(vec![
            (offset_field, Arc::new(offset) as ArrayRef),
            (partition_field, Arc::new(partition) as ArrayRef),
            (topic_field, Arc::new(topic) as ArrayRef),
        ]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn arrow_writer_map() {
        let json_content = r#"
        {"stocks":{"long": "$AAA", "short": "$BBB"}}
        {"stocks":{"long": null, "long": "$CCC", "short": null}}
        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
        "#;
        let entries_struct_type = DataType::Struct(Fields::from(vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, true),
        ]));
        let stocks_field = Field::new(
            "stocks",
            DataType::Map(
                Arc::new(Field::new("entries", entries_struct_type, false)),
                false,
            ),
            true,
        );
        let schema = Arc::new(Schema::new(vec![stocks_field]));
        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();

        let batch = reader.next().unwrap().unwrap();
        roundtrip(batch, None);
    }

    #[test]
    fn arrow_writer_2_level_struct() {
        let field_c = Field::new("c", DataType::Int32, true);
        let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
        let type_a = DataType::Struct(vec![field_b.clone()].into());
        let field_a = Field::new("a", type_a, true);
        let schema = Schema::new(vec![field_a.clone()]);

        let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100111])))
            .add_child_data(c.into_data())
            .build()
            .unwrap();
        let b = StructArray::from(b_data);
        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00101111])))
            .add_child_data(b.into_data())
            .build()
            .unwrap();
        let a = StructArray::from(a_data);

        assert_eq!(a.null_count(), 1);
        assert_eq!(a.column(0).null_count(), 2);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn arrow_writer_2_level_struct_non_null() {
        let field_c = Field::new("c", DataType::Int32, false);
        let type_b = DataType::Struct(vec![field_c].into());
        let field_b = Field::new("b", type_b.clone(), false);
        let type_a = DataType::Struct(vec![field_b].into());
        let field_a = Field::new("a", type_a.clone(), false);
        let schema = Schema::new(vec![field_a]);

        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
        let b_data = ArrayDataBuilder::new(type_b)
            .len(6)
            .add_child_data(c.into_data())
            .build()
            .unwrap();
        let b = StructArray::from(b_data);
        let a_data = ArrayDataBuilder::new(type_a)
            .len(6)
            .add_child_data(b.into_data())
            .build()
            .unwrap();
        let a = StructArray::from(a_data);

        assert_eq!(a.null_count(), 0);
        assert_eq!(a.column(0).null_count(), 0);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn arrow_writer_2_level_struct_mixed_null() {
        let field_c = Field::new("c", DataType::Int32, false);
        let type_b = DataType::Struct(vec![field_c].into());
        let field_b = Field::new("b", type_b.clone(), true);
        let type_a = DataType::Struct(vec![field_b].into());
        let field_a = Field::new("a", type_a.clone(), false);
        let schema = Schema::new(vec![field_a]);

        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
        let b_data = ArrayDataBuilder::new(type_b)
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100111])))
            .add_child_data(c.into_data())
            .build()
            .unwrap();
        let b = StructArray::from(b_data);
        let a_data = ArrayDataBuilder::new(type_a)
            .len(6)
            .add_child_data(b.into_data())
            .build()
            .unwrap();
        let a = StructArray::from(a_data);

        assert_eq!(a.null_count(), 0);
        assert_eq!(a.column(0).null_count(), 2);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn arrow_writer_2_level_struct_mixed_null_2() {
        let field_c = Field::new("c", DataType::Int32, false);
        let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
        let field_e = Field::new(
            "e",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            false,
        );

        let field_b = Field::new(
            "b",
            DataType::Struct(vec![field_c, field_d, field_e].into()),
            false,
        );
        let type_a = DataType::Struct(vec![field_b.clone()].into());
        let field_a = Field::new("a", type_a, true);
        let schema = Schema::new(vec![field_a.clone()]);

        let c = Int32Array::from_iter_values(0..6);
        let d = FixedSizeBinaryArray::try_from_iter(
            ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
        )
        .expect("four byte values");
        let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
            .len(6)
            .add_child_data(c.into_data())
            .add_child_data(d.into_data())
            .add_child_data(e.into_data())
            .build()
            .unwrap();
        let b = StructArray::from(b_data);
        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100101])))
            .add_child_data(b.into_data())
            .build()
            .unwrap();
        let a = StructArray::from(a_data);

        assert_eq!(a.null_count(), 3);
        assert_eq!(a.column(0).null_count(), 0);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn test_fixed_size_binary_in_dict() {
        fn test_fixed_size_binary_in_dict_inner<K>()
        where
            K: ArrowDictionaryKeyType,
            K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
            <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
        {
            let field = Field::new(
                "a",
                DataType::Dictionary(
                    Box::new(K::DATA_TYPE),
                    Box::new(DataType::FixedSizeBinary(4)),
                ),
                false,
            );
            let schema = Schema::new(vec![field]);

            let keys: Vec<K::Native> = vec![
                K::Native::try_from(0u8).unwrap(),
                K::Native::try_from(0u8).unwrap(),
                K::Native::try_from(1u8).unwrap(),
            ];
            let keys = PrimitiveArray::<K>::from_iter_values(keys);
            let values = FixedSizeBinaryArray::try_from_iter(
                vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
            )
            .unwrap();

            let data = DictionaryArray::<K>::new(keys, Arc::new(values));
            let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
            roundtrip(batch, None);
        }

        test_fixed_size_binary_in_dict_inner::<UInt8Type>();
        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
        test_fixed_size_binary_in_dict_inner::<UInt32Type>();
        test_fixed_size_binary_in_dict_inner::<UInt64Type>();
        test_fixed_size_binary_in_dict_inner::<Int8Type>();
        test_fixed_size_binary_in_dict_inner::<Int16Type>();
        test_fixed_size_binary_in_dict_inner::<Int32Type>();
        test_fixed_size_binary_in_dict_inner::<Int64Type>();
    }

    #[test]
    fn test_empty_dict() {
        let struct_fields = Fields::from(vec![Field::new(
            "dict",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            false,
        )]);

        let schema = Schema::new(vec![Field::new_struct(
            "struct",
            struct_fields.clone(),
            true,
        )]);
        let dictionary = Arc::new(DictionaryArray::new(
            Int32Array::new_null(5),
            Arc::new(StringArray::new_null(0)),
        ));

        let s = StructArray::new(
            struct_fields,
            vec![dictionary],
            Some(NullBuffer::new_null(5)),
        );

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
        roundtrip(batch, None);
    }

    #[test]
    fn arrow_writer_page_size() {
        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));

        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);

        for i in 0..10 {
            let value = i
                .to_string()
                .repeat(10)
                .chars()
                .take(10)
                .collect::<String>();

            builder.append_value(value);
        }

        let array = Arc::new(builder.finish());

        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        let file = tempfile::tempfile().unwrap();

        let props = WriterProperties::builder()
            .set_data_page_size_limit(1)
            .set_dictionary_page_size_limit(1)
            .set_write_batch_size(1)
            .build();

        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
                .expect("Unable to write file");
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader =
            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();

        let column = reader.metadata().row_group(0).columns();

        assert_eq!(column.len(), 1);

        assert!(
            column[0].dictionary_page_offset().is_some(),
            "Expected a dictionary page"
        );

        assert!(reader.metadata().offset_index().is_some());
        let offset_indexes = &reader.metadata().offset_index().unwrap()[0];

        let page_locations = offset_indexes[0].page_locations.clone();

        assert_eq!(
            page_locations.len(),
            10,
            "Expected 10 pages but got {page_locations:#?}"
        );
    }

2220 #[test]
2221 fn arrow_writer_float_nans() {
2222 let f16_field = Field::new("a", DataType::Float16, false);
2223 let f32_field = Field::new("b", DataType::Float32, false);
2224 let f64_field = Field::new("c", DataType::Float64, false);
2225 let schema = Schema::new(vec![f16_field, f32_field, f64_field]);
2226
2227 let f16_values = (0..MEDIUM_SIZE)
2228 .map(|i| {
2229 Some(if i % 2 == 0 {
2230 f16::NAN
2231 } else {
2232 f16::from_f32(i as f32)
2233 })
2234 })
2235 .collect::<Float16Array>();
2236
2237 let f32_values = (0..MEDIUM_SIZE)
2238 .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
2239 .collect::<Float32Array>();
2240
2241 let f64_values = (0..MEDIUM_SIZE)
2242 .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
2243 .collect::<Float64Array>();
2244
2245 let batch = RecordBatch::try_new(
2246 Arc::new(schema),
2247 vec![
2248 Arc::new(f16_values),
2249 Arc::new(f32_values),
2250 Arc::new(f64_values),
2251 ],
2252 )
2253 .unwrap();
2254
2255 roundtrip(batch, None);
2256 }
2257
2258 const SMALL_SIZE: usize = 7;
2259 const MEDIUM_SIZE: usize = 63;
2260
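/// Writes `expected_batch` with both Parquet writer versions, reads each file
/// back, asserts the data matches, and returns the written temporary files.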
2261 fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<File> {
2262 let mut files = vec![];
2263 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2264 let mut props = WriterProperties::builder().set_writer_version(version);
2265
2266 if let Some(size) = max_row_group_size {
2267 props = props.set_max_row_group_size(size)
2268 }
2269
2270 let props = props.build();
2271 files.push(roundtrip_opts(&expected_batch, props))
2272 }
2273 files
2274 }
2275
2276 fn roundtrip_opts_with_array_validation<F>(
2277 expected_batch: &RecordBatch,
2278 props: WriterProperties,
2279 validate: F,
2280 ) -> File
2281 where
2282 F: Fn(&ArrayData, &ArrayData),
2283 {
2284 let file = tempfile::tempfile().unwrap();
2285
2286 let mut writer = ArrowWriter::try_new(
2287 file.try_clone().unwrap(),
2288 expected_batch.schema(),
2289 Some(props),
2290 )
2291 .expect("Unable to write file");
2292 writer.write(expected_batch).unwrap();
2293 writer.close().unwrap();
2294
2295 let mut record_batch_reader =
2296 ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
2297
2298 let actual_batch = record_batch_reader
2299 .next()
2300 .expect("No batch found")
2301 .expect("Unable to get batch");
2302
2303 assert_eq!(expected_batch.schema(), actual_batch.schema());
2304 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2305 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2306 for i in 0..expected_batch.num_columns() {
2307 let expected_data = expected_batch.column(i).to_data();
2308 let actual_data = actual_batch.column(i).to_data();
2309 validate(&expected_data, &actual_data);
2310 }
2311
2312 file
2313 }
2314
2315 fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> File {
2316 roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
2317 a.validate_full().expect("valid expected data");
2318 b.validate_full().expect("valid actual data");
2319 assert_eq!(a, b)
2320 })
2321 }
2322
2323 struct RoundTripOptions {
2324 values: ArrayRef,
2325 schema: SchemaRef,
2326 bloom_filter: bool,
2327 bloom_filter_position: BloomFilterPosition,
2328 }
2329
2330 impl RoundTripOptions {
2331 fn new(values: ArrayRef, nullable: bool) -> Self {
2332 let data_type = values.data_type().clone();
2333 let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
2334 Self {
2335 values,
2336 schema: Arc::new(schema),
2337 bloom_filter: false,
2338 bloom_filter_position: BloomFilterPosition::AfterRowGroup,
2339 }
2340 }
2341 }
2342
2343 fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<File> {
2344 one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
2345 }
2346
2347 fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<File> {
2348 let mut options = RoundTripOptions::new(values, false);
2349 options.schema = schema;
2350 one_column_roundtrip_with_options(options)
2351 }
2352
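/// Round-trips a single column across a matrix of dictionary settings,
/// encodings (chosen per data type), writer versions and row group sizes.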
2353 fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<File> {
2354 let RoundTripOptions {
2355 values,
2356 schema,
2357 bloom_filter,
2358 bloom_filter_position,
2359 } = options;
2360
2361 let encodings = match values.data_type() {
2362 DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
2363 vec![
2364 Encoding::PLAIN,
2365 Encoding::DELTA_BYTE_ARRAY,
2366 Encoding::DELTA_LENGTH_BYTE_ARRAY,
2367 ]
2368 }
2369 DataType::Int64
2370 | DataType::Int32
2371 | DataType::Int16
2372 | DataType::Int8
2373 | DataType::UInt64
2374 | DataType::UInt32
2375 | DataType::UInt16
2376 | DataType::UInt8 => vec![
2377 Encoding::PLAIN,
2378 Encoding::DELTA_BINARY_PACKED,
2379 Encoding::BYTE_STREAM_SPLIT,
2380 ],
2381 DataType::Float32 | DataType::Float64 => {
2382 vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
2383 }
2384 _ => vec![Encoding::PLAIN],
2385 };
2386
2387 let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2388
2389 let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2390
2391 let mut files = vec![];
2392 for dictionary_size in [0, 1, 1024] {
2393 for encoding in &encodings {
2394 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2395 for row_group_size in row_group_sizes {
2396 let props = WriterProperties::builder()
2397 .set_writer_version(version)
2398 .set_max_row_group_size(row_group_size)
2399 .set_dictionary_enabled(dictionary_size != 0)
2400 .set_dictionary_page_size_limit(dictionary_size.max(1))
2401 .set_encoding(*encoding)
2402 .set_bloom_filter_enabled(bloom_filter)
2403 .set_bloom_filter_position(bloom_filter_position)
2404 .build();
2405
2406 files.push(roundtrip_opts(&expected_batch, props))
2407 }
2408 }
2409 }
2410 }
2411 files
2412 }
2413
2414 fn values_required<A, I>(iter: I) -> Vec<File>
2415 where
2416 A: From<Vec<I::Item>> + Array + 'static,
2417 I: IntoIterator,
2418 {
2419 let raw_values: Vec<_> = iter.into_iter().collect();
2420 let values = Arc::new(A::from(raw_values));
2421 one_column_roundtrip(values, false)
2422 }
2423
2424 fn values_optional<A, I>(iter: I) -> Vec<File>
2425 where
2426 A: From<Vec<Option<I::Item>>> + Array + 'static,
2427 I: IntoIterator,
2428 {
2429 let optional_raw_values: Vec<_> = iter
2430 .into_iter()
2431 .enumerate()
2432 .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
2433 .collect();
2434 let optional_values = Arc::new(A::from(optional_raw_values));
2435 one_column_roundtrip(optional_values, true)
2436 }
2437
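/// Round-trips the iterator's values both as a required column and as an
/// optional column in which every even-indexed value is replaced by null.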
2438 fn required_and_optional<A, I>(iter: I)
2439 where
2440 A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
2441 I: IntoIterator + Clone,
2442 {
2443 values_required::<A, I>(iter.clone());
2444 values_optional::<A, I>(iter);
2445 }
2446
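/// Opens the first written file with bloom filter reading enabled and asserts
/// that every positive value is found in at least one of the column's bloom
/// filters while no negative value is found in any of them.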
2447 fn check_bloom_filter<T: AsBytes>(
2448 files: Vec<File>,
2449 file_column: String,
2450 positive_values: Vec<T>,
2451 negative_values: Vec<T>,
2452 ) {
2453 files.into_iter().take(1).for_each(|file| {
2454 let file_reader = SerializedFileReader::new_with_options(
2455 file,
2456 ReadOptionsBuilder::new()
2457 .with_reader_properties(
2458 ReaderProperties::builder()
2459 .set_read_bloom_filter(true)
2460 .build(),
2461 )
2462 .build(),
2463 )
2464 .expect("Unable to open file as Parquet");
2465 let metadata = file_reader.metadata();
2466
2467 let mut bloom_filters: Vec<_> = vec![];
2469 for (ri, row_group) in metadata.row_groups().iter().enumerate() {
2470 if let Some((column_index, _)) = row_group
2471 .columns()
2472 .iter()
2473 .enumerate()
2474 .find(|(_, column)| column.column_path().string() == file_column)
2475 {
2476 let row_group_reader = file_reader
2477 .get_row_group(ri)
2478 .expect("Unable to read row group");
2479 if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
2480 bloom_filters.push(sbbf.clone());
2481 } else {
2482 panic!("No bloom filter for column named {file_column} found");
2483 }
2484 } else {
2485 panic!("No column named {file_column} found");
2486 }
2487 }
2488
2489 positive_values.iter().for_each(|value| {
2490 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2491 assert!(
2492 found.is_some(),
2493 "Value {:?} should be in bloom filter",
2494 value.as_bytes()
2495 );
2496 });
2497
2498 negative_values.iter().for_each(|value| {
2499 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2500 assert!(
2501 found.is_none(),
2502 "Value {:?} should not be in bloom filter",
2503 value.as_bytes()
2504 );
2505 });
2506 });
2507 }
2508
2509 #[test]
2510 fn all_null_primitive_single_column() {
2511 let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
2512 one_column_roundtrip(values, true);
2513 }

2514 #[test]
2515 fn null_single_column() {
2516 let values = Arc::new(NullArray::new(SMALL_SIZE));
2517 one_column_roundtrip(values, true);
2518 }
2520
2521 #[test]
2522 fn bool_single_column() {
2523 required_and_optional::<BooleanArray, _>(
2524 [true, false].iter().cycle().copied().take(SMALL_SIZE),
2525 );
2526 }
2527
2528 #[test]
2529 fn bool_large_single_column() {
2530 let values = Arc::new(
2531 [None, Some(true), Some(false)]
2532 .iter()
2533 .cycle()
2534 .copied()
2535 .take(200_000)
2536 .collect::<BooleanArray>(),
2537 );
2538 let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
2539 let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2540 let file = tempfile::tempfile().unwrap();
2541
2542 let mut writer =
2543 ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
2544 .expect("Unable to write file");
2545 writer.write(&expected_batch).unwrap();
2546 writer.close().unwrap();
2547 }
2548
2549 #[test]
2550 fn check_page_offset_index_with_nan() {
2551 let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
2552 let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
2553 let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2554
2555 let mut out = Vec::with_capacity(1024);
2556 let mut writer =
2557 ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
2558 writer.write(&batch).unwrap();
2559 let file_meta_data = writer.close().unwrap();
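// all-NaN columns produce no usable min/max statistics, so no column index is
// written, but the offset index is still present for every column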
2560 for row_group in file_meta_data.row_groups {
2561 for column in row_group.columns {
2562 assert!(column.offset_index_offset.is_some());
2563 assert!(column.offset_index_length.is_some());
2564 assert!(column.column_index_offset.is_none());
2565 assert!(column.column_index_length.is_none());
2566 }
2567 }
2568 }
2569
2570 #[test]
2571 fn i8_single_column() {
2572 required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
2573 }
2574
2575 #[test]
2576 fn i16_single_column() {
2577 required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
2578 }
2579
2580 #[test]
2581 fn i32_single_column() {
2582 required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
2583 }
2584
2585 #[test]
2586 fn i64_single_column() {
2587 required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
2588 }
2589
2590 #[test]
2591 fn u8_single_column() {
2592 required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
2593 }
2594
2595 #[test]
2596 fn u16_single_column() {
2597 required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
2598 }
2599
2600 #[test]
2601 fn u32_single_column() {
2602 required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
2603 }
2604
2605 #[test]
2606 fn u64_single_column() {
2607 required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
2608 }
2609
2610 #[test]
2611 fn f32_single_column() {
2612 required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
2613 }
2614
2615 #[test]
2616 fn f64_single_column() {
2617 required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
2618 }
2619
2620 #[test]
2625 fn timestamp_second_single_column() {
2626 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2627 let values = Arc::new(TimestampSecondArray::from(raw_values));
2628
2629 one_column_roundtrip(values, false);
2630 }
2631
2632 #[test]
2633 fn timestamp_millisecond_single_column() {
2634 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2635 let values = Arc::new(TimestampMillisecondArray::from(raw_values));
2636
2637 one_column_roundtrip(values, false);
2638 }
2639
2640 #[test]
2641 fn timestamp_microsecond_single_column() {
2642 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2643 let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
2644
2645 one_column_roundtrip(values, false);
2646 }
2647
2648 #[test]
2649 fn timestamp_nanosecond_single_column() {
2650 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2651 let values = Arc::new(TimestampNanosecondArray::from(raw_values));
2652
2653 one_column_roundtrip(values, false);
2654 }
2655
2656 #[test]
2657 fn date32_single_column() {
2658 required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
2659 }
2660
2661 #[test]
2662 fn date64_single_column() {
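// Date64 values must represent whole days, i.e. multiples of 86_400_000 ms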
2663 required_and_optional::<Date64Array, _>(
2665 (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
2666 );
2667 }
2668
2669 #[test]
2670 fn time32_second_single_column() {
2671 required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
2672 }
2673
2674 #[test]
2675 fn time32_millisecond_single_column() {
2676 required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
2677 }
2678
2679 #[test]
2680 fn time64_microsecond_single_column() {
2681 required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
2682 }
2683
2684 #[test]
2685 fn time64_nanosecond_single_column() {
2686 required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
2687 }
2688
2689 #[test]
2690 fn duration_second_single_column() {
2691 required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
2692 }
2693
2694 #[test]
2695 fn duration_millisecond_single_column() {
2696 required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
2697 }
2698
2699 #[test]
2700 fn duration_microsecond_single_column() {
2701 required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
2702 }
2703
2704 #[test]
2705 fn duration_nanosecond_single_column() {
2706 required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
2707 }
2708
2709 #[test]
2710 fn interval_year_month_single_column() {
2711 required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
2712 }
2713
2714 #[test]
2715 fn interval_day_time_single_column() {
2716 required_and_optional::<IntervalDayTimeArray, _>(vec![
2717 IntervalDayTime::new(0, 1),
2718 IntervalDayTime::new(0, 3),
2719 IntervalDayTime::new(3, -2),
2720 IntervalDayTime::new(-200, 4),
2721 ]);
2722 }
2723
2724 #[test]
2725 #[should_panic(
2726 expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
2727 )]
2728 fn interval_month_day_nano_single_column() {
2729 required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
2730 IntervalMonthDayNano::new(0, 1, 5),
2731 IntervalMonthDayNano::new(0, 3, 2),
2732 IntervalMonthDayNano::new(3, -2, -5),
2733 IntervalMonthDayNano::new(-200, 4, -1),
2734 ]);
2735 }
2736
2737 #[test]
2738 fn binary_single_column() {
2739 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2740 let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2741 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2742
2743 values_required::<BinaryArray, _>(many_vecs_iter);
2745 }
2746
2747 #[test]
2748 fn binary_view_single_column() {
2749 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2750 let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2751 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2752
2753 values_required::<BinaryViewArray, _>(many_vecs_iter);
2755 }
2756
2757 #[test]
2758 fn i32_column_bloom_filter_at_end() {
2759 let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2760 let mut options = RoundTripOptions::new(array, false);
2761 options.bloom_filter = true;
2762 options.bloom_filter_position = BloomFilterPosition::End;
2763
2764 let files = one_column_roundtrip_with_options(options);
2765 check_bloom_filter(
2766 files,
2767 "col".to_string(),
2768 (0..SMALL_SIZE as i32).collect(),
2769 (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2770 );
2771 }
2772
2773 #[test]
2774 fn i32_column_bloom_filter() {
2775 let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2776 let mut options = RoundTripOptions::new(array, false);
2777 options.bloom_filter = true;
2778
2779 let files = one_column_roundtrip_with_options(options);
2780 check_bloom_filter(
2781 files,
2782 "col".to_string(),
2783 (0..SMALL_SIZE as i32).collect(),
2784 (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2785 );
2786 }
2787
2788 #[test]
2789 fn binary_column_bloom_filter() {
2790 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2791 let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2792 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2793
2794 let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
2795 let mut options = RoundTripOptions::new(array, false);
2796 options.bloom_filter = true;
2797
2798 let files = one_column_roundtrip_with_options(options);
2799 check_bloom_filter(
2800 files,
2801 "col".to_string(),
2802 many_vecs,
2803 vec![vec![(SMALL_SIZE + 1) as u8]],
2804 );
2805 }
2806
2807 #[test]
2808 fn empty_string_null_column_bloom_filter() {
2809 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2810 let raw_strs = raw_values.iter().map(|s| s.as_str());
2811
2812 let array = Arc::new(StringArray::from_iter_values(raw_strs));
2813 let mut options = RoundTripOptions::new(array, false);
2814 options.bloom_filter = true;
2815
2816 let files = one_column_roundtrip_with_options(options);
2817
2818 let optional_raw_values: Vec<_> = raw_values
2819 .iter()
2820 .enumerate()
2821 .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
2822 .collect();
2823 check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
2825 }
2826
2827 #[test]
2828 fn large_binary_single_column() {
2829 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2830 let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2831 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2832
2833 values_required::<LargeBinaryArray, _>(many_vecs_iter);
2835 }
2836
2837 #[test]
2838 fn fixed_size_binary_single_column() {
2839 let mut builder = FixedSizeBinaryBuilder::new(4);
2840 builder.append_value(b"0123").unwrap();
2841 builder.append_null();
2842 builder.append_value(b"8910").unwrap();
2843 builder.append_value(b"1112").unwrap();
2844 let array = Arc::new(builder.finish());
2845
2846 one_column_roundtrip(array, true);
2847 }
2848
2849 #[test]
2850 fn string_single_column() {
2851 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2852 let raw_strs = raw_values.iter().map(|s| s.as_str());
2853
2854 required_and_optional::<StringArray, _>(raw_strs);
2855 }
2856
2857 #[test]
2858 fn large_string_single_column() {
2859 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2860 let raw_strs = raw_values.iter().map(|s| s.as_str());
2861
2862 required_and_optional::<LargeStringArray, _>(raw_strs);
2863 }
2864
2865 #[test]
2866 fn string_view_single_column() {
2867 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2868 let raw_strs = raw_values.iter().map(|s| s.as_str());
2869
2870 required_and_optional::<StringViewArray, _>(raw_strs);
2871 }
2872
2873 #[test]
2874 fn null_list_single_column() {
2875 let null_field = Field::new_list_field(DataType::Null, true);
2876 let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
2877
2878 let schema = Schema::new(vec![list_field]);
2879
2880 let a_values = NullArray::new(2);
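// builds the list array [[], null, [null, null]]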
2882 let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
2883 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2884 DataType::Null,
2885 true,
2886 ))))
2887 .len(3)
2888 .add_buffer(a_value_offsets)
2889 .null_bit_buffer(Some(Buffer::from([0b00000101])))
2890 .add_child_data(a_values.into_data())
2891 .build()
2892 .unwrap();
2893
2894 let a = ListArray::from(a_list_data);
2895
2896 assert!(a.is_valid(0));
2897 assert!(!a.is_valid(1));
2898 assert!(a.is_valid(2));
2899
2900 assert_eq!(a.value(0).len(), 0);
2901 assert_eq!(a.value(2).len(), 2);
2902 assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
2903
2904 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2905 roundtrip(batch, None);
2906 }
2907
2908 #[test]
2909 fn list_single_column() {
2910 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2911 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2912 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2913 DataType::Int32,
2914 false,
2915 ))))
2916 .len(5)
2917 .add_buffer(a_value_offsets)
2918 .null_bit_buffer(Some(Buffer::from([0b00011011])))
2919 .add_child_data(a_values.into_data())
2920 .build()
2921 .unwrap();
2922
2923 assert_eq!(a_list_data.null_count(), 1);
2924
2925 let a = ListArray::from(a_list_data);
2926 let values = Arc::new(a);
2927
2928 one_column_roundtrip(values, true);
2929 }
2930
2931 #[test]
2932 fn large_list_single_column() {
2933 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2934 let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
2935 let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
2936 "large_item",
2937 DataType::Int32,
2938 true,
2939 ))))
2940 .len(5)
2941 .add_buffer(a_value_offsets)
2942 .add_child_data(a_values.into_data())
2943 .null_bit_buffer(Some(Buffer::from([0b00011011])))
2944 .build()
2945 .unwrap();
2946
2947 assert_eq!(a_list_data.null_count(), 1);
2949
2950 let a = LargeListArray::from(a_list_data);
2951 let values = Arc::new(a);
2952
2953 one_column_roundtrip(values, true);
2954 }
2955
2956 #[test]
2957 fn list_nested_nulls() {
2958 use arrow::datatypes::Int32Type;
2959 let data = vec![
2960 Some(vec![Some(1)]),
2961 Some(vec![Some(2), Some(3)]),
2962 None,
2963 Some(vec![Some(4), Some(5), None]),
2964 Some(vec![None]),
2965 Some(vec![Some(6), Some(7)]),
2966 ];
2967
2968 let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
2969 one_column_roundtrip(Arc::new(list), true);
2970
2971 let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
2972 one_column_roundtrip(Arc::new(list), true);
2973 }
2974
2975 #[test]
2976 fn struct_single_column() {
2977 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2978 let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
2979 let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
2980
2981 let values = Arc::new(s);
2982 one_column_roundtrip(values, false);
2983 }
2984
2985 #[test]
2986 fn list_and_map_coerced_names() {
2987 let list_field =
2989 Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
2990 let map_field = Field::new_map(
2991 "my_map",
2992 "entries",
2993 Field::new("keys", DataType::Int32, false),
2994 Field::new("values", DataType::Int32, true),
2995 false,
2996 true,
2997 );
2998
2999 let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
3000 let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
3001
3002 let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
3003
3004 let props = Some(WriterProperties::builder().set_coerce_types(true).build());
3006 let file = tempfile::tempfile().unwrap();
3007 let mut writer =
3008 ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
3009
3010 let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
3011 writer.write(&batch).unwrap();
3012 let file_metadata = writer.close().unwrap();
3013
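// with coerce_types enabled the writer emits the standard Parquet names:
// list items become "element" and map entries become "key_value"/"key"/"value"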
3014 assert_eq!(file_metadata.schema[3].name, "element");
3016 assert_eq!(file_metadata.schema[5].name, "key_value");
3018 assert_eq!(file_metadata.schema[6].name, "key");
3020 assert_eq!(file_metadata.schema[7].name, "value");
3022
3023 let reader = SerializedFileReader::new(file).unwrap();
3025 let file_schema = reader.metadata().file_metadata().schema();
3026 let fields = file_schema.get_fields();
3027 let list_field = &fields[0].get_fields()[0];
3028 assert_eq!(list_field.get_fields()[0].name(), "element");
3029 let map_field = &fields[1].get_fields()[0];
3030 assert_eq!(map_field.name(), "key_value");
3031 assert_eq!(map_field.get_fields()[0].name(), "key");
3032 assert_eq!(map_field.get_fields()[1].name(), "value");
3033 }
3034
3035 #[test]
3036 fn fallback_flush_data_page() {
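// write strings with the DELTA_* byte array encodings, no dictionary, and a
// tiny data page size limit, then check the values survive the page flushes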
3037 let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
3039 let values = Arc::new(StringArray::from(raw_values));
3040 let encodings = vec![
3041 Encoding::DELTA_BYTE_ARRAY,
3042 Encoding::DELTA_LENGTH_BYTE_ARRAY,
3043 ];
3044 let data_type = values.data_type().clone();
3045 let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
3046 let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3047
3048 let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
3049 let data_page_size_limit: usize = 32;
3050 let write_batch_size: usize = 16;
3051
3052 for encoding in &encodings {
3053 for row_group_size in row_group_sizes {
3054 let props = WriterProperties::builder()
3055 .set_writer_version(WriterVersion::PARQUET_2_0)
3056 .set_max_row_group_size(row_group_size)
3057 .set_dictionary_enabled(false)
3058 .set_encoding(*encoding)
3059 .set_data_page_size_limit(data_page_size_limit)
3060 .set_write_batch_size(write_batch_size)
3061 .build();
3062
3063 roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
3064 let string_array_a = StringArray::from(a.clone());
3065 let string_array_b = StringArray::from(b.clone());
3066 let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
3067 let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
3068 assert_eq!(
3069 vec_a, vec_b,
3070 "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
3071 );
3072 });
3073 }
3074 }
3075 }
3076
3077 #[test]
3078 fn arrow_writer_string_dictionary() {
3079 #[allow(deprecated)]
3081 let schema = Arc::new(Schema::new(vec![Field::new_dict(
3082 "dictionary",
3083 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3084 true,
3085 42,
3086 true,
3087 )]));
3088
3089 let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3091 .iter()
3092 .copied()
3093 .collect();
3094
3095 one_column_roundtrip_with_schema(Arc::new(d), schema);
3097 }
3098
3099 #[test]
3100 fn arrow_writer_test_type_compatibility() {
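// Writes `array1` and then `array2` (a compatible but different Arrow type) to
// the same writer, and asserts the file reads back as `expected_result` under
// the first batch's schema.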
3101 fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
3102 where
3103 T1: Array + 'static,
3104 T2: Array + 'static,
3105 {
3106 let schema1 = Arc::new(Schema::new(vec![Field::new(
3107 "a",
3108 array1.data_type().clone(),
3109 false,
3110 )]));
3111
3112 let file = tempfile().unwrap();
3113 let mut writer =
3114 ArrowWriter::try_new(file.try_clone().unwrap(), schema1.clone(), None).unwrap();
3115
3116 let rb1 = RecordBatch::try_new(schema1.clone(), vec![Arc::new(array1)]).unwrap();
3117 writer.write(&rb1).unwrap();
3118
3119 let schema2 = Arc::new(Schema::new(vec![Field::new(
3120 "a",
3121 array2.data_type().clone(),
3122 false,
3123 )]));
3124 let rb2 = RecordBatch::try_new(schema2, vec![Arc::new(array2)]).unwrap();
3125 writer.write(&rb2).unwrap();
3126
3127 writer.close().unwrap();
3128
3129 let mut record_batch_reader =
3130 ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
3131 let actual_batch = record_batch_reader.next().unwrap().unwrap();
3132
3133 let expected_batch =
3134 RecordBatch::try_new(schema1, vec![Arc::new(expected_result)]).unwrap();
3135 assert_eq!(actual_batch, expected_batch);
3136 }
3137
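// a plain StringArray batch appended to a Dictionary<UInt8, Utf8> column reads
// back as a dictionary containing both values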
3138 ensure_compatible_write(
3141 DictionaryArray::new(
3142 UInt8Array::from_iter_values(vec![0]),
3143 Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3144 ),
3145 StringArray::from_iter_values(vec!["barquet"]),
3146 DictionaryArray::new(
3147 UInt8Array::from_iter_values(vec![0, 1]),
3148 Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3149 ),
3150 );
3151
3152 ensure_compatible_write(
3153 StringArray::from_iter_values(vec!["parquet"]),
3154 DictionaryArray::new(
3155 UInt8Array::from_iter_values(vec![0]),
3156 Arc::new(StringArray::from_iter_values(vec!["barquet"])),
3157 ),
3158 StringArray::from_iter_values(vec!["parquet", "barquet"]),
3159 );
3160
3161 ensure_compatible_write(
3164 DictionaryArray::new(
3165 UInt8Array::from_iter_values(vec![0]),
3166 Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3167 ),
3168 DictionaryArray::new(
3169 UInt16Array::from_iter_values(vec![0]),
3170 Arc::new(StringArray::from_iter_values(vec!["barquet"])),
3171 ),
3172 DictionaryArray::new(
3173 UInt8Array::from_iter_values(vec![0, 1]),
3174 Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3175 ),
3176 );
3177
3178 ensure_compatible_write(
3180 DictionaryArray::new(
3181 UInt8Array::from_iter_values(vec![0]),
3182 Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3183 ),
3184 DictionaryArray::new(
3185 UInt8Array::from_iter_values(vec![0]),
3186 Arc::new(LargeStringArray::from_iter_values(vec!["barquet"])),
3187 ),
3188 DictionaryArray::new(
3189 UInt8Array::from_iter_values(vec![0, 1]),
3190 Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3191 ),
3192 );
3193
3194 ensure_compatible_write(
3196 DictionaryArray::new(
3197 UInt8Array::from_iter_values(vec![0]),
3198 Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3199 ),
3200 LargeStringArray::from_iter_values(vec!["barquet"]),
3201 DictionaryArray::new(
3202 UInt8Array::from_iter_values(vec![0, 1]),
3203 Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3204 ),
3205 );
3206
3207 ensure_compatible_write(
3210 StringArray::from_iter_values(vec!["parquet"]),
3211 LargeStringArray::from_iter_values(vec!["barquet"]),
3212 StringArray::from_iter_values(vec!["parquet", "barquet"]),
3213 );
3214
3215 ensure_compatible_write(
3216 LargeStringArray::from_iter_values(vec!["parquet"]),
3217 StringArray::from_iter_values(vec!["barquet"]),
3218 LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
3219 );
3220
3221 ensure_compatible_write(
3222 StringArray::from_iter_values(vec!["parquet"]),
3223 StringViewArray::from_iter_values(vec!["barquet"]),
3224 StringArray::from_iter_values(vec!["parquet", "barquet"]),
3225 );
3226
3227 ensure_compatible_write(
3228 StringViewArray::from_iter_values(vec!["parquet"]),
3229 StringArray::from_iter_values(vec!["barquet"]),
3230 StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
3231 );
3232
3233 ensure_compatible_write(
3234 LargeStringArray::from_iter_values(vec!["parquet"]),
3235 StringViewArray::from_iter_values(vec!["barquet"]),
3236 LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
3237 );
3238
3239 ensure_compatible_write(
3240 StringViewArray::from_iter_values(vec!["parquet"]),
3241 LargeStringArray::from_iter_values(vec!["barquet"]),
3242 StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
3243 );
3244
3245 ensure_compatible_write(
3248 BinaryArray::from_iter_values(vec![b"parquet"]),
3249 LargeBinaryArray::from_iter_values(vec![b"barquet"]),
3250 BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3251 );
3252
3253 ensure_compatible_write(
3254 LargeBinaryArray::from_iter_values(vec![b"parquet"]),
3255 BinaryArray::from_iter_values(vec![b"barquet"]),
3256 LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3257 );
3258
3259 ensure_compatible_write(
3260 BinaryArray::from_iter_values(vec![b"parquet"]),
3261 BinaryViewArray::from_iter_values(vec![b"barquet"]),
3262 BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3263 );
3264
3265 ensure_compatible_write(
3266 BinaryViewArray::from_iter_values(vec![b"parquet"]),
3267 BinaryArray::from_iter_values(vec![b"barquet"]),
3268 BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
3269 );
3270
3271 ensure_compatible_write(
3272 BinaryViewArray::from_iter_values(vec![b"parquet"]),
3273 LargeBinaryArray::from_iter_values(vec![b"barquet"]),
3274 BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
3275 );
3276
3277 ensure_compatible_write(
3278 LargeBinaryArray::from_iter_values(vec![b"parquet"]),
3279 BinaryViewArray::from_iter_values(vec![b"barquet"]),
3280 LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3281 );
3282 }
3283
3284 #[test]
3285 fn arrow_writer_primitive_dictionary() {
3286 #[allow(deprecated)]
3288 let schema = Arc::new(Schema::new(vec![Field::new_dict(
3289 "dictionary",
3290 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
3291 true,
3292 42,
3293 true,
3294 )]));
3295
3296 let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
3298 builder.append(12345678).unwrap();
3299 builder.append_null();
3300 builder.append(22345678).unwrap();
3301 builder.append(12345678).unwrap();
3302 let d = builder.finish();
3303
3304 one_column_roundtrip_with_schema(Arc::new(d), schema);
3305 }
3306
3307 #[test]
3308 fn arrow_writer_decimal32_dictionary() {
3309 let integers = vec![12345, 56789, 34567];
3310
3311 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3312
3313 let values = Decimal32Array::from(integers.clone())
3314 .with_precision_and_scale(5, 2)
3315 .unwrap();
3316
3317 let array = DictionaryArray::new(keys, Arc::new(values));
3318 one_column_roundtrip(Arc::new(array.clone()), true);
3319
3320 let values = Decimal32Array::from(integers)
3321 .with_precision_and_scale(9, 2)
3322 .unwrap();
3323
3324 let array = array.with_values(Arc::new(values));
3325 one_column_roundtrip(Arc::new(array), true);
3326 }
3327
3328 #[test]
3329 fn arrow_writer_decimal64_dictionary() {
3330 let integers = vec![12345, 56789, 34567];
3331
3332 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3333
3334 let values = Decimal64Array::from(integers.clone())
3335 .with_precision_and_scale(5, 2)
3336 .unwrap();
3337
3338 let array = DictionaryArray::new(keys, Arc::new(values));
3339 one_column_roundtrip(Arc::new(array.clone()), true);
3340
3341 let values = Decimal64Array::from(integers)
3342 .with_precision_and_scale(12, 2)
3343 .unwrap();
3344
3345 let array = array.with_values(Arc::new(values));
3346 one_column_roundtrip(Arc::new(array), true);
3347 }
3348
3349 #[test]
3350 fn arrow_writer_decimal128_dictionary() {
3351 let integers = vec![12345, 56789, 34567];
3352
3353 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3354
3355 let values = Decimal128Array::from(integers.clone())
3356 .with_precision_and_scale(5, 2)
3357 .unwrap();
3358
3359 let array = DictionaryArray::new(keys, Arc::new(values));
3360 one_column_roundtrip(Arc::new(array.clone()), true);
3361
3362 let values = Decimal128Array::from(integers)
3363 .with_precision_and_scale(12, 2)
3364 .unwrap();
3365
3366 let array = array.with_values(Arc::new(values));
3367 one_column_roundtrip(Arc::new(array), true);
3368 }
3369
3370 #[test]
3371 fn arrow_writer_decimal256_dictionary() {
3372 let integers = vec![
3373 i256::from_i128(12345),
3374 i256::from_i128(56789),
3375 i256::from_i128(34567),
3376 ];
3377
3378 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3379
3380 let values = Decimal256Array::from(integers.clone())
3381 .with_precision_and_scale(5, 2)
3382 .unwrap();
3383
3384 let array = DictionaryArray::new(keys, Arc::new(values));
3385 one_column_roundtrip(Arc::new(array.clone()), true);
3386
3387 let values = Decimal256Array::from(integers)
3388 .with_precision_and_scale(12, 2)
3389 .unwrap();
3390
3391 let array = array.with_values(Arc::new(values));
3392 one_column_roundtrip(Arc::new(array), true);
3393 }
3394
3395 #[test]
3396 fn arrow_writer_string_dictionary_unsigned_index() {
3397 #[allow(deprecated)]
3399 let schema = Arc::new(Schema::new(vec![Field::new_dict(
3400 "dictionary",
3401 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3402 true,
3403 42,
3404 true,
3405 )]));
3406
3407 let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3409 .iter()
3410 .copied()
3411 .collect();
3412
3413 one_column_roundtrip_with_schema(Arc::new(d), schema);
3414 }
3415
3416 #[test]
3417 fn u32_min_max() {
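// u32 columns are stored as physical INT32; reinterpreting the signed
// statistics as u32 must recover the true unsigned min and max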
3418 let src = [
3420 u32::MIN,
3421 u32::MIN + 1,
3422 (i32::MAX as u32) - 1,
3423 i32::MAX as u32,
3424 (i32::MAX as u32) + 1,
3425 u32::MAX - 1,
3426 u32::MAX,
3427 ];
3428 let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
3429 let files = one_column_roundtrip(values, false);
3430
3431 for file in files {
3432 let reader = SerializedFileReader::new(file).unwrap();
3434 let metadata = reader.metadata();
3435
3436 let mut row_offset = 0;
3437 for row_group in metadata.row_groups() {
3438 assert_eq!(row_group.num_columns(), 1);
3439 let column = row_group.column(0);
3440
3441 let num_values = column.num_values() as usize;
3442 let src_slice = &src[row_offset..row_offset + num_values];
3443 row_offset += column.num_values() as usize;
3444
3445 let stats = column.statistics().unwrap();
3446 if let Statistics::Int32(stats) = stats {
3447 assert_eq!(
3448 *stats.min_opt().unwrap() as u32,
3449 *src_slice.iter().min().unwrap()
3450 );
3451 assert_eq!(
3452 *stats.max_opt().unwrap() as u32,
3453 *src_slice.iter().max().unwrap()
3454 );
3455 } else {
3456 panic!("Statistics::Int32 missing")
3457 }
3458 }
3459 }
3460 }
3461
3462 #[test]
3463 fn u64_min_max() {
3464 let src = [
3466 u64::MIN,
3467 u64::MIN + 1,
3468 (i64::MAX as u64) - 1,
3469 i64::MAX as u64,
3470 (i64::MAX as u64) + 1,
3471 u64::MAX - 1,
3472 u64::MAX,
3473 ];
3474 let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
3475 let files = one_column_roundtrip(values, false);
3476
3477 for file in files {
3478 let reader = SerializedFileReader::new(file).unwrap();
3480 let metadata = reader.metadata();
3481
3482 let mut row_offset = 0;
3483 for row_group in metadata.row_groups() {
3484 assert_eq!(row_group.num_columns(), 1);
3485 let column = row_group.column(0);
3486
3487 let num_values = column.num_values() as usize;
3488 let src_slice = &src[row_offset..row_offset + num_values];
3489 row_offset += column.num_values() as usize;
3490
3491 let stats = column.statistics().unwrap();
3492 if let Statistics::Int64(stats) = stats {
3493 assert_eq!(
3494 *stats.min_opt().unwrap() as u64,
3495 *src_slice.iter().min().unwrap()
3496 );
3497 assert_eq!(
3498 *stats.max_opt().unwrap() as u64,
3499 *src_slice.iter().max().unwrap()
3500 );
3501 } else {
3502 panic!("Statistics::Int64 missing")
3503 }
3504 }
3505 }
3506 }
3507
3508 #[test]
3509 fn statistics_null_counts_only_nulls() {
3510 let values = Arc::new(UInt64Array::from(vec![None, None]));
3512 let files = one_column_roundtrip(values, true);
3513
3514 for file in files {
3515 let reader = SerializedFileReader::new(file).unwrap();
3517 let metadata = reader.metadata();
3518 assert_eq!(metadata.num_row_groups(), 1);
3519 let row_group = metadata.row_group(0);
3520 assert_eq!(row_group.num_columns(), 1);
3521 let column = row_group.column(0);
3522 let stats = column.statistics().unwrap();
3523 assert_eq!(stats.null_count_opt(), Some(2));
3524 }
3525 }
3526
3527 #[test]
3528 fn test_list_of_struct_roundtrip() {
3529 let int_field = Field::new("a", DataType::Int32, true);
3531 let int_field2 = Field::new("b", DataType::Int32, true);
3532
3533 let int_builder = Int32Builder::with_capacity(10);
3534 let int_builder2 = Int32Builder::with_capacity(10);
3535
3536 let struct_builder = StructBuilder::new(
3537 vec![int_field, int_field2],
3538 vec![Box::new(int_builder), Box::new(int_builder2)],
3539 );
3540 let mut list_builder = ListBuilder::new(struct_builder);
3541
3542 let values = list_builder.values();
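// the list built below is:
// [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]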
3547 values
3548 .field_builder::<Int32Builder>(0)
3549 .unwrap()
3550 .append_value(1);
3551 values
3552 .field_builder::<Int32Builder>(1)
3553 .unwrap()
3554 .append_value(2);
3555 values.append(true);
3556 list_builder.append(true);
3557
3558 list_builder.append(true);
3560
3561 list_builder.append(false);
3563
3564 let values = list_builder.values();
3566 values
3567 .field_builder::<Int32Builder>(0)
3568 .unwrap()
3569 .append_null();
3570 values
3571 .field_builder::<Int32Builder>(1)
3572 .unwrap()
3573 .append_null();
3574 values.append(false);
3575 values
3576 .field_builder::<Int32Builder>(0)
3577 .unwrap()
3578 .append_null();
3579 values
3580 .field_builder::<Int32Builder>(1)
3581 .unwrap()
3582 .append_null();
3583 values.append(false);
3584 list_builder.append(true);
3585
3586 let values = list_builder.values();
3588 values
3589 .field_builder::<Int32Builder>(0)
3590 .unwrap()
3591 .append_null();
3592 values
3593 .field_builder::<Int32Builder>(1)
3594 .unwrap()
3595 .append_value(3);
3596 values.append(true);
3597 list_builder.append(true);
3598
3599 let values = list_builder.values();
3601 values
3602 .field_builder::<Int32Builder>(0)
3603 .unwrap()
3604 .append_value(2);
3605 values
3606 .field_builder::<Int32Builder>(1)
3607 .unwrap()
3608 .append_null();
3609 values.append(true);
3610 list_builder.append(true);
3611
3612 let array = Arc::new(list_builder.finish());
3613
3614 one_column_roundtrip(array, true);
3615 }
3616
3617 fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
3618 metadata.row_groups().iter().map(|x| x.num_rows()).collect()
3619 }
3620
3621 #[test]
3622 fn test_aggregates_records() {
3623 let arrays = [
3624 Int32Array::from((0..100).collect::<Vec<_>>()),
3625 Int32Array::from((0..50).collect::<Vec<_>>()),
3626 Int32Array::from((200..500).collect::<Vec<_>>()),
3627 ];
3628
3629 let schema = Arc::new(Schema::new(vec![Field::new(
3630 "int",
3631 ArrowDataType::Int32,
3632 false,
3633 )]));
3634
3635 let file = tempfile::tempfile().unwrap();
3636
3637 let props = WriterProperties::builder()
3638 .set_max_row_group_size(200)
3639 .build();
3640
3641 let mut writer =
3642 ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
3643
3644 for array in arrays {
3645 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
3646 writer.write(&batch).unwrap();
3647 }
3648
3649 writer.close().unwrap();
3650
3651 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3652 assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
3653
3654 let batches = builder
3655 .with_batch_size(100)
3656 .build()
3657 .unwrap()
3658 .collect::<ArrowResult<Vec<_>>>()
3659 .unwrap();
3660
3661 assert_eq!(batches.len(), 5);
3662 assert!(batches.iter().all(|x| x.num_columns() == 1));
3663
3664 let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3665
3666 assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
3667
3668 let values: Vec<_> = batches
3669 .iter()
3670 .flat_map(|x| {
3671 x.column(0)
3672 .as_any()
3673 .downcast_ref::<Int32Array>()
3674 .unwrap()
3675 .values()
3676 .iter()
3677 .cloned()
3678 })
3679 .collect();
3680
3681 let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
3682 assert_eq!(&values, &expected_values)
3683 }
3684
3685 #[test]
3686 fn complex_aggregate() {
3687 let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
3689 let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
3690 let struct_a = Arc::new(Field::new(
3691 "struct_a",
3692 DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
3693 true,
3694 ));
3695
3696 let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
3697 let struct_b = Arc::new(Field::new(
3698 "struct_b",
3699 DataType::Struct(vec![list_a.clone()].into()),
3700 false,
3701 ));
3702
3703 let schema = Arc::new(Schema::new(vec![struct_b]));
3704
3705 let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
3707 let field_b_array =
3708 Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
3709
3710 let struct_a_array = StructArray::from(vec![
3711 (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
3712 (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
3713 ]);
3714
3715 let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3716 .len(5)
3717 .add_buffer(Buffer::from_iter(vec![
3718 0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
3719 ]))
3720 .null_bit_buffer(Some(Buffer::from_iter(vec![
3721 true, false, true, false, true,
3722 ])))
3723 .child_data(vec![struct_a_array.into_data()])
3724 .build()
3725 .unwrap();
3726
3727 let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3728 let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
3729
3730 let batch1 =
3731 RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3732 .unwrap();
3733
3734 let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
3735 let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
3736
3737 let struct_a_array = StructArray::from(vec![
3738 (field_a, Arc::new(field_a_array) as ArrayRef),
3739 (field_b, Arc::new(field_b_array) as ArrayRef),
3740 ]);
3741
3742 let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3743 .len(2)
3744 .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
3745 .child_data(vec![struct_a_array.into_data()])
3746 .build()
3747 .unwrap();
3748
3749 let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3750 let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
3751
3752 let batch2 =
3753 RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3754 .unwrap();
3755
3756 let batches = &[batch1, batch2];
3757
3758 let expected = r#"
3761 +-------------------------------------------------------------------------------------------------------+
3762 | struct_b |
3763 +-------------------------------------------------------------------------------------------------------+
3764 | {list: [{leaf_a: 1, leaf_b: 1}]} |
3765 | {list: } |
3766 | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]} |
3767 | {list: } |
3768 | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]} |
3769 | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
3770 | {list: [{leaf_a: 10, leaf_b: }]} |
3771 +-------------------------------------------------------------------------------------------------------+
3772 "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
3773
3774 let actual = pretty_format_batches(batches).unwrap().to_string();
3775 assert_eq!(actual, expected);
3776
3777 let file = tempfile::tempfile().unwrap();
3779 let props = WriterProperties::builder()
3780 .set_max_row_group_size(6)
3781 .build();
3782
3783 let mut writer =
3784 ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
3785
3786 for batch in batches {
3787 writer.write(batch).unwrap();
3788 }
3789 writer.close().unwrap();
3790
3791 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
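// 5 rows from the first batch plus 2 from the second, with a maximum row group
// size of 6, produce row groups of 6 and 1 rows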
3796 assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);
3797
3798 let batches = builder
3799 .with_batch_size(2)
3800 .build()
3801 .unwrap()
3802 .collect::<ArrowResult<Vec<_>>>()
3803 .unwrap();
3804
3805 assert_eq!(batches.len(), 4);
3806 let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3807 assert_eq!(&batch_counts, &[2, 2, 2, 1]);
3808
3809 let actual = pretty_format_batches(&batches).unwrap().to_string();
3810 assert_eq!(actual, expected);
3811 }
3812
3813 #[test]
3814 fn test_arrow_writer_metadata() {
3815 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3816 let file_schema = batch_schema.clone().with_metadata(
3817 vec![("foo".to_string(), "bar".to_string())]
3818 .into_iter()
3819 .collect(),
3820 );
3821
3822 let batch = RecordBatch::try_new(
3823 Arc::new(batch_schema),
3824 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3825 )
3826 .unwrap();
3827
3828 let mut buf = Vec::with_capacity(1024);
3829 let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
3830 writer.write(&batch).unwrap();
3831 writer.close().unwrap();
3832 }
3833
3834 #[test]
3835 fn test_arrow_writer_nullable() {
3836 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3837 let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
3838 let file_schema = Arc::new(file_schema);
3839
3840 let batch = RecordBatch::try_new(
3841 Arc::new(batch_schema),
3842 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3843 )
3844 .unwrap();
3845
3846 let mut buf = Vec::with_capacity(1024);
3847 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3848 writer.write(&batch).unwrap();
3849 writer.close().unwrap();
3850
3851 let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
3852 let back = read.next().unwrap().unwrap();
3853 assert_eq!(back.schema(), file_schema);
3854 assert_ne!(back.schema(), batch.schema());
3855 assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
3856 }
3857
3858 #[test]
3859 fn in_progress_accounting() {
3860 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3862
3863 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3865
3866 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3868
3869 let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
3870
3871 assert_eq!(writer.in_progress_size(), 0);
3873 assert_eq!(writer.in_progress_rows(), 0);
3874 assert_eq!(writer.memory_size(), 0);
2875 assert_eq!(writer.bytes_written(), 4);
2876 writer.write(&batch).unwrap();
3877
3878 let initial_size = writer.in_progress_size();
3880 assert!(initial_size > 0);
3881 assert_eq!(writer.in_progress_rows(), 5);
3882 let initial_memory = writer.memory_size();
3883 assert!(initial_memory > 0);
3884 assert!(
3886 initial_size <= initial_memory,
3887 "{initial_size} <= {initial_memory}"
3888 );
3889
3890 writer.write(&batch).unwrap();
3892 assert!(writer.in_progress_size() > initial_size);
3893 assert_eq!(writer.in_progress_rows(), 10);
3894 assert!(writer.memory_size() > initial_memory);
3895 assert!(
3896 writer.in_progress_size() <= writer.memory_size(),
3897 "in_progress_size {} <= memory_size {}",
3898 writer.in_progress_size(),
3899 writer.memory_size()
3900 );
3901
3902 let pre_flush_bytes_written = writer.bytes_written();
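// flushing writes the buffered row group to the output and resets the
// in-progress accounting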
3904 writer.flush().unwrap();
3905 assert_eq!(writer.in_progress_size(), 0);
3906 assert_eq!(writer.memory_size(), 0);
3907 assert!(writer.bytes_written() > pre_flush_bytes_written);
3908
3909 writer.close().unwrap();
3910 }
3911
3912 #[test]
3913 fn test_writer_all_null() {
3914 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3915 let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
3916 let batch = RecordBatch::try_from_iter(vec![
3917 ("a", Arc::new(a) as ArrayRef),
3918 ("b", Arc::new(b) as ArrayRef),
3919 ])
3920 .unwrap();
3921
3922 let mut buf = Vec::with_capacity(1024);
3923 let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
3924 writer.write(&batch).unwrap();
3925 writer.close().unwrap();
3926
3927 let bytes = Bytes::from(buf);
3928 let options = ReadOptionsBuilder::new().with_page_index().build();
3929 let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
3930 let index = reader.metadata().offset_index().unwrap();
3931
3932 assert_eq!(index.len(), 1);
2933 assert_eq!(index[0].len(), 2);
2934 assert_eq!(index[0][0].page_locations().len(), 1);
2935 assert_eq!(index[0][1].page_locations().len(), 1);
2936 }
3937
3938 #[test]
3939 fn test_disabled_statistics_with_page() {
3940 let file_schema = Schema::new(vec![
3941 Field::new("a", DataType::Utf8, true),
3942 Field::new("b", DataType::Utf8, true),
3943 ]);
3944 let file_schema = Arc::new(file_schema);
3945
3946 let batch = RecordBatch::try_new(
3947 file_schema.clone(),
3948 vec![
3949 Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3950 Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3951 ],
3952 )
3953 .unwrap();
3954
3955 let props = WriterProperties::builder()
3956 .set_statistics_enabled(EnabledStatistics::None)
3957 .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
3958 .build();
3959
3960 let mut buf = Vec::with_capacity(1024);
3961 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3962 writer.write(&batch).unwrap();
3963
3964 let metadata = writer.close().unwrap();
3965 assert_eq!(metadata.row_groups.len(), 1);
3966 let row_group = &metadata.row_groups[0];
3967 assert_eq!(row_group.columns.len(), 2);
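// column "a" (page-level statistics) gets both an offset index and a column
// index; column "b" (statistics disabled) only gets an offset index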
3968 assert!(row_group.columns[0].offset_index_offset.is_some());
3970 assert!(row_group.columns[0].column_index_offset.is_some());
3971 assert!(row_group.columns[1].offset_index_offset.is_some());
3973 assert!(row_group.columns[1].column_index_offset.is_none());
3974
3975 let options = ReadOptionsBuilder::new().with_page_index().build();
3976 let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3977
3978 let row_group = reader.get_row_group(0).unwrap();
3979 let a_col = row_group.metadata().column(0);
3980 let b_col = row_group.metadata().column(1);
3981
3982 if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3984 let min = byte_array_stats.min_opt().unwrap();
3985 let max = byte_array_stats.max_opt().unwrap();
3986
3987 assert_eq!(min.as_bytes(), b"a");
3988 assert_eq!(max.as_bytes(), b"d");
3989 } else {
3990 panic!("expecting Statistics::ByteArray");
3991 }
3992
3993 assert!(b_col.statistics().is_none());
3995
3996 let offset_index = reader.metadata().offset_index().unwrap();
3997 assert_eq!(offset_index.len(), 1);
3998 assert_eq!(offset_index[0].len(), 2);

4000 let column_index = reader.metadata().column_index().unwrap();
4001 assert_eq!(column_index.len(), 1);
4002 assert_eq!(column_index[0].len(), 2);

4004 let a_idx = &column_index[0][0];
4005 assert!(matches!(a_idx, Index::BYTE_ARRAY(_)), "{a_idx:?}");
4006 let b_idx = &column_index[0][1];
4007 assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
4008 }
4009
4010 #[test]
4011 fn test_disabled_statistics_with_chunk() {
4012 let file_schema = Schema::new(vec![
4013 Field::new("a", DataType::Utf8, true),
4014 Field::new("b", DataType::Utf8, true),
4015 ]);
4016 let file_schema = Arc::new(file_schema);
4017
4018 let batch = RecordBatch::try_new(
4019 file_schema.clone(),
4020 vec![
4021 Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
4022 Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
4023 ],
4024 )
4025 .unwrap();
4026
4027 let props = WriterProperties::builder()
4028 .set_statistics_enabled(EnabledStatistics::None)
4029 .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
4030 .build();
4031
4032 let mut buf = Vec::with_capacity(1024);
4033 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4034 writer.write(&batch).unwrap();
4035
4036 let metadata = writer.close().unwrap();
4037 assert_eq!(metadata.row_groups.len(), 1);
4038 let row_group = &metadata.row_groups[0];
4039 assert_eq!(row_group.columns.len(), 2);
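// with chunk-level statistics no column index is written for either column,
// but offset indexes are always present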
4040 assert!(row_group.columns[0].offset_index_offset.is_some());
4042 assert!(row_group.columns[0].column_index_offset.is_none());
4043 assert!(row_group.columns[1].offset_index_offset.is_some());
4045 assert!(row_group.columns[1].column_index_offset.is_none());
4046
4047 let options = ReadOptionsBuilder::new().with_page_index().build();
4048 let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4049
4050 let row_group = reader.get_row_group(0).unwrap();
4051 let a_col = row_group.metadata().column(0);
4052 let b_col = row_group.metadata().column(1);
4053
4054 if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4056 let min = byte_array_stats.min_opt().unwrap();
4057 let max = byte_array_stats.max_opt().unwrap();
4058
4059 assert_eq!(min.as_bytes(), b"a");
4060 assert_eq!(max.as_bytes(), b"d");
4061 } else {
4062 panic!("expecting Statistics::ByteArray");
4063 }
4064
4065 assert!(b_col.statistics().is_none());
4067
4068 let column_index = reader.metadata().column_index().unwrap();
4069 assert_eq!(column_index.len(), 1); assert_eq!(column_index[0].len(), 2); let a_idx = &column_index[0][0];
4073 assert!(matches!(a_idx, Index::NONE), "{a_idx:?}");
4074 let b_idx = &column_index[0][1];
4075 assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
4076 }
4077
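    // With `with_skip_arrow_metadata(true)` the serialized Arrow schema should not appear
    // in the file's key-value metadata.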
    #[test]
    fn test_arrow_writer_skip_metadata() {
        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
        let file_schema = Arc::new(batch_schema.clone());

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();
        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);

        let mut buf = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let bytes = Bytes::from(buf);
        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
        assert_eq!(file_schema, *reader_builder.schema());
        if let Some(key_value_metadata) = reader_builder
            .metadata()
            .file_metadata()
            .key_value_metadata()
        {
            assert!(!key_value_metadata
                .iter()
                .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY));
        }
    }

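    // Writing a batch whose schema differs from the writer's schema should fail with a
    // descriptive error.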
    #[test]
    fn mismatched_schemas() {
        let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
        let file_schema = Arc::new(Schema::new(vec![Field::new(
            "temperature",
            DataType::Float64,
            false,
        )]));

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();

        let err = writer.write(&batch).unwrap_err().to_string();
        assert_eq!(
            err,
            "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
        );
    }

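    // A record batch with an empty schema (no columns, zero rows) round-trips through a
    // Parquet file.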
    #[test]
    fn test_roundtrip_empty_schema() {
        let empty_batch = RecordBatch::try_new_with_options(
            Arc::new(Schema::empty()),
            vec![],
            &RecordBatchOptions::default().with_row_count(Some(0)),
        )
        .unwrap();

        let mut parquet_bytes: Vec<u8> = Vec::new();
        let mut writer =
            ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
        writer.write(&empty_batch).unwrap();
        writer.close().unwrap();

        let bytes = Bytes::from(parquet_bytes);
        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
        assert_eq!(reader.schema(), &empty_batch.schema());
        let batches: Vec<_> = reader
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();
        assert_eq!(batches.len(), 0);
    }

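    // `EnabledStatistics::Page` alone should not embed statistics in the data page headers.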
    #[test]
    fn test_page_stats_not_written_by_default() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let schema = Schema::new(vec![string_field]);
        let raw_string_values = vec!["Blart Versenwald III"];
        let string_values = StringArray::from(raw_string_values.clone());
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let mut file = roundtrip_opts(&batch, props);

        // read the file back so the page headers can be decoded
        let mut buf = vec![];
        file.seek(std::io::SeekFrom::Start(0)).unwrap();
        let read = file.read_to_end(&mut buf).unwrap();
        assert!(read > 0);

        // skip the 4-byte "PAR1" magic; the first data page header follows
        let first_page = &buf[4..];
        let mut prot = TCompactSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;

        assert!(stats.is_none());
    }

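    // With `set_write_page_header_statistics(true)` the exact min/max values are embedded
    // in the data page header.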
    #[test]
    fn test_page_stats_when_enabled() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let schema = Schema::new(vec![string_field]);
        let raw_string_values = vec!["Blart Versenwald III", "Andrew Lamb"];
        let string_values = StringArray::from(raw_string_values.clone());
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_write_page_header_statistics(true)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let mut file = roundtrip_opts(&batch, props);

        // read the file back so the page headers can be decoded
        let mut buf = vec![];
        file.seek(std::io::SeekFrom::Start(0)).unwrap();
        let read = file.read_to_end(&mut buf).unwrap();
        assert!(read > 0);

        // skip the 4-byte "PAR1" magic; the first data page header follows
        let first_page = &buf[4..];
        let mut prot = TCompactSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;

        let stats = stats.unwrap();
        assert!(stats.is_max_value_exact.unwrap());
        assert!(stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
    }

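    // `set_statistics_truncate_length` truncates the min/max values written to page headers
    // and marks them as inexact, for both string and binary columns.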
    #[test]
    fn test_page_stats_truncation() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let binary_field = Field::new("b", DataType::Binary, false);
        let schema = Schema::new(vec![string_field, binary_field]);

        let raw_string_values = vec!["Blart Versenwald III"];
        let raw_binary_values = [b"Blart Versenwald III".to_vec()];
        let raw_binary_value_refs = raw_binary_values
            .iter()
            .map(|x| x.as_slice())
            .collect::<Vec<_>>();

        let string_values = StringArray::from(raw_string_values.clone());
        let binary_values = BinaryArray::from(raw_binary_value_refs);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(string_values), Arc::new(binary_values)],
        )
        .unwrap();

        let props = WriterProperties::builder()
            .set_statistics_truncate_length(Some(2))
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_write_page_header_statistics(true)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let mut file = roundtrip_opts(&batch, props);

        // read the file back so the page headers can be decoded
        let mut buf = vec![];
        file.seek(std::io::SeekFrom::Start(0)).unwrap();
        let read = file.read_to_end(&mut buf).unwrap();
        assert!(read > 0);

        // skip the 4-byte "PAR1" magic; the first data page header follows
        let first_page = &buf[4..];
        let mut prot = TCompactSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;
        assert!(stats.is_some());
        let stats = stats.unwrap();
        assert!(!stats.is_max_value_exact.unwrap());
        assert!(!stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());

        // the second column's data page header follows the first column's page data
        let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..];
        let mut prot = TCompactSliceInputProtocol::new(second_page);
        let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;
        assert!(stats.is_some());
        let stats = stats.unwrap();
        assert!(!stats.is_max_value_exact.unwrap());
        assert!(!stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
    }

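    // Page encoding statistics reported in the metadata returned by close() should match
    // what a reader decodes from the file.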
    #[test]
    fn test_page_encoding_statistics_roundtrip() {
        let batch_schema = Schema::new(vec![Field::new(
            "int32",
            arrow_schema::DataType::Int32,
            false,
        )]);

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut file: File = tempfile::tempfile().unwrap();
        let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
        writer.write(&batch).unwrap();
        let file_metadata = writer.close().unwrap();

        assert_eq!(file_metadata.row_groups.len(), 1);
        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
        let chunk_meta = file_metadata.row_groups[0].columns[0]
            .meta_data
            .as_ref()
            .expect("column metadata missing");
        assert!(chunk_meta.encoding_stats.is_some());
        let chunk_page_stats = chunk_meta.encoding_stats.as_ref().unwrap();

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rowgroup = reader.get_row_group(0).expect("row group missing");
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.page_encoding_stats().is_some());
        let file_page_stats = column.page_encoding_stats().unwrap();
        let chunk_stats: Vec<PageEncodingStats> = chunk_page_stats
            .iter()
            .map(|x| crate::file::page_encoding_stats::try_from_thrift(x).unwrap())
            .collect();
        assert_eq!(&chunk_stats, file_page_stats);
    }

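    // A per-column dictionary page size limit overrides the default limit for that column.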
    #[test]
    fn test_different_dict_page_size_limit() {
        let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
        let schema = Arc::new(Schema::new(vec![
            Field::new("col0", arrow_schema::DataType::Int64, false),
            Field::new("col1", arrow_schema::DataType::Int64, false),
        ]));
        let batch =
            arrow_array::RecordBatch::try_new(schema.clone(), vec![array.clone(), array]).unwrap();

        let props = WriterProperties::builder()
            .set_dictionary_page_size_limit(1024 * 1024)
            .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
            .build();
        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
        writer.write(&batch).unwrap();
        let data = Bytes::from(writer.into_inner().unwrap());

        let mut metadata = ParquetMetaDataReader::new();
        metadata.try_parse(&data).unwrap();
        let metadata = metadata.finish().unwrap();
        let col0_meta = metadata.row_group(0).column(0);
        let col1_meta = metadata.row_group(0).column(1);

        let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
            let mut reader =
                SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
            let page = reader.get_next_page().unwrap().unwrap();
            match page {
                Page::DictionaryPage { buf, .. } => buf.len(),
                _ => panic!("expected DictionaryPage"),
            }
        };

        assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
        assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
    }
}