1use crate::column::chunker::ContentDefinedChunker;
21
22use bytes::Bytes;
23use std::io::{Read, Write};
24use std::slice::Iter;
25use std::sync::{Arc, Mutex};
26use std::vec::IntoIter;
27
28use arrow_array::cast::AsArray;
29use arrow_array::types::*;
30use arrow_array::{ArrayRef, Int32Array, RecordBatch, RecordBatchWriter};
31use arrow_schema::{
32 ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef, TimeUnit,
33};
34
35use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
36
37use crate::arrow::ArrowSchemaConverter;
38use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
39use crate::basic::PageType;
40use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
41use crate::column::page_encryption::PageEncryptor;
42use crate::column::writer::encoder::ColumnValueEncoder;
43use crate::column::writer::{
44 ColumnCloseResult, ColumnWriter, GenericColumnWriter, get_column_writer,
45};
46use crate::data_type::{ByteArray, FixedLenByteArray};
47#[cfg(feature = "encryption")]
48use crate::encryption::encrypt::FileEncryptor;
49use crate::errors::{ParquetError, Result};
50use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
51use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
52use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
53use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
54use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
55use levels::{ArrayLevels, calculate_array_levels};
56
57mod byte_array;
58mod levels;
59
60#[doc(inline)]
61pub use crate::column::page_store::{
62 InMemoryPageStore, InMemoryPageStoreFactory, PageKey, PageStore, PageStoreArgs,
63 PageStoreFactory,
64};
65
66pub struct ArrowWriter<W: Write> {
183 writer: SerializedFileWriter<W>,
185
186 in_progress: Option<ArrowRowGroupWriter>,
188
189 arrow_schema: SchemaRef,
193
194 row_group_writer_factory: ArrowRowGroupWriterFactory,
196
197 max_row_group_row_count: Option<usize>,
199
200 max_row_group_bytes: Option<usize>,
202
203 cdc_chunkers: Option<Vec<ContentDefinedChunker>>,
205}
206
207impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
208 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209 let buffered_memory = self.in_progress_size();
210 f.debug_struct("ArrowWriter")
211 .field("writer", &self.writer)
212 .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
213 .field("in_progress_rows", &self.in_progress_rows())
214 .field("arrow_schema", &self.arrow_schema)
215 .field("max_row_group_row_count", &self.max_row_group_row_count)
216 .field("max_row_group_bytes", &self.max_row_group_bytes)
217 .finish()
218 }
219}
220
221impl<W: Write + Send> ArrowWriter<W> {
222 pub fn try_new(
228 writer: W,
229 arrow_schema: SchemaRef,
230 props: Option<WriterProperties>,
231 ) -> Result<Self> {
232 let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
233 Self::try_new_with_options(writer, arrow_schema, options)
234 }
235
236 pub fn try_new_with_options(
242 writer: W,
243 arrow_schema: SchemaRef,
244 options: ArrowWriterOptions,
245 ) -> Result<Self> {
246 let mut props = options.properties;
247
248 let schema = if let Some(parquet_schema) = options.schema_descr {
249 parquet_schema.clone()
250 } else {
251 let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
252 if let Some(schema_root) = &options.schema_root {
253 converter = converter.schema_root(schema_root);
254 }
255
256 converter.convert(&arrow_schema)?
257 };
258
259 if !options.skip_arrow_metadata {
260 add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
262 }
263
264 let max_row_group_row_count = props.max_row_group_row_count();
265 let max_row_group_bytes = props.max_row_group_bytes();
266
267 let props_ptr = Arc::new(props);
268 let file_writer =
269 SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
270
271 let mut row_group_writer_factory =
272 ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());
273 if let Some(page_store_factory) = options.page_store_factory {
274 row_group_writer_factory =
275 row_group_writer_factory.with_page_store_factory(page_store_factory);
276 }
277
278 let cdc_chunkers = props_ptr
279 .content_defined_chunking()
280 .map(|opts| {
281 file_writer
282 .schema_descr()
283 .columns()
284 .iter()
285 .map(|desc| ContentDefinedChunker::new(desc, opts))
286 .collect::<Result<Vec<_>>>()
287 })
288 .transpose()?;
289
290 Ok(Self {
291 writer: file_writer,
292 in_progress: None,
293 arrow_schema,
294 row_group_writer_factory,
295 max_row_group_row_count,
296 max_row_group_bytes,
297 cdc_chunkers,
298 })
299 }
300
301 pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
303 self.writer.flushed_row_groups()
304 }
305
306 pub fn memory_size(&self) -> usize {
311 match &self.in_progress {
312 Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
313 None => 0,
314 }
315 }
316
317 pub fn in_progress_size(&self) -> usize {
324 match &self.in_progress {
325 Some(in_progress) => in_progress
326 .writers
327 .iter()
328 .map(|x| x.get_estimated_total_bytes())
329 .sum(),
330 None => 0,
331 }
332 }
333
334 pub fn in_progress_rows(&self) -> usize {
336 self.in_progress
337 .as_ref()
338 .map(|x| x.buffered_rows)
339 .unwrap_or_default()
340 }
341
342 pub fn bytes_written(&self) -> usize {
344 self.writer.bytes_written()
345 }
346
347 pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
359 if batch.num_rows() == 0 {
360 return Ok(());
361 }
362
363 let in_progress = match &mut self.in_progress {
364 Some(in_progress) => in_progress,
365 x => x.insert(
366 self.row_group_writer_factory
367 .create_row_group_writer(self.writer.flushed_row_groups().len())?,
368 ),
369 };
370
371 if let Some(max_rows) = self.max_row_group_row_count {
372 if in_progress.buffered_rows + batch.num_rows() > max_rows {
373 let to_write = max_rows - in_progress.buffered_rows;
374 let a = batch.slice(0, to_write);
375 let b = batch.slice(to_write, batch.num_rows() - to_write);
376 self.write(&a)?;
377 return self.write(&b);
378 }
379 }
380
381 if let Some(max_bytes) = self.max_row_group_bytes {
384 if in_progress.buffered_rows > 0 {
385 let current_bytes = in_progress.get_estimated_total_bytes();
386
387 if current_bytes >= max_bytes {
388 self.flush()?;
389 return self.write(batch);
390 }
391
392 let avg_row_bytes = current_bytes / in_progress.buffered_rows;
393 if avg_row_bytes > 0 {
394 let remaining_bytes = max_bytes - current_bytes;
396 let rows_that_fit = remaining_bytes / avg_row_bytes;
397
398 if batch.num_rows() > rows_that_fit {
399 if rows_that_fit > 0 {
400 let a = batch.slice(0, rows_that_fit);
401 let b = batch.slice(rows_that_fit, batch.num_rows() - rows_that_fit);
402 self.write(&a)?;
403 return self.write(&b);
404 } else {
405 self.flush()?;
406 return self.write(batch);
407 }
408 }
409 }
410 }
411 }
412
413 match self.cdc_chunkers.as_mut() {
414 Some(chunkers) => in_progress.write_with_chunkers(batch, chunkers)?,
415 None => in_progress.write(batch)?,
416 }
417
418 let should_flush = self
419 .max_row_group_row_count
420 .is_some_and(|max| in_progress.buffered_rows >= max)
421 || self
422 .max_row_group_bytes
423 .is_some_and(|max| in_progress.get_estimated_total_bytes() >= max);
424
425 if should_flush {
426 self.flush()?
427 }
428 Ok(())
429 }
430
431 pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
436 self.writer.write_all(buf)
437 }
438
439 pub fn sync(&mut self) -> std::io::Result<()> {
441 self.writer.flush()
442 }
443
444 pub fn flush(&mut self) -> Result<()> {
449 let in_progress = match self.in_progress.take() {
450 Some(in_progress) => in_progress,
451 None => return Ok(()),
452 };
453
454 let mut row_group_writer = self.writer.next_row_group()?;
455 for chunk in in_progress.close()? {
456 chunk.append_to_row_group(&mut row_group_writer)?;
457 }
458 row_group_writer.close()?;
459 Ok(())
460 }
461
462 pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
466 self.writer.append_key_value_metadata(kv_metadata)
467 }
468
469 pub fn inner(&self) -> &W {
471 self.writer.inner()
472 }
473
474 pub fn inner_mut(&mut self) -> &mut W {
483 self.writer.inner_mut()
484 }
485
486 pub fn into_inner(mut self) -> Result<W> {
488 self.flush()?;
489 self.writer.into_inner()
490 }
491
492 pub fn finish(&mut self) -> Result<ParquetMetaData> {
498 self.flush()?;
499 self.writer.finish()
500 }
501
502 pub fn close(mut self) -> Result<ParquetMetaData> {
504 self.finish()
505 }
506
507 #[deprecated(
509 since = "56.2.0",
510 note = "Use `ArrowRowGroupWriterFactory` instead, see `ArrowColumnWriter` for an example"
511 )]
512 pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
513 self.flush()?;
514 let in_progress = self
515 .row_group_writer_factory
516 .create_row_group_writer(self.writer.flushed_row_groups().len())?;
517 Ok(in_progress.writers)
518 }
519
520 #[deprecated(
522 since = "56.2.0",
523 note = "Use `SerializedFileWriter` directly instead, see `ArrowColumnWriter` for an example"
524 )]
525 pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
526 let mut row_group_writer = self.writer.next_row_group()?;
527 for chunk in chunks {
528 chunk.append_to_row_group(&mut row_group_writer)?;
529 }
530 row_group_writer.close()?;
531 Ok(())
532 }
533
534 pub fn into_serialized_writer(
541 mut self,
542 ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
543 self.flush()?;
544 Ok((self.writer, self.row_group_writer_factory))
545 }
546}
547
548impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
549 fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
550 self.write(batch).map_err(|e| e.into())
551 }
552
553 fn close(self) -> std::result::Result<(), ArrowError> {
554 self.close()?;
555 Ok(())
556 }
557}
558
559#[derive(Debug, Clone, Default)]
563pub struct ArrowWriterOptions {
564 properties: WriterProperties,
565 skip_arrow_metadata: bool,
566 schema_root: Option<String>,
567 schema_descr: Option<SchemaDescriptor>,
568 page_store_factory: Option<Arc<dyn PageStoreFactory>>,
569}
570
571impl ArrowWriterOptions {
572 pub fn new() -> Self {
574 Self::default()
575 }
576
577 pub fn with_properties(self, properties: WriterProperties) -> Self {
579 Self { properties, ..self }
580 }
581
582 pub fn with_page_store_factory(self, page_store_factory: Arc<dyn PageStoreFactory>) -> Self {
668 Self {
669 page_store_factory: Some(page_store_factory),
670 ..self
671 }
672 }
673
674 pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
681 Self {
682 skip_arrow_metadata,
683 ..self
684 }
685 }
686
687 pub fn with_schema_root(self, schema_root: String) -> Self {
689 Self {
690 schema_root: Some(schema_root),
691 ..self
692 }
693 }
694
695 pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
701 Self {
702 schema_descr: Some(schema_descr),
703 ..self
704 }
705 }
706}
707
708struct ArrowColumnChunkData {
714 length: usize,
715 store: Box<dyn PageStore>,
716 keys: Vec<PageKey>,
717 dictionary_keys: Vec<PageKey>,
728 dictionary_len: usize,
732}
733
734impl ArrowColumnChunkData {
735 fn new(store: Box<dyn PageStore>) -> Self {
736 Self {
737 length: 0,
738 store,
739 keys: Vec::new(),
740 dictionary_keys: Vec::new(),
741 dictionary_len: 0,
742 }
743 }
744
745 fn push(&mut self, value: Bytes) -> Result<()> {
748 let key = self.store.put(value)?;
749 self.keys.push(key);
750 Ok(())
751 }
752
753 fn push_dictionary(&mut self, value: Bytes) -> Result<()> {
757 self.dictionary_len += value.len();
758 let key = self.store.put(value)?;
759 self.dictionary_keys.push(key);
760 Ok(())
761 }
762
763 fn memory_size(&self) -> usize {
766 self.store.memory_size()
767 }
768}
769
770struct StreamingColumnChunkReader {
779 store: Box<dyn PageStore>,
780 keys: IntoIter<PageKey>,
783 current: Bytes,
785}
786
787impl StreamingColumnChunkReader {
788 fn new(data: ArrowColumnChunkData) -> Self {
789 let keys = if data.dictionary_keys.is_empty() {
792 data.keys
793 } else {
794 let mut keys = Vec::with_capacity(data.dictionary_keys.len() + data.keys.len());
795 keys.extend(data.dictionary_keys);
796 keys.extend(data.keys);
797 keys
798 };
799 Self {
800 store: data.store,
801 keys: keys.into_iter(),
802 current: Bytes::new(),
803 }
804 }
805}
806
807impl Read for StreamingColumnChunkReader {
808 fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
809 while self.current.is_empty() {
812 if let Some(key) = self.keys.next() {
813 self.current = self.store.take(key).map_err(std::io::Error::other)?;
814 } else {
815 return Ok(0);
816 }
817 }
818
819 let len = self.current.len().min(out.len());
820 let b = self.current.split_to(len);
821 out[..len].copy_from_slice(&b);
822 Ok(len)
823 }
824}
825
826type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
831
832struct ArrowPageWriter {
833 buffer: SharedColumnChunk,
834 #[cfg(feature = "encryption")]
835 page_encryptor: Option<PageEncryptor>,
836}
837
838impl ArrowPageWriter {
839 fn new(store: Box<dyn PageStore>) -> Self {
841 Self {
842 buffer: Arc::new(Mutex::new(ArrowColumnChunkData::new(store))),
843 #[cfg(feature = "encryption")]
844 page_encryptor: None,
845 }
846 }
847
848 #[cfg(feature = "encryption")]
849 pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
850 self.page_encryptor = page_encryptor;
851 self
852 }
853
854 #[cfg(feature = "encryption")]
855 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
856 self.page_encryptor.as_mut()
857 }
858
859 #[cfg(not(feature = "encryption"))]
860 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
861 None
862 }
863}
864
865impl PageWriter for ArrowPageWriter {
866 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
867 let page = match self.page_encryptor_mut() {
868 Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
869 None => page,
870 };
871
872 let page_header = page.to_thrift_header()?;
873 let header = {
874 let mut header = Vec::with_capacity(1024);
875
876 match self.page_encryptor_mut() {
877 Some(page_encryptor) => {
878 page_encryptor.encrypt_page_header(&page_header, &mut header)?;
879 if page.compressed_page().is_data_page() {
880 page_encryptor.increment_page();
881 }
882 }
883 None => {
884 let mut protocol = ThriftCompactOutputProtocol::new(&mut header);
885 page_header.write_thrift(&mut protocol)?;
886 }
887 };
888
889 Bytes::from(header)
890 };
891
892 let mut buf = self.buffer.try_lock().unwrap();
893
894 let data = page.compressed_page().buffer().clone();
895 let compressed_size = data.len() + header.len();
896
897 let mut spec = PageWriteSpec::new();
898 spec.page_type = page.page_type();
899 spec.num_values = page.num_values();
900 spec.uncompressed_size = page.uncompressed_size() + header.len();
901 spec.offset = buf.length as u64;
902 spec.compressed_size = compressed_size;
903 spec.bytes_written = compressed_size as u64;
904
905 buf.length += compressed_size;
906 if spec.page_type == PageType::DICTIONARY_PAGE {
907 buf.push_dictionary(header)?;
910 buf.push_dictionary(data)?;
911 } else {
912 buf.push(header)?;
913 buf.push(data)?;
914 }
915
916 Ok(spec)
917 }
918
919 fn defers_dictionary_ordering(&self) -> bool {
920 true
925 }
926
927 fn buffered_memory_size(&self) -> usize {
928 self.buffer.try_lock().unwrap().memory_size()
931 }
932
933 fn close(&mut self) -> Result<()> {
934 Ok(())
935 }
936}
937
938#[derive(Debug)]
940pub struct ArrowLeafColumn(ArrayLevels);
941
942pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
947 let levels = calculate_array_levels(array, field)?;
948 Ok(levels.into_iter().map(ArrowLeafColumn).collect())
949}
950
951pub struct ArrowColumnChunk {
953 data: ArrowColumnChunkData,
954 close: ColumnCloseResult,
955}
956
957impl std::fmt::Debug for ArrowColumnChunk {
958 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
959 f.debug_struct("ArrowColumnChunk")
960 .field("length", &self.data.length)
961 .finish_non_exhaustive()
962 }
963}
964
965impl ArrowColumnChunk {
966 pub fn close(&self) -> &ColumnCloseResult {
973 &self.close
974 }
975
976 pub fn close_mut(&mut self) -> &mut ColumnCloseResult {
983 &mut self.close
984 }
985
986 pub fn append_to_row_group<W: Write + Send>(
989 self,
990 writer: &mut SerializedRowGroupWriter<'_, W>,
991 ) -> Result<()> {
992 let ArrowColumnChunk { data, close } = self;
993
994 let close = close.update_dictionary_location(data.dictionary_len)?;
998
999 let reader = StreamingColumnChunkReader::new(data);
1000 writer.append_column_from_read(reader, close)
1001 }
1002}
1003
1004pub struct ArrowColumnWriter {
1102 writer: ArrowColumnWriterImpl,
1103 chunk: SharedColumnChunk,
1104}
1105
1106impl std::fmt::Debug for ArrowColumnWriter {
1107 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1108 f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
1109 }
1110}
1111
1112enum ArrowColumnWriterImpl {
1113 ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
1114 Column(ColumnWriter<'static>),
1115}
1116
1117impl ArrowColumnWriter {
1118 pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
1120 self.write_internal(&col.0)
1121 }
1122
1123 fn write_with_chunker(
1125 &mut self,
1126 col: &ArrowLeafColumn,
1127 chunker: &mut ContentDefinedChunker,
1128 ) -> Result<()> {
1129 let levels = &col.0;
1130 let chunks = chunker.get_arrow_chunks(
1131 levels.def_level_data().as_ref(),
1132 levels.rep_level_data().as_ref(),
1133 levels.array(),
1134 )?;
1135
1136 let num_chunks = chunks.len();
1137 for (i, chunk) in chunks.iter().enumerate() {
1138 let chunk_levels = levels.slice_for_chunk(chunk);
1139 self.write_internal(&chunk_levels)?;
1140
1141 if i + 1 < num_chunks {
1143 match &mut self.writer {
1144 ArrowColumnWriterImpl::Column(c) => c.add_data_page()?,
1145 ArrowColumnWriterImpl::ByteArray(c) => c.add_data_page()?,
1146 }
1147 }
1148 }
1149 Ok(())
1150 }
1151
1152 fn write_internal(&mut self, levels: &ArrayLevels) -> Result<()> {
1153 match &mut self.writer {
1154 ArrowColumnWriterImpl::Column(c) => {
1155 let leaf = levels.array();
1156 match leaf.as_any_dictionary_opt() {
1157 Some(dictionary) => {
1158 let materialized =
1159 arrow_select::take::take(dictionary.values(), dictionary.keys(), None)?;
1160 write_leaf(c, &materialized, levels)?
1161 }
1162 None => write_leaf(c, leaf, levels)?,
1163 };
1164 }
1165 ArrowColumnWriterImpl::ByteArray(c) => {
1166 write_primitive(c, levels.array().as_ref(), levels)?;
1167 }
1168 }
1169 Ok(())
1170 }
1171
1172 pub fn close(self) -> Result<ArrowColumnChunk> {
1174 let close = match self.writer {
1175 ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
1176 ArrowColumnWriterImpl::Column(c) => c.close()?,
1177 };
1178 let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
1179 let data = chunk.into_inner().unwrap();
1180 Ok(ArrowColumnChunk { data, close })
1181 }
1182
1183 pub fn memory_size(&self) -> usize {
1194 match &self.writer {
1195 ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
1196 ArrowColumnWriterImpl::Column(c) => c.memory_size(),
1197 }
1198 }
1199
1200 pub fn get_estimated_total_bytes(&self) -> usize {
1208 match &self.writer {
1209 ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
1210 ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
1211 }
1212 }
1213}
1214
1215#[derive(Debug)]
1222struct ArrowRowGroupWriter {
1223 writers: Vec<ArrowColumnWriter>,
1224 schema: SchemaRef,
1225 buffered_rows: usize,
1226}
1227
1228impl ArrowRowGroupWriter {
1229 fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
1230 Self {
1231 writers,
1232 schema: arrow.clone(),
1233 buffered_rows: 0,
1234 }
1235 }
1236
1237 fn write(&mut self, batch: &RecordBatch) -> Result<()> {
1238 self.buffered_rows += batch.num_rows();
1239 let mut writers = self.writers.iter_mut();
1240 for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
1241 for leaf in compute_leaves(field.as_ref(), column)? {
1242 writers.next().unwrap().write(&leaf)?;
1243 }
1244 }
1245 Ok(())
1246 }
1247
1248 fn write_with_chunkers(
1249 &mut self,
1250 batch: &RecordBatch,
1251 chunkers: &mut [ContentDefinedChunker],
1252 ) -> Result<()> {
1253 self.buffered_rows += batch.num_rows();
1254 let mut writers = self.writers.iter_mut();
1255 let mut chunkers = chunkers.iter_mut();
1256 for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
1257 for leaf in compute_leaves(field.as_ref(), column)? {
1258 writers
1259 .next()
1260 .unwrap()
1261 .write_with_chunker(&leaf, chunkers.next().unwrap())?;
1262 }
1263 }
1264 Ok(())
1265 }
1266
1267 fn get_estimated_total_bytes(&self) -> usize {
1269 self.writers
1270 .iter()
1271 .map(|x| x.get_estimated_total_bytes())
1272 .sum()
1273 }
1274
1275 fn close(self) -> Result<Vec<ArrowColumnChunk>> {
1276 self.writers
1277 .into_iter()
1278 .map(|writer| writer.close())
1279 .collect()
1280 }
1281}
1282
1283#[derive(Debug)]
1288pub struct ArrowRowGroupWriterFactory {
1289 schema: SchemaDescPtr,
1290 arrow_schema: SchemaRef,
1291 props: WriterPropertiesPtr,
1292 page_store_factory: Arc<dyn PageStoreFactory>,
1293 #[cfg(feature = "encryption")]
1294 file_encryptor: Option<Arc<FileEncryptor>>,
1295}
1296
1297impl ArrowRowGroupWriterFactory {
1298 pub fn new<W: Write + Send>(
1300 file_writer: &SerializedFileWriter<W>,
1301 arrow_schema: SchemaRef,
1302 ) -> Self {
1303 let schema = Arc::clone(file_writer.schema_descr_ptr());
1304 let props = Arc::clone(file_writer.properties());
1305 Self {
1306 schema,
1307 arrow_schema,
1308 props,
1309 page_store_factory: Arc::new(InMemoryPageStoreFactory),
1310 #[cfg(feature = "encryption")]
1311 file_encryptor: file_writer.file_encryptor(),
1312 }
1313 }
1314
1315 pub fn with_page_store_factory(
1319 mut self,
1320 page_store_factory: Arc<dyn PageStoreFactory>,
1321 ) -> Self {
1322 self.page_store_factory = page_store_factory;
1323 self
1324 }
1325
1326 fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
1327 let writers = self.create_column_writers(row_group_index)?;
1328 Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
1329 }
1330
1331 pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
1333 let mut writers = Vec::with_capacity(self.arrow_schema.fields.len());
1334 let mut leaves = self.schema.columns().iter();
1335 let column_factory = self.column_writer_factory(row_group_index);
1336 for field in &self.arrow_schema.fields {
1337 column_factory.get_arrow_column_writer(
1338 field.data_type(),
1339 &self.props,
1340 &mut leaves,
1341 &mut writers,
1342 )?;
1343 }
1344 Ok(writers)
1345 }
1346
1347 #[cfg(feature = "encryption")]
1348 fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
1349 ArrowColumnWriterFactory::new()
1350 .with_page_store_factory(self.page_store_factory.clone())
1351 .with_file_encryptor(row_group_idx, self.file_encryptor.clone())
1352 }
1353
1354 #[cfg(not(feature = "encryption"))]
1355 fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory {
1356 ArrowColumnWriterFactory::new().with_page_store_factory(self.page_store_factory.clone())
1357 }
1358}
1359
1360#[deprecated(since = "57.0.0", note = "Use `ArrowRowGroupWriterFactory` instead")]
1362pub fn get_column_writers(
1363 parquet: &SchemaDescriptor,
1364 props: &WriterPropertiesPtr,
1365 arrow: &SchemaRef,
1366) -> Result<Vec<ArrowColumnWriter>> {
1367 let mut writers = Vec::with_capacity(arrow.fields.len());
1368 let mut leaves = parquet.columns().iter();
1369 let column_factory = ArrowColumnWriterFactory::new();
1370 for field in &arrow.fields {
1371 column_factory.get_arrow_column_writer(
1372 field.data_type(),
1373 props,
1374 &mut leaves,
1375 &mut writers,
1376 )?;
1377 }
1378 Ok(writers)
1379}
1380
1381struct ArrowColumnWriterFactory {
1383 page_store_factory: Arc<dyn PageStoreFactory>,
1385 #[cfg(feature = "encryption")]
1386 row_group_index: usize,
1387 #[cfg(feature = "encryption")]
1388 file_encryptor: Option<Arc<FileEncryptor>>,
1389}
1390
1391impl ArrowColumnWriterFactory {
1392 pub fn new() -> Self {
1393 Self {
1394 page_store_factory: Arc::new(InMemoryPageStoreFactory),
1395 #[cfg(feature = "encryption")]
1396 row_group_index: 0,
1397 #[cfg(feature = "encryption")]
1398 file_encryptor: None,
1399 }
1400 }
1401
1402 pub fn with_page_store_factory(
1404 mut self,
1405 page_store_factory: Arc<dyn PageStoreFactory>,
1406 ) -> Self {
1407 self.page_store_factory = page_store_factory;
1408 self
1409 }
1410
1411 #[cfg(feature = "encryption")]
1412 pub fn with_file_encryptor(
1413 mut self,
1414 row_group_index: usize,
1415 file_encryptor: Option<Arc<FileEncryptor>>,
1416 ) -> Self {
1417 self.row_group_index = row_group_index;
1418 self.file_encryptor = file_encryptor;
1419 self
1420 }
1421
1422 #[cfg(feature = "encryption")]
1423 fn create_page_writer(
1424 &self,
1425 column_descriptor: &ColumnDescPtr,
1426 column_index: usize,
1427 ) -> Result<Box<ArrowPageWriter>> {
1428 let column_path = column_descriptor.path().string();
1429 let page_encryptor = PageEncryptor::create_if_column_encrypted(
1430 &self.file_encryptor,
1431 self.row_group_index,
1432 column_index,
1433 &column_path,
1434 )?;
1435 let args = PageStoreArgs::new(column_index, column_descriptor);
1436 let store = self.page_store_factory.create(&args)?;
1437 Ok(Box::new(
1438 ArrowPageWriter::new(store).with_encryptor(page_encryptor),
1439 ))
1440 }
1441
1442 #[cfg(not(feature = "encryption"))]
1443 fn create_page_writer(
1444 &self,
1445 column_descriptor: &ColumnDescPtr,
1446 column_index: usize,
1447 ) -> Result<Box<ArrowPageWriter>> {
1448 let args = PageStoreArgs::new(column_index, column_descriptor);
1449 let store = self.page_store_factory.create(&args)?;
1450 Ok(Box::new(ArrowPageWriter::new(store)))
1451 }
1452
1453 fn get_arrow_column_writer(
1456 &self,
1457 data_type: &ArrowDataType,
1458 props: &WriterPropertiesPtr,
1459 leaves: &mut Iter<'_, ColumnDescPtr>,
1460 out: &mut Vec<ArrowColumnWriter>,
1461 ) -> Result<()> {
1462 let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1464 let page_writer = self.create_page_writer(desc, out.len())?;
1465 let chunk = page_writer.buffer.clone();
1466 let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
1467 Ok(ArrowColumnWriter {
1468 chunk,
1469 writer: ArrowColumnWriterImpl::Column(writer),
1470 })
1471 };
1472
1473 let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1475 let page_writer = self.create_page_writer(desc, out.len())?;
1476 let chunk = page_writer.buffer.clone();
1477 let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
1478 Ok(ArrowColumnWriter {
1479 chunk,
1480 writer: ArrowColumnWriterImpl::ByteArray(writer),
1481 })
1482 };
1483
1484 match data_type {
1485 _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
1486 ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
1487 out.push(col(leaves.next().unwrap())?)
1488 }
1489 ArrowDataType::LargeBinary
1490 | ArrowDataType::Binary
1491 | ArrowDataType::Utf8
1492 | ArrowDataType::LargeUtf8
1493 | ArrowDataType::BinaryView
1494 | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
1495 ArrowDataType::List(f)
1496 | ArrowDataType::LargeList(f)
1497 | ArrowDataType::FixedSizeList(f, _)
1498 | ArrowDataType::ListView(f)
1499 | ArrowDataType::LargeListView(f) => {
1500 self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
1501 }
1502 ArrowDataType::Struct(fields) => {
1503 for field in fields {
1504 self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
1505 }
1506 }
1507 ArrowDataType::Map(f, _) => match f.data_type() {
1508 ArrowDataType::Struct(f) => {
1509 self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
1510 self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
1511 }
1512 _ => unreachable!("invalid map type"),
1513 },
1514 ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1515 ArrowDataType::Utf8
1516 | ArrowDataType::LargeUtf8
1517 | ArrowDataType::Binary
1518 | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
1519 ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
1520 out.push(bytes(leaves.next().unwrap())?)
1521 }
1522 ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
1523 _ => out.push(col(leaves.next().unwrap())?),
1524 },
1525 ArrowDataType::RunEndEncoded(_, value_field) => {
1526 self.get_arrow_column_writer(value_field.data_type(), props, leaves, out)?
1527 }
1528 _ => {
1529 return Err(ParquetError::NYI(format!(
1530 "Attempting to write an Arrow type {data_type} to parquet that is not yet implemented"
1531 )));
1532 }
1533 }
1534 Ok(())
1535 }
1536}
1537
1538fn write_leaf(
1539 writer: &mut ColumnWriter<'_>,
1540 column: &dyn arrow_array::Array,
1541 levels: &ArrayLevels,
1542) -> Result<usize> {
1543 let indices = levels.non_null_indices();
1544
1545 match writer {
1546 ColumnWriter::Int32ColumnWriter(typed) => {
1548 match column.data_type() {
1549 ArrowDataType::Null => {
1550 let array = Int32Array::new_null(column.len());
1551 write_primitive(typed, array.values(), levels)
1552 }
1553 ArrowDataType::Int8 => {
1554 let array: Int32Array = column.as_primitive::<Int8Type>().unary(|x| x as i32);
1555 write_primitive(typed, array.values(), levels)
1556 }
1557 ArrowDataType::Int16 => {
1558 let array: Int32Array = column.as_primitive::<Int16Type>().unary(|x| x as i32);
1559 write_primitive(typed, array.values(), levels)
1560 }
1561 ArrowDataType::Int32 => {
1562 write_primitive(typed, column.as_primitive::<Int32Type>().values(), levels)
1563 }
1564 ArrowDataType::UInt8 => {
1565 let array: Int32Array = column.as_primitive::<UInt8Type>().unary(|x| x as i32);
1566 write_primitive(typed, array.values(), levels)
1567 }
1568 ArrowDataType::UInt16 => {
1569 let array: Int32Array = column.as_primitive::<UInt16Type>().unary(|x| x as i32);
1570 write_primitive(typed, array.values(), levels)
1571 }
1572 ArrowDataType::UInt32 => {
1573 let array = column.as_primitive::<UInt32Type>();
1576 write_primitive(typed, array.values().inner().typed_data(), levels)
1577 }
1578 ArrowDataType::Date32 => {
1579 let array = column.as_primitive::<Date32Type>();
1580 write_primitive(typed, array.values(), levels)
1581 }
1582 ArrowDataType::Time32(TimeUnit::Second) => {
1583 let array = column.as_primitive::<Time32SecondType>();
1584 write_primitive(typed, array.values(), levels)
1585 }
1586 ArrowDataType::Time32(TimeUnit::Millisecond) => {
1587 let array = column.as_primitive::<Time32MillisecondType>();
1588 write_primitive(typed, array.values(), levels)
1589 }
1590 ArrowDataType::Date64 => {
1591 let array: Int32Array = column
1593 .as_primitive::<Date64Type>()
1594 .unary(|x| (x / 86_400_000) as _);
1595
1596 write_primitive(typed, array.values(), levels)
1597 }
1598 ArrowDataType::Decimal32(_, _) => {
1599 let array = column
1600 .as_primitive::<Decimal32Type>()
1601 .unary::<_, Int32Type>(|v| v);
1602 write_primitive(typed, array.values(), levels)
1603 }
1604 ArrowDataType::Decimal64(_, _) => {
1605 let array = column
1607 .as_primitive::<Decimal64Type>()
1608 .unary::<_, Int32Type>(|v| v as i32);
1609 write_primitive(typed, array.values(), levels)
1610 }
1611 ArrowDataType::Decimal128(_, _) => {
1612 let array = column
1614 .as_primitive::<Decimal128Type>()
1615 .unary::<_, Int32Type>(|v| v as i32);
1616 write_primitive(typed, array.values(), levels)
1617 }
1618 ArrowDataType::Decimal256(_, _) => {
1619 let array = column
1621 .as_primitive::<Decimal256Type>()
1622 .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1623 write_primitive(typed, array.values(), levels)
1624 }
1625 d => Err(ParquetError::General(format!("Cannot coerce {d} to I32"))),
1626 }
1627 }
1628 ColumnWriter::BoolColumnWriter(typed) => {
1629 let array = column.as_boolean();
1630 let values = get_bool_array_slice(array, indices.iter().copied());
1631 typed.write_batch_internal(
1632 values.as_slice(),
1633 None,
1634 levels.def_level_data().as_ref(),
1635 levels.rep_level_data().as_ref(),
1636 None,
1637 None,
1638 None,
1639 )
1640 }
1641 ColumnWriter::Int64ColumnWriter(typed) => {
1642 match column.data_type() {
1643 ArrowDataType::Date64 => {
1644 let array = column
1645 .as_primitive::<Date64Type>()
1646 .reinterpret_cast::<Int64Type>();
1647
1648 write_primitive(typed, array.values(), levels)
1649 }
1650 ArrowDataType::Int64 => {
1651 let array = column.as_primitive::<Int64Type>();
1652 write_primitive(typed, array.values(), levels)
1653 }
1654 ArrowDataType::UInt64 => {
1655 let values = column.as_primitive::<UInt64Type>().values();
1656 let array = values.inner().typed_data::<i64>();
1659 write_primitive(typed, array, levels)
1660 }
1661 ArrowDataType::Time64(TimeUnit::Microsecond) => {
1662 let array = column.as_primitive::<Time64MicrosecondType>();
1663 write_primitive(typed, array.values(), levels)
1664 }
1665 ArrowDataType::Time64(TimeUnit::Nanosecond) => {
1666 let array = column.as_primitive::<Time64NanosecondType>();
1667 write_primitive(typed, array.values(), levels)
1668 }
1669 ArrowDataType::Timestamp(unit, _) => match unit {
1670 TimeUnit::Second => {
1671 let array = column.as_primitive::<TimestampSecondType>();
1672 write_primitive(typed, array.values(), levels)
1673 }
1674 TimeUnit::Millisecond => {
1675 let array = column.as_primitive::<TimestampMillisecondType>();
1676 write_primitive(typed, array.values(), levels)
1677 }
1678 TimeUnit::Microsecond => {
1679 let array = column.as_primitive::<TimestampMicrosecondType>();
1680 write_primitive(typed, array.values(), levels)
1681 }
1682 TimeUnit::Nanosecond => {
1683 let array = column.as_primitive::<TimestampNanosecondType>();
1684 write_primitive(typed, array.values(), levels)
1685 }
1686 },
1687 ArrowDataType::Duration(unit) => match unit {
1688 TimeUnit::Second => {
1689 let array = column.as_primitive::<DurationSecondType>();
1690 write_primitive(typed, array.values(), levels)
1691 }
1692 TimeUnit::Millisecond => {
1693 let array = column.as_primitive::<DurationMillisecondType>();
1694 write_primitive(typed, array.values(), levels)
1695 }
1696 TimeUnit::Microsecond => {
1697 let array = column.as_primitive::<DurationMicrosecondType>();
1698 write_primitive(typed, array.values(), levels)
1699 }
1700 TimeUnit::Nanosecond => {
1701 let array = column.as_primitive::<DurationNanosecondType>();
1702 write_primitive(typed, array.values(), levels)
1703 }
1704 },
1705 ArrowDataType::Decimal64(_, _) => {
1706 let array = column
1707 .as_primitive::<Decimal64Type>()
1708 .reinterpret_cast::<Int64Type>();
1709 write_primitive(typed, array.values(), levels)
1710 }
1711 ArrowDataType::Decimal128(_, _) => {
1712 let array = column
1714 .as_primitive::<Decimal128Type>()
1715 .unary::<_, Int64Type>(|v| v as i64);
1716 write_primitive(typed, array.values(), levels)
1717 }
1718 ArrowDataType::Decimal256(_, _) => {
1719 let array = column
1721 .as_primitive::<Decimal256Type>()
1722 .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1723 write_primitive(typed, array.values(), levels)
1724 }
1725 d => Err(ParquetError::General(format!("Cannot coerce {d} to I64"))),
1726 }
1727 }
1728 ColumnWriter::Int96ColumnWriter(_typed) => {
1729 unreachable!("Currently unreachable because data type not supported")
1730 }
1731 ColumnWriter::FloatColumnWriter(typed) => {
1732 let array = column.as_primitive::<Float32Type>();
1733 write_primitive(typed, array.values(), levels)
1734 }
1735 ColumnWriter::DoubleColumnWriter(typed) => {
1736 let array = column.as_primitive::<Float64Type>();
1737 write_primitive(typed, array.values(), levels)
1738 }
1739 ColumnWriter::ByteArrayColumnWriter(_) => {
1740 unreachable!("should use ByteArrayWriter")
1741 }
1742 ColumnWriter::FixedLenByteArrayColumnWriter(typed) => {
1743 let bytes = match column.data_type() {
1744 ArrowDataType::Interval(interval_unit) => match interval_unit {
1745 IntervalUnit::YearMonth => {
1746 let array = column.as_primitive::<IntervalYearMonthType>();
1747 get_interval_ym_array_slice(array, indices.iter().copied())
1748 }
1749 IntervalUnit::DayTime => {
1750 let array = column.as_primitive::<IntervalDayTimeType>();
1751 get_interval_dt_array_slice(array, indices.iter().copied())
1752 }
1753 _ => {
1754 return Err(ParquetError::NYI(format!(
1755 "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
1756 )));
1757 }
1758 },
1759 ArrowDataType::FixedSizeBinary(_) => {
1760 let array = column.as_fixed_size_binary();
1761 get_fsb_array_slice(array, indices.iter().copied())
1762 }
1763 ArrowDataType::Decimal32(_, _) => {
1764 let array = column.as_primitive::<Decimal32Type>();
1765 get_decimal_32_array_slice(array, indices.iter().copied())
1766 }
1767 ArrowDataType::Decimal64(_, _) => {
1768 let array = column.as_primitive::<Decimal64Type>();
1769 get_decimal_64_array_slice(array, indices.iter().copied())
1770 }
1771 ArrowDataType::Decimal128(_, _) => {
1772 let array = column.as_primitive::<Decimal128Type>();
1773 get_decimal_128_array_slice(array, indices.iter().copied())
1774 }
1775 ArrowDataType::Decimal256(_, _) => {
1776 let array = column.as_primitive::<Decimal256Type>();
1777 get_decimal_256_array_slice(array, indices.iter().copied())
1778 }
1779 ArrowDataType::Float16 => {
1780 let array = column.as_primitive::<Float16Type>();
1781 get_float_16_array_slice(array, indices.iter().copied())
1782 }
1783 _ => {
1784 return Err(ParquetError::NYI(
1785 "Attempting to write an Arrow type that is not yet implemented".to_string(),
1786 ));
1787 }
1788 };
1789 typed.write_batch_internal(
1790 bytes.as_slice(),
1791 None,
1792 levels.def_level_data().as_ref(),
1793 levels.rep_level_data().as_ref(),
1794 None,
1795 None,
1796 None,
1797 )
1798 }
1799 }
1800}
1801
1802fn write_primitive<E: ColumnValueEncoder>(
1803 writer: &mut GenericColumnWriter<E>,
1804 values: &E::Values,
1805 levels: &ArrayLevels,
1806) -> Result<usize> {
1807 writer.write_batch_internal(
1808 values,
1809 Some(levels.non_null_indices()),
1810 levels.def_level_data().as_ref(),
1811 levels.rep_level_data().as_ref(),
1812 None,
1813 None,
1814 None,
1815 )
1816}
1817
1818fn get_bool_array_slice(
1819 array: &arrow_array::BooleanArray,
1820 indices: impl ExactSizeIterator<Item = usize>,
1821) -> Vec<bool> {
1822 let mut values = Vec::with_capacity(indices.len());
1823 for i in indices {
1824 values.push(array.value(i))
1825 }
1826 values
1827}
1828
1829fn get_interval_ym_array_slice(
1832 array: &arrow_array::IntervalYearMonthArray,
1833 indices: impl ExactSizeIterator<Item = usize>,
1834) -> Vec<FixedLenByteArray> {
1835 let mut values = Vec::with_capacity(indices.len());
1836 for i in indices {
1837 let mut value = array.value(i).to_le_bytes().to_vec();
1838 let mut suffix = vec![0; 8];
1839 value.append(&mut suffix);
1840 values.push(FixedLenByteArray::from(ByteArray::from(value)))
1841 }
1842 values
1843}
1844
1845fn get_interval_dt_array_slice(
1848 array: &arrow_array::IntervalDayTimeArray,
1849 indices: impl ExactSizeIterator<Item = usize>,
1850) -> Vec<FixedLenByteArray> {
1851 let mut values = Vec::with_capacity(indices.len());
1852 for i in indices {
1853 let mut out = [0; 12];
1854 let value = array.value(i);
1855 out[4..8].copy_from_slice(&value.days.to_le_bytes());
1856 out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1857 values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1858 }
1859 values
1860}
1861
1862fn get_decimal_32_array_slice(
1863 array: &arrow_array::Decimal32Array,
1864 indices: impl ExactSizeIterator<Item = usize>,
1865) -> Vec<FixedLenByteArray> {
1866 let mut values = Vec::with_capacity(indices.len());
1867 let size = decimal_length_from_precision(array.precision());
1868 for i in indices {
1869 let as_be_bytes = array.value(i).to_be_bytes();
1870 let resized_value = as_be_bytes[(4 - size)..].to_vec();
1871 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1872 }
1873 values
1874}
1875
1876fn get_decimal_64_array_slice(
1877 array: &arrow_array::Decimal64Array,
1878 indices: impl ExactSizeIterator<Item = usize>,
1879) -> Vec<FixedLenByteArray> {
1880 let mut values = Vec::with_capacity(indices.len());
1881 let size = decimal_length_from_precision(array.precision());
1882 for i in indices {
1883 let as_be_bytes = array.value(i).to_be_bytes();
1884 let resized_value = as_be_bytes[(8 - size)..].to_vec();
1885 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1886 }
1887 values
1888}
1889
1890fn get_decimal_128_array_slice(
1891 array: &arrow_array::Decimal128Array,
1892 indices: impl ExactSizeIterator<Item = usize>,
1893) -> Vec<FixedLenByteArray> {
1894 let mut values = Vec::with_capacity(indices.len());
1895 let size = decimal_length_from_precision(array.precision());
1896 for i in indices {
1897 let as_be_bytes = array.value(i).to_be_bytes();
1898 let resized_value = as_be_bytes[(16 - size)..].to_vec();
1899 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1900 }
1901 values
1902}
1903
1904fn get_decimal_256_array_slice(
1905 array: &arrow_array::Decimal256Array,
1906 indices: impl ExactSizeIterator<Item = usize>,
1907) -> Vec<FixedLenByteArray> {
1908 let mut values = Vec::with_capacity(indices.len());
1909 let size = decimal_length_from_precision(array.precision());
1910 for i in indices {
1911 let as_be_bytes = array.value(i).to_be_bytes();
1912 let resized_value = as_be_bytes[(32 - size)..].to_vec();
1913 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1914 }
1915 values
1916}
1917
1918fn get_float_16_array_slice(
1919 array: &arrow_array::Float16Array,
1920 indices: impl ExactSizeIterator<Item = usize>,
1921) -> Vec<FixedLenByteArray> {
1922 let mut values = Vec::with_capacity(indices.len());
1923 for i in indices {
1924 let value = array.value(i).to_le_bytes().to_vec();
1925 values.push(FixedLenByteArray::from(ByteArray::from(value)));
1926 }
1927 values
1928}
1929
1930fn get_fsb_array_slice(
1931 array: &arrow_array::FixedSizeBinaryArray,
1932 indices: impl ExactSizeIterator<Item = usize>,
1933) -> Vec<FixedLenByteArray> {
1934 let mut values = Vec::with_capacity(indices.len());
1935 for i in indices {
1936 let value = array.value(i).to_vec();
1937 values.push(FixedLenByteArray::from(ByteArray::from(value)))
1938 }
1939 values
1940}
1941
1942#[cfg(test)]
1943mod tests {
1944 use super::*;
1945 use std::collections::HashMap;
1946
1947 use std::fs::File;
1948
1949 use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1950 use crate::arrow::{ARROW_SCHEMA_META_KEY, PARQUET_FIELD_ID_META_KEY};
1951 use crate::column::page::{Page, PageReader};
1952 use crate::file::metadata::thrift::PageHeader;
1953 use crate::file::page_index::column_index::ColumnIndexMetaData;
1954 use crate::file::reader::SerializedPageReader;
1955 use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
1956 use crate::schema::types::ColumnPath;
1957 use arrow::datatypes::ToByteSlice;
1958 use arrow::datatypes::{DataType, Schema};
1959 use arrow::error::Result as ArrowResult;
1960 use arrow::util::data_gen::create_random_array;
1961 use arrow::util::pretty::pretty_format_batches;
1962 use arrow::{array::*, buffer::Buffer};
1963 use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer, OffsetBuffer, i256};
1964 use arrow_schema::Fields;
1965 use half::f16;
1966 use num_traits::{FromPrimitive, ToPrimitive};
1967 use tempfile::tempfile;
1968
1969 use crate::basic::Encoding;
1970 use crate::data_type::AsBytes;
1971 use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
1972 use crate::file::properties::{
1973 BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1974 };
1975 use crate::file::serialized_reader::ReadOptionsBuilder;
1976 use crate::file::{
1977 reader::{FileReader, SerializedFileReader},
1978 statistics::Statistics,
1979 };
1980
    /// Test `PageStore` that keeps blobs in a `HashMap` under non-sequential ids
    /// (derived from `next` in `put`) and counts how often `put` is called.
    #[derive(Debug, Default)]
    struct RecordingPageStore {
        /// Number of blobs stored so far; used to derive the next id.
        next: u64,
        /// Stored blobs, keyed by the raw id wrapped in the returned `PageKey`.
        blobs: HashMap<u64, Bytes>,
        /// `put` counter shared with the factory so the test can observe usage.
        puts: Arc<std::sync::atomic::AtomicUsize>,
    }
