1use crate::column::chunker::ContentDefinedChunker;
21
22use bytes::Bytes;
23use std::io::{Read, Write};
24use std::slice::Iter;
25use std::sync::{Arc, Mutex};
26use std::vec::IntoIter;
27
28use arrow_array::cast::AsArray;
29use arrow_array::types::*;
30use arrow_array::{ArrayRef, Int32Array, RecordBatch, RecordBatchWriter};
31use arrow_schema::{
32 ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef, TimeUnit,
33};
34
35use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_precision};
36
37use crate::arrow::ArrowSchemaConverter;
38use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
39use crate::basic::PageType;
40use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
41use crate::column::page_encryption::PageEncryptor;
42use crate::column::writer::encoder::ColumnValueEncoder;
43use crate::column::writer::{
44 ColumnCloseResult, ColumnWriter, GenericColumnWriter, get_column_writer,
45};
46use crate::data_type::{ByteArray, FixedLenByteArray};
47#[cfg(feature = "encryption")]
48use crate::encryption::encrypt::FileEncryptor;
49use crate::errors::{ParquetError, Result};
50use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
51use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
52use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
53use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
54use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
55use levels::{ArrayLevels, calculate_array_levels};
56
57mod byte_array;
58mod levels;
59
60#[doc(inline)]
61pub use crate::column::page_store::{
62 InMemoryPageStore, InMemoryPageStoreFactory, PageKey, PageStore, PageStoreArgs,
63 PageStoreFactory,
64};
65
/// Encodes Arrow [`RecordBatch`]es into a Parquet file.
///
/// Rows are buffered into an in-progress row group which is flushed to the underlying
/// [`SerializedFileWriter`] once the configured row-count or estimated-byte limit is hit,
/// or when [`ArrowWriter::flush`] is called explicitly.
pub struct ArrowWriter<W: Write> {
    /// Underlying parquet file writer; flushed row groups are appended to it
    writer: SerializedFileWriter<W>,

    /// Column writers for the row group currently being buffered, if one has been started
    in_progress: Option<ArrowRowGroupWriter>,

    /// Arrow schema of the batches written through this writer
    arrow_schema: SchemaRef,

    /// Creates a fresh [`ArrowRowGroupWriter`] each time a new row group is started
    row_group_writer_factory: ArrowRowGroupWriterFactory,

    /// Maximum number of rows per row group, if limited
    max_row_group_row_count: Option<usize>,

    /// Limit on the estimated encoded size of a row group, if limited
    max_row_group_bytes: Option<usize>,

    /// One content-defined chunker per parquet leaf column; `Some` only when content
    /// defined chunking is enabled in the writer properties
    cdc_chunkers: Option<Vec<ContentDefinedChunker>>,
}
206
207impl<W: Write + Send> std::fmt::Debug for ArrowWriter<W> {
208 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209 let buffered_memory = self.in_progress_size();
210 f.debug_struct("ArrowWriter")
211 .field("writer", &self.writer)
212 .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
213 .field("in_progress_rows", &self.in_progress_rows())
214 .field("arrow_schema", &self.arrow_schema)
215 .field("max_row_group_row_count", &self.max_row_group_row_count)
216 .field("max_row_group_bytes", &self.max_row_group_bytes)
217 .finish()
218 }
219}
220
221impl<W: Write + Send> ArrowWriter<W> {
222 pub fn try_new(
228 writer: W,
229 arrow_schema: SchemaRef,
230 props: Option<WriterProperties>,
231 ) -> Result<Self> {
232 let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
233 Self::try_new_with_options(writer, arrow_schema, options)
234 }
235
236 pub fn try_new_with_options(
242 writer: W,
243 arrow_schema: SchemaRef,
244 options: ArrowWriterOptions,
245 ) -> Result<Self> {
246 let mut props = options.properties;
247
248 let schema = if let Some(parquet_schema) = options.schema_descr {
249 parquet_schema.clone()
250 } else {
251 let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
252 if let Some(schema_root) = &options.schema_root {
253 converter = converter.schema_root(schema_root);
254 }
255
256 converter.convert(&arrow_schema)?
257 };
258
259 if !options.skip_arrow_metadata {
260 add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
262 }
263
264 let max_row_group_row_count = props.max_row_group_row_count();
265 let max_row_group_bytes = props.max_row_group_bytes();
266
267 let props_ptr = Arc::new(props);
268 let file_writer =
269 SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
270
271 let mut row_group_writer_factory =
272 ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());
273 if let Some(page_store_factory) = options.page_store_factory {
274 row_group_writer_factory =
275 row_group_writer_factory.with_page_store_factory(page_store_factory);
276 }
277
278 let cdc_chunkers = props_ptr
279 .content_defined_chunking()
280 .map(|opts| {
281 file_writer
282 .schema_descr()
283 .columns()
284 .iter()
285 .map(|desc| ContentDefinedChunker::new(desc, opts))
286 .collect::<Result<Vec<_>>>()
287 })
288 .transpose()?;
289
290 Ok(Self {
291 writer: file_writer,
292 in_progress: None,
293 arrow_schema,
294 row_group_writer_factory,
295 max_row_group_row_count,
296 max_row_group_bytes,
297 cdc_chunkers,
298 })
299 }
300
301 pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
303 self.writer.flushed_row_groups()
304 }
305
306 pub fn memory_size(&self) -> usize {
311 match &self.in_progress {
312 Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
313 None => 0,
314 }
315 }
316
317 pub fn in_progress_size(&self) -> usize {
324 match &self.in_progress {
325 Some(in_progress) => in_progress
326 .writers
327 .iter()
328 .map(|x| x.get_estimated_total_bytes())
329 .sum(),
330 None => 0,
331 }
332 }
333
334 pub fn in_progress_rows(&self) -> usize {
336 self.in_progress
337 .as_ref()
338 .map(|x| x.buffered_rows)
339 .unwrap_or_default()
340 }
341
342 pub fn bytes_written(&self) -> usize {
344 self.writer.bytes_written()
345 }
346
    /// Encodes `batch` into the in-progress row group, starting one if needed.
    ///
    /// Empty batches are ignored. If a row-count limit is configured and `batch` would
    /// exceed it, the batch is split so the current row group is filled exactly. If a
    /// byte limit is configured and the row group already holds rows, the average
    /// estimated bytes per buffered row is used to split `batch` (or flush first) so the
    /// estimate stays near the limit. After writing, the row group is flushed once either
    /// limit is reached.
    ///
    /// # Errors
    ///
    /// Propagates errors from creating the row group writer, encoding the columns, or
    /// flushing a completed row group.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        // Lazily start a row group; its index is the number of row groups flushed so far.
        let in_progress = match &mut self.in_progress {
            Some(in_progress) => in_progress,
            x => x.insert(
                self.row_group_writer_factory
                    .create_row_group_writer(self.writer.flushed_row_groups().len())?,
            ),
        };

        // Row-count limit: fill the current row group exactly, then recurse on the rest
        // (the first recursive write flushes once the limit is reached).
        if let Some(max_rows) = self.max_row_group_row_count {
            if in_progress.buffered_rows + batch.num_rows() > max_rows {
                let to_write = max_rows - in_progress.buffered_rows;
                let a = batch.slice(0, to_write);
                let b = batch.slice(to_write, batch.num_rows() - to_write);
                self.write(&a)?;
                return self.write(&b);
            }
        }

        // Byte limit: only applied once rows are buffered, since the per-row estimate
        // below is derived from what has already been written.
        if let Some(max_bytes) = self.max_row_group_bytes {
            if in_progress.buffered_rows > 0 {
                let current_bytes = in_progress.get_estimated_total_bytes();

                if current_bytes >= max_bytes {
                    self.flush()?;
                    return self.write(batch);
                }

                if let Some(avg_row_bytes) = current_bytes
                    .checked_div(in_progress.buffered_rows)
                    .filter(|avg_row_bytes| *avg_row_bytes > 0)
                {
                    // Cannot underflow: `current_bytes < max_bytes` was checked above.
                    let remaining_bytes = max_bytes - current_bytes;
                    let rows_that_fit = remaining_bytes.checked_div(avg_row_bytes).unwrap_or(0);

                    if batch.num_rows() > rows_that_fit {
                        if rows_that_fit > 0 {
                            // Write what fits into this row group, then the remainder.
                            let a = batch.slice(0, rows_that_fit);
                            let b = batch.slice(rows_that_fit, batch.num_rows() - rows_that_fit);
                            self.write(&a)?;
                            return self.write(&b);
                        } else {
                            // Not even one more row is expected to fit: start a new group.
                            self.flush()?;
                            return self.write(batch);
                        }
                    }
                }
            }
        }

        match self.cdc_chunkers.as_mut() {
            Some(chunkers) => in_progress.write_with_chunkers(batch, chunkers)?,
            None => in_progress.write(batch)?,
        }

        // Close the row group as soon as either configured limit has been reached.
        let should_flush = self
            .max_row_group_row_count
            .is_some_and(|max| in_progress.buffered_rows >= max)
            || self
                .max_row_group_bytes
                .is_some_and(|max| in_progress.get_estimated_total_bytes() >= max);

        if should_flush {
            self.flush()?
        }
        Ok(())
    }
432
433 pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
438 self.writer.write_all(buf)
439 }
440
441 pub fn sync(&mut self) -> std::io::Result<()> {
443 self.writer.flush()
444 }
445
446 pub fn flush(&mut self) -> Result<()> {
451 let in_progress = match self.in_progress.take() {
452 Some(in_progress) => in_progress,
453 None => return Ok(()),
454 };
455
456 let mut row_group_writer = self.writer.next_row_group()?;
457 for chunk in in_progress.close()? {
458 chunk.append_to_row_group(&mut row_group_writer)?;
459 }
460 row_group_writer.close()?;
461 Ok(())
462 }
463
464 pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
468 self.writer.append_key_value_metadata(kv_metadata)
469 }
470
471 pub fn inner(&self) -> &W {
473 self.writer.inner()
474 }
475
476 pub fn inner_mut(&mut self) -> &mut W {
485 self.writer.inner_mut()
486 }
487
488 pub fn into_inner(mut self) -> Result<W> {
490 self.flush()?;
491 self.writer.into_inner()
492 }
493
494 pub fn finish(&mut self) -> Result<ParquetMetaData> {
500 self.flush()?;
501 self.writer.finish()
502 }
503
504 pub fn close(mut self) -> Result<ParquetMetaData> {
506 self.finish()
507 }
508
509 #[deprecated(
511 since = "56.2.0",
512 note = "Use `ArrowRowGroupWriterFactory` instead, see `ArrowColumnWriter` for an example"
513 )]
514 pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
515 self.flush()?;
516 let in_progress = self
517 .row_group_writer_factory
518 .create_row_group_writer(self.writer.flushed_row_groups().len())?;
519 Ok(in_progress.writers)
520 }
521
522 #[deprecated(
524 since = "56.2.0",
525 note = "Use `SerializedFileWriter` directly instead, see `ArrowColumnWriter` for an example"
526 )]
527 pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
528 let mut row_group_writer = self.writer.next_row_group()?;
529 for chunk in chunks {
530 chunk.append_to_row_group(&mut row_group_writer)?;
531 }
532 row_group_writer.close()?;
533 Ok(())
534 }
535
536 pub fn into_serialized_writer(
543 mut self,
544 ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
545 self.flush()?;
546 Ok((self.writer, self.row_group_writer_factory))
547 }
548}
549
550impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
551 fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
552 self.write(batch).map_err(|e| e.into())
553 }
554
555 fn close(self) -> std::result::Result<(), ArrowError> {
556 self.close()?;
557 Ok(())
558 }
559}
560
/// Options used by [`ArrowWriter::try_new_with_options`] to configure an [`ArrowWriter`].
#[derive(Debug, Clone, Default)]
pub struct ArrowWriterOptions {
    /// Parquet writer properties
    properties: WriterProperties,
    /// When `true`, the encoded arrow schema is not added to the file's key-value metadata
    skip_arrow_metadata: bool,
    /// Optional root name passed to [`ArrowSchemaConverter`] when deriving the parquet schema
    schema_root: Option<String>,
    /// Explicit parquet schema; when set it is used instead of converting the arrow schema
    schema_descr: Option<SchemaDescriptor>,
    /// Page store factory; when `None` the row group writer factory's in-memory default is kept
    page_store_factory: Option<Arc<dyn PageStoreFactory>>,
}
572
573impl ArrowWriterOptions {
574 pub fn new() -> Self {
576 Self::default()
577 }
578
579 pub fn with_properties(self, properties: WriterProperties) -> Self {
581 Self { properties, ..self }
582 }
583
584 pub fn with_page_store_factory(self, page_store_factory: Arc<dyn PageStoreFactory>) -> Self {
670 Self {
671 page_store_factory: Some(page_store_factory),
672 ..self
673 }
674 }
675
676 pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
683 Self {
684 skip_arrow_metadata,
685 ..self
686 }
687 }
688
689 pub fn with_schema_root(self, schema_root: String) -> Self {
691 Self {
692 schema_root: Some(schema_root),
693 ..self
694 }
695 }
696
697 pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
703 Self {
704 schema_descr: Some(schema_descr),
705 ..self
706 }
707 }
708}
709
/// Encoded pages of one column chunk, held in a [`PageStore`] and referenced by key.
struct ArrowColumnChunkData {
    /// Total bytes (page headers plus page bodies) written to this chunk so far
    length: usize,
    /// Backing storage for the encoded page bytes
    store: Box<dyn PageStore>,
    /// Keys of non-dictionary page headers and bodies, in write order
    keys: Vec<PageKey>,
    /// Keys of dictionary page headers and bodies, kept apart so that they can be
    /// emitted ahead of `keys` when the chunk is read back
    dictionary_keys: Vec<PageKey>,
    /// Total bytes stored under `dictionary_keys`
    dictionary_len: usize,
}
735
736impl ArrowColumnChunkData {
737 fn new(store: Box<dyn PageStore>) -> Self {
738 Self {
739 length: 0,
740 store,
741 keys: Vec::new(),
742 dictionary_keys: Vec::new(),
743 dictionary_len: 0,
744 }
745 }
746
747 fn push(&mut self, value: Bytes) -> Result<()> {
750 let key = self.store.put(value)?;
751 self.keys.push(key);
752 Ok(())
753 }
754
755 fn push_dictionary(&mut self, value: Bytes) -> Result<()> {
759 self.dictionary_len += value.len();
760 let key = self.store.put(value)?;
761 self.dictionary_keys.push(key);
762 Ok(())
763 }
764
765 fn memory_size(&self) -> usize {
768 self.store.memory_size()
769 }
770}
771
/// [`Read`] adapter that streams a column chunk's pages out of its [`PageStore`],
/// taking each page from the store only when the previous one has been consumed.
struct StreamingColumnChunkReader {
    /// Store the page bytes are taken from
    store: Box<dyn PageStore>,
    /// Remaining page keys, in output order
    keys: IntoIter<PageKey>,
    /// Unread bytes of the page currently being copied out
    current: Bytes,
}
788
789impl StreamingColumnChunkReader {
790 fn new(data: ArrowColumnChunkData) -> Self {
791 let keys = if data.dictionary_keys.is_empty() {
794 data.keys
795 } else {
796 let mut keys = Vec::with_capacity(data.dictionary_keys.len() + data.keys.len());
797 keys.extend(data.dictionary_keys);
798 keys.extend(data.keys);
799 keys
800 };
801 Self {
802 store: data.store,
803 keys: keys.into_iter(),
804 current: Bytes::new(),
805 }
806 }
807}
808
809impl Read for StreamingColumnChunkReader {
810 fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
811 while self.current.is_empty() {
814 if let Some(key) = self.keys.next() {
815 self.current = self.store.take(key).map_err(std::io::Error::other)?;
816 } else {
817 return Ok(0);
818 }
819 }
820
821 let len = self.current.len().min(out.len());
822 let b = self.current.split_to(len);
823 out[..len].copy_from_slice(&b);
824 Ok(len)
825 }
826}
827
/// Column chunk buffer shared between an [`ArrowPageWriter`], which appends pages to it,
/// and the owning [`ArrowColumnWriter`], which reclaims it when the column is closed.
type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
833
/// [`PageWriter`] that stores encoded (and optionally encrypted) pages in a
/// [`SharedColumnChunk`] instead of writing them to the output directly.
struct ArrowPageWriter {
    /// Destination for the serialized page headers and bodies
    buffer: SharedColumnChunk,
    /// Encryptor for this column, when the column is encrypted
    #[cfg(feature = "encryption")]
    page_encryptor: Option<PageEncryptor>,
}
839
840impl ArrowPageWriter {
841 fn new(store: Box<dyn PageStore>) -> Self {
843 Self {
844 buffer: Arc::new(Mutex::new(ArrowColumnChunkData::new(store))),
845 #[cfg(feature = "encryption")]
846 page_encryptor: None,
847 }
848 }
849
850 #[cfg(feature = "encryption")]
851 pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
852 self.page_encryptor = page_encryptor;
853 self
854 }
855
856 #[cfg(feature = "encryption")]
857 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
858 self.page_encryptor.as_mut()
859 }
860
861 #[cfg(not(feature = "encryption"))]
862 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
863 None
864 }
865}
866
impl PageWriter for ArrowPageWriter {
    /// Serializes `page` (encrypting it and its header when an encryptor is configured)
    /// into the shared column chunk buffer and returns its [`PageWriteSpec`].
    ///
    /// Offsets in the returned spec are relative to the start of this column chunk's
    /// buffer, not to the output file.
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
        let page = match self.page_encryptor_mut() {
            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
            None => page,
        };

        // The header is built from the (possibly encrypted) page so that sizes match.
        let page_header = page.to_thrift_header()?;
        let header = {
            let mut header = Vec::with_capacity(1024);

            match self.page_encryptor_mut() {
                Some(page_encryptor) => {
                    page_encryptor.encrypt_page_header(&page_header, &mut header)?;
                    // Only data pages advance the encryptor's page counter.
                    if page.compressed_page().is_data_page() {
                        page_encryptor.increment_page();
                    }
                }
                None => {
                    let mut protocol = ThriftCompactOutputProtocol::new(&mut header);
                    page_header.write_thrift(&mut protocol)?;
                }
            };

            Bytes::from(header)
        };

        // NOTE(review): `try_lock().unwrap()` assumes the buffer is never locked
        // concurrently by the owning column writer while a page is written.
        let mut buf = self.buffer.try_lock().unwrap();

        let data = page.compressed_page().buffer().clone();
        let compressed_size = data.len() + header.len();

        let mut spec = PageWriteSpec::new();
        spec.page_type = page.page_type();
        spec.num_values = page.num_values();
        spec.uncompressed_size = page.uncompressed_size() + header.len();
        spec.offset = buf.length as u64;
        spec.compressed_size = compressed_size;
        spec.bytes_written = compressed_size as u64;

        buf.length += compressed_size;
        // Dictionary pages are stored separately so the reader can emit them first.
        if spec.page_type == PageType::DICTIONARY_PAGE {
            buf.push_dictionary(header)?;
            buf.push_dictionary(data)?;
        } else {
            buf.push(header)?;
            buf.push(data)?;
        }

