1use bytes::Bytes;
21use half::f16;
22
23use crate::bloom_filter::Sbbf;
24use crate::format::{BoundaryOrder, ColumnIndex, OffsetIndex};
25use std::collections::{BTreeSet, VecDeque};
26use std::str;
27
28use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
29use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
30use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
31use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
32use crate::data_type::private::ParquetValueType;
33use crate::data_type::*;
34use crate::encodings::levels::LevelEncoder;
35use crate::errors::{ParquetError, Result};
36use crate::file::metadata::{ColumnIndexBuilder, LevelHistogram, OffsetIndexBuilder};
37use crate::file::properties::EnabledStatistics;
38use crate::file::statistics::{Statistics, ValueStatistics};
39use crate::file::{
40 metadata::ColumnChunkMetaData,
41 properties::{WriterProperties, WriterPropertiesPtr, WriterVersion},
42};
43use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
44
45pub(crate) mod encoder;
46
/// Dispatches over every `ColumnWriter` variant.
///
/// Matches `$writer` against each variant, binds the wrapped typed writer to
/// the identifier `$inner`, and evaluates `$body` with that binding in scope.
/// Must be expanded where `Self` names the `ColumnWriter` enum, because every
/// arm is written as `Self::<Variant>`.
macro_rules! downcast_writer {
    ($writer:expr, $inner:ident, $body:expr) => {
        match $writer {
            Self::BoolColumnWriter($inner) => $body,
            Self::Int32ColumnWriter($inner) => $body,
            Self::Int64ColumnWriter($inner) => $body,
            Self::Int96ColumnWriter($inner) => $body,
            Self::FloatColumnWriter($inner) => $body,
            Self::DoubleColumnWriter($inner) => $body,
            Self::ByteArrayColumnWriter($inner) => $body,
            Self::FixedLenByteArrayColumnWriter($inner) => $body,
        }
    };
}
61
/// Column writer for a Parquet column.
///
/// There is one variant per physical type, each wrapping the
/// [`ColumnWriterImpl`] specialised for the corresponding data type.
pub enum ColumnWriter<'a> {
    /// Column writer for the boolean type.
    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
    /// Column writer for the int32 type.
    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
    /// Column writer for the int64 type.
    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
    /// Column writer for the int96 type.
    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
    /// Column writer for the float type.
    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
    /// Column writer for the double type.
    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
    /// Column writer for the byte array type.
    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
    /// Column writer for the fixed-length byte array type.
    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
}
81
82impl ColumnWriter<'_> {
83 #[cfg(feature = "arrow")]
85 pub(crate) fn memory_size(&self) -> usize {
86 downcast_writer!(self, typed, typed.memory_size())
87 }
88
89 #[cfg(feature = "arrow")]
91 pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
92 downcast_writer!(self, typed, typed.get_estimated_total_bytes())
93 }
94
95 pub fn close(self) -> Result<ColumnCloseResult> {
97 downcast_writer!(self, typed, typed.close())
98 }
99}
100
/// Deprecated marker enum with the two variants `Page` and `Column`.
///
/// See the deprecation note below; the variants are intentionally left
/// undocumented (`missing_docs` is allowed for this item).
#[deprecated(
    since = "54.0.0",
    note = "Seems like a stray and nobody knows what's it for. Will be removed in the next release."
)]
#[allow(missing_docs)]
pub enum Level {
    Page,
    Column,
}
110
111pub fn get_column_writer<'a>(
113 descr: ColumnDescPtr,
114 props: WriterPropertiesPtr,
115 page_writer: Box<dyn PageWriter + 'a>,
116) -> ColumnWriter<'a> {
117 match descr.physical_type() {
118 Type::BOOLEAN => {
119 ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
120 }
121 Type::INT32 => {
122 ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
123 }
124 Type::INT64 => {
125 ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
126 }
127 Type::INT96 => {
128 ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
129 }
130 Type::FLOAT => {
131 ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
132 }
133 Type::DOUBLE => {
134 ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
135 }
136 Type::BYTE_ARRAY => {
137 ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
138 }
139 Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
140 ColumnWriterImpl::new(descr, props, page_writer),
141 ),
142 }
143}
144
145pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
150 T::get_column_writer(col_writer).unwrap_or_else(|| {
151 panic!(
152 "Failed to convert column writer into a typed column writer for `{}` type",
153 T::get_physical_type()
154 )
155 })
156}
157
158pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
160 col_writer: &'b ColumnWriter<'a>,
161) -> &'b ColumnWriterImpl<'a, T> {
162 T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
163 panic!(
164 "Failed to convert column writer into a typed column writer for `{}` type",
165 T::get_physical_type()
166 )
167 })
168}
169
170pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
172 col_writer: &'a mut ColumnWriter<'b>,
173) -> &'a mut ColumnWriterImpl<'b, T> {
174 T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
175 panic!(
176 "Failed to convert column writer into a typed column writer for `{}` type",
177 T::get_physical_type()
178 )
179 })
180}
181
/// Everything produced by closing a column writer.
#[derive(Debug, Clone)]
pub struct ColumnCloseResult {
    /// Total number of bytes written for this column chunk.
    pub bytes_written: u64,
    /// Total number of rows written for this column chunk.
    pub rows_written: u64,
    /// Metadata for this column chunk.
    pub metadata: ColumnChunkMetaData,
    /// Bloom filter for this column, if the encoder produced one.
    pub bloom_filter: Option<Sbbf>,
    /// Column index, if the column index builder was still valid at close.
    pub column_index: Option<ColumnIndex>,
    /// Offset index, if an offset index builder was in use.
    pub offset_index: Option<OffsetIndex>,
}
198
/// Counters and histograms for the data page currently being buffered.
#[derive(Default)]
struct PageMetrics {
    /// Number of levels (values including nulls) buffered for the page.
    num_buffered_values: u32,
    /// Number of rows buffered for the page.
    num_buffered_rows: u32,
    /// Number of buffered definition levels below the maximum, i.e. nulls.
    num_page_nulls: u64,
    /// Optional histogram of the page's repetition levels.
    repetition_level_histogram: Option<LevelHistogram>,
    /// Optional histogram of the page's definition levels.
    definition_level_histogram: Option<LevelHistogram>,
}
208
209impl PageMetrics {
210 fn new() -> Self {
211 Default::default()
212 }
213
214 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
216 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
217 self
218 }
219
220 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
222 self.definition_level_histogram = LevelHistogram::try_new(max_level);
223 self
224 }
225
226 fn new_page(&mut self) {
229 self.num_buffered_values = 0;
230 self.num_buffered_rows = 0;
231 self.num_page_nulls = 0;
232 self.repetition_level_histogram
233 .as_mut()
234 .map(LevelHistogram::reset);
235 self.definition_level_histogram
236 .as_mut()
237 .map(LevelHistogram::reset);
238 }
239
240 fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
242 if let Some(ref mut rep_hist) = self.repetition_level_histogram {
243 rep_hist.update_from_levels(levels);
244 }
245 }
246
247 fn update_definition_level_histogram(&mut self, levels: &[i16]) {
249 if let Some(ref mut def_hist) = self.definition_level_histogram {
250 def_hist.update_from_levels(levels);
251 }
252 }
253}
254
/// Counters, statistics and histograms accumulated for the whole column chunk.
#[derive(Default)]
struct ColumnMetrics<T: Default> {
    /// Sum of `bytes_written` over every page handed to the page writer.
    total_bytes_written: u64,
    /// Number of rows in all data pages produced so far.
    total_rows_written: u64,
    /// Sum of the uncompressed sizes of all written pages.
    total_uncompressed_size: u64,
    /// Sum of the compressed sizes of all written pages.
    total_compressed_size: u64,
    /// Sum of `num_values` over all written data pages.
    total_num_values: u64,
    /// Offset of the dictionary page, once one has been written.
    dictionary_page_offset: Option<u64>,
    /// Offset of the first written data page.
    data_page_offset: Option<u64>,
    /// Smallest value seen so far, if any.
    min_column_value: Option<T>,
    /// Largest value seen so far, if any.
    max_column_value: Option<T>,
    /// Number of nulls across all data pages produced so far.
    num_column_nulls: u64,
    /// Caller-supplied distinct count, when it is still considered usable.
    column_distinct_count: Option<u64>,
    /// Running total of the per-page variable-length byte counts, if any page
    /// reported one.
    variable_length_bytes: Option<i64>,
    /// Optional chunk-level histogram of repetition levels.
    repetition_level_histogram: Option<LevelHistogram>,
    /// Optional chunk-level histogram of definition levels.
    definition_level_histogram: Option<LevelHistogram>,
}
273
274impl<T: Default> ColumnMetrics<T> {
275 fn new() -> Self {
276 Default::default()
277 }
278
279 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
281 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
282 self
283 }
284
285 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
287 self.definition_level_histogram = LevelHistogram::try_new(max_level);
288 self
289 }
290
291 fn update_histogram(
293 chunk_histogram: &mut Option<LevelHistogram>,
294 page_histogram: &Option<LevelHistogram>,
295 ) {
296 if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
297 chunk_hist.add(page_hist);
298 }
299 }
300
301 fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
304 ColumnMetrics::<T>::update_histogram(
305 &mut self.definition_level_histogram,
306 &page_metrics.definition_level_histogram,
307 );
308 ColumnMetrics::<T>::update_histogram(
309 &mut self.repetition_level_histogram,
310 &page_metrics.repetition_level_histogram,
311 );
312 }
313
314 fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
316 if let Some(var_bytes) = variable_length_bytes {
317 *self.variable_length_bytes.get_or_insert(0) += var_bytes;
318 }
319 }
320}
321
/// Typed column writer for a primitive column: a [`GenericColumnWriter`]
/// using `ColumnValueEncoderImpl<T>` as its value encoder.
pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
324
/// Column writer that is generic over the value encoder `E`.
///
/// Buffers levels and values, turns them into (optionally compressed) data
/// pages, and hands the pages to the underlying [`PageWriter`].
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
    // Column writer properties
    /// Descriptor of the column being written.
    descr: ColumnDescPtr,
    /// Writer properties (batch size, page limits, writer version, ...).
    props: WriterPropertiesPtr,
    /// Statistics level configured for this column's path.
    statistics_enabled: EnabledStatistics,

    /// Destination for finished pages.
    page_writer: Box<dyn PageWriter + 'a>,
    /// Compression configured for this column's path.
    codec: Compression,
    /// Compressor created for `codec`; `None` means pages are written
    /// without compression.
    compressor: Option<Box<dyn Codec>>,
    /// Encoder for the column values.
    encoder: E,

    /// Metrics for the page currently being buffered.
    page_metrics: PageMetrics,
    /// Metrics accumulated for the whole column chunk.
    column_metrics: ColumnMetrics<E::T>,

    /// Encodings used by this column chunk; seeded with `RLE` and extended
    /// with the encoding of every written page.
    encodings: BTreeSet<Encoding>,
    /// Definition levels buffered for the current page.
    def_levels_sink: Vec<i16>,
    /// Repetition levels buffered for the current page.
    rep_levels_sink: Vec<i16>,
    /// Data pages held back while the encoder still has a dictionary.
    data_pages: VecDeque<CompressedPage>,
    /// Builder for the column index.
    column_index_builder: ColumnIndexBuilder,
    /// Builder for the offset index; `None` when the offset index is disabled.
    offset_index_builder: Option<OffsetIndexBuilder>,

    /// Stays `true` while page min/max values have not been seen to decrease.
    data_page_boundary_ascending: bool,
    /// Stays `true` while page min/max values have not been seen to increase.
    data_page_boundary_descending: bool,
    /// Min and max of the most recent page that contributed statistics to the
    /// column index.
    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}
