1use crate::column::chunker::ContentDefinedChunker;
21
22use bytes::Bytes;
23use std::io::{Read, Write};
24use std::slice::Iter;
25use std::sync::{Arc, Mutex};
26use std::vec::IntoIter;
27
28use arrow_array::cast::AsArray;
29use arrow_array::types::*;
30use arrow_array::{ArrayRef, Int32Array, RecordBatch, RecordBatchWriter};
31use arrow_schema::{
32 ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef, TimeUnit,
33};
34
35use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
36
37use crate::arrow::ArrowSchemaConverter;
38use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
39use crate::basic::PageType;
40use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
41use crate::column::page_encryption::PageEncryptor;
42use crate::column::writer::encoder::ColumnValueEncoder;
43use crate::column::writer::{
44 ColumnCloseResult, ColumnWriter, GenericColumnWriter, get_column_writer,
45};
46use crate::data_type::{ByteArray, FixedLenByteArray};
47#[cfg(feature = "encryption")]
48use crate::encryption::encrypt::FileEncryptor;
49use crate::errors::{ParquetError, Result};
50use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
51use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
52use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
53use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
54use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
55use levels::{ArrayLevels, calculate_array_levels};
56
57mod byte_array;
58mod levels;
59
60#[doc(inline)]
61pub use crate::column::page_store::{
62 InMemoryPageStore, InMemoryPageStoreFactory, PageKey, PageStore, PageStoreArgs,
63 PageStoreFactory,
64};
65
/// Writes Arrow `RecordBatch`es to a Parquet file through a `SerializedFileWriter`.
///
/// Rows are buffered in an in-progress row group. That row group is handed to the
/// underlying file writer when a configured row-count or byte limit is reached, or
/// when `flush` is called explicitly.
pub struct ArrowWriter<W: Write> {
    /// Underlying Parquet file writer that receives completed row groups.
    writer: SerializedFileWriter<W>,

    /// Row group currently being buffered; `None` when nothing has been written
    /// since construction or since the last flush.
    in_progress: Option<ArrowRowGroupWriter>,

    /// Arrow schema supplied when the writer was constructed.
    arrow_schema: SchemaRef,

    /// Creates a new `ArrowRowGroupWriter` each time a row group is started.
    row_group_writer_factory: ArrowRowGroupWriterFactory,

    /// Row-count limit per row group, taken from the writer properties.
    max_row_group_row_count: Option<usize>,

    /// Estimated-byte limit per row group, taken from the writer properties.
    max_row_group_bytes: Option<usize>,