        Ok(spec)
    }

    /// Always `true`: dictionary pages are kept apart (see `push_dictionary`) and
    /// reordered ahead of data pages when the chunk is read back, so the column writer
    /// presumably need not write them first — confirm against `PageWriter` docs.
    fn defers_dictionary_ordering(&self) -> bool {
        true
    }

    /// Memory held by the shared chunk's page store.
    fn buffered_memory_size(&self) -> usize {
        self.buffer.try_lock().unwrap().memory_size()
    }

    /// No-op: buffered pages are handed over when the owning column writer is closed.
    fn close(&mut self) -> Result<()> {
        Ok(())
    }
}
939
/// Definition/repetition levels and values for one parquet leaf column, as produced by
/// [`compute_leaves`] and consumed by [`ArrowColumnWriter::write`].
#[derive(Debug)]
pub struct ArrowLeafColumn(ArrayLevels);
943
944pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
949 let levels = calculate_array_levels(array, field)?;
950 Ok(levels.into_iter().map(ArrowLeafColumn).collect())
951}
952
/// A fully encoded column chunk produced by [`ArrowColumnWriter::close`], ready to be
/// appended to a row group with [`ArrowColumnChunk::append_to_row_group`].
pub struct ArrowColumnChunk {
    /// Encoded page bytes of the chunk
    data: ArrowColumnChunkData,
    /// Result of closing the column writer (metadata, statistics, ...)
    close: ColumnCloseResult,
}
958
959impl std::fmt::Debug for ArrowColumnChunk {
960 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
961 f.debug_struct("ArrowColumnChunk")
962 .field("length", &self.data.length)
963 .finish_non_exhaustive()
964 }
965}
966
967impl ArrowColumnChunk {
968 pub fn close(&self) -> &ColumnCloseResult {
975 &self.close
976 }
977
978 pub fn close_mut(&mut self) -> &mut ColumnCloseResult {
985 &mut self.close
986 }
987
988 pub fn append_to_row_group<W: Write + Send>(
991 self,
992 writer: &mut SerializedRowGroupWriter<'_, W>,
993 ) -> Result<()> {
994 let ArrowColumnChunk { data, close } = self;
995
996 let close = close.update_dictionary_location(data.dictionary_len)?;
1000
1001 let reader = StreamingColumnChunkReader::new(data);
1002 writer.append_column_from_read(reader, close)
1003 }
1004}
1005
/// Encodes [`ArrowLeafColumn`]s for a single parquet leaf column into an in-memory
/// (or page-store backed) column chunk; finish with [`ArrowColumnWriter::close`].
pub struct ArrowColumnWriter {
    /// The typed parquet column writer doing the encoding
    writer: ArrowColumnWriterImpl,
    /// Shared handle to the buffer the writer's page writer stores pages into
    chunk: SharedColumnChunk,
}
1107
1108impl std::fmt::Debug for ArrowColumnWriter {
1109 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1110 f.debug_struct("ArrowColumnWriter").finish_non_exhaustive()
1111 }
1112}
1113
/// The concrete parquet writer behind an [`ArrowColumnWriter`].
enum ArrowColumnWriterImpl {
    /// Byte-array-like leaves, written with the specialised [`ByteArrayEncoder`]
    ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
    /// All other leaves, written with the generic typed [`ColumnWriter`]
    Column(ColumnWriter<'static>),
}
1118
1119impl ArrowColumnWriter {
1120 pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
1122 self.write_internal(&col.0)
1123 }
1124
1125 fn write_with_chunker(
1127 &mut self,
1128 col: &ArrowLeafColumn,
1129 chunker: &mut ContentDefinedChunker,
1130 ) -> Result<()> {
1131 let levels = &col.0;
1132 let chunks = chunker.get_arrow_chunks(
1133 levels.def_level_data().as_ref(),
1134 levels.rep_level_data().as_ref(),
1135 levels.array(),
1136 )?;
1137
1138 let num_chunks = chunks.len();
1139 for (i, chunk) in chunks.iter().enumerate() {
1140 let chunk_levels = levels.slice_for_chunk(chunk);
1141 self.write_internal(&chunk_levels)?;
1142
1143 if i + 1 < num_chunks {
1145 match &mut self.writer {
1146 ArrowColumnWriterImpl::Column(c) => c.add_data_page()?,
1147 ArrowColumnWriterImpl::ByteArray(c) => c.add_data_page()?,
1148 }
1149 }
1150 }
1151 Ok(())
1152 }
1153
1154 fn write_internal(&mut self, levels: &ArrayLevels) -> Result<()> {
1155 match &mut self.writer {
1156 ArrowColumnWriterImpl::Column(c) => {
1157 let leaf = levels.array();
1158 match leaf.as_any_dictionary_opt() {
1159 Some(dictionary) => {
1160 let materialized =
1161 arrow_select::take::take(dictionary.values(), dictionary.keys(), None)?;
1162 write_leaf(c, &materialized, levels)?
1163 }
1164 None => write_leaf(c, leaf, levels)?,
1165 };
1166 }
1167 ArrowColumnWriterImpl::ByteArray(c) => {
1168 write_primitive(c, levels.array().as_ref(), levels)?;
1169 }
1170 }
1171 Ok(())
1172 }
1173
1174 pub fn close(self) -> Result<ArrowColumnChunk> {
1176 let close = match self.writer {
1177 ArrowColumnWriterImpl::ByteArray(c) => c.close()?,
1178 ArrowColumnWriterImpl::Column(c) => c.close()?,
1179 };
1180 let chunk = Arc::try_unwrap(self.chunk).ok().unwrap();
1181 let data = chunk.into_inner().unwrap();
1182 Ok(ArrowColumnChunk { data, close })
1183 }
1184
1185 pub fn memory_size(&self) -> usize {
1196 match &self.writer {
1197 ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
1198 ArrowColumnWriterImpl::Column(c) => c.memory_size(),
1199 }
1200 }
1201
1202 pub fn get_estimated_total_bytes(&self) -> usize {
1210 match &self.writer {
1211 ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
1212 ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _,
1213 }
1214 }
1215}
1216
/// Buffers one row group: one [`ArrowColumnWriter`] per parquet leaf column, in the
/// order the leaves are produced from the arrow schema's fields.
#[derive(Debug)]
struct ArrowRowGroupWriter {
    /// Column writers, one per parquet leaf column
    writers: Vec<ArrowColumnWriter>,
    /// Arrow schema whose fields are zipped with incoming batch columns
    schema: SchemaRef,
    /// Number of rows written into this row group so far
    buffered_rows: usize,
}
1229
1230impl ArrowRowGroupWriter {
1231 fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
1232 Self {
1233 writers,
1234 schema: arrow.clone(),
1235 buffered_rows: 0,
1236 }
1237 }
1238
1239 fn write(&mut self, batch: &RecordBatch) -> Result<()> {
1240 self.buffered_rows += batch.num_rows();
1241 let mut writers = self.writers.iter_mut();
1242 for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
1243 for leaf in compute_leaves(field.as_ref(), column)? {
1244 writers.next().unwrap().write(&leaf)?;
1245 }
1246 }
1247 Ok(())
1248 }
1249
1250 fn write_with_chunkers(
1251 &mut self,
1252 batch: &RecordBatch,
1253 chunkers: &mut [ContentDefinedChunker],
1254 ) -> Result<()> {
1255 self.buffered_rows += batch.num_rows();
1256 let mut writers = self.writers.iter_mut();
1257 let mut chunkers = chunkers.iter_mut();
1258 for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
1259 for leaf in compute_leaves(field.as_ref(), column)? {
1260 writers
1261 .next()
1262 .unwrap()
1263 .write_with_chunker(&leaf, chunkers.next().unwrap())?;
1264 }
1265 }
1266 Ok(())
1267 }
1268
1269 fn get_estimated_total_bytes(&self) -> usize {
1271 self.writers
1272 .iter()
1273 .map(|x| x.get_estimated_total_bytes())
1274 .sum()
1275 }
1276
1277 fn close(self) -> Result<Vec<ArrowColumnChunk>> {
1278 self.writers
1279 .into_iter()
1280 .map(|writer| writer.close())
1281 .collect()
1282 }
1283}
1284
/// Creates the column writers for each new row group of a file.
#[derive(Debug)]
pub struct ArrowRowGroupWriterFactory {
    /// Parquet schema of the file; its leaf columns are matched to arrow leaves
    schema: SchemaDescPtr,
    /// Arrow schema of the data being written
    arrow_schema: SchemaRef,
    /// Writer properties shared with the file writer
    props: WriterPropertiesPtr,
    /// Factory for the page stores that buffer each column's encoded pages
    page_store_factory: Arc<dyn PageStoreFactory>,
    /// File encryptor taken from the file writer, if any
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
1298
1299impl ArrowRowGroupWriterFactory {
1300 pub fn new<W: Write + Send>(
1302 file_writer: &SerializedFileWriter<W>,
1303 arrow_schema: SchemaRef,
1304 ) -> Self {
1305 let schema = Arc::clone(file_writer.schema_descr_ptr());
1306 let props = Arc::clone(file_writer.properties());
1307 Self {
1308 schema,
1309 arrow_schema,
1310 props,
1311 page_store_factory: Arc::new(InMemoryPageStoreFactory),
1312 #[cfg(feature = "encryption")]
1313 file_encryptor: file_writer.file_encryptor(),
1314 }
1315 }
1316
1317 pub fn with_page_store_factory(
1321 mut self,
1322 page_store_factory: Arc<dyn PageStoreFactory>,
1323 ) -> Self {
1324 self.page_store_factory = page_store_factory;
1325 self
1326 }
1327
1328 fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
1329 let writers = self.create_column_writers(row_group_index)?;
1330 Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
1331 }
1332
1333 pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
1335 let mut writers = Vec::with_capacity(self.arrow_schema.fields.len());
1336 let mut leaves = self.schema.columns().iter();
1337 let column_factory = self.column_writer_factory(row_group_index);
1338 for field in &self.arrow_schema.fields {
1339 column_factory.get_arrow_column_writer(
1340 field.data_type(),
1341 &self.props,
1342 &mut leaves,
1343 &mut writers,
1344 )?;
1345 }
1346 Ok(writers)
1347 }
1348
1349 #[cfg(feature = "encryption")]
1350 fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
1351 ArrowColumnWriterFactory::new()
1352 .with_page_store_factory(self.page_store_factory.clone())
1353 .with_file_encryptor(row_group_idx, self.file_encryptor.clone())
1354 }
1355
1356 #[cfg(not(feature = "encryption"))]
1357 fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory {
1358 ArrowColumnWriterFactory::new().with_page_store_factory(self.page_store_factory.clone())
1359 }
1360}
1361
1362#[deprecated(since = "57.0.0", note = "Use `ArrowRowGroupWriterFactory` instead")]
1364pub fn get_column_writers(
1365 parquet: &SchemaDescriptor,
1366 props: &WriterPropertiesPtr,
1367 arrow: &SchemaRef,
1368) -> Result<Vec<ArrowColumnWriter>> {
1369 let mut writers = Vec::with_capacity(arrow.fields.len());
1370 let mut leaves = parquet.columns().iter();
1371 let column_factory = ArrowColumnWriterFactory::new();
1372 for field in &arrow.fields {
1373 column_factory.get_arrow_column_writer(
1374 field.data_type(),
1375 props,
1376 &mut leaves,
1377 &mut writers,
1378 )?;
1379 }
1380 Ok(writers)
1381}
1382
/// Builds [`ArrowColumnWriter`]s (and their page writers) for the leaves of a row group.
struct ArrowColumnWriterFactory {
    /// Creates the page store backing each column's buffered pages
    page_store_factory: Arc<dyn PageStoreFactory>,
    /// Index of the row group being written, used when creating page encryptors
    #[cfg(feature = "encryption")]
    row_group_index: usize,
    /// File encryptor, if the file is encrypted
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
1392
1393impl ArrowColumnWriterFactory {
1394 pub fn new() -> Self {
1395 Self {
1396 page_store_factory: Arc::new(InMemoryPageStoreFactory),
1397 #[cfg(feature = "encryption")]
1398 row_group_index: 0,
1399 #[cfg(feature = "encryption")]
1400 file_encryptor: None,
1401 }
1402 }
1403
1404 pub fn with_page_store_factory(
1406 mut self,
1407 page_store_factory: Arc<dyn PageStoreFactory>,
1408 ) -> Self {
1409 self.page_store_factory = page_store_factory;
1410 self
1411 }
1412
1413 #[cfg(feature = "encryption")]
1414 pub fn with_file_encryptor(
1415 mut self,
1416 row_group_index: usize,
1417 file_encryptor: Option<Arc<FileEncryptor>>,
1418 ) -> Self {
1419 self.row_group_index = row_group_index;
1420 self.file_encryptor = file_encryptor;
1421 self
1422 }
1423
1424 #[cfg(feature = "encryption")]
1425 fn create_page_writer(
1426 &self,
1427 column_descriptor: &ColumnDescPtr,
1428 column_index: usize,
1429 ) -> Result<Box<ArrowPageWriter>> {
1430 let column_path = column_descriptor.path().string();
1431 let page_encryptor = PageEncryptor::create_if_column_encrypted(
1432 &self.file_encryptor,
1433 self.row_group_index,
1434 column_index,
1435 &column_path,
1436 )?;
1437 let args = PageStoreArgs::new(column_index, column_descriptor);
1438 let store = self.page_store_factory.create(&args)?;
1439 Ok(Box::new(
1440 ArrowPageWriter::new(store).with_encryptor(page_encryptor),
1441 ))
1442 }
1443
1444 #[cfg(not(feature = "encryption"))]
1445 fn create_page_writer(
1446 &self,
1447 column_descriptor: &ColumnDescPtr,
1448 column_index: usize,
1449 ) -> Result<Box<ArrowPageWriter>> {
1450 let args = PageStoreArgs::new(column_index, column_descriptor);
1451 let store = self.page_store_factory.create(&args)?;
1452 Ok(Box::new(ArrowPageWriter::new(store)))
1453 }
1454
1455 fn get_arrow_column_writer(
1458 &self,
1459 data_type: &ArrowDataType,
1460 props: &WriterPropertiesPtr,
1461 leaves: &mut Iter<'_, ColumnDescPtr>,
1462 out: &mut Vec<ArrowColumnWriter>,
1463 ) -> Result<()> {
1464 let col = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1466 let page_writer = self.create_page_writer(desc, out.len())?;
1467 let chunk = page_writer.buffer.clone();
1468 let writer = get_column_writer(desc.clone(), props.clone(), page_writer);
1469 Ok(ArrowColumnWriter {
1470 chunk,
1471 writer: ArrowColumnWriterImpl::Column(writer),
1472 })
1473 };
1474
1475 let bytes = |desc: &ColumnDescPtr| -> Result<ArrowColumnWriter> {
1477 let page_writer = self.create_page_writer(desc, out.len())?;
1478 let chunk = page_writer.buffer.clone();
1479 let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer);
1480 Ok(ArrowColumnWriter {
1481 chunk,
1482 writer: ArrowColumnWriterImpl::ByteArray(writer),
1483 })
1484 };
1485
1486 match data_type {
1487 _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
1488 ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
1489 out.push(col(leaves.next().unwrap())?)
1490 }
1491 ArrowDataType::LargeBinary
1492 | ArrowDataType::Binary
1493 | ArrowDataType::Utf8
1494 | ArrowDataType::LargeUtf8
1495 | ArrowDataType::BinaryView
1496 | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
1497 ArrowDataType::List(f)
1498 | ArrowDataType::LargeList(f)
1499 | ArrowDataType::FixedSizeList(f, _)
1500 | ArrowDataType::ListView(f)
1501 | ArrowDataType::LargeListView(f) => {
1502 self.get_arrow_column_writer(f.data_type(), props, leaves, out)?
1503 }
1504 ArrowDataType::Struct(fields) => {
1505 for field in fields {
1506 self.get_arrow_column_writer(field.data_type(), props, leaves, out)?
1507 }
1508 }
1509 ArrowDataType::Map(f, _) => match f.data_type() {
1510 ArrowDataType::Struct(f) => {
1511 self.get_arrow_column_writer(f[0].data_type(), props, leaves, out)?;
1512 self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
1513 }
1514 _ => unreachable!("invalid map type"),
1515 },
1516 ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1517 ArrowDataType::Utf8
1518 | ArrowDataType::LargeUtf8
1519 | ArrowDataType::Binary
1520 | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
1521 ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
1522 out.push(bytes(leaves.next().unwrap())?)
1523 }
1524 ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
1525 _ => out.push(col(leaves.next().unwrap())?),
1526 },
1527 ArrowDataType::RunEndEncoded(_, value_field) => {
1528 self.get_arrow_column_writer(value_field.data_type(), props, leaves, out)?
1529 }
1530 _ => {
1531 return Err(ParquetError::NYI(format!(
1532 "Attempting to write an Arrow type {data_type} to parquet that is not yet implemented"
1533 )));
1534 }
1535 }
1536 Ok(())
1537 }
1538}
1539
1540fn write_leaf(
1541 writer: &mut ColumnWriter<'_>,
1542 column: &dyn arrow_array::Array,
1543 levels: &ArrayLevels,
1544) -> Result<usize> {
1545 let indices = levels.non_null_indices();
1546
1547 match writer {
1548 ColumnWriter::Int32ColumnWriter(typed) => {
1550 match column.data_type() {
1551 ArrowDataType::Null => {
1552 let array = Int32Array::new_null(column.len());
1553 write_primitive(typed, array.values(), levels)
1554 }
1555 ArrowDataType::Int8 => {
1556 let array: Int32Array = column.as_primitive::<Int8Type>().unary(|x| x as i32);
1557 write_primitive(typed, array.values(), levels)
1558 }
1559 ArrowDataType::Int16 => {
1560 let array: Int32Array = column.as_primitive::<Int16Type>().unary(|x| x as i32);
1561 write_primitive(typed, array.values(), levels)
1562 }
1563 ArrowDataType::Int32 => {
1564 write_primitive(typed, column.as_primitive::<Int32Type>().values(), levels)
1565 }
1566 ArrowDataType::UInt8 => {
1567 let array: Int32Array = column.as_primitive::<UInt8Type>().unary(|x| x as i32);
1568 write_primitive(typed, array.values(), levels)
1569 }
1570 ArrowDataType::UInt16 => {
1571 let array: Int32Array = column.as_primitive::<UInt16Type>().unary(|x| x as i32);
1572 write_primitive(typed, array.values(), levels)
1573 }
1574 ArrowDataType::UInt32 => {
1575 let array = column.as_primitive::<UInt32Type>();
1578 write_primitive(typed, array.values().inner().typed_data(), levels)
1579 }
1580 ArrowDataType::Date32 => {
1581 let array = column.as_primitive::<Date32Type>();
1582 write_primitive(typed, array.values(), levels)
1583 }
1584 ArrowDataType::Time32(TimeUnit::Second) => {
1585 let array = column.as_primitive::<Time32SecondType>();
1586 write_primitive(typed, array.values(), levels)
1587 }
1588 ArrowDataType::Time32(TimeUnit::Millisecond) => {
1589 let array = column.as_primitive::<Time32MillisecondType>();
1590 write_primitive(typed, array.values(), levels)
1591 }
1592 ArrowDataType::Date64 => {
1593 let array: Int32Array = column
1595 .as_primitive::<Date64Type>()
1596 .unary(|x| (x / 86_400_000) as _);
1597
1598 write_primitive(typed, array.values(), levels)
1599 }
1600 ArrowDataType::Decimal32(_, _) => {
1601 let array = column
1602 .as_primitive::<Decimal32Type>()
1603 .unary::<_, Int32Type>(|v| v);
1604 write_primitive(typed, array.values(), levels)
1605 }
1606 ArrowDataType::Decimal64(_, _) => {
1607 let array = column
1609 .as_primitive::<Decimal64Type>()
1610 .unary::<_, Int32Type>(|v| v as i32);
1611 write_primitive(typed, array.values(), levels)
1612 }
1613 ArrowDataType::Decimal128(_, _) => {
1614 let array = column
1616 .as_primitive::<Decimal128Type>()
1617 .unary::<_, Int32Type>(|v| v as i32);
1618 write_primitive(typed, array.values(), levels)
1619 }
1620 ArrowDataType::Decimal256(_, _) => {
1621 let array = column
1623 .as_primitive::<Decimal256Type>()
1624 .unary::<_, Int32Type>(|v| v.as_i128() as i32);
1625 write_primitive(typed, array.values(), levels)
1626 }
1627 d => Err(ParquetError::General(format!("Cannot coerce {d} to I32"))),
1628 }
1629 }
1630 ColumnWriter::BoolColumnWriter(typed) => {
1631 let array = column.as_boolean();
1632 let values = get_bool_array_slice(array, indices.iter().copied());
1633 typed.write_batch_internal(
1634 values.as_slice(),
1635 None,
1636 levels.def_level_data().as_ref(),
1637 levels.rep_level_data().as_ref(),
1638 None,
1639 None,
1640 None,
1641 )
1642 }
1643 ColumnWriter::Int64ColumnWriter(typed) => {
1644 match column.data_type() {
1645 ArrowDataType::Date64 => {
1646 let array = column
1647 .as_primitive::<Date64Type>()
1648 .reinterpret_cast::<Int64Type>();
1649
1650 write_primitive(typed, array.values(), levels)
1651 }
1652 ArrowDataType::Int64 => {
1653 let array = column.as_primitive::<Int64Type>();
1654 write_primitive(typed, array.values(), levels)
1655 }
1656 ArrowDataType::UInt64 => {
1657 let values = column.as_primitive::<UInt64Type>().values();
1658 let array = values.inner().typed_data::<i64>();
1661 write_primitive(typed, array, levels)
1662 }
1663 ArrowDataType::Time64(TimeUnit::Microsecond) => {
1664 let array = column.as_primitive::<Time64MicrosecondType>();
1665 write_primitive(typed, array.values(), levels)
1666 }
1667 ArrowDataType::Time64(TimeUnit::Nanosecond) => {
1668 let array = column.as_primitive::<Time64NanosecondType>();
1669 write_primitive(typed, array.values(), levels)
1670 }
1671 ArrowDataType::Timestamp(unit, _) => match unit {
1672 TimeUnit::Second => {
1673 let array = column.as_primitive::<TimestampSecondType>();
1674 write_primitive(typed, array.values(), levels)
1675 }
1676 TimeUnit::Millisecond => {
1677 let array = column.as_primitive::<TimestampMillisecondType>();
1678 write_primitive(typed, array.values(), levels)
1679 }
1680 TimeUnit::Microsecond => {
1681 let array = column.as_primitive::<TimestampMicrosecondType>();
1682 write_primitive(typed, array.values(), levels)
1683 }
1684 TimeUnit::Nanosecond => {
1685 let array = column.as_primitive::<TimestampNanosecondType>();
1686 write_primitive(typed, array.values(), levels)
1687 }
1688 },
1689 ArrowDataType::Duration(unit) => match unit {
1690 TimeUnit::Second => {
1691 let array = column.as_primitive::<DurationSecondType>();
1692 write_primitive(typed, array.values(), levels)
1693 }
1694 TimeUnit::Millisecond => {
1695 let array = column.as_primitive::<DurationMillisecondType>();
1696 write_primitive(typed, array.values(), levels)
1697 }
1698 TimeUnit::Microsecond => {
1699 let array = column.as_primitive::<DurationMicrosecondType>();
1700 write_primitive(typed, array.values(), levels)
1701 }
1702 TimeUnit::Nanosecond => {
1703 let array = column.as_primitive::<DurationNanosecondType>();
1704 write_primitive(typed, array.values(), levels)
1705 }
1706 },
1707 ArrowDataType::Decimal64(_, _) => {
1708 let array = column
1709 .as_primitive::<Decimal64Type>()
1710 .reinterpret_cast::<Int64Type>();
1711 write_primitive(typed, array.values(), levels)
1712 }
1713 ArrowDataType::Decimal128(_, _) => {
1714 let array = column
1716 .as_primitive::<Decimal128Type>()
1717 .unary::<_, Int64Type>(|v| v as i64);
1718 write_primitive(typed, array.values(), levels)
1719 }
1720 ArrowDataType::Decimal256(_, _) => {
1721 let array = column
1723 .as_primitive::<Decimal256Type>()
1724 .unary::<_, Int64Type>(|v| v.as_i128() as i64);
1725 write_primitive(typed, array.values(), levels)
1726 }
1727 d => Err(ParquetError::General(format!("Cannot coerce {d} to I64"))),
1728 }
1729 }
1730 ColumnWriter::Int96ColumnWriter(_typed) => {
1731 unreachable!("Currently unreachable because data type not supported")
1732 }
1733 ColumnWriter::FloatColumnWriter(typed) => {
1734 let array = column.as_primitive::<Float32Type>();
1735 write_primitive(typed, array.values(), levels)
1736 }
1737 ColumnWriter::DoubleColumnWriter(typed) => {
1738 let array = column.as_primitive::<Float64Type>();
1739 write_primitive(typed, array.values(), levels)
1740 }
1741 ColumnWriter::ByteArrayColumnWriter(_) => {
1742 unreachable!("should use ByteArrayWriter")
1743 }
1744 ColumnWriter::FixedLenByteArrayColumnWriter(typed) => {
1745 let bytes = match column.data_type() {
1746 ArrowDataType::Interval(interval_unit) => match interval_unit {
1747 IntervalUnit::YearMonth => {
1748 let array = column.as_primitive::<IntervalYearMonthType>();
1749 get_interval_ym_array_slice(array, indices.iter().copied())
1750 }
1751 IntervalUnit::DayTime => {
1752 let array = column.as_primitive::<IntervalDayTimeType>();
1753 get_interval_dt_array_slice(array, indices.iter().copied())
1754 }
1755 _ => {
1756 return Err(ParquetError::NYI(format!(
1757 "Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
1758 )));
1759 }
1760 },
1761 ArrowDataType::FixedSizeBinary(_) => {
1762 let array = column.as_fixed_size_binary();
1763 get_fsb_array_slice(array, indices.iter().copied())
1764 }
1765 ArrowDataType::Decimal32(_, _) => {
1766 let array = column.as_primitive::<Decimal32Type>();
1767 get_decimal_32_array_slice(array, indices.iter().copied())
1768 }
1769 ArrowDataType::Decimal64(_, _) => {
1770 let array = column.as_primitive::<Decimal64Type>();
1771 get_decimal_64_array_slice(array, indices.iter().copied())
1772 }
1773 ArrowDataType::Decimal128(_, _) => {
1774 let array = column.as_primitive::<Decimal128Type>();
1775 get_decimal_128_array_slice(array, indices.iter().copied())
1776 }
1777 ArrowDataType::Decimal256(_, _) => {
1778 let array = column.as_primitive::<Decimal256Type>();
1779 get_decimal_256_array_slice(array, indices.iter().copied())
1780 }
1781 ArrowDataType::Float16 => {
1782 let array = column.as_primitive::<Float16Type>();
1783 get_float_16_array_slice(array, indices.iter().copied())
1784 }
1785 _ => {
1786 return Err(ParquetError::NYI(
1787 "Attempting to write an Arrow type that is not yet implemented".to_string(),
1788 ));
1789 }
1790 };
1791 typed.write_batch_internal(
1792 bytes.as_slice(),
1793 None,
1794 levels.def_level_data().as_ref(),
1795 levels.rep_level_data().as_ref(),
1796 None,
1797 None,
1798 None,
1799 )
1800 }
1801 }
1802}
1803
1804fn write_primitive<E: ColumnValueEncoder>(
1805 writer: &mut GenericColumnWriter<E>,
1806 values: &E::Values,
1807 levels: &ArrayLevels,
1808) -> Result<usize> {
1809 writer.write_batch_internal(
1810 values,
1811 Some(levels.non_null_indices()),
1812 levels.def_level_data().as_ref(),
1813 levels.rep_level_data().as_ref(),
1814 None,
1815 None,
1816 None,
1817 )
1818}
1819
1820fn get_bool_array_slice(
1821 array: &arrow_array::BooleanArray,
1822 indices: impl ExactSizeIterator<Item = usize>,
1823) -> Vec<bool> {
1824 let mut values = Vec::with_capacity(indices.len());
1825 for i in indices {
1826 values.push(array.value(i))
1827 }
1828 values
1829}
1830
1831fn get_interval_ym_array_slice(
1834 array: &arrow_array::IntervalYearMonthArray,
1835 indices: impl ExactSizeIterator<Item = usize>,
1836) -> Vec<FixedLenByteArray> {
1837 let mut values = Vec::with_capacity(indices.len());
1838 for i in indices {
1839 let mut value = array.value(i).to_le_bytes().to_vec();
1840 let mut suffix = vec![0; 8];
1841 value.append(&mut suffix);
1842 values.push(FixedLenByteArray::from(ByteArray::from(value)))
1843 }
1844 values
1845}
1846
1847fn get_interval_dt_array_slice(
1850 array: &arrow_array::IntervalDayTimeArray,
1851 indices: impl ExactSizeIterator<Item = usize>,
1852) -> Vec<FixedLenByteArray> {
1853 let mut values = Vec::with_capacity(indices.len());
1854 for i in indices {
1855 let mut out = [0; 12];
1856 let value = array.value(i);
1857 out[4..8].copy_from_slice(&value.days.to_le_bytes());
1858 out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
1859 values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
1860 }
1861 values
1862}
1863
1864fn get_decimal_32_array_slice(
1865 array: &arrow_array::Decimal32Array,
1866 indices: impl ExactSizeIterator<Item = usize>,
1867) -> Vec<FixedLenByteArray> {
1868 let mut values = Vec::with_capacity(indices.len());
1869 let size = decimal_length_from_precision(array.precision());
1870 for i in indices {
1871 let as_be_bytes = array.value(i).to_be_bytes();
1872 let resized_value = as_be_bytes[(4 - size)..].to_vec();
1873 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1874 }
1875 values
1876}
1877
1878fn get_decimal_64_array_slice(
1879 array: &arrow_array::Decimal64Array,
1880 indices: impl ExactSizeIterator<Item = usize>,
1881) -> Vec<FixedLenByteArray> {
1882 let mut values = Vec::with_capacity(indices.len());
1883 let size = decimal_length_from_precision(array.precision());
1884 for i in indices {
1885 let as_be_bytes = array.value(i).to_be_bytes();
1886 let resized_value = as_be_bytes[(8 - size)..].to_vec();
1887 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1888 }
1889 values
1890}
1891
1892fn get_decimal_128_array_slice(
1893 array: &arrow_array::Decimal128Array,
1894 indices: impl ExactSizeIterator<Item = usize>,
1895) -> Vec<FixedLenByteArray> {
1896 let mut values = Vec::with_capacity(indices.len());
1897 let size = decimal_length_from_precision(array.precision());
1898 for i in indices {
1899 let as_be_bytes = array.value(i).to_be_bytes();
1900 let resized_value = as_be_bytes[(16 - size)..].to_vec();
1901 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1902 }
1903 values
1904}
1905
1906fn get_decimal_256_array_slice(
1907 array: &arrow_array::Decimal256Array,
1908 indices: impl ExactSizeIterator<Item = usize>,
1909) -> Vec<FixedLenByteArray> {
1910 let mut values = Vec::with_capacity(indices.len());
1911 let size = decimal_length_from_precision(array.precision());
1912 for i in indices {
1913 let as_be_bytes = array.value(i).to_be_bytes();
1914 let resized_value = as_be_bytes[(32 - size)..].to_vec();
1915 values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
1916 }
1917 values
1918}
1919
1920fn get_float_16_array_slice(
1921 array: &arrow_array::Float16Array,
1922 indices: impl ExactSizeIterator<Item = usize>,
1923) -> Vec<FixedLenByteArray> {
1924 let mut values = Vec::with_capacity(indices.len());
1925 for i in indices {
1926 let value = array.value(i).to_le_bytes().to_vec();
1927 values.push(FixedLenByteArray::from(ByteArray::from(value)));
1928 }
1929 values
1930}
1931
1932fn get_fsb_array_slice(
1933 array: &arrow_array::FixedSizeBinaryArray,
1934 indices: impl ExactSizeIterator<Item = usize>,
1935) -> Vec<FixedLenByteArray> {
1936 let mut values = Vec::with_capacity(indices.len());
1937 for i in indices {
1938 let value = array.value(i).to_vec();
1939 values.push(FixedLenByteArray::from(ByteArray::from(value)))
1940 }
1941 values
1942}
1943
1944#[cfg(test)]
1945mod tests {
1946 use super::*;
1947 use std::collections::HashMap;
1948
1949 use std::fs::File;
1950
1951 use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
1952 use crate::arrow::{ARROW_SCHEMA_META_KEY, PARQUET_FIELD_ID_META_KEY};
1953 use crate::column::page::{Page, PageReader};
1954 use crate::file::metadata::thrift::PageHeader;
1955 use crate::file::page_index::column_index::ColumnIndexMetaData;
1956 use crate::file::reader::SerializedPageReader;
1957 use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
1958 use crate::schema::types::ColumnPath;
1959 use arrow::datatypes::ToByteSlice;
1960 use arrow::datatypes::{DataType, Schema};
1961 use arrow::error::Result as ArrowResult;
1962 use arrow::util::data_gen::create_random_array;
1963 use arrow::util::pretty::pretty_format_batches;
1964 use arrow::{array::*, buffer::Buffer};
1965 use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer, OffsetBuffer, i256};
1966 use arrow_schema::Fields;
1967 use half::f16;
1968 use num_traits::{FromPrimitive, ToPrimitive};
1969 use tempfile::tempfile;
1970
1971 use crate::basic::Encoding;
1972 use crate::data_type::AsBytes;
1973 use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader};
1974 use crate::file::properties::{
1975 BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
1976 };
1977 use crate::file::serialized_reader::ReadOptionsBuilder;
1978 use crate::file::{
1979 reader::{FileReader, SerializedFileReader},
1980 statistics::Statistics,
1981 };
1982
1983 #[derive(Debug, Default)]
1988 struct RecordingPageStore {
1989 next: u64,
1990 blobs: HashMap<u64, Bytes>,
1991 puts: Arc<std::sync::atomic::AtomicUsize>,
1992 }
1993
1994 impl PageStore for RecordingPageStore {
1995 fn put(&mut self, value: Bytes) -> Result<PageKey> {
1996 let id = 100 + self.next * 7;
1998 self.next += 1;
1999 self.puts.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2000 self.blobs.insert(id, value);
2001 Ok(PageKey::new(id))
2002 }
2003
2004 fn take(&mut self, key: PageKey) -> Result<Bytes> {
2005 self.blobs
2006 .remove(&key.get())
2007 .ok_or_else(|| ParquetError::General(format!("missing key {}", key.get())))
2008 }
2009 }
2010
2011 #[derive(Debug)]
2012 struct RecordingPageStoreFactory {
2013 puts: Arc<std::sync::atomic::AtomicUsize>,
2014 }
2015
2016 impl PageStoreFactory for RecordingPageStoreFactory {
2017 fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
2018 Ok(Box::new(RecordingPageStore {
2019 puts: self.puts.clone(),
2020 ..Default::default()
2021 }))
2022 }
2023 }
2024
2025 #[test]
2029 fn custom_page_store_is_byte_identical_to_default() {
2030 let schema = Arc::new(Schema::new(vec![
2031 Field::new("i", DataType::Int32, true),
2032 Field::new("s", DataType::Utf8, true),
2034 ]));
2035 let i = Int32Array::from(vec![Some(1), None, Some(3), Some(4), Some(5), Some(6)]);
2036 let s = StringArray::from(vec![
2037 Some("a"),
2038 Some("bb"),
2039 Some("a"),
2040 None,
2041 Some("bb"),
2042 Some("ccc"),
2043 ]);
2044 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(i), Arc::new(s)]).unwrap();
2045
2046 let props = WriterProperties::builder()
2049 .set_max_row_group_row_count(Some(3))
2050 .build();
2051
2052 let write = |factory: Option<Arc<dyn PageStoreFactory>>| {
2053 let mut buffer = Vec::new();
2054 let mut opts = ArrowWriterOptions::new().with_properties(props.clone());
2055 if let Some(factory) = factory {
2056 opts = opts.with_page_store_factory(factory);
2057 }
2058 let mut writer =
2059 ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
2060 writer.write(&batch).unwrap();
2061 writer.close().unwrap();
2062 buffer
2063 };
2064
2065 let default_bytes = write(None);
2066
2067 let puts = Arc::new(std::sync::atomic::AtomicUsize::new(0));
2068 let custom_bytes = write(Some(Arc::new(RecordingPageStoreFactory {
2069 puts: puts.clone(),
2070 })));
2071
2072 assert!(
2073 puts.load(std::sync::atomic::Ordering::Relaxed) > 0,
2074 "custom PageStore was never written to"
2075 );
2076 assert_eq!(
2077 default_bytes, custom_bytes,
2078 "a custom PageStore must produce byte-identical output to the default"
2079 );
2080 }
2081
2082 #[test]
2088 fn dictionary_column_round_trips_with_offset_index_disabled() {
2089 let schema = Arc::new(Schema::new(vec![Field::new("k", DataType::Int32, true)]));
2090
2091 let values: Vec<Option<i32>> = (0..50_000).map(|i| Some(i % 8)).collect();
2094 let array = Int32Array::from(values.clone());
2095 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
2096
2097 let props = WriterProperties::builder()
2098 .set_offset_index_disabled(true)
2099 .set_data_page_row_count_limit(4096)
2100 .build();
2101 let opts = ArrowWriterOptions::new().with_properties(props);
2102
2103 let mut buffer = Vec::new();
2104 let mut writer =
2105 ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
2106 writer.write(&batch).unwrap();
2107 writer.close().unwrap();
2108
2109 let reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), values.len()).unwrap();
2110 let read: Vec<RecordBatch> = reader.collect::<ArrowResult<_>>().unwrap();
2111 let read_values: Vec<Option<i32>> = read
2112 .iter()
2113 .flat_map(|b| b.column(0).as_primitive::<Int32Type>().iter())
2114 .collect();
2115 assert_eq!(read_values, values);
2116 }
2117
2118 #[test]
2123 fn dictionary_page_is_routed_through_the_store() {
2124 #[derive(Debug, Default)]
2126 struct SizeRecordingPageStore {
2127 blobs: Vec<Bytes>,
2128 bytes_put: Arc<std::sync::atomic::AtomicUsize>,
2129 }
2130 impl PageStore for SizeRecordingPageStore {
2131 fn put(&mut self, value: Bytes) -> Result<PageKey> {
2132 self.bytes_put
2133 .fetch_add(value.len(), std::sync::atomic::Ordering::Relaxed);
2134 let key = PageKey::new(self.blobs.len() as u64);
2135 self.blobs.push(value);
2136 Ok(key)
2137 }
2138 fn take(&mut self, key: PageKey) -> Result<Bytes> {
2139 Ok(std::mem::take(&mut self.blobs[key.get() as usize]))
2140 }
2141 }
2142 #[derive(Debug)]
2143 struct Factory {
2144 bytes_put: Arc<std::sync::atomic::AtomicUsize>,
2145 }
2146 impl PageStoreFactory for Factory {
2147 fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
2148 Ok(Box::new(SizeRecordingPageStore {
2149 bytes_put: self.bytes_put.clone(),
2150 ..Default::default()
2151 }))
2152 }
2153 }
2154
2155 let schema = Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, false)]));
2156 let values: Vec<&str> = (0..2048)
2159 .map(|i| ["alpha", "beta", "gamma", "delta"][i % 4])
2160 .collect();
2161 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(StringArray::from(values))])
2162 .unwrap();
2163
2164 let bytes_put = Arc::new(std::sync::atomic::AtomicUsize::new(0));
2165 let opts = ArrowWriterOptions::new().with_page_store_factory(Arc::new(Factory {
2166 bytes_put: bytes_put.clone(),
2167 }));
2168
2169 let mut buffer = Vec::new();
2172 let mut writer =
2173 ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
2174 writer.write(&batch).unwrap();
2175 writer.close().unwrap();
2176
2177 let reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap();
2178 let column = reader.metadata().row_group(0).column(0);
2179 assert!(
2180 column.dictionary_page_offset().is_some(),
2181 "expected the column to be dictionary-encoded"
2182 );
2183
2184 assert_eq!(
2188 bytes_put.load(std::sync::atomic::Ordering::Relaxed) as i64,
2189 column.compressed_size(),
2190 "the dictionary page must pass through the store like any other page"
2191 );
2192 }
2193
2194 #[test]
2195 fn arrow_writer() {
2196 let schema = Schema::new(vec![
2198 Field::new("a", DataType::Int32, false),
2199 Field::new("b", DataType::Int32, true),
2200 ]);
2201
2202 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2204 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2205
2206 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
2208
2209 roundtrip(batch, Some(SMALL_SIZE / 2));
2210 }
2211
2212 fn get_bytes_after_close(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
2213 let mut buffer = vec![];
2214
2215 let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
2216 writer.write(expected_batch).unwrap();
2217 writer.close().unwrap();
2218
2219 buffer
2220 }
2221
2222 fn get_bytes_by_into_inner(schema: SchemaRef, expected_batch: &RecordBatch) -> Vec<u8> {
2223 let mut writer = ArrowWriter::try_new(Vec::new(), schema, None).unwrap();
2224 writer.write(expected_batch).unwrap();
2225 writer.into_inner().unwrap()
2226 }
2227
2228 #[test]
2229 fn roundtrip_bytes() {
2230 let schema = Arc::new(Schema::new(vec![
2232 Field::new("a", DataType::Int32, false),
2233 Field::new("b", DataType::Int32, true),
2234 ]));
2235
2236 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2238 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2239
2240 let expected_batch =
2242 RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap();
2243
2244 for buffer in [
2245 get_bytes_after_close(schema.clone(), &expected_batch),
2246 get_bytes_by_into_inner(schema, &expected_batch),
2247 ] {
2248 let cursor = Bytes::from(buffer);
2249 let mut record_batch_reader = ParquetRecordBatchReader::try_new(cursor, 1024).unwrap();
2250
2251 let actual_batch = record_batch_reader
2252 .next()
2253 .expect("No batch found")
2254 .expect("Unable to get batch");
2255
2256 assert_eq!(expected_batch.schema(), actual_batch.schema());
2257 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
2258 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
2259 for i in 0..expected_batch.num_columns() {
2260 let expected_data = expected_batch.column(i).to_data();
2261 let actual_data = actual_batch.column(i).to_data();
2262
2263 assert_eq!(expected_data, actual_data);
2264 }
2265 }
2266 }
2267
2268 #[test]
2269 fn arrow_writer_non_null() {
2270 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2272
2273 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2275
2276 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2278
2279 roundtrip(batch, Some(SMALL_SIZE / 2));
2280 }
2281
2282 #[test]
2283 fn arrow_writer_list() {
2284 let schema = Schema::new(vec![Field::new(
2286 "a",
2287 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
2288 true,
2289 )]);
2290
2291 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2293
2294 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2297
2298 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2300 DataType::Int32,
2301 false,
2302 ))))
2303 .len(5)
2304 .add_buffer(a_value_offsets)
2305 .add_child_data(a_values.into_data())
2306 .null_bit_buffer(Some(Buffer::from([0b00011011])))
2307 .build()
2308 .unwrap();
2309 let a = ListArray::from(a_list_data);
2310
2311 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2313
2314 assert_eq!(batch.column(0).null_count(), 1);
2315
2316 roundtrip(batch, None);
2319 }
2320
2321 #[test]
2322 fn arrow_writer_list_non_null() {
2323 let schema = Schema::new(vec![Field::new(
2325 "a",
2326 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false))),
2327 false,
2328 )]);
2329
2330 let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2332
2333 let a_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2336
2337 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new_list_field(
2339 DataType::Int32,
2340 false,
2341 ))))
2342 .len(5)
2343 .add_buffer(a_value_offsets)
2344 .add_child_data(a_values.into_data())
2345 .build()
2346 .unwrap();
2347 let a = ListArray::from(a_list_data);
2348
2349 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2351
2352 assert_eq!(batch.column(0).null_count(), 0);
2355
2356 roundtrip(batch, None);
2357 }
2358
2359 #[test]
2360 fn arrow_writer_list_view() {
2361 let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2362 let schema = Schema::new(vec![Field::new(
2363 "a",
2364 DataType::ListView(list_field.clone()),
2365 true,
2366 )]);
2367
2368 let a = ListViewArray::new(
2370 list_field,
2371 vec![0, 1, 0, 3, 6].into(),
2372 vec![1, 2, 0, 3, 4].into(),
2373 Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2374 Some(vec![true, true, false, true, true].into()),
2375 );
2376
2377 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2378
2379 assert_eq!(batch.column(0).null_count(), 1);
2380
2381 roundtrip(batch, None);
2382 }
2383
2384 #[test]
2385 fn arrow_writer_list_view_non_null() {
2386 let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2387 let schema = Schema::new(vec![Field::new(
2388 "a",
2389 DataType::ListView(list_field.clone()),
2390 false,
2391 )]);
2392
2393 let a = ListViewArray::new(
2395 list_field,
2396 vec![0, 1, 0, 3, 6].into(),
2397 vec![1, 2, 0, 3, 4].into(),
2398 Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2399 None,
2400 );
2401
2402 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2403
2404 assert_eq!(batch.column(0).null_count(), 0);
2405
2406 roundtrip(batch, None);
2407 }
2408
2409 #[test]
2410 fn arrow_writer_list_view_out_of_order() {
2411 let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2412 let schema = Schema::new(vec![Field::new(
2413 "a",
2414 DataType::ListView(list_field.clone()),
2415 false,
2416 )]);
2417
2418 let a = ListViewArray::new(
2420 list_field,
2421 vec![0, 1, 0, 6, 3].into(),
2422 vec![1, 2, 0, 4, 3].into(),
2423 Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2424 None,
2425 );
2426
2427 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2428
2429 roundtrip(batch, None);
2430 }
2431
2432 #[test]
2433 fn arrow_writer_large_list_view() {
2434 let list_field = Arc::new(Field::new_list_field(DataType::Int32, false));
2435 let schema = Schema::new(vec![Field::new(
2436 "a",
2437 DataType::LargeListView(list_field.clone()),
2438 true,
2439 )]);
2440
2441 let a = LargeListViewArray::new(
2443 list_field,
2444 vec![0i64, 1, 0, 3, 6].into(),
2445 vec![1i64, 2, 0, 3, 4].into(),
2446 Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
2447 Some(vec![true, true, false, true, true].into()),
2448 );
2449
2450 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2451
2452 assert_eq!(batch.column(0).null_count(), 1);
2453
2454 roundtrip(batch, None);
2455 }
2456
2457 #[test]
2458 fn arrow_writer_list_view_with_struct() {
2459 let struct_fields = Fields::from(vec![
2461 Field::new("id", DataType::Int32, false),
2462 Field::new("name", DataType::Utf8, false),
2463 ]);
2464 let struct_type = DataType::Struct(struct_fields.clone());
2465 let list_field = Arc::new(Field::new("item", struct_type.clone(), false));
2466
2467 let schema = Schema::new(vec![Field::new(
2468 "a",
2469 DataType::ListView(list_field.clone()),
2470 true,
2471 )]);
2472
2473 let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
2475 let name_array = StringArray::from(vec!["a", "b", "c", "d", "e"]);
2476 let struct_array = StructArray::new(
2477 struct_fields,
2478 vec![Arc::new(id_array), Arc::new(name_array)],
2479 None,
2480 );
2481
2482 let list_view = ListViewArray::new(
2484 list_field,
2485 vec![0, 2, 2].into(), vec![2, 0, 3].into(), Arc::new(struct_array),
2488 Some(vec![true, false, true].into()),
2489 );
2490
2491 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_view)]).unwrap();
2492
2493 roundtrip(batch, None);
2494 }
2495
2496 #[test]
2497 fn arrow_writer_binary() {
2498 let string_field = Field::new("a", DataType::Utf8, false);
2499 let binary_field = Field::new("b", DataType::Binary, false);
2500 let schema = Schema::new(vec![string_field, binary_field]);
2501
2502 let raw_string_values = vec!["foo", "bar", "baz", "quux"];
2503 let raw_binary_values = [
2504 b"foo".to_vec(),
2505 b"bar".to_vec(),
2506 b"baz".to_vec(),
2507 b"quux".to_vec(),
2508 ];
2509 let raw_binary_value_refs = raw_binary_values
2510 .iter()
2511 .map(|x| x.as_slice())
2512 .collect::<Vec<_>>();
2513
2514 let string_values = StringArray::from(raw_string_values.clone());
2515 let binary_values = BinaryArray::from(raw_binary_value_refs);
2516 let batch = RecordBatch::try_new(
2517 Arc::new(schema),
2518 vec![Arc::new(string_values), Arc::new(binary_values)],
2519 )
2520 .unwrap();
2521
2522 roundtrip(batch, Some(SMALL_SIZE / 2));
2523 }
2524
2525 #[test]
2526 fn arrow_writer_binary_view() {
2527 let string_field = Field::new("a", DataType::Utf8View, false);
2528 let binary_field = Field::new("b", DataType::BinaryView, false);
2529 let nullable_string_field = Field::new("a", DataType::Utf8View, true);
2530 let schema = Schema::new(vec![string_field, binary_field, nullable_string_field]);
2531
2532 let raw_string_values = vec!["foo", "bar", "large payload over 12 bytes", "lulu"];
2533 let raw_binary_values = vec![
2534 b"foo".to_vec(),
2535 b"bar".to_vec(),
2536 b"large payload over 12 bytes".to_vec(),
2537 b"lulu".to_vec(),
2538 ];
2539 let nullable_string_values =
2540 vec![Some("foo"), None, Some("large payload over 12 bytes"), None];
2541
2542 let string_view_values = StringViewArray::from(raw_string_values);
2543 let binary_view_values = BinaryViewArray::from_iter_values(raw_binary_values);
2544 let nullable_string_view_values = StringViewArray::from(nullable_string_values);
2545 let batch = RecordBatch::try_new(
2546 Arc::new(schema),
2547 vec![
2548 Arc::new(string_view_values),
2549 Arc::new(binary_view_values),
2550 Arc::new(nullable_string_view_values),
2551 ],
2552 )
2553 .unwrap();
2554
2555 roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
2556 roundtrip(batch, None);
2557 }
2558
2559 #[test]
2560 fn arrow_writer_binary_view_long_value() {
2561 let string_field = Field::new("a", DataType::Utf8View, false);
2562 let binary_field = Field::new("b", DataType::BinaryView, false);
2563 let schema = Schema::new(vec![string_field, binary_field]);
2564
2565 let long = "a".repeat(128);
2569 let raw_string_values = vec!["foo", long.as_str(), "bar"];
2570 let raw_binary_values = vec![b"foo".to_vec(), long.as_bytes().to_vec(), b"bar".to_vec()];
2571
2572 let string_view_values: ArrayRef = Arc::new(StringViewArray::from(raw_string_values));
2573 let binary_view_values: ArrayRef =
2574 Arc::new(BinaryViewArray::from_iter_values(raw_binary_values));
2575
2576 one_column_roundtrip(Arc::clone(&string_view_values), false);
2577 one_column_roundtrip(Arc::clone(&binary_view_values), false);
2578
2579 let batch = RecordBatch::try_new(
2580 Arc::new(schema),
2581 vec![string_view_values, binary_view_values],
2582 )
2583 .unwrap();
2584
2585 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2587 let props = WriterProperties::builder()
2588 .set_writer_version(version)
2589 .set_dictionary_enabled(false)
2590 .build();
2591 roundtrip_opts(&batch, props);
2592 }
2593 }
2594
2595 fn get_decimal_batch(precision: u8, scale: i8) -> RecordBatch {
2596 let decimal_field = Field::new("a", DataType::Decimal128(precision, scale), false);
2597 let schema = Schema::new(vec![decimal_field]);
2598
2599 let decimal_values = vec![10_000, 50_000, 0, -100]
2600 .into_iter()
2601 .map(Some)
2602 .collect::<Decimal128Array>()
2603 .with_precision_and_scale(precision, scale)
2604 .unwrap();
2605
2606 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(decimal_values)]).unwrap()
2607 }
2608
2609 #[test]
2610 fn arrow_writer_decimal() {
2611 let batch_int32_decimal = get_decimal_batch(5, 2);
2613 roundtrip(batch_int32_decimal, Some(SMALL_SIZE / 2));
2614 let batch_int64_decimal = get_decimal_batch(12, 2);
2616 roundtrip(batch_int64_decimal, Some(SMALL_SIZE / 2));
2617 let batch_fixed_len_byte_array_decimal = get_decimal_batch(30, 2);
2619 roundtrip(batch_fixed_len_byte_array_decimal, Some(SMALL_SIZE / 2));
2620 }
2621
2622 #[test]
2623 fn arrow_writer_complex() {
2624 let struct_field_d = Arc::new(Field::new("d", DataType::Float64, true));
2626 let struct_field_f = Arc::new(Field::new("f", DataType::Float32, true));
2627 let struct_field_g = Arc::new(Field::new_list(
2628 "g",
2629 Field::new_list_field(DataType::Int16, true),
2630 false,
2631 ));
2632 let struct_field_h = Arc::new(Field::new_list(
2633 "h",
2634 Field::new_list_field(DataType::Int16, false),
2635 true,
2636 ));
2637 let struct_field_e = Arc::new(Field::new_struct(
2638 "e",
2639 vec![
2640 struct_field_f.clone(),
2641 struct_field_g.clone(),
2642 struct_field_h.clone(),
2643 ],
2644 false,
2645 ));
2646 let schema = Schema::new(vec![
2647 Field::new("a", DataType::Int32, false),
2648 Field::new("b", DataType::Int32, true),
2649 Field::new_struct(
2650 "c",
2651 vec![struct_field_d.clone(), struct_field_e.clone()],
2652 false,
2653 ),
2654 ]);
2655
2656 let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2658 let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2659 let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
2660 let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
2661
2662 let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
2663
2664 let g_value_offsets = arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice());
2667
2668 let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
2670 .len(5)
2671 .add_buffer(g_value_offsets.clone())
2672 .add_child_data(g_value.to_data())
2673 .build()
2674 .unwrap();
2675 let g = ListArray::from(g_list_data);
2676 let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
2678 .len(5)
2679 .add_buffer(g_value_offsets)
2680 .add_child_data(g_value.to_data())
2681 .null_bit_buffer(Some(Buffer::from([0b00011011])))
2682 .build()
2683 .unwrap();
2684 let h = ListArray::from(h_list_data);
2685
2686 let e = StructArray::from(vec![
2687 (struct_field_f, Arc::new(f) as ArrayRef),
2688 (struct_field_g, Arc::new(g) as ArrayRef),
2689 (struct_field_h, Arc::new(h) as ArrayRef),
2690 ]);
2691
2692 let c = StructArray::from(vec![
2693 (struct_field_d, Arc::new(d) as ArrayRef),
2694 (struct_field_e, Arc::new(e) as ArrayRef),
2695 ]);
2696
2697 let batch = RecordBatch::try_new(
2699 Arc::new(schema),
2700 vec![Arc::new(a), Arc::new(b), Arc::new(c)],
2701 )
2702 .unwrap();
2703
2704 roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
2705 roundtrip(batch, Some(SMALL_SIZE / 3));
2706 }
2707
2708 #[test]
2709 fn arrow_writer_complex_mixed() {
2710 let offset_field = Arc::new(Field::new("offset", DataType::Int32, false));
2715 let partition_field = Arc::new(Field::new("partition", DataType::Int64, true));
2716 let topic_field = Arc::new(Field::new("topic", DataType::Utf8, true));
2717 let schema = Schema::new(vec![Field::new(
2718 "some_nested_object",
2719 DataType::Struct(Fields::from(vec![
2720 offset_field.clone(),
2721 partition_field.clone(),
2722 topic_field.clone(),
2723 ])),
2724 false,
2725 )]);
2726
2727 let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
2729 let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
2730 let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
2731
2732 let some_nested_object = StructArray::from(vec![
2733 (offset_field, Arc::new(offset) as ArrayRef),
2734 (partition_field, Arc::new(partition) as ArrayRef),
2735 (topic_field, Arc::new(topic) as ArrayRef),
2736 ]);
2737
2738 let batch =
2740 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]).unwrap();
2741
2742 roundtrip(batch, Some(SMALL_SIZE / 2));
2743 }
2744
2745 #[test]
2746 fn arrow_writer_map() {
2747 let json_content = r#"
2749 {"stocks":{"long": "$AAA", "short": "$BBB"}}
2750 {"stocks":{"long": null, "long": "$CCC", "short": null}}
2751 {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
2752 "#;
2753 let entries_struct_type = DataType::Struct(Fields::from(vec![
2754 Field::new("key", DataType::Utf8, false),
2755 Field::new("value", DataType::Utf8, true),
2756 ]));
2757 let stocks_field = Field::new(
2758 "stocks",
2759 DataType::Map(
2760 Arc::new(Field::new("entries", entries_struct_type, false)),
2761 false,
2762 ),
2763 true,
2764 );
2765 let schema = Arc::new(Schema::new(vec![stocks_field]));
2766 let builder = arrow::json::ReaderBuilder::new(schema).with_batch_size(64);
2767 let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
2768
2769 let batch = reader.next().unwrap().unwrap();
2770 roundtrip(batch, None);
2771 }
2772
2773 #[test]
2774 fn arrow_writer_2_level_struct() {
2775 let field_c = Field::new("c", DataType::Int32, true);
2777 let field_b = Field::new("b", DataType::Struct(vec![field_c].into()), true);
2778 let type_a = DataType::Struct(vec![field_b.clone()].into());
2779 let field_a = Field::new("a", type_a, true);
2780 let schema = Schema::new(vec![field_a.clone()]);
2781
2782 let c = Int32Array::from(vec![Some(1), None, Some(3), None, None, Some(6)]);
2784 let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2785 .len(6)
2786 .null_bit_buffer(Some(Buffer::from([0b00100111])))
2787 .add_child_data(c.into_data())
2788 .build()
2789 .unwrap();
2790 let b = StructArray::from(b_data);
2791 let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2792 .len(6)
2793 .null_bit_buffer(Some(Buffer::from([0b00101111])))
2794 .add_child_data(b.into_data())
2795 .build()
2796 .unwrap();
2797 let a = StructArray::from(a_data);
2798
2799 assert_eq!(a.null_count(), 1);
2800 assert_eq!(a.column(0).null_count(), 2);
2801
2802 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2804
2805 roundtrip(batch, Some(SMALL_SIZE / 2));
2806 }
2807
2808 #[test]
2809 fn arrow_writer_2_level_struct_non_null() {
2810 let field_c = Field::new("c", DataType::Int32, false);
2812 let type_b = DataType::Struct(vec![field_c].into());
2813 let field_b = Field::new("b", type_b.clone(), false);
2814 let type_a = DataType::Struct(vec![field_b].into());
2815 let field_a = Field::new("a", type_a.clone(), false);
2816 let schema = Schema::new(vec![field_a]);
2817
2818 let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2820 let b_data = ArrayDataBuilder::new(type_b)
2821 .len(6)
2822 .add_child_data(c.into_data())
2823 .build()
2824 .unwrap();
2825 let b = StructArray::from(b_data);
2826 let a_data = ArrayDataBuilder::new(type_a)
2827 .len(6)
2828 .add_child_data(b.into_data())
2829 .build()
2830 .unwrap();
2831 let a = StructArray::from(a_data);
2832
2833 assert_eq!(a.null_count(), 0);
2834 assert_eq!(a.column(0).null_count(), 0);
2835
2836 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2838
2839 roundtrip(batch, Some(SMALL_SIZE / 2));
2840 }
2841
2842 #[test]
2843 fn arrow_writer_2_level_struct_mixed_null() {
2844 let field_c = Field::new("c", DataType::Int32, false);
2846 let type_b = DataType::Struct(vec![field_c].into());
2847 let field_b = Field::new("b", type_b.clone(), true);
2848 let type_a = DataType::Struct(vec![field_b].into());
2849 let field_a = Field::new("a", type_a.clone(), false);
2850 let schema = Schema::new(vec![field_a]);
2851
2852 let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
2854 let b_data = ArrayDataBuilder::new(type_b)
2855 .len(6)
2856 .null_bit_buffer(Some(Buffer::from([0b00100111])))
2857 .add_child_data(c.into_data())
2858 .build()
2859 .unwrap();
2860 let b = StructArray::from(b_data);
2861 let a_data = ArrayDataBuilder::new(type_a)
2863 .len(6)
2864 .add_child_data(b.into_data())
2865 .build()
2866 .unwrap();
2867 let a = StructArray::from(a_data);
2868
2869 assert_eq!(a.null_count(), 0);
2870 assert_eq!(a.column(0).null_count(), 2);
2871
2872 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2874
2875 roundtrip(batch, Some(SMALL_SIZE / 2));
2876 }
2877
2878 #[test]
2879 fn arrow_writer_2_level_struct_mixed_null_2() {
2880 let field_c = Field::new("c", DataType::Int32, false);
2882 let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
2883 let field_e = Field::new(
2884 "e",
2885 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2886 false,
2887 );
2888
2889 let field_b = Field::new(
2890 "b",
2891 DataType::Struct(vec![field_c, field_d, field_e].into()),
2892 false,
2893 );
2894 let type_a = DataType::Struct(vec![field_b.clone()].into());
2895 let field_a = Field::new("a", type_a, true);
2896 let schema = Schema::new(vec![field_a.clone()]);
2897
2898 let c = Int32Array::from_iter_values(0..6);
2900 let d = FixedSizeBinaryArray::try_from_iter(
2901 ["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
2902 )
2903 .expect("four byte values");
2904 let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
2905 let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
2906 .len(6)
2907 .add_child_data(c.into_data())
2908 .add_child_data(d.into_data())
2909 .add_child_data(e.into_data())
2910 .build()
2911 .unwrap();
2912 let b = StructArray::from(b_data);
2913 let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
2914 .len(6)
2915 .null_bit_buffer(Some(Buffer::from([0b00100101])))
2916 .add_child_data(b.into_data())
2917 .build()
2918 .unwrap();
2919 let a = StructArray::from(a_data);
2920
2921 assert_eq!(a.null_count(), 3);
2922 assert_eq!(a.column(0).null_count(), 0);
2923
2924 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
2926
2927 roundtrip(batch, Some(SMALL_SIZE / 2));
2928 }
2929
/// Round-trips a dictionary column with `FixedSizeBinary(4)` values for each of the eight
/// integer dictionary key types.
#[test]
fn test_fixed_size_binary_in_dict() {
    /// Round-trips a three-row, non-nullable dictionary column `"a"` with keys `[0, 0, 1]`
    /// of key type `K` and two four-byte values.
    fn test_fixed_size_binary_in_dict_inner<K>()
    where
        K: ArrowDictionaryKeyType,
        K::Native: FromPrimitive + ToPrimitive + TryFrom<u8>,
        <<K as arrow_array::ArrowPrimitiveType>::Native as TryFrom<u8>>::Error: std::fmt::Debug,
    {
        let field = Field::new(
            "a",
            DataType::Dictionary(
                Box::new(K::DATA_TYPE),
                Box::new(DataType::FixedSizeBinary(4)),
            ),
            false,
        );
        let schema = Schema::new(vec![field]);

        let keys: Vec<K::Native> = vec![
            K::Native::try_from(0u8).unwrap(),
            K::Native::try_from(0u8).unwrap(),
            K::Native::try_from(1u8).unwrap(),
        ];
        let keys = PrimitiveArray::<K>::from_iter_values(keys);
        let values = FixedSizeBinaryArray::try_from_iter(
            vec![vec![0, 0, 0, 0], vec![1, 1, 1, 1]].into_iter(),
        )
        .unwrap();

        let data = DictionaryArray::<K>::new(keys, Arc::new(values));
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)]).unwrap();
        roundtrip(batch, None);
    }

    // Cover every unsigned and signed integer key width exactly once.
    test_fixed_size_binary_in_dict_inner::<UInt8Type>();
    test_fixed_size_binary_in_dict_inner::<UInt16Type>();
    test_fixed_size_binary_in_dict_inner::<UInt32Type>();
    test_fixed_size_binary_in_dict_inner::<UInt64Type>();
    test_fixed_size_binary_in_dict_inner::<Int8Type>();
    test_fixed_size_binary_in_dict_inner::<Int16Type>();
    test_fixed_size_binary_in_dict_inner::<Int32Type>();
    test_fixed_size_binary_in_dict_inner::<Int64Type>();
}
2973
/// Round-trips a nullable struct whose five rows are all null, wrapping a dictionary column
/// with all-null keys and an empty values array.
#[test]
fn test_empty_dict() {
    let struct_fields = Fields::from(vec![Field::new(
        "dict",
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
        false,
    )]);
    let schema = Arc::new(Schema::new(vec![Field::new_struct(
        "struct",
        struct_fields.clone(),
        true,
    )]));

    let keys = Int32Array::new_null(5);
    let dictionary: ArrayRef =
        Arc::new(DictionaryArray::new(keys, Arc::new(StringArray::new_null(0))));

    let all_null = StructArray::new(struct_fields, vec![dictionary], Some(NullBuffer::new_null(5)));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(all_null)]).unwrap();
    roundtrip(batch, None);
}
/// Writes 10 distinct 10-byte strings with the data page limit, dictionary page limit and
/// write batch size all set to 1. It then checks that the column chunk has a dictionary page
/// and that the offset index records exactly 10 data pages.
#[test]
fn arrow_writer_page_size() {
    const NUM_VALUES: usize = 10;
    const VALUE_LEN: usize = 10;

    let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));

    // Size the builder for the data actually appended (NUM_VALUES * VALUE_LEN bytes) rather
    // than preallocating megabytes that are never used.
    let mut builder = StringBuilder::with_capacity(NUM_VALUES, NUM_VALUES * VALUE_LEN);
    for i in 0..NUM_VALUES {
        // "0000000000", "1111111111", ... — one distinct value per iteration.
        let value = i
            .to_string()
            .repeat(VALUE_LEN)
            .chars()
            .take(VALUE_LEN)
            .collect::<String>();
        builder.append_value(value);
    }

    let array = Arc::new(builder.finish());
    let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

    let file = tempfile::tempfile().unwrap();

    let props = WriterProperties::builder()
        .set_data_page_size_limit(1)
        .set_dictionary_page_size_limit(1)
        .set_write_batch_size(1)
        .build();

    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
            .expect("Unable to write file");
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader =
        SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap();

    let column = reader.metadata().row_group(0).columns();
    assert_eq!(column.len(), 1);
    assert!(
        column[0].dictionary_page_offset().is_some(),
        "Expected a dictionary page"
    );

    assert!(reader.metadata().offset_index().is_some());
    let offset_indexes = &reader.metadata().offset_index().unwrap()[0];
    let page_locations = offset_indexes[0].page_locations.clone();

    assert_eq!(
        page_locations.len(),
        NUM_VALUES,
        "Expected 10 pages but got {page_locations:#?}"
    );
}
3066
/// Round-trips Float16/Float32/Float64 columns of `MEDIUM_SIZE` rows where every even row is
/// NaN and every odd row holds its row index.
#[test]
fn arrow_writer_float_nans() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Float16, false),
        Field::new("b", DataType::Float32, false),
        Field::new("c", DataType::Float64, false),
    ]));

    let is_nan_row = |row: usize| row % 2 == 0;

    let halves: Float16Array = (0..MEDIUM_SIZE)
        .map(|row| {
            Some(if is_nan_row(row) {
                f16::NAN
            } else {
                f16::from_f32(row as f32)
            })
        })
        .collect();
    let singles: Float32Array = (0..MEDIUM_SIZE)
        .map(|row| Some(if is_nan_row(row) { f32::NAN } else { row as f32 }))
        .collect();
    let doubles: Float64Array = (0..MEDIUM_SIZE)
        .map(|row| Some(if is_nan_row(row) { f64::NAN } else { row as f64 }))
        .collect();

    let columns: Vec<ArrayRef> = vec![Arc::new(halves), Arc::new(singles), Arc::new(doubles)];
    let batch = RecordBatch::try_new(schema, columns).unwrap();
    roundtrip(batch, None);
}
3104
/// Small row count shared by many of the round-trip tests in this module.
const SMALL_SIZE: usize = 7;
/// Larger row count for tests that need more than a handful of rows.
const MEDIUM_SIZE: usize = 63;
3107
3108 fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> Vec<Bytes> {
3111 let mut files = vec![];
3112 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3113 let mut props = WriterProperties::builder().set_writer_version(version);
3114
3115 if let Some(size) = max_row_group_size {
3116 props = props.set_max_row_group_row_count(Some(size))
3117 }
3118
3119 let props = props.build();
3120 files.push(roundtrip_opts(&expected_batch, props))
3121 }
3122 files
3123 }
3124
3125 fn roundtrip_opts_with_array_validation<F>(
3129 expected_batch: &RecordBatch,
3130 props: WriterProperties,
3131 validate: F,
3132 ) -> Bytes
3133 where
3134 F: Fn(&ArrayData, &ArrayData),
3135 {
3136 let mut file = vec![];
3137
3138 let mut writer = ArrowWriter::try_new(&mut file, expected_batch.schema(), Some(props))
3139 .expect("Unable to write file");
3140 writer.write(expected_batch).unwrap();
3141 writer.close().unwrap();
3142
3143 let file = Bytes::from(file);
3144 let mut record_batch_reader =
3145 ParquetRecordBatchReader::try_new(file.clone(), 1024).unwrap();
3146
3147 let actual_batch = record_batch_reader
3148 .next()
3149 .expect("No batch found")
3150 .expect("Unable to get batch");
3151
3152 assert_eq!(expected_batch.schema(), actual_batch.schema());
3153 assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
3154 assert_eq!(expected_batch.num_rows(), actual_batch.num_rows());
3155 for i in 0..expected_batch.num_columns() {
3156 let expected_data = expected_batch.column(i).to_data();
3157 let actual_data = actual_batch.column(i).to_data();
3158 validate(&expected_data, &actual_data);
3159 }
3160
3161 file
3162 }
3163
3164 fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> Bytes {
3165 roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
3166 a.validate_full().expect("valid expected data");
3167 b.validate_full().expect("valid actual data");
3168 assert_eq!(a, b)
3169 })
3170 }
3171
3172 struct RoundTripOptions {
3173 values: ArrayRef,
3174 schema: SchemaRef,
3175 bloom_filter: bool,
3176 bloom_filter_ndv: Option<u64>,
3177 bloom_filter_position: BloomFilterPosition,
3178 }
3179
3180 impl RoundTripOptions {
3181 fn new(values: ArrayRef, nullable: bool) -> Self {
3182 let data_type = values.data_type().clone();
3183 let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
3184 Self {
3185 values,
3186 schema: Arc::new(schema),
3187 bloom_filter: false,
3188 bloom_filter_ndv: None,
3189 bloom_filter_position: BloomFilterPosition::AfterRowGroup,
3190 }
3191 }
3192 }
3193
3194 fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<Bytes> {
3195 one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
3196 }
3197
3198 fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef) -> Vec<Bytes> {
3199 let mut options = RoundTripOptions::new(values, false);
3200 options.schema = schema;
3201 one_column_roundtrip_with_options(options)
3202 }
3203
3204 fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<Bytes> {
3205 let RoundTripOptions {
3206 values,
3207 schema,
3208 bloom_filter,
3209 bloom_filter_ndv,
3210 bloom_filter_position,
3211 } = options;
3212
3213 let encodings = match values.data_type() {
3214 DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
3215 vec![
3216 Encoding::PLAIN,
3217 Encoding::DELTA_BYTE_ARRAY,
3218 Encoding::DELTA_LENGTH_BYTE_ARRAY,
3219 ]
3220 }
3221 DataType::Int64
3222 | DataType::Int32
3223 | DataType::Int16
3224 | DataType::Int8
3225 | DataType::UInt64
3226 | DataType::UInt32
3227 | DataType::UInt16
3228 | DataType::UInt8 => vec![
3229 Encoding::PLAIN,
3230 Encoding::DELTA_BINARY_PACKED,
3231 Encoding::BYTE_STREAM_SPLIT,
3232 ],
3233 DataType::Float32 | DataType::Float64 => {
3234 vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
3235 }
3236 _ => vec![Encoding::PLAIN],
3237 };
3238
3239 let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
3240
3241 let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
3242
3243 let mut files = vec![];
3244 for dictionary_size in [0, 1, 1024] {
3245 for encoding in &encodings {
3246 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
3247 for row_group_size in row_group_sizes {
3248 let mut builder = WriterProperties::builder()
3249 .set_writer_version(version)
3250 .set_max_row_group_row_count(Some(row_group_size))
3251 .set_dictionary_enabled(dictionary_size != 0)
3252 .set_dictionary_page_size_limit(dictionary_size.max(1))
3253 .set_encoding(*encoding)
3254 .set_bloom_filter_enabled(bloom_filter)
3255 .set_bloom_filter_position(bloom_filter_position);
3256 if let Some(ndv) = bloom_filter_ndv {
3257 builder = builder.set_bloom_filter_max_ndv(ndv);
3258 }
3259 let props = builder.build();
3260
3261 files.push(roundtrip_opts(&expected_batch, props))
3262 }
3263 }
3264 }
3265 }
3266 files
3267 }
3268
3269 fn values_required<A, I>(iter: I) -> Vec<Bytes>
3270 where
3271 A: From<Vec<I::Item>> + Array + 'static,
3272 I: IntoIterator,
3273 {
3274 let raw_values: Vec<_> = iter.into_iter().collect();
3275 let values = Arc::new(A::from(raw_values));
3276 one_column_roundtrip(values, false)
3277 }
3278
3279 fn values_optional<A, I>(iter: I) -> Vec<Bytes>
3280 where
3281 A: From<Vec<Option<I::Item>>> + Array + 'static,
3282 I: IntoIterator,
3283 {
3284 let optional_raw_values: Vec<_> = iter
3285 .into_iter()
3286 .enumerate()
3287 .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
3288 .collect();
3289 let optional_values = Arc::new(A::from(optional_raw_values));
3290 one_column_roundtrip(optional_values, true)
3291 }
3292
3293 fn required_and_optional<A, I>(iter: I)
3294 where
3295 A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
3296 I: IntoIterator + Clone,
3297 {
3298 values_required::<A, I>(iter.clone());
3299 values_optional::<A, I>(iter);
3300 }
3301
3302 fn check_bloom_filter<T: AsBytes>(
3303 files: Vec<Bytes>,
3304 file_column: String,
3305 positive_values: Vec<T>,
3306 negative_values: Vec<T>,
3307 ) {
3308 files.into_iter().take(1).for_each(|file| {
3309 let file_reader = SerializedFileReader::new_with_options(
3310 file,
3311 ReadOptionsBuilder::new()
3312 .with_reader_properties(
3313 ReaderProperties::builder()
3314 .set_read_bloom_filter(true)
3315 .build(),
3316 )
3317 .build(),
3318 )
3319 .expect("Unable to open file as Parquet");
3320 let metadata = file_reader.metadata();
3321
3322 let mut bloom_filters: Vec<_> = vec![];
3324 for (ri, row_group) in metadata.row_groups().iter().enumerate() {
3325 if let Some((column_index, _)) = row_group
3326 .columns()
3327 .iter()
3328 .enumerate()
3329 .find(|(_, column)| column.column_path().string() == file_column)
3330 {
3331 let row_group_reader = file_reader
3332 .get_row_group(ri)
3333 .expect("Unable to read row group");
3334 if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
3335 bloom_filters.push(sbbf.clone());
3336 } else {
3337 panic!("No bloom filter for column named {file_column} found");
3338 }
3339 } else {
3340 panic!("No column named {file_column} found");
3341 }
3342 }
3343
3344 positive_values.iter().for_each(|value| {
3345 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
3346 assert!(
3347 found.is_some(),
3348 "{}",
3349 format!("Value {:?} should be in bloom filter", value.as_bytes())
3350 );
3351 });
3352
3353 negative_values.iter().for_each(|value| {
3354 let found = bloom_filters.iter().find(|sbbf| sbbf.check(value));
3355 assert!(
3356 found.is_none(),
3357 "{}",
3358 format!("Value {:?} should not be in bloom filter", value.as_bytes())
3359 );
3360 });
3361 });
3362 }
3363
/// Round-trips a nullable Int32 column in which every value is null.
#[test]
fn all_null_primitive_single_column() {
    let all_nulls: Vec<Option<i32>> = vec![None; SMALL_SIZE];
    one_column_roundtrip(Arc::new(Int32Array::from(all_nulls)), true);
}
/// Round-trips a column of Arrow's `Null` type, declared nullable.
#[test]
fn null_single_column() {
    let nulls = NullArray::new(SMALL_SIZE);
    one_column_roundtrip(Arc::new(nulls), true);
}
3375
/// Round-trips alternating `true`/`false` values as required and optional columns.
#[test]
fn bool_single_column() {
    let alternating = [true, false].into_iter().cycle().take(SMALL_SIZE);
    required_and_optional::<BooleanArray, _>(alternating);
}
3382
/// Smoke test: writing 200,000 nullable booleans (repeating null, true, false) to a temporary
/// file with default properties must succeed. The file is not read back.
#[test]
fn bool_large_single_column() {
    let values: ArrayRef = Arc::new(
        [None, Some(true), Some(false)]
            .into_iter()
            .cycle()
            .take(200_000)
            .collect::<BooleanArray>(),
    );
    let field = Field::new("col", values.data_type().clone(), true);
    let expected_batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![values])
        .unwrap();

    let file = tempfile::tempfile().unwrap();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), expected_batch.schema(), None)
            .expect("Unable to write file");
    writer.write(&expected_batch).unwrap();
    writer.close().unwrap();
}
3403
/// A Float64 column containing only NaNs must still get an offset index, but no column index,
/// in every column chunk of the written file.
#[test]
fn check_page_offset_index_with_nan() {
    let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Float64, true)]));
    let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
    let batch = RecordBatch::try_new(schema, vec![values]).unwrap();

    let mut out = Vec::with_capacity(1024);
    let mut writer =
        ArrowWriter::try_new(&mut out, batch.schema(), None).expect("Unable to write file");
    writer.write(&batch).unwrap();
    let file_meta_data = writer.close().unwrap();

    let all_columns = file_meta_data
        .row_groups()
        .iter()
        .flat_map(|row_group| row_group.columns());
    for column in all_columns {
        assert!(column.offset_index_offset().is_some());
        assert!(column.offset_index_length().is_some());
        assert!(column.column_index_offset().is_none());
        assert!(column.column_index_length().is_none());
    }
}
3424
#[test]
fn i8_single_column() {
    required_and_optional::<Int8Array, _>((0..SMALL_SIZE).map(|i| i as i8));
}
3429
#[test]
fn i16_single_column() {
    required_and_optional::<Int16Array, _>((0..SMALL_SIZE).map(|i| i as i16));
}
3434
#[test]
fn i32_single_column() {
    required_and_optional::<Int32Array, _>((0..SMALL_SIZE).map(|i| i as i32));
}
3439
#[test]
fn i64_single_column() {
    required_and_optional::<Int64Array, _>((0..SMALL_SIZE).map(|i| i as i64));
}
3444
#[test]
fn u8_single_column() {
    required_and_optional::<UInt8Array, _>((0..SMALL_SIZE).map(|i| i as u8));
}
3449
#[test]
fn u16_single_column() {
    required_and_optional::<UInt16Array, _>((0..SMALL_SIZE).map(|i| i as u16));
}
3454
#[test]
fn u32_single_column() {
    required_and_optional::<UInt32Array, _>((0..SMALL_SIZE).map(|i| i as u32));
}
3459
#[test]
fn u64_single_column() {
    required_and_optional::<UInt64Array, _>((0..SMALL_SIZE).map(|i| i as u64));
}
3464
#[test]
fn f32_single_column() {
    required_and_optional::<Float32Array, _>((0..SMALL_SIZE as u16).map(f32::from));
}
3469
#[test]
fn f64_single_column() {
    required_and_optional::<Float64Array, _>((0..SMALL_SIZE as u32).map(f64::from));
}
3474
#[test]
fn timestamp_second_single_column() {
    let values = TimestampSecondArray::from_iter_values(0..SMALL_SIZE as i64);
    one_column_roundtrip(Arc::new(values), false);
}
3486
#[test]
fn timestamp_millisecond_single_column() {
    let values = TimestampMillisecondArray::from_iter_values(0..SMALL_SIZE as i64);
    one_column_roundtrip(Arc::new(values), false);
}
3494
#[test]
fn timestamp_microsecond_single_column() {
    let values = TimestampMicrosecondArray::from_iter_values(0..SMALL_SIZE as i64);
    one_column_roundtrip(Arc::new(values), false);
}
3502
#[test]
fn timestamp_nanosecond_single_column() {
    let values = TimestampNanosecondArray::from_iter_values(0..SMALL_SIZE as i64);
    one_column_roundtrip(Arc::new(values), false);
}
3510
#[test]
fn date32_single_column() {
    required_and_optional::<Date32Array, _>((0..SMALL_SIZE).map(|day| day as i32));
}
3515
/// Date64 values are milliseconds since the epoch; use whole days 0..SMALL_SIZE.
#[test]
fn date64_single_column() {
    const MILLIS_PER_DAY: i64 = 86_400_000;
    required_and_optional::<Date64Array, _>((0..SMALL_SIZE as i64).map(|day| day * MILLIS_PER_DAY));
}
3523
#[test]
fn time32_second_single_column() {
    required_and_optional::<Time32SecondArray, _>((0..SMALL_SIZE).map(|t| t as i32));
}
3528
#[test]
fn time32_millisecond_single_column() {
    required_and_optional::<Time32MillisecondArray, _>((0..SMALL_SIZE).map(|t| t as i32));
}
3533
#[test]
fn time64_microsecond_single_column() {
    required_and_optional::<Time64MicrosecondArray, _>((0..SMALL_SIZE).map(|t| t as i64));
}
3538
#[test]
fn time64_nanosecond_single_column() {
    required_and_optional::<Time64NanosecondArray, _>((0..SMALL_SIZE).map(|t| t as i64));
}
3543
#[test]
fn duration_second_single_column() {
    required_and_optional::<DurationSecondArray, _>((0..SMALL_SIZE).map(|d| d as i64));
}
3548
#[test]
fn duration_millisecond_single_column() {
    required_and_optional::<DurationMillisecondArray, _>((0..SMALL_SIZE).map(|d| d as i64));
}
3553
#[test]
fn duration_microsecond_single_column() {
    required_and_optional::<DurationMicrosecondArray, _>((0..SMALL_SIZE).map(|d| d as i64));
}
3558
#[test]
fn duration_nanosecond_single_column() {
    required_and_optional::<DurationNanosecondArray, _>((0..SMALL_SIZE).map(|d| d as i64));
}
3563
#[test]
fn interval_year_month_single_column() {
    required_and_optional::<IntervalYearMonthArray, _>((0..SMALL_SIZE).map(|m| m as i32));
}
3568
#[test]
fn interval_day_time_single_column() {
    let intervals = [
        IntervalDayTime::new(0, 1),
        IntervalDayTime::new(0, 3),
        IntervalDayTime::new(3, -2),
        IntervalDayTime::new(-200, 4),
    ];
    required_and_optional::<IntervalDayTimeArray, _>(intervals);
}
3578
/// Writing MonthDayNano intervals is not supported yet and must panic with a clear message.
#[test]
#[should_panic(
    expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
)]
fn interval_month_day_nano_single_column() {
    let intervals = [
        IntervalMonthDayNano::new(0, 1, 5),
        IntervalMonthDayNano::new(0, 3, 2),
        IntervalMonthDayNano::new(3, -2, -5),
        IntervalMonthDayNano::new(-200, 4, -1),
    ];
    required_and_optional::<IntervalMonthDayNanoArray, _>(intervals);
}
3591
/// Round-trips SMALL_SIZE identical binary values, each the bytes `0..SMALL_SIZE`.
#[test]
fn binary_single_column() {
    let value: Vec<u8> = (0..SMALL_SIZE as u8).collect();
    let rows: Vec<&[u8]> = vec![value.as_slice(); SMALL_SIZE];
    values_required::<BinaryArray, _>(rows);
}
3601
/// Round-trips SMALL_SIZE identical binary-view values, each the bytes `0..SMALL_SIZE`.
#[test]
fn binary_view_single_column() {
    let value: Vec<u8> = (0..SMALL_SIZE as u8).collect();
    let rows: Vec<&[u8]> = vec![value.as_slice(); SMALL_SIZE];
    values_required::<BinaryViewArray, _>(rows);
}
3611
/// Bloom filters written at the end of the file must contain exactly the written values.
#[test]
fn i32_column_bloom_filter_at_end() {
    let upper = SMALL_SIZE as i32;
    let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
    let options = RoundTripOptions {
        bloom_filter: true,
        bloom_filter_position: BloomFilterPosition::End,
        ..RoundTripOptions::new(array, false)
    };

    check_bloom_filter(
        one_column_roundtrip_with_options(options),
        "col".to_string(),
        (0..upper).collect(),
        (upper + 1..upper + 10).collect(),
    );
}
3627
/// Bloom filters at the default position must contain exactly the written values.
#[test]
fn i32_column_bloom_filter() {
    let upper = SMALL_SIZE as i32;
    let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
    let options = RoundTripOptions {
        bloom_filter: true,
        ..RoundTripOptions::new(array, false)
    };

    check_bloom_filter(
        one_column_roundtrip_with_options(options),
        "col".to_string(),
        (0..upper).collect(),
        (upper + 1..upper + 10).collect(),
    );
}
3642
/// With an explicit NDV hint, either far above (1,000,000) or below (3) the true number of
/// distinct values, the bloom filters must still report all written values and none of the
/// probed absent ones.
#[test]
fn i32_column_bloom_filter_fixed_ndv() {
    let upper = SMALL_SIZE as i32;
    let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));

    for ndv in [1_000_000, 3] {
        let options = RoundTripOptions {
            bloom_filter: true,
            bloom_filter_ndv: Some(ndv),
            ..RoundTripOptions::new(array.clone(), false)
        };

        check_bloom_filter(
            one_column_roundtrip_with_options(options),
            "col".to_string(),
            (0..upper).collect(),
            (upper + 1..upper + 10).collect(),
        );
    }
}
3677
/// Binary bloom filters must contain the written value and not a different single-byte value.
#[test]
fn binary_column_bloom_filter() {
    let value: Vec<u8> = (0..SMALL_SIZE as u8).collect();
    let many_vecs = vec![value; SMALL_SIZE];

    let array = Arc::new(BinaryArray::from_iter_values(
        many_vecs.iter().map(Vec::as_slice),
    ));
    let options = RoundTripOptions {
        bloom_filter: true,
        ..RoundTripOptions::new(array, false)
    };

    check_bloom_filter(
        one_column_roundtrip_with_options(options),
        "col".to_string(),
        many_vecs,
        vec![vec![(SMALL_SIZE + 1) as u8]],
    );
}
3696
/// The bloom filter of a Utf8 column must report the odd-indexed values as present and the
/// empty string, which is never written, as absent.
#[test]
fn empty_string_null_column_bloom_filter() {
    let raw_values: Vec<String> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();

    let array = Arc::new(StringArray::from_iter_values(
        raw_values.iter().map(String::as_str),
    ));
    let options = RoundTripOptions {
        bloom_filter: true,
        ..RoundTripOptions::new(array, false)
    };
    let files = one_column_roundtrip_with_options(options);

    let odd_values: Vec<&str> = raw_values
        .iter()
        .skip(1)
        .step_by(2)
        .map(String::as_str)
        .collect();
    check_bloom_filter(files, "col".to_string(), odd_values, vec![""]);
}
3716
/// Round-trips a required `LargeBinary` column in which every row is the byte string
/// `0, 1, .., SMALL_SIZE - 1`.
#[test]
fn large_binary_single_column() {
    let row: Vec<u8> = (0..SMALL_SIZE as u8).collect();
    let rows: Vec<Vec<u8>> = std::iter::repeat_n(row, SMALL_SIZE).collect();

    values_required::<LargeBinaryArray, _>(rows.iter().map(Vec::as_slice));
}
3726
/// Round-trips a nullable four-byte `FixedSizeBinary` column that has a null in its
/// second slot.
#[test]
fn fixed_size_binary_single_column() {
    let mut builder = FixedSizeBinaryBuilder::new(4);
    for slot in [Some(b"0123"), None, Some(b"8910"), Some(b"1112")] {
        match slot {
            Some(bytes) => builder.append_value(bytes).unwrap(),
            None => builder.append_null(),
        }
    }

    one_column_roundtrip(Arc::new(builder.finish()), true);
}
3738
/// Round-trips a `Utf8` column of stringified indices through the
/// `required_and_optional` helper.
#[test]
fn string_single_column() {
    let strings: Vec<String> = (0..SMALL_SIZE).map(|n| n.to_string()).collect();
    required_and_optional::<StringArray, _>(strings.iter().map(String::as_str));
}
3746
/// Round-trips a `LargeUtf8` column of stringified indices through the
/// `required_and_optional` helper.
#[test]
fn large_string_single_column() {
    let strings: Vec<String> = (0..SMALL_SIZE).map(|n| n.to_string()).collect();
    required_and_optional::<LargeStringArray, _>(strings.iter().map(String::as_str));
}
3754
/// Round-trips a `Utf8View` column of stringified indices through the
/// `required_and_optional` helper.
#[test]
fn string_view_single_column() {
    let strings: Vec<String> = (0..SMALL_SIZE).map(|n| n.to_string()).collect();
    required_and_optional::<StringViewArray, _>(strings.iter().map(String::as_str));
}
3762
/// Round-trips a nullable `List<Null>` column holding `[]`, `NULL` and `[null, null]`.
#[test]
fn null_list_single_column() {
    let list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Null, true)));
    let schema = Schema::new(vec![Field::new("emptylist", list_type.clone(), true)]);

    // Offsets [0, 0, 0, 2] over two null children; validity 0b101 marks slot 1 as null.
    let list_data = ArrayData::builder(list_type)
        .len(3)
        .add_buffer(arrow::buffer::Buffer::from([0, 0, 0, 2].to_byte_slice()))
        .null_bit_buffer(Some(Buffer::from([0b00000101])))
        .add_child_data(NullArray::new(2).into_data())
        .build()
        .unwrap();
    let list = ListArray::from(list_data);

    // Sanity-check the hand-built layout before writing it.
    assert!(list.is_valid(0));
    assert!(!list.is_valid(1));
    assert!(list.is_valid(2));

    assert_eq!(list.value(0).len(), 0);
    let last = list.value(2);
    assert_eq!(last.len(), 2);
    assert_eq!(last.logical_nulls().unwrap().null_count(), 2);

    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list)]).unwrap();
    roundtrip(batch, None);
}
3797
/// Round-trips a nullable `List<Int32>` column built from raw `ArrayData`.
///
/// Rows: `[1]`, `[2, 3]`, `NULL`, `[4, 5, 6]`, `[7, 8, 9, 10]`.
#[test]
fn list_single_column() {
    let list_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false)));
    let list_data = ArrayData::builder(list_type)
        .len(5)
        .add_buffer(arrow::buffer::Buffer::from([0, 1, 3, 3, 6, 10].to_byte_slice()))
        // Bit 2 is clear, so the third list is null.
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .add_child_data(Int32Array::from((1..=10).collect::<Vec<i32>>()).into_data())
        .build()
        .unwrap();

    assert_eq!(list_data.null_count(), 1);

    one_column_roundtrip(Arc::new(ListArray::from(list_data)), true);
}
3820
/// Round-trips a nullable `LargeList<Int32>` column (64-bit offsets) built from raw
/// `ArrayData`.
///
/// Rows: `[1]`, `[2, 3]`, `NULL`, `[4, 5, 6]`, `[7, 8, 9, 10]`.
#[test]
fn large_list_single_column() {
    let list_type =
        DataType::LargeList(Arc::new(Field::new("large_item", DataType::Int32, true)));
    let list_data = ArrayData::builder(list_type)
        .len(5)
        .add_buffer(arrow::buffer::Buffer::from([0i64, 1, 3, 3, 6, 10].to_byte_slice()))
        .add_child_data(Int32Array::from((1..=10).collect::<Vec<i32>>()).into_data())
        // Bit 2 is clear, so the third list is null.
        .null_bit_buffer(Some(Buffer::from([0b00011011])))
        .build()
        .unwrap();

    assert_eq!(list_data.null_count(), 1);

    one_column_roundtrip(Arc::new(LargeListArray::from(list_data)), true);
}
3845
/// Round-trips the same nested-null contents (null lists and null items) as both a
/// `List<Int32>` and a `LargeList<Int32>` column.
#[test]
fn list_nested_nulls() {
    use arrow::datatypes::Int32Type;

    let rows = vec![
        Some(vec![Some(1)]),
        Some(vec![Some(2), Some(3)]),
        None,
        Some(vec![Some(4), Some(5), None]),
        Some(vec![None]),
        Some(vec![Some(6), Some(7)]),
    ];

    let small_offsets = ListArray::from_iter_primitive::<Int32Type, _, _>(rows.clone());
    one_column_roundtrip(Arc::new(small_offsets), true);

    let large_offsets = LargeListArray::from_iter_primitive::<Int32Type, _, _>(rows);
    one_column_roundtrip(Arc::new(large_offsets), true);
}
3864
/// Round-trips a nullable `List<Utf8View>` column with rows `["a", null]`, `NULL` and
/// `["large payload over 12 bytes"]`.
///
/// The last value is longer than 12 bytes, so a string view cannot store it inline.
#[test]
fn list_utf8_view_selective_padding_roundtrip() {
    let item_field = Arc::new(Field::new_list_field(DataType::Utf8View, true));
    let mut builder = ListBuilder::new(StringViewBuilder::new()).with_field(item_field);

    // Row 0: ["a", null]
    let strings = builder.values();
    strings.append_value("a");
    strings.append_null();
    builder.append(true);

    // Row 1: NULL
    builder.append(false);

    // Row 2: a single out-of-line value
    builder.values().append_value("large payload over 12 bytes");
    builder.append(true);

    one_column_roundtrip(Arc::new(builder.finish()), true);
}
3881
/// Round-trips a required struct column with a single non-nullable `Int32` child `f`.
#[test]
fn struct_single_column() {
    let child_field = Arc::new(Field::new("f", DataType::Int32, false));
    let child_values = Arc::new(Int32Array::from((1..=10).collect::<Vec<i32>>())) as ArrayRef;

    let structs = StructArray::from(vec![(child_field, child_values)]);
    one_column_roundtrip(Arc::new(structs), false);
}
3891
/// With `set_coerce_types(true)`, the Arrow list and map child names ("item", "entries",
/// "keys", "values") are written as "element", "key_value", "key" and "value".
///
/// The names are checked both in the metadata returned by the writer and in the schema
/// read back from the file.
#[test]
fn list_and_map_coerced_names() {
    let list_field =
        Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
    let map_field = Field::new_map(
        "my_map",
        "entries",
        Field::new("keys", DataType::Int32, false),
        Field::new("values", DataType::Int32, true),
        false,
        true,
    );

    let columns = vec![
        create_random_array(&list_field, 100, 0.0, 0.0).unwrap(),
        create_random_array(&map_field, 100, 0.0, 0.0).unwrap(),
    ];
    let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));

    let props = WriterProperties::builder().set_coerce_types(true).build();
    let file = tempfile::tempfile().unwrap();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), Some(props))
            .unwrap();
    let batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
    writer.write(&batch).unwrap();
    let file_metadata = writer.close().unwrap();

    let reader = SerializedFileReader::new(file).unwrap();
    let written_schema = file_metadata.file_metadata().schema();
    let read_schema = reader.metadata().file_metadata().schema();

    for schema in [written_schema, read_schema] {
        let fields = schema.get_fields();

        let list_group = &fields[0].get_fields()[0];
        assert_eq!(list_group.get_fields()[0].name(), "element");

        let map_group = &fields[1].get_fields()[0];
        assert_eq!(map_group.name(), "key_value");
        assert_eq!(map_group.get_fields()[0].name(), "key");
        assert_eq!(map_group.get_fields()[1].name(), "value");
    }
}
3945
/// Round-trips a string column using the delta byte-array encodings with dictionary
/// encoding disabled.
///
/// A 32-byte data page limit and a write batch size of 16 are combined with several row
/// group sizes. The string values read back must equal those written.
#[test]
fn fallback_flush_data_page() {
    let values = Arc::new(StringArray::from(
        (0..MEDIUM_SIZE).map(|i| i.to_string()).collect::<Vec<_>>(),
    ));
    let schema = Arc::new(Schema::new(vec![Field::new(
        "col",
        values.data_type().clone(),
        false,
    )]));
    let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();

    let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
    let data_page_size_limit: usize = 32;
    let write_batch_size: usize = 16;

    for encoding in [
        Encoding::DELTA_BYTE_ARRAY,
        Encoding::DELTA_LENGTH_BYTE_ARRAY,
    ] {
        for row_group_size in row_group_sizes {
            let props = WriterProperties::builder()
                .set_writer_version(WriterVersion::PARQUET_2_0)
                .set_max_row_group_row_count(Some(row_group_size))
                .set_dictionary_enabled(false)
                .set_encoding(encoding)
                .set_data_page_size_limit(data_page_size_limit)
                .set_write_batch_size(write_batch_size)
                .build();

            roundtrip_opts_with_array_validation(&expected_batch, props, |a, b| {
                let written = StringArray::from(a.clone());
                let read = StringArray::from(b.clone());
                let written_strs: Vec<&str> = written.iter().map(Option::unwrap).collect();
                let read_strs: Vec<&str> = read.iter().map(Option::unwrap).collect();
                assert_eq!(
                    written_strs, read_strs,
                    "failed for encoder: {encoding:?} and row_group_size: {row_group_size:?}"
                );
            });
        }
    }
}
3987
/// Round-trips an `Int32`-keyed, `Utf8`-valued dictionary column through an explicit
/// schema.
#[test]
fn arrow_writer_string_dictionary() {
    // Silences a deprecation warning from this statement (presumably from
    // `Field::new_dict`; confirm against the arrow-schema docs).
    #[allow(deprecated)]
    let schema = Arc::new(Schema::new(vec![Field::new_dict(
        "dictionary",
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
        true,
        42,
        true,
    )]));

    let dictionary: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
        .into_iter()
        .collect();

    one_column_roundtrip_with_schema(Arc::new(dictionary), schema);
}
4009
/// Writes two batches with different but write-compatible column types into one file.
///
/// The pairs cover dictionary vs. plain, 32-bit vs. 64-bit offsets, view vs. non-view
/// types, and list fields with vs. without field metadata. In every case, the data read
/// back must have the type of the first batch.
#[test]
fn arrow_writer_test_type_compatibility() {
    /// Creates a writer from `array1`'s type and writes `array1`, then `array2`, as column
    /// "a". Asserts that the first batch read back equals `expected_result` under that
    /// same schema.
    fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
    where
        T1: Array + 'static,
        T2: Array + 'static,
    {
        // Single non-nullable column named "a" of the given type.
        let column_schema = |data_type: &DataType| {
            Arc::new(Schema::new(vec![Field::new("a", data_type.clone(), false)]))
        };

        let writer_schema = column_schema(array1.data_type());
        let file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), writer_schema.clone(), None)
                .unwrap();

        let first = RecordBatch::try_new(writer_schema.clone(), vec![Arc::new(array1)]).unwrap();
        writer.write(&first).unwrap();

        let second_schema = column_schema(array2.data_type());
        let second = RecordBatch::try_new(second_schema, vec![Arc::new(array2)]).unwrap();
        writer.write(&second).unwrap();

        writer.close().unwrap();

        let actual = ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024)
            .unwrap()
            .next()
            .unwrap()
            .unwrap();
        let expected =
            RecordBatch::try_new(writer_schema, vec![Arc::new(expected_result)]).unwrap();
        assert_eq!(actual, expected);
    }

    // Dictionary first, then plain string.
    ensure_compatible_write(
        DictionaryArray::new(
            UInt8Array::from_iter_values([0]),
            Arc::new(StringArray::from_iter_values(["parquet"])),
        ),
        StringArray::from_iter_values(["barquet"]),
        DictionaryArray::new(
            UInt8Array::from_iter_values([0, 1]),
            Arc::new(StringArray::from_iter_values(["parquet", "barquet"])),
        ),
    );

    // Plain string first, then dictionary.
    ensure_compatible_write(
        StringArray::from_iter_values(["parquet"]),
        DictionaryArray::new(
            UInt8Array::from_iter_values([0]),
            Arc::new(StringArray::from_iter_values(["barquet"])),
        ),
        StringArray::from_iter_values(["parquet", "barquet"]),
    );

    // Dictionaries with different key types.
    ensure_compatible_write(
        DictionaryArray::new(
            UInt8Array::from_iter_values([0]),
            Arc::new(StringArray::from_iter_values(["parquet"])),
        ),
        DictionaryArray::new(
            UInt16Array::from_iter_values([0]),
            Arc::new(StringArray::from_iter_values(["barquet"])),
        ),
        DictionaryArray::new(
            UInt8Array::from_iter_values([0, 1]),
            Arc::new(StringArray::from_iter_values(["parquet", "barquet"])),
        ),
    );

    // Dictionaries with different value offset widths.
    ensure_compatible_write(
        DictionaryArray::new(
            UInt8Array::from_iter_values([0]),
            Arc::new(StringArray::from_iter_values(["parquet"])),
        ),
        DictionaryArray::new(
            UInt8Array::from_iter_values([0]),
            Arc::new(LargeStringArray::from_iter_values(["barquet"])),
        ),
        DictionaryArray::new(
            UInt8Array::from_iter_values([0, 1]),
            Arc::new(StringArray::from_iter_values(["parquet", "barquet"])),
        ),
    );

    // Dictionary first, then large string.
    ensure_compatible_write(
        DictionaryArray::new(
            UInt8Array::from_iter_values([0]),
            Arc::new(StringArray::from_iter_values(["parquet"])),
        ),
        LargeStringArray::from_iter_values(["barquet"]),
        DictionaryArray::new(
            UInt8Array::from_iter_values([0, 1]),
            Arc::new(StringArray::from_iter_values(["parquet", "barquet"])),
        ),
    );

    // String offset widths and view types, in both directions.
    ensure_compatible_write(
        StringArray::from_iter_values(["parquet"]),
        LargeStringArray::from_iter_values(["barquet"]),
        StringArray::from_iter_values(["parquet", "barquet"]),
    );

    ensure_compatible_write(
        LargeStringArray::from_iter_values(["parquet"]),
        StringArray::from_iter_values(["barquet"]),
        LargeStringArray::from_iter_values(["parquet", "barquet"]),
    );

    ensure_compatible_write(
        StringArray::from_iter_values(["parquet"]),
        StringViewArray::from_iter_values(["barquet"]),
        StringArray::from_iter_values(["parquet", "barquet"]),
    );

    ensure_compatible_write(
        StringViewArray::from_iter_values(["parquet"]),
        StringArray::from_iter_values(["barquet"]),
        StringViewArray::from_iter_values(["parquet", "barquet"]),
    );

    ensure_compatible_write(
        LargeStringArray::from_iter_values(["parquet"]),
        StringViewArray::from_iter_values(["barquet"]),
        LargeStringArray::from_iter_values(["parquet", "barquet"]),
    );

    ensure_compatible_write(
        StringViewArray::from_iter_values(["parquet"]),
        LargeStringArray::from_iter_values(["barquet"]),
        StringViewArray::from_iter_values(["parquet", "barquet"]),
    );

    // Binary offset widths and view types, in both directions.
    ensure_compatible_write(
        BinaryArray::from_iter_values([b"parquet"]),
        LargeBinaryArray::from_iter_values([b"barquet"]),
        BinaryArray::from_iter_values([b"parquet", b"barquet"]),
    );

    ensure_compatible_write(
        LargeBinaryArray::from_iter_values([b"parquet"]),
        BinaryArray::from_iter_values([b"barquet"]),
        LargeBinaryArray::from_iter_values([b"parquet", b"barquet"]),
    );

    ensure_compatible_write(
        BinaryArray::from_iter_values([b"parquet"]),
        BinaryViewArray::from_iter_values([b"barquet"]),
        BinaryArray::from_iter_values([b"parquet", b"barquet"]),
    );

    ensure_compatible_write(
        BinaryViewArray::from_iter_values([b"parquet"]),
        BinaryArray::from_iter_values([b"barquet"]),
        BinaryViewArray::from_iter_values([b"parquet", b"barquet"]),
    );

    ensure_compatible_write(
        BinaryViewArray::from_iter_values([b"parquet"]),
        LargeBinaryArray::from_iter_values([b"barquet"]),
        BinaryViewArray::from_iter_values([b"parquet", b"barquet"]),
    );

    ensure_compatible_write(
        LargeBinaryArray::from_iter_values([b"parquet"]),
        BinaryViewArray::from_iter_values([b"barquet"]),
        LargeBinaryArray::from_iter_values([b"parquet", b"barquet"]),
    );

    // Lists whose item field differs only in field-id metadata: the first batch's field
    // (with metadata) wins.
    let list_field = Field::new_list_field(DataType::Int32, false);
    let list_field_with_id = Arc::new(list_field.clone().with_metadata(HashMap::from_iter(
        vec![(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())],
    )));

    ensure_compatible_write(
        ListArray::try_new(
            list_field_with_id.clone(),
            OffsetBuffer::new(vec![0, 2, 5].into()),
            Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4])),
            None,
        )
        .unwrap(),
        ListArray::try_new(
            Arc::new(list_field),
            OffsetBuffer::new(vec![0, 3, 5].into()),
            Arc::new(Int32Array::from(vec![5, 6, 7, 8, 9])),
            None,
        )
        .unwrap(),
        ListArray::try_new(
            list_field_with_id,
            OffsetBuffer::new(vec![0, 2, 5, 8, 10].into()),
            Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9])),
            None,
        )
        .unwrap(),
    );
}
4240
/// Round-trips a `UInt8`-keyed, `UInt32`-valued dictionary column (with a null) through
/// an explicit schema.
#[test]
fn arrow_writer_primitive_dictionary() {
    // Silences a deprecation warning from this statement (presumably from
    // `Field::new_dict`; confirm against the arrow-schema docs).
    #[allow(deprecated)]
    let schema = Arc::new(Schema::new(vec![Field::new_dict(
        "dictionary",
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
        true,
        42,
        true,
    )]));

    let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
    for slot in [Some(12345678), None, Some(22345678), Some(12345678)] {
        match slot {
            Some(value) => {
                builder.append(value).unwrap();
            }
            None => builder.append_null(),
        }
    }

    one_column_roundtrip_with_schema(Arc::new(builder.finish()), schema);
}
4263
/// Round-trips a `UInt8`-keyed dictionary of `Decimal32` values, first with precision 5
/// and then, reusing the same keys, with precision 9 (scale 2 in both cases).
#[test]
fn arrow_writer_decimal32_dictionary() {
    let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
    let unscaled = vec![12345, 56789, 34567];

    let narrow = Decimal32Array::from(unscaled.clone())
        .with_precision_and_scale(5, 2)
        .unwrap();
    let dictionary = DictionaryArray::new(keys, Arc::new(narrow));
    one_column_roundtrip(Arc::new(dictionary.clone()), true);

    let wide = Decimal32Array::from(unscaled)
        .with_precision_and_scale(9, 2)
        .unwrap();
    one_column_roundtrip(Arc::new(dictionary.with_values(Arc::new(wide))), true);
}
4284
/// Round-trips a `UInt8`-keyed dictionary of `Decimal64` values, first with precision 5
/// and then, reusing the same keys, with precision 12 (scale 2 in both cases).
#[test]
fn arrow_writer_decimal64_dictionary() {
    let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
    let unscaled = vec![12345, 56789, 34567];

    let narrow = Decimal64Array::from(unscaled.clone())
        .with_precision_and_scale(5, 2)
        .unwrap();
    let dictionary = DictionaryArray::new(keys, Arc::new(narrow));
    one_column_roundtrip(Arc::new(dictionary.clone()), true);

    let wide = Decimal64Array::from(unscaled)
        .with_precision_and_scale(12, 2)
        .unwrap();
    one_column_roundtrip(Arc::new(dictionary.with_values(Arc::new(wide))), true);
}
4305
/// Round-trips a `UInt8`-keyed dictionary of `Decimal128` values, first with precision 5
/// and then, reusing the same keys, with precision 12 (scale 2 in both cases).
#[test]
fn arrow_writer_decimal128_dictionary() {
    let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
    let unscaled = vec![12345, 56789, 34567];

    let narrow = Decimal128Array::from(unscaled.clone())
        .with_precision_and_scale(5, 2)
        .unwrap();
    let dictionary = DictionaryArray::new(keys, Arc::new(narrow));
    one_column_roundtrip(Arc::new(dictionary.clone()), true);

    let wide = Decimal128Array::from(unscaled)
        .with_precision_and_scale(12, 2)
        .unwrap();
    one_column_roundtrip(Arc::new(dictionary.with_values(Arc::new(wide))), true);
}
4326
/// Round-trips a `UInt8`-keyed dictionary of `Decimal256` values, first with precision 5
/// and then, reusing the same keys, with precision 12 (scale 2 in both cases).
#[test]
fn arrow_writer_decimal256_dictionary() {
    let keys = UInt8Array::from(vec![Some(0), None, Some(1), Some(2), Some(1)]);
    let unscaled: Vec<i256> = [12345, 56789, 34567].map(i256::from_i128).to_vec();

    let narrow = Decimal256Array::from(unscaled.clone())
        .with_precision_and_scale(5, 2)
        .unwrap();
    let dictionary = DictionaryArray::new(keys, Arc::new(narrow));
    one_column_roundtrip(Arc::new(dictionary.clone()), true);

    let wide = Decimal256Array::from(unscaled)
        .with_precision_and_scale(12, 2)
        .unwrap();
    one_column_roundtrip(Arc::new(dictionary.with_values(Arc::new(wide))), true);
}
4351
/// Round-trips a `UInt8`-keyed (unsigned index), `Utf8`-valued dictionary column through
/// an explicit schema.
#[test]
fn arrow_writer_string_dictionary_unsigned_index() {
    // Silences a deprecation warning from this statement (presumably from
    // `Field::new_dict`; confirm against the arrow-schema docs).
    #[allow(deprecated)]
    let schema = Arc::new(Schema::new(vec![Field::new_dict(
        "dictionary",
        DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
        true,
        42,
        true,
    )]));

    let dictionary: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")]
        .into_iter()
        .collect();

    one_column_roundtrip_with_schema(Arc::new(dictionary), schema);
}
4372
/// Writes `u32` values around the `i32::MAX` boundary. For every row group, the stored
/// `Int32` min/max statistics must equal the unsigned min/max of that group's values once
/// reinterpreted as `u32`.
#[test]
fn u32_min_max() {
    let src = [
        u32::MIN,
        1,
        (i32::MAX as u32) - 1,
        i32::MAX as u32,
        (i32::MAX as u32) + 1,
        u32::MAX - 1,
        u32::MAX,
    ];
    let files = one_column_roundtrip(Arc::new(UInt32Array::from_iter_values(src)), false);

    for file in files {
        let reader = SerializedFileReader::new(file).unwrap();

        // Values are consumed from `src` in row-group order.
        let mut consumed = 0;
        for row_group in reader.metadata().row_groups() {
            assert_eq!(row_group.num_columns(), 1);
            let column = row_group.column(0);

            let count = column.num_values() as usize;
            let expected = &src[consumed..consumed + count];
            consumed += count;

            let Statistics::Int32(stats) = column.statistics().unwrap() else {
                panic!("Statistics::Int32 missing")
            };
            assert_eq!(
                *stats.min_opt().unwrap() as u32,
                *expected.iter().min().unwrap()
            );
            assert_eq!(
                *stats.max_opt().unwrap() as u32,
                *expected.iter().max().unwrap()
            );
        }
    }
}
4418
/// Writes `u64` values around the `i64::MAX` boundary. For every row group, the stored
/// `Int64` min/max statistics must equal the unsigned min/max of that group's values once
/// reinterpreted as `u64`.
#[test]
fn u64_min_max() {
    let src = [
        u64::MIN,
        1,
        (i64::MAX as u64) - 1,
        i64::MAX as u64,
        (i64::MAX as u64) + 1,
        u64::MAX - 1,
        u64::MAX,
    ];
    let files = one_column_roundtrip(Arc::new(UInt64Array::from_iter_values(src)), false);

    for file in files {
        let reader = SerializedFileReader::new(file).unwrap();

        // Values are consumed from `src` in row-group order.
        let mut consumed = 0;
        for row_group in reader.metadata().row_groups() {
            assert_eq!(row_group.num_columns(), 1);
            let column = row_group.column(0);

            let count = column.num_values() as usize;
            let expected = &src[consumed..consumed + count];
            consumed += count;

            let Statistics::Int64(stats) = column.statistics().unwrap() else {
                panic!("Statistics::Int64 missing")
            };
            assert_eq!(
                *stats.min_opt().unwrap() as u64,
                *expected.iter().min().unwrap()
            );
            assert_eq!(
                *stats.max_opt().unwrap() as u64,
                *expected.iter().max().unwrap()
            );
        }
    }
}
4464
/// A nullable column holding only nulls must yield a single row group whose column
/// statistics report a null count of 2.
#[test]
fn statistics_null_counts_only_nulls() {
    let all_null = Arc::new(UInt64Array::from(vec![None, None]));
    let files = one_column_roundtrip(all_null, true);

    for reader in files
        .into_iter()
        .map(|file| SerializedFileReader::new(file).unwrap())
    {
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let row_group = metadata.row_group(0);
        assert_eq!(row_group.num_columns(), 1);

        let null_count = row_group.column(0).statistics().unwrap().null_count_opt();
        assert_eq!(null_count, Some(2));
    }
}
4483
/// Round-trips a nullable `List<Struct<a: Int32, b: Int32>>` column.
///
/// Rows: `[{1, 2}]`, `[]`, `NULL`, `[null, null]`, `[{null, 3}]`, `[{2, null}]`.
#[test]
fn test_list_of_struct_roundtrip() {
    /// Appends one struct entry with child values `a`, `b` and struct validity `is_valid`.
    fn push_struct(values: &mut StructBuilder, a: Option<i32>, b: Option<i32>, is_valid: bool) {
        values
            .field_builder::<Int32Builder>(0)
            .unwrap()
            .append_option(a);
        values
            .field_builder::<Int32Builder>(1)
            .unwrap()
            .append_option(b);
        values.append(is_valid);
    }

    let struct_builder = StructBuilder::new(
        vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
        ],
        vec![
            Box::new(Int32Builder::with_capacity(10)),
            Box::new(Int32Builder::with_capacity(10)),
        ],
    );
    let mut list_builder = ListBuilder::new(struct_builder);

    // [{a: 1, b: 2}]
    push_struct(list_builder.values(), Some(1), Some(2), true);
    list_builder.append(true);

    // []
    list_builder.append(true);

    // NULL
    list_builder.append(false);

    // [null, null]
    push_struct(list_builder.values(), None, None, false);
    push_struct(list_builder.values(), None, None, false);
    list_builder.append(true);

    // [{a: null, b: 3}]
    push_struct(list_builder.values(), None, Some(3), true);
    list_builder.append(true);

    // [{a: 2, b: null}]
    push_struct(list_builder.values(), Some(2), None, true);
    list_builder.append(true);

    one_column_roundtrip(Arc::new(list_builder.finish()), true);
}
4573
4574 fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
4575 metadata.row_groups().iter().map(|x| x.num_rows()).collect()
4576 }
4577
/// Writes batches of 100, 50 and 300 rows with a 200-row row-group limit.
///
/// The file must contain row groups of 200, 200 and 50 rows. Reading it back in 100-row
/// batches must return every value in the original order.
#[test]
fn test_aggregates_records() {
    let ranges = [0..100, 0..50, 200..500];

    let schema = Arc::new(Schema::new(vec![Field::new(
        "int",
        ArrowDataType::Int32,
        false,
    )]));

    let file = tempfile::tempfile().unwrap();
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(200))
        .build();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();

    for range in ranges.clone() {
        let column = Arc::new(Int32Array::from(range.collect::<Vec<_>>()));
        let batch = RecordBatch::try_new(schema.clone(), vec![column]).unwrap();
        writer.write(&batch).unwrap();
    }
    writer.close().unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);

    let batches = builder
        .with_batch_size(100)
        .build()
        .unwrap()
        .collect::<ArrowResult<Vec<_>>>()
        .unwrap();

    assert_eq!(batches.len(), 5);
    assert!(batches.iter().all(|batch| batch.num_columns() == 1));

    let batch_sizes: Vec<usize> = batches.iter().map(|batch| batch.num_rows()).collect();
    assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);

    let mut values: Vec<i32> = Vec::new();
    for batch in &batches {
        let column = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        values.extend(column.values().iter().copied());
    }

    let expected_values: Vec<i32> = ranges.into_iter().flatten().collect();
    assert_eq!(&values, &expected_values)
}
4641
/// Writes two batches of a nested `struct_b { list: List<struct_a { leaf_a, leaf_b }> }`
/// column, with null lists and null leaves, under a six-row row-group limit.
///
/// Checks three things: the resulting row groups are 6 and 1 rows; reading back in 2-row
/// batches yields batches of 2, 2, 2 and 1 rows; and the pretty-printed data matches the
/// input's.
#[test]
fn complex_aggregate() {
    let leaf_a = Arc::new(Field::new("leaf_a", DataType::Int32, false));
    let leaf_b = Arc::new(Field::new("leaf_b", DataType::Int32, true));
    let struct_a = Arc::new(Field::new(
        "struct_a",
        DataType::Struct(vec![leaf_a.clone(), leaf_b.clone()].into()),
        true,
    ));
    let list_a = Arc::new(Field::new("list", DataType::List(struct_a), true));
    let struct_b = Arc::new(Field::new(
        "struct_b",
        DataType::Struct(vec![list_a.clone()].into()),
        false,
    ));
    let schema = Arc::new(Schema::new(vec![struct_b]));

    // Builds a single-column `struct_b` batch. `len` lists are carved from the leaf arrays
    // by `offsets`; `validity` is the optional list-level null bitmap.
    let make_batch = |a_values: Int32Array,
                      b_values: Int32Array,
                      len: usize,
                      offsets: Vec<i32>,
                      validity: Option<Buffer>| {
        let structs = StructArray::from(vec![
            (leaf_a.clone(), Arc::new(a_values) as ArrayRef),
            (leaf_b.clone(), Arc::new(b_values) as ArrayRef),
        ]);
        let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
            .len(len)
            .add_buffer(Buffer::from_iter(offsets))
            .null_bit_buffer(validity)
            .child_data(vec![structs.into_data()])
            .build()
            .unwrap();
        let lists = Arc::new(ListArray::from(list_data)) as ArrayRef;
        let outer = StructArray::from(vec![(list_a.clone(), lists)]);
        RecordBatch::try_from_iter(vec![("struct_b", Arc::new(outer) as ArrayRef)]).unwrap()
    };

    // Five rows; lists 1 and 3 are null, and the sixth leaf entry is never referenced.
    let batch1 = make_batch(
        Int32Array::from(vec![1, 2, 3, 4, 5, 6]),
        Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]),
        5,
        vec![0, 1, 1, 3, 3, 5],
        Some(Buffer::from_iter(vec![true, false, true, false, true])),
    );
    // Two rows with no null lists.
    let batch2 = make_batch(
        Int32Array::from(vec![6, 7, 8, 9, 10]),
        Int32Array::from_iter(vec![None, None, None, Some(1), None]),
        2,
        vec![0, 4, 5],
        None,
    );
    let batches = &[batch1, batch2];

    // Expected rendering: a one-column ASCII table whose width is the widest cell.
    let header = "struct_b";
    let cells = [
        "{list: [{leaf_a: 1, leaf_b: 1}]}",
        "{list: }",
        "{list: [{leaf_a: 2, leaf_b: }, {leaf_a: 3, leaf_b: 2}]}",
        "{list: }",
        "{list: [{leaf_a: 4, leaf_b: }, {leaf_a: 5, leaf_b: }]}",
        "{list: [{leaf_a: 6, leaf_b: }, {leaf_a: 7, leaf_b: }, {leaf_a: 8, leaf_b: }, {leaf_a: 9, leaf_b: 1}]}",
        "{list: [{leaf_a: 10, leaf_b: }]}",
    ];
    let width = cells
        .iter()
        .map(|cell| cell.len())
        .max()
        .unwrap()
        .max(header.len());
    let border = format!("+{}+", "-".repeat(width + 2));
    let mut lines = vec![
        border.clone(),
        format!("| {header:<width$} |"),
        border.clone(),
    ];
    lines.extend(cells.iter().map(|cell| format!("| {cell:<width$} |")));
    lines.push(border);
    let expected = lines.join("\n");

    assert_eq!(pretty_format_batches(batches).unwrap().to_string(), expected);

    let file = tempfile::tempfile().unwrap();
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(6))
        .build();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
    for batch in batches {
        writer.write(batch).unwrap();
    }
    writer.close().unwrap();

    // Seven rows under a six-row limit produce row groups of 6 and 1.
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    assert_eq!(&row_group_sizes(builder.metadata()), &[6, 1]);

    let read_back = builder
        .with_batch_size(2)
        .build()
        .unwrap()
        .collect::<ArrowResult<Vec<_>>>()
        .unwrap();

    assert_eq!(read_back.len(), 4);
    let batch_counts: Vec<usize> = read_back.iter().map(RecordBatch::num_rows).collect();
    assert_eq!(&batch_counts, &[2, 2, 2, 1]);

    assert_eq!(pretty_format_batches(&read_back).unwrap().to_string(), expected);
}
4769
#[test]
fn test_arrow_writer_metadata() {
    // The batch schema carries no metadata while the file schema does; the
    // schemas are otherwise identical, so writing must succeed.
    let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
    let file_schema = batch_schema.clone().with_metadata(
        vec![("foo".to_string(), "bar".to_string())]
            .into_iter()
            .collect(),
    );

    let batch = RecordBatch::try_new(
        Arc::new(batch_schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
    )
    .unwrap();

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    // Besides writing successfully, the file-level schema metadata must be
    // recoverable when the file is read back.
    let reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf)).unwrap();
    assert_eq!(
        reader.schema().metadata().get("foo").map(String::as_str),
        Some("bar")
    );
}
4790
#[test]
fn test_arrow_writer_nullable() {
    // The batch declares the column non-nullable while the file schema declares
    // it nullable. The read-back schema must be the file schema, and the data
    // must be unchanged.
    let file_schema = Arc::new(Schema::new(vec![Field::new(
        "int32",
        DataType::Int32,
        true,
    )]));
    let batch_schema = Arc::new(Schema::new(vec![Field::new(
        "int32",
        DataType::Int32,
        false,
    )]));

    let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let batch = RecordBatch::try_new(batch_schema, vec![values]).unwrap();

    let mut buf = Vec::with_capacity(1024);
    {
        let mut writer =
            ArrowWriter::try_new(&mut buf, Arc::clone(&file_schema), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
    }

    let back = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)
        .unwrap()
        .next()
        .unwrap()
        .unwrap();
    assert_eq!(back.schema(), file_schema);
    assert_ne!(back.schema(), batch.schema());
    assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
}
4814
#[test]
fn in_progress_accounting() {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

    let mut writer = ArrowWriter::try_new(vec![], batch.schema(), None).unwrap();

    // Freshly created: nothing is buffered and only 4 header bytes are written.
    assert_eq!(writer.in_progress_size(), 0);
    assert_eq!(writer.in_progress_rows(), 0);
    assert_eq!(writer.memory_size(), 0);
    assert_eq!(writer.bytes_written(), 4);

    // The first batch is buffered in memory.
    writer.write(&batch).unwrap();
    let initial_size = writer.in_progress_size();
    assert!(initial_size > 0);
    assert_eq!(writer.in_progress_rows(), 5);
    let initial_memory = writer.memory_size();
    assert!(initial_memory > 0);
    // The in-progress size is expected not to exceed the memory held.
    assert!(
        initial_size <= initial_memory,
        "{initial_size} <= {initial_memory}"
    );

    // A second batch grows every measure.
    writer.write(&batch).unwrap();
    assert!(writer.in_progress_size() > initial_size);
    assert_eq!(writer.in_progress_rows(), 10);
    assert!(writer.memory_size() > initial_memory);
    assert!(
        writer.in_progress_size() <= writer.memory_size(),
        "in_progress_size {} <= memory_size {}",
        writer.in_progress_size(),
        writer.memory_size()
    );

    // Flushing writes the buffered rows out. Every in-progress counter,
    // including the row count, must reset, and more bytes must reach the sink.
    let pre_flush_bytes_written = writer.bytes_written();
    writer.flush().unwrap();
    assert_eq!(writer.in_progress_size(), 0);
    assert_eq!(writer.in_progress_rows(), 0);
    assert_eq!(writer.memory_size(), 0);
    assert!(writer.bytes_written() > pre_flush_bytes_written);

    writer.close().unwrap();
}
4868
#[test]
fn test_writer_all_null() {
    let col_a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    // Every slot of "b" is null (backed by zeroed values).
    let col_b: ArrayRef = Arc::new(Int32Array::new(
        vec![0; 5].into(),
        Some(NullBuffer::new_null(5)),
    ));
    let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let reader = SerializedFileReader::new_with_options(
        Bytes::from(buf),
        ReadOptionsBuilder::new().with_page_index().build(),
    )
    .unwrap();
    let offset_index = reader.metadata().offset_index().unwrap();

    // One row group with two columns. Each column, including the all-null
    // one, must have exactly one page location.
    assert_eq!(offset_index.len(), 1);
    let row_group_index = &offset_index[0];
    assert_eq!(row_group_index.len(), 2);
    for column in row_group_index.iter() {
        assert_eq!(column.page_locations().len(), 1);
    }
}
4894
#[test]
fn test_disabled_statistics_with_page() {
    let file_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, true),
        Field::new("b", DataType::Utf8, true),
    ]));

    let col_a: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
    let col_b: ArrayRef = Arc::new(StringArray::from(vec!["w", "x", "y", "z"]));
    let batch = RecordBatch::try_new(Arc::clone(&file_schema), vec![col_a, col_b]).unwrap();

    // Statistics are disabled globally and re-enabled at page level for "a" only.
    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
        .build();

    let mut buf = Vec::with_capacity(1024);
    let mut writer =
        ArrowWriter::try_new(&mut buf, Arc::clone(&file_schema), Some(props)).unwrap();
    writer.write(&batch).unwrap();
    let metadata = writer.close().unwrap();

    // Both columns get an offset index; only "a" gets a column index.
    assert_eq!(metadata.num_row_groups(), 1);
    let written_rg = metadata.row_group(0);
    assert_eq!(written_rg.num_columns(), 2);
    assert!(written_rg.column(0).offset_index_offset().is_some());
    assert!(written_rg.column(0).column_index_offset().is_some());
    assert!(written_rg.column(1).offset_index_offset().is_some());
    assert!(written_rg.column(1).column_index_offset().is_none());

    let reader = SerializedFileReader::new_with_options(
        Bytes::from(buf),
        ReadOptionsBuilder::new().with_page_index().build(),
    )
    .unwrap();

    let read_rg = reader.get_row_group(0).unwrap();
    let a_col = read_rg.metadata().column(0);
    let b_col = read_rg.metadata().column(1);

    // Chunk statistics exist for "a" only.
    let Statistics::ByteArray(a_stats) = a_col.statistics().unwrap() else {
        panic!("expecting Statistics::ByteArray");
    };
    assert_eq!(a_stats.min_opt().unwrap().as_bytes(), b"a");
    assert_eq!(a_stats.max_opt().unwrap().as_bytes(), b"d");
    assert!(b_col.statistics().is_none());

    let offset_index = reader.metadata().offset_index().unwrap();
    assert_eq!(offset_index.len(), 1);
    assert_eq!(offset_index[0].len(), 2);

    let column_index = reader.metadata().column_index().unwrap();
    assert_eq!(column_index.len(), 1);
    assert_eq!(column_index[0].len(), 2);

    let a_idx = &column_index[0][0];
    assert!(
        matches!(a_idx, ColumnIndexMetaData::BYTE_ARRAY(_)),
        "{a_idx:?}"
    );
    let b_idx = &column_index[0][1];
    assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
}
4969
#[test]
fn test_disabled_statistics_with_chunk() {
    let file_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, true),
        Field::new("b", DataType::Utf8, true),
    ]));

    let batch = RecordBatch::try_new(
        file_schema.clone(),
        vec![
            Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as _,
            Arc::new(StringArray::from(vec!["w", "x", "y", "z"])) as _,
        ],
    )
    .unwrap();

    // Statistics are disabled globally and re-enabled for "a" at chunk level only.
    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .set_column_statistics_enabled("a".into(), EnabledStatistics::Chunk)
        .build();

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, file_schema.clone(), Some(props)).unwrap();
    writer.write(&batch).unwrap();

    let metadata = writer.close().unwrap();
    assert_eq!(metadata.num_row_groups(), 1);
    let row_group = metadata.row_group(0);
    assert_eq!(row_group.num_columns(), 2);
    // Both columns get an offset index; neither gets a column index.
    assert!(row_group.column(0).offset_index_offset().is_some());
    assert!(row_group.column(0).column_index_offset().is_none());
    assert!(row_group.column(1).offset_index_offset().is_some());
    assert!(row_group.column(1).column_index_offset().is_none());

    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(Bytes::from(buf), options).unwrap();

    let row_group = reader.get_row_group(0).unwrap();
    let a_col = row_group.metadata().column(0);
    let b_col = row_group.metadata().column(1);

    // Chunk-level statistics are still present for "a".
    if let Statistics::ByteArray(byte_array_stats) = a_col.statistics().unwrap() {
        let min = byte_array_stats.min_opt().unwrap();
        let max = byte_array_stats.max_opt().unwrap();

        assert_eq!(min.as_bytes(), b"a");
        assert_eq!(max.as_bytes(), b"d");
    } else {
        panic!("expecting Statistics::ByteArray");
    }

    assert!(b_col.statistics().is_none());

    // The offset index must actually load for both columns, matching the
    // `offset_index_offset().is_some()` checks above and the page-level sibling test.
    let offset_index = reader.metadata().offset_index().unwrap();
    assert_eq!(offset_index.len(), 1);
    assert_eq!(offset_index[0].len(), 2);

    // Column index entries exist, but neither column carries page statistics.
    let column_index = reader.metadata().column_index().unwrap();
    assert_eq!(column_index.len(), 1);
    assert_eq!(column_index[0].len(), 2);
    let a_idx = &column_index[0][0];
    assert!(matches!(a_idx, ColumnIndexMetaData::NONE), "{a_idx:?}");
    let b_idx = &column_index[0][1];
    assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
}
5037
#[test]
fn test_arrow_writer_skip_metadata() {
    let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
    let file_schema = Arc::new(batch_schema.clone());

    let batch = RecordBatch::try_new(
        Arc::new(batch_schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
    )
    .unwrap();

    let mut buf = Vec::with_capacity(1024);
    let options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
    let mut writer =
        ArrowWriter::try_new_with_options(&mut buf, Arc::clone(&file_schema), options).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf)).unwrap();
    assert_eq!(file_schema, *builder.schema());

    // With Arrow metadata skipped, no key/value entry may hold the embedded schema.
    // Absent key/value metadata trivially satisfies this.
    let has_arrow_schema = builder
        .metadata()
        .file_metadata()
        .key_value_metadata()
        .is_some_and(|entries| {
            entries
                .iter()
                .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY)
        });
    assert!(!has_arrow_schema);
}
5071
#[test]
fn test_arrow_writer_skip_path_in_schema() {
    let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
    let file_schema = Arc::new(batch_schema.clone());

    let batch = RecordBatch::try_new(
        Arc::new(batch_schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
    )
    .unwrap();

    // Encodes `batch` with the given writer options and returns the file bytes.
    let encode = |options: ArrowWriterOptions| {
        let mut out: Vec<u8> = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new_with_options(&mut out, Arc::clone(&file_schema), options)
                .unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        out
    };

    // Default options versus options that disable writing `path_in_schema`.
    let with_path = encode(ArrowWriterOptions::new());
    let without_path = encode(
        ArrowWriterOptions::new().with_properties(
            WriterProperties::builder()
                .set_write_path_in_schema(false)
                .build(),
        ),
    );

    // Omitting path_in_schema must produce a strictly smaller file.
    assert!(with_path.len() > without_path.len());
}
5109
#[test]
fn mismatched_schemas() {
    // The file expects a Float64 "temperature" column, but the batch supplies
    // an Int32 "count" column. The write must be rejected.
    let file_schema = Arc::new(Schema::new(vec![Field::new(
        "temperature",
        DataType::Float64,
        false,
    )]));
    let batch_schema = Arc::new(Schema::new(vec![Field::new(
        "count",
        DataType::Int32,
        false,
    )]));

    let batch = RecordBatch::try_new(
        batch_schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
    )
    .unwrap();

    let mut buf = Vec::with_capacity(1024);
    let mut writer = ArrowWriter::try_new(&mut buf, Arc::clone(&file_schema), None).unwrap();

    let err = writer
        .write(&batch)
        .expect_err("writing a mismatched batch must fail");
    assert_eq!(
        err.to_string(),
        "Arrow: Incompatible type. Field 'temperature' has type Float64, array has type Int32"
    );
}
5134
#[test]
fn test_roundtrip_empty_schema() {
    // A batch with no columns and zero rows.
    let empty_batch = RecordBatch::try_new_with_options(
        Arc::new(Schema::empty()),
        vec![],
        &RecordBatchOptions::default().with_row_count(Some(0)),
    )
    .unwrap();

    let mut parquet_bytes: Vec<u8> = Vec::new();
    {
        let mut writer =
            ArrowWriter::try_new(&mut parquet_bytes, empty_batch.schema(), None).unwrap();
        writer.write(&empty_batch).unwrap();
        writer.close().unwrap();
    }

    // The schema round-trips, and reading yields no batches at all.
    let builder =
        ParquetRecordBatchReaderBuilder::try_new(Bytes::from(parquet_bytes)).unwrap();
    assert_eq!(builder.schema(), &empty_batch.schema());

    let read_back = builder
        .build()
        .unwrap()
        .collect::<ArrowResult<Vec<_>>>()
        .unwrap();
    assert!(read_back.is_empty());
}
5164
#[test]
fn test_page_stats_not_written_by_default() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
    let values = StringArray::from(vec!["Blart Versenwald III"]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(values)]).unwrap();

    // Page-level statistics are collected, but writing them into page headers
    // is not requested here.
    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Page)
        .set_dictionary_enabled(false)
        .set_encoding(Encoding::PLAIN)
        .set_compression(crate::basic::Compression::UNCOMPRESSED)
        .build();

    let file = roundtrip_opts(&batch, props);

    // The first page header is decoded right after the 4 leading bytes.
    let mut prot = ThriftSliceInputProtocol::new(&file[4..]);
    let header = PageHeader::read_thrift(&mut prot).unwrap();
    assert!(header.data_page_header.unwrap().statistics.is_none());
}
5193
#[test]
fn test_page_stats_when_enabled() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
    let values = StringArray::from(vec!["Blart Versenwald III", "Andrew Lamb"]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(values)]).unwrap();

    // Page statistics are collected and explicitly written into page headers.
    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Page)
        .set_dictionary_enabled(false)
        .set_encoding(Encoding::PLAIN)
        .set_write_page_header_statistics(true)
        .set_compression(crate::basic::Compression::UNCOMPRESSED)
        .build();

    let file = roundtrip_opts(&batch, props);

    let mut prot = ThriftSliceInputProtocol::new(&file[4..]);
    let header = PageHeader::read_thrift(&mut prot).unwrap();
    let stats = header
        .data_page_header
        .unwrap()
        .statistics
        .unwrap();

    // No truncation applies, so both bounds are exact.
    assert!(stats.is_max_value_exact.unwrap());
    assert!(stats.is_min_value_exact.unwrap());
    assert_eq!(stats.max_value.unwrap(), "Blart Versenwald III".as_bytes());
    assert_eq!(stats.min_value.unwrap(), "Andrew Lamb".as_bytes());
}
5228
#[test]
fn test_page_stats_truncation() {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Binary, false),
    ]);

    // The same 20-byte value, once as Utf8 and once as Binary.
    let string_values = StringArray::from(vec!["Blart Versenwald III"]);
    let binary_values = BinaryArray::from(vec![b"Blart Versenwald III".as_slice()]);
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(string_values), Arc::new(binary_values)],
    )
    .unwrap();

    // Truncate statistics to two bytes and write them into page headers.
    let props = WriterProperties::builder()
        .set_statistics_truncate_length(Some(2))
        .set_dictionary_enabled(false)
        .set_encoding(Encoding::PLAIN)
        .set_write_page_header_statistics(true)
        .set_compression(crate::basic::Compression::UNCOMPRESSED)
        .build();

    let file = roundtrip_opts(&batch, props);

    // First page header, right after the 4 leading bytes.
    let mut prot = ThriftSliceInputProtocol::new(&file[4..]);
    let header = PageHeader::read_thrift(&mut prot).unwrap();
    let stats = header
        .data_page_header
        .unwrap()
        .statistics
        .expect("page header statistics should be present");
    // The min is the 2-byte prefix and the max is that prefix with its last
    // byte incremented, so neither bound is exact.
    assert!(!stats.is_max_value_exact.unwrap());
    assert!(!stats.is_min_value_exact.unwrap());
    assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
    assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());

    // Skip that page's body to reach the next page header (expected to be column "b").
    let next_page = &prot.as_slice()[header.compressed_page_size as usize..];
    let mut prot = ThriftSliceInputProtocol::new(next_page);
    let header = PageHeader::read_thrift(&mut prot).unwrap();
    let stats = header
        .data_page_header
        .unwrap()
        .statistics
        .expect("page header statistics should be present");
    assert!(!stats.is_max_value_exact.unwrap());
    assert!(!stats.is_min_value_exact.unwrap());
    assert_eq!(stats.max_value.unwrap(), "Bm".as_bytes());
    assert_eq!(stats.min_value.unwrap(), "Bl".as_bytes());
}
5289
#[test]
fn test_page_encoding_statistics_roundtrip() {
    let batch_schema = Schema::new(vec![Field::new(
        "int32",
        arrow_schema::DataType::Int32,
        false,
    )]);
    let batch = RecordBatch::try_new(
        Arc::new(batch_schema.clone()),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
    )
    .unwrap();

    let mut file: File = tempfile::tempfile().unwrap();
    let mut writer = ArrowWriter::try_new(&mut file, Arc::new(batch_schema), None).unwrap();
    writer.write(&batch).unwrap();
    let file_metadata = writer.close().unwrap();

    // Page encoding stats as reported by the writer.
    assert_eq!(file_metadata.num_row_groups(), 1);
    let written_rg = file_metadata.row_group(0);
    assert_eq!(written_rg.num_columns(), 1);
    let chunk_page_stats = written_rg
        .column(0)
        .page_encoding_stats()
        .expect("writer should report page encoding stats");

    // The same stats, decoded from the file as a full list rather than a mask.
    let options = ReadOptionsBuilder::new()
        .with_page_index()
        .with_encoding_stats_as_mask(false)
        .build();
    let reader = SerializedFileReader::new_with_options(file, options).unwrap();

    let read_rg = reader.get_row_group(0).expect("row group missing");
    assert_eq!(read_rg.num_columns(), 1);
    let file_page_stats = read_rg
        .metadata()
        .column(0)
        .page_encoding_stats()
        .expect("file should contain page encoding stats");

    assert_eq!(chunk_page_stats, file_page_stats);
}
5338
#[test]
fn test_different_dict_page_size_limit() {
    let values = Arc::new(Int64Array::from_iter(0..1024 * 1024));
    let schema = Arc::new(Schema::new(vec![
        Field::new("col0", arrow_schema::DataType::Int64, false),
        Field::new("col1", arrow_schema::DataType::Int64, false),
    ]));
    let batch =
        arrow_array::RecordBatch::try_new(schema.clone(), vec![values.clone(), values]).unwrap();

    // A 1 MiB dictionary page limit everywhere, raised to 4 MiB for col1.
    let props = WriterProperties::builder()
        .set_dictionary_page_size_limit(1024 * 1024)
        .set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
        .build();
    let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
    writer.write(&batch).unwrap();
    let data = Bytes::from(writer.into_inner().unwrap());

    let mut metadata_reader = ParquetMetaDataReader::new();
    metadata_reader.try_parse(&data).unwrap();
    let metadata = metadata_reader.finish().unwrap();
    let row_group = metadata.row_group(0);

    // Byte length of the first page of a chunk, which must be the dictionary page.
    let dict_page_size = move |meta: &ColumnChunkMetaData| -> usize {
        let mut pages =
            SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
        match pages.get_next_page().unwrap().unwrap() {
            Page::DictionaryPage { buf, .. } => buf.len(),
            _ => panic!("expected DictionaryPage"),
        }
    };

    assert_eq!(dict_page_size(row_group.column(0)), 1024 * 1024);
    assert_eq!(dict_page_size(row_group.column(1)), 1024 * 1024 * 4);
}
5376
#[test]
fn test_arrow_writer_granular_mode_roundtrip() {
    // Each run of 16 rows starts with a 64 KiB string, well above the 16 KiB
    // page size limit used below; the remaining rows are tiny.
    let big = "x".repeat(64 * 1024);
    let strings: Vec<String> = (0..256)
        .map(|i| match i % 16 {
            0 => big.clone(),
            _ => "tiny".to_string(),
        })
        .collect();

    let schema = Arc::new(Schema::new(vec![Field::new(
        "col",
        ArrowDataType::Utf8,
        false,
    )]));
    let column: ArrayRef = Arc::new(StringArray::from(strings.clone()));
    let batch = RecordBatch::try_new(schema.clone(), vec![column]).unwrap();

    let props = WriterProperties::builder()
        .set_dictionary_enabled(false)
        .set_data_page_size_limit(16 * 1024)
        .build();
    let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
    writer.write(&batch).unwrap();
    let data = Bytes::from(writer.into_inner().unwrap());

    // Everything fits in a single 1024-row read batch.
    let mut reader = ParquetRecordBatchReader::try_new(data, 1024).unwrap();
    let read = reader.next().unwrap().unwrap();
    assert!(reader.next().is_none(), "expected one batch");

    let col = read
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(col.len(), strings.len());
    strings.iter().enumerate().for_each(|(i, expected)| {
        assert_eq!(
            col.value(i),
            expected.as_str(),
            "value mismatch at index {i}"
        );
    });
}
5435
#[test]
fn test_arrow_writer_all_null_string_column() {
    let num_rows = 1024;
    let schema = Arc::new(Schema::new(vec![Field::new(
        "col",
        ArrowDataType::Utf8,
        true,
    )]));
    let all_null = StringArray::from(vec![None::<&str>; num_rows]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(all_null) as _]).unwrap();

    let props = WriterProperties::builder()
        .set_dictionary_enabled(false)
        .set_data_page_size_limit(16 * 1024)
        .build();
    let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
    writer.write(&batch).unwrap();
    let data = Bytes::from(writer.into_inner().unwrap());

    let metadata = {
        let mut metadata_reader = ParquetMetaDataReader::new();
        metadata_reader.try_parse(&data).unwrap();
        metadata_reader.finish().unwrap()
    };
    let row_group = metadata.row_group(0);
    let col_meta = row_group.column(0);
    assert_eq!(row_group.num_rows() as usize, num_rows);

    // If chunk statistics are present, they must count every row as null.
    if let Some(stats) = col_meta.statistics() {
        assert_eq!(
            stats.null_count_opt().unwrap_or(0) as usize,
            num_rows,
            "expected all-null column to report null_count = num_rows"
        );
    }

    // Summed over all data pages, num_values must cover every row.
    let mut pages =
        SerializedPageReader::new(Arc::new(data.clone()), col_meta, num_rows, None).unwrap();
    let mut total_values = 0u32;
    while let Some(page) = pages.get_next_page().unwrap() {
        let is_data_page = matches!(page, Page::DataPage { .. } | Page::DataPageV2 { .. });
        if is_data_page {
            total_values += page.num_values();
        }
    }
    assert_eq!(
        total_values as usize, num_rows,
        "expected every level position to be represented in some page"
    );
}
5494
/// Dimensions of the synthetic string data produced by `write_batches`.
struct WriteBatchesShape {
    /// Number of record batches written.
    num_batches: usize,
    /// Number of rows in each batch.
    rows_per_batch: usize,
    /// Zero-padding width of each string value, in characters.
    row_size: usize,
}
5500
5501 fn write_batches(
5503 WriteBatchesShape {
5504 num_batches,
5505 rows_per_batch,
5506 row_size,
5507 }: WriteBatchesShape,
5508 props: WriterProperties,
5509 ) -> ParquetRecordBatchReaderBuilder<File> {
5510 let schema = Arc::new(Schema::new(vec![Field::new(
5511 "str",
5512 ArrowDataType::Utf8,
5513 false,
5514 )]));
5515 let file = tempfile::tempfile().unwrap();
5516 let mut writer =
5517 ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
5518
5519 for batch_idx in 0..num_batches {
5520 let strings: Vec<String> = (0..rows_per_batch)
5521 .map(|i| format!("{:0>width$}", batch_idx * 10 + i, width = row_size))
5522 .collect();
5523 let array = StringArray::from(strings);
5524 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
5525 writer.write(&batch).unwrap();
5526 }
5527 writer.close().unwrap();
5528 ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
5529 }
5530
#[test]
fn test_row_group_limit_none_writes_single_row_group() {
    // Neither a row-count limit nor a byte limit is configured.
    let unlimited = WriterProperties::builder()
        .set_max_row_group_row_count(None)
        .set_max_row_group_bytes(None)
        .build();
    let shape = WriteBatchesShape {
        num_batches: 1,
        rows_per_batch: 1000,
        row_size: 4,
    };

    let builder = write_batches(shape, unlimited);
    let sizes = row_group_sizes(builder.metadata());

    assert_eq!(
        &sizes,
        &[1000],
        "With no limits, all rows should be in a single row group"
    );
}
5554
#[test]
fn test_row_group_limit_rows_only() {
    // Only a row-count limit: 1000 rows split into groups of at most 300.
    let row_limited = WriterProperties::builder()
        .set_max_row_group_row_count(Some(300))
        .set_max_row_group_bytes(None)
        .build();
    let shape = WriteBatchesShape {
        num_batches: 1,
        rows_per_batch: 1000,
        row_size: 4,
    };

    let builder = write_batches(shape, row_limited);
    let sizes = row_group_sizes(builder.metadata());

    assert_eq!(
        &sizes,
        &[300, 300, 300, 100],
        "Row groups should be split by row count"
    );
}
5578
#[test]
fn test_row_group_limit_bytes_only() {
    // Only a byte limit; roughly 100 bytes per row over 100 rows far exceeds it.
    let byte_limited = WriterProperties::builder()
        .set_max_row_group_row_count(None)
        .set_max_row_group_bytes(Some(3500))
        .build();
    let shape = WriteBatchesShape {
        num_batches: 10,
        rows_per_batch: 10,
        row_size: 100,
    };

    let builder = write_batches(shape, byte_limited);
    let sizes = row_group_sizes(builder.metadata());

    assert!(
        sizes.len() > 1,
        "Should have multiple row groups due to byte limit, got {sizes:?}",
    );
    assert_eq!(
        sizes.iter().sum::<i64>(),
        100,
        "Total rows should be preserved"
    );
}
5607
#[test]
fn test_row_group_limit_bytes_flushes_when_current_group_already_too_large() {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "str",
        ArrowDataType::Utf8,
        false,
    )]));
    let file = tempfile::tempfile().unwrap();

    // Start without limits so the first batch stays buffered in one row group.
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(None)
        .set_max_row_group_bytes(None)
        .build();
    let mut writer =
        ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();

    let first_rows: Vec<String> = (0..10).map(|i| format!("{i:0>100}")).collect();
    let first_batch =
        RecordBatch::try_new(schema.clone(), vec![Arc::new(StringArray::from(first_rows))])
            .unwrap();
    writer.write(&first_batch).unwrap();
    assert_eq!(writer.in_progress_rows(), 10);

    // Shrink the byte limit below what is already buffered.
    writer.max_row_group_bytes = Some(1);

    let second_rows = vec!["x".to_string()];
    let second_batch =
        RecordBatch::try_new(schema.clone(), vec![Arc::new(StringArray::from(second_rows))])
            .unwrap();
    writer.write(&second_batch).unwrap();
    writer.close().unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    assert_eq!(
        &row_group_sizes(builder.metadata()),
        &[10, 1],
        "The second write should flush an oversized in-progress row group first",
    );
}
5653
#[test]
fn test_row_group_limit_both_row_wins_single_batch() {
    // 200 rows of 4 bytes stay far below 1 MiB, so the row limit trips first.
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(200))
        .set_max_row_group_bytes(Some(1024 * 1024))
        .build();
    let shape = WriteBatchesShape {
        num_batches: 1,
        rows_per_batch: 1000,
        row_size: 4,
    };

    let builder = write_batches(shape, props);
    let sizes = row_group_sizes(builder.metadata());

    assert_eq!(
        &sizes,
        &[200, 200, 200, 200, 200],
        "Row limit should trigger before byte limit"
    );
}
5677
#[test]
fn test_row_group_limit_both_row_wins_multiple_batches() {
    // Five ~100-byte rows stay under 9999 bytes, so the row limit trips first.
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(5))
        .set_max_row_group_bytes(Some(9999))
        .build();
    let shape = WriteBatchesShape {
        num_batches: 10,
        rows_per_batch: 10,
        row_size: 100,
    };

    let builder = write_batches(shape, props);
    let sizes = row_group_sizes(builder.metadata());

    assert_eq!(
        &sizes,
        &[5; 20],
        "Row limit should trigger before byte limit"
    );
}
5701
#[test]
fn test_row_group_limit_both_bytes_wins() {
    // Only 100 rows are written, so the 1000-row limit can never trip;
    // the 3500-byte limit must split the data instead.
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(1000))
        .set_max_row_group_bytes(Some(3500))
        .build();
    let shape = WriteBatchesShape {
        num_batches: 10,
        rows_per_batch: 10,
        row_size: 100,
    };

    let builder = write_batches(shape, props);
    let sizes = row_group_sizes(builder.metadata());

    assert!(
        sizes.len() > 1,
        "Byte limit should trigger before row limit, got {sizes:?}",
    );
    assert!(
        !sizes.iter().any(|&s| s >= 1000),
        "No row group should hit the row limit"
    );
    assert_eq!(
        sizes.iter().sum::<i64>(),
        100,
        "Total rows should be preserved"
    );
}
5734
#[test]
fn arrow_column_chunk_close_mut_drops_column_index() {
    use crate::arrow::ArrowSchemaConverter;
    use crate::file::writer::SerializedFileWriter;

    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .build(),
    );
    let parquet_schema = ArrowSchemaConverter::new()
        .with_coerce_types(props.coerce_types())
        .convert(&schema)
        .unwrap();

    let mut buf = Vec::with_capacity(1024);
    let mut writer =
        SerializedFileWriter::new(&mut buf, parquet_schema.root_schema_ptr(), props.clone())
            .unwrap();

    // Encode one column chunk through the row-group writer factory.
    let factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));
    let mut col_writers = factory.create_column_writers(0).unwrap();
    let values: ArrayRef = Arc::new(Int32Array::from_iter_values(0..64));
    let leaves = compute_leaves(schema.field(0), &values).unwrap();
    for leaf in leaves {
        col_writers[0].write(&leaf).unwrap();
    }
    let mut chunk = col_writers.pop().unwrap().close().unwrap();

    assert!(
        chunk.close().column_index.is_some(),
        "EnabledStatistics::Page should produce a column_index"
    );

    // Clearing the index through `close_mut` must be visible through `close`.
    chunk.close_mut().column_index = None;
    assert!(chunk.close().column_index.is_none());

    // It must also mean that no column index is written to the file.
    let mut rg = writer.next_row_group().unwrap();
    chunk.append_to_row_group(&mut rg).unwrap();
    rg.close().unwrap();
    let file_meta = writer.close().unwrap();

    let column_meta = file_meta.row_group(0).column(0);
    assert!(column_meta.column_index_range().is_none());
}
5784
5785 fn write_column_to_bytes(array: ArrayRef) -> Bytes {
5787 let schema = Arc::new(Schema::new(vec![Field::new(
5788 "col",
5789 array.data_type().clone(),
5790 true,
5791 )]));
5792 let buf = get_bytes_after_close(
5793 schema.clone(),
5794 &RecordBatch::try_new(schema, vec![array]).unwrap(),
5795 );
5796 Bytes::from(buf)
5797 }
5798
5799 fn read_column_with_schema(bytes: Bytes, schema: SchemaRef) -> ArrayRef {
5803 let opts = crate::arrow::arrow_reader::ArrowReaderOptions::new().with_schema(schema);
5804 ParquetRecordBatchReaderBuilder::try_new_with_options(bytes, opts)
5805 .unwrap()
5806 .build()
5807 .unwrap()
5808 .next()
5809 .unwrap()
5810 .unwrap()
5811 .column(0)
5812 .clone()
5813 }
5814
5815 fn ree_write_read_roundtrip(ree: ArrayRef, flat: ArrayRef) {
5816 let flat_schema = Arc::new(Schema::new(vec![Field::new(
5817 "col",
5818 flat.data_type().clone(),
5819 true,
5820 )]));
5821 let ree_bytes = write_column_to_bytes(ree);
5822 let flat_bytes = write_column_to_bytes(flat.clone());
5823 assert_eq!(
5824 ree_bytes, flat_bytes,
5825 "REE and flat bytes should be identical"
5826 );
5827
5828 let decoded_ree = read_column_with_schema(ree_bytes, flat_schema.clone());
5829 let decoded_flat = read_column_with_schema(flat_bytes, flat_schema);
5830
5831 assert_eq!(decoded_ree.as_ref(), flat.as_ref());
5832 assert_eq!(decoded_ree.as_ref(), decoded_flat.as_ref());
5833 }
5834
/// Round-trips a run-end encoded string column containing a null.
#[test]
fn ree_string() {
    let values = [Some("a"), Some("a"), None, Some("b"), Some("b")];
    let ree: ArrayRef = Arc::new(values.into_iter().collect::<Int32RunArray>());
    let flat: ArrayRef = Arc::new(StringArray::from(values.to_vec()));
    ree_write_read_roundtrip(ree, flat);
}
5851
/// Round-trips an `Int32` run array assembled with `PrimitiveRunBuilder`.
#[test]
fn ree_int32() {
    let values = [Some(1), Some(1), None, Some(2), Some(2)];

    let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
    for &value in &values {
        builder.append_option(value);
    }
    let ree: ArrayRef = Arc::new(builder.finish());
    let flat: ArrayRef = Arc::new(Int32Array::from(values.to_vec()));
    ree_write_read_roundtrip(ree, flat);
}
5868
/// Round-trips a run-end encoded boolean column that contains a null run.
#[test]
fn ree_bool() {
    let run_ends = Int32Array::from(vec![3, 5, 7]);
    let run_values = BooleanArray::from(vec![Some(true), None, Some(false)]);
    let ree: ArrayRef = Arc::new(RunArray::try_new(&run_ends, &run_values).unwrap());

    // Expanded form of runs ending at 3, 5 and 7: three `true`, two nulls,
    // two `false`.
    let flat: ArrayRef = Arc::new(
        [Some(true); 3]
            .into_iter()
            .chain([None; 2])
            .chain([Some(false); 2])
            .collect::<BooleanArray>(),
    );
    ree_write_read_roundtrip(ree, flat);
}
5890
/// Round-trips a run-end encoded `FixedSizeBinary(2)` column with a null run.
#[test]
fn ree_fixed_size_binary() {
    // Builds a width-2 fixed-size binary array; `None` entries become nulls.
    let fsb = |entries: &[Option<&[u8]>]| -> FixedSizeBinaryArray {
        let mut builder = FixedSizeBinaryBuilder::new(2);
        for entry in entries {
            if let Some(bytes) = entry {
                builder.append_value(bytes).unwrap();
            } else {
                builder.append_null();
            }
        }
        builder.finish()
    };

    // Runs ending at 2, 4 and 6 expand to: aa aa null null bb bb.
    let run_ends = Int32Array::from(vec![2, 4, 6]);
    let run_values = fsb(&[Some(b"aa"), None, Some(b"bb")]);
    let ree: ArrayRef = Arc::new(RunArray::try_new(&run_ends, &run_values).unwrap());
    let flat: ArrayRef = Arc::new(fsb(&[
        Some(b"aa"),
        Some(b"aa"),
        None,
        None,
        Some(b"bb"),
        Some(b"bb"),
    ]));
    ree_write_read_roundtrip(ree, flat);
}
5921
/// Round-trips a run array consisting of exactly one run.
#[test]
fn ree_single_run() {
    let values = ["x", "x", "x"];
    let ree: ArrayRef = Arc::new(values.into_iter().collect::<Int32RunArray>());
    let flat: ArrayRef = Arc::new(StringArray::from(values.to_vec()));
    ree_write_read_roundtrip(ree, flat);
}
5928
/// Round-trips a run-end encoded `Float32` column with a null run.
#[test]
fn ree_float32() {
    let run_ends = Int32Array::from(vec![2, 4, 5]);
    let run_values = Float32Array::from(vec![Some(1.0_f32), None, Some(2.5_f32)]);
    let ree: ArrayRef = Arc::new(RunArray::try_new(&run_ends, &run_values).unwrap());

    // Expanded form of runs ending at 2, 4 and 5.
    let flat: ArrayRef = Arc::new(Float32Array::from(vec![
        Some(1.0_f32),
        Some(1.0_f32),
        None,
        None,
        Some(2.5_f32),
    ]));
    ree_write_read_roundtrip(ree, flat);
}
5948
/// Round-trips a sliced run array whose offset lands inside the first run.
#[test]
fn ree_sliced() {
    // Runs ending at 3, 5 and 7: logical contents are a a a b b c c.
    let run_ends = Int32Array::from(vec![3, 5, 7]);
    let run_values = StringArray::from(vec!["a", "b", "c"]);
    let full: ArrayRef = Arc::new(RunArray::try_new(&run_ends, &run_values).unwrap());

    // Logical positions 2..7 keep the last `a` plus both remaining runs.
    let sliced = full.slice(2, 5);
    let flat: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "b", "c", "c"]));
    ree_write_read_roundtrip(sliced, flat);
}
5966
/// Writes a struct whose two children are run-end encoded with shared run
/// ends. Checks the resulting Parquet leaf schema, and also checks that the
/// runs were written at their logical length.
#[test]
fn ree_struct_with_ree_child() {
    // Three runs ending at 2, 3 and 5: five logical rows per child.
    let run_ends = Int32Array::from(vec![2i32, 3, 5]);

    let col_a: ArrayRef = Arc::new(
        RunArray::try_new(
            &run_ends,
            &StringArray::from(vec![Some("foo"), None, Some("bar")]),
        )
        .unwrap(),
    );
    let col_b: ArrayRef = Arc::new(
        RunArray::try_new(&run_ends, &Int32Array::from(vec![Some(1), None, Some(2)])).unwrap(),
    );

    let struct_array: ArrayRef = Arc::new(StructArray::new(
        Fields::from(vec![
            Field::new("a", col_a.data_type().clone(), true),
            Field::new("b", col_b.data_type().clone(), true),
        ]),
        vec![col_a, col_b],
        None,
    ));

    let schema = Arc::new(Schema::new(vec![Field::new(
        "row",
        struct_array.data_type().clone(),
        true,
    )]));
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![struct_array]).unwrap();

    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
    writer.write(&batch).unwrap();
    let metadata = writer.close().unwrap();

    // Each REE child maps to a single leaf column of its value type.
    let parquet_schema = metadata.file_metadata().schema_descr();
    assert_eq!(parquet_schema.num_columns(), 2);
    assert_eq!(
        parquet_schema.column(0).physical_type(),
        crate::basic::Type::BYTE_ARRAY
    );
    assert_eq!(parquet_schema.column(0).path().string(), "row.a");
    assert_eq!(
        parquet_schema.column(1).physical_type(),
        crate::basic::Type::INT32
    );
    assert_eq!(parquet_schema.column(1).path().string(), "row.b");

    // The runs must be expanded to the 5 logical rows rather than written as
    // the 3 physical run values. Each leaf chunk should therefore report 5
    // values (nulls included).
    assert_eq!(metadata.num_row_groups(), 1);
    let row_group = metadata.row_group(0);
    assert_eq!(row_group.num_rows(), 5);
    for column in row_group.columns() {
        assert_eq!(
            column.num_values(),
            5,
            "unexpected value count for {}",
            column.column_path().string()
        );
    }
}
6018}