use crate::column::chunker::ContentDefinedChunker;

use bytes::Bytes;
use std::io::{Read, Write};
use std::iter::Peekable;
use std::slice::Iter;
use std::sync::{Arc, Mutex};
use std::vec::IntoIter;

use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::{ArrayRef, Int32Array, RecordBatch, RecordBatchWriter};
use arrow_schema::{
    ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef, TimeUnit,
};

use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};

use crate::arrow::ArrowSchemaConverter;
use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
use crate::column::page_encryption::PageEncryptor;
use crate::column::writer::encoder::ColumnValueEncoder;
use crate::column::writer::{
    ColumnCloseResult, ColumnWriter, GenericColumnWriter, get_column_writer,
};
use crate::data_type::{ByteArray, FixedLenByteArray};
#[cfg(feature = "encryption")]
use crate::encryption::encrypt::FileEncryptor;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
use levels::{ArrayLevels, calculate_array_levels};

mod byte_array;
mod levels;

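/// Encodes [`RecordBatch`]es to the Parquet format, buffering an entire row
/// group in memory before flushing it to the underlying [`Write`].
///
/// A minimal usage sketch (illustrative only: it writes to an in-memory
/// `Vec<u8>`, but any `W: Write + Send` sink works the same way):
///
/// ```ignore
/// use arrow_array::{ArrayRef, Int64Array, RecordBatch};
/// use std::sync::Arc;
///
/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
/// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
/// let mut writer = ArrowWriter::try_new(Vec::new(), batch.schema(), None).unwrap();
/// writer.write(&batch).unwrap();
/// let bytes = writer.into_inner().unwrap();
/// ```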
pub struct ArrowWriter<W: Write> {
    /// Underlying Parquet writer
    writer: SerializedFileWriter<W>,

    /// The in-progress row group, if any
    in_progress: Option<ArrowRowGroupWriter>,

    /// A copy of the Arrow schema being written
    arrow_schema: SchemaRef,

    /// Creates new [`ArrowRowGroupWriter`] instances as required
    row_group_writer_factory: ArrowRowGroupWriterFactory,

    /// Maximum number of rows per row group, if limited
    max_row_group_row_count: Option<usize>,

    /// Maximum (estimated) number of bytes per row group, if limited
    max_row_group_bytes: Option<usize>,

    /// Content-defined chunkers, one per leaf column, when content-defined
    /// chunking is enabled
    cdc_chunkers: Option<Vec<ContentDefinedChunker>>,
}

impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let buffered_memory = self.in_progress_size();
        f.debug_struct("ArrowWriter")
            .field("writer", &self.writer)
            .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
            .field("in_progress_rows", &self.in_progress_rows())
            .field("arrow_schema", &self.arrow_schema)
            .field("max_row_group_row_count", &self.max_row_group_row_count)
            .field("max_row_group_bytes", &self.max_row_group_bytes)
            .finish()
    }
}

impl<W: Write + Send> ArrowWriter<W> {
    /// Tries to create a new [`ArrowWriter`] with default [`ArrowWriterOptions`],
    /// applying the provided [`WriterProperties`] if any
    pub fn try_new(
        writer: W,
        arrow_schema: SchemaRef,
        props: Option<WriterProperties>,
    ) -> Result<Self> {
        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
        Self::try_new_with_options(writer, arrow_schema, options)
    }

    /// Tries to create a new [`ArrowWriter`] with the given [`ArrowWriterOptions`]
    pub fn try_new_with_options(
        writer: W,
        arrow_schema: SchemaRef,
        options: ArrowWriterOptions,
    ) -> Result<Self> {
        let mut props = options.properties;

        let schema = if let Some(parquet_schema) = options.schema_descr {
            parquet_schema.clone()
        } else {
            let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
            if let Some(schema_root) = &options.schema_root {
                converter = converter.schema_root(schema_root);
            }

            converter.convert(&arrow_schema)?
        };

        if !options.skip_arrow_metadata {
            // Embed the serialized arrow schema in the file metadata
            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
        }

        let max_row_group_row_count = props.max_row_group_row_count();
        let max_row_group_bytes = props.max_row_group_bytes();

        let props_ptr = Arc::new(props);
        let file_writer =
            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;

        let row_group_writer_factory =
            ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());

        let cdc_chunkers = props_ptr
            .content_defined_chunking()
            .map(|opts| {
                file_writer
                    .schema_descr()
                    .columns()
                    .iter()
                    .map(|desc| ContentDefinedChunker::new(desc, opts))
                    .collect::<Result<Vec<_>>>()
            })
            .transpose()?;

        Ok(Self {
            writer: file_writer,
            in_progress: None,
            arrow_schema,
            row_group_writer_factory,
            max_row_group_row_count,
            max_row_group_bytes,
            cdc_chunkers,
        })
    }

    /// Returns the metadata for any flushed row groups
    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        self.writer.flushed_row_groups()
    }

    /// Returns the estimated total memory usage of the in-progress row group,
    /// including buffered pages and encoder state
    pub fn memory_size(&self) -> usize {
        match &self.in_progress {
            Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
            None => 0,
        }
    }

    /// Returns the estimated length in bytes the in-progress row group will
    /// occupy once flushed
    pub fn in_progress_size(&self) -> usize {
        match &self.in_progress {
            Some(in_progress) => in_progress
                .writers
                .iter()
                .map(|x| x.get_estimated_total_bytes())
                .sum(),
            None => 0,
        }
    }

    /// Returns the number of rows buffered in the in-progress row group
    pub fn in_progress_rows(&self) -> usize {
        self.in_progress
            .as_ref()
            .map(|x| x.buffered_rows)
            .unwrap_or_default()
    }

    /// Returns the number of bytes written to the underlying writer so far
    pub fn bytes_written(&self) -> usize {
        self.writer.bytes_written()
    }

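    /// Enqueues the provided [`RecordBatch`] to be written.
    ///
    /// The batch is buffered in memory; a row group is only flushed to the
    /// underlying writer once a configured row-count or byte limit is
    /// reached, or [`Self::flush`] is called explicitly. If a limit would be
    /// exceeded mid-batch, the batch is sliced and split across row groups,
    /// as the logic below shows.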
    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        let in_progress = match &mut self.in_progress {
            Some(in_progress) => in_progress,
            x => x.insert(
                self.row_group_writer_factory
                    .create_row_group_writer(self.writer.flushed_row_groups().len())?,
            ),
        };

        // If adding this batch would exceed the row limit, split it so the
        // first slice exactly fills the current row group
        if let Some(max_rows) = self.max_row_group_row_count {
            if in_progress.buffered_rows + batch.num_rows() > max_rows {
                let to_write = max_rows - in_progress.buffered_rows;
                let a = batch.slice(0, to_write);
                let b = batch.slice(to_write, batch.num_rows() - to_write);
                self.write(&a)?;
                return self.write(&b);
            }
        }

        // If a byte limit is configured, estimate how many rows still fit
        // using the average encoded row size so far
        if let Some(max_bytes) = self.max_row_group_bytes {
            if in_progress.buffered_rows > 0 {
                let current_bytes = in_progress.get_estimated_total_bytes();

                if current_bytes >= max_bytes {
                    self.flush()?;
                    return self.write(batch);
                }

                let avg_row_bytes = current_bytes / in_progress.buffered_rows;
                if avg_row_bytes > 0 {
                    let remaining_bytes = max_bytes - current_bytes;
                    let rows_that_fit = remaining_bytes / avg_row_bytes;

                    if batch.num_rows() > rows_that_fit {
                        if rows_that_fit > 0 {
                            let a = batch.slice(0, rows_that_fit);
                            let b = batch.slice(rows_that_fit, batch.num_rows() - rows_that_fit);
                            self.write(&a)?;
                            return self.write(&b);
                        } else {
                            self.flush()?;
                            return self.write(batch);
                        }
                    }
                }
            }
        }

        match self.cdc_chunkers.as_mut() {
            Some(chunkers) => in_progress.write_with_chunkers(batch, chunkers)?,
            None => in_progress.write(batch)?,
        }

        let should_flush = self
            .max_row_group_row_count
            .is_some_and(|max| in_progress.buffered_rows >= max)
            || self
                .max_row_group_bytes
                .is_some_and(|max| in_progress.get_estimated_total_bytes() >= max);

        if should_flush {
            self.flush()?
        }
        Ok(())
    }

    /// Writes the given buffer of bytes directly to the underlying writer,
    /// bypassing the row group buffering
    pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
        self.writer.write_all(buf)
    }

    /// Flushes the underlying writer
    pub fn sync(&mut self) -> std::io::Result<()> {
        self.writer.flush()
    }

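    /// Flushes all buffered rows into a new row group, ending the current one.
    ///
    /// A no-op if no rows are currently buffered.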
    pub fn flush(&mut self) -> Result<()> {
        let in_progress = match self.in_progress.take() {
            Some(in_progress) => in_progress,
            None => return Ok(()),
        };

        let mut row_group_writer = self.writer.next_row_group()?;
        for chunk in in_progress.close()? {
            chunk.append_to_row_group(&mut row_group_writer)?;
        }
        row_group_writer.close()?;
        Ok(())
    }

    /// Appends the given key-value metadata to the file footer
    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
        self.writer.append_key_value_metadata(kv_metadata)
    }

    /// Returns a reference to the underlying writer
    pub fn inner(&self) -> &W {
        self.writer.inner()
    }

    /// Returns a mutable reference to the underlying writer.
    ///
    /// Writing to the returned writer directly may corrupt the output file.
    pub fn inner_mut(&mut self) -> &mut W {
        self.writer.inner_mut()
    }

    /// Flushes any outstanding data and returns the underlying writer
    pub fn into_inner(mut self) -> Result<W> {
        self.flush()?;
        self.writer.into_inner()
    }

    /// Flushes any outstanding data, writes the file footer, and returns the
    /// file metadata
    pub fn finish(&mut self) -> Result<ParquetMetaData> {
        self.flush()?;
        self.writer.finish()
    }

    /// Flushes any outstanding data, consumes the writer, and finishes the file
    pub fn close(mut self) -> Result<ParquetMetaData> {
        self.finish()
    }

    #[deprecated(
        since = "56.2.0",
        note = "Use `ArrowRowGroupWriterFactory` instead, see `ArrowColumnWriter` for an example"
    )]
    pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
        self.flush()?;
        let in_progress = self
            .row_group_writer_factory
            .create_row_group_writer(self.writer.flushed_row_groups().len())?;
        Ok(in_progress.writers)
    }

    #[deprecated(
        since = "56.2.0",
        note = "Use `SerializedFileWriter` directly instead, see `ArrowColumnWriter` for an example"
    )]
    pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
        let mut row_group_writer = self.writer.next_row_group()?;
        for chunk in chunks {
            chunk.append_to_row_group(&mut row_group_writer)?;
        }
        row_group_writer.close()?;
        Ok(())
    }

    /// Consumes this writer, flushing any in-progress row group and returning
    /// the underlying [`SerializedFileWriter`] and [`ArrowRowGroupWriterFactory`]
    pub fn into_serialized_writer(
        mut self,
    ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
        self.flush()?;
        Ok((self.writer, self.row_group_writer_factory))
    }
}

impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.write(batch).map_err(|e| e.into())
    }

    fn close(self) -> std::result::Result<(), ArrowError> {
        self.close()?;
        Ok(())
    }
}

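/// Additional configuration for an [`ArrowWriter`], supplied via
/// [`ArrowWriter::try_new_with_options`].
///
/// A sketch of how the builder-style setters compose (the property values
/// here are illustrative, not recommendations):
///
/// ```ignore
/// use arrow_schema::{DataType, Field, Schema};
/// use std::sync::Arc;
///
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
/// let options = ArrowWriterOptions::new()
///     .with_properties(WriterProperties::default())
///     .with_skip_arrow_metadata(true);
/// let writer = ArrowWriter::try_new_with_options(Vec::new(), schema, options).unwrap();
/// ```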
#[derive(Debug, Clone, Default)]
pub struct ArrowWriterOptions {
    properties: WriterProperties,
    skip_arrow_metadata: bool,
    schema_root: Option<String>,
    schema_descr: Option<SchemaDescriptor>,
}

impl ArrowWriterOptions {
    /// Creates a new instance with default values
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the [`WriterProperties`] for writing parquet files
    pub fn with_properties(self, properties: WriterProperties) -> Self {
        Self { properties, ..self }
    }

    /// Skips encoding the embedded arrow metadata (defaults to `false`)
    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

    /// Overrides the name of the root parquet schema element
    pub fn with_schema_root(self, schema_root: String) -> Self {
        Self {
            schema_root: Some(schema_root),
            ..self
        }
    }

    /// Uses the provided [`SchemaDescriptor`] for the parquet schema instead
    /// of converting the Arrow schema
    pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
        Self {
            schema_descr: Some(schema_descr),
            ..self
        }
    }
}

/// The in-memory data for a column chunk, assembled from encoded pages
#[derive(Default)]
struct ArrowColumnChunkData {
    length: usize,
    data: Vec<Bytes>,
}

impl Length for ArrowColumnChunkData {
    fn len(&self) -> u64 {
        self.length as _
    }
}

impl ChunkReader for ArrowColumnChunkData {
    type T = ArrowColumnChunkReader;

    fn get_read(&self, start: u64) -> Result<Self::T> {
        assert_eq!(start, 0); // Assume append_column writes all data in one-shot
        Ok(ArrowColumnChunkReader(
            self.data.clone().into_iter().peekable(),
        ))
    }

    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
        unimplemented!()
    }
}

/// A [`Read`] over an iterator of [`Bytes`]
struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);

impl Read for ArrowColumnChunkReader {
    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
        // Skip over empty buffers until one with data (or the end) is found
        let buffer = loop {
            match self.0.peek_mut() {
                Some(b) if b.is_empty() => {
                    self.0.next();
                    continue;
                }
                Some(b) => break b,
                None => return Ok(0),
            }
        };

        let len = buffer.len().min(out.len());
        let b = buffer.split_to(len);
        out[..len].copy_from_slice(&b);
        Ok(len)
    }
}

/// A shared [`ArrowColumnChunkData`]: the [`ArrowPageWriter`] appends pages
/// to it, and the owning [`ArrowColumnWriter`] extracts it on close
type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;

/// A [`PageWriter`] that writes encoded pages into a [`SharedColumnChunk`]
#[derive(Default)]
struct ArrowPageWriter {
    buffer: SharedColumnChunk,
    #[cfg(feature = "encryption")]
    page_encryptor: Option<PageEncryptor>,
}

impl ArrowPageWriter {
    #[cfg(feature = "encryption")]
    pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
        self.page_encryptor = page_encryptor;
        self
    }

    #[cfg(feature = "encryption")]
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        self.page_encryptor.as_mut()
    }

    #[cfg(not(feature = "encryption"))]
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        None
    }
}

impl PageWriter for ArrowPageWriter {
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
        let page = match self.page_encryptor_mut() {
            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
            None => page,
        };

        let page_header = page.to_thrift_header()?;
        let header = {
            let mut header = Vec::with_capacity(1024);

            match self.page_encryptor_mut() {
                Some(page_encryptor) => {
                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
                    if page.compressed_page().is_data_page() {
                        page_encryptor.increment_page();
                    }
                }
                None => {
                    let mut protocol = ThriftCompactOutputProtocol::new(&mut header);
                    page_header.write_thrift(&mut protocol)?;
                }
            };

            Bytes::from(header)
        };

        let mut buf = self.buffer.try_lock().unwrap();

        let data = page.compressed_page().buffer().clone();
        let compressed_size = data.len() + header.len();

        let mut spec = PageWriteSpec::new();
        spec.page_type = page.page_type();
        spec.num_values = page.num_values();
        spec.uncompressed_size = page.uncompressed_size() + header.len();
        spec.offset = buf.length as u64;
        spec.compressed_size = compressed_size;
        spec.bytes_written = compressed_size as u64;

        buf.length += compressed_size;
        buf.data.push(header);
        buf.data.push(data);

        Ok(spec)
    }

    fn close(&mut self) -> Result<()> {
        Ok(())
    }
}

/// The levels and values of a single leaf column within a potentially nested
/// [`ArrayRef`], as computed by [`compute_leaves`]
#[derive(Debug)]
pub struct ArrowLeafColumn(ArrayLevels);

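/// Computes the [`ArrowLeafColumn`]s for a potentially nested [`ArrayRef`],
/// returning one entry per parquet leaf column in depth-first order.
///
/// A hedged sketch of pairing the computed leaves with column writers
/// (assumes `writers` is an iterator over [`ArrowColumnWriter`]s created in
/// the same schema order, e.g. by
/// [`ArrowRowGroupWriterFactory::create_column_writers`]):
///
/// ```ignore
/// for (field, column) in schema.fields().iter().zip(batch.columns()) {
///     for leaf in compute_leaves(field.as_ref(), column)? {
///         writers.next().unwrap().write(&leaf)?;
///     }
/// }
/// ```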
pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
    let levels = calculate_array_levels(array, field)?;
    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
}

/// The data for a single column chunk, produced by [`ArrowColumnWriter::close`]
pub struct ArrowColumnChunk {
    data: ArrowColumnChunkData,
    close: ColumnCloseResult,
}

impl std::fmt::Debug for ArrowColumnChunk {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowColumnChunk")
            .field("length", &self.data.length)
            .finish_non_exhaustive()
    }
}

impl ArrowColumnChunk {
    /// Returns a reference to the [`ColumnCloseResult`] for this chunk
    pub fn close(&self) -> &ColumnCloseResult {
        &self.close
    }

    /// Returns a mutable reference to the [`ColumnCloseResult`] for this chunk
    pub fn close_mut(&mut self) -> &mut ColumnCloseResult {
        &mut self.close
    }

    /// Appends the data in this column chunk to the given row group writer
    pub fn append_to_row_group<W: Write + Send>(
        self,
        writer: &mut SerializedRowGroupWriter<'_, W>,
    ) -> Result<()> {
        writer.append_column(&self.data, self.close)
    }
}

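/// Encodes [`ArrowLeafColumn`]s to an in-memory parquet column chunk.
///
/// This is the low-level building block behind [`ArrowWriter`]; it can be
/// combined with a [`SerializedFileWriter`] to, for example, encode columns
/// on separate threads. A hedged, single-threaded sketch (assumes a `batch`,
/// a `writer: SerializedFileWriter<_>`, and a matching
/// `factory: ArrowRowGroupWriterFactory` are in scope):
///
/// ```ignore
/// let mut col_writers = factory.create_column_writers(0)?;
/// let mut it = col_writers.iter_mut();
/// for (field, column) in batch.schema().fields().iter().zip(batch.columns()) {
///     for leaf in compute_leaves(field.as_ref(), column)? {
///         it.next().unwrap().write(&leaf)?;
///     }
/// }
/// let mut row_group = writer.next_row_group()?;
/// for col_writer in col_writers {
///     col_writer.close()?.append_to_row_group(&mut row_group)?;
/// }
/// row_group.close()?;
/// ```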
pub struct ArrowColumnWriter {
    writer: ArrowColumnWriterImpl,
    chunk: SharedColumnChunk,
}

impl std::fmt::Debug for ArrowColumnWriter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
    }
}

enum ArrowColumnWriterImpl {
    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
    Column(ColumnWriter<'static>),
}

impl ArrowColumnWriter {
    /// Writes an [`ArrowLeafColumn`]
    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
        self.write_internal(&col.0)
    }

    /// Writes an [`ArrowLeafColumn`], starting a new data page at each
    /// boundary identified by the content-defined chunker
    fn write_with_chunker(
        &mut self,
        col: &ArrowLeafColumn,
        chunker: &mut ContentDefinedChunker,
    ) -> Result<()> {
        let levels = &col.0;
        let chunks =
            chunker.get_arrow_chunks(levels.def_levels(), levels.rep_levels(), levels.array())?;

        let num_chunks = chunks.len();
        for (i, chunk) in chunks.iter().enumerate() {
            let chunk_levels = levels.slice_for_chunk(chunk);
            self.write_internal(&chunk_levels)?;

            // Force a page break after every chunk except the last
            if i + 1 < num_chunks {
                match &mut self.writer {
                    ArrowColumnWriterImpl::Column(c) => c.add_data_page()?,
                    ArrowColumnWriterImpl::ByteArray(c) => c.add_data_page()?,
                }
            }
        }
        Ok(())
    }

    fn write_internal(&mut self, levels: &ArrayLevels) -> Result<()> {
        match &mut self.writer {
            ArrowColumnWriterImpl::Column(c) => {
                let leaf = levels.array();
                match leaf.as_any_dictionary_opt() {
                    Some(dictionary) => {
                        // Materialize the dictionary values before writing
                        let materialized =
                            arrow_select::take::take(dictionary.values(), dictionary.keys(), None)?;
                        write_leaf(c, &materialized, levels)?
                    }
                    None => write_leaf(c, leaf, levels)?,
                };
            }
            ArrowColumnWriterImpl::ByteArray(c) => {
                write_primitive(c, levels.array().as_ref(), levels)?;
            }
        }
        Ok(())
    }

    /// Closes this column writer, returning the written [`ArrowColumnChunk`]
    pub fn close(self) -> Result<ArrowColumnChunk> {
        let close = match self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
            ArrowColumnWriterImpl::Column(c) => c.close()?,
        };
        let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
        let data = chunk.into_inner().unwrap();
        Ok(ArrowColumnChunk { data, close })
    }

    /// Returns the estimated total memory usage of this column writer
    pub fn memory_size(&self) -> usize {
        match &self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
            ArrowColumnWriterImpl::Column(c) => c.memory_size(),
        }
    }

    /// Returns the estimated length in bytes this column chunk will occupy
    /// once flushed
    pub fn get_estimated_total_bytes(&self) -> usize {
        match &self.writer {
            ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
            ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
        }
    }
}

/// Buffers [`RecordBatch`]es for a single row group, delegating each leaf
/// column to its [`ArrowColumnWriter`]
#[derive(Debug)]
struct ArrowRowGroupWriter {
    writers: Vec<ArrowColumnWriter>,
    schema: SchemaRef,
    buffered_rows: usize,
}

impl ArrowRowGroupWriter {
    fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
        Self {
            writers,
            schema: arrow.clone(),
            buffered_rows: 0,
        }
    }

    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        self.buffered_rows += batch.num_rows();
        let mut writers = self.writers.iter_mut();
        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
            for leaf in compute_leaves(field.as_ref(), column)? {
                writers.next().unwrap().write(&leaf)?;
            }
        }
        Ok(())
    }

    fn write_with_chunkers(
        &mut self,
        batch: &RecordBatch,
        chunkers: &mut [ContentDefinedChunker],
    ) -> Result<()> {
        self.buffered_rows += batch.num_rows();
        let mut writers = self.writers.iter_mut();
        let mut chunkers = chunkers.iter_mut();
        for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
            for leaf in compute_leaves(field.as_ref(), column)? {
                writers
                    .next()
                    .unwrap()
                    .write_with_chunker(&leaf, chunkers.next().unwrap())?;
            }
        }
        Ok(())
    }

    fn get_estimated_total_bytes(&self) -> usize {
        self.writers
            .iter()
            .map(|x| x.get_estimated_total_bytes())
            .sum()
    }

    fn close(self) -> Result<Vec<ArrowColumnChunk>> {
        self.writers
            .into_iter()
            .map(|writer| writer.close())
            .collect()
    }
}

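/// Creates the [`ArrowColumnWriter`]s for each row group of a parquet file,
/// capturing the schema, writer properties, and (with the `encryption`
/// feature) the file encryptor from a [`SerializedFileWriter`].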
#[derive(Debug)]
pub struct ArrowRowGroupWriterFactory {
    schema: SchemaDescPtr,
    arrow_schema: SchemaRef,
    props: WriterPropertiesPtr,
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}

impl ArrowRowGroupWriterFactory {
    /// Creates a new [`ArrowRowGroupWriterFactory`] from a [`SerializedFileWriter`]
    pub fn new<W: Write + Send>(
        file_writer: &SerializedFileWriter<W>,
        arrow_schema: SchemaRef,
    ) -> Self {
        let schema = Arc::clone(file_writer.schema_descr_ptr());
        let props = Arc::clone(file_writer.properties());
        Self {
            schema,
            arrow_schema,
            props,
            #[cfg(feature = "encryption")]
            file_encryptor: file_writer.file_encryptor(),
        }
    }

    fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
        let writers = self.create_column_writers(row_group_index)?;
        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
    }

    /// Creates the [`ArrowColumnWriter`]s for the row group at the given index
    pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
        let mut writers = Vec::with_capacity(self.arrow_schema.fields.len());
        let mut leaves = self.schema.columns().iter();
        let column_factory = self.column_writer_factory(row_group_index);
        for field in &self.arrow_schema.fields {
            column_factory.get_arrow_column_writer(
                field.data_type(),
                &self.props,
                &mut leaves,
                &mut writers,
            )?;
        }
        Ok(writers)
    }

    #[cfg(feature = "encryption")]
    fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
        ArrowColumnWriterFactory::new()
            .with_file_encryptor(row_group_idx, self.file_encryptor.clone())
    }

    #[cfg(not(feature = "encryption"))]
    fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory {
        ArrowColumnWriterFactory::new()
    }
}

#[deprecated(since = "57.0.0", note = "Use `ArrowRowGroupWriterFactory` instead")]
pub fn get_column_writers(
    parquet: &SchemaDescriptor,
    props: &WriterPropertiesPtr,
    arrow: &SchemaRef,
) -> Result<Vec<ArrowColumnWriter>> {
    let mut writers = Vec::with_capacity(arrow.fields.len());
    let mut leaves = parquet.columns().iter();
    let column_factory = ArrowColumnWriterFactory::new();
    for field in &arrow.fields {
        column_factory.get_arrow_column_writer(
            field.data_type(),
            props,
            &mut leaves,
            &mut writers,
        )?;
    }
    Ok(writers)
}

struct ArrowColumnWriterFactory {
    #[cfg(feature = "encryption")]
    row_group_index: usize,
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}

impl ArrowColumnWriterFactory {
    pub fn new() -> Self {
        Self {
            #[cfg(feature = "encryption")]
            row_group_index: 0,
            #[cfg(feature = "encryption")]
            file_encryptor: None,
        }
    }

    #[cfg(feature = "encryption")]
    pub fn with_file_encryptor(
        mut self,
        row_group_index: usize,
        file_encryptor: Option<Arc<FileEncryptor>>,
    ) -> Self {
        self.row_group_index = row_group_index;
        self.file_encryptor = file_encryptor;
        self
    }

    #[cfg(feature = "encryption")]
    fn create_page_writer(
        &self,
        column_descriptor: &ColumnDescPtr,
        column_index: usize,
    ) -> Result<Box<ArrowPageWriter>> {
        let column_path = column_descriptor.path().string();
        let page_encryptor = PageEncryptor::create_if_column_encrypted(
            &self.file_encryptor,
            self.row_group_index,
            column_index,
            &column_path,
        )?;
        Ok(Box::new(
            ArrowPageWriter::default().with_encryptor(page_encryptor),
        ))
    }

    #[cfg(not(feature = "encryption"))]
    fn create_page_writer(
        &self,
        _column_descriptor: &ColumnDescPtr,
        _column_index: usize,
    ) -> Result<Box<ArrowPageWriter>> {
        Ok(Box::<ArrowPageWriter>::default())
    }

    /// Gets the [`ArrowColumnWriter`]s for the given `data_type`, consuming
    /// the corresponding leaf columns from `leaves`
    fn get_arrow_column_writer(
        &self,
        data_type: &ArrowDataType,
        props: &WriterPropertiesPtr,
        leaves: &mut Iter<'_, ColumnDescPtr>,
        out: &mut Vec<ArrowColumnWriter>,
    ) -> Result<()> {
        let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
            let page_writer = self.create_page_writer(desc, out.len())?;
            let chunk = page_writer.buffer.clone();
            let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
            Ok(ArrowColumnWriter {
                chunk,
                writer: ArrowColumnWriterImpl::Column(writer),
            })
        };

        let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
            let page_writer = self.create_page_writer(desc, out.len())?;
            let chunk = page_writer.buffer.clone();
            let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
            Ok(ArrowColumnWriter {
                chunk,
                writer: ArrowColumnWriterImpl::ByteArray(writer),
            })
        };

        match data_type {
            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
                out.push(col(leaves.next().unwrap())?)
            }
            ArrowDataType::LargeBinary
            | ArrowDataType::Binary
            | ArrowDataType::Utf8
            | ArrowDataType::LargeUtf8
            | ArrowDataType::BinaryView
            | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
            ArrowDataType::List(f)
            | ArrowDataType::LargeList(f)
            | ArrowDataType::FixedSizeList(f, _)
            | ArrowDataType::ListView(f)
            | ArrowDataType::LargeListView(f) => {
                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
            }
            ArrowDataType::Struct(fields) => {
                for field in fields {
                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
                }
            }
            ArrowDataType::Map(f, _) => match f.data_type() {
                ArrowDataType::Struct(f) => {
                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
                }
                _ => unreachable!("invalid map type"),
            },
            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                ArrowDataType::Utf8
                | ArrowDataType::LargeUtf8
                | ArrowDataType::Binary
                | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
                    out.push(bytes(leaves.next().unwrap())?)
                }
                ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
                _ => out.push(col(leaves.next().unwrap())?),
            },
            _ => {
                return Err(ParquetError::NYI(format!(
                    "Attempting to write an Arrow type {data_type} to parquet that is not yet implemented"
                )));
            }
        }
        Ok(())
    }
}

fn write_leaf(
    writer: &mut ColumnWriter<'_>,
    column: &dyn arrow_array::Array,
    levels: &ArrayLevels,
) -> Result<usize> {
    let indices = levels.non_null_indices();

    match writer {
        ColumnWriter::Int32ColumnWriter(typed) => {
            match column.data_type() {
                ArrowDataType::Null => {
                    let array = Int32Array::new_null(column.len());
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Int8 => {
                    let array: Int32Array = column.as_primitive::<Int8Type>().unary(|x| x as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Int16 => {
                    let array: Int32Array = column.as_primitive::<Int16Type>().unary(|x| x as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Int32 => {
                    write_primitive(typed, column.as_primitive::<Int32Type>().values(), levels)
                }
                ArrowDataType::UInt8 => {
                    let array: Int32Array = column.as_primitive::<UInt8Type>().unary(|x| x as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt16 => {
                    let array: Int32Array = column.as_primitive::<UInt16Type>().unary(|x| x as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt32 => {
                    let array = column.as_primitive::<UInt32Type>();
                    // Reinterpret u32 as i32: values above `i32::MAX` wrap to
                    // negative, matching the parquet unsigned physical encoding
                    write_primitive(typed, array.values().inner().typed_data(), levels)
                }
                ArrowDataType::Date32 => {
                    let array = column.as_primitive::<Date32Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Time32(TimeUnit::Second) => {
                    let array = column.as_primitive::<Time32SecondType>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Time32(TimeUnit::Millisecond) => {
                    let array = column.as_primitive::<Time32MillisecondType>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Date64 => {
                    // Convert Date64 (milliseconds) to Date32 (days)
                    let array: Int32Array = column
                        .as_primitive::<Date64Type>()
                        .unary(|x| (x / 86_400_000) as _);

                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal32(_, _) => {
                    let array = column
                        .as_primitive::<Decimal32Type>()
                        .unary::<_, Int32Type>(|v| v);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal64(_, _) => {
                    // Precision is low enough that the value fits in an i32
                    let array = column
                        .as_primitive::<Decimal64Type>()
                        .unary::<_, Int32Type>(|v| v as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal128(_, _) => {
                    let array = column
                        .as_primitive::<Decimal128Type>()
                        .unary::<_, Int32Type>(|v| v as i32);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal256(_, _) => {
                    let array = column
                        .as_primitive::<Decimal256Type>()
                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
                    write_primitive(typed, array.values(), levels)
                }
                d => Err(ParquetError::General(format!("Cannot coerce {d} to I32"))),
            }
        }
        ColumnWriter::BoolColumnWriter(typed) => {
            let array = column.as_boolean();
            typed.write_batch(
                get_bool_array_slice(array, indices).as_slice(),
                levels.def_levels(),
                levels.rep_levels(),
            )
        }
        ColumnWriter::Int64ColumnWriter(typed) => {
            match column.data_type() {
                ArrowDataType::Date64 => {
                    let array = column
                        .as_primitive::<Date64Type>()
                        .reinterpret_cast::<Int64Type>();

                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Int64 => {
                    let array = column.as_primitive::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::UInt64 => {
                    let values = column.as_primitive::<UInt64Type>().values();
                    // Reinterpret u64 as i64, analogous to the u32 case above
                    let array = values.inner().typed_data::<i64>();
                    write_primitive(typed, array, levels)
                }
                ArrowDataType::Time64(TimeUnit::Microsecond) => {
                    let array = column.as_primitive::<Time64MicrosecondType>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Time64(TimeUnit::Nanosecond) => {
                    let array = column.as_primitive::<Time64NanosecondType>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Timestamp(unit, _) => match unit {
                    TimeUnit::Second => {
                        let array = column.as_primitive::<TimestampSecondType>();
                        write_primitive(typed, array.values(), levels)
                    }
                    TimeUnit::Millisecond => {
                        let array = column.as_primitive::<TimestampMillisecondType>();
                        write_primitive(typed, array.values(), levels)
                    }
                    TimeUnit::Microsecond => {
                        let array = column.as_primitive::<TimestampMicrosecondType>();
                        write_primitive(typed, array.values(), levels)
                    }
                    TimeUnit::Nanosecond => {
                        let array = column.as_primitive::<TimestampNanosecondType>();
                        write_primitive(typed, array.values(), levels)
                    }
                },
                ArrowDataType::Duration(unit) => match unit {
                    TimeUnit::Second => {
                        let array = column.as_primitive::<DurationSecondType>();
                        write_primitive(typed, array.values(), levels)
                    }
                    TimeUnit::Millisecond => {
                        let array = column.as_primitive::<DurationMillisecondType>();
                        write_primitive(typed, array.values(), levels)
                    }
                    TimeUnit::Microsecond => {
                        let array = column.as_primitive::<DurationMicrosecondType>();
                        write_primitive(typed, array.values(), levels)
                    }
                    TimeUnit::Nanosecond => {
                        let array = column.as_primitive::<DurationNanosecondType>();
                        write_primitive(typed, array.values(), levels)
                    }
                },
                ArrowDataType::Decimal64(_, _) => {
                    let array = column
                        .as_primitive::<Decimal64Type>()
                        .reinterpret_cast::<Int64Type>();
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal128(_, _) => {
                    // Precision is low enough that the value fits in an i64
                    let array = column
                        .as_primitive::<Decimal128Type>()
                        .unary::<_, Int64Type>(|v| v as i64);
                    write_primitive(typed, array.values(), levels)
                }
                ArrowDataType::Decimal256(_, _) => {
                    let array = column
                        .as_primitive::<Decimal256Type>()
                        .unary::<_, Int64Type>(|v| v.as_i128() as i64);
                    write_primitive(typed, array.values(), levels)
                }
                d => Err(ParquetError::General(format!("Cannot coerce {d} to I64"))),
            }
        }
        ColumnWriter::Int96ColumnWriter(_typed) => {
            unreachable!("Currently unreachable because data type not supported")
        }
        ColumnWriter::FloatColumnWriter(typed) => {
            let array = column.as_primitive::<Float32Type>();
            write_primitive(typed, array.values(), levels)
        }
        ColumnWriter::DoubleColumnWriter(typed) => {
            let array = column.as_primitive::<Float64Type>();
            write_primitive(typed, array.values(), levels)
        }
        ColumnWriter::ByteArrayColumnWriter(_) => {
            unreachable!("should use ByteArrayWriter")
        }
        ColumnWriter::FixedLenByteArrayColumnWriter(typed) => {
            let bytes = match column.data_type() {
                ArrowDataType::Interval(interval_unit) => match interval_unit {
                    IntervalUnit::YearMonth => {
                        let array = column.as_primitive::<IntervalYearMonthType>();
                        get_interval_ym_array_slice(array, indices)
                    }
                    IntervalUnit::DayTime => {
                        let array = column.as_primitive::<IntervalDayTimeType>();
                        get_interval_dt_array_slice(array, indices)
                    }
                    _ => {
                        return Err(ParquetError::NYI(format!(
                            "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
                        )));
                    }
                },
                ArrowDataType::FixedSizeBinary(_) => {
                    let array = column.as_fixed_size_binary();
                    get_fsb_array_slice(array, indices)
                }
                ArrowDataType::Decimal32(_, _) => {
                    let array = column.as_primitive::<Decimal32Type>();
                    get_decimal_32_array_slice(array, indices)
                }
                ArrowDataType::Decimal64(_, _) => {
                    let array = column.as_primitive::<Decimal64Type>();
                    get_decimal_64_array_slice(array, indices)
                }
                ArrowDataType::Decimal128(_, _) => {
                    let array = column.as_primitive::<Decimal128Type>();
                    get_decimal_128_array_slice(array, indices)
                }
                ArrowDataType::Decimal256(_, _) => {
                    let array = column.as_primitive::<Decimal256Type>();
                    get_decimal_256_array_slice(array, indices)
                }
                ArrowDataType::Float16 => {
                    let array = column.as_primitive::<Float16Type>();
                    get_float_16_array_slice(array, indices)
                }
                _ => {
                    return Err(ParquetError::NYI(
                        "Attempting to write an Arrow type that is not yet implemented".to_string(),
                    ));
                }
            };
            typed.write_batch(bytes.as_slice(), levels.def_levels(), levels.rep_levels())
        }
    }
}

fn write_primitive<E: ColumnValueEncoder>(
    writer: &mut GenericColumnWriter<E>,
    values: &E::Values,
    levels: &ArrayLevels,
) -> Result<usize> {
    writer.write_batch_internal(
        values,
        Some(levels.non_null_indices()),
        levels.def_levels(),
        levels.rep_levels(),
        None,
        None,
        None,
    )
}

fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) -> Vec<bool> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        values.push(array.value(*i))
    }
    values
}

/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
/// An Arrow YearMonth interval only stores months, thus only the first 4 bytes are populated.
fn get_interval_ym_array_slice(
    array: &arrow_array::IntervalYearMonthArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let mut value = array.value(*i).to_le_bytes().to_vec();
        let mut suffix = vec![0; 8];
        value.append(&mut suffix);
        values.push(FixedLenByteArray::from(ByteArray::from(value)))
    }
    values
}

/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
/// An Arrow DayTime interval only stores days and millis, thus the first 4 bytes are not populated.
fn get_interval_dt_array_slice(
    array: &arrow_array::IntervalDayTimeArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let mut out = [0; 12];
        let value = array.value(*i);
        out[4..8].copy_from_slice(&value.days.to_le_bytes());
        out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
        values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
    }
    values
}

fn get_decimal_32_array_slice(
    array: &arrow_array::Decimal32Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    let size = decimal_length_from_precision(array.precision());
    for i in indices {
        let as_be_bytes = array.value(*i).to_be_bytes();
        let resized_value = as_be_bytes[(4 - size)..].to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
    }
    values
}

fn get_decimal_64_array_slice(
    array: &arrow_array::Decimal64Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    let size = decimal_length_from_precision(array.precision());
    for i in indices {
        let as_be_bytes = array.value(*i).to_be_bytes();
        let resized_value = as_be_bytes[(8 - size)..].to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
    }
    values
}

fn get_decimal_128_array_slice(
    array: &arrow_array::Decimal128Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    let size = decimal_length_from_precision(array.precision());
    for i in indices {
        let as_be_bytes = array.value(*i).to_be_bytes();
        let resized_value = as_be_bytes[(16 - size)..].to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
    }
    values
}

fn get_decimal_256_array_slice(
    array: &arrow_array::Decimal256Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    let size = decimal_length_from_precision(array.precision());
    for i in indices {
        let as_be_bytes = array.value(*i).to_be_bytes();
        let resized_value = as_be_bytes[(32 - size)..].to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
    }
    values
}

fn get_float_16_array_slice(
    array: &arrow_array::Float16Array,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let value = array.value(*i).to_le_bytes().to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(value)));
    }
    values
}

fn get_fsb_array_slice(
    array: &arrow_array::FixedSizeBinaryArray,
    indices: &[usize],
) -> Vec<FixedLenByteArray> {
    let mut values = Vec::with_capacity(indices.len());
    for i in indices {
        let value = array.value(*i).to_vec();
        values.push(FixedLenByteArray::from(ByteArray::from(value)))
    }
    values
}
1682
1683#[cfg(test)]
1684mod tests {
1685 use super::*;
1686 use std::collections::HashMap;
1687
1688 use std::fs::File;
1689
1690 use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1691 use crate::arrow::{ARROW_SCHEMA_META_KEY, PARQUET_FIELD_ID_META_KEY};
1692 use crate::column::page::{Page, PageReader};
1693 use crate::file::metadata::thrift::PageHeader;
1694 use crate::file::page_index::column_index::ColumnIndexMetaData;
1695 use crate::file::reader::SerializedPageReader;
1696 use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
1697 use crate::schema::types::ColumnPath;
1698 use arrow::datatypes::ToByteSlice;
1699 use arrow::datatypes::{DataType, Schema};
1700 use arrow::error::Result as ArrowResult;
1701 use arrow::util::data_gen::create_random_array;
1702 use arrow::util::pretty::pretty_format_batches;
1703 use arrow::{array::*, buffer::Buffer};
1704 use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer, OffsetBuffer, i256};
1705 use arrow_schema::Fields;
1706 use half::f16;
1707 use num_traits::{FromPrimitive, ToPrimitive};
1708 use tempfile::tempfile;
1709
1710 use crate::basic::Encoding;
1711 use crate::data_type::AsBytes;
1712 use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
1713 use crate::file::properties::{
1714 BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1715 };
1716 use crate::file::serialized_reader::ReadOptionsBuilder;
1717 use crate::file::{
1718 reader::{FileReader, SerializedFileReader},
1719 statistics::Statistics,
1720 };
1721
1722 #[test]
1723 fn arrow_writer() {
1724 let schema = Schema::new(vec![
1726 Field::new("a", DataType::Int32, false),
1727 Field::new("b", DataType::Int32, true),
1728 ]);
1729
1730 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1732 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1733
1734 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
1736
1737 roundtrip(batch, Some(SMALL_SIZE / 2));
1738 }
1739
1740 fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1741 let mut buffer = vec![];
1742
1743 let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
1744 writer.write(expected_batch).unwrap();
1745 writer.close().unwrap();
1746
1747 buffer
1748 }
1749
1750 fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
1751 let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
1752 writer.write(expected_batch).unwrap();
1753 writer.into_inner().unwrap()
1754 }
1755
1756 #[test]
1757 fn roundtrip_bytes() {
1758 let schema = Arc::new(Schema::new(vec![
1760 Field::new("a", DataType::Int32, false),
1761 Field::new("b", DataType::Int32, true),
1762 ]));
1763
1764 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1766 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
1767
1768 let expected_batch =
1770 RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
1771
1772 for buffer in [
1773 get_bytes_after_close(schema.clone(), &expected_batch),
1774 get_bytes_by_into_inner(schema, &expected_batch),
1775 ] {
1776 let cursor = Bytes::from(buffer);
1777 let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
1778
1779 let actual_batch = record_batch_reader
1780 .next()
1781 .expect("No batch found")
1782 .expect("Unable to get batch");
1783
1784 assert_eq!(expected_batch.schema(), actual_batch.schema());
1785 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
1786 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
1787 for i in 0..expected_batch.num_columns() {
1788 let expected_data = expected_batch.column(i).to_data();
1789 let actual_data = actual_batch.column(i).to_data();
1790
1791 assert_eq!(expected_data, actual_data);
1792 }
1793 }
1794 }
1795
1796 #[test]
1797 fn arrow_writer_non_null() {
1798 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1800
1801 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1803
1804 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1806
1807 roundtrip(batch, Some(SMALL_SIZE / 2));
1808 }
1809
1810 #[test]
1811 fn arrow_writer_list() {
1812 let schema = Schema::new(vec![Field::new(
1814 "a",
1815 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1816 true,
1817 )]);
1818
1819 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1821
1822 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1825
1826 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1828 DataType::Int32,
1829 false,
1830 ))))
1831 .len(5)
1832 .add_buffer(a_value_offsets)
1833 .add_child_data(a_values.into_data())
1834 .null_bit_buffer(Some(Buffer::from([0b00011011])))
1835 .build()
1836 .unwrap();
1837 let a = ListArray::from(a_list_data);
1838
1839 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1841
1842 assert_eq!(batch.column(0).null_count(), 1);
1843
1844 roundtrip(batch, None);
1847 }
1848
1849 #[test]
1850 fn arrow_writer_list_non_null() {
1851 let schema = Schema::new(vec![Field::new(
1853 "a",
1854 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
1855 false,
1856 )]);
1857
1858 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1860
1861 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
1864
1865 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
1867 DataType::Int32,
1868 false,
1869 ))))
1870 .len(5)
1871 .add_buffer(a_value_offsets)
1872 .add_child_data(a_values.into_data())
1873 .build()
1874 .unwrap();
1875 let a = ListArray::from(a_list_data);
1876
1877 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1879
1880 assert_eq!(batch.column(0).null_count(), 0);
1883
1884 roundtrip(batch, None);
1885 }
1886
1887 #[test]
1888 fn arrow_writer_list_view() {
1889 let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
1890 let schema = Schema::new(vec![Field::new(
1891 "a",
1892 DataType::ListView(list_field.clone()),
1893 true,
1894 )]);
1895
1896 let a = ListViewArray::new(
1898 list_field,
1899 vec![0, 1, 0, 3, 6].into(),
1900 vec![1, 2, 0, 3, 4].into(),
1901 Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1902 Some(vec![true, true, false, true, true].into()),
1903 );
1904
1905 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1906
1907 assert_eq!(batch.column(0).null_count(), 1);
1908
1909 roundtrip(batch, None);
1910 }
1911
1912 #[test]
1913 fn arrow_writer_list_view_non_null() {
1914 let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
1915 let schema = Schema::new(vec![Field::new(
1916 "a",
1917 DataType::ListView(list_field.clone()),
1918 false,
1919 )]);
1920
1921 let a = ListViewArray::new(
1923 list_field,
1924 vec![0, 1, 0, 3, 6].into(),
1925 vec![1, 2, 0, 3, 4].into(),
1926 Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1927 None,
1928 );
1929
1930 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1931
1932 assert_eq!(batch.column(0).null_count(), 0);
1933
1934 roundtrip(batch, None);
1935 }
1936
1937 #[test]
1938 fn arrow_writer_list_view_out_of_order() {
1939 let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
1940 let schema = Schema::new(vec![Field::new(
1941 "a",
1942 DataType::ListView(list_field.clone()),
1943 false,
1944 )]);
1945
1946 let a = ListViewArray::new(
1948 list_field,
1949 vec![0, 1, 0, 6, 3].into(),
1950 vec![1, 2, 0, 4, 3].into(),
1951 Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1952 None,
1953 );
1954
1955 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1956
1957 roundtrip(batch, None);
1958 }
1959
1960 #[test]
1961 fn arrow_writer_large_list_view() {
1962 let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
1963 let schema = Schema::new(vec![Field::new(
1964 "a",
1965 DataType::LargeListView(list_field.clone()),
1966 true,
1967 )]);
1968
1969 let a = LargeListViewArray::new(
1971 list_field,
1972 vec![0i64, 1, 0, 3, 6].into(),
1973 vec![1i64, 2, 0, 3, 4].into(),
1974 Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
1975 Some(vec![true, true, false, true, true].into()),
1976 );
1977
1978 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
1979
1980 assert_eq!(batch.column(0).null_count(), 1);
1981
1982 roundtrip(batch, None);
1983 }
1984
1985 #[test]
1986 fn arrow_writer_list_view_with_struct() {
1987 let struct_fields = Fields::from(vec![
1989 Field::new("id", DataType::Int32, false),
1990 Field::new("name", DataType::Utf8, false),
1991 ]);
1992 let struct_type = DataType::Struct(struct_fields.clone());
1993 let list_field = Arc::new(Field::new("item", struct_type.clone(), false));
1994
1995 let schema = Schema::new(vec![Field::new(
1996 "a",
1997 DataType::ListView(list_field.clone()),
1998 true,
1999 )]);
2000
2001 let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
2003 let name_array = StringArray::from(vec!["a", "b", "c", "d", "e"]);
2004 let struct_array = StructArray::new(
2005 struct_fields,
2006 vec![Arc::new(id_array), Arc::new(name_array)],
2007 None,
2008 );
2009
2010 let list_view = ListViewArray::new(
2012 list_field,
2013 vec![0, 2, 2].into(), vec![2, 0, 3].into(), Arc::new(struct_array),
2016 Some(vec![true, false, true].into()),
2017 );
2018
2019 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_view)]).unwrap();
2020
2021 roundtrip(batch, None);
2022 }
2023
2024 #[test]
2025 fn arrow_writer_binary() {
2026 let string_field = Field::new("a", DataType::Utf8, false);
2027 let binary_field = Field::new("b", DataType::Binary, false);
2028 let schema = Schema::new(vec![string_field, binary_field]);
2029
2030 let raw_string_values = vec!["foo", "bar", "baz", "quux"];
2031 let raw_binary_values = [
2032 b"foo".to_vec(),
2033 b"bar".to_vec(),
2034 b"baz".to_vec(),
2035 b"quux".to_vec(),
2036 ];
2037 let raw_binary_value_refs = raw_binary_values
2038 .iter()
2039 .map(|x| x.as_slice())
2040 .collect::<Vec<_>>();
2041
2042 let string_values = StringArray::from(raw_string_values.clone());
2043 let binary_values = BinaryArray::from(raw_binary_value_refs);
2044 let batch = RecordBatch::try_new(
2045 Arc::new(schema),
2046 vec![Arc::new(string_values), Arc::new(binary_values)],
2047 )
2048 .unwrap();
2049
2050 roundtrip(batch, Some(SMALL_SIZE / 2));
2051 }
2052
2053 #[test]
2054 fn arrow_writer_binary_view() {
2055 let string_field = Field::new("a", DataType::Utf8View, false);
2056 let binary_field = Field::new("b", DataType::BinaryView, false);
2057 let nullable_string_field = Field::new("a", DataType::Utf8View, true);
2058 let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
2059
2060 let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
2061 let raw_binary_values = vec![
2062 b"foo".to_vec(),
2063 b"bar".to_vec(),
2064 b"large payload over 12 bytes".to_vec(),
2065 b"lulu".to_vec(),
2066 ];
2067 let nullable_string_values =
2068 vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
2069
2070 let string_view_values = StringViewArray::from(raw_string_values);
2071 let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
2072 let nullable_string_view_values = StringViewArray::from(nullable_string_values);
2073 let batch = RecordBatch::try_new(
2074 Arc::new(schema),
2075 vec![
2076 Arc::new(string_view_values),
2077 Arc::new(binary_view_values),
2078 Arc::new(nullable_string_view_values),
2079 ],
2080 )
2081 .unwrap();
2082
2083 roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
2084 roundtrip(batch, None);
2085 }
2086
2087 #[test]
2088 fn arrow_writer_binary_view_long_value() {
2089 let string_field = Field::new("a", DataType::Utf8View, false);
2090 let binary_field = Field::new("b", DataType::BinaryView, false);
2091 let schema = Schema::new(vec![string_field, binary_field]);
2092
2093 let long = "a".repeat(128);
2097 let raw_string_values = vec!["foo", long.as_str(), "bar"];
2098 let raw_binary_values = vec![b"foo".to_vec(), long.as_bytes().to_vec(), b"bar".to_vec()];
2099
2100 let string_view_values: ArrayRef = Arc::new(StringViewArray::from(raw_string_values));
2101 let binary_view_values: ArrayRef =
2102 Arc::new(BinaryViewArray::from_iter_values(raw_binary_values));
2103
2104 one_column_roundtrip(Arc::clone(&string_view_values), false);
2105 one_column_roundtrip(Arc::clone(&binary_view_values), false);
2106
2107 let batch = RecordBatch::try_new(
2108 Arc::new(schema),
2109 vec![string_view_values, binary_view_values],
2110 )
2111 .unwrap();
2112
2113 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2115 let props = WriterProperties::builder()
2116 .set_writer_version(version)
2117 .set_dictionary_enabled(false)
2118 .build();
2119 roundtrip_opts(&batch, props);
2120 }
2121 }
2122
2123 fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
2124 let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
2125 let schema = Schema::new(vec![decimal_field]);
2126
2127 let decimal_values = vec![10_000, 50_000, 0, -100]
2128 .into_iter()
2129 .map(Some)
2130 .collect::<Decimal128Array>()
2131 .with_precision_and_scale(precision, scale)
2132 .unwrap();
2133
2134 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
2135 }
2136
2137 #[test]
2138 fn arrow_writer_decimal() {
2139 let batch_int32_decimal = get_decimal_batch(5, 2);
2141 roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
2142 let batch_int64_decimal = get_decimal_batch(12, 2);
2144 roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
2145 let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
2147 roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
2148 }
2149
2150 #[test]
2151 fn arrow_writer_complex() {
2152 let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
2154 let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
2155 let struct_field_g = Arc::new(Field::new_list(
2156 "g",
2157 Field::new_list_field(DataType::Int16, true),
2158 false,
2159 ));
2160 let struct_field_h = Arc::new(Field::new_list(
2161 "h",
2162 Field::new_list_field(DataType::Int16, false),
2163 true,
2164 ));
2165 let struct_field_e = Arc::new(Field::new_struct(
2166 "e",
2167 vec![
2168 struct_field_f.clone(),
2169 struct_field_g.clone(),
2170 struct_field_h.clone(),
2171 ],
2172 false,
2173 ));
2174 let schema = Schema::new(vec![
2175 Field::new("a", DataType::Int32, false),
2176 Field::new("b", DataType::Int32, true),
2177 Field::new_struct(
2178 "c",
2179 vec![struct_field_d.clone(), struct_field_e.clone()],
2180 false,
2181 ),
2182 ]);
2183
2184 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2186 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2187 let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
2188 let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
2189
2190 let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2191
2192 let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2195
2196 let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
2198 .len(5)
2199 .add_buffer(g_value_offsets.clone())
2200 .add_child_data(g_value.to_data())
2201 .build()
2202 .unwrap();
2203 let g = ListArray::from(g_list_data);
2204 let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
2206 .len(5)
2207 .add_buffer(g_value_offsets)
2208 .add_child_data(g_value.to_data())
2209 .null_bit_buffer(Some(Buffer::from([0b00011011])))
2210 .build()
2211 .unwrap();
2212 let h = ListArray::from(h_list_data);
2213
2214 let e = StructArray::from(vec![
2215 (struct_field_f, Arc::new(f) as ArrayRef),
2216 (struct_field_g, Arc::new(g) as ArrayRef),
2217 (struct_field_h, Arc::new(h) as ArrayRef),
2218 ]);
2219
2220 let c = StructArray::from(vec![
2221 (struct_field_d, Arc::new(d) as ArrayRef),
2222 (struct_field_e, Arc::new(e) as ArrayRef),
2223 ]);
2224
2225 let batch = RecordBatch::try_new(
2227 Arc::new(schema),
2228 vec![Arc::new(a), Arc::new(b), Arc::new(c)],
2229 )
2230 .unwrap();
2231
2232 roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
2233 roundtrip(batch, Some(SMALL_SIZE / 3));
2234 }
2235
2236 #[test]
2237 fn arrow_writer_complex_mixed() {
2238 let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
2243 let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
2244 let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
2245 let schema = Schema::new(vec![Field::new(
2246 "some_nested_object",
2247 DataType::Struct(Fields::from(vec![
2248 offset_field.clone(),
2249 partition_field.clone(),
2250 topic_field.clone(),
2251 ])),
2252 false,
2253 )]);
2254
2255 let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
2257 let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2258 let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
2259
2260 let some_nested_object = StructArray::from(vec![
2261 (offset_field, Arc::new(offset) as ArrayRef),
2262 (partition_field, Arc::new(partition) as ArrayRef),
2263 (topic_field, Arc::new(topic) as ArrayRef),
2264 ]);
2265
2266 let batch =
2268 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
2269
2270 roundtrip(batch, Some(SMALL_SIZE / 2));
2271 }
2272
2273 #[test]
2274 fn arrow_writer_map() {
2275 let json_content = r#"
2277 {"stocks":{"long": "$AAA", "short": "$BBB"}}
2278 {"stocks":{"long": null, "long": "$CCC", "short": null}}
2279 {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
2280 "#;
2281 let entries_struct_type = DataType::Struct(Fields::from(vec![
2282 Field::new("key", DataType::Utf8, false),
2283 Field::new("value", DataType::Utf8, true),
2284 ]));
2285 let stocks_field = Field::new(
2286 "stocks",
2287 DataType::Map(
2288 Arc::new(Field::new("entries", entries_struct_type, false)),
2289 false,
2290 ),
2291 true,
2292 );
2293 let schema = Arc::new(Schema::new(vec![stocks_field]));
2294 let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
2295 let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
2296
2297 let batch = reader.next().unwrap().unwrap();
2298 roundtrip(batch, None);
2299 }
2300
2301 #[test]
2302 fn arrow_writer_2_level_struct() {
2303 let field_c = Field::new("c", DataType::Int32, true);
2305 let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
2306 let type_a = DataType::Struct(vec![field_b.clone()].into());
2307 let field_a = Field::new("a", type_a, true);
2308 let schema = Schema::new(vec![field_a.clone()]);
2309
2310 let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
2312 let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2313 .len(6)
2314 .null_bit_buffer(Some(Buffer::from([0b00100111])))
2315 .add_child_data(c.into_data())
2316 .build()
2317 .unwrap();
2318 let b = StructArray::from(b_data);
2319 let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2320 .len(6)
2321 .null_bit_buffer(Some(Buffer::from([0b00101111])))
2322 .add_child_data(b.into_data())
2323 .build()
2324 .unwrap();
2325 let a = StructArray::from(a_data);
2326
2327 assert_eq!(a.null_count(), 1);
2328 assert_eq!(a.column(0).null_count(), 2);
2329
2330 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2332
2333 roundtrip(batch, Some(SMALL_SIZE / 2));
2334 }
2335
2336 #[test]
2337 fn arrow_writer_2_level_struct_non_null() {
2338 let field_c = Field::new("c", DataType::Int32, false);
2340 let type_b = DataType::Struct(vec![field_c].into());
2341 let field_b = Field::new("b", type_b.clone(), false);
2342 let type_a = DataType::Struct(vec![field_b].into());
2343 let field_a = Field::new("a", type_a.clone(), false);
2344 let schema = Schema::new(vec![field_a]);
2345
2346 let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2348 let b_data = ArrayDataBuilder::new(type_b)
2349 .len(6)
2350 .add_child_data(c.into_data())
2351 .build()
2352 .unwrap();
2353 let b = StructArray::from(b_data);
2354 let a_data = ArrayDataBuilder::new(type_a)
2355 .len(6)
2356 .add_child_data(b.into_data())
2357 .build()
2358 .unwrap();
2359 let a = StructArray::from(a_data);
2360
2361 assert_eq!(a.null_count(), 0);
2362 assert_eq!(a.column(0).null_count(), 0);
2363
2364 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2366
2367 roundtrip(batch, Some(SMALL_SIZE / 2));
2368 }
2369
2370 #[test]
2371 fn arrow_writer_2_level_struct_mixed_null() {
2372 let field_c = Field::new("c", DataType::Int32, false);
2374 let type_b = DataType::Struct(vec![field_c].into());
2375 let field_b = Field::new("b", type_b.clone(), true);
2376 let type_a = DataType::Struct(vec![field_b].into());
2377 let field_a = Field::new("a", type_a.clone(), false);
2378 let schema = Schema::new(vec![field_a]);
2379
2380 let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2382 let b_data = ArrayDataBuilder::new(type_b)
2383 .len(6)
2384 .null_bit_buffer(Some(Buffer::from([0b00100111])))
2385 .add_child_data(c.into_data())
2386 .build()
2387 .unwrap();
2388 let b = StructArray::from(b_data);
2389 let a_data = ArrayDataBuilder::new(type_a)
2391 .len(6)
2392 .add_child_data(b.into_data())
2393 .build()
2394 .unwrap();
2395 let a = StructArray::from(a_data);
2396
2397 assert_eq!(a.null_count(), 0);
2398 assert_eq!(a.column(0).null_count(), 2);
2399
2400 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2402
2403 roundtrip(batch, Some(SMALL_SIZE / 2));
2404 }
2405
2406 #[test]
2407 fn arrow_writer_2_level_struct_mixed_null_2() {
2408 let field_c = Field::new("c", DataType::Int32, false);
2410 let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
2411 let field_e = Field::new(
2412 "e",
2413 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2414 false,
2415 );
2416
2417 let field_b = Field::new(
2418 "b",
2419 DataType::Struct(vec![field_c, field_d, field_e].into()),
2420 false,
2421 );
2422 let type_a = DataType::Struct(vec![field_b.clone()].into());
2423 let field_a = Field::new("a", type_a, true);
2424 let schema = Schema::new(vec![field_a.clone()]);
2425
2426 let c = Int32Array::from_iter_values(0..6);
2428 let d = FixedSizeBinaryArray::try_from_iter(
2429 ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
2430 )
2431 .expect("four byte values");
2432 let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
2433 let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2434 .len(6)
2435 .add_child_data(c.into_data())
2436 .add_child_data(d.into_data())
2437 .add_child_data(e.into_data())
2438 .build()
2439 .unwrap();
2440 let b = StructArray::from(b_data);
2441 let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2442 .len(6)
2443 .null_bit_buffer(Some(Buffer::from([0b00100101])))
2444 .add_child_data(b.into_data())
2445 .build()
2446 .unwrap();
2447 let a = StructArray::from(a_data);
2448
2449 assert_eq!(a.null_count(), 3);
2450 assert_eq!(a.column(0).null_count(), 0);
2451
2452 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2454
2455 roundtrip(batch, Some(SMALL_SIZE / 2));
2456 }
2457
2458 #[test]
2459 fn test_fixed_size_binary_in_dict() {
2460 fn test_fixed_size_binary_in_dict_inner<K>()
2461 where
2462 K: ArrowDictionaryKeyType,
2463 K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
2464 <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
2465 {
2466 let field = Field::new(
2467 "a",
2468 DataType::Dictionary(
2469 Box::new(K::DATA_TYPE),
2470 Box::new(DataType::FixedSizeBinary(4)),
2471 ),
2472 false,
2473 );
2474 let schema = Schema::new(vec![field]);
2475
2476 let keys: Vec<K::Native> = vec![
2477 K::Native::try_from(0u8).unwrap(),
2478 K::Native::try_from(0u8).unwrap(),
2479 K::Native::try_from(1u8).unwrap(),
2480 ];
2481 let keys = PrimitiveArray::<K>::from_iter_values(keys);
2482 let values = FixedSizeBinaryArray::try_from_iter(
2483 vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
2484 )
2485 .unwrap();
2486
2487 let data = DictionaryArray::<K>::new(keys, Arc::new(values));
2488 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
2489 roundtrip(batch, None);
2490 }
2491
2492 test_fixed_size_binary_in_dict_inner::<UInt8Type>();
2493 test_fixed_size_binary_in_dict_inner::<UInt16Type>();
2494 test_fixed_size_binary_in_dict_inner::<UInt32Type>();
        test_fixed_size_binary_in_dict_inner::<UInt64Type>();
        test_fixed_size_binary_in_dict_inner::<Int8Type>();
        test_fixed_size_binary_in_dict_inner::<Int16Type>();
        test_fixed_size_binary_in_dict_inner::<Int32Type>();
        test_fixed_size_binary_in_dict_inner::<Int64Type>();
    }

    #[test]
    fn test_empty_dict() {
        let struct_fields = Fields::from(vec![Field::new(
            "dict",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            false,
        )]);

        let schema = Schema::new(vec![Field::new_struct(
            "struct",
            struct_fields.clone(),
            true,
        )]);
        let dictionary = Arc::new(DictionaryArray::new(
            Int32Array::new_null(5),
            Arc::new(StringArray::new_null(0)),
        ));

        let s = StructArray::new(
            struct_fields,
            vec![dictionary],
            Some(NullBuffer::new_null(5)),
        );

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
        roundtrip(batch, None);
    }

    #[test]
    fn arrow_writer_page_size() {
        let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));

        let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);

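        // Generate 10 unique ten-character strings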
        for i in 0..10 {
            let value = i
                .to_string()
                .repeat(10)
                .chars()
                .take(10)
                .collect::<String>();

            builder.append_value(value);
        }

        let array = Arc::new(builder.finish());

        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        let file = tempfile::tempfile().unwrap();

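        // 1-byte page and dictionary limits plus a write batch size of 1 should force a page per value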
        let props = WriterProperties::builder()
            .set_data_page_size_limit(1)
            .set_dictionary_page_size_limit(1)
            .set_write_batch_size(1)
            .build();

        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
                .expect("Unable to write file");
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader =
            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();

        let column = reader.metadata().row_group(0).columns();

        assert_eq!(column.len(), 1);

        assert!(
            column[0].dictionary_page_offset().is_some(),
            "Expected a dictionary page"
        );

        assert!(reader.metadata().offset_index().is_some());
        let offset_indexes = &reader.metadata().offset_index().unwrap()[0];

        let page_locations = offset_indexes[0].page_locations.clone();

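        // One value per page: the offset index should record ten page locations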
        assert_eq!(
            page_locations.len(),
            10,
            "Expected 10 pages but got {page_locations:#?}"
        );
    }

    #[test]
    fn arrow_writer_float_nans() {
        let f16_field = Field::new("a", DataType::Float16, false);
        let f32_field = Field::new("b", DataType::Float32, false);
        let f64_field = Field::new("c", DataType::Float64, false);
        let schema = Schema::new(vec![f16_field, f32_field, f64_field]);

        let f16_values = (0..MEDIUM_SIZE)
            .map(|i| {
                Some(if i % 2 == 0 {
                    f16::NAN
                } else {
                    f16::from_f32(i as f32)
                })
            })
            .collect::<Float16Array>();

        let f32_values = (0..MEDIUM_SIZE)
            .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
            .collect::<Float32Array>();

        let f64_values = (0..MEDIUM_SIZE)
            .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
            .collect::<Float64Array>();

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![
                Arc::new(f16_values),
                Arc::new(f32_values),
                Arc::new(f64_values),
            ],
        )
        .unwrap();

        roundtrip(batch, None);
    }

    const SMALL_SIZE: usize = 7;
    const MEDIUM_SIZE: usize = 63;

    fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<Bytes> {
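        // Roundtrip with both writer versions so either format is covered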
        let mut files = vec![];
        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
            let mut props = WriterProperties::builder().set_writer_version(version);

            if let Some(size) = max_row_group_size {
                props = props.set_max_row_group_row_count(Some(size))
            }

            let props = props.build();
            files.push(roundtrip_opts(&expected_batch, props))
        }
        files
    }

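    /// Writes `expected_batch` with the given properties, reads it back, and
    /// runs `validate` against each column's expected and actual `ArrayData`.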
    fn roundtrip_opts_with_array_validation<F>(
        expected_batch: &RecordBatch,
        props: WriterProperties,
        validate: F,
    ) -> Bytes
    where
        F: Fn(&ArrayData, &ArrayData),
    {
        let mut file = vec![];

        let mut writer = ArrowWriter::try_new(&mut file, expected_batch.schema(), Some(props))
            .expect("Unable to write file");
        writer.write(expected_batch).unwrap();
        writer.close().unwrap();

        let file = Bytes::from(file);
        let mut record_batch_reader =
            ParquetRecordBatchReader::try_new(file.clone(), 1024).unwrap();

        let actual_batch = record_batch_reader
            .next()
            .expect("No batch found")
            .expect("Unable to get batch");

        assert_eq!(expected_batch.schema(), actual_batch.schema());
        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
        for i in 0..expected_batch.num_columns() {
            let expected_data = expected_batch.column(i).to_data();
            let actual_data = actual_batch.column(i).to_data();
            validate(&expected_data, &actual_data);
        }

        file
    }

    fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> Bytes {
        roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
            a.validate_full().expect("valid expected data");
            b.validate_full().expect("valid actual data");
            assert_eq!(a, b)
        })
    }

    struct RoundTripOptions {
        values: ArrayRef,
        schema: SchemaRef,
        bloom_filter: bool,
        bloom_filter_ndv: Option<u64>,
        bloom_filter_position: BloomFilterPosition,
    }

    impl RoundTripOptions {
        fn new(values: ArrayRef, nullable: bool) -> Self {
            let data_type = values.data_type().clone();
            let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
            Self {
                values,
                schema: Arc::new(schema),
                bloom_filter: false,
                bloom_filter_ndv: None,
                bloom_filter_position: BloomFilterPosition::AfterRowGroup,
            }
        }
    }

    fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<Bytes> {
        one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
    }

    fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<Bytes> {
        let mut options = RoundTripOptions::new(values, false);
        options.schema = schema;
        one_column_roundtrip_with_options(options)
    }

    fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<Bytes> {
        let RoundTripOptions {
            values,
            schema,
            bloom_filter,
            bloom_filter_ndv,
            bloom_filter_position,
        } = options;

        let encodings = match values.data_type() {
            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
                vec![
                    Encoding::PLAIN,
                    Encoding::DELTA_BYTE_ARRAY,
                    Encoding::DELTA_LENGTH_BYTE_ARRAY,
                ]
            }
            DataType::Int64
            | DataType::Int32
            | DataType::Int16
            | DataType::Int8
            | DataType::UInt64
            | DataType::UInt32
            | DataType::UInt16
            | DataType::UInt8 => vec![
                Encoding::PLAIN,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
            DataType::Float32 | DataType::Float64 => {
                vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
            }
            _ => vec![Encoding::PLAIN],
        };

        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();

        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];

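        // Sweep dictionary settings, encodings, writer versions and row group sizes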
        let mut files = vec![];
        for dictionary_size in [0, 1, 1024] {
            for encoding in &encodings {
                for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
                    for row_group_size in row_group_sizes {
                        let mut builder = WriterProperties::builder()
                            .set_writer_version(version)
                            .set_max_row_group_row_count(Some(row_group_size))
                            .set_dictionary_enabled(dictionary_size != 0)
                            .set_dictionary_page_size_limit(dictionary_size.max(1))
                            .set_encoding(*encoding)
                            .set_bloom_filter_enabled(bloom_filter)
                            .set_bloom_filter_position(bloom_filter_position);
                        if let Some(ndv) = bloom_filter_ndv {
                            builder = builder.set_bloom_filter_ndv(ndv);
                        }
                        let props = builder.build();

                        files.push(roundtrip_opts(&expected_batch, props))
                    }
                }
            }
        }
        files
    }

    fn values_required<A, I>(iter: I) -> Vec<Bytes>
    where
        A: From<Vec<I::Item>> + Array + 'static,
        I: IntoIterator,
    {
        let raw_values: Vec<_> = iter.into_iter().collect();
        let values = Arc::new(A::from(raw_values));
        one_column_roundtrip(values, false)
    }

    fn values_optional<A, I>(iter: I) -> Vec<Bytes>
    where
        A: From<Vec<Option<I::Item>>> + Array + 'static,
        I: IntoIterator,
    {
        let optional_raw_values: Vec<_> = iter
            .into_iter()
            .enumerate()
            .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
            .collect();
        let optional_values = Arc::new(A::from(optional_raw_values));
        one_column_roundtrip(optional_values, true)
    }

    fn required_and_optional<A, I>(iter: I)
    where
        A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
        I: IntoIterator + Clone,
    {
        values_required::<A, I>(iter.clone());
        values_optional::<A, I>(iter);
    }

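    /// Asserts that every `positive_values` entry hits some row group's bloom
    /// filter and that no `negative_values` entry does (only the first file is checked).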
    fn check_bloom_filter<T: AsBytes>(
        files: Vec<Bytes>,
        file_column: String,
        positive_values: Vec<T>,
        negative_values: Vec<T>,
    ) {
        files.into_iter().take(1).for_each(|file| {
            let file_reader = SerializedFileReader::new_with_options(
                file,
                ReadOptionsBuilder::new()
                    .with_reader_properties(
                        ReaderProperties::builder()
                            .set_read_bloom_filter(true)
                            .build(),
                    )
                    .build(),
            )
            .expect("Unable to open file as Parquet");
            let metadata = file_reader.metadata();

            let mut bloom_filters: Vec<_> = vec![];
            for (ri, row_group) in metadata.row_groups().iter().enumerate() {
                if let Some((column_index, _)) = row_group
                    .columns()
                    .iter()
                    .enumerate()
                    .find(|(_, column)| column.column_path().string() == file_column)
                {
                    let row_group_reader = file_reader
                        .get_row_group(ri)
                        .expect("Unable to read row group");
                    if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
                        bloom_filters.push(sbbf.clone());
                    } else {
                        panic!("No bloom filter for column named {file_column} found");
                    }
                } else {
                    panic!("No column named {file_column} found");
                }
            }

            positive_values.iter().for_each(|value| {
                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
                assert!(
                    found.is_some(),
                    "Value {:?} should be in bloom filter",
                    value.as_bytes()
                );
            });

            negative_values.iter().for_each(|value| {
                let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
                assert!(
                    found.is_none(),
                    "Value {:?} should not be in bloom filter",
                    value.as_bytes()
                );
            });
        });
    }

    #[test]
    fn all_null_primitive_single_column() {
        let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
        one_column_roundtrip(values, true);
    }

    #[test]
    fn null_single_column() {
        let values = Arc::new(NullArray::new(SMALL_SIZE));
        one_column_roundtrip(values, true);
    }

    #[test]
    fn bool_single_column() {
        required_and_optional::<BooleanArray, _>(
            [true, false].iter().cycle().copied().take(SMALL_SIZE),
        );
    }

    #[test]
    fn bool_large_single_column() {
        let values = Arc::new(
            [None, Some(true), Some(false)]
                .iter()
                .cycle()
                .copied()
                .take(200_000)
                .collect::<BooleanArray>(),
        );
        let schema = Schema::new(vec![Field::new("col", values.data_type().clone(), true)]);
        let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
        let file = tempfile::tempfile().unwrap();

        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
                .expect("Unable to write file");
        writer.write(&expected_batch).unwrap();
        writer.close().unwrap();
    }

    #[test]
    fn check_page_offset_index_with_nan() {
        let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
        let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
        let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();

        let mut out = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
        writer.write(&batch).unwrap();
        let file_meta_data = writer.close().unwrap();
        for row_group in file_meta_data.row_groups() {
            for column in row_group.columns() {
                assert!(column.offset_index_offset().is_some());
                assert!(column.offset_index_length().is_some());
                assert!(column.column_index_offset().is_none());
                assert!(column.column_index_length().is_none());
            }
        }
    }

    #[test]
    fn i8_single_column() {
        required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8);
    }

    #[test]
    fn i16_single_column() {
        required_and_optional::<Int16Array, _>(0..SMALL_SIZE as i16);
    }

    #[test]
    fn i32_single_column() {
        required_and_optional::<Int32Array, _>(0..SMALL_SIZE as i32);
    }

    #[test]
    fn i64_single_column() {
        required_and_optional::<Int64Array, _>(0..SMALL_SIZE as i64);
    }

    #[test]
    fn u8_single_column() {
        required_and_optional::<UInt8Array, _>(0..SMALL_SIZE as u8);
    }

    #[test]
    fn u16_single_column() {
        required_and_optional::<UInt16Array, _>(0..SMALL_SIZE as u16);
    }

    #[test]
    fn u32_single_column() {
        required_and_optional::<UInt32Array, _>(0..SMALL_SIZE as u32);
    }

    #[test]
    fn u64_single_column() {
        required_and_optional::<UInt64Array, _>(0..SMALL_SIZE as u64);
    }

    #[test]
    fn f32_single_column() {
        required_and_optional::<Float32Array, _>((0..SMALL_SIZE).map(|i| i as f32));
    }

    #[test]
    fn f64_single_column() {
        required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
    }

    #[test]
    fn timestamp_second_single_column() {
        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
        let values = Arc::new(TimestampSecondArray::from(raw_values));

        one_column_roundtrip(values, false);
    }

    #[test]
    fn timestamp_millisecond_single_column() {
        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
        let values = Arc::new(TimestampMillisecondArray::from(raw_values));

        one_column_roundtrip(values, false);
    }

    #[test]
    fn timestamp_microsecond_single_column() {
        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
        let values = Arc::new(TimestampMicrosecondArray::from(raw_values));

        one_column_roundtrip(values, false);
    }

    #[test]
    fn timestamp_nanosecond_single_column() {
        let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
        let values = Arc::new(TimestampNanosecondArray::from(raw_values));

        one_column_roundtrip(values, false);
    }

    #[test]
    fn date32_single_column() {
        required_and_optional::<Date32Array, _>(0..SMALL_SIZE as i32);
    }

    #[test]
    fn date64_single_column() {
        required_and_optional::<Date64Array, _>(
            (0..(SMALL_SIZE as i64 * 86400000)).step_by(86400000),
        );
    }

    #[test]
    fn time32_second_single_column() {
        required_and_optional::<Time32SecondArray, _>(0..SMALL_SIZE as i32);
    }

    #[test]
    fn time32_millisecond_single_column() {
        required_and_optional::<Time32MillisecondArray, _>(0..SMALL_SIZE as i32);
    }

    #[test]
    fn time64_microsecond_single_column() {
        required_and_optional::<Time64MicrosecondArray, _>(0..SMALL_SIZE as i64);
    }

    #[test]
    fn time64_nanosecond_single_column() {
        required_and_optional::<Time64NanosecondArray, _>(0..SMALL_SIZE as i64);
    }

    #[test]
    fn duration_second_single_column() {
        required_and_optional::<DurationSecondArray, _>(0..SMALL_SIZE as i64);
    }

    #[test]
    fn duration_millisecond_single_column() {
        required_and_optional::<DurationMillisecondArray, _>(0..SMALL_SIZE as i64);
    }

    #[test]
    fn duration_microsecond_single_column() {
        required_and_optional::<DurationMicrosecondArray, _>(0..SMALL_SIZE as i64);
    }

    #[test]
    fn duration_nanosecond_single_column() {
        required_and_optional::<DurationNanosecondArray, _>(0..SMALL_SIZE as i64);
    }

    #[test]
    fn interval_year_month_single_column() {
        required_and_optional::<IntervalYearMonthArray, _>(0..SMALL_SIZE as i32);
    }

    #[test]
    fn interval_day_time_single_column() {
        required_and_optional::<IntervalDayTimeArray, _>(vec![
            IntervalDayTime::new(0, 1),
            IntervalDayTime::new(0, 3),
            IntervalDayTime::new(3, -2),
            IntervalDayTime::new(-200, 4),
        ]);
    }

    #[test]
    #[should_panic(
        expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
    )]
    fn interval_month_day_nano_single_column() {
        required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
            IntervalMonthDayNano::new(0, 1, 5),
            IntervalMonthDayNano::new(0, 3, 2),
            IntervalMonthDayNano::new(3, -2, -5),
            IntervalMonthDayNano::new(-200, 4, -1),
        ]);
    }

    #[test]
    fn binary_single_column() {
        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());

        values_required::<BinaryArray, _>(many_vecs_iter);
    }

    #[test]
    fn binary_view_single_column() {
        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());

        values_required::<BinaryViewArray, _>(many_vecs_iter);
    }

    #[test]
    fn i32_column_bloom_filter_at_end() {
        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
        let mut options = RoundTripOptions::new(array, false);
        options.bloom_filter = true;
        options.bloom_filter_position = BloomFilterPosition::End;

        let files = one_column_roundtrip_with_options(options);
        check_bloom_filter(
            files,
            "col".to_string(),
            (0..SMALL_SIZE as i32).collect(),
            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
        );
    }

    #[test]
    fn i32_column_bloom_filter() {
        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
        let mut options = RoundTripOptions::new(array, false);
        options.bloom_filter = true;

        let files = one_column_roundtrip_with_options(options);
        check_bloom_filter(
            files,
            "col".to_string(),
            (0..SMALL_SIZE as i32).collect(),
            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
        );
    }

    #[test]
    fn i32_column_bloom_filter_fixed_ndv() {
        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));

        let mut options = RoundTripOptions::new(array.clone(), false);
        options.bloom_filter = true;
        options.bloom_filter_ndv = Some(1_000_000);

        let files = one_column_roundtrip_with_options(options);
        check_bloom_filter(
            files,
            "col".to_string(),
            (0..SMALL_SIZE as i32).collect(),
            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
        );

        let mut options = RoundTripOptions::new(array, false);
        options.bloom_filter = true;
        options.bloom_filter_ndv = Some(3);

        let files = one_column_roundtrip_with_options(options);
        check_bloom_filter(
            files,
            "col".to_string(),
            (0..SMALL_SIZE as i32).collect(),
            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
        );
    }

    #[test]
    fn binary_column_bloom_filter() {
        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());

        let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
        let mut options = RoundTripOptions::new(array, false);
        options.bloom_filter = true;

        let files = one_column_roundtrip_with_options(options);
        check_bloom_filter(
            files,
            "col".to_string(),
            many_vecs,
            vec![vec![(SMALL_SIZE + 1) as u8]],
        );
    }

    #[test]
    fn empty_string_null_column_bloom_filter() {
        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
        let raw_strs = raw_values.iter().map(|s| s.as_str());

        let array = Arc::new(StringArray::from_iter_values(raw_strs));
        let mut options = RoundTripOptions::new(array, false);
        options.bloom_filter = true;

        let files = one_column_roundtrip_with_options(options);

        let optional_raw_values: Vec<_> = raw_values
            .iter()
            .enumerate()
            .filter_map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
            .collect();
        check_bloom_filter(files, "col".to_string(), optional_raw_values, vec![""]);
    }

    #[test]
    fn large_binary_single_column() {
        let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
        let many_vecs: Vec<_> = std::iter::repeat_n(one_vec, SMALL_SIZE).collect();
        let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());

        values_required::<LargeBinaryArray, _>(many_vecs_iter);
    }

    #[test]
    fn fixed_size_binary_single_column() {
        let mut builder = FixedSizeBinaryBuilder::new(4);
        builder.append_value(b"0123").unwrap();
        builder.append_null();
        builder.append_value(b"8910").unwrap();
        builder.append_value(b"1112").unwrap();
        let array = Arc::new(builder.finish());

        one_column_roundtrip(array, true);
    }

    #[test]
    fn string_single_column() {
        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
        let raw_strs = raw_values.iter().map(|s| s.as_str());

        required_and_optional::<StringArray, _>(raw_strs);
    }

    #[test]
    fn large_string_single_column() {
        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
        let raw_strs = raw_values.iter().map(|s| s.as_str());

        required_and_optional::<LargeStringArray, _>(raw_strs);
    }

    #[test]
    fn string_view_single_column() {
        let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
        let raw_strs = raw_values.iter().map(|s| s.as_str());

        required_and_optional::<StringViewArray, _>(raw_strs);
    }

    #[test]
    fn null_list_single_column() {
        let null_field = Field::new_list_field(DataType::Null, true);
        let list_field = Field::new("emptylist", DataType::List(Arc::new(null_field)), true);

        let schema = Schema::new(vec![list_field]);

        let a_values = NullArray::new(2);
        let a_value_offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
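        // Offsets [0, 0, 0, 2] with validity 0b101 encode [[], null, [null, null]]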
        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
            DataType::Null,
            true,
        ))))
        .len(3)
        .add_buffer(a_value_offsets)
        .null_bit_buffer(Some(Buffer::from([0b00000101])))
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();

        let a = ListArray::from(a_list_data);

        assert!(a.is_valid(0));
        assert!(!a.is_valid(1));
        assert!(a.is_valid(2));

        assert_eq!(a.value(0).len(), 0);
        assert_eq!(a.value(2).len(), 2);
        assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
        roundtrip(batch, None);
    }

    #[test]
    fn list_single_column() {
        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
            DataType::Int32,
            false,
        ))))
        .len(5)
        .add_buffer(a_value_offsets)
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();

        assert_eq!(a_list_data.null_count(), 1);

        let a = ListArray::from(a_list_data);
        let values = Arc::new(a);

        one_column_roundtrip(values, true);
    }

    #[test]
    fn large_list_single_column() {
        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        let a_value_offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
        let a_list_data = ArrayData::builder(DataType::LargeList(Arc::new(Field::new(
            "large_item",
            DataType::Int32,
            true,
        ))))
        .len(5)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .build()
        .unwrap();

        assert_eq!(a_list_data.null_count(), 1);

        let a = LargeListArray::from(a_list_data);
        let values = Arc::new(a);

        one_column_roundtrip(values, true);
    }

    #[test]
    fn list_nested_nulls() {
        use arrow::datatypes::Int32Type;
        let data = vec![
            Some(vec![Some(1)]),
            Some(vec![Some(2), Some(3)]),
            None,
            Some(vec![Some(4), Some(5), None]),
            Some(vec![None]),
            Some(vec![Some(6), Some(7)]),
        ];

        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
        one_column_roundtrip(Arc::new(list), true);

        let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
        one_column_roundtrip(Arc::new(list), true);
    }

    #[test]
    fn struct_single_column() {
        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        let struct_field_a = Arc::new(Field::new("f", DataType::Int32, false));
        let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);

        let values = Arc::new(s);
        one_column_roundtrip(values, false);
    }

    #[test]
    fn list_and_map_coerced_names() {
        let list_field =
            Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
        let map_field = Field::new_map(
            "my_map",
            "entries",
            Field::new("keys", DataType::Int32, false),
            Field::new("values", DataType::Int32, true),
            false,
            true,
        );

        let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
        let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();

        let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));

        let props = Some(WriterProperties::builder().set_coerce_types(true).build());
        let file = tempfile::tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();

        let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
        writer.write(&batch).unwrap();
        let file_metadata = writer.close().unwrap();

        let schema = file_metadata.file_metadata().schema();
        let list_field = &schema.get_fields()[0].get_fields()[0];
        assert_eq!(list_field.get_fields()[0].name(), "element");

        let map_field = &schema.get_fields()[1].get_fields()[0];
        assert_eq!(map_field.name(), "key_value");
        assert_eq!(map_field.get_fields()[0].name(), "key");
        assert_eq!(map_field.get_fields()[1].name(), "value");

        let reader = SerializedFileReader::new(file).unwrap();
        let file_schema = reader.metadata().file_metadata().schema();
        let fields = file_schema.get_fields();
        let list_field = &fields[0].get_fields()[0];
        assert_eq!(list_field.get_fields()[0].name(), "element");
        let map_field = &fields[1].get_fields()[0];
        assert_eq!(map_field.name(), "key_value");
        assert_eq!(map_field.get_fields()[0].name(), "key");
        assert_eq!(map_field.get_fields()[1].name(), "value");
    }

    #[test]
    fn fallback_flush_data_page() {
        let raw_values: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
        let values = Arc::new(StringArray::from(raw_values));
        let encodings = vec![
            Encoding::DELTA_BYTE_ARRAY,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
        ];
        let data_type = values.data_type().clone();
        let schema = Arc::new(Schema::new(vec![Field::new("col", data_type, false)]));
        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();

        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
        let data_page_size_limit: usize = 32;
        let write_batch_size: usize = 16;

        for encoding in &encodings {
            for row_group_size in row_group_sizes {
                let props = WriterProperties::builder()
                    .set_writer_version(WriterVersion::PARQUET_2_0)
                    .set_max_row_group_row_count(Some(row_group_size))
                    .set_dictionary_enabled(false)
                    .set_encoding(*encoding)
                    .set_data_page_size_limit(data_page_size_limit)
                    .set_write_batch_size(write_batch_size)
                    .build();

                roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
                    let string_array_a = StringArray::from(a.clone());
                    let string_array_b = StringArray::from(b.clone());
                    let vec_a: Vec<&str> = string_array_a.iter().map(|v| v.unwrap()).collect();
                    let vec_b: Vec<&str> = string_array_b.iter().map(|v| v.unwrap()).collect();
                    assert_eq!(
                        vec_a, vec_b,
                        "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
                    );
                });
            }
        }
    }

    #[test]
    fn arrow_writer_string_dictionary() {
        #[allow(deprecated)]
        let schema = Arc::new(Schema::new(vec![Field::new_dict(
            "dictionary",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            true,
            42,
            true,
        )]));

        let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
            .iter()
            .copied()
            .collect();

        one_column_roundtrip_with_schema(Arc::new(d), schema);
    }

    #[test]
    fn arrow_writer_test_type_compatibility() {
        fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
        where
            T1: Array + 'static,
            T2: Array + 'static,
        {
            let schema1 = Arc::new(Schema::new(vec![Field::new(
                "a",
                array1.data_type().clone(),
                false,
            )]));

            let file = tempfile().unwrap();
            let mut writer =
                ArrowWriter::try_new(file.try_clone().unwrap(), schema1.clone(), None).unwrap();

            let rb1 = RecordBatch::try_new(schema1.clone(), vec![Arc::new(array1)]).unwrap();
            writer.write(&rb1).unwrap();

            let schema2 = Arc::new(Schema::new(vec![Field::new(
                "a",
                array2.data_type().clone(),
                false,
            )]));
            let rb2 = RecordBatch::try_new(schema2, vec![Arc::new(array2)]).unwrap();
            writer.write(&rb2).unwrap();

            writer.close().unwrap();

            let mut record_batch_reader =
                ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
            let actual_batch = record_batch_reader.next().unwrap().unwrap();

            let expected_batch =
                RecordBatch::try_new(schema1, vec![Arc::new(expected_result)]).unwrap();
            assert_eq!(actual_batch, expected_batch);
        }

        ensure_compatible_write(
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
            ),
            StringArray::from_iter_values(vec!["barquet"]),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0, 1]),
                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
            ),
        );

        ensure_compatible_write(
            StringArray::from_iter_values(vec!["parquet"]),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
            ),
            StringArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
            ),
            DictionaryArray::new(
                UInt16Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["barquet"])),
            ),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0, 1]),
                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
            ),
        );

        ensure_compatible_write(
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
            ),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(LargeStringArray::from_iter_values(vec!["barquet"])),
            ),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0, 1]),
                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
            ),
        );

        ensure_compatible_write(
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0]),
                Arc::new(StringArray::from_iter_values(vec!["parquet"])),
            ),
            LargeStringArray::from_iter_values(vec!["barquet"]),
            DictionaryArray::new(
                UInt8Array::from_iter_values(vec![0, 1]),
                Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
            ),
        );

        ensure_compatible_write(
            StringArray::from_iter_values(vec!["parquet"]),
            LargeStringArray::from_iter_values(vec!["barquet"]),
            StringArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            LargeStringArray::from_iter_values(vec!["parquet"]),
            StringArray::from_iter_values(vec!["barquet"]),
            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            StringArray::from_iter_values(vec!["parquet"]),
            StringViewArray::from_iter_values(vec!["barquet"]),
            StringArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            StringViewArray::from_iter_values(vec!["parquet"]),
            StringArray::from_iter_values(vec!["barquet"]),
            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            LargeStringArray::from_iter_values(vec!["parquet"]),
            StringViewArray::from_iter_values(vec!["barquet"]),
            LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            StringViewArray::from_iter_values(vec!["parquet"]),
            LargeStringArray::from_iter_values(vec!["barquet"]),
            StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
        );

        ensure_compatible_write(
            BinaryArray::from_iter_values(vec![b"parquet"]),
            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        ensure_compatible_write(
            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
            BinaryArray::from_iter_values(vec![b"barquet"]),
            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        ensure_compatible_write(
            BinaryArray::from_iter_values(vec![b"parquet"]),
            BinaryViewArray::from_iter_values(vec![b"barquet"]),
            BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        ensure_compatible_write(
            BinaryViewArray::from_iter_values(vec![b"parquet"]),
            BinaryArray::from_iter_values(vec![b"barquet"]),
            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        ensure_compatible_write(
            BinaryViewArray::from_iter_values(vec![b"parquet"]),
            LargeBinaryArray::from_iter_values(vec![b"barquet"]),
            BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        ensure_compatible_write(
            LargeBinaryArray::from_iter_values(vec![b"parquet"]),
            BinaryViewArray::from_iter_values(vec![b"barquet"]),
            LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
        );

        let list_field_metadata = HashMap::from_iter(vec![(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            "1".to_string(),
        )]);
        let list_field = Field::new_list_field(DataType::Int32, false);

        let values1 = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4]));
        let offsets1 = OffsetBuffer::new(vec![0, 2, 5].into());

        let values2 = Arc::new(Int32Array::from(vec![5, 6, 7, 8, 9]));
        let offsets2 = OffsetBuffer::new(vec![0, 3, 5].into());

        let values_expected = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]));
        let offsets_expected = OffsetBuffer::new(vec![0, 2, 5, 8, 10].into());
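        // Two list batches, [[0, 1], [2, 3, 4]] then [[5, 6, 7], [8, 9]], should read back concatenated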

        ensure_compatible_write(
            ListArray::try_new(
                Arc::new(
                    list_field
                        .clone()
                        .with_metadata(list_field_metadata.clone()),
                ),
                offsets1,
                values1,
                None,
            )
            .unwrap(),
            ListArray::try_new(Arc::new(list_field.clone()), offsets2, values2, None).unwrap(),
            ListArray::try_new(
                Arc::new(
                    list_field
                        .clone()
                        .with_metadata(list_field_metadata.clone()),
                ),
                offsets_expected,
                values_expected,
                None,
            )
            .unwrap(),
        );
    }

    #[test]
    fn arrow_writer_primitive_dictionary() {
        #[allow(deprecated)]
        let schema = Arc::new(Schema::new(vec![Field::new_dict(
            "dictionary",
            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
            true,
            42,
            true,
        )]));

        let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
        builder.append(12345678).unwrap();
        builder.append_null();
        builder.append(22345678).unwrap();
        builder.append(12345678).unwrap();
        let d = builder.finish();

        one_column_roundtrip_with_schema(Arc::new(d), schema);
    }

    #[test]
    fn arrow_writer_decimal32_dictionary() {
        let integers = vec![12345, 56789, 34567];

        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

        let values = Decimal32Array::from(integers.clone())
            .with_precision_and_scale(5, 2)
            .unwrap();

        let array = DictionaryArray::new(keys, Arc::new(values));
        one_column_roundtrip(Arc::new(array.clone()), true);

        let values = Decimal32Array::from(integers)
            .with_precision_and_scale(9, 2)
            .unwrap();

        let array = array.with_values(Arc::new(values));
        one_column_roundtrip(Arc::new(array), true);
    }

    #[test]
    fn arrow_writer_decimal64_dictionary() {
        let integers = vec![12345, 56789, 34567];

        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

        let values = Decimal64Array::from(integers.clone())
            .with_precision_and_scale(5, 2)
            .unwrap();

        let array = DictionaryArray::new(keys, Arc::new(values));
        one_column_roundtrip(Arc::new(array.clone()), true);

        let values = Decimal64Array::from(integers)
            .with_precision_and_scale(12, 2)
            .unwrap();

        let array = array.with_values(Arc::new(values));
        one_column_roundtrip(Arc::new(array), true);
    }

    #[test]
    fn arrow_writer_decimal128_dictionary() {
        let integers = vec![12345, 56789, 34567];

        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

        let values = Decimal128Array::from(integers.clone())
            .with_precision_and_scale(5, 2)
            .unwrap();

        let array = DictionaryArray::new(keys, Arc::new(values));
        one_column_roundtrip(Arc::new(array.clone()), true);

        let values = Decimal128Array::from(integers)
            .with_precision_and_scale(12, 2)
            .unwrap();

        let array = array.with_values(Arc::new(values));
        one_column_roundtrip(Arc::new(array), true);
    }

    #[test]
    fn arrow_writer_decimal256_dictionary() {
        let integers = vec![
            i256::from_i128(12345),
            i256::from_i128(56789),
            i256::from_i128(34567),
        ];

        let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

        let values = Decimal256Array::from(integers.clone())
            .with_precision_and_scale(5, 2)
            .unwrap();

        let array = DictionaryArray::new(keys, Arc::new(values));
        one_column_roundtrip(Arc::new(array.clone()), true);

        let values = Decimal256Array::from(integers)
            .with_precision_and_scale(12, 2)
            .unwrap();

        let array = array.with_values(Arc::new(values));
        one_column_roundtrip(Arc::new(array), true);
    }

    #[test]
    fn arrow_writer_string_dictionary_unsigned_index() {
        #[allow(deprecated)]
        let schema = Arc::new(Schema::new(vec![Field::new_dict(
            "dictionary",
            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
            true,
            42,
            true,
        )]));

        let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
            .iter()
            .copied()
            .collect();

        one_column_roundtrip_with_schema(Arc::new(d), schema);
    }

    #[test]
    fn u32_min_max() {
        let src = [
            u32::MIN,
            u32::MIN + 1,
            (i32::MAX as u32) - 1,
            i32::MAX as u32,
            (i32::MAX as u32) + 1,
            u32::MAX - 1,
            u32::MAX,
        ];
        let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
        let files = one_column_roundtrip(values, false);

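        // u32 is stored as physical Int32; reinterpret min/max statistics as unsigned when comparing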
        for file in files {
            let reader = SerializedFileReader::new(file).unwrap();
            let metadata = reader.metadata();

            let mut row_offset = 0;
            for row_group in metadata.row_groups() {
                assert_eq!(row_group.num_columns(), 1);
                let column = row_group.column(0);

                let num_values = column.num_values() as usize;
                let src_slice = &src[row_offset..row_offset + num_values];
                row_offset += column.num_values() as usize;

                let stats = column.statistics().unwrap();
                if let Statistics::Int32(stats) = stats {
                    assert_eq!(
                        *stats.min_opt().unwrap() as u32,
                        *src_slice.iter().min().unwrap()
                    );
                    assert_eq!(
                        *stats.max_opt().unwrap() as u32,
                        *src_slice.iter().max().unwrap()
                    );
                } else {
                    panic!("Statistics::Int32 missing")
                }
            }
        }
    }

    #[test]
    fn u64_min_max() {
        let src = [
            u64::MIN,
            u64::MIN + 1,
            (i64::MAX as u64) - 1,
            i64::MAX as u64,
            (i64::MAX as u64) + 1,
            u64::MAX - 1,
            u64::MAX,
        ];
        let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
        let files = one_column_roundtrip(values, false);

        for file in files {
            let reader = SerializedFileReader::new(file).unwrap();
            let metadata = reader.metadata();

            let mut row_offset = 0;
            for row_group in metadata.row_groups() {
                assert_eq!(row_group.num_columns(), 1);
                let column = row_group.column(0);

                let num_values = column.num_values() as usize;
                let src_slice = &src[row_offset..row_offset + num_values];
                row_offset += column.num_values() as usize;

                let stats = column.statistics().unwrap();
                if let Statistics::Int64(stats) = stats {
                    assert_eq!(
                        *stats.min_opt().unwrap() as u64,
                        *src_slice.iter().min().unwrap()
                    );
                    assert_eq!(
                        *stats.max_opt().unwrap() as u64,
                        *src_slice.iter().max().unwrap()
                    );
                } else {
                    panic!("Statistics::Int64 missing")
                }
            }
        }
    }

    #[test]
    fn statistics_null_counts_only_nulls() {
        let values = Arc::new(UInt64Array::from(vec![None, None]));
        let files = one_column_roundtrip(values, true);

        for file in files {
            let reader = SerializedFileReader::new(file).unwrap();
            let metadata = reader.metadata();
            assert_eq!(metadata.num_row_groups(), 1);
            let row_group = metadata.row_group(0);
            assert_eq!(row_group.num_columns(), 1);
            let column = row_group.column(0);
            let stats = column.statistics().unwrap();
            assert_eq!(stats.null_count_opt(), Some(2));
        }
    }

    #[test]
    fn test_list_of_struct_roundtrip() {
        let int_field = Field::new("a", DataType::Int32, true);
        let int_field2 = Field::new("b", DataType::Int32, true);

        let int_builder = Int32Builder::with_capacity(10);
        let int_builder2 = Int32Builder::with_capacity(10);

        let struct_builder = StructBuilder::new(
            vec![int_field, int_field2],
            vec![Box::new(int_builder), Box::new(int_builder2)],
        );
        let mut list_builder = ListBuilder::new(struct_builder);

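        // Builds [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]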
        let values = list_builder.values();
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_value(1);
        values
            .field_builder::<Int32Builder>(1)
            .unwrap()
            .append_value(2);
        values.append(true);
        list_builder.append(true);

        list_builder.append(true);

        list_builder.append(false);

        let values = list_builder.values();
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_null();
        values
            .field_builder::<Int32Builder>(1)
            .unwrap()
            .append_null();
        values.append(false);
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_null();
        values
            .field_builder::<Int32Builder>(1)
            .unwrap()
            .append_null();
        values.append(false);
        list_builder.append(true);

        let values = list_builder.values();
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_null();
        values
            .field_builder::<Int32Builder>(1)
            .unwrap()
            .append_value(3);
        values.append(true);
        list_builder.append(true);

        let values = list_builder.values();
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_value(2);
        values
            .field_builder::<Int32Builder>(1)
            .unwrap()
            .append_null();
        values.append(true);
        list_builder.append(true);

        let array = Arc::new(list_builder.finish());

        one_column_roundtrip(array, true);
    }

    fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
        metadata.row_groups().iter().map(|x| x.num_rows()).collect()
    }

    #[test]
    fn test_aggregates_records() {
        let arrays = [
            Int32Array::from((0..100).collect::<Vec<_>>()),
            Int32Array::from((0..50).collect::<Vec<_>>()),
            Int32Array::from((200..500).collect::<Vec<_>>()),
        ];

        let schema = Arc::new(Schema::new(vec![Field::new(
            "int",
            ArrowDataType::Int32,
            false,
        )]));

        let file = tempfile::tempfile().unwrap();

        let props = WriterProperties::builder()
            .set_max_row_group_row_count(Some(200))
            .build();

        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();

        for array in arrays {
            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
            writer.write(&batch).unwrap();
        }

        writer.close().unwrap();

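        // 450 rows with a 200-row cap coalesce into row groups of [200, 200, 50]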
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);

        let batches = builder
            .with_batch_size(100)
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(batches.len(), 5);
        assert!(batches.iter().all(|x| x.num_columns() == 1));

        let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();

        assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);

        let values: Vec<_> = batches
            .iter()
            .flat_map(|x| {
                x.column(0)
                    .as_any()
                    .downcast_ref::<Int32Array>()
                    .unwrap()
                    .values()
                    .iter()
                    .cloned()
            })
            .collect();

        let expected_values: Vec<_> = [0..100, 0..50, 200..500].into_iter().flatten().collect();
        assert_eq!(&values, &expected_values)
    }

    #[test]
    fn complex_aggregate() {
        let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
        let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
        let struct_a = Arc::new(Field::new(
            "struct_a",
            DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
            true,
        ));

        let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
        let struct_b = Arc::new(Field::new(
            "struct_b",
            DataType::Struct(vec![list_a.clone()].into()),
            false,
        ));

        let schema = Arc::new(Schema::new(vec![struct_b]));

        let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
        let field_b_array =
            Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);

        let struct_a_array = StructArray::from(vec![
            (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
            (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
        ]);

        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
            .len(5)
            .add_buffer(Buffer::from_iter(vec![
                0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
            ]))
            .null_bit_buffer(Some(Buffer::from_iter(vec![
                true, false, true, false, true,
            ])))
            .child_data(vec![struct_a_array.into_data()])
            .build()
            .unwrap();

        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
        let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);

        let batch1 =
            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
                .unwrap();

        let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
        let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);

        let struct_a_array = StructArray::from(vec![
            (field_a, Arc::new(field_a_array) as ArrayRef),
            (field_b, Arc::new(field_b_array) as ArrayRef),
        ]);

        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
            .len(2)
            .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
            .child_data(vec![struct_a_array.into_data()])
            .build()
            .unwrap();

        let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
        let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);

        let batch2 =
            RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
                .unwrap();

        let batches = &[batch1, batch2];

4226 let expected = r#"
4229 +-------------------------------------------------------------------------------------------------------+
4230 | struct_b |
4231 +-------------------------------------------------------------------------------------------------------+
4232 | {list: [{leaf_a: 1, leaf_b: 1}]} |
4233 | {list: } |
4234 | {list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]} |
4235 | {list: } |
4236 | {list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]} |
4237 | {list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]} |
4238 | {list: [{leaf_a: 10, leaf_b: }]} |
4239 +-------------------------------------------------------------------------------------------------------+
4240 "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
4241
4242 let actual = pretty_format_batches(batches).unwrap().to_string();
4243 assert_eq!(actual, expected);
4244
4245 let file = tempfile::tempfile().unwrap();
4247 let props = WriterProperties::builder()
4248 .set_max_row_group_row_count(Some(6))
4249 .build();
4250
4251 let mut writer =
4252 ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
4253
4254 for batch in batches {
4255 writer.write(batch).unwrap();
4256 }
4257 writer.close().unwrap();
4258
4259 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
4264 assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);
4265
4266 let batches = builder
4267 .with_batch_size(2)
4268 .build()
4269 .unwrap()
4270 .collect::<ArrowResult<Vec<_>>>()
4271 .unwrap();
4272
4273 assert_eq!(batches.len(), 4);
4274 let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
4275 assert_eq!(&batch_counts, &[2, 2, 2, 1]);
4276
4277 let actual = pretty_format_batches(&batches).unwrap().to_string();
4278 assert_eq!(actual, expected);
4279 }
4280
4281 #[test]
4282 fn test_arrow_writer_metadata() {
4283 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4284 let file_schema = batch_schema.clone().with_metadata(
4285 vec![("foo".to_string(), "bar".to_string())]
4286 .into_iter()
4287 .collect(),
4288 );
4289
4290 let batch = RecordBatch::try_new(
4291 Arc::new(batch_schema),
4292 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4293 )
4294 .unwrap();
4295
4296 let mut buf = Vec::with_capacity(1024);
4297 let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
4298 writer.write(&batch).unwrap();
4299 writer.close().unwrap();
4300 }
4301
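    // A batch with a non-nullable column can be written against a file schema
    // that declares the column nullable; reading back yields the file schema.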
    #[test]
    fn test_arrow_writer_nullable() {
        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
        let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
        let file_schema = Arc::new(file_schema);

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
        let back = read.next().unwrap().unwrap();
        assert_eq!(back.schema(), file_schema);
        assert_ne!(back.schema(), batch.schema());
        assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
    }

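    // in_progress_size / in_progress_rows / memory_size track the buffered
    // row group and reset to zero once it is flushed.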
    #[test]
    fn in_progress_accounting() {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();

        assert_eq!(writer.in_progress_size(), 0);
        assert_eq!(writer.in_progress_rows(), 0);
        assert_eq!(writer.memory_size(), 0);
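        // The 4-byte "PAR1" magic is written when the writer is created,
        // so bytes_written() starts at 4.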
        assert_eq!(writer.bytes_written(), 4);
        writer.write(&batch).unwrap();

        let initial_size = writer.in_progress_size();
        assert!(initial_size > 0);
        assert_eq!(writer.in_progress_rows(), 5);
        let initial_memory = writer.memory_size();
        assert!(initial_memory > 0);
        assert!(
            initial_size <= initial_memory,
            "{initial_size} <= {initial_memory}"
        );

        writer.write(&batch).unwrap();
        assert!(writer.in_progress_size() > initial_size);
        assert_eq!(writer.in_progress_rows(), 10);
        assert!(writer.memory_size() > initial_memory);
        assert!(
            writer.in_progress_size() <= writer.memory_size(),
            "in_progress_size {} <= memory_size {}",
            writer.in_progress_size(),
            writer.memory_size()
        );

        let pre_flush_bytes_written = writer.bytes_written();
        writer.flush().unwrap();
        assert_eq!(writer.in_progress_size(), 0);
        assert_eq!(writer.memory_size(), 0);
        assert!(writer.bytes_written() > pre_flush_bytes_written);

        writer.close().unwrap();
    }

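    // Even a fully-null column produces a data page, so both columns appear
    // in the offset index with one page each.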
    #[test]
    fn test_writer_all_null() {
        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
        let batch = RecordBatch::try_from_iter(vec![
            ("a", Arc::new(a) as ArrayRef),
            ("b", Arc::new(b) as ArrayRef),
        ])
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let bytes = Bytes::from(buf);
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
        let index = reader.metadata().offset_index().unwrap();

        // One row group, two columns, one data page per column.
        assert_eq!(index.len(), 1);
        assert_eq!(index[0].len(), 2);
        assert_eq!(index[0][0].page_locations().len(), 1);
        assert_eq!(index[0][1].page_locations().len(), 1);
    }

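    // Statistics are disabled globally but re-enabled at page level for
    // column "a" only; column "b" should get neither stats nor a column index.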
    #[test]
    fn test_disabled_statistics_with_page() {
        let file_schema = Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
        ]);
        let file_schema = Arc::new(file_schema);

        let batch = RecordBatch::try_new(
            file_schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
            ],
        )
        .unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
            .build();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
        writer.write(&batch).unwrap();

        let metadata = writer.close().unwrap();
        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        assert_eq!(row_group.num_columns(), 2);
        // Column "a" has both an offset index and a column index.
        assert!(row_group.column(0).offset_index_offset().is_some());
        assert!(row_group.column(0).column_index_offset().is_some());
        // Column "b" only has an offset index; its column index is absent.
        assert!(row_group.column(1).offset_index_offset().is_some());
        assert!(row_group.column(1).column_index_offset().is_none());

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();

        let row_group = reader.get_row_group(0).unwrap();
        let a_col = row_group.metadata().column(0);
        let b_col = row_group.metadata().column(1);

        // Chunk-level statistics are still written for column "a".
        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
            let min = byte_array_stats.min_opt().unwrap();
            let max = byte_array_stats.max_opt().unwrap();

            assert_eq!(min.as_bytes(), b"a");
            assert_eq!(max.as_bytes(), b"d");
        } else {
            panic!("expecting Statistics::ByteArray");
        }

        assert!(b_col.statistics().is_none());

        // One row group, two columns in both the offset and column indexes.
        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1);
        assert_eq!(offset_index[0].len(), 2);

        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1);
        assert_eq!(column_index[0].len(), 2);

        let a_idx = &column_index[0][0];
        assert!(
            matches!(a_idx, ColumnIndexMetaData::BYTE_ARRAY(_)),
            "{a_idx:?}"
        );
        let b_idx = &column_index[0][1];
        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
    }

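    // Same setup but with chunk-level statistics for column "a": chunk stats
    // are written, yet neither column gets a column index.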
    #[test]
    fn test_disabled_statistics_with_chunk() {
        let file_schema = Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
        ]);
        let file_schema = Arc::new(file_schema);

        let batch = RecordBatch::try_new(
            file_schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
                Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
            ],
        )
        .unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
            .build();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
        writer.write(&batch).unwrap();

        let metadata = writer.close().unwrap();
        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        assert_eq!(row_group.num_columns(), 2);
        // Neither column writes a column index with chunk-level statistics.
        assert!(row_group.column(0).offset_index_offset().is_some());
        assert!(row_group.column(0).column_index_offset().is_none());
        assert!(row_group.column(1).offset_index_offset().is_some());
        assert!(row_group.column(1).column_index_offset().is_none());

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();

        let row_group = reader.get_row_group(0).unwrap();
        let a_col = row_group.metadata().column(0);
        let b_col = row_group.metadata().column(1);

        // Chunk-level statistics are written for column "a" only.
        if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
            let min = byte_array_stats.min_opt().unwrap();
            let max = byte_array_stats.max_opt().unwrap();

            assert_eq!(min.as_bytes(), b"a");
            assert_eq!(max.as_bytes(), b"d");
        } else {
            panic!("expecting Statistics::ByteArray");
        }

        assert!(b_col.statistics().is_none());

        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1);
        assert_eq!(column_index[0].len(), 2);

        let a_idx = &column_index[0][0];
        assert!(matches!(a_idx, ColumnIndexMetaData::NONE), "{a_idx:?}");
        let b_idx = &column_index[0][1];
        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
    }

    #[test]
    fn test_arrow_writer_skip_metadata() {
        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
        let file_schema = Arc::new(batch_schema.clone());

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();
        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);

        let mut buf = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let bytes = Bytes::from(buf);
        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
        assert_eq!(file_schema, *reader_builder.schema());
        if let Some(key_value_metadata) = reader_builder
            .metadata()
            .file_metadata()
            .key_value_metadata()
        {
            assert!(
                !key_value_metadata
                    .iter()
                    .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY)
            );
        }
    }

    #[test]
    fn mismatched_schemas() {
        let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
        let file_schema = Arc::new(Schema::new(vec![Field::new(
            "temperature",
            DataType::Float64,
            false,
        )]));

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();

        let err = writer.write(&batch).unwrap_err().to_string();
        assert_eq!(
            err,
            "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
        );
    }

    #[test]
    fn test_roundtrip_empty_schema() {
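        // A schema with no columns still round-trips; the row count must be
        // supplied explicitly since there are no arrays to infer it from.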
        let empty_batch = RecordBatch::try_new_with_options(
            Arc::new(Schema::empty()),
            vec![],
            &RecordBatchOptions::default().with_row_count(Some(0)),
        )
        .unwrap();

        let mut parquet_bytes: Vec<u8> = Vec::new();
        let mut writer =
            ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
        writer.write(&empty_batch).unwrap();
        writer.close().unwrap();

        let bytes = Bytes::from(parquet_bytes);
        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
        assert_eq!(reader.schema(), &empty_batch.schema());
        let batches: Vec<_> = reader
            .build()
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();
        assert_eq!(batches.len(), 0);
    }

    #[test]
    fn test_page_stats_not_written_by_default() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let schema = Schema::new(vec![string_field]);
        let raw_string_values = vec!["Blart Versenwald III"];
        let string_values = StringArray::from(raw_string_values.clone());
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let file = roundtrip_opts(&batch, props);

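        // With dictionary encoding disabled and no compression, the first data
        // page header starts right after the 4-byte "PAR1" magic.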
        let first_page = &file[4..];
        let mut prot = ThriftSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;

        assert!(stats.is_none());
    }

    #[test]
    fn test_page_stats_when_enabled() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let schema = Schema::new(vec![string_field]);
        let raw_string_values = vec!["Blart Versenwald III", "Andrew Lamb"];
        let string_values = StringArray::from(raw_string_values.clone());
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();

        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_write_page_header_statistics(true)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let file = roundtrip_opts(&batch, props);

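        // Parse the first page header straight from the file bytes; its
        // embedded statistics should carry the exact min/max values.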
        let first_page = &file[4..];
        let mut prot = ThriftSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;

        let stats = stats.unwrap();
        assert!(stats.is_max_value_exact.unwrap());
        assert!(stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
    }

    #[test]
    fn test_page_stats_truncation() {
        let string_field = Field::new("a", DataType::Utf8, false);
        let binary_field = Field::new("b", DataType::Binary, false);
        let schema = Schema::new(vec![string_field, binary_field]);

        let raw_string_values = vec!["Blart Versenwald III"];
        let raw_binary_values = [b"Blart Versenwald III".to_vec()];
        let raw_binary_value_refs = raw_binary_values
            .iter()
            .map(|x| x.as_slice())
            .collect::<Vec<_>>();

        let string_values = StringArray::from(raw_string_values.clone());
        let binary_values = BinaryArray::from(raw_binary_value_refs);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(string_values), Arc::new(binary_values)],
        )
        .unwrap();

        let props = WriterProperties::builder()
            .set_statistics_truncate_length(Some(2))
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .set_write_page_header_statistics(true)
            .set_compression(crate::basic::Compression::UNCOMPRESSED)
            .build();

        let file = roundtrip_opts(&batch, props);

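        // With a 2-byte truncation limit the min stays "Bl" while the max
        // becomes "Bm" ("Bl" with the last byte bumped) so it still
        // upper-bounds the data; both are flagged as inexact.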
        let first_page = &file[4..];
        let mut prot = ThriftSliceInputProtocol::new(first_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;
        assert!(stats.is_some());
        let stats = stats.unwrap();
        assert!(!stats.is_max_value_exact.unwrap());
        assert!(!stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());

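        // The second column's page header starts immediately after the first
        // page's body; it should show the same truncated statistics.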
        let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..];
        let mut prot = ThriftSliceInputProtocol::new(second_page);
        let hdr = PageHeader::read_thrift(&mut prot).unwrap();
        let stats = hdr.data_page_header.unwrap().statistics;
        assert!(stats.is_some());
        let stats = stats.unwrap();
        assert!(!stats.is_max_value_exact.unwrap());
        assert!(!stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
        assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
    }

    #[test]
    fn test_page_encoding_statistics_roundtrip() {
        let batch_schema = Schema::new(vec![Field::new(
            "int32",
            arrow_schema::DataType::Int32,
            false,
        )]);

        let batch = RecordBatch::try_new(
            Arc::new(batch_schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
        )
        .unwrap();

        let mut file: File = tempfile::tempfile().unwrap();
        let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
        writer.write(&batch).unwrap();
        let file_metadata = writer.close().unwrap();

        assert_eq!(file_metadata.num_row_groups(), 1);
        assert_eq!(file_metadata.row_group(0).num_columns(), 1);
        assert!(
            file_metadata
                .row_group(0)
                .column(0)
                .page_encoding_stats()
                .is_some()
        );
        let chunk_page_stats = file_metadata
            .row_group(0)
            .column(0)
            .page_encoding_stats()
            .unwrap();

        // Read the file back and check the page encoding stats survive the round trip.
        let options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_encoding_stats_as_mask(false)
            .build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rowgroup = reader.get_row_group(0).expect("row group missing");
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.page_encoding_stats().is_some());
        let file_page_stats = column.page_encoding_stats().unwrap();
        assert_eq!(chunk_page_stats, file_page_stats);
    }

    #[test]
    fn test_different_dict_page_size_limit() {
        let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
        let schema = Arc::new(Schema::new(vec![
            Field::new("col0", arrow_schema::DataType::Int64, false),
            Field::new("col1", arrow_schema::DataType::Int64, false),
        ]));
        let batch =
            arrow_array::RecordBatch::try_new(schema.clone(), vec![array.clone(), array]).unwrap();

        let props = WriterProperties::builder()
            .set_dictionary_page_size_limit(1024 * 1024)
            .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
            .build();
        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
        writer.write(&batch).unwrap();
        let data = Bytes::from(writer.into_inner().unwrap());

        let mut metadata = ParquetMetaDataReader::new();
        metadata.try_parse(&data).unwrap();
        let metadata = metadata.finish().unwrap();
        let col0_meta = metadata.row_group(0).column(0);
        let col1_meta = metadata.row_group(0).column(1);

        // Read the first page of each column chunk and return its size as a
        // dictionary page.
        let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
            let mut reader =
                SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
            let page = reader.get_next_page().unwrap().unwrap();
            match page {
                Page::DictionaryPage { buf, .. } => buf.len(),
                _ => panic!("expected DictionaryPage"),
            }
        };

        assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
        assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
    }

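    /// Shape of the data produced by `write_batches`: the number of batches,
    /// rows per batch, and the byte width of each string row.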
    struct WriteBatchesShape {
        num_batches: usize,
        rows_per_batch: usize,
        row_size: usize,
    }

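    /// Writes `num_batches` batches of `rows_per_batch` zero-padded strings of
    /// `row_size` bytes each, then returns a reader builder over the file.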
    fn write_batches(
        WriteBatchesShape {
            num_batches,
            rows_per_batch,
            row_size,
        }: WriteBatchesShape,
        props: WriterProperties,
    ) -> ParquetRecordBatchReaderBuilder<File> {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "str",
            ArrowDataType::Utf8,
            false,
        )]));
        let file = tempfile::tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();

        for batch_idx in 0..num_batches {
            let strings: Vec<String> = (0..rows_per_batch)
                .map(|i| format!("{:0>width$}", batch_idx * 10 + i, width = row_size))
                .collect();
            let array = StringArray::from(strings);
            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();
        ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
    }

    #[test]
    fn test_row_group_limit_none_writes_single_row_group() {
        let props = WriterProperties::builder()
            .set_max_row_group_row_count(None)
            .set_max_row_group_bytes(None)
            .build();

        let builder = write_batches(
            WriteBatchesShape {
                num_batches: 1,
                rows_per_batch: 1000,
                row_size: 4,
            },
            props,
        );

        assert_eq!(
            &row_group_sizes(builder.metadata()),
            &[1000],
            "With no limits, all rows should be in a single row group"
        );
    }

    #[test]
    fn test_row_group_limit_rows_only() {
        let props = WriterProperties::builder()
            .set_max_row_group_row_count(Some(300))
            .set_max_row_group_bytes(None)
            .build();

        let builder = write_batches(
            WriteBatchesShape {
                num_batches: 1,
                rows_per_batch: 1000,
                row_size: 4,
            },
            props,
        );

        assert_eq!(
            &row_group_sizes(builder.metadata()),
            &[300, 300, 300, 100],
            "Row groups should be split by row count"
        );
    }

    #[test]
    fn test_row_group_limit_bytes_only() {
        let props = WriterProperties::builder()
            .set_max_row_group_row_count(None)
            .set_max_row_group_bytes(Some(3500))
            .build();

        let builder = write_batches(
            WriteBatchesShape {
                num_batches: 10,
                rows_per_batch: 10,
                row_size: 100,
            },
            props,
        );

        let sizes = row_group_sizes(builder.metadata());

        assert!(
            sizes.len() > 1,
            "Should have multiple row groups due to byte limit, got {sizes:?}",
        );

        let total_rows: i64 = sizes.iter().sum();
        assert_eq!(total_rows, 100, "Total rows should be preserved");
    }

    #[test]
    fn test_row_group_limit_bytes_flushes_when_current_group_already_too_large() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "str",
            ArrowDataType::Utf8,
            false,
        )]));
        let file = tempfile::tempfile().unwrap();

        // Start with no limits so the first batch accumulates in a single
        // in-progress row group.
        let props = WriterProperties::builder()
            .set_max_row_group_row_count(None)
            .set_max_row_group_bytes(None)
            .build();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();

        let first_array = StringArray::from(
            (0..10)
                .map(|i| format!("{:0>100}", i))
                .collect::<Vec<String>>(),
        );
        let first_batch =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(first_array)]).unwrap();
        writer.write(&first_batch).unwrap();
        assert_eq!(writer.in_progress_rows(), 10);

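        // Shrink the byte limit so the buffered row group is already over it;
        // the next write must flush that group before accepting new rows.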
        writer.max_row_group_bytes = Some(1);

        let second_array = StringArray::from(vec!["x".to_string()]);
        let second_batch =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(second_array)]).unwrap();
        writer.write(&second_batch).unwrap();
        writer.close().unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

        assert_eq!(
            &row_group_sizes(builder.metadata()),
            &[10, 1],
            "The second write should flush an oversized in-progress row group first",
        );
    }

    #[test]
    fn test_row_group_limit_both_row_wins_single_batch() {
        let props = WriterProperties::builder()
            .set_max_row_group_row_count(Some(200))
            .set_max_row_group_bytes(Some(1024 * 1024))
            .build();

        let builder = write_batches(
            WriteBatchesShape {
                num_batches: 1,
                row_size: 4,
                rows_per_batch: 1000,
            },
            props,
        );

        assert_eq!(
            &row_group_sizes(builder.metadata()),
            &[200, 200, 200, 200, 200],
            "Row limit should trigger before byte limit"
        );
    }

    #[test]
    fn test_row_group_limit_both_row_wins_multiple_batches() {
        let props = WriterProperties::builder()
            .set_max_row_group_row_count(Some(5))
            .set_max_row_group_bytes(Some(9999))
            .build();

        let builder = write_batches(
            WriteBatchesShape {
                num_batches: 10,
                rows_per_batch: 10,
                row_size: 100,
            },
            props,
        );

        assert_eq!(
            &row_group_sizes(builder.metadata()),
            &[5; 20],
            "Row limit should trigger before byte limit"
        );
    }

    #[test]
    fn test_row_group_limit_both_bytes_wins() {
        let props = WriterProperties::builder()
            .set_max_row_group_row_count(Some(1000))
            .set_max_row_group_bytes(Some(3500))
            .build();

        let builder = write_batches(
            WriteBatchesShape {
                num_batches: 10,
                rows_per_batch: 10,
                row_size: 100,
            },
            props,
        );

        let sizes = row_group_sizes(builder.metadata());

        assert!(
            sizes.len() > 1,
            "Byte limit should trigger before row limit, got {sizes:?}",
        );

        assert!(
            sizes.iter().all(|&s| s < 1000),
            "No row group should hit the row limit"
        );

        let total_rows: i64 = sizes.iter().sum();
        assert_eq!(total_rows, 100, "Total rows should be preserved");
    }

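    // close_mut() exposes the pending ColumnCloseResult, allowing the column
    // index to be dropped before the chunk is appended to a row group.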
    #[test]
    fn arrow_column_chunk_close_mut_drops_column_index() {
        use crate::arrow::ArrowSchemaConverter;
        use crate::file::writer::SerializedFileWriter;

        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        );
        let parquet_schema = ArrowSchemaConverter::new()
            .with_coerce_types(props.coerce_types())
            .convert(&schema)
            .unwrap();

        let mut buf = Vec::with_capacity(1024);
        let mut writer =
            SerializedFileWriter::new(&mut buf, parquet_schema.root_schema_ptr(), props.clone())
                .unwrap();

        let factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));
        let mut col_writers = factory.create_column_writers(0).unwrap();
        let arr: ArrayRef = Arc::new(Int32Array::from_iter_values(0..64));
        for leaves in compute_leaves(schema.field(0), &arr).unwrap() {
            col_writers[0].write(&leaves).unwrap();
        }
        let mut chunk = col_writers.pop().unwrap().close().unwrap();

        assert!(
            chunk.close().column_index.is_some(),
            "EnabledStatistics::Page should produce a column_index"
        );

        // Mutably clear the column index before appending the chunk.
        chunk.close_mut().column_index = None;
        assert!(chunk.close().column_index.is_none());

        let mut rg = writer.next_row_group().unwrap();
        chunk.append_to_row_group(&mut rg).unwrap();
        rg.close().unwrap();
        let file_meta = writer.close().unwrap();

        // The written file metadata should not reference a column index for
        // this chunk.
        let cc = file_meta.row_group(0).column(0);
        assert!(cc.column_index_range().is_none());
    }
}