use crate::bloom_filter::Sbbf;
use crate::format as parquet;
use crate::format::{ColumnIndex, OffsetIndex};
use crate::thrift::TSerializable;
use std::fmt::Debug;
use std::io::{BufWriter, IoSlice, Read};
use std::{io::Write, sync::Arc};
use thrift::protocol::TCompactOutputProtocol;

use crate::column::page_encryption::PageEncryptor;
use crate::column::writer::{get_typed_column_writer_mut, ColumnCloseResult, ColumnWriterImpl};
use crate::column::{
    page::{CompressedPage, PageWriteSpec, PageWriter},
    writer::{get_column_writer, ColumnWriter},
};
use crate::data_type::DataType;
#[cfg(feature = "encryption")]
use crate::encryption::encrypt::{
    get_column_crypto_metadata, FileEncryptionProperties, FileEncryptor,
};
use crate::errors::{ParquetError, Result};
use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
use crate::file::reader::ChunkReader;
#[cfg(feature = "encryption")]
use crate::file::PARQUET_MAGIC_ENCR_FOOTER;
use crate::file::{metadata::*, PARQUET_MAGIC};
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr};

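/// A [`Write`] wrapper that buffers writes through a [`BufWriter`] and tracks the
/// total number of bytes written, which the file and page writers below use to
/// record page, column chunk and bloom filter offsets.
///
/// A small usage sketch, writing to an in-memory `Vec<u8>` for illustration:
///
/// ```
/// use std::io::Write;
/// use parquet::file::writer::TrackedWrite;
///
/// let mut write = TrackedWrite::new(Vec::<u8>::new());
/// write.write_all(b"parquet").unwrap();
/// assert_eq!(write.bytes_written(), 7);
/// ```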
pub struct TrackedWrite<W: Write> {
    inner: BufWriter<W>,
    bytes_written: usize,
}

impl<W: Write> TrackedWrite<W> {
    pub fn new(inner: W) -> Self {
        let buf_write = BufWriter::new(inner);
        Self {
            inner: buf_write,
            bytes_written: 0,
        }
    }

    pub fn bytes_written(&self) -> usize {
        self.bytes_written
    }

    pub fn inner(&self) -> &W {
        self.inner.get_ref()
    }

    pub fn inner_mut(&mut self) -> &mut W {
        self.inner.get_mut()
    }

    pub fn into_inner(self) -> Result<W> {
        self.inner.into_inner().map_err(|err| {
            ParquetError::General(format!("failed to get inner writer: {:?}", err.to_string()))
        })
    }
}

impl<W: Write> Write for TrackedWrite<W> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let bytes = self.inner.write(buf)?;
        self.bytes_written += bytes;
        Ok(bytes)
    }

    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> std::io::Result<usize> {
        let bytes = self.inner.write_vectored(bufs)?;
        self.bytes_written += bytes;
        Ok(bytes)
    }

    fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
        self.inner.write_all(buf)?;
        self.bytes_written += buf.len();

        Ok(())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.inner.flush()
    }
}
117
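/// Callback invoked with the [`ColumnCloseResult`] when a column chunk writer is closed.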
pub type OnCloseColumnChunk<'a> = Box<dyn FnOnce(ColumnCloseResult) -> Result<()> + 'a>;
120
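/// Callback invoked when a row group writer is closed, receiving the underlying
/// [`TrackedWrite`], the row group metadata, and the per-column bloom filters,
/// column indexes and offset indexes.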
pub type OnCloseRowGroup<'a, W> = Box<
    dyn FnOnce(
            &'a mut TrackedWrite<W>,
            RowGroupMetaData,
            Vec<Option<Sbbf>>,
            Vec<Option<ColumnIndex>>,
            Vec<Option<OffsetIndex>>,
        ) -> Result<()>
        + 'a
        + Send,
>;
137
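/// A Parquet file writer that serializes row groups one at a time to the
/// underlying [`TrackedWrite`]. Row group metadata and page indexes are buffered
/// in memory and written as part of the footer by [`Self::close`] or
/// [`Self::finish`]; bloom filters are written either after each row group or
/// before the footer, depending on [`BloomFilterPosition`].
///
/// A minimal usage sketch (the schema, path and column values here are purely
/// illustrative):
///
/// ```no_run
/// use std::{fs::File, sync::Arc};
/// use parquet::data_type::Int32Type;
/// use parquet::file::properties::WriterProperties;
/// use parquet::file::writer::SerializedFileWriter;
/// use parquet::schema::parser::parse_message_type;
///
/// let schema = Arc::new(parse_message_type("message schema { REQUIRED INT32 id; }").unwrap());
/// let props = Arc::new(WriterProperties::builder().build());
/// let file = File::create("example.parquet").unwrap();
///
/// let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
/// let mut row_group = writer.next_row_group().unwrap();
/// while let Some(mut column) = row_group.next_column().unwrap() {
///     column.typed::<Int32Type>().write_batch(&[1, 2, 3], None, None).unwrap();
///     column.close().unwrap();
/// }
/// row_group.close().unwrap();
/// writer.close().unwrap();
/// ```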
pub struct SerializedFileWriter<W: Write> {
    buf: TrackedWrite<W>,
    schema: TypePtr,
    descr: SchemaDescPtr,
    props: WriterPropertiesPtr,
    row_groups: Vec<RowGroupMetaData>,
    bloom_filters: Vec<Vec<Option<Sbbf>>>,
    column_indexes: Vec<Vec<Option<ColumnIndex>>>,
    offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
    row_group_index: usize,
    kv_metadatas: Vec<KeyValue>,
    finished: bool,
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
166
167impl<W: Write> Debug for SerializedFileWriter<W> {
168 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169 f.debug_struct("SerializedFileWriter")
172 .field("descr", &self.descr)
173 .field("row_group_index", &self.row_group_index)
174 .field("kv_metadatas", &self.kv_metadatas)
175 .finish_non_exhaustive()
176 }
177}
178
179impl<W: Write + Send> SerializedFileWriter<W> {
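    /// Creates a new `SerializedFileWriter` for the given `schema` and
    /// `properties`, immediately writing the Parquet header magic to `buf`.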
180 pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
182 let mut buf = TrackedWrite::new(buf);
183
184 let schema_descriptor = SchemaDescriptor::new(schema.clone());
185
186 #[cfg(feature = "encryption")]
187 let file_encryptor = Self::get_file_encryptor(&properties, &schema_descriptor)?;
188
189 Self::start_file(&properties, &mut buf)?;
190 Ok(Self {
191 buf,
192 schema,
193 descr: Arc::new(schema_descriptor),
194 props: properties,
195 row_groups: vec![],
196 bloom_filters: vec![],
197 column_indexes: Vec::new(),
198 offset_indexes: Vec::new(),
199 row_group_index: 0,
200 kv_metadatas: Vec::new(),
201 finished: false,
202 #[cfg(feature = "encryption")]
203 file_encryptor,
204 })
205 }
206
207 #[cfg(feature = "encryption")]
208 fn get_file_encryptor(
209 properties: &WriterPropertiesPtr,
210 schema_descriptor: &SchemaDescriptor,
211 ) -> Result<Option<Arc<FileEncryptor>>> {
212 if let Some(file_encryption_properties) = &properties.file_encryption_properties {
213 file_encryption_properties.validate_encrypted_column_names(schema_descriptor)?;
214
215 Ok(Some(Arc::new(FileEncryptor::new(
216 file_encryption_properties.clone(),
217 )?)))
218 } else {
219 Ok(None)
220 }
221 }
222
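    /// Creates a writer for the next row group.
    ///
    /// Returns an error if the previous row group writer was not closed, if the
    /// file has already been finished, or if the row group ordinal no longer
    /// fits in an `i16` (the limit imposed by the Parquet format).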
223 pub fn next_row_group(&mut self) -> Result<SerializedRowGroupWriter<'_, W>> {
230 self.assert_previous_writer_closed()?;
231 let ordinal = self.row_group_index;
232
233 let ordinal: i16 = ordinal.try_into().map_err(|_| {
234 ParquetError::General(format!(
235 "Parquet does not support more than {} row groups per file (currently: {})",
236 i16::MAX,
237 ordinal
238 ))
239 })?;
240
241 self.row_group_index = self
242 .row_group_index
243 .checked_add(1)
244 .expect("SerializedFileWriter::row_group_index overflowed");
245
246 let bloom_filter_position = self.properties().bloom_filter_position();
247 let row_groups = &mut self.row_groups;
248 let row_bloom_filters = &mut self.bloom_filters;
249 let row_column_indexes = &mut self.column_indexes;
250 let row_offset_indexes = &mut self.offset_indexes;
251 let on_close = move |buf,
252 mut metadata,
253 row_group_bloom_filter,
254 row_group_column_index,
255 row_group_offset_index| {
256 row_bloom_filters.push(row_group_bloom_filter);
257 row_column_indexes.push(row_group_column_index);
258 row_offset_indexes.push(row_group_offset_index);
259 match bloom_filter_position {
261 BloomFilterPosition::AfterRowGroup => {
262 write_bloom_filters(buf, row_bloom_filters, &mut metadata)?
263 }
264 BloomFilterPosition::End => (),
265 };
266 row_groups.push(metadata);
267 Ok(())
268 };
269
270 let row_group_writer = SerializedRowGroupWriter::new(
271 self.descr.clone(),
272 self.props.clone(),
273 &mut self.buf,
274 ordinal,
275 Some(Box::new(on_close)),
276 );
277 #[cfg(feature = "encryption")]
278 let row_group_writer = row_group_writer.with_file_encryptor(self.file_encryptor.clone());
279
280 Ok(row_group_writer)
281 }
282
283 pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
285 &self.row_groups
286 }
287
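    /// Writes any remaining bloom filters and the file footer, returning the
    /// Thrift `FileMetaData` that was written.
    ///
    /// After this call the writer is finished and further row groups are
    /// rejected; unlike [`Self::close`], the writer itself is not consumed.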
288 pub fn finish(&mut self) -> Result<parquet::FileMetaData> {
294 self.assert_previous_writer_closed()?;
295 let metadata = self.write_metadata()?;
296 self.buf.flush()?;
297 Ok(metadata)
298 }
299
300 pub fn close(mut self) -> Result<parquet::FileMetaData> {
302 self.finish()
303 }
304
305 #[cfg(not(feature = "encryption"))]
307 fn start_file(_properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
308 buf.write_all(get_file_magic())?;
309 Ok(())
310 }
311
312 #[cfg(feature = "encryption")]
314 fn start_file(properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
315 let magic = get_file_magic(properties.file_encryption_properties.as_ref());
316
317 buf.write_all(magic)?;
318 Ok(())
319 }
320
321 fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
323 self.finished = true;
324
325 for row_group in &mut self.row_groups {
327 write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?;
328 }
329
330 let key_value_metadata = match self.props.key_value_metadata() {
331 Some(kv) => Some(kv.iter().chain(&self.kv_metadatas).cloned().collect()),
332 None if self.kv_metadatas.is_empty() => None,
333 None => Some(self.kv_metadatas.clone()),
334 };
335
336 let row_groups = self
337 .row_groups
338 .iter()
339 .map(|v| v.to_thrift())
340 .collect::<Vec<_>>();
341
342 let mut encoder = ThriftMetadataWriter::new(
343 &mut self.buf,
344 &self.schema,
345 &self.descr,
346 row_groups,
347 Some(self.props.created_by().to_string()),
348 self.props.writer_version().as_num(),
349 );
350
351 #[cfg(feature = "encryption")]
352 {
353 encoder = encoder.with_file_encryptor(self.file_encryptor.clone());
354 }
355
356 if let Some(key_value_metadata) = key_value_metadata {
357 encoder = encoder.with_key_value_metadata(key_value_metadata)
358 }
359 encoder = encoder.with_column_indexes(&self.column_indexes);
360 encoder = encoder.with_offset_indexes(&self.offset_indexes);
361 encoder.finish()
362 }
363
364 #[inline]
365 fn assert_previous_writer_closed(&self) -> Result<()> {
366 if self.finished {
367 return Err(general_err!("SerializedFileWriter already finished"));
368 }
369
370 if self.row_group_index != self.row_groups.len() {
371 Err(general_err!("Previous row group writer was not closed"))
372 } else {
373 Ok(())
374 }
375 }
376
377 pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
379 self.kv_metadatas.push(kv_metadata);
380 }
381
382 pub fn schema_descr(&self) -> &SchemaDescriptor {
384 &self.descr
385 }
386
387 pub fn properties(&self) -> &WriterPropertiesPtr {
389 &self.props
390 }
391
392 pub fn inner(&self) -> &W {
394 self.buf.inner()
395 }
396
397 pub fn inner_mut(&mut self) -> &mut W {
401 self.buf.inner_mut()
402 }
403
404 pub fn into_inner(mut self) -> Result<W> {
406 self.assert_previous_writer_closed()?;
407 let _ = self.write_metadata()?;
408
409 self.buf.into_inner()
410 }
411
412 pub fn bytes_written(&self) -> usize {
414 self.buf.bytes_written()
415 }
416
417 #[cfg(feature = "encryption")]
419 pub(crate) fn file_encryptor(&self) -> Option<Arc<FileEncryptor>> {
420 self.file_encryptor.clone()
421 }
422}
423
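/// Serializes any bloom filters belonging to `row_group` at the writer's current
/// position, taking them out of `bloom_filters` and recording each filter's
/// offset and length in the corresponding column chunk metadata.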
424fn write_bloom_filters<W: Write + Send>(
427 buf: &mut TrackedWrite<W>,
428 bloom_filters: &mut [Vec<Option<Sbbf>>],
429 row_group: &mut RowGroupMetaData,
430) -> Result<()> {
    let row_group_idx: u16 = row_group
        .ordinal()
        .expect("Missing row group ordinal")
        .try_into()
        .map_err(|_| {
            ParquetError::General(format!(
                "Negative row group ordinal: {}",
                row_group.ordinal().unwrap()
            ))
        })?;
445 let row_group_idx = row_group_idx as usize;
446 for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() {
447 if let Some(bloom_filter) = bloom_filters[row_group_idx][column_idx].take() {
448 let start_offset = buf.bytes_written();
449 bloom_filter.write(&mut *buf)?;
450 let end_offset = buf.bytes_written();
451 *column_chunk = column_chunk
453 .clone()
454 .into_builder()
455 .set_bloom_filter_offset(Some(start_offset as i64))
456 .set_bloom_filter_length(Some((end_offset - start_offset) as i32))
457 .build()?;
458 }
459 }
460 Ok(())
461}
462
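/// Writes the column chunks of a single row group. Columns are written one at a
/// time, in schema order, via [`Self::next_column`], or spliced in from
/// pre-encoded data via [`Self::append_column`]; the row group metadata is
/// produced when [`Self::close`] is called.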
463pub struct SerializedRowGroupWriter<'a, W: Write> {
475 descr: SchemaDescPtr,
476 props: WriterPropertiesPtr,
477 buf: &'a mut TrackedWrite<W>,
478 total_rows_written: Option<u64>,
479 total_bytes_written: u64,
480 total_uncompressed_bytes: i64,
481 column_index: usize,
482 row_group_metadata: Option<RowGroupMetaDataPtr>,
483 column_chunks: Vec<ColumnChunkMetaData>,
484 bloom_filters: Vec<Option<Sbbf>>,
485 column_indexes: Vec<Option<ColumnIndex>>,
486 offset_indexes: Vec<Option<OffsetIndex>>,
487 row_group_index: i16,
488 file_offset: i64,
489 on_close: Option<OnCloseRowGroup<'a, W>>,
490 #[cfg(feature = "encryption")]
491 file_encryptor: Option<Arc<FileEncryptor>>,
492}
493
494impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
495 pub fn new(
504 schema_descr: SchemaDescPtr,
505 properties: WriterPropertiesPtr,
506 buf: &'a mut TrackedWrite<W>,
507 row_group_index: i16,
508 on_close: Option<OnCloseRowGroup<'a, W>>,
509 ) -> Self {
510 let num_columns = schema_descr.num_columns();
511 let file_offset = buf.bytes_written() as i64;
512 Self {
513 buf,
514 row_group_index,
515 file_offset,
516 on_close,
517 total_rows_written: None,
518 descr: schema_descr,
519 props: properties,
520 column_index: 0,
521 row_group_metadata: None,
522 column_chunks: Vec::with_capacity(num_columns),
523 bloom_filters: Vec::with_capacity(num_columns),
524 column_indexes: Vec::with_capacity(num_columns),
525 offset_indexes: Vec::with_capacity(num_columns),
526 total_bytes_written: 0,
527 total_uncompressed_bytes: 0,
528 #[cfg(feature = "encryption")]
529 file_encryptor: None,
530 }
531 }
532
533 #[cfg(feature = "encryption")]
534 pub(crate) fn with_file_encryptor(
536 mut self,
537 file_encryptor: Option<Arc<FileEncryptor>>,
538 ) -> Self {
539 self.file_encryptor = file_encryptor;
540 self
541 }
542
543 fn next_column_desc(&mut self) -> Option<ColumnDescPtr> {
545 let ret = self.descr.columns().get(self.column_index)?.clone();
546 self.column_index += 1;
547 Some(ret)
548 }
549
550 fn get_on_close(&mut self) -> (&mut TrackedWrite<W>, OnCloseColumnChunk<'_>) {
552 let total_bytes_written = &mut self.total_bytes_written;
553 let total_uncompressed_bytes = &mut self.total_uncompressed_bytes;
554 let total_rows_written = &mut self.total_rows_written;
555 let column_chunks = &mut self.column_chunks;
556 let column_indexes = &mut self.column_indexes;
557 let offset_indexes = &mut self.offset_indexes;
558 let bloom_filters = &mut self.bloom_filters;
559
560 let on_close = |r: ColumnCloseResult| {
561 *total_bytes_written += r.bytes_written;
563 *total_uncompressed_bytes += r.metadata.uncompressed_size();
564 column_chunks.push(r.metadata);
565 bloom_filters.push(r.bloom_filter);
566 column_indexes.push(r.column_index);
567 offset_indexes.push(r.offset_index);
568
569 if let Some(rows) = *total_rows_written {
570 if rows != r.rows_written {
571 return Err(general_err!(
572 "Incorrect number of rows, expected {} != {} rows",
573 rows,
574 r.rows_written
575 ));
576 }
577 } else {
578 *total_rows_written = Some(r.rows_written);
579 }
580
581 Ok(())
582 };
583 (self.buf, Box::new(on_close))
584 }
585
586 pub(crate) fn next_column_with_factory<'b, F, C>(&'b mut self, factory: F) -> Result<Option<C>>
589 where
590 F: FnOnce(
591 ColumnDescPtr,
592 WriterPropertiesPtr,
593 Box<dyn PageWriter + 'b>,
594 OnCloseColumnChunk<'b>,
595 ) -> Result<C>,
596 {
597 self.assert_previous_writer_closed()?;
598
599 let encryptor_context = self.get_page_encryptor_context();
600
601 Ok(match self.next_column_desc() {
602 Some(column) => {
603 let props = self.props.clone();
604 let (buf, on_close) = self.get_on_close();
605
606 let page_writer = SerializedPageWriter::new(buf);
607 let page_writer =
608 Self::set_page_writer_encryptor(&column, encryptor_context, page_writer)?;
609
610 Some(factory(
611 column,
612 props,
613 Box::new(page_writer),
614 Box::new(on_close),
615 )?)
616 }
617 None => None,
618 })
619 }
620
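    /// Returns a [`SerializedColumnWriter`] for the next column of this row
    /// group, or `None` once all columns in the schema have been written.
    /// Each returned writer must be closed before requesting the next one.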
621 pub fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>> {
625 self.next_column_with_factory(|descr, props, page_writer, on_close| {
626 let column_writer = get_column_writer(descr, props, page_writer);
627 Ok(SerializedColumnWriter::new(column_writer, Some(on_close)))
628 })
629 }
630
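    /// Appends an already-encoded column chunk from `reader` to this row group,
    /// copying its compressed pages verbatim and rewriting the page and
    /// dictionary offsets in `close` to point at their new location. This allows
    /// splicing column chunks between files without re-encoding them.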
631 pub fn append_column<R: ChunkReader>(
638 &mut self,
639 reader: &R,
640 mut close: ColumnCloseResult,
641 ) -> Result<()> {
642 self.assert_previous_writer_closed()?;
643 let desc = self
644 .next_column_desc()
645 .ok_or_else(|| general_err!("exhausted columns in SerializedRowGroupWriter"))?;
646
647 let metadata = close.metadata;
648
649 if metadata.column_descr() != desc.as_ref() {
650 return Err(general_err!(
651 "column descriptor mismatch, expected {:?} got {:?}",
652 desc,
653 metadata.column_descr()
654 ));
655 }
656
657 let src_dictionary_offset = metadata.dictionary_page_offset();
658 let src_data_offset = metadata.data_page_offset();
659 let src_offset = src_dictionary_offset.unwrap_or(src_data_offset);
660 let src_length = metadata.compressed_size();
661
662 let write_offset = self.buf.bytes_written();
663 let mut read = reader.get_read(src_offset as _)?.take(src_length as _);
664 let write_length = std::io::copy(&mut read, &mut self.buf)?;
665
        if src_length as u64 != write_length {
            return Err(general_err!(
                "Failed to splice column data, expected {src_length} got {write_length}"
            ));
        }
671
672 let map_offset = |x| x - src_offset + write_offset as i64;
673 let mut builder = ColumnChunkMetaData::builder(metadata.column_descr_ptr())
674 .set_compression(metadata.compression())
675 .set_encodings(metadata.encodings().clone())
676 .set_total_compressed_size(metadata.compressed_size())
677 .set_total_uncompressed_size(metadata.uncompressed_size())
678 .set_num_values(metadata.num_values())
679 .set_data_page_offset(map_offset(src_data_offset))
680 .set_dictionary_page_offset(src_dictionary_offset.map(map_offset))
681 .set_unencoded_byte_array_data_bytes(metadata.unencoded_byte_array_data_bytes());
682
683 if let Some(rep_hist) = metadata.repetition_level_histogram() {
684 builder = builder.set_repetition_level_histogram(Some(rep_hist.clone()))
685 }
686 if let Some(def_hist) = metadata.definition_level_histogram() {
687 builder = builder.set_definition_level_histogram(Some(def_hist.clone()))
688 }
689 if let Some(statistics) = metadata.statistics() {
690 builder = builder.set_statistics(statistics.clone())
691 }
692 builder = self.set_column_crypto_metadata(builder, &metadata);
693 close.metadata = builder.build()?;
694
695 if let Some(offsets) = close.offset_index.as_mut() {
696 for location in &mut offsets.page_locations {
697 location.offset = map_offset(location.offset)
698 }
699 }
700
701 let (_, on_close) = self.get_on_close();
702 on_close(close)
703 }
704
705 pub fn close(mut self) -> Result<RowGroupMetaDataPtr> {
707 if self.row_group_metadata.is_none() {
708 self.assert_previous_writer_closed()?;
709
710 let column_chunks = std::mem::take(&mut self.column_chunks);
711 let row_group_metadata = RowGroupMetaData::builder(self.descr.clone())
712 .set_column_metadata(column_chunks)
713 .set_total_byte_size(self.total_uncompressed_bytes)
714 .set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
715 .set_sorting_columns(self.props.sorting_columns().cloned())
716 .set_ordinal(self.row_group_index)
717 .set_file_offset(self.file_offset)
718 .build()?;
719
720 self.row_group_metadata = Some(Arc::new(row_group_metadata.clone()));
721
722 if let Some(on_close) = self.on_close.take() {
723 on_close(
724 self.buf,
725 row_group_metadata,
726 self.bloom_filters,
727 self.column_indexes,
728 self.offset_indexes,
729 )?
730 }
731 }
732
733 let metadata = self.row_group_metadata.as_ref().unwrap().clone();
734 Ok(metadata)
735 }
736
737 #[cfg(feature = "encryption")]
739 fn set_column_crypto_metadata(
740 &self,
741 builder: ColumnChunkMetaDataBuilder,
742 metadata: &ColumnChunkMetaData,
743 ) -> ColumnChunkMetaDataBuilder {
744 if let Some(file_encryptor) = self.file_encryptor.as_ref() {
745 builder.set_column_crypto_metadata(get_column_crypto_metadata(
746 file_encryptor.properties(),
747 &metadata.column_descr_ptr(),
748 ))
749 } else {
750 builder
751 }
752 }
753
754 #[cfg(feature = "encryption")]
756 fn get_page_encryptor_context(&self) -> PageEncryptorContext {
757 PageEncryptorContext {
758 file_encryptor: self.file_encryptor.clone(),
759 row_group_index: self.row_group_index as usize,
760 column_index: self.column_index,
761 }
762 }
763
764 #[cfg(feature = "encryption")]
766 fn set_page_writer_encryptor<'b>(
767 column: &ColumnDescPtr,
768 context: PageEncryptorContext,
769 page_writer: SerializedPageWriter<'b, W>,
770 ) -> Result<SerializedPageWriter<'b, W>> {
771 let page_encryptor = PageEncryptor::create_if_column_encrypted(
772 &context.file_encryptor,
773 context.row_group_index,
774 context.column_index,
775 &column.path().string(),
776 )?;
777
778 Ok(page_writer.with_page_encryptor(page_encryptor))
779 }
780
781 #[cfg(not(feature = "encryption"))]
783 fn set_column_crypto_metadata(
784 &self,
785 builder: ColumnChunkMetaDataBuilder,
786 _metadata: &ColumnChunkMetaData,
787 ) -> ColumnChunkMetaDataBuilder {
788 builder
789 }
790
791 #[cfg(not(feature = "encryption"))]
792 fn get_page_encryptor_context(&self) -> PageEncryptorContext {
793 PageEncryptorContext {}
794 }
795
796 #[cfg(not(feature = "encryption"))]
798 fn set_page_writer_encryptor<'b>(
799 _column: &ColumnDescPtr,
800 _context: PageEncryptorContext,
801 page_writer: SerializedPageWriter<'b, W>,
802 ) -> Result<SerializedPageWriter<'b, W>> {
803 Ok(page_writer)
804 }
805
806 #[inline]
807 fn assert_previous_writer_closed(&self) -> Result<()> {
808 if self.column_index != self.column_chunks.len() {
809 Err(general_err!("Previous column writer was not closed"))
810 } else {
811 Ok(())
812 }
813 }
814}
815
816#[cfg(feature = "encryption")]
818struct PageEncryptorContext {
819 file_encryptor: Option<Arc<FileEncryptor>>,
820 row_group_index: usize,
821 column_index: usize,
822}
823
824#[cfg(not(feature = "encryption"))]
825struct PageEncryptorContext {}
826
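/// A wrapper around a [`ColumnWriter`] that, when closed, forwards the resulting
/// [`ColumnCloseResult`] to an optional [`OnCloseColumnChunk`] callback.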
827pub struct SerializedColumnWriter<'a> {
829 inner: ColumnWriter<'a>,
830 on_close: Option<OnCloseColumnChunk<'a>>,
831}
832
833impl<'a> SerializedColumnWriter<'a> {
834 pub fn new(inner: ColumnWriter<'a>, on_close: Option<OnCloseColumnChunk<'a>>) -> Self {
837 Self { inner, on_close }
838 }
839
840 pub fn untyped(&mut self) -> &mut ColumnWriter<'a> {
842 &mut self.inner
843 }
844
845 pub fn typed<T: DataType>(&mut self) -> &mut ColumnWriterImpl<'a, T> {
847 get_typed_column_writer_mut(&mut self.inner)
848 }
849
850 pub fn close(mut self) -> Result<()> {
852 let r = self.inner.close()?;
853 if let Some(on_close) = self.on_close.take() {
854 on_close(r)?
855 }
856
857 Ok(())
858 }
859}
860
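/// A [`PageWriter`] that serializes compressed pages and their Thrift headers to
/// a [`TrackedWrite`], optionally encrypting them when the `encryption` feature
/// is enabled.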
861pub struct SerializedPageWriter<'a, W: Write> {
866 sink: &'a mut TrackedWrite<W>,
867 #[cfg(feature = "encryption")]
868 page_encryptor: Option<PageEncryptor>,
869}
870
871impl<'a, W: Write> SerializedPageWriter<'a, W> {
872 pub fn new(sink: &'a mut TrackedWrite<W>) -> Self {
874 Self {
875 sink,
876 #[cfg(feature = "encryption")]
877 page_encryptor: None,
878 }
879 }
880
881 #[inline]
884 fn serialize_page_header(&mut self, header: parquet::PageHeader) -> Result<usize> {
885 let start_pos = self.sink.bytes_written();
886 match self.page_encryptor_and_sink_mut() {
887 Some((page_encryptor, sink)) => {
888 page_encryptor.encrypt_page_header(&header, sink)?;
889 }
890 None => {
891 let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
892 header.write_to_out_protocol(&mut protocol)?;
893 }
894 }
895 Ok(self.sink.bytes_written() - start_pos)
896 }
897}
898
899#[cfg(feature = "encryption")]
900impl<'a, W: Write> SerializedPageWriter<'a, W> {
901 fn with_page_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
903 self.page_encryptor = page_encryptor;
904 self
905 }
906
907 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
908 self.page_encryptor.as_mut()
909 }
910
911 fn page_encryptor_and_sink_mut(
912 &mut self,
913 ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
914 self.page_encryptor.as_mut().map(|pe| (pe, &mut self.sink))
915 }
916}
917
918#[cfg(not(feature = "encryption"))]
919impl<'a, W: Write> SerializedPageWriter<'a, W> {
920 fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
921 None
922 }
923
924 fn page_encryptor_and_sink_mut(
925 &mut self,
926 ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
927 None
928 }
929}
930
931impl<W: Write + Send> PageWriter for SerializedPageWriter<'_, W> {
932 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
933 let page = match self.page_encryptor_mut() {
934 Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
935 None => page,
936 };
937
938 let page_type = page.page_type();
939 let start_pos = self.sink.bytes_written() as u64;
940
941 let page_header = page.to_thrift_header();
942 let header_size = self.serialize_page_header(page_header)?;
943
944 self.sink.write_all(page.data())?;
945
946 let mut spec = PageWriteSpec::new();
947 spec.page_type = page_type;
948 spec.uncompressed_size = page.uncompressed_size() + header_size;
949 spec.compressed_size = page.compressed_size() + header_size;
950 spec.offset = start_pos;
951 spec.bytes_written = self.sink.bytes_written() as u64 - start_pos;
952 spec.num_values = page.num_values();
953
954 if let Some(page_encryptor) = self.page_encryptor_mut() {
955 if page.compressed_page().is_data_page() {
956 page_encryptor.increment_page();
957 }
958 }
959 Ok(spec)
960 }
961
962 fn close(&mut self) -> Result<()> {
963 self.sink.flush()?;
964 Ok(())
965 }
966}
967
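/// Returns the Parquet magic bytes to use for the file: `PARE1` when footer
/// encryption is enabled, `PAR1` otherwise.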
968#[cfg(feature = "encryption")]
971pub(crate) fn get_file_magic(
972 file_encryption_properties: Option<&FileEncryptionProperties>,
973) -> &'static [u8; 4] {
974 match file_encryption_properties.as_ref() {
975 Some(encryption_properties) if encryption_properties.encrypt_footer() => {
976 &PARQUET_MAGIC_ENCR_FOOTER
977 }
978 _ => &PARQUET_MAGIC,
979 }
980}
981
982#[cfg(not(feature = "encryption"))]
983pub(crate) fn get_file_magic() -> &'static [u8; 4] {
984 &PARQUET_MAGIC
985}
986
987#[cfg(test)]
988mod tests {
989 use super::*;
990
991 #[cfg(feature = "arrow")]
992 use arrow_array::RecordBatchReader;
993 use bytes::Bytes;
994 use std::fs::File;
995
996 #[cfg(feature = "arrow")]
997 use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
998 #[cfg(feature = "arrow")]
999 use crate::arrow::ArrowWriter;
1000 use crate::basic::{
1001 ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, Repetition, SortOrder, Type,
1002 };
1003 use crate::column::page::{Page, PageReader};
1004 use crate::column::reader::get_typed_column_reader;
1005 use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
1006 use crate::data_type::{BoolType, ByteArrayType, Int32Type};
1007 use crate::file::page_index::index::Index;
1008 use crate::file::properties::EnabledStatistics;
1009 use crate::file::serialized_reader::ReadOptionsBuilder;
1010 use crate::file::{
1011 properties::{ReaderProperties, WriterProperties, WriterVersion},
1012 reader::{FileReader, SerializedFileReader, SerializedPageReader},
1013 statistics::{from_thrift, to_thrift, Statistics},
1014 };
1015 use crate::format::SortingColumn;
1016 use crate::record::{Row, RowAccessor};
1017 use crate::schema::parser::parse_message_type;
1018 use crate::schema::types;
1019 use crate::schema::types::{ColumnDescriptor, ColumnPath};
1020 use crate::util::test_common::rand_gen::RandGen;
1021
1022 #[test]
1023 fn test_row_group_writer_error_not_all_columns_written() {
1024 let file = tempfile::tempfile().unwrap();
1025 let schema = Arc::new(
1026 types::Type::group_type_builder("schema")
1027 .with_fields(vec![Arc::new(
1028 types::Type::primitive_type_builder("col1", Type::INT32)
1029 .build()
1030 .unwrap(),
1031 )])
1032 .build()
1033 .unwrap(),
1034 );
1035 let props = Default::default();
1036 let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
1037 let row_group_writer = writer.next_row_group().unwrap();
1038 let res = row_group_writer.close();
1039 assert!(res.is_err());
1040 if let Err(err) = res {
1041 assert_eq!(
1042 format!("{err}"),
1043 "Parquet error: Column length mismatch: 1 != 0"
1044 );
1045 }
1046 }
1047
1048 #[test]
1049 fn test_row_group_writer_num_records_mismatch() {
1050 let file = tempfile::tempfile().unwrap();
1051 let schema = Arc::new(
1052 types::Type::group_type_builder("schema")
1053 .with_fields(vec![
1054 Arc::new(
1055 types::Type::primitive_type_builder("col1", Type::INT32)
1056 .with_repetition(Repetition::REQUIRED)
1057 .build()
1058 .unwrap(),
1059 ),
1060 Arc::new(
1061 types::Type::primitive_type_builder("col2", Type::INT32)
1062 .with_repetition(Repetition::REQUIRED)
1063 .build()
1064 .unwrap(),
1065 ),
1066 ])
1067 .build()
1068 .unwrap(),
1069 );
1070 let props = Default::default();
1071 let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
1072 let mut row_group_writer = writer.next_row_group().unwrap();
1073
1074 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
1075 col_writer
1076 .typed::<Int32Type>()
1077 .write_batch(&[1, 2, 3], None, None)
1078 .unwrap();
1079 col_writer.close().unwrap();
1080
1081 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
1082 col_writer
1083 .typed::<Int32Type>()
1084 .write_batch(&[1, 2], None, None)
1085 .unwrap();
1086
1087 let err = col_writer.close().unwrap_err();
1088 assert_eq!(
1089 err.to_string(),
1090 "Parquet error: Incorrect number of rows, expected 3 != 2 rows"
1091 );
1092 }
1093
1094 #[test]
1095 fn test_file_writer_empty_file() {
1096 let file = tempfile::tempfile().unwrap();
1097
1098 let schema = Arc::new(
1099 types::Type::group_type_builder("schema")
1100 .with_fields(vec![Arc::new(
1101 types::Type::primitive_type_builder("col1", Type::INT32)
1102 .build()
1103 .unwrap(),
1104 )])
1105 .build()
1106 .unwrap(),
1107 );
1108 let props = Default::default();
1109 let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1110 writer.close().unwrap();
1111
1112 let reader = SerializedFileReader::new(file).unwrap();
1113 assert_eq!(reader.get_row_iter(None).unwrap().count(), 0);
1114 }
1115
1116 #[test]
1117 fn test_file_writer_column_orders_populated() {
1118 let file = tempfile::tempfile().unwrap();
1119
1120 let schema = Arc::new(
1121 types::Type::group_type_builder("schema")
1122 .with_fields(vec![
1123 Arc::new(
1124 types::Type::primitive_type_builder("col1", Type::INT32)
1125 .build()
1126 .unwrap(),
1127 ),
1128 Arc::new(
1129 types::Type::primitive_type_builder("col2", Type::FIXED_LEN_BYTE_ARRAY)
1130 .with_converted_type(ConvertedType::INTERVAL)
1131 .with_length(12)
1132 .build()
1133 .unwrap(),
1134 ),
1135 Arc::new(
1136 types::Type::group_type_builder("nested")
1137 .with_repetition(Repetition::REQUIRED)
1138 .with_fields(vec![
1139 Arc::new(
1140 types::Type::primitive_type_builder(
1141 "col3",
1142 Type::FIXED_LEN_BYTE_ARRAY,
1143 )
1144 .with_logical_type(Some(LogicalType::Float16))
1145 .with_length(2)
1146 .build()
1147 .unwrap(),
1148 ),
1149 Arc::new(
1150 types::Type::primitive_type_builder("col4", Type::BYTE_ARRAY)
1151 .with_logical_type(Some(LogicalType::String))
1152 .build()
1153 .unwrap(),
1154 ),
1155 ])
1156 .build()
1157 .unwrap(),
1158 ),
1159 ])
1160 .build()
1161 .unwrap(),
1162 );
1163
1164 let props = Default::default();
1165 let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1166 writer.close().unwrap();
1167
1168 let reader = SerializedFileReader::new(file).unwrap();
1169
1170 let expected = vec![
1172 ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
1174 ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED),
1176 ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
1178 ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED),
1180 ];
1181 let actual = reader.metadata().file_metadata().column_orders();
1182
1183 assert!(actual.is_some());
1184 let actual = actual.unwrap();
1185 assert_eq!(*actual, expected);
1186 }
1187
1188 #[test]
1189 fn test_file_writer_with_metadata() {
1190 let file = tempfile::tempfile().unwrap();
1191
1192 let schema = Arc::new(
1193 types::Type::group_type_builder("schema")
1194 .with_fields(vec![Arc::new(
1195 types::Type::primitive_type_builder("col1", Type::INT32)
1196 .build()
1197 .unwrap(),
1198 )])
1199 .build()
1200 .unwrap(),
1201 );
1202 let props = Arc::new(
1203 WriterProperties::builder()
1204 .set_key_value_metadata(Some(vec![KeyValue::new(
1205 "key".to_string(),
1206 "value".to_string(),
1207 )]))
1208 .build(),
1209 );
1210 let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1211 writer.close().unwrap();
1212
1213 let reader = SerializedFileReader::new(file).unwrap();
1214 assert_eq!(
1215 reader
1216 .metadata()
1217 .file_metadata()
1218 .key_value_metadata()
1219 .to_owned()
1220 .unwrap()
1221 .len(),
1222 1
1223 );
1224 }
1225
1226 #[test]
1227 fn test_file_writer_v2_with_metadata() {
1228 let file = tempfile::tempfile().unwrap();
1229 let field_logical_type = Some(LogicalType::Integer {
1230 bit_width: 8,
1231 is_signed: false,
1232 });
1233 let field = Arc::new(
1234 types::Type::primitive_type_builder("col1", Type::INT32)
1235 .with_logical_type(field_logical_type.clone())
1236 .with_converted_type(field_logical_type.into())
1237 .build()
1238 .unwrap(),
1239 );
1240 let schema = Arc::new(
1241 types::Type::group_type_builder("schema")
1242 .with_fields(vec![field.clone()])
1243 .build()
1244 .unwrap(),
1245 );
1246 let props = Arc::new(
1247 WriterProperties::builder()
1248 .set_key_value_metadata(Some(vec![KeyValue::new(
1249 "key".to_string(),
1250 "value".to_string(),
1251 )]))
1252 .set_writer_version(WriterVersion::PARQUET_2_0)
1253 .build(),
1254 );
1255 let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1256 writer.close().unwrap();
1257
1258 let reader = SerializedFileReader::new(file).unwrap();
1259
1260 assert_eq!(
1261 reader
1262 .metadata()
1263 .file_metadata()
1264 .key_value_metadata()
1265 .to_owned()
1266 .unwrap()
1267 .len(),
1268 1
1269 );
1270
1271 let fields = reader.metadata().file_metadata().schema().get_fields();
1273 assert_eq!(fields.len(), 1);
1274 assert_eq!(fields[0], field);
1275 }
1276
1277 #[test]
1278 fn test_file_writer_with_sorting_columns_metadata() {
1279 let file = tempfile::tempfile().unwrap();
1280
1281 let schema = Arc::new(
1282 types::Type::group_type_builder("schema")
1283 .with_fields(vec![
1284 Arc::new(
1285 types::Type::primitive_type_builder("col1", Type::INT32)
1286 .build()
1287 .unwrap(),
1288 ),
1289 Arc::new(
1290 types::Type::primitive_type_builder("col2", Type::INT32)
1291 .build()
1292 .unwrap(),
1293 ),
1294 ])
1295 .build()
1296 .unwrap(),
1297 );
1298 let expected_result = Some(vec![SortingColumn {
1299 column_idx: 0,
1300 descending: false,
1301 nulls_first: true,
1302 }]);
1303 let props = Arc::new(
1304 WriterProperties::builder()
1305 .set_key_value_metadata(Some(vec![KeyValue::new(
1306 "key".to_string(),
1307 "value".to_string(),
1308 )]))
1309 .set_sorting_columns(expected_result.clone())
1310 .build(),
1311 );
1312 let mut writer =
1313 SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1314 let mut row_group_writer = writer.next_row_group().expect("get row group writer");
1315
1316 let col_writer = row_group_writer.next_column().unwrap().unwrap();
1317 col_writer.close().unwrap();
1318
1319 let col_writer = row_group_writer.next_column().unwrap().unwrap();
1320 col_writer.close().unwrap();
1321
1322 row_group_writer.close().unwrap();
1323 writer.close().unwrap();
1324
1325 let reader = SerializedFileReader::new(file).unwrap();
1326 let result: Vec<Option<&Vec<SortingColumn>>> = reader
1327 .metadata()
1328 .row_groups()
1329 .iter()
1330 .map(|f| f.sorting_columns())
1331 .collect();
1332 assert_eq!(expected_result.as_ref(), result[0]);
1334 }
1335
1336 #[test]
1337 fn test_file_writer_empty_row_groups() {
1338 let file = tempfile::tempfile().unwrap();
1339 test_file_roundtrip(file, vec![]);
1340 }
1341
1342 #[test]
1343 fn test_file_writer_single_row_group() {
1344 let file = tempfile::tempfile().unwrap();
1345 test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
1346 }
1347
1348 #[test]
1349 fn test_file_writer_multiple_row_groups() {
1350 let file = tempfile::tempfile().unwrap();
1351 test_file_roundtrip(
1352 file,
1353 vec![
1354 vec![1, 2, 3, 4, 5],
1355 vec![1, 2, 3],
1356 vec![1],
1357 vec![1, 2, 3, 4, 5, 6],
1358 ],
1359 );
1360 }
1361
1362 #[test]
1363 fn test_file_writer_multiple_large_row_groups() {
1364 let file = tempfile::tempfile().unwrap();
1365 test_file_roundtrip(
1366 file,
1367 vec![vec![123; 1024], vec![124; 1000], vec![125; 15], vec![]],
1368 );
1369 }
1370
1371 #[test]
1372 fn test_page_writer_data_pages() {
1373 let pages = vec![
1374 Page::DataPage {
1375 buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
1376 num_values: 10,
1377 encoding: Encoding::DELTA_BINARY_PACKED,
1378 def_level_encoding: Encoding::RLE,
1379 rep_level_encoding: Encoding::RLE,
1380 statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1381 },
1382 Page::DataPageV2 {
1383 buf: Bytes::from(vec![4; 128]),
1384 num_values: 10,
1385 encoding: Encoding::DELTA_BINARY_PACKED,
1386 num_nulls: 2,
1387 num_rows: 12,
1388 def_levels_byte_len: 24,
1389 rep_levels_byte_len: 32,
1390 is_compressed: false,
1391 statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1392 },
1393 ];
1394
1395 test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
1396 test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
1397 }
1398
1399 #[test]
1400 fn test_page_writer_dict_pages() {
1401 let pages = vec![
1402 Page::DictionaryPage {
1403 buf: Bytes::from(vec![1, 2, 3, 4, 5]),
1404 num_values: 5,
1405 encoding: Encoding::RLE_DICTIONARY,
1406 is_sorted: false,
1407 },
1408 Page::DataPage {
1409 buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
1410 num_values: 10,
1411 encoding: Encoding::DELTA_BINARY_PACKED,
1412 def_level_encoding: Encoding::RLE,
1413 rep_level_encoding: Encoding::RLE,
1414 statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1415 },
1416 Page::DataPageV2 {
1417 buf: Bytes::from(vec![4; 128]),
1418 num_values: 10,
1419 encoding: Encoding::DELTA_BINARY_PACKED,
1420 num_nulls: 2,
1421 num_rows: 12,
1422 def_levels_byte_len: 24,
1423 rep_levels_byte_len: 32,
1424 is_compressed: false,
1425 statistics: None,
1426 },
1427 ];
1428
1429 test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
1430 test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
1431 }
1432
1433 fn test_page_roundtrip(pages: &[Page], codec: Compression, physical_type: Type) {
1437 let mut compressed_pages = vec![];
1438 let mut total_num_values = 0i64;
1439 let codec_options = CodecOptionsBuilder::default()
1440 .set_backward_compatible_lz4(false)
1441 .build();
1442 let mut compressor = create_codec(codec, &codec_options).unwrap();
1443
1444 for page in pages {
1445 let uncompressed_len = page.buffer().len();
1446
1447 let compressed_page = match *page {
1448 Page::DataPage {
1449 ref buf,
1450 num_values,
1451 encoding,
1452 def_level_encoding,
1453 rep_level_encoding,
1454 ref statistics,
1455 } => {
1456 total_num_values += num_values as i64;
1457 let output_buf = compress_helper(compressor.as_mut(), buf);
1458
1459 Page::DataPage {
1460 buf: Bytes::from(output_buf),
1461 num_values,
1462 encoding,
1463 def_level_encoding,
1464 rep_level_encoding,
1465 statistics: from_thrift(physical_type, to_thrift(statistics.as_ref()))
1466 .unwrap(),
1467 }
1468 }
1469 Page::DataPageV2 {
1470 ref buf,
1471 num_values,
1472 encoding,
1473 num_nulls,
1474 num_rows,
1475 def_levels_byte_len,
1476 rep_levels_byte_len,
1477 ref statistics,
1478 ..
1479 } => {
1480 total_num_values += num_values as i64;
1481 let offset = (def_levels_byte_len + rep_levels_byte_len) as usize;
1482 let cmp_buf = compress_helper(compressor.as_mut(), &buf[offset..]);
1483 let mut output_buf = Vec::from(&buf[..offset]);
1484 output_buf.extend_from_slice(&cmp_buf[..]);
1485
1486 Page::DataPageV2 {
1487 buf: Bytes::from(output_buf),
1488 num_values,
1489 encoding,
1490 num_nulls,
1491 num_rows,
1492 def_levels_byte_len,
1493 rep_levels_byte_len,
1494 is_compressed: compressor.is_some(),
1495 statistics: from_thrift(physical_type, to_thrift(statistics.as_ref()))
1496 .unwrap(),
1497 }
1498 }
1499 Page::DictionaryPage {
1500 ref buf,
1501 num_values,
1502 encoding,
1503 is_sorted,
1504 } => {
1505 let output_buf = compress_helper(compressor.as_mut(), buf);
1506
1507 Page::DictionaryPage {
1508 buf: Bytes::from(output_buf),
1509 num_values,
1510 encoding,
1511 is_sorted,
1512 }
1513 }
1514 };
1515
1516 let compressed_page = CompressedPage::new(compressed_page, uncompressed_len);
1517 compressed_pages.push(compressed_page);
1518 }
1519
1520 let mut buffer: Vec<u8> = vec![];
1521 let mut result_pages: Vec<Page> = vec![];
1522 {
1523 let mut writer = TrackedWrite::new(&mut buffer);
1524 let mut page_writer = SerializedPageWriter::new(&mut writer);
1525
1526 for page in compressed_pages {
1527 page_writer.write_page(page).unwrap();
1528 }
1529 page_writer.close().unwrap();
1530 }
1531 {
1532 let reader = bytes::Bytes::from(buffer);
1533
1534 let t = types::Type::primitive_type_builder("t", physical_type)
1535 .build()
1536 .unwrap();
1537
1538 let desc = ColumnDescriptor::new(Arc::new(t), 0, 0, ColumnPath::new(vec![]));
1539 let meta = ColumnChunkMetaData::builder(Arc::new(desc))
1540 .set_compression(codec)
1541 .set_total_compressed_size(reader.len() as i64)
1542 .set_num_values(total_num_values)
1543 .build()
1544 .unwrap();
1545
1546 let props = ReaderProperties::builder()
1547 .set_backward_compatible_lz4(false)
1548 .build();
1549 let mut page_reader = SerializedPageReader::new_with_properties(
1550 Arc::new(reader),
1551 &meta,
1552 total_num_values as usize,
1553 None,
1554 Arc::new(props),
1555 )
1556 .unwrap();
1557
1558 while let Some(page) = page_reader.get_next_page().unwrap() {
1559 result_pages.push(page);
1560 }
1561 }
1562
1563 assert_eq!(result_pages.len(), pages.len());
1564 for i in 0..result_pages.len() {
1565 assert_page(&result_pages[i], &pages[i]);
1566 }
1567 }
1568
1569 fn compress_helper(compressor: Option<&mut Box<dyn Codec>>, data: &[u8]) -> Vec<u8> {
1571 let mut output_buf = vec![];
1572 if let Some(cmpr) = compressor {
1573 cmpr.compress(data, &mut output_buf).unwrap();
1574 } else {
1575 output_buf.extend_from_slice(data);
1576 }
1577 output_buf
1578 }
1579
1580 fn assert_page(left: &Page, right: &Page) {
1582 assert_eq!(left.page_type(), right.page_type());
1583 assert_eq!(&left.buffer(), &right.buffer());
1584 assert_eq!(left.num_values(), right.num_values());
1585 assert_eq!(left.encoding(), right.encoding());
1586 assert_eq!(to_thrift(left.statistics()), to_thrift(right.statistics()));
1587 }
1588
1589 fn test_roundtrip_i32<W, R>(
1591 file: W,
1592 data: Vec<Vec<i32>>,
1593 compression: Compression,
1594 ) -> crate::format::FileMetaData
1595 where
1596 W: Write + Send,
1597 R: ChunkReader + From<W> + 'static,
1598 {
1599 test_roundtrip::<W, R, Int32Type, _>(file, data, |r| r.get_int(0).unwrap(), compression)
1600 }
1601
1602 fn test_roundtrip<W, R, D, F>(
1605 mut file: W,
1606 data: Vec<Vec<D::T>>,
1607 value: F,
1608 compression: Compression,
1609 ) -> crate::format::FileMetaData
1610 where
1611 W: Write + Send,
1612 R: ChunkReader + From<W> + 'static,
1613 D: DataType,
1614 F: Fn(Row) -> D::T,
1615 {
1616 let schema = Arc::new(
1617 types::Type::group_type_builder("schema")
1618 .with_fields(vec![Arc::new(
1619 types::Type::primitive_type_builder("col1", D::get_physical_type())
1620 .with_repetition(Repetition::REQUIRED)
1621 .build()
1622 .unwrap(),
1623 )])
1624 .build()
1625 .unwrap(),
1626 );
1627 let props = Arc::new(
1628 WriterProperties::builder()
1629 .set_compression(compression)
1630 .build(),
1631 );
1632 let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
1633 let mut rows: i64 = 0;
1634
1635 for (idx, subset) in data.iter().enumerate() {
1636 let row_group_file_offset = file_writer.buf.bytes_written();
1637 let mut row_group_writer = file_writer.next_row_group().unwrap();
1638 if let Some(mut writer) = row_group_writer.next_column().unwrap() {
1639 rows += writer
1640 .typed::<D>()
1641 .write_batch(&subset[..], None, None)
1642 .unwrap() as i64;
1643 writer.close().unwrap();
1644 }
1645 let last_group = row_group_writer.close().unwrap();
1646 let flushed = file_writer.flushed_row_groups();
1647 assert_eq!(flushed.len(), idx + 1);
1648 assert_eq!(Some(idx as i16), last_group.ordinal());
1649 assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset());
1650 assert_eq!(&flushed[idx], last_group.as_ref());
1651 }
1652 let file_metadata = file_writer.close().unwrap();
1653
1654 let reader = SerializedFileReader::new(R::from(file)).unwrap();
1655 assert_eq!(reader.num_row_groups(), data.len());
1656 assert_eq!(
1657 reader.metadata().file_metadata().num_rows(),
1658 rows,
1659 "row count in metadata not equal to number of rows written"
1660 );
1661 for (i, item) in data.iter().enumerate().take(reader.num_row_groups()) {
1662 let row_group_reader = reader.get_row_group(i).unwrap();
1663 let iter = row_group_reader.get_row_iter(None).unwrap();
1664 let res: Vec<_> = iter.map(|row| row.unwrap()).map(&value).collect();
1665 let row_group_size = row_group_reader.metadata().total_byte_size();
1666 let uncompressed_size: i64 = row_group_reader
1667 .metadata()
1668 .columns()
1669 .iter()
1670 .map(|v| v.uncompressed_size())
1671 .sum();
1672 assert_eq!(row_group_size, uncompressed_size);
1673 assert_eq!(res, *item);
1674 }
1675 file_metadata
1676 }
1677
1678 fn test_file_roundtrip(file: File, data: Vec<Vec<i32>>) -> crate::format::FileMetaData {
1681 test_roundtrip_i32::<File, File>(file, data, Compression::UNCOMPRESSED)
1682 }
1683
1684 #[test]
1685 fn test_bytes_writer_empty_row_groups() {
1686 test_bytes_roundtrip(vec![], Compression::UNCOMPRESSED);
1687 }
1688
1689 #[test]
1690 fn test_bytes_writer_single_row_group() {
1691 test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::UNCOMPRESSED);
1692 }
1693
1694 #[test]
1695 fn test_bytes_writer_multiple_row_groups() {
1696 test_bytes_roundtrip(
1697 vec![
1698 vec![1, 2, 3, 4, 5],
1699 vec![1, 2, 3],
1700 vec![1],
1701 vec![1, 2, 3, 4, 5, 6],
1702 ],
1703 Compression::UNCOMPRESSED,
1704 );
1705 }
1706
1707 #[test]
1708 fn test_bytes_writer_single_row_group_compressed() {
1709 test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::SNAPPY);
1710 }
1711
1712 #[test]
1713 fn test_bytes_writer_multiple_row_groups_compressed() {
1714 test_bytes_roundtrip(
1715 vec![
1716 vec![1, 2, 3, 4, 5],
1717 vec![1, 2, 3],
1718 vec![1],
1719 vec![1, 2, 3, 4, 5, 6],
1720 ],
1721 Compression::SNAPPY,
1722 );
1723 }
1724
1725 fn test_bytes_roundtrip(data: Vec<Vec<i32>>, compression: Compression) {
1726 test_roundtrip_i32::<Vec<u8>, Bytes>(Vec::with_capacity(1024), data, compression);
1727 }
1728
1729 #[test]
1730 fn test_boolean_roundtrip() {
1731 let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
1732 test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
1733 Vec::with_capacity(1024),
1734 vec![my_bool_values],
1735 |r| r.get_bool(0).unwrap(),
1736 Compression::UNCOMPRESSED,
1737 );
1738 }
1739
1740 #[test]
1741 fn test_boolean_compressed_roundtrip() {
1742 let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
1743 test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
1744 Vec::with_capacity(1024),
1745 vec![my_bool_values],
1746 |r| r.get_bool(0).unwrap(),
1747 Compression::SNAPPY,
1748 );
1749 }
1750
1751 #[test]
1752 fn test_column_offset_index_file() {
1753 let file = tempfile::tempfile().unwrap();
1754 let file_metadata = test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
1755 file_metadata.row_groups.iter().for_each(|row_group| {
1756 row_group.columns.iter().for_each(|column_chunk| {
1757 assert_ne!(None, column_chunk.column_index_offset);
1758 assert_ne!(None, column_chunk.column_index_length);
1759
1760 assert_ne!(None, column_chunk.offset_index_offset);
1761 assert_ne!(None, column_chunk.offset_index_length);
1762 })
1763 });
1764 }
1765
1766 fn test_kv_metadata(initial_kv: Option<Vec<KeyValue>>, final_kv: Option<Vec<KeyValue>>) {
1767 let schema = Arc::new(
1768 types::Type::group_type_builder("schema")
1769 .with_fields(vec![Arc::new(
1770 types::Type::primitive_type_builder("col1", Type::INT32)
1771 .with_repetition(Repetition::REQUIRED)
1772 .build()
1773 .unwrap(),
1774 )])
1775 .build()
1776 .unwrap(),
1777 );
1778 let mut out = Vec::with_capacity(1024);
1779 let props = Arc::new(
1780 WriterProperties::builder()
1781 .set_key_value_metadata(initial_kv.clone())
1782 .build(),
1783 );
1784 let mut writer = SerializedFileWriter::new(&mut out, schema, props).unwrap();
1785 let mut row_group_writer = writer.next_row_group().unwrap();
1786 let column = row_group_writer.next_column().unwrap().unwrap();
1787 column.close().unwrap();
1788 row_group_writer.close().unwrap();
1789 if let Some(kvs) = &final_kv {
1790 for kv in kvs {
1791 writer.append_key_value_metadata(kv.clone())
1792 }
1793 }
1794 writer.close().unwrap();
1795
1796 let reader = SerializedFileReader::new(Bytes::from(out)).unwrap();
1797 let metadata = reader.metadata().file_metadata();
1798 let keys = metadata.key_value_metadata();
1799
1800 match (initial_kv, final_kv) {
1801 (Some(a), Some(b)) => {
1802 let keys = keys.unwrap();
1803 assert_eq!(keys.len(), a.len() + b.len());
1804 assert_eq!(&keys[..a.len()], a.as_slice());
1805 assert_eq!(&keys[a.len()..], b.as_slice());
1806 }
1807 (Some(v), None) => assert_eq!(keys.unwrap(), &v),
1808 (None, Some(v)) if !v.is_empty() => assert_eq!(keys.unwrap(), &v),
1809 _ => assert!(keys.is_none()),
1810 }
1811 }
1812
1813 #[test]
1814 fn test_append_metadata() {
1815 let kv1 = KeyValue::new("cupcakes".to_string(), "awesome".to_string());
1816 let kv2 = KeyValue::new("bingo".to_string(), "bongo".to_string());
1817
1818 test_kv_metadata(None, None);
1819 test_kv_metadata(Some(vec![kv1.clone()]), None);
1820 test_kv_metadata(None, Some(vec![kv2.clone()]));
1821 test_kv_metadata(Some(vec![kv1.clone()]), Some(vec![kv2.clone()]));
1822 test_kv_metadata(Some(vec![]), Some(vec![kv2]));
1823 test_kv_metadata(Some(vec![]), Some(vec![]));
1824 test_kv_metadata(Some(vec![kv1]), Some(vec![]));
1825 test_kv_metadata(None, Some(vec![]));
1826 }
1827
1828 #[test]
1829 fn test_backwards_compatible_statistics() {
1830 let message_type = "
1831 message test_schema {
1832 REQUIRED INT32 decimal1 (DECIMAL(8,2));
1833 REQUIRED INT32 i32 (INTEGER(32,true));
1834 REQUIRED INT32 u32 (INTEGER(32,false));
1835 }
1836 ";
1837
1838 let schema = Arc::new(parse_message_type(message_type).unwrap());
1839 let props = Default::default();
1840 let mut writer = SerializedFileWriter::new(vec![], schema, props).unwrap();
1841 let mut row_group_writer = writer.next_row_group().unwrap();
1842
1843 for _ in 0..3 {
1844 let mut writer = row_group_writer.next_column().unwrap().unwrap();
1845 writer
1846 .typed::<Int32Type>()
1847 .write_batch(&[1, 2, 3], None, None)
1848 .unwrap();
1849 writer.close().unwrap();
1850 }
1851 let metadata = row_group_writer.close().unwrap();
1852 writer.close().unwrap();
1853
1854 let thrift = metadata.to_thrift();
1855 let encoded_stats: Vec<_> = thrift
1856 .columns
1857 .into_iter()
1858 .map(|x| x.meta_data.unwrap().statistics.unwrap())
1859 .collect();
1860
1861 let s = &encoded_stats[0];
1863 assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1864 assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1865 assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1866 assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1867
1868 let s = &encoded_stats[1];
1870 assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1871 assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1872 assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1873 assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1874
1875 let s = &encoded_stats[2];
1877 assert_eq!(s.min.as_deref(), None);
1878 assert_eq!(s.max.as_deref(), None);
1879 assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1880 assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1881 }
1882
1883 #[test]
1884 fn test_spliced_write() {
1885 let message_type = "
1886 message test_schema {
1887 REQUIRED INT32 i32 (INTEGER(32,true));
1888 REQUIRED INT32 u32 (INTEGER(32,false));
1889 }
1890 ";
1891 let schema = Arc::new(parse_message_type(message_type).unwrap());
1892 let props = Arc::new(WriterProperties::builder().build());
1893
1894 let mut file = Vec::with_capacity(1024);
1895 let mut file_writer = SerializedFileWriter::new(&mut file, schema, props.clone()).unwrap();
1896
1897 let columns = file_writer.descr.columns();
1898 let mut column_state: Vec<(_, Option<ColumnCloseResult>)> = columns
1899 .iter()
1900 .map(|_| (TrackedWrite::new(Vec::with_capacity(1024)), None))
1901 .collect();
1902
1903 let mut column_state_slice = column_state.as_mut_slice();
1904 let mut column_writers = Vec::with_capacity(columns.len());
1905 for c in columns {
1906 let ((buf, out), tail) = column_state_slice.split_first_mut().unwrap();
1907 column_state_slice = tail;
1908
1909 let page_writer = Box::new(SerializedPageWriter::new(buf));
1910 let col_writer = get_column_writer(c.clone(), props.clone(), page_writer);
1911 column_writers.push(SerializedColumnWriter::new(
1912 col_writer,
1913 Some(Box::new(|on_close| {
1914 *out = Some(on_close);
1915 Ok(())
1916 })),
1917 ));
1918 }
1919
1920 let column_data = [[1, 2, 3, 4], [7, 3, 7, 3]];
1921
1922 for (writer, batch) in column_writers.iter_mut().zip(column_data) {
1924 let writer = writer.typed::<Int32Type>();
1925 writer.write_batch(&batch, None, None).unwrap();
1926 }
1927
1928 for writer in column_writers {
1930 writer.close().unwrap()
1931 }
1932
1933 let mut row_group_writer = file_writer.next_row_group().unwrap();
1935 for (write, close) in column_state {
1936 let buf = Bytes::from(write.into_inner().unwrap());
1937 row_group_writer
1938 .append_column(&buf, close.unwrap())
1939 .unwrap();
1940 }
1941 row_group_writer.close().unwrap();
1942 file_writer.close().unwrap();
1943
1944 let file = Bytes::from(file);
1946 let test_read = |reader: SerializedFileReader<Bytes>| {
1947 let row_group = reader.get_row_group(0).unwrap();
1948
1949 let mut out = Vec::with_capacity(4);
1950 let c1 = row_group.get_column_reader(0).unwrap();
1951 let mut c1 = get_typed_column_reader::<Int32Type>(c1);
1952 c1.read_records(4, None, None, &mut out).unwrap();
1953 assert_eq!(out, column_data[0]);
1954
1955 out.clear();
1956
1957 let c2 = row_group.get_column_reader(1).unwrap();
1958 let mut c2 = get_typed_column_reader::<Int32Type>(c2);
1959 c2.read_records(4, None, None, &mut out).unwrap();
1960 assert_eq!(out, column_data[1]);
1961 };
1962
1963 let reader = SerializedFileReader::new(file.clone()).unwrap();
1964 test_read(reader);
1965
1966 let options = ReadOptionsBuilder::new().with_page_index().build();
1967 let reader = SerializedFileReader::new_with_options(file, options).unwrap();
1968 test_read(reader);
1969 }
1970
1971 #[test]
1972 fn test_disabled_statistics() {
1973 let message_type = "
1974 message test_schema {
1975 REQUIRED INT32 a;
1976 REQUIRED INT32 b;
1977 }
1978 ";
1979 let schema = Arc::new(parse_message_type(message_type).unwrap());
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
            // offset indexes are still written despite this, as asserted below
            .set_offset_index_disabled(true)
            .build();
1985 let mut file = Vec::with_capacity(1024);
1986 let mut file_writer =
1987 SerializedFileWriter::new(&mut file, schema, Arc::new(props)).unwrap();
1988
1989 let mut row_group_writer = file_writer.next_row_group().unwrap();
1990 let mut a_writer = row_group_writer.next_column().unwrap().unwrap();
1991 let col_writer = a_writer.typed::<Int32Type>();
1992 col_writer.write_batch(&[1, 2, 3], None, None).unwrap();
1993 a_writer.close().unwrap();
1994
1995 let mut b_writer = row_group_writer.next_column().unwrap().unwrap();
1996 let col_writer = b_writer.typed::<Int32Type>();
1997 col_writer.write_batch(&[4, 5, 6], None, None).unwrap();
1998 b_writer.close().unwrap();
1999 row_group_writer.close().unwrap();
2000
2001 let metadata = file_writer.finish().unwrap();
2002 assert_eq!(metadata.row_groups.len(), 1);
2003 let row_group = &metadata.row_groups[0];
2004 assert_eq!(row_group.columns.len(), 2);
2005 assert!(row_group.columns[0].offset_index_offset.is_some());
2007 assert!(row_group.columns[0].column_index_offset.is_some());
2008 assert!(row_group.columns[1].offset_index_offset.is_some());
2010 assert!(row_group.columns[1].column_index_offset.is_none());
2011
2012 let err = file_writer.next_row_group().err().unwrap().to_string();
2013 assert_eq!(err, "Parquet error: SerializedFileWriter already finished");
2014
2015 drop(file_writer);
2016
2017 let options = ReadOptionsBuilder::new().with_page_index().build();
2018 let reader = SerializedFileReader::new_with_options(Bytes::from(file), options).unwrap();
2019
        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1);
        assert_eq!(offset_index[0].len(), 2);

        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1);
        assert_eq!(column_index[0].len(), 2);

        let a_idx = &column_index[0][0];
2029 assert!(matches!(a_idx, Index::INT32(_)), "{a_idx:?}");
2030 let b_idx = &column_index[0][1];
2031 assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
2032 }
2033
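    // Size statistics for a nullable BYTE_ARRAY column: the definition level
    // histogram and unencoded_byte_array_data_bytes should be recorded, and there
    // is no repetition level histogram since the column is not repeated.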
    #[test]
    fn test_byte_array_size_statistics() {
        let message_type = "
            message test_schema {
                OPTIONAL BYTE_ARRAY a (UTF8);
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let data = ByteArrayType::gen_vec(32, 7);
        // 7 non-null values interleaved with 3 nulls
        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
        let unenc_size: i64 = data.iter().map(|x| x.len() as i64).sum();
        let file: File = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        );

        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<ByteArrayType>()
            .write_batch(&data, Some(&def_levels), None)
            .unwrap();
        col_writer.close().unwrap();
        row_group_writer.close().unwrap();
        let file_metadata = writer.close().unwrap();

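        // The writer-returned Thrift metadata should carry size statistics with a
        // definition level histogram and the unencoded byte size, but no
        // repetition level histogram.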
        assert_eq!(file_metadata.row_groups.len(), 1);
        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());

        let check_def_hist = |def_hist: &[i64]| {
            assert_eq!(def_hist.len(), 2);
            assert_eq!(def_hist[0], 3);
            assert_eq!(def_hist[1], 7);
        };

        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
        let meta_data = file_metadata.row_groups[0].columns[0]
            .meta_data
            .as_ref()
            .unwrap();
        assert!(meta_data.size_statistics.is_some());
        let size_stats = meta_data.size_statistics.as_ref().unwrap();

        assert!(size_stats.repetition_level_histogram.is_none());
        assert!(size_stats.definition_level_histogram.is_some());
        assert!(size_stats.unencoded_byte_array_data_bytes.is_some());
        assert_eq!(
            unenc_size,
            size_stats.unencoded_byte_array_data_bytes.unwrap()
        );
        check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());

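        // Read the file back with the page index enabled and check that the same
        // statistics are exposed through the reader metadata, the column index,
        // and the offset index.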
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rfile_metadata = reader.metadata().file_metadata();
        assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
        assert_eq!(reader.num_row_groups(), 1);
        let rowgroup = reader.get_row_group(0).unwrap();
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.definition_level_histogram().is_some());
        assert!(column.repetition_level_histogram().is_none());
        assert!(column.unencoded_byte_array_data_bytes().is_some());
        check_def_hist(column.definition_level_histogram().unwrap().values());
        assert_eq!(
            unenc_size,
            column.unencoded_byte_array_data_bytes().unwrap()
        );

        assert!(reader.metadata().column_index().is_some());
        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1);
        assert_eq!(column_index[0].len(), 1);
        let col_idx = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
            assert_eq!(index.indexes.len(), 1);
            &index.indexes[0]
        } else {
            unreachable!()
        };

        assert!(col_idx.repetition_level_histogram().is_none());
        assert!(col_idx.definition_level_histogram().is_some());
        check_def_hist(col_idx.definition_level_histogram().unwrap().values());

        assert!(reader.metadata().offset_index().is_some());
        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1);
        assert_eq!(offset_index[0].len(), 1);
        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_some());
        let page_sizes = offset_index[0][0]
            .unencoded_byte_array_data_bytes
            .as_ref()
            .unwrap();
        assert_eq!(page_sizes.len(), 1);
        assert_eq!(page_sizes[0], unenc_size);
    }

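    // Writing more row groups than the format allows (32767) should fail with a
    // descriptive error rather than producing an invalid file.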
    #[test]
    fn test_too_many_rowgroups() {
        let message_type = "
            message test_schema {
                REQUIRED BYTE_ARRAY a (UTF8);
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let file: File = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::None)
                .set_max_row_group_size(1)
                .build(),
        );
        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();

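        // Attempt to create 0x8001 empty row groups; only the last attempt
        // (i == 0x8000, i.e. the 32768th row group) should fail.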
        for i in 0..0x8001 {
            match writer.next_row_group() {
                Ok(mut row_group_writer) => {
                    assert_ne!(i, 0x8000);
                    let col_writer = row_group_writer.next_column().unwrap().unwrap();
                    col_writer.close().unwrap();
                    row_group_writer.close().unwrap();
                }
                Err(e) => {
                    assert_eq!(i, 0x8000);
                    assert_eq!(
                        e.to_string(),
                        "Parquet error: Parquet does not support more than 32767 row groups per file (currently: 32768)"
                    );
                }
            }
        }
        writer.close().unwrap();
    }

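    // Size statistics for a nested, repeated, nullable column: both definition and
    // repetition level histograms should be recorded, while
    // unencoded_byte_array_data_bytes is absent because the leaf type is INT32.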
    #[test]
    fn test_size_statistics_with_repetition_and_nulls() {
        let message_type = "
            message test_schema {
                OPTIONAL group i32_list (LIST) {
                    REPEATED group list {
                        OPTIONAL INT32 element;
                    }
                }
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        // Encodes the five rows [[1, 2], null, [4, null], [], [7, 8, 9, 10]]
        let data = [1, 2, 4, 7, 8, 9, 10];
        let def_levels = [3, 3, 0, 3, 2, 1, 3, 3, 3, 3];
        let rep_levels = [0, 1, 0, 0, 1, 0, 0, 1, 1, 1];
        let file = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        );
        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&data, Some(&def_levels), Some(&rep_levels))
            .unwrap();
        col_writer.close().unwrap();
        row_group_writer.close().unwrap();
        let file_metadata = writer.close().unwrap();

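        // Check the size statistics in the writer-returned Thrift metadata: both
        // level histograms present, unencoded byte size absent.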
        assert_eq!(file_metadata.row_groups.len(), 1);
        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());

        let check_def_hist = |def_hist: &[i64]| {
            assert_eq!(def_hist.len(), 4);
            assert_eq!(def_hist[0], 1);
            assert_eq!(def_hist[1], 1);
            assert_eq!(def_hist[2], 1);
            assert_eq!(def_hist[3], 7);
        };

        let check_rep_hist = |rep_hist: &[i64]| {
            assert_eq!(rep_hist.len(), 2);
            assert_eq!(rep_hist[0], 5);
            assert_eq!(rep_hist[1], 5);
        };

        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
        let meta_data = file_metadata.row_groups[0].columns[0]
            .meta_data
            .as_ref()
            .unwrap();
        assert!(meta_data.size_statistics.is_some());
        let size_stats = meta_data.size_statistics.as_ref().unwrap();
        assert!(size_stats.repetition_level_histogram.is_some());
        assert!(size_stats.definition_level_histogram.is_some());
        assert!(size_stats.unencoded_byte_array_data_bytes.is_none());
        check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());
        check_rep_hist(size_stats.repetition_level_histogram.as_ref().unwrap());

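        // Read the file back with the page index and check the same histograms via
        // the reader metadata, the column index, and the offset index.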
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rfile_metadata = reader.metadata().file_metadata();
        assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
        assert_eq!(reader.num_row_groups(), 1);
        let rowgroup = reader.get_row_group(0).unwrap();
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.definition_level_histogram().is_some());
        assert!(column.repetition_level_histogram().is_some());
        assert!(column.unencoded_byte_array_data_bytes().is_none());
        check_def_hist(column.definition_level_histogram().unwrap().values());
        check_rep_hist(column.repetition_level_histogram().unwrap().values());

        assert!(reader.metadata().column_index().is_some());
        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1);
        assert_eq!(column_index[0].len(), 1);
        let col_idx = if let Index::INT32(index) = &column_index[0][0] {
            assert_eq!(index.indexes.len(), 1);
            &index.indexes[0]
        } else {
            unreachable!()
        };

        check_def_hist(col_idx.definition_level_histogram().unwrap().values());
        check_rep_hist(col_idx.repetition_level_histogram().unwrap().values());

        assert!(reader.metadata().offset_index().is_some());
        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1);
        assert_eq!(offset_index[0].len(), 1);
        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_none());
    }

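    // Round-trip the byte_stream_split_extended.gzip.parquet test file: rewrite it
    // forcing BYTE_STREAM_SPLIT on the *_byte_stream_split columns, then check the
    // encodings and that each plain/split column pair still holds identical values.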
    #[test]
    #[cfg(feature = "arrow")]
    fn test_byte_stream_split_extended_roundtrip() {
        let path = format!(
            "{}/byte_stream_split_extended.gzip.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();

        let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .expect("parquet open")
            .build()
            .expect("parquet open");

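        // Rewrite the data, explicitly requesting BYTE_STREAM_SPLIT for each
        // *_byte_stream_split column; dictionary encoding is disabled so the
        // requested encoding is actually used.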
        let file = tempfile::tempfile().unwrap();
        let props = WriterProperties::builder()
            .set_dictionary_enabled(false)
            .set_column_encoding(
                ColumnPath::from("float16_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("float_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("double_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("int32_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("int64_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("flba5_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("decimal_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .build();

        let mut parquet_writer = ArrowWriter::try_new(
            file.try_clone().expect("cannot open file"),
            parquet_reader.schema(),
            Some(props),
        )
        .expect("create arrow writer");

        for maybe_batch in parquet_reader {
            let batch = maybe_batch.expect("reading batch");
            parquet_writer.write(&batch).expect("writing data");
        }

        parquet_writer.close().expect("finalizing file");

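        // Every rewritten *_byte_stream_split column chunk should report the
        // BYTE_STREAM_SPLIT encoding.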
        let reader = SerializedFileReader::new(file).expect("Failed to create reader");
        let filemeta = reader.metadata();

        let check_encoding = |x: usize, filemeta: &ParquetMetaData| {
            assert!(filemeta
                .row_group(0)
                .column(x)
                .encodings()
                .contains(&Encoding::BYTE_STREAM_SPLIT));
        };

        check_encoding(1, filemeta);
        check_encoding(3, filemeta);
        check_encoding(5, filemeta);
        check_encoding(7, filemeta);
        check_encoding(9, filemeta);
        check_encoding(11, filemeta);
        check_encoding(13, filemeta);

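        // The values should round-trip: each *_byte_stream_split column must match
        // its plain-encoded counterpart row by row.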
        let mut iter = reader
            .get_row_iter(None)
            .expect("Failed to create row iterator");

        let mut start = 0;
        let end = reader.metadata().file_metadata().num_rows();

        let check_row = |row: Result<Row, ParquetError>| {
            assert!(row.is_ok());
            let r = row.unwrap();
            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
        };

        while start < end {
            match iter.next() {
                Some(row) => check_row(row),
                None => break,
            };
            start += 1;
        }
    }
}