1use bytes::Bytes;
21use half::f16;
22
23use crate::bloom_filter::Sbbf;
24use crate::format::{BoundaryOrder, ColumnIndex, OffsetIndex};
25use std::collections::{BTreeSet, VecDeque};
26use std::str;
27
28use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
29use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
30use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
31use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
32use crate::data_type::private::ParquetValueType;
33use crate::data_type::*;
34use crate::encodings::levels::LevelEncoder;
35#[cfg(feature = "encryption")]
36use crate::encryption::encrypt::get_column_crypto_metadata;
37use crate::errors::{ParquetError, Result};
38use crate::file::metadata::{
39 ColumnChunkMetaData, ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram,
40 OffsetIndexBuilder,
41};
42use crate::file::page_encoding_stats::PageEncodingStats;
43use crate::file::properties::{
44 EnabledStatistics, WriterProperties, WriterPropertiesPtr, WriterVersion,
45};
46use crate::file::statistics::{Statistics, ValueStatistics};
47use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
48
49pub(crate) mod encoder;
50
// Dispatches one expression over every variant of `ColumnWriter`.
//
// * `$e` - the `ColumnWriter` value (or reference) being matched.
// * `$i` - the identifier the inner typed writer is bound to in each arm.
// * `$b` - the expression evaluated with that binding in scope.
//
// The arms are written as `Self::...` patterns, so the macro can only be expanded inside
// an `impl` block whose `Self` is `ColumnWriter`.
macro_rules! downcast_writer {
    ($e:expr, $i:ident, $b:expr) => {
        match $e {
            Self::BoolColumnWriter($i) => $b,
            Self::Int32ColumnWriter($i) => $b,
            Self::Int64ColumnWriter($i) => $b,
            Self::Int96ColumnWriter($i) => $b,
            Self::FloatColumnWriter($i) => $b,
            Self::DoubleColumnWriter($i) => $b,
            Self::ByteArrayColumnWriter($i) => $b,
            Self::FixedLenByteArrayColumnWriter($i) => $b,
        }
    };
}
65
/// Column writer for a Parquet column, with one variant per physical type.
///
/// Use [`get_column_writer`] to build the variant that matches a column descriptor's
/// physical type, and [`get_typed_column_writer`] (or its `_ref` / `_mut` counterparts)
/// to extract the typed writer it wraps.
pub enum ColumnWriter<'a> {
    /// Column writer for the `BOOLEAN` physical type.
    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
    /// Column writer for the `INT32` physical type.
    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
    /// Column writer for the `INT64` physical type.
    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
    /// Column writer for the `INT96` physical type.
    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
    /// Column writer for the `FLOAT` physical type.
    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
    /// Column writer for the `DOUBLE` physical type.
    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
    /// Column writer for the `BYTE_ARRAY` physical type.
    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
    /// Column writer for the `FIXED_LEN_BYTE_ARRAY` physical type.
    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
}
85
86impl ColumnWriter<'_> {
87 #[cfg(feature = "arrow")]
89 pub(crate) fn memory_size(&self) -> usize {
90 downcast_writer!(self, typed, typed.memory_size())
91 }
92
93 #[cfg(feature = "arrow")]
95 pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
96 downcast_writer!(self, typed, typed.get_estimated_total_bytes())
97 }
98
99 pub fn close(self) -> Result<ColumnCloseResult> {
101 downcast_writer!(self, typed, typed.close())
102 }
103}
104
/// Deprecated enum with no known users inside this module; kept only so existing
/// code that names it keeps compiling until it is removed.
#[deprecated(
    since = "54.0.0",
    note = "Seems like a stray and nobody knows what's it for. Will be removed in the next release."
)]
#[allow(missing_docs)]
pub enum Level {
    Page,
    Column,
}
114
115pub fn get_column_writer<'a>(
117 descr: ColumnDescPtr,
118 props: WriterPropertiesPtr,
119 page_writer: Box<dyn PageWriter + 'a>,
120) -> ColumnWriter<'a> {
121 match descr.physical_type() {
122 Type::BOOLEAN => {
123 ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
124 }
125 Type::INT32 => {
126 ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
127 }
128 Type::INT64 => {
129 ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
130 }
131 Type::INT96 => {
132 ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
133 }
134 Type::FLOAT => {
135 ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
136 }
137 Type::DOUBLE => {
138 ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
139 }
140 Type::BYTE_ARRAY => {
141 ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
142 }
143 Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
144 ColumnWriterImpl::new(descr, props, page_writer),
145 ),
146 }
147}
148
149pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
154 T::get_column_writer(col_writer).unwrap_or_else(|| {
155 panic!(
156 "Failed to convert column writer into a typed column writer for `{}` type",
157 T::get_physical_type()
158 )
159 })
160}
161
162pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
164 col_writer: &'b ColumnWriter<'a>,
165) -> &'b ColumnWriterImpl<'a, T> {
166 T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
167 panic!(
168 "Failed to convert column writer into a typed column writer for `{}` type",
169 T::get_physical_type()
170 )
171 })
172}
173
174pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
176 col_writer: &'a mut ColumnWriter<'b>,
177) -> &'a mut ColumnWriterImpl<'b, T> {
178 T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
179 panic!(
180 "Failed to convert column writer into a typed column writer for `{}` type",
181 T::get_physical_type()
182 )
183 })
184}
185
/// Everything produced when a column writer is closed (see `GenericColumnWriter::close`).
#[derive(Debug, Clone)]
pub struct ColumnCloseResult {
    /// Total number of bytes written for this column chunk.
    pub bytes_written: u64,
    /// Total number of rows written for this column chunk.
    pub rows_written: u64,
    /// Metadata describing the written column chunk.
    pub metadata: ColumnChunkMetaData,
    /// Bloom filter built by the value encoder, if it produced one.
    pub bloom_filter: Option<Sbbf>,
    /// Column index for the chunk; `None` when the column index builder was marked
    /// invalid (for example because page-level statistics were not collected).
    pub column_index: Option<ColumnIndex>,
    /// Offset index for the chunk; `None` when the offset index is disabled in the
    /// writer properties.
    pub offset_index: Option<OffsetIndex>,
}
202
/// Counters for the data page currently being buffered; cleared by `new_page` once the
/// page has been emitted.
#[derive(Default)]
struct PageMetrics {
    /// Number of level slots (non-null values plus nulls) buffered for this page.
    num_buffered_values: u32,
    /// Number of rows started in this page (repetition level 0, or one row per slot
    /// for non-repeated columns).
    num_buffered_rows: u32,
    /// Number of level slots whose definition level is below the column's maximum.
    num_page_nulls: u64,
    /// Per-level repetition histogram. It is `None` when histograms are not tracked;
    /// `LevelHistogram::try_new` may also yield `None` (presumably for max level 0 —
    /// TODO confirm).
    repetition_level_histogram: Option<LevelHistogram>,
    /// Per-level definition histogram; same `None` semantics as the repetition one.
    definition_level_histogram: Option<LevelHistogram>,
}
212
213impl PageMetrics {
214 fn new() -> Self {
215 Default::default()
216 }
217
218 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
220 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
221 self
222 }
223
224 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
226 self.definition_level_histogram = LevelHistogram::try_new(max_level);
227 self
228 }
229
230 fn new_page(&mut self) {
233 self.num_buffered_values = 0;
234 self.num_buffered_rows = 0;
235 self.num_page_nulls = 0;
236 self.repetition_level_histogram
237 .as_mut()
238 .map(LevelHistogram::reset);
239 self.definition_level_histogram
240 .as_mut()
241 .map(LevelHistogram::reset);
242 }
243
244 fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
246 if let Some(ref mut rep_hist) = self.repetition_level_histogram {
247 rep_hist.update_from_levels(levels);
248 }
249 }
250
251 fn update_definition_level_histogram(&mut self, levels: &[i16]) {
253 if let Some(ref mut def_hist) = self.definition_level_histogram {
254 def_hist.update_from_levels(levels);
255 }
256 }
257}
258
/// Running totals for the whole column chunk, accumulated as pages are written.
#[derive(Default)]
struct ColumnMetrics<T: Default> {
    /// Bytes reported as written by the page writer (sum of `PageWriteSpec::bytes_written`).
    total_bytes_written: u64,
    /// Rows contained in all data pages produced so far.
    total_rows_written: u64,
    /// Sum of uncompressed page sizes reported by the page writer.
    total_uncompressed_size: u64,
    /// Sum of compressed page sizes reported by the page writer.
    total_compressed_size: u64,
    /// Number of values (level slots) across all written data pages.
    total_num_values: u64,
    /// File offset of the dictionary page, once one has been written.
    dictionary_page_offset: Option<u64>,
    /// File offset of the first data page written.
    data_page_offset: Option<u64>,
    /// Chunk-level minimum, merged from page minima and caller-supplied minima.
    min_column_value: Option<T>,
    /// Chunk-level maximum, merged from page maxima and caller-supplied maxima.
    max_column_value: Option<T>,
    /// Total number of null level slots in the chunk.
    num_column_nulls: u64,
    /// Caller-supplied distinct count; cleared to `None` when more than one batch is written.
    column_distinct_count: Option<u64>,
    /// Total unencoded byte-array data size, if the encoder reports it.
    variable_length_bytes: Option<i64>,
    /// Chunk-level repetition level histogram (sum of page histograms).
    repetition_level_histogram: Option<LevelHistogram>,
    /// Chunk-level definition level histogram (sum of page histograms).
    definition_level_histogram: Option<LevelHistogram>,
}
277
278impl<T: Default> ColumnMetrics<T> {
279 fn new() -> Self {
280 Default::default()
281 }
282
283 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
285 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
286 self
287 }
288
289 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
291 self.definition_level_histogram = LevelHistogram::try_new(max_level);
292 self
293 }
294
295 fn update_histogram(
297 chunk_histogram: &mut Option<LevelHistogram>,
298 page_histogram: &Option<LevelHistogram>,
299 ) {
300 if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
301 chunk_hist.add(page_hist);
302 }
303 }
304
305 fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
308 ColumnMetrics::<T>::update_histogram(
309 &mut self.definition_level_histogram,
310 &page_metrics.definition_level_histogram,
311 );
312 ColumnMetrics::<T>::update_histogram(
313 &mut self.repetition_level_histogram,
314 &page_metrics.repetition_level_histogram,
315 );
316 }
317
318 fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
320 if let Some(var_bytes) = variable_length_bytes {
321 *self.variable_length_bytes.get_or_insert(0) += var_bytes;
322 }
323 }
324}
325
/// Typed column writer for data type `T`, using the default [`ColumnValueEncoderImpl`]
/// value encoder.
pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
328
/// Column writer generic over its value encoder `E`.
///
/// It buffers levels and values into data pages, compresses them, tracks statistics,
/// and feeds the column and offset index builders.
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
    // Column descriptor this writer was created for.
    descr: ColumnDescPtr,
    // Writer properties, consulted for limits, truncation lengths and writer version.
    props: WriterPropertiesPtr,
    // Statistics level resolved for this column's path.
    statistics_enabled: EnabledStatistics,

    // Destination for every page this writer produces.
    page_writer: Box<dyn PageWriter + 'a>,
    // Compression codec recorded in the column chunk metadata.
    codec: Compression,
    // Compressor built from `codec`; `None` means page bytes are written uncompressed.
    compressor: Option<Box<dyn Codec>>,
    // Value encoder that buffers the values of the current data page.
    encoder: E,

    // Counters for the data page currently being buffered.
    page_metrics: PageMetrics,
    // Running totals for the whole column chunk.
    column_metrics: ColumnMetrics<E::T>,

    // Every encoding used in this chunk. RLE is inserted up front because levels
    // are always RLE-encoded.
    encodings: BTreeSet<Encoding>,
    // Per (page type, encoding) page counts; consecutive equal entries are merged.
    encoding_stats: Vec<PageEncodingStats>,
    // Definition levels buffered for the current data page.
    def_levels_sink: Vec<i16>,
    // Repetition levels buffered for the current data page.
    rep_levels_sink: Vec<i16>,
    // Finished data pages held back while dictionary encoding is active, so the
    // dictionary page can be written ahead of them.
    data_pages: VecDeque<CompressedPage>,
    // Builder for the chunk's column index (may be marked invalid).
    column_index_builder: ColumnIndexBuilder,
    // Builder for the chunk's offset index; `None` when the offset index is disabled.
    offset_index_builder: Option<OffsetIndexBuilder>,

    // Whether the min/max of all non-null data pages seen so far are non-decreasing.
    data_page_boundary_ascending: bool,
    // Whether the min/max of all non-null data pages seen so far are non-increasing.
    data_page_boundary_descending: bool,
    // Min/max of the most recent non-null data page, used for the two checks above.
    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}