359
360impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
361 pub fn new(
363 descr: ColumnDescPtr,
364 props: WriterPropertiesPtr,
365 page_writer: Box<dyn PageWriter + 'a>,
366 ) -> Self {
367 let codec = props.compression(descr.path());
368 let codec_options = CodecOptionsBuilder::default().build();
369 let compressor = create_codec(codec, &codec_options).unwrap();
370 let encoder = E::try_new(&descr, props.as_ref()).unwrap();
371
372 let statistics_enabled = props.statistics_enabled(descr.path());
373
374 let mut encodings = BTreeSet::new();
375 encodings.insert(Encoding::RLE);
377
378 let mut page_metrics = PageMetrics::new();
379 let mut column_metrics = ColumnMetrics::<E::T>::new();
380
381 if statistics_enabled != EnabledStatistics::None {
383 page_metrics = page_metrics
384 .with_repetition_level_histogram(descr.max_rep_level())
385 .with_definition_level_histogram(descr.max_def_level());
386 column_metrics = column_metrics
387 .with_repetition_level_histogram(descr.max_rep_level())
388 .with_definition_level_histogram(descr.max_def_level())
389 }
390
391 let mut column_index_builder = ColumnIndexBuilder::new();
393 if statistics_enabled != EnabledStatistics::Page {
394 column_index_builder.to_invalid()
395 }
396
397 let offset_index_builder = match props.offset_index_disabled() {
399 false => Some(OffsetIndexBuilder::new()),
400 _ => None,
401 };
402
403 Self {
404 descr,
405 props,
406 statistics_enabled,
407 page_writer,
408 codec,
409 compressor,
410 encoder,
411 def_levels_sink: vec![],
412 rep_levels_sink: vec![],
413 data_pages: VecDeque::new(),
414 page_metrics,
415 column_metrics,
416 column_index_builder,
417 offset_index_builder,
418 encodings,
419 data_page_boundary_ascending: true,
420 data_page_boundary_descending: true,
421 last_non_null_data_page_min_max: None,
422 }
423 }
424
/// Writes a batch of values together with optional levels and
/// caller-supplied statistics, splitting the work into mini-batches.
///
/// * `values` - values to encode.
/// * `value_indices` - when present, indices into `values` selecting what to
///   write; the encoder's gather path is used.
/// * `def_levels` / `rep_levels` - optional definition and repetition levels;
///   when both are given they must have the same length.
/// * `min` / `max` - optional pre-computed values merged into the chunk
///   min/max.
/// * `distinct_count` - optional pre-computed distinct count.
///
/// Returns the total number of values written.
///
/// # Errors
///
/// Returns an error if the level slices differ in length, or if writing any
/// mini-batch fails.
#[allow(clippy::too_many_arguments)]
pub(crate) fn write_batch_internal(
    &mut self,
    values: &E::Values,
    value_indices: Option<&[usize]>,
    def_levels: Option<&[i16]>,
    rep_levels: Option<&[i16]>,
    min: Option<&E::T>,
    max: Option<&E::T>,
    distinct_count: Option<u64>,
) -> Result<usize> {
    // Definition and repetition levels must describe the same number of entries.
    if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
        if def.len() != rep.len() {
            return Err(general_err!(
                "Inconsistent length of definition and repetition levels: {} != {}",
                def.len(),
                rep.len()
            ));
        }
    }

    // The number of levels drives the batching: taken from the definition
    // levels when present, otherwise from the number of values.
    let num_levels = match def_levels {
        Some(def_levels) => def_levels.len(),
        None => values.len(),
    };

    // Merge caller-provided min/max into the chunk-level statistics.
    if let Some(min) = min {
        update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
    }
    if let Some(max) = max {
        update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
    }

    // The supplied distinct count is only kept when the encoder holds no
    // values at this point; otherwise any stored count is cleared.
    if self.encoder.num_values() == 0 {
        self.column_metrics.column_distinct_count = distinct_count;
    } else {
        self.column_metrics.column_distinct_count = None;
    }

    let mut values_offset = 0;
    let mut levels_offset = 0;
    let base_batch_size = self.props.write_batch_size();
    while levels_offset < num_levels {
        let mut end_offset = num_levels.min(levels_offset + base_batch_size);

        // Extend the mini-batch until the next level starts a new record
        // (repetition level 0), so a record is never split across batches.
        if let Some(r) = rep_levels {
            while end_offset < r.len() && r[end_offset] != 0 {
                end_offset += 1;
            }
        }

        values_offset += self.write_mini_batch(
            values,
            values_offset,
            value_indices,
            end_offset - levels_offset,
            def_levels.map(|lv| &lv[levels_offset..end_offset]),
            rep_levels.map(|lv| &lv[levels_offset..end_offset]),
        )?;
        levels_offset = end_offset;
    }

    // Total number of values processed.
    Ok(values_offset)
}
503
/// Writes a batch of values with optional definition and repetition levels.
///
/// Forwards to `write_batch_internal` with no value indices and no
/// pre-computed statistics. Returns the number of values written.
///
/// # Errors
///
/// Propagates any error from `write_batch_internal`, e.g. level slices of
/// different lengths.
pub fn write_batch(
    &mut self,
    values: &E::Values,
    def_levels: Option<&[i16]>,
    rep_levels: Option<&[i16]>,
) -> Result<usize> {
    self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
}
524
/// Writes a batch of values together with caller-supplied statistics.
///
/// Behaves like `write_batch`, but additionally passes `min`, `max` and
/// `distinct_count` to `write_batch_internal`, which merges `min`/`max` into
/// the chunk-level statistics and keeps `distinct_count` only when the
/// encoder holds no values at the time of the call.
///
/// Returns the number of values written.
pub fn write_batch_with_statistics(
    &mut self,
    values: &E::Values,
    def_levels: Option<&[i16]>,
    rep_levels: Option<&[i16]>,
    min: Option<&E::T>,
    max: Option<&E::T>,
    distinct_count: Option<u64>,
) -> Result<usize> {
    self.write_batch_internal(
        values,
        None,
        def_levels,
        rep_levels,
        min,
        max,
        distinct_count,
    )
}
551
/// Returns the bytes written so far plus the encoder's estimated memory
/// size.
#[cfg(feature = "arrow")]
pub(crate) fn memory_size(&self) -> usize {
    self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
}
560
/// Returns the total number of bytes written by this column writer so far.
///
/// The counter only grows when a page is handed to the page writer, so
/// buffered values and queued pages are not included.
pub fn get_total_bytes_written(&self) -> u64 {
    self.column_metrics.total_bytes_written
}
569
/// Estimates the total bytes this writer will have produced once all current
/// data is flushed.
///
/// Sums the data of queued pages, the bytes already written, and the
/// encoder's estimated data page and dictionary page sizes.
#[cfg(feature = "arrow")]
pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
    let queued_page_bytes: u64 = self
        .data_pages
        .iter()
        .map(|page| page.data().len() as u64)
        .sum();
    let written_bytes = self.column_metrics.total_bytes_written;
    let pending_data_bytes = self.encoder.estimated_data_page_size() as u64;
    let pending_dict_bytes = self.encoder.estimated_dict_page_size().unwrap_or_default() as u64;

    queued_page_bytes + written_bytes + pending_data_bytes + pending_dict_bytes
}
585
/// Returns the total number of rows written by this column writer so far.
///
/// The counter is updated each time a data page is produced, so rows that
/// are still buffered for the current page are not included.
pub fn get_total_rows_written(&self) -> u64 {
    self.column_metrics.total_rows_written
}
591
/// Returns a reference to the descriptor of the column being written.
pub fn get_descriptor(&self) -> &ColumnDescPtr {
    &self.descr
}
596
597 pub fn close(mut self) -> Result<ColumnCloseResult> {
600 if self.page_metrics.num_buffered_values > 0 {
601 self.add_data_page()?;
602 }
603 if self.encoder.has_dictionary() {
604 self.write_dictionary_page()?;
605 }
606 self.flush_data_pages()?;
607 let metadata = self.build_column_metadata()?;
608 self.page_writer.close()?;
609
610 let boundary_order = match (
611 self.data_page_boundary_ascending,
612 self.data_page_boundary_descending,
613 ) {
614 (true, _) => BoundaryOrder::ASCENDING,
617 (false, true) => BoundaryOrder::DESCENDING,
618 (false, false) => BoundaryOrder::UNORDERED,
619 };
620 self.column_index_builder.set_boundary_order(boundary_order);
621
622 let column_index = self
623 .column_index_builder
624 .valid()
625 .then(|| self.column_index_builder.build_to_thrift());
626
627 let offset_index = self.offset_index_builder.map(|b| b.build_to_thrift());
628
629 Ok(ColumnCloseResult {
630 bytes_written: self.column_metrics.total_bytes_written,
631 rows_written: self.column_metrics.total_rows_written,
632 bloom_filter: self.encoder.flush_bloom_filter(),
633 metadata,
634 column_index,
635 offset_index,
636 })
637 }
638
/// Writes one mini-batch of levels and values into the current page buffers.
///
/// * `values_offset` - position in `values` (or in `value_indices`) at which
///   this mini-batch starts.
/// * `num_levels` - number of levels covered by this mini-batch.
/// * `def_levels` / `rep_levels` - level slices for this mini-batch; each is
///   required when the column's corresponding maximum level is greater than 0.
///
/// After buffering, a data page is produced if the page limits are reached,
/// and dictionary fallback is triggered if the dictionary has grown too
/// large.
///
/// Returns the number of values written.
///
/// # Errors
///
/// Returns an error if required levels are missing, if the repetition levels
/// do not start at a record boundary, or if encoding or page writing fails.
fn write_mini_batch(
    &mut self,
    values: &E::Values,
    values_offset: usize,
    value_indices: Option<&[usize]>,
    num_levels: usize,
    def_levels: Option<&[i16]>,
    rep_levels: Option<&[i16]>,
) -> Result<usize> {
    // Process definition levels and determine how many values to write.
    let values_to_write = if self.descr.max_def_level() > 0 {
        // Definition levels are mandatory when max_def_level > 0.
        let levels = def_levels.ok_or_else(|| {
            general_err!(
                "Definition levels are required, because max definition level = {}",
                self.descr.max_def_level()
            )
        })?;

        let mut values_to_write = 0;
        for &level in levels {
            if level == self.descr.max_def_level() {
                values_to_write += 1;
            } else {
                // Anything below the maximum definition level counts as a null.
                self.page_metrics.num_page_nulls += 1
            }
        }

        // Update the page-level histogram.
        self.page_metrics.update_definition_level_histogram(levels);

        self.def_levels_sink.extend_from_slice(levels);
        values_to_write
    } else {
        num_levels
    };

    // Process repetition levels and count the rows in this mini-batch.
    if self.descr.max_rep_level() > 0 {
        // Repetition levels are mandatory when max_rep_level > 0.
        let levels = rep_levels.ok_or_else(|| {
            general_err!(
                "Repetition levels are required, because max repetition level = {}",
                self.descr.max_rep_level()
            )
        })?;

        if !levels.is_empty() && levels[0] != 0 {
            return Err(general_err!(
                "Write must start at a record boundary, got non-zero repetition level of {}",
                levels[0]
            ));
        }

        // Every repetition level of 0 marks the start of a new row.
        for &level in levels {
            self.page_metrics.num_buffered_rows += (level == 0) as u32
        }

        // Update the page-level histogram.
        self.page_metrics.update_repetition_level_histogram(levels);

        self.rep_levels_sink.extend_from_slice(levels);
    } else {
        // Without repetition every level is exactly one row (nulls included).
        self.page_metrics.num_buffered_rows += num_levels as u32;
    }

    match value_indices {
        Some(indices) => {
            let indices = &indices[values_offset..values_offset + values_to_write];
            self.encoder.write_gather(values, indices)?;
        }
        None => self.encoder.write(values, values_offset, values_to_write)?,
    }

    // Buffered "values" are counted in levels, so nulls are included.
    self.page_metrics.num_buffered_values += num_levels as u32;

    if self.should_add_data_page() {
        self.add_data_page()?;
    }

    if self.should_dict_fallback() {
        self.dict_fallback()?;
    }

    Ok(values_to_write)
}
731
732 #[inline]
737 fn should_dict_fallback(&self) -> bool {
738 match self.encoder.estimated_dict_page_size() {
739 Some(size) => size >= self.props.dictionary_page_size_limit(),
740 None => false,
741 }
742 }
743
744 #[inline]
746 fn should_add_data_page(&self) -> bool {
747 if self.page_metrics.num_buffered_values == 0 {
752 return false;
753 }
754
755 self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
756 || self.encoder.estimated_data_page_size() >= self.props.data_page_size_limit()
757 }
758
759 fn dict_fallback(&mut self) -> Result<()> {
762 if self.page_metrics.num_buffered_values > 0 {
764 self.add_data_page()?;
765 }
766 self.write_dictionary_page()?;
767 self.flush_data_pages()?;
768 Ok(())
769 }
770
/// Records the page that is about to be emitted in the column index and the
/// offset index.
///
/// * `page_statistics` - min/max statistics of the page, if available.
/// * `page_variable_length_bytes` - variable-length byte count reported for
///   the page, if any.
///
/// Also tracks whether page min/max values are still ascending and/or
/// descending across pages, which later determines the boundary order.
fn update_column_offset_index(
    &mut self,
    page_statistics: Option<&ValueStatistics<E::T>>,
    page_variable_length_bytes: Option<i64>,
) {
    // A page is treated as a null page when the buffered row count equals the
    // page's null count.
    // NOTE(review): this compares rows, not values; confirm the intended
    // behavior for repeated columns, where the two counts can differ.
    let null_page =
        (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
    if null_page && self.column_index_builder.valid() {
        // Null pages are recorded with empty min and max values.
        self.column_index_builder.append(
            null_page,
            vec![],
            vec![],
            self.page_metrics.num_page_nulls as i64,
        );
    } else if self.column_index_builder.valid() {
        match &page_statistics {
            // Without page statistics the column index cannot be built for
            // this chunk, so the builder is invalidated.
            None => {
                self.column_index_builder.to_invalid();
            }
            Some(stat) => {
                // Check whether min/max are still ascending/descending across pages.
                let new_min = stat.min_opt().unwrap();
                let new_max = stat.max_opt().unwrap();
                if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
                    if self.data_page_boundary_ascending {
                        // A previous min or max greater than the new one breaks ascending order.
                        let not_ascending = compare_greater(&self.descr, last_min, new_min)
                            || compare_greater(&self.descr, last_max, new_max);
                        if not_ascending {
                            self.data_page_boundary_ascending = false;
                        }
                    }

                    if self.data_page_boundary_descending {
                        // A new min or max greater than the previous one breaks descending order.
                        let not_descending = compare_greater(&self.descr, new_min, last_min)
                            || compare_greater(&self.descr, new_max, last_max);
                        if not_descending {
                            self.data_page_boundary_descending = false;
                        }
                    }
                }
                self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));

                if self.can_truncate_value() {
                    // Only the truncated bytes are stored; the "did truncate"
                    // flag returned alongside them is not used here.
                    self.column_index_builder.append(
                        null_page,
                        self.truncate_min_value(
                            self.props.column_index_truncate_length(),
                            stat.min_bytes_opt().unwrap(),
                        )
                        .0,
                        self.truncate_max_value(
                            self.props.column_index_truncate_length(),
                            stat.max_bytes_opt().unwrap(),
                        )
                        .0,
                        self.page_metrics.num_page_nulls as i64,
                    );
                } else {
                    self.column_index_builder.append(
                        null_page,
                        stat.min_bytes_opt().unwrap().to_vec(),
                        stat.max_bytes_opt().unwrap().to_vec(),
                        self.page_metrics.num_page_nulls as i64,
                    );
                }
            }
        }
    }

    // Append the page histograms to the column index.
    self.column_index_builder.append_histograms(
        &self.page_metrics.repetition_level_histogram,
        &self.page_metrics.definition_level_histogram,
    );

    // Update the offset index, when one is being built.
    if let Some(builder) = self.offset_index_builder.as_mut() {
        builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
        builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
    }
}
860
861 fn can_truncate_value(&self) -> bool {
863 match self.descr.physical_type() {
864 Type::FIXED_LEN_BYTE_ARRAY
868 if !matches!(
869 self.descr.logical_type(),
870 Some(LogicalType::Decimal { .. }) | Some(LogicalType::Float16)
871 ) =>
872 {
873 true
874 }
875 Type::BYTE_ARRAY => true,
876 _ => false,
878 }
879 }
880
881 fn is_utf8(&self) -> bool {
883 self.get_descriptor().logical_type() == Some(LogicalType::String)
884 || self.get_descriptor().converted_type() == ConvertedType::UTF8
885 }
886
887 fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
898 truncation_length
899 .filter(|l| data.len() > *l)
900 .and_then(|l|
901 if self.is_utf8() {
903 match str::from_utf8(data) {
904 Ok(str_data) => truncate_utf8(str_data, l),
905 Err(_) => Some(data[..l].to_vec()),
906 }
907 } else {
908 Some(data[..l].to_vec())
909 }
910 )
911 .map(|truncated| (truncated, true))
912 .unwrap_or_else(|| (data.to_vec(), false))
913 }
914
915 fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
929 truncation_length
930 .filter(|l| data.len() > *l)
931 .and_then(|l|
932 if self.is_utf8() {
934 match str::from_utf8(data) {
935 Ok(str_data) => truncate_and_increment_utf8(str_data, l),
936 Err(_) => increment(data[..l].to_vec()),
937 }
938 } else {
939 increment(data[..l].to_vec())
940 }
941 )
942 .map(|truncated| (truncated, true))
943 .unwrap_or_else(|| (data.to_vec(), false))
944 }
945
/// Builds a data page from the buffered levels and values, then either
/// queues it or writes it out.
///
/// The page layout depends on the writer version: for `PARQUET_1_0` levels
/// and values are concatenated and compressed together; for `PARQUET_2_0`
/// only the values are compressed and the levels are stored uncompressed in
/// front of them.
///
/// Chunk-level statistics, the column/offset index and the chunk histograms
/// are updated along the way, and the page buffers are reset at the end.
///
/// # Errors
///
/// Returns an error if flushing the encoder, compressing, or writing the
/// page fails.
fn add_data_page(&mut self) -> Result<()> {
    // Extract the encoded values for this page.
    let values_data = self.encoder.flush_data_page()?;

    let max_def_level = self.descr.max_def_level();
    let max_rep_level = self.descr.max_rep_level();

    self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;

    let page_statistics = match (values_data.min_value, values_data.max_value) {
        (Some(min), Some(max)) => {
            // Update chunk-level statistics.
            update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
            update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

            // Page statistics are only kept when page-level statistics are enabled.
            (self.statistics_enabled == EnabledStatistics::Page).then_some(
                ValueStatistics::new(
                    Some(min),
                    Some(max),
                    None,
                    Some(self.page_metrics.num_page_nulls),
                    false,
                ),
            )
        }
        _ => None,
    };

    // Update the column and offset index.
    self.update_column_offset_index(
        page_statistics.as_ref(),
        values_data.variable_length_bytes,
    );

    // Fold page histograms and variable-length bytes into the chunk metrics.
    self.column_metrics
        .update_from_page_metrics(&self.page_metrics);
    self.column_metrics
        .update_variable_length_bytes(values_data.variable_length_bytes);

    let page_statistics = page_statistics.map(Statistics::from);

    let compressed_page = match self.props.writer_version() {
        WriterVersion::PARQUET_1_0 => {
            let mut buffer = vec![];

            if max_rep_level > 0 {
                buffer.extend_from_slice(
                    &self.encode_levels_v1(
                        Encoding::RLE,
                        &self.rep_levels_sink[..],
                        max_rep_level,
                    )[..],
                );
            }

            if max_def_level > 0 {
                buffer.extend_from_slice(
                    &self.encode_levels_v1(
                        Encoding::RLE,
                        &self.def_levels_sink[..],
                        max_def_level,
                    )[..],
                );
            }

            buffer.extend_from_slice(&values_data.buf);
            let uncompressed_size = buffer.len();

            // V1 pages compress levels and values together.
            if let Some(ref mut cmpr) = self.compressor {
                let mut compressed_buf = Vec::with_capacity(uncompressed_size);
                cmpr.compress(&buffer[..], &mut compressed_buf)?;
                buffer = compressed_buf;
            }

            let data_page = Page::DataPage {
                buf: buffer.into(),
                num_values: self.page_metrics.num_buffered_values,
                encoding: values_data.encoding,
                def_level_encoding: Encoding::RLE,
                rep_level_encoding: Encoding::RLE,
                statistics: page_statistics,
            };

            CompressedPage::new(data_page, uncompressed_size)
        }
        WriterVersion::PARQUET_2_0 => {
            let mut rep_levels_byte_len = 0;
            let mut def_levels_byte_len = 0;
            let mut buffer = vec![];

            if max_rep_level > 0 {
                let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
                rep_levels_byte_len = levels.len();
                buffer.extend_from_slice(&levels[..]);
            }

            if max_def_level > 0 {
                let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
                def_levels_byte_len = levels.len();
                buffer.extend_from_slice(&levels[..]);
            }

            let uncompressed_size =
                rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();

            // V2 pages compress the values only; the levels stay uncompressed.
            match self.compressor {
                Some(ref mut cmpr) => {
                    cmpr.compress(&values_data.buf, &mut buffer)?;
                }
                None => buffer.extend_from_slice(&values_data.buf),
            }

            let data_page = Page::DataPageV2 {
                buf: buffer.into(),
                num_values: self.page_metrics.num_buffered_values,
                encoding: values_data.encoding,
                num_nulls: self.page_metrics.num_page_nulls as u32,
                num_rows: self.page_metrics.num_buffered_rows,
                def_levels_byte_len: def_levels_byte_len as u32,
                rep_levels_byte_len: rep_levels_byte_len as u32,
                is_compressed: self.compressor.is_some(),
                statistics: page_statistics,
            };

            CompressedPage::new(data_page, uncompressed_size)
        }
    };

    // While the encoder still has a dictionary the page is queued; otherwise
    // it is written to the page writer immediately.
    if self.encoder.has_dictionary() {
        self.data_pages.push_back(compressed_page);
    } else {
        self.write_data_page(compressed_page)?;
    }

    // Update the total number of rows.
    self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;

    // Reset the per-page state.
    self.rep_levels_sink.clear();
    self.def_levels_sink.clear();
    self.page_metrics.new_page();

    Ok(())
}
1095
1096 #[inline]
1099 fn flush_data_pages(&mut self) -> Result<()> {
1100 if self.page_metrics.num_buffered_values > 0 {
1102 self.add_data_page()?;
1103 }
1104
1105 while let Some(page) = self.data_pages.pop_front() {
1106 self.write_data_page(page)?;
1107 }
1108
1109 Ok(())
1110 }
1111
/// Assembles the [`ColumnChunkMetaData`] for this column from the accumulated
/// column metrics.
///
/// When statistics are enabled, chunk statistics, the unencoded byte-array
/// byte count and the level histograms are attached. Byte-array statistics,
/// and fixed-length byte-array statistics that may be truncated, are
/// shortened to the configured statistics truncate length, with the
/// exactness flags set accordingly.
///
/// The chunk-level histograms are moved out of `self` by this call.
///
/// # Errors
///
/// Returns an error if the metadata builder fails.
fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
    let total_compressed_size = self.column_metrics.total_compressed_size as i64;
    let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
    let num_values = self.column_metrics.total_num_values as i64;
    let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
    // Falls back to 0 when no data page offset has been recorded.
    let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;

    let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
        .set_compression(self.codec)
        .set_encodings(self.encodings.iter().cloned().collect())
        .set_total_compressed_size(total_compressed_size)
        .set_total_uncompressed_size(total_uncompressed_size)
        .set_num_values(num_values)
        .set_data_page_offset(data_page_offset)
        .set_dictionary_page_offset(dict_page_offset);

    if self.statistics_enabled != EnabledStatistics::None {
        // The backwards-compatible min/max flag is set only for columns
        // whose sort order is signed.
        let backwards_compatible_min_max = self.descr.sort_order().is_signed();

        let statistics = ValueStatistics::<E::T>::new(
            self.column_metrics.min_column_value.clone(),
            self.column_metrics.max_column_value.clone(),
            self.column_metrics.column_distinct_count,
            Some(self.column_metrics.num_column_nulls),
            false,
        )
        .with_backwards_compatible_min_max(backwards_compatible_min_max)
        .into();

        // Truncate min/max for byte-array-like statistics and record whether
        // the stored values are still exact.
        let statistics = match statistics {
            Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
                let (min, did_truncate_min) = self.truncate_min_value(
                    self.props.statistics_truncate_length(),
                    stats.min_bytes_opt().unwrap(),
                );
                let (max, did_truncate_max) = self.truncate_max_value(
                    self.props.statistics_truncate_length(),
                    stats.max_bytes_opt().unwrap(),
                );
                Statistics::ByteArray(
                    ValueStatistics::new(
                        Some(min.into()),
                        Some(max.into()),
                        stats.distinct_count(),
                        stats.null_count_opt(),
                        backwards_compatible_min_max,
                    )
                    .with_max_is_exact(!did_truncate_max)
                    .with_min_is_exact(!did_truncate_min),
                )
            }
            Statistics::FixedLenByteArray(stats)
                if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
            {
                let (min, did_truncate_min) = self.truncate_min_value(
                    self.props.statistics_truncate_length(),
                    stats.min_bytes_opt().unwrap(),
                );
                let (max, did_truncate_max) = self.truncate_max_value(
                    self.props.statistics_truncate_length(),
                    stats.max_bytes_opt().unwrap(),
                );
                Statistics::FixedLenByteArray(
                    ValueStatistics::new(
                        Some(min.into()),
                        Some(max.into()),
                        stats.distinct_count(),
                        stats.null_count_opt(),
                        backwards_compatible_min_max,
                    )
                    .with_max_is_exact(!did_truncate_max)
                    .with_min_is_exact(!did_truncate_min),
                )
            }
            // All other statistics are used as they are.
            stats => stats,
        };

        builder = builder
            .set_statistics(statistics)
            .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
            .set_repetition_level_histogram(
                self.column_metrics.repetition_level_histogram.take(),
            )
            .set_definition_level_histogram(
                self.column_metrics.definition_level_histogram.take(),
            );
    }

    let metadata = builder.build()?;
    Ok(metadata)
}
1205
1206 #[inline]
1208 fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> {
1209 let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
1210 encoder.put(levels);
1211 encoder.consume()
1212 }
1213
1214 #[inline]
1217 fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
1218 let mut encoder = LevelEncoder::v2(max_level, levels.len());
1219 encoder.put(levels);
1220 encoder.consume()
1221 }
1222
1223 #[inline]
1225 fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
1226 self.encodings.insert(page.encoding());
1227 let page_spec = self.page_writer.write_page(page)?;
1228 if let Some(builder) = self.offset_index_builder.as_mut() {
1231 builder
1232 .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
1233 }
1234 self.update_metrics_for_page(page_spec);
1235 Ok(())
1236 }
1237
1238 #[inline]
1240 fn write_dictionary_page(&mut self) -> Result<()> {
1241 let compressed_page = {
1242 let mut page = self
1243 .encoder
1244 .flush_dict_page()?
1245 .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;
1246
1247 let uncompressed_size = page.buf.len();
1248
1249 if let Some(ref mut cmpr) = self.compressor {
1250 let mut output_buf = Vec::with_capacity(uncompressed_size);
1251 cmpr.compress(&page.buf, &mut output_buf)?;
1252 page.buf = Bytes::from(output_buf);
1253 }
1254
1255 let dict_page = Page::DictionaryPage {
1256 buf: page.buf,
1257 num_values: page.num_values as u32,
1258 encoding: self.props.dictionary_page_encoding(),
1259 is_sorted: page.is_sorted,
1260 };
1261 CompressedPage::new(dict_page, uncompressed_size)
1262 };
1263
1264 self.encodings.insert(compressed_page.encoding());
1265 let page_spec = self.page_writer.write_page(compressed_page)?;
1266 self.update_metrics_for_page(page_spec);
1267 Ok(())
1269 }
1270
1271 #[inline]
1273 fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1274 self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1275 self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1276 self.column_metrics.total_bytes_written += page_spec.bytes_written;
1277
1278 match page_spec.page_type {
1279 PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1280 self.column_metrics.total_num_values += page_spec.num_values as u64;
1281 if self.column_metrics.data_page_offset.is_none() {
1282 self.column_metrics.data_page_offset = Some(page_spec.offset);
1283 }
1284 }
1285 PageType::DICTIONARY_PAGE => {
1286 assert!(
1287 self.column_metrics.dictionary_page_offset.is_none(),
1288 "Dictionary offset is already set"
1289 );
1290 self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1291 }
1292 _ => {}
1293 }
1294 }
1295}
1296
1297fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
1298 update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
1299}
1300
1301fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
1302 update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
1303}
1304
1305#[inline]
1306#[allow(clippy::eq_op)]
1307fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
1308 match T::PHYSICAL_TYPE {
1309 Type::FLOAT | Type::DOUBLE => val != val,
1310 Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type() == Some(LogicalType::Float16) => {
1311 let val = val.as_bytes();
1312 let val = f16::from_le_bytes([val[0], val[1]]);
1313 val.is_nan()
1314 }
1315 _ => false,
1316 }
1317}
1318
1319fn update_stat<T: ParquetValueType, F>(
1324 descr: &ColumnDescriptor,
1325 val: &T,
1326 cur: &mut Option<T>,
1327 should_update: F,
1328) where
1329 F: Fn(&T) -> bool,
1330{
1331 if is_nan(descr, val) {
1332 return;
1333 }
1334
1335 if cur.as_ref().map_or(true, should_update) {
1336 *cur = Some(val.clone());
1337 }
1338}
1339
1340fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
1342 if let Some(LogicalType::Integer { is_signed, .. }) = descr.logical_type() {
1343 if !is_signed {
1344 return a.as_u64().unwrap() > b.as_u64().unwrap();
1346 }
1347 }
1348
1349 match descr.converted_type() {
1350 ConvertedType::UINT_8
1351 | ConvertedType::UINT_16
1352 | ConvertedType::UINT_32
1353 | ConvertedType::UINT_64 => {
1354 return a.as_u64().unwrap() > b.as_u64().unwrap();
1355 }
1356 _ => {}
1357 };
1358
1359 if let Some(LogicalType::Decimal { .. }) = descr.logical_type() {
1360 match T::PHYSICAL_TYPE {
1361 Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1362 return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1363 }
1364 _ => {}
1365 };
1366 }
1367
1368 if descr.converted_type() == ConvertedType::DECIMAL {
1369 match T::PHYSICAL_TYPE {
1370 Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1371 return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1372 }
1373 _ => {}
1374 };
1375 };
1376
1377 if let Some(LogicalType::Float16) = descr.logical_type() {
1378 let a = a.as_bytes();
1379 let a = f16::from_le_bytes([a[0], a[1]]);
1380 let b = b.as_bytes();
1381 let b = f16::from_le_bytes([b[0], b[1]]);
1382 return a > b;
1383 }
1384
1385 a > b
1386}
1387
1388fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1396 match (kind, props.writer_version()) {
1397 (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1398 (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1399 (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1400 (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1401 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1402 _ => Encoding::PLAIN,
1403 }
1404}
1405
1406fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1408 match (kind, props.writer_version()) {
1409 (Type::BOOLEAN, _) => false,
1411 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1413 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1414 _ => true,
1415 }
1416}
1417
/// Signed "greater than" comparison of two decimals stored as big-endian
/// two's-complement byte strings, which may differ in length.
///
/// Returns `true` when the value encoded by `a` is strictly greater than the
/// value encoded by `b`.
///
/// * If either input is empty, the result is `true` exactly when `a` is
///   non-empty.
/// * If the sign bits differ, or the lengths match and the first bytes differ,
///   the first bytes decide (compared as `i8`).
/// * Otherwise both values share a sign. The extra leading bytes of the longer
///   input are checked against the sign-extension byte (`0xFF` for negative,
///   `0x00` otherwise): if any of them is not pure sign extension, the longer
///   input has the larger magnitude. If they are pure sign extension, the
///   remaining, equally long tails are compared bytewise.
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    let a_length = a.len();
    let b_length = b.len();

    if a_length == 0 || b_length == 0 {
        return a_length > 0;
    }

    let first_a: u8 = a[0];
    let first_b: u8 = b[0];

    // Different signs, or same width with a differing most significant byte:
    // the signed comparison of the first bytes settles the ordering.
    if (0x80 & first_a) != (0x80 & first_b) || (a_length == b_length && first_a != first_b) {
        return (first_a as i8) > (first_b as i8);
    }

    // From here on both values have the same sign.
    let negative_values = (first_a as i8) < 0;
    let extension: u8 = if negative_values { 0xFF } else { 0 };

    // Split off the bytes by which one input is longer than the other so that
    // the tails are aligned on the least significant byte. At most one of the
    // two lead slices is non-empty.
    let (a_lead, a_tail) = a.split_at(a_length.saturating_sub(b_length));
    let (b_lead, b_tail) = b.split_at(b_length.saturating_sub(a_length));

    if a_lead.iter().chain(b_lead).any(|&x| x != extension) {
        // The longer value has significant leading bytes, hence the larger
        // magnitude: it is greater for positive values, smaller for negative.
        let a_longer = a_length > b_length;
        return if negative_values { !a_longer } else { a_longer };
    }

    // Same sign and same effective width: unsigned bytewise order of the
    // aligned tails matches the signed numeric order.
    a_tail > b_tail
}
1463
/// Truncates `data` to at most `length` bytes without splitting a character.
///
/// Returns the bytes of the longest non-empty prefix of `data` that ends on a
/// character boundary and is no longer than `length` bytes, or `None` when no
/// such prefix exists (for example when `data` is empty, `length` is zero, or
/// the first character alone is longer than `length` bytes).
fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    // Indices past the end of the string are never character boundaries, so
    // the search can start at the string length instead of walking down from
    // an arbitrarily large `length`.
    let upper = length.min(data.len());
    let split = (1..=upper).rfind(|&idx| data.is_char_boundary(idx))?;
    Some(data.as_bytes()[..split].to_vec())
}
1473
1474fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1480 let lower_bound = length.saturating_sub(3);
1482 let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
1483 increment_utf8(data.get(..split)?)
1484}
1485
/// Produces a UTF-8 byte string derived from `data` by bumping a character.
///
/// Characters are visited from the end. The first one whose successor code
/// point is a valid `char` with the same encoded width is replaced by that
/// successor, and everything after it is dropped. Returns `None` when no
/// character qualifies (including for empty input).
fn increment_utf8(data: &str) -> Option<Vec<u8>> {
    data.char_indices().rev().find_map(|(start, current)| {
        let width = current.len_utf8();
        // `None` here (invalid code point) moves on to the preceding character.
        let successor = char::from_u32(current as u32 + 1)?;
        if successor.len_utf8() != width {
            return None;
        }

        let end = start + width;
        let mut bytes = data.as_bytes()[..end].to_vec();
        successor.encode_utf8(&mut bytes[start..]);
        Some(bytes)
    })
}
1507
/// Adds one to `data` interpreted as a big-endian unsigned number.
///
/// Bytes are processed from the last to the first; a byte that overflows wraps
/// to zero and carries into the preceding byte. Returns the incremented bytes,
/// or `None` when every byte overflowed (which includes empty input).
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    for byte in data.iter_mut().rev() {
        match byte.checked_add(1) {
            Some(next) => {
                *byte = next;
                return Some(data);
            }
            // 0xFF wraps around and the carry moves to the next byte.
            None => *byte = 0,
        }
    }

    None
}
1523
1524#[cfg(test)]
1525mod tests {
1526 use crate::{
1527 file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1528 schema::parser::parse_message_type,
1529 };
1530 use core::str;
1531 use rand::distributions::uniform::SampleUniform;
1532 use std::{fs::File, sync::Arc};
1533
1534 use crate::column::{
1535 page::PageReader,
1536 reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl},
1537 };
1538 use crate::file::writer::TrackedWrite;
1539 use crate::file::{
1540 properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1541 };
1542 use crate::schema::types::{ColumnPath, Type as SchemaType};
1543 use crate::util::test_common::rand_gen::random_numbers_range;
1544
1545 use super::*;
1546
/// Definition and repetition level slices of different lengths are rejected.
#[test]
fn test_column_writer_inconsistent_def_rep_length() {
    let mut writer =
        get_test_column_writer::<Int32Type>(get_test_page_writer(), 1, 1, Default::default());
    let err = writer
        .write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]))
        .err()
        .expect("write_batch should fail");
    assert_eq!(
        err.to_string(),
        "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
    );
}
1561
/// Omitting definition levels fails when the max definition level is 1.
#[test]
fn test_column_writer_invalid_def_levels() {
    let mut writer =
        get_test_column_writer::<Int32Type>(get_test_page_writer(), 1, 0, Default::default());
    let err = writer
        .write_batch(&[1, 2, 3, 4], None, None)
        .err()
        .expect("write_batch should fail");
    assert_eq!(
        err.to_string(),
        "Parquet error: Definition levels are required, because max definition level = 1"
    );
}
1576
/// Omitting repetition levels fails when the max repetition level is 1.
#[test]
fn test_column_writer_invalid_rep_levels() {
    let mut writer =
        get_test_column_writer::<Int32Type>(get_test_page_writer(), 0, 1, Default::default());
    let err = writer
        .write_batch(&[1, 2, 3, 4], None, None)
        .err()
        .expect("write_batch should fail");
    assert_eq!(
        err.to_string(),
        "Parquet error: Repetition levels are required, because max repetition level = 1"
    );
}
1591
/// Four definition levels of 1 with only two values supplied is an error.
#[test]
fn test_column_writer_not_enough_values_to_write() {
    let mut writer =
        get_test_column_writer::<Int32Type>(get_test_page_writer(), 1, 0, Default::default());
    let err = writer
        .write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None)
        .err()
        .expect("write_batch should fail");
    assert_eq!(
        err.to_string(),
        "Parquet error: Expected to write 4 values, but have only 2"
    );
}
1606
/// A second attempt to write the dictionary page fails after the first one
/// succeeded.
#[test]
fn test_column_writer_write_only_one_dictionary_page() {
    let mut writer =
        get_test_column_writer::<Int32Type>(get_test_page_writer(), 0, 0, Default::default());
    writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
    writer.add_data_page().unwrap();

    // The first dictionary page is written without error ...
    writer.write_dictionary_page().unwrap();

    // ... while the second attempt is rejected.
    let second_attempt = writer.write_dictionary_page();
    assert_eq!(
        second_attempt.unwrap_err().to_string(),
        "Parquet error: Dictionary encoder is not set"
    );
}
1619
/// Writing a dictionary page fails when dictionary encoding is disabled.
#[test]
fn test_column_writer_error_when_writing_disabled_dictionary() {
    let no_dictionary = WriterProperties::builder()
        .set_dictionary_enabled(false)
        .build();
    let mut writer = get_test_column_writer::<Int32Type>(
        get_test_page_writer(),
        0,
        0,
        Arc::new(no_dictionary),
    );
    writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();

    let outcome = writer.write_dictionary_page();
    assert_eq!(
        outcome.unwrap_err().to_string(),
        "Parquet error: Dictionary encoder is not set"
    );
}
1633
/// Boolean columns are written without a dictionary page even when dictionary
/// encoding is enabled in the writer properties.
#[test]
fn test_column_writer_boolean_type_does_not_support_dictionary() {
    let with_dictionary = WriterProperties::builder()
        .set_dictionary_enabled(true)
        .build();
    let mut writer = get_test_column_writer::<BoolType>(
        get_test_page_writer(),
        0,
        0,
        Arc::new(with_dictionary),
    );
    let values = [true, false, true, false];
    writer.write_batch(&values, None, None).unwrap();

    let closed = writer.close().unwrap();
    // The four booleans are expected to take a single byte.
    assert_eq!(closed.bytes_written, 1);
    assert_eq!(closed.rows_written, 4);

    let metadata = closed.metadata;
    let expected_encodings = vec![Encoding::PLAIN, Encoding::RLE];
    assert_eq!(metadata.encodings(), &expected_encodings);
    assert_eq!(metadata.num_values(), 4);
    assert_eq!(metadata.dictionary_page_offset(), None);
}
1658
/// Expected encodings and dictionary page offset for boolean columns, for both
/// writer versions with dictionary encoding enabled and disabled.
#[test]
fn test_column_writer_default_encoding_support_bool() {
    let values = [true, false];
    let plain_rle = [Encoding::PLAIN, Encoding::RLE];
    let rle_only = [Encoding::RLE];

    // Writer version 1.0: dictionary setting makes no difference.
    check_encoding_write_support::<BoolType>(
        WriterVersion::PARQUET_1_0,
        true,
        &values,
        None,
        &plain_rle,
    );
    check_encoding_write_support::<BoolType>(
        WriterVersion::PARQUET_1_0,
        false,
        &values,
        None,
        &plain_rle,
    );

    // Writer version 2.0: dictionary setting makes no difference either.
    check_encoding_write_support::<BoolType>(
        WriterVersion::PARQUET_2_0,
        true,
        &values,
        None,
        &rle_only,
    );
    check_encoding_write_support::<BoolType>(
        WriterVersion::PARQUET_2_0,
        false,
        &values,
        None,
        &rle_only,
    );
}
1690
/// Expected encodings and dictionary page offset for INT32 columns, for both
/// writer versions with dictionary encoding enabled and disabled.
#[test]
fn test_column_writer_default_encoding_support_int32() {
    let values = [1, 2];
    let with_dictionary = [Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY];
    let plain_rle = [Encoding::PLAIN, Encoding::RLE];
    let rle_delta = [Encoding::RLE, Encoding::DELTA_BINARY_PACKED];

    check_encoding_write_support::<Int32Type>(
        WriterVersion::PARQUET_1_0,
        true,
        &values,
        Some(0),
        &with_dictionary,
    );
    check_encoding_write_support::<Int32Type>(
        WriterVersion::PARQUET_1_0,
        false,
        &values,
        None,
        &plain_rle,
    );
    check_encoding_write_support::<Int32Type>(
        WriterVersion::PARQUET_2_0,
        true,
        &values,
        Some(0),
        &with_dictionary,
    );
    check_encoding_write_support::<Int32Type>(
        WriterVersion::PARQUET_2_0,
        false,
        &values,
        None,
        &rle_delta,
    );
}
1722
/// Expected encodings and dictionary page offset for INT64 columns, for both
/// writer versions with dictionary encoding enabled and disabled.
#[test]
fn test_column_writer_default_encoding_support_int64() {
    let values = [1, 2];
    let with_dictionary = [Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY];
    let plain_rle = [Encoding::PLAIN, Encoding::RLE];
    let rle_delta = [Encoding::RLE, Encoding::DELTA_BINARY_PACKED];

    check_encoding_write_support::<Int64Type>(
        WriterVersion::PARQUET_1_0,
        true,
        &values,
        Some(0),
        &with_dictionary,
    );
    check_encoding_write_support::<Int64Type>(
        WriterVersion::PARQUET_1_0,
        false,
        &values,
        None,
        &plain_rle,
    );
    check_encoding_write_support::<Int64Type>(
        WriterVersion::PARQUET_2_0,
        true,
        &values,
        Some(0),
        &with_dictionary,
    );
    check_encoding_write_support::<Int64Type>(
        WriterVersion::PARQUET_2_0,
        false,
        &values,
        None,
        &rle_delta,
    );
}
1754
/// Expected encodings and dictionary page offset for INT96 columns, for both
/// writer versions with dictionary encoding enabled and disabled.
#[test]
fn test_column_writer_default_encoding_support_int96() {
    let values = [Int96::from(vec![1, 2, 3])];
    let with_dictionary = [Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY];
    let plain_rle = [Encoding::PLAIN, Encoding::RLE];

    check_encoding_write_support::<Int96Type>(
        WriterVersion::PARQUET_1_0,
        true,
        &values,
        Some(0),
        &with_dictionary,
    );
    check_encoding_write_support::<Int96Type>(
        WriterVersion::PARQUET_1_0,
        false,
        &values,
        None,
        &plain_rle,
    );
    check_encoding_write_support::<Int96Type>(
        WriterVersion::PARQUET_2_0,
        true,
        &values,
        Some(0),
        &with_dictionary,
    );
    check_encoding_write_support::<Int96Type>(
        WriterVersion::PARQUET_2_0,
        false,
        &values,
        None,
        &plain_rle,
    );
}
1786
/// Expected encodings and dictionary page offset for FLOAT columns, for both
/// writer versions with dictionary encoding enabled and disabled.
#[test]
fn test_column_writer_default_encoding_support_float() {
    let values = [1.0, 2.0];
    let with_dictionary = [Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY];
    let plain_rle = [Encoding::PLAIN, Encoding::RLE];

    check_encoding_write_support::<FloatType>(
        WriterVersion::PARQUET_1_0,
        true,
        &values,
        Some(0),
        &with_dictionary,
    );
    check_encoding_write_support::<FloatType>(
        WriterVersion::PARQUET_1_0,
        false,
        &values,
        None,
        &plain_rle,
    );
    check_encoding_write_support::<FloatType>(
        WriterVersion::PARQUET_2_0,
        true,
        &values,
        Some(0),
        &with_dictionary,
    );
    check_encoding_write_support::<FloatType>(
        WriterVersion::PARQUET_2_0,
        false,
        &values,
        None,
        &plain_rle,
    );
}
1818
/// Expected encodings and dictionary page offset for DOUBLE columns, for both
/// writer versions with dictionary encoding enabled and disabled.
#[test]
fn test_column_writer_default_encoding_support_double() {
    let values = [1.0, 2.0];
    let with_dictionary = [Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY];
    let plain_rle = [Encoding::PLAIN, Encoding::RLE];

    check_encoding_write_support::<DoubleType>(
        WriterVersion::PARQUET_1_0,
        true,
        &values,
        Some(0),
        &with_dictionary,
    );
    check_encoding_write_support::<DoubleType>(
        WriterVersion::PARQUET_1_0,
        false,
        &values,
        None,
        &plain_rle,
    );
    check_encoding_write_support::<DoubleType>(
        WriterVersion::PARQUET_2_0,
        true,
        &values,
        Some(0),
        &with_dictionary,
    );
    check_encoding_write_support::<DoubleType>(
        WriterVersion::PARQUET_2_0,
        false,
        &values,
        None,
        &plain_rle,
    );
}
1850
/// Expected encodings and dictionary page offset for BYTE_ARRAY columns, for
/// both writer versions with dictionary encoding enabled and disabled.
#[test]
fn test_column_writer_default_encoding_support_byte_array() {
    let values = [ByteArray::from(vec![1u8])];
    let with_dictionary = [Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY];
    let plain_rle = [Encoding::PLAIN, Encoding::RLE];
    let rle_delta = [Encoding::RLE, Encoding::DELTA_BYTE_ARRAY];

    check_encoding_write_support::<ByteArrayType>(
        WriterVersion::PARQUET_1_0,
        true,
        &values,
        Some(0),
        &with_dictionary,
    );
    check_encoding_write_support::<ByteArrayType>(
        WriterVersion::PARQUET_1_0,
        false,
        &values,
        None,
        &plain_rle,
    );
    check_encoding_write_support::<ByteArrayType>(
        WriterVersion::PARQUET_2_0,
        true,
        &values,
        Some(0),
        &with_dictionary,
    );
    check_encoding_write_support::<ByteArrayType>(
        WriterVersion::PARQUET_2_0,
        false,
        &values,
        None,
        &rle_delta,
    );
}
1882
/// Expected encodings and dictionary page offset for FIXED_LEN_BYTE_ARRAY
/// columns: no dictionary page is expected with writer version 1.0, while
/// version 2.0 with dictionary enabled expects one at offset 0.
#[test]
fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
    let values: [FixedLenByteArray; 1] = [ByteArray::from(vec![1u8]).into()];
    let with_dictionary = [Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY];
    let plain_rle = [Encoding::PLAIN, Encoding::RLE];
    let rle_delta = [Encoding::RLE, Encoding::DELTA_BYTE_ARRAY];

    check_encoding_write_support::<FixedLenByteArrayType>(
        WriterVersion::PARQUET_1_0,
        true,
        &values,
        None,
        &plain_rle,
    );
    check_encoding_write_support::<FixedLenByteArrayType>(
        WriterVersion::PARQUET_1_0,
        false,
        &values,
        None,
        &plain_rle,
    );
    check_encoding_write_support::<FixedLenByteArrayType>(
        WriterVersion::PARQUET_2_0,
        true,
        &values,
        Some(0),
        &with_dictionary,
    );
    check_encoding_write_support::<FixedLenByteArrayType>(
        WriterVersion::PARQUET_2_0,
        false,
        &values,
        None,
        &rle_delta,
    );
}
1914
/// Checks the close result and column chunk metadata after writing four INT32
/// values with default properties.
#[test]
fn test_column_writer_check_metadata() {
    let mut writer =
        get_test_column_writer::<Int32Type>(get_test_page_writer(), 0, 0, Default::default());
    writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();

    let closed = writer.close().unwrap();
    assert_eq!(closed.bytes_written, 20);
    assert_eq!(closed.rows_written, 4);

    let metadata = closed.metadata;
    let expected_encodings = vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY];
    assert_eq!(metadata.encodings(), &expected_encodings);
    assert_eq!(metadata.num_values(), 4);
    assert_eq!(metadata.compressed_size(), 20);
    assert_eq!(metadata.uncompressed_size(), 20);
    assert_eq!(metadata.data_page_offset(), 0);
    assert_eq!(metadata.dictionary_page_offset(), Some(0));

    let stats = metadata
        .statistics()
        .expect("metadata missing statistics");
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);
    match stats {
        Statistics::Int32(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &1);
            assert_eq!(typed.max_opt().unwrap(), &4);
        }
        _ => panic!("expecting Statistics::Int32"),
    }
}
1949
/// Min/max statistics of a decimal BYTE_ARRAY column: of four 16-byte values,
/// the first is expected as minimum and the last as maximum.
#[test]
fn test_column_writer_check_byte_array_min_max() {
    let expected_min_bytes = vec![
        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8, 231u8,
        90u8, 0u8, 0u8,
    ];
    let other_negative_bytes = vec![
        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8, 152u8, 177u8,
        56u8, 0u8, 0u8,
    ];
    let zero_bytes = vec![0u8; 16];
    let expected_max_bytes = vec![
        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8, 44u8, 0u8, 0u8,
    ];

    let mut writer = get_test_decimals_column_writer::<ByteArrayType>(
        get_test_page_writer(),
        0,
        0,
        Default::default(),
    );
    let values = [
        ByteArray::from(expected_min_bytes.clone()),
        ByteArray::from(other_negative_bytes),
        ByteArray::from(zero_bytes),
        ByteArray::from(expected_max_bytes.clone()),
    ];
    writer.write_batch(&values, None, None).unwrap();

    let metadata = writer.close().unwrap().metadata;
    let stats = metadata
        .statistics()
        .expect("metadata missing statistics");
    match stats {
        Statistics::ByteArray(typed) => {
            assert_eq!(
                typed.min_opt().unwrap(),
                &ByteArray::from(expected_min_bytes)
            );
            assert_eq!(
                typed.max_opt().unwrap(),
                &ByteArray::from(expected_max_bytes)
            );
        }
        _ => panic!("expecting Statistics::ByteArray"),
    }
}
2003
/// Min/max statistics for an INT32 column created through the
/// unsigned-converted-type test helper.
#[test]
fn test_column_writer_uint32_converted_type_min_max() {
    let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
        get_test_page_writer(),
        0,
        0,
        Default::default(),
    );
    writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();

    let metadata = writer.close().unwrap().metadata;
    let stats = metadata
        .statistics()
        .expect("metadata missing statistics");
    match stats {
        Statistics::Int32(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &0);
            assert_eq!(typed.max_opt().unwrap(), &5);
        }
        _ => panic!("expecting Statistics::Int32"),
    }
}
2027
/// Statistics passed to `write_batch_with_statistics` are expected to show up
/// in the chunk metadata instead of values derived from the written data.
#[test]
fn test_column_writer_precalculated_statistics() {
    let chunk_stats_only = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Chunk)
        .build();
    let mut writer = get_test_column_writer::<Int32Type>(
        get_test_page_writer(),
        0,
        0,
        Arc::new(chunk_stats_only),
    );

    let (given_min, given_max, given_distinct) = (-17, 9000, 55);
    writer
        .write_batch_with_statistics(
            &[1, 2, 3, 4],
            None,
            None,
            Some(&given_min),
            Some(&given_max),
            Some(given_distinct),
        )
        .unwrap();

    let closed = writer.close().unwrap();
    assert_eq!(closed.bytes_written, 20);
    assert_eq!(closed.rows_written, 4);

    let metadata = closed.metadata;
    let expected_encodings = vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY];
    assert_eq!(metadata.encodings(), &expected_encodings);
    assert_eq!(metadata.num_values(), 4);
    assert_eq!(metadata.compressed_size(), 20);
    assert_eq!(metadata.uncompressed_size(), 20);
    assert_eq!(metadata.data_page_offset(), 0);
    assert_eq!(metadata.dictionary_page_offset(), Some(0));

    let stats = metadata
        .statistics()
        .expect("metadata missing statistics");
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
    match stats {
        Statistics::Int32(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &-17);
            assert_eq!(typed.max_opt().unwrap(), &9000);
        }
        _ => panic!("expecting Statistics::Int32"),
    }
}
2075
/// Mixes a plain `write_batch` with a `write_batch_with_statistics` call and
/// checks the resulting chunk and page statistics: min 1, max 7, no nulls and
/// no distinct count.
#[test]
fn test_mixed_precomputed_statistics() {
    let mut buf = Vec::with_capacity(100);
    let mut write = TrackedWrite::new(&mut buf);
    let page_writer = Box::new(SerializedPageWriter::new(&mut write));
    let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, Default::default());

    writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
    writer
        .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
        .unwrap();

    let closed = writer.close().unwrap();

    let expected_min = 1_i32.to_le_bytes();
    let expected_max = 7_i32.to_le_bytes();

    let chunk_stats = closed.metadata.statistics().unwrap();
    assert_eq!(chunk_stats.min_bytes_opt().unwrap(), expected_min);
    assert_eq!(chunk_stats.max_bytes_opt().unwrap(), expected_max);
    assert_eq!(chunk_stats.null_count_opt(), Some(0));
    assert!(chunk_stats.distinct_count_opt().is_none());

    // Release the borrow on `buf` before it is handed to the reader.
    drop(write);

    let reader_props = ReaderProperties::builder()
        .set_backward_compatible_lz4(false)
        .build();
    let reader = SerializedPageReader::new_with_properties(
        Arc::new(Bytes::from(buf)),
        &closed.metadata,
        closed.rows_written as usize,
        None,
        Arc::new(reader_props),
    )
    .unwrap();

    let pages = reader.collect::<Result<Vec<_>>>().unwrap();
    assert_eq!(pages.len(), 2);

    assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
    assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);

    let page_stats = pages[1].statistics().unwrap();
    assert_eq!(page_stats.min_bytes_opt().unwrap(), expected_min);
    assert_eq!(page_stats.max_bytes_opt().unwrap(), expected_max);
    assert_eq!(page_stats.null_count_opt(), Some(0));
    assert!(page_stats.distinct_count_opt().is_none());
}
2129
/// With statistics disabled, neither the chunk metadata nor the v2 data page
/// carries statistics, while the page still reports value, null and row counts.
#[test]
fn test_disabled_statistics() {
    let mut buf = Vec::with_capacity(100);
    let mut write = TrackedWrite::new(&mut buf);
    let page_writer = Box::new(SerializedPageWriter::new(&mut write));
    let writer_props = Arc::new(
        WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_writer_version(WriterVersion::PARQUET_2_0)
            .build(),
    );

    let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, writer_props);
    let def_levels = [1, 0, 0, 1, 1, 1];
    writer
        .write_batch(&[1, 2, 3, 4], Some(&def_levels), None)
        .unwrap();

    let closed = writer.close().unwrap();
    assert!(closed.metadata.statistics().is_none());

    // Release the borrow on `buf` before it is handed to the reader.
    drop(write);

    let reader_props = ReaderProperties::builder()
        .set_backward_compatible_lz4(false)
        .build();
    let reader = SerializedPageReader::new_with_properties(
        Arc::new(Bytes::from(buf)),
        &closed.metadata,
        closed.rows_written as usize,
        None,
        Arc::new(reader_props),
    )
    .unwrap();

    let pages = reader.collect::<Result<Vec<_>>>().unwrap();
    assert_eq!(pages.len(), 2);

    assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
    assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);

    if let Page::DataPageV2 {
        num_values,
        num_nulls,
        num_rows,
        statistics,
        ..
    } = &pages[1]
    {
        assert_eq!(*num_values, 6);
        assert_eq!(*num_nulls, 2);
        assert_eq!(*num_rows, 6);
        assert!(statistics.is_none());
    } else {
        unreachable!()
    }
}
2185
/// Round-trips an INT32 column without any values or levels.
#[test]
fn test_column_writer_empty_column_roundtrip() {
    column_roundtrip::<Int32Type>(Default::default(), &[], None, None);
}
2191
/// Round-trips random INT32 values with the last two helper arguments (max
/// levels, presumably definition then repetition) both set to 0.
#[test]
fn test_column_writer_non_nullable_values_roundtrip() {
    column_roundtrip_random::<Int32Type>(Default::default(), 1024, i32::MIN, i32::MAX, 0, 0);
}
2197
/// Round-trips random INT32 values with the last two helper arguments (max
/// levels, presumably definition then repetition) set to 10 and 0.
#[test]
fn test_column_writer_nullable_non_repeated_values_roundtrip() {
    column_roundtrip_random::<Int32Type>(Default::default(), 1024, i32::MIN, i32::MAX, 10, 0);
}
2203
/// Round-trips random INT32 values with the last two helper arguments (max
/// levels, presumably definition then repetition) both set to 10.
#[test]
fn test_column_writer_nullable_repeated_values_roundtrip() {
    column_roundtrip_random::<Int32Type>(Default::default(), 1024, i32::MIN, i32::MAX, 10, 10);
}
2209
/// Round-trips random INT32 values with both the dictionary page and the data
/// page size limit set to 32.
#[test]
fn test_column_writer_dictionary_fallback_small_data_page() {
    let tiny_pages = WriterProperties::builder()
        .set_dictionary_page_size_limit(32)
        .set_data_page_size_limit(32)
        .build();
    column_roundtrip_random::<Int32Type>(tiny_pages, 1024, i32::MIN, i32::MAX, 10, 10);
}
2218
/// Round-trips random INT32 values for a range of small write batch sizes.
#[test]
fn test_column_writer_small_write_batch_size() {
    for &batch_size in &[1usize, 2, 5, 10, 11, 1023] {
        let props = WriterProperties::builder()
            .set_write_batch_size(batch_size)
            .build();

        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
    }
}
2227
/// Round-trips random INT32 values with writer version 1.0 and dictionary
/// encoding disabled.
#[test]
fn test_column_writer_dictionary_disabled_v1() {
    let v1_no_dictionary = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_1_0)
        .set_dictionary_enabled(false)
        .build();
    column_roundtrip_random::<Int32Type>(v1_no_dictionary, 1024, i32::MIN, i32::MAX, 10, 10);
}
2236
/// Round-trips random INT32 values with writer version 2.0 and dictionary
/// encoding disabled.
#[test]
fn test_column_writer_dictionary_disabled_v2() {
    let v2_no_dictionary = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_dictionary_enabled(false)
        .build();
    column_roundtrip_random::<Int32Type>(v2_no_dictionary, 1024, i32::MIN, i32::MAX, 10, 10);
}
2245
/// Round-trips random INT32 values with writer version 1.0 and SNAPPY
/// compression.
#[test]
fn test_column_writer_compression_v1() {
    let v1_snappy = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_1_0)
        .set_compression(Compression::SNAPPY)
        .build();
    column_roundtrip_random::<Int32Type>(v1_snappy, 2048, i32::MIN, i32::MAX, 10, 10);
}
2254
/// Round-trips random INT32 values with writer version 2.0 and SNAPPY
/// compression.
#[test]
fn test_column_writer_compression_v2() {
    let v2_snappy = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_compression(Compression::SNAPPY)
        .build();
    column_roundtrip_random::<Int32Type>(v2_snappy, 2048, i32::MIN, i32::MAX, 10, 10);
}
2263
/// Writes ten INT32 values with a data page size limit of 10 and a write batch
/// size of 3, then reads the pages back and checks the type, value count and
/// buffer length of each page.
#[test]
fn test_column_writer_add_data_pages_with_dict() {
    let mut file = tempfile::tempfile().unwrap();
    let mut write = TrackedWrite::new(&mut file);
    let page_writer = Box::new(SerializedPageWriter::new(&mut write));
    let writer_props = Arc::new(
        WriterProperties::builder()
            .set_data_page_size_limit(10)
            .set_write_batch_size(3)
            .build(),
    );
    let values = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, writer_props);
    writer.write_batch(values, None, None).unwrap();
    let closed = writer.close().unwrap();

    // Release the borrow on `file` before it is handed to the reader.
    drop(write);

    let reader_props = ReaderProperties::builder()
        .set_backward_compatible_lz4(false)
        .build();
    let mut page_reader = Box::new(
        SerializedPageReader::new_with_properties(
            Arc::new(file),
            &closed.metadata,
            closed.rows_written as usize,
            None,
            Arc::new(reader_props),
        )
        .unwrap(),
    );

    let mut observed = Vec::new();
    while let Some(page) = page_reader.get_next_page().unwrap() {
        let summary = (page.page_type(), page.num_values(), page.buffer().len());
        observed.push(summary);
    }

    let expected = vec![
        (PageType::DICTIONARY_PAGE, 10, 40),
        (PageType::DATA_PAGE, 9, 10),
        (PageType::DATA_PAGE, 1, 3),
    ];
    assert_eq!(observed, expected);
}
2311
/// Boolean statistics: min `false`, max `true`, and the statistics are expected
/// not to be flagged as min/max backwards compatible.
#[test]
fn test_bool_statistics() {
    let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
    assert!(!stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Boolean(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &false);
            assert_eq!(typed.max_opt().unwrap(), &true);
        }
        stats => panic!("expecting Statistics::Boolean, got {stats:?}"),
    }
}
2325
/// INT32 statistics: min -2, max 3, flagged as min/max backwards compatible.
#[test]
fn test_int32_statistics() {
    let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Int32(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &-2);
            assert_eq!(typed.max_opt().unwrap(), &3);
        }
        stats => panic!("expecting Statistics::Int32, got {stats:?}"),
    }
}
2337
/// INT64 statistics: min -2, max 3, flagged as min/max backwards compatible.
#[test]
fn test_int64_statistics() {
    let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Int64(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &-2);
            assert_eq!(typed.max_opt().unwrap(), &3);
        }
        stats => panic!("expecting Statistics::Int64, got {stats:?}"),
    }
}
2349
/// INT96 statistics over four values; the statistics are expected not to be
/// flagged as min/max backwards compatible.
#[test]
fn test_int96_statistics() {
    let input: Vec<Int96> = vec![
        Int96::from(vec![1, 20, 30]),
        Int96::from(vec![3, 20, 10]),
        Int96::from(vec![0, 20, 30]),
        Int96::from(vec![2, 20, 30]),
    ];

    let stats = statistics_roundtrip::<Int96Type>(&input);
    assert!(!stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Int96(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &Int96::from(vec![0, 20, 30]));
            assert_eq!(typed.max_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
        }
        stats => panic!("expecting Statistics::Int96, got {stats:?}"),
    }
}
2370
/// Round-trips mixed-sign single-precision floats and checks the reported min/max.
#[test]
fn test_float_statistics() {
    let values = [-1.0, 3.0, -2.0, 2.0];
    let stats = statistics_roundtrip::<FloatType>(&values);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Float(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &-2.0);
            assert_eq!(typed.max_opt().unwrap(), &3.0);
        }
        _ => panic!("expecting Statistics::Float, got {stats:?}"),
    }
}
2382
/// Round-trips mixed-sign double-precision floats and checks the reported min/max.
#[test]
fn test_double_statistics() {
    let values = [-1.0, 3.0, -2.0, 2.0];
    let stats = statistics_roundtrip::<DoubleType>(&values);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Double(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &-2.0);
            assert_eq!(typed.max_opt().unwrap(), &3.0);
        }
        _ => panic!("expecting Statistics::Double, got {stats:?}"),
    }
}
2394
/// Round-trips a handful of strings as byte arrays and checks that the
/// reported min/max are "aaw" and "zz".
#[test]
fn test_byte_array_statistics() {
    let input: Vec<ByteArray> = ["aawaa", "zz", "aaw", "m", "qrs"]
        .iter()
        .map(|text| ByteArray::from(*text))
        .collect();

    let stats = statistics_roundtrip::<ByteArrayType>(&input);
    // Byte-array statistics are expected to be flagged as NOT backwards compatible.
    assert!(!stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::ByteArray(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &ByteArray::from("aaw"));
            assert_eq!(typed.max_opt().unwrap(), &ByteArray::from("zz"));
        }
        _ => panic!("expecting Statistics::ByteArray, got {stats:?}"),
    }
}
2411
/// Round-trips a handful of strings as fixed-length byte arrays and checks
/// the reported min/max.
#[test]
fn test_fixed_len_byte_array_statistics() {
    let input: Vec<FixedLenByteArray> = ["aawaa", "zz ", "aaw ", "m ", "qrs "]
        .iter()
        .map(|text| FixedLenByteArray::from(ByteArray::from(*text)))
        .collect();

    let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
    // Fixed-length byte-array statistics are expected to be flagged as NOT
    // backwards compatible.
    assert!(!stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::FixedLenByteArray(typed) => {
            let expected_min = FixedLenByteArray::from(ByteArray::from("aaw "));
            assert_eq!(typed.min_opt().unwrap(), &expected_min);
            let expected_max = FixedLenByteArray::from(ByteArray::from("zz "));
            assert_eq!(typed.max_opt().unwrap(), &expected_max);
        }
        _ => panic!("expecting Statistics::FixedLenByteArray, got {stats:?}"),
    }
}
2430
/// Round-trips four mixed-sign half-precision floats and checks that the
/// reported min/max are -2.0 and 3.0.
#[test]
fn test_column_writer_check_float16_min_max() {
    let halves = [
        -f16::ONE,
        f16::from_f32(3.0),
        -f16::from_f32(2.0),
        f16::from_f32(2.0),
    ];
    let input = halves
        .into_iter()
        .map(|half| ByteArray::from(half).into())
        .collect::<Vec<_>>();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());

    let expected_min = ByteArray::from(-f16::from_f32(2.0));
    assert_eq!(stats.min_opt().unwrap(), &expected_min);
    let expected_max = ByteArray::from(f16::from_f32(3.0));
    assert_eq!(stats.max_opt().unwrap(), &expected_max);
}
2454
/// With a NaN between two ordinary half-precision values, the test expects
/// min/max to be the two ordinary values.
#[test]
fn test_column_writer_check_float16_nan_middle() {
    let halves = [f16::ONE, f16::NAN, f16::ONE + f16::ONE];
    let input = halves
        .into_iter()
        .map(|half| ByteArray::from(half).into())
        .collect::<Vec<_>>();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());

    let expected_min = ByteArray::from(f16::ONE);
    assert_eq!(stats.min_opt().unwrap(), &expected_min);
    let expected_max = ByteArray::from(f16::ONE + f16::ONE);
    assert_eq!(stats.max_opt().unwrap(), &expected_max);
}
2470
/// Half-precision input with a NaN in the middle: the expected min/max are
/// the surrounding non-NaN values (1.0 and 2.0).
#[test]
fn test_float16_statistics_nan_middle() {
    let two = f16::ONE + f16::ONE;
    let input = [f16::ONE, f16::NAN, two]
        .into_iter()
        .map(|half| ByteArray::from(half).into())
        .collect::<Vec<_>>();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
    assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(two));
}
2486
/// Half-precision input that starts with a NaN: the expected min/max are the
/// remaining non-NaN values (1.0 and 2.0).
#[test]
fn test_float16_statistics_nan_start() {
    let two = f16::ONE + f16::ONE;
    let input = [f16::NAN, f16::ONE, two]
        .into_iter()
        .map(|half| ByteArray::from(half).into())
        .collect::<Vec<_>>();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
    assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(two));
}
2502
/// Half-precision input made only of NaNs: no min or max bytes are expected.
#[test]
fn test_float16_statistics_nan_only() {
    let all_nan = [f16::NAN, f16::NAN];
    let input = all_nan
        .into_iter()
        .map(|half| ByteArray::from(half).into())
        .collect::<Vec<_>>();

    let stats = float16_statistics_roundtrip(&input);
    // Neither bound should have been recorded.
    assert!(stats.min_bytes_opt().is_none());
    assert!(stats.max_bytes_opt().is_none());
    assert!(stats.is_min_max_backwards_compatible());
}
2515
/// A single half-precision +0.0: the expected min is -0.0 and the expected
/// max is +0.0.
#[test]
fn test_float16_statistics_zero_only() {
    let only_zero = [f16::ZERO];
    let input = only_zero
        .into_iter()
        .map(|half| ByteArray::from(half).into())
        .collect::<Vec<_>>();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());

    let expected_min = ByteArray::from(f16::NEG_ZERO);
    assert_eq!(stats.min_opt().unwrap(), &expected_min);
    let expected_max = ByteArray::from(f16::ZERO);
    assert_eq!(stats.max_opt().unwrap(), &expected_max);
}
2528
/// A single half-precision -0.0: the expected min is -0.0 and the expected
/// max is +0.0.
#[test]
fn test_float16_statistics_neg_zero_only() {
    let only_neg_zero = [f16::NEG_ZERO];
    let input = only_neg_zero
        .into_iter()
        .map(|half| ByteArray::from(half).into())
        .collect::<Vec<_>>();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());

    let expected_min = ByteArray::from(f16::NEG_ZERO);
    assert_eq!(stats.min_opt().unwrap(), &expected_min);
    let expected_max = ByteArray::from(f16::ZERO);
    assert_eq!(stats.max_opt().unwrap(), &expected_max);
}
2541
/// Half-precision input whose smallest value is +0.0 (with a NaN mixed in):
/// the expected min is -0.0 and the expected max is PI.
#[test]
fn test_float16_statistics_zero_min() {
    let halves = [f16::ZERO, f16::ONE, f16::NAN, f16::PI];
    let input = halves
        .into_iter()
        .map(|half| ByteArray::from(half).into())
        .collect::<Vec<_>>();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());

    let expected_min = ByteArray::from(f16::NEG_ZERO);
    assert_eq!(stats.min_opt().unwrap(), &expected_min);
    let expected_max = ByteArray::from(f16::PI);
    assert_eq!(stats.max_opt().unwrap(), &expected_max);
}
2554
/// Half-precision input whose largest value is -0.0 (with a NaN mixed in):
/// the expected min is -PI and the expected max is +0.0.
#[test]
fn test_float16_statistics_neg_zero_max() {
    let halves = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI];
    let input = halves
        .into_iter()
        .map(|half| ByteArray::from(half).into())
        .collect::<Vec<_>>();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());

    let expected_min = ByteArray::from(-f16::PI);
    assert_eq!(stats.min_opt().unwrap(), &expected_min);
    let expected_max = ByteArray::from(f16::ZERO);
    assert_eq!(stats.max_opt().unwrap(), &expected_max);
}
2567
/// f32 input with a NaN in the middle: the expected min/max are 1.0 and 2.0.
#[test]
fn test_float_statistics_nan_middle() {
    let values = [1.0, f32::NAN, 2.0];
    let stats = statistics_roundtrip::<FloatType>(&values);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Float(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &1.0);
            assert_eq!(typed.max_opt().unwrap(), &2.0);
        }
        _ => panic!("expecting Statistics::Float"),
    }
}
2579
/// f32 input that starts with a NaN: the expected min/max are 1.0 and 2.0.
#[test]
fn test_float_statistics_nan_start() {
    let values = [f32::NAN, 1.0, 2.0];
    let stats = statistics_roundtrip::<FloatType>(&values);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Float(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &1.0);
            assert_eq!(typed.max_opt().unwrap(), &2.0);
        }
        _ => panic!("expecting Statistics::Float"),
    }
}
2591
/// f32 input made only of NaNs: no min or max bytes are expected, and the
/// statistics must still be the `Float` variant.
#[test]
fn test_float_statistics_nan_only() {
    let all_nan = [f32::NAN, f32::NAN];
    let stats = statistics_roundtrip::<FloatType>(&all_nan);

    // Neither bound should have been recorded.
    assert!(stats.min_bytes_opt().is_none());
    assert!(stats.max_bytes_opt().is_none());

    assert!(stats.is_min_max_backwards_compatible());
    assert!(matches!(stats, Statistics::Float(_)));
}
2600
/// A single f32 +0.0: the expected min is -0.0 and the expected max is +0.0.
#[test]
fn test_float_statistics_zero_only() {
    let values = [0.0];
    let stats = statistics_roundtrip::<FloatType>(&values);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Float(typed) => {
            // `==` treats -0.0 and 0.0 as equal, so the sign is checked separately.
            let min = typed.min_opt().unwrap();
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());

            let max = typed.max_opt().unwrap();
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Float"),
    }
}
2614
/// A single f32 -0.0: the expected min is -0.0 and the expected max is +0.0.
#[test]
fn test_float_statistics_neg_zero_only() {
    let values = [-0.0];
    let stats = statistics_roundtrip::<FloatType>(&values);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Float(typed) => {
            // `==` treats -0.0 and 0.0 as equal, so the sign is checked separately.
            let min = typed.min_opt().unwrap();
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());

            let max = typed.max_opt().unwrap();
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Float"),
    }
}
2628
/// f32 input whose smallest value is +0.0 (with a NaN mixed in): the expected
/// min is -0.0 and the expected max is 2.0.
#[test]
fn test_float_statistics_zero_min() {
    let values = [0.0, 1.0, f32::NAN, 2.0];
    let stats = statistics_roundtrip::<FloatType>(&values);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Float(typed) => {
            // `==` treats -0.0 and 0.0 as equal, so the sign is checked separately.
            let min = typed.min_opt().unwrap();
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());

            assert_eq!(typed.max_opt().unwrap(), &2.0);
        }
        _ => panic!("expecting Statistics::Float"),
    }
}
2641
/// f32 input whose largest value is -0.0 (with a NaN mixed in): the expected
/// min is -2.0 and the expected max is +0.0.
#[test]
fn test_float_statistics_neg_zero_max() {
    let values = [-0.0, -1.0, f32::NAN, -2.0];
    let stats = statistics_roundtrip::<FloatType>(&values);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Float(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &-2.0);

            // `==` treats -0.0 and 0.0 as equal, so the sign is checked separately.
            let max = typed.max_opt().unwrap();
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Float"),
    }
}
2654
/// f64 input with a NaN in the middle: the expected min/max are 1.0 and 2.0.
#[test]
fn test_double_statistics_nan_middle() {
    let values = [1.0, f64::NAN, 2.0];
    let stats = statistics_roundtrip::<DoubleType>(&values);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Double(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &1.0);
            assert_eq!(typed.max_opt().unwrap(), &2.0);
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
2666
/// f64 input that starts with a NaN: the expected min/max are 1.0 and 2.0.
#[test]
fn test_double_statistics_nan_start() {
    let values = [f64::NAN, 1.0, 2.0];
    let stats = statistics_roundtrip::<DoubleType>(&values);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Double(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &1.0);
            assert_eq!(typed.max_opt().unwrap(), &2.0);
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
2678
/// f64 input made only of NaNs: no min or max bytes are expected, and the
/// statistics must still be the `Double` variant.
#[test]
fn test_double_statistics_nan_only() {
    let all_nan = [f64::NAN, f64::NAN];
    let stats = statistics_roundtrip::<DoubleType>(&all_nan);

    // Neither bound should have been recorded.
    assert!(stats.min_bytes_opt().is_none());
    assert!(stats.max_bytes_opt().is_none());

    assert!(matches!(stats, Statistics::Double(_)));
    assert!(stats.is_min_max_backwards_compatible());
}
2687
/// A single f64 +0.0: the expected min is -0.0 and the expected max is +0.0.
#[test]
fn test_double_statistics_zero_only() {
    let values = [0.0];
    let stats = statistics_roundtrip::<DoubleType>(&values);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Double(typed) => {
            // `==` treats -0.0 and 0.0 as equal, so the sign is checked separately.
            let min = typed.min_opt().unwrap();
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());

            let max = typed.max_opt().unwrap();
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
2701
/// A single f64 -0.0: the expected min is -0.0 and the expected max is +0.0.
#[test]
fn test_double_statistics_neg_zero_only() {
    let values = [-0.0];
    let stats = statistics_roundtrip::<DoubleType>(&values);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Double(typed) => {
            // `==` treats -0.0 and 0.0 as equal, so the sign is checked separately.
            let min = typed.min_opt().unwrap();
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());

            let max = typed.max_opt().unwrap();
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
2715
/// f64 input whose smallest value is +0.0 (with a NaN mixed in): the expected
/// min is -0.0 and the expected max is 2.0.
#[test]
fn test_double_statistics_zero_min() {
    let values = [0.0, 1.0, f64::NAN, 2.0];
    let stats = statistics_roundtrip::<DoubleType>(&values);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Double(typed) => {
            // `==` treats -0.0 and 0.0 as equal, so the sign is checked separately.
            let min = typed.min_opt().unwrap();
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());

            assert_eq!(typed.max_opt().unwrap(), &2.0);
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
2728
/// f64 input whose largest value is -0.0 (with a NaN mixed in): the expected
/// min is -2.0 and the expected max is +0.0.
#[test]
fn test_double_statistics_neg_zero_max() {
    let values = [-0.0, -1.0, f64::NAN, -2.0];
    let stats = statistics_roundtrip::<DoubleType>(&values);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Double(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &-2.0);

            // `==` treats -0.0 and 0.0 as equal, so the sign is checked separately.
            let max = typed.max_opt().unwrap();
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
2741
/// Exercises `compare_greater_byte_array_decimals` on empty, single-byte and
/// mixed-length inputs, pinning the expected result of each comparison.
#[test]
fn test_compare_greater_byte_array_decimals() {
    // Short alias so each case fits on one line.
    let gt = |a: &[u8], b: &[u8]| compare_greater_byte_array_decimals(a, b);

    // Empty and single-byte operands.
    assert!(!gt(&[], &[]));
    assert!(gt(&[1u8], &[]));
    assert!(!gt(&[], &[1u8]));
    assert!(gt(&[1u8], &[0u8]));
    assert!(!gt(&[1u8], &[1u8]));

    // Operands of different lengths.
    assert!(gt(&[1u8, 0u8], &[0u8]));
    assert!(!gt(&[0u8, 1u8], &[1u8, 0u8]));

    // A leading 255 byte is expected to compare as smaller than a lone zero byte.
    assert!(!gt(&[255u8, 35u8, 0u8, 0u8], &[0u8]));
    assert!(gt(&[0u8], &[255u8, 35u8, 0u8, 0u8]));
}
2763
/// Writes a batch with four definition levels of 0 and no values, then checks
/// the column index produced on close: the single page is marked as a null
/// page with empty min/max, a null count of 4, and a definition-level
/// histogram of `[4, 0]`.
#[test]
fn test_column_index_with_null_pages() {
    let props = Default::default();
    let mut writer = get_test_column_writer::<Int32Type>(get_test_page_writer(), 1, 0, props);
    writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();

    let closed = writer.close().unwrap();
    assert!(closed.column_index.is_some());
    let index = closed.column_index.unwrap();

    // The only page holds nothing but nulls, so no bounds are recorded for it.
    assert!(index.null_pages[0]);
    assert_eq!(index.min_values[0].len(), 0);
    assert_eq!(index.max_values[0].len(), 0);

    assert!(index.null_counts.is_some());
    assert_eq!(index.null_counts.as_ref().unwrap()[0], 4);

    // No repetition-level histogram is expected; the definition-level one is.
    assert!(index.repetition_level_histograms.is_none());
    assert!(index.definition_level_histograms.is_some());
    assert_eq!(index.definition_level_histograms.unwrap(), &[4, 0]);
}
2789
/// Writes two batches of four Int32 values with a flush in between, then
/// checks the column index, offset index and chunk statistics returned by
/// `close`.
#[test]
fn test_column_offset_index_metadata() {
    let props = Default::default();
    let mut writer = get_test_column_writer::<Int32Type>(get_test_page_writer(), 0, 0, props);

    writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
    writer.flush_data_pages().unwrap();
    writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();

    let closed = writer.close().unwrap();
    let column_index = closed.column_index.unwrap();
    let offset_index = closed.offset_index.unwrap();

    assert_eq!(8, closed.rows_written);

    // Two pages are expected in both indexes.
    assert_eq!(2, column_index.null_pages.len());
    assert_eq!(2, offset_index.page_locations.len());
    assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
    for page in 0..2 {
        assert!(!column_index.null_pages[page]);
        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[page]);
    }

    let stats = match closed.metadata.statistics() {
        Some(found) => found,
        None => panic!("metadata missing statistics"),
    };
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);
    match stats {
        Statistics::Int32(typed) => {
            // The chunk-level bounds are compared against the second page's
            // column-index entries.
            assert_eq!(
                typed.min_bytes_opt(),
                Some(column_index.min_values[1].as_slice())
            );
            assert_eq!(
                typed.max_bytes_opt(),
                column_index.max_values.get(1).map(Vec::as_slice)
            );
        }
        _ => panic!("expecting Statistics::Int32"),
    }

    // Each page starts at the row following the previous page's four rows.
    assert_eq!(0, offset_index.page_locations[0].first_row_index);
    assert_eq!(4, offset_index.page_locations[1].first_row_index);
}
2844
/// Writes three 200-byte fixed-length values with default writer properties
/// and checks that the column-index min/max are truncated to
/// `DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH` while the chunk statistics differ
/// from them.
#[test]
fn test_column_offset_index_metadata_truncating() {
    let props = Default::default();
    let mut writer =
        get_test_column_writer::<FixedLenByteArrayType>(get_test_page_writer(), 0, 0, props);

    // Three values, each 200 copies of a single byte.
    let data: Vec<FixedLenByteArray> = [97_u8, 112_u8, 98_u8]
        .iter()
        .map(|&fill| {
            let mut value = FixedLenByteArray::default();
            value.set_data(Bytes::from(vec![fill; 200]));
            value
        })
        .collect();

    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let closed = writer.close().unwrap();
    let column_index = closed.column_index.unwrap();
    let offset_index = closed.offset_index.unwrap();

    assert_eq!(3, closed.rows_written);

    // A single page is expected in both indexes.
    assert_eq!(1, column_index.null_pages.len());
    assert_eq!(1, offset_index.page_locations.len());
    assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
    assert!(!column_index.null_pages[0]);
    assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

    let stats = match closed.metadata.statistics() {
        Some(found) => found,
        None => panic!("metadata missing statistics"),
    };
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);
    match stats {
        Statistics::FixedLenByteArray(typed) => {
            let index_min = &column_index.min_values[0];
            let index_max = &column_index.max_values[0];

            // The chunk statistics must not equal the column-index bounds.
            assert_ne!(typed.min_bytes_opt(), Some(index_min.as_slice()));
            assert_ne!(typed.max_bytes_opt(), Some(index_max.as_slice()));

            assert_eq!(
                index_min.len(),
                DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
            );
            assert_eq!(index_min.as_slice(), &[97_u8; 64]);
            assert_eq!(
                index_max.len(),
                DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
            );

            // The last byte of the truncated max is expected to be one more
            // than its first byte.
            assert_eq!(
                *index_max.last().unwrap(),
                *index_max.first().unwrap() + 1
            );
        }
        _ => panic!("expecting Statistics::FixedLenByteArray"),
    }
}
2917
/// With the column-index truncate length set to 1, writes the single value
/// "Blart Versenwald III" and checks that the column-index min/max become
/// "B" and "C" and differ from the chunk statistics.
#[test]
fn test_column_offset_index_truncating_spec_example() {
    let props = Arc::new(
        WriterProperties::builder()
            .set_column_index_truncate_length(Some(1))
            .build(),
    );
    let mut writer =
        get_test_column_writer::<FixedLenByteArrayType>(get_test_page_writer(), 0, 0, props);

    let mut value = FixedLenByteArray::default();
    value.set_data(Bytes::from(String::from("Blart Versenwald III")));
    let data = vec![value];

    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let closed = writer.close().unwrap();
    let column_index = closed.column_index.unwrap();
    let offset_index = closed.offset_index.unwrap();

    assert_eq!(1, closed.rows_written);

    // A single page is expected in both indexes.
    assert_eq!(1, column_index.null_pages.len());
    assert_eq!(1, offset_index.page_locations.len());
    assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
    assert!(!column_index.null_pages[0]);
    assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

    let stats = match closed.metadata.statistics() {
        Some(found) => found,
        None => panic!("metadata missing statistics"),
    };
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);
    if !matches!(stats, Statistics::FixedLenByteArray(_)) {
        panic!("expecting Statistics::FixedLenByteArray");
    }

    let index_min = &column_index.min_values[0];
    let index_max = &column_index.max_values[0];

    assert_eq!(index_min.len(), 1);
    assert_eq!(index_max.len(), 1);

    assert_eq!("B".as_bytes(), index_min.as_slice());
    assert_eq!("C".as_bytes(), index_max.as_slice());

    // The chunk statistics must not equal the truncated column-index bounds.
    assert_ne!(index_min, stats.min_bytes_opt().unwrap());
    assert_ne!(index_max, stats.max_bytes_opt().unwrap());
}
2972
/// With the column-index truncate length set to 1, writes a single float16
/// value and checks that both the column index and the chunk statistics still
/// carry the full little-endian bytes of that value.
#[test]
fn test_float16_min_max_no_truncation() {
    let props = Arc::new(
        WriterProperties::builder()
            .set_column_index_truncate_length(Some(1))
            .build(),
    );
    let mut writer = get_test_float16_column_writer(get_test_page_writer(), props);

    let expected_value = f16::PI.to_le_bytes().to_vec();
    let data = vec![ByteArray::from(expected_value.clone()).into()];
    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let closed = writer.close().unwrap();

    // Column-index bounds must match the written bytes exactly.
    let column_index = closed.column_index.unwrap();
    let index_min = column_index.min_values[0].as_slice();
    let index_max = column_index.max_values[0].as_slice();
    assert_eq!(expected_value, index_min);
    assert_eq!(expected_value, index_max);

    // So must the chunk-level statistics.
    match closed.metadata.statistics().unwrap() {
        Statistics::FixedLenByteArray(typed) => {
            let chunk_min = typed.min_bytes_opt().unwrap();
            let chunk_max = typed.max_bytes_opt().unwrap();
            assert_eq!(expected_value, chunk_min);
            assert_eq!(expected_value, chunk_max);
        }
        _ => panic!("expecting Statistics::FixedLenByteArray"),
    }
}
3007
/// With the column-index truncate length set to 1, writes a single 16-byte
/// value through the decimal test writer and checks that both the column
/// index and the chunk statistics still carry all 16 bytes.
#[test]
fn test_decimal_min_max_no_truncation() {
    let props = Arc::new(
        WriterProperties::builder()
            .set_column_index_truncate_length(Some(1))
            .build(),
    );
    let mut writer = get_test_decimals_column_writer::<FixedLenByteArrayType>(
        get_test_page_writer(),
        0,
        0,
        props,
    );

    let expected_value = vec![
        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
        231u8, 90u8, 0u8, 0u8,
    ];
    let data = vec![ByteArray::from(expected_value.clone()).into()];
    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let closed = writer.close().unwrap();

    // Column-index bounds must match the written bytes exactly.
    let column_index = closed.column_index.unwrap();
    let index_min = column_index.min_values[0].as_slice();
    let index_max = column_index.max_values[0].as_slice();
    assert_eq!(expected_value, index_min);
    assert_eq!(expected_value, index_max);

    // So must the chunk-level statistics.
    match closed.metadata.statistics().unwrap() {
        Statistics::FixedLenByteArray(typed) => {
            let chunk_min = typed.min_bytes_opt().unwrap();
            let chunk_max = typed.max_bytes_opt().unwrap();
            assert_eq!(expected_value, chunk_min);
            assert_eq!(expected_value, chunk_max);
        }
        _ => panic!("expecting Statistics::FixedLenByteArray"),
    }
}
3046
/// With the statistics truncate length set to 1, writes the single value
/// "Blart Versenwald III" and checks that the chunk statistics are truncated
/// to "B" (min) and "C" (max) and flagged as inexact.
#[test]
fn test_statistics_truncating_byte_array() {
    const TEST_TRUNCATE_LENGTH: usize = 1;

    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH))
            .build(),
    );
    let mut writer = get_test_column_writer::<ByteArrayType>(get_test_page_writer(), 0, 0, props);

    let mut value = ByteArray::default();
    value.set_data(Bytes::from(String::from("Blart Versenwald III")));
    let data = vec![value];

    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let closed = writer.close().unwrap();
    assert_eq!(1, closed.rows_written);

    let stats = closed.metadata.statistics().expect("statistics");
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);
    match stats {
        Statistics::ByteArray(typed) => {
            let min_value = typed.min_opt().unwrap();
            let max_value = typed.max_opt().unwrap();

            // Truncated bounds are no longer exact.
            assert!(!typed.min_is_exact());
            assert!(!typed.max_is_exact());

            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);

            assert_eq!("B".as_bytes(), min_value.as_bytes());
            assert_eq!("C".as_bytes(), max_value.as_bytes());
        }
        _ => panic!("expecting Statistics::ByteArray"),
    }
}
3090
/// With the statistics truncate length set to 1, writes one 16-byte
/// big-endian value and checks that the truncated min/max are inexact,
/// one byte long, and still bracket the original value when widened back
/// to an `i128`.
#[test]
fn test_statistics_truncating_fixed_len_byte_array() {
    const TEST_TRUNCATE_LENGTH: usize = 1;

    const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
    const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();

    // Expected bounds: the leading byte, and the leading byte plus one.
    const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]];
    const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
        [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];

    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH))
            .build(),
    );
    let mut writer =
        get_test_column_writer::<FixedLenByteArrayType>(get_test_page_writer(), 0, 0, props);

    let mut value = FixedLenByteArray::default();
    value.set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));
    let data = vec![value];

    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let closed = writer.close().unwrap();
    assert_eq!(1, closed.rows_written);

    let stats = closed.metadata.statistics().expect("statistics");
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);
    match stats {
        Statistics::FixedLenByteArray(typed) => {
            let min_value = typed.min_opt().unwrap();
            let max_value = typed.max_opt().unwrap();

            // Truncated bounds are no longer exact.
            assert!(!typed.min_is_exact());
            assert!(!typed.max_is_exact());

            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);

            assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
            assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());

            // Widen a single leading byte back to 16 big-endian bytes,
            // zero-filling the rest.
            let widen = |lead: u8| {
                let mut be_bytes = [0u8; 16];
                be_bytes[0] = lead;
                i128::from_be_bytes(be_bytes)
            };
            let reconstructed_min = widen(min_value.as_bytes()[0]);
            let reconstructed_max = widen(max_value.as_bytes()[0]);

            println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
            assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
            println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
            assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
        }
        _ => panic!("expecting Statistics::FixedLenByteArray"),
    }
}
3186
/// Compile-time check that `ColumnWriterImpl<Int32Type>` implements `Send`.
#[test]
fn test_send() {
    // Only instantiable for types that are `Send`; the body does nothing.
    fn assert_send<T>()
    where
        T: Send,
    {
    }

    assert_send::<ColumnWriterImpl<Int32Type>>();
}
3192
/// Checks `increment` on three byte vectors: a plain bump of the last byte,
/// a carry across trailing 255 bytes, and an all-255 input that yields `None`.
#[test]
fn test_increment() {
    // Last byte is simply bumped.
    let bumped = increment(vec![0, 0, 0]).unwrap();
    assert_eq!(&bumped, &[0, 0, 1]);

    // Trailing 255s wrap to 0 and the carry lands in the first byte.
    let carried = increment(vec![0, 255, 255]).unwrap();
    assert_eq!(&carried, &[1, 0, 0]);

    // Nothing left to carry into.
    let overflowed = increment(vec![255, 255, 255]);
    assert!(overflowed.is_none());
}
3206
/// Checks `increment_utf8` over 1- to 4-byte code points: the result must be
/// valid UTF-8, equal the expected string, and sort after the input both as a
/// `str` and as a `ByteArray`. Inputs made only of the largest code point of
/// a given width are expected to yield `None`.
#[test]
fn test_increment_utf8() {
    let test_inc = |source: &str, expected: &str| {
        match String::from_utf8(increment_utf8(source).unwrap()) {
            Ok(bumped) => {
                assert_eq!(bumped, expected);
                // Must be strictly greater as a string...
                assert!(*bumped > *source);
                // ...and also when compared as `ByteArray`s.
                let mut greater = ByteArray::new();
                greater.set_data(Bytes::from(bumped));
                let mut original = ByteArray::new();
                original.set_data(Bytes::from(source.as_bytes().to_vec()));
                assert!(greater > original);
            }
            Err(_) => panic!("Expected incremented UTF8 string to also be valid."),
        }
    };

    // 1-byte code points.
    test_inc("hello", "hellp");
    test_inc("a\u{7f}", "b");
    assert!(increment_utf8("\u{7f}\u{7f}").is_none());

    // Multi-byte code points that can be bumped in place.
    test_inc("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝");
    test_inc("éééé", "éééê");
    test_inc("\u{ff}\u{ff}", "\u{ff}\u{100}");

    // Largest 2-byte code point.
    test_inc("a\u{7ff}", "b");
    assert!(increment_utf8("\u{7ff}\u{7ff}").is_none());

    // 3-byte code points.
    test_inc("ࠀࠀ", "ࠀࠁ");
    test_inc("a\u{ffff}", "b");
    assert!(increment_utf8("\u{ffff}\u{ffff}").is_none());

    // 4-byte code points.
    test_inc("𐀀𐀀", "𐀀𐀁");
    test_inc("a\u{10ffff}", "b");
    assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none());

    // U+D7FF as the final character.
    test_inc("a\u{D7FF}", "b");
}
3273
/// Checks `truncate_utf8` and `truncate_and_increment_utf8` on strings built
/// from 1- to 4-byte code points, including cases expected to return `None`.
#[test]
fn test_truncate_utf8() {
    let data = "❤️🧡💛💚💙💜";

    // Truncating to the full length returns the input unchanged.
    let whole = truncate_utf8(data, data.len()).unwrap();
    assert_eq!(whole.len(), data.len());
    assert_eq!(&whole, data.as_bytes());

    // A limit of 13 bytes is expected to yield the 10-byte prefix.
    let prefix = truncate_utf8(data, 13).unwrap();
    assert_eq!(prefix.len(), 10);
    assert_eq!(&prefix, "❤️🧡".as_bytes());

    // A single multi-byte character cannot be cut down to one byte.
    let too_short = truncate_utf8("\u{0836}", 1);
    assert!(too_short.is_none());

    // ASCII: truncate to 8 bytes, then bump the last character.
    let ascii = truncate_and_increment_utf8("yyyyyyyyy", 8).unwrap();
    assert_eq!(&ascii, "yyyyyyyz".as_bytes());

    // 2-byte characters with a 7-byte limit.
    let two_byte = truncate_and_increment_utf8("ééééé", 7).unwrap();
    assert_eq!(&two_byte, "ééê".as_bytes());

    // U+00FF bumps to U+0100.
    let latin = truncate_and_increment_utf8("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8).unwrap();
    assert_eq!(&latin, "\u{ff}\u{ff}\u{ff}\u{100}".as_bytes());

    // All characters are U+07FF: no result is expected.
    let max_two_byte = truncate_and_increment_utf8("߿߿߿߿߿", 8);
    assert!(max_two_byte.is_none());

    // 3-byte characters with an 8-byte limit.
    let three_byte = truncate_and_increment_utf8("ࠀࠀࠀࠀ", 8).unwrap();
    assert_eq!(&three_byte, "ࠀࠁ".as_bytes());

    // All characters are U+FFFF: no result is expected.
    let max_three_byte = truncate_and_increment_utf8("\u{ffff}\u{ffff}\u{ffff}", 8);
    assert!(max_three_byte.is_none());

    // 4-byte characters with a 9-byte limit.
    let four_byte = truncate_and_increment_utf8("𐀀𐀀𐀀𐀀", 9).unwrap();
    assert_eq!(&four_byte, "𐀀𐀁".as_bytes());

    // All characters are U+10FFFF: no result is expected.
    let max_four_byte = truncate_and_increment_utf8("\u{10ffff}\u{10ffff}", 8);
    assert!(max_four_byte.is_none());
}
3325
/// Writes byte arrays of 32 bytes of 0x80 into a column declared as UTF8,
/// with chunk statistics truncated to 8 bytes, and checks that the max
/// statistic is inexact and equals seven 128 bytes followed by 129.
#[test]
fn test_byte_array_truncate_invalid_utf8_statistics() {
    let message_type = "
        message test_schema {
            OPTIONAL BYTE_ARRAY a (UTF8);
        }
    ";
    let schema = Arc::new(parse_message_type(message_type).unwrap());

    // Seven identical values; the definition levels interleave them with nulls.
    let data = vec![ByteArray::from(vec![128u8; 32]); 7];
    let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];

    let file: File = tempfile::tempfile().unwrap();
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Chunk)
            .set_statistics_truncate_length(Some(8))
            .build(),
    );

    let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
    let mut row_group_writer = writer.next_row_group().unwrap();

    let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
    col_writer
        .typed::<ByteArrayType>()
        .write_batch(&data, Some(&def_levels), None)
        .unwrap();
    col_writer.close().unwrap();
    row_group_writer.close().unwrap();
    let file_metadata = writer.close().unwrap();

    let column_chunk = &file_metadata.row_groups[0].columns[0];
    assert!(column_chunk.meta_data.is_some());
    let stats = column_chunk
        .meta_data
        .as_ref()
        .unwrap()
        .statistics
        .as_ref()
        .unwrap();

    assert!(!stats.is_max_value_exact.unwrap());
    // Truncated to 8 bytes with the final byte bumped from 128 to 129.
    assert_eq!(
        stats.max_value,
        Some(vec![128, 128, 128, 128, 128, 128, 128, 129])
    );
}
3375
/// Checks `increment` when the trailing bytes are already 0xFF: the carry is
/// expected to land in the last byte that is not 0xFF, and an all-0xFF input
/// is expected to yield `None`.
#[test]
fn test_increment_max_binary_chars() {
    // Two trailing 0xFF bytes wrap to 0x00; 0xFD absorbs the carry.
    let carried = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
    assert_eq!(&carried.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);

    // Every byte is 0xFF, so there is nowhere to carry into.
    let overflowed = increment(vec![0xFF, 0xFF, 0xFF]);
    assert!(overflowed.is_none());
}
3384
/// With statistics set to `EnabledStatistics::None`, closing the writer is
/// expected to return an offset index but no column index.
#[test]
fn test_no_column_index_when_stats_disabled() {
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .build(),
    );
    let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
    let mut writer = get_typed_column_writer::<Int32Type>(get_column_writer(
        descr,
        props,
        get_test_page_writer(),
    ));

    // Ten definition levels of 0 and no values.
    let values = Vec::new();
    let def_levels = vec![0; 10];
    writer
        .write_batch(&values, Some(&def_levels), None)
        .unwrap();
    writer.flush_data_pages().unwrap();

    let closed = writer.close().unwrap();
    assert!(closed.offset_index.is_some());
    assert!(closed.column_index.is_none());
}
3408
3409 #[test]
3410 fn test_no_offset_index_when_disabled() {
3411 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3413 let props = Arc::new(
3414 WriterProperties::builder()
3415 .set_statistics_enabled(EnabledStatistics::None)
3416 .set_offset_index_disabled(true)
3417 .build(),
3418 );
3419 let column_writer = get_column_writer(descr, props, get_test_page_writer());
3420 let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3421
3422 let data = Vec::new();
3423 let def_levels = vec![0; 10];
3424 writer.write_batch(&data, Some(&def_levels), None).unwrap();
3425 writer.flush_data_pages().unwrap();
3426
3427 let column_close_result = writer.close().unwrap();
3428 assert!(column_close_result.offset_index.is_none());
3429 assert!(column_close_result.column_index.is_none());
3430 }
3431
3432 #[test]
3433 fn test_offset_index_overridden() {
3434 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3436 let props = Arc::new(
3437 WriterProperties::builder()
3438 .set_statistics_enabled(EnabledStatistics::Page)
3439 .set_offset_index_disabled(true)
3440 .build(),
3441 );
3442 let column_writer = get_column_writer(descr, props, get_test_page_writer());
3443 let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3444
3445 let data = Vec::new();
3446 let def_levels = vec![0; 10];
3447 writer.write_batch(&data, Some(&def_levels), None).unwrap();
3448 writer.flush_data_pages().unwrap();
3449
3450 let column_close_result = writer.close().unwrap();
3451 assert!(column_close_result.offset_index.is_some());
3452 assert!(column_close_result.column_index.is_some());
3453 }
3454
3455 #[test]
3456 fn test_boundary_order() -> Result<()> {
3457 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3458 let column_close_result = write_multiple_pages::<Int32Type>(
3460 &descr,
3461 &[
3462 &[Some(-10), Some(10)],
3463 &[Some(-5), Some(11)],
3464 &[None],
3465 &[Some(-5), Some(11)],
3466 ],
3467 )?;
3468 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3469 assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3470
3471 let column_close_result = write_multiple_pages::<Int32Type>(
3473 &descr,
3474 &[
3475 &[Some(10), Some(11)],
3476 &[Some(5), Some(11)],
3477 &[None],
3478 &[Some(-5), Some(0)],
3479 ],
3480 )?;
3481 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3482 assert_eq!(boundary_order, BoundaryOrder::DESCENDING);
3483
3484 let column_close_result = write_multiple_pages::<Int32Type>(
3486 &descr,
3487 &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
3488 )?;
3489 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3490 assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3491
3492 let column_close_result =
3494 write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
3495 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3496 assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3497
3498 let column_close_result =
3500 write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
3501 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3502 assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3503
3504 let column_close_result =
3506 write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
3507 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3508 assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3509
3510 let column_close_result = write_multiple_pages::<Int32Type>(
3512 &descr,
3513 &[
3514 &[Some(10), Some(11)],
3515 &[Some(11), Some(16)],
3516 &[None],
3517 &[Some(-5), Some(0)],
3518 ],
3519 )?;
3520 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3521 assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3522
3523 let column_close_result = write_multiple_pages::<Int32Type>(
3525 &descr,
3526 &[
3527 &[Some(1), Some(9)],
3528 &[Some(2), Some(8)],
3529 &[None],
3530 &[Some(3), Some(7)],
3531 ],
3532 )?;
3533 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3534 assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3535
3536 Ok(())
3537 }
3538
3539 #[test]
3540 fn test_boundary_order_logical_type() -> Result<()> {
3541 let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
3544 let fba_descr = {
3545 let tpe = SchemaType::primitive_type_builder(
3546 "col",
3547 FixedLenByteArrayType::get_physical_type(),
3548 )
3549 .with_length(2)
3550 .build()?;
3551 Arc::new(ColumnDescriptor::new(
3552 Arc::new(tpe),
3553 1,
3554 0,
3555 ColumnPath::from("col"),
3556 ))
3557 };
3558
3559 let values: &[&[Option<FixedLenByteArray>]] = &[
3560 &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
3561 &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
3562 &[Some(FixedLenByteArray::from(ByteArray::from(
3563 f16::NEG_ZERO,
3564 )))],
3565 &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
3566 ];
3567
3568 let column_close_result =
3570 write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
3571 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3572 assert_eq!(boundary_order, BoundaryOrder::DESCENDING);
3573
3574 let column_close_result =
3576 write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
3577 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3578 assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3579
3580 Ok(())
3581 }
3582
3583 #[test]
3584 fn test_interval_stats_should_not_have_min_max() {
3585 let input = [
3586 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
3587 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
3588 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
3589 ]
3590 .into_iter()
3591 .map(|s| ByteArray::from(s).into())
3592 .collect::<Vec<_>>();
3593
3594 let page_writer = get_test_page_writer();
3595 let mut writer = get_test_interval_column_writer(page_writer);
3596 writer.write_batch(&input, None, None).unwrap();
3597
3598 let metadata = writer.close().unwrap().metadata;
3599 let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
3600 stats.clone()
3601 } else {
3602 panic!("metadata missing statistics");
3603 };
3604 assert!(stats.min_bytes_opt().is_none());
3605 assert!(stats.max_bytes_opt().is_none());
3606 }
3607
3608 #[test]
3609 #[cfg(feature = "arrow")]
3610 fn test_column_writer_get_estimated_total_bytes() {
3611 let page_writer = get_test_page_writer();
3612 let props = Default::default();
3613 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
3614 assert_eq!(writer.get_estimated_total_bytes(), 0);
3615
3616 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
3617 writer.add_data_page().unwrap();
3618 let size_with_one_page = writer.get_estimated_total_bytes();
3619 assert_eq!(size_with_one_page, 20);
3620
3621 writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
3622 writer.add_data_page().unwrap();
3623 let size_with_two_pages = writer.get_estimated_total_bytes();
3624 assert_eq!(size_with_two_pages, 20 + 21);
3626 }
3627
3628 fn write_multiple_pages<T: DataType>(
3629 column_descr: &Arc<ColumnDescriptor>,
3630 pages: &[&[Option<T::T>]],
3631 ) -> Result<ColumnCloseResult> {
3632 let column_writer = get_column_writer(
3633 column_descr.clone(),
3634 Default::default(),
3635 get_test_page_writer(),
3636 );
3637 let mut writer = get_typed_column_writer::<T>(column_writer);
3638
3639 for &page in pages {
3640 let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
3641 let def_levels = page
3642 .iter()
3643 .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
3644 .collect::<Vec<_>>();
3645 writer.write_batch(&values, Some(&def_levels), None)?;
3646 writer.flush_data_pages()?;
3647 }
3648
3649 writer.close()
3650 }
3651
3652 fn column_roundtrip_random<T: DataType>(
3656 props: WriterProperties,
3657 max_size: usize,
3658 min_value: T::T,
3659 max_value: T::T,
3660 max_def_level: i16,
3661 max_rep_level: i16,
3662 ) where
3663 T::T: PartialOrd + SampleUniform + Copy,
3664 {
3665 let mut num_values: usize = 0;
3666
3667 let mut buf: Vec<i16> = Vec::new();
3668 let def_levels = if max_def_level > 0 {
3669 random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
3670 for &dl in &buf[..] {
3671 if dl == max_def_level {
3672 num_values += 1;
3673 }
3674 }
3675 Some(&buf[..])
3676 } else {
3677 num_values = max_size;
3678 None
3679 };
3680
3681 let mut buf: Vec<i16> = Vec::new();
3682 let rep_levels = if max_rep_level > 0 {
3683 random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
3684 buf[0] = 0; Some(&buf[..])
3686 } else {
3687 None
3688 };
3689
3690 let mut values: Vec<T::T> = Vec::new();
3691 random_numbers_range(num_values, min_value, max_value, &mut values);
3692
3693 column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
3694 }
3695
    /// Writes `values` (with optional definition and repetition levels) to a
    /// temporary file through a serialized page writer, reads the column back
    /// through a serialized page reader, and asserts that values, levels and
    /// the reported row count all match what was written.
    ///
    /// The maximum definition/repetition level of the test column is taken
    /// from the largest level supplied (0 when the levels are absent or
    /// empty).
    fn column_roundtrip<T: DataType>(
        props: WriterProperties,
        values: &[T::T],
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) {
        let mut file = tempfile::tempfile().unwrap();
        let mut write = TrackedWrite::new(&mut file);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));

        let max_def_level = match def_levels {
            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
            None => 0i16,
        };

        let max_rep_level = match rep_levels {
            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
            None => 0i16,
        };

        // The read batch must be large enough for whichever input is longest.
        let mut max_batch_size = values.len();
        if let Some(levels) = def_levels {
            max_batch_size = max_batch_size.max(levels.len());
        }
        if let Some(levels) = rep_levels {
            max_batch_size = max_batch_size.max(levels.len());
        }

        let mut writer =
            get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));

        let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
        assert_eq!(values_written, values.len());
        let result = writer.close().unwrap();

        // `write` was built from `&mut file`; drop it before `file` is moved
        // into the reader below.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let page_reader = Box::new(
            SerializedPageReader::new_with_properties(
                Arc::new(file),
                &result.metadata,
                result.rows_written as usize,
                None,
                Arc::new(props),
            )
            .unwrap(),
        );
        let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);

        // Level buffers are only allocated when the matching input was given.
        let mut actual_values = Vec::with_capacity(max_batch_size);
        let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
        let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));

        let (_, values_read, levels_read) = reader
            .read_records(
                max_batch_size,
                actual_def_levels.as_mut(),
                actual_rep_levels.as_mut(),
                &mut actual_values,
            )
            .unwrap();

        // Everything read back must equal what was written.
        assert_eq!(&actual_values[..values_read], values);
        match actual_def_levels {
            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
            None => assert_eq!(None, def_levels),
        }
        match actual_rep_levels {
            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
            None => assert_eq!(None, rep_levels),
        }

        // Cross-check the writer's row count: with repetition levels it must
        // equal the number of zero levels; with only definition levels, the
        // number of levels read; otherwise the number of values read.
        if let Some(levels) = actual_rep_levels {
            let mut actual_rows_written = 0;
            for l in levels {
                if l == 0 {
                    actual_rows_written += 1;
                }
            }
            assert_eq!(actual_rows_written, result.rows_written);
        } else if actual_def_levels.is_some() {
            assert_eq!(levels_read as u64, result.rows_written);
        } else {
            assert_eq!(values_read as u64, result.rows_written);
        }
    }
