1use crate::column::chunker::ContentDefinedChunker;
21
22use bytes::Bytes;
23use std::io::{Read, Write};
24use std::slice::Iter;
25use std::sync::{Arc, Mutex};
26use std::vec::IntoIter;
27
28use arrow_array::cast::AsArray;
29use arrow_array::types::*;
30use arrow_array::{ArrayRef, Int32Array, RecordBatch, RecordBatchWriter};
31use arrow_schema::{
32 ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef, TimeUnit,
33};
34
35use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
36
37use crate::arrow::ArrowSchemaConverter;
38use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
39use crate::basic::PageType;
40use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
41use crate::column::page_encryption::PageEncryptor;
42use crate::column::writer::encoder::ColumnValueEncoder;
43use crate::column::writer::{
44 ColumnCloseResult, ColumnWriter, GenericColumnWriter, get_column_writer,
45};
46use crate::data_type::{ByteArray, FixedLenByteArray};
47#[cfg(feature = "encryption")]
48use crate::encryption::encrypt::FileEncryptor;
49use crate::errors::{ParquetError, Result};
50use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
51use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
52use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
53use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
54use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
55use levels::{ArrayLevels, calculate_array_levels};
56
57mod byte_array;
58mod levels;
59
60#[doc(inline)]
61pub use crate::column::page_store::{
62 InMemoryPageStore, InMemoryPageStoreFactory, PageKey, PageStore, PageStoreArgs,
63 PageStoreFactory,
64};
65
/// Encodes arrow [`RecordBatch`]es into a parquet file.
///
/// Rows are buffered into an in-progress row group which is flushed to the
/// underlying [`SerializedFileWriter`] when a configured limit is reached or
/// when [`ArrowWriter::flush`] is called.
pub struct ArrowWriter<W: Write> {
    /// Underlying parquet file writer that receives each flushed row group
    writer: SerializedFileWriter<W>,

    /// Row group currently being buffered; `None` until the first non-empty
    /// batch after construction or after the previous flush
    in_progress: Option<ArrowRowGroupWriter>,

    /// Arrow schema of the batches accepted by [`ArrowWriter::write`]
    arrow_schema: SchemaRef,

    /// Creates the [`ArrowRowGroupWriter`] for every new row group
    row_group_writer_factory: ArrowRowGroupWriterFactory,

    /// Flush the in-progress row group once it holds this many rows
    max_row_group_row_count: Option<usize>,

    /// Flush the in-progress row group once its estimated encoded size reaches
    /// this many bytes
    max_row_group_bytes: Option<usize>,

    /// One content-defined chunker per parquet leaf column; only present when
    /// content-defined chunking is enabled in the writer properties
    cdc_chunkers: Option<Vec<ContentDefinedChunker>>,
}
206
207impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
208 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209 let buffered_memory = self.in_progress_size();
210 f.debug_struct("ArrowWriter")
211 .field("writer", &self.writer)
212 .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
213 .field("in_progress_rows", &self.in_progress_rows())
214 .field("arrow_schema", &self.arrow_schema)
215 .field("max_row_group_row_count", &self.max_row_group_row_count)
216 .field("max_row_group_bytes", &self.max_row_group_bytes)
217 .finish()
218 }
219}
220
221impl<W: Write + Send> ArrowWriter<W> {
222 pub fn try_new(
228 writer: W,
229 arrow_schema: SchemaRef,
230 props: Option<WriterProperties>,
231 ) -> Result<Self> {
232 let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
233 Self::try_new_with_options(writer, arrow_schema, options)
234 }
235
236 pub fn try_new_with_options(
242 writer: W,
243 arrow_schema: SchemaRef,
244 options: ArrowWriterOptions,
245 ) -> Result<Self> {
246 let mut props = options.properties;
247
248 let schema = if let Some(parquet_schema) = options.schema_descr {
249 parquet_schema.clone()
250 } else {
251 let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
252 if let Some(schema_root) = &options.schema_root {
253 converter = converter.schema_root(schema_root);
254 }
255
256 converter.convert(&arrow_schema)?
257 };
258
259 if !options.skip_arrow_metadata {
260 add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
262 }
263
264 let max_row_group_row_count = props.max_row_group_row_count();
265 let max_row_group_bytes = props.max_row_group_bytes();
266
267 let props_ptr = Arc::new(props);
268 let file_writer =
269 SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
270
271 let mut row_group_writer_factory =
272 ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());
273 if let Some(page_store_factory) = options.page_store_factory {
274 row_group_writer_factory =
275 row_group_writer_factory.with_page_store_factory(page_store_factory);
276 }
277
278 let cdc_chunkers = props_ptr
279 .content_defined_chunking()
280 .map(|opts| {
281 file_writer
282 .schema_descr()
283 .columns()
284 .iter()
285 .map(|desc| ContentDefinedChunker::new(desc, opts))
286 .collect::<Result<Vec<_>>>()
287 })
288 .transpose()?;
289
290 Ok(Self {
291 writer: file_writer,
292 in_progress: None,
293 arrow_schema,
294 row_group_writer_factory,
295 max_row_group_row_count,
296 max_row_group_bytes,
297 cdc_chunkers,
298 })
299 }
300
301 pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
303 self.writer.flushed_row_groups()
304 }
305
306 pub fn memory_size(&self) -> usize {
311 match &self.in_progress {
312 Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
313 None => 0,
314 }
315 }
316
317 pub fn in_progress_size(&self) -> usize {
324 match &self.in_progress {
325 Some(in_progress) => in_progress
326 .writers
327 .iter()
328 .map(|x| x.get_estimated_total_bytes())
329 .sum(),
330 None => 0,
331 }
332 }
333
334 pub fn in_progress_rows(&self) -> usize {
336 self.in_progress
337 .as_ref()
338 .map(|x| x.buffered_rows)
339 .unwrap_or_default()
340 }
341
342 pub fn bytes_written(&self) -> usize {
344 self.writer.bytes_written()
345 }
346
    /// Encodes `batch` into the in-progress row group, starting one if needed.
    ///
    /// Empty batches are ignored. If `max_row_group_row_count` is set and the
    /// batch would exceed it, the batch is split so the current row group ends
    /// exactly at the limit and the remainder is written recursively. If
    /// `max_row_group_bytes` is set and at least one row is already buffered,
    /// the average encoded bytes per buffered row is used to estimate how many
    /// more rows fit; the batch is split (or the row group flushed first)
    /// accordingly. After writing, the row group is flushed once either limit
    /// is reached.
    ///
    /// Note that the first batch of a row group is never limited by size,
    /// because no per-row estimate exists yet.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        // Lazily start a new row group on the first write after a flush
        let in_progress = match &mut self.in_progress {
            Some(in_progress) => in_progress,
            x => x.insert(
                self.row_group_writer_factory
                    .create_row_group_writer(self.writer.flushed_row_groups().len())?,
            ),
        };

        if let Some(max_rows) = self.max_row_group_row_count {
            if in_progress.buffered_rows + batch.num_rows() > max_rows {
                // buffered_rows < max_rows here, since the row group is flushed
                // as soon as it reaches max_rows (see `should_flush` below).
                // NOTE(review): if max_rows were 0, to_write would be 0 and the
                // second recursive call would receive the whole batch again,
                // recursing forever; presumably WriterProperties rejects 0 — confirm.
                let to_write = max_rows - in_progress.buffered_rows;
                let a = batch.slice(0, to_write);
                let b = batch.slice(to_write, batch.num_rows() - to_write);
                self.write(&a)?;
                return self.write(&b);
            }
        }

        if let Some(max_bytes) = self.max_row_group_bytes {
            // Only check once a row is buffered, so a per-row size can be estimated
            if in_progress.buffered_rows > 0 {
                let current_bytes = in_progress.get_estimated_total_bytes();

                if current_bytes >= max_bytes {
                    self.flush()?;
                    return self.write(batch);
                }

                let avg_row_bytes = current_bytes / in_progress.buffered_rows;
                if avg_row_bytes > 0 {
                    // Cannot underflow: current_bytes < max_bytes was checked above
                    let remaining_bytes = max_bytes - current_bytes;
                    let rows_that_fit = remaining_bytes / avg_row_bytes;

                    if batch.num_rows() > rows_that_fit {
                        if rows_that_fit > 0 {
                            let a = batch.slice(0, rows_that_fit);
                            let b = batch.slice(rows_that_fit, batch.num_rows() - rows_that_fit);
                            self.write(&a)?;
                            return self.write(&b);
                        } else {
                            // Not even one more row is expected to fit: start a new row group
                            self.flush()?;
                            return self.write(batch);
                        }
                    }
                }
            }
        }

        match self.cdc_chunkers.as_mut() {
            Some(chunkers) => in_progress.write_with_chunkers(batch, chunkers)?,
            None => in_progress.write(batch)?,
        }

        let should_flush = self
            .max_row_group_row_count
            .is_some_and(|max| in_progress.buffered_rows >= max)
            || self
                .max_row_group_bytes
                .is_some_and(|max| in_progress.get_estimated_total_bytes() >= max);

        if should_flush {
            self.flush()?
        }
        Ok(())
    }
430
    /// Writes `buf` through the underlying [`SerializedFileWriter`].
    ///
    /// NOTE(review): presumably this goes through the file writer's own
    /// buffering and byte counting so `bytes_written` stays accurate — confirm.
    pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
        self.writer.write_all(buf)
    }

    /// Flushes the underlying writer.
    ///
    /// This does not touch the in-progress row group; use [`Self::flush`] to
    /// write buffered rows.
    pub fn sync(&mut self) -> std::io::Result<()> {
        self.writer.flush()
    }
443
444 pub fn flush(&mut self) -> Result<()> {
449 let in_progress = match self.in_progress.take() {
450 Some(in_progress) => in_progress,
451 None => return Ok(()),
452 };
453
454 let mut row_group_writer = self.writer.next_row_group()?;
455 for chunk in in_progress.close()? {
456 chunk.append_to_row_group(&mut row_group_writer)?;
457 }
458 row_group_writer.close()?;
459 Ok(())
460 }
461
    /// Adds a key-value pair to the file metadata, delegating to the
    /// underlying [`SerializedFileWriter`].
    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
        self.writer.append_key_value_metadata(kv_metadata)
    }

    /// Returns a reference to the underlying writer.
    pub fn inner(&self) -> &W {
        self.writer.inner()
    }

    /// Returns a mutable reference to the underlying writer.
    ///
    /// NOTE(review): writing directly to the returned writer bypasses the
    /// parquet file writer and would likely produce a corrupt file — confirm
    /// the intended use before relying on it.
    pub fn inner_mut(&mut self) -> &mut W {
        self.writer.inner_mut()
    }
485
486 pub fn into_inner(mut self) -> Result<W> {
488 self.flush()?;
489 self.writer.into_inner()
490 }
491
492 pub fn finish(&mut self) -> Result<ParquetMetaData> {
498 self.flush()?;
499 self.writer.finish()
500 }
501
502 pub fn close(mut self) -> Result<ParquetMetaData> {
504 self.finish()
505 }
506
507 #[deprecated(
509 since = "56.2.0",
510 note = "Use `ArrowRowGroupWriterFactory` instead, see `ArrowColumnWriter` for an example"
511 )]
512 pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
513 self.flush()?;
514 let in_progress = self
515 .row_group_writer_factory
516 .create_row_group_writer(self.writer.flushed_row_groups().len())?;
517 Ok(in_progress.writers)
518 }
519
520 #[deprecated(
522 since = "56.2.0",
523 note = "Use `SerializedFileWriter` directly instead, see `ArrowColumnWriter` for an example"
524 )]
525 pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
526 let mut row_group_writer = self.writer.next_row_group()?;
527 for chunk in chunks {
528 chunk.append_to_row_group(&mut row_group_writer)?;
529 }
530 row_group_writer.close()?;
531 Ok(())
532 }
533
534 pub fn into_serialized_writer(
541 mut self,
542 ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
543 self.flush()?;
544 Ok((self.writer, self.row_group_writer_factory))
545 }
546}
547
548impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
549 fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
550 self.write(batch).map_err(|e| e.into())
551 }
552
553 fn close(self) -> std::result::Result<(), ArrowError> {
554 self.close()?;
555 Ok(())
556 }
557}
558
/// Options for constructing an [`ArrowWriter`] via
/// [`ArrowWriter::try_new_with_options`].
#[derive(Debug, Clone, Default)]
pub struct ArrowWriterOptions {
    /// Parquet writer properties
    properties: WriterProperties,
    /// When `true`, the encoded arrow schema is not added to the file metadata
    skip_arrow_metadata: bool,
    /// Optional root name used when converting the arrow schema to parquet
    schema_root: Option<String>,
    /// Explicit parquet schema; when set, arrow-to-parquet conversion is skipped
    schema_descr: Option<SchemaDescriptor>,
    /// Factory for the stores that buffer encoded pages; in-memory when `None`
    page_store_factory: Option<Arc<dyn PageStoreFactory>>,
}
570
571impl ArrowWriterOptions {
572 pub fn new() -> Self {
574 Self::default()
575 }
576
577 pub fn with_properties(self, properties: WriterProperties) -> Self {
579 Self { properties, ..self }
580 }
581
582 pub fn with_page_store_factory(self, page_store_factory: Arc<dyn PageStoreFactory>) -> Self {
660 Self {
661 page_store_factory: Some(page_store_factory),
662 ..self
663 }
664 }
665
666 pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
673 Self {
674 skip_arrow_metadata,
675 ..self
676 }
677 }
678
679 pub fn with_schema_root(self, schema_root: String) -> Self {
681 Self {
682 schema_root: Some(schema_root),
683 ..self
684 }
685 }
686
687 pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
693 Self {
694 schema_descr: Some(schema_descr),
695 ..self
696 }
697 }
698}
699
/// The encoded pages of a single column chunk.
///
/// Dictionary pages are held in memory in `dictionary`; all other pages are
/// handed to a [`PageStore`] and referenced by `keys`.
struct ArrowColumnChunkData {
    /// Total bytes of page headers and page data written so far
    length: usize,
    /// Backing store for non-dictionary page blobs
    store: Box<dyn PageStore>,
    /// Keys of the stored page blobs, in write order
    keys: Vec<PageKey>,
    /// Dictionary page headers and data, emitted ahead of the stored pages
    dictionary: Vec<Bytes>,
}
721
722impl ArrowColumnChunkData {
723 fn new(store: Box<dyn PageStore>) -> Self {
724 Self {
725 length: 0,
726 store,
727 keys: Vec::new(),
728 dictionary: Vec::new(),
729 }
730 }
731
732 fn push(&mut self, value: Bytes) -> Result<()> {
735 let key = self.store.put(value)?;
736 self.keys.push(key);
737 Ok(())
738 }
739
740 fn push_dictionary(&mut self, value: Bytes) {
742 self.dictionary.push(value);
743 }
744
745 fn dictionary_len(&self) -> usize {
747 self.dictionary.iter().map(Bytes::len).sum()
748 }
749
750 fn memory_size(&self) -> usize {
754 self.store.memory_size() + self.dictionary_len()
755 }
756}
757
/// A [`Read`] adapter that streams a column chunk's bytes: first the
/// in-memory dictionary blobs, then each stored page blob in key order.
struct StreamingColumnChunkReader {
    /// Remaining dictionary page headers/data to emit
    dictionary: IntoIter<Bytes>,
    /// Store holding the non-dictionary page blobs
    store: Box<dyn PageStore>,
    /// Keys of the stored blobs still to be read
    keys: IntoIter<PageKey>,
    /// Unread remainder of the blob currently being streamed
    current: Bytes,
}
774
775impl StreamingColumnChunkReader {
776 fn new(data: ArrowColumnChunkData) -> Self {
777 Self {
778 dictionary: data.dictionary.into_iter(),
779 store: data.store,
780 keys: data.keys.into_iter(),
781 current: Bytes::new(),
782 }
783 }
784}
785
786impl Read for StreamingColumnChunkReader {
787 fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
788 while self.current.is_empty() {
791 if let Some(blob) = self.dictionary.next() {
792 self.current = blob;
793 } else if let Some(key) = self.keys.next() {
794 self.current = self.store.take(key).map_err(std::io::Error::other)?;
795 } else {
796 return Ok(0);
797 }
798 }
799
800 let len = self.current.len().min(out.len());
801 let b = self.current.split_to(len);
802 out[..len].copy_from_slice(&b);
803 Ok(len)
804 }
805}
806
/// Column chunk buffer shared between an [`ArrowPageWriter`], which appends
/// encoded pages to it, and the owning [`ArrowColumnWriter`], which reclaims
/// sole ownership of it when the column is closed.
type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
812
/// A [`PageWriter`] that buffers encoded pages into a [`SharedColumnChunk`]
/// instead of writing them to a file.
struct ArrowPageWriter {
    /// Buffer shared with the owning [`ArrowColumnWriter`]
    buffer: SharedColumnChunk,
    /// Encrypts pages and page headers when column encryption is configured
    #[cfg(feature = "encryption")]
    page_encryptor: Option<PageEncryptor>,
}
818
819impl ArrowPageWriter {
820 fn new(store: Box<dyn PageStore>) -> Self {
822 Self {
823 buffer: Arc::new(Mutex::new(ArrowColumnChunkData::new(store))),
824 #[cfg(feature = "encryption")]
825 page_encryptor: None,
826 }
827 }
828
829 #[cfg(feature = "encryption")]
830 pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
831 self.page_encryptor = page_encryptor;
832 self
833 }
834
835 #[cfg(feature = "encryption")]
836 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
837 self.page_encryptor.as_mut()
838 }
839
840 #[cfg(not(feature = "encryption"))]
841 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
842 None
843 }
844}
845
impl PageWriter for ArrowPageWriter {
    /// Encrypts `page` (when an encryptor is configured), serializes its
    /// header, and appends header and data to the shared column chunk.
    ///
    /// Dictionary pages are kept in memory separately from data pages; all
    /// other pages go to the page store. The returned spec's `offset` is the
    /// chunk length before this page, i.e. relative to the chunk in write
    /// order. Dictionary locations are adjusted later, when the chunk is
    /// appended to a row group (`ArrowColumnChunk::append_to_row_group`).
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
        let page = match self.page_encryptor_mut() {
            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
            None => page,
        };

        let page_header = page.to_thrift_header()?;
        let header = {
            let mut header = Vec::with_capacity(1024);

            match self.page_encryptor_mut() {
                Some(page_encryptor) => {
                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
                    // Only data pages advance the encryptor's page count
                    if page.compressed_page().is_data_page() {
                        page_encryptor.increment_page();
                    }
                }
                None => {
                    let mut protocol = ThriftCompactOutputProtocol::new(&mut header);
                    page_header.write_thrift(&mut protocol)?;
                }
            };

            Bytes::from(header)
        };

        // NOTE(review): `try_lock().unwrap()` assumes the lock is never
        // contended here; a concurrent holder would make this panic.
        let mut buf = self.buffer.try_lock().unwrap();

        let data = page.compressed_page().buffer().clone();
        let compressed_size = data.len() + header.len();

        let mut spec = PageWriteSpec::new();
        spec.page_type = page.page_type();
        spec.num_values = page.num_values();
        // Both reported sizes include the serialized header
        spec.uncompressed_size = page.uncompressed_size() + header.len();
        spec.offset = buf.length as u64;
        spec.compressed_size = compressed_size;
        spec.bytes_written = compressed_size as u64;

        buf.length += compressed_size;
        if spec.page_type == PageType::DICTIONARY_PAGE {
            // Kept in memory so it can be streamed ahead of the data pages
            buf.push_dictionary(header);
            buf.push_dictionary(data);
        } else {
            buf.push(header)?;
            buf.push(data)?;
        }