364
365impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
366 pub fn new(
368 descr: ColumnDescPtr,
369 props: WriterPropertiesPtr,
370 page_writer: Box<dyn PageWriter + 'a>,
371 ) -> Self {
372 let codec = props.compression(descr.path());
373 let codec_options = CodecOptionsBuilder::default().build();
374 let compressor = create_codec(codec, &codec_options).unwrap();
375 let encoder = E::try_new(&descr, props.as_ref()).unwrap();
376
377 let statistics_enabled = props.statistics_enabled(descr.path());
378
379 let mut encodings = BTreeSet::new();
380 encodings.insert(Encoding::RLE);
382
383 let mut page_metrics = PageMetrics::new();
384 let mut column_metrics = ColumnMetrics::<E::T>::new();
385
386 if statistics_enabled != EnabledStatistics::None {
388 page_metrics = page_metrics
389 .with_repetition_level_histogram(descr.max_rep_level())
390 .with_definition_level_histogram(descr.max_def_level());
391 column_metrics = column_metrics
392 .with_repetition_level_histogram(descr.max_rep_level())
393 .with_definition_level_histogram(descr.max_def_level())
394 }
395
396 let mut column_index_builder = ColumnIndexBuilder::new();
398 if statistics_enabled != EnabledStatistics::Page {
399 column_index_builder.to_invalid()
400 }
401
402 let offset_index_builder = match props.offset_index_disabled() {
404 false => Some(OffsetIndexBuilder::new()),
405 _ => None,
406 };
407
408 Self {
409 descr,
410 props,
411 statistics_enabled,
412 page_writer,
413 codec,
414 compressor,
415 encoder,
416 def_levels_sink: vec![],
417 rep_levels_sink: vec![],
418 data_pages: VecDeque::new(),
419 page_metrics,
420 column_metrics,
421 column_index_builder,
422 offset_index_builder,
423 encodings,
424 encoding_stats: vec![],
425 data_page_boundary_ascending: true,
426 data_page_boundary_descending: true,
427 last_non_null_data_page_min_max: None,
428 }
429 }
430
431 #[allow(clippy::too_many_arguments)]
432 pub(crate) fn write_batch_internal(
433 &mut self,
434 values: &E::Values,
435 value_indices: Option<&[usize]>,
436 def_levels: Option<&[i16]>,
437 rep_levels: Option<&[i16]>,
438 min: Option<&E::T>,
439 max: Option<&E::T>,
440 distinct_count: Option<u64>,
441 ) -> Result<usize> {
442 if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
444 if def.len() != rep.len() {
445 return Err(general_err!(
446 "Inconsistent length of definition and repetition levels: {} != {}",
447 def.len(),
448 rep.len()
449 ));
450 }
451 }
452
453 let num_levels = match def_levels {
464 Some(def_levels) => def_levels.len(),
465 None => values.len(),
466 };
467
468 if let Some(min) = min {
469 update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
470 }
471 if let Some(max) = max {
472 update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
473 }
474
475 if self.encoder.num_values() == 0 {
477 self.column_metrics.column_distinct_count = distinct_count;
478 } else {
479 self.column_metrics.column_distinct_count = None;
480 }
481
482 let mut values_offset = 0;
483 let mut levels_offset = 0;
484 let base_batch_size = self.props.write_batch_size();
485 while levels_offset < num_levels {
486 let mut end_offset = num_levels.min(levels_offset + base_batch_size);
487
488 if let Some(r) = rep_levels {
490 while end_offset < r.len() && r[end_offset] != 0 {
491 end_offset += 1;
492 }
493 }
494
495 values_offset += self.write_mini_batch(
496 values,
497 values_offset,
498 value_indices,
499 end_offset - levels_offset,
500 def_levels.map(|lv| &lv[levels_offset..end_offset]),
501 rep_levels.map(|lv| &lv[levels_offset..end_offset]),
502 )?;
503 levels_offset = end_offset;
504 }
505
506 Ok(values_offset)
508 }
509
510 pub fn write_batch(
523 &mut self,
524 values: &E::Values,
525 def_levels: Option<&[i16]>,
526 rep_levels: Option<&[i16]>,
527 ) -> Result<usize> {
528 self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
529 }
530
531 pub fn write_batch_with_statistics(
539 &mut self,
540 values: &E::Values,
541 def_levels: Option<&[i16]>,
542 rep_levels: Option<&[i16]>,
543 min: Option<&E::T>,
544 max: Option<&E::T>,
545 distinct_count: Option<u64>,
546 ) -> Result<usize> {
547 self.write_batch_internal(
548 values,
549 None,
550 def_levels,
551 rep_levels,
552 min,
553 max,
554 distinct_count,
555 )
556 }
557
558 #[cfg(feature = "arrow")]
563 pub(crate) fn memory_size(&self) -> usize {
564 self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
565 }
566
567 pub fn get_total_bytes_written(&self) -> u64 {
573 self.column_metrics.total_bytes_written
574 }
575
576 #[cfg(feature = "arrow")]
582 pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
583 self.data_pages
584 .iter()
585 .map(|page| page.data().len() as u64)
586 .sum::<u64>()
587 + self.column_metrics.total_bytes_written
588 + self.encoder.estimated_data_page_size() as u64
589 + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
590 }
591
592 pub fn get_total_rows_written(&self) -> u64 {
595 self.column_metrics.total_rows_written
596 }
597
598 pub fn get_descriptor(&self) -> &ColumnDescPtr {
600 &self.descr
601 }
602
603 pub fn close(mut self) -> Result<ColumnCloseResult> {
606 if self.page_metrics.num_buffered_values > 0 {
607 self.add_data_page()?;
608 }
609 if self.encoder.has_dictionary() {
610 self.write_dictionary_page()?;
611 }
612 self.flush_data_pages()?;
613 let metadata = self.build_column_metadata()?;
614 self.page_writer.close()?;
615
616 let boundary_order = match (
617 self.data_page_boundary_ascending,
618 self.data_page_boundary_descending,
619 ) {
620 (true, _) => BoundaryOrder::ASCENDING,
623 (false, true) => BoundaryOrder::DESCENDING,
624 (false, false) => BoundaryOrder::UNORDERED,
625 };
626 self.column_index_builder.set_boundary_order(boundary_order);
627
628 let column_index = self
629 .column_index_builder
630 .valid()
631 .then(|| self.column_index_builder.build_to_thrift());
632
633 let offset_index = self.offset_index_builder.map(|b| b.build_to_thrift());
634
635 Ok(ColumnCloseResult {
636 bytes_written: self.column_metrics.total_bytes_written,
637 rows_written: self.column_metrics.total_rows_written,
638 bloom_filter: self.encoder.flush_bloom_filter(),
639 metadata,
640 column_index,
641 offset_index,
642 })
643 }
644
645 fn write_mini_batch(
649 &mut self,
650 values: &E::Values,
651 values_offset: usize,
652 value_indices: Option<&[usize]>,
653 num_levels: usize,
654 def_levels: Option<&[i16]>,
655 rep_levels: Option<&[i16]>,
656 ) -> Result<usize> {
657 let values_to_write = if self.descr.max_def_level() > 0 {
659 let levels = def_levels.ok_or_else(|| {
660 general_err!(
661 "Definition levels are required, because max definition level = {}",
662 self.descr.max_def_level()
663 )
664 })?;
665
666 let mut values_to_write = 0;
667 for &level in levels {
668 if level == self.descr.max_def_level() {
669 values_to_write += 1;
670 } else {
671 self.page_metrics.num_page_nulls += 1
673 }
674 }
675
676 self.page_metrics.update_definition_level_histogram(levels);
678
679 self.def_levels_sink.extend_from_slice(levels);
680 values_to_write
681 } else {
682 num_levels
683 };
684
685 if self.descr.max_rep_level() > 0 {
687 let levels = rep_levels.ok_or_else(|| {
689 general_err!(
690 "Repetition levels are required, because max repetition level = {}",
691 self.descr.max_rep_level()
692 )
693 })?;
694
695 if !levels.is_empty() && levels[0] != 0 {
696 return Err(general_err!(
697 "Write must start at a record boundary, got non-zero repetition level of {}",
698 levels[0]
699 ));
700 }
701
702 for &level in levels {
704 self.page_metrics.num_buffered_rows += (level == 0) as u32
705 }
706
707 self.page_metrics.update_repetition_level_histogram(levels);
709
710 self.rep_levels_sink.extend_from_slice(levels);
711 } else {
712 self.page_metrics.num_buffered_rows += num_levels as u32;
715 }
716
717 match value_indices {
718 Some(indices) => {
719 let indices = &indices[values_offset..values_offset + values_to_write];
720 self.encoder.write_gather(values, indices)?;
721 }
722 None => self.encoder.write(values, values_offset, values_to_write)?,
723 }
724
725 self.page_metrics.num_buffered_values += num_levels as u32;
726
727 if self.should_add_data_page() {
728 self.add_data_page()?;
729 }
730
731 if self.should_dict_fallback() {
732 self.dict_fallback()?;
733 }
734
735 Ok(values_to_write)
736 }
737
738 #[inline]
743 fn should_dict_fallback(&self) -> bool {
744 match self.encoder.estimated_dict_page_size() {
745 Some(size) => size >= self.props.dictionary_page_size_limit(),
746 None => false,
747 }
748 }
749
750 #[inline]
752 fn should_add_data_page(&self) -> bool {
753 if self.page_metrics.num_buffered_values == 0 {
758 return false;
759 }
760
761 self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
762 || self.encoder.estimated_data_page_size() >= self.props.data_page_size_limit()
763 }
764
765 fn dict_fallback(&mut self) -> Result<()> {
768 if self.page_metrics.num_buffered_values > 0 {
770 self.add_data_page()?;
771 }
772 self.write_dictionary_page()?;
773 self.flush_data_pages()?;
774 Ok(())
775 }
776
777 fn update_column_offset_index(
779 &mut self,
780 page_statistics: Option<&ValueStatistics<E::T>>,
781 page_variable_length_bytes: Option<i64>,
782 ) {
783 let null_page =
785 (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
786 if null_page && self.column_index_builder.valid() {
789 self.column_index_builder.append(
790 null_page,
791 vec![],
792 vec![],
793 self.page_metrics.num_page_nulls as i64,
794 );
795 } else if self.column_index_builder.valid() {
796 match &page_statistics {
799 None => {
800 self.column_index_builder.to_invalid();
801 }
802 Some(stat) => {
803 let new_min = stat.min_opt().unwrap();
805 let new_max = stat.max_opt().unwrap();
806 if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
807 if self.data_page_boundary_ascending {
808 let not_ascending = compare_greater(&self.descr, last_min, new_min)
810 || compare_greater(&self.descr, last_max, new_max);
811 if not_ascending {
812 self.data_page_boundary_ascending = false;
813 }
814 }
815
816 if self.data_page_boundary_descending {
817 let not_descending = compare_greater(&self.descr, new_min, last_min)
819 || compare_greater(&self.descr, new_max, last_max);
820 if not_descending {
821 self.data_page_boundary_descending = false;
822 }
823 }
824 }
825 self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));
826
827 if self.can_truncate_value() {
828 self.column_index_builder.append(
829 null_page,
830 self.truncate_min_value(
831 self.props.column_index_truncate_length(),
832 stat.min_bytes_opt().unwrap(),
833 )
834 .0,
835 self.truncate_max_value(
836 self.props.column_index_truncate_length(),
837 stat.max_bytes_opt().unwrap(),
838 )
839 .0,
840 self.page_metrics.num_page_nulls as i64,
841 );
842 } else {
843 self.column_index_builder.append(
844 null_page,
845 stat.min_bytes_opt().unwrap().to_vec(),
846 stat.max_bytes_opt().unwrap().to_vec(),
847 self.page_metrics.num_page_nulls as i64,
848 );
849 }
850 }
851 }
852 }
853
854 self.column_index_builder.append_histograms(
856 &self.page_metrics.repetition_level_histogram,
857 &self.page_metrics.definition_level_histogram,
858 );
859
860 if let Some(builder) = self.offset_index_builder.as_mut() {
862 builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
863 builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
864 }
865 }
866
867 fn can_truncate_value(&self) -> bool {
869 match self.descr.physical_type() {
870 Type::FIXED_LEN_BYTE_ARRAY
874 if !matches!(
875 self.descr.logical_type(),
876 Some(LogicalType::Decimal { .. }) | Some(LogicalType::Float16)
877 ) =>
878 {
879 true
880 }
881 Type::BYTE_ARRAY => true,
882 _ => false,
884 }
885 }
886
887 fn is_utf8(&self) -> bool {
889 self.get_descriptor().logical_type() == Some(LogicalType::String)
890 || self.get_descriptor().converted_type() == ConvertedType::UTF8
891 }
892
893 fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
904 truncation_length
905 .filter(|l| data.len() > *l)
906 .and_then(|l|
907 if self.is_utf8() {
909 match str::from_utf8(data) {
910 Ok(str_data) => truncate_utf8(str_data, l),
911 Err(_) => Some(data[..l].to_vec()),
912 }
913 } else {
914 Some(data[..l].to_vec())
915 }
916 )
917 .map(|truncated| (truncated, true))
918 .unwrap_or_else(|| (data.to_vec(), false))
919 }
920
921 fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
935 truncation_length
936 .filter(|l| data.len() > *l)
937 .and_then(|l|
938 if self.is_utf8() {
940 match str::from_utf8(data) {
941 Ok(str_data) => truncate_and_increment_utf8(str_data, l),
942 Err(_) => increment(data[..l].to_vec()),
943 }
944 } else {
945 increment(data[..l].to_vec())
946 }
947 )
948 .map(|truncated| (truncated, true))
949 .unwrap_or_else(|| (data.to_vec(), false))
950 }
951
    /// Turns the buffered levels and values into one compressed data page.
    ///
    /// Steps:
    /// 1. Update chunk statistics and the column/offset indexes.
    /// 2. Lay out the page in the configured writer version's format.
    /// 3. Either write the page directly, or hold it back while a dictionary is in use so
    ///    the dictionary page can precede it.
    ///
    /// Per-page state is then reset.
    fn add_data_page(&mut self) -> Result<()> {
        // Take the encoded values (and their min/max, if computed) out of the encoder.
        let values_data = self.encoder.flush_data_page()?;

        let max_def_level = self.descr.max_def_level();
        let max_rep_level = self.descr.max_rep_level();

        self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;

        // Merge page min/max into the chunk min/max. Page statistics are kept only when
        // statistics are collected at page level.
        let page_statistics = match (values_data.min_value, values_data.max_value) {
            (Some(min), Some(max)) => {
                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

                (self.statistics_enabled == EnabledStatistics::Page).then_some(
                    ValueStatistics::new(
                        Some(min),
                        Some(max),
                        None,
                        Some(self.page_metrics.num_page_nulls),
                        false,
                    ),
                )
            }
            _ => None,
        };

        // Must run before the page metrics are reset at the end of this function.
        self.update_column_offset_index(
            page_statistics.as_ref(),
            values_data.variable_length_bytes,
        );

        self.column_metrics
            .update_from_page_metrics(&self.page_metrics);
        self.column_metrics
            .update_variable_length_bytes(values_data.variable_length_bytes);

        // Convert to the type-erased statistics stored in the page header.
        let page_statistics = page_statistics.map(Statistics::from);

        let compressed_page = match self.props.writer_version() {
            WriterVersion::PARQUET_1_0 => {
                // v1 layout: [rep levels][def levels][values], all compressed together.
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.rep_levels_sink[..],
                            max_rep_level,
                        )[..],
                    );
                }

                if max_def_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.def_levels_sink[..],
                            max_def_level,
                        )[..],
                    );
                }

                buffer.extend_from_slice(&values_data.buf);
                let uncompressed_size = buffer.len();

                if let Some(ref mut cmpr) = self.compressor {
                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
                    buffer = compressed_buf;
                }

                let data_page = Page::DataPage {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    def_level_encoding: Encoding::RLE,
                    rep_level_encoding: Encoding::RLE,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
            WriterVersion::PARQUET_2_0 => {
                // v2 layout: levels are stored uncompressed and their byte lengths go in
                // the header. Only the values section is passed to the compressor, which
                // is assumed to append to `buffer` after the levels.
                let mut rep_levels_byte_len = 0;
                let mut def_levels_byte_len = 0;
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
                    rep_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                if max_def_level > 0 {
                    let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
                    def_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                let uncompressed_size =
                    rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();

                match self.compressor {
                    Some(ref mut cmpr) => {
                        cmpr.compress(&values_data.buf, &mut buffer)?;
                    }
                    None => buffer.extend_from_slice(&values_data.buf),
                }

                let data_page = Page::DataPageV2 {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    num_nulls: self.page_metrics.num_page_nulls as u32,
                    num_rows: self.page_metrics.num_buffered_rows,
                    def_levels_byte_len: def_levels_byte_len as u32,
                    rep_levels_byte_len: rep_levels_byte_len as u32,
                    is_compressed: self.compressor.is_some(),
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
        };

        // While a dictionary is active, data pages are held back so the dictionary page
        // can be written before them.
        if self.encoder.has_dictionary() {
            self.data_pages.push_back(compressed_page);
        } else {
            self.write_data_page(compressed_page)?;
        }

        self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;

        // Reset per-page state for the next page.
        self.rep_levels_sink.clear();
        self.def_levels_sink.clear();
        self.page_metrics.new_page();

        Ok(())
    }
1101
1102 #[inline]
1105 fn flush_data_pages(&mut self) -> Result<()> {
1106 if self.page_metrics.num_buffered_values > 0 {
1108 self.add_data_page()?;
1109 }
1110
1111 while let Some(page) = self.data_pages.pop_front() {
1112 self.write_data_page(page)?;
1113 }
1114
1115 Ok(())
1116 }
1117
    /// Builds the column chunk metadata from the accumulated chunk metrics.
    ///
    /// When statistics are enabled, chunk statistics are attached. Byte-array min/max
    /// (and truncatable fixed-length byte arrays) are truncated to the configured
    /// statistics length, and their exactness flags are set accordingly. Level
    /// histograms are attached as well; they are moved out with `take()`, so this is
    /// expected to run once per chunk.
    fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
        let total_compressed_size = self.column_metrics.total_compressed_size as i64;
        let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
        let num_values = self.column_metrics.total_num_values as i64;
        let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
        // No data page offset means no data page has been written; 0 is used then.
        let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;

        let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
            .set_compression(self.codec)
            .set_encodings(self.encodings.iter().cloned().collect())
            .set_page_encoding_stats(self.encoding_stats.clone())
            .set_total_compressed_size(total_compressed_size)
            .set_total_uncompressed_size(total_uncompressed_size)
            .set_num_values(num_values)
            .set_data_page_offset(data_page_offset)
            .set_dictionary_page_offset(dict_page_offset);

        if self.statistics_enabled != EnabledStatistics::None {
            // Legacy min/max fields are populated only for signed sort orders.
            let backwards_compatible_min_max = self.descr.sort_order().is_signed();

            let statistics = ValueStatistics::<E::T>::new(
                self.column_metrics.min_column_value.clone(),
                self.column_metrics.max_column_value.clone(),
                self.column_metrics.column_distinct_count,
                Some(self.column_metrics.num_column_nulls),
                false,
            )
            .with_backwards_compatible_min_max(backwards_compatible_min_max)
            .into();

            // Re-build byte-array statistics with truncated bounds; a truncated bound is
            // flagged as inexact.
            let statistics = match statistics {
                Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
                    let (min, did_truncate_min) = self.truncate_min_value(
                        self.props.statistics_truncate_length(),
                        stats.min_bytes_opt().unwrap(),
                    );
                    let (max, did_truncate_max) = self.truncate_max_value(
                        self.props.statistics_truncate_length(),
                        stats.max_bytes_opt().unwrap(),
                    );
                    Statistics::ByteArray(
                        ValueStatistics::new(
                            Some(min.into()),
                            Some(max.into()),
                            stats.distinct_count(),
                            stats.null_count_opt(),
                            backwards_compatible_min_max,
                        )
                        .with_max_is_exact(!did_truncate_max)
                        .with_min_is_exact(!did_truncate_min),
                    )
                }
                Statistics::FixedLenByteArray(stats)
                    if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
                {
                    let (min, did_truncate_min) = self.truncate_min_value(
                        self.props.statistics_truncate_length(),
                        stats.min_bytes_opt().unwrap(),
                    );
                    let (max, did_truncate_max) = self.truncate_max_value(
                        self.props.statistics_truncate_length(),
                        stats.max_bytes_opt().unwrap(),
                    );
                    Statistics::FixedLenByteArray(
                        ValueStatistics::new(
                            Some(min.into()),
                            Some(max.into()),
                            stats.distinct_count(),
                            stats.null_count_opt(),
                            backwards_compatible_min_max,
                        )
                        .with_max_is_exact(!did_truncate_max)
                        .with_min_is_exact(!did_truncate_min),
                    )
                }
                stats => stats,
            };

            builder = builder
                .set_statistics(statistics)
                .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
                .set_repetition_level_histogram(
                    self.column_metrics.repetition_level_histogram.take(),
                )
                .set_definition_level_histogram(
                    self.column_metrics.definition_level_histogram.take(),
                );
        }

        builder = self.set_column_chunk_encryption_properties(builder);

        let metadata = builder.build()?;
        Ok(metadata)
    }
