1use bytes::Bytes;
21use std::io::{Read, Write};
22use std::iter::Peekable;
23use std::slice::Iter;
24use std::sync::{Arc, Mutex};
25use std::vec::IntoIter;
26use thrift::protocol::TCompactOutputProtocol;
27
28use arrow_array::cast::AsArray;
29use arrow_array::types::*;
30use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter};
31use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef};
32
33use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
34
35use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
36use crate::arrow::ArrowSchemaConverter;
37use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
38use crate::column::page_encryption::PageEncryptor;
39use crate::column::writer::encoder::ColumnValueEncoder;
40use crate::column::writer::{
41 get_column_writer, ColumnCloseResult, ColumnWriter, GenericColumnWriter,
42};
43use crate::data_type::{ByteArray, FixedLenByteArray};
44#[cfg(feature = "encryption")]
45use crate::encryption::encrypt::FileEncryptor;
46use crate::errors::{ParquetError, Result};
47use crate::file::metadata::{KeyValue, RowGroupMetaData};
48use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
49use crate::file::reader::{ChunkReader, Length};
50use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
51use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
52use crate::thrift::TSerializable;
53use levels::{calculate_array_levels, ArrayLevels};
54
55mod byte_array;
56mod levels;
57
/// Encodes [`RecordBatch`]es to a Parquet file written to `W`.
///
/// Batches are buffered into an in-progress row group, which is flushed to
/// the underlying writer once `max_row_group_size` rows have accumulated
/// (or when `flush` is called explicitly).
pub struct ArrowWriter<W: Write> {
    // Underlying Parquet file writer that serializes closed row groups
    writer: SerializedFileWriter<W>,

    // The in-progress row group, if any rows are currently buffered
    in_progress: Option<ArrowRowGroupWriter>,

    // The Arrow schema of the batches being written
    arrow_schema: SchemaRef,

    // Creates a fresh `ArrowRowGroupWriter` each time a row group is started
    row_group_writer_factory: ArrowRowGroupWriterFactory,

    // Maximum number of rows to buffer before flushing a row group
    max_row_group_size: usize,
}
149
impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
    // Reports buffered sizes/rows alongside the inner writer; the writers of
    // the in-progress row group themselves are intentionally not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let buffered_memory = self.in_progress_size();
        f.debug_struct("ArrowWriter")
            .field("writer", &self.writer)
            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
            .field("in_progress_rows", &self.in_progress_rows())
            .field("arrow_schema", &self.arrow_schema)
            .field("max_row_group_size", &self.max_row_group_size)
            .finish()
    }
}
162
impl<W: Write + Send> ArrowWriter<W> {
    /// Try to create a new Arrow writer writing to `writer`.
    ///
    /// If `props` is `None`, default [`WriterProperties`] are used.
    ///
    /// # Errors
    ///
    /// Returns an error if the Arrow schema cannot be converted to a Parquet
    /// schema, or the underlying file writer cannot be created.
    pub fn try_new(
        writer: W,
        arrow_schema: SchemaRef,
        props: Option<WriterProperties>,
    ) -> Result<Self> {
        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
        Self::try_new_with_options(writer, arrow_schema, options)
    }

    /// Try to create a new Arrow writer with the provided [`ArrowWriterOptions`].
    pub fn try_new_with_options(
        writer: W,
        arrow_schema: SchemaRef,
        options: ArrowWriterOptions,
    ) -> Result<Self> {
        let mut props = options.properties;
        let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
        if let Some(schema_root) = &options.schema_root {
            converter = converter.schema_root(schema_root);
        }
        let schema = converter.convert(&arrow_schema)?;
        if !options.skip_arrow_metadata {
            // Embed the serialized Arrow schema in the file metadata so
            // readers can reconstruct the original Arrow types
            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
        }

        let max_row_group_size = props.max_row_group_size();

        let file_writer =
            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::new(props))?;

        let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&file_writer);

        Ok(Self {
            writer: file_writer,
            in_progress: None,
            arrow_schema,
            row_group_writer_factory,
            max_row_group_size,
        })
    }

    /// Returns metadata for any flushed row groups.
    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        self.writer.flushed_row_groups()
    }

    /// Estimated memory usage, in bytes, of this `ArrowWriter`'s in-progress
    /// row group (0 if none).
    pub fn memory_size(&self) -> usize {
        match &self.in_progress {
            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
            None => 0,
        }
    }

    /// Anticipated encoded size of the in-progress row group (0 if none).
    pub fn in_progress_size(&self) -> usize {
        match &self.in_progress {
            Some(in_progress) => in_progress
                .writers
                .iter()
                .map(|x| x.get_estimated_total_bytes())
                .sum(),
            None => 0,
        }
    }

    /// Returns the number of rows buffered in the in-progress row group.
    pub fn in_progress_rows(&self) -> usize {
        self.in_progress
            .as_ref()
            .map(|x| x.buffered_rows)
            .unwrap_or_default()
    }

    /// Returns the number of bytes written by this instance so far.
    pub fn bytes_written(&self) -> usize {
        self.writer.bytes_written()
    }

    /// Encodes the provided [`RecordBatch`].
    ///
    /// Rows are buffered until `max_row_group_size` is reached, at which
    /// point the row group is flushed. A batch that would overflow the limit
    /// is split and written in two recursive calls.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        let in_progress = match &mut self.in_progress {
            Some(in_progress) => in_progress,
            // Lazily start a new row group on first write after a flush
            x => x.insert(self.row_group_writer_factory.create_row_group_writer(
                self.writer.schema_descr(),
                self.writer.properties(),
                &self.arrow_schema,
                self.writer.flushed_row_groups().len(),
            )?),
        };

        // If this batch would overflow the row group limit, split it so the
        // first part exactly fills the current row group
        if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size {
            let to_write = self.max_row_group_size - in_progress.buffered_rows;
            let a = batch.slice(0, to_write);
            let b = batch.slice(to_write, batch.num_rows() - to_write);
            self.write(&a)?;
            return self.write(&b);
        }

        in_progress.write(batch)?;

        if in_progress.buffered_rows >= self.max_row_group_size {
            self.flush()?
        }
        Ok(())
    }

    /// Flushes all buffered rows into a new row group. A no-op if no rows
    /// are buffered.
    pub fn flush(&mut self) -> Result<()> {
        let in_progress = match self.in_progress.take() {
            Some(in_progress) => in_progress,
            None => return Ok(()),
        };

        let mut row_group_writer = self.writer.next_row_group()?;
        for chunk in in_progress.close()? {
            chunk.append_to_row_group(&mut row_group_writer)?;
        }
        row_group_writer.close()?;
        Ok(())
    }

    /// Additional [`KeyValue`] metadata to be written to the file footer.
    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
        self.writer.append_key_value_metadata(kv_metadata)
    }

    /// Returns a reference to the underlying writer.
    pub fn inner(&self) -> &W {
        self.writer.inner()
    }

    /// Returns a mutable reference to the underlying writer.
    ///
    /// NOTE(review): writing directly to the inner writer will likely corrupt
    /// the Parquet output — intended for flushing/ancillary use only.
    pub fn inner_mut(&mut self) -> &mut W {
        self.writer.inner_mut()
    }

    /// Flushes any outstanding data and returns the underlying writer.
    pub fn into_inner(mut self) -> Result<W> {
        self.flush()?;
        self.writer.into_inner()
    }

    /// Flushes remaining data and writes the file footer, returning the
    /// file metadata. The inner writer is left available (e.g. for
    /// [`Self::bytes_written`]), but no further data may be written.
    pub fn finish(&mut self) -> Result<crate::format::FileMetaData> {
        self.flush()?;
        self.writer.finish()
    }

    /// Consumes the writer, flushing remaining data and writing the footer.
    pub fn close(mut self) -> Result<crate::format::FileMetaData> {
        self.finish()
    }
}
356
357impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
358 fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
359 self.write(batch).map_err(|e| e.into())
360 }
361
362 fn close(self) -> std::result::Result<(), ArrowError> {
363 self.close()?;
364 Ok(())
365 }
366}
367
/// Configuration options for [`ArrowWriter`], beyond the standard
/// [`WriterProperties`].
#[derive(Debug, Clone, Default)]
pub struct ArrowWriterOptions {
    // Standard Parquet writer properties
    properties: WriterProperties,
    // If true, do not embed the encoded Arrow schema in the file metadata
    skip_arrow_metadata: bool,
    // Optional override for the name of the Parquet root schema element
    schema_root: Option<String>,
}
377
378impl ArrowWriterOptions {
379 pub fn new() -> Self {
381 Self::default()
382 }
383
384 pub fn with_properties(self, properties: WriterProperties) -> Self {
386 Self { properties, ..self }
387 }
388
389 pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
396 Self {
397 skip_arrow_metadata,
398 ..self
399 }
400 }
401
402 pub fn with_schema_root(self, schema_root: String) -> Self {
404 Self {
405 schema_root: Some(schema_root),
406 ..self
407 }
408 }
409}
410
/// An in-memory, encoded column chunk: a list of page buffers plus the
/// total byte length across all of them.
#[derive(Default)]
struct ArrowColumnChunkData {
    // Sum of the lengths of all buffers in `data`
    length: usize,
    // Encoded page header and page data buffers, in write order
    data: Vec<Bytes>,
}
417
418impl Length for ArrowColumnChunkData {
419 fn len(&self) -> u64 {
420 self.length as _
421 }
422}
423
424impl ChunkReader for ArrowColumnChunkData {
425 type T = ArrowColumnChunkReader;
426
427 fn get_read(&self, start: u64) -> Result<Self::T> {
428 assert_eq!(start, 0); Ok(ArrowColumnChunkReader(
430 self.data.clone().into_iter().peekable(),
431 ))
432 }
433
434 fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
435 unimplemented!()
436 }
437}
438
439struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
441
impl Read for ArrowColumnChunkReader {
    // Reads from the current buffer, advancing to the next one once a
    // buffer is exhausted. Returns Ok(0) only when all buffers are consumed.
    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
        // Find the next non-empty buffer, skipping over drained ones
        let buffer = loop {
            match self.0.peek_mut() {
                Some(b) if b.is_empty() => {
                    self.0.next();
                    continue;
                }
                Some(b) => break b,
                None => return Ok(0),
            }
        };

        // Copy as much of the current buffer as fits into `out`; `split_to`
        // advances the buffer so the remainder is returned on the next call
        let len = buffer.len().min(out.len());
        let b = buffer.split_to(len);
        out[..len].copy_from_slice(&b);
        Ok(len)
    }
}
461
462type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
467
/// A [`PageWriter`] that appends encoded pages to a shared in-memory buffer.
#[derive(Default)]
struct ArrowPageWriter {
    // Destination buffer shared with the owning `ArrowColumnWriter`
    buffer: SharedColumnChunk,
    // Optional encryptor applied to page data and headers before buffering
    #[cfg(feature = "encryption")]
    page_encryptor: Option<PageEncryptor>,
}
474
impl ArrowPageWriter {
    /// Sets the optional [`PageEncryptor`] used for this column's pages.
    #[cfg(feature = "encryption")]
    pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
        self.page_encryptor = page_encryptor;
        self
    }

    // Accessor for the page encryptor; the two cfg variants below keep
    // `write_page` free of feature-gated branches.
    #[cfg(feature = "encryption")]
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        self.page_encryptor.as_mut()
    }

    // Without the "encryption" feature there is never an encryptor.
    #[cfg(not(feature = "encryption"))]
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        None
    }
}
492
impl PageWriter for ArrowPageWriter {
    // Serializes (and optionally encrypts) the page header, then appends the
    // header and page data to the shared chunk buffer, returning the spec
    // describing where the page was "written".
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
        // Encrypt the page body first, if an encryptor is configured
        let page = match self.page_encryptor_mut() {
            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
            None => page,
        };

        let page_header = page.to_thrift_header();
        let header = {
            let mut header = Vec::with_capacity(1024);

            match self.page_encryptor_mut() {
                Some(page_encryptor) => {
                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
                    // Data pages advance the encryptor's page ordinal
                    if page.compressed_page().is_data_page() {
                        page_encryptor.increment_page();
                    }
                }
                None => {
                    // Thrift compact-encode the header into the buffer
                    let mut protocol = TCompactOutputProtocol::new(&mut header);
                    page_header.write_to_out_protocol(&mut protocol)?;
                }
            };

            Bytes::from(header)
        };

        // This writer is the only lock holder while a column is being
        // written, so try_lock is expected to always succeed here
        let mut buf = self.buffer.try_lock().unwrap();

