1use crate::bloom_filter::Sbbf;
22use crate::format as parquet;
23use crate::format::{ColumnIndex, OffsetIndex};
24use crate::thrift::TSerializable;
25use std::fmt::Debug;
26use std::io::{BufWriter, IoSlice, Read};
27use std::{io::Write, sync::Arc};
28use thrift::protocol::TCompactOutputProtocol;
29
30use crate::column::page_encryption::PageEncryptor;
31use crate::column::writer::{get_typed_column_writer_mut, ColumnCloseResult, ColumnWriterImpl};
32use crate::column::{
33 page::{CompressedPage, PageWriteSpec, PageWriter},
34 writer::{get_column_writer, ColumnWriter},
35};
36use crate::data_type::DataType;
37#[cfg(feature = "encryption")]
38use crate::encryption::encrypt::{
39 get_column_crypto_metadata, FileEncryptionProperties, FileEncryptor,
40};
41use crate::errors::{ParquetError, Result};
42use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
43use crate::file::reader::ChunkReader;
44#[cfg(feature = "encryption")]
45use crate::file::PARQUET_MAGIC_ENCR_FOOTER;
46use crate::file::{metadata::*, PARQUET_MAGIC};
47use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr};
48
/// A [`Write`] implementation that buffers the underlying writer and tracks
/// the total number of bytes written through it.
pub struct TrackedWrite<W: Write> {
    /// Buffered wrapper around the user-supplied writer.
    inner: BufWriter<W>,
    /// Running count of bytes written via the [`Write`] impl.
    bytes_written: usize,
}
56
57impl<W: Write> TrackedWrite<W> {
58 pub fn new(inner: W) -> Self {
60 let buf_write = BufWriter::new(inner);
61 Self {
62 inner: buf_write,
63 bytes_written: 0,
64 }
65 }
66
67 pub fn bytes_written(&self) -> usize {
69 self.bytes_written
70 }
71
72 pub fn inner(&self) -> &W {
74 self.inner.get_ref()
75 }
76
77 pub fn inner_mut(&mut self) -> &mut W {
82 self.inner.get_mut()
83 }
84
85 pub fn into_inner(self) -> Result<W> {
87 self.inner.into_inner().map_err(|err| {
88 ParquetError::General(format!("fail to get inner writer: {:?}", err.to_string()))
89 })
90 }
91}
92
93impl<W: Write> Write for TrackedWrite<W> {
94 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
95 let bytes = self.inner.write(buf)?;
96 self.bytes_written += bytes;
97 Ok(bytes)
98 }
99
100 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> std::io::Result<usize> {
101 let bytes = self.inner.write_vectored(bufs)?;
102 self.bytes_written += bytes;
103 Ok(bytes)
104 }
105
106 fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
107 self.inner.write_all(buf)?;
108 self.bytes_written += buf.len();
109
110 Ok(())
111 }
112
113 fn flush(&mut self) -> std::io::Result<()> {
114 self.inner.flush()
115 }
116}
117
/// Callback invoked when a column chunk writer is closed, receiving the
/// chunk's [`ColumnCloseResult`].
pub type OnCloseColumnChunk<'a> = Box<dyn FnOnce(ColumnCloseResult) -> Result<()> + 'a>;

/// Callback invoked when a row group writer is closed.
///
/// Receives the file's tracked sink, the row group's metadata, and the
/// per-column bloom filters, column indexes and offset indexes.
pub type OnCloseRowGroup<'a, W> = Box<
    dyn FnOnce(
            &'a mut TrackedWrite<W>,
            RowGroupMetaData,
            Vec<Option<Sbbf>>,
            Vec<Option<ColumnIndex>>,
            Vec<Option<OffsetIndex>>,
        ) -> Result<()>
        + 'a
        + Send,
>;
137
/// Writes a parquet file to an underlying [`Write`] sink.
///
/// Row groups are written one at a time via [`SerializedFileWriter::next_row_group`];
/// the footer metadata is written by `finish`/`close`.
pub struct SerializedFileWriter<W: Write> {
    /// Output sink with byte tracking.
    buf: TrackedWrite<W>,
    /// The file schema.
    schema: TypePtr,
    /// Descriptor derived from `schema`.
    descr: SchemaDescPtr,
    /// Writer configuration.
    props: WriterPropertiesPtr,
    /// Metadata for all row groups written so far.
    row_groups: Vec<RowGroupMetaData>,
    /// Per row group, per column: bloom filters not yet written to the file.
    bloom_filters: Vec<Vec<Option<Sbbf>>>,
    /// Per row group, per column: column indexes for the footer.
    column_indexes: Vec<Vec<Option<ColumnIndex>>>,
    /// Per row group, per column: offset indexes for the footer.
    offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
    /// Ordinal of the next row group to be created.
    row_group_index: usize,
    /// Additional key-value metadata appended via `append_key_value_metadata`.
    kv_metadatas: Vec<KeyValue>,
    /// Set once the footer has been written; further writes are rejected.
    finished: bool,
    /// Encryptor used when writing encrypted files.
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
166
167impl<W: Write> Debug for SerializedFileWriter<W> {
168 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169 f.debug_struct("SerializedFileWriter")
172 .field("descr", &self.descr)
173 .field("row_group_index", &self.row_group_index)
174 .field("kv_metadatas", &self.kv_metadatas)
175 .finish_non_exhaustive()
176 }
177}
178
impl<W: Write + Send> SerializedFileWriter<W> {
    /// Creates a new `SerializedFileWriter` over `buf` for `schema`, writing
    /// the file header magic immediately.
    ///
    /// Errors if the configured encryption properties are invalid
    /// (only when the `encryption` feature is enabled).
    pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
        let mut buf = TrackedWrite::new(buf);

        let schema_descriptor = SchemaDescriptor::new(schema.clone());

        #[cfg(feature = "encryption")]
        let file_encryptor = Self::get_file_encryptor(&properties, &schema_descriptor)?;

