use crate::bloom_filter::Sbbf;
use crate::format as parquet;
use crate::format::{ColumnIndex, OffsetIndex};
use crate::thrift::TSerializable;
use std::fmt::Debug;
use std::io::{BufWriter, IoSlice, Read};
use std::{io::Write, sync::Arc};
use thrift::protocol::TCompactOutputProtocol;

use crate::column::writer::{get_typed_column_writer_mut, ColumnCloseResult, ColumnWriterImpl};
use crate::column::{
    page::{CompressedPage, PageWriteSpec, PageWriter},
    writer::{get_column_writer, ColumnWriter},
};
use crate::data_type::DataType;
use crate::errors::{ParquetError, Result};
use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
use crate::file::reader::ChunkReader;
use crate::file::{metadata::*, PARQUET_MAGIC};
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr};

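/// A [`Write`] wrapper that buffers the underlying writer with a [`BufWriter`]
/// and tracks the total number of bytes written.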
pub struct TrackedWrite<W: Write> {
    inner: BufWriter<W>,
    bytes_written: usize,
}

impl<W: Write> TrackedWrite<W> {
    pub fn new(inner: W) -> Self {
        let buf_write = BufWriter::new(inner);
        Self {
            inner: buf_write,
            bytes_written: 0,
        }
    }

    pub fn bytes_written(&self) -> usize {
        self.bytes_written
    }

    pub fn inner(&self) -> &W {
        self.inner.get_ref()
    }

    pub fn inner_mut(&mut self) -> &mut W {
        self.inner.get_mut()
    }

    pub fn into_inner(self) -> Result<W> {
        self.inner.into_inner().map_err(|err| {
            ParquetError::General(format!("fail to get inner writer: {:?}", err.to_string()))
        })
    }
}

impl<W: Write> Write for TrackedWrite<W> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let bytes = self.inner.write(buf)?;
        self.bytes_written += bytes;
        Ok(bytes)
    }

    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> std::io::Result<usize> {
        let bytes = self.inner.write_vectored(bufs)?;
        self.bytes_written += bytes;
        Ok(bytes)
    }

    fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
        self.inner.write_all(buf)?;
        self.bytes_written += buf.len();

        Ok(())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.inner.flush()
    }
}

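/// Callback invoked when a column chunk is closed, receiving its
/// [`ColumnCloseResult`].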
pub type OnCloseColumnChunk<'a> = Box<dyn FnOnce(ColumnCloseResult) -> Result<()> + 'a>;

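/// Callback invoked when a row group is closed, receiving the writer, the row
/// group's [`RowGroupMetaData`], and the per-column bloom filters, column
/// indexes and offset indexes.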
pub type OnCloseRowGroup<'a, W> = Box<
    dyn FnOnce(
            &'a mut TrackedWrite<W>,
            RowGroupMetaData,
            Vec<Option<Sbbf>>,
            Vec<Option<ColumnIndex>>,
            Vec<Option<OffsetIndex>>,
        ) -> Result<()>
        + 'a
        + Send,
>;

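/// Writes a Parquet file to an underlying [`Write`], one row group at a time.
///
/// A minimal usage sketch, with schema construction and error handling elided
/// and an `INT32` column assumed (see the tests in this module for complete
/// examples):
///
/// ```ignore
/// let mut writer = SerializedFileWriter::new(file, schema, props)?;
/// let mut row_group = writer.next_row_group()?;
/// while let Some(mut column) = row_group.next_column()? {
///     column.typed::<Int32Type>().write_batch(&[1, 2, 3], None, None)?;
///     column.close()?;
/// }
/// row_group.close()?;
/// writer.close()?;
/// ```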
pub struct SerializedFileWriter<W: Write> {
    buf: TrackedWrite<W>,
    schema: TypePtr,
    descr: SchemaDescPtr,
    props: WriterPropertiesPtr,
    row_groups: Vec<RowGroupMetaData>,
    bloom_filters: Vec<Vec<Option<Sbbf>>>,
    column_indexes: Vec<Vec<Option<ColumnIndex>>>,
    offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
    row_group_index: usize,
    kv_metadatas: Vec<KeyValue>,
    finished: bool,
}

impl<W: Write> Debug for SerializedFileWriter<W> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SerializedFileWriter")
            .field("descr", &self.descr)
            .field("row_group_index", &self.row_group_index)
            .field("kv_metadatas", &self.kv_metadatas)
            .finish_non_exhaustive()
    }
}

impl<W: Write + Send> SerializedFileWriter<W> {
    pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
        let mut buf = TrackedWrite::new(buf);
        Self::start_file(&mut buf)?;
        Ok(Self {
            buf,
            schema: schema.clone(),
            descr: Arc::new(SchemaDescriptor::new(schema)),
            props: properties,
            row_groups: vec![],
            bloom_filters: vec![],
            column_indexes: Vec::new(),
            offset_indexes: Vec::new(),
            row_group_index: 0,
            kv_metadatas: Vec::new(),
            finished: false,
        })
    }

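    /// Creates a [`SerializedRowGroupWriter`] for the next row group.
    ///
    /// Returns an error if the previous row group writer was not closed, if the
    /// file has already been finished, or if the `i16` row group ordinal overflows.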
    pub fn next_row_group(&mut self) -> Result<SerializedRowGroupWriter<'_, W>> {
        self.assert_previous_writer_closed()?;
        let ordinal = self.row_group_index;

        let ordinal: i16 = ordinal.try_into().map_err(|_| {
            ParquetError::General(format!(
                "Parquet does not support more than {} row groups per file (currently: {})",
                i16::MAX,
                ordinal
            ))
        })?;

        self.row_group_index = self
            .row_group_index
            .checked_add(1)
            .expect("SerializedFileWriter::row_group_index overflowed");

        let bloom_filter_position = self.properties().bloom_filter_position();
        let row_groups = &mut self.row_groups;
        let row_bloom_filters = &mut self.bloom_filters;
        let row_column_indexes = &mut self.column_indexes;
        let row_offset_indexes = &mut self.offset_indexes;
        let on_close = move |buf,
                             mut metadata,
                             row_group_bloom_filter,
                             row_group_column_index,
                             row_group_offset_index| {
            row_bloom_filters.push(row_group_bloom_filter);
            row_column_indexes.push(row_group_column_index);
            row_offset_indexes.push(row_group_offset_index);
            match bloom_filter_position {
                BloomFilterPosition::AfterRowGroup => {
                    write_bloom_filters(buf, row_bloom_filters, &mut metadata)?
                }
                BloomFilterPosition::End => (),
            };
            row_groups.push(metadata);
            Ok(())
        };

        let row_group_writer = SerializedRowGroupWriter::new(
            self.descr.clone(),
            self.props.clone(),
            &mut self.buf,
            ordinal,
            Some(Box::new(on_close)),
        );
        Ok(row_group_writer)
    }

    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        &self.row_groups
    }

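    /// Writes any remaining bloom filters and the footer metadata, flushes the
    /// underlying writer, and returns the resulting [`parquet::FileMetaData`].
    /// No further row groups can be written once this has been called.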
    pub fn finish(&mut self) -> Result<parquet::FileMetaData> {
        self.assert_previous_writer_closed()?;
        let metadata = self.write_metadata()?;
        self.buf.flush()?;
        Ok(metadata)
    }

    pub fn close(mut self) -> Result<parquet::FileMetaData> {
        self.finish()
    }

    fn start_file(buf: &mut TrackedWrite<W>) -> Result<()> {
        buf.write_all(&PARQUET_MAGIC)?;
        Ok(())
    }

    fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
        self.finished = true;

        for row_group in &mut self.row_groups {
            write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?;
        }

        let key_value_metadata = match self.props.key_value_metadata() {
            Some(kv) => Some(kv.iter().chain(&self.kv_metadatas).cloned().collect()),
            None if self.kv_metadatas.is_empty() => None,
            None => Some(self.kv_metadatas.clone()),
        };

        let row_groups = self
            .row_groups
            .iter()
            .map(|v| v.to_thrift())
            .collect::<Vec<_>>();

        let mut encoder = ThriftMetadataWriter::new(
            &mut self.buf,
            &self.schema,
            &self.descr,
            row_groups,
            Some(self.props.created_by().to_string()),
            self.props.writer_version().as_num(),
        );
        if let Some(key_value_metadata) = key_value_metadata {
            encoder = encoder.with_key_value_metadata(key_value_metadata)
        }
        encoder = encoder.with_column_indexes(&self.column_indexes);
        encoder = encoder.with_offset_indexes(&self.offset_indexes);
        encoder.finish()
    }

    #[inline]
    fn assert_previous_writer_closed(&self) -> Result<()> {
        if self.finished {
            return Err(general_err!("SerializedFileWriter already finished"));
        }

        if self.row_group_index != self.row_groups.len() {
            Err(general_err!("Previous row group writer was not closed"))
        } else {
            Ok(())
        }
    }

    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
        self.kv_metadatas.push(kv_metadata);
    }

    pub fn schema_descr(&self) -> &SchemaDescriptor {
        &self.descr
    }

    pub fn properties(&self) -> &WriterPropertiesPtr {
        &self.props
    }

    pub fn inner(&self) -> &W {
        self.buf.inner()
    }

    pub fn inner_mut(&mut self) -> &mut W {
        self.buf.inner_mut()
    }

    pub fn into_inner(mut self) -> Result<W> {
        self.assert_previous_writer_closed()?;
        let _ = self.write_metadata()?;

        self.buf.into_inner()
    }

    pub fn bytes_written(&self) -> usize {
        self.buf.bytes_written()
    }
}

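/// Serializes the pending bloom filters for `row_group` to `buf`, updating each
/// column chunk's metadata with the offset and length of its written filter.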
fn write_bloom_filters<W: Write + Send>(
    buf: &mut TrackedWrite<W>,
    bloom_filters: &mut [Vec<Option<Sbbf>>],
    row_group: &mut RowGroupMetaData,
) -> Result<()> {
    let row_group_idx: u16 = row_group
        .ordinal()
        .expect("Missing row group ordinal")
        .try_into()
        .map_err(|_| {
            ParquetError::General(format!(
                "Negative row group ordinal: {}",
                row_group.ordinal().unwrap()
            ))
        })?;
    let row_group_idx = row_group_idx as usize;
    for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() {
        if let Some(bloom_filter) = bloom_filters[row_group_idx][column_idx].take() {
            let start_offset = buf.bytes_written();
            bloom_filter.write(&mut *buf)?;
            let end_offset = buf.bytes_written();
            *column_chunk = column_chunk
                .clone()
                .into_builder()
                .set_bloom_filter_offset(Some(start_offset as i64))
                .set_bloom_filter_length(Some((end_offset - start_offset) as i32))
                .build()?;
        }
    }
    Ok(())
}

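/// Writes the columns of a single row group, accumulating the per-column
/// metadata, bloom filters and page indexes that are handed to the `on_close`
/// callback when the row group is closed.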
pub struct SerializedRowGroupWriter<'a, W: Write> {
    descr: SchemaDescPtr,
    props: WriterPropertiesPtr,
    buf: &'a mut TrackedWrite<W>,
    total_rows_written: Option<u64>,
    total_bytes_written: u64,
    total_uncompressed_bytes: i64,
    column_index: usize,
    row_group_metadata: Option<RowGroupMetaDataPtr>,
    column_chunks: Vec<ColumnChunkMetaData>,
    bloom_filters: Vec<Option<Sbbf>>,
    column_indexes: Vec<Option<ColumnIndex>>,
    offset_indexes: Vec<Option<OffsetIndex>>,
    row_group_index: i16,
    file_offset: i64,
    on_close: Option<OnCloseRowGroup<'a, W>>,
}

impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
    pub fn new(
        schema_descr: SchemaDescPtr,
        properties: WriterPropertiesPtr,
        buf: &'a mut TrackedWrite<W>,
        row_group_index: i16,
        on_close: Option<OnCloseRowGroup<'a, W>>,
    ) -> Self {
        let num_columns = schema_descr.num_columns();
        let file_offset = buf.bytes_written() as i64;
        Self {
            buf,
            row_group_index,
            file_offset,
            on_close,
            total_rows_written: None,
            descr: schema_descr,
            props: properties,
            column_index: 0,
            row_group_metadata: None,
            column_chunks: Vec::with_capacity(num_columns),
            bloom_filters: Vec::with_capacity(num_columns),
            column_indexes: Vec::with_capacity(num_columns),
            offset_indexes: Vec::with_capacity(num_columns),
            total_bytes_written: 0,
            total_uncompressed_bytes: 0,
        }
    }

    fn next_column_desc(&mut self) -> Option<ColumnDescPtr> {
        let ret = self.descr.columns().get(self.column_index)?.clone();
        self.column_index += 1;
        Some(ret)
    }

    fn get_on_close(&mut self) -> (&mut TrackedWrite<W>, OnCloseColumnChunk<'_>) {
        let total_bytes_written = &mut self.total_bytes_written;
        let total_uncompressed_bytes = &mut self.total_uncompressed_bytes;
        let total_rows_written = &mut self.total_rows_written;
        let column_chunks = &mut self.column_chunks;
        let column_indexes = &mut self.column_indexes;
        let offset_indexes = &mut self.offset_indexes;
        let bloom_filters = &mut self.bloom_filters;

        let on_close = |r: ColumnCloseResult| {
            *total_bytes_written += r.bytes_written;
            *total_uncompressed_bytes += r.metadata.uncompressed_size();
            column_chunks.push(r.metadata);
            bloom_filters.push(r.bloom_filter);
            column_indexes.push(r.column_index);
            offset_indexes.push(r.offset_index);

            if let Some(rows) = *total_rows_written {
                if rows != r.rows_written {
                    return Err(general_err!(
                        "Incorrect number of rows, expected {} != {} rows",
                        rows,
                        r.rows_written
                    ));
                }
            } else {
                *total_rows_written = Some(r.rows_written);
            }

            Ok(())
        };
        (self.buf, Box::new(on_close))
    }

    pub(crate) fn next_column_with_factory<'b, F, C>(&'b mut self, factory: F) -> Result<Option<C>>
    where
        F: FnOnce(
            ColumnDescPtr,
            WriterPropertiesPtr,
            Box<dyn PageWriter + 'b>,
            OnCloseColumnChunk<'b>,
        ) -> Result<C>,
    {
        self.assert_previous_writer_closed()?;
        Ok(match self.next_column_desc() {
            Some(column) => {
                let props = self.props.clone();
                let (buf, on_close) = self.get_on_close();
                let page_writer = Box::new(SerializedPageWriter::new(buf));
                Some(factory(column, props, page_writer, Box::new(on_close))?)
            }
            None => None,
        })
    }

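    /// Returns a [`SerializedColumnWriter`] for the next column of this row
    /// group, or `None` if all columns have been written. The returned writer
    /// must be closed before requesting the next column.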
    pub fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>> {
        self.next_column_with_factory(|descr, props, page_writer, on_close| {
            let column_writer = get_column_writer(descr, props, page_writer);
            Ok(SerializedColumnWriter::new(column_writer, Some(on_close)))
        })
    }

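    /// Splices an already-encoded column chunk from `reader` into this row
    /// group: the chunk's bytes are copied verbatim and its page and dictionary
    /// offsets are remapped to their new positions in the output.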
    pub fn append_column<R: ChunkReader>(
        &mut self,
        reader: &R,
        mut close: ColumnCloseResult,
    ) -> Result<()> {
        self.assert_previous_writer_closed()?;
        let desc = self
            .next_column_desc()
            .ok_or_else(|| general_err!("exhausted columns in SerializedRowGroupWriter"))?;

        let metadata = close.metadata;

        if metadata.column_descr() != desc.as_ref() {
            return Err(general_err!(
                "column descriptor mismatch, expected {:?} got {:?}",
                desc,
                metadata.column_descr()
            ));
        }

        let src_dictionary_offset = metadata.dictionary_page_offset();
        let src_data_offset = metadata.data_page_offset();
        let src_offset = src_dictionary_offset.unwrap_or(src_data_offset);
        let src_length = metadata.compressed_size();

        let write_offset = self.buf.bytes_written();
        let mut read = reader.get_read(src_offset as _)?.take(src_length as _);
        let write_length = std::io::copy(&mut read, &mut self.buf)?;

        if src_length as u64 != write_length {
            return Err(general_err!(
                "Failed to splice column data, expected {src_length} got {write_length}"
            ));
        }

        let map_offset = |x| x - src_offset + write_offset as i64;
        let mut builder = ColumnChunkMetaData::builder(metadata.column_descr_ptr())
            .set_compression(metadata.compression())
            .set_encodings(metadata.encodings().clone())
            .set_total_compressed_size(metadata.compressed_size())
            .set_total_uncompressed_size(metadata.uncompressed_size())
            .set_num_values(metadata.num_values())
            .set_data_page_offset(map_offset(src_data_offset))
            .set_dictionary_page_offset(src_dictionary_offset.map(map_offset))
            .set_unencoded_byte_array_data_bytes(metadata.unencoded_byte_array_data_bytes());

        if let Some(rep_hist) = metadata.repetition_level_histogram() {
            builder = builder.set_repetition_level_histogram(Some(rep_hist.clone()))
        }
        if let Some(def_hist) = metadata.definition_level_histogram() {
            builder = builder.set_definition_level_histogram(Some(def_hist.clone()))
        }
        if let Some(statistics) = metadata.statistics() {
            builder = builder.set_statistics(statistics.clone())
        }
        close.metadata = builder.build()?;

        if let Some(offsets) = close.offset_index.as_mut() {
            for location in &mut offsets.page_locations {
                location.offset = map_offset(location.offset)
            }
        }

        let (_, on_close) = self.get_on_close();
        on_close(close)
    }

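    /// Closes this row group writer, building its [`RowGroupMetaData`] and
    /// invoking the `on_close` callback (if any) with the flushed metadata,
    /// bloom filters and page indexes.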
    pub fn close(mut self) -> Result<RowGroupMetaDataPtr> {
        if self.row_group_metadata.is_none() {
            self.assert_previous_writer_closed()?;

            let column_chunks = std::mem::take(&mut self.column_chunks);
            let row_group_metadata = RowGroupMetaData::builder(self.descr.clone())
                .set_column_metadata(column_chunks)
                .set_total_byte_size(self.total_uncompressed_bytes)
                .set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
                .set_sorting_columns(self.props.sorting_columns().cloned())
                .set_ordinal(self.row_group_index)
                .set_file_offset(self.file_offset)
                .build()?;

            self.row_group_metadata = Some(Arc::new(row_group_metadata.clone()));

            if let Some(on_close) = self.on_close.take() {
                on_close(
                    self.buf,
                    row_group_metadata,
                    self.bloom_filters,
                    self.column_indexes,
                    self.offset_indexes,
                )?
            }
        }

        let metadata = self.row_group_metadata.as_ref().unwrap().clone();
        Ok(metadata)
    }

    #[inline]
    fn assert_previous_writer_closed(&self) -> Result<()> {
        if self.column_index != self.column_chunks.len() {
            Err(general_err!("Previous column writer was not closed"))
        } else {
            Ok(())
        }
    }
}

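/// Wraps a [`ColumnWriter`] and invokes an optional [`OnCloseColumnChunk`]
/// callback with the [`ColumnCloseResult`] when the column is closed.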
pub struct SerializedColumnWriter<'a> {
    inner: ColumnWriter<'a>,
    on_close: Option<OnCloseColumnChunk<'a>>,
}

impl<'a> SerializedColumnWriter<'a> {
    pub fn new(inner: ColumnWriter<'a>, on_close: Option<OnCloseColumnChunk<'a>>) -> Self {
        Self { inner, on_close }
    }

    pub fn untyped(&mut self) -> &mut ColumnWriter<'a> {
        &mut self.inner
    }

    pub fn typed<T: DataType>(&mut self) -> &mut ColumnWriterImpl<'a, T> {
        get_typed_column_writer_mut(&mut self.inner)
    }

    pub fn close(mut self) -> Result<()> {
        let r = self.inner.close()?;
        if let Some(on_close) = self.on_close.take() {
            on_close(r)?
        }

        Ok(())
    }
}

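/// A [`PageWriter`] that serializes page headers (as thrift) and page data to
/// an underlying [`TrackedWrite`].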
pub struct SerializedPageWriter<'a, W: Write> {
    sink: &'a mut TrackedWrite<W>,
}

impl<'a, W: Write> SerializedPageWriter<'a, W> {
    pub fn new(sink: &'a mut TrackedWrite<W>) -> Self {
        Self { sink }
    }

    #[inline]
    fn serialize_page_header(&mut self, header: parquet::PageHeader) -> Result<usize> {
        let start_pos = self.sink.bytes_written();
        {
            let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
            header.write_to_out_protocol(&mut protocol)?;
        }
        Ok(self.sink.bytes_written() - start_pos)
    }
}

impl<W: Write + Send> PageWriter for SerializedPageWriter<'_, W> {
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
        let page_type = page.page_type();
        let start_pos = self.sink.bytes_written() as u64;

        let page_header = page.to_thrift_header();
        let header_size = self.serialize_page_header(page_header)?;
        self.sink.write_all(page.data())?;

        let mut spec = PageWriteSpec::new();
        spec.page_type = page_type;
        spec.uncompressed_size = page.uncompressed_size() + header_size;
        spec.compressed_size = page.compressed_size() + header_size;
        spec.offset = start_pos;
        spec.bytes_written = self.sink.bytes_written() as u64 - start_pos;
        spec.num_values = page.num_values();

        Ok(spec)
    }

    fn close(&mut self) -> Result<()> {
        self.sink.flush()?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[cfg(feature = "arrow")]
    use arrow_array::RecordBatchReader;
    use bytes::Bytes;
    use std::fs::File;

    #[cfg(feature = "arrow")]
    use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    #[cfg(feature = "arrow")]
    use crate::arrow::ArrowWriter;
    use crate::basic::{
        ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, Repetition, SortOrder, Type,
    };
    use crate::column::page::{Page, PageReader};
    use crate::column::reader::get_typed_column_reader;
    use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
    use crate::data_type::{BoolType, ByteArrayType, Int32Type};
    use crate::file::page_index::index::Index;
    use crate::file::properties::EnabledStatistics;
    use crate::file::serialized_reader::ReadOptionsBuilder;
    use crate::file::{
        properties::{ReaderProperties, WriterProperties, WriterVersion},
        reader::{FileReader, SerializedFileReader, SerializedPageReader},
        statistics::{from_thrift, to_thrift, Statistics},
    };
    use crate::format::SortingColumn;
    use crate::record::{Row, RowAccessor};
    use crate::schema::parser::parse_message_type;
    use crate::schema::types;
    use crate::schema::types::{ColumnDescriptor, ColumnPath};
    use crate::util::test_common::rand_gen::RandGen;

784 #[test]
785 fn test_row_group_writer_error_not_all_columns_written() {
786 let file = tempfile::tempfile().unwrap();
787 let schema = Arc::new(
788 types::Type::group_type_builder("schema")
789 .with_fields(vec![Arc::new(
790 types::Type::primitive_type_builder("col1", Type::INT32)
791 .build()
792 .unwrap(),
793 )])
794 .build()
795 .unwrap(),
796 );
797 let props = Default::default();
798 let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
799 let row_group_writer = writer.next_row_group().unwrap();
800 let res = row_group_writer.close();
801 assert!(res.is_err());
802 if let Err(err) = res {
803 assert_eq!(
804 format!("{err}"),
805 "Parquet error: Column length mismatch: 1 != 0"
806 );
807 }
808 }
809
810 #[test]
811 fn test_row_group_writer_num_records_mismatch() {
812 let file = tempfile::tempfile().unwrap();
813 let schema = Arc::new(
814 types::Type::group_type_builder("schema")
815 .with_fields(vec![
816 Arc::new(
817 types::Type::primitive_type_builder("col1", Type::INT32)
818 .with_repetition(Repetition::REQUIRED)
819 .build()
820 .unwrap(),
821 ),
822 Arc::new(
823 types::Type::primitive_type_builder("col2", Type::INT32)
824 .with_repetition(Repetition::REQUIRED)
825 .build()
826 .unwrap(),
827 ),
828 ])
829 .build()
830 .unwrap(),
831 );
832 let props = Default::default();
833 let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
834 let mut row_group_writer = writer.next_row_group().unwrap();
835
836 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
837 col_writer
838 .typed::<Int32Type>()
839 .write_batch(&[1, 2, 3], None, None)
840 .unwrap();
841 col_writer.close().unwrap();
842
843 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
844 col_writer
845 .typed::<Int32Type>()
846 .write_batch(&[1, 2], None, None)
847 .unwrap();
848
849 let err = col_writer.close().unwrap_err();
850 assert_eq!(
851 err.to_string(),
852 "Parquet error: Incorrect number of rows, expected 3 != 2 rows"
853 );
854 }
855
856 #[test]
857 fn test_file_writer_empty_file() {
858 let file = tempfile::tempfile().unwrap();
859
860 let schema = Arc::new(
861 types::Type::group_type_builder("schema")
862 .with_fields(vec![Arc::new(
863 types::Type::primitive_type_builder("col1", Type::INT32)
864 .build()
865 .unwrap(),
866 )])
867 .build()
868 .unwrap(),
869 );
870 let props = Default::default();
871 let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
872 writer.close().unwrap();
873
874 let reader = SerializedFileReader::new(file).unwrap();
875 assert_eq!(reader.get_row_iter(None).unwrap().count(), 0);
876 }
877
878 #[test]
879 fn test_file_writer_column_orders_populated() {
880 let file = tempfile::tempfile().unwrap();
881
882 let schema = Arc::new(
883 types::Type::group_type_builder("schema")
884 .with_fields(vec![
885 Arc::new(
886 types::Type::primitive_type_builder("col1", Type::INT32)
887 .build()
888 .unwrap(),
889 ),
890 Arc::new(
891 types::Type::primitive_type_builder("col2", Type::FIXED_LEN_BYTE_ARRAY)
892 .with_converted_type(ConvertedType::INTERVAL)
893 .with_length(12)
894 .build()
895 .unwrap(),
896 ),
897 Arc::new(
898 types::Type::group_type_builder("nested")
899 .with_repetition(Repetition::REQUIRED)
900 .with_fields(vec![
901 Arc::new(
902 types::Type::primitive_type_builder(
903 "col3",
904 Type::FIXED_LEN_BYTE_ARRAY,
905 )
906 .with_logical_type(Some(LogicalType::Float16))
907 .with_length(2)
908 .build()
909 .unwrap(),
910 ),
911 Arc::new(
912 types::Type::primitive_type_builder("col4", Type::BYTE_ARRAY)
913 .with_logical_type(Some(LogicalType::String))
914 .build()
915 .unwrap(),
916 ),
917 ])
918 .build()
919 .unwrap(),
920 ),
921 ])
922 .build()
923 .unwrap(),
924 );
925
926 let props = Default::default();
927 let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
928 writer.close().unwrap();
929
930 let reader = SerializedFileReader::new(file).unwrap();
931
932 let expected = vec![
934 ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
936 ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED),
938 ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
940 ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED),
942 ];
943 let actual = reader.metadata().file_metadata().column_orders();
944
945 assert!(actual.is_some());
946 let actual = actual.unwrap();
947 assert_eq!(*actual, expected);
948 }
949
950 #[test]
951 fn test_file_writer_with_metadata() {
952 let file = tempfile::tempfile().unwrap();
953
954 let schema = Arc::new(
955 types::Type::group_type_builder("schema")
956 .with_fields(vec![Arc::new(
957 types::Type::primitive_type_builder("col1", Type::INT32)
958 .build()
959 .unwrap(),
960 )])
961 .build()
962 .unwrap(),
963 );
964 let props = Arc::new(
965 WriterProperties::builder()
966 .set_key_value_metadata(Some(vec![KeyValue::new(
967 "key".to_string(),
968 "value".to_string(),
969 )]))
970 .build(),
971 );
972 let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
973 writer.close().unwrap();
974
975 let reader = SerializedFileReader::new(file).unwrap();
976 assert_eq!(
977 reader
978 .metadata()
979 .file_metadata()
980 .key_value_metadata()
981 .to_owned()
982 .unwrap()
983 .len(),
984 1
985 );
986 }
987
988 #[test]
989 fn test_file_writer_v2_with_metadata() {
990 let file = tempfile::tempfile().unwrap();
991 let field_logical_type = Some(LogicalType::Integer {
992 bit_width: 8,
993 is_signed: false,
994 });
995 let field = Arc::new(
996 types::Type::primitive_type_builder("col1", Type::INT32)
997 .with_logical_type(field_logical_type.clone())
998 .with_converted_type(field_logical_type.into())
999 .build()
1000 .unwrap(),
1001 );
1002 let schema = Arc::new(
1003 types::Type::group_type_builder("schema")
1004 .with_fields(vec![field.clone()])
1005 .build()
1006 .unwrap(),
1007 );
1008 let props = Arc::new(
1009 WriterProperties::builder()
1010 .set_key_value_metadata(Some(vec![KeyValue::new(
1011 "key".to_string(),
1012 "value".to_string(),
1013 )]))
1014 .set_writer_version(WriterVersion::PARQUET_2_0)
1015 .build(),
1016 );
1017 let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1018 writer.close().unwrap();
1019
1020 let reader = SerializedFileReader::new(file).unwrap();
1021
1022 assert_eq!(
1023 reader
1024 .metadata()
1025 .file_metadata()
1026 .key_value_metadata()
1027 .to_owned()
1028 .unwrap()
1029 .len(),
1030 1
1031 );
1032
1033 let fields = reader.metadata().file_metadata().schema().get_fields();
1035 assert_eq!(fields.len(), 1);
1036 assert_eq!(fields[0], field);
1037 }
1038
1039 #[test]
1040 fn test_file_writer_with_sorting_columns_metadata() {
1041 let file = tempfile::tempfile().unwrap();
1042
1043 let schema = Arc::new(
1044 types::Type::group_type_builder("schema")
1045 .with_fields(vec![
1046 Arc::new(
1047 types::Type::primitive_type_builder("col1", Type::INT32)
1048 .build()
1049 .unwrap(),
1050 ),
1051 Arc::new(
1052 types::Type::primitive_type_builder("col2", Type::INT32)
1053 .build()
1054 .unwrap(),
1055 ),
1056 ])
1057 .build()
1058 .unwrap(),
1059 );
1060 let expected_result = Some(vec![SortingColumn {
1061 column_idx: 0,
1062 descending: false,
1063 nulls_first: true,
1064 }]);
1065 let props = Arc::new(
1066 WriterProperties::builder()
1067 .set_key_value_metadata(Some(vec![KeyValue::new(
1068 "key".to_string(),
1069 "value".to_string(),
1070 )]))
1071 .set_sorting_columns(expected_result.clone())
1072 .build(),
1073 );
1074 let mut writer =
1075 SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1076 let mut row_group_writer = writer.next_row_group().expect("get row group writer");
1077
1078 let col_writer = row_group_writer.next_column().unwrap().unwrap();
1079 col_writer.close().unwrap();
1080
1081 let col_writer = row_group_writer.next_column().unwrap().unwrap();
1082 col_writer.close().unwrap();
1083
1084 row_group_writer.close().unwrap();
1085 writer.close().unwrap();
1086
1087 let reader = SerializedFileReader::new(file).unwrap();
1088 let result: Vec<Option<&Vec<SortingColumn>>> = reader
1089 .metadata()
1090 .row_groups()
1091 .iter()
1092 .map(|f| f.sorting_columns())
1093 .collect();
1094 assert_eq!(expected_result.as_ref(), result[0]);
1096 }
1097
1098 #[test]
1099 fn test_file_writer_empty_row_groups() {
1100 let file = tempfile::tempfile().unwrap();
1101 test_file_roundtrip(file, vec![]);
1102 }
1103
1104 #[test]
1105 fn test_file_writer_single_row_group() {
1106 let file = tempfile::tempfile().unwrap();
1107 test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
1108 }
1109
1110 #[test]
1111 fn test_file_writer_multiple_row_groups() {
1112 let file = tempfile::tempfile().unwrap();
1113 test_file_roundtrip(
1114 file,
1115 vec![
1116 vec![1, 2, 3, 4, 5],
1117 vec![1, 2, 3],
1118 vec![1],
1119 vec![1, 2, 3, 4, 5, 6],
1120 ],
1121 );
1122 }
1123
1124 #[test]
1125 fn test_file_writer_multiple_large_row_groups() {
1126 let file = tempfile::tempfile().unwrap();
1127 test_file_roundtrip(
1128 file,
1129 vec![vec![123; 1024], vec![124; 1000], vec![125; 15], vec![]],
1130 );
1131 }
1132
1133 #[test]
1134 fn test_page_writer_data_pages() {
1135 let pages = vec![
1136 Page::DataPage {
1137 buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
1138 num_values: 10,
1139 encoding: Encoding::DELTA_BINARY_PACKED,
1140 def_level_encoding: Encoding::RLE,
1141 rep_level_encoding: Encoding::RLE,
1142 statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1143 },
1144 Page::DataPageV2 {
1145 buf: Bytes::from(vec![4; 128]),
1146 num_values: 10,
1147 encoding: Encoding::DELTA_BINARY_PACKED,
1148 num_nulls: 2,
1149 num_rows: 12,
1150 def_levels_byte_len: 24,
1151 rep_levels_byte_len: 32,
1152 is_compressed: false,
1153 statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1154 },
1155 ];
1156
1157 test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
1158 test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
1159 }
1160
1161 #[test]
1162 fn test_page_writer_dict_pages() {
1163 let pages = vec![
1164 Page::DictionaryPage {
1165 buf: Bytes::from(vec![1, 2, 3, 4, 5]),
1166 num_values: 5,
1167 encoding: Encoding::RLE_DICTIONARY,
1168 is_sorted: false,
1169 },
1170 Page::DataPage {
1171 buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
1172 num_values: 10,
1173 encoding: Encoding::DELTA_BINARY_PACKED,
1174 def_level_encoding: Encoding::RLE,
1175 rep_level_encoding: Encoding::RLE,
1176 statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1177 },
1178 Page::DataPageV2 {
1179 buf: Bytes::from(vec![4; 128]),
1180 num_values: 10,
1181 encoding: Encoding::DELTA_BINARY_PACKED,
1182 num_nulls: 2,
1183 num_rows: 12,
1184 def_levels_byte_len: 24,
1185 rep_levels_byte_len: 32,
1186 is_compressed: false,
1187 statistics: None,
1188 },
1189 ];
1190
1191 test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
1192 test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
1193 }
1194
1195 fn test_page_roundtrip(pages: &[Page], codec: Compression, physical_type: Type) {
1199 let mut compressed_pages = vec![];
1200 let mut total_num_values = 0i64;
1201 let codec_options = CodecOptionsBuilder::default()
1202 .set_backward_compatible_lz4(false)
1203 .build();
1204 let mut compressor = create_codec(codec, &codec_options).unwrap();
1205
1206 for page in pages {
1207 let uncompressed_len = page.buffer().len();
1208
1209 let compressed_page = match *page {
1210 Page::DataPage {
1211 ref buf,
1212 num_values,
1213 encoding,
1214 def_level_encoding,
1215 rep_level_encoding,
1216 ref statistics,
1217 } => {
1218 total_num_values += num_values as i64;
1219 let output_buf = compress_helper(compressor.as_mut(), buf);
1220
1221 Page::DataPage {
1222 buf: Bytes::from(output_buf),
1223 num_values,
1224 encoding,
1225 def_level_encoding,
1226 rep_level_encoding,
1227 statistics: from_thrift(physical_type, to_thrift(statistics.as_ref()))
1228 .unwrap(),
1229 }
1230 }
1231 Page::DataPageV2 {
1232 ref buf,
1233 num_values,
1234 encoding,
1235 num_nulls,
1236 num_rows,
1237 def_levels_byte_len,
1238 rep_levels_byte_len,
1239 ref statistics,
1240 ..
1241 } => {
1242 total_num_values += num_values as i64;
1243 let offset = (def_levels_byte_len + rep_levels_byte_len) as usize;
1244 let cmp_buf = compress_helper(compressor.as_mut(), &buf[offset..]);
1245 let mut output_buf = Vec::from(&buf[..offset]);
1246 output_buf.extend_from_slice(&cmp_buf[..]);
1247
1248 Page::DataPageV2 {
1249 buf: Bytes::from(output_buf),
1250 num_values,
1251 encoding,
1252 num_nulls,
1253 num_rows,
1254 def_levels_byte_len,
1255 rep_levels_byte_len,
1256 is_compressed: compressor.is_some(),
1257 statistics: from_thrift(physical_type, to_thrift(statistics.as_ref()))
1258 .unwrap(),
1259 }
1260 }
1261 Page::DictionaryPage {
1262 ref buf,
1263 num_values,
1264 encoding,
1265 is_sorted,
1266 } => {
1267 let output_buf = compress_helper(compressor.as_mut(), buf);
1268
1269 Page::DictionaryPage {
1270 buf: Bytes::from(output_buf),
1271 num_values,
1272 encoding,
1273 is_sorted,
1274 }
1275 }
1276 };
1277
1278 let compressed_page = CompressedPage::new(compressed_page, uncompressed_len);
1279 compressed_pages.push(compressed_page);
1280 }
1281
1282 let mut buffer: Vec<u8> = vec![];
1283 let mut result_pages: Vec<Page> = vec![];
1284 {
1285 let mut writer = TrackedWrite::new(&mut buffer);
1286 let mut page_writer = SerializedPageWriter::new(&mut writer);
1287
1288 for page in compressed_pages {
1289 page_writer.write_page(page).unwrap();
1290 }
1291 page_writer.close().unwrap();
1292 }
1293 {
1294 let reader = bytes::Bytes::from(buffer);
1295
1296 let t = types::Type::primitive_type_builder("t", physical_type)
1297 .build()
1298 .unwrap();
1299
1300 let desc = ColumnDescriptor::new(Arc::new(t), 0, 0, ColumnPath::new(vec![]));
1301 let meta = ColumnChunkMetaData::builder(Arc::new(desc))
1302 .set_compression(codec)
1303 .set_total_compressed_size(reader.len() as i64)
1304 .set_num_values(total_num_values)
1305 .build()
1306 .unwrap();
1307
1308 let props = ReaderProperties::builder()
1309 .set_backward_compatible_lz4(false)
1310 .build();
1311 let mut page_reader = SerializedPageReader::new_with_properties(
1312 Arc::new(reader),
1313 &meta,
1314 total_num_values as usize,
1315 None,
1316 Arc::new(props),
1317 )
1318 .unwrap();
1319
1320 while let Some(page) = page_reader.get_next_page().unwrap() {
1321 result_pages.push(page);
1322 }
1323 }
1324
1325 assert_eq!(result_pages.len(), pages.len());
1326 for i in 0..result_pages.len() {
1327 assert_page(&result_pages[i], &pages[i]);
1328 }
1329 }
1330
1331 fn compress_helper(compressor: Option<&mut Box<dyn Codec>>, data: &[u8]) -> Vec<u8> {
1333 let mut output_buf = vec![];
1334 if let Some(cmpr) = compressor {
1335 cmpr.compress(data, &mut output_buf).unwrap();
1336 } else {
1337 output_buf.extend_from_slice(data);
1338 }
1339 output_buf
1340 }
1341
1342 fn assert_page(left: &Page, right: &Page) {
1344 assert_eq!(left.page_type(), right.page_type());
1345 assert_eq!(&left.buffer(), &right.buffer());
1346 assert_eq!(left.num_values(), right.num_values());
1347 assert_eq!(left.encoding(), right.encoding());
1348 assert_eq!(to_thrift(left.statistics()), to_thrift(right.statistics()));
1349 }
1350
1351 fn test_roundtrip_i32<W, R>(
1353 file: W,
1354 data: Vec<Vec<i32>>,
1355 compression: Compression,
1356 ) -> crate::format::FileMetaData
1357 where
1358 W: Write + Send,
1359 R: ChunkReader + From<W> + 'static,
1360 {
1361 test_roundtrip::<W, R, Int32Type, _>(file, data, |r| r.get_int(0).unwrap(), compression)
1362 }
1363
1364 fn test_roundtrip<W, R, D, F>(
1367 mut file: W,
1368 data: Vec<Vec<D::T>>,
1369 value: F,
1370 compression: Compression,
1371 ) -> crate::format::FileMetaData
1372 where
1373 W: Write + Send,
1374 R: ChunkReader + From<W> + 'static,
1375 D: DataType,
1376 F: Fn(Row) -> D::T,
1377 {
1378 let schema = Arc::new(
1379 types::Type::group_type_builder("schema")
1380 .with_fields(vec![Arc::new(
1381 types::Type::primitive_type_builder("col1", D::get_physical_type())
1382 .with_repetition(Repetition::REQUIRED)
1383 .build()
1384 .unwrap(),
1385 )])
1386 .build()
1387 .unwrap(),
1388 );
1389 let props = Arc::new(
1390 WriterProperties::builder()
1391 .set_compression(compression)
1392 .build(),
1393 );
1394 let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
1395 let mut rows: i64 = 0;
1396
1397 for (idx, subset) in data.iter().enumerate() {
1398 let row_group_file_offset = file_writer.buf.bytes_written();
1399 let mut row_group_writer = file_writer.next_row_group().unwrap();
1400 if let Some(mut writer) = row_group_writer.next_column().unwrap() {
1401 rows += writer
1402 .typed::<D>()
1403 .write_batch(&subset[..], None, None)
1404 .unwrap() as i64;
1405 writer.close().unwrap();
1406 }
1407 let last_group = row_group_writer.close().unwrap();
1408 let flushed = file_writer.flushed_row_groups();
1409 assert_eq!(flushed.len(), idx + 1);
1410 assert_eq!(Some(idx as i16), last_group.ordinal());
1411 assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset());
1412 assert_eq!(&flushed[idx], last_group.as_ref());
1413 }
1414 let file_metadata = file_writer.close().unwrap();
1415
1416 let reader = SerializedFileReader::new(R::from(file)).unwrap();
1417 assert_eq!(reader.num_row_groups(), data.len());
1418 assert_eq!(
1419 reader.metadata().file_metadata().num_rows(),
1420 rows,
1421 "row count in metadata not equal to number of rows written"
1422 );
1423 for (i, item) in data.iter().enumerate().take(reader.num_row_groups()) {
1424 let row_group_reader = reader.get_row_group(i).unwrap();
1425 let iter = row_group_reader.get_row_iter(None).unwrap();
1426 let res: Vec<_> = iter.map(|row| row.unwrap()).map(&value).collect();
1427 let row_group_size = row_group_reader.metadata().total_byte_size();
1428 let uncompressed_size: i64 = row_group_reader
1429 .metadata()
1430 .columns()
1431 .iter()
1432 .map(|v| v.uncompressed_size())
1433 .sum();
1434 assert_eq!(row_group_size, uncompressed_size);
1435 assert_eq!(res, *item);
1436 }
1437 file_metadata
1438 }
1439
1440 fn test_file_roundtrip(file: File, data: Vec<Vec<i32>>) -> crate::format::FileMetaData {
1443 test_roundtrip_i32::<File, File>(file, data, Compression::UNCOMPRESSED)
1444 }
1445
1446 #[test]
1447 fn test_bytes_writer_empty_row_groups() {
1448 test_bytes_roundtrip(vec![], Compression::UNCOMPRESSED);
1449 }
1450
1451 #[test]
1452 fn test_bytes_writer_single_row_group() {
1453 test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::UNCOMPRESSED);
1454 }
1455
1456 #[test]
1457 fn test_bytes_writer_multiple_row_groups() {
1458 test_bytes_roundtrip(
1459 vec![
1460 vec![1, 2, 3, 4, 5],
1461 vec![1, 2, 3],
1462 vec![1],
1463 vec![1, 2, 3, 4, 5, 6],
1464 ],
1465 Compression::UNCOMPRESSED,
1466 );
1467 }
1468
1469 #[test]
1470 fn test_bytes_writer_single_row_group_compressed() {
1471 test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::SNAPPY);
1472 }
1473
1474 #[test]
1475 fn test_bytes_writer_multiple_row_groups_compressed() {
1476 test_bytes_roundtrip(
1477 vec![
1478 vec![1, 2, 3, 4, 5],
1479 vec![1, 2, 3],
1480 vec![1],
1481 vec![1, 2, 3, 4, 5, 6],
1482 ],
1483 Compression::SNAPPY,
1484 );
1485 }
1486
1487 fn test_bytes_roundtrip(data: Vec<Vec<i32>>, compression: Compression) {
1488 test_roundtrip_i32::<Vec<u8>, Bytes>(Vec::with_capacity(1024), data, compression);
1489 }
1490
1491 #[test]
1492 fn test_boolean_roundtrip() {
1493 let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
1494 test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
1495 Vec::with_capacity(1024),
1496 vec![my_bool_values],
1497 |r| r.get_bool(0).unwrap(),
1498 Compression::UNCOMPRESSED,
1499 );
1500 }
1501
1502 #[test]
1503 fn test_boolean_compressed_roundtrip() {
1504 let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
1505 test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
1506 Vec::with_capacity(1024),
1507 vec![my_bool_values],
1508 |r| r.get_bool(0).unwrap(),
1509 Compression::SNAPPY,
1510 );
1511 }
1512
1513 #[test]
1514 fn test_column_offset_index_file() {
1515 let file = tempfile::tempfile().unwrap();
1516 let file_metadata = test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
1517 file_metadata.row_groups.iter().for_each(|row_group| {
1518 row_group.columns.iter().for_each(|column_chunk| {
1519 assert_ne!(None, column_chunk.column_index_offset);
1520 assert_ne!(None, column_chunk.column_index_length);
1521
1522 assert_ne!(None, column_chunk.offset_index_offset);
1523 assert_ne!(None, column_chunk.offset_index_length);
1524 })
1525 });
1526 }
1527
1528 fn test_kv_metadata(initial_kv: Option<Vec<KeyValue>>, final_kv: Option<Vec<KeyValue>>) {
1529 let schema = Arc::new(
1530 types::Type::group_type_builder("schema")
1531 .with_fields(vec![Arc::new(
1532 types::Type::primitive_type_builder("col1", Type::INT32)
1533 .with_repetition(Repetition::REQUIRED)
1534 .build()
1535 .unwrap(),
1536 )])
1537 .build()
1538 .unwrap(),
1539 );
1540 let mut out = Vec::with_capacity(1024);
1541 let props = Arc::new(
1542 WriterProperties::builder()
1543 .set_key_value_metadata(initial_kv.clone())
1544 .build(),
1545 );
1546 let mut writer = SerializedFileWriter::new(&mut out, schema, props).unwrap();
1547 let mut row_group_writer = writer.next_row_group().unwrap();
1548 let column = row_group_writer.next_column().unwrap().unwrap();
1549 column.close().unwrap();
1550 row_group_writer.close().unwrap();
1551 if let Some(kvs) = &final_kv {
1552 for kv in kvs {
1553 writer.append_key_value_metadata(kv.clone())
1554 }
1555 }
1556 writer.close().unwrap();
1557
1558 let reader = SerializedFileReader::new(Bytes::from(out)).unwrap();
1559 let metadata = reader.metadata().file_metadata();
1560 let keys = metadata.key_value_metadata();
1561
1562 match (initial_kv, final_kv) {
1563 (Some(a), Some(b)) => {
1564 let keys = keys.unwrap();
1565 assert_eq!(keys.len(), a.len() + b.len());
1566 assert_eq!(&keys[..a.len()], a.as_slice());
1567 assert_eq!(&keys[a.len()..], b.as_slice());
1568 }
1569 (Some(v), None) => assert_eq!(keys.unwrap(), &v),
1570 (None, Some(v)) if !v.is_empty() => assert_eq!(keys.unwrap(), &v),
1571 _ => assert!(keys.is_none()),
1572 }
1573 }
1574
1575 #[test]
1576 fn test_append_metadata() {
1577 let kv1 = KeyValue::new("cupcakes".to_string(), "awesome".to_string());
1578 let kv2 = KeyValue::new("bingo".to_string(), "bongo".to_string());
1579
1580 test_kv_metadata(None, None);
1581 test_kv_metadata(Some(vec![kv1.clone()]), None);
1582 test_kv_metadata(None, Some(vec![kv2.clone()]));
1583 test_kv_metadata(Some(vec![kv1.clone()]), Some(vec![kv2.clone()]));
1584 test_kv_metadata(Some(vec![]), Some(vec![kv2]));
1585 test_kv_metadata(Some(vec![]), Some(vec![]));
1586 test_kv_metadata(Some(vec![kv1]), Some(vec![]));
1587 test_kv_metadata(None, Some(vec![]));
1588 }
1589
1590 #[test]
1591 fn test_backwards_compatible_statistics() {
1592 let message_type = "
1593 message test_schema {
1594 REQUIRED INT32 decimal1 (DECIMAL(8,2));
1595 REQUIRED INT32 i32 (INTEGER(32,true));
1596 REQUIRED INT32 u32 (INTEGER(32,false));
1597 }
1598 ";
1599
1600 let schema = Arc::new(parse_message_type(message_type).unwrap());
1601 let props = Default::default();
1602 let mut writer = SerializedFileWriter::new(vec![], schema, props).unwrap();
1603 let mut row_group_writer = writer.next_row_group().unwrap();
1604
1605 for _ in 0..3 {
1606 let mut writer = row_group_writer.next_column().unwrap().unwrap();
1607 writer
1608 .typed::<Int32Type>()
1609 .write_batch(&[1, 2, 3], None, None)
1610 .unwrap();
1611 writer.close().unwrap();
1612 }
1613 let metadata = row_group_writer.close().unwrap();
1614 writer.close().unwrap();
1615
1616 let thrift = metadata.to_thrift();
1617 let encoded_stats: Vec<_> = thrift
1618 .columns
1619 .into_iter()
1620 .map(|x| x.meta_data.unwrap().statistics.unwrap())
1621 .collect();
1622
1623 let s = &encoded_stats[0];
1625 assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1626 assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1627 assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1628 assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1629
1630 let s = &encoded_stats[1];
1632 assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1633 assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1634 assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1635 assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1636
1637 let s = &encoded_stats[2];
1639 assert_eq!(s.min.as_deref(), None);
1640 assert_eq!(s.max.as_deref(), None);
1641 assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1642 assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1643 }
1644
1645 #[test]
1646 fn test_spliced_write() {
1647 let message_type = "
1648 message test_schema {
1649 REQUIRED INT32 i32 (INTEGER(32,true));
1650 REQUIRED INT32 u32 (INTEGER(32,false));
1651 }
1652 ";
1653 let schema = Arc::new(parse_message_type(message_type).unwrap());
1654 let props = Arc::new(WriterProperties::builder().build());
1655
1656 let mut file = Vec::with_capacity(1024);
1657 let mut file_writer = SerializedFileWriter::new(&mut file, schema, props.clone()).unwrap();
1658
1659 let columns = file_writer.descr.columns();
1660 let mut column_state: Vec<(_, Option<ColumnCloseResult>)> = columns
1661 .iter()
1662 .map(|_| (TrackedWrite::new(Vec::with_capacity(1024)), None))
1663 .collect();
1664
1665 let mut column_state_slice = column_state.as_mut_slice();
1666 let mut column_writers = Vec::with_capacity(columns.len());
1667 for c in columns {
1668 let ((buf, out), tail) = column_state_slice.split_first_mut().unwrap();
1669 column_state_slice = tail;
1670
1671 let page_writer = Box::new(SerializedPageWriter::new(buf));
1672 let col_writer = get_column_writer(c.clone(), props.clone(), page_writer);
1673 column_writers.push(SerializedColumnWriter::new(
1674 col_writer,
1675 Some(Box::new(|on_close| {
1676 *out = Some(on_close);
1677 Ok(())
1678 })),
1679 ));
1680 }
1681
1682 let column_data = [[1, 2, 3, 4], [7, 3, 7, 3]];
1683
1684 for (writer, batch) in column_writers.iter_mut().zip(column_data) {
1686 let writer = writer.typed::<Int32Type>();
1687 writer.write_batch(&batch, None, None).unwrap();
1688 }
1689
1690 for writer in column_writers {
1692 writer.close().unwrap()
1693 }
1694
1695 let mut row_group_writer = file_writer.next_row_group().unwrap();
1697 for (write, close) in column_state {
1698 let buf = Bytes::from(write.into_inner().unwrap());
1699 row_group_writer
1700 .append_column(&buf, close.unwrap())
1701 .unwrap();
1702 }
1703 row_group_writer.close().unwrap();
1704 file_writer.close().unwrap();
1705
1706 let file = Bytes::from(file);
1708 let test_read = |reader: SerializedFileReader<Bytes>| {
1709 let row_group = reader.get_row_group(0).unwrap();
1710
1711 let mut out = Vec::with_capacity(4);
1712 let c1 = row_group.get_column_reader(0).unwrap();
1713 let mut c1 = get_typed_column_reader::<Int32Type>(c1);
1714 c1.read_records(4, None, None, &mut out).unwrap();
1715 assert_eq!(out, column_data[0]);
1716
1717 out.clear();
1718
1719 let c2 = row_group.get_column_reader(1).unwrap();
1720 let mut c2 = get_typed_column_reader::<Int32Type>(c2);
1721 c2.read_records(4, None, None, &mut out).unwrap();
1722 assert_eq!(out, column_data[1]);
1723 };
1724
1725 let reader = SerializedFileReader::new(file.clone()).unwrap();
1726 test_read(reader);
1727
1728 let options = ReadOptionsBuilder::new().with_page_index().build();
1729 let reader = SerializedFileReader::new_with_options(file, options).unwrap();
1730 test_read(reader);
1731 }
1732
1733 #[test]
1734 fn test_disabled_statistics() {
1735 let message_type = "
1736 message test_schema {
1737 REQUIRED INT32 a;
1738 REQUIRED INT32 b;
1739 }
1740 ";
1741 let schema = Arc::new(parse_message_type(message_type).unwrap());
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
            .set_offset_index_disabled(true)
            .build();
1747 let mut file = Vec::with_capacity(1024);
1748 let mut file_writer =
1749 SerializedFileWriter::new(&mut file, schema, Arc::new(props)).unwrap();
1750
1751 let mut row_group_writer = file_writer.next_row_group().unwrap();
1752 let mut a_writer = row_group_writer.next_column().unwrap().unwrap();
1753 let col_writer = a_writer.typed::<Int32Type>();
1754 col_writer.write_batch(&[1, 2, 3], None, None).unwrap();
1755 a_writer.close().unwrap();
1756
1757 let mut b_writer = row_group_writer.next_column().unwrap().unwrap();
1758 let col_writer = b_writer.typed::<Int32Type>();
1759 col_writer.write_batch(&[4, 5, 6], None, None).unwrap();
1760 b_writer.close().unwrap();
1761 row_group_writer.close().unwrap();
1762
1763 let metadata = file_writer.finish().unwrap();
1764 assert_eq!(metadata.row_groups.len(), 1);
1765 let row_group = &metadata.row_groups[0];
1766 assert_eq!(row_group.columns.len(), 2);
1767 assert!(row_group.columns[0].offset_index_offset.is_some());
1769 assert!(row_group.columns[0].column_index_offset.is_some());
1770 assert!(row_group.columns[1].offset_index_offset.is_some());
1772 assert!(row_group.columns[1].column_index_offset.is_none());
1773
1774 let err = file_writer.next_row_group().err().unwrap().to_string();
1775 assert_eq!(err, "Parquet error: SerializedFileWriter already finished");
1776
1777 drop(file_writer);
1778
1779 let options = ReadOptionsBuilder::new().with_page_index().build();
1780 let reader = SerializedFileReader::new_with_options(Bytes::from(file), options).unwrap();
1781
1782 let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1);
        assert_eq!(offset_index[0].len(), 2);

        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1);
        assert_eq!(column_index[0].len(), 2);

        let a_idx = &column_index[0][0];
1791 assert!(matches!(a_idx, Index::INT32(_)), "{a_idx:?}");
1792 let b_idx = &column_index[0][1];
1793 assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
1794 }
1795
1796 #[test]
1797 fn test_byte_array_size_statistics() {
1798 let message_type = "
1799 message test_schema {
1800 OPTIONAL BYTE_ARRAY a (UTF8);
1801 }
1802 ";
1803 let schema = Arc::new(parse_message_type(message_type).unwrap());
1804 let data = ByteArrayType::gen_vec(32, 7);
1805 let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
1806 let unenc_size: i64 = data.iter().map(|x| x.len() as i64).sum();
1807 let file: File = tempfile::tempfile().unwrap();
1808 let props = Arc::new(
1809 WriterProperties::builder()
1810 .set_statistics_enabled(EnabledStatistics::Page)
1811 .build(),
1812 );
1813
1814 let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
1815 let mut row_group_writer = writer.next_row_group().unwrap();
1816
1817 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
1818 col_writer
1819 .typed::<ByteArrayType>()
1820 .write_batch(&data, Some(&def_levels), None)
1821 .unwrap();
1822 col_writer.close().unwrap();
1823 row_group_writer.close().unwrap();
1824 let file_metadata = writer.close().unwrap();
1825
1826 assert_eq!(file_metadata.row_groups.len(), 1);
1827 assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
1828 assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
1829
1830 let check_def_hist = |def_hist: &[i64]| {
1831 assert_eq!(def_hist.len(), 2);
1832 assert_eq!(def_hist[0], 3);
1833 assert_eq!(def_hist[1], 7);
1834 };
1835
1836 assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
1837 let meta_data = file_metadata.row_groups[0].columns[0]
1838 .meta_data
1839 .as_ref()
1840 .unwrap();
1841 assert!(meta_data.size_statistics.is_some());
1842 let size_stats = meta_data.size_statistics.as_ref().unwrap();
1843
1844 assert!(size_stats.repetition_level_histogram.is_none());
1845 assert!(size_stats.definition_level_histogram.is_some());
1846 assert!(size_stats.unencoded_byte_array_data_bytes.is_some());
1847 assert_eq!(
1848 unenc_size,
1849 size_stats.unencoded_byte_array_data_bytes.unwrap()
1850 );
1851 check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());
1852
1853 let options = ReadOptionsBuilder::new().with_page_index().build();
1855 let reader = SerializedFileReader::new_with_options(file, options).unwrap();
1856
1857 let rfile_metadata = reader.metadata().file_metadata();
1858 assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
1859 assert_eq!(reader.num_row_groups(), 1);
1860 let rowgroup = reader.get_row_group(0).unwrap();
1861 assert_eq!(rowgroup.num_columns(), 1);
1862 let column = rowgroup.metadata().column(0);
1863 assert!(column.definition_level_histogram().is_some());
1864 assert!(column.repetition_level_histogram().is_none());
1865 assert!(column.unencoded_byte_array_data_bytes().is_some());
1866 check_def_hist(column.definition_level_histogram().unwrap().values());
1867 assert_eq!(
1868 unenc_size,
1869 column.unencoded_byte_array_data_bytes().unwrap()
1870 );
1871
1872 assert!(reader.metadata().column_index().is_some());
1874 let column_index = reader.metadata().column_index().unwrap();
1875 assert_eq!(column_index.len(), 1);
1876 assert_eq!(column_index[0].len(), 1);
1877 let col_idx = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
1878 assert_eq!(index.indexes.len(), 1);
1879 &index.indexes[0]
1880 } else {
1881 unreachable!()
1882 };
1883
1884 assert!(col_idx.repetition_level_histogram().is_none());
1885 assert!(col_idx.definition_level_histogram().is_some());
1886 check_def_hist(col_idx.definition_level_histogram().unwrap().values());
1887
1888 assert!(reader.metadata().offset_index().is_some());
1889 let offset_index = reader.metadata().offset_index().unwrap();
1890 assert_eq!(offset_index.len(), 1);
1891 assert_eq!(offset_index[0].len(), 1);
1892 assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_some());
1893 let page_sizes = offset_index[0][0]
1894 .unencoded_byte_array_data_bytes
1895 .as_ref()
1896 .unwrap();
1897 assert_eq!(page_sizes.len(), 1);
1898 assert_eq!(page_sizes[0], unenc_size);
1899 }
1900
1901 #[test]
1902 fn test_too_many_rowgroups() {
1903 let message_type = "
1904 message test_schema {
1905 REQUIRED BYTE_ARRAY a (UTF8);
1906 }
1907 ";
1908 let schema = Arc::new(parse_message_type(message_type).unwrap());
1909 let file: File = tempfile::tempfile().unwrap();
1910 let props = Arc::new(
1911 WriterProperties::builder()
1912 .set_statistics_enabled(EnabledStatistics::None)
1913 .set_max_row_group_size(1)
1914 .build(),
1915 );
1916 let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
1917
1918 for i in 0..0x8001 {
1920 match writer.next_row_group() {
1921 Ok(mut row_group_writer) => {
1922 assert_ne!(i, 0x8000);
1923 let col_writer = row_group_writer.next_column().unwrap().unwrap();
1924 col_writer.close().unwrap();
1925 row_group_writer.close().unwrap();
1926 }
1927 Err(e) => {
1928 assert_eq!(i, 0x8000);
1929 assert_eq!(
1930 e.to_string(),
1931 "Parquet error: Parquet does not support more than 32767 row groups per file (currently: 32768)"
1932 );
1933 }
1934 }
1935 }
1936 writer.close().unwrap();
1937 }
1938
1939 #[test]
1940 fn test_size_statistics_with_repetition_and_nulls() {
1941 let message_type = "
1942 message test_schema {
1943 OPTIONAL group i32_list (LIST) {
1944 REPEATED group list {
1945 OPTIONAL INT32 element;
1946 }
1947 }
1948 }
1949 ";
1950 let schema = Arc::new(parse_message_type(message_type).unwrap());
1957 let data = [1, 2, 4, 7, 8, 9, 10];
1958 let def_levels = [3, 3, 0, 3, 2, 1, 3, 3, 3, 3];
1959 let rep_levels = [0, 1, 0, 0, 1, 0, 0, 1, 1, 1];
1960 let file = tempfile::tempfile().unwrap();
1961 let props = Arc::new(
1962 WriterProperties::builder()
1963 .set_statistics_enabled(EnabledStatistics::Page)
1964 .build(),
1965 );
1966 let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
1967 let mut row_group_writer = writer.next_row_group().unwrap();
1968
1969 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
1970 col_writer
1971 .typed::<Int32Type>()
1972 .write_batch(&data, Some(&def_levels), Some(&rep_levels))
1973 .unwrap();
1974 col_writer.close().unwrap();
1975 row_group_writer.close().unwrap();
1976 let file_metadata = writer.close().unwrap();
1977
1978 assert_eq!(file_metadata.row_groups.len(), 1);
1979 assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
1980 assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
1981
1982 let check_def_hist = |def_hist: &[i64]| {
1983 assert_eq!(def_hist.len(), 4);
1984 assert_eq!(def_hist[0], 1);
1985 assert_eq!(def_hist[1], 1);
1986 assert_eq!(def_hist[2], 1);
1987 assert_eq!(def_hist[3], 7);
1988 };
1989
1990 let check_rep_hist = |rep_hist: &[i64]| {
1991 assert_eq!(rep_hist.len(), 2);
1992 assert_eq!(rep_hist[0], 5);
1993 assert_eq!(rep_hist[1], 5);
1994 };
1995
1996 assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
1999 let meta_data = file_metadata.row_groups[0].columns[0]
2000 .meta_data
2001 .as_ref()
2002 .unwrap();
2003 assert!(meta_data.size_statistics.is_some());
2004 let size_stats = meta_data.size_statistics.as_ref().unwrap();
2005 assert!(size_stats.repetition_level_histogram.is_some());
2006 assert!(size_stats.definition_level_histogram.is_some());
2007 assert!(size_stats.unencoded_byte_array_data_bytes.is_none());
2008 check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());
2009 check_rep_hist(size_stats.repetition_level_histogram.as_ref().unwrap());
2010
2011 let options = ReadOptionsBuilder::new().with_page_index().build();
2013 let reader = SerializedFileReader::new_with_options(file, options).unwrap();
2014
2015 let rfile_metadata = reader.metadata().file_metadata();
2016 assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
2017 assert_eq!(reader.num_row_groups(), 1);
2018 let rowgroup = reader.get_row_group(0).unwrap();
2019 assert_eq!(rowgroup.num_columns(), 1);
2020 let column = rowgroup.metadata().column(0);
2021 assert!(column.definition_level_histogram().is_some());
2022 assert!(column.repetition_level_histogram().is_some());
2023 assert!(column.unencoded_byte_array_data_bytes().is_none());
2024 check_def_hist(column.definition_level_histogram().unwrap().values());
2025 check_rep_hist(column.repetition_level_histogram().unwrap().values());
2026
2027 assert!(reader.metadata().column_index().is_some());
2029 let column_index = reader.metadata().column_index().unwrap();
2030 assert_eq!(column_index.len(), 1);
2031 assert_eq!(column_index[0].len(), 1);
2032 let col_idx = if let Index::INT32(index) = &column_index[0][0] {
2033 assert_eq!(index.indexes.len(), 1);
2034 &index.indexes[0]
2035 } else {
2036 unreachable!()
2037 };
2038
2039 check_def_hist(col_idx.definition_level_histogram().unwrap().values());
2040 check_rep_hist(col_idx.repetition_level_histogram().unwrap().values());
2041
2042 assert!(reader.metadata().offset_index().is_some());
2043 let offset_index = reader.metadata().offset_index().unwrap();
2044 assert_eq!(offset_index.len(), 1);
2045 assert_eq!(offset_index[0].len(), 1);
2046 assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_none());
2047 }
2048
2049 #[test]
2050 #[cfg(feature = "arrow")]
2051 fn test_byte_stream_split_extended_roundtrip() {
2052 let path = format!(
2053 "{}/byte_stream_split_extended.gzip.parquet",
2054 arrow::util::test_util::parquet_test_data(),
2055 );
2056 let file = File::open(path).unwrap();
2057
2058 let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)
2060 .expect("parquet open")
2061 .build()
2062 .expect("parquet open");
2063
2064 let file = tempfile::tempfile().unwrap();
2065 let props = WriterProperties::builder()
2066 .set_dictionary_enabled(false)
2067 .set_column_encoding(
2068 ColumnPath::from("float16_byte_stream_split"),
2069 Encoding::BYTE_STREAM_SPLIT,
2070 )
2071 .set_column_encoding(
2072 ColumnPath::from("float_byte_stream_split"),
2073 Encoding::BYTE_STREAM_SPLIT,
2074 )
2075 .set_column_encoding(
2076 ColumnPath::from("double_byte_stream_split"),
2077 Encoding::BYTE_STREAM_SPLIT,
2078 )
2079 .set_column_encoding(
2080 ColumnPath::from("int32_byte_stream_split"),
2081 Encoding::BYTE_STREAM_SPLIT,
2082 )
2083 .set_column_encoding(
2084 ColumnPath::from("int64_byte_stream_split"),
2085 Encoding::BYTE_STREAM_SPLIT,
2086 )
2087 .set_column_encoding(
2088 ColumnPath::from("flba5_byte_stream_split"),
2089 Encoding::BYTE_STREAM_SPLIT,
2090 )
2091 .set_column_encoding(
2092 ColumnPath::from("decimal_byte_stream_split"),
2093 Encoding::BYTE_STREAM_SPLIT,
2094 )
2095 .build();
2096
2097 let mut parquet_writer = ArrowWriter::try_new(
2098 file.try_clone().expect("cannot open file"),
2099 parquet_reader.schema(),
2100 Some(props),
2101 )
2102 .expect("create arrow writer");
2103
2104 for maybe_batch in parquet_reader {
2105 let batch = maybe_batch.expect("reading batch");
2106 parquet_writer.write(&batch).expect("writing data");
2107 }
2108
2109 parquet_writer.close().expect("finalizing file");
2110
2111 let reader = SerializedFileReader::new(file).expect("Failed to create reader");
2112 let filemeta = reader.metadata();
2113
2114 let check_encoding = |x: usize, filemeta: &ParquetMetaData| {
2116 assert!(filemeta
2117 .row_group(0)
2118 .column(x)
2119 .encodings()
2120 .contains(&Encoding::BYTE_STREAM_SPLIT));
2121 };
2122
2123 check_encoding(1, filemeta);
2124 check_encoding(3, filemeta);
2125 check_encoding(5, filemeta);
2126 check_encoding(7, filemeta);
2127 check_encoding(9, filemeta);
2128 check_encoding(11, filemeta);
2129 check_encoding(13, filemeta);
2130
2131 let mut iter = reader
2133 .get_row_iter(None)
2134 .expect("Failed to create row iterator");
2135
2136 let mut start = 0;
2137 let end = reader.metadata().file_metadata().num_rows();
2138
2139 let check_row = |row: Result<Row, ParquetError>| {
2140 assert!(row.is_ok());
2141 let r = row.unwrap();
2142 assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2143 assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2144 assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2145 assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2146 assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2147 assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2148 assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2149 };
2150
2151 while start < end {
2152 match iter.next() {
2153 Some(row) => check_row(row),
2154 None => break,
2155 };
2156 start += 1;
2157 }
2158 }
2159}