use bytes::Bytes;
use std::io::{Read, Write};
use std::iter::Peekable;
use std::slice::Iter;
use std::sync::{Arc, Mutex};
use std::vec::IntoIter;
use thrift::protocol::TCompactOutputProtocol;

use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};

use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};

use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
use crate::arrow::ArrowSchemaConverter;
use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
use crate::column::page_encryption::PageEncryptor;
use crate::column::writer::encoder::ColumnValueEncoder;
use crate::column::writer::{
    get_column_writer, ColumnCloseResult, ColumnWriter, GenericColumnWriter,
};
use crate::data_type::{ByteArray, FixedLenByteArray};
#[cfg(feature = "encryption")]
use crate::encryption::encrypt::FileEncryptor;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{KeyValue, RowGroupMetaData};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
use crate::thrift::TSerializable;
use levels::{calculate_array_levels, ArrayLevels};

mod byte_array;
mod levels;

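/// Writes [`RecordBatch`]es to a parquet file.
///
/// Rows are buffered in memory and flushed as a new row group once
/// `max_row_group_size` rows have been accumulated (or [`ArrowWriter::flush`] is
/// called explicitly).
///
/// A minimal usage sketch (illustrative only, not a doctest; error handling elided):
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow_array::{ArrayRef, Int64Array, RecordBatch};
///
/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
/// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
///
/// let mut buffer = Vec::new();
/// let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
/// writer.write(&batch).unwrap();
/// writer.close().unwrap();
/// ```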
pub struct ArrowWriter<W: Write> {
    writer: SerializedFileWriter<W>,

    in_progress: Option<ArrowRowGroupWriter>,

    arrow_schema: SchemaRef,

    row_group_writer_factory: ArrowRowGroupWriterFactory,

    max_row_group_size: usize,
}

impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let buffered_memory = self.in_progress_size();
        f.debug_struct("ArrowWriter")
            .field("writer", &self.writer)
            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
            .field("in_progress_rows", &self.in_progress_rows())
            .field("arrow_schema", &self.arrow_schema)
            .field("max_row_group_size", &self.max_row_group_size)
            .finish()
    }
}

impl<W: Write + Send> ArrowWriter<W> {
    pub fn try_new(
        writer: W,
        arrow_schema: SchemaRef,
        props: Option<WriterProperties>,
    ) -> Result<Self> {
        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
        Self::try_new_with_options(writer, arrow_schema, options)
    }

    pub fn try_new_with_options(
        writer: W,
        arrow_schema: SchemaRef,
        options: ArrowWriterOptions,
    ) -> Result<Self> {
        let mut props = options.properties;
        let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
        if let Some(schema_root) = &options.schema_root {
            converter = converter.schema_root(schema_root);
        }
        let schema = converter.convert(&arrow_schema)?;
        if !options.skip_arrow_metadata {
            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
        }

        let max_row_group_size = props.max_row_group_size();

        let file_writer =
            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::new(props))?;

        let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&file_writer);

        Ok(Self {
            writer: file_writer,
            in_progress: None,
            arrow_schema,
            row_group_writer_factory,
            max_row_group_size,
        })
    }

    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        self.writer.flushed_row_groups()
    }

    pub fn memory_size(&self) -> usize {
        match &self.in_progress {
            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
            None => 0,
        }
    }

    pub fn in_progress_size(&self) -> usize {
        match &self.in_progress {
            Some(in_progress) => in_progress
                .writers
                .iter()
                .map(|x| x.get_estimated_total_bytes())
                .sum(),
            None => 0,
        }
    }

    pub fn in_progress_rows(&self) -> usize {
        self.in_progress
            .as_ref()
            .map(|x| x.buffered_rows)
            .unwrap_or_default()
    }

    pub fn bytes_written(&self) -> usize {
        self.writer.bytes_written()
    }

    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        let in_progress = match &mut self.in_progress {
            Some(in_progress) => in_progress,
            x => x.insert(self.row_group_writer_factory.create_row_group_writer(
                self.writer.schema_descr(),
                self.writer.properties(),
                &self.arrow_schema,
                self.writer.flushed_row_groups().len(),
            )?),
        };

        if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size {
            let to_write = self.max_row_group_size - in_progress.buffered_rows;
            let a = batch.slice(0, to_write);
            let b = batch.slice(to_write, batch.num_rows() - to_write);
            self.write(&a)?;
            return self.write(&b);
        }

        in_progress.write(batch)?;

        if in_progress.buffered_rows >= self.max_row_group_size {
            self.flush()?
        }
        Ok(())
    }

    pub fn flush(&mut self) -> Result<()> {
        let in_progress = match self.in_progress.take() {
            Some(in_progress) => in_progress,
            None => return Ok(()),
        };

        let mut row_group_writer = self.writer.next_row_group()?;
        for chunk in in_progress.close()? {
            chunk.append_to_row_group(&mut row_group_writer)?;
        }
        row_group_writer.close()?;
        Ok(())
    }

    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
        self.writer.append_key_value_metadata(kv_metadata)
    }

    pub fn inner(&self) -> &W {
        self.writer.inner()
    }

    pub fn inner_mut(&mut self) -> &mut W {
        self.writer.inner_mut()
    }

    pub fn into_inner(mut self) -> Result<W> {
        self.flush()?;
        self.writer.into_inner()
    }

    pub fn finish(&mut self) -> Result<crate::format::FileMetaData> {
        self.flush()?;
        self.writer.finish()
    }

    pub fn close(mut self) -> Result<crate::format::FileMetaData> {
        self.finish()
    }
}

impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch).map_err(|e| e.into())
    }

    fn close(self) -> std::result::Result<(), ArrowError> {
        self.close()?;
        Ok(())
    }
}

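/// Additional options used to construct an [`ArrowWriter`] via
/// [`ArrowWriter::try_new_with_options`].
///
/// A small builder sketch (the values shown are arbitrary examples):
///
/// ```ignore
/// let options = ArrowWriterOptions::new()
///     .with_properties(WriterProperties::default())
///     .with_skip_arrow_metadata(true)
///     .with_schema_root("schema".to_string());
/// ```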
#[derive(Debug, Clone, Default)]
pub struct ArrowWriterOptions {
    properties: WriterProperties,
    skip_arrow_metadata: bool,
    schema_root: Option<String>,
}

impl ArrowWriterOptions {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_properties(self, properties: WriterProperties) -> Self {
        Self { properties, ..self }
    }

    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

    pub fn with_schema_root(self, schema_root: String) -> Self {
        Self {
            schema_root: Some(schema_root),
            ..self
        }
    }
}

#[derive(Default)]
struct ArrowColumnChunkData {
    length: usize,
    data: Vec<Bytes>,
}

impl Length for ArrowColumnChunkData {
    fn len(&self) -> u64 {
        self.length as _
    }
}

impl ChunkReader for ArrowColumnChunkData {
    type T = ArrowColumnChunkReader;

    fn get_read(&self, start: u64) -> Result<Self::T> {
        assert_eq!(start, 0);
        Ok(ArrowColumnChunkReader(
            self.data.clone().into_iter().peekable(),
        ))
    }

    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
        unimplemented!()
    }
}

struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);

impl Read for ArrowColumnChunkReader {
    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
        let buffer = loop {
            match self.0.peek_mut() {
                Some(b) if b.is_empty() => {
                    self.0.next();
                    continue;
                }
                Some(b) => break b,
                None => return Ok(0),
            }
        };

        let len = buffer.len().min(out.len());
        let b = buffer.split_to(len);
        out[..len].copy_from_slice(&b);
        Ok(len)
    }
}

type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;

#[derive(Default)]
struct ArrowPageWriter {
    buffer: SharedColumnChunk,
    #[cfg(feature = "encryption")]
    page_encryptor: Option<PageEncryptor>,
}

impl ArrowPageWriter {
    #[cfg(feature = "encryption")]
    pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
        self.page_encryptor = page_encryptor;
        self
    }

    #[cfg(feature = "encryption")]
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        self.page_encryptor.as_mut()
    }

    #[cfg(not(feature = "encryption"))]
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        None
    }
}

impl PageWriter for ArrowPageWriter {
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
        let page = match self.page_encryptor_mut() {
            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
            None => page,
        };

        let page_header = page.to_thrift_header();
        let header = {
            let mut header = Vec::with_capacity(1024);

            match self.page_encryptor_mut() {
                Some(page_encryptor) => {
                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
                    if page.compressed_page().is_data_page() {
                        page_encryptor.increment_page();
                    }
                }
                None => {
                    let mut protocol = TCompactOutputProtocol::new(&mut header);
                    page_header.write_to_out_protocol(&mut protocol)?;
                }
            };

            Bytes::from(header)
        };

        let mut buf = self.buffer.try_lock().unwrap();

        let data = page.compressed_page().buffer().clone();
        let compressed_size = data.len() + header.len();

        let mut spec = PageWriteSpec::new();
        spec.page_type = page.page_type();
        spec.num_values = page.num_values();
        spec.uncompressed_size = page.uncompressed_size() + header.len();
        spec.offset = buf.length as u64;
        spec.compressed_size = compressed_size;
        spec.bytes_written = compressed_size as u64;

        buf.length += compressed_size;
        buf.data.push(header);
        buf.data.push(data);

        Ok(spec)
    }

    fn close(&mut self) -> Result<()> {
        Ok(())
    }
}

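// The items below form the lower-level, column-oriented write API: `compute_leaves`
// flattens an Arrow array into one `ArrowLeafColumn` per parquet leaf column, each of
// which is encoded by a matching `ArrowColumnWriter` and finally assembled into a row
// group (see the sketch on `get_column_writers` further down).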
#[derive(Debug)]
pub struct ArrowLeafColumn(ArrayLevels);

pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
    let levels = calculate_array_levels(array, field)?;
    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
}

pub struct ArrowColumnChunk {
    data: ArrowColumnChunkData,
    close: ColumnCloseResult,
}

impl std::fmt::Debug for ArrowColumnChunk {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowColumnChunk")
            .field("length", &self.data.length)
            .finish_non_exhaustive()
    }
}

impl ArrowColumnChunk {
    pub fn append_to_row_group<W: Write + Send>(
        self,
        writer: &mut SerializedRowGroupWriter<'_, W>,
    ) -> Result<()> {
        writer.append_column(&self.data, self.close)
    }
}

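/// Encodes [`ArrowLeafColumn`]s for a single parquet leaf column.
///
/// Instances are created by [`get_column_writers`]; once all data has been written,
/// [`ArrowColumnWriter::close`] yields an [`ArrowColumnChunk`] that can be appended to
/// a row group with [`ArrowColumnChunk::append_to_row_group`].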
pub struct ArrowColumnWriter {
    writer: ArrowColumnWriterImpl,
    chunk: SharedColumnChunk,
}

impl std::fmt::Debug for ArrowColumnWriter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
    }
}

enum ArrowColumnWriterImpl {
    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
    Column(ColumnWriter<'static>),
}

impl ArrowColumnWriter {
    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
        match &mut self.writer {
            ArrowColumnWriterImpl::Column(c) => {
                write_leaf(c, &col.0)?;
            }
            ArrowColumnWriterImpl::ByteArray(c) => {
                write_primitive(c, col.0.array().as_ref(), &col.0)?;
            }
        }
        Ok(())
    }

    pub fn close(self) -> Result<ArrowColumnChunk> {
        let close = match self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
            ArrowColumnWriterImpl::Column(c) => c.close()?,
        };
        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
        let data = chunk.into_inner().unwrap();
        Ok(ArrowColumnChunk { data, close })
    }

    pub fn memory_size(&self) -> usize {
        match &self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
        }
    }

    pub fn get_estimated_total_bytes(&self) -> usize {
        match &self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
        }
    }
}

struct ArrowRowGroupWriter {
    writers: Vec<ArrowColumnWriter>,
    schema: SchemaRef,
    buffered_rows: usize,
}

impl ArrowRowGroupWriter {
    fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
        Self {
            writers,
            schema: arrow.clone(),
            buffered_rows: 0,
        }
    }

    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        self.buffered_rows += batch.num_rows();
        let mut writers = self.writers.iter_mut();
        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
            for leaf in compute_leaves(field.as_ref(), column)? {
                writers.next().unwrap().write(&leaf)?
            }
        }
        Ok(())
    }

    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
        self.writers
            .into_iter()
            .map(|writer| writer.close())
            .collect()
    }
}

struct ArrowRowGroupWriterFactory {
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}

impl ArrowRowGroupWriterFactory {
    #[cfg(feature = "encryption")]
    fn new<W: Write + Send>(file_writer: &SerializedFileWriter<W>) -> Self {
        Self {
            file_encryptor: file_writer.file_encryptor(),
        }
    }

    #[cfg(not(feature = "encryption"))]
    fn new<W: Write + Send>(_file_writer: &SerializedFileWriter<W>) -> Self {
        Self {}
    }

    #[cfg(feature = "encryption")]
    fn create_row_group_writer(
        &self,
        parquet: &SchemaDescriptor,
        props: &WriterPropertiesPtr,
        arrow: &SchemaRef,
        row_group_index: usize,
    ) -> Result<ArrowRowGroupWriter> {
        let writers = get_column_writers_with_encryptor(
            parquet,
            props,
            arrow,
            self.file_encryptor.clone(),
            row_group_index,
        )?;
        Ok(ArrowRowGroupWriter::new(writers, arrow))
    }

    #[cfg(not(feature = "encryption"))]
    fn create_row_group_writer(
        &self,
        parquet: &SchemaDescriptor,
        props: &WriterPropertiesPtr,
        arrow: &SchemaRef,
        _row_group_index: usize,
    ) -> Result<ArrowRowGroupWriter> {
        let writers = get_column_writers(parquet, props, arrow)?;
        Ok(ArrowRowGroupWriter::new(writers, arrow))
    }
}

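/// Returns one [`ArrowColumnWriter`] per parquet leaf column in the given schema.
///
/// A sketch of the lower-level write path (illustrative only, not a doctest; a single
/// flat `Int32` column is assumed so writers, fields and columns line up one to one):
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
/// use arrow_schema::{DataType, Field, Schema};
///
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
/// let batch = RecordBatch::try_new(
///     schema.clone(),
///     vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
/// )
/// .unwrap();
///
/// // Convert the Arrow schema to a parquet schema and create one writer per leaf
/// let props = Arc::new(WriterProperties::default());
/// let parquet_schema = ArrowSchemaConverter::new()
///     .with_coerce_types(props.coerce_types())
///     .convert(&schema)
///     .unwrap();
/// let mut writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
///
/// // Encode each leaf column
/// for ((writer, field), column) in writers.iter_mut().zip(schema.fields()).zip(batch.columns()) {
///     for leaf in compute_leaves(field.as_ref(), column).unwrap() {
///         writer.write(&leaf).unwrap();
///     }
/// }
///
/// // Assemble the encoded column chunks into a row group
/// let mut file_writer =
///     SerializedFileWriter::new(Vec::new(), parquet_schema.root_schema_ptr(), props).unwrap();
/// let mut row_group = file_writer.next_row_group().unwrap();
/// for writer in writers {
///     writer.close().unwrap().append_to_row_group(&mut row_group).unwrap();
/// }
/// row_group.close().unwrap();
/// file_writer.close().unwrap();
/// ```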
pub fn get_column_writers(
    parquet: &SchemaDescriptor,
    props: &WriterPropertiesPtr,
    arrow: &SchemaRef,
) -> Result<Vec<ArrowColumnWriter>> {
    let mut writers = Vec::with_capacity(arrow.fields.len());
    let mut leaves = parquet.columns().iter();
    let column_factory = ArrowColumnWriterFactory::new();
    for field in &arrow.fields {
        column_factory.get_arrow_column_writer(
            field.data_type(),
            props,
            &mut leaves,
            &mut writers,
        )?;
    }
    Ok(writers)
}

#[cfg(feature = "encryption")]
fn get_column_writers_with_encryptor(
    parquet: &SchemaDescriptor,
    props: &WriterPropertiesPtr,
    arrow: &SchemaRef,
    file_encryptor: Option<Arc<FileEncryptor>>,
    row_group_index: usize,
) -> Result<Vec<ArrowColumnWriter>> {
    let mut writers = Vec::with_capacity(arrow.fields.len());
    let mut leaves = parquet.columns().iter();
    let column_factory =
        ArrowColumnWriterFactory::new().with_file_encryptor(row_group_index, file_encryptor);
    for field in &arrow.fields {
        column_factory.get_arrow_column_writer(
            field.data_type(),
            props,
            &mut leaves,
            &mut writers,
        )?;
    }
    Ok(writers)
}

struct ArrowColumnWriterFactory {
    #[cfg(feature = "encryption")]
    row_group_index: usize,
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}

impl ArrowColumnWriterFactory {
    pub fn new() -> Self {
        Self {
            #[cfg(feature = "encryption")]
            row_group_index: 0,
            #[cfg(feature = "encryption")]
            file_encryptor: None,
        }
    }

    #[cfg(feature = "encryption")]
    pub fn with_file_encryptor(
        mut self,
        row_group_index: usize,
        file_encryptor: Option<Arc<FileEncryptor>>,
    ) -> Self {
        self.row_group_index = row_group_index;
        self.file_encryptor = file_encryptor;
        self
    }

    #[cfg(feature = "encryption")]
    fn create_page_writer(
        &self,
        column_descriptor: &ColumnDescPtr,
        column_index: usize,
    ) -> Result<Box<ArrowPageWriter>> {
        let column_path = column_descriptor.path().string();
        let page_encryptor = PageEncryptor::create_if_column_encrypted(
            &self.file_encryptor,
            self.row_group_index,
            column_index,
            &column_path,
        )?;
        Ok(Box::new(
            ArrowPageWriter::default().with_encryptor(page_encryptor),
        ))
    }

    #[cfg(not(feature = "encryption"))]
    fn create_page_writer(
        &self,
        _column_descriptor: &ColumnDescPtr,
        _column_index: usize,
    ) -> Result<Box<ArrowPageWriter>> {
        Ok(Box::<ArrowPageWriter>::default())
    }

    fn get_arrow_column_writer(
        &self,
        data_type: &ArrowDataType,
        props: &WriterPropertiesPtr,
        leaves: &mut Iter<'_, ColumnDescPtr>,
        out: &mut Vec<ArrowColumnWriter>,
    ) -> Result<()> {
        let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
            let page_writer = self.create_page_writer(desc, out.len())?;
            let chunk = page_writer.buffer.clone();
            let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
            Ok(ArrowColumnWriter {
                chunk,
                writer: ArrowColumnWriterImpl::Column(writer),
            })
        };

        let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
            let page_writer = self.create_page_writer(desc, out.len())?;
            let chunk = page_writer.buffer.clone();
            let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
            Ok(ArrowColumnWriter {
                chunk,
                writer: ArrowColumnWriterImpl::ByteArray(writer),
            })
        };

        match data_type {
            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
                out.push(col(leaves.next().unwrap())?)
            }
            ArrowDataType::LargeBinary
            | ArrowDataType::Binary
            | ArrowDataType::Utf8
            | ArrowDataType::LargeUtf8
            | ArrowDataType::BinaryView
            | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
            ArrowDataType::List(f)
            | ArrowDataType::LargeList(f)
            | ArrowDataType::FixedSizeList(f, _) => {
                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
            }
            ArrowDataType::Struct(fields) => {
                for field in fields {
                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
                }
            }
            ArrowDataType::Map(f, _) => match f.data_type() {
                ArrowDataType::Struct(f) => {
                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
                }
                _ => unreachable!("invalid map type"),
            },
            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                ArrowDataType::Utf8
                | ArrowDataType::LargeUtf8
                | ArrowDataType::Binary
                | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
                    out.push(bytes(leaves.next().unwrap())?)
                }
                ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
                _ => out.push(col(leaves.next().unwrap())?),
            },
            _ => {
                return Err(ParquetError::NYI(format!(
                    "Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"
                )))
            }
        }
        Ok(())
    }
}

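// Writes a single leaf array to the provided typed column writer, casting the Arrow
// values to the parquet physical type where required (e.g. Date64 and Decimal128 to
// Int32/Int64; intervals, decimals and Float16 to fixed length byte arrays).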
fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
    let column = levels.array().as_ref();
    let indices = levels.non_null_indices();
    match writer {
        ColumnWriter::Int32ColumnWriter(ref mut typed) => {
            match column.data_type() {
                ArrowDataType::Date64 => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Date32)?;
                    let array = arrow_cast::cast(&array, &ArrowDataType::Int32)?;

                    let array = array.as_primitive::<Int32Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt32 => {
                    let values = column.as_primitive::<UInt32Type>().values();
                    let array = values.inner().typed_data::<i32>();
                    write_primitive(typed, array, levels)
                }
                ArrowDataType::Decimal128(_, _) => {
                    let array = column
                        .as_primitive::<Decimal128Type>()
                        .unary::<_, Int32Type>(|v| v as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal256(_, _) => {
                    let array = column
                        .as_primitive::<Decimal256Type>()
                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                    ArrowDataType::Decimal128(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal128Type>()
                            .unary::<_, Int32Type>(|v| v as i32);
                        write_primitive(typed, array.values(), levels)
                    }
                    ArrowDataType::Decimal256(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal256Type>()
                            .unary::<_, Int32Type>(|v| v.as_i128() as i32);
                        write_primitive(typed, array.values(), levels)
                    }
                    _ => {
                        let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
                        let array = array.as_primitive::<Int32Type>();
                        write_primitive(typed, array.values(), levels)
                    }
                },
                _ => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
                    let array = array.as_primitive::<Int32Type>();
                    write_primitive(typed, array.values(), levels)
                }
            }
        }
        ColumnWriter::BoolColumnWriter(ref mut typed) => {
            let array = column.as_boolean();
            typed.write_batch(
                get_bool_array_slice(array, indices).as_slice(),
                levels.def_levels(),
                levels.rep_levels(),
            )
        }
        ColumnWriter::Int64ColumnWriter(ref mut typed) => {
            match column.data_type() {
                ArrowDataType::Date64 => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;

                    let array = array.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Int64 => {
                    let array = column.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt64 => {
                    let values = column.as_primitive::<UInt64Type>().values();
                    let array = values.inner().typed_data::<i64>();
                    write_primitive(typed, array, levels)
                }
                ArrowDataType::Decimal128(_, _) => {
                    let array = column
                        .as_primitive::<Decimal128Type>()
                        .unary::<_, Int64Type>(|v| v as i64);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal256(_, _) => {
                    let array = column
                        .as_primitive::<Decimal256Type>()
                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                    ArrowDataType::Decimal128(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal128Type>()
                            .unary::<_, Int64Type>(|v| v as i64);
                        write_primitive(typed, array.values(), levels)
                    }
                    ArrowDataType::Decimal256(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal256Type>()
                            .unary::<_, Int64Type>(|v| v.as_i128() as i64);
                        write_primitive(typed, array.values(), levels)
                    }
                    _ => {
                        let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
                        let array = array.as_primitive::<Int64Type>();
                        write_primitive(typed, array.values(), levels)
                    }
                },
                _ => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
                    let array = array.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
            }
        }
        ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
            unreachable!("Currently unreachable because data type not supported")
        }
        ColumnWriter::FloatColumnWriter(ref mut typed) => {
            let array = column.as_primitive::<Float32Type>();
            write_primitive(typed, array.values(), levels)
        }
        ColumnWriter::DoubleColumnWriter(ref mut typed) => {
            let array = column.as_primitive::<Float64Type>();
            write_primitive(typed, array.values(), levels)
        }
        ColumnWriter::ByteArrayColumnWriter(_) => {
            unreachable!("should use ByteArrayWriter")
        }
        ColumnWriter::FixedLenByteArrayColumnWriter(ref mut typed) => {
            let bytes = match column.data_type() {
                ArrowDataType::Interval(interval_unit) => match interval_unit {
                    IntervalUnit::YearMonth => {
                        let array = column
                            .as_any()
                            .downcast_ref::<arrow_array::IntervalYearMonthArray>()
                            .unwrap();
                        get_interval_ym_array_slice(array, indices)
                    }
                    IntervalUnit::DayTime => {
                        let array = column
                            .as_any()
                            .downcast_ref::<arrow_array::IntervalDayTimeArray>()
                            .unwrap();
                        get_interval_dt_array_slice(array, indices)
                    }
                    _ => {
                        return Err(ParquetError::NYI(format!(
                            "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
                        )));
                    }
                },
                ArrowDataType::FixedSizeBinary(_) => {
                    let array = column
                        .as_any()
                        .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
                        .unwrap();
                    get_fsb_array_slice(array, indices)
                }
                ArrowDataType::Decimal128(_, _) => {
                    let array = column.as_primitive::<Decimal128Type>();
                    get_decimal_128_array_slice(array, indices)
                }
                ArrowDataType::Decimal256(_, _) => {
                    let array = column
                        .as_any()
                        .downcast_ref::<arrow_array::Decimal256Array>()
                        .unwrap();
                    get_decimal_256_array_slice(array, indices)
                }
                ArrowDataType::Float16 => {
                    let array = column.as_primitive::<Float16Type>();
                    get_float_16_array_slice(array, indices)
                }
                _ => {
                    return Err(ParquetError::NYI(
                        "Attempting to write an Arrow type that is not yet implemented".to_string(),
                    ));
                }
            };
            typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
        }
    }
}

fn write_primitive<E: ColumnValueEncoder>(
    writer: &mut GenericColumnWriter<E>,
    values: &E::Values,
    levels: &ArrayLevels,
) -> Result<usize> {
    writer.write_batch_internal(
        values,
        Some(levels.non_null_indices()),
        levels.def_levels(),
        levels.rep_levels(),
        None,
        None,
        None,
    )
}

fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        values.push(array.value(*i))
    }
    values
}

fn get_interval_ym_array_slice(
    array: &arrow_array::IntervalYearMonthArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let mut value = array.value(*i).to_le_bytes().to_vec();
        let mut suffix = vec![0; 8];
        value.append(&mut suffix);
        values.push(FixedLenByteArray::from(ByteArray::from(value)))
    }
    values
}

fn get_interval_dt_array_slice(
    array: &arrow_array::IntervalDayTimeArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let mut out = [0; 12];
        let value = array.value(*i);
        out[4..8].copy_from_slice(&value.days.to_le_bytes());
        out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
        values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
    }
    values
}

fn get_decimal_128_array_slice(
    array: &arrow_array::Decimal128Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    let size = decimal_length_from_precision(array.precision());
    for i in indices {
        let as_be_bytes = array.value(*i).to_be_bytes();
        let resized_value = as_be_bytes[(16 - size)..].to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
    }
    values
}

fn get_decimal_256_array_slice(
    array: &arrow_array::Decimal256Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    let size = decimal_length_from_precision(array.precision());
    for i in indices {
        let as_be_bytes = array.value(*i).to_be_bytes();
        let resized_value = as_be_bytes[(32 - size)..].to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
    }
    values
}

fn get_float_16_array_slice(
    array: &arrow_array::Float16Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let value = array.value(*i).to_le_bytes().to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(value)));
    }
    values
}

fn get_fsb_array_slice(
    array: &arrow_array::FixedSizeBinaryArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let value = array.value(*i).to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(value)))
    }
    values
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::fs::File;
    use std::io::Seek;

    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
    use crate::arrow::ARROW_SCHEMA_META_KEY;
    use crate::format::PageHeader;
    use crate::thrift::TCompactSliceInputProtocol;
    use arrow::datatypes::ToByteSlice;
    use arrow::datatypes::{DataType, Schema};
    use arrow::error::Result as ArrowResult;
    use arrow::util::data_gen::create_random_array;
    use arrow::util::pretty::pretty_format_batches;
    use arrow::{array::*, buffer::Buffer};
    use arrow_buffer::{i256, IntervalDayTime, IntervalMonthDayNano, NullBuffer};
    use arrow_schema::Fields;
    use half::f16;
    use num::{FromPrimitive, ToPrimitive};

    use crate::basic::Encoding;
    use crate::data_type::AsBytes;
    use crate::file::metadata::ParquetMetaData;
    use crate::file::page_index::index::Index;
    use crate::file::page_index::index_reader::read_offset_indexes;
    use crate::file::properties::{
        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
    };
    use crate::file::serialized_reader::ReadOptionsBuilder;
    use crate::file::{
        reader::{FileReader, SerializedFileReader},
        statistics::Statistics,
    };

    #[test]
    fn arrow_writer() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, true),
        ]);

        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
        let mut buffer = vec![];

        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
        writer.write(expected_batch).unwrap();
        writer.close().unwrap();

        buffer
    }

    fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
        let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
        writer.write(expected_batch).unwrap();
        writer.into_inner().unwrap()
    }

    #[test]
    fn roundtrip_bytes() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, true),
        ]));

        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);

        let expected_batch =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();

        for buffer in [
            get_bytes_after_close(schema.clone(), &expected_batch),
            get_bytes_by_into_inner(schema, &expected_batch),
        ] {
            let cursor = Bytes::from(buffer);
            let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();

            let actual_batch = record_batch_reader
                .next()
                .expect("No batch found")
                .expect("Unable to get batch");

            assert_eq!(expected_batch.schema(), actual_batch.schema());
            assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
            assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
            for i in 0..expected_batch.num_columns() {
                let expected_data = expected_batch.column(i).to_data();
                let actual_data = actual_batch.column(i).to_data();

                assert_eq!(expected_data, actual_data);
            }
        }
    }

    #[test]
    fn arrow_writer_non_null() {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn arrow_writer_list() {
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
            true,
        )]);

        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());

        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
            DataType::Int32,
            false,
        ))))
        .len(5)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        assert_eq!(batch.column(0).null_count(), 1);

        roundtrip(batch, None);
    }

    #[test]
    fn arrow_writer_list_non_null() {
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
            false,
        )]);

        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());

        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
            DataType::Int32,
            false,
        ))))
        .len(5)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        assert_eq!(batch.column(0).null_count(), 0);

        roundtrip(batch, None);
    }

    #[test]
    fn arrow_writer_binary() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let binary_field = Field::new("b", DataType::Binary, false);
        let schema = Schema::new(vec![string_field, binary_field]);

        let raw_string_values = vec!["foo", "bar", "baz", "quux"];
        let raw_binary_values = [
            b"foo".to_vec(),
            b"bar".to_vec(),
            b"baz".to_vec(),
            b"quux".to_vec(),
        ];
        let raw_binary_value_refs = raw_binary_values
            .iter()
            .map(|x| x.as_slice())
            .collect::<Vec<_>>();

        let string_values = StringArray::from(raw_string_values.clone());
        let binary_values = BinaryArray::from(raw_binary_value_refs);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(string_values), Arc::new(binary_values)],
        )
        .unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn arrow_writer_binary_view() {
        let string_field = Field::new("a", DataType::Utf8View, false);
        let binary_field = Field::new("b", DataType::BinaryView, false);
        let nullable_string_field = Field::new("a", DataType::Utf8View, true);
        let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);

        let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
        let raw_binary_values = vec![
            b"foo".to_vec(),
            b"bar".to_vec(),
            b"large payload over 12 bytes".to_vec(),
            b"lulu".to_vec(),
        ];
        let nullable_string_values =
            vec![Some("foo"), None, Some("large payload over 12 bytes"), None];

        let string_view_values = StringViewArray::from(raw_string_values);
        let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
        let nullable_string_view_values = StringViewArray::from(nullable_string_values);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![
                Arc::new(string_view_values),
                Arc::new(binary_view_values),
                Arc::new(nullable_string_view_values),
            ],
        )
        .unwrap();

        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
        roundtrip(batch, None);
    }

    fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
        let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
        let schema = Schema::new(vec![decimal_field]);

        let decimal_values = vec![10_000, 50_000, 0, -100]
            .into_iter()
            .map(Some)
            .collect::<Decimal128Array>()
            .with_precision_and_scale(precision, scale)
            .unwrap();

        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
    }

    #[test]
    fn arrow_writer_decimal() {
        let batch_int32_decimal = get_decimal_batch(5, 2);
        roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
        let batch_int64_decimal = get_decimal_batch(12, 2);
        roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
        let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
        roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn arrow_writer_complex() {
        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
        let struct_field_g = Arc::new(Field::new_list(
            "g",
            Field::new_list_field(DataType::Int16, true),
            false,
        ));
        let struct_field_h = Arc::new(Field::new_list(
            "h",
            Field::new_list_field(DataType::Int16, false),
            true,
        ));
        let struct_field_e = Arc::new(Field::new_struct(
            "e",
            vec![
                struct_field_f.clone(),
                struct_field_g.clone(),
                struct_field_h.clone(),
            ],
            false,
        ));
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, true),
            Field::new_struct(
                "c",
                vec![struct_field_d.clone(), struct_field_e.clone()],
                false,
            ),
        ]);

        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);

        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());

        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
            .len(5)
            .add_buffer(g_value_offsets.clone())
            .add_child_data(g_value.to_data())
            .build()
            .unwrap();
        let g = ListArray::from(g_list_data);
        let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
            .len(5)
            .add_buffer(g_value_offsets)
            .add_child_data(g_value.to_data())
            .null_bit_buffer(Some(Buffer::from([0b00011011])))
            .build()
            .unwrap();
        let h = ListArray::from(h_list_data);

        let e = StructArray::from(vec![
            (struct_field_f, Arc::new(f) as ArrayRef),
            (struct_field_g, Arc::new(g) as ArrayRef),
            (struct_field_h, Arc::new(h) as ArrayRef),
        ]);

        let c = StructArray::from(vec![
            (struct_field_d, Arc::new(d) as ArrayRef),
            (struct_field_e, Arc::new(e) as ArrayRef),
        ]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
        )
        .unwrap();

        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
        roundtrip(batch, Some(SMALL_SIZE / 3));
    }

    #[test]
    fn arrow_writer_complex_mixed() {
        let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
        let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
        let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
        let schema = Schema::new(vec![Field::new(
            "some_nested_object",
            DataType::Struct(Fields::from(vec![
                offset_field.clone(),
                partition_field.clone(),
                topic_field.clone(),
            ])),
            false,
        )]);

        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
        let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);

        let some_nested_object = StructArray::from(vec![
            (offset_field, Arc::new(offset) as ArrayRef),
            (partition_field, Arc::new(partition) as ArrayRef),
            (topic_field, Arc::new(topic) as ArrayRef),
        ]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn arrow_writer_map() {
        let json_content = r#"
        {"stocks":{"long": "$AAA", "short": "$BBB"}}
        {"stocks":{"long": null, "long": "$CCC", "short": null}}
        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
        "#;
        let entries_struct_type = DataType::Struct(Fields::from(vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, true),
        ]));
        let stocks_field = Field::new(
            "stocks",
            DataType::Map(
                Arc::new(Field::new("entries", entries_struct_type, false)),
                false,
            ),
            true,
        );
        let schema = Arc::new(Schema::new(vec![stocks_field]));
        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();

        let batch = reader.next().unwrap().unwrap();
        roundtrip(batch, None);
    }

    #[test]
    fn arrow_writer_2_level_struct() {
        let field_c = Field::new("c", DataType::Int32, true);
        let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
        let type_a = DataType::Struct(vec![field_b.clone()].into());
        let field_a = Field::new("a", type_a, true);
        let schema = Schema::new(vec![field_a.clone()]);

        let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100111])))
            .add_child_data(c.into_data())
            .build()
            .unwrap();
        let b = StructArray::from(b_data);
        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00101111])))
            .add_child_data(b.into_data())
            .build()
            .unwrap();
        let a = StructArray::from(a_data);

        assert_eq!(a.null_count(), 1);
        assert_eq!(a.column(0).null_count(), 2);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn arrow_writer_2_level_struct_non_null() {
        let field_c = Field::new("c", DataType::Int32, false);
        let type_b = DataType::Struct(vec![field_c].into());
        let field_b = Field::new("b", type_b.clone(), false);
        let type_a = DataType::Struct(vec![field_b].into());
        let field_a = Field::new("a", type_a.clone(), false);
        let schema = Schema::new(vec![field_a]);

        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
        let b_data = ArrayDataBuilder::new(type_b)
            .len(6)
            .add_child_data(c.into_data())
            .build()
            .unwrap();
        let b = StructArray::from(b_data);
        let a_data = ArrayDataBuilder::new(type_a)
            .len(6)
            .add_child_data(b.into_data())
            .build()
            .unwrap();
        let a = StructArray::from(a_data);

        assert_eq!(a.null_count(), 0);
        assert_eq!(a.column(0).null_count(), 0);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn arrow_writer_2_level_struct_mixed_null() {
        let field_c = Field::new("c", DataType::Int32, false);
        let type_b = DataType::Struct(vec![field_c].into());
        let field_b = Field::new("b", type_b.clone(), true);
        let type_a = DataType::Struct(vec![field_b].into());
        let field_a = Field::new("a", type_a.clone(), false);
        let schema = Schema::new(vec![field_a]);

        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
        let b_data = ArrayDataBuilder::new(type_b)
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100111])))
            .add_child_data(c.into_data())
            .build()
            .unwrap();
        let b = StructArray::from(b_data);
        let a_data = ArrayDataBuilder::new(type_a)
            .len(6)
            .add_child_data(b.into_data())
            .build()
            .unwrap();
        let a = StructArray::from(a_data);

        assert_eq!(a.null_count(), 0);
        assert_eq!(a.column(0).null_count(), 2);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn arrow_writer_2_level_struct_mixed_null_2() {
        let field_c = Field::new("c", DataType::Int32, false);
        let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
        let field_e = Field::new(
            "e",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            false,
        );

        let field_b = Field::new(
            "b",
            DataType::Struct(vec![field_c, field_d, field_e].into()),
            false,
        );
        let type_a = DataType::Struct(vec![field_b.clone()].into());
        let field_a = Field::new("a", type_a, true);
        let schema = Schema::new(vec![field_a.clone()]);

        let c = Int32Array::from_iter_values(0..6);
        let d = FixedSizeBinaryArray::try_from_iter(
            ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
        )
        .expect("four byte values");
        let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
            .len(6)
            .add_child_data(c.into_data())
            .add_child_data(d.into_data())
            .add_child_data(e.into_data())
            .build()
            .unwrap();
        let b = StructArray::from(b_data);
        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100101])))
            .add_child_data(b.into_data())
            .build()
            .unwrap();
        let a = StructArray::from(a_data);

        assert_eq!(a.null_count(), 3);
        assert_eq!(a.column(0).null_count(), 0);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn test_fixed_size_binary_in_dict() {
        fn test_fixed_size_binary_in_dict_inner<K>()
        where
            K: ArrowDictionaryKeyType,
            K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
            <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
        {
            let field = Field::new(
                "a",
                DataType::Dictionary(
                    Box::new(K::DATA_TYPE),
                    Box::new(DataType::FixedSizeBinary(4)),
                ),
                false,
            );
            let schema = Schema::new(vec![field]);

            let keys: Vec<K::Native> = vec![
                K::Native::try_from(0u8).unwrap(),
                K::Native::try_from(0u8).unwrap(),
                K::Native::try_from(1u8).unwrap(),
            ];
            let keys = PrimitiveArray::<K>::from_iter_values(keys);
            let values = FixedSizeBinaryArray::try_from_iter(
                vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
            )
            .unwrap();

            let data = DictionaryArray::<K>::new(keys, Arc::new(values));
            let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
            roundtrip(batch, None);
        }

        test_fixed_size_binary_in_dict_inner::<UInt8Type>();
        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
        test_fixed_size_binary_in_dict_inner::<UInt32Type>();
        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
        test_fixed_size_binary_in_dict_inner::<Int8Type>();
        test_fixed_size_binary_in_dict_inner::<Int16Type>();
        test_fixed_size_binary_in_dict_inner::<Int32Type>();
        test_fixed_size_binary_in_dict_inner::<Int64Type>();
    }

    #[test]
    fn test_empty_dict() {
        let struct_fields = Fields::from(vec![Field::new(
            "dict",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            false,
        )]);

        let schema = Schema::new(vec![Field::new_struct(
            "struct",
            struct_fields.clone(),
            true,
        )]);
        let dictionary = Arc::new(DictionaryArray::new(
            Int32Array::new_null(5),
            Arc::new(StringArray::new_null(0)),
        ));

        let s = StructArray::new(
            struct_fields,
            vec![dictionary],
            Some(NullBuffer::new_null(5)),
        );

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
        roundtrip(batch, None);
    }

    #[test]
    fn arrow_writer_page_size() {
        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));

        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);

        for i in 0..10 {
            let value = i
                .to_string()
                .repeat(10)
                .chars()
                .take(10)
                .collect::<String>();

            builder.append_value(value);
        }

        let array = Arc::new(builder.finish());

        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        let file = tempfile::tempfile().unwrap();

        let props = WriterProperties::builder()
            .set_data_page_size_limit(1)
            .set_dictionary_page_size_limit(1)
            .set_write_batch_size(1)
            .build();

        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
                .expect("Unable to write file");
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();

        let column = reader.metadata().row_group(0).columns();

        assert_eq!(column.len(), 1);

        assert!(
            column[0].dictionary_page_offset().is_some(),
            "Expected a dictionary page"
        );

        let offset_indexes = read_offset_indexes(&file, column).unwrap().unwrap();

        let page_locations = offset_indexes[0].page_locations.clone();

        assert_eq!(
            page_locations.len(),
            10,
            "Expected 10 pages but got {page_locations:#?}"
        );
    }

    #[test]
    fn arrow_writer_float_nans() {
        let f16_field = Field::new("a", DataType::Float16, false);
        let f32_field = Field::new("b", DataType::Float32, false);
        let f64_field = Field::new("c", DataType::Float64, false);
        let schema = Schema::new(vec![f16_field, f32_field, f64_field]);

        let f16_values = (0..MEDIUM_SIZE)
            .map(|i| {
                Some(if i % 2 == 0 {
                    f16::NAN
                } else {
                    f16::from_f32(i as f32)
                })
            })
            .collect::<Float16Array>();

        let f32_values = (0..MEDIUM_SIZE)
            .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
            .collect::<Float32Array>();

        let f64_values = (0..MEDIUM_SIZE)
            .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
            .collect::<Float64Array>();

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![
                Arc::new(f16_values),
                Arc::new(f32_values),
                Arc::new(f64_values),
            ],
        )
        .unwrap();

        roundtrip(batch, None);
    }

    const SMALL_SIZE: usize = 7;
    const MEDIUM_SIZE: usize = 63;

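    // Writes `expected_batch` to a temporary file once per writer version and asserts
    // that reading it back yields the same data, returning the files for inspection.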
2096 fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<File> {
2097 let mut files = vec![];
2098 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2099 let mut props = WriterProperties::builder().set_writer_version(version);
2100
2101 if let Some(size) = max_row_group_size {
2102 props = props.set_max_row_group_size(size)
2103 }
2104
2105 let props = props.build();
2106 files.push(roundtrip_opts(&expected_batch, props))
2107 }
2108 files
2109 }
2110
2111 fn roundtrip_opts_with_array_validation<F>(
2112 expected_batch: &RecordBatch,
2113 props: WriterProperties,
2114 validate: F,
2115 ) -> File
2116 where
2117 F: Fn(&ArrayData, &ArrayData),
2118 {
2119 let file = tempfile::tempfile().unwrap();
2120
2121 let mut writer = ArrowWriter::try_new(
2122 file.try_clone().unwrap(),
2123 expected_batch.schema(),
2124 Some(props),
2125 )
2126 .expect("Unable to write file");
2127 writer.write(expected_batch).unwrap();
2128 writer.close().unwrap();
2129
2130 let mut record_batch_reader =
2131 ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
2132
2133 let actual_batch = record_batch_reader
2134 .next()
2135 .expect("No batch found")
2136 .expect("Unable to get batch");
2137
2138 assert_eq!(expected_batch.schema(), actual_batch.schema());
2139 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2140 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2141 for i in 0..expected_batch.num_columns() {
2142 let expected_data = expected_batch.column(i).to_data();
2143 let actual_data = actual_batch.column(i).to_data();
2144 validate(&expected_data, &actual_data);
2145 }
2146
2147 file
2148 }
2149
2150 fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> File {
2151 roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
2152 a.validate_full().expect("valid expected data");
2153 b.validate_full().expect("valid actual data");
2154 assert_eq!(a, b)
2155 })
2156 }
2157
2158 struct RoundTripOptions {
2159 values: ArrayRef,
2160 schema: SchemaRef,
2161 bloom_filter: bool,
2162 bloom_filter_position: BloomFilterPosition,
2163 }
2164
2165 impl RoundTripOptions {
2166 fn new(values: ArrayRef, nullable: bool) -> Self {
2167 let data_type = values.data_type().clone();
2168 let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
2169 Self {
2170 values,
2171 schema: Arc::new(schema),
2172 bloom_filter: false,
2173 bloom_filter_position: BloomFilterPosition::AfterRowGroup,
2174 }
2175 }
2176 }
2177
2178 fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<File> {
2179 one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
2180 }
2181
2182 fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<File> {
2183 let mut options = RoundTripOptions::new(values, false);
2184 options.schema = schema;
2185 one_column_roundtrip_with_options(options)
2186 }
2187
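// Round-trips a single column across a matrix of dictionary settings, encodings,
// writer versions and row group sizes, with the candidate encodings chosen from the
// column's data type.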
2188 fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<File> {
2189 let RoundTripOptions {
2190 values,
2191 schema,
2192 bloom_filter,
2193 bloom_filter_position,
2194 } = options;
2195
2196 let encodings = match values.data_type() {
2197 DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
2198 vec![
2199 Encoding::PLAIN,
2200 Encoding::DELTA_BYTE_ARRAY,
2201 Encoding::DELTA_LENGTH_BYTE_ARRAY,
2202 ]
2203 }
2204 DataType::Int64
2205 | DataType::Int32
2206 | DataType::Int16
2207 | DataType::Int8
2208 | DataType::UInt64
2209 | DataType::UInt32
2210 | DataType::UInt16
2211 | DataType::UInt8 => vec![
2212 Encoding::PLAIN,
2213 Encoding::DELTA_BINARY_PACKED,
2214 Encoding::BYTE_STREAM_SPLIT,
2215 ],
2216 DataType::Float32 | DataType::Float64 => {
2217 vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
2218 }
2219 _ => vec![Encoding::PLAIN],
2220 };
2221
2222 let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2223
2224 let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2225
2226 let mut files = vec![];
2227 for dictionary_size in [0, 1, 1024] {
2228 for encoding in &encodings {
2229 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2230 for row_group_size in row_group_sizes {
2231 let props = WriterProperties::builder()
2232 .set_writer_version(version)
2233 .set_max_row_group_size(row_group_size)
2234 .set_dictionary_enabled(dictionary_size != 0)
2235 .set_dictionary_page_size_limit(dictionary_size.max(1))
2236 .set_encoding(*encoding)
2237 .set_bloom_filter_enabled(bloom_filter)
2238 .set_bloom_filter_position(bloom_filter_position)
2239 .build();
2240
2241 files.push(roundtrip_opts(&expected_batch, props))
2242 }
2243 }
2244 }
2245 }
2246 files
2247 }
2248
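// Builds a non-nullable array of type `A` from `iter` and round-trips it.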
2249 fn values_required<A, I>(iter: I) -> Vec<File>
2250 where
2251 A: From<Vec<I::Item>> + Array + 'static,
2252 I: IntoIterator,
2253 {
2254 let raw_values: Vec<_> = iter.into_iter().collect();
2255 let values = Arc::new(A::from(raw_values));
2256 one_column_roundtrip(values, false)
2257 }
2258
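// Builds a nullable array of type `A` from `iter`, nulling every other element, and round-trips it.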
2259 fn values_optional<A, I>(iter: I) -> Vec<File>
2260 where
2261 A: From<Vec<Option<I::Item>>> + Array + 'static,
2262 I: IntoIterator,
2263 {
2264 let optional_raw_values: Vec<_> = iter
2265 .into_iter()
2266 .enumerate()
2267 .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
2268 .collect();
2269 let optional_values = Arc::new(A::from(optional_raw_values));
2270 one_column_roundtrip(optional_values, true)
2271 }
2272
2273 fn required_and_optional<A, I>(iter: I)
2274 where
2275 A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
2276 I: IntoIterator + Clone,
2277 {
2278 values_required::<A, I>(iter.clone());
2279 values_optional::<A, I>(iter);
2280 }
2281
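// Opens the first written file with bloom filter reading enabled, collects the bloom filter
// for `file_column` from every row group, and checks that all `positive_values` hit the
// filter while none of the `negative_values` do.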
2282 fn check_bloom_filter<T: AsBytes>(
2283 files: Vec<File>,
2284 file_column: String,
2285 positive_values: Vec<T>,
2286 negative_values: Vec<T>,
2287 ) {
2288 files.into_iter().take(1).for_each(|file| {
2289 let file_reader = SerializedFileReader::new_with_options(
2290 file,
2291 ReadOptionsBuilder::new()
2292 .with_reader_properties(
2293 ReaderProperties::builder()
2294 .set_read_bloom_filter(true)
2295 .build(),
2296 )
2297 .build(),
2298 )
2299 .expect("Unable to open file as Parquet");
2300 let metadata = file_reader.metadata();
2301
2302 let mut bloom_filters: Vec<_> = vec![];
2304 for (ri, row_group) in metadata.row_groups().iter().enumerate() {
2305 if let Some((column_index, _)) = row_group
2306 .columns()
2307 .iter()
2308 .enumerate()
2309 .find(|(_, column)| column.column_path().string() == file_column)
2310 {
2311 let row_group_reader = file_reader
2312 .get_row_group(ri)
2313 .expect("Unable to read row group");
2314 if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
2315 bloom_filters.push(sbbf.clone());
2316 } else {
2317 panic!("No bloom filter for column named {file_column} found");
2318 }
2319 } else {
2320 panic!("No column named {file_column} found");
2321 }
2322 }
2323
2324 positive_values.iter().for_each(|value| {
2325 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2326 assert!(
2327 found.is_some(),
2328 "Value {:?} should be in bloom filter",
2329 value.as_bytes()
2330 );
2331 });
2332
2333 negative_values.iter().for_each(|value| {
2334 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2335 assert!(
2336 found.is_none(),
2337 "Value {:?} should not be in bloom filter",
2338 value.as_bytes()
2339 );
2340 });
2341 });
2342 }
2343
2344 #[test]
2345 fn all_null_primitive_single_column() {
2346 let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
2347 one_column_roundtrip(values, true);
2348 }
2349 #[test]
2350 fn null_single_column() {
2351 let values = Arc::new(NullArray::new(SMALL_SIZE));
2352 one_column_roundtrip(values, true);
2353 }
2355
2356 #[test]
2357 fn bool_single_column() {
2358 required_and_optional::<BooleanArray, _>(
2359 [true, false].iter().cycle().copied().take(SMALL_SIZE),
2360 );
2361 }
2362
2363 #[test]
2364 fn bool_large_single_column() {
2365 let values = Arc::new(
2366 [None, Some(true), Some(false)]
2367 .iter()
2368 .cycle()
2369 .copied()
2370 .take(200_000)
2371 .collect::<BooleanArray>(),
2372 );
2373 let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
2374 let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2375 let file = tempfile::tempfile().unwrap();
2376
2377 let mut writer =
2378 ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
2379 .expect("Unable to write file");
2380 writer.write(&expected_batch).unwrap();
2381 writer.close().unwrap();
2382 }
2383
2384 #[test]
2385 fn check_page_offset_index_with_nan() {
2386 let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
2387 let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
2388 let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2389
2390 let mut out = Vec::with_capacity(1024);
2391 let mut writer =
2392 ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
2393 writer.write(&batch).unwrap();
2394 let file_meta_data = writer.close().unwrap();
2395 for row_group in file_meta_data.row_groups {
2396 for column in row_group.columns {
2397 assert!(column.offset_index_offset.is_some());
2398 assert!(column.offset_index_length.is_some());
2399 assert!(column.column_index_offset.is_none());
2400 assert!(column.column_index_length.is_none());
2401 }
2402 }
2403 }
2404
2405 #[test]
2406 fn i8_single_column() {
2407 required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
2408 }
2409
2410 #[test]
2411 fn i16_single_column() {
2412 required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
2413 }
2414
2415 #[test]
2416 fn i32_single_column() {
2417 required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
2418 }
2419
2420 #[test]
2421 fn i64_single_column() {
2422 required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
2423 }
2424
2425 #[test]
2426 fn u8_single_column() {
2427 required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
2428 }
2429
2430 #[test]
2431 fn u16_single_column() {
2432 required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
2433 }
2434
2435 #[test]
2436 fn u32_single_column() {
2437 required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
2438 }
2439
2440 #[test]
2441 fn u64_single_column() {
2442 required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
2443 }
2444
2445 #[test]
2446 fn f32_single_column() {
2447 required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
2448 }
2449
2450 #[test]
2451 fn f64_single_column() {
2452 required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
2453 }
2454
2455 #[test]
2460 fn timestamp_second_single_column() {
2461 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2462 let values = Arc::new(TimestampSecondArray::from(raw_values));
2463
2464 one_column_roundtrip(values, false);
2465 }
2466
2467 #[test]
2468 fn timestamp_millisecond_single_column() {
2469 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2470 let values = Arc::new(TimestampMillisecondArray::from(raw_values));
2471
2472 one_column_roundtrip(values, false);
2473 }
2474
2475 #[test]
2476 fn timestamp_microsecond_single_column() {
2477 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2478 let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
2479
2480 one_column_roundtrip(values, false);
2481 }
2482
2483 #[test]
2484 fn timestamp_nanosecond_single_column() {
2485 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2486 let values = Arc::new(TimestampNanosecondArray::from(raw_values));
2487
2488 one_column_roundtrip(values, false);
2489 }
2490
2491 #[test]
2492 fn date32_single_column() {
2493 required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
2494 }
2495
2496 #[test]
2497 fn date64_single_column() {
2498 required_and_optional::<Date64Array, _>(
2500 (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
2501 );
2502 }
2503
2504 #[test]
2505 fn time32_second_single_column() {
2506 required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
2507 }
2508
2509 #[test]
2510 fn time32_millisecond_single_column() {
2511 required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
2512 }
2513
2514 #[test]
2515 fn time64_microsecond_single_column() {
2516 required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
2517 }
2518
2519 #[test]
2520 fn time64_nanosecond_single_column() {
2521 required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
2522 }
2523
2524 #[test]
2525 fn duration_second_single_column() {
2526 required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
2527 }
2528
2529 #[test]
2530 fn duration_millisecond_single_column() {
2531 required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
2532 }
2533
2534 #[test]
2535 fn duration_microsecond_single_column() {
2536 required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
2537 }
2538
2539 #[test]
2540 fn duration_nanosecond_single_column() {
2541 required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
2542 }
2543
2544 #[test]
2545 fn interval_year_month_single_column() {
2546 required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
2547 }
2548
2549 #[test]
2550 fn interval_day_time_single_column() {
2551 required_and_optional::<IntervalDayTimeArray, _>(vec![
2552 IntervalDayTime::new(0, 1),
2553 IntervalDayTime::new(0, 3),
2554 IntervalDayTime::new(3, -2),
2555 IntervalDayTime::new(-200, 4),
2556 ]);
2557 }
2558
2559 #[test]
2560 #[should_panic(
2561 expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
2562 )]
2563 fn interval_month_day_nano_single_column() {
2564 required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
2565 IntervalMonthDayNano::new(0, 1, 5),
2566 IntervalMonthDayNano::new(0, 3, 2),
2567 IntervalMonthDayNano::new(3, -2, -5),
2568 IntervalMonthDayNano::new(-200, 4, -1),
2569 ]);
2570 }
2571
2572 #[test]
2573 fn binary_single_column() {
2574 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2575 let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2576 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2577
2578 values_required::<BinaryArray, _>(many_vecs_iter);
2580 }
2581
2582 #[test]
2583 fn binary_view_single_column() {
2584 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2585 let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2586 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2587
2588 values_required::<BinaryViewArray, _>(many_vecs_iter);
2590 }
2591
2592 #[test]
2593 fn i32_column_bloom_filter_at_end() {
2594 let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2595 let mut options = RoundTripOptions::new(array, false);
2596 options.bloom_filter = true;
2597 options.bloom_filter_position = BloomFilterPosition::End;
2598
2599 let files = one_column_roundtrip_with_options(options);
2600 check_bloom_filter(
2601 files,
2602 "col".to_string(),
2603 (0..SMALL_SIZE as i32).collect(),
2604 (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2605 );
2606 }
2607
2608 #[test]
2609 fn i32_column_bloom_filter() {
2610 let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2611 let mut options = RoundTripOptions::new(array, false);
2612 options.bloom_filter = true;
2613
2614 let files = one_column_roundtrip_with_options(options);
2615 check_bloom_filter(
2616 files,
2617 "col".to_string(),
2618 (0..SMALL_SIZE as i32).collect(),
2619 (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2620 );
2621 }
2622
2623 #[test]
2624 fn binary_column_bloom_filter() {
2625 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2626 let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2627 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2628
2629 let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
2630 let mut options = RoundTripOptions::new(array, false);
2631 options.bloom_filter = true;
2632
2633 let files = one_column_roundtrip_with_options(options);
2634 check_bloom_filter(
2635 files,
2636 "col".to_string(),
2637 many_vecs,
2638 vec![vec![(SMALL_SIZE + 1) as u8]],
2639 );
2640 }
2641
2642 #[test]
2643 fn empty_string_null_column_bloom_filter() {
2644 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2645 let raw_strs = raw_values.iter().map(|s| s.as_str());
2646
2647 let array = Arc::new(StringArray::from_iter_values(raw_strs));
2648 let mut options = RoundTripOptions::new(array, false);
2649 options.bloom_filter = true;
2650
2651 let files = one_column_roundtrip_with_options(options);
2652
2653 let optional_raw_values: Vec<_> = raw_values
2654 .iter()
2655 .enumerate()
2656 .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
2657 .collect();
2658 check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
2660 }
2661
2662 #[test]
2663 fn large_binary_single_column() {
2664 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2665 let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2666 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2667
2668 values_required::<LargeBinaryArray, _>(many_vecs_iter);
2670 }
2671
2672 #[test]
2673 fn fixed_size_binary_single_column() {
2674 let mut builder = FixedSizeBinaryBuilder::new(4);
2675 builder.append_value(b"0123").unwrap();
2676 builder.append_null();
2677 builder.append_value(b"8910").unwrap();
2678 builder.append_value(b"1112").unwrap();
2679 let array = Arc::new(builder.finish());
2680
2681 one_column_roundtrip(array, true);
2682 }
2683
2684 #[test]
2685 fn string_single_column() {
2686 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2687 let raw_strs = raw_values.iter().map(|s| s.as_str());
2688
2689 required_and_optional::<StringArray, _>(raw_strs);
2690 }
2691
2692 #[test]
2693 fn large_string_single_column() {
2694 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2695 let raw_strs = raw_values.iter().map(|s| s.as_str());
2696
2697 required_and_optional::<LargeStringArray, _>(raw_strs);
2698 }
2699
2700 #[test]
2701 fn string_view_single_column() {
2702 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2703 let raw_strs = raw_values.iter().map(|s| s.as_str());
2704
2705 required_and_optional::<StringViewArray, _>(raw_strs);
2706 }
2707
2708 #[test]
2709 fn null_list_single_column() {
2710 let null_field = Field::new_list_field(DataType::Null, true);
2711 let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
2712
2713 let schema = Schema::new(vec![list_field]);
2714
2715 let a_values = NullArray::new(2);
2717 let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
2718 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2719 DataType::Null,
2720 true,
2721 ))))
2722 .len(3)
2723 .add_buffer(a_value_offsets)
2724 .null_bit_buffer(Some(Buffer::from([0b00000101])))
2725 .add_child_data(a_values.into_data())
2726 .build()
2727 .unwrap();
2728
2729 let a = ListArray::from(a_list_data);
2730
2731 assert!(a.is_valid(0));
2732 assert!(!a.is_valid(1));
2733 assert!(a.is_valid(2));
2734
2735 assert_eq!(a.value(0).len(), 0);
2736 assert_eq!(a.value(2).len(), 2);
2737 assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
2738
2739 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2740 roundtrip(batch, None);
2741 }
2742
2743 #[test]
2744 fn list_single_column() {
2745 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2746 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2747 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2748 DataType::Int32,
2749 false,
2750 ))))
2751 .len(5)
2752 .add_buffer(a_value_offsets)
2753 .null_bit_buffer(Some(Buffer::from([0b00011011])))
2754 .add_child_data(a_values.into_data())
2755 .build()
2756 .unwrap();
2757
2758 assert_eq!(a_list_data.null_count(), 1);
2759
2760 let a = ListArray::from(a_list_data);
2761 let values = Arc::new(a);
2762
2763 one_column_roundtrip(values, true);
2764 }
2765
2766 #[test]
2767 fn large_list_single_column() {
2768 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2769 let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
2770 let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
2771 "large_item",
2772 DataType::Int32,
2773 true,
2774 ))))
2775 .len(5)
2776 .add_buffer(a_value_offsets)
2777 .add_child_data(a_values.into_data())
2778 .null_bit_buffer(Some(Buffer::from([0b00011011])))
2779 .build()
2780 .unwrap();
2781
2782 assert_eq!(a_list_data.null_count(), 1);
2784
2785 let a = LargeListArray::from(a_list_data);
2786 let values = Arc::new(a);
2787
2788 one_column_roundtrip(values, true);
2789 }
2790
2791 #[test]
2792 fn list_nested_nulls() {
2793 use arrow::datatypes::Int32Type;
2794 let data = vec![
2795 Some(vec![Some(1)]),
2796 Some(vec![Some(2), Some(3)]),
2797 None,
2798 Some(vec![Some(4), Some(5), None]),
2799 Some(vec![None]),
2800 Some(vec![Some(6), Some(7)]),
2801 ];
2802
2803 let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
2804 one_column_roundtrip(Arc::new(list), true);
2805
2806 let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
2807 one_column_roundtrip(Arc::new(list), true);
2808 }
2809
2810 #[test]
2811 fn struct_single_column() {
2812 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2813 let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
2814 let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
2815
2816 let values = Arc::new(s);
2817 one_column_roundtrip(values, false);
2818 }
2819
2820 #[test]
2821 fn list_and_map_coerced_names() {
2822 let list_field =
2824 Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
2825 let map_field = Field::new_map(
2826 "my_map",
2827 "entries",
2828 Field::new("keys", DataType::Int32, false),
2829 Field::new("values", DataType::Int32, true),
2830 false,
2831 true,
2832 );
2833
2834 let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
2835 let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
2836
2837 let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
2838
2839 let props = Some(WriterProperties::builder().set_coerce_types(true).build());
2841 let file = tempfile::tempfile().unwrap();
2842 let mut writer =
2843 ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
2844
2845 let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
2846 writer.write(&batch).unwrap();
2847 let file_metadata = writer.close().unwrap();
2848
2849 assert_eq!(file_metadata.schema[3].name, "element"); // coerced list item name
2851 assert_eq!(file_metadata.schema[5].name, "key_value"); // coerced map entries group name
2853 assert_eq!(file_metadata.schema[6].name, "key"); // coerced map key name
2855 assert_eq!(file_metadata.schema[7].name, "value"); // coerced map value name
2857
2858 let reader = SerializedFileReader::new(file).unwrap();
2860 let file_schema = reader.metadata().file_metadata().schema();
2861 let fields = file_schema.get_fields();
2862 let list_field = &fields[0].get_fields()[0];
2863 assert_eq!(list_field.get_fields()[0].name(), "element");
2864 let map_field = &fields[1].get_fields()[0];
2865 assert_eq!(map_field.name(), "key_value");
2866 assert_eq!(map_field.get_fields()[0].name(), "key");
2867 assert_eq!(map_field.get_fields()[1].name(), "value");
2868 }
2869
2870 #[test]
2871 fn fallback_flush_data_page() {
2872 let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
2874 let values = Arc::new(StringArray::from(raw_values));
2875 let encodings = vec![
2876 Encoding::DELTA_BYTE_ARRAY,
2877 Encoding::DELTA_LENGTH_BYTE_ARRAY,
2878 ];
2879 let data_type = values.data_type().clone();
2880 let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
2881 let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2882
2883 let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2884 let data_page_size_limit: usize = 32;
2885 let write_batch_size: usize = 16;
2886
2887 for encoding in &encodings {
2888 for row_group_size in row_group_sizes {
2889 let props = WriterProperties::builder()
2890 .set_writer_version(WriterVersion::PARQUET_2_0)
2891 .set_max_row_group_size(row_group_size)
2892 .set_dictionary_enabled(false)
2893 .set_encoding(*encoding)
2894 .set_data_page_size_limit(data_page_size_limit)
2895 .set_write_batch_size(write_batch_size)
2896 .build();
2897
2898 roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
2899 let string_array_a = StringArray::from(a.clone());
2900 let string_array_b = StringArray::from(b.clone());
2901 let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
2902 let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
2903 assert_eq!(
2904 vec_a, vec_b,
2905 "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
2906 );
2907 });
2908 }
2909 }
2910 }
2911
2912 #[test]
2913 fn arrow_writer_string_dictionary() {
2914 #[allow(deprecated)]
2916 let schema = Arc::new(Schema::new(vec![Field::new_dict(
2917 "dictionary",
2918 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2919 true,
2920 42,
2921 true,
2922 )]));
2923
2924 let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
2926 .iter()
2927 .copied()
2928 .collect();
2929
2930 one_column_roundtrip_with_schema(Arc::new(d), schema);
2932 }
2933
2934 #[test]
2935 fn arrow_writer_primitive_dictionary() {
2936 #[allow(deprecated)]
2938 let schema = Arc::new(Schema::new(vec![Field::new_dict(
2939 "dictionary",
2940 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
2941 true,
2942 42,
2943 true,
2944 )]));
2945
2946 let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
2948 builder.append(12345678).unwrap();
2949 builder.append_null();
2950 builder.append(22345678).unwrap();
2951 builder.append(12345678).unwrap();
2952 let d = builder.finish();
2953
2954 one_column_roundtrip_with_schema(Arc::new(d), schema);
2955 }
2956
2957 #[test]
2958 fn arrow_writer_decimal128_dictionary() {
2959 let integers = vec![12345, 56789, 34567];
2960
2961 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
2962
2963 let values = Decimal128Array::from(integers.clone())
2964 .with_precision_and_scale(5, 2)
2965 .unwrap();
2966
2967 let array = DictionaryArray::new(keys, Arc::new(values));
2968 one_column_roundtrip(Arc::new(array.clone()), true);
2969
2970 let values = Decimal128Array::from(integers)
2971 .with_precision_and_scale(12, 2)
2972 .unwrap();
2973
2974 let array = array.with_values(Arc::new(values));
2975 one_column_roundtrip(Arc::new(array), true);
2976 }
2977
2978 #[test]
2979 fn arrow_writer_decimal256_dictionary() {
2980 let integers = vec![
2981 i256::from_i128(12345),
2982 i256::from_i128(56789),
2983 i256::from_i128(34567),
2984 ];
2985
2986 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
2987
2988 let values = Decimal256Array::from(integers.clone())
2989 .with_precision_and_scale(5, 2)
2990 .unwrap();
2991
2992 let array = DictionaryArray::new(keys, Arc::new(values));
2993 one_column_roundtrip(Arc::new(array.clone()), true);
2994
2995 let values = Decimal256Array::from(integers)
2996 .with_precision_and_scale(12, 2)
2997 .unwrap();
2998
2999 let array = array.with_values(Arc::new(values));
3000 one_column_roundtrip(Arc::new(array), true);
3001 }
3002
3003 #[test]
3004 fn arrow_writer_string_dictionary_unsigned_index() {
3005 #[allow(deprecated)]
3007 let schema = Arc::new(Schema::new(vec![Field::new_dict(
3008 "dictionary",
3009 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3010 true,
3011 42,
3012 true,
3013 )]));
3014
3015 let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3017 .iter()
3018 .copied()
3019 .collect();
3020
3021 one_column_roundtrip_with_schema(Arc::new(d), schema);
3022 }
3023
3024 #[test]
3025 fn u32_min_max() {
3026 let src = [
3028 u32::MIN,
3029 u32::MIN + 1,
3030 (i32::MAX as u32) - 1,
3031 i32::MAX as u32,
3032 (i32::MAX as u32) + 1,
3033 u32::MAX - 1,
3034 u32::MAX,
3035 ];
3036 let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
3037 let files = one_column_roundtrip(values, false);
3038
3039 for file in files {
3040 let reader = SerializedFileReader::new(file).unwrap();
3042 let metadata = reader.metadata();
3043
3044 let mut row_offset = 0;
3045 for row_group in metadata.row_groups() {
3046 assert_eq!(row_group.num_columns(), 1);
3047 let column = row_group.column(0);
3048
3049 let num_values = column.num_values() as usize;
3050 let src_slice = &src[row_offset..row_offset + num_values];
3051 row_offset += column.num_values() as usize;
3052
3053 let stats = column.statistics().unwrap();
3054 if let Statistics::Int32(stats) = stats {
3055 assert_eq!(
3056 *stats.min_opt().unwrap() as u32,
3057 *src_slice.iter().min().unwrap()
3058 );
3059 assert_eq!(
3060 *stats.max_opt().unwrap() as u32,
3061 *src_slice.iter().max().unwrap()
3062 );
3063 } else {
3064 panic!("Statistics::Int32 missing")
3065 }
3066 }
3067 }
3068 }
3069
3070 #[test]
3071 fn u64_min_max() {
3072 let src = [
3074 u64::MIN,
3075 u64::MIN + 1,
3076 (i64::MAX as u64) - 1,
3077 i64::MAX as u64,
3078 (i64::MAX as u64) + 1,
3079 u64::MAX - 1,
3080 u64::MAX,
3081 ];
3082 let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
3083 let files = one_column_roundtrip(values, false);
3084
3085 for file in files {
3086 let reader = SerializedFileReader::new(file).unwrap();
3088 let metadata = reader.metadata();
3089
3090 let mut row_offset = 0;
3091 for row_group in metadata.row_groups() {
3092 assert_eq!(row_group.num_columns(), 1);
3093 let column = row_group.column(0);
3094
3095 let num_values = column.num_values() as usize;
3096 let src_slice = &src[row_offset..row_offset + num_values];
3097 row_offset += column.num_values() as usize;
3098
3099 let stats = column.statistics().unwrap();
3100 if let Statistics::Int64(stats) = stats {
3101 assert_eq!(
3102 *stats.min_opt().unwrap() as u64,
3103 *src_slice.iter().min().unwrap()
3104 );
3105 assert_eq!(
3106 *stats.max_opt().unwrap() as u64,
3107 *src_slice.iter().max().unwrap()
3108 );
3109 } else {
3110 panic!("Statistics::Int64 missing")
3111 }
3112 }
3113 }
3114 }
3115
3116 #[test]
3117 fn statistics_null_counts_only_nulls() {
3118 let values = Arc::new(UInt64Array::from(vec![None, None]));
3120 let files = one_column_roundtrip(values, true);
3121
3122 for file in files {
3123 let reader = SerializedFileReader::new(file).unwrap();
3125 let metadata = reader.metadata();
3126 assert_eq!(metadata.num_row_groups(), 1);
3127 let row_group = metadata.row_group(0);
3128 assert_eq!(row_group.num_columns(), 1);
3129 let column = row_group.column(0);
3130 let stats = column.statistics().unwrap();
3131 assert_eq!(stats.null_count_opt(), Some(2));
3132 }
3133 }
3134
3135 #[test]
3136 fn test_list_of_struct_roundtrip() {
3137 let int_field = Field::new("a", DataType::Int32, true);
3139 let int_field2 = Field::new("b", DataType::Int32, true);
3140
3141 let int_builder = Int32Builder::with_capacity(10);
3142 let int_builder2 = Int32Builder::with_capacity(10);
3143
3144 let struct_builder = StructBuilder::new(
3145 vec![int_field, int_field2],
3146 vec![Box::new(int_builder), Box::new(int_builder2)],
3147 );
3148 let mut list_builder = ListBuilder::new(struct_builder);
3149
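// Build a list-of-struct column with the rows:
// [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]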
3150 let values = list_builder.values();
3155 values
3156 .field_builder::<Int32Builder>(0)
3157 .unwrap()
3158 .append_value(1);
3159 values
3160 .field_builder::<Int32Builder>(1)
3161 .unwrap()
3162 .append_value(2);
3163 values.append(true);
3164 list_builder.append(true);
3165
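// second row: an empty list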
3166 list_builder.append(true);
3168
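// third row: a null list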
3169 list_builder.append(false);
3171
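// fourth row: [null, null]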
3172 let values = list_builder.values();
3174 values
3175 .field_builder::<Int32Builder>(0)
3176 .unwrap()
3177 .append_null();
3178 values
3179 .field_builder::<Int32Builder>(1)
3180 .unwrap()
3181 .append_null();
3182 values.append(false);
3183 values
3184 .field_builder::<Int32Builder>(0)
3185 .unwrap()
3186 .append_null();
3187 values
3188 .field_builder::<Int32Builder>(1)
3189 .unwrap()
3190 .append_null();
3191 values.append(false);
3192 list_builder.append(true);
3193
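// fifth row: [{a: null, b: 3}]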
3194 let values = list_builder.values();
3196 values
3197 .field_builder::<Int32Builder>(0)
3198 .unwrap()
3199 .append_null();
3200 values
3201 .field_builder::<Int32Builder>(1)
3202 .unwrap()
3203 .append_value(3);
3204 values.append(true);
3205 list_builder.append(true);
3206
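// sixth row: [{a: 2, b: null}]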
3207 let values = list_builder.values();
3209 values
3210 .field_builder::<Int32Builder>(0)
3211 .unwrap()
3212 .append_value(2);
3213 values
3214 .field_builder::<Int32Builder>(1)
3215 .unwrap()
3216 .append_null();
3217 values.append(true);
3218 list_builder.append(true);
3219
3220 let array = Arc::new(list_builder.finish());
3221
3222 one_column_roundtrip(array, true);
3223 }
3224
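// Returns the number of rows in each row group of `metadata`.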
3225 fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
3226 metadata.row_groups().iter().map(|x| x.num_rows()).collect()
3227 }
3228
3229 #[test]
3230 fn test_aggregates_records() {
3231 let arrays = [
3232 Int32Array::from((0..100).collect::<Vec<_>>()),
3233 Int32Array::from((0..50).collect::<Vec<_>>()),
3234 Int32Array::from((200..500).collect::<Vec<_>>()),
3235 ];
3236
3237 let schema = Arc::new(Schema::new(vec![Field::new(
3238 "int",
3239 ArrowDataType::Int32,
3240 false,
3241 )]));
3242
3243 let file = tempfile::tempfile().unwrap();
3244
3245 let props = WriterProperties::builder()
3246 .set_max_row_group_size(200)
3247 .build();
3248
3249 let mut writer =
3250 ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
3251
3252 for array in arrays {
3253 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
3254 writer.write(&batch).unwrap();
3255 }
3256
3257 writer.close().unwrap();
3258
3259 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3260 assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]); // 100 + 50 + 300 rows split at max_row_group_size = 200
3261
3262 let batches = builder
3263 .with_batch_size(100)
3264 .build()
3265 .unwrap()
3266 .collect::<ArrowResult<Vec<_>>>()
3267 .unwrap();
3268
3269 assert_eq!(batches.len(), 5);
3270 assert!(batches.iter().all(|x| x.num_columns() == 1));
3271
3272 let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3273
3274 assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
3275
3276 let values: Vec<_> = batches
3277 .iter()
3278 .flat_map(|x| {
3279 x.column(0)
3280 .as_any()
3281 .downcast_ref::<Int32Array>()
3282 .unwrap()
3283 .values()
3284 .iter()
3285 .cloned()
3286 })
3287 .collect();
3288
3289 let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
3290 assert_eq!(&values, &expected_values)
3291 }
3292
3293 #[test]
3294 fn complex_aggregate() {
3295 let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
3297 let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
3298 let struct_a = Arc::new(Field::new(
3299 "struct_a",
3300 DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
3301 true,
3302 ));
3303
3304 let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
3305 let struct_b = Arc::new(Field::new(
3306 "struct_b",
3307 DataType::Struct(vec![list_a.clone()].into()),
3308 false,
3309 ));
3310
3311 let schema = Arc::new(Schema::new(vec![struct_b]));
3312
3313 let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
3315 let field_b_array =
3316 Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
3317
3318 let struct_a_array = StructArray::from(vec![
3319 (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
3320 (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
3321 ]);
3322
3323 let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3324 .len(5)
3325 .add_buffer(Buffer::from_iter(vec![
3326 0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
3327 ]))
3328 .null_bit_buffer(Some(Buffer::from_iter(vec![
3329 true, false, true, false, true,
3330 ])))
3331 .child_data(vec![struct_a_array.into_data()])
3332 .build()
3333 .unwrap();
3334
3335 let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3336 let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
3337
3338 let batch1 =
3339 RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3340 .unwrap();
3341
3342 let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
3343 let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
3344
3345 let struct_a_array = StructArray::from(vec![
3346 (field_a, Arc::new(field_a_array) as ArrayRef),
3347 (field_b, Arc::new(field_b_array) as ArrayRef),
3348 ]);
3349
3350 let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3351 .len(2)
3352 .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
3353 .child_data(vec![struct_a_array.into_data()])
3354 .build()
3355 .unwrap();
3356
3357 let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3358 let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
3359
3360 let batch2 =
3361 RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3362 .unwrap();
3363
3364 let batches = &[batch1, batch2];
3365
3366 let expected = r#"
3369 +-------------------------------------------------------------------------------------------------------+
3370 | struct_b |
3371 +-------------------------------------------------------------------------------------------------------+
3372 | {list: [{leaf_a: 1, leaf_b: 1}]} |
3373 | {list: } |
3374 | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]} |
3375 | {list: } |
3376 | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]} |
3377 | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
3378 | {list: [{leaf_a: 10, leaf_b: }]} |
3379 +-------------------------------------------------------------------------------------------------------+
3380 "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
3381
3382 let actual = pretty_format_batches(batches).unwrap().to_string();
3383 assert_eq!(actual, expected);
3384
3385 let file = tempfile::tempfile().unwrap();
3387 let props = WriterProperties::builder()
3388 .set_max_row_group_size(6)
3389 .build();
3390
3391 let mut writer =
3392 ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
3393
3394 for batch in batches {
3395 writer.write(batch).unwrap();
3396 }
3397 writer.close().unwrap();
3398
3399 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3404 assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]); // 5 + 2 rows with max_row_group_size = 6
3405
3406 let batches = builder
3407 .with_batch_size(2)
3408 .build()
3409 .unwrap()
3410 .collect::<ArrowResult<Vec<_>>>()
3411 .unwrap();
3412
3413 assert_eq!(batches.len(), 4);
3414 let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3415 assert_eq!(&batch_counts, &[2, 2, 2, 1]);
3416
3417 let actual = pretty_format_batches(&batches).unwrap().to_string();
3418 assert_eq!(actual, expected);
3419 }
3420
3421 #[test]
3422 fn test_arrow_writer_metadata() {
3423 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3424 let file_schema = batch_schema.clone().with_metadata(
3425 vec![("foo".to_string(), "bar".to_string())]
3426 .into_iter()
3427 .collect(),
3428 );
3429
3430 let batch = RecordBatch::try_new(
3431 Arc::new(batch_schema),
3432 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3433 )
3434 .unwrap();
3435
3436 let mut buf = Vec::with_capacity(1024);
3437 let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
3438 writer.write(&batch).unwrap();
3439 writer.close().unwrap();
3440 }
3441
3442 #[test]
3443 fn test_arrow_writer_nullable() {
3444 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3445 let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
3446 let file_schema = Arc::new(file_schema);
3447
3448 let batch = RecordBatch::try_new(
3449 Arc::new(batch_schema),
3450 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3451 )
3452 .unwrap();
3453
3454 let mut buf = Vec::with_capacity(1024);
3455 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3456 writer.write(&batch).unwrap();
3457 writer.close().unwrap();
3458
3459 let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
3460 let back = read.next().unwrap().unwrap();
3461 assert_eq!(back.schema(), file_schema);
3462 assert_ne!(back.schema(), batch.schema());
3463 assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
3464 }
3465
3466 #[test]
3467 fn in_progress_accounting() {
3468 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3470
3471 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3473
3474 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3476
3477 let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
3478
3479 assert_eq!(writer.in_progress_size(), 0);
3481 assert_eq!(writer.in_progress_rows(), 0);
3482 assert_eq!(writer.memory_size(), 0);
3483 assert_eq!(writer.bytes_written(), 4); // only the initial 4-byte "PAR1" magic has been written
3484 writer.write(&batch).unwrap();
3485
3486 let initial_size = writer.in_progress_size();
3488 assert!(initial_size > 0);
3489 assert_eq!(writer.in_progress_rows(), 5);
3490 let initial_memory = writer.memory_size();
3491 assert!(initial_memory > 0);
3492 assert!(
3494 initial_size <= initial_memory,
3495 "{initial_size} <= {initial_memory}"
3496 );
3497
3498 writer.write(&batch).unwrap();
3500 assert!(writer.in_progress_size() > initial_size);
3501 assert_eq!(writer.in_progress_rows(), 10);
3502 assert!(writer.memory_size() > initial_memory);
3503 assert!(
3504 writer.in_progress_size() <= writer.memory_size(),
3505 "in_progress_size {} <= memory_size {}",
3506 writer.in_progress_size(),
3507 writer.memory_size()
3508 );
3509
3510 let pre_flush_bytes_written = writer.bytes_written();
3512 writer.flush().unwrap();
3513 assert_eq!(writer.in_progress_size(), 0);
3514 assert_eq!(writer.memory_size(), 0);
3515 assert!(writer.bytes_written() > pre_flush_bytes_written);
3516
3517 writer.close().unwrap();
3518 }
3519
3520 #[test]
3521 fn test_writer_all_null() {
3522 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3523 let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
3524 let batch = RecordBatch::try_from_iter(vec![
3525 ("a", Arc::new(a) as ArrayRef),
3526 ("b", Arc::new(b) as ArrayRef),
3527 ])
3528 .unwrap();
3529
3530 let mut buf = Vec::with_capacity(1024);
3531 let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
3532 writer.write(&batch).unwrap();
3533 writer.close().unwrap();
3534
3535 let bytes = Bytes::from(buf);
3536 let options = ReadOptionsBuilder::new().with_page_index().build();
3537 let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
3538 let index = reader.metadata().offset_index().unwrap();
3539
3540 assert_eq!(index.len(), 1);
3541 assert_eq!(index[0].len(), 2); // 2 columns
3542 assert_eq!(index[0][0].page_locations().len(), 1); // 1 page for column "a"
3543 assert_eq!(index[0][1].page_locations().len(), 1); // 1 page for column "b"
3544 }
3545
3546 #[test]
3547 fn test_disabled_statistics_with_page() {
3548 let file_schema = Schema::new(vec![
3549 Field::new("a", DataType::Utf8, true),
3550 Field::new("b", DataType::Utf8, true),
3551 ]);
3552 let file_schema = Arc::new(file_schema);
3553
3554 let batch = RecordBatch::try_new(
3555 file_schema.clone(),
3556 vec![
3557 Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3558 Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3559 ],
3560 )
3561 .unwrap();
3562
3563 let props = WriterProperties::builder()
3564 .set_statistics_enabled(EnabledStatistics::None)
3565 .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
3566 .build();
3567
3568 let mut buf = Vec::with_capacity(1024);
3569 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3570 writer.write(&batch).unwrap();
3571
3572 let metadata = writer.close().unwrap();
3573 assert_eq!(metadata.row_groups.len(), 1);
3574 let row_group = &metadata.row_groups[0];
3575 assert_eq!(row_group.columns.len(), 2);
3576 assert!(row_group.columns[0].offset_index_offset.is_some()); // offset index is written regardless of statistics level
3578 assert!(row_group.columns[0].column_index_offset.is_some()); // "a" has page-level statistics
3579 assert!(row_group.columns[1].offset_index_offset.is_some()); // offset index is written regardless of statistics level
3581 assert!(row_group.columns[1].column_index_offset.is_none()); // statistics are disabled for "b"
3582
3583 let options = ReadOptionsBuilder::new().with_page_index().build();
3584 let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3585
3586 let row_group = reader.get_row_group(0).unwrap();
3587 let a_col = row_group.metadata().column(0);
3588 let b_col = row_group.metadata().column(1);
3589
3590 if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3592 let min = byte_array_stats.min_opt().unwrap();
3593 let max = byte_array_stats.max_opt().unwrap();
3594
3595 assert_eq!(min.as_bytes(), b"a");
3596 assert_eq!(max.as_bytes(), b"d");
3597 } else {
3598 panic!("expecting Statistics::ByteArray");
3599 }
3600
3601 assert!(b_col.statistics().is_none());
3603
3604 let offset_index = reader.metadata().offset_index().unwrap();
3605 assert_eq!(offset_index.len(), 1); // 1 row group
3606 assert_eq!(offset_index[0].len(), 2); // 2 columns
3607
3608 let column_index = reader.metadata().column_index().unwrap();
3609 assert_eq!(column_index.len(), 1); // 1 row group
3610 assert_eq!(column_index[0].len(), 2); // 2 columns
3611
3612 let a_idx = &column_index[0][0];
3613 assert!(matches!(a_idx, Index::BYTE_ARRAY(_)), "{a_idx:?}");
3614 let b_idx = &column_index[0][1];
3615 assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3616 }
3617
3618 #[test]
3619 fn test_disabled_statistics_with_chunk() {
3620 let file_schema = Schema::new(vec![
3621 Field::new("a", DataType::Utf8, true),
3622 Field::new("b", DataType::Utf8, true),
3623 ]);
3624 let file_schema = Arc::new(file_schema);
3625
3626 let batch = RecordBatch::try_new(
3627 file_schema.clone(),
3628 vec![
3629 Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3630 Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3631 ],
3632 )
3633 .unwrap();
3634
3635 let props = WriterProperties::builder()
3636 .set_statistics_enabled(EnabledStatistics::None)
3637 .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
3638 .build();
3639
3640 let mut buf = Vec::with_capacity(1024);
3641 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3642 writer.write(&batch).unwrap();
3643
3644 let metadata = writer.close().unwrap();
3645 assert_eq!(metadata.row_groups.len(), 1);
3646 let row_group = &metadata.row_groups[0];
3647 assert_eq!(row_group.columns.len(), 2);
3648 assert!(row_group.columns[0].offset_index_offset.is_some()); // offset index is written regardless of statistics level
3650 assert!(row_group.columns[0].column_index_offset.is_none()); // "a" has only chunk-level statistics, so no column index
3651 assert!(row_group.columns[1].offset_index_offset.is_some()); // offset index is written regardless of statistics level
3653 assert!(row_group.columns[1].column_index_offset.is_none()); // statistics are disabled for "b"
3654
3655 let options = ReadOptionsBuilder::new().with_page_index().build();
3656 let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3657
3658 let row_group = reader.get_row_group(0).unwrap();
3659 let a_col = row_group.metadata().column(0);
3660 let b_col = row_group.metadata().column(1);
3661
3662 if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3664 let min = byte_array_stats.min_opt().unwrap();
3665 let max = byte_array_stats.max_opt().unwrap();
3666
3667 assert_eq!(min.as_bytes(), b"a");
3668 assert_eq!(max.as_bytes(), b"d");
3669 } else {
3670 panic!("expecting Statistics::ByteArray");
3671 }
3672
3673 assert!(b_col.statistics().is_none());
3675
3676 let column_index = reader.metadata().column_index().unwrap();
3677 assert_eq!(column_index.len(), 1); // 1 row group
3678 assert_eq!(column_index[0].len(), 2); // 2 columns
3679
3680 let a_idx = &column_index[0][0];
3681 assert!(matches!(a_idx, Index::NONE), "{a_idx:?}");
3682 let b_idx = &column_index[0][1];
3683 assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3684 }
3685
3686 #[test]
3687 fn test_arrow_writer_skip_metadata() {
3688 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3689 let file_schema = Arc::new(batch_schema.clone());
3690
3691 let batch = RecordBatch::try_new(
3692 Arc::new(batch_schema),
3693 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3694 )
3695 .unwrap();
3696 let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
3697
3698 let mut buf = Vec::with_capacity(1024);
3699 let mut writer =
3700 ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
3701 writer.write(&batch).unwrap();
3702 writer.close().unwrap();
3703
3704 let bytes = Bytes::from(buf);
3705 let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
3706 assert_eq!(file_schema, *reader_builder.schema());
3707 if let Some(key_value_metadata) = reader_builder
3708 .metadata()
3709 .file_metadata()
3710 .key_value_metadata()
3711 {
3712 assert!(!key_value_metadata
3713 .iter()
3714 .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY));
3715 }
3716 }
3717
3718 #[test]
3719 fn mismatched_schemas() {
3720 let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
3721 let file_schema = Arc::new(Schema::new(vec![Field::new(
3722 "temperature",
3723 DataType::Float64,
3724 false,
3725 )]));
3726
3727 let batch = RecordBatch::try_new(
3728 Arc::new(batch_schema),
3729 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3730 )
3731 .unwrap();
3732
3733 let mut buf = Vec::with_capacity(1024);
3734 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3735
3736 let err = writer.write(&batch).unwrap_err().to_string();
3737 assert_eq!(
3738 err,
3739 "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
3740 );
3741 }
3742
3743 #[test]
3744 fn test_roundtrip_empty_schema() {
3746 let empty_batch = RecordBatch::try_new_with_options(
3748 Arc::new(Schema::empty()),
3749 vec![],
3750 &RecordBatchOptions::default().with_row_count(Some(0)),
3751 )
3752 .unwrap();
3753
3754 let mut parquet_bytes: Vec<u8> = Vec::new();
3756 let mut writer =
3757 ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
3758 writer.write(&empty_batch).unwrap();
3759 writer.close().unwrap();
3760
3761 let bytes = Bytes::from(parquet_bytes);
3763 let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
3764 assert_eq!(reader.schema(), &empty_batch.schema());
3765 let batches: Vec<_> = reader
3766 .build()
3767 .unwrap()
3768 .collect::<ArrowResult<Vec<_>>>()
3769 .unwrap();
3770 assert_eq!(batches.len(), 0);
3771 }
3772
3773 #[test]
3774 fn test_page_stats_truncation() {
3775 let string_field = Field::new("a", DataType::Utf8, false);
3776 let binary_field = Field::new("b", DataType::Binary, false);
3777 let schema = Schema::new(vec![string_field, binary_field]);
3778
3779 let raw_string_values = vec!["Blart Versenwald III"];
3780 let raw_binary_values = [b"Blart Versenwald III".to_vec()];
3781 let raw_binary_value_refs = raw_binary_values
3782 .iter()
3783 .map(|x| x.as_slice())
3784 .collect::<Vec<_>>();
3785
3786 let string_values = StringArray::from(raw_string_values.clone());
3787 let binary_values = BinaryArray::from(raw_binary_value_refs);
3788 let batch = RecordBatch::try_new(
3789 Arc::new(schema),
3790 vec![Arc::new(string_values), Arc::new(binary_values)],
3791 )
3792 .unwrap();
3793
3794 let props = WriterProperties::builder()
3795 .set_statistics_truncate_length(Some(2))
3796 .set_dictionary_enabled(false)
3797 .set_encoding(Encoding::PLAIN)
3798 .set_compression(crate::basic::Compression::UNCOMPRESSED)
3799 .build();
3800
3801 let mut file = roundtrip_opts(&batch, props);
3802
3803 let mut buf = vec![];
3806 file.seek(std::io::SeekFrom::Start(0)).unwrap();
3807 let read = file.read_to_end(&mut buf).unwrap();
3808 assert!(read > 0);
3809
3810 let first_page = &buf[4..]; // skip the 4-byte "PAR1" magic at the start of the file
3812 let mut prot = TCompactSliceInputProtocol::new(first_page);
3813 let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
3814 let stats = hdr.data_page_header.unwrap().statistics;
3815 assert!(stats.is_some());
3816 let stats = stats.unwrap();
3817 assert!(!stats.is_max_value_exact.unwrap());
3819 assert!(!stats.is_min_value_exact.unwrap());
3820 assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
3821 assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
3822
3823 let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..]; // skip the first page's data to reach the second page header
3825 let mut prot = TCompactSliceInputProtocol::new(second_page);
3826 let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
3827 let stats = hdr.data_page_header.unwrap().statistics;
3828 assert!(stats.is_some());
3829 let stats = stats.unwrap();
3830 assert!(!stats.is_max_value_exact.unwrap());
3832 assert!(!stats.is_min_value_exact.unwrap());
3833 assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
3834 assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
3835 }
3836}