        Self::start_file(&properties, &mut buf)?;
        Ok(Self {
            buf,
            schema,
            descr: Arc::new(schema_descriptor),
            props: properties,
            row_groups: vec![],
            bloom_filters: vec![],
            column_indexes: Vec::new(),
            offset_indexes: Vec::new(),
            row_group_index: 0,
            kv_metadatas: Vec::new(),
            finished: false,
            #[cfg(feature = "encryption")]
            file_encryptor,
        })
    }

    /// Builds the [`FileEncryptor`] from the writer properties, if file
    /// encryption was configured.
    ///
    /// Errors if encrypted column names don't match the schema, or if
    /// plaintext footers are requested (not yet supported).
    #[cfg(feature = "encryption")]
    fn get_file_encryptor(
        properties: &WriterPropertiesPtr,
        schema_descriptor: &SchemaDescriptor,
    ) -> Result<Option<Arc<FileEncryptor>>> {
        if let Some(file_encryption_properties) = &properties.file_encryption_properties {
            file_encryption_properties.validate_encrypted_column_names(schema_descriptor)?;

            if !file_encryption_properties.encrypt_footer() {
                return Err(general_err!(
                    "Writing encrypted files with plaintext footers is not supported yet"
                ));
            }

            Ok(Some(Arc::new(FileEncryptor::new(
                file_encryption_properties.clone(),
            )?)))
        } else {
            Ok(None)
        }
    }

    /// Creates a writer for the next row group.
    ///
    /// Fails if the previous row group writer was not closed, or if the
    /// number of row groups would exceed `i16::MAX` (the format stores the
    /// ordinal as an `i16`).
    pub fn next_row_group(&mut self) -> Result<SerializedRowGroupWriter<'_, W>> {
        self.assert_previous_writer_closed()?;
        let ordinal = self.row_group_index;

        let ordinal: i16 = ordinal.try_into().map_err(|_| {
            ParquetError::General(format!(
                "Parquet does not support more than {} row groups per file (currently: {})",
                i16::MAX,
                ordinal
            ))
        })?;

        self.row_group_index = self
            .row_group_index
            .checked_add(1)
            .expect("SerializedFileWriter::row_group_index overflowed");

        let bloom_filter_position = self.properties().bloom_filter_position();
        let row_groups = &mut self.row_groups;
        let row_bloom_filters = &mut self.bloom_filters;
        let row_column_indexes = &mut self.column_indexes;
        let row_offset_indexes = &mut self.offset_indexes;
        // Invoked when the row group writer closes: records its results and,
        // depending on configuration, writes its bloom filters immediately.
        let on_close = move |buf,
                             mut metadata,
                             row_group_bloom_filter,
                             row_group_column_index,
                             row_group_offset_index| {
            row_bloom_filters.push(row_group_bloom_filter);
            row_column_indexes.push(row_group_column_index);
            row_offset_indexes.push(row_group_offset_index);
            match bloom_filter_position {
                BloomFilterPosition::AfterRowGroup => {
                    write_bloom_filters(buf, row_bloom_filters, &mut metadata)?
                }
                // Deferred until write_metadata
                BloomFilterPosition::End => (),
            };
            row_groups.push(metadata);
            Ok(())
        };

        let row_group_writer = SerializedRowGroupWriter::new(
            self.descr.clone(),
            self.props.clone(),
            &mut self.buf,
            ordinal,
            Some(Box::new(on_close)),
        );
        #[cfg(feature = "encryption")]
        let row_group_writer = row_group_writer.with_file_encryptor(self.file_encryptor.clone());

        Ok(row_group_writer)
    }

    /// Returns the metadata of the row groups flushed so far.
    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        &self.row_groups
    }

    /// Finalises the file: writes any remaining bloom filters and the footer
    /// metadata, then flushes the sink. The writer cannot be used afterwards.
    pub fn finish(&mut self) -> Result<parquet::FileMetaData> {
        self.assert_previous_writer_closed()?;
        let metadata = self.write_metadata()?;
        self.buf.flush()?;
        Ok(metadata)
    }

    /// Consumes the writer and finalises the file; see [`Self::finish`].
    pub fn close(mut self) -> Result<parquet::FileMetaData> {
        self.finish()
    }

    /// Writes the parquet file magic at the start of the file.
    #[cfg(not(feature = "encryption"))]
    fn start_file(_properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
        buf.write_all(get_file_magic())?;
        Ok(())
    }

    /// Writes the appropriate file magic (plain, or encrypted-footer when
    /// footer encryption is configured) at the start of the file.
    #[cfg(feature = "encryption")]
    fn start_file(properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
        let magic = get_file_magic(properties.file_encryption_properties.as_ref());

        buf.write_all(magic)?;
        Ok(())
    }

    /// Assembles and writes the footer metadata, marking the writer finished.
    fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
        self.finished = true;

        // Write any bloom filters deferred to the end of the file. Filters
        // already written were `take`n out, so this never writes twice.
        for row_group in &mut self.row_groups {
            write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?;
        }

        // Merge property-supplied key-value metadata with appended entries.
        let key_value_metadata = match self.props.key_value_metadata() {
            Some(kv) => Some(kv.iter().chain(&self.kv_metadatas).cloned().collect()),
            None if self.kv_metadatas.is_empty() => None,
            None => Some(self.kv_metadatas.clone()),
        };

        let row_groups = self
            .row_groups
            .iter()
            .map(|v| v.to_thrift())
            .collect::<Vec<_>>();

        let mut encoder = ThriftMetadataWriter::new(
            &mut self.buf,
            &self.schema,
            &self.descr,
            row_groups,
            Some(self.props.created_by().to_string()),
            self.props.writer_version().as_num(),
        );

        #[cfg(feature = "encryption")]
        {
            encoder = encoder.with_file_encryptor(self.file_encryptor.clone());
        }

        if let Some(key_value_metadata) = key_value_metadata {
            encoder = encoder.with_key_value_metadata(key_value_metadata)
        }
        encoder = encoder.with_column_indexes(&self.column_indexes);
        encoder = encoder.with_offset_indexes(&self.offset_indexes);
        encoder.finish()
    }

    /// Errors if the writer is already finished, or if a row group writer is
    /// still open (created but not closed).
    #[inline]
    fn assert_previous_writer_closed(&self) -> Result<()> {
        if self.finished {
            return Err(general_err!("SerializedFileWriter already finished"));
        }

        if self.row_group_index != self.row_groups.len() {
            Err(general_err!("Previous row group writer was not closed"))
        } else {
            Ok(())
        }
    }

    /// Appends an extra key-value metadata entry to write in the footer.
    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
        self.kv_metadatas.push(kv_metadata);
    }

    /// Returns the file's schema descriptor.
    pub fn schema_descr(&self) -> &SchemaDescriptor {
        &self.descr
    }

    /// Returns the writer properties.
    pub fn properties(&self) -> &WriterPropertiesPtr {
        &self.props
    }

    /// Returns a shared reference to the underlying writer.
    pub fn inner(&self) -> &W {
        self.buf.inner()
    }

    /// Returns a mutable reference to the underlying writer.
    ///
    /// Note: bytes written directly are not reflected in [`Self::bytes_written`].
    pub fn inner_mut(&mut self) -> &mut W {
        self.buf.inner_mut()
    }

    /// Writes the footer metadata and returns the underlying writer.
    pub fn into_inner(mut self) -> Result<W> {
        self.assert_previous_writer_closed()?;
        let _ = self.write_metadata()?;

        self.buf.into_inner()
    }

    /// Returns the number of bytes written to the file so far.
    pub fn bytes_written(&self) -> usize {
        self.buf.bytes_written()
    }

    /// Returns the file encryptor, if encryption is configured.
    #[cfg(feature = "encryption")]
    pub(crate) fn file_encryptor(&self) -> Option<Arc<FileEncryptor>> {
        self.file_encryptor.clone()
    }
}
429
430fn write_bloom_filters<W: Write + Send>(
433 buf: &mut TrackedWrite<W>,
434 bloom_filters: &mut [Vec<Option<Sbbf>>],
435 row_group: &mut RowGroupMetaData,
436) -> Result<()> {
437 let row_group_idx: u16 = row_group
442 .ordinal()
443 .expect("Missing row group ordinal")
444 .try_into()
445 .map_err(|_| {
446 ParquetError::General(format!(
447 "Negative row group ordinal: {})",
448 row_group.ordinal().unwrap()
449 ))
450 })?;
451 let row_group_idx = row_group_idx as usize;
452 for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() {
453 if let Some(bloom_filter) = bloom_filters[row_group_idx][column_idx].take() {
454 let start_offset = buf.bytes_written();
455 bloom_filter.write(&mut *buf)?;
456 let end_offset = buf.bytes_written();
457 *column_chunk = column_chunk
459 .clone()
460 .into_builder()
461 .set_bloom_filter_offset(Some(start_offset as i64))
462 .set_bloom_filter_length(Some((end_offset - start_offset) as i32))
463 .build()?;
464 }
465 }
466 Ok(())
467}
468
/// Writes a single row group, one column chunk at a time, to a parquet file.
pub struct SerializedRowGroupWriter<'a, W: Write> {
    /// Schema descriptor for the file.
    descr: SchemaDescPtr,
    /// Writer configuration.
    props: WriterPropertiesPtr,
    /// The file's tracked output sink.
    buf: &'a mut TrackedWrite<W>,
    /// Row count reported by the first closed column; all columns must match.
    total_rows_written: Option<u64>,
    /// Total compressed bytes written for this row group.
    total_bytes_written: u64,
    /// Total uncompressed bytes across all closed column chunks.
    total_uncompressed_bytes: i64,
    /// Index of the next column to be written.
    column_index: usize,
    /// Metadata built by `close`, cached so it is constructed only once.
    row_group_metadata: Option<RowGroupMetaDataPtr>,
    /// Metadata for each closed column chunk.
    column_chunks: Vec<ColumnChunkMetaData>,
    /// Bloom filter (if any) for each closed column chunk.
    bloom_filters: Vec<Option<Sbbf>>,
    /// Column index (if any) for each closed column chunk.
    column_indexes: Vec<Option<ColumnIndex>>,
    /// Offset index (if any) for each closed column chunk.
    offset_indexes: Vec<Option<OffsetIndex>>,
    /// Zero-based ordinal of this row group within the file.
    row_group_index: i16,
    /// Byte offset in the file where this row group starts.
    file_offset: i64,
    /// Callback invoked with the row group's results on close.
    on_close: Option<OnCloseRowGroup<'a, W>>,
    /// Encryptor used when writing encrypted files.
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
499
500impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
    /// Creates a new row group writer writing to `buf`.
    ///
    /// `row_group_index` is the zero-based ordinal of this row group in the
    /// file; `on_close`, if provided, is invoked with the row group's results
    /// when [`Self::close`] is called.
    pub fn new(
        schema_descr: SchemaDescPtr,
        properties: WriterPropertiesPtr,
        buf: &'a mut TrackedWrite<W>,
        row_group_index: i16,
        on_close: Option<OnCloseRowGroup<'a, W>>,
    ) -> Self {
        let num_columns = schema_descr.num_columns();
        // Record where this row group starts in the file
        let file_offset = buf.bytes_written() as i64;
        Self {
            buf,
            row_group_index,
            file_offset,
            on_close,
            total_rows_written: None,
            descr: schema_descr,
            props: properties,
            column_index: 0,
            row_group_metadata: None,
            column_chunks: Vec::with_capacity(num_columns),
            bloom_filters: Vec::with_capacity(num_columns),
            column_indexes: Vec::with_capacity(num_columns),
            offset_indexes: Vec::with_capacity(num_columns),
            total_bytes_written: 0,
            total_uncompressed_bytes: 0,
            #[cfg(feature = "encryption")]
            file_encryptor: None,
        }
    }

    /// Sets the file encryptor used to encrypt column data.
    #[cfg(feature = "encryption")]
    pub(crate) fn with_file_encryptor(
        mut self,
        file_encryptor: Option<Arc<FileEncryptor>>,
    ) -> Self {
        self.file_encryptor = file_encryptor;
        self
    }

    /// Advances to the next column, returning its descriptor, or `None` once
    /// all columns have been exhausted.
    fn next_column_desc(&mut self) -> Option<ColumnDescPtr> {
        let ret = self.descr.columns().get(self.column_index)?.clone();
        self.column_index += 1;
        Some(ret)
    }

    /// Returns the output sink together with a callback that records the
    /// results of a closed column chunk.
    ///
    /// The callback accumulates byte counts and per-column metadata, and
    /// verifies that every column reports the same number of rows.
    fn get_on_close(&mut self) -> (&mut TrackedWrite<W>, OnCloseColumnChunk<'_>) {
        // Borrow each bookkeeping field separately so the closure can hold
        // them while the sink is handed out alongside it.
        let total_bytes_written = &mut self.total_bytes_written;
        let total_uncompressed_bytes = &mut self.total_uncompressed_bytes;
        let total_rows_written = &mut self.total_rows_written;
        let column_chunks = &mut self.column_chunks;
        let column_indexes = &mut self.column_indexes;
        let offset_indexes = &mut self.offset_indexes;
        let bloom_filters = &mut self.bloom_filters;

        let on_close = |r: ColumnCloseResult| {
            *total_bytes_written += r.bytes_written;
            *total_uncompressed_bytes += r.metadata.uncompressed_size();
            column_chunks.push(r.metadata);
            bloom_filters.push(r.bloom_filter);
            column_indexes.push(r.column_index);
            offset_indexes.push(r.offset_index);

            // Every column in a row group must contain the same row count;
            // the first closed column establishes the expected value.
            if let Some(rows) = *total_rows_written {
                if rows != r.rows_written {
                    return Err(general_err!(
                        "Incorrect number of rows, expected {} != {} rows",
                        rows,
                        r.rows_written
                    ));
                }
            } else {
                *total_rows_written = Some(r.rows_written);
            }

            Ok(())
        };
        (self.buf, Box::new(on_close))
    }

    /// Returns a writer for the next column built by `factory`, or `None`
    /// once all columns have been written.
    ///
    /// Fails if the previous column writer was not closed.
    pub(crate) fn next_column_with_factory<'b, F, C>(&'b mut self, factory: F) -> Result<Option<C>>
    where
        F: FnOnce(
            ColumnDescPtr,
            WriterPropertiesPtr,
            Box<dyn PageWriter + 'b>,
            OnCloseColumnChunk<'b>,
        ) -> Result<C>,
    {
        self.assert_previous_writer_closed()?;

        // Capture encryption context before `self` is mutably borrowed below
        let encryptor_context = self.get_page_encryptor_context();

        Ok(match self.next_column_desc() {
            Some(column) => {
                let props = self.props.clone();
                let (buf, on_close) = self.get_on_close();

                let page_writer = SerializedPageWriter::new(buf);
                let page_writer =
                    Self::set_page_writer_encryptor(&column, encryptor_context, page_writer)?;

                Some(factory(
                    column,
                    props,
                    Box::new(page_writer),
                    Box::new(on_close),
                )?)
            }
            None => None,
        })
    }

    /// Returns a [`SerializedColumnWriter`] for the next column, or `None`
    /// once all columns have been written.
    pub fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>> {
        self.next_column_with_factory(|descr, props, page_writer, on_close| {
            let column_writer = get_column_writer(descr, props, page_writer);
            Ok(SerializedColumnWriter::new(column_writer, Some(on_close)))
        })
    }
