1use bytes::Bytes;
21use half::f16;
22
23use crate::bloom_filter::Sbbf;
24use crate::format::{BoundaryOrder, ColumnIndex, OffsetIndex};
25use std::collections::{BTreeSet, VecDeque};
26use std::str;
27
28use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
29use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
30use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
31use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
32use crate::data_type::private::ParquetValueType;
33use crate::data_type::*;
34use crate::encodings::levels::LevelEncoder;
35#[cfg(feature = "encryption")]
36use crate::encryption::encrypt::get_column_crypto_metadata;
37use crate::errors::{ParquetError, Result};
38use crate::file::metadata::{
39 ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram, OffsetIndexBuilder,
40};
41use crate::file::properties::EnabledStatistics;
42use crate::file::statistics::{Statistics, ValueStatistics};
43use crate::file::{
44 metadata::ColumnChunkMetaData,
45 properties::{WriterProperties, WriterPropertiesPtr, WriterVersion},
46};
47use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
48
49pub(crate) mod encoder;
50
/// Dispatches over every `ColumnWriter` variant.
///
/// `$writer` is matched against each typed variant; inside every arm the typed payload is
/// bound to `$typed` and `$body` is evaluated. Must be expanded where `Self` names an enum
/// with the eight `*ColumnWriter` variants.
macro_rules! downcast_writer {
    ($writer:expr, $typed:ident, $body:expr) => {
        match $writer {
            Self::BoolColumnWriter($typed) => $body,
            Self::Int32ColumnWriter($typed) => $body,
            Self::Int64ColumnWriter($typed) => $body,
            Self::Int96ColumnWriter($typed) => $body,
            Self::FloatColumnWriter($typed) => $body,
            Self::DoubleColumnWriter($typed) => $body,
            Self::ByteArrayColumnWriter($typed) => $body,
            Self::FixedLenByteArrayColumnWriter($typed) => $body,
        }
    };
}
65
66pub enum ColumnWriter<'a> {
68 BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
70 Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
72 Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
74 Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
76 FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
78 DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
80 ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
82 FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
84}
85
86impl ColumnWriter<'_> {
87 #[cfg(feature = "arrow")]
89 pub(crate) fn memory_size(&self) -> usize {
90 downcast_writer!(self, typed, typed.memory_size())
91 }
92
93 #[cfg(feature = "arrow")]
95 pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
96 downcast_writer!(self, typed, typed.get_estimated_total_bytes())
97 }
98
99 pub fn close(self) -> Result<ColumnCloseResult> {
101 downcast_writer!(self, typed, typed.close())
102 }
103}
104
/// Granularity marker distinguishing a page from a whole column.
///
/// NOTE(review): nothing visible in this module uses this enum; it is deprecated and
/// scheduled for removal.
#[deprecated(
    since = "54.0.0",
    note = "Seems like a stray and nobody knows what's it for. Will be removed in the next release."
)]
#[allow(missing_docs)]
pub enum Level {
    Page,
    Column,
}
114
115pub fn get_column_writer<'a>(
117 descr: ColumnDescPtr,
118 props: WriterPropertiesPtr,
119 page_writer: Box<dyn PageWriter + 'a>,
120) -> ColumnWriter<'a> {
121 match descr.physical_type() {
122 Type::BOOLEAN => {
123 ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
124 }
125 Type::INT32 => {
126 ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
127 }
128 Type::INT64 => {
129 ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
130 }
131 Type::INT96 => {
132 ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
133 }
134 Type::FLOAT => {
135 ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
136 }
137 Type::DOUBLE => {
138 ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
139 }
140 Type::BYTE_ARRAY => {
141 ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
142 }
143 Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
144 ColumnWriterImpl::new(descr, props, page_writer),
145 ),
146 }
147}
148
149pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
154 T::get_column_writer(col_writer).unwrap_or_else(|| {
155 panic!(
156 "Failed to convert column writer into a typed column writer for `{}` type",
157 T::get_physical_type()
158 )
159 })
160}
161
162pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
164 col_writer: &'b ColumnWriter<'a>,
165) -> &'b ColumnWriterImpl<'a, T> {
166 T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
167 panic!(
168 "Failed to convert column writer into a typed column writer for `{}` type",
169 T::get_physical_type()
170 )
171 })
172}
173
174pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
176 col_writer: &'a mut ColumnWriter<'b>,
177) -> &'a mut ColumnWriterImpl<'b, T> {
178 T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
179 panic!(
180 "Failed to convert column writer into a typed column writer for `{}` type",
181 T::get_physical_type()
182 )
183 })
184}
185
186#[derive(Debug, Clone)]
188pub struct ColumnCloseResult {
189 pub bytes_written: u64,
191 pub rows_written: u64,
193 pub metadata: ColumnChunkMetaData,
195 pub bloom_filter: Option<Sbbf>,
197 pub column_index: Option<ColumnIndex>,
199 pub offset_index: Option<OffsetIndex>,
201}
202
203#[derive(Default)]
205struct PageMetrics {
206 num_buffered_values: u32,
207 num_buffered_rows: u32,
208 num_page_nulls: u64,
209 repetition_level_histogram: Option<LevelHistogram>,
210 definition_level_histogram: Option<LevelHistogram>,
211}
212
213impl PageMetrics {
214 fn new() -> Self {
215 Default::default()
216 }
217
218 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
220 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
221 self
222 }
223
224 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
226 self.definition_level_histogram = LevelHistogram::try_new(max_level);
227 self
228 }
229
230 fn new_page(&mut self) {
233 self.num_buffered_values = 0;
234 self.num_buffered_rows = 0;
235 self.num_page_nulls = 0;
236 self.repetition_level_histogram
237 .as_mut()
238 .map(LevelHistogram::reset);
239 self.definition_level_histogram
240 .as_mut()
241 .map(LevelHistogram::reset);
242 }
243
244 fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
246 if let Some(ref mut rep_hist) = self.repetition_level_histogram {
247 rep_hist.update_from_levels(levels);
248 }
249 }
250
251 fn update_definition_level_histogram(&mut self, levels: &[i16]) {
253 if let Some(ref mut def_hist) = self.definition_level_histogram {
254 def_hist.update_from_levels(levels);
255 }
256 }
257}
258
259#[derive(Default)]
261struct ColumnMetrics<T: Default> {
262 total_bytes_written: u64,
263 total_rows_written: u64,
264 total_uncompressed_size: u64,
265 total_compressed_size: u64,
266 total_num_values: u64,
267 dictionary_page_offset: Option<u64>,
268 data_page_offset: Option<u64>,
269 min_column_value: Option<T>,
270 max_column_value: Option<T>,
271 num_column_nulls: u64,
272 column_distinct_count: Option<u64>,
273 variable_length_bytes: Option<i64>,
274 repetition_level_histogram: Option<LevelHistogram>,
275 definition_level_histogram: Option<LevelHistogram>,
276}
277
278impl<T: Default> ColumnMetrics<T> {
279 fn new() -> Self {
280 Default::default()
281 }
282
283 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
285 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
286 self
287 }
288
289 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
291 self.definition_level_histogram = LevelHistogram::try_new(max_level);
292 self
293 }
294
295 fn update_histogram(
297 chunk_histogram: &mut Option<LevelHistogram>,
298 page_histogram: &Option<LevelHistogram>,
299 ) {
300 if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
301 chunk_hist.add(page_hist);
302 }
303 }
304
305 fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
308 ColumnMetrics::<T>::update_histogram(
309 &mut self.definition_level_histogram,
310 &page_metrics.definition_level_histogram,
311 );
312 ColumnMetrics::<T>::update_histogram(
313 &mut self.repetition_level_histogram,
314 &page_metrics.repetition_level_histogram,
315 );
316 }
317
318 fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
320 if let Some(var_bytes) = variable_length_bytes {
321 *self.variable_length_bytes.get_or_insert(0) += var_bytes;
322 }
323 }
324}
325
326pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
328
329pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
331 descr: ColumnDescPtr,
333 props: WriterPropertiesPtr,
334 statistics_enabled: EnabledStatistics,
335
336 page_writer: Box<dyn PageWriter + 'a>,
337 codec: Compression,
338 compressor: Option<Box<dyn Codec>>,
339 encoder: E,
340
341 page_metrics: PageMetrics,
342 column_metrics: ColumnMetrics<E::T>,
344
345 encodings: BTreeSet<Encoding>,
348 def_levels_sink: Vec<i16>,
350 rep_levels_sink: Vec<i16>,
351 data_pages: VecDeque<CompressedPage>,
352 column_index_builder: ColumnIndexBuilder,
354 offset_index_builder: Option<OffsetIndexBuilder>,
355
356 data_page_boundary_ascending: bool,
359 data_page_boundary_descending: bool,
360 last_non_null_data_page_min_max: Option<(E::T, E::T)>,
362}
363
364impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
365 pub fn new(
367 descr: ColumnDescPtr,
368 props: WriterPropertiesPtr,
369 page_writer: Box<dyn PageWriter + 'a>,
370 ) -> Self {
371 let codec = props.compression(descr.path());
372 let codec_options = CodecOptionsBuilder::default().build();
373 let compressor = create_codec(codec, &codec_options).unwrap();
374 let encoder = E::try_new(&descr, props.as_ref()).unwrap();
375
376 let statistics_enabled = props.statistics_enabled(descr.path());
377
378 let mut encodings = BTreeSet::new();
379 encodings.insert(Encoding::RLE);
381
382 let mut page_metrics = PageMetrics::new();
383 let mut column_metrics = ColumnMetrics::<E::T>::new();
384
385 if statistics_enabled != EnabledStatistics::None {
387 page_metrics = page_metrics
388 .with_repetition_level_histogram(descr.max_rep_level())
389 .with_definition_level_histogram(descr.max_def_level());
390 column_metrics = column_metrics
391 .with_repetition_level_histogram(descr.max_rep_level())
392 .with_definition_level_histogram(descr.max_def_level())
393 }
394
395 let mut column_index_builder = ColumnIndexBuilder::new();
397 if statistics_enabled != EnabledStatistics::Page {
398 column_index_builder.to_invalid()
399 }
400
401 let offset_index_builder = match props.offset_index_disabled() {
403 false => Some(OffsetIndexBuilder::new()),
404 _ => None,
405 };
406
407 Self {
408 descr,
409 props,
410 statistics_enabled,
411 page_writer,
412 codec,
413 compressor,
414 encoder,
415 def_levels_sink: vec![],
416 rep_levels_sink: vec![],
417 data_pages: VecDeque::new(),
418 page_metrics,
419 column_metrics,
420 column_index_builder,
421 offset_index_builder,
422 encodings,
423 data_page_boundary_ascending: true,
424 data_page_boundary_descending: true,
425 last_non_null_data_page_min_max: None,
426 }
427 }
428
429 #[allow(clippy::too_many_arguments)]
430 pub(crate) fn write_batch_internal(
431 &mut self,
432 values: &E::Values,
433 value_indices: Option<&[usize]>,
434 def_levels: Option<&[i16]>,
435 rep_levels: Option<&[i16]>,
436 min: Option<&E::T>,
437 max: Option<&E::T>,
438 distinct_count: Option<u64>,
439 ) -> Result<usize> {
440 if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
442 if def.len() != rep.len() {
443 return Err(general_err!(
444 "Inconsistent length of definition and repetition levels: {} != {}",
445 def.len(),
446 rep.len()
447 ));
448 }
449 }
450
451 let num_levels = match def_levels {
462 Some(def_levels) => def_levels.len(),
463 None => values.len(),
464 };
465
466 if let Some(min) = min {
467 update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
468 }
469 if let Some(max) = max {
470 update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
471 }
472
473 if self.encoder.num_values() == 0 {
475 self.column_metrics.column_distinct_count = distinct_count;
476 } else {
477 self.column_metrics.column_distinct_count = None;
478 }
479
480 let mut values_offset = 0;
481 let mut levels_offset = 0;
482 let base_batch_size = self.props.write_batch_size();
483 while levels_offset < num_levels {
484 let mut end_offset = num_levels.min(levels_offset + base_batch_size);
485
486 if let Some(r) = rep_levels {
488 while end_offset < r.len() && r[end_offset] != 0 {
489 end_offset += 1;
490 }
491 }
492
493 values_offset += self.write_mini_batch(
494 values,
495 values_offset,
496 value_indices,
497 end_offset - levels_offset,
498 def_levels.map(|lv| &lv[levels_offset..end_offset]),
499 rep_levels.map(|lv| &lv[levels_offset..end_offset]),
500 )?;
501 levels_offset = end_offset;
502 }
503
504 Ok(values_offset)
506 }
507
508 pub fn write_batch(
521 &mut self,
522 values: &E::Values,
523 def_levels: Option<&[i16]>,
524 rep_levels: Option<&[i16]>,
525 ) -> Result<usize> {
526 self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
527 }
528
529 pub fn write_batch_with_statistics(
537 &mut self,
538 values: &E::Values,
539 def_levels: Option<&[i16]>,
540 rep_levels: Option<&[i16]>,
541 min: Option<&E::T>,
542 max: Option<&E::T>,
543 distinct_count: Option<u64>,
544 ) -> Result<usize> {
545 self.write_batch_internal(
546 values,
547 None,
548 def_levels,
549 rep_levels,
550 min,
551 max,
552 distinct_count,
553 )
554 }
555
556 #[cfg(feature = "arrow")]
561 pub(crate) fn memory_size(&self) -> usize {
562 self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
563 }
564
565 pub fn get_total_bytes_written(&self) -> u64 {
571 self.column_metrics.total_bytes_written
572 }
573
574 #[cfg(feature = "arrow")]
580 pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
581 self.data_pages
582 .iter()
583 .map(|page| page.data().len() as u64)
584 .sum::<u64>()
585 + self.column_metrics.total_bytes_written
586 + self.encoder.estimated_data_page_size() as u64
587 + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
588 }
589
590 pub fn get_total_rows_written(&self) -> u64 {
593 self.column_metrics.total_rows_written
594 }
595
596 pub fn get_descriptor(&self) -> &ColumnDescPtr {
598 &self.descr
599 }
600
601 pub fn close(mut self) -> Result<ColumnCloseResult> {
604 if self.page_metrics.num_buffered_values > 0 {
605 self.add_data_page()?;
606 }
607 if self.encoder.has_dictionary() {
608 self.write_dictionary_page()?;
609 }
610 self.flush_data_pages()?;
611 let metadata = self.build_column_metadata()?;
612 self.page_writer.close()?;
613
614 let boundary_order = match (
615 self.data_page_boundary_ascending,
616 self.data_page_boundary_descending,
617 ) {
618 (true, _) => BoundaryOrder::ASCENDING,
621 (false, true) => BoundaryOrder::DESCENDING,
622 (false, false) => BoundaryOrder::UNORDERED,
623 };
624 self.column_index_builder.set_boundary_order(boundary_order);
625
626 let column_index = self
627 .column_index_builder
628 .valid()
629 .then(|| self.column_index_builder.build_to_thrift());
630
631 let offset_index = self.offset_index_builder.map(|b| b.build_to_thrift());
632
633 Ok(ColumnCloseResult {
634 bytes_written: self.column_metrics.total_bytes_written,
635 rows_written: self.column_metrics.total_rows_written,
636 bloom_filter: self.encoder.flush_bloom_filter(),
637 metadata,
638 column_index,
639 offset_index,
640 })
641 }
642
643 fn write_mini_batch(
647 &mut self,
648 values: &E::Values,
649 values_offset: usize,
650 value_indices: Option<&[usize]>,
651 num_levels: usize,
652 def_levels: Option<&[i16]>,
653 rep_levels: Option<&[i16]>,
654 ) -> Result<usize> {
655 let values_to_write = if self.descr.max_def_level() > 0 {
657 let levels = def_levels.ok_or_else(|| {
658 general_err!(
659 "Definition levels are required, because max definition level = {}",
660 self.descr.max_def_level()
661 )
662 })?;
663
664 let mut values_to_write = 0;
665 for &level in levels {
666 if level == self.descr.max_def_level() {
667 values_to_write += 1;
668 } else {
669 self.page_metrics.num_page_nulls += 1
671 }
672 }
673
674 self.page_metrics.update_definition_level_histogram(levels);
676
677 self.def_levels_sink.extend_from_slice(levels);
678 values_to_write
679 } else {
680 num_levels
681 };
682
683 if self.descr.max_rep_level() > 0 {
685 let levels = rep_levels.ok_or_else(|| {
687 general_err!(
688 "Repetition levels are required, because max repetition level = {}",
689 self.descr.max_rep_level()
690 )
691 })?;
692
693 if !levels.is_empty() && levels[0] != 0 {
694 return Err(general_err!(
695 "Write must start at a record boundary, got non-zero repetition level of {}",
696 levels[0]
697 ));
698 }
699
700 for &level in levels {
702 self.page_metrics.num_buffered_rows += (level == 0) as u32
703 }
704
705 self.page_metrics.update_repetition_level_histogram(levels);
707
708 self.rep_levels_sink.extend_from_slice(levels);
709 } else {
710 self.page_metrics.num_buffered_rows += num_levels as u32;
713 }
714
715 match value_indices {
716 Some(indices) => {
717 let indices = &indices[values_offset..values_offset + values_to_write];
718 self.encoder.write_gather(values, indices)?;
719 }
720 None => self.encoder.write(values, values_offset, values_to_write)?,
721 }
722
723 self.page_metrics.num_buffered_values += num_levels as u32;
724
725 if self.should_add_data_page() {
726 self.add_data_page()?;
727 }
728
729 if self.should_dict_fallback() {
730 self.dict_fallback()?;
731 }
732
733 Ok(values_to_write)
734 }
735
736 #[inline]
741 fn should_dict_fallback(&self) -> bool {
742 match self.encoder.estimated_dict_page_size() {
743 Some(size) => size >= self.props.dictionary_page_size_limit(),
744 None => false,
745 }
746 }
747
748 #[inline]
750 fn should_add_data_page(&self) -> bool {
751 if self.page_metrics.num_buffered_values == 0 {
756 return false;
757 }
758
759 self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
760 || self.encoder.estimated_data_page_size() >= self.props.data_page_size_limit()
761 }
762
763 fn dict_fallback(&mut self) -> Result<()> {
766 if self.page_metrics.num_buffered_values > 0 {
768 self.add_data_page()?;
769 }
770 self.write_dictionary_page()?;
771 self.flush_data_pages()?;
772 Ok(())
773 }
774
775 fn update_column_offset_index(
777 &mut self,
778 page_statistics: Option<&ValueStatistics<E::T>>,
779 page_variable_length_bytes: Option<i64>,
780 ) {
781 let null_page =
783 (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
784 if null_page && self.column_index_builder.valid() {
787 self.column_index_builder.append(
788 null_page,
789 vec![],
790 vec![],
791 self.page_metrics.num_page_nulls as i64,
792 );
793 } else if self.column_index_builder.valid() {
794 match &page_statistics {
797 None => {
798 self.column_index_builder.to_invalid();
799 }
800 Some(stat) => {
801 let new_min = stat.min_opt().unwrap();
803 let new_max = stat.max_opt().unwrap();
804 if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
805 if self.data_page_boundary_ascending {
806 let not_ascending = compare_greater(&self.descr, last_min, new_min)
808 || compare_greater(&self.descr, last_max, new_max);
809 if not_ascending {
810 self.data_page_boundary_ascending = false;
811 }
812 }
813
814 if self.data_page_boundary_descending {
815 let not_descending = compare_greater(&self.descr, new_min, last_min)
817 || compare_greater(&self.descr, new_max, last_max);
818 if not_descending {
819 self.data_page_boundary_descending = false;
820 }
821 }
822 }
823 self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));
824
825 if self.can_truncate_value() {
826 self.column_index_builder.append(
827 null_page,
828 self.truncate_min_value(
829 self.props.column_index_truncate_length(),
830 stat.min_bytes_opt().unwrap(),
831 )
832 .0,
833 self.truncate_max_value(
834 self.props.column_index_truncate_length(),
835 stat.max_bytes_opt().unwrap(),
836 )
837 .0,
838 self.page_metrics.num_page_nulls as i64,
839 );
840 } else {
841 self.column_index_builder.append(
842 null_page,
843 stat.min_bytes_opt().unwrap().to_vec(),
844 stat.max_bytes_opt().unwrap().to_vec(),
845 self.page_metrics.num_page_nulls as i64,
846 );
847 }
848 }
849 }
850 }
851
852 self.column_index_builder.append_histograms(
854 &self.page_metrics.repetition_level_histogram,
855 &self.page_metrics.definition_level_histogram,
856 );
857
858 if let Some(builder) = self.offset_index_builder.as_mut() {
860 builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
861 builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
862 }
863 }
864
865 fn can_truncate_value(&self) -> bool {
867 match self.descr.physical_type() {
868 Type::FIXED_LEN_BYTE_ARRAY
872 if !matches!(
873 self.descr.logical_type(),
874 Some(LogicalType::Decimal { .. }) | Some(LogicalType::Float16)
875 ) =>
876 {
877 true
878 }
879 Type::BYTE_ARRAY => true,
880 _ => false,
882 }
883 }
884
885 fn is_utf8(&self) -> bool {
887 self.get_descriptor().logical_type() == Some(LogicalType::String)
888 || self.get_descriptor().converted_type() == ConvertedType::UTF8
889 }
890
891 fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
902 truncation_length
903 .filter(|l| data.len() > *l)
904 .and_then(|l|
905 if self.is_utf8() {
907 match str::from_utf8(data) {
908 Ok(str_data) => truncate_utf8(str_data, l),
909 Err(_) => Some(data[..l].to_vec()),
910 }
911 } else {
912 Some(data[..l].to_vec())
913 }
914 )
915 .map(|truncated| (truncated, true))
916 .unwrap_or_else(|| (data.to_vec(), false))
917 }
918
919 fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
933 truncation_length
934 .filter(|l| data.len() > *l)
935 .and_then(|l|
936 if self.is_utf8() {
938 match str::from_utf8(data) {
939 Ok(str_data) => truncate_and_increment_utf8(str_data, l),
940 Err(_) => increment(data[..l].to_vec()),
941 }
942 } else {
943 increment(data[..l].to_vec())
944 }
945 )
946 .map(|truncated| (truncated, true))
947 .unwrap_or_else(|| (data.to_vec(), false))
948 }
949
950 fn add_data_page(&mut self) -> Result<()> {
953 let values_data = self.encoder.flush_data_page()?;
955
956 let max_def_level = self.descr.max_def_level();
957 let max_rep_level = self.descr.max_rep_level();
958
959 self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;
960
961 let page_statistics = match (values_data.min_value, values_data.max_value) {
962 (Some(min), Some(max)) => {
963 update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
965 update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
966
967 (self.statistics_enabled == EnabledStatistics::Page).then_some(
968 ValueStatistics::new(
969 Some(min),
970 Some(max),
971 None,
972 Some(self.page_metrics.num_page_nulls),
973 false,
974 ),
975 )
976 }
977 _ => None,
978 };
979
980 self.update_column_offset_index(
982 page_statistics.as_ref(),
983 values_data.variable_length_bytes,
984 );
985
986 self.column_metrics
988 .update_from_page_metrics(&self.page_metrics);
989 self.column_metrics
990 .update_variable_length_bytes(values_data.variable_length_bytes);
991
992 let page_statistics = page_statistics.map(Statistics::from);
993
994 let compressed_page = match self.props.writer_version() {
995 WriterVersion::PARQUET_1_0 => {
996 let mut buffer = vec![];
997
998 if max_rep_level > 0 {
999 buffer.extend_from_slice(
1000 &self.encode_levels_v1(
1001 Encoding::RLE,
1002 &self.rep_levels_sink[..],
1003 max_rep_level,
1004 )[..],
1005 );
1006 }
1007
1008 if max_def_level > 0 {
1009 buffer.extend_from_slice(
1010 &self.encode_levels_v1(
1011 Encoding::RLE,
1012 &self.def_levels_sink[..],
1013 max_def_level,
1014 )[..],
1015 );
1016 }
1017
1018 buffer.extend_from_slice(&values_data.buf);
1019 let uncompressed_size = buffer.len();
1020
1021 if let Some(ref mut cmpr) = self.compressor {
1022 let mut compressed_buf = Vec::with_capacity(uncompressed_size);
1023 cmpr.compress(&buffer[..], &mut compressed_buf)?;
1024 buffer = compressed_buf;
1025 }
1026
1027 let data_page = Page::DataPage {
1028 buf: buffer.into(),
1029 num_values: self.page_metrics.num_buffered_values,
1030 encoding: values_data.encoding,
1031 def_level_encoding: Encoding::RLE,
1032 rep_level_encoding: Encoding::RLE,
1033 statistics: page_statistics,
1034 };
1035
1036 CompressedPage::new(data_page, uncompressed_size)
1037 }
1038 WriterVersion::PARQUET_2_0 => {
1039 let mut rep_levels_byte_len = 0;
1040 let mut def_levels_byte_len = 0;
1041 let mut buffer = vec![];
1042
1043 if max_rep_level > 0 {
1044 let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
1045 rep_levels_byte_len = levels.len();
1046 buffer.extend_from_slice(&levels[..]);
1047 }
1048
1049 if max_def_level > 0 {
1050 let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
1051 def_levels_byte_len = levels.len();
1052 buffer.extend_from_slice(&levels[..]);
1053 }
1054
1055 let uncompressed_size =
1056 rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();
1057
1058 match self.compressor {
1060 Some(ref mut cmpr) => {
1061 cmpr.compress(&values_data.buf, &mut buffer)?;
1062 }
1063 None => buffer.extend_from_slice(&values_data.buf),
1064 }
1065
1066 let data_page = Page::DataPageV2 {
1067 buf: buffer.into(),
1068 num_values: self.page_metrics.num_buffered_values,
1069 encoding: values_data.encoding,
1070 num_nulls: self.page_metrics.num_page_nulls as u32,
1071 num_rows: self.page_metrics.num_buffered_rows,
1072 def_levels_byte_len: def_levels_byte_len as u32,
1073 rep_levels_byte_len: rep_levels_byte_len as u32,
1074 is_compressed: self.compressor.is_some(),
1075 statistics: page_statistics,
1076 };
1077
1078 CompressedPage::new(data_page, uncompressed_size)
1079 }
1080 };
1081
1082 if self.encoder.has_dictionary() {
1084 self.data_pages.push_back(compressed_page);
1085 } else {
1086 self.write_data_page(compressed_page)?;
1087 }
1088
1089 self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;
1091
1092 self.rep_levels_sink.clear();
1094 self.def_levels_sink.clear();
1095 self.page_metrics.new_page();
1096
1097 Ok(())
1098 }
1099
1100 #[inline]
1103 fn flush_data_pages(&mut self) -> Result<()> {
1104 if self.page_metrics.num_buffered_values > 0 {
1106 self.add_data_page()?;
1107 }
1108
1109 while let Some(page) = self.data_pages.pop_front() {
1110 self.write_data_page(page)?;
1111 }
1112
1113 Ok(())
1114 }
1115
1116 fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
1118 let total_compressed_size = self.column_metrics.total_compressed_size as i64;
1119 let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
1120 let num_values = self.column_metrics.total_num_values as i64;
1121 let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
1122 let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;
1124
1125 let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
1126 .set_compression(self.codec)
1127 .set_encodings(self.encodings.iter().cloned().collect())
1128 .set_total_compressed_size(total_compressed_size)
1129 .set_total_uncompressed_size(total_uncompressed_size)
1130 .set_num_values(num_values)
1131 .set_data_page_offset(data_page_offset)
1132 .set_dictionary_page_offset(dict_page_offset);
1133
1134 if self.statistics_enabled != EnabledStatistics::None {
1135 let backwards_compatible_min_max = self.descr.sort_order().is_signed();
1136
1137 let statistics = ValueStatistics::<E::T>::new(
1138 self.column_metrics.min_column_value.clone(),
1139 self.column_metrics.max_column_value.clone(),
1140 self.column_metrics.column_distinct_count,
1141 Some(self.column_metrics.num_column_nulls),
1142 false,
1143 )
1144 .with_backwards_compatible_min_max(backwards_compatible_min_max)
1145 .into();
1146
1147 let statistics = match statistics {
1148 Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
1149 let (min, did_truncate_min) = self.truncate_min_value(
1150 self.props.statistics_truncate_length(),
1151 stats.min_bytes_opt().unwrap(),
1152 );
1153 let (max, did_truncate_max) = self.truncate_max_value(
1154 self.props.statistics_truncate_length(),
1155 stats.max_bytes_opt().unwrap(),
1156 );
1157 Statistics::ByteArray(
1158 ValueStatistics::new(
1159 Some(min.into()),
1160 Some(max.into()),
1161 stats.distinct_count(),
1162 stats.null_count_opt(),
1163 backwards_compatible_min_max,
1164 )
1165 .with_max_is_exact(!did_truncate_max)
1166 .with_min_is_exact(!did_truncate_min),
1167 )
1168 }
1169 Statistics::FixedLenByteArray(stats)
1170 if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
1171 {
1172 let (min, did_truncate_min) = self.truncate_min_value(
1173 self.props.statistics_truncate_length(),
1174 stats.min_bytes_opt().unwrap(),
1175 );
1176 let (max, did_truncate_max) = self.truncate_max_value(
1177 self.props.statistics_truncate_length(),
1178 stats.max_bytes_opt().unwrap(),
1179 );
1180 Statistics::FixedLenByteArray(
1181 ValueStatistics::new(
1182 Some(min.into()),
1183 Some(max.into()),
1184 stats.distinct_count(),
1185 stats.null_count_opt(),
1186 backwards_compatible_min_max,
1187 )
1188 .with_max_is_exact(!did_truncate_max)
1189 .with_min_is_exact(!did_truncate_min),
1190 )
1191 }
1192 stats => stats,
1193 };
1194
1195 builder = builder
1196 .set_statistics(statistics)
1197 .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
1198 .set_repetition_level_histogram(
1199 self.column_metrics.repetition_level_histogram.take(),
1200 )
1201 .set_definition_level_histogram(
1202 self.column_metrics.definition_level_histogram.take(),
1203 );
1204 }
1205
1206 builder = self.set_column_chunk_encryption_properties(builder);
1207
1208 let metadata = builder.build()?;
1209 Ok(metadata)
1210 }
1211
1212 #[inline]
1214 fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> {
1215 let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
1216 encoder.put(levels);
1217 encoder.consume()
1218 }
1219
1220 #[inline]
1223 fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
1224 let mut encoder = LevelEncoder::v2(max_level, levels.len());
1225 encoder.put(levels);
1226 encoder.consume()
1227 }
1228
1229 #[inline]
1231 fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
1232 self.encodings.insert(page.encoding());
1233 let page_spec = self.page_writer.write_page(page)?;
1234 if let Some(builder) = self.offset_index_builder.as_mut() {
1237 builder
1238 .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
1239 }
1240 self.update_metrics_for_page(page_spec);
1241 Ok(())
1242 }
1243
1244 #[inline]
1246 fn write_dictionary_page(&mut self) -> Result<()> {
1247 let compressed_page = {
1248 let mut page = self
1249 .encoder
1250 .flush_dict_page()?
1251 .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;
1252
1253 let uncompressed_size = page.buf.len();
1254
1255 if let Some(ref mut cmpr) = self.compressor {
1256 let mut output_buf = Vec::with_capacity(uncompressed_size);
1257 cmpr.compress(&page.buf, &mut output_buf)?;
1258 page.buf = Bytes::from(output_buf);
1259 }
1260
1261 let dict_page = Page::DictionaryPage {
1262 buf: page.buf,
1263 num_values: page.num_values as u32,
1264 encoding: self.props.dictionary_page_encoding(),
1265 is_sorted: page.is_sorted,
1266 };
1267 CompressedPage::new(dict_page, uncompressed_size)
1268 };
1269
1270 self.encodings.insert(compressed_page.encoding());
1271 let page_spec = self.page_writer.write_page(compressed_page)?;
1272 self.update_metrics_for_page(page_spec);
1273 Ok(())
1275 }
1276
1277 #[inline]
1279 fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1280 self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1281 self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1282 self.column_metrics.total_bytes_written += page_spec.bytes_written;
1283
1284 match page_spec.page_type {
1285 PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1286 self.column_metrics.total_num_values += page_spec.num_values as u64;
1287 if self.column_metrics.data_page_offset.is_none() {
1288 self.column_metrics.data_page_offset = Some(page_spec.offset);
1289 }
1290 }
1291 PageType::DICTIONARY_PAGE => {
1292 assert!(
1293 self.column_metrics.dictionary_page_offset.is_none(),
1294 "Dictionary offset is already set"
1295 );
1296 self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1297 }
1298 _ => {}
1299 }
1300 }
1301
1302 #[inline]
1303 #[cfg(feature = "encryption")]
1304 fn set_column_chunk_encryption_properties(
1305 &self,
1306 builder: ColumnChunkMetaDataBuilder,
1307 ) -> ColumnChunkMetaDataBuilder {
1308 if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() {
1309 builder.set_column_crypto_metadata(get_column_crypto_metadata(
1310 encryption_properties,
1311 &self.descr,
1312 ))
1313 } else {
1314 builder
1315 }
1316 }
1317
    /// Fallback used when the `encryption` feature is disabled: there is no column
    /// crypto metadata to attach, so `builder` is returned unchanged.
    #[inline]
    #[cfg(not(feature = "encryption"))]
    fn set_column_chunk_encryption_properties(
        &self,
        builder: ColumnChunkMetaDataBuilder,
    ) -> ColumnChunkMetaDataBuilder {
        builder
    }
