1use crate::column::chunker::ContentDefinedChunker;
21
22use bytes::Bytes;
23use std::io::{Read, Write};
24use std::slice::Iter;
25use std::sync::{Arc, Mutex};
26use std::vec::IntoIter;
27
28use arrow_array::cast::AsArray;
29use arrow_array::types::*;
30use arrow_array::{ArrayRef, Int32Array, RecordBatch, RecordBatchWriter};
31use arrow_schema::{
32 ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef, TimeUnit,
33};
34
35use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
36
37use crate::arrow::ArrowSchemaConverter;
38use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
39use crate::basic::PageType;
40use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
41use crate::column::page_encryption::PageEncryptor;
42use crate::column::writer::encoder::ColumnValueEncoder;
43use crate::column::writer::{
44 ColumnCloseResult, ColumnWriter, GenericColumnWriter, get_column_writer,
45};
46use crate::data_type::{ByteArray, FixedLenByteArray};
47#[cfg(feature = "encryption")]
48use crate::encryption::encrypt::FileEncryptor;
49use crate::errors::{ParquetError, Result};
50use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
51use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
52use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
53use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
54use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
55use levels::{ArrayLevels, calculate_array_levels};
56
57mod byte_array;
58mod levels;
59
60#[doc(inline)]
61pub use crate::column::page_store::{
62 InMemoryPageStore, InMemoryPageStoreFactory, PageKey, PageStore, PageStoreArgs,
63 PageStoreFactory,
64};
65
66pub struct ArrowWriter<W: Write> {
183 writer: SerializedFileWriter<W>,
185
186 in_progress: Option<ArrowRowGroupWriter>,
188
189 arrow_schema: SchemaRef,
193
194 row_group_writer_factory: ArrowRowGroupWriterFactory,
196
197 max_row_group_row_count: Option<usize>,
199
200 max_row_group_bytes: Option<usize>,
202
203 cdc_chunkers: Option<Vec<ContentDefinedChunker>>,
205}
206
207impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
208 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209 let buffered_memory = self.in_progress_size();
210 f.debug_struct("ArrowWriter")
211 .field("writer", &self.writer)
212 .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
213 .field("in_progress_rows", &self.in_progress_rows())
214 .field("arrow_schema", &self.arrow_schema)
215 .field("max_row_group_row_count", &self.max_row_group_row_count)
216 .field("max_row_group_bytes", &self.max_row_group_bytes)
217 .finish()
218 }
219}
220
221impl<W: Write + Send> ArrowWriter<W> {
222 pub fn try_new(
228 writer: W,
229 arrow_schema: SchemaRef,
230 props: Option<WriterProperties>,
231 ) -> Result<Self> {
232 let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
233 Self::try_new_with_options(writer, arrow_schema, options)
234 }
235
236 pub fn try_new_with_options(
242 writer: W,
243 arrow_schema: SchemaRef,
244 options: ArrowWriterOptions,
245 ) -> Result<Self> {
246 let mut props = options.properties;
247
248 let schema = if let Some(parquet_schema) = options.schema_descr {
249 parquet_schema.clone()
250 } else {
251 let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
252 if let Some(schema_root) = &options.schema_root {
253 converter = converter.schema_root(schema_root);
254 }
255
256 converter.convert(&arrow_schema)?
257 };
258
259 if !options.skip_arrow_metadata {
260 add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
262 }
263
264 let max_row_group_row_count = props.max_row_group_row_count();
265 let max_row_group_bytes = props.max_row_group_bytes();
266
267 let props_ptr = Arc::new(props);
268 let file_writer =
269 SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
270
271 let mut row_group_writer_factory =
272 ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());
273 if let Some(page_store_factory) = options.page_store_factory {
274 row_group_writer_factory =
275 row_group_writer_factory.with_page_store_factory(page_store_factory);
276 }
277
278 let cdc_chunkers = props_ptr
279 .content_defined_chunking()
280 .map(|opts| {
281 file_writer
282 .schema_descr()
283 .columns()
284 .iter()
285 .map(|desc| ContentDefinedChunker::new(desc, opts))
286 .collect::<Result<Vec<_>>>()
287 })
288 .transpose()?;
289
290 Ok(Self {
291 writer: file_writer,
292 in_progress: None,
293 arrow_schema,
294 row_group_writer_factory,
295 max_row_group_row_count,
296 max_row_group_bytes,
297 cdc_chunkers,
298 })
299 }
300
301 pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
303 self.writer.flushed_row_groups()
304 }
305
306 pub fn memory_size(&self) -> usize {
311 match &self.in_progress {
312 Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
313 None => 0,
314 }
315 }
316
317 pub fn in_progress_size(&self) -> usize {
324 match &self.in_progress {
325 Some(in_progress) => in_progress
326 .writers
327 .iter()
328 .map(|x| x.get_estimated_total_bytes())
329 .sum(),
330 None => 0,
331 }
332 }
333
334 pub fn in_progress_rows(&self) -> usize {
336 self.in_progress
337 .as_ref()
338 .map(|x| x.buffered_rows)
339 .unwrap_or_default()
340 }
341
342 pub fn bytes_written(&self) -> usize {
344 self.writer.bytes_written()
345 }
346
347 pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
359 if batch.num_rows() == 0 {
360 return Ok(());
361 }
362
363 let in_progress = match &mut self.in_progress {
364 Some(in_progress) => in_progress,
365 x => x.insert(
366 self.row_group_writer_factory
367 .create_row_group_writer(self.writer.flushed_row_groups().len())?,
368 ),
369 };
370
371 if let Some(max_rows) = self.max_row_group_row_count {
372 if in_progress.buffered_rows + batch.num_rows() > max_rows {
373 let to_write = max_rows - in_progress.buffered_rows;
374 let a = batch.slice(0, to_write);
375 let b = batch.slice(to_write, batch.num_rows() - to_write);
376 self.write(&a)?;
377 return self.write(&b);
378 }
379 }
380
381 if let Some(max_bytes) = self.max_row_group_bytes {
384 if in_progress.buffered_rows > 0 {
385 let current_bytes = in_progress.get_estimated_total_bytes();
386
387 if current_bytes >= max_bytes {
388 self.flush()?;
389 return self.write(batch);
390 }
391
392 let avg_row_bytes = current_bytes / in_progress.buffered_rows;
393 if avg_row_bytes > 0 {
394 let remaining_bytes = max_bytes - current_bytes;
396 let rows_that_fit = remaining_bytes / avg_row_bytes;
397
398 if batch.num_rows() > rows_that_fit {
399 if rows_that_fit > 0 {
400 let a = batch.slice(0, rows_that_fit);
401 let b = batch.slice(rows_that_fit, batch.num_rows() - rows_that_fit);
402 self.write(&a)?;
403 return self.write(&b);
404 } else {
405 self.flush()?;
406 return self.write(batch);
407 }
408 }
409 }
410 }
411 }
412
413 match self.cdc_chunkers.as_mut() {
414 Some(chunkers) => in_progress.write_with_chunkers(batch, chunkers)?,
415 None => in_progress.write(batch)?,
416 }
417
418 let should_flush = self
419 .max_row_group_row_count
420 .is_some_and(|max| in_progress.buffered_rows >= max)
421 || self
422 .max_row_group_bytes
423 .is_some_and(|max| in_progress.get_estimated_total_bytes() >= max);
424
425 if should_flush {
426 self.flush()?
427 }
428 Ok(())
429 }
430
431 pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
436 self.writer.write_all(buf)
437 }
438
439 pub fn sync(&mut self) -> std::io::Result<()> {
441 self.writer.flush()
442 }
443
444 pub fn flush(&mut self) -> Result<()> {
449 let in_progress = match self.in_progress.take() {
450 Some(in_progress) => in_progress,
451 None => return Ok(()),
452 };
453
454 let mut row_group_writer = self.writer.next_row_group()?;
455 for chunk in in_progress.close()? {
456 chunk.append_to_row_group(&mut row_group_writer)?;
457 }
458 row_group_writer.close()?;
459 Ok(())
460 }
461
462 pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
466 self.writer.append_key_value_metadata(kv_metadata)
467 }
468
469 pub fn inner(&self) -> &W {
471 self.writer.inner()
472 }
473
474 pub fn inner_mut(&mut self) -> &mut W {
483 self.writer.inner_mut()
484 }
485
486 pub fn into_inner(mut self) -> Result<W> {
488 self.flush()?;
489 self.writer.into_inner()
490 }
491
492 pub fn finish(&mut self) -> Result<ParquetMetaData> {
498 self.flush()?;
499 self.writer.finish()
500 }
501
502 pub fn close(mut self) -> Result<ParquetMetaData> {
504 self.finish()
505 }
506
507 #[deprecated(
509 since = "56.2.0",
510 note = "Use `ArrowRowGroupWriterFactory` instead, see `ArrowColumnWriter` for an example"
511 )]
512 pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
513 self.flush()?;
514 let in_progress = self
515 .row_group_writer_factory
516 .create_row_group_writer(self.writer.flushed_row_groups().len())?;
517 Ok(in_progress.writers)
518 }
519
520 #[deprecated(
522 since = "56.2.0",
523 note = "Use `SerializedFileWriter` directly instead, see `ArrowColumnWriter` for an example"
524 )]
525 pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
526 let mut row_group_writer = self.writer.next_row_group()?;
527 for chunk in chunks {
528 chunk.append_to_row_group(&mut row_group_writer)?;
529 }
530 row_group_writer.close()?;
531 Ok(())
532 }
533
534 pub fn into_serialized_writer(
541 mut self,
542 ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
543 self.flush()?;
544 Ok((self.writer, self.row_group_writer_factory))
545 }
546}
547
548impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
549 fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
550 self.write(batch).map_err(|e| e.into())
551 }
552
553 fn close(self) -> std::result::Result<(), ArrowError> {
554 self.close()?;
555 Ok(())
556 }
557}
558
559#[derive(Debug, Clone, Default)]
563pub struct ArrowWriterOptions {
564 properties: WriterProperties,
565 skip_arrow_metadata: bool,
566 schema_root: Option<String>,
567 schema_descr: Option<SchemaDescriptor>,
568 page_store_factory: Option<Arc<dyn PageStoreFactory>>,
569}
570
571impl ArrowWriterOptions {
572 pub fn new() -> Self {
574 Self::default()
575 }
576
577 pub fn with_properties(self, properties: WriterProperties) -> Self {
579 Self { properties, ..self }
580 }
581
582 pub fn with_page_store_factory(self, page_store_factory: Arc<dyn PageStoreFactory>) -> Self {
660 Self {
661 page_store_factory: Some(page_store_factory),
662 ..self
663 }
664 }
665
666 pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
673 Self {
674 skip_arrow_metadata,
675 ..self
676 }
677 }
678
679 pub fn with_schema_root(self, schema_root: String) -> Self {
681 Self {
682 schema_root: Some(schema_root),
683 ..self
684 }
685 }
686
687 pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
693 Self {
694 schema_descr: Some(schema_descr),
695 ..self
696 }
697 }
698}
699
700struct ArrowColumnChunkData {
706 length: usize,
707 store: Box<dyn PageStore>,
708 keys: Vec<PageKey>,
709 dictionary_keys: Vec<PageKey>,
720 dictionary_len: usize,
724}
725
726impl ArrowColumnChunkData {
727 fn new(store: Box<dyn PageStore>) -> Self {
728 Self {
729 length: 0,
730 store,
731 keys: Vec::new(),
732 dictionary_keys: Vec::new(),
733 dictionary_len: 0,
734 }
735 }
736
737 fn push(&mut self, value: Bytes) -> Result<()> {
740 let key = self.store.put(value)?;
741 self.keys.push(key);
742 Ok(())
743 }
744
745 fn push_dictionary(&mut self, value: Bytes) -> Result<()> {
749 self.dictionary_len += value.len();
750 let key = self.store.put(value)?;
751 self.dictionary_keys.push(key);
752 Ok(())
753 }
754
755 fn memory_size(&self) -> usize {
758 self.store.memory_size()
759 }
760}
761
762struct StreamingColumnChunkReader {
771 store: Box<dyn PageStore>,
772 keys: IntoIter<PageKey>,
775 current: Bytes,
777}
778
779impl StreamingColumnChunkReader {
780 fn new(data: ArrowColumnChunkData) -> Self {
781 let keys = if data.dictionary_keys.is_empty() {
784 data.keys
785 } else {
786 let mut keys = Vec::with_capacity(data.dictionary_keys.len() + data.keys.len());
787 keys.extend(data.dictionary_keys);
788 keys.extend(data.keys);
789 keys
790 };
791 Self {
792 store: data.store,
793 keys: keys.into_iter(),
794 current: Bytes::new(),
795 }
796 }
797}
798
799impl Read for StreamingColumnChunkReader {
800 fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
801 while self.current.is_empty() {
804 if let Some(key) = self.keys.next() {
805 self.current = self.store.take(key).map_err(std::io::Error::other)?;
806 } else {
807 return Ok(0);
808 }
809 }
810
811 let len = self.current.len().min(out.len());
812 let b = self.current.split_to(len);
813 out[..len].copy_from_slice(&b);
814 Ok(len)
815 }
816}
817
818type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
823
824struct ArrowPageWriter {
825 buffer: SharedColumnChunk,
826 #[cfg(feature = "encryption")]
827 page_encryptor: Option<PageEncryptor>,
828}
829
830impl ArrowPageWriter {
831 fn new(store: Box<dyn PageStore>) -> Self {
833 Self {
834 buffer: Arc::new(Mutex::new(ArrowColumnChunkData::new(store))),
835 #[cfg(feature = "encryption")]
836 page_encryptor: None,
837 }
838 }
839
840 #[cfg(feature = "encryption")]
841 pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
842 self.page_encryptor = page_encryptor;
843 self
844 }
845
846 #[cfg(feature = "encryption")]
847 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
848 self.page_encryptor.as_mut()
849 }
850
851 #[cfg(not(feature = "encryption"))]
852 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
853 None
854 }
855}
856
857impl PageWriter for ArrowPageWriter {
858 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
859 let page = match self.page_encryptor_mut() {
860 Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
861 None => page,
862 };
863
864 let page_header = page.to_thrift_header()?;
865 let header = {
866 let mut header = Vec::with_capacity(1024);
867
868 match self.page_encryptor_mut() {
869 Some(page_encryptor) => {
870 page_encryptor.encrypt_page_header(&page_header, &mut header)?;
871 if page.compressed_page().is_data_page() {
872 page_encryptor.increment_page();
873 }
874 }
875 None => {
876 let mut protocol = ThriftCompactOutputProtocol::new(&mut header);
877 page_header.write_thrift(&mut protocol)?;
878 }
879 };
880
881 Bytes::from(header)
882 };
883
884 let mut buf = self.buffer.try_lock().unwrap();
885
886 let data = page.compressed_page().buffer().clone();
887 let compressed_size = data.len() + header.len();
888
889 let mut spec = PageWriteSpec::new();
890 spec.page_type = page.page_type();
891 spec.num_values = page.num_values();
892 spec.uncompressed_size = page.uncompressed_size() + header.len();
893 spec.offset = buf.length as u64;
894 spec.compressed_size = compressed_size;
895 spec.bytes_written = compressed_size as u64;
896
897 buf.length += compressed_size;
898 if spec.page_type == PageType::DICTIONARY_PAGE {
899 buf.push_dictionary(header)?;
902 buf.push_dictionary(data)?;
903 } else {
904 buf.push(header)?;
905 buf.push(data)?;
906 }
907
908 Ok(spec)
909 }
910
911 fn defers_dictionary_ordering(&self) -> bool {
912 true
917 }
918
919 fn buffered_memory_size(&self) -> usize {
920 self.buffer.try_lock().unwrap().memory_size()
923 }
924
925 fn close(&mut self) -> Result<()> {
926 Ok(())
927 }
928}
929
930#[derive(Debug)]
932pub struct ArrowLeafColumn(ArrayLevels);
933
934pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
939 let levels = calculate_array_levels(array, field)?;
940 Ok(levels.into_iter().map(ArrowLeafColumn).collect())
941}
942
943pub struct ArrowColumnChunk {
945 data: ArrowColumnChunkData,
946 close: ColumnCloseResult,
947}
948
949impl std::fmt::Debug for ArrowColumnChunk {
950 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
951 f.debug_struct("ArrowColumnChunk")
952 .field("length", &self.data.length)
953 .finish_non_exhaustive()
954 }
955}
956
957impl ArrowColumnChunk {
958 pub fn close(&self) -> &ColumnCloseResult {
965 &self.close
966 }
967
968 pub fn close_mut(&mut self) -> &mut ColumnCloseResult {
975 &mut self.close
976 }
977
978 pub fn append_to_row_group<W: Write + Send>(
981 self,
982 writer: &mut SerializedRowGroupWriter<'_, W>,
983 ) -> Result<()> {
984 let ArrowColumnChunk { data, close } = self;
985
986 let close = close.update_dictionary_location(data.dictionary_len)?;
990
991 let reader = StreamingColumnChunkReader::new(data);
992 writer.append_column_from_read(reader, close)
993 }
994}
995
996pub struct ArrowColumnWriter {
1094 writer: ArrowColumnWriterImpl,
1095 chunk: SharedColumnChunk,
1096}
1097
1098impl std::fmt::Debug for ArrowColumnWriter {
1099 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1100 f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
1101 }
1102}
1103
1104enum ArrowColumnWriterImpl {
1105 ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
1106 Column(ColumnWriter<'static>),
1107}
1108
1109impl ArrowColumnWriter {
1110 pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
1112 self.write_internal(&col.0)
1113 }
1114
1115 fn write_with_chunker(
1117 &mut self,
1118 col: &ArrowLeafColumn,
1119 chunker: &mut ContentDefinedChunker,
1120 ) -> Result<()> {
1121 let levels = &col.0;
1122 let chunks = chunker.get_arrow_chunks(
1123 levels.def_level_data().as_ref(),
1124 levels.rep_level_data().as_ref(),
1125 levels.array(),
1126 )?;
1127
1128 let num_chunks = chunks.len();
1129 for (i, chunk) in chunks.iter().enumerate() {
1130 let chunk_levels = levels.slice_for_chunk(chunk);
1131 self.write_internal(&chunk_levels)?;
1132
1133 if i + 1 < num_chunks {
1135 match &mut self.writer {
1136 ArrowColumnWriterImpl::Column(c) => c.add_data_page()?,
1137 ArrowColumnWriterImpl::ByteArray(c) => c.add_data_page()?,
1138 }
1139 }
1140 }
1141 Ok(())
1142 }
1143
1144 fn write_internal(&mut self, levels: &ArrayLevels) -> Result<()> {
1145 match &mut self.writer {
1146 ArrowColumnWriterImpl::Column(c) => {
1147 let leaf = levels.array();
1148 match leaf.as_any_dictionary_opt() {
1149 Some(dictionary) => {
1150 let materialized =
1151 arrow_select::take::take(dictionary.values(), dictionary.keys(), None)?;
1152 write_leaf(c, &materialized, levels)?
1153 }
1154 None => write_leaf(c, leaf, levels)?,
1155 };
1156 }
1157 ArrowColumnWriterImpl::ByteArray(c) => {
1158 write_primitive(c, levels.array().as_ref(), levels)?;
1159 }
1160 }
1161 Ok(())
1162 }
1163
1164 pub fn close(self) -> Result<ArrowColumnChunk> {
1166 let close = match self.writer {
1167 ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
1168 ArrowColumnWriterImpl::Column(c) => c.close()?,
1169 };
1170 let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
1171 let data = chunk.into_inner().unwrap();
1172 Ok(ArrowColumnChunk { data, close })
1173 }
1174
1175 pub fn memory_size(&self) -> usize {
1186 match &self.writer {
1187 ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
1188 ArrowColumnWriterImpl::Column(c) => c.memory_size(),
1189 }
1190 }
1191
1192 pub fn get_estimated_total_bytes(&self) -> usize {
1200 match &self.writer {
1201 ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
1202 ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
1203 }
1204 }
1205}
1206
1207#[derive(Debug)]
1214struct ArrowRowGroupWriter {
1215 writers: Vec<ArrowColumnWriter>,
1216 schema: SchemaRef,
1217 buffered_rows: usize,
1218}
1219
1220impl ArrowRowGroupWriter {
1221 fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
1222 Self {
1223 writers,
1224 schema: arrow.clone(),
1225 buffered_rows: 0,
1226 }
1227 }
1228
1229 fn write(&mut self, batch: &RecordBatch) -> Result<()> {
1230 self.buffered_rows += batch.num_rows();
1231 let mut writers = self.writers.iter_mut();
1232 for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
1233 for leaf in compute_leaves(field.as_ref(), column)? {
1234 writers.next().unwrap().write(&leaf)?;
1235 }
1236 }
1237 Ok(())
1238 }
1239
1240 fn write_with_chunkers(
1241 &mut self,
1242 batch: &RecordBatch,
1243 chunkers: &mut [ContentDefinedChunker],
1244 ) -> Result<()> {
1245 self.buffered_rows += batch.num_rows();
1246 let mut writers = self.writers.iter_mut();
1247 let mut chunkers = chunkers.iter_mut();
1248 for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
1249 for leaf in compute_leaves(field.as_ref(), column)? {
1250 writers
1251 .next()
1252 .unwrap()
1253 .write_with_chunker(&leaf, chunkers.next().unwrap())?;
1254 }
1255 }
1256 Ok(())
1257 }
1258
1259 fn get_estimated_total_bytes(&self) -> usize {
1261 self.writers
1262 .iter()
1263 .map(|x| x.get_estimated_total_bytes())
1264 .sum()
1265 }
1266
1267 fn close(self) -> Result<Vec<ArrowColumnChunk>> {
1268 self.writers
1269 .into_iter()
1270 .map(|writer| writer.close())
1271 .collect()
1272 }
1273}
1274
1275#[derive(Debug)]
1280pub struct ArrowRowGroupWriterFactory {
1281 schema: SchemaDescPtr,
1282 arrow_schema: SchemaRef,
1283 props: WriterPropertiesPtr,
1284 page_store_factory: Arc<dyn PageStoreFactory>,
1285 #[cfg(feature = "encryption")]
1286 file_encryptor: Option<Arc<FileEncryptor>>,
1287}
1288
1289impl ArrowRowGroupWriterFactory {
1290 pub fn new<W: Write + Send>(
1292 file_writer: &SerializedFileWriter<W>,
1293 arrow_schema: SchemaRef,
1294 ) -> Self {
1295 let schema = Arc::clone(file_writer.schema_descr_ptr());
1296 let props = Arc::clone(file_writer.properties());
1297 Self {
1298 schema,
1299 arrow_schema,
1300 props,
1301 page_store_factory: Arc::new(InMemoryPageStoreFactory),
1302 #[cfg(feature = "encryption")]
1303 file_encryptor: file_writer.file_encryptor(),
1304 }
1305 }
1306
1307 pub fn with_page_store_factory(
1311 mut self,
1312 page_store_factory: Arc<dyn PageStoreFactory>,
1313 ) -> Self {
1314 self.page_store_factory = page_store_factory;
1315 self
1316 }
1317
1318 fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
1319 let writers = self.create_column_writers(row_group_index)?;
1320 Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
1321 }
1322
1323 pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
1325 let mut writers = Vec::with_capacity(self.arrow_schema.fields.len());
1326 let mut leaves = self.schema.columns().iter();
1327 let column_factory = self.column_writer_factory(row_group_index);
1328 for field in &self.arrow_schema.fields {
1329 column_factory.get_arrow_column_writer(
1330 field.data_type(),
1331 &self.props,
1332 &mut leaves,
1333 &mut writers,
1334 )?;
1335 }
1336 Ok(writers)
1337 }
1338
1339 #[cfg(feature = "encryption")]
1340 fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
1341 ArrowColumnWriterFactory::new()
1342 .with_page_store_factory(self.page_store_factory.clone())
1343 .with_file_encryptor(row_group_idx, self.file_encryptor.clone())
1344 }
1345
1346 #[cfg(not(feature = "encryption"))]
1347 fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory {
1348 ArrowColumnWriterFactory::new().with_page_store_factory(self.page_store_factory.clone())
1349 }
1350}
1351
1352#[deprecated(since = "57.0.0", note = "Use `ArrowRowGroupWriterFactory` instead")]
1354pub fn get_column_writers(
1355 parquet: &SchemaDescriptor,
1356 props: &WriterPropertiesPtr,
1357 arrow: &SchemaRef,
1358) -> Result<Vec<ArrowColumnWriter>> {
1359 let mut writers = Vec::with_capacity(arrow.fields.len());
1360 let mut leaves = parquet.columns().iter();
1361 let column_factory = ArrowColumnWriterFactory::new();
1362 for field in &arrow.fields {
1363 column_factory.get_arrow_column_writer(
1364 field.data_type(),
1365 props,
1366 &mut leaves,
1367 &mut writers,
1368 )?;
1369 }
1370 Ok(writers)
1371}
1372
1373struct ArrowColumnWriterFactory {
1375 page_store_factory: Arc<dyn PageStoreFactory>,
1377 #[cfg(feature = "encryption")]
1378 row_group_index: usize,
1379 #[cfg(feature = "encryption")]
1380 file_encryptor: Option<Arc<FileEncryptor>>,
1381}
1382
1383impl ArrowColumnWriterFactory {
1384 pub fn new() -> Self {
1385 Self {
1386 page_store_factory: Arc::new(InMemoryPageStoreFactory),
1387 #[cfg(feature = "encryption")]
1388 row_group_index: 0,
1389 #[cfg(feature = "encryption")]
1390 file_encryptor: None,
1391 }
1392 }
1393
1394 pub fn with_page_store_factory(
1396 mut self,
1397 page_store_factory: Arc<dyn PageStoreFactory>,
1398 ) -> Self {
1399 self.page_store_factory = page_store_factory;
1400 self
1401 }
1402
1403 #[cfg(feature = "encryption")]
1404 pub fn with_file_encryptor(
1405 mut self,
1406 row_group_index: usize,
1407 file_encryptor: Option<Arc<FileEncryptor>>,
1408 ) -> Self {
1409 self.row_group_index = row_group_index;
1410 self.file_encryptor = file_encryptor;
1411 self
1412 }
1413
1414 #[cfg(feature = "encryption")]
1415 fn create_page_writer(
1416 &self,
1417 column_descriptor: &ColumnDescPtr,
1418 column_index: usize,
1419 ) -> Result<Box<ArrowPageWriter>> {
1420 let column_path = column_descriptor.path().string();
1421 let page_encryptor = PageEncryptor::create_if_column_encrypted(
1422 &self.file_encryptor,
1423 self.row_group_index,
1424 column_index,
1425 &column_path,
1426 )?;
1427 let args = PageStoreArgs::new(column_index, column_descriptor);
1428 let store = self.page_store_factory.create(&args)?;
1429 Ok(Box::new(
1430 ArrowPageWriter::new(store).with_encryptor(page_encryptor),
1431 ))
1432 }
1433
1434 #[cfg(not(feature = "encryption"))]
1435 fn create_page_writer(
1436 &self,
1437 column_descriptor: &ColumnDescPtr,
1438 column_index: usize,
1439 ) -> Result<Box<ArrowPageWriter>> {
1440 let args = PageStoreArgs::new(column_index, column_descriptor);
1441 let store = self.page_store_factory.create(&args)?;
1442 Ok(Box::new(ArrowPageWriter::new(store)))
1443 }
1444
1445 fn get_arrow_column_writer(
1448 &self,
1449 data_type: &ArrowDataType,
1450 props: &WriterPropertiesPtr,
1451 leaves: &mut Iter<'_, ColumnDescPtr>,
1452 out: &mut Vec<ArrowColumnWriter>,
1453 ) -> Result<()> {
1454 let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1456 let page_writer = self.create_page_writer(desc, out.len())?;
1457 let chunk = page_writer.buffer.clone();
1458 let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
1459 Ok(ArrowColumnWriter {
1460 chunk,
1461 writer: ArrowColumnWriterImpl::Column(writer),
1462 })
1463 };
1464
1465 let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1467 let page_writer = self.create_page_writer(desc, out.len())?;
1468 let chunk = page_writer.buffer.clone();
1469 let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
1470 Ok(ArrowColumnWriter {
1471 chunk,
1472 writer: ArrowColumnWriterImpl::ByteArray(writer),
1473 })
1474 };
1475
1476 match data_type {
1477 _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
1478 ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
1479 out.push(col(leaves.next().unwrap())?)
1480 }
1481 ArrowDataType::LargeBinary
1482 | ArrowDataType::Binary
1483 | ArrowDataType::Utf8
1484 | ArrowDataType::LargeUtf8
1485 | ArrowDataType::BinaryView
1486 | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
1487 ArrowDataType::List(f)
1488 | ArrowDataType::LargeList(f)
1489 | ArrowDataType::FixedSizeList(f, _)
1490 | ArrowDataType::ListView(f)
1491 | ArrowDataType::LargeListView(f) => {
1492 self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
1493 }
1494 ArrowDataType::Struct(fields) => {
1495 for field in fields {
1496 self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
1497 }
1498 }
1499 ArrowDataType::Map(f, _) => match f.data_type() {
1500 ArrowDataType::Struct(f) => {
1501 self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
1502 self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
1503 }
1504 _ => unreachable!("invalid map type"),
1505 },
1506 ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1507 ArrowDataType::Utf8
1508 | ArrowDataType::LargeUtf8
1509 | ArrowDataType::Binary
1510 | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
1511 ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
1512 out.push(bytes(leaves.next().unwrap())?)
1513 }
1514 ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
1515 _ => out.push(col(leaves.next().unwrap())?),
1516 },
1517 ArrowDataType::RunEndEncoded(_, value_field) => {
1518 self.get_arrow_column_writer(value_field.data_type(), props, leaves, out)?
1519 }
1520 _ => {
1521 return Err(ParquetError::NYI(format!(
1522 "Attempting to write an Arrow type {data_type} to parquet that is not yet implemented"
1523 )));
1524 }
1525 }
1526 Ok(())
1527 }
1528}
1529
1530fn write_leaf(
1531 writer: &mut ColumnWriter<'_>,
1532 column: &dyn arrow_array::Array,
1533 levels: &ArrayLevels,
1534) -> Result<usize> {
1535 let indices = levels.non_null_indices();
1536
1537 match writer {
1538 ColumnWriter::Int32ColumnWriter(typed) => {
1540 match column.data_type() {
1541 ArrowDataType::Null => {
1542 let array = Int32Array::new_null(column.len());
1543 write_primitive(typed, array.values(), levels)
1544 }
1545 ArrowDataType::Int8 => {
1546 let array: Int32Array = column.as_primitive::<Int8Type>().unary(|x| x as i32);
1547 write_primitive(typed, array.values(), levels)
1548 }
1549 ArrowDataType::Int16 => {
1550 let array: Int32Array = column.as_primitive::<Int16Type>().unary(|x| x as i32);
1551 write_primitive(typed, array.values(), levels)
1552 }
1553 ArrowDataType::Int32 => {
1554 write_primitive(typed, column.as_primitive::<Int32Type>().values(), levels)
1555 }
1556 ArrowDataType::UInt8 => {
1557 let array: Int32Array = column.as_primitive::<UInt8Type>().unary(|x| x as i32);
1558 write_primitive(typed, array.values(), levels)
1559 }
1560 ArrowDataType::UInt16 => {
1561 let array: Int32Array = column.as_primitive::<UInt16Type>().unary(|x| x as i32);
1562 write_primitive(typed, array.values(), levels)
1563 }
1564 ArrowDataType::UInt32 => {
1565 let array = column.as_primitive::<UInt32Type>();
1568 write_primitive(typed, array.values().inner().typed_data(), levels)
1569 }
1570 ArrowDataType::Date32 => {
1571 let array = column.as_primitive::<Date32Type>();
1572 write_primitive(typed, array.values(), levels)
1573 }
1574 ArrowDataType::Time32(TimeUnit::Second) => {
1575 let array = column.as_primitive::<Time32SecondType>();
1576 write_primitive(typed, array.values(), levels)
1577 }
1578 ArrowDataType::Time32(TimeUnit::Millisecond) => {
1579 let array = column.as_primitive::<Time32MillisecondType>();
1580 write_primitive(typed, array.values(), levels)
1581 }
1582 ArrowDataType::Date64 => {
1583 let array: Int32Array = column
1585 .as_primitive::<Date64Type>()
1586 .unary(|x| (x / 86_400_000) as _);
1587
1588 write_primitive(typed, array.values(), levels)
1589 }
1590 ArrowDataType::Decimal32(_, _) => {
1591 let array = column
1592 .as_primitive::<Decimal32Type>()
1593 .unary::<_, Int32Type>(|v| v);
1594 write_primitive(typed, array.values(), levels)
1595 }
1596 ArrowDataType::Decimal64(_, _) => {
1597 let array = column
1599 .as_primitive::<Decimal64Type>()
1600 .unary::<_, Int32Type>(|v| v as i32);
1601 write_primitive(typed, array.values(), levels)
1602 }
1603 ArrowDataType::Decimal128(_, _) => {
1604 let array = column
1606 .as_primitive::<Decimal128Type>()
1607 .unary::<_, Int32Type>(|v| v as i32);
1608 write_primitive(typed, array.values(), levels)
1609 }
1610 ArrowDataType::Decimal256(_, _) => {
1611 let array = column
1613 .as_primitive::<Decimal256Type>()
1614 .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1615 write_primitive(typed, array.values(), levels)
1616 }
1617 d => Err(ParquetError::General(format!("Cannot coerce {d} to I32"))),
1618 }
1619 }
1620 ColumnWriter::BoolColumnWriter(typed) => {
1621 let array = column.as_boolean();
1622 let values = get_bool_array_slice(array, indices.iter().copied());
1623 typed.write_batch_internal(
1624 values.as_slice(),
1625 None,
1626 levels.def_level_data().as_ref(),
1627 levels.rep_level_data().as_ref(),
1628 None,
1629 None,
1630 None,
1631 )
1632 }
1633 ColumnWriter::Int64ColumnWriter(typed) => {
1634 match column.data_type() {
1635 ArrowDataType::Date64 => {
1636 let array = column
1637 .as_primitive::<Date64Type>()
1638 .reinterpret_cast::<Int64Type>();
1639
1640 write_primitive(typed, array.values(), levels)
1641 }
1642 ArrowDataType::Int64 => {
1643 let array = column.as_primitive::<Int64Type>();
1644 write_primitive(typed, array.values(), levels)
1645 }
1646 ArrowDataType::UInt64 => {
1647 let values = column.as_primitive::<UInt64Type>().values();
1648 let array = values.inner().typed_data::<i64>();
1651 write_primitive(typed, array, levels)
1652 }
1653 ArrowDataType::Time64(TimeUnit::Microsecond) => {
1654 let array = column.as_primitive::<Time64MicrosecondType>();
1655 write_primitive(typed, array.values(), levels)
1656 }
1657 ArrowDataType::Time64(TimeUnit::Nanosecond) => {
1658 let array = column.as_primitive::<Time64NanosecondType>();
1659 write_primitive(typed, array.values(), levels)
1660 }
1661 ArrowDataType::Timestamp(unit, _) => match unit {
1662 TimeUnit::Second => {
1663 let array = column.as_primitive::<TimestampSecondType>();
1664 write_primitive(typed, array.values(), levels)
1665 }
1666 TimeUnit::Millisecond => {
1667 let array = column.as_primitive::<TimestampMillisecondType>();
1668 write_primitive(typed, array.values(), levels)
1669 }
1670 TimeUnit::Microsecond => {
1671 let array = column.as_primitive::<TimestampMicrosecondType>();
1672 write_primitive(typed, array.values(), levels)
1673 }
1674 TimeUnit::Nanosecond => {
1675 let array = column.as_primitive::<TimestampNanosecondType>();
1676 write_primitive(typed, array.values(), levels)
1677 }
1678 },
1679 ArrowDataType::Duration(unit) => match unit {
1680 TimeUnit::Second => {
1681 let array = column.as_primitive::<DurationSecondType>();
1682 write_primitive(typed, array.values(), levels)
1683 }
1684 TimeUnit::Millisecond => {
1685 let array = column.as_primitive::<DurationMillisecondType>();
1686 write_primitive(typed, array.values(), levels)
1687 }
1688 TimeUnit::Microsecond => {
1689 let array = column.as_primitive::<DurationMicrosecondType>();
1690 write_primitive(typed, array.values(), levels)
1691 }
1692 TimeUnit::Nanosecond => {
1693 let array = column.as_primitive::<DurationNanosecondType>();
1694 write_primitive(typed, array.values(), levels)
1695 }
1696 },
1697 ArrowDataType::Decimal64(_, _) => {
1698 let array = column
1699 .as_primitive::<Decimal64Type>()
1700 .reinterpret_cast::<Int64Type>();
1701 write_primitive(typed, array.values(), levels)
1702 }
1703 ArrowDataType::Decimal128(_, _) => {
1704 let array = column
1706 .as_primitive::<Decimal128Type>()
1707 .unary::<_, Int64Type>(|v| v as i64);
1708 write_primitive(typed, array.values(), levels)
1709 }
1710 ArrowDataType::Decimal256(_, _) => {
1711 let array = column
1713 .as_primitive::<Decimal256Type>()
1714 .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1715 write_primitive(typed, array.values(), levels)
1716 }
1717 d => Err(ParquetError::General(format!("Cannot coerce {d} to I64"))),
1718 }
1719 }
1720 ColumnWriter::Int96ColumnWriter(_typed) => {
1721 unreachable!("Currently unreachable because data type not supported")
1722 }
1723 ColumnWriter::FloatColumnWriter(typed) => {
1724 let array = column.as_primitive::<Float32Type>();
1725 write_primitive(typed, array.values(), levels)
1726 }
1727 ColumnWriter::DoubleColumnWriter(typed) => {
1728 let array = column.as_primitive::<Float64Type>();
1729 write_primitive(typed, array.values(), levels)
1730 }
1731 ColumnWriter::ByteArrayColumnWriter(_) => {
1732 unreachable!("should use ByteArrayWriter")
1733 }
1734 ColumnWriter::FixedLenByteArrayColumnWriter(typed) => {
1735 let bytes = match column.data_type() {
1736 ArrowDataType::Interval(interval_unit) => match interval_unit {
1737 IntervalUnit::YearMonth => {
1738 let array = column.as_primitive::<IntervalYearMonthType>();
1739 get_interval_ym_array_slice(array, indices.iter().copied())
1740 }
1741 IntervalUnit::DayTime => {
1742 let array = column.as_primitive::<IntervalDayTimeType>();
1743 get_interval_dt_array_slice(array, indices.iter().copied())
1744 }
1745 _ => {
1746 return Err(ParquetError::NYI(format!(
1747 "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
1748 )));
1749 }
1750 },
1751 ArrowDataType::FixedSizeBinary(_) => {
1752 let array = column.as_fixed_size_binary();
1753 get_fsb_array_slice(array, indices.iter().copied())
1754 }
1755 ArrowDataType::Decimal32(_, _) => {
1756 let array = column.as_primitive::<Decimal32Type>();
1757 get_decimal_32_array_slice(array, indices.iter().copied())
1758 }
1759 ArrowDataType::Decimal64(_, _) => {
1760 let array = column.as_primitive::<Decimal64Type>();
1761 get_decimal_64_array_slice(array, indices.iter().copied())
1762 }
1763 ArrowDataType::Decimal128(_, _) => {
1764 let array = column.as_primitive::<Decimal128Type>();
1765 get_decimal_128_array_slice(array, indices.iter().copied())
1766 }
1767 ArrowDataType::Decimal256(_, _) => {
1768 let array = column.as_primitive::<Decimal256Type>();
1769 get_decimal_256_array_slice(array, indices.iter().copied())
1770 }
1771 ArrowDataType::Float16 => {
1772 let array = column.as_primitive::<Float16Type>();
1773 get_float_16_array_slice(array, indices.iter().copied())
1774 }
1775 _ => {
1776 return Err(ParquetError::NYI(
1777 "Attempting to write an Arrow type that is not yet implemented".to_string(),
1778 ));
1779 }
1780 };
1781 typed.write_batch_internal(
1782 bytes.as_slice(),
1783 None,
1784 levels.def_level_data().as_ref(),
1785 levels.rep_level_data().as_ref(),
1786 None,
1787 None,
1788 None,
1789 )
1790 }
1791 }
1792}
1793
1794fn write_primitive<E: ColumnValueEncoder>(
1795 writer: &mut GenericColumnWriter<E>,
1796 values: &E::Values,
1797 levels: &ArrayLevels,
1798) -> Result<usize> {
1799 writer.write_batch_internal(
1800 values,
1801 Some(levels.non_null_indices()),
1802 levels.def_level_data().as_ref(),
1803 levels.rep_level_data().as_ref(),
1804 None,
1805 None,
1806 None,
1807 )
1808}
1809
1810fn get_bool_array_slice(
1811 array: &arrow_array::BooleanArray,
1812 indices: impl ExactSizeIterator<Item = usize>,
1813) -> Vec<bool> {
1814 let mut values = Vec::with_capacity(indices.len());
1815 for i in indices {
1816 values.push(array.value(i))
1817 }
1818 values
1819}
1820
1821fn get_interval_ym_array_slice(
1824 array: &arrow_array::IntervalYearMonthArray,
1825 indices: impl ExactSizeIterator<Item = usize>,
1826) -> Vec<FixedLenByteArray> {
1827 let mut values = Vec::with_capacity(indices.len());
1828 for i in indices {
1829 let mut value = array.value(i).to_le_bytes().to_vec();
1830 let mut suffix = vec![0; 8];
1831 value.append(&mut suffix);
1832 values.push(FixedLenByteArray::from(ByteArray::from(value)))
1833 }
1834 values
1835}
1836
1837fn get_interval_dt_array_slice(
1840 array: &arrow_array::IntervalDayTimeArray,
1841 indices: impl ExactSizeIterator<Item = usize>,
1842) -> Vec<FixedLenByteArray> {
1843 let mut values = Vec::with_capacity(indices.len());
1844 for i in indices {
1845 let mut out = [0; 12];
1846 let value = array.value(i);
1847 out[4..8].copy_from_slice(&value.days.to_le_bytes());
1848 out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1849 values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1850 }
1851 values
1852}
1853
1854fn get_decimal_32_array_slice(
1855 array: &arrow_array::Decimal32Array,
1856 indices: impl ExactSizeIterator<Item = usize>,
1857) -> Vec<FixedLenByteArray> {
1858 let mut values = Vec::with_capacity(indices.len());
1859 let size = decimal_length_from_precision(array.precision());
1860 for i in indices {
1861 let as_be_bytes = array.value(i).to_be_bytes();
1862 let resized_value = as_be_bytes[(4 - size)..].to_vec();
1863 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1864 }
1865 values
1866}
1867
1868fn get_decimal_64_array_slice(
1869 array: &arrow_array::Decimal64Array,
1870 indices: impl ExactSizeIterator<Item = usize>,
1871) -> Vec<FixedLenByteArray> {
1872 let mut values = Vec::with_capacity(indices.len());
1873 let size = decimal_length_from_precision(array.precision());
1874 for i in indices {
1875 let as_be_bytes = array.value(i).to_be_bytes();
1876 let resized_value = as_be_bytes[(8 - size)..].to_vec();
1877 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1878 }
1879 values
1880}
1881
1882fn get_decimal_128_array_slice(
1883 array: &arrow_array::Decimal128Array,
1884 indices: impl ExactSizeIterator<Item = usize>,
1885) -> Vec<FixedLenByteArray> {
1886 let mut values = Vec::with_capacity(indices.len());
1887 let size = decimal_length_from_precision(array.precision());
1888 for i in indices {
1889 let as_be_bytes = array.value(i).to_be_bytes();
1890 let resized_value = as_be_bytes[(16 - size)..].to_vec();
1891 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1892 }
1893 values
1894}
1895
1896fn get_decimal_256_array_slice(
1897 array: &arrow_array::Decimal256Array,
1898 indices: impl ExactSizeIterator<Item = usize>,
1899) -> Vec<FixedLenByteArray> {
1900 let mut values = Vec::with_capacity(indices.len());
1901 let size = decimal_length_from_precision(array.precision());
1902 for i in indices {
1903 let as_be_bytes = array.value(i).to_be_bytes();
1904 let resized_value = as_be_bytes[(32 - size)..].to_vec();
1905 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1906 }
1907 values
1908}
1909
1910fn get_float_16_array_slice(
1911 array: &arrow_array::Float16Array,
1912 indices: impl ExactSizeIterator<Item = usize>,
1913) -> Vec<FixedLenByteArray> {
1914 let mut values = Vec::with_capacity(indices.len());
1915 for i in indices {
1916 let value = array.value(i).to_le_bytes().to_vec();
1917 values.push(FixedLenByteArray::from(ByteArray::from(value)));
1918 }
1919 values
1920}
1921
1922fn get_fsb_array_slice(
1923 array: &arrow_array::FixedSizeBinaryArray,
1924 indices: impl ExactSizeIterator<Item = usize>,
1925) -> Vec<FixedLenByteArray> {
1926 let mut values = Vec::with_capacity(indices.len());
1927 for i in indices {
1928 let value = array.value(i).to_vec();
1929 values.push(FixedLenByteArray::from(ByteArray::from(value)))
1930 }
1931 values
1932}
1933
1934#[cfg(test)]
1935mod tests {
1936 use super::*;
1937 use std::collections::HashMap;
1938
1939 use std::fs::File;
1940
1941 use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1942 use crate::arrow::{ARROW_SCHEMA_META_KEY, PARQUET_FIELD_ID_META_KEY};
1943 use crate::column::page::{Page, PageReader};
1944 use crate::file::metadata::thrift::PageHeader;
1945 use crate::file::page_index::column_index::ColumnIndexMetaData;
1946 use crate::file::reader::SerializedPageReader;
1947 use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
1948 use crate::schema::types::ColumnPath;
1949 use arrow::datatypes::ToByteSlice;
1950 use arrow::datatypes::{DataType, Schema};
1951 use arrow::error::Result as ArrowResult;
1952 use arrow::util::data_gen::create_random_array;
1953 use arrow::util::pretty::pretty_format_batches;
1954 use arrow::{array::*, buffer::Buffer};
1955 use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer, OffsetBuffer, i256};
1956 use arrow_schema::Fields;
1957 use half::f16;
1958 use num_traits::{FromPrimitive, ToPrimitive};
1959 use tempfile::tempfile;
1960
1961 use crate::basic::Encoding;
1962 use crate::data_type::AsBytes;
1963 use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
1964 use crate::file::properties::{
1965 BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1966 };
1967 use crate::file::serialized_reader::ReadOptionsBuilder;
1968 use crate::file::{
1969 reader::{FileReader, SerializedFileReader},
1970 statistics::Statistics,
1971 };
1972
1973 #[derive(Debug, Default)]
1978 struct RecordingPageStore {
1979 next: u64,
1980 blobs: HashMap<u64, Bytes>,
1981 puts: Arc<std::sync::atomic::AtomicUsize>,
1982 }
1983
1984 impl PageStore for RecordingPageStore {
1985 fn put(&mut self, value: Bytes) -> Result<PageKey> {
1986 let id = 100 + self.next * 7;
1988 self.next += 1;
1989 self.puts.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1990 self.blobs.insert(id, value);
1991 Ok(PageKey::new(id))
1992 }
1993
1994 fn take(&mut self, key: PageKey) -> Result<Bytes> {
1995 self.blobs
1996 .remove(&key.get())
1997 .ok_or_else(|| ParquetError::General(format!("missing key {}", key.get())))
1998 }
1999 }
2000
2001 #[derive(Debug)]
2002 struct RecordingPageStoreFactory {
2003 puts: Arc<std::sync::atomic::AtomicUsize>,
2004 }
2005
2006 impl PageStoreFactory for RecordingPageStoreFactory {
2007 fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
2008 Ok(Box::new(RecordingPageStore {
2009 puts: self.puts.clone(),
2010 ..Default::default()
2011 }))
2012 }
2013 }
2014
2015 #[test]
2019 fn custom_page_store_is_byte_identical_to_default() {
2020 let schema = Arc::new(Schema::new(vec![
2021 Field::new("i", DataType::Int32, true),
2022 Field::new("s", DataType::Utf8, true),
2024 ]));
2025 let i = Int32Array::from(vec![Some(1), None, Some(3), Some(4), Some(5), Some(6)]);
2026 let s = StringArray::from(vec![
2027 Some("a"),
2028 Some("bb"),
2029 Some("a"),
2030 None,
2031 Some("bb"),
2032 Some("ccc"),
2033 ]);
2034 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(i), Arc::new(s)]).unwrap();
2035
2036 let props = WriterProperties::builder()
2039 .set_max_row_group_row_count(Some(3))
2040 .build();
2041
2042 let write = |factory: Option<Arc<dyn PageStoreFactory>>| {
2043 let mut buffer = Vec::new();
2044 let mut opts = ArrowWriterOptions::new().with_properties(props.clone());
2045 if let Some(factory) = factory {
2046 opts = opts.with_page_store_factory(factory);
2047 }
2048 let mut writer =
2049 ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
2050 writer.write(&batch).unwrap();
2051 writer.close().unwrap();
2052 buffer
2053 };
2054
2055 let default_bytes = write(None);
2056
2057 let puts = Arc::new(std::sync::atomic::AtomicUsize::new(0));
2058 let custom_bytes = write(Some(Arc::new(RecordingPageStoreFactory {
2059 puts: puts.clone(),
2060 })));
2061
2062 assert!(
2063 puts.load(std::sync::atomic::Ordering::Relaxed) > 0,
2064 "custom PageStore was never written to"
2065 );
2066 assert_eq!(
2067 default_bytes, custom_bytes,
2068 "a custom PageStore must produce byte-identical output to the default"
2069 );
2070 }
2071
2072 #[test]
2078 fn dictionary_column_round_trips_with_offset_index_disabled() {
2079 let schema = Arc::new(Schema::new(vec![Field::new("k", DataType::Int32, true)]));
2080
2081 let values: Vec<Option<i32>> = (0..50_000).map(|i| Some(i % 8)).collect();
2084 let array = Int32Array::from(values.clone());
2085 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
2086
2087 let props = WriterProperties::builder()
2088 .set_offset_index_disabled(true)
2089 .set_data_page_row_count_limit(4096)
2090 .build();
2091 let opts = ArrowWriterOptions::new().with_properties(props);
2092
2093 let mut buffer = Vec::new();
2094 let mut writer =
2095 ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
2096 writer.write(&batch).unwrap();
2097 writer.close().unwrap();
2098
2099 let reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), values.len()).unwrap();
2100 let read: Vec<RecordBatch> = reader.collect::<ArrowResult<_>>().unwrap();
2101 let read_values: Vec<Option<i32>> = read
2102 .iter()
2103 .flat_map(|b| b.column(0).as_primitive::<Int32Type>().iter())
2104 .collect();
2105 assert_eq!(read_values, values);
2106 }
2107
2108 #[test]
2113 fn dictionary_page_is_routed_through_the_store() {
2114 #[derive(Debug, Default)]
2116 struct SizeRecordingPageStore {
2117 blobs: Vec<Bytes>,
2118 bytes_put: Arc<std::sync::atomic::AtomicUsize>,
2119 }
2120 impl PageStore for SizeRecordingPageStore {
2121 fn put(&mut self, value: Bytes) -> Result<PageKey> {
2122 self.bytes_put
2123 .fetch_add(value.len(), std::sync::atomic::Ordering::Relaxed);
2124 let key = PageKey::new(self.blobs.len() as u64);
2125 self.blobs.push(value);
2126 Ok(key)
2127 }
2128 fn take(&mut self, key: PageKey) -> Result<Bytes> {
2129 Ok(std::mem::take(&mut self.blobs[key.get() as usize]))
2130 }
2131 }
2132 #[derive(Debug)]
2133 struct Factory {
2134 bytes_put: Arc<std::sync::atomic::AtomicUsize>,
2135 }
2136 impl PageStoreFactory for Factory {
2137 fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
2138 Ok(Box::new(SizeRecordingPageStore {
2139 bytes_put: self.bytes_put.clone(),
2140 ..Default::default()
2141 }))
2142 }
2143 }
2144
2145 let schema = Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, false)]));
2146 let values: Vec<&str> = (0..2048)
2149 .map(|i| ["alpha", "beta", "gamma", "delta"][i % 4])
2150 .collect();
2151 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(StringArray::from(values))])
2152 .unwrap();
2153
2154 let bytes_put = Arc::new(std::sync::atomic::AtomicUsize::new(0));
2155 let opts = ArrowWriterOptions::new().with_page_store_factory(Arc::new(Factory {
2156 bytes_put: bytes_put.clone(),
2157 }));
2158
2159 let mut buffer = Vec::new();
2162 let mut writer =
2163 ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
2164 writer.write(&batch).unwrap();
2165 writer.close().unwrap();
2166
2167 let reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap();
2168 let column = reader.metadata().row_group(0).column(0);
2169 assert!(
2170 column.dictionary_page_offset().is_some(),
2171 "expected the column to be dictionary-encoded"
2172 );
2173
2174 assert_eq!(
2178 bytes_put.load(std::sync::atomic::Ordering::Relaxed) as i64,
2179 column.compressed_size(),
2180 "the dictionary page must pass through the store like any other page"
2181 );
2182 }
2183
2184 #[test]
2185 fn arrow_writer() {
2186 let schema = Schema::new(vec![
2188 Field::new("a", DataType::Int32, false),
2189 Field::new("b", DataType::Int32, true),
2190 ]);
2191
2192 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2194 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2195
2196 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
2198
2199 roundtrip(batch, Some(SMALL_SIZE / 2));
2200 }
2201
2202 fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
2203 let mut buffer = vec![];
2204
2205 let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
2206 writer.write(expected_batch).unwrap();
2207 writer.close().unwrap();
2208
2209 buffer
2210 }
2211
2212 fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
2213 let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
2214 writer.write(expected_batch).unwrap();
2215 writer.into_inner().unwrap()
2216 }
2217
2218 #[test]
2219 fn roundtrip_bytes() {
2220 let schema = Arc::new(Schema::new(vec![
2222 Field::new("a", DataType::Int32, false),
2223 Field::new("b", DataType::Int32, true),
2224 ]));
2225
2226 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2228 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2229
2230 let expected_batch =
2232 RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
2233
2234 for buffer in [
2235 get_bytes_after_close(schema.clone(), &expected_batch),
2236 get_bytes_by_into_inner(schema, &expected_batch),
2237 ] {
2238 let cursor = Bytes::from(buffer);
2239 let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
2240
2241 let actual_batch = record_batch_reader
2242 .next()
2243 .expect("No batch found")
2244 .expect("Unable to get batch");
2245
2246 assert_eq!(expected_batch.schema(), actual_batch.schema());
2247 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2248 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2249 for i in 0..expected_batch.num_columns() {
2250 let expected_data = expected_batch.column(i).to_data();
2251 let actual_data = actual_batch.column(i).to_data();
2252
2253 assert_eq!(expected_data, actual_data);
2254 }
2255 }
2256 }
2257
2258 #[test]
2259 fn arrow_writer_non_null() {
2260 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2262
2263 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2265
2266 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2268
2269 roundtrip(batch, Some(SMALL_SIZE / 2));
2270 }
2271
2272 #[test]
2273 fn arrow_writer_list() {
2274 let schema = Schema::new(vec![Field::new(
2276 "a",
2277 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
2278 true,
2279 )]);
2280
2281 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2283
2284 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2287
2288 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2290 DataType::Int32,
2291 false,
2292 ))))
2293 .len(5)
2294 .add_buffer(a_value_offsets)
2295 .add_child_data(a_values.into_data())
2296 .null_bit_buffer(Some(Buffer::from([0b00011011])))
2297 .build()
2298 .unwrap();
2299 let a = ListArray::from(a_list_data);
2300
2301 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2303
2304 assert_eq!(batch.column(0).null_count(), 1);
2305
2306 roundtrip(batch, None);
2309 }
2310
2311 #[test]
2312 fn arrow_writer_list_non_null() {
2313 let schema = Schema::new(vec![Field::new(
2315 "a",
2316 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
2317 false,
2318 )]);
2319
2320 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2322
2323 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2326
2327 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2329 DataType::Int32,
2330 false,
2331 ))))
2332 .len(5)
2333 .add_buffer(a_value_offsets)
2334 .add_child_data(a_values.into_data())
2335 .build()
2336 .unwrap();
2337 let a = ListArray::from(a_list_data);
2338
2339 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2341
2342 assert_eq!(batch.column(0).null_count(), 0);
2345
2346 roundtrip(batch, None);
2347 }
2348
2349 #[test]
2350 fn arrow_writer_list_view() {
2351 let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2352 let schema = Schema::new(vec![Field::new(
2353 "a",
2354 DataType::ListView(list_field.clone()),
2355 true,
2356 )]);
2357
2358 let a = ListViewArray::new(
2360 list_field,
2361 vec![0, 1, 0, 3, 6].into(),
2362 vec![1, 2, 0, 3, 4].into(),
2363 Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2364 Some(vec![true, true, false, true, true].into()),
2365 );
2366
2367 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2368
2369 assert_eq!(batch.column(0).null_count(), 1);
2370
2371 roundtrip(batch, None);
2372 }
2373
2374 #[test]
2375 fn arrow_writer_list_view_non_null() {
2376 let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2377 let schema = Schema::new(vec![Field::new(
2378 "a",
2379 DataType::ListView(list_field.clone()),
2380 false,
2381 )]);
2382
2383 let a = ListViewArray::new(
2385 list_field,
2386 vec![0, 1, 0, 3, 6].into(),
2387 vec![1, 2, 0, 3, 4].into(),
2388 Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2389 None,
2390 );
2391
2392 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2393
2394 assert_eq!(batch.column(0).null_count(), 0);
2395
2396 roundtrip(batch, None);
2397 }
2398
2399 #[test]
2400 fn arrow_writer_list_view_out_of_order() {
2401 let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2402 let schema = Schema::new(vec![Field::new(
2403 "a",
2404 DataType::ListView(list_field.clone()),
2405 false,
2406 )]);
2407
2408 let a = ListViewArray::new(
2410 list_field,
2411 vec![0, 1, 0, 6, 3].into(),
2412 vec![1, 2, 0, 4, 3].into(),
2413 Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2414 None,
2415 );
2416
2417 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2418
2419 roundtrip(batch, None);
2420 }
2421
2422 #[test]
2423 fn arrow_writer_large_list_view() {
2424 let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2425 let schema = Schema::new(vec![Field::new(
2426 "a",
2427 DataType::LargeListView(list_field.clone()),
2428 true,
2429 )]);
2430
2431 let a = LargeListViewArray::new(
2433 list_field,
2434 vec![0i64, 1, 0, 3, 6].into(),
2435 vec![1i64, 2, 0, 3, 4].into(),
2436 Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2437 Some(vec![true, true, false, true, true].into()),
2438 );
2439
2440 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2441
2442 assert_eq!(batch.column(0).null_count(), 1);
2443
2444 roundtrip(batch, None);
2445 }
2446
2447 #[test]
2448 fn arrow_writer_list_view_with_struct() {
2449 let struct_fields = Fields::from(vec![
2451 Field::new("id", DataType::Int32, false),
2452 Field::new("name", DataType::Utf8, false),
2453 ]);
2454 let struct_type = DataType::Struct(struct_fields.clone());
2455 let list_field = Arc::new(Field::new("item", struct_type.clone(), false));
2456
2457 let schema = Schema::new(vec![Field::new(
2458 "a",
2459 DataType::ListView(list_field.clone()),
2460 true,
2461 )]);
2462
2463 let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
2465 let name_array = StringArray::from(vec!["a", "b", "c", "d", "e"]);
2466 let struct_array = StructArray::new(
2467 struct_fields,
2468 vec![Arc::new(id_array), Arc::new(name_array)],
2469 None,
2470 );
2471
2472 let list_view = ListViewArray::new(
2474 list_field,
2475 vec![0, 2, 2].into(), vec![2, 0, 3].into(), Arc::new(struct_array),
2478 Some(vec![true, false, true].into()),
2479 );
2480
2481 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_view)]).unwrap();
2482
2483 roundtrip(batch, None);
2484 }
2485
2486 #[test]
2487 fn arrow_writer_binary() {
2488 let string_field = Field::new("a", DataType::Utf8, false);
2489 let binary_field = Field::new("b", DataType::Binary, false);
2490 let schema = Schema::new(vec![string_field, binary_field]);
2491
2492 let raw_string_values = vec!["foo", "bar", "baz", "quux"];
2493 let raw_binary_values = [
2494 b"foo".to_vec(),
2495 b"bar".to_vec(),
2496 b"baz".to_vec(),
2497 b"quux".to_vec(),
2498 ];
2499 let raw_binary_value_refs = raw_binary_values
2500 .iter()
2501 .map(|x| x.as_slice())
2502 .collect::<Vec<_>>();
2503
2504 let string_values = StringArray::from(raw_string_values.clone());
2505 let binary_values = BinaryArray::from(raw_binary_value_refs);
2506 let batch = RecordBatch::try_new(
2507 Arc::new(schema),
2508 vec![Arc::new(string_values), Arc::new(binary_values)],
2509 )
2510 .unwrap();
2511
2512 roundtrip(batch, Some(SMALL_SIZE / 2));
2513 }
2514
2515 #[test]
2516 fn arrow_writer_binary_view() {
2517 let string_field = Field::new("a", DataType::Utf8View, false);
2518 let binary_field = Field::new("b", DataType::BinaryView, false);
2519 let nullable_string_field = Field::new("a", DataType::Utf8View, true);
2520 let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
2521
2522 let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
2523 let raw_binary_values = vec![
2524 b"foo".to_vec(),
2525 b"bar".to_vec(),
2526 b"large payload over 12 bytes".to_vec(),
2527 b"lulu".to_vec(),
2528 ];
2529 let nullable_string_values =
2530 vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
2531
2532 let string_view_values = StringViewArray::from(raw_string_values);
2533 let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
2534 let nullable_string_view_values = StringViewArray::from(nullable_string_values);
2535 let batch = RecordBatch::try_new(
2536 Arc::new(schema),
2537 vec![
2538 Arc::new(string_view_values),
2539 Arc::new(binary_view_values),
2540 Arc::new(nullable_string_view_values),
2541 ],
2542 )
2543 .unwrap();
2544
2545 roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
2546 roundtrip(batch, None);
2547 }
2548
2549 #[test]
2550 fn arrow_writer_binary_view_long_value() {
2551 let string_field = Field::new("a", DataType::Utf8View, false);
2552 let binary_field = Field::new("b", DataType::BinaryView, false);
2553 let schema = Schema::new(vec![string_field, binary_field]);
2554
2555 let long = "a".repeat(128);
2559 let raw_string_values = vec!["foo", long.as_str(), "bar"];
2560 let raw_binary_values = vec![b"foo".to_vec(), long.as_bytes().to_vec(), b"bar".to_vec()];
2561
2562 let string_view_values: ArrayRef = Arc::new(StringViewArray::from(raw_string_values));
2563 let binary_view_values: ArrayRef =
2564 Arc::new(BinaryViewArray::from_iter_values(raw_binary_values));
2565
2566 one_column_roundtrip(Arc::clone(&string_view_values), false);
2567 one_column_roundtrip(Arc::clone(&binary_view_values), false);
2568
2569 let batch = RecordBatch::try_new(
2570 Arc::new(schema),
2571 vec![string_view_values, binary_view_values],
2572 )
2573 .unwrap();
2574
2575 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2577 let props = WriterProperties::builder()
2578 .set_writer_version(version)
2579 .set_dictionary_enabled(false)
2580 .build();
2581 roundtrip_opts(&batch, props);
2582 }
2583 }
2584
2585 fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
2586 let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
2587 let schema = Schema::new(vec![decimal_field]);
2588
2589 let decimal_values = vec![10_000, 50_000, 0, -100]
2590 .into_iter()
2591 .map(Some)
2592 .collect::<Decimal128Array>()
2593 .with_precision_and_scale(precision, scale)
2594 .unwrap();
2595
2596 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
2597 }
2598
2599 #[test]
2600 fn arrow_writer_decimal() {
2601 let batch_int32_decimal = get_decimal_batch(5, 2);
2603 roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
2604 let batch_int64_decimal = get_decimal_batch(12, 2);
2606 roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
2607 let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
2609 roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
2610 }
2611
/// Round-trips a batch that mixes flat Int32 columns with a nested struct "c"
/// (containing "d" and a struct "e" that holds "f" and two list columns),
/// using row-group limits of `SMALL_SIZE / 2` and `SMALL_SIZE / 3` rows.
#[test]
fn arrow_writer_complex() {
    // Field definitions, innermost first.
    let d_field = Arc::new(Field::new("d", DataType::Float64, true));
    let f_field = Arc::new(Field::new("f", DataType::Float32, true));
    // "g": non-nullable list with nullable items.
    let g_field = Arc::new(Field::new_list(
        "g",
        Field::new_list_field(DataType::Int16, true),
        false,
    ));
    // "h": nullable list with non-nullable items.
    let h_field = Arc::new(Field::new_list(
        "h",
        Field::new_list_field(DataType::Int16, false),
        true,
    ));
    let e_field = Arc::new(Field::new_struct(
        "e",
        vec![f_field.clone(), g_field.clone(), h_field.clone()],
        false,
    ));
    let c_field = Field::new_struct("c", vec![d_field.clone(), e_field.clone()], false);
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, true),
        c_field,
    ]);

    // Flat columns and struct leaves (five rows each).
    let a_col = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let b_col = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
    let d_col = Float64Array::from(vec![None, None, None, Some(1.0), None]);
    let f_col = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);

    // Both lists share ten child values and the same offsets, giving list
    // lengths 1, 2, 0, 3 and 4.
    let list_items = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    let list_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());

    let g_col = ListArray::from(
        ArrayData::builder(g_field.data_type().clone())
            .len(5)
            .add_buffer(list_offsets.clone())
            .add_child_data(list_items.to_data())
            .build()
            .unwrap(),
    );
    // Validity bitmap 0b00011011 has bit 2 cleared, so entry 2 of "h" is null.
    let h_col = ListArray::from(
        ArrayData::builder(h_field.data_type().clone())
            .len(5)
            .add_buffer(list_offsets)
            .add_child_data(list_items.to_data())
            .null_bit_buffer(Some(Buffer::from([0b00011011])))
            .build()
            .unwrap(),
    );

    let e_col = StructArray::from(vec![
        (f_field, Arc::new(f_col) as ArrayRef),
        (g_field, Arc::new(g_col) as ArrayRef),
        (h_field, Arc::new(h_col) as ArrayRef),
    ]);
    let c_col = StructArray::from(vec![
        (d_field, Arc::new(d_col) as ArrayRef),
        (e_field, Arc::new(e_col) as ArrayRef),
    ]);

    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(a_col), Arc::new(b_col), Arc::new(c_col)],
    )
    .unwrap();

    for max_rows in [SMALL_SIZE / 2, SMALL_SIZE / 3] {
        roundtrip(batch.clone(), Some(max_rows));
    }
}
2697
/// Round-trips a single non-nullable struct column whose children mix a
/// required Int32, a nullable Int64 and a nullable Utf8 field.
#[test]
fn arrow_writer_complex_mixed() {
    let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
    let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
    let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));

    let struct_type = DataType::Struct(Fields::from(vec![
        offset_field.clone(),
        partition_field.clone(),
        topic_field.clone(),
    ]));
    let schema = Schema::new(vec![Field::new("some_nested_object", struct_type, false)]);

    let offsets = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let partitions = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
    // Includes both a null and an empty string.
    let topics = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);

    let nested = StructArray::from(vec![
        (offset_field, Arc::new(offsets) as ArrayRef),
        (partition_field, Arc::new(partitions) as ArrayRef),
        (topic_field, Arc::new(topics) as ArrayRef),
    ]);

    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(nested)]).unwrap();
    roundtrip(batch, Some(SMALL_SIZE / 2));
}
2734
/// Round-trips a nullable map column named "stocks" (Utf8 keys, nullable Utf8
/// values) whose contents are decoded from three lines of JSON.
#[test]
fn arrow_writer_map() {
    let json_content = r#"
    {"stocks":{"long": "$AAA", "short": "$BBB"}}
    {"stocks":{"long": null, "long": "$CCC", "short": null}}
    {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
    "#;

    // Map entries are a struct of a required "key" and a nullable "value".
    let key_field = Field::new("key", DataType::Utf8, false);
    let value_field = Field::new("value", DataType::Utf8, true);
    let entries_field = Field::new(
        "entries",
        DataType::Struct(Fields::from(vec![key_field, value_field])),
        false,
    );
    let map_type = DataType::Map(Arc::new(entries_field), false);
    let schema = Arc::new(Schema::new(vec![Field::new("stocks", map_type, true)]));

    let mut reader = arrow::json::ReaderBuilder::new(schema)
        .with_batch_size(64)
        .build(std::io::Cursor::new(json_content))
        .unwrap();

    // All three rows fit in the first batch (batch size 64).
    let batch = reader.next().unwrap().unwrap();
    roundtrip(batch, None);
}
2762
/// Round-trips nullable struct "a" containing nullable struct "b" containing
/// nullable Int32 "c", with nulls present at every level.
#[test]
fn arrow_writer_2_level_struct() {
    let c_field = Field::new("c", DataType::Int32, true);
    let b_field = Field::new("b", DataType::Struct(vec![c_field].into()), true);
    let a_field = Field::new("a", DataType::Struct(vec![b_field.clone()].into()), true);
    let schema = Schema::new(vec![a_field.clone()]);

    let c_values = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);

    // Validity 0b00100111: two of the six "b" rows are null.
    let b_array = StructArray::from(
        ArrayDataBuilder::new(b_field.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100111])))
            .add_child_data(c_values.into_data())
            .build()
            .unwrap(),
    );
    // Validity 0b00101111: one of the six "a" rows is null.
    let a = StructArray::from(
        ArrayDataBuilder::new(a_field.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00101111])))
            .add_child_data(b_array.into_data())
            .build()
            .unwrap(),
    );

    assert_eq!(a.null_count(), 1);
    assert_eq!(a.column(0).null_count(), 2);

    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
    roundtrip(batch, Some(SMALL_SIZE / 2));
}
2797
/// Round-trips a two-level struct ("a" -> "b" -> Int32 "c") in which every
/// level is declared non-nullable and no validity bitmaps are supplied.
#[test]
fn arrow_writer_2_level_struct_non_null() {
    let c_field = Field::new("c", DataType::Int32, false);
    let b_type = DataType::Struct(vec![c_field].into());
    let a_type = DataType::Struct(vec![Field::new("b", b_type.clone(), false)].into());
    let schema = Schema::new(vec![Field::new("a", a_type.clone(), false)]);

    let c_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
    let b_array = StructArray::from(
        ArrayDataBuilder::new(b_type)
            .len(6)
            .add_child_data(c_values.into_data())
            .build()
            .unwrap(),
    );
    let a = StructArray::from(
        ArrayDataBuilder::new(a_type)
            .len(6)
            .add_child_data(b_array.into_data())
            .build()
            .unwrap(),
    );

    assert_eq!(a.null_count(), 0);
    assert_eq!(a.column(0).null_count(), 0);

    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
    roundtrip(batch, Some(SMALL_SIZE / 2));
}
2831
/// Round-trips a non-nullable struct "a" whose child struct "b" is nullable
/// (two null rows) and wraps a non-nullable Int32 "c".
#[test]
fn arrow_writer_2_level_struct_mixed_null() {
    let c_field = Field::new("c", DataType::Int32, false);
    let b_type = DataType::Struct(vec![c_field].into());
    let a_type = DataType::Struct(vec![Field::new("b", b_type.clone(), true)].into());
    let schema = Schema::new(vec![Field::new("a", a_type.clone(), false)]);

    let c_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
    // Validity 0b00100111: two of the six "b" rows are null.
    let b_array = StructArray::from(
        ArrayDataBuilder::new(b_type)
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100111])))
            .add_child_data(c_values.into_data())
            .build()
            .unwrap(),
    );
    let a = StructArray::from(
        ArrayDataBuilder::new(a_type)
            .len(6)
            .add_child_data(b_array.into_data())
            .build()
            .unwrap(),
    );

    assert_eq!(a.null_count(), 0);
    assert_eq!(a.column(0).null_count(), 2);

    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
    roundtrip(batch, Some(SMALL_SIZE / 2));
}
2867
/// Round-trips a nullable struct "a" (three null rows) wrapping a non-nullable
/// struct "b" with Int32, FixedSizeBinary(4) and dictionary-encoded Utf8
/// children.
#[test]
fn arrow_writer_2_level_struct_mixed_null_2() {
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let b_children = vec![
        Field::new("c", DataType::Int32, false),
        Field::new("d", DataType::FixedSizeBinary(4), false),
        Field::new("e", dict_type, false),
    ];
    let b_field = Field::new("b", DataType::Struct(b_children.into()), false);
    let a_field = Field::new("a", DataType::Struct(vec![b_field.clone()].into()), true);
    let schema = Schema::new(vec![a_field.clone()]);

    let c_col = Int32Array::from_iter_values(0..6);
    let d_col = FixedSizeBinaryArray::try_from_iter(
        ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
    )
    .expect("four byte values");
    let e_col = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);

    let b_array = StructArray::from(
        ArrayDataBuilder::new(b_field.data_type().clone())
            .len(6)
            .add_child_data(c_col.into_data())
            .add_child_data(d_col.into_data())
            .add_child_data(e_col.into_data())
            .build()
            .unwrap(),
    );
    // Validity 0b00100101: three of the six "a" rows are null.
    let a = StructArray::from(
        ArrayDataBuilder::new(a_field.data_type().clone())
            .len(6)
            .null_bit_buffer(Some(Buffer::from([0b00100101])))
            .add_child_data(b_array.into_data())
            .build()
            .unwrap(),
    );

    assert_eq!(a.null_count(), 3);
    assert_eq!(a.column(0).null_count(), 0);

    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
    roundtrip(batch, Some(SMALL_SIZE / 2));
}
2919
/// Round-trips a non-nullable `Dictionary(K, FixedSizeBinary(4))` column once
/// for each of the eight integer dictionary key types.
#[test]
fn test_fixed_size_binary_in_dict() {
    /// Writes and reads back a three-row dictionary array with key type `K`
    /// (keys `[0, 0, 1]`) over two 4-byte values.
    fn test_fixed_size_binary_in_dict_inner<K>()
    where
        K: ArrowDictionaryKeyType,
        K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
        <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
    {
        let field = Field::new(
            "a",
            DataType::Dictionary(
                Box::new(K::DATA_TYPE),
                Box::new(DataType::FixedSizeBinary(4)),
            ),
            false,
        );
        let schema = Schema::new(vec![field]);

        // 0 and 1 fit every key type, so the `try_from` conversions cannot fail.
        let keys: Vec<K::Native> = vec![
            K::Native::try_from(0u8).unwrap(),
            K::Native::try_from(0u8).unwrap(),
            K::Native::try_from(1u8).unwrap(),
        ];
        let keys = PrimitiveArray::<K>::from_iter_values(keys);
        let values = FixedSizeBinaryArray::try_from_iter(
            vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
        )
        .unwrap();

        let data = DictionaryArray::<K>::new(keys, Arc::new(values));
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
        roundtrip(batch, None);
    }

    // Unsigned key types. The fourth call previously repeated `UInt16Type`,
    // leaving `UInt64Type` untested.
    test_fixed_size_binary_in_dict_inner::<UInt8Type>();
    test_fixed_size_binary_in_dict_inner::<UInt16Type>();
    test_fixed_size_binary_in_dict_inner::<UInt32Type>();
    test_fixed_size_binary_in_dict_inner::<UInt64Type>();
    // Signed key types.
    test_fixed_size_binary_in_dict_inner::<Int8Type>();
    test_fixed_size_binary_in_dict_inner::<Int16Type>();
    test_fixed_size_binary_in_dict_inner::<Int32Type>();
    test_fixed_size_binary_in_dict_inner::<Int64Type>();
}
2963
/// Round-trips an all-null nullable struct whose only child is a dictionary
/// array with five null keys and an empty values array.
#[test]
fn test_empty_dict() {
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let struct_fields = Fields::from(vec![Field::new("dict", dict_type, false)]);
    let schema = Schema::new(vec![Field::new_struct(
        "struct",
        struct_fields.clone(),
        true,
    )]);

    let null_keys = Int32Array::new_null(5);
    let empty_values = Arc::new(StringArray::new_null(0));
    let dictionary = Arc::new(DictionaryArray::new(null_keys, empty_values));

    // Every one of the five struct rows is marked null.
    let all_null_struct = StructArray::new(
        struct_fields,
        vec![dictionary],
        Some(NullBuffer::new_null(5)),
    );

    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(all_null_struct)]).unwrap();
    roundtrip(batch, None);
}
/// Writes ten distinct 10-character strings with 1-byte data/dictionary page
/// limits and a write batch size of 1, then checks that the column still has
/// a dictionary page and that the offset index lists ten pages.
#[test]
fn arrow_writer_page_size() {
    let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));

    let mut strings = StringBuilder::with_capacity(100, 329 * 10_000);
    for digit in 0..10 {
        // One decimal digit repeated ten times, e.g. "0000000000".
        let repeated: String = digit.to_string().repeat(10).chars().take(10).collect();
        strings.append_value(repeated);
    }
    let column = Arc::new(strings.finish());
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    // Deliberately tiny limits so that pages are flushed very frequently.
    let props = WriterProperties::builder()
        .set_data_page_size_limit(1)
        .set_dictionary_page_size_limit(1)
        .set_write_batch_size(1)
        .build();

    let file = tempfile::tempfile().unwrap();
    let mut writer = ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
        .expect("Unable to write file");
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let reader = SerializedFileReader::new_with_options(
        file.try_clone().unwrap(),
        ReadOptionsBuilder::new().with_page_index().build(),
    )
    .unwrap();
    let metadata = reader.metadata();

    let columns = metadata.row_group(0).columns();
    assert_eq!(columns.len(), 1);
    assert!(
        columns[0].dictionary_page_offset().is_some(),
        "Expected a dictionary page"
    );

    let offset_index = metadata.offset_index();
    assert!(offset_index.is_some());
    // First row group, first column.
    let page_locations = offset_index.unwrap()[0][0].page_locations.clone();

    assert_eq!(
        page_locations.len(),
        10,
        "Expected 10 pages but got {page_locations:#?}"
    );
}
3056
/// Round-trips non-nullable Float16/Float32/Float64 columns of `MEDIUM_SIZE`
/// rows in which every even-indexed value is NaN.
#[test]
fn arrow_writer_float_nans() {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Float16, false),
        Field::new("b", DataType::Float32, false),
        Field::new("c", DataType::Float64, false),
    ]);

    // Even rows hold NaN, odd rows hold the row index.
    let is_nan_row = |row: usize| row % 2 == 0;

    let f16_col: Float16Array = (0..MEDIUM_SIZE)
        .map(|row| {
            let value = if is_nan_row(row) {
                f16::NAN
            } else {
                f16::from_f32(row as f32)
            };
            Some(value)
        })
        .collect();
    let f32_col: Float32Array = (0..MEDIUM_SIZE)
        .map(|row| Some(if is_nan_row(row) { f32::NAN } else { row as f32 }))
        .collect();
    let f64_col: Float64Array = (0..MEDIUM_SIZE)
        .map(|row| Some(if is_nan_row(row) { f64::NAN } else { row as f64 }))
        .collect();

    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(f16_col), Arc::new(f32_col), Arc::new(f64_col)],
    )
    .unwrap();

    roundtrip(batch, None);
}
3094
/// Row count used by the small single-column tests; the helpers also derive
/// row-group limits from it (e.g. `SMALL_SIZE / 2`).
const SMALL_SIZE: usize = 7;
/// Row count used by the larger generated-data tests such as
/// `arrow_writer_float_nans`.
const MEDIUM_SIZE: usize = 63;
3097
3098 fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<Bytes> {
3101 let mut files = vec![];
3102 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3103 let mut props = WriterProperties::builder().set_writer_version(version);
3104
3105 if let Some(size) = max_row_group_size {
3106 props = props.set_max_row_group_row_count(Some(size))
3107 }
3108
3109 let props = props.build();
3110 files.push(roundtrip_opts(&expected_batch, props))
3111 }
3112 files
3113 }
3114
3115 fn roundtrip_opts_with_array_validation<F>(
3119 expected_batch: &RecordBatch,
3120 props: WriterProperties,
3121 validate: F,
3122 ) -> Bytes
3123 where
3124 F: Fn(&ArrayData, &ArrayData),
3125 {
3126 let mut file = vec![];
3127
3128 let mut writer = ArrowWriter::try_new(&mut file, expected_batch.schema(), Some(props))
3129 .expect("Unable to write file");
3130 writer.write(expected_batch).unwrap();
3131 writer.close().unwrap();
3132
3133 let file = Bytes::from(file);
3134 let mut record_batch_reader =
3135 ParquetRecordBatchReader::try_new(file.clone(), 1024).unwrap();
3136
3137 let actual_batch = record_batch_reader
3138 .next()
3139 .expect("No batch found")
3140 .expect("Unable to get batch");
3141
3142 assert_eq!(expected_batch.schema(), actual_batch.schema());
3143 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
3144 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
3145 for i in 0..expected_batch.num_columns() {
3146 let expected_data = expected_batch.column(i).to_data();
3147 let actual_data = actual_batch.column(i).to_data();
3148 validate(&expected_data, &actual_data);
3149 }
3150
3151 file
3152 }
3153
3154 fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> Bytes {
3155 roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
3156 a.validate_full().expect("valid expected data");
3157 b.validate_full().expect("valid actual data");
3158 assert_eq!(a, b)
3159 })
3160 }
3161
3162 struct RoundTripOptions {
3163 values: ArrayRef,
3164 schema: SchemaRef,
3165 bloom_filter: bool,
3166 bloom_filter_ndv: Option<u64>,
3167 bloom_filter_position: BloomFilterPosition,
3168 }
3169
3170 impl RoundTripOptions {
3171 fn new(values: ArrayRef, nullable: bool) -> Self {
3172 let data_type = values.data_type().clone();
3173 let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
3174 Self {
3175 values,
3176 schema: Arc::new(schema),
3177 bloom_filter: false,
3178 bloom_filter_ndv: None,
3179 bloom_filter_position: BloomFilterPosition::AfterRowGroup,
3180 }
3181 }
3182 }
3183
3184 fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<Bytes> {
3185 one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
3186 }
3187
3188 fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<Bytes> {
3189 let mut options = RoundTripOptions::new(values, false);
3190 options.schema = schema;
3191 one_column_roundtrip_with_options(options)
3192 }
3193
3194 fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<Bytes> {
3195 let RoundTripOptions {
3196 values,
3197 schema,
3198 bloom_filter,
3199 bloom_filter_ndv,
3200 bloom_filter_position,
3201 } = options;
3202
3203 let encodings = match values.data_type() {
3204 DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
3205 vec![
3206 Encoding::PLAIN,
3207 Encoding::DELTA_BYTE_ARRAY,
3208 Encoding::DELTA_LENGTH_BYTE_ARRAY,
3209 ]
3210 }
3211 DataType::Int64
3212 | DataType::Int32
3213 | DataType::Int16
3214 | DataType::Int8
3215 | DataType::UInt64
3216 | DataType::UInt32
3217 | DataType::UInt16
3218 | DataType::UInt8 => vec![
3219 Encoding::PLAIN,
3220 Encoding::DELTA_BINARY_PACKED,
3221 Encoding::BYTE_STREAM_SPLIT,
3222 ],
3223 DataType::Float32 | DataType::Float64 => {
3224 vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
3225 }
3226 _ => vec![Encoding::PLAIN],
3227 };
3228
3229 let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3230
3231 let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
3232
3233 let mut files = vec![];
3234 for dictionary_size in [0, 1, 1024] {
3235 for encoding in &encodings {
3236 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3237 for row_group_size in row_group_sizes {
3238 let mut builder = WriterProperties::builder()
3239 .set_writer_version(version)
3240 .set_max_row_group_row_count(Some(row_group_size))
3241 .set_dictionary_enabled(dictionary_size != 0)
3242 .set_dictionary_page_size_limit(dictionary_size.max(1))
3243 .set_encoding(*encoding)
3244 .set_bloom_filter_enabled(bloom_filter)
3245 .set_bloom_filter_position(bloom_filter_position);
3246 if let Some(ndv) = bloom_filter_ndv {
3247 builder = builder.set_bloom_filter_max_ndv(ndv);
3248 }
3249 let props = builder.build();
3250
3251 files.push(roundtrip_opts(&expected_batch, props))
3252 }
3253 }
3254 }
3255 }
3256 files
3257 }
3258
3259 fn values_required<A, I>(iter: I) -> Vec<Bytes>
3260 where
3261 A: From<Vec<I::Item>> + Array + 'static,
3262 I: IntoIterator,
3263 {
3264 let raw_values: Vec<_> = iter.into_iter().collect();
3265 let values = Arc::new(A::from(raw_values));
3266 one_column_roundtrip(values, false)
3267 }
3268
3269 fn values_optional<A, I>(iter: I) -> Vec<Bytes>
3270 where
3271 A: From<Vec<Option<I::Item>>> + Array + 'static,
3272 I: IntoIterator,
3273 {
3274 let optional_raw_values: Vec<_> = iter
3275 .into_iter()
3276 .enumerate()
3277 .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
3278 .collect();
3279 let optional_values = Arc::new(A::from(optional_raw_values));
3280 one_column_roundtrip(optional_values, true)
3281 }
3282
3283 fn required_and_optional<A, I>(iter: I)
3284 where
3285 A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
3286 I: IntoIterator + Clone,
3287 {
3288 values_required::<A, I>(iter.clone());
3289 values_optional::<A, I>(iter);
3290 }
3291
3292 fn check_bloom_filter<T: AsBytes>(
3293 files: Vec<Bytes>,
3294 file_column: String,
3295 positive_values: Vec<T>,
3296 negative_values: Vec<T>,
3297 ) {
3298 files.into_iter().take(1).for_each(|file| {
3299 let file_reader = SerializedFileReader::new_with_options(
3300 file,
3301 ReadOptionsBuilder::new()
3302 .with_reader_properties(
3303 ReaderProperties::builder()
3304 .set_read_bloom_filter(true)
3305 .build(),
3306 )
3307 .build(),
3308 )
3309 .expect("Unable to open file as Parquet");
3310 let metadata = file_reader.metadata();
3311
3312 let mut bloom_filters: Vec<_> = vec![];
3314 for (ri, row_group) in metadata.row_groups().iter().enumerate() {
3315 if let Some((column_index, _)) = row_group
3316 .columns()
3317 .iter()
3318 .enumerate()
3319 .find(|(_, column)| column.column_path().string() == file_column)
3320 {
3321 let row_group_reader = file_reader
3322 .get_row_group(ri)
3323 .expect("Unable to read row group");
3324 if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
3325 bloom_filters.push(sbbf.clone());
3326 } else {
3327 panic!("No bloom filter for column named {file_column} found");
3328 }
3329 } else {
3330 panic!("No column named {file_column} found");
3331 }
3332 }
3333
3334 positive_values.iter().for_each(|value| {
3335 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
3336 assert!(
3337 found.is_some(),
3338 "{}",
3339 format!("Value {:?} should be in bloom filter", value.as_bytes())
3340 );
3341 });
3342
3343 negative_values.iter().for_each(|value| {
3344 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
3345 assert!(
3346 found.is_none(),
3347 "{}",
3348 format!("Value {:?} should not be in bloom filter", value.as_bytes())
3349 );
3350 });
3351 });
3352 }
3353
/// Round-trips a nullable Int32 column of `SMALL_SIZE` rows that are all null.
#[test]
fn all_null_primitive_single_column() {
    let all_null: ArrayRef = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
    one_column_roundtrip(all_null, true);
}
/// Round-trips a `NullArray` of `SMALL_SIZE` rows as a nullable column.
#[test]
fn null_single_column() {
    let nulls: ArrayRef = Arc::new(NullArray::new(SMALL_SIZE));
    one_column_roundtrip(nulls, true);
}
3365
/// Round-trips `SMALL_SIZE` booleans alternating `true, false, ...` as both a
/// required and an optional column.
#[test]
fn bool_single_column() {
    // Even positions are `true`, odd positions `false`.
    let alternating = (0..SMALL_SIZE).map(|position| position % 2 == 0);
    required_and_optional::<BooleanArray, _>(alternating);
}
3372
/// Writes a 200,000-row nullable boolean column (repeating `null, true,
/// false`) to a temporary file with default writer properties. Only the write
/// path is exercised; the file is not read back.
#[test]
fn bool_large_single_column() {
    let values = Arc::new(
        (0..200_000)
            .map(|position| match position % 3 {
                0 => None,
                1 => Some(true),
                _ => Some(false),
            })
            .collect::<BooleanArray>(),
    );
    let field = Field::new("col", values.data_type().clone(), true);
    let expected_batch =
        RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![values]).unwrap();

    let file = tempfile::tempfile().unwrap();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
            .expect("Unable to write file");
    writer.write(&expected_batch).unwrap();
    writer.close().unwrap();
}
3393
/// Writes ten NaN values to a Float64 column and checks, for every column
/// chunk in the returned metadata, that an offset index location is recorded
/// while no column index location is.
#[test]
fn check_page_offset_index_with_nan() {
    let nan_column = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
    let field = Field::new("col", DataType::Float64, true);
    let batch =
        RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![nan_column]).unwrap();

    let mut out = Vec::with_capacity(1024);
    let mut writer =
        ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
    writer.write(&batch).unwrap();
    let file_meta_data = writer.close().unwrap();

    let column_chunks = file_meta_data
        .row_groups()
        .iter()
        .flat_map(|row_group| row_group.columns());
    for column in column_chunks {
        assert!(column.offset_index_offset().is_some());
        assert!(column.offset_index_length().is_some());
        assert!(column.column_index_offset().is_none());
        assert!(column.column_index_length().is_none());
    }
}
3414
/// Round-trips the values `0..SMALL_SIZE` as required and optional
/// `Int8Array` columns.
#[test]
fn i8_single_column() {
    let upper = SMALL_SIZE as i8;
    required_and_optional::<Int8Array, _>(0..upper);
}
3419
/// Round-trips the values `0..SMALL_SIZE` as required and optional
/// `Int16Array` columns.
#[test]
fn i16_single_column() {
    let upper = SMALL_SIZE as i16;
    required_and_optional::<Int16Array, _>(0..upper);
}
3424
/// Round-trips the values `0..SMALL_SIZE` as required and optional
/// `Int32Array` columns.
#[test]
fn i32_single_column() {
    let upper = SMALL_SIZE as i32;
    required_and_optional::<Int32Array, _>(0..upper);
}
3429
/// Round-trips the values `0..SMALL_SIZE` as required and optional
/// `Int64Array` columns.
#[test]
fn i64_single_column() {
    let upper = SMALL_SIZE as i64;
    required_and_optional::<Int64Array, _>(0..upper);
}
3434
/// Round-trips the values `0..SMALL_SIZE` as required and optional
/// `UInt8Array` columns.
#[test]
fn u8_single_column() {
    let upper = SMALL_SIZE as u8;
    required_and_optional::<UInt8Array, _>(0..upper);
}
3439
/// Round-trips the values `0..SMALL_SIZE` as required and optional
/// `UInt16Array` columns.
#[test]
fn u16_single_column() {
    let upper = SMALL_SIZE as u16;
    required_and_optional::<UInt16Array, _>(0..upper);
}
3444
/// Round-trips the values `0..SMALL_SIZE` as required and optional
/// `UInt32Array` columns.
#[test]
fn u32_single_column() {
    let upper = SMALL_SIZE as u32;
    required_and_optional::<UInt32Array, _>(0..upper);
}
3449
/// Round-trips the values `0..SMALL_SIZE` as required and optional
/// `UInt64Array` columns.
#[test]
fn u64_single_column() {
    let upper = SMALL_SIZE as u64;
    required_and_optional::<UInt64Array, _>(0..upper);
}
3454
/// Round-trips the indices `0..SMALL_SIZE` converted to `f32` as required and
/// optional `Float32Array` columns.
#[test]
fn f32_single_column() {
    let floats = (0..SMALL_SIZE).map(|index| index as f32);
    required_and_optional::<Float32Array, _>(floats);
}
3459
/// Round-trips the indices `0..SMALL_SIZE` converted to `f64` as required and
/// optional `Float64Array` columns.
#[test]
fn f64_single_column() {
    let floats = (0..SMALL_SIZE).map(|index| index as f64);
    required_and_optional::<Float64Array, _>(floats);
}
3464
/// Round-trips the values `0..SMALL_SIZE` as a required
/// `TimestampSecondArray` column (required only; no optional variant is run).
#[test]
fn timestamp_second_single_column() {
    let seconds = (0..SMALL_SIZE as i64).collect::<Vec<i64>>();
    let column: ArrayRef = Arc::new(TimestampSecondArray::from(seconds));
    one_column_roundtrip(column, false);
}
3476
/// Round-trips the values `0..SMALL_SIZE` as a required
/// `TimestampMillisecondArray` column (required only).
#[test]
fn timestamp_millisecond_single_column() {
    let millis = (0..SMALL_SIZE as i64).collect::<Vec<i64>>();
    let column: ArrayRef = Arc::new(TimestampMillisecondArray::from(millis));
    one_column_roundtrip(column, false);
}
3484
/// Round-trips the values `0..SMALL_SIZE` as a required
/// `TimestampMicrosecondArray` column (required only).
#[test]
fn timestamp_microsecond_single_column() {
    let micros = (0..SMALL_SIZE as i64).collect::<Vec<i64>>();
    let column: ArrayRef = Arc::new(TimestampMicrosecondArray::from(micros));
    one_column_roundtrip(column, false);
}
3492
/// Round-trips the values `0..SMALL_SIZE` as a required
/// `TimestampNanosecondArray` column (required only).
#[test]
fn timestamp_nanosecond_single_column() {
    let nanos = (0..SMALL_SIZE as i64).collect::<Vec<i64>>();
    let column: ArrayRef = Arc::new(TimestampNanosecondArray::from(nanos));
    one_column_roundtrip(column, false);
}
3500
/// Round-trips the values `0..SMALL_SIZE` as required and optional
/// `Date32Array` columns.
#[test]
fn date32_single_column() {
    let upper = SMALL_SIZE as i32;
    required_and_optional::<Date32Array, _>(0..upper);
}
3505
/// Round-trips `SMALL_SIZE` `Date64Array` values spaced 86,400,000 apart
/// (`0, 86400000, 172800000, ...`) as required and optional columns.
#[test]
fn date64_single_column() {
    // Spacing between consecutive values.
    const STEP: i64 = 86400000;
    let values = (0..SMALL_SIZE as i64).map(|index| index * STEP);
    required_and_optional::<Date64Array, _>(values);
}
3513
/// Round-trips the values `0..SMALL_SIZE` as required and optional
/// `Time32SecondArray` columns.
#[test]
fn time32_second_single_column() {
    let upper = SMALL_SIZE as i32;
    required_and_optional::<Time32SecondArray, _>(0..upper);
}
3518
/// Round-trips the values `0..SMALL_SIZE` as required and optional
/// `Time32MillisecondArray` columns.
#[test]
fn time32_millisecond_single_column() {
    let upper = SMALL_SIZE as i32;
    required_and_optional::<Time32MillisecondArray, _>(0..upper);
}
3523
/// Round-trips the values `0..SMALL_SIZE` as required and optional
/// `Time64MicrosecondArray` columns.
#[test]
fn time64_microsecond_single_column() {
    let upper = SMALL_SIZE as i64;
    required_and_optional::<Time64MicrosecondArray, _>(0..upper);
}
3528
/// Round-trips the values `0..SMALL_SIZE` as required and optional
/// `Time64NanosecondArray` columns.
#[test]
fn time64_nanosecond_single_column() {
    let upper = SMALL_SIZE as i64;
    required_and_optional::<Time64NanosecondArray, _>(0..upper);
}
3533
/// Round-trips the values `0..SMALL_SIZE` as required and optional
/// `DurationSecondArray` columns.
#[test]
fn duration_second_single_column() {
    let upper = SMALL_SIZE as i64;
    required_and_optional::<DurationSecondArray, _>(0..upper);
}
3538
/// Round-trips the values `0..SMALL_SIZE` as required and optional
/// `DurationMillisecondArray` columns.
#[test]
fn duration_millisecond_single_column() {
    let upper = SMALL_SIZE as i64;
    required_and_optional::<DurationMillisecondArray, _>(0..upper);
}
3543
/// Round-trips the values `0..SMALL_SIZE` as required and optional
/// `DurationMicrosecondArray` columns.
#[test]
fn duration_microsecond_single_column() {
    let upper = SMALL_SIZE as i64;
    required_and_optional::<DurationMicrosecondArray, _>(0..upper);
}
3548
/// Round-trips the values `0..SMALL_SIZE` as required and optional
/// `DurationNanosecondArray` columns.
#[test]
fn duration_nanosecond_single_column() {
    let upper = SMALL_SIZE as i64;
    required_and_optional::<DurationNanosecondArray, _>(0..upper);
}
3553
/// Round-trips the values `0..SMALL_SIZE` as required and optional
/// `IntervalYearMonthArray` columns.
#[test]
fn interval_year_month_single_column() {
    let upper = SMALL_SIZE as i32;
    required_and_optional::<IntervalYearMonthArray, _>(0..upper);
}
3558
/// Round-trips four `IntervalDayTime` values, including negative components,
/// as required and optional `IntervalDayTimeArray` columns.
#[test]
fn interval_day_time_single_column() {
    let intervals = vec![
        IntervalDayTime::new(0, 1),
        IntervalDayTime::new(0, 3),
        IntervalDayTime::new(3, -2),
        IntervalDayTime::new(-200, 4),
    ];
    required_and_optional::<IntervalDayTimeArray, _>(intervals);
}
3568
/// Expects writing an `IntervalMonthDayNanoArray` column to panic with the
/// "not yet implemented" message named in `should_panic`.
#[test]
#[should_panic(
    expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
)]
fn interval_month_day_nano_single_column() {
    let intervals = vec![
        IntervalMonthDayNano::new(0, 1, 5),
        IntervalMonthDayNano::new(0, 3, 2),
        IntervalMonthDayNano::new(3, -2, -5),
        IntervalMonthDayNano::new(-200, 4, -1),
    ];
    required_and_optional::<IntervalMonthDayNanoArray, _>(intervals);
}
3581
/// Round-trips a required `BinaryArray` column of `SMALL_SIZE` rows, each
/// holding the same byte sequence `0..SMALL_SIZE`.
#[test]
fn binary_single_column() {
    let row_bytes: Vec<u8> = (0..SMALL_SIZE as u8).collect();
    let rows = vec![row_bytes; SMALL_SIZE];
    let row_slices = rows.iter().map(|row| &row[..]);

    values_required::<BinaryArray, _>(row_slices);
}
3591
/// Round-trips a required `BinaryViewArray` column of `SMALL_SIZE` rows, each
/// holding the same byte sequence `0..SMALL_SIZE`.
#[test]
fn binary_view_single_column() {
    let row_bytes: Vec<u8> = (0..SMALL_SIZE as u8).collect();
    let rows = vec![row_bytes; SMALL_SIZE];
    let row_slices = rows.iter().map(|row| &row[..]);

    values_required::<BinaryViewArray, _>(row_slices);
}
3601
/// Round-trips an Int32 column with bloom filters enabled and positioned at
/// `BloomFilterPosition::End`, then checks that the written values are found
/// and nine values above the written range are not.
#[test]
fn i32_column_bloom_filter_at_end() {
    let upper = SMALL_SIZE as i32;
    let column = Arc::new(Int32Array::from_iter(0..upper));
    let options = RoundTripOptions {
        bloom_filter: true,
        bloom_filter_position: BloomFilterPosition::End,
        ..RoundTripOptions::new(column, false)
    };

    let files = one_column_roundtrip_with_options(options);
    let present: Vec<i32> = (0..upper).collect();
    let absent: Vec<i32> = (upper + 1..upper + 10).collect();
    check_bloom_filter(files, "col".to_string(), present, absent);
}
3617
/// Round-trips an Int32 column with bloom filters enabled at the default
/// position, then checks that the written values are found and nine values
/// above the written range are not.
#[test]
fn i32_column_bloom_filter() {
    let upper = SMALL_SIZE as i32;
    let column = Arc::new(Int32Array::from_iter(0..upper));
    let options = RoundTripOptions {
        bloom_filter: true,
        ..RoundTripOptions::new(column, false)
    };

    let files = one_column_roundtrip_with_options(options);
    let present: Vec<i32> = (0..upper).collect();
    let absent: Vec<i32> = (upper + 1..upper + 10).collect();
    check_bloom_filter(files, "col".to_string(), present, absent);
}
3632
/// Runs the Int32 bloom-filter round trip twice with an explicit maximum NDV:
/// first 1,000,000 (far above the number of values written), then 3 (below
/// it). The same presence/absence checks must hold in both cases.
#[test]
fn i32_column_bloom_filter_fixed_ndv() {
    let upper = SMALL_SIZE as i32;
    let column = Arc::new(Int32Array::from_iter(0..upper));

    for ndv in [1_000_000_u64, 3] {
        let options = RoundTripOptions {
            bloom_filter: true,
            bloom_filter_ndv: Some(ndv),
            ..RoundTripOptions::new(column.clone(), false)
        };

        let files = one_column_roundtrip_with_options(options);
        let present: Vec<i32> = (0..upper).collect();
        let absent: Vec<i32> = (upper + 1..upper + 10).collect();
        check_bloom_filter(files, "col".to_string(), present, absent);
    }
}
3667
/// Round-trips a required `BinaryArray` column with bloom filters enabled and
/// checks that the written byte strings are found while the single byte
/// `SMALL_SIZE + 1` is not.
#[test]
fn binary_column_bloom_filter() {
    let row_bytes: Vec<u8> = (0..SMALL_SIZE as u8).collect();
    let rows = vec![row_bytes; SMALL_SIZE];

    let column = Arc::new(BinaryArray::from_iter_values(
        rows.iter().map(|row| &row[..]),
    ));
    let options = RoundTripOptions {
        bloom_filter: true,
        ..RoundTripOptions::new(column, false)
    };

    let files = one_column_roundtrip_with_options(options);
    let absent = vec![vec![(SMALL_SIZE + 1) as u8]];
    check_bloom_filter(files, "col".to_string(), rows, absent);
}
3686
/// Round-trips a required Utf8 column of the decimal strings
/// `"0"..SMALL_SIZE` with bloom filters enabled, then checks that the
/// odd-indexed strings are found and the empty string is not.
///
/// NOTE(review): despite the name, the column written here is non-nullable
/// and contains no nulls; only the probed values skip the even indices.
#[test]
fn empty_string_null_column_bloom_filter() {
    let texts: Vec<String> = (0..SMALL_SIZE).map(|index| index.to_string()).collect();

    let column = Arc::new(StringArray::from_iter_values(
        texts.iter().map(String::as_str),
    ));
    let options = RoundTripOptions {
        bloom_filter: true,
        ..RoundTripOptions::new(column, false)
    };

    let files = one_column_roundtrip_with_options(options);

    // Indices 1, 3, 5, ...
    let odd_indexed: Vec<&str> = texts
        .iter()
        .skip(1)
        .step_by(2)
        .map(String::as_str)
        .collect();
    check_bloom_filter(files, "col".to_string(), odd_indexed, vec![""]);
}
3706
/// Feeds `SMALL_SIZE` identical byte strings to `values_required` as a
/// `LargeBinaryArray`.
#[test]
fn large_binary_single_column() {
    let payload: Vec<u8> = (0..SMALL_SIZE as u8).collect();
    let rows: Vec<Vec<u8>> = vec![payload; SMALL_SIZE];

    values_required::<LargeBinaryArray, _>(rows.iter().map(Vec::as_slice));
}
3716
/// Round-trips a four-row fixed-size (4 byte) binary column whose second row is null.
#[test]
fn fixed_size_binary_single_column() {
    let mut builder = FixedSizeBinaryBuilder::new(4);
    for slot in [Some(b"0123"), None, Some(b"8910"), Some(b"1112")] {
        match slot {
            Some(bytes) => builder.append_value(bytes).unwrap(),
            None => builder.append_null(),
        }
    }

    one_column_roundtrip(Arc::new(builder.finish()), true);
}
3728
/// Runs `required_and_optional` over the decimal renderings of `0..SMALL_SIZE`
/// as a `StringArray`.
#[test]
fn string_single_column() {
    let texts: Vec<String> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();

    required_and_optional::<StringArray, _>(texts.iter().map(String::as_str));
}
3736
/// Runs `required_and_optional` over the decimal renderings of `0..SMALL_SIZE`
/// as a `LargeStringArray`.
#[test]
fn large_string_single_column() {
    let texts: Vec<String> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();

    required_and_optional::<LargeStringArray, _>(texts.iter().map(String::as_str));
}
3744
/// Runs `required_and_optional` over the decimal renderings of `0..SMALL_SIZE`
/// as a `StringViewArray`.
#[test]
fn string_view_single_column() {
    let texts: Vec<String> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();

    required_and_optional::<StringViewArray, _>(texts.iter().map(String::as_str));
}
3752
/// Round-trips a nullable `List<Null>` column with three rows: an empty list,
/// a null list, and a list holding two null elements.
#[test]
fn null_list_single_column() {
    let list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Null, true)));
    let schema = Schema::new(vec![Field::new("emptylist", list_type.clone(), true)]);

    // Offsets [0, 0, 0, 2]: rows 0 and 1 are empty, row 2 spans both child slots.
    let offsets = arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice());
    let list_data = ArrayData::builder(list_type)
        .len(3)
        .add_buffer(offsets)
        // Validity bits (LSB first): rows 0 and 2 valid, row 1 null.
        .null_bit_buffer(Some(Buffer::from([0b00000101])))
        .add_child_data(NullArray::new(2).into_data())
        .build()
        .unwrap();

    let list = ListArray::from(list_data);

    for (row, expected_valid) in [true, false, true].into_iter().enumerate() {
        assert_eq!(list.is_valid(row), expected_valid);
    }

    assert_eq!(list.value(0).len(), 0);
    let last = list.value(2);
    assert_eq!(last.len(), 2);
    assert_eq!(last.logical_nulls().unwrap().null_count(), 2);

    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list)]).unwrap();
    roundtrip(batch, None);
}
3787
/// Round-trips a five-row `List<Int32>` column in which exactly one row is null.
#[test]
fn list_single_column() {
    let element_field = Arc::new(Field::new_list_field(DataType::Int32, false));
    let elements = Int32Array::from((1..=10).collect::<Vec<i32>>());
    let offsets = Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());

    let list_data = ArrayData::builder(DataType::List(element_field))
        .len(5)
        .add_buffer(offsets)
        // Validity bits (LSB first) 1,1,0,1,1: only row 2 is null.
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .add_child_data(elements.into_data())
        .build()
        .unwrap();

    assert_eq!(list_data.null_count(), 1);

    one_column_roundtrip(Arc::new(ListArray::from(list_data)), true);
}
3810
/// Round-trips a five-row `LargeList<Int32>` column (64-bit offsets, element
/// field named "large_item") in which exactly one row is null.
#[test]
fn large_list_single_column() {
    let element_field = Arc::new(Field::new("large_item", DataType::Int32, true));
    let elements = Int32Array::from((1..=10).collect::<Vec<i32>>());
    let offsets = Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice());

    let list_data = ArrayData::builder(DataType::LargeList(element_field))
        .len(5)
        .add_buffer(offsets)
        // Validity bits (LSB first) 1,1,0,1,1: only row 2 is null.
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .add_child_data(elements.into_data())
        .build()
        .unwrap();

    assert_eq!(list_data.null_count(), 1);

    one_column_roundtrip(Arc::new(LargeListArray::from(list_data)), true);
}
3835
/// Round-trips the same rows — containing a null list as well as null
/// elements — first as a `ListArray`, then as a `LargeListArray`.
#[test]
fn list_nested_nulls() {
    use arrow::datatypes::Int32Type;

    let rows = vec![
        Some(vec![Some(1)]),
        Some(vec![Some(2), Some(3)]),
        None,
        Some(vec![Some(4), Some(5), None]),
        Some(vec![None]),
        Some(vec![Some(6), Some(7)]),
    ];

    let as_list = ListArray::from_iter_primitive::<Int32Type, _, _>(rows.clone());
    let as_large_list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(rows);

    one_column_roundtrip(Arc::new(as_list), true);
    one_column_roundtrip(Arc::new(as_large_list), true);
}
3854
/// Round-trips a struct column with a single non-nullable `Int32` child "f"
/// holding the values 1 through 10.
#[test]
fn struct_single_column() {
    let child_field = Arc::new(Field::new("f", DataType::Int32, false));
    let child_values = Arc::new(Int32Array::from((1..=10).collect::<Vec<i32>>())) as ArrayRef;

    let column = StructArray::from(vec![(child_field, child_values)]);
    one_column_roundtrip(Arc::new(column), false);
}
3864
/// Writes a list and a map column whose Arrow child names are "item",
/// "entries", "keys" and "values" with `set_coerce_types(true)`, and asserts
/// that the Parquet schema uses "element", "key_value", "key" and "value"
/// instead — both in the metadata returned by `close` and after re-opening
/// the file.
#[test]
fn list_and_map_coerced_names() {
    let list_field =
        Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
    let map_field = Field::new_map(
        "my_map",
        "entries",
        Field::new("keys", DataType::Int32, false),
        Field::new("values", DataType::Int32, true),
        false,
        true,
    );

    let columns = vec![
        create_random_array(&list_field, 100, 0.0, 0.0).unwrap(),
        create_random_array(&map_field, 100, 0.0, 0.0).unwrap(),
    ];
    let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));

    let props = Some(WriterProperties::builder().set_coerce_types(true).build());
    let file = tempfile::tempfile().unwrap();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();

    let batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
    writer.write(&batch).unwrap();
    let written = writer.close().unwrap();

    // First pass: the schema reported by the writer itself.
    let written_columns = written.file_metadata().schema().get_fields();
    let repeated_list = &written_columns[0].get_fields()[0];
    assert_eq!(repeated_list.get_fields()[0].name(), "element");

    let key_value = &written_columns[1].get_fields()[0];
    assert_eq!(key_value.name(), "key_value");
    assert_eq!(key_value.get_fields()[0].name(), "key");
    assert_eq!(key_value.get_fields()[1].name(), "value");

    // Second pass: the schema as read back from the file.
    let reader = SerializedFileReader::new(file).unwrap();
    let read_columns = reader.metadata().file_metadata().schema().get_fields();
    let repeated_list = &read_columns[0].get_fields()[0];
    assert_eq!(repeated_list.get_fields()[0].name(), "element");

    let key_value = &read_columns[1].get_fields()[0];
    assert_eq!(key_value.name(), "key_value");
    assert_eq!(key_value.get_fields()[0].name(), "key");
    assert_eq!(key_value.get_fields()[1].name(), "value");
}
3918
/// Round-trips a required string column with dictionary encoding disabled,
/// for both delta byte-array encodings and several row group sizes, using a
/// 32-byte data page size limit and a write batch size of 16. The validation
/// callback compares the string values of the two arrays it is given.
#[test]
fn fallback_flush_data_page() {
    let texts: Vec<String> = (0..MEDIUM_SIZE).map(|i| i.to_string()).collect();
    let column = Arc::new(StringArray::from(texts));
    let schema = Arc::new(Schema::new(vec![Field::new(
        "col",
        column.data_type().clone(),
        false,
    )]));
    let expected_batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let data_page_size_limit: usize = 32;
    let write_batch_size: usize = 16;

    for encoding in [
        Encoding::DELTA_BYTE_ARRAY,
        Encoding::DELTA_LENGTH_BYTE_ARRAY,
    ] {
        for row_group_size in [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10] {
            let props = WriterProperties::builder()
                .set_writer_version(WriterVersion::PARQUET_2_0)
                .set_max_row_group_row_count(Some(row_group_size))
                .set_dictionary_enabled(false)
                .set_encoding(encoding)
                .set_data_page_size_limit(data_page_size_limit)
                .set_write_batch_size(write_batch_size)
                .build();

            roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
                let left = StringArray::from(a.clone());
                let right = StringArray::from(b.clone());
                let left_values: Vec<&str> = left.iter().map(Option::unwrap).collect();
                let right_values: Vec<&str> = right.iter().map(Option::unwrap).collect();
                assert_eq!(
                    left_values, right_values,
                    "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
                );
            });
        }
    }
}
3960
/// Round-trips an `Int32`-keyed string dictionary column (with one null
/// entry) against an explicit schema built with `Field::new_dict`.
#[test]
fn arrow_writer_string_dictionary() {
    let dictionary_type =
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    // `Field::new_dict` is used under `allow(deprecated)`, as in the original test.
    #[allow(deprecated)]
    let dictionary_field = Field::new_dict("dictionary", dictionary_type, true, 42, true);
    let schema = Arc::new(Schema::new(vec![dictionary_field]));

    let column: Int32DictionaryArray = vec![Some("alpha"), None, Some("beta"), Some("alpha")]
        .into_iter()
        .collect();

    one_column_roundtrip_with_schema(Arc::new(column), schema);
}
3982
/// Checks that one writer accepts a second batch whose column type differs
/// from the writer's schema, for pairs of string-like, binary-like,
/// dictionary and list types. In every case the data read back must use the
/// type of the first batch and contain both batches' values.
#[test]
fn arrow_writer_test_type_compatibility() {
    /// Writes `array1` and then `array2` (each as a single required column
    /// "a") through a writer created with `array1`'s schema, reads the first
    /// batch back and asserts it equals `expected_result`.
    fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
    where
        T1: Array + 'static,
        T2: Array + 'static,
    {
        let single_column_schema = |data_type: &DataType| {
            Arc::new(Schema::new(vec![Field::new("a", data_type.clone(), false)]))
        };
        let writer_schema = single_column_schema(array1.data_type());

        let file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), writer_schema.clone(), None)
                .unwrap();

        let first = RecordBatch::try_new(writer_schema.clone(), vec![Arc::new(array1)]).unwrap();
        writer.write(&first).unwrap();

        let second_schema = single_column_schema(array2.data_type());
        let second = RecordBatch::try_new(second_schema, vec![Arc::new(array2)]).unwrap();
        writer.write(&second).unwrap();

        writer.close().unwrap();

        let actual = ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024)
            .unwrap()
            .next()
            .unwrap()
            .unwrap();

        let expected =
            RecordBatch::try_new(writer_schema, vec![Arc::new(expected_result)]).unwrap();
        assert_eq!(actual, expected);
    }

    const FIRST: &str = "parquet";
    const SECOND: &str = "barquet";

    // Builds a dictionary array with u8 keys and Utf8 values.
    let utf8_dict = |keys: Vec<u8>, values: Vec<&str>| {
        DictionaryArray::new(
            UInt8Array::from_iter_values(keys),
            Arc::new(StringArray::from_iter_values(values)),
        )
    };

    // Dictionary first, plain strings second.
    ensure_compatible_write(
        utf8_dict(vec![0], vec![FIRST]),
        StringArray::from_iter_values([SECOND]),
        utf8_dict(vec![0, 1], vec![FIRST, SECOND]),
    );

    // Plain strings first, dictionary second.
    ensure_compatible_write(
        StringArray::from_iter_values([FIRST]),
        utf8_dict(vec![0], vec![SECOND]),
        StringArray::from_iter_values([FIRST, SECOND]),
    );

    // Dictionaries with different key types (u8 then u16).
    ensure_compatible_write(
        utf8_dict(vec![0], vec![FIRST]),
        DictionaryArray::new(
            UInt16Array::from_iter_values(vec![0]),
            Arc::new(StringArray::from_iter_values([SECOND])),
        ),
        utf8_dict(vec![0, 1], vec![FIRST, SECOND]),
    );

    // Dictionaries with different value types (Utf8 then LargeUtf8).
    ensure_compatible_write(
        utf8_dict(vec![0], vec![FIRST]),
        DictionaryArray::new(
            UInt8Array::from_iter_values(vec![0]),
            Arc::new(LargeStringArray::from_iter_values([SECOND])),
        ),
        utf8_dict(vec![0, 1], vec![FIRST, SECOND]),
    );

    // Dictionary of Utf8 first, plain LargeUtf8 second.
    ensure_compatible_write(
        utf8_dict(vec![0], vec![FIRST]),
        LargeStringArray::from_iter_values([SECOND]),
        utf8_dict(vec![0, 1], vec![FIRST, SECOND]),
    );

    // Every ordered pair of the three string array types.
    ensure_compatible_write(
        StringArray::from_iter_values([FIRST]),
        LargeStringArray::from_iter_values([SECOND]),
        StringArray::from_iter_values([FIRST, SECOND]),
    );

    ensure_compatible_write(
        LargeStringArray::from_iter_values([FIRST]),
        StringArray::from_iter_values([SECOND]),
        LargeStringArray::from_iter_values([FIRST, SECOND]),
    );

    ensure_compatible_write(
        StringArray::from_iter_values([FIRST]),
        StringViewArray::from_iter_values([SECOND]),
        StringArray::from_iter_values([FIRST, SECOND]),
    );

    ensure_compatible_write(
        StringViewArray::from_iter_values([FIRST]),
        StringArray::from_iter_values([SECOND]),
        StringViewArray::from_iter_values([FIRST, SECOND]),
    );

    ensure_compatible_write(
        LargeStringArray::from_iter_values([FIRST]),
        StringViewArray::from_iter_values([SECOND]),
        LargeStringArray::from_iter_values([FIRST, SECOND]),
    );

    ensure_compatible_write(
        StringViewArray::from_iter_values([FIRST]),
        LargeStringArray::from_iter_values([SECOND]),
        StringViewArray::from_iter_values([FIRST, SECOND]),
    );

    // Every ordered pair of the three binary array types.
    ensure_compatible_write(
        BinaryArray::from_iter_values([FIRST.as_bytes()]),
        LargeBinaryArray::from_iter_values([SECOND.as_bytes()]),
        BinaryArray::from_iter_values([FIRST.as_bytes(), SECOND.as_bytes()]),
    );

    ensure_compatible_write(
        LargeBinaryArray::from_iter_values([FIRST.as_bytes()]),
        BinaryArray::from_iter_values([SECOND.as_bytes()]),
        LargeBinaryArray::from_iter_values([FIRST.as_bytes(), SECOND.as_bytes()]),
    );

    ensure_compatible_write(
        BinaryArray::from_iter_values([FIRST.as_bytes()]),
        BinaryViewArray::from_iter_values([SECOND.as_bytes()]),
        BinaryArray::from_iter_values([FIRST.as_bytes(), SECOND.as_bytes()]),
    );

    ensure_compatible_write(
        BinaryViewArray::from_iter_values([FIRST.as_bytes()]),
        BinaryArray::from_iter_values([SECOND.as_bytes()]),
        BinaryViewArray::from_iter_values([FIRST.as_bytes(), SECOND.as_bytes()]),
    );

    ensure_compatible_write(
        BinaryViewArray::from_iter_values([FIRST.as_bytes()]),
        LargeBinaryArray::from_iter_values([SECOND.as_bytes()]),
        BinaryViewArray::from_iter_values([FIRST.as_bytes(), SECOND.as_bytes()]),
    );

    ensure_compatible_write(
        LargeBinaryArray::from_iter_values([FIRST.as_bytes()]),
        BinaryViewArray::from_iter_values([SECOND.as_bytes()]),
        LargeBinaryArray::from_iter_values([FIRST.as_bytes(), SECOND.as_bytes()]),
    );

    // Lists whose element fields differ only in metadata: the first batch's
    // element field carries a Parquet field id, the second batch's does not.
    let plain_element = Arc::new(Field::new_list_field(DataType::Int32, false));
    let tagged_element = Arc::new(plain_element.as_ref().clone().with_metadata(
        HashMap::from_iter(vec![(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            "1".to_string(),
        )]),
    ));

    let first_values = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4]));
    let first_offsets = OffsetBuffer::new(vec![0, 2, 5].into());

    let second_values = Arc::new(Int32Array::from(vec![5, 6, 7, 8, 9]));
    let second_offsets = OffsetBuffer::new(vec![0, 3, 5].into());

    let all_values = Arc::new(Int32Array::from((0..10).collect::<Vec<i32>>()));
    let all_offsets = OffsetBuffer::new(vec![0, 2, 5, 8, 10].into());

    ensure_compatible_write(
        ListArray::try_new(
            Arc::clone(&tagged_element),
            first_offsets,
            first_values,
            None,
        )
        .unwrap(),
        ListArray::try_new(plain_element, second_offsets, second_values, None).unwrap(),
        ListArray::try_new(tagged_element, all_offsets, all_values, None).unwrap(),
    );
}
4213
/// Round-trips a `UInt8`-keyed dictionary of `UInt32` values (with one null
/// entry) against an explicit schema built with `Field::new_dict`.
#[test]
fn arrow_writer_primitive_dictionary() {
    let dictionary_type =
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32));
    // `Field::new_dict` is used under `allow(deprecated)`, as in the original test.
    #[allow(deprecated)]
    let dictionary_field = Field::new_dict("dictionary", dictionary_type, true, 42, true);
    let schema = Arc::new(Schema::new(vec![dictionary_field]));

    let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
    for entry in [Some(12345678), None, Some(22345678), Some(12345678)] {
        match entry {
            Some(value) => {
                builder.append(value).unwrap();
            }
            None => builder.append_null(),
        }
    }

    one_column_roundtrip_with_schema(Arc::new(builder.finish()), schema);
}
4236
/// Round-trips a `UInt8`-keyed dictionary of `Decimal32` values twice: once
/// with precision 5 and once with precision 9 (scale 2 in both cases).
#[test]
fn arrow_writer_decimal32_dictionary() {
    let unscaled = vec![12345, 56789, 34567];
    let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

    let precision_5 = Decimal32Array::from(unscaled.clone())
        .with_precision_and_scale(5, 2)
        .unwrap();
    let dictionary = DictionaryArray::new(keys, Arc::new(precision_5));
    one_column_roundtrip(Arc::new(dictionary.clone()), true);

    // Same keys, values re-typed with a larger precision.
    let precision_9 = Decimal32Array::from(unscaled)
        .with_precision_and_scale(9, 2)
        .unwrap();
    one_column_roundtrip(Arc::new(dictionary.with_values(Arc::new(precision_9))), true);
}
4257
/// Round-trips a `UInt8`-keyed dictionary of `Decimal64` values twice: once
/// with precision 5 and once with precision 12 (scale 2 in both cases).
#[test]
fn arrow_writer_decimal64_dictionary() {
    let unscaled = vec![12345, 56789, 34567];
    let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

    let precision_5 = Decimal64Array::from(unscaled.clone())
        .with_precision_and_scale(5, 2)
        .unwrap();
    let dictionary = DictionaryArray::new(keys, Arc::new(precision_5));
    one_column_roundtrip(Arc::new(dictionary.clone()), true);

    // Same keys, values re-typed with a larger precision.
    let precision_12 = Decimal64Array::from(unscaled)
        .with_precision_and_scale(12, 2)
        .unwrap();
    one_column_roundtrip(Arc::new(dictionary.with_values(Arc::new(precision_12))), true);
}
4278
/// Round-trips a `UInt8`-keyed dictionary of `Decimal128` values twice: once
/// with precision 5 and once with precision 12 (scale 2 in both cases).
#[test]
fn arrow_writer_decimal128_dictionary() {
    let unscaled = vec![12345, 56789, 34567];
    let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

    let precision_5 = Decimal128Array::from(unscaled.clone())
        .with_precision_and_scale(5, 2)
        .unwrap();
    let dictionary = DictionaryArray::new(keys, Arc::new(precision_5));
    one_column_roundtrip(Arc::new(dictionary.clone()), true);

    // Same keys, values re-typed with a larger precision.
    let precision_12 = Decimal128Array::from(unscaled)
        .with_precision_and_scale(12, 2)
        .unwrap();
    one_column_roundtrip(Arc::new(dictionary.with_values(Arc::new(precision_12))), true);
}
4299
/// Round-trips a `UInt8`-keyed dictionary of `Decimal256` values twice: once
/// with precision 5 and once with precision 12 (scale 2 in both cases).
#[test]
fn arrow_writer_decimal256_dictionary() {
    let unscaled: Vec<i256> = [12345, 56789, 34567]
        .into_iter()
        .map(i256::from_i128)
        .collect();
    let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);

    let precision_5 = Decimal256Array::from(unscaled.clone())
        .with_precision_and_scale(5, 2)
        .unwrap();
    let dictionary = DictionaryArray::new(keys, Arc::new(precision_5));
    one_column_roundtrip(Arc::new(dictionary.clone()), true);

    // Same keys, values re-typed with a larger precision.
    let precision_12 = Decimal256Array::from(unscaled)
        .with_precision_and_scale(12, 2)
        .unwrap();
    one_column_roundtrip(Arc::new(dictionary.with_values(Arc::new(precision_12))), true);
}
4324
/// Round-trips a `UInt8`-keyed string dictionary column (with one null
/// entry) against an explicit schema built with `Field::new_dict`.
#[test]
fn arrow_writer_string_dictionary_unsigned_index() {
    let dictionary_type =
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
    // `Field::new_dict` is used under `allow(deprecated)`, as in the original test.
    #[allow(deprecated)]
    let dictionary_field = Field::new_dict("dictionary", dictionary_type, true, 42, true);
    let schema = Arc::new(Schema::new(vec![dictionary_field]));

    let column: UInt8DictionaryArray = vec![Some("alpha"), None, Some("beta"), Some("alpha")]
        .into_iter()
        .collect();

    one_column_roundtrip_with_schema(Arc::new(column), schema);
}
4345
/// Writes `u32` values around 0, `i32::MAX` and `u32::MAX`, then checks that
/// each row group's `Int32` statistics, cast back to `u32`, equal the minimum
/// and maximum of the source values that fall into that row group.
#[test]
fn u32_min_max() {
    let src = [
        u32::MIN,
        u32::MIN + 1,
        (i32::MAX as u32) - 1,
        i32::MAX as u32,
        (i32::MAX as u32) + 1,
        u32::MAX - 1,
        u32::MAX,
    ];
    let column = UInt32Array::from_iter_values(src.iter().copied());

    for file in one_column_roundtrip(Arc::new(column), false) {
        let reader = SerializedFileReader::new(file).unwrap();
        let metadata = reader.metadata();

        // Index into `src` of the first value belonging to the current row group.
        let mut consumed = 0;
        for row_group in metadata.row_groups() {
            assert_eq!(row_group.num_columns(), 1);
            let column = row_group.column(0);

            let count = column.num_values() as usize;
            let written = &src[consumed..consumed + count];
            consumed += count;

            let Statistics::Int32(stats) = column.statistics().unwrap() else {
                panic!("Statistics::Int32 missing")
            };
            assert_eq!(
                *stats.min_opt().unwrap() as u32,
                *written.iter().min().unwrap()
            );
            assert_eq!(
                *stats.max_opt().unwrap() as u32,
                *written.iter().max().unwrap()
            );
        }
    }
}
4391
/// Writes `u64` values around 0, `i64::MAX` and `u64::MAX`, then checks that
/// each row group's `Int64` statistics, cast back to `u64`, equal the minimum
/// and maximum of the source values that fall into that row group.
#[test]
fn u64_min_max() {
    let src = [
        u64::MIN,
        u64::MIN + 1,
        (i64::MAX as u64) - 1,
        i64::MAX as u64,
        (i64::MAX as u64) + 1,
        u64::MAX - 1,
        u64::MAX,
    ];
    let column = UInt64Array::from_iter_values(src.iter().copied());

    for file in one_column_roundtrip(Arc::new(column), false) {
        let reader = SerializedFileReader::new(file).unwrap();
        let metadata = reader.metadata();

        // Index into `src` of the first value belonging to the current row group.
        let mut consumed = 0;
        for row_group in metadata.row_groups() {
            assert_eq!(row_group.num_columns(), 1);
            let column = row_group.column(0);

            let count = column.num_values() as usize;
            let written = &src[consumed..consumed + count];
            consumed += count;

            let Statistics::Int64(stats) = column.statistics().unwrap() else {
                panic!("Statistics::Int64 missing")
            };
            assert_eq!(
                *stats.min_opt().unwrap() as u64,
                *written.iter().min().unwrap()
            );
            assert_eq!(
                *stats.max_opt().unwrap() as u64,
                *written.iter().max().unwrap()
            );
        }
    }
}
4437
/// Writes a column containing only two nulls and checks that every produced
/// file has one row group with one column whose statistics report a null
/// count of 2.
#[test]
fn statistics_null_counts_only_nulls() {
    let all_null = UInt64Array::from(vec![None, None]);

    for file in one_column_roundtrip(Arc::new(all_null), true) {
        let reader = SerializedFileReader::new(file).unwrap();
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let only_group = metadata.row_group(0);
        assert_eq!(only_group.num_columns(), 1);

        let stats = only_group.column(0).statistics().unwrap();
        assert_eq!(stats.null_count_opt(), Some(2));
    }
}
4456
/// Round-trips a list-of-struct column (`a: Int32`, `b: Int32`, both
/// nullable) whose six rows cover a populated list, an empty list, a null
/// list, a list of null structs, and structs with one null child each.
#[test]
fn test_list_of_struct_roundtrip() {
    /// Appends one struct to `structs`: child `a`, child `b` (`None` appends
    /// a null child value), then the struct's own validity flag.
    fn push_struct(structs: &mut StructBuilder, a: Option<i32>, b: Option<i32>, is_valid: bool) {
        for (column, cell) in [a, b].into_iter().enumerate() {
            let child = structs.field_builder::<Int32Builder>(column).unwrap();
            match cell {
                Some(value) => child.append_value(value),
                None => child.append_null(),
            }
        }
        structs.append(is_valid);
    }

    let struct_builder = StructBuilder::new(
        vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
        ],
        vec![
            Box::new(Int32Builder::with_capacity(10)),
            Box::new(Int32Builder::with_capacity(10)),
        ],
    );
    let mut list_builder = ListBuilder::new(struct_builder);

    // Row 0: [{a: 1, b: 2}]
    push_struct(list_builder.values(), Some(1), Some(2), true);
    list_builder.append(true);

    // Row 1: []
    list_builder.append(true);

    // Row 2: null
    list_builder.append(false);

    // Row 3: [null, null]
    push_struct(list_builder.values(), None, None, false);
    push_struct(list_builder.values(), None, None, false);
    list_builder.append(true);

    // Row 4: [{a: null, b: 3}]
    push_struct(list_builder.values(), None, Some(3), true);
    list_builder.append(true);

    // Row 5: [{a: 2, b: null}]
    push_struct(list_builder.values(), Some(2), None, true);
    list_builder.append(true);

    one_column_roundtrip(Arc::new(list_builder.finish()), true);
}
4546
4547 fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
4548 metadata.row_groups().iter().map(|x| x.num_rows()).collect()
4549 }
4550
/// Writes batches of 100, 50 and 300 rows with a maximum of 200 rows per row
/// group and checks that they are regrouped into row groups of 200, 200 and
/// 50 rows, that reading with a batch size of 100 yields batches of
/// 100, 100, 100, 100 and 50 rows, and that the values come back in order.
#[test]
fn test_aggregates_records() {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "int",
        ArrowDataType::Int32,
        false,
    )]));

    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(200))
        .build();

    let file = tempfile::tempfile().unwrap();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();

    let inputs = [
        Int32Array::from((0..100).collect::<Vec<_>>()),
        Int32Array::from((0..50).collect::<Vec<_>>()),
        Int32Array::from((200..500).collect::<Vec<_>>()),
    ];
    for input in inputs {
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(input)]).unwrap();
        writer.write(&batch).unwrap();
    }
    writer.close().unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);

    let batches = builder
        .with_batch_size(100)
        .build()
        .unwrap()
        .collect::<ArrowResult<Vec<_>>>()
        .unwrap();

    assert_eq!(batches.len(), 5);
    assert!(batches.iter().all(|batch| batch.num_columns() == 1));

    let rows_per_batch: Vec<_> = batches.iter().map(|batch| batch.num_rows()).collect();
    assert_eq!(&rows_per_batch, &[100, 100, 100, 100, 50]);

    let mut values = Vec::new();
    for batch in &batches {
        let ints = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        values.extend(ints.values().iter().copied());
    }

    let expected_values: Vec<_> = (0..100).chain(0..50).chain(200..500).collect();
    assert_eq!(&values, &expected_values)
}
4614
/// Writes two batches of a nested `struct_b { list: List<struct_a { leaf_a,
/// leaf_b }> }` column (5 rows and 2 rows) with a maximum of 6 rows per row
/// group, and checks that:
///
/// * the rows are regrouped into row groups of 6 and 1 rows,
/// * reading with a batch size of 2 yields batches of 2, 2, 2 and 1 rows,
/// * the pretty-printed data is identical before writing and after reading.
#[test]
fn complex_aggregate() {
    /// Renders the single-column table that `pretty_format_batches` is
    /// expected to print: a border, the header, a border, one line per row,
    /// and a closing border. Every cell is left-aligned and padded to the
    /// widest cell, with one space on each side.
    ///
    /// The table is built programmatically so that it does not depend on
    /// hand-maintained padding whitespace inside a string literal.
    fn expected_table(header: &str, rows: &[&str]) -> String {
        let width = rows
            .iter()
            .copied()
            .chain([header])
            .map(str::len)
            .max()
            .unwrap_or(0);
        let border = format!("+{}+", "-".repeat(width + 2));

        let mut lines = vec![
            border.clone(),
            format!("| {header:<width$} |"),
            border.clone(),
        ];
        lines.extend(rows.iter().map(|row| format!("| {row:<width$} |")));
        lines.push(border);
        lines.join("\n")
    }

    let field_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
    let field_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
    let struct_a = Arc::new(Field::new(
        "struct_a",
        DataType::Struct(vec![field_a.clone(), field_b.clone()].into()),
        true,
    ));

    let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
    let struct_b = Arc::new(Field::new(
        "struct_b",
        DataType::Struct(vec![list_a.clone()].into()),
        false,
    ));

    let schema = Arc::new(Schema::new(vec![struct_b]));

    // First batch: 5 list rows over 6 child structs.
    let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
    let field_b_array =
        Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);

    let struct_a_array = StructArray::from(vec![
        (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
        (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
    ]);

    // Offsets [0, 1, 1, 3, 3, 5] with rows 1 and 3 null; the last child
    // struct (index 5) is not referenced by any list row.
    let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
        .len(5)
        .add_buffer(Buffer::from_iter(vec![
            0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
        ]))
        .null_bit_buffer(Some(Buffer::from_iter(vec![
            true, false, true, false, true,
        ])))
        .child_data(vec![struct_a_array.into_data()])
        .build()
        .unwrap();

    let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
    let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);

    let batch1 =
        RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
            .unwrap();

    // Second batch: 2 list rows (no nulls) over 5 child structs.
    let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
    let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);

    let struct_a_array = StructArray::from(vec![
        (field_a, Arc::new(field_a_array) as ArrayRef),
        (field_b, Arc::new(field_b_array) as ArrayRef),
    ]);

    let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
        .len(2)
        .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
        .child_data(vec![struct_a_array.into_data()])
        .build()
        .unwrap();

    let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
    let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);

    let batch2 =
        RecordBatch::try_from_iter(vec![("struct_b", Arc::new(struct_b_array) as ArrayRef)])
            .unwrap();

    let batches = &[batch1, batch2];

    // One entry per row across both batches; null lists and null leaves
    // render as empty text.
    let expected = expected_table(
        "struct_b",
        &[
            "{list: [{leaf_a: 1, leaf_b: 1}]}",
            "{list: }",
            "{list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}",
            "{list: }",
            "{list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}",
            "{list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]}",
            "{list: [{leaf_a: 10, leaf_b: }]}",
        ],
    );

    let actual = pretty_format_batches(batches).unwrap().to_string();
    assert_eq!(actual, expected);

    let file = tempfile::tempfile().unwrap();
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(6))
        .build();

    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();

    for batch in batches {
        writer.write(batch).unwrap();
    }
    writer.close().unwrap();

    // 7 rows with at most 6 rows per row group.
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);

    let batches = builder
        .with_batch_size(2)
        .build()
        .unwrap()
        .collect::<ArrowResult<Vec<_>>>()
        .unwrap();

    assert_eq!(batches.len(), 4);
    let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
    assert_eq!(&batch_counts, &[2, 2, 2, 1]);

    let actual = pretty_format_batches(&batches).unwrap().to_string();
    assert_eq!(actual, expected);
}
4742
/// Creates a writer whose schema carries the metadata entry `foo = bar` and
/// writes a batch whose schema has the same field but no metadata. The test
/// passes when both `write` and `close` succeed.
#[test]
fn test_arrow_writer_metadata() {
    let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
    let file_schema = batch_schema
        .clone()
        .with_metadata(std::iter::once(("foo".to_string(), "bar".to_string())).collect());

    let column = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let batch = RecordBatch::try_new(Arc::new(batch_schema), vec![column as _]).unwrap();

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();
}
4763
/// A batch declared non-nullable is written through a writer whose file schema
/// marks the column nullable; the data read back carries the file schema and
/// the original values.
#[test]
fn test_arrow_writer_nullable() {
    let int32_schema =
        |nullable: bool| Schema::new(vec![Field::new("int32", DataType::Int32, nullable)]);
    let required_schema = int32_schema(false);
    let nullable_schema = Arc::new(int32_schema(true));

    let values = Int32Array::from(vec![1, 2, 3, 4]);
    let batch =
        RecordBatch::try_new(Arc::new(required_schema), vec![Arc::new(values) as _]).unwrap();

    let mut sink = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut sink, nullable_schema.clone(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(sink), 1024).unwrap();
    let decoded = reader.next().unwrap().unwrap();

    // The decoded batch adopts the file's schema, which differs from the batch's.
    assert_eq!(decoded.schema(), nullable_schema);
    assert_ne!(decoded.schema(), batch.schema());
    assert_eq!(decoded.column(0).as_ref(), batch.column(0).as_ref());
}
4787
/// Follows `in_progress_size`, `in_progress_rows`, `memory_size` and
/// `bytes_written` through two writes of a five-row batch and a flush.
#[test]
fn in_progress_accounting() {
    // One non-nullable Int32 column holding five values.
    let field = Field::new("a", DataType::Int32, false);
    let column = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let batch =
        RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![Arc::new(column)]).unwrap();

    let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();

    // Nothing is buffered before the first write.
    assert_eq!(writer.in_progress_size(), 0);
    assert_eq!(writer.in_progress_rows(), 0);
    assert_eq!(writer.memory_size(), 0);
    // Four bytes already count as written (presumably the file header magic).
    assert_eq!(writer.bytes_written(), 4);

    writer.write(&batch).unwrap();

    // After one batch: five rows buffered, and the size estimate fits within memory use.
    let initial_size = writer.in_progress_size();
    assert!(initial_size > 0);
    assert_eq!(writer.in_progress_rows(), 5);
    let initial_memory = writer.memory_size();
    assert!(initial_memory > 0);
    assert!(
        initial_size <= initial_memory,
        "{initial_size} <= {initial_memory}"
    );

    // A second batch grows every counter.
    writer.write(&batch).unwrap();
    assert!(writer.in_progress_size() > initial_size);
    assert_eq!(writer.in_progress_rows(), 10);
    assert!(writer.memory_size() > initial_memory);
    assert!(
        writer.in_progress_size() <= writer.memory_size(),
        "in_progress_size {} <= memory_size {}",
        writer.in_progress_size(),
        writer.memory_size()
    );

    // Flushing empties the in-progress state and increases the written byte count.
    let pre_flush_bytes_written = writer.bytes_written();
    writer.flush().unwrap();
    assert_eq!(writer.in_progress_size(), 0);
    assert_eq!(writer.memory_size(), 0);
    assert!(writer.bytes_written() > pre_flush_bytes_written);

    writer.close().unwrap();
}
4841
/// Writes one fully populated column and one all-null column, then checks that
/// the offset index lists exactly one page location for each of them.
#[test]
fn test_writer_all_null() {
    let populated = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let all_null = Int32Array::new(vec![0; 5].into(), Some(NullBuffer::new_null(5)));
    let columns = vec![
        ("a", Arc::new(populated) as ArrayRef),
        ("b", Arc::new(all_null) as ArrayRef),
    ];
    let batch = RecordBatch::try_from_iter(columns).unwrap();

    let mut sink = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut sink, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(Bytes::from(sink), options).unwrap();
    let offsets = reader.metadata().offset_index().unwrap();

    // One outer entry holding two inner entries (one per written column).
    assert_eq!(offsets.len(), 1);
    assert_eq!(offsets[0].len(), 2);
    for column in 0..2 {
        assert_eq!(offsets[0][column].page_locations().len(), 1);
    }
}
4867
/// Statistics are disabled globally but enabled at `Page` level for column `a`:
/// `a` must end up with chunk statistics and a column index, `b` with neither.
#[test]
fn test_disabled_statistics_with_page() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, true),
        Field::new("b", DataType::Utf8, true),
    ]));

    let col_a = StringArray::from(vec!["a", "b", "c", "d"]);
    let col_b = StringArray::from(vec!["w", "x", "y", "z"]);
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(col_a) as _, Arc::new(col_b) as _],
    )
    .unwrap();

    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
        .build();

    let mut sink = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut sink, schema.clone(), Some(props)).unwrap();
    writer.write(&batch).unwrap();

    // Metadata returned by `close`: both columns have an offset index,
    // only column 0 has a column index.
    let written = writer.close().unwrap();
    assert_eq!(written.num_row_groups(), 1);
    let written_group = written.row_group(0);
    assert_eq!(written_group.num_columns(), 2);
    assert!(written_group.column(0).offset_index_offset().is_some());
    assert!(written_group.column(0).column_index_offset().is_some());
    assert!(written_group.column(1).offset_index_offset().is_some());
    assert!(written_group.column(1).column_index_offset().is_none());

    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(Bytes::from(sink), options).unwrap();

    let row_group = reader.get_row_group(0).unwrap();
    let a_col = row_group.metadata().column(0);
    let b_col = row_group.metadata().column(1);

    // Column `a` carries byte-array min/max statistics.
    let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() else {
        panic!("expecting Statistics::ByteArray");
    };
    assert_eq!(byte_array_stats.min_opt().unwrap().as_bytes(), b"a");
    assert_eq!(byte_array_stats.max_opt().unwrap().as_bytes(), b"d");

    // Column `b` has no statistics at all.
    assert!(b_col.statistics().is_none());

    let offset_index = reader.metadata().offset_index().unwrap();
    assert_eq!(offset_index.len(), 1);
    assert_eq!(offset_index[0].len(), 2);

    let column_index = reader.metadata().column_index().unwrap();
    assert_eq!(column_index.len(), 1);
    assert_eq!(column_index[0].len(), 2);

    let a_idx = &column_index[0][0];
    assert!(
        matches!(a_idx, ColumnIndexMetaData::BYTE_ARRAY(_)),
        "{a_idx:?}"
    );
    let b_idx = &column_index[0][1];
    assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
}
4942
/// Statistics are disabled globally but enabled at `Chunk` level for column `a`:
/// `a` must end up with chunk statistics but no column index, `b` with neither.
#[test]
fn test_disabled_statistics_with_chunk() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, true),
        Field::new("b", DataType::Utf8, true),
    ]));

    let col_a = StringArray::from(vec!["a", "b", "c", "d"]);
    let col_b = StringArray::from(vec!["w", "x", "y", "z"]);
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(col_a) as _, Arc::new(col_b) as _],
    )
    .unwrap();

    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
        .build();

    let mut sink = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut sink, schema.clone(), Some(props)).unwrap();
    writer.write(&batch).unwrap();

    // Metadata returned by `close`: both columns have an offset index,
    // neither has a column index.
    let written = writer.close().unwrap();
    assert_eq!(written.num_row_groups(), 1);
    let written_group = written.row_group(0);
    assert_eq!(written_group.num_columns(), 2);
    assert!(written_group.column(0).offset_index_offset().is_some());
    assert!(written_group.column(0).column_index_offset().is_none());
    assert!(written_group.column(1).offset_index_offset().is_some());
    assert!(written_group.column(1).column_index_offset().is_none());

    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(Bytes::from(sink), options).unwrap();

    let row_group = reader.get_row_group(0).unwrap();
    let a_col = row_group.metadata().column(0);
    let b_col = row_group.metadata().column(1);

    // Column `a` carries byte-array min/max statistics.
    let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() else {
        panic!("expecting Statistics::ByteArray");
    };
    assert_eq!(byte_array_stats.min_opt().unwrap().as_bytes(), b"a");
    assert_eq!(byte_array_stats.max_opt().unwrap().as_bytes(), b"d");

    // Column `b` has no statistics at all.
    assert!(b_col.statistics().is_none());

    let column_index = reader.metadata().column_index().unwrap();
    assert_eq!(column_index.len(), 1);
    assert_eq!(column_index[0].len(), 2);

    let a_idx = &column_index[0][0];
    assert!(matches!(a_idx, ColumnIndexMetaData::NONE), "{a_idx:?}");
    let b_idx = &column_index[0][1];
    assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
}
5010
/// With `with_skip_arrow_metadata(true)` the file must still read back with the
/// same schema, and its key/value metadata (if any) must not contain
/// `ARROW_SCHEMA_META_KEY`.
#[test]
fn test_arrow_writer_skip_metadata() {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "int32",
        DataType::Int32,
        false,
    )]));
    let values = Int32Array::from(vec![1, 2, 3, 4]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as _]).unwrap();

    let options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);

    let mut sink = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new_with_options(&mut sink, schema.clone(), options).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let reader_builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(sink)).unwrap();
    assert_eq!(schema, *reader_builder.schema());

    // Absent key/value metadata trivially satisfies the check.
    let has_arrow_schema_key = reader_builder
        .metadata()
        .file_metadata()
        .key_value_metadata()
        .is_some_and(|entries| {
            entries
                .iter()
                .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY)
        });
    assert!(!has_arrow_schema_key);
}
5044
/// Writing the same batch with `set_write_path_in_schema(false)` must produce
/// a strictly smaller file than writing it with default options.
#[test]
fn test_arrow_writer_skip_path_in_schema() {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "int32",
        DataType::Int32,
        false,
    )]));
    let values = Int32Array::from(vec![1, 2, 3, 4]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as _]).unwrap();

    // Serializes `batch` with the given options and returns the file bytes.
    let encode = |options: ArrowWriterOptions| {
        let mut sink = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new_with_options(&mut sink, schema.clone(), options).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        sink
    };

    let with_default_options = encode(ArrowWriterOptions::new());

    let without_path = encode(
        ArrowWriterOptions::new().with_properties(
            WriterProperties::builder()
                .set_write_path_in_schema(false)
                .build(),
        ),
    );

    assert!(with_default_options.len() > without_path.len());
}
5082
/// Writing an Int32 batch into a writer created for a Float64 column must fail
/// with the exact "Incompatible type" error message.
#[test]
fn mismatched_schemas() {
    let file_schema = Arc::new(Schema::new(vec![Field::new(
        "temperature",
        DataType::Float64,
        false,
    )]));
    let batch_schema = Arc::new(Schema::new(vec![Field::new(
        "count",
        DataType::Int32,
        false,
    )]));

    let counts = Int32Array::from(vec![1, 2, 3, 4]);
    let batch = RecordBatch::try_new(batch_schema, vec![Arc::new(counts) as _]).unwrap();

    let mut sink = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut sink, file_schema, None).unwrap();

    let message = writer.write(&batch).unwrap_err().to_string();
    assert_eq!(
        message,
        "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
    );
}
5107
/// A zero-row batch with an empty schema round-trips: the schema is preserved
/// and reading yields no batches.
#[test]
fn test_roundtrip_empty_schema() {
    // With no columns, the row count has to be supplied explicitly.
    let zero_rows = RecordBatchOptions::default().with_row_count(Some(0));
    let empty_batch =
        RecordBatch::try_new_with_options(Arc::new(Schema::empty()), vec![], &zero_rows).unwrap();

    let mut parquet_bytes: Vec<u8> = Vec::new();
    let mut writer =
        ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
    writer.write(&empty_batch).unwrap();
    writer.close().unwrap();

    let reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(parquet_bytes)).unwrap();
    assert_eq!(reader.schema(), &empty_batch.schema());

    let batches = reader
        .build()
        .unwrap()
        .collect::<ArrowResult<Vec<_>>>()
        .unwrap();
    assert!(batches.is_empty());
}
5137
/// Even with page-level statistics enabled, the first page header carries no
/// statistics unless `set_write_page_header_statistics` is turned on.
#[test]
fn test_page_stats_not_written_by_default() {
    let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
    let strings = StringArray::from(vec!["Blart Versenwald III"]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(strings)]).unwrap();

    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Page)
        .set_dictionary_enabled(false)
        .set_encoding(Encoding::PLAIN)
        .set_compression(crate::basic::Compression::UNCOMPRESSED)
        .build();

    let file = roundtrip_opts(&batch, props);

    // Skip the first four bytes of the file (presumably the magic) and decode
    // the page header that follows.
    let mut prot = ThriftSliceInputProtocol::new(&file[4..]);
    let header = PageHeader::read_thrift(&mut prot).unwrap();

    assert!(header.data_page_header.unwrap().statistics.is_none());
}
5166
/// With `set_write_page_header_statistics(true)` the first page header carries
/// exact min/max statistics for the written strings.
#[test]
fn test_page_stats_when_enabled() {
    let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
    let strings = StringArray::from(vec!["Blart Versenwald III", "Andrew Lamb"]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(strings)]).unwrap();

    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Page)
        .set_dictionary_enabled(false)
        .set_encoding(Encoding::PLAIN)
        .set_write_page_header_statistics(true)
        .set_compression(crate::basic::Compression::UNCOMPRESSED)
        .build();

    let file = roundtrip_opts(&batch, props);

    // Skip the first four bytes of the file (presumably the magic) and decode
    // the page header that follows.
    let mut prot = ThriftSliceInputProtocol::new(&file[4..]);
    let header = PageHeader::read_thrift(&mut prot).unwrap();
    let page_stats = header.data_page_header.unwrap().statistics.unwrap();

    assert!(page_stats.is_max_value_exact.unwrap());
    assert!(page_stats.is_min_value_exact.unwrap());
    assert_eq!(
        page_stats.max_value.unwrap(),
        "Blart Versenwald III".as_bytes()
    );
    assert_eq!(page_stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
}
5201
/// With a statistics truncate length of 2, the page headers of both the string
/// and the binary column carry inexact, truncated min/max values.
#[test]
fn test_page_stats_truncation() {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Binary, false),
    ]);

    let text = "Blart Versenwald III";
    let string_column = StringArray::from(vec![text]);
    let binary_column = BinaryArray::from(vec![text.as_bytes()]);
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(string_column), Arc::new(binary_column)],
    )
    .unwrap();

    let props = WriterProperties::builder()
        .set_statistics_truncate_length(Some(2))
        .set_dictionary_enabled(false)
        .set_encoding(Encoding::PLAIN)
        .set_write_page_header_statistics(true)
        .set_compression(crate::basic::Compression::UNCOMPRESSED)
        .build();

    let file = roundtrip_opts(&batch, props);

    // First page header starts four bytes into the file.
    let mut first_prot = ThriftSliceInputProtocol::new(&file[4..]);
    let first_header = PageHeader::read_thrift(&mut first_prot).unwrap();
    let first_stats = first_header.data_page_header.unwrap().statistics;
    assert!(first_stats.is_some());
    let first_stats = first_stats.unwrap();
    assert!(!first_stats.is_max_value_exact.unwrap());
    assert!(!first_stats.is_min_value_exact.unwrap());
    assert_eq!(first_stats.max_value.unwrap(), "Bm".as_bytes());
    assert_eq!(first_stats.min_value.unwrap(), "Bl".as_bytes());

    // The second page header follows the first page's compressed payload.
    let second_page = &first_prot.as_slice()[first_header.compressed_page_size as usize..];
    let mut second_prot = ThriftSliceInputProtocol::new(second_page);
    let second_header = PageHeader::read_thrift(&mut second_prot).unwrap();
    let second_stats = second_header.data_page_header.unwrap().statistics;
    assert!(second_stats.is_some());
    let second_stats = second_stats.unwrap();
    assert!(!second_stats.is_max_value_exact.unwrap());
    assert!(!second_stats.is_min_value_exact.unwrap());
    assert_eq!(second_stats.max_value.unwrap(), "Bm".as_bytes());
    assert_eq!(second_stats.min_value.unwrap(), "Bl".as_bytes());
}
5262
/// The page encoding stats in the metadata returned by `close` must equal the
/// ones read back from the file.
#[test]
fn test_page_encoding_statistics_roundtrip() {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "int32",
        arrow_schema::DataType::Int32,
        false,
    )]));
    let values = Int32Array::from(vec![1, 2, 3, 4]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values) as _]).unwrap();

    let mut file: File = tempfile::tempfile().unwrap();
    let mut writer = ArrowWriter::try_new(&mut file, schema, None).unwrap();
    writer.write(&batch).unwrap();
    let file_metadata = writer.close().unwrap();

    // Stats as reported by the writer.
    assert_eq!(file_metadata.num_row_groups(), 1);
    assert_eq!(file_metadata.row_group(0).num_columns(), 1);
    let written_column = file_metadata.row_group(0).column(0);
    assert!(written_column.page_encoding_stats().is_some());
    let chunk_page_stats = written_column.page_encoding_stats().unwrap();

    // Stats as decoded from the file, with the mask representation turned off.
    let options = ReadOptionsBuilder::new()
        .with_page_index()
        .with_encoding_stats_as_mask(false)
        .build();
    let reader = SerializedFileReader::new_with_options(file, options).unwrap();

    let rowgroup = reader.get_row_group(0).expect("row group missing");
    assert_eq!(rowgroup.num_columns(), 1);
    let read_column = rowgroup.metadata().column(0);
    assert!(read_column.page_encoding_stats().is_some());
    let file_page_stats = read_column.page_encoding_stats().unwrap();

    assert_eq!(chunk_page_stats, file_page_stats);
}
5311
/// A per-column dictionary page size limit overrides the global one: `col0`
/// gets a 1 MiB dictionary page, `col1` a 4 MiB one.
#[test]
fn test_different_dict_page_size_limit() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("col0", arrow_schema::DataType::Int64, false),
        Field::new("col1", arrow_schema::DataType::Int64, false),
    ]));
    // Both columns hold the same 1024 * 1024 distinct values.
    let values = Arc::new(Int64Array::from_iter(0..1024 * 1024));
    let batch =
        arrow_array::RecordBatch::try_new(schema.clone(), vec![values.clone(), values]).unwrap();

    let props = WriterProperties::builder()
        .set_dictionary_page_size_limit(1024 * 1024)
        .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
        .build();
    let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
    writer.write(&batch).unwrap();
    let data = Bytes::from(writer.into_inner().unwrap());

    let mut metadata_reader = ParquetMetaDataReader::new();
    metadata_reader.try_parse(&data).unwrap();
    let metadata = metadata_reader.finish().unwrap();
    let col0_meta = metadata.row_group(0).column(0);
    let col1_meta = metadata.row_group(0).column(1);

    // Returns the buffer length of the first page of a column chunk, which
    // must be a dictionary page.
    let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
        let mut pages = SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
        let first_page = pages.get_next_page().unwrap().unwrap();
        let Page::DictionaryPage { buf, .. } = first_page else {
            panic!("expected DictionaryPage");
        };
        buf.len()
    };

    assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
    assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
}
5349
/// Round-trips 256 strings where every 16th value is 64 KiB long and the rest
/// are tiny, using a 16 KiB data page size limit and no dictionary.
#[test]
fn test_arrow_writer_granular_mode_roundtrip() {
    let small = "tiny".to_string();
    let big = "x".repeat(64 * 1024);
    let strings: Vec<String> = (0..256)
        .map(|i| match i % 16 {
            0 => &big,
            _ => &small,
        })
        .cloned()
        .collect();

    let schema = Arc::new(Schema::new(vec![Field::new(
        "col",
        ArrowDataType::Utf8,
        false,
    )]));
    let column = StringArray::from(strings.clone());
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(column) as _]).unwrap();

    let props = WriterProperties::builder()
        .set_dictionary_enabled(false)
        .set_data_page_size_limit(16 * 1024)
        .build();
    let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
    writer.write(&batch).unwrap();
    let data = Bytes::from(writer.into_inner().unwrap());

    // A batch size of 1024 fits all 256 rows, so exactly one batch is expected.
    let mut reader = ParquetRecordBatchReader::try_new(data, 1024).unwrap();
    let read = reader.next().unwrap().unwrap();
    assert!(reader.next().is_none(), "expected one batch");

    let col = read
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(col.len(), strings.len());
    for (i, expected) in strings.iter().enumerate() {
        let actual = col.value(i);
        assert_eq!(actual, expected.as_str(), "value mismatch at index {i}");
    }
}
5408
/// Writes a nullable string column containing only nulls and checks the row
/// count, the null count (when statistics are present) and that the data pages
/// together account for every row.
#[test]
fn test_arrow_writer_all_null_string_column() {
    let num_rows = 1024;
    let schema = Arc::new(Schema::new(vec![Field::new(
        "col",
        ArrowDataType::Utf8,
        true,
    )]));
    let all_null: Vec<Option<&str>> = vec![None; num_rows];
    let column = StringArray::from(all_null);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(column) as _]).unwrap();

    let props = WriterProperties::builder()
        .set_dictionary_enabled(false)
        .set_data_page_size_limit(16 * 1024)
        .build();
    let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
    writer.write(&batch).unwrap();
    let data = Bytes::from(writer.into_inner().unwrap());

    let mut metadata_reader = ParquetMetaDataReader::new();
    metadata_reader.try_parse(&data).unwrap();
    let metadata = metadata_reader.finish().unwrap();
    let row_group = metadata.row_group(0);
    let col_meta = row_group.column(0);
    assert_eq!(row_group.num_rows() as usize, num_rows);

    // The null-count check only applies when chunk statistics were written.
    if let Some(stats) = col_meta.statistics() {
        let null_count = stats.null_count_opt().unwrap_or(0) as usize;
        assert_eq!(
            null_count, num_rows,
            "expected all-null column to report null_count = num_rows"
        );
    }

    // Sum the value counts of every data page (V1 or V2) in the column chunk.
    let mut pages =
        SerializedPageReader::new(Arc::new(data.clone()), col_meta, num_rows, None).unwrap();
    let mut total_values = 0u32;
    while let Some(page) = pages.get_next_page().unwrap() {
        let is_data_page = matches!(page, Page::DataPage { .. } | Page::DataPageV2 { .. });
        if is_data_page {
            total_values += page.num_values();
        }
    }
    assert_eq!(
        total_values as usize, num_rows,
        "expected every level position to be represented in some page"
    );
}
5467
5468 struct WriteBatchesShape {
5469 num_batches: usize,
5470 rows_per_batch: usize,
5471 row_size: usize,
5472 }
5473
5474 fn write_batches(
5476 WriteBatchesShape {
5477 num_batches,
5478 rows_per_batch,
5479 row_size,
5480 }: WriteBatchesShape,
5481 props: WriterProperties,
5482 ) -> ParquetRecordBatchReaderBuilder<File> {
5483 let schema = Arc::new(Schema::new(vec![Field::new(
5484 "str",
5485 ArrowDataType::Utf8,
5486 false,
5487 )]));
5488 let file = tempfile::tempfile().unwrap();
5489 let mut writer =
5490 ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
5491
5492 for batch_idx in 0..num_batches {
5493 let strings: Vec<String> = (0..rows_per_batch)
5494 .map(|i| format!("{:0>width$}", batch_idx * 10 + i, width = row_size))
5495 .collect();
5496 let array = StringArray::from(strings);
5497 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
5498 writer.write(&batch).unwrap();
5499 }
5500 writer.close().unwrap();
5501 ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
5502 }
5503
/// With neither a row-count nor a byte limit, 1000 rows land in one row group.
#[test]
fn test_row_group_limit_none_writes_single_row_group() {
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(None)
        .set_max_row_group_bytes(None)
        .build();

    let shape = WriteBatchesShape {
        num_batches: 1,
        rows_per_batch: 1000,
        row_size: 4,
    };
    let builder = write_batches(shape, props);

    let sizes = row_group_sizes(builder.metadata());
    assert_eq!(
        &sizes,
        &[1000],
        "With no limits, all rows should be in a single row group"
    );
}
5527
/// With only a 300-row limit, a single 1000-row batch is split into row groups
/// of 300, 300, 300 and 100 rows.
#[test]
fn test_row_group_limit_rows_only() {
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(300))
        .set_max_row_group_bytes(None)
        .build();

    let shape = WriteBatchesShape {
        num_batches: 1,
        rows_per_batch: 1000,
        row_size: 4,
    };
    let builder = write_batches(shape, props);

    let sizes = row_group_sizes(builder.metadata());
    assert_eq!(
        &sizes,
        &[300, 300, 300, 100],
        "Row groups should be split by row count"
    );
}
5551
/// With only a 3500-byte limit, ten batches of ten 100-character rows must be
/// split into several row groups without losing any row.
#[test]
fn test_row_group_limit_bytes_only() {
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(None)
        .set_max_row_group_bytes(Some(3500))
        .build();

    let shape = WriteBatchesShape {
        num_batches: 10,
        rows_per_batch: 10,
        row_size: 100,
    };
    let builder = write_batches(shape, props);
    let sizes = row_group_sizes(builder.metadata());

    assert!(
        sizes.len() > 1,
        "Should have multiple row groups due to byte limit, got {sizes:?}",
    );

    let total_rows: i64 = sizes.iter().sum();
    assert_eq!(total_rows, 100, "Total rows should be preserved");
}
5580
/// If the in-progress row group already exceeds a byte limit that is lowered
/// after the first write, the next write must flush it first, producing row
/// groups of 10 and 1 rows.
#[test]
fn test_row_group_limit_bytes_flushes_when_current_group_already_too_large() {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "str",
        ArrowDataType::Utf8,
        false,
    )]));
    let file = tempfile::tempfile().unwrap();

    // Start without any limit so the first batch is buffered in full.
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(None)
        .set_max_row_group_bytes(None)
        .build();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();

    let wide_rows: Vec<String> = (0..10).map(|i| format!("{:0>100}", i)).collect();
    let first_batch =
        RecordBatch::try_new(schema.clone(), vec![Arc::new(StringArray::from(wide_rows))])
            .unwrap();
    writer.write(&first_batch).unwrap();
    assert_eq!(writer.in_progress_rows(), 10);

    // Shrink the byte limit directly on the writer.
    writer.max_row_group_bytes = Some(1);

    let single_row = StringArray::from(vec!["x".to_string()]);
    let second_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(single_row)]).unwrap();
    writer.write(&second_batch).unwrap();
    writer.close().unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let sizes = row_group_sizes(builder.metadata());
    assert_eq!(
        &sizes,
        &[10, 1],
        "The second write should flush an oversized in-progress row group first",
    );
}
5626
/// With a 200-row limit and a 1 MiB byte limit, a single 1000-row batch of
/// short strings is split purely by row count into five groups of 200.
#[test]
fn test_row_group_limit_both_row_wins_single_batch() {
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(200))
        .set_max_row_group_bytes(Some(1024 * 1024))
        .build();

    let shape = WriteBatchesShape {
        num_batches: 1,
        row_size: 4,
        rows_per_batch: 1000,
    };
    let builder = write_batches(shape, props);

    let sizes = row_group_sizes(builder.metadata());
    assert_eq!(
        &sizes,
        &[200, 200, 200, 200, 200],
        "Row limit should trigger before byte limit"
    );
}
5650
/// With a 5-row limit and a 9999-byte limit, ten batches of ten rows end up in
/// twenty row groups of five rows each.
#[test]
fn test_row_group_limit_both_row_wins_multiple_batches() {
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(5))
        .set_max_row_group_bytes(Some(9999))
        .build();

    let shape = WriteBatchesShape {
        num_batches: 10,
        rows_per_batch: 10,
        row_size: 100,
    };
    let builder = write_batches(shape, props);

    let sizes = row_group_sizes(builder.metadata());
    assert_eq!(
        &sizes,
        &[5; 20],
        "Row limit should trigger before byte limit"
    );
}
5674
/// With a 1000-row limit and a 3500-byte limit, the byte limit must split the
/// 100 written rows into several row groups, none of which reaches 1000 rows.
#[test]
fn test_row_group_limit_both_bytes_wins() {
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(1000))
        .set_max_row_group_bytes(Some(3500))
        .build();

    let shape = WriteBatchesShape {
        num_batches: 10,
        rows_per_batch: 10,
        row_size: 100,
    };
    let builder = write_batches(shape, props);
    let sizes = row_group_sizes(builder.metadata());

    assert!(
        sizes.len() > 1,
        "Byte limit should trigger before row limit, got {sizes:?}",
    );

    let below_row_limit = sizes.iter().all(|&s| s < 1000);
    assert!(below_row_limit, "No row group should hit the row limit");

    let total_rows: i64 = sizes.iter().sum();
    assert_eq!(total_rows, 100, "Total rows should be preserved");
}
5707
/// Clearing `column_index` through `close_mut` on a finished column chunk must
/// result in a file whose column chunk reports no column index range.
#[test]
fn arrow_column_chunk_close_mut_drops_column_index() {
    use crate::arrow::ArrowSchemaConverter;
    use crate::file::writer::SerializedFileWriter;

    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
    let page_stats_props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Page)
        .build();
    let props = Arc::new(page_stats_props);
    let parquet_schema = ArrowSchemaConverter::new()
        .with_coerce_types(props.coerce_types())
        .convert(&schema)
        .unwrap();

    let mut sink = Vec::with_capacity(1024);
    let mut file_writer =
        SerializedFileWriter::new(&mut sink, parquet_schema.root_schema_ptr(), props.clone())
            .unwrap();

    // Encode 64 integers through the column writers of row group 0.
    let factory = ArrowRowGroupWriterFactory::new(&file_writer, Arc::clone(&schema));
    let mut col_writers = factory.create_column_writers(0).unwrap();
    let values: ArrayRef = Arc::new(Int32Array::from_iter_values(0..64));
    for leaves in compute_leaves(schema.field(0), &values).unwrap() {
        col_writers[0].write(&leaves).unwrap();
    }
    let mut chunk = col_writers.pop().unwrap().close().unwrap();

    assert!(
        chunk.close().column_index.is_some(),
        "EnabledStatistics::Page should produce a column_index"
    );

    // Drop the column index in place and confirm the change is visible.
    chunk.close_mut().column_index = None;
    assert!(chunk.close().column_index.is_none());

    let mut row_group = file_writer.next_row_group().unwrap();
    chunk.append_to_row_group(&mut row_group).unwrap();
    row_group.close().unwrap();
    let file_meta = file_writer.close().unwrap();

    let column_chunk = file_meta.row_group(0).column(0);
    assert!(column_chunk.column_index_range().is_none());
}
5757
5758 fn write_column_to_bytes(array: ArrayRef) -> Bytes {
5760 let schema = Arc::new(Schema::new(vec![Field::new(
5761 "col",
5762 array.data_type().clone(),
5763 true,
5764 )]));
5765 let buf = get_bytes_after_close(
5766 schema.clone(),
5767 &RecordBatch::try_new(schema, vec![array]).unwrap(),
5768 );
5769 Bytes::from(buf)
5770 }
5771
5772 fn read_column_with_schema(bytes: Bytes, schema: SchemaRef) -> ArrayRef {
5776 let opts = crate::arrow::arrow_reader::ArrowReaderOptions::new().with_schema(schema);
5777 ParquetRecordBatchReaderBuilder::try_new_with_options(bytes, opts)
5778 .unwrap()
5779 .build()
5780 .unwrap()
5781 .next()
5782 .unwrap()
5783 .unwrap()
5784 .column(0)
5785 .clone()
5786 }
5787
5788 fn ree_write_read_roundtrip(ree: ArrayRef, flat: ArrayRef) {
5789 let flat_schema = Arc::new(Schema::new(vec![Field::new(
5790 "col",
5791 flat.data_type().clone(),
5792 true,
5793 )]));
5794 let ree_bytes = write_column_to_bytes(ree);
5795 let flat_bytes = write_column_to_bytes(flat.clone());
5796 assert_eq!(
5797 ree_bytes, flat_bytes,
5798 "REE and flat bytes should be identical"
5799 );
5800
5801 let decoded_ree = read_column_with_schema(ree_bytes, flat_schema.clone());
5802 let decoded_flat = read_column_with_schema(flat_bytes, flat_schema);
5803
5804 assert_eq!(decoded_ree.as_ref(), flat.as_ref());
5805 assert_eq!(decoded_ree.as_ref(), decoded_flat.as_ref());
5806 }
5807
/// Round-trips a string column containing a null, once run-end encoded and
/// once as a plain `StringArray`.
#[test]
fn ree_string() {
    // The same logical values feed both representations.
    let values = [Some("a"), Some("a"), None, Some("b"), Some("b")];

    let ree: ArrayRef = Arc::new(values.into_iter().collect::<Int32RunArray>());
    let flat: ArrayRef = Arc::new(StringArray::from(values.to_vec()));

    ree_write_read_roundtrip(ree, flat);
}
5824
/// Round-trips an `Int32` column containing a null, built once with
/// `PrimitiveRunBuilder` and once as a plain `Int32Array`.
#[test]
fn ree_int32() {
    // The same logical values feed both representations.
    let values: [Option<i32>; 5] = [Some(1), Some(1), None, Some(2), Some(2)];

    let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
    values.into_iter().for_each(|v| builder.append_option(v));
    let ree: ArrayRef = Arc::new(builder.finish());

    let flat: ArrayRef = Arc::new(Int32Array::from(values.to_vec()));

    ree_write_read_roundtrip(ree, flat);
}
5841
/// Round-trips a boolean column whose middle run is null.
#[test]
fn ree_bool() {
    // Run ends 3, 5, 7 give run lengths 3, 2, 2.
    let run_ends = Int32Array::from(vec![3, 5, 7]);
    let run_values = BooleanArray::from(vec![Some(true), None, Some(false)]);
    let ree: ArrayRef = Arc::new(RunArray::try_new(&run_ends, &run_values).unwrap());

    // The same seven logical values written out element by element.
    let expanded = [
        Some(true),
        Some(true),
        Some(true),
        None,
        None,
        Some(false),
        Some(false),
    ];
    let flat: ArrayRef = Arc::new(BooleanArray::from(expanded.to_vec()));

    ree_write_read_roundtrip(ree, flat);
}
5863
/// Round-trips a fixed-size binary column (width 2) whose middle run is null.
#[test]
fn ree_fixed_size_binary() {
    /// Builds a width-2 `FixedSizeBinaryArray` from optional byte slices.
    fn build(items: &[Option<&[u8]>]) -> FixedSizeBinaryArray {
        let mut builder = FixedSizeBinaryBuilder::new(2);
        for item in items.iter().copied() {
            match item {
                Some(bytes) => builder.append_value(bytes).unwrap(),
                None => builder.append_null(),
            }
        }
        builder.finish()
    }

    // Run ends 2, 4, 6 give three runs of two elements each.
    let run_ends = Int32Array::from(vec![2, 4, 6]);
    let run_values = build(&[Some(b"aa"), None, Some(b"bb")]);
    let ree: ArrayRef = Arc::new(RunArray::try_new(&run_ends, &run_values).unwrap());

    // The same six logical values written out element by element.
    let expanded = build(&[
        Some(b"aa"),
        Some(b"aa"),
        None,
        None,
        Some(b"bb"),
        Some(b"bb"),
    ]);
    let flat: ArrayRef = Arc::new(expanded);

    ree_write_read_roundtrip(ree, flat);
}
5894
/// Round-trips a column in which every element is the same non-null string.
#[test]
fn ree_single_run() {
    let values = ["x"; 3];

    let ree: ArrayRef = Arc::new(values.into_iter().collect::<Int32RunArray>());
    let flat: ArrayRef = Arc::new(StringArray::from(values.to_vec()));

    ree_write_read_roundtrip(ree, flat);
}
5901
/// Round-trips a `Float32` column whose middle run is null.
#[test]
fn ree_float32() {
    // Run ends 2, 4, 5 give run lengths 2, 2, 1.
    let run_ends = Int32Array::from(vec![2, 4, 5]);
    let run_values = Float32Array::from(vec![Some(1.0_f32), None, Some(2.5_f32)]);
    let ree: ArrayRef = Arc::new(RunArray::try_new(&run_ends, &run_values).unwrap());

    // The same five logical values written out element by element.
    let expanded = [Some(1.0_f32), Some(1.0_f32), None, None, Some(2.5_f32)];
    let flat: ArrayRef = Arc::new(Float32Array::from(expanded.to_vec()));

    ree_write_read_roundtrip(ree, flat);
}
5921
/// Round-trips a run-end encoded string column that has been sliced so that
/// it starts and ends away from the boundaries of the underlying array.
#[test]
fn ree_sliced() {
    // Logical contents of the unsliced array: a a a b b c c.
    let run_ends = Int32Array::from(vec![3, 5, 7]);
    let run_values = StringArray::from(vec!["a", "b", "c"]);
    let full: ArrayRef = Arc::new(RunArray::try_new(&run_ends, &run_values).unwrap());

    // Offset 2, length 5 keeps logical positions 2..7: a b b c c.
    let window = full.slice(2, 5);
    let flat: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "b", "c", "c"]));

    ree_write_read_roundtrip(window, flat);
}
5939
/// Writes a struct column whose two children are both run-end encoded and
/// checks the leaf columns of the resulting Parquet schema: their count,
/// physical types, and dotted paths.
#[test]
fn ree_struct_with_ree_child() {
    // Both children share the same run ends.
    let run_ends = Int32Array::from(vec![2i32, 3, 5]);

    let string_values = StringArray::from(vec![Some("foo"), None, Some("bar")]);
    let col_a: ArrayRef = Arc::new(RunArray::try_new(&run_ends, &string_values).unwrap());

    let int_values = Int32Array::from(vec![Some(1), None, Some(2)]);
    let col_b: ArrayRef = Arc::new(RunArray::try_new(&run_ends, &int_values).unwrap());

    // Field types are captured before the child arrays are moved into the struct.
    let child_fields = Fields::from(vec![
        Field::new("a", col_a.data_type().clone(), true),
        Field::new("b", col_b.data_type().clone(), true),
    ]);
    let struct_array: ArrayRef =
        Arc::new(StructArray::new(child_fields, vec![col_a, col_b], None));

    let row_field = Field::new("row", struct_array.data_type().clone(), true);
    let schema = Arc::new(Schema::new(vec![row_field]));
    let batch = RecordBatch::try_new(schema.clone(), vec![struct_array]).unwrap();

    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
    writer.write(&batch).unwrap();
    let metadata = writer.close().unwrap();

    let parquet_schema = metadata.file_metadata().schema_descr();
    assert_eq!(parquet_schema.num_columns(), 2);

    // Expected (physical type, path) for each leaf, in column order.
    let expected_leaves = [
        (crate::basic::Type::BYTE_ARRAY, "row.a"),
        (crate::basic::Type::INT32, "row.b"),
    ];
    for (idx, (physical_type, path)) in expected_leaves.into_iter().enumerate() {
        let leaf = parquet_schema.column(idx);
        assert_eq!(leaf.physical_type(), physical_type);
        assert_eq!(leaf.path().string(), path);
    }
}
5991}