        let data = page.compressed_page().buffer().clone();
        let compressed_size = data.len() + header.len();

        // Record the page's position/size within the in-memory chunk;
        // header bytes count towards both compressed and uncompressed sizes
        let mut spec = PageWriteSpec::new();
        spec.page_type = page.page_type();
        spec.num_values = page.num_values();
        spec.uncompressed_size = page.uncompressed_size() + header.len();
        spec.offset = buf.length as u64;
        spec.compressed_size = compressed_size;
        spec.bytes_written = compressed_size as u64;

        buf.length += compressed_size;
        buf.data.push(header);
        buf.data.push(data);

        Ok(spec)
    }

    // Nothing to finalize: data lives in the shared buffer
    fn close(&mut self) -> Result<()> {
        Ok(())
    }
}
544
/// The data for a single leaf (primitive) column, together with its computed
/// definition and repetition levels.
#[derive(Debug)]
pub struct ArrowLeafColumn(ArrayLevels);
548
549pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
551 let levels = calculate_array_levels(array, field)?;
552 Ok(levels.into_iter().map(ArrowLeafColumn).collect())
553}
554
/// The encoded data for a closed column chunk, ready to be appended to a
/// row group.
pub struct ArrowColumnChunk {
    // Buffered page bytes for the chunk
    data: ArrowColumnChunkData,
    // Metadata produced when the column writer was closed
    close: ColumnCloseResult,
}
560
impl std::fmt::Debug for ArrowColumnChunk {
    // Only the buffered length is shown; the raw page bytes are elided
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowColumnChunk")
            .field("length", &self.data.length)
            .finish_non_exhaustive()
    }
}
568
impl ArrowColumnChunk {
    /// Appends this chunk's data and metadata as the next column in `writer`.
    pub fn append_to_row_group<W: Write + Send>(
        self,
        writer: &mut SerializedRowGroupWriter<'_, W>,
    ) -> Result<()> {
        writer.append_column(&self.data, self.close)
    }
}
578
/// Encoder for a single Parquet leaf column fed from Arrow data.
pub struct ArrowColumnWriter {
    // The underlying typed column writer
    writer: ArrowColumnWriterImpl,
    // Shared buffer the writer's pages accumulate into; drained on `close`
    chunk: SharedColumnChunk,
}
676
impl std::fmt::Debug for ArrowColumnWriter {
    // Internals are not useful to print; emit an opaque marker only
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
    }
}
682
/// The two kinds of column writer: a dedicated byte-array encoder for
/// string/binary data, or the generic typed writer for everything else.
enum ArrowColumnWriterImpl {
    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
    Column(ColumnWriter<'static>),
}
687
688impl ArrowColumnWriter {
689 pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
691 match &mut self.writer {
692 ArrowColumnWriterImpl::Column(c) => {
693 write_leaf(c, &col.0)?;
694 }
695 ArrowColumnWriterImpl::ByteArray(c) => {
696 write_primitive(c, col.0.array().as_ref(), &col.0)?;
697 }
698 }
699 Ok(())
700 }
701
702 pub fn close(self) -> Result<ArrowColumnChunk> {
704 let close = match self.writer {
705 ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
706 ArrowColumnWriterImpl::Column(c) => c.close()?,
707 };
708 let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
709 let data = chunk.into_inner().unwrap();
710 Ok(ArrowColumnChunk { data, close })
711 }
712
713 pub fn memory_size(&self) -> usize {
724 match &self.writer {
725 ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
726 ArrowColumnWriterImpl::Column(c) => c.memory_size(),
727 }
728 }
729
730 pub fn get_estimated_total_bytes(&self) -> usize {
738 match &self.writer {
739 ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
740 ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
741 }
742 }
743}
744
/// Buffers the column writers for one in-progress row group.
struct ArrowRowGroupWriter {
    // One writer per Parquet leaf column, in schema order
    writers: Vec<ArrowColumnWriter>,
    // The Arrow schema the incoming batches conform to
    schema: SchemaRef,
    // Number of rows written into this row group so far
    buffered_rows: usize,
}
751
752impl ArrowRowGroupWriter {
753 fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
754 Self {
755 writers,
756 schema: arrow.clone(),
757 buffered_rows: 0,
758 }
759 }
760
761 fn write(&mut self, batch: &RecordBatch) -> Result<()> {
762 self.buffered_rows += batch.num_rows();
763 let mut writers = self.writers.iter_mut();
764 for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
765 for leaf in compute_leaves(field.as_ref(), column)? {
766 writers.next().unwrap().write(&leaf)?
767 }
768 }
769 Ok(())
770 }
771
772 fn close(self) -> Result<Vec<ArrowColumnChunk>> {
773 self.writers
774 .into_iter()
775 .map(|writer| writer.close())
776 .collect()
777 }
778}
779
/// Creates an [`ArrowRowGroupWriter`] for each new row group; carries the
/// file-level encryptor when the "encryption" feature is enabled.
struct ArrowRowGroupWriterFactory {
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
784
impl ArrowRowGroupWriterFactory {
    // With encryption enabled, capture the file writer's encryptor so each
    // row group can encrypt its pages.
    #[cfg(feature = "encryption")]
    fn new<W: Write + Send>(file_writer: &SerializedFileWriter<W>) -> Self {
        Self {
            file_encryptor: file_writer.file_encryptor(),
        }
    }

    // Without encryption the factory carries no state.
    #[cfg(not(feature = "encryption"))]
    fn new<W: Write + Send>(_file_writer: &SerializedFileWriter<W>) -> Self {
        Self {}
    }

    // Builds the column writers for a new row group, wiring in the page
    // encryptor for the given row group index.
    #[cfg(feature = "encryption")]
    fn create_row_group_writer(
        &self,
        parquet: &SchemaDescriptor,
        props: &WriterPropertiesPtr,
        arrow: &SchemaRef,
        row_group_index: usize,
    ) -> Result<ArrowRowGroupWriter> {
        let writers = get_column_writers_with_encryptor(
            parquet,
            props,
            arrow,
            self.file_encryptor.clone(),
            row_group_index,
        )?;
        Ok(ArrowRowGroupWriter::new(writers, arrow))
    }

    // Non-encrypted variant: the row group index is irrelevant.
    #[cfg(not(feature = "encryption"))]
    fn create_row_group_writer(
        &self,
        parquet: &SchemaDescriptor,
        props: &WriterPropertiesPtr,
        arrow: &SchemaRef,
        _row_group_index: usize,
    ) -> Result<ArrowRowGroupWriter> {
        let writers = get_column_writers(parquet, props, arrow)?;
        Ok(ArrowRowGroupWriter::new(writers, arrow))
    }
}
828
829pub fn get_column_writers(
831 parquet: &SchemaDescriptor,
832 props: &WriterPropertiesPtr,
833 arrow: &SchemaRef,
834) -> Result<Vec<ArrowColumnWriter>> {
835 let mut writers = Vec::with_capacity(arrow.fields.len());
836 let mut leaves = parquet.columns().iter();
837 let column_factory = ArrowColumnWriterFactory::new();
838 for field in &arrow.fields {
839 column_factory.get_arrow_column_writer(
840 field.data_type(),
841 props,
842 &mut leaves,
843 &mut writers,
844 )?;
845 }
846 Ok(writers)
847}
848
849#[cfg(feature = "encryption")]
851fn get_column_writers_with_encryptor(
852 parquet: &SchemaDescriptor,
853 props: &WriterPropertiesPtr,
854 arrow: &SchemaRef,
855 file_encryptor: Option<Arc<FileEncryptor>>,
856 row_group_index: usize,
857) -> Result<Vec<ArrowColumnWriter>> {
858 let mut writers = Vec::with_capacity(arrow.fields.len());
859 let mut leaves = parquet.columns().iter();
860 let column_factory =
861 ArrowColumnWriterFactory::new().with_file_encryptor(row_group_index, file_encryptor);
862 for field in &arrow.fields {
863 column_factory.get_arrow_column_writer(
864 field.data_type(),
865 props,
866 &mut leaves,
867 &mut writers,
868 )?;
869 }
870 Ok(writers)
871}
872
/// Builds [`ArrowColumnWriter`]s for leaf columns; when the "encryption"
/// feature is enabled it also carries the context needed to create per-column
/// page encryptors.
struct ArrowColumnWriterFactory {
    #[cfg(feature = "encryption")]
    row_group_index: usize,
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
880
impl ArrowColumnWriterFactory {
    /// Creates a factory with no encryption context.
    pub fn new() -> Self {
        Self {
            #[cfg(feature = "encryption")]
            row_group_index: 0,
            #[cfg(feature = "encryption")]
            file_encryptor: None,
        }
    }

    /// Sets the row group index and file encryptor used to build per-column
    /// page encryptors.
    #[cfg(feature = "encryption")]
    pub fn with_file_encryptor(
        mut self,
        row_group_index: usize,
        file_encryptor: Option<Arc<FileEncryptor>>,
    ) -> Self {
        self.row_group_index = row_group_index;
        self.file_encryptor = file_encryptor;
        self
    }

    // Creates a page writer for the given column, attaching a page encryptor
    // when this column is configured for encryption.
    #[cfg(feature = "encryption")]
    fn create_page_writer(
        &self,
        column_descriptor: &ColumnDescPtr,
        column_index: usize,
    ) -> Result<Box<ArrowPageWriter>> {
        let column_path = column_descriptor.path().string();
        let page_encryptor = PageEncryptor::create_if_column_encrypted(
            &self.file_encryptor,
            self.row_group_index,
            column_index,
            &column_path,
        )?;
        Ok(Box::new(
            ArrowPageWriter::default().with_encryptor(page_encryptor),
        ))
    }

    // Non-encrypted variant: a plain in-memory page writer.
    #[cfg(not(feature = "encryption"))]
    fn create_page_writer(
        &self,
        _column_descriptor: &ColumnDescPtr,
        _column_index: usize,
    ) -> Result<Box<ArrowPageWriter>> {
        Ok(Box::<ArrowPageWriter>::default())
    }

    /// Appends the [`ArrowColumnWriter`](s) for `data_type` to `out`,
    /// consuming one leaf descriptor from `leaves` per writer created.
    /// Nested types recurse, so descriptor consumption order must match the
    /// Parquet schema's leaf order.
    fn get_arrow_column_writer(
        &self,
        data_type: &ArrowDataType,
        props: &WriterPropertiesPtr,
        leaves: &mut Iter<'_, ColumnDescPtr>,
        out: &mut Vec<ArrowColumnWriter>,
    ) -> Result<()> {
        // Generic typed writer, sharing its page buffer with the returned chunk
        let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
            let page_writer = self.create_page_writer(desc, out.len())?;
            let chunk = page_writer.buffer.clone();
            let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
            Ok(ArrowColumnWriter {
                chunk,
                writer: ArrowColumnWriterImpl::Column(writer),
            })
        };

        // Dedicated byte-array writer for string/binary data
        let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
            let page_writer = self.create_page_writer(desc, out.len())?;
            let chunk = page_writer.buffer.clone();
            let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
            Ok(ArrowColumnWriter {
                chunk,
                writer: ArrowColumnWriterImpl::ByteArray(writer),
            })
        };