1326}
1327
1328fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
1329 update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
1330}
1331
1332fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
1333 update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
1334}
1335
1336#[inline]
1337#[allow(clippy::eq_op)]
1338fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
1339 match T::PHYSICAL_TYPE {
1340 Type::FLOAT | Type::DOUBLE => val != val,
1341 Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type() == Some(LogicalType::Float16) => {
1342 let val = val.as_bytes();
1343 let val = f16::from_le_bytes([val[0], val[1]]);
1344 val.is_nan()
1345 }
1346 _ => false,
1347 }
1348}
1349
1350fn update_stat<T: ParquetValueType, F>(
1355 descr: &ColumnDescriptor,
1356 val: &T,
1357 cur: &mut Option<T>,
1358 should_update: F,
1359) where
1360 F: Fn(&T) -> bool,
1361{
1362 if is_nan(descr, val) {
1363 return;
1364 }
1365
1366 if cur.as_ref().map_or(true, should_update) {
1367 *cur = Some(val.clone());
1368 }
1369}
1370
1371fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
1373 if let Some(LogicalType::Integer { is_signed, .. }) = descr.logical_type() {
1374 if !is_signed {
1375 return a.as_u64().unwrap() > b.as_u64().unwrap();
1377 }
1378 }
1379
1380 match descr.converted_type() {
1381 ConvertedType::UINT_8
1382 | ConvertedType::UINT_16
1383 | ConvertedType::UINT_32
1384 | ConvertedType::UINT_64 => {
1385 return a.as_u64().unwrap() > b.as_u64().unwrap();
1386 }
1387 _ => {}
1388 };
1389
1390 if let Some(LogicalType::Decimal { .. }) = descr.logical_type() {
1391 match T::PHYSICAL_TYPE {
1392 Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1393 return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1394 }
1395 _ => {}
1396 };
1397 }
1398
1399 if descr.converted_type() == ConvertedType::DECIMAL {
1400 match T::PHYSICAL_TYPE {
1401 Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1402 return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1403 }
1404 _ => {}
1405 };
1406 };
1407
1408 if let Some(LogicalType::Float16) = descr.logical_type() {
1409 let a = a.as_bytes();
1410 let a = f16::from_le_bytes([a[0], a[1]]);
1411 let b = b.as_bytes();
1412 let b = f16::from_le_bytes([b[0], b[1]]);
1413 return a > b;
1414 }
1415
1416 a > b
1417}
1418
1419fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1427 match (kind, props.writer_version()) {
1428 (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1429 (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1430 (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1431 (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1432 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1433 _ => Encoding::PLAIN,
1434 }
1435}
1436
1437fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1439 match (kind, props.writer_version()) {
1440 (Type::BOOLEAN, _) => false,
1442 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1444 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1445 _ => true,
1446 }
1447}
1448
/// Returns `true` if the big-endian two's-complement integer encoded in `a` is strictly
/// greater than the one encoded in `b`.
///
/// This is the signed ordering used for DECIMAL statistics stored as BYTE_ARRAY or
/// FIXED_LEN_BYTE_ARRAY. The inputs may differ in length: the shorter value is treated
/// as if it were sign-extended to the longer length, so `[0xFF, 0x80]` equals `[0x80]`
/// and `[0x00, 0x05]` equals `[0x05]`.
///
/// An empty slice sorts below every non-empty slice, and two empty slices are equal.
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    let a_length = a.len();
    let b_length = b.len();

    if a_length == 0 || b_length == 0 {
        return a_length > 0;
    }

    let first_a: u8 = a[0];
    let first_b: u8 = b[0];

    // Values with different signs are decided by the leading byte. So are equal-length
    // values whose leading bytes differ. Equal length is required because sign
    // extension can make different leading bytes represent the same value
    // (0xFF10 == 0x10).
    if (0x80 & first_a) != (0x80 & first_b) || (a_length == b_length && first_a != first_b) {
        return (first_a as i8) > (first_b as i8);
    }

    // Both values have the same sign here. Sign-extending the shorter value is
    // equivalent to checking whether the longer value's extra leading bytes are all
    // sign bytes.
    let extension: u8 = if (first_a as i8) < 0 { 0xFF } else { 0 };
    let common_length = a_length.min(b_length);
    let (a_lead, a_tail) = a.split_at(a_length - common_length);
    let (b_lead, b_tail) = b.split_at(b_length - common_length);

    // At most one of the two leads is non-empty.
    if a_lead.iter().chain(b_lead).any(|&x| x != extension) {
        // The longer value carries significant extra bytes, so its magnitude is larger.
        // That makes it greater when positive and smaller when negative.
        let negative_values = (first_a as i8) < 0;
        let a_longer = a_length > b_length;
        return negative_values != a_longer;
    }

    // The remaining equal-length tails belong to same-sign values that share the same
    // sign-extension prefix. An unsigned lexicographic comparison orders them correctly.
    // Comparing the aligned tails matters: the previous `a[1..] > b[1..]` compared
    // misaligned slices when the lengths differed, e.g. it reported 5 > 7 for
    // [0x00, 0x05] vs [0x07].
    a_tail > b_tail
}
1494
/// Returns the longest non-empty prefix of `data` that is at most `length` bytes and
/// ends on a char boundary, as raw bytes.
///
/// Returns `None` when no such prefix exists, for example when `length` is 0 or the
/// first character is wider than `length`.
fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    // Index 0 is excluded so the result is never empty. `is_char_boundary` is false past
    // the end of `data`, so the chosen end is always in bounds.
    let end = (1..=length)
        .rev()
        .find(|&idx| data.is_char_boundary(idx))?;
    data.get(..end).map(|prefix| prefix.as_bytes().to_vec())
}
1504
1505fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1511 let lower_bound = length.saturating_sub(3);
1513 let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
1514 increment_utf8(data.get(..split)?)
1515}
1516
/// Produces a UTF-8 string that is greater than `data` by incrementing its last
/// incrementable character and dropping everything after it.
///
/// Characters are scanned from the end. A character is skipped when its successor is
/// not a valid `char` (past `char::MAX` or a surrogate), or when the successor needs
/// more UTF-8 bytes than the original. Returns `None` if no character qualifies, which
/// includes the empty string.
fn increment_utf8(data: &str) -> Option<Vec<u8>> {
    data.char_indices().rev().find_map(|(start, ch)| {
        let width = ch.len_utf8();
        let bumped = char::from_u32(ch as u32 + 1).filter(|next| next.len_utf8() == width)?;
        let mut out = data.as_bytes()[..start + width].to_vec();
        bumped.encode_utf8(&mut out[start..]);
        Some(out)
    })
}
1538
/// Treats `data` as a big-endian unsigned integer and adds one to it in place.
///
/// Returns `None` when the addition carries out of the most significant byte, i.e.
/// every byte was `u8::MAX`. This includes an empty input.
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    // `all` stops at the first byte that absorbs the carry without overflowing.
    let carried_out = data.iter_mut().rev().all(|byte| {
        let (next, overflowed) = byte.overflowing_add(1);
        *byte = next;
        overflowed
    });
    (!carried_out).then_some(data)
}
1554
1555#[cfg(test)]
1556mod tests {
1557 use crate::{
1558 file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1559 schema::parser::parse_message_type,
1560 };
1561 use core::str;
1562 use rand::distr::uniform::SampleUniform;
1563 use std::{fs::File, sync::Arc};
1564
1565 use crate::column::{
1566 page::PageReader,
1567 reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl},
1568 };
1569 use crate::file::writer::TrackedWrite;
1570 use crate::file::{
1571 properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1572 };
1573 use crate::schema::types::{ColumnPath, Type as SchemaType};
1574 use crate::util::test_common::rand_gen::random_numbers_range;
1575
1576 use super::*;
1577
1578 #[test]
1579 fn test_column_writer_inconsistent_def_rep_length() {
1580 let page_writer = get_test_page_writer();
1581 let props = Default::default();
1582 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
1583 let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
1584 assert!(res.is_err());
1585 if let Err(err) = res {
1586 assert_eq!(
1587 format!("{err}"),
1588 "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
1589 );
1590 }
1591 }
1592
1593 #[test]
1594 fn test_column_writer_invalid_def_levels() {
1595 let page_writer = get_test_page_writer();
1596 let props = Default::default();
1597 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1598 let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1599 assert!(res.is_err());
1600 if let Err(err) = res {
1601 assert_eq!(
1602 format!("{err}"),
1603 "Parquet error: Definition levels are required, because max definition level = 1"
1604 );
1605 }
1606 }
1607
1608 #[test]
1609 fn test_column_writer_invalid_rep_levels() {
1610 let page_writer = get_test_page_writer();
1611 let props = Default::default();
1612 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
1613 let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1614 assert!(res.is_err());
1615 if let Err(err) = res {
1616 assert_eq!(
1617 format!("{err}"),
1618 "Parquet error: Repetition levels are required, because max repetition level = 1"
1619 );
1620 }
1621 }
1622
1623 #[test]
1624 fn test_column_writer_not_enough_values_to_write() {
1625 let page_writer = get_test_page_writer();
1626 let props = Default::default();
1627 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1628 let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
1629 assert!(res.is_err());
1630 if let Err(err) = res {
1631 assert_eq!(
1632 format!("{err}"),
1633 "Parquet error: Expected to write 4 values, but have only 2"
1634 );
1635 }
1636 }
1637
1638 #[test]
1639 fn test_column_writer_write_only_one_dictionary_page() {
1640 let page_writer = get_test_page_writer();
1641 let props = Default::default();
1642 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1643 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1644 writer.add_data_page().unwrap();
1646 writer.write_dictionary_page().unwrap();
1647 let err = writer.write_dictionary_page().unwrap_err().to_string();
1648 assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1649 }
1650
1651 #[test]
1652 fn test_column_writer_error_when_writing_disabled_dictionary() {
1653 let page_writer = get_test_page_writer();
1654 let props = Arc::new(
1655 WriterProperties::builder()
1656 .set_dictionary_enabled(false)
1657 .build(),
1658 );
1659 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1660 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1661 let err = writer.write_dictionary_page().unwrap_err().to_string();
1662 assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1663 }
1664
1665 #[test]
1666 fn test_column_writer_boolean_type_does_not_support_dictionary() {
1667 let page_writer = get_test_page_writer();
1668 let props = Arc::new(
1669 WriterProperties::builder()
1670 .set_dictionary_enabled(true)
1671 .build(),
1672 );
1673 let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
1674 writer
1675 .write_batch(&[true, false, true, false], None, None)
1676 .unwrap();
1677
1678 let r = writer.close().unwrap();
1679 assert_eq!(r.bytes_written, 1);
1682 assert_eq!(r.rows_written, 4);
1683
1684 let metadata = r.metadata;
1685 assert_eq!(metadata.encodings(), &vec![Encoding::PLAIN, Encoding::RLE]);
1686 assert_eq!(metadata.num_values(), 4); assert_eq!(metadata.dictionary_page_offset(), None);
1688 }
1689
1690 #[test]
1691 fn test_column_writer_default_encoding_support_bool() {
1692 check_encoding_write_support::<BoolType>(
1693 WriterVersion::PARQUET_1_0,
1694 true,
1695 &[true, false],
1696 None,
1697 &[Encoding::PLAIN, Encoding::RLE],
1698 );
1699 check_encoding_write_support::<BoolType>(
1700 WriterVersion::PARQUET_1_0,
1701 false,
1702 &[true, false],
1703 None,
1704 &[Encoding::PLAIN, Encoding::RLE],
1705 );
1706 check_encoding_write_support::<BoolType>(
1707 WriterVersion::PARQUET_2_0,
1708 true,
1709 &[true, false],
1710 None,
1711 &[Encoding::RLE],
1712 );
1713 check_encoding_write_support::<BoolType>(
1714 WriterVersion::PARQUET_2_0,
1715 false,
1716 &[true, false],
1717 None,
1718 &[Encoding::RLE],
1719 );
1720 }
1721
1722 #[test]
1723 fn test_column_writer_default_encoding_support_int32() {
1724 check_encoding_write_support::<Int32Type>(
1725 WriterVersion::PARQUET_1_0,
1726 true,
1727 &[1, 2],
1728 Some(0),
1729 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1730 );
1731 check_encoding_write_support::<Int32Type>(
1732 WriterVersion::PARQUET_1_0,
1733 false,
1734 &[1, 2],
1735 None,
1736 &[Encoding::PLAIN, Encoding::RLE],
1737 );
1738 check_encoding_write_support::<Int32Type>(
1739 WriterVersion::PARQUET_2_0,
1740 true,
1741 &[1, 2],
1742 Some(0),
1743 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1744 );
1745 check_encoding_write_support::<Int32Type>(
1746 WriterVersion::PARQUET_2_0,
1747 false,
1748 &[1, 2],
1749 None,
1750 &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1751 );
1752 }
1753
1754 #[test]
1755 fn test_column_writer_default_encoding_support_int64() {
1756 check_encoding_write_support::<Int64Type>(
1757 WriterVersion::PARQUET_1_0,
1758 true,
1759 &[1, 2],
1760 Some(0),
1761 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1762 );
1763 check_encoding_write_support::<Int64Type>(
1764 WriterVersion::PARQUET_1_0,
1765 false,
1766 &[1, 2],
1767 None,
1768 &[Encoding::PLAIN, Encoding::RLE],
1769 );
1770 check_encoding_write_support::<Int64Type>(
1771 WriterVersion::PARQUET_2_0,
1772 true,
1773 &[1, 2],
1774 Some(0),
1775 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1776 );
1777 check_encoding_write_support::<Int64Type>(
1778 WriterVersion::PARQUET_2_0,
1779 false,
1780 &[1, 2],
1781 None,
1782 &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1783 );
1784 }
1785
1786 #[test]
1787 fn test_column_writer_default_encoding_support_int96() {
1788 check_encoding_write_support::<Int96Type>(
1789 WriterVersion::PARQUET_1_0,
1790 true,
1791 &[Int96::from(vec![1, 2, 3])],
1792 Some(0),
1793 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1794 );
1795 check_encoding_write_support::<Int96Type>(
1796 WriterVersion::PARQUET_1_0,
1797 false,
1798 &[Int96::from(vec![1, 2, 3])],
1799 None,
1800 &[Encoding::PLAIN, Encoding::RLE],
1801 );
1802 check_encoding_write_support::<Int96Type>(
1803 WriterVersion::PARQUET_2_0,
1804 true,
1805 &[Int96::from(vec![1, 2, 3])],
1806 Some(0),
1807 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1808 );
1809 check_encoding_write_support::<Int96Type>(
1810 WriterVersion::PARQUET_2_0,
1811 false,
1812 &[Int96::from(vec![1, 2, 3])],
1813 None,
1814 &[Encoding::PLAIN, Encoding::RLE],
1815 );
1816 }
1817
1818 #[test]
1819 fn test_column_writer_default_encoding_support_float() {
1820 check_encoding_write_support::<FloatType>(
1821 WriterVersion::PARQUET_1_0,
1822 true,
1823 &[1.0, 2.0],
1824 Some(0),
1825 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1826 );
1827 check_encoding_write_support::<FloatType>(
1828 WriterVersion::PARQUET_1_0,
1829 false,
1830 &[1.0, 2.0],
1831 None,
1832 &[Encoding::PLAIN, Encoding::RLE],
1833 );
1834 check_encoding_write_support::<FloatType>(
1835 WriterVersion::PARQUET_2_0,
1836 true,
1837 &[1.0, 2.0],
1838 Some(0),
1839 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1840 );
1841 check_encoding_write_support::<FloatType>(
1842 WriterVersion::PARQUET_2_0,
1843 false,
1844 &[1.0, 2.0],
1845 None,
1846 &[Encoding::PLAIN, Encoding::RLE],
1847 );
1848 }
1849
1850 #[test]
1851 fn test_column_writer_default_encoding_support_double() {
1852 check_encoding_write_support::<DoubleType>(
1853 WriterVersion::PARQUET_1_0,
1854 true,
1855 &[1.0, 2.0],
1856 Some(0),
1857 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1858 );
1859 check_encoding_write_support::<DoubleType>(
1860 WriterVersion::PARQUET_1_0,
1861 false,
1862 &[1.0, 2.0],
1863 None,
1864 &[Encoding::PLAIN, Encoding::RLE],
1865 );
1866 check_encoding_write_support::<DoubleType>(
1867 WriterVersion::PARQUET_2_0,
1868 true,
1869 &[1.0, 2.0],
1870 Some(0),
1871 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1872 );
1873 check_encoding_write_support::<DoubleType>(
1874 WriterVersion::PARQUET_2_0,
1875 false,
1876 &[1.0, 2.0],
1877 None,
1878 &[Encoding::PLAIN, Encoding::RLE],
1879 );
1880 }
1881
1882 #[test]
1883 fn test_column_writer_default_encoding_support_byte_array() {
1884 check_encoding_write_support::<ByteArrayType>(
1885 WriterVersion::PARQUET_1_0,
1886 true,
1887 &[ByteArray::from(vec![1u8])],
1888 Some(0),
1889 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1890 );
1891 check_encoding_write_support::<ByteArrayType>(
1892 WriterVersion::PARQUET_1_0,
1893 false,
1894 &[ByteArray::from(vec![1u8])],
1895 None,
1896 &[Encoding::PLAIN, Encoding::RLE],
1897 );
1898 check_encoding_write_support::<ByteArrayType>(
1899 WriterVersion::PARQUET_2_0,
1900 true,
1901 &[ByteArray::from(vec![1u8])],
1902 Some(0),
1903 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1904 );
1905 check_encoding_write_support::<ByteArrayType>(
1906 WriterVersion::PARQUET_2_0,
1907 false,
1908 &[ByteArray::from(vec![1u8])],
1909 None,
1910 &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
1911 );
1912 }
1913
1914 #[test]
1915 fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
1916 check_encoding_write_support::<FixedLenByteArrayType>(
1917 WriterVersion::PARQUET_1_0,
1918 true,
1919 &[ByteArray::from(vec![1u8]).into()],
1920 None,
1921 &[Encoding::PLAIN, Encoding::RLE],
1922 );
1923 check_encoding_write_support::<FixedLenByteArrayType>(
1924 WriterVersion::PARQUET_1_0,
1925 false,
1926 &[ByteArray::from(vec![1u8]).into()],
1927 None,
1928 &[Encoding::PLAIN, Encoding::RLE],
1929 );
1930 check_encoding_write_support::<FixedLenByteArrayType>(
1931 WriterVersion::PARQUET_2_0,
1932 true,
1933 &[ByteArray::from(vec![1u8]).into()],
1934 Some(0),
1935 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1936 );
1937 check_encoding_write_support::<FixedLenByteArrayType>(
1938 WriterVersion::PARQUET_2_0,
1939 false,
1940 &[ByteArray::from(vec![1u8]).into()],
1941 None,
1942 &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
1943 );
1944 }
1945
1946 #[test]
1947 fn test_column_writer_check_metadata() {
1948 let page_writer = get_test_page_writer();
1949 let props = Default::default();
1950 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1951 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1952
1953 let r = writer.close().unwrap();
1954 assert_eq!(r.bytes_written, 20);
1955 assert_eq!(r.rows_written, 4);
1956
1957 let metadata = r.metadata;
1958 assert_eq!(
1959 metadata.encodings(),
1960 &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
1961 );
1962 assert_eq!(metadata.num_values(), 4);
1963 assert_eq!(metadata.compressed_size(), 20);
1964 assert_eq!(metadata.uncompressed_size(), 20);
1965 assert_eq!(metadata.data_page_offset(), 0);
1966 assert_eq!(metadata.dictionary_page_offset(), Some(0));
1967 if let Some(stats) = metadata.statistics() {
1968 assert_eq!(stats.null_count_opt(), Some(0));
1969 assert_eq!(stats.distinct_count_opt(), None);
1970 if let Statistics::Int32(stats) = stats {
1971 assert_eq!(stats.min_opt().unwrap(), &1);
1972 assert_eq!(stats.max_opt().unwrap(), &4);
1973 } else {
1974 panic!("expecting Statistics::Int32");
1975 }
1976 } else {
1977 panic!("metadata missing statistics");
1978 }
1979 }
1980
1981 #[test]
1982 fn test_column_writer_check_byte_array_min_max() {
1983 let page_writer = get_test_page_writer();
1984 let props = Default::default();
1985 let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
1986 writer
1987 .write_batch(
1988 &[
1989 ByteArray::from(vec![
1990 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
1991 35u8, 231u8, 90u8, 0u8, 0u8,
1992 ]),
1993 ByteArray::from(vec![
1994 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
1995 152u8, 177u8, 56u8, 0u8, 0u8,
1996 ]),
1997 ByteArray::from(vec![
1998 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
1999 0u8,
2000 ]),
2001 ByteArray::from(vec![
2002 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2003 44u8, 0u8, 0u8,
2004 ]),
2005 ],
2006 None,
2007 None,
2008 )
2009 .unwrap();
2010 let metadata = writer.close().unwrap().metadata;
2011 if let Some(stats) = metadata.statistics() {
2012 if let Statistics::ByteArray(stats) = stats {
2013 assert_eq!(
2014 stats.min_opt().unwrap(),
2015 &ByteArray::from(vec![
2016 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2017 35u8, 231u8, 90u8, 0u8, 0u8,
2018 ])
2019 );
2020 assert_eq!(
2021 stats.max_opt().unwrap(),
2022 &ByteArray::from(vec![
2023 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2024 44u8, 0u8, 0u8,
2025 ])
2026 );
2027 } else {
2028 panic!("expecting Statistics::ByteArray");
2029 }
2030 } else {
2031 panic!("metadata missing statistics");
2032 }
2033 }
2034
2035 #[test]
2036 fn test_column_writer_uint32_converted_type_min_max() {
2037 let page_writer = get_test_page_writer();
2038 let props = Default::default();
2039 let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
2040 page_writer,
2041 0,
2042 0,
2043 props,
2044 );
2045 writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
2046 let metadata = writer.close().unwrap().metadata;
2047 if let Some(stats) = metadata.statistics() {
2048 if let Statistics::Int32(stats) = stats {
2049 assert_eq!(stats.min_opt().unwrap(), &0,);
2050 assert_eq!(stats.max_opt().unwrap(), &5,);
2051 } else {
2052 panic!("expecting Statistics::Int32");
2053 }
2054 } else {
2055 panic!("metadata missing statistics");
2056 }
2057 }
2058
2059 #[test]
2060 fn test_column_writer_precalculated_statistics() {
2061 let page_writer = get_test_page_writer();
2062 let props = Arc::new(
2063 WriterProperties::builder()
2064 .set_statistics_enabled(EnabledStatistics::Chunk)
2065 .build(),
2066 );
2067 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2068 writer
2069 .write_batch_with_statistics(
2070 &[1, 2, 3, 4],
2071 None,
2072 None,
2073 Some(&-17),
2074 Some(&9000),
2075 Some(55),
2076 )
2077 .unwrap();
2078
2079 let r = writer.close().unwrap();
2080 assert_eq!(r.bytes_written, 20);
2081 assert_eq!(r.rows_written, 4);
2082
2083 let metadata = r.metadata;
2084 assert_eq!(
2085 metadata.encodings(),
2086 &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2087 );
2088 assert_eq!(metadata.num_values(), 4);
2089 assert_eq!(metadata.compressed_size(), 20);
2090 assert_eq!(metadata.uncompressed_size(), 20);
2091 assert_eq!(metadata.data_page_offset(), 0);
2092 assert_eq!(metadata.dictionary_page_offset(), Some(0));
2093 if let Some(stats) = metadata.statistics() {
2094 assert_eq!(stats.null_count_opt(), Some(0));
2095 assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
2096 if let Statistics::Int32(stats) = stats {
2097 assert_eq!(stats.min_opt().unwrap(), &-17);
2098 assert_eq!(stats.max_opt().unwrap(), &9000);
2099 } else {
2100 panic!("expecting Statistics::Int32");
2101 }
2102 } else {
2103 panic!("metadata missing statistics");
2104 }
2105 }
2106
2107 #[test]
2108 fn test_mixed_precomputed_statistics() {
2109 let mut buf = Vec::with_capacity(100);
2110 let mut write = TrackedWrite::new(&mut buf);
2111 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2112 let props = Default::default();
2113 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2114
2115 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2116 writer
2117 .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
2118 .unwrap();
2119
2120 let r = writer.close().unwrap();
2121
2122 let stats = r.metadata.statistics().unwrap();
2123 assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
2124 assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
2125 assert_eq!(stats.null_count_opt(), Some(0));
2126 assert!(stats.distinct_count_opt().is_none());
2127
2128 drop(write);
2129
2130 let props = ReaderProperties::builder()
2131 .set_backward_compatible_lz4(false)
2132 .build();
2133 let reader = SerializedPageReader::new_with_properties(
2134 Arc::new(Bytes::from(buf)),
2135 &r.metadata,
2136 r.rows_written as usize,
2137 None,
2138 Arc::new(props),
2139 )
2140 .unwrap();
2141
2142 let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2143 assert_eq!(pages.len(), 2);
2144
2145 assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2146 assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);
2147
2148 let page_statistics = pages[1].statistics().unwrap();
2149 assert_eq!(
2150 page_statistics.min_bytes_opt().unwrap(),
2151 1_i32.to_le_bytes()
2152 );
2153 assert_eq!(
2154 page_statistics.max_bytes_opt().unwrap(),
2155 7_i32.to_le_bytes()
2156 );
2157 assert_eq!(page_statistics.null_count_opt(), Some(0));
2158 assert!(page_statistics.distinct_count_opt().is_none());
2159 }
2160
2161 #[test]
2162 fn test_disabled_statistics() {
2163 let mut buf = Vec::with_capacity(100);
2164 let mut write = TrackedWrite::new(&mut buf);
2165 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2166 let props = WriterProperties::builder()
2167 .set_statistics_enabled(EnabledStatistics::None)
2168 .set_writer_version(WriterVersion::PARQUET_2_0)
2169 .build();
2170 let props = Arc::new(props);
2171
2172 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2173 writer
2174 .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
2175 .unwrap();
2176
2177 let r = writer.close().unwrap();
2178 assert!(r.metadata.statistics().is_none());
2179
2180 drop(write);
2181
2182 let props = ReaderProperties::builder()
2183 .set_backward_compatible_lz4(false)
2184 .build();
2185 let reader = SerializedPageReader::new_with_properties(
2186 Arc::new(Bytes::from(buf)),
2187 &r.metadata,
2188 r.rows_written as usize,
2189 None,
2190 Arc::new(props),
2191 )
2192 .unwrap();
2193
2194 let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2195 assert_eq!(pages.len(), 2);
2196
2197 assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2198 assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);
2199
2200 match &pages[1] {
2201 Page::DataPageV2 {
2202 num_values,
2203 num_nulls,
2204 num_rows,
2205 statistics,
2206 ..
2207 } => {
2208 assert_eq!(*num_values, 6);
2209 assert_eq!(*num_nulls, 2);
2210 assert_eq!(*num_rows, 6);
2211 assert!(statistics.is_none());
2212 }
2213 _ => unreachable!(),
2214 }
2215 }
2216
2217 #[test]
2218 fn test_column_writer_empty_column_roundtrip() {
2219 let props = Default::default();
2220 column_roundtrip::<Int32Type>(props, &[], None, None);
2221 }
2222
2223 #[test]
2224 fn test_column_writer_non_nullable_values_roundtrip() {
2225 let props = Default::default();
2226 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2227 }
2228
2229 #[test]
2230 fn test_column_writer_nullable_non_repeated_values_roundtrip() {
2231 let props = Default::default();
2232 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
2233 }
2234
2235 #[test]
2236 fn test_column_writer_nullable_repeated_values_roundtrip() {
2237 let props = Default::default();
2238 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2239 }
2240
2241 #[test]
2242 fn test_column_writer_dictionary_fallback_small_data_page() {
2243 let props = WriterProperties::builder()
2244 .set_dictionary_page_size_limit(32)
2245 .set_data_page_size_limit(32)
2246 .build();
2247 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2248 }
2249
2250 #[test]
2251 fn test_column_writer_small_write_batch_size() {
2252 for i in &[1usize, 2, 5, 10, 11, 1023] {
2253 let props = WriterProperties::builder().set_write_batch_size(*i).build();
2254
2255 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2256 }
2257 }
2258
2259 #[test]
2260 fn test_column_writer_dictionary_disabled_v1() {
2261 let props = WriterProperties::builder()
2262 .set_writer_version(WriterVersion::PARQUET_1_0)
2263 .set_dictionary_enabled(false)
2264 .build();
2265 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2266 }
2267
2268 #[test]
2269 fn test_column_writer_dictionary_disabled_v2() {
2270 let props = WriterProperties::builder()
2271 .set_writer_version(WriterVersion::PARQUET_2_0)
2272 .set_dictionary_enabled(false)
2273 .build();
2274 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2275 }
2276
2277 #[test]
2278 fn test_column_writer_compression_v1() {
2279 let props = WriterProperties::builder()
2280 .set_writer_version(WriterVersion::PARQUET_1_0)
2281 .set_compression(Compression::SNAPPY)
2282 .build();
2283 column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2284 }
2285
2286 #[test]
2287 fn test_column_writer_compression_v2() {
2288 let props = WriterProperties::builder()
2289 .set_writer_version(WriterVersion::PARQUET_2_0)
2290 .set_compression(Compression::SNAPPY)
2291 .build();
2292 column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2293 }
2294
2295 #[test]
2296 fn test_column_writer_add_data_pages_with_dict() {
2297 let mut file = tempfile::tempfile().unwrap();
2300 let mut write = TrackedWrite::new(&mut file);
2301 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2302 let props = Arc::new(
2303 WriterProperties::builder()
2304 .set_data_page_size_limit(10)
2305 .set_write_batch_size(3) .build(),
2307 );
2308 let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
2309 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2310 writer.write_batch(data, None, None).unwrap();
2311 let r = writer.close().unwrap();
2312
2313 drop(write);
2314
2315 let props = ReaderProperties::builder()
2317 .set_backward_compatible_lz4(false)
2318 .build();
2319 let mut page_reader = Box::new(
2320 SerializedPageReader::new_with_properties(
2321 Arc::new(file),
2322 &r.metadata,
2323 r.rows_written as usize,
2324 None,
2325 Arc::new(props),
2326 )
2327 .unwrap(),
2328 );
2329 let mut res = Vec::new();
2330 while let Some(page) = page_reader.get_next_page().unwrap() {
2331 res.push((page.page_type(), page.num_values(), page.buffer().len()));
2332 }
2333 assert_eq!(
2334 res,
2335 vec![
2336 (PageType::DICTIONARY_PAGE, 10, 40),
2337 (PageType::DATA_PAGE, 9, 10),
2338 (PageType::DATA_PAGE, 1, 3),
2339 ]
2340 );
2341 }
2342
2343 #[test]
2344 fn test_bool_statistics() {
2345 let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
2346 assert!(!stats.is_min_max_backwards_compatible());
2349 if let Statistics::Boolean(stats) = stats {
2350 assert_eq!(stats.min_opt().unwrap(), &false);
2351 assert_eq!(stats.max_opt().unwrap(), &true);
2352 } else {
2353 panic!("expecting Statistics::Boolean, got {stats:?}");
2354 }
2355 }
2356
#[test]
fn test_int32_statistics() {
    // Signed ordering: -2 is the smallest and 3 the largest of the inputs.
    let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Int32(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &-2);
            assert_eq!(typed.max_opt().unwrap(), &3);
        }
        stats => panic!("expecting Statistics::Int32, got {stats:?}"),
    }
}
2368
#[test]
fn test_int64_statistics() {
    // Signed ordering: -2 is the smallest and 3 the largest of the inputs.
    let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Int64(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &-2);
            assert_eq!(typed.max_opt().unwrap(), &3);
        }
        stats => panic!("expecting Statistics::Int64, got {stats:?}"),
    }
}
2380
#[test]
fn test_int96_statistics() {
    let input: Vec<Int96> = vec![
        Int96::from(vec![1, 20, 30]),
        Int96::from(vec![3, 20, 10]),
        Int96::from(vec![0, 20, 30]),
        Int96::from(vec![2, 20, 30]),
    ];

    // Int96 min/max is expected NOT to be flagged as backwards compatible.
    let stats = statistics_roundtrip::<Int96Type>(&input);
    assert!(!stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Int96(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &Int96::from(vec![0, 20, 30]));
            assert_eq!(typed.max_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
        }
        stats => panic!("expecting Statistics::Int96, got {stats:?}"),
    }
}
2401
#[test]
fn test_float_statistics() {
    let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Float(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &-2.0);
            assert_eq!(typed.max_opt().unwrap(), &3.0);
        }
        stats => panic!("expecting Statistics::Float, got {stats:?}"),
    }
}
2413
#[test]
fn test_double_statistics() {
    let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Double(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &-2.0);
            assert_eq!(typed.max_opt().unwrap(), &3.0);
        }
        stats => panic!("expecting Statistics::Double, got {stats:?}"),
    }
}
2425
#[test]
fn test_byte_array_statistics() {
    // Lexicographic byte ordering: "aaw" sorts before "aawaa", and "zz" is the largest.
    let input: Vec<ByteArray> = ["aawaa", "zz", "aaw", "m", "qrs"]
        .into_iter()
        .map(ByteArray::from)
        .collect();

    let stats = statistics_roundtrip::<ByteArrayType>(&input);
    assert!(!stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::ByteArray(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &ByteArray::from("aaw"));
            assert_eq!(typed.max_opt().unwrap(), &ByteArray::from("zz"));
        }
        stats => panic!("expecting Statistics::ByteArray, got {stats:?}"),
    }
}
2442
#[test]
fn test_fixed_len_byte_array_statistics() {
    // Every value of a FIXED_LEN_BYTE_ARRAY column must have the same width. Pad each
    // string with trailing spaces to WIDTH bytes in code, rather than relying on
    // hand-counted whitespace inside string literals (which is easy to get wrong).
    const WIDTH: usize = 5;
    let to_flba = |s: &str| -> FixedLenByteArray {
        ByteArray::from(format!("{:<width$}", s, width = WIDTH).as_str()).into()
    };

    let input: Vec<FixedLenByteArray> = ["aawaa", "zz", "aaw", "m", "qrs"]
        .into_iter()
        .map(to_flba)
        .collect();

    let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
    assert!(!stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::FixedLenByteArray(typed) => {
            // The padding byte (space, 0x20) sorts before 'a', so "aaw  " < "aawaa".
            assert_eq!(typed.min_opt().unwrap(), &to_flba("aaw"));
            assert_eq!(typed.max_opt().unwrap(), &to_flba("zz"));
        }
        stats => panic!("expecting Statistics::FixedLenByteArray, got {stats:?}"),
    }
}
2461
#[test]
fn test_column_writer_check_float16_min_max() {
    // float16 values are compared numerically, not by their raw little-endian bytes.
    let input: Vec<FixedLenByteArray> = [
        -f16::ONE,
        f16::from_f32(3.0),
        -f16::from_f32(2.0),
        f16::from_f32(2.0),
    ]
    .into_iter()
    .map(|value| FixedLenByteArray::from(ByteArray::from(value)))
    .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());

    let expected_min = ByteArray::from(-f16::from_f32(2.0));
    let expected_max = ByteArray::from(f16::from_f32(3.0));
    assert_eq!(stats.min_opt().unwrap(), &expected_min);
    assert_eq!(stats.max_opt().unwrap(), &expected_max);
}
2485
#[test]
fn test_column_writer_check_float16_nan_middle() {
    // A NaN between two finite values is expected to be ignored by min/max.
    let two = f16::ONE + f16::ONE;
    let input: Vec<FixedLenByteArray> = [f16::ONE, f16::NAN, two]
        .into_iter()
        .map(|value| FixedLenByteArray::from(ByteArray::from(value)))
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
    assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(two));
}
2501
#[test]
fn test_float16_statistics_nan_middle() {
    let encode = |value: f16| -> FixedLenByteArray { ByteArray::from(value).into() };
    let input: Vec<FixedLenByteArray> = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
        .into_iter()
        .map(encode)
        .collect();

    // The NaN in the middle must not affect the reported bounds.
    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    let expected_min = ByteArray::from(f16::ONE);
    let expected_max = ByteArray::from(f16::ONE + f16::ONE);
    assert_eq!(stats.min_opt().unwrap(), &expected_min);
    assert_eq!(stats.max_opt().unwrap(), &expected_max);
}
2517
#[test]
fn test_float16_statistics_nan_start() {
    let encode = |value: f16| -> FixedLenByteArray { ByteArray::from(value).into() };
    let input: Vec<FixedLenByteArray> = [f16::NAN, f16::ONE, f16::ONE + f16::ONE]
        .into_iter()
        .map(encode)
        .collect();

    // A leading NaN is expected to be skipped rather than seeding min/max.
    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    let expected_min = ByteArray::from(f16::ONE);
    let expected_max = ByteArray::from(f16::ONE + f16::ONE);
    assert_eq!(stats.min_opt().unwrap(), &expected_min);
    assert_eq!(stats.max_opt().unwrap(), &expected_max);
}
2533
#[test]
fn test_float16_statistics_nan_only() {
    let encode = |value: f16| -> FixedLenByteArray { ByteArray::from(value).into() };
    let input: Vec<FixedLenByteArray> = [f16::NAN; 2].into_iter().map(encode).collect();

    // With nothing but NaNs there is no meaningful min or max.
    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.min_bytes_opt().is_none());
    assert!(stats.max_bytes_opt().is_none());
    assert!(stats.is_min_max_backwards_compatible());
}
2546
#[test]
fn test_float16_statistics_zero_only() {
    let encode = |value: f16| -> FixedLenByteArray { ByteArray::from(value).into() };
    let input: Vec<FixedLenByteArray> = [f16::ZERO].into_iter().map(encode).collect();

    // A lone +0 is reported as the range [-0, +0].
    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    let (expected_min, expected_max) = (ByteArray::from(f16::NEG_ZERO), ByteArray::from(f16::ZERO));
    assert_eq!(stats.min_opt().unwrap(), &expected_min);
    assert_eq!(stats.max_opt().unwrap(), &expected_max);
}
2559
#[test]
fn test_float16_statistics_neg_zero_only() {
    let encode = |value: f16| -> FixedLenByteArray { ByteArray::from(value).into() };
    let input: Vec<FixedLenByteArray> = [f16::NEG_ZERO].into_iter().map(encode).collect();

    // A lone -0 is likewise reported as the range [-0, +0].
    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    let (expected_min, expected_max) = (ByteArray::from(f16::NEG_ZERO), ByteArray::from(f16::ZERO));
    assert_eq!(stats.min_opt().unwrap(), &expected_min);
    assert_eq!(stats.max_opt().unwrap(), &expected_max);
}
2572
#[test]
fn test_float16_statistics_zero_min() {
    let encode = |value: f16| -> FixedLenByteArray { ByteArray::from(value).into() };
    let input: Vec<FixedLenByteArray> = [f16::ZERO, f16::ONE, f16::NAN, f16::PI]
        .into_iter()
        .map(encode)
        .collect();

    // A +0 minimum is written as -0; the NaN is ignored.
    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    let (expected_min, expected_max) = (ByteArray::from(f16::NEG_ZERO), ByteArray::from(f16::PI));
    assert_eq!(stats.min_opt().unwrap(), &expected_min);
    assert_eq!(stats.max_opt().unwrap(), &expected_max);
}
2585
#[test]
fn test_float16_statistics_neg_zero_max() {
    let encode = |value: f16| -> FixedLenByteArray { ByteArray::from(value).into() };
    let input: Vec<FixedLenByteArray> = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI]
        .into_iter()
        .map(encode)
        .collect();

    // A -0 maximum is written as +0; the NaN is ignored.
    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    let (expected_min, expected_max) = (ByteArray::from(-f16::PI), ByteArray::from(f16::ZERO));
    assert_eq!(stats.min_opt().unwrap(), &expected_min);
    assert_eq!(stats.max_opt().unwrap(), &expected_max);
}
2598
#[test]
fn test_float_statistics_nan_middle() {
    // A NaN between finite values is expected to be ignored.
    let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Float(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &1.0);
            assert_eq!(typed.max_opt().unwrap(), &2.0);
        }
        _ => panic!("expecting Statistics::Float"),
    }
}
2610
#[test]
fn test_float_statistics_nan_start() {
    // A leading NaN must not seed min/max.
    let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Float(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &1.0);
            assert_eq!(typed.max_opt().unwrap(), &2.0);
        }
        _ => panic!("expecting Statistics::Float"),
    }
}
2622
#[test]
fn test_float_statistics_nan_only() {
    // All-NaN input yields a Float statistics value with no min/max bytes.
    let stats = statistics_roundtrip::<FloatType>(&[f32::NAN; 2]);
    assert!(matches!(stats, Statistics::Float(_)));
    assert!(stats.is_min_max_backwards_compatible());
    assert!(stats.min_bytes_opt().is_none());
    assert!(stats.max_bytes_opt().is_none());
}
2631
#[test]
fn test_float_statistics_zero_only() {
    let stats = statistics_roundtrip::<FloatType>(&[0.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Float(typed) => {
            // A lone +0.0 is widened to [-0.0, +0.0]; check the sign bits explicitly,
            // since -0.0 == 0.0 under IEEE comparison.
            let (min, max) = (typed.min_opt().unwrap(), typed.max_opt().unwrap());
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Float"),
    }
}
2645
#[test]
fn test_float_statistics_neg_zero_only() {
    let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Float(typed) => {
            // A lone -0.0 is also widened to [-0.0, +0.0].
            let (min, max) = (typed.min_opt().unwrap(), typed.max_opt().unwrap());
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Float"),
    }
}
2659
#[test]
fn test_float_statistics_zero_min() {
    let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Float(typed) => {
            // A +0.0 minimum is reported as -0.0.
            let min = typed.min_opt().unwrap();
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());
            assert_eq!(typed.max_opt().unwrap(), &2.0);
        }
        _ => panic!("expecting Statistics::Float"),
    }
}
2672
#[test]
fn test_float_statistics_neg_zero_max() {
    let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Float(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &-2.0);
            // A -0.0 maximum is reported as +0.0.
            let max = typed.max_opt().unwrap();
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Float"),
    }
}
2685
#[test]
fn test_double_statistics_nan_middle() {
    // A NaN between finite values is expected to be ignored.
    let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Double(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &1.0);
            assert_eq!(typed.max_opt().unwrap(), &2.0);
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
2697
#[test]
fn test_double_statistics_nan_start() {
    // A leading NaN must not seed min/max.
    let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Double(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &1.0);
            assert_eq!(typed.max_opt().unwrap(), &2.0);
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
2709
#[test]
fn test_double_statistics_nan_only() {
    // All-NaN input yields a Double statistics value with no min/max bytes.
    let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN; 2]);
    assert!(matches!(stats, Statistics::Double(_)));
    assert!(stats.is_min_max_backwards_compatible());
    assert!(stats.min_bytes_opt().is_none());
    assert!(stats.max_bytes_opt().is_none());
}
2718
#[test]
fn test_double_statistics_zero_only() {
    let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Double(typed) => {
            // A lone +0.0 is widened to [-0.0, +0.0]; check the sign bits explicitly,
            // since -0.0 == 0.0 under IEEE comparison.
            let (min, max) = (typed.min_opt().unwrap(), typed.max_opt().unwrap());
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
2732
#[test]
fn test_double_statistics_neg_zero_only() {
    let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Double(typed) => {
            // A lone -0.0 is also widened to [-0.0, +0.0].
            let (min, max) = (typed.min_opt().unwrap(), typed.max_opt().unwrap());
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
2746
#[test]
fn test_double_statistics_zero_min() {
    let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Double(typed) => {
            // A +0.0 minimum is reported as -0.0.
            let min = typed.min_opt().unwrap();
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());
            assert_eq!(typed.max_opt().unwrap(), &2.0);
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
2759
#[test]
fn test_double_statistics_neg_zero_max() {
    let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Double(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &-2.0);
            // A -0.0 maximum is reported as +0.0.
            let max = typed.max_opt().unwrap();
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
2772
#[test]
fn test_compare_greater_byte_array_decimals() {
    // (left, right, whether `left > right` is expected). The last two rows show that a
    // leading 0xFF byte is expected to compare as smaller than [0].
    let cases: &[(&[u8], &[u8], bool)] = &[
        (&[], &[], false),
        (&[1], &[], true),
        (&[], &[1], false),
        (&[1], &[0], true),
        (&[1], &[1], false),
        (&[1, 0], &[0], true),
        (&[0, 1], &[1, 0], false),
        (&[255, 35, 0, 0], &[0], false),
        (&[0], &[255, 35, 0, 0], true),
    ];
    for &(left, right, expected) in cases {
        assert_eq!(compare_greater_byte_array_decimals(left, right), expected);
    }
}
2794
#[test]
fn test_column_index_with_null_pages() {
    // Write a single page whose four slots all have definition level 0 (null).
    let page_writer = get_test_page_writer();
    let props = Default::default();
    let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
    writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();

    let close_result = writer.close().unwrap();
    assert!(close_result.column_index.is_some());
    let index = close_result.column_index.unwrap();

    // The page is flagged as a null page and carries empty min/max values.
    assert!(index.null_pages[0]);
    assert!(index.min_values[0].is_empty());
    assert!(index.max_values[0].is_empty());

    // Null counts are recorded, and all four slots are null.
    assert!(index.null_counts.is_some());
    assert_eq!(index.null_counts.as_ref().unwrap()[0], 4);

    // The test expects no repetition-level histogram and a definition-level histogram of
    // four entries at level 0 and none at level 1.
    assert!(index.repetition_level_histograms.is_none());
    assert!(index.definition_level_histograms.is_some());
    assert_eq!(index.definition_level_histograms.unwrap(), &[4, 0]);
}
2820
#[test]
fn test_column_offset_index_metadata() {
    let page_writer = get_test_page_writer();
    let props = Default::default();
    let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);

    // First page [1, 2, 3, 4] is flushed explicitly; the second batch [4, 8, 2, -5] is
    // written afterwards, so the test expects two pages in total.
    writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
    writer.flush_data_pages().unwrap();
    writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();

    let result = writer.close().unwrap();
    let column_index = result.column_index.unwrap();
    let offset_index = result.offset_index.unwrap();

    assert_eq!(8, result.rows_written);
    assert_eq!(2, column_index.null_pages.len());
    assert_eq!(2, offset_index.page_locations.len());
    assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);

    let null_counts = column_index.null_counts.as_ref().unwrap();
    for page in 0..2 {
        assert!(!column_index.null_pages[page]);
        assert_eq!(0, null_counts[page]);
    }

    let stats = match result.metadata.statistics() {
        Some(stats) => stats,
        None => panic!("metadata missing statistics"),
    };
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);
    match stats {
        Statistics::Int32(typed) => {
            // Chunk min (-5) and max (8) both live in the second page, and the test expects
            // the chunk statistics bytes to equal that page's column-index entries exactly.
            assert_eq!(
                typed.min_bytes_opt(),
                Some(column_index.min_values[1].as_slice())
            );
            assert_eq!(
                typed.max_bytes_opt(),
                column_index.max_values.get(1).map(Vec::as_slice)
            );
        }
        _ => panic!("expecting Statistics::Int32"),
    }

    // The second page starts right after the four rows of the first.
    assert_eq!(0, offset_index.page_locations[0].first_row_index);
    assert_eq!(4, offset_index.page_locations[1].first_row_index);
}
2875
#[test]
fn test_column_offset_index_metadata_truncating() {
    let page_writer = get_test_page_writer();
    let props = Default::default();
    let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

    // Three 200-byte values filled with 'a', 'p' and 'b' respectively; these are longer
    // than the default column-index truncate length.
    let data: Vec<FixedLenByteArray> = [97_u8, 112, 98]
        .into_iter()
        .map(|fill| {
            let mut value = FixedLenByteArray::default();
            value.set_data(Bytes::from(vec![fill; 200]));
            value
        })
        .collect();

    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let result = writer.close().unwrap();
    let column_index = result.column_index.unwrap();
    let offset_index = result.offset_index.unwrap();

    assert_eq!(3, result.rows_written);
    assert_eq!(1, column_index.null_pages.len());
    assert_eq!(1, offset_index.page_locations.len());
    assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
    assert!(!column_index.null_pages[0]);
    assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

    let stats = match result.metadata.statistics() {
        Some(stats) => stats,
        None => panic!("metadata missing statistics"),
    };
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);
    match stats {
        Statistics::FixedLenByteArray(typed) => {
            let index_min = &column_index.min_values[0];
            let index_max = &column_index.max_values[0];
            let truncate_len = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap();

            // Column-index bounds are truncated, so they differ from the chunk statistics.
            assert_ne!(typed.min_bytes_opt(), Some(index_min.as_slice()));
            assert_ne!(typed.max_bytes_opt(), Some(index_max.as_slice()));

            assert_eq!(index_min.len(), truncate_len);
            assert_eq!(index_min.as_slice(), &[97_u8; 64]);
            assert_eq!(index_max.len(), truncate_len);

            // The truncated max is expected to have its last byte bumped by one.
            assert_eq!(
                *index_max.last().unwrap(),
                *index_max.first().unwrap() + 1
            );
        }
        _ => panic!("expecting Statistics::FixedLenByteArray"),
    }
}
2948
#[test]
fn test_column_offset_index_truncating_spec_example() {
    let page_writer = get_test_page_writer();

    // Truncate column-index bounds to a single byte.
    let props = Arc::new(
        WriterProperties::builder()
            .set_column_index_truncate_length(Some(1))
            .build(),
    );
    let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

    let mut value = FixedLenByteArray::default();
    value.set_data(Bytes::from(String::from("Blart Versenwald III")));
    writer.write_batch(&[value], None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let result = writer.close().unwrap();
    let column_index = result.column_index.unwrap();
    let offset_index = result.offset_index.unwrap();

    assert_eq!(1, result.rows_written);
    assert_eq!(1, column_index.null_pages.len());
    assert_eq!(1, offset_index.page_locations.len());
    assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
    assert!(!column_index.null_pages[0]);
    assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

    let stats = match result.metadata.statistics() {
        Some(stats) => stats,
        None => panic!("metadata missing statistics"),
    };
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);
    if !matches!(stats, Statistics::FixedLenByteArray(_)) {
        panic!("expecting Statistics::FixedLenByteArray");
    }

    let index_min = &column_index.min_values[0];
    let index_max = &column_index.max_values[0];

    // One-byte bounds: min is the leading 'B', max is that byte incremented to 'C'.
    assert_eq!(index_min.len(), 1);
    assert_eq!(index_max.len(), 1);
    assert_eq!("B".as_bytes(), index_min.as_slice());
    assert_eq!("C".as_bytes(), index_max.as_slice());

    // The chunk-level statistics are not affected by the column-index truncate length.
    assert_ne!(index_min, stats.min_bytes_opt().unwrap());
    assert_ne!(index_max, stats.max_bytes_opt().unwrap());
}
3003
#[test]
fn test_float16_min_max_no_truncation() {
    // Even with a 1-byte truncate length, float16 bounds are expected to be stored verbatim.
    let props = Arc::new(
        WriterProperties::builder()
            .set_column_index_truncate_length(Some(1))
            .build(),
    );
    let mut writer = get_test_float16_column_writer(get_test_page_writer(), props);

    let pi_bytes = f16::PI.to_le_bytes().to_vec();
    let data: Vec<FixedLenByteArray> = vec![ByteArray::from(pi_bytes.clone()).into()];
    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let result = writer.close().unwrap();

    let column_index = result.column_index.unwrap();
    assert_eq!(pi_bytes, column_index.min_values[0].as_slice());
    assert_eq!(pi_bytes, column_index.max_values[0].as_slice());

    match result.metadata.statistics().unwrap() {
        Statistics::FixedLenByteArray(typed) => {
            assert_eq!(pi_bytes, typed.min_bytes_opt().unwrap());
            assert_eq!(pi_bytes, typed.max_bytes_opt().unwrap());
        }
        _ => panic!("expecting Statistics::FixedLenByteArray"),
    }
}
3038
#[test]
fn test_decimal_min_max_no_truncation() {
    // Even with a 1-byte truncate length, decimal bounds are expected to be stored verbatim.
    let props = Arc::new(
        WriterProperties::builder()
            .set_column_index_truncate_length(Some(1))
            .build(),
    );
    let mut writer = get_test_decimals_column_writer::<FixedLenByteArrayType>(
        get_test_page_writer(),
        0,
        0,
        props,
    );

    // A single 16-byte decimal value.
    let decimal_bytes: Vec<u8> = vec![
        255, 255, 255, 255, 255, 255, 255, 255, 179, 172, 19, 35, 231, 90, 0, 0,
    ];
    let data: Vec<FixedLenByteArray> = vec![ByteArray::from(decimal_bytes.clone()).into()];
    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let result = writer.close().unwrap();

    let column_index = result.column_index.unwrap();
    assert_eq!(decimal_bytes, column_index.min_values[0].as_slice());
    assert_eq!(decimal_bytes, column_index.max_values[0].as_slice());

    match result.metadata.statistics().unwrap() {
        Statistics::FixedLenByteArray(typed) => {
            assert_eq!(decimal_bytes, typed.min_bytes_opt().unwrap());
            assert_eq!(decimal_bytes, typed.max_bytes_opt().unwrap());
        }
        _ => panic!("expecting Statistics::FixedLenByteArray"),
    }
}
3077
#[test]
fn test_statistics_truncating_byte_array() {
    const TEST_TRUNCATE_LENGTH: usize = 1;

    let page_writer = get_test_page_writer();
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH))
            .build(),
    );
    let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);

    let mut value = ByteArray::default();
    value.set_data(Bytes::from(String::from("Blart Versenwald III")));
    writer.write_batch(&[value], None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let result = writer.close().unwrap();
    assert_eq!(1, result.rows_written);

    let stats = result.metadata.statistics().expect("statistics");
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);
    match stats {
        Statistics::ByteArray(typed) => {
            // Truncated chunk statistics must be flagged as inexact.
            assert!(!typed.min_is_exact());
            assert!(!typed.max_is_exact());

            let (min, max) = (typed.min_opt().unwrap(), typed.max_opt().unwrap());
            assert_eq!(min.len(), TEST_TRUNCATE_LENGTH);
            assert_eq!(max.len(), TEST_TRUNCATE_LENGTH);

            // Min keeps the leading 'B'; max increments it to 'C'.
            assert_eq!("B".as_bytes(), min.as_bytes());
            assert_eq!("C".as_bytes(), max.as_bytes());
        }
        _ => panic!("expecting Statistics::ByteArray"),
    }
}
3121
#[test]
fn test_statistics_truncating_fixed_len_byte_array() {
    const TEST_TRUNCATE_LENGTH: usize = 1;

    // A 16-byte big-endian i128 whose statistics get truncated to its leading byte.
    const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
    const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();

    const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]];
    // Plain `+ 1` rather than a wrapping add: if the leading byte were ever 0xFF, const
    // evaluation fails at compile time instead of silently expecting a wrapped 0x00 max.
    const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0] + 1];

    let page_writer = get_test_page_writer();
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH))
            .build(),
    );
    let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

    let mut data = vec![FixedLenByteArray::default(); 1];
    data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));

    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();
    assert_eq!(1, r.rows_written);

    let stats = r.metadata.statistics().expect("statistics");
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);
    if let Statistics::FixedLenByteArray(flba_stats) = stats {
        let min_value = flba_stats.min_opt().unwrap();
        let max_value = flba_stats.max_opt().unwrap();

        // Truncated bounds must be flagged as inexact.
        assert!(!flba_stats.min_is_exact());
        assert!(!flba_stats.max_is_exact());

        assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
        assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);

        assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
        assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());

        // Re-widen each one-byte bound to a 16-byte big-endian i128 (low bytes zeroed) and
        // check that the pair still brackets the original value.
        let widen = |leading: u8| {
            let mut be_bytes = [0u8; 16];
            be_bytes[0] = leading;
            i128::from_be_bytes(be_bytes)
        };
        let reconstructed_min = widen(min_value.as_bytes()[0]);
        let reconstructed_max = widen(max_value.as_bytes()[0]);

        assert!(
            reconstructed_min <= PSEUDO_DECIMAL_VALUE,
            "min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}"
        );
        assert!(
            reconstructed_max >= PSEUDO_DECIMAL_VALUE,
            "max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}"
        );
    } else {
        panic!("expecting Statistics::FixedLenByteArray");
    }
}
3217
#[test]
fn test_send() {
    // Compile-time check: the typed column writer can be moved across threads.
    fn assert_send<S: Send>() {}
    assert_send::<ColumnWriterImpl<Int32Type>>();
}
3223
#[test]
fn test_increment() {
    // Incrementing adds one at the last byte and carries towards the first.
    let cases = [
        (vec![0u8, 0, 0], [0u8, 0, 1]),
        (vec![0u8, 255, 255], [1u8, 0, 0]),
    ];
    for (input, expected) in cases {
        assert_eq!(&increment(input).unwrap(), &expected);
    }

    // An all-0xFF input has no same-length successor.
    assert!(increment(vec![255, 255, 255]).is_none());
}
3237
#[test]
fn test_increment_utf8() {
    // Increments `original` and checks that the result is valid UTF-8, equals `expected`,
    // and sorts strictly after the input both as `str` and as `ByteArray`.
    fn check_increment(original: &str, expected: &str) {
        match String::from_utf8(increment_utf8(original).unwrap()) {
            Ok(incremented) => {
                assert_eq!(incremented, expected);
                assert!(incremented.as_str() > original);

                let mut greater = ByteArray::new();
                greater.set_data(Bytes::from(incremented));
                let mut lesser = ByteArray::new();
                lesser.set_data(Bytes::from(original.as_bytes().to_vec()));
                assert!(greater > lesser);
            }
            Err(_) => panic!("Expected incremented UTF8 string to also be valid."),
        }
    }

    // When the final character has no successor of the same encoded width, the test expects
    // it to be dropped and the preceding character incremented instead (e.g. "a\u{7f}" -> "b").
    // U+D7FF is the last code point before the surrogate range.
    let incrementable = [
        ("hello", "hellp"),
        ("a\u{7f}", "b"),
        ("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝"),
        ("éééé", "éééê"),
        ("\u{ff}\u{ff}", "\u{ff}\u{100}"),
        ("a\u{7ff}", "b"),
        ("ࠀࠀ", "ࠀࠁ"),
        ("a\u{ffff}", "b"),
        ("𐀀𐀀", "𐀀𐀁"),
        ("a\u{10ffff}", "b"),
        ("a\u{D7FF}", "b"),
    ];
    for (original, expected) in incrementable {
        check_increment(original, expected);
    }

    // Strings made only of the largest code point of each width cannot be incremented.
    for saturated in [
        "\u{7f}\u{7f}",
        "\u{7ff}\u{7ff}",
        "\u{ffff}\u{ffff}",
        "\u{10ffff}\u{10ffff}",
    ] {
        assert!(increment_utf8(saturated).is_none());
    }
}
3304
#[test]
fn test_truncate_utf8() {
    let hearts = "❤️🧡💛💚💙💜";

    // Truncating to the full length is a no-op.
    let whole = truncate_utf8(hearts, hearts.len()).unwrap();
    assert_eq!(whole.len(), hearts.len());
    assert_eq!(&whole, hearts.as_bytes());

    // Byte 13 falls inside a code point, so the cut backs off to the 10-byte boundary.
    let cut = truncate_utf8(hearts, 13).unwrap();
    assert_eq!(cut.len(), 10);
    assert_eq!(&cut, "❤️🧡".as_bytes());

    // A single 3-byte code point cannot be truncated to 1 byte.
    assert!(truncate_utf8("\u{0836}", 1).is_none());

    // (input, byte budget, expected): truncate on a char boundary, then increment the
    // last character.
    let incremented = [
        ("yyyyyyyyy", 8, "yyyyyyyz"),
        ("ééééé", 7, "ééê"),
        ("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8, "\u{ff}\u{ff}\u{ff}\u{100}"),
        ("ࠀࠀࠀࠀ", 8, "ࠀࠁ"),
        ("𐀀𐀀𐀀𐀀", 9, "𐀀𐀁"),
    ];
    for (input, budget, expected) in incremented {
        let r = truncate_and_increment_utf8(input, budget).unwrap();
        assert_eq!(&r, expected.as_bytes());
    }

    // Inputs made only of the largest code point of each width have no incremented form.
    for (input, budget) in [
        ("߿߿߿߿߿", 8),
        ("\u{ffff}\u{ffff}\u{ffff}", 8),
        ("\u{10ffff}\u{10ffff}", 8),
    ] {
        assert!(truncate_and_increment_utf8(input, budget).is_none());
    }
}
3356
#[test]
fn test_byte_array_truncate_invalid_utf8_statistics() {
    let message_type = "
        message test_schema {
            OPTIONAL BYTE_ARRAY a (UTF8);
        }
    ";
    let schema = Arc::new(parse_message_type(message_type).unwrap());

    // Seven 32-byte values made entirely of 0x80 (not valid UTF-8), spread over ten slots,
    // three of which are null (definition level 0).
    let data = vec![ByteArray::from(vec![128u8; 32]); 7];
    let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];

    let file: File = tempfile::tempfile().unwrap();
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Chunk)
            .set_statistics_truncate_length(Some(8))
            .build(),
    );

    let mut file_writer = SerializedFileWriter::new(&file, schema, props).unwrap();
    let mut row_group_writer = file_writer.next_row_group().unwrap();
    let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
    column_writer
        .typed::<ByteArrayType>()
        .write_batch(&data, Some(&def_levels), None)
        .unwrap();
    column_writer.close().unwrap();
    row_group_writer.close().unwrap();
    let file_metadata = file_writer.close().unwrap();

    let column_meta = file_metadata.row_groups[0].columns[0].meta_data.as_ref();
    assert!(column_meta.is_some());
    let stats = column_meta.unwrap().statistics.as_ref().unwrap();

    // Invalid UTF-8 is expected to fall back to binary truncation: eight bytes are kept and
    // the last one is incremented, so the max is no longer exact.
    assert!(!stats.is_max_value_exact.unwrap());
    let mut expected_max = vec![128u8; 7];
    expected_max.push(129);
    assert_eq!(stats.max_value, Some(expected_max));
}
3406
/// `increment` carries from the last byte: trailing `0xFF` bytes wrap to `0x00` and the
/// next byte is bumped, while an all-`0xFF` input has no successor.
#[test]
fn test_increment_max_binary_chars() {
    let carried = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]).unwrap();
    assert_eq!(&carried, &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);

    assert!(increment(vec![0xFF, 0xFF, 0xFF]).is_none())
}
3415
/// With statistics disabled, closing the writer yields an offset index but no column index.
#[test]
fn test_no_column_index_when_stats_disabled() {
    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .build();
    let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
    let mut writer = get_typed_column_writer::<Int32Type>(get_column_writer(
        descr,
        Arc::new(props),
        get_test_page_writer(),
    ));

    // Ten nulls: no values at all, every definition level is 0.
    let values: &[i32] = &[];
    let def_levels = [0i16; 10];
    writer.write_batch(values, Some(&def_levels), None).unwrap();
    writer.flush_data_pages().unwrap();

    let close_result = writer.close().unwrap();
    assert!(close_result.offset_index.is_some());
    assert!(close_result.column_index.is_none());
}
3439
/// With statistics disabled and the offset index explicitly disabled, neither index is
/// produced.
#[test]
fn test_no_offset_index_when_disabled() {
    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .set_offset_index_disabled(true)
        .build();
    let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
    let mut writer = get_typed_column_writer::<Int32Type>(get_column_writer(
        descr,
        Arc::new(props),
        get_test_page_writer(),
    ));

    // Ten nulls: no values at all, every definition level is 0.
    let values: &[i32] = &[];
    let def_levels = [0i16; 10];
    writer.write_batch(values, Some(&def_levels), None).unwrap();
    writer.flush_data_pages().unwrap();

    let close_result = writer.close().unwrap();
    assert!(close_result.offset_index.is_none());
    assert!(close_result.column_index.is_none());
}
3462
/// Requesting page-level statistics overrides `set_offset_index_disabled(true)`: both the
/// offset index and the column index are still produced.
#[test]
fn test_offset_index_overridden() {
    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Page)
        .set_offset_index_disabled(true)
        .build();
    let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
    let mut writer = get_typed_column_writer::<Int32Type>(get_column_writer(
        descr,
        Arc::new(props),
        get_test_page_writer(),
    ));

    // Ten nulls: no values at all, every definition level is 0.
    let values: &[i32] = &[];
    let def_levels = [0i16; 10];
    writer.write_batch(values, Some(&def_levels), None).unwrap();
    writer.flush_data_pages().unwrap();

    let close_result = writer.close().unwrap();
    assert!(close_result.offset_index.is_some());
    assert!(close_result.column_index.is_some());
}
3485
/// Checks the `boundary_order` recorded in the column index for INT32 pages with various
/// per-page min/max patterns, including null-only pages.
#[test]
fn test_boundary_order() -> Result<()> {
    /// Writes each of `pages` as its own data page and returns the column index's
    /// boundary order.
    fn order_of(
        descr: &Arc<ColumnDescriptor>,
        pages: &[&[Option<i32>]],
    ) -> Result<BoundaryOrder> {
        let close_result = write_multiple_pages::<Int32Type>(descr, pages)?;
        Ok(close_result.column_index.unwrap().boundary_order)
    }

    let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));

    // Mins and maxes never decrease (a null-only page sits in between).
    assert_eq!(
        order_of(
            &descr,
            &[
                &[Some(-10), Some(10)],
                &[Some(-5), Some(11)],
                &[None],
                &[Some(-5), Some(11)],
            ],
        )?,
        BoundaryOrder::ASCENDING
    );

    // Mins and maxes never increase.
    assert_eq!(
        order_of(
            &descr,
            &[
                &[Some(10), Some(11)],
                &[Some(5), Some(11)],
                &[None],
                &[Some(-5), Some(0)],
            ],
        )?,
        BoundaryOrder::DESCENDING
    );

    // Identical pages around a null-only page.
    assert_eq!(
        order_of(&descr, &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]])?,
        BoundaryOrder::ASCENDING
    );

    // Only null pages.
    assert_eq!(
        order_of(&descr, &[&[None], &[None], &[None]])?,
        BoundaryOrder::ASCENDING
    );

    // A single page.
    assert_eq!(
        order_of(&descr, &[&[Some(-10), Some(10)]])?,
        BoundaryOrder::ASCENDING
    );

    // A single page followed by a null-only page.
    assert_eq!(
        order_of(&descr, &[&[Some(-10), Some(10)], &[None]])?,
        BoundaryOrder::ASCENDING
    );

    // Rises, then falls.
    assert_eq!(
        order_of(
            &descr,
            &[
                &[Some(10), Some(11)],
                &[Some(11), Some(16)],
                &[None],
                &[Some(-5), Some(0)],
            ],
        )?,
        BoundaryOrder::UNORDERED
    );

    // Mins rise while maxes fall.
    assert_eq!(
        order_of(
            &descr,
            &[
                &[Some(1), Some(9)],
                &[Some(2), Some(8)],
                &[None],
                &[Some(3), Some(7)],
            ],
        )?,
        BoundaryOrder::UNORDERED
    );

    Ok(())
}
3569
/// The same f16-encoded FIXED_LEN_BYTE_ARRAY pages yield DESCENDING when the column is
/// annotated as FLOAT16, but UNORDERED for a plain 2-byte FIXED_LEN_BYTE_ARRAY column.
#[test]
fn test_boundary_order_logical_type() -> Result<()> {
    /// Writes each of `pages` as its own data page and returns the column index's
    /// boundary order.
    fn order_of(
        descr: &Arc<ColumnDescriptor>,
        pages: &[&[Option<FixedLenByteArray>]],
    ) -> Result<BoundaryOrder> {
        let close_result = write_multiple_pages::<FixedLenByteArrayType>(descr, pages)?;
        Ok(close_result.column_index.unwrap().boundary_order)
    }

    let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
    // Same physical layout as the FLOAT16 column, but without a logical type.
    let fba_descr = {
        let tpe = SchemaType::primitive_type_builder(
            "col",
            FixedLenByteArrayType::get_physical_type(),
        )
        .with_length(2)
        .build()?;
        Arc::new(ColumnDescriptor::new(
            Arc::new(tpe),
            1,
            0,
            ColumnPath::from("col"),
        ))
    };

    // One single-value page per f16, in numerically non-increasing order: 1, 0, -0, -1.
    let page = |v: f16| Some(FixedLenByteArray::from(ByteArray::from(v)));
    let values: &[&[Option<FixedLenByteArray>]] = &[
        &[page(f16::ONE)],
        &[page(f16::ZERO)],
        &[page(f16::NEG_ZERO)],
        &[page(f16::NEG_ONE)],
    ];

    assert_eq!(order_of(&f16_descr, values)?, BoundaryOrder::DESCENDING);
    assert_eq!(order_of(&fba_descr, values)?, BoundaryOrder::UNORDERED);

    Ok(())
}
3613
/// Statistics for an INTERVAL column must not carry min/max values.
#[test]
fn test_interval_stats_should_not_have_min_max() {
    // Three 12-byte intervals that differ only in their last byte (0, 1, 2).
    let input: Vec<_> = (0u8..3)
        .map(|last| {
            let mut bytes = vec![0u8; 12];
            bytes[11] = last;
            ByteArray::from(bytes).into()
        })
        .collect();

    let mut writer = get_test_interval_column_writer(get_test_page_writer());
    writer.write_batch(&input, None, None).unwrap();

    let metadata = writer.close().unwrap().metadata;
    let stats = match metadata.statistics() {
        Some(Statistics::FixedLenByteArray(stats)) => stats.clone(),
        _ => panic!("metadata missing statistics"),
    };
    assert!(stats.min_bytes_opt().is_none());
    assert!(stats.max_bytes_opt().is_none());
}
3638
/// `get_estimated_total_bytes` starts at zero and grows as each data page is added.
#[test]
#[cfg(feature = "arrow")]
fn test_column_writer_get_estimated_total_bytes() {
    let mut writer =
        get_test_column_writer::<Int32Type>(get_test_page_writer(), 0, 0, Default::default());
    assert_eq!(writer.get_estimated_total_bytes(), 0);

    // First page.
    writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
    writer.add_data_page().unwrap();
    assert_eq!(writer.get_estimated_total_bytes(), 20);

    // Second page.
    writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
    writer.add_data_page().unwrap();
    assert_eq!(writer.get_estimated_total_bytes(), 20 + 21);
}
3658
3659 fn write_multiple_pages<T: DataType>(
3660 column_descr: &Arc<ColumnDescriptor>,
3661 pages: &[&[Option<T::T>]],
3662 ) -> Result<ColumnCloseResult> {
3663 let column_writer = get_column_writer(
3664 column_descr.clone(),
3665 Default::default(),
3666 get_test_page_writer(),
3667 );
3668 let mut writer = get_typed_column_writer::<T>(column_writer);
3669
3670 for &page in pages {
3671 let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
3672 let def_levels = page
3673 .iter()
3674 .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
3675 .collect::<Vec<_>>();
3676 writer.write_batch(&values, Some(&def_levels), None)?;
3677 writer.flush_data_pages()?;
3678 }
3679
3680 writer.close()
3681 }
3682
3683 fn column_roundtrip_random<T: DataType>(
3687 props: WriterProperties,
3688 max_size: usize,
3689 min_value: T::T,
3690 max_value: T::T,
3691 max_def_level: i16,
3692 max_rep_level: i16,
3693 ) where
3694 T::T: PartialOrd + SampleUniform + Copy,
3695 {
3696 let mut num_values: usize = 0;
3697
3698 let mut buf: Vec<i16> = Vec::new();
3699 let def_levels = if max_def_level > 0 {
3700 random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
3701 for &dl in &buf[..] {
3702 if dl == max_def_level {
3703 num_values += 1;
3704 }
3705 }
3706 Some(&buf[..])
3707 } else {
3708 num_values = max_size;
3709 None
3710 };
3711
3712 let mut buf: Vec<i16> = Vec::new();
3713 let rep_levels = if max_rep_level > 0 {
3714 random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
3715 buf[0] = 0; Some(&buf[..])
3717 } else {
3718 None
3719 };
3720
3721 let mut values: Vec<T::T> = Vec::new();
3722 random_numbers_range(num_values, min_value, max_value, &mut values);
3723
3724 column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
3725 }
3726
3727 fn column_roundtrip<T: DataType>(
3729 props: WriterProperties,
3730 values: &[T::T],
3731 def_levels: Option<&[i16]>,
3732 rep_levels: Option<&[i16]>,
3733 ) {
3734 let mut file = tempfile::tempfile().unwrap();
3735 let mut write = TrackedWrite::new(&mut file);
3736 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
3737
3738 let max_def_level = match def_levels {
3739 Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3740 None => 0i16,
3741 };
3742
3743 let max_rep_level = match rep_levels {
3744 Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3745 None => 0i16,
3746 };
3747
3748 let mut max_batch_size = values.len();
3749 if let Some(levels) = def_levels {
3750 max_batch_size = max_batch_size.max(levels.len());
3751 }
3752 if let Some(levels) = rep_levels {
3753 max_batch_size = max_batch_size.max(levels.len());
3754 }
3755
3756 let mut writer =
3757 get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));
3758
3759 let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
3760 assert_eq!(values_written, values.len());
3761 let result = writer.close().unwrap();
3762
3763 drop(write);
3764
3765 let props = ReaderProperties::builder()
3766 .set_backward_compatible_lz4(false)
3767 .build();
3768 let page_reader = Box::new(
3769 SerializedPageReader::new_with_properties(
3770 Arc::new(file),
3771 &result.metadata,
3772 result.rows_written as usize,
3773 None,
3774 Arc::new(props),
3775 )
3776 .unwrap(),
3777 );
3778 let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);
3779
3780 let mut actual_values = Vec::with_capacity(max_batch_size);
3781 let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
3782 let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));
3783
3784 let (_, values_read, levels_read) = reader
3785 .read_records(
3786 max_batch_size,
3787 actual_def_levels.as_mut(),
3788 actual_rep_levels.as_mut(),
3789 &mut actual_values,
3790 )
3791 .unwrap();
3792
3793 assert_eq!(&actual_values[..values_read], values);
3796 match actual_def_levels {
3797 Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
3798 None => assert_eq!(None, def_levels),
3799 }
3800 match actual_rep_levels {
3801 Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
3802 None => assert_eq!(None, rep_levels),
3803 }
3804
3805 if let Some(levels) = actual_rep_levels {
3808 let mut actual_rows_written = 0;
3809 for l in levels {
3810 if l == 0 {
3811 actual_rows_written += 1;
3812 }
3813 }
3814 assert_eq!(actual_rows_written, result.rows_written);
3815 } else if actual_def_levels.is_some() {
3816 assert_eq!(levels_read as u64, result.rows_written);
3817 } else {
3818 assert_eq!(values_read as u64, result.rows_written);
3819 }
3820 }
3821
3822 fn column_write_and_get_metadata<T: DataType>(
3825 props: WriterProperties,
3826 values: &[T::T],
3827 ) -> ColumnChunkMetaData {
3828 let page_writer = get_test_page_writer();
3829 let props = Arc::new(props);
3830 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
3831 writer.write_batch(values, None, None).unwrap();
3832 writer.close().unwrap().metadata
3833 }
3834
3835 fn check_encoding_write_support<T: DataType>(
3839 version: WriterVersion,
3840 dict_enabled: bool,
3841 data: &[T::T],
3842 dictionary_page_offset: Option<i64>,
3843 encodings: &[Encoding],
3844 ) {
3845 let props = WriterProperties::builder()
3846 .set_writer_version(version)
3847 .set_dictionary_enabled(dict_enabled)
3848 .build();
3849 let meta = column_write_and_get_metadata::<T>(props, data);
3850 assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
3851 assert_eq!(meta.encodings(), &encodings);
3852 }
3853
3854 fn get_test_column_writer<'a, T: DataType>(
3856 page_writer: Box<dyn PageWriter + 'a>,
3857 max_def_level: i16,
3858 max_rep_level: i16,
3859 props: WriterPropertiesPtr,
3860 ) -> ColumnWriterImpl<'a, T> {
3861 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
3862 let column_writer = get_column_writer(descr, props, page_writer);
3863 get_typed_column_writer::<T>(column_writer)
3864 }
3865
3866 fn get_test_column_reader<T: DataType>(
3868 page_reader: Box<dyn PageReader>,
3869 max_def_level: i16,
3870 max_rep_level: i16,
3871 ) -> ColumnReaderImpl<T> {
3872 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
3873 let column_reader = get_column_reader(descr, page_reader);
3874 get_typed_column_reader::<T>(column_reader)
3875 }
3876
3877 fn get_test_column_descr<T: DataType>(
3879 max_def_level: i16,
3880 max_rep_level: i16,
3881 ) -> ColumnDescriptor {
3882 let path = ColumnPath::from("col");
3883 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
3884 .with_length(1)
3887 .build()
3888 .unwrap();
3889 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
3890 }
3891
3892 fn get_test_page_writer() -> Box<dyn PageWriter> {
3894 Box::new(TestPageWriter {})
3895 }
3896
3897 struct TestPageWriter {}
3898
3899 impl PageWriter for TestPageWriter {
3900 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
3901 let mut res = PageWriteSpec::new();
3902 res.page_type = page.page_type();
3903 res.uncompressed_size = page.uncompressed_size();
3904 res.compressed_size = page.compressed_size();
3905 res.num_values = page.num_values();
3906 res.offset = 0;
3907 res.bytes_written = page.data().len() as u64;
3908 Ok(res)
3909 }
3910
3911 fn close(&mut self) -> Result<()> {
3912 Ok(())
3913 }
3914 }
3915
3916 fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
3918 let page_writer = get_test_page_writer();
3919 let props = Default::default();
3920 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
3921 writer.write_batch(values, None, None).unwrap();
3922
3923 let metadata = writer.close().unwrap().metadata;
3924 if let Some(stats) = metadata.statistics() {
3925 stats.clone()
3926 } else {
3927 panic!("metadata missing statistics");
3928 }
3929 }
3930
3931 fn get_test_decimals_column_writer<T: DataType>(
3933 page_writer: Box<dyn PageWriter>,
3934 max_def_level: i16,
3935 max_rep_level: i16,
3936 props: WriterPropertiesPtr,
3937 ) -> ColumnWriterImpl<'static, T> {
3938 let descr = Arc::new(get_test_decimals_column_descr::<T>(
3939 max_def_level,
3940 max_rep_level,
3941 ));
3942 let column_writer = get_column_writer(descr, props, page_writer);
3943 get_typed_column_writer::<T>(column_writer)
3944 }
3945
3946 fn get_test_decimals_column_descr<T: DataType>(
3948 max_def_level: i16,
3949 max_rep_level: i16,
3950 ) -> ColumnDescriptor {
3951 let path = ColumnPath::from("col");
3952 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
3953 .with_length(16)
3954 .with_logical_type(Some(LogicalType::Decimal {
3955 scale: 2,
3956 precision: 3,
3957 }))
3958 .with_scale(2)
3959 .with_precision(3)
3960 .build()
3961 .unwrap();
3962 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
3963 }
3964
3965 fn float16_statistics_roundtrip(
3966 values: &[FixedLenByteArray],
3967 ) -> ValueStatistics<FixedLenByteArray> {
3968 let page_writer = get_test_page_writer();
3969 let mut writer = get_test_float16_column_writer(page_writer, Default::default());
3970 writer.write_batch(values, None, None).unwrap();
3971
3972 let metadata = writer.close().unwrap().metadata;
3973 if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
3974 stats.clone()
3975 } else {
3976 panic!("metadata missing statistics");
3977 }
3978 }
3979
3980 fn get_test_float16_column_writer(
3981 page_writer: Box<dyn PageWriter>,
3982 props: WriterPropertiesPtr,
3983 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
3984 let descr = Arc::new(get_test_float16_column_descr(0, 0));
3985 let column_writer = get_column_writer(descr, props, page_writer);
3986 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
3987 }
3988
3989 fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
3990 let path = ColumnPath::from("col");
3991 let tpe =
3992 SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
3993 .with_length(2)
3994 .with_logical_type(Some(LogicalType::Float16))
3995 .build()
3996 .unwrap();
3997 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
3998 }
3999
4000 fn get_test_interval_column_writer(
4001 page_writer: Box<dyn PageWriter>,
4002 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4003 let descr = Arc::new(get_test_interval_column_descr());
4004 let column_writer = get_column_writer(descr, Default::default(), page_writer);
4005 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4006 }
4007
4008 fn get_test_interval_column_descr() -> ColumnDescriptor {
4009 let path = ColumnPath::from("col");
4010 let tpe =
4011 SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4012 .with_length(12)
4013 .with_converted_type(ConvertedType::INTERVAL)
4014 .build()
4015 .unwrap();
4016 ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
4017 }
4018
4019 fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
4021 page_writer: Box<dyn PageWriter + 'a>,
4022 max_def_level: i16,
4023 max_rep_level: i16,
4024 props: WriterPropertiesPtr,
4025 ) -> ColumnWriterImpl<'a, T> {
4026 let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
4027 max_def_level,
4028 max_rep_level,
4029 ));
4030 let column_writer = get_column_writer(descr, props, page_writer);
4031 get_typed_column_writer::<T>(column_writer)
4032 }
4033
4034 fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
4036 max_def_level: i16,
4037 max_rep_level: i16,
4038 ) -> ColumnDescriptor {
4039 let path = ColumnPath::from("col");
4040 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4041 .with_converted_type(ConvertedType::UINT_32)
4042 .build()
4043 .unwrap();
4044 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4045 }
4046}