        Ok(spec)
    }

    /// Always `true` for this writer.
    ///
    /// NOTE(review): presumably signals that dictionary pages need not be
    /// written before data pages, since they are buffered separately and
    /// streamed first — confirm against the `PageWriter` trait docs.
    fn defers_dictionary_ordering(&self) -> bool {
        true
    }

    /// Returns the memory held by the shared column chunk (page store plus
    /// in-memory dictionary blobs).
    fn buffered_memory_size(&self) -> usize {
        self.buffer.try_lock().unwrap().memory_size()
    }

    /// No-op: pages stay in the shared buffer until the column is closed.
    fn close(&mut self) -> Result<()> {
        Ok(())
    }
}
918
/// A single leaf column computed from an arrow array, holding the array data
/// together with its definition and repetition levels.
///
/// Create with [`compute_leaves`] and write with [`ArrowColumnWriter::write`].
#[derive(Debug)]
pub struct ArrowLeafColumn(ArrayLevels);
922
923pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
928 let levels = calculate_array_levels(array, field)?;
929 Ok(levels.into_iter().map(ArrowLeafColumn).collect())
930}
931
/// The encoded data of a single column chunk, produced by
/// [`ArrowColumnWriter::close`] and written with
/// [`ArrowColumnChunk::append_to_row_group`].
pub struct ArrowColumnChunk {
    /// Buffered page headers and page data of the chunk
    data: ArrowColumnChunkData,
    /// Result of closing the column writer; passed to the row group writer
    /// when the chunk is appended
    close: ColumnCloseResult,
}
937
938impl std::fmt::Debug for ArrowColumnChunk {
939 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
940 f.debug_struct("ArrowColumnChunk")
941 .field("length", &self.data.length)
942 .finish_non_exhaustive()
943 }
944}
945
946impl ArrowColumnChunk {
947 pub fn close(&self) -> &ColumnCloseResult {
954 &self.close
955 }
956
957 pub fn close_mut(&mut self) -> &mut ColumnCloseResult {
964 &mut self.close
965 }
966
967 pub fn append_to_row_group<W: Write + Send>(
970 self,
971 writer: &mut SerializedRowGroupWriter<'_, W>,
972 ) -> Result<()> {
973 let ArrowColumnChunk { data, close } = self;
974
975 let close = close.update_dictionary_location(data.dictionary_len())?;
979
980 let reader = StreamingColumnChunkReader::new(data);
981 writer.append_column_from_read(reader, close)
982 }
983}
984
/// Encodes [`ArrowLeafColumn`]s into a single parquet column chunk.
///
/// Obtain instances from [`ArrowRowGroupWriterFactory::create_column_writers`],
/// write leaves computed by [`compute_leaves`], then call
/// [`ArrowColumnWriter::close`] to get the resulting [`ArrowColumnChunk`].
pub struct ArrowColumnWriter {
    /// The typed parquet column writer performing the encoding
    writer: ArrowColumnWriterImpl,
    /// Page buffer shared with this writer's `ArrowPageWriter`
    chunk: SharedColumnChunk,
}

// Opaque Debug output: the inner writer and buffer are not shown
impl std::fmt::Debug for ArrowColumnWriter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
    }
}
1092
/// The concrete column writer backing an [`ArrowColumnWriter`].
enum ArrowColumnWriterImpl {
    /// Used for binary, string and view types, and for dictionaries of binary,
    /// string or fixed-size-binary values
    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
    /// Generic parquet column writer used for all other leaf types
    Column(ColumnWriter<'static>),
}
1097
1098impl ArrowColumnWriter {
1099 pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
1101 self.write_internal(&col.0)
1102 }
1103
1104 fn write_with_chunker(
1106 &mut self,
1107 col: &ArrowLeafColumn,
1108 chunker: &mut ContentDefinedChunker,
1109 ) -> Result<()> {
1110 let levels = &col.0;
1111 let chunks = chunker.get_arrow_chunks(
1112 levels.def_level_data().as_ref(),
1113 levels.rep_level_data().as_ref(),
1114 levels.array(),
1115 )?;
1116
1117 let num_chunks = chunks.len();
1118 for (i, chunk) in chunks.iter().enumerate() {
1119 let chunk_levels = levels.slice_for_chunk(chunk);
1120 self.write_internal(&chunk_levels)?;
1121
1122 if i + 1 < num_chunks {
1124 match &mut self.writer {
1125 ArrowColumnWriterImpl::Column(c) => c.add_data_page()?,
1126 ArrowColumnWriterImpl::ByteArray(c) => c.add_data_page()?,
1127 }
1128 }
1129 }
1130 Ok(())
1131 }
1132
1133 fn write_internal(&mut self, levels: &ArrayLevels) -> Result<()> {
1134 match &mut self.writer {
1135 ArrowColumnWriterImpl::Column(c) => {
1136 let leaf = levels.array();
1137 match leaf.as_any_dictionary_opt() {
1138 Some(dictionary) => {
1139 let materialized =
1140 arrow_select::take::take(dictionary.values(), dictionary.keys(), None)?;
1141 write_leaf(c, &materialized, levels)?
1142 }
1143 None => write_leaf(c, leaf, levels)?,
1144 };
1145 }
1146 ArrowColumnWriterImpl::ByteArray(c) => {
1147 write_primitive(c, levels.array().as_ref(), levels)?;
1148 }
1149 }
1150 Ok(())
1151 }
1152
1153 pub fn close(self) -> Result<ArrowColumnChunk> {
1155 let close = match self.writer {
1156 ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
1157 ArrowColumnWriterImpl::Column(c) => c.close()?,
1158 };
1159 let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
1160 let data = chunk.into_inner().unwrap();
1161 Ok(ArrowColumnChunk { data, close })
1162 }
1163
1164 pub fn memory_size(&self) -> usize {
1175 match &self.writer {
1176 ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
1177 ArrowColumnWriterImpl::Column(c) => c.memory_size(),
1178 }
1179 }
1180
1181 pub fn get_estimated_total_bytes(&self) -> usize {
1189 match &self.writer {
1190 ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
1191 ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
1192 }
1193 }
1194}
1195
/// Buffers the column writers of a single in-progress row group.
#[derive(Debug)]
struct ArrowRowGroupWriter {
    /// One writer per parquet leaf column, in schema order
    writers: Vec<ArrowColumnWriter>,
    /// Arrow schema of the batches written to this row group
    schema: SchemaRef,
    /// Number of rows written to this row group so far
    buffered_rows: usize,
}
1208
1209impl ArrowRowGroupWriter {
1210 fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
1211 Self {
1212 writers,
1213 schema: arrow.clone(),
1214 buffered_rows: 0,
1215 }
1216 }
1217
1218 fn write(&mut self, batch: &RecordBatch) -> Result<()> {
1219 self.buffered_rows += batch.num_rows();
1220 let mut writers = self.writers.iter_mut();
1221 for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
1222 for leaf in compute_leaves(field.as_ref(), column)? {
1223 writers.next().unwrap().write(&leaf)?;
1224 }
1225 }
1226 Ok(())
1227 }
1228
1229 fn write_with_chunkers(
1230 &mut self,
1231 batch: &RecordBatch,
1232 chunkers: &mut [ContentDefinedChunker],
1233 ) -> Result<()> {
1234 self.buffered_rows += batch.num_rows();
1235 let mut writers = self.writers.iter_mut();
1236 let mut chunkers = chunkers.iter_mut();
1237 for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
1238 for leaf in compute_leaves(field.as_ref(), column)? {
1239 writers
1240 .next()
1241 .unwrap()
1242 .write_with_chunker(&leaf, chunkers.next().unwrap())?;
1243 }
1244 }
1245 Ok(())
1246 }
1247
1248 fn get_estimated_total_bytes(&self) -> usize {
1250 self.writers
1251 .iter()
1252 .map(|x| x.get_estimated_total_bytes())
1253 .sum()
1254 }
1255
1256 fn close(self) -> Result<Vec<ArrowColumnChunk>> {
1257 self.writers
1258 .into_iter()
1259 .map(|writer| writer.close())
1260 .collect()
1261 }
1262}
1263
/// Creates the [`ArrowColumnWriter`]s for each row group of a parquet file.
///
/// Obtained from [`ArrowRowGroupWriterFactory::new`] or
/// [`ArrowWriter::into_serialized_writer`].
#[derive(Debug)]
pub struct ArrowRowGroupWriterFactory {
    /// Parquet schema of the file
    schema: SchemaDescPtr,
    /// Arrow schema of the batches being written
    arrow_schema: SchemaRef,
    /// Writer properties shared with the file writer
    props: WriterPropertiesPtr,
    /// Creates the store used to buffer each column's pages
    page_store_factory: Arc<dyn PageStoreFactory>,
    /// File encryptor taken from the file writer, if any
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
1277
1278impl ArrowRowGroupWriterFactory {
1279 pub fn new<W: Write + Send>(
1281 file_writer: &SerializedFileWriter<W>,
1282 arrow_schema: SchemaRef,
1283 ) -> Self {
1284 let schema = Arc::clone(file_writer.schema_descr_ptr());
1285 let props = Arc::clone(file_writer.properties());
1286 Self {
1287 schema,
1288 arrow_schema,
1289 props,
1290 page_store_factory: Arc::new(InMemoryPageStoreFactory),
1291 #[cfg(feature = "encryption")]
1292 file_encryptor: file_writer.file_encryptor(),
1293 }
1294 }
1295
1296 pub fn with_page_store_factory(
1300 mut self,
1301 page_store_factory: Arc<dyn PageStoreFactory>,
1302 ) -> Self {
1303 self.page_store_factory = page_store_factory;
1304 self
1305 }
1306
1307 fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
1308 let writers = self.create_column_writers(row_group_index)?;
1309 Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
1310 }
1311
1312 pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
1314 let mut writers = Vec::with_capacity(self.arrow_schema.fields.len());
1315 let mut leaves = self.schema.columns().iter();
1316 let column_factory = self.column_writer_factory(row_group_index);
1317 for field in &self.arrow_schema.fields {
1318 column_factory.get_arrow_column_writer(
1319 field.data_type(),
1320 &self.props,
1321 &mut leaves,
1322 &mut writers,
1323 )?;
1324 }
1325 Ok(writers)
1326 }
1327
1328 #[cfg(feature = "encryption")]
1329 fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
1330 ArrowColumnWriterFactory::new()
1331 .with_page_store_factory(self.page_store_factory.clone())
1332 .with_file_encryptor(row_group_idx, self.file_encryptor.clone())
1333 }
1334
1335 #[cfg(not(feature = "encryption"))]
1336 fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory {
1337 ArrowColumnWriterFactory::new().with_page_store_factory(self.page_store_factory.clone())
1338 }
1339}
1340
1341#[deprecated(since = "57.0.0", note = "Use `ArrowRowGroupWriterFactory` instead")]
1343pub fn get_column_writers(
1344 parquet: &SchemaDescriptor,
1345 props: &WriterPropertiesPtr,
1346 arrow: &SchemaRef,
1347) -> Result<Vec<ArrowColumnWriter>> {
1348 let mut writers = Vec::with_capacity(arrow.fields.len());
1349 let mut leaves = parquet.columns().iter();
1350 let column_factory = ArrowColumnWriterFactory::new();
1351 for field in &arrow.fields {
1352 column_factory.get_arrow_column_writer(
1353 field.data_type(),
1354 props,
1355 &mut leaves,
1356 &mut writers,
1357 )?;
1358 }
1359 Ok(writers)
1360}
1361
/// Builds the [`ArrowColumnWriter`]s for the leaf columns of one row group.
struct ArrowColumnWriterFactory {
    /// Creates the store backing each column's buffered pages
    page_store_factory: Arc<dyn PageStoreFactory>,
    /// Index of the row group being written, passed to page encryptor creation
    #[cfg(feature = "encryption")]
    row_group_index: usize,
    /// File encryptor, if the file is encrypted
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
1371
1372impl ArrowColumnWriterFactory {
1373 pub fn new() -> Self {
1374 Self {
1375 page_store_factory: Arc::new(InMemoryPageStoreFactory),
1376 #[cfg(feature = "encryption")]
1377 row_group_index: 0,
1378 #[cfg(feature = "encryption")]
1379 file_encryptor: None,
1380 }
1381 }
1382
1383 pub fn with_page_store_factory(
1385 mut self,
1386 page_store_factory: Arc<dyn PageStoreFactory>,
1387 ) -> Self {
1388 self.page_store_factory = page_store_factory;
1389 self
1390 }
1391
1392 #[cfg(feature = "encryption")]
1393 pub fn with_file_encryptor(
1394 mut self,
1395 row_group_index: usize,
1396 file_encryptor: Option<Arc<FileEncryptor>>,
1397 ) -> Self {
1398 self.row_group_index = row_group_index;
1399 self.file_encryptor = file_encryptor;
1400 self
1401 }
1402
1403 #[cfg(feature = "encryption")]
1404 fn create_page_writer(
1405 &self,
1406 column_descriptor: &ColumnDescPtr,
1407 column_index: usize,
1408 ) -> Result<Box<ArrowPageWriter>> {
1409 let column_path = column_descriptor.path().string();
1410 let page_encryptor = PageEncryptor::create_if_column_encrypted(
1411 &self.file_encryptor,
1412 self.row_group_index,
1413 column_index,
1414 &column_path,
1415 )?;
1416 let args = PageStoreArgs::new(column_index, column_descriptor);
1417 let store = self.page_store_factory.create(&args)?;
1418 Ok(Box::new(
1419 ArrowPageWriter::new(store).with_encryptor(page_encryptor),
1420 ))
1421 }
1422
1423 #[cfg(not(feature = "encryption"))]
1424 fn create_page_writer(
1425 &self,
1426 column_descriptor: &ColumnDescPtr,
1427 column_index: usize,
1428 ) -> Result<Box<ArrowPageWriter>> {
1429 let args = PageStoreArgs::new(column_index, column_descriptor);
1430 let store = self.page_store_factory.create(&args)?;
1431 Ok(Box::new(ArrowPageWriter::new(store)))
1432 }
1433
    /// Appends to `out` one [`ArrowColumnWriter`] for each parquet leaf column
    /// of `data_type`, consuming the matching descriptors from `leaves`.
    ///
    /// List, struct and map types recurse into their children. Binary, string
    /// and view types, and dictionaries of binary, string or fixed-size-binary
    /// values, use the `ByteArrayEncoder`-based writer. All other supported
    /// leaves use a generic [`ColumnWriter`].
    ///
    /// # Errors
    ///
    /// Returns [`ParquetError::NYI`] for unsupported arrow types, and
    /// propagates errors from creating a column's page writer.
    ///
    /// # Panics
    ///
    /// Panics if `leaves` is exhausted before all leaf columns are matched, or
    /// if a map's entries type is not a struct with at least two fields.
    fn get_arrow_column_writer(
        &self,
        data_type: &ArrowDataType,
        props: &WriterPropertiesPtr,
        leaves: &mut Iter<'_, ColumnDescPtr>,
        out: &mut Vec<ArrowColumnWriter>,
    ) -> Result<()> {
        // Generic-writer constructor; `out.len()` is the index of the column
        // being created
        let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
            let page_writer = self.create_page_writer(desc, out.len())?;
            // Keep a handle to the page buffer so `close` can reclaim it
            let chunk = page_writer.buffer.clone();
            let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
            Ok(ArrowColumnWriter {
                chunk,
                writer: ArrowColumnWriterImpl::Column(writer),
            })
        };

        // Byte-array writer constructor, mirroring `col`
        let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
            let page_writer = self.create_page_writer(desc, out.len())?;
            let chunk = page_writer.buffer.clone();
            let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
            Ok(ArrowColumnWriter {
                chunk,
                writer: ArrowColumnWriterImpl::ByteArray(writer),
            })
        };

        match data_type {
            _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
                out.push(col(leaves.next().unwrap())?)
            }
            ArrowDataType::LargeBinary
            | ArrowDataType::Binary
            | ArrowDataType::Utf8
            | ArrowDataType::LargeUtf8
            | ArrowDataType::BinaryView
            | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
            // List-like types contribute the leaves of their element type
            ArrowDataType::List(f)
            | ArrowDataType::LargeList(f)
            | ArrowDataType::FixedSizeList(f, _)
            | ArrowDataType::ListView(f)
            | ArrowDataType::LargeListView(f) => {
                self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
            }
            ArrowDataType::Struct(fields) => {
                for field in fields {
                    self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
                }
            }
            // Maps contribute the leaves of the key field, then the value field
            ArrowDataType::Map(f, _) => match f.data_type() {
                ArrowDataType::Struct(f) => {
                    self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
                    self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
                }
                _ => unreachable!("invalid map type"),
            },
            // Dictionaries are written according to their value type
            ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
                ArrowDataType::Utf8
                | ArrowDataType::LargeUtf8
                | ArrowDataType::Binary
                | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
                    out.push(bytes(leaves.next().unwrap())?)
                }
                ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
                _ => out.push(col(leaves.next().unwrap())?),
            },
            _ => {
                return Err(ParquetError::NYI(format!(
                    "Attempting to write an Arrow type {data_type} to parquet that is not yet implemented"
                )));
            }
        }
        Ok(())
    }