        match data_type {
            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => out.push(col(leaves.next().unwrap())?),
            // Variable-length binary/string types use the byte-array encoder
            ArrowDataType::LargeBinary
            | ArrowDataType::Binary
            | ArrowDataType::Utf8
            | ArrowDataType::LargeUtf8
            | ArrowDataType::BinaryView
            | ArrowDataType::Utf8View => {
                out.push(bytes(leaves.next().unwrap())?)
            }
            // Lists add levels but no leaves of their own: recurse into the element
            ArrowDataType::List(f)
            | ArrowDataType::LargeList(f)
            | ArrowDataType::FixedSizeList(f, _) => {
                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
            }
            ArrowDataType::Struct(fields) => {
                for field in fields {
                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
                }
            }
            // A map's entries struct contributes its key and value columns
            ArrowDataType::Map(f, _) => match f.data_type() {
                ArrowDataType::Struct(f) => {
                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
                }
                _ => unreachable!("invalid map type"),
            }
            // Dictionaries are encoded by their value type
            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Binary | ArrowDataType::LargeBinary => {
                    out.push(bytes(leaves.next().unwrap())?)
                }
                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
                    out.push(bytes(leaves.next().unwrap())?)
                }
                _ => {
                    out.push(col(leaves.next().unwrap())?)
                }
            }
            _ => return Err(ParquetError::NYI(
                format!(
                    "Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"
                )
            ))
        }
        Ok(())
    }
}
1005
// Writes the leaf array in `levels` to the appropriate typed column writer,
// casting the Arrow data to the Parquet physical type expected by `writer`.
// Returns the number of values written.
fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
    let column = levels.array().as_ref();
    let indices = levels.non_null_indices();
    match writer {
        ColumnWriter::Int32ColumnWriter(ref mut typed) => {
            match column.data_type() {
                ArrowDataType::Date64 => {
                    // Date64 (ms) -> Date32 (days) -> i32; the two-step cast
                    // performs the ms-to-days conversion
                    let array = arrow_cast::cast(column, &ArrowDataType::Date32)?;
                    let array = arrow_cast::cast(&array, &ArrowDataType::Int32)?;

                    let array = array.as_primitive::<Int32Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt32 => {
                    // Reinterpret the u32 bits as i32 without conversion;
                    // the Parquet logical type conveys unsignedness
                    let values = column.as_primitive::<UInt32Type>().values();
                    let array = values.inner().typed_data::<i32>();
                    write_primitive(typed, array, levels)
                }
                ArrowDataType::Decimal128(_, _) => {
                    // Precision permits storing the decimal in an i32
                    let array = column
                        .as_primitive::<Decimal128Type>()
                        .unary::<_, Int32Type>(|v| v as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal256(_, _) => {
                    // Precision permits storing the decimal in an i32
                    let array = column
                        .as_primitive::<Decimal256Type>()
                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
                    write_primitive(typed, array.values(), levels)
                }
                // Dictionary arrays are materialized to their value type first
                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                    ArrowDataType::Decimal128(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal128Type>()
                            .unary::<_, Int32Type>(|v| v as i32);
                        write_primitive(typed, array.values(), levels)
                    }
                    ArrowDataType::Decimal256(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal256Type>()
                            .unary::<_, Int32Type>(|v| v.as_i128() as i32);
                        write_primitive(typed, array.values(), levels)
                    }
                    _ => {
                        let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
                        let array = array.as_primitive::<Int32Type>();
                        write_primitive(typed, array.values(), levels)
                    }
                },
                // Everything else is cast to i32 via the standard cast kernel
                _ => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
                    let array = array.as_primitive::<Int32Type>();
                    write_primitive(typed, array.values(), levels)
                }
            }
        }
        ColumnWriter::BoolColumnWriter(ref mut typed) => {
            // Booleans are gathered at the non-null indices before writing
            let array = column.as_boolean();
            typed.write_batch(
                get_bool_array_slice(array, indices).as_slice(),
                levels.def_levels(),
                levels.rep_levels(),
            )
        }
        ColumnWriter::Int64ColumnWriter(ref mut typed) => {
            match column.data_type() {
                ArrowDataType::Date64 => {
                    // Date64 is already milliseconds; a plain i64 cast suffices
                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;

                    let array = array.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Int64 => {
                    let array = column.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt64 => {
                    // Reinterpret the u64 bits as i64 without conversion;
                    // the Parquet logical type conveys unsignedness
                    let values = column.as_primitive::<UInt64Type>().values();
                    let array = values.inner().typed_data::<i64>();
                    write_primitive(typed, array, levels)
                }
                ArrowDataType::Decimal128(_, _) => {
                    // Precision permits storing the decimal in an i64
                    let array = column
                        .as_primitive::<Decimal128Type>()
                        .unary::<_, Int64Type>(|v| v as i64);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal256(_, _) => {
                    // Precision permits storing the decimal in an i64
                    let array = column
                        .as_primitive::<Decimal256Type>()
                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
                    write_primitive(typed, array.values(), levels)
                }
                // Dictionary arrays are materialized to their value type first
                ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                    ArrowDataType::Decimal128(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal128Type>()
                            .unary::<_, Int64Type>(|v| v as i64);
                        write_primitive(typed, array.values(), levels)
                    }
                    ArrowDataType::Decimal256(_, _) => {
                        let array = arrow_cast::cast(column, value_type)?;
                        let array = array
                            .as_primitive::<Decimal256Type>()
                            .unary::<_, Int64Type>(|v| v.as_i128() as i64);
                        write_primitive(typed, array.values(), levels)
                    }
                    _ => {
                        let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
                        let array = array.as_primitive::<Int64Type>();
                        write_primitive(typed, array.values(), levels)
                    }
                },
                // Everything else is cast to i64 via the standard cast kernel
                _ => {
                    let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
                    let array = array.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
            }
        }
        ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
            unreachable!("Currently unreachable because data type not supported")
        }
        ColumnWriter::FloatColumnWriter(ref mut typed) => {
            let array = column.as_primitive::<Float32Type>();
            write_primitive(typed, array.values(), levels)
        }
        ColumnWriter::DoubleColumnWriter(ref mut typed) => {
            let array = column.as_primitive::<Float64Type>();
            write_primitive(typed, array.values(), levels)
        }
        ColumnWriter::ByteArrayColumnWriter(_) => {
            unreachable!("should use ByteArrayWriter")
        }
        ColumnWriter::FixedLenByteArrayColumnWriter(ref mut typed) => {
            // Gather non-null values converted to fixed-length byte arrays
            let bytes = match column.data_type() {
                ArrowDataType::Interval(interval_unit) => match interval_unit {
                    IntervalUnit::YearMonth => {
                        let array = column
                            .as_any()
                            .downcast_ref::<arrow_array::IntervalYearMonthArray>()
                            .unwrap();
                        get_interval_ym_array_slice(array, indices)
                    }
                    IntervalUnit::DayTime => {
                        let array = column
                            .as_any()
                            .downcast_ref::<arrow_array::IntervalDayTimeArray>()
                            .unwrap();
                        get_interval_dt_array_slice(array, indices)
                    }
                    _ => {
                        return Err(ParquetError::NYI(
                            format!(
                                "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
                            )
                        ));
                    }
                },
                ArrowDataType::FixedSizeBinary(_) => {
                    let array = column
                        .as_any()
                        .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
                        .unwrap();
                    get_fsb_array_slice(array, indices)
                }
                ArrowDataType::Decimal128(_, _) => {
                    let array = column.as_primitive::<Decimal128Type>();
                    get_decimal_128_array_slice(array, indices)
                }
                ArrowDataType::Decimal256(_, _) => {
                    let array = column
                        .as_any()
                        .downcast_ref::<arrow_array::Decimal256Array>()
                        .unwrap();
                    get_decimal_256_array_slice(array, indices)
                }
                ArrowDataType::Float16 => {
                    let array = column.as_primitive::<Float16Type>();
                    get_float_16_array_slice(array, indices)
                }
                _ => {
                    return Err(ParquetError::NYI(
                        "Attempting to write an Arrow type that is not yet implemented".to_string(),
                    ));
                }
            };
            typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
        }
    }
}
1209
// Writes `values` with the given levels to a generic column writer,
// filtering to the non-null indices computed during level calculation.
// Returns the number of values written.
fn write_primitive<E: ColumnValueEncoder>(
    writer: &mut GenericColumnWriter<E>,
    values: &E::Values,
    levels: &ArrayLevels,
) -> Result<usize> {
    writer.write_batch_internal(
        values,
        Some(levels.non_null_indices()),
        levels.def_levels(),
        levels.rep_levels(),
        None,
        None,
        None,
    )
}
1225
1226fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
1227 let mut values = Vec::with_capacity(indices.len());
1228 for i in indices {
1229 values.push(array.value(*i))
1230 }
1231 values
1232}
1233
1234fn get_interval_ym_array_slice(
1237 array: &arrow_array::IntervalYearMonthArray,
1238 indices: &[usize],
1239) -> Vec<FixedLenByteArray> {
1240 let mut values = Vec::with_capacity(indices.len());
1241 for i in indices {
1242 let mut value = array.value(*i).to_le_bytes().to_vec();
1243 let mut suffix = vec![0; 8];
1244 value.append(&mut suffix);
1245 values.push(FixedLenByteArray::from(ByteArray::from(value)))
1246 }
1247 values
1248}
1249
1250fn get_interval_dt_array_slice(
1253 array: &arrow_array::IntervalDayTimeArray,
1254 indices: &[usize],
1255) -> Vec<FixedLenByteArray> {
1256 let mut values = Vec::with_capacity(indices.len());
1257 for i in indices {
1258 let mut out = [0; 12];
1259 let value = array.value(*i);
1260 out[4..8].copy_from_slice(&value.days.to_le_bytes());
1261 out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1262 values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1263 }
1264 values
1265}
1266
1267fn get_decimal_128_array_slice(
1268 array: &arrow_array::Decimal128Array,
1269 indices: &[usize],
1270) -> Vec<FixedLenByteArray> {
1271 let mut values = Vec::with_capacity(indices.len());
1272 let size = decimal_length_from_precision(array.precision());
1273 for i in indices {
1274 let as_be_bytes = array.value(*i).to_be_bytes();
1275 let resized_value = as_be_bytes[(16 - size)..].to_vec();
1276 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1277 }
1278 values
1279}
1280
1281fn get_decimal_256_array_slice(
1282 array: &arrow_array::Decimal256Array,
1283 indices: &[usize],
1284) -> Vec<FixedLenByteArray> {
1285 let mut values = Vec::with_capacity(indices.len());
1286 let size = decimal_length_from_precision(array.precision());
1287 for i in indices {
1288 let as_be_bytes = array.value(*i).to_be_bytes();
1289 let resized_value = as_be_bytes[(32 - size)..].to_vec();
1290 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1291 }
1292 values
1293}
1294
1295fn get_float_16_array_slice(
1296 array: &arrow_array::Float16Array,
1297 indices: &[usize],
1298) -> Vec<FixedLenByteArray> {
1299 let mut values = Vec::with_capacity(indices.len());
1300 for i in indices {
1301 let value = array.value(*i).to_le_bytes().to_vec();
1302 values.push(FixedLenByteArray::from(ByteArray::from(value)));
1303 }
1304 values
1305}
1306
1307fn get_fsb_array_slice(
1308 array: &arrow_array::FixedSizeBinaryArray,
1309 indices: &[usize],
1310) -> Vec<FixedLenByteArray> {
1311 let mut values = Vec::with_capacity(indices.len());
1312 for i in indices {
1313 let value = array.value(*i).to_vec();
1314 values.push(FixedLenByteArray::from(ByteArray::from(value)))
1315 }
1316 values
1317}
1318
1319#[cfg(test)]
1320mod tests {
1321 use super::*;
1322
1323 use std::fs::File;
1324
1325 use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1326 use crate::arrow::ARROW_SCHEMA_META_KEY;
1327 use arrow::datatypes::ToByteSlice;
1328 use arrow::datatypes::{DataType, Schema};
1329 use arrow::error::Result as ArrowResult;
1330 use arrow::util::data_gen::create_random_array;
1331 use arrow::util::pretty::pretty_format_batches;
1332 use arrow::{array::*, buffer::Buffer};
1333 use arrow_buffer::{i256, IntervalDayTime, IntervalMonthDayNano, NullBuffer};
1334 use arrow_schema::Fields;
1335 use half::f16;
1336
1337 use crate::basic::Encoding;
1338 use crate::data_type::AsBytes;
1339 use crate::file::metadata::ParquetMetaData;
1340 use crate::file::page_index::index::Index;
1341 use crate::file::page_index::index_reader::read_offset_indexes;
1342 use crate::file::properties::{
1343 BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1344 };
1345 use crate::file::serialized_reader::ReadOptionsBuilder;
1346 use crate::file::{
1347 reader::{FileReader, SerializedFileReader},
1348 statistics::Statistics,
1349 };
1350
    // Basic round-trip of a two-column Int32 batch (one required, one
    // nullable) through the writer with a small max row group size.
    #[test]
    fn arrow_writer() {
        // define schema
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, true),
        ]);

        // create some data
        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);

        // build a record batch
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }
1368
1369 fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1370 let mut buffer = vec![];
1371
1372 let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
1373 writer.write(expected_batch).unwrap();
1374 writer.close().unwrap();
1375
1376 buffer
1377 }
1378
    // Serializes `expected_batch` and returns the written bytes by consuming
    // the writer with `into_inner()` (which also finalizes the file footer).
    fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
        let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
        writer.write(expected_batch).unwrap();
        writer.into_inner().unwrap()
    }