    /// One chunker per leaf column when content-defined chunking is configured
    /// in the writer properties; `None` otherwise.
    cdc_chunkers: Option<Vec<ContentDefinedChunker>>,
}
206
207impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
208 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209 let buffered_memory = self.in_progress_size();
210 f.debug_struct("ArrowWriter")
211 .field("writer", &self.writer)
212 .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
213 .field("in_progress_rows", &self.in_progress_rows())
214 .field("arrow_schema", &self.arrow_schema)
215 .field("max_row_group_row_count", &self.max_row_group_row_count)
216 .field("max_row_group_bytes", &self.max_row_group_bytes)
217 .finish()
218 }
219}
220
221impl<W: Write + Send> ArrowWriter<W> {
222 pub fn try_new(
228 writer: W,
229 arrow_schema: SchemaRef,
230 props: Option<WriterProperties>,
231 ) -> Result<Self> {
232 let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
233 Self::try_new_with_options(writer, arrow_schema, options)
234 }
235
236 pub fn try_new_with_options(
242 writer: W,
243 arrow_schema: SchemaRef,
244 options: ArrowWriterOptions,
245 ) -> Result<Self> {
246 let mut props = options.properties;
247
248 let schema = if let Some(parquet_schema) = options.schema_descr {
249 parquet_schema.clone()
250 } else {
251 let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
252 if let Some(schema_root) = &options.schema_root {
253 converter = converter.schema_root(schema_root);
254 }
255
256 converter.convert(&arrow_schema)?
257 };
258
259 if !options.skip_arrow_metadata {
260 add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
262 }
263
264 let max_row_group_row_count = props.max_row_group_row_count();
265 let max_row_group_bytes = props.max_row_group_bytes();
266
267 let props_ptr = Arc::new(props);
268 let file_writer =
269 SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
270
271 let mut row_group_writer_factory =
272 ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());
273 if let Some(page_store_factory) = options.page_store_factory {
274 row_group_writer_factory =
275 row_group_writer_factory.with_page_store_factory(page_store_factory);
276 }
277
278 let cdc_chunkers = props_ptr
279 .content_defined_chunking()
280 .map(|opts| {
281 file_writer
282 .schema_descr()
283 .columns()
284 .iter()
285 .map(|desc| ContentDefinedChunker::new(desc, opts))
286 .collect::<Result<Vec<_>>>()
287 })
288 .transpose()?;
289
290 Ok(Self {
291 writer: file_writer,
292 in_progress: None,
293 arrow_schema,
294 row_group_writer_factory,
295 max_row_group_row_count,
296 max_row_group_bytes,
297 cdc_chunkers,
298 })
299 }
300
301 pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
303 self.writer.flushed_row_groups()
304 }
305
306 pub fn memory_size(&self) -> usize {
311 match &self.in_progress {
312 Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
313 None => 0,
314 }
315 }
316
317 pub fn in_progress_size(&self) -> usize {
324 match &self.in_progress {
325 Some(in_progress) => in_progress
326 .writers
327 .iter()
328 .map(|x| x.get_estimated_total_bytes())
329 .sum(),
330 None => 0,
331 }
332 }
333
334 pub fn in_progress_rows(&self) -> usize {
336 self.in_progress
337 .as_ref()
338 .map(|x| x.buffered_rows)
339 .unwrap_or_default()
340 }
341
342 pub fn bytes_written(&self) -> usize {
344 self.writer.bytes_written()
345 }
346
347 pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
359 if batch.num_rows() == 0 {
360 return Ok(());
361 }
362
363 let in_progress = match &mut self.in_progress {
364 Some(in_progress) => in_progress,
365 x => x.insert(
366 self.row_group_writer_factory
367 .create_row_group_writer(self.writer.flushed_row_groups().len())?,
368 ),
369 };
370
371 if let Some(max_rows) = self.max_row_group_row_count {
372 if in_progress.buffered_rows + batch.num_rows() > max_rows {
373 let to_write = max_rows - in_progress.buffered_rows;
374 let a = batch.slice(0, to_write);
375 let b = batch.slice(to_write, batch.num_rows() - to_write);
376 self.write(&a)?;
377 return self.write(&b);
378 }
379 }
380
381 if let Some(max_bytes) = self.max_row_group_bytes {
384 if in_progress.buffered_rows > 0 {
385 let current_bytes = in_progress.get_estimated_total_bytes();
386
387 if current_bytes >= max_bytes {
388 self.flush()?;
389 return self.write(batch);
390 }
391
392 let avg_row_bytes = current_bytes / in_progress.buffered_rows;
393 if avg_row_bytes > 0 {
394 let remaining_bytes = max_bytes - current_bytes;
396 let rows_that_fit = remaining_bytes / avg_row_bytes;
397
398 if batch.num_rows() > rows_that_fit {
399 if rows_that_fit > 0 {
400 let a = batch.slice(0, rows_that_fit);
401 let b = batch.slice(rows_that_fit, batch.num_rows() - rows_that_fit);
402 self.write(&a)?;
403 return self.write(&b);
404 } else {
405 self.flush()?;
406 return self.write(batch);
407 }
408 }
409 }
410 }
411 }
412
413 match self.cdc_chunkers.as_mut() {
414 Some(chunkers) => in_progress.write_with_chunkers(batch, chunkers)?,
415 None => in_progress.write(batch)?,
416 }
417
418 let should_flush = self
419 .max_row_group_row_count
420 .is_some_and(|max| in_progress.buffered_rows >= max)
421 || self
422 .max_row_group_bytes
423 .is_some_and(|max| in_progress.get_estimated_total_bytes() >= max);
424
425 if should_flush {
426 self.flush()?
427 }
428 Ok(())
429 }
430
431 pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
436 self.writer.write_all(buf)
437 }
438
439 pub fn sync(&mut self) -> std::io::Result<()> {
441 self.writer.flush()
442 }
443
444 pub fn flush(&mut self) -> Result<()> {
449 let in_progress = match self.in_progress.take() {
450 Some(in_progress) => in_progress,
451 None => return Ok(()),
452 };
453
454 let mut row_group_writer = self.writer.next_row_group()?;
455 for chunk in in_progress.close()? {
456 chunk.append_to_row_group(&mut row_group_writer)?;
457 }
458 row_group_writer.close()?;
459 Ok(())
460 }
461
462 pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
466 self.writer.append_key_value_metadata(kv_metadata)
467 }
468
469 pub fn inner(&self) -> &W {
471 self.writer.inner()
472 }
473
474 pub fn inner_mut(&mut self) -> &mut W {
483 self.writer.inner_mut()
484 }
485
486 pub fn into_inner(mut self) -> Result<W> {
488 self.flush()?;
489 self.writer.into_inner()
490 }
491
492 pub fn finish(&mut self) -> Result<ParquetMetaData> {
498 self.flush()?;
499 self.writer.finish()
500 }
501
502 pub fn close(mut self) -> Result<ParquetMetaData> {
504 self.finish()
505 }
506
507 #[deprecated(
509 since = "56.2.0",
510 note = "Use `ArrowRowGroupWriterFactory` instead, see `ArrowColumnWriter` for an example"
511 )]
512 pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
513 self.flush()?;
514 let in_progress = self
515 .row_group_writer_factory
516 .create_row_group_writer(self.writer.flushed_row_groups().len())?;
517 Ok(in_progress.writers)
518 }
519
520 #[deprecated(
522 since = "56.2.0",
523 note = "Use `SerializedFileWriter` directly instead, see `ArrowColumnWriter` for an example"
524 )]
525 pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
526 let mut row_group_writer = self.writer.next_row_group()?;
527 for chunk in chunks {
528 chunk.append_to_row_group(&mut row_group_writer)?;
529 }
530 row_group_writer.close()?;
531 Ok(())
532 }
533
534 pub fn into_serialized_writer(
541 mut self,
542 ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
543 self.flush()?;
544 Ok((self.writer, self.row_group_writer_factory))
545 }
546}
547
548impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
549 fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
550 self.write(batch).map_err(|e| e.into())
551 }
552
553 fn close(self) -> std::result::Result<(), ArrowError> {
554 self.close()?;
555 Ok(())
556 }
557}
558
/// Options used to construct an `ArrowWriter` via `ArrowWriter::try_new_with_options`.
#[derive(Debug, Clone, Default)]
pub struct ArrowWriterOptions {
    /// Parquet writer properties.
    properties: WriterProperties,
    /// When `true`, the encoded Arrow schema is not added to the file metadata.
    skip_arrow_metadata: bool,
    /// Optional schema root name passed to the Arrow-to-Parquet schema converter.
    schema_root: Option<String>,
    /// Optional Parquet schema to use instead of converting the Arrow schema.
    schema_descr: Option<SchemaDescriptor>,
    /// Optional page store factory to use instead of the factory's default.
    page_store_factory: Option<Arc<dyn PageStoreFactory>>,
}
570
571impl ArrowWriterOptions {
572 pub fn new() -> Self {
574 Self::default()
575 }
576
577 pub fn with_properties(self, properties: WriterProperties) -> Self {
579 Self { properties, ..self }
580 }
581
582 pub fn with_page_store_factory(self, page_store_factory: Arc<dyn PageStoreFactory>) -> Self {
660 Self {
661 page_store_factory: Some(page_store_factory),
662 ..self
663 }
664 }
665
666 pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
673 Self {
674 skip_arrow_metadata,
675 ..self
676 }
677 }
678
679 pub fn with_schema_root(self, schema_root: String) -> Self {
681 Self {
682 schema_root: Some(schema_root),
683 ..self
684 }
685 }
686
687 pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
693 Self {
694 schema_descr: Some(schema_descr),
695 ..self
696 }
697 }
698}
699
/// Buffered encoded pages of a single column chunk.
struct ArrowColumnChunkData {
    /// Total number of bytes (headers plus page data) recorded by the page writer.
    length: usize,
    /// Store holding the non-dictionary page bytes.
    store: Box<dyn PageStore>,
    /// Keys returned by `store.put`, in the order the blobs were pushed.
    keys: Vec<PageKey>,
    /// Dictionary page blobs (header and data), kept directly in memory.
    dictionary: Vec<Bytes>,
}
721
722impl ArrowColumnChunkData {
723 fn new(store: Box<dyn PageStore>) -> Self {
724 Self {
725 length: 0,
726 store,
727 keys: Vec::new(),
728 dictionary: Vec::new(),
729 }
730 }
731
732 fn push(&mut self, value: Bytes) -> Result<()> {
735 let key = self.store.put(value)?;
736 self.keys.push(key);
737 Ok(())
738 }
739
740 fn push_dictionary(&mut self, value: Bytes) {
742 self.dictionary.push(value);
743 }
744
745 fn dictionary_len(&self) -> usize {
747 self.dictionary.iter().map(Bytes::len).sum()
748 }
749
750 fn memory_size(&self) -> usize {
754 self.store.memory_size() + self.dictionary_len()
755 }
756}
757
/// `Read` adapter that streams the bytes of an `ArrowColumnChunkData`:
/// dictionary blobs first, then the blobs taken from the page store in key order.
struct StreamingColumnChunkReader {
    /// Remaining dictionary blobs, yielded before any stored page.
    dictionary: IntoIter<Bytes>,
    /// Store the remaining page blobs are taken from.
    store: Box<dyn PageStore>,
    /// Keys of the pages not yet taken from `store`.
    keys: IntoIter<PageKey>,
    /// Unread remainder of the blob currently being streamed.
    current: Bytes,
}
774
775impl StreamingColumnChunkReader {
776 fn new(data: ArrowColumnChunkData) -> Self {
777 Self {
778 dictionary: data.dictionary.into_iter(),
779 store: data.store,
780 keys: data.keys.into_iter(),
781 current: Bytes::new(),
782 }
783 }
784}
785
786impl Read for StreamingColumnChunkReader {
787 fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
788 while self.current.is_empty() {
791 if let Some(blob) = self.dictionary.next() {
792 self.current = blob;
793 } else if let Some(key) = self.keys.next() {
794 self.current = self.store.take(key).map_err(std::io::Error::other)?;
795 } else {
796 return Ok(0);
797 }
798 }
799
800 let len = self.current.len().min(out.len());
801 let b = self.current.split_to(len);
802 out[..len].copy_from_slice(&b);
803 Ok(len)
804 }
805}
806
/// Column chunk data shared between an `ArrowPageWriter`, which appends pages to
/// it, and the `ArrowColumnWriter` that takes it back on close.
type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
812
/// `PageWriter` that buffers pages into a `SharedColumnChunk` instead of writing
/// them to a file.
struct ArrowPageWriter {
    /// Shared chunk data that receives every written page.
    buffer: SharedColumnChunk,
    /// Optional encryptor applied to pages and page headers.
    #[cfg(feature = "encryption")]
    page_encryptor: Option<PageEncryptor>,
}
818
819impl ArrowPageWriter {
820 fn new(store: Box<dyn PageStore>) -> Self {
822 Self {
823 buffer: Arc::new(Mutex::new(ArrowColumnChunkData::new(store))),
824 #[cfg(feature = "encryption")]
825 page_encryptor: None,
826 }
827 }
828
829 #[cfg(feature = "encryption")]
830 pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
831 self.page_encryptor = page_encryptor;
832 self
833 }
834
835 #[cfg(feature = "encryption")]
836 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
837 self.page_encryptor.as_mut()
838 }
839
840 #[cfg(not(feature = "encryption"))]
841 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
842 None
843 }
844}
845
846impl PageWriter for ArrowPageWriter {
847 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
848 let page = match self.page_encryptor_mut() {
849 Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
850 None => page,
851 };
852
853 let page_header = page.to_thrift_header()?;
854 let header = {
855 let mut header = Vec::with_capacity(1024);
856
857 match self.page_encryptor_mut() {
858 Some(page_encryptor) => {
859 page_encryptor.encrypt_page_header(&page_header, &mut header)?;
860 if page.compressed_page().is_data_page() {
861 page_encryptor.increment_page();
862 }
863 }
864 None => {
865 let mut protocol = ThriftCompactOutputProtocol::new(&mut header);
866 page_header.write_thrift(&mut protocol)?;
867 }
868 };
869
870 Bytes::from(header)
871 };
872
873 let mut buf = self.buffer.try_lock().unwrap();
874
875 let data = page.compressed_page().buffer().clone();
876 let compressed_size = data.len() + header.len();
877
878 let mut spec = PageWriteSpec::new();
879 spec.page_type = page.page_type();
880 spec.num_values = page.num_values();
881 spec.uncompressed_size = page.uncompressed_size() + header.len();
882 spec.offset = buf.length as u64;
883 spec.compressed_size = compressed_size;
884 spec.bytes_written = compressed_size as u64;
885
886 buf.length += compressed_size;
887 if spec.page_type == PageType::DICTIONARY_PAGE {
888 buf.push_dictionary(header);
891 buf.push_dictionary(data);
892 } else {
893 buf.push(header)?;
894 buf.push(data)?;
895 }
896
897 Ok(spec)
898 }
899
900 fn defers_dictionary_ordering(&self) -> bool {
901 true
906 }
907
908 fn buffered_memory_size(&self) -> usize {
909 self.buffer.try_lock().unwrap().memory_size()
912 }
913
914 fn close(&mut self) -> Result<()> {
915 Ok(())
916 }
917}
918
/// A single leaf column, wrapping the `ArrayLevels` computed for it by
/// `compute_leaves`.
#[derive(Debug)]
pub struct ArrowLeafColumn(ArrayLevels);
922
923pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
928 let levels = calculate_array_levels(array, field)?;
929 Ok(levels.into_iter().map(ArrowLeafColumn).collect())
930}
931
/// The encoded pages of one column chunk together with the result of closing its
/// column writer.
pub struct ArrowColumnChunk {
    /// Buffered page bytes of the chunk.
    data: ArrowColumnChunkData,
    /// Result returned when the column writer was closed.
    close: ColumnCloseResult,
}
937
938impl std::fmt::Debug for ArrowColumnChunk {
939 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
940 f.debug_struct("ArrowColumnChunk")
941 .field("length", &self.data.length)
942 .finish_non_exhaustive()
943 }
944}
945
946impl ArrowColumnChunk {
947 pub fn close(&self) -> &ColumnCloseResult {
954 &self.close
955 }
956
957 pub fn close_mut(&mut self) -> &mut ColumnCloseResult {
964 &mut self.close
965 }
966
967 pub fn append_to_row_group<W: Write + Send>(
970 self,
971 writer: &mut SerializedRowGroupWriter<'_, W>,
972 ) -> Result<()> {
973 let ArrowColumnChunk { data, close } = self;
974
975 let close = close.update_dictionary_location(data.dictionary_len())?;
979
980 let reader = StreamingColumnChunkReader::new(data);
981 writer.append_column_from_read(reader, close)
982 }
983}
984
/// Encodes `ArrowLeafColumn`s for a single leaf column into an `ArrowColumnChunk`.
pub struct ArrowColumnWriter {
    /// The underlying Parquet column writer.
    writer: ArrowColumnWriterImpl,
    /// Chunk data shared with the page writer owned by `writer`.
    chunk: SharedColumnChunk,
}
1086
1087impl std::fmt::Debug for ArrowColumnWriter {
1088 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1089 f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
1090 }
1091}
1092
/// The two kinds of column writer an `ArrowColumnWriter` can wrap.
enum ArrowColumnWriterImpl {
    /// Writer using the `ByteArrayEncoder`.
    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
    /// Generic typed Parquet column writer.
    Column(ColumnWriter<'static>),
}
1097
1098impl ArrowColumnWriter {
1099 pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
1101 self.write_internal(&col.0)
1102 }
1103
1104 fn write_with_chunker(
1106 &mut self,
1107 col: &ArrowLeafColumn,
1108 chunker: &mut ContentDefinedChunker,
1109 ) -> Result<()> {
1110 let levels = &col.0;
1111 let chunks = chunker.get_arrow_chunks(
1112 levels.def_level_data().as_ref(),
1113 levels.rep_level_data().as_ref(),
1114 levels.array(),
1115 )?;
1116
1117 let num_chunks = chunks.len();
1118 for (i, chunk) in chunks.iter().enumerate() {
1119 let chunk_levels = levels.slice_for_chunk(chunk);
1120 self.write_internal(&chunk_levels)?;
1121
1122 if i + 1 < num_chunks {
1124 match &mut self.writer {
1125 ArrowColumnWriterImpl::Column(c) => c.add_data_page()?,
1126 ArrowColumnWriterImpl::ByteArray(c) => c.add_data_page()?,
1127 }
1128 }
1129 }
1130 Ok(())
1131 }
1132
1133 fn write_internal(&mut self, levels: &ArrayLevels) -> Result<()> {
1134 match &mut self.writer {
1135 ArrowColumnWriterImpl::Column(c) => {
1136 let leaf = levels.array();
1137 match leaf.as_any_dictionary_opt() {
1138 Some(dictionary) => {
1139 let materialized =
1140 arrow_select::take::take(dictionary.values(), dictionary.keys(), None)?;
1141 write_leaf(c, &materialized, levels)?
1142 }
1143 None => write_leaf(c, leaf, levels)?,
1144 };
1145 }
1146 ArrowColumnWriterImpl::ByteArray(c) => {
1147 write_primitive(c, levels.array().as_ref(), levels)?;
1148 }
1149 }
1150 Ok(())
1151 }
1152
1153 pub fn close(self) -> Result<ArrowColumnChunk> {
1155 let close = match self.writer {
1156 ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
1157 ArrowColumnWriterImpl::Column(c) => c.close()?,
1158 };
1159 let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
1160 let data = chunk.into_inner().unwrap();
1161 Ok(ArrowColumnChunk { data, close })
1162 }
1163
1164 pub fn memory_size(&self) -> usize {
1175 match &self.writer {
1176 ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
1177 ArrowColumnWriterImpl::Column(c) => c.memory_size(),
1178 }
1179 }
1180
1181 pub fn get_estimated_total_bytes(&self) -> usize {
1189 match &self.writer {
1190 ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
1191 ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
1192 }
1193 }
1194}
1195
/// Buffers the column writers of one in-progress row group.
#[derive(Debug)]
struct ArrowRowGroupWriter {
    /// One writer per leaf column, consumed in order while writing a batch.
    writers: Vec<ArrowColumnWriter>,
    /// Arrow schema whose fields are zipped with the columns of each batch.
    schema: SchemaRef,
    /// Number of rows written to this row group so far.
    buffered_rows: usize,
}
1208
1209impl ArrowRowGroupWriter {
1210 fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
1211 Self {
1212 writers,
1213 schema: arrow.clone(),
1214 buffered_rows: 0,
1215 }
1216 }
1217
1218 fn write(&mut self, batch: &RecordBatch) -> Result<()> {
1219 self.buffered_rows += batch.num_rows();
1220 let mut writers = self.writers.iter_mut();
1221 for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
1222 for leaf in compute_leaves(field.as_ref(), column)? {
1223 writers.next().unwrap().write(&leaf)?;
1224 }
1225 }
1226 Ok(())
1227 }
1228
1229 fn write_with_chunkers(
1230 &mut self,
1231 batch: &RecordBatch,
1232 chunkers: &mut [ContentDefinedChunker],
1233 ) -> Result<()> {
1234 self.buffered_rows += batch.num_rows();
1235 let mut writers = self.writers.iter_mut();
1236 let mut chunkers = chunkers.iter_mut();
1237 for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
1238 for leaf in compute_leaves(field.as_ref(), column)? {
1239 writers
1240 .next()
1241 .unwrap()
1242 .write_with_chunker(&leaf, chunkers.next().unwrap())?;
1243 }
1244 }
1245 Ok(())
1246 }
1247
1248 fn get_estimated_total_bytes(&self) -> usize {
1250 self.writers
1251 .iter()
1252 .map(|x| x.get_estimated_total_bytes())
1253 .sum()
1254 }
1255
1256 fn close(self) -> Result<Vec<ArrowColumnChunk>> {
1257 self.writers
1258 .into_iter()
1259 .map(|writer| writer.close())
1260 .collect()
1261 }
1262}
1263
/// Creates the column writers for each new row group of a file.
#[derive(Debug)]
pub struct ArrowRowGroupWriterFactory {
    /// Parquet schema descriptor taken from the file writer.
    schema: SchemaDescPtr,
    /// Arrow schema whose fields determine which column writers are created.
    arrow_schema: SchemaRef,
    /// Writer properties taken from the file writer.
    props: WriterPropertiesPtr,
    /// Factory for the page store backing each column writer.
    page_store_factory: Arc<dyn PageStoreFactory>,
    /// File encryptor taken from the file writer, if any.
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
1277
1278impl ArrowRowGroupWriterFactory {
1279 pub fn new<W: Write + Send>(
1281 file_writer: &SerializedFileWriter<W>,
1282 arrow_schema: SchemaRef,
1283 ) -> Self {
1284 let schema = Arc::clone(file_writer.schema_descr_ptr());
1285 let props = Arc::clone(file_writer.properties());
1286 Self {
1287 schema,
1288 arrow_schema,
1289 props,
1290 page_store_factory: Arc::new(InMemoryPageStoreFactory),
1291 #[cfg(feature = "encryption")]
1292 file_encryptor: file_writer.file_encryptor(),
1293 }
1294 }
1295
1296 pub fn with_page_store_factory(
1300 mut self,
1301 page_store_factory: Arc<dyn PageStoreFactory>,
1302 ) -> Self {
1303 self.page_store_factory = page_store_factory;
1304 self
1305 }
1306
1307 fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
1308 let writers = self.create_column_writers(row_group_index)?;
1309 Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
1310 }
1311
1312 pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
1314 let mut writers = Vec::with_capacity(self.arrow_schema.fields.len());
1315 let mut leaves = self.schema.columns().iter();
1316 let column_factory = self.column_writer_factory(row_group_index);
1317 for field in &self.arrow_schema.fields {
1318 column_factory.get_arrow_column_writer(
1319 field.data_type(),
1320 &self.props,
1321 &mut leaves,
1322 &mut writers,
1323 )?;
1324 }
1325 Ok(writers)
1326 }
1327
1328 #[cfg(feature = "encryption")]
1329 fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
1330 ArrowColumnWriterFactory::new()
1331 .with_page_store_factory(self.page_store_factory.clone())
1332 .with_file_encryptor(row_group_idx, self.file_encryptor.clone())
1333 }
1334
1335 #[cfg(not(feature = "encryption"))]
1336 fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory {
1337 ArrowColumnWriterFactory::new().with_page_store_factory(self.page_store_factory.clone())
1338 }
1339}
1340
1341#[deprecated(since = "57.0.0", note = "Use `ArrowRowGroupWriterFactory` instead")]
1343pub fn get_column_writers(
1344 parquet: &SchemaDescriptor,
1345 props: &WriterPropertiesPtr,
1346 arrow: &SchemaRef,
1347) -> Result<Vec<ArrowColumnWriter>> {
1348 let mut writers = Vec::with_capacity(arrow.fields.len());
1349 let mut leaves = parquet.columns().iter();
1350 let column_factory = ArrowColumnWriterFactory::new();
1351 for field in &arrow.fields {
1352 column_factory.get_arrow_column_writer(
1353 field.data_type(),
1354 props,
1355 &mut leaves,
1356 &mut writers,
1357 )?;
1358 }
1359 Ok(writers)
1360}
1361
/// Creates `ArrowColumnWriter`s for the leaf columns of an Arrow data type.
struct ArrowColumnWriterFactory {
    /// Factory for the page store backing each page writer.
    page_store_factory: Arc<dyn PageStoreFactory>,
    /// Row group index passed to the page encryptor.
    #[cfg(feature = "encryption")]
    row_group_index: usize,
    /// File encryptor used to create page encryptors, if any.
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
1371
1372impl ArrowColumnWriterFactory {
1373 pub fn new() -> Self {
1374 Self {
1375 page_store_factory: Arc::new(InMemoryPageStoreFactory),
1376 #[cfg(feature = "encryption")]
1377 row_group_index: 0,
1378 #[cfg(feature = "encryption")]
1379 file_encryptor: None,
1380 }
1381 }
1382
1383 pub fn with_page_store_factory(
1385 mut self,
1386 page_store_factory: Arc<dyn PageStoreFactory>,
1387 ) -> Self {
1388 self.page_store_factory = page_store_factory;
1389 self
1390 }
1391
1392 #[cfg(feature = "encryption")]
1393 pub fn with_file_encryptor(
1394 mut self,
1395 row_group_index: usize,
1396 file_encryptor: Option<Arc<FileEncryptor>>,
1397 ) -> Self {
1398 self.row_group_index = row_group_index;
1399 self.file_encryptor = file_encryptor;
1400 self
1401 }
1402
1403 #[cfg(feature = "encryption")]
1404 fn create_page_writer(
1405 &self,
1406 column_descriptor: &ColumnDescPtr,
1407 column_index: usize,
1408 ) -> Result<Box<ArrowPageWriter>> {
1409 let column_path = column_descriptor.path().string();
1410 let page_encryptor = PageEncryptor::create_if_column_encrypted(
1411 &self.file_encryptor,
1412 self.row_group_index,
1413 column_index,
1414 &column_path,
1415 )?;
1416 let args = PageStoreArgs::new(column_index, column_descriptor);
1417 let store = self.page_store_factory.create(&args)?;
1418 Ok(Box::new(
1419 ArrowPageWriter::new(store).with_encryptor(page_encryptor),
1420 ))
1421 }
1422
1423 #[cfg(not(feature = "encryption"))]
1424 fn create_page_writer(
1425 &self,
1426 column_descriptor: &ColumnDescPtr,
1427 column_index: usize,
1428 ) -> Result<Box<ArrowPageWriter>> {
1429 let args = PageStoreArgs::new(column_index, column_descriptor);
1430 let store = self.page_store_factory.create(&args)?;
1431 Ok(Box::new(ArrowPageWriter::new(store)))
1432 }
1433
1434 fn get_arrow_column_writer(
1437 &self,
1438 data_type: &ArrowDataType,
1439 props: &WriterPropertiesPtr,
1440 leaves: &mut Iter<'_, ColumnDescPtr>,
1441 out: &mut Vec<ArrowColumnWriter>,
1442 ) -> Result<()> {
1443 let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1445 let page_writer = self.create_page_writer(desc, out.len())?;
1446 let chunk = page_writer.buffer.clone();
1447 let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
1448 Ok(ArrowColumnWriter {
1449 chunk,
1450 writer: ArrowColumnWriterImpl::Column(writer),
1451 })
1452 };
1453
1454 let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1456 let page_writer = self.create_page_writer(desc, out.len())?;
1457 let chunk = page_writer.buffer.clone();
1458 let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
1459 Ok(ArrowColumnWriter {
1460 chunk,
1461 writer: ArrowColumnWriterImpl::ByteArray(writer),
1462 })
1463 };
1464
1465 match data_type {
1466 _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
1467 ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
1468 out.push(col(leaves.next().unwrap())?)
1469 }
1470 ArrowDataType::LargeBinary
1471 | ArrowDataType::Binary
1472 | ArrowDataType::Utf8
1473 | ArrowDataType::LargeUtf8
1474 | ArrowDataType::BinaryView
1475 | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
1476 ArrowDataType::List(f)
1477 | ArrowDataType::LargeList(f)
1478 | ArrowDataType::FixedSizeList(f, _)
1479 | ArrowDataType::ListView(f)
1480 | ArrowDataType::LargeListView(f) => {
1481 self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
1482 }
1483 ArrowDataType::Struct(fields) => {
1484 for field in fields {
1485 self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
1486 }
1487 }
1488 ArrowDataType::Map(f, _) => match f.data_type() {
1489 ArrowDataType::Struct(f) => {
1490 self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
1491 self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
1492 }
1493 _ => unreachable!("invalid map type"),
1494 },
1495 ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1496 ArrowDataType::Utf8
1497 | ArrowDataType::LargeUtf8
1498 | ArrowDataType::Binary
1499 | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
1500 ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
1501 out.push(bytes(leaves.next().unwrap())?)
1502 }
1503 ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
1504 _ => out.push(col(leaves.next().unwrap())?),
1505 },
1506 ArrowDataType::RunEndEncoded(_, value_field) => {
1507 self.get_arrow_column_writer(value_field.data_type(), props, leaves, out)?
1508 }
1509 _ => {
1510 return Err(ParquetError::NYI(format!(
1511 "Attempting to write an Arrow type {data_type} to parquet that is not yet implemented"
1512 )));
1513 }
1514 }
1515 Ok(())
1516 }
1517}
1518
1519fn write_leaf(
1520 writer: &mut ColumnWriter<'_>,
1521 column: &dyn arrow_array::Array,
1522 levels: &ArrayLevels,
1523) -> Result<usize> {
1524 let indices = levels.non_null_indices();
1525
1526 match writer {
1527 ColumnWriter::Int32ColumnWriter(typed) => {
1529 match column.data_type() {
1530 ArrowDataType::Null => {
1531 let array = Int32Array::new_null(column.len());
1532 write_primitive(typed, array.values(), levels)
1533 }
1534 ArrowDataType::Int8 => {
1535 let array: Int32Array = column.as_primitive::<Int8Type>().unary(|x| x as i32);
1536 write_primitive(typed, array.values(), levels)
1537 }
1538 ArrowDataType::Int16 => {
1539 let array: Int32Array = column.as_primitive::<Int16Type>().unary(|x| x as i32);
1540 write_primitive(typed, array.values(), levels)
1541 }
1542 ArrowDataType::Int32 => {
1543 write_primitive(typed, column.as_primitive::<Int32Type>().values(), levels)
1544 }
1545 ArrowDataType::UInt8 => {
1546 let array: Int32Array = column.as_primitive::<UInt8Type>().unary(|x| x as i32);
1547 write_primitive(typed, array.values(), levels)
1548 }
1549 ArrowDataType::UInt16 => {
1550 let array: Int32Array = column.as_primitive::<UInt16Type>().unary(|x| x as i32);
1551 write_primitive(typed, array.values(), levels)
1552 }
1553 ArrowDataType::UInt32 => {
1554 let array = column.as_primitive::<UInt32Type>();
1557 write_primitive(typed, array.values().inner().typed_data(), levels)
1558 }
1559 ArrowDataType::Date32 => {
1560 let array = column.as_primitive::<Date32Type>();
1561 write_primitive(typed, array.values(), levels)
1562 }
1563 ArrowDataType::Time32(TimeUnit::Second) => {
1564 let array = column.as_primitive::<Time32SecondType>();
1565 write_primitive(typed, array.values(), levels)
1566 }
1567 ArrowDataType::Time32(TimeUnit::Millisecond) => {
1568 let array = column.as_primitive::<Time32MillisecondType>();
1569 write_primitive(typed, array.values(), levels)
1570 }
1571 ArrowDataType::Date64 => {
1572 let array: Int32Array = column
1574 .as_primitive::<Date64Type>()
1575 .unary(|x| (x / 86_400_000) as _);
1576
1577 write_primitive(typed, array.values(), levels)
1578 }
1579 ArrowDataType::Decimal32(_, _) => {
1580 let array = column
1581 .as_primitive::<Decimal32Type>()
1582 .unary::<_, Int32Type>(|v| v);
1583 write_primitive(typed, array.values(), levels)
1584 }
1585 ArrowDataType::Decimal64(_, _) => {
1586 let array = column
1588 .as_primitive::<Decimal64Type>()
1589 .unary::<_, Int32Type>(|v| v as i32);
1590 write_primitive(typed, array.values(), levels)
1591 }
1592 ArrowDataType::Decimal128(_, _) => {
1593 let array = column
1595 .as_primitive::<Decimal128Type>()
1596 .unary::<_, Int32Type>(|v| v as i32);
1597 write_primitive(typed, array.values(), levels)
1598 }
1599 ArrowDataType::Decimal256(_, _) => {
1600 let array = column
1602 .as_primitive::<Decimal256Type>()
1603 .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1604 write_primitive(typed, array.values(), levels)
1605 }
1606 d => Err(ParquetError::General(format!("Cannot coerce {d} to I32"))),
1607 }
1608 }
1609 ColumnWriter::BoolColumnWriter(typed) => {
1610 let array = column.as_boolean();
1611 let values = get_bool_array_slice(array, indices.iter().copied());
1612 typed.write_batch_internal(
1613 values.as_slice(),
1614 None,
1615 levels.def_level_data().as_ref(),
1616 levels.rep_level_data().as_ref(),
1617 None,
1618 None,
1619 None,
1620 )
1621 }
1622 ColumnWriter::Int64ColumnWriter(typed) => {
1623 match column.data_type() {
1624 ArrowDataType::Date64 => {
1625 let array = column
1626 .as_primitive::<Date64Type>()
1627 .reinterpret_cast::<Int64Type>();
1628
1629 write_primitive(typed, array.values(), levels)
1630 }
1631 ArrowDataType::Int64 => {
1632 let array = column.as_primitive::<Int64Type>();
1633 write_primitive(typed, array.values(), levels)
1634 }
1635 ArrowDataType::UInt64 => {
1636 let values = column.as_primitive::<UInt64Type>().values();
1637 let array = values.inner().typed_data::<i64>();
1640 write_primitive(typed, array, levels)
1641 }
1642 ArrowDataType::Time64(TimeUnit::Microsecond) => {
1643 let array = column.as_primitive::<Time64MicrosecondType>();
1644 write_primitive(typed, array.values(), levels)
1645 }
1646 ArrowDataType::Time64(TimeUnit::Nanosecond) => {
1647 let array = column.as_primitive::<Time64NanosecondType>();
1648 write_primitive(typed, array.values(), levels)
1649 }
1650 ArrowDataType::Timestamp(unit, _) => match unit {
1651 TimeUnit::Second => {
1652 let array = column.as_primitive::<TimestampSecondType>();
1653 write_primitive(typed, array.values(), levels)
1654 }
1655 TimeUnit::Millisecond => {
1656 let array = column.as_primitive::<TimestampMillisecondType>();
1657 write_primitive(typed, array.values(), levels)
1658 }
1659 TimeUnit::Microsecond => {
1660 let array = column.as_primitive::<TimestampMicrosecondType>();
1661 write_primitive(typed, array.values(), levels)
1662 }
1663 TimeUnit::Nanosecond => {
1664 let array = column.as_primitive::<TimestampNanosecondType>();
1665 write_primitive(typed, array.values(), levels)
1666 }
1667 },
1668 ArrowDataType::Duration(unit) => match unit {
1669 TimeUnit::Second => {
1670 let array = column.as_primitive::<DurationSecondType>();
1671 write_primitive(typed, array.values(), levels)
1672 }
1673 TimeUnit::Millisecond => {
1674 let array = column.as_primitive::<DurationMillisecondType>();
1675 write_primitive(typed, array.values(), levels)
1676 }
1677 TimeUnit::Microsecond => {
1678 let array = column.as_primitive::<DurationMicrosecondType>();
1679 write_primitive(typed, array.values(), levels)
1680 }
1681 TimeUnit::Nanosecond => {
1682 let array = column.as_primitive::<DurationNanosecondType>();
1683 write_primitive(typed, array.values(), levels)
1684 }
1685 },
1686 ArrowDataType::Decimal64(_, _) => {
1687 let array = column
1688 .as_primitive::<Decimal64Type>()
1689 .reinterpret_cast::<Int64Type>();
1690 write_primitive(typed, array.values(), levels)
1691 }
1692 ArrowDataType::Decimal128(_, _) => {
1693 let array = column
1695 .as_primitive::<Decimal128Type>()
1696 .unary::<_, Int64Type>(|v| v as i64);
1697 write_primitive(typed, array.values(), levels)
1698 }
1699 ArrowDataType::Decimal256(_, _) => {
1700 let array = column
1702 .as_primitive::<Decimal256Type>()
1703 .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1704 write_primitive(typed, array.values(), levels)
1705 }
1706 d => Err(ParquetError::General(format!("Cannot coerce {d} to I64"))),
1707 }
1708 }
1709 ColumnWriter::Int96ColumnWriter(_typed) => {
1710 unreachable!("Currently unreachable because data type not supported")
1711 }
1712 ColumnWriter::FloatColumnWriter(typed) => {
1713 let array = column.as_primitive::<Float32Type>();
1714 write_primitive(typed, array.values(), levels)
1715 }
1716 ColumnWriter::DoubleColumnWriter(typed) => {
1717 let array = column.as_primitive::<Float64Type>();
1718 write_primitive(typed, array.values(), levels)
1719 }
1720 ColumnWriter::ByteArrayColumnWriter(_) => {
1721 unreachable!("should use ByteArrayWriter")
1722 }
1723 ColumnWriter::FixedLenByteArrayColumnWriter(typed) => {
1724 let bytes = match column.data_type() {
1725 ArrowDataType::Interval(interval_unit) => match interval_unit {
1726 IntervalUnit::YearMonth => {
1727 let array = column.as_primitive::<IntervalYearMonthType>();
1728 get_interval_ym_array_slice(array, indices.iter().copied())
1729 }
1730 IntervalUnit::DayTime => {
1731 let array = column.as_primitive::<IntervalDayTimeType>();
1732 get_interval_dt_array_slice(array, indices.iter().copied())
1733 }
1734 _ => {
1735 return Err(ParquetError::NYI(format!(
1736 "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
1737 )));
1738 }
1739 },
1740 ArrowDataType::FixedSizeBinary(_) => {
1741 let array = column.as_fixed_size_binary();
1742 get_fsb_array_slice(array, indices.iter().copied())
1743 }
1744 ArrowDataType::Decimal32(_, _) => {
1745 let array = column.as_primitive::<Decimal32Type>();
1746 get_decimal_32_array_slice(array, indices.iter().copied())
1747 }
1748 ArrowDataType::Decimal64(_, _) => {
1749 let array = column.as_primitive::<Decimal64Type>();
1750 get_decimal_64_array_slice(array, indices.iter().copied())
1751 }
1752 ArrowDataType::Decimal128(_, _) => {
1753 let array = column.as_primitive::<Decimal128Type>();
1754 get_decimal_128_array_slice(array, indices.iter().copied())
1755 }
1756 ArrowDataType::Decimal256(_, _) => {
1757 let array = column.as_primitive::<Decimal256Type>();
1758 get_decimal_256_array_slice(array, indices.iter().copied())
1759 }
1760 ArrowDataType::Float16 => {
1761 let array = column.as_primitive::<Float16Type>();
1762 get_float_16_array_slice(array, indices.iter().copied())
1763 }
1764 _ => {
1765 return Err(ParquetError::NYI(
1766 "Attempting to write an Arrow type that is not yet implemented".to_string(),
1767 ));
1768 }
1769 };
1770 typed.write_batch_internal(
1771 bytes.as_slice(),
1772 None,
1773 levels.def_level_data().as_ref(),
1774 levels.rep_level_data().as_ref(),
1775 None,
1776 None,
1777 None,
1778 )
1779 }
1780 }
1781}
1782
1783fn write_primitive<E: ColumnValueEncoder>(
1784 writer: &mut GenericColumnWriter<E>,
1785 values: &E::Values,
1786 levels: &ArrayLevels,
1787) -> Result<usize> {
1788 writer.write_batch_internal(
1789 values,
1790 Some(levels.non_null_indices()),
1791 levels.def_level_data().as_ref(),
1792 levels.rep_level_data().as_ref(),
1793 None,
1794 None,
1795 None,
1796 )
1797}
1798
1799fn get_bool_array_slice(
1800 array: &arrow_array::BooleanArray,
1801 indices: impl ExactSizeIterator<Item = usize>,
1802) -> Vec<bool> {
1803 let mut values = Vec::with_capacity(indices.len());
1804 for i in indices {
1805 values.push(array.value(i))
1806 }
1807 values
1808}
1809
1810fn get_interval_ym_array_slice(
1813 array: &arrow_array::IntervalYearMonthArray,
1814 indices: impl ExactSizeIterator<Item = usize>,
1815) -> Vec<FixedLenByteArray> {
1816 let mut values = Vec::with_capacity(indices.len());
1817 for i in indices {
1818 let mut value = array.value(i).to_le_bytes().to_vec();
1819 let mut suffix = vec![0; 8];
1820 value.append(&mut suffix);
1821 values.push(FixedLenByteArray::from(ByteArray::from(value)))
1822 }
1823 values
1824}
1825
1826fn get_interval_dt_array_slice(
1829 array: &arrow_array::IntervalDayTimeArray,
1830 indices: impl ExactSizeIterator<Item = usize>,
1831) -> Vec<FixedLenByteArray> {
1832 let mut values = Vec::with_capacity(indices.len());
1833 for i in indices {
1834 let mut out = [0; 12];
1835 let value = array.value(i);
1836 out[4..8].copy_from_slice(&value.days.to_le_bytes());
1837 out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1838 values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1839 }
1840 values
1841}
1842
1843fn get_decimal_32_array_slice(
1844 array: &arrow_array::Decimal32Array,
1845 indices: impl ExactSizeIterator<Item = usize>,
1846) -> Vec<FixedLenByteArray> {
1847 let mut values = Vec::with_capacity(indices.len());
1848 let size = decimal_length_from_precision(array.precision());
1849 for i in indices {
1850 let as_be_bytes = array.value(i).to_be_bytes();
1851 let resized_value = as_be_bytes[(4 - size)..].to_vec();
1852 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1853 }
1854 values
1855}
1856
1857fn get_decimal_64_array_slice(
1858 array: &arrow_array::Decimal64Array,
1859 indices: impl ExactSizeIterator<Item = usize>,
1860) -> Vec<FixedLenByteArray> {
1861 let mut values = Vec::with_capacity(indices.len());
1862 let size = decimal_length_from_precision(array.precision());
1863 for i in indices {
1864 let as_be_bytes = array.value(i).to_be_bytes();
1865 let resized_value = as_be_bytes[(8 - size)..].to_vec();
1866 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1867 }
1868 values
1869}
1870
1871fn get_decimal_128_array_slice(
1872 array: &arrow_array::Decimal128Array,
1873 indices: impl ExactSizeIterator<Item = usize>,
1874) -> Vec<FixedLenByteArray> {
1875 let mut values = Vec::with_capacity(indices.len());
1876 let size = decimal_length_from_precision(array.precision());
1877 for i in indices {
1878 let as_be_bytes = array.value(i).to_be_bytes();
1879 let resized_value = as_be_bytes[(16 - size)..].to_vec();
1880 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1881 }
1882 values
1883}
1884
1885fn get_decimal_256_array_slice(
1886 array: &arrow_array::Decimal256Array,
1887 indices: impl ExactSizeIterator<Item = usize>,
1888) -> Vec<FixedLenByteArray> {
1889 let mut values = Vec::with_capacity(indices.len());
1890 let size = decimal_length_from_precision(array.precision());
1891 for i in indices {
1892 let as_be_bytes = array.value(i).to_be_bytes();
1893 let resized_value = as_be_bytes[(32 - size)..].to_vec();
1894 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1895 }
1896 values
1897}
1898
1899fn get_float_16_array_slice(
1900 array: &arrow_array::Float16Array,
1901 indices: impl ExactSizeIterator<Item = usize>,
1902) -> Vec<FixedLenByteArray> {
1903 let mut values = Vec::with_capacity(indices.len());
1904 for i in indices {
1905 let value = array.value(i).to_le_bytes().to_vec();
1906 values.push(FixedLenByteArray::from(ByteArray::from(value)));
1907 }
1908 values
1909}
1910
1911fn get_fsb_array_slice(
1912 array: &arrow_array::FixedSizeBinaryArray,
1913 indices: impl ExactSizeIterator<Item = usize>,
1914) -> Vec<FixedLenByteArray> {
1915 let mut values = Vec::with_capacity(indices.len());
1916 for i in indices {
1917 let value = array.value(i).to_vec();
1918 values.push(FixedLenByteArray::from(ByteArray::from(value)))
1919 }
1920 values
1921}
1922
1923#[cfg(test)]
1924mod tests {
1925 use super::*;
1926 use std::collections::HashMap;
1927
1928 use std::fs::File;
1929
1930 use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1931 use crate::arrow::{ARROW_SCHEMA_META_KEY, PARQUET_FIELD_ID_META_KEY};
1932 use crate::column::page::{Page, PageReader};
1933 use crate::file::metadata::thrift::PageHeader;
1934 use crate::file::page_index::column_index::ColumnIndexMetaData;
1935 use crate::file::reader::SerializedPageReader;
1936 use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
1937 use crate::schema::types::ColumnPath;
1938 use arrow::datatypes::ToByteSlice;
1939 use arrow::datatypes::{DataType, Schema};
1940 use arrow::error::Result as ArrowResult;
1941 use arrow::util::data_gen::create_random_array;
1942 use arrow::util::pretty::pretty_format_batches;
1943 use arrow::{array::*, buffer::Buffer};
1944 use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer, OffsetBuffer, i256};
1945 use arrow_schema::Fields;
1946 use half::f16;
1947 use num_traits::{FromPrimitive, ToPrimitive};
1948 use tempfile::tempfile;
1949
1950 use crate::basic::Encoding;
1951 use crate::data_type::AsBytes;
1952 use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
1953 use crate::file::properties::{
1954 BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1955 };
1956 use crate::file::serialized_reader::ReadOptionsBuilder;
1957 use crate::file::{
1958 reader::{FileReader, SerializedFileReader},
1959 statistics::Statistics,
1960 };
1961
1962 #[derive(Debug, Default)]
1967 struct RecordingPageStore {
1968 next: u64,
1969 blobs: HashMap<u64, Bytes>,
1970 puts: Arc<std::sync::atomic::AtomicUsize>,
1971 }
1972
1973 impl PageStore for RecordingPageStore {
1974 fn put(&mut self, value: Bytes) -> Result<PageKey> {
1975 let id = 100 + self.next * 7;
1977 self.next += 1;
1978 self.puts.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1979 self.blobs.insert(id, value);
1980 Ok(PageKey::new(id))
1981 }
1982
1983 fn take(&mut self, key: PageKey) -> Result<Bytes> {
1984 self.blobs
1985 .remove(&key.get())
1986 .ok_or_else(|| ParquetError::General(format!("missing key {}", key.get())))
1987 }
1988 }
1989
1990 #[derive(Debug)]
1991 struct RecordingPageStoreFactory {
1992 puts: Arc<std::sync::atomic::AtomicUsize>,
1993 }
1994
1995 impl PageStoreFactory for RecordingPageStoreFactory {
1996 fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
1997 Ok(Box::new(RecordingPageStore {
1998 puts: self.puts.clone(),
1999 ..Default::default()
2000 }))
2001 }
2002 }
2003
#[test]
fn custom_page_store_is_byte_identical_to_default() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Two nullable columns with six rows each.
    let schema = Arc::new(Schema::new(vec![
        Field::new("i", DataType::Int32, true),
        Field::new("s", DataType::Utf8, true),
    ]));
    let int_column: ArrayRef = Arc::new(Int32Array::from(vec![
        Some(1),
        None,
        Some(3),
        Some(4),
        Some(5),
        Some(6),
    ]));
    let string_column: ArrayRef = Arc::new(StringArray::from(vec![
        Some("a"),
        Some("bb"),
        Some("a"),
        None,
        Some("bb"),
        Some("ccc"),
    ]));
    let batch = RecordBatch::try_new(schema.clone(), vec![int_column, string_column]).unwrap();

    // A row-count cap of 3 is configured for row groups, against the 6 rows written.
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(3))
        .build();

    // Writes `batch` to an in-memory buffer, optionally through a custom page store factory.
    let write_with = |factory: Option<Arc<dyn PageStoreFactory>>| -> Vec<u8> {
        let base = ArrowWriterOptions::new().with_properties(props.clone());
        let opts = match factory {
            Some(factory) => base.with_page_store_factory(factory),
            None => base,
        };
        let mut out = Vec::new();
        let mut writer = ArrowWriter::try_new_with_options(&mut out, schema.clone(), opts).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        out
    };

    let default_bytes = write_with(None);

    let put_count = Arc::new(AtomicUsize::new(0));
    let recording: Arc<dyn PageStoreFactory> = Arc::new(RecordingPageStoreFactory {
        puts: Arc::clone(&put_count),
    });
    let custom_bytes = write_with(Some(recording));

    // The custom store must actually have been exercised for the comparison to mean anything.
    assert!(
        put_count.load(Ordering::Relaxed) > 0,
        "custom PageStore was never written to"
    );
    assert_eq!(
        default_bytes, custom_bytes,
        "a custom PageStore must produce byte-identical output to the default"
    );
}
2060
#[test]
fn dictionary_column_round_trips_with_offset_index_disabled() {
    // 50,000 rows cycling through only eight distinct values.
    let expected: Vec<Option<i32>> = (0..50_000).map(|n| Some(n % 8)).collect();
    let column: ArrayRef = Arc::new(Int32Array::from(expected.clone()));

    let schema = Arc::new(Schema::new(vec![Field::new("k", DataType::Int32, true)]));
    let batch = RecordBatch::try_new(schema.clone(), vec![column]).unwrap();

    let opts = ArrowWriterOptions::new().with_properties(
        WriterProperties::builder()
            .set_offset_index_disabled(true)
            .set_data_page_row_count_limit(4096)
            .build(),
    );

    let mut encoded = Vec::new();
    {
        let mut writer =
            ArrowWriter::try_new_with_options(&mut encoded, schema.clone(), opts).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
    }

    // Read everything back and flatten the single column across all returned batches.
    let reader = ParquetRecordBatchReader::try_new(Bytes::from(encoded), expected.len()).unwrap();
    let mut actual: Vec<Option<i32>> = Vec::new();
    for decoded in reader {
        let decoded = decoded.unwrap();
        actual.extend(decoded.column(0).as_primitive::<Int32Type>().iter());
    }
    assert_eq!(actual, expected);
}
2096
#[test]
fn arrow_writer() {
    // One required and one nullable Int32 column, five rows each.
    let required: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let optional: ArrayRef = Arc::new(Int32Array::from(vec![
        Some(1),
        None,
        None,
        Some(4),
        Some(5),
    ]));

    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, true),
    ]));
    let batch = RecordBatch::try_new(schema, vec![required, optional]).unwrap();

    roundtrip(batch, Some(SMALL_SIZE / 2));
}
2114
2115 fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
2116 let mut buffer = vec![];
2117
2118 let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
2119 writer.write(expected_batch).unwrap();
2120 writer.close().unwrap();
2121
2122 buffer
2123 }
2124
2125 fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
2126 let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
2127 writer.write(expected_batch).unwrap();
2128 writer.into_inner().unwrap()
2129 }
2130
#[test]
fn roundtrip_bytes() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, true),
    ]));

    let required: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let optional: ArrayRef = Arc::new(Int32Array::from(vec![
        Some(1),
        None,
        None,
        Some(4),
        Some(5),
    ]));
    let expected_batch = RecordBatch::try_new(schema.clone(), vec![required, optional]).unwrap();

    // Both ways of obtaining the written bytes must decode to the same batch.
    let encodings = [
        get_bytes_after_close(schema.clone(), &expected_batch),
        get_bytes_by_into_inner(schema, &expected_batch),
    ];
    for encoded in encodings {
        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(encoded), 1024).unwrap();

        let actual_batch = reader
            .next()
            .expect("No batch found")
            .expect("Unable to get batch");

        assert_eq!(expected_batch.schema(), actual_batch.schema());
        assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
        assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());

        // Column counts were just asserted equal, so zipping visits every column pair.
        let column_pairs = expected_batch.columns().iter().zip(actual_batch.columns());
        for (expected_column, actual_column) in column_pairs {
            assert_eq!(expected_column.to_data(), actual_column.to_data());
        }
    }
}
2170
#[test]
fn arrow_writer_non_null() {
    // A single required Int32 column.
    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    roundtrip(batch, Some(SMALL_SIZE / 2));
}
2184
#[test]
fn arrow_writer_list() {
    // Nullable list column whose Int32 items are declared non-nullable.
    let list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false)));
    let schema = Arc::new(Schema::new(vec![Field::new("a", list_type.clone(), true)]));

    let child = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).into_data();

    // Offsets delimiting five lists over the ten child values; the third list is empty.
    let offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
    // Validity bitmap with bit 2 cleared, marking the third list as null.
    let validity = Buffer::from([0b00011011]);

    let list_data = ArrayData::builder(list_type)
        .len(5)
        .add_buffer(offsets)
        .add_child_data(child)
        .null_bit_buffer(Some(validity))
        .build()
        .unwrap();
    let column: ArrayRef = Arc::new(ListArray::from(list_data));

    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    assert_eq!(batch.column(0).null_count(), 1);

    roundtrip(batch, None);
}
2223
#[test]
fn arrow_writer_list_non_null() {
    // Required list column whose Int32 items are declared non-nullable.
    let list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false)));
    let schema = Arc::new(Schema::new(vec![Field::new("a", list_type.clone(), false)]));

    let child = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).into_data();

    // Offsets delimiting five lists over the ten child values; the third list is empty.
    let offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());

    // No validity bitmap is attached, so no list is null.
    let list_data = ArrayData::builder(list_type)
        .len(5)
        .add_buffer(offsets)
        .add_child_data(child)
        .build()
        .unwrap();
    let column: ArrayRef = Arc::new(ListArray::from(list_data));

    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    assert_eq!(batch.column(0).null_count(), 0);

    roundtrip(batch, None);
}
2261
#[test]
fn arrow_writer_list_view() {
    let item_field = Arc::new(Field::new_list_field(DataType::Int32, false));
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::ListView(Arc::clone(&item_field)),
        true,
    )]));

    // Five views described by (offset, size) pairs; the third view is marked null.
    let offsets = vec![0, 1, 0, 3, 6];
    let sizes = vec![1, 2, 0, 3, 4];
    let validity = NullBuffer::from(vec![true, true, false, true, true]);
    let child: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));

    let column: ArrayRef = Arc::new(ListViewArray::new(
        item_field,
        offsets.into(),
        sizes.into(),
        child,
        Some(validity),
    ));

    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    assert_eq!(batch.column(0).null_count(), 1);

    roundtrip(batch, None);
}
2286
#[test]
fn arrow_writer_list_view_non_null() {
    let item_field = Arc::new(Field::new_list_field(DataType::Int32, false));
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::ListView(Arc::clone(&item_field)),
        false,
    )]));

    // Five views described by (offset, size) pairs, with no validity buffer.
    let offsets = vec![0, 1, 0, 3, 6];
    let sizes = vec![1, 2, 0, 3, 4];
    let child: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));

    let column: ArrayRef = Arc::new(ListViewArray::new(
        item_field,
        offsets.into(),
        sizes.into(),
        child,
        None,
    ));

    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    assert_eq!(batch.column(0).null_count(), 0);

    roundtrip(batch, None);
}
2311
#[test]
fn arrow_writer_list_view_out_of_order() {
    let item_field = Arc::new(Field::new_list_field(DataType::Int32, false));
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::ListView(Arc::clone(&item_field)),
        false,
    )]));

    // The last two views are swapped relative to child order: offset 6 comes before offset 3.
    let offsets = vec![0, 1, 0, 6, 3];
    let sizes = vec![1, 2, 0, 4, 3];
    let child: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));

    let column: ArrayRef = Arc::new(ListViewArray::new(
        item_field,
        offsets.into(),
        sizes.into(),
        child,
        None,
    ));

    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    roundtrip(batch, None);
}
2334
#[test]
fn arrow_writer_large_list_view() {
    let item_field = Arc::new(Field::new_list_field(DataType::Int32, false));
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::LargeListView(Arc::clone(&item_field)),
        true,
    )]));

    // Same layout as the 32-bit list-view test, but with 64-bit offsets and sizes.
    let offsets = vec![0i64, 1, 0, 3, 6];
    let sizes = vec![1i64, 2, 0, 3, 4];
    let validity = NullBuffer::from(vec![true, true, false, true, true]);
    let child: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));

    let column: ArrayRef = Arc::new(LargeListViewArray::new(
        item_field,
        offsets.into(),
        sizes.into(),
        child,
        Some(validity),
    ));

    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    assert_eq!(batch.column(0).null_count(), 1);

    roundtrip(batch, None);
}
2359
#[test]
fn arrow_writer_list_view_with_struct() {
    // List-view items are structs with a required `id` and a required `name`.
    let struct_fields = Fields::from(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]);
    let item_field = Arc::new(Field::new(
        "item",
        DataType::Struct(struct_fields.clone()),
        false,
    ));

    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::ListView(Arc::clone(&item_field)),
        true,
    )]));

    let ids: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let names: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));
    let items: ArrayRef = Arc::new(StructArray::new(struct_fields, vec![ids, names], None));

    // Three views: two items from offset 0, a null entry of size 0, three items from offset 2.
    let offsets = vec![0, 2, 2];
    let sizes = vec![2, 0, 3];
    let validity = NullBuffer::from(vec![true, false, true]);

    let column: ArrayRef = Arc::new(ListViewArray::new(
        item_field,
        offsets.into(),
        sizes.into(),
        items,
        Some(validity),
    ));

    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    roundtrip(batch, None);
}
2398
#[test]
fn arrow_writer_binary() {
    // The same four payloads are written once as UTF-8 strings and once as raw bytes.
    let payloads = ["foo", "bar", "baz", "quux"];

    let strings: ArrayRef = Arc::new(StringArray::from(payloads.to_vec()));
    let byte_slices: Vec<&[u8]> = payloads.iter().map(|text| text.as_bytes()).collect();
    let binaries: ArrayRef = Arc::new(BinaryArray::from(byte_slices));

    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Binary, false),
    ]));
    let batch = RecordBatch::try_new(schema, vec![strings, binaries]).unwrap();

    roundtrip(batch, Some(SMALL_SIZE / 2));
}
2427
#[test]
fn arrow_writer_binary_view() {
    // Payloads include one value longer than 12 bytes.
    let payloads = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
    let nullable_payloads = vec![Some("foo"), None, Some("large payload over 12 bytes"), None];

    let binary_views: ArrayRef = Arc::new(BinaryViewArray::from_iter_values(
        payloads.iter().map(|text| text.as_bytes().to_vec()),
    ));
    let string_views: ArrayRef = Arc::new(StringViewArray::from(payloads));
    let nullable_string_views: ArrayRef = Arc::new(StringViewArray::from(nullable_payloads));

    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8View, false),
        Field::new("b", DataType::BinaryView, false),
        Field::new("a", DataType::Utf8View, true),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![string_views, binary_views, nullable_string_views],
    )
    .unwrap();

    // Exercise the round trip both with a size argument and without one.
    roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
    roundtrip(batch, None);
}
2461
#[test]
fn arrow_writer_binary_view_long_value() {
    // A 128-byte value placed between two short ones.
    let long = "a".repeat(128);
    let payloads = ["foo", long.as_str(), "bar"];

    let string_views: ArrayRef = Arc::new(StringViewArray::from(payloads.to_vec()));
    let binary_views: ArrayRef = Arc::new(BinaryViewArray::from_iter_values(
        payloads.iter().map(|text| text.as_bytes().to_vec()),
    ));

    one_column_roundtrip(Arc::clone(&string_views), false);
    one_column_roundtrip(Arc::clone(&binary_views), false);

    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8View, false),
        Field::new("b", DataType::BinaryView, false),
    ]));
    let batch = RecordBatch::try_new(schema, vec![string_views, binary_views]).unwrap();

    // Repeat with dictionary encoding turned off under both writer versions.
    for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
        let props = WriterProperties::builder()
            .set_writer_version(version)
            .set_dictionary_enabled(false)
            .build();
        roundtrip_opts(&batch, props);
    }
}
2497
2498 fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
2499 let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
2500 let schema = Schema::new(vec![decimal_field]);
2501
2502 let decimal_values = vec![10_000, 50_000, 0, -100]
2503 .into_iter()
2504 .map(Some)
2505 .collect::<Decimal128Array>()
2506 .with_precision_and_scale(precision, scale)
2507 .unwrap();
2508
2509 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
2510 }
2511
#[test]
fn arrow_writer_decimal() {
    // Round-trip decimals at three precisions (5, 12 and 30), all with scale 2.
    for precision in [5, 12, 30] {
        let batch = get_decimal_batch(precision, 2);
        roundtrip(batch, Some(SMALL_SIZE / 2));
    }
}
2524
#[test]
fn arrow_writer_complex() {
    // Field layout: a, b, c { d, e { f, g: list, h: list } }.
    let field_d = Arc::new(Field::new("d", DataType::Float64, true));
    let field_f = Arc::new(Field::new("f", DataType::Float32, true));
    let field_g = Arc::new(Field::new_list(
        "g",
        Field::new_list_field(DataType::Int16, true),
        false,
    ));
    let field_h = Arc::new(Field::new_list(
        "h",
        Field::new_list_field(DataType::Int16, false),
        true,
    ));
    let field_e = Arc::new(Field::new_struct(
        "e",
        vec![
            Arc::clone(&field_f),
            Arc::clone(&field_g),
            Arc::clone(&field_h),
        ],
        false,
    ));
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, true),
        Field::new_struct(
            "c",
            vec![Arc::clone(&field_d), Arc::clone(&field_e)],
            false,
        ),
    ]));

    // Top-level and leaf columns.
    let column_a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let column_b: ArrayRef = Arc::new(Int32Array::from(vec![
        Some(1),
        None,
        None,
        Some(4),
        Some(5),
    ]));
    let column_d: ArrayRef = Arc::new(Float64Array::from(vec![
        None,
        None,
        None,
        Some(1.0),
        None,
    ]));
    let column_f: ArrayRef = Arc::new(Float32Array::from(vec![
        Some(0.0),
        None,
        Some(333.3),
        None,
        Some(5.25),
    ]));

    // `g` and `h` share the same child values and offsets; only `h` carries a validity bitmap.
    let list_child = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).into_data();
    let list_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());

    let g_data = ArrayData::builder(field_g.data_type().clone())
        .len(5)
        .add_buffer(list_offsets.clone())
        .add_child_data(list_child.clone())
        .build()
        .unwrap();
    let column_g: ArrayRef = Arc::new(ListArray::from(g_data));

    let h_data = ArrayData::builder(field_h.data_type().clone())
        .len(5)
        .add_buffer(list_offsets)
        .add_child_data(list_child)
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .build()
        .unwrap();
    let column_h: ArrayRef = Arc::new(ListArray::from(h_data));

    // Assemble the structs from the inside out.
    let column_e: ArrayRef = Arc::new(StructArray::from(vec![
        (field_f, column_f),
        (field_g, column_g),
        (field_h, column_h),
    ]));
    let column_c: ArrayRef = Arc::new(StructArray::from(vec![
        (field_d, column_d),
        (field_e, column_e),
    ]));

    let batch = RecordBatch::try_new(schema, vec![column_a, column_b, column_c]).unwrap();

    roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
    roundtrip(batch, Some(SMALL_SIZE / 3));
}
2610
#[test]
fn arrow_writer_complex_mixed() {
    // A required struct mixing a required Int32, a nullable Int64 and a nullable Utf8 child.
    let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
    let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
    let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));

    let struct_type = DataType::Struct(Fields::from(vec![
        Arc::clone(&offset_field),
        Arc::clone(&partition_field),
        Arc::clone(&topic_field),
    ]));
    let schema = Arc::new(Schema::new(vec![Field::new(
        "some_nested_object",
        struct_type,
        false,
    )]));

    let offsets: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let partitions: ArrayRef = Arc::new(Int64Array::from(vec![
        Some(1),
        None,
        None,
        Some(4),
        Some(5),
    ]));
    let topics: ArrayRef = Arc::new(StringArray::from(vec![
        Some("A"),
        None,
        Some("A"),
        Some(""),
        None,
    ]));

    let nested: ArrayRef = Arc::new(StructArray::from(vec![
        (offset_field, offsets),
        (partition_field, partitions),
        (topic_field, topics),
    ]));

    let batch = RecordBatch::try_new(schema, vec![nested]).unwrap();

    roundtrip(batch, Some(SMALL_SIZE / 2));
}
2647
/// Round-trips a nullable map column (Utf8 keys, nullable Utf8 values) whose
/// contents are decoded from newline-delimited JSON.
#[test]
fn arrow_writer_map() {
    let json_content = r#"
 {"stocks":{"long": "$AAA", "short": "$BBB"}}
 {"stocks":{"long": null, "long": "$CCC", "short": null}}
 {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
 "#;

    let key_field = Field::new("key", DataType::Utf8, false);
    let value_field = Field::new("value", DataType::Utf8, true);
    let entries_struct_type = DataType::Struct(Fields::from(vec![key_field, value_field]));
    let entries_field = Arc::new(Field::new("entries", entries_struct_type, false));
    let stocks_field = Field::new("stocks", DataType::Map(entries_field, false), true);

    let schema = Arc::new(Schema::new(vec![stocks_field]));
    let mut json_reader = arrow::json::ReaderBuilder::new(schema)
        .with_batch_size(64)
        .build(std::io::Cursor::new(json_content))
        .unwrap();

    // Only the first decoded batch is written and read back.
    let first_batch = json_reader.next().unwrap().unwrap();
    roundtrip(first_batch, None);
}
2675
/// Round-trips a nullable struct `a` wrapping a nullable struct `b` wrapping a
/// nullable Int32 `c`, with nulls present at every level.
#[test]
fn arrow_writer_2_level_struct() {
    let leaf_field = Field::new("c", DataType::Int32, true);
    let inner_field = Field::new("b", DataType::Struct(vec![leaf_field].into()), true);
    let outer_field = Field::new(
        "a",
        DataType::Struct(vec![inner_field.clone()].into()),
        true,
    );
    let schema = Schema::new(vec![outer_field.clone()]);

    let leaf = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);

    // Validity bits are read least-significant first: `b` is null at rows 3 and 4.
    let inner = StructArray::from(
        ArrayDataBuilder::new(inner_field.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100111])))
            .add_child_data(leaf.into_data())
            .build()
            .unwrap(),
    );
    // `a` is null at row 4 only.
    let outer = StructArray::from(
        ArrayDataBuilder::new(outer_field.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00101111])))
            .add_child_data(inner.into_data())
            .build()
            .unwrap(),
    );

    assert_eq!(outer.null_count(), 1);
    assert_eq!(outer.column(0).null_count(), 2);

    let record = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(outer)]).unwrap();

    roundtrip(record, Some(SMALL_SIZE / 2));
}
2710
/// Round-trips a two-level struct where the outer struct, the inner struct and
/// the Int32 leaf are all declared non-nullable.
#[test]
fn arrow_writer_2_level_struct_non_null() {
    let leaf_field = Field::new("c", DataType::Int32, false);
    let inner_type = DataType::Struct(vec![leaf_field].into());
    let outer_type = DataType::Struct(vec![Field::new("b", inner_type.clone(), false)].into());
    let schema = Schema::new(vec![Field::new("a", outer_type.clone(), false)]);

    let leaf = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);

    let inner = StructArray::from(
        ArrayDataBuilder::new(inner_type)
            .len(6)
            .add_child_data(leaf.into_data())
            .build()
            .unwrap(),
    );
    let outer = StructArray::from(
        ArrayDataBuilder::new(outer_type)
            .len(6)
            .add_child_data(inner.into_data())
            .build()
            .unwrap(),
    );

    // No validity buffers were supplied, so neither level reports nulls.
    assert_eq!(outer.null_count(), 0);
    assert_eq!(outer.column(0).null_count(), 0);

    let record = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(outer)]).unwrap();

    roundtrip(record, Some(SMALL_SIZE / 2));
}
2744
/// Round-trips a required outer struct that wraps a nullable inner struct with
/// a required Int32 leaf; only the inner struct carries nulls.
#[test]
fn arrow_writer_2_level_struct_mixed_null() {
    let leaf_field = Field::new("c", DataType::Int32, false);
    let inner_type = DataType::Struct(vec![leaf_field].into());
    let outer_type = DataType::Struct(vec![Field::new("b", inner_type.clone(), true)].into());
    let schema = Schema::new(vec![Field::new("a", outer_type.clone(), false)]);

    let leaf = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);

    // Validity bits are read least-significant first: rows 3 and 4 of `b` are null.
    let inner = StructArray::from(
        ArrayDataBuilder::new(inner_type)
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100111])))
            .add_child_data(leaf.into_data())
            .build()
            .unwrap(),
    );
    let outer = StructArray::from(
        ArrayDataBuilder::new(outer_type)
            .len(6)
            .add_child_data(inner.into_data())
            .build()
            .unwrap(),
    );

    assert_eq!(outer.null_count(), 0);
    assert_eq!(outer.column(0).null_count(), 2);

    let record = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(outer)]).unwrap();

    roundtrip(record, Some(SMALL_SIZE / 2));
}
2780
/// Round-trips a nullable outer struct wrapping a required inner struct whose
/// required children are Int32, FixedSizeBinary(4) and a Utf8 dictionary.
#[test]
fn arrow_writer_2_level_struct_mixed_null_2() {
    let dictionary_type =
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let inner_children = vec![
        Field::new("c", DataType::Int32, false),
        Field::new("d", DataType::FixedSizeBinary(4), false),
        Field::new("e", dictionary_type, false),
    ];

    let inner_field = Field::new("b", DataType::Struct(inner_children.into()), false);
    let outer_field = Field::new(
        "a",
        DataType::Struct(vec![inner_field.clone()].into()),
        true,
    );
    let schema = Schema::new(vec![outer_field.clone()]);

    let ints = Int32Array::from_iter_values(0..6);
    let fixed = FixedSizeBinaryArray::try_from_iter(
        ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
    )
    .expect("four byte values");
    let dict = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);

    let inner = StructArray::from(
        ArrayDataBuilder::new(inner_field.data_type().clone())
            .len(6)
            .add_child_data(ints.into_data())
            .add_child_data(fixed.into_data())
            .add_child_data(dict.into_data())
            .build()
            .unwrap(),
    );
    // Validity bits are read least-significant first: rows 0, 2 and 5 are valid.
    let outer = StructArray::from(
        ArrayDataBuilder::new(outer_field.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100101])))
            .add_child_data(inner.into_data())
            .build()
            .unwrap(),
    );

    assert_eq!(outer.null_count(), 3);
    assert_eq!(outer.column(0).null_count(), 0);

    let record = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(outer)]).unwrap();

    roundtrip(record, Some(SMALL_SIZE / 2));
}
2832
/// Round-trips a dictionary-encoded `FixedSizeBinary(4)` column once for every
/// integer dictionary key type, signed and unsigned, from 8 to 64 bits.
#[test]
fn test_fixed_size_binary_in_dict() {
    /// Builds a three-row dictionary array keyed by `K` (keys 0, 0, 1 over two
    /// four-byte values) and round-trips it as a required column.
    fn test_fixed_size_binary_in_dict_inner<K>()
    where
        K: ArrowDictionaryKeyType,
        K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
        <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
    {
        let field = Field::new(
            "a",
            DataType::Dictionary(
                Box::new(K::DATA_TYPE),
                Box::new(DataType::FixedSizeBinary(4)),
            ),
            false,
        );
        let schema = Schema::new(vec![field]);

        let keys: Vec<K::Native> = vec![
            K::Native::try_from(0u8).unwrap(),
            K::Native::try_from(0u8).unwrap(),
            K::Native::try_from(1u8).unwrap(),
        ];
        let keys = PrimitiveArray::<K>::from_iter_values(keys);
        let values = FixedSizeBinaryArray::try_from_iter(
            vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
        )
        .unwrap();

        let data = DictionaryArray::<K>::new(keys, Arc::new(values));
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
        roundtrip(batch, None);
    }

    test_fixed_size_binary_in_dict_inner::<UInt8Type>();
    test_fixed_size_binary_in_dict_inner::<UInt16Type>();
    test_fixed_size_binary_in_dict_inner::<UInt32Type>();
    // Previously `UInt16Type` was instantiated a second time here, leaving the
    // 64-bit unsigned key type uncovered.
    test_fixed_size_binary_in_dict_inner::<UInt64Type>();
    test_fixed_size_binary_in_dict_inner::<Int8Type>();
    test_fixed_size_binary_in_dict_inner::<Int16Type>();
    test_fixed_size_binary_in_dict_inner::<Int32Type>();
    test_fixed_size_binary_in_dict_inner::<Int64Type>();
}
2876
/// Round-trips an all-null struct column whose only child is a dictionary
/// array with five null keys and an empty values array.
#[test]
fn test_empty_dict() {
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let struct_fields = Fields::from(vec![Field::new("dict", dict_type, false)]);

    let schema = Schema::new(vec![Field::new_struct(
        "struct",
        struct_fields.clone(),
        true,
    )]);

    let null_keys = Int32Array::new_null(5);
    let empty_values = Arc::new(StringArray::new_null(0));
    let dictionary = Arc::new(DictionaryArray::new(null_keys, empty_values));

    let all_null_struct = StructArray::new(
        struct_fields,
        vec![dictionary],
        Some(NullBuffer::new_null(5)),
    );

    let record = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(all_null_struct)]).unwrap();
    roundtrip(record, None);
}
/// Writes ten distinct ten-character strings with page size limits and write
/// batch size all set to 1, then checks that a dictionary page exists and that
/// the offset index lists exactly ten pages.
#[test]
fn arrow_writer_page_size() {
    let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));

    let mut strings = StringBuilder::with_capacity(100, 329 * 10_000);
    for i in 0..10 {
        // Repeat the decimal form of `i` and keep the first ten characters.
        let repeated = i.to_string().repeat(10);
        let value: String = repeated.chars().take(10).collect();
        strings.append_value(value);
    }
    let column = Arc::new(strings.finish());

    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let file = tempfile::tempfile().unwrap();

    let props = WriterProperties::builder()
        .set_data_page_size_limit(1)
        .set_dictionary_page_size_limit(1)
        .set_write_batch_size(1)
        .build();

    let write_handle = file.try_clone().unwrap();
    let mut writer = ArrowWriter::try_new(write_handle, batch.schema(), Some(props))
        .expect("Unable to write file");
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let read_options = ReadOptionsBuilder::new().with_page_index().build();
    let reader =
        SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options).unwrap();

    let columns = reader.metadata().row_group(0).columns();
    assert_eq!(columns.len(), 1);

    assert!(
        columns[0].dictionary_page_offset().is_some(),
        "Expected a dictionary page"
    );

    assert!(reader.metadata().offset_index().is_some());
    let first_row_group_index = &reader.metadata().offset_index().unwrap()[0];

    let page_locations = first_row_group_index[0].page_locations.clone();

    assert_eq!(
        page_locations.len(),
        10,
        "Expected 10 pages but got {page_locations:#?}"
    );
}
2969
/// Round-trips required Float16, Float32 and Float64 columns in which every
/// even-indexed value is NaN and every odd-indexed value is its own index.
#[test]
fn arrow_writer_float_nans() {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Float16, false),
        Field::new("b", DataType::Float32, false),
        Field::new("c", DataType::Float64, false),
    ]);

    let halves: Float16Array = (0..MEDIUM_SIZE)
        .map(|i| match i % 2 {
            0 => Some(f16::NAN),
            _ => Some(f16::from_f32(i as f32)),
        })
        .collect();

    let singles: Float32Array = (0..MEDIUM_SIZE)
        .map(|i| match i % 2 {
            0 => Some(f32::NAN),
            _ => Some(i as f32),
        })
        .collect();

    let doubles: Float64Array = (0..MEDIUM_SIZE)
        .map(|i| match i % 2 {
            0 => Some(f64::NAN),
            _ => Some(i as f64),
        })
        .collect();

    let record = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(halves), Arc::new(singles), Arc::new(doubles)],
    )
    .unwrap();

    roundtrip(record, None);
}
3007
/// Number of rows used by the small single-column round-trip tests; several
/// row-group sizes in the tests are derived from it.
const SMALL_SIZE: usize = 7;
/// Number of rows used by tests that need somewhat more data than `SMALL_SIZE`.
const MEDIUM_SIZE: usize = 63;
3010
3011 fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<Bytes> {
3014 let mut files = vec![];
3015 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3016 let mut props = WriterProperties::builder().set_writer_version(version);
3017
3018 if let Some(size) = max_row_group_size {
3019 props = props.set_max_row_group_row_count(Some(size))
3020 }
3021
3022 let props = props.build();
3023 files.push(roundtrip_opts(&expected_batch, props))
3024 }
3025 files
3026 }
3027
3028 fn roundtrip_opts_with_array_validation<F>(
3032 expected_batch: &RecordBatch,
3033 props: WriterProperties,
3034 validate: F,
3035 ) -> Bytes
3036 where
3037 F: Fn(&ArrayData, &ArrayData),
3038 {
3039 let mut file = vec![];
3040
3041 let mut writer = ArrowWriter::try_new(&mut file, expected_batch.schema(), Some(props))
3042 .expect("Unable to write file");
3043 writer.write(expected_batch).unwrap();
3044 writer.close().unwrap();
3045
3046 let file = Bytes::from(file);
3047 let mut record_batch_reader =
3048 ParquetRecordBatchReader::try_new(file.clone(), 1024).unwrap();
3049
3050 let actual_batch = record_batch_reader
3051 .next()
3052 .expect("No batch found")
3053 .expect("Unable to get batch");
3054
3055 assert_eq!(expected_batch.schema(), actual_batch.schema());
3056 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
3057 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
3058 for i in 0..expected_batch.num_columns() {
3059 let expected_data = expected_batch.column(i).to_data();
3060 let actual_data = actual_batch.column(i).to_data();
3061 validate(&expected_data, &actual_data);
3062 }
3063
3064 file
3065 }
3066
3067 fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> Bytes {
3068 roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
3069 a.validate_full().expect("valid expected data");
3070 b.validate_full().expect("valid actual data");
3071 assert_eq!(a, b)
3072 })
3073 }
3074
/// Inputs for `one_column_roundtrip_with_options`.
struct RoundTripOptions {
    /// The single column that is written and read back.
    values: ArrayRef,
    /// Schema used for the record batch that wraps `values`.
    schema: SchemaRef,
    /// Passed to `set_bloom_filter_enabled` on the writer properties.
    bloom_filter: bool,
    /// When `Some`, passed to `set_bloom_filter_max_ndv` on the writer properties.
    bloom_filter_ndv: Option<u64>,
    /// Passed to `set_bloom_filter_position` on the writer properties.
    bloom_filter_position: BloomFilterPosition,
}
3082
3083 impl RoundTripOptions {
3084 fn new(values: ArrayRef, nullable: bool) -> Self {
3085 let data_type = values.data_type().clone();
3086 let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
3087 Self {
3088 values,
3089 schema: Arc::new(schema),
3090 bloom_filter: false,
3091 bloom_filter_ndv: None,
3092 bloom_filter_position: BloomFilterPosition::AfterRowGroup,
3093 }
3094 }
3095 }
3096
3097 fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<Bytes> {
3098 one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
3099 }
3100
3101 fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<Bytes> {
3102 let mut options = RoundTripOptions::new(values, false);
3103 options.schema = schema;
3104 one_column_roundtrip_with_options(options)
3105 }
3106
3107 fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<Bytes> {
3108 let RoundTripOptions {
3109 values,
3110 schema,
3111 bloom_filter,
3112 bloom_filter_ndv,
3113 bloom_filter_position,
3114 } = options;
3115
3116 let encodings = match values.data_type() {
3117 DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
3118 vec![
3119 Encoding::PLAIN,
3120 Encoding::DELTA_BYTE_ARRAY,
3121 Encoding::DELTA_LENGTH_BYTE_ARRAY,
3122 ]
3123 }
3124 DataType::Int64
3125 | DataType::Int32
3126 | DataType::Int16
3127 | DataType::Int8
3128 | DataType::UInt64
3129 | DataType::UInt32
3130 | DataType::UInt16
3131 | DataType::UInt8 => vec![
3132 Encoding::PLAIN,
3133 Encoding::DELTA_BINARY_PACKED,
3134 Encoding::BYTE_STREAM_SPLIT,
3135 ],
3136 DataType::Float32 | DataType::Float64 => {
3137 vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
3138 }
3139 _ => vec![Encoding::PLAIN],
3140 };
3141
3142 let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3143
3144 let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
3145
3146 let mut files = vec![];
3147 for dictionary_size in [0, 1, 1024] {
3148 for encoding in &encodings {
3149 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3150 for row_group_size in row_group_sizes {
3151 let mut builder = WriterProperties::builder()
3152 .set_writer_version(version)
3153 .set_max_row_group_row_count(Some(row_group_size))
3154 .set_dictionary_enabled(dictionary_size != 0)
3155 .set_dictionary_page_size_limit(dictionary_size.max(1))
3156 .set_encoding(*encoding)
3157 .set_bloom_filter_enabled(bloom_filter)
3158 .set_bloom_filter_position(bloom_filter_position);
3159 if let Some(ndv) = bloom_filter_ndv {
3160 builder = builder.set_bloom_filter_max_ndv(ndv);
3161 }
3162 let props = builder.build();
3163
3164 files.push(roundtrip_opts(&expected_batch, props))
3165 }
3166 }
3167 }
3168 }
3169 files
3170 }
3171
3172 fn values_required<A, I>(iter: I) -> Vec<Bytes>
3173 where
3174 A: From<Vec<I::Item>> + Array + 'static,
3175 I: IntoIterator,
3176 {
3177 let raw_values: Vec<_> = iter.into_iter().collect();
3178 let values = Arc::new(A::from(raw_values));
3179 one_column_roundtrip(values, false)
3180 }
3181
3182 fn values_optional<A, I>(iter: I) -> Vec<Bytes>
3183 where
3184 A: From<Vec<Option<I::Item>>> + Array + 'static,
3185 I: IntoIterator,
3186 {
3187 let optional_raw_values: Vec<_> = iter
3188 .into_iter()
3189 .enumerate()
3190 .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
3191 .collect();
3192 let optional_values = Arc::new(A::from(optional_raw_values));
3193 one_column_roundtrip(optional_values, true)
3194 }
3195
3196 fn required_and_optional<A, I>(iter: I)
3197 where
3198 A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
3199 I: IntoIterator + Clone,
3200 {
3201 values_required::<A, I>(iter.clone());
3202 values_optional::<A, I>(iter);
3203 }
3204
3205 fn check_bloom_filter<T: AsBytes>(
3206 files: Vec<Bytes>,
3207 file_column: String,
3208 positive_values: Vec<T>,
3209 negative_values: Vec<T>,
3210 ) {
3211 files.into_iter().take(1).for_each(|file| {
3212 let file_reader = SerializedFileReader::new_with_options(
3213 file,
3214 ReadOptionsBuilder::new()
3215 .with_reader_properties(
3216 ReaderProperties::builder()
3217 .set_read_bloom_filter(true)
3218 .build(),
3219 )
3220 .build(),
3221 )
3222 .expect("Unable to open file as Parquet");
3223 let metadata = file_reader.metadata();
3224
3225 let mut bloom_filters: Vec<_> = vec![];
3227 for (ri, row_group) in metadata.row_groups().iter().enumerate() {
3228 if let Some((column_index, _)) = row_group
3229 .columns()
3230 .iter()
3231 .enumerate()
3232 .find(|(_, column)| column.column_path().string() == file_column)
3233 {
3234 let row_group_reader = file_reader
3235 .get_row_group(ri)
3236 .expect("Unable to read row group");
3237 if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
3238 bloom_filters.push(sbbf.clone());
3239 } else {
3240 panic!("No bloom filter for column named {file_column} found");
3241 }
3242 } else {
3243 panic!("No column named {file_column} found");
3244 }
3245 }
3246
3247 positive_values.iter().for_each(|value| {
3248 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
3249 assert!(
3250 found.is_some(),
3251 "{}",
3252 format!("Value {:?} should be in bloom filter", value.as_bytes())
3253 );
3254 });
3255
3256 negative_values.iter().for_each(|value| {
3257 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
3258 assert!(
3259 found.is_none(),
3260 "{}",
3261 format!("Value {:?} should not be in bloom filter", value.as_bytes())
3262 );
3263 });
3264 });
3265 }
3266
/// Round-trips a nullable Int32 column in which every one of the
/// `SMALL_SIZE` slots is null.
#[test]
fn all_null_primitive_single_column() {
    let all_nulls = Int32Array::from(vec![None; SMALL_SIZE]);
    one_column_roundtrip(Arc::new(all_nulls), true);
}
/// Round-trips a `NullArray` of length `SMALL_SIZE` as a nullable column.
#[test]
fn null_single_column() {
    let nulls = NullArray::new(SMALL_SIZE);
    one_column_roundtrip(Arc::new(nulls), true);
}
3278
/// Round-trips `SMALL_SIZE` alternating booleans (starting with `true`) as
/// both a required and an optional column.
#[test]
fn bool_single_column() {
    let alternating = [true, false].iter().cycle().copied().take(SMALL_SIZE);
    required_and_optional::<BooleanArray, _>(alternating);
}
3285
/// Writes 200,000 nullable booleans cycling through null/true/false to a
/// temporary file with default writer properties. Only the write path is
/// exercised; nothing is read back.
#[test]
fn bool_large_single_column() {
    let pattern = [None, Some(true), Some(false)];
    let column: BooleanArray = pattern.iter().cycle().copied().take(200_000).collect();
    let column = Arc::new(column);

    let field = Field::new("col", column.data_type().clone(), true);
    let expected_batch =
        RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column]).unwrap();

    let file = tempfile::tempfile().unwrap();
    let write_handle = file.try_clone().unwrap();

    let mut writer = ArrowWriter::try_new(write_handle, expected_batch.schema(), None)
        .expect("Unable to write file");
    writer.write(&expected_batch).unwrap();
    writer.close().unwrap();
}
3306
/// Writes a Float64 column of ten NaNs and checks that each column chunk
/// records an offset index location but no column index location.
#[test]
fn check_page_offset_index_with_nan() {
    let nans = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
    let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![nans]).unwrap();

    let mut out = Vec::with_capacity(1024);
    let mut writer =
        ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
    writer.write(&batch).unwrap();
    let file_meta_data = writer.close().unwrap();

    let all_columns = file_meta_data
        .row_groups()
        .iter()
        .flat_map(|row_group| row_group.columns());
    for column in all_columns {
        assert!(column.offset_index_offset().is_some());
        assert!(column.offset_index_length().is_some());
        assert!(column.column_index_offset().is_none());
        assert!(column.column_index_length().is_none());
    }
}
3327
/// Round-trips `0..SMALL_SIZE` as a required and as an optional Int8 column.
#[test]
fn i8_single_column() {
    let values = 0..SMALL_SIZE as i8;
    required_and_optional::<Int8Array, _>(values);
}
3332
/// Round-trips `0..SMALL_SIZE` as a required and as an optional Int16 column.
#[test]
fn i16_single_column() {
    let values = 0..SMALL_SIZE as i16;
    required_and_optional::<Int16Array, _>(values);
}
3337
/// Round-trips `0..SMALL_SIZE` as a required and as an optional Int32 column.
#[test]
fn i32_single_column() {
    let values = 0..SMALL_SIZE as i32;
    required_and_optional::<Int32Array, _>(values);
}
3342
/// Round-trips `0..SMALL_SIZE` as a required and as an optional Int64 column.
#[test]
fn i64_single_column() {
    let values = 0..SMALL_SIZE as i64;
    required_and_optional::<Int64Array, _>(values);
}
3347
/// Round-trips `0..SMALL_SIZE` as a required and as an optional UInt8 column.
#[test]
fn u8_single_column() {
    let values = 0..SMALL_SIZE as u8;
    required_and_optional::<UInt8Array, _>(values);
}
3352
/// Round-trips `0..SMALL_SIZE` as a required and as an optional UInt16 column.
#[test]
fn u16_single_column() {
    let values = 0..SMALL_SIZE as u16;
    required_and_optional::<UInt16Array, _>(values);
}
3357
/// Round-trips `0..SMALL_SIZE` as a required and as an optional UInt32 column.
#[test]
fn u32_single_column() {
    let values = 0..SMALL_SIZE as u32;
    required_and_optional::<UInt32Array, _>(values);
}
3362
/// Round-trips `0..SMALL_SIZE` as a required and as an optional UInt64 column.
#[test]
fn u64_single_column() {
    let values = 0..SMALL_SIZE as u64;
    required_and_optional::<UInt64Array, _>(values);
}
3367
/// Round-trips the indices `0..SMALL_SIZE`, converted to `f32`, as a required
/// and as an optional Float32 column.
#[test]
fn f32_single_column() {
    let values = (0..SMALL_SIZE).map(|index| index as f32);
    required_and_optional::<Float32Array, _>(values);
}
3372
/// Round-trips the indices `0..SMALL_SIZE`, converted to `f64`, as a required
/// and as an optional Float64 column.
#[test]
fn f64_single_column() {
    let values = (0..SMALL_SIZE).map(|index| index as f64);
    required_and_optional::<Float64Array, _>(values);
}
3377
/// Round-trips `0..SMALL_SIZE` as a required second-resolution timestamp column.
#[test]
fn timestamp_second_single_column() {
    let seconds: Vec<i64> = (0..SMALL_SIZE as i64).collect();
    one_column_roundtrip(Arc::new(TimestampSecondArray::from(seconds)), false);
}
3389
/// Round-trips `0..SMALL_SIZE` as a required millisecond-resolution timestamp column.
#[test]
fn timestamp_millisecond_single_column() {
    let millis: Vec<i64> = (0..SMALL_SIZE as i64).collect();
    one_column_roundtrip(Arc::new(TimestampMillisecondArray::from(millis)), false);
}
3397
/// Round-trips `0..SMALL_SIZE` as a required microsecond-resolution timestamp column.
#[test]
fn timestamp_microsecond_single_column() {
    let micros: Vec<i64> = (0..SMALL_SIZE as i64).collect();
    one_column_roundtrip(Arc::new(TimestampMicrosecondArray::from(micros)), false);
}
3405
/// Round-trips `0..SMALL_SIZE` as a required nanosecond-resolution timestamp column.
#[test]
fn timestamp_nanosecond_single_column() {
    let nanos: Vec<i64> = (0..SMALL_SIZE as i64).collect();
    one_column_roundtrip(Arc::new(TimestampNanosecondArray::from(nanos)), false);
}
3413
/// Round-trips `0..SMALL_SIZE` as a required and as an optional Date32 column.
#[test]
fn date32_single_column() {
    let days = 0..SMALL_SIZE as i32;
    required_and_optional::<Date32Array, _>(days);
}
3418
/// Round-trips `SMALL_SIZE` Date64 values spaced 86,400,000 apart, starting at
/// zero, as a required and as an optional column.
#[test]
fn date64_single_column() {
    let end = SMALL_SIZE as i64 * 86400000;
    let values = (0..end).step_by(86400000);
    required_and_optional::<Date64Array, _>(values);
}
3426
/// Round-trips `0..SMALL_SIZE` as a required and as an optional Time32 (second) column.
#[test]
fn time32_second_single_column() {
    let values = 0..SMALL_SIZE as i32;
    required_and_optional::<Time32SecondArray, _>(values);
}
3431
/// Round-trips `0..SMALL_SIZE` as a required and as an optional Time32 (millisecond) column.
#[test]
fn time32_millisecond_single_column() {
    let values = 0..SMALL_SIZE as i32;
    required_and_optional::<Time32MillisecondArray, _>(values);
}
3436
/// Round-trips `0..SMALL_SIZE` as a required and as an optional Time64 (microsecond) column.
#[test]
fn time64_microsecond_single_column() {
    let values = 0..SMALL_SIZE as i64;
    required_and_optional::<Time64MicrosecondArray, _>(values);
}
3441
/// Round-trips `0..SMALL_SIZE` as a required and as an optional Time64 (nanosecond) column.
#[test]
fn time64_nanosecond_single_column() {
    let values = 0..SMALL_SIZE as i64;
    required_and_optional::<Time64NanosecondArray, _>(values);
}
3446
/// Round-trips `0..SMALL_SIZE` as a required and as an optional Duration (second) column.
#[test]
fn duration_second_single_column() {
    let values = 0..SMALL_SIZE as i64;
    required_and_optional::<DurationSecondArray, _>(values);
}
3451
/// Round-trips `0..SMALL_SIZE` as a required and as an optional Duration (millisecond) column.
#[test]
fn duration_millisecond_single_column() {
    let values = 0..SMALL_SIZE as i64;
    required_and_optional::<DurationMillisecondArray, _>(values);
}
3456
/// Round-trips `0..SMALL_SIZE` as a required and as an optional Duration (microsecond) column.
#[test]
fn duration_microsecond_single_column() {
    let values = 0..SMALL_SIZE as i64;
    required_and_optional::<DurationMicrosecondArray, _>(values);
}
3461
/// Round-trips `0..SMALL_SIZE` as a required and as an optional Duration (nanosecond) column.
#[test]
fn duration_nanosecond_single_column() {
    let values = 0..SMALL_SIZE as i64;
    required_and_optional::<DurationNanosecondArray, _>(values);
}
3466
/// Round-trips `0..SMALL_SIZE` as a required and as an optional year-month interval column.
#[test]
fn interval_year_month_single_column() {
    let values = 0..SMALL_SIZE as i32;
    required_and_optional::<IntervalYearMonthArray, _>(values);
}
3471
/// Round-trips four day-time intervals, including negative components, as a
/// required and as an optional column.
#[test]
fn interval_day_time_single_column() {
    let intervals = vec![
        IntervalDayTime::new(0, 1),
        IntervalDayTime::new(0, 3),
        IntervalDayTime::new(3, -2),
        IntervalDayTime::new(-200, 4),
    ];
    required_and_optional::<IntervalDayTimeArray, _>(intervals);
}
3481
/// Attempting to round-trip a month-day-nano interval column is expected to
/// panic with the "not yet implemented" message named below.
#[test]
#[should_panic(
    expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
)]
fn interval_month_day_nano_single_column() {
    let intervals = vec![
        IntervalMonthDayNano::new(0, 1, 5),
        IntervalMonthDayNano::new(0, 3, 2),
        IntervalMonthDayNano::new(3, -2, -5),
        IntervalMonthDayNano::new(-200, 4, -1),
    ];
    required_and_optional::<IntervalMonthDayNanoArray, _>(intervals);
}
3494
/// Round-trips `SMALL_SIZE` copies of the byte string `0..SMALL_SIZE` as a
/// required Binary column.
#[test]
fn binary_single_column() {
    let payload: Vec<u8> = (0..SMALL_SIZE as u8).collect();
    let rows = vec![payload; SMALL_SIZE];
    let row_slices = rows.iter().map(Vec::as_slice);

    values_required::<BinaryArray, _>(row_slices);
}
3504
/// Round-trips `SMALL_SIZE` copies of the byte string `0..SMALL_SIZE` as a
/// required BinaryView column.
#[test]
fn binary_view_single_column() {
    let payload: Vec<u8> = (0..SMALL_SIZE as u8).collect();
    let rows = vec![payload; SMALL_SIZE];
    let row_slices = rows.iter().map(Vec::as_slice);

    values_required::<BinaryViewArray, _>(row_slices);
}
3514
/// Round-trips an Int32 column with bloom filters enabled and positioned at
/// `End`, then checks filter membership for written and unwritten values.
#[test]
fn i32_column_bloom_filter_at_end() {
    let column = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
    let options = RoundTripOptions {
        bloom_filter: true,
        bloom_filter_position: BloomFilterPosition::End,
        ..RoundTripOptions::new(column, false)
    };

    let files = one_column_roundtrip_with_options(options);

    let written: Vec<i32> = (0..SMALL_SIZE as i32).collect();
    let absent: Vec<i32> = (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect();
    check_bloom_filter(files, "col".to_string(), written, absent);
}
3530
/// Round-trips an Int32 column with bloom filters enabled at the default
/// position, then checks filter membership for written and unwritten values.
#[test]
fn i32_column_bloom_filter() {
    let column = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
    let options = RoundTripOptions {
        bloom_filter: true,
        ..RoundTripOptions::new(column, false)
    };

    let files = one_column_roundtrip_with_options(options);

    let written: Vec<i32> = (0..SMALL_SIZE as i32).collect();
    let absent: Vec<i32> = (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect();
    check_bloom_filter(files, "col".to_string(), written, absent);
}
3545
/// Runs the Int32 bloom filter round-trip twice with an explicit maximum NDV:
/// first 1,000,000 and then 3. The same membership checks must hold for both.
#[test]
fn i32_column_bloom_filter_fixed_ndv() {
    let column = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));

    // First pass: NDV setting of one million.
    let large_ndv = RoundTripOptions {
        bloom_filter: true,
        bloom_filter_ndv: Some(1_000_000),
        ..RoundTripOptions::new(column.clone(), false)
    };
    let files = one_column_roundtrip_with_options(large_ndv);
    check_bloom_filter(
        files,
        "col".to_string(),
        (0..SMALL_SIZE as i32).collect(),
        (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
    );

    // Second pass: NDV setting of three.
    let small_ndv = RoundTripOptions {
        bloom_filter: true,
        bloom_filter_ndv: Some(3),
        ..RoundTripOptions::new(column, false)
    };
    let files = one_column_roundtrip_with_options(small_ndv);
    check_bloom_filter(
        files,
        "col".to_string(),
        (0..SMALL_SIZE as i32).collect(),
        (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
    );
}
3580
/// Round-trips a Binary column with bloom filters enabled. Every written byte
/// string must be found, and the one-byte value `SMALL_SIZE + 1` must not be.
#[test]
fn binary_column_bloom_filter() {
    let payload: Vec<u8> = (0..SMALL_SIZE as u8).collect();
    let rows = vec![payload; SMALL_SIZE];

    let column = Arc::new(BinaryArray::from_iter_values(
        rows.iter().map(Vec::as_slice),
    ));
    let options = RoundTripOptions {
        bloom_filter: true,
        ..RoundTripOptions::new(column, false)
    };

    let files = one_column_roundtrip_with_options(options);

    let absent = vec![vec![(SMALL_SIZE + 1) as u8]];
    check_bloom_filter(files, "col".to_string(), rows, absent);
}
3599
/// Round-trips a required Utf8 column of the decimal strings `0..SMALL_SIZE`
/// with bloom filters enabled. The odd-indexed strings must be found and the
/// empty string must not be.
#[test]
fn empty_string_null_column_bloom_filter() {
    let decimal_strings: Vec<String> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();

    let column = Arc::new(StringArray::from_iter_values(
        decimal_strings.iter().map(String::as_str),
    ));
    let options = RoundTripOptions {
        bloom_filter: true,
        ..RoundTripOptions::new(column, false)
    };

    let files = one_column_roundtrip_with_options(options);

    // Only the odd-indexed strings are probed as expected hits.
    let odd_indexed: Vec<&str> = decimal_strings
        .iter()
        .enumerate()
        .filter(|(index, _)| index % 2 != 0)
        .map(|(_, text)| text.as_str())
        .collect();
    check_bloom_filter(files, "col".to_string(), odd_indexed, vec![""]);
}
3619
/// Round-trips `SMALL_SIZE` copies of the byte string `0..SMALL_SIZE` as a
/// required LargeBinary column.
#[test]
fn large_binary_single_column() {
    let payload: Vec<u8> = (0..SMALL_SIZE as u8).collect();
    let rows = vec![payload; SMALL_SIZE];
    let row_slices = rows.iter().map(Vec::as_slice);

    values_required::<LargeBinaryArray, _>(row_slices);
}
3629
/// Round-trips a nullable FixedSizeBinary(4) column holding three values and
/// one null in the second slot.
#[test]
fn fixed_size_binary_single_column() {
    let mut fixed = FixedSizeBinaryBuilder::new(4);
    fixed.append_value(b"0123").unwrap();
    fixed.append_null();
    fixed.append_value(b"8910").unwrap();
    fixed.append_value(b"1112").unwrap();

    one_column_roundtrip(Arc::new(fixed.finish()), true);
}
3641
/// Round-trips the decimal strings `0..SMALL_SIZE` as a required and as an
/// optional Utf8 column.
#[test]
fn string_single_column() {
    let owned: Vec<String> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
    let borrowed = owned.iter().map(String::as_str);

    required_and_optional::<StringArray, _>(borrowed);
}
3649
/// Round-trips the decimal strings `0..SMALL_SIZE` as a required and as an
/// optional LargeUtf8 column.
#[test]
fn large_string_single_column() {
    let owned: Vec<String> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
    let borrowed = owned.iter().map(String::as_str);

    required_and_optional::<LargeStringArray, _>(borrowed);
}
3657
/// Round-trips the decimal strings `0..SMALL_SIZE` as a required and as an
/// optional Utf8View column.
#[test]
fn string_view_single_column() {
    let owned: Vec<String> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
    let borrowed = owned.iter().map(String::as_str);

    required_and_optional::<StringViewArray, _>(borrowed);
}
3665
/// Round-trips a nullable list whose items have the Null data type: an empty
/// list, a null list, and a list of two nulls.
#[test]
fn null_list_single_column() {
    let list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Null, true)));
    let schema = Schema::new(vec![Field::new("emptylist", list_type.clone(), true)]);

    let items = NullArray::new(2);
    let offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
    // Validity bits are read least-significant first: slot 1 is the null list.
    let validity = Buffer::from([0b00000101]);
    let list_data = ArrayData::builder(list_type)
        .len(3)
        .add_buffer(offsets)
        .null_bit_buffer(Some(validity))
        .add_child_data(items.into_data())
        .build()
        .unwrap();

    let list = ListArray::from(list_data);

    assert!(list.is_valid(0));
    assert!(!list.is_valid(1));
    assert!(list.is_valid(2));

    assert_eq!(list.value(0).len(), 0);
    assert_eq!(list.value(2).len(), 2);
    assert_eq!(list.value(2).logical_nulls().unwrap().null_count(), 2);

    let record = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list)]).unwrap();
    roundtrip(record, None);
}
3700
/// Round-trips a five-entry `List<Int32>` column (non-nullable items) with one null entry.
#[test]
fn list_single_column() {
    let child = Int32Array::from((1..=10).collect::<Vec<i32>>());
    let offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
    let item_field = Arc::new(Field::new_list_field(DataType::Int32, false));

    // Validity mask 0b11011: bit 2 is clear, so the third entry is the single null.
    let list_data = ArrayData::builder(DataType::List(item_field))
        .len(5)
        .add_buffer(offsets)
        .add_child_data(child.into_data())
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .build()
        .unwrap();
    assert_eq!(list_data.null_count(), 1);

    one_column_roundtrip(Arc::new(ListArray::from(list_data)), true);
}
3723
/// Round-trips a five-entry `LargeList<Int32>` column (64-bit offsets) with one null entry.
#[test]
fn large_list_single_column() {
    let child = Int32Array::from((1..=10).collect::<Vec<i32>>());
    let offsets = arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());
    let item_field = Arc::new(Field::new("large_item", DataType::Int32, true));

    // Validity mask 0b11011: bit 2 is clear, so the third entry is the single null.
    let list_data = ArrayData::builder(DataType::LargeList(item_field))
        .len(5)
        .add_buffer(offsets)
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .add_child_data(child.into_data())
        .build()
        .unwrap();
    assert_eq!(list_data.null_count(), 1);

    one_column_roundtrip(Arc::new(LargeListArray::from(list_data)), true);
}
3748
/// Round-trips lists that mix null entries with null items, as both `List` and `LargeList`.
#[test]
fn list_nested_nulls() {
    use arrow::datatypes::Int32Type;

    let rows = vec![
        Some(vec![Some(1)]),
        Some(vec![Some(2), Some(3)]),
        None,
        Some(vec![Some(4), Some(5), None]),
        Some(vec![None]),
        Some(vec![Some(6), Some(7)]),
    ];

    let small = ListArray::from_iter_primitive::<Int32Type, _, _>(rows.clone());
    one_column_roundtrip(Arc::new(small), true);

    let large = LargeListArray::from_iter_primitive::<Int32Type, _, _>(rows);
    one_column_roundtrip(Arc::new(large), true);
}
3767
/// Round-trips a struct column with a single non-nullable `Int32` child named `f`.
#[test]
fn struct_single_column() {
    let child_field = Arc::new(Field::new("f", DataType::Int32, false));
    let child: ArrayRef = Arc::new(Int32Array::from((1..=10).collect::<Vec<i32>>()));

    let wrapped = StructArray::from(vec![(child_field, child)]);
    one_column_roundtrip(Arc::new(wrapped), false);
}
3777
/// With `set_coerce_types(true)`, list and map children named `item`, `entries`, `keys` and
/// `values` in Arrow must appear in the Parquet schema as `element`, `key_value`, `key` and
/// `value`, both in the metadata returned by `close` and in the file read back.
#[test]
fn list_and_map_coerced_names() {
    let list_field =
        Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
    let map_field = Field::new_map(
        "my_map",
        "entries",
        Field::new("keys", DataType::Int32, false),
        Field::new("values", DataType::Int32, true),
        false,
        true,
    );

    // Generate the list column first, then the map column (same order as the schema).
    let columns = vec![
        create_random_array(&list_field, 100, 0.0, 0.0).unwrap(),
        create_random_array(&map_field, 100, 0.0, 0.0).unwrap(),
    ];
    let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
    let batch = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap();

    let props = WriterProperties::builder().set_coerce_types(true).build();
    let file = tempfile::tempfile().unwrap();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema, Some(props)).unwrap();
    writer.write(&batch).unwrap();
    let written = writer.close().unwrap();

    // Names as reported by the metadata returned from `close`.
    let written_fields = written.file_metadata().schema().get_fields();
    let written_list = &written_fields[0].get_fields()[0];
    assert_eq!(written_list.get_fields()[0].name(), "element");

    let written_map = &written_fields[1].get_fields()[0];
    assert_eq!(written_map.name(), "key_value");
    assert_eq!(written_map.get_fields()[0].name(), "key");
    assert_eq!(written_map.get_fields()[1].name(), "value");

    // Names as reported when the file is opened again.
    let reader = SerializedFileReader::new(file).unwrap();
    let read_fields = reader.metadata().file_metadata().schema().get_fields();
    let read_list = &read_fields[0].get_fields()[0];
    assert_eq!(read_list.get_fields()[0].name(), "element");
    let read_map = &read_fields[1].get_fields()[0];
    assert_eq!(read_map.name(), "key_value");
    assert_eq!(read_map.get_fields()[0].name(), "key");
    assert_eq!(read_map.get_fields()[1].name(), "value");
}
3831
/// Round-trips a required string column under both delta byte-array encodings, with a 32-byte
/// data page limit and a write batch size of 16, across several row-group sizes.
#[test]
fn fallback_flush_data_page() {
    let strings: Vec<String> = (0..MEDIUM_SIZE).map(|n| n.to_string()).collect();
    let column = Arc::new(StringArray::from(strings));
    let schema = Arc::new(Schema::new(vec![Field::new(
        "col",
        column.data_type().clone(),
        false,
    )]));
    let expected_batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let encodings = [
        Encoding::DELTA_BYTE_ARRAY,
        Encoding::DELTA_LENGTH_BYTE_ARRAY,
    ];
    let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];

    for encoding in encodings {
        for row_group_size in row_group_sizes {
            // Dictionary encoding is disabled so the requested encoding is the one in effect.
            let props = WriterProperties::builder()
                .set_writer_version(WriterVersion::PARQUET_2_0)
                .set_max_row_group_row_count(Some(row_group_size))
                .set_dictionary_enabled(false)
                .set_encoding(encoding)
                .set_data_page_size_limit(32)
                .set_write_batch_size(16)
                .build();

            roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
                let left = StringArray::from(a.clone());
                let right = StringArray::from(b.clone());
                let vec_a: Vec<&str> = left.iter().map(Option::unwrap).collect();
                let vec_b: Vec<&str> = right.iter().map(Option::unwrap).collect();
                assert_eq!(
                    vec_a, vec_b,
                    "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
                );
            });
        }
    }
}
3873
/// Round-trips an `Int32`-keyed string dictionary column under an explicit dictionary schema.
#[test]
fn arrow_writer_string_dictionary() {
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    #[allow(deprecated)]
    let schema = Arc::new(Schema::new(vec![Field::new_dict(
        "dictionary",
        dict_type,
        true,
        42,
        true,
    )]));

    // "alpha" appears twice, so the dictionary has repeated keys plus one null slot.
    let dict: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
        .into_iter()
        .collect();

    one_column_roundtrip_with_schema(Arc::new(dict), schema);
}
3895
/// Writes two batches with differing but compatible column types to one file and checks that
/// the data read back is the concatenation, typed after the first batch.
#[test]
fn arrow_writer_test_type_compatibility() {
    /// Writes `array1` then `array2` as column "a" of the same file (whose schema is derived
    /// from `array1`) and asserts that the first batch read back equals `expected_result`.
    fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
    where
        T1: Array + 'static,
        T2: Array + 'static,
    {
        // Single non-nullable column "a" typed after the given data type.
        let schema_for = |data_type: &DataType| {
            Arc::new(Schema::new(vec![Field::new("a", data_type.clone(), false)]))
        };
        let schema1 = schema_for(array1.data_type());
        let schema2 = schema_for(array2.data_type());

        let file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), schema1.clone(), None).unwrap();

        let first = RecordBatch::try_new(schema1.clone(), vec![Arc::new(array1)]).unwrap();
        writer.write(&first).unwrap();

        let second = RecordBatch::try_new(schema2, vec![Arc::new(array2)]).unwrap();
        writer.write(&second).unwrap();

        writer.close().unwrap();

        let mut reader =
            ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
        let actual_batch = reader.next().unwrap().unwrap();

        let expected_batch =
            RecordBatch::try_new(schema1, vec![Arc::new(expected_result)]).unwrap();
        assert_eq!(actual_batch, expected_batch);
    }

    // Writes `$first` holding [a], then `$second` holding [b]; expects `$first` holding [a, b].
    macro_rules! check_cross_type {
        ($first:ident, $second:ident, $a:expr, $b:expr) => {
            ensure_compatible_write(
                $first::from_iter_values(vec![$a]),
                $second::from_iter_values(vec![$b]),
                $first::from_iter_values(vec![$a, $b]),
            )
        };
    }

    // `UInt8`-keyed dictionary over `Utf8` values.
    let utf8_dict = |keys: Vec<u8>, values: Vec<&str>| {
        DictionaryArray::new(
            UInt8Array::from_iter_values(keys),
            Arc::new(StringArray::from_iter_values(values)),
        )
    };

    // Dictionary followed by its plain value type.
    ensure_compatible_write(
        utf8_dict(vec![0], vec!["parquet"]),
        StringArray::from_iter_values(vec!["barquet"]),
        utf8_dict(vec![0, 1], vec!["parquet", "barquet"]),
    );

    // Plain value type followed by a dictionary of it.
    ensure_compatible_write(
        StringArray::from_iter_values(vec!["parquet"]),
        utf8_dict(vec![0], vec!["barquet"]),
        StringArray::from_iter_values(vec!["parquet", "barquet"]),
    );

    // Dictionaries that differ only in key type.
    ensure_compatible_write(
        utf8_dict(vec![0], vec!["parquet"]),
        DictionaryArray::new(
            UInt16Array::from_iter_values(vec![0]),
            Arc::new(StringArray::from_iter_values(vec!["barquet"])),
        ),
        utf8_dict(vec![0, 1], vec!["parquet", "barquet"]),
    );

    // Dictionaries that differ only in value type (`Utf8` vs `LargeUtf8`).
    ensure_compatible_write(
        utf8_dict(vec![0], vec!["parquet"]),
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0]),
            Arc::new(LargeStringArray::from_iter_values(vec!["barquet"])),
        ),
        utf8_dict(vec![0, 1], vec!["parquet", "barquet"]),
    );

    // Dictionary followed by a plain array of a different string type.
    ensure_compatible_write(
        utf8_dict(vec![0], vec!["parquet"]),
        LargeStringArray::from_iter_values(vec!["barquet"]),
        utf8_dict(vec![0, 1], vec!["parquet", "barquet"]),
    );

    // Every ordered pairing of the three string array types.
    check_cross_type!(StringArray, LargeStringArray, "parquet", "barquet");
    check_cross_type!(LargeStringArray, StringArray, "parquet", "barquet");
    check_cross_type!(StringArray, StringViewArray, "parquet", "barquet");
    check_cross_type!(StringViewArray, StringArray, "parquet", "barquet");
    check_cross_type!(LargeStringArray, StringViewArray, "parquet", "barquet");
    check_cross_type!(StringViewArray, LargeStringArray, "parquet", "barquet");

    // Every ordered pairing of the three binary array types.
    check_cross_type!(BinaryArray, LargeBinaryArray, b"parquet", b"barquet");
    check_cross_type!(LargeBinaryArray, BinaryArray, b"parquet", b"barquet");
    check_cross_type!(BinaryArray, BinaryViewArray, b"parquet", b"barquet");
    check_cross_type!(BinaryViewArray, BinaryArray, b"parquet", b"barquet");
    check_cross_type!(BinaryViewArray, LargeBinaryArray, b"parquet", b"barquet");
    check_cross_type!(LargeBinaryArray, BinaryViewArray, b"parquet", b"barquet");

    // Lists whose item fields differ only in field metadata (a field id on the first batch).
    let list_field = Field::new_list_field(DataType::Int32, false);
    let tagged_field = Arc::new(list_field.clone().with_metadata(HashMap::from([(
        PARQUET_FIELD_ID_META_KEY.to_string(),
        "1".to_string(),
    )])));
    let plain_field = Arc::new(list_field);

    ensure_compatible_write(
        ListArray::try_new(
            Arc::clone(&tagged_field),
            OffsetBuffer::new(vec![0, 2, 5].into()),
            Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4])),
            None,
        )
        .unwrap(),
        ListArray::try_new(
            plain_field,
            OffsetBuffer::new(vec![0, 3, 5].into()),
            Arc::new(Int32Array::from(vec![5, 6, 7, 8, 9])),
            None,
        )
        .unwrap(),
        ListArray::try_new(
            tagged_field,
            OffsetBuffer::new(vec![0, 2, 5, 8, 10].into()),
            Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9])),
            None,
        )
        .unwrap(),
    );
}
4126
/// Round-trips a `UInt8`-keyed dictionary of `UInt32` values under an explicit dictionary schema.
#[test]
fn arrow_writer_primitive_dictionary() {
    let dict_type = DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32));
    #[allow(deprecated)]
    let schema = Arc::new(Schema::new(vec![Field::new_dict(
        "dictionary",
        dict_type,
        true,
        42,
        true,
    )]));

    // One repeated value and one null slot.
    let mut dict_builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
    for item in [Some(12345678), None, Some(22345678), Some(12345678)] {
        match item {
            Some(value) => {
                dict_builder.append(value).unwrap();
            }
            None => dict_builder.append_null(),
        }
    }

    one_column_roundtrip_with_schema(Arc::new(dict_builder.finish()), schema);
}
4149
/// Round-trips a `UInt8`-keyed dictionary of `Decimal32` values, first at precision 5 and then
/// with the same keys over precision-9 values (scale 2 in both cases).
#[test]
fn arrow_writer_decimal32_dictionary() {
    let decimals = |precision: u8| {
        Decimal32Array::from(vec![12345, 56789, 34567])
            .with_precision_and_scale(precision, 2)
            .unwrap()
    };
    let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

    let narrow = DictionaryArray::new(keys, Arc::new(decimals(5)));
    one_column_roundtrip(Arc::new(narrow.clone()), true);

    let wide = narrow.with_values(Arc::new(decimals(9)));
    one_column_roundtrip(Arc::new(wide), true);
}
4170
/// Round-trips a `UInt8`-keyed dictionary of `Decimal64` values, first at precision 5 and then
/// with the same keys over precision-12 values (scale 2 in both cases).
#[test]
fn arrow_writer_decimal64_dictionary() {
    let decimals = |precision: u8| {
        Decimal64Array::from(vec![12345, 56789, 34567])
            .with_precision_and_scale(precision, 2)
            .unwrap()
    };
    let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

    let narrow = DictionaryArray::new(keys, Arc::new(decimals(5)));
    one_column_roundtrip(Arc::new(narrow.clone()), true);

    let wide = narrow.with_values(Arc::new(decimals(12)));
    one_column_roundtrip(Arc::new(wide), true);
}
4191
/// Round-trips a `UInt8`-keyed dictionary of `Decimal128` values, first at precision 5 and then
/// with the same keys over precision-12 values (scale 2 in both cases).
#[test]
fn arrow_writer_decimal128_dictionary() {
    let decimals = |precision: u8| {
        Decimal128Array::from(vec![12345, 56789, 34567])
            .with_precision_and_scale(precision, 2)
            .unwrap()
    };
    let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

    let narrow = DictionaryArray::new(keys, Arc::new(decimals(5)));
    one_column_roundtrip(Arc::new(narrow.clone()), true);

    let wide = narrow.with_values(Arc::new(decimals(12)));
    one_column_roundtrip(Arc::new(wide), true);
}
4212
/// Round-trips a `UInt8`-keyed dictionary of `Decimal256` values, first at precision 5 and then
/// with the same keys over precision-12 values (scale 2 in both cases).
#[test]
fn arrow_writer_decimal256_dictionary() {
    let decimals = |precision: u8| {
        let raw: Vec<i256> = [12345, 56789, 34567]
            .into_iter()
            .map(i256::from_i128)
            .collect();
        Decimal256Array::from(raw)
            .with_precision_and_scale(precision, 2)
            .unwrap()
    };
    let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

    let narrow = DictionaryArray::new(keys, Arc::new(decimals(5)));
    one_column_roundtrip(Arc::new(narrow.clone()), true);

    let wide = narrow.with_values(Arc::new(decimals(12)));
    one_column_roundtrip(Arc::new(wide), true);
}
4237
/// Round-trips a `UInt8`-keyed string dictionary column under an explicit dictionary schema.
#[test]
fn arrow_writer_string_dictionary_unsigned_index() {
    let dict_type = DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
    #[allow(deprecated)]
    let schema = Arc::new(Schema::new(vec![Field::new_dict(
        "dictionary",
        dict_type,
        true,
        42,
        true,
    )]));

    // "alpha" appears twice, so the dictionary has repeated keys plus one null slot.
    let dict: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
        .into_iter()
        .collect();

    one_column_roundtrip_with_schema(Arc::new(dict), schema);
}
4258
/// Checks that min/max statistics of a `UInt32` column, stored as `Int32` statistics, match the
/// unsigned min/max of the rows in each row group once cast back to `u32`.
#[test]
fn u32_min_max() {
    // Values on both sides of the `i32::MAX` boundary.
    let src = [
        u32::MIN,
        u32::MIN + 1,
        (i32::MAX as u32) - 1,
        i32::MAX as u32,
        (i32::MAX as u32) + 1,
        u32::MAX - 1,
        u32::MAX,
    ];
    let values = Arc::new(UInt32Array::from_iter_values(src.iter().copied()));

    for file in one_column_roundtrip(values, false) {
        let reader = SerializedFileReader::new(file).unwrap();

        // Start of the current row group's rows within `src`.
        let mut consumed = 0;
        for row_group in reader.metadata().row_groups() {
            assert_eq!(row_group.num_columns(), 1);
            let column = row_group.column(0);

            let next = consumed + column.num_values() as usize;
            let expected = &src[consumed..next];
            consumed = next;

            match column.statistics().unwrap() {
                Statistics::Int32(stats) => {
                    assert_eq!(
                        *stats.min_opt().unwrap() as u32,
                        *expected.iter().min().unwrap()
                    );
                    assert_eq!(
                        *stats.max_opt().unwrap() as u32,
                        *expected.iter().max().unwrap()
                    );
                }
                _ => panic!("Statistics::Int32 missing"),
            }
        }
    }
}
4304
/// Checks that min/max statistics of a `UInt64` column, stored as `Int64` statistics, match the
/// unsigned min/max of the rows in each row group once cast back to `u64`.
#[test]
fn u64_min_max() {
    // Values on both sides of the `i64::MAX` boundary.
    let src = [
        u64::MIN,
        u64::MIN + 1,
        (i64::MAX as u64) - 1,
        i64::MAX as u64,
        (i64::MAX as u64) + 1,
        u64::MAX - 1,
        u64::MAX,
    ];
    let values = Arc::new(UInt64Array::from_iter_values(src.iter().copied()));

    for file in one_column_roundtrip(values, false) {
        let reader = SerializedFileReader::new(file).unwrap();

        // Start of the current row group's rows within `src`.
        let mut consumed = 0;
        for row_group in reader.metadata().row_groups() {
            assert_eq!(row_group.num_columns(), 1);
            let column = row_group.column(0);

            let next = consumed + column.num_values() as usize;
            let expected = &src[consumed..next];
            consumed = next;

            match column.statistics().unwrap() {
                Statistics::Int64(stats) => {
                    assert_eq!(
                        *stats.min_opt().unwrap() as u64,
                        *expected.iter().min().unwrap()
                    );
                    assert_eq!(
                        *stats.max_opt().unwrap() as u64,
                        *expected.iter().max().unwrap()
                    );
                }
                _ => panic!("Statistics::Int64 missing"),
            }
        }
    }
}
4350
/// A column holding only two nulls must still report a null count of 2 in its statistics.
#[test]
fn statistics_null_counts_only_nulls() {
    let all_null = Arc::new(UInt64Array::from(vec![None, None]));

    for file in one_column_roundtrip(all_null, true) {
        let reader = SerializedFileReader::new(file).unwrap();
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let row_group = metadata.row_group(0);
        assert_eq!(row_group.num_columns(), 1);

        let null_count = row_group.column(0).statistics().unwrap().null_count_opt();
        assert_eq!(null_count, Some(2));
    }
}
4369
/// Round-trips a list-of-struct column covering a populated entry, an empty list, a null list,
/// a list of null structs, and structs with one null field.
#[test]
fn test_list_of_struct_roundtrip() {
    /// Appends one struct with fields `a` and `b` (each `None` becomes a null field value) and
    /// the given struct-level validity.
    fn push_struct(entries: &mut StructBuilder, a: Option<i32>, b: Option<i32>, is_valid: bool) {
        entries
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_option(a);
        entries
            .field_builder::<Int32Builder>(1)
            .unwrap()
            .append_option(b);
        entries.append(is_valid);
    }

    let struct_builder = StructBuilder::new(
        vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
        ],
        vec![
            Box::new(Int32Builder::with_capacity(10)),
            Box::new(Int32Builder::with_capacity(10)),
        ],
    );
    let mut list_builder = ListBuilder::new(struct_builder);

    // Entry 0: one valid struct with both fields set.
    push_struct(list_builder.values(), Some(1), Some(2), true);
    list_builder.append(true);

    // Entry 1: valid list with no structs.
    list_builder.append(true);

    // Entry 2: null list.
    list_builder.append(false);

    // Entry 3: two null structs.
    push_struct(list_builder.values(), None, None, false);
    push_struct(list_builder.values(), None, None, false);
    list_builder.append(true);

    // Entry 4: valid struct with a null first field.
    push_struct(list_builder.values(), None, Some(3), true);
    list_builder.append(true);

    // Entry 5: valid struct with a null second field.
    push_struct(list_builder.values(), Some(2), None, true);
    list_builder.append(true);

    one_column_roundtrip(Arc::new(list_builder.finish()), true);
}
4459
4460 fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
4461 metadata.row_groups().iter().map(|x| x.num_rows()).collect()
4462 }
4463
/// Writes batches of 100, 50 and 300 rows with a 200-row row-group limit and checks the
/// resulting row-group sizes, the batch sizes read back, and the values themselves.
#[test]
fn test_aggregates_records() {
    let ranges = [0..100, 0..50, 200..500];

    let schema = Arc::new(Schema::new(vec![Field::new(
        "int",
        ArrowDataType::Int32,
        false,
    )]));
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(200))
        .build();

    let file = tempfile::tempfile().unwrap();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();

    for range in ranges.clone() {
        let array = Int32Array::from(range.collect::<Vec<i32>>());
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
        writer.write(&batch).unwrap();
    }
    writer.close().unwrap();

    // 450 rows in total, split at the 200-row limit.
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);

    let batches = builder
        .with_batch_size(100)
        .build()
        .unwrap()
        .collect::<ArrowResult<Vec<_>>>()
        .unwrap();

    assert_eq!(batches.len(), 5);
    assert!(batches.iter().all(|batch| batch.num_columns() == 1));

    let batch_sizes: Vec<_> = batches.iter().map(|batch| batch.num_rows()).collect();
    assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);

    // Concatenate every value read back, in order.
    let mut values = Vec::new();
    for batch in &batches {
        let ints = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        values.extend(ints.values().iter().copied());
    }

    let expected_values: Vec<i32> = ranges.into_iter().flatten().collect();
    assert_eq!(&values, &expected_values)
}
4527
/// Writes two batches of a `struct<list<struct<leaf_a, leaf_b>>>` column with a 6-row row-group
/// limit, then checks the row-group split, the batch sizes read back, and that the data
/// pretty-prints identically before and after the round trip.
#[test]
fn complex_aggregate() {
    // Schema: struct_b { list: [ struct_a { leaf_a, leaf_b } ] }
    let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
    let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
    let struct_a = Arc::new(Field::new(
        "struct_a",
        DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
        true,
    ));

    let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
    let struct_b = Arc::new(Field::new(
        "struct_b",
        DataType::Struct(vec![list_a.clone()].into()),
        false,
    ));

    let schema = Arc::new(Schema::new(vec![struct_b]));

    // First batch: five list entries, entries 1 and 3 null.
    let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
    let field_b_array =
        Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);

    let struct_a_array = StructArray::from(vec![
        (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
        (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
    ]);

    let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
        .len(5)
        .add_buffer(Buffer::from_iter(vec![
            0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
        ]))
        .null_bit_buffer(Some(Buffer::from_iter(vec![
            true, false, true, false, true,
        ])))
        .child_data(vec![struct_a_array.into_data()])
        .build()
        .unwrap();

    let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
    let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);

    let batch1 =
        RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
            .unwrap();

    // Second batch: two list entries with four and one structs, no null lists.
    let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
    let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);

    let struct_a_array = StructArray::from(vec![
        (field_a, Arc::new(field_a_array) as ArrayRef),
        (field_b, Arc::new(field_b_array) as ArrayRef),
    ]);

    let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
        .len(2)
        .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
        .child_data(vec![struct_a_array.into_data()])
        .build()
        .unwrap();

    let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
    let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);

    let batch2 =
        RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
            .unwrap();

    let batches = &[batch1, batch2];

    // Expected single-column table. Cells are left-aligned and padded to the widest cell so
    // every line is as long as the border; building it here avoids depending on exact
    // whitespace inside a string literal.
    let header = "struct_b";
    let rows = [
        "{list: [{leaf_a: 1, leaf_b: 1}]}",
        "{list: }",
        "{list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}",
        "{list: }",
        "{list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}",
        "{list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]}",
        "{list: [{leaf_a: 10, leaf_b: }]}",
    ];
    let width = rows
        .iter()
        .map(|row| row.len())
        .fold(header.len(), usize::max);
    let border = format!("+{}+", "-".repeat(width + 2));
    let mut lines = vec![
        border.clone(),
        format!("| {header:<width$} |"),
        border.clone(),
    ];
    lines.extend(rows.iter().map(|row| format!("| {row:<width$} |")));
    lines.push(border);
    let expected = lines.join("\n");

    let actual = pretty_format_batches(batches).unwrap().to_string();
    assert_eq!(actual, expected);

    // Write both batches with at most 6 rows per row group.
    let file = tempfile::tempfile().unwrap();
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(6))
        .build();

    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();

    for batch in batches {
        writer.write(batch).unwrap();
    }
    writer.close().unwrap();

    // Seven rows in total: one full row group of 6 and a remainder of 1.
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);

    let batches = builder
        .with_batch_size(2)
        .build()
        .unwrap()
        .collect::<ArrowResult<Vec<_>>>()
        .unwrap();

    assert_eq!(batches.len(), 4);
    let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
    assert_eq!(&batch_counts, &[2, 2, 2, 1]);

    // The data read back must print exactly like the data written.
    let actual = pretty_format_batches(&batches).unwrap().to_string();
    assert_eq!(actual, expected);
}
4655
/// Writing a batch must succeed when the writer schema differs from the batch schema only by
/// schema-level metadata.
#[test]
fn test_arrow_writer_metadata() {
    let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
    let file_schema = batch_schema
        .clone()
        .with_metadata(HashMap::from([("foo".to_string(), "bar".to_string())]));

    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let batch = RecordBatch::try_new(Arc::new(batch_schema), vec![column]).unwrap();

    let mut sink = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut sink, Arc::new(file_schema), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();
}
4676
/// A batch with a non-nullable column can be written under a nullable file schema; the data read
/// back carries the file schema and the same values.
#[test]
fn test_arrow_writer_nullable() {
    let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
    let file_schema = Arc::new(Schema::new(vec![Field::new(
        "int32",
        DataType::Int32,
        true,
    )]));

    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let batch = RecordBatch::try_new(Arc::new(batch_schema), vec![column]).unwrap();

    let mut sink = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut sink, file_schema.clone(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(sink), 1024).unwrap();
    let read_back = reader.next().unwrap().unwrap();

    assert_eq!(read_back.schema(), file_schema);
    assert_ne!(read_back.schema(), batch.schema());
    assert_eq!(read_back.column(0).as_ref(), batch.column(0).as_ref());
}
4700
/// Tracks `in_progress_size`, `in_progress_rows`, `memory_size` and `bytes_written` across two
/// writes and a flush.
#[test]
fn in_progress_accounting() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let column = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();

    // Before any batch is written nothing is buffered; 4 bytes are already counted as written.
    assert_eq!(writer.in_progress_size(), 0);
    assert_eq!(writer.in_progress_rows(), 0);
    assert_eq!(writer.memory_size(), 0);
    assert_eq!(writer.bytes_written(), 4);

    // After the first batch: five buffered rows and non-zero size estimates.
    writer.write(&batch).unwrap();
    let initial_size = writer.in_progress_size();
    let initial_memory = writer.memory_size();
    assert!(initial_size > 0);
    assert_eq!(writer.in_progress_rows(), 5);
    assert!(initial_memory > 0);
    assert!(
        initial_size <= initial_memory,
        "{initial_size} <= {initial_memory}"
    );

    // After the second batch every counter has grown.
    writer.write(&batch).unwrap();
    assert!(writer.in_progress_size() > initial_size);
    assert_eq!(writer.in_progress_rows(), 10);
    assert!(writer.memory_size() > initial_memory);
    assert!(
        writer.in_progress_size() <= writer.memory_size(),
        "in_progress_size {} <= memory_size {}",
        writer.in_progress_size(),
        writer.memory_size()
    );

    // Flushing empties the buffers and increases the bytes written.
    let pre_flush_bytes_written = writer.bytes_written();
    writer.flush().unwrap();
    assert_eq!(writer.in_progress_size(), 0);
    assert_eq!(writer.memory_size(), 0);
    assert!(writer.bytes_written() > pre_flush_bytes_written);

    writer.close().unwrap();
}
4754
4755 #[test]
4756 fn test_writer_all_null() {
4757 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
4758 let b = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
4759 let batch = RecordBatch::try_from_iter(vec![
4760 ("a", Arc::new(a) as ArrayRef),
4761 ("b", Arc::new(b) as ArrayRef),
4762 ])
4763 .unwrap();
4764
4765 let mut buf = Vec::with_capacity(1024);
4766 let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
4767 writer.write(&batch).unwrap();
4768 writer.close().unwrap();
4769
4770 let bytes = Bytes::from(buf);
4771 let options = ReadOptionsBuilder::new().with_page_index().build();
4772 let reader = SerializedFileReader::new_with_options(bytes, options).unwrap();
4773 let index = reader.metadata().offset_index().unwrap();
4774
4775 assert_eq!(index.len(), 1);
4776 assert_eq!(index[0].len(), 2); assert_eq!(index[0][0].page_locations().len(), 1); assert_eq!(index[0][1].page_locations().len(), 1); }
4780
4781 #[test]
4782 fn test_disabled_statistics_with_page() {
4783 let file_schema = Schema::new(vec![
4784 Field::new("a", DataType::Utf8, true),
4785 Field::new("b", DataType::Utf8, true),
4786 ]);
4787 let file_schema = Arc::new(file_schema);
4788
4789 let batch = RecordBatch::try_new(
4790 file_schema.clone(),
4791 vec![
4792 Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
4793 Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
4794 ],
4795 )
4796 .unwrap();
4797
4798 let props = WriterProperties::builder()
4799 .set_statistics_enabled(EnabledStatistics::None)
4800 .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
4801 .build();
4802
4803 let mut buf = Vec::with_capacity(1024);
4804 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4805 writer.write(&batch).unwrap();
4806
4807 let metadata = writer.close().unwrap();
4808 assert_eq!(metadata.num_row_groups(), 1);
4809 let row_group = metadata.row_group(0);
4810 assert_eq!(row_group.num_columns(), 2);
4811 assert!(row_group.column(0).offset_index_offset().is_some());
4813 assert!(row_group.column(0).column_index_offset().is_some());
4814 assert!(row_group.column(1).offset_index_offset().is_some());
4816 assert!(row_group.column(1).column_index_offset().is_none());
4817
4818 let options = ReadOptionsBuilder::new().with_page_index().build();
4819 let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4820
4821 let row_group = reader.get_row_group(0).unwrap();
4822 let a_col = row_group.metadata().column(0);
4823 let b_col = row_group.metadata().column(1);
4824
4825 if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4827 let min = byte_array_stats.min_opt().unwrap();
4828 let max = byte_array_stats.max_opt().unwrap();
4829
4830 assert_eq!(min.as_bytes(), b"a");
4831 assert_eq!(max.as_bytes(), b"d");
4832 } else {
4833 panic!("expecting Statistics::ByteArray");
4834 }
4835
4836 assert!(b_col.statistics().is_none());
4838
4839 let offset_index = reader.metadata().offset_index().unwrap();
4840 assert_eq!(offset_index.len(), 1); assert_eq!(offset_index[0].len(), 2); let column_index = reader.metadata().column_index().unwrap();
4844 assert_eq!(column_index.len(), 1); assert_eq!(column_index[0].len(), 2); let a_idx = &column_index[0][0];
4848 assert!(
4849 matches!(a_idx, ColumnIndexMetaData::BYTE_ARRAY(_)),
4850 "{a_idx:?}"
4851 );
4852 let b_idx = &column_index[0][1];
4853 assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
4854 }
4855
4856 #[test]
4857 fn test_disabled_statistics_with_chunk() {
4858 let file_schema = Schema::new(vec![
4859 Field::new("a", DataType::Utf8, true),
4860 Field::new("b", DataType::Utf8, true),
4861 ]);
4862 let file_schema = Arc::new(file_schema);
4863
4864 let batch = RecordBatch::try_new(
4865 file_schema.clone(),
4866 vec![
4867 Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
4868 Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
4869 ],
4870 )
4871 .unwrap();
4872
4873 let props = WriterProperties::builder()
4874 .set_statistics_enabled(EnabledStatistics::None)
4875 .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
4876 .build();
4877
4878 let mut buf = Vec::with_capacity(1024);
4879 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
4880 writer.write(&batch).unwrap();
4881
4882 let metadata = writer.close().unwrap();
4883 assert_eq!(metadata.num_row_groups(), 1);
4884 let row_group = metadata.row_group(0);
4885 assert_eq!(row_group.num_columns(), 2);
4886 assert!(row_group.column(0).offset_index_offset().is_some());
4888 assert!(row_group.column(0).column_index_offset().is_none());
4889 assert!(row_group.column(1).offset_index_offset().is_some());
4891 assert!(row_group.column(1).column_index_offset().is_none());
4892
4893 let options = ReadOptionsBuilder::new().with_page_index().build();
4894 let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();
4895
4896 let row_group = reader.get_row_group(0).unwrap();
4897 let a_col = row_group.metadata().column(0);
4898 let b_col = row_group.metadata().column(1);
4899
4900 if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
4902 let min = byte_array_stats.min_opt().unwrap();
4903 let max = byte_array_stats.max_opt().unwrap();
4904
4905 assert_eq!(min.as_bytes(), b"a");
4906 assert_eq!(max.as_bytes(), b"d");
4907 } else {
4908 panic!("expecting Statistics::ByteArray");
4909 }
4910
4911 assert!(b_col.statistics().is_none());
4913
4914 let column_index = reader.metadata().column_index().unwrap();
4915 assert_eq!(column_index.len(), 1); assert_eq!(column_index[0].len(), 2); let a_idx = &column_index[0][0];
4919 assert!(matches!(a_idx, ColumnIndexMetaData::NONE), "{a_idx:?}");
4920 let b_idx = &column_index[0][1];
4921 assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
4922 }
4923
4924 #[test]
4925 fn test_arrow_writer_skip_metadata() {
4926 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4927 let file_schema = Arc::new(batch_schema.clone());
4928
4929 let batch = RecordBatch::try_new(
4930 Arc::new(batch_schema),
4931 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4932 )
4933 .unwrap();
4934 let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
4935
4936 let mut buf = Vec::with_capacity(1024);
4937 let mut writer =
4938 ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
4939 writer.write(&batch).unwrap();
4940 writer.close().unwrap();
4941
4942 let bytes = Bytes::from(buf);
4943 let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
4944 assert_eq!(file_schema, *reader_builder.schema());
4945 if let Some(key_value_metadata) = reader_builder
4946 .metadata()
4947 .file_metadata()
4948 .key_value_metadata()
4949 {
4950 assert!(
4951 !key_value_metadata
4952 .iter()
4953 .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY)
4954 );
4955 }
4956 }
4957
4958 #[test]
4959 fn test_arrow_writer_skip_path_in_schema() {
4960 let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
4961 let file_schema = Arc::new(batch_schema.clone());
4962
4963 let batch = RecordBatch::try_new(
4964 Arc::new(batch_schema),
4965 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
4966 )
4967 .unwrap();
4968
4969 let skip_options = ArrowWriterOptions::new();
4971
4972 let mut buf = Vec::with_capacity(1024);
4973 let mut writer =
4974 ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
4975 writer.write(&batch).unwrap();
4976 writer.close().unwrap();
4977
4978 let skip_options = ArrowWriterOptions::new().with_properties(
4980 WriterProperties::builder()
4981 .set_write_path_in_schema(false)
4982 .build(),
4983 );
4984
4985 let mut buf2 = Vec::with_capacity(1024);
4986 let mut writer =
4987 ArrowWriter::try_new_with_options(&mut buf2, file_schema.clone(), skip_options)
4988 .unwrap();
4989 writer.write(&batch).unwrap();
4990 writer.close().unwrap();
4991
4992 assert!(buf.len() > buf2.len());
4994 }
4995
4996 #[test]
4997 fn mismatched_schemas() {
4998 let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
4999 let file_schema = Arc::new(Schema::new(vec![Field::new(
5000 "temperature",
5001 DataType::Float64,
5002 false,
5003 )]));
5004
5005 let batch = RecordBatch::try_new(
5006 Arc::new(batch_schema),
5007 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
5008 )
5009 .unwrap();
5010
5011 let mut buf = Vec::with_capacity(1024);
5012 let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
5013
5014 let err = writer.write(&batch).unwrap_err().to_string();
5015 assert_eq!(
5016 err,
5017 "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
5018 );
5019 }
5020
5021 #[test]
5022 fn test_roundtrip_empty_schema() {
5024 let empty_batch = RecordBatch::try_new_with_options(
5026 Arc::new(Schema::empty()),
5027 vec![],
5028 &RecordBatchOptions::default().with_row_count(Some(0)),
5029 )
5030 .unwrap();
5031
5032 let mut parquet_bytes: Vec<u8> = Vec::new();
5034 let mut writer =
5035 ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
5036 writer.write(&empty_batch).unwrap();
5037 writer.close().unwrap();
5038
5039 let bytes = Bytes::from(parquet_bytes);
5041 let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
5042 assert_eq!(reader.schema(), &empty_batch.schema());
5043 let batches: Vec<_> = reader
5044 .build()
5045 .unwrap()
5046 .collect::<ArrowResult<Vec<_>>>()
5047 .unwrap();
5048 assert_eq!(batches.len(), 0);
5049 }
5050
5051 #[test]
5052 fn test_page_stats_not_written_by_default() {
5053 let string_field = Field::new("a", DataType::Utf8, false);
5054 let schema = Schema::new(vec![string_field]);
5055 let raw_string_values = vec!["Blart Versenwald III"];
5056 let string_values = StringArray::from(raw_string_values.clone());
5057 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
5058
5059 let props = WriterProperties::builder()
5060 .set_statistics_enabled(EnabledStatistics::Page)
5061 .set_dictionary_enabled(false)
5062 .set_encoding(Encoding::PLAIN)
5063 .set_compression(crate::basic::Compression::UNCOMPRESSED)
5064 .build();
5065
5066 let file = roundtrip_opts(&batch, props);
5067
5068 let first_page = &file[4..];
5073 let mut prot = ThriftSliceInputProtocol::new(first_page);
5074 let hdr = PageHeader::read_thrift(&mut prot).unwrap();
5075 let stats = hdr.data_page_header.unwrap().statistics;
5076
5077 assert!(stats.is_none());
5078 }
5079
5080 #[test]
5081 fn test_page_stats_when_enabled() {
5082 let string_field = Field::new("a", DataType::Utf8, false);
5083 let schema = Schema::new(vec![string_field]);
5084 let raw_string_values = vec!["Blart Versenwald III", "Andrew Lamb"];
5085 let string_values = StringArray::from(raw_string_values.clone());
5086 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(string_values)]).unwrap();
5087
5088 let props = WriterProperties::builder()
5089 .set_statistics_enabled(EnabledStatistics::Page)
5090 .set_dictionary_enabled(false)
5091 .set_encoding(Encoding::PLAIN)
5092 .set_write_page_header_statistics(true)
5093 .set_compression(crate::basic::Compression::UNCOMPRESSED)
5094 .build();
5095
5096 let file = roundtrip_opts(&batch, props);
5097
5098 let first_page = &file[4..];
5103 let mut prot = ThriftSliceInputProtocol::new(first_page);
5104 let hdr = PageHeader::read_thrift(&mut prot).unwrap();
5105 let stats = hdr.data_page_header.unwrap().statistics;
5106
5107 let stats = stats.unwrap();
5108 assert!(stats.is_max_value_exact.unwrap());
5110 assert!(stats.is_min_value_exact.unwrap());
5111 assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
5112 assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
5113 }
5114
5115 #[test]
5116 fn test_page_stats_truncation() {
5117 let string_field = Field::new("a", DataType::Utf8, false);
5118 let binary_field = Field::new("b", DataType::Binary, false);
5119 let schema = Schema::new(vec![string_field, binary_field]);
5120
5121 let raw_string_values = vec!["Blart Versenwald III"];
5122 let raw_binary_values = [b"Blart Versenwald III".to_vec()];
5123 let raw_binary_value_refs = raw_binary_values
5124 .iter()
5125 .map(|x| x.as_slice())
5126 .collect::<Vec<_>>();
5127
5128 let string_values = StringArray::from(raw_string_values.clone());
5129 let binary_values = BinaryArray::from(raw_binary_value_refs);
5130 let batch = RecordBatch::try_new(
5131 Arc::new(schema),
5132 vec![Arc::new(string_values), Arc::new(binary_values)],
5133 )
5134 .unwrap();
5135
5136 let props = WriterProperties::builder()
5137 .set_statistics_truncate_length(Some(2))
5138 .set_dictionary_enabled(false)
5139 .set_encoding(Encoding::PLAIN)
5140 .set_write_page_header_statistics(true)
5141 .set_compression(crate::basic::Compression::UNCOMPRESSED)
5142 .build();
5143
5144 let file = roundtrip_opts(&batch, props);
5145
5146 let first_page = &file[4..];
5151 let mut prot = ThriftSliceInputProtocol::new(first_page);
5152 let hdr = PageHeader::read_thrift(&mut prot).unwrap();
5153 let stats = hdr.data_page_header.unwrap().statistics;
5154 assert!(stats.is_some());
5155 let stats = stats.unwrap();
5156 assert!(!stats.is_max_value_exact.unwrap());
5158 assert!(!stats.is_min_value_exact.unwrap());
5159 assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
5160 assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
5161
5162 let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..];
5164 let mut prot = ThriftSliceInputProtocol::new(second_page);
5165 let hdr = PageHeader::read_thrift(&mut prot).unwrap();
5166 let stats = hdr.data_page_header.unwrap().statistics;
5167 assert!(stats.is_some());
5168 let stats = stats.unwrap();
5169 assert!(!stats.is_max_value_exact.unwrap());
5171 assert!(!stats.is_min_value_exact.unwrap());
5172 assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
5173 assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
5174 }
5175
5176 #[test]
5177 fn test_page_encoding_statistics_roundtrip() {
5178 let batch_schema = Schema::new(vec![Field::new(
5179 "int32",
5180 arrow_schema::DataType::Int32,
5181 false,
5182 )]);
5183
5184 let batch = RecordBatch::try_new(
5185 Arc::new(batch_schema.clone()),
5186 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
5187 )
5188 .unwrap();
5189
5190 let mut file: File = tempfile::tempfile().unwrap();
5191 let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
5192 writer.write(&batch).unwrap();
5193 let file_metadata = writer.close().unwrap();
5194
5195 assert_eq!(file_metadata.num_row_groups(), 1);
5196 assert_eq!(file_metadata.row_group(0).num_columns(), 1);
5197 assert!(
5198 file_metadata
5199 .row_group(0)
5200 .column(0)
5201 .page_encoding_stats()
5202 .is_some()
5203 );
5204 let chunk_page_stats = file_metadata
5205 .row_group(0)
5206 .column(0)
5207 .page_encoding_stats()
5208 .unwrap();
5209
5210 let options = ReadOptionsBuilder::new()
5212 .with_page_index()
5213 .with_encoding_stats_as_mask(false)
5214 .build();
5215 let reader = SerializedFileReader::new_with_options(file, options).unwrap();
5216
5217 let rowgroup = reader.get_row_group(0).expect("row group missing");
5218 assert_eq!(rowgroup.num_columns(), 1);
5219 let column = rowgroup.metadata().column(0);
5220 assert!(column.page_encoding_stats().is_some());
5221 let file_page_stats = column.page_encoding_stats().unwrap();
5222 assert_eq!(chunk_page_stats, file_page_stats);
5223 }
5224
5225 #[test]
5226 fn test_different_dict_page_size_limit() {
5227 let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
5228 let schema = Arc::new(Schema::new(vec![
5229 Field::new("col0", arrow_schema::DataType::Int64, false),
5230 Field::new("col1", arrow_schema::DataType::Int64, false),
5231 ]));
5232 let batch =
5233 arrow_array::RecordBatch::try_new(schema.clone(), vec![array.clone(), array]).unwrap();
5234
5235 let props = WriterProperties::builder()
5236 .set_dictionary_page_size_limit(1024 * 1024)
5237 .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
5238 .build();
5239 let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
5240 writer.write(&batch).unwrap();
5241 let data = Bytes::from(writer.into_inner().unwrap());
5242
5243 let mut metadata = ParquetMetaDataReader::new();
5244 metadata.try_parse(&data).unwrap();
5245 let metadata = metadata.finish().unwrap();
5246 let col0_meta = metadata.row_group(0).column(0);
5247 let col1_meta = metadata.row_group(0).column(1);
5248
5249 let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
5250 let mut reader =
5251 SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
5252 let page = reader.get_next_page().unwrap().unwrap();
5253 match page {
5254 Page::DictionaryPage { buf, .. } => buf.len(),
5255 _ => panic!("expected DictionaryPage"),
5256 }
5257 };
5258
5259 assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
5260 assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
5261 }
5262
5263 #[test]
5264 fn test_arrow_writer_granular_mode_roundtrip() {
5265 let small = "tiny".to_string();
5274 let big = "x".repeat(64 * 1024);
5275 let strings: Vec<String> = (0..256)
5276 .map(|i| {
5277 if i % 16 == 0 {
5278 big.clone()
5279 } else {
5280 small.clone()
5281 }
5282 })
5283 .collect();
5284
5285 let schema = Arc::new(Schema::new(vec![Field::new(
5286 "col",
5287 ArrowDataType::Utf8,
5288 false,
5289 )]));
5290 let batch = RecordBatch::try_new(
5291 schema.clone(),
5292 vec![Arc::new(StringArray::from(strings.clone())) as _],
5293 )
5294 .unwrap();
5295
5296 let props = WriterProperties::builder()
5297 .set_dictionary_enabled(false)
5298 .set_data_page_size_limit(16 * 1024)
5299 .build();
5300 let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
5301 writer.write(&batch).unwrap();
5302 let data = Bytes::from(writer.into_inner().unwrap());
5303
5304 let mut reader = ParquetRecordBatchReader::try_new(data, 1024).unwrap();
5305 let read = reader.next().unwrap().unwrap();
5306 assert!(reader.next().is_none(), "expected one batch");
5307 let col = read
5308 .column(0)
5309 .as_any()
5310 .downcast_ref::<StringArray>()
5311 .unwrap();
5312 assert_eq!(col.len(), strings.len());
5313 for (i, expected) in strings.iter().enumerate() {
5314 assert_eq!(
5315 col.value(i),
5316 expected.as_str(),
5317 "value mismatch at index {i}"
5318 );
5319 }
5320 }
5321
5322 #[test]
5323 fn test_arrow_writer_all_null_string_column() {
5324 let num_rows = 1024;
5329 let schema = Arc::new(Schema::new(vec![Field::new(
5330 "col",
5331 ArrowDataType::Utf8,
5332 true,
5333 )]));
5334 let nulls: Vec<Option<&str>> = vec![None; num_rows];
5335 let batch = RecordBatch::try_new(
5336 schema.clone(),
5337 vec![Arc::new(StringArray::from(nulls)) as _],
5338 )
5339 .unwrap();
5340
5341 let props = WriterProperties::builder()
5342 .set_dictionary_enabled(false)
5343 .set_data_page_size_limit(16 * 1024)
5344 .build();
5345 let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
5346 writer.write(&batch).unwrap();
5347 let data = Bytes::from(writer.into_inner().unwrap());
5348
5349 let mut metadata = ParquetMetaDataReader::new();
5352 metadata.try_parse(&data).unwrap();
5353 let metadata = metadata.finish().unwrap();
5354 let row_group = metadata.row_group(0);
5355 let col_meta = row_group.column(0);
5356 assert_eq!(row_group.num_rows() as usize, num_rows);
5357 if let Some(stats) = col_meta.statistics() {
5360 assert_eq!(
5361 stats.null_count_opt().unwrap_or(0) as usize,
5362 num_rows,
5363 "expected all-null column to report null_count = num_rows"
5364 );
5365 }
5366
5367 let mut reader =
5368 SerializedPageReader::new(Arc::new(data.clone()), col_meta, num_rows, None).unwrap();
5369 let mut total_values = 0u32;
5370 while let Some(page) = reader.get_next_page().unwrap() {
5371 if matches!(page, Page::DataPage { .. } | Page::DataPageV2 { .. }) {
5372 total_values += page.num_values();
5373 }
5374 }
5375 assert_eq!(
5376 total_values as usize, num_rows,
5377 "expected every level position to be represented in some page"
5378 );
5379 }
5380
5381 struct WriteBatchesShape {
5382 num_batches: usize,
5383 rows_per_batch: usize,
5384 row_size: usize,
5385 }
5386
5387 fn write_batches(
5389 WriteBatchesShape {
5390 num_batches,
5391 rows_per_batch,
5392 row_size,
5393 }: WriteBatchesShape,
5394 props: WriterProperties,
5395 ) -> ParquetRecordBatchReaderBuilder<File> {
5396 let schema = Arc::new(Schema::new(vec![Field::new(
5397 "str",
5398 ArrowDataType::Utf8,
5399 false,
5400 )]));
5401 let file = tempfile::tempfile().unwrap();
5402 let mut writer =
5403 ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
5404
5405 for batch_idx in 0..num_batches {
5406 let strings: Vec<String> = (0..rows_per_batch)
5407 .map(|i| format!("{:0>width$}", batch_idx * 10 + i, width = row_size))
5408 .collect();
5409 let array = StringArray::from(strings);
5410 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
5411 writer.write(&batch).unwrap();
5412 }
5413 writer.close().unwrap();
5414 ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
5415 }
5416
5417 #[test]
5418 fn test_row_group_limit_none_writes_single_row_group() {
5420 let props = WriterProperties::builder()
5421 .set_max_row_group_row_count(None)
5422 .set_max_row_group_bytes(None)
5423 .build();
5424
5425 let builder = write_batches(
5426 WriteBatchesShape {
5427 num_batches: 1,
5428 rows_per_batch: 1000,
5429 row_size: 4,
5430 },
5431 props,
5432 );
5433
5434 assert_eq!(
5435 &row_group_sizes(builder.metadata()),
5436 &[1000],
5437 "With no limits, all rows should be in a single row group"
5438 );
5439 }
5440
5441 #[test]
5442 fn test_row_group_limit_rows_only() {
5444 let props = WriterProperties::builder()
5445 .set_max_row_group_row_count(Some(300))
5446 .set_max_row_group_bytes(None)
5447 .build();
5448
5449 let builder = write_batches(
5450 WriteBatchesShape {
5451 num_batches: 1,
5452 rows_per_batch: 1000,
5453 row_size: 4,
5454 },
5455 props,
5456 );
5457
5458 assert_eq!(
5459 &row_group_sizes(builder.metadata()),
5460 &[300, 300, 300, 100],
5461 "Row groups should be split by row count"
5462 );
5463 }
5464
5465 #[test]
5466 fn test_row_group_limit_bytes_only() {
5468 let props = WriterProperties::builder()
5469 .set_max_row_group_row_count(None)
5470 .set_max_row_group_bytes(Some(3500))
5472 .build();
5473
5474 let builder = write_batches(
5475 WriteBatchesShape {
5476 num_batches: 10,
5477 rows_per_batch: 10,
5478 row_size: 100,
5479 },
5480 props,
5481 );
5482
5483 let sizes = row_group_sizes(builder.metadata());
5484
5485 assert!(
5486 sizes.len() > 1,
5487 "Should have multiple row groups due to byte limit, got {sizes:?}",
5488 );
5489
5490 let total_rows: i64 = sizes.iter().sum();
5491 assert_eq!(total_rows, 100, "Total rows should be preserved");
5492 }
5493
5494 #[test]
5495 fn test_row_group_limit_bytes_flushes_when_current_group_already_too_large() {
5497 let schema = Arc::new(Schema::new(vec![Field::new(
5498 "str",
5499 ArrowDataType::Utf8,
5500 false,
5501 )]));
5502 let file = tempfile::tempfile().unwrap();
5503
5504 let props = WriterProperties::builder()
5506 .set_max_row_group_row_count(None)
5507 .set_max_row_group_bytes(None)
5508 .build();
5509 let mut writer =
5510 ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
5511
5512 let first_array = StringArray::from(
5513 (0..10)
5514 .map(|i| format!("{:0>100}", i))
5515 .collect::<Vec<String>>(),
5516 );
5517 let first_batch =
5518 RecordBatch::try_new(schema.clone(), vec![Arc::new(first_array)]).unwrap();
5519 writer.write(&first_batch).unwrap();
5520 assert_eq!(writer.in_progress_rows(), 10);
5521
5522 writer.max_row_group_bytes = Some(1);
5525
5526 let second_array = StringArray::from(vec!["x".to_string()]);
5527 let second_batch =
5528 RecordBatch::try_new(schema.clone(), vec![Arc::new(second_array)]).unwrap();
5529 writer.write(&second_batch).unwrap();
5530 writer.close().unwrap();
5531 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
5532
5533 assert_eq!(
5534 &row_group_sizes(builder.metadata()),
5535 &[10, 1],
5536 "The second write should flush an oversized in-progress row group first",
5537 );
5538 }
5539
5540 #[test]
5541 fn test_row_group_limit_both_row_wins_single_batch() {
5543 let props = WriterProperties::builder()
5544 .set_max_row_group_row_count(Some(200)) .set_max_row_group_bytes(Some(1024 * 1024)) .build();
5547
5548 let builder = write_batches(
5549 WriteBatchesShape {
5550 num_batches: 1,
5551 row_size: 4,
5552 rows_per_batch: 1000,
5553 },
5554 props,
5555 );
5556
5557 assert_eq!(
5558 &row_group_sizes(builder.metadata()),
5559 &[200, 200, 200, 200, 200],
5560 "Row limit should trigger before byte limit"
5561 );
5562 }
5563
5564 #[test]
5565 fn test_row_group_limit_both_row_wins_multiple_batches() {
5567 let props = WriterProperties::builder()
5568 .set_max_row_group_row_count(Some(5)) .set_max_row_group_bytes(Some(9999)) .build();
5571
5572 let builder = write_batches(
5573 WriteBatchesShape {
5574 num_batches: 10,
5575 rows_per_batch: 10,
5576 row_size: 100,
5577 },
5578 props,
5579 );
5580
5581 assert_eq!(
5582 &row_group_sizes(builder.metadata()),
5583 &[5; 20],
5584 "Row limit should trigger before byte limit"
5585 );
5586 }
5587
5588 #[test]
5589 fn test_row_group_limit_both_bytes_wins() {
5591 let props = WriterProperties::builder()
5592 .set_max_row_group_row_count(Some(1000)) .set_max_row_group_bytes(Some(3500)) .build();
5595
5596 let builder = write_batches(
5597 WriteBatchesShape {
5598 num_batches: 10,
5599 rows_per_batch: 10,
5600 row_size: 100,
5601 },
5602 props,
5603 );
5604
5605 let sizes = row_group_sizes(builder.metadata());
5606
5607 assert!(
5608 sizes.len() > 1,
5609 "Byte limit should trigger before row limit, got {sizes:?}",
5610 );
5611
5612 assert!(
5613 sizes.iter().all(|&s| s < 1000),
5614 "No row group should hit the row limit"
5615 );
5616
5617 let total_rows: i64 = sizes.iter().sum();
5618 assert_eq!(total_rows, 100, "Total rows should be preserved");
5619 }
5620
5621 #[test]
5622 fn arrow_column_chunk_close_mut_drops_column_index() {
5623 use crate::arrow::ArrowSchemaConverter;
5624 use crate::file::writer::SerializedFileWriter;
5625
5626 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
5627 let props = Arc::new(
5628 WriterProperties::builder()
5629 .set_statistics_enabled(EnabledStatistics::Page)
5630 .build(),
5631 );
5632 let parquet_schema = ArrowSchemaConverter::new()
5633 .with_coerce_types(props.coerce_types())
5634 .convert(&schema)
5635 .unwrap();
5636
5637 let mut buf = Vec::with_capacity(1024);
5638 let mut writer =
5639 SerializedFileWriter::new(&mut buf, parquet_schema.root_schema_ptr(), props.clone())
5640 .unwrap();
5641
5642 let factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));
5643 let mut col_writers = factory.create_column_writers(0).unwrap();
5644 let arr: ArrayRef = Arc::new(Int32Array::from_iter_values(0..64));
5645 for leaves in compute_leaves(schema.field(0), &arr).unwrap() {
5646 col_writers[0].write(&leaves).unwrap();
5647 }
5648 let mut chunk = col_writers.pop().unwrap().close().unwrap();
5649
5650 assert!(
5652 chunk.close().column_index.is_some(),
5653 "EnabledStatistics::Page should produce a column_index"
5654 );
5655
5656 chunk.close_mut().column_index = None;
5658 assert!(chunk.close().column_index.is_none());
5659
5660 let mut rg = writer.next_row_group().unwrap();
5661 chunk.append_to_row_group(&mut rg).unwrap();
5662 rg.close().unwrap();
5663 let file_meta = writer.close().unwrap();
5664
5665 let cc = file_meta.row_group(0).column(0);
5668 assert!(cc.column_index_range().is_none());
5669 }
5670
5671 fn write_column_to_bytes(array: ArrayRef) -> Bytes {
5673 let schema = Arc::new(Schema::new(vec![Field::new(
5674 "col",
5675 array.data_type().clone(),
5676 true,
5677 )]));
5678 let buf = get_bytes_after_close(
5679 schema.clone(),
5680 &RecordBatch::try_new(schema, vec![array]).unwrap(),
5681 );
5682 Bytes::from(buf)
5683 }
5684
5685 fn read_column_with_schema(bytes: Bytes, schema: SchemaRef) -> ArrayRef {
5689 let opts = crate::arrow::arrow_reader::ArrowReaderOptions::new().with_schema(schema);
5690 ParquetRecordBatchReaderBuilder::try_new_with_options(bytes, opts)
5691 .unwrap()
5692 .build()
5693 .unwrap()
5694 .next()
5695 .unwrap()
5696 .unwrap()
5697 .column(0)
5698 .clone()
5699 }
5700
5701 fn ree_write_read_roundtrip(ree: ArrayRef, flat: ArrayRef) {
5702 let flat_schema = Arc::new(Schema::new(vec![Field::new(
5703 "col",
5704 flat.data_type().clone(),
5705 true,
5706 )]));
5707 let ree_bytes = write_column_to_bytes(ree);
5708 let flat_bytes = write_column_to_bytes(flat.clone());
5709 assert_eq!(
5710 ree_bytes, flat_bytes,
5711 "REE and flat bytes should be identical"
5712 );
5713
5714 let decoded_ree = read_column_with_schema(ree_bytes, flat_schema.clone());
5715 let decoded_flat = read_column_with_schema(flat_bytes, flat_schema);
5716
5717 assert_eq!(decoded_ree.as_ref(), flat.as_ref());
5718 assert_eq!(decoded_ree.as_ref(), decoded_flat.as_ref());
5719 }
5720
5721 #[test]
5722 fn ree_string() {
5723 let ree: ArrayRef = Arc::new(
5724 [Some("a"), Some("a"), None, Some("b"), Some("b")]
5725 .into_iter()
5726 .collect::<Int32RunArray>(),
5727 );
5728 let flat: ArrayRef = Arc::new(StringArray::from(vec![
5729 Some("a"),
5730 Some("a"),
5731 None,
5732 Some("b"),
5733 Some("b"),
5734 ]));
5735 ree_write_read_roundtrip(ree, flat);
5736 }
5737
5738 #[test]
5739 fn ree_int32() {
5740 let mut b = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
5741 for v in [Some(1), Some(1), None, Some(2), Some(2)] {
5742 b.append_option(v);
5743 }
5744 let ree: ArrayRef = Arc::new(b.finish());
5745 let flat: ArrayRef = Arc::new(Int32Array::from(vec![
5746 Some(1),
5747 Some(1),
5748 None,
5749 Some(2),
5750 Some(2),
5751 ]));
5752 ree_write_read_roundtrip(ree, flat);
5753 }
5754
5755 #[test]
5756 fn ree_bool() {
5757 let ree: ArrayRef = Arc::new(
5759 RunArray::try_new(
5760 &Int32Array::from(vec![3, 5, 7]),
5761 &BooleanArray::from(vec![Some(true), None, Some(false)]),
5762 )
5763 .unwrap(),
5764 );
5765 let flat: ArrayRef = Arc::new(BooleanArray::from(vec![
5766 Some(true),
5767 Some(true),
5768 Some(true),
5769 None,
5770 None,
5771 Some(false),
5772 Some(false),
5773 ]));
5774 ree_write_read_roundtrip(ree, flat);
5775 }
5776
5777 #[test]
5778 fn ree_fixed_size_binary() {
5779 let mk = |vals: &[Option<&[u8]>]| -> FixedSizeBinaryArray {
5780 let mut b = FixedSizeBinaryBuilder::new(2);
5781 for v in vals {
5782 match v {
5783 Some(x) => b.append_value(x).unwrap(),
5784 None => b.append_null(),
5785 }
5786 }
5787 b.finish()
5788 };
5789 let ree: ArrayRef = Arc::new(
5791 RunArray::try_new(
5792 &Int32Array::from(vec![2, 4, 6]),
5793 &mk(&[Some(b"aa"), None, Some(b"bb")]),
5794 )
5795 .unwrap(),
5796 );
5797 let flat: ArrayRef = Arc::new(mk(&[
5798 Some(b"aa"),
5799 Some(b"aa"),
5800 None,
5801 None,
5802 Some(b"bb"),
5803 Some(b"bb"),
5804 ]));
5805 ree_write_read_roundtrip(ree, flat);
5806 }
5807
/// A run-end encoded string array made of the same value three times,
/// checked against the equivalent flat `StringArray`.
#[test]
fn ree_single_run() {
    let repeated = ["x"; 3];

    let ree: ArrayRef = Arc::new(repeated.into_iter().collect::<Int32RunArray>());
    let flat: ArrayRef = Arc::new(StringArray::from(repeated.to_vec()));

    ree_write_read_roundtrip(ree, flat);
}
5814
/// Run-end encoded `Float32` values with a null run, checked against the
/// equivalent flat `Float32Array`.
#[test]
fn ree_float32() {
    // Run ends 2, 4, 5 give run lengths of 2, 2 and 1.
    let run_ends = Int32Array::from(vec![2, 4, 5]);
    let values = Float32Array::from(vec![Some(1.0_f32), None, Some(2.5_f32)]);
    let ree: ArrayRef = Arc::new(RunArray::try_new(&run_ends, &values).unwrap());

    let expanded = vec![Some(1.0_f32), Some(1.0_f32), None, None, Some(2.5_f32)];
    let flat: ArrayRef = Arc::new(Float32Array::from(expanded));

    ree_write_read_roundtrip(ree, flat);
}
5834
/// A slice of a run-end encoded string array that starts and ends inside
/// runs, checked against the flat values covered by that slice.
#[test]
fn ree_sliced() {
    // Logical content of the full array: a a a b b c c (run ends 3, 5, 7).
    let run_ends = Int32Array::from(vec![3, 5, 7]);
    let values = StringArray::from(vec!["a", "b", "c"]);
    let full: ArrayRef = Arc::new(RunArray::try_new(&run_ends, &values).unwrap());

    // Offset 2, length 5 keeps logical rows 2..7: a b b c c.
    let sliced = full.slice(2, 5);
    let expected = vec!["a", "b", "b", "c", "c"];
    let flat: ArrayRef = Arc::new(StringArray::from(expected));

    ree_write_read_roundtrip(sliced, flat);
}
5852
/// Writes a struct column whose two children are run-end encoded (string and
/// int32) and checks the leaf columns of the resulting Parquet schema: their
/// count, physical types and dotted paths.
#[test]
fn ree_struct_with_ree_child() {
    // Both children share the same run ends (run lengths 2, 1, 2).
    let run_ends = Int32Array::from(vec![2i32, 3, 5]);

    let string_values = StringArray::from(vec![Some("foo"), None, Some("bar")]);
    let col_a: ArrayRef = Arc::new(RunArray::try_new(&run_ends, &string_values).unwrap());

    let int_values = Int32Array::from(vec![Some(1), None, Some(2)]);
    let col_b: ArrayRef = Arc::new(RunArray::try_new(&run_ends, &int_values).unwrap());

    // Capture the child field types before the arrays are moved into the struct.
    let child_fields = Fields::from(vec![
        Field::new("a", col_a.data_type().clone(), true),
        Field::new("b", col_b.data_type().clone(), true),
    ]);
    let struct_array: ArrayRef =
        Arc::new(StructArray::new(child_fields, vec![col_a, col_b], None));

    let row_field = Field::new("row", struct_array.data_type().clone(), true);
    let schema = Arc::new(Schema::new(vec![row_field]));
    let batch = RecordBatch::try_new(schema.clone(), vec![struct_array]).unwrap();

    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
    writer.write(&batch).unwrap();
    let metadata = writer.close().unwrap();

    // Expected leaf columns, in order: (dotted path, Parquet physical type).
    let expected_leaves = [
        ("row.a", crate::basic::Type::BYTE_ARRAY),
        ("row.b", crate::basic::Type::INT32),
    ];

    let parquet_schema = metadata.file_metadata().schema_descr();
    assert_eq!(parquet_schema.num_columns(), expected_leaves.len());
    for (idx, (path, physical_type)) in expected_leaves.into_iter().enumerate() {
        let column = parquet_schema.column(idx);
        assert_eq!(column.physical_type(), physical_type);
        assert_eq!(column.path().string(), path);
    }
}
5904}