1use crate::bloom_filter::Sbbf;
21use crate::file::metadata::thrift::PageHeader;
22use crate::file::page_index::column_index::ColumnIndexMetaData;
23use crate::file::page_index::offset_index::OffsetIndexMetaData;
24use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
25use std::fmt::Debug;
26use std::io::{BufWriter, IoSlice, Read};
27use std::{io::Write, sync::Arc};
28
29use crate::column::page_encryption::PageEncryptor;
30use crate::column::writer::{ColumnCloseResult, ColumnWriterImpl, get_typed_column_writer_mut};
31use crate::column::{
32 page::{CompressedPage, PageWriteSpec, PageWriter},
33 writer::{ColumnWriter, get_column_writer},
34};
35use crate::data_type::DataType;
36#[cfg(feature = "encryption")]
37use crate::encryption::encrypt::{
38 FileEncryptionProperties, FileEncryptor, get_column_crypto_metadata,
39};
40use crate::errors::{ParquetError, Result};
41#[cfg(feature = "encryption")]
42use crate::file::PARQUET_MAGIC_ENCR_FOOTER;
43use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
44use crate::file::reader::ChunkReader;
45use crate::file::{PARQUET_MAGIC, metadata::*};
46use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr};
47
48pub struct TrackedWrite<W: Write> {
52 inner: BufWriter<W>,
53 bytes_written: usize,
54}
55
56impl<W: Write> TrackedWrite<W> {
57 pub fn new(inner: W) -> Self {
59 let buf_write = BufWriter::new(inner);
60 Self {
61 inner: buf_write,
62 bytes_written: 0,
63 }
64 }
65
66 pub fn bytes_written(&self) -> usize {
68 self.bytes_written
69 }
70
71 pub fn inner(&self) -> &W {
73 self.inner.get_ref()
74 }
75
76 pub fn inner_mut(&mut self) -> &mut W {
81 self.inner.get_mut()
82 }
83
84 pub fn into_inner(self) -> Result<W> {
86 self.inner.into_inner().map_err(|err| {
87 ParquetError::General(format!("fail to get inner writer: {:?}", err.to_string()))
88 })
89 }
90}
91
92impl<W: Write> Write for TrackedWrite<W> {
93 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
94 let bytes = self.inner.write(buf)?;
95 self.bytes_written += bytes;
96 Ok(bytes)
97 }
98
99 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> std::io::Result<usize> {
100 let bytes = self.inner.write_vectored(bufs)?;
101 self.bytes_written += bytes;
102 Ok(bytes)
103 }
104
105 fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
106 self.inner.write_all(buf)?;
107 self.bytes_written += buf.len();
108
109 Ok(())
110 }
111
112 fn flush(&mut self) -> std::io::Result<()> {
113 self.inner.flush()
114 }
115}
116
117pub type OnCloseColumnChunk<'a> = Box<dyn FnOnce(ColumnCloseResult) -> Result<()> + 'a>;
119
120pub type OnCloseRowGroup<'a, W> = Box<
126 dyn FnOnce(
127 &'a mut TrackedWrite<W>,
128 RowGroupMetaData,
129 Vec<Option<Sbbf>>,
130 Vec<Option<ColumnIndexMetaData>>,
131 Vec<Option<OffsetIndexMetaData>>,
132 ) -> Result<()>
133 + 'a
134 + Send,
135>;
136
137pub struct SerializedFileWriter<W: Write> {
157 buf: TrackedWrite<W>,
158 descr: SchemaDescPtr,
159 props: WriterPropertiesPtr,
160 row_groups: Vec<RowGroupMetaData>,
161 bloom_filters: Vec<Vec<Option<Sbbf>>>,
162 column_indexes: Vec<Vec<Option<ColumnIndexMetaData>>>,
163 offset_indexes: Vec<Vec<Option<OffsetIndexMetaData>>>,
164 row_group_index: usize,
165 kv_metadatas: Vec<KeyValue>,
167 finished: bool,
168 #[cfg(feature = "encryption")]
169 file_encryptor: Option<Arc<FileEncryptor>>,
170}
171
172impl<W: Write> Debug for SerializedFileWriter<W> {
173 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174 f.debug_struct("SerializedFileWriter")
177 .field("descr", &self.descr)
178 .field("row_group_index", &self.row_group_index)
179 .field("kv_metadatas", &self.kv_metadatas)
180 .finish_non_exhaustive()
181 }
182}
183
184impl<W: Write + Send> SerializedFileWriter<W> {
185 pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
187 let mut buf = TrackedWrite::new(buf);
188
189 let schema_descriptor = SchemaDescriptor::new(schema.clone());
190
191 #[cfg(feature = "encryption")]
192 let file_encryptor = Self::get_file_encryptor(&properties, &schema_descriptor)?;
193
194 Self::start_file(&properties, &mut buf)?;
195 Ok(Self {
196 buf,
197 descr: Arc::new(schema_descriptor),
198 props: properties,
199 row_groups: vec![],
200 bloom_filters: vec![],
201 column_indexes: Vec::new(),
202 offset_indexes: Vec::new(),
203 row_group_index: 0,
204 kv_metadatas: Vec::new(),
205 finished: false,
206 #[cfg(feature = "encryption")]
207 file_encryptor,
208 })
209 }
210
211 #[cfg(feature = "encryption")]
212 fn get_file_encryptor(
213 properties: &WriterPropertiesPtr,
214 schema_descriptor: &SchemaDescriptor,
215 ) -> Result<Option<Arc<FileEncryptor>>> {
216 if let Some(file_encryption_properties) = properties.file_encryption_properties() {
217 file_encryption_properties.validate_encrypted_column_names(schema_descriptor)?;
218
219 Ok(Some(Arc::new(FileEncryptor::new(Arc::clone(
220 file_encryption_properties,
221 ))?)))
222 } else {
223 Ok(None)
224 }
225 }
226
227 pub fn next_row_group(&mut self) -> Result<SerializedRowGroupWriter<'_, W>> {
236 self.assert_previous_writer_closed()?;
237 let ordinal = self.row_group_index;
238
239 let ordinal: i16 = ordinal.try_into().map_err(|_| {
240 ParquetError::General(format!(
241 "Parquet does not support more than {} row groups per file (currently: {})",
242 i16::MAX,
243 ordinal
244 ))
245 })?;
246
247 self.row_group_index = self
248 .row_group_index
249 .checked_add(1)
250 .expect("SerializedFileWriter::row_group_index overflowed");
251
252 let bloom_filter_position = self.properties().bloom_filter_position();
253 let row_groups = &mut self.row_groups;
254 let row_bloom_filters = &mut self.bloom_filters;
255 let row_column_indexes = &mut self.column_indexes;
256 let row_offset_indexes = &mut self.offset_indexes;
257 let on_close = move |buf,
258 mut metadata,
259 row_group_bloom_filter,
260 row_group_column_index,
261 row_group_offset_index| {
262 row_bloom_filters.push(row_group_bloom_filter);
263 row_column_indexes.push(row_group_column_index);
264 row_offset_indexes.push(row_group_offset_index);
265 match bloom_filter_position {
267 BloomFilterPosition::AfterRowGroup => {
268 write_bloom_filters(buf, row_bloom_filters, &mut metadata)?
269 }
270 BloomFilterPosition::End => (),
271 };
272 row_groups.push(metadata);
273 Ok(())
274 };
275
276 let row_group_writer = SerializedRowGroupWriter::new(
277 self.descr.clone(),
278 self.props.clone(),
279 &mut self.buf,
280 ordinal,
281 Some(Box::new(on_close)),
282 );
283 #[cfg(feature = "encryption")]
284 let row_group_writer = row_group_writer.with_file_encryptor(self.file_encryptor.clone());
285
286 Ok(row_group_writer)
287 }
288
289 pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
291 &self.row_groups
292 }
293
294 pub fn finish(&mut self) -> Result<ParquetMetaData> {
300 self.assert_previous_writer_closed()?;
301 let metadata = self.write_metadata()?;
302 self.buf.flush()?;
303 Ok(metadata)
304 }
305
306 pub fn close(mut self) -> Result<ParquetMetaData> {
308 self.finish()
309 }
310
311 #[cfg(not(feature = "encryption"))]
313 fn start_file(_properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
314 buf.write_all(get_file_magic())?;
315 Ok(())
316 }
317
318 #[cfg(feature = "encryption")]
320 fn start_file(properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
321 let magic = get_file_magic(properties.file_encryption_properties.as_ref());
322
323 buf.write_all(magic)?;
324 Ok(())
325 }
326
327 fn write_metadata(&mut self) -> Result<ParquetMetaData> {
330 self.finished = true;
331
332 for row_group in &mut self.row_groups {
334 write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?;
335 }
336
337 let key_value_metadata = match self.props.key_value_metadata() {
338 Some(kv) => Some(kv.iter().chain(&self.kv_metadatas).cloned().collect()),
339 None if self.kv_metadatas.is_empty() => None,
340 None => Some(self.kv_metadatas.clone()),
341 };
342
343 let row_groups = std::mem::take(&mut self.row_groups);
345 let column_indexes = std::mem::take(&mut self.column_indexes);
346 let offset_indexes = std::mem::take(&mut self.offset_indexes);
347
348 let mut encoder = ThriftMetadataWriter::new(
349 &mut self.buf,
350 &self.descr,
351 row_groups,
352 Some(self.props.created_by().to_string()),
353 self.props.writer_version().as_num(),
354 );
355
356 #[cfg(feature = "encryption")]
357 {
358 encoder = encoder.with_file_encryptor(self.file_encryptor.clone());
359 }
360
361 if let Some(key_value_metadata) = key_value_metadata {
362 encoder = encoder.with_key_value_metadata(key_value_metadata)
363 }
364
365 encoder = encoder.with_column_indexes(column_indexes);
366 if !self.props.offset_index_disabled() {
367 encoder = encoder.with_offset_indexes(offset_indexes);
368 }
369 encoder.finish()
370 }
371
372 #[inline]
373 fn assert_previous_writer_closed(&self) -> Result<()> {
374 if self.finished {
375 return Err(general_err!("SerializedFileWriter already finished"));
376 }
377
378 if self.row_group_index != self.row_groups.len() {
379 Err(general_err!("Previous row group writer was not closed"))
380 } else {
381 Ok(())
382 }
383 }
384
385 pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
387 self.kv_metadatas.push(kv_metadata);
388 }
389
390 pub fn schema_descr(&self) -> &SchemaDescriptor {
392 &self.descr
393 }
394
395 #[cfg(feature = "arrow")]
397 pub(crate) fn schema_descr_ptr(&self) -> &SchemaDescPtr {
398 &self.descr
399 }
400
401 pub fn properties(&self) -> &WriterPropertiesPtr {
403 &self.props
404 }
405
406 pub fn inner(&self) -> &W {
408 self.buf.inner()
409 }
410
411 pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
420 self.buf.write_all(buf)
421 }
422
423 pub fn flush(&mut self) -> std::io::Result<()> {
425 self.buf.flush()
426 }
427
428 pub fn inner_mut(&mut self) -> &mut W {
437 self.buf.inner_mut()
438 }
439
440 pub fn into_inner(mut self) -> Result<W> {
442 self.assert_previous_writer_closed()?;
443 let _ = self.write_metadata()?;
444
445 self.buf.into_inner()
446 }
447
448 pub fn bytes_written(&self) -> usize {
450 self.buf.bytes_written()
451 }
452
453 #[cfg(feature = "encryption")]
455 pub(crate) fn file_encryptor(&self) -> Option<Arc<FileEncryptor>> {
456 self.file_encryptor.clone()
457 }
458}
459
460fn write_bloom_filters<W: Write + Send>(
463 buf: &mut TrackedWrite<W>,
464 bloom_filters: &mut [Vec<Option<Sbbf>>],
465 row_group: &mut RowGroupMetaData,
466) -> Result<()> {
467 let row_group_idx: u16 = row_group
472 .ordinal()
473 .expect("Missing row group ordinal")
474 .try_into()
475 .map_err(|_| {
476 ParquetError::General(format!(
477 "Negative row group ordinal: {})",
478 row_group.ordinal().unwrap()
479 ))
480 })?;
481 let row_group_idx = row_group_idx as usize;
482 for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() {
483 if let Some(bloom_filter) = bloom_filters[row_group_idx][column_idx].take() {
484 let start_offset = buf.bytes_written();
485 bloom_filter.write(&mut *buf)?;
486 let end_offset = buf.bytes_written();
487 *column_chunk = column_chunk
489 .clone()
490 .into_builder()
491 .set_bloom_filter_offset(Some(start_offset as i64))
492 .set_bloom_filter_length(Some((end_offset - start_offset) as i32))
493 .build()?;
494 }
495 }
496 Ok(())
497}
498
499pub struct SerializedRowGroupWriter<'a, W: Write> {
512 descr: SchemaDescPtr,
513 props: WriterPropertiesPtr,
514 buf: &'a mut TrackedWrite<W>,
515 total_rows_written: Option<u64>,
516 total_bytes_written: u64,
517 total_uncompressed_bytes: i64,
518 column_index: usize,
519 row_group_metadata: Option<RowGroupMetaDataPtr>,
520 column_chunks: Vec<ColumnChunkMetaData>,
521 bloom_filters: Vec<Option<Sbbf>>,
522 column_indexes: Vec<Option<ColumnIndexMetaData>>,
523 offset_indexes: Vec<Option<OffsetIndexMetaData>>,
524 row_group_index: i16,
525 file_offset: i64,
526 on_close: Option<OnCloseRowGroup<'a, W>>,
527 #[cfg(feature = "encryption")]
528 file_encryptor: Option<Arc<FileEncryptor>>,
529}
530
531impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
532 pub fn new(
541 schema_descr: SchemaDescPtr,
542 properties: WriterPropertiesPtr,
543 buf: &'a mut TrackedWrite<W>,
544 row_group_index: i16,
545 on_close: Option<OnCloseRowGroup<'a, W>>,
546 ) -> Self {
547 let num_columns = schema_descr.num_columns();
548 let file_offset = buf.bytes_written() as i64;
549 Self {
550 buf,
551 row_group_index,
552 file_offset,
553 on_close,
554 total_rows_written: None,
555 descr: schema_descr,
556 props: properties,
557 column_index: 0,
558 row_group_metadata: None,
559 column_chunks: Vec::with_capacity(num_columns),
560 bloom_filters: Vec::with_capacity(num_columns),
561 column_indexes: Vec::with_capacity(num_columns),
562 offset_indexes: Vec::with_capacity(num_columns),
563 total_bytes_written: 0,
564 total_uncompressed_bytes: 0,
565 #[cfg(feature = "encryption")]
566 file_encryptor: None,
567 }
568 }
569
570 #[cfg(feature = "encryption")]
571 pub(crate) fn with_file_encryptor(
573 mut self,
574 file_encryptor: Option<Arc<FileEncryptor>>,
575 ) -> Self {
576 self.file_encryptor = file_encryptor;
577 self
578 }
579
580 fn next_column_desc(&mut self) -> Option<ColumnDescPtr> {
582 let ret = self.descr.columns().get(self.column_index)?.clone();
583 self.column_index += 1;
584 Some(ret)
585 }
586
587 fn get_on_close(&mut self) -> (&mut TrackedWrite<W>, OnCloseColumnChunk<'_>) {
589 let total_bytes_written = &mut self.total_bytes_written;
590 let total_uncompressed_bytes = &mut self.total_uncompressed_bytes;
591 let total_rows_written = &mut self.total_rows_written;
592 let column_chunks = &mut self.column_chunks;
593 let column_indexes = &mut self.column_indexes;
594 let offset_indexes = &mut self.offset_indexes;
595 let bloom_filters = &mut self.bloom_filters;
596
597 let on_close = |r: ColumnCloseResult| {
598 *total_bytes_written += r.bytes_written;
600 *total_uncompressed_bytes += r.metadata.uncompressed_size();
601 column_chunks.push(r.metadata);
602 bloom_filters.push(r.bloom_filter);
603 column_indexes.push(r.column_index);
604 offset_indexes.push(r.offset_index);
605
606 if let Some(rows) = *total_rows_written {
607 if rows != r.rows_written {
608 return Err(general_err!(
609 "Incorrect number of rows, expected {} != {} rows",
610 rows,
611 r.rows_written
612 ));
613 }
614 } else {
615 *total_rows_written = Some(r.rows_written);
616 }
617
618 Ok(())
619 };
620 (self.buf, Box::new(on_close))
621 }
622
623 pub(crate) fn next_column_with_factory<'b, F, C>(&'b mut self, factory: F) -> Result<Option<C>>
626 where
627 F: FnOnce(
628 ColumnDescPtr,
629 WriterPropertiesPtr,
630 Box<dyn PageWriter + 'b>,
631 OnCloseColumnChunk<'b>,
632 ) -> Result<C>,
633 {
634 self.assert_previous_writer_closed()?;
635
636 let encryptor_context = self.get_page_encryptor_context();
637
638 Ok(match self.next_column_desc() {
639 Some(column) => {
640 let props = self.props.clone();
641 let (buf, on_close) = self.get_on_close();
642
643 let page_writer = SerializedPageWriter::new(buf);
644 let page_writer =
645 Self::set_page_writer_encryptor(&column, encryptor_context, page_writer)?;
646
647 Some(factory(
648 column,
649 props,
650 Box::new(page_writer),
651 Box::new(on_close),
652 )?)
653 }
654 None => None,
655 })
656 }
657
658 pub fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>> {
662 self.next_column_with_factory(|descr, props, page_writer, on_close| {
663 let column_writer = get_column_writer(descr, props, page_writer);
664 Ok(SerializedColumnWriter::new(column_writer, Some(on_close)))
665 })
666 }
667
668 pub fn append_column<R: ChunkReader>(
683 &mut self,
684 reader: &R,
685 mut close: ColumnCloseResult,
686 ) -> Result<()> {
687 self.assert_previous_writer_closed()?;
688 let desc = self
689 .next_column_desc()
690 .ok_or_else(|| general_err!("exhausted columns in SerializedRowGroupWriter"))?;
691
692 let metadata = close.metadata;
693
694 if metadata.column_descr() != desc.as_ref() {
695 return Err(general_err!(
696 "column descriptor mismatch, expected {:?} got {:?}",
697 desc,
698 metadata.column_descr()
699 ));
700 }
701
702 let src_dictionary_offset = metadata.dictionary_page_offset();
703 let src_data_offset = metadata.data_page_offset();
704 let src_offset = src_dictionary_offset.unwrap_or(src_data_offset);
705 let src_length = metadata.compressed_size();
706
707 let write_offset = self.buf.bytes_written();
708 let mut read = reader.get_read(src_offset as _)?.take(src_length as _);
709 let write_length = std::io::copy(&mut read, &mut self.buf)?;
710
711 if src_length as u64 != write_length {
712 return Err(general_err!(
713 "Failed to splice column data, expected {read_length} got {write_length}"
714 ));
715 }
716
717 let map_offset = |x| x - src_offset + write_offset as i64;
718 let mut builder = ColumnChunkMetaData::builder(metadata.column_descr_ptr())
719 .set_compression(metadata.compression())
720 .set_encodings_mask(*metadata.encodings_mask())
721 .set_total_compressed_size(metadata.compressed_size())
722 .set_total_uncompressed_size(metadata.uncompressed_size())
723 .set_num_values(metadata.num_values())
724 .set_data_page_offset(map_offset(src_data_offset))
725 .set_dictionary_page_offset(src_dictionary_offset.map(map_offset))
726 .set_unencoded_byte_array_data_bytes(metadata.unencoded_byte_array_data_bytes());
727
728 if let Some(rep_hist) = metadata.repetition_level_histogram() {
729 builder = builder.set_repetition_level_histogram(Some(rep_hist.clone()))
730 }
731 if let Some(def_hist) = metadata.definition_level_histogram() {
732 builder = builder.set_definition_level_histogram(Some(def_hist.clone()))
733 }
734 if let Some(statistics) = metadata.statistics() {
735 builder = builder.set_statistics(statistics.clone())
736 }
737 if let Some(geo_statistics) = metadata.geo_statistics() {
738 builder = builder.set_geo_statistics(Box::new(geo_statistics.clone()))
739 }
740 if let Some(page_encoding_stats) = metadata.page_encoding_stats() {
741 builder = builder.set_page_encoding_stats(page_encoding_stats.clone())
742 }
743 builder = self.set_column_crypto_metadata(builder, &metadata);
744 close.metadata = builder.build()?;
745
746 if let Some(offsets) = close.offset_index.as_mut() {
747 for location in &mut offsets.page_locations {
748 location.offset = map_offset(location.offset)
749 }
750 }
751
752 let (_, on_close) = self.get_on_close();
753 on_close(close)
754 }
755
756 pub fn close(mut self) -> Result<RowGroupMetaDataPtr> {
758 if self.row_group_metadata.is_none() {
759 self.assert_previous_writer_closed()?;
760
761 let column_chunks = std::mem::take(&mut self.column_chunks);
762 let row_group_metadata = RowGroupMetaData::builder(self.descr.clone())
763 .set_column_metadata(column_chunks)
764 .set_total_byte_size(self.total_uncompressed_bytes)
765 .set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
766 .set_sorting_columns(self.props.sorting_columns().cloned())
767 .set_ordinal(self.row_group_index)
768 .set_file_offset(self.file_offset)
769 .build()?;
770
771 self.row_group_metadata = Some(Arc::new(row_group_metadata.clone()));
772
773 if let Some(on_close) = self.on_close.take() {
774 on_close(
775 self.buf,
776 row_group_metadata,
777 self.bloom_filters,
778 self.column_indexes,
779 self.offset_indexes,
780 )?
781 }
782 }
783
784 let metadata = self.row_group_metadata.as_ref().unwrap().clone();
785 Ok(metadata)
786 }
787
788 #[cfg(feature = "encryption")]
790 fn set_column_crypto_metadata(
791 &self,
792 builder: ColumnChunkMetaDataBuilder,
793 metadata: &ColumnChunkMetaData,
794 ) -> ColumnChunkMetaDataBuilder {
795 if let Some(file_encryptor) = self.file_encryptor.as_ref() {
796 builder.set_column_crypto_metadata(get_column_crypto_metadata(
797 file_encryptor.properties(),
798 &metadata.column_descr_ptr(),
799 ))
800 } else {
801 builder
802 }
803 }
804
805 #[cfg(feature = "encryption")]
807 fn get_page_encryptor_context(&self) -> PageEncryptorContext {
808 PageEncryptorContext {
809 file_encryptor: self.file_encryptor.clone(),
810 row_group_index: self.row_group_index as usize,
811 column_index: self.column_index,
812 }
813 }
814
815 #[cfg(feature = "encryption")]
817 fn set_page_writer_encryptor<'b>(
818 column: &ColumnDescPtr,
819 context: PageEncryptorContext,
820 page_writer: SerializedPageWriter<'b, W>,
821 ) -> Result<SerializedPageWriter<'b, W>> {
822 let page_encryptor = PageEncryptor::create_if_column_encrypted(
823 &context.file_encryptor,
824 context.row_group_index,
825 context.column_index,
826 &column.path().string(),
827 )?;
828
829 Ok(page_writer.with_page_encryptor(page_encryptor))
830 }
831
832 #[cfg(not(feature = "encryption"))]
834 fn set_column_crypto_metadata(
835 &self,
836 builder: ColumnChunkMetaDataBuilder,
837 _metadata: &ColumnChunkMetaData,
838 ) -> ColumnChunkMetaDataBuilder {
839 builder
840 }
841
842 #[cfg(not(feature = "encryption"))]
843 fn get_page_encryptor_context(&self) -> PageEncryptorContext {
844 PageEncryptorContext {}
845 }
846
847 #[cfg(not(feature = "encryption"))]
849 fn set_page_writer_encryptor<'b>(
850 _column: &ColumnDescPtr,
851 _context: PageEncryptorContext,
852 page_writer: SerializedPageWriter<'b, W>,
853 ) -> Result<SerializedPageWriter<'b, W>> {
854 Ok(page_writer)
855 }
856
857 #[inline]
858 fn assert_previous_writer_closed(&self) -> Result<()> {
859 if self.column_index != self.column_chunks.len() {
860 Err(general_err!("Previous column writer was not closed"))
861 } else {
862 Ok(())
863 }
864 }
865}
866
867#[cfg(feature = "encryption")]
869struct PageEncryptorContext {
870 file_encryptor: Option<Arc<FileEncryptor>>,
871 row_group_index: usize,
872 column_index: usize,
873}
874
875#[cfg(not(feature = "encryption"))]
876struct PageEncryptorContext {}
877
878pub struct SerializedColumnWriter<'a> {
880 inner: ColumnWriter<'a>,
881 on_close: Option<OnCloseColumnChunk<'a>>,
882}
883
884impl<'a> SerializedColumnWriter<'a> {
885 pub fn new(inner: ColumnWriter<'a>, on_close: Option<OnCloseColumnChunk<'a>>) -> Self {
888 Self { inner, on_close }
889 }
890
891 pub fn untyped(&mut self) -> &mut ColumnWriter<'a> {
893 &mut self.inner
894 }
895
896 pub fn typed<T: DataType>(&mut self) -> &mut ColumnWriterImpl<'a, T> {
898 get_typed_column_writer_mut(&mut self.inner)
899 }
900
901 pub fn close(mut self) -> Result<()> {
903 let r = self.inner.close()?;
904 if let Some(on_close) = self.on_close.take() {
905 on_close(r)?
906 }
907
908 Ok(())
909 }
910}
911
912pub struct SerializedPageWriter<'a, W: Write> {
917 sink: &'a mut TrackedWrite<W>,
918 #[cfg(feature = "encryption")]
919 page_encryptor: Option<PageEncryptor>,
920}
921
922impl<'a, W: Write> SerializedPageWriter<'a, W> {
923 pub fn new(sink: &'a mut TrackedWrite<W>) -> Self {
925 Self {
926 sink,
927 #[cfg(feature = "encryption")]
928 page_encryptor: None,
929 }
930 }
931
932 #[inline]
935 fn serialize_page_header(&mut self, header: PageHeader) -> Result<usize> {
936 let start_pos = self.sink.bytes_written();
937 match self.page_encryptor_and_sink_mut() {
938 Some((page_encryptor, sink)) => {
939 page_encryptor.encrypt_page_header(&header, sink)?;
940 }
941 None => {
942 let mut protocol = ThriftCompactOutputProtocol::new(&mut self.sink);
943 header.write_thrift(&mut protocol)?;
944 }
945 }
946 Ok(self.sink.bytes_written() - start_pos)
947 }
948}
949
950#[cfg(feature = "encryption")]
951impl<'a, W: Write> SerializedPageWriter<'a, W> {
952 fn with_page_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
954 self.page_encryptor = page_encryptor;
955 self
956 }
957
958 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
959 self.page_encryptor.as_mut()
960 }
961
962 fn page_encryptor_and_sink_mut(
963 &mut self,
964 ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
965 self.page_encryptor.as_mut().map(|pe| (pe, &mut self.sink))
966 }
967}
968
969#[cfg(not(feature = "encryption"))]
970impl<'a, W: Write> SerializedPageWriter<'a, W> {
971 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
972 None
973 }
974
975 fn page_encryptor_and_sink_mut(
976 &mut self,
977 ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
978 None
979 }
980}
981
982impl<W: Write + Send> PageWriter for SerializedPageWriter<'_, W> {
983 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
984 let page = match self.page_encryptor_mut() {
985 Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
986 None => page,
987 };
988
989 let page_type = page.page_type();
990 let start_pos = self.sink.bytes_written() as u64;
991
992 let page_header = page.to_thrift_header()?;
993 let header_size = self.serialize_page_header(page_header)?;
994
995 self.sink.write_all(page.data())?;
996
997 let mut spec = PageWriteSpec::new();
998 spec.page_type = page_type;
999 spec.uncompressed_size = page.uncompressed_size() + header_size;
1000 spec.compressed_size = page.compressed_size() + header_size;
1001 spec.offset = start_pos;
1002 spec.bytes_written = self.sink.bytes_written() as u64 - start_pos;
1003 spec.num_values = page.num_values();
1004
1005 if let Some(page_encryptor) = self.page_encryptor_mut() {
1006 if page.compressed_page().is_data_page() {
1007 page_encryptor.increment_page();
1008 }
1009 }
1010 Ok(spec)
1011 }
1012
1013 fn close(&mut self) -> Result<()> {
1014 self.sink.flush()?;
1015 Ok(())
1016 }
1017}
1018
1019#[cfg(feature = "encryption")]
1022pub(crate) fn get_file_magic(
1023 file_encryption_properties: Option<&Arc<FileEncryptionProperties>>,
1024) -> &'static [u8; 4] {
1025 match file_encryption_properties.as_ref() {
1026 Some(encryption_properties) if encryption_properties.encrypt_footer() => {
1027 &PARQUET_MAGIC_ENCR_FOOTER
1028 }
1029 _ => &PARQUET_MAGIC,
1030 }
1031}
1032
1033#[cfg(not(feature = "encryption"))]
1034pub(crate) fn get_file_magic() -> &'static [u8; 4] {
1035 &PARQUET_MAGIC
1036}
1037
1038#[cfg(test)]
1039mod tests {
1040 use super::*;
1041
1042 #[cfg(feature = "arrow")]
1043 use arrow_array::RecordBatchReader;
1044 use bytes::Bytes;
1045 use std::fs::File;
1046
1047 #[cfg(feature = "arrow")]
1048 use crate::arrow::ArrowWriter;
1049 #[cfg(feature = "arrow")]
1050 use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
1051 use crate::basic::{
1052 ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, Repetition, SortOrder, Type,
1053 };
1054 use crate::column::page::{Page, PageReader};
1055 use crate::column::reader::get_typed_column_reader;
1056 use crate::compression::{Codec, CodecOptionsBuilder, create_codec};
1057 use crate::data_type::{BoolType, ByteArrayType, Int32Type};
1058 use crate::file::page_index::column_index::ColumnIndexMetaData;
1059 use crate::file::properties::EnabledStatistics;
1060 use crate::file::serialized_reader::ReadOptionsBuilder;
1061 use crate::file::statistics::{from_thrift_page_stats, page_stats_to_thrift};
1062 use crate::file::{
1063 properties::{ReaderProperties, WriterProperties, WriterVersion},
1064 reader::{FileReader, SerializedFileReader, SerializedPageReader},
1065 statistics::Statistics,
1066 };
1067 use crate::record::{Row, RowAccessor};
1068 use crate::schema::parser::parse_message_type;
1069 use crate::schema::types;
1070 use crate::schema::types::{ColumnDescriptor, ColumnPath};
1071 use crate::util::test_common::file_util::get_test_file;
1072 use crate::util::test_common::rand_gen::RandGen;
1073
1074 #[test]
1075 fn test_row_group_writer_error_not_all_columns_written() {
1076 let file = tempfile::tempfile().unwrap();
1077 let schema = Arc::new(
1078 types::Type::group_type_builder("schema")
1079 .with_fields(vec![Arc::new(
1080 types::Type::primitive_type_builder("col1", Type::INT32)
1081 .build()
1082 .unwrap(),
1083 )])
1084 .build()
1085 .unwrap(),
1086 );
1087 let props = Default::default();
1088 let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
1089 let row_group_writer = writer.next_row_group().unwrap();
1090 let res = row_group_writer.close();
1091 assert!(res.is_err());
1092 if let Err(err) = res {
1093 assert_eq!(
1094 format!("{err}"),
1095 "Parquet error: Column length mismatch: 1 != 0"
1096 );
1097 }
1098 }
1099
1100 #[test]
1101 fn test_row_group_writer_num_records_mismatch() {
1102 let file = tempfile::tempfile().unwrap();
1103 let schema = Arc::new(
1104 types::Type::group_type_builder("schema")
1105 .with_fields(vec![
1106 Arc::new(
1107 types::Type::primitive_type_builder("col1", Type::INT32)
1108 .with_repetition(Repetition::REQUIRED)
1109 .build()
1110 .unwrap(),
1111 ),
1112 Arc::new(
1113 types::Type::primitive_type_builder("col2", Type::INT32)
1114 .with_repetition(Repetition::REQUIRED)
1115 .build()
1116 .unwrap(),
1117 ),
1118 ])
1119 .build()
1120 .unwrap(),
1121 );
1122 let props = Default::default();
1123 let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
1124 let mut row_group_writer = writer.next_row_group().unwrap();
1125
1126 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
1127 col_writer
1128 .typed::<Int32Type>()
1129 .write_batch(&[1, 2, 3], None, None)
1130 .unwrap();
1131 col_writer.close().unwrap();
1132
1133 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
1134 col_writer
1135 .typed::<Int32Type>()
1136 .write_batch(&[1, 2], None, None)
1137 .unwrap();
1138
1139 let err = col_writer.close().unwrap_err();
1140 assert_eq!(
1141 err.to_string(),
1142 "Parquet error: Incorrect number of rows, expected 3 != 2 rows"
1143 );
1144 }
1145
1146 #[test]
1147 fn test_file_writer_empty_file() {
1148 let file = tempfile::tempfile().unwrap();
1149
1150 let schema = Arc::new(
1151 types::Type::group_type_builder("schema")
1152 .with_fields(vec![Arc::new(
1153 types::Type::primitive_type_builder("col1", Type::INT32)
1154 .build()
1155 .unwrap(),
1156 )])
1157 .build()
1158 .unwrap(),
1159 );
1160 let props = Default::default();
1161 let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1162 writer.close().unwrap();
1163
1164 let reader = SerializedFileReader::new(file).unwrap();
1165 assert_eq!(reader.get_row_iter(None).unwrap().count(), 0);
1166 }
1167
1168 #[test]
1169 fn test_file_writer_column_orders_populated() {
1170 let file = tempfile::tempfile().unwrap();
1171
1172 let schema = Arc::new(
1173 types::Type::group_type_builder("schema")
1174 .with_fields(vec![
1175 Arc::new(
1176 types::Type::primitive_type_builder("col1", Type::INT32)
1177 .build()
1178 .unwrap(),
1179 ),
1180 Arc::new(
1181 types::Type::primitive_type_builder("col2", Type::FIXED_LEN_BYTE_ARRAY)
1182 .with_converted_type(ConvertedType::INTERVAL)
1183 .with_length(12)
1184 .build()
1185 .unwrap(),
1186 ),
1187 Arc::new(
1188 types::Type::group_type_builder("nested")
1189 .with_repetition(Repetition::REQUIRED)
1190 .with_fields(vec![
1191 Arc::new(
1192 types::Type::primitive_type_builder(
1193 "col3",
1194 Type::FIXED_LEN_BYTE_ARRAY,
1195 )
1196 .with_logical_type(Some(LogicalType::Float16))
1197 .with_length(2)
1198 .build()
1199 .unwrap(),
1200 ),
1201 Arc::new(
1202 types::Type::primitive_type_builder("col4", Type::BYTE_ARRAY)
1203 .with_logical_type(Some(LogicalType::String))
1204 .build()
1205 .unwrap(),
1206 ),
1207 ])
1208 .build()
1209 .unwrap(),
1210 ),
1211 ])
1212 .build()
1213 .unwrap(),
1214 );
1215
1216 let props = Default::default();
1217 let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1218 writer.close().unwrap();
1219
1220 let reader = SerializedFileReader::new(file).unwrap();
1221
1222 let expected = vec![
1224 ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
1226 ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED),
1228 ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
1230 ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED),
1232 ];
1233 let actual = reader.metadata().file_metadata().column_orders();
1234
1235 assert!(actual.is_some());
1236 let actual = actual.unwrap();
1237 assert_eq!(*actual, expected);
1238 }
1239
1240 #[test]
1241 fn test_file_writer_with_metadata() {
1242 let file = tempfile::tempfile().unwrap();
1243
1244 let schema = Arc::new(
1245 types::Type::group_type_builder("schema")
1246 .with_fields(vec![Arc::new(
1247 types::Type::primitive_type_builder("col1", Type::INT32)
1248 .build()
1249 .unwrap(),
1250 )])
1251 .build()
1252 .unwrap(),
1253 );
1254 let props = Arc::new(
1255 WriterProperties::builder()
1256 .set_key_value_metadata(Some(vec![KeyValue::new(
1257 "key".to_string(),
1258 "value".to_string(),
1259 )]))
1260 .build(),
1261 );
1262 let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1263 writer.close().unwrap();
1264
1265 let reader = SerializedFileReader::new(file).unwrap();
1266 assert_eq!(
1267 reader
1268 .metadata()
1269 .file_metadata()
1270 .key_value_metadata()
1271 .to_owned()
1272 .unwrap()
1273 .len(),
1274 1
1275 );
1276 }
1277
1278 #[test]
1279 fn test_file_writer_v2_with_metadata() {
1280 let file = tempfile::tempfile().unwrap();
1281 let field_logical_type = Some(LogicalType::Integer {
1282 bit_width: 8,
1283 is_signed: false,
1284 });
1285 let field = Arc::new(
1286 types::Type::primitive_type_builder("col1", Type::INT32)
1287 .with_logical_type(field_logical_type.clone())
1288 .with_converted_type(field_logical_type.into())
1289 .build()
1290 .unwrap(),
1291 );
1292 let schema = Arc::new(
1293 types::Type::group_type_builder("schema")
1294 .with_fields(vec![field.clone()])
1295 .build()
1296 .unwrap(),
1297 );
1298 let props = Arc::new(
1299 WriterProperties::builder()
1300 .set_key_value_metadata(Some(vec![KeyValue::new(
1301 "key".to_string(),
1302 "value".to_string(),
1303 )]))
1304 .set_writer_version(WriterVersion::PARQUET_2_0)
1305 .build(),
1306 );
1307 let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1308 writer.close().unwrap();
1309
1310 let reader = SerializedFileReader::new(file).unwrap();
1311
1312 assert_eq!(
1313 reader
1314 .metadata()
1315 .file_metadata()
1316 .key_value_metadata()
1317 .to_owned()
1318 .unwrap()
1319 .len(),
1320 1
1321 );
1322
1323 let fields = reader.metadata().file_metadata().schema().get_fields();
1325 assert_eq!(fields.len(), 1);
1326 assert_eq!(fields[0], field);
1327 }
1328
1329 #[test]
1330 fn test_file_writer_with_sorting_columns_metadata() {
1331 let file = tempfile::tempfile().unwrap();
1332
1333 let schema = Arc::new(
1334 types::Type::group_type_builder("schema")
1335 .with_fields(vec![
1336 Arc::new(
1337 types::Type::primitive_type_builder("col1", Type::INT32)
1338 .build()
1339 .unwrap(),
1340 ),
1341 Arc::new(
1342 types::Type::primitive_type_builder("col2", Type::INT32)
1343 .build()
1344 .unwrap(),
1345 ),
1346 ])
1347 .build()
1348 .unwrap(),
1349 );
1350 let expected_result = Some(vec![SortingColumn {
1351 column_idx: 0,
1352 descending: false,
1353 nulls_first: true,
1354 }]);
1355 let props = Arc::new(
1356 WriterProperties::builder()
1357 .set_key_value_metadata(Some(vec![KeyValue::new(
1358 "key".to_string(),
1359 "value".to_string(),
1360 )]))
1361 .set_sorting_columns(expected_result.clone())
1362 .build(),
1363 );
1364 let mut writer =
1365 SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1366 let mut row_group_writer = writer.next_row_group().expect("get row group writer");
1367
1368 let col_writer = row_group_writer.next_column().unwrap().unwrap();
1369 col_writer.close().unwrap();
1370
1371 let col_writer = row_group_writer.next_column().unwrap().unwrap();
1372 col_writer.close().unwrap();
1373
1374 row_group_writer.close().unwrap();
1375 writer.close().unwrap();
1376
1377 let reader = SerializedFileReader::new(file).unwrap();
1378 let result: Vec<Option<&Vec<SortingColumn>>> = reader
1379 .metadata()
1380 .row_groups()
1381 .iter()
1382 .map(|f| f.sorting_columns())
1383 .collect();
1384 assert_eq!(expected_result.as_ref(), result[0]);
1386 }
1387
1388 #[test]
1389 fn test_file_writer_empty_row_groups() {
1390 let file = tempfile::tempfile().unwrap();
1391 test_file_roundtrip(file, vec![]);
1392 }
1393
1394 #[test]
1395 fn test_file_writer_single_row_group() {
1396 let file = tempfile::tempfile().unwrap();
1397 test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
1398 }
1399
1400 #[test]
1401 fn test_file_writer_multiple_row_groups() {
1402 let file = tempfile::tempfile().unwrap();
1403 test_file_roundtrip(
1404 file,
1405 vec![
1406 vec![1, 2, 3, 4, 5],
1407 vec![1, 2, 3],
1408 vec![1],
1409 vec![1, 2, 3, 4, 5, 6],
1410 ],
1411 );
1412 }
1413
1414 #[test]
1415 fn test_file_writer_multiple_large_row_groups() {
1416 let file = tempfile::tempfile().unwrap();
1417 test_file_roundtrip(
1418 file,
1419 vec![vec![123; 1024], vec![124; 1000], vec![125; 15], vec![]],
1420 );
1421 }
1422
1423 #[test]
1424 fn test_page_writer_data_pages() {
1425 let pages = [
1426 Page::DataPage {
1427 buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
1428 num_values: 10,
1429 encoding: Encoding::DELTA_BINARY_PACKED,
1430 def_level_encoding: Encoding::RLE,
1431 rep_level_encoding: Encoding::RLE,
1432 statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1433 },
1434 Page::DataPageV2 {
1435 buf: Bytes::from(vec![4; 128]),
1436 num_values: 10,
1437 encoding: Encoding::DELTA_BINARY_PACKED,
1438 num_nulls: 2,
1439 num_rows: 12,
1440 def_levels_byte_len: 24,
1441 rep_levels_byte_len: 32,
1442 is_compressed: false,
1443 statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1444 },
1445 ];
1446
1447 test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
1448 test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
1449 }
1450
1451 #[test]
1452 fn test_page_writer_dict_pages() {
1453 let pages = [
1454 Page::DictionaryPage {
1455 buf: Bytes::from(vec![1, 2, 3, 4, 5]),
1456 num_values: 5,
1457 encoding: Encoding::RLE_DICTIONARY,
1458 is_sorted: false,
1459 },
1460 Page::DataPage {
1461 buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
1462 num_values: 10,
1463 encoding: Encoding::DELTA_BINARY_PACKED,
1464 def_level_encoding: Encoding::RLE,
1465 rep_level_encoding: Encoding::RLE,
1466 statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1467 },
1468 Page::DataPageV2 {
1469 buf: Bytes::from(vec![4; 128]),
1470 num_values: 10,
1471 encoding: Encoding::DELTA_BINARY_PACKED,
1472 num_nulls: 2,
1473 num_rows: 12,
1474 def_levels_byte_len: 24,
1475 rep_levels_byte_len: 32,
1476 is_compressed: false,
1477 statistics: None,
1478 },
1479 ];
1480
1481 test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
1482 test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
1483 }
1484
1485 fn test_page_roundtrip(pages: &[Page], codec: Compression, physical_type: Type) {
1489 let mut compressed_pages = vec![];
1490 let mut total_num_values = 0i64;
1491 let codec_options = CodecOptionsBuilder::default()
1492 .set_backward_compatible_lz4(false)
1493 .build();
1494 let mut compressor = create_codec(codec, &codec_options).unwrap();
1495
1496 for page in pages {
1497 let uncompressed_len = page.buffer().len();
1498
1499 let compressed_page = match *page {
1500 Page::DataPage {
1501 ref buf,
1502 num_values,
1503 encoding,
1504 def_level_encoding,
1505 rep_level_encoding,
1506 ref statistics,
1507 } => {
1508 total_num_values += num_values as i64;
1509 let output_buf = compress_helper(compressor.as_mut(), buf);
1510
1511 Page::DataPage {
1512 buf: Bytes::from(output_buf),
1513 num_values,
1514 encoding,
1515 def_level_encoding,
1516 rep_level_encoding,
1517 statistics: from_thrift_page_stats(
1518 physical_type,
1519 page_stats_to_thrift(statistics.as_ref()),
1520 )
1521 .unwrap(),
1522 }
1523 }
1524 Page::DataPageV2 {
1525 ref buf,
1526 num_values,
1527 encoding,
1528 num_nulls,
1529 num_rows,
1530 def_levels_byte_len,
1531 rep_levels_byte_len,
1532 ref statistics,
1533 ..
1534 } => {
1535 total_num_values += num_values as i64;
1536 let offset = (def_levels_byte_len + rep_levels_byte_len) as usize;
1537 let cmp_buf = compress_helper(compressor.as_mut(), &buf[offset..]);
1538 let mut output_buf = Vec::from(&buf[..offset]);
1539 output_buf.extend_from_slice(&cmp_buf[..]);
1540
1541 Page::DataPageV2 {
1542 buf: Bytes::from(output_buf),
1543 num_values,
1544 encoding,
1545 num_nulls,
1546 num_rows,
1547 def_levels_byte_len,
1548 rep_levels_byte_len,
1549 is_compressed: compressor.is_some(),
1550 statistics: from_thrift_page_stats(
1551 physical_type,
1552 page_stats_to_thrift(statistics.as_ref()),
1553 )
1554 .unwrap(),
1555 }
1556 }
1557 Page::DictionaryPage {
1558 ref buf,
1559 num_values,
1560 encoding,
1561 is_sorted,
1562 } => {
1563 let output_buf = compress_helper(compressor.as_mut(), buf);
1564
1565 Page::DictionaryPage {
1566 buf: Bytes::from(output_buf),
1567 num_values,
1568 encoding,
1569 is_sorted,
1570 }
1571 }
1572 };
1573
1574 let compressed_page = CompressedPage::new(compressed_page, uncompressed_len);
1575 compressed_pages.push(compressed_page);
1576 }
1577
1578 let mut buffer: Vec<u8> = vec![];
1579 let mut result_pages: Vec<Page> = vec![];
1580 {
1581 let mut writer = TrackedWrite::new(&mut buffer);
1582 let mut page_writer = SerializedPageWriter::new(&mut writer);
1583
1584 for page in compressed_pages {
1585 page_writer.write_page(page).unwrap();
1586 }
1587 page_writer.close().unwrap();
1588 }
1589 {
1590 let reader = bytes::Bytes::from(buffer);
1591
1592 let t = types::Type::primitive_type_builder("t", physical_type)
1593 .build()
1594 .unwrap();
1595
1596 let desc = ColumnDescriptor::new(Arc::new(t), 0, 0, ColumnPath::new(vec![]));
1597 let meta = ColumnChunkMetaData::builder(Arc::new(desc))
1598 .set_compression(codec)
1599 .set_total_compressed_size(reader.len() as i64)
1600 .set_num_values(total_num_values)
1601 .build()
1602 .unwrap();
1603
1604 let props = ReaderProperties::builder()
1605 .set_backward_compatible_lz4(false)
1606 .set_read_page_statistics(true)
1607 .build();
1608 let mut page_reader = SerializedPageReader::new_with_properties(
1609 Arc::new(reader),
1610 &meta,
1611 total_num_values as usize,
1612 None,
1613 Arc::new(props),
1614 )
1615 .unwrap();
1616
1617 while let Some(page) = page_reader.get_next_page().unwrap() {
1618 result_pages.push(page);
1619 }
1620 }
1621
1622 assert_eq!(result_pages.len(), pages.len());
1623 for i in 0..result_pages.len() {
1624 assert_page(&result_pages[i], &pages[i]);
1625 }
1626 }
1627
1628 fn compress_helper(compressor: Option<&mut Box<dyn Codec>>, data: &[u8]) -> Vec<u8> {
1630 let mut output_buf = vec![];
1631 if let Some(cmpr) = compressor {
1632 cmpr.compress(data, &mut output_buf).unwrap();
1633 } else {
1634 output_buf.extend_from_slice(data);
1635 }
1636 output_buf
1637 }
1638
1639 fn assert_page(left: &Page, right: &Page) {
1641 assert_eq!(left.page_type(), right.page_type());
1642 assert_eq!(&left.buffer(), &right.buffer());
1643 assert_eq!(left.num_values(), right.num_values());
1644 assert_eq!(left.encoding(), right.encoding());
1645 assert_eq!(
1646 page_stats_to_thrift(left.statistics()),
1647 page_stats_to_thrift(right.statistics())
1648 );
1649 }
1650
1651 fn test_roundtrip_i32<W, R>(
1653 file: W,
1654 data: Vec<Vec<i32>>,
1655 compression: Compression,
1656 ) -> ParquetMetaData
1657 where
1658 W: Write + Send,
1659 R: ChunkReader + From<W> + 'static,
1660 {
1661 test_roundtrip::<W, R, Int32Type, _>(file, data, |r| r.get_int(0).unwrap(), compression)
1662 }
1663
1664 fn test_roundtrip<W, R, D, F>(
1667 mut file: W,
1668 data: Vec<Vec<D::T>>,
1669 value: F,
1670 compression: Compression,
1671 ) -> ParquetMetaData
1672 where
1673 W: Write + Send,
1674 R: ChunkReader + From<W> + 'static,
1675 D: DataType,
1676 F: Fn(Row) -> D::T,
1677 {
1678 let schema = Arc::new(
1679 types::Type::group_type_builder("schema")
1680 .with_fields(vec![Arc::new(
1681 types::Type::primitive_type_builder("col1", D::get_physical_type())
1682 .with_repetition(Repetition::REQUIRED)
1683 .build()
1684 .unwrap(),
1685 )])
1686 .build()
1687 .unwrap(),
1688 );
1689 let props = Arc::new(
1690 WriterProperties::builder()
1691 .set_compression(compression)
1692 .build(),
1693 );
1694 let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
1695 let mut rows: i64 = 0;
1696
1697 for (idx, subset) in data.iter().enumerate() {
1698 let row_group_file_offset = file_writer.buf.bytes_written();
1699 let mut row_group_writer = file_writer.next_row_group().unwrap();
1700 if let Some(mut writer) = row_group_writer.next_column().unwrap() {
1701 rows += writer
1702 .typed::<D>()
1703 .write_batch(&subset[..], None, None)
1704 .unwrap() as i64;
1705 writer.close().unwrap();
1706 }
1707 let last_group = row_group_writer.close().unwrap();
1708 let flushed = file_writer.flushed_row_groups();
1709 assert_eq!(flushed.len(), idx + 1);
1710 assert_eq!(Some(idx as i16), last_group.ordinal());
1711 assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset());
1712 assert_eq!(&flushed[idx], last_group.as_ref());
1713 }
1714 let file_metadata = file_writer.close().unwrap();
1715
1716 let reader = SerializedFileReader::new(R::from(file)).unwrap();
1717 assert_eq!(reader.num_row_groups(), data.len());
1718 assert_eq!(
1719 reader.metadata().file_metadata().num_rows(),
1720 rows,
1721 "row count in metadata not equal to number of rows written"
1722 );
1723 for (i, item) in data.iter().enumerate().take(reader.num_row_groups()) {
1724 let row_group_reader = reader.get_row_group(i).unwrap();
1725 let iter = row_group_reader.get_row_iter(None).unwrap();
1726 let res: Vec<_> = iter.map(|row| row.unwrap()).map(&value).collect();
1727 let row_group_size = row_group_reader.metadata().total_byte_size();
1728 let uncompressed_size: i64 = row_group_reader
1729 .metadata()
1730 .columns()
1731 .iter()
1732 .map(|v| v.uncompressed_size())
1733 .sum();
1734 assert_eq!(row_group_size, uncompressed_size);
1735 assert_eq!(res, *item);
1736 }
1737 file_metadata
1738 }
1739
1740 fn test_file_roundtrip(file: File, data: Vec<Vec<i32>>) -> ParquetMetaData {
1743 test_roundtrip_i32::<File, File>(file, data, Compression::UNCOMPRESSED)
1744 }
1745
1746 #[test]
1747 fn test_bytes_writer_empty_row_groups() {
1748 test_bytes_roundtrip(vec![], Compression::UNCOMPRESSED);
1749 }
1750
1751 #[test]
1752 fn test_bytes_writer_single_row_group() {
1753 test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::UNCOMPRESSED);
1754 }
1755
1756 #[test]
1757 fn test_bytes_writer_multiple_row_groups() {
1758 test_bytes_roundtrip(
1759 vec![
1760 vec![1, 2, 3, 4, 5],
1761 vec![1, 2, 3],
1762 vec![1],
1763 vec![1, 2, 3, 4, 5, 6],
1764 ],
1765 Compression::UNCOMPRESSED,
1766 );
1767 }
1768
1769 #[test]
1770 fn test_bytes_writer_single_row_group_compressed() {
1771 test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::SNAPPY);
1772 }
1773
1774 #[test]
1775 fn test_bytes_writer_multiple_row_groups_compressed() {
1776 test_bytes_roundtrip(
1777 vec![
1778 vec![1, 2, 3, 4, 5],
1779 vec![1, 2, 3],
1780 vec![1],
1781 vec![1, 2, 3, 4, 5, 6],
1782 ],
1783 Compression::SNAPPY,
1784 );
1785 }
1786
1787 fn test_bytes_roundtrip(data: Vec<Vec<i32>>, compression: Compression) {
1788 test_roundtrip_i32::<Vec<u8>, Bytes>(Vec::with_capacity(1024), data, compression);
1789 }
1790
1791 #[test]
1792 fn test_boolean_roundtrip() {
1793 let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
1794 test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
1795 Vec::with_capacity(1024),
1796 vec![my_bool_values],
1797 |r| r.get_bool(0).unwrap(),
1798 Compression::UNCOMPRESSED,
1799 );
1800 }
1801
1802 #[test]
1803 fn test_boolean_compressed_roundtrip() {
1804 let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
1805 test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
1806 Vec::with_capacity(1024),
1807 vec![my_bool_values],
1808 |r| r.get_bool(0).unwrap(),
1809 Compression::SNAPPY,
1810 );
1811 }
1812
1813 #[test]
1814 fn test_column_offset_index_file() {
1815 let file = tempfile::tempfile().unwrap();
1816 let file_metadata = test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
1817 file_metadata.row_groups().iter().for_each(|row_group| {
1818 row_group.columns().iter().for_each(|column_chunk| {
1819 assert!(column_chunk.column_index_offset().is_some());
1820 assert!(column_chunk.column_index_length().is_some());
1821 assert!(column_chunk.offset_index_offset().is_some());
1822 assert!(column_chunk.offset_index_length().is_some());
1823 })
1824 });
1825 }
1826
1827 fn test_kv_metadata(initial_kv: Option<Vec<KeyValue>>, final_kv: Option<Vec<KeyValue>>) {
1828 let schema = Arc::new(
1829 types::Type::group_type_builder("schema")
1830 .with_fields(vec![Arc::new(
1831 types::Type::primitive_type_builder("col1", Type::INT32)
1832 .with_repetition(Repetition::REQUIRED)
1833 .build()
1834 .unwrap(),
1835 )])
1836 .build()
1837 .unwrap(),
1838 );
1839 let mut out = Vec::with_capacity(1024);
1840 let props = Arc::new(
1841 WriterProperties::builder()
1842 .set_key_value_metadata(initial_kv.clone())
1843 .build(),
1844 );
1845 let mut writer = SerializedFileWriter::new(&mut out, schema, props).unwrap();
1846 let mut row_group_writer = writer.next_row_group().unwrap();
1847 let column = row_group_writer.next_column().unwrap().unwrap();
1848 column.close().unwrap();
1849 row_group_writer.close().unwrap();
1850 if let Some(kvs) = &final_kv {
1851 for kv in kvs {
1852 writer.append_key_value_metadata(kv.clone())
1853 }
1854 }
1855 writer.close().unwrap();
1856
1857 let reader = SerializedFileReader::new(Bytes::from(out)).unwrap();
1858 let metadata = reader.metadata().file_metadata();
1859 let keys = metadata.key_value_metadata();
1860
1861 match (initial_kv, final_kv) {
1862 (Some(a), Some(b)) => {
1863 let keys = keys.unwrap();
1864 assert_eq!(keys.len(), a.len() + b.len());
1865 assert_eq!(&keys[..a.len()], a.as_slice());
1866 assert_eq!(&keys[a.len()..], b.as_slice());
1867 }
1868 (Some(v), None) => assert_eq!(keys.unwrap(), &v),
1869 (None, Some(v)) if !v.is_empty() => assert_eq!(keys.unwrap(), &v),
1870 _ => assert!(keys.is_none()),
1871 }
1872 }
1873
1874 #[test]
1875 fn test_append_metadata() {
1876 let kv1 = KeyValue::new("cupcakes".to_string(), "awesome".to_string());
1877 let kv2 = KeyValue::new("bingo".to_string(), "bongo".to_string());
1878
1879 test_kv_metadata(None, None);
1880 test_kv_metadata(Some(vec![kv1.clone()]), None);
1881 test_kv_metadata(None, Some(vec![kv2.clone()]));
1882 test_kv_metadata(Some(vec![kv1.clone()]), Some(vec![kv2.clone()]));
1883 test_kv_metadata(Some(vec![]), Some(vec![kv2]));
1884 test_kv_metadata(Some(vec![]), Some(vec![]));
1885 test_kv_metadata(Some(vec![kv1]), Some(vec![]));
1886 test_kv_metadata(None, Some(vec![]));
1887 }
1888
1889 #[test]
1890 fn test_backwards_compatible_statistics() {
1891 let message_type = "
1892 message test_schema {
1893 REQUIRED INT32 decimal1 (DECIMAL(8,2));
1894 REQUIRED INT32 i32 (INTEGER(32,true));
1895 REQUIRED INT32 u32 (INTEGER(32,false));
1896 }
1897 ";
1898
1899 let schema = Arc::new(parse_message_type(message_type).unwrap());
1900 let props = Default::default();
1901 let mut writer = SerializedFileWriter::new(vec![], schema, props).unwrap();
1902 let mut row_group_writer = writer.next_row_group().unwrap();
1903
1904 for _ in 0..3 {
1905 let mut writer = row_group_writer.next_column().unwrap().unwrap();
1906 writer
1907 .typed::<Int32Type>()
1908 .write_batch(&[1, 2, 3], None, None)
1909 .unwrap();
1910 writer.close().unwrap();
1911 }
1912 let metadata = row_group_writer.close().unwrap();
1913 writer.close().unwrap();
1914
1915 let s = page_stats_to_thrift(metadata.column(0).statistics()).unwrap();
1917 assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1918 assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1919 assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1920 assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1921
1922 let s = page_stats_to_thrift(metadata.column(1).statistics()).unwrap();
1924 assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1925 assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1926 assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1927 assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1928
1929 let s = page_stats_to_thrift(metadata.column(2).statistics()).unwrap();
1931 assert_eq!(s.min.as_deref(), None);
1932 assert_eq!(s.max.as_deref(), None);
1933 assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1934 assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1935 }
1936
1937 #[test]
1938 fn test_spliced_write() {
1939 let message_type = "
1940 message test_schema {
1941 REQUIRED INT32 i32 (INTEGER(32,true));
1942 REQUIRED INT32 u32 (INTEGER(32,false));
1943 }
1944 ";
1945 let schema = Arc::new(parse_message_type(message_type).unwrap());
1946 let props = Arc::new(WriterProperties::builder().build());
1947
1948 let mut file = Vec::with_capacity(1024);
1949 let mut file_writer = SerializedFileWriter::new(&mut file, schema, props.clone()).unwrap();
1950
1951 let columns = file_writer.descr.columns();
1952 let mut column_state: Vec<(_, Option<ColumnCloseResult>)> = columns
1953 .iter()
1954 .map(|_| (TrackedWrite::new(Vec::with_capacity(1024)), None))
1955 .collect();
1956
1957 let mut column_state_slice = column_state.as_mut_slice();
1958 let mut column_writers = Vec::with_capacity(columns.len());
1959 for c in columns {
1960 let ((buf, out), tail) = column_state_slice.split_first_mut().unwrap();
1961 column_state_slice = tail;
1962
1963 let page_writer = Box::new(SerializedPageWriter::new(buf));
1964 let col_writer = get_column_writer(c.clone(), props.clone(), page_writer);
1965 column_writers.push(SerializedColumnWriter::new(
1966 col_writer,
1967 Some(Box::new(|on_close| {
1968 *out = Some(on_close);
1969 Ok(())
1970 })),
1971 ));
1972 }
1973
1974 let column_data = [[1, 2, 3, 4], [7, 3, 7, 3]];
1975
1976 for (writer, batch) in column_writers.iter_mut().zip(column_data) {
1978 let writer = writer.typed::<Int32Type>();
1979 writer.write_batch(&batch, None, None).unwrap();
1980 }
1981
1982 for writer in column_writers {
1984 writer.close().unwrap()
1985 }
1986
1987 let mut row_group_writer = file_writer.next_row_group().unwrap();
1989 for (write, close) in column_state {
1990 let buf = Bytes::from(write.into_inner().unwrap());
1991 row_group_writer
1992 .append_column(&buf, close.unwrap())
1993 .unwrap();
1994 }
1995 row_group_writer.close().unwrap();
1996 file_writer.close().unwrap();
1997
1998 let file = Bytes::from(file);
2000 let test_read = |reader: SerializedFileReader<Bytes>| {
2001 let row_group = reader.get_row_group(0).unwrap();
2002
2003 let mut out = Vec::with_capacity(4);
2004 let c1 = row_group.get_column_reader(0).unwrap();
2005 let mut c1 = get_typed_column_reader::<Int32Type>(c1);
2006 c1.read_records(4, None, None, &mut out).unwrap();
2007 assert_eq!(out, column_data[0]);
2008
2009 out.clear();
2010
2011 let c2 = row_group.get_column_reader(1).unwrap();
2012 let mut c2 = get_typed_column_reader::<Int32Type>(c2);
2013 c2.read_records(4, None, None, &mut out).unwrap();
2014 assert_eq!(out, column_data[1]);
2015 };
2016
2017 let reader = SerializedFileReader::new(file.clone()).unwrap();
2018 test_read(reader);
2019
2020 let options = ReadOptionsBuilder::new().with_page_index().build();
2021 let reader = SerializedFileReader::new_with_options(file, options).unwrap();
2022 test_read(reader);
2023 }
2024
    #[test]
    fn test_disabled_statistics() {
        let message_type = "
            message test_schema {
                REQUIRED INT32 a;
                REQUIRED INT32 b;
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
            .set_offset_index_disabled(true)
            .build();
        let mut file = Vec::with_capacity(1024);
        let mut file_writer =
            SerializedFileWriter::new(&mut file, schema, Arc::new(props)).unwrap();

        let mut row_group_writer = file_writer.next_row_group().unwrap();
        let mut a_writer = row_group_writer.next_column().unwrap().unwrap();
        let col_writer = a_writer.typed::<Int32Type>();
        col_writer.write_batch(&[1, 2, 3], None, None).unwrap();
        a_writer.close().unwrap();

        let mut b_writer = row_group_writer.next_column().unwrap().unwrap();
        let col_writer = b_writer.typed::<Int32Type>();
        col_writer.write_batch(&[4, 5, 6], None, None).unwrap();
        b_writer.close().unwrap();
        row_group_writer.close().unwrap();

        let metadata = file_writer.finish().unwrap();
        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        assert_eq!(row_group.num_columns(), 2);
        assert!(row_group.column(0).offset_index_offset().is_some());
        assert!(row_group.column(0).column_index_offset().is_some());
        assert!(row_group.column(1).offset_index_offset().is_some());
        assert!(row_group.column(1).column_index_offset().is_none());

        let err = file_writer.next_row_group().err().unwrap().to_string();
        assert_eq!(err, "Parquet error: SerializedFileWriter already finished");

        drop(file_writer);

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(Bytes::from(file), options).unwrap();

        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1);
        assert_eq!(offset_index[0].len(), 2);

        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1);
        assert_eq!(column_index[0].len(), 2);

        let a_idx = &column_index[0][0];
        assert!(matches!(a_idx, ColumnIndexMetaData::INT32(_)), "{a_idx:?}");
        let b_idx = &column_index[0][1];
        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
    }

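    // Writes an optional BYTE_ARRAY column and verifies that the size statistics
    // (definition-level histogram and unencoded byte-array sizes) round-trip through
    // the column chunk metadata, the column index, and the offset index.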
    #[test]
    fn test_byte_array_size_statistics() {
        let message_type = "
            message test_schema {
                OPTIONAL BYTE_ARRAY a (UTF8);
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let data = ByteArrayType::gen_vec(32, 7);
        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
        let unenc_size: i64 = data.iter().map(|x| x.len() as i64).sum();
        let file: File = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        );

        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<ByteArrayType>()
            .write_batch(&data, Some(&def_levels), None)
            .unwrap();
        col_writer.close().unwrap();
        row_group_writer.close().unwrap();
        let file_metadata = writer.close().unwrap();

        assert_eq!(file_metadata.num_row_groups(), 1);
        assert_eq!(file_metadata.row_group(0).num_columns(), 1);

        let check_def_hist = |def_hist: &[i64]| {
            assert_eq!(def_hist.len(), 2);
            assert_eq!(def_hist[0], 3);
            assert_eq!(def_hist[1], 7);
        };

        let meta_data = file_metadata.row_group(0).column(0);

        assert!(meta_data.repetition_level_histogram().is_none());
        assert!(meta_data.definition_level_histogram().is_some());
        assert!(meta_data.unencoded_byte_array_data_bytes().is_some());
        assert_eq!(
            unenc_size,
            meta_data.unencoded_byte_array_data_bytes().unwrap()
        );
        check_def_hist(meta_data.definition_level_histogram().unwrap().values());

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rfile_metadata = reader.metadata().file_metadata();
        assert_eq!(
            rfile_metadata.num_rows(),
            file_metadata.file_metadata().num_rows()
        );
        assert_eq!(reader.num_row_groups(), 1);
        let rowgroup = reader.get_row_group(0).unwrap();
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.definition_level_histogram().is_some());
        assert!(column.repetition_level_histogram().is_none());
        assert!(column.unencoded_byte_array_data_bytes().is_some());
        check_def_hist(column.definition_level_histogram().unwrap().values());
        assert_eq!(
            unenc_size,
            column.unencoded_byte_array_data_bytes().unwrap()
        );

        assert!(reader.metadata().column_index().is_some());
        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1);
        assert_eq!(column_index[0].len(), 1);
        let col_idx = if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][0] {
            assert_eq!(index.num_pages(), 1);
            index
        } else {
            unreachable!()
        };

        assert!(col_idx.repetition_level_histogram(0).is_none());
        assert!(col_idx.definition_level_histogram(0).is_some());
        check_def_hist(col_idx.definition_level_histogram(0).unwrap());

        assert!(reader.metadata().offset_index().is_some());
        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1);
        assert_eq!(offset_index[0].len(), 1);
        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_some());
        let page_sizes = offset_index[0][0]
            .unencoded_byte_array_data_bytes
            .as_ref()
            .unwrap();
        assert_eq!(page_sizes.len(), 1);
        assert_eq!(page_sizes[0], unenc_size);
    }

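    // The writer enforces a hard limit of 32767 row groups per file; with a maximum
    // row group size of 1, the 32768th call to next_row_group must fail with a
    // descriptive error rather than silently producing an invalid footer.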
    #[test]
    fn test_too_many_rowgroups() {
        let message_type = "
            message test_schema {
                REQUIRED BYTE_ARRAY a (UTF8);
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let file: File = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::None)
                .set_max_row_group_size(1)
                .build(),
        );
        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();

        for i in 0..0x8001 {
            match writer.next_row_group() {
                Ok(mut row_group_writer) => {
                    assert_ne!(i, 0x8000);
                    let col_writer = row_group_writer.next_column().unwrap().unwrap();
                    col_writer.close().unwrap();
                    row_group_writer.close().unwrap();
                }
                Err(e) => {
                    assert_eq!(i, 0x8000);
                    assert_eq!(
                        e.to_string(),
                        "Parquet error: Parquet does not support more than 32767 row groups per file (currently: 32768)"
                    );
                }
            }
        }
        writer.close().unwrap();
    }

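    // Writes a nullable list column so that both repetition and definition levels are
    // present, then checks the level histograms in the chunk metadata and page indexes,
    // and that unencoded_byte_array_data_bytes stays unset for a non-BYTE_ARRAY column.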
    #[test]
    fn test_size_statistics_with_repetition_and_nulls() {
        let message_type = "
            message test_schema {
                OPTIONAL group i32_list (LIST) {
                    REPEATED group list {
                        OPTIONAL INT32 element;
                    }
                }
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        // The levels below encode five rows: [1, 2], null, [4, null], [], [7, 8, 9, 10]
        let data = [1, 2, 4, 7, 8, 9, 10];
        let def_levels = [3, 3, 0, 3, 2, 1, 3, 3, 3, 3];
        let rep_levels = [0, 1, 0, 0, 1, 0, 0, 1, 1, 1];
        let file = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        );
        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&data, Some(&def_levels), Some(&rep_levels))
            .unwrap();
        col_writer.close().unwrap();
        row_group_writer.close().unwrap();
        let file_metadata = writer.close().unwrap();

        assert_eq!(file_metadata.num_row_groups(), 1);
        assert_eq!(file_metadata.row_group(0).num_columns(), 1);

        let check_def_hist = |def_hist: &[i64]| {
            assert_eq!(def_hist.len(), 4);
            assert_eq!(def_hist[0], 1);
            assert_eq!(def_hist[1], 1);
            assert_eq!(def_hist[2], 1);
            assert_eq!(def_hist[3], 7);
        };

        let check_rep_hist = |rep_hist: &[i64]| {
            assert_eq!(rep_hist.len(), 2);
            assert_eq!(rep_hist[0], 5);
            assert_eq!(rep_hist[1], 5);
        };

        let meta_data = file_metadata.row_group(0).column(0);
        assert!(meta_data.repetition_level_histogram().is_some());
        assert!(meta_data.definition_level_histogram().is_some());
        assert!(meta_data.unencoded_byte_array_data_bytes().is_none());
        check_def_hist(meta_data.definition_level_histogram().unwrap().values());
        check_rep_hist(meta_data.repetition_level_histogram().unwrap().values());

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rfile_metadata = reader.metadata().file_metadata();
        assert_eq!(
            rfile_metadata.num_rows(),
            file_metadata.file_metadata().num_rows()
        );
        assert_eq!(reader.num_row_groups(), 1);
        let rowgroup = reader.get_row_group(0).unwrap();
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.definition_level_histogram().is_some());
        assert!(column.repetition_level_histogram().is_some());
        assert!(column.unencoded_byte_array_data_bytes().is_none());
        check_def_hist(column.definition_level_histogram().unwrap().values());
        check_rep_hist(column.repetition_level_histogram().unwrap().values());

        assert!(reader.metadata().column_index().is_some());
        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1);
        assert_eq!(column_index[0].len(), 1);
        let col_idx = if let ColumnIndexMetaData::INT32(index) = &column_index[0][0] {
            assert_eq!(index.num_pages(), 1);
            index
        } else {
            unreachable!()
        };

        check_def_hist(col_idx.definition_level_histogram(0).unwrap());
        check_rep_hist(col_idx.repetition_level_histogram(0).unwrap());

        assert!(reader.metadata().offset_index().is_some());
        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1);
        assert_eq!(offset_index[0].len(), 1);
        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_none());
    }

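    // Round-trips byte_stream_split_extended.gzip.parquet through the Arrow writer with
    // BYTE_STREAM_SPLIT forced on the *_byte_stream_split columns, then checks that the
    // encoding was actually used and that each pair of sibling columns still holds equal values.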
    #[test]
    #[cfg(feature = "arrow")]
    fn test_byte_stream_split_extended_roundtrip() {
        let path = format!(
            "{}/byte_stream_split_extended.gzip.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();

        let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .expect("parquet open")
            .build()
            .expect("parquet open");

        let file = tempfile::tempfile().unwrap();
        let props = WriterProperties::builder()
            .set_dictionary_enabled(false)
            .set_column_encoding(
                ColumnPath::from("float16_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("float_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("double_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("int32_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("int64_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("flba5_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("decimal_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .build();

        let mut parquet_writer = ArrowWriter::try_new(
            file.try_clone().expect("cannot open file"),
            parquet_reader.schema(),
            Some(props),
        )
        .expect("create arrow writer");

        for maybe_batch in parquet_reader {
            let batch = maybe_batch.expect("reading batch");
            parquet_writer.write(&batch).expect("writing data");
        }

        parquet_writer.close().expect("finalizing file");

        let reader = SerializedFileReader::new(file).expect("Failed to create reader");
        let filemeta = reader.metadata();

        let check_encoding = |x: usize, filemeta: &ParquetMetaData| {
            assert!(
                filemeta
                    .row_group(0)
                    .column(x)
                    .encodings()
                    .collect::<Vec<_>>()
                    .contains(&Encoding::BYTE_STREAM_SPLIT)
            );
        };

        check_encoding(1, filemeta);
        check_encoding(3, filemeta);
        check_encoding(5, filemeta);
        check_encoding(7, filemeta);
        check_encoding(9, filemeta);
        check_encoding(11, filemeta);
        check_encoding(13, filemeta);

        let mut iter = reader
            .get_row_iter(None)
            .expect("Failed to create row iterator");

        let mut start = 0;
        let end = reader.metadata().file_metadata().num_rows();

        let check_row = |row: Result<Row, ParquetError>| {
            assert!(row.is_ok());
            let r = row.unwrap();
            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
        };

        while start < end {
            match iter.next() {
                Some(row) => check_row(row),
                None => break,
            };
            start += 1;
        }
    }

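    // Rewrites an existing file by appending its already-encoded column chunks into a
    // new SerializedFileWriter while dropping the page indexes entirely.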
    #[test]
    fn test_rewrite_no_page_indexes() {
        let file = get_test_file("alltypes_tiny_pages.parquet");
        let metadata = ParquetMetaDataReader::new()
            .with_page_index_policy(PageIndexPolicy::Optional)
            .parse_and_finish(&file)
            .unwrap();

        let props = Arc::new(WriterProperties::builder().build());
        let schema = metadata.file_metadata().schema_descr().root_schema_ptr();
        let output = Vec::<u8>::new();
        let mut writer = SerializedFileWriter::new(output, schema, props).unwrap();

        for rg in metadata.row_groups() {
            let mut rg_out = writer.next_row_group().unwrap();
            for column in rg.columns() {
                let result = ColumnCloseResult {
                    bytes_written: column.compressed_size() as _,
                    rows_written: rg.num_rows() as _,
                    metadata: column.clone(),
                    bloom_filter: None,
                    column_index: None,
                    offset_index: None,
                };
                rg_out.append_column(&file, result).unwrap();
            }
            rg_out.close().unwrap();
        }
        writer.close().unwrap();
    }

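    // Same rewrite as above, but carries over whatever column and offset indexes the
    // source file provides, tolerating row groups or columns for which they are missing.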
    #[test]
    fn test_rewrite_missing_column_index() {
        let file = get_test_file("alltypes_tiny_pages.parquet");
        let metadata = ParquetMetaDataReader::new()
            .with_page_index_policy(PageIndexPolicy::Optional)
            .parse_and_finish(&file)
            .unwrap();

        let props = Arc::new(WriterProperties::builder().build());
        let schema = metadata.file_metadata().schema_descr().root_schema_ptr();
        let output = Vec::<u8>::new();
        let mut writer = SerializedFileWriter::new(output, schema, props).unwrap();

        let column_indexes = metadata.column_index();
        let offset_indexes = metadata.offset_index();

        for (rg_idx, rg) in metadata.row_groups().iter().enumerate() {
            let rg_column_indexes = column_indexes.and_then(|ci| ci.get(rg_idx));
            let rg_offset_indexes = offset_indexes.and_then(|oi| oi.get(rg_idx));
            let mut rg_out = writer.next_row_group().unwrap();
            for (col_idx, column) in rg.columns().iter().enumerate() {
                let column_index = rg_column_indexes.and_then(|row| row.get(col_idx)).cloned();
                let offset_index = rg_offset_indexes.and_then(|row| row.get(col_idx)).cloned();
                let result = ColumnCloseResult {
                    bytes_written: column.compressed_size() as _,
                    rows_written: rg.num_rows() as _,
                    metadata: column.clone(),
                    bloom_filter: None,
                    column_index,
                    offset_index,
                };
                rg_out.append_column(&file, result).unwrap();
            }
            rg_out.close().unwrap();
        }
        writer.close().unwrap();
    }
}