1384
    #[test]
    fn roundtrip_bytes() {
        // Both finalization paths — close() and into_inner() — must yield a
        // readable file with identical contents.
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, true),
        ]));

        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);

        let expected_batch =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();

        for buffer in [
            get_bytes_after_close(schema.clone(), &expected_batch),
            get_bytes_by_into_inner(schema, &expected_batch),
        ] {
            let cursor = Bytes::from(buffer);
            let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();

            let actual_batch = record_batch_reader
                .next()
                .expect("No batch found")
                .expect("Unable to get batch");

            // Schema, shape, and per-column data must all round-trip unchanged.
            assert_eq!(expected_batch.schema(), actual_batch.schema());
            assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
            assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
            for i in 0..expected_batch.num_columns() {
                let expected_data = expected_batch.column(i).to_data();
                let actual_data = actual_batch.column(i).to_data();

                assert_eq!(expected_data, actual_data);
            }
        }
    }
1424
1425 #[test]
1426 fn arrow_writer_non_null() {
1427 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1429
1430 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1432
1433 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1435
1436 roundtrip(batch, Some(SMALL_SIZE / 2));
1437 }
1438
    #[test]
    fn arrow_writer_list() {
        // Nullable List<Int32> column built manually from offsets and a
        // validity bitmap.
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
            true,
        )]);

        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

        // Offsets [0, 1, 3, 3, 6, 10] => lists [1], [2,3], [], [4,5,6], [7,8,9,10].
        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());

        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
            DataType::Int32,
            false,
        ))))
        .len(5)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        // 0b00011011: rows 0, 1, 3, 4 valid — row 2 (the empty list slot) is null.
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        assert_eq!(batch.column(0).null_count(), 1);

        roundtrip(batch, None);
    }
1477
    #[test]
    fn arrow_writer_list_non_null() {
        // Same list layout as `arrow_writer_list` but without a validity
        // bitmap: the column is declared non-nullable.
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
            false,
        )]);

        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

        // Offsets [0, 1, 3, 3, 6, 10] => lists [1], [2,3], [], [4,5,6], [7,8,9,10].
        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());

        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
            DataType::Int32,
            false,
        ))))
        .len(5)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        assert_eq!(batch.column(0).null_count(), 0);

        roundtrip(batch, None);
    }
1515
1516 #[test]
1517 fn arrow_writer_binary() {
1518 let string_field = Field::new("a", DataType::Utf8, false);
1519 let binary_field = Field::new("b", DataType::Binary, false);
1520 let schema = Schema::new(vec![string_field, binary_field]);
1521
1522 let raw_string_values = vec!["foo", "bar", "baz", "quux"];
1523 let raw_binary_values = [
1524 b"foo".to_vec(),
1525 b"bar".to_vec(),
1526 b"baz".to_vec(),
1527 b"quux".to_vec(),
1528 ];
1529 let raw_binary_value_refs = raw_binary_values
1530 .iter()
1531 .map(|x| x.as_slice())
1532 .collect::<Vec<_>>();
1533
1534 let string_values = StringArray::from(raw_string_values.clone());
1535 let binary_values = BinaryArray::from(raw_binary_value_refs);
1536 let batch = RecordBatch::try_new(
1537 Arc::new(schema),
1538 vec![Arc::new(string_values), Arc::new(binary_values)],
1539 )
1540 .unwrap();
1541
1542 roundtrip(batch, Some(SMALL_SIZE / 2));
1543 }
1544
    #[test]
    fn arrow_writer_binary_view() {
        // Utf8View / BinaryView columns, including values longer than the
        // 12-byte inline view limit and a nullable view column.
        let string_field = Field::new("a", DataType::Utf8View, false);
        let binary_field = Field::new("b", DataType::BinaryView, false);
        let nullable_string_field = Field::new("a", DataType::Utf8View, true);
        let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);

        let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
        let raw_binary_values = vec![
            b"foo".to_vec(),
            b"bar".to_vec(),
            b"large payload over 12 bytes".to_vec(),
            b"lulu".to_vec(),
        ];
        let nullable_string_values =
            vec![Some("foo"), None, Some("large payload over 12 bytes"), None];

        let string_view_values = StringViewArray::from(raw_string_values);
        let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
        let nullable_string_view_values = StringViewArray::from(nullable_string_values);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![
                Arc::new(string_view_values),
                Arc::new(binary_view_values),
                Arc::new(nullable_string_view_values),
            ],
        )
        .unwrap();

        // Exercise both the small-row-group and default-row-group paths.
        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
        roundtrip(batch, None);
    }
1578
1579 fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
1580 let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
1581 let schema = Schema::new(vec![decimal_field]);
1582
1583 let decimal_values = vec![10_000, 50_000, 0, -100]
1584 .into_iter()
1585 .map(Some)
1586 .collect::<Decimal128Array>()
1587 .with_precision_and_scale(precision, scale)
1588 .unwrap();
1589
1590 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
1591 }
1592
1593 #[test]
1594 fn arrow_writer_decimal() {
1595 let batch_int32_decimal = get_decimal_batch(5, 2);
1597 roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
1598 let batch_int64_decimal = get_decimal_batch(12, 2);
1600 roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
1601 let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
1603 roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
1604 }
1605
    #[test]
    fn arrow_writer_complex() {
        // Nested schema: { a: i32, b: i32?, c: { d: f64?, e: { f: f32?,
        // g: list<i16?>, h: list<i16>? } } } — mixes required/optional fields
        // with lists inside a two-level struct.
        let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
        let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
        let struct_field_g = Arc::new(Field::new_list(
            "g",
            Field::new_list_field(DataType::Int16, true),
            false,
        ));
        let struct_field_h = Arc::new(Field::new_list(
            "h",
            Field::new_list_field(DataType::Int16, false),
            true,
        ));
        let struct_field_e = Arc::new(Field::new_struct(
            "e",
            vec![
                struct_field_f.clone(),
                struct_field_g.clone(),
                struct_field_h.clone(),
            ],
            false,
        ));
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, true),
            Field::new_struct(
                "c",
                vec![struct_field_d.clone(), struct_field_e.clone()],
                false,
            ),
        ]);

        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);

        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

        // Offsets [0, 1, 3, 3, 6, 10] give lists of lengths 1, 2, 0, 3, 4.
        let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());

        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
            .len(5)
            .add_buffer(g_value_offsets.clone())
            .add_child_data(g_value.to_data())
            .build()
            .unwrap();
        let g = ListArray::from(g_list_data);
        // "h" shares "g"'s offsets/values but adds a validity bitmap (row 2 null).
        let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
            .len(5)
            .add_buffer(g_value_offsets)
            .add_child_data(g_value.to_data())
            .null_bit_buffer(Some(Buffer::from([0b00011011])))
            .build()
            .unwrap();
        let h = ListArray::from(h_list_data);

        let e = StructArray::from(vec![
            (struct_field_f, Arc::new(f) as ArrayRef),
            (struct_field_g, Arc::new(g) as ArrayRef),
            (struct_field_h, Arc::new(h) as ArrayRef),
        ]);

        let c = StructArray::from(vec![
            (struct_field_d, Arc::new(d) as ArrayRef),
            (struct_field_e, Arc::new(e) as ArrayRef),
        ]);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
        )
        .unwrap();

        // Two row-group sizes to exercise different batch-splitting paths.
        roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
        roundtrip(batch, Some(SMALL_SIZE / 3));
    }
1691
    #[test]
    fn arrow_writer_complex_mixed() {
        // A single struct column mixing a required field with two nullable
        // fields of different types.
        let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
        let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
        let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
        let schema = Schema::new(vec![Field::new(
            "some_nested_object",
            DataType::Struct(Fields::from(vec![
                offset_field.clone(),
                partition_field.clone(),
                topic_field.clone(),
            ])),
            false,
        )]);

        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
        let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);

        let some_nested_object = StructArray::from(vec![
            (offset_field, Arc::new(offset) as ArrayRef),
            (partition_field, Arc::new(partition) as ArrayRef),
            (topic_field, Arc::new(topic) as ArrayRef),
        ]);

        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }
1728
    #[test]
    fn arrow_writer_map() {
        // Build a Map<Utf8, Utf8?> column via the Arrow JSON reader for
        // brevity, then round-trip it.
        let json_content = r#"
        {"stocks":{"long": "$AAA", "short": "$BBB"}}
        {"stocks":{"long": null, "long": "$CCC", "short": null}}
        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
        "#;
        let entries_struct_type = DataType::Struct(Fields::from(vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, true),
        ]));
        let stocks_field = Field::new(
            "stocks",
            DataType::Map(
                Arc::new(Field::new("entries", entries_struct_type, false)),
                false,
            ),
            true,
        );
        let schema = Arc::new(Schema::new(vec![stocks_field]));
        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();

        let batch = reader.next().unwrap().unwrap();
        roundtrip(batch, None);
    }
1756
    #[test]
    fn arrow_writer_2_level_struct() {
        // Nullable struct-of-struct (a.b.c) where each level carries its own
        // validity bitmap.
        let field_c = Field::new("c", DataType::Int32, true);
        let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
        let type_a = DataType::Struct(vec![field_b.clone()].into());
        let field_a = Field::new("a", type_a, true);
        let schema = Schema::new(vec![field_a.clone()]);

        let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
        // 0b00100111: "b" is null at rows 3 and 4.
        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100111])))
            .add_child_data(c.into_data())
            .build()
            .unwrap();
        let b = StructArray::from(b_data);
        // 0b00101111: "a" is null at row 4 only.
        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00101111])))
            .add_child_data(b.into_data())
            .build()
            .unwrap();
        let a = StructArray::from(a_data);

        assert_eq!(a.null_count(), 1);
        assert_eq!(a.column(0).null_count(), 2);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }
1791
    #[test]
    fn arrow_writer_2_level_struct_non_null() {
        // Fully required struct-of-struct (a.b.c): no validity bitmaps anywhere.
        let field_c = Field::new("c", DataType::Int32, false);
        let type_b = DataType::Struct(vec![field_c].into());
        let field_b = Field::new("b", type_b.clone(), false);
        let type_a = DataType::Struct(vec![field_b].into());
        let field_a = Field::new("a", type_a.clone(), false);
        let schema = Schema::new(vec![field_a]);

        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
        let b_data = ArrayDataBuilder::new(type_b)
            .len(6)
            .add_child_data(c.into_data())
            .build()
            .unwrap();
        let b = StructArray::from(b_data);
        let a_data = ArrayDataBuilder::new(type_a)
            .len(6)
            .add_child_data(b.into_data())
            .build()
            .unwrap();
        let a = StructArray::from(a_data);

        assert_eq!(a.null_count(), 0);
        assert_eq!(a.column(0).null_count(), 0);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }
1825
    #[test]
    fn arrow_writer_2_level_struct_mixed_null() {
        // Required outer struct "a" containing a nullable inner struct "b":
        // only the middle level carries a validity bitmap.
        let field_c = Field::new("c", DataType::Int32, false);
        let type_b = DataType::Struct(vec![field_c].into());
        let field_b = Field::new("b", type_b.clone(), true);
        let type_a = DataType::Struct(vec![field_b].into());
        let field_a = Field::new("a", type_a.clone(), false);
        let schema = Schema::new(vec![field_a]);

        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
        // 0b00100111: "b" is null at rows 3 and 4.
        let b_data = ArrayDataBuilder::new(type_b)
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100111])))
            .add_child_data(c.into_data())
            .build()
            .unwrap();
        let b = StructArray::from(b_data);
        let a_data = ArrayDataBuilder::new(type_a)
            .len(6)
            .add_child_data(b.into_data())
            .build()
            .unwrap();
        let a = StructArray::from(a_data);

        assert_eq!(a.null_count(), 0);
        assert_eq!(a.column(0).null_count(), 2);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }
1861
    #[test]
    fn arrow_writer_2_level_struct_mixed_null_2() {
        // Nullable outer struct "a" over a required inner struct "b" whose
        // children mix primitive, fixed-size-binary, and dictionary columns.
        let field_c = Field::new("c", DataType::Int32, false);
        let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
        let field_e = Field::new(
            "e",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            false,
        );

        let field_b = Field::new(
            "b",
            DataType::Struct(vec![field_c, field_d, field_e].into()),
            false,
        );
        let type_a = DataType::Struct(vec![field_b.clone()].into());
        let field_a = Field::new("a", type_a, true);
        let schema = Schema::new(vec![field_a.clone()]);

        let c = Int32Array::from_iter_values(0..6);
        let d = FixedSizeBinaryArray::try_from_iter(
            ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
        )
        .expect("four byte values");
        let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
            .len(6)
            .add_child_data(c.into_data())
            .add_child_data(d.into_data())
            .add_child_data(e.into_data())
            .build()
            .unwrap();
        let b = StructArray::from(b_data);
        // 0b00100101: "a" is null at rows 1, 3, and 4.
        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100101])))
            .add_child_data(b.into_data())
            .build()
            .unwrap();
        let a = StructArray::from(a_data);

        assert_eq!(a.null_count(), 3);
        assert_eq!(a.column(0).null_count(), 0);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }
1913
    #[test]
    fn test_empty_dict() {
        // A dictionary whose values array is empty (all keys null), nested in
        // an all-null struct, must still be writable.
        let struct_fields = Fields::from(vec![Field::new(
            "dict",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            false,
        )]);

        let schema = Schema::new(vec![Field::new_struct(
            "struct",
            struct_fields.clone(),
            true,
        )]);
        let dictionary = Arc::new(DictionaryArray::new(
            Int32Array::new_null(5),
            Arc::new(StringArray::new_null(0)),
        ));

        let s = StructArray::new(
            struct_fields,
            vec![dictionary],
            Some(NullBuffer::new_null(5)),
        );

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
        roundtrip(batch, None);
    }
1941 #[test]
1942 fn arrow_writer_page_size() {
1943 let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
1944
1945 let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
1946
1947 for i in 0..10 {
1949 let value = i
1950 .to_string()
1951 .repeat(10)
1952 .chars()
1953 .take(10)
1954 .collect::<String>();
1955
1956 builder.append_value(value);
1957 }
1958
1959 let array = Arc::new(builder.finish());
1960
1961 let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
1962
1963 let file = tempfile::tempfile().unwrap();
1964
1965 let props = WriterProperties::builder()
1967 .set_data_page_size_limit(1)
1968 .set_dictionary_page_size_limit(1)
1969 .set_write_batch_size(1)
1970 .build();
1971
1972 let mut writer =
1973 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
1974 .expect("Unable to write file");
1975 writer.write(&batch).unwrap();
1976 writer.close().unwrap();
1977
1978 let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
1979
1980 let column = reader.metadata().row_group(0).columns();
1981
1982 assert_eq!(column.len(), 1);
1983
1984 assert!(
1987 column[0].dictionary_page_offset().is_some(),
1988 "Expected a dictionary page"
1989 );
1990
1991 let offset_indexes = read_offset_indexes(&file, column).unwrap().unwrap();
1992
1993 let page_locations = offset_indexes[0].page_locations.clone();
1994
1995 assert_eq!(
1998 page_locations.len(),
1999 10,
2000 "Expected 9 pages but got {page_locations:#?}"
2001 );
2002 }
2003
    #[test]
    fn arrow_writer_float_nans() {
        // Float columns (f16/f32/f64) where every other value is NaN must
        // still round-trip exactly.
        let f16_field = Field::new("a", DataType::Float16, false);
        let f32_field = Field::new("b", DataType::Float32, false);
        let f64_field = Field::new("c", DataType::Float64, false);
        let schema = Schema::new(vec![f16_field, f32_field, f64_field]);

        let f16_values = (0..MEDIUM_SIZE)
            .map(|i| {
                Some(if i % 2 == 0 {
                    f16::NAN
                } else {
                    f16::from_f32(i as f32)
                })
            })
            .collect::<Float16Array>();

        let f32_values = (0..MEDIUM_SIZE)
            .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
            .collect::<Float32Array>();

        let f64_values = (0..MEDIUM_SIZE)
            .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
            .collect::<Float64Array>();

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![
                Arc::new(f16_values),
                Arc::new(f32_values),
                Arc::new(f64_values),
            ],
        )
        .unwrap();

        roundtrip(batch, None);
    }
2041
    // Default row counts used by the round-trip helpers below.
    const SMALL_SIZE: usize = 7;
    const MEDIUM_SIZE: usize = 63;
2044
2045 fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<File> {
2046 let mut files = vec![];
2047 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2048 let mut props = WriterProperties::builder().set_writer_version(version);
2049
2050 if let Some(size) = max_row_group_size {
2051 props = props.set_max_row_group_size(size)
2052 }
2053
2054 let props = props.build();
2055 files.push(roundtrip_opts(&expected_batch, props))
2056 }
2057 files
2058 }
2059
    // Writes `expected_batch` to a temp file with `props`, reads it back, and
    // compares every column via the caller-supplied `validate` callback.
    // Returns the temp file so callers can inspect it further.
    fn roundtrip_opts_with_array_validation<F>(
        expected_batch: &RecordBatch,
        props: WriterProperties,
        validate: F,
    ) -> File
    where
        F: Fn(&ArrayData, &ArrayData),
    {
        let file = tempfile::tempfile().unwrap();

        let mut writer = ArrowWriter::try_new(
            file.try_clone().unwrap(),
            expected_batch.schema(),
            Some(props),
        )
        .expect("Unable to write file");
        writer.write(expected_batch).unwrap();
        writer.close().unwrap();

        let mut record_batch_reader =
            ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();

        let actual_batch = record_batch_reader
            .next()
            .expect("No batch found")
            .expect("Unable to get batch");

        assert_eq!(expected_batch.schema(), actual_batch.schema());
        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
        for i in 0..expected_batch.num_columns() {
            let expected_data = expected_batch.column(i).to_data();
            let actual_data = actual_batch.column(i).to_data();
            validate(&expected_data, &actual_data);
        }

        file
    }
2098
2099 fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> File {
2100 roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
2101 a.validate_full().expect("valid expected data");
2102 b.validate_full().expect("valid actual data");
2103 assert_eq!(a, b)
2104 })
2105 }
2106
    // Options for `one_column_roundtrip_with_options`.
    struct RoundTripOptions {
        values: ArrayRef,                             // the single column to write
        schema: SchemaRef,                            // schema wrapping that column
        bloom_filter: bool,                           // enable bloom filters in writer props
        bloom_filter_position: BloomFilterPosition,   // where bloom filters are written
    }
2113
2114 impl RoundTripOptions {
2115 fn new(values: ArrayRef, nullable: bool) -> Self {
2116 let data_type = values.data_type().clone();
2117 let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
2118 Self {
2119 values,
2120 schema: Arc::new(schema),
2121 bloom_filter: false,
2122 bloom_filter_position: BloomFilterPosition::AfterRowGroup,
2123 }
2124 }
2125 }
2126
    // Round-trips a single column using the default options.
    fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<File> {
        one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
    }
2130
2131 fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<File> {
2132 let mut options = RoundTripOptions::new(values, false);
2133 options.schema = schema;
2134 one_column_roundtrip_with_options(options)
2135 }
2136
    // Round-trips a single-column batch across a matrix of writer settings:
    // dictionary on/off, every encoding valid for the column's type, both
    // writer versions, and several row-group sizes.
    fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<File> {
        let RoundTripOptions {
            values,
            schema,
            bloom_filter,
            bloom_filter_position,
        } = options;

        // Pick the set of encodings applicable to the column's data type.
        let encodings = match values.data_type() {
            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
                vec![
                    Encoding::PLAIN,
                    Encoding::DELTA_BYTE_ARRAY,
                    Encoding::DELTA_LENGTH_BYTE_ARRAY,
                ]
            }
            DataType::Int64
            | DataType::Int32
            | DataType::Int16
            | DataType::Int8
            | DataType::UInt64
            | DataType::UInt32
            | DataType::UInt16
            | DataType::UInt8 => vec![
                Encoding::PLAIN,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
            DataType::Float32 | DataType::Float64 => {
                vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
            }
            _ => vec![Encoding::PLAIN],
        };

        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();

        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];

        let mut files = vec![];
        // dictionary_size 0 disables dictionary encoding entirely.
        for dictionary_size in [0, 1, 1024] {
            for encoding in &encodings {
                for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
                    for row_group_size in row_group_sizes {
                        let props = WriterProperties::builder()
                            .set_writer_version(version)
                            .set_max_row_group_size(row_group_size)
                            .set_dictionary_enabled(dictionary_size != 0)
                            .set_dictionary_page_size_limit(dictionary_size.max(1))
                            .set_encoding(*encoding)
                            .set_bloom_filter_enabled(bloom_filter)
                            .set_bloom_filter_position(bloom_filter_position)
                            .build();

                        files.push(roundtrip_opts(&expected_batch, props))
                    }
                }
            }
        }
        files
    }
2197
2198 fn values_required<A, I>(iter: I) -> Vec<File>
2199 where
2200 A: From<Vec<I::Item>> + Array + 'static,
2201 I: IntoIterator,
2202 {
2203 let raw_values: Vec<_> = iter.into_iter().collect();
2204 let values = Arc::new(A::from(raw_values));
2205 one_column_roundtrip(values, false)
2206 }
2207
2208 fn values_optional<A, I>(iter: I) -> Vec<File>
2209 where
2210 A: From<Vec<Option<I::Item>>> + Array + 'static,
2211 I: IntoIterator,
2212 {
2213 let optional_raw_values: Vec<_> = iter
2214 .into_iter()
2215 .enumerate()
2216 .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
2217 .collect();
2218 let optional_values = Arc::new(A::from(optional_raw_values));
2219 one_column_roundtrip(optional_values, true)
2220 }
2221
    // Round-trips `iter` both as a required column and as a nullable column
    // (with every even-indexed element nulled).
    fn required_and_optional<A, I>(iter: I)
    where
        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
        I: IntoIterator + Clone,
    {
        values_required::<A, I>(iter.clone());
        values_optional::<A, I>(iter);
    }
2230
2231 fn check_bloom_filter<T: AsBytes>(
2232 files: Vec<File>,
2233 file_column: String,
2234 positive_values: Vec<T>,
2235 negative_values: Vec<T>,
2236 ) {
2237 files.into_iter().take(1).for_each(|file| {
2238 let file_reader = SerializedFileReader::new_with_options(
2239 file,
2240 ReadOptionsBuilder::new()
2241 .with_reader_properties(
2242 ReaderProperties::builder()
2243 .set_read_bloom_filter(true)
2244 .build(),
2245 )
2246 .build(),
2247 )
2248 .expect("Unable to open file as Parquet");
2249 let metadata = file_reader.metadata();
2250
2251 let mut bloom_filters: Vec<_> = vec![];
2253 for (ri, row_group) in metadata.row_groups().iter().enumerate() {
2254 if let Some((column_index, _)) = row_group
2255 .columns()
2256 .iter()
2257 .enumerate()
2258 .find(|(_, column)| column.column_path().string() == file_column)
2259 {
2260 let row_group_reader = file_reader
2261 .get_row_group(ri)
2262 .expect("Unable to read row group");
2263 if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
2264 bloom_filters.push(sbbf.clone());
2265 } else {
2266 panic!("No bloom filter for column named {file_column} found");
2267 }
2268 } else {
2269 panic!("No column named {file_column} found");
2270 }
2271 }
2272
2273 positive_values.iter().for_each(|value| {
2274 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2275 assert!(
2276 found.is_some(),
2277 "{}",
2278 format!("Value {:?} should be in bloom filter", value.as_bytes())
2279 );
2280 });
2281
2282 negative_values.iter().for_each(|value| {
2283 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2284 assert!(
2285 found.is_none(),
2286 "{}",
2287 format!("Value {:?} should not be in bloom filter", value.as_bytes())
2288 );
2289 });
2290 });
2291 }
2292
    #[test]
    fn all_null_primitive_single_column() {
        // Nullable Int32 column where every value is null.
        let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
        one_column_roundtrip(values, true);
    }

    #[test]
    fn null_single_column() {
        // Arrow Null-typed column (no data buffers at all).
        let values = Arc::new(NullArray::new(SMALL_SIZE));
        one_column_roundtrip(values, true);
    }
2304
    #[test]
    fn bool_single_column() {
        // Alternating true/false booleans, required and optional variants.
        required_and_optional::<BooleanArray, _>(
            [true, false].iter().cycle().copied().take(SMALL_SIZE),
        );
    }
2311
    #[test]
    fn bool_large_single_column() {
        // Write-only smoke test: 200k booleans (with nulls) must serialize
        // without error; the output is not read back.
        let values = Arc::new(
            [None, Some(true), Some(false)]
                .iter()
                .cycle()
                .copied()
                .take(200_000)
                .collect::<BooleanArray>(),
        );
        let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
        let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
        let file = tempfile::tempfile().unwrap();

        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
                .expect("Unable to write file");
        writer.write(&expected_batch).unwrap();
        writer.close().unwrap();
    }
2332
    #[test]
    fn check_page_offset_index_with_nan() {
        // All-NaN float column: the offset index must still be written, while
        // the column index is skipped (NaN yields no usable min/max stats).
        let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
        let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
        let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();

        let mut out = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
        writer.write(&batch).unwrap();
        let file_meta_data = writer.close().unwrap();
        for row_group in file_meta_data.row_groups {
            for column in row_group.columns {
                assert!(column.offset_index_offset.is_some());
                assert!(column.offset_index_length.is_some());
                assert!(column.column_index_offset.is_none());
                assert!(column.column_index_length.is_none());
            }
        }
    }
