use crate::bloom_filter::Sbbf;
use crate::file::metadata::thrift::PageHeader;
use crate::file::page_index::column_index::ColumnIndexMetaData;
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
use std::fmt::Debug;
use std::io::{BufWriter, IoSlice, Read};
use std::{io::Write, sync::Arc};

use crate::column::page_encryption::PageEncryptor;
use crate::column::writer::{ColumnCloseResult, ColumnWriterImpl, get_typed_column_writer_mut};
use crate::column::{
    page::{CompressedPage, PageWriteSpec, PageWriter},
    writer::{ColumnWriter, get_column_writer},
};
use crate::data_type::DataType;
#[cfg(feature = "encryption")]
use crate::encryption::encrypt::{
    FileEncryptionProperties, FileEncryptor, get_column_crypto_metadata,
};
use crate::errors::{ParquetError, Result};
#[cfg(feature = "encryption")]
use crate::file::PARQUET_MAGIC_ENCR_FOOTER;
use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
use crate::file::reader::ChunkReader;
use crate::file::{PARQUET_MAGIC, metadata::*};
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr};

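/// A buffered [`Write`] implementation that tracks how many bytes have been
/// written since construction.
///
/// Byte-counting sketch (illustrative only; the `parquet::file::writer` path
/// assumes the published crate's re-exports):
///
/// ```no_run
/// # use std::io::Write;
/// # use parquet::file::writer::TrackedWrite;
/// let mut writer = TrackedWrite::new(Vec::<u8>::new());
/// writer.write_all(b"PAR1").unwrap();
/// assert_eq!(writer.bytes_written(), 4);
/// let buffer = writer.into_inner().unwrap();
/// assert_eq!(buffer.len(), 4);
/// ```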
pub struct TrackedWrite<W: Write> {
    inner: BufWriter<W>,
    bytes_written: usize,
}

impl<W: Write> TrackedWrite<W> {
    pub fn new(inner: W) -> Self {
        let buf_write = BufWriter::new(inner);
        Self {
            inner: buf_write,
            bytes_written: 0,
        }
    }

    pub fn bytes_written(&self) -> usize {
        self.bytes_written
    }

    pub fn inner(&self) -> &W {
        self.inner.get_ref()
    }

    pub fn inner_mut(&mut self) -> &mut W {
        self.inner.get_mut()
    }

    pub fn into_inner(self) -> Result<W> {
        self.inner.into_inner().map_err(|err| {
            ParquetError::General(format!("failed to get inner writer: {err}"))
        })
    }
}

impl<W: Write> Write for TrackedWrite<W> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let bytes = self.inner.write(buf)?;
        self.bytes_written += bytes;
        Ok(bytes)
    }

    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> std::io::Result<usize> {
        let bytes = self.inner.write_vectored(bufs)?;
        self.bytes_written += bytes;
        Ok(bytes)
    }

    fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
        self.inner.write_all(buf)?;
        self.bytes_written += buf.len();

        Ok(())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.inner.flush()
    }
}

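/// Callback invoked with the [`ColumnCloseResult`] produced when a column
/// chunk writer is closed.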
pub type OnCloseColumnChunk<'a> = Box<dyn FnOnce(ColumnCloseResult) -> Result<()> + 'a>;

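/// Callback invoked when a row group is closed, receiving the underlying
/// [`TrackedWrite`], the finalized [`RowGroupMetaData`], and the per-column
/// bloom filters, column indexes, and offset indexes.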
pub type OnCloseRowGroup<'a, W> = Box<
    dyn FnOnce(
            &'a mut TrackedWrite<W>,
            RowGroupMetaData,
            Vec<Option<Sbbf>>,
            Vec<Option<ColumnIndexMetaData>>,
            Vec<Option<OffsetIndexMetaData>>,
        ) -> Result<()>
        + 'a
        + Send,
>;

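/// Writes a Parquet file, one row group at a time, each row group one column
/// chunk at a time.
///
/// End-to-end sketch, mirroring the round-trip tests at the bottom of this
/// file (the `parquet::...` paths assume the published crate's re-exports):
///
/// ```no_run
/// # use std::sync::Arc;
/// # use parquet::data_type::Int32Type;
/// # use parquet::file::properties::WriterProperties;
/// # use parquet::file::writer::SerializedFileWriter;
/// # use parquet::schema::parser::parse_message_type;
/// let schema = Arc::new(parse_message_type("message s { REQUIRED INT32 a; }").unwrap());
/// let props = Arc::new(WriterProperties::builder().build());
/// let mut writer = SerializedFileWriter::new(Vec::<u8>::new(), schema, props).unwrap();
///
/// let mut row_group = writer.next_row_group().unwrap();
/// while let Some(mut column) = row_group.next_column().unwrap() {
///     column.typed::<Int32Type>().write_batch(&[1, 2, 3], None, None).unwrap();
///     column.close().unwrap();
/// }
/// row_group.close().unwrap();
/// writer.close().unwrap();
/// ```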
pub struct SerializedFileWriter<W: Write> {
    buf: TrackedWrite<W>,
    descr: SchemaDescPtr,
    props: WriterPropertiesPtr,
    row_groups: Vec<RowGroupMetaData>,
    bloom_filters: Vec<Vec<Option<Sbbf>>>,
    column_indexes: Vec<Vec<Option<ColumnIndexMetaData>>>,
    offset_indexes: Vec<Vec<Option<OffsetIndexMetaData>>>,
    row_group_index: usize,
    kv_metadatas: Vec<KeyValue>,
    finished: bool,
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}

impl<W: Write> Debug for SerializedFileWriter<W> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SerializedFileWriter")
            .field("descr", &self.descr)
            .field("row_group_index", &self.row_group_index)
            .field("kv_metadatas", &self.kv_metadatas)
            .finish_non_exhaustive()
    }
}

impl<W: Write + Send> SerializedFileWriter<W> {
    pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
        let mut buf = TrackedWrite::new(buf);

        let schema_descriptor = SchemaDescriptor::new(schema.clone());

        #[cfg(feature = "encryption")]
        let file_encryptor = Self::get_file_encryptor(&properties, &schema_descriptor)?;

        Self::start_file(&properties, &mut buf)?;
        Ok(Self {
            buf,
            descr: Arc::new(schema_descriptor),
            props: properties,
            row_groups: vec![],
            bloom_filters: vec![],
            column_indexes: Vec::new(),
            offset_indexes: Vec::new(),
            row_group_index: 0,
            kv_metadatas: Vec::new(),
            finished: false,
            #[cfg(feature = "encryption")]
            file_encryptor,
        })
    }

    #[cfg(feature = "encryption")]
    fn get_file_encryptor(
        properties: &WriterPropertiesPtr,
        schema_descriptor: &SchemaDescriptor,
    ) -> Result<Option<Arc<FileEncryptor>>> {
        if let Some(file_encryption_properties) = properties.file_encryption_properties() {
            file_encryption_properties.validate_encrypted_column_names(schema_descriptor)?;

            Ok(Some(Arc::new(FileEncryptor::new(Arc::clone(
                file_encryption_properties,
            ))?)))
        } else {
            Ok(None)
        }
    }

    pub fn next_row_group(&mut self) -> Result<SerializedRowGroupWriter<'_, W>> {
        self.assert_previous_writer_closed()?;
        let ordinal = self.row_group_index;

        let ordinal: i16 = ordinal.try_into().map_err(|_| {
            ParquetError::General(format!(
                "Parquet does not support more than {} row groups per file (currently: {})",
                i16::MAX,
                ordinal
            ))
        })?;

        self.row_group_index = self
            .row_group_index
            .checked_add(1)
            .expect("SerializedFileWriter::row_group_index overflowed");

        let bloom_filter_position = self.properties().bloom_filter_position();
        let row_groups = &mut self.row_groups;
        let row_bloom_filters = &mut self.bloom_filters;
        let row_column_indexes = &mut self.column_indexes;
        let row_offset_indexes = &mut self.offset_indexes;
        let on_close = move |buf,
                             mut metadata,
                             row_group_bloom_filter,
                             row_group_column_index,
                             row_group_offset_index| {
            row_bloom_filters.push(row_group_bloom_filter);
            row_column_indexes.push(row_group_column_index);
            row_offset_indexes.push(row_group_offset_index);
            match bloom_filter_position {
                BloomFilterPosition::AfterRowGroup => {
                    write_bloom_filters(buf, row_bloom_filters, &mut metadata)?
                }
                BloomFilterPosition::End => (),
            };
            row_groups.push(metadata);
            Ok(())
        };

        let row_group_writer = SerializedRowGroupWriter::new(
            self.descr.clone(),
            self.props.clone(),
            &mut self.buf,
            ordinal,
            Some(Box::new(on_close)),
        );
        #[cfg(feature = "encryption")]
        let row_group_writer = row_group_writer.with_file_encryptor(self.file_encryptor.clone());

        Ok(row_group_writer)
    }

    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        &self.row_groups
    }

    pub fn finish(&mut self) -> Result<ParquetMetaData> {
        self.assert_previous_writer_closed()?;
        let metadata = self.write_metadata()?;
        self.buf.flush()?;
        Ok(metadata)
    }

    pub fn close(mut self) -> Result<ParquetMetaData> {
        self.finish()
    }

    #[cfg(not(feature = "encryption"))]
    fn start_file(_properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
        buf.write_all(get_file_magic())?;
        Ok(())
    }

    #[cfg(feature = "encryption")]
    fn start_file(properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
        let magic = get_file_magic(properties.file_encryption_properties.as_ref());

        buf.write_all(magic)?;
        Ok(())
    }

    fn write_metadata(&mut self) -> Result<ParquetMetaData> {
        self.finished = true;

        for row_group in &mut self.row_groups {
            write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?;
        }

        let key_value_metadata = match self.props.key_value_metadata() {
            Some(kv) => Some(kv.iter().chain(&self.kv_metadatas).cloned().collect()),
            None if self.kv_metadatas.is_empty() => None,
            None => Some(self.kv_metadatas.clone()),
        };

        let row_groups = std::mem::take(&mut self.row_groups);
        let column_indexes = std::mem::take(&mut self.column_indexes);
        let offset_indexes = std::mem::take(&mut self.offset_indexes);

        let write_path_in_schema = self.props.write_path_in_schema();
        let mut encoder = ThriftMetadataWriter::new(
            &mut self.buf,
            &self.descr,
            row_groups,
            Some(self.props.created_by().to_string()),
            self.props.writer_version().as_num(),
            write_path_in_schema,
        );

        #[cfg(feature = "encryption")]
        {
            encoder = encoder.with_file_encryptor(self.file_encryptor.clone());
        }

        if let Some(key_value_metadata) = key_value_metadata {
            encoder = encoder.with_key_value_metadata(key_value_metadata)
        }

        encoder = encoder.with_column_indexes(column_indexes);
        if !self.props.offset_index_disabled() {
            encoder = encoder.with_offset_indexes(offset_indexes);
        }
        encoder.finish()
    }

    #[inline]
    fn assert_previous_writer_closed(&self) -> Result<()> {
        if self.finished {
            return Err(general_err!("SerializedFileWriter already finished"));
        }

        if self.row_group_index != self.row_groups.len() {
            Err(general_err!("Previous row group writer was not closed"))
        } else {
            Ok(())
        }
    }

    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
        self.kv_metadatas.push(kv_metadata);
    }

    pub fn schema_descr(&self) -> &SchemaDescriptor {
        &self.descr
    }

    #[cfg(feature = "arrow")]
    pub(crate) fn schema_descr_ptr(&self) -> &SchemaDescPtr {
        &self.descr
    }

    pub fn properties(&self) -> &WriterPropertiesPtr {
        &self.props
    }

    pub fn inner(&self) -> &W {
        self.buf.inner()
    }

    pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
        self.buf.write_all(buf)
    }

    pub fn flush(&mut self) -> std::io::Result<()> {
        self.buf.flush()
    }

    pub fn inner_mut(&mut self) -> &mut W {
        self.buf.inner_mut()
    }

    pub fn into_inner(mut self) -> Result<W> {
        self.assert_previous_writer_closed()?;
        let _ = self.write_metadata()?;

        self.buf.into_inner()
    }

    pub fn bytes_written(&self) -> usize {
        self.buf.bytes_written()
    }

    #[cfg(feature = "encryption")]
    pub(crate) fn file_encryptor(&self) -> Option<Arc<FileEncryptor>> {
        self.file_encryptor.clone()
    }
}

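/// Serializes the bloom filters for `row_group` into `buf`, updating each
/// column chunk's metadata with the filter's offset and length. Filters are
/// `take`n, so each is written at most once, either as its row group closes
/// or at the end of the file, depending on [`BloomFilterPosition`].
///
/// Configuration sketch (hedged: `set_bloom_filter_enabled` and
/// `set_bloom_filter_position` are assumed from the `WriterProperties`
/// builder of the published crate):
///
/// ```no_run
/// use parquet::file::properties::{BloomFilterPosition, WriterProperties};
///
/// let props = WriterProperties::builder()
///     .set_bloom_filter_enabled(true)
///     // defer writing all bloom filters to the end of the file
///     .set_bloom_filter_position(BloomFilterPosition::End)
///     .build();
/// ```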
fn write_bloom_filters<W: Write + Send>(
    buf: &mut TrackedWrite<W>,
    bloom_filters: &mut [Vec<Option<Sbbf>>],
    row_group: &mut RowGroupMetaData,
) -> Result<()> {
    let row_group_idx: u16 = row_group
        .ordinal()
        .expect("Missing row group ordinal")
        .try_into()
        .map_err(|_| {
            ParquetError::General(format!(
                "Negative row group ordinal: {}",
                row_group.ordinal().unwrap()
            ))
        })?;
    let row_group_idx = row_group_idx as usize;
    for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() {
        if let Some(bloom_filter) = bloom_filters[row_group_idx][column_idx].take() {
            let start_offset = buf.bytes_written();
            bloom_filter.write(&mut *buf)?;
            let end_offset = buf.bytes_written();
            // Update the column chunk metadata with the bloom filter location
            *column_chunk = column_chunk
                .clone()
                .into_builder()
                .set_bloom_filter_offset(Some(start_offset as i64))
                .set_bloom_filter_length(Some((end_offset - start_offset) as i32))
                .build()?;
        }
    }
    Ok(())
}

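/// Writes a single row group, one column chunk at a time, in schema order.
///
/// Created by [`SerializedFileWriter::next_row_group`]; see the example on
/// [`SerializedFileWriter`] for the usual `next_column`/`close` loop.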
pub struct SerializedRowGroupWriter<'a, W: Write> {
    descr: SchemaDescPtr,
    props: WriterPropertiesPtr,
    buf: &'a mut TrackedWrite<W>,
    total_rows_written: Option<u64>,
    total_bytes_written: u64,
    total_uncompressed_bytes: i64,
    column_index: usize,
    row_group_metadata: Option<RowGroupMetaDataPtr>,
    column_chunks: Vec<ColumnChunkMetaData>,
    bloom_filters: Vec<Option<Sbbf>>,
    column_indexes: Vec<Option<ColumnIndexMetaData>>,
    offset_indexes: Vec<Option<OffsetIndexMetaData>>,
    row_group_index: i16,
    file_offset: i64,
    on_close: Option<OnCloseRowGroup<'a, W>>,
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}

impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
    pub fn new(
        schema_descr: SchemaDescPtr,
        properties: WriterPropertiesPtr,
        buf: &'a mut TrackedWrite<W>,
        row_group_index: i16,
        on_close: Option<OnCloseRowGroup<'a, W>>,
    ) -> Self {
        let num_columns = schema_descr.num_columns();
        let file_offset = buf.bytes_written() as i64;
        Self {
            buf,
            row_group_index,
            file_offset,
            on_close,
            total_rows_written: None,
            descr: schema_descr,
            props: properties,
            column_index: 0,
            row_group_metadata: None,
            column_chunks: Vec::with_capacity(num_columns),
            bloom_filters: Vec::with_capacity(num_columns),
            column_indexes: Vec::with_capacity(num_columns),
            offset_indexes: Vec::with_capacity(num_columns),
            total_bytes_written: 0,
            total_uncompressed_bytes: 0,
            #[cfg(feature = "encryption")]
            file_encryptor: None,
        }
    }

    #[cfg(feature = "encryption")]
    pub(crate) fn with_file_encryptor(
        mut self,
        file_encryptor: Option<Arc<FileEncryptor>>,
    ) -> Self {
        self.file_encryptor = file_encryptor;
        self
    }

    fn next_column_desc(&mut self) -> Option<ColumnDescPtr> {
        let ret = self.descr.columns().get(self.column_index)?.clone();
        self.column_index += 1;
        Some(ret)
    }

    fn get_on_close(&mut self) -> (&mut TrackedWrite<W>, OnCloseColumnChunk<'_>) {
        let total_bytes_written = &mut self.total_bytes_written;
        let total_uncompressed_bytes = &mut self.total_uncompressed_bytes;
        let total_rows_written = &mut self.total_rows_written;
        let column_chunks = &mut self.column_chunks;
        let column_indexes = &mut self.column_indexes;
        let offset_indexes = &mut self.offset_indexes;
        let bloom_filters = &mut self.bloom_filters;

        let on_close = |r: ColumnCloseResult| {
            *total_bytes_written += r.bytes_written;
            *total_uncompressed_bytes += r.metadata.uncompressed_size();
            column_chunks.push(r.metadata);
            bloom_filters.push(r.bloom_filter);
            column_indexes.push(r.column_index);
            offset_indexes.push(r.offset_index);

            if let Some(rows) = *total_rows_written {
                if rows != r.rows_written {
                    return Err(general_err!(
                        "Incorrect number of rows, expected {} != {} rows",
                        rows,
                        r.rows_written
                    ));
                }
            } else {
                *total_rows_written = Some(r.rows_written);
            }

            Ok(())
        };
        (self.buf, Box::new(on_close))
    }

    pub(crate) fn next_column_with_factory<'b, F, C>(&'b mut self, factory: F) -> Result<Option<C>>
    where
        F: FnOnce(
            ColumnDescPtr,
            WriterPropertiesPtr,
            Box<dyn PageWriter + 'b>,
            OnCloseColumnChunk<'b>,
        ) -> Result<C>,
    {
        self.assert_previous_writer_closed()?;

        let encryptor_context = self.get_page_encryptor_context();

        Ok(match self.next_column_desc() {
            Some(column) => {
                let props = self.props.clone();
                let (buf, on_close) = self.get_on_close();

                let page_writer = SerializedPageWriter::new(buf);
                let page_writer =
                    Self::set_page_writer_encryptor(&column, encryptor_context, page_writer)?;

                Some(factory(
                    column,
                    props,
                    Box::new(page_writer),
                    Box::new(on_close),
                )?)
            }
            None => None,
        })
    }

    pub fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>> {
        self.next_column_with_factory(|descr, props, page_writer, on_close| {
            let column_writer = get_column_writer(descr, props, page_writer);
            Ok(SerializedColumnWriter::new(column_writer, Some(on_close)))
        })
    }
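
    /// Appends an already-encoded column chunk (for example, one produced by a
    /// standalone [`SerializedColumnWriter`] writing into its own buffer) by
    /// copying its raw bytes from `reader` and rebuilding its metadata.
    ///
    /// Byte offsets are remapped as `new = old - src_offset + write_offset`,
    /// where `write_offset` is the current position in this file; page
    /// locations in the offset index are remapped the same way. See
    /// `test_spliced_write` in the tests below for a complete usage sketch.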
    pub fn append_column<R: ChunkReader>(
        &mut self,
        reader: &R,
        mut close: ColumnCloseResult,
    ) -> Result<()> {
        self.assert_previous_writer_closed()?;
        let desc = self
            .next_column_desc()
            .ok_or_else(|| general_err!("exhausted columns in SerializedRowGroupWriter"))?;

        let metadata = close.metadata;

        if metadata.column_descr() != desc.as_ref() {
            return Err(general_err!(
                "column descriptor mismatch, expected {:?} got {:?}",
                desc,
                metadata.column_descr()
            ));
        }

        let src_dictionary_offset = metadata.dictionary_page_offset();
        let src_data_offset = metadata.data_page_offset();
        let src_offset = src_dictionary_offset.unwrap_or(src_data_offset);
        let src_length = metadata.compressed_size();

        let write_offset = self.buf.bytes_written();
        let mut read = reader.get_read(src_offset as _)?.take(src_length as _);
        let write_length = std::io::copy(&mut read, &mut self.buf)?;

        if src_length as u64 != write_length {
            return Err(general_err!(
                "Failed to splice column data, expected {src_length} got {write_length}"
            ));
        }

        let map_offset = |x| x - src_offset + write_offset as i64;
        let mut builder = ColumnChunkMetaData::builder(metadata.column_descr_ptr())
            .set_compression_codec(metadata.compression_codec())
            .set_encodings_mask(*metadata.encodings_mask())
            .set_total_compressed_size(metadata.compressed_size())
            .set_total_uncompressed_size(metadata.uncompressed_size())
            .set_num_values(metadata.num_values())
            .set_data_page_offset(map_offset(src_data_offset))
            .set_dictionary_page_offset(src_dictionary_offset.map(map_offset))
            .set_unencoded_byte_array_data_bytes(metadata.unencoded_byte_array_data_bytes());

        if let Some(rep_hist) = metadata.repetition_level_histogram() {
            builder = builder.set_repetition_level_histogram(Some(rep_hist.clone()))
        }
        if let Some(def_hist) = metadata.definition_level_histogram() {
            builder = builder.set_definition_level_histogram(Some(def_hist.clone()))
        }
        if let Some(statistics) = metadata.statistics() {
            builder = builder.set_statistics(statistics.clone())
        }
        if let Some(geo_statistics) = metadata.geo_statistics() {
            builder = builder.set_geo_statistics(Box::new(geo_statistics.clone()))
        }
        if let Some(page_encoding_stats) = metadata.page_encoding_stats() {
            builder = builder.set_page_encoding_stats(page_encoding_stats.clone())
        }
        builder = self.set_column_crypto_metadata(builder, &metadata);
        close.metadata = builder.build()?;

        if let Some(offsets) = close.offset_index.as_mut() {
            for location in &mut offsets.page_locations {
                location.offset = map_offset(location.offset)
            }
        }

        let (_, on_close) = self.get_on_close();
        on_close(close)
    }

    pub fn close(mut self) -> Result<RowGroupMetaDataPtr> {
        if self.row_group_metadata.is_none() {
            self.assert_previous_writer_closed()?;

            let column_chunks = std::mem::take(&mut self.column_chunks);
            let row_group_metadata = RowGroupMetaData::builder(self.descr.clone())
                .set_column_metadata(column_chunks)
                .set_total_byte_size(self.total_uncompressed_bytes)
                .set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
                .set_sorting_columns(self.props.sorting_columns().cloned())
                .set_ordinal(self.row_group_index)
                .set_file_offset(self.file_offset)
                .build()?;

            self.row_group_metadata = Some(Arc::new(row_group_metadata.clone()));

            if let Some(on_close) = self.on_close.take() {
                on_close(
                    self.buf,
                    row_group_metadata,
                    self.bloom_filters,
                    self.column_indexes,
                    self.offset_indexes,
                )?
            }
        }

        let metadata = self.row_group_metadata.as_ref().unwrap().clone();
        Ok(metadata)
    }

    #[cfg(feature = "encryption")]
    fn set_column_crypto_metadata(
        &self,
        builder: ColumnChunkMetaDataBuilder,
        metadata: &ColumnChunkMetaData,
    ) -> ColumnChunkMetaDataBuilder {
        if let Some(file_encryptor) = self.file_encryptor.as_ref() {
            builder.set_column_crypto_metadata(get_column_crypto_metadata(
                file_encryptor.properties(),
                &metadata.column_descr_ptr(),
            ))
        } else {
            builder
        }
    }

    #[cfg(feature = "encryption")]
    fn get_page_encryptor_context(&self) -> PageEncryptorContext {
        PageEncryptorContext {
            file_encryptor: self.file_encryptor.clone(),
            row_group_index: self.row_group_index as usize,
            column_index: self.column_index,
        }
    }

    #[cfg(feature = "encryption")]
    fn set_page_writer_encryptor<'b>(
        column: &ColumnDescPtr,
        context: PageEncryptorContext,
        page_writer: SerializedPageWriter<'b, W>,
    ) -> Result<SerializedPageWriter<'b, W>> {
        let page_encryptor = PageEncryptor::create_if_column_encrypted(
            &context.file_encryptor,
            context.row_group_index,
            context.column_index,
            &column.path().string(),
        )?;

        Ok(page_writer.with_page_encryptor(page_encryptor))
    }

    #[cfg(not(feature = "encryption"))]
    fn set_column_crypto_metadata(
        &self,
        builder: ColumnChunkMetaDataBuilder,
        _metadata: &ColumnChunkMetaData,
    ) -> ColumnChunkMetaDataBuilder {
        builder
    }

    #[cfg(not(feature = "encryption"))]
    fn get_page_encryptor_context(&self) -> PageEncryptorContext {
        PageEncryptorContext {}
    }

    #[cfg(not(feature = "encryption"))]
    fn set_page_writer_encryptor<'b>(
        _column: &ColumnDescPtr,
        _context: PageEncryptorContext,
        page_writer: SerializedPageWriter<'b, W>,
    ) -> Result<SerializedPageWriter<'b, W>> {
        Ok(page_writer)
    }

    #[inline]
    fn assert_previous_writer_closed(&self) -> Result<()> {
        if self.column_index != self.column_chunks.len() {
            Err(general_err!("Previous column writer was not closed"))
        } else {
            Ok(())
        }
    }
}

#[cfg(feature = "encryption")]
struct PageEncryptorContext {
    file_encryptor: Option<Arc<FileEncryptor>>,
    row_group_index: usize,
    column_index: usize,
}

#[cfg(not(feature = "encryption"))]
struct PageEncryptorContext {}

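/// A [`ColumnWriter`] wrapper that erases the column's type and invokes an
/// [`OnCloseColumnChunk`] callback on [`Self::close`].
///
/// Sketch of typed access (assumes an INT32 column obtained from
/// [`SerializedRowGroupWriter::next_column`]; paths per the published crate's
/// re-exports):
///
/// ```no_run
/// # use parquet::data_type::Int32Type;
/// # use parquet::file::writer::SerializedColumnWriter;
/// # fn write(mut col: SerializedColumnWriter<'_>) -> parquet::errors::Result<()> {
/// col.typed::<Int32Type>().write_batch(&[1, 2, 3], None, None)?;
/// col.close()
/// # }
/// ```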
pub struct SerializedColumnWriter<'a> {
    inner: ColumnWriter<'a>,
    on_close: Option<OnCloseColumnChunk<'a>>,
}

impl<'a> SerializedColumnWriter<'a> {
    pub fn new(inner: ColumnWriter<'a>, on_close: Option<OnCloseColumnChunk<'a>>) -> Self {
        Self { inner, on_close }
    }

    pub fn untyped(&mut self) -> &mut ColumnWriter<'a> {
        &mut self.inner
    }

    pub fn typed<T: DataType>(&mut self) -> &mut ColumnWriterImpl<'a, T> {
        get_typed_column_writer_mut(&mut self.inner)
    }

    pub fn close(mut self) -> Result<()> {
        let r = self.inner.close()?;
        if let Some(on_close) = self.on_close.take() {
            on_close(r)?
        }

        Ok(())
    }
}

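/// Writes compressed pages, preceded by their serialized thrift headers, into
/// a [`TrackedWrite`] sink.
///
/// Standalone usage sketch, condensed from `test_page_roundtrip` in the tests
/// below (`compressed_page` is a placeholder for a real [`CompressedPage`]):
///
/// ```ignore
/// let mut buffer = Vec::new();
/// let mut writer = TrackedWrite::new(&mut buffer);
/// let mut page_writer = SerializedPageWriter::new(&mut writer);
/// page_writer.write_page(compressed_page)?;
/// page_writer.close()?;
/// ```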
pub struct SerializedPageWriter<'a, W: Write> {
    sink: &'a mut TrackedWrite<W>,
    #[cfg(feature = "encryption")]
    page_encryptor: Option<PageEncryptor>,
}

impl<'a, W: Write> SerializedPageWriter<'a, W> {
    pub fn new(sink: &'a mut TrackedWrite<W>) -> Self {
        Self {
            sink,
            #[cfg(feature = "encryption")]
            page_encryptor: None,
        }
    }

    #[inline]
    fn serialize_page_header(&mut self, header: PageHeader) -> Result<usize> {
        let start_pos = self.sink.bytes_written();
        match self.page_encryptor_and_sink_mut() {
            Some((page_encryptor, sink)) => {
                page_encryptor.encrypt_page_header(&header, sink)?;
            }
            None => {
                let mut protocol = ThriftCompactOutputProtocol::new(&mut self.sink);
                header.write_thrift(&mut protocol)?;
            }
        }
        Ok(self.sink.bytes_written() - start_pos)
    }
}

#[cfg(feature = "encryption")]
impl<'a, W: Write> SerializedPageWriter<'a, W> {
    fn with_page_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
        self.page_encryptor = page_encryptor;
        self
    }

    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        self.page_encryptor.as_mut()
    }

    fn page_encryptor_and_sink_mut(
        &mut self,
    ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
        self.page_encryptor.as_mut().map(|pe| (pe, &mut self.sink))
    }
}

#[cfg(not(feature = "encryption"))]
impl<'a, W: Write> SerializedPageWriter<'a, W> {
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        None
    }

    fn page_encryptor_and_sink_mut(
        &mut self,
    ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
        None
    }
}

impl<W: Write + Send> PageWriter for SerializedPageWriter<'_, W> {
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
        let page = match self.page_encryptor_mut() {
            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
            None => page,
        };

        let page_type = page.page_type();
        let start_pos = self.sink.bytes_written() as u64;

        let page_header = page.to_thrift_header()?;
        let header_size = self.serialize_page_header(page_header)?;

        self.sink.write_all(page.data())?;

        let mut spec = PageWriteSpec::new();
        spec.page_type = page_type;
        spec.uncompressed_size = page.uncompressed_size() + header_size;
        spec.compressed_size = page.compressed_size() + header_size;
        spec.offset = start_pos;
        spec.bytes_written = self.sink.bytes_written() as u64 - start_pos;
        spec.num_values = page.num_values();

        if let Some(page_encryptor) = self.page_encryptor_mut() {
            if page.compressed_page().is_data_page() {
                page_encryptor.increment_page();
            }
        }
        Ok(spec)
    }

    fn close(&mut self) -> Result<()> {
        self.sink.flush()?;
        Ok(())
    }
}

#[cfg(feature = "encryption")]
pub(crate) fn get_file_magic(
    file_encryption_properties: Option<&Arc<FileEncryptionProperties>>,
) -> &'static [u8; 4] {
    match file_encryption_properties.as_ref() {
        Some(encryption_properties) if encryption_properties.encrypt_footer() => {
            &PARQUET_MAGIC_ENCR_FOOTER
        }
        _ => &PARQUET_MAGIC,
    }
}

#[cfg(not(feature = "encryption"))]
pub(crate) fn get_file_magic() -> &'static [u8; 4] {
    &PARQUET_MAGIC
}

#[cfg(test)]
mod tests {
    use super::*;

    #[cfg(feature = "arrow")]
    use arrow_array::RecordBatchReader;
    use bytes::Bytes;
    use std::fs::File;

    #[cfg(feature = "arrow")]
    use crate::arrow::ArrowWriter;
    #[cfg(feature = "arrow")]
    use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use crate::basic::{
        ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, Repetition, SortOrder, Type,
    };
    use crate::column::page::{Page, PageReader};
    use crate::column::reader::get_typed_column_reader;
    use crate::compression::{Codec, CodecOptionsBuilder, create_codec};
    use crate::data_type::{BoolType, ByteArrayType, Int32Type};
    use crate::file::page_index::column_index::ColumnIndexMetaData;
    use crate::file::properties::EnabledStatistics;
    use crate::file::serialized_reader::ReadOptionsBuilder;
    use crate::file::statistics::{from_thrift_page_stats, page_stats_to_thrift};
    use crate::file::{
        properties::{ReaderProperties, WriterProperties, WriterVersion},
        reader::{FileReader, SerializedFileReader, SerializedPageReader},
        statistics::Statistics,
    };
    use crate::record::{Row, RowAccessor};
    use crate::schema::parser::parse_message_type;
    use crate::schema::types;
    use crate::schema::types::{ColumnDescriptor, ColumnPath};
    use crate::util::test_common::file_util::get_test_file;
    use crate::util::test_common::rand_gen::RandGen;

    #[test]
    fn test_row_group_writer_error_not_all_columns_written() {
        let file = tempfile::tempfile().unwrap();
        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![Arc::new(
                    types::Type::primitive_type_builder("col1", Type::INT32)
                        .build()
                        .unwrap(),
                )])
                .build()
                .unwrap(),
        );
        let props = Default::default();
        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
        let row_group_writer = writer.next_row_group().unwrap();
        let res = row_group_writer.close();
        assert!(res.is_err());
        if let Err(err) = res {
            assert_eq!(
                format!("{err}"),
                "Parquet error: Column length mismatch: 1 != 0"
            );
        }
    }

    #[test]
    fn test_row_group_writer_num_records_mismatch() {
        let file = tempfile::tempfile().unwrap();
        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![
                    Arc::new(
                        types::Type::primitive_type_builder("col1", Type::INT32)
                            .with_repetition(Repetition::REQUIRED)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        types::Type::primitive_type_builder("col2", Type::INT32)
                            .with_repetition(Repetition::REQUIRED)
                            .build()
                            .unwrap(),
                    ),
                ])
                .build()
                .unwrap(),
        );
        let props = Default::default();
        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 2, 3], None, None)
            .unwrap();
        col_writer.close().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 2], None, None)
            .unwrap();

        let err = col_writer.close().unwrap_err();
        assert_eq!(
            err.to_string(),
            "Parquet error: Incorrect number of rows, expected 3 != 2 rows"
        );
    }

    #[test]
    fn test_file_writer_empty_file() {
        let file = tempfile::tempfile().unwrap();

        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![Arc::new(
                    types::Type::primitive_type_builder("col1", Type::INT32)
                        .build()
                        .unwrap(),
                )])
                .build()
                .unwrap(),
        );
        let props = Default::default();
        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file).unwrap();
        assert_eq!(reader.get_row_iter(None).unwrap().count(), 0);
    }

    #[test]
    fn test_file_writer_column_orders_populated() {
        let file = tempfile::tempfile().unwrap();

        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![
                    Arc::new(
                        types::Type::primitive_type_builder("col1", Type::INT32)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        types::Type::primitive_type_builder("col2", Type::FIXED_LEN_BYTE_ARRAY)
                            .with_converted_type(ConvertedType::INTERVAL)
                            .with_length(12)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        types::Type::group_type_builder("nested")
                            .with_repetition(Repetition::REQUIRED)
                            .with_fields(vec![
                                Arc::new(
                                    types::Type::primitive_type_builder(
                                        "col3",
                                        Type::FIXED_LEN_BYTE_ARRAY,
                                    )
                                    .with_logical_type(Some(LogicalType::Float16))
                                    .with_length(2)
                                    .build()
                                    .unwrap(),
                                ),
                                Arc::new(
                                    types::Type::primitive_type_builder("col4", Type::BYTE_ARRAY)
                                        .with_logical_type(Some(LogicalType::String))
                                        .build()
                                        .unwrap(),
                                ),
                            ])
                            .build()
                            .unwrap(),
                    ),
                ])
                .build()
                .unwrap(),
        );

        let props = Default::default();
        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file).unwrap();

        let expected = vec![
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED),
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED),
        ];
        let actual = reader.metadata().file_metadata().column_orders();

        assert!(actual.is_some());
        let actual = actual.unwrap();
        assert_eq!(*actual, expected);
    }

    #[test]
    fn test_file_writer_with_metadata() {
        let file = tempfile::tempfile().unwrap();

        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![Arc::new(
                    types::Type::primitive_type_builder("col1", Type::INT32)
                        .build()
                        .unwrap(),
                )])
                .build()
                .unwrap(),
        );
        let props = Arc::new(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![KeyValue::new(
                    "key".to_string(),
                    "value".to_string(),
                )]))
                .build(),
        );
        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file).unwrap();
        assert_eq!(
            reader
                .metadata()
                .file_metadata()
                .key_value_metadata()
                .to_owned()
                .unwrap()
                .len(),
            1
        );
    }

    #[test]
    fn test_file_writer_v2_with_metadata() {
        let file = tempfile::tempfile().unwrap();
        let field_logical_type = Some(LogicalType::Integer {
            bit_width: 8,
            is_signed: false,
        });
        let field = Arc::new(
            types::Type::primitive_type_builder("col1", Type::INT32)
                .with_logical_type(field_logical_type.clone())
                .with_converted_type(field_logical_type.into())
                .build()
                .unwrap(),
        );
        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![field.clone()])
                .build()
                .unwrap(),
        );
        let props = Arc::new(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![KeyValue::new(
                    "key".to_string(),
                    "value".to_string(),
                )]))
                .set_writer_version(WriterVersion::PARQUET_2_0)
                .build(),
        );
        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file).unwrap();

        assert_eq!(
            reader
                .metadata()
                .file_metadata()
                .key_value_metadata()
                .to_owned()
                .unwrap()
                .len(),
            1
        );

        let fields = reader.metadata().file_metadata().schema().get_fields();
        assert_eq!(fields.len(), 1);
        assert_eq!(fields[0], field);
    }

    #[test]
    fn test_file_writer_with_sorting_columns_metadata() {
        let file = tempfile::tempfile().unwrap();

        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![
                    Arc::new(
                        types::Type::primitive_type_builder("col1", Type::INT32)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        types::Type::primitive_type_builder("col2", Type::INT32)
                            .build()
                            .unwrap(),
                    ),
                ])
                .build()
                .unwrap(),
        );
        let expected_result = Some(vec![SortingColumn {
            column_idx: 0,
            descending: false,
            nulls_first: true,
        }]);
        let props = Arc::new(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![KeyValue::new(
                    "key".to_string(),
                    "value".to_string(),
                )]))
                .set_sorting_columns(expected_result.clone())
                .build(),
        );
        let mut writer =
            SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().expect("get row group writer");

        let col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer.close().unwrap();

        let col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer.close().unwrap();

        row_group_writer.close().unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file).unwrap();
        let result: Vec<Option<&Vec<SortingColumn>>> = reader
            .metadata()
            .row_groups()
            .iter()
            .map(|f| f.sorting_columns())
            .collect();
        assert_eq!(expected_result.as_ref(), result[0]);
    }

    #[test]
    fn test_file_writer_empty_row_groups() {
        let file = tempfile::tempfile().unwrap();
        test_file_roundtrip(file, vec![]);
    }

    #[test]
    fn test_file_writer_single_row_group() {
        let file = tempfile::tempfile().unwrap();
        test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
    }

    #[test]
    fn test_file_writer_multiple_row_groups() {
        let file = tempfile::tempfile().unwrap();
        test_file_roundtrip(
            file,
            vec![
                vec![1, 2, 3, 4, 5],
                vec![1, 2, 3],
                vec![1],
                vec![1, 2, 3, 4, 5, 6],
            ],
        );
    }

    #[test]
    fn test_file_writer_multiple_large_row_groups() {
        let file = tempfile::tempfile().unwrap();
        test_file_roundtrip(
            file,
            vec![vec![123; 1024], vec![124; 1000], vec![125; 15], vec![]],
        );
    }

    #[test]
    fn test_page_writer_data_pages() {
        let pages = [
            Page::DataPage {
                buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
                num_values: 10,
                encoding: Encoding::DELTA_BINARY_PACKED,
                def_level_encoding: Encoding::RLE,
                rep_level_encoding: Encoding::RLE,
                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
            },
            Page::DataPageV2 {
                buf: Bytes::from(vec![4; 128]),
                num_values: 10,
                encoding: Encoding::DELTA_BINARY_PACKED,
                num_nulls: 2,
                num_rows: 12,
                def_levels_byte_len: 24,
                rep_levels_byte_len: 32,
                is_compressed: false,
                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
            },
        ];

        test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
        test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
    }

    #[test]
    fn test_page_writer_dict_pages() {
        let pages = [
            Page::DictionaryPage {
                buf: Bytes::from(vec![1, 2, 3, 4, 5]),
                num_values: 5,
                encoding: Encoding::RLE_DICTIONARY,
                is_sorted: false,
            },
            Page::DataPage {
                buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
                num_values: 10,
                encoding: Encoding::DELTA_BINARY_PACKED,
                def_level_encoding: Encoding::RLE,
                rep_level_encoding: Encoding::RLE,
                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
            },
            Page::DataPageV2 {
                buf: Bytes::from(vec![4; 128]),
                num_values: 10,
                encoding: Encoding::DELTA_BINARY_PACKED,
                num_nulls: 2,
                num_rows: 12,
                def_levels_byte_len: 24,
                rep_levels_byte_len: 32,
                is_compressed: false,
                statistics: None,
            },
        ];

        test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
        test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
    }

    fn test_page_roundtrip(pages: &[Page], codec: Compression, physical_type: Type) {
        let mut compressed_pages = vec![];
        let mut total_num_values = 0i64;
        let codec_options = CodecOptionsBuilder::default()
            .set_backward_compatible_lz4(false)
            .build();
        let mut compressor = create_codec(codec, &codec_options).unwrap();

        for page in pages {
            let uncompressed_len = page.buffer().len();

            let compressed_page = match *page {
                Page::DataPage {
                    ref buf,
                    num_values,
                    encoding,
                    def_level_encoding,
                    rep_level_encoding,
                    ref statistics,
                } => {
                    total_num_values += num_values as i64;
                    let output_buf = compress_helper(compressor.as_mut(), buf);

                    Page::DataPage {
                        buf: Bytes::from(output_buf),
                        num_values,
                        encoding,
                        def_level_encoding,
                        rep_level_encoding,
                        statistics: from_thrift_page_stats(
                            physical_type,
                            page_stats_to_thrift(statistics.as_ref()),
                        )
                        .unwrap(),
                    }
                }
                Page::DataPageV2 {
                    ref buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    ref statistics,
                    ..
                } => {
                    total_num_values += num_values as i64;
                    let offset = (def_levels_byte_len + rep_levels_byte_len) as usize;
                    let cmp_buf = compress_helper(compressor.as_mut(), &buf[offset..]);
                    let mut output_buf = Vec::from(&buf[..offset]);
                    output_buf.extend_from_slice(&cmp_buf[..]);

                    Page::DataPageV2 {
                        buf: Bytes::from(output_buf),
                        num_values,
                        encoding,
                        num_nulls,
                        num_rows,
                        def_levels_byte_len,
                        rep_levels_byte_len,
                        is_compressed: compressor.is_some(),
                        statistics: from_thrift_page_stats(
                            physical_type,
                            page_stats_to_thrift(statistics.as_ref()),
                        )
                        .unwrap(),
                    }
                }
                Page::DictionaryPage {
                    ref buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    let output_buf = compress_helper(compressor.as_mut(), buf);

                    Page::DictionaryPage {
                        buf: Bytes::from(output_buf),
                        num_values,
                        encoding,
                        is_sorted,
                    }
                }
            };

            let compressed_page = CompressedPage::new(compressed_page, uncompressed_len);
            compressed_pages.push(compressed_page);
        }

        let mut buffer: Vec<u8> = vec![];
        let mut result_pages: Vec<Page> = vec![];
        {
            let mut writer = TrackedWrite::new(&mut buffer);
            let mut page_writer = SerializedPageWriter::new(&mut writer);

            for page in compressed_pages {
                page_writer.write_page(page).unwrap();
            }
            page_writer.close().unwrap();
        }
        {
            let reader = bytes::Bytes::from(buffer);

            let t = types::Type::primitive_type_builder("t", physical_type)
                .build()
                .unwrap();

            let desc = ColumnDescriptor::new(Arc::new(t), 0, 0, ColumnPath::new(vec![]));
            let meta = ColumnChunkMetaData::builder(Arc::new(desc))
                .set_compression_codec(codec.into())
                .set_total_compressed_size(reader.len() as i64)
                .set_num_values(total_num_values)
                .build()
                .unwrap();

            let props = ReaderProperties::builder()
                .set_backward_compatible_lz4(false)
                .set_read_page_statistics(true)
                .build();
            let mut page_reader = SerializedPageReader::new_with_properties(
                Arc::new(reader),
                &meta,
                total_num_values as usize,
                None,
                Arc::new(props),
            )
            .unwrap();

            while let Some(page) = page_reader.get_next_page().unwrap() {
                result_pages.push(page);
            }
        }

        assert_eq!(result_pages.len(), pages.len());
        for i in 0..result_pages.len() {
            assert_page(&result_pages[i], &pages[i]);
        }
    }

    fn compress_helper(compressor: Option<&mut Box<dyn Codec>>, data: &[u8]) -> Vec<u8> {
        let mut output_buf = vec![];
        if let Some(cmpr) = compressor {
            cmpr.compress(data, &mut output_buf).unwrap();
        } else {
            output_buf.extend_from_slice(data);
        }
        output_buf
    }

    fn assert_page(left: &Page, right: &Page) {
        assert_eq!(left.page_type(), right.page_type());
        assert_eq!(&left.buffer(), &right.buffer());
        assert_eq!(left.num_values(), right.num_values());
        assert_eq!(left.encoding(), right.encoding());
        assert_eq!(
            page_stats_to_thrift(left.statistics()),
            page_stats_to_thrift(right.statistics())
        );
    }

    fn test_roundtrip_i32<W, R>(
        file: W,
        data: Vec<Vec<i32>>,
        compression: Compression,
    ) -> ParquetMetaData
    where
        W: Write + Send,
        R: ChunkReader + From<W> + 'static,
    {
        test_roundtrip::<W, R, Int32Type, _>(file, data, |r| r.get_int(0).unwrap(), compression)
    }

    fn test_roundtrip<W, R, D, F>(
        mut file: W,
        data: Vec<Vec<D::T>>,
        value: F,
        compression: Compression,
    ) -> ParquetMetaData
    where
        W: Write + Send,
        R: ChunkReader + From<W> + 'static,
        D: DataType,
        F: Fn(Row) -> D::T,
    {
        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![Arc::new(
                    types::Type::primitive_type_builder("col1", D::get_physical_type())
                        .with_repetition(Repetition::REQUIRED)
                        .build()
                        .unwrap(),
                )])
                .build()
                .unwrap(),
        );
        let props = Arc::new(
            WriterProperties::builder()
                .set_compression(compression)
                .build(),
        );
        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
        let mut rows: i64 = 0;

        for (idx, subset) in data.iter().enumerate() {
            let row_group_file_offset = file_writer.buf.bytes_written();
            let mut row_group_writer = file_writer.next_row_group().unwrap();
            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
                rows += writer
                    .typed::<D>()
                    .write_batch(&subset[..], None, None)
                    .unwrap() as i64;
                writer.close().unwrap();
            }
            let last_group = row_group_writer.close().unwrap();
            let flushed = file_writer.flushed_row_groups();
            assert_eq!(flushed.len(), idx + 1);
            assert_eq!(Some(idx as i16), last_group.ordinal());
            assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset());
            assert_eq!(&flushed[idx], last_group.as_ref());
        }
        let file_metadata = file_writer.close().unwrap();

        let reader = SerializedFileReader::new(R::from(file)).unwrap();
        assert_eq!(reader.num_row_groups(), data.len());
        assert_eq!(
            reader.metadata().file_metadata().num_rows(),
            rows,
            "row count in metadata not equal to number of rows written"
        );
        for (i, item) in data.iter().enumerate().take(reader.num_row_groups()) {
            let row_group_reader = reader.get_row_group(i).unwrap();
            let iter = row_group_reader.get_row_iter(None).unwrap();
            let res: Vec<_> = iter.map(|row| row.unwrap()).map(&value).collect();
            let row_group_size = row_group_reader.metadata().total_byte_size();
            let uncompressed_size: i64 = row_group_reader
                .metadata()
                .columns()
                .iter()
                .map(|v| v.uncompressed_size())
                .sum();
            assert_eq!(row_group_size, uncompressed_size);
            assert_eq!(res, *item);
        }
        file_metadata
    }

    fn test_file_roundtrip(file: File, data: Vec<Vec<i32>>) -> ParquetMetaData {
        test_roundtrip_i32::<File, File>(file, data, Compression::UNCOMPRESSED)
    }

    #[test]
    fn test_bytes_writer_empty_row_groups() {
        test_bytes_roundtrip(vec![], Compression::UNCOMPRESSED);
    }

    #[test]
    fn test_bytes_writer_single_row_group() {
        test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::UNCOMPRESSED);
    }

    #[test]
    fn test_bytes_writer_multiple_row_groups() {
        test_bytes_roundtrip(
            vec![
                vec![1, 2, 3, 4, 5],
                vec![1, 2, 3],
                vec![1],
                vec![1, 2, 3, 4, 5, 6],
            ],
            Compression::UNCOMPRESSED,
        );
    }

    #[test]
    fn test_bytes_writer_single_row_group_compressed() {
        test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::SNAPPY);
    }

    #[test]
    fn test_bytes_writer_multiple_row_groups_compressed() {
        test_bytes_roundtrip(
            vec![
                vec![1, 2, 3, 4, 5],
                vec![1, 2, 3],
                vec![1],
                vec![1, 2, 3, 4, 5, 6],
            ],
            Compression::SNAPPY,
        );
    }

    fn test_bytes_roundtrip(data: Vec<Vec<i32>>, compression: Compression) {
        test_roundtrip_i32::<Vec<u8>, Bytes>(Vec::with_capacity(1024), data, compression);
    }

    #[test]
    fn test_boolean_roundtrip() {
        let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
        test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
            Vec::with_capacity(1024),
            vec![my_bool_values],
            |r| r.get_bool(0).unwrap(),
            Compression::UNCOMPRESSED,
        );
    }

    #[test]
    fn test_boolean_compressed_roundtrip() {
        let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
        test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
            Vec::with_capacity(1024),
            vec![my_bool_values],
            |r| r.get_bool(0).unwrap(),
            Compression::SNAPPY,
        );
    }

    #[test]
    fn test_column_offset_index_file() {
        let file = tempfile::tempfile().unwrap();
        let file_metadata = test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
        file_metadata.row_groups().iter().for_each(|row_group| {
            row_group.columns().iter().for_each(|column_chunk| {
                assert!(column_chunk.column_index_offset().is_some());
                assert!(column_chunk.column_index_length().is_some());
                assert!(column_chunk.offset_index_offset().is_some());
                assert!(column_chunk.offset_index_length().is_some());
            })
        });
    }

    fn test_kv_metadata(initial_kv: Option<Vec<KeyValue>>, final_kv: Option<Vec<KeyValue>>) {
        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![Arc::new(
                    types::Type::primitive_type_builder("col1", Type::INT32)
                        .with_repetition(Repetition::REQUIRED)
                        .build()
                        .unwrap(),
                )])
                .build()
                .unwrap(),
        );
        let mut out = Vec::with_capacity(1024);
        let props = Arc::new(
            WriterProperties::builder()
                .set_key_value_metadata(initial_kv.clone())
                .build(),
        );
        let mut writer = SerializedFileWriter::new(&mut out, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();
        let column = row_group_writer.next_column().unwrap().unwrap();
        column.close().unwrap();
        row_group_writer.close().unwrap();
        if let Some(kvs) = &final_kv {
            for kv in kvs {
                writer.append_key_value_metadata(kv.clone())
            }
        }
        writer.close().unwrap();

        let reader = SerializedFileReader::new(Bytes::from(out)).unwrap();
        let metadata = reader.metadata().file_metadata();
        let keys = metadata.key_value_metadata();

        match (initial_kv, final_kv) {
            (Some(a), Some(b)) => {
                let keys = keys.unwrap();
                assert_eq!(keys.len(), a.len() + b.len());
                assert_eq!(&keys[..a.len()], a.as_slice());
                assert_eq!(&keys[a.len()..], b.as_slice());
            }
            (Some(v), None) => assert_eq!(keys.unwrap(), &v),
            (None, Some(v)) if !v.is_empty() => assert_eq!(keys.unwrap(), &v),
            _ => assert!(keys.is_none()),
        }
    }

    #[test]
    fn test_append_metadata() {
        let kv1 = KeyValue::new("cupcakes".to_string(), "awesome".to_string());
        let kv2 = KeyValue::new("bingo".to_string(), "bongo".to_string());

        test_kv_metadata(None, None);
        test_kv_metadata(Some(vec![kv1.clone()]), None);
        test_kv_metadata(None, Some(vec![kv2.clone()]));
        test_kv_metadata(Some(vec![kv1.clone()]), Some(vec![kv2.clone()]));
        test_kv_metadata(Some(vec![]), Some(vec![kv2]));
        test_kv_metadata(Some(vec![]), Some(vec![]));
        test_kv_metadata(Some(vec![kv1]), Some(vec![]));
        test_kv_metadata(None, Some(vec![]));
    }

    #[test]
    fn test_backwards_compatible_statistics() {
        let message_type = "
            message test_schema {
                REQUIRED INT32 decimal1 (DECIMAL(8,2));
                REQUIRED INT32 i32 (INTEGER(32,true));
                REQUIRED INT32 u32 (INTEGER(32,false));
            }
        ";

        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let props = Default::default();
        let mut writer = SerializedFileWriter::new(vec![], schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        for _ in 0..3 {
            let mut writer = row_group_writer.next_column().unwrap().unwrap();
            writer
                .typed::<Int32Type>()
                .write_batch(&[1, 2, 3], None, None)
                .unwrap();
            writer.close().unwrap();
        }
        let metadata = row_group_writer.close().unwrap();
        writer.close().unwrap();

        let s = page_stats_to_thrift(metadata.column(0).statistics()).unwrap();
        assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
        assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));

        let s = page_stats_to_thrift(metadata.column(1).statistics()).unwrap();
        assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
        assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));

        let s = page_stats_to_thrift(metadata.column(2).statistics()).unwrap();
        assert_eq!(s.min.as_deref(), None);
        assert_eq!(s.max.as_deref(), None);
        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
    }

    #[test]
    fn test_spliced_write() {
        let message_type = "
            message test_schema {
                REQUIRED INT32 i32 (INTEGER(32,true));
                REQUIRED INT32 u32 (INTEGER(32,false));
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let props = Arc::new(WriterProperties::builder().build());

        let mut file = Vec::with_capacity(1024);
        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props.clone()).unwrap();

        let columns = file_writer.descr.columns();
        let mut column_state: Vec<(_, Option<ColumnCloseResult>)> = columns
            .iter()
            .map(|_| (TrackedWrite::new(Vec::with_capacity(1024)), None))
            .collect();

        let mut column_state_slice = column_state.as_mut_slice();
        let mut column_writers = Vec::with_capacity(columns.len());
        for c in columns {
            let ((buf, out), tail) = column_state_slice.split_first_mut().unwrap();
            column_state_slice = tail;

            let page_writer = Box::new(SerializedPageWriter::new(buf));
            let col_writer = get_column_writer(c.clone(), props.clone(), page_writer);
            column_writers.push(SerializedColumnWriter::new(
                col_writer,
                Some(Box::new(|on_close| {
                    *out = Some(on_close);
                    Ok(())
                })),
            ));
        }

        let column_data = [[1, 2, 3, 4], [7, 3, 7, 3]];

        for (writer, batch) in column_writers.iter_mut().zip(column_data) {
            let writer = writer.typed::<Int32Type>();
            writer.write_batch(&batch, None, None).unwrap();
        }

        for writer in column_writers {
            writer.close().unwrap()
        }

        let mut row_group_writer = file_writer.next_row_group().unwrap();
        for (write, close) in column_state {
            let buf = Bytes::from(write.into_inner().unwrap());
            row_group_writer
                .append_column(&buf, close.unwrap())
                .unwrap();
        }
        row_group_writer.close().unwrap();
        file_writer.close().unwrap();

        let file = Bytes::from(file);
        let test_read = |reader: SerializedFileReader<Bytes>| {
            let row_group = reader.get_row_group(0).unwrap();

            let mut out = Vec::with_capacity(4);
            let c1 = row_group.get_column_reader(0).unwrap();
            let mut c1 = get_typed_column_reader::<Int32Type>(c1);
            c1.read_records(4, None, None, &mut out).unwrap();
            assert_eq!(out, column_data[0]);

            out.clear();

            let c2 = row_group.get_column_reader(1).unwrap();
            let mut c2 = get_typed_column_reader::<Int32Type>(c2);
            c2.read_records(4, None, None, &mut out).unwrap();
            assert_eq!(out, column_data[1]);
        };

        let reader = SerializedFileReader::new(file.clone()).unwrap();
        test_read(reader);

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
        test_read(reader);
    }

2027 #[test]
2028 fn test_disabled_statistics() {
2029 let message_type = "
2030 message test_schema {
2031 REQUIRED INT32 a;
2032 REQUIRED INT32 b;
2033 }
2034 ";
2035 let schema = Arc::new(parse_message_type(message_type).unwrap());
2036 let props = WriterProperties::builder()
2037 .set_statistics_enabled(EnabledStatistics::None)
2038 .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
2039 .set_offset_index_disabled(true) .build();
        let mut file = Vec::with_capacity(1024);
        let mut file_writer =
            SerializedFileWriter::new(&mut file, schema, Arc::new(props)).unwrap();

        let mut row_group_writer = file_writer.next_row_group().unwrap();
        let mut a_writer = row_group_writer.next_column().unwrap().unwrap();
        let col_writer = a_writer.typed::<Int32Type>();
        col_writer.write_batch(&[1, 2, 3], None, None).unwrap();
        a_writer.close().unwrap();

        let mut b_writer = row_group_writer.next_column().unwrap().unwrap();
        let col_writer = b_writer.typed::<Int32Type>();
        col_writer.write_batch(&[4, 5, 6], None, None).unwrap();
        b_writer.close().unwrap();
        row_group_writer.close().unwrap();

        let metadata = file_writer.finish().unwrap();
        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        assert_eq!(row_group.num_columns(), 2);
        // Offset indexes are written for both columns; a column index is only
        // written for "a", the column with page statistics enabled.
        assert!(row_group.column(0).offset_index_offset().is_some());
        assert!(row_group.column(0).column_index_offset().is_some());
        assert!(row_group.column(1).offset_index_offset().is_some());
        assert!(row_group.column(1).column_index_offset().is_none());

        let err = file_writer.next_row_group().err().unwrap().to_string();
        assert_eq!(err, "Parquet error: SerializedFileWriter already finished");

        drop(file_writer);

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(Bytes::from(file), options).unwrap();

        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1); // 1 row group
        assert_eq!(offset_index[0].len(), 2); // 2 columns

        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1); // 1 row group
        assert_eq!(column_index[0].len(), 2); // 2 columns

        let a_idx = &column_index[0][0];
        assert!(matches!(a_idx, ColumnIndexMetaData::INT32(_)), "{a_idx:?}");
        let b_idx = &column_index[0][1];
        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
    }

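    /// Writes a nullable BYTE_ARRAY column and checks the size statistics:
    /// the definition-level histogram and `unencoded_byte_array_data_bytes`
    /// should appear in the column chunk metadata, the column index, and the
    /// offset index, while no repetition-level histogram is written for a
    /// flat column.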
    #[test]
    fn test_byte_array_size_statistics() {
        let message_type = "
            message test_schema {
                OPTIONAL BYTE_ARRAY a (UTF8);
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let data = ByteArrayType::gen_vec(32, 7);
        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
        let unenc_size: i64 = data.iter().map(|x| x.len() as i64).sum();
        let file: File = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        );

        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<ByteArrayType>()
            .write_batch(&data, Some(&def_levels), None)
            .unwrap();
        col_writer.close().unwrap();
        row_group_writer.close().unwrap();
        let file_metadata = writer.close().unwrap();

        assert_eq!(file_metadata.num_row_groups(), 1);
        assert_eq!(file_metadata.row_group(0).num_columns(), 1);

        // 3 nulls and 7 values, per `def_levels` above
        let check_def_hist = |def_hist: &[i64]| {
            assert_eq!(def_hist.len(), 2);
            assert_eq!(def_hist[0], 3);
            assert_eq!(def_hist[1], 7);
        };

        let meta_data = file_metadata.row_group(0).column(0);

        assert!(meta_data.repetition_level_histogram().is_none());
        assert!(meta_data.definition_level_histogram().is_some());
        assert!(meta_data.unencoded_byte_array_data_bytes().is_some());
        assert_eq!(
            unenc_size,
            meta_data.unencoded_byte_array_data_bytes().unwrap()
        );
        check_def_hist(meta_data.definition_level_histogram().unwrap().values());

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rfile_metadata = reader.metadata().file_metadata();
        assert_eq!(
            rfile_metadata.num_rows(),
            file_metadata.file_metadata().num_rows()
        );
        assert_eq!(reader.num_row_groups(), 1);
        let rowgroup = reader.get_row_group(0).unwrap();
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.definition_level_histogram().is_some());
        assert!(column.repetition_level_histogram().is_none());
        assert!(column.unencoded_byte_array_data_bytes().is_some());
        check_def_hist(column.definition_level_histogram().unwrap().values());
        assert_eq!(
            unenc_size,
            column.unencoded_byte_array_data_bytes().unwrap()
        );

        assert!(reader.metadata().column_index().is_some());
        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1);
        assert_eq!(column_index[0].len(), 1);
        let col_idx = if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][0] {
            assert_eq!(index.num_pages(), 1);
            index
        } else {
            unreachable!()
        };

        assert!(col_idx.repetition_level_histogram(0).is_none());
        assert!(col_idx.definition_level_histogram(0).is_some());
        check_def_hist(col_idx.definition_level_histogram(0).unwrap());

        assert!(reader.metadata().offset_index().is_some());
        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1);
        assert_eq!(offset_index[0].len(), 1);
        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_some());
        let page_sizes = offset_index[0][0]
            .unencoded_byte_array_data_bytes
            .as_ref()
            .unwrap();
        assert_eq!(page_sizes.len(), 1);
        assert_eq!(page_sizes[0], unenc_size);
    }

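    /// The Parquet format caps the number of row groups per file at 32767
    /// (`i16::MAX`); exceeding it must produce a descriptive error rather
    /// than a corrupt file.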
    #[test]
    fn test_too_many_rowgroups() {
        let message_type = "
            message test_schema {
                REQUIRED BYTE_ARRAY a (UTF8);
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let file: File = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::None)
                .set_max_row_group_row_count(Some(1))
                .build(),
        );
        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();

        // 0x8000 == 32768; only the final call to `next_row_group` should fail
        for i in 0..0x8001 {
            match writer.next_row_group() {
                Ok(mut row_group_writer) => {
                    assert_ne!(i, 0x8000);
                    let col_writer = row_group_writer.next_column().unwrap().unwrap();
                    col_writer.close().unwrap();
                    row_group_writer.close().unwrap();
                }
                Err(e) => {
                    assert_eq!(i, 0x8000);
                    assert_eq!(
                        e.to_string(),
                        "Parquet error: Parquet does not support more than 32767 row groups per file (currently: 32768)"
                    );
                }
            }
        }
        writer.close().unwrap();
    }

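    /// Writes a nullable INT32 list column (including a null list, an empty
    /// list, and a null element) and checks that both definition- and
    /// repetition-level histograms round-trip, while
    /// `unencoded_byte_array_data_bytes` stays unset for a non-byte-array
    /// column.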
    #[test]
    fn test_size_statistics_with_repetition_and_nulls() {
        let message_type = "
            message test_schema {
                OPTIONAL group i32_list (LIST) {
                    REPEATED group list {
                        OPTIONAL INT32 element;
                    }
                }
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        // The levels below encode the column:
        //   row 0: [1, 2]
        //   row 1: null
        //   row 2: [4, null]
        //   row 3: []
        //   row 4: [7, 8, 9, 10]
        let data = [1, 2, 4, 7, 8, 9, 10];
        let def_levels = [3, 3, 0, 3, 2, 1, 3, 3, 3, 3];
        let rep_levels = [0, 1, 0, 0, 1, 0, 0, 1, 1, 1];
        let file = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        );
        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&data, Some(&def_levels), Some(&rep_levels))
            .unwrap();
        col_writer.close().unwrap();
        row_group_writer.close().unwrap();
        let file_metadata = writer.close().unwrap();

        assert_eq!(file_metadata.num_row_groups(), 1);
        assert_eq!(file_metadata.row_group(0).num_columns(), 1);

        let check_def_hist = |def_hist: &[i64]| {
            assert_eq!(def_hist.len(), 4);
            assert_eq!(def_hist[0], 1);
            assert_eq!(def_hist[1], 1);
            assert_eq!(def_hist[2], 1);
            assert_eq!(def_hist[3], 7);
        };

        let check_rep_hist = |rep_hist: &[i64]| {
            assert_eq!(rep_hist.len(), 2);
            assert_eq!(rep_hist[0], 5);
            assert_eq!(rep_hist[1], 5);
        };

        let meta_data = file_metadata.row_group(0).column(0);
        assert!(meta_data.repetition_level_histogram().is_some());
        assert!(meta_data.definition_level_histogram().is_some());
        assert!(meta_data.unencoded_byte_array_data_bytes().is_none());
        check_def_hist(meta_data.definition_level_histogram().unwrap().values());
        check_rep_hist(meta_data.repetition_level_histogram().unwrap().values());

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rfile_metadata = reader.metadata().file_metadata();
        assert_eq!(
            rfile_metadata.num_rows(),
            file_metadata.file_metadata().num_rows()
        );
        assert_eq!(reader.num_row_groups(), 1);
        let rowgroup = reader.get_row_group(0).unwrap();
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.definition_level_histogram().is_some());
        assert!(column.repetition_level_histogram().is_some());
        assert!(column.unencoded_byte_array_data_bytes().is_none());
        check_def_hist(column.definition_level_histogram().unwrap().values());
        check_rep_hist(column.repetition_level_histogram().unwrap().values());

        assert!(reader.metadata().column_index().is_some());
        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1);
        assert_eq!(column_index[0].len(), 1);
        let col_idx = if let ColumnIndexMetaData::INT32(index) = &column_index[0][0] {
            assert_eq!(index.num_pages(), 1);
            index
        } else {
            unreachable!()
        };

        check_def_hist(col_idx.definition_level_histogram(0).unwrap());
        check_rep_hist(col_idx.repetition_level_histogram(0).unwrap());

        assert!(reader.metadata().offset_index().is_some());
        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1);
        assert_eq!(offset_index[0].len(), 1);
        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_none());
    }

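    /// Round-trips `byte_stream_split_extended.gzip.parquet`: rewrites each
    /// `*_byte_stream_split` column with BYTE_STREAM_SPLIT encoding, then
    /// confirms the encoding was used and that every encoded column still
    /// matches its plain-encoded twin row by row.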
    #[test]
    #[cfg(feature = "arrow")]
    fn test_byte_stream_split_extended_roundtrip() {
        let path = format!(
            "{}/byte_stream_split_extended.gzip.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();

        let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .expect("parquet open")
            .build()
            .expect("parquet reader build");

        let file = tempfile::tempfile().unwrap();
        let props = WriterProperties::builder()
            .set_dictionary_enabled(false)
            .set_column_encoding(
                ColumnPath::from("float16_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("float_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("double_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("int32_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("int64_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("flba5_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("decimal_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .build();

        let mut parquet_writer = ArrowWriter::try_new(
            file.try_clone().expect("cannot open file"),
            parquet_reader.schema(),
            Some(props),
        )
        .expect("create arrow writer");

        for maybe_batch in parquet_reader {
            let batch = maybe_batch.expect("reading batch");
            parquet_writer.write(&batch).expect("writing data");
        }

        parquet_writer.close().expect("finalizing file");

        let reader = SerializedFileReader::new(file).expect("Failed to create reader");
        let filemeta = reader.metadata();

        let check_encoding = |x: usize, filemeta: &ParquetMetaData| {
            assert!(
                filemeta
                    .row_group(0)
                    .column(x)
                    .encodings()
                    .collect::<Vec<_>>()
                    .contains(&Encoding::BYTE_STREAM_SPLIT)
            );
        };

        // The odd-numbered columns are the `*_byte_stream_split` variants
        check_encoding(1, filemeta);
        check_encoding(3, filemeta);
        check_encoding(5, filemeta);
        check_encoding(7, filemeta);
        check_encoding(9, filemeta);
        check_encoding(11, filemeta);
        check_encoding(13, filemeta);

        let mut iter = reader
            .get_row_iter(None)
            .expect("Failed to create row iterator");

        let mut start = 0;
        let end = reader.metadata().file_metadata().num_rows();

        // Each plain column must equal its byte-stream-split twin
        let check_row = |row: Result<Row, ParquetError>| {
            assert!(row.is_ok());
            let r = row.unwrap();
            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
        };

        while start < end {
            match iter.next() {
                Some(row) => check_row(row),
                None => break,
            };
            start += 1;
        }
    }

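    /// Rewriting an existing file with `append_column` should succeed when no
    /// column or offset indexes are supplied, even though the source file
    /// carries page indexes.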
    #[test]
    fn test_rewrite_no_page_indexes() {
        let file = get_test_file("alltypes_tiny_pages.parquet");
        let metadata = ParquetMetaDataReader::new()
            .with_page_index_policy(PageIndexPolicy::Optional)
            .parse_and_finish(&file)
            .unwrap();

        let props = Arc::new(WriterProperties::builder().build());
        let schema = metadata.file_metadata().schema_descr().root_schema_ptr();
        let output = Vec::<u8>::new();
        let mut writer = SerializedFileWriter::new(output, schema, props).unwrap();

        for rg in metadata.row_groups() {
            let mut rg_out = writer.next_row_group().unwrap();
            for column in rg.columns() {
                let result = ColumnCloseResult {
                    bytes_written: column.compressed_size() as _,
                    rows_written: rg.num_rows() as _,
                    metadata: column.clone(),
                    bloom_filter: None,
                    column_index: None,
                    offset_index: None,
                };
                rg_out.append_column(&file, result).unwrap();
            }
            rg_out.close().unwrap();
        }
        writer.close().unwrap();
    }

2479
2480 #[test]
2481 fn test_rewrite_missing_column_index() {
2482 let file = get_test_file("alltypes_tiny_pages.parquet");
2484 let metadata = ParquetMetaDataReader::new()
2485 .with_page_index_policy(PageIndexPolicy::Optional)
2486 .parse_and_finish(&file)
2487 .unwrap();
2488
2489 let props = Arc::new(WriterProperties::builder().build());
2490 let schema = metadata.file_metadata().schema_descr().root_schema_ptr();
2491 let output = Vec::<u8>::new();
2492 let mut writer = SerializedFileWriter::new(output, schema, props).unwrap();
2493
2494 let column_indexes = metadata.column_index();
2495 let offset_indexes = metadata.offset_index();
2496
2497 for (rg_idx, rg) in metadata.row_groups().iter().enumerate() {
2498 let rg_column_indexes = column_indexes.and_then(|ci| ci.get(rg_idx));
2499 let rg_offset_indexes = offset_indexes.and_then(|oi| oi.get(rg_idx));
2500 let mut rg_out = writer.next_row_group().unwrap();
2501 for (col_idx, column) in rg.columns().iter().enumerate() {
2502 let column_index = rg_column_indexes.and_then(|row| row.get(col_idx)).cloned();
2503 let offset_index = rg_offset_indexes.and_then(|row| row.get(col_idx)).cloned();
2504 let result = ColumnCloseResult {
2505 bytes_written: column.compressed_size() as _,
2506 rows_written: rg.num_rows() as _,
2507 metadata: column.clone(),
2508 bloom_filter: None,
2509 column_index,
2510 offset_index,
2511 };
2512 rg_out.append_column(&file, result).unwrap();
2513 }
2514 rg_out.close().unwrap();
2515 }
2516 writer.close().unwrap();
2517 }
2518}