use bytes::Bytes;
use std::io::{Read, Write};
use std::iter::Peekable;
use std::slice::Iter;
use std::sync::{Arc, Mutex};
use std::vec::IntoIter;
use thrift::protocol::TCompactOutputProtocol;

use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};

use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};

use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
use crate::arrow::ArrowSchemaConverter;
use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
use crate::column::page_encryption::PageEncryptor;
use crate::column::writer::encoder::ColumnValueEncoder;
use crate::column::writer::{
    get_column_writer, ColumnCloseResult, ColumnWriter, GenericColumnWriter,
};
use crate::data_type::{ByteArray, FixedLenByteArray};
#[cfg(feature = "encryption")]
use crate::encryption::encrypt::FileEncryptor;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{KeyValue, RowGroupMetaData};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
use crate::thrift::TSerializable;
use levels::{calculate_array_levels, ArrayLevels};

mod byte_array;
mod levels;
57
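/// Encodes Arrow [`RecordBatch`]es as Parquet row groups, buffering rows in
/// memory and flushing a complete row group whenever `max_row_group_size` is
/// reached.
///
/// A minimal usage sketch, writing a hypothetical `batch` to an in-memory
/// buffer (illustrative only, not a doctest):
///
/// ```ignore
/// let mut writer = ArrowWriter::try_new(Vec::new(), batch.schema(), None)?;
/// writer.write(&batch)?;
/// let parquet_bytes = writer.into_inner()?;
/// ```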
58pub struct ArrowWriter<W: Write> {
170 writer: SerializedFileWriter<W>,
172
173 in_progress: Option<ArrowRowGroupWriter>,
175
176 arrow_schema: SchemaRef,
180
181 row_group_writer_factory: ArrowRowGroupWriterFactory,
183
184 max_row_group_size: usize,
186}
187
188impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
189 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190 let buffered_memory = self.in_progress_size();
191 f.debug_struct("ArrowWriter")
192 .field("writer", &self.writer)
193 .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
194 .field("in_progress_rows", &self.in_progress_rows())
195 .field("arrow_schema", &self.arrow_schema)
196 .field("max_row_group_size", &self.max_row_group_size)
197 .finish()
198 }
199}
200
201impl<W: Write + Send> ArrowWriter<W> {
202 pub fn try_new(
208 writer: W,
209 arrow_schema: SchemaRef,
210 props: Option<WriterProperties>,
211 ) -> Result<Self> {
212 let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
213 Self::try_new_with_options(writer, arrow_schema, options)
214 }
215
216 pub fn try_new_with_options(
222 writer: W,
223 arrow_schema: SchemaRef,
224 options: ArrowWriterOptions,
225 ) -> Result<Self> {
226 let mut props = options.properties;
227 let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
228 if let Some(schema_root) = &options.schema_root {
229 converter = converter.schema_root(schema_root);
230 }
231 let schema = converter.convert(&arrow_schema)?;
232 if !options.skip_arrow_metadata {
233 add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
235 }
236
237 let max_row_group_size = props.max_row_group_size();
238
239 let props_ptr = Arc::new(props);
240 let file_writer =
241 SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
242
243 let row_group_writer_factory =
244 ArrowRowGroupWriterFactory::new(&file_writer, schema, arrow_schema.clone(), props_ptr);
245
246 Ok(Self {
247 writer: file_writer,
248 in_progress: None,
249 arrow_schema,
250 row_group_writer_factory,
251 max_row_group_size,
252 })
253 }
254
255 pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
257 self.writer.flushed_row_groups()
258 }
259
260 pub fn memory_size(&self) -> usize {
265 match &self.in_progress {
266 Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
267 None => 0,
268 }
269 }
270
271 pub fn in_progress_size(&self) -> usize {
278 match &self.in_progress {
279 Some(in_progress) => in_progress
280 .writers
281 .iter()
282 .map(|x| x.get_estimated_total_bytes())
283 .sum(),
284 None => 0,
285 }
286 }
287
288 pub fn in_progress_rows(&self) -> usize {
290 self.in_progress
291 .as_ref()
292 .map(|x| x.buffered_rows)
293 .unwrap_or_default()
294 }
295
296 pub fn bytes_written(&self) -> usize {
298 self.writer.bytes_written()
299 }
300
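    /// Buffers `batch` into the in-progress row group, starting a new row
    /// group if none is active. Empty batches are ignored; batches that would
    /// exceed `max_row_group_size` are split, and a full row group is flushed
    /// automatically.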
301 pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
309 if batch.num_rows() == 0 {
310 return Ok(());
311 }
312
313 let in_progress = match &mut self.in_progress {
314 Some(in_progress) => in_progress,
315 x => x.insert(
316 self.row_group_writer_factory
317 .create_row_group_writer(self.writer.flushed_row_groups().len())?,
318 ),
319 };
320
321 if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size {
323 let to_write = self.max_row_group_size - in_progress.buffered_rows;
324 let a = batch.slice(0, to_write);
325 let b = batch.slice(to_write, batch.num_rows() - to_write);
326 self.write(&a)?;
327 return self.write(&b);
328 }
329
330 in_progress.write(batch)?;
331
332 if in_progress.buffered_rows >= self.max_row_group_size {
333 self.flush()?
334 }
335 Ok(())
336 }
337
338 pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
343 self.writer.write_all(buf)
344 }
345
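    /// Flushes the in-progress row group (if any) to the underlying file as a
    /// complete Parquet row group; subsequent writes start a new row group.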
346 pub fn flush(&mut self) -> Result<()> {
348 let in_progress = match self.in_progress.take() {
349 Some(in_progress) => in_progress,
350 None => return Ok(()),
351 };
352
353 let mut row_group_writer = self.writer.next_row_group()?;
354 for chunk in in_progress.close()? {
355 chunk.append_to_row_group(&mut row_group_writer)?;
356 }
357 row_group_writer.close()?;
358 Ok(())
359 }
360
361 pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
365 self.writer.append_key_value_metadata(kv_metadata)
366 }
367
368 pub fn inner(&self) -> &W {
370 self.writer.inner()
371 }
372
373 pub fn inner_mut(&mut self) -> &mut W {
382 self.writer.inner_mut()
383 }
384
385 pub fn into_inner(mut self) -> Result<W> {
387 self.flush()?;
388 self.writer.into_inner()
389 }
390
391 pub fn finish(&mut self) -> Result<crate::format::FileMetaData> {
397 self.flush()?;
398 self.writer.finish()
399 }
400
401 pub fn close(mut self) -> Result<crate::format::FileMetaData> {
403 self.finish()
404 }
405
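    /// Flushes any buffered rows and returns a fresh set of
    /// [`ArrowColumnWriter`]s, one per Parquet leaf column, for encoding data
    /// outside of this writer (for example on separate threads).
    ///
    /// A rough, single-threaded sketch of the intended flow, using a
    /// hypothetical `batch` (illustrative only, not a doctest):
    ///
    /// ```ignore
    /// let mut col_writers = writer.get_column_writers()?;
    /// let mut leaves = Vec::new();
    /// for (field, column) in batch.schema().fields().iter().zip(batch.columns()) {
    ///     leaves.extend(compute_leaves(field.as_ref(), column)?);
    /// }
    /// for (w, leaf) in col_writers.iter_mut().zip(&leaves) {
    ///     w.write(leaf)?;
    /// }
    /// let chunks: Vec<ArrowColumnChunk> = col_writers
    ///     .into_iter()
    ///     .map(|w| w.close())
    ///     .collect::<Result<_>>()?;
    /// writer.append_row_group(chunks)?;
    /// ```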
406 pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
408 self.flush()?;
409 let in_progress = self
410 .row_group_writer_factory
411 .create_row_group_writer(self.writer.flushed_row_groups().len())?;
412 Ok(in_progress.writers)
413 }
414
415 pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
417 let mut row_group_writer = self.writer.next_row_group()?;
418 for chunk in chunks {
419 chunk.append_to_row_group(&mut row_group_writer)?;
420 }
421 row_group_writer.close()?;
422 Ok(())
423 }
424}
425
426impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
427 fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
428 self.write(batch).map_err(|e| e.into())
429 }
430
431 fn close(self) -> std::result::Result<(), ArrowError> {
432 self.close()?;
433 Ok(())
434 }
435}
436
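/// Additional options for [`ArrowWriter::try_new_with_options`]: the
/// [`WriterProperties`] to use, whether the encoded Arrow schema is embedded
/// in the file metadata, and an optional name for the Parquet schema root.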
437#[derive(Debug, Clone, Default)]
441pub struct ArrowWriterOptions {
442 properties: WriterProperties,
443 skip_arrow_metadata: bool,
444 schema_root: Option<String>,
445}
446
447impl ArrowWriterOptions {
448 pub fn new() -> Self {
450 Self::default()
451 }
452
453 pub fn with_properties(self, properties: WriterProperties) -> Self {
455 Self { properties, ..self }
456 }
457
458 pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
465 Self {
466 skip_arrow_metadata,
467 ..self
468 }
469 }
470
471 pub fn with_schema_root(self, schema_root: String) -> Self {
473 Self {
474 schema_root: Some(schema_root),
475 ..self
476 }
477 }
478}
479
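/// In-memory buffer holding a column chunk's encoded pages; implements
/// [`ChunkReader`] so the buffered bytes can be copied into the file when the
/// row group is closed.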
480#[derive(Default)]
482struct ArrowColumnChunkData {
483 length: usize,
484 data: Vec<Bytes>,
485}
486
487impl Length for ArrowColumnChunkData {
488 fn len(&self) -> u64 {
489 self.length as _
490 }
491}
492
493impl ChunkReader for ArrowColumnChunkData {
494 type T = ArrowColumnChunkReader;
495
496 fn get_read(&self, start: u64) -> Result<Self::T> {
        assert_eq!(start, 0);
        Ok(ArrowColumnChunkReader(
499 self.data.clone().into_iter().peekable(),
500 ))
501 }
502
503 fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
504 unimplemented!()
505 }
506}
507
508struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
510
511impl Read for ArrowColumnChunkReader {
512 fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
513 let buffer = loop {
514 match self.0.peek_mut() {
515 Some(b) if b.is_empty() => {
516 self.0.next();
517 continue;
518 }
519 Some(b) => break b,
520 None => return Ok(0),
521 }
522 };
523
524 let len = buffer.len().min(out.len());
525 let b = buffer.split_to(len);
526 out[..len].copy_from_slice(&b);
527 Ok(len)
528 }
529}
530
531type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
536
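/// A [`PageWriter`] that serializes page headers and appends the resulting
/// bytes to a [`SharedColumnChunk`], optionally encrypting pages when the
/// `encryption` feature is enabled.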
537#[derive(Default)]
538struct ArrowPageWriter {
539 buffer: SharedColumnChunk,
540 #[cfg(feature = "encryption")]
541 page_encryptor: Option<PageEncryptor>,
542}
543
544impl ArrowPageWriter {
545 #[cfg(feature = "encryption")]
546 pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
547 self.page_encryptor = page_encryptor;
548 self
549 }
550
551 #[cfg(feature = "encryption")]
552 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
553 self.page_encryptor.as_mut()
554 }
555
556 #[cfg(not(feature = "encryption"))]
557 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
558 None
559 }
560}
561
562impl PageWriter for ArrowPageWriter {
563 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
564 let page = match self.page_encryptor_mut() {
565 Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
566 None => page,
567 };
568
569 let page_header = page.to_thrift_header();
570 let header = {
571 let mut header = Vec::with_capacity(1024);
572
573 match self.page_encryptor_mut() {
574 Some(page_encryptor) => {
575 page_encryptor.encrypt_page_header(&page_header, &mut header)?;
576 if page.compressed_page().is_data_page() {
577 page_encryptor.increment_page();
578 }
579 }
580 None => {
581 let mut protocol = TCompactOutputProtocol::new(&mut header);
582 page_header.write_to_out_protocol(&mut protocol)?;
583 }
584 };
585
586 Bytes::from(header)
587 };
588
589 let mut buf = self.buffer.try_lock().unwrap();
590
591 let data = page.compressed_page().buffer().clone();
592 let compressed_size = data.len() + header.len();
593
594 let mut spec = PageWriteSpec::new();
595 spec.page_type = page.page_type();
596 spec.num_values = page.num_values();
597 spec.uncompressed_size = page.uncompressed_size() + header.len();
598 spec.offset = buf.length as u64;
599 spec.compressed_size = compressed_size;
600 spec.bytes_written = compressed_size as u64;
601
602 buf.length += compressed_size;
603 buf.data.push(header);
604 buf.data.push(data);
605
606 Ok(spec)
607 }
608
609 fn close(&mut self) -> Result<()> {
610 Ok(())
611 }
612}
613
614#[derive(Debug)]
616pub struct ArrowLeafColumn(ArrayLevels);
617
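/// Computes the definition and repetition levels for every leaf column of
/// `array` with respect to `field`, returning one [`ArrowLeafColumn`] per
/// Parquet leaf.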
618pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
620 let levels = calculate_array_levels(array, field)?;
621 Ok(levels.into_iter().map(ArrowLeafColumn).collect())
622}
623
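/// The encoded data and close result for a single column chunk, ready to be
/// appended to a row group via [`ArrowColumnChunk::append_to_row_group`].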
624pub struct ArrowColumnChunk {
626 data: ArrowColumnChunkData,
627 close: ColumnCloseResult,
628}
629
630impl std::fmt::Debug for ArrowColumnChunk {
631 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
632 f.debug_struct("ArrowColumnChunk")
633 .field("length", &self.data.length)
634 .finish_non_exhaustive()
635 }
636}
637
638impl ArrowColumnChunk {
639 pub fn append_to_row_group<W: Write + Send>(
641 self,
642 writer: &mut SerializedRowGroupWriter<'_, W>,
643 ) -> Result<()> {
644 writer.append_column(&self.data, self.close)
645 }
646}
647
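/// Encodes [`ArrowLeafColumn`]s for a single Parquet leaf column, buffering
/// the encoded pages in memory until [`ArrowColumnWriter::close`] returns them
/// as an [`ArrowColumnChunk`]. See [`ArrowWriter::get_column_writers`] for a
/// usage sketch.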
648pub struct ArrowColumnWriter {
742 writer: ArrowColumnWriterImpl,
743 chunk: SharedColumnChunk,
744}
745
746impl std::fmt::Debug for ArrowColumnWriter {
747 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
748 f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
749 }
750}
751
752enum ArrowColumnWriterImpl {
753 ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
754 Column(ColumnWriter<'static>),
755}
756
757impl ArrowColumnWriter {
758 pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
760 match &mut self.writer {
761 ArrowColumnWriterImpl::Column(c) => {
762 write_leaf(c, &col.0)?;
763 }
764 ArrowColumnWriterImpl::ByteArray(c) => {
765 write_primitive(c, col.0.array().as_ref(), &col.0)?;
766 }
767 }
768 Ok(())
769 }
770
771 pub fn close(self) -> Result<ArrowColumnChunk> {
773 let close = match self.writer {
774 ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
775 ArrowColumnWriterImpl::Column(c) => c.close()?,
776 };
777 let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
778 let data = chunk.into_inner().unwrap();
779 Ok(ArrowColumnChunk { data, close })
780 }
781
782 pub fn memory_size(&self) -> usize {
793 match &self.writer {
794 ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
795 ArrowColumnWriterImpl::Column(c) => c.memory_size(),
796 }
797 }
798
799 pub fn get_estimated_total_bytes(&self) -> usize {
807 match &self.writer {
808 ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
809 ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
810 }
811 }
812}
813
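/// The in-progress row group: one [`ArrowColumnWriter`] per leaf column plus
/// the number of rows buffered so far.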
814struct ArrowRowGroupWriter {
816 writers: Vec<ArrowColumnWriter>,
817 schema: SchemaRef,
818 buffered_rows: usize,
819}
820
821impl ArrowRowGroupWriter {
822 fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
823 Self {
824 writers,
825 schema: arrow.clone(),
826 buffered_rows: 0,
827 }
828 }
829
830 fn write(&mut self, batch: &RecordBatch) -> Result<()> {
831 self.buffered_rows += batch.num_rows();
832 let mut writers = self.writers.iter_mut();
833 for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
834 for leaf in compute_leaves(field.as_ref(), column)? {
835 writers.next().unwrap().write(&leaf)?
836 }
837 }
838 Ok(())
839 }
840
841 fn close(self) -> Result<Vec<ArrowColumnChunk>> {
842 self.writers
843 .into_iter()
844 .map(|writer| writer.close())
845 .collect()
846 }
847}
848
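/// Creates the [`ArrowRowGroupWriter`] for each new row group, passing along
/// the file encryptor when the `encryption` feature is enabled.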
849struct ArrowRowGroupWriterFactory {
850 schema: SchemaDescriptor,
851 arrow_schema: SchemaRef,
852 props: WriterPropertiesPtr,
853 #[cfg(feature = "encryption")]
854 file_encryptor: Option<Arc<FileEncryptor>>,
855}
856
857impl ArrowRowGroupWriterFactory {
858 #[cfg(feature = "encryption")]
859 fn new<W: Write + Send>(
860 file_writer: &SerializedFileWriter<W>,
861 schema: SchemaDescriptor,
862 arrow_schema: SchemaRef,
863 props: WriterPropertiesPtr,
864 ) -> Self {
865 Self {
866 schema,
867 arrow_schema,
868 props,
869 file_encryptor: file_writer.file_encryptor(),
870 }
871 }
872
873 #[cfg(not(feature = "encryption"))]
874 fn new<W: Write + Send>(
875 _file_writer: &SerializedFileWriter<W>,
876 schema: SchemaDescriptor,
877 arrow_schema: SchemaRef,
878 props: WriterPropertiesPtr,
879 ) -> Self {
880 Self {
881 schema,
882 arrow_schema,
883 props,
884 }
885 }
886
887 #[cfg(feature = "encryption")]
888 fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
889 let writers = get_column_writers_with_encryptor(
890 &self.schema,
891 &self.props,
892 &self.arrow_schema,
893 self.file_encryptor.clone(),
894 row_group_index,
895 )?;
896 Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
897 }
898
899 #[cfg(not(feature = "encryption"))]
900 fn create_row_group_writer(&self, _row_group_index: usize) -> Result<ArrowRowGroupWriter> {
901 let writers = get_column_writers(&self.schema, &self.props, &self.arrow_schema)?;
902 Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
903 }
904}
905
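/// Returns one [`ArrowColumnWriter`] per leaf column of `parquet`, matched
/// against the fields of `arrow` in schema order.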
906pub fn get_column_writers(
908 parquet: &SchemaDescriptor,
909 props: &WriterPropertiesPtr,
910 arrow: &SchemaRef,
911) -> Result<Vec<ArrowColumnWriter>> {
912 let mut writers = Vec::with_capacity(arrow.fields.len());
913 let mut leaves = parquet.columns().iter();
914 let column_factory = ArrowColumnWriterFactory::new();
915 for field in &arrow.fields {
916 column_factory.get_arrow_column_writer(
917 field.data_type(),
918 props,
919 &mut leaves,
920 &mut writers,
921 )?;
922 }
923 Ok(writers)
924}
925
926#[cfg(feature = "encryption")]
928fn get_column_writers_with_encryptor(
929 parquet: &SchemaDescriptor,
930 props: &WriterPropertiesPtr,
931 arrow: &SchemaRef,
932 file_encryptor: Option<Arc<FileEncryptor>>,
933 row_group_index: usize,
934) -> Result<Vec<ArrowColumnWriter>> {
935 let mut writers = Vec::with_capacity(arrow.fields.len());
936 let mut leaves = parquet.columns().iter();
937 let column_factory =
938 ArrowColumnWriterFactory::new().with_file_encryptor(row_group_index, file_encryptor);
939 for field in &arrow.fields {
940 column_factory.get_arrow_column_writer(
941 field.data_type(),
942 props,
943 &mut leaves,
944 &mut writers,
945 )?;
946 }
947 Ok(writers)
948}
949
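/// Builds [`ArrowColumnWriter`]s, creating an [`ArrowPageWriter`] (and, with
/// the `encryption` feature, a [`PageEncryptor`]) for each leaf column.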
950struct ArrowColumnWriterFactory {
952 #[cfg(feature = "encryption")]
953 row_group_index: usize,
954 #[cfg(feature = "encryption")]
955 file_encryptor: Option<Arc<FileEncryptor>>,
956}
957
958impl ArrowColumnWriterFactory {
959 pub fn new() -> Self {
960 Self {
961 #[cfg(feature = "encryption")]
962 row_group_index: 0,
963 #[cfg(feature = "encryption")]
964 file_encryptor: None,
965 }
966 }
967
968 #[cfg(feature = "encryption")]
969 pub fn with_file_encryptor(
970 mut self,
971 row_group_index: usize,
972 file_encryptor: Option<Arc<FileEncryptor>>,
973 ) -> Self {
974 self.row_group_index = row_group_index;
975 self.file_encryptor = file_encryptor;
976 self
977 }
978
979 #[cfg(feature = "encryption")]
980 fn create_page_writer(
981 &self,
982 column_descriptor: &ColumnDescPtr,
983 column_index: usize,
984 ) -> Result<Box<ArrowPageWriter>> {
985 let column_path = column_descriptor.path().string();
986 let page_encryptor = PageEncryptor::create_if_column_encrypted(
987 &self.file_encryptor,
988 self.row_group_index,
989 column_index,
990 &column_path,
991 )?;
992 Ok(Box::new(
993 ArrowPageWriter::default().with_encryptor(page_encryptor),
994 ))
995 }
996
997 #[cfg(not(feature = "encryption"))]
998 fn create_page_writer(
999 &self,
1000 _column_descriptor: &ColumnDescPtr,
1001 _column_index: usize,
1002 ) -> Result<Box<ArrowPageWriter>> {
1003 Ok(Box::<ArrowPageWriter>::default())
1004 }
1005
1006 fn get_arrow_column_writer(
1008 &self,
1009 data_type: &ArrowDataType,
1010 props: &WriterPropertiesPtr,
1011 leaves: &mut Iter<'_, ColumnDescPtr>,
1012 out: &mut Vec<ArrowColumnWriter>,
1013 ) -> Result<()> {
1014 let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1015 let page_writer = self.create_page_writer(desc, out.len())?;
1016 let chunk = page_writer.buffer.clone();
1017 let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
1018 Ok(ArrowColumnWriter {
1019 chunk,
1020 writer: ArrowColumnWriterImpl::Column(writer),
1021 })
1022 };
1023
1024 let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1025 let page_writer = self.create_page_writer(desc, out.len())?;
1026 let chunk = page_writer.buffer.clone();
1027 let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
1028 Ok(ArrowColumnWriter {
1029 chunk,
1030 writer: ArrowColumnWriterImpl::ByteArray(writer),
1031 })
1032 };
1033
1034 match data_type {
1035 _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
                out.push(col(leaves.next().unwrap())?)
            }
1037 ArrowDataType::LargeBinary
1038 | ArrowDataType::Binary
1039 | ArrowDataType::Utf8
1040 | ArrowDataType::LargeUtf8
1041 | ArrowDataType::BinaryView
1042 | ArrowDataType::Utf8View => {
1043 out.push(bytes(leaves.next().unwrap())?)
1044 }
1045 ArrowDataType::List(f)
1046 | ArrowDataType::LargeList(f)
1047 | ArrowDataType::FixedSizeList(f, _) => {
1048 self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
1049 }
1050 ArrowDataType::Struct(fields) => {
1051 for field in fields {
1052 self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
1053 }
1054 }
1055 ArrowDataType::Map(f, _) => match f.data_type() {
1056 ArrowDataType::Struct(f) => {
1057 self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
1058 self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
1059 }
1060 _ => unreachable!("invalid map type"),
1061 }
            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                ArrowDataType::Utf8
                | ArrowDataType::LargeUtf8
                | ArrowDataType::Binary
                | ArrowDataType::LargeBinary => {
                    out.push(bytes(leaves.next().unwrap())?)
                }
                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
                    out.push(bytes(leaves.next().unwrap())?)
                }
                ArrowDataType::FixedSizeBinary(_) => {
                    out.push(bytes(leaves.next().unwrap())?)
                }
                _ => {
                    out.push(col(leaves.next().unwrap())?)
                }
            }
            _ => {
                return Err(ParquetError::NYI(format!(
                    "Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"
                )))
            }
1081 }
1082 Ok(())
1083 }
1084}
1085
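/// Writes a single leaf array through the typed [`ColumnWriter`], casting or
/// reinterpreting the Arrow values into the writer's physical type where
/// necessary (e.g. Date64, unsigned integers, decimals and intervals).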
1086fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
1087 let column = levels.array().as_ref();
1088 let indices = levels.non_null_indices();
1089 match writer {
1090 ColumnWriter::Int32ColumnWriter(ref mut typed) => {
1091 match column.data_type() {
1092 ArrowDataType::Date64 => {
1093 let array = arrow_cast::cast(column, &ArrowDataType::Date32)?;
1095 let array = arrow_cast::cast(&array, &ArrowDataType::Int32)?;
1096
1097 let array = array.as_primitive::<Int32Type>();
1098 write_primitive(typed, array.values(), levels)
1099 }
1100 ArrowDataType::UInt32 => {
1101 let values = column.as_primitive::<UInt32Type>().values();
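                    // Parquet stores unsigned 32-bit integers as physical INT32,
                    // so reinterpret the u32 bits as i32 and write them unchanged.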
1102 let array = values.inner().typed_data::<i32>();
1105 write_primitive(typed, array, levels)
1106 }
1107 ArrowDataType::Decimal32(_, _) => {
1108 let array = column
1109 .as_primitive::<Decimal32Type>()
1110 .unary::<_, Int32Type>(|v| v);
1111 write_primitive(typed, array.values(), levels)
1112 }
1113 ArrowDataType::Decimal64(_, _) => {
1114 let array = column
1116 .as_primitive::<Decimal64Type>()
1117 .unary::<_, Int32Type>(|v| v as i32);
1118 write_primitive(typed, array.values(), levels)
1119 }
1120 ArrowDataType::Decimal128(_, _) => {
1121 let array = column
1123 .as_primitive::<Decimal128Type>()
1124 .unary::<_, Int32Type>(|v| v as i32);
1125 write_primitive(typed, array.values(), levels)
1126 }
1127 ArrowDataType::Decimal256(_, _) => {
1128 let array = column
1130 .as_primitive::<Decimal256Type>()
1131 .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1132 write_primitive(typed, array.values(), levels)
1133 }
1134 ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1135 ArrowDataType::Decimal32(_, _) => {
1136 let array = arrow_cast::cast(column, value_type)?;
1137 let array = array
1138 .as_primitive::<Decimal32Type>()
1139 .unary::<_, Int32Type>(|v| v);
1140 write_primitive(typed, array.values(), levels)
1141 }
1142 ArrowDataType::Decimal64(_, _) => {
1143 let array = arrow_cast::cast(column, value_type)?;
1144 let array = array
1145 .as_primitive::<Decimal64Type>()
1146 .unary::<_, Int32Type>(|v| v as i32);
1147 write_primitive(typed, array.values(), levels)
1148 }
1149 ArrowDataType::Decimal128(_, _) => {
1150 let array = arrow_cast::cast(column, value_type)?;
1151 let array = array
1152 .as_primitive::<Decimal128Type>()
1153 .unary::<_, Int32Type>(|v| v as i32);
1154 write_primitive(typed, array.values(), levels)
1155 }
1156 ArrowDataType::Decimal256(_, _) => {
1157 let array = arrow_cast::cast(column, value_type)?;
1158 let array = array
1159 .as_primitive::<Decimal256Type>()
1160 .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1161 write_primitive(typed, array.values(), levels)
1162 }
1163 _ => {
1164 let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
1165 let array = array.as_primitive::<Int32Type>();
1166 write_primitive(typed, array.values(), levels)
1167 }
1168 },
1169 _ => {
1170 let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
1171 let array = array.as_primitive::<Int32Type>();
1172 write_primitive(typed, array.values(), levels)
1173 }
1174 }
1175 }
1176 ColumnWriter::BoolColumnWriter(ref mut typed) => {
1177 let array = column.as_boolean();
1178 typed.write_batch(
1179 get_bool_array_slice(array, indices).as_slice(),
1180 levels.def_levels(),
1181 levels.rep_levels(),
1182 )
1183 }
1184 ColumnWriter::Int64ColumnWriter(ref mut typed) => {
1185 match column.data_type() {
1186 ArrowDataType::Date64 => {
1187 let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1188
1189 let array = array.as_primitive::<Int64Type>();
1190 write_primitive(typed, array.values(), levels)
1191 }
1192 ArrowDataType::Int64 => {
1193 let array = column.as_primitive::<Int64Type>();
1194 write_primitive(typed, array.values(), levels)
1195 }
1196 ArrowDataType::UInt64 => {
1197 let values = column.as_primitive::<UInt64Type>().values();
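                    // As with UInt32 above: UINT_64 uses physical INT64, so the
                    // u64 bits are reinterpreted as i64 and written unchanged.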
1198 let array = values.inner().typed_data::<i64>();
1201 write_primitive(typed, array, levels)
1202 }
1203 ArrowDataType::Decimal64(_, _) => {
1204 let array = column
1205 .as_primitive::<Decimal64Type>()
1206 .unary::<_, Int64Type>(|v| v);
1207 write_primitive(typed, array.values(), levels)
1208 }
1209 ArrowDataType::Decimal128(_, _) => {
1210 let array = column
1212 .as_primitive::<Decimal128Type>()
1213 .unary::<_, Int64Type>(|v| v as i64);
1214 write_primitive(typed, array.values(), levels)
1215 }
1216 ArrowDataType::Decimal256(_, _) => {
1217 let array = column
1219 .as_primitive::<Decimal256Type>()
1220 .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1221 write_primitive(typed, array.values(), levels)
1222 }
1223 ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1224 ArrowDataType::Decimal64(_, _) => {
1225 let array = arrow_cast::cast(column, value_type)?;
1226 let array = array
1227 .as_primitive::<Decimal64Type>()
1228 .unary::<_, Int64Type>(|v| v);
1229 write_primitive(typed, array.values(), levels)
1230 }
1231 ArrowDataType::Decimal128(_, _) => {
1232 let array = arrow_cast::cast(column, value_type)?;
1233 let array = array
1234 .as_primitive::<Decimal128Type>()
1235 .unary::<_, Int64Type>(|v| v as i64);
1236 write_primitive(typed, array.values(), levels)
1237 }
1238 ArrowDataType::Decimal256(_, _) => {
1239 let array = arrow_cast::cast(column, value_type)?;
1240 let array = array
1241 .as_primitive::<Decimal256Type>()
1242 .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1243 write_primitive(typed, array.values(), levels)
1244 }
1245 _ => {
1246 let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1247 let array = array.as_primitive::<Int64Type>();
1248 write_primitive(typed, array.values(), levels)
1249 }
1250 },
1251 _ => {
1252 let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1253 let array = array.as_primitive::<Int64Type>();
1254 write_primitive(typed, array.values(), levels)
1255 }
1256 }
1257 }
1258 ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
1259 unreachable!("Currently unreachable because data type not supported")
1260 }
1261 ColumnWriter::FloatColumnWriter(ref mut typed) => {
1262 let array = column.as_primitive::<Float32Type>();
1263 write_primitive(typed, array.values(), levels)
1264 }
1265 ColumnWriter::DoubleColumnWriter(ref mut typed) => {
1266 let array = column.as_primitive::<Float64Type>();
1267 write_primitive(typed, array.values(), levels)
1268 }
1269 ColumnWriter::ByteArrayColumnWriter(_) => {
1270 unreachable!("should use ByteArrayWriter")
1271 }
1272 ColumnWriter::FixedLenByteArrayColumnWriter(ref mut typed) => {
1273 let bytes = match column.data_type() {
1274 ArrowDataType::Interval(interval_unit) => match interval_unit {
1275 IntervalUnit::YearMonth => {
1276 let array = column
1277 .as_any()
1278 .downcast_ref::<arrow_array::IntervalYearMonthArray>()
1279 .unwrap();
1280 get_interval_ym_array_slice(array, indices)
1281 }
1282 IntervalUnit::DayTime => {
1283 let array = column
1284 .as_any()
1285 .downcast_ref::<arrow_array::IntervalDayTimeArray>()
1286 .unwrap();
1287 get_interval_dt_array_slice(array, indices)
1288 }
1289 _ => {
1290 return Err(ParquetError::NYI(
1291 format!(
1292 "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
1293 )
1294 ));
1295 }
1296 },
1297 ArrowDataType::FixedSizeBinary(_) => {
1298 let array = column
1299 .as_any()
1300 .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
1301 .unwrap();
1302 get_fsb_array_slice(array, indices)
1303 }
1304 ArrowDataType::Decimal32(_, _) => {
1305 let array = column.as_primitive::<Decimal32Type>();
1306 get_decimal_32_array_slice(array, indices)
1307 }
1308 ArrowDataType::Decimal64(_, _) => {
1309 let array = column.as_primitive::<Decimal64Type>();
1310 get_decimal_64_array_slice(array, indices)
1311 }
1312 ArrowDataType::Decimal128(_, _) => {
1313 let array = column.as_primitive::<Decimal128Type>();
1314 get_decimal_128_array_slice(array, indices)
1315 }
1316 ArrowDataType::Decimal256(_, _) => {
1317 let array = column
1318 .as_any()
1319 .downcast_ref::<arrow_array::Decimal256Array>()
1320 .unwrap();
1321 get_decimal_256_array_slice(array, indices)
1322 }
1323 ArrowDataType::Float16 => {
1324 let array = column.as_primitive::<Float16Type>();
1325 get_float_16_array_slice(array, indices)
1326 }
1327 _ => {
1328 return Err(ParquetError::NYI(
1329 "Attempting to write an Arrow type that is not yet implemented".to_string(),
1330 ));
1331 }
1332 };
1333 typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
1334 }
1335 }
1336}
1337
1338fn write_primitive<E: ColumnValueEncoder>(
1339 writer: &mut GenericColumnWriter<E>,
1340 values: &E::Values,
1341 levels: &ArrayLevels,
1342) -> Result<usize> {
1343 writer.write_batch_internal(
1344 values,
1345 Some(levels.non_null_indices()),
1346 levels.def_levels(),
1347 levels.rep_levels(),
1348 None,
1349 None,
1350 None,
1351 )
1352}
1353
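/// Gathers the values at the non-null `indices` into the representation the
/// column writer expects; the helpers below do the same for the various
/// fixed-length byte array encodings (intervals, decimals, f16, FixedSizeBinary).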
1354fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
1355 let mut values = Vec::with_capacity(indices.len());
1356 for i in indices {
1357 values.push(array.value(*i))
1358 }
1359 values
1360}
1361
1362fn get_interval_ym_array_slice(
1365 array: &arrow_array::IntervalYearMonthArray,
1366 indices: &[usize],
1367) -> Vec<FixedLenByteArray> {
1368 let mut values = Vec::with_capacity(indices.len());
1369 for i in indices {
1370 let mut value = array.value(*i).to_le_bytes().to_vec();
1371 let mut suffix = vec![0; 8];
1372 value.append(&mut suffix);
1373 values.push(FixedLenByteArray::from(ByteArray::from(value)))
1374 }
1375 values
1376}
1377
1378fn get_interval_dt_array_slice(
1381 array: &arrow_array::IntervalDayTimeArray,
1382 indices: &[usize],
1383) -> Vec<FixedLenByteArray> {
1384 let mut values = Vec::with_capacity(indices.len());
1385 for i in indices {
1386 let mut out = [0; 12];
1387 let value = array.value(*i);
1388 out[4..8].copy_from_slice(&value.days.to_le_bytes());
1389 out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1390 values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1391 }
1392 values
1393}
1394
1395fn get_decimal_32_array_slice(
1396 array: &arrow_array::Decimal32Array,
1397 indices: &[usize],
1398) -> Vec<FixedLenByteArray> {
1399 let mut values = Vec::with_capacity(indices.len());
1400 let size = decimal_length_from_precision(array.precision());
1401 for i in indices {
1402 let as_be_bytes = array.value(*i).to_be_bytes();
1403 let resized_value = as_be_bytes[(4 - size)..].to_vec();
1404 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1405 }
1406 values
1407}
1408
1409fn get_decimal_64_array_slice(
1410 array: &arrow_array::Decimal64Array,
1411 indices: &[usize],
1412) -> Vec<FixedLenByteArray> {
1413 let mut values = Vec::with_capacity(indices.len());
1414 let size = decimal_length_from_precision(array.precision());
1415 for i in indices {
1416 let as_be_bytes = array.value(*i).to_be_bytes();
1417 let resized_value = as_be_bytes[(8 - size)..].to_vec();
1418 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1419 }
1420 values
1421}
1422
1423fn get_decimal_128_array_slice(
1424 array: &arrow_array::Decimal128Array,
1425 indices: &[usize],
1426) -> Vec<FixedLenByteArray> {
1427 let mut values = Vec::with_capacity(indices.len());
1428 let size = decimal_length_from_precision(array.precision());
1429 for i in indices {
1430 let as_be_bytes = array.value(*i).to_be_bytes();
1431 let resized_value = as_be_bytes[(16 - size)..].to_vec();
1432 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1433 }
1434 values
1435}
1436
1437fn get_decimal_256_array_slice(
1438 array: &arrow_array::Decimal256Array,
1439 indices: &[usize],
1440) -> Vec<FixedLenByteArray> {
1441 let mut values = Vec::with_capacity(indices.len());
1442 let size = decimal_length_from_precision(array.precision());
1443 for i in indices {
1444 let as_be_bytes = array.value(*i).to_be_bytes();
1445 let resized_value = as_be_bytes[(32 - size)..].to_vec();
1446 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1447 }
1448 values
1449}
1450
1451fn get_float_16_array_slice(
1452 array: &arrow_array::Float16Array,
1453 indices: &[usize],
1454) -> Vec<FixedLenByteArray> {
1455 let mut values = Vec::with_capacity(indices.len());
1456 for i in indices {
1457 let value = array.value(*i).to_le_bytes().to_vec();
1458 values.push(FixedLenByteArray::from(ByteArray::from(value)));
1459 }
1460 values
1461}
1462
1463fn get_fsb_array_slice(
1464 array: &arrow_array::FixedSizeBinaryArray,
1465 indices: &[usize],
1466) -> Vec<FixedLenByteArray> {
1467 let mut values = Vec::with_capacity(indices.len());
1468 for i in indices {
1469 let value = array.value(*i).to_vec();
1470 values.push(FixedLenByteArray::from(ByteArray::from(value)))
1471 }
1472 values
1473}
1474
1475#[cfg(test)]
1476mod tests {
1477 use super::*;
1478
1479 use std::fs::File;
1480 use std::io::Seek;
1481
1482 use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1483 use crate::arrow::ARROW_SCHEMA_META_KEY;
1484 use crate::column::page::{Page, PageReader};
1485 use crate::file::page_encoding_stats::PageEncodingStats;
1486 use crate::file::reader::SerializedPageReader;
1487 use crate::format::PageHeader;
1488 use crate::schema::types::ColumnPath;
1489 use crate::thrift::TCompactSliceInputProtocol;
1490 use arrow::datatypes::ToByteSlice;
1491 use arrow::datatypes::{DataType, Schema};
1492 use arrow::error::Result as ArrowResult;
1493 use arrow::util::data_gen::create_random_array;
1494 use arrow::util::pretty::pretty_format_batches;
1495 use arrow::{array::*, buffer::Buffer};
1496 use arrow_buffer::{i256, IntervalDayTime, IntervalMonthDayNano, NullBuffer};
1497 use arrow_schema::Fields;
1498 use half::f16;
1499 use num::{FromPrimitive, ToPrimitive};
1500 use tempfile::tempfile;
1501
1502 use crate::basic::Encoding;
1503 use crate::data_type::AsBytes;
1504 use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
1505 use crate::file::page_index::index::Index;
1506 use crate::file::properties::{
1507 BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1508 };
1509 use crate::file::serialized_reader::ReadOptionsBuilder;
1510 use crate::file::{
1511 reader::{FileReader, SerializedFileReader},
1512 statistics::Statistics,
1513 };
1514
1515 #[test]
1516 fn arrow_writer() {
1517 let schema = Schema::new(vec![
1519 Field::new("a", DataType::Int32, false),
1520 Field::new("b", DataType::Int32, true),
1521 ]);
1522
1523 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1525 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1526
1527 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1529
1530 roundtrip(batch, Some(SMALL_SIZE / 2));
1531 }
1532
1533 fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1534 let mut buffer = vec![];
1535
1536 let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
1537 writer.write(expected_batch).unwrap();
1538 writer.close().unwrap();
1539
1540 buffer
1541 }
1542
1543 fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1544 let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
1545 writer.write(expected_batch).unwrap();
1546 writer.into_inner().unwrap()
1547 }
1548
1549 #[test]
1550 fn roundtrip_bytes() {
1551 let schema = Arc::new(Schema::new(vec![
1553 Field::new("a", DataType::Int32, false),
1554 Field::new("b", DataType::Int32, true),
1555 ]));
1556
1557 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1559 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1560
1561 let expected_batch =
1563 RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
1564
1565 for buffer in [
1566 get_bytes_after_close(schema.clone(), &expected_batch),
1567 get_bytes_by_into_inner(schema, &expected_batch),
1568 ] {
1569 let cursor = Bytes::from(buffer);
1570 let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
1571
1572 let actual_batch = record_batch_reader
1573 .next()
1574 .expect("No batch found")
1575 .expect("Unable to get batch");
1576
1577 assert_eq!(expected_batch.schema(), actual_batch.schema());
1578 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1579 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1580 for i in 0..expected_batch.num_columns() {
1581 let expected_data = expected_batch.column(i).to_data();
1582 let actual_data = actual_batch.column(i).to_data();
1583
1584 assert_eq!(expected_data, actual_data);
1585 }
1586 }
1587 }
1588
1589 #[test]
1590 fn arrow_writer_non_null() {
1591 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1593
1594 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1596
1597 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1599
1600 roundtrip(batch, Some(SMALL_SIZE / 2));
1601 }
1602
1603 #[test]
1604 fn arrow_writer_list() {
1605 let schema = Schema::new(vec![Field::new(
1607 "a",
1608 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1609 true,
1610 )]);
1611
1612 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1614
1615 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1618
1619 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1621 DataType::Int32,
1622 false,
1623 ))))
1624 .len(5)
1625 .add_buffer(a_value_offsets)
1626 .add_child_data(a_values.into_data())
1627 .null_bit_buffer(Some(Buffer::from([0b00011011])))
1628 .build()
1629 .unwrap();
1630 let a = ListArray::from(a_list_data);
1631
1632 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1634
1635 assert_eq!(batch.column(0).null_count(), 1);
1636
1637 roundtrip(batch, None);
1640 }
1641
1642 #[test]
1643 fn arrow_writer_list_non_null() {
1644 let schema = Schema::new(vec![Field::new(
1646 "a",
1647 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1648 false,
1649 )]);
1650
1651 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1653
1654 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1657
1658 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1660 DataType::Int32,
1661 false,
1662 ))))
1663 .len(5)
1664 .add_buffer(a_value_offsets)
1665 .add_child_data(a_values.into_data())
1666 .build()
1667 .unwrap();
1668 let a = ListArray::from(a_list_data);
1669
1670 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1672
1673 assert_eq!(batch.column(0).null_count(), 0);
1676
1677 roundtrip(batch, None);
1678 }
1679
1680 #[test]
1681 fn arrow_writer_binary() {
1682 let string_field = Field::new("a", DataType::Utf8, false);
1683 let binary_field = Field::new("b", DataType::Binary, false);
1684 let schema = Schema::new(vec![string_field, binary_field]);
1685
1686 let raw_string_values = vec!["foo", "bar", "baz", "quux"];
1687 let raw_binary_values = [
1688 b"foo".to_vec(),
1689 b"bar".to_vec(),
1690 b"baz".to_vec(),
1691 b"quux".to_vec(),
1692 ];
1693 let raw_binary_value_refs = raw_binary_values
1694 .iter()
1695 .map(|x| x.as_slice())
1696 .collect::<Vec<_>>();
1697
1698 let string_values = StringArray::from(raw_string_values.clone());
1699 let binary_values = BinaryArray::from(raw_binary_value_refs);
1700 let batch = RecordBatch::try_new(
1701 Arc::new(schema),
1702 vec![Arc::new(string_values), Arc::new(binary_values)],
1703 )
1704 .unwrap();
1705
1706 roundtrip(batch, Some(SMALL_SIZE / 2));
1707 }
1708
1709 #[test]
1710 fn arrow_writer_binary_view() {
1711 let string_field = Field::new("a", DataType::Utf8View, false);
1712 let binary_field = Field::new("b", DataType::BinaryView, false);
1713 let nullable_string_field = Field::new("a", DataType::Utf8View, true);
1714 let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
1715
1716 let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
1717 let raw_binary_values = vec![
1718 b"foo".to_vec(),
1719 b"bar".to_vec(),
1720 b"large payload over 12 bytes".to_vec(),
1721 b"lulu".to_vec(),
1722 ];
1723 let nullable_string_values =
1724 vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
1725
1726 let string_view_values = StringViewArray::from(raw_string_values);
1727 let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
1728 let nullable_string_view_values = StringViewArray::from(nullable_string_values);
1729 let batch = RecordBatch::try_new(
1730 Arc::new(schema),
1731 vec![
1732 Arc::new(string_view_values),
1733 Arc::new(binary_view_values),
1734 Arc::new(nullable_string_view_values),
1735 ],
1736 )
1737 .unwrap();
1738
1739 roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1740 roundtrip(batch, None);
1741 }
1742
1743 fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
1744 let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
1745 let schema = Schema::new(vec![decimal_field]);
1746
1747 let decimal_values = vec![10_000, 50_000, 0, -100]
1748 .into_iter()
1749 .map(Some)
1750 .collect::<Decimal128Array>()
1751 .with_precision_and_scale(precision, scale)
1752 .unwrap();
1753
1754 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
1755 }
1756
1757 #[test]
1758 fn arrow_writer_decimal() {
1759 let batch_int32_decimal = get_decimal_batch(5, 2);
1761 roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
1762 let batch_int64_decimal = get_decimal_batch(12, 2);
1764 roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
1765 let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
1767 roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
1768 }
1769
1770 #[test]
1771 fn arrow_writer_complex() {
1772 let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1774 let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1775 let struct_field_g = Arc::new(Field::new_list(
1776 "g",
1777 Field::new_list_field(DataType::Int16, true),
1778 false,
1779 ));
1780 let struct_field_h = Arc::new(Field::new_list(
1781 "h",
1782 Field::new_list_field(DataType::Int16, false),
1783 true,
1784 ));
1785 let struct_field_e = Arc::new(Field::new_struct(
1786 "e",
1787 vec![
1788 struct_field_f.clone(),
1789 struct_field_g.clone(),
1790 struct_field_h.clone(),
1791 ],
1792 false,
1793 ));
1794 let schema = Schema::new(vec![
1795 Field::new("a", DataType::Int32, false),
1796 Field::new("b", DataType::Int32, true),
1797 Field::new_struct(
1798 "c",
1799 vec![struct_field_d.clone(), struct_field_e.clone()],
1800 false,
1801 ),
1802 ]);
1803
1804 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1806 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1807 let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1808 let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1809
1810 let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1811
1812 let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1815
1816 let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1818 .len(5)
1819 .add_buffer(g_value_offsets.clone())
1820 .add_child_data(g_value.to_data())
1821 .build()
1822 .unwrap();
1823 let g = ListArray::from(g_list_data);
1824 let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
1826 .len(5)
1827 .add_buffer(g_value_offsets)
1828 .add_child_data(g_value.to_data())
1829 .null_bit_buffer(Some(Buffer::from([0b00011011])))
1830 .build()
1831 .unwrap();
1832 let h = ListArray::from(h_list_data);
1833
1834 let e = StructArray::from(vec![
1835 (struct_field_f, Arc::new(f) as ArrayRef),
1836 (struct_field_g, Arc::new(g) as ArrayRef),
1837 (struct_field_h, Arc::new(h) as ArrayRef),
1838 ]);
1839
1840 let c = StructArray::from(vec![
1841 (struct_field_d, Arc::new(d) as ArrayRef),
1842 (struct_field_e, Arc::new(e) as ArrayRef),
1843 ]);
1844
1845 let batch = RecordBatch::try_new(
1847 Arc::new(schema),
1848 vec![Arc::new(a), Arc::new(b), Arc::new(c)],
1849 )
1850 .unwrap();
1851
1852 roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1853 roundtrip(batch, Some(SMALL_SIZE / 3));
1854 }
1855
1856 #[test]
1857 fn arrow_writer_complex_mixed() {
1858 let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
1863 let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
1864 let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
1865 let schema = Schema::new(vec![Field::new(
1866 "some_nested_object",
1867 DataType::Struct(Fields::from(vec![
1868 offset_field.clone(),
1869 partition_field.clone(),
1870 topic_field.clone(),
1871 ])),
1872 false,
1873 )]);
1874
1875 let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1877 let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1878 let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
1879
1880 let some_nested_object = StructArray::from(vec![
1881 (offset_field, Arc::new(offset) as ArrayRef),
1882 (partition_field, Arc::new(partition) as ArrayRef),
1883 (topic_field, Arc::new(topic) as ArrayRef),
1884 ]);
1885
1886 let batch =
1888 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1889
1890 roundtrip(batch, Some(SMALL_SIZE / 2));
1891 }
1892
1893 #[test]
1894 fn arrow_writer_map() {
1895 let json_content = r#"
1897 {"stocks":{"long": "$AAA", "short": "$BBB"}}
1898 {"stocks":{"long": null, "long": "$CCC", "short": null}}
1899 {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1900 "#;
1901 let entries_struct_type = DataType::Struct(Fields::from(vec![
1902 Field::new("key", DataType::Utf8, false),
1903 Field::new("value", DataType::Utf8, true),
1904 ]));
1905 let stocks_field = Field::new(
1906 "stocks",
1907 DataType::Map(
1908 Arc::new(Field::new("entries", entries_struct_type, false)),
1909 false,
1910 ),
1911 true,
1912 );
1913 let schema = Arc::new(Schema::new(vec![stocks_field]));
1914 let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1915 let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1916
1917 let batch = reader.next().unwrap().unwrap();
1918 roundtrip(batch, None);
1919 }
1920
1921 #[test]
1922 fn arrow_writer_2_level_struct() {
1923 let field_c = Field::new("c", DataType::Int32, true);
1925 let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
1926 let type_a = DataType::Struct(vec![field_b.clone()].into());
1927 let field_a = Field::new("a", type_a, true);
1928 let schema = Schema::new(vec![field_a.clone()]);
1929
1930 let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
1932 let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
1933 .len(6)
1934 .null_bit_buffer(Some(Buffer::from([0b00100111])))
1935 .add_child_data(c.into_data())
1936 .build()
1937 .unwrap();
1938 let b = StructArray::from(b_data);
1939 let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
1940 .len(6)
1941 .null_bit_buffer(Some(Buffer::from([0b00101111])))
1942 .add_child_data(b.into_data())
1943 .build()
1944 .unwrap();
1945 let a = StructArray::from(a_data);
1946
1947 assert_eq!(a.null_count(), 1);
1948 assert_eq!(a.column(0).null_count(), 2);
1949
1950 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1952
1953 roundtrip(batch, Some(SMALL_SIZE / 2));
1954 }
1955
1956 #[test]
1957 fn arrow_writer_2_level_struct_non_null() {
1958 let field_c = Field::new("c", DataType::Int32, false);
1960 let type_b = DataType::Struct(vec![field_c].into());
1961 let field_b = Field::new("b", type_b.clone(), false);
1962 let type_a = DataType::Struct(vec![field_b].into());
1963 let field_a = Field::new("a", type_a.clone(), false);
1964 let schema = Schema::new(vec![field_a]);
1965
1966 let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
1968 let b_data = ArrayDataBuilder::new(type_b)
1969 .len(6)
1970 .add_child_data(c.into_data())
1971 .build()
1972 .unwrap();
1973 let b = StructArray::from(b_data);
1974 let a_data = ArrayDataBuilder::new(type_a)
1975 .len(6)
1976 .add_child_data(b.into_data())
1977 .build()
1978 .unwrap();
1979 let a = StructArray::from(a_data);
1980
1981 assert_eq!(a.null_count(), 0);
1982 assert_eq!(a.column(0).null_count(), 0);
1983
1984 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1986
1987 roundtrip(batch, Some(SMALL_SIZE / 2));
1988 }
1989
1990 #[test]
1991 fn arrow_writer_2_level_struct_mixed_null() {
1992 let field_c = Field::new("c", DataType::Int32, false);
1994 let type_b = DataType::Struct(vec![field_c].into());
1995 let field_b = Field::new("b", type_b.clone(), true);
1996 let type_a = DataType::Struct(vec![field_b].into());
1997 let field_a = Field::new("a", type_a.clone(), false);
1998 let schema = Schema::new(vec![field_a]);
1999
2000 let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2002 let b_data = ArrayDataBuilder::new(type_b)
2003 .len(6)
2004 .null_bit_buffer(Some(Buffer::from([0b00100111])))
2005 .add_child_data(c.into_data())
2006 .build()
2007 .unwrap();
2008 let b = StructArray::from(b_data);
2009 let a_data = ArrayDataBuilder::new(type_a)
2011 .len(6)
2012 .add_child_data(b.into_data())
2013 .build()
2014 .unwrap();
2015 let a = StructArray::from(a_data);
2016
2017 assert_eq!(a.null_count(), 0);
2018 assert_eq!(a.column(0).null_count(), 2);
2019
2020 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2022
2023 roundtrip(batch, Some(SMALL_SIZE / 2));
2024 }
2025
2026 #[test]
2027 fn arrow_writer_2_level_struct_mixed_null_2() {
2028 let field_c = Field::new("c", DataType::Int32, false);
2030 let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
2031 let field_e = Field::new(
2032 "e",
2033 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2034 false,
2035 );
2036
2037 let field_b = Field::new(
2038 "b",
2039 DataType::Struct(vec![field_c, field_d, field_e].into()),
2040 false,
2041 );
2042 let type_a = DataType::Struct(vec![field_b.clone()].into());
2043 let field_a = Field::new("a", type_a, true);
2044 let schema = Schema::new(vec![field_a.clone()]);
2045
2046 let c = Int32Array::from_iter_values(0..6);
2048 let d = FixedSizeBinaryArray::try_from_iter(
2049 ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
2050 )
2051 .expect("four byte values");
2052 let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
2053 let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2054 .len(6)
2055 .add_child_data(c.into_data())
2056 .add_child_data(d.into_data())
2057 .add_child_data(e.into_data())
2058 .build()
2059 .unwrap();
2060 let b = StructArray::from(b_data);
2061 let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2062 .len(6)
2063 .null_bit_buffer(Some(Buffer::from([0b00100101])))
2064 .add_child_data(b.into_data())
2065 .build()
2066 .unwrap();
2067 let a = StructArray::from(a_data);
2068
2069 assert_eq!(a.null_count(), 3);
2070 assert_eq!(a.column(0).null_count(), 0);
2071
2072 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2074
2075 roundtrip(batch, Some(SMALL_SIZE / 2));
2076 }
2077
2078 #[test]
2079 fn test_fixed_size_binary_in_dict() {
2080 fn test_fixed_size_binary_in_dict_inner<K>()
2081 where
2082 K: ArrowDictionaryKeyType,
2083 K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
2084 <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
2085 {
2086 let field = Field::new(
2087 "a",
2088 DataType::Dictionary(
2089 Box::new(K::DATA_TYPE),
2090 Box::new(DataType::FixedSizeBinary(4)),
2091 ),
2092 false,
2093 );
2094 let schema = Schema::new(vec![field]);
2095
2096 let keys: Vec<K::Native> = vec![
2097 K::Native::try_from(0u8).unwrap(),
2098 K::Native::try_from(0u8).unwrap(),
2099 K::Native::try_from(1u8).unwrap(),
2100 ];
2101 let keys = PrimitiveArray::<K>::from_iter_values(keys);
2102 let values = FixedSizeBinaryArray::try_from_iter(
2103 vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
2104 )
2105 .unwrap();
2106
2107 let data = DictionaryArray::<K>::new(keys, Arc::new(values));
2108 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
2109 roundtrip(batch, None);
2110 }
2111
2112 test_fixed_size_binary_in_dict_inner::<UInt8Type>();
2113 test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2114 test_fixed_size_binary_in_dict_inner::<UInt32Type>();
        test_fixed_size_binary_in_dict_inner::<UInt64Type>();
2116 test_fixed_size_binary_in_dict_inner::<Int8Type>();
2117 test_fixed_size_binary_in_dict_inner::<Int16Type>();
2118 test_fixed_size_binary_in_dict_inner::<Int32Type>();
2119 test_fixed_size_binary_in_dict_inner::<Int64Type>();
2120 }
2121
2122 #[test]
2123 fn test_empty_dict() {
2124 let struct_fields = Fields::from(vec![Field::new(
2125 "dict",
2126 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2127 false,
2128 )]);
2129
2130 let schema = Schema::new(vec![Field::new_struct(
2131 "struct",
2132 struct_fields.clone(),
2133 true,
2134 )]);
2135 let dictionary = Arc::new(DictionaryArray::new(
2136 Int32Array::new_null(5),
2137 Arc::new(StringArray::new_null(0)),
2138 ));
2139
2140 let s = StructArray::new(
2141 struct_fields,
2142 vec![dictionary],
2143 Some(NullBuffer::new_null(5)),
2144 );
2145
2146 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
2147 roundtrip(batch, None);
2148 }
2149 #[test]
2150 fn arrow_writer_page_size() {
2151 let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
2152
2153 let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
2154
2155 for i in 0..10 {
2157 let value = i
2158 .to_string()
2159 .repeat(10)
2160 .chars()
2161 .take(10)
2162 .collect::<String>();
2163
2164 builder.append_value(value);
2165 }
2166
2167 let array = Arc::new(builder.finish());
2168
2169 let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
2170
2171 let file = tempfile::tempfile().unwrap();
2172
2173 let props = WriterProperties::builder()
2175 .set_data_page_size_limit(1)
2176 .set_dictionary_page_size_limit(1)
2177 .set_write_batch_size(1)
2178 .build();
2179
2180 let mut writer =
2181 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
2182 .expect("Unable to write file");
2183 writer.write(&batch).unwrap();
2184 writer.close().unwrap();
2185
2186 let options = ReadOptionsBuilder::new().with_page_index().build();
2187 let reader =
2188 SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();
2189
2190 let column = reader.metadata().row_group(0).columns();
2191
2192 assert_eq!(column.len(), 1);
2193
2194 assert!(
2197 column[0].dictionary_page_offset().is_some(),
2198 "Expected a dictionary page"
2199 );
2200
2201 assert!(reader.metadata().offset_index().is_some());
2202 let offset_indexes = &reader.metadata().offset_index().unwrap()[0];
2203
2204 let page_locations = offset_indexes[0].page_locations.clone();
2205
2206 assert_eq!(
2209 page_locations.len(),
2210 10,
2211 "Expected 10 pages but got {page_locations:#?}"
2212 );
2213 }
2214
2215 #[test]
2216 fn arrow_writer_float_nans() {
2217 let f16_field = Field::new("a", DataType::Float16, false);
2218 let f32_field = Field::new("b", DataType::Float32, false);
2219 let f64_field = Field::new("c", DataType::Float64, false);
2220 let schema = Schema::new(vec![f16_field, f32_field, f64_field]);
2221
2222 let f16_values = (0..MEDIUM_SIZE)
2223 .map(|i| {
2224 Some(if i % 2 == 0 {
2225 f16::NAN
2226 } else {
2227 f16::from_f32(i as f32)
2228 })
2229 })
2230 .collect::<Float16Array>();
2231
2232 let f32_values = (0..MEDIUM_SIZE)
2233 .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
2234 .collect::<Float32Array>();
2235
2236 let f64_values = (0..MEDIUM_SIZE)
2237 .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
2238 .collect::<Float64Array>();
2239
2240 let batch = RecordBatch::try_new(
2241 Arc::new(schema),
2242 vec![
2243 Arc::new(f16_values),
2244 Arc::new(f32_values),
2245 Arc::new(f64_values),
2246 ],
2247 )
2248 .unwrap();
2249
2250 roundtrip(batch, None);
2251 }
2252
2253 const SMALL_SIZE: usize = 7;
2254 const MEDIUM_SIZE: usize = 63;
2255
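    // Writes the batch with both writer versions (1.0 and 2.0), reads it back, and asserts
    // the decoded data matches; returns the written files for further inspection.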
2256 fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<File> {
2257 let mut files = vec![];
2258 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2259 let mut props = WriterProperties::builder().set_writer_version(version);
2260
2261 if let Some(size) = max_row_group_size {
2262 props = props.set_max_row_group_size(size)
2263 }
2264
2265 let props = props.build();
2266 files.push(roundtrip_opts(&expected_batch, props))
2267 }
2268 files
2269 }
2270
2271 fn roundtrip_opts_with_array_validation<F>(
2272 expected_batch: &RecordBatch,
2273 props: WriterProperties,
2274 validate: F,
2275 ) -> File
2276 where
2277 F: Fn(&ArrayData, &ArrayData),
2278 {
2279 let file = tempfile::tempfile().unwrap();
2280
2281 let mut writer = ArrowWriter::try_new(
2282 file.try_clone().unwrap(),
2283 expected_batch.schema(),
2284 Some(props),
2285 )
2286 .expect("Unable to write file");
2287 writer.write(expected_batch).unwrap();
2288 writer.close().unwrap();
2289
2290 let mut record_batch_reader =
2291 ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
2292
2293 let actual_batch = record_batch_reader
2294 .next()
2295 .expect("No batch found")
2296 .expect("Unable to get batch");
2297
2298 assert_eq!(expected_batch.schema(), actual_batch.schema());
2299 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2300 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2301 for i in 0..expected_batch.num_columns() {
2302 let expected_data = expected_batch.column(i).to_data();
2303 let actual_data = actual_batch.column(i).to_data();
2304 validate(&expected_data, &actual_data);
2305 }
2306
2307 file
2308 }
2309
2310 fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> File {
2311 roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
2312 a.validate_full().expect("valid expected data");
2313 b.validate_full().expect("valid actual data");
2314 assert_eq!(a, b)
2315 })
2316 }
2317
2318 struct RoundTripOptions {
2319 values: ArrayRef,
2320 schema: SchemaRef,
2321 bloom_filter: bool,
2322 bloom_filter_position: BloomFilterPosition,
2323 }
2324
2325 impl RoundTripOptions {
2326 fn new(values: ArrayRef, nullable: bool) -> Self {
2327 let data_type = values.data_type().clone();
2328 let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
2329 Self {
2330 values,
2331 schema: Arc::new(schema),
2332 bloom_filter: false,
2333 bloom_filter_position: BloomFilterPosition::AfterRowGroup,
2334 }
2335 }
2336 }
2337
2338 fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<File> {
2339 one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
2340 }
2341
2342 fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<File> {
2343 let mut options = RoundTripOptions::new(values, false);
2344 options.schema = schema;
2345 one_column_roundtrip_with_options(options)
2346 }
2347
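    // Exercises the column across writer versions, row group sizes, dictionary settings,
    // and every encoding applicable to its data type.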
2348 fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<File> {
2349 let RoundTripOptions {
2350 values,
2351 schema,
2352 bloom_filter,
2353 bloom_filter_position,
2354 } = options;
2355
2356 let encodings = match values.data_type() {
2357 DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
2358 vec![
2359 Encoding::PLAIN,
2360 Encoding::DELTA_BYTE_ARRAY,
2361 Encoding::DELTA_LENGTH_BYTE_ARRAY,
2362 ]
2363 }
2364 DataType::Int64
2365 | DataType::Int32
2366 | DataType::Int16
2367 | DataType::Int8
2368 | DataType::UInt64
2369 | DataType::UInt32
2370 | DataType::UInt16
2371 | DataType::UInt8 => vec![
2372 Encoding::PLAIN,
2373 Encoding::DELTA_BINARY_PACKED,
2374 Encoding::BYTE_STREAM_SPLIT,
2375 ],
2376 DataType::Float32 | DataType::Float64 => {
2377 vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
2378 }
2379 _ => vec![Encoding::PLAIN],
2380 };
2381
2382 let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2383
2384 let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2385
2386 let mut files = vec![];
2387 for dictionary_size in [0, 1, 1024] {
2388 for encoding in &encodings {
2389 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2390 for row_group_size in row_group_sizes {
2391 let props = WriterProperties::builder()
2392 .set_writer_version(version)
2393 .set_max_row_group_size(row_group_size)
2394 .set_dictionary_enabled(dictionary_size != 0)
2395 .set_dictionary_page_size_limit(dictionary_size.max(1))
2396 .set_encoding(*encoding)
2397 .set_bloom_filter_enabled(bloom_filter)
2398 .set_bloom_filter_position(bloom_filter_position)
2399 .build();
2400
2401 files.push(roundtrip_opts(&expected_batch, props))
2402 }
2403 }
2404 }
2405 }
2406 files
2407 }
2408
2409 fn values_required<A, I>(iter: I) -> Vec<File>
2410 where
2411 A: From<Vec<I::Item>> + Array + 'static,
2412 I: IntoIterator,
2413 {
2414 let raw_values: Vec<_> = iter.into_iter().collect();
2415 let values = Arc::new(A::from(raw_values));
2416 one_column_roundtrip(values, false)
2417 }
2418
2419 fn values_optional<A, I>(iter: I) -> Vec<File>
2420 where
2421 A: From<Vec<Option<I::Item>>> + Array + 'static,
2422 I: IntoIterator,
2423 {
2424 let optional_raw_values: Vec<_> = iter
2425 .into_iter()
2426 .enumerate()
2427 .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
2428 .collect();
2429 let optional_values = Arc::new(A::from(optional_raw_values));
2430 one_column_roundtrip(optional_values, true)
2431 }
2432
2433 fn required_and_optional<A, I>(iter: I)
2434 where
2435 A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
2436 I: IntoIterator + Clone,
2437 {
2438 values_required::<A, I>(iter.clone());
2439 values_optional::<A, I>(iter);
2440 }
2441
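    // Opens the first written file with bloom filter reading enabled and checks that every
    // positive value hits, and every negative value misses, the column's bloom filters.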
2442 fn check_bloom_filter<T: AsBytes>(
2443 files: Vec<File>,
2444 file_column: String,
2445 positive_values: Vec<T>,
2446 negative_values: Vec<T>,
2447 ) {
2448 files.into_iter().take(1).for_each(|file| {
2449 let file_reader = SerializedFileReader::new_with_options(
2450 file,
2451 ReadOptionsBuilder::new()
2452 .with_reader_properties(
2453 ReaderProperties::builder()
2454 .set_read_bloom_filter(true)
2455 .build(),
2456 )
2457 .build(),
2458 )
2459 .expect("Unable to open file as Parquet");
2460 let metadata = file_reader.metadata();
2461
2462 let mut bloom_filters: Vec<_> = vec![];
2464 for (ri, row_group) in metadata.row_groups().iter().enumerate() {
2465 if let Some((column_index, _)) = row_group
2466 .columns()
2467 .iter()
2468 .enumerate()
2469 .find(|(_, column)| column.column_path().string() == file_column)
2470 {
2471 let row_group_reader = file_reader
2472 .get_row_group(ri)
2473 .expect("Unable to read row group");
2474 if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
2475 bloom_filters.push(sbbf.clone());
2476 } else {
2477 panic!("No bloom filter for column named {file_column} found");
2478 }
2479 } else {
2480 panic!("No column named {file_column} found");
2481 }
2482 }
2483
2484 positive_values.iter().for_each(|value| {
2485 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2486 assert!(
2487 found.is_some(),
2488 "{}",
2489 format!("Value {:?} should be in bloom filter", value.as_bytes())
2490 );
2491 });
2492
2493 negative_values.iter().for_each(|value| {
2494 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2495 assert!(
2496 found.is_none(),
2497 "{}",
2498 format!("Value {:?} should not be in bloom filter", value.as_bytes())
2499 );
2500 });
2501 });
2502 }
2503
2504 #[test]
2505 fn all_null_primitive_single_column() {
2506 let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
2507 one_column_roundtrip(values, true);
2508 }
2509 #[test]
2510 fn null_single_column() {
2511 let values = Arc::new(NullArray::new(SMALL_SIZE));
2512 one_column_roundtrip(values, true);
2513 }
2515
2516 #[test]
2517 fn bool_single_column() {
2518 required_and_optional::<BooleanArray, _>(
2519 [true, false].iter().cycle().copied().take(SMALL_SIZE),
2520 );
2521 }
2522
2523 #[test]
2524 fn bool_large_single_column() {
2525 let values = Arc::new(
2526 [None, Some(true), Some(false)]
2527 .iter()
2528 .cycle()
2529 .copied()
2530 .take(200_000)
2531 .collect::<BooleanArray>(),
2532 );
2533 let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
2534 let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2535 let file = tempfile::tempfile().unwrap();
2536
2537 let mut writer =
2538 ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
2539 .expect("Unable to write file");
2540 writer.write(&expected_batch).unwrap();
2541 writer.close().unwrap();
2542 }
2543
2544 #[test]
2545 fn check_page_offset_index_with_nan() {
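        // An all-NaN column has no usable min/max, so the writer emits an offset index
        // but no column index.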
2546 let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
2547 let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
2548 let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2549
2550 let mut out = Vec::with_capacity(1024);
2551 let mut writer =
2552 ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
2553 writer.write(&batch).unwrap();
2554 let file_meta_data = writer.close().unwrap();
2555 for row_group in file_meta_data.row_groups {
2556 for column in row_group.columns {
2557 assert!(column.offset_index_offset.is_some());
2558 assert!(column.offset_index_length.is_some());
2559 assert!(column.column_index_offset.is_none());
2560 assert!(column.column_index_length.is_none());
2561 }
2562 }
2563 }
2564
2565 #[test]
2566 fn i8_single_column() {
2567 required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
2568 }
2569
2570 #[test]
2571 fn i16_single_column() {
2572 required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
2573 }
2574
2575 #[test]
2576 fn i32_single_column() {
2577 required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
2578 }
2579
2580 #[test]
2581 fn i64_single_column() {
2582 required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
2583 }
2584
2585 #[test]
2586 fn u8_single_column() {
2587 required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
2588 }
2589
2590 #[test]
2591 fn u16_single_column() {
2592 required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
2593 }
2594
2595 #[test]
2596 fn u32_single_column() {
2597 required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
2598 }
2599
2600 #[test]
2601 fn u64_single_column() {
2602 required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
2603 }
2604
2605 #[test]
2606 fn f32_single_column() {
2607 required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
2608 }
2609
2610 #[test]
2611 fn f64_single_column() {
2612 required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
2613 }
2614
2615 #[test]
2620 fn timestamp_second_single_column() {
2621 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2622 let values = Arc::new(TimestampSecondArray::from(raw_values));
2623
2624 one_column_roundtrip(values, false);
2625 }
2626
2627 #[test]
2628 fn timestamp_millisecond_single_column() {
2629 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2630 let values = Arc::new(TimestampMillisecondArray::from(raw_values));
2631
2632 one_column_roundtrip(values, false);
2633 }
2634
2635 #[test]
2636 fn timestamp_microsecond_single_column() {
2637 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2638 let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
2639
2640 one_column_roundtrip(values, false);
2641 }
2642
2643 #[test]
2644 fn timestamp_nanosecond_single_column() {
2645 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2646 let values = Arc::new(TimestampNanosecondArray::from(raw_values));
2647
2648 one_column_roundtrip(values, false);
2649 }
2650
2651 #[test]
2652 fn date32_single_column() {
2653 required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
2654 }
2655
2656 #[test]
2657 fn date64_single_column() {
2658 required_and_optional::<Date64Array, _>(
2660 (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
2661 );
2662 }
2663
2664 #[test]
2665 fn time32_second_single_column() {
2666 required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
2667 }
2668
2669 #[test]
2670 fn time32_millisecond_single_column() {
2671 required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
2672 }
2673
2674 #[test]
2675 fn time64_microsecond_single_column() {
2676 required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
2677 }
2678
2679 #[test]
2680 fn time64_nanosecond_single_column() {
2681 required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
2682 }
2683
2684 #[test]
2685 fn duration_second_single_column() {
2686 required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
2687 }
2688
2689 #[test]
2690 fn duration_millisecond_single_column() {
2691 required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
2692 }
2693
2694 #[test]
2695 fn duration_microsecond_single_column() {
2696 required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
2697 }
2698
2699 #[test]
2700 fn duration_nanosecond_single_column() {
2701 required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
2702 }
2703
2704 #[test]
2705 fn interval_year_month_single_column() {
2706 required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
2707 }
2708
2709 #[test]
2710 fn interval_day_time_single_column() {
2711 required_and_optional::<IntervalDayTimeArray, _>(vec![
2712 IntervalDayTime::new(0, 1),
2713 IntervalDayTime::new(0, 3),
2714 IntervalDayTime::new(3, -2),
2715 IntervalDayTime::new(-200, 4),
2716 ]);
2717 }
2718
2719 #[test]
2720 #[should_panic(
2721 expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
2722 )]
2723 fn interval_month_day_nano_single_column() {
2724 required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
2725 IntervalMonthDayNano::new(0, 1, 5),
2726 IntervalMonthDayNano::new(0, 3, 2),
2727 IntervalMonthDayNano::new(3, -2, -5),
2728 IntervalMonthDayNano::new(-200, 4, -1),
2729 ]);
2730 }
2731
2732 #[test]
2733 fn binary_single_column() {
2734 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2735 let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2736 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2737
2738 values_required::<BinaryArray, _>(many_vecs_iter);
2740 }
2741
2742 #[test]
2743 fn binary_view_single_column() {
2744 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2745 let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2746 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2747
2748 values_required::<BinaryViewArray, _>(many_vecs_iter);
2750 }
2751
2752 #[test]
2753 fn i32_column_bloom_filter_at_end() {
2754 let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2755 let mut options = RoundTripOptions::new(array, false);
2756 options.bloom_filter = true;
2757 options.bloom_filter_position = BloomFilterPosition::End;
2758
2759 let files = one_column_roundtrip_with_options(options);
2760 check_bloom_filter(
2761 files,
2762 "col".to_string(),
2763 (0..SMALL_SIZE as i32).collect(),
2764 (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2765 );
2766 }
2767
2768 #[test]
2769 fn i32_column_bloom_filter() {
2770 let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2771 let mut options = RoundTripOptions::new(array, false);
2772 options.bloom_filter = true;
2773
2774 let files = one_column_roundtrip_with_options(options);
2775 check_bloom_filter(
2776 files,
2777 "col".to_string(),
2778 (0..SMALL_SIZE as i32).collect(),
2779 (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2780 );
2781 }
2782
2783 #[test]
2784 fn binary_column_bloom_filter() {
2785 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2786 let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2787 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2788
2789 let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
2790 let mut options = RoundTripOptions::new(array, false);
2791 options.bloom_filter = true;
2792
2793 let files = one_column_roundtrip_with_options(options);
2794 check_bloom_filter(
2795 files,
2796 "col".to_string(),
2797 many_vecs,
2798 vec![vec![(SMALL_SIZE + 1) as u8]],
2799 );
2800 }
2801
2802 #[test]
2803 fn empty_string_null_column_bloom_filter() {
2804 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2805 let raw_strs = raw_values.iter().map(|s| s.as_str());
2806
2807 let array = Arc::new(StringArray::from_iter_values(raw_strs));
2808 let mut options = RoundTripOptions::new(array, false);
2809 options.bloom_filter = true;
2810
2811 let files = one_column_roundtrip_with_options(options);
2812
2813 let optional_raw_values: Vec<_> = raw_values
2814 .iter()
2815 .enumerate()
2816 .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
2817 .collect();
2818 check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
2820 }
2821
2822 #[test]
2823 fn large_binary_single_column() {
2824 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2825 let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2826 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2827
2828 values_required::<LargeBinaryArray, _>(many_vecs_iter);
2830 }
2831
2832 #[test]
2833 fn fixed_size_binary_single_column() {
2834 let mut builder = FixedSizeBinaryBuilder::new(4);
2835 builder.append_value(b"0123").unwrap();
2836 builder.append_null();
2837 builder.append_value(b"8910").unwrap();
2838 builder.append_value(b"1112").unwrap();
2839 let array = Arc::new(builder.finish());
2840
2841 one_column_roundtrip(array, true);
2842 }
2843
2844 #[test]
2845 fn string_single_column() {
2846 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2847 let raw_strs = raw_values.iter().map(|s| s.as_str());
2848
2849 required_and_optional::<StringArray, _>(raw_strs);
2850 }
2851
2852 #[test]
2853 fn large_string_single_column() {
2854 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2855 let raw_strs = raw_values.iter().map(|s| s.as_str());
2856
2857 required_and_optional::<LargeStringArray, _>(raw_strs);
2858 }
2859
2860 #[test]
2861 fn string_view_single_column() {
2862 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2863 let raw_strs = raw_values.iter().map(|s| s.as_str());
2864
2865 required_and_optional::<StringViewArray, _>(raw_strs);
2866 }
2867
2868 #[test]
2869 fn null_list_single_column() {
2870 let null_field = Field::new_list_field(DataType::Null, true);
2871 let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
2872
2873 let schema = Schema::new(vec![list_field]);
2874
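        // Build a List<Null> array with three entries: [], null, and [null, null].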
2875 let a_values = NullArray::new(2);
2877 let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
2878 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2879 DataType::Null,
2880 true,
2881 ))))
2882 .len(3)
2883 .add_buffer(a_value_offsets)
2884 .null_bit_buffer(Some(Buffer::from([0b00000101])))
2885 .add_child_data(a_values.into_data())
2886 .build()
2887 .unwrap();
2888
2889 let a = ListArray::from(a_list_data);
2890
2891 assert!(a.is_valid(0));
2892 assert!(!a.is_valid(1));
2893 assert!(a.is_valid(2));
2894
2895 assert_eq!(a.value(0).len(), 0);
2896 assert_eq!(a.value(2).len(), 2);
2897 assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
2898
2899 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2900 roundtrip(batch, None);
2901 }
2902
2903 #[test]
2904 fn list_single_column() {
2905 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2906 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2907 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2908 DataType::Int32,
2909 false,
2910 ))))
2911 .len(5)
2912 .add_buffer(a_value_offsets)
2913 .null_bit_buffer(Some(Buffer::from([0b00011011])))
2914 .add_child_data(a_values.into_data())
2915 .build()
2916 .unwrap();
2917
2918 assert_eq!(a_list_data.null_count(), 1);
2919
2920 let a = ListArray::from(a_list_data);
2921 let values = Arc::new(a);
2922
2923 one_column_roundtrip(values, true);
2924 }
2925
2926 #[test]
2927 fn large_list_single_column() {
2928 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2929 let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
2930 let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
2931 "large_item",
2932 DataType::Int32,
2933 true,
2934 ))))
2935 .len(5)
2936 .add_buffer(a_value_offsets)
2937 .add_child_data(a_values.into_data())
2938 .null_bit_buffer(Some(Buffer::from([0b00011011])))
2939 .build()
2940 .unwrap();
2941
2942 assert_eq!(a_list_data.null_count(), 1);
2944
2945 let a = LargeListArray::from(a_list_data);
2946 let values = Arc::new(a);
2947
2948 one_column_roundtrip(values, true);
2949 }
2950
2951 #[test]
2952 fn list_nested_nulls() {
2953 use arrow::datatypes::Int32Type;
2954 let data = vec![
2955 Some(vec![Some(1)]),
2956 Some(vec![Some(2), Some(3)]),
2957 None,
2958 Some(vec![Some(4), Some(5), None]),
2959 Some(vec![None]),
2960 Some(vec![Some(6), Some(7)]),
2961 ];
2962
2963 let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
2964 one_column_roundtrip(Arc::new(list), true);
2965
2966 let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
2967 one_column_roundtrip(Arc::new(list), true);
2968 }
2969
2970 #[test]
2971 fn struct_single_column() {
2972 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2973 let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
2974 let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
2975
2976 let values = Arc::new(s);
2977 one_column_roundtrip(values, false);
2978 }
2979
2980 #[test]
2981 fn list_and_map_coerced_names() {
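        // With coerce_types enabled the writer renames list items to "element" and map
        // entries to "key_value"/"key"/"value", regardless of the Arrow field names.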
2982 let list_field =
2984 Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
2985 let map_field = Field::new_map(
2986 "my_map",
2987 "entries",
2988 Field::new("keys", DataType::Int32, false),
2989 Field::new("values", DataType::Int32, true),
2990 false,
2991 true,
2992 );
2993
2994 let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
2995 let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
2996
2997 let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
2998
2999 let props = Some(WriterProperties::builder().set_coerce_types(true).build());
3001 let file = tempfile::tempfile().unwrap();
3002 let mut writer =
3003 ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
3004
3005 let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
3006 writer.write(&batch).unwrap();
3007 let file_metadata = writer.close().unwrap();
3008
3009 assert_eq!(file_metadata.schema[3].name, "element");
3011 assert_eq!(file_metadata.schema[5].name, "key_value");
3013 assert_eq!(file_metadata.schema[6].name, "key");
3015 assert_eq!(file_metadata.schema[7].name, "value");
3017
3018 let reader = SerializedFileReader::new(file).unwrap();
3020 let file_schema = reader.metadata().file_metadata().schema();
3021 let fields = file_schema.get_fields();
3022 let list_field = &fields[0].get_fields()[0];
3023 assert_eq!(list_field.get_fields()[0].name(), "element");
3024 let map_field = &fields[1].get_fields()[0];
3025 assert_eq!(map_field.name(), "key_value");
3026 assert_eq!(map_field.get_fields()[0].name(), "key");
3027 assert_eq!(map_field.get_fields()[1].name(), "value");
3028 }
3029
3030 #[test]
3031 fn fallback_flush_data_page() {
3032 let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
3034 let values = Arc::new(StringArray::from(raw_values));
3035 let encodings = vec![
3036 Encoding::DELTA_BYTE_ARRAY,
3037 Encoding::DELTA_LENGTH_BYTE_ARRAY,
3038 ];
3039 let data_type = values.data_type().clone();
3040 let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
3041 let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3042
3043 let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
3044 let data_page_size_limit: usize = 32;
3045 let write_batch_size: usize = 16;
3046
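        // A 32 byte page limit with 16 row write batches forces each encoder to flush
        // several data pages per column chunk.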
3047 for encoding in &encodings {
3048 for row_group_size in row_group_sizes {
3049 let props = WriterProperties::builder()
3050 .set_writer_version(WriterVersion::PARQUET_2_0)
3051 .set_max_row_group_size(row_group_size)
3052 .set_dictionary_enabled(false)
3053 .set_encoding(*encoding)
3054 .set_data_page_size_limit(data_page_size_limit)
3055 .set_write_batch_size(write_batch_size)
3056 .build();
3057
3058 roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
3059 let string_array_a = StringArray::from(a.clone());
3060 let string_array_b = StringArray::from(b.clone());
3061 let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
3062 let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
3063 assert_eq!(
3064 vec_a, vec_b,
3065 "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
3066 );
3067 });
3068 }
3069 }
3070 }
3071
3072 #[test]
3073 fn arrow_writer_string_dictionary() {
3074 #[allow(deprecated)]
3076 let schema = Arc::new(Schema::new(vec![Field::new_dict(
3077 "dictionary",
3078 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3079 true,
3080 42,
3081 true,
3082 )]));
3083
3084 let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3086 .iter()
3087 .copied()
3088 .collect();
3089
3090 one_column_roundtrip_with_schema(Arc::new(d), schema);
3092 }
3093
3094 #[test]
3095 fn arrow_writer_dict_and_native_compatibility() {
3096 let schema = Arc::new(Schema::new(vec![Field::new(
3097 "a",
3098 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3099 false,
3100 )]));
3101
3102 let rb1 = RecordBatch::try_new(
3103 schema.clone(),
3104 vec![Arc::new(DictionaryArray::new(
3105 UInt8Array::from_iter_values(vec![0, 1, 0]),
3106 Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3107 ))],
3108 )
3109 .unwrap();
3110
3111 let file = tempfile().unwrap();
3112 let mut writer =
3113 ArrowWriter::try_new(file.try_clone().unwrap(), rb1.schema(), None).unwrap();
3114 writer.write(&rb1).unwrap();
3115
3116 let schema2 = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
3119 let rb2 = RecordBatch::try_new(
3120 schema2,
3121 vec![Arc::new(StringArray::from_iter_values(vec![
3122 "barquet", "curious",
3123 ]))],
3124 )
3125 .unwrap();
3126 writer.write(&rb2).unwrap();
3127
3128 writer.close().unwrap();
3129
3130 let mut record_batch_reader =
3131 ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
3132 let actual_batch = record_batch_reader.next().unwrap().unwrap();
3133
3134 let expected_batch = RecordBatch::try_new(
3135 schema,
3136 vec![Arc::new(DictionaryArray::new(
3137 UInt8Array::from_iter_values(vec![0, 1, 0, 1, 2]),
3138 Arc::new(StringArray::from_iter_values(vec![
3139 "parquet", "barquet", "curious",
3140 ])),
3141 ))],
3142 )
3143 .unwrap();
3144
3145 assert_eq!(actual_batch, expected_batch)
3146 }
3147
3148 #[test]
3149 fn arrow_writer_native_and_dict_compatibility() {
3150 let schema1 = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
3151 let rb1 = RecordBatch::try_new(
3152 schema1.clone(),
3153 vec![Arc::new(StringArray::from_iter_values(vec![
3154 "parquet", "barquet",
3155 ]))],
3156 )
3157 .unwrap();
3158
3159 let file = tempfile().unwrap();
3160 let mut writer =
3161 ArrowWriter::try_new(file.try_clone().unwrap(), rb1.schema(), None).unwrap();
3162 writer.write(&rb1).unwrap();
3163
3164 let schema2 = Arc::new(Schema::new(vec![Field::new(
3165 "a",
3166 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3167 false,
3168 )]));
3169
3170 let rb2 = RecordBatch::try_new(
3171 schema2.clone(),
3172 vec![Arc::new(DictionaryArray::new(
3173 UInt8Array::from_iter_values(vec![0, 1, 0]),
3174 Arc::new(StringArray::from_iter_values(vec!["barquet", "curious"])),
3175 ))],
3176 )
3177 .unwrap();
3178 writer.write(&rb2).unwrap();
3179
3180 writer.close().unwrap();
3181
3182 let mut record_batch_reader =
3183 ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
3184 let actual_batch = record_batch_reader.next().unwrap().unwrap();
3185
3186 let expected_batch = RecordBatch::try_new(
3187 schema1,
3188 vec![Arc::new(StringArray::from_iter_values(vec![
3189 "parquet", "barquet", "barquet", "curious", "barquet",
3190 ]))],
3191 )
3192 .unwrap();
3193
3194 assert_eq!(actual_batch, expected_batch)
3195 }
3196
3197 #[test]
3198 fn arrow_writer_primitive_dictionary() {
3199 #[allow(deprecated)]
3201 let schema = Arc::new(Schema::new(vec![Field::new_dict(
3202 "dictionary",
3203 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
3204 true,
3205 42,
3206 true,
3207 )]));
3208
3209 let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
3211 builder.append(12345678).unwrap();
3212 builder.append_null();
3213 builder.append(22345678).unwrap();
3214 builder.append(12345678).unwrap();
3215 let d = builder.finish();
3216
3217 one_column_roundtrip_with_schema(Arc::new(d), schema);
3218 }
3219
3220 #[test]
3221 fn arrow_writer_decimal32_dictionary() {
3222 let integers = vec![12345, 56789, 34567];
3223
3224 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3225
3226 let values = Decimal32Array::from(integers.clone())
3227 .with_precision_and_scale(5, 2)
3228 .unwrap();
3229
3230 let array = DictionaryArray::new(keys, Arc::new(values));
3231 one_column_roundtrip(Arc::new(array.clone()), true);
3232
3233 let values = Decimal32Array::from(integers)
3234 .with_precision_and_scale(9, 2)
3235 .unwrap();
3236
3237 let array = array.with_values(Arc::new(values));
3238 one_column_roundtrip(Arc::new(array), true);
3239 }
3240
3241 #[test]
3242 fn arrow_writer_decimal64_dictionary() {
3243 let integers = vec![12345, 56789, 34567];
3244
3245 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3246
3247 let values = Decimal64Array::from(integers.clone())
3248 .with_precision_and_scale(5, 2)
3249 .unwrap();
3250
3251 let array = DictionaryArray::new(keys, Arc::new(values));
3252 one_column_roundtrip(Arc::new(array.clone()), true);
3253
3254 let values = Decimal64Array::from(integers)
3255 .with_precision_and_scale(12, 2)
3256 .unwrap();
3257
3258 let array = array.with_values(Arc::new(values));
3259 one_column_roundtrip(Arc::new(array), true);
3260 }
3261
3262 #[test]
3263 fn arrow_writer_decimal128_dictionary() {
3264 let integers = vec![12345, 56789, 34567];
3265
3266 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3267
3268 let values = Decimal128Array::from(integers.clone())
3269 .with_precision_and_scale(5, 2)
3270 .unwrap();
3271
3272 let array = DictionaryArray::new(keys, Arc::new(values));
3273 one_column_roundtrip(Arc::new(array.clone()), true);
3274
3275 let values = Decimal128Array::from(integers)
3276 .with_precision_and_scale(12, 2)
3277 .unwrap();
3278
3279 let array = array.with_values(Arc::new(values));
3280 one_column_roundtrip(Arc::new(array), true);
3281 }
3282
3283 #[test]
3284 fn arrow_writer_decimal256_dictionary() {
3285 let integers = vec![
3286 i256::from_i128(12345),
3287 i256::from_i128(56789),
3288 i256::from_i128(34567),
3289 ];
3290
3291 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3292
3293 let values = Decimal256Array::from(integers.clone())
3294 .with_precision_and_scale(5, 2)
3295 .unwrap();
3296
3297 let array = DictionaryArray::new(keys, Arc::new(values));
3298 one_column_roundtrip(Arc::new(array.clone()), true);
3299
3300 let values = Decimal256Array::from(integers)
3301 .with_precision_and_scale(12, 2)
3302 .unwrap();
3303
3304 let array = array.with_values(Arc::new(values));
3305 one_column_roundtrip(Arc::new(array), true);
3306 }
3307
3308 #[test]
3309 fn arrow_writer_string_dictionary_unsigned_index() {
3310 #[allow(deprecated)]
3312 let schema = Arc::new(Schema::new(vec![Field::new_dict(
3313 "dictionary",
3314 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3315 true,
3316 42,
3317 true,
3318 )]));
3319
3320 let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3322 .iter()
3323 .copied()
3324 .collect();
3325
3326 one_column_roundtrip_with_schema(Arc::new(d), schema);
3327 }
3328
3329 #[test]
3330 fn u32_min_max() {
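        // Values straddling the i32::MAX boundary; statistics are physically stored as
        // Int32, so they are reinterpreted back to u32 before comparison.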
3331 let src = [
3333 u32::MIN,
3334 u32::MIN + 1,
3335 (i32::MAX as u32) - 1,
3336 i32::MAX as u32,
3337 (i32::MAX as u32) + 1,
3338 u32::MAX - 1,
3339 u32::MAX,
3340 ];
3341 let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
3342 let files = one_column_roundtrip(values, false);
3343
3344 for file in files {
3345 let reader = SerializedFileReader::new(file).unwrap();
3347 let metadata = reader.metadata();
3348
3349 let mut row_offset = 0;
3350 for row_group in metadata.row_groups() {
3351 assert_eq!(row_group.num_columns(), 1);
3352 let column = row_group.column(0);
3353
3354 let num_values = column.num_values() as usize;
3355 let src_slice = &src[row_offset..row_offset + num_values];
3356 row_offset += column.num_values() as usize;
3357
3358 let stats = column.statistics().unwrap();
3359 if let Statistics::Int32(stats) = stats {
3360 assert_eq!(
3361 *stats.min_opt().unwrap() as u32,
3362 *src_slice.iter().min().unwrap()
3363 );
3364 assert_eq!(
3365 *stats.max_opt().unwrap() as u32,
3366 *src_slice.iter().max().unwrap()
3367 );
3368 } else {
3369 panic!("Statistics::Int32 missing")
3370 }
3371 }
3372 }
3373 }
3374
3375 #[test]
3376 fn u64_min_max() {
3377 let src = [
3379 u64::MIN,
3380 u64::MIN + 1,
3381 (i64::MAX as u64) - 1,
3382 i64::MAX as u64,
3383 (i64::MAX as u64) + 1,
3384 u64::MAX - 1,
3385 u64::MAX,
3386 ];
3387 let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
3388 let files = one_column_roundtrip(values, false);
3389
3390 for file in files {
3391 let reader = SerializedFileReader::new(file).unwrap();
3393 let metadata = reader.metadata();
3394
3395 let mut row_offset = 0;
3396 for row_group in metadata.row_groups() {
3397 assert_eq!(row_group.num_columns(), 1);
3398 let column = row_group.column(0);
3399
3400 let num_values = column.num_values() as usize;
3401 let src_slice = &src[row_offset..row_offset + num_values];
3402 row_offset += column.num_values() as usize;
3403
3404 let stats = column.statistics().unwrap();
3405 if let Statistics::Int64(stats) = stats {
3406 assert_eq!(
3407 *stats.min_opt().unwrap() as u64,
3408 *src_slice.iter().min().unwrap()
3409 );
3410 assert_eq!(
3411 *stats.max_opt().unwrap() as u64,
3412 *src_slice.iter().max().unwrap()
3413 );
3414 } else {
3415 panic!("Statistics::Int64 missing")
3416 }
3417 }
3418 }
3419 }
3420
3421 #[test]
3422 fn statistics_null_counts_only_nulls() {
3423 let values = Arc::new(UInt64Array::from(vec![None, None]));
3425 let files = one_column_roundtrip(values, true);
3426
3427 for file in files {
3428 let reader = SerializedFileReader::new(file).unwrap();
3430 let metadata = reader.metadata();
3431 assert_eq!(metadata.num_row_groups(), 1);
3432 let row_group = metadata.row_group(0);
3433 assert_eq!(row_group.num_columns(), 1);
3434 let column = row_group.column(0);
3435 let stats = column.statistics().unwrap();
3436 assert_eq!(stats.null_count_opt(), Some(2));
3437 }
3438 }
3439
3440 #[test]
3441 fn test_list_of_struct_roundtrip() {
3442 let int_field = Field::new("a", DataType::Int32, true);
3444 let int_field2 = Field::new("b", DataType::Int32, true);
3445
3446 let int_builder = Int32Builder::with_capacity(10);
3447 let int_builder2 = Int32Builder::with_capacity(10);
3448
3449 let struct_builder = StructBuilder::new(
3450 vec![int_field, int_field2],
3451 vec![Box::new(int_builder), Box::new(int_builder2)],
3452 );
3453 let mut list_builder = ListBuilder::new(struct_builder);
3454
3455 let values = list_builder.values();
3460 values
3461 .field_builder::<Int32Builder>(0)
3462 .unwrap()
3463 .append_value(1);
3464 values
3465 .field_builder::<Int32Builder>(1)
3466 .unwrap()
3467 .append_value(2);
3468 values.append(true);
3469 list_builder.append(true);
3470
3471 list_builder.append(true);
3473
3474 list_builder.append(false);
3476
3477 let values = list_builder.values();
3479 values
3480 .field_builder::<Int32Builder>(0)
3481 .unwrap()
3482 .append_null();
3483 values
3484 .field_builder::<Int32Builder>(1)
3485 .unwrap()
3486 .append_null();
3487 values.append(false);
3488 values
3489 .field_builder::<Int32Builder>(0)
3490 .unwrap()
3491 .append_null();
3492 values
3493 .field_builder::<Int32Builder>(1)
3494 .unwrap()
3495 .append_null();
3496 values.append(false);
3497 list_builder.append(true);
3498
3499 let values = list_builder.values();
3501 values
3502 .field_builder::<Int32Builder>(0)
3503 .unwrap()
3504 .append_null();
3505 values
3506 .field_builder::<Int32Builder>(1)
3507 .unwrap()
3508 .append_value(3);
3509 values.append(true);
3510 list_builder.append(true);
3511
3512 let values = list_builder.values();
3514 values
3515 .field_builder::<Int32Builder>(0)
3516 .unwrap()
3517 .append_value(2);
3518 values
3519 .field_builder::<Int32Builder>(1)
3520 .unwrap()
3521 .append_null();
3522 values.append(true);
3523 list_builder.append(true);
3524
3525 let array = Arc::new(list_builder.finish());
3526
3527 one_column_roundtrip(array, true);
3528 }
3529
3530 fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
3531 metadata.row_groups().iter().map(|x| x.num_rows()).collect()
3532 }
3533
3534 #[test]
3535 fn test_aggregates_records() {
3536 let arrays = [
3537 Int32Array::from((0..100).collect::<Vec<_>>()),
3538 Int32Array::from((0..50).collect::<Vec<_>>()),
3539 Int32Array::from((200..500).collect::<Vec<_>>()),
3540 ];
3541
3542 let schema = Arc::new(Schema::new(vec![Field::new(
3543 "int",
3544 ArrowDataType::Int32,
3545 false,
3546 )]));
3547
3548 let file = tempfile::tempfile().unwrap();
3549
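        // Cap row groups at 200 rows: the 450 rows written across three batches should be
        // split into row groups of 200, 200 and 50.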
3550 let props = WriterProperties::builder()
3551 .set_max_row_group_size(200)
3552 .build();
3553
3554 let mut writer =
3555 ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
3556
3557 for array in arrays {
3558 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
3559 writer.write(&batch).unwrap();
3560 }
3561
3562 writer.close().unwrap();
3563
3564 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3565 assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
3566
3567 let batches = builder
3568 .with_batch_size(100)
3569 .build()
3570 .unwrap()
3571 .collect::<ArrowResult<Vec<_>>>()
3572 .unwrap();
3573
3574 assert_eq!(batches.len(), 5);
3575 assert!(batches.iter().all(|x| x.num_columns() == 1));
3576
3577 let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3578
3579 assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
3580
3581 let values: Vec<_> = batches
3582 .iter()
3583 .flat_map(|x| {
3584 x.column(0)
3585 .as_any()
3586 .downcast_ref::<Int32Array>()
3587 .unwrap()
3588 .values()
3589 .iter()
3590 .cloned()
3591 })
3592 .collect();
3593
3594 let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
3595 assert_eq!(&values, &expected_values)
3596 }
3597
3598 #[test]
3599 fn complex_aggregate() {
3600 let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
3602 let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
3603 let struct_a = Arc::new(Field::new(
3604 "struct_a",
3605 DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
3606 true,
3607 ));
3608
3609 let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
3610 let struct_b = Arc::new(Field::new(
3611 "struct_b",
3612 DataType::Struct(vec![list_a.clone()].into()),
3613 false,
3614 ));
3615
3616 let schema = Arc::new(Schema::new(vec![struct_b]));
3617
3618 let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
3620 let field_b_array =
3621 Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
3622
3623 let struct_a_array = StructArray::from(vec![
3624 (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
3625 (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
3626 ]);
3627
3628 let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3629 .len(5)
3630 .add_buffer(Buffer::from_iter(vec![
3631 0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
3632 ]))
3633 .null_bit_buffer(Some(Buffer::from_iter(vec![
3634 true, false, true, false, true,
3635 ])))
3636 .child_data(vec![struct_a_array.into_data()])
3637 .build()
3638 .unwrap();
3639
3640 let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3641 let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
3642
3643 let batch1 =
3644 RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3645 .unwrap();
3646
3647 let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
3648 let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
3649
3650 let struct_a_array = StructArray::from(vec![
3651 (field_a, Arc::new(field_a_array) as ArrayRef),
3652 (field_b, Arc::new(field_b_array) as ArrayRef),
3653 ]);
3654
3655 let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3656 .len(2)
3657 .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
3658 .child_data(vec![struct_a_array.into_data()])
3659 .build()
3660 .unwrap();
3661
3662 let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3663 let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
3664
3665 let batch2 =
3666 RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3667 .unwrap();
3668
3669 let batches = &[batch1, batch2];
3670
3671 let expected = r#"
3674 +-------------------------------------------------------------------------------------------------------+
3675 | struct_b |
3676 +-------------------------------------------------------------------------------------------------------+
3677 | {list: [{leaf_a: 1, leaf_b: 1}]} |
3678 | {list: } |
3679 | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]} |
3680 | {list: } |
3681 | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]} |
3682 | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
3683 | {list: [{leaf_a: 10, leaf_b: }]} |
3684 +-------------------------------------------------------------------------------------------------------+
3685 "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
3686
3687 let actual = pretty_format_batches(batches).unwrap().to_string();
3688 assert_eq!(actual, expected);
3689
3690 let file = tempfile::tempfile().unwrap();
3692 let props = WriterProperties::builder()
3693 .set_max_row_group_size(6)
3694 .build();
3695
3696 let mut writer =
3697 ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
3698
3699 for batch in batches {
3700 writer.write(batch).unwrap();
3701 }
3702 writer.close().unwrap();
3703
3704 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
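        // batch1 has 5 rows and batch2 has 2; with a 6 row limit they are flushed as row
        // groups of 6 and 1 rows.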
3709 assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);
3710
3711 let batches = builder
3712 .with_batch_size(2)
3713 .build()
3714 .unwrap()
3715 .collect::<ArrowResult<Vec<_>>>()
3716 .unwrap();
3717
3718 assert_eq!(batches.len(), 4);
3719 let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3720 assert_eq!(&batch_counts, &[2, 2, 2, 1]);
3721
3722 let actual = pretty_format_batches(&batches).unwrap().to_string();
3723 assert_eq!(actual, expected);
3724 }
3725
3726 #[test]
3727 fn test_arrow_writer_metadata() {
3728 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3729 let file_schema = batch_schema.clone().with_metadata(
3730 vec![("foo".to_string(), "bar".to_string())]
3731 .into_iter()
3732 .collect(),
3733 );
3734
3735 let batch = RecordBatch::try_new(
3736 Arc::new(batch_schema),
3737 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3738 )
3739 .unwrap();
3740
3741 let mut buf = Vec::with_capacity(1024);
3742 let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
3743 writer.write(&batch).unwrap();
3744 writer.close().unwrap();
3745 }
3746
3747 #[test]
3748 fn test_arrow_writer_nullable() {
3749 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3750 let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
3751 let file_schema = Arc::new(file_schema);
3752
3753 let batch = RecordBatch::try_new(
3754 Arc::new(batch_schema),
3755 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3756 )
3757 .unwrap();
3758
3759 let mut buf = Vec::with_capacity(1024);
3760 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3761 writer.write(&batch).unwrap();
3762 writer.close().unwrap();
3763
3764 let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
3765 let back = read.next().unwrap().unwrap();
3766 assert_eq!(back.schema(), file_schema);
3767 assert_ne!(back.schema(), batch.schema());
3768 assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
3769 }
3770
3771 #[test]
3772 fn in_progress_accounting() {
3773 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3775
3776 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3778
3779 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3781
3782 let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
3783
3784 assert_eq!(writer.in_progress_size(), 0);
3786 assert_eq!(writer.in_progress_rows(), 0);
3787 assert_eq!(writer.memory_size(), 0);
3788        assert_eq!(writer.bytes_written(), 4); // just the 4-byte PAR1 magic so far
3789        writer.write(&batch).unwrap();
3790
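        // After the first write the buffered row count and size estimates must be non-zero.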
3791 let initial_size = writer.in_progress_size();
3793 assert!(initial_size > 0);
3794 assert_eq!(writer.in_progress_rows(), 5);
3795 let initial_memory = writer.memory_size();
3796 assert!(initial_memory > 0);
3797 assert!(
3799 initial_size <= initial_memory,
3800 "{initial_size} <= {initial_memory}"
3801 );
3802
3803 writer.write(&batch).unwrap();
3805 assert!(writer.in_progress_size() > initial_size);
3806 assert_eq!(writer.in_progress_rows(), 10);
3807 assert!(writer.memory_size() > initial_memory);
3808 assert!(
3809 writer.in_progress_size() <= writer.memory_size(),
3810 "in_progress_size {} <= memory_size {}",
3811 writer.in_progress_size(),
3812 writer.memory_size()
3813 );
3814
3815 let pre_flush_bytes_written = writer.bytes_written();
3817 writer.flush().unwrap();
3818 assert_eq!(writer.in_progress_size(), 0);
3819 assert_eq!(writer.memory_size(), 0);
3820 assert!(writer.bytes_written() > pre_flush_bytes_written);
3821
3822 writer.close().unwrap();
3823 }
3824
3825 #[test]
3826 fn test_writer_all_null() {
3827 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3828 let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
3829 let batch = RecordBatch::try_from_iter(vec![
3830 ("a", Arc::new(a) as ArrayRef),
3831 ("b", Arc::new(b) as ArrayRef),
3832 ])
3833 .unwrap();
3834
3835 let mut buf = Vec::with_capacity(1024);
3836 let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
3837 writer.write(&batch).unwrap();
3838 writer.close().unwrap();
3839
3840 let bytes = Bytes::from(buf);
3841 let options = ReadOptionsBuilder::new().with_page_index().build();
3842 let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
3843 let index = reader.metadata().offset_index().unwrap();
3844
3845 assert_eq!(index.len(), 1);
3846        assert_eq!(index[0].len(), 2); // 2 columns
3847        assert_eq!(index[0][0].page_locations().len(), 1); // 1 page for column "a"
3848        assert_eq!(index[0][1].page_locations().len(), 1); // 1 page for the all-null column "b"
3849    }
3850
3851 #[test]
3852 fn test_disabled_statistics_with_page() {
3853 let file_schema = Schema::new(vec![
3854 Field::new("a", DataType::Utf8, true),
3855 Field::new("b", DataType::Utf8, true),
3856 ]);
3857 let file_schema = Arc::new(file_schema);
3858
3859 let batch = RecordBatch::try_new(
3860 file_schema.clone(),
3861 vec![
3862 Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3863 Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3864 ],
3865 )
3866 .unwrap();
3867
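        // Disable statistics globally, then re-enable page-level statistics for column "a" only.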
3868 let props = WriterProperties::builder()
3869 .set_statistics_enabled(EnabledStatistics::None)
3870 .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
3871 .build();
3872
3873 let mut buf = Vec::with_capacity(1024);
3874 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3875 writer.write(&batch).unwrap();
3876
3877 let metadata = writer.close().unwrap();
3878 assert_eq!(metadata.row_groups.len(), 1);
3879 let row_group = &metadata.row_groups[0];
3880 assert_eq!(row_group.columns.len(), 2);
3881 assert!(row_group.columns[0].offset_index_offset.is_some());
3883 assert!(row_group.columns[0].column_index_offset.is_some());
3884 assert!(row_group.columns[1].offset_index_offset.is_some());
3886 assert!(row_group.columns[1].column_index_offset.is_none());
3887
3888 let options = ReadOptionsBuilder::new().with_page_index().build();
3889 let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3890
3891 let row_group = reader.get_row_group(0).unwrap();
3892 let a_col = row_group.metadata().column(0);
3893 let b_col = row_group.metadata().column(1);
3894
3895 if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3897 let min = byte_array_stats.min_opt().unwrap();
3898 let max = byte_array_stats.max_opt().unwrap();
3899
3900 assert_eq!(min.as_bytes(), b"a");
3901 assert_eq!(max.as_bytes(), b"d");
3902 } else {
3903 panic!("expecting Statistics::ByteArray");
3904 }
3905
3906 assert!(b_col.statistics().is_none());
3908
3909 let offset_index = reader.metadata().offset_index().unwrap();
3910        assert_eq!(offset_index.len(), 1); // 1 row group
3911        assert_eq!(offset_index[0].len(), 2); // 2 columns
3912
3913        let column_index = reader.metadata().column_index().unwrap();
3914        assert_eq!(column_index.len(), 1); // 1 row group
3915        assert_eq!(column_index[0].len(), 2); // 2 columns
3916
3917        let a_idx = &column_index[0][0];
3918 assert!(matches!(a_idx, Index::BYTE_ARRAY(_)), "{a_idx:?}");
3919 let b_idx = &column_index[0][1];
3920 assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3921 }
3922
3923 #[test]
3924 fn test_disabled_statistics_with_chunk() {
3925 let file_schema = Schema::new(vec![
3926 Field::new("a", DataType::Utf8, true),
3927 Field::new("b", DataType::Utf8, true),
3928 ]);
3929 let file_schema = Arc::new(file_schema);
3930
3931 let batch = RecordBatch::try_new(
3932 file_schema.clone(),
3933 vec![
3934 Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3935 Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3936 ],
3937 )
3938 .unwrap();
3939
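        // Chunk-level statistics for column "a" only: chunk stats are written, but neither
        // column gets a column index.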
3940 let props = WriterProperties::builder()
3941 .set_statistics_enabled(EnabledStatistics::None)
3942 .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
3943 .build();
3944
3945 let mut buf = Vec::with_capacity(1024);
3946 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3947 writer.write(&batch).unwrap();
3948
3949 let metadata = writer.close().unwrap();
3950 assert_eq!(metadata.row_groups.len(), 1);
3951 let row_group = &metadata.row_groups[0];
3952 assert_eq!(row_group.columns.len(), 2);
3953 assert!(row_group.columns[0].offset_index_offset.is_some());
3955 assert!(row_group.columns[0].column_index_offset.is_none());
3956 assert!(row_group.columns[1].offset_index_offset.is_some());
3958 assert!(row_group.columns[1].column_index_offset.is_none());
3959
3960 let options = ReadOptionsBuilder::new().with_page_index().build();
3961 let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3962
3963 let row_group = reader.get_row_group(0).unwrap();
3964 let a_col = row_group.metadata().column(0);
3965 let b_col = row_group.metadata().column(1);
3966
3967 if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3969 let min = byte_array_stats.min_opt().unwrap();
3970 let max = byte_array_stats.max_opt().unwrap();
3971
3972 assert_eq!(min.as_bytes(), b"a");
3973 assert_eq!(max.as_bytes(), b"d");
3974 } else {
3975 panic!("expecting Statistics::ByteArray");
3976 }
3977
3978 assert!(b_col.statistics().is_none());
3980
3981 let column_index = reader.metadata().column_index().unwrap();
3982        assert_eq!(column_index.len(), 1); // 1 row group
3983        assert_eq!(column_index[0].len(), 2); // 2 columns
3984
3985        let a_idx = &column_index[0][0];
3986 assert!(matches!(a_idx, Index::NONE), "{a_idx:?}");
3987 let b_idx = &column_index[0][1];
3988 assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3989 }
3990
3991 #[test]
3992 fn test_arrow_writer_skip_metadata() {
3993 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3994 let file_schema = Arc::new(batch_schema.clone());
3995
3996 let batch = RecordBatch::try_new(
3997 Arc::new(batch_schema),
3998 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3999 )
4000 .unwrap();
4001 let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
4002
4003 let mut buf = Vec::with_capacity(1024);
4004 let mut writer =
4005 ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
4006 writer.write(&batch).unwrap();
4007 writer.close().unwrap();
4008
4009 let bytes = Bytes::from(buf);
4010 let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
4011 assert_eq!(file_schema, *reader_builder.schema());
4012 if let Some(key_value_metadata) = reader_builder
4013 .metadata()
4014 .file_metadata()
4015 .key_value_metadata()
4016 {
4017 assert!(!key_value_metadata
4018 .iter()
4019 .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY));
4020 }
4021 }
4022
4023 #[test]
4024 fn mismatched_schemas() {
4025 let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
4026 let file_schema = Arc::new(Schema::new(vec![Field::new(
4027 "temperature",
4028 DataType::Float64,
4029 false,
4030 )]));
4031
4032 let batch = RecordBatch::try_new(
4033 Arc::new(batch_schema),
4034 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4035 )
4036 .unwrap();
4037
4038 let mut buf = Vec::with_capacity(1024);
4039 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
4040
4041 let err = writer.write(&batch).unwrap_err().to_string();
4042 assert_eq!(
4043 err,
4044 "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
4045 );
4046 }
4047
    #[test]
    fn test_roundtrip_empty_schema() {
        let empty_batch = RecordBatch::try_new_with_options(
            Arc::new(Schema::empty()),
            vec![],
            &RecordBatchOptions::default().with_row_count(Some(0)),
        )
        .unwrap();

        let mut parquet_bytes: Vec<u8> = Vec::new();
        let mut writer =
            ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
        writer.write(&empty_batch).unwrap();
        writer.close().unwrap();

        let bytes = Bytes::from(parquet_bytes);
        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
        assert_eq!(reader.schema(), &empty_batch.schema());
        let batches: Vec<_> = reader
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();
        assert_eq!(batches.len(), 0);
    }

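    // By default, min/max statistics should not be embedded in data page headers, even when
    // `EnabledStatistics::Page` is set.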
    #[test]
    fn test_page_stats_not_written_by_default() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let schema = Schema::new(vec![string_field]);
        let raw_string_values = vec!["Blart Versenwald III"];
        let string_values = StringArray::from(raw_string_values.clone());
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let mut file = roundtrip_opts(&batch, props);

        // read the written file back into memory
        let mut buf = vec![];
        file.seek(std::io::SeekFrom::Start(0)).unwrap();
        let read = file.read_to_end(&mut buf).unwrap();
        assert!(read > 0);

        // the first page header follows the 4-byte `PAR1` magic at the start of the file
        let first_page = &buf[4..];
        let mut prot = TCompactSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;

        assert!(stats.is_none());
    }

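    // With `set_write_page_header_statistics(true)`, exact min/max values should appear in
    // the data page header.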
    #[test]
    fn test_page_stats_when_enabled() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let schema = Schema::new(vec![string_field]);
        let raw_string_values = vec!["Blart Versenwald III", "Andrew Lamb"];
        let string_values = StringArray::from(raw_string_values.clone());
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_write_page_header_statistics(true)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let mut file = roundtrip_opts(&batch, props);

        // read the written file back into memory
        let mut buf = vec![];
        file.seek(std::io::SeekFrom::Start(0)).unwrap();
        let read = file.read_to_end(&mut buf).unwrap();
        assert!(read > 0);

        // the first page header follows the 4-byte `PAR1` magic at the start of the file
        let first_page = &buf[4..];
        let mut prot = TCompactSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;

        let stats = stats.unwrap();
        assert!(stats.is_max_value_exact.unwrap());
        assert!(stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
    }

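    // With a truncation length of 2, page header statistics should be truncated and flagged
    // as inexact, with the truncated max adjusted so it remains an upper bound ("Bl" -> "Bm").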
    #[test]
    fn test_page_stats_truncation() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let binary_field = Field::new("b", DataType::Binary, false);
        let schema = Schema::new(vec![string_field, binary_field]);

        let raw_string_values = vec!["Blart Versenwald III"];
        let raw_binary_values = [b"Blart Versenwald III".to_vec()];
        let raw_binary_value_refs = raw_binary_values
            .iter()
            .map(|x| x.as_slice())
            .collect::<Vec<_>>();

        let string_values = StringArray::from(raw_string_values.clone());
        let binary_values = BinaryArray::from(raw_binary_value_refs);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(string_values), Arc::new(binary_values)],
        )
        .unwrap();

        let props = WriterProperties::builder()
            .set_statistics_truncate_length(Some(2))
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_write_page_header_statistics(true)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let mut file = roundtrip_opts(&batch, props);

        // read the written file back into memory
        let mut buf = vec![];
        file.seek(std::io::SeekFrom::Start(0)).unwrap();
        let read = file.read_to_end(&mut buf).unwrap();
        assert!(read > 0);

        // the first page header follows the 4-byte `PAR1` magic at the start of the file
        let first_page = &buf[4..];
        let mut prot = TCompactSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;
        assert!(stats.is_some());
        let stats = stats.unwrap();
        assert!(!stats.is_max_value_exact.unwrap());
        assert!(!stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());

        // the second column's page header follows the first column's page data
        let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..];
        let mut prot = TCompactSliceInputProtocol::new(second_page);
        let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;
        assert!(stats.is_some());
        let stats = stats.unwrap();
        assert!(!stats.is_max_value_exact.unwrap());
        assert!(!stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
    }

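    // Page encoding statistics written into the thrift footer should roundtrip through the
    // file reader's metadata.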
    #[test]
    fn test_page_encoding_statistics_roundtrip() {
        let batch_schema = Schema::new(vec![Field::new(
            "int32",
            arrow_schema::DataType::Int32,
            false,
        )]);

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut file: File = tempfile::tempfile().unwrap();
        let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
        writer.write(&batch).unwrap();
        let file_metadata = writer.close().unwrap();

        assert_eq!(file_metadata.row_groups.len(), 1);
        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
        let chunk_meta = file_metadata.row_groups[0].columns[0]
            .meta_data
            .as_ref()
            .expect("column metadata missing");
        assert!(chunk_meta.encoding_stats.is_some());
        let chunk_page_stats = chunk_meta.encoding_stats.as_ref().unwrap();

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rowgroup = reader.get_row_group(0).expect("row group missing");
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.page_encoding_stats().is_some());
        let file_page_stats = column.page_encoding_stats().unwrap();
        let chunk_stats: Vec<PageEncodingStats> = chunk_page_stats
            .iter()
            .map(|x| crate::file::page_encoding_stats::try_from_thrift(x).unwrap())
            .collect();
        assert_eq!(&chunk_stats, file_page_stats);
    }

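    // A per-column dictionary page size limit should override the default limit for that
    // column only.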
    #[test]
    fn test_different_dict_page_size_limit() {
        let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
        let schema = Arc::new(Schema::new(vec![
            Field::new("col0", arrow_schema::DataType::Int64, false),
            Field::new("col1", arrow_schema::DataType::Int64, false),
        ]));
        let batch =
            arrow_array::RecordBatch::try_new(schema.clone(), vec![array.clone(), array]).unwrap();

        let props = WriterProperties::builder()
            .set_dictionary_page_size_limit(1024 * 1024)
            .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
            .build();
        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
        writer.write(&batch).unwrap();
        let data = Bytes::from(writer.into_inner().unwrap());

        let mut metadata = ParquetMetaDataReader::new();
        metadata.try_parse(&data).unwrap();
        let metadata = metadata.finish().unwrap();
        let col0_meta = metadata.row_group(0).column(0);
        let col1_meta = metadata.row_group(0).column(1);

        let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
            let mut reader =
                SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
            let page = reader.get_next_page().unwrap().unwrap();
            match page {
                Page::DictionaryPage { buf, .. } => buf.len(),
                _ => panic!("expected DictionaryPage"),
            }
        };

        assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
        assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
    }
}