2353
    // Single-column round-trips for every primitive integer and float width,
    // each in both required and optional form.
    #[test]
    fn i8_single_column() {
        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
    }

    #[test]
    fn i16_single_column() {
        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
    }

    #[test]
    fn i32_single_column() {
        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
    }

    #[test]
    fn i64_single_column() {
        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
    }

    #[test]
    fn u8_single_column() {
        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
    }

    #[test]
    fn u16_single_column() {
        required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
    }

    #[test]
    fn u32_single_column() {
        required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
    }

    #[test]
    fn u64_single_column() {
        required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
    }

    #[test]
    fn f32_single_column() {
        required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
    }

    #[test]
    fn f64_single_column() {
        required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
    }
2403
    // Timestamp round-trips at each supported resolution. These columns are
    // written as required only (no optional variant).
    #[test]
    fn timestamp_second_single_column() {
        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
        let values = Arc::new(TimestampSecondArray::from(raw_values));

        one_column_roundtrip(values, false);
    }

    #[test]
    fn timestamp_millisecond_single_column() {
        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
        let values = Arc::new(TimestampMillisecondArray::from(raw_values));

        one_column_roundtrip(values, false);
    }

    #[test]
    fn timestamp_microsecond_single_column() {
        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
        let values = Arc::new(TimestampMicrosecondArray::from(raw_values));

        one_column_roundtrip(values, false);
    }

    #[test]
    fn timestamp_nanosecond_single_column() {
        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
        let values = Arc::new(TimestampNanosecondArray::from(raw_values));

        one_column_roundtrip(values, false);
    }
2439
    #[test]
    fn date32_single_column() {
        // Round-trip a Date32 (days since epoch) column.
        required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
    }
2444
    #[test]
    fn date64_single_column() {
        // Round-trip a Date64 column; values step by one day expressed in
        // milliseconds, since Date64 stores millis since epoch.
        required_and_optional::<Date64Array, _>(
            (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
        );
    }
2452
    #[test]
    fn time32_second_single_column() {
        // Round-trip a Time32(Second) column.
        required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
    }
2457
    #[test]
    fn time32_millisecond_single_column() {
        // Round-trip a Time32(Millisecond) column.
        required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
    }
2462
    #[test]
    fn time64_microsecond_single_column() {
        // Round-trip a Time64(Microsecond) column.
        required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
    }
2467
    #[test]
    fn time64_nanosecond_single_column() {
        // Round-trip a Time64(Nanosecond) column.
        required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
    }
2472
    #[test]
    // Duration types are not writable to parquet; the writer is expected to panic.
    #[should_panic(expected = "Converting Duration to parquet not supported")]
    fn duration_second_single_column() {
        required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
    }
2478
    #[test]
    // Duration types are not writable to parquet; the writer is expected to panic.
    #[should_panic(expected = "Converting Duration to parquet not supported")]
    fn duration_millisecond_single_column() {
        required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
    }
2484
    #[test]
    // Duration types are not writable to parquet; the writer is expected to panic.
    #[should_panic(expected = "Converting Duration to parquet not supported")]
    fn duration_microsecond_single_column() {
        required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
    }
2490
    #[test]
    // Duration types are not writable to parquet; the writer is expected to panic.
    #[should_panic(expected = "Converting Duration to parquet not supported")]
    fn duration_nanosecond_single_column() {
        required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
    }
2496
    #[test]
    fn interval_year_month_single_column() {
        // Round-trip an Interval(YearMonth) column.
        required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
    }
2501
    #[test]
    fn interval_day_time_single_column() {
        // Round-trip an Interval(DayTime) column, including negative components.
        required_and_optional::<IntervalDayTimeArray, _>(vec![
            IntervalDayTime::new(0, 1),
            IntervalDayTime::new(0, 3),
            IntervalDayTime::new(3, -2),
            IntervalDayTime::new(-200, 4),
        ]);
    }
2511
    #[test]
    // MonthDayNano intervals currently have no parquet mapping; expect a panic.
    #[should_panic(
        expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
    )]
    fn interval_month_day_nano_single_column() {
        required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
            IntervalMonthDayNano::new(0, 1, 5),
            IntervalMonthDayNano::new(0, 3, 2),
            IntervalMonthDayNano::new(3, -2, -5),
            IntervalMonthDayNano::new(-200, 4, -1),
        ]);
    }
2524
2525 #[test]
2526 fn binary_single_column() {
2527 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2528 let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2529 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2530
2531 values_required::<BinaryArray, _>(many_vecs_iter);
2533 }
2534
2535 #[test]
2536 fn binary_view_single_column() {
2537 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2538 let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2539 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2540
2541 values_required::<BinaryViewArray, _>(many_vecs_iter);
2543 }
2544
    #[test]
    fn i32_column_bloom_filter_at_end() {
        // Write an i32 column with bloom filters positioned at the end of the
        // file, then verify written values test positive and unseen values negative.
        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
        let mut options = RoundTripOptions::new(array, false);
        options.bloom_filter = true;
        options.bloom_filter_position = BloomFilterPosition::End;

        let files = one_column_roundtrip_with_options(options);
        check_bloom_filter(
            files,
            "col".to_string(),
            (0..SMALL_SIZE as i32).collect(),
            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
        );
    }
2560
    #[test]
    fn i32_column_bloom_filter() {
        // Same as i32_column_bloom_filter_at_end but with the default filter position.
        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
        let mut options = RoundTripOptions::new(array, false);
        options.bloom_filter = true;

        let files = one_column_roundtrip_with_options(options);
        check_bloom_filter(
            files,
            "col".to_string(),
            (0..SMALL_SIZE as i32).collect(),
            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
        );
    }
2575
    #[test]
    fn binary_column_bloom_filter() {
        // Bloom filter over a binary column: every written vector must be found,
        // and a single byte value never written must be absent.
        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
        let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());

        let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
        let mut options = RoundTripOptions::new(array, false);
        options.bloom_filter = true;

        let files = one_column_roundtrip_with_options(options);
        check_bloom_filter(
            files,
            "col".to_string(),
            many_vecs,
            vec![vec![(SMALL_SIZE + 1) as u8]],
        );
    }
2594
    #[test]
    fn empty_string_null_column_bloom_filter() {
        // NOTE(review): despite the name, the written column contains neither
        // empty strings nor nulls; the filter is probed with the odd-indexed
        // written values as positives and "" as a negative — confirm intent.
        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
        let raw_strs = raw_values.iter().map(|s| s.as_str());

        let array = Arc::new(StringArray::from_iter_values(raw_strs));
        let mut options = RoundTripOptions::new(array, false);
        options.bloom_filter = true;

        let files = one_column_roundtrip_with_options(options);

        // Keep only odd-indexed values to probe the filter with.
        let optional_raw_values: Vec<_> = raw_values
            .iter()
            .enumerate()
            .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
            .collect();
        check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
    }
2614
2615 #[test]
2616 fn large_binary_single_column() {
2617 let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
2618 let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
2619 let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());
2620
2621 values_required::<LargeBinaryArray, _>(many_vecs_iter);
2623 }
2624
    #[test]
    fn fixed_size_binary_single_column() {
        // FixedSizeBinary(4) with a null interleaved between values round-trips.
        let mut builder = FixedSizeBinaryBuilder::new(4);
        builder.append_value(b"0123").unwrap();
        builder.append_null();
        builder.append_value(b"8910").unwrap();
        builder.append_value(b"1112").unwrap();
        let array = Arc::new(builder.finish());

        one_column_roundtrip(array, true);
    }
2636
2637 #[test]
2638 fn string_single_column() {
2639 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2640 let raw_strs = raw_values.iter().map(|s| s.as_str());
2641
2642 required_and_optional::<StringArray, _>(raw_strs);
2643 }
2644
2645 #[test]
2646 fn large_string_single_column() {
2647 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2648 let raw_strs = raw_values.iter().map(|s| s.as_str());
2649
2650 required_and_optional::<LargeStringArray, _>(raw_strs);
2651 }
2652
2653 #[test]
2654 fn string_view_single_column() {
2655 let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
2656 let raw_strs = raw_values.iter().map(|s| s.as_str());
2657
2658 required_and_optional::<StringViewArray, _>(raw_strs);
2659 }
2660
    #[test]
    fn null_list_single_column() {
        // List<Null> column built manually: three slots with validity 0b101
        // (slot 1 is null) and offsets [0, 0, 2] into two NullArray children,
        // i.e. logically [[], null, [null, null]].
        let null_field = Field::new_list_field(DataType::Null, true);
        let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);

        let schema = Schema::new(vec![list_field]);

        // Build [[], null, [null, null]]
        let a_values = NullArray::new(2);
        let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
            DataType::Null,
            true,
        ))))
        .len(3)
        .add_buffer(a_value_offsets)
        .null_bit_buffer(Some(Buffer::from([0b00000101])))
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();

        let a = ListArray::from(a_list_data);

        // Sanity-check the hand-built array before writing it.
        assert!(a.is_valid(0));
        assert!(!a.is_valid(1));
        assert!(a.is_valid(2));

        assert_eq!(a.value(0).len(), 0);
        assert_eq!(a.value(2).len(), 2);
        assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
        roundtrip(batch, None);
    }
2695
    #[test]
    fn list_single_column() {
        // List<Int32> with 5 slots over offsets [0,1,3,3,6,10]; validity mask
        // 0b11011 marks slot 2 null (which also happens to be the empty slot).
        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
            DataType::Int32,
            false,
        ))))
        .len(5)
        .add_buffer(a_value_offsets)
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();

        assert_eq!(a_list_data.null_count(), 1);

        let a = ListArray::from(a_list_data);
        let values = Arc::new(a);

        one_column_roundtrip(values, true);
    }
2718
    #[test]
    fn large_list_single_column() {
        // Same shape as list_single_column but as LargeList (i64 offsets) with
        // a non-default item field name.
        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
        let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
            "large_item",
            DataType::Int32,
            true,
        ))))
        .len(5)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .build()
        .unwrap();

        assert_eq!(a_list_data.null_count(), 1);

        let a = LargeListArray::from(a_list_data);
        let values = Arc::new(a);

        one_column_roundtrip(values, true);
    }
2743
    #[test]
    fn list_nested_nulls() {
        // Mix null lists, lists containing nulls, and non-null lists; the same
        // data is exercised for both List and LargeList.
        use arrow::datatypes::Int32Type;
        let data = vec![
            Some(vec![Some(1)]),
            Some(vec![Some(2), Some(3)]),
            None,
            Some(vec![Some(4), Some(5), None]),
            Some(vec![None]),
            Some(vec![Some(6), Some(7)]),
        ];

        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
        one_column_roundtrip(Arc::new(list), true);

        let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
        one_column_roundtrip(Arc::new(list), true);
    }
2762
2763 #[test]
2764 fn struct_single_column() {
2765 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2766 let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
2767 let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
2768
2769 let values = Arc::new(s);
2770 one_column_roundtrip(values, false);
2771 }
2772
    #[test]
    fn list_and_map_coerced_names() {
        // With coerce_types enabled, the writer must rename list/map inner
        // fields to the parquet-standard names ("element", "key_value", "key",
        // "value") regardless of the Arrow field names supplied.
        let list_field =
            Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
        let map_field = Field::new_map(
            "my_map",
            "entries",
            Field::new("keys", DataType::Int32, false),
            Field::new("values", DataType::Int32, true),
            false,
            true,
        );

        let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
        let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();

        let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));

        // Write data to Parquet but coerce the field names to the spec names.
        let props = Some(WriterProperties::builder().set_coerce_types(true).build());
        let file = tempfile::tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();

        let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
        writer.write(&batch).unwrap();
        let file_metadata = writer.close().unwrap();

        // Indices are positions in the flattened (depth-first) thrift schema list.
        assert_eq!(file_metadata.schema[3].name, "element");
        assert_eq!(file_metadata.schema[5].name, "key_value");
        assert_eq!(file_metadata.schema[6].name, "key");
        assert_eq!(file_metadata.schema[7].name, "value");

        // Re-read the file and verify the same names via the typed schema tree.
        let reader = SerializedFileReader::new(file).unwrap();
        let file_schema = reader.metadata().file_metadata().schema();
        let fields = file_schema.get_fields();
        let list_field = &fields[0].get_fields()[0];
        assert_eq!(list_field.get_fields()[0].name(), "element");
        let map_field = &fields[1].get_fields()[0];
        assert_eq!(map_field.name(), "key_value");
        assert_eq!(map_field.get_fields()[0].name(), "key");
        assert_eq!(map_field.get_fields()[1].name(), "value");
    }
