use bytes::Bytes;
use std::io::{Read, Write};
use std::iter::Peekable;
use std::slice::Iter;
use std::sync::{Arc, Mutex};
use std::vec::IntoIter;
use thrift::protocol::TCompactOutputProtocol;

use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};

use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};

use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
use crate::arrow::ArrowSchemaConverter;
use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
use crate::column::page_encryption::PageEncryptor;
use crate::column::writer::encoder::ColumnValueEncoder;
use crate::column::writer::{
    get_column_writer, ColumnCloseResult, ColumnWriter, GenericColumnWriter,
};
use crate::data_type::{ByteArray, FixedLenByteArray};
#[cfg(feature = "encryption")]
use crate::encryption::encrypt::FileEncryptor;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{KeyValue, RowGroupMetaData};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
use crate::thrift::TSerializable;
use levels::{calculate_array_levels, ArrayLevels};

mod byte_array;
mod levels;

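/// Arrow writer: encodes [`RecordBatch`]es into a Parquet file or byte stream.
///
/// Record batches passed to [`write`](Self::write) are buffered in memory, and a new
/// row group is flushed once `max_row_group_size` rows have accumulated (or when
/// [`flush`](Self::flush) is called explicitly).
///
/// A minimal usage sketch, assuming an in-memory `Vec<u8>` sink and a single
/// `Int32` column (names are illustrative only):
///
/// ```ignore
/// let schema = Arc::new(Schema::new(vec![Field::new("a", ArrowDataType::Int32, false)]));
/// let batch = RecordBatch::try_new(
///     schema.clone(),
///     vec![Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3])) as ArrayRef],
/// )?;
///
/// let mut writer = ArrowWriter::try_new(Vec::new(), schema, None)?;
/// writer.write(&batch)?;
/// let parquet_bytes = writer.into_inner()?; // finish the file and recover the sink
/// ```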
pub struct ArrowWriter<W: Write> {
    writer: SerializedFileWriter<W>,

    in_progress: Option<ArrowRowGroupWriter>,

    arrow_schema: SchemaRef,

    row_group_writer_factory: ArrowRowGroupWriterFactory,

    max_row_group_size: usize,
}
149
impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let buffered_memory = self.in_progress_size();
        f.debug_struct("ArrowWriter")
            .field("writer", &self.writer)
            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
            .field("in_progress_rows", &self.in_progress_rows())
            .field("arrow_schema", &self.arrow_schema)
            .field("max_row_group_size", &self.max_row_group_size)
            .finish()
    }
}
162
163impl<W: Write + Send> ArrowWriter<W> {
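    /// Try to create a new Arrow writer that writes batches matching `arrow_schema`
    /// to `writer`, using `props` (or the default [`WriterProperties`]).
    ///
    /// Equivalent to [`try_new_with_options`](Self::try_new_with_options) with
    /// otherwise default [`ArrowWriterOptions`].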
164 pub fn try_new(
170 writer: W,
171 arrow_schema: SchemaRef,
172 props: Option<WriterProperties>,
173 ) -> Result<Self> {
174 let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
175 Self::try_new_with_options(writer, arrow_schema, options)
176 }
177
178 pub fn try_new_with_options(
184 writer: W,
185 arrow_schema: SchemaRef,
186 options: ArrowWriterOptions,
187 ) -> Result<Self> {
188 let mut props = options.properties;
189 let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
190 if let Some(schema_root) = &options.schema_root {
191 converter = converter.schema_root(schema_root);
192 }
193 let schema = converter.convert(&arrow_schema)?;
194 if !options.skip_arrow_metadata {
195 add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
197 }
198
199 let max_row_group_size = props.max_row_group_size();
200
201 let file_writer =
202 SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::new(props))?;
203
204 let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&file_writer);
205
206 Ok(Self {
207 writer: file_writer,
208 in_progress: None,
209 arrow_schema,
210 row_group_writer_factory,
211 max_row_group_size,
212 })
213 }
214
215 pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
217 self.writer.flushed_row_groups()
218 }
219
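    /// Returns the memory currently used by the in-progress row group's column
    /// writers, or 0 if no row group is in progress.
    ///
    /// Note this differs from [`in_progress_size`](Self::in_progress_size), which
    /// estimates how large the row group would be once encoded and flushed.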
220 pub fn memory_size(&self) -> usize {
225 match &self.in_progress {
226 Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
227 None => 0,
228 }
229 }
230
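    /// Returns the estimated size of the in-progress row group as it would be
    /// encoded when flushed, or 0 if no row group is in progress.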
231 pub fn in_progress_size(&self) -> usize {
238 match &self.in_progress {
239 Some(in_progress) => in_progress
240 .writers
241 .iter()
242 .map(|x| x.get_estimated_total_bytes())
243 .sum(),
244 None => 0,
245 }
246 }
247
248 pub fn in_progress_rows(&self) -> usize {
250 self.in_progress
251 .as_ref()
252 .map(|x| x.buffered_rows)
253 .unwrap_or_default()
254 }
255
256 pub fn bytes_written(&self) -> usize {
258 self.writer.bytes_written()
259 }
260
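    /// Buffers `batch` into the in-progress row group, starting a new row group if
    /// none is in progress.
    ///
    /// Batches that would push the buffered row count past `max_row_group_size` are
    /// split, and a row group is flushed automatically once the limit is reached.
    /// Empty batches are ignored.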
261 pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
269 if batch.num_rows() == 0 {
270 return Ok(());
271 }
272
273 let in_progress = match &mut self.in_progress {
274 Some(in_progress) => in_progress,
275 x => x.insert(self.row_group_writer_factory.create_row_group_writer(
276 self.writer.schema_descr(),
277 self.writer.properties(),
278 &self.arrow_schema,
279 self.writer.flushed_row_groups().len(),
280 )?),
281 };
282
283 if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size {
285 let to_write = self.max_row_group_size - in_progress.buffered_rows;
286 let a = batch.slice(0, to_write);
287 let b = batch.slice(to_write, batch.num_rows() - to_write);
288 self.write(&a)?;
289 return self.write(&b);
290 }
291
292 in_progress.write(batch)?;
293
294 if in_progress.buffered_rows >= self.max_row_group_size {
295 self.flush()?
296 }
297 Ok(())
298 }
299
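    /// Flushes the buffered rows (if any) into a new row group and appends it to the
    /// underlying file; a no-op when nothing is buffered.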
300 pub fn flush(&mut self) -> Result<()> {
302 let in_progress = match self.in_progress.take() {
303 Some(in_progress) => in_progress,
304 None => return Ok(()),
305 };
306
307 let mut row_group_writer = self.writer.next_row_group()?;
308 for chunk in in_progress.close()? {
309 chunk.append_to_row_group(&mut row_group_writer)?;
310 }
311 row_group_writer.close()?;
312 Ok(())
313 }
314
315 pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
319 self.writer.append_key_value_metadata(kv_metadata)
320 }
321
322 pub fn inner(&self) -> &W {
324 self.writer.inner()
325 }
326
327 pub fn inner_mut(&mut self) -> &mut W {
332 self.writer.inner_mut()
333 }
334
335 pub fn into_inner(mut self) -> Result<W> {
337 self.flush()?;
338 self.writer.into_inner()
339 }
340
341 pub fn finish(&mut self) -> Result<crate::format::FileMetaData> {
347 self.flush()?;
348 self.writer.finish()
349 }
350
351 pub fn close(mut self) -> Result<crate::format::FileMetaData> {
353 self.finish()
354 }
355}
356
357impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
358 fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
359 self.write(batch).map_err(|e| e.into())
360 }
361
362 fn close(self) -> std::result::Result<(), ArrowError> {
363 self.close()?;
364 Ok(())
365 }
366}
367
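/// Additional configuration for an [`ArrowWriter`] beyond the standard
/// [`WriterProperties`]: whether to embed the serialized Arrow schema in the file
/// metadata, and an optional override of the Parquet schema root name.
///
/// A small sketch of how the options compose (values are illustrative only):
///
/// ```ignore
/// let options = ArrowWriterOptions::new()
///     .with_properties(WriterProperties::default())
///     .with_skip_arrow_metadata(true)
///     .with_schema_root("spark_schema".to_string());
/// let writer = ArrowWriter::try_new_with_options(Vec::new(), schema, options)?;
/// ```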
#[derive(Debug, Clone, Default)]
pub struct ArrowWriterOptions {
    properties: WriterProperties,
    skip_arrow_metadata: bool,
    schema_root: Option<String>,
}

impl ArrowWriterOptions {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_properties(self, properties: WriterProperties) -> Self {
        Self { properties, ..self }
    }

    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

    pub fn with_schema_root(self, schema_root: String) -> Self {
        Self {
            schema_root: Some(schema_root),
            ..self
        }
    }
}
410
#[derive(Default)]
struct ArrowColumnChunkData {
    length: usize,
    data: Vec<Bytes>,
}

impl Length for ArrowColumnChunkData {
    fn len(&self) -> u64 {
        self.length as _
    }
}

impl ChunkReader for ArrowColumnChunkData {
    type T = ArrowColumnChunkReader;

    fn get_read(&self, start: u64) -> Result<Self::T> {
        assert_eq!(start, 0);
        Ok(ArrowColumnChunkReader(
            self.data.clone().into_iter().peekable(),
        ))
    }

    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
        unimplemented!()
    }
}
438
439struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
441
442impl Read for ArrowColumnChunkReader {
443 fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
444 let buffer = loop {
445 match self.0.peek_mut() {
446 Some(b) if b.is_empty() => {
447 self.0.next();
448 continue;
449 }
450 Some(b) => break b,
451 None => return Ok(0),
452 }
453 };
454
455 let len = buffer.len().min(out.len());
456 let b = buffer.split_to(len);
457 out[..len].copy_from_slice(&b);
458 Ok(len)
459 }
460}
461
462type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
467
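/// A [`PageWriter`] that buffers the encoded pages of a single column chunk in
/// memory (see [`SharedColumnChunk`]), allowing each column to be encoded
/// independently before the chunks are appended to a row group.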
468#[derive(Default)]
469struct ArrowPageWriter {
470 buffer: SharedColumnChunk,
471 #[cfg(feature = "encryption")]
472 page_encryptor: Option<PageEncryptor>,
473}
474
475impl ArrowPageWriter {
476 #[cfg(feature = "encryption")]
477 pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
478 self.page_encryptor = page_encryptor;
479 self
480 }
481
482 #[cfg(feature = "encryption")]
483 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
484 self.page_encryptor.as_mut()
485 }
486
487 #[cfg(not(feature = "encryption"))]
488 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
489 None
490 }
491}
492
493impl PageWriter for ArrowPageWriter {
494 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
495 let page = match self.page_encryptor_mut() {
496 Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
497 None => page,
498 };
499
500 let page_header = page.to_thrift_header();
501 let header = {
502 let mut header = Vec::with_capacity(1024);
503
504 match self.page_encryptor_mut() {
505 Some(page_encryptor) => {
506 page_encryptor.encrypt_page_header(&page_header, &mut header)?;
507 if page.compressed_page().is_data_page() {
508 page_encryptor.increment_page();
509 }
510 }
511 None => {
512 let mut protocol = TCompactOutputProtocol::new(&mut header);
513 page_header.write_to_out_protocol(&mut protocol)?;
514 }
515 };
516
517 Bytes::from(header)
518 };
519
520 let mut buf = self.buffer.try_lock().unwrap();
521
522 let data = page.compressed_page().buffer().clone();
523 let compressed_size = data.len() + header.len();
524
525 let mut spec = PageWriteSpec::new();
526 spec.page_type = page.page_type();
527 spec.num_values = page.num_values();
528 spec.uncompressed_size = page.uncompressed_size() + header.len();
529 spec.offset = buf.length as u64;
530 spec.compressed_size = compressed_size;
531 spec.bytes_written = compressed_size as u64;
532
533 buf.length += compressed_size;
534 buf.data.push(header);
535 buf.data.push(data);
536
537 Ok(spec)
538 }
539
540 fn close(&mut self) -> Result<()> {
541 Ok(())
542 }
543}
544
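/// The data for a single leaf (primitive) Parquet column, computed from an Arrow
/// array by [`compute_leaves`].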
545#[derive(Debug)]
547pub struct ArrowLeafColumn(ArrayLevels);
548
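/// Computes the definition and repetition levels for `array` under `field`,
/// returning one [`ArrowLeafColumn`] per Parquet leaf column (nested fields such
/// as structs and lists yield multiple leaves).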
549pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
551 let levels = calculate_array_levels(array, field)?;
552 Ok(levels.into_iter().map(ArrowLeafColumn).collect())
553}
554
555pub struct ArrowColumnChunk {
557 data: ArrowColumnChunkData,
558 close: ColumnCloseResult,
559}
560
561impl std::fmt::Debug for ArrowColumnChunk {
562 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
563 f.debug_struct("ArrowColumnChunk")
564 .field("length", &self.data.length)
565 .finish_non_exhaustive()
566 }
567}
568
569impl ArrowColumnChunk {
570 pub fn append_to_row_group<W: Write + Send>(
572 self,
573 writer: &mut SerializedRowGroupWriter<'_, W>,
574 ) -> Result<()> {
575 writer.append_column(&self.data, self.close)
576 }
577}
578
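/// Encodes [`ArrowLeafColumn`]s produced by [`compute_leaves`] into an in-memory
/// [`ArrowColumnChunk`].
///
/// Together with [`get_column_writers`] this forms the lower-level API underneath
/// [`ArrowWriter`], and allows the columns of a row group to be encoded
/// independently (for example on separate threads). A minimal single-threaded
/// sketch, assuming a flat schema in which every Arrow field maps to exactly one
/// Parquet leaf (names are illustrative only):
///
/// ```ignore
/// let schema: SchemaRef = /* ... */;
/// let batch: RecordBatch = /* ... */;
///
/// let props = Arc::new(WriterProperties::default());
/// let parquet_schema = ArrowSchemaConverter::new().convert(&schema)?;
/// let mut file_writer =
///     SerializedFileWriter::new(Vec::new(), parquet_schema.root_schema_ptr(), props.clone())?;
///
/// let mut writers = get_column_writers(&parquet_schema, &props, &schema)?;
/// for ((field, column), writer) in schema.fields().iter().zip(batch.columns()).zip(&mut writers) {
///     for leaf in compute_leaves(field, column)? {
///         writer.write(&leaf)?;
///     }
/// }
///
/// let mut row_group = file_writer.next_row_group()?;
/// for writer in writers {
///     writer.close()?.append_to_row_group(&mut row_group)?;
/// }
/// row_group.close()?;
/// file_writer.close()?;
/// ```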
pub struct ArrowColumnWriter {
    writer: ArrowColumnWriterImpl,
    chunk: SharedColumnChunk,
}

impl std::fmt::Debug for ArrowColumnWriter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
    }
}

enum ArrowColumnWriterImpl {
    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
    Column(ColumnWriter<'static>),
}
687
688impl ArrowColumnWriter {
689 pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
691 match &mut self.writer {
692 ArrowColumnWriterImpl::Column(c) => {
693 write_leaf(c, &col.0)?;
694 }
695 ArrowColumnWriterImpl::ByteArray(c) => {
696 write_primitive(c, col.0.array().as_ref(), &col.0)?;
697 }
698 }
699 Ok(())
700 }
701
702 pub fn close(self) -> Result<ArrowColumnChunk> {
704 let close = match self.writer {
705 ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
706 ArrowColumnWriterImpl::Column(c) => c.close()?,
707 };
708 let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
709 let data = chunk.into_inner().unwrap();
710 Ok(ArrowColumnChunk { data, close })
711 }
712
713 pub fn memory_size(&self) -> usize {
724 match &self.writer {
725 ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
726 ArrowColumnWriterImpl::Column(c) => c.memory_size(),
727 }
728 }
729
730 pub fn get_estimated_total_bytes(&self) -> usize {
738 match &self.writer {
739 ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
740 ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
741 }
742 }
743}
744
745struct ArrowRowGroupWriter {
747 writers: Vec<ArrowColumnWriter>,
748 schema: SchemaRef,
749 buffered_rows: usize,
750}
751
752impl ArrowRowGroupWriter {
753 fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
754 Self {
755 writers,
756 schema: arrow.clone(),
757 buffered_rows: 0,
758 }
759 }
760
761 fn write(&mut self, batch: &RecordBatch) -> Result<()> {
762 self.buffered_rows += batch.num_rows();
763 let mut writers = self.writers.iter_mut();
764 for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
765 for leaf in compute_leaves(field.as_ref(), column)? {
766 writers.next().unwrap().write(&leaf)?
767 }
768 }
769 Ok(())
770 }
771
772 fn close(self) -> Result<Vec<ArrowColumnChunk>> {
773 self.writers
774 .into_iter()
775 .map(|writer| writer.close())
776 .collect()
777 }
778}
779
780struct ArrowRowGroupWriterFactory {
781 #[cfg(feature = "encryption")]
782 file_encryptor: Option<Arc<FileEncryptor>>,
783}
784
785impl ArrowRowGroupWriterFactory {
786 #[cfg(feature = "encryption")]
787 fn new<W: Write + Send>(file_writer: &SerializedFileWriter<W>) -> Self {
788 Self {
789 file_encryptor: file_writer.file_encryptor(),
790 }
791 }
792
793 #[cfg(not(feature = "encryption"))]
794 fn new<W: Write + Send>(_file_writer: &SerializedFileWriter<W>) -> Self {
795 Self {}
796 }
797
798 #[cfg(feature = "encryption")]
799 fn create_row_group_writer(
800 &self,
801 parquet: &SchemaDescriptor,
802 props: &WriterPropertiesPtr,
803 arrow: &SchemaRef,
804 row_group_index: usize,
805 ) -> Result<ArrowRowGroupWriter> {
806 let writers = get_column_writers_with_encryptor(
807 parquet,
808 props,
809 arrow,
810 self.file_encryptor.clone(),
811 row_group_index,
812 )?;
813 Ok(ArrowRowGroupWriter::new(writers, arrow))
814 }
815
816 #[cfg(not(feature = "encryption"))]
817 fn create_row_group_writer(
818 &self,
819 parquet: &SchemaDescriptor,
820 props: &WriterPropertiesPtr,
821 arrow: &SchemaRef,
822 _row_group_index: usize,
823 ) -> Result<ArrowRowGroupWriter> {
824 let writers = get_column_writers(parquet, props, arrow)?;
825 Ok(ArrowRowGroupWriter::new(writers, arrow))
826 }
827}
828
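/// Returns an [`ArrowColumnWriter`] for each leaf column of the Parquet schema
/// `parquet`, using the Arrow schema `arrow` to decide how each leaf is encoded
/// (see [`ArrowColumnWriter`] for a usage sketch).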
pub fn get_column_writers(
    parquet: &SchemaDescriptor,
    props: &WriterPropertiesPtr,
    arrow: &SchemaRef,
) -> Result<Vec<ArrowColumnWriter>> {
    let mut writers = Vec::with_capacity(arrow.fields.len());
    let mut leaves = parquet.columns().iter();
    let column_factory = ArrowColumnWriterFactory::new();
    for field in &arrow.fields {
        column_factory.get_arrow_column_writer(
            field.data_type(),
            props,
            &mut leaves,
            &mut writers,
        )?;
    }
    Ok(writers)
}
848
849#[cfg(feature = "encryption")]
851fn get_column_writers_with_encryptor(
852 parquet: &SchemaDescriptor,
853 props: &WriterPropertiesPtr,
854 arrow: &SchemaRef,
855 file_encryptor: Option<Arc<FileEncryptor>>,
856 row_group_index: usize,
857) -> Result<Vec<ArrowColumnWriter>> {
858 let mut writers = Vec::with_capacity(arrow.fields.len());
859 let mut leaves = parquet.columns().iter();
860 let column_factory =
861 ArrowColumnWriterFactory::new().with_file_encryptor(row_group_index, file_encryptor);
862 for field in &arrow.fields {
863 column_factory.get_arrow_column_writer(
864 field.data_type(),
865 props,
866 &mut leaves,
867 &mut writers,
868 )?;
869 }
870 Ok(writers)
871}
872
873struct ArrowColumnWriterFactory {
875 #[cfg(feature = "encryption")]
876 row_group_index: usize,
877 #[cfg(feature = "encryption")]
878 file_encryptor: Option<Arc<FileEncryptor>>,
879}
880
881impl ArrowColumnWriterFactory {
882 pub fn new() -> Self {
883 Self {
884 #[cfg(feature = "encryption")]
885 row_group_index: 0,
886 #[cfg(feature = "encryption")]
887 file_encryptor: None,
888 }
889 }
890
891 #[cfg(feature = "encryption")]
892 pub fn with_file_encryptor(
893 mut self,
894 row_group_index: usize,
895 file_encryptor: Option<Arc<FileEncryptor>>,
896 ) -> Self {
897 self.row_group_index = row_group_index;
898 self.file_encryptor = file_encryptor;
899 self
900 }
901
902 #[cfg(feature = "encryption")]
903 fn create_page_writer(
904 &self,
905 column_descriptor: &ColumnDescPtr,
906 column_index: usize,
907 ) -> Result<Box<ArrowPageWriter>> {
908 let column_path = column_descriptor.path().string();
909 let page_encryptor = PageEncryptor::create_if_column_encrypted(
910 &self.file_encryptor,
911 self.row_group_index,
912 column_index,
913 &column_path,
914 )?;
915 Ok(Box::new(
916 ArrowPageWriter::default().with_encryptor(page_encryptor),
917 ))
918 }
919
920 #[cfg(not(feature = "encryption"))]
921 fn create_page_writer(
922 &self,
923 _column_descriptor: &ColumnDescPtr,
924 _column_index: usize,
925 ) -> Result<Box<ArrowPageWriter>> {
926 Ok(Box::<ArrowPageWriter>::default())
927 }
928
929 fn get_arrow_column_writer(
931 &self,
932 data_type: &ArrowDataType,
933 props: &WriterPropertiesPtr,
934 leaves: &mut Iter<'_, ColumnDescPtr>,
935 out: &mut Vec<ArrowColumnWriter>,
936 ) -> Result<()> {
937 let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
938 let page_writer = self.create_page_writer(desc, out.len())?;
939 let chunk = page_writer.buffer.clone();
940 let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
941 Ok(ArrowColumnWriter {
942 chunk,
943 writer: ArrowColumnWriterImpl::Column(writer),
944 })
945 };
946
947 let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
948 let page_writer = self.create_page_writer(desc, out.len())?;
949 let chunk = page_writer.buffer.clone();
950 let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
951 Ok(ArrowColumnWriter {
952 chunk,
953 writer: ArrowColumnWriterImpl::ByteArray(writer),
954 })
955 };
956
        match data_type {
            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
                out.push(col(leaves.next().unwrap())?)
            }
            ArrowDataType::LargeBinary
            | ArrowDataType::Binary
            | ArrowDataType::Utf8
            | ArrowDataType::LargeUtf8
            | ArrowDataType::BinaryView
            | ArrowDataType::Utf8View => {
                out.push(bytes(leaves.next().unwrap())?)
            }
            ArrowDataType::List(f)
            | ArrowDataType::LargeList(f)
            | ArrowDataType::FixedSizeList(f, _) => {
                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
            }
            ArrowDataType::Struct(fields) => {
                for field in fields {
                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
                }
            }
            ArrowDataType::Map(f, _) => match f.data_type() {
                ArrowDataType::Struct(f) => {
                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
                }
                _ => unreachable!("invalid map type"),
            }
            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                ArrowDataType::Utf8
                | ArrowDataType::LargeUtf8
                | ArrowDataType::Binary
                | ArrowDataType::LargeBinary => {
                    out.push(bytes(leaves.next().unwrap())?)
                }
                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
                    out.push(bytes(leaves.next().unwrap())?)
                }
                ArrowDataType::FixedSizeBinary(_) => {
                    out.push(bytes(leaves.next().unwrap())?)
                }
                _ => {
                    out.push(col(leaves.next().unwrap())?)
                }
            }
            _ => return Err(ParquetError::NYI(
                format!(
                    "Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"
                )
            ))
        }
        Ok(())
    }
}
1008
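/// Writes a single leaf array to `writer`, casting Arrow types without a native
/// Parquet physical representation (e.g. `Date64`, unsigned integers, decimals,
/// intervals, `Float16`) into the writer's physical type first.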
1009fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
1010 let column = levels.array().as_ref();
1011 let indices = levels.non_null_indices();
1012 match writer {
1013 ColumnWriter::Int32ColumnWriter(ref mut typed) => {
1014 match column.data_type() {
1015 ArrowDataType::Date64 => {
1016 let array = arrow_cast::cast(column, &ArrowDataType::Date32)?;
1018 let array = arrow_cast::cast(&array, &ArrowDataType::Int32)?;
1019
1020 let array = array.as_primitive::<Int32Type>();
1021 write_primitive(typed, array.values(), levels)
1022 }
1023 ArrowDataType::UInt32 => {
1024 let values = column.as_primitive::<UInt32Type>().values();
1025 let array = values.inner().typed_data::<i32>();
1028 write_primitive(typed, array, levels)
1029 }
1030 ArrowDataType::Decimal128(_, _) => {
1031 let array = column
1033 .as_primitive::<Decimal128Type>()
1034 .unary::<_, Int32Type>(|v| v as i32);
1035 write_primitive(typed, array.values(), levels)
1036 }
1037 ArrowDataType::Decimal256(_, _) => {
1038 let array = column
1040 .as_primitive::<Decimal256Type>()
1041 .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1042 write_primitive(typed, array.values(), levels)
1043 }
1044 ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1045 ArrowDataType::Decimal128(_, _) => {
1046 let array = arrow_cast::cast(column, value_type)?;
1047 let array = array
1048 .as_primitive::<Decimal128Type>()
1049 .unary::<_, Int32Type>(|v| v as i32);
1050 write_primitive(typed, array.values(), levels)
1051 }
1052 ArrowDataType::Decimal256(_, _) => {
1053 let array = arrow_cast::cast(column, value_type)?;
1054 let array = array
1055 .as_primitive::<Decimal256Type>()
1056 .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1057 write_primitive(typed, array.values(), levels)
1058 }
1059 _ => {
1060 let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
1061 let array = array.as_primitive::<Int32Type>();
1062 write_primitive(typed, array.values(), levels)
1063 }
1064 },
1065 _ => {
1066 let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
1067 let array = array.as_primitive::<Int32Type>();
1068 write_primitive(typed, array.values(), levels)
1069 }
1070 }
1071 }
1072 ColumnWriter::BoolColumnWriter(ref mut typed) => {
1073 let array = column.as_boolean();
1074 typed.write_batch(
1075 get_bool_array_slice(array, indices).as_slice(),
1076 levels.def_levels(),
1077 levels.rep_levels(),
1078 )
1079 }
1080 ColumnWriter::Int64ColumnWriter(ref mut typed) => {
1081 match column.data_type() {
1082 ArrowDataType::Date64 => {
1083 let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1084
1085 let array = array.as_primitive::<Int64Type>();
1086 write_primitive(typed, array.values(), levels)
1087 }
1088 ArrowDataType::Int64 => {
1089 let array = column.as_primitive::<Int64Type>();
1090 write_primitive(typed, array.values(), levels)
1091 }
1092 ArrowDataType::UInt64 => {
1093 let values = column.as_primitive::<UInt64Type>().values();
1094 let array = values.inner().typed_data::<i64>();
1097 write_primitive(typed, array, levels)
1098 }
1099 ArrowDataType::Decimal128(_, _) => {
1100 let array = column
1102 .as_primitive::<Decimal128Type>()
1103 .unary::<_, Int64Type>(|v| v as i64);
1104 write_primitive(typed, array.values(), levels)
1105 }
1106 ArrowDataType::Decimal256(_, _) => {
1107 let array = column
1109 .as_primitive::<Decimal256Type>()
1110 .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1111 write_primitive(typed, array.values(), levels)
1112 }
1113 ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1114 ArrowDataType::Decimal128(_, _) => {
1115 let array = arrow_cast::cast(column, value_type)?;
1116 let array = array
1117 .as_primitive::<Decimal128Type>()
1118 .unary::<_, Int64Type>(|v| v as i64);
1119 write_primitive(typed, array.values(), levels)
1120 }
1121 ArrowDataType::Decimal256(_, _) => {
1122 let array = arrow_cast::cast(column, value_type)?;
1123 let array = array
1124 .as_primitive::<Decimal256Type>()
1125 .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1126 write_primitive(typed, array.values(), levels)
1127 }
1128 _ => {
1129 let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1130 let array = array.as_primitive::<Int64Type>();
1131 write_primitive(typed, array.values(), levels)
1132 }
1133 },
1134 _ => {
1135 let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1136 let array = array.as_primitive::<Int64Type>();
1137 write_primitive(typed, array.values(), levels)
1138 }
1139 }
1140 }
1141 ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
1142 unreachable!("Currently unreachable because data type not supported")
1143 }
1144 ColumnWriter::FloatColumnWriter(ref mut typed) => {
1145 let array = column.as_primitive::<Float32Type>();
1146 write_primitive(typed, array.values(), levels)
1147 }
1148 ColumnWriter::DoubleColumnWriter(ref mut typed) => {
1149 let array = column.as_primitive::<Float64Type>();
1150 write_primitive(typed, array.values(), levels)
1151 }
1152 ColumnWriter::ByteArrayColumnWriter(_) => {
1153 unreachable!("should use ByteArrayWriter")
1154 }
1155 ColumnWriter::FixedLenByteArrayColumnWriter(ref mut typed) => {
1156 let bytes = match column.data_type() {
1157 ArrowDataType::Interval(interval_unit) => match interval_unit {
1158 IntervalUnit::YearMonth => {
1159 let array = column
1160 .as_any()
1161 .downcast_ref::<arrow_array::IntervalYearMonthArray>()
1162 .unwrap();
1163 get_interval_ym_array_slice(array, indices)
1164 }
1165 IntervalUnit::DayTime => {
1166 let array = column
1167 .as_any()
1168 .downcast_ref::<arrow_array::IntervalDayTimeArray>()
1169 .unwrap();
1170 get_interval_dt_array_slice(array, indices)
1171 }
1172 _ => {
1173 return Err(ParquetError::NYI(
1174 format!(
1175 "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
1176 )
1177 ));
1178 }
1179 },
1180 ArrowDataType::FixedSizeBinary(_) => {
1181 let array = column
1182 .as_any()
1183 .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
1184 .unwrap();
1185 get_fsb_array_slice(array, indices)
1186 }
1187 ArrowDataType::Decimal128(_, _) => {
1188 let array = column.as_primitive::<Decimal128Type>();
1189 get_decimal_128_array_slice(array, indices)
1190 }
1191 ArrowDataType::Decimal256(_, _) => {
1192 let array = column
1193 .as_any()
1194 .downcast_ref::<arrow_array::Decimal256Array>()
1195 .unwrap();
1196 get_decimal_256_array_slice(array, indices)
1197 }
1198 ArrowDataType::Float16 => {
1199 let array = column.as_primitive::<Float16Type>();
1200 get_float_16_array_slice(array, indices)
1201 }
1202 _ => {
1203 return Err(ParquetError::NYI(
1204 "Attempting to write an Arrow type that is not yet implemented".to_string(),
1205 ));
1206 }
1207 };
1208 typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
1209 }
1210 }
1211}
1212
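/// Writes primitive values through `writer`, passing the pre-computed
/// definition/repetition levels and using `non_null_indices` to skip null slots
/// in the value buffer.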
1213fn write_primitive<E: ColumnValueEncoder>(
1214 writer: &mut GenericColumnWriter<E>,
1215 values: &E::Values,
1216 levels: &ArrayLevels,
1217) -> Result<usize> {
1218 writer.write_batch_internal(
1219 values,
1220 Some(levels.non_null_indices()),
1221 levels.def_levels(),
1222 levels.rep_levels(),
1223 None,
1224 None,
1225 None,
1226 )
1227}
1228
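// The helpers below gather the values at the given (non-null) indices into the
// representations expected by the BOOLEAN and FIXED_LEN_BYTE_ARRAY writers.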
1229fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
1230 let mut values = Vec::with_capacity(indices.len());
1231 for i in indices {
1232 values.push(array.value(*i))
1233 }
1234 values
1235}
1236
1237fn get_interval_ym_array_slice(
1240 array: &arrow_array::IntervalYearMonthArray,
1241 indices: &[usize],
1242) -> Vec<FixedLenByteArray> {
1243 let mut values = Vec::with_capacity(indices.len());
1244 for i in indices {
1245 let mut value = array.value(*i).to_le_bytes().to_vec();
1246 let mut suffix = vec![0; 8];
1247 value.append(&mut suffix);
1248 values.push(FixedLenByteArray::from(ByteArray::from(value)))
1249 }
1250 values
1251}
1252
1253fn get_interval_dt_array_slice(
1256 array: &arrow_array::IntervalDayTimeArray,
1257 indices: &[usize],
1258) -> Vec<FixedLenByteArray> {
1259 let mut values = Vec::with_capacity(indices.len());
1260 for i in indices {
1261 let mut out = [0; 12];
1262 let value = array.value(*i);
1263 out[4..8].copy_from_slice(&value.days.to_le_bytes());
1264 out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1265 values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1266 }
1267 values
1268}
1269
1270fn get_decimal_128_array_slice(
1271 array: &arrow_array::Decimal128Array,
1272 indices: &[usize],
1273) -> Vec<FixedLenByteArray> {
1274 let mut values = Vec::with_capacity(indices.len());
1275 let size = decimal_length_from_precision(array.precision());
1276 for i in indices {
1277 let as_be_bytes = array.value(*i).to_be_bytes();
1278 let resized_value = as_be_bytes[(16 - size)..].to_vec();
1279 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1280 }
1281 values
1282}
1283
1284fn get_decimal_256_array_slice(
1285 array: &arrow_array::Decimal256Array,
1286 indices: &[usize],
1287) -> Vec<FixedLenByteArray> {
1288 let mut values = Vec::with_capacity(indices.len());
1289 let size = decimal_length_from_precision(array.precision());
1290 for i in indices {
1291 let as_be_bytes = array.value(*i).to_be_bytes();
1292 let resized_value = as_be_bytes[(32 - size)..].to_vec();
1293 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1294 }
1295 values
1296}
1297
1298fn get_float_16_array_slice(
1299 array: &arrow_array::Float16Array,
1300 indices: &[usize],
1301) -> Vec<FixedLenByteArray> {
1302 let mut values = Vec::with_capacity(indices.len());
1303 for i in indices {
1304 let value = array.value(*i).to_le_bytes().to_vec();
1305 values.push(FixedLenByteArray::from(ByteArray::from(value)));
1306 }
1307 values
1308}
1309
1310fn get_fsb_array_slice(
1311 array: &arrow_array::FixedSizeBinaryArray,
1312 indices: &[usize],
1313) -> Vec<FixedLenByteArray> {
1314 let mut values = Vec::with_capacity(indices.len());
1315 for i in indices {
1316 let value = array.value(*i).to_vec();
1317 values.push(FixedLenByteArray::from(ByteArray::from(value)))
1318 }
1319 values
1320}
1321
1322#[cfg(test)]
1323mod tests {
1324 use super::*;
1325
1326 use std::fs::File;
1327
1328 use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1329 use crate::arrow::ARROW_SCHEMA_META_KEY;
1330 use arrow::datatypes::ToByteSlice;
1331 use arrow::datatypes::{DataType, Schema};
1332 use arrow::error::Result as ArrowResult;
1333 use arrow::util::data_gen::create_random_array;
1334 use arrow::util::pretty::pretty_format_batches;
1335 use arrow::{array::*, buffer::Buffer};
1336 use arrow_buffer::{i256, IntervalDayTime, IntervalMonthDayNano, NullBuffer};
1337 use arrow_schema::Fields;
1338 use half::f16;
1339 use num::{FromPrimitive, ToPrimitive};
1340
1341 use crate::basic::Encoding;
1342 use crate::data_type::AsBytes;
1343 use crate::file::metadata::ParquetMetaData;
1344 use crate::file::page_index::index::Index;
1345 use crate::file::page_index::index_reader::read_offset_indexes;
1346 use crate::file::properties::{
1347 BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1348 };
1349 use crate::file::serialized_reader::ReadOptionsBuilder;
1350 use crate::file::{
1351 reader::{FileReader, SerializedFileReader},
1352 statistics::Statistics,
1353 };
1354
1355 #[test]
1356 fn arrow_writer() {
1357 let schema = Schema::new(vec![
1359 Field::new("a", DataType::Int32, false),
1360 Field::new("b", DataType::Int32, true),
1361 ]);
1362
1363 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1365 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1366
1367 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1369
1370 roundtrip(batch, Some(SMALL_SIZE / 2));
1371 }
1372
1373 fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1374 let mut buffer = vec![];
1375
1376 let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
1377 writer.write(expected_batch).unwrap();
1378 writer.close().unwrap();
1379
1380 buffer
1381 }
1382
1383 fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1384 let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
1385 writer.write(expected_batch).unwrap();
1386 writer.into_inner().unwrap()
1387 }
1388
1389 #[test]
1390 fn roundtrip_bytes() {
1391 let schema = Arc::new(Schema::new(vec![
1393 Field::new("a", DataType::Int32, false),
1394 Field::new("b", DataType::Int32, true),
1395 ]));
1396
1397 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1399 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1400
1401 let expected_batch =
1403 RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
1404
1405 for buffer in [
1406 get_bytes_after_close(schema.clone(), &expected_batch),
1407 get_bytes_by_into_inner(schema, &expected_batch),
1408 ] {
1409 let cursor = Bytes::from(buffer);
1410 let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
1411
1412 let actual_batch = record_batch_reader
1413 .next()
1414 .expect("No batch found")
1415 .expect("Unable to get batch");
1416
1417 assert_eq!(expected_batch.schema(), actual_batch.schema());
1418 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1419 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1420 for i in 0..expected_batch.num_columns() {
1421 let expected_data = expected_batch.column(i).to_data();
1422 let actual_data = actual_batch.column(i).to_data();
1423
1424 assert_eq!(expected_data, actual_data);
1425 }
1426 }
1427 }
1428
1429 #[test]
1430 fn arrow_writer_non_null() {
1431 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1433
1434 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1436
1437 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1439
1440 roundtrip(batch, Some(SMALL_SIZE / 2));
1441 }
1442
1443 #[test]
1444 fn arrow_writer_list() {
1445 let schema = Schema::new(vec![Field::new(
1447 "a",
1448 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1449 true,
1450 )]);
1451
1452 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1454
1455 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1458
1459 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1461 DataType::Int32,
1462 false,
1463 ))))
1464 .len(5)
1465 .add_buffer(a_value_offsets)
1466 .add_child_data(a_values.into_data())
1467 .null_bit_buffer(Some(Buffer::from([0b00011011])))
1468 .build()
1469 .unwrap();
1470 let a = ListArray::from(a_list_data);
1471
1472 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1474
1475 assert_eq!(batch.column(0).null_count(), 1);
1476
1477 roundtrip(batch, None);
1480 }
1481
1482 #[test]
1483 fn arrow_writer_list_non_null() {
1484 let schema = Schema::new(vec![Field::new(
1486 "a",
1487 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1488 false,
1489 )]);
1490
1491 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1493
1494 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1497
1498 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1500 DataType::Int32,
1501 false,
1502 ))))
1503 .len(5)
1504 .add_buffer(a_value_offsets)
1505 .add_child_data(a_values.into_data())
1506 .build()
1507 .unwrap();
1508 let a = ListArray::from(a_list_data);
1509
1510 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1512
1513 assert_eq!(batch.column(0).null_count(), 0);
1516
1517 roundtrip(batch, None);
1518 }
1519
1520 #[test]
1521 fn arrow_writer_binary() {
1522 let string_field = Field::new("a", DataType::Utf8, false);
1523 let binary_field = Field::new("b", DataType::Binary, false);
1524 let schema = Schema::new(vec![string_field, binary_field]);
1525
1526 let raw_string_values = vec!["foo", "bar", "baz", "quux"];
1527 let raw_binary_values = [
1528 b"foo".to_vec(),
1529 b"bar".to_vec(),
1530 b"baz".to_vec(),
1531 b"quux".to_vec(),
1532 ];
1533 let raw_binary_value_refs = raw_binary_values
1534 .iter()
1535 .map(|x| x.as_slice())
1536 .collect::<Vec<_>>();
1537
1538 let string_values = StringArray::from(raw_string_values.clone());
1539 let binary_values = BinaryArray::from(raw_binary_value_refs);
1540 let batch = RecordBatch::try_new(
1541 Arc::new(schema),
1542 vec![Arc::new(string_values), Arc::new(binary_values)],
1543 )
1544 .unwrap();
1545
1546 roundtrip(batch, Some(SMALL_SIZE / 2));
1547 }
1548
1549 #[test]
1550 fn arrow_writer_binary_view() {
1551 let string_field = Field::new("a", DataType::Utf8View, false);
1552 let binary_field = Field::new("b", DataType::BinaryView, false);
1553 let nullable_string_field = Field::new("a", DataType::Utf8View, true);
1554 let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
1555
1556 let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
1557 let raw_binary_values = vec![
1558 b"foo".to_vec(),
1559 b"bar".to_vec(),
1560 b"large payload over 12 bytes".to_vec(),
1561 b"lulu".to_vec(),
1562 ];
1563 let nullable_string_values =
1564 vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
1565
1566 let string_view_values = StringViewArray::from(raw_string_values);
1567 let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
1568 let nullable_string_view_values = StringViewArray::from(nullable_string_values);
1569 let batch = RecordBatch::try_new(
1570 Arc::new(schema),
1571 vec![
1572 Arc::new(string_view_values),
1573 Arc::new(binary_view_values),
1574 Arc::new(nullable_string_view_values),
1575 ],
1576 )
1577 .unwrap();
1578
1579 roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1580 roundtrip(batch, None);
1581 }
1582
1583 fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
1584 let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
1585 let schema = Schema::new(vec![decimal_field]);
1586
1587 let decimal_values = vec![10_000, 50_000, 0, -100]
1588 .into_iter()
1589 .map(Some)
1590 .collect::<Decimal128Array>()
1591 .with_precision_and_scale(precision, scale)
1592 .unwrap();
1593
1594 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
1595 }
1596
1597 #[test]
1598 fn arrow_writer_decimal() {
1599 let batch_int32_decimal = get_decimal_batch(5, 2);
1601 roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
1602 let batch_int64_decimal = get_decimal_batch(12, 2);
1604 roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
1605 let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
1607 roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
1608 }
1609
1610 #[test]
1611 fn arrow_writer_complex() {
1612 let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
1614 let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
1615 let struct_field_g = Arc::new(Field::new_list(
1616 "g",
1617 Field::new_list_field(DataType::Int16, true),
1618 false,
1619 ));
1620 let struct_field_h = Arc::new(Field::new_list(
1621 "h",
1622 Field::new_list_field(DataType::Int16, false),
1623 true,
1624 ));
1625 let struct_field_e = Arc::new(Field::new_struct(
1626 "e",
1627 vec![
1628 struct_field_f.clone(),
1629 struct_field_g.clone(),
1630 struct_field_h.clone(),
1631 ],
1632 false,
1633 ));
1634 let schema = Schema::new(vec![
1635 Field::new("a", DataType::Int32, false),
1636 Field::new("b", DataType::Int32, true),
1637 Field::new_struct(
1638 "c",
1639 vec![struct_field_d.clone(), struct_field_e.clone()],
1640 false,
1641 ),
1642 ]);
1643
1644 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1646 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1647 let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
1648 let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
1649
1650 let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1651
1652 let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1655
1656 let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
1658 .len(5)
1659 .add_buffer(g_value_offsets.clone())
1660 .add_child_data(g_value.to_data())
1661 .build()
1662 .unwrap();
1663 let g = ListArray::from(g_list_data);
1664 let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
1666 .len(5)
1667 .add_buffer(g_value_offsets)
1668 .add_child_data(g_value.to_data())
1669 .null_bit_buffer(Some(Buffer::from([0b00011011])))
1670 .build()
1671 .unwrap();
1672 let h = ListArray::from(h_list_data);
1673
1674 let e = StructArray::from(vec![
1675 (struct_field_f, Arc::new(f) as ArrayRef),
1676 (struct_field_g, Arc::new(g) as ArrayRef),
1677 (struct_field_h, Arc::new(h) as ArrayRef),
1678 ]);
1679
1680 let c = StructArray::from(vec![
1681 (struct_field_d, Arc::new(d) as ArrayRef),
1682 (struct_field_e, Arc::new(e) as ArrayRef),
1683 ]);
1684
1685 let batch = RecordBatch::try_new(
1687 Arc::new(schema),
1688 vec![Arc::new(a), Arc::new(b), Arc::new(c)],
1689 )
1690 .unwrap();
1691
1692 roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
1693 roundtrip(batch, Some(SMALL_SIZE / 3));
1694 }
1695
1696 #[test]
1697 fn arrow_writer_complex_mixed() {
1698 let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
1703 let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
1704 let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
1705 let schema = Schema::new(vec![Field::new(
1706 "some_nested_object",
1707 DataType::Struct(Fields::from(vec![
1708 offset_field.clone(),
1709 partition_field.clone(),
1710 topic_field.clone(),
1711 ])),
1712 false,
1713 )]);
1714
1715 let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
1717 let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1718 let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
1719
1720 let some_nested_object = StructArray::from(vec![
1721 (offset_field, Arc::new(offset) as ArrayRef),
1722 (partition_field, Arc::new(partition) as ArrayRef),
1723 (topic_field, Arc::new(topic) as ArrayRef),
1724 ]);
1725
1726 let batch =
1728 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
1729
1730 roundtrip(batch, Some(SMALL_SIZE / 2));
1731 }
1732
1733 #[test]
1734 fn arrow_writer_map() {
1735 let json_content = r#"
1737 {"stocks":{"long": "$AAA", "short": "$BBB"}}
1738 {"stocks":{"long": null, "long": "$CCC", "short": null}}
1739 {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
1740 "#;
1741 let entries_struct_type = DataType::Struct(Fields::from(vec![
1742 Field::new("key", DataType::Utf8, false),
1743 Field::new("value", DataType::Utf8, true),
1744 ]));
1745 let stocks_field = Field::new(
1746 "stocks",
1747 DataType::Map(
1748 Arc::new(Field::new("entries", entries_struct_type, false)),
1749 false,
1750 ),
1751 true,
1752 );
1753 let schema = Arc::new(Schema::new(vec![stocks_field]));
1754 let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
1755 let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
1756
1757 let batch = reader.next().unwrap().unwrap();
1758 roundtrip(batch, None);
1759 }
1760
1761 #[test]
1762 fn arrow_writer_2_level_struct() {
1763 let field_c = Field::new("c", DataType::Int32, true);
1765 let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
1766 let type_a = DataType::Struct(vec![field_b.clone()].into());
1767 let field_a = Field::new("a", type_a, true);
1768 let schema = Schema::new(vec![field_a.clone()]);
1769
1770 let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
1772 let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
1773 .len(6)
1774 .null_bit_buffer(Some(Buffer::from([0b00100111])))
1775 .add_child_data(c.into_data())
1776 .build()
1777 .unwrap();
1778 let b = StructArray::from(b_data);
1779 let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
1780 .len(6)
1781 .null_bit_buffer(Some(Buffer::from([0b00101111])))
1782 .add_child_data(b.into_data())
1783 .build()
1784 .unwrap();
1785 let a = StructArray::from(a_data);
1786
1787 assert_eq!(a.null_count(), 1);
1788 assert_eq!(a.column(0).null_count(), 2);
1789
1790 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1792
1793 roundtrip(batch, Some(SMALL_SIZE / 2));
1794 }
1795
1796 #[test]
1797 fn arrow_writer_2_level_struct_non_null() {
1798 let field_c = Field::new("c", DataType::Int32, false);
1800 let type_b = DataType::Struct(vec![field_c].into());
1801 let field_b = Field::new("b", type_b.clone(), false);
1802 let type_a = DataType::Struct(vec![field_b].into());
1803 let field_a = Field::new("a", type_a.clone(), false);
1804 let schema = Schema::new(vec![field_a]);
1805
1806 let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
1808 let b_data = ArrayDataBuilder::new(type_b)
1809 .len(6)
1810 .add_child_data(c.into_data())
1811 .build()
1812 .unwrap();
1813 let b = StructArray::from(b_data);
1814 let a_data = ArrayDataBuilder::new(type_a)
1815 .len(6)
1816 .add_child_data(b.into_data())
1817 .build()
1818 .unwrap();
1819 let a = StructArray::from(a_data);
1820
1821 assert_eq!(a.null_count(), 0);
1822 assert_eq!(a.column(0).null_count(), 0);
1823
1824 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1826
1827 roundtrip(batch, Some(SMALL_SIZE / 2));
1828 }
1829
1830 #[test]
1831 fn arrow_writer_2_level_struct_mixed_null() {
1832 let field_c = Field::new("c", DataType::Int32, false);
1834 let type_b = DataType::Struct(vec![field_c].into());
1835 let field_b = Field::new("b", type_b.clone(), true);
1836 let type_a = DataType::Struct(vec![field_b].into());
1837 let field_a = Field::new("a", type_a.clone(), false);
1838 let schema = Schema::new(vec![field_a]);
1839
1840 let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
1842 let b_data = ArrayDataBuilder::new(type_b)
1843 .len(6)
1844 .null_bit_buffer(Some(Buffer::from([0b00100111])))
1845 .add_child_data(c.into_data())
1846 .build()
1847 .unwrap();
1848 let b = StructArray::from(b_data);
1849 let a_data = ArrayDataBuilder::new(type_a)
1851 .len(6)
1852 .add_child_data(b.into_data())
1853 .build()
1854 .unwrap();
1855 let a = StructArray::from(a_data);
1856
1857 assert_eq!(a.null_count(), 0);
1858 assert_eq!(a.column(0).null_count(), 2);
1859
1860 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1862
1863 roundtrip(batch, Some(SMALL_SIZE / 2));
1864 }
1865
1866 #[test]
1867 fn arrow_writer_2_level_struct_mixed_null_2() {
1868 let field_c = Field::new("c", DataType::Int32, false);
1870 let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
1871 let field_e = Field::new(
1872 "e",
1873 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1874 false,
1875 );
1876
1877 let field_b = Field::new(
1878 "b",
1879 DataType::Struct(vec![field_c, field_d, field_e].into()),
1880 false,
1881 );
1882 let type_a = DataType::Struct(vec![field_b.clone()].into());
1883 let field_a = Field::new("a", type_a, true);
1884 let schema = Schema::new(vec![field_a.clone()]);
1885
1886 let c = Int32Array::from_iter_values(0..6);
1888 let d = FixedSizeBinaryArray::try_from_iter(
1889 ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
1890 )
1891 .expect("four byte values");
1892 let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
1893 let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
1894 .len(6)
1895 .add_child_data(c.into_data())
1896 .add_child_data(d.into_data())
1897 .add_child_data(e.into_data())
1898 .build()
1899 .unwrap();
1900 let b = StructArray::from(b_data);
1901 let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
1902 .len(6)
1903 .null_bit_buffer(Some(Buffer::from([0b00100101])))
1904 .add_child_data(b.into_data())
1905 .build()
1906 .unwrap();
1907 let a = StructArray::from(a_data);
1908
1909 assert_eq!(a.null_count(), 3);
1910 assert_eq!(a.column(0).null_count(), 0);
1911
1912 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1914
1915 roundtrip(batch, Some(SMALL_SIZE / 2));
1916 }
1917
1918 #[test]
1919 fn test_fixed_size_binary_in_dict() {
1920 fn test_fixed_size_binary_in_dict_inner<K>()
1921 where
1922 K: ArrowDictionaryKeyType,
1923 K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
1924 <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
1925 {
1926 let field = Field::new(
1927 "a",
1928 DataType::Dictionary(
1929 Box::new(K::DATA_TYPE),
1930 Box::new(DataType::FixedSizeBinary(4)),
1931 ),
1932 false,
1933 );
1934 let schema = Schema::new(vec![field]);
1935
1936 let keys: Vec<K::Native> = vec![
1937 K::Native::try_from(0u8).unwrap(),
1938 K::Native::try_from(0u8).unwrap(),
1939 K::Native::try_from(1u8).unwrap(),
1940 ];
1941 let keys = PrimitiveArray::<K>::from_iter_values(keys);
1942 let values = FixedSizeBinaryArray::try_from_iter(
1943 vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
1944 )
1945 .unwrap();
1946
1947 let data = DictionaryArray::<K>::new(keys, Arc::new(values));
1948 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
1949 roundtrip(batch, None);
1950 }
1951
        test_fixed_size_binary_in_dict_inner::<UInt8Type>();
        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
        test_fixed_size_binary_in_dict_inner::<UInt32Type>();
        test_fixed_size_binary_in_dict_inner::<UInt64Type>();
        test_fixed_size_binary_in_dict_inner::<Int8Type>();
        test_fixed_size_binary_in_dict_inner::<Int16Type>();
        test_fixed_size_binary_in_dict_inner::<Int32Type>();
        test_fixed_size_binary_in_dict_inner::<Int64Type>();
    }
1961
1962 #[test]
1963 fn test_empty_dict() {
1964 let struct_fields = Fields::from(vec![Field::new(
1965 "dict",
1966 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1967 false,
1968 )]);
1969
1970 let schema = Schema::new(vec![Field::new_struct(
1971 "struct",
1972 struct_fields.clone(),
1973 true,
1974 )]);
1975 let dictionary = Arc::new(DictionaryArray::new(
1976 Int32Array::new_null(5),
1977 Arc::new(StringArray::new_null(0)),
1978 ));
1979
1980 let s = StructArray::new(
1981 struct_fields,
1982 vec![dictionary],
1983 Some(NullBuffer::new_null(5)),
1984 );
1985
1986 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
1987 roundtrip(batch, None);
1988 }
1989 #[test]
1990 fn arrow_writer_page_size() {
1991 let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
1992
1993 let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
1994
1995 for i in 0..10 {
1997 let value = i
1998 .to_string()
1999 .repeat(10)
2000 .chars()
2001 .take(10)
2002 .collect::<String>();
2003
2004 builder.append_value(value);
2005 }
2006
2007 let array = Arc::new(builder.finish());
2008
2009 let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
2010
2011 let file = tempfile::tempfile().unwrap();
2012
2013 let props = WriterProperties::builder()
2015 .set_data_page_size_limit(1)
2016 .set_dictionary_page_size_limit(1)
2017 .set_write_batch_size(1)
2018 .build();
2019
2020 let mut writer =
2021 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
2022 .expect("Unable to write file");
2023 writer.write(&batch).unwrap();
2024 writer.close().unwrap();
2025
2026 let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
2027
2028 let column = reader.metadata().row_group(0).columns();
2029
2030 assert_eq!(column.len(), 1);
2031
2032 assert!(
2035 column[0].dictionary_page_offset().is_some(),
2036 "Expected a dictionary page"
2037 );
2038
2039 let offset_indexes = read_offset_indexes(&file, column).unwrap().unwrap();
2040
2041 let page_locations = offset_indexes[0].page_locations.clone();
2042
2043 assert_eq!(
2046 page_locations.len(),
2047 10,
2048 "Expected 9 pages but got {page_locations:#?}"
2049 );
2050 }
2051
2052 #[test]
2053 fn arrow_writer_float_nans() {
2054 let f16_field = Field::new("a", DataType::Float16, false);
2055 let f32_field = Field::new("b", DataType::Float32, false);
2056 let f64_field = Field::new("c", DataType::Float64, false);
2057 let schema = Schema::new(vec![f16_field, f32_field, f64_field]);
2058
2059 let f16_values = (0..MEDIUM_SIZE)
2060 .map(|i| {
2061 Some(if i % 2 == 0 {
2062 f16::NAN
2063 } else {
2064 f16::from_f32(i as f32)
2065 })
2066 })
2067 .collect::<Float16Array>();
2068
2069 let f32_values = (0..MEDIUM_SIZE)
2070 .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
2071 .collect::<Float32Array>();
2072
2073 let f64_values = (0..MEDIUM_SIZE)
2074 .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
2075 .collect::<Float64Array>();
2076
2077 let batch = RecordBatch::try_new(
2078 Arc::new(schema),
2079 vec![
2080 Arc::new(f16_values),
2081 Arc::new(f32_values),
2082 Arc::new(f64_values),
2083 ],
2084 )
2085 .unwrap();
2086
2087 roundtrip(batch, None);
2088 }
2089
2090 const SMALL_SIZE: usize = 7;
2091 const MEDIUM_SIZE: usize = 63;
2092
2093 fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<File> {
2094 let mut files = vec![];
2095 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2096 let mut props = WriterProperties::builder().set_writer_version(version);
2097
2098 if let Some(size) = max_row_group_size {
2099 props = props.set_max_row_group_size(size)
2100 }
2101
2102 let props = props.build();
2103 files.push(roundtrip_opts(&expected_batch, props))
2104 }
2105 files
2106 }
2107
2108 fn roundtrip_opts_with_array_validation<F>(
2109 expected_batch: &RecordBatch,
2110 props: WriterProperties,
2111 validate: F,
2112 ) -> File
2113 where
2114 F: Fn(&ArrayData, &ArrayData),
2115 {
2116 let file = tempfile::tempfile().unwrap();
2117
2118 let mut writer = ArrowWriter::try_new(
2119 file.try_clone().unwrap(),
2120 expected_batch.schema(),
2121 Some(props),
2122 )
2123 .expect("Unable to write file");
2124 writer.write(expected_batch).unwrap();
2125 writer.close().unwrap();
2126
2127 let mut record_batch_reader =
2128 ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
2129
2130 let actual_batch = record_batch_reader
2131 .next()
2132 .expect("No batch found")
2133 .expect("Unable to get batch");
2134
2135 assert_eq!(expected_batch.schema(), actual_batch.schema());
2136 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2137 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2138 for i in 0..expected_batch.num_columns() {
2139 let expected_data = expected_batch.column(i).to_data();
2140 let actual_data = actual_batch.column(i).to_data();
2141 validate(&expected_data, &actual_data);
2142 }
2143
2144 file
2145 }
2146
2147 fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> File {
2148 roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
2149 a.validate_full().expect("valid expected data");
2150 b.validate_full().expect("valid actual data");
2151 assert_eq!(a, b)
2152 })
2153 }
2154
2155 struct RoundTripOptions {
2156 values: ArrayRef,
2157 schema: SchemaRef,
2158 bloom_filter: bool,
2159 bloom_filter_position: BloomFilterPosition,
2160 }
2161
2162 impl RoundTripOptions {
2163 fn new(values: ArrayRef, nullable: bool) -> Self {
2164 let data_type = values.data_type().clone();
2165 let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
2166 Self {
2167 values,
2168 schema: Arc::new(schema),
2169 bloom_filter: false,
2170 bloom_filter_position: BloomFilterPosition::AfterRowGroup,
2171 }
2172 }
2173 }
2174
2175 fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<File> {
2176 one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
2177 }
2178
2179 fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<File> {
2180 let mut options = RoundTripOptions::new(values, false);
2181 options.schema = schema;
2182 one_column_roundtrip_with_options(options)
2183 }
2184
2185 fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<File> {
2186 let RoundTripOptions {
2187 values,
2188 schema,
2189 bloom_filter,
2190 bloom_filter_position,
2191 } = options;
2192
2193 let encodings = match values.data_type() {
2194 DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
2195 vec![
2196 Encoding::PLAIN,
2197 Encoding::DELTA_BYTE_ARRAY,
2198 Encoding::DELTA_LENGTH_BYTE_ARRAY,
2199 ]
2200 }
2201 DataType::Int64
2202 | DataType::Int32
2203 | DataType::Int16
2204 | DataType::Int8
2205 | DataType::UInt64
2206 | DataType::UInt32
2207 | DataType::UInt16
2208 | DataType::UInt8 => vec![
2209 Encoding::PLAIN,
2210 Encoding::DELTA_BINARY_PACKED,
2211 Encoding::BYTE_STREAM_SPLIT,
2212 ],
2213 DataType::Float32 | DataType::Float64 => {
2214 vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
2215 }
2216 _ => vec![Encoding::PLAIN],
2217 };
2218
2219 let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2220
2221 let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2222
2223 let mut files = vec![];
2224 for dictionary_size in [0, 1, 1024] {
2225 for encoding in &encodings {
2226 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2227 for row_group_size in row_group_sizes {
2228 let props = WriterProperties::builder()
2229 .set_writer_version(version)
2230 .set_max_row_group_size(row_group_size)
2231 .set_dictionary_enabled(dictionary_size != 0)
2232 .set_dictionary_page_size_limit(dictionary_size.max(1))
2233 .set_encoding(*encoding)
2234 .set_bloom_filter_enabled(bloom_filter)
2235 .set_bloom_filter_position(bloom_filter_position)
2236 .build();
2237
2238 files.push(roundtrip_opts(&expected_batch, props))
2239 }
2240 }
2241 }
2242 }
2243 files
2244 }
2245
2246 fn values_required<A, I>(iter: I) -> Vec<File>
2247 where
2248 A: From<Vec<I::Item>> + Array + 'static,
2249 I: IntoIterator,
2250 {
2251 let raw_values: Vec<_> = iter.into_iter().collect();
2252 let values = Arc::new(A::from(raw_values));
2253 one_column_roundtrip(values, false)
2254 }
2255
2256 fn values_optional<A, I>(iter: I) -> Vec<File>
2257 where
2258 A: From<Vec<Option<I::Item>>> + Array + 'static,
2259 I: IntoIterator,
2260 {
2261 let optional_raw_values: Vec<_> = iter
2262 .into_iter()
2263 .enumerate()
2264 .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
2265 .collect();
2266 let optional_values = Arc::new(A::from(optional_raw_values));
2267 one_column_roundtrip(optional_values, true)
2268 }
2269
2270 fn required_and_optional<A, I>(iter: I)
2271 where
2272 A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
2273 I: IntoIterator + Clone,
2274 {
2275 values_required::<A, I>(iter.clone());
2276 values_optional::<A, I>(iter);
2277 }
2278
2279 fn check_bloom_filter<T: AsBytes>(
2280 files: Vec<File>,
2281 file_column: String,
2282 positive_values: Vec<T>,
2283 negative_values: Vec<T>,
2284 ) {
2285 files.into_iter().take(1).for_each(|file| {
2286 let file_reader = SerializedFileReader::new_with_options(
2287 file,
2288 ReadOptionsBuilder::new()
2289 .with_reader_properties(
2290 ReaderProperties::builder()
2291 .set_read_bloom_filter(true)
2292 .build(),
2293 )
2294 .build(),
2295 )
2296 .expect("Unable to open file as Parquet");
2297 let metadata = file_reader.metadata();
2298
2299 let mut bloom_filters: Vec<_> = vec![];
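2300 // collect the bloom filter for `file_column` from every row group, panicking if any is missing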
2301 for (ri, row_group) in metadata.row_groups().iter().enumerate() {
2302 if let Some((column_index, _)) = row_group
2303 .columns()
2304 .iter()
2305 .enumerate()
2306 .find(|(_, column)| column.column_path().string() == file_column)
2307 {
2308 let row_group_reader = file_reader
2309 .get_row_group(ri)
2310 .expect("Unable to read row group");
2311 if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
2312 bloom_filters.push(sbbf.clone());
2313 } else {
2314 panic!("No bloom filter for column named {file_column} found");
2315 }
2316 } else {
2317 panic!("No column named {file_column} found");
2318 }
2319 }
2320
2321 positive_values.iter().for_each(|value| {
2322 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2323 assert!(
2324 found.is_some(),
2325 "{}",
2326 format!("Value {:?} should be in bloom filter", value.as_bytes())
2327 );
2328 });
2329
2330 negative_values.iter().for_each(|value| {
2331 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2332 assert!(
2333 found.is_none(),
2334 "{}",
2335 format!("Value {:?} should not be in bloom filter", value.as_bytes())
2336 );
2337 });
2338 });
2339 }
2340
2341 #[test]
2342 fn all_null_primitive_single_column() {
2343 let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
2344 one_column_roundtrip(values, true);
2345 }

2346 #[test]
2347 fn null_single_column() {
2348 let values = Arc::new(NullArray::new(SMALL_SIZE));
2349 one_column_roundtrip(values, true);
2350 }
2352
2353 #[test]
2354 fn bool_single_column() {
2355 required_and_optional::<BooleanArray, _>(
2356 [true, false].iter().cycle().copied().take(SMALL_SIZE),
2357 );
2358 }
2359
2360 #[test]
2361 fn bool_large_single_column() {
2362 let values = Arc::new(
2363 [None, Some(true), Some(false)]
2364 .iter()
2365 .cycle()
2366 .copied()
2367 .take(200_000)
2368 .collect::<BooleanArray>(),
2369 );
2370 let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
2371 let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2372 let file = tempfile::tempfile().unwrap();
2373
2374 let mut writer =
2375 ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
2376 .expect("Unable to write file");
2377 writer.write(&expected_batch).unwrap();
2378 writer.close().unwrap();
2379 }
2380
2381 #[test]
2382 fn check_page_offset_index_with_nan() {
2383 let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
2384 let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
2385 let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
2386
2387 let mut out = Vec::with_capacity(1024);
2388 let mut writer =
2389 ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
2390 writer.write(&batch).unwrap();
2391 let file_meta_data = writer.close().unwrap();
2392 for row_group in file_meta_data.row_groups {
2393 for column in row_group.columns {
2394 assert!(column.offset_index_offset.is_some());
2395 assert!(column.offset_index_length.is_some());
2396 assert!(column.column_index_offset.is_none());
2397 assert!(column.column_index_length.is_none());
2398 }
2399 }
2400 }
2401
2402 #[test]
2403 fn i8_single_column() {
2404 required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
2405 }
2406
2407 #[test]
2408 fn i16_single_column() {
2409 required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
2410 }
2411
2412 #[test]
2413 fn i32_single_column() {
2414 required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
2415 }
2416
2417 #[test]
2418 fn i64_single_column() {
2419 required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
2420 }
2421
2422 #[test]
2423 fn u8_single_column() {
2424 required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
2425 }
2426
2427 #[test]
2428 fn u16_single_column() {
2429 required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
2430 }
2431
2432 #[test]
2433 fn u32_single_column() {
2434 required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
2435 }
2436
2437 #[test]
2438 fn u64_single_column() {
2439 required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
2440 }
2441
2442 #[test]
2443 fn f32_single_column() {
2444 required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
2445 }
2446
2447 #[test]
2448 fn f64_single_column() {
2449 required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
2450 }
2451
2452 #[test]
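2453 // The timestamp tests below only round-trip required (non-null) values, calling
2454 // one_column_roundtrip directly instead of required_and_optional.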
2457 fn timestamp_second_single_column() {
2458 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2459 let values = Arc::new(TimestampSecondArray::from(raw_values));
2460
2461 one_column_roundtrip(values, false);
2462 }
2463
2464 #[test]
2465 fn timestamp_millisecond_single_column() {
2466 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2467 let values = Arc::new(TimestampMillisecondArray::from(raw_values));
2468
2469 one_column_roundtrip(values, false);
2470 }
2471
2472 #[test]
2473 fn timestamp_microsecond_single_column() {
2474 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2475 let values = Arc::new(TimestampMicrosecondArray::from(raw_values));
2476
2477 one_column_roundtrip(values, false);
2478 }
2479
2480 #[test]
2481 fn timestamp_nanosecond_single_column() {
2482 let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
2483 let values = Arc::new(TimestampNanosecondArray::from(raw_values));
2484
2485 one_column_roundtrip(values, false);
2486 }
2487
2488 #[test]
2489 fn date32_single_column() {
2490 required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
2491 }
2492
2493 #[test]
2494 fn date64_single_column() {
2495 required_and_optional::<Date64Array, _>(
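2496 // step by 86_400_000 ms so every Date64 value is a whole number of days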
2497 (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
2498 );
2499 }
2500
2501 #[test]
2502 fn time32_second_single_column() {
2503 required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
2504 }
2505
2506 #[test]
2507 fn time32_millisecond_single_column() {
2508 required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
2509 }
2510
2511 #[test]
2512 fn time64_microsecond_single_column() {
2513 required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
2514 }
2515
2516 #[test]
2517 fn time64_nanosecond_single_column() {
2518 required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
2519 }
2520
2521 #[test]
2522 fn duration_second_single_column() {
2523 required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
2524 }
2525
2526 #[test]
2527 fn duration_millisecond_single_column() {
2528 required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
2529 }
2530
2531 #[test]
2532 fn duration_microsecond_single_column() {
2533 required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
2534 }
2535
2536 #[test]
2537 fn duration_nanosecond_single_column() {
2538 required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
2539 }
2540
2541 #[test]
2542 fn interval_year_month_single_column() {
2543 required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
2544 }
2545
2546 #[test]
2547 fn interval_day_time_single_column() {
2548 required_and_optional::<IntervalDayTimeArray, _>(vec![
2549 IntervalDayTime::new(0, 1),
2550 IntervalDayTime::new(0, 3),
2551 IntervalDayTime::new(3, -2),
2552 IntervalDayTime::new(-200, 4),
2553 ]);
2554 }
2555
2556 #[test]
2557 #[should_panic(
2558 expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
2559 )]
2560 fn interval_month_day_nano_single_column() {
2561 required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
2562 IntervalMonthDayNano::new(0, 1, 5),
2563 IntervalMonthDayNano::new(0, 3, 2),
2564 IntervalMonthDayNano::new(3, -2, -5),
2565 IntervalMonthDayNano::new(-200, 4, -1),
2566 ]);
2567 }
2568
2569 #[test]
2570 fn binary_single_column() {
2571 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2572 let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2573 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2574
2575 values_required::<BinaryArray, _>(many_vecs_iter);
2577 }
2578
2579 #[test]
2580 fn binary_view_single_column() {
2581 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2582 let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2583 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2584
2585 values_required::<BinaryViewArray, _>(many_vecs_iter);
2587 }
2588
2589 #[test]
2590 fn i32_column_bloom_filter_at_end() {
2591 let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2592 let mut options = RoundTripOptions::new(array, false);
2593 options.bloom_filter = true;
2594 options.bloom_filter_position = BloomFilterPosition::End;
2595
2596 let files = one_column_roundtrip_with_options(options);
2597 check_bloom_filter(
2598 files,
2599 "col".to_string(),
2600 (0..SMALL_SIZE as i32).collect(),
2601 (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2602 );
2603 }
2604
2605 #[test]
2606 fn i32_column_bloom_filter() {
2607 let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
2608 let mut options = RoundTripOptions::new(array, false);
2609 options.bloom_filter = true;
2610
2611 let files = one_column_roundtrip_with_options(options);
2612 check_bloom_filter(
2613 files,
2614 "col".to_string(),
2615 (0..SMALL_SIZE as i32).collect(),
2616 (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
2617 );
2618 }
2619
2620 #[test]
2621 fn binary_column_bloom_filter() {
2622 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2623 let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2624 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2625
2626 let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
2627 let mut options = RoundTripOptions::new(array, false);
2628 options.bloom_filter = true;
2629
2630 let files = one_column_roundtrip_with_options(options);
2631 check_bloom_filter(
2632 files,
2633 "col".to_string(),
2634 many_vecs,
2635 vec![vec![(SMALL_SIZE + 1) as u8]],
2636 );
2637 }
2638
2639 #[test]
2640 fn empty_string_null_column_bloom_filter() {
2641 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2642 let raw_strs = raw_values.iter().map(|s| s.as_str());
2643
2644 let array = Arc::new(StringArray::from_iter_values(raw_strs));
2645 let mut options = RoundTripOptions::new(array, false);
2646 options.bloom_filter = true;
2647
2648 let files = one_column_roundtrip_with_options(options);
2649
2650 let optional_raw_values: Vec<_> = raw_values
2651 .iter()
2652 .enumerate()
2653 .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
2654 .collect();
2655 check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
2657 }
2658
2659 #[test]
2660 fn large_binary_single_column() {
2661 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2662 let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2663 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2664
2665 values_required::<LargeBinaryArray, _>(many_vecs_iter);
2667 }
2668
2669 #[test]
2670 fn fixed_size_binary_single_column() {
2671 let mut builder = FixedSizeBinaryBuilder::new(4);
2672 builder.append_value(b"0123").unwrap();
2673 builder.append_null();
2674 builder.append_value(b"8910").unwrap();
2675 builder.append_value(b"1112").unwrap();
2676 let array = Arc::new(builder.finish());
2677
2678 one_column_roundtrip(array, true);
2679 }
2680
2681 #[test]
2682 fn string_single_column() {
2683 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2684 let raw_strs = raw_values.iter().map(|s| s.as_str());
2685
2686 required_and_optional::<StringArray, _>(raw_strs);
2687 }
2688
2689 #[test]
2690 fn large_string_single_column() {
2691 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2692 let raw_strs = raw_values.iter().map(|s| s.as_str());
2693
2694 required_and_optional::<LargeStringArray, _>(raw_strs);
2695 }
2696
2697 #[test]
2698 fn string_view_single_column() {
2699 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2700 let raw_strs = raw_values.iter().map(|s| s.as_str());
2701
2702 required_and_optional::<StringViewArray, _>(raw_strs);
2703 }
2704
2705 #[test]
2706 fn null_list_single_column() {
2707 let null_field = Field::new_list_field(DataType::Null, true);
2708 let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);
2709
2710 let schema = Schema::new(vec![list_field]);
2711
2712 let a_values = NullArray::new(2);
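2713 // offsets [0, 0, 0, 2] with validity 0b101 encode three lists: [], null, [null, null]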
2714 let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
2715 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2716 DataType::Null,
2717 true,
2718 ))))
2719 .len(3)
2720 .add_buffer(a_value_offsets)
2721 .null_bit_buffer(Some(Buffer::from([0b00000101])))
2722 .add_child_data(a_values.into_data())
2723 .build()
2724 .unwrap();
2725
2726 let a = ListArray::from(a_list_data);
2727
2728 assert!(a.is_valid(0));
2729 assert!(!a.is_valid(1));
2730 assert!(a.is_valid(2));
2731
2732 assert_eq!(a.value(0).len(), 0);
2733 assert_eq!(a.value(2).len(), 2);
2734 assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);
2735
2736 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2737 roundtrip(batch, None);
2738 }
2739
2740 #[test]
2741 fn list_single_column() {
2742 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2743 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2744 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2745 DataType::Int32,
2746 false,
2747 ))))
2748 .len(5)
2749 .add_buffer(a_value_offsets)
2750 .null_bit_buffer(Some(Buffer::from([0b00011011])))
2751 .add_child_data(a_values.into_data())
2752 .build()
2753 .unwrap();
2754
2755 assert_eq!(a_list_data.null_count(), 1);
2756
2757 let a = ListArray::from(a_list_data);
2758 let values = Arc::new(a);
2759
2760 one_column_roundtrip(values, true);
2761 }
2762
2763 #[test]
2764 fn large_list_single_column() {
2765 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2766 let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
2767 let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
2768 "large_item",
2769 DataType::Int32,
2770 true,
2771 ))))
2772 .len(5)
2773 .add_buffer(a_value_offsets)
2774 .add_child_data(a_values.into_data())
2775 .null_bit_buffer(Some(Buffer::from([0b00011011])))
2776 .build()
2777 .unwrap();
2778
2779 assert_eq!(a_list_data.null_count(), 1);
2781
2782 let a = LargeListArray::from(a_list_data);
2783 let values = Arc::new(a);
2784
2785 one_column_roundtrip(values, true);
2786 }
2787
2788 #[test]
2789 fn list_nested_nulls() {
2790 use arrow::datatypes::Int32Type;
2791 let data = vec![
2792 Some(vec![Some(1)]),
2793 Some(vec![Some(2), Some(3)]),
2794 None,
2795 Some(vec![Some(4), Some(5), None]),
2796 Some(vec![None]),
2797 Some(vec![Some(6), Some(7)]),
2798 ];
2799
2800 let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
2801 one_column_roundtrip(Arc::new(list), true);
2802
2803 let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
2804 one_column_roundtrip(Arc::new(list), true);
2805 }
2806
2807 #[test]
2808 fn struct_single_column() {
2809 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2810 let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
2811 let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
2812
2813 let values = Arc::new(s);
2814 one_column_roundtrip(values, false);
2815 }
2816
2817 #[test]
2818 fn list_and_map_coerced_names() {
2819 let list_field =
2821 Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
2822 let map_field = Field::new_map(
2823 "my_map",
2824 "entries",
2825 Field::new("keys", DataType::Int32, false),
2826 Field::new("values", DataType::Int32, true),
2827 false,
2828 true,
2829 );
2830
2831 let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
2832 let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
2833
2834 let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
2835
2836 let props = Some(WriterProperties::builder().set_coerce_types(true).build());
2838 let file = tempfile::tempfile().unwrap();
2839 let mut writer =
2840 ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
2841
2842 let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
2843 writer.write(&batch).unwrap();
2844 let file_metadata = writer.close().unwrap();
2845
2846 assert_eq!(file_metadata.schema[3].name, "element");
2848 assert_eq!(file_metadata.schema[5].name, "key_value");
2850 assert_eq!(file_metadata.schema[6].name, "key");
2852 assert_eq!(file_metadata.schema[7].name, "value");
2854
2855 let reader = SerializedFileReader::new(file).unwrap();
2857 let file_schema = reader.metadata().file_metadata().schema();
2858 let fields = file_schema.get_fields();
2859 let list_field = &fields[0].get_fields()[0];
2860 assert_eq!(list_field.get_fields()[0].name(), "element");
2861 let map_field = &fields[1].get_fields()[0];
2862 assert_eq!(map_field.name(), "key_value");
2863 assert_eq!(map_field.get_fields()[0].name(), "key");
2864 assert_eq!(map_field.get_fields()[1].name(), "value");
2865 }
2866
2867 #[test]
2868 fn fallback_flush_data_page() {
2869 let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
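2870 // dictionary encoding is disabled below and a 32 byte data page limit forces frequent page flushes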
2871 let values = Arc::new(StringArray::from(raw_values));
2872 let encodings = vec![
2873 Encoding::DELTA_BYTE_ARRAY,
2874 Encoding::DELTA_LENGTH_BYTE_ARRAY,
2875 ];
2876 let data_type = values.data_type().clone();
2877 let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
2878 let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2879
2880 let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2881 let data_page_size_limit: usize = 32;
2882 let write_batch_size: usize = 16;
2883
2884 for encoding in &encodings {
2885 for row_group_size in row_group_sizes {
2886 let props = WriterProperties::builder()
2887 .set_writer_version(WriterVersion::PARQUET_2_0)
2888 .set_max_row_group_size(row_group_size)
2889 .set_dictionary_enabled(false)
2890 .set_encoding(*encoding)
2891 .set_data_page_size_limit(data_page_size_limit)
2892 .set_write_batch_size(write_batch_size)
2893 .build();
2894
2895 roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
2896 let string_array_a = StringArray::from(a.clone());
2897 let string_array_b = StringArray::from(b.clone());
2898 let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
2899 let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
2900 assert_eq!(
2901 vec_a, vec_b,
2902 "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
2903 );
2904 });
2905 }
2906 }
2907 }
2908
2909 #[test]
2910 fn arrow_writer_string_dictionary() {
2911 #[allow(deprecated)]
2913 let schema = Arc::new(Schema::new(vec![Field::new_dict(
2914 "dictionary",
2915 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2916 true,
2917 42,
2918 true,
2919 )]));
2920
2921 let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
2923 .iter()
2924 .copied()
2925 .collect();
2926
2927 one_column_roundtrip_with_schema(Arc::new(d), schema);
2929 }
2930
2931 #[test]
2932 fn arrow_writer_primitive_dictionary() {
2933 #[allow(deprecated)]
2935 let schema = Arc::new(Schema::new(vec![Field::new_dict(
2936 "dictionary",
2937 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
2938 true,
2939 42,
2940 true,
2941 )]));
2942
2943 let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
2945 builder.append(12345678).unwrap();
2946 builder.append_null();
2947 builder.append(22345678).unwrap();
2948 builder.append(12345678).unwrap();
2949 let d = builder.finish();
2950
2951 one_column_roundtrip_with_schema(Arc::new(d), schema);
2952 }
2953
2954 #[test]
2955 fn arrow_writer_decimal128_dictionary() {
2956 let integers = vec![12345, 56789, 34567];
2957
2958 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
2959
2960 let values = Decimal128Array::from(integers.clone())
2961 .with_precision_and_scale(5, 2)
2962 .unwrap();
2963
2964 let array = DictionaryArray::new(keys, Arc::new(values));
2965 one_column_roundtrip(Arc::new(array.clone()), true);
2966
2967 let values = Decimal128Array::from(integers)
2968 .with_precision_and_scale(12, 2)
2969 .unwrap();
2970
2971 let array = array.with_values(Arc::new(values));
2972 one_column_roundtrip(Arc::new(array), true);
2973 }
2974
2975 #[test]
2976 fn arrow_writer_decimal256_dictionary() {
2977 let integers = vec![
2978 i256::from_i128(12345),
2979 i256::from_i128(56789),
2980 i256::from_i128(34567),
2981 ];
2982
2983 let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
2984
2985 let values = Decimal256Array::from(integers.clone())
2986 .with_precision_and_scale(5, 2)
2987 .unwrap();
2988
2989 let array = DictionaryArray::new(keys, Arc::new(values));
2990 one_column_roundtrip(Arc::new(array.clone()), true);
2991
2992 let values = Decimal256Array::from(integers)
2993 .with_precision_and_scale(12, 2)
2994 .unwrap();
2995
2996 let array = array.with_values(Arc::new(values));
2997 one_column_roundtrip(Arc::new(array), true);
2998 }
2999
3000 #[test]
3001 fn arrow_writer_string_dictionary_unsigned_index() {
3002 #[allow(deprecated)]
3004 let schema = Arc::new(Schema::new(vec![Field::new_dict(
3005 "dictionary",
3006 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3007 true,
3008 42,
3009 true,
3010 )]));
3011
3012 let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
3014 .iter()
3015 .copied()
3016 .collect();
3017
3018 one_column_roundtrip_with_schema(Arc::new(d), schema);
3019 }
3020
3021 #[test]
3022 fn u32_min_max() {
3023 let src = [
3025 u32::MIN,
3026 u32::MIN + 1,
3027 (i32::MAX as u32) - 1,
3028 i32::MAX as u32,
3029 (i32::MAX as u32) + 1,
3030 u32::MAX - 1,
3031 u32::MAX,
3032 ];
3033 let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
3034 let files = one_column_roundtrip(values, false);
3035
3036 for file in files {
3037 let reader = SerializedFileReader::new(file).unwrap();
3039 let metadata = reader.metadata();
3040
3041 let mut row_offset = 0;
3042 for row_group in metadata.row_groups() {
3043 assert_eq!(row_group.num_columns(), 1);
3044 let column = row_group.column(0);
3045
3046 let num_values = column.num_values() as usize;
3047 let src_slice = &src[row_offset..row_offset + num_values];
3048 row_offset += column.num_values() as usize;
3049
3050 let stats = column.statistics().unwrap();
3051 if let Statistics::Int32(stats) = stats {
3052 assert_eq!(
3053 *stats.min_opt().unwrap() as u32,
3054 *src_slice.iter().min().unwrap()
3055 );
3056 assert_eq!(
3057 *stats.max_opt().unwrap() as u32,
3058 *src_slice.iter().max().unwrap()
3059 );
3060 } else {
3061 panic!("Statistics::Int32 missing")
3062 }
3063 }
3064 }
3065 }
3066
3067 #[test]
3068 fn u64_min_max() {
3069 let src = [
3071 u64::MIN,
3072 u64::MIN + 1,
3073 (i64::MAX as u64) - 1,
3074 i64::MAX as u64,
3075 (i64::MAX as u64) + 1,
3076 u64::MAX - 1,
3077 u64::MAX,
3078 ];
3079 let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
3080 let files = one_column_roundtrip(values, false);
3081
3082 for file in files {
3083 let reader = SerializedFileReader::new(file).unwrap();
3085 let metadata = reader.metadata();
3086
3087 let mut row_offset = 0;
3088 for row_group in metadata.row_groups() {
3089 assert_eq!(row_group.num_columns(), 1);
3090 let column = row_group.column(0);
3091
3092 let num_values = column.num_values() as usize;
3093 let src_slice = &src[row_offset..row_offset + num_values];
3094 row_offset += column.num_values() as usize;
3095
3096 let stats = column.statistics().unwrap();
3097 if let Statistics::Int64(stats) = stats {
3098 assert_eq!(
3099 *stats.min_opt().unwrap() as u64,
3100 *src_slice.iter().min().unwrap()
3101 );
3102 assert_eq!(
3103 *stats.max_opt().unwrap() as u64,
3104 *src_slice.iter().max().unwrap()
3105 );
3106 } else {
3107 panic!("Statistics::Int64 missing")
3108 }
3109 }
3110 }
3111 }
3112
3113 #[test]
3114 fn statistics_null_counts_only_nulls() {
3115 let values = Arc::new(UInt64Array::from(vec![None, None]));
3117 let files = one_column_roundtrip(values, true);
3118
3119 for file in files {
3120 let reader = SerializedFileReader::new(file).unwrap();
3122 let metadata = reader.metadata();
3123 assert_eq!(metadata.num_row_groups(), 1);
3124 let row_group = metadata.row_group(0);
3125 assert_eq!(row_group.num_columns(), 1);
3126 let column = row_group.column(0);
3127 let stats = column.statistics().unwrap();
3128 assert_eq!(stats.null_count_opt(), Some(2));
3129 }
3130 }
3131
3132 #[test]
3133 fn test_list_of_struct_roundtrip() {
3134 let int_field = Field::new("a", DataType::Int32, true);
3136 let int_field2 = Field::new("b", DataType::Int32, true);
3137
3138 let int_builder = Int32Builder::with_capacity(10);
3139 let int_builder2 = Int32Builder::with_capacity(10);
3140
3141 let struct_builder = StructBuilder::new(
3142 vec![int_field, int_field2],
3143 vec![Box::new(int_builder), Box::new(int_builder2)],
3144 );
3145 let mut list_builder = ListBuilder::new(struct_builder);
3146
3147 let values = list_builder.values();
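3148 // list entries built below, in order:
3149 // [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]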
3152 values
3153 .field_builder::<Int32Builder>(0)
3154 .unwrap()
3155 .append_value(1);
3156 values
3157 .field_builder::<Int32Builder>(1)
3158 .unwrap()
3159 .append_value(2);
3160 values.append(true);
3161 list_builder.append(true);
3162
3163 list_builder.append(true);
3165
3166 list_builder.append(false);
3168
3169 let values = list_builder.values();
3171 values
3172 .field_builder::<Int32Builder>(0)
3173 .unwrap()
3174 .append_null();
3175 values
3176 .field_builder::<Int32Builder>(1)
3177 .unwrap()
3178 .append_null();
3179 values.append(false);
3180 values
3181 .field_builder::<Int32Builder>(0)
3182 .unwrap()
3183 .append_null();
3184 values
3185 .field_builder::<Int32Builder>(1)
3186 .unwrap()
3187 .append_null();
3188 values.append(false);
3189 list_builder.append(true);
3190
3191 let values = list_builder.values();
3193 values
3194 .field_builder::<Int32Builder>(0)
3195 .unwrap()
3196 .append_null();
3197 values
3198 .field_builder::<Int32Builder>(1)
3199 .unwrap()
3200 .append_value(3);
3201 values.append(true);
3202 list_builder.append(true);
3203
3204 let values = list_builder.values();
3206 values
3207 .field_builder::<Int32Builder>(0)
3208 .unwrap()
3209 .append_value(2);
3210 values
3211 .field_builder::<Int32Builder>(1)
3212 .unwrap()
3213 .append_null();
3214 values.append(true);
3215 list_builder.append(true);
3216
3217 let array = Arc::new(list_builder.finish());
3218
3219 one_column_roundtrip(array, true);
3220 }
3221
3222 fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
3223 metadata.row_groups().iter().map(|x| x.num_rows()).collect()
3224 }
3225
3226 #[test]
3227 fn test_aggregates_records() {
3228 let arrays = [
3229 Int32Array::from((0..100).collect::<Vec<_>>()),
3230 Int32Array::from((0..50).collect::<Vec<_>>()),
3231 Int32Array::from((200..500).collect::<Vec<_>>()),
3232 ];
3233
3234 let schema = Arc::new(Schema::new(vec![Field::new(
3235 "int",
3236 ArrowDataType::Int32,
3237 false,
3238 )]));
3239
3240 let file = tempfile::tempfile().unwrap();
3241
3242 let props = WriterProperties::builder()
3243 .set_max_row_group_size(200)
3244 .build();
3245
3246 let mut writer =
3247 ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
3248
3249 for array in arrays {
3250 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
3251 writer.write(&batch).unwrap();
3252 }
3253
3254 writer.close().unwrap();
3255
3256 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3257 assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);
3258
3259 let batches = builder
3260 .with_batch_size(100)
3261 .build()
3262 .unwrap()
3263 .collect::<ArrowResult<Vec<_>>>()
3264 .unwrap();
3265
3266 assert_eq!(batches.len(), 5);
3267 assert!(batches.iter().all(|x| x.num_columns() == 1));
3268
3269 let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3270
3271 assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
3272
3273 let values: Vec<_> = batches
3274 .iter()
3275 .flat_map(|x| {
3276 x.column(0)
3277 .as_any()
3278 .downcast_ref::<Int32Array>()
3279 .unwrap()
3280 .values()
3281 .iter()
3282 .cloned()
3283 })
3284 .collect();
3285
3286 let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
3287 assert_eq!(&values, &expected_values)
3288 }
3289
3290 #[test]
3291 fn complex_aggregate() {
3292 let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
3294 let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
3295 let struct_a = Arc::new(Field::new(
3296 "struct_a",
3297 DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
3298 true,
3299 ));
3300
3301 let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
3302 let struct_b = Arc::new(Field::new(
3303 "struct_b",
3304 DataType::Struct(vec![list_a.clone()].into()),
3305 false,
3306 ));
3307
3308 let schema = Arc::new(Schema::new(vec![struct_b]));
3309
3310 let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
3312 let field_b_array =
3313 Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
3314
3315 let struct_a_array = StructArray::from(vec![
3316 (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
3317 (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
3318 ]);
3319
3320 let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3321 .len(5)
3322 .add_buffer(Buffer::from_iter(vec![
3323 0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
3324 ]))
3325 .null_bit_buffer(Some(Buffer::from_iter(vec![
3326 true, false, true, false, true,
3327 ])))
3328 .child_data(vec![struct_a_array.into_data()])
3329 .build()
3330 .unwrap();
3331
3332 let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3333 let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
3334
3335 let batch1 =
3336 RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3337 .unwrap();
3338
3339 let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
3340 let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
3341
3342 let struct_a_array = StructArray::from(vec![
3343 (field_a, Arc::new(field_a_array) as ArrayRef),
3344 (field_b, Arc::new(field_b_array) as ArrayRef),
3345 ]);
3346
3347 let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
3348 .len(2)
3349 .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
3350 .child_data(vec![struct_a_array.into_data()])
3351 .build()
3352 .unwrap();
3353
3354 let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
3355 let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
3356
3357 let batch2 =
3358 RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
3359 .unwrap();
3360
3361 let batches = &[batch1, batch2];
3362
3363 let expected = r#"
3366 +-------------------------------------------------------------------------------------------------------+
3367 | struct_b |
3368 +-------------------------------------------------------------------------------------------------------+
3369 | {list: [{leaf_a: 1, leaf_b: 1}]} |
3370 | {list: } |
3371 | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]} |
3372 | {list: } |
3373 | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]} |
3374 | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
3375 | {list: [{leaf_a: 10, leaf_b: }]} |
3376 +-------------------------------------------------------------------------------------------------------+
3377 "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
3378
3379 let actual = pretty_format_batches(batches).unwrap().to_string();
3380 assert_eq!(actual, expected);
3381
3382 let file = tempfile::tempfile().unwrap();
3384 let props = WriterProperties::builder()
3385 .set_max_row_group_size(6)
3386 .build();
3387
3388 let mut writer =
3389 ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
3390
3391 for batch in batches {
3392 writer.write(batch).unwrap();
3393 }
3394 writer.close().unwrap();
3395
3396 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
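3397 // batch1 has 5 rows and batch2 has 2; with max_row_group_size = 6 the first row group
3398 // absorbs batch1 plus one row of batch2, leaving one row for the second row group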
3401 assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);
3402
3403 let batches = builder
3404 .with_batch_size(2)
3405 .build()
3406 .unwrap()
3407 .collect::<ArrowResult<Vec<_>>>()
3408 .unwrap();
3409
3410 assert_eq!(batches.len(), 4);
3411 let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
3412 assert_eq!(&batch_counts, &[2, 2, 2, 1]);
3413
3414 let actual = pretty_format_batches(&batches).unwrap().to_string();
3415 assert_eq!(actual, expected);
3416 }
3417
3418 #[test]
3419 fn test_arrow_writer_metadata() {
3420 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3421 let file_schema = batch_schema.clone().with_metadata(
3422 vec![("foo".to_string(), "bar".to_string())]
3423 .into_iter()
3424 .collect(),
3425 );
3426
3427 let batch = RecordBatch::try_new(
3428 Arc::new(batch_schema),
3429 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3430 )
3431 .unwrap();
3432
3433 let mut buf = Vec::with_capacity(1024);
3434 let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
3435 writer.write(&batch).unwrap();
3436 writer.close().unwrap();
3437 }
3438
3439 #[test]
3440 fn test_arrow_writer_nullable() {
3441 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3442 let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
3443 let file_schema = Arc::new(file_schema);
3444
3445 let batch = RecordBatch::try_new(
3446 Arc::new(batch_schema),
3447 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3448 )
3449 .unwrap();
3450
3451 let mut buf = Vec::with_capacity(1024);
3452 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3453 writer.write(&batch).unwrap();
3454 writer.close().unwrap();
3455
3456 let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
3457 let back = read.next().unwrap().unwrap();
3458 assert_eq!(back.schema(), file_schema);
3459 assert_ne!(back.schema(), batch.schema());
3460 assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
3461 }
3462
3463 #[test]
3464 fn in_progress_accounting() {
3465 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3467
3468 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3470
3471 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3473
3474 let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
3475
3476 assert_eq!(writer.in_progress_size(), 0);
3478 assert_eq!(writer.in_progress_rows(), 0);
3479 assert_eq!(writer.memory_size(), 0);
3480 assert_eq!(writer.bytes_written(), 4); // only the 4 byte "PAR1" header has been written
3481 writer.write(&batch).unwrap();
3482
3483 let initial_size = writer.in_progress_size();
3485 assert!(initial_size > 0);
3486 assert_eq!(writer.in_progress_rows(), 5);
3487 let initial_memory = writer.memory_size();
3488 assert!(initial_memory > 0);
3489 assert!(
3491 initial_size <= initial_memory,
3492 "{initial_size} <= {initial_memory}"
3493 );
3494
3495 writer.write(&batch).unwrap();
3497 assert!(writer.in_progress_size() > initial_size);
3498 assert_eq!(writer.in_progress_rows(), 10);
3499 assert!(writer.memory_size() > initial_memory);
3500 assert!(
3501 writer.in_progress_size() <= writer.memory_size(),
3502 "in_progress_size {} <= memory_size {}",
3503 writer.in_progress_size(),
3504 writer.memory_size()
3505 );
3506
3507 let pre_flush_bytes_written = writer.bytes_written();
3509 writer.flush().unwrap();
3510 assert_eq!(writer.in_progress_size(), 0);
3511 assert_eq!(writer.memory_size(), 0);
3512 assert!(writer.bytes_written() > pre_flush_bytes_written);
3513
3514 writer.close().unwrap();
3515 }
3516
3517 #[test]
3518 fn test_writer_all_null() {
3519 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3520 let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
3521 let batch = RecordBatch::try_from_iter(vec![
3522 ("a", Arc::new(a) as ArrayRef),
3523 ("b", Arc::new(b) as ArrayRef),
3524 ])
3525 .unwrap();
3526
3527 let mut buf = Vec::with_capacity(1024);
3528 let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
3529 writer.write(&batch).unwrap();
3530 writer.close().unwrap();
3531
3532 let bytes = Bytes::from(buf);
3533 let options = ReadOptionsBuilder::new().with_page_index().build();
3534 let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
3535 let index = reader.metadata().offset_index().unwrap();
3536
3537 assert_eq!(index.len(), 1);
3538 assert_eq!(index[0].len(), 2); // 2 columns
3539 assert_eq!(index[0][0].page_locations().len(), 1); // 1 page for column "a"
3540 assert_eq!(index[0][1].page_locations().len(), 1); // 1 page for column "b"
3541 }
3542
3543 #[test]
3544 fn test_disabled_statistics_with_page() {
3545 let file_schema = Schema::new(vec![
3546 Field::new("a", DataType::Utf8, true),
3547 Field::new("b", DataType::Utf8, true),
3548 ]);
3549 let file_schema = Arc::new(file_schema);
3550
3551 let batch = RecordBatch::try_new(
3552 file_schema.clone(),
3553 vec![
3554 Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3555 Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3556 ],
3557 )
3558 .unwrap();
3559
3560 let props = WriterProperties::builder()
3561 .set_statistics_enabled(EnabledStatistics::None)
3562 .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
3563 .build();
3564
3565 let mut buf = Vec::with_capacity(1024);
3566 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3567 writer.write(&batch).unwrap();
3568
3569 let metadata = writer.close().unwrap();
3570 assert_eq!(metadata.row_groups.len(), 1);
3571 let row_group = &metadata.row_groups[0];
3572 assert_eq!(row_group.columns.len(), 2);
3573 assert!(row_group.columns[0].offset_index_offset.is_some());
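3574 // "a" has page-level statistics enabled, so a column index is also written for it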
3575 assert!(row_group.columns[0].column_index_offset.is_some());
3576 assert!(row_group.columns[1].offset_index_offset.is_some());
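3577 // "b" has statistics disabled, so no column index is written for it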
3578 assert!(row_group.columns[1].column_index_offset.is_none());
3579
3580 let options = ReadOptionsBuilder::new().with_page_index().build();
3581 let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3582
3583 let row_group = reader.get_row_group(0).unwrap();
3584 let a_col = row_group.metadata().column(0);
3585 let b_col = row_group.metadata().column(1);
3586
3587 if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3589 let min = byte_array_stats.min_opt().unwrap();
3590 let max = byte_array_stats.max_opt().unwrap();
3591
3592 assert_eq!(min.as_bytes(), b"a");
3593 assert_eq!(max.as_bytes(), b"d");
3594 } else {
3595 panic!("expecting Statistics::ByteArray");
3596 }
3597
3598 assert!(b_col.statistics().is_none());
3600
3601 let offset_index = reader.metadata().offset_index().unwrap();
3602 assert_eq!(offset_index.len(), 1); // 1 row group
3603 assert_eq!(offset_index[0].len(), 2); // 2 columns
3604
3605 let column_index = reader.metadata().column_index().unwrap();
3606 assert_eq!(column_index.len(), 1); // 1 row group
3607 assert_eq!(column_index[0].len(), 2); // 2 columns
3608
3609 let a_idx = &column_index[0][0];
3610 assert!(matches!(a_idx, Index::BYTE_ARRAY(_)), "{a_idx:?}");
3611 let b_idx = &column_index[0][1];
3612 assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3613 }
3614
3615 #[test]
3616 fn test_disabled_statistics_with_chunk() {
3617 let file_schema = Schema::new(vec![
3618 Field::new("a", DataType::Utf8, true),
3619 Field::new("b", DataType::Utf8, true),
3620 ]);
3621 let file_schema = Arc::new(file_schema);
3622
3623 let batch = RecordBatch::try_new(
3624 file_schema.clone(),
3625 vec![
3626 Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3627 Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3628 ],
3629 )
3630 .unwrap();
3631
3632 let props = WriterProperties::builder()
3633 .set_statistics_enabled(EnabledStatistics::None)
3634 .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
3635 .build();
3636
3637 let mut buf = Vec::with_capacity(1024);
3638 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3639 writer.write(&batch).unwrap();
3640
3641 let metadata = writer.close().unwrap();
3642 assert_eq!(metadata.row_groups.len(), 1);
3643 let row_group = &metadata.row_groups[0];
3644 assert_eq!(row_group.columns.len(), 2);
3645 assert!(row_group.columns[0].offset_index_offset.is_some());
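3646 // chunk-level statistics never produce a page-level column index, so neither column has one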
3647 assert!(row_group.columns[0].column_index_offset.is_none());
3648 assert!(row_group.columns[1].offset_index_offset.is_some());
3650 assert!(row_group.columns[1].column_index_offset.is_none());
3651
3652 let options = ReadOptionsBuilder::new().with_page_index().build();
3653 let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3654
3655 let row_group = reader.get_row_group(0).unwrap();
3656 let a_col = row_group.metadata().column(0);
3657 let b_col = row_group.metadata().column(1);
3658
3659 if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3661 let min = byte_array_stats.min_opt().unwrap();
3662 let max = byte_array_stats.max_opt().unwrap();
3663
3664 assert_eq!(min.as_bytes(), b"a");
3665 assert_eq!(max.as_bytes(), b"d");
3666 } else {
3667 panic!("expecting Statistics::ByteArray");
3668 }
3669
3670 assert!(b_col.statistics().is_none());
3672
3673 let column_index = reader.metadata().column_index().unwrap();
3674 assert_eq!(column_index.len(), 1); // 1 row group
3675 assert_eq!(column_index[0].len(), 2); // 2 columns
3676
3677 let a_idx = &column_index[0][0];
3678 assert!(matches!(a_idx, Index::NONE), "{a_idx:?}");
3679 let b_idx = &column_index[0][1];
3680 assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3681 }
3682
3683 #[test]
3684 fn test_arrow_writer_skip_metadata() {
3685 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3686 let file_schema = Arc::new(batch_schema.clone());
3687
3688 let batch = RecordBatch::try_new(
3689 Arc::new(batch_schema),
3690 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3691 )
3692 .unwrap();
3693 let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
3694
3695 let mut buf = Vec::with_capacity(1024);
3696 let mut writer =
3697 ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
3698 writer.write(&batch).unwrap();
3699 writer.close().unwrap();
3700
3701 let bytes = Bytes::from(buf);
3702 let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
3703 assert_eq!(file_schema, *reader_builder.schema());
3704 if let Some(key_value_metadata) = reader_builder
3705 .metadata()
3706 .file_metadata()
3707 .key_value_metadata()
3708 {
3709 assert!(!key_value_metadata
3710 .iter()
3711 .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY));
3712 }
3713 }
3714
3715 #[test]
3716 fn mismatched_schemas() {
3717 let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
3718 let file_schema = Arc::new(Schema::new(vec![Field::new(
3719 "temperature",
3720 DataType::Float64,
3721 false,
3722 )]));
3723
3724 let batch = RecordBatch::try_new(
3725 Arc::new(batch_schema),
3726 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3727 )
3728 .unwrap();
3729
3730 let mut buf = Vec::with_capacity(1024);
3731 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3732
3733 let err = writer.write(&batch).unwrap_err().to_string();
3734 assert_eq!(
3735 err,
3736 "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
3737 );
3738 }
3739
3740 #[test]
3741 fn test_roundtrip_empty_schema() {
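3742 // a schema with no columns still round-trips, carrying only an explicit row count of zero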
3743 let empty_batch = RecordBatch::try_new_with_options(
3745 Arc::new(Schema::empty()),
3746 vec![],
3747 &RecordBatchOptions::default().with_row_count(Some(0)),
3748 )
3749 .unwrap();
3750
3751 let mut parquet_bytes: Vec<u8> = Vec::new();
3753 let mut writer =
3754 ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
3755 writer.write(&empty_batch).unwrap();
3756 writer.close().unwrap();
3757
3758 let bytes = Bytes::from(parquet_bytes);
3760 let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
3761 assert_eq!(reader.schema(), &empty_batch.schema());
3762 let batches: Vec<_> = reader
3763 .build()
3764 .unwrap()
3765 .collect::<ArrowResult<Vec<_>>>()
3766 .unwrap();
3767 assert_eq!(batches.len(), 0);
3768 }
3769}