use crate::column::chunker::ContentDefinedChunker;

use bytes::Bytes;
use std::io::{Read, Write};
use std::iter::Peekable;
use std::slice::Iter;
use std::sync::{Arc, Mutex};
use std::vec::IntoIter;

use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::{ArrayRef, Int32Array, RecordBatch, RecordBatchWriter};
use arrow_schema::{
    ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef, TimeUnit,
};

use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};

use crate::arrow::ArrowSchemaConverter;
use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
use crate::column::page_encryption::PageEncryptor;
use crate::column::writer::encoder::ColumnValueEncoder;
use crate::column::writer::{
    ColumnCloseResult, ColumnWriter, GenericColumnWriter, get_column_writer,
};
use crate::data_type::{ByteArray, FixedLenByteArray};
#[cfg(feature = "encryption")]
use crate::encryption::encrypt::FileEncryptor;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
use levels::{ArrayLevels, calculate_array_levels};

mod byte_array;
mod levels;

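/// Encodes [`RecordBatch`]es into a Parquet file, buffering rows in memory and
/// flushing them out as row groups.
///
/// A minimal usage sketch (illustrative; error handling via `unwrap` for brevity):
///
/// ```no_run
/// # use std::sync::Arc;
/// # use arrow_array::{Int32Array, RecordBatch};
/// # use parquet::arrow::arrow_writer::ArrowWriter;
/// let batch = RecordBatch::try_from_iter([(
///     "a",
///     Arc::new(Int32Array::from(vec![1, 2, 3])) as _,
/// )])
/// .unwrap();
/// let mut buffer = Vec::new();
/// let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
/// writer.write(&batch).unwrap();
/// writer.close().unwrap();
/// ```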
pub struct ArrowWriter<W: Write> {
    /// Underlying Parquet writer
    writer: SerializedFileWriter<W>,

    /// The in-progress row group, if any
    in_progress: Option<ArrowRowGroupWriter>,

    /// A copy of the Arrow schema, used to compute the leaves of written batches
    arrow_schema: SchemaRef,

    /// Creates new [`ArrowRowGroupWriter`] instances as required
    row_group_writer_factory: ArrowRowGroupWriterFactory,

    /// The maximum number of rows to buffer in each row group
    max_row_group_row_count: Option<usize>,

    /// The maximum number of bytes to buffer in each row group
    max_row_group_bytes: Option<usize>,

    /// Content-defined chunkers, one per leaf column, if enabled
    cdc_chunkers: Option<Vec<ContentDefinedChunker>>,
}

impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let buffered_memory = self.in_progress_size();
        f.debug_struct("ArrowWriter")
            .field("writer", &self.writer)
            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
            .field("in_progress_rows", &self.in_progress_rows())
            .field("arrow_schema", &self.arrow_schema)
            .field("max_row_group_row_count", &self.max_row_group_row_count)
            .field("max_row_group_bytes", &self.max_row_group_bytes)
            .finish()
    }
}

impl<W: Write + Send> ArrowWriter<W> {
    /// Try to create a new Arrow writer with default [`ArrowWriterOptions`]
    pub fn try_new(
        writer: W,
        arrow_schema: SchemaRef,
        props: Option<WriterProperties>,
    ) -> Result<Self> {
        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
        Self::try_new_with_options(writer, arrow_schema, options)
    }

    /// Try to create a new Arrow writer with the provided [`ArrowWriterOptions`]
    pub fn try_new_with_options(
        writer: W,
        arrow_schema: SchemaRef,
        options: ArrowWriterOptions,
    ) -> Result<Self> {
        let mut props = options.properties;

        let schema = if let Some(parquet_schema) = options.schema_descr {
            parquet_schema.clone()
        } else {
            let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
            if let Some(schema_root) = &options.schema_root {
                converter = converter.schema_root(schema_root);
            }

            converter.convert(&arrow_schema)?
        };

        if !options.skip_arrow_metadata {
            // Store the encoded Arrow schema in the file metadata so readers can recover it
            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
        }

        let max_row_group_row_count = props.max_row_group_row_count();
        let max_row_group_bytes = props.max_row_group_bytes();

        let props_ptr = Arc::new(props);
        let file_writer =
            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;

        let row_group_writer_factory =
            ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());

        let cdc_chunkers = props_ptr
            .content_defined_chunking()
            .map(|opts| {
                file_writer
                    .schema_descr()
                    .columns()
                    .iter()
                    .map(|desc| ContentDefinedChunker::new(desc, opts))
                    .collect::<Result<Vec<_>>>()
            })
            .transpose()?;

        Ok(Self {
            writer: file_writer,
            in_progress: None,
            arrow_schema,
            row_group_writer_factory,
            max_row_group_row_count,
            max_row_group_bytes,
            cdc_chunkers,
        })
    }

    /// Returns metadata for any flushed row groups
    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        self.writer.flushed_row_groups()
    }

    /// Returns the estimated total memory usage of the in-progress row group
    pub fn memory_size(&self) -> usize {
        match &self.in_progress {
            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
            None => 0,
        }
    }

    /// Returns the estimated encoded size of the in-progress row group
    pub fn in_progress_size(&self) -> usize {
        match &self.in_progress {
            Some(in_progress) => in_progress
                .writers
                .iter()
                .map(|x| x.get_estimated_total_bytes())
                .sum(),
            None => 0,
        }
    }

    /// Returns the number of rows buffered in the in-progress row group
    pub fn in_progress_rows(&self) -> usize {
        self.in_progress
            .as_ref()
            .map(|x| x.buffered_rows)
            .unwrap_or_default()
    }

    /// Returns the number of bytes written to the file so far
    pub fn bytes_written(&self) -> usize {
        self.writer.bytes_written()
    }

    /// Writes a [`RecordBatch`] to the in-progress row group, creating one if
    /// necessary.
    ///
    /// If configured row count or byte limits would be exceeded, the batch is
    /// split and the row group is flushed.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        let in_progress = match &mut self.in_progress {
            Some(in_progress) => in_progress,
            x => x.insert(
                self.row_group_writer_factory
                    .create_row_group_writer(self.writer.flushed_row_groups().len())?,
            ),
        };

        // If adding this batch would exceed the row limit, split it
        if let Some(max_rows) = self.max_row_group_row_count {
            if in_progress.buffered_rows + batch.num_rows() > max_rows {
                let to_write = max_rows - in_progress.buffered_rows;
                let a = batch.slice(0, to_write);
                let b = batch.slice(to_write, batch.num_rows() - to_write);
                self.write(&a)?;
                return self.write(&b);
            }
        }

        // If a byte limit is configured, estimate how many rows will fit from
        // the average encoded size of the rows already buffered
        if let Some(max_bytes) = self.max_row_group_bytes {
            if in_progress.buffered_rows > 0 {
                let current_bytes = in_progress.get_estimated_total_bytes();

                if current_bytes >= max_bytes {
                    self.flush()?;
                    return self.write(batch);
                }

                let avg_row_bytes = current_bytes / in_progress.buffered_rows;
                if avg_row_bytes > 0 {
                    let remaining_bytes = max_bytes - current_bytes;
                    let rows_that_fit = remaining_bytes / avg_row_bytes;

                    if batch.num_rows() > rows_that_fit {
                        if rows_that_fit > 0 {
                            let a = batch.slice(0, rows_that_fit);
                            let b = batch.slice(rows_that_fit, batch.num_rows() - rows_that_fit);
                            self.write(&a)?;
                            return self.write(&b);
                        } else {
                            self.flush()?;
                            return self.write(batch);
                        }
                    }
                }
            }
        }

        match self.cdc_chunkers.as_mut() {
            Some(chunkers) => in_progress.write_with_chunkers(batch, chunkers)?,
            None => in_progress.write(batch)?,
        }

        let should_flush = self
            .max_row_group_row_count
            .is_some_and(|max| in_progress.buffered_rows >= max)
            || self
                .max_row_group_bytes
                .is_some_and(|max| in_progress.get_estimated_total_bytes() >= max);

        if should_flush {
            self.flush()?
        }
        Ok(())
    }

    /// Writes the given bytes directly to the underlying writer, bypassing the
    /// Parquet encoder
    pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
        self.writer.write_all(buf)
    }

    /// Flushes the underlying writer
    pub fn sync(&mut self) -> std::io::Result<()> {
        self.writer.flush()
    }
    pub fn flush(&mut self) -> Result<()> {
        let in_progress = match self.in_progress.take() {
            Some(in_progress) => in_progress,
            None => return Ok(()),
        };

        let mut row_group_writer = self.writer.next_row_group()?;
        for chunk in in_progress.close()? {
            chunk.append_to_row_group(&mut row_group_writer)?;
        }
        row_group_writer.close()?;
        Ok(())
    }

    /// Appends additional [`KeyValue`] metadata to be written to the file footer
    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
        self.writer.append_key_value_metadata(kv_metadata)
    }

    /// Returns a reference to the underlying writer
    pub fn inner(&self) -> &W {
        self.writer.inner()
    }

    /// Returns a mutable reference to the underlying writer
    pub fn inner_mut(&mut self) -> &mut W {
        self.writer.inner_mut()
    }

    /// Flushes any outstanding data and returns the underlying writer
    pub fn into_inner(mut self) -> Result<W> {
        self.flush()?;
        self.writer.into_inner()
    }

    /// Flushes any outstanding data, writes the file footer, and returns the
    /// file metadata
    pub fn finish(&mut self) -> Result<ParquetMetaData> {
        self.flush()?;
        self.writer.finish()
    }

    /// Closes the writer, flushing any outstanding data, and returns the file
    /// metadata
    pub fn close(mut self) -> Result<ParquetMetaData> {
        self.finish()
    }

    /// Creates the [`ArrowColumnWriter`]s for a new row group, flushing any
    /// in-progress row group first
    #[deprecated(
        since = "56.2.0",
        note = "Use `ArrowRowGroupWriterFactory` instead, see `ArrowColumnWriter` for an example"
    )]
    pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
        self.flush()?;
        let in_progress = self
            .row_group_writer_factory
            .create_row_group_writer(self.writer.flushed_row_groups().len())?;
        Ok(in_progress.writers)
    }

    /// Appends the completed [`ArrowColumnChunk`]s to the file as a new row group
    #[deprecated(
        since = "56.2.0",
        note = "Use `SerializedFileWriter` directly instead, see `ArrowColumnWriter` for an example"
    )]
    pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
        let mut row_group_writer = self.writer.next_row_group()?;
        for chunk in chunks {
            chunk.append_to_row_group(&mut row_group_writer)?;
        }
        row_group_writer.close()?;
        Ok(())
    }

    /// Flushes any in-progress data and returns the underlying
    /// [`SerializedFileWriter`] together with the [`ArrowRowGroupWriterFactory`]
    pub fn into_serialized_writer(
        mut self,
    ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
        self.flush()?;
        Ok((self.writer, self.row_group_writer_factory))
    }
}

impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch).map_err(|e| e.into())
    }

    fn close(self) -> std::result::Result<(), ArrowError> {
        self.close()?;
        Ok(())
    }
}

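/// Arrow-specific configuration settings for writing Parquet files.
///
/// A hedged sketch of wiring options into a writer (see the builder methods below):
///
/// ```no_run
/// # use parquet::arrow::arrow_writer::{ArrowWriter, ArrowWriterOptions};
/// # use parquet::file::properties::WriterProperties;
/// # fn example(sink: Vec<u8>, schema: arrow_schema::SchemaRef) {
/// let options = ArrowWriterOptions::new()
///     .with_properties(WriterProperties::default())
///     .with_skip_arrow_metadata(true);
/// let writer = ArrowWriter::try_new_with_options(sink, schema, options).unwrap();
/// # }
/// ```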
#[derive(Debug, Clone, Default)]
pub struct ArrowWriterOptions {
    properties: WriterProperties,
    skip_arrow_metadata: bool,
    schema_root: Option<String>,
    schema_descr: Option<SchemaDescriptor>,
}

impl ArrowWriterOptions {
    /// Creates a new [`ArrowWriterOptions`] with the default settings
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the [`WriterProperties`] for writing Parquet files
    pub fn with_properties(self, properties: WriterProperties) -> Self {
        Self { properties, ..self }
    }

    /// Skip encoding the embedded Arrow metadata (defaults to `false`)
    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

    /// Overrides the name of the root Parquet schema element
    pub fn with_schema_root(self, schema_root: String) -> Self {
        Self {
            schema_root: Some(schema_root),
            ..self
        }
    }

    /// Provides an explicit Parquet [`SchemaDescriptor`] to use rather than
    /// converting the Arrow schema
    pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
        Self {
            schema_descr: Some(schema_descr),
            ..self
        }
    }
}

/// The in-memory representation of a written column chunk: its total length
/// and the buffers (page header then page data) in write order
#[derive(Default)]
struct ArrowColumnChunkData {
    length: usize,
    data: Vec<Bytes>,
}

impl Length for ArrowColumnChunkData {
    fn len(&self) -> u64 {
        self.length as _
    }
}

impl ChunkReader for ArrowColumnChunkData {
    type T = ArrowColumnChunkReader;

    fn get_read(&self, start: u64) -> Result<Self::T> {
        // The chunk is always read back from the beginning in one shot
        assert_eq!(start, 0);
        Ok(ArrowColumnChunkReader(
            self.data.clone().into_iter().peekable(),
        ))
    }

    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
        unimplemented!()
    }
}

/// A [`Read`] over an iterator of [`Bytes`]
struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);

impl Read for ArrowColumnChunkReader {
    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
        let buffer = loop {
            match self.0.peek_mut() {
                Some(b) if b.is_empty() => {
                    self.0.next();
                    continue;
                }
                Some(b) => break b,
                None => return Ok(0),
            }
        };

        let len = buffer.len().min(out.len());
        let b = buffer.split_to(len);
        out[..len].copy_from_slice(&b);
        Ok(len)
    }
}

/// A shared [`ArrowColumnChunkData`], owned by an [`ArrowPageWriter`] whilst
/// remaining accessible from the [`ArrowColumnWriter`] on close
type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;

#[derive(Default)]
struct ArrowPageWriter {
    buffer: SharedColumnChunk,
    #[cfg(feature = "encryption")]
    page_encryptor: Option<PageEncryptor>,
}

impl ArrowPageWriter {
    #[cfg(feature = "encryption")]
    pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
        self.page_encryptor = page_encryptor;
        self
    }

    #[cfg(feature = "encryption")]
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        self.page_encryptor.as_mut()
    }

    #[cfg(not(feature = "encryption"))]
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        None
    }
}

impl PageWriter for ArrowPageWriter {
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
        let page = match self.page_encryptor_mut() {
            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
            None => page,
        };

        let page_header = page.to_thrift_header()?;
        let header = {
            let mut header = Vec::with_capacity(1024);

            match self.page_encryptor_mut() {
                Some(page_encryptor) => {
                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
                    if page.compressed_page().is_data_page() {
                        page_encryptor.increment_page();
                    }
                }
                None => {
                    let mut protocol = ThriftCompactOutputProtocol::new(&mut header);
                    page_header.write_thrift(&mut protocol)?;
                }
            };

            Bytes::from(header)
        };

        let mut buf = self.buffer.try_lock().unwrap();

        let data = page.compressed_page().buffer().clone();
        let compressed_size = data.len() + header.len();

        let mut spec = PageWriteSpec::new();
        spec.page_type = page.page_type();
        spec.num_values = page.num_values();
        spec.uncompressed_size = page.uncompressed_size() + header.len();
        spec.offset = buf.length as u64;
        spec.compressed_size = compressed_size;
        spec.bytes_written = compressed_size as u64;

        buf.length += compressed_size;
        buf.data.push(header);
        buf.data.push(data);

        Ok(spec)
    }

    fn close(&mut self) -> Result<()> {
        Ok(())
    }
}

/// A leaf column to be encoded by an [`ArrowColumnWriter`]
#[derive(Debug)]
pub struct ArrowLeafColumn(ArrayLevels);

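/// Computes the [`ArrowLeafColumn`]s for a potentially nested [`ArrayRef`].
///
/// A nested array yields one leaf per Parquet leaf column; a hedged sketch:
///
/// ```ignore
/// // A flat Int32 column produces exactly one leaf
/// let leaves = compute_leaves(schema.field(0), batch.column(0))?;
/// assert_eq!(leaves.len(), 1);
/// ```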
pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
    let levels = calculate_array_levels(array, field)?;
    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
}

/// The data for a single column chunk, see [`ArrowColumnWriter`]
pub struct ArrowColumnChunk {
    data: ArrowColumnChunkData,
    close: ColumnCloseResult,
}

impl std::fmt::Debug for ArrowColumnChunk {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowColumnChunk")
            .field("length", &self.data.length)
            .finish_non_exhaustive()
    }
}

impl ArrowColumnChunk {
    /// Calls [`SerializedRowGroupWriter::append_column`] with this chunk's data
    pub fn append_to_row_group<W: Write + Send>(
        self,
        writer: &mut SerializedRowGroupWriter<'_, W>,
    ) -> Result<()> {
        writer.append_column(&self.data, self.close)
    }
}

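/// Encodes [`ArrowLeafColumn`]s to an [`ArrowColumnChunk`].
///
/// A single-threaded sketch of the lower-level API for a flat schema (one leaf
/// per field); the column writers could instead be sent to worker threads to
/// encode columns in parallel:
///
/// ```ignore
/// // Obtain the underlying SerializedFileWriter and row group factory
/// let (mut file_writer, factory) = writer.into_serialized_writer()?;
/// let mut col_writers = factory.create_column_writers(0)?;
///
/// // Feed each leaf of each array to its column writer
/// for (writer, (field, array)) in col_writers
///     .iter_mut()
///     .zip(schema.fields().iter().zip(batch.columns()))
/// {
///     for leaf in compute_leaves(field.as_ref(), array)? {
///         writer.write(&leaf)?;
///     }
/// }
///
/// // Close the writers and append the chunks as a new row group
/// let mut row_group = file_writer.next_row_group()?;
/// for w in col_writers {
///     w.close()?.append_to_row_group(&mut row_group)?;
/// }
/// row_group.close()?;
/// ```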
pub struct ArrowColumnWriter {
    writer: ArrowColumnWriterImpl,
    chunk: SharedColumnChunk,
}

impl std::fmt::Debug for ArrowColumnWriter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
    }
}

enum ArrowColumnWriterImpl {
    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
    Column(ColumnWriter<'static>),
}

impl ArrowColumnWriter {
    /// Writes an [`ArrowLeafColumn`]
    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
        self.write_internal(&col.0)
    }

    /// Writes an [`ArrowLeafColumn`], starting a new data page at each
    /// content-defined chunk boundary
    fn write_with_chunker(
        &mut self,
        col: &ArrowLeafColumn,
        chunker: &mut ContentDefinedChunker,
    ) -> Result<()> {
        let levels = &col.0;
        let chunks =
            chunker.get_arrow_chunks(levels.def_levels(), levels.rep_levels(), levels.array())?;

        let num_chunks = chunks.len();
        for (i, chunk) in chunks.iter().enumerate() {
            let chunk_levels = levels.slice_for_chunk(chunk);
            self.write_internal(&chunk_levels)?;

            // Flush a page boundary after every chunk except the last
            if i + 1 < num_chunks {
                match &mut self.writer {
                    ArrowColumnWriterImpl::Column(c) => c.add_data_page()?,
                    ArrowColumnWriterImpl::ByteArray(c) => c.add_data_page()?,
                }
            }
        }
        Ok(())
    }

    fn write_internal(&mut self, levels: &ArrayLevels) -> Result<()> {
        match &mut self.writer {
            ArrowColumnWriterImpl::Column(c) => {
                let leaf = levels.array();
                match leaf.as_any_dictionary_opt() {
                    Some(dictionary) => {
                        // Materialize the dictionary values before writing
                        let materialized =
                            arrow_select::take::take(dictionary.values(), dictionary.keys(), None)?;
                        write_leaf(c, &materialized, levels)?
                    }
                    None => write_leaf(c, leaf, levels)?,
                };
            }
            ArrowColumnWriterImpl::ByteArray(c) => {
                write_primitive(c, levels.array().as_ref(), levels)?;
            }
        }
        Ok(())
    }

    /// Closes this column, returning the written [`ArrowColumnChunk`]
    pub fn close(self) -> Result<ArrowColumnChunk> {
        let close = match self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
            ArrowColumnWriterImpl::Column(c) => c.close()?,
        };
        // Closing the writer dropped the page writer's reference, so this is
        // now the sole owner of the chunk data
        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
        let data = chunk.into_inner().unwrap();
        Ok(ArrowColumnChunk { data, close })
    }

    /// Returns the estimated total memory usage of this writer
    pub fn memory_size(&self) -> usize {
        match &self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
        }
    }

    /// Returns the estimated total encoded bytes for this column writer
    pub fn get_estimated_total_bytes(&self) -> usize {
        match &self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
        }
    }
}

/// Encodes [`RecordBatch`]es into an in-progress Parquet row group
#[derive(Debug)]
struct ArrowRowGroupWriter {
    writers: Vec<ArrowColumnWriter>,
    schema: SchemaRef,
    buffered_rows: usize,
}

impl ArrowRowGroupWriter {
    fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
        Self {
            writers,
            schema: arrow.clone(),
            buffered_rows: 0,
        }
    }

    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        self.buffered_rows += batch.num_rows();
        let mut writers = self.writers.iter_mut();
        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
            for leaf in compute_leaves(field.as_ref(), column)? {
                writers.next().unwrap().write(&leaf)?;
            }
        }
        Ok(())
    }

    fn write_with_chunkers(
        &mut self,
        batch: &RecordBatch,
        chunkers: &mut [ContentDefinedChunker],
    ) -> Result<()> {
        self.buffered_rows += batch.num_rows();
        let mut writers = self.writers.iter_mut();
        let mut chunkers = chunkers.iter_mut();
        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
            for leaf in compute_leaves(field.as_ref(), column)? {
                writers
                    .next()
                    .unwrap()
                    .write_with_chunker(&leaf, chunkers.next().unwrap())?;
            }
        }
        Ok(())
    }

    fn get_estimated_total_bytes(&self) -> usize {
        self.writers
            .iter()
            .map(|x| x.get_estimated_total_bytes())
            .sum()
    }

    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
        self.writers
            .into_iter()
            .map(|writer| writer.close())
            .collect()
    }
}

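/// Creates the [`ArrowColumnWriter`]s for each new row group in the file.
///
/// A hedged sketch of driving it directly (`file_writer` is a
/// [`SerializedFileWriter`]; see also [`ArrowColumnWriter`]):
///
/// ```ignore
/// let factory = ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema);
/// let writers = factory.create_column_writers(/* row_group_index */ 0)?;
/// ```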
#[derive(Debug)]
pub struct ArrowRowGroupWriterFactory {
    schema: SchemaDescPtr,
    arrow_schema: SchemaRef,
    props: WriterPropertiesPtr,
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}

impl ArrowRowGroupWriterFactory {
    /// Creates a new [`ArrowRowGroupWriterFactory`] from the given file writer
    pub fn new<W: Write + Send>(
        file_writer: &SerializedFileWriter<W>,
        arrow_schema: SchemaRef,
    ) -> Self {
        let schema = Arc::clone(file_writer.schema_descr_ptr());
        let props = Arc::clone(file_writer.properties());
        Self {
            schema,
            arrow_schema,
            props,
            #[cfg(feature = "encryption")]
            file_encryptor: file_writer.file_encryptor(),
        }
    }

    fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
        let writers = self.create_column_writers(row_group_index)?;
        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
    }

    /// Creates the [`ArrowColumnWriter`]s for the given row group index, one
    /// per leaf column
    pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
        let mut writers = Vec::with_capacity(self.arrow_schema.fields.len());
        let mut leaves = self.schema.columns().iter();
        let column_factory = self.column_writer_factory(row_group_index);
        for field in &self.arrow_schema.fields {
            column_factory.get_arrow_column_writer(
                field.data_type(),
                &self.props,
                &mut leaves,
                &mut writers,
            )?;
        }
        Ok(writers)
    }

    #[cfg(feature = "encryption")]
    fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
        ArrowColumnWriterFactory::new()
            .with_file_encryptor(row_group_idx, self.file_encryptor.clone())
    }

    #[cfg(not(feature = "encryption"))]
    fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory {
        ArrowColumnWriterFactory::new()
    }
}

/// Returns one [`ArrowColumnWriter`] per leaf column for the given schema
#[deprecated(since = "57.0.0", note = "Use `ArrowRowGroupWriterFactory` instead")]
pub fn get_column_writers(
    parquet: &SchemaDescriptor,
    props: &WriterPropertiesPtr,
    arrow: &SchemaRef,
) -> Result<Vec<ArrowColumnWriter>> {
    let mut writers = Vec::with_capacity(arrow.fields.len());
    let mut leaves = parquet.columns().iter();
    let column_factory = ArrowColumnWriterFactory::new();
    for field in &arrow.fields {
        column_factory.get_arrow_column_writer(
            field.data_type(),
            props,
            &mut leaves,
            &mut writers,
        )?;
    }
    Ok(writers)
}

/// Creates [`ArrowColumnWriter`] instances for the different Arrow data types
struct ArrowColumnWriterFactory {
    #[cfg(feature = "encryption")]
    row_group_index: usize,
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}

impl ArrowColumnWriterFactory {
    pub fn new() -> Self {
        Self {
            #[cfg(feature = "encryption")]
            row_group_index: 0,
            #[cfg(feature = "encryption")]
            file_encryptor: None,
        }
    }

    #[cfg(feature = "encryption")]
    pub fn with_file_encryptor(
        mut self,
        row_group_index: usize,
        file_encryptor: Option<Arc<FileEncryptor>>,
    ) -> Self {
        self.row_group_index = row_group_index;
        self.file_encryptor = file_encryptor;
        self
    }

    #[cfg(feature = "encryption")]
    fn create_page_writer(
        &self,
        column_descriptor: &ColumnDescPtr,
        column_index: usize,
    ) -> Result<Box<ArrowPageWriter>> {
        let column_path = column_descriptor.path().string();
        let page_encryptor = PageEncryptor::create_if_column_encrypted(
            &self.file_encryptor,
            self.row_group_index,
            column_index,
            &column_path,
        )?;
        Ok(Box::new(
            ArrowPageWriter::default().with_encryptor(page_encryptor),
        ))
    }

    #[cfg(not(feature = "encryption"))]
    fn create_page_writer(
        &self,
        _column_descriptor: &ColumnDescPtr,
        _column_index: usize,
    ) -> Result<Box<ArrowPageWriter>> {
        Ok(Box::<ArrowPageWriter>::default())
    }

    /// Gets the [`ArrowColumnWriter`]s for the given `data_type`, appending
    /// one writer per leaf column to `out`
    fn get_arrow_column_writer(
        &self,
        data_type: &ArrowDataType,
        props: &WriterPropertiesPtr,
        leaves: &mut Iter<'_, ColumnDescPtr>,
        out: &mut Vec<ArrowColumnWriter>,
    ) -> Result<()> {
        // Writer backed by the standard typed column writer
        let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
            let page_writer = self.create_page_writer(desc, out.len())?;
            let chunk = page_writer.buffer.clone();
            let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
            Ok(ArrowColumnWriter {
                chunk,
                writer: ArrowColumnWriterImpl::Column(writer),
            })
        };

        // Writer backed by the specialized byte array encoder
        let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
            let page_writer = self.create_page_writer(desc, out.len())?;
            let chunk = page_writer.buffer.clone();
            let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
            Ok(ArrowColumnWriter {
                chunk,
                writer: ArrowColumnWriterImpl::ByteArray(writer),
            })
        };

        match data_type {
            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
                out.push(col(leaves.next().unwrap())?)
            }
            ArrowDataType::LargeBinary
            | ArrowDataType::Binary
            | ArrowDataType::Utf8
            | ArrowDataType::LargeUtf8
            | ArrowDataType::BinaryView
            | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
            ArrowDataType::List(f)
            | ArrowDataType::LargeList(f)
            | ArrowDataType::FixedSizeList(f, _)
            | ArrowDataType::ListView(f)
            | ArrowDataType::LargeListView(f) => {
                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
            }
            ArrowDataType::Struct(fields) => {
                for field in fields {
                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
                }
            }
            ArrowDataType::Map(f, _) => match f.data_type() {
                ArrowDataType::Struct(f) => {
                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
                }
                _ => unreachable!("invalid map type"),
            },
            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                ArrowDataType::Utf8
                | ArrowDataType::LargeUtf8
                | ArrowDataType::Binary
                | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
                    out.push(bytes(leaves.next().unwrap())?)
                }
                ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
                _ => out.push(col(leaves.next().unwrap())?),
            },
            _ => {
                return Err(ParquetError::NYI(format!(
                    "Attempting to write an Arrow type {data_type} to parquet that is not yet implemented"
                )));
            }
        }
        Ok(())
    }
}

fn write_leaf(
    writer: &mut ColumnWriter<'_>,
    column: &dyn arrow_array::Array,
    levels: &ArrayLevels,
) -> Result<usize> {
    let indices = levels.non_null_indices();

    match writer {
        ColumnWriter::Int32ColumnWriter(typed) => {
            match column.data_type() {
                ArrowDataType::Null => {
                    let array = Int32Array::new_null(column.len());
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Int8 => {
                    let array: Int32Array = column.as_primitive::<Int8Type>().unary(|x| x as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Int16 => {
                    let array: Int32Array = column.as_primitive::<Int16Type>().unary(|x| x as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Int32 => {
                    write_primitive(typed, column.as_primitive::<Int32Type>().values(), levels)
                }
                ArrowDataType::UInt8 => {
                    let array: Int32Array = column.as_primitive::<UInt8Type>().unary(|x| x as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt16 => {
                    let array: Int32Array = column.as_primitive::<UInt16Type>().unary(|x| x as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt32 => {
                    // Reinterpret the u32 values as i32 without copying
                    let array = column.as_primitive::<UInt32Type>();
                    write_primitive(typed, array.values().inner().typed_data(), levels)
                }
                ArrowDataType::Date32 => {
                    let array = column.as_primitive::<Date32Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Time32(TimeUnit::Second) => {
                    let array = column.as_primitive::<Time32SecondType>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Time32(TimeUnit::Millisecond) => {
                    let array = column.as_primitive::<Time32MillisecondType>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Date64 => {
                    // Convert milliseconds since epoch to days since epoch
                    let array: Int32Array = column
                        .as_primitive::<Date64Type>()
                        .unary(|x| (x / 86_400_000) as _);

                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal32(_, _) => {
                    let array = column
                        .as_primitive::<Decimal32Type>()
                        .unary::<_, Int32Type>(|v| v);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal64(_, _) => {
                    let array = column
                        .as_primitive::<Decimal64Type>()
                        .unary::<_, Int32Type>(|v| v as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal128(_, _) => {
                    let array = column
                        .as_primitive::<Decimal128Type>()
                        .unary::<_, Int32Type>(|v| v as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal256(_, _) => {
                    let array = column
                        .as_primitive::<Decimal256Type>()
                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
                    write_primitive(typed, array.values(), levels)
                }
                d => Err(ParquetError::General(format!("Cannot coerce {d} to I32"))),
            }
        }
        ColumnWriter::BoolColumnWriter(typed) => {
            let array = column.as_boolean();
            typed.write_batch(
                get_bool_array_slice(array, indices).as_slice(),
                levels.def_levels(),
                levels.rep_levels(),
            )
        }
        ColumnWriter::Int64ColumnWriter(typed) => {
            match column.data_type() {
                ArrowDataType::Date64 => {
                    let array = column
                        .as_primitive::<Date64Type>()
                        .reinterpret_cast::<Int64Type>();

                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Int64 => {
                    let array = column.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt64 => {
                    // Reinterpret the u64 values as i64 without copying
                    let values = column.as_primitive::<UInt64Type>().values();
                    let array = values.inner().typed_data::<i64>();
                    write_primitive(typed, array, levels)
                }
                ArrowDataType::Time64(TimeUnit::Microsecond) => {
                    let array = column.as_primitive::<Time64MicrosecondType>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Time64(TimeUnit::Nanosecond) => {
                    let array = column.as_primitive::<Time64NanosecondType>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Timestamp(unit, _) => match unit {
                    TimeUnit::Second => {
                        let array = column.as_primitive::<TimestampSecondType>();
                        write_primitive(typed, array.values(), levels)
                    }
                    TimeUnit::Millisecond => {
                        let array = column.as_primitive::<TimestampMillisecondType>();
                        write_primitive(typed, array.values(), levels)
                    }
                    TimeUnit::Microsecond => {
                        let array = column.as_primitive::<TimestampMicrosecondType>();
                        write_primitive(typed, array.values(), levels)
                    }
                    TimeUnit::Nanosecond => {
                        let array = column.as_primitive::<TimestampNanosecondType>();
                        write_primitive(typed, array.values(), levels)
                    }
                },
                ArrowDataType::Duration(unit) => match unit {
                    TimeUnit::Second => {
                        let array = column.as_primitive::<DurationSecondType>();
                        write_primitive(typed, array.values(), levels)
                    }
                    TimeUnit::Millisecond => {
                        let array = column.as_primitive::<DurationMillisecondType>();
                        write_primitive(typed, array.values(), levels)
                    }
                    TimeUnit::Microsecond => {
                        let array = column.as_primitive::<DurationMicrosecondType>();
                        write_primitive(typed, array.values(), levels)
                    }
                    TimeUnit::Nanosecond => {
                        let array = column.as_primitive::<DurationNanosecondType>();
                        write_primitive(typed, array.values(), levels)
                    }
                },
                ArrowDataType::Decimal64(_, _) => {
                    let array = column
                        .as_primitive::<Decimal64Type>()
                        .reinterpret_cast::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal128(_, _) => {
                    let array = column
                        .as_primitive::<Decimal128Type>()
                        .unary::<_, Int64Type>(|v| v as i64);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal256(_, _) => {
                    let array = column
                        .as_primitive::<Decimal256Type>()
                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
                    write_primitive(typed, array.values(), levels)
                }
                d => Err(ParquetError::General(format!("Cannot coerce {d} to I64"))),
            }
        }
        ColumnWriter::Int96ColumnWriter(_typed) => {
            unreachable!("Currently unreachable because data type not supported")
        }
        ColumnWriter::FloatColumnWriter(typed) => {
            let array = column.as_primitive::<Float32Type>();
            write_primitive(typed, array.values(), levels)
        }
        ColumnWriter::DoubleColumnWriter(typed) => {
            let array = column.as_primitive::<Float64Type>();
            write_primitive(typed, array.values(), levels)
        }
        ColumnWriter::ByteArrayColumnWriter(_) => {
            unreachable!("should use ByteArrayWriter")
        }
        ColumnWriter::FixedLenByteArrayColumnWriter(typed) => {
            let bytes = match column.data_type() {
                ArrowDataType::Interval(interval_unit) => match interval_unit {
                    IntervalUnit::YearMonth => {
                        let array = column.as_primitive::<IntervalYearMonthType>();
                        get_interval_ym_array_slice(array, indices)
                    }
                    IntervalUnit::DayTime => {
                        let array = column.as_primitive::<IntervalDayTimeType>();
                        get_interval_dt_array_slice(array, indices)
                    }
                    _ => {
                        return Err(ParquetError::NYI(format!(
                            "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
                        )));
                    }
                },
                ArrowDataType::FixedSizeBinary(_) => {
                    let array = column.as_fixed_size_binary();
                    get_fsb_array_slice(array, indices)
                }
                ArrowDataType::Decimal32(_, _) => {
                    let array = column.as_primitive::<Decimal32Type>();
                    get_decimal_32_array_slice(array, indices)
                }
                ArrowDataType::Decimal64(_, _) => {
                    let array = column.as_primitive::<Decimal64Type>();
                    get_decimal_64_array_slice(array, indices)
                }
                ArrowDataType::Decimal128(_, _) => {
                    let array = column.as_primitive::<Decimal128Type>();
                    get_decimal_128_array_slice(array, indices)
                }
                ArrowDataType::Decimal256(_, _) => {
                    let array = column.as_primitive::<Decimal256Type>();
                    get_decimal_256_array_slice(array, indices)
                }
                ArrowDataType::Float16 => {
                    let array = column.as_primitive::<Float16Type>();
                    get_float_16_array_slice(array, indices)
                }
                _ => {
                    return Err(ParquetError::NYI(
                        "Attempting to write an Arrow type that is not yet implemented".to_string(),
                    ));
                }
            };
            typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
        }
    }
}

fn write_primitive<E: ColumnValueEncoder>(
    writer: &mut GenericColumnWriter<E>,
    values: &E::Values,
    levels: &ArrayLevels,
) -> Result<usize> {
    writer.write_batch_internal(
        values,
        Some(levels.non_null_indices()),
        levels.def_levels(),
        levels.rep_levels(),
        None,
        None,
        None,
    )
}

fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        values.push(array.value(*i))
    }
    values
}

/// Returns 12-byte values representing months, days and milliseconds (4 bytes
/// each). An Arrow YearMonth interval only stores months, so the remaining
/// 8 bytes are zeroed.
fn get_interval_ym_array_slice(
    array: &arrow_array::IntervalYearMonthArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let mut value = array.value(*i).to_le_bytes().to_vec();
        let mut suffix = vec![0; 8];
        value.append(&mut suffix);
        values.push(FixedLenByteArray::from(ByteArray::from(value)))
    }
    values
}

/// Returns 12-byte values representing months, days and milliseconds (4 bytes
/// each). An Arrow DayTime interval only stores days and milliseconds, so the
/// months field is zeroed.
fn get_interval_dt_array_slice(
    array: &arrow_array::IntervalDayTimeArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let mut out = [0; 12];
        let value = array.value(*i);
        out[4..8].copy_from_slice(&value.days.to_le_bytes());
        out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
        values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
    }
    values
}

fn get_decimal_32_array_slice(
    array: &arrow_array::Decimal32Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    let size = decimal_length_from_precision(array.precision());
    for i in indices {
        let as_be_bytes = array.value(*i).to_be_bytes();
        // Keep only the `size` least significant big-endian bytes
        let resized_value = as_be_bytes[(4 - size)..].to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
    }
    values
}

fn get_decimal_64_array_slice(
    array: &arrow_array::Decimal64Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    let size = decimal_length_from_precision(array.precision());
    for i in indices {
        let as_be_bytes = array.value(*i).to_be_bytes();
        // Keep only the `size` least significant big-endian bytes
        let resized_value = as_be_bytes[(8 - size)..].to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
    }
    values
}

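/// Returns the big-endian bytes of each decimal value at `indices`, truncated
/// to the minimal byte width implied by the array's precision (per
/// `decimal_length_from_precision`). For example (illustrative, assuming a
/// precision small enough to fit in one byte), the value 42 is emitted as the
/// single byte `0x2A`.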
fn get_decimal_128_array_slice(
    array: &arrow_array::Decimal128Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    let size = decimal_length_from_precision(array.precision());
    for i in indices {
        let as_be_bytes = array.value(*i).to_be_bytes();
        // Keep only the `size` least significant big-endian bytes
        let resized_value = as_be_bytes[(16 - size)..].to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
    }
    values
}

fn get_decimal_256_array_slice(
    array: &arrow_array::Decimal256Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    let size = decimal_length_from_precision(array.precision());
    for i in indices {
        let as_be_bytes = array.value(*i).to_be_bytes();
        // Keep only the `size` least significant big-endian bytes
        let resized_value = as_be_bytes[(32 - size)..].to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
    }
    values
}

fn get_float_16_array_slice(
    array: &arrow_array::Float16Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let value = array.value(*i).to_le_bytes().to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(value)));
    }
    values
}

fn get_fsb_array_slice(
    array: &arrow_array::FixedSizeBinaryArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let value = array.value(*i).to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(value)))
    }
    values
}

1663#[cfg(test)]
1664mod tests {
1665 use super::*;
1666 use std::collections::HashMap;
1667
1668 use std::fs::File;
1669
1670 use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1671 use crate::arrow::{ARROW_SCHEMA_META_KEY, PARQUET_FIELD_ID_META_KEY};
1672 use crate::column::page::{Page, PageReader};
1673 use crate::file::metadata::thrift::PageHeader;
1674 use crate::file::page_index::column_index::ColumnIndexMetaData;
1675 use crate::file::reader::SerializedPageReader;
1676 use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
1677 use crate::schema::types::ColumnPath;
1678 use arrow::datatypes::ToByteSlice;
1679 use arrow::datatypes::{DataType, Schema};
1680 use arrow::error::Result as ArrowResult;
1681 use arrow::util::data_gen::create_random_array;
1682 use arrow::util::pretty::pretty_format_batches;
1683 use arrow::{array::*, buffer::Buffer};
1684 use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer, OffsetBuffer, i256};
1685 use arrow_schema::Fields;
1686 use half::f16;
1687 use num_traits::{FromPrimitive, ToPrimitive};
1688 use tempfile::tempfile;
1689
1690 use crate::basic::Encoding;
1691 use crate::data_type::AsBytes;
1692 use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
1693 use crate::file::properties::{
1694 BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1695 };
1696 use crate::file::serialized_reader::ReadOptionsBuilder;
1697 use crate::file::{
1698 reader::{FileReader, SerializedFileReader},
1699 statistics::Statistics,
1700 };
1701
1702 #[test]
1703 fn arrow_writer() {
1704 let schema = Schema::new(vec![
1706 Field::new("a", DataType::Int32, false),
1707 Field::new("b", DataType::Int32, true),
1708 ]);
1709
1710 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1712 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1713
1714 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1716
1717 roundtrip(batch, Some(SMALL_SIZE / 2));
1718 }
1719
1720 fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1721 let mut buffer = vec![];
1722
1723 let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
1724 writer.write(expected_batch).unwrap();
1725 writer.close().unwrap();
1726
1727 buffer
1728 }
1729
1730 fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1731 let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
1732 writer.write(expected_batch).unwrap();
1733 writer.into_inner().unwrap()
1734 }
1735
1736 #[test]
1737 fn roundtrip_bytes() {
1738 let schema = Arc::new(Schema::new(vec![
1740 Field::new("a", DataType::Int32, false),
1741 Field::new("b", DataType::Int32, true),
1742 ]));
1743
1744 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1746 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1747
1748 let expected_batch =
1750 RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
1751
1752 for buffer in [
1753 get_bytes_after_close(schema.clone(), &expected_batch),
1754 get_bytes_by_into_inner(schema, &expected_batch),
1755 ] {
1756 let cursor = Bytes::from(buffer);
1757 let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
1758
1759 let actual_batch = record_batch_reader
1760 .next()
1761 .expect("No batch found")
1762 .expect("Unable to get batch");
1763
1764 assert_eq!(expected_batch.schema(), actual_batch.schema());
1765 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1766 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1767 for i in 0..expected_batch.num_columns() {
1768 let expected_data = expected_batch.column(i).to_data();
1769 let actual_data = actual_batch.column(i).to_data();
1770
1771 assert_eq!(expected_data, actual_data);
1772 }
1773 }
1774 }
1775
1776 #[test]
1777 fn arrow_writer_non_null() {
1778 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1780
1781 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1783
1784 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1786
1787 roundtrip(batch, Some(SMALL_SIZE / 2));
1788 }
1789
1790 #[test]
1791 fn arrow_writer_list() {
1792 let schema = Schema::new(vec![Field::new(
1794 "a",
1795 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1796 true,
1797 )]);
1798
1799 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1801
1802 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1805
1806 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1808 DataType::Int32,
1809 false,
1810 ))))
1811 .len(5)
1812 .add_buffer(a_value_offsets)
1813 .add_child_data(a_values.into_data())
1814 .null_bit_buffer(Some(Buffer::from([0b00011011])))
1815 .build()
1816 .unwrap();
1817 let a = ListArray::from(a_list_data);
1818
1819 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1821
1822 assert_eq!(batch.column(0).null_count(), 1);
1823
1824 roundtrip(batch, None);
1827 }
1828
1829 #[test]
1830 fn arrow_writer_list_non_null() {
1831 let schema = Schema::new(vec![Field::new(
1833 "a",
1834 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1835 false,
1836 )]);
1837
1838 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1840
1841 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1844
1845 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1847 DataType::Int32,
1848 false,
1849 ))))
1850 .len(5)
1851 .add_buffer(a_value_offsets)
1852 .add_child_data(a_values.into_data())
1853 .build()
1854 .unwrap();
1855 let a = ListArray::from(a_list_data);
1856
1857 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1859
1860 assert_eq!(batch.column(0).null_count(), 0);
1863
1864 roundtrip(batch, None);
1865 }
1866
1867 #[test]
1868 fn arrow_writer_list_view() {
1869 let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
1870 let schema = Schema::new(vec![Field::new(
1871 "a",
1872 DataType::ListView(list_field.clone()),
1873 true,
1874 )]);
1875
1876 let a = ListViewArray::new(
1878 list_field,
1879 vec![0, 1, 0, 3, 6].into(),
1880 vec![1, 2, 0, 3, 4].into(),
1881 Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1882 Some(vec![true, true, false, true, true].into()),
1883 );
1884
1885 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1886
1887 assert_eq!(batch.column(0).null_count(), 1);
1888
1889 roundtrip(batch, None);
1890 }
1891
1892 #[test]
1893 fn arrow_writer_list_view_non_null() {
1894 let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
1895 let schema = Schema::new(vec![Field::new(
1896 "a",
1897 DataType::ListView(list_field.clone()),
1898 false,
1899 )]);
1900
1901 let a = ListViewArray::new(
1903 list_field,
1904 vec![0, 1, 0, 3, 6].into(),
1905 vec![1, 2, 0, 3, 4].into(),
1906 Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1907 None,
1908 );
1909
1910 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1911
1912 assert_eq!(batch.column(0).null_count(), 0);
1913
1914 roundtrip(batch, None);
1915 }
1916
1917 #[test]
1918 fn arrow_writer_list_view_out_of_order() {
1919 let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
1920 let schema = Schema::new(vec![Field::new(
1921 "a",
1922 DataType::ListView(list_field.clone()),
1923 false,
1924 )]);
1925
1926 let a = ListViewArray::new(
1928 list_field,
1929 vec![0, 1, 0, 6, 3].into(),
1930 vec![1, 2, 0, 4, 3].into(),
1931 Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1932 None,
1933 );
1934
1935 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1936
1937 roundtrip(batch, None);
1938 }
1939
1940 #[test]
1941 fn arrow_writer_large_list_view() {
1942 let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
1943 let schema = Schema::new(vec![Field::new(
1944 "a",
1945 DataType::LargeListView(list_field.clone()),
1946 true,
1947 )]);
1948
1949 let a = LargeListViewArray::new(
1951 list_field,
1952 vec![0i64, 1, 0, 3, 6].into(),
1953 vec![1i64, 2, 0, 3, 4].into(),
1954 Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1955 Some(vec![true, true, false, true, true].into()),
1956 );
1957
1958 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1959
1960 assert_eq!(batch.column(0).null_count(), 1);
1961
1962 roundtrip(batch, None);
1963 }
1964
1965 #[test]
1966 fn arrow_writer_list_view_with_struct() {
1967 let struct_fields = Fields::from(vec![
1969 Field::new("id", DataType::Int32, false),
1970 Field::new("name", DataType::Utf8, false),
1971 ]);
1972 let struct_type = DataType::Struct(struct_fields.clone());
1973 let list_field = Arc::new(Field::new("item", struct_type.clone(), false));
1974
1975 let schema = Schema::new(vec![Field::new(
1976 "a",
1977 DataType::ListView(list_field.clone()),
1978 true,
1979 )]);
1980
1981 let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
1983 let name_array = StringArray::from(vec!["a", "b", "c", "d", "e"]);
1984 let struct_array = StructArray::new(
1985 struct_fields,
1986 vec![Arc::new(id_array), Arc::new(name_array)],
1987 None,
1988 );
1989
1990 let list_view = ListViewArray::new(
1992 list_field,
1993 vec![0, 2, 2].into(), vec![2, 0, 3].into(), Arc::new(struct_array),
1996 Some(vec![true, false, true].into()),
1997 );
1998
1999 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_view)]).unwrap();
2000
2001 roundtrip(batch, None);
2002 }
2003
2004 #[test]
2005 fn arrow_writer_binary() {
2006 let string_field = Field::new("a", DataType::Utf8, false);
2007 let binary_field = Field::new("b", DataType::Binary, false);
2008 let schema = Schema::new(vec![string_field, binary_field]);
2009
2010 let raw_string_values = vec!["foo", "bar", "baz", "quux"];
2011 let raw_binary_values = [
2012 b"foo".to_vec(),
2013 b"bar".to_vec(),
2014 b"baz".to_vec(),
2015 b"quux".to_vec(),
2016 ];
2017 let raw_binary_value_refs = raw_binary_values
2018 .iter()
2019 .map(|x| x.as_slice())
2020 .collect::<Vec<_>>();
2021
2022 let string_values = StringArray::from(raw_string_values.clone());
2023 let binary_values = BinaryArray::from(raw_binary_value_refs);
2024 let batch = RecordBatch::try_new(
2025 Arc::new(schema),
2026 vec![Arc::new(string_values), Arc::new(binary_values)],
2027 )
2028 .unwrap();
2029
2030 roundtrip(batch, Some(SMALL_SIZE / 2));
2031 }
2032
2033 #[test]
2034 fn arrow_writer_binary_view() {
2035 let string_field = Field::new("a", DataType::Utf8View, false);
2036 let binary_field = Field::new("b", DataType::BinaryView, false);
2037 let nullable_string_field = Field::new("a", DataType::Utf8View, true);
2038 let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
2039
2040 let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
2041 let raw_binary_values = vec![
2042 b"foo".to_vec(),
2043 b"bar".to_vec(),
2044 b"large payload over 12 bytes".to_vec(),
2045 b"lulu".to_vec(),
2046 ];
2047 let nullable_string_values =
2048 vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
2049
2050 let string_view_values = StringViewArray::from(raw_string_values);
2051 let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
2052 let nullable_string_view_values = StringViewArray::from(nullable_string_values);
2053 let batch = RecordBatch::try_new(
2054 Arc::new(schema),
2055 vec![
2056 Arc::new(string_view_values),
2057 Arc::new(binary_view_values),
2058 Arc::new(nullable_string_view_values),
2059 ],
2060 )
2061 .unwrap();
2062
2063 roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
2064 roundtrip(batch, None);
2065 }
2066
2067 #[test]
2068 fn arrow_writer_binary_view_long_value() {
2069 let string_field = Field::new("a", DataType::Utf8View, false);
2070 let binary_field = Field::new("b", DataType::BinaryView, false);
2071 let schema = Schema::new(vec![string_field, binary_field]);
2072
2073 let long = "a".repeat(128);
2077 let raw_string_values = vec!["foo", long.as_str(), "bar"];
2078 let raw_binary_values = vec![b"foo".to_vec(), long.as_bytes().to_vec(), b"bar".to_vec()];
2079
2080 let string_view_values: ArrayRef = Arc::new(StringViewArray::from(raw_string_values));
2081 let binary_view_values: ArrayRef =
2082 Arc::new(BinaryViewArray::from_iter_values(raw_binary_values));
2083
2084 one_column_roundtrip(Arc::clone(&string_view_values), false);
2085 one_column_roundtrip(Arc::clone(&binary_view_values), false);
2086
2087 let batch = RecordBatch::try_new(
2088 Arc::new(schema),
2089 vec![string_view_values, binary_view_values],
2090 )
2091 .unwrap();
2092
2093 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2095 let props = WriterProperties::builder()
2096 .set_writer_version(version)
2097 .set_dictionary_enabled(false)
2098 .build();
2099 roundtrip_opts(&batch, props);
2100 }
2101 }
2102
2103 fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
2104 let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
2105 let schema = Schema::new(vec![decimal_field]);
2106
2107 let decimal_values = vec![10_000, 50_000, 0, -100]
2108 .into_iter()
2109 .map(Some)
2110 .collect::<Decimal128Array>()
2111 .with_precision_and_scale(precision, scale)
2112 .unwrap();
2113
2114 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
2115 }
2116
2117 #[test]
2118 fn arrow_writer_decimal() {
2119 let batch_int32_decimal = get_decimal_batch(5, 2);
2121 roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
2122 let batch_int64_decimal = get_decimal_batch(12, 2);
2124 roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
2125 let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
2127 roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
2128 }
2129
2130 #[test]
2131 fn arrow_writer_complex() {
2132 let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
2134 let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
2135 let struct_field_g = Arc::new(Field::new_list(
2136 "g",
2137 Field::new_list_field(DataType::Int16, true),
2138 false,
2139 ));
2140 let struct_field_h = Arc::new(Field::new_list(
2141 "h",
2142 Field::new_list_field(DataType::Int16, false),
2143 true,
2144 ));
2145 let struct_field_e = Arc::new(Field::new_struct(
2146 "e",
2147 vec![
2148 struct_field_f.clone(),
2149 struct_field_g.clone(),
2150 struct_field_h.clone(),
2151 ],
2152 false,
2153 ));
2154 let schema = Schema::new(vec![
2155 Field::new("a", DataType::Int32, false),
2156 Field::new("b", DataType::Int32, true),
2157 Field::new_struct(
2158 "c",
2159 vec![struct_field_d.clone(), struct_field_e.clone()],
2160 false,
2161 ),
2162 ]);
2163
2164 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2166 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2167 let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
2168 let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
2169
2170 let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2171
2172 let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2175
2176 let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
2178 .len(5)
2179 .add_buffer(g_value_offsets.clone())
2180 .add_child_data(g_value.to_data())
2181 .build()
2182 .unwrap();
2183 let g = ListArray::from(g_list_data);
2184 let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
2186 .len(5)
2187 .add_buffer(g_value_offsets)
2188 .add_child_data(g_value.to_data())
2189 .null_bit_buffer(Some(Buffer::from([0b00011011])))
2190 .build()
2191 .unwrap();
2192 let h = ListArray::from(h_list_data);
2193
2194 let e = StructArray::from(vec![
2195 (struct_field_f, Arc::new(f) as ArrayRef),
2196 (struct_field_g, Arc::new(g) as ArrayRef),
2197 (struct_field_h, Arc::new(h) as ArrayRef),
2198 ]);
2199
2200 let c = StructArray::from(vec![
2201 (struct_field_d, Arc::new(d) as ArrayRef),
2202 (struct_field_e, Arc::new(e) as ArrayRef),
2203 ]);
2204
2205 let batch = RecordBatch::try_new(
2207 Arc::new(schema),
2208 vec![Arc::new(a), Arc::new(b), Arc::new(c)],
2209 )
2210 .unwrap();
2211
2212 roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
2213 roundtrip(batch, Some(SMALL_SIZE / 3));
2214 }
2215
2216 #[test]
2217 fn arrow_writer_complex_mixed() {
2218 let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
2223 let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
2224 let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
2225 let schema = Schema::new(vec![Field::new(
2226 "some_nested_object",
2227 DataType::Struct(Fields::from(vec![
2228 offset_field.clone(),
2229 partition_field.clone(),
2230 topic_field.clone(),
2231 ])),
2232 false,
2233 )]);
2234
2235 let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
2237 let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2238 let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
2239
2240 let some_nested_object = StructArray::from(vec![
2241 (offset_field, Arc::new(offset) as ArrayRef),
2242 (partition_field, Arc::new(partition) as ArrayRef),
2243 (topic_field, Arc::new(topic) as ArrayRef),
2244 ]);
2245
2246 let batch =
2248 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
2249
2250 roundtrip(batch, Some(SMALL_SIZE / 2));
2251 }

    #[test]
    fn arrow_writer_map() {
        let json_content = r#"
        {"stocks":{"long": "$AAA", "short": "$BBB"}}
        {"stocks":{"long": null, "long": "$CCC", "short": null}}
        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
        "#;
        let entries_struct_type = DataType::Struct(Fields::from(vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, true),
        ]));
        let stocks_field = Field::new(
            "stocks",
            DataType::Map(
                Arc::new(Field::new("entries", entries_struct_type, false)),
                false,
            ),
            true,
        );
        let schema = Arc::new(Schema::new(vec![stocks_field]));
        let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
        let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();

        let batch = reader.next().unwrap().unwrap();
        roundtrip(batch, None);
    }

    #[test]
    fn arrow_writer_2_level_struct() {
        // Tests writing a struct<struct<primitive>> column
        let field_c = Field::new("c", DataType::Int32, true);
        let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
        let type_a = DataType::Struct(vec![field_b.clone()].into());
        let field_a = Field::new("a", type_a, true);
        let schema = Schema::new(vec![field_a.clone()]);

        // "b" is valid at rows 0, 1, 2 and 5; "a" additionally covers row 3
        let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100111])))
            .add_child_data(c.into_data())
            .build()
            .unwrap();
        let b = StructArray::from(b_data);
        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00101111])))
            .add_child_data(b.into_data())
            .build()
            .unwrap();
        let a = StructArray::from(a_data);

        assert_eq!(a.null_count(), 1);
        assert_eq!(a.column(0).null_count(), 2);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn arrow_writer_2_level_struct_non_null() {
        let field_c = Field::new("c", DataType::Int32, false);
        let type_b = DataType::Struct(vec![field_c].into());
        let field_b = Field::new("b", type_b.clone(), false);
        let type_a = DataType::Struct(vec![field_b].into());
        let field_a = Field::new("a", type_a.clone(), false);
        let schema = Schema::new(vec![field_a]);

        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
        let b_data = ArrayDataBuilder::new(type_b)
            .len(6)
            .add_child_data(c.into_data())
            .build()
            .unwrap();
        let b = StructArray::from(b_data);
        let a_data = ArrayDataBuilder::new(type_a)
            .len(6)
            .add_child_data(b.into_data())
            .build()
            .unwrap();
        let a = StructArray::from(a_data);

        assert_eq!(a.null_count(), 0);
        assert_eq!(a.column(0).null_count(), 0);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn arrow_writer_2_level_struct_mixed_null() {
        let field_c = Field::new("c", DataType::Int32, false);
        let type_b = DataType::Struct(vec![field_c].into());
        let field_b = Field::new("b", type_b.clone(), true);
        let type_a = DataType::Struct(vec![field_b].into());
        let field_a = Field::new("a", type_a.clone(), false);
        let schema = Schema::new(vec![field_a]);

        let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
        let b_data = ArrayDataBuilder::new(type_b)
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100111])))
            .add_child_data(c.into_data())
            .build()
            .unwrap();
        let b = StructArray::from(b_data);
        let a_data = ArrayDataBuilder::new(type_a)
            .len(6)
            .add_child_data(b.into_data())
            .build()
            .unwrap();
        let a = StructArray::from(a_data);

        assert_eq!(a.null_count(), 0);
        assert_eq!(a.column(0).null_count(), 2);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn arrow_writer_2_level_struct_mixed_null_2() {
        let field_c = Field::new("c", DataType::Int32, false);
        let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
        let field_e = Field::new(
            "e",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            false,
        );

        let field_b = Field::new(
            "b",
            DataType::Struct(vec![field_c, field_d, field_e].into()),
            false,
        );
        let type_a = DataType::Struct(vec![field_b.clone()].into());
        let field_a = Field::new("a", type_a, true);
        let schema = Schema::new(vec![field_a.clone()]);

        let c = Int32Array::from_iter_values(0..6);
        let d = FixedSizeBinaryArray::try_from_iter(
            ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
        )
        .expect("four byte values");
        let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
        let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
            .len(6)
            .add_child_data(c.into_data())
            .add_child_data(d.into_data())
            .add_child_data(e.into_data())
            .build()
            .unwrap();
        let b = StructArray::from(b_data);
        // "a" is valid only at rows 0, 2 and 5, giving three nulls
        let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100101])))
            .add_child_data(b.into_data())
            .build()
            .unwrap();
        let a = StructArray::from(a_data);

        assert_eq!(a.null_count(), 3);
        assert_eq!(a.column(0).null_count(), 0);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        roundtrip(batch, Some(SMALL_SIZE / 2));
    }

    #[test]
    fn test_fixed_size_binary_in_dict() {
        fn test_fixed_size_binary_in_dict_inner<K>()
        where
            K: ArrowDictionaryKeyType,
            K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
            <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
        {
            let field = Field::new(
                "a",
                DataType::Dictionary(
                    Box::new(K::DATA_TYPE),
                    Box::new(DataType::FixedSizeBinary(4)),
                ),
                false,
            );
            let schema = Schema::new(vec![field]);

            let keys: Vec<K::Native> = vec![
                K::Native::try_from(0u8).unwrap(),
                K::Native::try_from(0u8).unwrap(),
                K::Native::try_from(1u8).unwrap(),
            ];
            let keys = PrimitiveArray::<K>::from_iter_values(keys);
            let values = FixedSizeBinaryArray::try_from_iter(
                vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
            )
            .unwrap();

            let data = DictionaryArray::<K>::new(keys, Arc::new(values));
            let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
            roundtrip(batch, None);
        }

        test_fixed_size_binary_in_dict_inner::<UInt8Type>();
        test_fixed_size_binary_in_dict_inner::<UInt16Type>();
        test_fixed_size_binary_in_dict_inner::<UInt32Type>();
        test_fixed_size_binary_in_dict_inner::<UInt64Type>();
        test_fixed_size_binary_in_dict_inner::<Int8Type>();
        test_fixed_size_binary_in_dict_inner::<Int16Type>();
        test_fixed_size_binary_in_dict_inner::<Int32Type>();
        test_fixed_size_binary_in_dict_inner::<Int64Type>();
    }

    #[test]
    fn test_empty_dict() {
        let struct_fields = Fields::from(vec![Field::new(
            "dict",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            false,
        )]);

        let schema = Schema::new(vec![Field::new_struct(
            "struct",
            struct_fields.clone(),
            true,
        )]);
        let dictionary = Arc::new(DictionaryArray::new(
            Int32Array::new_null(5),
            Arc::new(StringArray::new_null(0)),
        ));

        let s = StructArray::new(
            struct_fields,
            vec![dictionary],
            Some(NullBuffer::new_null(5)),
        );

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
        roundtrip(batch, None);
    }

    #[test]
    fn arrow_writer_page_size() {
        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));

        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);

        // Generate 10 unique ten-character strings
        for i in 0..10 {
            let value = i
                .to_string()
                .repeat(10)
                .chars()
                .take(10)
                .collect::<String>();

            builder.append_value(value);
        }

        let array = Arc::new(builder.finish());

        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        let file = tempfile::tempfile().unwrap();

        // Set the page size limits so low that every value flushes a page
        let props = WriterProperties::builder()
            .set_data_page_size_limit(1)
            .set_dictionary_page_size_limit(1)
            .set_write_batch_size(1)
            .build();

        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
                .expect("Unable to write file");
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader =
            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();

        let column = reader.metadata().row_group(0).columns();

        assert_eq!(column.len(), 1);

        // A dictionary page must have been written before falling back
        assert!(
            column[0].dictionary_page_offset().is_some(),
            "Expected a dictionary page"
        );

        assert!(reader.metadata().offset_index().is_some());
        let offset_indexes = &reader.metadata().offset_index().unwrap()[0];

        let page_locations = offset_indexes[0].page_locations.clone();

        // With a write batch size of 1 and a 1-byte page limit, each of the
        // 10 values lands in its own data page
        assert_eq!(
            page_locations.len(),
            10,
            "Expected 10 pages but got {page_locations:#?}"
        );
    }

    #[test]
    fn arrow_writer_float_nans() {
        let f16_field = Field::new("a", DataType::Float16, false);
        let f32_field = Field::new("b", DataType::Float32, false);
        let f64_field = Field::new("c", DataType::Float64, false);
        let schema = Schema::new(vec![f16_field, f32_field, f64_field]);

        let f16_values = (0..MEDIUM_SIZE)
            .map(|i| {
                Some(if i % 2 == 0 {
                    f16::NAN
                } else {
                    f16::from_f32(i as f32)
                })
            })
            .collect::<Float16Array>();

        let f32_values = (0..MEDIUM_SIZE)
            .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
            .collect::<Float32Array>();

        let f64_values = (0..MEDIUM_SIZE)
            .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
            .collect::<Float64Array>();

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![
                Arc::new(f16_values),
                Arc::new(f32_values),
                Arc::new(f64_values),
            ],
        )
        .unwrap();

        roundtrip(batch, None);
    }

    const SMALL_SIZE: usize = 7;
    const MEDIUM_SIZE: usize = 63;

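    /// Round-trips `expected_batch` through both Parquet writer versions with
    /// the given row group size limit, returning the encoded files.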
    fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<Bytes> {
        let mut files = vec![];
        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
            let mut props = WriterProperties::builder().set_writer_version(version);

            if let Some(size) = max_row_group_size {
                props = props.set_max_row_group_row_count(Some(size))
            }

            let props = props.build();
            files.push(roundtrip_opts(&expected_batch, props))
        }
        files
    }

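    /// Writes `expected_batch` with the given properties, reads it back, and
    /// invokes `validate` on each pair of expected and actual column data.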
    fn roundtrip_opts_with_array_validation<F>(
        expected_batch: &RecordBatch,
        props: WriterProperties,
        validate: F,
    ) -> Bytes
    where
        F: Fn(&ArrayData, &ArrayData),
    {
        let mut file = vec![];

        let mut writer = ArrowWriter::try_new(&mut file, expected_batch.schema(), Some(props))
            .expect("Unable to write file");
        writer.write(expected_batch).unwrap();
        writer.close().unwrap();

        let file = Bytes::from(file);
        let mut record_batch_reader =
            ParquetRecordBatchReader::try_new(file.clone(), 1024).unwrap();

        let actual_batch = record_batch_reader
            .next()
            .expect("No batch found")
            .expect("Unable to get batch");

        assert_eq!(expected_batch.schema(), actual_batch.schema());
        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
        for i in 0..expected_batch.num_columns() {
            let expected_data = expected_batch.column(i).to_data();
            let actual_data = actual_batch.column(i).to_data();
            validate(&expected_data, &actual_data);
        }

        file
    }

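    /// Round-trips a batch with the given properties, validating both sides
    /// and asserting the data read back is identical.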
    fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> Bytes {
        roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
            a.validate_full().expect("valid expected data");
            b.validate_full().expect("valid actual data");
            assert_eq!(a, b)
        })
    }

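    // Illustrative sketch (a hypothetical test added for demonstration, not
    // exercising anything new): any `RecordBatch` can drive the round-trip
    // helpers; a batch built with `try_from_iter` and default properties is
    // enough to cover both writer versions.
    #[test]
    fn roundtrip_minimal_example() {
        let batch = RecordBatch::try_from_iter(vec![(
            "x",
            Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
        )])
        .unwrap();
        roundtrip(batch, None);
    }
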
    /// Options for `one_column_roundtrip_with_options`
    struct RoundTripOptions {
        values: ArrayRef,
        schema: SchemaRef,
        bloom_filter: bool,
        bloom_filter_position: BloomFilterPosition,
    }

    impl RoundTripOptions {
        fn new(values: ArrayRef, nullable: bool) -> Self {
            let data_type = values.data_type().clone();
            let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
            Self {
                values,
                schema: Arc::new(schema),
                bloom_filter: false,
                bloom_filter_position: BloomFilterPosition::AfterRowGroup,
            }
        }
    }

    fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<Bytes> {
        one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
    }

    fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<Bytes> {
        let mut options = RoundTripOptions::new(values, false);
        options.schema = schema;
        one_column_roundtrip_with_options(options)
    }

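    /// Round-trips a single-column batch across a matrix of dictionary
    /// settings, encodings, writer versions and row group sizes.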
    fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<Bytes> {
        let RoundTripOptions {
            values,
            schema,
            bloom_filter,
            bloom_filter_position,
        } = options;

        // Exercise every encoding that is valid for the value type
        let encodings = match values.data_type() {
            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
                vec![
                    Encoding::PLAIN,
                    Encoding::DELTA_BYTE_ARRAY,
                    Encoding::DELTA_LENGTH_BYTE_ARRAY,
                ]
            }
            DataType::Int64
            | DataType::Int32
            | DataType::Int16
            | DataType::Int8
            | DataType::UInt64
            | DataType::UInt32
            | DataType::UInt16
            | DataType::UInt8 => vec![
                Encoding::PLAIN,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
            DataType::Float32 | DataType::Float64 => {
                vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
            }
            _ => vec![Encoding::PLAIN],
        };

        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();

        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];

        let mut files = vec![];
        for dictionary_size in [0, 1, 1024] {
            for encoding in &encodings {
                for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
                    for row_group_size in row_group_sizes {
                        let props = WriterProperties::builder()
                            .set_writer_version(version)
                            .set_max_row_group_row_count(Some(row_group_size))
                            .set_dictionary_enabled(dictionary_size != 0)
                            .set_dictionary_page_size_limit(dictionary_size.max(1))
                            .set_encoding(*encoding)
                            .set_bloom_filter_enabled(bloom_filter)
                            .set_bloom_filter_position(bloom_filter_position)
                            .build();

                        files.push(roundtrip_opts(&expected_batch, props))
                    }
                }
            }
        }
        files
    }

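    /// Round-trips `iter` as a single required (non-nullable) column.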
    fn values_required<A, I>(iter: I) -> Vec<Bytes>
    where
        A: From<Vec<I::Item>> + Array + 'static,
        I: IntoIterator,
    {
        let raw_values: Vec<_> = iter.into_iter().collect();
        let values = Arc::new(A::from(raw_values));
        one_column_roundtrip(values, false)
    }

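    /// Round-trips `iter` as a single nullable column, replacing every
    /// even-indexed value with null.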
    fn values_optional<A, I>(iter: I) -> Vec<Bytes>
    where
        A: From<Vec<Option<I::Item>>> + Array + 'static,
        I: IntoIterator,
    {
        let optional_raw_values: Vec<_> = iter
            .into_iter()
            .enumerate()
            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
            .collect();
        let optional_values = Arc::new(A::from(optional_raw_values));
        one_column_roundtrip(optional_values, true)
    }

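    /// Round-trips `iter` both as a required and as a nullable column.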
    fn required_and_optional<A, I>(iter: I)
    where
        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
        I: IntoIterator + Clone,
    {
        values_required::<A, I>(iter.clone());
        values_optional::<A, I>(iter);
    }

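    /// Asserts that every value in `positive_values` hits a bloom filter for
    /// `file_column` in the first file, and that no `negative_values` do.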
    fn check_bloom_filter<T: AsBytes>(
        files: Vec<Bytes>,
        file_column: String,
        positive_values: Vec<T>,
        negative_values: Vec<T>,
    ) {
        files.into_iter().take(1).for_each(|file| {
            let file_reader = SerializedFileReader::new_with_options(
                file,
                ReadOptionsBuilder::new()
                    .with_reader_properties(
                        ReaderProperties::builder()
                            .set_read_bloom_filter(true)
                            .build(),
                    )
                    .build(),
            )
            .expect("Unable to open file as Parquet");
            let metadata = file_reader.metadata();

            // Gather the bloom filter for the target column in every row group
            let mut bloom_filters: Vec<_> = vec![];
            for (ri, row_group) in metadata.row_groups().iter().enumerate() {
                if let Some((column_index, _)) = row_group
                    .columns()
                    .iter()
                    .enumerate()
                    .find(|(_, column)| column.column_path().string() == file_column)
                {
                    let row_group_reader = file_reader
                        .get_row_group(ri)
                        .expect("Unable to read row group");
                    if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
                        bloom_filters.push(sbbf.clone());
                    } else {
                        panic!("No bloom filter for column named {file_column} found");
                    }
                } else {
                    panic!("No column named {file_column} found");
                }
            }

            positive_values.iter().for_each(|value| {
                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
                assert!(
                    found.is_some(),
                    "Value {:?} should be in bloom filter",
                    value.as_bytes()
                );
            });

            negative_values.iter().for_each(|value| {
                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
                assert!(
                    found.is_none(),
                    "Value {:?} should not be in bloom filter",
                    value.as_bytes()
                );
            });
        });
    }

    #[test]
    fn all_null_primitive_single_column() {
        let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
        one_column_roundtrip(values, true);
    }

    #[test]
    fn null_single_column() {
        let values = Arc::new(NullArray::new(SMALL_SIZE));
        one_column_roundtrip(values, true);
    }

    #[test]
    fn bool_single_column() {
        required_and_optional::<BooleanArray, _>(
            [true, false].iter().cycle().copied().take(SMALL_SIZE),
        );
    }

    #[test]
    fn bool_large_single_column() {
        let values = Arc::new(
            [None, Some(true), Some(false)]
                .iter()
                .cycle()
                .copied()
                .take(200_000)
                .collect::<BooleanArray>(),
        );
        let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
        let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
        let file = tempfile::tempfile().unwrap();

        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
                .expect("Unable to write file");
        writer.write(&expected_batch).unwrap();
        writer.close().unwrap();
    }

    #[test]
    fn check_page_offset_index_with_nan() {
        let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
        let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
        let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();

        let mut out = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
        writer.write(&batch).unwrap();
        let file_meta_data = writer.close().unwrap();
        for row_group in file_meta_data.row_groups() {
            for column in row_group.columns() {
                assert!(column.offset_index_offset().is_some());
                assert!(column.offset_index_length().is_some());
                assert!(column.column_index_offset().is_none());
                assert!(column.column_index_length().is_none());
            }
        }
    }

    #[test]
    fn i8_single_column() {
        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
    }

    #[test]
    fn i16_single_column() {
        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
    }

    #[test]
    fn i32_single_column() {
        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
    }

    #[test]
    fn i64_single_column() {
        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
    }

    #[test]
    fn u8_single_column() {
        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
    }

    #[test]
    fn u16_single_column() {
        required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
    }

    #[test]
    fn u32_single_column() {
        required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
    }

    #[test]
    fn u64_single_column() {
        required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
    }

    #[test]
    fn f32_single_column() {
        required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
    }

    #[test]
    fn f64_single_column() {
        required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
    }

    #[test]
    fn timestamp_second_single_column() {
        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
        let values = Arc::new(TimestampSecondArray::from(raw_values));

        one_column_roundtrip(values, false);
    }

    #[test]
    fn timestamp_millisecond_single_column() {
        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
        let values = Arc::new(TimestampMillisecondArray::from(raw_values));

        one_column_roundtrip(values, false);
    }

    #[test]
    fn timestamp_microsecond_single_column() {
        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
        let values = Arc::new(TimestampMicrosecondArray::from(raw_values));

        one_column_roundtrip(values, false);
    }

    #[test]
    fn timestamp_nanosecond_single_column() {
        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
        let values = Arc::new(TimestampNanosecondArray::from(raw_values));

        one_column_roundtrip(values, false);
    }

    #[test]
    fn date32_single_column() {
        required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
    }

    #[test]
    fn date64_single_column() {
        // Date64 values must represent whole days (multiples of 86_400_000 ms)
        required_and_optional::<Date64Array, _>(
            (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
        );
    }

    #[test]
    fn time32_second_single_column() {
        required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
    }

    #[test]
    fn time32_millisecond_single_column() {
        required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
    }

    #[test]
    fn time64_microsecond_single_column() {
        required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
    }

    #[test]
    fn time64_nanosecond_single_column() {
        required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
    }

    #[test]
    fn duration_second_single_column() {
        required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
    }

    #[test]
    fn duration_millisecond_single_column() {
        required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
    }

    #[test]
    fn duration_microsecond_single_column() {
        required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
    }

    #[test]
    fn duration_nanosecond_single_column() {
        required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
    }

    #[test]
    fn interval_year_month_single_column() {
        required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
    }

    #[test]
    fn interval_day_time_single_column() {
        required_and_optional::<IntervalDayTimeArray, _>(vec![
            IntervalDayTime::new(0, 1),
            IntervalDayTime::new(0, 3),
            IntervalDayTime::new(3, -2),
            IntervalDayTime::new(-200, 4),
        ]);
    }

    #[test]
    #[should_panic(
        expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
    )]
    fn interval_month_day_nano_single_column() {
        required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
            IntervalMonthDayNano::new(0, 1, 5),
            IntervalMonthDayNano::new(0, 3, 2),
            IntervalMonthDayNano::new(3, -2, -5),
            IntervalMonthDayNano::new(-200, 4, -1),
        ]);
    }

    #[test]
    fn binary_single_column() {
        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());

        values_required::<BinaryArray, _>(many_vecs_iter);
    }

    #[test]
    fn binary_view_single_column() {
        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());

        values_required::<BinaryViewArray, _>(many_vecs_iter);
    }

    #[test]
    fn i32_column_bloom_filter_at_end() {
        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
        let mut options = RoundTripOptions::new(array, false);
        options.bloom_filter = true;
        options.bloom_filter_position = BloomFilterPosition::End;

        let files = one_column_roundtrip_with_options(options);
        check_bloom_filter(
            files,
            "col".to_string(),
            (0..SMALL_SIZE as i32).collect(),
            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
        );
    }

    #[test]
    fn i32_column_bloom_filter() {
        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
        let mut options = RoundTripOptions::new(array, false);
        options.bloom_filter = true;

        let files = one_column_roundtrip_with_options(options);
        check_bloom_filter(
            files,
            "col".to_string(),
            (0..SMALL_SIZE as i32).collect(),
            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
        );
    }

    #[test]
    fn binary_column_bloom_filter() {
        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());

        let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
        let mut options = RoundTripOptions::new(array, false);
        options.bloom_filter = true;

        let files = one_column_roundtrip_with_options(options);
        check_bloom_filter(
            files,
            "col".to_string(),
            many_vecs,
            vec![vec![(SMALL_SIZE + 1) as u8]],
        );
    }

    #[test]
    fn empty_string_null_column_bloom_filter() {
        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
        let raw_strs = raw_values.iter().map(|s| s.as_str());

        let array = Arc::new(StringArray::from_iter_values(raw_strs));
        let mut options = RoundTripOptions::new(array, false);
        options.bloom_filter = true;

        let files = one_column_roundtrip_with_options(options);

        // Every odd-indexed value must hit the filter, while the empty string
        // (never written) must not
        let optional_raw_values: Vec<_> = raw_values
            .iter()
            .enumerate()
            .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
            .collect();
        check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
    }

    #[test]
    fn large_binary_single_column() {
        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());

        values_required::<LargeBinaryArray, _>(many_vecs_iter);
    }

    #[test]
    fn fixed_size_binary_single_column() {
        let mut builder = FixedSizeBinaryBuilder::new(4);
        builder.append_value(b"0123").unwrap();
        builder.append_null();
        builder.append_value(b"8910").unwrap();
        builder.append_value(b"1112").unwrap();
        let array = Arc::new(builder.finish());

        one_column_roundtrip(array, true);
    }

    #[test]
    fn string_single_column() {
        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
        let raw_strs = raw_values.iter().map(|s| s.as_str());

        required_and_optional::<StringArray, _>(raw_strs);
    }

    #[test]
    fn large_string_single_column() {
        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
        let raw_strs = raw_values.iter().map(|s| s.as_str());

        required_and_optional::<LargeStringArray, _>(raw_strs);
    }

    #[test]
    fn string_view_single_column() {
        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
        let raw_strs = raw_values.iter().map(|s| s.as_str());

        required_and_optional::<StringViewArray, _>(raw_strs);
    }

    #[test]
    fn null_list_single_column() {
        let null_field = Field::new_list_field(DataType::Null, true);
        let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);

        let schema = Schema::new(vec![list_field]);

        // Build [[], null, [null, null]]
        let a_values = NullArray::new(2);
        let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
            DataType::Null,
            true,
        ))))
        .len(3)
        .add_buffer(a_value_offsets)
        .null_bit_buffer(Some(Buffer::from([0b00000101])))
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();

        let a = ListArray::from(a_list_data);

        assert!(a.is_valid(0));
        assert!(!a.is_valid(1));
        assert!(a.is_valid(2));

        assert_eq!(a.value(0).len(), 0);
        assert_eq!(a.value(2).len(), 2);
        assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
        roundtrip(batch, None);
    }

    #[test]
    fn list_single_column() {
        // [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
            DataType::Int32,
            false,
        ))))
        .len(5)
        .add_buffer(a_value_offsets)
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();

        assert_eq!(a_list_data.null_count(), 1);

        let a = ListArray::from(a_list_data);
        let values = Arc::new(a);

        one_column_roundtrip(values, true);
    }

    #[test]
    fn large_list_single_column() {
        // Same data as `list_single_column` but with 64-bit offsets
        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
        let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
            "large_item",
            DataType::Int32,
            true,
        ))))
        .len(5)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .build()
        .unwrap();

        assert_eq!(a_list_data.null_count(), 1);

        let a = LargeListArray::from(a_list_data);
        let values = Arc::new(a);

        one_column_roundtrip(values, true);
    }

    #[test]
    fn list_nested_nulls() {
        use arrow::datatypes::Int32Type;
        let data = vec![
            Some(vec![Some(1)]),
            Some(vec![Some(2), Some(3)]),
            None,
            Some(vec![Some(4), Some(5), None]),
            Some(vec![None]),
            Some(vec![Some(6), Some(7)]),
        ];

        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
        one_column_roundtrip(Arc::new(list), true);

        let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
        one_column_roundtrip(Arc::new(list), true);
    }

    #[test]
    fn struct_single_column() {
        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
        let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);

        let values = Arc::new(s);
        one_column_roundtrip(values, false);
    }

    #[test]
    fn list_and_map_coerced_names() {
        // Write data with non-standard list and map child field names
        let list_field =
            Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
        let map_field = Field::new_map(
            "my_map",
            "entries",
            Field::new("keys", DataType::Int32, false),
            Field::new("values", DataType::Int32, true),
            false,
            true,
        );

        let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
        let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();

        let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));

        let props = Some(WriterProperties::builder().set_coerce_types(true).build());
        let file = tempfile::tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();

        let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
        writer.write(&batch).unwrap();
        let file_metadata = writer.close().unwrap();

        // The writer-reported schema must use the coerced child names
        let schema = file_metadata.file_metadata().schema();
        let list_field = &schema.get_fields()[0].get_fields()[0];
        assert_eq!(list_field.get_fields()[0].name(), "element");

        let map_field = &schema.get_fields()[1].get_fields()[0];
        assert_eq!(map_field.name(), "key_value");
        assert_eq!(map_field.get_fields()[0].name(), "key");
        assert_eq!(map_field.get_fields()[1].name(), "value");

        // And so must the schema read back from the file
        let reader = SerializedFileReader::new(file).unwrap();
        let file_schema = reader.metadata().file_metadata().schema();
        let fields = file_schema.get_fields();
        let list_field = &fields[0].get_fields()[0];
        assert_eq!(list_field.get_fields()[0].name(), "element");
        let map_field = &fields[1].get_fields()[0];
        assert_eq!(map_field.name(), "key_value");
        assert_eq!(map_field.get_fields()[0].name(), "key");
        assert_eq!(map_field.get_fields()[1].name(), "value");
    }

    #[test]
    fn fallback_flush_data_page() {
        // Force tiny pages with non-dictionary byte array encoders and verify
        // every value survives the flush boundaries
        let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
        let values = Arc::new(StringArray::from(raw_values));
        let encodings = vec![
            Encoding::DELTA_BYTE_ARRAY,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
        ];
        let data_type = values.data_type().clone();
        let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();

        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
        let data_page_size_limit: usize = 32;
        let write_batch_size: usize = 16;

        for encoding in &encodings {
            for row_group_size in row_group_sizes {
                let props = WriterProperties::builder()
                    .set_writer_version(WriterVersion::PARQUET_2_0)
                    .set_max_row_group_row_count(Some(row_group_size))
                    .set_dictionary_enabled(false)
                    .set_encoding(*encoding)
                    .set_data_page_size_limit(data_page_size_limit)
                    .set_write_batch_size(write_batch_size)
                    .build();

                roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
                    let string_array_a = StringArray::from(a.clone());
                    let string_array_b = StringArray::from(b.clone());
                    let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
                    let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
                    assert_eq!(
                        vec_a, vec_b,
                        "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
                    );
                });
            }
        }
    }

    #[test]
    fn arrow_writer_string_dictionary() {
        #[allow(deprecated)]
        let schema = Arc::new(Schema::new(vec![Field::new_dict(
            "dictionary",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            true,
            42,
            true,
        )]));

        let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
            .iter()
            .copied()
            .collect();

        one_column_roundtrip_with_schema(Arc::new(d), schema);
    }

    #[test]
    fn arrow_writer_test_type_compatibility() {
        fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
        where
            T1: Array + 'static,
            T2: Array + 'static,
        {
            let schema1 = Arc::new(Schema::new(vec![Field::new(
                "a",
                array1.data_type().clone(),
                false,
            )]));

            let file = tempfile().unwrap();
            let mut writer =
                ArrowWriter::try_new(file.try_clone().unwrap(), schema1.clone(), None).unwrap();

            let rb1 = RecordBatch::try_new(schema1.clone(), vec![Arc::new(array1)]).unwrap();
            writer.write(&rb1).unwrap();

            // The second batch declares a different, but compatible, type
            let schema2 = Arc::new(Schema::new(vec![Field::new(
                "a",
                array2.data_type().clone(),
                false,
            )]));
            let rb2 = RecordBatch::try_new(schema2, vec![Arc::new(array2)]).unwrap();
            writer.write(&rb2).unwrap();

            writer.close().unwrap();

            let mut record_batch_reader =
                ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
            let actual_batch = record_batch_reader.next().unwrap().unwrap();

            let expected_batch =
                RecordBatch::try_new(schema1, vec![Arc::new(expected_result)]).unwrap();
            assert_eq!(actual_batch, expected_batch);
        }

        // Dictionary followed by plain values
        ensure_compatible_write(
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
            ),
            StringArray::from_iter_values(vec!["barquet"]),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0, 1]),
                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
            ),
        );

        // Plain values followed by a dictionary
        ensure_compatible_write(
            StringArray::from_iter_values(vec!["parquet"]),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
            ),
            StringArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        // Dictionaries with different key types
        ensure_compatible_write(
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
            ),
            DictionaryArray::new(
                UInt16Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
            ),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0, 1]),
                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
            ),
        );

        // Dictionaries with different value types
        ensure_compatible_write(
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
            ),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(LargeStringArray::from_iter_values(vec!["barquet"])),
            ),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0, 1]),
                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
            ),
        );

        ensure_compatible_write(
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
            ),
            LargeStringArray::from_iter_values(vec!["barquet"]),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0, 1]),
                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
            ),
        );

        // String arrays with different offset sizes and views
        ensure_compatible_write(
            StringArray::from_iter_values(vec!["parquet"]),
            LargeStringArray::from_iter_values(vec!["barquet"]),
            StringArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            LargeStringArray::from_iter_values(vec!["parquet"]),
            StringArray::from_iter_values(vec!["barquet"]),
            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            StringArray::from_iter_values(vec!["parquet"]),
            StringViewArray::from_iter_values(vec!["barquet"]),
            StringArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            StringViewArray::from_iter_values(vec!["parquet"]),
            StringArray::from_iter_values(vec!["barquet"]),
            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            LargeStringArray::from_iter_values(vec!["parquet"]),
            StringViewArray::from_iter_values(vec!["barquet"]),
            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            StringViewArray::from_iter_values(vec!["parquet"]),
            LargeStringArray::from_iter_values(vec!["barquet"]),
            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        // Binary arrays with different offset sizes and views
        ensure_compatible_write(
            BinaryArray::from_iter_values(vec![b"parquet"]),
            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        ensure_compatible_write(
            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
            BinaryArray::from_iter_values(vec![b"barquet"]),
            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        ensure_compatible_write(
            BinaryArray::from_iter_values(vec![b"parquet"]),
            BinaryViewArray::from_iter_values(vec![b"barquet"]),
            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        ensure_compatible_write(
            BinaryViewArray::from_iter_values(vec![b"parquet"]),
            BinaryArray::from_iter_values(vec![b"barquet"]),
            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        ensure_compatible_write(
            BinaryViewArray::from_iter_values(vec![b"parquet"]),
            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        ensure_compatible_write(
            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
            BinaryViewArray::from_iter_values(vec![b"barquet"]),
            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        // Lists that differ only in field metadata
        let list_field_metadata = HashMap::from_iter(vec![(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            "1".to_string(),
        )]);
        let list_field = Field::new_list_field(DataType::Int32, false);

        let values1 = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4]));
        let offsets1 = OffsetBuffer::new(vec![0, 2, 5].into());

        let values2 = Arc::new(Int32Array::from(vec![5, 6, 7, 8, 9]));
        let offsets2 = OffsetBuffer::new(vec![0, 3, 5].into());

        let values_expected = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]));
        let offsets_expected = OffsetBuffer::new(vec![0, 2, 5, 8, 10].into());

        ensure_compatible_write(
            ListArray::try_new(
                Arc::new(
                    list_field
                        .clone()
                        .with_metadata(list_field_metadata.clone()),
                ),
                offsets1,
                values1,
                None,
            )
            .unwrap(),
            ListArray::try_new(Arc::new(list_field.clone()), offsets2, values2, None).unwrap(),
            ListArray::try_new(
                Arc::new(
                    list_field
                        .clone()
                        .with_metadata(list_field_metadata.clone()),
                ),
                offsets_expected,
                values_expected,
                None,
            )
            .unwrap(),
        );
    }

    #[test]
    fn arrow_writer_primitive_dictionary() {
        #[allow(deprecated)]
        let schema = Arc::new(Schema::new(vec![Field::new_dict(
            "dictionary",
            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
            true,
            42,
            true,
        )]));

        let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
        builder.append(12345678).unwrap();
        builder.append_null();
        builder.append(22345678).unwrap();
        builder.append(12345678).unwrap();
        let d = builder.finish();

        one_column_roundtrip_with_schema(Arc::new(d), schema);
    }

    #[test]
    fn arrow_writer_decimal32_dictionary() {
        let integers = vec![12345, 56789, 34567];

        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

        let values = Decimal32Array::from(integers.clone())
            .with_precision_and_scale(5, 2)
            .unwrap();

        let array = DictionaryArray::new(keys, Arc::new(values));
        one_column_roundtrip(Arc::new(array.clone()), true);

        let values = Decimal32Array::from(integers)
            .with_precision_and_scale(9, 2)
            .unwrap();

        let array = array.with_values(Arc::new(values));
        one_column_roundtrip(Arc::new(array), true);
    }

    #[test]
    fn arrow_writer_decimal64_dictionary() {
        let integers = vec![12345, 56789, 34567];

        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

        let values = Decimal64Array::from(integers.clone())
            .with_precision_and_scale(5, 2)
            .unwrap();

        let array = DictionaryArray::new(keys, Arc::new(values));
        one_column_roundtrip(Arc::new(array.clone()), true);

        let values = Decimal64Array::from(integers)
            .with_precision_and_scale(12, 2)
            .unwrap();

        let array = array.with_values(Arc::new(values));
        one_column_roundtrip(Arc::new(array), true);
    }

    #[test]
    fn arrow_writer_decimal128_dictionary() {
        let integers = vec![12345, 56789, 34567];

        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

        let values = Decimal128Array::from(integers.clone())
            .with_precision_and_scale(5, 2)
            .unwrap();

        let array = DictionaryArray::new(keys, Arc::new(values));
        one_column_roundtrip(Arc::new(array.clone()), true);

        let values = Decimal128Array::from(integers)
            .with_precision_and_scale(12, 2)
            .unwrap();

        let array = array.with_values(Arc::new(values));
        one_column_roundtrip(Arc::new(array), true);
    }

    #[test]
    fn arrow_writer_decimal256_dictionary() {
        let integers = vec![
            i256::from_i128(12345),
            i256::from_i128(56789),
            i256::from_i128(34567),
        ];

        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

        let values = Decimal256Array::from(integers.clone())
            .with_precision_and_scale(5, 2)
            .unwrap();

        let array = DictionaryArray::new(keys, Arc::new(values));
        one_column_roundtrip(Arc::new(array.clone()), true);

        let values = Decimal256Array::from(integers)
            .with_precision_and_scale(12, 2)
            .unwrap();

        let array = array.with_values(Arc::new(values));
        one_column_roundtrip(Arc::new(array), true);
    }

    #[test]
    fn arrow_writer_string_dictionary_unsigned_index() {
        #[allow(deprecated)]
        let schema = Arc::new(Schema::new(vec![Field::new_dict(
            "dictionary",
            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
            true,
            42,
            true,
        )]));

        let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
            .iter()
            .copied()
            .collect();

        one_column_roundtrip_with_schema(Arc::new(d), schema);
    }

    #[test]
    fn u32_min_max() {
        // Values span the point where a u32 reinterpreted as i32 turns negative
        let src = [
            u32::MIN,
            u32::MIN + 1,
            (i32::MAX as u32) - 1,
            i32::MAX as u32,
            (i32::MAX as u32) + 1,
            u32::MAX - 1,
            u32::MAX,
        ];
        let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
        let files = one_column_roundtrip(values, false);

        for file in files {
            let reader = SerializedFileReader::new(file).unwrap();
            let metadata = reader.metadata();

            let mut row_offset = 0;
            for row_group in metadata.row_groups() {
                assert_eq!(row_group.num_columns(), 1);
                let column = row_group.column(0);

                let num_values = column.num_values() as usize;
                let src_slice = &src[row_offset..row_offset + num_values];
                row_offset += column.num_values() as usize;

                let stats = column.statistics().unwrap();
                if let Statistics::Int32(stats) = stats {
                    assert_eq!(
                        *stats.min_opt().unwrap() as u32,
                        *src_slice.iter().min().unwrap()
                    );
                    assert_eq!(
                        *stats.max_opt().unwrap() as u32,
                        *src_slice.iter().max().unwrap()
                    );
                } else {
                    panic!("Statistics::Int32 missing")
                }
            }
        }
    }

    #[test]
    fn u64_min_max() {
        let src = [
            u64::MIN,
            u64::MIN + 1,
            (i64::MAX as u64) - 1,
            i64::MAX as u64,
            (i64::MAX as u64) + 1,
            u64::MAX - 1,
            u64::MAX,
        ];
        let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
        let files = one_column_roundtrip(values, false);

        for file in files {
            let reader = SerializedFileReader::new(file).unwrap();
            let metadata = reader.metadata();

            let mut row_offset = 0;
            for row_group in metadata.row_groups() {
                assert_eq!(row_group.num_columns(), 1);
                let column = row_group.column(0);

                let num_values = column.num_values() as usize;
                let src_slice = &src[row_offset..row_offset + num_values];
                row_offset += column.num_values() as usize;

                let stats = column.statistics().unwrap();
                if let Statistics::Int64(stats) = stats {
                    assert_eq!(
                        *stats.min_opt().unwrap() as u64,
                        *src_slice.iter().min().unwrap()
                    );
                    assert_eq!(
                        *stats.max_opt().unwrap() as u64,
                        *src_slice.iter().max().unwrap()
                    );
                } else {
                    panic!("Statistics::Int64 missing")
                }
            }
        }
    }

    #[test]
    fn statistics_null_counts_only_nulls() {
        let values = Arc::new(UInt64Array::from(vec![None, None]));
        let files = one_column_roundtrip(values, true);

        for file in files {
            let reader = SerializedFileReader::new(file).unwrap();
            let metadata = reader.metadata();
            assert_eq!(metadata.num_row_groups(), 1);
            let row_group = metadata.row_group(0);
            assert_eq!(row_group.num_columns(), 1);
            let column = row_group.column(0);
            let stats = column.statistics().unwrap();
            assert_eq!(stats.null_count_opt(), Some(2));
        }
    }

    #[test]
    fn test_list_of_struct_roundtrip() {
        // List contents:
        // [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]
        let int_field = Field::new("a", DataType::Int32, true);
        let int_field2 = Field::new("b", DataType::Int32, true);

        let int_builder = Int32Builder::with_capacity(10);
        let int_builder2 = Int32Builder::with_capacity(10);

        let struct_builder = StructBuilder::new(
            vec![int_field, int_field2],
            vec![Box::new(int_builder), Box::new(int_builder2)],
        );
        let mut list_builder = ListBuilder::new(struct_builder);

        // [{a: 1, b: 2}]
        let values = list_builder.values();
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_value(1);
        values
            .field_builder::<Int32Builder>(1)
            .unwrap()
            .append_value(2);
        values.append(true);
        list_builder.append(true);

        // []
        list_builder.append(true);

        // null
        list_builder.append(false);

        // [null, null]
        let values = list_builder.values();
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_null();
        values
            .field_builder::<Int32Builder>(1)
            .unwrap()
            .append_null();
        values.append(false);
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_null();
        values
            .field_builder::<Int32Builder>(1)
            .unwrap()
            .append_null();
        values.append(false);
        list_builder.append(true);

        // [{a: null, b: 3}]
        let values = list_builder.values();
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_null();
        values
            .field_builder::<Int32Builder>(1)
            .unwrap()
            .append_value(3);
        values.append(true);
        list_builder.append(true);

        // [{a: 2, b: null}]
        let values = list_builder.values();
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_value(2);
        values
            .field_builder::<Int32Builder>(1)
            .unwrap()
            .append_null();
        values.append(true);
        list_builder.append(true);

        let array = Arc::new(list_builder.finish());

        one_column_roundtrip(array, true);
    }

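    /// Returns the number of rows in each row group of `metadata`.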
    fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
        metadata.row_groups().iter().map(|x| x.num_rows()).collect()
    }

    #[test]
    fn test_aggregates_records() {
        let arrays = [
            Int32Array::from((0..100).collect::<Vec<_>>()),
            Int32Array::from((0..50).collect::<Vec<_>>()),
            Int32Array::from((200..500).collect::<Vec<_>>()),
        ];

        let schema = Arc::new(Schema::new(vec![Field::new(
            "int",
            ArrowDataType::Int32,
            false,
        )]));

        let file = tempfile::tempfile().unwrap();

        let props = WriterProperties::builder()
            .set_max_row_group_row_count(Some(200))
            .build();

        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();

        for array in arrays {
            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
            writer.write(&batch).unwrap();
        }

        writer.close().unwrap();

        // 450 rows with a 200-row limit aggregate across batches into row
        // groups of 200, 200 and 50
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);

        let batches = builder
            .with_batch_size(100)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(batches.len(), 5);
        assert!(batches.iter().all(|x| x.num_columns() == 1));

        let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();

        assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);

        let values: Vec<_> = batches
            .iter()
            .flat_map(|x| {
                x.column(0)
                    .as_any()
                    .downcast_ref::<Int32Array>()
                    .unwrap()
                    .values()
                    .iter()
                    .cloned()
            })
            .collect();

        let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
        assert_eq!(&values, &expected_values)
    }
4091
4092 #[test]
4093 fn complex_aggregate() {
4094 let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
4096 let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
4097 let struct_a = Arc::new(Field::new(
4098 "struct_a",
4099 DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
4100 true,
4101 ));
4102
4103 let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
4104 let struct_b = Arc::new(Field::new(
4105 "struct_b",
4106 DataType::Struct(vec![list_a.clone()].into()),
4107 false,
4108 ));
4109
4110 let schema = Arc::new(Schema::new(vec![struct_b]));
4111
4112 let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
4114 let field_b_array =
4115 Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
4116
4117 let struct_a_array = StructArray::from(vec![
4118 (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
4119 (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
4120 ]);
4121
4122 let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
4123 .len(5)
4124 .add_buffer(Buffer::from_iter(vec![
4125 0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
4126 ]))
4127 .null_bit_buffer(Some(Buffer::from_iter(vec![
4128 true, false, true, false, true,
4129 ])))
4130 .child_data(vec![struct_a_array.into_data()])
4131 .build()
4132 .unwrap();
4133
4134 let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
4135 let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
4136
4137 let batch1 =
4138 RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
4139 .unwrap();
4140
        let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
        let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);

        let struct_a_array = StructArray::from(vec![
            (field_a, Arc::new(field_a_array) as ArrayRef),
            (field_b, Arc::new(field_b_array) as ArrayRef),
        ]);

        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
            .len(2)
            .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
            .child_data(vec![struct_a_array.into_data()])
            .build()
            .unwrap();

        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
        let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);

        let batch2 =
            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
                .unwrap();

        let batches = &[batch1, batch2];

        let expected = r#"
        +-------------------------------------------------------------------------------------------------------+
        | struct_b                                                                                              |
        +-------------------------------------------------------------------------------------------------------+
        | {list: [{leaf_a: 1, leaf_b: 1}]}                                                                      |
        | {list: }                                                                                              |
        | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}                                               |
        | {list: }                                                                                              |
        | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}                                                |
        | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
        | {list: [{leaf_a: 10, leaf_b: }]}                                                                      |
        +-------------------------------------------------------------------------------------------------------+
        "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");

        let actual = pretty_format_batches(batches).unwrap().to_string();
        assert_eq!(actual, expected);

        let file = tempfile::tempfile().unwrap();
        let props = WriterProperties::builder()
            .set_max_row_group_row_count(Some(6))
            .build();

        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();

        for batch in batches {
            writer.write(batch).unwrap();
        }
        writer.close().unwrap();

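        // 7 rows with a 6-row cap => row groups of [6, 1]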
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);

        let batches = builder
            .with_batch_size(2)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(batches.len(), 4);
        let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
        assert_eq!(&batch_counts, &[2, 2, 2, 1]);

        let actual = pretty_format_batches(&batches).unwrap().to_string();
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_arrow_writer_metadata() {
        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
        let file_schema = batch_schema.clone().with_metadata(
            vec![("foo".to_string(), "bar".to_string())]
                .into_iter()
                .collect(),
        );

        // Writing should succeed even though the batch schema lacks the key/value
        // metadata present on the file schema
        let batch = RecordBatch::try_new(
            Arc::new(batch_schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
    }

    #[test]
    fn test_arrow_writer_nullable() {
        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
        let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
        let file_schema = Arc::new(file_schema);

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

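        // Reading back yields the nullable file schema rather than the batch's
        // non-nullable schema, while the values themselves are unchanged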
        let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
        let back = read.next().unwrap().unwrap();
        assert_eq!(back.schema(), file_schema);
        assert_ne!(back.schema(), batch.schema());
        assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
    }

    #[test]
    fn in_progress_accounting() {
        // define schema
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

        // create some data
        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);

        // build a record batch
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();

        // starts empty
        assert_eq!(writer.in_progress_size(), 0);
        assert_eq!(writer.in_progress_rows(), 0);
        assert_eq!(writer.memory_size(), 0);
        assert_eq!(writer.bytes_written(), 4); // the 4-byte "PAR1" magic has already been written
        writer.write(&batch).unwrap();

        // updated on write
        let initial_size = writer.in_progress_size();
        assert!(initial_size > 0);
        assert_eq!(writer.in_progress_rows(), 5);
        let initial_memory = writer.memory_size();
        assert!(initial_memory > 0);
        // the memory estimate is no smaller than the estimated encoded size
        assert!(
            initial_size <= initial_memory,
            "{initial_size} <= {initial_memory}"
        );

        // updated on a second write
        writer.write(&batch).unwrap();
        assert!(writer.in_progress_size() > initial_size);
        assert_eq!(writer.in_progress_rows(), 10);
        assert!(writer.memory_size() > initial_memory);
        assert!(
            writer.in_progress_size() <= writer.memory_size(),
            "in_progress_size {} <= memory_size {}",
            writer.in_progress_size(),
            writer.memory_size()
        );

        // in-progress tracking is cleared, and data is written out, on flush
        let pre_flush_bytes_written = writer.bytes_written();
        writer.flush().unwrap();
        assert_eq!(writer.in_progress_size(), 0);
        assert_eq!(writer.memory_size(), 0);
        assert!(writer.bytes_written() > pre_flush_bytes_written);

        writer.close().unwrap();
    }

    #[test]
    fn test_writer_all_null() {
        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
        let batch = RecordBatch::try_from_iter(vec![
            ("a", Arc::new(a) as ArrayRef),
            ("b", Arc::new(b) as ArrayRef),
        ])
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let bytes = Bytes::from(buf);
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
        let index = reader.metadata().offset_index().unwrap();

        assert_eq!(index.len(), 1); // 1 row group
        assert_eq!(index[0].len(), 2); // 2 columns
        assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
        assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
    }

    #[test]
    fn test_disabled_statistics_with_page() {
        let file_schema = Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
        ]);
        let file_schema = Arc::new(file_schema);

        let batch = RecordBatch::try_new(
            file_schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
            ],
        )
        .unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
            .build();

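        // Statistics are disabled file-wide but re-enabled at page level for
        // column "a", so only "a" should get chunk statistics and a column index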
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
        writer.write(&batch).unwrap();

        let metadata = writer.close().unwrap();
        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        assert_eq!(row_group.num_columns(), 2);
        // Column "a" has both indexes written
        assert!(row_group.column(0).offset_index_offset().is_some());
        assert!(row_group.column(0).column_index_offset().is_some());
        // Column "b" only has an offset index
        assert!(row_group.column(1).offset_index_offset().is_some());
        assert!(row_group.column(1).column_index_offset().is_none());

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();

        let row_group = reader.get_row_group(0).unwrap();
        let a_col = row_group.metadata().column(0);
        let b_col = row_group.metadata().column(1);

        // Column chunk of column "a" should have chunk-level statistics
        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
            let min = byte_array_stats.min_opt().unwrap();
            let max = byte_array_stats.max_opt().unwrap();

            assert_eq!(min.as_bytes(), b"a");
            assert_eq!(max.as_bytes(), b"d");
        } else {
            panic!("expecting Statistics::ByteArray");
        }

        // Column chunk of column "b" should not have statistics
        assert!(b_col.statistics().is_none());

        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1); // 1 row group
        assert_eq!(offset_index[0].len(), 2); // 2 columns

        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1); // 1 row group
        assert_eq!(column_index[0].len(), 2); // 2 columns

        // Column "a" has a column index, column "b" does not
        let a_idx = &column_index[0][0];
        assert!(
            matches!(a_idx, ColumnIndexMetaData::BYTE_ARRAY(_)),
            "{a_idx:?}"
        );
        let b_idx = &column_index[0][1];
        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
    }

    #[test]
    fn test_disabled_statistics_with_chunk() {
        let file_schema = Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
        ]);
        let file_schema = Arc::new(file_schema);

        let batch = RecordBatch::try_new(
            file_schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
            ],
        )
        .unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
            .build();

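        // Chunk-level statistics for column "a" only: chunk stats are written,
        // but no column index, since page statistics remain disabled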
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
        writer.write(&batch).unwrap();

        let metadata = writer.close().unwrap();
        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        assert_eq!(row_group.num_columns(), 2);
        // Both columns have an offset index, neither has a column index
        assert!(row_group.column(0).offset_index_offset().is_some());
        assert!(row_group.column(0).column_index_offset().is_none());
        assert!(row_group.column(1).offset_index_offset().is_some());
        assert!(row_group.column(1).column_index_offset().is_none());

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();

        let row_group = reader.get_row_group(0).unwrap();
        let a_col = row_group.metadata().column(0);
        let b_col = row_group.metadata().column(1);

        // Column chunk of column "a" should have chunk-level statistics
        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
            let min = byte_array_stats.min_opt().unwrap();
            let max = byte_array_stats.max_opt().unwrap();

            assert_eq!(min.as_bytes(), b"a");
            assert_eq!(max.as_bytes(), b"d");
        } else {
            panic!("expecting Statistics::ByteArray");
        }

        // Column chunk of column "b" should not have statistics
        assert!(b_col.statistics().is_none());

        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1); // 1 row group
        assert_eq!(column_index[0].len(), 2); // 2 columns

        // Neither column has a column index
        let a_idx = &column_index[0][0];
        assert!(matches!(a_idx, ColumnIndexMetaData::NONE), "{a_idx:?}");
        let b_idx = &column_index[0][1];
        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
    }

    #[test]
    fn test_arrow_writer_skip_metadata() {
        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
        let file_schema = Arc::new(batch_schema.clone());

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();
        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);

        let mut buf = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let bytes = Bytes::from(buf);
        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
        assert_eq!(file_schema, *reader_builder.schema());
        if let Some(key_value_metadata) = reader_builder
            .metadata()
            .file_metadata()
            .key_value_metadata()
        {
            assert!(
                !key_value_metadata
                    .iter()
                    .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY)
            );
        }
    }

    #[test]
    fn mismatched_schemas() {
        let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
        let file_schema = Arc::new(Schema::new(vec![Field::new(
            "temperature",
            DataType::Float64,
            false,
        )]));

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();

        let err = writer.write(&batch).unwrap_err().to_string();
        assert_eq!(
            err,
            "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
        );
    }

    #[test]
    fn test_roundtrip_empty_schema() {
        // Write an empty batch: no columns, zero rows
        let empty_batch = RecordBatch::try_new_with_options(
            Arc::new(Schema::empty()),
            vec![],
            &RecordBatchOptions::default().with_row_count(Some(0)),
        )
        .unwrap();

        let mut parquet_bytes: Vec<u8> = Vec::new();
        let mut writer =
            ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
        writer.write(&empty_batch).unwrap();
        writer.close().unwrap();

        // Read it back and verify the empty schema survives the roundtrip
        let bytes = Bytes::from(parquet_bytes);
        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
        assert_eq!(reader.schema(), &empty_batch.schema());
        let batches: Vec<_> = reader
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();
        assert_eq!(batches.len(), 0);
    }

    #[test]
    fn test_page_stats_not_written_by_default() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let schema = Schema::new(vec![string_field]);
        let raw_string_values = vec!["Blart Versenwald III"];
        let string_values = StringArray::from(raw_string_values.clone());
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let file = roundtrip_opts(&batch, props);

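        // Decode the first page header directly with the thrift API, skipping the
        // 4-byte "PAR1" magic at the start of the file: page-header statistics
        // should be absent unless explicitly requested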
        let first_page = &file[4..];
        let mut prot = ThriftSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;

        assert!(stats.is_none());
    }

    #[test]
    fn test_page_stats_when_enabled() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let schema = Schema::new(vec![string_field]);
        let raw_string_values = vec!["Blart Versenwald III", "Andrew Lamb"];
        let string_values = StringArray::from(raw_string_values.clone());
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_write_page_header_statistics(true)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let file = roundtrip_opts(&batch, props);

        // Decode the first page header, skipping the 4-byte magic
        let first_page = &file[4..];
        let mut prot = ThriftSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;

        let stats = stats.unwrap();
        assert!(stats.is_max_value_exact.unwrap());
        assert!(stats.is_min_value_exact.unwrap());
        // "Andrew Lamb" sorts before "Blart Versenwald III"
        assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
    }

    #[test]
    fn test_page_stats_truncation() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let binary_field = Field::new("b", DataType::Binary, false);
        let schema = Schema::new(vec![string_field, binary_field]);

        let raw_string_values = vec!["Blart Versenwald III"];
        let raw_binary_values = [b"Blart Versenwald III".to_vec()];
        let raw_binary_value_refs = raw_binary_values
            .iter()
            .map(|x| x.as_slice())
            .collect::<Vec<_>>();

        let string_values = StringArray::from(raw_string_values.clone());
        let binary_values = BinaryArray::from(raw_binary_value_refs);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(string_values), Arc::new(binary_values)],
        )
        .unwrap();

        let props = WriterProperties::builder()
            .set_statistics_truncate_length(Some(2))
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_write_page_header_statistics(true)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let file = roundtrip_opts(&batch, props);

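        // With statistics_truncate_length = 2, the min is truncated to its first
        // two bytes ("Bl") and the max is truncated with its last byte incremented
        // ("Bm") so it remains a valid upper bound; both are marked inexact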
        // Decode the first page header (column "a"), skipping the 4-byte magic
        let first_page = &file[4..];
        let mut prot = ThriftSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;
        assert!(stats.is_some());
        let stats = stats.unwrap();
        assert!(!stats.is_max_value_exact.unwrap());
        assert!(!stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());

        // Decode the second page header (column "b"): same truncation expected
        let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..];
        let mut prot = ThriftSliceInputProtocol::new(second_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;
        assert!(stats.is_some());
        let stats = stats.unwrap();
        assert!(!stats.is_max_value_exact.unwrap());
        assert!(!stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
    }

    #[test]
    fn test_page_encoding_statistics_roundtrip() {
        let batch_schema = Schema::new(vec![Field::new(
            "int32",
            arrow_schema::DataType::Int32,
            false,
        )]);

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut file: File = tempfile::tempfile().unwrap();
        let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
        writer.write(&batch).unwrap();
        let file_metadata = writer.close().unwrap();

        assert_eq!(file_metadata.num_row_groups(), 1);
        assert_eq!(file_metadata.row_group(0).num_columns(), 1);
        assert!(
            file_metadata
                .row_group(0)
                .column(0)
                .page_encoding_stats()
                .is_some()
        );
        let chunk_page_stats = file_metadata
            .row_group(0)
            .column(0)
            .page_encoding_stats()
            .unwrap();

        // Read the metadata back, requesting full page encoding stats rather than
        // the compact mask representation
        let options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_encoding_stats_as_mask(false)
            .build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rowgroup = reader.get_row_group(0).expect("row group missing");
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.page_encoding_stats().is_some());
        let file_page_stats = column.page_encoding_stats().unwrap();
        assert_eq!(chunk_page_stats, file_page_stats);
    }

    #[test]
    fn test_different_dict_page_size_limit() {
        let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
        let schema = Arc::new(Schema::new(vec![
            Field::new("col0", arrow_schema::DataType::Int64, false),
            Field::new("col1", arrow_schema::DataType::Int64, false),
        ]));
        let batch =
            arrow_array::RecordBatch::try_new(schema.clone(), vec![array.clone(), array]).unwrap();

        let props = WriterProperties::builder()
            .set_dictionary_page_size_limit(1024 * 1024)
            .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
            .build();
        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
        writer.write(&batch).unwrap();
        let data = Bytes::from(writer.into_inner().unwrap());

        let mut metadata = ParquetMetaDataReader::new();
        metadata.try_parse(&data).unwrap();
        let metadata = metadata.finish().unwrap();
        let col0_meta = metadata.row_group(0).column(0);
        let col1_meta = metadata.row_group(0).column(1);

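        // Read the first page of each column chunk: it should be the dictionary
        // page, sized by the default limit for col0 and the per-column override
        // for col1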
        let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
            let mut reader =
                SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
            let page = reader.get_next_page().unwrap().unwrap();
            match page {
                Page::DictionaryPage { buf, .. } => buf.len(),
                _ => panic!("expected DictionaryPage"),
            }
        };

        assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
        assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
    }

    struct WriteBatchesShape {
        num_batches: usize,
        rows_per_batch: usize,
        row_size: usize,
    }

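    /// Writes `num_batches` batches of `rows_per_batch` zero-padded strings, each
    /// `row_size` bytes long, using the given properties, and returns a reader
    /// builder over the resulting file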
    fn write_batches(
        WriteBatchesShape {
            num_batches,
            rows_per_batch,
            row_size,
        }: WriteBatchesShape,
        props: WriterProperties,
    ) -> ParquetRecordBatchReaderBuilder<File> {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "str",
            ArrowDataType::Utf8,
            false,
        )]));
        let file = tempfile::tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();

        for batch_idx in 0..num_batches {
            let strings: Vec<String> = (0..rows_per_batch)
                .map(|i| format!("{:0>width$}", batch_idx * 10 + i, width = row_size))
                .collect();
            let array = StringArray::from(strings);
            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();
        ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
    }

    #[test]
    fn test_row_group_limit_none_writes_single_row_group() {
        let props = WriterProperties::builder()
            .set_max_row_group_row_count(None)
            .set_max_row_group_bytes(None)
            .build();

        let builder = write_batches(
            WriteBatchesShape {
                num_batches: 1,
                rows_per_batch: 1000,
                row_size: 4,
            },
            props,
        );

        assert_eq!(
            &row_group_sizes(builder.metadata()),
            &[1000],
            "With no limits, all rows should be in a single row group"
        );
    }

    #[test]
    fn test_row_group_limit_rows_only() {
        let props = WriterProperties::builder()
            .set_max_row_group_row_count(Some(300))
            .set_max_row_group_bytes(None)
            .build();

        let builder = write_batches(
            WriteBatchesShape {
                num_batches: 1,
                rows_per_batch: 1000,
                row_size: 4,
            },
            props,
        );

        assert_eq!(
            &row_group_sizes(builder.metadata()),
            &[300, 300, 300, 100],
            "Row groups should be split by row count"
        );
    }

    #[test]
    fn test_row_group_limit_bytes_only() {
        // 100 rows of ~100 bytes each, against a 3500-byte cap per row group
        let props = WriterProperties::builder()
            .set_max_row_group_row_count(None)
            .set_max_row_group_bytes(Some(3500))
            .build();

        let builder = write_batches(
            WriteBatchesShape {
                num_batches: 10,
                rows_per_batch: 10,
                row_size: 100,
            },
            props,
        );

        let sizes = row_group_sizes(builder.metadata());

        assert!(
            sizes.len() > 1,
            "Should have multiple row groups due to byte limit, got {sizes:?}",
        );

        let total_rows: i64 = sizes.iter().sum();
        assert_eq!(total_rows, 100, "Total rows should be preserved");
    }

    #[test]
    fn test_row_group_limit_bytes_flushes_when_current_group_already_too_large() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "str",
            ArrowDataType::Utf8,
            false,
        )]));
        let file = tempfile::tempfile().unwrap();

        // Start with no limits so the first batch stays buffered in one group
        let props = WriterProperties::builder()
            .set_max_row_group_row_count(None)
            .set_max_row_group_bytes(None)
            .build();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();

        let first_array = StringArray::from(
            (0..10)
                .map(|i| format!("{:0>100}", i))
                .collect::<Vec<String>>(),
        );
        let first_batch =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(first_array)]).unwrap();
        writer.write(&first_batch).unwrap();
        assert_eq!(writer.in_progress_rows(), 10);

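        // Shrink the byte limit below the already-buffered size by mutating the
        // writer's field directly (visible from this tests module); the next write
        // must flush the oversized in-progress row group before appending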
        writer.max_row_group_bytes = Some(1);

        let second_array = StringArray::from(vec!["x".to_string()]);
        let second_batch =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(second_array)]).unwrap();
        writer.write(&second_batch).unwrap();
        writer.close().unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

        assert_eq!(
            &row_group_sizes(builder.metadata()),
            &[10, 1],
            "The second write should flush an oversized in-progress row group first",
        );
    }

    #[test]
    fn test_row_group_limit_both_row_wins_single_batch() {
        let props = WriterProperties::builder()
            .set_max_row_group_row_count(Some(200)) // row limit is the tighter bound
            .set_max_row_group_bytes(Some(1024 * 1024)) // byte limit should not be reached
            .build();

        let builder = write_batches(
            WriteBatchesShape {
                num_batches: 1,
                rows_per_batch: 1000,
                row_size: 4,
            },
            props,
        );

        assert_eq!(
            &row_group_sizes(builder.metadata()),
            &[200, 200, 200, 200, 200],
            "Row limit should trigger before byte limit"
        );
    }

    #[test]
    fn test_row_group_limit_both_row_wins_multiple_batches() {
        let props = WriterProperties::builder()
            .set_max_row_group_row_count(Some(5)) // row limit is the tighter bound
            .set_max_row_group_bytes(Some(9999)) // byte limit should not be reached
            .build();

        let builder = write_batches(
            WriteBatchesShape {
                num_batches: 10,
                rows_per_batch: 10,
                row_size: 100,
            },
            props,
        );

        assert_eq!(
            &row_group_sizes(builder.metadata()),
            &[5; 20],
            "Row limit should trigger before byte limit"
        );
    }

    #[test]
    fn test_row_group_limit_both_bytes_wins() {
        let props = WriterProperties::builder()
            .set_max_row_group_row_count(Some(1000)) // row limit should not be reached
            .set_max_row_group_bytes(Some(3500)) // byte limit is the tighter bound
            .build();

        let builder = write_batches(
            WriteBatchesShape {
                num_batches: 10,
                rows_per_batch: 10,
                row_size: 100,
            },
            props,
        );

        let sizes = row_group_sizes(builder.metadata());

        assert!(
            sizes.len() > 1,
            "Byte limit should trigger before row limit, got {sizes:?}",
        );

        assert!(
            sizes.iter().all(|&s| s < 1000),
            "No row group should hit the row limit"
        );

        let total_rows: i64 = sizes.iter().sum();
        assert_eq!(total_rows, 100, "Total rows should be preserved");
    }
}