636
637 pub fn append_column<R: ChunkReader>(
644 &mut self,
645 reader: &R,
646 mut close: ColumnCloseResult,
647 ) -> Result<()> {
648 self.assert_previous_writer_closed()?;
649 let desc = self
650 .next_column_desc()
651 .ok_or_else(|| general_err!("exhausted columns in SerializedRowGroupWriter"))?;
652
653 let metadata = close.metadata;
654
655 if metadata.column_descr() != desc.as_ref() {
656 return Err(general_err!(
657 "column descriptor mismatch, expected {:?} got {:?}",
658 desc,
659 metadata.column_descr()
660 ));
661 }
662
663 let src_dictionary_offset = metadata.dictionary_page_offset();
664 let src_data_offset = metadata.data_page_offset();
665 let src_offset = src_dictionary_offset.unwrap_or(src_data_offset);
666 let src_length = metadata.compressed_size();
667
668 let write_offset = self.buf.bytes_written();
669 let mut read = reader.get_read(src_offset as _)?.take(src_length as _);
670 let write_length = std::io::copy(&mut read, &mut self.buf)?;
671
672 if src_length as u64 != write_length {
673 return Err(general_err!(
674 "Failed to splice column data, expected {read_length} got {write_length}"
675 ));
676 }
677
678 let map_offset = |x| x - src_offset + write_offset as i64;
679 let mut builder = ColumnChunkMetaData::builder(metadata.column_descr_ptr())
680 .set_compression(metadata.compression())
681 .set_encodings(metadata.encodings().clone())
682 .set_total_compressed_size(metadata.compressed_size())
683 .set_total_uncompressed_size(metadata.uncompressed_size())
684 .set_num_values(metadata.num_values())
685 .set_data_page_offset(map_offset(src_data_offset))
686 .set_dictionary_page_offset(src_dictionary_offset.map(map_offset))
687 .set_unencoded_byte_array_data_bytes(metadata.unencoded_byte_array_data_bytes());
688
689 if let Some(rep_hist) = metadata.repetition_level_histogram() {
690 builder = builder.set_repetition_level_histogram(Some(rep_hist.clone()))
691 }
692 if let Some(def_hist) = metadata.definition_level_histogram() {
693 builder = builder.set_definition_level_histogram(Some(def_hist.clone()))
694 }
695 if let Some(statistics) = metadata.statistics() {
696 builder = builder.set_statistics(statistics.clone())
697 }
698 builder = self.set_column_crypto_metadata(builder, &metadata);
699 close.metadata = builder.build()?;
700
701 if let Some(offsets) = close.offset_index.as_mut() {
702 for location in &mut offsets.page_locations {
703 location.offset = map_offset(location.offset)
704 }
705 }
706
707 let (_, on_close) = self.get_on_close();
708 on_close(close)
709 }
710
    /// Closes this row group writer, returning the row group's metadata.
    ///
    /// Builds the metadata from the closed column chunks and invokes the
    /// on-close callback (if any). `close` consumes `self`, so the metadata
    /// is built at most once.
    pub fn close(mut self) -> Result<RowGroupMetaDataPtr> {
        if self.row_group_metadata.is_none() {
            self.assert_previous_writer_closed()?;

            let column_chunks = std::mem::take(&mut self.column_chunks);
            let row_group_metadata = RowGroupMetaData::builder(self.descr.clone())
                .set_column_metadata(column_chunks)
                .set_total_byte_size(self.total_uncompressed_bytes)
                .set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
                .set_sorting_columns(self.props.sorting_columns().cloned())
                .set_ordinal(self.row_group_index)
                .set_file_offset(self.file_offset)
                .build()?;

            self.row_group_metadata = Some(Arc::new(row_group_metadata.clone()));

            if let Some(on_close) = self.on_close.take() {
                on_close(
                    self.buf,
                    row_group_metadata,
                    self.bloom_filters,
                    self.column_indexes,
                    self.offset_indexes,
                )?
            }
        }

        let metadata = self.row_group_metadata.as_ref().unwrap().clone();
        Ok(metadata)
    }

    /// Sets column crypto metadata on `builder` when a file encryptor is
    /// configured.
    #[cfg(feature = "encryption")]
    fn set_column_crypto_metadata(
        &self,
        builder: ColumnChunkMetaDataBuilder,
        metadata: &ColumnChunkMetaData,
    ) -> ColumnChunkMetaDataBuilder {
        if let Some(file_encryptor) = self.file_encryptor.as_ref() {
            builder.set_column_crypto_metadata(get_column_crypto_metadata(
                file_encryptor.properties(),
                &metadata.column_descr_ptr(),
            ))
        } else {
            builder
        }
    }

    /// Captures the state needed to construct a [`PageEncryptor`] for the
    /// column about to be written.
    #[cfg(feature = "encryption")]
    fn get_page_encryptor_context(&self) -> PageEncryptorContext {
        PageEncryptorContext {
            file_encryptor: self.file_encryptor.clone(),
            row_group_index: self.row_group_index as usize,
            column_index: self.column_index,
        }
    }

    /// Attaches a [`PageEncryptor`] to `page_writer` if this column is
    /// encrypted.
    #[cfg(feature = "encryption")]
    fn set_page_writer_encryptor<'b>(
        column: &ColumnDescPtr,
        context: PageEncryptorContext,
        page_writer: SerializedPageWriter<'b, W>,
    ) -> Result<SerializedPageWriter<'b, W>> {
        let page_encryptor = PageEncryptor::create_if_column_encrypted(
            &context.file_encryptor,
            context.row_group_index,
            context.column_index,
            &column.path().string(),
        )?;

        Ok(page_writer.with_page_encryptor(page_encryptor))
    }

    /// No-op counterpart of the encrypted variant.
    #[cfg(not(feature = "encryption"))]
    fn set_column_crypto_metadata(
        &self,
        builder: ColumnChunkMetaDataBuilder,
        _metadata: &ColumnChunkMetaData,
    ) -> ColumnChunkMetaDataBuilder {
        builder
    }

    /// No-op counterpart of the encrypted variant.
    #[cfg(not(feature = "encryption"))]
    fn get_page_encryptor_context(&self) -> PageEncryptorContext {
        PageEncryptorContext {}
    }

    /// No-op counterpart of the encrypted variant.
    #[cfg(not(feature = "encryption"))]
    fn set_page_writer_encryptor<'b>(
        _column: &ColumnDescPtr,
        _context: PageEncryptorContext,
        page_writer: SerializedPageWriter<'b, W>,
    ) -> Result<SerializedPageWriter<'b, W>> {
        Ok(page_writer)
    }

    /// Errors if a column writer is still open (created but not closed).
    #[inline]
    fn assert_previous_writer_closed(&self) -> Result<()> {
        if self.column_index != self.column_chunks.len() {
            Err(general_err!("Previous column writer was not closed"))
        } else {
            Ok(())
        }
    }
820}
821
/// Parameters needed to construct a [`PageEncryptor`] for a column.
#[cfg(feature = "encryption")]
struct PageEncryptorContext {
    /// Encryptor for the file, if encryption is configured.
    file_encryptor: Option<Arc<FileEncryptor>>,
    /// Ordinal of the row group being written.
    row_group_index: usize,
    /// Ordinal of the column being written.
    column_index: usize,
}

/// Zero-sized placeholder used when the `encryption` feature is disabled.
#[cfg(not(feature = "encryption"))]
struct PageEncryptorContext {}
832
/// Writer for a single column chunk, wrapping an untyped [`ColumnWriter`].
pub struct SerializedColumnWriter<'a> {
    /// The underlying column writer.
    inner: ColumnWriter<'a>,
    /// Callback invoked with the column's results on close, if provided.
    on_close: Option<OnCloseColumnChunk<'a>>,
}
838
839impl<'a> SerializedColumnWriter<'a> {
840 pub fn new(inner: ColumnWriter<'a>, on_close: Option<OnCloseColumnChunk<'a>>) -> Self {
843 Self { inner, on_close }
844 }
845
846 pub fn untyped(&mut self) -> &mut ColumnWriter<'a> {
848 &mut self.inner
849 }
850
851 pub fn typed<T: DataType>(&mut self) -> &mut ColumnWriterImpl<'a, T> {
853 get_typed_column_writer_mut(&mut self.inner)
854 }
855
856 pub fn close(mut self) -> Result<()> {
858 let r = self.inner.close()?;
859 if let Some(on_close) = self.on_close.take() {
860 on_close(r)?
861 }
862
863 Ok(())
864 }
865}
866
/// Serializes pages and page headers into a [`TrackedWrite`] sink.
pub struct SerializedPageWriter<'a, W: Write> {
    /// The output sink pages are written to.
    sink: &'a mut TrackedWrite<W>,
    /// Encryptor applied to page headers and data, if this column is encrypted.
    #[cfg(feature = "encryption")]
    page_encryptor: Option<PageEncryptor>,
}
876
impl<'a, W: Write> SerializedPageWriter<'a, W> {
    /// Creates a new page writer that serializes its output to `sink`.
    pub fn new(sink: &'a mut TrackedWrite<W>) -> Self {
        Self {
            sink,
            #[cfg(feature = "encryption")]
            page_encryptor: None,
        }
    }