1514}
1515
1516fn write_leaf(
1517 writer: &mut ColumnWriter<'_>,
1518 column: &dyn arrow_array::Array,
1519 levels: &ArrayLevels,
1520) -> Result<usize> {
1521 let indices = levels.non_null_indices();
1522
1523 match writer {
1524 ColumnWriter::Int32ColumnWriter(typed) => {
1526 match column.data_type() {
1527 ArrowDataType::Null => {
1528 let array = Int32Array::new_null(column.len());
1529 write_primitive(typed, array.values(), levels)
1530 }
1531 ArrowDataType::Int8 => {
1532 let array: Int32Array = column.as_primitive::<Int8Type>().unary(|x| x as i32);
1533 write_primitive(typed, array.values(), levels)
1534 }
1535 ArrowDataType::Int16 => {
1536 let array: Int32Array = column.as_primitive::<Int16Type>().unary(|x| x as i32);
1537 write_primitive(typed, array.values(), levels)
1538 }
1539 ArrowDataType::Int32 => {
1540 write_primitive(typed, column.as_primitive::<Int32Type>().values(), levels)
1541 }
1542 ArrowDataType::UInt8 => {
1543 let array: Int32Array = column.as_primitive::<UInt8Type>().unary(|x| x as i32);
1544 write_primitive(typed, array.values(), levels)
1545 }
1546 ArrowDataType::UInt16 => {
1547 let array: Int32Array = column.as_primitive::<UInt16Type>().unary(|x| x as i32);
1548 write_primitive(typed, array.values(), levels)
1549 }
1550 ArrowDataType::UInt32 => {
1551 let array = column.as_primitive::<UInt32Type>();
1554 write_primitive(typed, array.values().inner().typed_data(), levels)
1555 }
1556 ArrowDataType::Date32 => {
1557 let array = column.as_primitive::<Date32Type>();
1558 write_primitive(typed, array.values(), levels)
1559 }
1560 ArrowDataType::Time32(TimeUnit::Second) => {
1561 let array = column.as_primitive::<Time32SecondType>();
1562 write_primitive(typed, array.values(), levels)
1563 }
1564 ArrowDataType::Time32(TimeUnit::Millisecond) => {
1565 let array = column.as_primitive::<Time32MillisecondType>();
1566 write_primitive(typed, array.values(), levels)
1567 }
1568 ArrowDataType::Date64 => {
1569 let array: Int32Array = column
1571 .as_primitive::<Date64Type>()
1572 .unary(|x| (x / 86_400_000) as _);
1573
1574 write_primitive(typed, array.values(), levels)
1575 }
1576 ArrowDataType::Decimal32(_, _) => {
1577 let array = column
1578 .as_primitive::<Decimal32Type>()
1579 .unary::<_, Int32Type>(|v| v);
1580 write_primitive(typed, array.values(), levels)
1581 }
1582 ArrowDataType::Decimal64(_, _) => {
1583 let array = column
1585 .as_primitive::<Decimal64Type>()
1586 .unary::<_, Int32Type>(|v| v as i32);
1587 write_primitive(typed, array.values(), levels)
1588 }
1589 ArrowDataType::Decimal128(_, _) => {
1590 let array = column
1592 .as_primitive::<Decimal128Type>()
1593 .unary::<_, Int32Type>(|v| v as i32);
1594 write_primitive(typed, array.values(), levels)
1595 }
1596 ArrowDataType::Decimal256(_, _) => {
1597 let array = column
1599 .as_primitive::<Decimal256Type>()
1600 .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1601 write_primitive(typed, array.values(), levels)
1602 }
1603 d => Err(ParquetError::General(format!("Cannot coerce {d} to I32"))),
1604 }
1605 }
1606 ColumnWriter::BoolColumnWriter(typed) => {
1607 let array = column.as_boolean();
1608 let values = get_bool_array_slice(array, indices.iter().copied());
1609 typed.write_batch_internal(
1610 values.as_slice(),
1611 None,
1612 levels.def_level_data().as_ref(),
1613 levels.rep_level_data().as_ref(),
1614 None,
1615 None,
1616 None,
1617 )
1618 }
1619 ColumnWriter::Int64ColumnWriter(typed) => {
1620 match column.data_type() {
1621 ArrowDataType::Date64 => {
1622 let array = column
1623 .as_primitive::<Date64Type>()
1624 .reinterpret_cast::<Int64Type>();
1625
1626 write_primitive(typed, array.values(), levels)
1627 }
1628 ArrowDataType::Int64 => {
1629 let array = column.as_primitive::<Int64Type>();
1630 write_primitive(typed, array.values(), levels)
1631 }
1632 ArrowDataType::UInt64 => {
1633 let values = column.as_primitive::<UInt64Type>().values();
1634 let array = values.inner().typed_data::<i64>();
1637 write_primitive(typed, array, levels)
1638 }
1639 ArrowDataType::Time64(TimeUnit::Microsecond) => {
1640 let array = column.as_primitive::<Time64MicrosecondType>();
1641 write_primitive(typed, array.values(), levels)
1642 }
1643 ArrowDataType::Time64(TimeUnit::Nanosecond) => {
1644 let array = column.as_primitive::<Time64NanosecondType>();
1645 write_primitive(typed, array.values(), levels)
1646 }
1647 ArrowDataType::Timestamp(unit, _) => match unit {
1648 TimeUnit::Second => {
1649 let array = column.as_primitive::<TimestampSecondType>();
1650 write_primitive(typed, array.values(), levels)
1651 }
1652 TimeUnit::Millisecond => {
1653 let array = column.as_primitive::<TimestampMillisecondType>();
1654 write_primitive(typed, array.values(), levels)
1655 }
1656 TimeUnit::Microsecond => {
1657 let array = column.as_primitive::<TimestampMicrosecondType>();
1658 write_primitive(typed, array.values(), levels)
1659 }
1660 TimeUnit::Nanosecond => {
1661 let array = column.as_primitive::<TimestampNanosecondType>();
1662 write_primitive(typed, array.values(), levels)
1663 }
1664 },
1665 ArrowDataType::Duration(unit) => match unit {
1666 TimeUnit::Second => {
1667 let array = column.as_primitive::<DurationSecondType>();
1668 write_primitive(typed, array.values(), levels)
1669 }
1670 TimeUnit::Millisecond => {
1671 let array = column.as_primitive::<DurationMillisecondType>();
1672 write_primitive(typed, array.values(), levels)
1673 }
1674 TimeUnit::Microsecond => {
1675 let array = column.as_primitive::<DurationMicrosecondType>();
1676 write_primitive(typed, array.values(), levels)
1677 }
1678 TimeUnit::Nanosecond => {
1679 let array = column.as_primitive::<DurationNanosecondType>();
1680 write_primitive(typed, array.values(), levels)
1681 }
1682 },
1683 ArrowDataType::Decimal64(_, _) => {
1684 let array = column
1685 .as_primitive::<Decimal64Type>()
1686 .reinterpret_cast::<Int64Type>();
1687 write_primitive(typed, array.values(), levels)
1688 }
1689 ArrowDataType::Decimal128(_, _) => {
1690 let array = column
1692 .as_primitive::<Decimal128Type>()
1693 .unary::<_, Int64Type>(|v| v as i64);
1694 write_primitive(typed, array.values(), levels)
1695 }
1696 ArrowDataType::Decimal256(_, _) => {
1697 let array = column
1699 .as_primitive::<Decimal256Type>()
1700 .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1701 write_primitive(typed, array.values(), levels)
1702 }
1703 d => Err(ParquetError::General(format!("Cannot coerce {d} to I64"))),
1704 }
1705 }
1706 ColumnWriter::Int96ColumnWriter(_typed) => {
1707 unreachable!("Currently unreachable because data type not supported")
1708 }
1709 ColumnWriter::FloatColumnWriter(typed) => {
1710 let array = column.as_primitive::<Float32Type>();
1711 write_primitive(typed, array.values(), levels)
1712 }
1713 ColumnWriter::DoubleColumnWriter(typed) => {
1714 let array = column.as_primitive::<Float64Type>();
1715 write_primitive(typed, array.values(), levels)
1716 }
1717 ColumnWriter::ByteArrayColumnWriter(_) => {
1718 unreachable!("should use ByteArrayWriter")
1719 }
1720 ColumnWriter::FixedLenByteArrayColumnWriter(typed) => {
1721 let bytes = match column.data_type() {
1722 ArrowDataType::Interval(interval_unit) => match interval_unit {
1723 IntervalUnit::YearMonth => {
1724 let array = column.as_primitive::<IntervalYearMonthType>();
1725 get_interval_ym_array_slice(array, indices.iter().copied())
1726 }
1727 IntervalUnit::DayTime => {
1728 let array = column.as_primitive::<IntervalDayTimeType>();
1729 get_interval_dt_array_slice(array, indices.iter().copied())
1730 }
1731 _ => {
1732 return Err(ParquetError::NYI(format!(
1733 "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
1734 )));
1735 }
1736 },
1737 ArrowDataType::FixedSizeBinary(_) => {
1738 let array = column.as_fixed_size_binary();
1739 get_fsb_array_slice(array, indices.iter().copied())
1740 }
1741 ArrowDataType::Decimal32(_, _) => {
1742 let array = column.as_primitive::<Decimal32Type>();
1743 get_decimal_32_array_slice(array, indices.iter().copied())
1744 }
1745 ArrowDataType::Decimal64(_, _) => {
1746 let array = column.as_primitive::<Decimal64Type>();
1747 get_decimal_64_array_slice(array, indices.iter().copied())
1748 }
1749 ArrowDataType::Decimal128(_, _) => {
1750 let array = column.as_primitive::<Decimal128Type>();
1751 get_decimal_128_array_slice(array, indices.iter().copied())
1752 }
1753 ArrowDataType::Decimal256(_, _) => {
1754 let array = column.as_primitive::<Decimal256Type>();
1755 get_decimal_256_array_slice(array, indices.iter().copied())
1756 }
1757 ArrowDataType::Float16 => {
1758 let array = column.as_primitive::<Float16Type>();
1759 get_float_16_array_slice(array, indices.iter().copied())
1760 }
1761 _ => {
1762 return Err(ParquetError::NYI(
1763 "Attempting to write an Arrow type that is not yet implemented".to_string(),
1764 ));
1765 }
1766 };
1767 typed.write_batch_internal(
1768 bytes.as_slice(),
1769 None,
1770 levels.def_level_data().as_ref(),
1771 levels.rep_level_data().as_ref(),
1772 None,
1773 None,
1774 None,
1775 )
1776 }
1777 }
1778}
1779
1780fn write_primitive<E: ColumnValueEncoder>(
1781 writer: &mut GenericColumnWriter<E>,
1782 values: &E::Values,
1783 levels: &ArrayLevels,
1784) -> Result<usize> {
1785 writer.write_batch_internal(
1786 values,
1787 Some(levels.non_null_indices()),
1788 levels.def_level_data().as_ref(),
1789 levels.rep_level_data().as_ref(),
1790 None,
1791 None,
1792 None,
1793 )
1794}
1795
1796fn get_bool_array_slice(
1797 array: &arrow_array::BooleanArray,
1798 indices: impl ExactSizeIterator<Item = usize>,
1799) -> Vec<bool> {
1800 let mut values = Vec::with_capacity(indices.len());
1801 for i in indices {
1802 values.push(array.value(i))
1803 }
1804 values
1805}
1806
1807fn get_interval_ym_array_slice(
1810 array: &arrow_array::IntervalYearMonthArray,
1811 indices: impl ExactSizeIterator<Item = usize>,
1812) -> Vec<FixedLenByteArray> {
1813 let mut values = Vec::with_capacity(indices.len());
1814 for i in indices {
1815 let mut value = array.value(i).to_le_bytes().to_vec();
1816 let mut suffix = vec![0; 8];
1817 value.append(&mut suffix);
1818 values.push(FixedLenByteArray::from(ByteArray::from(value)))
1819 }
1820 values
1821}
1822
1823fn get_interval_dt_array_slice(
1826 array: &arrow_array::IntervalDayTimeArray,
1827 indices: impl ExactSizeIterator<Item = usize>,
1828) -> Vec<FixedLenByteArray> {
1829 let mut values = Vec::with_capacity(indices.len());
1830 for i in indices {
1831 let mut out = [0; 12];
1832 let value = array.value(i);
1833 out[4..8].copy_from_slice(&value.days.to_le_bytes());
1834 out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1835 values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1836 }
1837 values
1838}
1839
1840fn get_decimal_32_array_slice(
1841 array: &arrow_array::Decimal32Array,
1842 indices: impl ExactSizeIterator<Item = usize>,
1843) -> Vec<FixedLenByteArray> {
1844 let mut values = Vec::with_capacity(indices.len());
1845 let size = decimal_length_from_precision(array.precision());
1846 for i in indices {
1847 let as_be_bytes = array.value(i).to_be_bytes();
1848 let resized_value = as_be_bytes[(4 - size)..].to_vec();
1849 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1850 }
1851 values
1852}
1853
1854fn get_decimal_64_array_slice(
1855 array: &arrow_array::Decimal64Array,
1856 indices: impl ExactSizeIterator<Item = usize>,
1857) -> Vec<FixedLenByteArray> {
1858 let mut values = Vec::with_capacity(indices.len());
1859 let size = decimal_length_from_precision(array.precision());
1860 for i in indices {
1861 let as_be_bytes = array.value(i).to_be_bytes();
1862 let resized_value = as_be_bytes[(8 - size)..].to_vec();
1863 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1864 }
1865 values
1866}
1867
1868fn get_decimal_128_array_slice(
1869 array: &arrow_array::Decimal128Array,
1870 indices: impl ExactSizeIterator<Item = usize>,
1871) -> Vec<FixedLenByteArray> {
1872 let mut values = Vec::with_capacity(indices.len());
1873 let size = decimal_length_from_precision(array.precision());
1874 for i in indices {
1875 let as_be_bytes = array.value(i).to_be_bytes();
1876 let resized_value = as_be_bytes[(16 - size)..].to_vec();
1877 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1878 }
1879 values
1880}
1881
1882fn get_decimal_256_array_slice(
1883 array: &arrow_array::Decimal256Array,
1884 indices: impl ExactSizeIterator<Item = usize>,
1885) -> Vec<FixedLenByteArray> {
1886 let mut values = Vec::with_capacity(indices.len());
1887 let size = decimal_length_from_precision(array.precision());
1888 for i in indices {
1889 let as_be_bytes = array.value(i).to_be_bytes();
1890 let resized_value = as_be_bytes[(32 - size)..].to_vec();
1891 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1892 }
1893 values
1894}
1895
1896fn get_float_16_array_slice(
1897 array: &arrow_array::Float16Array,
1898 indices: impl ExactSizeIterator<Item = usize>,
1899) -> Vec<FixedLenByteArray> {
1900 let mut values = Vec::with_capacity(indices.len());
1901 for i in indices {
1902 let value = array.value(i).to_le_bytes().to_vec();
1903 values.push(FixedLenByteArray::from(ByteArray::from(value)));
1904 }
1905 values
1906}
1907
1908fn get_fsb_array_slice(
1909 array: &arrow_array::FixedSizeBinaryArray,
1910 indices: impl ExactSizeIterator<Item = usize>,
1911) -> Vec<FixedLenByteArray> {
1912 let mut values = Vec::with_capacity(indices.len());
1913 for i in indices {
1914 let value = array.value(i).to_vec();
1915 values.push(FixedLenByteArray::from(ByteArray::from(value)))
1916 }
1917 values
1918}
1919
1920#[cfg(test)]
1921mod tests {
1922 use super::*;
1923 use std::collections::HashMap;
1924
1925 use std::fs::File;
1926
1927 use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1928 use crate::arrow::{ARROW_SCHEMA_META_KEY, PARQUET_FIELD_ID_META_KEY};
1929 use crate::column::page::{Page, PageReader};
1930 use crate::file::metadata::thrift::PageHeader;
1931 use crate::file::page_index::column_index::ColumnIndexMetaData;
1932 use crate::file::reader::SerializedPageReader;
1933 use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
1934 use crate::schema::types::ColumnPath;
1935 use arrow::datatypes::ToByteSlice;
1936 use arrow::datatypes::{DataType, Schema};
1937 use arrow::error::Result as ArrowResult;
1938 use arrow::util::data_gen::create_random_array;
1939 use arrow::util::pretty::pretty_format_batches;
1940 use arrow::{array::*, buffer::Buffer};
1941 use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer, OffsetBuffer, i256};
1942 use arrow_schema::Fields;
1943 use half::f16;
1944 use num_traits::{FromPrimitive, ToPrimitive};
1945 use tempfile::tempfile;
1946
1947 use crate::basic::Encoding;
1948 use crate::data_type::AsBytes;
1949 use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
1950 use crate::file::properties::{
1951 BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1952 };
1953 use crate::file::serialized_reader::ReadOptionsBuilder;
1954 use crate::file::{
1955 reader::{FileReader, SerializedFileReader},
1956 statistics::Statistics,
1957 };
1958
1959 #[derive(Debug, Default)]
1964 struct RecordingPageStore {
1965 next: u64,
1966 blobs: HashMap<u64, Bytes>,
1967 puts: Arc<std::sync::atomic::AtomicUsize>,
1968 }
1969
1970 impl PageStore for RecordingPageStore {
1971 fn put(&mut self, value: Bytes) -> Result<PageKey> {
1972 let id = 100 + self.next * 7;
1974 self.next += 1;
1975 self.puts.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1976 self.blobs.insert(id, value);
1977 Ok(PageKey::new(id))
1978 }
1979
1980 fn take(&mut self, key: PageKey) -> Result<Bytes> {
1981 self.blobs
1982 .remove(&key.get())
1983 .ok_or_else(|| ParquetError::General(format!("missing key {}", key.get())))
1984 }
1985 }
1986
1987 #[derive(Debug)]
1988 struct RecordingPageStoreFactory {
1989 puts: Arc<std::sync::atomic::AtomicUsize>,
1990 }
1991
1992 impl PageStoreFactory for RecordingPageStoreFactory {
1993 fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
1994 Ok(Box::new(RecordingPageStore {
1995 puts: self.puts.clone(),
1996 ..Default::default()
1997 }))
1998 }
1999 }
2000
2001 #[test]
2005 fn custom_page_store_is_byte_identical_to_default() {
2006 let schema = Arc::new(Schema::new(vec![
2007 Field::new("i", DataType::Int32, true),
2008 Field::new("s", DataType::Utf8, true),
2010 ]));
2011 let i = Int32Array::from(vec![Some(1), None, Some(3), Some(4), Some(5), Some(6)]);
2012 let s = StringArray::from(vec![
2013 Some("a"),
2014 Some("bb"),
2015 Some("a"),
2016 None,
2017 Some("bb"),
2018 Some("ccc"),
2019 ]);
2020 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(i), Arc::new(s)]).unwrap();
2021
2022 let props = WriterProperties::builder()
2025 .set_max_row_group_row_count(Some(3))
2026 .build();
2027
2028 let write = |factory: Option<Arc<dyn PageStoreFactory>>| {
2029 let mut buffer = Vec::new();
2030 let mut opts = ArrowWriterOptions::new().with_properties(props.clone());
2031 if let Some(factory) = factory {
2032 opts = opts.with_page_store_factory(factory);
2033 }
2034 let mut writer =
2035 ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
2036 writer.write(&batch).unwrap();
2037 writer.close().unwrap();
2038 buffer
2039 };
2040
2041 let default_bytes = write(None);
2042
2043 let puts = Arc::new(std::sync::atomic::AtomicUsize::new(0));
2044 let custom_bytes = write(Some(Arc::new(RecordingPageStoreFactory {
2045 puts: puts.clone(),
2046 })));
2047
2048 assert!(
2049 puts.load(std::sync::atomic::Ordering::Relaxed) > 0,
2050 "custom PageStore was never written to"
2051 );
2052 assert_eq!(
2053 default_bytes, custom_bytes,
2054 "a custom PageStore must produce byte-identical output to the default"
2055 );
2056 }
2057
2058 #[test]
2064 fn dictionary_column_round_trips_with_offset_index_disabled() {
2065 let schema = Arc::new(Schema::new(vec![Field::new("k", DataType::Int32, true)]));
2066
2067 let values: Vec<Option<i32>> = (0..50_000).map(|i| Some(i % 8)).collect();
2070 let array = Int32Array::from(values.clone());
2071 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
2072
2073 let props = WriterProperties::builder()
2074 .set_offset_index_disabled(true)
2075 .set_data_page_row_count_limit(4096)
2076 .build();
2077 let opts = ArrowWriterOptions::new().with_properties(props);
2078
2079 let mut buffer = Vec::new();
2080 let mut writer =
2081 ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
2082 writer.write(&batch).unwrap();
2083 writer.close().unwrap();
2084
2085 let reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), values.len()).unwrap();
2086 let read: Vec<RecordBatch> = reader.collect::<ArrowResult<_>>().unwrap();
2087 let read_values: Vec<Option<i32>> = read
2088 .iter()
2089 .flat_map(|b| b.column(0).as_primitive::<Int32Type>().iter())
2090 .collect();
2091 assert_eq!(read_values, values);
2092 }
2093
2094 #[test]
2095 fn arrow_writer() {
2096 let schema = Schema::new(vec![
2098 Field::new("a", DataType::Int32, false),
2099 Field::new("b", DataType::Int32, true),
2100 ]);
2101
2102 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2104 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2105
2106 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
2108
2109 roundtrip(batch, Some(SMALL_SIZE / 2));
2110 }
2111
2112 fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
2113 let mut buffer = vec![];
2114
2115 let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
2116 writer.write(expected_batch).unwrap();
2117 writer.close().unwrap();
2118
2119 buffer
2120 }
2121
2122 fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
2123 let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
2124 writer.write(expected_batch).unwrap();
2125 writer.into_inner().unwrap()
2126 }
2127
2128 #[test]
2129 fn roundtrip_bytes() {
2130 let schema = Arc::new(Schema::new(vec![
2132 Field::new("a", DataType::Int32, false),
2133 Field::new("b", DataType::Int32, true),
2134 ]));
2135
2136 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2138 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2139
2140 let expected_batch =
2142 RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
2143
2144 for buffer in [
2145 get_bytes_after_close(schema.clone(), &expected_batch),
2146 get_bytes_by_into_inner(schema, &expected_batch),
2147 ] {
2148 let cursor = Bytes::from(buffer);
2149 let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
2150
2151 let actual_batch = record_batch_reader
2152 .next()
2153 .expect("No batch found")
2154 .expect("Unable to get batch");
2155
2156 assert_eq!(expected_batch.schema(), actual_batch.schema());
2157 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2158 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2159 for i in 0..expected_batch.num_columns() {
2160 let expected_data = expected_batch.column(i).to_data();
2161 let actual_data = actual_batch.column(i).to_data();
2162
2163 assert_eq!(expected_data, actual_data);
2164 }
2165 }
2166 }
2167
2168 #[test]
2169 fn arrow_writer_non_null() {
2170 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2172
2173 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2175
2176 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2178
2179 roundtrip(batch, Some(SMALL_SIZE / 2));
2180 }
2181
2182 #[test]
2183 fn arrow_writer_list() {
2184 let schema = Schema::new(vec![Field::new(
2186 "a",
2187 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
2188 true,
2189 )]);
2190
2191 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2193
2194 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2197
2198 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2200 DataType::Int32,
2201 false,
2202 ))))
2203 .len(5)
2204 .add_buffer(a_value_offsets)
2205 .add_child_data(a_values.into_data())
2206 .null_bit_buffer(Some(Buffer::from([0b00011011])))
2207 .build()
2208 .unwrap();
2209 let a = ListArray::from(a_list_data);
2210
2211 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2213
2214 assert_eq!(batch.column(0).null_count(), 1);
2215
2216 roundtrip(batch, None);
2219 }
2220
2221 #[test]
2222 fn arrow_writer_list_non_null() {
2223 let schema = Schema::new(vec![Field::new(
2225 "a",
2226 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
2227 false,
2228 )]);
2229
2230 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2232
2233 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2236
2237 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2239 DataType::Int32,
2240 false,
2241 ))))
2242 .len(5)
2243 .add_buffer(a_value_offsets)
2244 .add_child_data(a_values.into_data())
2245 .build()
2246 .unwrap();
2247 let a = ListArray::from(a_list_data);
2248
2249 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2251
2252 assert_eq!(batch.column(0).null_count(), 0);
2255
2256 roundtrip(batch, None);
2257 }
2258
2259 #[test]
2260 fn arrow_writer_list_view() {
2261 let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2262 let schema = Schema::new(vec![Field::new(
2263 "a",
2264 DataType::ListView(list_field.clone()),
2265 true,
2266 )]);
2267
2268 let a = ListViewArray::new(
2270 list_field,
2271 vec![0, 1, 0, 3, 6].into(),
2272 vec![1, 2, 0, 3, 4].into(),
2273 Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2274 Some(vec![true, true, false, true, true].into()),
2275 );
2276
2277 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2278
2279 assert_eq!(batch.column(0).null_count(), 1);
2280
2281 roundtrip(batch, None);
2282 }
2283
2284 #[test]
2285 fn arrow_writer_list_view_non_null() {
2286 let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2287 let schema = Schema::new(vec![Field::new(
2288 "a",
2289 DataType::ListView(list_field.clone()),
2290 false,
2291 )]);
2292
2293 let a = ListViewArray::new(
2295 list_field,
2296 vec![0, 1, 0, 3, 6].into(),
2297 vec![1, 2, 0, 3, 4].into(),
2298 Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2299 None,
2300 );
2301
2302 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2303
2304 assert_eq!(batch.column(0).null_count(), 0);
2305
2306 roundtrip(batch, None);
2307 }
2308
2309 #[test]
2310 fn arrow_writer_list_view_out_of_order() {
2311 let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2312 let schema = Schema::new(vec![Field::new(
2313 "a",
2314 DataType::ListView(list_field.clone()),
2315 false,
2316 )]);
2317
2318 let a = ListViewArray::new(
2320 list_field,
2321 vec![0, 1, 0, 6, 3].into(),
2322 vec![1, 2, 0, 4, 3].into(),
2323 Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2324 None,
2325 );
2326
2327 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2328
2329 roundtrip(batch, None);
2330 }
2331
2332 #[test]
2333 fn arrow_writer_large_list_view() {
2334 let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2335 let schema = Schema::new(vec![Field::new(
2336 "a",
2337 DataType::LargeListView(list_field.clone()),
2338 true,
2339 )]);
2340
2341 let a = LargeListViewArray::new(
2343 list_field,
2344 vec![0i64, 1, 0, 3, 6].into(),
2345 vec![1i64, 2, 0, 3, 4].into(),
2346 Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2347 Some(vec![true, true, false, true, true].into()),
2348 );
2349
2350 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2351
2352 assert_eq!(batch.column(0).null_count(), 1);
2353
2354 roundtrip(batch, None);
2355 }
2356
2357 #[test]
2358 fn arrow_writer_list_view_with_struct() {
2359 let struct_fields = Fields::from(vec![
2361 Field::new("id", DataType::Int32, false),
2362 Field::new("name", DataType::Utf8, false),
2363 ]);
2364 let struct_type = DataType::Struct(struct_fields.clone());
2365 let list_field = Arc::new(Field::new("item", struct_type.clone(), false));
2366
2367 let schema = Schema::new(vec![Field::new(
2368 "a",
2369 DataType::ListView(list_field.clone()),
2370 true,
2371 )]);
2372
2373 let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
2375 let name_array = StringArray::from(vec!["a", "b", "c", "d", "e"]);
2376 let struct_array = StructArray::new(
2377 struct_fields,
2378 vec![Arc::new(id_array), Arc::new(name_array)],
2379 None,
2380 );
2381
2382 let list_view = ListViewArray::new(
2384 list_field,
2385 vec![0, 2, 2].into(), vec![2, 0, 3].into(), Arc::new(struct_array),
2388 Some(vec![true, false, true].into()),
2389 );
2390
2391 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_view)]).unwrap();
2392
2393 roundtrip(batch, None);
2394 }
2395
2396 #[test]
2397 fn arrow_writer_binary() {
2398 let string_field = Field::new("a", DataType::Utf8, false);
2399 let binary_field = Field::new("b", DataType::Binary, false);
2400 let schema = Schema::new(vec![string_field, binary_field]);
2401
2402 let raw_string_values = vec!["foo", "bar", "baz", "quux"];
2403 let raw_binary_values = [
2404 b"foo".to_vec(),
2405 b"bar".to_vec(),
2406 b"baz".to_vec(),
2407 b"quux".to_vec(),
2408 ];
2409 let raw_binary_value_refs = raw_binary_values
2410 .iter()
2411 .map(|x| x.as_slice())
2412 .collect::<Vec<_>>();
2413
2414 let string_values = StringArray::from(raw_string_values.clone());
2415 let binary_values = BinaryArray::from(raw_binary_value_refs);
2416 let batch = RecordBatch::try_new(
2417 Arc::new(schema),
2418 vec![Arc::new(string_values), Arc::new(binary_values)],
2419 )
2420 .unwrap();
2421
2422 roundtrip(batch, Some(SMALL_SIZE / 2));
2423 }
2424
2425 #[test]
2426 fn arrow_writer_binary_view() {
2427 let string_field = Field::new("a", DataType::Utf8View, false);
2428 let binary_field = Field::new("b", DataType::BinaryView, false);
2429 let nullable_string_field = Field::new("a", DataType::Utf8View, true);
2430 let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
2431
2432 let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
2433 let raw_binary_values = vec![
2434 b"foo".to_vec(),
2435 b"bar".to_vec(),
2436 b"large payload over 12 bytes".to_vec(),
2437 b"lulu".to_vec(),
2438 ];
2439 let nullable_string_values =
2440 vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
2441
2442 let string_view_values = StringViewArray::from(raw_string_values);
2443 let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
2444 let nullable_string_view_values = StringViewArray::from(nullable_string_values);
2445 let batch = RecordBatch::try_new(
2446 Arc::new(schema),
2447 vec![
2448 Arc::new(string_view_values),
2449 Arc::new(binary_view_values),
2450 Arc::new(nullable_string_view_values),
2451 ],
2452 )
2453 .unwrap();
2454
2455 roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
2456 roundtrip(batch, None);
2457 }
2458
2459 #[test]
2460 fn arrow_writer_binary_view_long_value() {
2461 let string_field = Field::new("a", DataType::Utf8View, false);
2462 let binary_field = Field::new("b", DataType::BinaryView, false);
2463 let schema = Schema::new(vec![string_field, binary_field]);
2464
2465 let long = "a".repeat(128);
2469 let raw_string_values = vec!["foo", long.as_str(), "bar"];
2470 let raw_binary_values = vec![b"foo".to_vec(), long.as_bytes().to_vec(), b"bar".to_vec()];
2471
2472 let string_view_values: ArrayRef = Arc::new(StringViewArray::from(raw_string_values));
2473 let binary_view_values: ArrayRef =
2474 Arc::new(BinaryViewArray::from_iter_values(raw_binary_values));
2475
2476 one_column_roundtrip(Arc::clone(&string_view_values), false);
2477 one_column_roundtrip(Arc::clone(&binary_view_values), false);
2478
2479 let batch = RecordBatch::try_new(
2480 Arc::new(schema),
2481 vec![string_view_values, binary_view_values],
2482 )
2483 .unwrap();
2484
2485 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2487 let props = WriterProperties::builder()
2488 .set_writer_version(version)
2489 .set_dictionary_enabled(false)
2490 .build();
2491 roundtrip_opts(&batch, props);
2492 }
2493 }
2494
2495 fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
2496 let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
2497 let schema = Schema::new(vec![decimal_field]);
2498
2499 let decimal_values = vec![10_000, 50_000, 0, -100]
2500 .into_iter()
2501 .map(Some)
2502 .collect::<Decimal128Array>()
2503 .with_precision_and_scale(precision, scale)
2504 .unwrap();
2505
2506 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
2507 }
2508
2509 #[test]
2510 fn arrow_writer_decimal() {
2511 let batch_int32_decimal = get_decimal_batch(5, 2);
2513 roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
2514 let batch_int64_decimal = get_decimal_batch(12, 2);
2516 roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
2517 let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
2519 roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
2520 }
2521
2522 #[test]
2523 fn arrow_writer_complex() {
2524 let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
2526 let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
2527 let struct_field_g = Arc::new(Field::new_list(
2528 "g",
2529 Field::new_list_field(DataType::Int16, true),
2530 false,
2531 ));
2532 let struct_field_h = Arc::new(Field::new_list(
2533 "h",
2534 Field::new_list_field(DataType::Int16, false),
2535 true,
2536 ));
2537 let struct_field_e = Arc::new(Field::new_struct(
2538 "e",
2539 vec![
2540 struct_field_f.clone(),
2541 struct_field_g.clone(),
2542 struct_field_h.clone(),
2543 ],
2544 false,
2545 ));
2546 let schema = Schema::new(vec![
2547 Field::new("a", DataType::Int32, false),
2548 Field::new("b", DataType::Int32, true),
2549 Field::new_struct(
2550 "c",
2551 vec![struct_field_d.clone(), struct_field_e.clone()],
2552 false,
2553 ),
2554 ]);
2555
2556 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2558 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2559 let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
2560 let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
2561
2562 let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2563
2564 let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2567
2568 let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
2570 .len(5)
2571 .add_buffer(g_value_offsets.clone())
2572 .add_child_data(g_value.to_data())
2573 .build()
2574 .unwrap();
2575 let g = ListArray::from(g_list_data);
2576 let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
2578 .len(5)
2579 .add_buffer(g_value_offsets)
2580 .add_child_data(g_value.to_data())
2581 .null_bit_buffer(Some(Buffer::from([0b00011011])))
2582 .build()
2583 .unwrap();
2584 let h = ListArray::from(h_list_data);
2585
2586 let e = StructArray::from(vec![
2587 (struct_field_f, Arc::new(f) as ArrayRef),
2588 (struct_field_g, Arc::new(g) as ArrayRef),
2589 (struct_field_h, Arc::new(h) as ArrayRef),
2590 ]);
2591
2592 let c = StructArray::from(vec![
2593 (struct_field_d, Arc::new(d) as ArrayRef),
2594 (struct_field_e, Arc::new(e) as ArrayRef),
2595 ]);
2596
2597 let batch = RecordBatch::try_new(
2599 Arc::new(schema),
2600 vec![Arc::new(a), Arc::new(b), Arc::new(c)],
2601 )
2602 .unwrap();
2603
2604 roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
2605 roundtrip(batch, Some(SMALL_SIZE / 3));
2606 }
2607
/// Round-trips a required struct column whose children mix a required
/// Int32, a nullable Int64 and a nullable Utf8 field.
#[test]
fn arrow_writer_complex_mixed() {
    let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
    let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
    let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));

    let nested_type = DataType::Struct(Fields::from(vec![
        offset_field.clone(),
        partition_field.clone(),
        topic_field.clone(),
    ]));
    let schema = Schema::new(vec![Field::new("some_nested_object", nested_type, false)]);

    let offsets: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let partitions: ArrayRef =
        Arc::new(Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]));
    let topics: ArrayRef = Arc::new(StringArray::from(vec![
        Some("A"),
        None,
        Some("A"),
        Some(""),
        None,
    ]));

    let some_nested_object = StructArray::from(vec![
        (offset_field, offsets),
        (partition_field, partitions),
        (topic_field, topics),
    ]);

    let batch =
        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
    roundtrip(batch, Some(SMALL_SIZE / 2));
}
2644
/// Round-trips a nullable `Map<Utf8, Utf8>` column decoded from
/// newline-delimited JSON. The input includes a repeated key and null values.
#[test]
fn arrow_writer_map() {
    let json_content = r#"
        {"stocks":{"long": "$AAA", "short": "$BBB"}}
        {"stocks":{"long": null, "long": "$CCC", "short": null}}
        {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
        "#;

    let entries_field = Field::new(
        "entries",
        DataType::Struct(Fields::from(vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, true),
        ])),
        false,
    );
    let stocks_type = DataType::Map(Arc::new(entries_field), false);
    let schema = Arc::new(Schema::new(vec![Field::new("stocks", stocks_type, true)]));

    let mut reader = arrow::json::ReaderBuilder::new(schema)
        .with_batch_size(64)
        .build(std::io::Cursor::new(json_content))
        .unwrap();

    let batch = reader.next().unwrap().unwrap();
    roundtrip(batch, None);
}
2672
/// Nullable struct `a` wrapping nullable struct `b` wrapping nullable Int32 `c`,
/// with nulls present at every nesting level.
#[test]
fn arrow_writer_2_level_struct() {
    let field_c = Field::new("c", DataType::Int32, true);
    let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
    let field_a = Field::new("a", DataType::Struct(vec![field_b.clone()].into()), true);
    let schema = Arc::new(Schema::new(vec![field_a.clone()]));

    let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
    // Validity bits for `b`: rows 3 and 4 are null.
    let b = StructArray::from(
        ArrayDataBuilder::new(field_b.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100111])))
            .add_child_data(c.into_data())
            .build()
            .unwrap(),
    );
    // Validity bits for `a`: only row 4 is null.
    let a = StructArray::from(
        ArrayDataBuilder::new(field_a.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00101111])))
            .add_child_data(b.into_data())
            .build()
            .unwrap(),
    );

    assert_eq!(a.null_count(), 1);
    assert_eq!(a.column(0).null_count(), 2);

    let batch = RecordBatch::try_new(schema, vec![Arc::new(a)]).unwrap();
    roundtrip(batch, Some(SMALL_SIZE / 2));
}
2707
/// Two levels of required structs over a required Int32. No validity buffer
/// is supplied at any level.
#[test]
fn arrow_writer_2_level_struct_non_null() {
    let type_b = DataType::Struct(vec![Field::new("c", DataType::Int32, false)].into());
    let type_a = DataType::Struct(vec![Field::new("b", type_b.clone(), false)].into());
    let schema = Arc::new(Schema::new(vec![Field::new("a", type_a.clone(), false)]));

    let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
    let b = StructArray::from(
        ArrayDataBuilder::new(type_b)
            .len(6)
            .add_child_data(c.into_data())
            .build()
            .unwrap(),
    );
    let a = StructArray::from(
        ArrayDataBuilder::new(type_a)
            .len(6)
            .add_child_data(b.into_data())
            .build()
            .unwrap(),
    );

    assert_eq!(a.null_count(), 0);
    assert_eq!(a.column(0).null_count(), 0);

    let batch = RecordBatch::try_new(schema, vec![Arc::new(a)]).unwrap();
    roundtrip(batch, Some(SMALL_SIZE / 2));
}
2741
/// Required struct `a` over nullable struct `b` over required Int32 `c`.
/// Nulls exist only at the middle level.
#[test]
fn arrow_writer_2_level_struct_mixed_null() {
    let type_b = DataType::Struct(vec![Field::new("c", DataType::Int32, false)].into());
    let type_a = DataType::Struct(vec![Field::new("b", type_b.clone(), true)].into());
    let schema = Arc::new(Schema::new(vec![Field::new("a", type_a.clone(), false)]));

    let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
    // Validity bits for `b`: rows 3 and 4 are null.
    let b = StructArray::from(
        ArrayDataBuilder::new(type_b)
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100111])))
            .add_child_data(c.into_data())
            .build()
            .unwrap(),
    );
    let a = StructArray::from(
        ArrayDataBuilder::new(type_a)
            .len(6)
            .add_child_data(b.into_data())
            .build()
            .unwrap(),
    );

    assert_eq!(a.null_count(), 0);
    assert_eq!(a.column(0).null_count(), 2);

    let batch = RecordBatch::try_new(schema, vec![Arc::new(a)]).unwrap();
    roundtrip(batch, Some(SMALL_SIZE / 2));
}
2777
/// Nullable struct `a` over a required struct `b`. The children of `b` are a
/// required Int32, a FixedSizeBinary(4) and an Int32-keyed string dictionary.
/// Only `a` carries nulls.
#[test]
fn arrow_writer_2_level_struct_mixed_null_2() {
    let b_children = vec![
        Field::new("c", DataType::Int32, false),
        Field::new("d", DataType::FixedSizeBinary(4), false),
        Field::new(
            "e",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            false,
        ),
    ];
    let field_b = Field::new("b", DataType::Struct(b_children.into()), false);
    let field_a = Field::new("a", DataType::Struct(vec![field_b.clone()].into()), true);
    let schema = Arc::new(Schema::new(vec![field_a.clone()]));

    let c = Int32Array::from_iter_values(0..6);
    let d = FixedSizeBinaryArray::try_from_iter(
        ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
    )
    .expect("four byte values");
    let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);

    let b = StructArray::from(
        ArrayDataBuilder::new(field_b.data_type().clone())
            .len(6)
            .add_child_data(c.into_data())
            .add_child_data(d.into_data())
            .add_child_data(e.into_data())
            .build()
            .unwrap(),
    );
    // Validity bits for `a`: only rows 0, 2 and 5 are valid.
    let a = StructArray::from(
        ArrayDataBuilder::new(field_a.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100101])))
            .add_child_data(b.into_data())
            .build()
            .unwrap(),
    );

    assert_eq!(a.null_count(), 3);
    assert_eq!(a.column(0).null_count(), 0);

    let batch = RecordBatch::try_new(schema, vec![Arc::new(a)]).unwrap();
    roundtrip(batch, Some(SMALL_SIZE / 2));
}
2829
/// Round-trips a dictionary-encoded `FixedSizeBinary(4)` column once for each
/// integer dictionary key type, unsigned and signed, from 8 to 64 bits.
#[test]
fn test_fixed_size_binary_in_dict() {
    /// Writes and reads back a three-row dictionary array keyed by `K`.
    /// Keys `[0, 0, 1]` index the values `[0, 0, 0, 0]` and `[1, 1, 1, 1]`.
    fn test_fixed_size_binary_in_dict_inner<K>()
    where
        K: ArrowDictionaryKeyType,
        K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
        <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
    {
        let field = Field::new(
            "a",
            DataType::Dictionary(
                Box::new(K::DATA_TYPE),
                Box::new(DataType::FixedSizeBinary(4)),
            ),
            false,
        );
        let schema = Schema::new(vec![field]);

        let keys: Vec<K::Native> = vec![
            K::Native::try_from(0u8).unwrap(),
            K::Native::try_from(0u8).unwrap(),
            K::Native::try_from(1u8).unwrap(),
        ];
        let keys = PrimitiveArray::<K>::from_iter_values(keys);
        let values = FixedSizeBinaryArray::try_from_iter(
            vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
        )
        .unwrap();

        let data = DictionaryArray::<K>::new(keys, Arc::new(values));
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
        roundtrip(batch, None);
    }

    // Unsigned key types: 8, 16, 32 and 64 bits, mirroring the signed list below.
    test_fixed_size_binary_in_dict_inner::<UInt8Type>();
    test_fixed_size_binary_in_dict_inner::<UInt16Type>();
    test_fixed_size_binary_in_dict_inner::<UInt32Type>();
    test_fixed_size_binary_in_dict_inner::<UInt64Type>();
    // Signed key types.
    test_fixed_size_binary_in_dict_inner::<Int8Type>();
    test_fixed_size_binary_in_dict_inner::<Int16Type>();
    test_fixed_size_binary_in_dict_inner::<Int32Type>();
    test_fixed_size_binary_in_dict_inner::<Int64Type>();
}
2873
/// A nullable struct whose five rows are all null. It wraps a dictionary
/// column with all-null keys and an empty value array.
#[test]
fn test_empty_dict() {
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let struct_fields = Fields::from(vec![Field::new("dict", dict_type, false)]);
    let schema = Arc::new(Schema::new(vec![Field::new_struct(
        "struct",
        struct_fields.clone(),
        true,
    )]));

    let keys = Int32Array::new_null(5);
    let dictionary: ArrayRef = Arc::new(DictionaryArray::new(
        keys,
        Arc::new(StringArray::new_null(0)),
    ));
    let all_null = StructArray::new(struct_fields, vec![dictionary], Some(NullBuffer::new_null(5)));

    let batch = RecordBatch::try_new(schema, vec![Arc::new(all_null)]).unwrap();
    roundtrip(batch, None);
}
/// The data page and dictionary page size limits are one byte, and the write
/// batch size is one. The writer is therefore expected to start a new data
/// page for every value. Ten unique values should yield ten page locations in
/// the offset index, plus a dictionary page.
#[test]
fn arrow_writer_page_size() {
    /// Number of unique values written.
    const NUM_VALUES: usize = 10;
    /// Length in bytes of every value.
    const VALUE_LEN: usize = 10;

    let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));

    // Reserve exactly what is appended below, not megabytes of unused value capacity.
    let mut builder = StringBuilder::with_capacity(NUM_VALUES, NUM_VALUES * VALUE_LEN);

    // Unique fixed-length strings: "0000000000", "1111111111", ...
    for i in 0..NUM_VALUES {
        let value = i
            .to_string()
            .repeat(VALUE_LEN)
            .chars()
            .take(VALUE_LEN)
            .collect::<String>();

        builder.append_value(value);
    }

    let array = Arc::new(builder.finish());

    let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

    let file = tempfile::tempfile().unwrap();

    let props = WriterProperties::builder()
        .set_data_page_size_limit(1)
        .set_dictionary_page_size_limit(1)
        .set_write_batch_size(1)
        .build();

    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
            .expect("Unable to write file");
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader =
        SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();

    let column = reader.metadata().row_group(0).columns();

    assert_eq!(column.len(), 1);

    assert!(
        column[0].dictionary_page_offset().is_some(),
        "Expected a dictionary page"
    );

    assert!(reader.metadata().offset_index().is_some());
    let offset_indexes = &reader.metadata().offset_index().unwrap()[0];

    let page_locations = offset_indexes[0].page_locations.clone();

    assert_eq!(
        page_locations.len(),
        NUM_VALUES,
        "Expected 10 pages but got {page_locations:#?}"
    );
}
2966
/// Round-trips required Float16, Float32 and Float64 columns in which every
/// even row is NaN and every odd row holds its index.
#[test]
fn arrow_writer_float_nans() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Float16, false),
        Field::new("b", DataType::Float32, false),
        Field::new("c", DataType::Float64, false),
    ]));

    let f16_values: Float16Array = (0..MEDIUM_SIZE)
        .map(|i| match i % 2 {
            0 => Some(f16::NAN),
            _ => Some(f16::from_f32(i as f32)),
        })
        .collect();
    let f32_values: Float32Array = (0..MEDIUM_SIZE)
        .map(|i| match i % 2 {
            0 => Some(f32::NAN),
            _ => Some(i as f32),
        })
        .collect();
    let f64_values: Float64Array = (0..MEDIUM_SIZE)
        .map(|i| match i % 2 {
            0 => Some(f64::NAN),
            _ => Some(i as f64),
        })
        .collect();

    let columns: Vec<ArrayRef> = vec![
        Arc::new(f16_values),
        Arc::new(f32_values),
        Arc::new(f64_values),
    ];
    let batch = RecordBatch::try_new(schema, columns).unwrap();
    roundtrip(batch, None);
}
3004
3005 const SMALL_SIZE: usize = 7;
3006 const MEDIUM_SIZE: usize = 63;
3007
3008 fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<Bytes> {
3011 let mut files = vec![];
3012 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3013 let mut props = WriterProperties::builder().set_writer_version(version);
3014
3015 if let Some(size) = max_row_group_size {
3016 props = props.set_max_row_group_row_count(Some(size))
3017 }
3018
3019 let props = props.build();
3020 files.push(roundtrip_opts(&expected_batch, props))
3021 }
3022 files
3023 }
3024
3025 fn roundtrip_opts_with_array_validation<F>(
3029 expected_batch: &RecordBatch,
3030 props: WriterProperties,
3031 validate: F,
3032 ) -> Bytes
3033 where
3034 F: Fn(&ArrayData, &ArrayData),
3035 {
3036 let mut file = vec![];
3037
3038 let mut writer = ArrowWriter::try_new(&mut file, expected_batch.schema(), Some(props))
3039 .expect("Unable to write file");
3040 writer.write(expected_batch).unwrap();
3041 writer.close().unwrap();
3042
3043 let file = Bytes::from(file);
3044 let mut record_batch_reader =
3045 ParquetRecordBatchReader::try_new(file.clone(), 1024).unwrap();
3046
3047 let actual_batch = record_batch_reader
3048 .next()
3049 .expect("No batch found")
3050 .expect("Unable to get batch");
3051
3052 assert_eq!(expected_batch.schema(), actual_batch.schema());
3053 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
3054 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
3055 for i in 0..expected_batch.num_columns() {
3056 let expected_data = expected_batch.column(i).to_data();
3057 let actual_data = actual_batch.column(i).to_data();
3058 validate(&expected_data, &actual_data);
3059 }
3060
3061 file
3062 }
3063
3064 fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> Bytes {
3065 roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
3066 a.validate_full().expect("valid expected data");
3067 b.validate_full().expect("valid actual data");
3068 assert_eq!(a, b)
3069 })
3070 }
3071
3072 struct RoundTripOptions {
3073 values: ArrayRef,
3074 schema: SchemaRef,
3075 bloom_filter: bool,
3076 bloom_filter_ndv: Option<u64>,
3077 bloom_filter_position: BloomFilterPosition,
3078 }
3079
3080 impl RoundTripOptions {
3081 fn new(values: ArrayRef, nullable: bool) -> Self {
3082 let data_type = values.data_type().clone();
3083 let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
3084 Self {
3085 values,
3086 schema: Arc::new(schema),
3087 bloom_filter: false,
3088 bloom_filter_ndv: None,
3089 bloom_filter_position: BloomFilterPosition::AfterRowGroup,
3090 }
3091 }
3092 }
3093
3094 fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<Bytes> {
3095 one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
3096 }
3097
3098 fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<Bytes> {
3099 let mut options = RoundTripOptions::new(values, false);
3100 options.schema = schema;
3101 one_column_roundtrip_with_options(options)
3102 }
3103
3104 fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<Bytes> {
3105 let RoundTripOptions {
3106 values,
3107 schema,
3108 bloom_filter,
3109 bloom_filter_ndv,
3110 bloom_filter_position,
3111 } = options;
3112
3113 let encodings = match values.data_type() {
3114 DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
3115 vec![
3116 Encoding::PLAIN,
3117 Encoding::DELTA_BYTE_ARRAY,
3118 Encoding::DELTA_LENGTH_BYTE_ARRAY,
3119 ]
3120 }
3121 DataType::Int64
3122 | DataType::Int32
3123 | DataType::Int16
3124 | DataType::Int8
3125 | DataType::UInt64
3126 | DataType::UInt32
3127 | DataType::UInt16
3128 | DataType::UInt8 => vec![
3129 Encoding::PLAIN,
3130 Encoding::DELTA_BINARY_PACKED,
3131 Encoding::BYTE_STREAM_SPLIT,
3132 ],
3133 DataType::Float32 | DataType::Float64 => {
3134 vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
3135 }
3136 _ => vec![Encoding::PLAIN],
3137 };
3138
3139 let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3140
3141 let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
3142
3143 let mut files = vec![];
3144 for dictionary_size in [0, 1, 1024] {
3145 for encoding in &encodings {
3146 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3147 for row_group_size in row_group_sizes {
3148 let mut builder = WriterProperties::builder()
3149 .set_writer_version(version)
3150 .set_max_row_group_row_count(Some(row_group_size))
3151 .set_dictionary_enabled(dictionary_size != 0)
3152 .set_dictionary_page_size_limit(dictionary_size.max(1))
3153 .set_encoding(*encoding)
3154 .set_bloom_filter_enabled(bloom_filter)
3155 .set_bloom_filter_position(bloom_filter_position);
3156 if let Some(ndv) = bloom_filter_ndv {
3157 builder = builder.set_bloom_filter_max_ndv(ndv);
3158 }
3159 let props = builder.build();
3160
3161 files.push(roundtrip_opts(&expected_batch, props))
3162 }
3163 }
3164 }
3165 }
3166 files
3167 }
3168
3169 fn values_required<A, I>(iter: I) -> Vec<Bytes>
3170 where
3171 A: From<Vec<I::Item>> + Array + 'static,
3172 I: IntoIterator,
3173 {
3174 let raw_values: Vec<_> = iter.into_iter().collect();
3175 let values = Arc::new(A::from(raw_values));
3176 one_column_roundtrip(values, false)
3177 }
3178
3179 fn values_optional<A, I>(iter: I) -> Vec<Bytes>
3180 where
3181 A: From<Vec<Option<I::Item>>> + Array + 'static,
3182 I: IntoIterator,
3183 {
3184 let optional_raw_values: Vec<_> = iter
3185 .into_iter()
3186 .enumerate()
3187 .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
3188 .collect();
3189 let optional_values = Arc::new(A::from(optional_raw_values));
3190 one_column_roundtrip(optional_values, true)
3191 }
3192
3193 fn required_and_optional<A, I>(iter: I)
3194 where
3195 A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
3196 I: IntoIterator + Clone,
3197 {
3198 values_required::<A, I>(iter.clone());
3199 values_optional::<A, I>(iter);
3200 }
3201
3202 fn check_bloom_filter<T: AsBytes>(
3203 files: Vec<Bytes>,
3204 file_column: String,
3205 positive_values: Vec<T>,
3206 negative_values: Vec<T>,
3207 ) {
3208 files.into_iter().take(1).for_each(|file| {
3209 let file_reader = SerializedFileReader::new_with_options(
3210 file,
3211 ReadOptionsBuilder::new()
3212 .with_reader_properties(
3213 ReaderProperties::builder()
3214 .set_read_bloom_filter(true)
3215 .build(),
3216 )
3217 .build(),
3218 )
3219 .expect("Unable to open file as Parquet");
3220 let metadata = file_reader.metadata();
3221
3222 let mut bloom_filters: Vec<_> = vec![];
3224 for (ri, row_group) in metadata.row_groups().iter().enumerate() {
3225 if let Some((column_index, _)) = row_group
3226 .columns()
3227 .iter()
3228 .enumerate()
3229 .find(|(_, column)| column.column_path().string() == file_column)
3230 {
3231 let row_group_reader = file_reader
3232 .get_row_group(ri)
3233 .expect("Unable to read row group");
3234 if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
3235 bloom_filters.push(sbbf.clone());
3236 } else {
3237 panic!("No bloom filter for column named {file_column} found");
3238 }
3239 } else {
3240 panic!("No column named {file_column} found");
3241 }
3242 }
3243
3244 positive_values.iter().for_each(|value| {
3245 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
3246 assert!(
3247 found.is_some(),
3248 "{}",
3249 format!("Value {:?} should be in bloom filter", value.as_bytes())
3250 );
3251 });
3252
3253 negative_values.iter().for_each(|value| {
3254 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
3255 assert!(
3256 found.is_none(),
3257 "{}",
3258 format!("Value {:?} should not be in bloom filter", value.as_bytes())
3259 );
3260 });
3261 });
3262 }
3263
/// A nullable Int32 column in which every slot is null.
#[test]
fn all_null_primitive_single_column() {
    let all_nulls: Vec<Option<i32>> = vec![None; SMALL_SIZE];
    one_column_roundtrip(Arc::new(Int32Array::from(all_nulls)), true);
}
/// A column of the Arrow `Null` type.
#[test]
fn null_single_column() {
    let nulls = NullArray::new(SMALL_SIZE);
    one_column_roundtrip(Arc::new(nulls), true);
}
3275
/// Alternating true/false booleans, as both a required and a nullable column.
#[test]
fn bool_single_column() {
    let alternating = [true, false].into_iter().cycle().take(SMALL_SIZE);
    required_and_optional::<BooleanArray, _>(alternating);
}
3282
/// Writes 200,000 nullable booleans cycling null/true/false in one batch.
/// Only checks that writing and closing succeed; the file is not read back.
#[test]
fn bool_large_single_column() {
    let values: ArrayRef = Arc::new(
        [None, Some(true), Some(false)]
            .into_iter()
            .cycle()
            .take(200_000)
            .collect::<BooleanArray>(),
    );
    let field = Field::new("col", values.data_type().clone(), true);
    let expected_batch =
        RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![values]).unwrap();

    let file = tempfile::tempfile().unwrap();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
            .expect("Unable to write file");
    writer.write(&expected_batch).unwrap();
    writer.close().unwrap();
}
3303
/// An all-NaN Float64 column still gets an offset index for each column
/// chunk, but no column index.
#[test]
fn check_page_offset_index_with_nan() {
    let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Float64, true)]));
    let nans: ArrayRef = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
    let batch = RecordBatch::try_new(schema, vec![nans]).unwrap();

    let mut out = Vec::with_capacity(1024);
    let mut writer =
        ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
    writer.write(&batch).unwrap();
    let file_meta_data = writer.close().unwrap();

    let columns = file_meta_data
        .row_groups()
        .iter()
        .flat_map(|row_group| row_group.columns());
    for column in columns {
        assert!(column.offset_index_offset().is_some());
        assert!(column.offset_index_length().is_some());
        assert!(column.column_index_offset().is_none());
        assert!(column.column_index_length().is_none());
    }
}
3324
/// Required and nullable Int8 columns holding `0..SMALL_SIZE`.
#[test]
fn i8_single_column() {
    let values = 0..SMALL_SIZE as i8;
    required_and_optional::<Int8Array, _>(values);
}
3329
/// Required and nullable Int16 columns holding `0..SMALL_SIZE`.
#[test]
fn i16_single_column() {
    let values = 0..SMALL_SIZE as i16;
    required_and_optional::<Int16Array, _>(values);
}
3334
/// Required and nullable Int32 columns holding `0..SMALL_SIZE`.
#[test]
fn i32_single_column() {
    let values = 0..SMALL_SIZE as i32;
    required_and_optional::<Int32Array, _>(values);
}
3339
/// Required and nullable Int64 columns holding `0..SMALL_SIZE`.
#[test]
fn i64_single_column() {
    let values = 0..SMALL_SIZE as i64;
    required_and_optional::<Int64Array, _>(values);
}
3344
/// Required and nullable UInt8 columns holding `0..SMALL_SIZE`.
#[test]
fn u8_single_column() {
    let values = 0..SMALL_SIZE as u8;
    required_and_optional::<UInt8Array, _>(values);
}
3349
/// Required and nullable UInt16 columns holding `0..SMALL_SIZE`.
#[test]
fn u16_single_column() {
    let values = 0..SMALL_SIZE as u16;
    required_and_optional::<UInt16Array, _>(values);
}
3354
/// Required and nullable UInt32 columns holding `0..SMALL_SIZE`.
#[test]
fn u32_single_column() {
    let values = 0..SMALL_SIZE as u32;
    required_and_optional::<UInt32Array, _>(values);
}
3359
/// Required and nullable UInt64 columns holding `0..SMALL_SIZE`.
#[test]
fn u64_single_column() {
    let values = 0..SMALL_SIZE as u64;
    required_and_optional::<UInt64Array, _>(values);
}
3364
/// Required and nullable Float32 columns holding `0.0..SMALL_SIZE`.
#[test]
fn f32_single_column() {
    let values = (0..SMALL_SIZE).map(|i| i as f32);
    required_and_optional::<Float32Array, _>(values);
}
3369
/// Required and nullable Float64 columns holding `0.0..SMALL_SIZE`.
#[test]
fn f64_single_column() {
    let values = (0..SMALL_SIZE).map(|i| i as f64);
    required_and_optional::<Float64Array, _>(values);
}
3374
/// A required Timestamp(Second) column holding `0..SMALL_SIZE`.
#[test]
fn timestamp_second_single_column() {
    let seconds: Vec<i64> = (0..SMALL_SIZE as i64).collect();
    one_column_roundtrip(Arc::new(TimestampSecondArray::from(seconds)), false);
}
3386
/// A required Timestamp(Millisecond) column holding `0..SMALL_SIZE`.
#[test]
fn timestamp_millisecond_single_column() {
    let millis: Vec<i64> = (0..SMALL_SIZE as i64).collect();
    one_column_roundtrip(Arc::new(TimestampMillisecondArray::from(millis)), false);
}
3394
/// A required Timestamp(Microsecond) column holding `0..SMALL_SIZE`.
#[test]
fn timestamp_microsecond_single_column() {
    let micros: Vec<i64> = (0..SMALL_SIZE as i64).collect();
    one_column_roundtrip(Arc::new(TimestampMicrosecondArray::from(micros)), false);
}
3402
/// A required Timestamp(Nanosecond) column holding `0..SMALL_SIZE`.
#[test]
fn timestamp_nanosecond_single_column() {
    let nanos: Vec<i64> = (0..SMALL_SIZE as i64).collect();
    one_column_roundtrip(Arc::new(TimestampNanosecondArray::from(nanos)), false);
}
3410
/// Required and nullable Date32 columns holding days `0..SMALL_SIZE`.
#[test]
fn date32_single_column() {
    let days = 0..SMALL_SIZE as i32;
    required_and_optional::<Date32Array, _>(days);
}
3415
/// Required and nullable Date64 columns holding whole days `0..SMALL_SIZE`,
/// expressed in milliseconds.
#[test]
fn date64_single_column() {
    const MILLIS_PER_DAY: i64 = 86_400_000;
    let day_starts = (0..SMALL_SIZE as i64).map(|day| day * MILLIS_PER_DAY);
    required_and_optional::<Date64Array, _>(day_starts);
}
3423
/// Required and nullable Time32(Second) columns holding `0..SMALL_SIZE`.
#[test]
fn time32_second_single_column() {
    let seconds = 0..SMALL_SIZE as i32;
    required_and_optional::<Time32SecondArray, _>(seconds);
}
3428
/// Required and nullable Time32(Millisecond) columns holding `0..SMALL_SIZE`.
#[test]
fn time32_millisecond_single_column() {
    let millis = 0..SMALL_SIZE as i32;
    required_and_optional::<Time32MillisecondArray, _>(millis);
}
3433
/// Required and nullable Time64(Microsecond) columns holding `0..SMALL_SIZE`.
#[test]
fn time64_microsecond_single_column() {
    let micros = 0..SMALL_SIZE as i64;
    required_and_optional::<Time64MicrosecondArray, _>(micros);
}
3438
/// Required and nullable Time64(Nanosecond) columns holding `0..SMALL_SIZE`.
#[test]
fn time64_nanosecond_single_column() {
    let nanos = 0..SMALL_SIZE as i64;
    required_and_optional::<Time64NanosecondArray, _>(nanos);
}
3443
/// Required and nullable Duration(Second) columns holding `0..SMALL_SIZE`.
#[test]
fn duration_second_single_column() {
    let seconds = 0..SMALL_SIZE as i64;
    required_and_optional::<DurationSecondArray, _>(seconds);
}
3448
/// Required and nullable Duration(Millisecond) columns holding `0..SMALL_SIZE`.
#[test]
fn duration_millisecond_single_column() {
    let millis = 0..SMALL_SIZE as i64;
    required_and_optional::<DurationMillisecondArray, _>(millis);
}
3453
/// Required and nullable Duration(Microsecond) columns holding `0..SMALL_SIZE`.
#[test]
fn duration_microsecond_single_column() {
    let micros = 0..SMALL_SIZE as i64;
    required_and_optional::<DurationMicrosecondArray, _>(micros);
}
3458
/// Required and nullable Duration(Nanosecond) columns holding `0..SMALL_SIZE`.
#[test]
fn duration_nanosecond_single_column() {
    let nanos = 0..SMALL_SIZE as i64;
    required_and_optional::<DurationNanosecondArray, _>(nanos);
}
3463
/// Required and nullable Interval(YearMonth) columns holding `0..SMALL_SIZE` months.
#[test]
fn interval_year_month_single_column() {
    let months = 0..SMALL_SIZE as i32;
    required_and_optional::<IntervalYearMonthArray, _>(months);
}
3468
/// Required and nullable Interval(DayTime) columns with mixed-sign day and
/// millisecond components.
#[test]
fn interval_day_time_single_column() {
    let intervals = vec![
        IntervalDayTime::new(0, 1),
        IntervalDayTime::new(0, 3),
        IntervalDayTime::new(3, -2),
        IntervalDayTime::new(-200, 4),
    ];
    required_and_optional::<IntervalDayTimeArray, _>(intervals);
}
3478
/// Writing Interval(MonthDayNano) is not supported yet. The writer is
/// expected to panic with the message below.
#[test]
#[should_panic(
    expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
)]
fn interval_month_day_nano_single_column() {
    let intervals = vec![
        IntervalMonthDayNano::new(0, 1, 5),
        IntervalMonthDayNano::new(0, 3, 2),
        IntervalMonthDayNano::new(3, -2, -5),
        IntervalMonthDayNano::new(-200, 4, -1),
    ];
    required_and_optional::<IntervalMonthDayNanoArray, _>(intervals);
}
3491
/// A required Binary column of `SMALL_SIZE` identical byte strings
/// `0..SMALL_SIZE`. Only the required variant is exercised.
#[test]
fn binary_single_column() {
    let row: Vec<u8> = (0..SMALL_SIZE as u8).collect();
    let rows = vec![row; SMALL_SIZE];
    values_required::<BinaryArray, _>(rows.iter().map(Vec::as_slice));
}
3501
/// A required BinaryView column of `SMALL_SIZE` identical byte strings
/// `0..SMALL_SIZE`. Only the required variant is exercised.
#[test]
fn binary_view_single_column() {
    let row: Vec<u8> = (0..SMALL_SIZE as u8).collect();
    let rows = vec![row; SMALL_SIZE];
    values_required::<BinaryViewArray, _>(rows.iter().map(Vec::as_slice));
}
3511
/// An Int32 bloom filter written at the end of the file. Written values are
/// found, and values just past the range are not.
#[test]
fn i32_column_bloom_filter_at_end() {
    let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
    let options = RoundTripOptions {
        bloom_filter: true,
        bloom_filter_position: BloomFilterPosition::End,
        ..RoundTripOptions::new(array, false)
    };

    let files = one_column_roundtrip_with_options(options);
    let present: Vec<i32> = (0..SMALL_SIZE as i32).collect();
    let absent: Vec<i32> = (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect();
    check_bloom_filter(files, "col".to_string(), present, absent);
}
3527
/// An Int32 bloom filter at the default position. Written values are found,
/// and values just past the range are not.
#[test]
fn i32_column_bloom_filter() {
    let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
    let options = RoundTripOptions {
        bloom_filter: true,
        ..RoundTripOptions::new(array, false)
    };

    let files = one_column_roundtrip_with_options(options);
    let present: Vec<i32> = (0..SMALL_SIZE as i32).collect();
    let absent: Vec<i32> = (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect();
    check_bloom_filter(files, "col".to_string(), present, absent);
}
3542
/// Int32 bloom filters sized with an explicit max NDV. One setting is far
/// above the column's `SMALL_SIZE` distinct values, the other is below it.
#[test]
fn i32_column_bloom_filter_fixed_ndv() {
    let array: ArrayRef = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));

    for ndv in [1_000_000u64, 3] {
        let mut options = RoundTripOptions::new(Arc::clone(&array), false);
        options.bloom_filter = true;
        options.bloom_filter_ndv = Some(ndv);

        let files = one_column_roundtrip_with_options(options);
        check_bloom_filter(
            files,
            "col".to_string(),
            (0..SMALL_SIZE as i32).collect(),
            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
        );
    }
}
3577
/// A Binary bloom filter. The written byte string is found, and a
/// one-byte value outside the data is not.
#[test]
fn binary_column_bloom_filter() {
    let row: Vec<u8> = (0..SMALL_SIZE as u8).collect();
    let rows = vec![row; SMALL_SIZE];

    let array = Arc::new(BinaryArray::from_iter_values(rows.iter().map(Vec::as_slice)));
    let mut options = RoundTripOptions::new(array, false);
    options.bloom_filter = true;

    let files = one_column_roundtrip_with_options(options);
    let absent = vec![vec![(SMALL_SIZE + 1) as u8]];
    check_bloom_filter(files, "col".to_string(), rows, absent);
}
3596
/// A Utf8 bloom filter must not report the empty string. Only the
/// odd-indexed written values are checked as positives.
#[test]
fn empty_string_null_column_bloom_filter() {
    let raw_values: Vec<String> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();

    let array = Arc::new(StringArray::from_iter_values(
        raw_values.iter().map(String::as_str),
    ));
    let mut options = RoundTripOptions::new(array, false);
    options.bloom_filter = true;

    let files = one_column_roundtrip_with_options(options);

    let odd_values: Vec<&str> = raw_values
        .iter()
        .skip(1)
        .step_by(2)
        .map(String::as_str)
        .collect();
    check_bloom_filter(files, "col".to_string(), odd_values, vec![""]);
}
3616
/// A required LargeBinary column of `SMALL_SIZE` identical byte strings
/// `0..SMALL_SIZE`. Only the required variant is exercised.
#[test]
fn large_binary_single_column() {
    let row: Vec<u8> = (0..SMALL_SIZE as u8).collect();
    let rows = vec![row; SMALL_SIZE];
    values_required::<LargeBinaryArray, _>(rows.iter().map(Vec::as_slice));
}
3626
/// A nullable FixedSizeBinary(4) column whose second row is null.
#[test]
fn fixed_size_binary_single_column() {
    let mut builder = FixedSizeBinaryBuilder::new(4);
    for entry in [Some(b"0123"), None, Some(b"8910"), Some(b"1112")] {
        match entry {
            Some(bytes) => builder.append_value(bytes).unwrap(),
            None => builder.append_null(),
        }
    }
    one_column_roundtrip(Arc::new(builder.finish()), true);
}
3638
/// Required and nullable Utf8 columns holding the decimal strings of `0..SMALL_SIZE`.
#[test]
fn string_single_column() {
    let owned: Vec<String> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
    required_and_optional::<StringArray, _>(owned.iter().map(String::as_str));
}
3646
/// Required and nullable LargeUtf8 columns holding the decimal strings of `0..SMALL_SIZE`.
#[test]
fn large_string_single_column() {
    let owned: Vec<String> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
    required_and_optional::<LargeStringArray, _>(owned.iter().map(String::as_str));
}
3654
/// Required and nullable Utf8View columns holding the decimal strings of `0..SMALL_SIZE`.
#[test]
fn string_view_single_column() {
    let owned: Vec<String> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
    required_and_optional::<StringViewArray, _>(owned.iter().map(String::as_str));
}
3662
/// A nullable list of `Null` items with rows `[]`, null and `[null, null]`.
#[test]
fn null_list_single_column() {
    let list_type = || DataType::List(Arc::new(Field::new_list_field(DataType::Null, true)));
    let schema = Arc::new(Schema::new(vec![Field::new("emptylist", list_type(), true)]));

    // Offsets [0, 0, 0, 2]: row 0 is empty, row 1 is empty (and null), row 2 has two items.
    let offsets = Buffer::from([0, 0, 0, 2].to_byte_slice());
    let list_data = ArrayData::builder(list_type())
        .len(3)
        .add_buffer(offsets)
        .null_bit_buffer(Some(Buffer::from([0b00000101])))
        .add_child_data(NullArray::new(2).into_data())
        .build()
        .unwrap();
    let a = ListArray::from(list_data);

    assert!(a.is_valid(0));
    assert!(!a.is_valid(1));
    assert!(a.is_valid(2));

    assert_eq!(a.value(0).len(), 0);
    assert_eq!(a.value(2).len(), 2);
    assert_eq!(a.value(2).logical_nulls().unwrap().null_count(), 2);

    let batch = RecordBatch::try_new(schema, vec![Arc::new(a)]).unwrap();
    roundtrip(batch, None);
}
3697
#[test]
fn list_single_column() {
    // Five slots over ten required i32 children: [1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10].
    let item_field = Arc::new(Field::new_list_field(DataType::Int32, false));
    let offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
    let children = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    let list_data = ArrayData::builder(DataType::List(item_field))
        .len(5)
        .add_buffer(offsets)
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .add_child_data(children.into_data())
        .build()
        .unwrap();
    // Validity bitmap 0b00011011 leaves only slot 2 null.
    assert_eq!(list_data.null_count(), 1);

    one_column_roundtrip(Arc::new(ListArray::from(list_data)), true);
}
3720
#[test]
fn large_list_single_column() {
    // Five slots with 64-bit offsets over ten i32 children; slot 2 is null.
    let item_field = Arc::new(Field::new("large_item", DataType::Int32, true));
    let offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
    let children = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    let list_data = ArrayData::builder(DataType::LargeList(item_field))
        .len(5)
        .add_buffer(offsets)
        .add_child_data(children.into_data())
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .build()
        .unwrap();
    assert_eq!(list_data.null_count(), 1);

    one_column_roundtrip(Arc::new(LargeListArray::from(list_data)), true);
}
3745
#[test]
fn list_nested_nulls() {
    use arrow::datatypes::Int32Type;

    // A null list plus lists that contain null elements, written through both offset widths.
    let rows = vec![
        Some(vec![Some(1)]),
        Some(vec![Some(2), Some(3)]),
        None,
        Some(vec![Some(4), Some(5), None]),
        Some(vec![None]),
        Some(vec![Some(6), Some(7)]),
    ];

    let small = ListArray::from_iter_primitive::<Int32Type, _, _>(rows.clone());
    let large = LargeListArray::from_iter_primitive::<Int32Type, _, _>(rows);

    one_column_roundtrip(Arc::new(small), true);
    one_column_roundtrip(Arc::new(large), true);
}
3764
#[test]
fn struct_single_column() {
    // A struct column with a single required Int32 child named "f".
    let field = Arc::new(Field::new("f", DataType::Int32, false));
    let child: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));
    let array = StructArray::from(vec![(field, child)]);

    one_column_roundtrip(Arc::new(array), false);
}
3774
#[test]
fn list_and_map_coerced_names() {
    // Arrow child names that differ from the Parquet spec; with `set_coerce_types(true)`
    // the written schema is expected to use "element", "key_value", "key" and "value".
    let list_field = Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
    let map_field = Field::new_map(
        "my_map",
        "entries",
        Field::new("keys", DataType::Int32, false),
        Field::new("values", DataType::Int32, true),
        false,
        true,
    );

    let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
    let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
    let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));

    let props = WriterProperties::builder().set_coerce_types(true).build();
    let file = tempfile::tempfile().unwrap();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), Some(props))
            .unwrap();

    let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
    writer.write(&batch).unwrap();
    let written = writer.close().unwrap();

    let reader = SerializedFileReader::new(file).unwrap();

    // The metadata returned by the writer and the metadata read back from the file
    // must both carry the coerced names.
    let schemas = [
        written.file_metadata().schema(),
        reader.metadata().file_metadata().schema(),
    ];
    for schema in schemas {
        let top_level = schema.get_fields();

        let list_group = &top_level[0].get_fields()[0];
        assert_eq!(list_group.get_fields()[0].name(), "element");

        let key_value = &top_level[1].get_fields()[0];
        assert_eq!(key_value.name(), "key_value");
        assert_eq!(key_value.get_fields()[0].name(), "key");
        assert_eq!(key_value.get_fields()[1].name(), "value");
    }
}
3828
#[test]
fn fallback_flush_data_page() {
    // Non-dictionary byte-array encodings with a tiny page size limit and write batch size,
    // across several row-group sizes, so data pages are flushed frequently.
    let strings: Vec<_> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
    let values = Arc::new(StringArray::from(strings));
    let schema = Arc::new(Schema::new(vec![Field::new(
        "col",
        values.data_type().clone(),
        false,
    )]));
    let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();

    let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
    let data_page_size_limit: usize = 32;
    let write_batch_size: usize = 16;

    for encoding in [
        Encoding::DELTA_BYTE_ARRAY,
        Encoding::DELTA_LENGTH_BYTE_ARRAY,
    ] {
        for row_group_size in row_group_sizes {
            let props = WriterProperties::builder()
                .set_writer_version(WriterVersion::PARQUET_2_0)
                .set_max_row_group_row_count(Some(row_group_size))
                .set_dictionary_enabled(false)
                .set_encoding(encoding)
                .set_data_page_size_limit(data_page_size_limit)
                .set_write_batch_size(write_batch_size)
                .build();

            roundtrip_opts_with_array_validation(&expected_batch, props, |lhs, rhs| {
                let lhs_strings = StringArray::from(lhs.clone());
                let rhs_strings = StringArray::from(rhs.clone());
                let lhs_values: Vec<&str> = lhs_strings.iter().map(Option::unwrap).collect();
                let rhs_values: Vec<&str> = rhs_strings.iter().map(Option::unwrap).collect();
                assert_eq!(
                    lhs_values, rhs_values,
                    "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
                );
            });
        }
    }
}
3870
#[test]
fn arrow_writer_string_dictionary() {
    // `Field::new_dict` is deprecated; the allow keeps this schema construction warning-free.
    #[allow(deprecated)]
    let schema = Arc::new(Schema::new(vec![Field::new_dict(
        "dictionary",
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
        true,
        42,
        true,
    )]));

    // Repeated "alpha" exercises dictionary reuse; the None exercises the null path.
    let dict: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
        .into_iter()
        .collect();

    one_column_roundtrip_with_schema(Arc::new(dict), schema);
}
3892
#[test]
fn arrow_writer_test_type_compatibility() {
    // Writes `array1` and then `array2` (a different but compatible Arrow type) into a single
    // file whose schema is built from `array1`. It reads the file back and asserts that the
    // first batch read equals `expected_result` under that same schema.
    fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
    where
        T1: Array + 'static,
        T2: Array + 'static,
    {
        let schema1 = Arc::new(Schema::new(vec![Field::new(
            "a",
            array1.data_type().clone(),
            false,
        )]));

        let file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), schema1.clone(), None).unwrap();

        let rb1 = RecordBatch::try_new(schema1.clone(), vec![Arc::new(array1)]).unwrap();
        writer.write(&rb1).unwrap();

        // The second batch carries its own schema, typed after `array2`.
        let schema2 = Arc::new(Schema::new(vec![Field::new(
            "a",
            array2.data_type().clone(),
            false,
        )]));
        let rb2 = RecordBatch::try_new(schema2, vec![Arc::new(array2)]).unwrap();
        writer.write(&rb2).unwrap();

        writer.close().unwrap();

        // The rows written here are few enough to come back in the first 1024-row batch.
        let mut record_batch_reader =
            ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
        let actual_batch = record_batch_reader.next().unwrap().unwrap();

        let expected_batch =
            RecordBatch::try_new(schema1, vec![Arc::new(expected_result)]).unwrap();
        assert_eq!(actual_batch, expected_batch);
    }

    // Dictionary<UInt8, Utf8> followed by plain Utf8: read back as the dictionary type.
    ensure_compatible_write(
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0]),
            Arc::new(StringArray::from_iter_values(vec!["parquet"])),
        ),
        StringArray::from_iter_values(vec!["barquet"]),
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0, 1]),
            Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
        ),
    );

    // Plain Utf8 followed by Dictionary<UInt8, Utf8>: read back as plain Utf8.
    ensure_compatible_write(
        StringArray::from_iter_values(vec!["parquet"]),
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0]),
            Arc::new(StringArray::from_iter_values(vec!["barquet"])),
        ),
        StringArray::from_iter_values(vec!["parquet", "barquet"]),
    );

    // Dictionaries whose key types differ (UInt8 then UInt16).
    ensure_compatible_write(
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0]),
            Arc::new(StringArray::from_iter_values(vec!["parquet"])),
        ),
        DictionaryArray::new(
            UInt16Array::from_iter_values(vec![0]),
            Arc::new(StringArray::from_iter_values(vec!["barquet"])),
        ),
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0, 1]),
            Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
        ),
    );

    // Dictionaries whose value types differ (Utf8 then LargeUtf8).
    ensure_compatible_write(
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0]),
            Arc::new(StringArray::from_iter_values(vec!["parquet"])),
        ),
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0]),
            Arc::new(LargeStringArray::from_iter_values(vec!["barquet"])),
        ),
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0, 1]),
            Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
        ),
    );

    // Dictionary<UInt8, Utf8> followed by plain LargeUtf8.
    ensure_compatible_write(
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0]),
            Arc::new(StringArray::from_iter_values(vec!["parquet"])),
        ),
        LargeStringArray::from_iter_values(vec!["barquet"]),
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0, 1]),
            Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
        ),
    );

    // Pairwise combinations of the non-dictionary string types.
    ensure_compatible_write(
        StringArray::from_iter_values(vec!["parquet"]),
        LargeStringArray::from_iter_values(vec!["barquet"]),
        StringArray::from_iter_values(vec!["parquet", "barquet"]),
    );

    ensure_compatible_write(
        LargeStringArray::from_iter_values(vec!["parquet"]),
        StringArray::from_iter_values(vec!["barquet"]),
        LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
    );

    ensure_compatible_write(
        StringArray::from_iter_values(vec!["parquet"]),
        StringViewArray::from_iter_values(vec!["barquet"]),
        StringArray::from_iter_values(vec!["parquet", "barquet"]),
    );

    ensure_compatible_write(
        StringViewArray::from_iter_values(vec!["parquet"]),
        StringArray::from_iter_values(vec!["barquet"]),
        StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
    );

    ensure_compatible_write(
        LargeStringArray::from_iter_values(vec!["parquet"]),
        StringViewArray::from_iter_values(vec!["barquet"]),
        LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
    );

    ensure_compatible_write(
        StringViewArray::from_iter_values(vec!["parquet"]),
        LargeStringArray::from_iter_values(vec!["barquet"]),
        StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
    );

    // Pairwise combinations of the binary types.
    ensure_compatible_write(
        BinaryArray::from_iter_values(vec![b"parquet"]),
        LargeBinaryArray::from_iter_values(vec![b"barquet"]),
        BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
    );

    ensure_compatible_write(
        LargeBinaryArray::from_iter_values(vec![b"parquet"]),
        BinaryArray::from_iter_values(vec![b"barquet"]),
        LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
    );

    ensure_compatible_write(
        BinaryArray::from_iter_values(vec![b"parquet"]),
        BinaryViewArray::from_iter_values(vec![b"barquet"]),
        BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
    );

    ensure_compatible_write(
        BinaryViewArray::from_iter_values(vec![b"parquet"]),
        BinaryArray::from_iter_values(vec![b"barquet"]),
        BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
    );

    ensure_compatible_write(
        BinaryViewArray::from_iter_values(vec![b"parquet"]),
        LargeBinaryArray::from_iter_values(vec![b"barquet"]),
        BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
    );

    ensure_compatible_write(
        LargeBinaryArray::from_iter_values(vec![b"parquet"]),
        BinaryViewArray::from_iter_values(vec![b"barquet"]),
        LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
    );

    // Lists whose item fields differ only in field-level metadata (a Parquet field id).
    // The second batch's item field has no metadata; the expected result keeps the first
    // batch's field, metadata included.
    let list_field_metadata = HashMap::from_iter(vec![(
        PARQUET_FIELD_ID_META_KEY.to_string(),
        "1".to_string(),
    )]);
    let list_field = Field::new_list_field(DataType::Int32, false);

    let values1 = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4]));
    let offsets1 = OffsetBuffer::new(vec![0, 2, 5].into());

    let values2 = Arc::new(Int32Array::from(vec![5, 6, 7, 8, 9]));
    let offsets2 = OffsetBuffer::new(vec![0, 3, 5].into());

    let values_expected = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]));
    let offsets_expected = OffsetBuffer::new(vec![0, 2, 5, 8, 10].into());

    ensure_compatible_write(
        // First batch: item field with metadata.
        ListArray::try_new(
            Arc::new(
                list_field
                    .clone()
                    .with_metadata(list_field_metadata.clone()),
            ),
            offsets1,
            values1,
            None,
        )
        .unwrap(),
        // Second batch: item field without metadata.
        ListArray::try_new(Arc::new(list_field.clone()), offsets2, values2, None).unwrap(),
        // Expected: all four lists, typed with the first batch's item field.
        ListArray::try_new(
            Arc::new(
                list_field
                    .clone()
                    .with_metadata(list_field_metadata.clone()),
            ),
            offsets_expected,
            values_expected,
            None,
        )
        .unwrap(),
    );
}
4123
#[test]
fn arrow_writer_primitive_dictionary() {
    // `Field::new_dict` is deprecated; the allow keeps this schema construction warning-free.
    #[allow(deprecated)]
    let schema = Arc::new(Schema::new(vec![Field::new_dict(
        "dictionary",
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
        true,
        42,
        true,
    )]));

    // Four slots: a repeated value, a null and a distinct value.
    let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
    for slot in [Some(12345678), None, Some(22345678), Some(12345678)] {
        match slot {
            Some(value) => {
                builder.append(value).unwrap();
            }
            None => builder.append_null(),
        }
    }

    one_column_roundtrip_with_schema(Arc::new(builder.finish()), schema);
}
4146
#[test]
fn arrow_writer_decimal32_dictionary() {
    // The same keys and unscaled values, round-tripped at two Decimal32 precisions.
    let integers = vec![12345, 56789, 34567];
    let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

    for precision in [5, 9] {
        let values = Decimal32Array::from(integers.clone())
            .with_precision_and_scale(precision, 2)
            .unwrap();
        let array = DictionaryArray::new(keys.clone(), Arc::new(values));
        one_column_roundtrip(Arc::new(array), true);
    }
}
4167
#[test]
fn arrow_writer_decimal64_dictionary() {
    // The same keys and unscaled values, round-tripped at two Decimal64 precisions.
    let integers = vec![12345, 56789, 34567];
    let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

    for precision in [5, 12] {
        let values = Decimal64Array::from(integers.clone())
            .with_precision_and_scale(precision, 2)
            .unwrap();
        let array = DictionaryArray::new(keys.clone(), Arc::new(values));
        one_column_roundtrip(Arc::new(array), true);
    }
}
4188
#[test]
fn arrow_writer_decimal128_dictionary() {
    // The same keys and unscaled values, round-tripped at two Decimal128 precisions.
    let integers = vec![12345, 56789, 34567];
    let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

    for precision in [5, 12] {
        let values = Decimal128Array::from(integers.clone())
            .with_precision_and_scale(precision, 2)
            .unwrap();
        let array = DictionaryArray::new(keys.clone(), Arc::new(values));
        one_column_roundtrip(Arc::new(array), true);
    }
}
4209
#[test]
fn arrow_writer_decimal256_dictionary() {
    // The same keys and unscaled values, round-tripped at two Decimal256 precisions.
    let integers = vec![
        i256::from_i128(12345),
        i256::from_i128(56789),
        i256::from_i128(34567),
    ];
    let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

    for precision in [5, 12] {
        let values = Decimal256Array::from(integers.clone())
            .with_precision_and_scale(precision, 2)
            .unwrap();
        let array = DictionaryArray::new(keys.clone(), Arc::new(values));
        one_column_roundtrip(Arc::new(array), true);
    }
}
4234
#[test]
fn arrow_writer_string_dictionary_unsigned_index() {
    // `Field::new_dict` is deprecated; the allow keeps this schema construction warning-free.
    #[allow(deprecated)]
    let schema = Arc::new(Schema::new(vec![Field::new_dict(
        "dictionary",
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
        true,
        42,
        true,
    )]));

    // Same strings as the Int32-keyed case, but with unsigned UInt8 keys.
    let dict: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
        .into_iter()
        .collect();

    one_column_roundtrip_with_schema(Arc::new(dict), schema);
}
4255
#[test]
fn u32_min_max() {
    // Values around the i32 sign boundary. Statistics are stored as `Statistics::Int32`,
    // so they are reinterpreted as u32 before being compared.
    let src = [
        u32::MIN,
        u32::MIN + 1,
        (i32::MAX as u32) - 1,
        i32::MAX as u32,
        (i32::MAX as u32) + 1,
        u32::MAX - 1,
        u32::MAX,
    ];
    let values = Arc::new(UInt32Array::from_iter_values(src.iter().copied()));
    let files = one_column_roundtrip(values, false);

    for file in files {
        let reader = SerializedFileReader::new(file).unwrap();

        // Walk the row groups in order, matching each against its slice of `src`.
        let mut consumed = 0;
        for row_group in reader.metadata().row_groups() {
            assert_eq!(row_group.num_columns(), 1);
            let column = row_group.column(0);

            let count = column.num_values() as usize;
            let window = &src[consumed..consumed + count];
            consumed += count;

            let Statistics::Int32(stats) = column.statistics().unwrap() else {
                panic!("Statistics::Int32 missing")
            };
            assert_eq!(*stats.min_opt().unwrap() as u32, *window.iter().min().unwrap());
            assert_eq!(*stats.max_opt().unwrap() as u32, *window.iter().max().unwrap());
        }
    }
}
4301
#[test]
fn u64_min_max() {
    // Values around the i64 sign boundary. Statistics are stored as `Statistics::Int64`,
    // so they are reinterpreted as u64 before being compared.
    let src = [
        u64::MIN,
        u64::MIN + 1,
        (i64::MAX as u64) - 1,
        i64::MAX as u64,
        (i64::MAX as u64) + 1,
        u64::MAX - 1,
        u64::MAX,
    ];
    let values = Arc::new(UInt64Array::from_iter_values(src.iter().copied()));
    let files = one_column_roundtrip(values, false);

    for file in files {
        let reader = SerializedFileReader::new(file).unwrap();

        // Walk the row groups in order, matching each against its slice of `src`.
        let mut consumed = 0;
        for row_group in reader.metadata().row_groups() {
            assert_eq!(row_group.num_columns(), 1);
            let column = row_group.column(0);

            let count = column.num_values() as usize;
            let window = &src[consumed..consumed + count];
            consumed += count;

            let Statistics::Int64(stats) = column.statistics().unwrap() else {
                panic!("Statistics::Int64 missing")
            };
            assert_eq!(*stats.min_opt().unwrap() as u64, *window.iter().min().unwrap());
            assert_eq!(*stats.max_opt().unwrap() as u64, *window.iter().max().unwrap());
        }
    }
}
4347
#[test]
fn statistics_null_counts_only_nulls() {
    // A column made up entirely of nulls must still report its null count.
    let values = Arc::new(UInt64Array::from(vec![None, None]));

    for file in one_column_roundtrip(values, true) {
        let reader = SerializedFileReader::new(file).unwrap();
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let row_group = metadata.row_group(0);
        assert_eq!(row_group.num_columns(), 1);

        let stats = row_group.column(0).statistics().unwrap();
        assert_eq!(stats.null_count_opt(), Some(2));
    }
}
4366
#[test]
fn test_list_of_struct_roundtrip() {
    // Appends one struct entry {a, b} to `values` and marks the struct slot valid or null.
    fn push_entry(values: &mut StructBuilder, a: Option<i32>, b: Option<i32>, is_valid: bool) {
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_option(a);
        values
            .field_builder::<Int32Builder>(1)
            .unwrap()
            .append_option(b);
        values.append(is_valid);
    }

    let struct_builder = StructBuilder::new(
        vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
        ],
        vec![
            Box::new(Int32Builder::with_capacity(10)),
            Box::new(Int32Builder::with_capacity(10)),
        ],
    );
    let mut list_builder = ListBuilder::new(struct_builder);

    // Row 0: [{a: 1, b: 2}]
    push_entry(list_builder.values(), Some(1), Some(2), true);
    list_builder.append(true);

    // Row 1: [] (valid, empty)
    list_builder.append(true);

    // Row 2: null list
    list_builder.append(false);

    // Row 3: [null, null] (two null struct slots)
    push_entry(list_builder.values(), None, None, false);
    push_entry(list_builder.values(), None, None, false);
    list_builder.append(true);

    // Row 4: [{a: null, b: 3}]
    push_entry(list_builder.values(), None, Some(3), true);
    list_builder.append(true);

    // Row 5: [{a: 2, b: null}]
    push_entry(list_builder.values(), Some(2), None, true);
    list_builder.append(true);

    one_column_roundtrip(Arc::new(list_builder.finish()), true);
}
4456
4457 fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
4458 metadata.row_groups().iter().map(|x| x.num_rows()).collect()
4459 }
4460
#[test]
fn test_aggregates_records() {
    // Batches of 100, 50 and 300 rows written with a 200-row cap are expected to be
    // regrouped into row groups of 200, 200 and 50 rows.
    let ranges = [0..100, 0..50, 200..500];

    let schema = Arc::new(Schema::new(vec![Field::new(
        "int",
        ArrowDataType::Int32,
        false,
    )]));
    let file = tempfile::tempfile().unwrap();
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(200))
        .build();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();

    for range in ranges.clone() {
        let column = Int32Array::from(range.collect::<Vec<_>>());
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(column)]).unwrap();
        writer.write(&batch).unwrap();
    }
    writer.close().unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);

    let batches = builder
        .with_batch_size(100)
        .build()
        .unwrap()
        .collect::<ArrowResult<Vec<_>>>()
        .unwrap();

    assert_eq!(batches.len(), 5);
    assert!(batches.iter().all(|batch| batch.num_columns() == 1));

    let batch_sizes: Vec<_> = batches.iter().map(|batch| batch.num_rows()).collect();
    assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);

    // Concatenating all read values must reproduce the written ranges in order.
    let values: Vec<_> = batches
        .iter()
        .flat_map(|batch| {
            batch
                .column(0)
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap()
                .values()
                .iter()
                .copied()
        })
        .collect();
    let expected_values: Vec<_> = ranges.into_iter().flatten().collect();
    assert_eq!(&values, &expected_values)
}
4524
#[test]
fn complex_aggregate() {
    // Schema: struct_b { list: List<struct_a { leaf_a: Int32 (required), leaf_b: Int32 }> }
    let leaf_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
    let leaf_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
    let struct_a = Arc::new(Field::new(
        "struct_a",
        DataType::Struct(vec![leaf_a.clone(), leaf_b.clone()].into()),
        true,
    ));
    let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
    let struct_b = Arc::new(Field::new(
        "struct_b",
        DataType::Struct(vec![list_a.clone()].into()),
        false,
    ));
    let schema = Arc::new(Schema::new(vec![struct_b]));

    // Batch 1: five list slots (1 and 3 null) whose offsets reference the first five
    // of six struct entries.
    let entries = StructArray::from(vec![
        (
            leaf_a.clone(),
            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6])) as ArrayRef,
        ),
        (
            leaf_b.clone(),
            Arc::new(Int32Array::from_iter(vec![
                Some(1),
                None,
                Some(2),
                None,
                None,
                Some(6),
            ])) as ArrayRef,
        ),
    ]);
    let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
        .len(5)
        .add_buffer(Buffer::from_iter(vec![
            0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
        ]))
        .null_bit_buffer(Some(Buffer::from_iter(vec![
            true, false, true, false, true,
        ])))
        .child_data(vec![entries.into_data()])
        .build()
        .unwrap();
    let list_column = Arc::new(ListArray::from(list_data)) as ArrayRef;
    let outer = StructArray::from(vec![(list_a.clone(), list_column)]);
    let batch1 =
        RecordBatch::try_from_iter(vec![("struct_b", Arc::new(outer) as ArrayRef)]).unwrap();

    // Batch 2: two valid list slots covering all five struct entries.
    let entries = StructArray::from(vec![
        (
            leaf_a,
            Arc::new(Int32Array::from(vec![6, 7, 8, 9, 10])) as ArrayRef,
        ),
        (
            leaf_b,
            Arc::new(Int32Array::from_iter(vec![None, None, None, Some(1), None])) as ArrayRef,
        ),
    ]);
    let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
        .len(2)
        .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
        .child_data(vec![entries.into_data()])
        .build()
        .unwrap();
    let list_column = Arc::new(ListArray::from(list_data)) as ArrayRef;
    let outer = StructArray::from(vec![(list_a, list_column)]);
    let batch2 =
        RecordBatch::try_from_iter(vec![("struct_b", Arc::new(outer) as ArrayRef)]).unwrap();

    let batches = &[batch1, batch2];

    // Expected pretty-printed table: one left-aligned "struct_b" column, with cells padded
    // to the widest entry, a single space inside each border and `+---+` separators.
    let header = "struct_b";
    let cells = [
        "{list: [{leaf_a: 1, leaf_b: 1}]}",
        "{list: }",
        "{list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}",
        "{list: }",
        "{list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}",
        "{list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]}",
        "{list: [{leaf_a: 10, leaf_b: }]}",
    ];
    let width = cells
        .iter()
        .map(|cell| cell.len())
        .fold(header.len(), usize::max);
    let border = format!("+{}+", "-".repeat(width + 2));
    let mut lines = vec![
        border.clone(),
        format!("| {header:<width$} |"),
        border.clone(),
    ];
    lines.extend(cells.iter().map(|cell| format!("| {cell:<width$} |")));
    lines.push(border);
    let expected = lines.join("\n");

    let actual = pretty_format_batches(batches).unwrap().to_string();
    assert_eq!(actual, expected);

    let file = tempfile::tempfile().unwrap();
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(6))
        .build();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
    for batch in batches {
        writer.write(batch).unwrap();
    }
    writer.close().unwrap();

    // Seven rows under a six-row cap: one full row group plus a one-row remainder.
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);

    let read_back = builder
        .with_batch_size(2)
        .build()
        .unwrap()
        .collect::<ArrowResult<Vec<_>>>()
        .unwrap();

    assert_eq!(read_back.len(), 4);
    let row_counts: Vec<_> = read_back.iter().map(|batch| batch.num_rows()).collect();
    assert_eq!(&row_counts, &[2, 2, 2, 1]);

    let actual = pretty_format_batches(&read_back).unwrap().to_string();
    assert_eq!(actual, expected);
}
4652
#[test]
fn test_arrow_writer_metadata() {
    // The writer's schema carries schema-level metadata that the batch schema lacks;
    // writing the batch must still succeed.
    let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
    let file_schema = batch_schema
        .clone()
        .with_metadata([("foo".to_string(), "bar".to_string())].into_iter().collect());

    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let batch = RecordBatch::try_new(Arc::new(batch_schema), vec![column]).unwrap();

    let mut sink = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut sink, Arc::new(file_schema), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();
}
4673
#[test]
fn test_arrow_writer_nullable() {
    // A non-nullable batch column written under a nullable file schema reads back with
    // the file schema but with identical values.
    let batch_schema = Arc::new(Schema::new(vec![Field::new(
        "int32",
        DataType::Int32,
        false,
    )]));
    let file_schema = Arc::new(Schema::new(vec![Field::new(
        "int32",
        DataType::Int32,
        true,
    )]));

    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let batch = RecordBatch::try_new(batch_schema, vec![column]).unwrap();

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
    let back = reader.next().unwrap().unwrap();
    assert_eq!(back.schema(), file_schema);
    assert_ne!(back.schema(), batch.schema());
    assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
}
4697
#[test]
fn in_progress_accounting() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let column = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(column)]).unwrap();

    let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();

    // Before any write nothing is buffered; only 4 bytes have reached the sink
    // (presumably the leading "PAR1" magic).
    assert_eq!(writer.in_progress_size(), 0);
    assert_eq!(writer.in_progress_rows(), 0);
    assert_eq!(writer.memory_size(), 0);
    assert_eq!(writer.bytes_written(), 4);

    // First write: buffered size is positive and never exceeds the memory footprint.
    writer.write(&batch).unwrap();
    let initial_size = writer.in_progress_size();
    let initial_memory = writer.memory_size();
    assert!(initial_size > 0);
    assert_eq!(writer.in_progress_rows(), 5);
    assert!(initial_memory > 0);
    assert!(
        initial_size <= initial_memory,
        "{initial_size} <= {initial_memory}"
    );

    // Second write: every measure grows and the same bound holds.
    writer.write(&batch).unwrap();
    assert!(writer.in_progress_size() > initial_size);
    assert_eq!(writer.in_progress_rows(), 10);
    assert!(writer.memory_size() > initial_memory);
    assert!(
        writer.in_progress_size() <= writer.memory_size(),
        "in_progress_size {} <= memory_size {}",
        writer.in_progress_size(),
        writer.memory_size()
    );

    // Flushing empties the buffers and pushes bytes to the sink.
    let bytes_before_flush = writer.bytes_written();
    writer.flush().unwrap();
    assert_eq!(writer.in_progress_size(), 0);
    assert_eq!(writer.memory_size(), 0);
    assert!(writer.bytes_written() > bytes_before_flush);

    writer.close().unwrap();
}
4751
4752 #[test]
4753 fn test_writer_all_null() {
4754 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
4755 let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
4756 let batch = RecordBatch::try_from_iter(vec![
4757 ("a", Arc::new(a) as ArrayRef),
4758 ("b", Arc::new(b) as ArrayRef),
4759 ])
4760 .unwrap();
4761
4762 let mut buf = Vec::with_capacity(1024);
4763 let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
4764 writer.write(&batch).unwrap();
4765 writer.close().unwrap();
4766
4767 let bytes = Bytes::from(buf);
4768 let options = ReadOptionsBuilder::new().with_page_index().build();
4769 let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
4770 let index = reader.metadata().offset_index().unwrap();
4771
4772 assert_eq!(index.len(), 1);
4773 assert_eq!(index[0].len(), 2); assert_eq!(index[0][0].page_locations().len(), 1); assert_eq!(index[0][1].page_locations().len(), 1); }
4777
4778 #[test]
4779 fn test_disabled_statistics_with_page() {
4780 let file_schema = Schema::new(vec![
4781 Field::new("a", DataType::Utf8, true),
4782 Field::new("b", DataType::Utf8, true),
4783 ]);
4784 let file_schema = Arc::new(file_schema);
4785
4786 let batch = RecordBatch::try_new(
4787 file_schema.clone(),
4788 vec![
4789 Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
4790 Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
4791 ],
4792 )
4793 .unwrap();
4794
4795 let props = WriterProperties::builder()
4796 .set_statistics_enabled(EnabledStatistics::None)
4797 .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
4798 .build();
4799
4800 let mut buf = Vec::with_capacity(1024);
4801 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4802 writer.write(&batch).unwrap();
4803
4804 let metadata = writer.close().unwrap();
4805 assert_eq!(metadata.num_row_groups(), 1);
4806 let row_group = metadata.row_group(0);
4807 assert_eq!(row_group.num_columns(), 2);
4808 assert!(row_group.column(0).offset_index_offset().is_some());
4810 assert!(row_group.column(0).column_index_offset().is_some());
4811 assert!(row_group.column(1).offset_index_offset().is_some());
4813 assert!(row_group.column(1).column_index_offset().is_none());
4814
4815 let options = ReadOptionsBuilder::new().with_page_index().build();
4816 let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4817
4818 let row_group = reader.get_row_group(0).unwrap();
4819 let a_col = row_group.metadata().column(0);
4820 let b_col = row_group.metadata().column(1);
4821
4822 if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4824 let min = byte_array_stats.min_opt().unwrap();
4825 let max = byte_array_stats.max_opt().unwrap();
4826
4827 assert_eq!(min.as_bytes(), b"a");
4828 assert_eq!(max.as_bytes(), b"d");
4829 } else {
4830 panic!("expecting Statistics::ByteArray");
4831 }
4832
4833 assert!(b_col.statistics().is_none());
4835
4836 let offset_index = reader.metadata().offset_index().unwrap();
4837 assert_eq!(offset_index.len(), 1); assert_eq!(offset_index[0].len(), 2); let column_index = reader.metadata().column_index().unwrap();
4841 assert_eq!(column_index.len(), 1); assert_eq!(column_index[0].len(), 2); let a_idx = &column_index[0][0];
4845 assert!(
4846 matches!(a_idx, ColumnIndexMetaData::BYTE_ARRAY(_)),
4847 "{a_idx:?}"
4848 );
4849 let b_idx = &column_index[0][1];
4850 assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
4851 }
4852
4853 #[test]
4854 fn test_disabled_statistics_with_chunk() {
4855 let file_schema = Schema::new(vec![
4856 Field::new("a", DataType::Utf8, true),
4857 Field::new("b", DataType::Utf8, true),
4858 ]);
4859 let file_schema = Arc::new(file_schema);
4860
4861 let batch = RecordBatch::try_new(
4862 file_schema.clone(),
4863 vec![
4864 Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
4865 Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
4866 ],
4867 )
4868 .unwrap();
4869
4870 let props = WriterProperties::builder()
4871 .set_statistics_enabled(EnabledStatistics::None)
4872 .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
4873 .build();
4874
4875 let mut buf = Vec::with_capacity(1024);
4876 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4877 writer.write(&batch).unwrap();
4878
4879 let metadata = writer.close().unwrap();
4880 assert_eq!(metadata.num_row_groups(), 1);
4881 let row_group = metadata.row_group(0);
4882 assert_eq!(row_group.num_columns(), 2);
4883 assert!(row_group.column(0).offset_index_offset().is_some());
4885 assert!(row_group.column(0).column_index_offset().is_none());
4886 assert!(row_group.column(1).offset_index_offset().is_some());
4888 assert!(row_group.column(1).column_index_offset().is_none());
4889
4890 let options = ReadOptionsBuilder::new().with_page_index().build();
4891 let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4892
4893 let row_group = reader.get_row_group(0).unwrap();
4894 let a_col = row_group.metadata().column(0);
4895 let b_col = row_group.metadata().column(1);
4896
4897 if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4899 let min = byte_array_stats.min_opt().unwrap();
4900 let max = byte_array_stats.max_opt().unwrap();
4901
4902 assert_eq!(min.as_bytes(), b"a");
4903 assert_eq!(max.as_bytes(), b"d");
4904 } else {
4905 panic!("expecting Statistics::ByteArray");
4906 }
4907
4908 assert!(b_col.statistics().is_none());
4910
4911 let column_index = reader.metadata().column_index().unwrap();
4912 assert_eq!(column_index.len(), 1); assert_eq!(column_index[0].len(), 2); let a_idx = &column_index[0][0];
4916 assert!(matches!(a_idx, ColumnIndexMetaData::NONE), "{a_idx:?}");
4917 let b_idx = &column_index[0][1];
4918 assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
4919 }
4920
4921 #[test]
4922 fn test_arrow_writer_skip_metadata() {
4923 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4924 let file_schema = Arc::new(batch_schema.clone());
4925
4926 let batch = RecordBatch::try_new(
4927 Arc::new(batch_schema),
4928 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4929 )
4930 .unwrap();
4931 let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
4932
4933 let mut buf = Vec::with_capacity(1024);
4934 let mut writer =
4935 ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
4936 writer.write(&batch).unwrap();
4937 writer.close().unwrap();
4938
4939 let bytes = Bytes::from(buf);
4940 let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
4941 assert_eq!(file_schema, *reader_builder.schema());
4942 if let Some(key_value_metadata) = reader_builder
4943 .metadata()
4944 .file_metadata()
4945 .key_value_metadata()
4946 {
4947 assert!(
4948 !key_value_metadata
4949 .iter()
4950 .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY)
4951 );
4952 }
4953 }
4954
4955 #[test]
4956 fn test_arrow_writer_skip_path_in_schema() {
4957 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4958 let file_schema = Arc::new(batch_schema.clone());
4959
4960 let batch = RecordBatch::try_new(
4961 Arc::new(batch_schema),
4962 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4963 )
4964 .unwrap();
4965
4966 let skip_options = ArrowWriterOptions::new();
4968
4969 let mut buf = Vec::with_capacity(1024);
4970 let mut writer =
4971 ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
4972 writer.write(&batch).unwrap();
4973 writer.close().unwrap();
4974
4975 let skip_options = ArrowWriterOptions::new().with_properties(
4977 WriterProperties::builder()
4978 .set_write_path_in_schema(false)
4979 .build(),
4980 );
4981
4982 let mut buf2 = Vec::with_capacity(1024);
4983 let mut writer =
4984 ArrowWriter::try_new_with_options(&mut buf2, file_schema.clone(), skip_options)
4985 .unwrap();
4986 writer.write(&batch).unwrap();
4987 writer.close().unwrap();
4988
4989 assert!(buf.len() > buf2.len());
4991 }
4992
4993 #[test]
4994 fn mismatched_schemas() {
4995 let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
4996 let file_schema = Arc::new(Schema::new(vec![Field::new(
4997 "temperature",
4998 DataType::Float64,
4999 false,
5000 )]));
5001
5002 let batch = RecordBatch::try_new(
5003 Arc::new(batch_schema),
5004 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
5005 )
5006 .unwrap();
5007
5008 let mut buf = Vec::with_capacity(1024);
5009 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
5010
5011 let err = writer.write(&batch).unwrap_err().to_string();
5012 assert_eq!(
5013 err,
5014 "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
5015 );
5016 }
5017
5018 #[test]
5019 fn test_roundtrip_empty_schema() {
5021 let empty_batch = RecordBatch::try_new_with_options(
5023 Arc::new(Schema::empty()),
5024 vec![],
5025 &RecordBatchOptions::default().with_row_count(Some(0)),
5026 )
5027 .unwrap();
5028
5029 let mut parquet_bytes: Vec<u8> = Vec::new();
5031 let mut writer =
5032 ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
5033 writer.write(&empty_batch).unwrap();
5034 writer.close().unwrap();
5035
5036 let bytes = Bytes::from(parquet_bytes);
5038 let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
5039 assert_eq!(reader.schema(), &empty_batch.schema());
5040 let batches: Vec<_> = reader
5041 .build()
5042 .unwrap()
5043 .collect::<ArrowResult<Vec<_>>>()
5044 .unwrap();
5045 assert_eq!(batches.len(), 0);
5046 }
5047
5048 #[test]
5049 fn test_page_stats_not_written_by_default() {
5050 let string_field = Field::new("a", DataType::Utf8, false);
5051 let schema = Schema::new(vec![string_field]);
5052 let raw_string_values = vec!["Blart Versenwald III"];
5053 let string_values = StringArray::from(raw_string_values.clone());
5054 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
5055
5056 let props = WriterProperties::builder()
5057 .set_statistics_enabled(EnabledStatistics::Page)
5058 .set_dictionary_enabled(false)
5059 .set_encoding(Encoding::PLAIN)
5060 .set_compression(crate::basic::Compression::UNCOMPRESSED)
5061 .build();
5062
5063 let file = roundtrip_opts(&batch, props);
5064
5065 let first_page = &file[4..];
5070 let mut prot = ThriftSliceInputProtocol::new(first_page);
5071 let hdr = PageHeader::read_thrift(&mut prot).unwrap();
5072 let stats = hdr.data_page_header.unwrap().statistics;
5073
5074 assert!(stats.is_none());
5075 }
5076
5077 #[test]
5078 fn test_page_stats_when_enabled() {
5079 let string_field = Field::new("a", DataType::Utf8, false);
5080 let schema = Schema::new(vec![string_field]);
5081 let raw_string_values = vec!["Blart Versenwald III", "Andrew Lamb"];
5082 let string_values = StringArray::from(raw_string_values.clone());
5083 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
5084
5085 let props = WriterProperties::builder()
5086 .set_statistics_enabled(EnabledStatistics::Page)
5087 .set_dictionary_enabled(false)
5088 .set_encoding(Encoding::PLAIN)
5089 .set_write_page_header_statistics(true)
5090 .set_compression(crate::basic::Compression::UNCOMPRESSED)
5091 .build();
5092
5093 let file = roundtrip_opts(&batch, props);
5094
5095 let first_page = &file[4..];
5100 let mut prot = ThriftSliceInputProtocol::new(first_page);
5101 let hdr = PageHeader::read_thrift(&mut prot).unwrap();
5102 let stats = hdr.data_page_header.unwrap().statistics;
5103
5104 let stats = stats.unwrap();
5105 assert!(stats.is_max_value_exact.unwrap());
5107 assert!(stats.is_min_value_exact.unwrap());
5108 assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
5109 assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
5110 }
5111
5112 #[test]
5113 fn test_page_stats_truncation() {
5114 let string_field = Field::new("a", DataType::Utf8, false);
5115 let binary_field = Field::new("b", DataType::Binary, false);
5116 let schema = Schema::new(vec![string_field, binary_field]);
5117
5118 let raw_string_values = vec!["Blart Versenwald III"];
5119 let raw_binary_values = [b"Blart Versenwald III".to_vec()];
5120 let raw_binary_value_refs = raw_binary_values
5121 .iter()
5122 .map(|x| x.as_slice())
5123 .collect::<Vec<_>>();
5124
5125 let string_values = StringArray::from(raw_string_values.clone());
5126 let binary_values = BinaryArray::from(raw_binary_value_refs);
5127 let batch = RecordBatch::try_new(
5128 Arc::new(schema),
5129 vec![Arc::new(string_values), Arc::new(binary_values)],
5130 )
5131 .unwrap();
5132
5133 let props = WriterProperties::builder()
5134 .set_statistics_truncate_length(Some(2))
5135 .set_dictionary_enabled(false)
5136 .set_encoding(Encoding::PLAIN)
5137 .set_write_page_header_statistics(true)
5138 .set_compression(crate::basic::Compression::UNCOMPRESSED)
5139 .build();
5140
5141 let file = roundtrip_opts(&batch, props);
5142
5143 let first_page = &file[4..];
5148 let mut prot = ThriftSliceInputProtocol::new(first_page);
5149 let hdr = PageHeader::read_thrift(&mut prot).unwrap();
5150 let stats = hdr.data_page_header.unwrap().statistics;
5151 assert!(stats.is_some());
5152 let stats = stats.unwrap();
5153 assert!(!stats.is_max_value_exact.unwrap());
5155 assert!(!stats.is_min_value_exact.unwrap());
5156 assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
5157 assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
5158
5159 let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..];
5161 let mut prot = ThriftSliceInputProtocol::new(second_page);
5162 let hdr = PageHeader::read_thrift(&mut prot).unwrap();
5163 let stats = hdr.data_page_header.unwrap().statistics;
5164 assert!(stats.is_some());
5165 let stats = stats.unwrap();
5166 assert!(!stats.is_max_value_exact.unwrap());
5168 assert!(!stats.is_min_value_exact.unwrap());
5169 assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
5170 assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
5171 }
5172
5173 #[test]
5174 fn test_page_encoding_statistics_roundtrip() {
5175 let batch_schema = Schema::new(vec![Field::new(
5176 "int32",
5177 arrow_schema::DataType::Int32,
5178 false,
5179 )]);
5180
5181 let batch = RecordBatch::try_new(
5182 Arc::new(batch_schema.clone()),
5183 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
5184 )
5185 .unwrap();
5186
5187 let mut file: File = tempfile::tempfile().unwrap();
5188 let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
5189 writer.write(&batch).unwrap();
5190 let file_metadata = writer.close().unwrap();
5191
5192 assert_eq!(file_metadata.num_row_groups(), 1);
5193 assert_eq!(file_metadata.row_group(0).num_columns(), 1);
5194 assert!(
5195 file_metadata
5196 .row_group(0)
5197 .column(0)
5198 .page_encoding_stats()
5199 .is_some()
5200 );
5201 let chunk_page_stats = file_metadata
5202 .row_group(0)
5203 .column(0)
5204 .page_encoding_stats()
5205 .unwrap();
5206
5207 let options = ReadOptionsBuilder::new()
5209 .with_page_index()
5210 .with_encoding_stats_as_mask(false)
5211 .build();
5212 let reader = SerializedFileReader::new_with_options(file, options).unwrap();
5213
5214 let rowgroup = reader.get_row_group(0).expect("row group missing");
5215 assert_eq!(rowgroup.num_columns(), 1);
5216 let column = rowgroup.metadata().column(0);
5217 assert!(column.page_encoding_stats().is_some());
5218 let file_page_stats = column.page_encoding_stats().unwrap();
5219 assert_eq!(chunk_page_stats, file_page_stats);
5220 }
5221
5222 #[test]
5223 fn test_different_dict_page_size_limit() {
5224 let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
5225 let schema = Arc::new(Schema::new(vec![
5226 Field::new("col0", arrow_schema::DataType::Int64, false),
5227 Field::new("col1", arrow_schema::DataType::Int64, false),
5228 ]));
5229 let batch =
5230 arrow_array::RecordBatch::try_new(schema.clone(), vec![array.clone(), array]).unwrap();
5231
5232 let props = WriterProperties::builder()
5233 .set_dictionary_page_size_limit(1024 * 1024)
5234 .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
5235 .build();
5236 let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
5237 writer.write(&batch).unwrap();
5238 let data = Bytes::from(writer.into_inner().unwrap());
5239
5240 let mut metadata = ParquetMetaDataReader::new();
5241 metadata.try_parse(&data).unwrap();
5242 let metadata = metadata.finish().unwrap();
5243 let col0_meta = metadata.row_group(0).column(0);
5244 let col1_meta = metadata.row_group(0).column(1);
5245
5246 let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
5247 let mut reader =
5248 SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
5249 let page = reader.get_next_page().unwrap().unwrap();
5250 match page {
5251 Page::DictionaryPage { buf, .. } => buf.len(),
5252 _ => panic!("expected DictionaryPage"),
5253 }
5254 };
5255
5256 assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
5257 assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
5258 }
5259
5260 #[test]
5261 fn test_arrow_writer_granular_mode_roundtrip() {
5262 let small = "tiny".to_string();
5271 let big = "x".repeat(64 * 1024);
5272 let strings: Vec<String> = (0..256)
5273 .map(|i| {
5274 if i % 16 == 0 {
5275 big.clone()
5276 } else {
5277 small.clone()
5278 }
5279 })
5280 .collect();
5281
5282 let schema = Arc::new(Schema::new(vec![Field::new(
5283 "col",
5284 ArrowDataType::Utf8,
5285 false,
5286 )]));
5287 let batch = RecordBatch::try_new(
5288 schema.clone(),
5289 vec![Arc::new(StringArray::from(strings.clone())) as _],
5290 )
5291 .unwrap();
5292
5293 let props = WriterProperties::builder()
5294 .set_dictionary_enabled(false)
5295 .set_data_page_size_limit(16 * 1024)
5296 .build();
5297 let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
5298 writer.write(&batch).unwrap();
5299 let data = Bytes::from(writer.into_inner().unwrap());
5300
5301 let mut reader = ParquetRecordBatchReader::try_new(data, 1024).unwrap();
5302 let read = reader.next().unwrap().unwrap();
5303 assert!(reader.next().is_none(), "expected one batch");
5304 let col = read
5305 .column(0)
5306 .as_any()
5307 .downcast_ref::<StringArray>()
5308 .unwrap();
5309 assert_eq!(col.len(), strings.len());
5310 for (i, expected) in strings.iter().enumerate() {
5311 assert_eq!(
5312 col.value(i),
5313 expected.as_str(),
5314 "value mismatch at index {i}"
5315 );
5316 }
5317 }
5318
5319 #[test]
5320 fn test_arrow_writer_all_null_string_column() {
5321 let num_rows = 1024;
5326 let schema = Arc::new(Schema::new(vec![Field::new(
5327 "col",
5328 ArrowDataType::Utf8,
5329 true,
5330 )]));
5331 let nulls: Vec<Option<&str>> = vec![None; num_rows];
5332 let batch = RecordBatch::try_new(
5333 schema.clone(),
5334 vec![Arc::new(StringArray::from(nulls)) as _],
5335 )
5336 .unwrap();
5337
5338 let props = WriterProperties::builder()
5339 .set_dictionary_enabled(false)
5340 .set_data_page_size_limit(16 * 1024)
5341 .build();
5342 let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
5343 writer.write(&batch).unwrap();
5344 let data = Bytes::from(writer.into_inner().unwrap());
5345
5346 let mut metadata = ParquetMetaDataReader::new();
5349 metadata.try_parse(&data).unwrap();
5350 let metadata = metadata.finish().unwrap();
5351 let row_group = metadata.row_group(0);
5352 let col_meta = row_group.column(0);
5353 assert_eq!(row_group.num_rows() as usize, num_rows);
5354 if let Some(stats) = col_meta.statistics() {
5357 assert_eq!(
5358 stats.null_count_opt().unwrap_or(0) as usize,
5359 num_rows,
5360 "expected all-null column to report null_count = num_rows"
5361 );
5362 }
5363
5364 let mut reader =
5365 SerializedPageReader::new(Arc::new(data.clone()), col_meta, num_rows, None).unwrap();
5366 let mut total_values = 0u32;
5367 while let Some(page) = reader.get_next_page().unwrap() {
5368 if matches!(page, Page::DataPage { .. } | Page::DataPageV2 { .. }) {
5369 total_values += page.num_values();
5370 }
5371 }
5372 assert_eq!(
5373 total_values as usize, num_rows,
5374 "expected every level position to be represented in some page"
5375 );
5376 }
5377
5378 struct WriteBatchesShape {
5379 num_batches: usize,
5380 rows_per_batch: usize,
5381 row_size: usize,
5382 }
5383
5384 fn write_batches(
5386 WriteBatchesShape {
5387 num_batches,
5388 rows_per_batch,
5389 row_size,
5390 }: WriteBatchesShape,
5391 props: WriterProperties,
5392 ) -> ParquetRecordBatchReaderBuilder<File> {
5393 let schema = Arc::new(Schema::new(vec![Field::new(
5394 "str",
5395 ArrowDataType::Utf8,
5396 false,
5397 )]));
5398 let file = tempfile::tempfile().unwrap();
5399 let mut writer =
5400 ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
5401
5402 for batch_idx in 0..num_batches {
5403 let strings: Vec<String> = (0..rows_per_batch)
5404 .map(|i| format!("{:0>width$}", batch_idx * 10 + i, width = row_size))
5405 .collect();
5406 let array = StringArray::from(strings);
5407 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
5408 writer.write(&batch).unwrap();
5409 }
5410 writer.close().unwrap();
5411 ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
5412 }
5413
5414 #[test]
5415 fn test_row_group_limit_none_writes_single_row_group() {
5417 let props = WriterProperties::builder()
5418 .set_max_row_group_row_count(None)
5419 .set_max_row_group_bytes(None)
5420 .build();
5421
5422 let builder = write_batches(
5423 WriteBatchesShape {
5424 num_batches: 1,
5425 rows_per_batch: 1000,
5426 row_size: 4,
5427 },
5428 props,
5429 );
5430
5431 assert_eq!(
5432 &row_group_sizes(builder.metadata()),
5433 &[1000],
5434 "With no limits, all rows should be in a single row group"
5435 );
5436 }
5437
5438 #[test]
5439 fn test_row_group_limit_rows_only() {
5441 let props = WriterProperties::builder()
5442 .set_max_row_group_row_count(Some(300))
5443 .set_max_row_group_bytes(None)
5444 .build();
5445
5446 let builder = write_batches(
5447 WriteBatchesShape {
5448 num_batches: 1,
5449 rows_per_batch: 1000,
5450 row_size: 4,
5451 },
5452 props,
5453 );
5454
5455 assert_eq!(
5456 &row_group_sizes(builder.metadata()),
5457 &[300, 300, 300, 100],
5458 "Row groups should be split by row count"
5459 );
5460 }
5461
5462 #[test]
5463 fn test_row_group_limit_bytes_only() {
5465 let props = WriterProperties::builder()
5466 .set_max_row_group_row_count(None)
5467 .set_max_row_group_bytes(Some(3500))
5469 .build();
5470
5471 let builder = write_batches(
5472 WriteBatchesShape {
5473 num_batches: 10,
5474 rows_per_batch: 10,
5475 row_size: 100,
5476 },
5477 props,
5478 );
5479
5480 let sizes = row_group_sizes(builder.metadata());
5481
5482 assert!(
5483 sizes.len() > 1,
5484 "Should have multiple row groups due to byte limit, got {sizes:?}",
5485 );
5486
5487 let total_rows: i64 = sizes.iter().sum();
5488 assert_eq!(total_rows, 100, "Total rows should be preserved");
5489 }
5490
5491 #[test]
5492 fn test_row_group_limit_bytes_flushes_when_current_group_already_too_large() {
5494 let schema = Arc::new(Schema::new(vec![Field::new(
5495 "str",
5496 ArrowDataType::Utf8,
5497 false,
5498 )]));
5499 let file = tempfile::tempfile().unwrap();
5500
5501 let props = WriterProperties::builder()
5503 .set_max_row_group_row_count(None)
5504 .set_max_row_group_bytes(None)
5505 .build();
5506 let mut writer =
5507 ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
5508
5509 let first_array = StringArray::from(
5510 (0..10)
5511 .map(|i| format!("{:0>100}", i))
5512 .collect::<Vec<String>>(),
5513 );
5514 let first_batch =
5515 RecordBatch::try_new(schema.clone(), vec![Arc::new(first_array)]).unwrap();
5516 writer.write(&first_batch).unwrap();
5517 assert_eq!(writer.in_progress_rows(), 10);
5518
5519 writer.max_row_group_bytes = Some(1);
5522
5523 let second_array = StringArray::from(vec!["x".to_string()]);
5524 let second_batch =
5525 RecordBatch::try_new(schema.clone(), vec![Arc::new(second_array)]).unwrap();
5526 writer.write(&second_batch).unwrap();
5527 writer.close().unwrap();
5528 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
5529
5530 assert_eq!(
5531 &row_group_sizes(builder.metadata()),
5532 &[10, 1],
5533 "The second write should flush an oversized in-progress row group first",
5534 );
5535 }
5536
5537 #[test]
5538 fn test_row_group_limit_both_row_wins_single_batch() {
5540 let props = WriterProperties::builder()
5541 .set_max_row_group_row_count(Some(200)) .set_max_row_group_bytes(Some(1024 * 1024)) .build();
5544
5545 let builder = write_batches(
5546 WriteBatchesShape {
5547 num_batches: 1,
5548 row_size: 4,
5549 rows_per_batch: 1000,
5550 },
5551 props,
5552 );
5553
5554 assert_eq!(
5555 &row_group_sizes(builder.metadata()),
5556 &[200, 200, 200, 200, 200],
5557 "Row limit should trigger before byte limit"
5558 );
5559 }
5560
5561 #[test]
5562 fn test_row_group_limit_both_row_wins_multiple_batches() {
5564 let props = WriterProperties::builder()
5565 .set_max_row_group_row_count(Some(5)) .set_max_row_group_bytes(Some(9999)) .build();
5568
5569 let builder = write_batches(
5570 WriteBatchesShape {
5571 num_batches: 10,
5572 rows_per_batch: 10,
5573 row_size: 100,
5574 },
5575 props,
5576 );
5577
5578 assert_eq!(
5579 &row_group_sizes(builder.metadata()),
5580 &[5; 20],
5581 "Row limit should trigger before byte limit"
5582 );
5583 }
5584
5585 #[test]
5586 fn test_row_group_limit_both_bytes_wins() {
5588 let props = WriterProperties::builder()
5589 .set_max_row_group_row_count(Some(1000)) .set_max_row_group_bytes(Some(3500)) .build();
5592
5593 let builder = write_batches(
5594 WriteBatchesShape {
5595 num_batches: 10,
5596 rows_per_batch: 10,
5597 row_size: 100,
5598 },
5599 props,
5600 );
5601
5602 let sizes = row_group_sizes(builder.metadata());
5603
5604 assert!(
5605 sizes.len() > 1,
5606 "Byte limit should trigger before row limit, got {sizes:?}",
5607 );
5608
5609 assert!(
5610 sizes.iter().all(|&s| s < 1000),
5611 "No row group should hit the row limit"
5612 );
5613
5614 let total_rows: i64 = sizes.iter().sum();
5615 assert_eq!(total_rows, 100, "Total rows should be preserved");
5616 }
5617
5618 #[test]
5619 fn arrow_column_chunk_close_mut_drops_column_index() {
5620 use crate::arrow::ArrowSchemaConverter;
5621 use crate::file::writer::SerializedFileWriter;
5622
5623 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
5624 let props = Arc::new(
5625 WriterProperties::builder()
5626 .set_statistics_enabled(EnabledStatistics::Page)
5627 .build(),
5628 );
5629 let parquet_schema = ArrowSchemaConverter::new()
5630 .with_coerce_types(props.coerce_types())
5631 .convert(&schema)
5632 .unwrap();
5633
5634 let mut buf = Vec::with_capacity(1024);
5635 let mut writer =
5636 SerializedFileWriter::new(&mut buf, parquet_schema.root_schema_ptr(), props.clone())
5637 .unwrap();
5638
5639 let factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));
5640 let mut col_writers = factory.create_column_writers(0).unwrap();
5641 let arr: ArrayRef = Arc::new(Int32Array::from_iter_values(0..64));
5642 for leaves in compute_leaves(schema.field(0), &arr).unwrap() {
5643 col_writers[0].write(&leaves).unwrap();
5644 }
5645 let mut chunk = col_writers.pop().unwrap().close().unwrap();
5646
5647 assert!(
5649 chunk.close().column_index.is_some(),
5650 "EnabledStatistics::Page should produce a column_index"
5651 );
5652
5653 chunk.close_mut().column_index = None;
5655 assert!(chunk.close().column_index.is_none());
5656
5657 let mut rg = writer.next_row_group().unwrap();
5658 chunk.append_to_row_group(&mut rg).unwrap();
5659 rg.close().unwrap();
5660 let file_meta = writer.close().unwrap();
5661
5662 let cc = file_meta.row_group(0).column(0);
5665 assert!(cc.column_index_range().is_none());
5666 }
5667}