2822
    #[test]
    fn fallback_flush_data_page() {
        // Forces very small data pages (32-byte limit, write batches of 16)
        // across several row-group sizes and both delta byte-array encodings,
        // verifying string values survive page flushes that happen mid-batch.
        let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
        let values = Arc::new(StringArray::from(raw_values));
        let encodings = vec![
            Encoding::DELTA_BYTE_ARRAY,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
        ];
        let data_type = values.data_type().clone();
        let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();

        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
        let data_page_size_limit: usize = 32;
        let write_batch_size: usize = 16;

        for encoding in &encodings {
            for row_group_size in row_group_sizes {
                // Dictionary encoding is disabled so the chosen fallback
                // encoding is actually exercised.
                let props = WriterProperties::builder()
                    .set_writer_version(WriterVersion::PARQUET_2_0)
                    .set_max_row_group_size(row_group_size)
                    .set_dictionary_enabled(false)
                    .set_encoding(*encoding)
                    .set_data_page_size_limit(data_page_size_limit)
                    .set_write_batch_size(write_batch_size)
                    .build();

                roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
                    let string_array_a = StringArray::from(a.clone());
                    let string_array_b = StringArray::from(b.clone());
                    let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
                    let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
                    assert_eq!(
                        vec_a, vec_b,
                        "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
                    );
                });
            }
        }
    }
2864
    #[test]
    fn arrow_writer_string_dictionary() {
        // Dictionary<Int32, Utf8> column with a null key round-trips against a
        // schema built with the deprecated dict-id API (hence the allow).
        #[allow(deprecated)]
        let schema = Arc::new(Schema::new(vec![Field::new_dict(
            "dictionary",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            true,
            42,
            true,
        )]));

        // Repeated "alpha" exercises key reuse in the dictionary.
        let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
            .iter()
            .copied()
            .collect();

        one_column_roundtrip_with_schema(Arc::new(d), schema);
    }
2886
    #[test]
    fn arrow_writer_primitive_dictionary() {
        // Dictionary<UInt8, UInt32> with a null and a repeated value round-trips.
        #[allow(deprecated)]
        let schema = Arc::new(Schema::new(vec![Field::new_dict(
            "dictionary",
            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
            true,
            42,
            true,
        )]));

        let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
        builder.append(12345678).unwrap();
        builder.append_null();
        builder.append(22345678).unwrap();
        builder.append(12345678).unwrap();
        let d = builder.finish();

        one_column_roundtrip_with_schema(Arc::new(d), schema);
    }
2909
    #[test]
    fn arrow_writer_decimal128_dictionary() {
        // Dictionary-encoded Decimal128 round-trips at precision 5 and again
        // with the same keys after re-scaling the values to precision 12.
        let integers = vec![12345, 56789, 34567];

        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

        let values = Decimal128Array::from(integers.clone())
            .with_precision_and_scale(5, 2)
            .unwrap();

        let array = DictionaryArray::new(keys, Arc::new(values));
        one_column_roundtrip(Arc::new(array.clone()), true);

        let values = Decimal128Array::from(integers)
            .with_precision_and_scale(12, 2)
            .unwrap();

        let array = array.with_values(Arc::new(values));
        one_column_roundtrip(Arc::new(array), true);
    }
2930
    #[test]
    fn arrow_writer_decimal256_dictionary() {
        // Same as the decimal128 variant but with Decimal256 values.
        let integers = vec![
            i256::from_i128(12345),
            i256::from_i128(56789),
            i256::from_i128(34567),
        ];

        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

        let values = Decimal256Array::from(integers.clone())
            .with_precision_and_scale(5, 2)
            .unwrap();

        let array = DictionaryArray::new(keys, Arc::new(values));
        one_column_roundtrip(Arc::new(array.clone()), true);

        let values = Decimal256Array::from(integers)
            .with_precision_and_scale(12, 2)
            .unwrap();

        let array = array.with_values(Arc::new(values));
        one_column_roundtrip(Arc::new(array), true);
    }
2955
    #[test]
    fn arrow_writer_string_dictionary_unsigned_index() {
        // Same as arrow_writer_string_dictionary but with an unsigned (UInt8) key type.
        #[allow(deprecated)]
        let schema = Arc::new(Schema::new(vec![Field::new_dict(
            "dictionary",
            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
            true,
            42,
            true,
        )]));

        let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
            .iter()
            .copied()
            .collect();

        one_column_roundtrip_with_schema(Arc::new(d), schema);
    }
2976
2977 #[test]
2978 fn u32_min_max() {
2979 let src = [
2981 u32::MIN,
2982 u32::MIN + 1,
2983 (i32::MAX as u32) - 1,
2984 i32::MAX as u32,
2985 (i32::MAX as u32) + 1,
2986 u32::MAX - 1,
2987 u32::MAX,
2988 ];
2989 let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
2990 let files = one_column_roundtrip(values, false);
2991
2992 for file in files {
2993 let reader = SerializedFileReader::new(file).unwrap();
2995 let metadata = reader.metadata();
2996
2997 let mut row_offset = 0;
2998 for row_group in metadata.row_groups() {
2999 assert_eq!(row_group.num_columns(), 1);
3000 let column = row_group.column(0);
3001
3002 let num_values = column.num_values() as usize;
3003 let src_slice = &src[row_offset..row_offset + num_values];
3004 row_offset += column.num_values() as usize;
3005
3006 let stats = column.statistics().unwrap();
3007 if let Statistics::Int32(stats) = stats {
3008 assert_eq!(
3009 *stats.min_opt().unwrap() as u32,
3010 *src_slice.iter().min().unwrap()
3011 );
3012 assert_eq!(
3013 *stats.max_opt().unwrap() as u32,
3014 *src_slice.iter().max().unwrap()
3015 );
3016 } else {
3017 panic!("Statistics::Int32 missing")
3018 }
3019 }
3020 }
3021 }
3022
3023 #[test]
3024 fn u64_min_max() {
3025 let src = [
3027 u64::MIN,
3028 u64::MIN + 1,
3029 (i64::MAX as u64) - 1,
3030 i64::MAX as u64,
3031 (i64::MAX as u64) + 1,
3032 u64::MAX - 1,
3033 u64::MAX,
3034 ];
3035 let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
3036 let files = one_column_roundtrip(values, false);
3037
3038 for file in files {
3039 let reader = SerializedFileReader::new(file).unwrap();
3041 let metadata = reader.metadata();
3042
3043 let mut row_offset = 0;
3044 for row_group in metadata.row_groups() {
3045 assert_eq!(row_group.num_columns(), 1);
3046 let column = row_group.column(0);
3047
3048 let num_values = column.num_values() as usize;
3049 let src_slice = &src[row_offset..row_offset + num_values];
3050 row_offset += column.num_values() as usize;
3051
3052 let stats = column.statistics().unwrap();
3053 if let Statistics::Int64(stats) = stats {
3054 assert_eq!(
3055 *stats.min_opt().unwrap() as u64,
3056 *src_slice.iter().min().unwrap()
3057 );
3058 assert_eq!(
3059 *stats.max_opt().unwrap() as u64,
3060 *src_slice.iter().max().unwrap()
3061 );
3062 } else {
3063 panic!("Statistics::Int64 missing")
3064 }
3065 }
3066 }
3067 }
3068
3069 #[test]
3070 fn statistics_null_counts_only_nulls() {
3071 let values = Arc::new(UInt64Array::from(vec![None, None]));
3073 let files = one_column_roundtrip(values, true);
3074
3075 for file in files {
3076 let reader = SerializedFileReader::new(file).unwrap();
3078 let metadata = reader.metadata();
3079 assert_eq!(metadata.num_row_groups(), 1);
3080 let row_group = metadata.row_group(0);
3081 assert_eq!(row_group.num_columns(), 1);
3082 let column = row_group.column(0);
3083 let stats = column.statistics().unwrap();
3084 assert_eq!(stats.null_count_opt(), Some(2));
3085 }
3086 }
3087
    #[test]
    fn test_list_of_struct_roundtrip() {
        // Builds a List<Struct<a: i32, b: i32>> covering every nesting case:
        // a populated list, an empty list, a null list, a list of null structs,
        // and structs with one null field each.
        let int_field = Field::new("a", DataType::Int32, true);
        let int_field2 = Field::new("b", DataType::Int32, true);

        let int_builder = Int32Builder::with_capacity(10);
        let int_builder2 = Int32Builder::with_capacity(10);

        let struct_builder = StructBuilder::new(
            vec![int_field, int_field2],
            vec![Box::new(int_builder), Box::new(int_builder2)],
        );
        let mut list_builder = ListBuilder::new(struct_builder);

        // Row 0: [{a: 1, b: 2}]
        let values = list_builder.values();
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_value(1);
        values
            .field_builder::<Int32Builder>(1)
            .unwrap()
            .append_value(2);
        values.append(true);
        list_builder.append(true);

        // Row 1: [] (valid but empty)
        list_builder.append(true);

        // Row 2: null
        list_builder.append(false);

        // Row 3: [null, null] (two null structs)
        let values = list_builder.values();
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_null();
        values
            .field_builder::<Int32Builder>(1)
            .unwrap()
            .append_null();
        values.append(false);
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_null();
        values
            .field_builder::<Int32Builder>(1)
            .unwrap()
            .append_null();
        values.append(false);
        list_builder.append(true);

        // Row 4: [{a: null, b: 3}]
        let values = list_builder.values();
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_null();
        values
            .field_builder::<Int32Builder>(1)
            .unwrap()
            .append_value(3);
        values.append(true);
        list_builder.append(true);

        // Row 5: [{a: 2, b: null}]
        let values = list_builder.values();
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_value(2);
        values
            .field_builder::<Int32Builder>(1)
            .unwrap()
            .append_null();
        values.append(true);
        list_builder.append(true);

        let array = Arc::new(list_builder.finish());

        one_column_roundtrip(array, true);
    }
3177
3178 fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
3179 metadata.row_groups().iter().map(|x| x.num_rows()).collect()
3180 }
3181
    #[test]
    fn test_aggregates_records() {
        // Writes batches of 100, 50 and 300 rows with a 200-row group limit:
        // the writer must aggregate/split them into row groups [200, 200, 50],
        // and reading back must preserve the concatenated values exactly.
        let arrays = [
            Int32Array::from((0..100).collect::<Vec<_>>()),
            Int32Array::from((0..50).collect::<Vec<_>>()),
            Int32Array::from((200..500).collect::<Vec<_>>()),
        ];

        let schema = Arc::new(Schema::new(vec![Field::new(
            "int",
            ArrowDataType::Int32,
            false,
        )]));

        let file = tempfile::tempfile().unwrap();

        let props = WriterProperties::builder()
            .set_max_row_group_size(200)
            .build();

        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();

        for array in arrays {
            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
            writer.write(&batch).unwrap();
        }

        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);

        // 450 rows read back in batches of 100 -> 4 full batches + 1 of 50.
        let batches = builder
            .with_batch_size(100)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(batches.len(), 5);
        assert!(batches.iter().all(|x| x.num_columns() == 1));

        let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();

        assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);

        let values: Vec<_> = batches
            .iter()
            .flat_map(|x| {
                x.column(0)
                    .as_any()
                    .downcast_ref::<Int32Array>()
                    .unwrap()
                    .values()
                    .iter()
                    .cloned()
            })
            .collect();

        let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
        assert_eq!(&values, &expected_values)
    }