1991
1992 impl PageStore for RecordingPageStore {
1993 fn put(&mut self, value: Bytes) -> Result<PageKey> {
1994 let id = 100 + self.next * 7;
1996 self.next += 1;
1997 self.puts.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1998 self.blobs.insert(id, value);
1999 Ok(PageKey::new(id))
2000 }
2001
2002 fn take(&mut self, key: PageKey) -> Result<Bytes> {
2003 self.blobs
2004 .remove(&key.get())
2005 .ok_or_else(|| ParquetError::General(format!("missing key {}", key.get())))
2006 }
2007 }
2008
    /// Factory producing `RecordingPageStore`s that all share one `put` counter.
    #[derive(Debug)]
    struct RecordingPageStoreFactory {
        /// Counter cloned into every store returned by `create`.
        puts: Arc<std::sync::atomic::AtomicUsize>,
    }
2013
2014 impl PageStoreFactory for RecordingPageStoreFactory {
2015 fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
2016 Ok(Box::new(RecordingPageStore {
2017 puts: self.puts.clone(),
2018 ..Default::default()
2019 }))
2020 }
2021 }
2022
2023 #[test]
2027 fn custom_page_store_is_byte_identical_to_default() {
2028 let schema = Arc::new(Schema::new(vec![
2029 Field::new("i", DataType::Int32, true),
2030 Field::new("s", DataType::Utf8, true),
2032 ]));
2033 let i = Int32Array::from(vec![Some(1), None, Some(3), Some(4), Some(5), Some(6)]);
2034 let s = StringArray::from(vec![
2035 Some("a"),
2036 Some("bb"),
2037 Some("a"),
2038 None,
2039 Some("bb"),
2040 Some("ccc"),
2041 ]);
2042 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(i), Arc::new(s)]).unwrap();
2043
2044 let props = WriterProperties::builder()
2047 .set_max_row_group_row_count(Some(3))
2048 .build();
2049
2050 let write = |factory: Option<Arc<dyn PageStoreFactory>>| {
2051 let mut buffer = Vec::new();
2052 let mut opts = ArrowWriterOptions::new().with_properties(props.clone());
2053 if let Some(factory) = factory {
2054 opts = opts.with_page_store_factory(factory);
2055 }
2056 let mut writer =
2057 ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
2058 writer.write(&batch).unwrap();
2059 writer.close().unwrap();
2060 buffer
2061 };
2062
2063 let default_bytes = write(None);
2064
2065 let puts = Arc::new(std::sync::atomic::AtomicUsize::new(0));
2066 let custom_bytes = write(Some(Arc::new(RecordingPageStoreFactory {
2067 puts: puts.clone(),
2068 })));
2069
2070 assert!(
2071 puts.load(std::sync::atomic::Ordering::Relaxed) > 0,
2072 "custom PageStore was never written to"
2073 );
2074 assert_eq!(
2075 default_bytes, custom_bytes,
2076 "a custom PageStore must produce byte-identical output to the default"
2077 );
2078 }
2079
2080 #[test]
2086 fn dictionary_column_round_trips_with_offset_index_disabled() {
2087 let schema = Arc::new(Schema::new(vec![Field::new("k", DataType::Int32, true)]));
2088
2089 let values: Vec<Option<i32>> = (0..50_000).map(|i| Some(i % 8)).collect();
2092 let array = Int32Array::from(values.clone());
2093 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
2094
2095 let props = WriterProperties::builder()
2096 .set_offset_index_disabled(true)
2097 .set_data_page_row_count_limit(4096)
2098 .build();
2099 let opts = ArrowWriterOptions::new().with_properties(props);
2100
2101 let mut buffer = Vec::new();
2102 let mut writer =
2103 ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
2104 writer.write(&batch).unwrap();
2105 writer.close().unwrap();
2106
2107 let reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), values.len()).unwrap();
2108 let read: Vec<RecordBatch> = reader.collect::<ArrowResult<_>>().unwrap();
2109 let read_values: Vec<Option<i32>> = read
2110 .iter()
2111 .flat_map(|b| b.column(0).as_primitive::<Int32Type>().iter())
2112 .collect();
2113 assert_eq!(read_values, values);
2114 }
2115
2116 #[test]
2121 fn dictionary_page_is_routed_through_the_store() {
2122 #[derive(Debug, Default)]
2124 struct SizeRecordingPageStore {
2125 blobs: Vec<Bytes>,
2126 bytes_put: Arc<std::sync::atomic::AtomicUsize>,
2127 }
2128 impl PageStore for SizeRecordingPageStore {
2129 fn put(&mut self, value: Bytes) -> Result<PageKey> {
2130 self.bytes_put
2131 .fetch_add(value.len(), std::sync::atomic::Ordering::Relaxed);
2132 let key = PageKey::new(self.blobs.len() as u64);
2133 self.blobs.push(value);
2134 Ok(key)
2135 }
2136 fn take(&mut self, key: PageKey) -> Result<Bytes> {
2137 Ok(std::mem::take(&mut self.blobs[key.get() as usize]))
2138 }
2139 }
2140 #[derive(Debug)]
2141 struct Factory {
2142 bytes_put: Arc<std::sync::atomic::AtomicUsize>,
2143 }
2144 impl PageStoreFactory for Factory {
2145 fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
2146 Ok(Box::new(SizeRecordingPageStore {
2147 bytes_put: self.bytes_put.clone(),
2148 ..Default::default()
2149 }))
2150 }
2151 }
2152
2153 let schema = Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, false)]));
2154 let values: Vec<&str> = (0..2048)
2157 .map(|i| ["alpha", "beta", "gamma", "delta"][i % 4])
2158 .collect();
2159 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(StringArray::from(values))])
2160 .unwrap();
2161
2162 let bytes_put = Arc::new(std::sync::atomic::AtomicUsize::new(0));
2163 let opts = ArrowWriterOptions::new().with_page_store_factory(Arc::new(Factory {
2164 bytes_put: bytes_put.clone(),
2165 }));
2166
2167 let mut buffer = Vec::new();
2170 let mut writer =
2171 ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
2172 writer.write(&batch).unwrap();
2173 writer.close().unwrap();
2174
2175 let reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap();
2176 let column = reader.metadata().row_group(0).column(0);
2177 assert!(
2178 column.dictionary_page_offset().is_some(),
2179 "expected the column to be dictionary-encoded"
2180 );
2181
2182 assert_eq!(
2186 bytes_put.load(std::sync::atomic::Ordering::Relaxed) as i64,
2187 column.compressed_size(),
2188 "the dictionary page must pass through the store like any other page"
2189 );
2190 }
2191
2192 #[test]
2193 fn arrow_writer() {
2194 let schema = Schema::new(vec![
2196 Field::new("a", DataType::Int32, false),
2197 Field::new("b", DataType::Int32, true),
2198 ]);
2199
2200 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2202 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2203
2204 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
2206
2207 roundtrip(batch, Some(SMALL_SIZE / 2));
2208 }
2209
2210 fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
2211 let mut buffer = vec![];
2212
2213 let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
2214 writer.write(expected_batch).unwrap();
2215 writer.close().unwrap();
2216
2217 buffer
2218 }
2219
2220 fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
2221 let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
2222 writer.write(expected_batch).unwrap();
2223 writer.into_inner().unwrap()
2224 }
2225
2226 #[test]
2227 fn roundtrip_bytes() {
2228 let schema = Arc::new(Schema::new(vec![
2230 Field::new("a", DataType::Int32, false),
2231 Field::new("b", DataType::Int32, true),
2232 ]));
2233
2234 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2236 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2237
2238 let expected_batch =
2240 RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
2241
2242 for buffer in [
2243 get_bytes_after_close(schema.clone(), &expected_batch),
2244 get_bytes_by_into_inner(schema, &expected_batch),
2245 ] {
2246 let cursor = Bytes::from(buffer);
2247 let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
2248
2249 let actual_batch = record_batch_reader
2250 .next()
2251 .expect("No batch found")
2252 .expect("Unable to get batch");
2253
2254 assert_eq!(expected_batch.schema(), actual_batch.schema());
2255 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2256 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2257 for i in 0..expected_batch.num_columns() {
2258 let expected_data = expected_batch.column(i).to_data();
2259 let actual_data = actual_batch.column(i).to_data();
2260
2261 assert_eq!(expected_data, actual_data);
2262 }
2263 }
2264 }
2265
2266 #[test]
2267 fn arrow_writer_non_null() {
2268 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2270
2271 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2273
2274 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2276
2277 roundtrip(batch, Some(SMALL_SIZE / 2));
2278 }
2279
2280 #[test]
2281 fn arrow_writer_list() {
2282 let schema = Schema::new(vec![Field::new(
2284 "a",
2285 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
2286 true,
2287 )]);
2288
2289 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2291
2292 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2295
2296 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2298 DataType::Int32,
2299 false,
2300 ))))
2301 .len(5)
2302 .add_buffer(a_value_offsets)
2303 .add_child_data(a_values.into_data())
2304 .null_bit_buffer(Some(Buffer::from([0b00011011])))
2305 .build()
2306 .unwrap();
2307 let a = ListArray::from(a_list_data);
2308
2309 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2311
2312 assert_eq!(batch.column(0).null_count(), 1);
2313
2314 roundtrip(batch, None);
2317 }
2318
2319 #[test]
2320 fn arrow_writer_list_non_null() {
2321 let schema = Schema::new(vec![Field::new(
2323 "a",
2324 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
2325 false,
2326 )]);
2327
2328 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2330
2331 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2334
2335 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2337 DataType::Int32,
2338 false,
2339 ))))
2340 .len(5)
2341 .add_buffer(a_value_offsets)
2342 .add_child_data(a_values.into_data())
2343 .build()
2344 .unwrap();
2345 let a = ListArray::from(a_list_data);
2346
2347 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2349
2350 assert_eq!(batch.column(0).null_count(), 0);
2353
2354 roundtrip(batch, None);
2355 }
2356
2357 #[test]
2358 fn arrow_writer_list_view() {
2359 let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2360 let schema = Schema::new(vec![Field::new(
2361 "a",
2362 DataType::ListView(list_field.clone()),
2363 true,
2364 )]);
2365
2366 let a = ListViewArray::new(
2368 list_field,
2369 vec![0, 1, 0, 3, 6].into(),
2370 vec![1, 2, 0, 3, 4].into(),
2371 Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2372 Some(vec![true, true, false, true, true].into()),
2373 );
2374
2375 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2376
2377 assert_eq!(batch.column(0).null_count(), 1);
2378
2379 roundtrip(batch, None);
2380 }
2381
2382 #[test]
2383 fn arrow_writer_list_view_non_null() {
2384 let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2385 let schema = Schema::new(vec![Field::new(
2386 "a",
2387 DataType::ListView(list_field.clone()),
2388 false,
2389 )]);
2390
2391 let a = ListViewArray::new(
2393 list_field,
2394 vec![0, 1, 0, 3, 6].into(),
2395 vec![1, 2, 0, 3, 4].into(),
2396 Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2397 None,
2398 );
2399
2400 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2401
2402 assert_eq!(batch.column(0).null_count(), 0);
2403
2404 roundtrip(batch, None);
2405 }
2406
2407 #[test]
2408 fn arrow_writer_list_view_out_of_order() {
2409 let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2410 let schema = Schema::new(vec![Field::new(
2411 "a",
2412 DataType::ListView(list_field.clone()),
2413 false,
2414 )]);
2415
2416 let a = ListViewArray::new(
2418 list_field,
2419 vec![0, 1, 0, 6, 3].into(),
2420 vec![1, 2, 0, 4, 3].into(),
2421 Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2422 None,
2423 );
2424
2425 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2426
2427 roundtrip(batch, None);
2428 }
2429
2430 #[test]
2431 fn arrow_writer_large_list_view() {
2432 let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2433 let schema = Schema::new(vec![Field::new(
2434 "a",
2435 DataType::LargeListView(list_field.clone()),
2436 true,
2437 )]);
2438
2439 let a = LargeListViewArray::new(
2441 list_field,
2442 vec![0i64, 1, 0, 3, 6].into(),
2443 vec![1i64, 2, 0, 3, 4].into(),
2444 Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2445 Some(vec![true, true, false, true, true].into()),
2446 );
2447
2448 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2449
2450 assert_eq!(batch.column(0).null_count(), 1);
2451
2452 roundtrip(batch, None);
2453 }
2454
2455 #[test]
2456 fn arrow_writer_list_view_with_struct() {
2457 let struct_fields = Fields::from(vec![
2459 Field::new("id", DataType::Int32, false),
2460 Field::new("name", DataType::Utf8, false),
2461 ]);
2462 let struct_type = DataType::Struct(struct_fields.clone());
2463 let list_field = Arc::new(Field::new("item", struct_type.clone(), false));
2464
2465 let schema = Schema::new(vec![Field::new(
2466 "a",
2467 DataType::ListView(list_field.clone()),
2468 true,
2469 )]);
2470
2471 let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
2473 let name_array = StringArray::from(vec!["a", "b", "c", "d", "e"]);
2474 let struct_array = StructArray::new(
2475 struct_fields,
2476 vec![Arc::new(id_array), Arc::new(name_array)],
2477 None,
2478 );
2479
2480 let list_view = ListViewArray::new(
2482 list_field,
2483 vec![0, 2, 2].into(), vec![2, 0, 3].into(), Arc::new(struct_array),
2486 Some(vec![true, false, true].into()),
2487 );
2488
2489 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_view)]).unwrap();
2490
2491 roundtrip(batch, None);
2492 }
2493
2494 #[test]
2495 fn arrow_writer_binary() {
2496 let string_field = Field::new("a", DataType::Utf8, false);
2497 let binary_field = Field::new("b", DataType::Binary, false);
2498 let schema = Schema::new(vec![string_field, binary_field]);
2499
2500 let raw_string_values = vec!["foo", "bar", "baz", "quux"];
2501 let raw_binary_values = [
2502 b"foo".to_vec(),
2503 b"bar".to_vec(),
2504 b"baz".to_vec(),
2505 b"quux".to_vec(),
2506 ];
2507 let raw_binary_value_refs = raw_binary_values
2508 .iter()
2509 .map(|x| x.as_slice())
2510 .collect::<Vec<_>>();
2511
2512 let string_values = StringArray::from(raw_string_values.clone());
2513 let binary_values = BinaryArray::from(raw_binary_value_refs);
2514 let batch = RecordBatch::try_new(
2515 Arc::new(schema),
2516 vec![Arc::new(string_values), Arc::new(binary_values)],
2517 )
2518 .unwrap();
2519
2520 roundtrip(batch, Some(SMALL_SIZE / 2));
2521 }
2522
2523 #[test]
2524 fn arrow_writer_binary_view() {
2525 let string_field = Field::new("a", DataType::Utf8View, false);
2526 let binary_field = Field::new("b", DataType::BinaryView, false);
2527 let nullable_string_field = Field::new("a", DataType::Utf8View, true);
2528 let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
2529
2530 let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
2531 let raw_binary_values = vec![
2532 b"foo".to_vec(),
2533 b"bar".to_vec(),
2534 b"large payload over 12 bytes".to_vec(),
2535 b"lulu".to_vec(),
2536 ];
2537 let nullable_string_values =
2538 vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
2539
2540 let string_view_values = StringViewArray::from(raw_string_values);
2541 let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
2542 let nullable_string_view_values = StringViewArray::from(nullable_string_values);
2543 let batch = RecordBatch::try_new(
2544 Arc::new(schema),
2545 vec![
2546 Arc::new(string_view_values),
2547 Arc::new(binary_view_values),
2548 Arc::new(nullable_string_view_values),
2549 ],
2550 )
2551 .unwrap();
2552
2553 roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
2554 roundtrip(batch, None);
2555 }
2556
2557 #[test]
2558 fn arrow_writer_binary_view_long_value() {
2559 let string_field = Field::new("a", DataType::Utf8View, false);
2560 let binary_field = Field::new("b", DataType::BinaryView, false);
2561 let schema = Schema::new(vec![string_field, binary_field]);
2562
2563 let long = "a".repeat(128);
2567 let raw_string_values = vec!["foo", long.as_str(), "bar"];
2568 let raw_binary_values = vec![b"foo".to_vec(), long.as_bytes().to_vec(), b"bar".to_vec()];
2569
2570 let string_view_values: ArrayRef = Arc::new(StringViewArray::from(raw_string_values));
2571 let binary_view_values: ArrayRef =
2572 Arc::new(BinaryViewArray::from_iter_values(raw_binary_values));
2573
2574 one_column_roundtrip(Arc::clone(&string_view_values), false);
2575 one_column_roundtrip(Arc::clone(&binary_view_values), false);
2576
2577 let batch = RecordBatch::try_new(
2578 Arc::new(schema),
2579 vec![string_view_values, binary_view_values],
2580 )
2581 .unwrap();
2582
2583 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2585 let props = WriterProperties::builder()
2586 .set_writer_version(version)
2587 .set_dictionary_enabled(false)
2588 .build();
2589 roundtrip_opts(&batch, props);
2590 }
2591 }
2592
2593 fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
2594 let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
2595 let schema = Schema::new(vec![decimal_field]);
2596
2597 let decimal_values = vec![10_000, 50_000, 0, -100]
2598 .into_iter()
2599 .map(Some)
2600 .collect::<Decimal128Array>()
2601 .with_precision_and_scale(precision, scale)
2602 .unwrap();
2603
2604 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
2605 }
2606
2607 #[test]
2608 fn arrow_writer_decimal() {
2609 let batch_int32_decimal = get_decimal_batch(5, 2);
2611 roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
2612 let batch_int64_decimal = get_decimal_batch(12, 2);
2614 roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
2615 let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
2617 roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
2618 }
2619
#[test]
fn arrow_writer_complex() {
    // Schema under test (`?` = nullable):
    //   a: i32
    //   b: i32?
    //   c: struct<d: f64?, e: struct<f: f32?, g: list<i16?>, h: list<i16>?>>
    let field_d = Arc::new(Field::new("d", DataType::Float64, true));
    let field_f = Arc::new(Field::new("f", DataType::Float32, true));
    let field_g = Arc::new(Field::new_list(
        "g",
        Field::new_list_field(DataType::Int16, true),
        false,
    ));
    let field_h = Arc::new(Field::new_list(
        "h",
        Field::new_list_field(DataType::Int16, false),
        true,
    ));
    let field_e = Arc::new(Field::new_struct(
        "e",
        vec![field_f.clone(), field_g.clone(), field_h.clone()],
        false,
    ));
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, true),
        Field::new_struct("c", vec![field_d.clone(), field_e.clone()], false),
    ]));

    let col_a = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let col_b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
    let leaf_d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
    let leaf_f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);

    // `g` and `h` share the same child values and offsets; `h` additionally
    // carries a validity bitmap (0b00011011) that marks index 2 as null.
    let list_values = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    let list_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());

    let leaf_g = ListArray::from(
        ArrayData::builder(field_g.data_type().clone())
            .len(5)
            .add_buffer(list_offsets.clone())
            .add_child_data(list_values.to_data())
            .build()
            .unwrap(),
    );
    let leaf_h = ListArray::from(
        ArrayData::builder(field_h.data_type().clone())
            .len(5)
            .add_buffer(list_offsets)
            .add_child_data(list_values.to_data())
            .null_bit_buffer(Some(Buffer::from([0b00011011])))
            .build()
            .unwrap(),
    );

    let struct_e = StructArray::from(vec![
        (field_f, Arc::new(leaf_f) as ArrayRef),
        (field_g, Arc::new(leaf_g) as ArrayRef),
        (field_h, Arc::new(leaf_h) as ArrayRef),
    ]);
    let struct_c = StructArray::from(vec![
        (field_d, Arc::new(leaf_d) as ArrayRef),
        (field_e, Arc::new(struct_e) as ArrayRef),
    ]);

    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(col_a), Arc::new(col_b), Arc::new(struct_c)],
    )
    .unwrap();

    // Same five rows, split into row groups of two different sizes.
    for max_rows in [SMALL_SIZE / 2, SMALL_SIZE / 3] {
        roundtrip(batch.clone(), Some(max_rows));
    }
}
2705
#[test]
fn arrow_writer_complex_mixed() {
    // One non-nullable struct column whose children mix required and
    // nullable fields: offset (i32, required), partition (i64?), topic (utf8?).
    let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
    let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
    let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));

    let struct_fields = Fields::from(vec![
        offset_field.clone(),
        partition_field.clone(),
        topic_field.clone(),
    ]);
    let schema = Arc::new(Schema::new(vec![Field::new(
        "some_nested_object",
        DataType::Struct(struct_fields),
        false,
    )]));

    let nested = StructArray::from(vec![
        (
            offset_field,
            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])) as ArrayRef,
        ),
        (
            partition_field,
            Arc::new(Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)])) as ArrayRef,
        ),
        (
            topic_field,
            Arc::new(StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]))
                as ArrayRef,
        ),
    ]);

    let batch = RecordBatch::try_new(schema, vec![Arc::new(nested)]).unwrap();
    roundtrip(batch, Some(SMALL_SIZE / 2));
}
2742
#[test]
fn arrow_writer_map() {
    // The JSON reader is just a compact way to build a nullable
    // `map<utf8, utf8?>` column; note the second row repeats the "long" key.
    let json_content = r#"
{"stocks":{"long": "$AAA", "short": "$BBB"}}
{"stocks":{"long": null, "long": "$CCC", "short": null}}
{"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
"#;

    let entries = Field::new(
        "entries",
        DataType::Struct(Fields::from(vec![
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, true),
        ])),
        false,
    );
    let schema = Arc::new(Schema::new(vec![Field::new(
        "stocks",
        DataType::Map(Arc::new(entries), false),
        true,
    )]));

    let batch = arrow::json::ReaderBuilder::new(schema)
        .with_batch_size(64)
        .build(std::io::Cursor::new(json_content))
        .unwrap()
        .next()
        .unwrap()
        .unwrap();
    roundtrip(batch, None);
}
2770
#[test]
fn arrow_writer_2_level_struct() {
    // a: struct<b: struct<c: i32?>?>? -- every level is nullable.
    let field_c = Field::new("c", DataType::Int32, true);
    let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
    let field_a = Field::new("a", DataType::Struct(vec![field_b.clone()].into()), true);
    let schema = Arc::new(Schema::new(vec![field_a.clone()]));

    let values_c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);

    // Validity 0b00100111: `b` is null at rows 3 and 4.
    let array_b = StructArray::from(
        ArrayDataBuilder::new(field_b.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100111])))
            .add_child_data(values_c.into_data())
            .build()
            .unwrap(),
    );
    // Validity 0b00101111: `a` is null at row 4 only.
    let array_a = StructArray::from(
        ArrayDataBuilder::new(field_a.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00101111])))
            .add_child_data(array_b.into_data())
            .build()
            .unwrap(),
    );

    assert_eq!(array_a.null_count(), 1);
    assert_eq!(array_a.column(0).null_count(), 2);

    let batch = RecordBatch::try_new(schema, vec![Arc::new(array_a)]).unwrap();
    roundtrip(batch, Some(SMALL_SIZE / 2));
}
2805
#[test]
fn arrow_writer_2_level_struct_non_null() {
    // a: struct<b: struct<c: i32>> -- no level is nullable.
    let type_b = DataType::Struct(vec![Field::new("c", DataType::Int32, false)].into());
    let type_a = DataType::Struct(vec![Field::new("b", type_b.clone(), false)].into());
    let schema = Arc::new(Schema::new(vec![Field::new("a", type_a.clone(), false)]));

    let values_c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
    let array_b = StructArray::from(
        ArrayDataBuilder::new(type_b)
            .len(6)
            .add_child_data(values_c.into_data())
            .build()
            .unwrap(),
    );
    let array_a = StructArray::from(
        ArrayDataBuilder::new(type_a)
            .len(6)
            .add_child_data(array_b.into_data())
            .build()
            .unwrap(),
    );

    assert_eq!(array_a.null_count(), 0);
    assert_eq!(array_a.column(0).null_count(), 0);

    let batch = RecordBatch::try_new(schema, vec![Arc::new(array_a)]).unwrap();
    roundtrip(batch, Some(SMALL_SIZE / 2));
}
2839
#[test]
fn arrow_writer_2_level_struct_mixed_null() {
    // a: struct<b: struct<c: i32>?> -- only the middle level is nullable.
    let type_b = DataType::Struct(vec![Field::new("c", DataType::Int32, false)].into());
    let type_a = DataType::Struct(vec![Field::new("b", type_b.clone(), true)].into());
    let schema = Arc::new(Schema::new(vec![Field::new("a", type_a.clone(), false)]));

    let values_c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
    // Validity 0b00100111: `b` is null at rows 3 and 4.
    let array_b = StructArray::from(
        ArrayDataBuilder::new(type_b)
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100111])))
            .add_child_data(values_c.into_data())
            .build()
            .unwrap(),
    );
    let array_a = StructArray::from(
        ArrayDataBuilder::new(type_a)
            .len(6)
            .add_child_data(array_b.into_data())
            .build()
            .unwrap(),
    );

    assert_eq!(array_a.null_count(), 0);
    assert_eq!(array_a.column(0).null_count(), 2);

    let batch = RecordBatch::try_new(schema, vec![Arc::new(array_a)]).unwrap();
    roundtrip(batch, Some(SMALL_SIZE / 2));
}
2875
#[test]
fn arrow_writer_2_level_struct_mixed_null_2() {
    // a: struct<b: struct<c: i32, d: fixed_size_binary(4), e: dict<i32, utf8>>>?
    // Only the outermost level carries nulls.
    let field_b = Field::new(
        "b",
        DataType::Struct(
            vec![
                Field::new("c", DataType::Int32, false),
                Field::new("d", DataType::FixedSizeBinary(4), false),
                Field::new(
                    "e",
                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                    false,
                ),
            ]
            .into(),
        ),
        false,
    );
    let field_a = Field::new("a", DataType::Struct(vec![field_b.clone()].into()), true);
    let schema = Arc::new(Schema::new(vec![field_a.clone()]));

    let values_c = Int32Array::from_iter_values(0..6);
    let values_d = FixedSizeBinaryArray::try_from_iter(
        ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
    )
    .expect("four byte values");
    let values_e =
        Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);

    let array_b = StructArray::from(
        ArrayDataBuilder::new(field_b.data_type().clone())
            .len(6)
            .add_child_data(values_c.into_data())
            .add_child_data(values_d.into_data())
            .add_child_data(values_e.into_data())
            .build()
            .unwrap(),
    );
    // Validity 0b00100101: `a` is null at rows 1, 3 and 4.
    let array_a = StructArray::from(
        ArrayDataBuilder::new(field_a.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100101])))
            .add_child_data(array_b.into_data())
            .build()
            .unwrap(),
    );

    assert_eq!(array_a.null_count(), 3);
    assert_eq!(array_a.column(0).null_count(), 0);

    let batch = RecordBatch::try_new(schema, vec![Arc::new(array_a)]).unwrap();
    roundtrip(batch, Some(SMALL_SIZE / 2));
}
2927
#[test]
fn test_fixed_size_binary_in_dict() {
    // Round-trips a non-nullable `Dictionary(K, FixedSizeBinary(4))` column
    // with keys `[0, 0, 1]` over the two values `[0; 4]` and `[1; 4]`.
    fn test_fixed_size_binary_in_dict_inner<K>()
    where
        K: ArrowDictionaryKeyType,
        K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
        <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
    {
        let field = Field::new(
            "a",
            DataType::Dictionary(
                Box::new(K::DATA_TYPE),
                Box::new(DataType::FixedSizeBinary(4)),
            ),
            false,
        );
        let schema = Schema::new(vec![field]);

        let keys: Vec<K::Native> = vec![
            K::Native::try_from(0u8).unwrap(),
            K::Native::try_from(0u8).unwrap(),
            K::Native::try_from(1u8).unwrap(),
        ];
        let keys = PrimitiveArray::<K>::from_iter_values(keys);
        let values = FixedSizeBinaryArray::try_from_iter(
            vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
        )
        .unwrap();

        let data = DictionaryArray::<K>::new(keys, Arc::new(values));
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
        roundtrip(batch, None);
    }

    // Every dictionary key type exactly once. Previously UInt16Type was listed
    // twice and UInt64Type was never exercised.
    test_fixed_size_binary_in_dict_inner::<UInt8Type>();
    test_fixed_size_binary_in_dict_inner::<UInt16Type>();
    test_fixed_size_binary_in_dict_inner::<UInt32Type>();
    test_fixed_size_binary_in_dict_inner::<UInt64Type>();
    test_fixed_size_binary_in_dict_inner::<Int8Type>();
    test_fixed_size_binary_in_dict_inner::<Int16Type>();
    test_fixed_size_binary_in_dict_inner::<Int32Type>();
    test_fixed_size_binary_in_dict_inner::<Int64Type>();
}
2971
#[test]
fn test_empty_dict() {
    // A nullable struct that is null in every slot, wrapping a dictionary with
    // five null keys and an empty value array.
    let struct_fields = Fields::from(vec![Field::new(
        "dict",
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
        false,
    )]);
    let schema = Arc::new(Schema::new(vec![Field::new_struct(
        "struct",
        struct_fields.clone(),
        true,
    )]));

    let empty_values = Arc::new(StringArray::new_null(0));
    let dictionary = DictionaryArray::new(Int32Array::new_null(5), empty_values);
    let all_null = StructArray::new(
        struct_fields,
        vec![Arc::new(dictionary)],
        Some(NullBuffer::new_null(5)),
    );

    let batch = RecordBatch::try_new(schema, vec![Arc::new(all_null)]).unwrap();
    roundtrip(batch, None);
}
#[test]
fn arrow_writer_page_size() {
    // Ten distinct 10-character strings written with 1-byte data/dictionary
    // page limits and a write batch size of 1; the file is expected to have a
    // dictionary page and 10 data pages.
    let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));

    let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
    (0..10)
        .map(|digit: i32| digit.to_string().repeat(10).chars().take(10).collect::<String>())
        .for_each(|value| builder.append_value(value));

    let batch = RecordBatch::try_new(schema, vec![Arc::new(builder.finish())]).unwrap();

    let file = tempfile::tempfile().unwrap();
    let props = WriterProperties::builder()
        .set_data_page_size_limit(1)
        .set_dictionary_page_size_limit(1)
        .set_write_batch_size(1)
        .build();

    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
            .expect("Unable to write file");
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let reader = SerializedFileReader::new_with_options(
        file.try_clone().unwrap(),
        ReadOptionsBuilder::new().with_page_index().build(),
    )
    .unwrap();
    let metadata = reader.metadata();

    let columns = metadata.row_group(0).columns();
    assert_eq!(columns.len(), 1);
    assert!(
        columns[0].dictionary_page_offset().is_some(),
        "Expected a dictionary page"
    );

    let offset_index = metadata.offset_index();
    assert!(offset_index.is_some());
    let page_locations = offset_index.unwrap()[0][0].page_locations.clone();
    assert_eq!(
        page_locations.len(),
        10,
        "Expected 10 pages but got {page_locations:#?}"
    );
}
3064
#[test]
fn arrow_writer_float_nans() {
    // Each float column holds NaN at even rows and the row index at odd rows.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Float16, false),
        Field::new("b", DataType::Float32, false),
        Field::new("c", DataType::Float64, false),
    ]));

    let halves: Float16Array = (0..MEDIUM_SIZE)
        .map(|i| Some(if i % 2 == 0 { f16::NAN } else { f16::from_f32(i as f32) }))
        .collect();
    let singles: Float32Array = (0..MEDIUM_SIZE)
        .map(|i| Some(if i % 2 == 0 { f32::NAN } else { i as f32 }))
        .collect();
    let doubles: Float64Array = (0..MEDIUM_SIZE)
        .map(|i| Some(if i % 2 == 0 { f64::NAN } else { i as f64 }))
        .collect();

    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(halves), Arc::new(singles), Arc::new(doubles)],
    )
    .unwrap();
    roundtrip(batch, None);
}
3102
/// Row count used by most small round-trip tests; fractions of it are also
/// used as maximum row-group sizes.
const SMALL_SIZE: usize = 7;
/// Row count for tests that want somewhat more rows than [`SMALL_SIZE`].
const MEDIUM_SIZE: usize = 63;
3105
3106 fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<Bytes> {
3109 let mut files = vec![];
3110 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3111 let mut props = WriterProperties::builder().set_writer_version(version);
3112
3113 if let Some(size) = max_row_group_size {
3114 props = props.set_max_row_group_row_count(Some(size))
3115 }
3116
3117 let props = props.build();
3118 files.push(roundtrip_opts(&expected_batch, props))
3119 }
3120 files
3121 }
3122
3123 fn roundtrip_opts_with_array_validation<F>(
3127 expected_batch: &RecordBatch,
3128 props: WriterProperties,
3129 validate: F,
3130 ) -> Bytes
3131 where
3132 F: Fn(&ArrayData, &ArrayData),
3133 {
3134 let mut file = vec![];
3135
3136 let mut writer = ArrowWriter::try_new(&mut file, expected_batch.schema(), Some(props))
3137 .expect("Unable to write file");
3138 writer.write(expected_batch).unwrap();
3139 writer.close().unwrap();
3140
3141 let file = Bytes::from(file);
3142 let mut record_batch_reader =
3143 ParquetRecordBatchReader::try_new(file.clone(), 1024).unwrap();
3144
3145 let actual_batch = record_batch_reader
3146 .next()
3147 .expect("No batch found")
3148 .expect("Unable to get batch");
3149
3150 assert_eq!(expected_batch.schema(), actual_batch.schema());
3151 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
3152 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
3153 for i in 0..expected_batch.num_columns() {
3154 let expected_data = expected_batch.column(i).to_data();
3155 let actual_data = actual_batch.column(i).to_data();
3156 validate(&expected_data, &actual_data);
3157 }
3158
3159 file
3160 }
3161
3162 fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> Bytes {
3163 roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
3164 a.validate_full().expect("valid expected data");
3165 b.validate_full().expect("valid actual data");
3166 assert_eq!(a, b)
3167 })
3168 }
3169
3170 struct RoundTripOptions {
3171 values: ArrayRef,
3172 schema: SchemaRef,
3173 bloom_filter: bool,
3174 bloom_filter_ndv: Option<u64>,
3175 bloom_filter_position: BloomFilterPosition,
3176 }
3177
3178 impl RoundTripOptions {
3179 fn new(values: ArrayRef, nullable: bool) -> Self {
3180 let data_type = values.data_type().clone();
3181 let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
3182 Self {
3183 values,
3184 schema: Arc::new(schema),
3185 bloom_filter: false,
3186 bloom_filter_ndv: None,
3187 bloom_filter_position: BloomFilterPosition::AfterRowGroup,
3188 }
3189 }
3190 }
3191
3192 fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<Bytes> {
3193 one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
3194 }
3195
3196 fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<Bytes> {
3197 let mut options = RoundTripOptions::new(values, false);
3198 options.schema = schema;
3199 one_column_roundtrip_with_options(options)
3200 }
3201
3202 fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<Bytes> {
3203 let RoundTripOptions {
3204 values,
3205 schema,
3206 bloom_filter,
3207 bloom_filter_ndv,
3208 bloom_filter_position,
3209 } = options;
3210
3211 let encodings = match values.data_type() {
3212 DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
3213 vec![
3214 Encoding::PLAIN,
3215 Encoding::DELTA_BYTE_ARRAY,
3216 Encoding::DELTA_LENGTH_BYTE_ARRAY,
3217 ]
3218 }
3219 DataType::Int64
3220 | DataType::Int32
3221 | DataType::Int16
3222 | DataType::Int8
3223 | DataType::UInt64
3224 | DataType::UInt32
3225 | DataType::UInt16
3226 | DataType::UInt8 => vec![
3227 Encoding::PLAIN,
3228 Encoding::DELTA_BINARY_PACKED,
3229 Encoding::BYTE_STREAM_SPLIT,
3230 ],
3231 DataType::Float32 | DataType::Float64 => {
3232 vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
3233 }
3234 _ => vec![Encoding::PLAIN],
3235 };
3236
3237 let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3238
3239 let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
3240
3241 let mut files = vec![];
3242 for dictionary_size in [0, 1, 1024] {
3243 for encoding in &encodings {
3244 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3245 for row_group_size in row_group_sizes {
3246 let mut builder = WriterProperties::builder()
3247 .set_writer_version(version)
3248 .set_max_row_group_row_count(Some(row_group_size))
3249 .set_dictionary_enabled(dictionary_size != 0)
3250 .set_dictionary_page_size_limit(dictionary_size.max(1))
3251 .set_encoding(*encoding)
3252 .set_bloom_filter_enabled(bloom_filter)
3253 .set_bloom_filter_position(bloom_filter_position);
3254 if let Some(ndv) = bloom_filter_ndv {
3255 builder = builder.set_bloom_filter_max_ndv(ndv);
3256 }
3257 let props = builder.build();
3258
3259 files.push(roundtrip_opts(&expected_batch, props))
3260 }
3261 }
3262 }
3263 }
3264 files
3265 }
3266
3267 fn values_required<A, I>(iter: I) -> Vec<Bytes>
3268 where
3269 A: From<Vec<I::Item>> + Array + 'static,
3270 I: IntoIterator,
3271 {
3272 let raw_values: Vec<_> = iter.into_iter().collect();
3273 let values = Arc::new(A::from(raw_values));
3274 one_column_roundtrip(values, false)
3275 }
3276
3277 fn values_optional<A, I>(iter: I) -> Vec<Bytes>
3278 where
3279 A: From<Vec<Option<I::Item>>> + Array + 'static,
3280 I: IntoIterator,
3281 {
3282 let optional_raw_values: Vec<_> = iter
3283 .into_iter()
3284 .enumerate()
3285 .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
3286 .collect();
3287 let optional_values = Arc::new(A::from(optional_raw_values));
3288 one_column_roundtrip(optional_values, true)
3289 }
3290
3291 fn required_and_optional<A, I>(iter: I)
3292 where
3293 A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
3294 I: IntoIterator + Clone,
3295 {
3296 values_required::<A, I>(iter.clone());
3297 values_optional::<A, I>(iter);
3298 }
3299
3300 fn check_bloom_filter<T: AsBytes>(
3301 files: Vec<Bytes>,
3302 file_column: String,
3303 positive_values: Vec<T>,
3304 negative_values: Vec<T>,
3305 ) {
3306 files.into_iter().take(1).for_each(|file| {
3307 let file_reader = SerializedFileReader::new_with_options(
3308 file,
3309 ReadOptionsBuilder::new()
3310 .with_reader_properties(
3311 ReaderProperties::builder()
3312 .set_read_bloom_filter(true)
3313 .build(),
3314 )
3315 .build(),
3316 )
3317 .expect("Unable to open file as Parquet");
3318 let metadata = file_reader.metadata();
3319
3320 let mut bloom_filters: Vec<_> = vec![];
3322 for (ri, row_group) in metadata.row_groups().iter().enumerate() {
3323 if let Some((column_index, _)) = row_group
3324 .columns()
3325 .iter()
3326 .enumerate()
3327 .find(|(_, column)| column.column_path().string() == file_column)
3328 {
3329 let row_group_reader = file_reader
3330 .get_row_group(ri)
3331 .expect("Unable to read row group");
3332 if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
3333 bloom_filters.push(sbbf.clone());
3334 } else {
3335 panic!("No bloom filter for column named {file_column} found");
3336 }
3337 } else {
3338 panic!("No column named {file_column} found");
3339 }
3340 }
3341
3342 positive_values.iter().for_each(|value| {
3343 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
3344 assert!(
3345 found.is_some(),
3346 "{}",
3347 format!("Value {:?} should be in bloom filter", value.as_bytes())
3348 );
3349 });
3350
3351 negative_values.iter().for_each(|value| {
3352 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
3353 assert!(
3354 found.is_none(),
3355 "{}",
3356 format!("Value {:?} should not be in bloom filter", value.as_bytes())
3357 );
3358 });
3359 });
3360 }
3361
#[test]
fn all_null_primitive_single_column() {
    // A nullable Int32 column in which every slot is null.
    let all_null: Vec<Option<i32>> = vec![None; SMALL_SIZE];
    one_column_roundtrip(Arc::new(Int32Array::from(all_null)), true);
}
#[test]
fn null_single_column() {
    // A column of `DataType::Null`, declared nullable.
    let nulls = NullArray::new(SMALL_SIZE);
    one_column_roundtrip(Arc::new(nulls), true);
}
3373
#[test]
fn bool_single_column() {
    // Alternating true/false, written both as required and optional columns.
    let alternating = [true, false].into_iter().cycle().take(SMALL_SIZE);
    required_and_optional::<BooleanArray, _>(alternating);
}
3380
#[test]
fn bool_large_single_column() {
    // 200_000 booleans cycling None / true / false. This only checks that the
    // write and close succeed; nothing is read back.
    let values: BooleanArray = [None, Some(true), Some(false)]
        .into_iter()
        .cycle()
        .take(200_000)
        .collect();
    let values = Arc::new(values);

    let field = Field::new("col", values.data_type().clone(), true);
    let expected_batch =
        RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![values]).unwrap();

    let file = tempfile::tempfile().unwrap();
    let sink = file.try_clone().unwrap();
    let mut writer = ArrowWriter::try_new(sink, expected_batch.schema(), None)
        .expect("Unable to write file");
    writer.write(&expected_batch).unwrap();
    writer.close().unwrap();
}
3401
#[test]
fn check_page_offset_index_with_nan() {
    // For an all-NaN Float64 column, every column chunk must have an offset
    // index but no column index.
    let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Float64, true)]));
    let nans = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
    let batch = RecordBatch::try_new(schema, vec![nans]).unwrap();

    let mut out = Vec::with_capacity(1024);
    let mut writer =
        ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
    writer.write(&batch).unwrap();
    let file_meta_data = writer.close().unwrap();

    let all_columns = file_meta_data
        .row_groups()
        .iter()
        .flat_map(|row_group| row_group.columns());
    for column in all_columns {
        assert!(column.offset_index_offset().is_some());
        assert!(column.offset_index_length().is_some());
        assert!(column.column_index_offset().is_none());
        assert!(column.column_index_length().is_none());
    }
}
3422
#[test]
fn i8_single_column() {
    // Values 0..SMALL_SIZE as i8, required and optional.
    required_and_optional::<Int8Array, _>((0..SMALL_SIZE).map(|v| v as i8));
}
3427
#[test]
fn i16_single_column() {
    // Values 0..SMALL_SIZE as i16, required and optional.
    required_and_optional::<Int16Array, _>((0..SMALL_SIZE).map(|v| v as i16));
}
3432
#[test]
fn i32_single_column() {
    // Values 0..SMALL_SIZE as i32, required and optional.
    required_and_optional::<Int32Array, _>((0..SMALL_SIZE).map(|v| v as i32));
}
3437
#[test]
fn i64_single_column() {
    // Values 0..SMALL_SIZE as i64, required and optional.
    required_and_optional::<Int64Array, _>((0..SMALL_SIZE).map(|v| v as i64));
}
3442
#[test]
fn u8_single_column() {
    // Values 0..SMALL_SIZE as u8, required and optional.
    required_and_optional::<UInt8Array, _>((0..SMALL_SIZE).map(|v| v as u8));
}
3447
#[test]
fn u16_single_column() {
    // Values 0..SMALL_SIZE as u16, required and optional.
    required_and_optional::<UInt16Array, _>((0..SMALL_SIZE).map(|v| v as u16));
}
3452
#[test]
fn u32_single_column() {
    // Values 0..SMALL_SIZE as u32, required and optional.
    required_and_optional::<UInt32Array, _>((0..SMALL_SIZE).map(|v| v as u32));
}
3457
#[test]
fn u64_single_column() {
    // Values 0..SMALL_SIZE as u64, required and optional.
    required_and_optional::<UInt64Array, _>((0..SMALL_SIZE).map(|v| v as u64));
}
3462
#[test]
fn f32_single_column() {
    // Whole-number floats 0.0..SMALL_SIZE, required and optional.
    required_and_optional::<Float32Array, _>((0..SMALL_SIZE as u16).map(f32::from));
}
3467
#[test]
fn f64_single_column() {
    // Whole-number doubles 0.0..SMALL_SIZE, required and optional.
    required_and_optional::<Float64Array, _>((0..SMALL_SIZE as u32).map(f64::from));
}
3472
#[test]
fn timestamp_second_single_column() {
    // Required column of second-precision timestamps 0..SMALL_SIZE.
    let values = TimestampSecondArray::from_iter_values(0..SMALL_SIZE as i64);
    one_column_roundtrip(Arc::new(values), false);
}
3484
#[test]
fn timestamp_millisecond_single_column() {
    // Required column of millisecond-precision timestamps 0..SMALL_SIZE.
    let values = TimestampMillisecondArray::from_iter_values(0..SMALL_SIZE as i64);
    one_column_roundtrip(Arc::new(values), false);
}
3492
#[test]
fn timestamp_microsecond_single_column() {
    // Required column of microsecond-precision timestamps 0..SMALL_SIZE.
    let values = TimestampMicrosecondArray::from_iter_values(0..SMALL_SIZE as i64);
    one_column_roundtrip(Arc::new(values), false);
}
3500
#[test]
fn timestamp_nanosecond_single_column() {
    // Required column of nanosecond-precision timestamps 0..SMALL_SIZE.
    let values = TimestampNanosecondArray::from_iter_values(0..SMALL_SIZE as i64);
    one_column_roundtrip(Arc::new(values), false);
}
3508
#[test]
fn date32_single_column() {
    // Day offsets 0..SMALL_SIZE, required and optional.
    required_and_optional::<Date32Array, _>((0..SMALL_SIZE).map(|day| day as i32));
}
3513
#[test]
fn date64_single_column() {
    // Date64 stores milliseconds; use whole days 0..SMALL_SIZE.
    const MILLIS_PER_DAY: i64 = 86_400_000;
    required_and_optional::<Date64Array, _>(
        (0..SMALL_SIZE as i64).map(|day| day * MILLIS_PER_DAY),
    );
}
3521
#[test]
fn time32_second_single_column() {
    // Values 0..SMALL_SIZE, required and optional.
    required_and_optional::<Time32SecondArray, _>((0..SMALL_SIZE).map(|t| t as i32));
}
3526
#[test]
fn time32_millisecond_single_column() {
    // Values 0..SMALL_SIZE, required and optional.
    required_and_optional::<Time32MillisecondArray, _>((0..SMALL_SIZE).map(|t| t as i32));
}
3531
#[test]
fn time64_microsecond_single_column() {
    // Values 0..SMALL_SIZE, required and optional.
    required_and_optional::<Time64MicrosecondArray, _>((0..SMALL_SIZE).map(|t| t as i64));
}
3536
#[test]
fn time64_nanosecond_single_column() {
    // Values 0..SMALL_SIZE, required and optional.
    required_and_optional::<Time64NanosecondArray, _>((0..SMALL_SIZE).map(|t| t as i64));
}
3541
#[test]
fn duration_second_single_column() {
    // Values 0..SMALL_SIZE, required and optional.
    required_and_optional::<DurationSecondArray, _>((0..SMALL_SIZE).map(|d| d as i64));
}
3546
#[test]
fn duration_millisecond_single_column() {
    // Values 0..SMALL_SIZE, required and optional.
    required_and_optional::<DurationMillisecondArray, _>((0..SMALL_SIZE).map(|d| d as i64));
}
3551
#[test]
fn duration_microsecond_single_column() {
    // Values 0..SMALL_SIZE, required and optional.
    required_and_optional::<DurationMicrosecondArray, _>((0..SMALL_SIZE).map(|d| d as i64));
}
3556
#[test]
fn duration_nanosecond_single_column() {
    // Values 0..SMALL_SIZE, required and optional.
    required_and_optional::<DurationNanosecondArray, _>((0..SMALL_SIZE).map(|d| d as i64));
}
3561
#[test]
fn interval_year_month_single_column() {
    // Month counts 0..SMALL_SIZE, required and optional.
    required_and_optional::<IntervalYearMonthArray, _>((0..SMALL_SIZE).map(|m| m as i32));
}
3566
#[test]
fn interval_day_time_single_column() {
    // (days, milliseconds) pairs, including negative components.
    let intervals = [(0, 1), (0, 3), (3, -2), (-200, 4)]
        .map(|(days, millis)| IntervalDayTime::new(days, millis));
    required_and_optional::<IntervalDayTimeArray, _>(intervals);
}
3576
#[test]
#[should_panic(
    expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
)]
fn interval_month_day_nano_single_column() {
    // Writing MonthDayNano intervals is expected to panic as unsupported.
    let intervals = [(0, 1, 5), (0, 3, 2), (3, -2, -5), (-200, 4, -1)]
        .map(|(months, days, nanos)| IntervalMonthDayNano::new(months, days, nanos));
    required_and_optional::<IntervalMonthDayNanoArray, _>(intervals);
}
3589
#[test]
fn binary_single_column() {
    // SMALL_SIZE identical rows, each the bytes 0..SMALL_SIZE.
    let row: Vec<u8> = (0..SMALL_SIZE as u8).collect();
    let rows = vec![row; SMALL_SIZE];
    values_required::<BinaryArray, _>(rows.iter().map(Vec::as_slice));
}
3599
#[test]
fn binary_view_single_column() {
    // SMALL_SIZE identical rows, each the bytes 0..SMALL_SIZE.
    let row: Vec<u8> = (0..SMALL_SIZE as u8).collect();
    let rows = vec![row; SMALL_SIZE];
    values_required::<BinaryViewArray, _>(rows.iter().map(Vec::as_slice));
}
3609
#[test]
fn i32_column_bloom_filter_at_end() {
    // Same as `i32_column_bloom_filter`, but the filters are placed at the end
    // of the file.
    let upper = SMALL_SIZE as i32;
    let options = RoundTripOptions {
        bloom_filter: true,
        bloom_filter_position: BloomFilterPosition::End,
        ..RoundTripOptions::new(Arc::new(Int32Array::from_iter(0..upper)), false)
    };

    let files = one_column_roundtrip_with_options(options);
    check_bloom_filter(
        files,
        "col".to_string(),
        (0..upper).collect(),
        (upper + 1..upper + 10).collect(),
    );
}
3625
#[test]
fn i32_column_bloom_filter() {
    // Written values must be found; values just past the range must not be.
    let upper = SMALL_SIZE as i32;
    let options = RoundTripOptions {
        bloom_filter: true,
        ..RoundTripOptions::new(Arc::new(Int32Array::from_iter(0..upper)), false)
    };

    let files = one_column_roundtrip_with_options(options);
    check_bloom_filter(
        files,
        "col".to_string(),
        (0..upper).collect(),
        (upper + 1..upper + 10).collect(),
    );
}
3640
#[test]
fn i32_column_bloom_filter_fixed_ndv() {
    // Bloom filters must behave both when the configured NDV is far above
    // (1_000_000) and below (3) the actual number of distinct values.
    let upper = SMALL_SIZE as i32;
    let array = Arc::new(Int32Array::from_iter(0..upper));

    for ndv in [1_000_000, 3] {
        let mut options = RoundTripOptions::new(array.clone(), false);
        options.bloom_filter = true;
        options.bloom_filter_ndv = Some(ndv);

        let files = one_column_roundtrip_with_options(options);
        check_bloom_filter(
            files,
            "col".to_string(),
            (0..upper).collect(),
            (upper + 1..upper + 10).collect(),
        );
    }
}
3675
#[test]
fn binary_column_bloom_filter() {
    // The repeated row must be found; a single-byte value outside it must not.
    let row: Vec<u8> = (0..SMALL_SIZE as u8).collect();
    let rows = vec![row; SMALL_SIZE];

    let array = Arc::new(BinaryArray::from_iter_values(rows.iter().map(Vec::as_slice)));
    let options = RoundTripOptions {
        bloom_filter: true,
        ..RoundTripOptions::new(array, false)
    };

    let files = one_column_roundtrip_with_options(options);
    check_bloom_filter(
        files,
        "col".to_string(),
        rows,
        vec![vec![(SMALL_SIZE + 1) as u8]],
    );
}
3694
#[test]
fn empty_string_null_column_bloom_filter() {
    // Nullable string column whose even slots are null. Null slots must not put
    // anything (in particular the empty string) into the bloom filter, while
    // every non-null value must be found. Previously the column was written
    // without any nulls, so null handling was never exercised.
    let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
    let optional_raw_values: Vec<Option<&str>> = raw_values
        .iter()
        .enumerate()
        .map(|(i, v)| if i % 2 == 0 { None } else { Some(v.as_str()) })
        .collect();

    let array = Arc::new(StringArray::from(optional_raw_values.clone()));
    let mut options = RoundTripOptions::new(array, true);
    options.bloom_filter = true;

    let files = one_column_roundtrip_with_options(options);

    let present_values: Vec<&str> = optional_raw_values.into_iter().flatten().collect();
    check_bloom_filter(files, "col".to_string(), present_values, vec![""]);
}
3714
#[test]
fn large_binary_single_column() {
    // SMALL_SIZE identical rows of the bytes 0..SMALL_SIZE, written as LargeBinary.
    let row: Vec<u8> = (0..SMALL_SIZE as u8).collect();
    let rows = vec![row; SMALL_SIZE];
    values_required::<LargeBinaryArray, _>(rows.iter().map(Vec::as_slice));
}
3724
#[test]
fn fixed_size_binary_single_column() {
    // Four 4-byte slots; the second one is null.
    let mut builder = FixedSizeBinaryBuilder::new(4);
    for entry in [Some(b"0123"), None, Some(b"8910"), Some(b"1112")] {
        match entry {
            Some(bytes) => builder.append_value(bytes).unwrap(),
            None => builder.append_null(),
        }
    }
    one_column_roundtrip(Arc::new(builder.finish()), true);
}
3736
#[test]
fn string_single_column() {
    // Decimal renderings of 0..SMALL_SIZE, as both required and optional Utf8.
    let strings: Vec<String> = (0..SMALL_SIZE).map(|n| n.to_string()).collect();
    required_and_optional::<StringArray, _>(strings.iter().map(String::as_str));
}
3744
#[test]
fn large_string_single_column() {
    // Decimal renderings of 0..SMALL_SIZE, as both required and optional LargeUtf8.
    let strings: Vec<String> = (0..SMALL_SIZE).map(|n| n.to_string()).collect();
    required_and_optional::<LargeStringArray, _>(strings.iter().map(String::as_str));
}
3752
#[test]
fn string_view_single_column() {
    // Decimal renderings of 0..SMALL_SIZE, as both required and optional Utf8View.
    let strings: Vec<String> = (0..SMALL_SIZE).map(|n| n.to_string()).collect();
    required_and_optional::<StringViewArray, _>(strings.iter().map(String::as_str));
}
3760
#[test]
fn null_list_single_column() {
    // List<Null> column with three slots: [], null, [null, null].
    let list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Null, true)));
    let schema = Schema::new(vec![Field::new("emptylist", list_type.clone(), true)]);

    let offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
    let validity = Buffer::from([0b00000101]);
    let list_data = ArrayData::builder(list_type)
        .len(3)
        .add_buffer(offsets)
        .null_bit_buffer(Some(validity))
        .add_child_data(NullArray::new(2).into_data())
        .build()
        .unwrap();
    let list = ListArray::from(list_data);

    // Sanity-check the hand-built array before writing it.
    assert!(list.is_valid(0));
    assert!(!list.is_valid(1));
    assert!(list.is_valid(2));

    assert_eq!(list.value(0).len(), 0);
    assert_eq!(list.value(2).len(), 2);
    assert_eq!(list.value(2).logical_nulls().unwrap().null_count(), 2);

    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list)]).unwrap();
    roundtrip(batch, None);
}
3795
#[test]
fn list_single_column() {
    // Five List<Int32> slots over the values 1..=10.
    // Validity bits 0b11011 make slot 2 null.
    let item_field = Field::new_list_field(DataType::Int32, false);
    let list_data = ArrayData::builder(DataType::List(Arc::new(item_field)))
        .len(5)
        .add_buffer(arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice()))
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .add_child_data(Int32Array::from((1..=10).collect::<Vec<i32>>()).into_data())
        .build()
        .unwrap();
    assert_eq!(list_data.null_count(), 1);

    one_column_roundtrip(Arc::new(ListArray::from(list_data)), true);
}
3818
#[test]
fn large_list_single_column() {
    // Same layout as `list_single_column`, but with i64 offsets (LargeList).
    let item_field = Field::new("large_item", DataType::Int32, true);
    let list_data = ArrayData::builder(DataType::LargeList(Arc::new(item_field)))
        .len(5)
        .add_buffer(arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice()))
        .add_child_data(Int32Array::from((1..=10).collect::<Vec<i32>>()).into_data())
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .build()
        .unwrap();
    assert_eq!(list_data.null_count(), 1);

    one_column_roundtrip(Arc::new(LargeListArray::from(list_data)), true);
}
3843
#[test]
fn list_nested_nulls() {
    use arrow::datatypes::Int32Type;

    // Nulls at both the list level and the element level.
    let rows = vec![
        Some(vec![Some(1)]),
        Some(vec![Some(2), Some(3)]),
        None,
        Some(vec![Some(4), Some(5), None]),
        Some(vec![None]),
        Some(vec![Some(6), Some(7)]),
    ];

    // Round-trip the same data through both 32-bit and 64-bit offset lists.
    let small = ListArray::from_iter_primitive::<Int32Type, _, _>(rows.clone());
    one_column_roundtrip(Arc::new(small), true);

    let large = LargeListArray::from_iter_primitive::<Int32Type, _, _>(rows);
    one_column_roundtrip(Arc::new(large), true);
}
3862
#[test]
fn struct_single_column() {
    // A struct with one non-nullable Int32 child "f" holding 1..=10.
    let field = Arc::new(Field::new("f", DataType::Int32, false));
    let child: ArrayRef = Arc::new(Int32Array::from_iter_values(1..=10));
    let strukt = StructArray::from(vec![(field, child)]);
    one_column_roundtrip(Arc::new(strukt), false);
}
3872
#[test]
fn list_and_map_coerced_names() {
    // The child names below deliberately differ from the Parquet spec names.
    // With `set_coerce_types(true)` the written schema is asserted to use
    // "element" for the list and "key_value"/"key"/"value" for the map.
    let list_field =
        Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
    let map_field = Field::new_map(
        "my_map",
        "entries",
        Field::new("keys", DataType::Int32, false),
        Field::new("values", DataType::Int32, true),
        false,
        true,
    );

    let columns = vec![
        create_random_array(&list_field, 100, 0.0, 0.0).unwrap(),
        create_random_array(&map_field, 100, 0.0, 0.0).unwrap(),
    ];
    let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
    let props = WriterProperties::builder().set_coerce_types(true).build();

    let file = tempfile::tempfile().unwrap();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), Arc::clone(&arrow_schema), Some(props))
            .unwrap();
    let batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
    writer.write(&batch).unwrap();
    let written = writer.close().unwrap();

    // Schema as returned by the writer on close.
    let written_schema = written.file_metadata().schema();
    let written_list = &written_schema.get_fields()[0].get_fields()[0];
    assert_eq!(written_list.get_fields()[0].name(), "element");
    let written_map = &written_schema.get_fields()[1].get_fields()[0];
    assert_eq!(written_map.name(), "key_value");
    assert_eq!(written_map.get_fields()[0].name(), "key");
    assert_eq!(written_map.get_fields()[1].name(), "value");

    // Schema as read back from the file itself.
    let reader = SerializedFileReader::new(file).unwrap();
    let read_schema = reader.metadata().file_metadata().schema();
    let read_fields = read_schema.get_fields();
    let read_list = &read_fields[0].get_fields()[0];
    assert_eq!(read_list.get_fields()[0].name(), "element");
    let read_map = &read_fields[1].get_fields()[0];
    assert_eq!(read_map.name(), "key_value");
    assert_eq!(read_map.get_fields()[0].name(), "key");
    assert_eq!(read_map.get_fields()[1].name(), "value");
}
3926
#[test]
fn fallback_flush_data_page() {
    // Dictionary encoding is disabled and the data page limit is 32 bytes.
    // Every pairing of fallback encoding and row-group size must round-trip
    // the MEDIUM_SIZE string column with identical values.
    let strings: Vec<String> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
    let column = Arc::new(StringArray::from(strings));
    let field = Field::new("col", column.data_type().clone(), false);
    let expected_batch =
        RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column]).unwrap();

    let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
    let data_page_size_limit: usize = 32;
    let write_batch_size: usize = 16;

    for encoding in [Encoding::DELTA_BYTE_ARRAY, Encoding::DELTA_LENGTH_BYTE_ARRAY] {
        for row_group_size in row_group_sizes {
            let props = WriterProperties::builder()
                .set_writer_version(WriterVersion::PARQUET_2_0)
                .set_max_row_group_row_count(Some(row_group_size))
                .set_dictionary_enabled(false)
                .set_encoding(encoding)
                .set_data_page_size_limit(data_page_size_limit)
                .set_write_batch_size(write_batch_size)
                .build();

            roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
                let left = StringArray::from(a.clone());
                let right = StringArray::from(b.clone());
                let left_values: Vec<&str> = left.iter().map(|v| v.unwrap()).collect();
                let right_values: Vec<&str> = right.iter().map(|v| v.unwrap()).collect();
                assert_eq!(
                    left_values, right_values,
                    "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
                );
            });
        }
    }
}
3968
#[test]
fn arrow_writer_string_dictionary() {
    // `Field::new_dict` is deprecated. It is still used here to set an explicit
    // dictionary id (42) and `dict_is_ordered = true` on the column field.
    #[allow(deprecated)]
    let schema = Arc::new(Schema::new(vec![Field::new_dict(
        "dictionary",
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
        true,
        42,
        true,
    )]));

    let dict: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
        .into_iter()
        .collect();

    one_column_roundtrip_with_schema(Arc::new(dict), schema);
}
3990
#[test]
fn arrow_writer_test_type_compatibility() {
    /// Writes two single-column batches (column "a") through one `ArrowWriter`.
    /// The writer is created with a schema derived from `array1`'s type. The
    /// first batch carries `array1`; the second carries `array2` under a schema
    /// derived from `array2`'s type. The first record batch is then read back
    /// (batch size 1024) and asserted equal to `expected_result` under the
    /// first schema.
    fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
    where
        T1: Array + 'static,
        T2: Array + 'static,
    {
        let schema1 = Arc::new(Schema::new(vec![Field::new(
            "a",
            array1.data_type().clone(),
            false,
        )]));

        let file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), schema1.clone(), None).unwrap();

        let rb1 = RecordBatch::try_new(schema1.clone(), vec![Arc::new(array1)]).unwrap();
        writer.write(&rb1).unwrap();

        // Second batch: same column name, but its schema carries array2's type.
        let schema2 = Arc::new(Schema::new(vec![Field::new(
            "a",
            array2.data_type().clone(),
            false,
        )]));
        let rb2 = RecordBatch::try_new(schema2, vec![Arc::new(array2)]).unwrap();
        writer.write(&rb2).unwrap();

        writer.close().unwrap();

        let mut record_batch_reader =
            ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
        let actual_batch = record_batch_reader.next().unwrap().unwrap();

        let expected_batch =
            RecordBatch::try_new(schema1, vec![Arc::new(expected_result)]).unwrap();
        assert_eq!(actual_batch, expected_batch);
    }

    // Dictionary<UInt8, Utf8>, then plain Utf8: read back as the dictionary type.
    ensure_compatible_write(
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0]),
            Arc::new(StringArray::from_iter_values(vec!["parquet"])),
        ),
        StringArray::from_iter_values(vec!["barquet"]),
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0, 1]),
            Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
        ),
    );

    // Plain Utf8, then Dictionary<UInt8, Utf8>: read back as plain Utf8.
    ensure_compatible_write(
        StringArray::from_iter_values(vec!["parquet"]),
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0]),
            Arc::new(StringArray::from_iter_values(vec!["barquet"])),
        ),
        StringArray::from_iter_values(vec!["parquet", "barquet"]),
    );

    // Dictionary with UInt8 keys, then UInt16 keys: read back with UInt8 keys.
    ensure_compatible_write(
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0]),
            Arc::new(StringArray::from_iter_values(vec!["parquet"])),
        ),
        DictionaryArray::new(
            UInt16Array::from_iter_values(vec![0]),
            Arc::new(StringArray::from_iter_values(vec!["barquet"])),
        ),
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0, 1]),
            Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
        ),
    );

    // Dictionary of Utf8 values, then dictionary of LargeUtf8 values.
    ensure_compatible_write(
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0]),
            Arc::new(StringArray::from_iter_values(vec!["parquet"])),
        ),
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0]),
            Arc::new(LargeStringArray::from_iter_values(vec!["barquet"])),
        ),
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0, 1]),
            Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
        ),
    );

    // Dictionary of Utf8 values, then plain LargeUtf8.
    ensure_compatible_write(
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0]),
            Arc::new(StringArray::from_iter_values(vec!["parquet"])),
        ),
        LargeStringArray::from_iter_values(vec!["barquet"]),
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0, 1]),
            Arc::new(StringArray::from_iter_values(vec!["parquet", "barquet"])),
        ),
    );

    // Mixing Utf8 / LargeUtf8 / Utf8View: the first batch's type wins.
    ensure_compatible_write(
        StringArray::from_iter_values(vec!["parquet"]),
        LargeStringArray::from_iter_values(vec!["barquet"]),
        StringArray::from_iter_values(vec!["parquet", "barquet"]),
    );

    ensure_compatible_write(
        LargeStringArray::from_iter_values(vec!["parquet"]),
        StringArray::from_iter_values(vec!["barquet"]),
        LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
    );

    ensure_compatible_write(
        StringArray::from_iter_values(vec!["parquet"]),
        StringViewArray::from_iter_values(vec!["barquet"]),
        StringArray::from_iter_values(vec!["parquet", "barquet"]),
    );

    ensure_compatible_write(
        StringViewArray::from_iter_values(vec!["parquet"]),
        StringArray::from_iter_values(vec!["barquet"]),
        StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
    );

    ensure_compatible_write(
        LargeStringArray::from_iter_values(vec!["parquet"]),
        StringViewArray::from_iter_values(vec!["barquet"]),
        LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
    );

    ensure_compatible_write(
        StringViewArray::from_iter_values(vec!["parquet"]),
        LargeStringArray::from_iter_values(vec!["barquet"]),
        StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
    );

    // Mixing Binary / LargeBinary / BinaryView: the first batch's type wins.
    ensure_compatible_write(
        BinaryArray::from_iter_values(vec![b"parquet"]),
        LargeBinaryArray::from_iter_values(vec![b"barquet"]),
        BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
    );

    ensure_compatible_write(
        LargeBinaryArray::from_iter_values(vec![b"parquet"]),
        BinaryArray::from_iter_values(vec![b"barquet"]),
        LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
    );

    ensure_compatible_write(
        BinaryArray::from_iter_values(vec![b"parquet"]),
        BinaryViewArray::from_iter_values(vec![b"barquet"]),
        BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
    );

    ensure_compatible_write(
        BinaryViewArray::from_iter_values(vec![b"parquet"]),
        BinaryArray::from_iter_values(vec![b"barquet"]),
        BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
    );

    ensure_compatible_write(
        BinaryViewArray::from_iter_values(vec![b"parquet"]),
        LargeBinaryArray::from_iter_values(vec![b"barquet"]),
        BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
    );

    ensure_compatible_write(
        LargeBinaryArray::from_iter_values(vec![b"parquet"]),
        BinaryViewArray::from_iter_values(vec![b"barquet"]),
        LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
    );

    // List<Int32> whose child field carries a Parquet field id in its metadata,
    // followed by the same list type whose child field lacks that metadata.
    // The expected result uses the first batch's child field (with metadata).
    let list_field_metadata = HashMap::from_iter(vec![(
        PARQUET_FIELD_ID_META_KEY.to_string(),
        "1".to_string(),
    )]);
    let list_field = Field::new_list_field(DataType::Int32, false);

    let values1 = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4]));
    let offsets1 = OffsetBuffer::new(vec![0, 2, 5].into());

    let values2 = Arc::new(Int32Array::from(vec![5, 6, 7, 8, 9]));
    let offsets2 = OffsetBuffer::new(vec![0, 3, 5].into());

    // Both batches concatenated: four lists over the values 0..10.
    let values_expected = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]));
    let offsets_expected = OffsetBuffer::new(vec![0, 2, 5, 8, 10].into());

    ensure_compatible_write(
        // First batch: child field with field-id metadata.
        ListArray::try_new(
            Arc::new(
                list_field
                    .clone()
                    .with_metadata(list_field_metadata.clone()),
            ),
            offsets1,
            values1,
            None,
        )
        .unwrap(),
        // Second batch: same child field without metadata.
        ListArray::try_new(Arc::new(list_field.clone()), offsets2, values2, None).unwrap(),
        // Expected read-back, typed with the first batch's child field.
        ListArray::try_new(
            Arc::new(
                list_field
                    .clone()
                    .with_metadata(list_field_metadata.clone()),
            ),
            offsets_expected,
            values_expected,
            None,
        )
        .unwrap(),
    );
}
4221
#[test]
fn arrow_writer_primitive_dictionary() {
    // `Field::new_dict` is deprecated. It is still used here to set an explicit
    // dictionary id (42) and `dict_is_ordered = true` on the column field.
    #[allow(deprecated)]
    let schema = Arc::new(Schema::new(vec![Field::new_dict(
        "dictionary",
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
        true,
        42,
        true,
    )]));

    let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
    for slot in [Some(12345678), None, Some(22345678), Some(12345678)] {
        if let Some(value) = slot {
            builder.append(value).unwrap();
        } else {
            builder.append_null();
        }
    }

    one_column_roundtrip_with_schema(Arc::new(builder.finish()), schema);
}
4244
#[test]
fn arrow_writer_decimal32_dictionary() {
    let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

    // The same three unscaled values at scale 2, with a caller-chosen precision.
    let decimals = |precision: u8| {
        Decimal32Array::from(vec![12345, 56789, 34567])
            .with_precision_and_scale(precision, 2)
            .unwrap()
    };

    // Identical keys: first a narrow precision, then a wider one.
    let narrow = DictionaryArray::new(keys, Arc::new(decimals(5)));
    one_column_roundtrip(Arc::new(narrow.clone()), true);

    let wide = narrow.with_values(Arc::new(decimals(9)));
    one_column_roundtrip(Arc::new(wide), true);
}
4265
#[test]
fn arrow_writer_decimal64_dictionary() {
    let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

    // The same three unscaled values at scale 2, with a caller-chosen precision.
    let decimals = |precision: u8| {
        Decimal64Array::from(vec![12345, 56789, 34567])
            .with_precision_and_scale(precision, 2)
            .unwrap()
    };

    // Identical keys: first a narrow precision, then a wider one.
    let narrow = DictionaryArray::new(keys, Arc::new(decimals(5)));
    one_column_roundtrip(Arc::new(narrow.clone()), true);

    let wide = narrow.with_values(Arc::new(decimals(12)));
    one_column_roundtrip(Arc::new(wide), true);
}
4286
#[test]
fn arrow_writer_decimal128_dictionary() {
    let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

    // The same three unscaled values at scale 2, with a caller-chosen precision.
    let decimals = |precision: u8| {
        Decimal128Array::from(vec![12345, 56789, 34567])
            .with_precision_and_scale(precision, 2)
            .unwrap()
    };

    // Identical keys: first a narrow precision, then a wider one.
    let narrow = DictionaryArray::new(keys, Arc::new(decimals(5)));
    one_column_roundtrip(Arc::new(narrow.clone()), true);

    let wide = narrow.with_values(Arc::new(decimals(12)));
    one_column_roundtrip(Arc::new(wide), true);
}
4307
#[test]
fn arrow_writer_decimal256_dictionary() {
    let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

    // The same three unscaled i256 values at scale 2, with a caller-chosen precision.
    let decimals = |precision: u8| {
        Decimal256Array::from(vec![
            i256::from_i128(12345),
            i256::from_i128(56789),
            i256::from_i128(34567),
        ])
        .with_precision_and_scale(precision, 2)
        .unwrap()
    };

    // Identical keys: first a narrow precision, then a wider one.
    let narrow = DictionaryArray::new(keys, Arc::new(decimals(5)));
    one_column_roundtrip(Arc::new(narrow.clone()), true);

    let wide = narrow.with_values(Arc::new(decimals(12)));
    one_column_roundtrip(Arc::new(wide), true);
}
4332
#[test]
fn arrow_writer_string_dictionary_unsigned_index() {
    // `Field::new_dict` is deprecated. It is still used here to set an explicit
    // dictionary id (42) and `dict_is_ordered = true` on the column field.
    #[allow(deprecated)]
    let schema = Arc::new(Schema::new(vec![Field::new_dict(
        "dictionary",
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
        true,
        42,
        true,
    )]));

    let dict: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
        .into_iter()
        .collect();

    one_column_roundtrip_with_schema(Arc::new(dict), schema);
}
4353
#[test]
fn u32_min_max() {
    // Values straddling i32::MAX. The column statistics come back as
    // `Statistics::Int32`; reinterpreted as u32, each row group's min/max must
    // equal the unsigned min/max of the source rows it covers.
    let src = [
        u32::MIN,
        u32::MIN + 1,
        (i32::MAX as u32) - 1,
        i32::MAX as u32,
        (i32::MAX as u32) + 1,
        u32::MAX - 1,
        u32::MAX,
    ];
    let files = one_column_roundtrip(Arc::new(UInt32Array::from_iter_values(src)), false);

    for file in files {
        let reader = SerializedFileReader::new(file).unwrap();
        let mut offset = 0;
        for row_group in reader.metadata().row_groups() {
            assert_eq!(row_group.num_columns(), 1);
            let column = row_group.column(0);

            let count = column.num_values() as usize;
            let expected = &src[offset..offset + count];
            offset += count;

            let Statistics::Int32(stats) = column.statistics().unwrap() else {
                panic!("Statistics::Int32 missing")
            };
            assert_eq!(
                *stats.min_opt().unwrap() as u32,
                *expected.iter().min().unwrap()
            );
            assert_eq!(
                *stats.max_opt().unwrap() as u32,
                *expected.iter().max().unwrap()
            );
        }
    }
}
4399
#[test]
fn u64_min_max() {
    // Values straddling i64::MAX. The column statistics come back as
    // `Statistics::Int64`; reinterpreted as u64, each row group's min/max must
    // equal the unsigned min/max of the source rows it covers.
    let src = [
        u64::MIN,
        u64::MIN + 1,
        (i64::MAX as u64) - 1,
        i64::MAX as u64,
        (i64::MAX as u64) + 1,
        u64::MAX - 1,
        u64::MAX,
    ];
    let files = one_column_roundtrip(Arc::new(UInt64Array::from_iter_values(src)), false);

    for file in files {
        let reader = SerializedFileReader::new(file).unwrap();
        let mut offset = 0;
        for row_group in reader.metadata().row_groups() {
            assert_eq!(row_group.num_columns(), 1);
            let column = row_group.column(0);

            let count = column.num_values() as usize;
            let expected = &src[offset..offset + count];
            offset += count;

            let Statistics::Int64(stats) = column.statistics().unwrap() else {
                panic!("Statistics::Int64 missing")
            };
            assert_eq!(
                *stats.min_opt().unwrap() as u64,
                *expected.iter().min().unwrap()
            );
            assert_eq!(
                *stats.max_opt().unwrap() as u64,
                *expected.iter().max().unwrap()
            );
        }
    }
}
4445
#[test]
fn statistics_null_counts_only_nulls() {
    // A column consisting solely of nulls must still report its null count.
    let all_null = Arc::new(UInt64Array::from(vec![None, None]));

    for file in one_column_roundtrip(all_null, true) {
        let reader = SerializedFileReader::new(file).unwrap();
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let row_group = metadata.row_group(0);
        assert_eq!(row_group.num_columns(), 1);

        let stats = row_group.column(0).statistics().unwrap();
        assert_eq!(stats.null_count_opt(), Some(2));
    }
}
4464
#[test]
fn test_list_of_struct_roundtrip() {
    /// Appends one struct entry `{a, b}` to `values`, marking the struct itself
    /// valid or null according to `is_valid`.
    fn push_struct(values: &mut StructBuilder, a: Option<i32>, b: Option<i32>, is_valid: bool) {
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_option(a);
        values
            .field_builder::<Int32Builder>(1)
            .unwrap()
            .append_option(b);
        values.append(is_valid);
    }

    let struct_builder = StructBuilder::new(
        vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
        ],
        vec![
            Box::new(Int32Builder::with_capacity(10)),
            Box::new(Int32Builder::with_capacity(10)),
        ],
    );
    let mut list_builder = ListBuilder::new(struct_builder);

    // [{a: 1, b: 2}]
    push_struct(list_builder.values(), Some(1), Some(2), true);
    list_builder.append(true);

    // []
    list_builder.append(true);

    // null
    list_builder.append(false);

    // [null, null]
    push_struct(list_builder.values(), None, None, false);
    push_struct(list_builder.values(), None, None, false);
    list_builder.append(true);

    // [{a: null, b: 3}]
    push_struct(list_builder.values(), None, Some(3), true);
    list_builder.append(true);

    // [{a: 2, b: null}]
    push_struct(list_builder.values(), Some(2), None, true);
    list_builder.append(true);

    one_column_roundtrip(Arc::new(list_builder.finish()), true);
}
4554
4555 fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
4556 metadata.row_groups().iter().map(|x| x.num_rows()).collect()
4557 }
4558
#[test]
fn test_aggregates_records() {
    // Three input batches of 100, 50 and 300 rows, written with a 200-row cap,
    // are expected to be re-packed into row groups of 200, 200 and 50.
    let ranges = [0..100, 0..50, 200..500];
    let schema = Arc::new(Schema::new(vec![Field::new(
        "int",
        ArrowDataType::Int32,
        false,
    )]));
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(200))
        .build();

    let file = tempfile::tempfile().unwrap();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
    for range in ranges.clone() {
        let column = Int32Array::from(range.collect::<Vec<_>>());
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(column)]).unwrap();
        writer.write(&batch).unwrap();
    }
    writer.close().unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);

    let batches = builder
        .with_batch_size(100)
        .build()
        .unwrap()
        .collect::<ArrowResult<Vec<_>>>()
        .unwrap();
    assert_eq!(batches.len(), 5);
    assert!(batches.iter().all(|batch| batch.num_columns() == 1));

    // Read batches never span row groups, hence the trailing 50-row batch.
    let batch_sizes: Vec<usize> = batches.iter().map(RecordBatch::num_rows).collect();
    assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);

    let mut values: Vec<i32> = Vec::new();
    for batch in &batches {
        let ints = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        values.extend(ints.values().iter().copied());
    }

    let expected_values: Vec<i32> = ranges.into_iter().flatten().collect();
    assert_eq!(&values, &expected_values)
}
4622
#[test]
fn complex_aggregate() {
    // Schema: struct_b (non-null) { list: List<struct_a { leaf_a (non-null), leaf_b }> }
    let leaf_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
    let leaf_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
    let struct_a = Arc::new(Field::new(
        "struct_a",
        DataType::Struct(vec![leaf_a.clone(), leaf_b.clone()].into()),
        true,
    ));
    let list_field = Arc::new(Field::new("list", DataType::List(struct_a), true));
    let struct_b = Arc::new(Field::new(
        "struct_b",
        DataType::Struct(vec![list_field.clone()].into()),
        false,
    ));
    let schema = Arc::new(Schema::new(vec![struct_b]));

    // Wraps list ArrayData in struct_b and produces a one-column batch.
    let to_batch = |list_data: ArrayData| {
        let list = Arc::new(ListArray::from(list_data)) as ArrayRef;
        let outer = StructArray::from(vec![(list_field.clone(), list)]);
        RecordBatch::try_from_iter(vec![("struct_b", Arc::new(outer) as ArrayRef)]).unwrap()
    };

    // Batch 1: five list slots, offsets [0, 1, 1, 3, 3, 5]; slots 1 and 3 are null.
    let children = StructArray::from(vec![
        (
            leaf_a.clone(),
            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6])) as ArrayRef,
        ),
        (
            leaf_b.clone(),
            Arc::new(Int32Array::from_iter(vec![
                Some(1),
                None,
                Some(2),
                None,
                None,
                Some(6),
            ])) as ArrayRef,
        ),
    ]);
    let batch1 = to_batch(
        ArrayDataBuilder::new(list_field.data_type().clone())
            .len(5)
            .add_buffer(Buffer::from_iter(vec![0_i32, 1, 1, 3, 3, 5]))
            .null_bit_buffer(Some(Buffer::from_iter(vec![
                true, false, true, false, true,
            ])))
            .child_data(vec![children.into_data()])
            .build()
            .unwrap(),
    );

    // Batch 2: two list slots, offsets [0, 4, 5], no list-level nulls.
    let children = StructArray::from(vec![
        (
            leaf_a.clone(),
            Arc::new(Int32Array::from(vec![6, 7, 8, 9, 10])) as ArrayRef,
        ),
        (
            leaf_b.clone(),
            Arc::new(Int32Array::from_iter(vec![None, None, None, Some(1), None])) as ArrayRef,
        ),
    ]);
    let batch2 = to_batch(
        ArrayDataBuilder::new(list_field.data_type().clone())
            .len(2)
            .add_buffer(Buffer::from_iter(vec![0_i32, 4, 5]))
            .child_data(vec![children.into_data()])
            .build()
            .unwrap(),
    );

    let batches = [batch1, batch2];

    // Expected pretty-printed table. The cells are left-aligned and padded to
    // the widest entry, framed by `+---+` borders, with the header separated
    // from the rows by a border.
    let header = "struct_b";
    let rows = [
        "{list: [{leaf_a: 1, leaf_b: 1}]}",
        "{list: }",
        "{list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}",
        "{list: }",
        "{list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}",
        "{list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]}",
        "{list: [{leaf_a: 10, leaf_b: }]}",
    ];
    let width = rows
        .iter()
        .map(|row| row.len())
        .chain([header.len()])
        .max()
        .unwrap();
    let border = format!("+{}+", "-".repeat(width + 2));
    let framed = |cell: &str| format!("| {cell:<width$} |");
    let mut lines = vec![border.clone(), framed(header), border.clone()];
    lines.extend(rows.iter().map(|row| framed(row)));
    lines.push(border);
    let expected = lines.join("\n");

    let actual = pretty_format_batches(&batches).unwrap().to_string();
    assert_eq!(actual, expected);

    let file = tempfile::tempfile().unwrap();
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(6))
        .build();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
    for batch in &batches {
        writer.write(batch).unwrap();
    }
    writer.close().unwrap();

    // 7 rows under a 6-row cap: one full row group plus a single-row remainder.
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);

    let read_back = builder
        .with_batch_size(2)
        .build()
        .unwrap()
        .collect::<ArrowResult<Vec<_>>>()
        .unwrap();

    assert_eq!(read_back.len(), 4);
    let counts: Vec<_> = read_back.iter().map(|batch| batch.num_rows()).collect();
    assert_eq!(&counts, &[2, 2, 2, 1]);

    let actual = pretty_format_batches(&read_back).unwrap().to_string();
    assert_eq!(actual, expected);
}
4750
#[test]
fn test_arrow_writer_metadata() {
    // The writer schema carries extra key/value metadata that the batch schema
    // lacks; writing and closing must still succeed.
    let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
    let extra: HashMap<String, String> =
        HashMap::from([("foo".to_string(), "bar".to_string())]);
    let file_schema = Arc::new(batch_schema.clone().with_metadata(extra));

    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let batch = RecordBatch::try_new(Arc::new(batch_schema), vec![column]).unwrap();

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, file_schema, None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();
}
4771
/// A non-nullable batch column may be written under a nullable file field.
/// Reading the file back yields the file schema, not the batch schema, with
/// the values unchanged.
#[test]
fn test_arrow_writer_nullable() {
    let file_schema = Arc::new(Schema::new(vec![Field::new("int32", DataType::Int32, true)]));
    let batch_schema = Arc::new(Schema::new(vec![Field::new("int32", DataType::Int32, false)]));

    let batch = RecordBatch::try_new(
        batch_schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
    )
    .unwrap();

    let mut out = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut out, Arc::clone(&file_schema), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let roundtripped = ParquetRecordBatchReader::try_new(Bytes::from(out), 1024)
        .unwrap()
        .next()
        .unwrap()
        .unwrap();
    assert_eq!(roundtripped.schema(), file_schema);
    assert_ne!(roundtripped.schema(), batch.schema());
    assert_eq!(roundtripped.column(0).as_ref(), batch.column(0).as_ref());
}
4795
/// Checks `in_progress_size`, `in_progress_rows`, `memory_size` and
/// `bytes_written` before any write, after one and two writes, and after an
/// explicit flush.
#[test]
fn in_progress_accounting() {
    let column = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(column)]).unwrap();

    let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();

    // Nothing is buffered yet; only the 4-byte file header has been emitted.
    assert_eq!(writer.in_progress_size(), 0);
    assert_eq!(writer.in_progress_rows(), 0);
    assert_eq!(writer.memory_size(), 0);
    assert_eq!(writer.bytes_written(), 4);

    // First write: everything is buffered in the in-progress row group.
    writer.write(&batch).unwrap();
    let initial_size = writer.in_progress_size();
    let initial_memory = writer.memory_size();
    assert!(initial_size > 0);
    assert_eq!(writer.in_progress_rows(), 5);
    assert!(initial_memory > 0);
    assert!(
        initial_size <= initial_memory,
        "{initial_size} <= {initial_memory}"
    );

    // Second write: both measures grow, and the size never exceeds the memory.
    writer.write(&batch).unwrap();
    let (size_now, memory_now) = (writer.in_progress_size(), writer.memory_size());
    assert!(size_now > initial_size);
    assert_eq!(writer.in_progress_rows(), 10);
    assert!(memory_now > initial_memory);
    assert!(
        size_now <= memory_now,
        "in_progress_size {} <= memory_size {}",
        size_now,
        memory_now
    );

    // Flushing empties the buffers and pushes bytes to the sink.
    let written_before_flush = writer.bytes_written();
    writer.flush().unwrap();
    assert_eq!(writer.in_progress_size(), 0);
    assert_eq!(writer.memory_size(), 0);
    assert!(writer.bytes_written() > written_before_flush);

    writer.close().unwrap();
}
4849
/// A column in which every slot is null still gets an offset index entry with
/// a single page, just like the fully populated column beside it.
#[test]
fn test_writer_all_null() {
    let all_valid = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let all_null = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
    let batch = RecordBatch::try_from_iter(vec![
        ("a", Arc::new(all_valid) as ArrayRef),
        ("b", Arc::new(all_null) as ArrayRef),
    ])
    .unwrap();

    let mut sink = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut sink, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(Bytes::from(sink), options).unwrap();
    let offset_index = reader.metadata().offset_index().unwrap();

    // One row group holding both columns, each written as exactly one page.
    assert_eq!(offset_index.len(), 1);
    let columns = &offset_index[0];
    assert_eq!(columns.len(), 2);
    for column in columns.iter() {
        assert_eq!(column.page_locations().len(), 1);
    }
}
4875
/// Statistics are disabled globally but set to `Page` for column "a". Only "a"
/// gets chunk statistics and a populated column index; both columns still get
/// an offset index.
#[test]
fn test_disabled_statistics_with_page() {
    let file_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, true),
        Field::new("b", DataType::Utf8, true),
    ]));
    let batch = RecordBatch::try_new(
        Arc::clone(&file_schema),
        vec![
            Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
            Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
        ],
    )
    .unwrap();

    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
        .build();

    let mut buf = Vec::with_capacity(1024);
    let mut writer =
        ArrowWriter::try_new(&mut buf, Arc::clone(&file_schema), Some(props)).unwrap();
    writer.write(&batch).unwrap();
    let metadata = writer.close().unwrap();

    // Metadata as reported by the writer.
    assert_eq!(metadata.num_row_groups(), 1);
    let rg_meta = metadata.row_group(0);
    assert_eq!(rg_meta.num_columns(), 2);
    let (a_meta, b_meta) = (rg_meta.column(0), rg_meta.column(1));
    assert!(a_meta.offset_index_offset().is_some());
    assert!(a_meta.column_index_offset().is_some());
    assert!(b_meta.offset_index_offset().is_some());
    assert!(b_meta.column_index_offset().is_none());

    // Metadata as decoded by a reader that loads the page index.
    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();

    let row_group = reader.get_row_group(0).unwrap();
    let a_col = row_group.metadata().column(0);
    let b_col = row_group.metadata().column(1);

    let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() else {
        panic!("expecting Statistics::ByteArray");
    };
    let min = byte_array_stats.min_opt().unwrap();
    let max = byte_array_stats.max_opt().unwrap();
    assert_eq!(min.as_bytes(), b"a");
    assert_eq!(max.as_bytes(), b"d");

    assert!(b_col.statistics().is_none());

    let offset_index = reader.metadata().offset_index().unwrap();
    assert_eq!(offset_index.len(), 1);
    assert_eq!(offset_index[0].len(), 2);

    let column_index = reader.metadata().column_index().unwrap();
    assert_eq!(column_index.len(), 1);
    assert_eq!(column_index[0].len(), 2);
    let (a_idx, b_idx) = (&column_index[0][0], &column_index[0][1]);
    assert!(
        matches!(a_idx, ColumnIndexMetaData::BYTE_ARRAY(_)),
        "{a_idx:?}"
    );
    assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
}
4950
/// Statistics are disabled globally but set to `Chunk` for column "a". Only "a"
/// gets chunk statistics, neither column gets a column index, and both still
/// get an offset index.
#[test]
fn test_disabled_statistics_with_chunk() {
    let file_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, true),
        Field::new("b", DataType::Utf8, true),
    ]));
    let batch = RecordBatch::try_new(
        Arc::clone(&file_schema),
        vec![
            Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
            Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
        ],
    )
    .unwrap();

    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
        .build();

    let mut buf = Vec::with_capacity(1024);
    let mut writer =
        ArrowWriter::try_new(&mut buf, Arc::clone(&file_schema), Some(props)).unwrap();
    writer.write(&batch).unwrap();
    let metadata = writer.close().unwrap();

    assert_eq!(metadata.num_row_groups(), 1);
    let rg_meta = metadata.row_group(0);
    assert_eq!(rg_meta.num_columns(), 2);
    // `Chunk` statistics on "a" (and none on "b") leave both columns without a
    // column index, while each still has an offset index.
    for idx in 0..2 {
        let chunk_meta = rg_meta.column(idx);
        assert!(chunk_meta.offset_index_offset().is_some());
        assert!(chunk_meta.column_index_offset().is_none());
    }

    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();

    let row_group = reader.get_row_group(0).unwrap();
    let a_col = row_group.metadata().column(0);
    let b_col = row_group.metadata().column(1);

    let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() else {
        panic!("expecting Statistics::ByteArray");
    };
    let min = byte_array_stats.min_opt().unwrap();
    let max = byte_array_stats.max_opt().unwrap();
    assert_eq!(min.as_bytes(), b"a");
    assert_eq!(max.as_bytes(), b"d");

    assert!(b_col.statistics().is_none());

    let column_index = reader.metadata().column_index().unwrap();
    assert_eq!(column_index.len(), 1);
    assert_eq!(column_index[0].len(), 2);
    let a_idx = &column_index[0][0];
    assert!(matches!(a_idx, ColumnIndexMetaData::NONE), "{a_idx:?}");
    let b_idx = &column_index[0][1];
    assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
}
5018
/// With `skip_arrow_metadata` set, the file must not carry the encoded Arrow
/// schema key, yet the reader must still infer the same Arrow schema.
#[test]
fn test_arrow_writer_skip_metadata() {
    let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
    let file_schema = Arc::new(batch_schema.clone());

    let batch = RecordBatch::try_new(
        Arc::new(batch_schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
    )
    .unwrap();

    let options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
    let mut buf = Vec::with_capacity(1024);
    let mut writer =
        ArrowWriter::try_new_with_options(&mut buf, Arc::clone(&file_schema), options).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let reader_builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf)).unwrap();
    assert_eq!(file_schema, *reader_builder.schema());

    // A file with no key-value metadata at all trivially passes this check.
    let has_arrow_schema_key = reader_builder
        .metadata()
        .file_metadata()
        .key_value_metadata()
        .is_some_and(|kvs| kvs.iter().any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY));
    assert!(!has_arrow_schema_key);
}
5052
/// Disabling `write_path_in_schema` must produce a smaller file than the
/// default settings for the same data.
#[test]
fn test_arrow_writer_skip_path_in_schema() {
    let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
    let file_schema = Arc::new(batch_schema.clone());

    let batch = RecordBatch::try_new(
        Arc::new(batch_schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
    )
    .unwrap();

    // Serializes `batch` under the given options and returns the file bytes.
    let write_with = |options: ArrowWriterOptions| {
        let mut out = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new_with_options(&mut out, Arc::clone(&file_schema), options)
                .unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        out
    };

    let with_path = write_with(ArrowWriterOptions::new());
    let without_path = write_with(
        ArrowWriterOptions::new().with_properties(
            WriterProperties::builder()
                .set_write_path_in_schema(false)
                .build(),
        ),
    );

    assert!(with_path.len() > without_path.len());
}
5090
/// Writing a batch whose column type disagrees with the file schema fails
/// with an error naming the field and both types.
#[test]
fn mismatched_schemas() {
    let file_schema = Arc::new(Schema::new(vec![Field::new(
        "temperature",
        DataType::Float64,
        false,
    )]));
    let batch_schema = Arc::new(Schema::new(vec![Field::new("count", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        batch_schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
    )
    .unwrap();

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, Arc::clone(&file_schema), None).unwrap();

    let message = writer.write(&batch).unwrap_err().to_string();
    assert_eq!(
        message,
        "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
    );
}
5115
/// A batch with no columns and no rows round-trips: the schema survives and
/// reading yields no batches.
#[test]
fn test_roundtrip_empty_schema() {
    let options = RecordBatchOptions::default().with_row_count(Some(0));
    let empty_batch =
        RecordBatch::try_new_with_options(Arc::new(Schema::empty()), vec![], &options).unwrap();

    let mut parquet_bytes: Vec<u8> = Vec::new();
    {
        let mut writer =
            ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
        writer.write(&empty_batch).unwrap();
        writer.close().unwrap();
    }

    let reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(parquet_bytes)).unwrap();
    assert_eq!(reader.schema(), &empty_batch.schema());

    let batches = reader
        .build()
        .unwrap()
        .collect::<ArrowResult<Vec<_>>>()
        .unwrap();
    assert!(batches.is_empty());
}
5145
/// With `EnabledStatistics::Page` but without opting into page header
/// statistics, the first data page header carries no statistics.
#[test]
fn test_page_stats_not_written_by_default() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
    let values = StringArray::from(vec!["Blart Versenwald III"]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(values)]).unwrap();

    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Page)
        .set_dictionary_enabled(false)
        .set_encoding(Encoding::PLAIN)
        .set_compression(crate::basic::Compression::UNCOMPRESSED)
        .build();
    let file = roundtrip_opts(&batch, props);

    // Skip the 4-byte leading magic to land on the first page header.
    let mut prot = ThriftSliceInputProtocol::new(&file[4..]);
    let header = PageHeader::read_thrift(&mut prot).unwrap();
    assert!(header.data_page_header.unwrap().statistics.is_none());
}
5174
/// With `set_write_page_header_statistics(true)`, the first data page header
/// carries exact min/max statistics.
#[test]
fn test_page_stats_when_enabled() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
    let values = StringArray::from(vec!["Blart Versenwald III", "Andrew Lamb"]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(values)]).unwrap();

    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Page)
        .set_dictionary_enabled(false)
        .set_encoding(Encoding::PLAIN)
        .set_write_page_header_statistics(true)
        .set_compression(crate::basic::Compression::UNCOMPRESSED)
        .build();
    let file = roundtrip_opts(&batch, props);

    // The first page header directly follows the 4-byte leading magic.
    let mut prot = ThriftSliceInputProtocol::new(&file[4..]);
    let hdr = PageHeader::read_thrift(&mut prot).unwrap();
    let stats = hdr.data_page_header.unwrap().statistics.unwrap();

    assert!(stats.is_max_value_exact.unwrap());
    assert!(stats.is_min_value_exact.unwrap());
    assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
    assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
}
5209
/// With a statistics truncate length of 2, the min/max in the first two page
/// headers are truncated ("Bl".."Bm") and flagged as inexact, for both the
/// string and the binary column.
#[test]
fn test_page_stats_truncation() {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Binary, false),
    ]);
    let strings = StringArray::from(vec!["Blart Versenwald III"]);
    let binaries = BinaryArray::from(vec![b"Blart Versenwald III".as_slice()]);
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(strings), Arc::new(binaries)],
    )
    .unwrap();

    let props = WriterProperties::builder()
        .set_statistics_truncate_length(Some(2))
        .set_dictionary_enabled(false)
        .set_encoding(Encoding::PLAIN)
        .set_write_page_header_statistics(true)
        .set_compression(crate::basic::Compression::UNCOMPRESSED)
        .build();
    let file = roundtrip_opts(&batch, props);

    // First page header, right after the 4-byte leading magic.
    let first_page = &file[4..];
    let mut prot = ThriftSliceInputProtocol::new(first_page);
    let hdr = PageHeader::read_thrift(&mut prot).unwrap();
    let stats = hdr.data_page_header.unwrap().statistics.unwrap();
    assert!(!stats.is_max_value_exact.unwrap());
    assert!(!stats.is_min_value_exact.unwrap());
    assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
    assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());

    // The next page header follows the first page's compressed payload.
    let second_page = &prot.as_slice()[hdr.compressed_page_size as usize..];
    let mut prot = ThriftSliceInputProtocol::new(second_page);
    let hdr = PageHeader::read_thrift(&mut prot).unwrap();
    let stats = hdr.data_page_header.unwrap().statistics.unwrap();
    assert!(!stats.is_max_value_exact.unwrap());
    assert!(!stats.is_min_value_exact.unwrap());
    assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
    assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
}
5270
/// The page encoding stats reported by the writer equal those decoded by a
/// reader configured to return full stats rather than a mask.
#[test]
fn test_page_encoding_statistics_roundtrip() {
    let batch_schema = Schema::new(vec![Field::new(
        "int32",
        arrow_schema::DataType::Int32,
        false,
    )]);
    let batch = RecordBatch::try_new(
        Arc::new(batch_schema.clone()),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
    )
    .unwrap();

    let mut file: File = tempfile::tempfile().unwrap();
    let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
    writer.write(&batch).unwrap();
    let file_metadata = writer.close().unwrap();

    assert_eq!(file_metadata.num_row_groups(), 1);
    let written_rg = file_metadata.row_group(0);
    assert_eq!(written_rg.num_columns(), 1);
    let chunk_page_stats = written_rg.column(0).page_encoding_stats().unwrap();

    let options = ReadOptionsBuilder::new()
        .with_page_index()
        .with_encoding_stats_as_mask(false)
        .build();
    let reader = SerializedFileReader::new_with_options(file, options).unwrap();

    let rowgroup = reader.get_row_group(0).expect("row group missing");
    assert_eq!(rowgroup.num_columns(), 1);
    let file_page_stats = rowgroup.metadata().column(0).page_encoding_stats().unwrap();
    assert_eq!(chunk_page_stats, file_page_stats);
}
5319
/// A per-column dictionary page size limit overrides the global limit: "col0"
/// gets a 1 MiB dictionary page and "col1" gets a 4 MiB one.
#[test]
fn test_different_dict_page_size_limit() {
    const ONE_MIB: usize = 1024 * 1024;

    let values = Arc::new(Int64Array::from_iter(0..1024 * 1024));
    let schema = Arc::new(Schema::new(vec![
        Field::new("col0", arrow_schema::DataType::Int64, false),
        Field::new("col1", arrow_schema::DataType::Int64, false),
    ]));
    let batch =
        arrow_array::RecordBatch::try_new(Arc::clone(&schema), vec![values.clone(), values])
            .unwrap();

    let props = WriterProperties::builder()
        .set_dictionary_page_size_limit(ONE_MIB)
        .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 4 * ONE_MIB)
        .build();
    let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
    writer.write(&batch).unwrap();
    let data = Bytes::from(writer.into_inner().unwrap());

    let mut metadata_reader = ParquetMetaDataReader::new();
    metadata_reader.try_parse(&data).unwrap();
    let metadata = metadata_reader.finish().unwrap();
    let row_group = metadata.row_group(0);

    // The first page of each chunk is expected to be its dictionary page.
    let dict_page_len = |meta: &ColumnChunkMetaData| {
        let mut pages =
            SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
        match pages.get_next_page().unwrap().unwrap() {
            Page::DictionaryPage { buf, .. } => buf.len(),
            _ => panic!("expected DictionaryPage"),
        }
    };

    assert_eq!(dict_page_len(row_group.column(0)), ONE_MIB);
    assert_eq!(dict_page_len(row_group.column(1)), 4 * ONE_MIB);
}
5357
/// Strings far larger than the data page size limit, interleaved with tiny
/// ones, survive a write/read round trip intact.
#[test]
fn test_arrow_writer_granular_mode_roundtrip() {
    let big = "x".repeat(64 * 1024);
    let strings: Vec<String> = (0..256)
        .map(|i| {
            if i % 16 == 0 {
                big.clone()
            } else {
                "tiny".to_string()
            }
        })
        .collect();

    let schema = Arc::new(Schema::new(vec![Field::new("col", ArrowDataType::Utf8, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(StringArray::from(strings.clone())) as _],
    )
    .unwrap();

    let props = WriterProperties::builder()
        .set_dictionary_enabled(false)
        .set_data_page_size_limit(16 * 1024)
        .build();
    let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
    writer.write(&batch).unwrap();
    let data = Bytes::from(writer.into_inner().unwrap());

    let mut reader = ParquetRecordBatchReader::try_new(data, 1024).unwrap();
    let read = reader.next().unwrap().unwrap();
    assert!(reader.next().is_none(), "expected one batch");

    let col = read
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(col.len(), strings.len());
    for (i, expected) in strings.iter().enumerate() {
        assert_eq!(
            col.value(i),
            expected.as_str(),
            "value mismatch at index {i}"
        );
    }
}
5416
/// An all-null string column still accounts for every row. Chunk statistics,
/// when present, report `num_rows` nulls, and the data pages together cover
/// every level position.
#[test]
fn test_arrow_writer_all_null_string_column() {
    let num_rows = 1024;
    let schema = Arc::new(Schema::new(vec![Field::new("col", ArrowDataType::Utf8, true)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(StringArray::from(vec![None::<&str>; num_rows])) as _],
    )
    .unwrap();

    let props = WriterProperties::builder()
        .set_dictionary_enabled(false)
        .set_data_page_size_limit(16 * 1024)
        .build();
    let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
    writer.write(&batch).unwrap();
    let data = Bytes::from(writer.into_inner().unwrap());

    let mut metadata_reader = ParquetMetaDataReader::new();
    metadata_reader.try_parse(&data).unwrap();
    let metadata = metadata_reader.finish().unwrap();
    let row_group = metadata.row_group(0);
    let col_meta = row_group.column(0);
    assert_eq!(row_group.num_rows() as usize, num_rows);

    // Statistics are optional here; when present they must count every null.
    if let Some(stats) = col_meta.statistics() {
        assert_eq!(
            stats.null_count_opt().unwrap_or(0) as usize,
            num_rows,
            "expected all-null column to report null_count = num_rows"
        );
    }

    let mut pages =
        SerializedPageReader::new(Arc::new(data.clone()), col_meta, num_rows, None).unwrap();
    let mut total_values = 0u32;
    while let Some(page) = pages.get_next_page().unwrap() {
        if matches!(page, Page::DataPage { .. } | Page::DataPageV2 { .. }) {
            total_values += page.num_values();
        }
    }
    assert_eq!(
        total_values as usize, num_rows,
        "expected every level position to be represented in some page"
    );
}
5475
5476 struct WriteBatchesShape {
5477 num_batches: usize,
5478 rows_per_batch: usize,
5479 row_size: usize,
5480 }
5481
5482 fn write_batches(
5484 WriteBatchesShape {
5485 num_batches,
5486 rows_per_batch,
5487 row_size,
5488 }: WriteBatchesShape,
5489 props: WriterProperties,
5490 ) -> ParquetRecordBatchReaderBuilder<File> {
5491 let schema = Arc::new(Schema::new(vec![Field::new(
5492 "str",
5493 ArrowDataType::Utf8,
5494 false,
5495 )]));
5496 let file = tempfile::tempfile().unwrap();
5497 let mut writer =
5498 ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
5499
5500 for batch_idx in 0..num_batches {
5501 let strings: Vec<String> = (0..rows_per_batch)
5502 .map(|i| format!("{:0>width$}", batch_idx * 10 + i, width = row_size))
5503 .collect();
5504 let array = StringArray::from(strings);
5505 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
5506 writer.write(&batch).unwrap();
5507 }
5508 writer.close().unwrap();
5509 ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
5510 }
5511
/// With neither a row limit nor a byte limit, all rows land in one row group.
#[test]
fn test_row_group_limit_none_writes_single_row_group() {
    let shape = WriteBatchesShape {
        num_batches: 1,
        rows_per_batch: 1000,
        row_size: 4,
    };
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(None)
        .set_max_row_group_bytes(None)
        .build();

    let builder = write_batches(shape, props);
    let sizes = row_group_sizes(builder.metadata());
    assert_eq!(
        &sizes,
        &[1000],
        "With no limits, all rows should be in a single row group"
    );
}
5535
/// With only a 300-row limit, 1000 rows split into 300/300/300/100.
#[test]
fn test_row_group_limit_rows_only() {
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(300))
        .set_max_row_group_bytes(None)
        .build();
    let shape = WriteBatchesShape {
        num_batches: 1,
        rows_per_batch: 1000,
        row_size: 4,
    };

    let sizes = row_group_sizes(write_batches(shape, props).metadata());
    assert_eq!(
        &sizes,
        &[300, 300, 300, 100],
        "Row groups should be split by row count"
    );
}
5559
/// With only a 3500-byte limit, 100 rows of 100-character strings are spread
/// across several row groups without losing any rows.
#[test]
fn test_row_group_limit_bytes_only() {
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(None)
        .set_max_row_group_bytes(Some(3500))
        .build();
    let shape = WriteBatchesShape {
        num_batches: 10,
        rows_per_batch: 10,
        row_size: 100,
    };

    let builder = write_batches(shape, props);
    let sizes = row_group_sizes(builder.metadata());

    assert!(
        sizes.len() > 1,
        "Should have multiple row groups due to byte limit, got {sizes:?}",
    );
    assert_eq!(sizes.iter().sum::<i64>(), 100, "Total rows should be preserved");
}
5588
/// When the buffered row group already exceeds the byte limit, the next write
/// flushes it before buffering any new rows.
#[test]
fn test_row_group_limit_bytes_flushes_when_current_group_already_too_large() {
    let schema = Arc::new(Schema::new(vec![Field::new("str", ArrowDataType::Utf8, false)]));
    let file = tempfile::tempfile().unwrap();

    // Start with no limits so the first batch stays in a single row group.
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(None)
        .set_max_row_group_bytes(None)
        .build();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), Arc::clone(&schema), Some(props))
            .unwrap();

    let wide_values: Vec<String> = (0..10).map(|i| format!("{:0>100}", i)).collect();
    let first_batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(StringArray::from(wide_values))],
    )
    .unwrap();
    writer.write(&first_batch).unwrap();
    assert_eq!(writer.in_progress_rows(), 10);

    // Tighten the byte limit afterwards so the buffered group is now oversized.
    writer.max_row_group_bytes = Some(1);

    let second_batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(StringArray::from(vec!["x".to_string()]))],
    )
    .unwrap();
    writer.write(&second_batch).unwrap();
    writer.close().unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    assert_eq!(
        &row_group_sizes(builder.metadata()),
        &[10, 1],
        "The second write should flush an oversized in-progress row group first",
    );
}
5634
/// With a 200-row limit and a generous 1 MiB byte limit, a single 1000-row
/// batch is split purely by row count.
#[test]
fn test_row_group_limit_both_row_wins_single_batch() {
    let shape = WriteBatchesShape {
        num_batches: 1,
        rows_per_batch: 1000,
        row_size: 4,
    };
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(200))
        .set_max_row_group_bytes(Some(1024 * 1024))
        .build();

    let builder = write_batches(shape, props);
    assert_eq!(
        &row_group_sizes(builder.metadata()),
        &[200; 5],
        "Row limit should trigger before byte limit"
    );
}
5658
/// With a 5-row limit and a 9999-byte limit, ten 10-row batches yield twenty
/// 5-row groups: the row limit is reached first.
#[test]
fn test_row_group_limit_both_row_wins_multiple_batches() {
    let shape = WriteBatchesShape {
        num_batches: 10,
        rows_per_batch: 10,
        row_size: 100,
    };
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(5))
        .set_max_row_group_bytes(Some(9999))
        .build();

    let sizes = row_group_sizes(write_batches(shape, props).metadata());
    assert_eq!(
        &sizes,
        &[5; 20],
        "Row limit should trigger before byte limit"
    );
}
5682
/// With a 1000-row limit and a 3500-byte limit, the byte limit is reached
/// first. No group hits 1000 rows, and the total row count is preserved.
#[test]
fn test_row_group_limit_both_bytes_wins() {
    let shape = WriteBatchesShape {
        num_batches: 10,
        rows_per_batch: 10,
        row_size: 100,
    };
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(1000))
        .set_max_row_group_bytes(Some(3500))
        .build();

    let builder = write_batches(shape, props);
    let sizes = row_group_sizes(builder.metadata());

    assert!(
        sizes.len() > 1,
        "Byte limit should trigger before row limit, got {sizes:?}",
    );
    assert!(
        !sizes.iter().any(|&s| s >= 1000),
        "No row group should hit the row limit"
    );
    assert_eq!(sizes.iter().sum::<i64>(), 100, "Total rows should be preserved");
}
5715
/// Clearing `column_index` through `close_mut` on a closed column chunk keeps
/// the column index out of the written file, even with page statistics on.
#[test]
fn arrow_column_chunk_close_mut_drops_column_index() {
    use crate::arrow::ArrowSchemaConverter;
    use crate::file::writer::SerializedFileWriter;

    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .build(),
    );
    let parquet_schema = ArrowSchemaConverter::new()
        .with_coerce_types(props.coerce_types())
        .convert(&schema)
        .unwrap();

    let mut buf = Vec::with_capacity(1024);
    let mut file_writer =
        SerializedFileWriter::new(&mut buf, parquet_schema.root_schema_ptr(), Arc::clone(&props))
            .unwrap();

    // Encode one column chunk directly through the row-group writer factory.
    let factory = ArrowRowGroupWriterFactory::new(&file_writer, Arc::clone(&schema));
    let mut col_writers = factory.create_column_writers(0).unwrap();
    let arr: ArrayRef = Arc::new(Int32Array::from_iter_values(0..64));
    for leaves in compute_leaves(schema.field(0), &arr).unwrap() {
        col_writers[0].write(&leaves).unwrap();
    }
    let mut chunk = col_writers.pop().unwrap().close().unwrap();

    assert!(
        chunk.close().column_index.is_some(),
        "EnabledStatistics::Page should produce a column_index"
    );

    chunk.close_mut().column_index = None;
    assert!(chunk.close().column_index.is_none());

    let mut rg = file_writer.next_row_group().unwrap();
    chunk.append_to_row_group(&mut rg).unwrap();
    rg.close().unwrap();
    let file_meta = file_writer.close().unwrap();

    assert!(file_meta.row_group(0).column(0).column_index_range().is_none());
}
5765
5766 fn write_column_to_bytes(array: ArrayRef) -> Bytes {
5768 let schema = Arc::new(Schema::new(vec![Field::new(
5769 "col",
5770 array.data_type().clone(),
5771 true,
5772 )]));
5773 let buf = get_bytes_after_close(
5774 schema.clone(),
5775 &RecordBatch::try_new(schema, vec![array]).unwrap(),
5776 );
5777 Bytes::from(buf)
5778 }
5779
5780 fn read_column_with_schema(bytes: Bytes, schema: SchemaRef) -> ArrayRef {
5784 let opts = crate::arrow::arrow_reader::ArrowReaderOptions::new().with_schema(schema);
5785 ParquetRecordBatchReaderBuilder::try_new_with_options(bytes, opts)
5786 .unwrap()
5787 .build()
5788 .unwrap()
5789 .next()
5790 .unwrap()
5791 .unwrap()
5792 .column(0)
5793 .clone()
5794 }
5795
5796 fn ree_write_read_roundtrip(ree: ArrayRef, flat: ArrayRef) {
5797 let flat_schema = Arc::new(Schema::new(vec![Field::new(
5798 "col",
5799 flat.data_type().clone(),
5800 true,
5801 )]));
5802 let ree_bytes = write_column_to_bytes(ree);
5803 let flat_bytes = write_column_to_bytes(flat.clone());
5804 assert_eq!(
5805 ree_bytes, flat_bytes,
5806 "REE and flat bytes should be identical"
5807 );
5808
5809 let decoded_ree = read_column_with_schema(ree_bytes, flat_schema.clone());
5810 let decoded_flat = read_column_with_schema(flat_bytes, flat_schema);
5811
5812 assert_eq!(decoded_ree.as_ref(), flat.as_ref());
5813 assert_eq!(decoded_ree.as_ref(), decoded_flat.as_ref());
5814 }
5815
5816 #[test]
5817 fn ree_string() {
5818 let ree: ArrayRef = Arc::new(
5819 [Some("a"), Some("a"), None, Some("b"), Some("b")]
5820 .into_iter()
5821 .collect::<Int32RunArray>(),
5822 );
5823 let flat: ArrayRef = Arc::new(StringArray::from(vec![
5824 Some("a"),
5825 Some("a"),
5826 None,
5827 Some("b"),
5828 Some("b"),
5829 ]));
5830 ree_write_read_roundtrip(ree, flat);
5831 }
5832
5833 #[test]
5834 fn ree_int32() {
5835 let mut b = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
5836 for v in [Some(1), Some(1), None, Some(2), Some(2)] {
5837 b.append_option(v);
5838 }
5839 let ree: ArrayRef = Arc::new(b.finish());
5840 let flat: ArrayRef = Arc::new(Int32Array::from(vec![
5841 Some(1),
5842 Some(1),
5843 None,
5844 Some(2),
5845 Some(2),
5846 ]));
5847 ree_write_read_roundtrip(ree, flat);
5848 }
5849
5850 #[test]
5851 fn ree_bool() {
5852 let ree: ArrayRef = Arc::new(
5854 RunArray::try_new(
5855 &Int32Array::from(vec![3, 5, 7]),
5856 &BooleanArray::from(vec![Some(true), None, Some(false)]),
5857 )
5858 .unwrap(),
5859 );
5860 let flat: ArrayRef = Arc::new(BooleanArray::from(vec![
5861 Some(true),
5862 Some(true),
5863 Some(true),
5864 None,
5865 None,
5866 Some(false),
5867 Some(false),
5868 ]));
5869 ree_write_read_roundtrip(ree, flat);
5870 }
5871
5872 #[test]
5873 fn ree_fixed_size_binary() {
5874 let mk = |vals: &[Option<&[u8]>]| -> FixedSizeBinaryArray {
5875 let mut b = FixedSizeBinaryBuilder::new(2);
5876 for v in vals {
5877 match v {
5878 Some(x) => b.append_value(x).unwrap(),
5879 None => b.append_null(),
5880 }
5881 }
5882 b.finish()
5883 };
5884 let ree: ArrayRef = Arc::new(
5886 RunArray::try_new(
5887 &Int32Array::from(vec![2, 4, 6]),
5888 &mk(&[Some(b"aa"), None, Some(b"bb")]),
5889 )
5890 .unwrap(),
5891 );
5892 let flat: ArrayRef = Arc::new(mk(&[
5893 Some(b"aa"),
5894 Some(b"aa"),
5895 None,
5896 None,
5897 Some(b"bb"),
5898 Some(b"bb"),
5899 ]));
5900 ree_write_read_roundtrip(ree, flat);
5901 }
5902
5903 #[test]
5904 fn ree_single_run() {
5905 let ree: ArrayRef = Arc::new(["x", "x", "x"].into_iter().collect::<Int32RunArray>());
5906 let flat: ArrayRef = Arc::new(StringArray::from(vec!["x", "x", "x"]));
5907 ree_write_read_roundtrip(ree, flat);
5908 }
5909
5910 #[test]
5911 fn ree_float32() {
5912 let ree: ArrayRef = Arc::new(
5914 RunArray::try_new(
5915 &Int32Array::from(vec![2, 4, 5]),
5916 &Float32Array::from(vec![Some(1.0_f32), None, Some(2.5_f32)]),
5917 )
5918 .unwrap(),
5919 );
5920 let flat: ArrayRef = Arc::new(Float32Array::from(vec![
5921 Some(1.0_f32),
5922 Some(1.0_f32),
5923 None,
5924 None,
5925 Some(2.5_f32),
5926 ]));
5927 ree_write_read_roundtrip(ree, flat);
5928 }
5929
5930 #[test]
5931 fn ree_sliced() {
5932 let full: ArrayRef = Arc::new(
5937 RunArray::try_new(
5938 &Int32Array::from(vec![3, 5, 7]),
5939 &StringArray::from(vec!["a", "b", "c"]),
5940 )
5941 .unwrap(),
5942 );
5943 let sliced = full.slice(2, 5);
5944 let flat: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "b", "c", "c"]));
5945 ree_write_read_roundtrip(sliced, flat);
5946 }
5947
5948 #[test]
5949 fn ree_struct_with_ree_child() {
5950 let run_ends = Int32Array::from(vec![2i32, 3, 5]);
5953
5954 let col_a: ArrayRef = Arc::new(
5955 RunArray::try_new(
5956 &run_ends,
5957 &StringArray::from(vec![Some("foo"), None, Some("bar")]),
5958 )
5959 .unwrap(),
5960 );
5961 let col_b: ArrayRef = Arc::new(
5962 RunArray::try_new(&run_ends, &Int32Array::from(vec![Some(1), None, Some(2)])).unwrap(),
5963 );
5964
5965 let struct_array: ArrayRef = Arc::new(StructArray::new(
5966 Fields::from(vec![
5967 Field::new("a", col_a.data_type().clone(), true),
5968 Field::new("b", col_b.data_type().clone(), true),
5969 ]),
5970 vec![col_a, col_b],
5971 None,
5972 ));
5973
5974 let schema = Arc::new(Schema::new(vec![Field::new(
5975 "row",
5976 struct_array.data_type().clone(),
5977 true,
5978 )]));
5979 let batch = RecordBatch::try_new(schema.clone(), vec![struct_array]).unwrap();
5980
5981 let mut buf = Vec::new();
5982 let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
5983 writer.write(&batch).unwrap();
5984 let metadata = writer.close().unwrap();
5985
5986 let parquet_schema = metadata.file_metadata().schema_descr();
5987 assert_eq!(parquet_schema.num_columns(), 2);
5988 assert_eq!(
5989 parquet_schema.column(0).physical_type(),
5990 crate::basic::Type::BYTE_ARRAY
5991 );
5992 assert_eq!(parquet_schema.column(0).path().string(), "row.a");
5993 assert_eq!(
5994 parquet_schema.column(1).physical_type(),
5995 crate::basic::Type::INT32
5996 );
5997 assert_eq!(parquet_schema.column(1).path().string(), "row.b");
5998 }
5999}