    /// Serializes the page header into the sink as a thrift structure,
    /// encrypting it first when a page encryptor is configured.
    ///
    /// Returns the number of bytes written to the sink.
    #[inline]
    fn serialize_page_header(&mut self, header: parquet::PageHeader) -> Result<usize> {
        let start_pos = self.sink.bytes_written();
        match self.page_encryptor_and_sink_mut() {
            Some((page_encryptor, sink)) => {
                page_encryptor.encrypt_page_header(&header, sink)?;
            }
            None => {
                let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
                header.write_to_out_protocol(&mut protocol)?;
            }
        }
        // The sink tracks bytes, so the difference is the header's size
        Ok(self.sink.bytes_written() - start_pos)
    }
}
904
#[cfg(feature = "encryption")]
impl<'a, W: Write> SerializedPageWriter<'a, W> {
    /// Sets the optional [`PageEncryptor`] used to encrypt pages and headers.
    fn with_page_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
        self.page_encryptor = page_encryptor;
        self
    }

    /// Returns the page encryptor, if one is configured.
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        self.page_encryptor.as_mut()
    }

    /// Returns the page encryptor and the sink together, so both can be
    /// mutably borrowed at the same time.
    fn page_encryptor_and_sink_mut(
        &mut self,
    ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
        self.page_encryptor.as_mut().map(|pe| (pe, &mut self.sink))
    }
}
923
#[cfg(not(feature = "encryption"))]
impl<'a, W: Write> SerializedPageWriter<'a, W> {
    /// Always `None`: pages are never encrypted without the `encryption` feature.
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        None
    }

    /// Always `None`: pages are never encrypted without the `encryption` feature.
    fn page_encryptor_and_sink_mut(
        &mut self,
    ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
        None
    }
}
936
impl<W: Write + Send> PageWriter for SerializedPageWriter<'_, W> {
    /// Writes a compressed page (encrypting it first when configured) and
    /// returns a [`PageWriteSpec`] describing what was written.
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
        let page = match self.page_encryptor_mut() {
            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
            None => page,
        };

        let page_type = page.page_type();
        let start_pos = self.sink.bytes_written() as u64;

        let page_header = page.to_thrift_header();
        let header_size = self.serialize_page_header(page_header)?;

        self.sink.write_all(page.data())?;

        let mut spec = PageWriteSpec::new();
        spec.page_type = page_type;
        // Header bytes count towards both compressed and uncompressed sizes
        spec.uncompressed_size = page.uncompressed_size() + header_size;
        spec.compressed_size = page.compressed_size() + header_size;
        spec.offset = start_pos;
        spec.bytes_written = self.sink.bytes_written() as u64 - start_pos;
        spec.num_values = page.num_values();

        // Only data pages advance the encryptor's page counter
        if let Some(page_encryptor) = self.page_encryptor_mut() {
            if page.compressed_page().is_data_page() {
                page_encryptor.increment_page();
            }
        }
        Ok(spec)
    }

    fn close(&mut self) -> Result<()> {
        self.sink.flush()?;
        Ok(())
    }
}
973
/// Returns the parquet magic bytes to write at the start of the file:
/// the encrypted-footer magic when footer encryption is configured,
/// otherwise the plain parquet magic.
#[cfg(feature = "encryption")]
pub(crate) fn get_file_magic(
    file_encryption_properties: Option<&FileEncryptionProperties>,
) -> &'static [u8; 4] {
    // Match on the Option directly; the previous `.as_ref()` on an
    // `Option<&T>` produced a redundant `Option<&&T>`.
    match file_encryption_properties {
        Some(encryption_properties) if encryption_properties.encrypt_footer() => {
            &PARQUET_MAGIC_ENCR_FOOTER
        }
        _ => &PARQUET_MAGIC,
    }
}
987
/// Returns the parquet magic bytes written at the start of the file.
#[cfg(not(feature = "encryption"))]
pub(crate) fn get_file_magic() -> &'static [u8; 4] {
    &PARQUET_MAGIC
}
992
993#[cfg(test)]
994mod tests {
995 use super::*;
996
997 #[cfg(feature = "arrow")]
998 use arrow_array::RecordBatchReader;
999 use bytes::Bytes;
1000 use std::fs::File;
1001
1002 #[cfg(feature = "arrow")]
1003 use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
1004 #[cfg(feature = "arrow")]
1005 use crate::arrow::ArrowWriter;
1006 use crate::basic::{
1007 ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, Repetition, SortOrder, Type,
1008 };
1009 use crate::column::page::{Page, PageReader};
1010 use crate::column::reader::get_typed_column_reader;
1011 use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
1012 use crate::data_type::{BoolType, ByteArrayType, Int32Type};
1013 use crate::file::page_index::index::Index;
1014 use crate::file::properties::EnabledStatistics;
1015 use crate::file::serialized_reader::ReadOptionsBuilder;
1016 use crate::file::{
1017 properties::{ReaderProperties, WriterProperties, WriterVersion},
1018 reader::{FileReader, SerializedFileReader, SerializedPageReader},
1019 statistics::{from_thrift, to_thrift, Statistics},
1020 };
1021 use crate::format::SortingColumn;
1022 use crate::record::{Row, RowAccessor};
1023 use crate::schema::parser::parse_message_type;
1024 use crate::schema::types;
1025 use crate::schema::types::{ColumnDescriptor, ColumnPath};
1026 use crate::util::test_common::rand_gen::RandGen;
1027
    // Closing a row group before writing all of its declared columns must fail.
    #[test]
    fn test_row_group_writer_error_not_all_columns_written() {
        let file = tempfile::tempfile().unwrap();
        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![Arc::new(
                    types::Type::primitive_type_builder("col1", Type::INT32)
                        .build()
                        .unwrap(),
                )])
                .build()
                .unwrap(),
        );
        let props = Default::default();
        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
        let row_group_writer = writer.next_row_group().unwrap();
        let res = row_group_writer.close();
        assert!(res.is_err());
        if let Err(err) = res {
            assert_eq!(
                format!("{err}"),
                "Parquet error: Column length mismatch: 1 != 0"
            );
        }
    }

    // All columns within one row group must report the same number of rows.
    #[test]
    fn test_row_group_writer_num_records_mismatch() {
        let file = tempfile::tempfile().unwrap();
        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![
                    Arc::new(
                        types::Type::primitive_type_builder("col1", Type::INT32)
                            .with_repetition(Repetition::REQUIRED)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        types::Type::primitive_type_builder("col2", Type::INT32)
                            .with_repetition(Repetition::REQUIRED)
                            .build()
                            .unwrap(),
                    ),
                ])
                .build()
                .unwrap(),
        );
        let props = Default::default();
        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        // First column establishes the expected row count (3)
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 2, 3], None, None)
            .unwrap();
        col_writer.close().unwrap();

        // Second column writes only 2 rows, which must be rejected on close
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 2], None, None)
            .unwrap();

        let err = col_writer.close().unwrap_err();
        assert_eq!(
            err.to_string(),
            "Parquet error: Incorrect number of rows, expected 3 != 2 rows"
        );
    }
1099
    // A file with no row groups is still valid and readable (zero rows).
    #[test]
    fn test_file_writer_empty_file() {
        let file = tempfile::tempfile().unwrap();

        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![Arc::new(
                    types::Type::primitive_type_builder("col1", Type::INT32)
                        .build()
                        .unwrap(),
                )])
                .build()
                .unwrap(),
        );
        let props = Default::default();
        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file).unwrap();
        assert_eq!(reader.get_row_iter(None).unwrap().count(), 0);
    }

    // The footer's column orders must reflect each leaf column's sort order.
    #[test]
    fn test_file_writer_column_orders_populated() {
        let file = tempfile::tempfile().unwrap();

        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![
                    Arc::new(
                        types::Type::primitive_type_builder("col1", Type::INT32)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        types::Type::primitive_type_builder("col2", Type::FIXED_LEN_BYTE_ARRAY)
                            .with_converted_type(ConvertedType::INTERVAL)
                            .with_length(12)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        types::Type::group_type_builder("nested")
                            .with_repetition(Repetition::REQUIRED)
                            .with_fields(vec![
                                Arc::new(
                                    types::Type::primitive_type_builder(
                                        "col3",
                                        Type::FIXED_LEN_BYTE_ARRAY,
                                    )
                                    .with_logical_type(Some(LogicalType::Float16))
                                    .with_length(2)
                                    .build()
                                    .unwrap(),
                                ),
                                Arc::new(
                                    types::Type::primitive_type_builder("col4", Type::BYTE_ARRAY)
                                        .with_logical_type(Some(LogicalType::String))
                                        .build()
                                        .unwrap(),
                                ),
                            ])
                            .build()
                            .unwrap(),
                    ),
                ])
                .build()
                .unwrap(),
        );

        let props = Default::default();
        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file).unwrap();

        // One entry per leaf column, in schema order
        let expected = vec![
            // col1: INT32
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
            // col2: INTERVAL
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED),
            // nested.col3: Float16
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
            // nested.col4: String
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED),
        ];
        let actual = reader.metadata().file_metadata().column_orders();

        assert!(actual.is_some());
        let actual = actual.unwrap();
        assert_eq!(*actual, expected);
    }
1193
    // Key-value metadata set in the writer properties must round-trip
    // through the file footer.
    #[test]
    fn test_file_writer_with_metadata() {
        let file = tempfile::tempfile().unwrap();

        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![Arc::new(
                    types::Type::primitive_type_builder("col1", Type::INT32)
                        .build()
                        .unwrap(),
                )])
                .build()
                .unwrap(),
        );
        let props = Arc::new(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![KeyValue::new(
                    "key".to_string(),
                    "value".to_string(),
                )]))
                .build(),
        );
        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file).unwrap();
        assert_eq!(
            reader
                .metadata()
                .file_metadata()
                .key_value_metadata()
                .to_owned()
                .unwrap()
                .len(),
            1
        );
    }

    // Same round-trip for a PARQUET_2_0 file; additionally checks the schema
    // (including logical/converted types) is preserved.
    #[test]
    fn test_file_writer_v2_with_metadata() {
        let file = tempfile::tempfile().unwrap();
        let field_logical_type = Some(LogicalType::Integer {
            bit_width: 8,
            is_signed: false,
        });
        let field = Arc::new(
            types::Type::primitive_type_builder("col1", Type::INT32)
                .with_logical_type(field_logical_type.clone())
                .with_converted_type(field_logical_type.into())
                .build()
                .unwrap(),
        );
        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![field.clone()])
                .build()
                .unwrap(),
        );
        let props = Arc::new(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![KeyValue::new(
                    "key".to_string(),
                    "value".to_string(),
                )]))
                .set_writer_version(WriterVersion::PARQUET_2_0)
                .build(),
        );
        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file).unwrap();

        assert_eq!(
            reader
                .metadata()
                .file_metadata()
                .key_value_metadata()
                .to_owned()
                .unwrap()
                .len(),
            1
        );

        let fields = reader.metadata().file_metadata().schema().get_fields();
        assert_eq!(fields.len(), 1);
        assert_eq!(fields[0], field);
    }
