use crate::bloom_filter::Sbbf;
use crate::format as parquet;
use crate::format::{ColumnIndex, OffsetIndex};
use crate::thrift::TSerializable;
use std::fmt::Debug;
use std::io::{BufWriter, IoSlice, Read};
use std::{io::Write, sync::Arc};
use thrift::protocol::TCompactOutputProtocol;

use crate::column::page_encryption::PageEncryptor;
use crate::column::writer::{get_typed_column_writer_mut, ColumnCloseResult, ColumnWriterImpl};
use crate::column::{
    page::{CompressedPage, PageWriteSpec, PageWriter},
    writer::{get_column_writer, ColumnWriter},
};
use crate::data_type::DataType;
#[cfg(feature = "encryption")]
use crate::encryption::encrypt::{
    get_column_crypto_metadata, FileEncryptionProperties, FileEncryptor,
};
use crate::errors::{ParquetError, Result};
use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
use crate::file::reader::ChunkReader;
#[cfg(feature = "encryption")]
use crate::file::PARQUET_MAGIC_ENCR_FOOTER;
use crate::file::{metadata::*, PARQUET_MAGIC};
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr};

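/// A [`Write`] wrapper that buffers data through an internal [`BufWriter`] and keeps a
/// running count of the bytes written to the underlying writer.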
49pub struct TrackedWrite<W: Write> {
53 inner: BufWriter<W>,
54 bytes_written: usize,
55}
56
57impl<W: Write> TrackedWrite<W> {
58 pub fn new(inner: W) -> Self {
60 let buf_write = BufWriter::new(inner);
61 Self {
62 inner: buf_write,
63 bytes_written: 0,
64 }
65 }
66
67 pub fn bytes_written(&self) -> usize {
69 self.bytes_written
70 }
71
72 pub fn inner(&self) -> &W {
74 self.inner.get_ref()
75 }
76
77 pub fn inner_mut(&mut self) -> &mut W {
82 self.inner.get_mut()
83 }
84
85 pub fn into_inner(self) -> Result<W> {
        self.inner.into_inner().map_err(|err| {
            ParquetError::General(format!("failed to get inner writer: {err}"))
        })
90 }
91}
92
93impl<W: Write> Write for TrackedWrite<W> {
94 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
95 let bytes = self.inner.write(buf)?;
96 self.bytes_written += bytes;
97 Ok(bytes)
98 }
99
100 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> std::io::Result<usize> {
101 let bytes = self.inner.write_vectored(bufs)?;
102 self.bytes_written += bytes;
103 Ok(bytes)
104 }
105
106 fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
107 self.inner.write_all(buf)?;
108 self.bytes_written += buf.len();
109
110 Ok(())
111 }
112
113 fn flush(&mut self) -> std::io::Result<()> {
114 self.inner.flush()
115 }
116}
117
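/// Callback invoked when a column chunk is closed, receiving that column's
/// [`ColumnCloseResult`].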
118pub type OnCloseColumnChunk<'a> = Box<dyn FnOnce(ColumnCloseResult) -> Result<()> + 'a>;
120
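/// Callback invoked when a row group is closed, receiving the underlying writer, the
/// row group's [`RowGroupMetaData`], and its per-column bloom filters, column indexes
/// and offset indexes.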
121pub type OnCloseRowGroup<'a, W> = Box<
127 dyn FnOnce(
128 &'a mut TrackedWrite<W>,
129 RowGroupMetaData,
130 Vec<Option<Sbbf>>,
131 Vec<Option<ColumnIndex>>,
132 Vec<Option<OffsetIndex>>,
133 ) -> Result<()>
134 + 'a
135 + Send,
136>;
137
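/// Writes a Parquet file to the wrapped [`Write`]: the leading magic bytes, zero or
/// more row groups, and finally the footer metadata.
///
/// A minimal usage sketch (assuming a writable `file`, a parsed `schema: TypePtr` and
/// writer `props` are already constructed, mirroring the tests at the bottom of this
/// module):
///
/// ```ignore
/// let mut writer = SerializedFileWriter::new(file, schema, props)?;
/// let mut row_group = writer.next_row_group()?;
/// while let Some(mut column) = row_group.next_column()? {
///     // write values via column.typed::<T>().write_batch(..)
///     column.close()?;
/// }
/// row_group.close()?;
/// writer.close()?;
/// ```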
138pub struct SerializedFileWriter<W: Write> {
151 buf: TrackedWrite<W>,
152 schema: TypePtr,
153 descr: SchemaDescPtr,
154 props: WriterPropertiesPtr,
155 row_groups: Vec<RowGroupMetaData>,
156 bloom_filters: Vec<Vec<Option<Sbbf>>>,
157 column_indexes: Vec<Vec<Option<ColumnIndex>>>,
158 offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
159 row_group_index: usize,
160 kv_metadatas: Vec<KeyValue>,
162 finished: bool,
163 #[cfg(feature = "encryption")]
164 file_encryptor: Option<Arc<FileEncryptor>>,
165}
166
167impl<W: Write> Debug for SerializedFileWriter<W> {
168 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169 f.debug_struct("SerializedFileWriter")
172 .field("descr", &self.descr)
173 .field("row_group_index", &self.row_group_index)
174 .field("kv_metadatas", &self.kv_metadatas)
175 .finish_non_exhaustive()
176 }
177}
178
179impl<W: Write + Send> SerializedFileWriter<W> {
180 pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
182 let mut buf = TrackedWrite::new(buf);
183
184 let schema_descriptor = SchemaDescriptor::new(schema.clone());
185
186 #[cfg(feature = "encryption")]
187 let file_encryptor = Self::get_file_encryptor(&properties, &schema_descriptor)?;
188
189 Self::start_file(&properties, &mut buf)?;
190 Ok(Self {
191 buf,
192 schema,
193 descr: Arc::new(schema_descriptor),
194 props: properties,
195 row_groups: vec![],
196 bloom_filters: vec![],
197 column_indexes: Vec::new(),
198 offset_indexes: Vec::new(),
199 row_group_index: 0,
200 kv_metadatas: Vec::new(),
201 finished: false,
202 #[cfg(feature = "encryption")]
203 file_encryptor,
204 })
205 }
206
207 #[cfg(feature = "encryption")]
208 fn get_file_encryptor(
209 properties: &WriterPropertiesPtr,
210 schema_descriptor: &SchemaDescriptor,
211 ) -> Result<Option<Arc<FileEncryptor>>> {
212 if let Some(file_encryption_properties) = &properties.file_encryption_properties {
213 file_encryption_properties.validate_encrypted_column_names(schema_descriptor)?;
214
215 Ok(Some(Arc::new(FileEncryptor::new(
216 file_encryption_properties.clone(),
217 )?)))
218 } else {
219 Ok(None)
220 }
221 }
222
223 pub fn next_row_group(&mut self) -> Result<SerializedRowGroupWriter<'_, W>> {
230 self.assert_previous_writer_closed()?;
231 let ordinal = self.row_group_index;
232
233 let ordinal: i16 = ordinal.try_into().map_err(|_| {
234 ParquetError::General(format!(
235 "Parquet does not support more than {} row groups per file (currently: {})",
236 i16::MAX,
237 ordinal
238 ))
239 })?;
240
241 self.row_group_index = self
242 .row_group_index
243 .checked_add(1)
244 .expect("SerializedFileWriter::row_group_index overflowed");
245
246 let bloom_filter_position = self.properties().bloom_filter_position();
247 let row_groups = &mut self.row_groups;
248 let row_bloom_filters = &mut self.bloom_filters;
249 let row_column_indexes = &mut self.column_indexes;
250 let row_offset_indexes = &mut self.offset_indexes;
251 let on_close = move |buf,
252 mut metadata,
253 row_group_bloom_filter,
254 row_group_column_index,
255 row_group_offset_index| {
256 row_bloom_filters.push(row_group_bloom_filter);
257 row_column_indexes.push(row_group_column_index);
258 row_offset_indexes.push(row_group_offset_index);
259 match bloom_filter_position {
261 BloomFilterPosition::AfterRowGroup => {
262 write_bloom_filters(buf, row_bloom_filters, &mut metadata)?
263 }
264 BloomFilterPosition::End => (),
265 };
266 row_groups.push(metadata);
267 Ok(())
268 };
269
270 let row_group_writer = SerializedRowGroupWriter::new(
271 self.descr.clone(),
272 self.props.clone(),
273 &mut self.buf,
274 ordinal,
275 Some(Box::new(on_close)),
276 );
277 #[cfg(feature = "encryption")]
278 let row_group_writer = row_group_writer.with_file_encryptor(self.file_encryptor.clone());
279
280 Ok(row_group_writer)
281 }
282
283 pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
285 &self.row_groups
286 }
287
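    /// Writes any bloom filters that have not yet been written, followed by the file
    /// footer, then flushes the underlying writer. No further row groups can be written
    /// afterwards; [`Self::close`] consumes the writer and calls this internally.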
288 pub fn finish(&mut self) -> Result<parquet::FileMetaData> {
294 self.assert_previous_writer_closed()?;
295 let metadata = self.write_metadata()?;
296 self.buf.flush()?;
297 Ok(metadata)
298 }
299
300 pub fn close(mut self) -> Result<parquet::FileMetaData> {
302 self.finish()
303 }
304
305 #[cfg(not(feature = "encryption"))]
307 fn start_file(_properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
308 buf.write_all(get_file_magic())?;
309 Ok(())
310 }
311
312 #[cfg(feature = "encryption")]
314 fn start_file(properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
315 let magic = get_file_magic(properties.file_encryption_properties.as_ref());
316
317 buf.write_all(magic)?;
318 Ok(())
319 }
320
321 fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
323 self.finished = true;
324
325 for row_group in &mut self.row_groups {
327 write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?;
328 }
329
330 let key_value_metadata = match self.props.key_value_metadata() {
331 Some(kv) => Some(kv.iter().chain(&self.kv_metadatas).cloned().collect()),
332 None if self.kv_metadatas.is_empty() => None,
333 None => Some(self.kv_metadatas.clone()),
334 };
335
336 let row_groups = self
337 .row_groups
338 .iter()
339 .map(|v| v.to_thrift())
340 .collect::<Vec<_>>();
341
342 let mut encoder = ThriftMetadataWriter::new(
343 &mut self.buf,
344 &self.schema,
345 &self.descr,
346 row_groups,
347 Some(self.props.created_by().to_string()),
348 self.props.writer_version().as_num(),
349 );
350
351 #[cfg(feature = "encryption")]
352 {
353 encoder = encoder.with_file_encryptor(self.file_encryptor.clone());
354 }
355
356 if let Some(key_value_metadata) = key_value_metadata {
357 encoder = encoder.with_key_value_metadata(key_value_metadata)
358 }
359 encoder = encoder.with_column_indexes(&self.column_indexes);
360 encoder = encoder.with_offset_indexes(&self.offset_indexes);
361 encoder.finish()
362 }
363
364 #[inline]
365 fn assert_previous_writer_closed(&self) -> Result<()> {
366 if self.finished {
367 return Err(general_err!("SerializedFileWriter already finished"));
368 }
369
370 if self.row_group_index != self.row_groups.len() {
371 Err(general_err!("Previous row group writer was not closed"))
372 } else {
373 Ok(())
374 }
375 }
376
377 pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
379 self.kv_metadatas.push(kv_metadata);
380 }
381
382 pub fn schema_descr(&self) -> &SchemaDescriptor {
384 &self.descr
385 }
386
387 pub fn properties(&self) -> &WriterPropertiesPtr {
389 &self.props
390 }
391
392 pub fn inner(&self) -> &W {
394 self.buf.inner()
395 }
396
397 pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
406 self.buf.write_all(buf)
407 }
408
409 pub fn inner_mut(&mut self) -> &mut W {
418 self.buf.inner_mut()
419 }
420
421 pub fn into_inner(mut self) -> Result<W> {
423 self.assert_previous_writer_closed()?;
424 let _ = self.write_metadata()?;
425
426 self.buf.into_inner()
427 }
428
429 pub fn bytes_written(&self) -> usize {
431 self.buf.bytes_written()
432 }
433
434 #[cfg(feature = "encryption")]
436 pub(crate) fn file_encryptor(&self) -> Option<Arc<FileEncryptor>> {
437 self.file_encryptor.clone()
438 }
439}
440
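/// Writes any bloom filters still buffered for `row_group` into `buf` and records each
/// filter's offset and length in the corresponding column chunk metadata.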
441fn write_bloom_filters<W: Write + Send>(
444 buf: &mut TrackedWrite<W>,
445 bloom_filters: &mut [Vec<Option<Sbbf>>],
446 row_group: &mut RowGroupMetaData,
447) -> Result<()> {
    let row_group_idx: u16 = row_group
        .ordinal()
        .expect("Missing row group ordinal")
        .try_into()
        .map_err(|_| {
            ParquetError::General(format!(
                "Negative row group ordinal: {}",
                row_group.ordinal().unwrap()
            ))
        })?;
462 let row_group_idx = row_group_idx as usize;
463 for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() {
464 if let Some(bloom_filter) = bloom_filters[row_group_idx][column_idx].take() {
465 let start_offset = buf.bytes_written();
466 bloom_filter.write(&mut *buf)?;
467 let end_offset = buf.bytes_written();
468 *column_chunk = column_chunk
470 .clone()
471 .into_builder()
472 .set_bloom_filter_offset(Some(start_offset as i64))
473 .set_bloom_filter_length(Some((end_offset - start_offset) as i32))
474 .build()?;
475 }
476 }
477 Ok(())
478}
479
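/// Writes the column chunks of a single row group, accumulating per-column metadata,
/// bloom filters and page indexes, and reports the assembled [`RowGroupMetaData`] to
/// the parent [`SerializedFileWriter`] through its `on_close` callback.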
480pub struct SerializedRowGroupWriter<'a, W: Write> {
492 descr: SchemaDescPtr,
493 props: WriterPropertiesPtr,
494 buf: &'a mut TrackedWrite<W>,
495 total_rows_written: Option<u64>,
496 total_bytes_written: u64,
497 total_uncompressed_bytes: i64,
498 column_index: usize,
499 row_group_metadata: Option<RowGroupMetaDataPtr>,
500 column_chunks: Vec<ColumnChunkMetaData>,
501 bloom_filters: Vec<Option<Sbbf>>,
502 column_indexes: Vec<Option<ColumnIndex>>,
503 offset_indexes: Vec<Option<OffsetIndex>>,
504 row_group_index: i16,
505 file_offset: i64,
506 on_close: Option<OnCloseRowGroup<'a, W>>,
507 #[cfg(feature = "encryption")]
508 file_encryptor: Option<Arc<FileEncryptor>>,
509}
510
511impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
512 pub fn new(
521 schema_descr: SchemaDescPtr,
522 properties: WriterPropertiesPtr,
523 buf: &'a mut TrackedWrite<W>,
524 row_group_index: i16,
525 on_close: Option<OnCloseRowGroup<'a, W>>,
526 ) -> Self {
527 let num_columns = schema_descr.num_columns();
528 let file_offset = buf.bytes_written() as i64;
529 Self {
530 buf,
531 row_group_index,
532 file_offset,
533 on_close,
534 total_rows_written: None,
535 descr: schema_descr,
536 props: properties,
537 column_index: 0,
538 row_group_metadata: None,
539 column_chunks: Vec::with_capacity(num_columns),
540 bloom_filters: Vec::with_capacity(num_columns),
541 column_indexes: Vec::with_capacity(num_columns),
542 offset_indexes: Vec::with_capacity(num_columns),
543 total_bytes_written: 0,
544 total_uncompressed_bytes: 0,
545 #[cfg(feature = "encryption")]
546 file_encryptor: None,
547 }
548 }
549
550 #[cfg(feature = "encryption")]
551 pub(crate) fn with_file_encryptor(
553 mut self,
554 file_encryptor: Option<Arc<FileEncryptor>>,
555 ) -> Self {
556 self.file_encryptor = file_encryptor;
557 self
558 }
559
560 fn next_column_desc(&mut self) -> Option<ColumnDescPtr> {
562 let ret = self.descr.columns().get(self.column_index)?.clone();
563 self.column_index += 1;
564 Some(ret)
565 }
566
567 fn get_on_close(&mut self) -> (&mut TrackedWrite<W>, OnCloseColumnChunk<'_>) {
569 let total_bytes_written = &mut self.total_bytes_written;
570 let total_uncompressed_bytes = &mut self.total_uncompressed_bytes;
571 let total_rows_written = &mut self.total_rows_written;
572 let column_chunks = &mut self.column_chunks;
573 let column_indexes = &mut self.column_indexes;
574 let offset_indexes = &mut self.offset_indexes;
575 let bloom_filters = &mut self.bloom_filters;
576
577 let on_close = |r: ColumnCloseResult| {
578 *total_bytes_written += r.bytes_written;
580 *total_uncompressed_bytes += r.metadata.uncompressed_size();
581 column_chunks.push(r.metadata);
582 bloom_filters.push(r.bloom_filter);
583 column_indexes.push(r.column_index);
584 offset_indexes.push(r.offset_index);
585
586 if let Some(rows) = *total_rows_written {
587 if rows != r.rows_written {
588 return Err(general_err!(
589 "Incorrect number of rows, expected {} != {} rows",
590 rows,
591 r.rows_written
592 ));
593 }
594 } else {
595 *total_rows_written = Some(r.rows_written);
596 }
597
598 Ok(())
599 };
600 (self.buf, Box::new(on_close))
601 }
602
603 pub(crate) fn next_column_with_factory<'b, F, C>(&'b mut self, factory: F) -> Result<Option<C>>
606 where
607 F: FnOnce(
608 ColumnDescPtr,
609 WriterPropertiesPtr,
610 Box<dyn PageWriter + 'b>,
611 OnCloseColumnChunk<'b>,
612 ) -> Result<C>,
613 {
614 self.assert_previous_writer_closed()?;
615
616 let encryptor_context = self.get_page_encryptor_context();
617
618 Ok(match self.next_column_desc() {
619 Some(column) => {
620 let props = self.props.clone();
621 let (buf, on_close) = self.get_on_close();
622
623 let page_writer = SerializedPageWriter::new(buf);
624 let page_writer =
625 Self::set_page_writer_encryptor(&column, encryptor_context, page_writer)?;
626
627 Some(factory(
628 column,
629 props,
630 Box::new(page_writer),
631 Box::new(on_close),
632 )?)
633 }
634 None => None,
635 })
636 }
637
638 pub fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>> {
642 self.next_column_with_factory(|descr, props, page_writer, on_close| {
643 let column_writer = get_column_writer(descr, props, page_writer);
644 Ok(SerializedColumnWriter::new(column_writer, Some(on_close)))
645 })
646 }
647
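    /// Splices an already-encoded column chunk into this row group by copying its bytes
    /// from `reader` and rewriting the page and dictionary offsets in its metadata.
    ///
    /// A rough sketch of the pattern exercised by `test_spliced_write` below, where the
    /// hypothetical `encoded` buffer and `close_result` come from closing a column
    /// writer that targeted a separate in-memory buffer:
    ///
    /// ```ignore
    /// let bytes = Bytes::from(encoded);
    /// row_group_writer.append_column(&bytes, close_result)?;
    /// ```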
648 pub fn append_column<R: ChunkReader>(
655 &mut self,
656 reader: &R,
657 mut close: ColumnCloseResult,
658 ) -> Result<()> {
659 self.assert_previous_writer_closed()?;
660 let desc = self
661 .next_column_desc()
662 .ok_or_else(|| general_err!("exhausted columns in SerializedRowGroupWriter"))?;
663
664 let metadata = close.metadata;
665
666 if metadata.column_descr() != desc.as_ref() {
667 return Err(general_err!(
668 "column descriptor mismatch, expected {:?} got {:?}",
669 desc,
670 metadata.column_descr()
671 ));
672 }
673
674 let src_dictionary_offset = metadata.dictionary_page_offset();
675 let src_data_offset = metadata.data_page_offset();
676 let src_offset = src_dictionary_offset.unwrap_or(src_data_offset);
677 let src_length = metadata.compressed_size();
678
679 let write_offset = self.buf.bytes_written();
680 let mut read = reader.get_read(src_offset as _)?.take(src_length as _);
681 let write_length = std::io::copy(&mut read, &mut self.buf)?;
682
        if src_length as u64 != write_length {
            return Err(general_err!(
                "Failed to splice column data, expected {src_length} got {write_length}"
            ));
        }
688
689 let map_offset = |x| x - src_offset + write_offset as i64;
690 let mut builder = ColumnChunkMetaData::builder(metadata.column_descr_ptr())
691 .set_compression(metadata.compression())
692 .set_encodings(metadata.encodings().clone())
693 .set_total_compressed_size(metadata.compressed_size())
694 .set_total_uncompressed_size(metadata.uncompressed_size())
695 .set_num_values(metadata.num_values())
696 .set_data_page_offset(map_offset(src_data_offset))
697 .set_dictionary_page_offset(src_dictionary_offset.map(map_offset))
698 .set_unencoded_byte_array_data_bytes(metadata.unencoded_byte_array_data_bytes());
699
700 if let Some(rep_hist) = metadata.repetition_level_histogram() {
701 builder = builder.set_repetition_level_histogram(Some(rep_hist.clone()))
702 }
703 if let Some(def_hist) = metadata.definition_level_histogram() {
704 builder = builder.set_definition_level_histogram(Some(def_hist.clone()))
705 }
706 if let Some(statistics) = metadata.statistics() {
707 builder = builder.set_statistics(statistics.clone())
708 }
709 if let Some(page_encoding_stats) = metadata.page_encoding_stats() {
710 builder = builder.set_page_encoding_stats(page_encoding_stats.clone())
711 }
712 builder = self.set_column_crypto_metadata(builder, &metadata);
713 close.metadata = builder.build()?;
714
715 if let Some(offsets) = close.offset_index.as_mut() {
716 for location in &mut offsets.page_locations {
717 location.offset = map_offset(location.offset)
718 }
719 }
720
721 let (_, on_close) = self.get_on_close();
722 on_close(close)
723 }
724
725 pub fn close(mut self) -> Result<RowGroupMetaDataPtr> {
727 if self.row_group_metadata.is_none() {
728 self.assert_previous_writer_closed()?;
729
730 let column_chunks = std::mem::take(&mut self.column_chunks);
731 let row_group_metadata = RowGroupMetaData::builder(self.descr.clone())
732 .set_column_metadata(column_chunks)
733 .set_total_byte_size(self.total_uncompressed_bytes)
734 .set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
735 .set_sorting_columns(self.props.sorting_columns().cloned())
736 .set_ordinal(self.row_group_index)
737 .set_file_offset(self.file_offset)
738 .build()?;
739
740 self.row_group_metadata = Some(Arc::new(row_group_metadata.clone()));
741
742 if let Some(on_close) = self.on_close.take() {
743 on_close(
744 self.buf,
745 row_group_metadata,
746 self.bloom_filters,
747 self.column_indexes,
748 self.offset_indexes,
749 )?
750 }
751 }
752
753 let metadata = self.row_group_metadata.as_ref().unwrap().clone();
754 Ok(metadata)
755 }
756
757 #[cfg(feature = "encryption")]
759 fn set_column_crypto_metadata(
760 &self,
761 builder: ColumnChunkMetaDataBuilder,
762 metadata: &ColumnChunkMetaData,
763 ) -> ColumnChunkMetaDataBuilder {
764 if let Some(file_encryptor) = self.file_encryptor.as_ref() {
765 builder.set_column_crypto_metadata(get_column_crypto_metadata(
766 file_encryptor.properties(),
767 &metadata.column_descr_ptr(),
768 ))
769 } else {
770 builder
771 }
772 }
773
774 #[cfg(feature = "encryption")]
776 fn get_page_encryptor_context(&self) -> PageEncryptorContext {
777 PageEncryptorContext {
778 file_encryptor: self.file_encryptor.clone(),
779 row_group_index: self.row_group_index as usize,
780 column_index: self.column_index,
781 }
782 }
783
784 #[cfg(feature = "encryption")]
786 fn set_page_writer_encryptor<'b>(
787 column: &ColumnDescPtr,
788 context: PageEncryptorContext,
789 page_writer: SerializedPageWriter<'b, W>,
790 ) -> Result<SerializedPageWriter<'b, W>> {
791 let page_encryptor = PageEncryptor::create_if_column_encrypted(
792 &context.file_encryptor,
793 context.row_group_index,
794 context.column_index,
795 &column.path().string(),
796 )?;
797
798 Ok(page_writer.with_page_encryptor(page_encryptor))
799 }
800
801 #[cfg(not(feature = "encryption"))]
803 fn set_column_crypto_metadata(
804 &self,
805 builder: ColumnChunkMetaDataBuilder,
806 _metadata: &ColumnChunkMetaData,
807 ) -> ColumnChunkMetaDataBuilder {
808 builder
809 }
810
811 #[cfg(not(feature = "encryption"))]
812 fn get_page_encryptor_context(&self) -> PageEncryptorContext {
813 PageEncryptorContext {}
814 }
815
816 #[cfg(not(feature = "encryption"))]
818 fn set_page_writer_encryptor<'b>(
819 _column: &ColumnDescPtr,
820 _context: PageEncryptorContext,
821 page_writer: SerializedPageWriter<'b, W>,
822 ) -> Result<SerializedPageWriter<'b, W>> {
823 Ok(page_writer)
824 }
825
826 #[inline]
827 fn assert_previous_writer_closed(&self) -> Result<()> {
828 if self.column_index != self.column_chunks.len() {
829 Err(general_err!("Previous column writer was not closed"))
830 } else {
831 Ok(())
832 }
833 }
834}
835
836#[cfg(feature = "encryption")]
838struct PageEncryptorContext {
839 file_encryptor: Option<Arc<FileEncryptor>>,
840 row_group_index: usize,
841 column_index: usize,
842}
843
844#[cfg(not(feature = "encryption"))]
845struct PageEncryptorContext {}
846
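/// Wraps an untyped [`ColumnWriter`] together with the callback to run when the column
/// is closed; use [`Self::typed`] to obtain a strongly typed writer.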
847pub struct SerializedColumnWriter<'a> {
849 inner: ColumnWriter<'a>,
850 on_close: Option<OnCloseColumnChunk<'a>>,
851}
852
853impl<'a> SerializedColumnWriter<'a> {
854 pub fn new(inner: ColumnWriter<'a>, on_close: Option<OnCloseColumnChunk<'a>>) -> Self {
857 Self { inner, on_close }
858 }
859
860 pub fn untyped(&mut self) -> &mut ColumnWriter<'a> {
862 &mut self.inner
863 }
864
865 pub fn typed<T: DataType>(&mut self) -> &mut ColumnWriterImpl<'a, T> {
867 get_typed_column_writer_mut(&mut self.inner)
868 }
869
870 pub fn close(mut self) -> Result<()> {
872 let r = self.inner.close()?;
873 if let Some(on_close) = self.on_close.take() {
874 on_close(r)?
875 }
876
877 Ok(())
878 }
879}
880
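/// A [`PageWriter`] that serializes each compressed page (thrift page header followed
/// by the page body) into a [`TrackedWrite`], optionally encrypting pages when the
/// `encryption` feature is enabled.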
881pub struct SerializedPageWriter<'a, W: Write> {
886 sink: &'a mut TrackedWrite<W>,
887 #[cfg(feature = "encryption")]
888 page_encryptor: Option<PageEncryptor>,
889}
890
891impl<'a, W: Write> SerializedPageWriter<'a, W> {
892 pub fn new(sink: &'a mut TrackedWrite<W>) -> Self {
894 Self {
895 sink,
896 #[cfg(feature = "encryption")]
897 page_encryptor: None,
898 }
899 }
900
901 #[inline]
904 fn serialize_page_header(&mut self, header: parquet::PageHeader) -> Result<usize> {
905 let start_pos = self.sink.bytes_written();
906 match self.page_encryptor_and_sink_mut() {
907 Some((page_encryptor, sink)) => {
908 page_encryptor.encrypt_page_header(&header, sink)?;
909 }
910 None => {
911 let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
912 header.write_to_out_protocol(&mut protocol)?;
913 }
914 }
915 Ok(self.sink.bytes_written() - start_pos)
916 }
917}
918
919#[cfg(feature = "encryption")]
920impl<'a, W: Write> SerializedPageWriter<'a, W> {
921 fn with_page_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
923 self.page_encryptor = page_encryptor;
924 self
925 }
926
927 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
928 self.page_encryptor.as_mut()
929 }
930
931 fn page_encryptor_and_sink_mut(
932 &mut self,
933 ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
934 self.page_encryptor.as_mut().map(|pe| (pe, &mut self.sink))
935 }
936}
937
938#[cfg(not(feature = "encryption"))]
939impl<'a, W: Write> SerializedPageWriter<'a, W> {
940 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
941 None
942 }
943
944 fn page_encryptor_and_sink_mut(
945 &mut self,
946 ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
947 None
948 }
949}
950
951impl<W: Write + Send> PageWriter for SerializedPageWriter<'_, W> {
952 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
953 let page = match self.page_encryptor_mut() {
954 Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
955 None => page,
956 };
957
958 let page_type = page.page_type();
959 let start_pos = self.sink.bytes_written() as u64;
960
961 let page_header = page.to_thrift_header();
962 let header_size = self.serialize_page_header(page_header)?;
963
964 self.sink.write_all(page.data())?;
965
966 let mut spec = PageWriteSpec::new();
967 spec.page_type = page_type;
968 spec.uncompressed_size = page.uncompressed_size() + header_size;
969 spec.compressed_size = page.compressed_size() + header_size;
970 spec.offset = start_pos;
971 spec.bytes_written = self.sink.bytes_written() as u64 - start_pos;
972 spec.num_values = page.num_values();
973
974 if let Some(page_encryptor) = self.page_encryptor_mut() {
975 if page.compressed_page().is_data_page() {
976 page_encryptor.increment_page();
977 }
978 }
979 Ok(spec)
980 }
981
982 fn close(&mut self) -> Result<()> {
983 self.sink.flush()?;
984 Ok(())
985 }
986}
987
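// Magic bytes written at the start of the file: the standard `PARQUET_MAGIC`, or
// `PARQUET_MAGIC_ENCR_FOOTER` when footer encryption is enabled.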
988#[cfg(feature = "encryption")]
991pub(crate) fn get_file_magic(
992 file_encryption_properties: Option<&FileEncryptionProperties>,
993) -> &'static [u8; 4] {
994 match file_encryption_properties.as_ref() {
995 Some(encryption_properties) if encryption_properties.encrypt_footer() => {
996 &PARQUET_MAGIC_ENCR_FOOTER
997 }
998 _ => &PARQUET_MAGIC,
999 }
1000}
1001
1002#[cfg(not(feature = "encryption"))]
1003pub(crate) fn get_file_magic() -> &'static [u8; 4] {
1004 &PARQUET_MAGIC
1005}
1006
1007#[cfg(test)]
1008mod tests {
1009 use super::*;
1010
1011 #[cfg(feature = "arrow")]
1012 use arrow_array::RecordBatchReader;
1013 use bytes::Bytes;
1014 use std::fs::File;
1015
1016 #[cfg(feature = "arrow")]
1017 use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
1018 #[cfg(feature = "arrow")]
1019 use crate::arrow::ArrowWriter;
1020 use crate::basic::{
1021 ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, Repetition, SortOrder, Type,
1022 };
1023 use crate::column::page::{Page, PageReader};
1024 use crate::column::reader::get_typed_column_reader;
1025 use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
1026 use crate::data_type::{BoolType, ByteArrayType, Int32Type};
1027 use crate::file::page_index::index::Index;
1028 use crate::file::properties::EnabledStatistics;
1029 use crate::file::serialized_reader::ReadOptionsBuilder;
1030 use crate::file::{
1031 properties::{ReaderProperties, WriterProperties, WriterVersion},
1032 reader::{FileReader, SerializedFileReader, SerializedPageReader},
1033 statistics::{from_thrift, to_thrift, Statistics},
1034 };
1035 use crate::format::SortingColumn;
1036 use crate::record::{Row, RowAccessor};
1037 use crate::schema::parser::parse_message_type;
1038 use crate::schema::types;
1039 use crate::schema::types::{ColumnDescriptor, ColumnPath};
1040 use crate::util::test_common::rand_gen::RandGen;
1041
1042 #[test]
1043 fn test_row_group_writer_error_not_all_columns_written() {
1044 let file = tempfile::tempfile().unwrap();
1045 let schema = Arc::new(
1046 types::Type::group_type_builder("schema")
1047 .with_fields(vec![Arc::new(
1048 types::Type::primitive_type_builder("col1", Type::INT32)
1049 .build()
1050 .unwrap(),
1051 )])
1052 .build()
1053 .unwrap(),
1054 );
1055 let props = Default::default();
1056 let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
1057 let row_group_writer = writer.next_row_group().unwrap();
1058 let res = row_group_writer.close();
1059 assert!(res.is_err());
1060 if let Err(err) = res {
1061 assert_eq!(
1062 format!("{err}"),
1063 "Parquet error: Column length mismatch: 1 != 0"
1064 );
1065 }
1066 }
1067
1068 #[test]
1069 fn test_row_group_writer_num_records_mismatch() {
1070 let file = tempfile::tempfile().unwrap();
1071 let schema = Arc::new(
1072 types::Type::group_type_builder("schema")
1073 .with_fields(vec![
1074 Arc::new(
1075 types::Type::primitive_type_builder("col1", Type::INT32)
1076 .with_repetition(Repetition::REQUIRED)
1077 .build()
1078 .unwrap(),
1079 ),
1080 Arc::new(
1081 types::Type::primitive_type_builder("col2", Type::INT32)
1082 .with_repetition(Repetition::REQUIRED)
1083 .build()
1084 .unwrap(),
1085 ),
1086 ])
1087 .build()
1088 .unwrap(),
1089 );
1090 let props = Default::default();
1091 let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
1092 let mut row_group_writer = writer.next_row_group().unwrap();
1093
1094 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
1095 col_writer
1096 .typed::<Int32Type>()
1097 .write_batch(&[1, 2, 3], None, None)
1098 .unwrap();
1099 col_writer.close().unwrap();
1100
1101 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
1102 col_writer
1103 .typed::<Int32Type>()
1104 .write_batch(&[1, 2], None, None)
1105 .unwrap();
1106
1107 let err = col_writer.close().unwrap_err();
1108 assert_eq!(
1109 err.to_string(),
1110 "Parquet error: Incorrect number of rows, expected 3 != 2 rows"
1111 );
1112 }
1113
1114 #[test]
1115 fn test_file_writer_empty_file() {
1116 let file = tempfile::tempfile().unwrap();
1117
1118 let schema = Arc::new(
1119 types::Type::group_type_builder("schema")
1120 .with_fields(vec![Arc::new(
1121 types::Type::primitive_type_builder("col1", Type::INT32)
1122 .build()
1123 .unwrap(),
1124 )])
1125 .build()
1126 .unwrap(),
1127 );
1128 let props = Default::default();
1129 let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1130 writer.close().unwrap();
1131
1132 let reader = SerializedFileReader::new(file).unwrap();
1133 assert_eq!(reader.get_row_iter(None).unwrap().count(), 0);
1134 }
1135
1136 #[test]
1137 fn test_file_writer_column_orders_populated() {
1138 let file = tempfile::tempfile().unwrap();
1139
1140 let schema = Arc::new(
1141 types::Type::group_type_builder("schema")
1142 .with_fields(vec![
1143 Arc::new(
1144 types::Type::primitive_type_builder("col1", Type::INT32)
1145 .build()
1146 .unwrap(),
1147 ),
1148 Arc::new(
1149 types::Type::primitive_type_builder("col2", Type::FIXED_LEN_BYTE_ARRAY)
1150 .with_converted_type(ConvertedType::INTERVAL)
1151 .with_length(12)
1152 .build()
1153 .unwrap(),
1154 ),
1155 Arc::new(
1156 types::Type::group_type_builder("nested")
1157 .with_repetition(Repetition::REQUIRED)
1158 .with_fields(vec![
1159 Arc::new(
1160 types::Type::primitive_type_builder(
1161 "col3",
1162 Type::FIXED_LEN_BYTE_ARRAY,
1163 )
1164 .with_logical_type(Some(LogicalType::Float16))
1165 .with_length(2)
1166 .build()
1167 .unwrap(),
1168 ),
1169 Arc::new(
1170 types::Type::primitive_type_builder("col4", Type::BYTE_ARRAY)
1171 .with_logical_type(Some(LogicalType::String))
1172 .build()
1173 .unwrap(),
1174 ),
1175 ])
1176 .build()
1177 .unwrap(),
1178 ),
1179 ])
1180 .build()
1181 .unwrap(),
1182 );
1183
1184 let props = Default::default();
1185 let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1186 writer.close().unwrap();
1187
1188 let reader = SerializedFileReader::new(file).unwrap();
1189
1190 let expected = vec![
1192 ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
1194 ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED),
1196 ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
1198 ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED),
1200 ];
1201 let actual = reader.metadata().file_metadata().column_orders();
1202
1203 assert!(actual.is_some());
1204 let actual = actual.unwrap();
1205 assert_eq!(*actual, expected);
1206 }
1207
1208 #[test]
1209 fn test_file_writer_with_metadata() {
1210 let file = tempfile::tempfile().unwrap();
1211
1212 let schema = Arc::new(
1213 types::Type::group_type_builder("schema")
1214 .with_fields(vec![Arc::new(
1215 types::Type::primitive_type_builder("col1", Type::INT32)
1216 .build()
1217 .unwrap(),
1218 )])
1219 .build()
1220 .unwrap(),
1221 );
1222 let props = Arc::new(
1223 WriterProperties::builder()
1224 .set_key_value_metadata(Some(vec![KeyValue::new(
1225 "key".to_string(),
1226 "value".to_string(),
1227 )]))
1228 .build(),
1229 );
1230 let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1231 writer.close().unwrap();
1232
1233 let reader = SerializedFileReader::new(file).unwrap();
1234 assert_eq!(
1235 reader
1236 .metadata()
1237 .file_metadata()
1238 .key_value_metadata()
1239 .to_owned()
1240 .unwrap()
1241 .len(),
1242 1
1243 );
1244 }
1245
1246 #[test]
1247 fn test_file_writer_v2_with_metadata() {
1248 let file = tempfile::tempfile().unwrap();
1249 let field_logical_type = Some(LogicalType::Integer {
1250 bit_width: 8,
1251 is_signed: false,
1252 });
1253 let field = Arc::new(
1254 types::Type::primitive_type_builder("col1", Type::INT32)
1255 .with_logical_type(field_logical_type.clone())
1256 .with_converted_type(field_logical_type.into())
1257 .build()
1258 .unwrap(),
1259 );
1260 let schema = Arc::new(
1261 types::Type::group_type_builder("schema")
1262 .with_fields(vec![field.clone()])
1263 .build()
1264 .unwrap(),
1265 );
1266 let props = Arc::new(
1267 WriterProperties::builder()
1268 .set_key_value_metadata(Some(vec![KeyValue::new(
1269 "key".to_string(),
1270 "value".to_string(),
1271 )]))
1272 .set_writer_version(WriterVersion::PARQUET_2_0)
1273 .build(),
1274 );
1275 let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1276 writer.close().unwrap();
1277
1278 let reader = SerializedFileReader::new(file).unwrap();
1279
1280 assert_eq!(
1281 reader
1282 .metadata()
1283 .file_metadata()
1284 .key_value_metadata()
1285 .to_owned()
1286 .unwrap()
1287 .len(),
1288 1
1289 );
1290
1291 let fields = reader.metadata().file_metadata().schema().get_fields();
1293 assert_eq!(fields.len(), 1);
1294 assert_eq!(fields[0], field);
1295 }
1296
1297 #[test]
1298 fn test_file_writer_with_sorting_columns_metadata() {
1299 let file = tempfile::tempfile().unwrap();
1300
1301 let schema = Arc::new(
1302 types::Type::group_type_builder("schema")
1303 .with_fields(vec![
1304 Arc::new(
1305 types::Type::primitive_type_builder("col1", Type::INT32)
1306 .build()
1307 .unwrap(),
1308 ),
1309 Arc::new(
1310 types::Type::primitive_type_builder("col2", Type::INT32)
1311 .build()
1312 .unwrap(),
1313 ),
1314 ])
1315 .build()
1316 .unwrap(),
1317 );
1318 let expected_result = Some(vec![SortingColumn {
1319 column_idx: 0,
1320 descending: false,
1321 nulls_first: true,
1322 }]);
1323 let props = Arc::new(
1324 WriterProperties::builder()
1325 .set_key_value_metadata(Some(vec![KeyValue::new(
1326 "key".to_string(),
1327 "value".to_string(),
1328 )]))
1329 .set_sorting_columns(expected_result.clone())
1330 .build(),
1331 );
1332 let mut writer =
1333 SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1334 let mut row_group_writer = writer.next_row_group().expect("get row group writer");
1335
1336 let col_writer = row_group_writer.next_column().unwrap().unwrap();
1337 col_writer.close().unwrap();
1338
1339 let col_writer = row_group_writer.next_column().unwrap().unwrap();
1340 col_writer.close().unwrap();
1341
1342 row_group_writer.close().unwrap();
1343 writer.close().unwrap();
1344
1345 let reader = SerializedFileReader::new(file).unwrap();
1346 let result: Vec<Option<&Vec<SortingColumn>>> = reader
1347 .metadata()
1348 .row_groups()
1349 .iter()
1350 .map(|f| f.sorting_columns())
1351 .collect();
1352 assert_eq!(expected_result.as_ref(), result[0]);
1354 }
1355
1356 #[test]
1357 fn test_file_writer_empty_row_groups() {
1358 let file = tempfile::tempfile().unwrap();
1359 test_file_roundtrip(file, vec![]);
1360 }
1361
1362 #[test]
1363 fn test_file_writer_single_row_group() {
1364 let file = tempfile::tempfile().unwrap();
1365 test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
1366 }
1367
1368 #[test]
1369 fn test_file_writer_multiple_row_groups() {
1370 let file = tempfile::tempfile().unwrap();
1371 test_file_roundtrip(
1372 file,
1373 vec![
1374 vec![1, 2, 3, 4, 5],
1375 vec![1, 2, 3],
1376 vec![1],
1377 vec![1, 2, 3, 4, 5, 6],
1378 ],
1379 );
1380 }
1381
1382 #[test]
1383 fn test_file_writer_multiple_large_row_groups() {
1384 let file = tempfile::tempfile().unwrap();
1385 test_file_roundtrip(
1386 file,
1387 vec![vec![123; 1024], vec![124; 1000], vec![125; 15], vec![]],
1388 );
1389 }
1390
1391 #[test]
1392 fn test_page_writer_data_pages() {
1393 let pages = vec![
1394 Page::DataPage {
1395 buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
1396 num_values: 10,
1397 encoding: Encoding::DELTA_BINARY_PACKED,
1398 def_level_encoding: Encoding::RLE,
1399 rep_level_encoding: Encoding::RLE,
1400 statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1401 },
1402 Page::DataPageV2 {
1403 buf: Bytes::from(vec![4; 128]),
1404 num_values: 10,
1405 encoding: Encoding::DELTA_BINARY_PACKED,
1406 num_nulls: 2,
1407 num_rows: 12,
1408 def_levels_byte_len: 24,
1409 rep_levels_byte_len: 32,
1410 is_compressed: false,
1411 statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1412 },
1413 ];
1414
1415 test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
1416 test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
1417 }
1418
1419 #[test]
1420 fn test_page_writer_dict_pages() {
1421 let pages = vec![
1422 Page::DictionaryPage {
1423 buf: Bytes::from(vec![1, 2, 3, 4, 5]),
1424 num_values: 5,
1425 encoding: Encoding::RLE_DICTIONARY,
1426 is_sorted: false,
1427 },
1428 Page::DataPage {
1429 buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
1430 num_values: 10,
1431 encoding: Encoding::DELTA_BINARY_PACKED,
1432 def_level_encoding: Encoding::RLE,
1433 rep_level_encoding: Encoding::RLE,
1434 statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1435 },
1436 Page::DataPageV2 {
1437 buf: Bytes::from(vec![4; 128]),
1438 num_values: 10,
1439 encoding: Encoding::DELTA_BINARY_PACKED,
1440 num_nulls: 2,
1441 num_rows: 12,
1442 def_levels_byte_len: 24,
1443 rep_levels_byte_len: 32,
1444 is_compressed: false,
1445 statistics: None,
1446 },
1447 ];
1448
1449 test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
1450 test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
1451 }
1452
1453 fn test_page_roundtrip(pages: &[Page], codec: Compression, physical_type: Type) {
1457 let mut compressed_pages = vec![];
1458 let mut total_num_values = 0i64;
1459 let codec_options = CodecOptionsBuilder::default()
1460 .set_backward_compatible_lz4(false)
1461 .build();
1462 let mut compressor = create_codec(codec, &codec_options).unwrap();
1463
1464 for page in pages {
1465 let uncompressed_len = page.buffer().len();
1466
1467 let compressed_page = match *page {
1468 Page::DataPage {
1469 ref buf,
1470 num_values,
1471 encoding,
1472 def_level_encoding,
1473 rep_level_encoding,
1474 ref statistics,
1475 } => {
1476 total_num_values += num_values as i64;
1477 let output_buf = compress_helper(compressor.as_mut(), buf);
1478
1479 Page::DataPage {
1480 buf: Bytes::from(output_buf),
1481 num_values,
1482 encoding,
1483 def_level_encoding,
1484 rep_level_encoding,
1485 statistics: from_thrift(physical_type, to_thrift(statistics.as_ref()))
1486 .unwrap(),
1487 }
1488 }
1489 Page::DataPageV2 {
1490 ref buf,
1491 num_values,
1492 encoding,
1493 num_nulls,
1494 num_rows,
1495 def_levels_byte_len,
1496 rep_levels_byte_len,
1497 ref statistics,
1498 ..
1499 } => {
1500 total_num_values += num_values as i64;
1501 let offset = (def_levels_byte_len + rep_levels_byte_len) as usize;
1502 let cmp_buf = compress_helper(compressor.as_mut(), &buf[offset..]);
1503 let mut output_buf = Vec::from(&buf[..offset]);
1504 output_buf.extend_from_slice(&cmp_buf[..]);
1505
1506 Page::DataPageV2 {
1507 buf: Bytes::from(output_buf),
1508 num_values,
1509 encoding,
1510 num_nulls,
1511 num_rows,
1512 def_levels_byte_len,
1513 rep_levels_byte_len,
1514 is_compressed: compressor.is_some(),
1515 statistics: from_thrift(physical_type, to_thrift(statistics.as_ref()))
1516 .unwrap(),
1517 }
1518 }
1519 Page::DictionaryPage {
1520 ref buf,
1521 num_values,
1522 encoding,
1523 is_sorted,
1524 } => {
1525 let output_buf = compress_helper(compressor.as_mut(), buf);
1526
1527 Page::DictionaryPage {
1528 buf: Bytes::from(output_buf),
1529 num_values,
1530 encoding,
1531 is_sorted,
1532 }
1533 }
1534 };
1535
1536 let compressed_page = CompressedPage::new(compressed_page, uncompressed_len);
1537 compressed_pages.push(compressed_page);
1538 }
1539
1540 let mut buffer: Vec<u8> = vec![];
1541 let mut result_pages: Vec<Page> = vec![];
1542 {
1543 let mut writer = TrackedWrite::new(&mut buffer);
1544 let mut page_writer = SerializedPageWriter::new(&mut writer);
1545
1546 for page in compressed_pages {
1547 page_writer.write_page(page).unwrap();
1548 }
1549 page_writer.close().unwrap();
1550 }
1551 {
1552 let reader = bytes::Bytes::from(buffer);
1553
1554 let t = types::Type::primitive_type_builder("t", physical_type)
1555 .build()
1556 .unwrap();
1557
1558 let desc = ColumnDescriptor::new(Arc::new(t), 0, 0, ColumnPath::new(vec![]));
1559 let meta = ColumnChunkMetaData::builder(Arc::new(desc))
1560 .set_compression(codec)
1561 .set_total_compressed_size(reader.len() as i64)
1562 .set_num_values(total_num_values)
1563 .build()
1564 .unwrap();
1565
1566 let props = ReaderProperties::builder()
1567 .set_backward_compatible_lz4(false)
1568 .build();
1569 let mut page_reader = SerializedPageReader::new_with_properties(
1570 Arc::new(reader),
1571 &meta,
1572 total_num_values as usize,
1573 None,
1574 Arc::new(props),
1575 )
1576 .unwrap();
1577
1578 while let Some(page) = page_reader.get_next_page().unwrap() {
1579 result_pages.push(page);
1580 }
1581 }
1582
1583 assert_eq!(result_pages.len(), pages.len());
1584 for i in 0..result_pages.len() {
1585 assert_page(&result_pages[i], &pages[i]);
1586 }
1587 }
1588
1589 fn compress_helper(compressor: Option<&mut Box<dyn Codec>>, data: &[u8]) -> Vec<u8> {
1591 let mut output_buf = vec![];
1592 if let Some(cmpr) = compressor {
1593 cmpr.compress(data, &mut output_buf).unwrap();
1594 } else {
1595 output_buf.extend_from_slice(data);
1596 }
1597 output_buf
1598 }
1599
1600 fn assert_page(left: &Page, right: &Page) {
1602 assert_eq!(left.page_type(), right.page_type());
1603 assert_eq!(&left.buffer(), &right.buffer());
1604 assert_eq!(left.num_values(), right.num_values());
1605 assert_eq!(left.encoding(), right.encoding());
1606 assert_eq!(to_thrift(left.statistics()), to_thrift(right.statistics()));
1607 }
1608
1609 fn test_roundtrip_i32<W, R>(
1611 file: W,
1612 data: Vec<Vec<i32>>,
1613 compression: Compression,
1614 ) -> crate::format::FileMetaData
1615 where
1616 W: Write + Send,
1617 R: ChunkReader + From<W> + 'static,
1618 {
1619 test_roundtrip::<W, R, Int32Type, _>(file, data, |r| r.get_int(0).unwrap(), compression)
1620 }
1621
1622 fn test_roundtrip<W, R, D, F>(
1625 mut file: W,
1626 data: Vec<Vec<D::T>>,
1627 value: F,
1628 compression: Compression,
1629 ) -> crate::format::FileMetaData
1630 where
1631 W: Write + Send,
1632 R: ChunkReader + From<W> + 'static,
1633 D: DataType,
1634 F: Fn(Row) -> D::T,
1635 {
1636 let schema = Arc::new(
1637 types::Type::group_type_builder("schema")
1638 .with_fields(vec![Arc::new(
1639 types::Type::primitive_type_builder("col1", D::get_physical_type())
1640 .with_repetition(Repetition::REQUIRED)
1641 .build()
1642 .unwrap(),
1643 )])
1644 .build()
1645 .unwrap(),
1646 );
1647 let props = Arc::new(
1648 WriterProperties::builder()
1649 .set_compression(compression)
1650 .build(),
1651 );
1652 let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
1653 let mut rows: i64 = 0;
1654
1655 for (idx, subset) in data.iter().enumerate() {
1656 let row_group_file_offset = file_writer.buf.bytes_written();
1657 let mut row_group_writer = file_writer.next_row_group().unwrap();
1658 if let Some(mut writer) = row_group_writer.next_column().unwrap() {
1659 rows += writer
1660 .typed::<D>()
1661 .write_batch(&subset[..], None, None)
1662 .unwrap() as i64;
1663 writer.close().unwrap();
1664 }
1665 let last_group = row_group_writer.close().unwrap();
1666 let flushed = file_writer.flushed_row_groups();
1667 assert_eq!(flushed.len(), idx + 1);
1668 assert_eq!(Some(idx as i16), last_group.ordinal());
1669 assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset());
1670 assert_eq!(&flushed[idx], last_group.as_ref());
1671 }
1672 let file_metadata = file_writer.close().unwrap();
1673
1674 let reader = SerializedFileReader::new(R::from(file)).unwrap();
1675 assert_eq!(reader.num_row_groups(), data.len());
1676 assert_eq!(
1677 reader.metadata().file_metadata().num_rows(),
1678 rows,
1679 "row count in metadata not equal to number of rows written"
1680 );
1681 for (i, item) in data.iter().enumerate().take(reader.num_row_groups()) {
1682 let row_group_reader = reader.get_row_group(i).unwrap();
1683 let iter = row_group_reader.get_row_iter(None).unwrap();
1684 let res: Vec<_> = iter.map(|row| row.unwrap()).map(&value).collect();
1685 let row_group_size = row_group_reader.metadata().total_byte_size();
1686 let uncompressed_size: i64 = row_group_reader
1687 .metadata()
1688 .columns()
1689 .iter()
1690 .map(|v| v.uncompressed_size())
1691 .sum();
1692 assert_eq!(row_group_size, uncompressed_size);
1693 assert_eq!(res, *item);
1694 }
1695 file_metadata
1696 }
1697
1698 fn test_file_roundtrip(file: File, data: Vec<Vec<i32>>) -> crate::format::FileMetaData {
1701 test_roundtrip_i32::<File, File>(file, data, Compression::UNCOMPRESSED)
1702 }
1703
1704 #[test]
1705 fn test_bytes_writer_empty_row_groups() {
1706 test_bytes_roundtrip(vec![], Compression::UNCOMPRESSED);
1707 }
1708
1709 #[test]
1710 fn test_bytes_writer_single_row_group() {
1711 test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::UNCOMPRESSED);
1712 }
1713
1714 #[test]
1715 fn test_bytes_writer_multiple_row_groups() {
1716 test_bytes_roundtrip(
1717 vec![
1718 vec![1, 2, 3, 4, 5],
1719 vec![1, 2, 3],
1720 vec![1],
1721 vec![1, 2, 3, 4, 5, 6],
1722 ],
1723 Compression::UNCOMPRESSED,
1724 );
1725 }
1726
1727 #[test]
1728 fn test_bytes_writer_single_row_group_compressed() {
1729 test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::SNAPPY);
1730 }
1731
1732 #[test]
1733 fn test_bytes_writer_multiple_row_groups_compressed() {
1734 test_bytes_roundtrip(
1735 vec![
1736 vec![1, 2, 3, 4, 5],
1737 vec![1, 2, 3],
1738 vec![1],
1739 vec![1, 2, 3, 4, 5, 6],
1740 ],
1741 Compression::SNAPPY,
1742 );
1743 }
1744
1745 fn test_bytes_roundtrip(data: Vec<Vec<i32>>, compression: Compression) {
1746 test_roundtrip_i32::<Vec<u8>, Bytes>(Vec::with_capacity(1024), data, compression);
1747 }
1748
1749 #[test]
1750 fn test_boolean_roundtrip() {
1751 let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
1752 test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
1753 Vec::with_capacity(1024),
1754 vec![my_bool_values],
1755 |r| r.get_bool(0).unwrap(),
1756 Compression::UNCOMPRESSED,
1757 );
1758 }
1759
1760 #[test]
1761 fn test_boolean_compressed_roundtrip() {
1762 let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
1763 test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
1764 Vec::with_capacity(1024),
1765 vec![my_bool_values],
1766 |r| r.get_bool(0).unwrap(),
1767 Compression::SNAPPY,
1768 );
1769 }
1770
1771 #[test]
1772 fn test_column_offset_index_file() {
1773 let file = tempfile::tempfile().unwrap();
1774 let file_metadata = test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
1775 file_metadata.row_groups.iter().for_each(|row_group| {
1776 row_group.columns.iter().for_each(|column_chunk| {
1777 assert_ne!(None, column_chunk.column_index_offset);
1778 assert_ne!(None, column_chunk.column_index_length);
1779
1780 assert_ne!(None, column_chunk.offset_index_offset);
1781 assert_ne!(None, column_chunk.offset_index_length);
1782 })
1783 });
1784 }
1785
1786 fn test_kv_metadata(initial_kv: Option<Vec<KeyValue>>, final_kv: Option<Vec<KeyValue>>) {
1787 let schema = Arc::new(
1788 types::Type::group_type_builder("schema")
1789 .with_fields(vec![Arc::new(
1790 types::Type::primitive_type_builder("col1", Type::INT32)
1791 .with_repetition(Repetition::REQUIRED)
1792 .build()
1793 .unwrap(),
1794 )])
1795 .build()
1796 .unwrap(),
1797 );
1798 let mut out = Vec::with_capacity(1024);
1799 let props = Arc::new(
1800 WriterProperties::builder()
1801 .set_key_value_metadata(initial_kv.clone())
1802 .build(),
1803 );
1804 let mut writer = SerializedFileWriter::new(&mut out, schema, props).unwrap();
1805 let mut row_group_writer = writer.next_row_group().unwrap();
1806 let column = row_group_writer.next_column().unwrap().unwrap();
1807 column.close().unwrap();
1808 row_group_writer.close().unwrap();
1809 if let Some(kvs) = &final_kv {
1810 for kv in kvs {
1811 writer.append_key_value_metadata(kv.clone())
1812 }
1813 }
1814 writer.close().unwrap();
1815
1816 let reader = SerializedFileReader::new(Bytes::from(out)).unwrap();
1817 let metadata = reader.metadata().file_metadata();
1818 let keys = metadata.key_value_metadata();
1819
1820 match (initial_kv, final_kv) {
1821 (Some(a), Some(b)) => {
1822 let keys = keys.unwrap();
1823 assert_eq!(keys.len(), a.len() + b.len());
1824 assert_eq!(&keys[..a.len()], a.as_slice());
1825 assert_eq!(&keys[a.len()..], b.as_slice());
1826 }
1827 (Some(v), None) => assert_eq!(keys.unwrap(), &v),
1828 (None, Some(v)) if !v.is_empty() => assert_eq!(keys.unwrap(), &v),
1829 _ => assert!(keys.is_none()),
1830 }
1831 }
1832
1833 #[test]
1834 fn test_append_metadata() {
1835 let kv1 = KeyValue::new("cupcakes".to_string(), "awesome".to_string());
1836 let kv2 = KeyValue::new("bingo".to_string(), "bongo".to_string());
1837
1838 test_kv_metadata(None, None);
1839 test_kv_metadata(Some(vec![kv1.clone()]), None);
1840 test_kv_metadata(None, Some(vec![kv2.clone()]));
1841 test_kv_metadata(Some(vec![kv1.clone()]), Some(vec![kv2.clone()]));
1842 test_kv_metadata(Some(vec![]), Some(vec![kv2]));
1843 test_kv_metadata(Some(vec![]), Some(vec![]));
1844 test_kv_metadata(Some(vec![kv1]), Some(vec![]));
1845 test_kv_metadata(None, Some(vec![]));
1846 }
1847
1848 #[test]
1849 fn test_backwards_compatible_statistics() {
1850 let message_type = "
1851 message test_schema {
1852 REQUIRED INT32 decimal1 (DECIMAL(8,2));
1853 REQUIRED INT32 i32 (INTEGER(32,true));
1854 REQUIRED INT32 u32 (INTEGER(32,false));
1855 }
1856 ";
1857
1858 let schema = Arc::new(parse_message_type(message_type).unwrap());
1859 let props = Default::default();
1860 let mut writer = SerializedFileWriter::new(vec![], schema, props).unwrap();
1861 let mut row_group_writer = writer.next_row_group().unwrap();
1862
1863 for _ in 0..3 {
1864 let mut writer = row_group_writer.next_column().unwrap().unwrap();
1865 writer
1866 .typed::<Int32Type>()
1867 .write_batch(&[1, 2, 3], None, None)
1868 .unwrap();
1869 writer.close().unwrap();
1870 }
1871 let metadata = row_group_writer.close().unwrap();
1872 writer.close().unwrap();
1873
1874 let thrift = metadata.to_thrift();
1875 let encoded_stats: Vec<_> = thrift
1876 .columns
1877 .into_iter()
1878 .map(|x| x.meta_data.unwrap().statistics.unwrap())
1879 .collect();
1880
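        // column 0: decimal1 (DECIMAL(8,2) backed by INT32, signed sort order)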
1881 let s = &encoded_stats[0];
1883 assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1884 assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1885 assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1886 assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1887
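        // column 1: i32 (signed integer)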
1888 let s = &encoded_stats[1];
1890 assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1891 assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1892 assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1893 assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1894
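        // column 2: u32 (unsigned sort order, so the legacy min/max fields are omitted)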
1895 let s = &encoded_stats[2];
1897 assert_eq!(s.min.as_deref(), None);
1898 assert_eq!(s.max.as_deref(), None);
1899 assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1900 assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1901 }
1902
1903 #[test]
1904 fn test_spliced_write() {
1905 let message_type = "
1906 message test_schema {
1907 REQUIRED INT32 i32 (INTEGER(32,true));
1908 REQUIRED INT32 u32 (INTEGER(32,false));
1909 }
1910 ";
1911 let schema = Arc::new(parse_message_type(message_type).unwrap());
1912 let props = Arc::new(WriterProperties::builder().build());
1913
1914 let mut file = Vec::with_capacity(1024);
1915 let mut file_writer = SerializedFileWriter::new(&mut file, schema, props.clone()).unwrap();
1916
1917 let columns = file_writer.descr.columns();
1918 let mut column_state: Vec<(_, Option<ColumnCloseResult>)> = columns
1919 .iter()
1920 .map(|_| (TrackedWrite::new(Vec::with_capacity(1024)), None))
1921 .collect();
1922
1923 let mut column_state_slice = column_state.as_mut_slice();
1924 let mut column_writers = Vec::with_capacity(columns.len());
1925 for c in columns {
1926 let ((buf, out), tail) = column_state_slice.split_first_mut().unwrap();
1927 column_state_slice = tail;
1928
1929 let page_writer = Box::new(SerializedPageWriter::new(buf));
1930 let col_writer = get_column_writer(c.clone(), props.clone(), page_writer);
1931 column_writers.push(SerializedColumnWriter::new(
1932 col_writer,
1933 Some(Box::new(|on_close| {
1934 *out = Some(on_close);
1935 Ok(())
1936 })),
1937 ));
1938 }
1939
1940 let column_data = [[1, 2, 3, 4], [7, 3, 7, 3]];
1941
1942 for (writer, batch) in column_writers.iter_mut().zip(column_data) {
1944 let writer = writer.typed::<Int32Type>();
1945 writer.write_batch(&batch, None, None).unwrap();
1946 }
1947
1948 for writer in column_writers {
1950 writer.close().unwrap()
1951 }
1952
1953 let mut row_group_writer = file_writer.next_row_group().unwrap();
1955 for (write, close) in column_state {
1956 let buf = Bytes::from(write.into_inner().unwrap());
1957 row_group_writer
1958 .append_column(&buf, close.unwrap())
1959 .unwrap();
1960 }
1961 row_group_writer.close().unwrap();
1962 file_writer.close().unwrap();
1963
1964 let file = Bytes::from(file);
1966 let test_read = |reader: SerializedFileReader<Bytes>| {
1967 let row_group = reader.get_row_group(0).unwrap();
1968
1969 let mut out = Vec::with_capacity(4);
1970 let c1 = row_group.get_column_reader(0).unwrap();
1971 let mut c1 = get_typed_column_reader::<Int32Type>(c1);
1972 c1.read_records(4, None, None, &mut out).unwrap();
1973 assert_eq!(out, column_data[0]);
1974
1975 out.clear();
1976
1977 let c2 = row_group.get_column_reader(1).unwrap();
1978 let mut c2 = get_typed_column_reader::<Int32Type>(c2);
1979 c2.read_records(4, None, None, &mut out).unwrap();
1980 assert_eq!(out, column_data[1]);
1981 };
1982
1983 let reader = SerializedFileReader::new(file.clone()).unwrap();
1984 test_read(reader);
1985
1986 let options = ReadOptionsBuilder::new().with_page_index().build();
1987 let reader = SerializedFileReader::new_with_options(file, options).unwrap();
1988 test_read(reader);
1989 }
1990
1991 #[test]
1992 fn test_disabled_statistics() {
1993 let message_type = "
1994 message test_schema {
1995 REQUIRED INT32 a;
1996 REQUIRED INT32 b;
1997 }
1998 ";
1999 let schema = Arc::new(parse_message_type(message_type).unwrap());
2000 let props = WriterProperties::builder()
2001 .set_statistics_enabled(EnabledStatistics::None)
2002 .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
            .set_offset_index_disabled(true)
            .build();
2005 let mut file = Vec::with_capacity(1024);
2006 let mut file_writer =
2007 SerializedFileWriter::new(&mut file, schema, Arc::new(props)).unwrap();
2008
2009 let mut row_group_writer = file_writer.next_row_group().unwrap();
2010 let mut a_writer = row_group_writer.next_column().unwrap().unwrap();
2011 let col_writer = a_writer.typed::<Int32Type>();
2012 col_writer.write_batch(&[1, 2, 3], None, None).unwrap();
2013 a_writer.close().unwrap();
2014
2015 let mut b_writer = row_group_writer.next_column().unwrap().unwrap();
2016 let col_writer = b_writer.typed::<Int32Type>();
2017 col_writer.write_batch(&[4, 5, 6], None, None).unwrap();
2018 b_writer.close().unwrap();
2019 row_group_writer.close().unwrap();
2020
2021 let metadata = file_writer.finish().unwrap();
2022 assert_eq!(metadata.row_groups.len(), 1);
2023 let row_group = &metadata.row_groups[0];
2024 assert_eq!(row_group.columns.len(), 2);
2025 assert!(row_group.columns[0].offset_index_offset.is_some());
2027 assert!(row_group.columns[0].column_index_offset.is_some());
2028 assert!(row_group.columns[1].offset_index_offset.is_some());
2030 assert!(row_group.columns[1].column_index_offset.is_none());
2031
        let err = file_writer.next_row_group().err().unwrap().to_string();
        assert_eq!(err, "Parquet error: SerializedFileWriter already finished");

        drop(file_writer);

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(Bytes::from(file), options).unwrap();

        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1); // one row group
        assert_eq!(offset_index[0].len(), 2); // two columns

        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1); // one row group
        assert_eq!(column_index[0].len(), 2); // two columns

        let a_idx = &column_index[0][0];
        assert!(matches!(a_idx, Index::INT32(_)), "{a_idx:?}");
        let b_idx = &column_index[0][1];
        assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
    }

    #[test]
    fn test_byte_array_size_statistics() {
        let message_type = "
            message test_schema {
                OPTIONAL BYTE_ARRAY a (UTF8);
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let data = ByteArrayType::gen_vec(32, 7);
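        // 10 logical values: 7 non-null (definition level 1) and 3 nulls (definition level 0)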
        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
        let unenc_size: i64 = data.iter().map(|x| x.len() as i64).sum();
        let file: File = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        );

        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<ByteArrayType>()
            .write_batch(&data, Some(&def_levels), None)
            .unwrap();
        col_writer.close().unwrap();
        row_group_writer.close().unwrap();
        let file_metadata = writer.close().unwrap();

        assert_eq!(file_metadata.row_groups.len(), 1);
        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());

        let check_def_hist = |def_hist: &[i64]| {
            assert_eq!(def_hist.len(), 2);
            assert_eq!(def_hist[0], 3);
            assert_eq!(def_hist[1], 7);
        };

        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
        let meta_data = file_metadata.row_groups[0].columns[0]
            .meta_data
            .as_ref()
            .unwrap();
        assert!(meta_data.size_statistics.is_some());
        let size_stats = meta_data.size_statistics.as_ref().unwrap();

        assert!(size_stats.repetition_level_histogram.is_none());
        assert!(size_stats.definition_level_histogram.is_some());
        assert!(size_stats.unencoded_byte_array_data_bytes.is_some());
        assert_eq!(
            unenc_size,
            size_stats.unencoded_byte_array_data_bytes.unwrap()
        );
        check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rfile_metadata = reader.metadata().file_metadata();
        assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
        assert_eq!(reader.num_row_groups(), 1);
        let rowgroup = reader.get_row_group(0).unwrap();
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.definition_level_histogram().is_some());
        assert!(column.repetition_level_histogram().is_none());
        assert!(column.unencoded_byte_array_data_bytes().is_some());
        check_def_hist(column.definition_level_histogram().unwrap().values());
        assert_eq!(
            unenc_size,
            column.unencoded_byte_array_data_bytes().unwrap()
        );

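        // The same size statistics should also be exposed through the column index and the
        // offset index read back from the file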
        assert!(reader.metadata().column_index().is_some());
        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1);
        assert_eq!(column_index[0].len(), 1);
        let col_idx = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
            assert_eq!(index.indexes.len(), 1);
            &index.indexes[0]
        } else {
            unreachable!()
        };

        assert!(col_idx.repetition_level_histogram().is_none());
        assert!(col_idx.definition_level_histogram().is_some());
        check_def_hist(col_idx.definition_level_histogram().unwrap().values());

        assert!(reader.metadata().offset_index().is_some());
        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1);
        assert_eq!(offset_index[0].len(), 1);
        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_some());
        let page_sizes = offset_index[0][0]
            .unencoded_byte_array_data_bytes
            .as_ref()
            .unwrap();
        assert_eq!(page_sizes.len(), 1);
        assert_eq!(page_sizes[0], unenc_size);
    }

    #[test]
    fn test_too_many_rowgroups() {
        let message_type = "
            message test_schema {
                REQUIRED BYTE_ARRAY a (UTF8);
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let file: File = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::None)
                .set_max_row_group_size(1)
                .build(),
        );
        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();

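        // Parquet does not support more than 32767 row groups per file, so the attempt at
        // i == 0x8000 is expected to fail with the error asserted below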
        for i in 0..0x8001 {
            match writer.next_row_group() {
                Ok(mut row_group_writer) => {
                    assert_ne!(i, 0x8000);
                    let col_writer = row_group_writer.next_column().unwrap().unwrap();
                    col_writer.close().unwrap();
                    row_group_writer.close().unwrap();
                }
                Err(e) => {
                    assert_eq!(i, 0x8000);
                    assert_eq!(
                        e.to_string(),
                        "Parquet error: Parquet does not support more than 32767 row groups per file (currently: 32768)"
                    );
                }
            }
        }
        writer.close().unwrap();
    }

    #[test]
    fn test_size_statistics_with_repetition_and_nulls() {
        let message_type = "
            message test_schema {
                OPTIONAL group i32_list (LIST) {
                    REPEATED group list {
                        OPTIONAL INT32 element;
                    }
                }
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
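
        // With this three-level list schema the maximum definition level is 3 and the maximum
        // repetition level is 1:
        //   def 0 = null list, def 1 = empty list, def 2 = null element, def 3 = non-null element
        //   rep 0 = first element of a row, rep 1 = subsequent element in the same list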
        let data = [1, 2, 4, 7, 8, 9, 10];
        let def_levels = [3, 3, 0, 3, 2, 1, 3, 3, 3, 3];
        let rep_levels = [0, 1, 0, 0, 1, 0, 0, 1, 1, 1];
        let file = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        );
        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&data, Some(&def_levels), Some(&rep_levels))
            .unwrap();
        col_writer.close().unwrap();
        row_group_writer.close().unwrap();
        let file_metadata = writer.close().unwrap();

        assert_eq!(file_metadata.row_groups.len(), 1);
        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());

        let check_def_hist = |def_hist: &[i64]| {
            assert_eq!(def_hist.len(), 4);
            assert_eq!(def_hist[0], 1);
            assert_eq!(def_hist[1], 1);
            assert_eq!(def_hist[2], 1);
            assert_eq!(def_hist[3], 7);
        };

        let check_rep_hist = |rep_hist: &[i64]| {
            assert_eq!(rep_hist.len(), 2);
            assert_eq!(rep_hist[0], 5);
            assert_eq!(rep_hist[1], 5);
        };

        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
        let meta_data = file_metadata.row_groups[0].columns[0]
            .meta_data
            .as_ref()
            .unwrap();
        assert!(meta_data.size_statistics.is_some());
        let size_stats = meta_data.size_statistics.as_ref().unwrap();
        assert!(size_stats.repetition_level_histogram.is_some());
        assert!(size_stats.definition_level_histogram.is_some());
        assert!(size_stats.unencoded_byte_array_data_bytes.is_none());
        check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());
        check_rep_hist(size_stats.repetition_level_histogram.as_ref().unwrap());

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rfile_metadata = reader.metadata().file_metadata();
        assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
        assert_eq!(reader.num_row_groups(), 1);
        let rowgroup = reader.get_row_group(0).unwrap();
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.definition_level_histogram().is_some());
        assert!(column.repetition_level_histogram().is_some());
        assert!(column.unencoded_byte_array_data_bytes().is_none());
        check_def_hist(column.definition_level_histogram().unwrap().values());
        check_rep_hist(column.repetition_level_histogram().unwrap().values());

        assert!(reader.metadata().column_index().is_some());
        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1);
        assert_eq!(column_index[0].len(), 1);
        let col_idx = if let Index::INT32(index) = &column_index[0][0] {
            assert_eq!(index.indexes.len(), 1);
            &index.indexes[0]
        } else {
            unreachable!()
        };

        check_def_hist(col_idx.definition_level_histogram().unwrap().values());
        check_rep_hist(col_idx.repetition_level_histogram().unwrap().values());

        assert!(reader.metadata().offset_index().is_some());
        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1);
        assert_eq!(offset_index[0].len(), 1);
        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_none());
    }

    #[test]
    #[cfg(feature = "arrow")]
    fn test_byte_stream_split_extended_roundtrip() {
        let path = format!(
            "{}/byte_stream_split_extended.gzip.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();

        let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .expect("parquet open")
            .build()
            .expect("parquet open");

        let file = tempfile::tempfile().unwrap();
        let props = WriterProperties::builder()
            .set_dictionary_enabled(false)
            .set_column_encoding(
                ColumnPath::from("float16_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("float_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("double_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("int32_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("int64_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("flba5_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("decimal_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .build();

        let mut parquet_writer = ArrowWriter::try_new(
            file.try_clone().expect("cannot open file"),
            parquet_reader.schema(),
            Some(props),
        )
        .expect("create arrow writer");

        for maybe_batch in parquet_reader {
            let batch = maybe_batch.expect("reading batch");
            parquet_writer.write(&batch).expect("writing data");
        }

        parquet_writer.close().expect("finalizing file");

        let reader = SerializedFileReader::new(file).expect("Failed to create reader");
        let filemeta = reader.metadata();

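        // The rewritten columns must report BYTE_STREAM_SPLIT among their encodings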
        let check_encoding = |x: usize, filemeta: &ParquetMetaData| {
            assert!(filemeta
                .row_group(0)
                .column(x)
                .encodings()
                .contains(&Encoding::BYTE_STREAM_SPLIT));
        };

        check_encoding(1, filemeta);
        check_encoding(3, filemeta);
        check_encoding(5, filemeta);
        check_encoding(7, filemeta);
        check_encoding(9, filemeta);
        check_encoding(11, filemeta);
        check_encoding(13, filemeta);

        let mut iter = reader
            .get_row_iter(None)
            .expect("Failed to create row iterator");

        let mut start = 0;
        let end = reader.metadata().file_metadata().num_rows();

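        // Every even/odd column pair in the file is expected to decode to identical values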
        let check_row = |row: Result<Row, ParquetError>| {
            assert!(row.is_ok());
            let r = row.unwrap();
            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
        };

        while start < end {
            match iter.next() {
                Some(row) => check_row(row),
                None => break,
            };
            start += 1;
        }
    }
}