use bytes::Bytes;
use std::io::{Read, Write};
use std::iter::Peekable;
use std::slice::Iter;
use std::sync::{Arc, Mutex};
use std::vec::IntoIter;
use thrift::protocol::TCompactOutputProtocol;

use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};

use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};

use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
use crate::arrow::ArrowSchemaConverter;
use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
use crate::column::page_encryption::PageEncryptor;
use crate::column::writer::encoder::ColumnValueEncoder;
use crate::column::writer::{
    get_column_writer, ColumnCloseResult, ColumnWriter, GenericColumnWriter,
};
use crate::data_type::{ByteArray, FixedLenByteArray};
#[cfg(feature = "encryption")]
use crate::encryption::encrypt::FileEncryptor;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{KeyValue, RowGroupMetaData};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
use crate::thrift::TSerializable;
use levels::{calculate_array_levels, ArrayLevels};

mod byte_array;
mod levels;

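/// Encodes [`RecordBatch`]es into a Parquet file.
///
/// Batches passed to [`ArrowWriter::write`] are buffered in memory, and a row
/// group is flushed whenever the buffered row count reaches the configured
/// maximum row group size (or [`ArrowWriter::flush`] is called explicitly).
/// Dropping the writer without calling [`ArrowWriter::close`] or
/// [`ArrowWriter::finish`] produces a file without a footer, which readers
/// will reject.
///
/// A minimal usage sketch, writing a single batch to an in-memory buffer
/// (the column name and values are illustrative only):
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
/// # use parquet::arrow::ArrowWriter;
/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
/// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
///
/// let mut buffer = Vec::new();
/// let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
/// writer.write(&batch).unwrap();
/// // `close` flushes any buffered rows and writes the Parquet footer
/// writer.close().unwrap();
/// ```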
pub struct ArrowWriter<W: Write> {
    writer: SerializedFileWriter<W>,
    in_progress: Option<ArrowRowGroupWriter>,
    arrow_schema: SchemaRef,
    row_group_writer_factory: ArrowRowGroupWriterFactory,
    max_row_group_size: usize,
}

impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let buffered_memory = self.in_progress_size();
        f.debug_struct("ArrowWriter")
            .field("writer", &self.writer)
            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
            .field("in_progress_rows", &self.in_progress_rows())
            .field("arrow_schema", &self.arrow_schema)
            .field("max_row_group_size", &self.max_row_group_size)
            .finish()
    }
}

impl<W: Write + Send> ArrowWriter<W> {
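    /// Tries to create a new [`ArrowWriter`] that writes to `writer`, using the
    /// default options optionally overridden by `props`. The Arrow schema is
    /// converted to a Parquet schema and, unless disabled via
    /// [`ArrowWriterOptions::with_skip_arrow_metadata`], embedded in the file
    /// metadata so readers can recover the original Arrow types.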
    pub fn try_new(
        writer: W,
        arrow_schema: SchemaRef,
        props: Option<WriterProperties>,
    ) -> Result<Self> {
        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
        Self::try_new_with_options(writer, arrow_schema, options)
    }

    pub fn try_new_with_options(
        writer: W,
        arrow_schema: SchemaRef,
        options: ArrowWriterOptions,
    ) -> Result<Self> {
        let mut props = options.properties;
        let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
        if let Some(schema_root) = &options.schema_root {
            converter = converter.schema_root(schema_root);
        }
        let schema = converter.convert(&arrow_schema)?;
        if !options.skip_arrow_metadata {
            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
        }

        let max_row_group_size = props.max_row_group_size();

        let file_writer =
            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::new(props))?;

        let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&file_writer);

        Ok(Self {
            writer: file_writer,
            in_progress: None,
            arrow_schema,
            row_group_writer_factory,
            max_row_group_size,
        })
    }

    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        self.writer.flushed_row_groups()
    }

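    /// Returns the estimated memory usage, in bytes, of the in-progress row
    /// group: the sum of each buffered column writer's
    /// [`ArrowColumnWriter::memory_size`]. Returns 0 if no row group is in
    /// progress.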
    pub fn memory_size(&self) -> usize {
        match &self.in_progress {
            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
            None => 0,
        }
    }

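    /// Returns the estimated length in bytes of the in-progress row group once
    /// encoded and written out, or 0 if no row group is in progress. Unlike
    /// [`ArrowWriter::memory_size`], this estimates the on-disk size rather
    /// than the memory held by the buffered writers.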
    pub fn in_progress_size(&self) -> usize {
        match &self.in_progress {
            Some(in_progress) => in_progress
                .writers
                .iter()
                .map(|x| x.get_estimated_total_bytes())
                .sum(),
            None => 0,
        }
    }

    pub fn in_progress_rows(&self) -> usize {
        self.in_progress
            .as_ref()
            .map(|x| x.buffered_rows)
            .unwrap_or_default()
    }

    pub fn bytes_written(&self) -> usize {
        self.writer.bytes_written()
    }

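    /// Enqueues the provided [`RecordBatch`] to be written.
    ///
    /// The batch is buffered in memory. After buffering, if the number of
    /// buffered rows reaches the configured maximum row group size, the
    /// in-progress row group is flushed to the underlying writer; a batch that
    /// would overflow that limit is split so no row group exceeds it.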
    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        let in_progress = match &mut self.in_progress {
            Some(in_progress) => in_progress,
            x => x.insert(self.row_group_writer_factory.create_row_group_writer(
                self.writer.schema_descr(),
                self.writer.properties(),
                &self.arrow_schema,
                self.writer.flushed_row_groups().len(),
            )?),
        };

        if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size {
            let to_write = self.max_row_group_size - in_progress.buffered_rows;
            let a = batch.slice(0, to_write);
            let b = batch.slice(to_write, batch.num_rows() - to_write);
            self.write(&a)?;
            return self.write(&b);
        }

        in_progress.write(batch)?;

        if in_progress.buffered_rows >= self.max_row_group_size {
            self.flush()?
        }
        Ok(())
    }

    pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
        self.writer.write_all(buf)
    }

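    /// Flushes the currently buffered rows to a new row group in the
    /// underlying writer. Does nothing if no row group is in progress.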
    pub fn flush(&mut self) -> Result<()> {
        let in_progress = match self.in_progress.take() {
            Some(in_progress) => in_progress,
            None => return Ok(()),
        };

        let mut row_group_writer = self.writer.next_row_group()?;
        for chunk in in_progress.close()? {
            chunk.append_to_row_group(&mut row_group_writer)?;
        }
        row_group_writer.close()?;
        Ok(())
    }

    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
        self.writer.append_key_value_metadata(kv_metadata)
    }

    pub fn inner(&self) -> &W {
        self.writer.inner()
    }

    pub fn inner_mut(&mut self) -> &mut W {
        self.writer.inner_mut()
    }

    pub fn into_inner(mut self) -> Result<W> {
        self.flush()?;
        self.writer.into_inner()
    }

    pub fn finish(&mut self) -> Result<crate::format::FileMetaData> {
        self.flush()?;
        self.writer.finish()
    }

    pub fn close(mut self) -> Result<crate::format::FileMetaData> {
        self.finish()
    }
}

impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch).map_err(|e| e.into())
    }

    fn close(self) -> std::result::Result<(), ArrowError> {
        self.close()?;
        Ok(())
    }
}

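/// Additional options controlling how an [`ArrowWriter`] is created, beyond
/// the standard [`WriterProperties`]. Used with
/// [`ArrowWriter::try_new_with_options`].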
#[derive(Debug, Clone, Default)]
pub struct ArrowWriterOptions {
    properties: WriterProperties,
    skip_arrow_metadata: bool,
    schema_root: Option<String>,
}

impl ArrowWriterOptions {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_properties(self, properties: WriterProperties) -> Self {
        Self { properties, ..self }
    }

    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

    pub fn with_schema_root(self, schema_root: String) -> Self {
        Self {
            schema_root: Some(schema_root),
            ..self
        }
    }
}

#[derive(Default)]
struct ArrowColumnChunkData {
    length: usize,
    data: Vec<Bytes>,
}

impl Length for ArrowColumnChunkData {
    fn len(&self) -> u64 {
        self.length as _
    }
}

impl ChunkReader for ArrowColumnChunkData {
    type T = ArrowColumnChunkReader;

    fn get_read(&self, start: u64) -> Result<Self::T> {
        assert_eq!(start, 0);
        Ok(ArrowColumnChunkReader(
            self.data.clone().into_iter().peekable(),
        ))
    }

    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
        unimplemented!()
    }
}

struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);

impl Read for ArrowColumnChunkReader {
    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
        let buffer = loop {
            match self.0.peek_mut() {
                Some(b) if b.is_empty() => {
                    self.0.next();
                    continue;
                }
                Some(b) => break b,
                None => return Ok(0),
            }
        };

        let len = buffer.len().min(out.len());
        let b = buffer.split_to(len);
        out[..len].copy_from_slice(&b);
        Ok(len)
    }
}

type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;

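/// A [`PageWriter`] that appends the encoded page header and page data for a
/// single column chunk to an in-memory [`SharedColumnChunk`], rather than
/// writing them directly to a file.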
#[derive(Default)]
struct ArrowPageWriter {
    buffer: SharedColumnChunk,
    #[cfg(feature = "encryption")]
    page_encryptor: Option<PageEncryptor>,
}

impl ArrowPageWriter {
    #[cfg(feature = "encryption")]
    pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
        self.page_encryptor = page_encryptor;
        self
    }

    #[cfg(feature = "encryption")]
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        self.page_encryptor.as_mut()
    }

    #[cfg(not(feature = "encryption"))]
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        None
    }
}

impl PageWriter for ArrowPageWriter {
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
        let page = match self.page_encryptor_mut() {
            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
            None => page,
        };

        let page_header = page.to_thrift_header();
        let header = {
            let mut header = Vec::with_capacity(1024);

            match self.page_encryptor_mut() {
                Some(page_encryptor) => {
                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
                    if page.compressed_page().is_data_page() {
                        page_encryptor.increment_page();
                    }
                }
                None => {
                    let mut protocol = TCompactOutputProtocol::new(&mut header);
                    page_header.write_to_out_protocol(&mut protocol)?;
                }
            };

            Bytes::from(header)
        };

        let mut buf = self.buffer.try_lock().unwrap();

        let data = page.compressed_page().buffer().clone();
        let compressed_size = data.len() + header.len();

        let mut spec = PageWriteSpec::new();
        spec.page_type = page.page_type();
        spec.num_values = page.num_values();
        spec.uncompressed_size = page.uncompressed_size() + header.len();
        spec.offset = buf.length as u64;
        spec.compressed_size = compressed_size;
        spec.bytes_written = compressed_size as u64;

        buf.length += compressed_size;
        buf.data.push(header);
        buf.data.push(data);

        Ok(spec)
    }

    fn close(&mut self) -> Result<()> {
        Ok(())
    }
}

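/// The data for a single leaf (primitive) column computed from one Arrow
/// array, including its definition and repetition levels. Produced by
/// [`compute_leaves`] and consumed by [`ArrowColumnWriter::write`].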
#[derive(Debug)]
pub struct ArrowLeafColumn(ArrayLevels);

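/// Computes the [`ArrowLeafColumn`]s for the provided `field` and `array`,
/// one per Parquet leaf column. Nested types such as structs and lists yield
/// multiple leaves; a primitive field yields exactly one.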
pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
    let levels = calculate_array_levels(array, field)?;
    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
}

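/// The encoded data for a single column chunk, produced by
/// [`ArrowColumnWriter::close`] and appended to a row group with
/// [`ArrowColumnChunk::append_to_row_group`].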
pub struct ArrowColumnChunk {
    data: ArrowColumnChunkData,
    close: ColumnCloseResult,
}

impl std::fmt::Debug for ArrowColumnChunk {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowColumnChunk")
            .field("length", &self.data.length)
            .finish_non_exhaustive()
    }
}

impl ArrowColumnChunk {
    pub fn append_to_row_group<W: Write + Send>(
        self,
        writer: &mut SerializedRowGroupWriter<'_, W>,
    ) -> Result<()> {
        writer.append_column(&self.data, self.close)
    }
}

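/// Encodes [`ArrowLeafColumn`]s into an [`ArrowColumnChunk`].
///
/// This is the lower-level building block behind [`ArrowWriter`]: callers that
/// want to control buffering themselves, or encode columns on separate
/// threads, can drive these writers directly and then append the resulting
/// chunks to a [`SerializedRowGroupWriter`].
///
/// A minimal single-threaded sketch of that flow (the column name and values
/// are illustrative only):
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
/// # use parquet::arrow::arrow_writer::{compute_leaves, get_column_writers};
/// # use parquet::arrow::ArrowSchemaConverter;
/// # use parquet::file::properties::WriterProperties;
/// # use parquet::file::writer::SerializedFileWriter;
/// let col = Arc::new(Int32Array::from_iter_values([1, 2, 3])) as ArrayRef;
/// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
/// let schema = batch.schema();
///
/// // Convert the Arrow schema to a Parquet schema and create the file writer
/// let props = Arc::new(WriterProperties::default());
/// let parquet_schema = ArrowSchemaConverter::new().convert(&schema).unwrap();
/// let mut file_writer = SerializedFileWriter::new(
///     Vec::<u8>::new(),
///     parquet_schema.root_schema_ptr(),
///     props.clone(),
/// )
/// .unwrap();
///
/// // One `ArrowColumnWriter` per Parquet leaf column, in depth-first order
/// let mut writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
/// let mut writer_iter = writers.iter_mut();
/// for (field, column) in schema.fields().iter().zip(batch.columns()) {
///     for leaf in compute_leaves(field.as_ref(), column).unwrap() {
///         writer_iter.next().unwrap().write(&leaf).unwrap();
///     }
/// }
///
/// // Close each column writer and append its encoded chunk to a row group
/// let mut row_group = file_writer.next_row_group().unwrap();
/// for writer in writers {
///     writer.close().unwrap().append_to_row_group(&mut row_group).unwrap();
/// }
/// row_group.close().unwrap();
/// file_writer.close().unwrap();
/// ```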
pub struct ArrowColumnWriter {
    writer: ArrowColumnWriterImpl,
    chunk: SharedColumnChunk,
}

impl std::fmt::Debug for ArrowColumnWriter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
    }
}

enum ArrowColumnWriterImpl {
    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
    Column(ColumnWriter<'static>),
}

impl ArrowColumnWriter {
    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
        match &mut self.writer {
            ArrowColumnWriterImpl::Column(c) => {
                write_leaf(c, &col.0)?;
            }
            ArrowColumnWriterImpl::ByteArray(c) => {
                write_primitive(c, col.0.array().as_ref(), &col.0)?;
            }
        }
        Ok(())
    }

    pub fn close(self) -> Result<ArrowColumnChunk> {
        let close = match self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
            ArrowColumnWriterImpl::Column(c) => c.close()?,
        };
        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
        let data = chunk.into_inner().unwrap();
        Ok(ArrowColumnChunk { data, close })
    }

    pub fn memory_size(&self) -> usize {
        match &self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
        }
    }

    pub fn get_estimated_total_bytes(&self) -> usize {
        match &self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
        }
    }
}

struct ArrowRowGroupWriter {
    writers: Vec<ArrowColumnWriter>,
    schema: SchemaRef,
    buffered_rows: usize,
}

impl ArrowRowGroupWriter {
    fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
        Self {
            writers,
            schema: arrow.clone(),
            buffered_rows: 0,
        }
    }

    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        self.buffered_rows += batch.num_rows();
        let mut writers = self.writers.iter_mut();
        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
            for leaf in compute_leaves(field.as_ref(), column)? {
                writers.next().unwrap().write(&leaf)?
            }
        }
        Ok(())
    }

    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
        self.writers
            .into_iter()
            .map(|writer| writer.close())
            .collect()
    }
}

struct ArrowRowGroupWriterFactory {
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}

impl ArrowRowGroupWriterFactory {
    #[cfg(feature = "encryption")]
    fn new<W: Write + Send>(file_writer: &SerializedFileWriter<W>) -> Self {
        Self {
            file_encryptor: file_writer.file_encryptor(),
        }
    }

    #[cfg(not(feature = "encryption"))]
    fn new<W: Write + Send>(_file_writer: &SerializedFileWriter<W>) -> Self {
        Self {}
    }

    #[cfg(feature = "encryption")]
    fn create_row_group_writer(
        &self,
        parquet: &SchemaDescriptor,
        props: &WriterPropertiesPtr,
        arrow: &SchemaRef,
        row_group_index: usize,
    ) -> Result<ArrowRowGroupWriter> {
        let writers = get_column_writers_with_encryptor(
            parquet,
            props,
            arrow,
            self.file_encryptor.clone(),
            row_group_index,
        )?;
        Ok(ArrowRowGroupWriter::new(writers, arrow))
    }

    #[cfg(not(feature = "encryption"))]
    fn create_row_group_writer(
        &self,
        parquet: &SchemaDescriptor,
        props: &WriterPropertiesPtr,
        arrow: &SchemaRef,
        _row_group_index: usize,
    ) -> Result<ArrowRowGroupWriter> {
        let writers = get_column_writers(parquet, props, arrow)?;
        Ok(ArrowRowGroupWriter::new(writers, arrow))
    }
}

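/// Returns the [`ArrowColumnWriter`]s for the leaf columns of the provided
/// Arrow schema, in depth-first field order, using the Parquet schema in
/// `parquet` to describe each leaf. See the example on [`ArrowColumnWriter`]
/// for how these writers are typically driven.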
pub fn get_column_writers(
    parquet: &SchemaDescriptor,
    props: &WriterPropertiesPtr,
    arrow: &SchemaRef,
) -> Result<Vec<ArrowColumnWriter>> {
    let mut writers = Vec::with_capacity(arrow.fields.len());
    let mut leaves = parquet.columns().iter();
    let column_factory = ArrowColumnWriterFactory::new();
    for field in &arrow.fields {
        column_factory.get_arrow_column_writer(
            field.data_type(),
            props,
            &mut leaves,
            &mut writers,
        )?;
    }
    Ok(writers)
}

#[cfg(feature = "encryption")]
fn get_column_writers_with_encryptor(
    parquet: &SchemaDescriptor,
    props: &WriterPropertiesPtr,
    arrow: &SchemaRef,
    file_encryptor: Option<Arc<FileEncryptor>>,
    row_group_index: usize,
) -> Result<Vec<ArrowColumnWriter>> {
    let mut writers = Vec::with_capacity(arrow.fields.len());
    let mut leaves = parquet.columns().iter();
    let column_factory =
        ArrowColumnWriterFactory::new().with_file_encryptor(row_group_index, file_encryptor);
    for field in &arrow.fields {
        column_factory.get_arrow_column_writer(
            field.data_type(),
            props,
            &mut leaves,
            &mut writers,
        )?;
    }
    Ok(writers)
}

struct ArrowColumnWriterFactory {
    #[cfg(feature = "encryption")]
    row_group_index: usize,
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}

impl ArrowColumnWriterFactory {
    pub fn new() -> Self {
        Self {
            #[cfg(feature = "encryption")]
            row_group_index: 0,
            #[cfg(feature = "encryption")]
            file_encryptor: None,
        }
    }

    #[cfg(feature = "encryption")]
    pub fn with_file_encryptor(
        mut self,
        row_group_index: usize,
        file_encryptor: Option<Arc<FileEncryptor>>,
    ) -> Self {
        self.row_group_index = row_group_index;
        self.file_encryptor = file_encryptor;
        self
    }

    #[cfg(feature = "encryption")]
    fn create_page_writer(
        &self,
        column_descriptor: &ColumnDescPtr,
        column_index: usize,
    ) -> Result<Box<ArrowPageWriter>> {
        let column_path = column_descriptor.path().string();
        let page_encryptor = PageEncryptor::create_if_column_encrypted(
            &self.file_encryptor,
            self.row_group_index,
            column_index,
            &column_path,
        )?;
        Ok(Box::new(
            ArrowPageWriter::default().with_encryptor(page_encryptor),
        ))
    }

    #[cfg(not(feature = "encryption"))]
    fn create_page_writer(
        &self,
        _column_descriptor: &ColumnDescPtr,
        _column_index: usize,
    ) -> Result<Box<ArrowPageWriter>> {
        Ok(Box::<ArrowPageWriter>::default())
    }

    fn get_arrow_column_writer(
        &self,
        data_type: &ArrowDataType,
        props: &WriterPropertiesPtr,
        leaves: &mut Iter<'_, ColumnDescPtr>,
        out: &mut Vec<ArrowColumnWriter>,
    ) -> Result<()> {
        let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
            let page_writer = self.create_page_writer(desc, out.len())?;
            let chunk = page_writer.buffer.clone();
            let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
            Ok(ArrowColumnWriter {
                chunk,
                writer: ArrowColumnWriterImpl::Column(writer),
            })
        };

        let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
            let page_writer = self.create_page_writer(desc, out.len())?;
            let chunk = page_writer.buffer.clone();
            let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
            Ok(ArrowColumnWriter {
                chunk,
                writer: ArrowColumnWriterImpl::ByteArray(writer),
            })
        };

        match data_type {
            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
                out.push(col(leaves.next().unwrap())?)
            }
            ArrowDataType::LargeBinary
            | ArrowDataType::Binary
            | ArrowDataType::Utf8
            | ArrowDataType::LargeUtf8
            | ArrowDataType::BinaryView
            | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
            ArrowDataType::List(f)
            | ArrowDataType::LargeList(f)
            | ArrowDataType::FixedSizeList(f, _) => {
                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
            }
            ArrowDataType::Struct(fields) => {
                for field in fields {
                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
                }
            }
            ArrowDataType::Map(f, _) => match f.data_type() {
                ArrowDataType::Struct(f) => {
                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
                }
                _ => unreachable!("invalid map type"),
            },
            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                ArrowDataType::Utf8
                | ArrowDataType::LargeUtf8
                | ArrowDataType::Binary
                | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
                    out.push(bytes(leaves.next().unwrap())?)
                }
                ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
                _ => out.push(col(leaves.next().unwrap())?),
            },
            _ => {
                return Err(ParquetError::NYI(format!(
                    "Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"
                )))
            }
        }
        Ok(())
    }
}

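/// Writes a single Arrow leaf column to the provided typed [`ColumnWriter`],
/// casting the Arrow data to the corresponding Parquet physical type where
/// necessary (e.g. `Date64` to 32-bit days, decimals to INT32/INT64, intervals
/// and `Float16` to fixed-length byte arrays).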
fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
    let column = levels.array().as_ref();
    let indices = levels.non_null_indices();
    match writer {
        ColumnWriter::Int32ColumnWriter(ref mut typed) => {
            match column.data_type() {
                ArrowDataType::Date64 => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Date32)?;
                    let array = arrow_cast::cast(&array, &ArrowDataType::Int32)?;

                    let array = array.as_primitive::<Int32Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt32 => {
                    let values = column.as_primitive::<UInt32Type>().values();
                    let array = values.inner().typed_data::<i32>();
                    write_primitive(typed, array, levels)
                }
                ArrowDataType::Decimal128(_, _) => {
                    let array = column
                        .as_primitive::<Decimal128Type>()
                        .unary::<_, Int32Type>(|v| v as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal256(_, _) => {
                    let array = column
                        .as_primitive::<Decimal256Type>()
                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                    ArrowDataType::Decimal128(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal128Type>()
                            .unary::<_, Int32Type>(|v| v as i32);
                        write_primitive(typed, array.values(), levels)
                    }
                    ArrowDataType::Decimal256(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal256Type>()
                            .unary::<_, Int32Type>(|v| v.as_i128() as i32);
                        write_primitive(typed, array.values(), levels)
                    }
                    _ => {
                        let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
                        let array = array.as_primitive::<Int32Type>();
                        write_primitive(typed, array.values(), levels)
                    }
                },
                _ => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
                    let array = array.as_primitive::<Int32Type>();
                    write_primitive(typed, array.values(), levels)
                }
            }
        }
        ColumnWriter::BoolColumnWriter(ref mut typed) => {
            let array = column.as_boolean();
            typed.write_batch(
                get_bool_array_slice(array, indices).as_slice(),
                levels.def_levels(),
                levels.rep_levels(),
            )
        }
        ColumnWriter::Int64ColumnWriter(ref mut typed) => {
            match column.data_type() {
                ArrowDataType::Date64 => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;

                    let array = array.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Int64 => {
                    let array = column.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt64 => {
                    let values = column.as_primitive::<UInt64Type>().values();
                    let array = values.inner().typed_data::<i64>();
                    write_primitive(typed, array, levels)
                }
                ArrowDataType::Decimal128(_, _) => {
                    let array = column
                        .as_primitive::<Decimal128Type>()
                        .unary::<_, Int64Type>(|v| v as i64);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal256(_, _) => {
                    let array = column
                        .as_primitive::<Decimal256Type>()
                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                    ArrowDataType::Decimal128(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal128Type>()
                            .unary::<_, Int64Type>(|v| v as i64);
                        write_primitive(typed, array.values(), levels)
                    }
                    ArrowDataType::Decimal256(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal256Type>()
                            .unary::<_, Int64Type>(|v| v.as_i128() as i64);
                        write_primitive(typed, array.values(), levels)
                    }
                    _ => {
                        let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
                        let array = array.as_primitive::<Int64Type>();
                        write_primitive(typed, array.values(), levels)
                    }
                },
                _ => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
                    let array = array.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
            }
        }
        ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
            unreachable!("Currently unreachable because data type not supported")
        }
        ColumnWriter::FloatColumnWriter(ref mut typed) => {
            let array = column.as_primitive::<Float32Type>();
            write_primitive(typed, array.values(), levels)
        }
        ColumnWriter::DoubleColumnWriter(ref mut typed) => {
            let array = column.as_primitive::<Float64Type>();
            write_primitive(typed, array.values(), levels)
        }
        ColumnWriter::ByteArrayColumnWriter(_) => {
            unreachable!("should use ByteArrayWriter")
        }
        ColumnWriter::FixedLenByteArrayColumnWriter(ref mut typed) => {
            let bytes = match column.data_type() {
                ArrowDataType::Interval(interval_unit) => match interval_unit {
                    IntervalUnit::YearMonth => {
                        let array = column
                            .as_any()
                            .downcast_ref::<arrow_array::IntervalYearMonthArray>()
                            .unwrap();
                        get_interval_ym_array_slice(array, indices)
                    }
                    IntervalUnit::DayTime => {
                        let array = column
                            .as_any()
                            .downcast_ref::<arrow_array::IntervalDayTimeArray>()
                            .unwrap();
                        get_interval_dt_array_slice(array, indices)
                    }
                    _ => {
                        return Err(ParquetError::NYI(format!(
                            "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
                        )));
                    }
                },
                ArrowDataType::FixedSizeBinary(_) => {
                    let array = column
                        .as_any()
                        .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
                        .unwrap();
                    get_fsb_array_slice(array, indices)
                }
                ArrowDataType::Decimal128(_, _) => {
                    let array = column.as_primitive::<Decimal128Type>();
                    get_decimal_128_array_slice(array, indices)
                }
                ArrowDataType::Decimal256(_, _) => {
                    let array = column
                        .as_any()
                        .downcast_ref::<arrow_array::Decimal256Array>()
                        .unwrap();
                    get_decimal_256_array_slice(array, indices)
                }
                ArrowDataType::Float16 => {
                    let array = column.as_primitive::<Float16Type>();
                    get_float_16_array_slice(array, indices)
                }
                _ => {
                    return Err(ParquetError::NYI(
                        "Attempting to write an Arrow type that is not yet implemented".to_string(),
                    ));
                }
            };
            typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
        }
    }
}

fn write_primitive<E: ColumnValueEncoder>(
    writer: &mut GenericColumnWriter<E>,
    values: &E::Values,
    levels: &ArrayLevels,
) -> Result<usize> {
    writer.write_batch_internal(
        values,
        Some(levels.non_null_indices()),
        levels.def_levels(),
        levels.rep_levels(),
        None,
        None,
        None,
    )
}

fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        values.push(array.value(*i))
    }
    values
}

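/// Returns 12-byte FIXED_LEN_BYTE_ARRAY values for a year-month interval
/// array: the month count occupies the first four little-endian bytes and the
/// day and millisecond fields are zeroed, matching the Parquet INTERVAL
/// logical type layout.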
fn get_interval_ym_array_slice(
    array: &arrow_array::IntervalYearMonthArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let mut value = array.value(*i).to_le_bytes().to_vec();
        let mut suffix = vec![0; 8];
        value.append(&mut suffix);
        values.push(FixedLenByteArray::from(ByteArray::from(value)))
    }
    values
}

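/// Returns 12-byte FIXED_LEN_BYTE_ARRAY values for a day-time interval array:
/// the month field is zeroed and the day and millisecond counts occupy the
/// remaining two little-endian four-byte slots of the Parquet INTERVAL layout.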
fn get_interval_dt_array_slice(
    array: &arrow_array::IntervalDayTimeArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let mut out = [0; 12];
        let value = array.value(*i);
        out[4..8].copy_from_slice(&value.days.to_le_bytes());
        out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
        values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
    }
    values
}

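/// Returns the big-endian bytes of each 128-bit decimal value, truncated to
/// the minimal byte width implied by the array's precision (see
/// `decimal_length_from_precision`); `get_decimal_256_array_slice` below does
/// the same for 256-bit decimals.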
fn get_decimal_128_array_slice(
    array: &arrow_array::Decimal128Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    let size = decimal_length_from_precision(array.precision());
    for i in indices {
        let as_be_bytes = array.value(*i).to_be_bytes();
        let resized_value = as_be_bytes[(16 - size)..].to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
    }
    values
}

fn get_decimal_256_array_slice(
    array: &arrow_array::Decimal256Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    let size = decimal_length_from_precision(array.precision());
    for i in indices {
        let as_be_bytes = array.value(*i).to_be_bytes();
        let resized_value = as_be_bytes[(32 - size)..].to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
    }
    values
}

fn get_float_16_array_slice(
    array: &arrow_array::Float16Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let value = array.value(*i).to_le_bytes().to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(value)));
    }
    values
}

fn get_fsb_array_slice(
    array: &arrow_array::FixedSizeBinaryArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let value = array.value(*i).to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(value)))
    }
    values
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::fs::File;
    use std::io::Seek;

    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
    use crate::arrow::ARROW_SCHEMA_META_KEY;
    use crate::column::page::{Page, PageReader};
    use crate::file::page_encoding_stats::PageEncodingStats;
    use crate::file::reader::SerializedPageReader;
    use crate::format::PageHeader;
    use crate::schema::types::ColumnPath;
    use crate::thrift::TCompactSliceInputProtocol;
    use arrow::datatypes::ToByteSlice;
    use arrow::datatypes::{DataType, Schema};
    use arrow::error::Result as ArrowResult;
    use arrow::util::data_gen::create_random_array;
    use arrow::util::pretty::pretty_format_batches;
    use arrow::{array::*, buffer::Buffer};
    use arrow_buffer::{i256, IntervalDayTime, IntervalMonthDayNano, NullBuffer};
    use arrow_schema::Fields;
    use half::f16;
    use num::{FromPrimitive, ToPrimitive};

    use crate::basic::Encoding;
    use crate::data_type::AsBytes;
    use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
    use crate::file::page_index::index::Index;
    use crate::file::properties::{
        BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
    };
    use crate::file::serialized_reader::ReadOptionsBuilder;
    use crate::file::{
        reader::{FileReader, SerializedFileReader},
        statistics::Statistics,
    };

    #[test]
    fn arrow_writer() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, true),
        ]);

        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
        let mut buffer = vec![];

        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
        writer.write(expected_batch).unwrap();
        writer.close().unwrap();

        buffer
    }

    fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
        let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
        writer.write(expected_batch).unwrap();
        writer.into_inner().unwrap()
    }

    #[test]
    fn roundtrip_bytes() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, true),
        ]));

        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);

        let expected_batch =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();

        for buffer in [
            get_bytes_after_close(schema.clone(), &expected_batch),
            get_bytes_by_into_inner(schema, &expected_batch),
        ] {
            let cursor = Bytes::from(buffer);
            let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();

            let actual_batch = record_batch_reader
                .next()
                .expect("No batch found")
                .expect("Unable to get batch");

            assert_eq!(expected_batch.schema(), actual_batch.schema());
            assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
            assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
            for i in 0..expected_batch.num_columns() {
                let expected_data = expected_batch.column(i).to_data();
                let actual_data = actual_batch.column(i).to_data();

                assert_eq!(expected_data, actual_data);
            }
        }
    }

    #[test]
    fn arrow_writer_non_null() {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn arrow_writer_list() {
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
            true,
        )]);

        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());

        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
            DataType::Int32,
            false,
        ))))
        .len(5)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        assert_eq!(batch.column(0).null_count(), 1);

        roundtrip(batch, None);
    }

    #[test]
    fn arrow_writer_list_non_null() {
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
            false,
        )]);

        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());

        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
            DataType::Int32,
            false,
        ))))
        .len(5)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        assert_eq!(batch.column(0).null_count(), 0);

        roundtrip(batch, None);
    }

    #[test]
    fn arrow_writer_binary() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let binary_field = Field::new("b", DataType::Binary, false);
        let schema = Schema::new(vec![string_field, binary_field]);

        let raw_string_values = vec!["foo", "bar", "baz", "quux"];
        let raw_binary_values = [
            b"foo".to_vec(),
            b"bar".to_vec(),
            b"baz".to_vec(),
            b"quux".to_vec(),
        ];
        let raw_binary_value_refs = raw_binary_values
            .iter()
            .map(|x| x.as_slice())
            .collect::<Vec<_>>();

        let string_values = StringArray::from(raw_string_values.clone());
        let binary_values = BinaryArray::from(raw_binary_value_refs);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(string_values), Arc::new(binary_values)],
        )
        .unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn arrow_writer_binary_view() {
        let string_field = Field::new("a", DataType::Utf8View, false);
        let binary_field = Field::new("b", DataType::BinaryView, false);
        let nullable_string_field = Field::new("a", DataType::Utf8View, true);
        let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);

        let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
        let raw_binary_values = vec![
            b"foo".to_vec(),
            b"bar".to_vec(),
            b"large payload over 12 bytes".to_vec(),
            b"lulu".to_vec(),
        ];
        let nullable_string_values =
            vec![Some("foo"), None, Some("large payload over 12 bytes"), None];

        let string_view_values = StringViewArray::from(raw_string_values);
        let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
        let nullable_string_view_values = StringViewArray::from(nullable_string_values);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![
                Arc::new(string_view_values),
                Arc::new(binary_view_values),
                Arc::new(nullable_string_view_values),
            ],
        )
        .unwrap();

        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
        roundtrip(batch, None);
    }

    fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
        let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
        let schema = Schema::new(vec![decimal_field]);

        let decimal_values = vec![10_000, 50_000, 0, -100]
            .into_iter()
            .map(Some)
            .collect::<Decimal128Array>()
            .with_precision_and_scale(precision, scale)
            .unwrap();

        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
    }

    #[test]
    fn arrow_writer_decimal() {
        let batch_int32_decimal = get_decimal_batch(5, 2);
        roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
        let batch_int64_decimal = get_decimal_batch(12, 2);
        roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
        let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
        roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn arrow_writer_complex() {
        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
        let struct_field_g = Arc::new(Field::new_list(
            "g",
            Field::new_list_field(DataType::Int16, true),
            false,
        ));
        let struct_field_h = Arc::new(Field::new_list(
            "h",
            Field::new_list_field(DataType::Int16, false),
            true,
        ));
        let struct_field_e = Arc::new(Field::new_struct(
            "e",
            vec![
                struct_field_f.clone(),
                struct_field_g.clone(),
                struct_field_h.clone(),
            ],
            false,
        ));
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, true),
            Field::new_struct(
                "c",
                vec![struct_field_d.clone(), struct_field_e.clone()],
                false,
            ),
        ]);

        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);

        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());

        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
            .len(5)
            .add_buffer(g_value_offsets.clone())
            .add_child_data(g_value.to_data())
            .build()
            .unwrap();
        let g = ListArray::from(g_list_data);
        let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
            .len(5)
            .add_buffer(g_value_offsets)
            .add_child_data(g_value.to_data())
            .null_bit_buffer(Some(Buffer::from([0b00011011])))
            .build()
            .unwrap();
        let h = ListArray::from(h_list_data);

        let e = StructArray::from(vec![
            (struct_field_f, Arc::new(f) as ArrayRef),
            (struct_field_g, Arc::new(g) as ArrayRef),
            (struct_field_h, Arc::new(h) as ArrayRef),
        ]);

        let c = StructArray::from(vec![
            (struct_field_d, Arc::new(d) as ArrayRef),
            (struct_field_e, Arc::new(e) as ArrayRef),
        ]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
        )
        .unwrap();

        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
        roundtrip(batch, Some(SMALL_SIZE / 3));
    }

    #[test]
    fn arrow_writer_complex_mixed() {
        let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
        let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
        let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
        let schema = Schema::new(vec![Field::new(
            "some_nested_object",
            DataType::Struct(Fields::from(vec![
                offset_field.clone(),
                partition_field.clone(),
                topic_field.clone(),
            ])),
            false,
        )]);

        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
        let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);

        let some_nested_object = StructArray::from(vec![
            (offset_field, Arc::new(offset) as ArrayRef),
            (partition_field, Arc::new(partition) as ArrayRef),
            (topic_field, Arc::new(topic) as ArrayRef),
        ]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn arrow_writer_map() {
        let json_content = r#"
        {"stocks":{"long": "$AAA", "short": "$BBB"}}
        {"stocks":{"long": null, "long": "$CCC", "short": null}}
        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
        "#;
        let entries_struct_type = DataType::Struct(Fields::from(vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, true),
        ]));
        let stocks_field = Field::new(
            "stocks",
            DataType::Map(
                Arc::new(Field::new("entries", entries_struct_type, false)),
                false,
            ),
            true,
        );
        let schema = Arc::new(Schema::new(vec![stocks_field]));
        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();

        let batch = reader.next().unwrap().unwrap();
        roundtrip(batch, None);
    }

    #[test]
    fn arrow_writer_2_level_struct() {
        let field_c = Field::new("c", DataType::Int32, true);
        let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
        let type_a = DataType::Struct(vec![field_b.clone()].into());
        let field_a = Field::new("a", type_a, true);
        let schema = Schema::new(vec![field_a.clone()]);

        let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100111])))
            .add_child_data(c.into_data())
            .build()
            .unwrap();
        let b = StructArray::from(b_data);
        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00101111])))
            .add_child_data(b.into_data())
            .build()
            .unwrap();
        let a = StructArray::from(a_data);

        assert_eq!(a.null_count(), 1);
        assert_eq!(a.column(0).null_count(), 2);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn arrow_writer_2_level_struct_non_null() {
        let field_c = Field::new("c", DataType::Int32, false);
        let type_b = DataType::Struct(vec![field_c].into());
        let field_b = Field::new("b", type_b.clone(), false);
        let type_a = DataType::Struct(vec![field_b].into());
        let field_a = Field::new("a", type_a.clone(), false);
        let schema = Schema::new(vec![field_a]);

        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
        let b_data = ArrayDataBuilder::new(type_b)
            .len(6)
            .add_child_data(c.into_data())
            .build()
            .unwrap();
        let b = StructArray::from(b_data);
        let a_data = ArrayDataBuilder::new(type_a)
            .len(6)
            .add_child_data(b.into_data())
            .build()
            .unwrap();
        let a = StructArray::from(a_data);

        assert_eq!(a.null_count(), 0);
        assert_eq!(a.column(0).null_count(), 0);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn arrow_writer_2_level_struct_mixed_null() {
        let field_c = Field::new("c", DataType::Int32, false);
        let type_b = DataType::Struct(vec![field_c].into());
        let field_b = Field::new("b", type_b.clone(), true);
        let type_a = DataType::Struct(vec![field_b].into());
        let field_a = Field::new("a", type_a.clone(), false);
        let schema = Schema::new(vec![field_a]);

        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
        let b_data = ArrayDataBuilder::new(type_b)
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100111])))
            .add_child_data(c.into_data())
            .build()
            .unwrap();
        let b = StructArray::from(b_data);
        let a_data = ArrayDataBuilder::new(type_a)
            .len(6)
            .add_child_data(b.into_data())
            .build()
            .unwrap();
        let a = StructArray::from(a_data);

        assert_eq!(a.null_count(), 0);
        assert_eq!(a.column(0).null_count(), 2);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn arrow_writer_2_level_struct_mixed_null_2() {
        let field_c = Field::new("c", DataType::Int32, false);
        let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
        let field_e = Field::new(
            "e",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            false,
        );

        let field_b = Field::new(
            "b",
            DataType::Struct(vec![field_c, field_d, field_e].into()),
            false,
        );
        let type_a = DataType::Struct(vec![field_b.clone()].into());
        let field_a = Field::new("a", type_a, true);
        let schema = Schema::new(vec![field_a.clone()]);

        let c = Int32Array::from_iter_values(0..6);
        let d = FixedSizeBinaryArray::try_from_iter(
            ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
        )
        .expect("four byte values");
        let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
            .len(6)
            .add_child_data(c.into_data())
            .add_child_data(d.into_data())
            .add_child_data(e.into_data())
            .build()
            .unwrap();
        let b = StructArray::from(b_data);
        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100101])))
            .add_child_data(b.into_data())
            .build()
            .unwrap();
        let a = StructArray::from(a_data);

        assert_eq!(a.null_count(), 3);
        assert_eq!(a.column(0).null_count(), 0);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn test_fixed_size_binary_in_dict() {
        fn test_fixed_size_binary_in_dict_inner<K>()
        where
            K: ArrowDictionaryKeyType,
            K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
            <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
        {
            let field = Field::new(
                "a",
                DataType::Dictionary(
                    Box::new(K::DATA_TYPE),
                    Box::new(DataType::FixedSizeBinary(4)),
                ),
                false,
            );
            let schema = Schema::new(vec![field]);

            let keys: Vec<K::Native> = vec![
                K::Native::try_from(0u8).unwrap(),
                K::Native::try_from(0u8).unwrap(),
                K::Native::try_from(1u8).unwrap(),
            ];
            let keys = PrimitiveArray::<K>::from_iter_values(keys);
            let values = FixedSizeBinaryArray::try_from_iter(
                vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
            )
            .unwrap();

            let data = DictionaryArray::<K>::new(keys, Arc::new(values));
            let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
            roundtrip(batch, None);
        }

        test_fixed_size_binary_in_dict_inner::<UInt8Type>();
        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
        test_fixed_size_binary_in_dict_inner::<UInt32Type>();
        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
        test_fixed_size_binary_in_dict_inner::<Int8Type>();
        test_fixed_size_binary_in_dict_inner::<Int16Type>();
        test_fixed_size_binary_in_dict_inner::<Int32Type>();
        test_fixed_size_binary_in_dict_inner::<Int64Type>();
    }

    #[test]
    fn test_empty_dict() {
        let struct_fields = Fields::from(vec![Field::new(
            "dict",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            false,
        )]);

        let schema = Schema::new(vec![Field::new_struct(
            "struct",
            struct_fields.clone(),
            true,
        )]);
        let dictionary = Arc::new(DictionaryArray::new(
            Int32Array::new_null(5),
            Arc::new(StringArray::new_null(0)),
        ));

        let s = StructArray::new(
            struct_fields,
            vec![dictionary],
            Some(NullBuffer::new_null(5)),
        );

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
        roundtrip(batch, None);
    }

    #[test]
    fn arrow_writer_page_size() {
        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));

        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);

        for i in 0..10 {
            let value = i
                .to_string()
                .repeat(10)
                .chars()
                .take(10)
                .collect::<String>();

            builder.append_value(value);
        }

        let array = Arc::new(builder.finish());

        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        let file = tempfile::tempfile().unwrap();

        let props = WriterProperties::builder()
            .set_data_page_size_limit(1)
            .set_dictionary_page_size_limit(1)
            .set_write_batch_size(1)
            .build();

        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
                .expect("Unable to write file");
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader =
            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();

        let column = reader.metadata().row_group(0).columns();

        assert_eq!(column.len(), 1);

        assert!(
            column[0].dictionary_page_offset().is_some(),
            "Expected a dictionary page"
        );

        assert!(reader.metadata().offset_index().is_some());
        let offset_indexes = &reader.metadata().offset_index().unwrap()[0];

        let page_locations = offset_indexes[0].page_locations.clone();

        assert_eq!(
            page_locations.len(),
            10,
            "Expected 10 pages but got {page_locations:#?}"
        );
    }

    #[test]
    fn arrow_writer_float_nans() {
        let f16_field = Field::new("a", DataType::Float16, false);
        let f32_field = Field::new("b", DataType::Float32, false);
        let f64_field = Field::new("c", DataType::Float64, false);
        let schema = Schema::new(vec![f16_field, f32_field, f64_field]);

        let f16_values = (0..MEDIUM_SIZE)
            .map(|i| {
                Some(if i % 2 == 0 {
                    f16::NAN
                } else {
                    f16::from_f32(i as f32)
                })
            })
            .collect::<Float16Array>();

        let f32_values = (0..MEDIUM_SIZE)
            .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
            .collect::<Float32Array>();

        let f64_values = (0..MEDIUM_SIZE)
            .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
            .collect::<Float64Array>();

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![
                Arc::new(f16_values),
                Arc::new(f32_values),
                Arc::new(f64_values),
            ],
        )
        .unwrap();

        roundtrip(batch, None);
    }

    const SMALL_SIZE: usize = 7;
    const MEDIUM_SIZE: usize = 63;

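    /// Writes `expected_batch` with both writer versions (and the optional
    /// maximum row group size), reads each file back, asserts the decoded data
    /// matches, and returns the temporary files for further inspection.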
    fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<File> {
        let mut files = vec![];
        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
            let mut props = WriterProperties::builder().set_writer_version(version);

            if let Some(size) = max_row_group_size {
                props = props.set_max_row_group_size(size)
            }

            let props = props.build();
            files.push(roundtrip_opts(&expected_batch, props))
        }
        files
    }

    fn roundtrip_opts_with_array_validation<F>(
        expected_batch: &RecordBatch,
        props: WriterProperties,
        validate: F,
    ) -> File
    where
        F: Fn(&ArrayData, &ArrayData),
    {
        let file = tempfile::tempfile().unwrap();

        let mut writer = ArrowWriter::try_new(
            file.try_clone().unwrap(),
            expected_batch.schema(),
            Some(props),
        )
        .expect("Unable to write file");
        writer.write(expected_batch).unwrap();
        writer.close().unwrap();

        let mut record_batch_reader =
            ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();

        let actual_batch = record_batch_reader
            .next()
            .expect("No batch found")
            .expect("Unable to get batch");

        assert_eq!(expected_batch.schema(), actual_batch.schema());
        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
        for i in 0..expected_batch.num_columns() {
            let expected_data = expected_batch.column(i).to_data();
            let actual_data = actual_batch.column(i).to_data();
            validate(&expected_data, &actual_data);
        }

        file
    }

    fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> File {
        roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
            a.validate_full().expect("valid expected data");
            b.validate_full().expect("valid actual data");
            assert_eq!(a, b)
        })
    }

    struct RoundTripOptions {
        values: ArrayRef,
        schema: SchemaRef,
        bloom_filter: bool,
        bloom_filter_position: BloomFilterPosition,
    }

    impl RoundTripOptions {
        fn new(values: ArrayRef, nullable: bool) -> Self {
            let data_type = values.data_type().clone();
            let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
            Self {
                values,
                schema: Arc::new(schema),
                bloom_filter: false,
                bloom_filter_position: BloomFilterPosition::AfterRowGroup,
            }
        }
    }

    fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<File> {
        one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
    }

    fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<File> {
        let mut options = RoundTripOptions::new(values, false);
        options.schema = schema;
        one_column_roundtrip_with_options(options)
    }

    fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<File> {
        let RoundTripOptions {
            values,
            schema,
            bloom_filter,
            bloom_filter_position,
        } = options;

        let encodings = match values.data_type() {
            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
                vec![
                    Encoding::PLAIN,
                    Encoding::DELTA_BYTE_ARRAY,
                    Encoding::DELTA_LENGTH_BYTE_ARRAY,
                ]
            }
            DataType::Int64
            | DataType::Int32
            | DataType::Int16
            | DataType::Int8
            | DataType::UInt64
            | DataType::UInt32
            | DataType::UInt16
            | DataType::UInt8 => vec![
                Encoding::PLAIN,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
            DataType::Float32 | DataType::Float64 => {
                vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
            }
            _ => vec![Encoding::PLAIN],
        };

        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();

        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];

        let mut files = vec![];
        for dictionary_size in [0, 1, 1024] {
            for encoding in &encodings {
                for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
                    for row_group_size in row_group_sizes {
                        let props = WriterProperties::builder()
                            .set_writer_version(version)
                            .set_max_row_group_size(row_group_size)
                            .set_dictionary_enabled(dictionary_size != 0)
                            .set_dictionary_page_size_limit(dictionary_size.max(1))
                            .set_encoding(*encoding)
                            .set_bloom_filter_enabled(bloom_filter)
                            .set_bloom_filter_position(bloom_filter_position)
                            .build();

                        files.push(roundtrip_opts(&expected_batch, props))
                    }
                }
            }
        }
        files
    }

    fn values_required<A, I>(iter: I) -> Vec<File>
    where
        A: From<Vec<I::Item>> + Array + 'static,
        I: IntoIterator,
    {
        let raw_values: Vec<_> = iter.into_iter().collect();
        let values = Arc::new(A::from(raw_values));
        one_column_roundtrip(values, false)
    }

    fn values_optional<A, I>(iter: I) -> Vec<File>
    where
        A: From<Vec<Option<I::Item>>> + Array + 'static,
        I: IntoIterator,
    {
        let optional_raw_values: Vec<_> = iter
            .into_iter()
            .enumerate()
            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
            .collect();
        let optional_values = Arc::new(A::from(optional_raw_values));
        one_column_roundtrip(optional_values, true)
    }

    fn required_and_optional<A, I>(iter: I)
    where
        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
        I: IntoIterator + Clone,
    {
        values_required::<A, I>(iter.clone());
        values_optional::<A, I>(iter);
    }

    fn check_bloom_filter<T: AsBytes>(
        files: Vec<File>,
        file_column: String,
        positive_values: Vec<T>,
        negative_values: Vec<T>,
    ) {
        files.into_iter().take(1).for_each(|file| {
2307 let file_reader = SerializedFileReader::new_with_options(
2308 file,
2309 ReadOptionsBuilder::new()
2310 .with_reader_properties(
2311 ReaderProperties::builder()
2312 .set_read_bloom_filter(true)
2313 .build(),
2314 )
2315 .build(),
2316 )
2317 .expect("Unable to open file as Parquet");
2318 let metadata = file_reader.metadata();
2319
2320 let mut bloom_filters: Vec<_> = vec![];
2322 for (ri, row_group) in metadata.row_groups().iter().enumerate() {
2323 if let Some((column_index, _)) = row_group
2324 .columns()
2325 .iter()
2326 .enumerate()
2327 .find(|(_, column)| column.column_path().string() == file_column)
2328 {
2329 let row_group_reader = file_reader
2330 .get_row_group(ri)
2331 .expect("Unable to read row group");
2332 if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
2333 bloom_filters.push(sbbf.clone());
2334 } else {
2335 panic!("No bloom filter for column named {file_column} found");
2336 }
2337 } else {
2338 panic!("No column named {file_column} found");
2339 }
2340 }
2341
2342 positive_values.iter().for_each(|value| {
2343 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2344 assert!(
2345 found.is_some(),
2346 "{}",
2347 format!("Value {:?} should be in bloom filter", value.as_bytes())
2348 );
2349 });
2350
2351 negative_values.iter().for_each(|value| {
2352 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2353 assert!(
2354 found.is_none(),
2355 "{}",
2356 format!("Value {:?} should not be in bloom filter", value.as_bytes())
2357 );
2358 });
2359 });
2360 }
2361
2362 #[test]
2363 fn all_null_primitive_single_column() {
2364 let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
2365 one_column_roundtrip(values, true);
2366 }
2367 #[test]
2368 fn null_single_column() {
2369 let values = Arc::new(NullArray::new(SMALL_SIZE));
2370 one_column_roundtrip(values, true);
2371 }
2373
2374 #[test]
2375 fn bool_single_column() {
2376 required_and_optional::<BooleanArray, _>(
2377 [true, false].iter().cycle().copied().take(SMALL_SIZE),
2378 );
2379 }
2380
2381 #[test]
2382 fn bool_large_single_column() {
2383 let values = Arc::new(
2384 [None, Some(true), Some(false)]
2385 .iter()
2386 .cycle()
2387 .copied()
2388 .take(200_000)
2389 .collect::<BooleanArray>(),
2390 );
2391 let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
2392 let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2393 let file = tempfile::tempfile().unwrap();
2394
2395 let mut writer =
2396 ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
2397 .expect("Unable to write file");
2398 writer.write(&expected_batch).unwrap();
2399 writer.close().unwrap();
2400 }
2401
2402 #[test]
2403 fn check_page_offset_index_with_nan() {
2404 let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
2405 let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
2406 let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2407
2408 let mut out = Vec::with_capacity(1024);
2409 let mut writer =
2410 ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
2411 writer.write(&batch).unwrap();
2412 let file_meta_data = writer.close().unwrap();
2413 for row_group in file_meta_data.row_groups {
2414 for column in row_group.columns {
2415 assert!(column.offset_index_offset.is_some());
2416 assert!(column.offset_index_length.is_some());
2417                 assert!(column.column_index_offset.is_none()); // all-NaN values yield no usable min/max, so no column index
2418 assert!(column.column_index_length.is_none());
2419 }
2420 }
2421 }
2422
2423 #[test]
2424 fn i8_single_column() {
2425 required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
2426 }
2427
2428 #[test]
2429 fn i16_single_column() {
2430 required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
2431 }
2432
2433 #[test]
2434 fn i32_single_column() {
2435 required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
2436 }
2437
2438 #[test]
2439 fn i64_single_column() {
2440 required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
2441 }
2442
2443 #[test]
2444 fn u8_single_column() {
2445 required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
2446 }
2447
2448 #[test]
2449 fn u16_single_column() {
2450 required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
2451 }
2452
2453 #[test]
2454 fn u32_single_column() {
2455 required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
2456 }
2457
2458 #[test]
2459 fn u64_single_column() {
2460 required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
2461 }
2462
2463 #[test]
2464 fn f32_single_column() {
2465 required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
2466 }
2467
2468 #[test]
2469 fn f64_single_column() {
2470 required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
2471 }
2472
2473 #[test]
2478 fn timestamp_second_single_column() {
2479 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2480 let values = Arc::new(TimestampSecondArray::from(raw_values));
2481
2482 one_column_roundtrip(values, false);
2483 }
2484
2485 #[test]
2486 fn timestamp_millisecond_single_column() {
2487 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2488 let values = Arc::new(TimestampMillisecondArray::from(raw_values));
2489
2490 one_column_roundtrip(values, false);
2491 }
2492
2493 #[test]
2494 fn timestamp_microsecond_single_column() {
2495 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2496 let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
2497
2498 one_column_roundtrip(values, false);
2499 }
2500
2501 #[test]
2502 fn timestamp_nanosecond_single_column() {
2503 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2504 let values = Arc::new(TimestampNanosecondArray::from(raw_values));
2505
2506 one_column_roundtrip(values, false);
2507 }
2508
2509 #[test]
2510 fn date32_single_column() {
2511 required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
2512 }
2513
2514 #[test]
2515 fn date64_single_column() {
2516 required_and_optional::<Date64Array, _>(
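2517             // keep Date64 values to whole days (multiples of 86_400_000 ms) so they round-trip losslessly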
2518 (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
2519 );
2520 }
2521
2522 #[test]
2523 fn time32_second_single_column() {
2524 required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
2525 }
2526
2527 #[test]
2528 fn time32_millisecond_single_column() {
2529 required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
2530 }
2531
2532 #[test]
2533 fn time64_microsecond_single_column() {
2534 required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
2535 }
2536
2537 #[test]
2538 fn time64_nanosecond_single_column() {
2539 required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
2540 }
2541
2542 #[test]
2543 fn duration_second_single_column() {
2544 required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
2545 }
2546
2547 #[test]
2548 fn duration_millisecond_single_column() {
2549 required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
2550 }
2551
2552 #[test]
2553 fn duration_microsecond_single_column() {
2554 required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
2555 }
2556
2557 #[test]
2558 fn duration_nanosecond_single_column() {
2559 required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
2560 }
2561
2562 #[test]
2563 fn interval_year_month_single_column() {
2564 required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
2565 }
2566
2567 #[test]
2568 fn interval_day_time_single_column() {
2569 required_and_optional::<IntervalDayTimeArray, _>(vec![
2570 IntervalDayTime::new(0, 1),
2571 IntervalDayTime::new(0, 3),
2572 IntervalDayTime::new(3, -2),
2573 IntervalDayTime::new(-200, 4),
2574 ]);
2575 }
2576
2577 #[test]
2578 #[should_panic(
2579 expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
2580 )]
2581 fn interval_month_day_nano_single_column() {
2582 required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
2583 IntervalMonthDayNano::new(0, 1, 5),
2584 IntervalMonthDayNano::new(0, 3, 2),
2585 IntervalMonthDayNano::new(3, -2, -5),
2586 IntervalMonthDayNano::new(-200, 4, -1),
2587 ]);
2588 }
2589
2590 #[test]
2591 fn binary_single_column() {
2592 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2593 let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2594 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2595
2596 values_required::<BinaryArray, _>(many_vecs_iter);
2598 }
2599
2600 #[test]
2601 fn binary_view_single_column() {
2602 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2603 let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2604 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2605
2606 values_required::<BinaryViewArray, _>(many_vecs_iter);
2608 }
2609
2610 #[test]
2611 fn i32_column_bloom_filter_at_end() {
2612 let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2613 let mut options = RoundTripOptions::new(array, false);
2614 options.bloom_filter = true;
2615 options.bloom_filter_position = BloomFilterPosition::End;
2616
2617 let files = one_column_roundtrip_with_options(options);
2618 check_bloom_filter(
2619 files,
2620 "col".to_string(),
2621 (0..SMALL_SIZE as i32).collect(),
2622 (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2623 );
2624 }
2625
2626 #[test]
2627 fn i32_column_bloom_filter() {
2628 let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2629 let mut options = RoundTripOptions::new(array, false);
2630 options.bloom_filter = true;
2631
2632 let files = one_column_roundtrip_with_options(options);
2633 check_bloom_filter(
2634 files,
2635 "col".to_string(),
2636 (0..SMALL_SIZE as i32).collect(),
2637 (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2638 );
2639 }
2640
2641 #[test]
2642 fn binary_column_bloom_filter() {
2643 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2644 let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2645 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2646
2647 let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
2648 let mut options = RoundTripOptions::new(array, false);
2649 options.bloom_filter = true;
2650
2651 let files = one_column_roundtrip_with_options(options);
2652 check_bloom_filter(
2653 files,
2654 "col".to_string(),
2655 many_vecs,
2656 vec![vec![(SMALL_SIZE + 1) as u8]],
2657 );
2658 }
2659
2660 #[test]
2661 fn empty_string_null_column_bloom_filter() {
2662 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2663 let raw_strs = raw_values.iter().map(|s| s.as_str());
2664
2665 let array = Arc::new(StringArray::from_iter_values(raw_strs));
2666 let mut options = RoundTripOptions::new(array, false);
2667 options.bloom_filter = true;
2668
2669 let files = one_column_roundtrip_with_options(options);
2670
2671 let optional_raw_values: Vec<_> = raw_values
2672 .iter()
2673 .enumerate()
2674 .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
2675 .collect();
2676 check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
2678 }
2679
2680 #[test]
2681 fn large_binary_single_column() {
2682 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2683 let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2684 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2685
2686 values_required::<LargeBinaryArray, _>(many_vecs_iter);
2688 }
2689
2690 #[test]
2691 fn fixed_size_binary_single_column() {
2692 let mut builder = FixedSizeBinaryBuilder::new(4);
2693 builder.append_value(b"0123").unwrap();
2694 builder.append_null();
2695 builder.append_value(b"8910").unwrap();
2696 builder.append_value(b"1112").unwrap();
2697 let array = Arc::new(builder.finish());
2698
2699 one_column_roundtrip(array, true);
2700 }
2701
2702 #[test]
2703 fn string_single_column() {
2704 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2705 let raw_strs = raw_values.iter().map(|s| s.as_str());
2706
2707 required_and_optional::<StringArray, _>(raw_strs);
2708 }
2709
2710 #[test]
2711 fn large_string_single_column() {
2712 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2713 let raw_strs = raw_values.iter().map(|s| s.as_str());
2714
2715 required_and_optional::<LargeStringArray, _>(raw_strs);
2716 }
2717
2718 #[test]
2719 fn string_view_single_column() {
2720 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2721 let raw_strs = raw_values.iter().map(|s| s.as_str());
2722
2723 required_and_optional::<StringViewArray, _>(raw_strs);
2724 }
2725
2726 #[test]
2727 fn null_list_single_column() {
2728 let null_field = Field::new_list_field(DataType::Null, true);
2729 let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
2730
2731 let schema = Schema::new(vec![list_field]);
2732
2733 let a_values = NullArray::new(2);
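2734         // offsets [0, 0, 0, 2] with validity 0b101 encode the list [[], null, [null, null]]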
2735 let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
2736 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2737 DataType::Null,
2738 true,
2739 ))))
2740 .len(3)
2741 .add_buffer(a_value_offsets)
2742 .null_bit_buffer(Some(Buffer::from([0b00000101])))
2743 .add_child_data(a_values.into_data())
2744 .build()
2745 .unwrap();
2746
2747 let a = ListArray::from(a_list_data);
2748
2749 assert!(a.is_valid(0));
2750 assert!(!a.is_valid(1));
2751 assert!(a.is_valid(2));
2752
2753 assert_eq!(a.value(0).len(), 0);
2754 assert_eq!(a.value(2).len(), 2);
2755 assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
2756
2757 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2758 roundtrip(batch, None);
2759 }
2760
2761 #[test]
2762 fn list_single_column() {
2763 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2764 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2765 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2766 DataType::Int32,
2767 false,
2768 ))))
2769 .len(5)
2770 .add_buffer(a_value_offsets)
2771 .null_bit_buffer(Some(Buffer::from([0b00011011])))
2772 .add_child_data(a_values.into_data())
2773 .build()
2774 .unwrap();
2775
2776 assert_eq!(a_list_data.null_count(), 1);
2777
2778 let a = ListArray::from(a_list_data);
2779 let values = Arc::new(a);
2780
2781 one_column_roundtrip(values, true);
2782 }
2783
2784 #[test]
2785 fn large_list_single_column() {
2786 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2787 let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
2788 let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
2789 "large_item",
2790 DataType::Int32,
2791 true,
2792 ))))
2793 .len(5)
2794 .add_buffer(a_value_offsets)
2795 .add_child_data(a_values.into_data())
2796 .null_bit_buffer(Some(Buffer::from([0b00011011])))
2797 .build()
2798 .unwrap();
2799
2800 assert_eq!(a_list_data.null_count(), 1);
2802
2803 let a = LargeListArray::from(a_list_data);
2804 let values = Arc::new(a);
2805
2806 one_column_roundtrip(values, true);
2807 }
2808
2809 #[test]
2810 fn list_nested_nulls() {
2811 use arrow::datatypes::Int32Type;
2812 let data = vec![
2813 Some(vec![Some(1)]),
2814 Some(vec![Some(2), Some(3)]),
2815 None,
2816 Some(vec![Some(4), Some(5), None]),
2817 Some(vec![None]),
2818 Some(vec![Some(6), Some(7)]),
2819 ];
2820
2821 let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
2822 one_column_roundtrip(Arc::new(list), true);
2823
2824 let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
2825 one_column_roundtrip(Arc::new(list), true);
2826 }
2827
2828 #[test]
2829 fn struct_single_column() {
2830 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2831 let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
2832 let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
2833
2834 let values = Arc::new(s);
2835 one_column_roundtrip(values, false);
2836 }
2837
2838 #[test]
2839 fn list_and_map_coerced_names() {
2840 let list_field =
2842 Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
2843 let map_field = Field::new_map(
2844 "my_map",
2845 "entries",
2846 Field::new("keys", DataType::Int32, false),
2847 Field::new("values", DataType::Int32, true),
2848 false,
2849 true,
2850 );
2851
2852 let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
2853 let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
2854
2855 let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
2856
2857 let props = Some(WriterProperties::builder().set_coerce_types(true).build());
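2858         // coerce_types rewrites list/map child field names to the Parquet-spec names checked below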
2859 let file = tempfile::tempfile().unwrap();
2860 let mut writer =
2861 ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
2862
2863 let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
2864 writer.write(&batch).unwrap();
2865 let file_metadata = writer.close().unwrap();
2866
2867         assert_eq!(file_metadata.schema[3].name, "element"); // list child "item" coerced to "element"
2869         assert_eq!(file_metadata.schema[5].name, "key_value"); // map "entries" coerced to "key_value"
2871         assert_eq!(file_metadata.schema[6].name, "key"); // "keys" coerced to "key"
2873         assert_eq!(file_metadata.schema[7].name, "value"); // "values" coerced to "value"
2875
2876 let reader = SerializedFileReader::new(file).unwrap();
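2877         // reading the file back must also show the coerced names in the Parquet schema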
2878 let file_schema = reader.metadata().file_metadata().schema();
2879 let fields = file_schema.get_fields();
2880 let list_field = &fields[0].get_fields()[0];
2881 assert_eq!(list_field.get_fields()[0].name(), "element");
2882 let map_field = &fields[1].get_fields()[0];
2883 assert_eq!(map_field.name(), "key_value");
2884 assert_eq!(map_field.get_fields()[0].name(), "key");
2885 assert_eq!(map_field.get_fields()[1].name(), "value");
2886 }
2887
2888 #[test]
2889 fn fallback_flush_data_page() {
2890 let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
2892 let values = Arc::new(StringArray::from(raw_values));
2893 let encodings = vec![
2894 Encoding::DELTA_BYTE_ARRAY,
2895 Encoding::DELTA_LENGTH_BYTE_ARRAY,
2896 ];
2897 let data_type = values.data_type().clone();
2898 let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
2899 let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2900
2901 let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2902 let data_page_size_limit: usize = 32;
2903 let write_batch_size: usize = 16;
2904
2905 for encoding in &encodings {
2906 for row_group_size in row_group_sizes {
2907 let props = WriterProperties::builder()
2908 .set_writer_version(WriterVersion::PARQUET_2_0)
2909 .set_max_row_group_size(row_group_size)
2910 .set_dictionary_enabled(false)
2911 .set_encoding(*encoding)
2912 .set_data_page_size_limit(data_page_size_limit)
2913 .set_write_batch_size(write_batch_size)
2914 .build();
2915
2916 roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
2917 let string_array_a = StringArray::from(a.clone());
2918 let string_array_b = StringArray::from(b.clone());
2919 let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
2920 let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
2921 assert_eq!(
2922 vec_a, vec_b,
2923 "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
2924 );
2925 });
2926 }
2927 }
2928 }
2929
2930 #[test]
2931 fn arrow_writer_string_dictionary() {
2932 #[allow(deprecated)]
2934 let schema = Arc::new(Schema::new(vec![Field::new_dict(
2935 "dictionary",
2936 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2937 true,
2938 42,
2939 true,
2940 )]));
2941
2942 let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
2944 .iter()
2945 .copied()
2946 .collect();
2947
2948 one_column_roundtrip_with_schema(Arc::new(d), schema);
2950 }
2951
2952 #[test]
2953 fn arrow_writer_primitive_dictionary() {
2954 #[allow(deprecated)]
2956 let schema = Arc::new(Schema::new(vec![Field::new_dict(
2957 "dictionary",
2958 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
2959 true,
2960 42,
2961 true,
2962 )]));
2963
2964 let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
2966 builder.append(12345678).unwrap();
2967 builder.append_null();
2968 builder.append(22345678).unwrap();
2969 builder.append(12345678).unwrap();
2970 let d = builder.finish();
2971
2972 one_column_roundtrip_with_schema(Arc::new(d), schema);
2973 }
2974
2975 #[test]
2976 fn arrow_writer_decimal128_dictionary() {
2977 let integers = vec![12345, 56789, 34567];
2978
2979 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
2980
2981 let values = Decimal128Array::from(integers.clone())
2982 .with_precision_and_scale(5, 2)
2983 .unwrap();
2984
2985 let array = DictionaryArray::new(keys, Arc::new(values));
2986 one_column_roundtrip(Arc::new(array.clone()), true);
2987
2988 let values = Decimal128Array::from(integers)
2989 .with_precision_and_scale(12, 2)
2990 .unwrap();
2991
2992 let array = array.with_values(Arc::new(values));
2993 one_column_roundtrip(Arc::new(array), true);
2994 }
2995
2996 #[test]
2997 fn arrow_writer_decimal256_dictionary() {
2998 let integers = vec![
2999 i256::from_i128(12345),
3000 i256::from_i128(56789),
3001 i256::from_i128(34567),
3002 ];
3003
3004 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
3005
3006 let values = Decimal256Array::from(integers.clone())
3007 .with_precision_and_scale(5, 2)
3008 .unwrap();
3009
3010 let array = DictionaryArray::new(keys, Arc::new(values));
3011 one_column_roundtrip(Arc::new(array.clone()), true);
3012
3013 let values = Decimal256Array::from(integers)
3014 .with_precision_and_scale(12, 2)
3015 .unwrap();
3016
3017 let array = array.with_values(Arc::new(values));
3018 one_column_roundtrip(Arc::new(array), true);
3019 }
3020
3021 #[test]
3022 fn arrow_writer_string_dictionary_unsigned_index() {
3023 #[allow(deprecated)]
3025 let schema = Arc::new(Schema::new(vec![Field::new_dict(
3026 "dictionary",
3027 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3028 true,
3029 42,
3030 true,
3031 )]));
3032
3033 let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3035 .iter()
3036 .copied()
3037 .collect();
3038
3039 one_column_roundtrip_with_schema(Arc::new(d), schema);
3040 }
3041
3042 #[test]
3043 fn u32_min_max() {
3044 let src = [
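3045             // values straddling i32::MAX, so statistics must be interpreted as unsigned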
3046 u32::MIN,
3047 u32::MIN + 1,
3048 (i32::MAX as u32) - 1,
3049 i32::MAX as u32,
3050 (i32::MAX as u32) + 1,
3051 u32::MAX - 1,
3052 u32::MAX,
3053 ];
3054 let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
3055 let files = one_column_roundtrip(values, false);
3056
3057 for file in files {
3058 let reader = SerializedFileReader::new(file).unwrap();
3060 let metadata = reader.metadata();
3061
3062 let mut row_offset = 0;
3063 for row_group in metadata.row_groups() {
3064 assert_eq!(row_group.num_columns(), 1);
3065 let column = row_group.column(0);
3066
3067 let num_values = column.num_values() as usize;
3068 let src_slice = &src[row_offset..row_offset + num_values];
3069 row_offset += column.num_values() as usize;
3070
3071 let stats = column.statistics().unwrap();
3072 if let Statistics::Int32(stats) = stats {
3073 assert_eq!(
3074 *stats.min_opt().unwrap() as u32,
3075 *src_slice.iter().min().unwrap()
3076 );
3077 assert_eq!(
3078 *stats.max_opt().unwrap() as u32,
3079 *src_slice.iter().max().unwrap()
3080 );
3081 } else {
3082 panic!("Statistics::Int32 missing")
3083 }
3084 }
3085 }
3086 }
3087
3088 #[test]
3089 fn u64_min_max() {
3090 let src = [
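3091             // values straddling i64::MAX, so statistics must be interpreted as unsigned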
3092 u64::MIN,
3093 u64::MIN + 1,
3094 (i64::MAX as u64) - 1,
3095 i64::MAX as u64,
3096 (i64::MAX as u64) + 1,
3097 u64::MAX - 1,
3098 u64::MAX,
3099 ];
3100 let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
3101 let files = one_column_roundtrip(values, false);
3102
3103 for file in files {
3104 let reader = SerializedFileReader::new(file).unwrap();
3106 let metadata = reader.metadata();
3107
3108 let mut row_offset = 0;
3109 for row_group in metadata.row_groups() {
3110 assert_eq!(row_group.num_columns(), 1);
3111 let column = row_group.column(0);
3112
3113 let num_values = column.num_values() as usize;
3114 let src_slice = &src[row_offset..row_offset + num_values];
3115 row_offset += column.num_values() as usize;
3116
3117 let stats = column.statistics().unwrap();
3118 if let Statistics::Int64(stats) = stats {
3119 assert_eq!(
3120 *stats.min_opt().unwrap() as u64,
3121 *src_slice.iter().min().unwrap()
3122 );
3123 assert_eq!(
3124 *stats.max_opt().unwrap() as u64,
3125 *src_slice.iter().max().unwrap()
3126 );
3127 } else {
3128 panic!("Statistics::Int64 missing")
3129 }
3130 }
3131 }
3132 }
3133
3134 #[test]
3135 fn statistics_null_counts_only_nulls() {
3136 let values = Arc::new(UInt64Array::from(vec![None, None]));
3138 let files = one_column_roundtrip(values, true);
3139
3140 for file in files {
3141 let reader = SerializedFileReader::new(file).unwrap();
3143 let metadata = reader.metadata();
3144 assert_eq!(metadata.num_row_groups(), 1);
3145 let row_group = metadata.row_group(0);
3146 assert_eq!(row_group.num_columns(), 1);
3147 let column = row_group.column(0);
3148 let stats = column.statistics().unwrap();
3149 assert_eq!(stats.null_count_opt(), Some(2));
3150 }
3151 }
3152
3153 #[test]
3154 fn test_list_of_struct_roundtrip() {
3155 let int_field = Field::new("a", DataType::Int32, true);
3157 let int_field2 = Field::new("b", DataType::Int32, true);
3158
3159 let int_builder = Int32Builder::with_capacity(10);
3160 let int_builder2 = Int32Builder::with_capacity(10);
3161
3162 let struct_builder = StructBuilder::new(
3163 vec![int_field, int_field2],
3164 vec![Box::new(int_builder), Box::new(int_builder2)],
3165 );
3166 let mut list_builder = ListBuilder::new(struct_builder);
3167
3168 let values = list_builder.values();
3173 values
3174 .field_builder::<Int32Builder>(0)
3175 .unwrap()
3176 .append_value(1);
3177 values
3178 .field_builder::<Int32Builder>(1)
3179 .unwrap()
3180 .append_value(2);
3181 values.append(true);
3182 list_builder.append(true);
3183
3184 list_builder.append(true);
3186
3187 list_builder.append(false);
3189
3190 let values = list_builder.values();
3192 values
3193 .field_builder::<Int32Builder>(0)
3194 .unwrap()
3195 .append_null();
3196 values
3197 .field_builder::<Int32Builder>(1)
3198 .unwrap()
3199 .append_null();
3200 values.append(false);
3201 values
3202 .field_builder::<Int32Builder>(0)
3203 .unwrap()
3204 .append_null();
3205 values
3206 .field_builder::<Int32Builder>(1)
3207 .unwrap()
3208 .append_null();
3209 values.append(false);
3210 list_builder.append(true);
3211
3212 let values = list_builder.values();
3214 values
3215 .field_builder::<Int32Builder>(0)
3216 .unwrap()
3217 .append_null();
3218 values
3219 .field_builder::<Int32Builder>(1)
3220 .unwrap()
3221 .append_value(3);
3222 values.append(true);
3223 list_builder.append(true);
3224
3225 let values = list_builder.values();
3227 values
3228 .field_builder::<Int32Builder>(0)
3229 .unwrap()
3230 .append_value(2);
3231 values
3232 .field_builder::<Int32Builder>(1)
3233 .unwrap()
3234 .append_null();
3235 values.append(true);
3236 list_builder.append(true);
3237
3238 let array = Arc::new(list_builder.finish());
3239
3240 one_column_roundtrip(array, true);
3241 }
3242
3243 fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
3244 metadata.row_groups().iter().map(|x| x.num_rows()).collect()
3245 }
3246
3247 #[test]
3248 fn test_aggregates_records() {
3249 let arrays = [
3250 Int32Array::from((0..100).collect::<Vec<_>>()),
3251 Int32Array::from((0..50).collect::<Vec<_>>()),
3252 Int32Array::from((200..500).collect::<Vec<_>>()),
3253 ];
3254
3255 let schema = Arc::new(Schema::new(vec![Field::new(
3256 "int",
3257 ArrowDataType::Int32,
3258 false,
3259 )]));
3260
3261 let file = tempfile::tempfile().unwrap();
3262
3263 let props = WriterProperties::builder()
3264 .set_max_row_group_size(200)
3265 .build();
3266
3267 let mut writer =
3268 ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
3269
3270 for array in arrays {
3271 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
3272 writer.write(&batch).unwrap();
3273 }
3274
3275 writer.close().unwrap();
3276
3277 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3278 assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
3279
3280 let batches = builder
3281 .with_batch_size(100)
3282 .build()
3283 .unwrap()
3284 .collect::<ArrowResult<Vec<_>>>()
3285 .unwrap();
3286
3287 assert_eq!(batches.len(), 5);
3288 assert!(batches.iter().all(|x| x.num_columns() == 1));
3289
3290 let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3291
3292 assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
3293
3294 let values: Vec<_> = batches
3295 .iter()
3296 .flat_map(|x| {
3297 x.column(0)
3298 .as_any()
3299 .downcast_ref::<Int32Array>()
3300 .unwrap()
3301 .values()
3302 .iter()
3303 .cloned()
3304 })
3305 .collect();
3306
3307 let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
3308 assert_eq!(&values, &expected_values)
3309 }
3310
3311 #[test]
3312 fn complex_aggregate() {
3313 let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
3315 let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
3316 let struct_a = Arc::new(Field::new(
3317 "struct_a",
3318 DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
3319 true,
3320 ));
3321
3322 let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
3323 let struct_b = Arc::new(Field::new(
3324 "struct_b",
3325 DataType::Struct(vec![list_a.clone()].into()),
3326 false,
3327 ));
3328
3329 let schema = Arc::new(Schema::new(vec![struct_b]));
3330
3331 let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
3333 let field_b_array =
3334 Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
3335
3336 let struct_a_array = StructArray::from(vec![
3337 (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
3338 (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
3339 ]);
3340
3341 let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3342 .len(5)
3343 .add_buffer(Buffer::from_iter(vec![
3344 0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
3345 ]))
3346 .null_bit_buffer(Some(Buffer::from_iter(vec![
3347 true, false, true, false, true,
3348 ])))
3349 .child_data(vec![struct_a_array.into_data()])
3350 .build()
3351 .unwrap();
3352
3353 let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3354 let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
3355
3356 let batch1 =
3357 RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3358 .unwrap();
3359
3360 let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
3361 let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
3362
3363 let struct_a_array = StructArray::from(vec![
3364 (field_a, Arc::new(field_a_array) as ArrayRef),
3365 (field_b, Arc::new(field_b_array) as ArrayRef),
3366 ]);
3367
3368 let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3369 .len(2)
3370 .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
3371 .child_data(vec![struct_a_array.into_data()])
3372 .build()
3373 .unwrap();
3374
3375 let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3376 let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
3377
3378 let batch2 =
3379 RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3380 .unwrap();
3381
3382 let batches = &[batch1, batch2];
3383
3384 let expected = r#"
3387 +-------------------------------------------------------------------------------------------------------+
3388 | struct_b |
3389 +-------------------------------------------------------------------------------------------------------+
3390 | {list: [{leaf_a: 1, leaf_b: 1}]} |
3391 | {list: } |
3392 | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]} |
3393 | {list: } |
3394 | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]} |
3395 | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
3396 | {list: [{leaf_a: 10, leaf_b: }]} |
3397 +-------------------------------------------------------------------------------------------------------+
3398 "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
3399
3400 let actual = pretty_format_batches(batches).unwrap().to_string();
3401 assert_eq!(actual, expected);
3402
3403 let file = tempfile::tempfile().unwrap();
3405 let props = WriterProperties::builder()
3406 .set_max_row_group_size(6)
3407 .build();
3408
3409 let mut writer =
3410 ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
3411
3412 for batch in batches {
3413 writer.write(batch).unwrap();
3414 }
3415 writer.close().unwrap();
3416
3417 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
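3418         // max_row_group_size is 6: the first row group takes the five rows of the first batch
3419         // plus the first row of the second batch, leaving one row for the second row group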
3422 assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);
3423
3424 let batches = builder
3425 .with_batch_size(2)
3426 .build()
3427 .unwrap()
3428 .collect::<ArrowResult<Vec<_>>>()
3429 .unwrap();
3430
3431 assert_eq!(batches.len(), 4);
3432 let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3433 assert_eq!(&batch_counts, &[2, 2, 2, 1]);
3434
3435 let actual = pretty_format_batches(&batches).unwrap().to_string();
3436 assert_eq!(actual, expected);
3437 }
3438
3439 #[test]
3440 fn test_arrow_writer_metadata() {
3441 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3442 let file_schema = batch_schema.clone().with_metadata(
3443 vec![("foo".to_string(), "bar".to_string())]
3444 .into_iter()
3445 .collect(),
3446 );
3447
3448 let batch = RecordBatch::try_new(
3449 Arc::new(batch_schema),
3450 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3451 )
3452 .unwrap();
3453
3454 let mut buf = Vec::with_capacity(1024);
3455 let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
3456 writer.write(&batch).unwrap();
3457 writer.close().unwrap();
3458 }
3459
3460 #[test]
3461 fn test_arrow_writer_nullable() {
3462 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3463 let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
3464 let file_schema = Arc::new(file_schema);
3465
3466 let batch = RecordBatch::try_new(
3467 Arc::new(batch_schema),
3468 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3469 )
3470 .unwrap();
3471
3472 let mut buf = Vec::with_capacity(1024);
3473 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3474 writer.write(&batch).unwrap();
3475 writer.close().unwrap();
3476
3477 let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
3478 let back = read.next().unwrap().unwrap();
3479 assert_eq!(back.schema(), file_schema);
3480 assert_ne!(back.schema(), batch.schema());
3481 assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
3482 }
3483
3484 #[test]
3485 fn in_progress_accounting() {
3486 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3488
3489 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3491
3492 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3494
3495 let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
3496
3497 assert_eq!(writer.in_progress_size(), 0);
3499 assert_eq!(writer.in_progress_rows(), 0);
3500 assert_eq!(writer.memory_size(), 0);
3501         assert_eq!(writer.bytes_written(), 4); // only the 4-byte "PAR1" magic has been written so far
3502         writer.write(&batch).unwrap();
3503
3504 let initial_size = writer.in_progress_size();
3506 assert!(initial_size > 0);
3507 assert_eq!(writer.in_progress_rows(), 5);
3508 let initial_memory = writer.memory_size();
3509 assert!(initial_memory > 0);
3510 assert!(
3512 initial_size <= initial_memory,
3513 "{initial_size} <= {initial_memory}"
3514 );
3515
3516 writer.write(&batch).unwrap();
3518 assert!(writer.in_progress_size() > initial_size);
3519 assert_eq!(writer.in_progress_rows(), 10);
3520 assert!(writer.memory_size() > initial_memory);
3521 assert!(
3522 writer.in_progress_size() <= writer.memory_size(),
3523 "in_progress_size {} <= memory_size {}",
3524 writer.in_progress_size(),
3525 writer.memory_size()
3526 );
3527
3528 let pre_flush_bytes_written = writer.bytes_written();
3530 writer.flush().unwrap();
3531 assert_eq!(writer.in_progress_size(), 0);
3532 assert_eq!(writer.memory_size(), 0);
3533 assert!(writer.bytes_written() > pre_flush_bytes_written);
3534
3535 writer.close().unwrap();
3536 }
3537
3538 #[test]
3539 fn test_writer_all_null() {
3540 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3541 let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
3542 let batch = RecordBatch::try_from_iter(vec![
3543 ("a", Arc::new(a) as ArrayRef),
3544 ("b", Arc::new(b) as ArrayRef),
3545 ])
3546 .unwrap();
3547
3548 let mut buf = Vec::with_capacity(1024);
3549 let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
3550 writer.write(&batch).unwrap();
3551 writer.close().unwrap();
3552
3553 let bytes = Bytes::from(buf);
3554 let options = ReadOptionsBuilder::new().with_page_index().build();
3555 let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
3556 let index = reader.metadata().offset_index().unwrap();
3557
3558 assert_eq!(index.len(), 1);
3559         assert_eq!(index[0].len(), 2); // 2 columns
3560         assert_eq!(index[0][0].page_locations().len(), 1); // 1 page per column
3561         assert_eq!(index[0][1].page_locations().len(), 1);
3562     }
3563
3564 #[test]
3565 fn test_disabled_statistics_with_page() {
3566 let file_schema = Schema::new(vec![
3567 Field::new("a", DataType::Utf8, true),
3568 Field::new("b", DataType::Utf8, true),
3569 ]);
3570 let file_schema = Arc::new(file_schema);
3571
3572 let batch = RecordBatch::try_new(
3573 file_schema.clone(),
3574 vec![
3575 Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3576 Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3577 ],
3578 )
3579 .unwrap();
3580
3581 let props = WriterProperties::builder()
3582 .set_statistics_enabled(EnabledStatistics::None)
3583 .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
3584 .build();
3585
3586 let mut buf = Vec::with_capacity(1024);
3587 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3588 writer.write(&batch).unwrap();
3589
3590 let metadata = writer.close().unwrap();
3591 assert_eq!(metadata.row_groups.len(), 1);
3592 let row_group = &metadata.row_groups[0];
3593 assert_eq!(row_group.columns.len(), 2);
3594         assert!(row_group.columns[0].offset_index_offset.is_some());
3596         assert!(row_group.columns[0].column_index_offset.is_some()); // page-level statistics enabled for "a"
3597         assert!(row_group.columns[1].offset_index_offset.is_some());
3599         assert!(row_group.columns[1].column_index_offset.is_none()); // statistics disabled for "b"
3600
3601 let options = ReadOptionsBuilder::new().with_page_index().build();
3602 let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3603
3604 let row_group = reader.get_row_group(0).unwrap();
3605 let a_col = row_group.metadata().column(0);
3606 let b_col = row_group.metadata().column(1);
3607
3608 if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3610 let min = byte_array_stats.min_opt().unwrap();
3611 let max = byte_array_stats.max_opt().unwrap();
3612
3613 assert_eq!(min.as_bytes(), b"a");
3614 assert_eq!(max.as_bytes(), b"d");
3615 } else {
3616 panic!("expecting Statistics::ByteArray");
3617 }
3618
3619 assert!(b_col.statistics().is_none());
3621
3622 let offset_index = reader.metadata().offset_index().unwrap();
3623         assert_eq!(offset_index.len(), 1); // 1 row group
3624         assert_eq!(offset_index[0].len(), 2); // 2 columns
3625
3626         let column_index = reader.metadata().column_index().unwrap();
3627         assert_eq!(column_index.len(), 1); // 1 row group
3628         assert_eq!(column_index[0].len(), 2); // 2 columns
3629
3630         let a_idx = &column_index[0][0];
3631 assert!(matches!(a_idx, Index::BYTE_ARRAY(_)), "{a_idx:?}");
3632 let b_idx = &column_index[0][1];
3633 assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3634 }
3635
3636 #[test]
3637 fn test_disabled_statistics_with_chunk() {
3638 let file_schema = Schema::new(vec![
3639 Field::new("a", DataType::Utf8, true),
3640 Field::new("b", DataType::Utf8, true),
3641 ]);
3642 let file_schema = Arc::new(file_schema);
3643
3644 let batch = RecordBatch::try_new(
3645 file_schema.clone(),
3646 vec![
3647 Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3648 Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3649 ],
3650 )
3651 .unwrap();
3652
3653 let props = WriterProperties::builder()
3654 .set_statistics_enabled(EnabledStatistics::None)
3655 .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
3656 .build();
3657
3658 let mut buf = Vec::with_capacity(1024);
3659 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3660 writer.write(&batch).unwrap();
3661
3662 let metadata = writer.close().unwrap();
3663 assert_eq!(metadata.row_groups.len(), 1);
3664 let row_group = &metadata.row_groups[0];
3665 assert_eq!(row_group.columns.len(), 2);
3666         assert!(row_group.columns[0].offset_index_offset.is_some());
3668         assert!(row_group.columns[0].column_index_offset.is_none()); // chunk-level statistics do not produce a column index
3669         assert!(row_group.columns[1].offset_index_offset.is_some());
3671         assert!(row_group.columns[1].column_index_offset.is_none()); // statistics disabled for "b"
3672
3673 let options = ReadOptionsBuilder::new().with_page_index().build();
3674 let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3675
3676 let row_group = reader.get_row_group(0).unwrap();
3677 let a_col = row_group.metadata().column(0);
3678 let b_col = row_group.metadata().column(1);
3679
3680 if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3682 let min = byte_array_stats.min_opt().unwrap();
3683 let max = byte_array_stats.max_opt().unwrap();
3684
3685 assert_eq!(min.as_bytes(), b"a");
3686 assert_eq!(max.as_bytes(), b"d");
3687 } else {
3688 panic!("expecting Statistics::ByteArray");
3689 }
3690
3691 assert!(b_col.statistics().is_none());
3693
3694 let column_index = reader.metadata().column_index().unwrap();
3695         assert_eq!(column_index.len(), 1); // 1 row group
3696         assert_eq!(column_index[0].len(), 2); // 2 columns
3697
3698         let a_idx = &column_index[0][0];
3699 assert!(matches!(a_idx, Index::NONE), "{a_idx:?}");
3700 let b_idx = &column_index[0][1];
3701 assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3702 }
3703
3704 #[test]
3705 fn test_arrow_writer_skip_metadata() {
3706 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3707 let file_schema = Arc::new(batch_schema.clone());
3708
3709 let batch = RecordBatch::try_new(
3710 Arc::new(batch_schema),
3711 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3712 )
3713 .unwrap();
3714 let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
3715
3716 let mut buf = Vec::with_capacity(1024);
3717 let mut writer =
3718 ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
3719 writer.write(&batch).unwrap();
3720 writer.close().unwrap();
3721
3722 let bytes = Bytes::from(buf);
3723 let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
3724 assert_eq!(file_schema, *reader_builder.schema());
3725 if let Some(key_value_metadata) = reader_builder
3726 .metadata()
3727 .file_metadata()
3728 .key_value_metadata()
3729 {
3730 assert!(!key_value_metadata
3731 .iter()
3732 .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY));
3733 }
3734 }
3735
3736 #[test]
3737 fn mismatched_schemas() {
3738 let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
3739 let file_schema = Arc::new(Schema::new(vec![Field::new(
3740 "temperature",
3741 DataType::Float64,
3742 false,
3743 )]));
3744
3745 let batch = RecordBatch::try_new(
3746 Arc::new(batch_schema),
3747 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3748 )
3749 .unwrap();
3750
3751 let mut buf = Vec::with_capacity(1024);
3752 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3753
3754 let err = writer.write(&batch).unwrap_err().to_string();
3755 assert_eq!(
3756 err,
3757 "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
3758 );
3759 }
3760
3761 #[test]
3762 fn test_roundtrip_empty_schema() {
3764 let empty_batch = RecordBatch::try_new_with_options(
3766 Arc::new(Schema::empty()),
3767 vec![],
3768 &RecordBatchOptions::default().with_row_count(Some(0)),
3769 )
3770 .unwrap();
3771
3772 let mut parquet_bytes: Vec<u8> = Vec::new();
3774 let mut writer =
3775 ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
3776 writer.write(&empty_batch).unwrap();
3777 writer.close().unwrap();
3778
3779 let bytes = Bytes::from(parquet_bytes);
3781 let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
3782 assert_eq!(reader.schema(), &empty_batch.schema());
3783 let batches: Vec<_> = reader
3784 .build()
3785 .unwrap()
3786 .collect::<ArrowResult<Vec<_>>>()
3787 .unwrap();
3788 assert_eq!(batches.len(), 0);
3789 }
3790
3791 #[test]
3792 fn test_page_stats_not_written_by_default() {
3793 let string_field = Field::new("a", DataType::Utf8, false);
3794 let schema = Schema::new(vec![string_field]);
3795 let raw_string_values = vec!["Blart Versenwald III"];
3796 let string_values = StringArray::from(raw_string_values.clone());
3797 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
3798
3799 let props = WriterProperties::builder()
3800 .set_statistics_enabled(EnabledStatistics::Page)
3801 .set_dictionary_enabled(false)
3802 .set_encoding(Encoding::PLAIN)
3803 .set_compression(crate::basic::Compression::UNCOMPRESSED)
3804 .build();
3805
3806 let mut file = roundtrip_opts(&batch, props);
3807
3808 let mut buf = vec![];
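3809         // read the raw bytes back so the first page header can be decoded by hand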
3811 file.seek(std::io::SeekFrom::Start(0)).unwrap();
3812 let read = file.read_to_end(&mut buf).unwrap();
3813 assert!(read > 0);
3814
3815         let first_page = &buf[4..]; // skip the 4-byte "PAR1" magic at the start of the file
3817 let mut prot = TCompactSliceInputProtocol::new(first_page);
3818 let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
3819 let stats = hdr.data_page_header.unwrap().statistics;
3820
3821 assert!(stats.is_none());
3822 }
3823
3824 #[test]
3825 fn test_page_stats_when_enabled() {
3826 let string_field = Field::new("a", DataType::Utf8, false);
3827 let schema = Schema::new(vec![string_field]);
3828 let raw_string_values = vec!["Blart Versenwald III", "Andrew Lamb"];
3829 let string_values = StringArray::from(raw_string_values.clone());
3830 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
3831
3832 let props = WriterProperties::builder()
3833 .set_statistics_enabled(EnabledStatistics::Page)
3834 .set_dictionary_enabled(false)
3835 .set_encoding(Encoding::PLAIN)
3836 .set_write_page_header_statistics(true)
3837 .set_compression(crate::basic::Compression::UNCOMPRESSED)
3838 .build();
3839
3840 let mut file = roundtrip_opts(&batch, props);
3841
3842 let mut buf = vec![];
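3843         // read the raw bytes back and decode the first page header to inspect its statistics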
3845 file.seek(std::io::SeekFrom::Start(0)).unwrap();
3846 let read = file.read_to_end(&mut buf).unwrap();
3847 assert!(read > 0);
3848
3849 let first_page = &buf[4..];
3851 let mut prot = TCompactSliceInputProtocol::new(first_page);
3852 let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
3853 let stats = hdr.data_page_header.unwrap().statistics;
3854
3855 let stats = stats.unwrap();
3856 assert!(stats.is_max_value_exact.unwrap());
3858 assert!(stats.is_min_value_exact.unwrap());
3859 assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
3860 assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
3861 }
3862
3863 #[test]
3864 fn test_page_stats_truncation() {
3865 let string_field = Field::new("a", DataType::Utf8, false);
3866 let binary_field = Field::new("b", DataType::Binary, false);
3867 let schema = Schema::new(vec![string_field, binary_field]);
3868
3869 let raw_string_values = vec!["Blart Versenwald III"];
3870 let raw_binary_values = [b"Blart Versenwald III".to_vec()];
3871 let raw_binary_value_refs = raw_binary_values
3872 .iter()
3873 .map(|x| x.as_slice())
3874 .collect::<Vec<_>>();
3875
3876 let string_values = StringArray::from(raw_string_values.clone());
3877 let binary_values = BinaryArray::from(raw_binary_value_refs);
3878 let batch = RecordBatch::try_new(
3879 Arc::new(schema),
3880 vec![Arc::new(string_values), Arc::new(binary_values)],
3881 )
3882 .unwrap();
3883
3884 let props = WriterProperties::builder()
3885 .set_statistics_truncate_length(Some(2))
3886 .set_dictionary_enabled(false)
3887 .set_encoding(Encoding::PLAIN)
3888 .set_write_page_header_statistics(true)
3889 .set_compression(crate::basic::Compression::UNCOMPRESSED)
3890 .build();
3891
3892 let mut file = roundtrip_opts(&batch, props);
3893
3894 let mut buf = vec![];
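3895         // read the raw bytes back and decode both page headers to inspect the truncated statistics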
3897 file.seek(std::io::SeekFrom::Start(0)).unwrap();
3898 let read = file.read_to_end(&mut buf).unwrap();
3899 assert!(read > 0);
3900
3901 let first_page = &buf[4..];
3903 let mut prot = TCompactSliceInputProtocol::new(first_page);
3904 let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
3905 let stats = hdr.data_page_header.unwrap().statistics;
3906 assert!(stats.is_some());
3907 let stats = stats.unwrap();
3908 assert!(!stats.is_max_value_exact.unwrap());
3910 assert!(!stats.is_min_value_exact.unwrap());
3911 assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
3912 assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
3913
3914         let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..]; // the second column's page header follows the first page's data
3916 let mut prot = TCompactSliceInputProtocol::new(second_page);
3917 let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
3918 let stats = hdr.data_page_header.unwrap().statistics;
3919 assert!(stats.is_some());
3920 let stats = stats.unwrap();
3921 assert!(!stats.is_max_value_exact.unwrap());
3923 assert!(!stats.is_min_value_exact.unwrap());
3924 assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
3925 assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
3926 }
3927
3928 #[test]
3929 fn test_page_encoding_statistics_roundtrip() {
3930 let batch_schema = Schema::new(vec![Field::new(
3931 "int32",
3932 arrow_schema::DataType::Int32,
3933 false,
3934 )]);
3935
3936 let batch = RecordBatch::try_new(
3937 Arc::new(batch_schema.clone()),
3938 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3939 )
3940 .unwrap();
3941
3942 let mut file: File = tempfile::tempfile().unwrap();
3943 let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
3944 writer.write(&batch).unwrap();
3945 let file_metadata = writer.close().unwrap();
3946
3947 assert_eq!(file_metadata.row_groups.len(), 1);
3948 assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
3949 let chunk_meta = file_metadata.row_groups[0].columns[0]
3950 .meta_data
3951 .as_ref()
3952 .expect("column metadata missing");
3953 assert!(chunk_meta.encoding_stats.is_some());
3954 let chunk_page_stats = chunk_meta.encoding_stats.as_ref().unwrap();
3955
3956 let options = ReadOptionsBuilder::new().with_page_index().build();
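3957         // read the file back and compare its page encoding stats against the writer's metadata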
3958 let reader = SerializedFileReader::new_with_options(file, options).unwrap();
3959
3960 let rowgroup = reader.get_row_group(0).expect("row group missing");
3961 assert_eq!(rowgroup.num_columns(), 1);
3962 let column = rowgroup.metadata().column(0);
3963 assert!(column.page_encoding_stats().is_some());
3964 let file_page_stats = column.page_encoding_stats().unwrap();
3965 let chunk_stats: Vec<PageEncodingStats> = chunk_page_stats
3966 .iter()
3967 .map(|x| crate::file::page_encoding_stats::try_from_thrift(x).unwrap())
3968 .collect();
3969 assert_eq!(&chunk_stats, file_page_stats);
3970 }
3971
3972 #[test]
3973 fn test_different_dict_page_size_limit() {
3974 let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
3975 let schema = Arc::new(Schema::new(vec![
3976 Field::new("col0", arrow_schema::DataType::Int64, false),
3977 Field::new("col1", arrow_schema::DataType::Int64, false),
3978 ]));
3979 let batch =
3980 arrow_array::RecordBatch::try_new(schema.clone(), vec![array.clone(), array]).unwrap();
3981
3982 let props = WriterProperties::builder()
3983 .set_dictionary_page_size_limit(1024 * 1024)
3984 .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
3985 .build();
3986 let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
3987 writer.write(&batch).unwrap();
3988 let data = Bytes::from(writer.into_inner().unwrap());
3989
3990 let mut metadata = ParquetMetaDataReader::new();
3991 metadata.try_parse(&data).unwrap();
3992 let metadata = metadata.finish().unwrap();
3993 let col0_meta = metadata.row_group(0).column(0);
3994 let col1_meta = metadata.row_group(0).column(1);
3995
3996 let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
3997 let mut reader =
3998 SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
3999 let page = reader.get_next_page().unwrap().unwrap();
4000 match page {
4001 Page::DictionaryPage { buf, .. } => buf.len(),
4002 _ => panic!("expected DictionaryPage"),
4003 }
4004 };
4005
4006 assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
4007 assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
4008 }
4009}