1214
1215 #[inline]
1217 fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> {
1218 let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
1219 encoder.put(levels);
1220 encoder.consume()
1221 }
1222
1223 #[inline]
1226 fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
1227 let mut encoder = LevelEncoder::v2(max_level, levels.len());
1228 encoder.put(levels);
1229 encoder.consume()
1230 }
1231
1232 #[inline]
1234 fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
1235 self.encodings.insert(page.encoding());
1236 match self.encoding_stats.last_mut() {
1237 Some(encoding_stats)
1238 if encoding_stats.page_type == page.page_type()
1239 && encoding_stats.encoding == page.encoding() =>
1240 {
1241 encoding_stats.count += 1;
1242 }
1243 _ => {
1244 self.encoding_stats.push(PageEncodingStats {
1247 page_type: page.page_type(),
1248 encoding: page.encoding(),
1249 count: 1,
1250 });
1251 }
1252 }
1253 let page_spec = self.page_writer.write_page(page)?;
1254 if let Some(builder) = self.offset_index_builder.as_mut() {
1257 builder
1258 .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
1259 }
1260 self.update_metrics_for_page(page_spec);
1261 Ok(())
1262 }
1263
1264 #[inline]
1266 fn write_dictionary_page(&mut self) -> Result<()> {
1267 let compressed_page = {
1268 let mut page = self
1269 .encoder
1270 .flush_dict_page()?
1271 .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;
1272
1273 let uncompressed_size = page.buf.len();
1274
1275 if let Some(ref mut cmpr) = self.compressor {
1276 let mut output_buf = Vec::with_capacity(uncompressed_size);
1277 cmpr.compress(&page.buf, &mut output_buf)?;
1278 page.buf = Bytes::from(output_buf);
1279 }
1280
1281 let dict_page = Page::DictionaryPage {
1282 buf: page.buf,
1283 num_values: page.num_values as u32,
1284 encoding: self.props.dictionary_page_encoding(),
1285 is_sorted: page.is_sorted,
1286 };
1287 CompressedPage::new(dict_page, uncompressed_size)
1288 };
1289
1290 self.encodings.insert(compressed_page.encoding());
1291 self.encoding_stats.push(PageEncodingStats {
1292 page_type: PageType::DICTIONARY_PAGE,
1293 encoding: compressed_page.encoding(),
1294 count: 1,
1295 });
1296 let page_spec = self.page_writer.write_page(compressed_page)?;
1297 self.update_metrics_for_page(page_spec);
1298 Ok(())
1300 }
1301
1302 #[inline]
1304 fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1305 self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1306 self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1307 self.column_metrics.total_bytes_written += page_spec.bytes_written;
1308
1309 match page_spec.page_type {
1310 PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1311 self.column_metrics.total_num_values += page_spec.num_values as u64;
1312 if self.column_metrics.data_page_offset.is_none() {
1313 self.column_metrics.data_page_offset = Some(page_spec.offset);
1314 }
1315 }
1316 PageType::DICTIONARY_PAGE => {
1317 assert!(
1318 self.column_metrics.dictionary_page_offset.is_none(),
1319 "Dictionary offset is already set"
1320 );
1321 self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1322 }
1323 _ => {}
1324 }
1325 }
1326
1327 #[inline]
1328 #[cfg(feature = "encryption")]
1329 fn set_column_chunk_encryption_properties(
1330 &self,
1331 builder: ColumnChunkMetaDataBuilder,
1332 ) -> ColumnChunkMetaDataBuilder {
1333 if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() {
1334 builder.set_column_crypto_metadata(get_column_crypto_metadata(
1335 encryption_properties,
1336 &self.descr,
1337 ))
1338 } else {
1339 builder
1340 }
1341 }
1342
    /// Build without the `encryption` feature: there is no crypto metadata to attach,
    /// so `builder` is returned as-is. This keeps the call site identical across
    /// feature configurations.
    #[inline]
    #[cfg(not(feature = "encryption"))]
    fn set_column_chunk_encryption_properties(
        &self,
        builder: ColumnChunkMetaDataBuilder,
    ) -> ColumnChunkMetaDataBuilder {
        builder
    }