1282
1283 #[test]
1284 fn test_file_writer_with_sorting_columns_metadata() {
1285 let file = tempfile::tempfile().unwrap();
1286
1287 let schema = Arc::new(
1288 types::Type::group_type_builder("schema")
1289 .with_fields(vec![
1290 Arc::new(
1291 types::Type::primitive_type_builder("col1", Type::INT32)
1292 .build()
1293 .unwrap(),
1294 ),
1295 Arc::new(
1296 types::Type::primitive_type_builder("col2", Type::INT32)
1297 .build()
1298 .unwrap(),
1299 ),
1300 ])
1301 .build()
1302 .unwrap(),
1303 );
1304 let expected_result = Some(vec![SortingColumn {
1305 column_idx: 0,
1306 descending: false,
1307 nulls_first: true,
1308 }]);
1309 let props = Arc::new(
1310 WriterProperties::builder()
1311 .set_key_value_metadata(Some(vec![KeyValue::new(
1312 "key".to_string(),
1313 "value".to_string(),
1314 )]))
1315 .set_sorting_columns(expected_result.clone())
1316 .build(),
1317 );
1318 let mut writer =
1319 SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1320 let mut row_group_writer = writer.next_row_group().expect("get row group writer");
1321
1322 let col_writer = row_group_writer.next_column().unwrap().unwrap();
1323 col_writer.close().unwrap();
1324
1325 let col_writer = row_group_writer.next_column().unwrap().unwrap();
1326 col_writer.close().unwrap();
1327
1328 row_group_writer.close().unwrap();
1329 writer.close().unwrap();
1330
1331 let reader = SerializedFileReader::new(file).unwrap();
1332 let result: Vec<Option<&Vec<SortingColumn>>> = reader
1333 .metadata()
1334 .row_groups()
1335 .iter()
1336 .map(|f| f.sorting_columns())
1337 .collect();
1338 assert_eq!(expected_result.as_ref(), result[0]);
1340 }
1341
1342 #[test]
1343 fn test_file_writer_empty_row_groups() {
1344 let file = tempfile::tempfile().unwrap();
1345 test_file_roundtrip(file, vec![]);
1346 }
1347
1348 #[test]
1349 fn test_file_writer_single_row_group() {
1350 let file = tempfile::tempfile().unwrap();
1351 test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
1352 }
1353
1354 #[test]
1355 fn test_file_writer_multiple_row_groups() {
1356 let file = tempfile::tempfile().unwrap();
1357 test_file_roundtrip(
1358 file,
1359 vec![
1360 vec![1, 2, 3, 4, 5],
1361 vec![1, 2, 3],
1362 vec![1],
1363 vec![1, 2, 3, 4, 5, 6],
1364 ],
1365 );
1366 }
1367
1368 #[test]
1369 fn test_file_writer_multiple_large_row_groups() {
1370 let file = tempfile::tempfile().unwrap();
1371 test_file_roundtrip(
1372 file,
1373 vec![vec![123; 1024], vec![124; 1000], vec![125; 15], vec![]],
1374 );
1375 }
1376
1377 #[test]
1378 fn test_page_writer_data_pages() {
1379 let pages = vec![
1380 Page::DataPage {
1381 buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
1382 num_values: 10,
1383 encoding: Encoding::DELTA_BINARY_PACKED,
1384 def_level_encoding: Encoding::RLE,
1385 rep_level_encoding: Encoding::RLE,
1386 statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1387 },
1388 Page::DataPageV2 {
1389 buf: Bytes::from(vec![4; 128]),
1390 num_values: 10,
1391 encoding: Encoding::DELTA_BINARY_PACKED,
1392 num_nulls: 2,
1393 num_rows: 12,
1394 def_levels_byte_len: 24,
1395 rep_levels_byte_len: 32,
1396 is_compressed: false,
1397 statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1398 },
1399 ];
1400
1401 test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
1402 test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
1403 }
1404
1405 #[test]
1406 fn test_page_writer_dict_pages() {
1407 let pages = vec![
1408 Page::DictionaryPage {
1409 buf: Bytes::from(vec![1, 2, 3, 4, 5]),
1410 num_values: 5,
1411 encoding: Encoding::RLE_DICTIONARY,
1412 is_sorted: false,
1413 },
1414 Page::DataPage {
1415 buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
1416 num_values: 10,
1417 encoding: Encoding::DELTA_BINARY_PACKED,
1418 def_level_encoding: Encoding::RLE,
1419 rep_level_encoding: Encoding::RLE,
1420 statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1421 },
1422 Page::DataPageV2 {
1423 buf: Bytes::from(vec![4; 128]),
1424 num_values: 10,
1425 encoding: Encoding::DELTA_BINARY_PACKED,
1426 num_nulls: 2,
1427 num_rows: 12,
1428 def_levels_byte_len: 24,
1429 rep_levels_byte_len: 32,
1430 is_compressed: false,
1431 statistics: None,
1432 },
1433 ];
1434
1435 test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
1436 test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
1437 }
1438
    /// Compresses `pages` with `codec`, writes them through a
    /// `SerializedPageWriter` into an in-memory buffer, reads them back with a
    /// `SerializedPageReader`, and asserts the decoded pages are equivalent to
    /// the originals.
    fn test_page_roundtrip(pages: &[Page], codec: Compression, physical_type: Type) {
        let mut compressed_pages = vec![];
        let mut total_num_values = 0i64;
        let codec_options = CodecOptionsBuilder::default()
            .set_backward_compatible_lz4(false)
            .build();
        // `create_codec` yields `None` for UNCOMPRESSED; `compress_helper`
        // then just copies the bytes through.
        let mut compressor = create_codec(codec, &codec_options).unwrap();

        for page in pages {
            let uncompressed_len = page.buffer().len();

            let compressed_page = match *page {
                Page::DataPage {
                    ref buf,
                    num_values,
                    encoding,
                    def_level_encoding,
                    rep_level_encoding,
                    ref statistics,
                } => {
                    total_num_values += num_values as i64;
                    let output_buf = compress_helper(compressor.as_mut(), buf);

                    Page::DataPage {
                        buf: Bytes::from(output_buf),
                        num_values,
                        encoding,
                        def_level_encoding,
                        rep_level_encoding,
                        // Round-trip statistics through thrift, mirroring what
                        // the writer actually stores in the file.
                        statistics: from_thrift(physical_type, to_thrift(statistics.as_ref()))
                            .unwrap(),
                    }
                }
                Page::DataPageV2 {
                    ref buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    ref statistics,
                    ..
                } => {
                    total_num_values += num_values as i64;
                    // Data page v2: the rep/def levels at the front of the
                    // buffer stay uncompressed; only the bytes after `offset`
                    // are run through the codec.
                    let offset = (def_levels_byte_len + rep_levels_byte_len) as usize;
                    let cmp_buf = compress_helper(compressor.as_mut(), &buf[offset..]);
                    let mut output_buf = Vec::from(&buf[..offset]);
                    output_buf.extend_from_slice(&cmp_buf[..]);

                    Page::DataPageV2 {
                        buf: Bytes::from(output_buf),
                        num_values,
                        encoding,
                        num_nulls,
                        num_rows,
                        def_levels_byte_len,
                        rep_levels_byte_len,
                        // The original `is_compressed` flag is ignored; it is
                        // recomputed from whether a codec is in use.
                        is_compressed: compressor.is_some(),
                        statistics: from_thrift(physical_type, to_thrift(statistics.as_ref()))
                            .unwrap(),
                    }
                }
                Page::DictionaryPage {
                    ref buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    let output_buf = compress_helper(compressor.as_mut(), buf);

                    Page::DictionaryPage {
                        buf: Bytes::from(output_buf),
                        num_values,
                        encoding,
                        is_sorted,
                    }
                }
            };

            let compressed_page = CompressedPage::new(compressed_page, uncompressed_len);
            compressed_pages.push(compressed_page);
        }

        let mut buffer: Vec<u8> = vec![];
        let mut result_pages: Vec<Page> = vec![];
        // Write all compressed pages into `buffer`.
        {
            let mut writer = TrackedWrite::new(&mut buffer);
            let mut page_writer = SerializedPageWriter::new(&mut writer);

            for page in compressed_pages {
                page_writer.write_page(page).unwrap();
            }
            page_writer.close().unwrap();
        }
        // Read the pages back using a minimal, hand-built column chunk
        // metadata describing the buffer just written.
        {
            let reader = bytes::Bytes::from(buffer);

            let t = types::Type::primitive_type_builder("t", physical_type)
                .build()
                .unwrap();

            let desc = ColumnDescriptor::new(Arc::new(t), 0, 0, ColumnPath::new(vec![]));
            let meta = ColumnChunkMetaData::builder(Arc::new(desc))
                .set_compression(codec)
                .set_total_compressed_size(reader.len() as i64)
                .set_num_values(total_num_values)
                .build()
                .unwrap();

            let props = ReaderProperties::builder()
                .set_backward_compatible_lz4(false)
                .build();
            let mut page_reader = SerializedPageReader::new_with_properties(
                Arc::new(reader),
                &meta,
                total_num_values as usize,
                None,
                Arc::new(props),
            )
            .unwrap();

            while let Some(page) = page_reader.get_next_page().unwrap() {
                result_pages.push(page);
            }
        }

        // The decoded pages must match the originals one-to-one, in order.
        assert_eq!(result_pages.len(), pages.len());
        for i in 0..result_pages.len() {
            assert_page(&result_pages[i], &pages[i]);
        }
    }
