1use crate::bloom_filter::Sbbf;
21use crate::file::metadata::thrift::PageHeader;
22use crate::file::page_index::column_index::ColumnIndexMetaData;
23use crate::file::page_index::offset_index::OffsetIndexMetaData;
24use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
25use std::fmt::Debug;
26use std::io::{BufWriter, IoSlice, Read};
27use std::{io::Write, sync::Arc};
28
29use crate::column::page_encryption::PageEncryptor;
30use crate::column::writer::{ColumnCloseResult, ColumnWriterImpl, get_typed_column_writer_mut};
31use crate::column::{
32 page::{CompressedPage, PageWriteSpec, PageWriter},
33 writer::{ColumnWriter, get_column_writer},
34};
35use crate::data_type::DataType;
36#[cfg(feature = "encryption")]
37use crate::encryption::encrypt::{
38 FileEncryptionProperties, FileEncryptor, get_column_crypto_metadata,
39};
40use crate::errors::{ParquetError, Result};
41#[cfg(feature = "encryption")]
42use crate::file::PARQUET_MAGIC_ENCR_FOOTER;
43use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
44use crate::file::reader::ChunkReader;
45use crate::file::{PARQUET_MAGIC, metadata::*};
46use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr};
47
/// A [`Write`] wrapper that keeps a running count of the bytes written through it.
///
/// The wrapped writer is held inside a [`BufWriter`]; the counter is updated by
/// this type's [`Write`] implementation.
pub struct TrackedWrite<W: Write> {
    /// Buffered handle to the underlying writer.
    inner: BufWriter<W>,
    /// Total number of bytes accepted by the `write*` methods so far.
    bytes_written: usize,
}
55
56impl<W: Write> TrackedWrite<W> {
57 pub fn new(inner: W) -> Self {
59 let buf_write = BufWriter::new(inner);
60 Self {
61 inner: buf_write,
62 bytes_written: 0,
63 }
64 }
65
66 pub fn bytes_written(&self) -> usize {
68 self.bytes_written
69 }
70
71 pub fn inner(&self) -> &W {
73 self.inner.get_ref()
74 }
75
76 pub fn inner_mut(&mut self) -> &mut W {
81 self.inner.get_mut()
82 }
83
84 pub fn into_inner(self) -> Result<W> {
86 self.inner.into_inner().map_err(|err| {
87 ParquetError::General(format!("fail to get inner writer: {:?}", err.to_string()))
88 })
89 }
90}
91
92impl<W: Write> Write for TrackedWrite<W> {
93 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
94 let bytes = self.inner.write(buf)?;
95 self.bytes_written += bytes;
96 Ok(bytes)
97 }
98
99 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> std::io::Result<usize> {
100 let bytes = self.inner.write_vectored(bufs)?;
101 self.bytes_written += bytes;
102 Ok(bytes)
103 }
104
105 fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
106 self.inner.write_all(buf)?;
107 self.bytes_written += buf.len();
108
109 Ok(())
110 }
111
112 fn flush(&mut self) -> std::io::Result<()> {
113 self.inner.flush()
114 }
115}
116
/// Boxed one-shot callback that receives the [`ColumnCloseResult`] of a closed column chunk.
pub type OnCloseColumnChunk<'a> = Box<dyn FnOnce(ColumnCloseResult) -> Result<()> + 'a>;
119
/// Boxed one-shot callback invoked when a row group writer is closed.
///
/// Arguments, in order:
/// - the file's [`TrackedWrite`] sink,
/// - the metadata of the closed row group,
/// - the bloom filter of each column (if any),
/// - the column index of each column (if any),
/// - the offset index of each column (if any).
pub type OnCloseRowGroup<'a, W> = Box<
    dyn FnOnce(
            &'a mut TrackedWrite<W>,
            RowGroupMetaData,
            Vec<Option<Sbbf>>,
            Vec<Option<ColumnIndexMetaData>>,
            Vec<Option<OffsetIndexMetaData>>,
        ) -> Result<()>
        + 'a
        + Send,