1351}
1352
1353fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
1354 update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
1355}
1356
1357fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
1358 update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
1359}
1360
1361#[inline]
1362#[allow(clippy::eq_op)]
1363fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
1364 match T::PHYSICAL_TYPE {
1365 Type::FLOAT | Type::DOUBLE => val != val,
1366 Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type() == Some(LogicalType::Float16) => {
1367 let val = val.as_bytes();
1368 let val = f16::from_le_bytes([val[0], val[1]]);
1369 val.is_nan()
1370 }
1371 _ => false,
1372 }
1373}
1374
1375fn update_stat<T: ParquetValueType, F>(
1380 descr: &ColumnDescriptor,
1381 val: &T,
1382 cur: &mut Option<T>,
1383 should_update: F,
1384) where
1385 F: Fn(&T) -> bool,
1386{
1387 if is_nan(descr, val) {
1388 return;
1389 }
1390
1391 if cur.as_ref().map_or(true, should_update) {
1392 *cur = Some(val.clone());
1393 }
1394}
1395
1396fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
1398 if let Some(LogicalType::Integer { is_signed, .. }) = descr.logical_type() {
1399 if !is_signed {
1400 return a.as_u64().unwrap() > b.as_u64().unwrap();
1402 }
1403 }
1404
1405 match descr.converted_type() {
1406 ConvertedType::UINT_8
1407 | ConvertedType::UINT_16
1408 | ConvertedType::UINT_32
1409 | ConvertedType::UINT_64 => {
1410 return a.as_u64().unwrap() > b.as_u64().unwrap();
1411 }
1412 _ => {}
1413 };
1414
1415 if let Some(LogicalType::Decimal { .. }) = descr.logical_type() {
1416 match T::PHYSICAL_TYPE {
1417 Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1418 return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1419 }
1420 _ => {}
1421 };
1422 }
1423
1424 if descr.converted_type() == ConvertedType::DECIMAL {
1425 match T::PHYSICAL_TYPE {
1426 Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1427 return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1428 }
1429 _ => {}
1430 };
1431 };
1432
1433 if let Some(LogicalType::Float16) = descr.logical_type() {
1434 let a = a.as_bytes();
1435 let a = f16::from_le_bytes([a[0], a[1]]);
1436 let b = b.as_bytes();
1437 let b = f16::from_le_bytes([b[0], b[1]]);
1438 return a > b;
1439 }
1440
1441 a > b
1442}
1443
1444fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1452 match (kind, props.writer_version()) {
1453 (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1454 (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1455 (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1456 (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1457 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1458 _ => Encoding::PLAIN,
1459 }
1460}
1461
1462fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1464 match (kind, props.writer_version()) {
1465 (Type::BOOLEAN, _) => false,
1467 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1469 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1470 _ => true,
1471 }
1472}
1473
/// Returns `true` if the big-endian two's-complement integer encoded in `a` is
/// strictly greater than the one encoded in `b`.
///
/// The slices may differ in length. The shorter value is treated as if it were
/// sign-extended to the width of the longer one, so `[0xFF, 0xFF]` and `[0xFF]`
/// (both -1) compare equal. An empty slice compares less than any non-empty slice,
/// and two empty slices compare equal.
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    let a_length = a.len();
    let b_length = b.len();

    if a_length == 0 || b_length == 0 {
        return a_length > 0;
    }

    let first_a: u8 = a[0];
    let first_b: u8 = b[0];

    // Short-circuit when the signs differ, or when the widths match and the leading
    // bytes already differ. Equal widths are required for the second case: with
    // sign extension, 0xFF10 equals 0x10.
    if (0x80 & first_a) != (0x80 & first_b) || (a_length == b_length && first_a != first_b) {
        return (first_a as i8) > (first_b as i8);
    }

    // From here on both values have the same sign.
    let negative = (first_a as i8) < 0;
    let extension: u8 = if negative { 0xFF } else { 0 };

    // Align the two values on the width of the shorter one. If the longer value's
    // excess leading bytes are not pure sign extension, its magnitude is larger and
    // the sign alone decides the result.
    let (a_tail, b_tail) = if a_length > b_length {
        let lead_length = a_length - b_length;
        if a[..lead_length].iter().any(|&x| x != extension) {
            return !negative;
        }
        (&a[lead_length..], b)
    } else if b_length > a_length {
        let lead_length = b_length - a_length;
        if b[..lead_length].iter().any(|&x| x != extension) {
            return negative;
        }
        (a, &b[lead_length..])
    } else {
        (a, b)
    };

    // Same sign and equal width: two's-complement order coincides with unsigned
    // lexicographic order of the bytes. The tails must be compared aligned. The
    // previous `a[1..] > b[1..]` compared misaligned slices when the widths differed
    // (e.g. it reported 6 > 0x0005 as false).
    a_tail > b_tail
}
1519
/// Returns the bytes of the longest non-empty prefix of `data` that is at most
/// `length` bytes long and ends on a UTF-8 character boundary.
///
/// Returns `None` when no such prefix exists: `length` is 0, `data` is empty, or
/// the first character is wider than `length`.
fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    (1..=length)
        .rev()
        .find(|&end| data.is_char_boundary(end))
        .map(|end| data.as_bytes()[..end].to_vec())
}
1529
1530fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1536 let lower_bound = length.saturating_sub(3);
1538 let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
1539 increment_utf8(data.get(..split)?)
1540}
1541
/// Returns the bytes of `data` with its last incrementable character bumped to the
/// next code point, truncating everything after that character.
///
/// Characters are tried from the end. A character is skipped if it has no valid
/// successor (e.g. `U+10FFFF`, or `U+D7FF` whose successor is a surrogate), or if
/// the successor needs more UTF-8 bytes than the original. Returns `None` if no
/// character can be incremented, including for an empty string.
fn increment_utf8(data: &str) -> Option<Vec<u8>> {
    data.char_indices().rev().find_map(|(start, ch)| {
        let width = ch.len_utf8();
        let successor = char::from_u32(ch as u32 + 1).filter(|next| next.len_utf8() == width)?;
        let mut bumped = data.as_bytes()[..start + width].to_vec();
        successor.encode_utf8(&mut bumped[start..]);
        Some(bumped)
    })
}
1563
/// Treats `data` as a big-endian unsigned integer and adds one to it in place.
///
/// The carry propagates from the last byte towards the first. Returns `None` if
/// the carry runs off the front, i.e. every byte was `0xFF` or `data` is empty.
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    let carried_out = data.iter_mut().rev().all(|byte| {
        let (wrapped, overflow) = byte.overflowing_add(1);
        *byte = wrapped;
        overflow
    });
    if carried_out {
        None
    } else {
        Some(data)
    }
}
1579
1580#[cfg(test)]
1581mod tests {
1582 use crate::{
1583 file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1584 schema::parser::parse_message_type,
1585 };
1586 use core::str;
1587 use rand::distr::uniform::SampleUniform;
1588 use std::{fs::File, sync::Arc};
1589
1590 use crate::column::{
1591 page::PageReader,
1592 reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl},
1593 };
1594 use crate::file::writer::TrackedWrite;
1595 use crate::file::{
1596 properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1597 };
1598 use crate::schema::types::{ColumnPath, Type as SchemaType};
1599 use crate::util::test_common::rand_gen::random_numbers_range;
1600
1601 use super::*;
1602
1603 #[test]
1604 fn test_column_writer_inconsistent_def_rep_length() {
1605 let page_writer = get_test_page_writer();
1606 let props = Default::default();
1607 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
1608 let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
1609 assert!(res.is_err());
1610 if let Err(err) = res {
1611 assert_eq!(
1612 format!("{err}"),
1613 "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
1614 );
1615 }
1616 }
1617
1618 #[test]
1619 fn test_column_writer_invalid_def_levels() {
1620 let page_writer = get_test_page_writer();
1621 let props = Default::default();
1622 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1623 let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1624 assert!(res.is_err());
1625 if let Err(err) = res {
1626 assert_eq!(
1627 format!("{err}"),
1628 "Parquet error: Definition levels are required, because max definition level = 1"
1629 );
1630 }
1631 }
1632
1633 #[test]
1634 fn test_column_writer_invalid_rep_levels() {
1635 let page_writer = get_test_page_writer();
1636 let props = Default::default();
1637 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
1638 let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1639 assert!(res.is_err());
1640 if let Err(err) = res {
1641 assert_eq!(
1642 format!("{err}"),
1643 "Parquet error: Repetition levels are required, because max repetition level = 1"
1644 );
1645 }
1646 }
1647
1648 #[test]
1649 fn test_column_writer_not_enough_values_to_write() {
1650 let page_writer = get_test_page_writer();
1651 let props = Default::default();
1652 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1653 let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
1654 assert!(res.is_err());
1655 if let Err(err) = res {
1656 assert_eq!(
1657 format!("{err}"),
1658 "Parquet error: Expected to write 4 values, but have only 2"
1659 );
1660 }
1661 }
1662
1663 #[test]
1664 fn test_column_writer_write_only_one_dictionary_page() {
1665 let page_writer = get_test_page_writer();
1666 let props = Default::default();
1667 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1668 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1669 writer.add_data_page().unwrap();
1671 writer.write_dictionary_page().unwrap();
1672 let err = writer.write_dictionary_page().unwrap_err().to_string();
1673 assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1674 }
1675
1676 #[test]
1677 fn test_column_writer_error_when_writing_disabled_dictionary() {
1678 let page_writer = get_test_page_writer();
1679 let props = Arc::new(
1680 WriterProperties::builder()
1681 .set_dictionary_enabled(false)
1682 .build(),
1683 );
1684 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1685 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1686 let err = writer.write_dictionary_page().unwrap_err().to_string();
1687 assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1688 }
1689
1690 #[test]
1691 fn test_column_writer_boolean_type_does_not_support_dictionary() {
1692 let page_writer = get_test_page_writer();
1693 let props = Arc::new(
1694 WriterProperties::builder()
1695 .set_dictionary_enabled(true)
1696 .build(),
1697 );
1698 let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
1699 writer
1700 .write_batch(&[true, false, true, false], None, None)
1701 .unwrap();
1702
1703 let r = writer.close().unwrap();
1704 assert_eq!(r.bytes_written, 1);
1707 assert_eq!(r.rows_written, 4);
1708
1709 let metadata = r.metadata;
1710 assert_eq!(metadata.encodings(), &vec![Encoding::PLAIN, Encoding::RLE]);
1711 assert_eq!(metadata.num_values(), 4); assert_eq!(metadata.dictionary_page_offset(), None);
1713 }
1714
1715 #[test]
1716 fn test_column_writer_default_encoding_support_bool() {
1717 check_encoding_write_support::<BoolType>(
1718 WriterVersion::PARQUET_1_0,
1719 true,
1720 &[true, false],
1721 None,
1722 &[Encoding::PLAIN, Encoding::RLE],
1723 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1724 );
1725 check_encoding_write_support::<BoolType>(
1726 WriterVersion::PARQUET_1_0,
1727 false,
1728 &[true, false],
1729 None,
1730 &[Encoding::PLAIN, Encoding::RLE],
1731 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1732 );
1733 check_encoding_write_support::<BoolType>(
1734 WriterVersion::PARQUET_2_0,
1735 true,
1736 &[true, false],
1737 None,
1738 &[Encoding::RLE],
1739 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
1740 );
1741 check_encoding_write_support::<BoolType>(
1742 WriterVersion::PARQUET_2_0,
1743 false,
1744 &[true, false],
1745 None,
1746 &[Encoding::RLE],
1747 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
1748 );
1749 }
1750
1751 #[test]
1752 fn test_column_writer_default_encoding_support_int32() {
1753 check_encoding_write_support::<Int32Type>(
1754 WriterVersion::PARQUET_1_0,
1755 true,
1756 &[1, 2],
1757 Some(0),
1758 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1759 &[
1760 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1761 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1762 ],
1763 );
1764 check_encoding_write_support::<Int32Type>(
1765 WriterVersion::PARQUET_1_0,
1766 false,
1767 &[1, 2],
1768 None,
1769 &[Encoding::PLAIN, Encoding::RLE],
1770 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1771 );
1772 check_encoding_write_support::<Int32Type>(
1773 WriterVersion::PARQUET_2_0,
1774 true,
1775 &[1, 2],
1776 Some(0),
1777 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1778 &[
1779 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1780 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1781 ],
1782 );
1783 check_encoding_write_support::<Int32Type>(
1784 WriterVersion::PARQUET_2_0,
1785 false,
1786 &[1, 2],
1787 None,
1788 &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1789 &[encoding_stats(
1790 PageType::DATA_PAGE_V2,
1791 Encoding::DELTA_BINARY_PACKED,
1792 1,
1793 )],
1794 );
1795 }
1796
1797 #[test]
1798 fn test_column_writer_default_encoding_support_int64() {
1799 check_encoding_write_support::<Int64Type>(
1800 WriterVersion::PARQUET_1_0,
1801 true,
1802 &[1, 2],
1803 Some(0),
1804 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1805 &[
1806 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1807 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1808 ],
1809 );
1810 check_encoding_write_support::<Int64Type>(
1811 WriterVersion::PARQUET_1_0,
1812 false,
1813 &[1, 2],
1814 None,
1815 &[Encoding::PLAIN, Encoding::RLE],
1816 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1817 );
1818 check_encoding_write_support::<Int64Type>(
1819 WriterVersion::PARQUET_2_0,
1820 true,
1821 &[1, 2],
1822 Some(0),
1823 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1824 &[
1825 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1826 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1827 ],
1828 );
1829 check_encoding_write_support::<Int64Type>(
1830 WriterVersion::PARQUET_2_0,
1831 false,
1832 &[1, 2],
1833 None,
1834 &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1835 &[encoding_stats(
1836 PageType::DATA_PAGE_V2,
1837 Encoding::DELTA_BINARY_PACKED,
1838 1,
1839 )],
1840 );
1841 }
1842
1843 #[test]
1844 fn test_column_writer_default_encoding_support_int96() {
1845 check_encoding_write_support::<Int96Type>(
1846 WriterVersion::PARQUET_1_0,
1847 true,
1848 &[Int96::from(vec![1, 2, 3])],
1849 Some(0),
1850 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1851 &[
1852 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1853 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1854 ],
1855 );
1856 check_encoding_write_support::<Int96Type>(
1857 WriterVersion::PARQUET_1_0,
1858 false,
1859 &[Int96::from(vec![1, 2, 3])],
1860 None,
1861 &[Encoding::PLAIN, Encoding::RLE],
1862 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1863 );
1864 check_encoding_write_support::<Int96Type>(
1865 WriterVersion::PARQUET_2_0,
1866 true,
1867 &[Int96::from(vec![1, 2, 3])],
1868 Some(0),
1869 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1870 &[
1871 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1872 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1873 ],
1874 );
1875 check_encoding_write_support::<Int96Type>(
1876 WriterVersion::PARQUET_2_0,
1877 false,
1878 &[Int96::from(vec![1, 2, 3])],
1879 None,
1880 &[Encoding::PLAIN, Encoding::RLE],
1881 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1882 );
1883 }
1884
1885 #[test]
1886 fn test_column_writer_default_encoding_support_float() {
1887 check_encoding_write_support::<FloatType>(
1888 WriterVersion::PARQUET_1_0,
1889 true,
1890 &[1.0, 2.0],
1891 Some(0),
1892 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1893 &[
1894 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1895 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1896 ],
1897 );
1898 check_encoding_write_support::<FloatType>(
1899 WriterVersion::PARQUET_1_0,
1900 false,
1901 &[1.0, 2.0],
1902 None,
1903 &[Encoding::PLAIN, Encoding::RLE],
1904 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1905 );
1906 check_encoding_write_support::<FloatType>(
1907 WriterVersion::PARQUET_2_0,
1908 true,
1909 &[1.0, 2.0],
1910 Some(0),
1911 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1912 &[
1913 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1914 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1915 ],
1916 );
1917 check_encoding_write_support::<FloatType>(
1918 WriterVersion::PARQUET_2_0,
1919 false,
1920 &[1.0, 2.0],
1921 None,
1922 &[Encoding::PLAIN, Encoding::RLE],
1923 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1924 );
1925 }
1926
1927 #[test]
1928 fn test_column_writer_default_encoding_support_double() {
1929 check_encoding_write_support::<DoubleType>(
1930 WriterVersion::PARQUET_1_0,
1931 true,
1932 &[1.0, 2.0],
1933 Some(0),
1934 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1935 &[
1936 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1937 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1938 ],
1939 );
1940 check_encoding_write_support::<DoubleType>(
1941 WriterVersion::PARQUET_1_0,
1942 false,
1943 &[1.0, 2.0],
1944 None,
1945 &[Encoding::PLAIN, Encoding::RLE],
1946 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1947 );
1948 check_encoding_write_support::<DoubleType>(
1949 WriterVersion::PARQUET_2_0,
1950 true,
1951 &[1.0, 2.0],
1952 Some(0),
1953 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1954 &[
1955 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1956 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1957 ],
1958 );
1959 check_encoding_write_support::<DoubleType>(
1960 WriterVersion::PARQUET_2_0,
1961 false,
1962 &[1.0, 2.0],
1963 None,
1964 &[Encoding::PLAIN, Encoding::RLE],
1965 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1966 );
1967 }
1968
1969 #[test]
1970 fn test_column_writer_default_encoding_support_byte_array() {
1971 check_encoding_write_support::<ByteArrayType>(
1972 WriterVersion::PARQUET_1_0,
1973 true,
1974 &[ByteArray::from(vec![1u8])],
1975 Some(0),
1976 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1977 &[
1978 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1979 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1980 ],
1981 );
1982 check_encoding_write_support::<ByteArrayType>(
1983 WriterVersion::PARQUET_1_0,
1984 false,
1985 &[ByteArray::from(vec![1u8])],
1986 None,
1987 &[Encoding::PLAIN, Encoding::RLE],
1988 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1989 );
1990 check_encoding_write_support::<ByteArrayType>(
1991 WriterVersion::PARQUET_2_0,
1992 true,
1993 &[ByteArray::from(vec![1u8])],
1994 Some(0),
1995 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1996 &[
1997 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1998 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1999 ],
2000 );
2001 check_encoding_write_support::<ByteArrayType>(
2002 WriterVersion::PARQUET_2_0,
2003 false,
2004 &[ByteArray::from(vec![1u8])],
2005 None,
2006 &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
2007 &[encoding_stats(
2008 PageType::DATA_PAGE_V2,
2009 Encoding::DELTA_BYTE_ARRAY,
2010 1,
2011 )],
2012 );
2013 }
2014
2015 #[test]
2016 fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
2017 check_encoding_write_support::<FixedLenByteArrayType>(
2018 WriterVersion::PARQUET_1_0,
2019 true,
2020 &[ByteArray::from(vec![1u8]).into()],
2021 None,
2022 &[Encoding::PLAIN, Encoding::RLE],
2023 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2024 );
2025 check_encoding_write_support::<FixedLenByteArrayType>(
2026 WriterVersion::PARQUET_1_0,
2027 false,
2028 &[ByteArray::from(vec![1u8]).into()],
2029 None,
2030 &[Encoding::PLAIN, Encoding::RLE],
2031 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2032 );
2033 check_encoding_write_support::<FixedLenByteArrayType>(
2034 WriterVersion::PARQUET_2_0,
2035 true,
2036 &[ByteArray::from(vec![1u8]).into()],
2037 Some(0),
2038 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2039 &[
2040 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2041 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2042 ],
2043 );
2044 check_encoding_write_support::<FixedLenByteArrayType>(
2045 WriterVersion::PARQUET_2_0,
2046 false,
2047 &[ByteArray::from(vec![1u8]).into()],
2048 None,
2049 &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
2050 &[encoding_stats(
2051 PageType::DATA_PAGE_V2,
2052 Encoding::DELTA_BYTE_ARRAY,
2053 1,
2054 )],
2055 );
2056 }
2057
2058 #[test]
2059 fn test_column_writer_check_metadata() {
2060 let page_writer = get_test_page_writer();
2061 let props = Default::default();
2062 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2063 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2064
2065 let r = writer.close().unwrap();
2066 assert_eq!(r.bytes_written, 20);
2067 assert_eq!(r.rows_written, 4);
2068
2069 let metadata = r.metadata;
2070 assert_eq!(
2071 metadata.encodings(),
2072 &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2073 );
2074 assert_eq!(metadata.num_values(), 4);
2075 assert_eq!(metadata.compressed_size(), 20);
2076 assert_eq!(metadata.uncompressed_size(), 20);
2077 assert_eq!(metadata.data_page_offset(), 0);
2078 assert_eq!(metadata.dictionary_page_offset(), Some(0));
2079 if let Some(stats) = metadata.statistics() {
2080 assert_eq!(stats.null_count_opt(), Some(0));
2081 assert_eq!(stats.distinct_count_opt(), None);
2082 if let Statistics::Int32(stats) = stats {
2083 assert_eq!(stats.min_opt().unwrap(), &1);
2084 assert_eq!(stats.max_opt().unwrap(), &4);
2085 } else {
2086 panic!("expecting Statistics::Int32");
2087 }
2088 } else {
2089 panic!("metadata missing statistics");
2090 }
2091 }
2092
2093 #[test]
2094 fn test_column_writer_check_byte_array_min_max() {
2095 let page_writer = get_test_page_writer();
2096 let props = Default::default();
2097 let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
2098 writer
2099 .write_batch(
2100 &[
2101 ByteArray::from(vec![
2102 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2103 35u8, 231u8, 90u8, 0u8, 0u8,
2104 ]),
2105 ByteArray::from(vec![
2106 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
2107 152u8, 177u8, 56u8, 0u8, 0u8,
2108 ]),
2109 ByteArray::from(vec![
2110 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
2111 0u8,
2112 ]),
2113 ByteArray::from(vec![
2114 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2115 44u8, 0u8, 0u8,
2116 ]),
2117 ],
2118 None,
2119 None,
2120 )
2121 .unwrap();
2122 let metadata = writer.close().unwrap().metadata;
2123 if let Some(stats) = metadata.statistics() {
2124 if let Statistics::ByteArray(stats) = stats {
2125 assert_eq!(
2126 stats.min_opt().unwrap(),
2127 &ByteArray::from(vec![
2128 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2129 35u8, 231u8, 90u8, 0u8, 0u8,
2130 ])
2131 );
2132 assert_eq!(
2133 stats.max_opt().unwrap(),
2134 &ByteArray::from(vec![
2135 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2136 44u8, 0u8, 0u8,
2137 ])
2138 );
2139 } else {
2140 panic!("expecting Statistics::ByteArray");
2141 }
2142 } else {
2143 panic!("metadata missing statistics");
2144 }
2145 }
2146
2147 #[test]
2148 fn test_column_writer_uint32_converted_type_min_max() {
2149 let page_writer = get_test_page_writer();
2150 let props = Default::default();
2151 let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
2152 page_writer,
2153 0,
2154 0,
2155 props,
2156 );
2157 writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
2158 let metadata = writer.close().unwrap().metadata;
2159 if let Some(stats) = metadata.statistics() {
2160 if let Statistics::Int32(stats) = stats {
2161 assert_eq!(stats.min_opt().unwrap(), &0,);
2162 assert_eq!(stats.max_opt().unwrap(), &5,);
2163 } else {
2164 panic!("expecting Statistics::Int32");
2165 }
2166 } else {
2167 panic!("metadata missing statistics");
2168 }
2169 }
2170
2171 #[test]
2172 fn test_column_writer_precalculated_statistics() {
2173 let page_writer = get_test_page_writer();
2174 let props = Arc::new(
2175 WriterProperties::builder()
2176 .set_statistics_enabled(EnabledStatistics::Chunk)
2177 .build(),
2178 );
2179 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2180 writer
2181 .write_batch_with_statistics(
2182 &[1, 2, 3, 4],
2183 None,
2184 None,
2185 Some(&-17),
2186 Some(&9000),
2187 Some(55),
2188 )
2189 .unwrap();
2190
2191 let r = writer.close().unwrap();
2192 assert_eq!(r.bytes_written, 20);
2193 assert_eq!(r.rows_written, 4);
2194
2195 let metadata = r.metadata;
2196 assert_eq!(
2197 metadata.encodings(),
2198 &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2199 );
2200 assert_eq!(metadata.num_values(), 4);
2201 assert_eq!(metadata.compressed_size(), 20);
2202 assert_eq!(metadata.uncompressed_size(), 20);
2203 assert_eq!(metadata.data_page_offset(), 0);
2204 assert_eq!(metadata.dictionary_page_offset(), Some(0));
2205 if let Some(stats) = metadata.statistics() {
2206 assert_eq!(stats.null_count_opt(), Some(0));
2207 assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
2208 if let Statistics::Int32(stats) = stats {
2209 assert_eq!(stats.min_opt().unwrap(), &-17);
2210 assert_eq!(stats.max_opt().unwrap(), &9000);
2211 } else {
2212 panic!("expecting Statistics::Int32");
2213 }
2214 } else {
2215 panic!("metadata missing statistics");
2216 }
2217 }
2218
2219 #[test]
2220 fn test_mixed_precomputed_statistics() {
2221 let mut buf = Vec::with_capacity(100);
2222 let mut write = TrackedWrite::new(&mut buf);
2223 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2224 let props = Default::default();
2225 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2226
2227 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2228 writer
2229 .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
2230 .unwrap();
2231
2232 let r = writer.close().unwrap();
2233
2234 let stats = r.metadata.statistics().unwrap();
2235 assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
2236 assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
2237 assert_eq!(stats.null_count_opt(), Some(0));
2238 assert!(stats.distinct_count_opt().is_none());
2239
2240 drop(write);
2241
2242 let props = ReaderProperties::builder()
2243 .set_backward_compatible_lz4(false)
2244 .build();
2245 let reader = SerializedPageReader::new_with_properties(
2246 Arc::new(Bytes::from(buf)),
2247 &r.metadata,
2248 r.rows_written as usize,
2249 None,
2250 Arc::new(props),
2251 )
2252 .unwrap();
2253
2254 let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2255 assert_eq!(pages.len(), 2);
2256
2257 assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2258 assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);
2259
2260 let page_statistics = pages[1].statistics().unwrap();
2261 assert_eq!(
2262 page_statistics.min_bytes_opt().unwrap(),
2263 1_i32.to_le_bytes()
2264 );
2265 assert_eq!(
2266 page_statistics.max_bytes_opt().unwrap(),
2267 7_i32.to_le_bytes()
2268 );
2269 assert_eq!(page_statistics.null_count_opt(), Some(0));
2270 assert!(page_statistics.distinct_count_opt().is_none());
2271 }
2272
2273 #[test]
2274 fn test_disabled_statistics() {
2275 let mut buf = Vec::with_capacity(100);
2276 let mut write = TrackedWrite::new(&mut buf);
2277 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2278 let props = WriterProperties::builder()
2279 .set_statistics_enabled(EnabledStatistics::None)
2280 .set_writer_version(WriterVersion::PARQUET_2_0)
2281 .build();
2282 let props = Arc::new(props);
2283
2284 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2285 writer
2286 .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
2287 .unwrap();
2288
2289 let r = writer.close().unwrap();
2290 assert!(r.metadata.statistics().is_none());
2291
2292 drop(write);
2293
2294 let props = ReaderProperties::builder()
2295 .set_backward_compatible_lz4(false)
2296 .build();
2297 let reader = SerializedPageReader::new_with_properties(
2298 Arc::new(Bytes::from(buf)),
2299 &r.metadata,
2300 r.rows_written as usize,
2301 None,
2302 Arc::new(props),
2303 )
2304 .unwrap();
2305
2306 let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2307 assert_eq!(pages.len(), 2);
2308
2309 assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2310 assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);
2311
2312 match &pages[1] {
2313 Page::DataPageV2 {
2314 num_values,
2315 num_nulls,
2316 num_rows,
2317 statistics,
2318 ..
2319 } => {
2320 assert_eq!(*num_values, 6);
2321 assert_eq!(*num_nulls, 2);
2322 assert_eq!(*num_rows, 6);
2323 assert!(statistics.is_none());
2324 }
2325 _ => unreachable!(),
2326 }
2327 }
2328
2329 #[test]
2330 fn test_column_writer_empty_column_roundtrip() {
2331 let props = Default::default();
2332 column_roundtrip::<Int32Type>(props, &[], None, None);
2333 }
2334
2335 #[test]
2336 fn test_column_writer_non_nullable_values_roundtrip() {
2337 let props = Default::default();
2338 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2339 }
2340
2341 #[test]
2342 fn test_column_writer_nullable_non_repeated_values_roundtrip() {
2343 let props = Default::default();
2344 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
2345 }
2346
2347 #[test]
2348 fn test_column_writer_nullable_repeated_values_roundtrip() {
2349 let props = Default::default();
2350 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2351 }
2352
2353 #[test]
2354 fn test_column_writer_dictionary_fallback_small_data_page() {
2355 let props = WriterProperties::builder()
2356 .set_dictionary_page_size_limit(32)
2357 .set_data_page_size_limit(32)
2358 .build();
2359 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2360 }
2361
2362 #[test]
2363 fn test_column_writer_small_write_batch_size() {
2364 for i in &[1usize, 2, 5, 10, 11, 1023] {
2365 let props = WriterProperties::builder().set_write_batch_size(*i).build();
2366
2367 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2368 }
2369 }
2370
2371 #[test]
2372 fn test_column_writer_dictionary_disabled_v1() {
2373 let props = WriterProperties::builder()
2374 .set_writer_version(WriterVersion::PARQUET_1_0)
2375 .set_dictionary_enabled(false)
2376 .build();
2377 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2378 }
2379
2380 #[test]
2381 fn test_column_writer_dictionary_disabled_v2() {
2382 let props = WriterProperties::builder()
2383 .set_writer_version(WriterVersion::PARQUET_2_0)
2384 .set_dictionary_enabled(false)
2385 .build();
2386 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2387 }
2388
2389 #[test]
2390 fn test_column_writer_compression_v1() {
2391 let props = WriterProperties::builder()
2392 .set_writer_version(WriterVersion::PARQUET_1_0)
2393 .set_compression(Compression::SNAPPY)
2394 .build();
2395 column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2396 }
2397
2398 #[test]
2399 fn test_column_writer_compression_v2() {
2400 let props = WriterProperties::builder()
2401 .set_writer_version(WriterVersion::PARQUET_2_0)
2402 .set_compression(Compression::SNAPPY)
2403 .build();
2404 column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2405 }
2406
2407 #[test]
2408 fn test_column_writer_add_data_pages_with_dict() {
2409 let mut file = tempfile::tempfile().unwrap();
2412 let mut write = TrackedWrite::new(&mut file);
2413 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2414 let props = Arc::new(
2415 WriterProperties::builder()
2416 .set_data_page_size_limit(10)
2417 .set_write_batch_size(3) .build(),
2419 );
2420 let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
2421 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2422 writer.write_batch(data, None, None).unwrap();
2423 let r = writer.close().unwrap();
2424
2425 drop(write);
2426
2427 let props = ReaderProperties::builder()
2429 .set_backward_compatible_lz4(false)
2430 .build();
2431 let mut page_reader = Box::new(
2432 SerializedPageReader::new_with_properties(
2433 Arc::new(file),
2434 &r.metadata,
2435 r.rows_written as usize,
2436 None,
2437 Arc::new(props),
2438 )
2439 .unwrap(),
2440 );
2441 let mut res = Vec::new();
2442 while let Some(page) = page_reader.get_next_page().unwrap() {
2443 res.push((page.page_type(), page.num_values(), page.buffer().len()));
2444 }
2445 assert_eq!(
2446 res,
2447 vec![
2448 (PageType::DICTIONARY_PAGE, 10, 40),
2449 (PageType::DATA_PAGE, 9, 10),
2450 (PageType::DATA_PAGE, 1, 3),
2451 ]
2452 );
2453 assert_eq!(
2454 r.metadata.page_encoding_stats(),
2455 Some(&vec![
2456 PageEncodingStats {
2457 page_type: PageType::DICTIONARY_PAGE,
2458 encoding: Encoding::PLAIN,
2459 count: 1
2460 },
2461 PageEncodingStats {
2462 page_type: PageType::DATA_PAGE,
2463 encoding: Encoding::RLE_DICTIONARY,
2464 count: 2,
2465 }
2466 ])
2467 );
2468 }
2469
2470 #[test]
2471 fn test_bool_statistics() {
2472 let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
2473 assert!(!stats.is_min_max_backwards_compatible());
2476 if let Statistics::Boolean(stats) = stats {
2477 assert_eq!(stats.min_opt().unwrap(), &false);
2478 assert_eq!(stats.max_opt().unwrap(), &true);
2479 } else {
2480 panic!("expecting Statistics::Boolean, got {stats:?}");
2481 }
2482 }
2483
2484 #[test]
2485 fn test_int32_statistics() {
2486 let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
2487 assert!(stats.is_min_max_backwards_compatible());
2488 if let Statistics::Int32(stats) = stats {
2489 assert_eq!(stats.min_opt().unwrap(), &-2);
2490 assert_eq!(stats.max_opt().unwrap(), &3);
2491 } else {
2492 panic!("expecting Statistics::Int32, got {stats:?}");
2493 }
2494 }
2495
2496 #[test]
2497 fn test_int64_statistics() {
2498 let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
2499 assert!(stats.is_min_max_backwards_compatible());
2500 if let Statistics::Int64(stats) = stats {
2501 assert_eq!(stats.min_opt().unwrap(), &-2);
2502 assert_eq!(stats.max_opt().unwrap(), &3);
2503 } else {
2504 panic!("expecting Statistics::Int64, got {stats:?}");
2505 }
2506 }
2507
2508 #[test]
2509 fn test_int96_statistics() {
2510 let input = vec![
2511 Int96::from(vec![1, 20, 30]),
2512 Int96::from(vec![3, 20, 10]),
2513 Int96::from(vec![0, 20, 30]),
2514 Int96::from(vec![2, 20, 30]),
2515 ]
2516 .into_iter()
2517 .collect::<Vec<Int96>>();
2518
2519 let stats = statistics_roundtrip::<Int96Type>(&input);
2520 assert!(!stats.is_min_max_backwards_compatible());
2521 if let Statistics::Int96(stats) = stats {
2522 assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![0, 20, 30]));
2523 assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
2524 } else {
2525 panic!("expecting Statistics::Int96, got {stats:?}");
2526 }
2527 }
2528
2529 #[test]
2530 fn test_float_statistics() {
2531 let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
2532 assert!(stats.is_min_max_backwards_compatible());
2533 if let Statistics::Float(stats) = stats {
2534 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2535 assert_eq!(stats.max_opt().unwrap(), &3.0);
2536 } else {
2537 panic!("expecting Statistics::Float, got {stats:?}");
2538 }
2539 }
2540
2541 #[test]
2542 fn test_double_statistics() {
2543 let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
2544 assert!(stats.is_min_max_backwards_compatible());
2545 if let Statistics::Double(stats) = stats {
2546 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2547 assert_eq!(stats.max_opt().unwrap(), &3.0);
2548 } else {
2549 panic!("expecting Statistics::Double, got {stats:?}");
2550 }
2551 }
2552
2553 #[test]
2554 fn test_byte_array_statistics() {
2555 let input = ["aawaa", "zz", "aaw", "m", "qrs"]
2556 .iter()
2557 .map(|&s| s.into())
2558 .collect::<Vec<_>>();
2559
2560 let stats = statistics_roundtrip::<ByteArrayType>(&input);
2561 assert!(!stats.is_min_max_backwards_compatible());
2562 if let Statistics::ByteArray(stats) = stats {
2563 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from("aaw"));
2564 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from("zz"));
2565 } else {
2566 panic!("expecting Statistics::ByteArray, got {stats:?}");
2567 }
2568 }
2569
2570 #[test]
2571 fn test_fixed_len_byte_array_statistics() {
2572 let input = ["aawaa", "zz ", "aaw ", "m ", "qrs "]
2573 .iter()
2574 .map(|&s| ByteArray::from(s).into())
2575 .collect::<Vec<_>>();
2576
2577 let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
2578 assert!(!stats.is_min_max_backwards_compatible());
2579 if let Statistics::FixedLenByteArray(stats) = stats {
2580 let expected_min: FixedLenByteArray = ByteArray::from("aaw ").into();
2581 assert_eq!(stats.min_opt().unwrap(), &expected_min);
2582 let expected_max: FixedLenByteArray = ByteArray::from("zz ").into();
2583 assert_eq!(stats.max_opt().unwrap(), &expected_max);
2584 } else {
2585 panic!("expecting Statistics::FixedLenByteArray, got {stats:?}");
2586 }
2587 }
2588
2589 #[test]
2590 fn test_column_writer_check_float16_min_max() {
2591 let input = [
2592 -f16::ONE,
2593 f16::from_f32(3.0),
2594 -f16::from_f32(2.0),
2595 f16::from_f32(2.0),
2596 ]
2597 .into_iter()
2598 .map(|s| ByteArray::from(s).into())
2599 .collect::<Vec<_>>();
2600
2601 let stats = float16_statistics_roundtrip(&input);
2602 assert!(stats.is_min_max_backwards_compatible());
2603 assert_eq!(
2604 stats.min_opt().unwrap(),
2605 &ByteArray::from(-f16::from_f32(2.0))
2606 );
2607 assert_eq!(
2608 stats.max_opt().unwrap(),
2609 &ByteArray::from(f16::from_f32(3.0))
2610 );
2611 }
2612
2613 #[test]
2614 fn test_column_writer_check_float16_nan_middle() {
2615 let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2616 .into_iter()
2617 .map(|s| ByteArray::from(s).into())
2618 .collect::<Vec<_>>();
2619
2620 let stats = float16_statistics_roundtrip(&input);
2621 assert!(stats.is_min_max_backwards_compatible());
2622 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2623 assert_eq!(
2624 stats.max_opt().unwrap(),
2625 &ByteArray::from(f16::ONE + f16::ONE)
2626 );
2627 }
2628
2629 #[test]
2630 fn test_float16_statistics_nan_middle() {
2631 let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2632 .into_iter()
2633 .map(|s| ByteArray::from(s).into())
2634 .collect::<Vec<_>>();
2635
2636 let stats = float16_statistics_roundtrip(&input);
2637 assert!(stats.is_min_max_backwards_compatible());
2638 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2639 assert_eq!(
2640 stats.max_opt().unwrap(),
2641 &ByteArray::from(f16::ONE + f16::ONE)
2642 );
2643 }
2644
2645 #[test]
2646 fn test_float16_statistics_nan_start() {
2647 let input = [f16::NAN, f16::ONE, f16::ONE + f16::ONE]
2648 .into_iter()
2649 .map(|s| ByteArray::from(s).into())
2650 .collect::<Vec<_>>();
2651
2652 let stats = float16_statistics_roundtrip(&input);
2653 assert!(stats.is_min_max_backwards_compatible());
2654 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2655 assert_eq!(
2656 stats.max_opt().unwrap(),
2657 &ByteArray::from(f16::ONE + f16::ONE)
2658 );
2659 }
2660
2661 #[test]
2662 fn test_float16_statistics_nan_only() {
2663 let input = [f16::NAN, f16::NAN]
2664 .into_iter()
2665 .map(|s| ByteArray::from(s).into())
2666 .collect::<Vec<_>>();
2667
2668 let stats = float16_statistics_roundtrip(&input);
2669 assert!(stats.min_bytes_opt().is_none());
2670 assert!(stats.max_bytes_opt().is_none());
2671 assert!(stats.is_min_max_backwards_compatible());
2672 }
2673
2674 #[test]
2675 fn test_float16_statistics_zero_only() {
2676 let input = [f16::ZERO]
2677 .into_iter()
2678 .map(|s| ByteArray::from(s).into())
2679 .collect::<Vec<_>>();
2680
2681 let stats = float16_statistics_roundtrip(&input);
2682 assert!(stats.is_min_max_backwards_compatible());
2683 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2684 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2685 }
2686
2687 #[test]
2688 fn test_float16_statistics_neg_zero_only() {
2689 let input = [f16::NEG_ZERO]
2690 .into_iter()
2691 .map(|s| ByteArray::from(s).into())
2692 .collect::<Vec<_>>();
2693
2694 let stats = float16_statistics_roundtrip(&input);
2695 assert!(stats.is_min_max_backwards_compatible());
2696 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2697 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2698 }
2699
2700 #[test]
2701 fn test_float16_statistics_zero_min() {
2702 let input = [f16::ZERO, f16::ONE, f16::NAN, f16::PI]
2703 .into_iter()
2704 .map(|s| ByteArray::from(s).into())
2705 .collect::<Vec<_>>();
2706
2707 let stats = float16_statistics_roundtrip(&input);
2708 assert!(stats.is_min_max_backwards_compatible());
2709 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2710 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI));
2711 }
2712
2713 #[test]
2714 fn test_float16_statistics_neg_zero_max() {
2715 let input = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI]
2716 .into_iter()
2717 .map(|s| ByteArray::from(s).into())
2718 .collect::<Vec<_>>();
2719
2720 let stats = float16_statistics_roundtrip(&input);
2721 assert!(stats.is_min_max_backwards_compatible());
2722 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI));
2723 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2724 }
2725
2726 #[test]
2727 fn test_float_statistics_nan_middle() {
2728 let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
2729 assert!(stats.is_min_max_backwards_compatible());
2730 if let Statistics::Float(stats) = stats {
2731 assert_eq!(stats.min_opt().unwrap(), &1.0);
2732 assert_eq!(stats.max_opt().unwrap(), &2.0);
2733 } else {
2734 panic!("expecting Statistics::Float");
2735 }
2736 }
2737
2738 #[test]
2739 fn test_float_statistics_nan_start() {
2740 let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
2741 assert!(stats.is_min_max_backwards_compatible());
2742 if let Statistics::Float(stats) = stats {
2743 assert_eq!(stats.min_opt().unwrap(), &1.0);
2744 assert_eq!(stats.max_opt().unwrap(), &2.0);
2745 } else {
2746 panic!("expecting Statistics::Float");
2747 }
2748 }
2749
2750 #[test]
2751 fn test_float_statistics_nan_only() {
2752 let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]);
2753 assert!(stats.min_bytes_opt().is_none());
2754 assert!(stats.max_bytes_opt().is_none());
2755 assert!(stats.is_min_max_backwards_compatible());
2756 assert!(matches!(stats, Statistics::Float(_)));
2757 }
2758
2759 #[test]
2760 fn test_float_statistics_zero_only() {
2761 let stats = statistics_roundtrip::<FloatType>(&[0.0]);
2762 assert!(stats.is_min_max_backwards_compatible());
2763 if let Statistics::Float(stats) = stats {
2764 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2765 assert!(stats.min_opt().unwrap().is_sign_negative());
2766 assert_eq!(stats.max_opt().unwrap(), &0.0);
2767 assert!(stats.max_opt().unwrap().is_sign_positive());
2768 } else {
2769 panic!("expecting Statistics::Float");
2770 }
2771 }
2772
2773 #[test]
2774 fn test_float_statistics_neg_zero_only() {
2775 let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
2776 assert!(stats.is_min_max_backwards_compatible());
2777 if let Statistics::Float(stats) = stats {
2778 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2779 assert!(stats.min_opt().unwrap().is_sign_negative());
2780 assert_eq!(stats.max_opt().unwrap(), &0.0);
2781 assert!(stats.max_opt().unwrap().is_sign_positive());
2782 } else {
2783 panic!("expecting Statistics::Float");
2784 }
2785 }
2786
2787 #[test]
2788 fn test_float_statistics_zero_min() {
2789 let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
2790 assert!(stats.is_min_max_backwards_compatible());
2791 if let Statistics::Float(stats) = stats {
2792 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2793 assert!(stats.min_opt().unwrap().is_sign_negative());
2794 assert_eq!(stats.max_opt().unwrap(), &2.0);
2795 } else {
2796 panic!("expecting Statistics::Float");
2797 }
2798 }
2799
2800 #[test]
2801 fn test_float_statistics_neg_zero_max() {
2802 let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
2803 assert!(stats.is_min_max_backwards_compatible());
2804 if let Statistics::Float(stats) = stats {
2805 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2806 assert_eq!(stats.max_opt().unwrap(), &0.0);
2807 assert!(stats.max_opt().unwrap().is_sign_positive());
2808 } else {
2809 panic!("expecting Statistics::Float");
2810 }
2811 }
2812
2813 #[test]
2814 fn test_double_statistics_nan_middle() {
2815 let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
2816 assert!(stats.is_min_max_backwards_compatible());
2817 if let Statistics::Double(stats) = stats {
2818 assert_eq!(stats.min_opt().unwrap(), &1.0);
2819 assert_eq!(stats.max_opt().unwrap(), &2.0);
2820 } else {
2821 panic!("expecting Statistics::Double");
2822 }
2823 }
2824
2825 #[test]
2826 fn test_double_statistics_nan_start() {
2827 let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
2828 assert!(stats.is_min_max_backwards_compatible());
2829 if let Statistics::Double(stats) = stats {
2830 assert_eq!(stats.min_opt().unwrap(), &1.0);
2831 assert_eq!(stats.max_opt().unwrap(), &2.0);
2832 } else {
2833 panic!("expecting Statistics::Double");
2834 }
2835 }
2836
2837 #[test]
2838 fn test_double_statistics_nan_only() {
2839 let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]);
2840 assert!(stats.min_bytes_opt().is_none());
2841 assert!(stats.max_bytes_opt().is_none());
2842 assert!(matches!(stats, Statistics::Double(_)));
2843 assert!(stats.is_min_max_backwards_compatible());
2844 }
2845
2846 #[test]
2847 fn test_double_statistics_zero_only() {
2848 let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
2849 assert!(stats.is_min_max_backwards_compatible());
2850 if let Statistics::Double(stats) = stats {
2851 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2852 assert!(stats.min_opt().unwrap().is_sign_negative());
2853 assert_eq!(stats.max_opt().unwrap(), &0.0);
2854 assert!(stats.max_opt().unwrap().is_sign_positive());
2855 } else {
2856 panic!("expecting Statistics::Double");
2857 }
2858 }
2859
2860 #[test]
2861 fn test_double_statistics_neg_zero_only() {
2862 let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
2863 assert!(stats.is_min_max_backwards_compatible());
2864 if let Statistics::Double(stats) = stats {
2865 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2866 assert!(stats.min_opt().unwrap().is_sign_negative());
2867 assert_eq!(stats.max_opt().unwrap(), &0.0);
2868 assert!(stats.max_opt().unwrap().is_sign_positive());
2869 } else {
2870 panic!("expecting Statistics::Double");
2871 }
2872 }
2873
2874 #[test]
2875 fn test_double_statistics_zero_min() {
2876 let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
2877 assert!(stats.is_min_max_backwards_compatible());
2878 if let Statistics::Double(stats) = stats {
2879 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2880 assert!(stats.min_opt().unwrap().is_sign_negative());
2881 assert_eq!(stats.max_opt().unwrap(), &2.0);
2882 } else {
2883 panic!("expecting Statistics::Double");
2884 }
2885 }
2886
2887 #[test]
2888 fn test_double_statistics_neg_zero_max() {
2889 let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
2890 assert!(stats.is_min_max_backwards_compatible());
2891 if let Statistics::Double(stats) = stats {
2892 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2893 assert_eq!(stats.max_opt().unwrap(), &0.0);
2894 assert!(stats.max_opt().unwrap().is_sign_positive());
2895 } else {
2896 panic!("expecting Statistics::Double");
2897 }
2898 }
2899
2900 #[test]
2901 fn test_compare_greater_byte_array_decimals() {
2902 assert!(!compare_greater_byte_array_decimals(&[], &[],),);
2903 assert!(compare_greater_byte_array_decimals(&[1u8,], &[],),);
2904 assert!(!compare_greater_byte_array_decimals(&[], &[1u8,],),);
2905 assert!(compare_greater_byte_array_decimals(&[1u8,], &[0u8,],),);
2906 assert!(!compare_greater_byte_array_decimals(&[1u8,], &[1u8,],),);
2907 assert!(compare_greater_byte_array_decimals(&[1u8, 0u8,], &[0u8,],),);
2908 assert!(!compare_greater_byte_array_decimals(
2909 &[0u8, 1u8,],
2910 &[1u8, 0u8,],
2911 ),);
2912 assert!(!compare_greater_byte_array_decimals(
2913 &[255u8, 35u8, 0u8, 0u8,],
2914 &[0u8,],
2915 ),);
2916 assert!(compare_greater_byte_array_decimals(
2917 &[0u8,],
2918 &[255u8, 35u8, 0u8, 0u8,],
2919 ),);
2920 }
2921
2922 #[test]
2923 fn test_column_index_with_null_pages() {
2924 let page_writer = get_test_page_writer();
2926 let props = Default::default();
2927 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2928 writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();
2929
2930 let r = writer.close().unwrap();
2931 assert!(r.column_index.is_some());
2932 let col_idx = r.column_index.unwrap();
2933 assert!(col_idx.null_pages[0]);
2935 assert_eq!(col_idx.min_values[0].len(), 0);
2937 assert_eq!(col_idx.max_values[0].len(), 0);
2938 assert!(col_idx.null_counts.is_some());
2940 assert_eq!(col_idx.null_counts.as_ref().unwrap()[0], 4);
2941 assert!(col_idx.repetition_level_histograms.is_none());
2943 assert!(col_idx.definition_level_histograms.is_some());
2945 assert_eq!(col_idx.definition_level_histograms.unwrap(), &[4, 0]);
2946 }
2947
2948 #[test]
2949 fn test_column_offset_index_metadata() {
2950 let page_writer = get_test_page_writer();
2953 let props = Default::default();
2954 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2955 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2956 writer.flush_data_pages().unwrap();
2958 writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
2960
2961 let r = writer.close().unwrap();
2962 let column_index = r.column_index.unwrap();
2963 let offset_index = r.offset_index.unwrap();
2964
2965 assert_eq!(8, r.rows_written);
2966
2967 assert_eq!(2, column_index.null_pages.len());
2969 assert_eq!(2, offset_index.page_locations.len());
2970 assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
2971 for idx in 0..2 {
2972 assert!(!column_index.null_pages[idx]);
2973 assert_eq!(0, column_index.null_counts.as_ref().unwrap()[idx]);
2974 }
2975
2976 if let Some(stats) = r.metadata.statistics() {
2977 assert_eq!(stats.null_count_opt(), Some(0));
2978 assert_eq!(stats.distinct_count_opt(), None);
2979 if let Statistics::Int32(stats) = stats {
2980 assert_eq!(
2984 stats.min_bytes_opt(),
2985 Some(column_index.min_values[1].as_slice())
2986 );
2987 assert_eq!(
2988 stats.max_bytes_opt(),
2989 column_index.max_values.get(1).map(Vec::as_slice)
2990 );
2991 } else {
2992 panic!("expecting Statistics::Int32");
2993 }
2994 } else {
2995 panic!("metadata missing statistics");
2996 }
2997
2998 assert_eq!(0, offset_index.page_locations[0].first_row_index);
3000 assert_eq!(4, offset_index.page_locations[1].first_row_index);
3001 }
3002
3003 #[test]
3005 fn test_column_offset_index_metadata_truncating() {
3006 let page_writer = get_test_page_writer();
3009 let props = Default::default();
3010 let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3011
3012 let mut data = vec![FixedLenByteArray::default(); 3];
3013 data[0].set_data(Bytes::from(vec![97_u8; 200]));
3015 data[1].set_data(Bytes::from(vec![112_u8; 200]));
3017 data[2].set_data(Bytes::from(vec![98_u8; 200]));
3018
3019 writer.write_batch(&data, None, None).unwrap();
3020
3021 writer.flush_data_pages().unwrap();
3022
3023 let r = writer.close().unwrap();
3024 let column_index = r.column_index.unwrap();
3025 let offset_index = r.offset_index.unwrap();
3026
3027 assert_eq!(3, r.rows_written);
3028
3029 assert_eq!(1, column_index.null_pages.len());
3031 assert_eq!(1, offset_index.page_locations.len());
3032 assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3033 assert!(!column_index.null_pages[0]);
3034 assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
3035
3036 if let Some(stats) = r.metadata.statistics() {
3037 assert_eq!(stats.null_count_opt(), Some(0));
3038 assert_eq!(stats.distinct_count_opt(), None);
3039 if let Statistics::FixedLenByteArray(stats) = stats {
3040 let column_index_min_value = &column_index.min_values[0];
3041 let column_index_max_value = &column_index.max_values[0];
3042
3043 assert_ne!(
3045 stats.min_bytes_opt(),
3046 Some(column_index_min_value.as_slice())
3047 );
3048 assert_ne!(
3049 stats.max_bytes_opt(),
3050 Some(column_index_max_value.as_slice())
3051 );
3052
3053 assert_eq!(
3054 column_index_min_value.len(),
3055 DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
3056 );
3057 assert_eq!(column_index_min_value.as_slice(), &[97_u8; 64]);
3058 assert_eq!(
3059 column_index_max_value.len(),
3060 DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
3061 );
3062
3063 assert_eq!(
3065 *column_index_max_value.last().unwrap(),
3066 *column_index_max_value.first().unwrap() + 1
3067 );
3068 } else {
3069 panic!("expecting Statistics::FixedLenByteArray");
3070 }
3071 } else {
3072 panic!("metadata missing statistics");
3073 }
3074 }
3075
3076 #[test]
3077 fn test_column_offset_index_truncating_spec_example() {
3078 let page_writer = get_test_page_writer();
3081
3082 let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3084 let props = Arc::new(builder.build());
3085 let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3086
3087 let mut data = vec![FixedLenByteArray::default(); 1];
3088 data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3090
3091 writer.write_batch(&data, None, None).unwrap();
3092
3093 writer.flush_data_pages().unwrap();
3094
3095 let r = writer.close().unwrap();
3096 let column_index = r.column_index.unwrap();
3097 let offset_index = r.offset_index.unwrap();
3098
3099 assert_eq!(1, r.rows_written);
3100
3101 assert_eq!(1, column_index.null_pages.len());
3103 assert_eq!(1, offset_index.page_locations.len());
3104 assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3105 assert!(!column_index.null_pages[0]);
3106 assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
3107
3108 if let Some(stats) = r.metadata.statistics() {
3109 assert_eq!(stats.null_count_opt(), Some(0));
3110 assert_eq!(stats.distinct_count_opt(), None);
3111 if let Statistics::FixedLenByteArray(_stats) = stats {
3112 let column_index_min_value = &column_index.min_values[0];
3113 let column_index_max_value = &column_index.max_values[0];
3114
3115 assert_eq!(column_index_min_value.len(), 1);
3116 assert_eq!(column_index_max_value.len(), 1);
3117
3118 assert_eq!("B".as_bytes(), column_index_min_value.as_slice());
3119 assert_eq!("C".as_bytes(), column_index_max_value.as_slice());
3120
3121 assert_ne!(column_index_min_value, stats.min_bytes_opt().unwrap());
3122 assert_ne!(column_index_max_value, stats.max_bytes_opt().unwrap());
3123 } else {
3124 panic!("expecting Statistics::FixedLenByteArray");
3125 }
3126 } else {
3127 panic!("metadata missing statistics");
3128 }
3129 }
3130
3131 #[test]
3132 fn test_float16_min_max_no_truncation() {
3133 let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3135 let props = Arc::new(builder.build());
3136 let page_writer = get_test_page_writer();
3137 let mut writer = get_test_float16_column_writer(page_writer, props);
3138
3139 let expected_value = f16::PI.to_le_bytes().to_vec();
3140 let data = vec![ByteArray::from(expected_value.clone()).into()];
3141 writer.write_batch(&data, None, None).unwrap();
3142 writer.flush_data_pages().unwrap();
3143
3144 let r = writer.close().unwrap();
3145
3146 let column_index = r.column_index.unwrap();
3149 let column_index_min_bytes = column_index.min_values[0].as_slice();
3150 let column_index_max_bytes = column_index.max_values[0].as_slice();
3151 assert_eq!(expected_value, column_index_min_bytes);
3152 assert_eq!(expected_value, column_index_max_bytes);
3153
3154 let stats = r.metadata.statistics().unwrap();
3156 if let Statistics::FixedLenByteArray(stats) = stats {
3157 let stats_min_bytes = stats.min_bytes_opt().unwrap();
3158 let stats_max_bytes = stats.max_bytes_opt().unwrap();
3159 assert_eq!(expected_value, stats_min_bytes);
3160 assert_eq!(expected_value, stats_max_bytes);
3161 } else {
3162 panic!("expecting Statistics::FixedLenByteArray");
3163 }
3164 }
3165
3166 #[test]
3167 fn test_decimal_min_max_no_truncation() {
3168 let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3170 let props = Arc::new(builder.build());
3171 let page_writer = get_test_page_writer();
3172 let mut writer =
3173 get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3174
3175 let expected_value = vec![
3176 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
3177 231u8, 90u8, 0u8, 0u8,
3178 ];
3179 let data = vec![ByteArray::from(expected_value.clone()).into()];
3180 writer.write_batch(&data, None, None).unwrap();
3181 writer.flush_data_pages().unwrap();
3182
3183 let r = writer.close().unwrap();
3184
3185 let column_index = r.column_index.unwrap();
3188 let column_index_min_bytes = column_index.min_values[0].as_slice();
3189 let column_index_max_bytes = column_index.max_values[0].as_slice();
3190 assert_eq!(expected_value, column_index_min_bytes);
3191 assert_eq!(expected_value, column_index_max_bytes);
3192
3193 let stats = r.metadata.statistics().unwrap();
3195 if let Statistics::FixedLenByteArray(stats) = stats {
3196 let stats_min_bytes = stats.min_bytes_opt().unwrap();
3197 let stats_max_bytes = stats.max_bytes_opt().unwrap();
3198 assert_eq!(expected_value, stats_min_bytes);
3199 assert_eq!(expected_value, stats_max_bytes);
3200 } else {
3201 panic!("expecting Statistics::FixedLenByteArray");
3202 }
3203 }
3204
3205 #[test]
3206 fn test_statistics_truncating_byte_array() {
3207 let page_writer = get_test_page_writer();
3208
3209 const TEST_TRUNCATE_LENGTH: usize = 1;
3210
3211 let builder =
3213 WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3214 let props = Arc::new(builder.build());
3215 let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3216
3217 let mut data = vec![ByteArray::default(); 1];
3218 data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3220
3221 writer.write_batch(&data, None, None).unwrap();
3222
3223 writer.flush_data_pages().unwrap();
3224
3225 let r = writer.close().unwrap();
3226
3227 assert_eq!(1, r.rows_written);
3228
3229 let stats = r.metadata.statistics().expect("statistics");
3230 assert_eq!(stats.null_count_opt(), Some(0));
3231 assert_eq!(stats.distinct_count_opt(), None);
3232 if let Statistics::ByteArray(_stats) = stats {
3233 let min_value = _stats.min_opt().unwrap();
3234 let max_value = _stats.max_opt().unwrap();
3235
3236 assert!(!_stats.min_is_exact());
3237 assert!(!_stats.max_is_exact());
3238
3239 assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3240 assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3241
3242 assert_eq!("B".as_bytes(), min_value.as_bytes());
3243 assert_eq!("C".as_bytes(), max_value.as_bytes());
3244 } else {
3245 panic!("expecting Statistics::ByteArray");
3246 }
3247 }
3248
3249 #[test]
3250 fn test_statistics_truncating_fixed_len_byte_array() {
3251 let page_writer = get_test_page_writer();
3252
3253 const TEST_TRUNCATE_LENGTH: usize = 1;
3254
3255 let builder =
3257 WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3258 let props = Arc::new(builder.build());
3259 let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3260
3261 let mut data = vec![FixedLenByteArray::default(); 1];
3262
3263 const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
3264 const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();
3265
3266 const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]]; const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
3268 [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];
3269
3270 data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));
3272
3273 writer.write_batch(&data, None, None).unwrap();
3274
3275 writer.flush_data_pages().unwrap();
3276
3277 let r = writer.close().unwrap();
3278
3279 assert_eq!(1, r.rows_written);
3280
3281 let stats = r.metadata.statistics().expect("statistics");
3282 assert_eq!(stats.null_count_opt(), Some(0));
3283 assert_eq!(stats.distinct_count_opt(), None);
3284 if let Statistics::FixedLenByteArray(_stats) = stats {
3285 let min_value = _stats.min_opt().unwrap();
3286 let max_value = _stats.max_opt().unwrap();
3287
3288 assert!(!_stats.min_is_exact());
3289 assert!(!_stats.max_is_exact());
3290
3291 assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3292 assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3293
3294 assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
3295 assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());
3296
3297 let reconstructed_min = i128::from_be_bytes([
3298 min_value.as_bytes()[0],
3299 0,
3300 0,
3301 0,
3302 0,
3303 0,
3304 0,
3305 0,
3306 0,
3307 0,
3308 0,
3309 0,
3310 0,
3311 0,
3312 0,
3313 0,
3314 ]);
3315
3316 let reconstructed_max = i128::from_be_bytes([
3317 max_value.as_bytes()[0],
3318 0,
3319 0,
3320 0,
3321 0,
3322 0,
3323 0,
3324 0,
3325 0,
3326 0,
3327 0,
3328 0,
3329 0,
3330 0,
3331 0,
3332 0,
3333 ]);
3334
3335 println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
3337 assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
3338 println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
3339 assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
3340 } else {
3341 panic!("expecting Statistics::FixedLenByteArray");
3342 }
3343 }
3344
3345 #[test]
3346 fn test_send() {
3347 fn test<T: Send>() {}
3348 test::<ColumnWriterImpl<Int32Type>>();
3349 }
3350
3351 #[test]
3352 fn test_increment() {
3353 let v = increment(vec![0, 0, 0]).unwrap();
3354 assert_eq!(&v, &[0, 0, 1]);
3355
3356 let v = increment(vec![0, 255, 255]).unwrap();
3358 assert_eq!(&v, &[1, 0, 0]);
3359
3360 let v = increment(vec![255, 255, 255]);
3362 assert!(v.is_none());
3363 }
3364
3365 #[test]
3366 fn test_increment_utf8() {
3367 let test_inc = |o: &str, expected: &str| {
3368 if let Ok(v) = String::from_utf8(increment_utf8(o).unwrap()) {
3369 assert_eq!(v, expected);
3371 assert!(*v > *o);
3373 let mut greater = ByteArray::new();
3375 greater.set_data(Bytes::from(v));
3376 let mut original = ByteArray::new();
3377 original.set_data(Bytes::from(o.as_bytes().to_vec()));
3378 assert!(greater > original);
3379 } else {
3380 panic!("Expected incremented UTF8 string to also be valid.");
3381 }
3382 };
3383
3384 test_inc("hello", "hellp");
3386
3387 test_inc("a\u{7f}", "b");
3389
3390 assert!(increment_utf8("\u{7f}\u{7f}").is_none());
3392
3393 test_inc("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝");
3395
3396 test_inc("éééé", "éééê");
3398
3399 test_inc("\u{ff}\u{ff}", "\u{ff}\u{100}");
3401
3402 test_inc("a\u{7ff}", "b");
3404
3405 assert!(increment_utf8("\u{7ff}\u{7ff}").is_none());
3407
3408 test_inc("ࠀࠀ", "ࠀࠁ");
3411
3412 test_inc("a\u{ffff}", "b");
3414
3415 assert!(increment_utf8("\u{ffff}\u{ffff}").is_none());
3417
3418 test_inc("𐀀𐀀", "𐀀𐀁");
3420
3421 test_inc("a\u{10ffff}", "b");
3423
3424 assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none());
3426
3427 test_inc("a\u{D7FF}", "b");
3430 }
3431
3432 #[test]
3433 fn test_truncate_utf8() {
3434 let data = "❤️🧡💛💚💙💜";
3436 let r = truncate_utf8(data, data.len()).unwrap();
3437 assert_eq!(r.len(), data.len());
3438 assert_eq!(&r, data.as_bytes());
3439
3440 let r = truncate_utf8(data, 13).unwrap();
3442 assert_eq!(r.len(), 10);
3443 assert_eq!(&r, "❤️🧡".as_bytes());
3444
3445 let r = truncate_utf8("\u{0836}", 1);
3447 assert!(r.is_none());
3448
3449 let r = truncate_and_increment_utf8("yyyyyyyyy", 8).unwrap();
3452 assert_eq!(&r, "yyyyyyyz".as_bytes());
3453
3454 let r = truncate_and_increment_utf8("ééééé", 7).unwrap();
3456 assert_eq!(&r, "ééê".as_bytes());
3457
3458 let r = truncate_and_increment_utf8("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8).unwrap();
3460 assert_eq!(&r, "\u{ff}\u{ff}\u{ff}\u{100}".as_bytes());
3461
3462 let r = truncate_and_increment_utf8("߿߿߿߿߿", 8);
3464 assert!(r.is_none());
3465
3466 let r = truncate_and_increment_utf8("ࠀࠀࠀࠀ", 8).unwrap();
3469 assert_eq!(&r, "ࠀࠁ".as_bytes());
3470
3471 let r = truncate_and_increment_utf8("\u{ffff}\u{ffff}\u{ffff}", 8);
3473 assert!(r.is_none());
3474
3475 let r = truncate_and_increment_utf8("𐀀𐀀𐀀𐀀", 9).unwrap();
3477 assert_eq!(&r, "𐀀𐀁".as_bytes());
3478
3479 let r = truncate_and_increment_utf8("\u{10ffff}\u{10ffff}", 8);
3481 assert!(r.is_none());
3482 }
3483
3484 #[test]
3485 fn test_byte_array_truncate_invalid_utf8_statistics() {
3488 let message_type = "
3489 message test_schema {
3490 OPTIONAL BYTE_ARRAY a (UTF8);
3491 }
3492 ";
3493 let schema = Arc::new(parse_message_type(message_type).unwrap());
3494
3495 let data = vec![ByteArray::from(vec![128u8; 32]); 7];
3497 let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
3498 let file: File = tempfile::tempfile().unwrap();
3499 let props = Arc::new(
3500 WriterProperties::builder()
3501 .set_statistics_enabled(EnabledStatistics::Chunk)
3502 .set_statistics_truncate_length(Some(8))
3503 .build(),
3504 );
3505
3506 let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
3507 let mut row_group_writer = writer.next_row_group().unwrap();
3508
3509 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
3510 col_writer
3511 .typed::<ByteArrayType>()
3512 .write_batch(&data, Some(&def_levels), None)
3513 .unwrap();
3514 col_writer.close().unwrap();
3515 row_group_writer.close().unwrap();
3516 let file_metadata = writer.close().unwrap();
3517 assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
3518 let stats = file_metadata.row_groups[0].columns[0]
3519 .meta_data
3520 .as_ref()
3521 .unwrap()
3522 .statistics
3523 .as_ref()
3524 .unwrap();
3525 assert!(!stats.is_max_value_exact.unwrap());
3526 assert_eq!(
3529 stats.max_value,
3530 Some([128, 128, 128, 128, 128, 128, 128, 129].to_vec())
3531 );
3532 }
3533
3534 #[test]
3535 fn test_increment_max_binary_chars() {
3536 let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
3537 assert_eq!(&r.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);
3538
3539 let incremented = increment(vec![0xFF, 0xFF, 0xFF]);
3540 assert!(incremented.is_none())
3541 }
3542
3543 #[test]
3544 fn test_no_column_index_when_stats_disabled() {
3545 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3549 let props = Arc::new(
3550 WriterProperties::builder()
3551 .set_statistics_enabled(EnabledStatistics::None)
3552 .build(),
3553 );
3554 let column_writer = get_column_writer(descr, props, get_test_page_writer());
3555 let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3556
3557 let data = Vec::new();
3558 let def_levels = vec![0; 10];
3559 writer.write_batch(&data, Some(&def_levels), None).unwrap();
3560 writer.flush_data_pages().unwrap();
3561
3562 let column_close_result = writer.close().unwrap();
3563 assert!(column_close_result.offset_index.is_some());
3564 assert!(column_close_result.column_index.is_none());
3565 }
3566
3567 #[test]
3568 fn test_no_offset_index_when_disabled() {
3569 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3571 let props = Arc::new(
3572 WriterProperties::builder()
3573 .set_statistics_enabled(EnabledStatistics::None)
3574 .set_offset_index_disabled(true)
3575 .build(),
3576 );
3577 let column_writer = get_column_writer(descr, props, get_test_page_writer());
3578 let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3579
3580 let data = Vec::new();
3581 let def_levels = vec![0; 10];
3582 writer.write_batch(&data, Some(&def_levels), None).unwrap();
3583 writer.flush_data_pages().unwrap();
3584
3585 let column_close_result = writer.close().unwrap();
3586 assert!(column_close_result.offset_index.is_none());
3587 assert!(column_close_result.column_index.is_none());
3588 }
3589
3590 #[test]
3591 fn test_offset_index_overridden() {
3592 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3594 let props = Arc::new(
3595 WriterProperties::builder()
3596 .set_statistics_enabled(EnabledStatistics::Page)
3597 .set_offset_index_disabled(true)
3598 .build(),
3599 );
3600 let column_writer = get_column_writer(descr, props, get_test_page_writer());
3601 let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3602
3603 let data = Vec::new();
3604 let def_levels = vec![0; 10];
3605 writer.write_batch(&data, Some(&def_levels), None).unwrap();
3606 writer.flush_data_pages().unwrap();
3607
3608 let column_close_result = writer.close().unwrap();
3609 assert!(column_close_result.offset_index.is_some());
3610 assert!(column_close_result.column_index.is_some());
3611 }
3612
3613 #[test]
3614 fn test_boundary_order() -> Result<()> {
3615 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3616 let column_close_result = write_multiple_pages::<Int32Type>(
3618 &descr,
3619 &[
3620 &[Some(-10), Some(10)],
3621 &[Some(-5), Some(11)],
3622 &[None],
3623 &[Some(-5), Some(11)],
3624 ],
3625 )?;
3626 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3627 assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3628
3629 let column_close_result = write_multiple_pages::<Int32Type>(
3631 &descr,
3632 &[
3633 &[Some(10), Some(11)],
3634 &[Some(5), Some(11)],
3635 &[None],
3636 &[Some(-5), Some(0)],
3637 ],
3638 )?;
3639 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3640 assert_eq!(boundary_order, BoundaryOrder::DESCENDING);
3641
3642 let column_close_result = write_multiple_pages::<Int32Type>(
3644 &descr,
3645 &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
3646 )?;
3647 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3648 assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3649
3650 let column_close_result =
3652 write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
3653 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3654 assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3655
3656 let column_close_result =
3658 write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
3659 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3660 assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3661
3662 let column_close_result =
3664 write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
3665 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3666 assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3667
3668 let column_close_result = write_multiple_pages::<Int32Type>(
3670 &descr,
3671 &[
3672 &[Some(10), Some(11)],
3673 &[Some(11), Some(16)],
3674 &[None],
3675 &[Some(-5), Some(0)],
3676 ],
3677 )?;
3678 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3679 assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3680
3681 let column_close_result = write_multiple_pages::<Int32Type>(
3683 &descr,
3684 &[
3685 &[Some(1), Some(9)],
3686 &[Some(2), Some(8)],
3687 &[None],
3688 &[Some(3), Some(7)],
3689 ],
3690 )?;
3691 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3692 assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3693
3694 Ok(())
3695 }
3696
3697 #[test]
3698 fn test_boundary_order_logical_type() -> Result<()> {
3699 let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
3702 let fba_descr = {
3703 let tpe = SchemaType::primitive_type_builder(
3704 "col",
3705 FixedLenByteArrayType::get_physical_type(),
3706 )
3707 .with_length(2)
3708 .build()?;
3709 Arc::new(ColumnDescriptor::new(
3710 Arc::new(tpe),
3711 1,
3712 0,
3713 ColumnPath::from("col"),
3714 ))
3715 };
3716
3717 let values: &[&[Option<FixedLenByteArray>]] = &[
3718 &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
3719 &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
3720 &[Some(FixedLenByteArray::from(ByteArray::from(
3721 f16::NEG_ZERO,
3722 )))],
3723 &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
3724 ];
3725
3726 let column_close_result =
3728 write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
3729 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3730 assert_eq!(boundary_order, BoundaryOrder::DESCENDING);
3731
3732 let column_close_result =
3734 write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
3735 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3736 assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3737
3738 Ok(())
3739 }
3740
3741 #[test]
3742 fn test_interval_stats_should_not_have_min_max() {
3743 let input = [
3744 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
3745 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
3746 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
3747 ]
3748 .into_iter()
3749 .map(|s| ByteArray::from(s).into())
3750 .collect::<Vec<_>>();
3751
3752 let page_writer = get_test_page_writer();
3753 let mut writer = get_test_interval_column_writer(page_writer);
3754 writer.write_batch(&input, None, None).unwrap();
3755
3756 let metadata = writer.close().unwrap().metadata;
3757 let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
3758 stats.clone()
3759 } else {
3760 panic!("metadata missing statistics");
3761 };
3762 assert!(stats.min_bytes_opt().is_none());
3763 assert!(stats.max_bytes_opt().is_none());
3764 }
3765
3766 #[test]
3767 #[cfg(feature = "arrow")]
3768 fn test_column_writer_get_estimated_total_bytes() {
3769 let page_writer = get_test_page_writer();
3770 let props = Default::default();
3771 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
3772 assert_eq!(writer.get_estimated_total_bytes(), 0);
3773
3774 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
3775 writer.add_data_page().unwrap();
3776 let size_with_one_page = writer.get_estimated_total_bytes();
3777 assert_eq!(size_with_one_page, 20);
3778
3779 writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
3780 writer.add_data_page().unwrap();
3781 let size_with_two_pages = writer.get_estimated_total_bytes();
3782 assert_eq!(size_with_two_pages, 20 + 21);
3784 }
3785
3786 fn write_multiple_pages<T: DataType>(
3787 column_descr: &Arc<ColumnDescriptor>,
3788 pages: &[&[Option<T::T>]],
3789 ) -> Result<ColumnCloseResult> {
3790 let column_writer = get_column_writer(
3791 column_descr.clone(),
3792 Default::default(),
3793 get_test_page_writer(),
3794 );
3795 let mut writer = get_typed_column_writer::<T>(column_writer);
3796
3797 for &page in pages {
3798 let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
3799 let def_levels = page
3800 .iter()
3801 .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
3802 .collect::<Vec<_>>();
3803 writer.write_batch(&values, Some(&def_levels), None)?;
3804 writer.flush_data_pages()?;
3805 }
3806
3807 writer.close()
3808 }
3809
3810 fn column_roundtrip_random<T: DataType>(
3814 props: WriterProperties,
3815 max_size: usize,
3816 min_value: T::T,
3817 max_value: T::T,
3818 max_def_level: i16,
3819 max_rep_level: i16,
3820 ) where
3821 T::T: PartialOrd + SampleUniform + Copy,
3822 {
3823 let mut num_values: usize = 0;
3824
3825 let mut buf: Vec<i16> = Vec::new();
3826 let def_levels = if max_def_level > 0 {
3827 random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
3828 for &dl in &buf[..] {
3829 if dl == max_def_level {
3830 num_values += 1;
3831 }
3832 }
3833 Some(&buf[..])
3834 } else {
3835 num_values = max_size;
3836 None
3837 };
3838
3839 let mut buf: Vec<i16> = Vec::new();
3840 let rep_levels = if max_rep_level > 0 {
3841 random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
3842 buf[0] = 0; Some(&buf[..])
3844 } else {
3845 None
3846 };
3847
3848 let mut values: Vec<T::T> = Vec::new();
3849 random_numbers_range(num_values, min_value, max_value, &mut values);
3850
3851 column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
3852 }
3853
3854 fn column_roundtrip<T: DataType>(
3856 props: WriterProperties,
3857 values: &[T::T],
3858 def_levels: Option<&[i16]>,
3859 rep_levels: Option<&[i16]>,
3860 ) {
3861 let mut file = tempfile::tempfile().unwrap();
3862 let mut write = TrackedWrite::new(&mut file);
3863 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
3864
3865 let max_def_level = match def_levels {
3866 Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3867 None => 0i16,
3868 };
3869
3870 let max_rep_level = match rep_levels {
3871 Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3872 None => 0i16,
3873 };
3874
3875 let mut max_batch_size = values.len();
3876 if let Some(levels) = def_levels {
3877 max_batch_size = max_batch_size.max(levels.len());
3878 }
3879 if let Some(levels) = rep_levels {
3880 max_batch_size = max_batch_size.max(levels.len());
3881 }
3882
3883 let mut writer =
3884 get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));
3885
3886 let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
3887 assert_eq!(values_written, values.len());
3888 let result = writer.close().unwrap();
3889
3890 drop(write);
3891
3892 let props = ReaderProperties::builder()
3893 .set_backward_compatible_lz4(false)
3894 .build();
3895 let page_reader = Box::new(
3896 SerializedPageReader::new_with_properties(
3897 Arc::new(file),
3898 &result.metadata,
3899 result.rows_written as usize,
3900 None,
3901 Arc::new(props),
3902 )
3903 .unwrap(),
3904 );
3905 let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);
3906
3907 let mut actual_values = Vec::with_capacity(max_batch_size);
3908 let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
3909 let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));
3910
3911 let (_, values_read, levels_read) = reader
3912 .read_records(
3913 max_batch_size,
3914 actual_def_levels.as_mut(),
3915 actual_rep_levels.as_mut(),
3916 &mut actual_values,
3917 )
3918 .unwrap();
3919
3920 assert_eq!(&actual_values[..values_read], values);
3923 match actual_def_levels {
3924 Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
3925 None => assert_eq!(None, def_levels),
3926 }
3927 match actual_rep_levels {
3928 Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
3929 None => assert_eq!(None, rep_levels),
3930 }
3931
3932 if let Some(levels) = actual_rep_levels {
3935 let mut actual_rows_written = 0;
3936 for l in levels {
3937 if l == 0 {
3938 actual_rows_written += 1;
3939 }
3940 }
3941 assert_eq!(actual_rows_written, result.rows_written);
3942 } else if actual_def_levels.is_some() {
3943 assert_eq!(levels_read as u64, result.rows_written);
3944 } else {
3945 assert_eq!(values_read as u64, result.rows_written);
3946 }
3947 }
3948
3949 fn column_write_and_get_metadata<T: DataType>(
3952 props: WriterProperties,
3953 values: &[T::T],
3954 ) -> ColumnChunkMetaData {
3955 let page_writer = get_test_page_writer();
3956 let props = Arc::new(props);
3957 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
3958 writer.write_batch(values, None, None).unwrap();
3959 writer.close().unwrap().metadata
3960 }
3961
3962 fn encoding_stats(page_type: PageType, encoding: Encoding, count: i32) -> PageEncodingStats {
3964 PageEncodingStats {
3965 page_type,
3966 encoding,
3967 count,
3968 }
3969 }
3970
3971 fn check_encoding_write_support<T: DataType>(
3975 version: WriterVersion,
3976 dict_enabled: bool,
3977 data: &[T::T],
3978 dictionary_page_offset: Option<i64>,
3979 encodings: &[Encoding],
3980 page_encoding_stats: &[PageEncodingStats],
3981 ) {
3982 let props = WriterProperties::builder()
3983 .set_writer_version(version)
3984 .set_dictionary_enabled(dict_enabled)
3985 .build();
3986 let meta = column_write_and_get_metadata::<T>(props, data);
3987 assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
3988 assert_eq!(meta.encodings(), encodings);
3989 assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
3990 }
3991
3992 fn get_test_column_writer<'a, T: DataType>(
3994 page_writer: Box<dyn PageWriter + 'a>,
3995 max_def_level: i16,
3996 max_rep_level: i16,
3997 props: WriterPropertiesPtr,
3998 ) -> ColumnWriterImpl<'a, T> {
3999 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4000 let column_writer = get_column_writer(descr, props, page_writer);
4001 get_typed_column_writer::<T>(column_writer)
4002 }
4003
4004 fn get_test_column_reader<T: DataType>(
4006 page_reader: Box<dyn PageReader>,
4007 max_def_level: i16,
4008 max_rep_level: i16,
4009 ) -> ColumnReaderImpl<T> {
4010 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4011 let column_reader = get_column_reader(descr, page_reader);
4012 get_typed_column_reader::<T>(column_reader)
4013 }
4014
4015 fn get_test_column_descr<T: DataType>(
4017 max_def_level: i16,
4018 max_rep_level: i16,
4019 ) -> ColumnDescriptor {
4020 let path = ColumnPath::from("col");
4021 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4022 .with_length(1)
4025 .build()
4026 .unwrap();
4027 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4028 }
4029
4030 fn get_test_page_writer() -> Box<dyn PageWriter> {
4032 Box::new(TestPageWriter {})
4033 }
4034
4035 struct TestPageWriter {}
4036
4037 impl PageWriter for TestPageWriter {
4038 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
4039 let mut res = PageWriteSpec::new();
4040 res.page_type = page.page_type();
4041 res.uncompressed_size = page.uncompressed_size();
4042 res.compressed_size = page.compressed_size();
4043 res.num_values = page.num_values();
4044 res.offset = 0;
4045 res.bytes_written = page.data().len() as u64;
4046 Ok(res)
4047 }
4048
4049 fn close(&mut self) -> Result<()> {
4050 Ok(())
4051 }
4052 }
4053
4054 fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
4056 let page_writer = get_test_page_writer();
4057 let props = Default::default();
4058 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4059 writer.write_batch(values, None, None).unwrap();
4060
4061 let metadata = writer.close().unwrap().metadata;
4062 if let Some(stats) = metadata.statistics() {
4063 stats.clone()
4064 } else {
4065 panic!("metadata missing statistics");
4066 }
4067 }
4068
4069 fn get_test_decimals_column_writer<T: DataType>(
4071 page_writer: Box<dyn PageWriter>,
4072 max_def_level: i16,
4073 max_rep_level: i16,
4074 props: WriterPropertiesPtr,
4075 ) -> ColumnWriterImpl<'static, T> {
4076 let descr = Arc::new(get_test_decimals_column_descr::<T>(
4077 max_def_level,
4078 max_rep_level,
4079 ));
4080 let column_writer = get_column_writer(descr, props, page_writer);
4081 get_typed_column_writer::<T>(column_writer)
4082 }
4083
4084 fn get_test_decimals_column_descr<T: DataType>(
4086 max_def_level: i16,
4087 max_rep_level: i16,
4088 ) -> ColumnDescriptor {
4089 let path = ColumnPath::from("col");
4090 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4091 .with_length(16)
4092 .with_logical_type(Some(LogicalType::Decimal {
4093 scale: 2,
4094 precision: 3,
4095 }))
4096 .with_scale(2)
4097 .with_precision(3)
4098 .build()
4099 .unwrap();
4100 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4101 }
4102
4103 fn float16_statistics_roundtrip(
4104 values: &[FixedLenByteArray],
4105 ) -> ValueStatistics<FixedLenByteArray> {
4106 let page_writer = get_test_page_writer();
4107 let mut writer = get_test_float16_column_writer(page_writer, Default::default());
4108 writer.write_batch(values, None, None).unwrap();
4109
4110 let metadata = writer.close().unwrap().metadata;
4111 if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
4112 stats.clone()
4113 } else {
4114 panic!("metadata missing statistics");
4115 }
4116 }
4117
4118 fn get_test_float16_column_writer(
4119 page_writer: Box<dyn PageWriter>,
4120 props: WriterPropertiesPtr,
4121 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4122 let descr = Arc::new(get_test_float16_column_descr(0, 0));
4123 let column_writer = get_column_writer(descr, props, page_writer);
4124 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4125 }
4126
4127 fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
4128 let path = ColumnPath::from("col");
4129 let tpe =
4130 SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4131 .with_length(2)
4132 .with_logical_type(Some(LogicalType::Float16))
4133 .build()
4134 .unwrap();
4135 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4136 }
4137
4138 fn get_test_interval_column_writer(
4139 page_writer: Box<dyn PageWriter>,
4140 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4141 let descr = Arc::new(get_test_interval_column_descr());
4142 let column_writer = get_column_writer(descr, Default::default(), page_writer);
4143 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4144 }
4145
4146 fn get_test_interval_column_descr() -> ColumnDescriptor {
4147 let path = ColumnPath::from("col");
4148 let tpe =
4149 SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4150 .with_length(12)
4151 .with_converted_type(ConvertedType::INTERVAL)
4152 .build()
4153 .unwrap();
4154 ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
4155 }
4156
4157 fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
4159 page_writer: Box<dyn PageWriter + 'a>,
4160 max_def_level: i16,
4161 max_rep_level: i16,
4162 props: WriterPropertiesPtr,
4163 ) -> ColumnWriterImpl<'a, T> {
4164 let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
4165 max_def_level,
4166 max_rep_level,
4167 ));
4168 let column_writer = get_column_writer(descr, props, page_writer);
4169 get_typed_column_writer::<T>(column_writer)
4170 }
4171
4172 fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
4174 max_def_level: i16,
4175 max_rep_level: i16,
4176 ) -> ColumnDescriptor {
4177 let path = ColumnPath::from("col");
4178 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4179 .with_converted_type(ConvertedType::UINT_32)
4180 .build()
4181 .unwrap();
4182 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4183 }
4184}