1use crate::column::chunker::ContentDefinedChunker;
21
22use bytes::Bytes;
23use std::io::{Read, Write};
24use std::iter::Peekable;
25use std::slice::Iter;
26use std::sync::{Arc, Mutex};
27use std::vec::IntoIter;
28
29use arrow_array::cast::AsArray;
30use arrow_array::types::*;
31use arrow_array::{ArrayRef, Int32Array, RecordBatch, RecordBatchWriter};
32use arrow_schema::{
33 ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef, TimeUnit,
34};
35
36use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
37
38use crate::arrow::ArrowSchemaConverter;
39use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
40use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
41use crate::column::page_encryption::PageEncryptor;
42use crate::column::writer::encoder::ColumnValueEncoder;
43use crate::column::writer::{
44 ColumnCloseResult, ColumnWriter, GenericColumnWriter, get_column_writer,
45};
46use crate::data_type::{ByteArray, FixedLenByteArray};
47#[cfg(feature = "encryption")]
48use crate::encryption::encrypt::FileEncryptor;
49use crate::errors::{ParquetError, Result};
50use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
51use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
52use crate::file::reader::{ChunkReader, Length};
53use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
54use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
55use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
56use levels::{ArrayLevels, calculate_array_levels};
57
58mod byte_array;
59mod levels;
60
/// Writes Arrow [`RecordBatch`]es through a [`SerializedFileWriter`].
///
/// Rows are buffered in an in-progress row group and moved to the underlying
/// file writer when that row group is flushed.
pub struct ArrowWriter<W: Write> {
    /// Underlying file writer that receives completed row groups.
    writer: SerializedFileWriter<W>,

    /// Row group currently being buffered; `None` until the first non-empty
    /// batch is written and again after every flush.
    in_progress: Option<ArrowRowGroupWriter>,

    /// Arrow schema supplied at construction time.
    arrow_schema: SchemaRef,

    /// Factory used to create a fresh [`ArrowRowGroupWriter`] for each row group.
    row_group_writer_factory: ArrowRowGroupWriterFactory,

    /// Row-count limit per row group, read from the writer properties.
    max_row_group_row_count: Option<usize>,

    /// Estimated-byte limit per row group, read from the writer properties.
    max_row_group_bytes: Option<usize>,

