1use bytes::Bytes;
21use std::io::{Read, Write};
22use std::iter::Peekable;
23use std::slice::Iter;
24use std::sync::{Arc, Mutex};
25use std::vec::IntoIter;
26
27use arrow_array::cast::AsArray;
28use arrow_array::types::*;
29use arrow_array::{ArrayRef, Int32Array, RecordBatch, RecordBatchWriter};
30use arrow_schema::{
31 ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef, TimeUnit,
32};
33
34use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
35
36use crate::arrow::ArrowSchemaConverter;
37use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
38use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
39use crate::column::page_encryption::PageEncryptor;
40use crate::column::writer::encoder::ColumnValueEncoder;
41use crate::column::writer::{
42 ColumnCloseResult, ColumnWriter, GenericColumnWriter, get_column_writer,
43};
44use crate::data_type::{ByteArray, FixedLenByteArray};
45#[cfg(feature = "encryption")]
46use crate::encryption::encrypt::FileEncryptor;
47use crate::errors::{ParquetError, Result};
48use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
49use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
50use crate::file::reader::{ChunkReader, Length};
51use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
52use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
53use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
54use levels::{ArrayLevels, calculate_array_levels};
55
56mod byte_array;
57mod levels;
58
59pub struct ArrowWriter<W: Write> {
176 writer: SerializedFileWriter<W>,
178
179 in_progress: Option<ArrowRowGroupWriter>,
181
182 arrow_schema: SchemaRef,
186
187 row_group_writer_factory: ArrowRowGroupWriterFactory,
189
190 max_row_group_size: usize,
192}
193
194impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
195 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
196 let buffered_memory = self.in_progress_size();
197 f.debug_struct("ArrowWriter")
198 .field("writer", &self.writer)
199 .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
200 .field("in_progress_rows", &self.in_progress_rows())
201 .field("arrow_schema", &self.arrow_schema)
202 .field("max_row_group_size", &self.max_row_group_size)
203 .finish()
204 }
205}
206
207impl<W: Write + Send> ArrowWriter<W> {
208 pub fn try_new(
214 writer: W,
215 arrow_schema: SchemaRef,
216 props: Option<WriterProperties>,
217 ) -> Result<Self> {
218 let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
219 Self::try_new_with_options(writer, arrow_schema, options)
220 }
221
222 pub fn try_new_with_options(
228 writer: W,
229 arrow_schema: SchemaRef,
230 options: ArrowWriterOptions,
231 ) -> Result<Self> {
232 let mut props = options.properties;
233
234 let schema = if let Some(parquet_schema) = options.schema_descr {
235 parquet_schema.clone()
236 } else {
237 let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
238 if let Some(schema_root) = &options.schema_root {
239 converter = converter.schema_root(schema_root);
240 }
241
242 converter.convert(&arrow_schema)?
243 };
244
245 if !options.skip_arrow_metadata {
246 add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
248 }
249
250 let max_row_group_size = props.max_row_group_size();
251
252 let props_ptr = Arc::new(props);
253 let file_writer =
254 SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
255
256 let row_group_writer_factory =
257 ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());
258
259 Ok(Self {
260 writer: file_writer,
261 in_progress: None,
262 arrow_schema,
263 row_group_writer_factory,
264 max_row_group_size,
265 })
266 }
267
268 pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
270 self.writer.flushed_row_groups()
271 }
272
273 pub fn memory_size(&self) -> usize {
278 match &self.in_progress {
279 Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
280 None => 0,
281 }
282 }
283
284 pub fn in_progress_size(&self) -> usize {
291 match &self.in_progress {
292 Some(in_progress) => in_progress
293 .writers
294 .iter()
295 .map(|x| x.get_estimated_total_bytes())
296 .sum(),
297 None => 0,
298 }
299 }
300
301 pub fn in_progress_rows(&self) -> usize {
303 self.in_progress
304 .as_ref()
305 .map(|x| x.buffered_rows)
306 .unwrap_or_default()
307 }
308
309 pub fn bytes_written(&self) -> usize {
311 self.writer.bytes_written()
312 }
313
314 pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
322 if batch.num_rows() == 0 {
323 return Ok(());
324 }
325
326 let in_progress = match &mut self.in_progress {
327 Some(in_progress) => in_progress,
328 x => x.insert(
329 self.row_group_writer_factory
330 .create_row_group_writer(self.writer.flushed_row_groups().len())?,
331 ),
332 };
333
334 if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size {
336 let to_write = self.max_row_group_size - in_progress.buffered_rows;
337 let a = batch.slice(0, to_write);
338 let b = batch.slice(to_write, batch.num_rows() - to_write);
339 self.write(&a)?;
340 return self.write(&b);
341 }
342
343 in_progress.write(batch)?;
344
345 if in_progress.buffered_rows >= self.max_row_group_size {
346 self.flush()?
347 }
348 Ok(())
349 }
350
351 pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
356 self.writer.write_all(buf)
357 }
358
359 pub fn sync(&mut self) -> std::io::Result<()> {
361 self.writer.flush()
362 }
363
364 pub fn flush(&mut self) -> Result<()> {
369 let in_progress = match self.in_progress.take() {
370 Some(in_progress) => in_progress,
371 None => return Ok(()),
372 };
373
374 let mut row_group_writer = self.writer.next_row_group()?;
375 for chunk in in_progress.close()? {
376 chunk.append_to_row_group(&mut row_group_writer)?;
377 }
378 row_group_writer.close()?;
379 Ok(())
380 }
381
382 pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
386 self.writer.append_key_value_metadata(kv_metadata)
387 }
388
389 pub fn inner(&self) -> &W {
391 self.writer.inner()
392 }
393
394 pub fn inner_mut(&mut self) -> &mut W {
403 self.writer.inner_mut()
404 }
405
406 pub fn into_inner(mut self) -> Result<W> {
408 self.flush()?;
409 self.writer.into_inner()
410 }
411
412 pub fn finish(&mut self) -> Result<ParquetMetaData> {
418 self.flush()?;
419 self.writer.finish()
420 }
421
422 pub fn close(mut self) -> Result<ParquetMetaData> {
424 self.finish()
425 }
426
427 #[deprecated(
429 since = "56.2.0",
430 note = "Use `ArrowRowGroupWriterFactory` instead, see `ArrowColumnWriter` for an example"
431 )]
432 pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
433 self.flush()?;
434 let in_progress = self
435 .row_group_writer_factory
436 .create_row_group_writer(self.writer.flushed_row_groups().len())?;
437 Ok(in_progress.writers)
438 }
439
440 #[deprecated(
442 since = "56.2.0",
443 note = "Use `SerializedFileWriter` directly instead, see `ArrowColumnWriter` for an example"
444 )]
445 pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
446 let mut row_group_writer = self.writer.next_row_group()?;
447 for chunk in chunks {
448 chunk.append_to_row_group(&mut row_group_writer)?;
449 }
450 row_group_writer.close()?;
451 Ok(())
452 }
453
454 pub fn into_serialized_writer(
461 mut self,
462 ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
463 self.flush()?;
464 Ok((self.writer, self.row_group_writer_factory))
465 }
466}
467
468impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
469 fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
470 self.write(batch).map_err(|e| e.into())
471 }
472
473 fn close(self) -> std::result::Result<(), ArrowError> {
474 self.close()?;
475 Ok(())
476 }
477}
478
479#[derive(Debug, Clone, Default)]
483pub struct ArrowWriterOptions {
484 properties: WriterProperties,
485 skip_arrow_metadata: bool,
486 schema_root: Option<String>,
487 schema_descr: Option<SchemaDescriptor>,
488}
489
490impl ArrowWriterOptions {
491 pub fn new() -> Self {
493 Self::default()
494 }
495
496 pub fn with_properties(self, properties: WriterProperties) -> Self {
498 Self { properties, ..self }
499 }
500
501 pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
508 Self {
509 skip_arrow_metadata,
510 ..self
511 }
512 }
513
514 pub fn with_schema_root(self, schema_root: String) -> Self {
516 Self {
517 schema_root: Some(schema_root),
518 ..self
519 }
520 }
521
522 pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
528 Self {
529 schema_descr: Some(schema_descr),
530 ..self
531 }
532 }
533}
534
535#[derive(Default)]
537struct ArrowColumnChunkData {
538 length: usize,
539 data: Vec<Bytes>,
540}
541
542impl Length for ArrowColumnChunkData {
543 fn len(&self) -> u64 {
544 self.length as _
545 }
546}
547
548impl ChunkReader for ArrowColumnChunkData {
549 type T = ArrowColumnChunkReader;
550
551 fn get_read(&self, start: u64) -> Result<Self::T> {
552 assert_eq!(start, 0); Ok(ArrowColumnChunkReader(
554 self.data.clone().into_iter().peekable(),
555 ))
556 }
557
558 fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
559 unimplemented!()
560 }
561}
562
563struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
565
566impl Read for ArrowColumnChunkReader {
567 fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
568 let buffer = loop {
569 match self.0.peek_mut() {
570 Some(b) if b.is_empty() => {
571 self.0.next();
572 continue;
573 }
574 Some(b) => break b,
575 None => return Ok(0),
576 }
577 };
578
579 let len = buffer.len().min(out.len());
580 let b = buffer.split_to(len);
581 out[..len].copy_from_slice(&b);
582 Ok(len)
583 }
584}
585
586type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
591
592#[derive(Default)]
593struct ArrowPageWriter {
594 buffer: SharedColumnChunk,
595 #[cfg(feature = "encryption")]
596 page_encryptor: Option<PageEncryptor>,
597}
598
599impl ArrowPageWriter {
600 #[cfg(feature = "encryption")]
601 pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
602 self.page_encryptor = page_encryptor;
603 self
604 }
605
606 #[cfg(feature = "encryption")]
607 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
608 self.page_encryptor.as_mut()
609 }
610
611 #[cfg(not(feature = "encryption"))]
612 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
613 None
614 }
615}
616
617impl PageWriter for ArrowPageWriter {
618 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
619 let page = match self.page_encryptor_mut() {
620 Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
621 None => page,
622 };
623
624 let page_header = page.to_thrift_header()?;
625 let header = {
626 let mut header = Vec::with_capacity(1024);
627
628 match self.page_encryptor_mut() {
629 Some(page_encryptor) => {
630 page_encryptor.encrypt_page_header(&page_header, &mut header)?;
631 if page.compressed_page().is_data_page() {
632 page_encryptor.increment_page();
633 }
634 }
635 None => {
636 let mut protocol = ThriftCompactOutputProtocol::new(&mut header);
637 page_header.write_thrift(&mut protocol)?;
638 }
639 };
640
641 Bytes::from(header)
642 };
643
644 let mut buf = self.buffer.try_lock().unwrap();
645
646 let data = page.compressed_page().buffer().clone();
647 let compressed_size = data.len() + header.len();
648
649 let mut spec = PageWriteSpec::new();
650 spec.page_type = page.page_type();
651 spec.num_values = page.num_values();
652 spec.uncompressed_size = page.uncompressed_size() + header.len();
653 spec.offset = buf.length as u64;
654 spec.compressed_size = compressed_size;
655 spec.bytes_written = compressed_size as u64;
656
657 buf.length += compressed_size;
658 buf.data.push(header);
659 buf.data.push(data);
660
661 Ok(spec)
662 }
663
664 fn close(&mut self) -> Result<()> {
665 Ok(())
666 }
667}
668
669#[derive(Debug)]
671pub struct ArrowLeafColumn(ArrayLevels);
672
673pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
678 let levels = calculate_array_levels(array, field)?;
679 Ok(levels.into_iter().map(ArrowLeafColumn).collect())
680}
681
682pub struct ArrowColumnChunk {
684 data: ArrowColumnChunkData,
685 close: ColumnCloseResult,
686}
687
688impl std::fmt::Debug for ArrowColumnChunk {
689 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
690 f.debug_struct("ArrowColumnChunk")
691 .field("length", &self.data.length)
692 .finish_non_exhaustive()
693 }
694}
695
696impl ArrowColumnChunk {
697 pub fn append_to_row_group<W: Write + Send>(
699 self,
700 writer: &mut SerializedRowGroupWriter<'_, W>,
701 ) -> Result<()> {
702 writer.append_column(&self.data, self.close)
703 }
704}
705
706pub struct ArrowColumnWriter {
804 writer: ArrowColumnWriterImpl,
805 chunk: SharedColumnChunk,
806}
807
808impl std::fmt::Debug for ArrowColumnWriter {
809 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
810 f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
811 }
812}
813
814enum ArrowColumnWriterImpl {
815 ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
816 Column(ColumnWriter<'static>),
817}
818
819impl ArrowColumnWriter {
820 pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
822 match &mut self.writer {
823 ArrowColumnWriterImpl::Column(c) => {
824 let leaf = col.0.array();
825 match leaf.as_any_dictionary_opt() {
826 Some(dictionary) => {
827 let materialized =
828 arrow_select::take::take(dictionary.values(), dictionary.keys(), None)?;
829 write_leaf(c, &materialized, &col.0)?
830 }
831 None => write_leaf(c, leaf, &col.0)?,
832 };
833 }
834 ArrowColumnWriterImpl::ByteArray(c) => {
835 write_primitive(c, col.0.array().as_ref(), &col.0)?;
836 }
837 }
838 Ok(())
839 }
840
841 pub fn close(self) -> Result<ArrowColumnChunk> {
843 let close = match self.writer {
844 ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
845 ArrowColumnWriterImpl::Column(c) => c.close()?,
846 };
847 let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
848 let data = chunk.into_inner().unwrap();
849 Ok(ArrowColumnChunk { data, close })
850 }
851
852 pub fn memory_size(&self) -> usize {
863 match &self.writer {
864 ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
865 ArrowColumnWriterImpl::Column(c) => c.memory_size(),
866 }
867 }
868
869 pub fn get_estimated_total_bytes(&self) -> usize {
877 match &self.writer {
878 ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
879 ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
880 }
881 }
882}
883
884#[derive(Debug)]
891struct ArrowRowGroupWriter {
892 writers: Vec<ArrowColumnWriter>,
893 schema: SchemaRef,
894 buffered_rows: usize,
895}
896
897impl ArrowRowGroupWriter {
898 fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
899 Self {
900 writers,
901 schema: arrow.clone(),
902 buffered_rows: 0,
903 }
904 }
905
906 fn write(&mut self, batch: &RecordBatch) -> Result<()> {
907 self.buffered_rows += batch.num_rows();
908 let mut writers = self.writers.iter_mut();
909 for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
910 for leaf in compute_leaves(field.as_ref(), column)? {
911 writers.next().unwrap().write(&leaf)?
912 }
913 }
914 Ok(())
915 }
916
917 fn close(self) -> Result<Vec<ArrowColumnChunk>> {
918 self.writers
919 .into_iter()
920 .map(|writer| writer.close())
921 .collect()
922 }
923}
924
925#[derive(Debug)]
930pub struct ArrowRowGroupWriterFactory {
931 schema: SchemaDescPtr,
932 arrow_schema: SchemaRef,
933 props: WriterPropertiesPtr,
934 #[cfg(feature = "encryption")]
935 file_encryptor: Option<Arc<FileEncryptor>>,
936}
937
938impl ArrowRowGroupWriterFactory {
939 pub fn new<W: Write + Send>(
941 file_writer: &SerializedFileWriter<W>,
942 arrow_schema: SchemaRef,
943 ) -> Self {
944 let schema = Arc::clone(file_writer.schema_descr_ptr());
945 let props = Arc::clone(file_writer.properties());
946 Self {
947 schema,
948 arrow_schema,
949 props,
950 #[cfg(feature = "encryption")]
951 file_encryptor: file_writer.file_encryptor(),
952 }
953 }
954
955 fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
956 let writers = self.create_column_writers(row_group_index)?;
957 Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
958 }
959
960 pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
962 let mut writers = Vec::with_capacity(self.arrow_schema.fields.len());
963 let mut leaves = self.schema.columns().iter();
964 let column_factory = self.column_writer_factory(row_group_index);
965 for field in &self.arrow_schema.fields {
966 column_factory.get_arrow_column_writer(
967 field.data_type(),
968 &self.props,
969 &mut leaves,
970 &mut writers,
971 )?;
972 }
973 Ok(writers)
974 }
975
976 #[cfg(feature = "encryption")]
977 fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
978 ArrowColumnWriterFactory::new()
979 .with_file_encryptor(row_group_idx, self.file_encryptor.clone())
980 }
981
982 #[cfg(not(feature = "encryption"))]
983 fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory {
984 ArrowColumnWriterFactory::new()
985 }
986}
987
988#[deprecated(since = "57.0.0", note = "Use `ArrowRowGroupWriterFactory` instead")]
990pub fn get_column_writers(
991 parquet: &SchemaDescriptor,
992 props: &WriterPropertiesPtr,
993 arrow: &SchemaRef,
994) -> Result<Vec<ArrowColumnWriter>> {
995 let mut writers = Vec::with_capacity(arrow.fields.len());
996 let mut leaves = parquet.columns().iter();
997 let column_factory = ArrowColumnWriterFactory::new();
998 for field in &arrow.fields {
999 column_factory.get_arrow_column_writer(
1000 field.data_type(),
1001 props,
1002 &mut leaves,
1003 &mut writers,
1004 )?;
1005 }
1006 Ok(writers)
1007}
1008
1009struct ArrowColumnWriterFactory {
1011 #[cfg(feature = "encryption")]
1012 row_group_index: usize,
1013 #[cfg(feature = "encryption")]
1014 file_encryptor: Option<Arc<FileEncryptor>>,
1015}
1016
1017impl ArrowColumnWriterFactory {
1018 pub fn new() -> Self {
1019 Self {
1020 #[cfg(feature = "encryption")]
1021 row_group_index: 0,
1022 #[cfg(feature = "encryption")]
1023 file_encryptor: None,
1024 }
1025 }
1026
1027 #[cfg(feature = "encryption")]
1028 pub fn with_file_encryptor(
1029 mut self,
1030 row_group_index: usize,
1031 file_encryptor: Option<Arc<FileEncryptor>>,
1032 ) -> Self {
1033 self.row_group_index = row_group_index;
1034 self.file_encryptor = file_encryptor;
1035 self
1036 }
1037
1038 #[cfg(feature = "encryption")]
1039 fn create_page_writer(
1040 &self,
1041 column_descriptor: &ColumnDescPtr,
1042 column_index: usize,
1043 ) -> Result<Box<ArrowPageWriter>> {
1044 let column_path = column_descriptor.path().string();
1045 let page_encryptor = PageEncryptor::create_if_column_encrypted(
1046 &self.file_encryptor,
1047 self.row_group_index,
1048 column_index,
1049 &column_path,
1050 )?;
1051 Ok(Box::new(
1052 ArrowPageWriter::default().with_encryptor(page_encryptor),
1053 ))
1054 }
1055
1056 #[cfg(not(feature = "encryption"))]
1057 fn create_page_writer(
1058 &self,
1059 _column_descriptor: &ColumnDescPtr,
1060 _column_index: usize,
1061 ) -> Result<Box<ArrowPageWriter>> {
1062 Ok(Box::<ArrowPageWriter>::default())
1063 }
1064
1065 fn get_arrow_column_writer(
1068 &self,
1069 data_type: &ArrowDataType,
1070 props: &WriterPropertiesPtr,
1071 leaves: &mut Iter<'_, ColumnDescPtr>,
1072 out: &mut Vec<ArrowColumnWriter>,
1073 ) -> Result<()> {
1074 let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1076 let page_writer = self.create_page_writer(desc, out.len())?;
1077 let chunk = page_writer.buffer.clone();
1078 let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
1079 Ok(ArrowColumnWriter {
1080 chunk,
1081 writer: ArrowColumnWriterImpl::Column(writer),
1082 })
1083 };
1084
1085 let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1087 let page_writer = self.create_page_writer(desc, out.len())?;
1088 let chunk = page_writer.buffer.clone();
1089 let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
1090 Ok(ArrowColumnWriter {
1091 chunk,
1092 writer: ArrowColumnWriterImpl::ByteArray(writer),
1093 })
1094 };
1095
1096 match data_type {
1097 _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
1098 ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
1099 out.push(col(leaves.next().unwrap())?)
1100 }
1101 ArrowDataType::LargeBinary
1102 | ArrowDataType::Binary
1103 | ArrowDataType::Utf8
1104 | ArrowDataType::LargeUtf8
1105 | ArrowDataType::BinaryView
1106 | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
1107 ArrowDataType::List(f)
1108 | ArrowDataType::LargeList(f)
1109 | ArrowDataType::FixedSizeList(f, _) => {
1110 self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
1111 }
1112 ArrowDataType::Struct(fields) => {
1113 for field in fields {
1114 self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
1115 }
1116 }
1117 ArrowDataType::Map(f, _) => match f.data_type() {
1118 ArrowDataType::Struct(f) => {
1119 self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
1120 self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
1121 }
1122 _ => unreachable!("invalid map type"),
1123 },
1124 ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1125 ArrowDataType::Utf8
1126 | ArrowDataType::LargeUtf8
1127 | ArrowDataType::Binary
1128 | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
1129 ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
1130 out.push(bytes(leaves.next().unwrap())?)
1131 }
1132 ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
1133 _ => out.push(col(leaves.next().unwrap())?),
1134 },
1135 _ => {
1136 return Err(ParquetError::NYI(format!(
1137 "Attempting to write an Arrow type {data_type} to parquet that is not yet implemented"
1138 )));
1139 }
1140 }
1141 Ok(())
1142 }
1143}
1144
1145fn write_leaf(
1146 writer: &mut ColumnWriter<'_>,
1147 column: &dyn arrow_array::Array,
1148 levels: &ArrayLevels,
1149) -> Result<usize> {
1150 let indices = levels.non_null_indices();
1151
1152 match writer {
1153 ColumnWriter::Int32ColumnWriter(typed) => {
1155 match column.data_type() {
1156 ArrowDataType::Null => {
1157 let array = Int32Array::new_null(column.len());
1158 write_primitive(typed, array.values(), levels)
1159 }
1160 ArrowDataType::Int8 => {
1161 let array: Int32Array = column.as_primitive::<Int8Type>().unary(|x| x as i32);
1162 write_primitive(typed, array.values(), levels)
1163 }
1164 ArrowDataType::Int16 => {
1165 let array: Int32Array = column.as_primitive::<Int16Type>().unary(|x| x as i32);
1166 write_primitive(typed, array.values(), levels)
1167 }
1168 ArrowDataType::Int32 => {
1169 write_primitive(typed, column.as_primitive::<Int32Type>().values(), levels)
1170 }
1171 ArrowDataType::UInt8 => {
1172 let array: Int32Array = column.as_primitive::<UInt8Type>().unary(|x| x as i32);
1173 write_primitive(typed, array.values(), levels)
1174 }
1175 ArrowDataType::UInt16 => {
1176 let array: Int32Array = column.as_primitive::<UInt16Type>().unary(|x| x as i32);
1177 write_primitive(typed, array.values(), levels)
1178 }
1179 ArrowDataType::UInt32 => {
1180 let array = column.as_primitive::<UInt32Type>();
1183 write_primitive(typed, array.values().inner().typed_data(), levels)
1184 }
1185 ArrowDataType::Date32 => {
1186 let array = column.as_primitive::<Date32Type>();
1187 write_primitive(typed, array.values(), levels)
1188 }
1189 ArrowDataType::Time32(TimeUnit::Second) => {
1190 let array = column.as_primitive::<Time32SecondType>();
1191 write_primitive(typed, array.values(), levels)
1192 }
1193 ArrowDataType::Time32(TimeUnit::Millisecond) => {
1194 let array = column.as_primitive::<Time32MillisecondType>();
1195 write_primitive(typed, array.values(), levels)
1196 }
1197 ArrowDataType::Date64 => {
1198 let array: Int32Array = column
1200 .as_primitive::<Date64Type>()
1201 .unary(|x| (x / 86_400_000) as _);
1202
1203 write_primitive(typed, array.values(), levels)
1204 }
1205 ArrowDataType::Decimal32(_, _) => {
1206 let array = column
1207 .as_primitive::<Decimal32Type>()
1208 .unary::<_, Int32Type>(|v| v);
1209 write_primitive(typed, array.values(), levels)
1210 }
1211 ArrowDataType::Decimal64(_, _) => {
1212 let array = column
1214 .as_primitive::<Decimal64Type>()
1215 .unary::<_, Int32Type>(|v| v as i32);
1216 write_primitive(typed, array.values(), levels)
1217 }
1218 ArrowDataType::Decimal128(_, _) => {
1219 let array = column
1221 .as_primitive::<Decimal128Type>()
1222 .unary::<_, Int32Type>(|v| v as i32);
1223 write_primitive(typed, array.values(), levels)
1224 }
1225 ArrowDataType::Decimal256(_, _) => {
1226 let array = column
1228 .as_primitive::<Decimal256Type>()
1229 .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1230 write_primitive(typed, array.values(), levels)
1231 }
1232 d => Err(ParquetError::General(format!("Cannot coerce {d} to I32"))),
1233 }
1234 }
1235 ColumnWriter::BoolColumnWriter(typed) => {
1236 let array = column.as_boolean();
1237 typed.write_batch(
1238 get_bool_array_slice(array, indices).as_slice(),
1239 levels.def_levels(),
1240 levels.rep_levels(),
1241 )
1242 }
1243 ColumnWriter::Int64ColumnWriter(typed) => {
1244 match column.data_type() {
1245 ArrowDataType::Date64 => {
1246 let array = column
1247 .as_primitive::<Date64Type>()
1248 .reinterpret_cast::<Int64Type>();
1249
1250 write_primitive(typed, array.values(), levels)
1251 }
1252 ArrowDataType::Int64 => {
1253 let array = column.as_primitive::<Int64Type>();
1254 write_primitive(typed, array.values(), levels)
1255 }
1256 ArrowDataType::UInt64 => {
1257 let values = column.as_primitive::<UInt64Type>().values();
1258 let array = values.inner().typed_data::<i64>();
1261 write_primitive(typed, array, levels)
1262 }
1263 ArrowDataType::Time64(TimeUnit::Microsecond) => {
1264 let array = column.as_primitive::<Time64MicrosecondType>();
1265 write_primitive(typed, array.values(), levels)
1266 }
1267 ArrowDataType::Time64(TimeUnit::Nanosecond) => {
1268 let array = column.as_primitive::<Time64NanosecondType>();
1269 write_primitive(typed, array.values(), levels)
1270 }
1271 ArrowDataType::Timestamp(unit, _) => match unit {
1272 TimeUnit::Second => {
1273 let array = column.as_primitive::<TimestampSecondType>();
1274 write_primitive(typed, array.values(), levels)
1275 }
1276 TimeUnit::Millisecond => {
1277 let array = column.as_primitive::<TimestampMillisecondType>();
1278 write_primitive(typed, array.values(), levels)
1279 }
1280 TimeUnit::Microsecond => {
1281 let array = column.as_primitive::<TimestampMicrosecondType>();
1282 write_primitive(typed, array.values(), levels)
1283 }
1284 TimeUnit::Nanosecond => {
1285 let array = column.as_primitive::<TimestampNanosecondType>();
1286 write_primitive(typed, array.values(), levels)
1287 }
1288 },
1289 ArrowDataType::Duration(unit) => match unit {
1290 TimeUnit::Second => {
1291 let array = column.as_primitive::<DurationSecondType>();
1292 write_primitive(typed, array.values(), levels)
1293 }
1294 TimeUnit::Millisecond => {
1295 let array = column.as_primitive::<DurationMillisecondType>();
1296 write_primitive(typed, array.values(), levels)
1297 }
1298 TimeUnit::Microsecond => {
1299 let array = column.as_primitive::<DurationMicrosecondType>();
1300 write_primitive(typed, array.values(), levels)
1301 }
1302 TimeUnit::Nanosecond => {
1303 let array = column.as_primitive::<DurationNanosecondType>();
1304 write_primitive(typed, array.values(), levels)
1305 }
1306 },
1307 ArrowDataType::Decimal64(_, _) => {
1308 let array = column
1309 .as_primitive::<Decimal64Type>()
1310 .reinterpret_cast::<Int64Type>();
1311 write_primitive(typed, array.values(), levels)
1312 }
1313 ArrowDataType::Decimal128(_, _) => {
1314 let array = column
1316 .as_primitive::<Decimal128Type>()
1317 .unary::<_, Int64Type>(|v| v as i64);
1318 write_primitive(typed, array.values(), levels)
1319 }
1320 ArrowDataType::Decimal256(_, _) => {
1321 let array = column
1323 .as_primitive::<Decimal256Type>()
1324 .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1325 write_primitive(typed, array.values(), levels)
1326 }
1327 d => Err(ParquetError::General(format!("Cannot coerce {d} to I64"))),
1328 }
1329 }
1330 ColumnWriter::Int96ColumnWriter(_typed) => {
1331 unreachable!("Currently unreachable because data type not supported")
1332 }
1333 ColumnWriter::FloatColumnWriter(typed) => {
1334 let array = column.as_primitive::<Float32Type>();
1335 write_primitive(typed, array.values(), levels)
1336 }
1337 ColumnWriter::DoubleColumnWriter(typed) => {
1338 let array = column.as_primitive::<Float64Type>();
1339 write_primitive(typed, array.values(), levels)
1340 }
1341 ColumnWriter::ByteArrayColumnWriter(_) => {
1342 unreachable!("should use ByteArrayWriter")
1343 }
1344 ColumnWriter::FixedLenByteArrayColumnWriter(typed) => {
1345 let bytes = match column.data_type() {
1346 ArrowDataType::Interval(interval_unit) => match interval_unit {
1347 IntervalUnit::YearMonth => {
1348 let array = column.as_primitive::<IntervalYearMonthType>();
1349 get_interval_ym_array_slice(array, indices)
1350 }
1351 IntervalUnit::DayTime => {
1352 let array = column.as_primitive::<IntervalDayTimeType>();
1353 get_interval_dt_array_slice(array, indices)
1354 }
1355 _ => {
1356 return Err(ParquetError::NYI(format!(
1357 "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
1358 )));
1359 }
1360 },
1361 ArrowDataType::FixedSizeBinary(_) => {
1362 let array = column.as_fixed_size_binary();
1363 get_fsb_array_slice(array, indices)
1364 }
1365 ArrowDataType::Decimal32(_, _) => {
1366 let array = column.as_primitive::<Decimal32Type>();
1367 get_decimal_32_array_slice(array, indices)
1368 }
1369 ArrowDataType::Decimal64(_, _) => {
1370 let array = column.as_primitive::<Decimal64Type>();
1371 get_decimal_64_array_slice(array, indices)
1372 }
1373 ArrowDataType::Decimal128(_, _) => {
1374 let array = column.as_primitive::<Decimal128Type>();
1375 get_decimal_128_array_slice(array, indices)
1376 }
1377 ArrowDataType::Decimal256(_, _) => {
1378 let array = column.as_primitive::<Decimal256Type>();
1379 get_decimal_256_array_slice(array, indices)
1380 }
1381 ArrowDataType::Float16 => {
1382 let array = column.as_primitive::<Float16Type>();
1383 get_float_16_array_slice(array, indices)
1384 }
1385 _ => {
1386 return Err(ParquetError::NYI(
1387 "Attempting to write an Arrow type that is not yet implemented".to_string(),
1388 ));
1389 }
1390 };
1391 typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
1392 }
1393 }
1394}
1395
1396fn write_primitive<E: ColumnValueEncoder>(
1397 writer: &mut GenericColumnWriter<E>,
1398 values: &E::Values,
1399 levels: &ArrayLevels,
1400) -> Result<usize> {
1401 writer.write_batch_internal(
1402 values,
1403 Some(levels.non_null_indices()),
1404 levels.def_levels(),
1405 levels.rep_levels(),
1406 None,
1407 None,
1408 None,
1409 )
1410}
1411
1412fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
1413 let mut values = Vec::with_capacity(indices.len());
1414 for i in indices {
1415 values.push(array.value(*i))
1416 }
1417 values
1418}
1419
1420fn get_interval_ym_array_slice(
1423 array: &arrow_array::IntervalYearMonthArray,
1424 indices: &[usize],
1425) -> Vec<FixedLenByteArray> {
1426 let mut values = Vec::with_capacity(indices.len());
1427 for i in indices {
1428 let mut value = array.value(*i).to_le_bytes().to_vec();
1429 let mut suffix = vec![0; 8];
1430 value.append(&mut suffix);
1431 values.push(FixedLenByteArray::from(ByteArray::from(value)))
1432 }
1433 values
1434}
1435
1436fn get_interval_dt_array_slice(
1439 array: &arrow_array::IntervalDayTimeArray,
1440 indices: &[usize],
1441) -> Vec<FixedLenByteArray> {
1442 let mut values = Vec::with_capacity(indices.len());
1443 for i in indices {
1444 let mut out = [0; 12];
1445 let value = array.value(*i);
1446 out[4..8].copy_from_slice(&value.days.to_le_bytes());
1447 out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1448 values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1449 }
1450 values
1451}
1452
1453fn get_decimal_32_array_slice(
1454 array: &arrow_array::Decimal32Array,
1455 indices: &[usize],
1456) -> Vec<FixedLenByteArray> {
1457 let mut values = Vec::with_capacity(indices.len());
1458 let size = decimal_length_from_precision(array.precision());
1459 for i in indices {
1460 let as_be_bytes = array.value(*i).to_be_bytes();
1461 let resized_value = as_be_bytes[(4 - size)..].to_vec();
1462 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1463 }
1464 values
1465}
1466
1467fn get_decimal_64_array_slice(
1468 array: &arrow_array::Decimal64Array,
1469 indices: &[usize],
1470) -> Vec<FixedLenByteArray> {
1471 let mut values = Vec::with_capacity(indices.len());
1472 let size = decimal_length_from_precision(array.precision());
1473 for i in indices {
1474 let as_be_bytes = array.value(*i).to_be_bytes();
1475 let resized_value = as_be_bytes[(8 - size)..].to_vec();
1476 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1477 }
1478 values
1479}
1480
1481fn get_decimal_128_array_slice(
1482 array: &arrow_array::Decimal128Array,
1483 indices: &[usize],
1484) -> Vec<FixedLenByteArray> {
1485 let mut values = Vec::with_capacity(indices.len());
1486 let size = decimal_length_from_precision(array.precision());
1487 for i in indices {
1488 let as_be_bytes = array.value(*i).to_be_bytes();
1489 let resized_value = as_be_bytes[(16 - size)..].to_vec();
1490 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1491 }
1492 values
1493}
1494
1495fn get_decimal_256_array_slice(
1496 array: &arrow_array::Decimal256Array,
1497 indices: &[usize],
1498) -> Vec<FixedLenByteArray> {
1499 let mut values = Vec::with_capacity(indices.len());
1500 let size = decimal_length_from_precision(array.precision());
1501 for i in indices {
1502 let as_be_bytes = array.value(*i).to_be_bytes();
1503 let resized_value = as_be_bytes[(32 - size)..].to_vec();
1504 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1505 }
1506 values
1507}
1508
1509fn get_float_16_array_slice(
1510 array: &arrow_array::Float16Array,
1511 indices: &[usize],
1512) -> Vec<FixedLenByteArray> {
1513 let mut values = Vec::with_capacity(indices.len());
1514 for i in indices {
1515 let value = array.value(*i).to_le_bytes().to_vec();
1516 values.push(FixedLenByteArray::from(ByteArray::from(value)));
1517 }
1518 values
1519}
1520
1521fn get_fsb_array_slice(
1522 array: &arrow_array::FixedSizeBinaryArray,
1523 indices: &[usize],
1524) -> Vec<FixedLenByteArray> {
1525 let mut values = Vec::with_capacity(indices.len());
1526 for i in indices {
1527 let value = array.value(*i).to_vec();
1528 values.push(FixedLenByteArray::from(ByteArray::from(value)))
1529 }
1530 values
1531}
1532
1533#[cfg(test)]
1534mod tests {
1535 use super::*;
1536 use std::collections::HashMap;
1537
1538 use std::fs::File;
1539
1540 use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1541 use crate::arrow::{ARROW_SCHEMA_META_KEY, PARQUET_FIELD_ID_META_KEY};
1542 use crate::column::page::{Page, PageReader};
1543 use crate::file::metadata::thrift::PageHeader;
1544 use crate::file::page_index::column_index::ColumnIndexMetaData;
1545 use crate::file::reader::SerializedPageReader;
1546 use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
1547 use crate::schema::types::ColumnPath;
1548 use arrow::datatypes::ToByteSlice;
1549 use arrow::datatypes::{DataType, Schema};
1550 use arrow::error::Result as ArrowResult;
1551 use arrow::util::data_gen::create_random_array;
1552 use arrow::util::pretty::pretty_format_batches;
1553 use arrow::{array::*, buffer::Buffer};
1554 use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer, OffsetBuffer, i256};
1555 use arrow_schema::Fields;
1556 use half::f16;
1557 use num_traits::{FromPrimitive, ToPrimitive};
1558 use tempfile::tempfile;
1559
1560 use crate::basic::Encoding;
1561 use crate::data_type::AsBytes;
1562 use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
1563 use crate::file::properties::{
1564 BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1565 };
1566 use crate::file::serialized_reader::ReadOptionsBuilder;
1567 use crate::file::{
1568 reader::{FileReader, SerializedFileReader},
1569 statistics::Statistics,
1570 };
1571
1572 #[test]
1573 fn arrow_writer() {
1574 let schema = Schema::new(vec![
1576 Field::new("a", DataType::Int32, false),
1577 Field::new("b", DataType::Int32, true),
1578 ]);
1579
1580 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1582 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1583
1584 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1586
1587 roundtrip(batch, Some(SMALL_SIZE / 2));
1588 }
1589
1590 fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1591 let mut buffer = vec![];
1592
1593 let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
1594 writer.write(expected_batch).unwrap();
1595 writer.close().unwrap();
1596
1597 buffer
1598 }
1599
1600 fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1601 let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
1602 writer.write(expected_batch).unwrap();
1603 writer.into_inner().unwrap()
1604 }
1605
1606 #[test]
1607 fn roundtrip_bytes() {
1608 let schema = Arc::new(Schema::new(vec![
1610 Field::new("a", DataType::Int32, false),
1611 Field::new("b", DataType::Int32, true),
1612 ]));
1613
1614 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1616 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1617
1618 let expected_batch =
1620 RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
1621
1622 for buffer in [
1623 get_bytes_after_close(schema.clone(), &expected_batch),
1624 get_bytes_by_into_inner(schema, &expected_batch),
1625 ] {
1626 let cursor = Bytes::from(buffer);
1627 let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
1628
1629 let actual_batch = record_batch_reader
1630 .next()
1631 .expect("No batch found")
1632 .expect("Unable to get batch");
1633
1634 assert_eq!(expected_batch.schema(), actual_batch.schema());
1635 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1636 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1637 for i in 0..expected_batch.num_columns() {
1638 let expected_data = expected_batch.column(i).to_data();
1639 let actual_data = actual_batch.column(i).to_data();
1640
1641 assert_eq!(expected_data, actual_data);
1642 }
1643 }
1644 }
1645
1646 #[test]
1647 fn arrow_writer_non_null() {
1648 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1650
1651 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1653
1654 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1656
1657 roundtrip(batch, Some(SMALL_SIZE / 2));
1658 }
1659
1660 #[test]
1661 fn arrow_writer_list() {
1662 let schema = Schema::new(vec![Field::new(
1664 "a",
1665 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1666 true,
1667 )]);
1668
1669 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1671
1672 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1675
1676 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1678 DataType::Int32,
1679 false,
1680 ))))
1681 .len(5)
1682 .add_buffer(a_value_offsets)
1683 .add_child_data(a_values.into_data())
1684 .null_bit_buffer(Some(Buffer::from([0b00011011])))
1685 .build()
1686 .unwrap();
1687 let a = ListArray::from(a_list_data);
1688
1689 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1691
1692 assert_eq!(batch.column(0).null_count(), 1);
1693
1694 roundtrip(batch, None);
1697 }
1698
1699 #[test]
1700 fn arrow_writer_list_non_null() {
1701 let schema = Schema::new(vec![Field::new(
1703 "a",
1704 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1705 false,
1706 )]);
1707
1708 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1710
1711 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1714
1715 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1717 DataType::Int32,
1718 false,
1719 ))))
1720 .len(5)
1721 .add_buffer(a_value_offsets)
1722 .add_child_data(a_values.into_data())
1723 .build()
1724 .unwrap();
1725 let a = ListArray::from(a_list_data);
1726
1727 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1729
1730 assert_eq!(batch.column(0).null_count(), 0);
1733
1734 roundtrip(batch, None);
1735 }
1736
1737 #[test]
1738 fn arrow_writer_binary() {
1739 let string_field = Field::new("a", DataType::Utf8, false);
1740 let binary_field = Field::new("b", DataType::Binary, false);
1741 let schema = Schema::new(vec![string_field, binary_field]);
1742
1743 let raw_string_values = vec!["foo", "bar", "baz", "quux"];
1744 let raw_binary_values = [
1745 b"foo".to_vec(),
1746 b"bar".to_vec(),
1747 b"baz".to_vec(),
1748 b"quux".to_vec(),
1749 ];
1750 let raw_binary_value_refs = raw_binary_values
1751 .iter()
1752 .map(|x| x.as_slice())
1753 .collect::<Vec<_>>();
1754
1755 let string_values = StringArray::from(raw_string_values.clone());
1756 let binary_values = BinaryArray::from(raw_binary_value_refs);
1757 let batch = RecordBatch::try_new(
1758 Arc::new(schema),
1759 vec![Arc::new(string_values), Arc::new(binary_values)],
1760 )
1761 .unwrap();
1762
1763 roundtrip(batch, Some(SMALL_SIZE / 2));
1764 }
1765
1766 #[test]
1767 fn arrow_writer_binary_view() {
1768 let string_field = Field::new("a", DataType::Utf8View, false);
1769 let binary_field = Field::new("b", DataType::BinaryView, false);
1770 let nullable_string_field = Field::new("a", DataType::Utf8View, true);
1771 let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
1772
1773 let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
1774 let raw_binary_values = vec![
1775 b"foo".to_vec(),
1776 b"bar".to_vec(),
1777 b"large payload over 12 bytes".to_vec(),
1778 b"lulu".to_vec(),
1779 ];
1780 let nullable_string_values =
1781 vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
1782
1783 let string_view_values = StringViewArray::from(raw_string_values);
1784 let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
1785 let nullable_string_view_values = StringViewArray::from(nullable_string_values);
1786 let batch = RecordBatch::try_new(
1787 Arc::new(schema),
1788 vec![
1789 Arc::new(string_view_values),
1790 Arc::new(binary_view_values),
1791 Arc::new(nullable_string_view_values),
1792 ],
1793 )
1794 .unwrap();
1795
1796 roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1797 roundtrip(batch, None);
1798 }
1799
1800 #[test]
1801 fn arrow_writer_binary_view_long_value() {
1802 let string_field = Field::new("a", DataType::Utf8View, false);
1803 let binary_field = Field::new("b", DataType::BinaryView, false);
1804 let schema = Schema::new(vec![string_field, binary_field]);
1805
1806 let long = "a".repeat(128);
1810 let raw_string_values = vec!["foo", long.as_str(), "bar"];
1811 let raw_binary_values = vec![b"foo".to_vec(), long.as_bytes().to_vec(), b"bar".to_vec()];
1812
1813 let string_view_values: ArrayRef = Arc::new(StringViewArray::from(raw_string_values));
1814 let binary_view_values: ArrayRef =
1815 Arc::new(BinaryViewArray::from_iter_values(raw_binary_values));
1816
1817 one_column_roundtrip(Arc::clone(&string_view_values), false);
1818 one_column_roundtrip(Arc::clone(&binary_view_values), false);
1819
1820 let batch = RecordBatch::try_new(
1821 Arc::new(schema),
1822 vec![string_view_values, binary_view_values],
1823 )
1824 .unwrap();
1825
1826 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
1828 let props = WriterProperties::builder()
1829 .set_writer_version(version)
1830 .set_dictionary_enabled(false)
1831 .build();
1832 roundtrip_opts(&batch, props);
1833 }
1834 }
1835
1836 fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
1837 let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
1838 let schema = Schema::new(vec![decimal_field]);
1839
1840 let decimal_values = vec![10_000, 50_000, 0, -100]
1841 .into_iter()
1842 .map(Some)
1843 .collect::<Decimal128Array>()
1844 .with_precision_and_scale(precision, scale)
1845 .unwrap();
1846
1847 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
1848 }
1849
1850 #[test]
1851 fn arrow_writer_decimal() {
1852 let batch_int32_decimal = get_decimal_batch(5, 2);
1854 roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
1855 let batch_int64_decimal = get_decimal_batch(12, 2);
1857 roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
1858 let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
1860 roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
1861 }
1862
1863 #[test]
1864 fn arrow_writer_complex() {
1865 let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1867 let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1868 let struct_field_g = Arc::new(Field::new_list(
1869 "g",
1870 Field::new_list_field(DataType::Int16, true),
1871 false,
1872 ));
1873 let struct_field_h = Arc::new(Field::new_list(
1874 "h",
1875 Field::new_list_field(DataType::Int16, false),
1876 true,
1877 ));
1878 let struct_field_e = Arc::new(Field::new_struct(
1879 "e",
1880 vec![
1881 struct_field_f.clone(),
1882 struct_field_g.clone(),
1883 struct_field_h.clone(),
1884 ],
1885 false,
1886 ));
1887 let schema = Schema::new(vec![
1888 Field::new("a", DataType::Int32, false),
1889 Field::new("b", DataType::Int32, true),
1890 Field::new_struct(
1891 "c",
1892 vec![struct_field_d.clone(), struct_field_e.clone()],
1893 false,
1894 ),
1895 ]);
1896
1897 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1899 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1900 let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1901 let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1902
1903 let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1904
1905 let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1908
1909 let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1911 .len(5)
1912 .add_buffer(g_value_offsets.clone())
1913 .add_child_data(g_value.to_data())
1914 .build()
1915 .unwrap();
1916 let g = ListArray::from(g_list_data);
1917 let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
1919 .len(5)
1920 .add_buffer(g_value_offsets)
1921 .add_child_data(g_value.to_data())
1922 .null_bit_buffer(Some(Buffer::from([0b00011011])))
1923 .build()
1924 .unwrap();
1925 let h = ListArray::from(h_list_data);
1926
1927 let e = StructArray::from(vec![
1928 (struct_field_f, Arc::new(f) as ArrayRef),
1929 (struct_field_g, Arc::new(g) as ArrayRef),
1930 (struct_field_h, Arc::new(h) as ArrayRef),
1931 ]);
1932
1933 let c = StructArray::from(vec![
1934 (struct_field_d, Arc::new(d) as ArrayRef),
1935 (struct_field_e, Arc::new(e) as ArrayRef),
1936 ]);
1937
1938 let batch = RecordBatch::try_new(
1940 Arc::new(schema),
1941 vec![Arc::new(a), Arc::new(b), Arc::new(c)],
1942 )
1943 .unwrap();
1944
1945 roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1946 roundtrip(batch, Some(SMALL_SIZE / 3));
1947 }
1948
1949 #[test]
1950 fn arrow_writer_complex_mixed() {
1951 let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
1956 let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
1957 let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
1958 let schema = Schema::new(vec![Field::new(
1959 "some_nested_object",
1960 DataType::Struct(Fields::from(vec![
1961 offset_field.clone(),
1962 partition_field.clone(),
1963 topic_field.clone(),
1964 ])),
1965 false,
1966 )]);
1967
1968 let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1970 let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1971 let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
1972
1973 let some_nested_object = StructArray::from(vec![
1974 (offset_field, Arc::new(offset) as ArrayRef),
1975 (partition_field, Arc::new(partition) as ArrayRef),
1976 (topic_field, Arc::new(topic) as ArrayRef),
1977 ]);
1978
1979 let batch =
1981 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1982
1983 roundtrip(batch, Some(SMALL_SIZE / 2));
1984 }
1985
1986 #[test]
1987 fn arrow_writer_map() {
1988 let json_content = r#"
1990 {"stocks":{"long": "$AAA", "short": "$BBB"}}
1991 {"stocks":{"long": null, "long": "$CCC", "short": null}}
1992 {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1993 "#;
1994 let entries_struct_type = DataType::Struct(Fields::from(vec![
1995 Field::new("key", DataType::Utf8, false),
1996 Field::new("value", DataType::Utf8, true),
1997 ]));
1998 let stocks_field = Field::new(
1999 "stocks",
2000 DataType::Map(
2001 Arc::new(Field::new("entries", entries_struct_type, false)),
2002 false,
2003 ),
2004 true,
2005 );
2006 let schema = Arc::new(Schema::new(vec![stocks_field]));
2007 let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
2008 let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
2009
2010 let batch = reader.next().unwrap().unwrap();
2011 roundtrip(batch, None);
2012 }
2013
2014 #[test]
2015 fn arrow_writer_2_level_struct() {
2016 let field_c = Field::new("c", DataType::Int32, true);
2018 let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
2019 let type_a = DataType::Struct(vec![field_b.clone()].into());
2020 let field_a = Field::new("a", type_a, true);
2021 let schema = Schema::new(vec![field_a.clone()]);
2022
2023 let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
2025 let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2026 .len(6)
2027 .null_bit_buffer(Some(Buffer::from([0b00100111])))
2028 .add_child_data(c.into_data())
2029 .build()
2030 .unwrap();
2031 let b = StructArray::from(b_data);
2032 let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2033 .len(6)
2034 .null_bit_buffer(Some(Buffer::from([0b00101111])))
2035 .add_child_data(b.into_data())
2036 .build()
2037 .unwrap();
2038 let a = StructArray::from(a_data);
2039
2040 assert_eq!(a.null_count(), 1);
2041 assert_eq!(a.column(0).null_count(), 2);
2042
2043 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2045
2046 roundtrip(batch, Some(SMALL_SIZE / 2));
2047 }
2048
2049 #[test]
2050 fn arrow_writer_2_level_struct_non_null() {
2051 let field_c = Field::new("c", DataType::Int32, false);
2053 let type_b = DataType::Struct(vec![field_c].into());
2054 let field_b = Field::new("b", type_b.clone(), false);
2055 let type_a = DataType::Struct(vec![field_b].into());
2056 let field_a = Field::new("a", type_a.clone(), false);
2057 let schema = Schema::new(vec![field_a]);
2058
2059 let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2061 let b_data = ArrayDataBuilder::new(type_b)
2062 .len(6)
2063 .add_child_data(c.into_data())
2064 .build()
2065 .unwrap();
2066 let b = StructArray::from(b_data);
2067 let a_data = ArrayDataBuilder::new(type_a)
2068 .len(6)
2069 .add_child_data(b.into_data())
2070 .build()
2071 .unwrap();
2072 let a = StructArray::from(a_data);
2073
2074 assert_eq!(a.null_count(), 0);
2075 assert_eq!(a.column(0).null_count(), 0);
2076
2077 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2079
2080 roundtrip(batch, Some(SMALL_SIZE / 2));
2081 }
2082
2083 #[test]
2084 fn arrow_writer_2_level_struct_mixed_null() {
2085 let field_c = Field::new("c", DataType::Int32, false);
2087 let type_b = DataType::Struct(vec![field_c].into());
2088 let field_b = Field::new("b", type_b.clone(), true);
2089 let type_a = DataType::Struct(vec![field_b].into());
2090 let field_a = Field::new("a", type_a.clone(), false);
2091 let schema = Schema::new(vec![field_a]);
2092
2093 let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2095 let b_data = ArrayDataBuilder::new(type_b)
2096 .len(6)
2097 .null_bit_buffer(Some(Buffer::from([0b00100111])))
2098 .add_child_data(c.into_data())
2099 .build()
2100 .unwrap();
2101 let b = StructArray::from(b_data);
2102 let a_data = ArrayDataBuilder::new(type_a)
2104 .len(6)
2105 .add_child_data(b.into_data())
2106 .build()
2107 .unwrap();
2108 let a = StructArray::from(a_data);
2109
2110 assert_eq!(a.null_count(), 0);
2111 assert_eq!(a.column(0).null_count(), 2);
2112
2113 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2115
2116 roundtrip(batch, Some(SMALL_SIZE / 2));
2117 }
2118
2119 #[test]
2120 fn arrow_writer_2_level_struct_mixed_null_2() {
2121 let field_c = Field::new("c", DataType::Int32, false);
2123 let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
2124 let field_e = Field::new(
2125 "e",
2126 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2127 false,
2128 );
2129
2130 let field_b = Field::new(
2131 "b",
2132 DataType::Struct(vec![field_c, field_d, field_e].into()),
2133 false,
2134 );
2135 let type_a = DataType::Struct(vec![field_b.clone()].into());
2136 let field_a = Field::new("a", type_a, true);
2137 let schema = Schema::new(vec![field_a.clone()]);
2138
2139 let c = Int32Array::from_iter_values(0..6);
2141 let d = FixedSizeBinaryArray::try_from_iter(
2142 ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
2143 )
2144 .expect("four byte values");
2145 let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
2146 let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2147 .len(6)
2148 .add_child_data(c.into_data())
2149 .add_child_data(d.into_data())
2150 .add_child_data(e.into_data())
2151 .build()
2152 .unwrap();
2153 let b = StructArray::from(b_data);
2154 let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2155 .len(6)
2156 .null_bit_buffer(Some(Buffer::from([0b00100101])))
2157 .add_child_data(b.into_data())
2158 .build()
2159 .unwrap();
2160 let a = StructArray::from(a_data);
2161
2162 assert_eq!(a.null_count(), 3);
2163 assert_eq!(a.column(0).null_count(), 0);
2164
2165 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2167
2168 roundtrip(batch, Some(SMALL_SIZE / 2));
2169 }
2170
2171 #[test]
2172 fn test_fixed_size_binary_in_dict() {
2173 fn test_fixed_size_binary_in_dict_inner<K>()
2174 where
2175 K: ArrowDictionaryKeyType,
2176 K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
2177 <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
2178 {
2179 let field = Field::new(
2180 "a",
2181 DataType::Dictionary(
2182 Box::new(K::DATA_TYPE),
2183 Box::new(DataType::FixedSizeBinary(4)),
2184 ),
2185 false,
2186 );
2187 let schema = Schema::new(vec![field]);
2188
2189 let keys: Vec<K::Native> = vec![
2190 K::Native::try_from(0u8).unwrap(),
2191 K::Native::try_from(0u8).unwrap(),
2192 K::Native::try_from(1u8).unwrap(),
2193 ];
2194 let keys = PrimitiveArray::<K>::from_iter_values(keys);
2195 let values = FixedSizeBinaryArray::try_from_iter(
2196 vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
2197 )
2198 .unwrap();
2199
2200 let data = DictionaryArray::<K>::new(keys, Arc::new(values));
2201 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
2202 roundtrip(batch, None);
2203 }
2204
2205 test_fixed_size_binary_in_dict_inner::<UInt8Type>();
2206 test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2207 test_fixed_size_binary_in_dict_inner::<UInt32Type>();
2208 test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2209 test_fixed_size_binary_in_dict_inner::<Int8Type>();
2210 test_fixed_size_binary_in_dict_inner::<Int16Type>();
2211 test_fixed_size_binary_in_dict_inner::<Int32Type>();
2212 test_fixed_size_binary_in_dict_inner::<Int64Type>();
2213 }
2214
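    // A dictionary whose values array is empty (all keys null), nested inside an all-null struct,
    // should still round-trip.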
2215 #[test]
2216 fn test_empty_dict() {
2217 let struct_fields = Fields::from(vec![Field::new(
2218 "dict",
2219 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2220 false,
2221 )]);
2222
2223 let schema = Schema::new(vec![Field::new_struct(
2224 "struct",
2225 struct_fields.clone(),
2226 true,
2227 )]);
2228 let dictionary = Arc::new(DictionaryArray::new(
2229 Int32Array::new_null(5),
2230 Arc::new(StringArray::new_null(0)),
2231 ));
2232
2233 let s = StructArray::new(
2234 struct_fields,
2235 vec![dictionary],
2236 Some(NullBuffer::new_null(5)),
2237 );
2238
2239 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
2240 roundtrip(batch, None);
2241 }
2242 #[test]
2243 fn arrow_writer_page_size() {
2244 let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
2245
2246 let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
2247
2248 for i in 0..10 {
2250 let value = i
2251 .to_string()
2252 .repeat(10)
2253 .chars()
2254 .take(10)
2255 .collect::<String>();
2256
2257 builder.append_value(value);
2258 }
2259
2260 let array = Arc::new(builder.finish());
2261
2262 let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
2263
2264 let file = tempfile::tempfile().unwrap();
2265
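        // Pathologically small page limits and a write batch size of 1 so every value is flushed
        // into its own data page.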
2266 let props = WriterProperties::builder()
2268 .set_data_page_size_limit(1)
2269 .set_dictionary_page_size_limit(1)
2270 .set_write_batch_size(1)
2271 .build();
2272
2273 let mut writer =
2274 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
2275 .expect("Unable to write file");
2276 writer.write(&batch).unwrap();
2277 writer.close().unwrap();
2278
2279 let options = ReadOptionsBuilder::new().with_page_index().build();
2280 let reader =
2281 SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();
2282
2283 let column = reader.metadata().row_group(0).columns();
2284
2285 assert_eq!(column.len(), 1);
2286
2287 assert!(
2290 column[0].dictionary_page_offset().is_some(),
2291 "Expected a dictionary page"
2292 );
2293
2294 assert!(reader.metadata().offset_index().is_some());
2295 let offset_indexes = &reader.metadata().offset_index().unwrap()[0];
2296
2297 let page_locations = offset_indexes[0].page_locations.clone();
2298
2299 assert_eq!(
2302 page_locations.len(),
2303 10,
2304 "Expected 10 pages but got {page_locations:#?}"
2305 );
2306 }
2307
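    // Round-trip Float16/32/64 columns where every other value is NaN.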
2308 #[test]
2309 fn arrow_writer_float_nans() {
2310 let f16_field = Field::new("a", DataType::Float16, false);
2311 let f32_field = Field::new("b", DataType::Float32, false);
2312 let f64_field = Field::new("c", DataType::Float64, false);
2313 let schema = Schema::new(vec![f16_field, f32_field, f64_field]);
2314
2315 let f16_values = (0..MEDIUM_SIZE)
2316 .map(|i| {
2317 Some(if i % 2 == 0 {
2318 f16::NAN
2319 } else {
2320 f16::from_f32(i as f32)
2321 })
2322 })
2323 .collect::<Float16Array>();
2324
2325 let f32_values = (0..MEDIUM_SIZE)
2326 .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
2327 .collect::<Float32Array>();
2328
2329 let f64_values = (0..MEDIUM_SIZE)
2330 .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
2331 .collect::<Float64Array>();
2332
2333 let batch = RecordBatch::try_new(
2334 Arc::new(schema),
2335 vec![
2336 Arc::new(f16_values),
2337 Arc::new(f32_values),
2338 Arc::new(f64_values),
2339 ],
2340 )
2341 .unwrap();
2342
2343 roundtrip(batch, None);
2344 }
2345
2346 const SMALL_SIZE: usize = 7;
2347 const MEDIUM_SIZE: usize = 63;
2348
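    // Round-trips the batch under both writer versions and returns the encoded files.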
2349 fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<Bytes> {
2352 let mut files = vec![];
2353 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2354 let mut props = WriterProperties::builder().set_writer_version(version);
2355
2356 if let Some(size) = max_row_group_size {
2357 props = props.set_max_row_group_size(size)
2358 }
2359
2360 let props = props.build();
2361 files.push(roundtrip_opts(&expected_batch, props))
2362 }
2363 files
2364 }
2365
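    // Writes the batch with the given properties, reads it back, and runs `validate`
    // against each column's expected and actual ArrayData.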
2366 fn roundtrip_opts_with_array_validation<F>(
2370 expected_batch: &RecordBatch,
2371 props: WriterProperties,
2372 validate: F,
2373 ) -> Bytes
2374 where
2375 F: Fn(&ArrayData, &ArrayData),
2376 {
2377 let mut file = vec![];
2378
2379 let mut writer = ArrowWriter::try_new(&mut file, expected_batch.schema(), Some(props))
2380 .expect("Unable to write file");
2381 writer.write(expected_batch).unwrap();
2382 writer.close().unwrap();
2383
2384 let file = Bytes::from(file);
2385 let mut record_batch_reader =
2386 ParquetRecordBatchReader::try_new(file.clone(), 1024).unwrap();
2387
2388 let actual_batch = record_batch_reader
2389 .next()
2390 .expect("No batch found")
2391 .expect("Unable to get batch");
2392
2393 assert_eq!(expected_batch.schema(), actual_batch.schema());
2394 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2395 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2396 for i in 0..expected_batch.num_columns() {
2397 let expected_data = expected_batch.column(i).to_data();
2398 let actual_data = actual_batch.column(i).to_data();
2399 validate(&expected_data, &actual_data);
2400 }
2401
2402 file
2403 }
2404
2405 fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> Bytes {
2406 roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
2407 a.validate_full().expect("valid expected data");
2408 b.validate_full().expect("valid actual data");
2409 assert_eq!(a, b)
2410 })
2411 }
2412
2413 struct RoundTripOptions {
2414 values: ArrayRef,
2415 schema: SchemaRef,
2416 bloom_filter: bool,
2417 bloom_filter_position: BloomFilterPosition,
2418 }
2419
2420 impl RoundTripOptions {
2421 fn new(values: ArrayRef, nullable: bool) -> Self {
2422 let data_type = values.data_type().clone();
2423 let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
2424 Self {
2425 values,
2426 schema: Arc::new(schema),
2427 bloom_filter: false,
2428 bloom_filter_position: BloomFilterPosition::AfterRowGroup,
2429 }
2430 }
2431 }
2432
2433 fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<Bytes> {
2434 one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
2435 }
2436
2437 fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<Bytes> {
2438 let mut options = RoundTripOptions::new(values, false);
2439 options.schema = schema;
2440 one_column_roundtrip_with_options(options)
2441 }
2442
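    // Round-trips the single-column batch across a matrix of dictionary settings, encodings,
    // writer versions, and row group sizes.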
2443 fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<Bytes> {
2444 let RoundTripOptions {
2445 values,
2446 schema,
2447 bloom_filter,
2448 bloom_filter_position,
2449 } = options;
2450
2451 let encodings = match values.data_type() {
2452 DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
2453 vec![
2454 Encoding::PLAIN,
2455 Encoding::DELTA_BYTE_ARRAY,
2456 Encoding::DELTA_LENGTH_BYTE_ARRAY,
2457 ]
2458 }
2459 DataType::Int64
2460 | DataType::Int32
2461 | DataType::Int16
2462 | DataType::Int8
2463 | DataType::UInt64
2464 | DataType::UInt32
2465 | DataType::UInt16
2466 | DataType::UInt8 => vec![
2467 Encoding::PLAIN,
2468 Encoding::DELTA_BINARY_PACKED,
2469 Encoding::BYTE_STREAM_SPLIT,
2470 ],
2471 DataType::Float32 | DataType::Float64 => {
2472 vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
2473 }
2474 _ => vec![Encoding::PLAIN],
2475 };
2476
2477 let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2478
2479 let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2480
2481 let mut files = vec![];
2482 for dictionary_size in [0, 1, 1024] {
2483 for encoding in &encodings {
2484 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2485 for row_group_size in row_group_sizes {
2486 let props = WriterProperties::builder()
2487 .set_writer_version(version)
2488 .set_max_row_group_size(row_group_size)
2489 .set_dictionary_enabled(dictionary_size != 0)
2490 .set_dictionary_page_size_limit(dictionary_size.max(1))
2491 .set_encoding(*encoding)
2492 .set_bloom_filter_enabled(bloom_filter)
2493 .set_bloom_filter_position(bloom_filter_position)
2494 .build();
2495
2496 files.push(roundtrip_opts(&expected_batch, props))
2497 }
2498 }
2499 }
2500 }
2501 files
2502 }
2503
2504 fn values_required<A, I>(iter: I) -> Vec<Bytes>
2505 where
2506 A: From<Vec<I::Item>> + Array + 'static,
2507 I: IntoIterator,
2508 {
2509 let raw_values: Vec<_> = iter.into_iter().collect();
2510 let values = Arc::new(A::from(raw_values));
2511 one_column_roundtrip(values, false)
2512 }
2513
2514 fn values_optional<A, I>(iter: I) -> Vec<Bytes>
2515 where
2516 A: From<Vec<Option<I::Item>>> + Array + 'static,
2517 I: IntoIterator,
2518 {
2519 let optional_raw_values: Vec<_> = iter
2520 .into_iter()
2521 .enumerate()
2522 .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
2523 .collect();
2524 let optional_values = Arc::new(A::from(optional_raw_values));
2525 one_column_roundtrip(optional_values, true)
2526 }
2527
2528 fn required_and_optional<A, I>(iter: I)
2529 where
2530 A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
2531 I: IntoIterator + Clone,
2532 {
2533 values_required::<A, I>(iter.clone());
2534 values_optional::<A, I>(iter);
2535 }
2536
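    // Reads back the first file with bloom filter reading enabled and checks that all positive
    // values are reported present and all negative values are reported absent.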
2537 fn check_bloom_filter<T: AsBytes>(
2538 files: Vec<Bytes>,
2539 file_column: String,
2540 positive_values: Vec<T>,
2541 negative_values: Vec<T>,
2542 ) {
2543 files.into_iter().take(1).for_each(|file| {
2544 let file_reader = SerializedFileReader::new_with_options(
2545 file,
2546 ReadOptionsBuilder::new()
2547 .with_reader_properties(
2548 ReaderProperties::builder()
2549 .set_read_bloom_filter(true)
2550 .build(),
2551 )
2552 .build(),
2553 )
2554 .expect("Unable to open file as Parquet");
2555 let metadata = file_reader.metadata();
2556
2557 let mut bloom_filters: Vec<_> = vec![];
2559 for (ri, row_group) in metadata.row_groups().iter().enumerate() {
2560 if let Some((column_index, _)) = row_group
2561 .columns()
2562 .iter()
2563 .enumerate()
2564 .find(|(_, column)| column.column_path().string() == file_column)
2565 {
2566 let row_group_reader = file_reader
2567 .get_row_group(ri)
2568 .expect("Unable to read row group");
2569 if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
2570 bloom_filters.push(sbbf.clone());
2571 } else {
2572 panic!("No bloom filter for column named {file_column} found");
2573 }
2574 } else {
2575 panic!("No column named {file_column} found");
2576 }
2577 }
2578
2579 positive_values.iter().for_each(|value| {
2580 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
            assert!(
                found.is_some(),
                "Value {:?} should be in bloom filter",
                value.as_bytes()
            );
2586 });
2587
2588 negative_values.iter().for_each(|value| {
2589 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
            assert!(
                found.is_none(),
                "Value {:?} should not be in bloom filter",
                value.as_bytes()
            );
2595 });
2596 });
2597 }
2598
2599 #[test]
2600 fn all_null_primitive_single_column() {
2601 let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
2602 one_column_roundtrip(values, true);
2603 }

    #[test]
2605 fn null_single_column() {
2606 let values = Arc::new(NullArray::new(SMALL_SIZE));
2607 one_column_roundtrip(values, true);
2608 }
2610
2611 #[test]
2612 fn bool_single_column() {
2613 required_and_optional::<BooleanArray, _>(
2614 [true, false].iter().cycle().copied().take(SMALL_SIZE),
2615 );
2616 }
2617
2618 #[test]
2619 fn bool_large_single_column() {
2620 let values = Arc::new(
2621 [None, Some(true), Some(false)]
2622 .iter()
2623 .cycle()
2624 .copied()
2625 .take(200_000)
2626 .collect::<BooleanArray>(),
2627 );
2628 let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
2629 let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2630 let file = tempfile::tempfile().unwrap();
2631
2632 let mut writer =
2633 ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
2634 .expect("Unable to write file");
2635 writer.write(&expected_batch).unwrap();
2636 writer.close().unwrap();
2637 }
2638
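    // NaN-only columns yield no usable min/max, so an offset index is written but a column index is not.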
2639 #[test]
2640 fn check_page_offset_index_with_nan() {
2641 let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
2642 let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
2643 let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2644
2645 let mut out = Vec::with_capacity(1024);
2646 let mut writer =
2647 ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
2648 writer.write(&batch).unwrap();
2649 let file_meta_data = writer.close().unwrap();
2650 for row_group in file_meta_data.row_groups() {
2651 for column in row_group.columns() {
2652 assert!(column.offset_index_offset().is_some());
2653 assert!(column.offset_index_length().is_some());
2654 assert!(column.column_index_offset().is_none());
2655 assert!(column.column_index_length().is_none());
2656 }
2657 }
2658 }
2659
2660 #[test]
2661 fn i8_single_column() {
2662 required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
2663 }
2664
2665 #[test]
2666 fn i16_single_column() {
2667 required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
2668 }
2669
2670 #[test]
2671 fn i32_single_column() {
2672 required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
2673 }
2674
2675 #[test]
2676 fn i64_single_column() {
2677 required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
2678 }
2679
2680 #[test]
2681 fn u8_single_column() {
2682 required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
2683 }
2684
2685 #[test]
2686 fn u16_single_column() {
2687 required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
2688 }
2689
2690 #[test]
2691 fn u32_single_column() {
2692 required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
2693 }
2694
2695 #[test]
2696 fn u64_single_column() {
2697 required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
2698 }
2699
2700 #[test]
2701 fn f32_single_column() {
2702 required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
2703 }
2704
2705 #[test]
2706 fn f64_single_column() {
2707 required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
2708 }
2709
2710 #[test]
2715 fn timestamp_second_single_column() {
2716 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2717 let values = Arc::new(TimestampSecondArray::from(raw_values));
2718
2719 one_column_roundtrip(values, false);
2720 }
2721
2722 #[test]
2723 fn timestamp_millisecond_single_column() {
2724 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2725 let values = Arc::new(TimestampMillisecondArray::from(raw_values));
2726
2727 one_column_roundtrip(values, false);
2728 }
2729
2730 #[test]
2731 fn timestamp_microsecond_single_column() {
2732 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2733 let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
2734
2735 one_column_roundtrip(values, false);
2736 }
2737
2738 #[test]
2739 fn timestamp_nanosecond_single_column() {
2740 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2741 let values = Arc::new(TimestampNanosecondArray::from(raw_values));
2742
2743 one_column_roundtrip(values, false);
2744 }
2745
2746 #[test]
2747 fn date32_single_column() {
2748 required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
2749 }
2750
2751 #[test]
2752 fn date64_single_column() {
2753 required_and_optional::<Date64Array, _>(
2755 (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
2756 );
2757 }
2758
2759 #[test]
2760 fn time32_second_single_column() {
2761 required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
2762 }
2763
2764 #[test]
2765 fn time32_millisecond_single_column() {
2766 required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
2767 }
2768
2769 #[test]
2770 fn time64_microsecond_single_column() {
2771 required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
2772 }
2773
2774 #[test]
2775 fn time64_nanosecond_single_column() {
2776 required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
2777 }
2778
2779 #[test]
2780 fn duration_second_single_column() {
2781 required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
2782 }
2783
2784 #[test]
2785 fn duration_millisecond_single_column() {
2786 required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
2787 }
2788
2789 #[test]
2790 fn duration_microsecond_single_column() {
2791 required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
2792 }
2793
2794 #[test]
2795 fn duration_nanosecond_single_column() {
2796 required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
2797 }
2798
2799 #[test]
2800 fn interval_year_month_single_column() {
2801 required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
2802 }
2803
2804 #[test]
2805 fn interval_day_time_single_column() {
2806 required_and_optional::<IntervalDayTimeArray, _>(vec![
2807 IntervalDayTime::new(0, 1),
2808 IntervalDayTime::new(0, 3),
2809 IntervalDayTime::new(3, -2),
2810 IntervalDayTime::new(-200, 4),
2811 ]);
2812 }
2813
2814 #[test]
2815 #[should_panic(
2816 expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
2817 )]
2818 fn interval_month_day_nano_single_column() {
2819 required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
2820 IntervalMonthDayNano::new(0, 1, 5),
2821 IntervalMonthDayNano::new(0, 3, 2),
2822 IntervalMonthDayNano::new(3, -2, -5),
2823 IntervalMonthDayNano::new(-200, 4, -1),
2824 ]);
2825 }
2826
2827 #[test]
2828 fn binary_single_column() {
2829 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2830 let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2831 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2832
2833 values_required::<BinaryArray, _>(many_vecs_iter);
2835 }
2836
2837 #[test]
2838 fn binary_view_single_column() {
2839 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2840 let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2841 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2842
2843 values_required::<BinaryViewArray, _>(many_vecs_iter);
2845 }
2846
2847 #[test]
2848 fn i32_column_bloom_filter_at_end() {
2849 let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2850 let mut options = RoundTripOptions::new(array, false);
2851 options.bloom_filter = true;
2852 options.bloom_filter_position = BloomFilterPosition::End;
2853
2854 let files = one_column_roundtrip_with_options(options);
2855 check_bloom_filter(
2856 files,
2857 "col".to_string(),
2858 (0..SMALL_SIZE as i32).collect(),
2859 (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2860 );
2861 }
2862
2863 #[test]
2864 fn i32_column_bloom_filter() {
2865 let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2866 let mut options = RoundTripOptions::new(array, false);
2867 options.bloom_filter = true;
2868
2869 let files = one_column_roundtrip_with_options(options);
2870 check_bloom_filter(
2871 files,
2872 "col".to_string(),
2873 (0..SMALL_SIZE as i32).collect(),
2874 (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2875 );
2876 }
2877
2878 #[test]
2879 fn binary_column_bloom_filter() {
2880 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2881 let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2882 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2883
2884 let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
2885 let mut options = RoundTripOptions::new(array, false);
2886 options.bloom_filter = true;
2887
2888 let files = one_column_roundtrip_with_options(options);
2889 check_bloom_filter(
2890 files,
2891 "col".to_string(),
2892 many_vecs,
2893 vec![vec![(SMALL_SIZE + 1) as u8]],
2894 );
2895 }
2896
2897 #[test]
2898 fn empty_string_null_column_bloom_filter() {
2899 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2900 let raw_strs = raw_values.iter().map(|s| s.as_str());
2901
2902 let array = Arc::new(StringArray::from_iter_values(raw_strs));
2903 let mut options = RoundTripOptions::new(array, false);
2904 options.bloom_filter = true;
2905
2906 let files = one_column_roundtrip_with_options(options);
2907
2908 let optional_raw_values: Vec<_> = raw_values
2909 .iter()
2910 .enumerate()
2911 .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
2912 .collect();
2913 check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
2915 }
2916
2917 #[test]
2918 fn large_binary_single_column() {
2919 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2920 let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
2921 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2922
2923 values_required::<LargeBinaryArray, _>(many_vecs_iter);
2925 }
2926
2927 #[test]
2928 fn fixed_size_binary_single_column() {
2929 let mut builder = FixedSizeBinaryBuilder::new(4);
2930 builder.append_value(b"0123").unwrap();
2931 builder.append_null();
2932 builder.append_value(b"8910").unwrap();
2933 builder.append_value(b"1112").unwrap();
2934 let array = Arc::new(builder.finish());
2935
2936 one_column_roundtrip(array, true);
2937 }
2938
2939 #[test]
2940 fn string_single_column() {
2941 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2942 let raw_strs = raw_values.iter().map(|s| s.as_str());
2943
2944 required_and_optional::<StringArray, _>(raw_strs);
2945 }
2946
2947 #[test]
2948 fn large_string_single_column() {
2949 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2950 let raw_strs = raw_values.iter().map(|s| s.as_str());
2951
2952 required_and_optional::<LargeStringArray, _>(raw_strs);
2953 }
2954
2955 #[test]
2956 fn string_view_single_column() {
2957 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2958 let raw_strs = raw_values.iter().map(|s| s.as_str());
2959
2960 required_and_optional::<StringViewArray, _>(raw_strs);
2961 }
2962
2963 #[test]
2964 fn null_list_single_column() {
2965 let null_field = Field::new_list_field(DataType::Null, true);
2966 let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
2967
2968 let schema = Schema::new(vec![list_field]);
2969
2970 let a_values = NullArray::new(2);
2972 let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
2973 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2974 DataType::Null,
2975 true,
2976 ))))
2977 .len(3)
2978 .add_buffer(a_value_offsets)
2979 .null_bit_buffer(Some(Buffer::from([0b00000101])))
2980 .add_child_data(a_values.into_data())
2981 .build()
2982 .unwrap();
2983
2984 let a = ListArray::from(a_list_data);
2985
2986 assert!(a.is_valid(0));
2987 assert!(!a.is_valid(1));
2988 assert!(a.is_valid(2));
2989
2990 assert_eq!(a.value(0).len(), 0);
2991 assert_eq!(a.value(2).len(), 2);
2992 assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
2993
2994 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2995 roundtrip(batch, None);
2996 }
2997
2998 #[test]
2999 fn list_single_column() {
3000 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3001 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
3002 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
3003 DataType::Int32,
3004 false,
3005 ))))
3006 .len(5)
3007 .add_buffer(a_value_offsets)
3008 .null_bit_buffer(Some(Buffer::from([0b00011011])))
3009 .add_child_data(a_values.into_data())
3010 .build()
3011 .unwrap();
3012
3013 assert_eq!(a_list_data.null_count(), 1);
3014
3015 let a = ListArray::from(a_list_data);
3016 let values = Arc::new(a);
3017
3018 one_column_roundtrip(values, true);
3019 }
3020
3021 #[test]
3022 fn large_list_single_column() {
3023 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3024 let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
3025 let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
3026 "large_item",
3027 DataType::Int32,
3028 true,
3029 ))))
3030 .len(5)
3031 .add_buffer(a_value_offsets)
3032 .add_child_data(a_values.into_data())
3033 .null_bit_buffer(Some(Buffer::from([0b00011011])))
3034 .build()
3035 .unwrap();
3036
3037 assert_eq!(a_list_data.null_count(), 1);
3039
3040 let a = LargeListArray::from(a_list_data);
3041 let values = Arc::new(a);
3042
3043 one_column_roundtrip(values, true);
3044 }
3045
3046 #[test]
3047 fn list_nested_nulls() {
3048 use arrow::datatypes::Int32Type;
3049 let data = vec![
3050 Some(vec![Some(1)]),
3051 Some(vec![Some(2), Some(3)]),
3052 None,
3053 Some(vec![Some(4), Some(5), None]),
3054 Some(vec![None]),
3055 Some(vec![Some(6), Some(7)]),
3056 ];
3057
3058 let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
3059 one_column_roundtrip(Arc::new(list), true);
3060
3061 let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
3062 one_column_roundtrip(Arc::new(list), true);
3063 }
3064
3065 #[test]
3066 fn struct_single_column() {
3067 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
3068 let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
3069 let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
3070
3071 let values = Arc::new(s);
3072 one_column_roundtrip(values, false);
3073 }
3074
3075 #[test]
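    // With coerce_types enabled, list items must be renamed to "element" and map entries to
    // "key_value" with "key"/"value" children, both in the written metadata and when read back.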
3076 fn list_and_map_coerced_names() {
3077 let list_field =
3079 Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
3080 let map_field = Field::new_map(
3081 "my_map",
3082 "entries",
3083 Field::new("keys", DataType::Int32, false),
3084 Field::new("values", DataType::Int32, true),
3085 false,
3086 true,
3087 );
3088
3089 let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
3090 let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
3091
3092 let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
3093
3094 let props = Some(WriterProperties::builder().set_coerce_types(true).build());
3096 let file = tempfile::tempfile().unwrap();
3097 let mut writer =
3098 ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
3099
3100 let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
3101 writer.write(&batch).unwrap();
3102 let file_metadata = writer.close().unwrap();
3103
3104 let schema = file_metadata.file_metadata().schema();
3105 let list_field = &schema.get_fields()[0].get_fields()[0];
3107 assert_eq!(list_field.get_fields()[0].name(), "element");
3108
3109 let map_field = &schema.get_fields()[1].get_fields()[0];
3110 assert_eq!(map_field.name(), "key_value");
3112 assert_eq!(map_field.get_fields()[0].name(), "key");
3114 assert_eq!(map_field.get_fields()[1].name(), "value");
3116
3117 let reader = SerializedFileReader::new(file).unwrap();
3119 let file_schema = reader.metadata().file_metadata().schema();
3120 let fields = file_schema.get_fields();
3121 let list_field = &fields[0].get_fields()[0];
3122 assert_eq!(list_field.get_fields()[0].name(), "element");
3123 let map_field = &fields[1].get_fields()[0];
3124 assert_eq!(map_field.name(), "key_value");
3125 assert_eq!(map_field.get_fields()[0].name(), "key");
3126 assert_eq!(map_field.get_fields()[1].name(), "value");
3127 }
3128
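    // Tiny data pages and small write batches force the DELTA byte-array encoders to fall back
    // and flush mid-batch; the values must still survive the round trip.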
3129 #[test]
3130 fn fallback_flush_data_page() {
3131 let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
3133 let values = Arc::new(StringArray::from(raw_values));
3134 let encodings = vec![
3135 Encoding::DELTA_BYTE_ARRAY,
3136 Encoding::DELTA_LENGTH_BYTE_ARRAY,
3137 ];
3138 let data_type = values.data_type().clone();
3139 let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
3140 let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3141
3142 let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
3143 let data_page_size_limit: usize = 32;
3144 let write_batch_size: usize = 16;
3145
3146 for encoding in &encodings {
3147 for row_group_size in row_group_sizes {
3148 let props = WriterProperties::builder()
3149 .set_writer_version(WriterVersion::PARQUET_2_0)
3150 .set_max_row_group_size(row_group_size)
3151 .set_dictionary_enabled(false)
3152 .set_encoding(*encoding)
3153 .set_data_page_size_limit(data_page_size_limit)
3154 .set_write_batch_size(write_batch_size)
3155 .build();
3156
3157 roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
3158 let string_array_a = StringArray::from(a.clone());
3159 let string_array_b = StringArray::from(b.clone());
3160 let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
3161 let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
3162 assert_eq!(
3163 vec_a, vec_b,
3164 "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
3165 );
3166 });
3167 }
3168 }
3169 }
3170
3171 #[test]
3172 fn arrow_writer_string_dictionary() {
3173 #[allow(deprecated)]
3175 let schema = Arc::new(Schema::new(vec![Field::new_dict(
3176 "dictionary",
3177 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3178 true,
3179 42,
3180 true,
3181 )]));
3182
3183 let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3185 .iter()
3186 .copied()
3187 .collect();
3188
3189 one_column_roundtrip_with_schema(Arc::new(d), schema);
3191 }
3192
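    // Later batches may use a different string/binary/dictionary representation of the same data;
    // the writer should accept them and the file reads back with the first batch's schema.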
3193 #[test]
3194 fn arrow_writer_test_type_compatibility() {
3195 fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
3196 where
3197 T1: Array + 'static,
3198 T2: Array + 'static,
3199 {
3200 let schema1 = Arc::new(Schema::new(vec![Field::new(
3201 "a",
3202 array1.data_type().clone(),
3203 false,
3204 )]));
3205
3206 let file = tempfile().unwrap();
3207 let mut writer =
3208 ArrowWriter::try_new(file.try_clone().unwrap(), schema1.clone(), None).unwrap();
3209
3210 let rb1 = RecordBatch::try_new(schema1.clone(), vec![Arc::new(array1)]).unwrap();
3211 writer.write(&rb1).unwrap();
3212
3213 let schema2 = Arc::new(Schema::new(vec![Field::new(
3214 "a",
3215 array2.data_type().clone(),
3216 false,
3217 )]));
3218 let rb2 = RecordBatch::try_new(schema2, vec![Arc::new(array2)]).unwrap();
3219 writer.write(&rb2).unwrap();
3220
3221 writer.close().unwrap();
3222
3223 let mut record_batch_reader =
3224 ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
3225 let actual_batch = record_batch_reader.next().unwrap().unwrap();
3226
3227 let expected_batch =
3228 RecordBatch::try_new(schema1, vec![Arc::new(expected_result)]).unwrap();
3229 assert_eq!(actual_batch, expected_batch);
3230 }
3231
3232 ensure_compatible_write(
3235 DictionaryArray::new(
3236 UInt8Array::from_iter_values(vec![0]),
3237 Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3238 ),
3239 StringArray::from_iter_values(vec!["barquet"]),
3240 DictionaryArray::new(
3241 UInt8Array::from_iter_values(vec![0, 1]),
3242 Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3243 ),
3244 );
3245
3246 ensure_compatible_write(
3247 StringArray::from_iter_values(vec!["parquet"]),
3248 DictionaryArray::new(
3249 UInt8Array::from_iter_values(vec![0]),
3250 Arc::new(StringArray::from_iter_values(vec!["barquet"])),
3251 ),
3252 StringArray::from_iter_values(vec!["parquet", "barquet"]),
3253 );
3254
3255 ensure_compatible_write(
3258 DictionaryArray::new(
3259 UInt8Array::from_iter_values(vec![0]),
3260 Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3261 ),
3262 DictionaryArray::new(
3263 UInt16Array::from_iter_values(vec![0]),
3264 Arc::new(StringArray::from_iter_values(vec!["barquet"])),
3265 ),
3266 DictionaryArray::new(
3267 UInt8Array::from_iter_values(vec![0, 1]),
3268 Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3269 ),
3270 );
3271
3272 ensure_compatible_write(
3274 DictionaryArray::new(
3275 UInt8Array::from_iter_values(vec![0]),
3276 Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3277 ),
3278 DictionaryArray::new(
3279 UInt8Array::from_iter_values(vec![0]),
3280 Arc::new(LargeStringArray::from_iter_values(vec!["barquet"])),
3281 ),
3282 DictionaryArray::new(
3283 UInt8Array::from_iter_values(vec![0, 1]),
3284 Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3285 ),
3286 );
3287
3288 ensure_compatible_write(
3290 DictionaryArray::new(
3291 UInt8Array::from_iter_values(vec![0]),
3292 Arc::new(StringArray::from_iter_values(vec!["parquet"])),
3293 ),
3294 LargeStringArray::from_iter_values(vec!["barquet"]),
3295 DictionaryArray::new(
3296 UInt8Array::from_iter_values(vec![0, 1]),
3297 Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
3298 ),
3299 );
3300
3301 ensure_compatible_write(
3304 StringArray::from_iter_values(vec!["parquet"]),
3305 LargeStringArray::from_iter_values(vec!["barquet"]),
3306 StringArray::from_iter_values(vec!["parquet", "barquet"]),
3307 );
3308
3309 ensure_compatible_write(
3310 LargeStringArray::from_iter_values(vec!["parquet"]),
3311 StringArray::from_iter_values(vec!["barquet"]),
3312 LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
3313 );
3314
3315 ensure_compatible_write(
3316 StringArray::from_iter_values(vec!["parquet"]),
3317 StringViewArray::from_iter_values(vec!["barquet"]),
3318 StringArray::from_iter_values(vec!["parquet", "barquet"]),
3319 );
3320
3321 ensure_compatible_write(
3322 StringViewArray::from_iter_values(vec!["parquet"]),
3323 StringArray::from_iter_values(vec!["barquet"]),
3324 StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
3325 );
3326
3327 ensure_compatible_write(
3328 LargeStringArray::from_iter_values(vec!["parquet"]),
3329 StringViewArray::from_iter_values(vec!["barquet"]),
3330 LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
3331 );
3332
3333 ensure_compatible_write(
3334 StringViewArray::from_iter_values(vec!["parquet"]),
3335 LargeStringArray::from_iter_values(vec!["barquet"]),
3336 StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
3337 );
3338
3339 ensure_compatible_write(
3342 BinaryArray::from_iter_values(vec![b"parquet"]),
3343 LargeBinaryArray::from_iter_values(vec![b"barquet"]),
3344 BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3345 );
3346
3347 ensure_compatible_write(
3348 LargeBinaryArray::from_iter_values(vec![b"parquet"]),
3349 BinaryArray::from_iter_values(vec![b"barquet"]),
3350 LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3351 );
3352
3353 ensure_compatible_write(
3354 BinaryArray::from_iter_values(vec![b"parquet"]),
3355 BinaryViewArray::from_iter_values(vec![b"barquet"]),
3356 BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3357 );
3358
3359 ensure_compatible_write(
3360 BinaryViewArray::from_iter_values(vec![b"parquet"]),
3361 BinaryArray::from_iter_values(vec![b"barquet"]),
3362 BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
3363 );
3364
3365 ensure_compatible_write(
3366 BinaryViewArray::from_iter_values(vec![b"parquet"]),
3367 LargeBinaryArray::from_iter_values(vec![b"barquet"]),
3368 BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
3369 );
3370
3371 ensure_compatible_write(
3372 LargeBinaryArray::from_iter_values(vec![b"parquet"]),
3373 BinaryViewArray::from_iter_values(vec![b"barquet"]),
3374 LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
3375 );
3376
3377 let list_field_metadata = HashMap::from_iter(vec![(
3380 PARQUET_FIELD_ID_META_KEY.to_string(),
3381 "1".to_string(),
3382 )]);
3383 let list_field = Field::new_list_field(DataType::Int32, false);
3384
3385 let values1 = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4]));
3386 let offsets1 = OffsetBuffer::new(vec![0, 2, 5].into());
3387
3388 let values2 = Arc::new(Int32Array::from(vec![5, 6, 7, 8, 9]));
3389 let offsets2 = OffsetBuffer::new(vec![0, 3, 5].into());
3390
3391 let values_expected = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]));
3392 let offsets_expected = OffsetBuffer::new(vec![0, 2, 5, 8, 10].into());
3393
3394 ensure_compatible_write(
3395 ListArray::try_new(
3397 Arc::new(
3398 list_field
3399 .clone()
3400 .with_metadata(list_field_metadata.clone()),
3401 ),
3402 offsets1,
3403 values1,
3404 None,
3405 )
3406 .unwrap(),
3407 ListArray::try_new(Arc::new(list_field.clone()), offsets2, values2, None).unwrap(),
3409 ListArray::try_new(
3411 Arc::new(
3412 list_field
3413 .clone()
3414 .with_metadata(list_field_metadata.clone()),
3415 ),
3416 offsets_expected,
3417 values_expected,
3418 None,
3419 )
3420 .unwrap(),
3421 );
3422 }
3423
3424 #[test]
3425 fn arrow_writer_primitive_dictionary() {
3426 #[allow(deprecated)]
3428 let schema = Arc::new(Schema::new(vec![Field::new_dict(
3429 "dictionary",
3430 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
3431 true,
3432 42,
3433 true,
3434 )]));
3435
3436 let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
3438 builder.append(12345678).unwrap();
3439 builder.append_null();
3440 builder.append(22345678).unwrap();
3441 builder.append(12345678).unwrap();
3442 let d = builder.finish();
3443
3444 one_column_roundtrip_with_schema(Arc::new(d), schema);
3445 }
3446
3447 #[test]
3448 fn arrow_writer_decimal32_dictionary() {
3449 let integers = vec![12345, 56789, 34567];
3450
3451 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3452
3453 let values = Decimal32Array::from(integers.clone())
3454 .with_precision_and_scale(5, 2)
3455 .unwrap();
3456
3457 let array = DictionaryArray::new(keys, Arc::new(values));
3458 one_column_roundtrip(Arc::new(array.clone()), true);
3459
3460 let values = Decimal32Array::from(integers)
3461 .with_precision_and_scale(9, 2)
3462 .unwrap();
3463
3464 let array = array.with_values(Arc::new(values));
3465 one_column_roundtrip(Arc::new(array), true);
3466 }
3467
3468 #[test]
3469 fn arrow_writer_decimal64_dictionary() {
3470 let integers = vec![12345, 56789, 34567];
3471
3472 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3473
3474 let values = Decimal64Array::from(integers.clone())
3475 .with_precision_and_scale(5, 2)
3476 .unwrap();
3477
3478 let array = DictionaryArray::new(keys, Arc::new(values));
3479 one_column_roundtrip(Arc::new(array.clone()), true);
3480
3481 let values = Decimal64Array::from(integers)
3482 .with_precision_and_scale(12, 2)
3483 .unwrap();
3484
3485 let array = array.with_values(Arc::new(values));
3486 one_column_roundtrip(Arc::new(array), true);
3487 }
3488
3489 #[test]
3490 fn arrow_writer_decimal128_dictionary() {
3491 let integers = vec![12345, 56789, 34567];
3492
3493 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3494
3495 let values = Decimal128Array::from(integers.clone())
3496 .with_precision_and_scale(5, 2)
3497 .unwrap();
3498
3499 let array = DictionaryArray::new(keys, Arc::new(values));
3500 one_column_roundtrip(Arc::new(array.clone()), true);
3501
3502 let values = Decimal128Array::from(integers)
3503 .with_precision_and_scale(12, 2)
3504 .unwrap();
3505
3506 let array = array.with_values(Arc::new(values));
3507 one_column_roundtrip(Arc::new(array), true);
3508 }
3509
3510 #[test]
3511 fn arrow_writer_decimal256_dictionary() {
3512 let integers = vec![
3513 i256::from_i128(12345),
3514 i256::from_i128(56789),
3515 i256::from_i128(34567),
3516 ];
3517
3518 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3519
3520 let values = Decimal256Array::from(integers.clone())
3521 .with_precision_and_scale(5, 2)
3522 .unwrap();
3523
3524 let array = DictionaryArray::new(keys, Arc::new(values));
3525 one_column_roundtrip(Arc::new(array.clone()), true);
3526
3527 let values = Decimal256Array::from(integers)
3528 .with_precision_and_scale(12, 2)
3529 .unwrap();
3530
3531 let array = array.with_values(Arc::new(values));
3532 one_column_roundtrip(Arc::new(array), true);
3533 }
3534
3535 #[test]
3536 fn arrow_writer_string_dictionary_unsigned_index() {
3537 #[allow(deprecated)]
3539 let schema = Arc::new(Schema::new(vec![Field::new_dict(
3540 "dictionary",
3541 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3542 true,
3543 42,
3544 true,
3545 )]));
3546
3547 let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3549 .iter()
3550 .copied()
3551 .collect();
3552
3553 one_column_roundtrip_with_schema(Arc::new(d), schema);
3554 }
3555
3556 #[test]
3557 fn u32_min_max() {
3558 let src = [
3560 u32::MIN,
3561 u32::MIN + 1,
3562 (i32::MAX as u32) - 1,
3563 i32::MAX as u32,
3564 (i32::MAX as u32) + 1,
3565 u32::MAX - 1,
3566 u32::MAX,
3567 ];
3568 let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
3569 let files = one_column_roundtrip(values, false);
3570
3571 for file in files {
3572 let reader = SerializedFileReader::new(file).unwrap();
3574 let metadata = reader.metadata();
3575
3576 let mut row_offset = 0;
3577 for row_group in metadata.row_groups() {
3578 assert_eq!(row_group.num_columns(), 1);
3579 let column = row_group.column(0);
3580
3581 let num_values = column.num_values() as usize;
3582 let src_slice = &src[row_offset..row_offset + num_values];
3583 row_offset += column.num_values() as usize;
3584
3585 let stats = column.statistics().unwrap();
3586 if let Statistics::Int32(stats) = stats {
3587 assert_eq!(
3588 *stats.min_opt().unwrap() as u32,
3589 *src_slice.iter().min().unwrap()
3590 );
3591 assert_eq!(
3592 *stats.max_opt().unwrap() as u32,
3593 *src_slice.iter().max().unwrap()
3594 );
3595 } else {
3596 panic!("Statistics::Int32 missing")
3597 }
3598 }
3599 }
3600 }
3601
3602 #[test]
3603 fn u64_min_max() {
3604 let src = [
3606 u64::MIN,
3607 u64::MIN + 1,
3608 (i64::MAX as u64) - 1,
3609 i64::MAX as u64,
3610 (i64::MAX as u64) + 1,
3611 u64::MAX - 1,
3612 u64::MAX,
3613 ];
3614 let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
3615 let files = one_column_roundtrip(values, false);
3616
3617 for file in files {
3618 let reader = SerializedFileReader::new(file).unwrap();
3620 let metadata = reader.metadata();
3621
3622 let mut row_offset = 0;
3623 for row_group in metadata.row_groups() {
3624 assert_eq!(row_group.num_columns(), 1);
3625 let column = row_group.column(0);
3626
3627 let num_values = column.num_values() as usize;
3628 let src_slice = &src[row_offset..row_offset + num_values];
3629 row_offset += column.num_values() as usize;
3630
3631 let stats = column.statistics().unwrap();
3632 if let Statistics::Int64(stats) = stats {
3633 assert_eq!(
3634 *stats.min_opt().unwrap() as u64,
3635 *src_slice.iter().min().unwrap()
3636 );
3637 assert_eq!(
3638 *stats.max_opt().unwrap() as u64,
3639 *src_slice.iter().max().unwrap()
3640 );
3641 } else {
3642 panic!("Statistics::Int64 missing")
3643 }
3644 }
3645 }
3646 }
3647
3648 #[test]
3649 fn statistics_null_counts_only_nulls() {
3650 let values = Arc::new(UInt64Array::from(vec![None, None]));
3652 let files = one_column_roundtrip(values, true);
3653
3654 for file in files {
3655 let reader = SerializedFileReader::new(file).unwrap();
3657 let metadata = reader.metadata();
3658 assert_eq!(metadata.num_row_groups(), 1);
3659 let row_group = metadata.row_group(0);
3660 assert_eq!(row_group.num_columns(), 1);
3661 let column = row_group.column(0);
3662 let stats = column.statistics().unwrap();
3663 assert_eq!(stats.null_count_opt(), Some(2));
3664 }
3665 }
3666
3667 #[test]
3668 fn test_list_of_struct_roundtrip() {
3669 let int_field = Field::new("a", DataType::Int32, true);
3671 let int_field2 = Field::new("b", DataType::Int32, true);
3672
3673 let int_builder = Int32Builder::with_capacity(10);
3674 let int_builder2 = Int32Builder::with_capacity(10);
3675
3676 let struct_builder = StructBuilder::new(
3677 vec![int_field, int_field2],
3678 vec![Box::new(int_builder), Box::new(int_builder2)],
3679 );
3680 let mut list_builder = ListBuilder::new(struct_builder);
3681
3682 let values = list_builder.values();
3687 values
3688 .field_builder::<Int32Builder>(0)
3689 .unwrap()
3690 .append_value(1);
3691 values
3692 .field_builder::<Int32Builder>(1)
3693 .unwrap()
3694 .append_value(2);
3695 values.append(true);
3696 list_builder.append(true);
3697
3698 list_builder.append(true);
3700
3701 list_builder.append(false);
3703
3704 let values = list_builder.values();
3706 values
3707 .field_builder::<Int32Builder>(0)
3708 .unwrap()
3709 .append_null();
3710 values
3711 .field_builder::<Int32Builder>(1)
3712 .unwrap()
3713 .append_null();
3714 values.append(false);
3715 values
3716 .field_builder::<Int32Builder>(0)
3717 .unwrap()
3718 .append_null();
3719 values
3720 .field_builder::<Int32Builder>(1)
3721 .unwrap()
3722 .append_null();
3723 values.append(false);
3724 list_builder.append(true);
3725
3726 let values = list_builder.values();
3728 values
3729 .field_builder::<Int32Builder>(0)
3730 .unwrap()
3731 .append_null();
3732 values
3733 .field_builder::<Int32Builder>(1)
3734 .unwrap()
3735 .append_value(3);
3736 values.append(true);
3737 list_builder.append(true);
3738
3739 let values = list_builder.values();
3741 values
3742 .field_builder::<Int32Builder>(0)
3743 .unwrap()
3744 .append_value(2);
3745 values
3746 .field_builder::<Int32Builder>(1)
3747 .unwrap()
3748 .append_null();
3749 values.append(true);
3750 list_builder.append(true);
3751
3752 let array = Arc::new(list_builder.finish());
3753
3754 one_column_roundtrip(array, true);
3755 }
3756
3757 fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
3758 metadata.row_groups().iter().map(|x| x.num_rows()).collect()
3759 }
3760
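    // Input batches of 100, 50 and 300 rows are re-chunked into row groups of at most 200 rows.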
3761 #[test]
3762 fn test_aggregates_records() {
3763 let arrays = [
3764 Int32Array::from((0..100).collect::<Vec<_>>()),
3765 Int32Array::from((0..50).collect::<Vec<_>>()),
3766 Int32Array::from((200..500).collect::<Vec<_>>()),
3767 ];
3768
3769 let schema = Arc::new(Schema::new(vec![Field::new(
3770 "int",
3771 ArrowDataType::Int32,
3772 false,
3773 )]));
3774
3775 let file = tempfile::tempfile().unwrap();
3776
3777 let props = WriterProperties::builder()
3778 .set_max_row_group_size(200)
3779 .build();
3780
3781 let mut writer =
3782 ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
3783
3784 for array in arrays {
3785 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
3786 writer.write(&batch).unwrap();
3787 }
3788
3789 writer.close().unwrap();
3790
3791 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3792 assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
3793
3794 let batches = builder
3795 .with_batch_size(100)
3796 .build()
3797 .unwrap()
3798 .collect::<ArrowResult<Vec<_>>>()
3799 .unwrap();
3800
3801 assert_eq!(batches.len(), 5);
3802 assert!(batches.iter().all(|x| x.num_columns() == 1));
3803
3804 let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3805
3806 assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
3807
3808 let values: Vec<_> = batches
3809 .iter()
3810 .flat_map(|x| {
3811 x.column(0)
3812 .as_any()
3813 .downcast_ref::<Int32Array>()
3814 .unwrap()
3815 .values()
3816 .iter()
3817 .cloned()
3818 })
3819 .collect();
3820
3821 let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
3822 assert_eq!(&values, &expected_values)
3823 }
3824
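    // Nested list-of-struct data written across two batches with a max row group size of 6,
    // then read back and compared against the pretty-printed input.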
3825 #[test]
3826 fn complex_aggregate() {
3827 let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
3829 let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
3830 let struct_a = Arc::new(Field::new(
3831 "struct_a",
3832 DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
3833 true,
3834 ));
3835
3836 let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
3837 let struct_b = Arc::new(Field::new(
3838 "struct_b",
3839 DataType::Struct(vec![list_a.clone()].into()),
3840 false,
3841 ));
3842
3843 let schema = Arc::new(Schema::new(vec![struct_b]));
3844
3845 let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
3847 let field_b_array =
3848 Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
3849
3850 let struct_a_array = StructArray::from(vec![
3851 (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
3852 (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
3853 ]);
3854
3855 let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3856 .len(5)
3857 .add_buffer(Buffer::from_iter(vec![
3858 0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
3859 ]))
3860 .null_bit_buffer(Some(Buffer::from_iter(vec![
3861 true, false, true, false, true,
3862 ])))
3863 .child_data(vec![struct_a_array.into_data()])
3864 .build()
3865 .unwrap();
3866
3867 let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3868 let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
3869
3870 let batch1 =
3871 RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3872 .unwrap();
3873
3874 let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
3875 let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
3876
3877 let struct_a_array = StructArray::from(vec![
3878 (field_a, Arc::new(field_a_array) as ArrayRef),
3879 (field_b, Arc::new(field_b_array) as ArrayRef),
3880 ]);
3881
3882 let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3883 .len(2)
3884 .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
3885 .child_data(vec![struct_a_array.into_data()])
3886 .build()
3887 .unwrap();
3888
3889 let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3890 let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
3891
3892 let batch2 =
3893 RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3894 .unwrap();
3895
3896 let batches = &[batch1, batch2];
3897
3898 let expected = r#"
3901 +-------------------------------------------------------------------------------------------------------+
3902 | struct_b |
3903 +-------------------------------------------------------------------------------------------------------+
3904 | {list: [{leaf_a: 1, leaf_b: 1}]} |
3905 | {list: } |
3906 | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]} |
3907 | {list: } |
3908 | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]} |
3909 | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
3910 | {list: [{leaf_a: 10, leaf_b: }]} |
3911 +-------------------------------------------------------------------------------------------------------+
3912 "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
3913
3914 let actual = pretty_format_batches(batches).unwrap().to_string();
3915 assert_eq!(actual, expected);
3916
3917 let file = tempfile::tempfile().unwrap();
3919 let props = WriterProperties::builder()
3920 .set_max_row_group_size(6)
3921 .build();
3922
3923 let mut writer =
3924 ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
3925
3926 for batch in batches {
3927 writer.write(batch).unwrap();
3928 }
3929 writer.close().unwrap();
3930
3931 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3936 assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);
3937
3938 let batches = builder
3939 .with_batch_size(2)
3940 .build()
3941 .unwrap()
3942 .collect::<ArrowResult<Vec<_>>>()
3943 .unwrap();
3944
3945 assert_eq!(batches.len(), 4);
3946 let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3947 assert_eq!(&batch_counts, &[2, 2, 2, 1]);
3948
3949 let actual = pretty_format_batches(&batches).unwrap().to_string();
3950 assert_eq!(actual, expected);
3951 }
3952
3953 #[test]
3954 fn test_arrow_writer_metadata() {
3955 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3956 let file_schema = batch_schema.clone().with_metadata(
3957 vec![("foo".to_string(), "bar".to_string())]
3958 .into_iter()
3959 .collect(),
3960 );
3961
3962 let batch = RecordBatch::try_new(
3963 Arc::new(batch_schema),
3964 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3965 )
3966 .unwrap();
3967
3968 let mut buf = Vec::with_capacity(1024);
3969 let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
3970 writer.write(&batch).unwrap();
3971 writer.close().unwrap();
3972 }
3973
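    // A required batch column can be written under a nullable file schema; reading back
    // yields the file schema rather than the batch schema.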
3974 #[test]
3975 fn test_arrow_writer_nullable() {
3976 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3977 let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
3978 let file_schema = Arc::new(file_schema);
3979
3980 let batch = RecordBatch::try_new(
3981 Arc::new(batch_schema),
3982 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3983 )
3984 .unwrap();
3985
3986 let mut buf = Vec::with_capacity(1024);
3987 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3988 writer.write(&batch).unwrap();
3989 writer.close().unwrap();
3990
3991 let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
3992 let back = read.next().unwrap().unwrap();
3993 assert_eq!(back.schema(), file_schema);
3994 assert_ne!(back.schema(), batch.schema());
3995 assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
3996 }
3997
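    // in_progress_size, in_progress_rows, and memory_size grow as batches are buffered
    // and reset to zero once the row group is flushed.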
3998 #[test]
3999 fn in_progress_accounting() {
4000 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
4002
4003 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
4005
4006 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
4008
4009 let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
4010
4011 assert_eq!(writer.in_progress_size(), 0);
4013 assert_eq!(writer.in_progress_rows(), 0);
4014 assert_eq!(writer.memory_size(), 0);
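        // Only the file header (the 4-byte "PAR1" magic) has been written at this point.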
        assert_eq!(writer.bytes_written(), 4);

        writer.write(&batch).unwrap();
4017
4018 let initial_size = writer.in_progress_size();
4020 assert!(initial_size > 0);
4021 assert_eq!(writer.in_progress_rows(), 5);
4022 let initial_memory = writer.memory_size();
4023 assert!(initial_memory > 0);
4024 assert!(
4026 initial_size <= initial_memory,
4027 "{initial_size} <= {initial_memory}"
4028 );
4029
4030 writer.write(&batch).unwrap();
4032 assert!(writer.in_progress_size() > initial_size);
4033 assert_eq!(writer.in_progress_rows(), 10);
4034 assert!(writer.memory_size() > initial_memory);
4035 assert!(
4036 writer.in_progress_size() <= writer.memory_size(),
4037 "in_progress_size {} <= memory_size {}",
4038 writer.in_progress_size(),
4039 writer.memory_size()
4040 );
4041
4042 let pre_flush_bytes_written = writer.bytes_written();
4044 writer.flush().unwrap();
4045 assert_eq!(writer.in_progress_size(), 0);
4046 assert_eq!(writer.memory_size(), 0);
4047 assert!(writer.bytes_written() > pre_flush_bytes_written);
4048
4049 writer.close().unwrap();
4050 }
4051
4052 #[test]
4053 fn test_writer_all_null() {
4054 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
4055 let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
4056 let batch = RecordBatch::try_from_iter(vec![
4057 ("a", Arc::new(a) as ArrayRef),
4058 ("b", Arc::new(b) as ArrayRef),
4059 ])
4060 .unwrap();
4061
4062 let mut buf = Vec::with_capacity(1024);
4063 let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
4064 writer.write(&batch).unwrap();
4065 writer.close().unwrap();
4066
4067 let bytes = Bytes::from(buf);
4068 let options = ReadOptionsBuilder::new().with_page_index().build();
4069 let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
4070 let index = reader.metadata().offset_index().unwrap();
4071
4072 assert_eq!(index.len(), 1);
        assert_eq!(index[0].len(), 2);
        assert_eq!(index[0][0].page_locations().len(), 1);
        assert_eq!(index[0][1].page_locations().len(), 1);
    }
4077
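    // Statistics disabled globally but set to Page level for column "a": only "a" gets
    // chunk statistics and a column index, while offset indexes exist for both columns.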
    #[test]
    fn test_disabled_statistics_with_page() {
        let file_schema = Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
        ]);
        let file_schema = Arc::new(file_schema);

        let batch = RecordBatch::try_new(
            file_schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
            ],
        )
        .unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
            .build();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
        writer.write(&batch).unwrap();

        let metadata = writer.close().unwrap();
        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        assert_eq!(row_group.num_columns(), 2);
        // column "a" has both an offset index and a column index, as requested
        assert!(row_group.column(0).offset_index_offset().is_some());
        assert!(row_group.column(0).column_index_offset().is_some());
        // column "b" should only have an offset index
        assert!(row_group.column(1).offset_index_offset().is_some());
        assert!(row_group.column(1).column_index_offset().is_none());

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();

        let row_group = reader.get_row_group(0).unwrap();
        let a_col = row_group.metadata().column(0);
        let b_col = row_group.metadata().column(1);

        // the column chunk for column "a" should have chunk-level statistics
        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
            let min = byte_array_stats.min_opt().unwrap();
            let max = byte_array_stats.max_opt().unwrap();

            assert_eq!(min.as_bytes(), b"a");
            assert_eq!(max.as_bytes(), b"d");
        } else {
            panic!("expecting Statistics::ByteArray");
        }

        // column "b" should not have any statistics
        assert!(b_col.statistics().is_none());

        // both columns have an offset index
        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1); // 1 row group
        assert_eq!(offset_index[0].len(), 2); // 2 columns

        // only column "a" has a column index
        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1); // 1 row group
        assert_eq!(column_index[0].len(), 2); // 2 columns
        let a_idx = &column_index[0][0];
        assert!(
            matches!(a_idx, ColumnIndexMetaData::BYTE_ARRAY(_)),
            "{a_idx:?}"
        );
        let b_idx = &column_index[0][1];
        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
    }

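    // Same setup as above, but column "a" is re-enabled at `Chunk` level only: chunk
    // statistics are written while no column index is produced for either column.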
    #[test]
    fn test_disabled_statistics_with_chunk() {
        let file_schema = Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
        ]);
        let file_schema = Arc::new(file_schema);

        let batch = RecordBatch::try_new(
            file_schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
            ],
        )
        .unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
            .build();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
        writer.write(&batch).unwrap();

        let metadata = writer.close().unwrap();
        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        assert_eq!(row_group.num_columns(), 2);
        // chunk-level statistics do not produce a column index for column "a"
        assert!(row_group.column(0).offset_index_offset().is_some());
        assert!(row_group.column(0).column_index_offset().is_none());
        // column "b" should only have an offset index
        assert!(row_group.column(1).offset_index_offset().is_some());
        assert!(row_group.column(1).column_index_offset().is_none());

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();

        let row_group = reader.get_row_group(0).unwrap();
        let a_col = row_group.metadata().column(0);
        let b_col = row_group.metadata().column(1);

        // the column chunk for column "a" should have chunk-level statistics
        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
            let min = byte_array_stats.min_opt().unwrap();
            let max = byte_array_stats.max_opt().unwrap();

            assert_eq!(min.as_bytes(), b"a");
            assert_eq!(max.as_bytes(), b"d");
        } else {
            panic!("expecting Statistics::ByteArray");
        }

        // column "b" should not have any statistics
        assert!(b_col.statistics().is_none());

        // neither column has a column index
        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1); // 1 row group
        assert_eq!(column_index[0].len(), 2); // 2 columns
        let a_idx = &column_index[0][0];
        assert!(matches!(a_idx, ColumnIndexMetaData::NONE), "{a_idx:?}");
        let b_idx = &column_index[0][1];
        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
    }

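    // With `with_skip_arrow_metadata(true)` the serialized Arrow schema should not be
    // embedded in the file's key-value metadata.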
    #[test]
    fn test_arrow_writer_skip_metadata() {
        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
        let file_schema = Arc::new(batch_schema.clone());

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();
        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);

        let mut buf = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let bytes = Bytes::from(buf);
        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
        assert_eq!(file_schema, *reader_builder.schema());
        if let Some(key_value_metadata) = reader_builder
            .metadata()
            .file_metadata()
            .key_value_metadata()
        {
            assert!(
                !key_value_metadata
                    .iter()
                    .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY)
            );
        }
    }

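    // Writing a batch whose schema does not match the file schema should fail with a
    // descriptive error.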
    #[test]
    fn mismatched_schemas() {
        let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
        let file_schema = Arc::new(Schema::new(vec![Field::new(
            "temperature",
            DataType::Float64,
            false,
        )]));

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();

        let err = writer.write(&batch).unwrap_err().to_string();
        assert_eq!(
            err,
            "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
        );
    }

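    // A record batch with no columns and zero rows should round-trip through Parquet.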
    #[test]
    fn test_roundtrip_empty_schema() {
        // create an empty record batch with an empty schema
        let empty_batch = RecordBatch::try_new_with_options(
            Arc::new(Schema::empty()),
            vec![],
            &RecordBatchOptions::default().with_row_count(Some(0)),
        )
        .unwrap();

        // write to parquet
        let mut parquet_bytes: Vec<u8> = Vec::new();
        let mut writer =
            ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
        writer.write(&empty_batch).unwrap();
        writer.close().unwrap();

        // read it back and verify the schema round-trips with no batches
        let bytes = Bytes::from(parquet_bytes);
        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
        assert_eq!(reader.schema(), &empty_batch.schema());
        let batches: Vec<_> = reader
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();
        assert_eq!(batches.len(), 0);
    }

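    // Even with `EnabledStatistics::Page`, statistics are not embedded in the data page
    // headers unless `set_write_page_header_statistics(true)` is also set.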
    #[test]
    fn test_page_stats_not_written_by_default() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let schema = Schema::new(vec![string_field]);
        let raw_string_values = vec!["Blart Versenwald III"];
        let string_values = StringArray::from(raw_string_values.clone());
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let file = roundtrip_opts(&batch, props);

        // decode the first page header directly; it starts immediately after the
        // 4-byte "PAR1" magic since dictionaries and compression are disabled
        let first_page = &file[4..];
        let mut prot = ThriftSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;

        assert!(stats.is_none());
    }

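    // With `set_write_page_header_statistics(true)` the exact min/max values appear in
    // the data page header.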
    #[test]
    fn test_page_stats_when_enabled() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let schema = Schema::new(vec![string_field]);
        let raw_string_values = vec!["Blart Versenwald III", "Andrew Lamb"];
        let string_values = StringArray::from(raw_string_values.clone());
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_write_page_header_statistics(true)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let file = roundtrip_opts(&batch, props);

        // decode the first page header directly; it starts immediately after the
        // 4-byte "PAR1" magic since dictionaries and compression are disabled
        let first_page = &file[4..];
        let mut prot = ThriftSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;

        let stats = stats.unwrap();
        // min/max are written exactly, without truncation
        assert!(stats.is_max_value_exact.unwrap());
        assert!(stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
    }

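    // `set_statistics_truncate_length` also applies to page header statistics: the min is
    // truncated and the max is truncated then incremented so it remains an upper bound.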
    #[test]
    fn test_page_stats_truncation() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let binary_field = Field::new("b", DataType::Binary, false);
        let schema = Schema::new(vec![string_field, binary_field]);

        let raw_string_values = vec!["Blart Versenwald III"];
        let raw_binary_values = [b"Blart Versenwald III".to_vec()];
        let raw_binary_value_refs = raw_binary_values
            .iter()
            .map(|x| x.as_slice())
            .collect::<Vec<_>>();

        let string_values = StringArray::from(raw_string_values.clone());
        let binary_values = BinaryArray::from(raw_binary_value_refs);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(string_values), Arc::new(binary_values)],
        )
        .unwrap();

        let props = WriterProperties::builder()
            .set_statistics_truncate_length(Some(2))
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_write_page_header_statistics(true)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let file = roundtrip_opts(&batch, props);

        // decode the first page header directly; it starts immediately after the
        // 4-byte "PAR1" magic since dictionaries and compression are disabled
        let first_page = &file[4..];
        let mut prot = ThriftSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;
        assert!(stats.is_some());
        let stats = stats.unwrap();
        // min/max are truncated to 2 bytes; the max is incremented ("Bl" -> "Bm")
        assert!(!stats.is_max_value_exact.unwrap());
        assert!(!stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());

        // the page for the binary column follows immediately after the first page's data
        let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..];
        let mut prot = ThriftSliceInputProtocol::new(second_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;
        assert!(stats.is_some());
        let stats = stats.unwrap();
        // same truncation behaviour for the binary column
        assert!(!stats.is_max_value_exact.unwrap());
        assert!(!stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
    }

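    // Page encoding statistics written by the ArrowWriter should survive a round-trip
    // through the serialized file reader.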
    #[test]
    fn test_page_encoding_statistics_roundtrip() {
        let batch_schema = Schema::new(vec![Field::new(
            "int32",
            arrow_schema::DataType::Int32,
            false,
        )]);

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut file: File = tempfile::tempfile().unwrap();
        let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
        writer.write(&batch).unwrap();
        let file_metadata = writer.close().unwrap();

        assert_eq!(file_metadata.num_row_groups(), 1);
        assert_eq!(file_metadata.row_group(0).num_columns(), 1);
        assert!(
            file_metadata
                .row_group(0)
                .column(0)
                .page_encoding_stats()
                .is_some()
        );
        let chunk_page_stats = file_metadata
            .row_group(0)
            .column(0)
            .page_encoding_stats()
            .unwrap();

        // read the file back and check the decoded page encoding stats match
        let options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_encoding_stats_as_mask(false)
            .build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rowgroup = reader.get_row_group(0).expect("row group missing");
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.page_encoding_stats().is_some());
        let file_page_stats = column.page_encoding_stats().unwrap();
        assert_eq!(chunk_page_stats, file_page_stats);
    }

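    // The dictionary page size limit can be overridden per column; each column's
    // dictionary page should be capped at its own configured limit.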
    #[test]
    fn test_different_dict_page_size_limit() {
        let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
        let schema = Arc::new(Schema::new(vec![
            Field::new("col0", arrow_schema::DataType::Int64, false),
            Field::new("col1", arrow_schema::DataType::Int64, false),
        ]));
        let batch =
            arrow_array::RecordBatch::try_new(schema.clone(), vec![array.clone(), array]).unwrap();

        let props = WriterProperties::builder()
            .set_dictionary_page_size_limit(1024 * 1024)
            .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
            .build();
        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
        writer.write(&batch).unwrap();
        let data = Bytes::from(writer.into_inner().unwrap());

        let mut metadata = ParquetMetaDataReader::new();
        metadata.try_parse(&data).unwrap();
        let metadata = metadata.finish().unwrap();
        let col0_meta = metadata.row_group(0).column(0);
        let col1_meta = metadata.row_group(0).column(1);

        let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
            let mut reader =
                SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
            let page = reader.get_next_page().unwrap().unwrap();
            match page {
                Page::DictionaryPage { buf, .. } => buf.len(),
                _ => panic!("expected DictionaryPage"),
            }
        };

        assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
        assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
    }
}