3790
3791 fn column_write_and_get_metadata<T: DataType>(
3794 props: WriterProperties,
3795 values: &[T::T],
3796 ) -> ColumnChunkMetaData {
3797 let page_writer = get_test_page_writer();
3798 let props = Arc::new(props);
3799 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
3800 writer.write_batch(values, None, None).unwrap();
3801 writer.close().unwrap().metadata
3802 }
3803
3804 fn check_encoding_write_support<T: DataType>(
3808 version: WriterVersion,
3809 dict_enabled: bool,
3810 data: &[T::T],
3811 dictionary_page_offset: Option<i64>,
3812 encodings: &[Encoding],
3813 ) {
3814 let props = WriterProperties::builder()
3815 .set_writer_version(version)
3816 .set_dictionary_enabled(dict_enabled)
3817 .build();
3818 let meta = column_write_and_get_metadata::<T>(props, data);
3819 assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
3820 assert_eq!(meta.encodings(), &encodings);
3821 }
3822
3823 fn get_test_column_writer<'a, T: DataType>(
3825 page_writer: Box<dyn PageWriter + 'a>,
3826 max_def_level: i16,
3827 max_rep_level: i16,
3828 props: WriterPropertiesPtr,
3829 ) -> ColumnWriterImpl<'a, T> {
3830 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
3831 let column_writer = get_column_writer(descr, props, page_writer);
3832 get_typed_column_writer::<T>(column_writer)
3833 }
3834
3835 fn get_test_column_reader<T: DataType>(
3837 page_reader: Box<dyn PageReader>,
3838 max_def_level: i16,
3839 max_rep_level: i16,
3840 ) -> ColumnReaderImpl<T> {
3841 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
3842 let column_reader = get_column_reader(descr, page_reader);
3843 get_typed_column_reader::<T>(column_reader)
3844 }
3845
3846 fn get_test_column_descr<T: DataType>(
3848 max_def_level: i16,
3849 max_rep_level: i16,
3850 ) -> ColumnDescriptor {
3851 let path = ColumnPath::from("col");
3852 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
3853 .with_length(1)
3856 .build()
3857 .unwrap();
3858 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
3859 }
3860
    /// Returns a boxed `TestPageWriter` to use as the page sink in tests.
    fn get_test_page_writer() -> Box<dyn PageWriter> {
        Box::new(TestPageWriter {})
    }