3245
    #[test]
    fn complex_aggregate() {
        // Deeply nested schema: struct_b { list: List<struct_a { leaf_a, leaf_b }> }.
        // Two batches (5 + 2 rows) with a 6-row group limit must split across a
        // batch boundary into row groups [6, 1] and still read back identically.
        let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
        let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
        let struct_a = Arc::new(Field::new(
            "struct_a",
            DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
            true,
        ));

        let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
        let struct_b = Arc::new(Field::new(
            "struct_b",
            DataType::Struct(vec![list_a.clone()].into()),
            false,
        ));

        let schema = Arc::new(Schema::new(vec![struct_b]));

        // First batch: 5 list slots over 6 structs, with null list slots 1 and 3.
        let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
        let field_b_array =
            Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);

        let struct_a_array = StructArray::from(vec![
            (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
            (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
        ]);

        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
            .len(5)
            .add_buffer(Buffer::from_iter(vec![
                0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
            ]))
            .null_bit_buffer(Some(Buffer::from_iter(vec![
                true, false, true, false, true,
            ])))
            .child_data(vec![struct_a_array.into_data()])
            .build()
            .unwrap();

        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
        let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);

        let batch1 =
            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
                .unwrap();

        // Second batch: 2 list slots over 5 structs, no null lists.
        let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
        let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);

        let struct_a_array = StructArray::from(vec![
            (field_a, Arc::new(field_a_array) as ArrayRef),
            (field_b, Arc::new(field_b_array) as ArrayRef),
        ]);

        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
            .len(2)
            .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
            .child_data(vec![struct_a_array.into_data()])
            .build()
            .unwrap();

        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
        let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);

        let batch2 =
            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
                .unwrap();

        let batches = &[batch1, batch2];

        // Expected pretty-printed rendering of the 7 logical rows.
        let expected = r#"
        +-------------------------------------------------------------------------------------------------------+
        | struct_b                                                                                              |
        +-------------------------------------------------------------------------------------------------------+
        | {list: [{leaf_a: 1, leaf_b: 1}]}                                                                      |
        | {list: }                                                                                              |
        | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}                                               |
        | {list: }                                                                                              |
        | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}                                                |
        | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
        | {list: [{leaf_a: 10, leaf_b: }]}                                                                      |
        +-------------------------------------------------------------------------------------------------------+
        "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");

        let actual = pretty_format_batches(batches).unwrap().to_string();
        assert_eq!(actual, expected);

        // Write both batches with a 6-row group limit.
        let file = tempfile::tempfile().unwrap();
        let props = WriterProperties::builder()
            .set_max_row_group_size(6)
            .build();

        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();

        for batch in batches {
            writer.write(batch).unwrap();
        }
        writer.close().unwrap();

        // 5 + 2 rows under a 6-row limit must land as row groups [6, 1].
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);

        let batches = builder
            .with_batch_size(2)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(batches.len(), 4);
        let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
        assert_eq!(&batch_counts, &[2, 2, 2, 1]);

        let actual = pretty_format_batches(&batches).unwrap().to_string();
        assert_eq!(actual, expected);
    }
3373
3374 #[test]
3375 fn test_arrow_writer_metadata() {
3376 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3377 let file_schema = batch_schema.clone().with_metadata(
3378 vec![("foo".to_string(), "bar".to_string())]
3379 .into_iter()
3380 .collect(),
3381 );
3382
3383 let batch = RecordBatch::try_new(
3384 Arc::new(batch_schema),
3385 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3386 )
3387 .unwrap();
3388
3389 let mut buf = Vec::with_capacity(1024);
3390 let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
3391 writer.write(&batch).unwrap();
3392 writer.close().unwrap();
3393 }
3394
    #[test]
    fn test_arrow_writer_nullable() {
        // A non-nullable batch may be written through a nullable file schema;
        // reading back yields the (nullable) file schema with identical data.
        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
        let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
        let file_schema = Arc::new(file_schema);

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
        let back = read.next().unwrap().unwrap();
        assert_eq!(back.schema(), file_schema);
        assert_ne!(back.schema(), batch.schema());
        assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
    }
3418
3419 #[test]
3420 fn in_progress_accounting() {
3421 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3423
3424 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3426
3427 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
3429
3430 let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();
3431
3432 assert_eq!(writer.in_progress_size(), 0);
3434 assert_eq!(writer.in_progress_rows(), 0);
3435 assert_eq!(writer.memory_size(), 0);
3436 assert_eq!(writer.bytes_written(), 4); writer.write(&batch).unwrap();
3438
3439 let initial_size = writer.in_progress_size();
3441 assert!(initial_size > 0);
3442 assert_eq!(writer.in_progress_rows(), 5);
3443 let initial_memory = writer.memory_size();
3444 assert!(initial_memory > 0);
3445 assert!(
3447 initial_size <= initial_memory,
3448 "{initial_size} <= {initial_memory}"
3449 );
3450
3451 writer.write(&batch).unwrap();
3453 assert!(writer.in_progress_size() > initial_size);
3454 assert_eq!(writer.in_progress_rows(), 10);
3455 assert!(writer.memory_size() > initial_memory);
3456 assert!(
3457 writer.in_progress_size() <= writer.memory_size(),
3458 "in_progress_size {} <= memory_size {}",
3459 writer.in_progress_size(),
3460 writer.memory_size()
3461 );
3462
3463 let pre_flush_bytes_written = writer.bytes_written();
3465 writer.flush().unwrap();
3466 assert_eq!(writer.in_progress_size(), 0);
3467 assert_eq!(writer.memory_size(), 0);
3468 assert!(writer.bytes_written() > pre_flush_bytes_written);
3469
3470 writer.close().unwrap();
3471 }
3472
3473 #[test]
3474 fn test_writer_all_null() {
3475 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
3476 let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
3477 let batch = RecordBatch::try_from_iter(vec![
3478 ("a", Arc::new(a) as ArrayRef),
3479 ("b", Arc::new(b) as ArrayRef),
3480 ])
3481 .unwrap();
3482
3483 let mut buf = Vec::with_capacity(1024);
3484 let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
3485 writer.write(&batch).unwrap();
3486 writer.close().unwrap();
3487
3488 let bytes = Bytes::from(buf);
3489 let options = ReadOptionsBuilder::new().with_page_index().build();
3490 let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
3491 let index = reader.metadata().offset_index().unwrap();
3492
3493 assert_eq!(index.len(), 1);
3494 assert_eq!(index[0].len(), 2); assert_eq!(index[0][0].page_locations().len(), 1); assert_eq!(index[0][1].page_locations().len(), 1); }
3498
3499 #[test]
3500 fn test_disabled_statistics_with_page() {
3501 let file_schema = Schema::new(vec![
3502 Field::new("a", DataType::Utf8, true),
3503 Field::new("b", DataType::Utf8, true),
3504 ]);
3505 let file_schema = Arc::new(file_schema);
3506
3507 let batch = RecordBatch::try_new(
3508 file_schema.clone(),
3509 vec![
3510 Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3511 Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3512 ],
3513 )
3514 .unwrap();
3515
3516 let props = WriterProperties::builder()
3517 .set_statistics_enabled(EnabledStatistics::None)
3518 .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
3519 .build();
3520
3521 let mut buf = Vec::with_capacity(1024);
3522 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3523 writer.write(&batch).unwrap();
3524
3525 let metadata = writer.close().unwrap();
3526 assert_eq!(metadata.row_groups.len(), 1);
3527 let row_group = &metadata.row_groups[0];
3528 assert_eq!(row_group.columns.len(), 2);
3529 assert!(row_group.columns[0].offset_index_offset.is_some());
3531 assert!(row_group.columns[0].column_index_offset.is_some());
3532 assert!(row_group.columns[1].offset_index_offset.is_some());
3534 assert!(row_group.columns[1].column_index_offset.is_none());
3535
3536 let options = ReadOptionsBuilder::new().with_page_index().build();
3537 let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3538
3539 let row_group = reader.get_row_group(0).unwrap();
3540 let a_col = row_group.metadata().column(0);
3541 let b_col = row_group.metadata().column(1);
3542
3543 if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3545 let min = byte_array_stats.min_opt().unwrap();
3546 let max = byte_array_stats.max_opt().unwrap();
3547
3548 assert_eq!(min.as_bytes(), b"a");
3549 assert_eq!(max.as_bytes(), b"d");
3550 } else {
3551 panic!("expecting Statistics::ByteArray");
3552 }
3553
3554 assert!(b_col.statistics().is_none());
3556
3557 let offset_index = reader.metadata().offset_index().unwrap();
3558 assert_eq!(offset_index.len(), 1); assert_eq!(offset_index[0].len(), 2); let column_index = reader.metadata().column_index().unwrap();
3562 assert_eq!(column_index.len(), 1); assert_eq!(column_index[0].len(), 2); let a_idx = &column_index[0][0];
3566 assert!(matches!(a_idx, Index::BYTE_ARRAY(_)), "{a_idx:?}");
3567 let b_idx = &column_index[0][1];
3568 assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3569 }
3570
3571 #[test]
3572 fn test_disabled_statistics_with_chunk() {
3573 let file_schema = Schema::new(vec![
3574 Field::new("a", DataType::Utf8, true),
3575 Field::new("b", DataType::Utf8, true),
3576 ]);
3577 let file_schema = Arc::new(file_schema);
3578
3579 let batch = RecordBatch::try_new(
3580 file_schema.clone(),
3581 vec![
3582 Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
3583 Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
3584 ],
3585 )
3586 .unwrap();
3587
3588 let props = WriterProperties::builder()
3589 .set_statistics_enabled(EnabledStatistics::None)
3590 .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
3591 .build();
3592
3593 let mut buf = Vec::with_capacity(1024);
3594 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
3595 writer.write(&batch).unwrap();
3596
3597 let metadata = writer.close().unwrap();
3598 assert_eq!(metadata.row_groups.len(), 1);
3599 let row_group = &metadata.row_groups[0];
3600 assert_eq!(row_group.columns.len(), 2);
3601 assert!(row_group.columns[0].offset_index_offset.is_some());
3603 assert!(row_group.columns[0].column_index_offset.is_none());
3604 assert!(row_group.columns[1].offset_index_offset.is_some());
3606 assert!(row_group.columns[1].column_index_offset.is_none());
3607
3608 let options = ReadOptionsBuilder::new().with_page_index().build();
3609 let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
3610
3611 let row_group = reader.get_row_group(0).unwrap();
3612 let a_col = row_group.metadata().column(0);
3613 let b_col = row_group.metadata().column(1);
3614
3615 if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
3617 let min = byte_array_stats.min_opt().unwrap();
3618 let max = byte_array_stats.max_opt().unwrap();
3619
3620 assert_eq!(min.as_bytes(), b"a");
3621 assert_eq!(max.as_bytes(), b"d");
3622 } else {
3623 panic!("expecting Statistics::ByteArray");
3624 }
3625
3626 assert!(b_col.statistics().is_none());
3628
3629 let column_index = reader.metadata().column_index().unwrap();
3630 assert_eq!(column_index.len(), 1); assert_eq!(column_index[0].len(), 2); let a_idx = &column_index[0][0];
3634 assert!(matches!(a_idx, Index::NONE), "{a_idx:?}");
3635 let b_idx = &column_index[0][1];
3636 assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
3637 }
3638
3639 #[test]
3640 fn test_arrow_writer_skip_metadata() {
3641 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
3642 let file_schema = Arc::new(batch_schema.clone());
3643
3644 let batch = RecordBatch::try_new(
3645 Arc::new(batch_schema),
3646 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3647 )
3648 .unwrap();
3649 let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
3650
3651 let mut buf = Vec::with_capacity(1024);
3652 let mut writer =
3653 ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
3654 writer.write(&batch).unwrap();
3655 writer.close().unwrap();
3656
3657 let bytes = Bytes::from(buf);
3658 let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
3659 assert_eq!(file_schema, *reader_builder.schema());
3660 if let Some(key_value_metadata) = reader_builder
3661 .metadata()
3662 .file_metadata()
3663 .key_value_metadata()
3664 {
3665 assert!(!key_value_metadata
3666 .iter()
3667 .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY));
3668 }
3669 }
3670
3671 #[test]
3672 fn mismatched_schemas() {
3673 let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
3674 let file_schema = Arc::new(Schema::new(vec![Field::new(
3675 "temperature",
3676 DataType::Float64,
3677 false,
3678 )]));
3679
3680 let batch = RecordBatch::try_new(
3681 Arc::new(batch_schema),
3682 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
3683 )
3684 .unwrap();
3685
3686 let mut buf = Vec::with_capacity(1024);
3687 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
3688
3689 let err = writer.write(&batch).unwrap_err().to_string();
3690 assert_eq!(
3691 err,
3692 "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
3693 );
3694 }
3695
3696 #[test]
3697 fn test_roundtrip_empty_schema() {
3699 let empty_batch = RecordBatch::try_new_with_options(
3701 Arc::new(Schema::empty()),
3702 vec![],
3703 &RecordBatchOptions::default().with_row_count(Some(0)),
3704 )
3705 .unwrap();
3706
3707 let mut parquet_bytes: Vec<u8> = Vec::new();
3709 let mut writer =
3710 ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
3711 writer.write(&empty_batch).unwrap();
3712 writer.close().unwrap();
3713
3714 let bytes = Bytes::from(parquet_bytes);
3716 let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
3717 assert_eq!(reader.schema(), &empty_batch.schema());
3718 let batches: Vec<_> = reader
3719 .build()
3720 .unwrap()
3721 .collect::<ArrowResult<Vec<_>>>()
3722 .unwrap();
3723 assert_eq!(batches.len(), 0);
3724 }
3725}