>;
136
/// Writer for a Parquet file: hands out row group writers one at a time and
/// writes the file metadata when finished.
pub struct SerializedFileWriter<W: Write> {
    /// Byte-counting sink that all file contents are written to.
    buf: TrackedWrite<W>,
    /// Schema descriptor built from the schema passed to `new`.
    descr: SchemaDescPtr,
    /// Writer properties shared with row group and column writers.
    props: WriterPropertiesPtr,
    /// Metadata of every row group closed so far.
    row_groups: Vec<RowGroupMetaData>,
    /// Bloom filters, one inner `Vec` per closed row group, one slot per column.
    bloom_filters: Vec<Vec<Option<Sbbf>>>,
    /// Column indexes, one inner `Vec` per closed row group.
    column_indexes: Vec<Vec<Option<ColumnIndexMetaData>>>,
    /// Offset indexes, one inner `Vec` per closed row group.
    offset_indexes: Vec<Vec<Option<OffsetIndexMetaData>>>,
    /// Number of row group writers handed out so far (ordinal of the next one).
    row_group_index: usize,
    /// Key/value metadata appended through `append_key_value_metadata`.
    kv_metadatas: Vec<KeyValue>,
    /// Set once the file metadata has started to be written; no further writes are accepted.
    finished: bool,
    /// File-level encryptor, present when encryption properties were configured.
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
171
172impl<W: Write> Debug for SerializedFileWriter<W> {
173 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174 f.debug_struct("SerializedFileWriter")
177 .field("descr", &self.descr)
178 .field("row_group_index", &self.row_group_index)
179 .field("kv_metadatas", &self.kv_metadatas)
180 .finish_non_exhaustive()
181 }
182}
183
184impl<W: Write + Send> SerializedFileWriter<W> {
185 pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
187 let mut buf = TrackedWrite::new(buf);
188
189 let schema_descriptor = SchemaDescriptor::new(schema.clone());
190
191 #[cfg(feature = "encryption")]
192 let file_encryptor = Self::get_file_encryptor(&properties, &schema_descriptor)?;
193
194 Self::start_file(&properties, &mut buf)?;
195 Ok(Self {
196 buf,
197 descr: Arc::new(schema_descriptor),
198 props: properties,
199 row_groups: vec![],
200 bloom_filters: vec![],
201 column_indexes: Vec::new(),
202 offset_indexes: Vec::new(),
203 row_group_index: 0,
204 kv_metadatas: Vec::new(),
205 finished: false,
206 #[cfg(feature = "encryption")]
207 file_encryptor,
208 })
209 }
210
211 #[cfg(feature = "encryption")]
212 fn get_file_encryptor(
213 properties: &WriterPropertiesPtr,
214 schema_descriptor: &SchemaDescriptor,
215 ) -> Result<Option<Arc<FileEncryptor>>> {
216 if let Some(file_encryption_properties) = properties.file_encryption_properties() {
217 file_encryption_properties.validate_encrypted_column_names(schema_descriptor)?;
218
219 Ok(Some(Arc::new(FileEncryptor::new(Arc::clone(
220 file_encryption_properties,
221 ))?)))
222 } else {
223 Ok(None)
224 }
225 }
226
227 pub fn next_row_group(&mut self) -> Result<SerializedRowGroupWriter<'_, W>> {
236 self.assert_previous_writer_closed()?;
237 let ordinal = self.row_group_index;
238
239 let ordinal: i16 = ordinal.try_into().map_err(|_| {
240 ParquetError::General(format!(
241 "Parquet does not support more than {} row groups per file (currently: {})",
242 i16::MAX,
243 ordinal
244 ))
245 })?;
246
247 self.row_group_index = self
248 .row_group_index
249 .checked_add(1)
250 .expect("SerializedFileWriter::row_group_index overflowed");
251
252 let bloom_filter_position = self.properties().bloom_filter_position();
253 let row_groups = &mut self.row_groups;
254 let row_bloom_filters = &mut self.bloom_filters;
255 let row_column_indexes = &mut self.column_indexes;
256 let row_offset_indexes = &mut self.offset_indexes;
257 let on_close = move |buf,
258 mut metadata,
259 row_group_bloom_filter,
260 row_group_column_index,
261 row_group_offset_index| {
262 row_bloom_filters.push(row_group_bloom_filter);
263 row_column_indexes.push(row_group_column_index);
264 row_offset_indexes.push(row_group_offset_index);
265 match bloom_filter_position {
267 BloomFilterPosition::AfterRowGroup => {
268 write_bloom_filters(buf, row_bloom_filters, &mut metadata)?
269 }
270 BloomFilterPosition::End => (),
271 };
272 row_groups.push(metadata);
273 Ok(())
274 };
275
276 let row_group_writer = SerializedRowGroupWriter::new(
277 self.descr.clone(),
278 self.props.clone(),
279 &mut self.buf,
280 ordinal,
281 Some(Box::new(on_close)),
282 );
283 #[cfg(feature = "encryption")]
284 let row_group_writer = row_group_writer.with_file_encryptor(self.file_encryptor.clone());
285
286 Ok(row_group_writer)
287 }
288
289 pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
291 &self.row_groups
292 }
293
294 pub fn finish(&mut self) -> Result<ParquetMetaData> {
300 self.assert_previous_writer_closed()?;
301 let metadata = self.write_metadata()?;
302 self.buf.flush()?;
303 Ok(metadata)
304 }
305
306 pub fn close(mut self) -> Result<ParquetMetaData> {
308 self.finish()
309 }
310
311 #[cfg(not(feature = "encryption"))]
313 fn start_file(_properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
314 buf.write_all(get_file_magic())?;
315 Ok(())
316 }
317
318 #[cfg(feature = "encryption")]
320 fn start_file(properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
321 let magic = get_file_magic(properties.file_encryption_properties.as_ref());
322
323 buf.write_all(magic)?;
324 Ok(())
325 }
326
327 fn write_metadata(&mut self) -> Result<ParquetMetaData> {
330 self.finished = true;
331
332 for row_group in &mut self.row_groups {
334 write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?;
335 }
336
337 let key_value_metadata = match self.props.key_value_metadata() {
338 Some(kv) => Some(kv.iter().chain(&self.kv_metadatas).cloned().collect()),
339 None if self.kv_metadatas.is_empty() => None,
340 None => Some(self.kv_metadatas.clone()),
341 };
342
343 let row_groups = std::mem::take(&mut self.row_groups);
345 let column_indexes = std::mem::take(&mut self.column_indexes);
346 let offset_indexes = std::mem::take(&mut self.offset_indexes);
347
348 let write_path_in_schema = self.props.write_path_in_schema();
349 let mut encoder = ThriftMetadataWriter::new(
350 &mut self.buf,
351 &self.descr,
352 row_groups,
353 Some(self.props.created_by().to_string()),
354 self.props.writer_version().as_num(),
355 write_path_in_schema,
356 );
357
358 #[cfg(feature = "encryption")]
359 {
360 encoder = encoder.with_file_encryptor(self.file_encryptor.clone());
361 }
362
363 if let Some(key_value_metadata) = key_value_metadata {
364 encoder = encoder.with_key_value_metadata(key_value_metadata)
365 }
366
367 encoder = encoder.with_column_indexes(column_indexes);
368 if !self.props.offset_index_disabled() {
369 encoder = encoder.with_offset_indexes(offset_indexes);
370 }
371 encoder.finish()
372 }
373
374 #[inline]
375 fn assert_previous_writer_closed(&self) -> Result<()> {
376 if self.finished {
377 return Err(general_err!("SerializedFileWriter already finished"));
378 }
379
380 if self.row_group_index != self.row_groups.len() {
381 Err(general_err!("Previous row group writer was not closed"))
382 } else {
383 Ok(())
384 }
385 }
386
387 pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
389 self.kv_metadatas.push(kv_metadata);
390 }
391
392 pub fn schema_descr(&self) -> &SchemaDescriptor {
394 &self.descr
395 }
396
397 #[cfg(feature = "arrow")]
399 pub(crate) fn schema_descr_ptr(&self) -> &SchemaDescPtr {
400 &self.descr
401 }
402
403 pub fn properties(&self) -> &WriterPropertiesPtr {
405 &self.props
406 }
407
408 pub fn inner(&self) -> &W {
410 self.buf.inner()
411 }
412
413 pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
422 self.buf.write_all(buf)
423 }
424
425 pub fn flush(&mut self) -> std::io::Result<()> {
427 self.buf.flush()
428 }
429
430 pub fn inner_mut(&mut self) -> &mut W {
439 self.buf.inner_mut()
440 }
441
442 pub fn into_inner(mut self) -> Result<W> {
444 self.assert_previous_writer_closed()?;
445 let _ = self.write_metadata()?;
446
447 self.buf.into_inner()
448 }
449
450 pub fn bytes_written(&self) -> usize {
452 self.buf.bytes_written()
453 }
454
455 #[cfg(feature = "encryption")]
457 pub(crate) fn file_encryptor(&self) -> Option<Arc<FileEncryptor>> {
458 self.file_encryptor.clone()
459 }
460}
461
462fn write_bloom_filters<W: Write + Send>(
465 buf: &mut TrackedWrite<W>,
466 bloom_filters: &mut [Vec<Option<Sbbf>>],
467 row_group: &mut RowGroupMetaData,
468) -> Result<()> {
469 let row_group_idx: u16 = row_group
474 .ordinal()
475 .expect("Missing row group ordinal")
476 .try_into()
477 .map_err(|_| {
478 ParquetError::General(format!(
479 "Negative row group ordinal: {})",
480 row_group.ordinal().unwrap()
481 ))
482 })?;
483 let row_group_idx = row_group_idx as usize;
484 for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() {
485 if let Some(bloom_filter) = bloom_filters[row_group_idx][column_idx].take() {
486 let start_offset = buf.bytes_written();
487 bloom_filter.write(&mut *buf)?;
488 let end_offset = buf.bytes_written();
489 *column_chunk = column_chunk
491 .clone()
492 .into_builder()
493 .set_bloom_filter_offset(Some(start_offset as i64))
494 .set_bloom_filter_length(Some((end_offset - start_offset) as i32))
495 .build()?;
496 }
497 }
498 Ok(())
499}
500
/// Writer for a single row group; columns are written one at a time in schema order.
pub struct SerializedRowGroupWriter<'a, W: Write> {
    /// Schema descriptor whose columns are written in order.
    descr: SchemaDescPtr,
    /// Writer properties handed to each column writer.
    props: WriterPropertiesPtr,
    /// File sink shared with the parent file writer.
    buf: &'a mut TrackedWrite<W>,
    /// Row count reported by the first closed column; later columns must match it.
    total_rows_written: Option<u64>,
    /// Sum of `bytes_written` reported by the closed columns.
    total_bytes_written: u64,
    /// Sum of the uncompressed sizes reported by the closed columns.
    total_uncompressed_bytes: i64,
    /// Index of the next column to hand out.
    column_index: usize,
    /// Row group metadata, set once the row group has been closed.
    row_group_metadata: Option<RowGroupMetaDataPtr>,
    /// Metadata of the column chunks closed so far.
    column_chunks: Vec<ColumnChunkMetaData>,
    /// Bloom filter of each closed column, if any.
    bloom_filters: Vec<Option<Sbbf>>,
    /// Column index of each closed column, if any.
    column_indexes: Vec<Option<ColumnIndexMetaData>>,
    /// Offset index of each closed column, if any.
    offset_indexes: Vec<Option<OffsetIndexMetaData>>,
    /// Ordinal of this row group within the file.
    row_group_index: i16,
    /// Byte count of the sink at the time this writer was created.
    file_offset: i64,
    /// Callback invoked when the row group is closed.
    on_close: Option<OnCloseRowGroup<'a, W>>,
    /// File-level encryptor, if one was attached.
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
532
533impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
534 pub fn new(
543 schema_descr: SchemaDescPtr,
544 properties: WriterPropertiesPtr,
545 buf: &'a mut TrackedWrite<W>,
546 row_group_index: i16,
547 on_close: Option<OnCloseRowGroup<'a, W>>,
548 ) -> Self {
549 let num_columns = schema_descr.num_columns();
550 let file_offset = buf.bytes_written() as i64;
551 Self {
552 buf,
553 row_group_index,
554 file_offset,
555 on_close,
556 total_rows_written: None,
557 descr: schema_descr,
558 props: properties,
559 column_index: 0,
560 row_group_metadata: None,
561 column_chunks: Vec::with_capacity(num_columns),
562 bloom_filters: Vec::with_capacity(num_columns),
563 column_indexes: Vec::with_capacity(num_columns),
564 offset_indexes: Vec::with_capacity(num_columns),
565 total_bytes_written: 0,
566 total_uncompressed_bytes: 0,
567 #[cfg(feature = "encryption")]
568 file_encryptor: None,
569 }
570 }
571
572 #[cfg(feature = "encryption")]
573 pub(crate) fn with_file_encryptor(
575 mut self,
576 file_encryptor: Option<Arc<FileEncryptor>>,
577 ) -> Self {
578 self.file_encryptor = file_encryptor;
579 self
580 }
581
582 fn next_column_desc(&mut self) -> Option<ColumnDescPtr> {
584 let ret = self.descr.columns().get(self.column_index)?.clone();
585 self.column_index += 1;
586 Some(ret)
587 }
588
589 fn get_on_close(&mut self) -> (&mut TrackedWrite<W>, OnCloseColumnChunk<'_>) {
591 let total_bytes_written = &mut self.total_bytes_written;
592 let total_uncompressed_bytes = &mut self.total_uncompressed_bytes;
593 let total_rows_written = &mut self.total_rows_written;
594 let column_chunks = &mut self.column_chunks;
595 let column_indexes = &mut self.column_indexes;
596 let offset_indexes = &mut self.offset_indexes;
597 let bloom_filters = &mut self.bloom_filters;
598
599 let on_close = |r: ColumnCloseResult| {
600 *total_bytes_written += r.bytes_written;
602 *total_uncompressed_bytes += r.metadata.uncompressed_size();
603 column_chunks.push(r.metadata);
604 bloom_filters.push(r.bloom_filter);
605 column_indexes.push(r.column_index);
606 offset_indexes.push(r.offset_index);
607
608 if let Some(rows) = *total_rows_written {
609 if rows != r.rows_written {
610 return Err(general_err!(
611 "Incorrect number of rows, expected {} != {} rows",
612 rows,
613 r.rows_written
614 ));
615 }
616 } else {
617 *total_rows_written = Some(r.rows_written);
618 }
619
620 Ok(())
621 };
622 (self.buf, Box::new(on_close))
623 }
624
625 pub(crate) fn next_column_with_factory<'b, F, C>(&'b mut self, factory: F) -> Result<Option<C>>
628 where
629 F: FnOnce(
630 ColumnDescPtr,
631 WriterPropertiesPtr,
632 Box<dyn PageWriter + 'b>,
633 OnCloseColumnChunk<'b>,
634 ) -> Result<C>,
635 {
636 self.assert_previous_writer_closed()?;
637
638 let encryptor_context = self.get_page_encryptor_context();
639
640 Ok(match self.next_column_desc() {
641 Some(column) => {
642 let props = self.props.clone();
643 let (buf, on_close) = self.get_on_close();
644
645 let page_writer = SerializedPageWriter::new(buf);
646 let page_writer =
647 Self::set_page_writer_encryptor(&column, encryptor_context, page_writer)?;
648
649 Some(factory(
650 column,
651 props,
652 Box::new(page_writer),
653 Box::new(on_close),
654 )?)
655 }
656 None => None,
657 })
658 }
659
660 pub fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>> {
664 self.next_column_with_factory(|descr, props, page_writer, on_close| {
665 let column_writer = get_column_writer(descr, props, page_writer);
666 Ok(SerializedColumnWriter::new(column_writer, Some(on_close)))
667 })
668 }
669
670 pub fn append_column<R: ChunkReader>(
685 &mut self,
686 reader: &R,
687 close: ColumnCloseResult,
688 ) -> Result<()> {
689 let metadata = &close.metadata;
692 let src_offset = metadata
693 .dictionary_page_offset()
694 .unwrap_or_else(|| metadata.data_page_offset());
695 let read = reader.get_read(src_offset as _)?;
696 self.append_column_from_read(read, close)
697 }
698
699 pub(crate) fn append_column_from_read<R: Read>(
709 &mut self,
710 read: R,
711 mut close: ColumnCloseResult,
712 ) -> Result<()> {
713 self.assert_previous_writer_closed()?;
714 let desc = self
715 .next_column_desc()
716 .ok_or_else(|| general_err!("exhausted columns in SerializedRowGroupWriter"))?;
717
718 let metadata = close.metadata;
719
720 if metadata.column_descr() != desc.as_ref() {
721 return Err(general_err!(
722 "column descriptor mismatch, expected {:?} got {:?}",
723 desc,
724 metadata.column_descr()
725 ));
726 }
727
728 let src_dictionary_offset = metadata.dictionary_page_offset();
729 let src_data_offset = metadata.data_page_offset();
730 let src_offset = src_dictionary_offset.unwrap_or(src_data_offset);
731 let src_length = metadata.compressed_size();
732
733 let write_offset = self.buf.bytes_written();
734 let mut read = read.take(src_length as _);
735 let write_length = std::io::copy(&mut read, &mut self.buf)?;
736
737 if src_length as u64 != write_length {
738 return Err(general_err!(
739 "Failed to splice column data, expected {read_length} got {write_length}"
740 ));
741 }
742
743 let map_offset = |x| x - src_offset + write_offset as i64;
744 let mut builder = ColumnChunkMetaData::builder(metadata.column_descr_ptr())
745 .set_compression_codec(metadata.compression_codec())
746 .set_encodings_mask(*metadata.encodings_mask())
747 .set_total_compressed_size(metadata.compressed_size())
748 .set_total_uncompressed_size(metadata.uncompressed_size())
749 .set_num_values(metadata.num_values())
750 .set_data_page_offset(map_offset(src_data_offset))
751 .set_dictionary_page_offset(src_dictionary_offset.map(map_offset))
752 .set_unencoded_byte_array_data_bytes(metadata.unencoded_byte_array_data_bytes());
753
754 if let Some(rep_hist) = metadata.repetition_level_histogram() {
755 builder = builder.set_repetition_level_histogram(Some(rep_hist.clone()))
756 }
757 if let Some(def_hist) = metadata.definition_level_histogram() {
758 builder = builder.set_definition_level_histogram(Some(def_hist.clone()))
759 }
760 if let Some(statistics) = metadata.statistics() {
761 builder = builder.set_statistics(statistics.clone())
762 }
763 if let Some(geo_statistics) = metadata.geo_statistics() {
764 builder = builder.set_geo_statistics(Box::new(geo_statistics.clone()))
765 }
766 if let Some(page_encoding_stats) = metadata.page_encoding_stats() {
767 builder = builder.set_page_encoding_stats(page_encoding_stats.clone())
768 }
769 builder = self.set_column_crypto_metadata(builder, &metadata);
770 close.metadata = builder.build()?;
771
772 if let Some(offsets) = close.offset_index.as_mut() {
773 for location in &mut offsets.page_locations {
774 location.offset = map_offset(location.offset)
775 }
776 }
777
778 let (_, on_close) = self.get_on_close();
779 on_close(close)
780 }
781
782 pub fn close(mut self) -> Result<RowGroupMetaDataPtr> {
784 if self.row_group_metadata.is_none() {
785 self.assert_previous_writer_closed()?;
786
787 let column_chunks = std::mem::take(&mut self.column_chunks);
788 let row_group_metadata = RowGroupMetaData::builder(self.descr.clone())
789 .set_column_metadata(column_chunks)
790 .set_total_byte_size(self.total_uncompressed_bytes)
791 .set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
792 .set_sorting_columns(self.props.sorting_columns().cloned())
793 .set_ordinal(self.row_group_index)
794 .set_file_offset(self.file_offset)
795 .build()?;
796
797 self.row_group_metadata = Some(Arc::new(row_group_metadata.clone()));
798
799 if let Some(on_close) = self.on_close.take() {
800 on_close(
801 self.buf,
802 row_group_metadata,
803 self.bloom_filters,
804 self.column_indexes,
805 self.offset_indexes,
806 )?
807 }
808 }
809
810 let metadata = self.row_group_metadata.as_ref().unwrap().clone();
811 Ok(metadata)
812 }
813
814 #[cfg(feature = "encryption")]
816 fn set_column_crypto_metadata(
817 &self,
818 builder: ColumnChunkMetaDataBuilder,
819 metadata: &ColumnChunkMetaData,
820 ) -> ColumnChunkMetaDataBuilder {
821 if let Some(file_encryptor) = self.file_encryptor.as_ref() {
822 builder.set_column_crypto_metadata(get_column_crypto_metadata(
823 file_encryptor.properties(),
824 &metadata.column_descr_ptr(),
825 ))
826 } else {
827 builder
828 }
829 }
830
831 #[cfg(feature = "encryption")]
833 fn get_page_encryptor_context(&self) -> PageEncryptorContext {
834 PageEncryptorContext {
835 file_encryptor: self.file_encryptor.clone(),
836 row_group_index: self.row_group_index as usize,
837 column_index: self.column_index,
838 }
839 }
840
841 #[cfg(feature = "encryption")]
843 fn set_page_writer_encryptor<'b>(
844 column: &ColumnDescPtr,
845 context: PageEncryptorContext,
846 page_writer: SerializedPageWriter<'b, W>,
847 ) -> Result<SerializedPageWriter<'b, W>> {
848 let page_encryptor = PageEncryptor::create_if_column_encrypted(
849 &context.file_encryptor,
850 context.row_group_index,
851 context.column_index,
852 &column.path().string(),
853 )?;
854
855 Ok(page_writer.with_page_encryptor(page_encryptor))
856 }
857
858 #[cfg(not(feature = "encryption"))]
860 fn set_column_crypto_metadata(
861 &self,
862 builder: ColumnChunkMetaDataBuilder,
863 _metadata: &ColumnChunkMetaData,
864 ) -> ColumnChunkMetaDataBuilder {
865 builder
866 }
867
868 #[cfg(not(feature = "encryption"))]
869 fn get_page_encryptor_context(&self) -> PageEncryptorContext {
870 PageEncryptorContext {}
871 }
872
873 #[cfg(not(feature = "encryption"))]
875 fn set_page_writer_encryptor<'b>(
876 _column: &ColumnDescPtr,
877 _context: PageEncryptorContext,
878 page_writer: SerializedPageWriter<'b, W>,
879 ) -> Result<SerializedPageWriter<'b, W>> {
880 Ok(page_writer)
881 }
882
883 #[inline]
884 fn assert_previous_writer_closed(&self) -> Result<()> {
885 if self.column_index != self.column_chunks.len() {
886 Err(general_err!("Previous column writer was not closed"))
887 } else {
888 Ok(())
889 }
890 }
891}
892
/// Values needed to create a page encryptor for one column chunk.
#[cfg(feature = "encryption")]
struct PageEncryptorContext {
    /// File-level encryptor, if encryption is configured.
    file_encryptor: Option<Arc<FileEncryptor>>,
    /// Ordinal of the row group being written.
    row_group_index: usize,
    /// Index of the column within the row group.
    column_index: usize,
}
900
/// Empty stand-in used when the `encryption` feature is disabled.
#[cfg(not(feature = "encryption"))]
struct PageEncryptorContext {}
903
/// A column writer paired with the callback to run when it is closed.
pub struct SerializedColumnWriter<'a> {
    /// The wrapped column writer.
    inner: ColumnWriter<'a>,
    /// Callback that receives the close result; consumed by `close`.
    on_close: Option<OnCloseColumnChunk<'a>>,
}
909
910impl<'a> SerializedColumnWriter<'a> {
911 pub fn new(inner: ColumnWriter<'a>, on_close: Option<OnCloseColumnChunk<'a>>) -> Self {
914 Self { inner, on_close }
915 }
916
917 pub fn untyped(&mut self) -> &mut ColumnWriter<'a> {
919 &mut self.inner
920 }
921
922 pub fn typed<T: DataType>(&mut self) -> &mut ColumnWriterImpl<'a, T> {
924 get_typed_column_writer_mut(&mut self.inner)
925 }
926
927 pub fn close(mut self) -> Result<()> {
929 let r = self.inner.close()?;
930 if let Some(on_close) = self.on_close.take() {
931 on_close(r)?
932 }
933
934 Ok(())
935 }
936}
937
/// Page writer that serializes page headers and page data into a [`TrackedWrite`].
pub struct SerializedPageWriter<'a, W: Write> {
    /// Destination of the serialized pages.
    sink: &'a mut TrackedWrite<W>,
    /// Encryptor applied to pages and page headers, if the column is encrypted.
    #[cfg(feature = "encryption")]
    page_encryptor: Option<PageEncryptor>,
}
947
948impl<'a, W: Write> SerializedPageWriter<'a, W> {
949 pub fn new(sink: &'a mut TrackedWrite<W>) -> Self {
951 Self {
952 sink,
953 #[cfg(feature = "encryption")]
954 page_encryptor: None,
955 }
956 }
957
958 #[inline]
961 fn serialize_page_header(&mut self, header: PageHeader) -> Result<usize> {
962 let start_pos = self.sink.bytes_written();
963 match self.page_encryptor_and_sink_mut() {
964 Some((page_encryptor, sink)) => {
965 page_encryptor.encrypt_page_header(&header, sink)?;
966 }
967 None => {
968 let mut protocol = ThriftCompactOutputProtocol::new(&mut self.sink);
969 header.write_thrift(&mut protocol)?;
970 }
971 }
972 Ok(self.sink.bytes_written() - start_pos)
973 }
974}
975
#[cfg(feature = "encryption")]
impl<'a, W: Write> SerializedPageWriter<'a, W> {
    /// Sets the page encryptor used by this writer.
    fn with_page_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
        self.page_encryptor = page_encryptor;
        self
    }

    /// Returns the page encryptor, if one is set.
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        Option::as_mut(&mut self.page_encryptor)
    }

    /// Returns the page encryptor together with the sink, if an encryptor is
    /// set; borrowing both at once lets the encryptor write into the sink.
    fn page_encryptor_and_sink_mut(
        &mut self,
    ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
        match self.page_encryptor.as_mut() {
            Some(encryptor) => Some((encryptor, &mut self.sink)),
            None => None,
        }
    }
}
994
995#[cfg(not(feature = "encryption"))]
996impl<'a, W: Write> SerializedPageWriter<'a, W> {
997 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
998 None
999 }
1000
1001 fn page_encryptor_and_sink_mut(
1002 &mut self,
1003 ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
1004 None
1005 }
1006}
1007
1008impl<W: Write + Send> PageWriter for SerializedPageWriter<'_, W> {
1009 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
1010 let page = match self.page_encryptor_mut() {
1011 Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
1012 None => page,
1013 };
1014
1015 let page_type = page.page_type();
1016 let start_pos = self.sink.bytes_written() as u64;
1017
1018 let page_header = page.to_thrift_header()?;
1019 let header_size = self.serialize_page_header(page_header)?;
1020
1021 self.sink.write_all(page.data())?;
1022
1023 let mut spec = PageWriteSpec::new();
1024 spec.page_type = page_type;
1025 spec.uncompressed_size = page.uncompressed_size() + header_size;
1026 spec.compressed_size = page.compressed_size() + header_size;
1027 spec.offset = start_pos;
1028 spec.bytes_written = self.sink.bytes_written() as u64 - start_pos;
1029 spec.num_values = page.num_values();
1030
1031 if let Some(page_encryptor) = self.page_encryptor_mut() {
1032 if page.compressed_page().is_data_page() {
1033 page_encryptor.increment_page();
1034 }
1035 }
1036 Ok(spec)
1037 }
1038
1039 fn close(&mut self) -> Result<()> {
1040 self.sink.flush()?;
1041 Ok(())
1042 }
1043}
1044
/// Returns the file magic to write: `PARQUET_MAGIC_ENCR_FOOTER` when encryption
/// properties are given and request an encrypted footer, `PARQUET_MAGIC` otherwise.
#[cfg(feature = "encryption")]
pub(crate) fn get_file_magic(
    file_encryption_properties: Option<&Arc<FileEncryptionProperties>>,
) -> &'static [u8; 4] {
    let footer_encrypted = matches!(
        file_encryption_properties,
        Some(properties) if properties.encrypt_footer()
    );
    if footer_encrypted {
        &PARQUET_MAGIC_ENCR_FOOTER
    } else {
        &PARQUET_MAGIC
    }
}
1058
1059#[cfg(not(feature = "encryption"))]
1060pub(crate) fn get_file_magic() -> &'static [u8; 4] {
1061 &PARQUET_MAGIC
1062}
1063
1064#[cfg(test)]
1065mod tests {
1066 use super::*;
1067
1068 #[cfg(feature = "arrow")]
1069 use arrow_array::RecordBatchReader;
1070 use bytes::Bytes;
1071 use std::fs::File;
1072
1073 #[cfg(feature = "arrow")]
1074 use crate::arrow::ArrowWriter;
1075 #[cfg(feature = "arrow")]
1076 use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
1077 use crate::basic::{
1078 ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, Repetition, SortOrder, Type,
1079 };
1080 use crate::column::page::{Page, PageReader};
1081 use crate::column::reader::get_typed_column_reader;
1082 use crate::compression::{Codec, CodecOptionsBuilder, create_codec};
1083 use crate::data_type::{BoolType, ByteArrayType, Int32Type};
1084 use crate::file::page_index::column_index::ColumnIndexMetaData;
1085 use crate::file::properties::EnabledStatistics;
1086 use crate::file::serialized_reader::ReadOptionsBuilder;
1087 use crate::file::statistics::{from_thrift_page_stats, page_stats_to_thrift};
1088 use crate::file::{
1089 properties::{ReaderProperties, WriterProperties, WriterVersion},
1090 reader::{FileReader, SerializedFileReader, SerializedPageReader},
1091 statistics::Statistics,
1092 };
1093 use crate::record::{Row, RowAccessor};
1094 use crate::schema::parser::parse_message_type;
1095 use crate::schema::types;
1096 use crate::schema::types::{ColumnDescriptor, ColumnPath};
1097 use crate::util::test_common::file_util::get_test_file;
1098 use crate::util::test_common::rand_gen::RandGen;
1099
/// Closing a row group before all of its columns were written must fail.
#[test]
fn test_row_group_writer_error_not_all_columns_written() {
    let file = tempfile::tempfile().unwrap();

    let col1 = types::Type::primitive_type_builder("col1", Type::INT32)
        .build()
        .unwrap();
    let schema = types::Type::group_type_builder("schema")
        .with_fields(vec![Arc::new(col1)])
        .build()
        .unwrap();

    let mut writer =
        SerializedFileWriter::new(file, Arc::new(schema), Default::default()).unwrap();

    // No column is written before the row group is closed.
    let outcome = writer.next_row_group().unwrap().close();
    assert!(outcome.is_err());
    if let Err(err) = outcome {
        assert_eq!(
            err.to_string(),
            "Parquet error: Column length mismatch: 1 != 0"
        );
    }
}
1125
/// Columns of one row group must all report the same number of rows.
#[test]
fn test_row_group_writer_num_records_mismatch() {
    // Builds a REQUIRED INT32 leaf column with the given name.
    let required_int32 = |name: &str| {
        Arc::new(
            types::Type::primitive_type_builder(name, Type::INT32)
                .with_repetition(Repetition::REQUIRED)
                .build()
                .unwrap(),
        )
    };

    let file = tempfile::tempfile().unwrap();
    let schema = Arc::new(
        types::Type::group_type_builder("schema")
            .with_fields(vec![required_int32("col1"), required_int32("col2")])
            .build()
            .unwrap(),
    );
    let mut writer = SerializedFileWriter::new(file, schema, Default::default()).unwrap();
    let mut row_group_writer = writer.next_row_group().unwrap();

    // First column: three rows.
    let mut first = row_group_writer.next_column().unwrap().unwrap();
    first
        .typed::<Int32Type>()
        .write_batch(&[1, 2, 3], None, None)
        .unwrap();
    first.close().unwrap();

    // Second column: only two rows, so closing it must be rejected.
    let mut second = row_group_writer.next_column().unwrap().unwrap();
    second
        .typed::<Int32Type>()
        .write_batch(&[1, 2], None, None)
        .unwrap();

    let message = second.close().unwrap_err().to_string();
    assert_eq!(
        message,
        "Parquet error: Incorrect number of rows, expected 3 != 2 rows"
    );
}
1171
/// A file closed without any row group reads back with zero rows.
#[test]
fn test_file_writer_empty_file() {
    let col1 = types::Type::primitive_type_builder("col1", Type::INT32)
        .build()
        .unwrap();
    let schema = Arc::new(
        types::Type::group_type_builder("schema")
            .with_fields(vec![Arc::new(col1)])
            .build()
            .unwrap(),
    );

    let file = tempfile::tempfile().unwrap();
    let handle = file.try_clone().unwrap();
    SerializedFileWriter::new(handle, schema, Default::default())
        .unwrap()
        .close()
        .unwrap();

    let reader = SerializedFileReader::new(file).unwrap();
    let row_count = reader.get_row_iter(None).unwrap().count();
    assert_eq!(row_count, 0);
}
1193
/// The column orders read back from the footer match the expected sort order
/// of each leaf column.
#[test]
fn test_file_writer_column_orders_populated() {
    let file = tempfile::tempfile().unwrap();

    // Leaf columns, in schema order.
    let col1 = types::Type::primitive_type_builder("col1", Type::INT32)
        .build()
        .unwrap();
    let col2 = types::Type::primitive_type_builder("col2", Type::FIXED_LEN_BYTE_ARRAY)
        .with_converted_type(ConvertedType::INTERVAL)
        .with_length(12)
        .build()
        .unwrap();
    let col3 = types::Type::primitive_type_builder("col3", Type::FIXED_LEN_BYTE_ARRAY)
        .with_logical_type(Some(LogicalType::Float16))
        .with_length(2)
        .build()
        .unwrap();
    let col4 = types::Type::primitive_type_builder("col4", Type::BYTE_ARRAY)
        .with_logical_type(Some(LogicalType::String))
        .build()
        .unwrap();

    let nested = types::Type::group_type_builder("nested")
        .with_repetition(Repetition::REQUIRED)
        .with_fields(vec![Arc::new(col3), Arc::new(col4)])
        .build()
        .unwrap();
    let schema = Arc::new(
        types::Type::group_type_builder("schema")
            .with_fields(vec![Arc::new(col1), Arc::new(col2), Arc::new(nested)])
            .build()
            .unwrap(),
    );

    let handle = file.try_clone().unwrap();
    let writer = SerializedFileWriter::new(handle, schema, Default::default()).unwrap();
    writer.close().unwrap();

    let reader = SerializedFileReader::new(file).unwrap();

    let expected = vec![
        // col1: INT32
        ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
        // col2: INTERVAL
        ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED),
        // col3: Float16
        ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
        // col4: String
        ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED),
    ];

    let column_orders = reader.metadata().file_metadata().column_orders();
    assert!(column_orders.is_some());
    let actual = column_orders.unwrap();
    assert_eq!(*actual, expected);
}
1265
/// A key/value pair supplied through the writer properties must be present in
/// the file metadata when the file is read back.
#[test]
fn test_file_writer_with_metadata() {
    let column = types::Type::primitive_type_builder("col1", Type::INT32)
        .build()
        .unwrap();
    let schema = Arc::new(
        types::Type::group_type_builder("schema")
            .with_fields(vec![Arc::new(column)])
            .build()
            .unwrap(),
    );

    let kv = KeyValue::new("key".to_string(), "value".to_string());
    let props = Arc::new(
        WriterProperties::builder()
            .set_key_value_metadata(Some(vec![kv]))
            .build(),
    );

    let file = tempfile::tempfile().unwrap();
    let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
    writer.close().unwrap();

    let reader = SerializedFileReader::new(file).unwrap();
    let stored = reader
        .metadata()
        .file_metadata()
        .key_value_metadata()
        .to_owned()
        .unwrap();
    assert_eq!(stored.len(), 1);
}
1303
/// Writes an empty file with `WriterVersion::PARQUET_2_0` and one key/value
/// pair, then checks both the key/value metadata and that the single schema
/// field read back equals the field that was written.
#[test]
fn test_file_writer_v2_with_metadata() {
    let logical = Some(LogicalType::integer(8, false));
    // The converted type is derived from the same logical type.
    let field = Arc::new(
        types::Type::primitive_type_builder("col1", Type::INT32)
            .with_logical_type(logical.clone())
            .with_converted_type(logical.into())
            .build()
            .unwrap(),
    );
    let schema = Arc::new(
        types::Type::group_type_builder("schema")
            .with_fields(vec![Arc::clone(&field)])
            .build()
            .unwrap(),
    );

    let kv = KeyValue::new("key".to_string(), "value".to_string());
    let props = Arc::new(
        WriterProperties::builder()
            .set_key_value_metadata(Some(vec![kv]))
            .set_writer_version(WriterVersion::PARQUET_2_0)
            .build(),
    );

    let file = tempfile::tempfile().unwrap();
    let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
    writer.close().unwrap();

    let reader = SerializedFileReader::new(file).unwrap();
    let file_metadata = reader.metadata().file_metadata();

    let stored = file_metadata.key_value_metadata().to_owned().unwrap();
    assert_eq!(stored.len(), 1);

    let fields = file_metadata.schema().get_fields();
    assert_eq!(fields.len(), 1);
    assert_eq!(fields[0], field);
}
1351
/// Sorting columns configured on the writer properties must be reported by the
/// metadata of the first row group when the file is read back.
#[test]
fn test_file_writer_with_sorting_columns_metadata() {
    let int32_column = |name: &str| {
        Arc::new(
            types::Type::primitive_type_builder(name, Type::INT32)
                .build()
                .unwrap(),
        )
    };
    let schema = Arc::new(
        types::Type::group_type_builder("schema")
            .with_fields(vec![int32_column("col1"), int32_column("col2")])
            .build()
            .unwrap(),
    );

    let expected_result = Some(vec![SortingColumn {
        column_idx: 0,
        descending: false,
        nulls_first: true,
    }]);
    let kv = KeyValue::new("key".to_string(), "value".to_string());
    let props = Arc::new(
        WriterProperties::builder()
            .set_key_value_metadata(Some(vec![kv]))
            .set_sorting_columns(expected_result.clone())
            .build(),
    );

    let file = tempfile::tempfile().unwrap();
    let mut writer =
        SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
    let mut row_group_writer = writer.next_row_group().expect("get row group writer");

    // Both columns are opened and closed without writing any values.
    for _ in 0..2 {
        let col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer.close().unwrap();
    }

    row_group_writer.close().unwrap();
    writer.close().unwrap();

    let reader = SerializedFileReader::new(file).unwrap();
    let first_group_sorting = reader.metadata().row_group(0).sorting_columns();
    assert_eq!(expected_result.as_ref(), first_group_sorting);
}
1410
/// File round trip with no row groups at all.
#[test]
fn test_file_writer_empty_row_groups() {
    let no_row_groups = Vec::new();
    test_file_roundtrip(tempfile::tempfile().unwrap(), no_row_groups);
}
1416
/// File round trip with a single row group of five values.
#[test]
fn test_file_writer_single_row_group() {
    let row_groups = vec![vec![1, 2, 3, 4, 5]];
    test_file_roundtrip(tempfile::tempfile().unwrap(), row_groups);
}
1422
/// File round trip with four row groups of differing lengths.
#[test]
fn test_file_writer_multiple_row_groups() {
    let row_groups = vec![
        vec![1, 2, 3, 4, 5],
        vec![1, 2, 3],
        vec![1],
        vec![1, 2, 3, 4, 5, 6],
    ];
    test_file_roundtrip(tempfile::tempfile().unwrap(), row_groups);
}
1436
/// File round trip with larger row groups of repeated values, ending with an
/// empty row group.
#[test]
fn test_file_writer_multiple_large_row_groups() {
    let row_groups = vec![
        vec![123; 1024],
        vec![124; 1000],
        vec![125; 15],
        Vec::new(),
    ];
    test_file_roundtrip(tempfile::tempfile().unwrap(), row_groups);
}
1445
/// Round-trips a v1 and a v2 data page through the page writer and reader,
/// once with SNAPPY compression and once uncompressed.
#[test]
fn test_page_writer_data_pages() {
    // Both pages carry the same statistics.
    let page_stats = || Some(Statistics::int32(Some(1), Some(3), None, Some(7), true));

    let pages = [
        Page::DataPage {
            buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
            num_values: 10,
            encoding: Encoding::DELTA_BINARY_PACKED,
            def_level_encoding: Encoding::RLE,
            rep_level_encoding: Encoding::RLE,
            statistics: page_stats(),
        },
        Page::DataPageV2 {
            buf: Bytes::from(vec![4; 128]),
            num_values: 10,
            encoding: Encoding::DELTA_BINARY_PACKED,
            num_nulls: 2,
            num_rows: 12,
            def_levels_byte_len: 24,
            rep_levels_byte_len: 32,
            is_compressed: false,
            statistics: page_stats(),
        },
    ];

    for codec in [Compression::SNAPPY, Compression::UNCOMPRESSED] {
        test_page_roundtrip(&pages, codec, Type::INT32);
    }
}
1473
/// Round-trips a dictionary page followed by a v1 and a v2 data page, once
/// with SNAPPY compression and once uncompressed.
#[test]
fn test_page_writer_dict_pages() {
    let dictionary = Page::DictionaryPage {
        buf: Bytes::from(vec![1, 2, 3, 4, 5]),
        num_values: 5,
        encoding: Encoding::RLE_DICTIONARY,
        is_sorted: false,
    };
    let data_v1 = Page::DataPage {
        buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
        num_values: 10,
        encoding: Encoding::DELTA_BINARY_PACKED,
        def_level_encoding: Encoding::RLE,
        rep_level_encoding: Encoding::RLE,
        statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
    };
    // The v2 page is written without statistics.
    let data_v2 = Page::DataPageV2 {
        buf: Bytes::from(vec![4; 128]),
        num_values: 10,
        encoding: Encoding::DELTA_BINARY_PACKED,
        num_nulls: 2,
        num_rows: 12,
        def_levels_byte_len: 24,
        rep_levels_byte_len: 32,
        is_compressed: false,
        statistics: None,
    };
    let pages = [dictionary, data_v1, data_v2];

    for codec in [Compression::SNAPPY, Compression::UNCOMPRESSED] {
        test_page_roundtrip(&pages, codec, Type::INT32);
    }
}
1507
1508 fn test_page_roundtrip(pages: &[Page], codec: Compression, physical_type: Type) {
1512 let mut compressed_pages = vec![];
1513 let mut total_num_values = 0i64;
1514 let codec_options = CodecOptionsBuilder::default()
1515 .set_backward_compatible_lz4(false)
1516 .build();
1517 let mut compressor = create_codec(codec, &codec_options).unwrap();
1518
1519 for page in pages {
1520 let uncompressed_len = page.buffer().len();
1521
1522 let compressed_page = match *page {
1523 Page::DataPage {
1524 ref buf,
1525 num_values,
1526 encoding,
1527 def_level_encoding,
1528 rep_level_encoding,
1529 ref statistics,
1530 } => {
1531 total_num_values += num_values as i64;
1532 let output_buf = compress_helper(compressor.as_mut(), buf);
1533
1534 Page::DataPage {
1535 buf: Bytes::from(output_buf),
1536 num_values,
1537 encoding,
1538 def_level_encoding,
1539 rep_level_encoding,
1540 statistics: from_thrift_page_stats(
1541 physical_type,
1542 page_stats_to_thrift(statistics.as_ref()),
1543 )
1544 .unwrap(),
1545 }
1546 }
1547 Page::DataPageV2 {
1548 ref buf,
1549 num_values,
1550 encoding,
1551 num_nulls,
1552 num_rows,
1553 def_levels_byte_len,
1554 rep_levels_byte_len,
1555 ref statistics,
1556 ..
1557 } => {
1558 total_num_values += num_values as i64;
1559 let offset = (def_levels_byte_len + rep_levels_byte_len) as usize;
1560 let cmp_buf = compress_helper(compressor.as_mut(), &buf[offset..]);
1561 let mut output_buf = Vec::from(&buf[..offset]);
1562 output_buf.extend_from_slice(&cmp_buf[..]);
1563
1564 Page::DataPageV2 {
1565 buf: Bytes::from(output_buf),
1566 num_values,
1567 encoding,
1568 num_nulls,
1569 num_rows,
1570 def_levels_byte_len,
1571 rep_levels_byte_len,
1572 is_compressed: compressor.is_some(),
1573 statistics: from_thrift_page_stats(
1574 physical_type,
1575 page_stats_to_thrift(statistics.as_ref()),
1576 )
1577 .unwrap(),
1578 }
1579 }
1580 Page::DictionaryPage {
1581 ref buf,
1582 num_values,
1583 encoding,
1584 is_sorted,
1585 } => {
1586 let output_buf = compress_helper(compressor.as_mut(), buf);
1587
1588 Page::DictionaryPage {
1589 buf: Bytes::from(output_buf),
1590 num_values,
1591 encoding,
1592 is_sorted,
1593 }
1594 }
1595 };
1596
1597 let compressed_page = CompressedPage::new(compressed_page, uncompressed_len);
1598 compressed_pages.push(compressed_page);
1599 }
1600
1601 let mut buffer: Vec<u8> = vec![];
1602 let mut result_pages: Vec<Page> = vec![];
1603 {
1604 let mut writer = TrackedWrite::new(&mut buffer);
1605 let mut page_writer = SerializedPageWriter::new(&mut writer);
1606
1607 for page in compressed_pages {
1608 page_writer.write_page(page).unwrap();
1609 }
1610 page_writer.close().unwrap();
1611 }
1612 {
1613 let reader = bytes::Bytes::from(buffer);
1614
1615 let t = types::Type::primitive_type_builder("t", physical_type)
1616 .build()
1617 .unwrap();
1618
1619 let desc = ColumnDescriptor::new(Arc::new(t), 0, 0, ColumnPath::new(vec![]));
1620 let meta = ColumnChunkMetaData::builder(Arc::new(desc))
1621 .set_compression_codec(codec.into())
1622 .set_total_compressed_size(reader.len() as i64)
1623 .set_num_values(total_num_values)
1624 .build()
1625 .unwrap();
1626
1627 let props = ReaderProperties::builder()
1628 .set_backward_compatible_lz4(false)
1629 .set_read_page_statistics(true)
1630 .build();
1631 let mut page_reader = SerializedPageReader::new_with_properties(
1632 Arc::new(reader),
1633 &meta,
1634 total_num_values as usize,
1635 None,
1636 Arc::new(props),
1637 )
1638 .unwrap();
1639
1640 while let Some(page) = page_reader.get_next_page().unwrap() {
1641 result_pages.push(page);
1642 }
1643 }
1644
1645 assert_eq!(result_pages.len(), pages.len());
1646 for i in 0..result_pages.len() {
1647 assert_page(&result_pages[i], &pages[i]);
1648 }
1649 }
1650
1651 fn compress_helper(compressor: Option<&mut Box<dyn Codec>>, data: &[u8]) -> Vec<u8> {
1653 let mut output_buf = vec![];
1654 if let Some(cmpr) = compressor {
1655 cmpr.compress(data, &mut output_buf).unwrap();
1656 } else {
1657 output_buf.extend_from_slice(data);
1658 }
1659 output_buf
1660 }
1661
1662 fn assert_page(left: &Page, right: &Page) {
1664 assert_eq!(left.page_type(), right.page_type());
1665 assert_eq!(&left.buffer(), &right.buffer());
1666 assert_eq!(left.num_values(), right.num_values());
1667 assert_eq!(left.encoding(), right.encoding());
1668 assert_eq!(
1669 page_stats_to_thrift(left.statistics()),
1670 page_stats_to_thrift(right.statistics())
1671 );
1672 }
1673
1674 fn test_roundtrip_i32<W, R>(
1676 file: W,
1677 data: Vec<Vec<i32>>,
1678 compression: Compression,
1679 ) -> ParquetMetaData
1680 where
1681 W: Write + Send,
1682 R: ChunkReader + From<W> + 'static,
1683 {
1684 test_roundtrip::<W, R, Int32Type, _>(file, data, |r| r.get_int(0).unwrap(), compression)
1685 }
1686
1687 fn test_roundtrip<W, R, D, F>(
1690 mut file: W,
1691 data: Vec<Vec<D::T>>,
1692 value: F,
1693 compression: Compression,
1694 ) -> ParquetMetaData
1695 where
1696 W: Write + Send,
1697 R: ChunkReader + From<W> + 'static,
1698 D: DataType,
1699 F: Fn(Row) -> D::T,
1700 {
1701 let schema = Arc::new(
1702 types::Type::group_type_builder("schema")
1703 .with_fields(vec![Arc::new(
1704 types::Type::primitive_type_builder("col1", D::get_physical_type())
1705 .with_repetition(Repetition::REQUIRED)
1706 .build()
1707 .unwrap(),
1708 )])
1709 .build()
1710 .unwrap(),
1711 );
1712 let props = Arc::new(
1713 WriterProperties::builder()
1714 .set_compression(compression)
1715 .build(),
1716 );
1717 let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
1718 let mut rows: i64 = 0;
1719
1720 for (idx, subset) in data.iter().enumerate() {
1721 let row_group_file_offset = file_writer.buf.bytes_written();
1722 let mut row_group_writer = file_writer.next_row_group().unwrap();
1723 if let Some(mut writer) = row_group_writer.next_column().unwrap() {
1724 rows += writer
1725 .typed::<D>()
1726 .write_batch(&subset[..], None, None)
1727 .unwrap() as i64;
1728 writer.close().unwrap();
1729 }
1730 let last_group = row_group_writer.close().unwrap();
1731 let flushed = file_writer.flushed_row_groups();
1732 assert_eq!(flushed.len(), idx + 1);
1733 assert_eq!(Some(idx as i16), last_group.ordinal());
1734 assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset());
1735 assert_eq!(&flushed[idx], last_group.as_ref());
1736 }
1737 let file_metadata = file_writer.close().unwrap();
1738
1739 let reader = SerializedFileReader::new(R::from(file)).unwrap();
1740 assert_eq!(reader.num_row_groups(), data.len());
1741 assert_eq!(
1742 reader.metadata().file_metadata().num_rows(),
1743 rows,
1744 "row count in metadata not equal to number of rows written"
1745 );
1746 for (i, item) in data.iter().enumerate().take(reader.num_row_groups()) {
1747 let row_group_reader = reader.get_row_group(i).unwrap();
1748 let iter = row_group_reader.get_row_iter(None).unwrap();
1749 let res: Vec<_> = iter.map(|row| row.unwrap()).map(&value).collect();
1750 let row_group_size = row_group_reader.metadata().total_byte_size();
1751 let uncompressed_size: i64 = row_group_reader
1752 .metadata()
1753 .columns()
1754 .iter()
1755 .map(|v| v.uncompressed_size())
1756 .sum();
1757 assert_eq!(row_group_size, uncompressed_size);
1758 assert_eq!(res, *item);
1759 }
1760 file_metadata
1761 }
1762
1763 fn test_file_roundtrip(file: File, data: Vec<Vec<i32>>) -> ParquetMetaData {
1766 test_roundtrip_i32::<File, File>(file, data, Compression::UNCOMPRESSED)
1767 }
1768
/// In-memory round trip with no row groups, uncompressed.
#[test]
fn test_bytes_writer_empty_row_groups() {
    let no_row_groups = Vec::new();
    test_bytes_roundtrip(no_row_groups, Compression::UNCOMPRESSED);
}
1773
/// In-memory round trip with one row group, uncompressed.
#[test]
fn test_bytes_writer_single_row_group() {
    let row_groups = vec![vec![1, 2, 3, 4, 5]];
    test_bytes_roundtrip(row_groups, Compression::UNCOMPRESSED);
}
1778
/// In-memory round trip with four row groups, uncompressed.
#[test]
fn test_bytes_writer_multiple_row_groups() {
    let row_groups = vec![
        vec![1, 2, 3, 4, 5],
        vec![1, 2, 3],
        vec![1],
        vec![1, 2, 3, 4, 5, 6],
    ];
    test_bytes_roundtrip(row_groups, Compression::UNCOMPRESSED);
}
1791
/// In-memory round trip with one row group, SNAPPY-compressed.
#[test]
fn test_bytes_writer_single_row_group_compressed() {
    let row_groups = vec![vec![1, 2, 3, 4, 5]];
    test_bytes_roundtrip(row_groups, Compression::SNAPPY);
}
1796
/// In-memory round trip with four row groups, SNAPPY-compressed.
#[test]
fn test_bytes_writer_multiple_row_groups_compressed() {
    let row_groups = vec![
        vec![1, 2, 3, 4, 5],
        vec![1, 2, 3],
        vec![1],
        vec![1, 2, 3, 4, 5, 6],
    ];
    test_bytes_roundtrip(row_groups, Compression::SNAPPY);
}
1809
1810 fn test_bytes_roundtrip(data: Vec<Vec<i32>>, compression: Compression) {
1811 test_roundtrip_i32::<Vec<u8>, Bytes>(Vec::with_capacity(1024), data, compression);
1812 }
1813
/// Uncompressed in-memory round trip of 2049 alternating boolean values in a
/// single row group.
#[test]
fn test_boolean_roundtrip() {
    let alternating: Vec<bool> = (0..2049).map(|idx| idx % 2 == 0).collect();
    let read_bool = |row: Row| row.get_bool(0).unwrap();
    test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
        Vec::with_capacity(1024),
        vec![alternating],
        read_bool,
        Compression::UNCOMPRESSED,
    );
}
1824
/// SNAPPY-compressed in-memory round trip of 2049 alternating boolean values
/// in a single row group.
#[test]
fn test_boolean_compressed_roundtrip() {
    let alternating: Vec<bool> = (0..2049).map(|idx| idx % 2 == 0).collect();
    let read_bool = |row: Row| row.get_bool(0).unwrap();
    test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
        Vec::with_capacity(1024),
        vec![alternating],
        read_bool,
        Compression::SNAPPY,
    );
}
1835
/// After a file round trip, every column chunk in the writer's metadata must
/// carry offset and length for both its column index and its offset index.
#[test]
fn test_column_offset_index_file() {
    let row_groups = vec![vec![1, 2, 3, 4, 5]];
    let file_metadata = test_file_roundtrip(tempfile::tempfile().unwrap(), row_groups);

    for row_group in file_metadata.row_groups() {
        for column_chunk in row_group.columns() {
            assert!(column_chunk.column_index_offset().is_some());
            assert!(column_chunk.column_index_length().is_some());
            assert!(column_chunk.offset_index_offset().is_some());
            assert!(column_chunk.offset_index_length().is_some());
        }
    }
}
1849
1850 fn test_kv_metadata(initial_kv: Option<Vec<KeyValue>>, final_kv: Option<Vec<KeyValue>>) {
1851 let schema = Arc::new(
1852 types::Type::group_type_builder("schema")
1853 .with_fields(vec![Arc::new(
1854 types::Type::primitive_type_builder("col1", Type::INT32)
1855 .with_repetition(Repetition::REQUIRED)
1856 .build()
1857 .unwrap(),
1858 )])
1859 .build()
1860 .unwrap(),
1861 );
1862 let mut out = Vec::with_capacity(1024);
1863 let props = Arc::new(
1864 WriterProperties::builder()
1865 .set_key_value_metadata(initial_kv.clone())
1866 .build(),
1867 );
1868 let mut writer = SerializedFileWriter::new(&mut out, schema, props).unwrap();
1869 let mut row_group_writer = writer.next_row_group().unwrap();
1870 let column = row_group_writer.next_column().unwrap().unwrap();
1871 column.close().unwrap();
1872 row_group_writer.close().unwrap();
1873 if let Some(kvs) = &final_kv {
1874 for kv in kvs {
1875 writer.append_key_value_metadata(kv.clone())
1876 }
1877 }
1878 writer.close().unwrap();
1879
1880 let reader = SerializedFileReader::new(Bytes::from(out)).unwrap();
1881 let metadata = reader.metadata().file_metadata();
1882 let keys = metadata.key_value_metadata();
1883
1884 match (initial_kv, final_kv) {
1885 (Some(a), Some(b)) => {
1886 let keys = keys.unwrap();
1887 assert_eq!(keys.len(), a.len() + b.len());
1888 assert_eq!(&keys[..a.len()], a.as_slice());
1889 assert_eq!(&keys[a.len()..], b.as_slice());
1890 }
1891 (Some(v), None) => assert_eq!(keys.unwrap(), &v),
1892 (None, Some(v)) if !v.is_empty() => assert_eq!(keys.unwrap(), &v),
1893 _ => assert!(keys.is_none()),
1894 }
1895 }
1896
/// Runs `test_kv_metadata` over combinations of absent, empty and non-empty
/// initial and appended key/value metadata.
#[test]
fn test_append_metadata() {
    let kv1 = KeyValue::new("cupcakes".to_string(), "awesome".to_string());
    let kv2 = KeyValue::new("bingo".to_string(), "bongo".to_string());

    // (initial metadata, appended metadata)
    let cases: [(Option<Vec<KeyValue>>, Option<Vec<KeyValue>>); 8] = [
        (None, None),
        (Some(vec![kv1.clone()]), None),
        (None, Some(vec![kv2.clone()])),
        (Some(vec![kv1.clone()]), Some(vec![kv2.clone()])),
        (Some(vec![]), Some(vec![kv2.clone()])),
        (Some(vec![]), Some(vec![])),
        (Some(vec![kv1.clone()]), Some(vec![])),
        (None, Some(vec![])),
    ];

    for (initial_kv, appended_kv) in cases {
        test_kv_metadata(initial_kv, appended_kv);
    }
}
1911
/// Writes the values `[1, 2, 3]` to three INT32 columns (decimal, signed
/// integer, unsigned integer) and checks which thrift statistics fields are
/// populated: the first two columns are expected to fill `min`/`max` as well
/// as `min_value`/`max_value`, the unsigned column only `min_value`/`max_value`.
#[test]
fn test_backwards_compatible_statistics() {
    let message_type = "
        message test_schema {
            REQUIRED INT32 decimal1 (DECIMAL(8,2));
            REQUIRED INT32 i32 (INTEGER(32,true));
            REQUIRED INT32 u32 (INTEGER(32,false));
        }
    ";

    let schema = Arc::new(parse_message_type(message_type).unwrap());
    let mut writer = SerializedFileWriter::new(vec![], schema, Default::default()).unwrap();
    let mut row_group_writer = writer.next_row_group().unwrap();

    for _ in 0..3 {
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 2, 3], None, None)
            .unwrap();
        col_writer.close().unwrap();
    }
    let metadata = row_group_writer.close().unwrap();
    writer.close().unwrap();

    let one = 1_i32.to_le_bytes();
    let three = 3_i32.to_le_bytes();

    // decimal1 and i32: all four min/max fields are expected to be set.
    for column_idx in 0..2 {
        let s = page_stats_to_thrift(metadata.column(column_idx).statistics()).unwrap();
        assert_eq!(s.min.as_deref(), Some(&one[..]));
        assert_eq!(s.max.as_deref(), Some(&three[..]));
        assert_eq!(s.min_value.as_deref(), Some(&one[..]));
        assert_eq!(s.max_value.as_deref(), Some(&three[..]));
    }

    // u32: `min`/`max` are expected to be absent.
    let s = page_stats_to_thrift(metadata.column(2).statistics()).unwrap();
    assert_eq!(s.min.as_deref(), None);
    assert_eq!(s.max.as_deref(), None);
    assert_eq!(s.min_value.as_deref(), Some(&one[..]));
    assert_eq!(s.max_value.as_deref(), Some(&three[..]));
}
1959
/// Encodes each column into its own in-memory buffer with a standalone column
/// writer, then splices the buffers into a row group with `append_column` and
/// verifies the file reads back correctly, both with and without the page
/// index being loaded.
#[test]
fn test_spliced_write() {
    let message_type = "
        message test_schema {
            REQUIRED INT32 i32 (INTEGER(32,true));
            REQUIRED INT32 u32 (INTEGER(32,false));
        }
    ";
    let schema = Arc::new(parse_message_type(message_type).unwrap());
    let props = Arc::new(WriterProperties::builder().build());

    let mut file = Vec::with_capacity(1024);
    let mut file_writer = SerializedFileWriter::new(&mut file, schema, props.clone()).unwrap();

    let columns = file_writer.descr.columns();

    // Per column: the buffer its pages are written to, and a slot that receives
    // the close result once the column writer is closed.
    let mut column_state: Vec<(_, Option<ColumnCloseResult>)> = columns
        .iter()
        .map(|_| (TrackedWrite::new(Vec::with_capacity(1024)), None))
        .collect();

    let mut column_writers = Vec::with_capacity(columns.len());
    for ((buf, out), column) in column_state.iter_mut().zip(columns) {
        let page_writer = Box::new(SerializedPageWriter::new(buf));
        let col_writer = get_column_writer(column.clone(), props.clone(), page_writer);
        column_writers.push(SerializedColumnWriter::new(
            col_writer,
            Some(Box::new(|on_close| {
                *out = Some(on_close);
                Ok(())
            })),
        ));
    }

    let column_data = [[1, 2, 3, 4], [7, 3, 7, 3]];

    for (writer, batch) in column_writers.iter_mut().zip(column_data) {
        writer
            .typed::<Int32Type>()
            .write_batch(&batch, None, None)
            .unwrap();
    }

    // Closing each writer fills the matching close-result slot in `column_state`.
    column_writers
        .into_iter()
        .for_each(|writer| writer.close().unwrap());

    let mut row_group_writer = file_writer.next_row_group().unwrap();
    for (write, close) in column_state {
        let encoded = Bytes::from(write.into_inner().unwrap());
        row_group_writer
            .append_column(&encoded, close.unwrap())
            .unwrap();
    }
    row_group_writer.close().unwrap();
    file_writer.close().unwrap();

    let file = Bytes::from(file);

    // Reads four records from each column of row group 0 and compares them
    // with the data written above.
    let test_read = |reader: SerializedFileReader<Bytes>| {
        let row_group = reader.get_row_group(0).unwrap();

        for (column_idx, expected) in column_data.iter().enumerate() {
            let mut values = Vec::with_capacity(4);
            let column_reader = row_group.get_column_reader(column_idx).unwrap();
            let mut typed_reader = get_typed_column_reader::<Int32Type>(column_reader);
            typed_reader
                .read_records(4, None, None, &mut values)
                .unwrap();
            assert_eq!(values, *expected);
        }
    };

    let plain_reader = SerializedFileReader::new(file.clone()).unwrap();
    test_read(plain_reader);

    let options = ReadOptionsBuilder::new().with_page_index().build();
    let indexed_reader = SerializedFileReader::new_with_options(file, options).unwrap();
    test_read(indexed_reader);
}
2047
/// Statistics are disabled globally but enabled at page level for column `a`,
/// and the offset index is requested to be disabled.
///
/// The assertions expect an offset index for both columns nonetheless, a
/// column index only for `a`, and an error when a new row group is requested
/// after `finish`.
#[test]
fn test_disabled_statistics() {
    let message_type = "
        message test_schema {
            REQUIRED INT32 a;
            REQUIRED INT32 b;
        }
    ";
    let schema = Arc::new(parse_message_type(message_type).unwrap());
    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
        .set_offset_index_disabled(true)
        .build();

    let mut file = Vec::with_capacity(1024);
    let mut file_writer =
        SerializedFileWriter::new(&mut file, schema, Arc::new(props)).unwrap();

    let mut row_group_writer = file_writer.next_row_group().unwrap();
    // First batch goes to column `a`, second to column `b`.
    for batch in [[1_i32, 2, 3], [4, 5, 6]] {
        let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
        column_writer
            .typed::<Int32Type>()
            .write_batch(&batch, None, None)
            .unwrap();
        column_writer.close().unwrap();
    }
    row_group_writer.close().unwrap();

    let metadata = file_writer.finish().unwrap();
    assert_eq!(metadata.num_row_groups(), 1);
    let row_group = metadata.row_group(0);
    assert_eq!(row_group.num_columns(), 2);

    let (a_chunk, b_chunk) = (row_group.column(0), row_group.column(1));
    assert!(a_chunk.offset_index_offset().is_some());
    assert!(a_chunk.column_index_offset().is_some());
    assert!(b_chunk.offset_index_offset().is_some());
    assert!(b_chunk.column_index_offset().is_none());

    let err = file_writer.next_row_group().err().unwrap().to_string();
    assert_eq!(err, "Parquet error: SerializedFileWriter already finished");

    // Release the writer's borrow of `file` before reading it back.
    drop(file_writer);

    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(Bytes::from(file), options).unwrap();

    // Indexes are nested as [row group][column].
    let offset_index = reader.metadata().offset_index().unwrap();
    assert_eq!(offset_index.len(), 1);
    assert_eq!(offset_index[0].len(), 2);

    let column_index = reader.metadata().column_index().unwrap();
    assert_eq!(column_index.len(), 1);
    assert_eq!(column_index[0].len(), 2);

    let a_idx = &column_index[0][0];
    assert!(matches!(a_idx, ColumnIndexMetaData::INT32(_)), "{a_idx:?}");
    let b_idx = &column_index[0][1];
    assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
}
2110
/// Writes an optional BYTE_ARRAY column with page-level statistics and checks
/// the size statistics (definition-level histogram and unencoded byte-array
/// size) in the writer's metadata, the re-read column chunk metadata, the
/// column index and the offset index.
#[test]
fn test_byte_array_size_statistics() {
    let message_type = "
        message test_schema {
            OPTIONAL BYTE_ARRAY a (UTF8);
        }
    ";
    let schema = Arc::new(parse_message_type(message_type).unwrap());
    let data = ByteArrayType::gen_vec(32, 7);
    // Seven entries at level 1 and three at level 0, matching `check_def_hist`.
    let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
    let unenc_size: i64 = data.iter().map(|value| value.len() as i64).sum();
    let file: File = tempfile::tempfile().unwrap();
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .build(),
    );

    let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
    let mut row_group_writer = writer.next_row_group().unwrap();

    let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
    col_writer
        .typed::<ByteArrayType>()
        .write_batch(&data, Some(&def_levels), None)
        .unwrap();
    col_writer.close().unwrap();
    row_group_writer.close().unwrap();
    let file_metadata = writer.close().unwrap();

    assert_eq!(file_metadata.num_row_groups(), 1);
    assert_eq!(file_metadata.row_group(0).num_columns(), 1);

    let check_def_hist = |def_hist: &[i64]| {
        assert_eq!(def_hist.len(), 2);
        assert_eq!(def_hist[0], 3);
        assert_eq!(def_hist[1], 7);
    };

    // Metadata returned by the writer.
    let written_chunk = file_metadata.row_group(0).column(0);
    assert!(written_chunk.repetition_level_histogram().is_none());
    assert!(written_chunk.definition_level_histogram().is_some());
    assert!(written_chunk.unencoded_byte_array_data_bytes().is_some());
    assert_eq!(
        unenc_size,
        written_chunk.unencoded_byte_array_data_bytes().unwrap()
    );
    check_def_hist(written_chunk.definition_level_histogram().unwrap().values());

    // Metadata read back from the file, with the page index loaded.
    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(file, options).unwrap();

    assert_eq!(
        reader.metadata().file_metadata().num_rows(),
        file_metadata.file_metadata().num_rows()
    );
    assert_eq!(reader.num_row_groups(), 1);
    let rowgroup = reader.get_row_group(0).unwrap();
    assert_eq!(rowgroup.num_columns(), 1);

    let read_chunk = rowgroup.metadata().column(0);
    assert!(read_chunk.definition_level_histogram().is_some());
    assert!(read_chunk.repetition_level_histogram().is_none());
    assert!(read_chunk.unencoded_byte_array_data_bytes().is_some());
    check_def_hist(read_chunk.definition_level_histogram().unwrap().values());
    assert_eq!(
        unenc_size,
        read_chunk.unencoded_byte_array_data_bytes().unwrap()
    );

    // Column index: one row group, one column, one page.
    assert!(reader.metadata().column_index().is_some());
    let column_index = reader.metadata().column_index().unwrap();
    assert_eq!(column_index.len(), 1);
    assert_eq!(column_index[0].len(), 1);
    let col_idx = match &column_index[0][0] {
        ColumnIndexMetaData::BYTE_ARRAY(index) => {
            assert_eq!(index.num_pages(), 1);
            index
        }
        _ => unreachable!(),
    };

    assert!(col_idx.repetition_level_histogram(0).is_none());
    assert!(col_idx.definition_level_histogram(0).is_some());
    check_def_hist(col_idx.definition_level_histogram(0).unwrap());

    // Offset index: the single page must report the full unencoded size.
    assert!(reader.metadata().offset_index().is_some());
    let offset_index = reader.metadata().offset_index().unwrap();
    assert_eq!(offset_index.len(), 1);
    assert_eq!(offset_index[0].len(), 1);
    let page_sizes_opt = &offset_index[0][0].unencoded_byte_array_data_bytes;
    assert!(page_sizes_opt.is_some());
    let page_sizes = page_sizes_opt.as_ref().unwrap();
    assert_eq!(page_sizes.len(), 1);
    assert_eq!(page_sizes[0], unenc_size);
}
2211
/// Requests 0x8001 row groups from one writer: the first 0x8000 requests must
/// succeed and the last one must fail with the "more than 32767 row groups"
/// error. The writer must still close cleanly afterwards.
#[test]
fn test_too_many_rowgroups() {
    let message_type = "
        message test_schema {
            REQUIRED BYTE_ARRAY a (UTF8);
        }
    ";
    let schema = Arc::new(parse_message_type(message_type).unwrap());
    let file: File = tempfile::tempfile().unwrap();
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_max_row_group_row_count(Some(1))
            .build(),
    );
    let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();

    for i in 0..0x8001 {
        match writer.next_row_group() {
            Err(e) => {
                // Only the very last request is allowed to fail.
                assert_eq!(i, 0x8000);
                assert_eq!(
                    e.to_string(),
                    "Parquet error: Parquet does not support more than 32767 row groups per file (currently: 32768)"
                );
            }
            Ok(mut row_group_writer) => {
                assert_ne!(i, 0x8000);
                // Each row group holds one column that is closed without values.
                let col_writer = row_group_writer.next_column().unwrap().unwrap();
                col_writer.close().unwrap();
                row_group_writer.close().unwrap();
            }
        }
    }
    writer.close().unwrap();
}
2249
/// Writes an optional `LIST` of optional `INT32` with page-level statistics enabled and checks
/// the repetition/definition level histograms.
///
/// The histograms are verified in three places: the metadata returned by the writer, the
/// column chunk metadata read back from the file, and the column index read back from the
/// file. Because the column is not a byte array, `unencoded_byte_array_data_bytes` must be
/// absent everywhere.
#[test]
fn test_size_statistics_with_repetition_and_nulls() {
    let message_type = "
        message test_schema {
            OPTIONAL group i32_list (LIST) {
                REPEATED group list {
                    OPTIONAL INT32 element;
                }
            }
        }
    ";
    let schema = Arc::new(parse_message_type(message_type).unwrap());

    // Seven leaf values accompanied by ten definition/repetition level entries.
    let values = [1, 2, 4, 7, 8, 9, 10];
    let definition_levels = [3, 3, 0, 3, 2, 1, 3, 3, 3, 3];
    let repetition_levels = [0, 1, 0, 0, 1, 0, 0, 1, 1, 1];

    let file = tempfile::tempfile().unwrap();
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .build(),
    );
    let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
    let mut row_group_writer = writer.next_row_group().unwrap();

    let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
    col_writer
        .typed::<Int32Type>()
        .write_batch(
            &values,
            Some(&definition_levels),
            Some(&repetition_levels),
        )
        .unwrap();
    col_writer.close().unwrap();
    row_group_writer.close().unwrap();
    let written_metadata = writer.close().unwrap();

    assert_eq!(written_metadata.num_row_groups(), 1);
    assert_eq!(written_metadata.row_group(0).num_columns(), 1);

    // Expected counts per level, derived from the level arrays above:
    // definition levels 0, 1, 2 occur once each and level 3 seven times;
    // repetition levels 0 and 1 occur five times each.
    let expect_def_hist = |hist: &[i64]| assert_eq!(hist, [1_i64, 1, 1, 7]);
    let expect_rep_hist = |hist: &[i64]| assert_eq!(hist, [5_i64, 5]);

    // Metadata as returned by the writer.
    let written_chunk = written_metadata.row_group(0).column(0);
    assert!(written_chunk.repetition_level_histogram().is_some());
    assert!(written_chunk.definition_level_histogram().is_some());
    assert!(written_chunk.unencoded_byte_array_data_bytes().is_none());
    expect_def_hist(written_chunk.definition_level_histogram().unwrap().values());
    expect_rep_hist(written_chunk.repetition_level_histogram().unwrap().values());

    // Read the file back, asking for the page index as well.
    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(file, options).unwrap();

    let read_file_metadata = reader.metadata().file_metadata();
    assert_eq!(
        read_file_metadata.num_rows(),
        written_metadata.file_metadata().num_rows()
    );
    assert_eq!(reader.num_row_groups(), 1);
    let row_group = reader.get_row_group(0).unwrap();
    assert_eq!(row_group.num_columns(), 1);

    // Column chunk metadata as read from the file.
    let read_chunk = row_group.metadata().column(0);
    assert!(read_chunk.definition_level_histogram().is_some());
    assert!(read_chunk.repetition_level_histogram().is_some());
    assert!(read_chunk.unencoded_byte_array_data_bytes().is_none());
    expect_def_hist(read_chunk.definition_level_histogram().unwrap().values());
    expect_rep_hist(read_chunk.repetition_level_histogram().unwrap().values());

    // Column index: one row group, one column, one page, of the INT32 variant.
    assert!(reader.metadata().column_index().is_some());
    let column_index = reader.metadata().column_index().unwrap();
    assert_eq!(column_index.len(), 1);
    assert_eq!(column_index[0].len(), 1);
    let ColumnIndexMetaData::INT32(int32_index) = &column_index[0][0] else {
        unreachable!()
    };
    assert_eq!(int32_index.num_pages(), 1);

    expect_def_hist(int32_index.definition_level_histogram(0).unwrap());
    expect_rep_hist(int32_index.repetition_level_histogram(0).unwrap());

    // Offset index: present, but without unencoded byte array sizes.
    assert!(reader.metadata().offset_index().is_some());
    let offset_index = reader.metadata().offset_index().unwrap();
    assert_eq!(offset_index.len(), 1);
    assert_eq!(offset_index[0].len(), 1);
    assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_none());
}
2355
/// Round-trips `byte_stream_split_extended.gzip.parquet` through the arrow writer with
/// `BYTE_STREAM_SPLIT` requested for every `*_byte_stream_split` column.
///
/// After rewriting, the test checks that:
/// * the odd-numbered columns of the first row group report the `BYTE_STREAM_SPLIT` encoding,
/// * in every row, each odd-numbered column equals the even-numbered column before it,
/// * the number of rows read back matches the row count in the file metadata, so that a
///   row iterator that ends early cannot make the test pass vacuously.
#[test]
#[cfg(feature = "arrow")]
fn test_byte_stream_split_extended_roundtrip() {
    // Columns for which the writer is asked to use BYTE_STREAM_SPLIT.
    const BYTE_STREAM_SPLIT_COLUMNS: [&str; 7] = [
        "float16_byte_stream_split",
        "float_byte_stream_split",
        "double_byte_stream_split",
        "int32_byte_stream_split",
        "int64_byte_stream_split",
        "flba5_byte_stream_split",
        "decimal_byte_stream_split",
    ];

    let path = format!(
        "{}/byte_stream_split_extended.gzip.parquet",
        arrow::util::test_util::parquet_test_data(),
    );
    let file = File::open(path).unwrap();

    // Distinct messages so a failure shows whether opening or building went wrong.
    let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)
        .expect("parquet open")
        .build()
        .expect("parquet reader build");

    let file = tempfile::tempfile().unwrap();
    let mut props_builder = WriterProperties::builder().set_dictionary_enabled(false);
    for column in BYTE_STREAM_SPLIT_COLUMNS {
        props_builder =
            props_builder.set_column_encoding(ColumnPath::from(column), Encoding::BYTE_STREAM_SPLIT);
    }
    let props = props_builder.build();

    let mut parquet_writer = ArrowWriter::try_new(
        file.try_clone().expect("cannot open file"),
        parquet_reader.schema(),
        Some(props),
    )
    .expect("create arrow writer");

    for maybe_batch in parquet_reader {
        let batch = maybe_batch.expect("reading batch");
        parquet_writer.write(&batch).expect("writing data");
    }

    parquet_writer.close().expect("finalizing file");

    let reader = SerializedFileReader::new(file).expect("Failed to create reader");
    let filemeta = reader.metadata();

    // Asserts that column `x` of the first row group lists BYTE_STREAM_SPLIT as an encoding.
    let check_encoding = |x: usize, filemeta: &ParquetMetaData| {
        assert!(
            filemeta
                .row_group(0)
                .column(x)
                .encodings()
                .any(|encoding| encoding == Encoding::BYTE_STREAM_SPLIT),
            "column {x} does not report the BYTE_STREAM_SPLIT encoding"
        );
    };

    // The odd-numbered columns 1, 3, ..., 13 are the ones checked for the encoding.
    for column in (1..=13).step_by(2) {
        check_encoding(column, filemeta);
    }

    let iter = reader
        .get_row_iter(None)
        .expect("Failed to create row iterator");

    let expected_rows = reader.metadata().file_metadata().num_rows();

    // Each odd-numbered column must hold the same value as the column before it.
    let check_row = |row: Result<Row, ParquetError>| {
        let r = row.expect("failed to read row");
        assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
        assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
        assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
        assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
        assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
        assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
        assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
    };

    let mut rows_checked = 0;
    for row in iter {
        check_row(row);
        rows_checked += 1;
    }

    // Guard against the iterator yielding fewer (or more) rows than the metadata advertises.
    assert_eq!(
        rows_checked, expected_rows,
        "row iterator did not yield the number of rows recorded in the file metadata"
    );
}
2469
/// Copies every column chunk of `alltypes_tiny_pages.parquet` into a new in-memory file with
/// `append_column`, supplying no bloom filter, column index, or offset index, and checks that
/// every step (including the final close) succeeds.
#[test]
fn test_rewrite_no_page_indexes() {
    let input = get_test_file("alltypes_tiny_pages.parquet");
    let source_metadata = ParquetMetaDataReader::new()
        .with_page_index_policy(PageIndexPolicy::Optional)
        .parse_and_finish(&input)
        .unwrap();

    let writer_props = Arc::new(WriterProperties::builder().build());
    let root_schema = source_metadata
        .file_metadata()
        .schema_descr()
        .root_schema_ptr();
    let sink = Vec::<u8>::new();
    let mut writer = SerializedFileWriter::new(sink, root_schema, writer_props).unwrap();

    for source_row_group in source_metadata.row_groups() {
        let mut row_group_writer = writer.next_row_group().unwrap();
        for chunk in source_row_group.columns() {
            // Reuse the source chunk's metadata as-is; all page-index style extras are omitted.
            let close_result = ColumnCloseResult {
                bytes_written: chunk.compressed_size() as _,
                rows_written: source_row_group.num_rows() as _,
                metadata: chunk.clone(),
                bloom_filter: None,
                column_index: None,
                offset_index: None,
            };
            row_group_writer
                .append_column(&input, close_result)
                .unwrap();
        }
        row_group_writer.close().unwrap();
    }

    writer.close().unwrap();
}
2500
/// Copies every column chunk of `alltypes_tiny_pages.parquet` into a new in-memory file with
/// `append_column`, forwarding whatever column index and offset index entries the source
/// metadata provides for each column (an entry that cannot be looked up is passed as `None`).
#[test]
fn test_rewrite_missing_column_index() {
    let input = get_test_file("alltypes_tiny_pages.parquet");
    let source_metadata = ParquetMetaDataReader::new()
        .with_page_index_policy(PageIndexPolicy::Optional)
        .parse_and_finish(&input)
        .unwrap();

    let writer_props = Arc::new(WriterProperties::builder().build());
    let root_schema = source_metadata
        .file_metadata()
        .schema_descr()
        .root_schema_ptr();
    let sink = Vec::<u8>::new();
    let mut writer = SerializedFileWriter::new(sink, root_schema, writer_props).unwrap();

    let all_column_indexes = source_metadata.column_index();
    let all_offset_indexes = source_metadata.offset_index();

    for (rg_idx, source_row_group) in source_metadata.row_groups().iter().enumerate() {
        let mut row_group_writer = writer.next_row_group().unwrap();
        for (col_idx, chunk) in source_row_group.columns().iter().enumerate() {
            // Look up the entry for (row group, column); any missing level yields `None`.
            let column_index = all_column_indexes
                .and_then(|per_row_group| per_row_group.get(rg_idx)?.get(col_idx))
                .cloned();
            let offset_index = all_offset_indexes
                .and_then(|per_row_group| per_row_group.get(rg_idx)?.get(col_idx))
                .cloned();

            let close_result = ColumnCloseResult {
                bytes_written: chunk.compressed_size() as _,
                rows_written: source_row_group.num_rows() as _,
                metadata: chunk.clone(),
                bloom_filter: None,
                column_index,
                offset_index,
            };
            row_group_writer
                .append_column(&input, close_result)
                .unwrap();
        }
        row_group_writer.close().unwrap();
    }

    writer.close().unwrap();
}
2539}