1574
1575 fn compress_helper(compressor: Option<&mut Box<dyn Codec>>, data: &[u8]) -> Vec<u8> {
1577 let mut output_buf = vec![];
1578 if let Some(cmpr) = compressor {
1579 cmpr.compress(data, &mut output_buf).unwrap();
1580 } else {
1581 output_buf.extend_from_slice(data);
1582 }
1583 output_buf
1584 }
1585
1586 fn assert_page(left: &Page, right: &Page) {
1588 assert_eq!(left.page_type(), right.page_type());
1589 assert_eq!(&left.buffer(), &right.buffer());
1590 assert_eq!(left.num_values(), right.num_values());
1591 assert_eq!(left.encoding(), right.encoding());
1592 assert_eq!(to_thrift(left.statistics()), to_thrift(right.statistics()));
1593 }
1594
    /// Convenience wrapper around `test_roundtrip` for `INT32` data: writes
    /// `data` (one inner `Vec` per row group) to `file` with `compression`,
    /// reads it back through `R`, and returns the thrift file metadata.
    fn test_roundtrip_i32<W, R>(
        file: W,
        data: Vec<Vec<i32>>,
        compression: Compression,
    ) -> crate::format::FileMetaData
    where
        W: Write + Send,
        R: ChunkReader + From<W> + 'static,
    {
        test_roundtrip::<W, R, Int32Type, _>(file, data, |r| r.get_int(0).unwrap(), compression)
    }
1607
    /// Core round-trip helper: writes `data` to `file` — one row group per
    /// inner `Vec`, a single required column ("col1") of `D`'s physical type —
    /// then reads everything back via `R` and compares it to the input.
    ///
    /// `value` extracts the column value from each materialized `Row`.
    /// Returns the thrift file metadata produced when the writer closed.
    fn test_roundtrip<W, R, D, F>(
        mut file: W,
        data: Vec<Vec<D::T>>,
        value: F,
        compression: Compression,
    ) -> crate::format::FileMetaData
    where
        W: Write + Send,
        R: ChunkReader + From<W> + 'static,
        D: DataType,
        F: Fn(Row) -> D::T,
    {
        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![Arc::new(
                    types::Type::primitive_type_builder("col1", D::get_physical_type())
                        .with_repetition(Repetition::REQUIRED)
                        .build()
                        .unwrap(),
                )])
                .build()
                .unwrap(),
        );
        let props = Arc::new(
            WriterProperties::builder()
                .set_compression(compression)
                .build(),
        );
        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
        let mut rows: i64 = 0;

        for (idx, subset) in data.iter().enumerate() {
            // Byte offset where this row group starts; checked against the
            // row group's recorded file_offset below.
            let row_group_file_offset = file_writer.buf.bytes_written();
            let mut row_group_writer = file_writer.next_row_group().unwrap();
            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
                rows += writer
                    .typed::<D>()
                    .write_batch(&subset[..], None, None)
                    .unwrap() as i64;
                writer.close().unwrap();
            }
            // Each closed row group must appear in the flushed list right
            // away, with the expected ordinal and file offset.
            let last_group = row_group_writer.close().unwrap();
            let flushed = file_writer.flushed_row_groups();
            assert_eq!(flushed.len(), idx + 1);
            assert_eq!(Some(idx as i16), last_group.ordinal());
            assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset());
            assert_eq!(&flushed[idx], last_group.as_ref());
        }
        let file_metadata = file_writer.close().unwrap();

        // Read the file back and verify row counts and every value.
        let reader = SerializedFileReader::new(R::from(file)).unwrap();
        assert_eq!(reader.num_row_groups(), data.len());
        assert_eq!(
            reader.metadata().file_metadata().num_rows(),
            rows,
            "row count in metadata not equal to number of rows written"
        );
        for (i, item) in data.iter().enumerate().take(reader.num_row_groups()) {
            let row_group_reader = reader.get_row_group(i).unwrap();
            let iter = row_group_reader.get_row_iter(None).unwrap();
            let res: Vec<_> = iter.map(|row| row.unwrap()).map(&value).collect();
            // total_byte_size should equal the sum of the columns'
            // uncompressed sizes.
            let row_group_size = row_group_reader.metadata().total_byte_size();
            let uncompressed_size: i64 = row_group_reader
                .metadata()
                .columns()
                .iter()
                .map(|v| v.uncompressed_size())
                .sum();
            assert_eq!(row_group_size, uncompressed_size);
            assert_eq!(res, *item);
        }
        file_metadata
    }
1683
    /// Round-trips `data` through an uncompressed parquet `File` (one row
    /// group per inner `Vec`) and returns the resulting file metadata.
    fn test_file_roundtrip(file: File, data: Vec<Vec<i32>>) -> crate::format::FileMetaData {
        test_roundtrip_i32::<File, File>(file, data, Compression::UNCOMPRESSED)
    }
1689
1690 #[test]
1691 fn test_bytes_writer_empty_row_groups() {
1692 test_bytes_roundtrip(vec![], Compression::UNCOMPRESSED);
1693 }
1694
1695 #[test]
1696 fn test_bytes_writer_single_row_group() {
1697 test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::UNCOMPRESSED);
1698 }
1699
1700 #[test]
1701 fn test_bytes_writer_multiple_row_groups() {
1702 test_bytes_roundtrip(
1703 vec![
1704 vec![1, 2, 3, 4, 5],
1705 vec![1, 2, 3],
1706 vec![1],
1707 vec![1, 2, 3, 4, 5, 6],
1708 ],
1709 Compression::UNCOMPRESSED,
1710 );
1711 }
1712
1713 #[test]
1714 fn test_bytes_writer_single_row_group_compressed() {
1715 test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::SNAPPY);
1716 }
1717
1718 #[test]
1719 fn test_bytes_writer_multiple_row_groups_compressed() {
1720 test_bytes_roundtrip(
1721 vec![
1722 vec![1, 2, 3, 4, 5],
1723 vec![1, 2, 3],
1724 vec![1],
1725 vec![1, 2, 3, 4, 5, 6],
1726 ],
1727 Compression::SNAPPY,
1728 );
1729 }
1730
1731 fn test_bytes_roundtrip(data: Vec<Vec<i32>>, compression: Compression) {
1732 test_roundtrip_i32::<Vec<u8>, Bytes>(Vec::with_capacity(1024), data, compression);
1733 }
1734
1735 #[test]
1736 fn test_boolean_roundtrip() {
1737 let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
1738 test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
1739 Vec::with_capacity(1024),
1740 vec![my_bool_values],
1741 |r| r.get_bool(0).unwrap(),
1742 Compression::UNCOMPRESSED,
1743 );
1744 }
1745
1746 #[test]
1747 fn test_boolean_compressed_roundtrip() {
1748 let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
1749 test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
1750 Vec::with_capacity(1024),
1751 vec![my_bool_values],
1752 |r| r.get_bool(0).unwrap(),
1753 Compression::SNAPPY,
1754 );
1755 }
1756
1757 #[test]
1758 fn test_column_offset_index_file() {
1759 let file = tempfile::tempfile().unwrap();
1760 let file_metadata = test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
1761 file_metadata.row_groups.iter().for_each(|row_group| {
1762 row_group.columns.iter().for_each(|column_chunk| {
1763 assert_ne!(None, column_chunk.column_index_offset);
1764 assert_ne!(None, column_chunk.column_index_length);
1765
1766 assert_ne!(None, column_chunk.offset_index_offset);
1767 assert_ne!(None, column_chunk.offset_index_length);
1768 })
1769 });
1770 }
1771
    /// Writes a one-column file with `initial_kv` supplied via the writer
    /// properties and `final_kv` appended after the row group is written, then
    /// reads the file back and checks the combined key/value metadata.
    ///
    /// Expected: initial entries first, appended entries after; no metadata at
    /// all (or only an empty appended list) yields `None`.
    fn test_kv_metadata(initial_kv: Option<Vec<KeyValue>>, final_kv: Option<Vec<KeyValue>>) {
        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![Arc::new(
                    types::Type::primitive_type_builder("col1", Type::INT32)
                        .with_repetition(Repetition::REQUIRED)
                        .build()
                        .unwrap(),
                )])
                .build()
                .unwrap(),
        );
        let mut out = Vec::with_capacity(1024);
        let props = Arc::new(
            WriterProperties::builder()
                .set_key_value_metadata(initial_kv.clone())
                .build(),
        );
        let mut writer = SerializedFileWriter::new(&mut out, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();
        let column = row_group_writer.next_column().unwrap().unwrap();
        column.close().unwrap();
        row_group_writer.close().unwrap();
        // Append the extra entries only after the row group has been written.
        if let Some(kvs) = &final_kv {
            for kv in kvs {
                writer.append_key_value_metadata(kv.clone())
            }
        }
        writer.close().unwrap();

        let reader = SerializedFileReader::new(Bytes::from(out)).unwrap();
        let metadata = reader.metadata().file_metadata();
        let keys = metadata.key_value_metadata();

        match (initial_kv, final_kv) {
            // Both present (possibly empty): initial first, appended after.
            (Some(a), Some(b)) => {
                let keys = keys.unwrap();
                assert_eq!(keys.len(), a.len() + b.len());
                assert_eq!(&keys[..a.len()], a.as_slice());
                assert_eq!(&keys[a.len()..], b.as_slice());
            }
            // Only one side present: returned verbatim. A non-empty appended
            // list alone also round-trips.
            (Some(v), None) => assert_eq!(keys.unwrap(), &v),
            (None, Some(v)) if !v.is_empty() => assert_eq!(keys.unwrap(), &v),
            // Nothing written (None/None, or only an empty appended list).
            _ => assert!(keys.is_none()),
        }
    }
