parquet/column/writer/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Contains column writer API.

use bytes::Bytes;
use half::f16;

use crate::bloom_filter::Sbbf;
use crate::format::{BoundaryOrder, ColumnIndex, OffsetIndex};
use std::collections::{BTreeSet, VecDeque};
use std::str;

use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
use crate::data_type::private::ParquetValueType;
use crate::data_type::*;
use crate::encodings::levels::LevelEncoder;
#[cfg(feature = "encryption")]
use crate::encryption::encrypt::get_column_crypto_metadata;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{
    ColumnChunkMetaData, ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram,
    OffsetIndexBuilder,
};
use crate::file::page_encoding_stats::PageEncodingStats;
use crate::file::properties::{
    EnabledStatistics, WriterProperties, WriterPropertiesPtr, WriterVersion,
};
use crate::file::statistics::{Statistics, ValueStatistics};
use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};

pub(crate) mod encoder;

macro_rules! downcast_writer {
    ($e:expr, $i:ident, $b:expr) => {
        match $e {
            Self::BoolColumnWriter($i) => $b,
            Self::Int32ColumnWriter($i) => $b,
            Self::Int64ColumnWriter($i) => $b,
            Self::Int96ColumnWriter($i) => $b,
            Self::FloatColumnWriter($i) => $b,
            Self::DoubleColumnWriter($i) => $b,
            Self::ByteArrayColumnWriter($i) => $b,
            Self::FixedLenByteArrayColumnWriter($i) => $b,
        }
    };
}

/// Column writer for a Parquet type.
pub enum ColumnWriter<'a> {
    /// Column writer for boolean type
    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
    /// Column writer for int32 type
    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
    /// Column writer for int64 type
    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
    /// Column writer for int96 (timestamp) type
    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
    /// Column writer for float type
    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
    /// Column writer for double type
    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
    /// Column writer for byte array type
    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
    /// Column writer for fixed length byte array type
    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
}

impl ColumnWriter<'_> {
    /// Returns the estimated total memory usage
    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        downcast_writer!(self, typed, typed.memory_size())
    }

    /// Returns the estimated total encoded bytes for this column writer
    #[cfg(feature = "arrow")]
    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
        downcast_writer!(self, typed, typed.get_estimated_total_bytes())
    }

    /// Close this [`ColumnWriter`]
    pub fn close(self) -> Result<ColumnCloseResult> {
        downcast_writer!(self, typed, typed.close())
    }
}

/// Gets a specific column writer corresponding to column descriptor `descr`.
pub fn get_column_writer<'a>(
    descr: ColumnDescPtr,
    props: WriterPropertiesPtr,
    page_writer: Box<dyn PageWriter + 'a>,
) -> ColumnWriter<'a> {
    match descr.physical_type() {
        Type::BOOLEAN => {
            ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::INT32 => {
            ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::INT64 => {
            ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::INT96 => {
            ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::FLOAT => {
            ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::DOUBLE => {
            ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::BYTE_ARRAY => {
            ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
            ColumnWriterImpl::new(descr, props, page_writer),
        ),
    }
}

/// Gets a typed column writer for the specific type `T`, by "up-casting" `col_writer` of
/// non-generic type to a generic column writer type `ColumnWriterImpl`.
///
/// Panics if the actual enum value for `col_writer` does not match the type `T`.
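///
/// # Example
///
/// A minimal sketch, assuming `descr`, `props` and `page_writer` have been
/// constructed elsewhere for an INT32 column:
///
/// ```ignore
/// let writer = get_column_writer(descr, props, page_writer);
/// // Panics if the column is not physically an INT32 column.
/// let mut typed = get_typed_column_writer::<Int32Type>(writer);
/// typed.write_batch(&[1, 2, 3], None, None)?;
/// ```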
pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
    T::get_column_writer(col_writer).unwrap_or_else(|| {
        panic!(
            "Failed to convert column writer into a typed column writer for `{}` type",
            T::get_physical_type()
        )
    })
}

/// Similar to `get_typed_column_writer` but returns a reference.
pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
    col_writer: &'b ColumnWriter<'a>,
) -> &'b ColumnWriterImpl<'a, T> {
    T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
        panic!(
            "Failed to convert column writer into a typed column writer for `{}` type",
            T::get_physical_type()
        )
    })
}

/// Similar to `get_typed_column_writer` but returns a mutable reference.
pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
    col_writer: &'a mut ColumnWriter<'b>,
) -> &'a mut ColumnWriterImpl<'b, T> {
    T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
        panic!(
            "Failed to convert column writer into a typed column writer for `{}` type",
            T::get_physical_type()
        )
    })
}

/// Metadata returned by [`GenericColumnWriter::close`]
#[derive(Debug, Clone)]
pub struct ColumnCloseResult {
    /// The total number of bytes written
    pub bytes_written: u64,
    /// The total number of rows written
    pub rows_written: u64,
    /// Metadata for this column chunk
    pub metadata: ColumnChunkMetaData,
    /// Optional bloom filter for this column
    pub bloom_filter: Option<Sbbf>,
    /// Optional column index, for filtering
    pub column_index: Option<ColumnIndex>,
    /// Optional offset index, identifying page locations
    pub offset_index: Option<OffsetIndex>,
}

// Metrics per page
#[derive(Default)]
struct PageMetrics {
    num_buffered_values: u32,
    num_buffered_rows: u32,
    num_page_nulls: u64,
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
}

impl PageMetrics {
    fn new() -> Self {
        Default::default()
    }

    /// Initialize the repetition level histogram
    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
        self
    }

    /// Initialize the definition level histogram
    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
        self.definition_level_histogram = LevelHistogram::try_new(max_level);
        self
    }

    /// Resets the state of this `PageMetrics` to the initial state.
    /// If histograms have been initialized their contents will be reset to zero.
    fn new_page(&mut self) {
        self.num_buffered_values = 0;
        self.num_buffered_rows = 0;
        self.num_page_nulls = 0;
        self.repetition_level_histogram
            .as_mut()
            .map(LevelHistogram::reset);
        self.definition_level_histogram
            .as_mut()
            .map(LevelHistogram::reset);
    }

    /// Updates histogram values using provided repetition levels
    fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
        if let Some(ref mut rep_hist) = self.repetition_level_histogram {
            rep_hist.update_from_levels(levels);
        }
    }

    /// Updates histogram values using provided definition levels
    fn update_definition_level_histogram(&mut self, levels: &[i16]) {
        if let Some(ref mut def_hist) = self.definition_level_histogram {
            def_hist.update_from_levels(levels);
        }
    }
}

// Metrics per column writer
#[derive(Default)]
struct ColumnMetrics<T: Default> {
    total_bytes_written: u64,
    total_rows_written: u64,
    total_uncompressed_size: u64,
    total_compressed_size: u64,
    total_num_values: u64,
    dictionary_page_offset: Option<u64>,
    data_page_offset: Option<u64>,
    min_column_value: Option<T>,
    max_column_value: Option<T>,
    num_column_nulls: u64,
    column_distinct_count: Option<u64>,
    variable_length_bytes: Option<i64>,
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
}

impl<T: Default> ColumnMetrics<T> {
    fn new() -> Self {
        Default::default()
    }

    /// Initialize the repetition level histogram
    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
        self
    }

    /// Initialize the definition level histogram
    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
        self.definition_level_histogram = LevelHistogram::try_new(max_level);
        self
    }

    /// Sum `page_histogram` into `chunk_histogram`
    fn update_histogram(
        chunk_histogram: &mut Option<LevelHistogram>,
        page_histogram: &Option<LevelHistogram>,
    ) {
        if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
            chunk_hist.add(page_hist);
        }
    }

    /// Sum the provided PageMetrics histograms into the chunk histograms. Does nothing if
    /// page histograms are not initialized.
    fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
        ColumnMetrics::<T>::update_histogram(
            &mut self.definition_level_histogram,
            &page_metrics.definition_level_histogram,
        );
        ColumnMetrics::<T>::update_histogram(
            &mut self.repetition_level_histogram,
            &page_metrics.repetition_level_histogram,
        );
    }

    /// Sum the provided page variable_length_bytes into the chunk variable_length_bytes
    fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
        if let Some(var_bytes) = variable_length_bytes {
            *self.variable_length_bytes.get_or_insert(0) += var_bytes;
        }
    }
}

/// Typed column writer for a primitive column.
pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;

/// Generic column writer for a primitive column.
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
    // Column writer properties
    descr: ColumnDescPtr,
    props: WriterPropertiesPtr,
    statistics_enabled: EnabledStatistics,

    page_writer: Box<dyn PageWriter + 'a>,
    codec: Compression,
    compressor: Option<Box<dyn Codec>>,
    encoder: E,

    page_metrics: PageMetrics,
    // Metrics per column writer
    column_metrics: ColumnMetrics<E::T>,

    /// The order of encodings within the generated metadata does not impact its meaning,
    /// but we use a BTreeSet so that the output is deterministic
    encodings: BTreeSet<Encoding>,
    encoding_stats: Vec<PageEncodingStats>,
    // Reused buffers
    def_levels_sink: Vec<i16>,
    rep_levels_sink: Vec<i16>,
    data_pages: VecDeque<CompressedPage>,
    // column index and offset index
    column_index_builder: ColumnIndexBuilder,
    offset_index_builder: Option<OffsetIndexBuilder>,

    // Below fields used to incrementally check boundary order across data pages.
    // We assume they are ascending/descending until proven wrong.
    data_page_boundary_ascending: bool,
    data_page_boundary_descending: bool,
    /// (min, max)
    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}

impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
    /// Returns a new instance of [`GenericColumnWriter`].
    pub fn new(
        descr: ColumnDescPtr,
        props: WriterPropertiesPtr,
        page_writer: Box<dyn PageWriter + 'a>,
    ) -> Self {
        let codec = props.compression(descr.path());
        let codec_options = CodecOptionsBuilder::default().build();
        let compressor = create_codec(codec, &codec_options).unwrap();
        let encoder = E::try_new(&descr, props.as_ref()).unwrap();

        let statistics_enabled = props.statistics_enabled(descr.path());

        let mut encodings = BTreeSet::new();
        // Used for level information
        encodings.insert(Encoding::RLE);

        let mut page_metrics = PageMetrics::new();
        let mut column_metrics = ColumnMetrics::<E::T>::new();

        // Initialize level histograms if collecting page or chunk statistics
        if statistics_enabled != EnabledStatistics::None {
            page_metrics = page_metrics
                .with_repetition_level_histogram(descr.max_rep_level())
                .with_definition_level_histogram(descr.max_def_level());
            column_metrics = column_metrics
                .with_repetition_level_histogram(descr.max_rep_level())
                .with_definition_level_histogram(descr.max_def_level())
        }

        // Disable column_index_builder if not collecting page statistics.
        let mut column_index_builder = ColumnIndexBuilder::new();
        if statistics_enabled != EnabledStatistics::Page {
            column_index_builder.to_invalid()
        }

        // Disable offset_index_builder if requested by user.
        let offset_index_builder = match props.offset_index_disabled() {
            false => Some(OffsetIndexBuilder::new()),
            _ => None,
        };

        Self {
            descr,
            props,
            statistics_enabled,
            page_writer,
            codec,
            compressor,
            encoder,
            def_levels_sink: vec![],
            rep_levels_sink: vec![],
            data_pages: VecDeque::new(),
            page_metrics,
            column_metrics,
            column_index_builder,
            offset_index_builder,
            encodings,
            encoding_stats: vec![],
            data_page_boundary_ascending: true,
            data_page_boundary_descending: true,
            last_non_null_data_page_min_max: None,
        }
    }

    #[allow(clippy::too_many_arguments)]
    pub(crate) fn write_batch_internal(
        &mut self,
        values: &E::Values,
        value_indices: Option<&[usize]>,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        min: Option<&E::T>,
        max: Option<&E::T>,
        distinct_count: Option<u64>,
    ) -> Result<usize> {
        // Check if number of definition levels is the same as number of repetition levels.
        if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
            if def.len() != rep.len() {
                return Err(general_err!(
                    "Inconsistent length of definition and repetition levels: {} != {}",
                    def.len(),
                    rep.len()
                ));
            }
        }

        // We check for DataPage limits only after we have inserted the values. If a user
        // writes a large number of values, the DataPage size can be well above the limit.
        //
        // The purpose of this chunking is to bound this. Even if a user writes a large
        // number of values, the chunking will ensure that we add data pages at a
        // reasonable page size limit.

        // TODO: find out why we don't account for size of levels when we estimate page
        // size.

        let num_levels = match def_levels {
            Some(def_levels) => def_levels.len(),
            None => values.len(),
        };

        if let Some(min) = min {
            update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
        }
        if let Some(max) = max {
            update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
        }

        // We can only set the distinct count if there are no other writes
        if self.encoder.num_values() == 0 {
            self.column_metrics.column_distinct_count = distinct_count;
        } else {
            self.column_metrics.column_distinct_count = None;
        }

        let mut values_offset = 0;
        let mut levels_offset = 0;
        let base_batch_size = self.props.write_batch_size();
        while levels_offset < num_levels {
            let mut end_offset = num_levels.min(levels_offset + base_batch_size);

            // Split at record boundary
            if let Some(r) = rep_levels {
                while end_offset < r.len() && r[end_offset] != 0 {
                    end_offset += 1;
                }
            }

            values_offset += self.write_mini_batch(
                values,
                values_offset,
                value_indices,
                end_offset - levels_offset,
                def_levels.map(|lv| &lv[levels_offset..end_offset]),
                rep_levels.map(|lv| &lv[levels_offset..end_offset]),
            )?;
            levels_offset = end_offset;
        }

        // Return total number of values processed.
        Ok(values_offset)
    }

    /// Writes batch of values, definition levels and repetition levels.
    /// Returns number of values processed (written).
    ///
    /// If definition and repetition levels are provided, those levels are written in full
    /// and the number of values to write is determined from them (this number is returned),
    /// since the number of actually written values may be smaller than the number of
    /// provided values.
    ///
    /// If only values are provided, then all values are written and the length
    /// of the values buffer is returned.
    ///
    /// Definition and/or repetition levels can be omitted, if values are
    /// non-nullable and/or non-repeated.
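    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `typed` is a `ColumnWriterImpl<'_, Int32Type>`
    /// for an optional (max definition level 1) column:
    ///
    /// ```ignore
    /// // Three logical values: 1, NULL, 2. Only the non-null values are passed
    /// // in `values`; the definition levels encode the null.
    /// let written = typed.write_batch(&[1, 2], Some(&[1, 0, 1]), None)?;
    /// assert_eq!(written, 2);
    /// ```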
    pub fn write_batch(
        &mut self,
        values: &E::Values,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) -> Result<usize> {
        self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
    }

    /// Writer may optionally provide pre-calculated statistics for use when computing
    /// chunk-level statistics.
    ///
    /// NB: [`WriterProperties::statistics_enabled`] must be set to [`EnabledStatistics::Chunk`]
    /// for these statistics to take effect. If [`EnabledStatistics::None`], they will be ignored,
    /// and if [`EnabledStatistics::Page`], the chunk statistics will instead be derived from the
    /// computed page statistics.
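    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `typed` is a `ColumnWriterImpl<'_, Int32Type>`
    /// for a required column and chunk-level statistics are enabled:
    ///
    /// ```ignore
    /// typed.write_batch_with_statistics(
    ///     &[3, 1, 2],
    ///     None,     // no definition levels (required column)
    ///     None,     // no repetition levels (non-repeated column)
    ///     Some(&1), // pre-calculated minimum
    ///     Some(&3), // pre-calculated maximum
    ///     Some(3),  // pre-calculated distinct count
    /// )?;
    /// ```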
    pub fn write_batch_with_statistics(
        &mut self,
        values: &E::Values,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        min: Option<&E::T>,
        max: Option<&E::T>,
        distinct_count: Option<u64>,
    ) -> Result<usize> {
        self.write_batch_internal(
            values,
            None,
            def_levels,
            rep_levels,
            min,
            max,
            distinct_count,
        )
    }

    /// Returns the estimated total memory usage.
    ///
    /// Unlike [`Self::get_estimated_total_bytes`] this is an estimate
    /// of the current memory usage and not the final anticipated encoded size.
    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
    }

    /// Returns total number of bytes written by this column writer so far.
    /// This value is also returned when column writer is closed.
    ///
    /// Note: this value does not include any buffered data that has not
    /// yet been flushed to a page.
    pub fn get_total_bytes_written(&self) -> u64 {
        self.column_metrics.total_bytes_written
    }

    /// Returns the estimated total encoded bytes for this column writer.
    ///
    /// Unlike [`Self::get_total_bytes_written`] this includes an estimate
    /// of any data that has not yet been flushed to a page, based on its
    /// anticipated encoded size.
    #[cfg(feature = "arrow")]
    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
        self.data_pages
            .iter()
            .map(|page| page.data().len() as u64)
            .sum::<u64>()
            + self.column_metrics.total_bytes_written
            + self.encoder.estimated_data_page_size() as u64
            + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
    }

    /// Returns total number of rows written by this column writer so far.
    /// This value is also returned when column writer is closed.
    pub fn get_total_rows_written(&self) -> u64 {
        self.column_metrics.total_rows_written
    }

    /// Returns a reference to a [`ColumnDescPtr`]
    pub fn get_descriptor(&self) -> &ColumnDescPtr {
        &self.descr
    }

    /// Finalizes writes and closes the column writer.
    /// Returns total bytes written, total rows written and column chunk metadata.
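    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `typed` is a column writer that has had
    /// values written to it:
    ///
    /// ```ignore
    /// let r: ColumnCloseResult = typed.close()?;
    /// println!("wrote {} rows in {} bytes", r.rows_written, r.bytes_written);
    /// ```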
    pub fn close(mut self) -> Result<ColumnCloseResult> {
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }
        if self.encoder.has_dictionary() {
            self.write_dictionary_page()?;
        }
        self.flush_data_pages()?;
        let metadata = self.build_column_metadata()?;
        self.page_writer.close()?;

        let boundary_order = match (
            self.data_page_boundary_ascending,
            self.data_page_boundary_descending,
        ) {
            // If the lists are composed of equal elements then the boundary order will be
            // marked as ascending (this is also the case if all pages are null pages)
            (true, _) => BoundaryOrder::ASCENDING,
            (false, true) => BoundaryOrder::DESCENDING,
            (false, false) => BoundaryOrder::UNORDERED,
        };
        self.column_index_builder.set_boundary_order(boundary_order);

        let column_index = self
            .column_index_builder
            .valid()
            .then(|| self.column_index_builder.build_to_thrift());

        let offset_index = self.offset_index_builder.map(|b| b.build_to_thrift());

        Ok(ColumnCloseResult {
            bytes_written: self.column_metrics.total_bytes_written,
            rows_written: self.column_metrics.total_rows_written,
            bloom_filter: self.encoder.flush_bloom_filter(),
            metadata,
            column_index,
            offset_index,
        })
    }

    /// Writes mini batch of values, definition and repetition levels.
    /// This allows fine-grained processing of values and maintaining a reasonable
    /// page size.
    fn write_mini_batch(
        &mut self,
        values: &E::Values,
        values_offset: usize,
        value_indices: Option<&[usize]>,
        num_levels: usize,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) -> Result<usize> {
        // Process definition levels and determine how many values to write.
        let values_to_write = if self.descr.max_def_level() > 0 {
            let levels = def_levels.ok_or_else(|| {
                general_err!(
                    "Definition levels are required, because max definition level = {}",
                    self.descr.max_def_level()
                )
            })?;

            let mut values_to_write = 0;
            for &level in levels {
                if level == self.descr.max_def_level() {
                    values_to_write += 1;
                } else {
                    // We must always compute this as it is used to populate v2 pages
                    self.page_metrics.num_page_nulls += 1
                }
            }

            // Update histogram
            self.page_metrics.update_definition_level_histogram(levels);

            self.def_levels_sink.extend_from_slice(levels);
            values_to_write
        } else {
            num_levels
        };

        // Process repetition levels and determine how many rows we are about to process.
        if self.descr.max_rep_level() > 0 {
            // A row could contain more than one value.
            let levels = rep_levels.ok_or_else(|| {
                general_err!(
                    "Repetition levels are required, because max repetition level = {}",
                    self.descr.max_rep_level()
                )
            })?;

            if !levels.is_empty() && levels[0] != 0 {
                return Err(general_err!(
                    "Write must start at a record boundary, got non-zero repetition level of {}",
                    levels[0]
                ));
            }

            // Count the occasions where we start a new row
            for &level in levels {
                self.page_metrics.num_buffered_rows += (level == 0) as u32
            }

            // Update histogram
            self.page_metrics.update_repetition_level_histogram(levels);

            self.rep_levels_sink.extend_from_slice(levels);
        } else {
            // Each value is exactly one row; the row count equals the number of
            // values, and nulls are counted as well.
            self.page_metrics.num_buffered_rows += num_levels as u32;
        }

        match value_indices {
            Some(indices) => {
                let indices = &indices[values_offset..values_offset + values_to_write];
                self.encoder.write_gather(values, indices)?;
            }
            None => self.encoder.write(values, values_offset, values_to_write)?,
        }

        self.page_metrics.num_buffered_values += num_levels as u32;

        if self.should_add_data_page() {
            self.add_data_page()?;
        }

        if self.should_dict_fallback() {
            self.dict_fallback()?;
        }

        Ok(values_to_write)
    }

    /// Returns true if we need to fall back to non-dictionary encoding.
    ///
    /// We can only fall back if dictionary encoder is set and we have exceeded dictionary
    /// size.
    #[inline]
    fn should_dict_fallback(&self) -> bool {
        match self.encoder.estimated_dict_page_size() {
            Some(size) => {
                size >= self
                    .props
                    .column_dictionary_page_size_limit(self.descr.path())
            }
            None => false,
        }
    }

    /// Returns true if there is enough data for a data page, false otherwise.
    #[inline]
    fn should_add_data_page(&self) -> bool {
        // This is necessary in the event of a much larger dictionary size than page size
        //
        // In such a scenario the dictionary decoder may return an estimated encoded
        // size in excess of the page size limit, even when there are no buffered values
        if self.page_metrics.num_buffered_values == 0 {
            return false;
        }

        self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
            || self.encoder.estimated_data_page_size() >= self.props.data_page_size_limit()
    }

    /// Performs dictionary fallback.
    /// Prepares and writes dictionary and all data pages into page writer.
    fn dict_fallback(&mut self) -> Result<()> {
        // At this point we know that we need to fall back.
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }
        self.write_dictionary_page()?;
        self.flush_data_pages()?;
        Ok(())
    }

    /// Update the column index and offset index when adding the data page
    fn update_column_offset_index(
        &mut self,
        page_statistics: Option<&ValueStatistics<E::T>>,
        page_variable_length_bytes: Option<i64>,
    ) {
        // update the column index
        let null_page =
            (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
        // If a page contains only null values, writers have to set the corresponding
        // entries in min_values and max_values to byte[0] (a zero-length byte array)
        if null_page && self.column_index_builder.valid() {
            self.column_index_builder.append(
                null_page,
                vec![],
                vec![],
                self.page_metrics.num_page_nulls as i64,
            );
        } else if self.column_index_builder.valid() {
            // from page statistics
            // If we can't get the page statistics, ignore the column/offset index for this column chunk
            match &page_statistics {
                None => {
                    self.column_index_builder.to_invalid();
                }
                Some(stat) => {
                    // Check if min/max are still ascending/descending across pages
                    let new_min = stat.min_opt().unwrap();
                    let new_max = stat.max_opt().unwrap();
                    if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
                        if self.data_page_boundary_ascending {
                            // If last min/max are greater than new min/max then not ascending anymore
                            let not_ascending = compare_greater(&self.descr, last_min, new_min)
                                || compare_greater(&self.descr, last_max, new_max);
                            if not_ascending {
                                self.data_page_boundary_ascending = false;
                            }
                        }

                        if self.data_page_boundary_descending {
                            // If new min/max are greater than last min/max then not descending anymore
                            let not_descending = compare_greater(&self.descr, new_min, last_min)
                                || compare_greater(&self.descr, new_max, last_max);
                            if not_descending {
                                self.data_page_boundary_descending = false;
                            }
                        }
                    }
                    self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));

                    if self.can_truncate_value() {
                        self.column_index_builder.append(
                            null_page,
                            self.truncate_min_value(
                                self.props.column_index_truncate_length(),
                                stat.min_bytes_opt().unwrap(),
                            )
                            .0,
                            self.truncate_max_value(
                                self.props.column_index_truncate_length(),
                                stat.max_bytes_opt().unwrap(),
                            )
                            .0,
                            self.page_metrics.num_page_nulls as i64,
                        );
                    } else {
                        self.column_index_builder.append(
                            null_page,
                            stat.min_bytes_opt().unwrap().to_vec(),
                            stat.max_bytes_opt().unwrap().to_vec(),
                            self.page_metrics.num_page_nulls as i64,
                        );
                    }
                }
            }
        }

        // Append page histograms to the `ColumnIndex` histograms
        self.column_index_builder.append_histograms(
            &self.page_metrics.repetition_level_histogram,
            &self.page_metrics.definition_level_histogram,
        );

        // Update the offset index
        if let Some(builder) = self.offset_index_builder.as_mut() {
            builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
            builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
        }
    }

    /// Determine if we should allow truncating min/max values for this column's statistics
    fn can_truncate_value(&self) -> bool {
        match self.descr.physical_type() {
            // Don't truncate for Float16 and Decimal because their sort order is different
            // from that of FIXED_LEN_BYTE_ARRAY sort order.
            // So truncation of those types could lead to inaccurate min/max statistics
            Type::FIXED_LEN_BYTE_ARRAY
                if !matches!(
                    self.descr.logical_type(),
                    Some(LogicalType::Decimal { .. }) | Some(LogicalType::Float16)
                ) =>
            {
                true
            }
            Type::BYTE_ARRAY => true,
            // Truncation only applies for fba/binary physical types
            _ => false,
        }
    }

    /// Returns `true` if this column's logical type is a UTF-8 string.
    fn is_utf8(&self) -> bool {
        self.get_descriptor().logical_type() == Some(LogicalType::String)
            || self.get_descriptor().converted_type() == ConvertedType::UTF8
    }

    /// Truncates a binary statistic to at most `truncation_length` bytes.
    ///
    /// If truncation is not possible, returns `data`.
    ///
    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
    ///
    /// UTF-8 Note:
    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
    /// also remain valid UTF-8, but may be less than `truncation_length` bytes to avoid splitting
    /// on non-character boundaries.
    fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
        truncation_length
            .filter(|l| data.len() > *l)
            .and_then(|l|
                // don't do extra work if this column isn't UTF-8
                if self.is_utf8() {
                    match str::from_utf8(data) {
                        Ok(str_data) => truncate_utf8(str_data, l),
                        Err(_) => Some(data[..l].to_vec()),
                    }
                } else {
                    Some(data[..l].to_vec())
                }
            )
            .map(|truncated| (truncated, true))
            .unwrap_or_else(|| (data.to_vec(), false))
    }

    /// Truncates a binary statistic to at most `truncation_length` bytes, and then increments the
    /// final byte(s) to yield a valid upper bound. The result may be shorter than
    /// `truncation_length` bytes if the last byte(s) overflow.
    ///
    /// If truncation is not possible, returns `data`.
    ///
    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
    ///
    /// UTF-8 Note:
    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
    /// also remain valid UTF-8 (but again may be less than `truncation_length` bytes). If `data`
    /// does not contain valid UTF-8, then truncation will occur as if the column is non-string
    /// binary.
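    ///
    /// A sketch of the intended behaviour, assuming a non-string binary column:
    ///
    /// ```ignore
    /// // Truncate b"abc" to 2 bytes, then increment the last kept byte so the
    /// // result remains an upper bound: b"ab" becomes b"ac".
    /// let (bound, truncated) = self.truncate_max_value(Some(2), b"abc");
    /// assert_eq!((bound.as_slice(), truncated), (&b"ac"[..], true));
    /// ```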
    fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
        truncation_length
            .filter(|l| data.len() > *l)
            .and_then(|l|
                // don't do extra work if this column isn't UTF-8
                if self.is_utf8() {
                    match str::from_utf8(data) {
                        Ok(str_data) => truncate_and_increment_utf8(str_data, l),
                        Err(_) => increment(data[..l].to_vec()),
                    }
                } else {
                    increment(data[..l].to_vec())
                }
            )
            .map(|truncated| (truncated, true))
            .unwrap_or_else(|| (data.to_vec(), false))
    }

    /// Truncate the min and max values that will be written to a data page
    /// header or column chunk Statistics
    fn truncate_statistics(&self, statistics: Statistics) -> Statistics {
        let backwards_compatible_min_max = self.descr.sort_order().is_signed();
        match statistics {
            Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
                let (min, did_truncate_min) = self.truncate_min_value(
                    self.props.statistics_truncate_length(),
                    stats.min_bytes_opt().unwrap(),
                );
                let (max, did_truncate_max) = self.truncate_max_value(
                    self.props.statistics_truncate_length(),
                    stats.max_bytes_opt().unwrap(),
                );
                Statistics::ByteArray(
                    ValueStatistics::new(
                        Some(min.into()),
                        Some(max.into()),
                        stats.distinct_count(),
                        stats.null_count_opt(),
                        backwards_compatible_min_max,
                    )
                    .with_max_is_exact(!did_truncate_max)
                    .with_min_is_exact(!did_truncate_min),
                )
            }
            Statistics::FixedLenByteArray(stats)
                if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
            {
                let (min, did_truncate_min) = self.truncate_min_value(
                    self.props.statistics_truncate_length(),
                    stats.min_bytes_opt().unwrap(),
                );
                let (max, did_truncate_max) = self.truncate_max_value(
                    self.props.statistics_truncate_length(),
                    stats.max_bytes_opt().unwrap(),
                );
                Statistics::FixedLenByteArray(
                    ValueStatistics::new(
                        Some(min.into()),
                        Some(max.into()),
                        stats.distinct_count(),
                        stats.null_count_opt(),
                        backwards_compatible_min_max,
                    )
                    .with_max_is_exact(!did_truncate_max)
                    .with_min_is_exact(!did_truncate_min),
                )
            }
            stats => stats,
        }
    }

    /// Adds data page.
    /// Data page is either buffered in case of dictionary encoding or written directly.
    fn add_data_page(&mut self) -> Result<()> {
        // Extract encoded values
        let values_data = self.encoder.flush_data_page()?;

        let max_def_level = self.descr.max_def_level();
        let max_rep_level = self.descr.max_rep_level();

        self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;

        let page_statistics = match (values_data.min_value, values_data.max_value) {
            (Some(min), Some(max)) => {
                // Update chunk level statistics
                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

                (self.statistics_enabled == EnabledStatistics::Page).then_some(
                    ValueStatistics::new(
                        Some(min),
                        Some(max),
                        None,
                        Some(self.page_metrics.num_page_nulls),
                        false,
                    ),
                )
            }
            _ => None,
        };

        // update column and offset index
        self.update_column_offset_index(
            page_statistics.as_ref(),
            values_data.variable_length_bytes,
        );

        // Update histograms and variable_length_bytes in column_metrics
        self.column_metrics
            .update_from_page_metrics(&self.page_metrics);
        self.column_metrics
            .update_variable_length_bytes(values_data.variable_length_bytes);

        // From here on, we only need page statistics if they will be written to the page header.
        let page_statistics = page_statistics
            .filter(|_| self.props.write_page_header_statistics(self.descr.path()))
            .map(|stats| self.truncate_statistics(Statistics::from(stats)));

        let compressed_page = match self.props.writer_version() {
            WriterVersion::PARQUET_1_0 => {
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.rep_levels_sink[..],
                            max_rep_level,
                        )[..],
                    );
                }

                if max_def_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.def_levels_sink[..],
                            max_def_level,
                        )[..],
                    );
                }

                buffer.extend_from_slice(&values_data.buf);
                let uncompressed_size = buffer.len();

                if let Some(ref mut cmpr) = self.compressor {
                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
                    buffer = compressed_buf;
                }

                let data_page = Page::DataPage {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    def_level_encoding: Encoding::RLE,
                    rep_level_encoding: Encoding::RLE,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
            WriterVersion::PARQUET_2_0 => {
                let mut rep_levels_byte_len = 0;
                let mut def_levels_byte_len = 0;
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
                    rep_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                if max_def_level > 0 {
                    let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
                    def_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                let uncompressed_size =
                    rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();

                // Data Page v2 compresses values only.
                match self.compressor {
                    Some(ref mut cmpr) => {
                        cmpr.compress(&values_data.buf, &mut buffer)?;
                    }
                    None => buffer.extend_from_slice(&values_data.buf),
                }

                let data_page = Page::DataPageV2 {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    num_nulls: self.page_metrics.num_page_nulls as u32,
                    num_rows: self.page_metrics.num_buffered_rows,
                    def_levels_byte_len: def_levels_byte_len as u32,
                    rep_levels_byte_len: rep_levels_byte_len as u32,
                    is_compressed: self.compressor.is_some(),
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
        };

        // Check if we need to buffer data page or flush it to the sink directly.
        if self.encoder.has_dictionary() {
            self.data_pages.push_back(compressed_page);
        } else {
            self.write_data_page(compressed_page)?;
        }

        // Update total number of rows.
        self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;

        // Reset state.
        self.rep_levels_sink.clear();
        self.def_levels_sink.clear();
        self.page_metrics.new_page();

        Ok(())
    }

    /// Finalizes any outstanding data pages and flushes buffered data pages from
    /// dictionary encoding into the underlying sink.
    #[inline]
    fn flush_data_pages(&mut self) -> Result<()> {
        // Write all outstanding data to a new page.
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }

        while let Some(page) = self.data_pages.pop_front() {
            self.write_data_page(page)?;
        }

        Ok(())
    }

    /// Assembles column chunk metadata.
    fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
        let total_compressed_size = self.column_metrics.total_compressed_size as i64;
        let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
        let num_values = self.column_metrics.total_num_values as i64;
        let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
        // If data page offset is not set, then no pages have been written
        let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;

        let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
            .set_compression(self.codec)
            .set_encodings(self.encodings.iter().cloned().collect())
            .set_page_encoding_stats(self.encoding_stats.clone())
            .set_total_compressed_size(total_compressed_size)
            .set_total_uncompressed_size(total_uncompressed_size)
            .set_num_values(num_values)
            .set_data_page_offset(data_page_offset)
            .set_dictionary_page_offset(dict_page_offset);

        if self.statistics_enabled != EnabledStatistics::None {
            let backwards_compatible_min_max = self.descr.sort_order().is_signed();

            let statistics = ValueStatistics::<E::T>::new(
                self.column_metrics.min_column_value.clone(),
                self.column_metrics.max_column_value.clone(),
                self.column_metrics.column_distinct_count,
                Some(self.column_metrics.num_column_nulls),
                false,
            )
            .with_backwards_compatible_min_max(backwards_compatible_min_max)
            .into();

            let statistics = self.truncate_statistics(statistics);

            builder = builder
                .set_statistics(statistics)
                .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
                .set_repetition_level_histogram(
                    self.column_metrics.repetition_level_histogram.take(),
                )
                .set_definition_level_histogram(
                    self.column_metrics.definition_level_histogram.take(),
                );
        }

        builder = self.set_column_chunk_encryption_properties(builder);

        let metadata = builder.build()?;
        Ok(metadata)
    }

    /// Encodes definition or repetition levels for Data Page v1.
    #[inline]
    fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> {
        let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
        encoder.put(levels);
        encoder.consume()
    }

    /// Encodes definition or repetition levels for Data Page v2.
    /// Encoding is always RLE.
    #[inline]
    fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
        let mut encoder = LevelEncoder::v2(max_level, levels.len());
        encoder.put(levels);
        encoder.consume()
    }

    /// Writes compressed data page into underlying sink and updates global metrics.
    #[inline]
    fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
        self.encodings.insert(page.encoding());
        match self.encoding_stats.last_mut() {
            Some(encoding_stats)
                if encoding_stats.page_type == page.page_type()
                    && encoding_stats.encoding == page.encoding() =>
            {
                encoding_stats.count += 1;
            }
            _ => {
                // data page type does not change inside a file
                // encoding can currently only change from dictionary to non-dictionary once
                self.encoding_stats.push(PageEncodingStats {
                    page_type: page.page_type(),
                    encoding: page.encoding(),
                    count: 1,
                });
            }
        }
        let page_spec = self.page_writer.write_page(page)?;
        // update offset index
        // compressed_size = header_size + compressed_data_size
        if let Some(builder) = self.offset_index_builder.as_mut() {
            builder
                .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
        }
        self.update_metrics_for_page(page_spec);
        Ok(())
    }

    /// Writes dictionary page into underlying sink.
    #[inline]
    fn write_dictionary_page(&mut self) -> Result<()> {
        let compressed_page = {
            let mut page = self
                .encoder
                .flush_dict_page()?
                .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;

            let uncompressed_size = page.buf.len();

            if let Some(ref mut cmpr) = self.compressor {
                let mut output_buf = Vec::with_capacity(uncompressed_size);
                cmpr.compress(&page.buf, &mut output_buf)?;
                page.buf = Bytes::from(output_buf);
            }

            let dict_page = Page::DictionaryPage {
                buf: page.buf,
                num_values: page.num_values as u32,
                encoding: self.props.dictionary_page_encoding(),
                is_sorted: page.is_sorted,
            };
            CompressedPage::new(dict_page, uncompressed_size)
        };

        self.encodings.insert(compressed_page.encoding());
        self.encoding_stats.push(PageEncodingStats {
            page_type: PageType::DICTIONARY_PAGE,
            encoding: compressed_page.encoding(),
            count: 1,
        });
        let page_spec = self.page_writer.write_page(compressed_page)?;
        self.update_metrics_for_page(page_spec);
        // For the dictionary page, we don't need to update the column/offset index.
1303        Ok(())
1304    }
1305
1306    /// Updates column writer metrics with each page metadata.
1307    #[inline]
1308    fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1309        self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1310        self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1311        self.column_metrics.total_bytes_written += page_spec.bytes_written;
1312
1313        match page_spec.page_type {
1314            PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1315                self.column_metrics.total_num_values += page_spec.num_values as u64;
1316                if self.column_metrics.data_page_offset.is_none() {
1317                    self.column_metrics.data_page_offset = Some(page_spec.offset);
1318                }
1319            }
1320            PageType::DICTIONARY_PAGE => {
1321                assert!(
1322                    self.column_metrics.dictionary_page_offset.is_none(),
1323                    "Dictionary offset is already set"
1324                );
1325                self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1326            }
1327            _ => {}
1328        }
1329    }
1330
1331    #[inline]
1332    #[cfg(feature = "encryption")]
1333    fn set_column_chunk_encryption_properties(
1334        &self,
1335        builder: ColumnChunkMetaDataBuilder,
1336    ) -> ColumnChunkMetaDataBuilder {
1337        if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() {
1338            builder.set_column_crypto_metadata(get_column_crypto_metadata(
1339                encryption_properties,
1340                &self.descr,
1341            ))
1342        } else {
1343            builder
1344        }
1345    }
1346
1347    #[inline]
1348    #[cfg(not(feature = "encryption"))]
1349    fn set_column_chunk_encryption_properties(
1350        &self,
1351        builder: ColumnChunkMetaDataBuilder,
1352    ) -> ColumnChunkMetaDataBuilder {
1353        builder
1354    }
1355}
1356
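/// Updates the running minimum `min` with `val`, using the column's
/// logical-type-aware ordering (see `compare_greater`) and skipping NaNs.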
1357fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
1358    update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
1359}
1360
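/// Updates the running maximum `max` with `val`, using the column's
/// logical-type-aware ordering (see `compare_greater`) and skipping NaNs.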
1361fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
1362    update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
1363}
1364
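/// Returns `true` if `val` is NaN for the column's type: a native NaN for
/// FLOAT/DOUBLE (detected with the IEEE `val != val` trick, hence the
/// `eq_op` allow below), or an `f16` NaN for FIXED_LEN_BYTE_ARRAY columns
/// with the `Float16` logical type. All other types never contain NaNs.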
1365#[inline]
1366#[allow(clippy::eq_op)]
1367fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
1368    match T::PHYSICAL_TYPE {
1369        Type::FLOAT | Type::DOUBLE => val != val,
1370        Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type() == Some(LogicalType::Float16) => {
1371            let val = val.as_bytes();
1372            let val = f16::from_le_bytes([val[0], val[1]]);
1373            val.is_nan()
1374        }
1375        _ => false,
1376    }
1377}
1378
1379/// Perform a conditional update of `cur`, skipping any NaN values
1380///
1381/// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with
1382/// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true`
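///
/// Illustrative: folding `update_max` over the values `[1.0, f32::NAN, 3.0]`
/// leaves the maximum at `Some(3.0)`, since the NaN is skipped rather than
/// replacing the running value.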
1383fn update_stat<T: ParquetValueType, F>(
1384    descr: &ColumnDescriptor,
1385    val: &T,
1386    cur: &mut Option<T>,
1387    should_update: F,
1388) where
1389    F: Fn(&T) -> bool,
1390{
1391    if is_nan(descr, val) {
1392        return;
1393    }
1394
1395    if cur.as_ref().map_or(true, should_update) {
1396        *cur = Some(val.clone());
1397    }
1398}
1399
1400/// Evaluate `a > b` according to underlying logical type.
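///
/// An illustrative summary of the ordering rules implemented below:
/// - unsigned `Integer` logical types and `UINT_*` converted types compare by
///   their unsigned reinterpretation, so the bit pattern of `u32::MAX`
///   (stored as `-1_i32`) compares greater than `1`;
/// - decimals backed by (FIXED_LEN_)BYTE_ARRAY compare as signed big-endian
///   integers (see `compare_greater_byte_array_decimals`);
/// - `Float16` values compare by their `f16` value rather than by raw bytes.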
1401fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
1402    if let Some(LogicalType::Integer { is_signed, .. }) = descr.logical_type() {
1403        if !is_signed {
1404            // need to compare unsigned
1405            return a.as_u64().unwrap() > b.as_u64().unwrap();
1406        }
1407    }
1408
1409    match descr.converted_type() {
1410        ConvertedType::UINT_8
1411        | ConvertedType::UINT_16
1412        | ConvertedType::UINT_32
1413        | ConvertedType::UINT_64 => {
1414            return a.as_u64().unwrap() > b.as_u64().unwrap();
1415        }
1416        _ => {}
1417    };
1418
1419    if let Some(LogicalType::Decimal { .. }) = descr.logical_type() {
1420        match T::PHYSICAL_TYPE {
1421            Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1422                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1423            }
1424            _ => {}
1425        };
1426    }
1427
1428    if descr.converted_type() == ConvertedType::DECIMAL {
1429        match T::PHYSICAL_TYPE {
1430            Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1431                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1432            }
1433            _ => {}
1434        };
1435    };
1436
1437    if let Some(LogicalType::Float16) = descr.logical_type() {
1438        let a = a.as_bytes();
1439        let a = f16::from_le_bytes([a[0], a[1]]);
1440        let b = b.as_bytes();
1441        let b = f16::from_le_bytes([b[0], b[1]]);
1442        return a > b;
1443    }
1444
1445    a > b
1446}
1447
1448// ----------------------------------------------------------------------
1449// Encoding support for column writer.
1450// This mirrors parquet-mr default encodings for writes. See:
1451// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java
1452// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java
1453
1454/// Returns the fallback encoding for a column when no encoding is explicitly set in the writer properties.
1455fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1456    match (kind, props.writer_version()) {
1457        (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1458        (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1459        (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1460        (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1461        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1462        _ => Encoding::PLAIN,
1463    }
1464}
1465
1466/// Returns `true` if dictionary encoding is supported for the column writer, `false` otherwise.
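///
/// For example, a BOOLEAN column never uses dictionary encoding, while a
/// FIXED_LEN_BYTE_ARRAY column uses it only under `WriterVersion::PARQUET_2_0`.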
1467fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1468    match (kind, props.writer_version()) {
1469        // Booleans do not support dict encoding and should use a fallback encoding.
1470        (Type::BOOLEAN, _) => false,
1471        // Dictionary encoding of FIXED_LEN_BYTE_ARRAY was only enabled in PARQUET 2.0
1472        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1473        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1474        _ => true,
1475    }
1476}
1477
1478/// Signed comparison of byte arrays, interpreted as big-endian two's-complement integers.
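///
/// Illustrative examples (`true` means `a > b`):
///
/// ```text
/// compare_greater_byte_array_decimals(&[0x00, 0x02], &[0x00, 0x01]) // true:   2 >  1
/// compare_greater_byte_array_decimals(&[0xFF, 0xFE], &[0xFF, 0xFD]) // true:  -2 > -3
/// compare_greater_byte_array_decimals(&[0x01, 0x00], &[0xFF])       // true: 256 > -1
/// ```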
1479fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
1480    let a_length = a.len();
1481    let b_length = b.len();
1482
1483    if a_length == 0 || b_length == 0 {
1484        return a_length > 0;
1485    }
1486
1487    let first_a: u8 = a[0];
1488    let first_b: u8 = b[0];
1489
1490    // We can short circuit for different signed numbers or
1491    // for equal length bytes arrays that have different first bytes.
1492    // The equality requirement is necessary for sign extension cases.
1493    // e.g. 0xFF90 should be equal to 0x90 (due to big-endian sign extension).
1494    if (0x80 & first_a) != (0x80 & first_b) || (a_length == b_length && first_a != first_b) {
1495        return (first_a as i8) > (first_b as i8);
1496    }
1497
1498    // When the lengths are unequal and the numbers are of the same
1499    // sign we need to do comparison by sign extending the shorter
1500    // value first, and once we get to equal sized arrays, lexicographical
1501    // unsigned comparison of everything but the first byte is sufficient.
1502
1503    let extension: u8 = if (first_a as i8) < 0 { 0xFF } else { 0 };
1504
1505    if a_length != b_length {
1506        let not_equal = if a_length > b_length {
1507            let lead_length = a_length - b_length;
1508            a[0..lead_length].iter().any(|&x| x != extension)
1509        } else {
1510            let lead_length = b_length - a_length;
1511            b[0..lead_length].iter().any(|&x| x != extension)
1512        };
1513
1514        if not_equal {
1515            let negative_values: bool = (first_a as i8) < 0;
1516            let a_longer: bool = a_length > b_length;
1517            return if negative_values { !a_longer } else { a_longer };
1518        }
1519    }
1520
1521    (a[1..]) > (b[1..])
1522}
1523
1524/// Truncate a UTF-8 slice to the longest prefix that is still a valid UTF-8 string,
1525/// while being at most `length` bytes and non-empty. Returns `None` if truncation
1526/// is not possible within those constraints.
1527///
1528/// The caller guarantees that data.len() > length.
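///
/// Illustrative (shown as text since this private function is not doctested):
///
/// ```text
/// truncate_utf8("hello", 3) == Some(b"hel".to_vec())
/// truncate_utf8("é", 1)     == None  // the 2-byte character cannot be split
/// ```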
1529fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1530    let split = (1..=length).rfind(|x| data.is_char_boundary(*x))?;
1531    Some(data.as_bytes()[..split].to_vec())
1532}
1533
1534/// Truncate a UTF-8 slice and increment its final character. The returned value is the
1535/// longest such slice that is still a valid UTF-8 string while being at most `length`
1536/// bytes and non-empty. Returns `None` if no such transformation is possible.
1537///
1538/// The caller guarantees that data.len() > length.
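///
/// Illustrative:
///
/// ```text
/// truncate_and_increment_utf8("hello", 3) == Some(b"hem".to_vec())
/// ```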
1539fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1540    // A UTF-8 code point is at most 4 bytes, so start the search 3 bytes back from the desired length
1541    let lower_bound = length.saturating_sub(3);
1542    let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
1543    increment_utf8(data.get(..split)?)
1544}
1545
1546/// Increment the final character in a UTF-8 string in such a way that the returned result
1547/// is still a valid UTF-8 string. The returned string may be shorter than the input if the
1548/// last character(s) cannot be incremented (due to overflow or producing invalid code points).
1549/// Returns `None` if the string cannot be incremented.
1550///
1551/// Note that this implementation will not promote an N-byte code point to (N+1) bytes.
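///
/// Illustrative:
///
/// ```text
/// increment_utf8("hello")  == Some(b"hellp".to_vec())
/// increment_utf8("aÿ")     == Some("aĀ".as_bytes().to_vec()) // U+00FF -> U+0100, both 2 bytes
/// increment_utf8("\u{7F}") == None // U+0080 would widen from 1 byte to 2
/// ```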
1552fn increment_utf8(data: &str) -> Option<Vec<u8>> {
1553    for (idx, original_char) in data.char_indices().rev() {
1554        let original_len = original_char.len_utf8();
1555        if let Some(next_char) = char::from_u32(original_char as u32 + 1) {
1556            // do not allow increasing byte width of incremented char
1557            if next_char.len_utf8() == original_len {
1558                let mut result = data.as_bytes()[..idx + original_len].to_vec();
1559                next_char.encode_utf8(&mut result[idx..]);
1560                return Some(result);
1561            }
1562        }
1563    }
1564
1565    None
1566}
1567
1568/// Try and increment the bytes from right to left.
1569///
1570/// Returns `None` if all bytes are set to `u8::MAX`.
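///
/// Illustrative:
///
/// ```text
/// increment(vec![0x00, 0xFF]) == Some(vec![0x01, 0x00])
/// increment(vec![0xFF, 0xFF]) == None // every byte overflows
/// ```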
1571fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
1572    for byte in data.iter_mut().rev() {
1573        let (incremented, overflow) = byte.overflowing_add(1);
1574        *byte = incremented;
1575
1576        if !overflow {
1577            return Some(data);
1578        }
1579    }
1580
1581    None
1582}
1583
1584#[cfg(test)]
1585mod tests {
1586    use crate::{
1587        file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1588        schema::parser::parse_message_type,
1589    };
1590    use core::str;
1591    use rand::distr::uniform::SampleUniform;
1592    use std::{fs::File, sync::Arc};
1593
1594    use crate::column::{
1595        page::PageReader,
1596        reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl},
1597    };
1598    use crate::file::writer::TrackedWrite;
1599    use crate::file::{
1600        properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1601    };
1602    use crate::schema::types::{ColumnPath, Type as SchemaType};
1603    use crate::util::test_common::rand_gen::random_numbers_range;
1604
1605    use super::*;
1606
1607    #[test]
1608    fn test_column_writer_inconsistent_def_rep_length() {
1609        let page_writer = get_test_page_writer();
1610        let props = Default::default();
1611        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
1612        let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
1613        assert!(res.is_err());
1614        if let Err(err) = res {
1615            assert_eq!(
1616                format!("{err}"),
1617                "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
1618            );
1619        }
1620    }
1621
1622    #[test]
1623    fn test_column_writer_invalid_def_levels() {
1624        let page_writer = get_test_page_writer();
1625        let props = Default::default();
1626        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1627        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1628        assert!(res.is_err());
1629        if let Err(err) = res {
1630            assert_eq!(
1631                format!("{err}"),
1632                "Parquet error: Definition levels are required, because max definition level = 1"
1633            );
1634        }
1635    }
1636
1637    #[test]
1638    fn test_column_writer_invalid_rep_levels() {
1639        let page_writer = get_test_page_writer();
1640        let props = Default::default();
1641        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
1642        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1643        assert!(res.is_err());
1644        if let Err(err) = res {
1645            assert_eq!(
1646                format!("{err}"),
1647                "Parquet error: Repetition levels are required, because max repetition level = 1"
1648            );
1649        }
1650    }
1651
1652    #[test]
1653    fn test_column_writer_not_enough_values_to_write() {
1654        let page_writer = get_test_page_writer();
1655        let props = Default::default();
1656        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1657        let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
1658        assert!(res.is_err());
1659        if let Err(err) = res {
1660            assert_eq!(
1661                format!("{err}"),
1662                "Parquet error: Expected to write 4 values, but have only 2"
1663            );
1664        }
1665    }
1666
1667    #[test]
1668    fn test_column_writer_write_only_one_dictionary_page() {
1669        let page_writer = get_test_page_writer();
1670        let props = Default::default();
1671        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1672        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1673        // First page should be correctly written.
1674        writer.add_data_page().unwrap();
1675        writer.write_dictionary_page().unwrap();
1676        let err = writer.write_dictionary_page().unwrap_err().to_string();
1677        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1678    }
1679
1680    #[test]
1681    fn test_column_writer_error_when_writing_disabled_dictionary() {
1682        let page_writer = get_test_page_writer();
1683        let props = Arc::new(
1684            WriterProperties::builder()
1685                .set_dictionary_enabled(false)
1686                .build(),
1687        );
1688        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1689        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1690        let err = writer.write_dictionary_page().unwrap_err().to_string();
1691        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1692    }
1693
1694    #[test]
1695    fn test_column_writer_boolean_type_does_not_support_dictionary() {
1696        let page_writer = get_test_page_writer();
1697        let props = Arc::new(
1698            WriterProperties::builder()
1699                .set_dictionary_enabled(true)
1700                .build(),
1701        );
1702        let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
1703        writer
1704            .write_batch(&[true, false, true, false], None, None)
1705            .unwrap();
1706
1707        let r = writer.close().unwrap();
1708        // PlainEncoder uses bit writer to write boolean values, which all fit into 1
1709        // byte.
1710        assert_eq!(r.bytes_written, 1);
1711        assert_eq!(r.rows_written, 4);
1712
1713        let metadata = r.metadata;
1714        assert_eq!(metadata.encodings(), &vec![Encoding::PLAIN, Encoding::RLE]);
1715        assert_eq!(metadata.num_values(), 4); // just values
1716        assert_eq!(metadata.dictionary_page_offset(), None);
1717    }
1718
1719    #[test]
1720    fn test_column_writer_default_encoding_support_bool() {
1721        check_encoding_write_support::<BoolType>(
1722            WriterVersion::PARQUET_1_0,
1723            true,
1724            &[true, false],
1725            None,
1726            &[Encoding::PLAIN, Encoding::RLE],
1727            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1728        );
1729        check_encoding_write_support::<BoolType>(
1730            WriterVersion::PARQUET_1_0,
1731            false,
1732            &[true, false],
1733            None,
1734            &[Encoding::PLAIN, Encoding::RLE],
1735            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1736        );
1737        check_encoding_write_support::<BoolType>(
1738            WriterVersion::PARQUET_2_0,
1739            true,
1740            &[true, false],
1741            None,
1742            &[Encoding::RLE],
1743            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
1744        );
1745        check_encoding_write_support::<BoolType>(
1746            WriterVersion::PARQUET_2_0,
1747            false,
1748            &[true, false],
1749            None,
1750            &[Encoding::RLE],
1751            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
1752        );
1753    }
1754
1755    #[test]
1756    fn test_column_writer_default_encoding_support_int32() {
1757        check_encoding_write_support::<Int32Type>(
1758            WriterVersion::PARQUET_1_0,
1759            true,
1760            &[1, 2],
1761            Some(0),
1762            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1763            &[
1764                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1765                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1766            ],
1767        );
1768        check_encoding_write_support::<Int32Type>(
1769            WriterVersion::PARQUET_1_0,
1770            false,
1771            &[1, 2],
1772            None,
1773            &[Encoding::PLAIN, Encoding::RLE],
1774            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1775        );
1776        check_encoding_write_support::<Int32Type>(
1777            WriterVersion::PARQUET_2_0,
1778            true,
1779            &[1, 2],
1780            Some(0),
1781            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1782            &[
1783                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1784                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1785            ],
1786        );
1787        check_encoding_write_support::<Int32Type>(
1788            WriterVersion::PARQUET_2_0,
1789            false,
1790            &[1, 2],
1791            None,
1792            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1793            &[encoding_stats(
1794                PageType::DATA_PAGE_V2,
1795                Encoding::DELTA_BINARY_PACKED,
1796                1,
1797            )],
1798        );
1799    }
1800
1801    #[test]
1802    fn test_column_writer_default_encoding_support_int64() {
1803        check_encoding_write_support::<Int64Type>(
1804            WriterVersion::PARQUET_1_0,
1805            true,
1806            &[1, 2],
1807            Some(0),
1808            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1809            &[
1810                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1811                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1812            ],
1813        );
1814        check_encoding_write_support::<Int64Type>(
1815            WriterVersion::PARQUET_1_0,
1816            false,
1817            &[1, 2],
1818            None,
1819            &[Encoding::PLAIN, Encoding::RLE],
1820            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1821        );
1822        check_encoding_write_support::<Int64Type>(
1823            WriterVersion::PARQUET_2_0,
1824            true,
1825            &[1, 2],
1826            Some(0),
1827            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1828            &[
1829                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1830                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1831            ],
1832        );
1833        check_encoding_write_support::<Int64Type>(
1834            WriterVersion::PARQUET_2_0,
1835            false,
1836            &[1, 2],
1837            None,
1838            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1839            &[encoding_stats(
1840                PageType::DATA_PAGE_V2,
1841                Encoding::DELTA_BINARY_PACKED,
1842                1,
1843            )],
1844        );
1845    }
1846
1847    #[test]
1848    fn test_column_writer_default_encoding_support_int96() {
1849        check_encoding_write_support::<Int96Type>(
1850            WriterVersion::PARQUET_1_0,
1851            true,
1852            &[Int96::from(vec![1, 2, 3])],
1853            Some(0),
1854            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1855            &[
1856                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1857                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1858            ],
1859        );
1860        check_encoding_write_support::<Int96Type>(
1861            WriterVersion::PARQUET_1_0,
1862            false,
1863            &[Int96::from(vec![1, 2, 3])],
1864            None,
1865            &[Encoding::PLAIN, Encoding::RLE],
1866            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1867        );
1868        check_encoding_write_support::<Int96Type>(
1869            WriterVersion::PARQUET_2_0,
1870            true,
1871            &[Int96::from(vec![1, 2, 3])],
1872            Some(0),
1873            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1874            &[
1875                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1876                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1877            ],
1878        );
1879        check_encoding_write_support::<Int96Type>(
1880            WriterVersion::PARQUET_2_0,
1881            false,
1882            &[Int96::from(vec![1, 2, 3])],
1883            None,
1884            &[Encoding::PLAIN, Encoding::RLE],
1885            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1886        );
1887    }
1888
1889    #[test]
1890    fn test_column_writer_default_encoding_support_float() {
1891        check_encoding_write_support::<FloatType>(
1892            WriterVersion::PARQUET_1_0,
1893            true,
1894            &[1.0, 2.0],
1895            Some(0),
1896            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1897            &[
1898                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1899                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1900            ],
1901        );
1902        check_encoding_write_support::<FloatType>(
1903            WriterVersion::PARQUET_1_0,
1904            false,
1905            &[1.0, 2.0],
1906            None,
1907            &[Encoding::PLAIN, Encoding::RLE],
1908            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1909        );
1910        check_encoding_write_support::<FloatType>(
1911            WriterVersion::PARQUET_2_0,
1912            true,
1913            &[1.0, 2.0],
1914            Some(0),
1915            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1916            &[
1917                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1918                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1919            ],
1920        );
1921        check_encoding_write_support::<FloatType>(
1922            WriterVersion::PARQUET_2_0,
1923            false,
1924            &[1.0, 2.0],
1925            None,
1926            &[Encoding::PLAIN, Encoding::RLE],
1927            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1928        );
1929    }
1930
1931    #[test]
1932    fn test_column_writer_default_encoding_support_double() {
1933        check_encoding_write_support::<DoubleType>(
1934            WriterVersion::PARQUET_1_0,
1935            true,
1936            &[1.0, 2.0],
1937            Some(0),
1938            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1939            &[
1940                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1941                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1942            ],
1943        );
1944        check_encoding_write_support::<DoubleType>(
1945            WriterVersion::PARQUET_1_0,
1946            false,
1947            &[1.0, 2.0],
1948            None,
1949            &[Encoding::PLAIN, Encoding::RLE],
1950            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1951        );
1952        check_encoding_write_support::<DoubleType>(
1953            WriterVersion::PARQUET_2_0,
1954            true,
1955            &[1.0, 2.0],
1956            Some(0),
1957            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1958            &[
1959                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1960                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1961            ],
1962        );
1963        check_encoding_write_support::<DoubleType>(
1964            WriterVersion::PARQUET_2_0,
1965            false,
1966            &[1.0, 2.0],
1967            None,
1968            &[Encoding::PLAIN, Encoding::RLE],
1969            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1970        );
1971    }
1972
1973    #[test]
1974    fn test_column_writer_default_encoding_support_byte_array() {
1975        check_encoding_write_support::<ByteArrayType>(
1976            WriterVersion::PARQUET_1_0,
1977            true,
1978            &[ByteArray::from(vec![1u8])],
1979            Some(0),
1980            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1981            &[
1982                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1983                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1984            ],
1985        );
1986        check_encoding_write_support::<ByteArrayType>(
1987            WriterVersion::PARQUET_1_0,
1988            false,
1989            &[ByteArray::from(vec![1u8])],
1990            None,
1991            &[Encoding::PLAIN, Encoding::RLE],
1992            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1993        );
1994        check_encoding_write_support::<ByteArrayType>(
1995            WriterVersion::PARQUET_2_0,
1996            true,
1997            &[ByteArray::from(vec![1u8])],
1998            Some(0),
1999            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2000            &[
2001                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2002                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2003            ],
2004        );
2005        check_encoding_write_support::<ByteArrayType>(
2006            WriterVersion::PARQUET_2_0,
2007            false,
2008            &[ByteArray::from(vec![1u8])],
2009            None,
2010            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
2011            &[encoding_stats(
2012                PageType::DATA_PAGE_V2,
2013                Encoding::DELTA_BYTE_ARRAY,
2014                1,
2015            )],
2016        );
2017    }
2018
2019    #[test]
2020    fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
2021        check_encoding_write_support::<FixedLenByteArrayType>(
2022            WriterVersion::PARQUET_1_0,
2023            true,
2024            &[ByteArray::from(vec![1u8]).into()],
2025            None,
2026            &[Encoding::PLAIN, Encoding::RLE],
2027            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2028        );
2029        check_encoding_write_support::<FixedLenByteArrayType>(
2030            WriterVersion::PARQUET_1_0,
2031            false,
2032            &[ByteArray::from(vec![1u8]).into()],
2033            None,
2034            &[Encoding::PLAIN, Encoding::RLE],
2035            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2036        );
2037        check_encoding_write_support::<FixedLenByteArrayType>(
2038            WriterVersion::PARQUET_2_0,
2039            true,
2040            &[ByteArray::from(vec![1u8]).into()],
2041            Some(0),
2042            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2043            &[
2044                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2045                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2046            ],
2047        );
2048        check_encoding_write_support::<FixedLenByteArrayType>(
2049            WriterVersion::PARQUET_2_0,
2050            false,
2051            &[ByteArray::from(vec![1u8]).into()],
2052            None,
2053            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
2054            &[encoding_stats(
2055                PageType::DATA_PAGE_V2,
2056                Encoding::DELTA_BYTE_ARRAY,
2057                1,
2058            )],
2059        );
2060    }
2061
2062    #[test]
2063    fn test_column_writer_check_metadata() {
2064        let page_writer = get_test_page_writer();
2065        let props = Default::default();
2066        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2067        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2068
2069        let r = writer.close().unwrap();
2070        assert_eq!(r.bytes_written, 20);
2071        assert_eq!(r.rows_written, 4);
2072
2073        let metadata = r.metadata;
2074        assert_eq!(
2075            metadata.encodings(),
2076            &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2077        );
2078        assert_eq!(metadata.num_values(), 4);
2079        assert_eq!(metadata.compressed_size(), 20);
2080        assert_eq!(metadata.uncompressed_size(), 20);
2081        assert_eq!(metadata.data_page_offset(), 0);
2082        assert_eq!(metadata.dictionary_page_offset(), Some(0));
2083        if let Some(stats) = metadata.statistics() {
2084            assert_eq!(stats.null_count_opt(), Some(0));
2085            assert_eq!(stats.distinct_count_opt(), None);
2086            if let Statistics::Int32(stats) = stats {
2087                assert_eq!(stats.min_opt().unwrap(), &1);
2088                assert_eq!(stats.max_opt().unwrap(), &4);
2089            } else {
2090                panic!("expecting Statistics::Int32");
2091            }
2092        } else {
2093            panic!("metadata missing statistics");
2094        }
2095    }
2096
2097    #[test]
2098    fn test_column_writer_check_byte_array_min_max() {
2099        let page_writer = get_test_page_writer();
2100        let props = Default::default();
2101        let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
2102        writer
2103            .write_batch(
2104                &[
2105                    ByteArray::from(vec![
2106                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2107                        35u8, 231u8, 90u8, 0u8, 0u8,
2108                    ]),
2109                    ByteArray::from(vec![
2110                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
2111                        152u8, 177u8, 56u8, 0u8, 0u8,
2112                    ]),
2113                    ByteArray::from(vec![
2114                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
2115                        0u8,
2116                    ]),
2117                    ByteArray::from(vec![
2118                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2119                        44u8, 0u8, 0u8,
2120                    ]),
2121                ],
2122                None,
2123                None,
2124            )
2125            .unwrap();
2126        let metadata = writer.close().unwrap().metadata;
2127        if let Some(stats) = metadata.statistics() {
2128            if let Statistics::ByteArray(stats) = stats {
2129                assert_eq!(
2130                    stats.min_opt().unwrap(),
2131                    &ByteArray::from(vec![
2132                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2133                        35u8, 231u8, 90u8, 0u8, 0u8,
2134                    ])
2135                );
2136                assert_eq!(
2137                    stats.max_opt().unwrap(),
2138                    &ByteArray::from(vec![
2139                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2140                        44u8, 0u8, 0u8,
2141                    ])
2142                );
2143            } else {
2144                panic!("expecting Statistics::ByteArray");
2145            }
2146        } else {
2147            panic!("metadata missing statistics");
2148        }
2149    }
2150
2151    #[test]
2152    fn test_column_writer_uint32_converted_type_min_max() {
2153        let page_writer = get_test_page_writer();
2154        let props = Default::default();
2155        let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
2156            page_writer,
2157            0,
2158            0,
2159            props,
2160        );
2161        writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
2162        let metadata = writer.close().unwrap().metadata;
2163        if let Some(stats) = metadata.statistics() {
2164            if let Statistics::Int32(stats) = stats {
2165                assert_eq!(stats.min_opt().unwrap(), &0);
2166                assert_eq!(stats.max_opt().unwrap(), &5);
2167            } else {
2168                panic!("expecting Statistics::Int32");
2169            }
2170        } else {
2171            panic!("metadata missing statistics");
2172        }
2173    }
2174
2175    #[test]
2176    fn test_column_writer_precalculated_statistics() {
2177        let page_writer = get_test_page_writer();
2178        let props = Arc::new(
2179            WriterProperties::builder()
2180                .set_statistics_enabled(EnabledStatistics::Chunk)
2181                .build(),
2182        );
2183        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2184        writer
2185            .write_batch_with_statistics(
2186                &[1, 2, 3, 4],
2187                None,
2188                None,
2189                Some(&-17),
2190                Some(&9000),
2191                Some(55),
2192            )
2193            .unwrap();
2194
2195        let r = writer.close().unwrap();
2196        assert_eq!(r.bytes_written, 20);
2197        assert_eq!(r.rows_written, 4);
2198
2199        let metadata = r.metadata;
2200        assert_eq!(
2201            metadata.encodings(),
2202            &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2203        );
2204        assert_eq!(metadata.num_values(), 4);
2205        assert_eq!(metadata.compressed_size(), 20);
2206        assert_eq!(metadata.uncompressed_size(), 20);
2207        assert_eq!(metadata.data_page_offset(), 0);
2208        assert_eq!(metadata.dictionary_page_offset(), Some(0));
2209        if let Some(stats) = metadata.statistics() {
2210            assert_eq!(stats.null_count_opt(), Some(0));
2211            assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
2212            if let Statistics::Int32(stats) = stats {
2213                assert_eq!(stats.min_opt().unwrap(), &-17);
2214                assert_eq!(stats.max_opt().unwrap(), &9000);
2215            } else {
2216                panic!("expecting Statistics::Int32");
2217            }
2218        } else {
2219            panic!("metadata missing statistics");
2220        }
2221    }
2222
2223    #[test]
2224    fn test_mixed_precomputed_statistics() {
2225        let mut buf = Vec::with_capacity(100);
2226        let mut write = TrackedWrite::new(&mut buf);
2227        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2228        let props = Arc::new(
2229            WriterProperties::builder()
2230                .set_write_page_header_statistics(true)
2231                .build(),
2232        );
2233        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2234
2235        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2236        writer
2237            .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
2238            .unwrap();
2239
2240        let r = writer.close().unwrap();
2241
2242        let stats = r.metadata.statistics().unwrap();
2243        assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
2244        assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
2245        assert_eq!(stats.null_count_opt(), Some(0));
2246        assert!(stats.distinct_count_opt().is_none());
2247
2248        drop(write);
2249
2250        let props = ReaderProperties::builder()
2251            .set_backward_compatible_lz4(false)
2252            .build();
2253        let reader = SerializedPageReader::new_with_properties(
2254            Arc::new(Bytes::from(buf)),
2255            &r.metadata,
2256            r.rows_written as usize,
2257            None,
2258            Arc::new(props),
2259        )
2260        .unwrap();
2261
2262        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2263        assert_eq!(pages.len(), 2);
2264
2265        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2266        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);
2267
2268        let page_statistics = pages[1].statistics().unwrap();
2269        assert_eq!(
2270            page_statistics.min_bytes_opt().unwrap(),
2271            1_i32.to_le_bytes()
2272        );
2273        assert_eq!(
2274            page_statistics.max_bytes_opt().unwrap(),
2275            7_i32.to_le_bytes()
2276        );
2277        assert_eq!(page_statistics.null_count_opt(), Some(0));
2278        assert!(page_statistics.distinct_count_opt().is_none());
2279    }
2280
2281    #[test]
2282    fn test_disabled_statistics() {
2283        let mut buf = Vec::with_capacity(100);
2284        let mut write = TrackedWrite::new(&mut buf);
2285        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2286        let props = WriterProperties::builder()
2287            .set_statistics_enabled(EnabledStatistics::None)
2288            .set_writer_version(WriterVersion::PARQUET_2_0)
2289            .build();
2290        let props = Arc::new(props);
2291
2292        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2293        writer
2294            .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
2295            .unwrap();
2296
2297        let r = writer.close().unwrap();
2298        assert!(r.metadata.statistics().is_none());
2299
2300        drop(write);
2301
2302        let props = ReaderProperties::builder()
2303            .set_backward_compatible_lz4(false)
2304            .build();
2305        let reader = SerializedPageReader::new_with_properties(
2306            Arc::new(Bytes::from(buf)),
2307            &r.metadata,
2308            r.rows_written as usize,
2309            None,
2310            Arc::new(props),
2311        )
2312        .unwrap();
2313
2314        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2315        assert_eq!(pages.len(), 2);
2316
2317        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2318        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);
2319
2320        match &pages[1] {
2321            Page::DataPageV2 {
2322                num_values,
2323                num_nulls,
2324                num_rows,
2325                statistics,
2326                ..
2327            } => {
2328                assert_eq!(*num_values, 6);
2329                assert_eq!(*num_nulls, 2);
2330                assert_eq!(*num_rows, 6);
2331                assert!(statistics.is_none());
2332            }
2333            _ => unreachable!(),
2334        }
2335    }
2336
2337    #[test]
2338    fn test_column_writer_empty_column_roundtrip() {
2339        let props = Default::default();
2340        column_roundtrip::<Int32Type>(props, &[], None, None);
2341    }
2342
2343    #[test]
2344    fn test_column_writer_non_nullable_values_roundtrip() {
2345        let props = Default::default();
2346        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2347    }
2348
2349    #[test]
2350    fn test_column_writer_nullable_non_repeated_values_roundtrip() {
2351        let props = Default::default();
2352        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
2353    }
2354
2355    #[test]
2356    fn test_column_writer_nullable_repeated_values_roundtrip() {
2357        let props = Default::default();
2358        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2359    }
2360
2361    #[test]
2362    fn test_column_writer_dictionary_fallback_small_data_page() {
2363        let props = WriterProperties::builder()
2364            .set_dictionary_page_size_limit(32)
2365            .set_data_page_size_limit(32)
2366            .build();
2367        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2368    }
2369
2370    #[test]
2371    fn test_column_writer_small_write_batch_size() {
2372        for i in &[1usize, 2, 5, 10, 11, 1023] {
2373            let props = WriterProperties::builder().set_write_batch_size(*i).build();
2374
2375            column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2376        }
2377    }
2378
2379    #[test]
2380    fn test_column_writer_dictionary_disabled_v1() {
2381        let props = WriterProperties::builder()
2382            .set_writer_version(WriterVersion::PARQUET_1_0)
2383            .set_dictionary_enabled(false)
2384            .build();
2385        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2386    }
2387
2388    #[test]
2389    fn test_column_writer_dictionary_disabled_v2() {
2390        let props = WriterProperties::builder()
2391            .set_writer_version(WriterVersion::PARQUET_2_0)
2392            .set_dictionary_enabled(false)
2393            .build();
2394        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2395    }
2396
2397    #[test]
2398    fn test_column_writer_compression_v1() {
2399        let props = WriterProperties::builder()
2400            .set_writer_version(WriterVersion::PARQUET_1_0)
2401            .set_compression(Compression::SNAPPY)
2402            .build();
2403        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2404    }
2405
2406    #[test]
2407    fn test_column_writer_compression_v2() {
2408        let props = WriterProperties::builder()
2409            .set_writer_version(WriterVersion::PARQUET_2_0)
2410            .set_compression(Compression::SNAPPY)
2411            .build();
2412        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2413    }
2414
2415    #[test]
2416    fn test_column_writer_add_data_pages_with_dict() {
2417        // ARROW-5129: verifies that a data page is added when dictionary encoding
2418        // is in use and no fallback has occurred so far.
2419        let mut file = tempfile::tempfile().unwrap();
2420        let mut write = TrackedWrite::new(&mut file);
2421        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2422        let props = Arc::new(
2423            WriterProperties::builder()
2424                .set_data_page_size_limit(10)
2425                .set_write_batch_size(3) // write 3 values at a time
2426                .build(),
2427        );
2428        let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
2429        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2430        writer.write_batch(data, None, None).unwrap();
2431        let r = writer.close().unwrap();
2432
2433        drop(write);
2434
2435        // Read pages and check the sequence
2436        let props = ReaderProperties::builder()
2437            .set_backward_compatible_lz4(false)
2438            .build();
2439        let mut page_reader = Box::new(
2440            SerializedPageReader::new_with_properties(
2441                Arc::new(file),
2442                &r.metadata,
2443                r.rows_written as usize,
2444                None,
2445                Arc::new(props),
2446            )
2447            .unwrap(),
2448        );
2449        let mut res = Vec::new();
2450        while let Some(page) = page_reader.get_next_page().unwrap() {
2451            res.push((page.page_type(), page.num_values(), page.buffer().len()));
2452        }
2453        assert_eq!(
2454            res,
2455            vec![
2456                (PageType::DICTIONARY_PAGE, 10, 40),
2457                (PageType::DATA_PAGE, 9, 10),
2458                (PageType::DATA_PAGE, 1, 3),
2459            ]
2460        );
2461        assert_eq!(
2462            r.metadata.page_encoding_stats(),
2463            Some(&vec![
2464                PageEncodingStats {
2465                    page_type: PageType::DICTIONARY_PAGE,
2466                    encoding: Encoding::PLAIN,
2467                    count: 1
2468                },
2469                PageEncodingStats {
2470                    page_type: PageType::DATA_PAGE,
2471                    encoding: Encoding::RLE_DICTIONARY,
2472                    count: 2,
2473                }
2474            ])
2475        );
2476    }
2477
2478    #[test]
2479    fn test_bool_statistics() {
2480        let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
2481        // Booleans have an unsigned sort order and so are not compatible
2482        // with the deprecated `min` and `max` statistics
2483        assert!(!stats.is_min_max_backwards_compatible());
2484        if let Statistics::Boolean(stats) = stats {
2485            assert_eq!(stats.min_opt().unwrap(), &false);
2486            assert_eq!(stats.max_opt().unwrap(), &true);
2487        } else {
2488            panic!("expecting Statistics::Boolean, got {stats:?}");
2489        }
2490    }
2491
2492    #[test]
2493    fn test_int32_statistics() {
2494        let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
2495        assert!(stats.is_min_max_backwards_compatible());
2496        if let Statistics::Int32(stats) = stats {
2497            assert_eq!(stats.min_opt().unwrap(), &-2);
2498            assert_eq!(stats.max_opt().unwrap(), &3);
2499        } else {
2500            panic!("expecting Statistics::Int32, got {stats:?}");
2501        }
2502    }
2503
2504    #[test]
2505    fn test_int64_statistics() {
2506        let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
2507        assert!(stats.is_min_max_backwards_compatible());
2508        if let Statistics::Int64(stats) = stats {
2509            assert_eq!(stats.min_opt().unwrap(), &-2);
2510            assert_eq!(stats.max_opt().unwrap(), &3);
2511        } else {
2512            panic!("expecting Statistics::Int64, got {stats:?}");
2513        }
2514    }
2515
2516    #[test]
2517    fn test_int96_statistics() {
2518        let input = vec![
2519            Int96::from(vec![1, 20, 30]),
2520            Int96::from(vec![3, 20, 10]),
2521            Int96::from(vec![0, 20, 30]),
2522            Int96::from(vec![2, 20, 30]),
2523        ];
2526
2527        let stats = statistics_roundtrip::<Int96Type>(&input);
2528        assert!(!stats.is_min_max_backwards_compatible());
2529        if let Statistics::Int96(stats) = stats {
2530            assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![0, 20, 30]));
2531            assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
2532        } else {
2533            panic!("expecting Statistics::Int96, got {stats:?}");
2534        }
2535    }
2536
2537    #[test]
2538    fn test_float_statistics() {
2539        let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
2540        assert!(stats.is_min_max_backwards_compatible());
2541        if let Statistics::Float(stats) = stats {
2542            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2543            assert_eq!(stats.max_opt().unwrap(), &3.0);
2544        } else {
2545            panic!("expecting Statistics::Float, got {stats:?}");
2546        }
2547    }
2548
2549    #[test]
2550    fn test_double_statistics() {
2551        let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
2552        assert!(stats.is_min_max_backwards_compatible());
2553        if let Statistics::Double(stats) = stats {
2554            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2555            assert_eq!(stats.max_opt().unwrap(), &3.0);
2556        } else {
2557            panic!("expecting Statistics::Double, got {stats:?}");
2558        }
2559    }
2560
2561    #[test]
2562    fn test_byte_array_statistics() {
2563        let input = ["aawaa", "zz", "aaw", "m", "qrs"]
2564            .iter()
2565            .map(|&s| s.into())
2566            .collect::<Vec<_>>();
2567
2568        let stats = statistics_roundtrip::<ByteArrayType>(&input);
2569        assert!(!stats.is_min_max_backwards_compatible());
2570        if let Statistics::ByteArray(stats) = stats {
2571            assert_eq!(stats.min_opt().unwrap(), &ByteArray::from("aaw"));
2572            assert_eq!(stats.max_opt().unwrap(), &ByteArray::from("zz"));
2573        } else {
2574            panic!("expecting Statistics::ByteArray, got {stats:?}");
2575        }
2576    }
2577
2578    #[test]
2579    fn test_fixed_len_byte_array_statistics() {
2580        let input = ["aawaa", "zz   ", "aaw  ", "m    ", "qrs  "]
2581            .iter()
2582            .map(|&s| ByteArray::from(s).into())
2583            .collect::<Vec<_>>();
2584
2585        let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
2586        assert!(!stats.is_min_max_backwards_compatible());
2587        if let Statistics::FixedLenByteArray(stats) = stats {
2588            let expected_min: FixedLenByteArray = ByteArray::from("aaw  ").into();
2589            assert_eq!(stats.min_opt().unwrap(), &expected_min);
2590            let expected_max: FixedLenByteArray = ByteArray::from("zz   ").into();
2591            assert_eq!(stats.max_opt().unwrap(), &expected_max);
2592        } else {
2593            panic!("expecting Statistics::FixedLenByteArray, got {stats:?}");
2594        }
2595    }
2596
2597    #[test]
2598    fn test_column_writer_check_float16_min_max() {
2599        let input = [
2600            -f16::ONE,
2601            f16::from_f32(3.0),
2602            -f16::from_f32(2.0),
2603            f16::from_f32(2.0),
2604        ]
2605        .into_iter()
2606        .map(|s| ByteArray::from(s).into())
2607        .collect::<Vec<_>>();
2608
2609        let stats = float16_statistics_roundtrip(&input);
2610        assert!(stats.is_min_max_backwards_compatible());
2611        assert_eq!(
2612            stats.min_opt().unwrap(),
2613            &ByteArray::from(-f16::from_f32(2.0))
2614        );
2615        assert_eq!(
2616            stats.max_opt().unwrap(),
2617            &ByteArray::from(f16::from_f32(3.0))
2618        );
2619    }
2620
2637    #[test]
2638    fn test_float16_statistics_nan_middle() {
2639        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2640            .into_iter()
2641            .map(|s| ByteArray::from(s).into())
2642            .collect::<Vec<_>>();
2643
2644        let stats = float16_statistics_roundtrip(&input);
2645        assert!(stats.is_min_max_backwards_compatible());
2646        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2647        assert_eq!(
2648            stats.max_opt().unwrap(),
2649            &ByteArray::from(f16::ONE + f16::ONE)
2650        );
2651    }
2652
2653    #[test]
2654    fn test_float16_statistics_nan_start() {
2655        let input = [f16::NAN, f16::ONE, f16::ONE + f16::ONE]
2656            .into_iter()
2657            .map(|s| ByteArray::from(s).into())
2658            .collect::<Vec<_>>();
2659
2660        let stats = float16_statistics_roundtrip(&input);
2661        assert!(stats.is_min_max_backwards_compatible());
2662        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2663        assert_eq!(
2664            stats.max_opt().unwrap(),
2665            &ByteArray::from(f16::ONE + f16::ONE)
2666        );
2667    }
2668
2669    #[test]
2670    fn test_float16_statistics_nan_only() {
2671        let input = [f16::NAN, f16::NAN]
2672            .into_iter()
2673            .map(|s| ByteArray::from(s).into())
2674            .collect::<Vec<_>>();
2675
2676        let stats = float16_statistics_roundtrip(&input);
2677        assert!(stats.min_bytes_opt().is_none());
2678        assert!(stats.max_bytes_opt().is_none());
2679        assert!(stats.is_min_max_backwards_compatible());
2680    }
2681
2682    #[test]
2683    fn test_float16_statistics_zero_only() {
2684        let input = [f16::ZERO]
2685            .into_iter()
2686            .map(|s| ByteArray::from(s).into())
2687            .collect::<Vec<_>>();
2688
2689        let stats = float16_statistics_roundtrip(&input);
2690        assert!(stats.is_min_max_backwards_compatible());
2691        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2692        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2693    }
2694
2695    #[test]
2696    fn test_float16_statistics_neg_zero_only() {
2697        let input = [f16::NEG_ZERO]
2698            .into_iter()
2699            .map(|s| ByteArray::from(s).into())
2700            .collect::<Vec<_>>();
2701
2702        let stats = float16_statistics_roundtrip(&input);
2703        assert!(stats.is_min_max_backwards_compatible());
2704        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2705        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2706    }
2707
2708    #[test]
2709    fn test_float16_statistics_zero_min() {
2710        let input = [f16::ZERO, f16::ONE, f16::NAN, f16::PI]
2711            .into_iter()
2712            .map(|s| ByteArray::from(s).into())
2713            .collect::<Vec<_>>();
2714
2715        let stats = float16_statistics_roundtrip(&input);
2716        assert!(stats.is_min_max_backwards_compatible());
2717        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2718        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI));
2719    }
2720
2721    #[test]
2722    fn test_float16_statistics_neg_zero_max() {
2723        let input = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI]
2724            .into_iter()
2725            .map(|s| ByteArray::from(s).into())
2726            .collect::<Vec<_>>();
2727
2728        let stats = float16_statistics_roundtrip(&input);
2729        assert!(stats.is_min_max_backwards_compatible());
2730        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI));
2731        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2732    }

    #[test]
    fn test_float_statistics_nan_middle() {
        let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Float(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &1.0);
            assert_eq!(stats.max_opt().unwrap(), &2.0);
        } else {
            panic!("expecting Statistics::Float");
        }
    }

    #[test]
    fn test_float_statistics_nan_start() {
        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Float(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &1.0);
            assert_eq!(stats.max_opt().unwrap(), &2.0);
        } else {
            panic!("expecting Statistics::Float");
        }
    }

    #[test]
    fn test_float_statistics_nan_only() {
        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]);
        assert!(stats.min_bytes_opt().is_none());
        assert!(stats.max_bytes_opt().is_none());
        assert!(stats.is_min_max_backwards_compatible());
        assert!(matches!(stats, Statistics::Float(_)));
    }

    #[test]
    fn test_float_statistics_zero_only() {
        let stats = statistics_roundtrip::<FloatType>(&[0.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Float(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &-0.0);
            assert!(stats.min_opt().unwrap().is_sign_negative());
            assert_eq!(stats.max_opt().unwrap(), &0.0);
            assert!(stats.max_opt().unwrap().is_sign_positive());
        } else {
            panic!("expecting Statistics::Float");
        }
    }

    #[test]
    fn test_float_statistics_neg_zero_only() {
        let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Float(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &-0.0);
            assert!(stats.min_opt().unwrap().is_sign_negative());
            assert_eq!(stats.max_opt().unwrap(), &0.0);
            assert!(stats.max_opt().unwrap().is_sign_positive());
        } else {
            panic!("expecting Statistics::Float");
        }
    }

    #[test]
    fn test_float_statistics_zero_min() {
        let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Float(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &-0.0);
            assert!(stats.min_opt().unwrap().is_sign_negative());
            assert_eq!(stats.max_opt().unwrap(), &2.0);
        } else {
            panic!("expecting Statistics::Float");
        }
    }

    #[test]
    fn test_float_statistics_neg_zero_max() {
        let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Float(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &-2.0);
            assert_eq!(stats.max_opt().unwrap(), &0.0);
            assert!(stats.max_opt().unwrap().is_sign_positive());
        } else {
            panic!("expecting Statistics::Float");
        }
    }

    #[test]
    fn test_double_statistics_nan_middle() {
        let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Double(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &1.0);
            assert_eq!(stats.max_opt().unwrap(), &2.0);
        } else {
            panic!("expecting Statistics::Double");
        }
    }

    #[test]
    fn test_double_statistics_nan_start() {
        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Double(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &1.0);
            assert_eq!(stats.max_opt().unwrap(), &2.0);
        } else {
            panic!("expecting Statistics::Double");
        }
    }

    #[test]
    fn test_double_statistics_nan_only() {
        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]);
        assert!(stats.min_bytes_opt().is_none());
        assert!(stats.max_bytes_opt().is_none());
        assert!(matches!(stats, Statistics::Double(_)));
        assert!(stats.is_min_max_backwards_compatible());
    }

    #[test]
    fn test_double_statistics_zero_only() {
        let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Double(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &-0.0);
            assert!(stats.min_opt().unwrap().is_sign_negative());
            assert_eq!(stats.max_opt().unwrap(), &0.0);
            assert!(stats.max_opt().unwrap().is_sign_positive());
        } else {
            panic!("expecting Statistics::Double");
        }
    }

    #[test]
    fn test_double_statistics_neg_zero_only() {
        let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Double(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &-0.0);
            assert!(stats.min_opt().unwrap().is_sign_negative());
            assert_eq!(stats.max_opt().unwrap(), &0.0);
            assert!(stats.max_opt().unwrap().is_sign_positive());
        } else {
            panic!("expecting Statistics::Double");
        }
    }

    #[test]
    fn test_double_statistics_zero_min() {
        let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Double(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &-0.0);
            assert!(stats.min_opt().unwrap().is_sign_negative());
            assert_eq!(stats.max_opt().unwrap(), &2.0);
        } else {
            panic!("expecting Statistics::Double");
        }
    }

    #[test]
    fn test_double_statistics_neg_zero_max() {
        let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Double(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &-2.0);
            assert_eq!(stats.max_opt().unwrap(), &0.0);
            assert!(stats.max_opt().unwrap().is_sign_positive());
        } else {
            panic!("expecting Statistics::Double");
        }
    }

    #[test]
    fn test_compare_greater_byte_array_decimals() {
        assert!(!compare_greater_byte_array_decimals(&[], &[]));
        assert!(compare_greater_byte_array_decimals(&[1u8], &[]));
        assert!(!compare_greater_byte_array_decimals(&[], &[1u8]));
        assert!(compare_greater_byte_array_decimals(&[1u8], &[0u8]));
        assert!(!compare_greater_byte_array_decimals(&[1u8], &[1u8]));
        assert!(compare_greater_byte_array_decimals(&[1u8, 0u8], &[0u8]));
        assert!(!compare_greater_byte_array_decimals(&[0u8, 1u8], &[1u8, 0u8]));
        assert!(!compare_greater_byte_array_decimals(&[255u8, 35u8, 0u8, 0u8], &[0u8]));
        assert!(compare_greater_byte_array_decimals(&[0u8], &[255u8, 35u8, 0u8, 0u8]));
    }
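
    // For intuition: DECIMAL byte arrays are compared as signed big-endian
    // two's-complement integers, so a leading byte with the sign bit set
    // (e.g. `[255u8, 35, 0, 0]` above) is negative and sorts below `[0u8]`.
    // A minimal sketch of that ordering via i128 sign extension;
    // `to_i128_sketch` is an illustrative helper, not this module's
    // comparison routine.
    #[test]
    fn test_decimal_byte_order_sketch() {
        fn to_i128_sketch(bytes: &[u8]) -> i128 {
            // Sign-extend from the first byte, then shift in the rest.
            let mut v: i128 = if bytes[0] & 0x80 != 0 { -1 } else { 0 };
            for &b in bytes {
                v = (v << 8) | b as i128;
            }
            v
        }
        assert!(to_i128_sketch(&[255u8, 35, 0, 0]) < to_i128_sketch(&[0u8]));
        assert!(to_i128_sketch(&[1u8, 0]) > to_i128_sketch(&[0u8]));
    }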

    #[test]
    fn test_column_index_with_null_pages() {
        // write a single page of all nulls
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();

        let r = writer.close().unwrap();
        assert!(r.column_index.is_some());
        let col_idx = r.column_index.unwrap();
        // null_pages should be true for page 0
        assert!(col_idx.null_pages[0]);
        // min and max should be empty byte arrays
        assert_eq!(col_idx.min_values[0].len(), 0);
        assert_eq!(col_idx.max_values[0].len(), 0);
        // null_counts should be defined and be 4 for page 0
        assert!(col_idx.null_counts.is_some());
        assert_eq!(col_idx.null_counts.as_ref().unwrap()[0], 4);
        // there is no repetition, so the repetition level histogram should be absent
        assert!(col_idx.repetition_level_histograms.is_none());
        // the definition level histogram should be present, with 4 values at level 0 and none at level 1
        assert!(col_idx.definition_level_histograms.is_some());
        assert_eq!(col_idx.definition_level_histograms.unwrap(), &[4, 0]);
    }

    #[test]
    fn test_column_offset_index_metadata() {
        // write data
        // and check the offset index and column index
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        // first page
        writer.flush_data_pages().unwrap();
        // second page
        writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();

        let r = writer.close().unwrap();
        let column_index = r.column_index.unwrap();
        let offset_index = r.offset_index.unwrap();

        assert_eq!(8, r.rows_written);

        // column index
        assert_eq!(2, column_index.null_pages.len());
        assert_eq!(2, offset_index.page_locations.len());
        assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
        for idx in 0..2 {
            assert!(!column_index.null_pages[idx]);
            assert_eq!(0, column_index.null_counts.as_ref().unwrap()[idx]);
        }

        if let Some(stats) = r.metadata.statistics() {
            assert_eq!(stats.null_count_opt(), Some(0));
            assert_eq!(stats.distinct_count_opt(), None);
            if let Statistics::Int32(stats) = stats {
                // first page is [1,2,3,4]
                // second page is [4,8,2,-5]; the chunk-level min (-5) and max (8) both
                // come from it (no truncation or increment applies to this non-byte-array type)
                assert_eq!(
                    stats.min_bytes_opt(),
                    Some(column_index.min_values[1].as_slice())
                );
                assert_eq!(
                    stats.max_bytes_opt(),
                    column_index.max_values.get(1).map(Vec::as_slice)
                );
            } else {
                panic!("expecting Statistics::Int32");
            }
        } else {
            panic!("metadata missing statistics");
        }

        // page location
        assert_eq!(0, offset_index.page_locations[0].first_row_index);
        assert_eq!(4, offset_index.page_locations[1].first_row_index);
    }

    /// Verify min/max value truncation in the column index works as expected
    #[test]
    fn test_column_offset_index_metadata_truncating() {
        // write data
        // and check the offset index and column index
        let page_writer = get_test_page_writer();
        let props = WriterProperties::builder()
            .set_statistics_truncate_length(None) // disable statistics truncation (column index truncation still applies)
            .build()
            .into();
        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

        let mut data = vec![FixedLenByteArray::default(); 3];
        // This is the expected min value - "aaa..."
        data[0].set_data(Bytes::from(vec![97_u8; 200]));
        // This is the expected max value - "ppp..." (112 is ASCII 'p')
        data[1].set_data(Bytes::from(vec![112_u8; 200]));
        data[2].set_data(Bytes::from(vec![98_u8; 200]));

        writer.write_batch(&data, None, None).unwrap();

        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();
        let column_index = r.column_index.unwrap();
        let offset_index = r.offset_index.unwrap();

        assert_eq!(3, r.rows_written);

        // column index
        assert_eq!(1, column_index.null_pages.len());
        assert_eq!(1, offset_index.page_locations.len());
        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
        assert!(!column_index.null_pages[0]);
        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

        if let Some(stats) = r.metadata.statistics() {
            assert_eq!(stats.null_count_opt(), Some(0));
            assert_eq!(stats.distinct_count_opt(), None);
            if let Statistics::FixedLenByteArray(stats) = stats {
                let column_index_min_value = &column_index.min_values[0];
                let column_index_max_value = &column_index.max_values[0];

                // Column index stats are truncated, while the column chunk's aren't.
                assert_ne!(
                    stats.min_bytes_opt(),
                    Some(column_index_min_value.as_slice())
                );
                assert_ne!(
                    stats.max_bytes_opt(),
                    Some(column_index_max_value.as_slice())
                );

                assert_eq!(
                    column_index_min_value.len(),
                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
                );
                assert_eq!(column_index_min_value.as_slice(), &[97_u8; 64]);
                assert_eq!(
                    column_index_max_value.len(),
                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
                );

                // We expect the last byte to be incremented
                assert_eq!(
                    *column_index_max_value.last().unwrap(),
                    *column_index_max_value.first().unwrap() + 1
                );
            } else {
                panic!("expecting Statistics::FixedLenByteArray");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }

    #[test]
    fn test_column_offset_index_truncating_spec_example() {
        // write data
        // and check the offset index and column index
        let page_writer = get_test_page_writer();

        // Truncate values at 1 byte
        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
        let props = Arc::new(builder.build());
        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

        let mut data = vec![FixedLenByteArray::default(); 1];
        // This is the expected min value
        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));

        writer.write_batch(&data, None, None).unwrap();

        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();
        let column_index = r.column_index.unwrap();
        let offset_index = r.offset_index.unwrap();

        assert_eq!(1, r.rows_written);

        // column index
        assert_eq!(1, column_index.null_pages.len());
        assert_eq!(1, offset_index.page_locations.len());
        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
        assert!(!column_index.null_pages[0]);
        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

        if let Some(stats) = r.metadata.statistics() {
            assert_eq!(stats.null_count_opt(), Some(0));
            assert_eq!(stats.distinct_count_opt(), None);
            if let Statistics::FixedLenByteArray(_stats) = stats {
                let column_index_min_value = &column_index.min_values[0];
                let column_index_max_value = &column_index.max_values[0];

                assert_eq!(column_index_min_value.len(), 1);
                assert_eq!(column_index_max_value.len(), 1);

                assert_eq!("B".as_bytes(), column_index_min_value.as_slice());
                assert_eq!("C".as_bytes(), column_index_max_value.as_slice());

                assert_ne!(column_index_min_value, stats.min_bytes_opt().unwrap());
                assert_ne!(column_index_max_value, stats.max_bytes_opt().unwrap());
            } else {
                panic!("expecting Statistics::FixedLenByteArray");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }

    #[test]
    fn test_float16_min_max_no_truncation() {
        // Even if we set truncation to occur at 1 byte, we should not truncate for Float16
        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
        let props = Arc::new(builder.build());
        let page_writer = get_test_page_writer();
        let mut writer = get_test_float16_column_writer(page_writer, props);

        let expected_value = f16::PI.to_le_bytes().to_vec();
        let data = vec![ByteArray::from(expected_value.clone()).into()];
        writer.write_batch(&data, None, None).unwrap();
        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();

        // stats should still be written
        // ensure bytes weren't truncated for column index
        let column_index = r.column_index.unwrap();
        let column_index_min_bytes = column_index.min_values[0].as_slice();
        let column_index_max_bytes = column_index.max_values[0].as_slice();
        assert_eq!(expected_value, column_index_min_bytes);
        assert_eq!(expected_value, column_index_max_bytes);

        // ensure bytes weren't truncated for statistics
        let stats = r.metadata.statistics().unwrap();
        if let Statistics::FixedLenByteArray(stats) = stats {
            let stats_min_bytes = stats.min_bytes_opt().unwrap();
            let stats_max_bytes = stats.max_bytes_opt().unwrap();
            assert_eq!(expected_value, stats_min_bytes);
            assert_eq!(expected_value, stats_max_bytes);
        } else {
            panic!("expecting Statistics::FixedLenByteArray");
        }
    }

    #[test]
    fn test_decimal_min_max_no_truncation() {
        // Even if we set truncation to occur at 1 byte, we should not truncate for Decimal
        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
        let props = Arc::new(builder.build());
        let page_writer = get_test_page_writer();
        let mut writer =
            get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

        let expected_value = vec![
            255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
            231u8, 90u8, 0u8, 0u8,
        ];
        let data = vec![ByteArray::from(expected_value.clone()).into()];
        writer.write_batch(&data, None, None).unwrap();
        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();

        // stats should still be written
        // ensure bytes weren't truncated for column index
        let column_index = r.column_index.unwrap();
        let column_index_min_bytes = column_index.min_values[0].as_slice();
        let column_index_max_bytes = column_index.max_values[0].as_slice();
        assert_eq!(expected_value, column_index_min_bytes);
        assert_eq!(expected_value, column_index_max_bytes);

        // ensure bytes weren't truncated for statistics
        let stats = r.metadata.statistics().unwrap();
        if let Statistics::FixedLenByteArray(stats) = stats {
            let stats_min_bytes = stats.min_bytes_opt().unwrap();
            let stats_max_bytes = stats.max_bytes_opt().unwrap();
            assert_eq!(expected_value, stats_min_bytes);
            assert_eq!(expected_value, stats_max_bytes);
        } else {
            panic!("expecting Statistics::FixedLenByteArray");
        }
    }

    #[test]
    fn test_statistics_truncating_byte_array_default() {
        let page_writer = get_test_page_writer();

        // The default truncate length is 64 bytes
        let props = WriterProperties::builder().build().into();
        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);

        let mut data = vec![ByteArray::default(); 1];
        data[0].set_data(Bytes::from(String::from(
            "This string is longer than 64 bytes, so it will almost certainly be truncated.",
        )));
        writer.write_batch(&data, None, None).unwrap();
        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();

        assert_eq!(1, r.rows_written);

        let stats = r.metadata.statistics().expect("statistics");
        if let Statistics::ByteArray(_stats) = stats {
            let min_value = _stats.min_opt().unwrap();
            let max_value = _stats.max_opt().unwrap();

            assert!(!_stats.min_is_exact());
            assert!(!_stats.max_is_exact());

            let expected_len = 64;
            assert_eq!(min_value.len(), expected_len);
            assert_eq!(max_value.len(), expected_len);

            let expected_min =
                "This string is longer than 64 bytes, so it will almost certainly".as_bytes();
            assert_eq!(expected_min, min_value.as_bytes());
            // note the max value is different from the min value: the last byte is incremented
            let expected_max =
                "This string is longer than 64 bytes, so it will almost certainlz".as_bytes();
            assert_eq!(expected_max, max_value.as_bytes());
        } else {
            panic!("expecting Statistics::ByteArray");
        }
    }

    #[test]
    fn test_statistics_truncating_byte_array() {
        let page_writer = get_test_page_writer();

        const TEST_TRUNCATE_LENGTH: usize = 1;

        // Truncate values at 1 byte
        let builder =
            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
        let props = Arc::new(builder.build());
        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);

        let mut data = vec![ByteArray::default(); 1];
        // This is the expected min value
        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));

        writer.write_batch(&data, None, None).unwrap();

        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();

        assert_eq!(1, r.rows_written);

        let stats = r.metadata.statistics().expect("statistics");
        assert_eq!(stats.null_count_opt(), Some(0));
        assert_eq!(stats.distinct_count_opt(), None);
        if let Statistics::ByteArray(_stats) = stats {
            let min_value = _stats.min_opt().unwrap();
            let max_value = _stats.max_opt().unwrap();

            assert!(!_stats.min_is_exact());
            assert!(!_stats.max_is_exact());

            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);

            assert_eq!("B".as_bytes(), min_value.as_bytes());
            assert_eq!("C".as_bytes(), max_value.as_bytes());
        } else {
            panic!("expecting Statistics::ByteArray");
        }
    }

    #[test]
    fn test_statistics_truncating_fixed_len_byte_array() {
        let page_writer = get_test_page_writer();

        const TEST_TRUNCATE_LENGTH: usize = 1;

        // Truncate values at 1 byte
        let builder =
            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
        let props = Arc::new(builder.build());
        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

        let mut data = vec![FixedLenByteArray::default(); 1];

        const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
        const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();

        const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]]; // parquet specifies big-endian order for decimals
        const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
            [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];

        // This is the expected min value
        data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));

        writer.write_batch(&data, None, None).unwrap();

        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();

        assert_eq!(1, r.rows_written);

        let stats = r.metadata.statistics().expect("statistics");
        assert_eq!(stats.null_count_opt(), Some(0));
        assert_eq!(stats.distinct_count_opt(), None);
        if let Statistics::FixedLenByteArray(_stats) = stats {
            let min_value = _stats.min_opt().unwrap();
            let max_value = _stats.max_opt().unwrap();

            assert!(!_stats.min_is_exact());
            assert!(!_stats.max_is_exact());

            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);

            assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
            assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());

            let mut min_bytes = [0u8; 16];
            min_bytes[0] = min_value.as_bytes()[0];
            let reconstructed_min = i128::from_be_bytes(min_bytes);

            let mut max_bytes = [0u8; 16];
            max_bytes[0] = max_value.as_bytes()[0];
            let reconstructed_max = i128::from_be_bytes(max_bytes);

            // check that the inner value is correctly bounded by the min/max
            println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
            assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
            println!("max: {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
            assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
        } else {
            panic!("expecting Statistics::FixedLenByteArray");
        }
    }

    #[test]
    fn test_send() {
        fn test<T: Send>() {}
        test::<ColumnWriterImpl<Int32Type>>();
    }

    #[test]
    fn test_increment() {
        let v = increment(vec![0, 0, 0]).unwrap();
        assert_eq!(&v, &[0, 0, 1]);

        // Handle overflow
        let v = increment(vec![0, 255, 255]).unwrap();
        assert_eq!(&v, &[1, 0, 0]);

        // Return `None` if all bytes are u8::MAX
        let v = increment(vec![255, 255, 255]);
        assert!(v.is_none());
    }
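
    // A minimal sketch of the carry-propagating byte increment exercised above,
    // assuming plain unsigned arithmetic; `increment_sketch` is an illustrative
    // helper, not this module's `increment`.
    #[test]
    fn test_increment_sketch() {
        fn increment_sketch(mut v: Vec<u8>) -> Option<Vec<u8>> {
            // Walk from the least significant byte, propagating the carry.
            for b in v.iter_mut().rev() {
                let (incremented, overflowed) = b.overflowing_add(1);
                *b = incremented;
                if !overflowed {
                    return Some(v);
                }
            }
            // Every byte was u8::MAX: no larger value of the same length exists.
            None
        }
        assert_eq!(increment_sketch(vec![0, 255]), Some(vec![1, 0]));
        assert_eq!(increment_sketch(vec![255, 255]), None);
    }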

    #[test]
    fn test_increment_utf8() {
        let test_inc = |o: &str, expected: &str| {
            if let Ok(v) = String::from_utf8(increment_utf8(o).unwrap()) {
                // Got the expected result...
                assert_eq!(v, expected);
                // and it's greater than the original string
                assert!(*v > *o);
                // Also show that ByteArray-level comparison works here
                let mut greater = ByteArray::new();
                greater.set_data(Bytes::from(v));
                let mut original = ByteArray::new();
                original.set_data(Bytes::from(o.as_bytes().to_vec()));
                assert!(greater > original);
            } else {
                panic!("Expected incremented UTF8 string to also be valid.");
            }
        };

        // Basic ASCII case
        test_inc("hello", "hellp");

        // 1-byte ending in max 1-byte
        test_inc("a\u{7f}", "b");

        // Max 1-byte should not increment, as the result would need 2-byte code points
        assert!(increment_utf8("\u{7f}\u{7f}").is_none());

        // UTF8 string
        test_inc("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝");

        // 2-byte without overflow
        test_inc("éééé", "éééê");

        // 2-byte that overflows lowest byte
        test_inc("\u{ff}\u{ff}", "\u{ff}\u{100}");

        // 2-byte ending in max 2-byte
        test_inc("a\u{7ff}", "b");

        // Max 2-byte should not increment, as the result would need 3-byte code points
        assert!(increment_utf8("\u{7ff}\u{7ff}").is_none());

        // 3-byte without overflow [U+800, U+800] -> [U+800, U+801] (note that these
        // characters should render right to left).
        test_inc("ࠀࠀ", "ࠀࠁ");

        // 3-byte ending in max 3-byte
        test_inc("a\u{ffff}", "b");

        // Max 3-byte should not increment, as the result would need 4-byte code points
        assert!(increment_utf8("\u{ffff}\u{ffff}").is_none());

        // 4-byte without overflow
        test_inc("𐀀𐀀", "𐀀𐀁");

        // 4-byte ending in max unicode
        test_inc("a\u{10ffff}", "b");

        // Max 4-byte should not increment
        assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none());

        // Incrementing U+D7FF would land in the surrogate range (0xD800..=0xDFFF),
        // so that character is dropped and the preceding one incremented instead
        test_inc("a\u{D7FF}", "b");
    }
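
    // The boundary cases above rely on UTF-8 encoded widths; as a quick
    // reference (plain std behavior, not module-specific):
    #[test]
    fn test_utf8_len_boundaries_sketch() {
        // Largest scalar values encodable in 1, 2, 3 and 4 bytes respectively.
        assert_eq!('\u{7f}'.len_utf8(), 1);
        assert_eq!('\u{7ff}'.len_utf8(), 2);
        assert_eq!('\u{ffff}'.len_utf8(), 3);
        assert_eq!('\u{10ffff}'.len_utf8(), 4);
    }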

    #[test]
    fn test_truncate_utf8() {
        // No-op
        let data = "❤️🧡💛💚💙💜";
        let r = truncate_utf8(data, data.len()).unwrap();
        assert_eq!(r.len(), data.len());
        assert_eq!(&r, data.as_bytes());

        // 13 is not a character boundary, so the slice backs up to the nearest one
        let r = truncate_utf8(data, 13).unwrap();
        assert_eq!(r.len(), 10);
        assert_eq!(&r, "❤️🧡".as_bytes());

        // One multi-byte code point, and a length shorter than it, so we can't slice it
        let r = truncate_utf8("\u{0836}", 1);
        assert!(r.is_none());

        // Test truncate and increment for max bounds on UTF-8 statistics
        // 7-bit (i.e. ASCII)
        let r = truncate_and_increment_utf8("yyyyyyyyy", 8).unwrap();
        assert_eq!(&r, "yyyyyyyz".as_bytes());

        // 2-byte without overflow
        let r = truncate_and_increment_utf8("ééééé", 7).unwrap();
        assert_eq!(&r, "ééê".as_bytes());

        // 2-byte that overflows lowest byte
        let r = truncate_and_increment_utf8("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8).unwrap();
        assert_eq!(&r, "\u{ff}\u{ff}\u{ff}\u{100}".as_bytes());

        // Max 2-byte should not truncate as it would need 3-byte code points
        let r = truncate_and_increment_utf8("߿߿߿߿߿", 8);
        assert!(r.is_none());

        // 3-byte without overflow [U+800, U+800, U+800, U+800] -> [U+800, U+801] (note
        // that these characters should render right to left).
        let r = truncate_and_increment_utf8("ࠀࠀࠀࠀ", 8).unwrap();
        assert_eq!(&r, "ࠀࠁ".as_bytes());

        // Max 3-byte should not truncate as it would need 4-byte code points
        let r = truncate_and_increment_utf8("\u{ffff}\u{ffff}\u{ffff}", 8);
        assert!(r.is_none());

        // 4-byte without overflow
        let r = truncate_and_increment_utf8("𐀀𐀀𐀀𐀀", 9).unwrap();
        assert_eq!(&r, "𐀀𐀁".as_bytes());

        // Max 4-byte should not truncate
        let r = truncate_and_increment_utf8("\u{10ffff}\u{10ffff}", 8);
        assert!(r.is_none());
    }
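
    // A minimal sketch of slicing back to a UTF-8 character boundary, the
    // behavior `truncate_utf8` is exercised for above; `truncate_on_boundary`
    // is an illustrative helper, not this module's implementation.
    #[test]
    fn test_truncate_on_boundary_sketch() {
        fn truncate_on_boundary(s: &str, max_len: usize) -> &str {
            let mut len = max_len.min(s.len());
            // Back up until the cut lands on a character boundary.
            while !s.is_char_boundary(len) {
                len -= 1;
            }
            &s[..len]
        }
        // "❤️" is 6 bytes (two scalar values) and "🧡" is 4 bytes.
        assert_eq!(truncate_on_boundary("❤️🧡", 13), "❤️🧡");
        assert_eq!(truncate_on_boundary("❤️🧡", 7), "❤️");
    }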

    #[test]
    // Check fallback truncation of statistics that should be UTF-8, but aren't
    // (see https://github.com/apache/arrow-rs/pull/6870).
    fn test_byte_array_truncate_invalid_utf8_statistics() {
        let message_type = "
            message test_schema {
                OPTIONAL BYTE_ARRAY a (UTF8);
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());

        // Create Vec<ByteArray> containing non-UTF8 bytes
        let data = vec![ByteArray::from(vec![128u8; 32]); 7];
        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
        let file: File = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Chunk)
                .set_statistics_truncate_length(Some(8))
                .build(),
        );

        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<ByteArrayType>()
            .write_batch(&data, Some(&def_levels), None)
            .unwrap();
        col_writer.close().unwrap();
        row_group_writer.close().unwrap();
        let file_metadata = writer.close().unwrap();
        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
        let stats = file_metadata.row_groups[0].columns[0]
            .meta_data
            .as_ref()
            .unwrap()
            .statistics
            .as_ref()
            .unwrap();
        assert!(!stats.is_max_value_exact.unwrap());
        // Truncation of invalid UTF-8 should fall back to binary truncation, so last byte should
        // be incremented by 1.
        assert_eq!(
            stats.max_value,
            Some([128, 128, 128, 128, 128, 128, 128, 129].to_vec())
        );
    }

    #[test]
    fn test_increment_max_binary_chars() {
        let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
        assert_eq!(&r.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);

        let incremented = increment(vec![0xFF, 0xFF, 0xFF]);
        assert!(incremented.is_none())
    }

    #[test]
    fn test_no_column_index_when_stats_disabled() {
        // https://github.com/apache/arrow-rs/issues/6010
        // Test that column index is not created/written for all-nulls column when page
        // statistics are disabled.
        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::None)
                .build(),
        );
        let column_writer = get_column_writer(descr, props, get_test_page_writer());
        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);

        let data = Vec::new();
        let def_levels = vec![0; 10];
        writer.write_batch(&data, Some(&def_levels), None).unwrap();
        writer.flush_data_pages().unwrap();

        let column_close_result = writer.close().unwrap();
        assert!(column_close_result.offset_index.is_some());
        assert!(column_close_result.column_index.is_none());
    }

    #[test]
    fn test_no_offset_index_when_disabled() {
        // Test that offset indexes can be disabled
        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::None)
                .set_offset_index_disabled(true)
                .build(),
        );
        let column_writer = get_column_writer(descr, props, get_test_page_writer());
        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);

        let data = Vec::new();
        let def_levels = vec![0; 10];
        writer.write_batch(&data, Some(&def_levels), None).unwrap();
        writer.flush_data_pages().unwrap();

        let column_close_result = writer.close().unwrap();
        assert!(column_close_result.offset_index.is_none());
        assert!(column_close_result.column_index.is_none());
    }

    #[test]
    fn test_offset_index_overridden() {
        // Test that offset indexes are not disabled when gathering page statistics
        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .set_offset_index_disabled(true)
                .build(),
        );
        let column_writer = get_column_writer(descr, props, get_test_page_writer());
        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);

        let data = Vec::new();
        let def_levels = vec![0; 10];
        writer.write_batch(&data, Some(&def_levels), None).unwrap();
        writer.flush_data_pages().unwrap();

        let column_close_result = writer.close().unwrap();
        assert!(column_close_result.offset_index.is_some());
        assert!(column_close_result.column_index.is_some());
    }

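    // As the cases below illustrate, the column index reports ASCENDING or
    // DESCENDING only when the per-page mins and maxes are both monotonic in
    // the same direction (all-null and single-page chunks default to
    // ASCENDING); any mixed ordering falls back to UNORDERED.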
    #[test]
    fn test_boundary_order() -> Result<()> {
        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
        // min max both ascending
        let column_close_result = write_multiple_pages::<Int32Type>(
            &descr,
            &[
                &[Some(-10), Some(10)],
                &[Some(-5), Some(11)],
                &[None],
                &[Some(-5), Some(11)],
            ],
        )?;
        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);

        // min max both descending
        let column_close_result = write_multiple_pages::<Int32Type>(
            &descr,
            &[
                &[Some(10), Some(11)],
                &[Some(5), Some(11)],
                &[None],
                &[Some(-5), Some(0)],
            ],
        )?;
        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
        assert_eq!(boundary_order, BoundaryOrder::DESCENDING);

        // min max both equal
        let column_close_result = write_multiple_pages::<Int32Type>(
            &descr,
            &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
        )?;
        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);

        // only nulls
        let column_close_result =
            write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);

        // one page
        let column_close_result =
            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);

        // one non-null page
        let column_close_result =
            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);

        // min max both unordered
        let column_close_result = write_multiple_pages::<Int32Type>(
            &descr,
            &[
                &[Some(10), Some(11)],
                &[Some(11), Some(16)],
                &[None],
                &[Some(-5), Some(0)],
            ],
        )?;
        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
        assert_eq!(boundary_order, BoundaryOrder::UNORDERED);

        // min max both ordered in different orders
        let column_close_result = write_multiple_pages::<Int32Type>(
            &descr,
            &[
                &[Some(1), Some(9)],
                &[Some(2), Some(8)],
                &[None],
                &[Some(3), Some(7)],
            ],
        )?;
        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
        assert_eq!(boundary_order, BoundaryOrder::UNORDERED);

        Ok(())
    }

    #[test]
    fn test_boundary_order_logical_type() -> Result<()> {
        // ensure that logical types account for different sort order than underlying
        // physical type representation
        let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
        let fba_descr = {
            let tpe = SchemaType::primitive_type_builder(
                "col",
                FixedLenByteArrayType::get_physical_type(),
            )
            .with_length(2)
            .build()?;
            Arc::new(ColumnDescriptor::new(
                Arc::new(tpe),
                1,
                0,
                ColumnPath::from("col"),
            ))
        };

        let values: &[&[Option<FixedLenByteArray>]] = &[
            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
            &[Some(FixedLenByteArray::from(ByteArray::from(
                f16::NEG_ZERO,
            )))],
            &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
        ];

        // f16 descending
        let column_close_result =
            write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
        assert_eq!(boundary_order, BoundaryOrder::DESCENDING);

        // same bytes, but fba unordered
        let column_close_result =
            write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
        assert_eq!(boundary_order, BoundaryOrder::UNORDERED);

        Ok(())
    }

    #[test]
    fn test_interval_stats_should_not_have_min_max() {
        let input = [
            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
        ]
        .into_iter()
        .map(|s| ByteArray::from(s).into())
        .collect::<Vec<_>>();

        let page_writer = get_test_page_writer();
        let mut writer = get_test_interval_column_writer(page_writer);
        writer.write_batch(&input, None, None).unwrap();

        let metadata = writer.close().unwrap().metadata;
        let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
            stats.clone()
        } else {
            panic!("metadata missing statistics");
        };
        assert!(stats.min_bytes_opt().is_none());
        assert!(stats.max_bytes_opt().is_none());
    }

    #[test]
    #[cfg(feature = "arrow")]
    fn test_column_writer_get_estimated_total_bytes() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        assert_eq!(writer.get_estimated_total_bytes(), 0);

        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        writer.add_data_page().unwrap();
        let size_with_one_page = writer.get_estimated_total_bytes();
        assert_eq!(size_with_one_page, 20);

        writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
        writer.add_data_page().unwrap();
        let size_with_two_pages = writer.get_estimated_total_bytes();
        // different pages have different compressed lengths
        assert_eq!(size_with_two_pages, 20 + 21);
    }

    fn write_multiple_pages<T: DataType>(
        column_descr: &Arc<ColumnDescriptor>,
        pages: &[&[Option<T::T>]],
    ) -> Result<ColumnCloseResult> {
        let column_writer = get_column_writer(
            column_descr.clone(),
            Default::default(),
            get_test_page_writer(),
        );
        let mut writer = get_typed_column_writer::<T>(column_writer);

        for &page in pages {
            let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
            let def_levels = page
                .iter()
                .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
                .collect::<Vec<_>>();
            writer.write_batch(&values, Some(&def_levels), None)?;
            writer.flush_data_pages()?;
        }

        writer.close()
    }

    /// Performs write-read roundtrip with randomly generated values and levels.
    /// `max_size` is the maximum number of values or levels (if `max_def_level` > 0)
    /// to write for a column.
    fn column_roundtrip_random<T: DataType>(
        props: WriterProperties,
        max_size: usize,
        min_value: T::T,
        max_value: T::T,
        max_def_level: i16,
        max_rep_level: i16,
    ) where
        T::T: PartialOrd + SampleUniform + Copy,
    {
        let mut num_values: usize = 0;

        let mut buf: Vec<i16> = Vec::new();
        let def_levels = if max_def_level > 0 {
            random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
            for &dl in &buf[..] {
                if dl == max_def_level {
                    num_values += 1;
                }
            }
            Some(&buf[..])
        } else {
            num_values = max_size;
            None
        };

        let mut buf: Vec<i16> = Vec::new();
        let rep_levels = if max_rep_level > 0 {
            random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
            buf[0] = 0; // Must start on record boundary
            Some(&buf[..])
        } else {
            None
        };

        let mut values: Vec<T::T> = Vec::new();
        random_numbers_range(num_values, min_value, max_value, &mut values);

        column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
    }

    /// Performs write-read roundtrip and asserts written values and levels.
    fn column_roundtrip<T: DataType>(
        props: WriterProperties,
        values: &[T::T],
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) {
        let mut file = tempfile::tempfile().unwrap();
        let mut write = TrackedWrite::new(&mut file);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));

        let max_def_level = match def_levels {
            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
            None => 0i16,
        };

        let max_rep_level = match rep_levels {
            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
            None => 0i16,
        };

        let mut max_batch_size = values.len();
        if let Some(levels) = def_levels {
            max_batch_size = max_batch_size.max(levels.len());
        }
        if let Some(levels) = rep_levels {
            max_batch_size = max_batch_size.max(levels.len());
        }

        let mut writer =
            get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));

        let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
        assert_eq!(values_written, values.len());
        let result = writer.close().unwrap();

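        // Drop the TrackedWrite wrapper so the mutable borrow of `file` ends
        // and the file can be handed to the reader below.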
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let page_reader = Box::new(
            SerializedPageReader::new_with_properties(
                Arc::new(file),
                &result.metadata,
                result.rows_written as usize,
                None,
                Arc::new(props),
            )
            .unwrap(),
        );
        let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);

        let mut actual_values = Vec::with_capacity(max_batch_size);
        let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
        let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));

        let (_, values_read, levels_read) = reader
            .read_records(
                max_batch_size,
                actual_def_levels.as_mut(),
                actual_rep_levels.as_mut(),
                &mut actual_values,
            )
            .unwrap();

        // Assert values, definition and repetition levels.

        assert_eq!(&actual_values[..values_read], values);
        match actual_def_levels {
            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
            None => assert_eq!(None, def_levels),
        }
        match actual_rep_levels {
            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
            None => assert_eq!(None, rep_levels),
        }

        // Assert written rows.

        if let Some(levels) = actual_rep_levels {
            let mut actual_rows_written = 0;
            for l in levels {
                if l == 0 {
                    actual_rows_written += 1;
                }
            }
            assert_eq!(actual_rows_written, result.rows_written);
        } else if actual_def_levels.is_some() {
            assert_eq!(levels_read as u64, result.rows_written);
        } else {
            assert_eq!(values_read as u64, result.rows_written);
        }
    }

    /// Writes the provided values and returns the resulting column chunk metadata.
    /// Used to test encoding support for the column writer.
    fn column_write_and_get_metadata<T: DataType>(
        props: WriterProperties,
        values: &[T::T],
    ) -> ColumnChunkMetaData {
        let page_writer = get_test_page_writer();
        let props = Arc::new(props);
        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
        writer.write_batch(values, None, None).unwrap();
        writer.close().unwrap().metadata
    }

    // Helper function to more compactly create a PageEncodingStats struct.
    fn encoding_stats(page_type: PageType, encoding: Encoding, count: i32) -> PageEncodingStats {
        PageEncodingStats {
            page_type,
            encoding,
            count,
        }
    }

    // Function to use in tests for EncodingWriteSupport. This checks the dictionary
    // offset and encodings to make sure that the column writer uses the encodings
    // provided by the trait.
    fn check_encoding_write_support<T: DataType>(
        version: WriterVersion,
        dict_enabled: bool,
        data: &[T::T],
        dictionary_page_offset: Option<i64>,
        encodings: &[Encoding],
        page_encoding_stats: &[PageEncodingStats],
    ) {
        let props = WriterProperties::builder()
            .set_writer_version(version)
            .set_dictionary_enabled(dict_enabled)
            .build();
        let meta = column_write_and_get_metadata::<T>(props, data);
        assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
        assert_eq!(meta.encodings(), encodings);
        assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
    }

    /// Returns column writer.
    fn get_test_column_writer<'a, T: DataType>(
        page_writer: Box<dyn PageWriter + 'a>,
        max_def_level: i16,
        max_rep_level: i16,
        props: WriterPropertiesPtr,
    ) -> ColumnWriterImpl<'a, T> {
        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
        let column_writer = get_column_writer(descr, props, page_writer);
        get_typed_column_writer::<T>(column_writer)
    }

    /// Returns column reader.
    fn get_test_column_reader<T: DataType>(
        page_reader: Box<dyn PageReader>,
        max_def_level: i16,
        max_rep_level: i16,
    ) -> ColumnReaderImpl<T> {
        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
        let column_reader = get_column_reader(descr, page_reader);
        get_typed_column_reader::<T>(column_reader)
    }

    /// Returns descriptor for primitive column.
    fn get_test_column_descr<T: DataType>(
        max_def_level: i16,
        max_rep_level: i16,
    ) -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4076            // length is set for "encoding support" tests for FIXED_LEN_BYTE_ARRAY type,
4077            // it should be no-op for other types
4078            .with_length(1)
4079            .build()
4080            .unwrap();
4081        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4082    }
4083
    /// Returns page writer that collects pages without serializing them.
    fn get_test_page_writer() -> Box<dyn PageWriter> {
        Box::new(TestPageWriter {})
    }

    struct TestPageWriter {}

    impl PageWriter for TestPageWriter {
        fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
            // Record the page's attributes without writing any bytes; the
            // offset is always 0 since nothing is actually serialized.
            let mut res = PageWriteSpec::new();
            res.page_type = page.page_type();
            res.uncompressed_size = page.uncompressed_size();
            res.compressed_size = page.compressed_size();
            res.num_values = page.num_values();
            res.offset = 0;
            res.bytes_written = page.data().len() as u64;
            Ok(res)
        }

        fn close(&mut self) -> Result<()> {
            Ok(())
        }
    }

    /// Writes data using [`get_test_page_writer`] and [`get_test_column_writer`]
    /// and returns the generated statistics.
    fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
        writer.write_batch(values, None, None).unwrap();

        let metadata = writer.close().unwrap().metadata;
        if let Some(stats) = metadata.statistics() {
            stats.clone()
        } else {
            panic!("metadata missing statistics");
        }
    }

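    // A hedged usage sketch (not part of the original suite): round-trip a few
    // INT32 values and check that typed statistics come back. Only the variant
    // is asserted, to keep the sketch independent of exact min/max accessors.
    #[test]
    fn test_statistics_roundtrip_int32_sketch() {
        let stats = statistics_roundtrip::<Int32Type>(&[-5, 3, 0, 3]);
        assert!(matches!(stats, Statistics::Int32(_)));
    }
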
    /// Returns a column writer for Decimal columns.
    fn get_test_decimals_column_writer<T: DataType>(
        page_writer: Box<dyn PageWriter>,
        max_def_level: i16,
        max_rep_level: i16,
        props: WriterPropertiesPtr,
    ) -> ColumnWriterImpl<'static, T> {
        let descr = Arc::new(get_test_decimals_column_descr::<T>(
            max_def_level,
            max_rep_level,
        ));
        let column_writer = get_column_writer(descr, props, page_writer);
        get_typed_column_writer::<T>(column_writer)
    }

    /// Returns a descriptor for a primitive column with the Decimal logical type.
    fn get_test_decimals_column_descr<T: DataType>(
        max_def_level: i16,
        max_rep_level: i16,
    ) -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
            .with_length(16)
            .with_logical_type(Some(LogicalType::Decimal {
                scale: 2,
                precision: 3,
            }))
            .with_scale(2)
            .with_precision(3)
            .build()
            .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }

    /// Writes the provided Float16 values and returns the typed
    /// FIXED_LEN_BYTE_ARRAY statistics generated for them.
    fn float16_statistics_roundtrip(
        values: &[FixedLenByteArray],
    ) -> ValueStatistics<FixedLenByteArray> {
        let page_writer = get_test_page_writer();
        let mut writer = get_test_float16_column_writer(page_writer, Default::default());
        writer.write_batch(values, None, None).unwrap();

        let metadata = writer.close().unwrap().metadata;
        if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
            stats.clone()
        } else {
            panic!("metadata missing statistics");
        }
    }

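    // Illustrative sketch (hedged; assumes the `From<f16> for ByteArray`
    // conversion used elsewhere in this crate's Float16 tests):
    //
    //     let values: Vec<FixedLenByteArray> = [f16::ONE, f16::ZERO]
    //         .into_iter()
    //         .map(|v| ByteArray::from(v).into())
    //         .collect();
    //     let stats = float16_statistics_roundtrip(&values);
    //     assert!(stats.min_opt().is_some() && stats.max_opt().is_some());
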
    /// Returns a Float16 (FIXED_LEN_BYTE_ARRAY(2)) column writer.
    fn get_test_float16_column_writer(
        page_writer: Box<dyn PageWriter>,
        props: WriterPropertiesPtr,
    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
        let descr = Arc::new(get_test_float16_column_descr(0, 0));
        let column_writer = get_column_writer(descr, props, page_writer);
        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
    }

    /// Returns a descriptor for a Float16 column.
    fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe =
            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
                .with_length(2)
                .with_logical_type(Some(LogicalType::Float16))
                .build()
                .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }

    /// Returns an INTERVAL (FIXED_LEN_BYTE_ARRAY(12)) column writer.
    fn get_test_interval_column_writer(
        page_writer: Box<dyn PageWriter>,
    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
        let descr = Arc::new(get_test_interval_column_descr());
        let column_writer = get_column_writer(descr, Default::default(), page_writer);
        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
    }

    /// Returns a descriptor for an INTERVAL column.
    fn get_test_interval_column_descr() -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe =
            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
                .with_length(12)
                .with_converted_type(ConvertedType::INTERVAL)
                .build()
                .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
    }

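    // Illustrative sketch: an INTERVAL value is 12 bytes, three little-endian
    // u32s (months, days, milliseconds). E.g. writing 1 month, 2 days, 300 ms
    // (a hedged example, not one of the original tests):
    //
    //     let mut writer = get_test_interval_column_writer(get_test_page_writer());
    //     let mut buf = Vec::with_capacity(12);
    //     for v in [1u32, 2, 300] {
    //         buf.extend_from_slice(&v.to_le_bytes());
    //     }
    //     let value = FixedLenByteArray::from(ByteArray::from(buf));
    //     writer.write_batch(&[value], None, None).unwrap();
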
    /// Returns a column writer for a UINT_32 column declared via ConvertedType only.
    fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
        page_writer: Box<dyn PageWriter + 'a>,
        max_def_level: i16,
        max_rep_level: i16,
        props: WriterPropertiesPtr,
    ) -> ColumnWriterImpl<'a, T> {
        let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
            max_def_level,
            max_rep_level,
        ));
        let column_writer = get_column_writer(descr, props, page_writer);
        get_typed_column_writer::<T>(column_writer)
    }

    /// Returns a column descriptor for a UINT_32 column declared via ConvertedType only.
    fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
        max_def_level: i16,
        max_rep_level: i16,
    ) -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
            .with_converted_type(ConvertedType::UINT_32)
            .build()
            .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }
}