    /// One chunker per leaf column; only present when the writer properties
    /// enable content-defined chunking.
    cdc_chunkers: Option<Vec<ContentDefinedChunker>>,
}
201
202impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
203 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204 let buffered_memory = self.in_progress_size();
205 f.debug_struct("ArrowWriter")
206 .field("writer", &self.writer)
207 .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
208 .field("in_progress_rows", &self.in_progress_rows())
209 .field("arrow_schema", &self.arrow_schema)
210 .field("max_row_group_row_count", &self.max_row_group_row_count)
211 .field("max_row_group_bytes", &self.max_row_group_bytes)
212 .finish()
213 }
214}
215
216impl<W: Write + Send> ArrowWriter<W> {
217 pub fn try_new(
223 writer: W,
224 arrow_schema: SchemaRef,
225 props: Option<WriterProperties>,
226 ) -> Result<Self> {
227 let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
228 Self::try_new_with_options(writer, arrow_schema, options)
229 }
230
231 pub fn try_new_with_options(
237 writer: W,
238 arrow_schema: SchemaRef,
239 options: ArrowWriterOptions,
240 ) -> Result<Self> {
241 let mut props = options.properties;
242
243 let schema = if let Some(parquet_schema) = options.schema_descr {
244 parquet_schema.clone()
245 } else {
246 let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
247 if let Some(schema_root) = &options.schema_root {
248 converter = converter.schema_root(schema_root);
249 }
250
251 converter.convert(&arrow_schema)?
252 };
253
254 if !options.skip_arrow_metadata {
255 add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
257 }
258
259 let max_row_group_row_count = props.max_row_group_row_count();
260 let max_row_group_bytes = props.max_row_group_bytes();
261
262 let props_ptr = Arc::new(props);
263 let file_writer =
264 SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
265
266 let row_group_writer_factory =
267 ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());
268
269 let cdc_chunkers = props_ptr
270 .content_defined_chunking()
271 .map(|opts| {
272 file_writer
273 .schema_descr()
274 .columns()
275 .iter()
276 .map(|desc| ContentDefinedChunker::new(desc, opts))
277 .collect::<Result<Vec<_>>>()
278 })
279 .transpose()?;
280
281 Ok(Self {
282 writer: file_writer,
283 in_progress: None,
284 arrow_schema,
285 row_group_writer_factory,
286 max_row_group_row_count,
287 max_row_group_bytes,
288 cdc_chunkers,
289 })
290 }
291
292 pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
294 self.writer.flushed_row_groups()
295 }
296
297 pub fn memory_size(&self) -> usize {
302 match &self.in_progress {
303 Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
304 None => 0,
305 }
306 }
307
308 pub fn in_progress_size(&self) -> usize {
315 match &self.in_progress {
316 Some(in_progress) => in_progress
317 .writers
318 .iter()
319 .map(|x| x.get_estimated_total_bytes())
320 .sum(),
321 None => 0,
322 }
323 }
324
325 pub fn in_progress_rows(&self) -> usize {
327 self.in_progress
328 .as_ref()
329 .map(|x| x.buffered_rows)
330 .unwrap_or_default()
331 }
332
333 pub fn bytes_written(&self) -> usize {
335 self.writer.bytes_written()
336 }
337
338 pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
350 if batch.num_rows() == 0 {
351 return Ok(());
352 }
353
354 let in_progress = match &mut self.in_progress {
355 Some(in_progress) => in_progress,
356 x => x.insert(
357 self.row_group_writer_factory
358 .create_row_group_writer(self.writer.flushed_row_groups().len())?,
359 ),
360 };
361
362 if let Some(max_rows) = self.max_row_group_row_count {
363 if in_progress.buffered_rows + batch.num_rows() > max_rows {
364 let to_write = max_rows - in_progress.buffered_rows;
365 let a = batch.slice(0, to_write);
366 let b = batch.slice(to_write, batch.num_rows() - to_write);
367 self.write(&a)?;
368 return self.write(&b);
369 }
370 }
371
372 if let Some(max_bytes) = self.max_row_group_bytes {
375 if in_progress.buffered_rows > 0 {
376 let current_bytes = in_progress.get_estimated_total_bytes();
377
378 if current_bytes >= max_bytes {
379 self.flush()?;
380 return self.write(batch);
381 }
382
383 let avg_row_bytes = current_bytes / in_progress.buffered_rows;
384 if avg_row_bytes > 0 {
385 let remaining_bytes = max_bytes - current_bytes;
387 let rows_that_fit = remaining_bytes / avg_row_bytes;
388
389 if batch.num_rows() > rows_that_fit {
390 if rows_that_fit > 0 {
391 let a = batch.slice(0, rows_that_fit);
392 let b = batch.slice(rows_that_fit, batch.num_rows() - rows_that_fit);
393 self.write(&a)?;
394 return self.write(&b);
395 } else {
396 self.flush()?;
397 return self.write(batch);
398 }
399 }
400 }
401 }
402 }
403
404 match self.cdc_chunkers.as_mut() {
405 Some(chunkers) => in_progress.write_with_chunkers(batch, chunkers)?,
406 None => in_progress.write(batch)?,
407 }
408
409 let should_flush = self
410 .max_row_group_row_count
411 .is_some_and(|max| in_progress.buffered_rows >= max)
412 || self
413 .max_row_group_bytes
414 .is_some_and(|max| in_progress.get_estimated_total_bytes() >= max);
415
416 if should_flush {
417 self.flush()?
418 }
419 Ok(())
420 }
421
422 pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
427 self.writer.write_all(buf)
428 }
429
430 pub fn sync(&mut self) -> std::io::Result<()> {
432 self.writer.flush()
433 }
434
435 pub fn flush(&mut self) -> Result<()> {
440 let in_progress = match self.in_progress.take() {
441 Some(in_progress) => in_progress,
442 None => return Ok(()),
443 };
444
445 let mut row_group_writer = self.writer.next_row_group()?;
446 for chunk in in_progress.close()? {
447 chunk.append_to_row_group(&mut row_group_writer)?;
448 }
449 row_group_writer.close()?;
450 Ok(())
451 }
452
453 pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
457 self.writer.append_key_value_metadata(kv_metadata)
458 }
459
460 pub fn inner(&self) -> &W {
462 self.writer.inner()
463 }
464
465 pub fn inner_mut(&mut self) -> &mut W {
474 self.writer.inner_mut()
475 }
476
477 pub fn into_inner(mut self) -> Result<W> {
479 self.flush()?;
480 self.writer.into_inner()
481 }
482
483 pub fn finish(&mut self) -> Result<ParquetMetaData> {
489 self.flush()?;
490 self.writer.finish()
491 }
492
493 pub fn close(mut self) -> Result<ParquetMetaData> {
495 self.finish()
496 }
497
498 #[deprecated(
500 since = "56.2.0",
501 note = "Use `ArrowRowGroupWriterFactory` instead, see `ArrowColumnWriter` for an example"
502 )]
503 pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
504 self.flush()?;
505 let in_progress = self
506 .row_group_writer_factory
507 .create_row_group_writer(self.writer.flushed_row_groups().len())?;
508 Ok(in_progress.writers)
509 }
510
511 #[deprecated(
513 since = "56.2.0",
514 note = "Use `SerializedFileWriter` directly instead, see `ArrowColumnWriter` for an example"
515 )]
516 pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
517 let mut row_group_writer = self.writer.next_row_group()?;
518 for chunk in chunks {
519 chunk.append_to_row_group(&mut row_group_writer)?;
520 }
521 row_group_writer.close()?;
522 Ok(())
523 }
524
525 pub fn into_serialized_writer(
532 mut self,
533 ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
534 self.flush()?;
535 Ok((self.writer, self.row_group_writer_factory))
536 }
537}
538
539impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
540 fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
541 self.write(batch).map_err(|e| e.into())
542 }
543
544 fn close(self) -> std::result::Result<(), ArrowError> {
545 self.close()?;
546 Ok(())
547 }
548}
549
/// Options consumed by `ArrowWriter::try_new_with_options`.
#[derive(Debug, Clone, Default)]
pub struct ArrowWriterOptions {
    /// Parquet writer properties.
    properties: WriterProperties,
    /// When `true`, the encoded Arrow schema is not added to the file metadata.
    skip_arrow_metadata: bool,
    /// Optional root name passed to the Arrow-to-Parquet schema converter.
    schema_root: Option<String>,
    /// Optional explicit Parquet schema; when set, no schema conversion is done.
    schema_descr: Option<SchemaDescriptor>,
}
560
561impl ArrowWriterOptions {
562 pub fn new() -> Self {
564 Self::default()
565 }
566
567 pub fn with_properties(self, properties: WriterProperties) -> Self {
569 Self { properties, ..self }
570 }
571
572 pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
579 Self {
580 skip_arrow_metadata,
581 ..self
582 }
583 }
584
585 pub fn with_schema_root(self, schema_root: String) -> Self {
587 Self {
588 schema_root: Some(schema_root),
589 ..self
590 }
591 }
592
593 pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
599 Self {
600 schema_descr: Some(schema_descr),
601 ..self
602 }
603 }
604}
605
/// Encoded bytes of one column chunk, kept as a list of buffers.
#[derive(Default)]
struct ArrowColumnChunkData {
    /// Total number of bytes appended so far (headers plus page data).
    length: usize,
    /// The appended buffers, in write order.
    data: Vec<Bytes>,
}
612
613impl Length for ArrowColumnChunkData {
614 fn len(&self) -> u64 {
615 self.length as _
616 }
617}
618
619impl ChunkReader for ArrowColumnChunkData {
620 type T = ArrowColumnChunkReader;
621
622 fn get_read(&self, start: u64) -> Result<Self::T> {
623 assert_eq!(start, 0); Ok(ArrowColumnChunkReader(
625 self.data.clone().into_iter().peekable(),
626 ))
627 }
628
629 fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
630 unimplemented!()
631 }
632}
633
/// A [`Read`] implementation over the buffers of an [`ArrowColumnChunkData`].
struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
636
637impl Read for ArrowColumnChunkReader {
638 fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
639 let buffer = loop {
640 match self.0.peek_mut() {
641 Some(b) if b.is_empty() => {
642 self.0.next();
643 continue;
644 }
645 Some(b) => break b,
646 None => return Ok(0),
647 }
648 };
649
650 let len = buffer.len().min(out.len());
651 let b = buffer.split_to(len);
652 out[..len].copy_from_slice(&b);
653 Ok(len)
654 }
655}
656
/// Column chunk buffer shared between an [`ArrowPageWriter`], which appends
/// pages to it, and the [`ArrowColumnWriter`] that takes it over on close.
type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
662
/// A [`PageWriter`] that appends encoded pages to an in-memory
/// [`SharedColumnChunk`] instead of writing them to a file.
#[derive(Default)]
struct ArrowPageWriter {
    /// Destination buffer for page headers and page data.
    buffer: SharedColumnChunk,
    /// Encryptor applied to pages and page headers, if the column is encrypted.
    #[cfg(feature = "encryption")]
    page_encryptor: Option<PageEncryptor>,
}
669
670impl ArrowPageWriter {
671 #[cfg(feature = "encryption")]
672 pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
673 self.page_encryptor = page_encryptor;
674 self
675 }
676
677 #[cfg(feature = "encryption")]
678 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
679 self.page_encryptor.as_mut()
680 }
681
682 #[cfg(not(feature = "encryption"))]
683 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
684 None
685 }
686}
687
688impl PageWriter for ArrowPageWriter {
689 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
690 let page = match self.page_encryptor_mut() {
691 Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
692 None => page,
693 };
694
695 let page_header = page.to_thrift_header()?;
696 let header = {
697 let mut header = Vec::with_capacity(1024);
698
699 match self.page_encryptor_mut() {
700 Some(page_encryptor) => {
701 page_encryptor.encrypt_page_header(&page_header, &mut header)?;
702 if page.compressed_page().is_data_page() {
703 page_encryptor.increment_page();
704 }
705 }
706 None => {
707 let mut protocol = ThriftCompactOutputProtocol::new(&mut header);
708 page_header.write_thrift(&mut protocol)?;
709 }
710 };
711
712 Bytes::from(header)
713 };
714
715 let mut buf = self.buffer.try_lock().unwrap();
716
717 let data = page.compressed_page().buffer().clone();
718 let compressed_size = data.len() + header.len();
719
720 let mut spec = PageWriteSpec::new();
721 spec.page_type = page.page_type();
722 spec.num_values = page.num_values();
723 spec.uncompressed_size = page.uncompressed_size() + header.len();
724 spec.offset = buf.length as u64;
725 spec.compressed_size = compressed_size;
726 spec.bytes_written = compressed_size as u64;
727
728 buf.length += compressed_size;
729 buf.data.push(header);
730 buf.data.push(data);
731
732 Ok(spec)
733 }
734
735 fn close(&mut self) -> Result<()> {
736 Ok(())
737 }
738}
739
/// A single leaf column, wrapping the [`ArrayLevels`] computed for it by
/// [`compute_leaves`].
#[derive(Debug)]
pub struct ArrowLeafColumn(ArrayLevels);
743
744pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
749 let levels = calculate_array_levels(array, field)?;
750 Ok(levels.into_iter().map(ArrowLeafColumn).collect())
751}
752
/// The encoded data of a closed column, produced by `ArrowColumnWriter::close`.
pub struct ArrowColumnChunk {
    /// Buffered page bytes of the column chunk.
    data: ArrowColumnChunkData,
    /// Result returned by the column writer when it was closed.
    close: ColumnCloseResult,
}
758
759impl std::fmt::Debug for ArrowColumnChunk {
760 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
761 f.debug_struct("ArrowColumnChunk")
762 .field("length", &self.data.length)
763 .finish_non_exhaustive()
764 }
765}
766
767impl ArrowColumnChunk {
768 pub fn close(&self) -> &ColumnCloseResult {
775 &self.close
776 }
777
778 pub fn close_mut(&mut self) -> &mut ColumnCloseResult {
785 &mut self.close
786 }
787
788 pub fn append_to_row_group<W: Write + Send>(
790 self,
791 writer: &mut SerializedRowGroupWriter<'_, W>,
792 ) -> Result<()> {
793 writer.append_column(&self.data, self.close)
794 }
795}
796
/// Encodes [`ArrowLeafColumn`]s into an in-memory [`ArrowColumnChunk`].
pub struct ArrowColumnWriter {
    /// The concrete column writer doing the encoding.
    writer: ArrowColumnWriterImpl,
    /// Buffer shared with the page writer owned by `writer`.
    chunk: SharedColumnChunk,
}
898
899impl std::fmt::Debug for ArrowColumnWriter {
900 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
901 f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
902 }
903}
904
/// The two kinds of column writer an [`ArrowColumnWriter`] can wrap.
enum ArrowColumnWriterImpl {
    /// Writer using the Arrow-specific [`ByteArrayEncoder`].
    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
    /// Generic [`ColumnWriter`], used for all other column types.
    Column(ColumnWriter<'static>),
}
909
910impl ArrowColumnWriter {
911 pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
913 self.write_internal(&col.0)
914 }
915
916 fn write_with_chunker(
918 &mut self,
919 col: &ArrowLeafColumn,
920 chunker: &mut ContentDefinedChunker,
921 ) -> Result<()> {
922 let levels = &col.0;
923 let chunks = chunker.get_arrow_chunks(
924 levels.def_level_data().as_ref(),
925 levels.rep_level_data().as_ref(),
926 levels.array(),
927 )?;
928
929 let num_chunks = chunks.len();
930 for (i, chunk) in chunks.iter().enumerate() {
931 let chunk_levels = levels.slice_for_chunk(chunk);
932 self.write_internal(&chunk_levels)?;
933
934 if i + 1 < num_chunks {
936 match &mut self.writer {
937 ArrowColumnWriterImpl::Column(c) => c.add_data_page()?,
938 ArrowColumnWriterImpl::ByteArray(c) => c.add_data_page()?,
939 }
940 }
941 }
942 Ok(())
943 }
944
945 fn write_internal(&mut self, levels: &ArrayLevels) -> Result<()> {
946 match &mut self.writer {
947 ArrowColumnWriterImpl::Column(c) => {
948 let leaf = levels.array();
949 match leaf.as_any_dictionary_opt() {
950 Some(dictionary) => {
951 let materialized =
952 arrow_select::take::take(dictionary.values(), dictionary.keys(), None)?;
953 write_leaf(c, &materialized, levels)?
954 }
955 None => write_leaf(c, leaf, levels)?,
956 };
957 }
958 ArrowColumnWriterImpl::ByteArray(c) => {
959 write_primitive(c, levels.array().as_ref(), levels)?;
960 }
961 }
962 Ok(())
963 }
964
965 pub fn close(self) -> Result<ArrowColumnChunk> {
967 let close = match self.writer {
968 ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
969 ArrowColumnWriterImpl::Column(c) => c.close()?,
970 };
971 let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
972 let data = chunk.into_inner().unwrap();
973 Ok(ArrowColumnChunk { data, close })
974 }
975
976 pub fn memory_size(&self) -> usize {
987 match &self.writer {
988 ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
989 ArrowColumnWriterImpl::Column(c) => c.memory_size(),
990 }
991 }
992
993 pub fn get_estimated_total_bytes(&self) -> usize {
1001 match &self.writer {
1002 ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
1003 ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
1004 }
1005 }
1006}
1007
/// Buffers the rows of one row group, with one [`ArrowColumnWriter`] per leaf
/// column.
#[derive(Debug)]
struct ArrowRowGroupWriter {
    /// Column writers, in leaf-column order.
    writers: Vec<ArrowColumnWriter>,
    /// Arrow schema whose fields are paired with the columns of each batch.
    schema: SchemaRef,
    /// Number of rows passed to this row group so far.
    buffered_rows: usize,
}
1020
1021impl ArrowRowGroupWriter {
1022 fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
1023 Self {
1024 writers,
1025 schema: arrow.clone(),
1026 buffered_rows: 0,
1027 }
1028 }
1029
1030 fn write(&mut self, batch: &RecordBatch) -> Result<()> {
1031 self.buffered_rows += batch.num_rows();
1032 let mut writers = self.writers.iter_mut();
1033 for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
1034 for leaf in compute_leaves(field.as_ref(), column)? {
1035 writers.next().unwrap().write(&leaf)?;
1036 }
1037 }
1038 Ok(())
1039 }
1040
1041 fn write_with_chunkers(
1042 &mut self,
1043 batch: &RecordBatch,
1044 chunkers: &mut [ContentDefinedChunker],
1045 ) -> Result<()> {
1046 self.buffered_rows += batch.num_rows();
1047 let mut writers = self.writers.iter_mut();
1048 let mut chunkers = chunkers.iter_mut();
1049 for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
1050 for leaf in compute_leaves(field.as_ref(), column)? {
1051 writers
1052 .next()
1053 .unwrap()
1054 .write_with_chunker(&leaf, chunkers.next().unwrap())?;
1055 }
1056 }
1057 Ok(())
1058 }
1059
1060 fn get_estimated_total_bytes(&self) -> usize {
1062 self.writers
1063 .iter()
1064 .map(|x| x.get_estimated_total_bytes())
1065 .sum()
1066 }
1067
1068 fn close(self) -> Result<Vec<ArrowColumnChunk>> {
1069 self.writers
1070 .into_iter()
1071 .map(|writer| writer.close())
1072 .collect()
1073 }
1074}
1075
/// Creates the column writers for each row group of a file.
#[derive(Debug)]
pub struct ArrowRowGroupWriterFactory {
    /// Parquet schema descriptor, taken from the file writer.
    schema: SchemaDescPtr,
    /// Arrow schema whose fields determine the column writers to create.
    arrow_schema: SchemaRef,
    /// Writer properties, taken from the file writer.
    props: WriterPropertiesPtr,
    /// File encryptor, taken from the file writer, if any.
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
1088
1089impl ArrowRowGroupWriterFactory {
1090 pub fn new<W: Write + Send>(
1092 file_writer: &SerializedFileWriter<W>,
1093 arrow_schema: SchemaRef,
1094 ) -> Self {
1095 let schema = Arc::clone(file_writer.schema_descr_ptr());
1096 let props = Arc::clone(file_writer.properties());
1097 Self {
1098 schema,
1099 arrow_schema,
1100 props,
1101 #[cfg(feature = "encryption")]
1102 file_encryptor: file_writer.file_encryptor(),
1103 }
1104 }
1105
1106 fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
1107 let writers = self.create_column_writers(row_group_index)?;
1108 Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
1109 }
1110
1111 pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
1113 let mut writers = Vec::with_capacity(self.arrow_schema.fields.len());
1114 let mut leaves = self.schema.columns().iter();
1115 let column_factory = self.column_writer_factory(row_group_index);
1116 for field in &self.arrow_schema.fields {
1117 column_factory.get_arrow_column_writer(
1118 field.data_type(),
1119 &self.props,
1120 &mut leaves,
1121 &mut writers,
1122 )?;
1123 }
1124 Ok(writers)
1125 }
1126
1127 #[cfg(feature = "encryption")]
1128 fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
1129 ArrowColumnWriterFactory::new()
1130 .with_file_encryptor(row_group_idx, self.file_encryptor.clone())
1131 }
1132
1133 #[cfg(not(feature = "encryption"))]
1134 fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory {
1135 ArrowColumnWriterFactory::new()
1136 }
1137}
1138
1139#[deprecated(since = "57.0.0", note = "Use `ArrowRowGroupWriterFactory` instead")]
1141pub fn get_column_writers(
1142 parquet: &SchemaDescriptor,
1143 props: &WriterPropertiesPtr,
1144 arrow: &SchemaRef,
1145) -> Result<Vec<ArrowColumnWriter>> {
1146 let mut writers = Vec::with_capacity(arrow.fields.len());
1147 let mut leaves = parquet.columns().iter();
1148 let column_factory = ArrowColumnWriterFactory::new();
1149 for field in &arrow.fields {
1150 column_factory.get_arrow_column_writer(
1151 field.data_type(),
1152 props,
1153 &mut leaves,
1154 &mut writers,
1155 )?;
1156 }
1157 Ok(writers)
1158}
1159
/// Creates [`ArrowColumnWriter`]s for the leaf columns of an Arrow data type.
struct ArrowColumnWriterFactory {
    /// Index of the row group the writers belong to; passed to the page
    /// encryptor.
    #[cfg(feature = "encryption")]
    row_group_index: usize,
    /// File encryptor used to create page encryptors, if any.
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
1167
1168impl ArrowColumnWriterFactory {
1169 pub fn new() -> Self {
1170 Self {
1171 #[cfg(feature = "encryption")]
1172 row_group_index: 0,
1173 #[cfg(feature = "encryption")]
1174 file_encryptor: None,
1175 }
1176 }
1177
1178 #[cfg(feature = "encryption")]
1179 pub fn with_file_encryptor(
1180 mut self,
1181 row_group_index: usize,
1182 file_encryptor: Option<Arc<FileEncryptor>>,
1183 ) -> Self {
1184 self.row_group_index = row_group_index;
1185 self.file_encryptor = file_encryptor;
1186 self
1187 }
1188
1189 #[cfg(feature = "encryption")]
1190 fn create_page_writer(
1191 &self,
1192 column_descriptor: &ColumnDescPtr,
1193 column_index: usize,
1194 ) -> Result<Box<ArrowPageWriter>> {
1195 let column_path = column_descriptor.path().string();
1196 let page_encryptor = PageEncryptor::create_if_column_encrypted(
1197 &self.file_encryptor,
1198 self.row_group_index,
1199 column_index,
1200 &column_path,
1201 )?;
1202 Ok(Box::new(
1203 ArrowPageWriter::default().with_encryptor(page_encryptor),
1204 ))
1205 }
1206
1207 #[cfg(not(feature = "encryption"))]
1208 fn create_page_writer(
1209 &self,
1210 _column_descriptor: &ColumnDescPtr,
1211 _column_index: usize,
1212 ) -> Result<Box<ArrowPageWriter>> {
1213 Ok(Box::<ArrowPageWriter>::default())
1214 }
1215
1216 fn get_arrow_column_writer(
1219 &self,
1220 data_type: &ArrowDataType,
1221 props: &WriterPropertiesPtr,
1222 leaves: &mut Iter<'_, ColumnDescPtr>,
1223 out: &mut Vec<ArrowColumnWriter>,
1224 ) -> Result<()> {
1225 let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1227 let page_writer = self.create_page_writer(desc, out.len())?;
1228 let chunk = page_writer.buffer.clone();
1229 let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
1230 Ok(ArrowColumnWriter {
1231 chunk,
1232 writer: ArrowColumnWriterImpl::Column(writer),
1233 })
1234 };
1235
1236 let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1238 let page_writer = self.create_page_writer(desc, out.len())?;
1239 let chunk = page_writer.buffer.clone();
1240 let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
1241 Ok(ArrowColumnWriter {
1242 chunk,
1243 writer: ArrowColumnWriterImpl::ByteArray(writer),
1244 })
1245 };
1246
1247 match data_type {
1248 _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
1249 ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
1250 out.push(col(leaves.next().unwrap())?)
1251 }
1252 ArrowDataType::LargeBinary
1253 | ArrowDataType::Binary
1254 | ArrowDataType::Utf8
1255 | ArrowDataType::LargeUtf8
1256 | ArrowDataType::BinaryView
1257 | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
1258 ArrowDataType::List(f)
1259 | ArrowDataType::LargeList(f)
1260 | ArrowDataType::FixedSizeList(f, _)
1261 | ArrowDataType::ListView(f)
1262 | ArrowDataType::LargeListView(f) => {
1263 self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
1264 }
1265 ArrowDataType::Struct(fields) => {
1266 for field in fields {
1267 self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
1268 }
1269 }
1270 ArrowDataType::Map(f, _) => match f.data_type() {
1271 ArrowDataType::Struct(f) => {
1272 self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
1273 self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
1274 }
1275 _ => unreachable!("invalid map type"),
1276 },
1277 ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1278 ArrowDataType::Utf8
1279 | ArrowDataType::LargeUtf8
1280 | ArrowDataType::Binary
1281 | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
1282 ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
1283 out.push(bytes(leaves.next().unwrap())?)
1284 }
1285 ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
1286 _ => out.push(col(leaves.next().unwrap())?),
1287 },
1288 _ => {
1289 return Err(ParquetError::NYI(format!(
1290 "Attempting to write an Arrow type {data_type} to parquet that is not yet implemented"
1291 )));
1292 }
1293 }
1294 Ok(())
1295 }
1296}
1297
1298fn write_leaf(
1299 writer: &mut ColumnWriter<'_>,
1300 column: &dyn arrow_array::Array,
1301 levels: &ArrayLevels,
1302) -> Result<usize> {
1303 let indices = levels.non_null_indices();
1304
1305 match writer {
1306 ColumnWriter::Int32ColumnWriter(typed) => {
1308 match column.data_type() {
1309 ArrowDataType::Null => {
1310 let array = Int32Array::new_null(column.len());
1311 write_primitive(typed, array.values(), levels)
1312 }
1313 ArrowDataType::Int8 => {
1314 let array: Int32Array = column.as_primitive::<Int8Type>().unary(|x| x as i32);
1315 write_primitive(typed, array.values(), levels)
1316 }
1317 ArrowDataType::Int16 => {
1318 let array: Int32Array = column.as_primitive::<Int16Type>().unary(|x| x as i32);
1319 write_primitive(typed, array.values(), levels)
1320 }
1321 ArrowDataType::Int32 => {
1322 write_primitive(typed, column.as_primitive::<Int32Type>().values(), levels)
1323 }
1324 ArrowDataType::UInt8 => {
1325 let array: Int32Array = column.as_primitive::<UInt8Type>().unary(|x| x as i32);
1326 write_primitive(typed, array.values(), levels)
1327 }
1328 ArrowDataType::UInt16 => {
1329 let array: Int32Array = column.as_primitive::<UInt16Type>().unary(|x| x as i32);
1330 write_primitive(typed, array.values(), levels)
1331 }
1332 ArrowDataType::UInt32 => {
1333 let array = column.as_primitive::<UInt32Type>();
1336 write_primitive(typed, array.values().inner().typed_data(), levels)
1337 }
1338 ArrowDataType::Date32 => {
1339 let array = column.as_primitive::<Date32Type>();
1340 write_primitive(typed, array.values(), levels)
1341 }
1342 ArrowDataType::Time32(TimeUnit::Second) => {
1343 let array = column.as_primitive::<Time32SecondType>();
1344 write_primitive(typed, array.values(), levels)
1345 }
1346 ArrowDataType::Time32(TimeUnit::Millisecond) => {
1347 let array = column.as_primitive::<Time32MillisecondType>();
1348 write_primitive(typed, array.values(), levels)
1349 }
1350 ArrowDataType::Date64 => {
1351 let array: Int32Array = column
1353 .as_primitive::<Date64Type>()
1354 .unary(|x| (x / 86_400_000) as _);
1355
1356 write_primitive(typed, array.values(), levels)
1357 }
1358 ArrowDataType::Decimal32(_, _) => {
1359 let array = column
1360 .as_primitive::<Decimal32Type>()
1361 .unary::<_, Int32Type>(|v| v);
1362 write_primitive(typed, array.values(), levels)
1363 }
1364 ArrowDataType::Decimal64(_, _) => {
1365 let array = column
1367 .as_primitive::<Decimal64Type>()
1368 .unary::<_, Int32Type>(|v| v as i32);
1369 write_primitive(typed, array.values(), levels)
1370 }
1371 ArrowDataType::Decimal128(_, _) => {
1372 let array = column
1374 .as_primitive::<Decimal128Type>()
1375 .unary::<_, Int32Type>(|v| v as i32);
1376 write_primitive(typed, array.values(), levels)
1377 }
1378 ArrowDataType::Decimal256(_, _) => {
1379 let array = column
1381 .as_primitive::<Decimal256Type>()
1382 .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1383 write_primitive(typed, array.values(), levels)
1384 }
1385 d => Err(ParquetError::General(format!("Cannot coerce {d} to I32"))),
1386 }
1387 }
1388 ColumnWriter::BoolColumnWriter(typed) => {
1389 let array = column.as_boolean();
1390 let values = get_bool_array_slice(array, indices.iter().copied());
1391 typed.write_batch_internal(
1392 values.as_slice(),
1393 None,
1394 levels.def_level_data().as_ref(),
1395 levels.rep_level_data().as_ref(),
1396 None,
1397 None,
1398 None,
1399 )
1400 }
1401 ColumnWriter::Int64ColumnWriter(typed) => {
1402 match column.data_type() {
1403 ArrowDataType::Date64 => {
1404 let array = column
1405 .as_primitive::<Date64Type>()
1406 .reinterpret_cast::<Int64Type>();
1407
1408 write_primitive(typed, array.values(), levels)
1409 }
1410 ArrowDataType::Int64 => {
1411 let array = column.as_primitive::<Int64Type>();
1412 write_primitive(typed, array.values(), levels)
1413 }
1414 ArrowDataType::UInt64 => {
1415 let values = column.as_primitive::<UInt64Type>().values();
1416 let array = values.inner().typed_data::<i64>();
1419 write_primitive(typed, array, levels)
1420 }
1421 ArrowDataType::Time64(TimeUnit::Microsecond) => {
1422 let array = column.as_primitive::<Time64MicrosecondType>();
1423 write_primitive(typed, array.values(), levels)
1424 }
1425 ArrowDataType::Time64(TimeUnit::Nanosecond) => {
1426 let array = column.as_primitive::<Time64NanosecondType>();
1427 write_primitive(typed, array.values(), levels)
1428 }
1429 ArrowDataType::Timestamp(unit, _) => match unit {
1430 TimeUnit::Second => {
1431 let array = column.as_primitive::<TimestampSecondType>();
1432 write_primitive(typed, array.values(), levels)
1433 }
1434 TimeUnit::Millisecond => {
1435 let array = column.as_primitive::<TimestampMillisecondType>();
1436 write_primitive(typed, array.values(), levels)
1437 }
1438 TimeUnit::Microsecond => {
1439 let array = column.as_primitive::<TimestampMicrosecondType>();
1440 write_primitive(typed, array.values(), levels)
1441 }
1442 TimeUnit::Nanosecond => {
1443 let array = column.as_primitive::<TimestampNanosecondType>();
1444 write_primitive(typed, array.values(), levels)
1445 }
1446 },
1447 ArrowDataType::Duration(unit) => match unit {
1448 TimeUnit::Second => {
1449 let array = column.as_primitive::<DurationSecondType>();
1450 write_primitive(typed, array.values(), levels)
1451 }
1452 TimeUnit::Millisecond => {
1453 let array = column.as_primitive::<DurationMillisecondType>();
1454 write_primitive(typed, array.values(), levels)
1455 }
1456 TimeUnit::Microsecond => {
1457 let array = column.as_primitive::<DurationMicrosecondType>();
1458 write_primitive(typed, array.values(), levels)
1459 }
1460 TimeUnit::Nanosecond => {
1461 let array = column.as_primitive::<DurationNanosecondType>();
1462 write_primitive(typed, array.values(), levels)
1463 }
1464 },
1465 ArrowDataType::Decimal64(_, _) => {
1466 let array = column
1467 .as_primitive::<Decimal64Type>()
1468 .reinterpret_cast::<Int64Type>();
1469 write_primitive(typed, array.values(), levels)
1470 }
1471 ArrowDataType::Decimal128(_, _) => {
1472 let array = column
1474 .as_primitive::<Decimal128Type>()
1475 .unary::<_, Int64Type>(|v| v as i64);
1476 write_primitive(typed, array.values(), levels)
1477 }
1478 ArrowDataType::Decimal256(_, _) => {
1479 let array = column
1481 .as_primitive::<Decimal256Type>()
1482 .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1483 write_primitive(typed, array.values(), levels)
1484 }
1485 d => Err(ParquetError::General(format!("Cannot coerce {d} to I64"))),
1486 }
1487 }
1488 ColumnWriter::Int96ColumnWriter(_typed) => {
1489 unreachable!("Currently unreachable because data type not supported")
1490 }
1491 ColumnWriter::FloatColumnWriter(typed) => {
1492 let array = column.as_primitive::<Float32Type>();
1493 write_primitive(typed, array.values(), levels)
1494 }
1495 ColumnWriter::DoubleColumnWriter(typed) => {
1496 let array = column.as_primitive::<Float64Type>();
1497 write_primitive(typed, array.values(), levels)
1498 }
1499 ColumnWriter::ByteArrayColumnWriter(_) => {
1500 unreachable!("should use ByteArrayWriter")
1501 }
1502 ColumnWriter::FixedLenByteArrayColumnWriter(typed) => {
1503 let bytes = match column.data_type() {
1504 ArrowDataType::Interval(interval_unit) => match interval_unit {
1505 IntervalUnit::YearMonth => {
1506 let array = column.as_primitive::<IntervalYearMonthType>();
1507 get_interval_ym_array_slice(array, indices.iter().copied())
1508 }
1509 IntervalUnit::DayTime => {
1510 let array = column.as_primitive::<IntervalDayTimeType>();
1511 get_interval_dt_array_slice(array, indices.iter().copied())
1512 }
1513 _ => {
1514 return Err(ParquetError::NYI(format!(
1515 "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
1516 )));
1517 }
1518 },
1519 ArrowDataType::FixedSizeBinary(_) => {
1520 let array = column.as_fixed_size_binary();
1521 get_fsb_array_slice(array, indices.iter().copied())
1522 }
1523 ArrowDataType::Decimal32(_, _) => {
1524 let array = column.as_primitive::<Decimal32Type>();
1525 get_decimal_32_array_slice(array, indices.iter().copied())
1526 }
1527 ArrowDataType::Decimal64(_, _) => {
1528 let array = column.as_primitive::<Decimal64Type>();
1529 get_decimal_64_array_slice(array, indices.iter().copied())
1530 }
1531 ArrowDataType::Decimal128(_, _) => {
1532 let array = column.as_primitive::<Decimal128Type>();
1533 get_decimal_128_array_slice(array, indices.iter().copied())
1534 }
1535 ArrowDataType::Decimal256(_, _) => {
1536 let array = column.as_primitive::<Decimal256Type>();
1537 get_decimal_256_array_slice(array, indices.iter().copied())
1538 }
1539 ArrowDataType::Float16 => {
1540 let array = column.as_primitive::<Float16Type>();
1541 get_float_16_array_slice(array, indices.iter().copied())
1542 }
1543 _ => {
1544 return Err(ParquetError::NYI(
1545 "Attempting to write an Arrow type that is not yet implemented".to_string(),
1546 ));
1547 }
1548 };
1549 typed.write_batch_internal(
1550 bytes.as_slice(),
1551 None,
1552 levels.def_level_data().as_ref(),
1553 levels.rep_level_data().as_ref(),
1554 None,
1555 None,
1556 None,
1557 )
1558 }
1559 }
1560}
1561
1562fn write_primitive<E: ColumnValueEncoder>(
1563 writer: &mut GenericColumnWriter<E>,
1564 values: &E::Values,
1565 levels: &ArrayLevels,
1566) -> Result<usize> {
1567 writer.write_batch_internal(
1568 values,
1569 Some(levels.non_null_indices()),
1570 levels.def_level_data().as_ref(),
1571 levels.rep_level_data().as_ref(),
1572 None,
1573 None,
1574 None,
1575 )
1576}
1577
1578fn get_bool_array_slice(
1579 array: &arrow_array::BooleanArray,
1580 indices: impl ExactSizeIterator<Item = usize>,
1581) -> Vec<bool> {
1582 let mut values = Vec::with_capacity(indices.len());
1583 for i in indices {
1584 values.push(array.value(i))
1585 }
1586 values
1587}
1588
1589fn get_interval_ym_array_slice(
1592 array: &arrow_array::IntervalYearMonthArray,
1593 indices: impl ExactSizeIterator<Item = usize>,
1594) -> Vec<FixedLenByteArray> {
1595 let mut values = Vec::with_capacity(indices.len());
1596 for i in indices {
1597 let mut value = array.value(i).to_le_bytes().to_vec();
1598 let mut suffix = vec![0; 8];
1599 value.append(&mut suffix);
1600 values.push(FixedLenByteArray::from(ByteArray::from(value)))
1601 }
1602 values
1603}
1604
1605fn get_interval_dt_array_slice(
1608 array: &arrow_array::IntervalDayTimeArray,
1609 indices: impl ExactSizeIterator<Item = usize>,
1610) -> Vec<FixedLenByteArray> {
1611 let mut values = Vec::with_capacity(indices.len());
1612 for i in indices {
1613 let mut out = [0; 12];
1614 let value = array.value(i);
1615 out[4..8].copy_from_slice(&value.days.to_le_bytes());
1616 out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1617 values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1618 }
1619 values
1620}
1621
1622fn get_decimal_32_array_slice(
1623 array: &arrow_array::Decimal32Array,
1624 indices: impl ExactSizeIterator<Item = usize>,
1625) -> Vec<FixedLenByteArray> {
1626 let mut values = Vec::with_capacity(indices.len());
1627 let size = decimal_length_from_precision(array.precision());
1628 for i in indices {
1629 let as_be_bytes = array.value(i).to_be_bytes();
1630 let resized_value = as_be_bytes[(4 - size)..].to_vec();
1631 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1632 }
1633 values
1634}
1635
1636fn get_decimal_64_array_slice(
1637 array: &arrow_array::Decimal64Array,
1638 indices: impl ExactSizeIterator<Item = usize>,
1639) -> Vec<FixedLenByteArray> {
1640 let mut values = Vec::with_capacity(indices.len());
1641 let size = decimal_length_from_precision(array.precision());
1642 for i in indices {
1643 let as_be_bytes = array.value(i).to_be_bytes();
1644 let resized_value = as_be_bytes[(8 - size)..].to_vec();
1645 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1646 }
1647 values
1648}
1649
1650fn get_decimal_128_array_slice(
1651 array: &arrow_array::Decimal128Array,
1652 indices: impl ExactSizeIterator<Item = usize>,
1653) -> Vec<FixedLenByteArray> {
1654 let mut values = Vec::with_capacity(indices.len());
1655 let size = decimal_length_from_precision(array.precision());
1656 for i in indices {
1657 let as_be_bytes = array.value(i).to_be_bytes();
1658 let resized_value = as_be_bytes[(16 - size)..].to_vec();
1659 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1660 }
1661 values
1662}
1663
1664fn get_decimal_256_array_slice(
1665 array: &arrow_array::Decimal256Array,
1666 indices: impl ExactSizeIterator<Item = usize>,
1667) -> Vec<FixedLenByteArray> {
1668 let mut values = Vec::with_capacity(indices.len());
1669 let size = decimal_length_from_precision(array.precision());
1670 for i in indices {
1671 let as_be_bytes = array.value(i).to_be_bytes();
1672 let resized_value = as_be_bytes[(32 - size)..].to_vec();
1673 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1674 }
1675 values
1676}
1677
1678fn get_float_16_array_slice(
1679 array: &arrow_array::Float16Array,
1680 indices: impl ExactSizeIterator<Item = usize>,
1681) -> Vec<FixedLenByteArray> {
1682 let mut values = Vec::with_capacity(indices.len());
1683 for i in indices {
1684 let value = array.value(i).to_le_bytes().to_vec();
1685 values.push(FixedLenByteArray::from(ByteArray::from(value)));
1686 }
1687 values
1688}
1689
1690fn get_fsb_array_slice(
1691 array: &arrow_array::FixedSizeBinaryArray,
1692 indices: impl ExactSizeIterator<Item = usize>,
1693) -> Vec<FixedLenByteArray> {
1694 let mut values = Vec::with_capacity(indices.len());
1695 for i in indices {
1696 let value = array.value(i).to_vec();
1697 values.push(FixedLenByteArray::from(ByteArray::from(value)))
1698 }
1699 values
1700}
1701
1702#[cfg(test)]
1703mod tests {
1704 use super::*;
1705 use std::collections::HashMap;
1706
1707 use std::fs::File;
1708
1709 use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1710 use crate::arrow::{ARROW_SCHEMA_META_KEY, PARQUET_FIELD_ID_META_KEY};
1711 use crate::column::page::{Page, PageReader};
1712 use crate::file::metadata::thrift::PageHeader;
1713 use crate::file::page_index::column_index::ColumnIndexMetaData;
1714 use crate::file::reader::SerializedPageReader;
1715 use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
1716 use crate::schema::types::ColumnPath;
1717 use arrow::datatypes::ToByteSlice;
1718 use arrow::datatypes::{DataType, Schema};
1719 use arrow::error::Result as ArrowResult;
1720 use arrow::util::data_gen::create_random_array;
1721 use arrow::util::pretty::pretty_format_batches;
1722 use arrow::{array::*, buffer::Buffer};
1723 use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer, OffsetBuffer, i256};
1724 use arrow_schema::Fields;
1725 use half::f16;
1726 use num_traits::{FromPrimitive, ToPrimitive};
1727 use tempfile::tempfile;
1728
1729 use crate::basic::Encoding;
1730 use crate::data_type::AsBytes;
1731 use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
1732 use crate::file::properties::{
1733 BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1734 };
1735 use crate::file::serialized_reader::ReadOptionsBuilder;
1736 use crate::file::{
1737 reader::{FileReader, SerializedFileReader},
1738 statistics::Statistics,
1739 };
1740
#[test]
fn arrow_writer() {
    // Two Int32 columns of five rows: "a" is required, "b" is nullable and
    // carries two nulls.
    let fields = vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, true),
    ];
    let schema = Arc::new(Schema::new(fields));

    let required: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let nullable: ArrayRef = Arc::new(Int32Array::from(vec![
        Some(1),
        None,
        None,
        Some(4),
        Some(5),
    ]));

    let batch = RecordBatch::try_new(schema, vec![required, nullable]).unwrap();

    roundtrip(batch, Some(SMALL_SIZE / 2));
}
1758
1759 fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1760 let mut buffer = vec![];
1761
1762 let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
1763 writer.write(expected_batch).unwrap();
1764 writer.close().unwrap();
1765
1766 buffer
1767 }
1768
1769 fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1770 let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
1771 writer.write(expected_batch).unwrap();
1772 writer.into_inner().unwrap()
1773 }
1774
#[test]
fn roundtrip_bytes() {
    // The same batch is written twice, once retrieving the bytes after `close`
    // and once via `into_inner`; both outputs must read back to the original.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, true),
    ]));

    let col_a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let col_b: ArrayRef = Arc::new(Int32Array::from(vec![
        Some(1),
        None,
        None,
        Some(4),
        Some(5),
    ]));

    let expected_batch = RecordBatch::try_new(Arc::clone(&schema), vec![col_a, col_b]).unwrap();

    let written = [
        get_bytes_after_close(Arc::clone(&schema), &expected_batch),
        get_bytes_by_into_inner(schema, &expected_batch),
    ];

    for bytes in written {
        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(bytes), 1024).unwrap();

        let actual_batch = reader
            .next()
            .expect("No batch found")
            .expect("Unable to get batch");

        assert_eq!(expected_batch.schema(), actual_batch.schema());
        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());

        // Column counts were asserted equal above, so pairing them up visits
        // every column of both batches.
        let column_pairs = expected_batch.columns().iter().zip(actual_batch.columns());
        for (expected_col, actual_col) in column_pairs {
            assert_eq!(expected_col.to_data(), actual_col.to_data());
        }
    }
}
1814
#[test]
fn arrow_writer_non_null() {
    // A single required Int32 column with five values.
    let field = Field::new("a", DataType::Int32, false);
    let schema = Arc::new(Schema::new(vec![field]));

    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));

    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    roundtrip(batch, Some(SMALL_SIZE / 2));
}
1828
#[test]
fn arrow_writer_list() {
    // Nullable list column "a" whose items are non-nullable Int32.
    let list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false)));
    let schema = Arc::new(Schema::new(vec![Field::new("a", list_type.clone(), true)]));

    let child = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

    // Six offsets delimit the five list slots over the ten child values.
    let offsets = Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());

    // Validity bitmap with exactly one cleared bit among the five slots
    // (checked by the null-count assertion below).
    let validity = Buffer::from([0b00011011]);

    let list_data = ArrayData::builder(list_type)
        .len(5)
        .add_buffer(offsets)
        .add_child_data(child.into_data())
        .null_bit_buffer(Some(validity))
        .build()
        .unwrap();
    let column: ArrayRef = Arc::new(ListArray::from(list_data));

    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    assert_eq!(batch.column(0).null_count(), 1);

    roundtrip(batch, None);
}
1867
#[test]
fn arrow_writer_list_non_null() {
    // Required list column "a" whose items are non-nullable Int32.
    let list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false)));
    let schema = Arc::new(Schema::new(vec![Field::new("a", list_type.clone(), false)]));

    let child = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

    // Six offsets delimit the five list slots over the ten child values.
    let offsets = Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());

    // No validity bitmap is attached, so no slot is null.
    let list_data = ArrayData::builder(list_type)
        .len(5)
        .add_buffer(offsets)
        .add_child_data(child.into_data())
        .build()
        .unwrap();
    let column: ArrayRef = Arc::new(ListArray::from(list_data));

    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    assert_eq!(batch.column(0).null_count(), 0);

    roundtrip(batch, None);
}
1905
#[test]
fn arrow_writer_list_view() {
    // Nullable list-view column "a" over non-nullable Int32 items.
    let item_field = Arc::new(Field::new_list_field(DataType::Int32, false));
    let column_type = DataType::ListView(Arc::clone(&item_field));
    let schema = Arc::new(Schema::new(vec![Field::new("a", column_type, true)]));

    let child: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));
    // Five rows described by (offset, size) pairs; the third row is marked null.
    let validity = NullBuffer::from(vec![true, true, false, true, true]);
    let column = ListViewArray::new(
        item_field,
        vec![0, 1, 0, 3, 6].into(),
        vec![1, 2, 0, 3, 4].into(),
        child,
        Some(validity),
    );

    let batch = RecordBatch::try_new(schema, vec![Arc::new(column)]).unwrap();

    assert_eq!(batch.column(0).null_count(), 1);

    roundtrip(batch, None);
}
1930
#[test]
fn arrow_writer_list_view_non_null() {
    // Required list-view column "a" over non-nullable Int32 items.
    let item_field = Arc::new(Field::new_list_field(DataType::Int32, false));
    let column_type = DataType::ListView(Arc::clone(&item_field));
    let schema = Arc::new(Schema::new(vec![Field::new("a", column_type, false)]));

    let child: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));
    // Five rows described by (offset, size) pairs, with no validity buffer.
    let column = ListViewArray::new(
        item_field,
        vec![0, 1, 0, 3, 6].into(),
        vec![1, 2, 0, 3, 4].into(),
        child,
        None,
    );

    let batch = RecordBatch::try_new(schema, vec![Arc::new(column)]).unwrap();

    assert_eq!(batch.column(0).null_count(), 0);

    roundtrip(batch, None);
}
1955
#[test]
fn arrow_writer_list_view_out_of_order() {
    // Required list-view column whose offsets are not monotonically
    // increasing: the fourth row starts at 6 and the fifth at 3.
    let item_field = Arc::new(Field::new_list_field(DataType::Int32, false));
    let column_type = DataType::ListView(Arc::clone(&item_field));
    let schema = Arc::new(Schema::new(vec![Field::new("a", column_type, false)]));

    let child: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));
    let column = ListViewArray::new(
        item_field,
        vec![0, 1, 0, 6, 3].into(),
        vec![1, 2, 0, 4, 3].into(),
        child,
        None,
    );

    let batch = RecordBatch::try_new(schema, vec![Arc::new(column)]).unwrap();

    roundtrip(batch, None);
}
1978
#[test]
fn arrow_writer_large_list_view() {
    // Nullable large list-view column "a" (i64 offsets and sizes) over
    // non-nullable Int32 items.
    let item_field = Arc::new(Field::new_list_field(DataType::Int32, false));
    let column_type = DataType::LargeListView(Arc::clone(&item_field));
    let schema = Arc::new(Schema::new(vec![Field::new("a", column_type, true)]));

    let child: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));
    // Five rows described by (offset, size) pairs; the third row is marked null.
    let validity = NullBuffer::from(vec![true, true, false, true, true]);
    let column = LargeListViewArray::new(
        item_field,
        vec![0i64, 1, 0, 3, 6].into(),
        vec![1i64, 2, 0, 3, 4].into(),
        child,
        Some(validity),
    );

    let batch = RecordBatch::try_new(schema, vec![Arc::new(column)]).unwrap();

    assert_eq!(batch.column(0).null_count(), 1);

    roundtrip(batch, None);
}
2003
#[test]
fn arrow_writer_list_view_with_struct() {
    // Nullable list-view column whose items are structs of
    // { id: Int32, name: Utf8 }, both non-nullable.
    let struct_fields = Fields::from(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]);
    let item_type = DataType::Struct(struct_fields.clone());
    let item_field = Arc::new(Field::new("item", item_type, false));

    let column_type = DataType::ListView(Arc::clone(&item_field));
    let schema = Arc::new(Schema::new(vec![Field::new("a", column_type, true)]));

    // Five struct items backing the three list rows.
    let ids: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let names: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));
    let items = StructArray::new(struct_fields, vec![ids, names], None);

    // Rows as (offset, size): (0, 2), (2, 0) marked null, (2, 3).
    let validity = NullBuffer::from(vec![true, false, true]);
    let column = ListViewArray::new(
        item_field,
        vec![0, 2, 2].into(),
        vec![2, 0, 3].into(),
        Arc::new(items),
        Some(validity),
    );

    let batch = RecordBatch::try_new(schema, vec![Arc::new(column)]).unwrap();

    roundtrip(batch, None);
}
2042
#[test]
fn arrow_writer_binary() {
    // A required Utf8 column and a required Binary column with the same
    // four payloads.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Binary, false),
    ]));

    let strings = StringArray::from(vec!["foo", "bar", "baz", "quux"]);

    let payloads: [&[u8]; 4] = [b"foo", b"bar", b"baz", b"quux"];
    let binaries = BinaryArray::from(payloads.to_vec());

    let columns: Vec<ArrayRef> = vec![Arc::new(strings), Arc::new(binaries)];
    let batch = RecordBatch::try_new(schema, columns).unwrap();

    roundtrip(batch, Some(SMALL_SIZE / 2));
}
2071
#[test]
fn arrow_writer_binary_view() {
    // Columns: required Utf8View "a", required BinaryView "b" and a nullable
    // Utf8View that is also named "a".
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8View, false),
        Field::new("b", DataType::BinaryView, false),
        Field::new("a", DataType::Utf8View, true),
    ]));

    // Each column mixes short payloads with one longer than 12 bytes.
    let strings = StringViewArray::from(vec![
        "foo",
        "bar",
        "large payload over 12 bytes",
        "lulu",
    ]);
    let binaries = BinaryViewArray::from_iter_values(vec![
        b"foo".to_vec(),
        b"bar".to_vec(),
        b"large payload over 12 bytes".to_vec(),
        b"lulu".to_vec(),
    ]);
    let nullable_strings = StringViewArray::from(vec![
        Some("foo"),
        None,
        Some("large payload over 12 bytes"),
        None,
    ]);

    let columns: Vec<ArrayRef> = vec![
        Arc::new(strings),
        Arc::new(binaries),
        Arc::new(nullable_strings),
    ];
    let batch = RecordBatch::try_new(schema, columns).unwrap();

    roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
    roundtrip(batch, None);
}
2105
#[test]
fn arrow_writer_binary_view_long_value() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8View, false),
        Field::new("b", DataType::BinaryView, false),
    ]));

    // A 128-byte value placed between two short ones.
    let long_value = "a".repeat(128);

    let strings: ArrayRef = Arc::new(StringViewArray::from(vec![
        "foo",
        long_value.as_str(),
        "bar",
    ]));
    let binaries: ArrayRef = Arc::new(BinaryViewArray::from_iter_values(vec![
        b"foo".to_vec(),
        long_value.as_bytes().to_vec(),
        b"bar".to_vec(),
    ]));

    // Each column on its own first.
    one_column_roundtrip(Arc::clone(&strings), false);
    one_column_roundtrip(Arc::clone(&binaries), false);

    let batch = RecordBatch::try_new(schema, vec![strings, binaries]).unwrap();

    // Then both together, with dictionary encoding disabled, under each
    // writer version.
    for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
        let builder = WriterProperties::builder()
            .set_writer_version(version)
            .set_dictionary_enabled(false);
        roundtrip_opts(&batch, builder.build());
    }
}
2141
2142 fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
2143 let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
2144 let schema = Schema::new(vec![decimal_field]);
2145
2146 let decimal_values = vec![10_000, 50_000, 0, -100]
2147 .into_iter()
2148 .map(Some)
2149 .collect::<Decimal128Array>()
2150 .with_precision_and_scale(precision, scale)
2151 .unwrap();
2152
2153 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
2154 }
2155
#[test]
fn arrow_writer_decimal() {
    // The original local names tie these precisions to an int32-backed, an
    // int64-backed and a fixed-length-byte-array-backed decimal, in that order.
    for precision in [5, 12, 30] {
        let batch = get_decimal_batch(precision, 2);
        roundtrip(batch, Some(SMALL_SIZE / 2));
    }
}
2168
#[test]
fn arrow_writer_complex() {
    // Schema under test:
    //   a: Int32 (required)
    //   b: Int32 (nullable)
    //   c: struct (required) {
    //        d: Float64 (nullable)
    //        e: struct (required) {
    //             f: Float32 (nullable)
    //             g: list<Int16 nullable> (required)
    //             h: list<Int16 required> (nullable)
    //        }
    //   }
    let field_d = Arc::new(Field::new("d", DataType::Float64, true));
    let field_f = Arc::new(Field::new("f", DataType::Float32, true));
    let field_g = Arc::new(Field::new_list(
        "g",
        Field::new_list_field(DataType::Int16, true),
        false,
    ));
    let field_h = Arc::new(Field::new_list(
        "h",
        Field::new_list_field(DataType::Int16, false),
        true,
    ));
    let field_e = Arc::new(Field::new_struct(
        "e",
        vec![
            Arc::clone(&field_f),
            Arc::clone(&field_g),
            Arc::clone(&field_h),
        ],
        false,
    ));
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, true),
        Field::new_struct(
            "c",
            vec![Arc::clone(&field_d), Arc::clone(&field_e)],
            false,
        ),
    ]));

    // Leaf columns, five rows each.
    let col_a = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let col_b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
    let col_d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
    let col_f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);

    // Both list columns share the same child values and offsets.
    let list_values = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    let list_offsets = Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());

    // "g" has no validity bitmap.
    let col_g = ListArray::from(
        ArrayData::builder(field_g.data_type().clone())
            .len(5)
            .add_buffer(list_offsets.clone())
            .add_child_data(list_values.to_data())
            .build()
            .unwrap(),
    );
    // "h" additionally carries a validity bitmap.
    let col_h = ListArray::from(
        ArrayData::builder(field_h.data_type().clone())
            .len(5)
            .add_buffer(list_offsets)
            .add_child_data(list_values.to_data())
            .null_bit_buffer(Some(Buffer::from([0b00011011])))
            .build()
            .unwrap(),
    );

    let col_e = StructArray::from(vec![
        (field_f, Arc::new(col_f) as ArrayRef),
        (field_g, Arc::new(col_g) as ArrayRef),
        (field_h, Arc::new(col_h) as ArrayRef),
    ]);

    let col_c = StructArray::from(vec![
        (field_d, Arc::new(col_d) as ArrayRef),
        (field_e, Arc::new(col_e) as ArrayRef),
    ]);

    let columns: Vec<ArrayRef> = vec![Arc::new(col_a), Arc::new(col_b), Arc::new(col_c)];
    let batch = RecordBatch::try_new(schema, columns).unwrap();

    roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
    roundtrip(batch, Some(SMALL_SIZE / 3));
}
2254
#[test]
fn arrow_writer_complex_mixed() {
    // A required struct column mixing a required Int32, a nullable Int64 and
    // a nullable Utf8 child.
    let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
    let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
    let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));

    let struct_type = DataType::Struct(Fields::from(vec![
        Arc::clone(&offset_field),
        Arc::clone(&partition_field),
        Arc::clone(&topic_field),
    ]));
    let schema = Arc::new(Schema::new(vec![Field::new(
        "some_nested_object",
        struct_type,
        false,
    )]));

    let offsets: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let partitions: ArrayRef = Arc::new(Int64Array::from(vec![
        Some(1),
        None,
        None,
        Some(4),
        Some(5),
    ]));
    let topics: ArrayRef = Arc::new(StringArray::from(vec![
        Some("A"),
        None,
        Some("A"),
        Some(""),
        None,
    ]));

    let nested = StructArray::from(vec![
        (offset_field, offsets),
        (partition_field, partitions),
        (topic_field, topics),
    ]);

    let batch = RecordBatch::try_new(schema, vec![Arc::new(nested)]).unwrap();

    roundtrip(batch, Some(SMALL_SIZE / 2));
}
2291
#[test]
fn arrow_writer_map() {
    // Nullable map column "stocks" with required Utf8 keys and nullable Utf8
    // values; the batch is produced by decoding the JSON lines below.
    let entry_type = DataType::Struct(Fields::from(vec![
        Field::new("key", DataType::Utf8, false),
        Field::new("value", DataType::Utf8, true),
    ]));
    let entries_field = Arc::new(Field::new("entries", entry_type, false));
    let stocks_field = Field::new("stocks", DataType::Map(entries_field, false), true);
    let schema = Arc::new(Schema::new(vec![stocks_field]));

    let json_content = r#"
        {"stocks":{"long": "$AAA", "short": "$BBB"}}
        {"stocks":{"long": null, "long": "$CCC", "short": null}}
        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
        "#;

    let mut reader = arrow::json::ReaderBuilder::new(schema)
        .with_batch_size(64)
        .build(std::io::Cursor::new(json_content))
        .unwrap();

    let batch = reader.next().unwrap().unwrap();
    roundtrip(batch, None);
}
2319
#[test]
fn arrow_writer_2_level_struct() {
    // a: struct (nullable) { b: struct (nullable) { c: Int32 (nullable) } }
    let field_c = Field::new("c", DataType::Int32, true);
    let type_b = DataType::Struct(vec![field_c].into());
    let field_b = Field::new("b", type_b.clone(), true);
    let type_a = DataType::Struct(vec![field_b].into());
    let field_a = Field::new("a", type_a.clone(), true);
    let schema = Arc::new(Schema::new(vec![field_a]));

    let leaf = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);

    // Inner struct: validity bitmap with two cleared bits among six rows.
    let inner = StructArray::from(
        ArrayDataBuilder::new(type_b)
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100111])))
            .add_child_data(leaf.into_data())
            .build()
            .unwrap(),
    );

    // Outer struct: validity bitmap with one cleared bit among six rows.
    let outer = StructArray::from(
        ArrayDataBuilder::new(type_a)
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00101111])))
            .add_child_data(inner.into_data())
            .build()
            .unwrap(),
    );

    assert_eq!(outer.null_count(), 1);
    assert_eq!(outer.column(0).null_count(), 2);

    let batch = RecordBatch::try_new(schema, vec![Arc::new(outer)]).unwrap();

    roundtrip(batch, Some(SMALL_SIZE / 2));
}
2354
#[test]
fn arrow_writer_2_level_struct_non_null() {
    // a: struct (required) { b: struct (required) { c: Int32 (required) } }
    let type_b = DataType::Struct(vec![Field::new("c", DataType::Int32, false)].into());
    let type_a = DataType::Struct(vec![Field::new("b", type_b.clone(), false)].into());
    let schema = Arc::new(Schema::new(vec![Field::new("a", type_a.clone(), false)]));

    let leaf = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);

    // Neither struct level carries a validity bitmap.
    let inner = StructArray::from(
        ArrayDataBuilder::new(type_b)
            .len(6)
            .add_child_data(leaf.into_data())
            .build()
            .unwrap(),
    );
    let outer = StructArray::from(
        ArrayDataBuilder::new(type_a)
            .len(6)
            .add_child_data(inner.into_data())
            .build()
            .unwrap(),
    );

    assert_eq!(outer.null_count(), 0);
    assert_eq!(outer.column(0).null_count(), 0);

    let batch = RecordBatch::try_new(schema, vec![Arc::new(outer)]).unwrap();

    roundtrip(batch, Some(SMALL_SIZE / 2));
}
2388
#[test]
fn arrow_writer_2_level_struct_mixed_null() {
    // a: struct (required) { b: struct (nullable) { c: Int32 (required) } }
    let type_b = DataType::Struct(vec![Field::new("c", DataType::Int32, false)].into());
    let type_a = DataType::Struct(vec![Field::new("b", type_b.clone(), true)].into());
    let schema = Arc::new(Schema::new(vec![Field::new("a", type_a.clone(), false)]));

    let leaf = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);

    // Only the inner struct has a validity bitmap (two cleared bits among six rows).
    let inner = StructArray::from(
        ArrayDataBuilder::new(type_b)
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100111])))
            .add_child_data(leaf.into_data())
            .build()
            .unwrap(),
    );
    let outer = StructArray::from(
        ArrayDataBuilder::new(type_a)
            .len(6)
            .add_child_data(inner.into_data())
            .build()
            .unwrap(),
    );

    assert_eq!(outer.null_count(), 0);
    assert_eq!(outer.column(0).null_count(), 2);

    let batch = RecordBatch::try_new(schema, vec![Arc::new(outer)]).unwrap();

    roundtrip(batch, Some(SMALL_SIZE / 2));
}
2424
#[test]
fn arrow_writer_2_level_struct_mixed_null_2() {
    // a: struct (nullable) {
    //      b: struct (required) {
    //           c: Int32, d: FixedSizeBinary(4), e: Dictionary<Int32, Utf8>  (all required)
    //      }
    // }
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let inner_fields = vec![
        Field::new("c", DataType::Int32, false),
        Field::new("d", DataType::FixedSizeBinary(4), false),
        Field::new("e", dict_type, false),
    ];
    let type_b = DataType::Struct(inner_fields.into());
    let type_a = DataType::Struct(vec![Field::new("b", type_b.clone(), false)].into());
    let schema = Arc::new(Schema::new(vec![Field::new("a", type_a.clone(), true)]));

    // Leaf columns, six rows each.
    let col_c = Int32Array::from_iter_values(0..6);
    let col_d = FixedSizeBinaryArray::try_from_iter(
        ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
    )
    .expect("four byte values");
    let col_e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);

    // The inner struct has no validity bitmap.
    let inner = StructArray::from(
        ArrayDataBuilder::new(type_b)
            .len(6)
            .add_child_data(col_c.into_data())
            .add_child_data(col_d.into_data())
            .add_child_data(col_e.into_data())
            .build()
            .unwrap(),
    );

    // The outer struct's bitmap clears three of the six rows.
    let outer = StructArray::from(
        ArrayDataBuilder::new(type_a)
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100101])))
            .add_child_data(inner.into_data())
            .build()
            .unwrap(),
    );

    assert_eq!(outer.null_count(), 3);
    assert_eq!(outer.column(0).null_count(), 0);

    let batch = RecordBatch::try_new(schema, vec![Arc::new(outer)]).unwrap();

    roundtrip(batch, Some(SMALL_SIZE / 2));
}
2476
#[test]
fn test_fixed_size_binary_in_dict() {
    // Round-trips a required Dictionary<K, FixedSizeBinary(4)> column with
    // keys [0, 0, 1] over two 4-byte dictionary values, for key type `K`.
    fn test_fixed_size_binary_in_dict_inner<K>()
    where
        K: ArrowDictionaryKeyType,
        K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
        <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
    {
        let field = Field::new(
            "a",
            DataType::Dictionary(
                Box::new(K::DATA_TYPE),
                Box::new(DataType::FixedSizeBinary(4)),
            ),
            false,
        );
        let schema = Schema::new(vec![field]);

        let keys: Vec<K::Native> = vec![
            K::Native::try_from(0u8).unwrap(),
            K::Native::try_from(0u8).unwrap(),
            K::Native::try_from(1u8).unwrap(),
        ];
        let keys = PrimitiveArray::<K>::from_iter_values(keys);
        let values = FixedSizeBinaryArray::try_from_iter(
            vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
        )
        .unwrap();

        let data = DictionaryArray::<K>::new(keys, Arc::new(values));
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
        roundtrip(batch, None);
    }

    // Exercise every integer dictionary key type exactly once. The fourth
    // instantiation previously repeated `UInt16Type`, which left `UInt64Type`
    // keys untested.
    test_fixed_size_binary_in_dict_inner::<UInt8Type>();
    test_fixed_size_binary_in_dict_inner::<UInt16Type>();
    test_fixed_size_binary_in_dict_inner::<UInt32Type>();
    test_fixed_size_binary_in_dict_inner::<UInt64Type>();
    test_fixed_size_binary_in_dict_inner::<Int8Type>();
    test_fixed_size_binary_in_dict_inner::<Int16Type>();
    test_fixed_size_binary_in_dict_inner::<Int32Type>();
    test_fixed_size_binary_in_dict_inner::<Int64Type>();
}
2520
/// Round-trips a nullable struct column of five null rows whose only child is
/// a dictionary array with five null keys and an empty values array.
#[test]
fn test_empty_dict() {
    let dict_field = Field::new(
        "dict",
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
        false,
    );
    let child_fields = Fields::from(vec![dict_field]);

    // Dictionary child: every key is null and there are no dictionary values.
    let keys = Int32Array::new_null(5);
    let dict_values: ArrayRef = Arc::new(StringArray::new_null(0));
    let dict_column = Arc::new(DictionaryArray::new(keys, dict_values));

    let struct_array = StructArray::new(
        child_fields.clone(),
        vec![dict_column],
        Some(NullBuffer::new_null(5)),
    );

    let struct_field = Field::new_struct("struct", child_fields, true);
    let schema = Arc::new(Schema::new(vec![struct_field]));

    let batch = RecordBatch::try_new(schema, vec![Arc::new(struct_array)]).unwrap();
    roundtrip(batch, None);
}
/// Writes ten ten-character strings with 1-byte data/dictionary page limits and
/// a write batch size of 1, then checks that the column chunk reports a
/// dictionary page offset and that its offset index lists ten page locations.
#[test]
fn arrow_writer_page_size() {
    let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));

    let mut strings = StringBuilder::with_capacity(100, 329 * 10_000);
    for digit in 0..10 {
        // The digit's decimal text repeated ten times, capped at ten characters.
        let repeated = digit.to_string().repeat(10);
        let value: String = repeated.chars().take(10).collect();
        strings.append_value(value);
    }
    let column = Arc::new(strings.finish());

    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let file = tempfile::tempfile().unwrap();

    let props = WriterProperties::builder()
        .set_data_page_size_limit(1)
        .set_dictionary_page_size_limit(1)
        .set_write_batch_size(1)
        .build();

    let sink = file.try_clone().unwrap();
    let mut writer =
        ArrowWriter::try_new(sink, batch.schema(), Some(props)).expect("Unable to write file");
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    // Re-open the file with the page index enabled so offset indexes are loaded.
    let read_options = ReadOptionsBuilder::new().with_page_index().build();
    let reader =
        SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options).unwrap();
    let metadata = reader.metadata();

    let columns = metadata.row_group(0).columns();
    assert_eq!(columns.len(), 1);
    assert!(
        columns[0].dictionary_page_offset().is_some(),
        "Expected a dictionary page"
    );

    let offset_index = metadata.offset_index();
    assert!(offset_index.is_some());
    let first_row_group = &offset_index.unwrap()[0];
    let page_locations = first_row_group[0].page_locations.clone();

    assert_eq!(
        page_locations.len(),
        10,
        "Expected 10 pages but got {page_locations:#?}"
    );
}
2613
/// Round-trips non-nullable `Float16`, `Float32` and `Float64` columns of
/// `MEDIUM_SIZE` rows in which every even-indexed value is NaN and every
/// odd-indexed value is the index converted to the float type.
#[test]
fn arrow_writer_float_nans() {
    let half_column: Float16Array = (0..MEDIUM_SIZE)
        .map(|i| match i % 2 {
            0 => Some(f16::NAN),
            _ => Some(f16::from_f32(i as f32)),
        })
        .collect();

    let single_column: Float32Array = (0..MEDIUM_SIZE)
        .map(|i| match i % 2 {
            0 => Some(f32::NAN),
            _ => Some(i as f32),
        })
        .collect();

    let double_column: Float64Array = (0..MEDIUM_SIZE)
        .map(|i| match i % 2 {
            0 => Some(f64::NAN),
            _ => Some(i as f64),
        })
        .collect();

    let schema = Schema::new(vec![
        Field::new("a", DataType::Float16, false),
        Field::new("b", DataType::Float32, false),
        Field::new("c", DataType::Float64, false),
    ]);

    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![
            Arc::new(half_column),
            Arc::new(single_column),
            Arc::new(double_column),
        ],
    )
    .unwrap();

    roundtrip(batch, None);
}
2651
/// Element count used by the small single-column tests; also the basis for
/// several of the row-group sizes exercised by the round-trip helpers.
const SMALL_SIZE: usize = 7;
/// Element count used by tests that need more rows than `SMALL_SIZE`.
const MEDIUM_SIZE: usize = 63;
2654
2655 fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<Bytes> {
2658 let mut files = vec![];
2659 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2660 let mut props = WriterProperties::builder().set_writer_version(version);
2661
2662 if let Some(size) = max_row_group_size {
2663 props = props.set_max_row_group_row_count(Some(size))
2664 }
2665
2666 let props = props.build();
2667 files.push(roundtrip_opts(&expected_batch, props))
2668 }
2669 files
2670 }
2671
2672 fn roundtrip_opts_with_array_validation<F>(
2676 expected_batch: &RecordBatch,
2677 props: WriterProperties,
2678 validate: F,
2679 ) -> Bytes
2680 where
2681 F: Fn(&ArrayData, &ArrayData),
2682 {
2683 let mut file = vec![];
2684
2685 let mut writer = ArrowWriter::try_new(&mut file, expected_batch.schema(), Some(props))
2686 .expect("Unable to write file");
2687 writer.write(expected_batch).unwrap();
2688 writer.close().unwrap();
2689
2690 let file = Bytes::from(file);
2691 let mut record_batch_reader =
2692 ParquetRecordBatchReader::try_new(file.clone(), 1024).unwrap();
2693
2694 let actual_batch = record_batch_reader
2695 .next()
2696 .expect("No batch found")
2697 .expect("Unable to get batch");
2698
2699 assert_eq!(expected_batch.schema(), actual_batch.schema());
2700 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2701 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2702 for i in 0..expected_batch.num_columns() {
2703 let expected_data = expected_batch.column(i).to_data();
2704 let actual_data = actual_batch.column(i).to_data();
2705 validate(&expected_data, &actual_data);
2706 }
2707
2708 file
2709 }
2710
2711 fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> Bytes {
2712 roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
2713 a.validate_full().expect("valid expected data");
2714 b.validate_full().expect("valid actual data");
2715 assert_eq!(a, b)
2716 })
2717 }
2718
/// Inputs for `one_column_roundtrip_with_options`.
struct RoundTripOptions {
    /// The single column to write and read back.
    values: ArrayRef,
    /// Schema used to build the record batch that wraps `values`.
    schema: SchemaRef,
    /// Forwarded to `set_bloom_filter_enabled` on the writer properties.
    bloom_filter: bool,
    /// When `Some`, forwarded to `set_bloom_filter_max_ndv`; otherwise left unset.
    bloom_filter_ndv: Option<u64>,
    /// Forwarded to `set_bloom_filter_position` on the writer properties.
    bloom_filter_position: BloomFilterPosition,
}
2726
2727 impl RoundTripOptions {
2728 fn new(values: ArrayRef, nullable: bool) -> Self {
2729 let data_type = values.data_type().clone();
2730 let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
2731 Self {
2732 values,
2733 schema: Arc::new(schema),
2734 bloom_filter: false,
2735 bloom_filter_ndv: None,
2736 bloom_filter_position: BloomFilterPosition::AfterRowGroup,
2737 }
2738 }
2739 }
2740
2741 fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<Bytes> {
2742 one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
2743 }
2744
2745 fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<Bytes> {
2746 let mut options = RoundTripOptions::new(values, false);
2747 options.schema = schema;
2748 one_column_roundtrip_with_options(options)
2749 }
2750
2751 fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<Bytes> {
2752 let RoundTripOptions {
2753 values,
2754 schema,
2755 bloom_filter,
2756 bloom_filter_ndv,
2757 bloom_filter_position,
2758 } = options;
2759
2760 let encodings = match values.data_type() {
2761 DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
2762 vec![
2763 Encoding::PLAIN,
2764 Encoding::DELTA_BYTE_ARRAY,
2765 Encoding::DELTA_LENGTH_BYTE_ARRAY,
2766 ]
2767 }
2768 DataType::Int64
2769 | DataType::Int32
2770 | DataType::Int16
2771 | DataType::Int8
2772 | DataType::UInt64
2773 | DataType::UInt32
2774 | DataType::UInt16
2775 | DataType::UInt8 => vec![
2776 Encoding::PLAIN,
2777 Encoding::DELTA_BINARY_PACKED,
2778 Encoding::BYTE_STREAM_SPLIT,
2779 ],
2780 DataType::Float32 | DataType::Float64 => {
2781 vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
2782 }
2783 _ => vec![Encoding::PLAIN],
2784 };
2785
2786 let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
2787
2788 let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
2789
2790 let mut files = vec![];
2791 for dictionary_size in [0, 1, 1024] {
2792 for encoding in &encodings {
2793 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2794 for row_group_size in row_group_sizes {
2795 let mut builder = WriterProperties::builder()
2796 .set_writer_version(version)
2797 .set_max_row_group_row_count(Some(row_group_size))
2798 .set_dictionary_enabled(dictionary_size != 0)
2799 .set_dictionary_page_size_limit(dictionary_size.max(1))
2800 .set_encoding(*encoding)
2801 .set_bloom_filter_enabled(bloom_filter)
2802 .set_bloom_filter_position(bloom_filter_position);
2803 if let Some(ndv) = bloom_filter_ndv {
2804 builder = builder.set_bloom_filter_max_ndv(ndv);
2805 }
2806 let props = builder.build();
2807
2808 files.push(roundtrip_opts(&expected_batch, props))
2809 }
2810 }
2811 }
2812 }
2813 files
2814 }
2815
2816 fn values_required<A, I>(iter: I) -> Vec<Bytes>
2817 where
2818 A: From<Vec<I::Item>> + Array + 'static,
2819 I: IntoIterator,
2820 {
2821 let raw_values: Vec<_> = iter.into_iter().collect();
2822 let values = Arc::new(A::from(raw_values));
2823 one_column_roundtrip(values, false)
2824 }
2825
2826 fn values_optional<A, I>(iter: I) -> Vec<Bytes>
2827 where
2828 A: From<Vec<Option<I::Item>>> + Array + 'static,
2829 I: IntoIterator,
2830 {
2831 let optional_raw_values: Vec<_> = iter
2832 .into_iter()
2833 .enumerate()
2834 .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
2835 .collect();
2836 let optional_values = Arc::new(A::from(optional_raw_values));
2837 one_column_roundtrip(optional_values, true)
2838 }
2839
2840 fn required_and_optional<A, I>(iter: I)
2841 where
2842 A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
2843 I: IntoIterator + Clone,
2844 {
2845 values_required::<A, I>(iter.clone());
2846 values_optional::<A, I>(iter);
2847 }
2848
2849 fn check_bloom_filter<T: AsBytes>(
2850 files: Vec<Bytes>,
2851 file_column: String,
2852 positive_values: Vec<T>,
2853 negative_values: Vec<T>,
2854 ) {
2855 files.into_iter().take(1).for_each(|file| {
2856 let file_reader = SerializedFileReader::new_with_options(
2857 file,
2858 ReadOptionsBuilder::new()
2859 .with_reader_properties(
2860 ReaderProperties::builder()
2861 .set_read_bloom_filter(true)
2862 .build(),
2863 )
2864 .build(),
2865 )
2866 .expect("Unable to open file as Parquet");
2867 let metadata = file_reader.metadata();
2868
2869 let mut bloom_filters: Vec<_> = vec![];
2871 for (ri, row_group) in metadata.row_groups().iter().enumerate() {
2872 if let Some((column_index, _)) = row_group
2873 .columns()
2874 .iter()
2875 .enumerate()
2876 .find(|(_, column)| column.column_path().string() == file_column)
2877 {
2878 let row_group_reader = file_reader
2879 .get_row_group(ri)
2880 .expect("Unable to read row group");
2881 if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
2882 bloom_filters.push(sbbf.clone());
2883 } else {
2884 panic!("No bloom filter for column named {file_column} found");
2885 }
2886 } else {
2887 panic!("No column named {file_column} found");
2888 }
2889 }
2890
2891 positive_values.iter().for_each(|value| {
2892 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2893 assert!(
2894 found.is_some(),
2895 "{}",
2896 format!("Value {:?} should be in bloom filter", value.as_bytes())
2897 );
2898 });
2899
2900 negative_values.iter().for_each(|value| {
2901 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
2902 assert!(
2903 found.is_none(),
2904 "{}",
2905 format!("Value {:?} should not be in bloom filter", value.as_bytes())
2906 );
2907 });
2908 });
2909 }
2910
/// Round-trips a nullable `Int32` column made of `SMALL_SIZE` nulls.
#[test]
fn all_null_primitive_single_column() {
    let nulls = vec![None; SMALL_SIZE];
    one_column_roundtrip(Arc::new(Int32Array::from(nulls)), true);
}
/// Round-trips a nullable `NullArray` column of `SMALL_SIZE` rows.
#[test]
fn null_single_column() {
    let null_column: ArrayRef = Arc::new(NullArray::new(SMALL_SIZE));
    one_column_roundtrip(null_column, true);
}
2922
/// Round-trips `SMALL_SIZE` alternating `true`/`false` values as a
/// `BooleanArray`, first as a required column and then as an optional one.
#[test]
fn bool_single_column() {
    let pattern = [true, false];
    let alternating = pattern.iter().cycle().copied().take(SMALL_SIZE);
    required_and_optional::<BooleanArray, _>(alternating);
}
2929
/// Writes a 200,000-row nullable boolean column (cycling null, true, false) to
/// a temporary file with default writer properties. The test only requires the
/// write and close to succeed; nothing is read back.
#[test]
fn bool_large_single_column() {
    let pattern = [None, Some(true), Some(false)];
    let column: BooleanArray = pattern.iter().cycle().copied().take(200_000).collect();
    let values = Arc::new(column);

    let field = Field::new("col", values.data_type().clone(), true);
    let schema = Arc::new(Schema::new(vec![field]));
    let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();

    let file = tempfile::tempfile().unwrap();
    let sink = file.try_clone().unwrap();
    let mut writer = ArrowWriter::try_new(sink, expected_batch.schema(), None)
        .expect("Unable to write file");
    writer.write(&expected_batch).unwrap();
    writer.close().unwrap();
}
2950
/// Writes a `Float64` column of ten NaNs and checks that every column chunk in
/// the returned metadata has an offset index (offset and length) but no column
/// index (neither offset nor length).
#[test]
fn check_page_offset_index_with_nan() {
    let nan_column = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
    let field = Field::new("col", DataType::Float64, true);
    let schema = Arc::new(Schema::new(vec![field]));
    let batch = RecordBatch::try_new(schema, vec![nan_column]).unwrap();

    let mut out = Vec::with_capacity(1024);
    let mut writer =
        ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
    writer.write(&batch).unwrap();
    let file_meta_data = writer.close().unwrap();

    let all_columns = file_meta_data
        .row_groups()
        .iter()
        .flat_map(|row_group| row_group.columns());
    for column in all_columns {
        assert!(column.offset_index_offset().is_some());
        assert!(column.offset_index_length().is_some());
        assert!(column.column_index_offset().is_none());
        assert!(column.column_index_length().is_none());
    }
}
2971
/// Round-trips `0..SMALL_SIZE` as an `Int8Array`, required and optional.
#[test]
fn i8_single_column() {
    let end = SMALL_SIZE as i8;
    required_and_optional::<Int8Array, _>(0..end);
}
2976
/// Round-trips `0..SMALL_SIZE` as an `Int16Array`, required and optional.
#[test]
fn i16_single_column() {
    let end = SMALL_SIZE as i16;
    required_and_optional::<Int16Array, _>(0..end);
}
2981
/// Round-trips `0..SMALL_SIZE` as an `Int32Array`, required and optional.
#[test]
fn i32_single_column() {
    let end = SMALL_SIZE as i32;
    required_and_optional::<Int32Array, _>(0..end);
}
2986
/// Round-trips `0..SMALL_SIZE` as an `Int64Array`, required and optional.
#[test]
fn i64_single_column() {
    let end = SMALL_SIZE as i64;
    required_and_optional::<Int64Array, _>(0..end);
}
2991
/// Round-trips `0..SMALL_SIZE` as a `UInt8Array`, required and optional.
#[test]
fn u8_single_column() {
    let end = SMALL_SIZE as u8;
    required_and_optional::<UInt8Array, _>(0..end);
}
2996
/// Round-trips `0..SMALL_SIZE` as a `UInt16Array`, required and optional.
#[test]
fn u16_single_column() {
    let end = SMALL_SIZE as u16;
    required_and_optional::<UInt16Array, _>(0..end);
}
3001
/// Round-trips `0..SMALL_SIZE` as a `UInt32Array`, required and optional.
#[test]
fn u32_single_column() {
    let end = SMALL_SIZE as u32;
    required_and_optional::<UInt32Array, _>(0..end);
}
3006
/// Round-trips `0..SMALL_SIZE` as a `UInt64Array`, required and optional.
#[test]
fn u64_single_column() {
    let end = SMALL_SIZE as u64;
    required_and_optional::<UInt64Array, _>(0..end);
}
3011
/// Round-trips the indices `0..SMALL_SIZE` converted to `f32` as a
/// `Float32Array`, required and optional.
#[test]
fn f32_single_column() {
    let floats = (0..SMALL_SIZE).map(|index| index as f32);
    required_and_optional::<Float32Array, _>(floats);
}
3016
/// Round-trips the indices `0..SMALL_SIZE` converted to `f64` as a
/// `Float64Array`, required and optional.
#[test]
fn f64_single_column() {
    let floats = (0..SMALL_SIZE).map(|index| index as f64);
    required_and_optional::<Float64Array, _>(floats);
}
3021
/// Round-trips `0..SMALL_SIZE` as a non-nullable `TimestampSecondArray`.
#[test]
fn timestamp_second_single_column() {
    let ticks = (0..SMALL_SIZE as i64).collect::<Vec<i64>>();
    one_column_roundtrip(Arc::new(TimestampSecondArray::from(ticks)), false);
}
3033
/// Round-trips `0..SMALL_SIZE` as a non-nullable `TimestampMillisecondArray`.
#[test]
fn timestamp_millisecond_single_column() {
    let ticks = (0..SMALL_SIZE as i64).collect::<Vec<i64>>();
    one_column_roundtrip(Arc::new(TimestampMillisecondArray::from(ticks)), false);
}
3041
/// Round-trips `0..SMALL_SIZE` as a non-nullable `TimestampMicrosecondArray`.
#[test]
fn timestamp_microsecond_single_column() {
    let ticks = (0..SMALL_SIZE as i64).collect::<Vec<i64>>();
    one_column_roundtrip(Arc::new(TimestampMicrosecondArray::from(ticks)), false);
}
3049
/// Round-trips `0..SMALL_SIZE` as a non-nullable `TimestampNanosecondArray`.
#[test]
fn timestamp_nanosecond_single_column() {
    let ticks = (0..SMALL_SIZE as i64).collect::<Vec<i64>>();
    one_column_roundtrip(Arc::new(TimestampNanosecondArray::from(ticks)), false);
}
3057
/// Round-trips `0..SMALL_SIZE` as a `Date32Array`, required and optional.
#[test]
fn date32_single_column() {
    let end = SMALL_SIZE as i32;
    required_and_optional::<Date32Array, _>(0..end);
}
3062
/// Round-trips `SMALL_SIZE` `Date64` values spaced 86,400,000 apart (starting
/// at zero), required and optional.
#[test]
fn date64_single_column() {
    const STEP: i64 = 86400000;
    let end = SMALL_SIZE as i64 * STEP;
    let multiples = (0..end).step_by(STEP as usize);
    required_and_optional::<Date64Array, _>(multiples);
}
3070
/// Round-trips `0..SMALL_SIZE` as a `Time32SecondArray`, required and optional.
#[test]
fn time32_second_single_column() {
    let end = SMALL_SIZE as i32;
    required_and_optional::<Time32SecondArray, _>(0..end);
}
3075
/// Round-trips `0..SMALL_SIZE` as a `Time32MillisecondArray`, required and
/// optional.
#[test]
fn time32_millisecond_single_column() {
    let end = SMALL_SIZE as i32;
    required_and_optional::<Time32MillisecondArray, _>(0..end);
}
3080
/// Round-trips `0..SMALL_SIZE` as a `Time64MicrosecondArray`, required and
/// optional.
#[test]
fn time64_microsecond_single_column() {
    let end = SMALL_SIZE as i64;
    required_and_optional::<Time64MicrosecondArray, _>(0..end);
}
3085
/// Round-trips `0..SMALL_SIZE` as a `Time64NanosecondArray`, required and
/// optional.
#[test]
fn time64_nanosecond_single_column() {
    let end = SMALL_SIZE as i64;
    required_and_optional::<Time64NanosecondArray, _>(0..end);
}
3090
/// Round-trips `0..SMALL_SIZE` as a `DurationSecondArray`, required and
/// optional.
#[test]
fn duration_second_single_column() {
    let end = SMALL_SIZE as i64;
    required_and_optional::<DurationSecondArray, _>(0..end);
}
3095
/// Round-trips `0..SMALL_SIZE` as a `DurationMillisecondArray`, required and
/// optional.
#[test]
fn duration_millisecond_single_column() {
    let end = SMALL_SIZE as i64;
    required_and_optional::<DurationMillisecondArray, _>(0..end);
}
3100
/// Round-trips `0..SMALL_SIZE` as a `DurationMicrosecondArray`, required and
/// optional.
#[test]
fn duration_microsecond_single_column() {
    let end = SMALL_SIZE as i64;
    required_and_optional::<DurationMicrosecondArray, _>(0..end);
}
3105
/// Round-trips `0..SMALL_SIZE` as a `DurationNanosecondArray`, required and
/// optional.
#[test]
fn duration_nanosecond_single_column() {
    let end = SMALL_SIZE as i64;
    required_and_optional::<DurationNanosecondArray, _>(0..end);
}
3110
/// Round-trips `0..SMALL_SIZE` as an `IntervalYearMonthArray`, required and
/// optional.
#[test]
fn interval_year_month_single_column() {
    let end = SMALL_SIZE as i32;
    required_and_optional::<IntervalYearMonthArray, _>(0..end);
}
3115
/// Round-trips four `IntervalDayTime` values, including negative components,
/// as an `IntervalDayTimeArray`, required and optional.
#[test]
fn interval_day_time_single_column() {
    let intervals = vec![
        IntervalDayTime::new(0, 1),
        IntervalDayTime::new(0, 3),
        IntervalDayTime::new(3, -2),
        IntervalDayTime::new(-200, 4),
    ];
    required_and_optional::<IntervalDayTimeArray, _>(intervals);
}
3125
/// Attempts to round-trip `IntervalMonthDayNano` values; the test passes only
/// if this panics with the "not yet implemented" message below.
#[test]
#[should_panic(
    expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
)]
fn interval_month_day_nano_single_column() {
    let intervals = vec![
        IntervalMonthDayNano::new(0, 1, 5),
        IntervalMonthDayNano::new(0, 3, 2),
        IntervalMonthDayNano::new(3, -2, -5),
        IntervalMonthDayNano::new(-200, 4, -1),
    ];
    required_and_optional::<IntervalMonthDayNanoArray, _>(intervals);
}
3138
/// Round-trips a required `BinaryArray` of `SMALL_SIZE` rows, each holding the
/// bytes `0..SMALL_SIZE`.
#[test]
fn binary_single_column() {
    let row: Vec<u8> = (0..SMALL_SIZE as u8).collect();
    let rows = vec![row; SMALL_SIZE];
    let slices = rows.iter().map(|bytes| bytes.as_slice());

    values_required::<BinaryArray, _>(slices);
}
3148
/// Round-trips a required `BinaryViewArray` of `SMALL_SIZE` rows, each holding
/// the bytes `0..SMALL_SIZE`.
#[test]
fn binary_view_single_column() {
    let row: Vec<u8> = (0..SMALL_SIZE as u8).collect();
    let rows = vec![row; SMALL_SIZE];
    let slices = rows.iter().map(|bytes| bytes.as_slice());

    values_required::<BinaryViewArray, _>(slices);
}
3158
/// Round-trips an `Int32` column with bloom filters enabled and positioned at
/// `BloomFilterPosition::End`, then checks that the written values are found
/// in the filter and the nine values after `SMALL_SIZE` are not.
#[test]
fn i32_column_bloom_filter_at_end() {
    let limit = SMALL_SIZE as i32;
    let array = Arc::new(Int32Array::from_iter(0..limit));
    let options = RoundTripOptions {
        bloom_filter: true,
        bloom_filter_position: BloomFilterPosition::End,
        ..RoundTripOptions::new(array, false)
    };

    let files = one_column_roundtrip_with_options(options);

    let present: Vec<i32> = (0..limit).collect();
    let absent: Vec<i32> = (limit + 1..limit + 10).collect();
    check_bloom_filter(files, "col".to_string(), present, absent);
}
3174
/// Round-trips an `Int32` column with bloom filters enabled (default
/// position), then checks that the written values are found in the filter and
/// the nine values after `SMALL_SIZE` are not.
#[test]
fn i32_column_bloom_filter() {
    let limit = SMALL_SIZE as i32;
    let array = Arc::new(Int32Array::from_iter(0..limit));
    let options = RoundTripOptions {
        bloom_filter: true,
        ..RoundTripOptions::new(array, false)
    };

    let files = one_column_roundtrip_with_options(options);

    let present: Vec<i32> = (0..limit).collect();
    let absent: Vec<i32> = (limit + 1..limit + 10).collect();
    check_bloom_filter(files, "col".to_string(), present, absent);
}
3189
/// Runs the `Int32` bloom filter check twice with an explicit max NDV: first
/// 1,000,000 (far above the number of values written), then 3 (below it).
#[test]
fn i32_column_bloom_filter_fixed_ndv() {
    let limit = SMALL_SIZE as i32;
    let array = Arc::new(Int32Array::from_iter(0..limit));

    for ndv in [1_000_000, 3] {
        let options = RoundTripOptions {
            bloom_filter: true,
            bloom_filter_ndv: Some(ndv),
            ..RoundTripOptions::new(array.clone(), false)
        };

        let files = one_column_roundtrip_with_options(options);

        let present: Vec<i32> = (0..limit).collect();
        let absent: Vec<i32> = (limit + 1..limit + 10).collect();
        check_bloom_filter(files, "col".to_string(), present, absent);
    }
}
3224
/// Round-trips a `BinaryArray` with bloom filters enabled, then checks that
/// the written byte strings are found in the filter and the single-byte value
/// `SMALL_SIZE + 1` is not.
#[test]
fn binary_column_bloom_filter() {
    let row: Vec<u8> = (0..SMALL_SIZE as u8).collect();
    let rows = vec![row; SMALL_SIZE];

    let array = Arc::new(BinaryArray::from_iter_values(
        rows.iter().map(|bytes| bytes.as_slice()),
    ));
    let options = RoundTripOptions {
        bloom_filter: true,
        ..RoundTripOptions::new(array, false)
    };

    let files = one_column_roundtrip_with_options(options);

    let absent = vec![vec![(SMALL_SIZE + 1) as u8]];
    check_bloom_filter(files, "col".to_string(), rows, absent);
}
3243
/// Round-trips a string column with bloom filters enabled and checks that the
/// odd-indexed values are found in the filter while the empty string is not.
///
/// NOTE(review): the test name refers to nulls, but the array written here is
/// built with `from_iter_values` and declared non-nullable, so it contains no
/// null slots — confirm whether a nullable input was intended.
#[test]
fn empty_string_null_column_bloom_filter() {
    let texts: Vec<String> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();

    let array = Arc::new(StringArray::from_iter_values(
        texts.iter().map(String::as_str),
    ));
    let options = RoundTripOptions {
        bloom_filter: true,
        ..RoundTripOptions::new(array, false)
    };

    let files = one_column_roundtrip_with_options(options);

    // Positive probes: the values at odd indices only.
    let odd_indexed: Vec<&str> = texts
        .iter()
        .skip(1)
        .step_by(2)
        .map(String::as_str)
        .collect();
    check_bloom_filter(files, "col".to_string(), odd_indexed, vec![""]);
}
3263
/// Round-trips a required `LargeBinaryArray` of `SMALL_SIZE` rows, each
/// holding the bytes `0..SMALL_SIZE`.
#[test]
fn large_binary_single_column() {
    let row: Vec<u8> = (0..SMALL_SIZE as u8).collect();
    let rows = vec![row; SMALL_SIZE];
    let slices = rows.iter().map(|bytes| bytes.as_slice());

    values_required::<LargeBinaryArray, _>(slices);
}
3273
/// Round-trips a nullable `FixedSizeBinary(4)` column holding three values and
/// one null (in the second slot).
#[test]
fn fixed_size_binary_single_column() {
    let mut builder = FixedSizeBinaryBuilder::new(4);
    for slot in [Some(b"0123"), None, Some(b"8910"), Some(b"1112")] {
        match slot {
            Some(bytes) => builder.append_value(bytes).unwrap(),
            None => builder.append_null(),
        }
    }

    one_column_roundtrip(Arc::new(builder.finish()), true);
}
3285
/// Round-trips the decimal strings of `0..SMALL_SIZE` as a `StringArray`,
/// required and optional.
#[test]
fn string_single_column() {
    let texts: Vec<String> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
    required_and_optional::<StringArray, _>(texts.iter().map(String::as_str));
}
3293
/// Round-trips the decimal strings of `0..SMALL_SIZE` as a `LargeStringArray`,
/// required and optional.
#[test]
fn large_string_single_column() {
    let texts: Vec<String> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
    required_and_optional::<LargeStringArray, _>(texts.iter().map(String::as_str));
}
3301
/// Round-trips the decimal strings of `0..SMALL_SIZE` as a `StringViewArray`,
/// required and optional.
#[test]
fn string_view_single_column() {
    let texts: Vec<String> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
    required_and_optional::<StringViewArray, _>(texts.iter().map(String::as_str));
}
3309
/// Round-trips a nullable `List<Null>` column with three rows: an empty list,
/// a null list, and a list of two nulls.
#[test]
fn null_list_single_column() {
    let list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Null, true)));
    let schema = Schema::new(vec![Field::new("emptylist", list_type.clone(), true)]);

    // Offsets [0, 0, 0, 2]: rows 0 and 1 are empty, row 2 spans both child values.
    let offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
    // Validity bits 0b101: rows 0 and 2 valid, row 1 null.
    let validity = Buffer::from([0b00000101]);
    let child = NullArray::new(2);

    let list_data = ArrayData::builder(list_type)
        .len(3)
        .add_buffer(offsets)
        .null_bit_buffer(Some(validity))
        .add_child_data(child.into_data())
        .build()
        .unwrap();
    let list = ListArray::from(list_data);

    assert!(list.is_valid(0));
    assert!(!list.is_valid(1));
    assert!(list.is_valid(2));

    assert_eq!(list.value(0).len(), 0);
    assert_eq!(list.value(2).len(), 2);
    assert_eq!(list.value(2).logical_nulls().unwrap().null_count(), 2);

    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list)]).unwrap();
    roundtrip(batch, None);
}
3344
/// Round-trips a nullable `List<Int32>` column (non-nullable items) of five
/// rows, one of which is null.
#[test]
fn list_single_column() {
    let item_field = Field::new_list_field(DataType::Int32, false);
    let list_type = DataType::List(Arc::new(item_field));

    let items = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    let offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
    // Validity bits 0b11011: only row 2 is null.
    let validity = Buffer::from([0b00011011]);

    let list_data = ArrayData::builder(list_type)
        .len(5)
        .add_buffer(offsets)
        .null_bit_buffer(Some(validity))
        .add_child_data(items.into_data())
        .build()
        .unwrap();

    assert_eq!(list_data.null_count(), 1);

    one_column_roundtrip(Arc::new(ListArray::from(list_data)), true);
}
3367
/// Round-trips a nullable `LargeList<Int32>` column (item field named
/// `"large_item"`) of five rows, one of which is null.
#[test]
fn large_list_single_column() {
    let item_field = Field::new("large_item", DataType::Int32, true);
    let list_type = DataType::LargeList(Arc::new(item_field));

    let items = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    // Large lists use 64-bit offsets.
    let offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
    // Validity bits 0b11011: only row 2 is null.
    let validity = Buffer::from([0b00011011]);

    let list_data = ArrayData::builder(list_type)
        .len(5)
        .add_buffer(offsets)
        .add_child_data(items.into_data())
        .null_bit_buffer(Some(validity))
        .build()
        .unwrap();

    assert_eq!(list_data.null_count(), 1);

    one_column_roundtrip(Arc::new(LargeListArray::from(list_data)), true);
}
3392
/// Round-trips the same rows — containing a null list and null items — first
/// as a `ListArray` and then as a `LargeListArray`.
#[test]
fn list_nested_nulls() {
    use arrow::datatypes::Int32Type;

    let rows = vec![
        Some(vec![Some(1)]),
        Some(vec![Some(2), Some(3)]),
        None,
        Some(vec![Some(4), Some(5), None]),
        Some(vec![None]),
        Some(vec![Some(6), Some(7)]),
    ];

    let small_offsets = ListArray::from_iter_primitive::<Int32Type, _, _>(rows.clone());
    let large_offsets = LargeListArray::from_iter_primitive::<Int32Type, _, _>(rows);

    one_column_roundtrip(Arc::new(small_offsets), true);
    one_column_roundtrip(Arc::new(large_offsets), true);
}
3411
/// Round-trips a non-nullable struct column with a single non-nullable `Int32`
/// child named `"f"` holding the values 1 through 10.
#[test]
fn struct_single_column() {
    let child_field = Arc::new(Field::new("f", DataType::Int32, false));
    let child_values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));
    let struct_array = StructArray::from(vec![(child_field, child_values)]);

    one_column_roundtrip(Arc::new(struct_array), false);
}
3421
/// Writes a list and a map whose inner Arrow field names are `item`,
/// `entries`, `keys` and `values` with `set_coerce_types(true)`, and checks
/// that the Parquet schema names them `element`, `key_value`, `key` and
/// `value` — both in the metadata returned by `close` and when the file is
/// read back.
#[test]
fn list_and_map_coerced_names() {
    let list_field =
        Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
    let map_field = Field::new_map(
        "my_map",
        "entries",
        Field::new("keys", DataType::Int32, false),
        Field::new("values", DataType::Int32, true),
        false,
        true,
    );

    let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
    let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();

    let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
    let batch = RecordBatch::try_new(arrow_schema.clone(), vec![list_array, map_array]).unwrap();

    let props = WriterProperties::builder().set_coerce_types(true).build();
    let file = tempfile::tempfile().unwrap();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema, Some(props)).unwrap();
    writer.write(&batch).unwrap();
    let file_metadata = writer.close().unwrap();

    // Schema as reported by the writer on close.
    let written_schema = file_metadata.file_metadata().schema();
    let written_fields = written_schema.get_fields();
    let written_list = &written_fields[0].get_fields()[0];
    assert_eq!(written_list.get_fields()[0].name(), "element");

    let written_map = &written_fields[1].get_fields()[0];
    assert_eq!(written_map.name(), "key_value");
    assert_eq!(written_map.get_fields()[0].name(), "key");
    assert_eq!(written_map.get_fields()[1].name(), "value");

    // Schema as seen when the file is opened again.
    let reader = SerializedFileReader::new(file).unwrap();
    let read_schema = reader.metadata().file_metadata().schema();
    let read_fields = read_schema.get_fields();
    let read_list = &read_fields[0].get_fields()[0];
    assert_eq!(read_list.get_fields()[0].name(), "element");
    let read_map = &read_fields[1].get_fields()[0];
    assert_eq!(read_map.name(), "key_value");
    assert_eq!(read_map.get_fields()[0].name(), "key");
    assert_eq!(read_map.get_fields()[1].name(), "value");
}
3475
/// Round-trips `MEDIUM_SIZE` short strings with dictionary encoding disabled,
/// a 32-byte data page limit and a write batch size of 16, for both delta
/// byte-array encodings and several row-group sizes, comparing the string
/// values read back against those written.
#[test]
fn fallback_flush_data_page() {
    let texts: Vec<String> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
    let values = Arc::new(StringArray::from(texts));

    let field = Field::new("col", values.data_type().clone(), false);
    let schema = Arc::new(Schema::new(vec![field]));
    let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();

    let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
    let data_page_size_limit: usize = 32;
    let write_batch_size: usize = 16;

    for encoding in [
        Encoding::DELTA_BYTE_ARRAY,
        Encoding::DELTA_LENGTH_BYTE_ARRAY,
    ] {
        for row_group_size in row_group_sizes {
            let props = WriterProperties::builder()
                .set_writer_version(WriterVersion::PARQUET_2_0)
                .set_max_row_group_row_count(Some(row_group_size))
                .set_dictionary_enabled(false)
                .set_encoding(encoding)
                .set_data_page_size_limit(data_page_size_limit)
                .set_write_batch_size(write_batch_size)
                .build();

            roundtrip_opts_with_array_validation(&expected_batch, props, |expected, actual| {
                // Compare by string value rather than by raw array data.
                let expected_strings = StringArray::from(expected.clone());
                let actual_strings = StringArray::from(actual.clone());
                let expected_values: Vec<&str> =
                    expected_strings.iter().map(Option::unwrap).collect();
                let actual_values: Vec<&str> =
                    actual_strings.iter().map(Option::unwrap).collect();
                assert_eq!(
                    expected_values, actual_values,
                    "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
                );
            });
        }
    }
}
3517
/// Round-trips a nullable `Dictionary(Int32, Utf8)` column holding
/// `["alpha", null, "beta", "alpha"]` under a schema whose field is built with
/// `Field::new_dict` (dictionary id 42, ordered).
#[test]
fn arrow_writer_string_dictionary() {
    // The lint is silenced for the `Field::new_dict` call — presumably because
    // that constructor is deprecated; confirm against the arrow-schema docs.
    #[allow(deprecated)]
    let dict_field = Field::new_dict(
        "dictionary",
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
        true,
        42,
        true,
    );
    let schema = Arc::new(Schema::new(vec![dict_field]));

    let entries = vec![Some("alpha"), None, Some("beta"), Some("alpha")];
    let dictionary: Int32DictionaryArray = entries.into_iter().collect();

    one_column_roundtrip_with_schema(Arc::new(dictionary), schema);
}
3539
#[test]
/// Checks that two batches whose column types differ but are compatible can be written
/// through one `ArrowWriter`, and that the file reads back with the type of the first batch.
fn arrow_writer_test_type_compatibility() {
    /// Writes `array1` and then `array2` as the single column "a" of one temp file (the
    /// writer is created with the schema derived from `array1`), then asserts that the first
    /// batch read back (batch size 1024) equals `expected_result` under that same schema.
    fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
    where
        T1: Array + 'static,
        T2: Array + 'static,
    {
        // Single non-nullable column named "a" of the given type.
        fn schema_for(data_type: &DataType) -> Arc<Schema> {
            Arc::new(Schema::new(vec![Field::new("a", data_type.clone(), false)]))
        }

        let writer_schema = schema_for(array1.data_type());
        let second_schema = schema_for(array2.data_type());

        let file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), writer_schema.clone(), None).unwrap();

        let first_batch =
            RecordBatch::try_new(writer_schema.clone(), vec![Arc::new(array1)]).unwrap();
        writer.write(&first_batch).unwrap();

        let second_batch = RecordBatch::try_new(second_schema, vec![Arc::new(array2)]).unwrap();
        writer.write(&second_batch).unwrap();

        writer.close().unwrap();

        let mut reader =
            ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
        let actual_batch = reader.next().unwrap().unwrap();

        let expected_batch =
            RecordBatch::try_new(writer_schema, vec![Arc::new(expected_result)]).unwrap();
        assert_eq!(actual_batch, expected_batch);
    }

    // Dictionary with `UInt8` keys over `Utf8` values; used for most dictionary cases below.
    let utf8_dict = |keys: Vec<u8>, values: Vec<&str>| {
        DictionaryArray::new(
            UInt8Array::from_iter_values(keys),
            Arc::new(StringArray::from_iter_values(values)),
        )
    };

    // Dictionary first, plain string second.
    ensure_compatible_write(
        utf8_dict(vec![0], vec!["parquet"]),
        StringArray::from_iter_values(["barquet"]),
        utf8_dict(vec![0, 1], vec!["parquet", "barquet"]),
    );

    // Plain string first, dictionary second.
    ensure_compatible_write(
        StringArray::from_iter_values(["parquet"]),
        utf8_dict(vec![0], vec!["barquet"]),
        StringArray::from_iter_values(["parquet", "barquet"]),
    );

    // Dictionaries whose key types differ (`UInt8` vs `UInt16`).
    ensure_compatible_write(
        utf8_dict(vec![0], vec!["parquet"]),
        DictionaryArray::new(
            UInt16Array::from_iter_values([0]),
            Arc::new(StringArray::from_iter_values(["barquet"])),
        ),
        utf8_dict(vec![0, 1], vec!["parquet", "barquet"]),
    );

    // Dictionaries whose value types differ (`Utf8` vs `LargeUtf8`).
    ensure_compatible_write(
        utf8_dict(vec![0], vec!["parquet"]),
        DictionaryArray::new(
            UInt8Array::from_iter_values([0]),
            Arc::new(LargeStringArray::from_iter_values(["barquet"])),
        ),
        utf8_dict(vec![0, 1], vec!["parquet", "barquet"]),
    );

    // Dictionary first, plain large string second.
    ensure_compatible_write(
        utf8_dict(vec![0], vec!["parquet"]),
        LargeStringArray::from_iter_values(["barquet"]),
        utf8_dict(vec![0, 1], vec!["parquet", "barquet"]),
    );

    // Every ordered pairing of the three string layouts.
    ensure_compatible_write(
        StringArray::from_iter_values(["parquet"]),
        LargeStringArray::from_iter_values(["barquet"]),
        StringArray::from_iter_values(["parquet", "barquet"]),
    );

    ensure_compatible_write(
        LargeStringArray::from_iter_values(["parquet"]),
        StringArray::from_iter_values(["barquet"]),
        LargeStringArray::from_iter_values(["parquet", "barquet"]),
    );

    ensure_compatible_write(
        StringArray::from_iter_values(["parquet"]),
        StringViewArray::from_iter_values(["barquet"]),
        StringArray::from_iter_values(["parquet", "barquet"]),
    );

    ensure_compatible_write(
        StringViewArray::from_iter_values(["parquet"]),
        StringArray::from_iter_values(["barquet"]),
        StringViewArray::from_iter_values(["parquet", "barquet"]),
    );

    ensure_compatible_write(
        LargeStringArray::from_iter_values(["parquet"]),
        StringViewArray::from_iter_values(["barquet"]),
        LargeStringArray::from_iter_values(["parquet", "barquet"]),
    );

    ensure_compatible_write(
        StringViewArray::from_iter_values(["parquet"]),
        LargeStringArray::from_iter_values(["barquet"]),
        StringViewArray::from_iter_values(["parquet", "barquet"]),
    );

    // Every ordered pairing of the three binary layouts.
    ensure_compatible_write(
        BinaryArray::from_iter_values([b"parquet"]),
        LargeBinaryArray::from_iter_values([b"barquet"]),
        BinaryArray::from_iter_values([b"parquet", b"barquet"]),
    );

    ensure_compatible_write(
        LargeBinaryArray::from_iter_values([b"parquet"]),
        BinaryArray::from_iter_values([b"barquet"]),
        LargeBinaryArray::from_iter_values([b"parquet", b"barquet"]),
    );

    ensure_compatible_write(
        BinaryArray::from_iter_values([b"parquet"]),
        BinaryViewArray::from_iter_values([b"barquet"]),
        BinaryArray::from_iter_values([b"parquet", b"barquet"]),
    );

    ensure_compatible_write(
        BinaryViewArray::from_iter_values([b"parquet"]),
        BinaryArray::from_iter_values([b"barquet"]),
        BinaryViewArray::from_iter_values([b"parquet", b"barquet"]),
    );

    ensure_compatible_write(
        BinaryViewArray::from_iter_values([b"parquet"]),
        LargeBinaryArray::from_iter_values([b"barquet"]),
        BinaryViewArray::from_iter_values([b"parquet", b"barquet"]),
    );

    ensure_compatible_write(
        LargeBinaryArray::from_iter_values([b"parquet"]),
        BinaryViewArray::from_iter_values([b"barquet"]),
        LargeBinaryArray::from_iter_values([b"parquet", b"barquet"]),
    );

    // Lists whose item fields differ only in metadata: the first (and the expected) list
    // carries a field-id entry on its item field, the second list does not.
    let id_metadata = HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]);
    let plain_item = Field::new_list_field(DataType::Int32, false);
    let item_with_id = Arc::new(plain_item.clone().with_metadata(id_metadata));

    let first_values = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4]));
    let first_offsets = OffsetBuffer::new(vec![0, 2, 5].into());

    let second_values = Arc::new(Int32Array::from(vec![5, 6, 7, 8, 9]));
    let second_offsets = OffsetBuffer::new(vec![0, 3, 5].into());

    let all_values = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]));
    let all_offsets = OffsetBuffer::new(vec![0, 2, 5, 8, 10].into());

    ensure_compatible_write(
        ListArray::try_new(item_with_id.clone(), first_offsets, first_values, None).unwrap(),
        ListArray::try_new(Arc::new(plain_item), second_offsets, second_values, None).unwrap(),
        ListArray::try_new(item_with_id, all_offsets, all_values, None).unwrap(),
    );
}
3770
#[test]
/// Round-trips a nullable `Dictionary(UInt8, UInt32)` column built with a
/// `PrimitiveDictionaryBuilder`, including a null and a repeated value.
fn arrow_writer_primitive_dictionary() {
    let dict_type = DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32));
    // The lint exemption covers the `Field::new_dict` call (presumably a deprecated
    // constructor).
    #[allow(deprecated)]
    let dict_field = Field::new_dict("dictionary", dict_type, true, 42, true);
    let schema = Arc::new(Schema::new(vec![dict_field]));

    let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
    // Same append sequence as a hand-written list of calls: value, null, value, repeat.
    for entry in [Some(12345678), None, Some(22345678), Some(12345678)] {
        match entry {
            Some(value) => {
                builder.append(value).unwrap();
            }
            None => builder.append_null(),
        }
    }
    let d = builder.finish();

    one_column_roundtrip_with_schema(Arc::new(d), schema);
}
3793
#[test]
/// Round-trips a `UInt8`-keyed dictionary of `Decimal32` values twice: once with
/// precision 5 and once with precision 9 (scale 2 in both cases).
fn arrow_writer_decimal32_dictionary() {
    // Builds the dictionary values with the requested precision and a fixed scale of 2.
    let decimals = |precision: u8| {
        Decimal32Array::from(vec![12345, 56789, 34567])
            .with_precision_and_scale(precision, 2)
            .unwrap()
    };

    let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

    let narrow = DictionaryArray::new(keys, Arc::new(decimals(5)));
    one_column_roundtrip(Arc::new(narrow.clone()), true);

    // Same keys, values swapped for the wider precision.
    let wide = narrow.with_values(Arc::new(decimals(9)));
    one_column_roundtrip(Arc::new(wide), true);
}
3814
#[test]
/// Round-trips a `UInt8`-keyed dictionary of `Decimal64` values twice: once with
/// precision 5 and once with precision 12 (scale 2 in both cases).
fn arrow_writer_decimal64_dictionary() {
    // Builds the dictionary values with the requested precision and a fixed scale of 2.
    let decimals = |precision: u8| {
        Decimal64Array::from(vec![12345, 56789, 34567])
            .with_precision_and_scale(precision, 2)
            .unwrap()
    };

    let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

    let narrow = DictionaryArray::new(keys, Arc::new(decimals(5)));
    one_column_roundtrip(Arc::new(narrow.clone()), true);

    // Same keys, values swapped for the wider precision.
    let wide = narrow.with_values(Arc::new(decimals(12)));
    one_column_roundtrip(Arc::new(wide), true);
}
3835
#[test]
/// Round-trips a `UInt8`-keyed dictionary of `Decimal128` values twice: once with
/// precision 5 and once with precision 12 (scale 2 in both cases).
fn arrow_writer_decimal128_dictionary() {
    // Builds the dictionary values with the requested precision and a fixed scale of 2.
    let decimals = |precision: u8| {
        Decimal128Array::from(vec![12345, 56789, 34567])
            .with_precision_and_scale(precision, 2)
            .unwrap()
    };

    let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

    let narrow = DictionaryArray::new(keys, Arc::new(decimals(5)));
    one_column_roundtrip(Arc::new(narrow.clone()), true);

    // Same keys, values swapped for the wider precision.
    let wide = narrow.with_values(Arc::new(decimals(12)));
    one_column_roundtrip(Arc::new(wide), true);
}
3856
#[test]
/// Round-trips a `UInt8`-keyed dictionary of `Decimal256` values twice: once with
/// precision 5 and once with precision 12 (scale 2 in both cases).
fn arrow_writer_decimal256_dictionary() {
    // Builds the dictionary values with the requested precision and a fixed scale of 2.
    let decimals = |precision: u8| {
        let raw = vec![
            i256::from_i128(12345),
            i256::from_i128(56789),
            i256::from_i128(34567),
        ];
        Decimal256Array::from(raw)
            .with_precision_and_scale(precision, 2)
            .unwrap()
    };

    let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

    let narrow = DictionaryArray::new(keys, Arc::new(decimals(5)));
    one_column_roundtrip(Arc::new(narrow.clone()), true);

    // Same keys, values swapped for the wider precision.
    let wide = narrow.with_values(Arc::new(decimals(12)));
    one_column_roundtrip(Arc::new(wide), true);
}
3881
#[test]
/// Round-trips a nullable `Dictionary(UInt8, Utf8)` column, i.e. a string dictionary whose
/// keys are unsigned.
fn arrow_writer_string_dictionary_unsigned_index() {
    let dict_type = DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
    // The lint exemption covers the `Field::new_dict` call (presumably a deprecated
    // constructor).
    #[allow(deprecated)]
    let dict_field = Field::new_dict("dictionary", dict_type, true, 42, true);
    let schema = Arc::new(Schema::new(vec![dict_field]));

    // Collecting `Option<&str>` items builds the dictionary; `None` becomes a null slot.
    let entries = [Some("alpha"), None, Some("beta"), Some("alpha")];
    let d: UInt8DictionaryArray = entries.into_iter().collect();

    one_column_roundtrip_with_schema(Arc::new(d), schema);
}
3902
#[test]
/// Verifies that min/max statistics for a `UInt32` column, which are read back as
/// `Statistics::Int32`, equal the unsigned min/max of each row group's slice once cast
/// back to `u32`.
fn u32_min_max() {
    // Values on both sides of the `i32::MAX` boundary, in ascending unsigned order.
    let src = [
        u32::MIN,
        u32::MIN + 1,
        (i32::MAX as u32) - 1,
        i32::MAX as u32,
        (i32::MAX as u32) + 1,
        u32::MAX - 1,
        u32::MAX,
    ];
    let values = Arc::new(UInt32Array::from_iter_values(src.iter().copied()));
    let files = one_column_roundtrip(values, false);

    for file in files {
        let reader = SerializedFileReader::new(file).unwrap();
        let metadata = reader.metadata();

        // Number of source values already attributed to earlier row groups.
        let mut consumed = 0;
        for row_group in metadata.row_groups() {
            assert_eq!(row_group.num_columns(), 1);
            let column = row_group.column(0);

            let len = column.num_values() as usize;
            let chunk = &src[consumed..consumed + len];
            consumed += len;

            match column.statistics().unwrap() {
                Statistics::Int32(stats) => {
                    assert_eq!(
                        *stats.min_opt().unwrap() as u32,
                        *chunk.iter().min().unwrap()
                    );
                    assert_eq!(
                        *stats.max_opt().unwrap() as u32,
                        *chunk.iter().max().unwrap()
                    );
                }
                _ => panic!("Statistics::Int32 missing"),
            }
        }
    }
}
3948
#[test]
/// Verifies that min/max statistics for a `UInt64` column, which are read back as
/// `Statistics::Int64`, equal the unsigned min/max of each row group's slice once cast
/// back to `u64`.
fn u64_min_max() {
    // Values on both sides of the `i64::MAX` boundary, in ascending unsigned order.
    let src = [
        u64::MIN,
        u64::MIN + 1,
        (i64::MAX as u64) - 1,
        i64::MAX as u64,
        (i64::MAX as u64) + 1,
        u64::MAX - 1,
        u64::MAX,
    ];
    let values = Arc::new(UInt64Array::from_iter_values(src.iter().copied()));
    let files = one_column_roundtrip(values, false);

    for file in files {
        let reader = SerializedFileReader::new(file).unwrap();
        let metadata = reader.metadata();

        // Number of source values already attributed to earlier row groups.
        let mut consumed = 0;
        for row_group in metadata.row_groups() {
            assert_eq!(row_group.num_columns(), 1);
            let column = row_group.column(0);

            let len = column.num_values() as usize;
            let chunk = &src[consumed..consumed + len];
            consumed += len;

            match column.statistics().unwrap() {
                Statistics::Int64(stats) => {
                    assert_eq!(
                        *stats.min_opt().unwrap() as u64,
                        *chunk.iter().min().unwrap()
                    );
                    assert_eq!(
                        *stats.max_opt().unwrap() as u64,
                        *chunk.iter().max().unwrap()
                    );
                }
                _ => panic!("Statistics::Int64 missing"),
            }
        }
    }
}
3994
#[test]
/// Checks that a column made up solely of nulls still reports a null count (2) in its
/// column chunk statistics.
fn statistics_null_counts_only_nulls() {
    let all_nulls = Arc::new(UInt64Array::from(vec![None, None]));
    let files = one_column_roundtrip(all_nulls, true);

    for file in files {
        let reader = SerializedFileReader::new(file).unwrap();
        let metadata = reader.metadata();

        // Each written file is expected to hold exactly one row group with one column.
        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        assert_eq!(row_group.num_columns(), 1);

        let stats = row_group.column(0).statistics().unwrap();
        assert_eq!(stats.null_count_opt(), Some(2));
    }
}
4013
#[test]
/// Round-trips a `List<Struct<a: Int32, b: Int32>>` column covering an empty list, a null
/// list, null structs and structs with a null in either field.
fn test_list_of_struct_roundtrip() {
    /// Appends one struct element: field `a`, then field `b`, then the struct validity bit.
    fn push_struct(values: &mut StructBuilder, a: Option<i32>, b: Option<i32>, is_valid: bool) {
        for (index, entry) in [a, b].into_iter().enumerate() {
            let field = values.field_builder::<Int32Builder>(index).unwrap();
            match entry {
                Some(value) => field.append_value(value),
                None => field.append_null(),
            }
        }
        values.append(is_valid);
    }

    let fields = vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
    ];
    let struct_builder = StructBuilder::new(
        fields,
        vec![
            Box::new(Int32Builder::with_capacity(10)),
            Box::new(Int32Builder::with_capacity(10)),
        ],
    );
    let mut list_builder = ListBuilder::new(struct_builder);

    // List with one fully populated struct.
    push_struct(list_builder.values(), Some(1), Some(2), true);
    list_builder.append(true);

    // Empty list.
    list_builder.append(true);

    // Null list.
    list_builder.append(false);

    // List of two null structs.
    push_struct(list_builder.values(), None, None, false);
    push_struct(list_builder.values(), None, None, false);
    list_builder.append(true);

    // List with one valid struct whose `a` is null.
    push_struct(list_builder.values(), None, Some(3), true);
    list_builder.append(true);

    // List with one valid struct whose `b` is null.
    push_struct(list_builder.values(), Some(2), None, true);
    list_builder.append(true);

    let array = Arc::new(list_builder.finish());

    one_column_roundtrip(array, true);
}
4103
4104 fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
4105 metadata.row_groups().iter().map(|x| x.num_rows()).collect()
4106 }
4107
#[test]
/// Writes three batches (100, 50 and 300 rows) with a 200-row row-group limit and checks
/// that rows are regrouped into row groups of 200/200/50 while the values keep their order.
fn test_aggregates_records() {
    // One input batch per range; the same ranges produce the expected values below.
    let ranges = [0..100, 0..50, 200..500];

    let schema = Arc::new(Schema::new(vec![Field::new(
        "int",
        ArrowDataType::Int32,
        false,
    )]));

    let file = tempfile::tempfile().unwrap();

    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(200))
        .build();

    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();

    for range in ranges.clone() {
        let array = Int32Array::from_iter_values(range);
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
        writer.write(&batch).unwrap();
    }

    writer.close().unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);

    let batches = builder
        .with_batch_size(100)
        .build()
        .unwrap()
        .collect::<ArrowResult<Vec<_>>>()
        .unwrap();

    assert_eq!(batches.len(), 5);
    assert!(batches.iter().all(|batch| batch.num_columns() == 1));

    let batch_sizes: Vec<_> = batches.iter().map(|batch| batch.num_rows()).collect();
    assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);

    // Concatenate the values of every batch that was read back.
    let mut values = Vec::new();
    for batch in &batches {
        let column = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        values.extend(column.values().iter().copied());
    }

    let expected_values: Vec<i32> = ranges.into_iter().flatten().collect();
    assert_eq!(&values, &expected_values);
}
4171
#[test]
/// Writes two batches of a `struct_b { list: List<struct_a { leaf_a, leaf_b }> }` column
/// with a row-group limit of 6 rows, then checks the row-group sizes, the sizes of the
/// batches read back, and that the pretty-printed contents are unchanged.
fn complex_aggregate() {
    /// Renders the expected single-column table: a `+---+` border above and below the
    /// header and after the last row, with every cell left-aligned and space-padded to the
    /// widest entry.
    ///
    /// The padding is computed instead of being embedded in a literal so the expectation
    /// does not depend on runs of spaces surviving in the source text;
    /// `pretty_format_batches` pads each cell to the column width.
    fn one_column_table(header: &str, rows: &[&str]) -> String {
        let width = rows
            .iter()
            .fold(header.len(), |widest, row| widest.max(row.len()));
        let border = format!("+{}+", "-".repeat(width + 2));

        let mut lines = vec![
            border.clone(),
            format!("| {header:<width$} |"),
            border.clone(),
        ];
        lines.extend(rows.iter().map(|row| format!("| {row:<width$} |")));
        lines.push(border);
        lines.join("\n")
    }

    // Schema: struct_b (non-null) { list (nullable): List<struct_a (nullable)
    // { leaf_a (non-null), leaf_b (nullable) }> }.
    let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
    let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
    let struct_a = Arc::new(Field::new(
        "struct_a",
        DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
        true,
    ));

    let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
    let struct_b = Arc::new(Field::new(
        "struct_b",
        DataType::Struct(vec![list_a.clone()].into()),
        false,
    ));

    let schema = Arc::new(Schema::new(vec![struct_b]));

    // First batch: 5 lists over 6 struct values; lists 1 and 3 are null.
    let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
    let field_b_array =
        Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);

    let struct_a_array = StructArray::from(vec![
        (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
        (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
    ]);

    let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
        .len(5)
        .add_buffer(Buffer::from_iter(vec![
            0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
        ]))
        .null_bit_buffer(Some(Buffer::from_iter(vec![
            true, false, true, false, true,
        ])))
        .child_data(vec![struct_a_array.into_data()])
        .build()
        .unwrap();

    let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
    let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);

    let batch1 =
        RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
            .unwrap();

    // Second batch: 2 lists (4 elements, then 1) with no null lists.
    let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
    let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);

    let struct_a_array = StructArray::from(vec![
        (field_a, Arc::new(field_a_array) as ArrayRef),
        (field_b, Arc::new(field_b_array) as ArrayRef),
    ]);

    let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
        .len(2)
        .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
        .child_data(vec![struct_a_array.into_data()])
        .build()
        .unwrap();

    let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
    let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);

    let batch2 =
        RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
            .unwrap();

    let batches = &[batch1, batch2];

    // One entry per row across both batches; nulls render as empty text.
    let expected = one_column_table(
        "struct_b",
        &[
            "{list: [{leaf_a: 1, leaf_b: 1}]}",
            "{list: }",
            "{list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}",
            "{list: }",
            "{list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}",
            "{list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]}",
            "{list: [{leaf_a: 10, leaf_b: }]}",
        ],
    );

    // Sanity check of the input before it goes through the writer.
    let actual = pretty_format_batches(batches).unwrap().to_string();
    assert_eq!(actual, expected);

    let file = tempfile::tempfile().unwrap();
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(6))
        .build();

    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();

    for batch in batches {
        writer.write(batch).unwrap();
    }
    writer.close().unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    // 5 + 2 input rows with a limit of 6 rows per row group.
    assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);

    let batches = builder
        .with_batch_size(2)
        .build()
        .unwrap()
        .collect::<ArrowResult<Vec<_>>>()
        .unwrap();

    assert_eq!(batches.len(), 4);
    let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
    assert_eq!(&batch_counts, &[2, 2, 2, 1]);

    // The data read back must print exactly like the input.
    let actual = pretty_format_batches(&batches).unwrap().to_string();
    assert_eq!(actual, expected);
}
4299
#[test]
/// Checks that a batch whose schema has no metadata can be written by a writer whose file
/// schema has the same fields plus a metadata entry.
fn test_arrow_writer_metadata() {
    let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
    // Same fields as the batch schema, with one extra key/value pair attached.
    let file_schema = batch_schema
        .clone()
        .with_metadata(HashMap::from([("foo".to_string(), "bar".to_string())]));

    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let batch = RecordBatch::try_new(Arc::new(batch_schema), vec![column]).unwrap();

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();
}
4320
#[test]
/// Checks that a batch with a non-nullable column can be written to a file whose schema
/// declares that column nullable, and that the data reads back under the file schema.
fn test_arrow_writer_nullable() {
    let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
    let file_schema = Arc::new(Schema::new(vec![Field::new(
        "int32",
        DataType::Int32,
        true,
    )]));

    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let batch = RecordBatch::try_new(Arc::new(batch_schema), vec![column]).unwrap();

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
    let back = reader.next().unwrap().unwrap();

    // The schema comes from the file, not from the batch that was written; the values match.
    assert_eq!(back.schema(), file_schema);
    assert_ne!(back.schema(), batch.schema());
    assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
}
4344
#[test]
/// Tracks the writer's in-progress size, in-progress row count, memory size and bytes
/// written: before any write, after one batch, after a second batch and after a flush.
fn in_progress_accounting() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let column = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(column)]).unwrap();

    let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();

    // Nothing is buffered before the first write; 4 bytes have already been written
    // (presumably the file magic; not visible from this test).
    assert_eq!(writer.in_progress_size(), 0);
    assert_eq!(writer.in_progress_rows(), 0);
    assert_eq!(writer.memory_size(), 0);
    assert_eq!(writer.bytes_written(), 4);

    // First batch: everything becomes non-zero and the size estimate is bounded by the
    // memory in use.
    writer.write(&batch).unwrap();
    let initial_size = writer.in_progress_size();
    assert!(initial_size > 0);
    assert_eq!(writer.in_progress_rows(), 5);
    let initial_memory = writer.memory_size();
    assert!(initial_memory > 0);
    assert!(
        initial_size <= initial_memory,
        "{initial_size} <= {initial_memory}"
    );

    // Second batch: both measures grow and the same bound still holds.
    writer.write(&batch).unwrap();
    let grown_size = writer.in_progress_size();
    assert!(grown_size > initial_size);
    assert_eq!(writer.in_progress_rows(), 10);
    let grown_memory = writer.memory_size();
    assert!(grown_memory > initial_memory);
    assert!(
        grown_size <= grown_memory,
        "in_progress_size {} <= memory_size {}",
        grown_size,
        grown_memory
    );

    // Flushing empties the in-progress state and increases the bytes written.
    let pre_flush_bytes_written = writer.bytes_written();
    writer.flush().unwrap();
    assert_eq!(writer.in_progress_size(), 0);
    assert_eq!(writer.memory_size(), 0);
    assert!(writer.bytes_written() > pre_flush_bytes_written);

    writer.close().unwrap();
}
4398
#[test]
/// Writes a batch whose second column is entirely null and checks that the offset index
/// holds one row group with two columns, each with exactly one page location.
fn test_writer_all_null() {
    let populated = Int32Array::from(vec![1, 2, 3, 4, 5]);
    // Five zero-valued slots, all masked out by the null buffer.
    let all_null = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
    let batch = RecordBatch::try_from_iter([
        ("a", Arc::new(populated) as ArrayRef),
        ("b", Arc::new(all_null) as ArrayRef),
    ])
    .unwrap();

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
    let index = reader.metadata().offset_index().unwrap();

    // The offset index is indexed by row group, then by column.
    assert_eq!(index.len(), 1);
    let columns = &index[0];
    assert_eq!(columns.len(), 2);
    for column in columns {
        assert_eq!(column.page_locations().len(), 1);
    }
}
4424
#[test]
/// With statistics disabled globally but enabled at page level for column "a", checks that
/// "a" gets chunk statistics and a column index while "b" gets neither; both columns still
/// get an offset index.
fn test_disabled_statistics_with_page() {
    let file_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, true),
        Field::new("b", DataType::Utf8, true),
    ]));

    let col_a: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
    let col_b: ArrayRef = Arc::new(StringArray::from(vec!["w", "x", "y", "z"]));
    let batch = RecordBatch::try_new(file_schema.clone(), vec![col_a, col_b]).unwrap();

    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
        .build();

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
    writer.write(&batch).unwrap();

    // Metadata returned by the writer: offsets of the page indexes per column.
    let written = writer.close().unwrap();
    assert_eq!(written.num_row_groups(), 1);
    let written_group = written.row_group(0);
    assert_eq!(written_group.num_columns(), 2);
    let (written_a, written_b) = (written_group.column(0), written_group.column(1));
    assert!(written_a.offset_index_offset().is_some());
    assert!(written_a.column_index_offset().is_some());
    assert!(written_b.offset_index_offset().is_some());
    assert!(written_b.column_index_offset().is_none());

    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();

    let row_group = reader.get_row_group(0).unwrap();
    let a_col = row_group.metadata().column(0);
    let b_col = row_group.metadata().column(1);

    // Column "a" carries chunk-level min/max statistics.
    match a_col.statistics().unwrap() {
        Statistics::ByteArray(byte_array_stats) => {
            assert_eq!(byte_array_stats.min_opt().unwrap().as_bytes(), b"a");
            assert_eq!(byte_array_stats.max_opt().unwrap().as_bytes(), b"d");
        }
        _ => panic!("expecting Statistics::ByteArray"),
    }

    // Column "b" has no statistics at all.
    assert!(b_col.statistics().is_none());

    // Both page indexes are indexed by row group, then by column.
    let offset_index = reader.metadata().offset_index().unwrap();
    assert_eq!(offset_index.len(), 1);
    assert_eq!(offset_index[0].len(), 2);

    let column_index = reader.metadata().column_index().unwrap();
    assert_eq!(column_index.len(), 1);
    let per_column = &column_index[0];
    assert_eq!(per_column.len(), 2);

    let a_idx = &per_column[0];
    assert!(
        matches!(a_idx, ColumnIndexMetaData::BYTE_ARRAY(_)),
        "{a_idx:?}"
    );
    let b_idx = &per_column[1];
    assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
}
4499
#[test]
/// With statistics disabled globally but enabled at chunk level for column "a", checks that
/// "a" gets chunk statistics but no column index, and "b" gets neither; both columns still
/// get an offset index.
fn test_disabled_statistics_with_chunk() {
    let file_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, true),
        Field::new("b", DataType::Utf8, true),
    ]));

    let col_a: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
    let col_b: ArrayRef = Arc::new(StringArray::from(vec!["w", "x", "y", "z"]));
    let batch = RecordBatch::try_new(file_schema.clone(), vec![col_a, col_b]).unwrap();

    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
        .build();

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
    writer.write(&batch).unwrap();

    // Metadata returned by the writer: neither column has a column index offset.
    let written = writer.close().unwrap();
    assert_eq!(written.num_row_groups(), 1);
    let written_group = written.row_group(0);
    assert_eq!(written_group.num_columns(), 2);
    let (written_a, written_b) = (written_group.column(0), written_group.column(1));
    assert!(written_a.offset_index_offset().is_some());
    assert!(written_a.column_index_offset().is_none());
    assert!(written_b.offset_index_offset().is_some());
    assert!(written_b.column_index_offset().is_none());

    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();

    let row_group = reader.get_row_group(0).unwrap();
    let a_col = row_group.metadata().column(0);
    let b_col = row_group.metadata().column(1);

    // Column "a" carries chunk-level min/max statistics.
    match a_col.statistics().unwrap() {
        Statistics::ByteArray(byte_array_stats) => {
            assert_eq!(byte_array_stats.min_opt().unwrap().as_bytes(), b"a");
            assert_eq!(byte_array_stats.max_opt().unwrap().as_bytes(), b"d");
        }
        _ => panic!("expecting Statistics::ByteArray"),
    }

    // Column "b" has no statistics at all.
    assert!(b_col.statistics().is_none());

    // The column index is indexed by row group, then by column; both entries are empty.
    let column_index = reader.metadata().column_index().unwrap();
    assert_eq!(column_index.len(), 1);
    let per_column = &column_index[0];
    assert_eq!(per_column.len(), 2);

    let a_idx = &per_column[0];
    assert!(matches!(a_idx, ColumnIndexMetaData::NONE), "{a_idx:?}");
    let b_idx = &per_column[1];
    assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
}
4567
#[test]
/// Checks that `with_skip_arrow_metadata(true)` keeps the Arrow schema key out of the file's
/// key/value metadata while the schema read back still equals the one written.
fn test_arrow_writer_skip_metadata() {
    let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
    let file_schema = Arc::new(batch_schema.clone());

    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let batch = RecordBatch::try_new(Arc::new(batch_schema), vec![column]).unwrap();

    let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);

    let mut buf = Vec::with_capacity(1024);
    let mut writer =
        ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let reader_builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf)).unwrap();
    assert_eq!(file_schema, *reader_builder.schema());

    // Key/value metadata may be absent altogether; when present it must not contain the
    // Arrow schema key.
    let file_metadata = reader_builder.metadata().file_metadata();
    if let Some(key_value_metadata) = file_metadata.key_value_metadata() {
        assert!(
            key_value_metadata
                .iter()
                .all(|kv| kv.key.as_str() != ARROW_SCHEMA_META_KEY)
        );
    }
}
4601
/// Writing the same batch with `set_write_path_in_schema(false)` must produce
/// a strictly smaller file than writing it with default writer options.
#[test]
fn test_arrow_writer_skip_path_in_schema() {
    let file_schema = Arc::new(Schema::new(vec![Field::new(
        "int32",
        DataType::Int32,
        false,
    )]));
    let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let batch = RecordBatch::try_new(file_schema.clone(), vec![values]).unwrap();

    // Serializes `batch` with the supplied options and hands back the file bytes.
    let write_with = |options: ArrowWriterOptions| -> Vec<u8> {
        let mut out = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new_with_options(&mut out, file_schema.clone(), options).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        out
    };

    let default_bytes = write_with(ArrowWriterOptions::new());

    let no_path_props = WriterProperties::builder()
        .set_write_path_in_schema(false)
        .build();
    let no_path_bytes = write_with(ArrowWriterOptions::new().with_properties(no_path_props));

    assert!(default_bytes.len() > no_path_bytes.len());
}
4639
/// Writing an `Int32` batch through a writer created for a `Float64` schema
/// must fail, and the error text must name the field and both types.
#[test]
fn mismatched_schemas() {
    let writer_schema = Arc::new(Schema::new(vec![Field::new(
        "temperature",
        DataType::Float64,
        false,
    )]));
    let batch_schema = Arc::new(Schema::new(vec![Field::new(
        "count",
        DataType::Int32,
        false,
    )]));

    let counts: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let batch = RecordBatch::try_new(batch_schema, vec![counts]).unwrap();

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, writer_schema, None).unwrap();

    let message = writer.write(&batch).unwrap_err().to_string();
    assert_eq!(
        message,
        "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
    );
}
4664
/// A zero-row batch with an empty schema must round-trip: the reader reports
/// the same schema and yields no record batches.
#[test]
fn test_roundtrip_empty_schema() {
    // A batch without columns needs its row count stated explicitly.
    let batch_options = RecordBatchOptions::default().with_row_count(Some(0));
    let empty_batch =
        RecordBatch::try_new_with_options(Arc::new(Schema::empty()), vec![], &batch_options)
            .unwrap();

    // Write the empty batch into an in-memory buffer.
    let mut parquet_bytes: Vec<u8> = Vec::new();
    let mut writer =
        ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
    writer.write(&empty_batch).unwrap();
    writer.close().unwrap();

    // Read it back and compare schema and contents.
    let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(parquet_bytes)).unwrap();
    assert_eq!(builder.schema(), &empty_batch.schema());

    let record_batches = builder
        .build()
        .unwrap()
        .collect::<ArrowResult<Vec<_>>>()
        .unwrap();
    assert_eq!(record_batches.len(), 0);
}
4694
/// Even with `EnabledStatistics::Page`, the first data page header must carry
/// no statistics unless `set_write_page_header_statistics` is turned on.
#[test]
fn test_page_stats_not_written_by_default() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
    let strings = StringArray::from(vec!["Blart Versenwald III"]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(strings)]).unwrap();

    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Page)
        .set_dictionary_enabled(false)
        .set_encoding(Encoding::PLAIN)
        .set_compression(crate::basic::Compression::UNCOMPRESSED)
        .build();

    let file = roundtrip_opts(&batch, props);

    // Decode the first page header straight from the raw bytes; the leading
    // 4 bytes are skipped (presumably the file magic).
    let mut prot = ThriftSliceInputProtocol::new(&file[4..]);
    let header = PageHeader::read_thrift(&mut prot).unwrap();
    let page_stats = header.data_page_header.unwrap().statistics;

    assert!(page_stats.is_none());
}
4723
/// With `set_write_page_header_statistics(true)` the first data page header
/// must carry exact min/max statistics for the written strings.
#[test]
fn test_page_stats_when_enabled() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
    let strings = StringArray::from(vec!["Blart Versenwald III", "Andrew Lamb"]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(strings)]).unwrap();

    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Page)
        .set_dictionary_enabled(false)
        .set_encoding(Encoding::PLAIN)
        .set_write_page_header_statistics(true)
        .set_compression(crate::basic::Compression::UNCOMPRESSED)
        .build();

    let file = roundtrip_opts(&batch, props);

    // Decode the first page header straight from the raw bytes; the leading
    // 4 bytes are skipped (presumably the file magic).
    let mut prot = ThriftSliceInputProtocol::new(&file[4..]);
    let header = PageHeader::read_thrift(&mut prot).unwrap();
    let page_stats = header.data_page_header.unwrap().statistics.unwrap();

    // Nothing was truncated, so both bounds are exact.
    assert!(page_stats.is_max_value_exact.unwrap());
    assert!(page_stats.is_min_value_exact.unwrap());
    assert_eq!(
        page_stats.max_value.unwrap(),
        "Blart Versenwald III".as_bytes()
    );
    assert_eq!(page_stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
}
4758
/// With a statistics truncate length of 2, the page-header min/max of the
/// first two pages in the file (presumably one per column) must be truncated
/// to two bytes and flagged as inexact.
#[test]
fn test_page_stats_truncation() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Binary, false),
    ]));

    let binary_owned = [b"Blart Versenwald III".to_vec()];
    let binary_refs: Vec<&[u8]> = binary_owned.iter().map(Vec::as_slice).collect();

    let string_column = StringArray::from(vec!["Blart Versenwald III"]);
    let binary_column = BinaryArray::from(binary_refs);
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(string_column), Arc::new(binary_column)],
    )
    .unwrap();

    let props = WriterProperties::builder()
        .set_statistics_truncate_length(Some(2))
        .set_dictionary_enabled(false)
        .set_encoding(Encoding::PLAIN)
        .set_write_page_header_statistics(true)
        .set_compression(crate::basic::Compression::UNCOMPRESSED)
        .build();

    let file = roundtrip_opts(&batch, props);

    // First page header: starts after the leading 4 bytes of the file.
    let mut first_prot = ThriftSliceInputProtocol::new(&file[4..]);
    let first_hdr = PageHeader::read_thrift(&mut first_prot).unwrap();
    let first_stats = first_hdr.data_page_header.unwrap().statistics;
    assert!(first_stats.is_some());
    let first_stats = first_stats.unwrap();
    // Truncated bounds are no longer exact.
    assert!(!first_stats.is_max_value_exact.unwrap());
    assert!(!first_stats.is_min_value_exact.unwrap());
    assert_eq!(first_stats.max_value.unwrap(), "Bm".as_bytes());
    assert_eq!(first_stats.min_value.unwrap(), "Bl".as_bytes());

    // Second page header: skip the first page's payload, which begins where
    // the first header's decoder stopped.
    let payload_len = first_hdr.compressed_page_size as usize;
    let rest = &first_prot.as_slice()[payload_len..];
    let mut second_prot = ThriftSliceInputProtocol::new(rest);
    let second_hdr = PageHeader::read_thrift(&mut second_prot).unwrap();
    let second_stats = second_hdr.data_page_header.unwrap().statistics;
    assert!(second_stats.is_some());
    let second_stats = second_stats.unwrap();
    assert!(!second_stats.is_max_value_exact.unwrap());
    assert!(!second_stats.is_min_value_exact.unwrap());
    assert_eq!(second_stats.max_value.unwrap(), "Bm".as_bytes());
    assert_eq!(second_stats.min_value.unwrap(), "Bl".as_bytes());
}
4819
/// The page encoding stats returned by `ArrowWriter::close` must equal the
/// ones read back from the file when encoding stats are not reduced to a mask.
#[test]
fn test_page_encoding_statistics_roundtrip() {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "int32",
        arrow_schema::DataType::Int32,
        false,
    )]));
    let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let batch = RecordBatch::try_new(schema.clone(), vec![values]).unwrap();

    let mut file: File = tempfile::tempfile().unwrap();
    let mut writer = ArrowWriter::try_new(&mut file, schema, None).unwrap();
    writer.write(&batch).unwrap();
    let written = writer.close().unwrap();

    // Metadata as reported by the writer.
    assert_eq!(written.num_row_groups(), 1);
    assert_eq!(written.row_group(0).num_columns(), 1);
    let written_column = written.row_group(0).column(0);
    assert!(written_column.page_encoding_stats().is_some());
    let written_stats = written_column.page_encoding_stats().unwrap();

    // Metadata as decoded from the file.
    let read_options = ReadOptionsBuilder::new()
        .with_page_index()
        .with_encoding_stats_as_mask(false)
        .build();
    let reader = SerializedFileReader::new_with_options(file, read_options).unwrap();

    let row_group = reader.get_row_group(0).expect("row group missing");
    assert_eq!(row_group.num_columns(), 1);
    let read_column = row_group.metadata().column(0);
    assert!(read_column.page_encoding_stats().is_some());
    let read_stats = read_column.page_encoding_stats().unwrap();

    assert_eq!(written_stats, read_stats);
}
4868
/// A per-column dictionary page size limit must override the global one:
/// `col0` gets a 1 MiB dictionary page and `col1` a 4 MiB one.
#[test]
fn test_different_dict_page_size_limit() {
    let values = Arc::new(Int64Array::from_iter(0..1024 * 1024));
    let schema = Arc::new(Schema::new(vec![
        Field::new("col0", arrow_schema::DataType::Int64, false),
        Field::new("col1", arrow_schema::DataType::Int64, false),
    ]));
    let batch =
        arrow_array::RecordBatch::try_new(schema.clone(), vec![values.clone(), values]).unwrap();

    let props = WriterProperties::builder()
        .set_dictionary_page_size_limit(1024 * 1024)
        .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
        .build();
    let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
    writer.write(&batch).unwrap();
    let data = Bytes::from(writer.into_inner().unwrap());

    let mut metadata_reader = ParquetMetaDataReader::new();
    metadata_reader.try_parse(&data).unwrap();
    let metadata = metadata_reader.finish().unwrap();
    let row_group = metadata.row_group(0);

    // Length of the buffer of the first page of a column chunk, which must be
    // a dictionary page.
    let dict_page_len = move |column: &ColumnChunkMetaData| {
        let mut pages =
            SerializedPageReader::new(Arc::new(data.clone()), column, 0, None).unwrap();
        let Page::DictionaryPage { buf, .. } = pages.get_next_page().unwrap().unwrap() else {
            panic!("expected DictionaryPage");
        };
        buf.len()
    };

    assert_eq!(dict_page_len(row_group.column(0)), 1024 * 1024);
    assert_eq!(dict_page_len(row_group.column(1)), 1024 * 1024 * 4);
}
4906
/// Describes the data that `write_batches` generates.
struct WriteBatchesShape {
    /// How many record batches are written.
    num_batches: usize,
    /// How many string rows each batch holds.
    rows_per_batch: usize,
    /// Minimum width of each generated string; shorter values are
    /// left-padded with `0`.
    row_size: usize,
}
4912
4913 fn write_batches(
4915 WriteBatchesShape {
4916 num_batches,
4917 rows_per_batch,
4918 row_size,
4919 }: WriteBatchesShape,
4920 props: WriterProperties,
4921 ) -> ParquetRecordBatchReaderBuilder<File> {
4922 let schema = Arc::new(Schema::new(vec![Field::new(
4923 "str",
4924 ArrowDataType::Utf8,
4925 false,
4926 )]));
4927 let file = tempfile::tempfile().unwrap();
4928 let mut writer =
4929 ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
4930
4931 for batch_idx in 0..num_batches {
4932 let strings: Vec<String> = (0..rows_per_batch)
4933 .map(|i| format!("{:0>width$}", batch_idx * 10 + i, width = row_size))
4934 .collect();
4935 let array = StringArray::from(strings);
4936 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
4937 writer.write(&batch).unwrap();
4938 }
4939 writer.close().unwrap();
4940 ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
4941 }
4942
/// With neither a row-count nor a byte limit, all 1000 rows must end up in a
/// single row group.
#[test]
fn test_row_group_limit_none_writes_single_row_group() {
    let shape = WriteBatchesShape {
        num_batches: 1,
        rows_per_batch: 1000,
        row_size: 4,
    };
    let unlimited = WriterProperties::builder()
        .set_max_row_group_row_count(None)
        .set_max_row_group_bytes(None)
        .build();

    let builder = write_batches(shape, unlimited);
    let sizes = row_group_sizes(builder.metadata());

    assert_eq!(
        &sizes,
        &[1000],
        "With no limits, all rows should be in a single row group"
    );
}
4966
/// With only a 300-row limit, a single 1000-row batch must be split into row
/// groups of 300, 300, 300 and 100 rows.
#[test]
fn test_row_group_limit_rows_only() {
    let shape = WriteBatchesShape {
        num_batches: 1,
        rows_per_batch: 1000,
        row_size: 4,
    };
    let rows_limited = WriterProperties::builder()
        .set_max_row_group_row_count(Some(300))
        .set_max_row_group_bytes(None)
        .build();

    let builder = write_batches(shape, rows_limited);
    let sizes = row_group_sizes(builder.metadata());

    assert_eq!(
        &sizes,
        &[300, 300, 300, 100],
        "Row groups should be split by row count"
    );
}
4990
/// With only a 3500-byte limit, ten batches of ten 100-character rows must be
/// split into more than one row group without losing any of the 100 rows.
#[test]
fn test_row_group_limit_bytes_only() {
    let shape = WriteBatchesShape {
        num_batches: 10,
        rows_per_batch: 10,
        row_size: 100,
    };
    let bytes_limited = WriterProperties::builder()
        .set_max_row_group_row_count(None)
        .set_max_row_group_bytes(Some(3500))
        .build();

    let builder = write_batches(shape, bytes_limited);
    let sizes = row_group_sizes(builder.metadata());

    assert!(
        sizes.len() > 1,
        "Should have multiple row groups due to byte limit, got {sizes:?}",
    );

    let row_total: i64 = sizes.iter().sum();
    assert_eq!(row_total, 100, "Total rows should be preserved");
}
5019
/// If the in-progress row group already exceeds a byte limit that is applied
/// after the fact, the next write must flush it first, so the next batch
/// starts a new row group.
#[test]
fn test_row_group_limit_bytes_flushes_when_current_group_already_too_large() {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "str",
        ArrowDataType::Utf8,
        false,
    )]));
    let file = tempfile::tempfile().unwrap();

    // Start without limits so the first batch is buffered rather than flushed.
    let unlimited = WriterProperties::builder()
        .set_max_row_group_row_count(None)
        .set_max_row_group_bytes(None)
        .build();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(unlimited)).unwrap();

    let wide_rows: Vec<String> = (0..10).map(|i| format!("{:0>100}", i)).collect();
    let first_batch =
        RecordBatch::try_new(schema.clone(), vec![Arc::new(StringArray::from(wide_rows))])
            .unwrap();
    writer.write(&first_batch).unwrap();
    assert_eq!(writer.in_progress_rows(), 10);

    // Tighten the limit directly on the writer so the buffered rows now
    // exceed it.
    writer.max_row_group_bytes = Some(1);

    let single_row = StringArray::from(vec!["x".to_string()]);
    let second_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(single_row)]).unwrap();
    writer.write(&second_batch).unwrap();
    writer.close().unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let sizes = row_group_sizes(builder.metadata());

    assert_eq!(
        &sizes,
        &[10, 1],
        "The second write should flush an oversized in-progress row group first",
    );
}
5065
/// With a 200-row limit and a 1 MiB byte limit, a single 1000-row batch must
/// be split into five row groups of 200 rows each.
#[test]
fn test_row_group_limit_both_row_wins_single_batch() {
    let shape = WriteBatchesShape {
        num_batches: 1,
        rows_per_batch: 1000,
        row_size: 4,
    };
    let both_limits = WriterProperties::builder()
        .set_max_row_group_row_count(Some(200))
        .set_max_row_group_bytes(Some(1024 * 1024))
        .build();

    let builder = write_batches(shape, both_limits);
    let sizes = row_group_sizes(builder.metadata());

    assert_eq!(
        &sizes,
        &[200, 200, 200, 200, 200],
        "Row limit should trigger before byte limit"
    );
}
5089
/// With a 5-row limit and a 9999-byte limit, ten batches of ten rows must be
/// split into twenty row groups of 5 rows each.
#[test]
fn test_row_group_limit_both_row_wins_multiple_batches() {
    let shape = WriteBatchesShape {
        num_batches: 10,
        rows_per_batch: 10,
        row_size: 100,
    };
    let both_limits = WriterProperties::builder()
        .set_max_row_group_row_count(Some(5))
        .set_max_row_group_bytes(Some(9999))
        .build();

    let builder = write_batches(shape, both_limits);
    let sizes = row_group_sizes(builder.metadata());

    assert_eq!(
        &sizes,
        &[5; 20],
        "Row limit should trigger before byte limit"
    );
}
5113
/// With a 1000-row limit and a 3500-byte limit, ten batches of ten
/// 100-character rows must be split by the byte limit: more than one row
/// group, none reaching 1000 rows, and 100 rows in total.
#[test]
fn test_row_group_limit_both_bytes_wins() {
    let shape = WriteBatchesShape {
        num_batches: 10,
        rows_per_batch: 10,
        row_size: 100,
    };
    let both_limits = WriterProperties::builder()
        .set_max_row_group_row_count(Some(1000))
        .set_max_row_group_bytes(Some(3500))
        .build();

    let builder = write_batches(shape, both_limits);
    let sizes = row_group_sizes(builder.metadata());

    assert!(
        sizes.len() > 1,
        "Byte limit should trigger before row limit, got {sizes:?}",
    );

    let below_row_limit = sizes.iter().all(|&rows| rows < 1000);
    assert!(below_row_limit, "No row group should hit the row limit");

    let row_total: i64 = sizes.iter().sum();
    assert_eq!(row_total, 100, "Total rows should be preserved");
}
5146
/// Clearing `column_index` through `close_mut()` on a finished column chunk
/// must result in a file whose column chunk has no column index range.
#[test]
fn arrow_column_chunk_close_mut_drops_column_index() {
    use crate::arrow::ArrowSchemaConverter;
    use crate::file::writer::SerializedFileWriter;

    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .build(),
    );
    let parquet_schema = ArrowSchemaConverter::new()
        .with_coerce_types(props.coerce_types())
        .convert(&schema)
        .unwrap();

    let mut buf = Vec::with_capacity(1024);
    let mut file_writer =
        SerializedFileWriter::new(&mut buf, parquet_schema.root_schema_ptr(), props.clone())
            .unwrap();

    // Encode one Int32 column through the Arrow column writer.
    let factory = ArrowRowGroupWriterFactory::new(&file_writer, schema.clone());
    let mut column_writers = factory.create_column_writers(0).unwrap();
    let values: ArrayRef = Arc::new(Int32Array::from_iter_values(0..64));
    for leaf in compute_leaves(schema.field(0), &values).unwrap() {
        column_writers[0].write(&leaf).unwrap();
    }
    let mut chunk = column_writers.pop().unwrap().close().unwrap();

    assert!(
        chunk.close().column_index.is_some(),
        "EnabledStatistics::Page should produce a column_index"
    );

    // Drop the column index before the chunk is appended to the file.
    chunk.close_mut().column_index = None;
    assert!(chunk.close().column_index.is_none());

    let mut row_group = file_writer.next_row_group().unwrap();
    chunk.append_to_row_group(&mut row_group).unwrap();
    row_group.close().unwrap();
    let file_meta = file_writer.close().unwrap();

    let column_chunk = file_meta.row_group(0).column(0);
    assert!(column_chunk.column_index_range().is_none());
}
5196}