3865
3866 struct TestPageWriter {}
3867
3868 impl PageWriter for TestPageWriter {
3869 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
3870 let mut res = PageWriteSpec::new();
3871 res.page_type = page.page_type();
3872 res.uncompressed_size = page.uncompressed_size();
3873 res.compressed_size = page.compressed_size();
3874 res.num_values = page.num_values();
3875 res.offset = 0;
3876 res.bytes_written = page.data().len() as u64;
3877 Ok(res)
3878 }
3879
3880 fn close(&mut self) -> Result<()> {
3881 Ok(())
3882 }
3883 }
3884
3885 fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
3887 let page_writer = get_test_page_writer();
3888 let props = Default::default();
3889 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
3890 writer.write_batch(values, None, None).unwrap();
3891
3892 let metadata = writer.close().unwrap().metadata;
3893 if let Some(stats) = metadata.statistics() {
3894 stats.clone()
3895 } else {
3896 panic!("metadata missing statistics");
3897 }
3898 }
3899
3900 fn get_test_decimals_column_writer<T: DataType>(
3902 page_writer: Box<dyn PageWriter>,
3903 max_def_level: i16,
3904 max_rep_level: i16,
3905 props: WriterPropertiesPtr,
3906 ) -> ColumnWriterImpl<'static, T> {
3907 let descr = Arc::new(get_test_decimals_column_descr::<T>(
3908 max_def_level,
3909 max_rep_level,
3910 ));
3911 let column_writer = get_column_writer(descr, props, page_writer);
3912 get_typed_column_writer::<T>(column_writer)
3913 }
3914
3915 fn get_test_decimals_column_descr<T: DataType>(
3917 max_def_level: i16,
3918 max_rep_level: i16,
3919 ) -> ColumnDescriptor {
3920 let path = ColumnPath::from("col");
3921 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
3922 .with_length(16)
3923 .with_logical_type(Some(LogicalType::Decimal {
3924 scale: 2,
3925 precision: 3,
3926 }))
3927 .with_scale(2)
3928 .with_precision(3)
3929 .build()
3930 .unwrap();
3931 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
3932 }
3933
3934 fn float16_statistics_roundtrip(
3935 values: &[FixedLenByteArray],
3936 ) -> ValueStatistics<FixedLenByteArray> {
3937 let page_writer = get_test_page_writer();
3938 let mut writer = get_test_float16_column_writer(page_writer, Default::default());
3939 writer.write_batch(values, None, None).unwrap();
3940
3941 let metadata = writer.close().unwrap().metadata;
3942 if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
3943 stats.clone()
3944 } else {
3945 panic!("metadata missing statistics");
3946 }
3947 }
3948
3949 fn get_test_float16_column_writer(
3950 page_writer: Box<dyn PageWriter>,
3951 props: WriterPropertiesPtr,
3952 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
3953 let descr = Arc::new(get_test_float16_column_descr(0, 0));
3954 let column_writer = get_column_writer(descr, props, page_writer);
3955 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
3956 }
3957
3958 fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
3959 let path = ColumnPath::from("col");
3960 let tpe =
3961 SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
3962 .with_length(2)
3963 .with_logical_type(Some(LogicalType::Float16))
3964 .build()
3965 .unwrap();
3966 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
3967 }
3968
3969 fn get_test_interval_column_writer(
3970 page_writer: Box<dyn PageWriter>,
3971 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
3972 let descr = Arc::new(get_test_interval_column_descr());
3973 let column_writer = get_column_writer(descr, Default::default(), page_writer);
3974 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
3975 }
3976
3977 fn get_test_interval_column_descr() -> ColumnDescriptor {
3978 let path = ColumnPath::from("col");
3979 let tpe =
3980 SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
3981 .with_length(12)
3982 .with_converted_type(ConvertedType::INTERVAL)
3983 .build()
3984 .unwrap();
3985 ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
3986 }
3987
3988 fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
3990 page_writer: Box<dyn PageWriter + 'a>,
3991 max_def_level: i16,
3992 max_rep_level: i16,
3993 props: WriterPropertiesPtr,
3994 ) -> ColumnWriterImpl<'a, T> {
3995 let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
3996 max_def_level,
3997 max_rep_level,
3998 ));
3999 let column_writer = get_column_writer(descr, props, page_writer);
4000 get_typed_column_writer::<T>(column_writer)
4001 }
4002
4003 fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
4005 max_def_level: i16,
4006 max_rep_level: i16,
4007 ) -> ColumnDescriptor {
4008 let path = ColumnPath::from("col");
4009 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4010 .with_converted_type(ConvertedType::UINT_32)
4011 .build()
4012 .unwrap();
4013 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4014 }
4015}