1818
1819 #[test]
1820 fn test_append_metadata() {
1821 let kv1 = KeyValue::new("cupcakes".to_string(), "awesome".to_string());
1822 let kv2 = KeyValue::new("bingo".to_string(), "bongo".to_string());
1823
1824 test_kv_metadata(None, None);
1825 test_kv_metadata(Some(vec![kv1.clone()]), None);
1826 test_kv_metadata(None, Some(vec![kv2.clone()]));
1827 test_kv_metadata(Some(vec![kv1.clone()]), Some(vec![kv2.clone()]));
1828 test_kv_metadata(Some(vec![]), Some(vec![kv2]));
1829 test_kv_metadata(Some(vec![]), Some(vec![]));
1830 test_kv_metadata(Some(vec![kv1]), Some(vec![]));
1831 test_kv_metadata(None, Some(vec![]));
1832 }
1833
    /// Writes three INT32 columns (decimal, signed int, unsigned int) and
    /// checks which of them get the deprecated `min`/`max` statistics fields
    /// in addition to the newer `min_value`/`max_value` fields.
    #[test]
    fn test_backwards_compatible_statistics() {
        let message_type = "
            message test_schema {
                REQUIRED INT32 decimal1 (DECIMAL(8,2));
                REQUIRED INT32 i32 (INTEGER(32,true));
                REQUIRED INT32 u32 (INTEGER(32,false));
            }
        ";

        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let props = Default::default();
        let mut writer = SerializedFileWriter::new(vec![], schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        // Write the same values [1, 2, 3] into all three columns.
        for _ in 0..3 {
            let mut writer = row_group_writer.next_column().unwrap().unwrap();
            writer
                .typed::<Int32Type>()
                .write_batch(&[1, 2, 3], None, None)
                .unwrap();
            writer.close().unwrap();
        }
        let metadata = row_group_writer.close().unwrap();
        writer.close().unwrap();

        // Pull the thrift-encoded statistics for each column chunk.
        let thrift = metadata.to_thrift();
        let encoded_stats: Vec<_> = thrift
            .columns
            .into_iter()
            .map(|x| x.meta_data.unwrap().statistics.unwrap())
            .collect();

        // DECIMAL(8,2) on INT32: both deprecated and new fields are written.
        let s = &encoded_stats[0];
        assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
        assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));

        // Signed INTEGER(32,true): both deprecated and new fields are written.
        let s = &encoded_stats[1];
        assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
        assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));

        // Unsigned INTEGER(32,false): only the new fields are written — the
        // deprecated min/max are omitted (presumably because their legacy
        // signed sort order would be wrong for unsigned data — see the
        // parquet-format statistics specification).
        let s = &encoded_stats[2];
        assert_eq!(s.min.as_deref(), None);
        assert_eq!(s.max.as_deref(), None);
        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
    }
1888
    /// Writes each column chunk into its own in-memory buffer using standalone
    /// column writers, then splices those pre-encoded buffers into a row group
    /// with `append_column`, and verifies the resulting file reads back
    /// correctly — both with and without the page index.
    #[test]
    fn test_spliced_write() {
        let message_type = "
            message test_schema {
                REQUIRED INT32 i32 (INTEGER(32,true));
                REQUIRED INT32 u32 (INTEGER(32,false));
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let props = Arc::new(WriterProperties::builder().build());

        let mut file = Vec::with_capacity(1024);
        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props.clone()).unwrap();

        // One (page buffer, close-result) slot per column. The close callback
        // below stores each column's ColumnCloseResult into its slot.
        let columns = file_writer.descr.columns();
        let mut column_state: Vec<(_, Option<ColumnCloseResult>)> = columns
            .iter()
            .map(|_| (TrackedWrite::new(Vec::with_capacity(1024)), None))
            .collect();

        // Build one independent column writer per column, each writing pages
        // into its own buffer. `split_first_mut` hands each iteration a
        // disjoint mutable borrow of one state slot.
        let mut column_state_slice = column_state.as_mut_slice();
        let mut column_writers = Vec::with_capacity(columns.len());
        for c in columns {
            let ((buf, out), tail) = column_state_slice.split_first_mut().unwrap();
            column_state_slice = tail;

            let page_writer = Box::new(SerializedPageWriter::new(buf));
            let col_writer = get_column_writer(c.clone(), props.clone(), page_writer);
            column_writers.push(SerializedColumnWriter::new(
                col_writer,
                Some(Box::new(|on_close| {
                    *out = Some(on_close);
                    Ok(())
                })),
            ));
        }

        let column_data = [[1, 2, 3, 4], [7, 3, 7, 3]];

        // Write one batch into each standalone column writer.
        for (writer, batch) in column_writers.iter_mut().zip(column_data) {
            let writer = writer.typed::<Int32Type>();
            writer.write_batch(&batch, None, None).unwrap();
        }

        // Closing runs the callbacks that populate `column_state`.
        for writer in column_writers {
            writer.close().unwrap()
        }

        // Splice the pre-encoded column buffers into a single row group.
        let mut row_group_writer = file_writer.next_row_group().unwrap();
        for (write, close) in column_state {
            let buf = Bytes::from(write.into_inner().unwrap());
            row_group_writer
                .append_column(&buf, close.unwrap())
                .unwrap();
        }
        row_group_writer.close().unwrap();
        file_writer.close().unwrap();

        // Both a plain read and a page-index-enabled read must see the data.
        let file = Bytes::from(file);
        let test_read = |reader: SerializedFileReader<Bytes>| {
            let row_group = reader.get_row_group(0).unwrap();

            let mut out = Vec::with_capacity(4);
            let c1 = row_group.get_column_reader(0).unwrap();
            let mut c1 = get_typed_column_reader::<Int32Type>(c1);
            c1.read_records(4, None, None, &mut out).unwrap();
            assert_eq!(out, column_data[0]);

            out.clear();

            let c2 = row_group.get_column_reader(1).unwrap();
            let mut c2 = get_typed_column_reader::<Int32Type>(c2);
            c2.read_records(4, None, None, &mut out).unwrap();
            assert_eq!(out, column_data[1]);
        };

        let reader = SerializedFileReader::new(file.clone()).unwrap();
        test_read(reader);

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
        test_read(reader);
    }
1976
    /// Statistics disabled globally but re-enabled at `Page` level for column
    /// "a" only: verifies which index structures end up in the file, and that
    /// the writer rejects further row groups after `finish()`.
    #[test]
    fn test_disabled_statistics() {
        let message_type = "
            message test_schema {
                REQUIRED INT32 a;
                REQUIRED INT32 b;
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            // Re-enable page-level statistics for column "a" only.
            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
            // NOTE(review): offset indexes are still asserted present for both
            // columns below, so this disable appears to be overridden by the
            // per-column page statistics above — confirm intended semantics.
            .set_offset_index_disabled(true)
            .build();
        let mut file = Vec::with_capacity(1024);
        let mut file_writer =
            SerializedFileWriter::new(&mut file, schema, Arc::new(props)).unwrap();

        // Write [1, 2, 3] to "a" and [4, 5, 6] to "b" in one row group.
        let mut row_group_writer = file_writer.next_row_group().unwrap();
        let mut a_writer = row_group_writer.next_column().unwrap().unwrap();
        let col_writer = a_writer.typed::<Int32Type>();
        col_writer.write_batch(&[1, 2, 3], None, None).unwrap();
        a_writer.close().unwrap();

        let mut b_writer = row_group_writer.next_column().unwrap().unwrap();
        let col_writer = b_writer.typed::<Int32Type>();
        col_writer.write_batch(&[4, 5, 6], None, None).unwrap();
        b_writer.close().unwrap();
        row_group_writer.close().unwrap();

        let metadata = file_writer.finish().unwrap();
        assert_eq!(metadata.row_groups.len(), 1);
        let row_group = &metadata.row_groups[0];
        assert_eq!(row_group.columns.len(), 2);
        // Offset indexes are written for both columns.
        assert!(row_group.columns[0].offset_index_offset.is_some());
        // Column index is present only for "a" (page statistics enabled).
        assert!(row_group.columns[0].column_index_offset.is_some());
        assert!(row_group.columns[1].offset_index_offset.is_some());
        assert!(row_group.columns[1].column_index_offset.is_none());

        // After `finish()` the writer must refuse to start new row groups.
        let err = file_writer.next_row_group().err().unwrap().to_string();
        assert_eq!(err, "Parquet error: SerializedFileWriter already finished");

        drop(file_writer);

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(Bytes::from(file), options).unwrap();

        // One row group, two entries per index structure.
        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1);
        assert_eq!(offset_index[0].len(), 2);
        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1);
        assert_eq!(column_index[0].len(), 2);
        // "a" has a real INT32 column index; "b" has none.
        let a_idx = &column_index[0][0];
        assert!(matches!(a_idx, Index::INT32(_)), "{a_idx:?}");
        let b_idx = &column_index[0][1];
        assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
    }
2039
    /// Verifies SizeStatistics for an optional BYTE_ARRAY column: the
    /// definition-level histogram and unencoded byte size are written to the
    /// column chunk metadata and propagated into the column and offset
    /// indexes; no repetition-level histogram is written for a flat column.
    #[test]
    fn test_byte_array_size_statistics() {
        let message_type = "
            message test_schema {
                OPTIONAL BYTE_ARRAY a (UTF8);
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        // 7 generated values over 10 slots: def_levels has 7 ones (non-null)
        // and 3 zeros (null), so the histogram should be [3, 7].
        let data = ByteArrayType::gen_vec(32, 7);
        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
        // Expected unencoded byte size: sum of all value lengths.
        let unenc_size: i64 = data.iter().map(|x| x.len() as i64).sum();
        let file: File = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        );

        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<ByteArrayType>()
            .write_batch(&data, Some(&def_levels), None)
            .unwrap();
        col_writer.close().unwrap();
        row_group_writer.close().unwrap();
        let file_metadata = writer.close().unwrap();

        assert_eq!(file_metadata.row_groups.len(), 1);
        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());

        // Shared check: 3 nulls at level 0, 7 values at level 1.
        let check_def_hist = |def_hist: &[i64]| {
            assert_eq!(def_hist.len(), 2);
            assert_eq!(def_hist[0], 3);
            assert_eq!(def_hist[1], 7);
        };

        // Size statistics written to the thrift column chunk metadata.
        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
        let meta_data = file_metadata.row_groups[0].columns[0]
            .meta_data
            .as_ref()
            .unwrap();
        assert!(meta_data.size_statistics.is_some());
        let size_stats = meta_data.size_statistics.as_ref().unwrap();

        // Flat column: no repetition histogram; definition histogram and
        // unencoded byte size must both be present and correct.
        assert!(size_stats.repetition_level_histogram.is_none());
        assert!(size_stats.definition_level_histogram.is_some());
        assert!(size_stats.unencoded_byte_array_data_bytes.is_some());
        assert_eq!(
            unenc_size,
            size_stats.unencoded_byte_array_data_bytes.unwrap()
        );
        check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());

        // Re-read the file with the page index enabled.
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        // The decoded column chunk metadata carries the same statistics.
        let rfile_metadata = reader.metadata().file_metadata();
        assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
        assert_eq!(reader.num_row_groups(), 1);
        let rowgroup = reader.get_row_group(0).unwrap();
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.definition_level_histogram().is_some());
        assert!(column.repetition_level_histogram().is_none());
        assert!(column.unencoded_byte_array_data_bytes().is_some());
        check_def_hist(column.definition_level_histogram().unwrap().values());
        assert_eq!(
            unenc_size,
            column.unencoded_byte_array_data_bytes().unwrap()
        );

        // The column index (single page) carries the definition histogram.
        assert!(reader.metadata().column_index().is_some());
        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1);
        assert_eq!(column_index[0].len(), 1);
        let col_idx = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
            assert_eq!(index.indexes.len(), 1);
            &index.indexes[0]
        } else {
            unreachable!()
        };

        assert!(col_idx.repetition_level_histogram().is_none());
        assert!(col_idx.definition_level_histogram().is_some());
        check_def_hist(col_idx.definition_level_histogram().unwrap().values());

        // The offset index carries the per-page unencoded byte sizes.
        assert!(reader.metadata().offset_index().is_some());
        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1);
        assert_eq!(offset_index[0].len(), 1);
        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_some());
        let page_sizes = offset_index[0][0]
            .unencoded_byte_array_data_bytes
            .as_ref()
            .unwrap();
        assert_eq!(page_sizes.len(), 1);
        assert_eq!(page_sizes[0], unenc_size);
    }
