1use crate::bloom_filter::Sbbf;
21use crate::format as parquet;
22use crate::format::{ColumnIndex, OffsetIndex};
23use crate::thrift::TSerializable;
24use std::fmt::Debug;
25use std::io::{BufWriter, IoSlice, Read};
26use std::{io::Write, sync::Arc};
27use thrift::protocol::TCompactOutputProtocol;
28
29use crate::column::page_encryption::PageEncryptor;
30use crate::column::writer::{get_typed_column_writer_mut, ColumnCloseResult, ColumnWriterImpl};
31use crate::column::{
32 page::{CompressedPage, PageWriteSpec, PageWriter},
33 writer::{get_column_writer, ColumnWriter},
34};
35use crate::data_type::DataType;
36#[cfg(feature = "encryption")]
37use crate::encryption::encrypt::{
38 get_column_crypto_metadata, FileEncryptionProperties, FileEncryptor,
39};
40use crate::errors::{ParquetError, Result};
41use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
42use crate::file::reader::ChunkReader;
43#[cfg(feature = "encryption")]
44use crate::file::PARQUET_MAGIC_ENCR_FOOTER;
45use crate::file::{metadata::*, PARQUET_MAGIC};
46use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr};
47
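/// A [`Write`] implementation that buffers writes through an internal [`BufWriter`]
/// and tracks the total number of bytes written.
///
/// A small usage sketch:
///
/// ```
/// use std::io::Write;
/// use parquet::file::writer::TrackedWrite;
///
/// let mut writer = TrackedWrite::new(Vec::<u8>::new());
/// writer.write_all(b"abc").unwrap();
/// assert_eq!(writer.bytes_written(), 3);
/// ```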
48pub struct TrackedWrite<W: Write> {
52 inner: BufWriter<W>,
53 bytes_written: usize,
54}
55
56impl<W: Write> TrackedWrite<W> {
57 pub fn new(inner: W) -> Self {
59 let buf_write = BufWriter::new(inner);
60 Self {
61 inner: buf_write,
62 bytes_written: 0,
63 }
64 }
65
66 pub fn bytes_written(&self) -> usize {
68 self.bytes_written
69 }
70
71 pub fn inner(&self) -> &W {
73 self.inner.get_ref()
74 }
75
76 pub fn inner_mut(&mut self) -> &mut W {
81 self.inner.get_mut()
82 }
83
    /// Returns the underlying writer, flushing any buffered data
    pub fn into_inner(self) -> Result<W> {
        self.inner.into_inner().map_err(|err| {
            ParquetError::General(format!("failed to get inner writer: {}", err))
        })
    }
90}
91
92impl<W: Write> Write for TrackedWrite<W> {
93 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
94 let bytes = self.inner.write(buf)?;
95 self.bytes_written += bytes;
96 Ok(bytes)
97 }
98
99 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> std::io::Result<usize> {
100 let bytes = self.inner.write_vectored(bufs)?;
101 self.bytes_written += bytes;
102 Ok(bytes)
103 }
104
105 fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
106 self.inner.write_all(buf)?;
107 self.bytes_written += buf.len();
108
109 Ok(())
110 }
111
112 fn flush(&mut self) -> std::io::Result<()> {
113 self.inner.flush()
114 }
115}
116
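/// Callback invoked when a column chunk is closed, receiving the [`ColumnCloseResult`]
/// for that column.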
117pub type OnCloseColumnChunk<'a> = Box<dyn FnOnce(ColumnCloseResult) -> Result<()> + 'a>;
119
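/// Callback invoked when a row group is closed, receiving the writer, the row group's
/// [`RowGroupMetaData`], and the bloom filters, column indexes and offset indexes of its
/// column chunks.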
120pub type OnCloseRowGroup<'a, W> = Box<
126 dyn FnOnce(
127 &'a mut TrackedWrite<W>,
128 RowGroupMetaData,
129 Vec<Option<Sbbf>>,
130 Vec<Option<ColumnIndex>>,
131 Vec<Option<OffsetIndex>>,
132 ) -> Result<()>
133 + 'a
134 + Send,
135>;
136
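/// A serialized implementation of a Parquet file writer.
///
/// Writes the file header and footer, and hands out [`SerializedRowGroupWriter`]s for
/// writing row groups.
///
/// A minimal usage sketch (the schema, column name, and values below are illustrative
/// assumptions, not part of this module):
///
/// ```no_run
/// use std::sync::Arc;
/// use parquet::data_type::Int32Type;
/// use parquet::file::properties::WriterProperties;
/// use parquet::file::writer::SerializedFileWriter;
/// use parquet::schema::parser::parse_message_type;
///
/// let schema = Arc::new(parse_message_type("message schema { REQUIRED INT32 col1; }").unwrap());
/// let props = Arc::new(WriterProperties::builder().build());
/// let mut writer = SerializedFileWriter::new(Vec::<u8>::new(), schema, props).unwrap();
///
/// let mut row_group = writer.next_row_group().unwrap();
/// if let Some(mut col) = row_group.next_column().unwrap() {
///     col.typed::<Int32Type>().write_batch(&[1, 2, 3], None, None).unwrap();
///     col.close().unwrap();
/// }
/// row_group.close().unwrap();
/// writer.close().unwrap();
/// ```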
137pub struct SerializedFileWriter<W: Write> {
157 buf: TrackedWrite<W>,
158 schema: TypePtr,
159 descr: SchemaDescPtr,
160 props: WriterPropertiesPtr,
161 row_groups: Vec<RowGroupMetaData>,
162 bloom_filters: Vec<Vec<Option<Sbbf>>>,
163 column_indexes: Vec<Vec<Option<ColumnIndex>>>,
164 offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
165 row_group_index: usize,
166 kv_metadatas: Vec<KeyValue>,
168 finished: bool,
169 #[cfg(feature = "encryption")]
170 file_encryptor: Option<Arc<FileEncryptor>>,
171}
172
173impl<W: Write> Debug for SerializedFileWriter<W> {
174 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175 f.debug_struct("SerializedFileWriter")
178 .field("descr", &self.descr)
179 .field("row_group_index", &self.row_group_index)
180 .field("kv_metadatas", &self.kv_metadatas)
181 .finish_non_exhaustive()
182 }
183}
184
185impl<W: Write + Send> SerializedFileWriter<W> {
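    /// Creates a new `SerializedFileWriter`, writing the Parquet file header (magic bytes)
    /// to `buf`.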
186 pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
188 let mut buf = TrackedWrite::new(buf);
189
190 let schema_descriptor = SchemaDescriptor::new(schema.clone());
191
192 #[cfg(feature = "encryption")]
193 let file_encryptor = Self::get_file_encryptor(&properties, &schema_descriptor)?;
194
195 Self::start_file(&properties, &mut buf)?;
196 Ok(Self {
197 buf,
198 schema,
199 descr: Arc::new(schema_descriptor),
200 props: properties,
201 row_groups: vec![],
202 bloom_filters: vec![],
203 column_indexes: Vec::new(),
204 offset_indexes: Vec::new(),
205 row_group_index: 0,
206 kv_metadatas: Vec::new(),
207 finished: false,
208 #[cfg(feature = "encryption")]
209 file_encryptor,
210 })
211 }
212
213 #[cfg(feature = "encryption")]
214 fn get_file_encryptor(
215 properties: &WriterPropertiesPtr,
216 schema_descriptor: &SchemaDescriptor,
217 ) -> Result<Option<Arc<FileEncryptor>>> {
218 if let Some(file_encryption_properties) = &properties.file_encryption_properties {
219 file_encryption_properties.validate_encrypted_column_names(schema_descriptor)?;
220
221 Ok(Some(Arc::new(FileEncryptor::new(
222 file_encryption_properties.clone(),
223 )?)))
224 } else {
225 Ok(None)
226 }
227 }
228
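    /// Creates a [`SerializedRowGroupWriter`] for the next row group.
    ///
    /// The previous row group writer, if any, must have been closed.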
229 pub fn next_row_group(&mut self) -> Result<SerializedRowGroupWriter<'_, W>> {
238 self.assert_previous_writer_closed()?;
239 let ordinal = self.row_group_index;
240
241 let ordinal: i16 = ordinal.try_into().map_err(|_| {
242 ParquetError::General(format!(
243 "Parquet does not support more than {} row groups per file (currently: {})",
244 i16::MAX,
245 ordinal
246 ))
247 })?;
248
249 self.row_group_index = self
250 .row_group_index
251 .checked_add(1)
252 .expect("SerializedFileWriter::row_group_index overflowed");
253
254 let bloom_filter_position = self.properties().bloom_filter_position();
255 let row_groups = &mut self.row_groups;
256 let row_bloom_filters = &mut self.bloom_filters;
257 let row_column_indexes = &mut self.column_indexes;
258 let row_offset_indexes = &mut self.offset_indexes;
259 let on_close = move |buf,
260 mut metadata,
261 row_group_bloom_filter,
262 row_group_column_index,
263 row_group_offset_index| {
264 row_bloom_filters.push(row_group_bloom_filter);
265 row_column_indexes.push(row_group_column_index);
266 row_offset_indexes.push(row_group_offset_index);
267 match bloom_filter_position {
269 BloomFilterPosition::AfterRowGroup => {
270 write_bloom_filters(buf, row_bloom_filters, &mut metadata)?
271 }
272 BloomFilterPosition::End => (),
273 };
274 row_groups.push(metadata);
275 Ok(())
276 };
277
278 let row_group_writer = SerializedRowGroupWriter::new(
279 self.descr.clone(),
280 self.props.clone(),
281 &mut self.buf,
282 ordinal,
283 Some(Box::new(on_close)),
284 );
285 #[cfg(feature = "encryption")]
286 let row_group_writer = row_group_writer.with_file_encryptor(self.file_encryptor.clone());
287
288 Ok(row_group_writer)
289 }
290
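    /// Returns the metadata of the row groups written so far.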
291 pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
293 &self.row_groups
294 }
295
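    /// Writes any pending bloom filters and the file footer, and returns the file metadata.
    ///
    /// Unlike [`Self::close`], this does not consume the writer.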
296 pub fn finish(&mut self) -> Result<parquet::FileMetaData> {
302 self.assert_previous_writer_closed()?;
303 let metadata = self.write_metadata()?;
304 self.buf.flush()?;
305 Ok(metadata)
306 }
307
308 pub fn close(mut self) -> Result<parquet::FileMetaData> {
310 self.finish()
311 }
312
313 #[cfg(not(feature = "encryption"))]
315 fn start_file(_properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
316 buf.write_all(get_file_magic())?;
317 Ok(())
318 }
319
320 #[cfg(feature = "encryption")]
322 fn start_file(properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
323 let magic = get_file_magic(properties.file_encryption_properties.as_ref());
324
325 buf.write_all(magic)?;
326 Ok(())
327 }
328
329 fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
331 self.finished = true;
332
333 for row_group in &mut self.row_groups {
335 write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?;
336 }
337
338 let key_value_metadata = match self.props.key_value_metadata() {
339 Some(kv) => Some(kv.iter().chain(&self.kv_metadatas).cloned().collect()),
340 None if self.kv_metadatas.is_empty() => None,
341 None => Some(self.kv_metadatas.clone()),
342 };
343
344 let row_groups = self
345 .row_groups
346 .iter()
347 .map(|v| v.to_thrift())
348 .collect::<Vec<_>>();
349
350 let mut encoder = ThriftMetadataWriter::new(
351 &mut self.buf,
352 &self.schema,
353 &self.descr,
354 row_groups,
355 Some(self.props.created_by().to_string()),
356 self.props.writer_version().as_num(),
357 );
358
359 #[cfg(feature = "encryption")]
360 {
361 encoder = encoder.with_file_encryptor(self.file_encryptor.clone());
362 }
363
364 if let Some(key_value_metadata) = key_value_metadata {
365 encoder = encoder.with_key_value_metadata(key_value_metadata)
366 }
367 encoder = encoder.with_column_indexes(&self.column_indexes);
368 encoder = encoder.with_offset_indexes(&self.offset_indexes);
369 encoder.finish()
370 }
371
372 #[inline]
373 fn assert_previous_writer_closed(&self) -> Result<()> {
374 if self.finished {
375 return Err(general_err!("SerializedFileWriter already finished"));
376 }
377
378 if self.row_group_index != self.row_groups.len() {
379 Err(general_err!("Previous row group writer was not closed"))
380 } else {
381 Ok(())
382 }
383 }
384
385 pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
387 self.kv_metadatas.push(kv_metadata);
388 }
389
390 pub fn schema_descr(&self) -> &SchemaDescriptor {
392 &self.descr
393 }
394
395 pub fn properties(&self) -> &WriterPropertiesPtr {
397 &self.props
398 }
399
400 pub fn inner(&self) -> &W {
402 self.buf.inner()
403 }
404
405 pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
414 self.buf.write_all(buf)
415 }
416
417 pub fn inner_mut(&mut self) -> &mut W {
426 self.buf.inner_mut()
427 }
428
429 pub fn into_inner(mut self) -> Result<W> {
431 self.assert_previous_writer_closed()?;
432 let _ = self.write_metadata()?;
433
434 self.buf.into_inner()
435 }
436
437 pub fn bytes_written(&self) -> usize {
439 self.buf.bytes_written()
440 }
441
442 #[cfg(feature = "encryption")]
444 pub(crate) fn file_encryptor(&self) -> Option<Arc<FileEncryptor>> {
445 self.file_encryptor.clone()
446 }
447}
448
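/// Serializes the bloom filters of a row group to `buf`, updating each column chunk's
/// metadata with the bloom filter offset and length.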
449fn write_bloom_filters<W: Write + Send>(
452 buf: &mut TrackedWrite<W>,
453 bloom_filters: &mut [Vec<Option<Sbbf>>],
454 row_group: &mut RowGroupMetaData,
455) -> Result<()> {
    let row_group_idx: u16 = row_group
        .ordinal()
        .expect("Missing row group ordinal")
        .try_into()
        .map_err(|_| {
            ParquetError::General(format!(
                "Negative row group ordinal: {}",
                row_group.ordinal().unwrap()
            ))
        })?;
    let row_group_idx = row_group_idx as usize;
471 for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() {
472 if let Some(bloom_filter) = bloom_filters[row_group_idx][column_idx].take() {
473 let start_offset = buf.bytes_written();
474 bloom_filter.write(&mut *buf)?;
475 let end_offset = buf.bytes_written();
476 *column_chunk = column_chunk
478 .clone()
479 .into_builder()
480 .set_bloom_filter_offset(Some(start_offset as i64))
481 .set_bloom_filter_length(Some((end_offset - start_offset) as i32))
482 .build()?;
483 }
484 }
485 Ok(())
486}
487
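/// Writes a single row group, one column chunk at a time, to the underlying [`TrackedWrite`].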
488pub struct SerializedRowGroupWriter<'a, W: Write> {
501 descr: SchemaDescPtr,
502 props: WriterPropertiesPtr,
503 buf: &'a mut TrackedWrite<W>,
504 total_rows_written: Option<u64>,
505 total_bytes_written: u64,
506 total_uncompressed_bytes: i64,
507 column_index: usize,
508 row_group_metadata: Option<RowGroupMetaDataPtr>,
509 column_chunks: Vec<ColumnChunkMetaData>,
510 bloom_filters: Vec<Option<Sbbf>>,
511 column_indexes: Vec<Option<ColumnIndex>>,
512 offset_indexes: Vec<Option<OffsetIndex>>,
513 row_group_index: i16,
514 file_offset: i64,
515 on_close: Option<OnCloseRowGroup<'a, W>>,
516 #[cfg(feature = "encryption")]
517 file_encryptor: Option<Arc<FileEncryptor>>,
518}
519
520impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
521 pub fn new(
530 schema_descr: SchemaDescPtr,
531 properties: WriterPropertiesPtr,
532 buf: &'a mut TrackedWrite<W>,
533 row_group_index: i16,
534 on_close: Option<OnCloseRowGroup<'a, W>>,
535 ) -> Self {
536 let num_columns = schema_descr.num_columns();
537 let file_offset = buf.bytes_written() as i64;
538 Self {
539 buf,
540 row_group_index,
541 file_offset,
542 on_close,
543 total_rows_written: None,
544 descr: schema_descr,
545 props: properties,
546 column_index: 0,
547 row_group_metadata: None,
548 column_chunks: Vec::with_capacity(num_columns),
549 bloom_filters: Vec::with_capacity(num_columns),
550 column_indexes: Vec::with_capacity(num_columns),
551 offset_indexes: Vec::with_capacity(num_columns),
552 total_bytes_written: 0,
553 total_uncompressed_bytes: 0,
554 #[cfg(feature = "encryption")]
555 file_encryptor: None,
556 }
557 }
558
559 #[cfg(feature = "encryption")]
560 pub(crate) fn with_file_encryptor(
562 mut self,
563 file_encryptor: Option<Arc<FileEncryptor>>,
564 ) -> Self {
565 self.file_encryptor = file_encryptor;
566 self
567 }
568
569 fn next_column_desc(&mut self) -> Option<ColumnDescPtr> {
571 let ret = self.descr.columns().get(self.column_index)?.clone();
572 self.column_index += 1;
573 Some(ret)
574 }
575
576 fn get_on_close(&mut self) -> (&mut TrackedWrite<W>, OnCloseColumnChunk<'_>) {
578 let total_bytes_written = &mut self.total_bytes_written;
579 let total_uncompressed_bytes = &mut self.total_uncompressed_bytes;
580 let total_rows_written = &mut self.total_rows_written;
581 let column_chunks = &mut self.column_chunks;
582 let column_indexes = &mut self.column_indexes;
583 let offset_indexes = &mut self.offset_indexes;
584 let bloom_filters = &mut self.bloom_filters;
585
586 let on_close = |r: ColumnCloseResult| {
587 *total_bytes_written += r.bytes_written;
589 *total_uncompressed_bytes += r.metadata.uncompressed_size();
590 column_chunks.push(r.metadata);
591 bloom_filters.push(r.bloom_filter);
592 column_indexes.push(r.column_index);
593 offset_indexes.push(r.offset_index);
594
595 if let Some(rows) = *total_rows_written {
596 if rows != r.rows_written {
597 return Err(general_err!(
598 "Incorrect number of rows, expected {} != {} rows",
599 rows,
600 r.rows_written
601 ));
602 }
603 } else {
604 *total_rows_written = Some(r.rows_written);
605 }
606
607 Ok(())
608 };
609 (self.buf, Box::new(on_close))
610 }
611
612 pub(crate) fn next_column_with_factory<'b, F, C>(&'b mut self, factory: F) -> Result<Option<C>>
615 where
616 F: FnOnce(
617 ColumnDescPtr,
618 WriterPropertiesPtr,
619 Box<dyn PageWriter + 'b>,
620 OnCloseColumnChunk<'b>,
621 ) -> Result<C>,
622 {
623 self.assert_previous_writer_closed()?;
624
625 let encryptor_context = self.get_page_encryptor_context();
626
627 Ok(match self.next_column_desc() {
628 Some(column) => {
629 let props = self.props.clone();
630 let (buf, on_close) = self.get_on_close();
631
632 let page_writer = SerializedPageWriter::new(buf);
633 let page_writer =
634 Self::set_page_writer_encryptor(&column, encryptor_context, page_writer)?;
635
636 Some(factory(
637 column,
638 props,
639 Box::new(page_writer),
640 Box::new(on_close),
641 )?)
642 }
643 None => None,
644 })
645 }
646
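    /// Returns the writer for the next column, or `None` if all columns have been written.
    ///
    /// The previous column writer, if any, must have been closed.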
647 pub fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>> {
651 self.next_column_with_factory(|descr, props, page_writer, on_close| {
652 let column_writer = get_column_writer(descr, props, page_writer);
653 Ok(SerializedColumnWriter::new(column_writer, Some(on_close)))
654 })
655 }
656
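    /// Appends an already-encoded column chunk from `reader` to this row group, copying its
    /// bytes verbatim and rewriting the page offsets in the column chunk metadata.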
657 pub fn append_column<R: ChunkReader>(
672 &mut self,
673 reader: &R,
674 mut close: ColumnCloseResult,
675 ) -> Result<()> {
676 self.assert_previous_writer_closed()?;
677 let desc = self
678 .next_column_desc()
679 .ok_or_else(|| general_err!("exhausted columns in SerializedRowGroupWriter"))?;
680
681 let metadata = close.metadata;
682
683 if metadata.column_descr() != desc.as_ref() {
684 return Err(general_err!(
685 "column descriptor mismatch, expected {:?} got {:?}",
686 desc,
687 metadata.column_descr()
688 ));
689 }
690
691 let src_dictionary_offset = metadata.dictionary_page_offset();
692 let src_data_offset = metadata.data_page_offset();
693 let src_offset = src_dictionary_offset.unwrap_or(src_data_offset);
694 let src_length = metadata.compressed_size();
695
696 let write_offset = self.buf.bytes_written();
697 let mut read = reader.get_read(src_offset as _)?.take(src_length as _);
698 let write_length = std::io::copy(&mut read, &mut self.buf)?;
699
        if src_length as u64 != write_length {
            return Err(general_err!(
                "Failed to splice column data, expected {src_length} got {write_length}"
            ));
        }
705
706 let map_offset = |x| x - src_offset + write_offset as i64;
707 let mut builder = ColumnChunkMetaData::builder(metadata.column_descr_ptr())
708 .set_compression(metadata.compression())
709 .set_encodings(metadata.encodings().clone())
710 .set_total_compressed_size(metadata.compressed_size())
711 .set_total_uncompressed_size(metadata.uncompressed_size())
712 .set_num_values(metadata.num_values())
713 .set_data_page_offset(map_offset(src_data_offset))
714 .set_dictionary_page_offset(src_dictionary_offset.map(map_offset))
715 .set_unencoded_byte_array_data_bytes(metadata.unencoded_byte_array_data_bytes());
716
717 if let Some(rep_hist) = metadata.repetition_level_histogram() {
718 builder = builder.set_repetition_level_histogram(Some(rep_hist.clone()))
719 }
720 if let Some(def_hist) = metadata.definition_level_histogram() {
721 builder = builder.set_definition_level_histogram(Some(def_hist.clone()))
722 }
723 if let Some(statistics) = metadata.statistics() {
724 builder = builder.set_statistics(statistics.clone())
725 }
726 if let Some(page_encoding_stats) = metadata.page_encoding_stats() {
727 builder = builder.set_page_encoding_stats(page_encoding_stats.clone())
728 }
729 builder = self.set_column_crypto_metadata(builder, &metadata);
730 close.metadata = builder.build()?;
731
732 if let Some(offsets) = close.offset_index.as_mut() {
733 for location in &mut offsets.page_locations {
734 location.offset = map_offset(location.offset)
735 }
736 }
737
738 let (_, on_close) = self.get_on_close();
739 on_close(close)
740 }
741
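    /// Closes this row group writer, building the [`RowGroupMetaData`] and invoking the
    /// `on_close` callback registered by the file writer.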
742 pub fn close(mut self) -> Result<RowGroupMetaDataPtr> {
744 if self.row_group_metadata.is_none() {
745 self.assert_previous_writer_closed()?;
746
747 let column_chunks = std::mem::take(&mut self.column_chunks);
748 let row_group_metadata = RowGroupMetaData::builder(self.descr.clone())
749 .set_column_metadata(column_chunks)
750 .set_total_byte_size(self.total_uncompressed_bytes)
751 .set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
752 .set_sorting_columns(self.props.sorting_columns().cloned())
753 .set_ordinal(self.row_group_index)
754 .set_file_offset(self.file_offset)
755 .build()?;
756
757 self.row_group_metadata = Some(Arc::new(row_group_metadata.clone()));
758
759 if let Some(on_close) = self.on_close.take() {
760 on_close(
761 self.buf,
762 row_group_metadata,
763 self.bloom_filters,
764 self.column_indexes,
765 self.offset_indexes,
766 )?
767 }
768 }
769
770 let metadata = self.row_group_metadata.as_ref().unwrap().clone();
771 Ok(metadata)
772 }
773
774 #[cfg(feature = "encryption")]
776 fn set_column_crypto_metadata(
777 &self,
778 builder: ColumnChunkMetaDataBuilder,
779 metadata: &ColumnChunkMetaData,
780 ) -> ColumnChunkMetaDataBuilder {
781 if let Some(file_encryptor) = self.file_encryptor.as_ref() {
782 builder.set_column_crypto_metadata(get_column_crypto_metadata(
783 file_encryptor.properties(),
784 &metadata.column_descr_ptr(),
785 ))
786 } else {
787 builder
788 }
789 }
790
791 #[cfg(feature = "encryption")]
793 fn get_page_encryptor_context(&self) -> PageEncryptorContext {
794 PageEncryptorContext {
795 file_encryptor: self.file_encryptor.clone(),
796 row_group_index: self.row_group_index as usize,
797 column_index: self.column_index,
798 }
799 }
800
801 #[cfg(feature = "encryption")]
803 fn set_page_writer_encryptor<'b>(
804 column: &ColumnDescPtr,
805 context: PageEncryptorContext,
806 page_writer: SerializedPageWriter<'b, W>,
807 ) -> Result<SerializedPageWriter<'b, W>> {
808 let page_encryptor = PageEncryptor::create_if_column_encrypted(
809 &context.file_encryptor,
810 context.row_group_index,
811 context.column_index,
812 &column.path().string(),
813 )?;
814
815 Ok(page_writer.with_page_encryptor(page_encryptor))
816 }
817
818 #[cfg(not(feature = "encryption"))]
820 fn set_column_crypto_metadata(
821 &self,
822 builder: ColumnChunkMetaDataBuilder,
823 _metadata: &ColumnChunkMetaData,
824 ) -> ColumnChunkMetaDataBuilder {
825 builder
826 }
827
828 #[cfg(not(feature = "encryption"))]
829 fn get_page_encryptor_context(&self) -> PageEncryptorContext {
830 PageEncryptorContext {}
831 }
832
833 #[cfg(not(feature = "encryption"))]
835 fn set_page_writer_encryptor<'b>(
836 _column: &ColumnDescPtr,
837 _context: PageEncryptorContext,
838 page_writer: SerializedPageWriter<'b, W>,
839 ) -> Result<SerializedPageWriter<'b, W>> {
840 Ok(page_writer)
841 }
842
843 #[inline]
844 fn assert_previous_writer_closed(&self) -> Result<()> {
845 if self.column_index != self.column_chunks.len() {
846 Err(general_err!("Previous column writer was not closed"))
847 } else {
848 Ok(())
849 }
850 }
851}
852
853#[cfg(feature = "encryption")]
855struct PageEncryptorContext {
856 file_encryptor: Option<Arc<FileEncryptor>>,
857 row_group_index: usize,
858 column_index: usize,
859}
860
861#[cfg(not(feature = "encryption"))]
862struct PageEncryptorContext {}
863
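/// A wrapper around a [`ColumnWriter`] that invokes its `on_close` callback with the
/// [`ColumnCloseResult`] when the column is closed.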
864pub struct SerializedColumnWriter<'a> {
866 inner: ColumnWriter<'a>,
867 on_close: Option<OnCloseColumnChunk<'a>>,
868}
869
870impl<'a> SerializedColumnWriter<'a> {
871 pub fn new(inner: ColumnWriter<'a>, on_close: Option<OnCloseColumnChunk<'a>>) -> Self {
874 Self { inner, on_close }
875 }
876
877 pub fn untyped(&mut self) -> &mut ColumnWriter<'a> {
879 &mut self.inner
880 }
881
882 pub fn typed<T: DataType>(&mut self) -> &mut ColumnWriterImpl<'a, T> {
884 get_typed_column_writer_mut(&mut self.inner)
885 }
886
887 pub fn close(mut self) -> Result<()> {
889 let r = self.inner.close()?;
890 if let Some(on_close) = self.on_close.take() {
891 on_close(r)?
892 }
893
894 Ok(())
895 }
896}
897
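/// A [`PageWriter`] that serializes compressed pages and their Thrift headers to a
/// [`TrackedWrite`].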
898pub struct SerializedPageWriter<'a, W: Write> {
903 sink: &'a mut TrackedWrite<W>,
904 #[cfg(feature = "encryption")]
905 page_encryptor: Option<PageEncryptor>,
906}
907
908impl<'a, W: Write> SerializedPageWriter<'a, W> {
909 pub fn new(sink: &'a mut TrackedWrite<W>) -> Self {
911 Self {
912 sink,
913 #[cfg(feature = "encryption")]
914 page_encryptor: None,
915 }
916 }
917
918 #[inline]
921 fn serialize_page_header(&mut self, header: parquet::PageHeader) -> Result<usize> {
922 let start_pos = self.sink.bytes_written();
923 match self.page_encryptor_and_sink_mut() {
924 Some((page_encryptor, sink)) => {
925 page_encryptor.encrypt_page_header(&header, sink)?;
926 }
927 None => {
928 let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
929 header.write_to_out_protocol(&mut protocol)?;
930 }
931 }
932 Ok(self.sink.bytes_written() - start_pos)
933 }
934}
935
936#[cfg(feature = "encryption")]
937impl<'a, W: Write> SerializedPageWriter<'a, W> {
938 fn with_page_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
940 self.page_encryptor = page_encryptor;
941 self
942 }
943
944 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
945 self.page_encryptor.as_mut()
946 }
947
948 fn page_encryptor_and_sink_mut(
949 &mut self,
950 ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
951 self.page_encryptor.as_mut().map(|pe| (pe, &mut self.sink))
952 }
953}
954
955#[cfg(not(feature = "encryption"))]
956impl<'a, W: Write> SerializedPageWriter<'a, W> {
957 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
958 None
959 }
960
961 fn page_encryptor_and_sink_mut(
962 &mut self,
963 ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
964 None
965 }
966}
967
968impl<W: Write + Send> PageWriter for SerializedPageWriter<'_, W> {
969 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
970 let page = match self.page_encryptor_mut() {
971 Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
972 None => page,
973 };
974
975 let page_type = page.page_type();
976 let start_pos = self.sink.bytes_written() as u64;
977
978 let page_header = page.to_thrift_header()?;
979 let header_size = self.serialize_page_header(page_header)?;
980
981 self.sink.write_all(page.data())?;
982
983 let mut spec = PageWriteSpec::new();
984 spec.page_type = page_type;
985 spec.uncompressed_size = page.uncompressed_size() + header_size;
986 spec.compressed_size = page.compressed_size() + header_size;
987 spec.offset = start_pos;
988 spec.bytes_written = self.sink.bytes_written() as u64 - start_pos;
989 spec.num_values = page.num_values();
990
991 if let Some(page_encryptor) = self.page_encryptor_mut() {
992 if page.compressed_page().is_data_page() {
993 page_encryptor.increment_page();
994 }
995 }
996 Ok(spec)
997 }
998
999 fn close(&mut self) -> Result<()> {
1000 self.sink.flush()?;
1001 Ok(())
1002 }
1003}
1004
1005#[cfg(feature = "encryption")]
1008pub(crate) fn get_file_magic(
1009 file_encryption_properties: Option<&FileEncryptionProperties>,
1010) -> &'static [u8; 4] {
1011 match file_encryption_properties.as_ref() {
1012 Some(encryption_properties) if encryption_properties.encrypt_footer() => {
1013 &PARQUET_MAGIC_ENCR_FOOTER
1014 }
1015 _ => &PARQUET_MAGIC,
1016 }
1017}
1018
1019#[cfg(not(feature = "encryption"))]
1020pub(crate) fn get_file_magic() -> &'static [u8; 4] {
1021 &PARQUET_MAGIC
1022}
1023
1024#[cfg(test)]
1025mod tests {
1026 use super::*;
1027
1028 #[cfg(feature = "arrow")]
1029 use arrow_array::RecordBatchReader;
1030 use bytes::Bytes;
1031 use std::fs::File;
1032
1033 #[cfg(feature = "arrow")]
1034 use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
1035 #[cfg(feature = "arrow")]
1036 use crate::arrow::ArrowWriter;
1037 use crate::basic::{
1038 ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, Repetition, SortOrder, Type,
1039 };
1040 use crate::column::page::{Page, PageReader};
1041 use crate::column::reader::get_typed_column_reader;
1042 use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
1043 use crate::data_type::{BoolType, ByteArrayType, Int32Type};
1044 use crate::file::page_index::index::Index;
1045 use crate::file::properties::EnabledStatistics;
1046 use crate::file::serialized_reader::ReadOptionsBuilder;
1047 use crate::file::{
1048 properties::{ReaderProperties, WriterProperties, WriterVersion},
1049 reader::{FileReader, SerializedFileReader, SerializedPageReader},
1050 statistics::{from_thrift, to_thrift, Statistics},
1051 };
1052 use crate::format::SortingColumn;
1053 use crate::record::{Row, RowAccessor};
1054 use crate::schema::parser::parse_message_type;
1055 use crate::schema::types;
1056 use crate::schema::types::{ColumnDescriptor, ColumnPath};
1057 use crate::util::test_common::rand_gen::RandGen;
1058
1059 #[test]
1060 fn test_row_group_writer_error_not_all_columns_written() {
1061 let file = tempfile::tempfile().unwrap();
1062 let schema = Arc::new(
1063 types::Type::group_type_builder("schema")
1064 .with_fields(vec![Arc::new(
1065 types::Type::primitive_type_builder("col1", Type::INT32)
1066 .build()
1067 .unwrap(),
1068 )])
1069 .build()
1070 .unwrap(),
1071 );
1072 let props = Default::default();
1073 let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
1074 let row_group_writer = writer.next_row_group().unwrap();
1075 let res = row_group_writer.close();
1076 assert!(res.is_err());
1077 if let Err(err) = res {
1078 assert_eq!(
1079 format!("{err}"),
1080 "Parquet error: Column length mismatch: 1 != 0"
1081 );
1082 }
1083 }
1084
1085 #[test]
1086 fn test_row_group_writer_num_records_mismatch() {
1087 let file = tempfile::tempfile().unwrap();
1088 let schema = Arc::new(
1089 types::Type::group_type_builder("schema")
1090 .with_fields(vec![
1091 Arc::new(
1092 types::Type::primitive_type_builder("col1", Type::INT32)
1093 .with_repetition(Repetition::REQUIRED)
1094 .build()
1095 .unwrap(),
1096 ),
1097 Arc::new(
1098 types::Type::primitive_type_builder("col2", Type::INT32)
1099 .with_repetition(Repetition::REQUIRED)
1100 .build()
1101 .unwrap(),
1102 ),
1103 ])
1104 .build()
1105 .unwrap(),
1106 );
1107 let props = Default::default();
1108 let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
1109 let mut row_group_writer = writer.next_row_group().unwrap();
1110
1111 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
1112 col_writer
1113 .typed::<Int32Type>()
1114 .write_batch(&[1, 2, 3], None, None)
1115 .unwrap();
1116 col_writer.close().unwrap();
1117
1118 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
1119 col_writer
1120 .typed::<Int32Type>()
1121 .write_batch(&[1, 2], None, None)
1122 .unwrap();
1123
1124 let err = col_writer.close().unwrap_err();
1125 assert_eq!(
1126 err.to_string(),
1127 "Parquet error: Incorrect number of rows, expected 3 != 2 rows"
1128 );
1129 }
1130
1131 #[test]
1132 fn test_file_writer_empty_file() {
1133 let file = tempfile::tempfile().unwrap();
1134
1135 let schema = Arc::new(
1136 types::Type::group_type_builder("schema")
1137 .with_fields(vec![Arc::new(
1138 types::Type::primitive_type_builder("col1", Type::INT32)
1139 .build()
1140 .unwrap(),
1141 )])
1142 .build()
1143 .unwrap(),
1144 );
1145 let props = Default::default();
1146 let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1147 writer.close().unwrap();
1148
1149 let reader = SerializedFileReader::new(file).unwrap();
1150 assert_eq!(reader.get_row_iter(None).unwrap().count(), 0);
1151 }
1152
1153 #[test]
1154 fn test_file_writer_column_orders_populated() {
1155 let file = tempfile::tempfile().unwrap();
1156
1157 let schema = Arc::new(
1158 types::Type::group_type_builder("schema")
1159 .with_fields(vec![
1160 Arc::new(
1161 types::Type::primitive_type_builder("col1", Type::INT32)
1162 .build()
1163 .unwrap(),
1164 ),
1165 Arc::new(
1166 types::Type::primitive_type_builder("col2", Type::FIXED_LEN_BYTE_ARRAY)
1167 .with_converted_type(ConvertedType::INTERVAL)
1168 .with_length(12)
1169 .build()
1170 .unwrap(),
1171 ),
1172 Arc::new(
1173 types::Type::group_type_builder("nested")
1174 .with_repetition(Repetition::REQUIRED)
1175 .with_fields(vec![
1176 Arc::new(
1177 types::Type::primitive_type_builder(
1178 "col3",
1179 Type::FIXED_LEN_BYTE_ARRAY,
1180 )
1181 .with_logical_type(Some(LogicalType::Float16))
1182 .with_length(2)
1183 .build()
1184 .unwrap(),
1185 ),
1186 Arc::new(
1187 types::Type::primitive_type_builder("col4", Type::BYTE_ARRAY)
1188 .with_logical_type(Some(LogicalType::String))
1189 .build()
1190 .unwrap(),
1191 ),
1192 ])
1193 .build()
1194 .unwrap(),
1195 ),
1196 ])
1197 .build()
1198 .unwrap(),
1199 );
1200
1201 let props = Default::default();
1202 let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1203 writer.close().unwrap();
1204
1205 let reader = SerializedFileReader::new(file).unwrap();
1206
        let expected = vec![
            // col1: INT32
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
            // col2: FIXED_LEN_BYTE_ARRAY (INTERVAL)
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED),
            // col3: FIXED_LEN_BYTE_ARRAY (Float16)
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
            // col4: BYTE_ARRAY (String)
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED),
        ];
1218 let actual = reader.metadata().file_metadata().column_orders();
1219
1220 assert!(actual.is_some());
1221 let actual = actual.unwrap();
1222 assert_eq!(*actual, expected);
1223 }
1224
1225 #[test]
1226 fn test_file_writer_with_metadata() {
1227 let file = tempfile::tempfile().unwrap();
1228
1229 let schema = Arc::new(
1230 types::Type::group_type_builder("schema")
1231 .with_fields(vec![Arc::new(
1232 types::Type::primitive_type_builder("col1", Type::INT32)
1233 .build()
1234 .unwrap(),
1235 )])
1236 .build()
1237 .unwrap(),
1238 );
1239 let props = Arc::new(
1240 WriterProperties::builder()
1241 .set_key_value_metadata(Some(vec![KeyValue::new(
1242 "key".to_string(),
1243 "value".to_string(),
1244 )]))
1245 .build(),
1246 );
1247 let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1248 writer.close().unwrap();
1249
1250 let reader = SerializedFileReader::new(file).unwrap();
1251 assert_eq!(
1252 reader
1253 .metadata()
1254 .file_metadata()
1255 .key_value_metadata()
1256 .to_owned()
1257 .unwrap()
1258 .len(),
1259 1
1260 );
1261 }
1262
1263 #[test]
1264 fn test_file_writer_v2_with_metadata() {
1265 let file = tempfile::tempfile().unwrap();
1266 let field_logical_type = Some(LogicalType::Integer {
1267 bit_width: 8,
1268 is_signed: false,
1269 });
1270 let field = Arc::new(
1271 types::Type::primitive_type_builder("col1", Type::INT32)
1272 .with_logical_type(field_logical_type.clone())
1273 .with_converted_type(field_logical_type.into())
1274 .build()
1275 .unwrap(),
1276 );
1277 let schema = Arc::new(
1278 types::Type::group_type_builder("schema")
1279 .with_fields(vec![field.clone()])
1280 .build()
1281 .unwrap(),
1282 );
1283 let props = Arc::new(
1284 WriterProperties::builder()
1285 .set_key_value_metadata(Some(vec![KeyValue::new(
1286 "key".to_string(),
1287 "value".to_string(),
1288 )]))
1289 .set_writer_version(WriterVersion::PARQUET_2_0)
1290 .build(),
1291 );
1292 let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1293 writer.close().unwrap();
1294
1295 let reader = SerializedFileReader::new(file).unwrap();
1296
1297 assert_eq!(
1298 reader
1299 .metadata()
1300 .file_metadata()
1301 .key_value_metadata()
1302 .to_owned()
1303 .unwrap()
1304 .len(),
1305 1
1306 );
1307
1308 let fields = reader.metadata().file_metadata().schema().get_fields();
1310 assert_eq!(fields.len(), 1);
1311 assert_eq!(fields[0], field);
1312 }
1313
1314 #[test]
1315 fn test_file_writer_with_sorting_columns_metadata() {
1316 let file = tempfile::tempfile().unwrap();
1317
1318 let schema = Arc::new(
1319 types::Type::group_type_builder("schema")
1320 .with_fields(vec![
1321 Arc::new(
1322 types::Type::primitive_type_builder("col1", Type::INT32)
1323 .build()
1324 .unwrap(),
1325 ),
1326 Arc::new(
1327 types::Type::primitive_type_builder("col2", Type::INT32)
1328 .build()
1329 .unwrap(),
1330 ),
1331 ])
1332 .build()
1333 .unwrap(),
1334 );
1335 let expected_result = Some(vec![SortingColumn {
1336 column_idx: 0,
1337 descending: false,
1338 nulls_first: true,
1339 }]);
1340 let props = Arc::new(
1341 WriterProperties::builder()
1342 .set_key_value_metadata(Some(vec![KeyValue::new(
1343 "key".to_string(),
1344 "value".to_string(),
1345 )]))
1346 .set_sorting_columns(expected_result.clone())
1347 .build(),
1348 );
1349 let mut writer =
1350 SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1351 let mut row_group_writer = writer.next_row_group().expect("get row group writer");
1352
1353 let col_writer = row_group_writer.next_column().unwrap().unwrap();
1354 col_writer.close().unwrap();
1355
1356 let col_writer = row_group_writer.next_column().unwrap().unwrap();
1357 col_writer.close().unwrap();
1358
1359 row_group_writer.close().unwrap();
1360 writer.close().unwrap();
1361
1362 let reader = SerializedFileReader::new(file).unwrap();
1363 let result: Vec<Option<&Vec<SortingColumn>>> = reader
1364 .metadata()
1365 .row_groups()
1366 .iter()
1367 .map(|f| f.sorting_columns())
1368 .collect();
1369 assert_eq!(expected_result.as_ref(), result[0]);
1371 }
1372
1373 #[test]
1374 fn test_file_writer_empty_row_groups() {
1375 let file = tempfile::tempfile().unwrap();
1376 test_file_roundtrip(file, vec![]);
1377 }
1378
1379 #[test]
1380 fn test_file_writer_single_row_group() {
1381 let file = tempfile::tempfile().unwrap();
1382 test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
1383 }
1384
1385 #[test]
1386 fn test_file_writer_multiple_row_groups() {
1387 let file = tempfile::tempfile().unwrap();
1388 test_file_roundtrip(
1389 file,
1390 vec![
1391 vec![1, 2, 3, 4, 5],
1392 vec![1, 2, 3],
1393 vec![1],
1394 vec![1, 2, 3, 4, 5, 6],
1395 ],
1396 );
1397 }
1398
1399 #[test]
1400 fn test_file_writer_multiple_large_row_groups() {
1401 let file = tempfile::tempfile().unwrap();
1402 test_file_roundtrip(
1403 file,
1404 vec![vec![123; 1024], vec![124; 1000], vec![125; 15], vec![]],
1405 );
1406 }
1407
1408 #[test]
1409 fn test_page_writer_data_pages() {
1410 let pages = vec![
1411 Page::DataPage {
1412 buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
1413 num_values: 10,
1414 encoding: Encoding::DELTA_BINARY_PACKED,
1415 def_level_encoding: Encoding::RLE,
1416 rep_level_encoding: Encoding::RLE,
1417 statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1418 },
1419 Page::DataPageV2 {
1420 buf: Bytes::from(vec![4; 128]),
1421 num_values: 10,
1422 encoding: Encoding::DELTA_BINARY_PACKED,
1423 num_nulls: 2,
1424 num_rows: 12,
1425 def_levels_byte_len: 24,
1426 rep_levels_byte_len: 32,
1427 is_compressed: false,
1428 statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1429 },
1430 ];
1431
1432 test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
1433 test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
1434 }
1435
1436 #[test]
1437 fn test_page_writer_dict_pages() {
1438 let pages = vec![
1439 Page::DictionaryPage {
1440 buf: Bytes::from(vec![1, 2, 3, 4, 5]),
1441 num_values: 5,
1442 encoding: Encoding::RLE_DICTIONARY,
1443 is_sorted: false,
1444 },
1445 Page::DataPage {
1446 buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
1447 num_values: 10,
1448 encoding: Encoding::DELTA_BINARY_PACKED,
1449 def_level_encoding: Encoding::RLE,
1450 rep_level_encoding: Encoding::RLE,
1451 statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1452 },
1453 Page::DataPageV2 {
1454 buf: Bytes::from(vec![4; 128]),
1455 num_values: 10,
1456 encoding: Encoding::DELTA_BINARY_PACKED,
1457 num_nulls: 2,
1458 num_rows: 12,
1459 def_levels_byte_len: 24,
1460 rep_levels_byte_len: 32,
1461 is_compressed: false,
1462 statistics: None,
1463 },
1464 ];
1465
1466 test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
1467 test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
1468 }
1469
1470 fn test_page_roundtrip(pages: &[Page], codec: Compression, physical_type: Type) {
1474 let mut compressed_pages = vec![];
1475 let mut total_num_values = 0i64;
1476 let codec_options = CodecOptionsBuilder::default()
1477 .set_backward_compatible_lz4(false)
1478 .build();
1479 let mut compressor = create_codec(codec, &codec_options).unwrap();
1480
1481 for page in pages {
1482 let uncompressed_len = page.buffer().len();
1483
1484 let compressed_page = match *page {
1485 Page::DataPage {
1486 ref buf,
1487 num_values,
1488 encoding,
1489 def_level_encoding,
1490 rep_level_encoding,
1491 ref statistics,
1492 } => {
1493 total_num_values += num_values as i64;
1494 let output_buf = compress_helper(compressor.as_mut(), buf);
1495
1496 Page::DataPage {
1497 buf: Bytes::from(output_buf),
1498 num_values,
1499 encoding,
1500 def_level_encoding,
1501 rep_level_encoding,
1502 statistics: from_thrift(physical_type, to_thrift(statistics.as_ref()))
1503 .unwrap(),
1504 }
1505 }
1506 Page::DataPageV2 {
1507 ref buf,
1508 num_values,
1509 encoding,
1510 num_nulls,
1511 num_rows,
1512 def_levels_byte_len,
1513 rep_levels_byte_len,
1514 ref statistics,
1515 ..
1516 } => {
1517 total_num_values += num_values as i64;
1518 let offset = (def_levels_byte_len + rep_levels_byte_len) as usize;
1519 let cmp_buf = compress_helper(compressor.as_mut(), &buf[offset..]);
1520 let mut output_buf = Vec::from(&buf[..offset]);
1521 output_buf.extend_from_slice(&cmp_buf[..]);
1522
1523 Page::DataPageV2 {
1524 buf: Bytes::from(output_buf),
1525 num_values,
1526 encoding,
1527 num_nulls,
1528 num_rows,
1529 def_levels_byte_len,
1530 rep_levels_byte_len,
1531 is_compressed: compressor.is_some(),
1532 statistics: from_thrift(physical_type, to_thrift(statistics.as_ref()))
1533 .unwrap(),
1534 }
1535 }
1536 Page::DictionaryPage {
1537 ref buf,
1538 num_values,
1539 encoding,
1540 is_sorted,
1541 } => {
1542 let output_buf = compress_helper(compressor.as_mut(), buf);
1543
1544 Page::DictionaryPage {
1545 buf: Bytes::from(output_buf),
1546 num_values,
1547 encoding,
1548 is_sorted,
1549 }
1550 }
1551 };
1552
1553 let compressed_page = CompressedPage::new(compressed_page, uncompressed_len);
1554 compressed_pages.push(compressed_page);
1555 }
1556
1557 let mut buffer: Vec<u8> = vec![];
1558 let mut result_pages: Vec<Page> = vec![];
1559 {
1560 let mut writer = TrackedWrite::new(&mut buffer);
1561 let mut page_writer = SerializedPageWriter::new(&mut writer);
1562
1563 for page in compressed_pages {
1564 page_writer.write_page(page).unwrap();
1565 }
1566 page_writer.close().unwrap();
1567 }
1568 {
1569 let reader = bytes::Bytes::from(buffer);
1570
1571 let t = types::Type::primitive_type_builder("t", physical_type)
1572 .build()
1573 .unwrap();
1574
1575 let desc = ColumnDescriptor::new(Arc::new(t), 0, 0, ColumnPath::new(vec![]));
1576 let meta = ColumnChunkMetaData::builder(Arc::new(desc))
1577 .set_compression(codec)
1578 .set_total_compressed_size(reader.len() as i64)
1579 .set_num_values(total_num_values)
1580 .build()
1581 .unwrap();
1582
1583 let props = ReaderProperties::builder()
1584 .set_backward_compatible_lz4(false)
1585 .build();
1586 let mut page_reader = SerializedPageReader::new_with_properties(
1587 Arc::new(reader),
1588 &meta,
1589 total_num_values as usize,
1590 None,
1591 Arc::new(props),
1592 )
1593 .unwrap();
1594
1595 while let Some(page) = page_reader.get_next_page().unwrap() {
1596 result_pages.push(page);
1597 }
1598 }
1599
1600 assert_eq!(result_pages.len(), pages.len());
1601 for i in 0..result_pages.len() {
1602 assert_page(&result_pages[i], &pages[i]);
1603 }
1604 }
1605
1606 fn compress_helper(compressor: Option<&mut Box<dyn Codec>>, data: &[u8]) -> Vec<u8> {
1608 let mut output_buf = vec![];
1609 if let Some(cmpr) = compressor {
1610 cmpr.compress(data, &mut output_buf).unwrap();
1611 } else {
1612 output_buf.extend_from_slice(data);
1613 }
1614 output_buf
1615 }
1616
1617 fn assert_page(left: &Page, right: &Page) {
1619 assert_eq!(left.page_type(), right.page_type());
1620 assert_eq!(&left.buffer(), &right.buffer());
1621 assert_eq!(left.num_values(), right.num_values());
1622 assert_eq!(left.encoding(), right.encoding());
1623 assert_eq!(to_thrift(left.statistics()), to_thrift(right.statistics()));
1624 }
1625
1626 fn test_roundtrip_i32<W, R>(
1628 file: W,
1629 data: Vec<Vec<i32>>,
1630 compression: Compression,
1631 ) -> crate::format::FileMetaData
1632 where
1633 W: Write + Send,
1634 R: ChunkReader + From<W> + 'static,
1635 {
1636 test_roundtrip::<W, R, Int32Type, _>(file, data, |r| r.get_int(0).unwrap(), compression)
1637 }
1638
1639 fn test_roundtrip<W, R, D, F>(
1642 mut file: W,
1643 data: Vec<Vec<D::T>>,
1644 value: F,
1645 compression: Compression,
1646 ) -> crate::format::FileMetaData
1647 where
1648 W: Write + Send,
1649 R: ChunkReader + From<W> + 'static,
1650 D: DataType,
1651 F: Fn(Row) -> D::T,
1652 {
1653 let schema = Arc::new(
1654 types::Type::group_type_builder("schema")
1655 .with_fields(vec![Arc::new(
1656 types::Type::primitive_type_builder("col1", D::get_physical_type())
1657 .with_repetition(Repetition::REQUIRED)
1658 .build()
1659 .unwrap(),
1660 )])
1661 .build()
1662 .unwrap(),
1663 );
1664 let props = Arc::new(
1665 WriterProperties::builder()
1666 .set_compression(compression)
1667 .build(),
1668 );
1669 let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
1670 let mut rows: i64 = 0;
1671
1672 for (idx, subset) in data.iter().enumerate() {
1673 let row_group_file_offset = file_writer.buf.bytes_written();
1674 let mut row_group_writer = file_writer.next_row_group().unwrap();
1675 if let Some(mut writer) = row_group_writer.next_column().unwrap() {
1676 rows += writer
1677 .typed::<D>()
1678 .write_batch(&subset[..], None, None)
1679 .unwrap() as i64;
1680 writer.close().unwrap();
1681 }
1682 let last_group = row_group_writer.close().unwrap();
1683 let flushed = file_writer.flushed_row_groups();
1684 assert_eq!(flushed.len(), idx + 1);
1685 assert_eq!(Some(idx as i16), last_group.ordinal());
1686 assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset());
1687 assert_eq!(&flushed[idx], last_group.as_ref());
1688 }
1689 let file_metadata = file_writer.close().unwrap();
1690
1691 let reader = SerializedFileReader::new(R::from(file)).unwrap();
1692 assert_eq!(reader.num_row_groups(), data.len());
1693 assert_eq!(
1694 reader.metadata().file_metadata().num_rows(),
1695 rows,
1696 "row count in metadata not equal to number of rows written"
1697 );
1698 for (i, item) in data.iter().enumerate().take(reader.num_row_groups()) {
1699 let row_group_reader = reader.get_row_group(i).unwrap();
1700 let iter = row_group_reader.get_row_iter(None).unwrap();
1701 let res: Vec<_> = iter.map(|row| row.unwrap()).map(&value).collect();
1702 let row_group_size = row_group_reader.metadata().total_byte_size();
1703 let uncompressed_size: i64 = row_group_reader
1704 .metadata()
1705 .columns()
1706 .iter()
1707 .map(|v| v.uncompressed_size())
1708 .sum();
1709 assert_eq!(row_group_size, uncompressed_size);
1710 assert_eq!(res, *item);
1711 }
1712 file_metadata
1713 }
1714
1715 fn test_file_roundtrip(file: File, data: Vec<Vec<i32>>) -> crate::format::FileMetaData {
1718 test_roundtrip_i32::<File, File>(file, data, Compression::UNCOMPRESSED)
1719 }
1720
1721 #[test]
1722 fn test_bytes_writer_empty_row_groups() {
1723 test_bytes_roundtrip(vec![], Compression::UNCOMPRESSED);
1724 }
1725
1726 #[test]
1727 fn test_bytes_writer_single_row_group() {
1728 test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::UNCOMPRESSED);
1729 }
1730
1731 #[test]
1732 fn test_bytes_writer_multiple_row_groups() {
1733 test_bytes_roundtrip(
1734 vec![
1735 vec![1, 2, 3, 4, 5],
1736 vec![1, 2, 3],
1737 vec![1],
1738 vec![1, 2, 3, 4, 5, 6],
1739 ],
1740 Compression::UNCOMPRESSED,
1741 );
1742 }
1743
1744 #[test]
1745 fn test_bytes_writer_single_row_group_compressed() {
1746 test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::SNAPPY);
1747 }
1748
1749 #[test]
1750 fn test_bytes_writer_multiple_row_groups_compressed() {
1751 test_bytes_roundtrip(
1752 vec![
1753 vec![1, 2, 3, 4, 5],
1754 vec![1, 2, 3],
1755 vec![1],
1756 vec![1, 2, 3, 4, 5, 6],
1757 ],
1758 Compression::SNAPPY,
1759 );
1760 }
1761
1762 fn test_bytes_roundtrip(data: Vec<Vec<i32>>, compression: Compression) {
1763 test_roundtrip_i32::<Vec<u8>, Bytes>(Vec::with_capacity(1024), data, compression);
1764 }
1765
1766 #[test]
1767 fn test_boolean_roundtrip() {
1768 let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
1769 test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
1770 Vec::with_capacity(1024),
1771 vec![my_bool_values],
1772 |r| r.get_bool(0).unwrap(),
1773 Compression::UNCOMPRESSED,
1774 );
1775 }
1776
1777 #[test]
1778 fn test_boolean_compressed_roundtrip() {
1779 let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
1780 test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
1781 Vec::with_capacity(1024),
1782 vec![my_bool_values],
1783 |r| r.get_bool(0).unwrap(),
1784 Compression::SNAPPY,
1785 );
1786 }
1787
1788 #[test]
1789 fn test_column_offset_index_file() {
1790 let file = tempfile::tempfile().unwrap();
1791 let file_metadata = test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
1792 file_metadata.row_groups.iter().for_each(|row_group| {
1793 row_group.columns.iter().for_each(|column_chunk| {
1794 assert_ne!(None, column_chunk.column_index_offset);
1795 assert_ne!(None, column_chunk.column_index_length);
1796
1797 assert_ne!(None, column_chunk.offset_index_offset);
1798 assert_ne!(None, column_chunk.offset_index_length);
1799 })
1800 });
1801 }
1802
1803 fn test_kv_metadata(initial_kv: Option<Vec<KeyValue>>, final_kv: Option<Vec<KeyValue>>) {
1804 let schema = Arc::new(
1805 types::Type::group_type_builder("schema")
1806 .with_fields(vec![Arc::new(
1807 types::Type::primitive_type_builder("col1", Type::INT32)
1808 .with_repetition(Repetition::REQUIRED)
1809 .build()
1810 .unwrap(),
1811 )])
1812 .build()
1813 .unwrap(),
1814 );
1815 let mut out = Vec::with_capacity(1024);
1816 let props = Arc::new(
1817 WriterProperties::builder()
1818 .set_key_value_metadata(initial_kv.clone())
1819 .build(),
1820 );
1821 let mut writer = SerializedFileWriter::new(&mut out, schema, props).unwrap();
1822 let mut row_group_writer = writer.next_row_group().unwrap();
1823 let column = row_group_writer.next_column().unwrap().unwrap();
1824 column.close().unwrap();
1825 row_group_writer.close().unwrap();
1826 if let Some(kvs) = &final_kv {
1827 for kv in kvs {
1828 writer.append_key_value_metadata(kv.clone())
1829 }
1830 }
1831 writer.close().unwrap();
1832
1833 let reader = SerializedFileReader::new(Bytes::from(out)).unwrap();
1834 let metadata = reader.metadata().file_metadata();
1835 let keys = metadata.key_value_metadata();
1836
1837 match (initial_kv, final_kv) {
1838 (Some(a), Some(b)) => {
1839 let keys = keys.unwrap();
1840 assert_eq!(keys.len(), a.len() + b.len());
1841 assert_eq!(&keys[..a.len()], a.as_slice());
1842 assert_eq!(&keys[a.len()..], b.as_slice());
1843 }
1844 (Some(v), None) => assert_eq!(keys.unwrap(), &v),
1845 (None, Some(v)) if !v.is_empty() => assert_eq!(keys.unwrap(), &v),
1846 _ => assert!(keys.is_none()),
1847 }
1848 }
1849
1850 #[test]
1851 fn test_append_metadata() {
1852 let kv1 = KeyValue::new("cupcakes".to_string(), "awesome".to_string());
1853 let kv2 = KeyValue::new("bingo".to_string(), "bongo".to_string());
1854
1855 test_kv_metadata(None, None);
1856 test_kv_metadata(Some(vec![kv1.clone()]), None);
1857 test_kv_metadata(None, Some(vec![kv2.clone()]));
1858 test_kv_metadata(Some(vec![kv1.clone()]), Some(vec![kv2.clone()]));
1859 test_kv_metadata(Some(vec![]), Some(vec![kv2]));
1860 test_kv_metadata(Some(vec![]), Some(vec![]));
1861 test_kv_metadata(Some(vec![kv1]), Some(vec![]));
1862 test_kv_metadata(None, Some(vec![]));
1863 }
1864
1865 #[test]
1866 fn test_backwards_compatible_statistics() {
1867 let message_type = "
1868 message test_schema {
1869 REQUIRED INT32 decimal1 (DECIMAL(8,2));
1870 REQUIRED INT32 i32 (INTEGER(32,true));
1871 REQUIRED INT32 u32 (INTEGER(32,false));
1872 }
1873 ";
1874
1875 let schema = Arc::new(parse_message_type(message_type).unwrap());
1876 let props = Default::default();
1877 let mut writer = SerializedFileWriter::new(vec![], schema, props).unwrap();
1878 let mut row_group_writer = writer.next_row_group().unwrap();
1879
1880 for _ in 0..3 {
1881 let mut writer = row_group_writer.next_column().unwrap().unwrap();
1882 writer
1883 .typed::<Int32Type>()
1884 .write_batch(&[1, 2, 3], None, None)
1885 .unwrap();
1886 writer.close().unwrap();
1887 }
1888 let metadata = row_group_writer.close().unwrap();
1889 writer.close().unwrap();
1890
1891 let thrift = metadata.to_thrift();
1892 let encoded_stats: Vec<_> = thrift
1893 .columns
1894 .into_iter()
1895 .map(|x| x.meta_data.unwrap().statistics.unwrap())
1896 .collect();
1897
        // decimal1
        let s = &encoded_stats[0];
1900 assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1901 assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1902 assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1903 assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1904
        // i32
        let s = &encoded_stats[1];
1907 assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1908 assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1909 assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1910 assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1911
        // u32: the legacy min/max fields are not written for unsigned sort order
        let s = &encoded_stats[2];
1914 assert_eq!(s.min.as_deref(), None);
1915 assert_eq!(s.max.as_deref(), None);
1916 assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1917 assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1918 }
1919
1920 #[test]
1921 fn test_spliced_write() {
1922 let message_type = "
1923 message test_schema {
1924 REQUIRED INT32 i32 (INTEGER(32,true));
1925 REQUIRED INT32 u32 (INTEGER(32,false));
1926 }
1927 ";
1928 let schema = Arc::new(parse_message_type(message_type).unwrap());
1929 let props = Arc::new(WriterProperties::builder().build());
1930
1931 let mut file = Vec::with_capacity(1024);
1932 let mut file_writer = SerializedFileWriter::new(&mut file, schema, props.clone()).unwrap();
1933
1934 let columns = file_writer.descr.columns();
1935 let mut column_state: Vec<(_, Option<ColumnCloseResult>)> = columns
1936 .iter()
1937 .map(|_| (TrackedWrite::new(Vec::with_capacity(1024)), None))
1938 .collect();
1939
1940 let mut column_state_slice = column_state.as_mut_slice();
1941 let mut column_writers = Vec::with_capacity(columns.len());
1942 for c in columns {
1943 let ((buf, out), tail) = column_state_slice.split_first_mut().unwrap();
1944 column_state_slice = tail;
1945
1946 let page_writer = Box::new(SerializedPageWriter::new(buf));
1947 let col_writer = get_column_writer(c.clone(), props.clone(), page_writer);
1948 column_writers.push(SerializedColumnWriter::new(
1949 col_writer,
1950 Some(Box::new(|on_close| {
1951 *out = Some(on_close);
1952 Ok(())
1953 })),
1954 ));
1955 }
1956
1957 let column_data = [[1, 2, 3, 4], [7, 3, 7, 3]];
1958
1959 for (writer, batch) in column_writers.iter_mut().zip(column_data) {
1961 let writer = writer.typed::<Int32Type>();
1962 writer.write_batch(&batch, None, None).unwrap();
1963 }
1964
1965 for writer in column_writers {
1967 writer.close().unwrap()
1968 }
1969
1970 let mut row_group_writer = file_writer.next_row_group().unwrap();
1972 for (write, close) in column_state {
1973 let buf = Bytes::from(write.into_inner().unwrap());
1974 row_group_writer
1975 .append_column(&buf, close.unwrap())
1976 .unwrap();
1977 }
1978 row_group_writer.close().unwrap();
1979 file_writer.close().unwrap();
1980
1981 let file = Bytes::from(file);
1983 let test_read = |reader: SerializedFileReader<Bytes>| {
1984 let row_group = reader.get_row_group(0).unwrap();
1985
1986 let mut out = Vec::with_capacity(4);
1987 let c1 = row_group.get_column_reader(0).unwrap();
1988 let mut c1 = get_typed_column_reader::<Int32Type>(c1);
1989 c1.read_records(4, None, None, &mut out).unwrap();
1990 assert_eq!(out, column_data[0]);
1991
1992 out.clear();
1993
1994 let c2 = row_group.get_column_reader(1).unwrap();
1995 let mut c2 = get_typed_column_reader::<Int32Type>(c2);
1996 c2.read_records(4, None, None, &mut out).unwrap();
1997 assert_eq!(out, column_data[1]);
1998 };
1999
2000 let reader = SerializedFileReader::new(file.clone()).unwrap();
2001 test_read(reader);
2002
2003 let options = ReadOptionsBuilder::new().with_page_index().build();
2004 let reader = SerializedFileReader::new_with_options(file, options).unwrap();
2005 test_read(reader);
2006 }
2007
2008 #[test]
2009 fn test_disabled_statistics() {
2010 let message_type = "
2011 message test_schema {
2012 REQUIRED INT32 a;
2013 REQUIRED INT32 b;
2014 }
2015 ";
2016 let schema = Arc::new(parse_message_type(message_type).unwrap());
2017 let props = WriterProperties::builder()
2018 .set_statistics_enabled(EnabledStatistics::None)
2019 .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
            .set_offset_index_disabled(true)
            .build();
2022 let mut file = Vec::with_capacity(1024);
2023 let mut file_writer =
2024 SerializedFileWriter::new(&mut file, schema, Arc::new(props)).unwrap();
2025
2026 let mut row_group_writer = file_writer.next_row_group().unwrap();
2027 let mut a_writer = row_group_writer.next_column().unwrap().unwrap();
2028 let col_writer = a_writer.typed::<Int32Type>();
2029 col_writer.write_batch(&[1, 2, 3], None, None).unwrap();
2030 a_writer.close().unwrap();
2031
2032 let mut b_writer = row_group_writer.next_column().unwrap().unwrap();
2033 let col_writer = b_writer.typed::<Int32Type>();
2034 col_writer.write_batch(&[4, 5, 6], None, None).unwrap();
2035 b_writer.close().unwrap();
2036 row_group_writer.close().unwrap();
2037
2038 let metadata = file_writer.finish().unwrap();
2039 assert_eq!(metadata.row_groups.len(), 1);
2040 let row_group = &metadata.row_groups[0];
2041 assert_eq!(row_group.columns.len(), 2);
        // column "a" has page-level statistics enabled, so both indexes are written
        assert!(row_group.columns[0].offset_index_offset.is_some());
        assert!(row_group.columns[0].column_index_offset.is_some());
        // column "b" has statistics disabled, so only the offset index is written
        assert!(row_group.columns[1].offset_index_offset.is_some());
        assert!(row_group.columns[1].column_index_offset.is_none());
2048
2049 let err = file_writer.next_row_group().err().unwrap().to_string();
2050 assert_eq!(err, "Parquet error: SerializedFileWriter already finished");
2051
2052 drop(file_writer);
2053
2054 let options = ReadOptionsBuilder::new().with_page_index().build();
2055 let reader = SerializedFileReader::new_with_options(Bytes::from(file), options).unwrap();
2056
2057 let offset_index = reader.metadata().offset_index().unwrap();
2058 assert_eq!(offset_index.len(), 1); assert_eq!(offset_index[0].len(), 2); let column_index = reader.metadata().column_index().unwrap();
2062 assert_eq!(column_index.len(), 1); assert_eq!(column_index[0].len(), 2); let a_idx = &column_index[0][0];
2066 assert!(matches!(a_idx, Index::INT32(_)), "{a_idx:?}");
2067 let b_idx = &column_index[0][1];
2068 assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
2069 }
2070
    #[test]
    fn test_byte_array_size_statistics() {
        let message_type = "
            message test_schema {
                OPTIONAL BYTE_ARRAY a (UTF8);
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let data = ByteArrayType::gen_vec(32, 7);
        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
        let unenc_size: i64 = data.iter().map(|x| x.len() as i64).sum();
        let file: File = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        );

        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<ByteArrayType>()
            .write_batch(&data, Some(&def_levels), None)
            .unwrap();
        col_writer.close().unwrap();
        row_group_writer.close().unwrap();
        let file_metadata = writer.close().unwrap();

        assert_eq!(file_metadata.row_groups.len(), 1);
        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());

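        // `def_levels` contains three 0s (nulls) and seven 1s (non-null values),
        // so the definition level histogram should be [3, 7].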
        let check_def_hist = |def_hist: &[i64]| {
            assert_eq!(def_hist.len(), 2);
            assert_eq!(def_hist[0], 3);
            assert_eq!(def_hist[1], 7);
        };

        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
        let meta_data = file_metadata.row_groups[0].columns[0]
            .meta_data
            .as_ref()
            .unwrap();
        assert!(meta_data.size_statistics.is_some());
        let size_stats = meta_data.size_statistics.as_ref().unwrap();

        assert!(size_stats.repetition_level_histogram.is_none());
        assert!(size_stats.definition_level_histogram.is_some());
        assert!(size_stats.unencoded_byte_array_data_bytes.is_some());
        assert_eq!(
            unenc_size,
            size_stats.unencoded_byte_array_data_bytes.unwrap()
        );
        check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rfile_metadata = reader.metadata().file_metadata();
        assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
        assert_eq!(reader.num_row_groups(), 1);
        let rowgroup = reader.get_row_group(0).unwrap();
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.definition_level_histogram().is_some());
        assert!(column.repetition_level_histogram().is_none());
        assert!(column.unencoded_byte_array_data_bytes().is_some());
        check_def_hist(column.definition_level_histogram().unwrap().values());
        assert_eq!(
            unenc_size,
            column.unencoded_byte_array_data_bytes().unwrap()
        );

        assert!(reader.metadata().column_index().is_some());
        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1);
        assert_eq!(column_index[0].len(), 1);
        let col_idx = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
            assert_eq!(index.indexes.len(), 1);
            &index.indexes[0]
        } else {
            unreachable!()
        };

        assert!(col_idx.repetition_level_histogram().is_none());
        assert!(col_idx.definition_level_histogram().is_some());
        check_def_hist(col_idx.definition_level_histogram().unwrap().values());

        assert!(reader.metadata().offset_index().is_some());
        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1);
        assert_eq!(offset_index[0].len(), 1);
        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_some());
        let page_sizes = offset_index[0][0]
            .unencoded_byte_array_data_bytes
            .as_ref()
            .unwrap();
        assert_eq!(page_sizes.len(), 1);
        assert_eq!(page_sizes[0], unenc_size);
    }

    #[test]
    fn test_too_many_rowgroups() {
        let message_type = "
            message test_schema {
                REQUIRED BYTE_ARRAY a (UTF8);
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let file: File = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::None)
                .set_max_row_group_size(1)
                .build(),
        );
        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();

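        // Every attempt except the last (i == 0x8000) should succeed; the final one
        // must fail because Parquet does not support more than 32767 row groups
        // per file.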
        for i in 0..0x8001 {
            match writer.next_row_group() {
                Ok(mut row_group_writer) => {
                    assert_ne!(i, 0x8000);
                    let col_writer = row_group_writer.next_column().unwrap().unwrap();
                    col_writer.close().unwrap();
                    row_group_writer.close().unwrap();
                }
                Err(e) => {
                    assert_eq!(i, 0x8000);
                    assert_eq!(
                        e.to_string(),
                        "Parquet error: Parquet does not support more than 32767 row groups per file (currently: 32768)"
                    );
                }
            }
        }
        writer.close().unwrap();
    }

    #[test]
    fn test_size_statistics_with_repetition_and_nulls() {
        let message_type = "
            message test_schema {
                OPTIONAL group i32_list (LIST) {
                    REPEATED group list {
                        OPTIONAL INT32 element;
                    }
                }
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
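        // With max definition level 3 and max repetition level 1, the levels below
        // encode five rows:
        //   row 0: [1, 2]
        //   row 1: NULL
        //   row 2: [4, NULL]
        //   row 3: []
        //   row 4: [7, 8, 9, 10]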
        let data = [1, 2, 4, 7, 8, 9, 10];
        let def_levels = [3, 3, 0, 3, 2, 1, 3, 3, 3, 3];
        let rep_levels = [0, 1, 0, 0, 1, 0, 0, 1, 1, 1];
        let file = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        );
        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&data, Some(&def_levels), Some(&rep_levels))
            .unwrap();
        col_writer.close().unwrap();
        row_group_writer.close().unwrap();
        let file_metadata = writer.close().unwrap();

        assert_eq!(file_metadata.row_groups.len(), 1);
        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());

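        // Expected histograms: one null list (def 0), one empty list (def 1), one
        // null element (def 2), and seven non-null values (def 3); five values start
        // a new row (rep 0) and five continue an existing list (rep 1).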
        let check_def_hist = |def_hist: &[i64]| {
            assert_eq!(def_hist.len(), 4);
            assert_eq!(def_hist[0], 1);
            assert_eq!(def_hist[1], 1);
            assert_eq!(def_hist[2], 1);
            assert_eq!(def_hist[3], 7);
        };

        let check_rep_hist = |rep_hist: &[i64]| {
            assert_eq!(rep_hist.len(), 2);
            assert_eq!(rep_hist[0], 5);
            assert_eq!(rep_hist[1], 5);
        };

        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
        let meta_data = file_metadata.row_groups[0].columns[0]
            .meta_data
            .as_ref()
            .unwrap();
        assert!(meta_data.size_statistics.is_some());
        let size_stats = meta_data.size_statistics.as_ref().unwrap();
        assert!(size_stats.repetition_level_histogram.is_some());
        assert!(size_stats.definition_level_histogram.is_some());
        assert!(size_stats.unencoded_byte_array_data_bytes.is_none());
        check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());
        check_rep_hist(size_stats.repetition_level_histogram.as_ref().unwrap());

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rfile_metadata = reader.metadata().file_metadata();
        assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
        assert_eq!(reader.num_row_groups(), 1);
        let rowgroup = reader.get_row_group(0).unwrap();
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.definition_level_histogram().is_some());
        assert!(column.repetition_level_histogram().is_some());
        assert!(column.unencoded_byte_array_data_bytes().is_none());
        check_def_hist(column.definition_level_histogram().unwrap().values());
        check_rep_hist(column.repetition_level_histogram().unwrap().values());

        assert!(reader.metadata().column_index().is_some());
        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1);
        assert_eq!(column_index[0].len(), 1);
        let col_idx = if let Index::INT32(index) = &column_index[0][0] {
            assert_eq!(index.indexes.len(), 1);
            &index.indexes[0]
        } else {
            unreachable!()
        };

        check_def_hist(col_idx.definition_level_histogram().unwrap().values());
        check_rep_hist(col_idx.repetition_level_histogram().unwrap().values());

        assert!(reader.metadata().offset_index().is_some());
        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1);
        assert_eq!(offset_index[0].len(), 1);
        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_none());
    }

    #[test]
    #[cfg(feature = "arrow")]
    fn test_byte_stream_split_extended_roundtrip() {
        let path = format!(
            "{}/byte_stream_split_extended.gzip.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();

        let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .expect("parquet open")
            .build()
            .expect("parquet open");

        let file = tempfile::tempfile().unwrap();
        let props = WriterProperties::builder()
            .set_dictionary_enabled(false)
            .set_column_encoding(
                ColumnPath::from("float16_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("float_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("double_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("int32_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("int64_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("flba5_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("decimal_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .build();

        let mut parquet_writer = ArrowWriter::try_new(
            file.try_clone().expect("cannot open file"),
            parquet_reader.schema(),
            Some(props),
        )
        .expect("create arrow writer");

        for maybe_batch in parquet_reader {
            let batch = maybe_batch.expect("reading batch");
            parquet_writer.write(&batch).expect("writing data");
        }

        parquet_writer.close().expect("finalizing file");

        let reader = SerializedFileReader::new(file).expect("Failed to create reader");
        let filemeta = reader.metadata();

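        // The *_byte_stream_split columns (indexes 1, 3, 5, 7, 9, 11, and 13) should
        // have been re-encoded with BYTE_STREAM_SPLIT.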
        let check_encoding = |x: usize, filemeta: &ParquetMetaData| {
            assert!(filemeta
                .row_group(0)
                .column(x)
                .encodings()
                .contains(&Encoding::BYTE_STREAM_SPLIT));
        };

        check_encoding(1, filemeta);
        check_encoding(3, filemeta);
        check_encoding(5, filemeta);
        check_encoding(7, filemeta);
        check_encoding(9, filemeta);
        check_encoding(11, filemeta);
        check_encoding(13, filemeta);

        let mut iter = reader
            .get_row_iter(None)
            .expect("Failed to create row iterator");

        let mut start = 0;
        let end = reader.metadata().file_metadata().num_rows();

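        // Each row should hold identical values in each column pair (0/1, 2/3, ...),
        // i.e. the round-tripped byte-stream-split data matches the original.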
        let check_row = |row: Result<Row, ParquetError>| {
            assert!(row.is_ok());
            let r = row.unwrap();
            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
        };

        while start < end {
            match iter.next() {
                Some(row) => check_row(row),
                None => break,
            };
            start += 1;
        }
    }
}