2144
2145 #[test]
2146 fn test_too_many_rowgroups() {
2147 let message_type = "
2148 message test_schema {
2149 REQUIRED BYTE_ARRAY a (UTF8);
2150 }
2151 ";
2152 let schema = Arc::new(parse_message_type(message_type).unwrap());
2153 let file: File = tempfile::tempfile().unwrap();
2154 let props = Arc::new(
2155 WriterProperties::builder()
2156 .set_statistics_enabled(EnabledStatistics::None)
2157 .set_max_row_group_size(1)
2158 .build(),
2159 );
2160 let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
2161
2162 for i in 0..0x8001 {
2164 match writer.next_row_group() {
2165 Ok(mut row_group_writer) => {
2166 assert_ne!(i, 0x8000);
2167 let col_writer = row_group_writer.next_column().unwrap().unwrap();
2168 col_writer.close().unwrap();
2169 row_group_writer.close().unwrap();
2170 }
2171 Err(e) => {
2172 assert_eq!(i, 0x8000);
2173 assert_eq!(
2174 e.to_string(),
2175 "Parquet error: Parquet does not support more than 32767 row groups per file (currently: 32768)"
2176 );
2177 }
2178 }
2179 }
2180 writer.close().unwrap();
2181 }
2182
2183 #[test]
2184 fn test_size_statistics_with_repetition_and_nulls() {
2185 let message_type = "
2186 message test_schema {
2187 OPTIONAL group i32_list (LIST) {
2188 REPEATED group list {
2189 OPTIONAL INT32 element;
2190 }
2191 }
2192 }
2193 ";
2194 let schema = Arc::new(parse_message_type(message_type).unwrap());
2201 let data = [1, 2, 4, 7, 8, 9, 10];
2202 let def_levels = [3, 3, 0, 3, 2, 1, 3, 3, 3, 3];
2203 let rep_levels = [0, 1, 0, 0, 1, 0, 0, 1, 1, 1];
2204 let file = tempfile::tempfile().unwrap();
2205 let props = Arc::new(
2206 WriterProperties::builder()
2207 .set_statistics_enabled(EnabledStatistics::Page)
2208 .build(),
2209 );
2210 let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
2211 let mut row_group_writer = writer.next_row_group().unwrap();
2212
2213 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
2214 col_writer
2215 .typed::<Int32Type>()
2216 .write_batch(&data, Some(&def_levels), Some(&rep_levels))
2217 .unwrap();
2218 col_writer.close().unwrap();
2219 row_group_writer.close().unwrap();
2220 let file_metadata = writer.close().unwrap();
2221
2222 assert_eq!(file_metadata.row_groups.len(), 1);
2223 assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
2224 assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
2225
2226 let check_def_hist = |def_hist: &[i64]| {
2227 assert_eq!(def_hist.len(), 4);
2228 assert_eq!(def_hist[0], 1);
2229 assert_eq!(def_hist[1], 1);
2230 assert_eq!(def_hist[2], 1);
2231 assert_eq!(def_hist[3], 7);
2232 };
2233
2234 let check_rep_hist = |rep_hist: &[i64]| {
2235 assert_eq!(rep_hist.len(), 2);
2236 assert_eq!(rep_hist[0], 5);
2237 assert_eq!(rep_hist[1], 5);
2238 };
2239
2240 assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
2243 let meta_data = file_metadata.row_groups[0].columns[0]
2244 .meta_data
2245 .as_ref()
2246 .unwrap();
2247 assert!(meta_data.size_statistics.is_some());
2248 let size_stats = meta_data.size_statistics.as_ref().unwrap();
2249 assert!(size_stats.repetition_level_histogram.is_some());
2250 assert!(size_stats.definition_level_histogram.is_some());
2251 assert!(size_stats.unencoded_byte_array_data_bytes.is_none());
2252 check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());
2253 check_rep_hist(size_stats.repetition_level_histogram.as_ref().unwrap());
2254
2255 let options = ReadOptionsBuilder::new().with_page_index().build();
2257 let reader = SerializedFileReader::new_with_options(file, options).unwrap();
2258
2259 let rfile_metadata = reader.metadata().file_metadata();
2260 assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
2261 assert_eq!(reader.num_row_groups(), 1);
2262 let rowgroup = reader.get_row_group(0).unwrap();
2263 assert_eq!(rowgroup.num_columns(), 1);
2264 let column = rowgroup.metadata().column(0);
2265 assert!(column.definition_level_histogram().is_some());
2266 assert!(column.repetition_level_histogram().is_some());
2267 assert!(column.unencoded_byte_array_data_bytes().is_none());
2268 check_def_hist(column.definition_level_histogram().unwrap().values());
2269 check_rep_hist(column.repetition_level_histogram().unwrap().values());
2270
2271 assert!(reader.metadata().column_index().is_some());
2273 let column_index = reader.metadata().column_index().unwrap();
2274 assert_eq!(column_index.len(), 1);
2275 assert_eq!(column_index[0].len(), 1);
2276 let col_idx = if let Index::INT32(index) = &column_index[0][0] {
2277 assert_eq!(index.indexes.len(), 1);
2278 &index.indexes[0]
2279 } else {
2280 unreachable!()
2281 };
2282
2283 check_def_hist(col_idx.definition_level_histogram().unwrap().values());
2284 check_rep_hist(col_idx.repetition_level_histogram().unwrap().values());
2285
2286 assert!(reader.metadata().offset_index().is_some());
2287 let offset_index = reader.metadata().offset_index().unwrap();
2288 assert_eq!(offset_index.len(), 1);
2289 assert_eq!(offset_index[0].len(), 1);
2290 assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_none());
2291 }
2292
2293 #[test]
2294 #[cfg(feature = "arrow")]
2295 fn test_byte_stream_split_extended_roundtrip() {
2296 let path = format!(
2297 "{}/byte_stream_split_extended.gzip.parquet",
2298 arrow::util::test_util::parquet_test_data(),
2299 );
2300 let file = File::open(path).unwrap();
2301
2302 let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)
2304 .expect("parquet open")
2305 .build()
2306 .expect("parquet open");
2307
2308 let file = tempfile::tempfile().unwrap();
2309 let props = WriterProperties::builder()
2310 .set_dictionary_enabled(false)
2311 .set_column_encoding(
2312 ColumnPath::from("float16_byte_stream_split"),
2313 Encoding::BYTE_STREAM_SPLIT,
2314 )
2315 .set_column_encoding(
2316 ColumnPath::from("float_byte_stream_split"),
2317 Encoding::BYTE_STREAM_SPLIT,
2318 )
2319 .set_column_encoding(
2320 ColumnPath::from("double_byte_stream_split"),
2321 Encoding::BYTE_STREAM_SPLIT,
2322 )
2323 .set_column_encoding(
2324 ColumnPath::from("int32_byte_stream_split"),
2325 Encoding::BYTE_STREAM_SPLIT,
2326 )
2327 .set_column_encoding(
2328 ColumnPath::from("int64_byte_stream_split"),
2329 Encoding::BYTE_STREAM_SPLIT,
2330 )
2331 .set_column_encoding(
2332 ColumnPath::from("flba5_byte_stream_split"),
2333 Encoding::BYTE_STREAM_SPLIT,
2334 )
2335 .set_column_encoding(
2336 ColumnPath::from("decimal_byte_stream_split"),
2337 Encoding::BYTE_STREAM_SPLIT,
2338 )
2339 .build();
2340
2341 let mut parquet_writer = ArrowWriter::try_new(
2342 file.try_clone().expect("cannot open file"),
2343 parquet_reader.schema(),
2344 Some(props),
2345 )
2346 .expect("create arrow writer");
2347
2348 for maybe_batch in parquet_reader {
2349 let batch = maybe_batch.expect("reading batch");
2350 parquet_writer.write(&batch).expect("writing data");
2351 }
2352
2353 parquet_writer.close().expect("finalizing file");
2354
2355 let reader = SerializedFileReader::new(file).expect("Failed to create reader");
2356 let filemeta = reader.metadata();
2357
2358 let check_encoding = |x: usize, filemeta: &ParquetMetaData| {
2360 assert!(filemeta
2361 .row_group(0)
2362 .column(x)
2363 .encodings()
2364 .contains(&Encoding::BYTE_STREAM_SPLIT));
2365 };
2366
2367 check_encoding(1, filemeta);
2368 check_encoding(3, filemeta);
2369 check_encoding(5, filemeta);
2370 check_encoding(7, filemeta);
2371 check_encoding(9, filemeta);
2372 check_encoding(11, filemeta);
2373 check_encoding(13, filemeta);
2374
2375 let mut iter = reader
2377 .get_row_iter(None)
2378 .expect("Failed to create row iterator");
2379
2380 let mut start = 0;
2381 let end = reader.metadata().file_metadata().num_rows();
2382
2383 let check_row = |row: Result<Row, ParquetError>| {
2384 assert!(row.is_ok());
2385 let r = row.unwrap();
2386 assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2387 assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2388 assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2389 assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2390 assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2391 assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2392 assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2393 };
2394
2395 while start < end {
2396 match iter.next() {
2397 Some(row) => check_row(row),
2398 None => break,
2399 };
2400 start += 1;
2401 }
2402 }
2403}