parquet/column/writer/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains column writer API.
19
20use bytes::Bytes;
21use half::f16;
22
23use crate::bloom_filter::Sbbf;
24use crate::format::{BoundaryOrder, ColumnIndex, OffsetIndex};
25use std::collections::{BTreeSet, VecDeque};
26use std::str;
27
28use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
29use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
30use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
31use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
32use crate::data_type::private::ParquetValueType;
33use crate::data_type::*;
34use crate::encodings::levels::LevelEncoder;
35#[cfg(feature = "encryption")]
36use crate::encryption::encrypt::get_column_crypto_metadata;
37use crate::errors::{ParquetError, Result};
38use crate::file::metadata::{
39    ColumnChunkMetaData, ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram,
40    OffsetIndexBuilder,
41};
42use crate::file::page_encoding_stats::PageEncodingStats;
43use crate::file::properties::{
44    EnabledStatistics, WriterProperties, WriterPropertiesPtr, WriterVersion,
45};
46use crate::file::statistics::{Statistics, ValueStatistics};
47use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
48
49pub(crate) mod encoder;
50
// Expands to a `match` over every `ColumnWriter` variant, binding the typed inner
// writer to `$writer` in each arm and evaluating `$body` with that binding.
// The arms are written as `Self::` paths, so this can only be used inside an `impl`
// of an enum that has exactly these variants.
macro_rules! downcast_writer {
    ($target:expr, $writer:ident, $body:expr) => {
        match $target {
            Self::BoolColumnWriter($writer) => $body,
            Self::Int32ColumnWriter($writer) => $body,
            Self::Int64ColumnWriter($writer) => $body,
            Self::Int96ColumnWriter($writer) => $body,
            Self::FloatColumnWriter($writer) => $body,
            Self::DoubleColumnWriter($writer) => $body,
            Self::ByteArrayColumnWriter($writer) => $body,
            Self::FixedLenByteArrayColumnWriter($writer) => $body,
        }
    };
}
65
/// Column writer for a Parquet type.
///
/// Each variant wraps a [`ColumnWriterImpl`] specialised for one Parquet physical type.
/// [`get_column_writer`] selects the variant from the column descriptor's physical type,
/// and [`get_typed_column_writer`] (and its `_ref` / `_mut` siblings) recover the typed
/// writer again.
pub enum ColumnWriter<'a> {
    /// Column writer for boolean type
    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
    /// Column writer for int32 type
    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
    /// Column writer for int64 type
    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
    /// Column writer for int96 (timestamp) type
    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
    /// Column writer for float type
    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
    /// Column writer for double type
    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
    /// Column writer for byte array type
    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
    /// Column writer for fixed length byte array type
    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
}
85
86impl ColumnWriter<'_> {
87    /// Returns the estimated total memory usage
88    #[cfg(feature = "arrow")]
89    pub(crate) fn memory_size(&self) -> usize {
90        downcast_writer!(self, typed, typed.memory_size())
91    }
92
93    /// Returns the estimated total encoded bytes for this column writer
94    #[cfg(feature = "arrow")]
95    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
96        downcast_writer!(self, typed, typed.get_estimated_total_bytes())
97    }
98
99    /// Close this [`ColumnWriter`]
100    pub fn close(self) -> Result<ColumnCloseResult> {
101        downcast_writer!(self, typed, typed.close())
102    }
103}
104
105/// Gets a specific column writer corresponding to column descriptor `descr`.
106pub fn get_column_writer<'a>(
107    descr: ColumnDescPtr,
108    props: WriterPropertiesPtr,
109    page_writer: Box<dyn PageWriter + 'a>,
110) -> ColumnWriter<'a> {
111    match descr.physical_type() {
112        Type::BOOLEAN => {
113            ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
114        }
115        Type::INT32 => {
116            ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
117        }
118        Type::INT64 => {
119            ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
120        }
121        Type::INT96 => {
122            ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
123        }
124        Type::FLOAT => {
125            ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
126        }
127        Type::DOUBLE => {
128            ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
129        }
130        Type::BYTE_ARRAY => {
131            ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
132        }
133        Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
134            ColumnWriterImpl::new(descr, props, page_writer),
135        ),
136    }
137}
138
139/// Gets a typed column writer for the specific type `T`, by "up-casting" `col_writer` of
140/// non-generic type to a generic column writer type `ColumnWriterImpl`.
141///
142/// Panics if actual enum value for `col_writer` does not match the type `T`.
143pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
144    T::get_column_writer(col_writer).unwrap_or_else(|| {
145        panic!(
146            "Failed to convert column writer into a typed column writer for `{}` type",
147            T::get_physical_type()
148        )
149    })
150}
151
152/// Similar to `get_typed_column_writer` but returns a reference.
153pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
154    col_writer: &'b ColumnWriter<'a>,
155) -> &'b ColumnWriterImpl<'a, T> {
156    T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
157        panic!(
158            "Failed to convert column writer into a typed column writer for `{}` type",
159            T::get_physical_type()
160        )
161    })
162}
163
164/// Similar to `get_typed_column_writer` but returns a reference.
165pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
166    col_writer: &'a mut ColumnWriter<'b>,
167) -> &'a mut ColumnWriterImpl<'b, T> {
168    T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
169        panic!(
170            "Failed to convert column writer into a typed column writer for `{}` type",
171            T::get_physical_type()
172        )
173    })
174}
175
/// Metadata returned by [`GenericColumnWriter::close`]
///
/// Bundles the final counters of the column writer together with the column chunk
/// metadata and the optional bloom filter, column index and offset index that were
/// built while writing.
#[derive(Debug, Clone)]
pub struct ColumnCloseResult {
    /// The total number of bytes written
    pub bytes_written: u64,
    /// The total number of rows written
    pub rows_written: u64,
    /// Metadata for this column chunk
    pub metadata: ColumnChunkMetaData,
    /// Optional bloom filter for this column
    pub bloom_filter: Option<Sbbf>,
    /// Optional column index, for filtering
    /// (`None` when the column index builder was invalidated)
    pub column_index: Option<ColumnIndex>,
    /// Optional offset index, identifying page locations
    /// (`None` when the offset index is disabled in the writer properties)
    pub offset_index: Option<OffsetIndex>,
}
192
/// Metrics for the data page currently being buffered; reset by `new_page`.
#[derive(Default)]
struct PageMetrics {
    // Number of levels buffered for this page. `write_mini_batch` adds `num_levels`,
    // so null slots are counted as well as non-null values.
    num_buffered_values: u32,
    // Number of rows started in this page: repetition levels equal to 0 for repeated
    // columns, otherwise one per level.
    num_buffered_rows: u32,
    // Number of definition levels not equal to the column's max definition level.
    num_page_nulls: u64,
    // Page-level level histograms; `None` when they are not being collected.
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
}
202
203impl PageMetrics {
204    fn new() -> Self {
205        Default::default()
206    }
207
208    /// Initialize the repetition level histogram
209    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
210        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
211        self
212    }
213
214    /// Initialize the definition level histogram
215    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
216        self.definition_level_histogram = LevelHistogram::try_new(max_level);
217        self
218    }
219
220    /// Resets the state of this `PageMetrics` to the initial state.
221    /// If histograms have been initialized their contents will be reset to zero.
222    fn new_page(&mut self) {
223        self.num_buffered_values = 0;
224        self.num_buffered_rows = 0;
225        self.num_page_nulls = 0;
226        self.repetition_level_histogram
227            .as_mut()
228            .map(LevelHistogram::reset);
229        self.definition_level_histogram
230            .as_mut()
231            .map(LevelHistogram::reset);
232    }
233
234    /// Updates histogram values using provided repetition levels
235    fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
236        if let Some(ref mut rep_hist) = self.repetition_level_histogram {
237            rep_hist.update_from_levels(levels);
238        }
239    }
240
241    /// Updates histogram values using provided definition levels
242    fn update_definition_level_histogram(&mut self, levels: &[i16]) {
243        if let Some(ref mut def_hist) = self.definition_level_histogram {
244            def_hist.update_from_levels(levels);
245        }
246    }
247}
248
/// Metrics accumulated across all pages written by one column writer.
#[derive(Default)]
struct ColumnMetrics<T: Default> {
    // Reported by `get_total_bytes_written` and in `ColumnCloseResult::bytes_written`.
    total_bytes_written: u64,
    // Reported by `get_total_rows_written` and in `ColumnCloseResult::rows_written`.
    total_rows_written: u64,
    total_uncompressed_size: u64,
    total_compressed_size: u64,
    total_num_values: u64,
    dictionary_page_offset: Option<u64>,
    data_page_offset: Option<u64>,
    // Chunk-level min/max; `write_batch_internal` merges caller-provided values in
    // via `update_min` / `update_max`.
    min_column_value: Option<T>,
    max_column_value: Option<T>,
    num_column_nulls: u64,
    // Caller-provided distinct count; `write_batch_internal` clears it when other
    // values have already been written.
    column_distinct_count: Option<u64>,
    // Running sum of page-level variable length byte counts
    // (see `update_variable_length_bytes`).
    variable_length_bytes: Option<i64>,
    // Chunk-level level histograms, summed from the page histograms.
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
}
267
268impl<T: Default> ColumnMetrics<T> {
269    fn new() -> Self {
270        Default::default()
271    }
272
273    /// Initialize the repetition level histogram
274    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
275        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
276        self
277    }
278
279    /// Initialize the definition level histogram
280    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
281        self.definition_level_histogram = LevelHistogram::try_new(max_level);
282        self
283    }
284
285    /// Sum `page_histogram` into `chunk_histogram`
286    fn update_histogram(
287        chunk_histogram: &mut Option<LevelHistogram>,
288        page_histogram: &Option<LevelHistogram>,
289    ) {
290        if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
291            chunk_hist.add(page_hist);
292        }
293    }
294
295    /// Sum the provided PageMetrics histograms into the chunk histograms. Does nothing if
296    /// page histograms are not initialized.
297    fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
298        ColumnMetrics::<T>::update_histogram(
299            &mut self.definition_level_histogram,
300            &page_metrics.definition_level_histogram,
301        );
302        ColumnMetrics::<T>::update_histogram(
303            &mut self.repetition_level_histogram,
304            &page_metrics.repetition_level_histogram,
305        );
306    }
307
308    /// Sum the provided page variable_length_bytes into the chunk variable_length_bytes
309    fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
310        if let Some(var_bytes) = variable_length_bytes {
311            *self.variable_length_bytes.get_or_insert(0) += var_bytes;
312        }
313    }
314}
315
/// Typed column writer for a primitive column.
///
/// Shorthand for a [`GenericColumnWriter`] using the default value encoder
/// [`ColumnValueEncoderImpl`] for data type `T`.
pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
318
/// Generic column writer for a primitive column.
///
/// Values are encoded by `E`. Levels and per-page metrics are buffered until a data
/// page is cut, and the resulting pages are handed to the [`PageWriter`].
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
    // Column writer properties
    descr: ColumnDescPtr,
    props: WriterPropertiesPtr,
    // Statistics level for this column, resolved from `props` for `descr.path()`.
    statistics_enabled: EnabledStatistics,

    // Destination for completed pages; closed by `close`.
    page_writer: Box<dyn PageWriter + 'a>,
    // Compression configured for this column.
    codec: Compression,
    // Created from `codec` by `create_codec`; may be `None` (presumably for
    // uncompressed columns — TODO confirm against `create_codec`).
    compressor: Option<Box<dyn Codec>>,
    // Encoder for the column's values.
    encoder: E,

    // Metrics for the data page currently being buffered
    page_metrics: PageMetrics,
    // Metrics per column writer
    column_metrics: ColumnMetrics<E::T>,

    /// The order of encodings within the generated metadata does not impact its meaning,
    /// but we use a BTreeSet so that the output is deterministic
    encodings: BTreeSet<Encoding>,
    encoding_stats: Vec<PageEncodingStats>,
    // Reused buffers
    // Definition / repetition levels buffered for the current page.
    def_levels_sink: Vec<i16>,
    rep_levels_sink: Vec<i16>,
    // Compressed data pages held back until `flush_data_pages`; `close` and
    // `dict_fallback` write the dictionary page before flushing them.
    data_pages: VecDeque<CompressedPage>,
    // column index and offset index
    // The column index builder is invalidated unless page-level statistics are enabled.
    column_index_builder: ColumnIndexBuilder,
    // `None` when the offset index is disabled in the writer properties.
    offset_index_builder: Option<OffsetIndexBuilder>,

    // Below fields used to incrementally check boundary order across data pages.
    // We assume they are ascending/descending until proven wrong.
    data_page_boundary_ascending: bool,
    data_page_boundary_descending: bool,
    /// (min, max) of the most recent data page that was not entirely null
    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}
354
355impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
356    /// Returns a new instance of [`GenericColumnWriter`].
357    pub fn new(
358        descr: ColumnDescPtr,
359        props: WriterPropertiesPtr,
360        page_writer: Box<dyn PageWriter + 'a>,
361    ) -> Self {
362        let codec = props.compression(descr.path());
363        let codec_options = CodecOptionsBuilder::default().build();
364        let compressor = create_codec(codec, &codec_options).unwrap();
365        let encoder = E::try_new(&descr, props.as_ref()).unwrap();
366
367        let statistics_enabled = props.statistics_enabled(descr.path());
368
369        let mut encodings = BTreeSet::new();
370        // Used for level information
371        encodings.insert(Encoding::RLE);
372
373        let mut page_metrics = PageMetrics::new();
374        let mut column_metrics = ColumnMetrics::<E::T>::new();
375
376        // Initialize level histograms if collecting page or chunk statistics
377        if statistics_enabled != EnabledStatistics::None {
378            page_metrics = page_metrics
379                .with_repetition_level_histogram(descr.max_rep_level())
380                .with_definition_level_histogram(descr.max_def_level());
381            column_metrics = column_metrics
382                .with_repetition_level_histogram(descr.max_rep_level())
383                .with_definition_level_histogram(descr.max_def_level())
384        }
385
386        // Disable column_index_builder if not collecting page statistics.
387        let mut column_index_builder = ColumnIndexBuilder::new();
388        if statistics_enabled != EnabledStatistics::Page {
389            column_index_builder.to_invalid()
390        }
391
392        // Disable offset_index_builder if requested by user.
393        let offset_index_builder = match props.offset_index_disabled() {
394            false => Some(OffsetIndexBuilder::new()),
395            _ => None,
396        };
397
398        Self {
399            descr,
400            props,
401            statistics_enabled,
402            page_writer,
403            codec,
404            compressor,
405            encoder,
406            def_levels_sink: vec![],
407            rep_levels_sink: vec![],
408            data_pages: VecDeque::new(),
409            page_metrics,
410            column_metrics,
411            column_index_builder,
412            offset_index_builder,
413            encodings,
414            encoding_stats: vec![],
415            data_page_boundary_ascending: true,
416            data_page_boundary_descending: true,
417            last_non_null_data_page_min_max: None,
418        }
419    }
420
421    #[allow(clippy::too_many_arguments)]
422    pub(crate) fn write_batch_internal(
423        &mut self,
424        values: &E::Values,
425        value_indices: Option<&[usize]>,
426        def_levels: Option<&[i16]>,
427        rep_levels: Option<&[i16]>,
428        min: Option<&E::T>,
429        max: Option<&E::T>,
430        distinct_count: Option<u64>,
431    ) -> Result<usize> {
432        // Check if number of definition levels is the same as number of repetition levels.
433        if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
434            if def.len() != rep.len() {
435                return Err(general_err!(
436                    "Inconsistent length of definition and repetition levels: {} != {}",
437                    def.len(),
438                    rep.len()
439                ));
440            }
441        }
442
443        // We check for DataPage limits only after we have inserted the values. If a user
444        // writes a large number of values, the DataPage size can be well above the limit.
445        //
446        // The purpose of this chunking is to bound this. Even if a user writes large
447        // number of values, the chunking will ensure that we add data page at a
448        // reasonable pagesize limit.
449
450        // TODO: find out why we don't account for size of levels when we estimate page
451        // size.
452
453        let num_levels = match def_levels {
454            Some(def_levels) => def_levels.len(),
455            None => values.len(),
456        };
457
458        if let Some(min) = min {
459            update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
460        }
461        if let Some(max) = max {
462            update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
463        }
464
465        // We can only set the distinct count if there are no other writes
466        if self.encoder.num_values() == 0 {
467            self.column_metrics.column_distinct_count = distinct_count;
468        } else {
469            self.column_metrics.column_distinct_count = None;
470        }
471
472        let mut values_offset = 0;
473        let mut levels_offset = 0;
474        let base_batch_size = self.props.write_batch_size();
475        while levels_offset < num_levels {
476            let mut end_offset = num_levels.min(levels_offset + base_batch_size);
477
478            // Split at record boundary
479            if let Some(r) = rep_levels {
480                while end_offset < r.len() && r[end_offset] != 0 {
481                    end_offset += 1;
482                }
483            }
484
485            values_offset += self.write_mini_batch(
486                values,
487                values_offset,
488                value_indices,
489                end_offset - levels_offset,
490                def_levels.map(|lv| &lv[levels_offset..end_offset]),
491                rep_levels.map(|lv| &lv[levels_offset..end_offset]),
492            )?;
493            levels_offset = end_offset;
494        }
495
496        // Return total number of values processed.
497        Ok(values_offset)
498    }
499
500    /// Writes batch of values, definition levels and repetition levels.
501    /// Returns number of values processed (written).
502    ///
503    /// If definition and repetition levels are provided, we write fully those levels and
504    /// select how many values to write (this number will be returned), since number of
505    /// actual written values may be smaller than provided values.
506    ///
507    /// If only values are provided, then all values are written and the length of
508    /// of the values buffer is returned.
509    ///
510    /// Definition and/or repetition levels can be omitted, if values are
511    /// non-nullable and/or non-repeated.
512    pub fn write_batch(
513        &mut self,
514        values: &E::Values,
515        def_levels: Option<&[i16]>,
516        rep_levels: Option<&[i16]>,
517    ) -> Result<usize> {
518        self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
519    }
520
521    /// Writer may optionally provide pre-calculated statistics for use when computing
522    /// chunk-level statistics
523    ///
524    /// NB: [`WriterProperties::statistics_enabled`] must be set to [`EnabledStatistics::Chunk`]
525    /// for these statistics to take effect. If [`EnabledStatistics::None`] they will be ignored,
526    /// and if [`EnabledStatistics::Page`] the chunk statistics will instead be computed from the
527    /// computed page statistics
528    pub fn write_batch_with_statistics(
529        &mut self,
530        values: &E::Values,
531        def_levels: Option<&[i16]>,
532        rep_levels: Option<&[i16]>,
533        min: Option<&E::T>,
534        max: Option<&E::T>,
535        distinct_count: Option<u64>,
536    ) -> Result<usize> {
537        self.write_batch_internal(
538            values,
539            None,
540            def_levels,
541            rep_levels,
542            min,
543            max,
544            distinct_count,
545        )
546    }
547
548    /// Returns the estimated total memory usage.
549    ///
550    /// Unlike [`Self::get_estimated_total_bytes`] this is an estimate
551    /// of the current memory usage and not the final anticipated encoded size.
552    #[cfg(feature = "arrow")]
553    pub(crate) fn memory_size(&self) -> usize {
554        self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
555    }
556
557    /// Returns total number of bytes written by this column writer so far.
558    /// This value is also returned when column writer is closed.
559    ///
560    /// Note: this value does not include any buffered data that has not
561    /// yet been flushed to a page.
562    pub fn get_total_bytes_written(&self) -> u64 {
563        self.column_metrics.total_bytes_written
564    }
565
566    /// Returns the estimated total encoded bytes for this column writer.
567    ///
568    /// Unlike [`Self::get_total_bytes_written`] this includes an estimate
569    /// of any data that has not yet been flushed to a page, based on it's
570    /// anticipated encoded size.
571    #[cfg(feature = "arrow")]
572    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
573        self.data_pages
574            .iter()
575            .map(|page| page.data().len() as u64)
576            .sum::<u64>()
577            + self.column_metrics.total_bytes_written
578            + self.encoder.estimated_data_page_size() as u64
579            + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
580    }
581
582    /// Returns total number of rows written by this column writer so far.
583    /// This value is also returned when column writer is closed.
584    pub fn get_total_rows_written(&self) -> u64 {
585        self.column_metrics.total_rows_written
586    }
587
588    /// Returns a reference to a [`ColumnDescPtr`]
589    pub fn get_descriptor(&self) -> &ColumnDescPtr {
590        &self.descr
591    }
592
593    /// Finalizes writes and closes the column writer.
594    /// Returns total bytes written, total rows written and column chunk metadata.
595    pub fn close(mut self) -> Result<ColumnCloseResult> {
596        if self.page_metrics.num_buffered_values > 0 {
597            self.add_data_page()?;
598        }
599        if self.encoder.has_dictionary() {
600            self.write_dictionary_page()?;
601        }
602        self.flush_data_pages()?;
603        let metadata = self.build_column_metadata()?;
604        self.page_writer.close()?;
605
606        let boundary_order = match (
607            self.data_page_boundary_ascending,
608            self.data_page_boundary_descending,
609        ) {
610            // If the lists are composed of equal elements then will be marked as ascending
611            // (Also the case if all pages are null pages)
612            (true, _) => BoundaryOrder::ASCENDING,
613            (false, true) => BoundaryOrder::DESCENDING,
614            (false, false) => BoundaryOrder::UNORDERED,
615        };
616        self.column_index_builder.set_boundary_order(boundary_order);
617
618        let column_index = self
619            .column_index_builder
620            .valid()
621            .then(|| self.column_index_builder.build_to_thrift());
622
623        let offset_index = self.offset_index_builder.map(|b| b.build_to_thrift());
624
625        Ok(ColumnCloseResult {
626            bytes_written: self.column_metrics.total_bytes_written,
627            rows_written: self.column_metrics.total_rows_written,
628            bloom_filter: self.encoder.flush_bloom_filter(),
629            metadata,
630            column_index,
631            offset_index,
632        })
633    }
634
635    /// Writes mini batch of values, definition and repetition levels.
636    /// This allows fine-grained processing of values and maintaining a reasonable
637    /// page size.
638    fn write_mini_batch(
639        &mut self,
640        values: &E::Values,
641        values_offset: usize,
642        value_indices: Option<&[usize]>,
643        num_levels: usize,
644        def_levels: Option<&[i16]>,
645        rep_levels: Option<&[i16]>,
646    ) -> Result<usize> {
647        // Process definition levels and determine how many values to write.
648        let values_to_write = if self.descr.max_def_level() > 0 {
649            let levels = def_levels.ok_or_else(|| {
650                general_err!(
651                    "Definition levels are required, because max definition level = {}",
652                    self.descr.max_def_level()
653                )
654            })?;
655
656            let values_to_write = levels
657                .iter()
658                .map(|level| (*level == self.descr.max_def_level()) as usize)
659                .sum();
660            self.page_metrics.num_page_nulls += (levels.len() - values_to_write) as u64;
661
662            // Update histogram
663            self.page_metrics.update_definition_level_histogram(levels);
664
665            self.def_levels_sink.extend_from_slice(levels);
666            values_to_write
667        } else {
668            num_levels
669        };
670
671        // Process repetition levels and determine how many rows we are about to process.
672        if self.descr.max_rep_level() > 0 {
673            // A row could contain more than one value.
674            let levels = rep_levels.ok_or_else(|| {
675                general_err!(
676                    "Repetition levels are required, because max repetition level = {}",
677                    self.descr.max_rep_level()
678                )
679            })?;
680
681            if !levels.is_empty() && levels[0] != 0 {
682                return Err(general_err!(
683                    "Write must start at a record boundary, got non-zero repetition level of {}",
684                    levels[0]
685                ));
686            }
687
688            // Count the occasions where we start a new row
689            for &level in levels {
690                self.page_metrics.num_buffered_rows += (level == 0) as u32
691            }
692
693            // Update histogram
694            self.page_metrics.update_repetition_level_histogram(levels);
695
696            self.rep_levels_sink.extend_from_slice(levels);
697        } else {
698            // Each value is exactly one row.
699            // Equals to the number of values, we count nulls as well.
700            self.page_metrics.num_buffered_rows += num_levels as u32;
701        }
702
703        match value_indices {
704            Some(indices) => {
705                let indices = &indices[values_offset..values_offset + values_to_write];
706                self.encoder.write_gather(values, indices)?;
707            }
708            None => self.encoder.write(values, values_offset, values_to_write)?,
709        }
710
711        self.page_metrics.num_buffered_values += num_levels as u32;
712
713        if self.should_add_data_page() {
714            self.add_data_page()?;
715        }
716
717        if self.should_dict_fallback() {
718            self.dict_fallback()?;
719        }
720
721        Ok(values_to_write)
722    }
723
724    /// Returns true if we need to fall back to non-dictionary encoding.
725    ///
726    /// We can only fall back if dictionary encoder is set and we have exceeded dictionary
727    /// size.
728    #[inline]
729    fn should_dict_fallback(&self) -> bool {
730        match self.encoder.estimated_dict_page_size() {
731            Some(size) => {
732                size >= self
733                    .props
734                    .column_dictionary_page_size_limit(self.descr.path())
735            }
736            None => false,
737        }
738    }
739
740    /// Returns true if there is enough data for a data page, false otherwise.
741    #[inline]
742    fn should_add_data_page(&self) -> bool {
743        // This is necessary in the event of a much larger dictionary size than page size
744        //
745        // In such a scenario the dictionary decoder may return an estimated encoded
746        // size in excess of the page size limit, even when there are no buffered values
747        if self.page_metrics.num_buffered_values == 0 {
748            return false;
749        }
750
751        self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
752            || self.encoder.estimated_data_page_size() >= self.props.data_page_size_limit()
753    }
754
755    /// Performs dictionary fallback.
756    /// Prepares and writes dictionary and all data pages into page writer.
757    fn dict_fallback(&mut self) -> Result<()> {
758        // At this point we know that we need to fall back.
759        if self.page_metrics.num_buffered_values > 0 {
760            self.add_data_page()?;
761        }
762        self.write_dictionary_page()?;
763        self.flush_data_pages()?;
764        Ok(())
765    }
766
    /// Update the column index and offset index when adding the data page.
    ///
    /// Reads the metrics of the page currently being finished (`self.page_metrics`), so it
    /// must run before those metrics are reset; `add_data_page` calls it for that reason.
    ///
    /// Column index:
    /// - a page whose buffered row count equals its null count is appended as a null page
    ///   with empty min/max values;
    /// - otherwise the page's min/max statistics are appended (truncated to
    ///   `column_index_truncate_length` when `can_truncate_value` allows it), and the
    ///   ascending/descending boundary-order flags are updated against the previous
    ///   non-null page;
    /// - if no page statistics are available, the column index is marked invalid via
    ///   `to_invalid()`; both append branches are guarded by `valid()`.
    ///
    /// The page's level histograms are always handed to `append_histograms`, and the offset
    /// index (when enabled) receives the page's row count and `page_variable_length_bytes`.
    fn update_column_offset_index(
        &mut self,
        page_statistics: Option<&ValueStatistics<E::T>>,
        page_variable_length_bytes: Option<i64>,
    ) {
        // update the column index
        // NOTE(review): this compares a row count against a null count; confirm it is the
        // intended "all values null" test for repeated columns.
        let null_page =
            (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
        // a page contains only null values,
        // and writers have to set the corresponding entries in min_values and max_values to byte[0]
        if null_page && self.column_index_builder.valid() {
            self.column_index_builder.append(
                null_page,
                vec![],
                vec![],
                self.page_metrics.num_page_nulls as i64,
            );
        } else if self.column_index_builder.valid() {
            // min/max come from the page statistics.
            // Without page statistics the column index cannot be built for this chunk, so it
            // is invalidated (the offset index below is still maintained).
            match &page_statistics {
                None => {
                    self.column_index_builder.to_invalid();
                }
                Some(stat) => {
                    // add_data_page only builds page statistics with both min and max set,
                    // so these unwraps hold for statistics coming from there.
                    // Check if min/max are still ascending/descending across pages
                    let new_min = stat.min_opt().unwrap();
                    let new_max = stat.max_opt().unwrap();
                    if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
                        if self.data_page_boundary_ascending {
                            // If last min/max are greater than new min/max then not ascending anymore
                            let not_ascending = compare_greater(&self.descr, last_min, new_min)
                                || compare_greater(&self.descr, last_max, new_max);
                            if not_ascending {
                                self.data_page_boundary_ascending = false;
                            }
                        }

                        if self.data_page_boundary_descending {
                            // If new min/max are greater than last min/max then not descending anymore
                            let not_descending = compare_greater(&self.descr, new_min, last_min)
                                || compare_greater(&self.descr, new_max, last_max);
                            if not_descending {
                                self.data_page_boundary_descending = false;
                            }
                        }
                    }
                    // Remember this page's bounds for the next boundary-order check.
                    self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));

                    if self.can_truncate_value() {
                        // Only the truncated bytes are stored (`.0`); whether truncation
                        // happened (`.1`) is not needed for the column index.
                        self.column_index_builder.append(
                            null_page,
                            self.truncate_min_value(
                                self.props.column_index_truncate_length(),
                                stat.min_bytes_opt().unwrap(),
                            )
                            .0,
                            self.truncate_max_value(
                                self.props.column_index_truncate_length(),
                                stat.max_bytes_opt().unwrap(),
                            )
                            .0,
                            self.page_metrics.num_page_nulls as i64,
                        );
                    } else {
                        self.column_index_builder.append(
                            null_page,
                            stat.min_bytes_opt().unwrap().to_vec(),
                            stat.max_bytes_opt().unwrap().to_vec(),
                            self.page_metrics.num_page_nulls as i64,
                        );
                    }
                }
            }
        }

        // Append page histograms to the `ColumnIndex` histograms
        self.column_index_builder.append_histograms(
            &self.page_metrics.repetition_level_histogram,
            &self.page_metrics.definition_level_histogram,
        );

        // Update the offset index
        if let Some(builder) = self.offset_index_builder.as_mut() {
            builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
            builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
        }
    }
856
857    /// Determine if we should allow truncating min/max values for this column's statistics
858    fn can_truncate_value(&self) -> bool {
859        match self.descr.physical_type() {
860            // Don't truncate for Float16 and Decimal because their sort order is different
861            // from that of FIXED_LEN_BYTE_ARRAY sort order.
862            // So truncation of those types could lead to inaccurate min/max statistics
863            Type::FIXED_LEN_BYTE_ARRAY
864                if !matches!(
865                    self.descr.logical_type(),
866                    Some(LogicalType::Decimal { .. }) | Some(LogicalType::Float16)
867                ) =>
868            {
869                true
870            }
871            Type::BYTE_ARRAY => true,
872            // Truncation only applies for fba/binary physical types
873            _ => false,
874        }
875    }
876
877    /// Returns `true` if this column's logical type is a UTF-8 string.
878    fn is_utf8(&self) -> bool {
879        self.get_descriptor().logical_type() == Some(LogicalType::String)
880            || self.get_descriptor().converted_type() == ConvertedType::UTF8
881    }
882
883    /// Truncates a binary statistic to at most `truncation_length` bytes.
884    ///
885    /// If truncation is not possible, returns `data`.
886    ///
887    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
888    ///
889    /// UTF-8 Note:
890    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
891    /// also remain valid UTF-8, but may be less tnan `truncation_length` bytes to avoid splitting
892    /// on non-character boundaries.
893    fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
894        truncation_length
895            .filter(|l| data.len() > *l)
896            .and_then(|l|
897                // don't do extra work if this column isn't UTF-8
898                if self.is_utf8() {
899                    match str::from_utf8(data) {
900                        Ok(str_data) => truncate_utf8(str_data, l),
901                        Err(_) => Some(data[..l].to_vec()),
902                    }
903                } else {
904                    Some(data[..l].to_vec())
905                }
906            )
907            .map(|truncated| (truncated, true))
908            .unwrap_or_else(|| (data.to_vec(), false))
909    }
910
911    /// Truncates a binary statistic to at most `truncation_length` bytes, and then increment the
912    /// final byte(s) to yield a valid upper bound. This may result in a result of less than
913    /// `truncation_length` bytes if the last byte(s) overflows.
914    ///
915    /// If truncation is not possible, returns `data`.
916    ///
917    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
918    ///
919    /// UTF-8 Note:
920    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
921    /// also remain valid UTF-8 (but again may be less than `truncation_length` bytes). If `data`
922    /// does not contain valid UTF-8, then truncation will occur as if the column is non-string
923    /// binary.
924    fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
925        truncation_length
926            .filter(|l| data.len() > *l)
927            .and_then(|l|
928                // don't do extra work if this column isn't UTF-8
929                if self.is_utf8() {
930                    match str::from_utf8(data) {
931                        Ok(str_data) => truncate_and_increment_utf8(str_data, l),
932                        Err(_) => increment(data[..l].to_vec()),
933                    }
934                } else {
935                    increment(data[..l].to_vec())
936                }
937            )
938            .map(|truncated| (truncated, true))
939            .unwrap_or_else(|| (data.to_vec(), false))
940    }
941
942    /// Truncate the min and max values that will be written to a data page
943    /// header or column chunk Statistics
944    fn truncate_statistics(&self, statistics: Statistics) -> Statistics {
945        let backwards_compatible_min_max = self.descr.sort_order().is_signed();
946        match statistics {
947            Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
948                let (min, did_truncate_min) = self.truncate_min_value(
949                    self.props.statistics_truncate_length(),
950                    stats.min_bytes_opt().unwrap(),
951                );
952                let (max, did_truncate_max) = self.truncate_max_value(
953                    self.props.statistics_truncate_length(),
954                    stats.max_bytes_opt().unwrap(),
955                );
956                Statistics::ByteArray(
957                    ValueStatistics::new(
958                        Some(min.into()),
959                        Some(max.into()),
960                        stats.distinct_count(),
961                        stats.null_count_opt(),
962                        backwards_compatible_min_max,
963                    )
964                    .with_max_is_exact(!did_truncate_max)
965                    .with_min_is_exact(!did_truncate_min),
966                )
967            }
968            Statistics::FixedLenByteArray(stats)
969                if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
970            {
971                let (min, did_truncate_min) = self.truncate_min_value(
972                    self.props.statistics_truncate_length(),
973                    stats.min_bytes_opt().unwrap(),
974                );
975                let (max, did_truncate_max) = self.truncate_max_value(
976                    self.props.statistics_truncate_length(),
977                    stats.max_bytes_opt().unwrap(),
978                );
979                Statistics::FixedLenByteArray(
980                    ValueStatistics::new(
981                        Some(min.into()),
982                        Some(max.into()),
983                        stats.distinct_count(),
984                        stats.null_count_opt(),
985                        backwards_compatible_min_max,
986                    )
987                    .with_max_is_exact(!did_truncate_max)
988                    .with_min_is_exact(!did_truncate_min),
989                )
990            }
991            stats => stats,
992        }
993    }
994
    /// Adds data page.
    /// Data page is either buffered in case of dictionary encoding or written directly.
    ///
    /// Turns the values and levels buffered for the current page into a `CompressedPage`:
    ///
    /// 1. flushes the encoded values from `self.encoder` and folds the page's null count and
    ///    min/max into the chunk-level metrics;
    /// 2. builds page statistics only when `statistics_enabled` is `EnabledStatistics::Page`,
    ///    and passes them to `update_column_offset_index`;
    /// 3. encodes repetition/definition levels and lays out the page body according to the
    ///    writer version (see the per-branch comments);
    /// 4. queues the page in `self.data_pages` while the encoder has a dictionary, otherwise
    ///    writes it to the page writer immediately;
    /// 5. adds the page's rows to `total_rows_written` and resets the level sinks and page
    ///    metrics for the next page.
    fn add_data_page(&mut self) -> Result<()> {
        // Extract encoded values
        let values_data = self.encoder.flush_data_page()?;

        let max_def_level = self.descr.max_def_level();
        let max_rep_level = self.descr.max_rep_level();

        self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;

        // Page statistics exist only if the encoder reported both a min and a max.
        let page_statistics = match (values_data.min_value, values_data.max_value) {
            (Some(min), Some(max)) => {
                // Update chunk level statistics
                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

                // Page-level statistics are only kept for `EnabledStatistics::Page`.
                (self.statistics_enabled == EnabledStatistics::Page).then_some(
                    ValueStatistics::new(
                        Some(min),
                        Some(max),
                        None,
                        Some(self.page_metrics.num_page_nulls),
                        false,
                    ),
                )
            }
            _ => None,
        };

        // update column and offset index
        // (must happen before `self.page_metrics` is reset at the end of this method)
        self.update_column_offset_index(
            page_statistics.as_ref(),
            values_data.variable_length_bytes,
        );

        // Update histograms and variable_length_bytes in column_metrics
        self.column_metrics
            .update_from_page_metrics(&self.page_metrics);
        self.column_metrics
            .update_variable_length_bytes(values_data.variable_length_bytes);

        // From here on, we only need page statistics if they will be written to the page header.
        let page_statistics = page_statistics
            .filter(|_| self.props.write_page_header_statistics(self.descr.path()))
            .map(|stats| self.truncate_statistics(Statistics::from(stats)));

        let compressed_page = match self.props.writer_version() {
            WriterVersion::PARQUET_1_0 => {
                // Data Page v1: RLE rep levels, RLE def levels and values are concatenated,
                // and the whole buffer is compressed together.
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.rep_levels_sink[..],
                            max_rep_level,
                        )[..],
                    );
                }

                if max_def_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.def_levels_sink[..],
                            max_def_level,
                        )[..],
                    );
                }

                buffer.extend_from_slice(&values_data.buf);
                // Measured before compression, covering levels and values.
                let uncompressed_size = buffer.len();

                if let Some(ref mut cmpr) = self.compressor {
                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
                    buffer = compressed_buf;
                }

                let data_page = Page::DataPage {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    def_level_encoding: Encoding::RLE,
                    rep_level_encoding: Encoding::RLE,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
            WriterVersion::PARQUET_2_0 => {
                // Data Page v2: levels are written uncompressed ahead of the values, and
                // their byte lengths are recorded separately in the page header.
                let mut rep_levels_byte_len = 0;
                let mut def_levels_byte_len = 0;
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
                    rep_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                if max_def_level > 0 {
                    let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
                    def_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                let uncompressed_size =
                    rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();

                // Data Page v2 compresses values only.
                match self.compressor {
                    Some(ref mut cmpr) => {
                        cmpr.compress(&values_data.buf, &mut buffer)?;
                    }
                    None => buffer.extend_from_slice(&values_data.buf),
                }

                let data_page = Page::DataPageV2 {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    num_nulls: self.page_metrics.num_page_nulls as u32,
                    num_rows: self.page_metrics.num_buffered_rows,
                    def_levels_byte_len: def_levels_byte_len as u32,
                    rep_levels_byte_len: rep_levels_byte_len as u32,
                    // Set whenever a codec is configured.
                    is_compressed: self.compressor.is_some(),
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
        };

        // Check if we need to buffer data page or flush it to the sink directly.
        // While a dictionary is active, pages are held back so that the dictionary page
        // can be written ahead of them (see `dict_fallback` / `flush_data_pages`).
        if self.encoder.has_dictionary() {
            self.data_pages.push_back(compressed_page);
        } else {
            self.write_data_page(compressed_page)?;
        }

        // Update total number of rows.
        self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;

        // Reset state.
        self.rep_levels_sink.clear();
        self.def_levels_sink.clear();
        self.page_metrics.new_page();

        Ok(())
    }
1147
1148    /// Finalises any outstanding data pages and flushes buffered data pages from
1149    /// dictionary encoding into underlying sink.
1150    #[inline]
1151    fn flush_data_pages(&mut self) -> Result<()> {
1152        // Write all outstanding data to a new page.
1153        if self.page_metrics.num_buffered_values > 0 {
1154            self.add_data_page()?;
1155        }
1156
1157        while let Some(page) = self.data_pages.pop_front() {
1158            self.write_data_page(page)?;
1159        }
1160
1161        Ok(())
1162    }
1163
1164    /// Assembles column chunk metadata.
1165    fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
1166        let total_compressed_size = self.column_metrics.total_compressed_size as i64;
1167        let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
1168        let num_values = self.column_metrics.total_num_values as i64;
1169        let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
1170        // If data page offset is not set, then no pages have been written
1171        let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;
1172
1173        let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
1174            .set_compression(self.codec)
1175            .set_encodings(self.encodings.iter().cloned().collect())
1176            .set_page_encoding_stats(self.encoding_stats.clone())
1177            .set_total_compressed_size(total_compressed_size)
1178            .set_total_uncompressed_size(total_uncompressed_size)
1179            .set_num_values(num_values)
1180            .set_data_page_offset(data_page_offset)
1181            .set_dictionary_page_offset(dict_page_offset);
1182
1183        if self.statistics_enabled != EnabledStatistics::None {
1184            let backwards_compatible_min_max = self.descr.sort_order().is_signed();
1185
1186            let statistics = ValueStatistics::<E::T>::new(
1187                self.column_metrics.min_column_value.clone(),
1188                self.column_metrics.max_column_value.clone(),
1189                self.column_metrics.column_distinct_count,
1190                Some(self.column_metrics.num_column_nulls),
1191                false,
1192            )
1193            .with_backwards_compatible_min_max(backwards_compatible_min_max)
1194            .into();
1195
1196            let statistics = self.truncate_statistics(statistics);
1197
1198            builder = builder
1199                .set_statistics(statistics)
1200                .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
1201                .set_repetition_level_histogram(
1202                    self.column_metrics.repetition_level_histogram.take(),
1203                )
1204                .set_definition_level_histogram(
1205                    self.column_metrics.definition_level_histogram.take(),
1206                );
1207        }
1208
1209        builder = self.set_column_chunk_encryption_properties(builder);
1210
1211        let metadata = builder.build()?;
1212        Ok(metadata)
1213    }
1214
1215    /// Encodes definition or repetition levels for Data Page v1.
1216    #[inline]
1217    fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> {
1218        let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
1219        encoder.put(levels);
1220        encoder.consume()
1221    }
1222
1223    /// Encodes definition or repetition levels for Data Page v2.
1224    /// Encoding is always RLE.
1225    #[inline]
1226    fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
1227        let mut encoder = LevelEncoder::v2(max_level, levels.len());
1228        encoder.put(levels);
1229        encoder.consume()
1230    }
1231
1232    /// Writes compressed data page into underlying sink and updates global metrics.
1233    #[inline]
1234    fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
1235        self.encodings.insert(page.encoding());
1236        match self.encoding_stats.last_mut() {
1237            Some(encoding_stats)
1238                if encoding_stats.page_type == page.page_type()
1239                    && encoding_stats.encoding == page.encoding() =>
1240            {
1241                encoding_stats.count += 1;
1242            }
1243            _ => {
1244                // data page type does not change inside a file
1245                // encoding can currently only change from dictionary to non-dictionary once
1246                self.encoding_stats.push(PageEncodingStats {
1247                    page_type: page.page_type(),
1248                    encoding: page.encoding(),
1249                    count: 1,
1250                });
1251            }
1252        }
1253        let page_spec = self.page_writer.write_page(page)?;
1254        // update offset index
1255        // compressed_size = header_size + compressed_data_size
1256        if let Some(builder) = self.offset_index_builder.as_mut() {
1257            builder
1258                .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
1259        }
1260        self.update_metrics_for_page(page_spec);
1261        Ok(())
1262    }
1263
1264    /// Writes dictionary page into underlying sink.
1265    #[inline]
1266    fn write_dictionary_page(&mut self) -> Result<()> {
1267        let compressed_page = {
1268            let mut page = self
1269                .encoder
1270                .flush_dict_page()?
1271                .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;
1272
1273            let uncompressed_size = page.buf.len();
1274
1275            if let Some(ref mut cmpr) = self.compressor {
1276                let mut output_buf = Vec::with_capacity(uncompressed_size);
1277                cmpr.compress(&page.buf, &mut output_buf)?;
1278                page.buf = Bytes::from(output_buf);
1279            }
1280
1281            let dict_page = Page::DictionaryPage {
1282                buf: page.buf,
1283                num_values: page.num_values as u32,
1284                encoding: self.props.dictionary_page_encoding(),
1285                is_sorted: page.is_sorted,
1286            };
1287            CompressedPage::new(dict_page, uncompressed_size)
1288        };
1289
1290        self.encodings.insert(compressed_page.encoding());
1291        self.encoding_stats.push(PageEncodingStats {
1292            page_type: PageType::DICTIONARY_PAGE,
1293            encoding: compressed_page.encoding(),
1294            count: 1,
1295        });
1296        let page_spec = self.page_writer.write_page(compressed_page)?;
1297        self.update_metrics_for_page(page_spec);
1298        // For the directory page, don't need to update column/offset index.
1299        Ok(())
1300    }
1301
1302    /// Updates column writer metrics with each page metadata.
1303    #[inline]
1304    fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1305        self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1306        self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1307        self.column_metrics.total_bytes_written += page_spec.bytes_written;
1308
1309        match page_spec.page_type {
1310            PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1311                self.column_metrics.total_num_values += page_spec.num_values as u64;
1312                if self.column_metrics.data_page_offset.is_none() {
1313                    self.column_metrics.data_page_offset = Some(page_spec.offset);
1314                }
1315            }
1316            PageType::DICTIONARY_PAGE => {
1317                assert!(
1318                    self.column_metrics.dictionary_page_offset.is_none(),
1319                    "Dictionary offset is already set"
1320                );
1321                self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1322            }
1323            _ => {}
1324        }
1325    }
1326
1327    #[inline]
1328    #[cfg(feature = "encryption")]
1329    fn set_column_chunk_encryption_properties(
1330        &self,
1331        builder: ColumnChunkMetaDataBuilder,
1332    ) -> ColumnChunkMetaDataBuilder {
1333        if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() {
1334            builder.set_column_crypto_metadata(get_column_crypto_metadata(
1335                encryption_properties,
1336                &self.descr,
1337            ))
1338        } else {
1339            builder
1340        }
1341    }
1342
    /// Counterpart used when the `encryption` feature is disabled: there is no crypto
    /// metadata to attach, so `builder` is returned unchanged.
    #[inline]
    #[cfg(not(feature = "encryption"))]
    fn set_column_chunk_encryption_properties(
        &self,
        builder: ColumnChunkMetaDataBuilder,
    ) -> ColumnChunkMetaDataBuilder {
        builder
    }
1351}
1352
1353fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
1354    update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
1355}
1356
1357fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
1358    update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
1359}
1360
1361#[inline]
1362#[allow(clippy::eq_op)]
1363fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
1364    match T::PHYSICAL_TYPE {
1365        Type::FLOAT | Type::DOUBLE => val != val,
1366        Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type() == Some(LogicalType::Float16) => {
1367            let val = val.as_bytes();
1368            let val = f16::from_le_bytes([val[0], val[1]]);
1369            val.is_nan()
1370        }
1371        _ => false,
1372    }
1373}
1374
1375/// Perform a conditional update of `cur`, skipping any NaN values
1376///
1377/// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with
1378/// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true`
1379fn update_stat<T: ParquetValueType, F>(
1380    descr: &ColumnDescriptor,
1381    val: &T,
1382    cur: &mut Option<T>,
1383    should_update: F,
1384) where
1385    F: Fn(&T) -> bool,
1386{
1387    if is_nan(descr, val) {
1388        return;
1389    }
1390
1391    if cur.as_ref().map_or(true, should_update) {
1392        *cur = Some(val.clone());
1393    }
1394}
1395
1396/// Evaluate `a > b` according to underlying logical type.
1397fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
1398    if let Some(LogicalType::Integer { is_signed, .. }) = descr.logical_type() {
1399        if !is_signed {
1400            // need to compare unsigned
1401            return a.as_u64().unwrap() > b.as_u64().unwrap();
1402        }
1403    }
1404
1405    match descr.converted_type() {
1406        ConvertedType::UINT_8
1407        | ConvertedType::UINT_16
1408        | ConvertedType::UINT_32
1409        | ConvertedType::UINT_64 => {
1410            return a.as_u64().unwrap() > b.as_u64().unwrap();
1411        }
1412        _ => {}
1413    };
1414
1415    if let Some(LogicalType::Decimal { .. }) = descr.logical_type() {
1416        match T::PHYSICAL_TYPE {
1417            Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1418                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1419            }
1420            _ => {}
1421        };
1422    }
1423
1424    if descr.converted_type() == ConvertedType::DECIMAL {
1425        match T::PHYSICAL_TYPE {
1426            Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1427                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1428            }
1429            _ => {}
1430        };
1431    };
1432
1433    if let Some(LogicalType::Float16) = descr.logical_type() {
1434        let a = a.as_bytes();
1435        let a = f16::from_le_bytes([a[0], a[1]]);
1436        let b = b.as_bytes();
1437        let b = f16::from_le_bytes([b[0], b[1]]);
1438        return a > b;
1439    }
1440
1441    a > b
1442}
1443
1444// ----------------------------------------------------------------------
1445// Encoding support for column writer.
1446// This mirrors parquet-mr default encodings for writes. See:
1447// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java
1448// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java
1449
1450/// Returns encoding for a column when no other encoding is provided in writer properties.
1451fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1452    match (kind, props.writer_version()) {
1453        (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1454        (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1455        (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1456        (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1457        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1458        _ => Encoding::PLAIN,
1459    }
1460}
1461
1462/// Returns true if dictionary is supported for column writer, false otherwise.
1463fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1464    match (kind, props.writer_version()) {
1465        // Booleans do not support dict encoding and should use a fallback encoding.
1466        (Type::BOOLEAN, _) => false,
1467        // Dictionary encoding was not enabled in PARQUET 1.0
1468        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1469        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1470        _ => true,
1471    }
1472}
1473
/// Signed comparison of two big-endian two's-complement byte arrays, as used for
/// `DECIMAL` values stored in `BYTE_ARRAY` / `FIXED_LEN_BYTE_ARRAY` columns.
///
/// Returns `true` if `a > b`. The arrays may differ in length: the shorter one is treated
/// as if it were sign-extended to the length of the longer one. For example, `[0xFF, 0x90]`
/// and `[0x90]` compare equal, and `[0x00, 0x05]` (5) is less than `[0x06]` (6). An empty
/// array compares below any non-empty array.
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    let a_length = a.len();
    let b_length = b.len();

    if a_length == 0 || b_length == 0 {
        return a_length > 0;
    }

    let first_a: u8 = a[0];
    let first_b: u8 = b[0];

    // We can short circuit for different signed numbers or
    // for equal length bytes arrays that have different first bytes.
    // The equality requirement is necessary for sign extension cases.
    // 0xFF10 should be equal to 0x10 (due to big endian sign extension).
    if (0x80 & first_a) != (0x80 & first_b) || (a_length == b_length && first_a != first_b) {
        return (first_a as i8) > (first_b as i8);
    }

    // From here on both numbers have the same sign. When the lengths differ, the extra
    // leading bytes of the longer array are checked against the sign-extension byte. If any
    // of them differs, the longer array has the larger magnitude.
    let extension: u8 = if (first_a as i8) < 0 { 0xFF } else { 0 };

    if a_length != b_length {
        let not_equal = if a_length > b_length {
            let lead_length = a_length - b_length;
            a[0..lead_length].iter().any(|&x| x != extension)
        } else {
            let lead_length = b_length - a_length;
            b[0..lead_length].iter().any(|&x| x != extension)
        };

        if not_equal {
            let negative_values: bool = (first_a as i8) < 0;
            let a_longer: bool = a_length > b_length;
            return if negative_values { !a_longer } else { a_longer };
        }
    }

    // Once sign-extended to a common width, both values now share every byte ahead of
    // their trailing `common_length` bytes. This holds in two cases:
    // - equal lengths with equal first bytes;
    // - the longer array's extra leading bytes are pure sign extension.
    // Same-sign two's-complement values of equal width order like their unsigned big-endian
    // bytes, so an unsigned lexicographic comparison of the aligned tails decides the result.
    // (Comparing `a[1..]` with `b[1..]` would misalign arrays of different lengths.)
    let common_length = a_length.min(b_length);
    a[a_length - common_length..] > b[b_length - common_length..]
}
1519
/// Truncate a UTF-8 string to its longest non-empty prefix that is at most `length`
/// bytes long and ends on a character boundary, so the prefix is still valid UTF-8.
///
/// Returns `None` if no such prefix exists. This happens when `length` is zero, or when
/// the first character alone needs more than `length` bytes.
///
/// The caller guarantees that `data.len() > length`.
fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    // Offset 0 is always a boundary but would give an empty prefix, so search from 1.
    let split = (1..=length).rfind(|x| data.is_char_boundary(*x))?;
    Some(data.as_bytes()[..split].to_vec())
}
1529
1530/// Truncate a UTF-8 slice and increment it's final character. The returned value is the
1531/// longest such slice that is still a valid UTF-8 string while being less than `length`
1532/// bytes and non-empty. Returns `None` if no such transformation is possible.
1533///
1534/// The caller guarantees that data.len() > length.
1535fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1536    // UTF-8 is max 4 bytes, so start search 3 back from desired length
1537    let lower_bound = length.saturating_sub(3);
1538    let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
1539    increment_utf8(data.get(..split)?)
1540}
1541
/// Return the bytes of `data` with its last incrementable character replaced by the
/// next code point. The result is always valid UTF-8.
///
/// Trailing characters that cannot be incremented are dropped, so the result may be
/// shorter than `data`. A character cannot be incremented if its successor is not a
/// valid `char` (a surrogate, or beyond `char::MAX`). It also cannot be incremented if
/// the successor would need more UTF-8 bytes than the original character; an N-byte
/// code point is never promoted to N+1 bytes.
///
/// Returns `None` when no character of `data` can be incremented, including for `""`.
fn increment_utf8(data: &str) -> Option<Vec<u8>> {
    data.char_indices().rev().find_map(|(start, ch)| {
        let width = ch.len_utf8();
        // Keep the successor only if it is a valid char with the same encoded width.
        let successor = char::from_u32(u32::from(ch) + 1).filter(|next| next.len_utf8() == width)?;
        let mut bytes = data.as_bytes()[..start + width].to_vec();
        successor.encode_utf8(&mut bytes[start..]);
        Some(bytes)
    })
}
1563
/// Treat `data` as a big-endian unsigned integer and add one to it, carrying from the
/// last byte towards the first.
///
/// Returns `None` if every byte is `u8::MAX`, which includes the empty vector, because
/// the carry would overflow the available width.
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    // The rightmost byte that is not 0xFF absorbs the carry.
    let carry_stop = data.iter().rposition(|&byte| byte != u8::MAX)?;
    data[carry_stop] += 1;
    // Every 0xFF byte after it wraps around to zero.
    data[carry_stop + 1..].fill(0);
    Some(data)
}
1579
1580#[cfg(test)]
1581mod tests {
1582    use crate::{
1583        file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1584        schema::parser::parse_message_type,
1585    };
1586    use core::str;
1587    use rand::distr::uniform::SampleUniform;
1588    use std::{fs::File, sync::Arc};
1589
1590    use crate::column::{
1591        page::PageReader,
1592        reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl},
1593    };
1594    use crate::file::writer::TrackedWrite;
1595    use crate::file::{
1596        properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1597    };
1598    use crate::schema::types::{ColumnPath, Type as SchemaType};
1599    use crate::util::test_common::rand_gen::random_numbers_range;
1600
1601    use super::*;
1602
1603    #[test]
1604    fn test_column_writer_inconsistent_def_rep_length() {
1605        let page_writer = get_test_page_writer();
1606        let props = Default::default();
1607        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
1608        let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
1609        assert!(res.is_err());
1610        if let Err(err) = res {
1611            assert_eq!(
1612                format!("{err}"),
1613                "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
1614            );
1615        }
1616    }
1617
1618    #[test]
1619    fn test_column_writer_invalid_def_levels() {
1620        let page_writer = get_test_page_writer();
1621        let props = Default::default();
1622        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1623        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1624        assert!(res.is_err());
1625        if let Err(err) = res {
1626            assert_eq!(
1627                format!("{err}"),
1628                "Parquet error: Definition levels are required, because max definition level = 1"
1629            );
1630        }
1631    }
1632
1633    #[test]
1634    fn test_column_writer_invalid_rep_levels() {
1635        let page_writer = get_test_page_writer();
1636        let props = Default::default();
1637        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
1638        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1639        assert!(res.is_err());
1640        if let Err(err) = res {
1641            assert_eq!(
1642                format!("{err}"),
1643                "Parquet error: Repetition levels are required, because max repetition level = 1"
1644            );
1645        }
1646    }
1647
1648    #[test]
1649    fn test_column_writer_not_enough_values_to_write() {
1650        let page_writer = get_test_page_writer();
1651        let props = Default::default();
1652        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1653        let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
1654        assert!(res.is_err());
1655        if let Err(err) = res {
1656            assert_eq!(
1657                format!("{err}"),
1658                "Parquet error: Expected to write 4 values, but have only 2"
1659            );
1660        }
1661    }
1662
1663    #[test]
1664    fn test_column_writer_write_only_one_dictionary_page() {
1665        let page_writer = get_test_page_writer();
1666        let props = Default::default();
1667        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1668        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1669        // First page should be correctly written.
1670        writer.add_data_page().unwrap();
1671        writer.write_dictionary_page().unwrap();
1672        let err = writer.write_dictionary_page().unwrap_err().to_string();
1673        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1674    }
1675
1676    #[test]
1677    fn test_column_writer_error_when_writing_disabled_dictionary() {
1678        let page_writer = get_test_page_writer();
1679        let props = Arc::new(
1680            WriterProperties::builder()
1681                .set_dictionary_enabled(false)
1682                .build(),
1683        );
1684        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1685        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1686        let err = writer.write_dictionary_page().unwrap_err().to_string();
1687        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1688    }
1689
1690    #[test]
1691    fn test_column_writer_boolean_type_does_not_support_dictionary() {
1692        let page_writer = get_test_page_writer();
1693        let props = Arc::new(
1694            WriterProperties::builder()
1695                .set_dictionary_enabled(true)
1696                .build(),
1697        );
1698        let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
1699        writer
1700            .write_batch(&[true, false, true, false], None, None)
1701            .unwrap();
1702
1703        let r = writer.close().unwrap();
1704        // PlainEncoder uses bit writer to write boolean values, which all fit into 1
1705        // byte.
1706        assert_eq!(r.bytes_written, 1);
1707        assert_eq!(r.rows_written, 4);
1708
1709        let metadata = r.metadata;
1710        assert_eq!(metadata.encodings(), &vec![Encoding::PLAIN, Encoding::RLE]);
1711        assert_eq!(metadata.num_values(), 4); // just values
1712        assert_eq!(metadata.dictionary_page_offset(), None);
1713    }
1714
1715    #[test]
1716    fn test_column_writer_default_encoding_support_bool() {
1717        check_encoding_write_support::<BoolType>(
1718            WriterVersion::PARQUET_1_0,
1719            true,
1720            &[true, false],
1721            None,
1722            &[Encoding::PLAIN, Encoding::RLE],
1723            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1724        );
1725        check_encoding_write_support::<BoolType>(
1726            WriterVersion::PARQUET_1_0,
1727            false,
1728            &[true, false],
1729            None,
1730            &[Encoding::PLAIN, Encoding::RLE],
1731            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1732        );
1733        check_encoding_write_support::<BoolType>(
1734            WriterVersion::PARQUET_2_0,
1735            true,
1736            &[true, false],
1737            None,
1738            &[Encoding::RLE],
1739            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
1740        );
1741        check_encoding_write_support::<BoolType>(
1742            WriterVersion::PARQUET_2_0,
1743            false,
1744            &[true, false],
1745            None,
1746            &[Encoding::RLE],
1747            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
1748        );
1749    }
1750
1751    #[test]
1752    fn test_column_writer_default_encoding_support_int32() {
1753        check_encoding_write_support::<Int32Type>(
1754            WriterVersion::PARQUET_1_0,
1755            true,
1756            &[1, 2],
1757            Some(0),
1758            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1759            &[
1760                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1761                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1762            ],
1763        );
1764        check_encoding_write_support::<Int32Type>(
1765            WriterVersion::PARQUET_1_0,
1766            false,
1767            &[1, 2],
1768            None,
1769            &[Encoding::PLAIN, Encoding::RLE],
1770            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1771        );
1772        check_encoding_write_support::<Int32Type>(
1773            WriterVersion::PARQUET_2_0,
1774            true,
1775            &[1, 2],
1776            Some(0),
1777            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1778            &[
1779                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1780                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1781            ],
1782        );
1783        check_encoding_write_support::<Int32Type>(
1784            WriterVersion::PARQUET_2_0,
1785            false,
1786            &[1, 2],
1787            None,
1788            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1789            &[encoding_stats(
1790                PageType::DATA_PAGE_V2,
1791                Encoding::DELTA_BINARY_PACKED,
1792                1,
1793            )],
1794        );
1795    }
1796
1797    #[test]
1798    fn test_column_writer_default_encoding_support_int64() {
1799        check_encoding_write_support::<Int64Type>(
1800            WriterVersion::PARQUET_1_0,
1801            true,
1802            &[1, 2],
1803            Some(0),
1804            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1805            &[
1806                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1807                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1808            ],
1809        );
1810        check_encoding_write_support::<Int64Type>(
1811            WriterVersion::PARQUET_1_0,
1812            false,
1813            &[1, 2],
1814            None,
1815            &[Encoding::PLAIN, Encoding::RLE],
1816            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1817        );
1818        check_encoding_write_support::<Int64Type>(
1819            WriterVersion::PARQUET_2_0,
1820            true,
1821            &[1, 2],
1822            Some(0),
1823            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1824            &[
1825                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1826                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1827            ],
1828        );
1829        check_encoding_write_support::<Int64Type>(
1830            WriterVersion::PARQUET_2_0,
1831            false,
1832            &[1, 2],
1833            None,
1834            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1835            &[encoding_stats(
1836                PageType::DATA_PAGE_V2,
1837                Encoding::DELTA_BINARY_PACKED,
1838                1,
1839            )],
1840        );
1841    }
1842
1843    #[test]
1844    fn test_column_writer_default_encoding_support_int96() {
1845        check_encoding_write_support::<Int96Type>(
1846            WriterVersion::PARQUET_1_0,
1847            true,
1848            &[Int96::from(vec![1, 2, 3])],
1849            Some(0),
1850            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1851            &[
1852                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1853                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1854            ],
1855        );
1856        check_encoding_write_support::<Int96Type>(
1857            WriterVersion::PARQUET_1_0,
1858            false,
1859            &[Int96::from(vec![1, 2, 3])],
1860            None,
1861            &[Encoding::PLAIN, Encoding::RLE],
1862            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1863        );
1864        check_encoding_write_support::<Int96Type>(
1865            WriterVersion::PARQUET_2_0,
1866            true,
1867            &[Int96::from(vec![1, 2, 3])],
1868            Some(0),
1869            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1870            &[
1871                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1872                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1873            ],
1874        );
1875        check_encoding_write_support::<Int96Type>(
1876            WriterVersion::PARQUET_2_0,
1877            false,
1878            &[Int96::from(vec![1, 2, 3])],
1879            None,
1880            &[Encoding::PLAIN, Encoding::RLE],
1881            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1882        );
1883    }
1884
1885    #[test]
1886    fn test_column_writer_default_encoding_support_float() {
1887        check_encoding_write_support::<FloatType>(
1888            WriterVersion::PARQUET_1_0,
1889            true,
1890            &[1.0, 2.0],
1891            Some(0),
1892            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1893            &[
1894                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1895                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1896            ],
1897        );
1898        check_encoding_write_support::<FloatType>(
1899            WriterVersion::PARQUET_1_0,
1900            false,
1901            &[1.0, 2.0],
1902            None,
1903            &[Encoding::PLAIN, Encoding::RLE],
1904            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1905        );
1906        check_encoding_write_support::<FloatType>(
1907            WriterVersion::PARQUET_2_0,
1908            true,
1909            &[1.0, 2.0],
1910            Some(0),
1911            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1912            &[
1913                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1914                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1915            ],
1916        );
1917        check_encoding_write_support::<FloatType>(
1918            WriterVersion::PARQUET_2_0,
1919            false,
1920            &[1.0, 2.0],
1921            None,
1922            &[Encoding::PLAIN, Encoding::RLE],
1923            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1924        );
1925    }
1926
1927    #[test]
1928    fn test_column_writer_default_encoding_support_double() {
1929        check_encoding_write_support::<DoubleType>(
1930            WriterVersion::PARQUET_1_0,
1931            true,
1932            &[1.0, 2.0],
1933            Some(0),
1934            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1935            &[
1936                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1937                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1938            ],
1939        );
1940        check_encoding_write_support::<DoubleType>(
1941            WriterVersion::PARQUET_1_0,
1942            false,
1943            &[1.0, 2.0],
1944            None,
1945            &[Encoding::PLAIN, Encoding::RLE],
1946            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1947        );
1948        check_encoding_write_support::<DoubleType>(
1949            WriterVersion::PARQUET_2_0,
1950            true,
1951            &[1.0, 2.0],
1952            Some(0),
1953            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1954            &[
1955                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1956                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1957            ],
1958        );
1959        check_encoding_write_support::<DoubleType>(
1960            WriterVersion::PARQUET_2_0,
1961            false,
1962            &[1.0, 2.0],
1963            None,
1964            &[Encoding::PLAIN, Encoding::RLE],
1965            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1966        );
1967    }
1968
1969    #[test]
1970    fn test_column_writer_default_encoding_support_byte_array() {
1971        check_encoding_write_support::<ByteArrayType>(
1972            WriterVersion::PARQUET_1_0,
1973            true,
1974            &[ByteArray::from(vec![1u8])],
1975            Some(0),
1976            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1977            &[
1978                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1979                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1980            ],
1981        );
1982        check_encoding_write_support::<ByteArrayType>(
1983            WriterVersion::PARQUET_1_0,
1984            false,
1985            &[ByteArray::from(vec![1u8])],
1986            None,
1987            &[Encoding::PLAIN, Encoding::RLE],
1988            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1989        );
1990        check_encoding_write_support::<ByteArrayType>(
1991            WriterVersion::PARQUET_2_0,
1992            true,
1993            &[ByteArray::from(vec![1u8])],
1994            Some(0),
1995            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1996            &[
1997                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1998                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1999            ],
2000        );
2001        check_encoding_write_support::<ByteArrayType>(
2002            WriterVersion::PARQUET_2_0,
2003            false,
2004            &[ByteArray::from(vec![1u8])],
2005            None,
2006            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
2007            &[encoding_stats(
2008                PageType::DATA_PAGE_V2,
2009                Encoding::DELTA_BYTE_ARRAY,
2010                1,
2011            )],
2012        );
2013    }
2014
2015    #[test]
2016    fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
2017        check_encoding_write_support::<FixedLenByteArrayType>(
2018            WriterVersion::PARQUET_1_0,
2019            true,
2020            &[ByteArray::from(vec![1u8]).into()],
2021            None,
2022            &[Encoding::PLAIN, Encoding::RLE],
2023            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2024        );
2025        check_encoding_write_support::<FixedLenByteArrayType>(
2026            WriterVersion::PARQUET_1_0,
2027            false,
2028            &[ByteArray::from(vec![1u8]).into()],
2029            None,
2030            &[Encoding::PLAIN, Encoding::RLE],
2031            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2032        );
2033        check_encoding_write_support::<FixedLenByteArrayType>(
2034            WriterVersion::PARQUET_2_0,
2035            true,
2036            &[ByteArray::from(vec![1u8]).into()],
2037            Some(0),
2038            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2039            &[
2040                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2041                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2042            ],
2043        );
2044        check_encoding_write_support::<FixedLenByteArrayType>(
2045            WriterVersion::PARQUET_2_0,
2046            false,
2047            &[ByteArray::from(vec![1u8]).into()],
2048            None,
2049            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
2050            &[encoding_stats(
2051                PageType::DATA_PAGE_V2,
2052                Encoding::DELTA_BYTE_ARRAY,
2053                1,
2054            )],
2055        );
2056    }
2057
2058    #[test]
2059    fn test_column_writer_check_metadata() {
2060        let page_writer = get_test_page_writer();
2061        let props = Default::default();
2062        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2063        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2064
2065        let r = writer.close().unwrap();
2066        assert_eq!(r.bytes_written, 20);
2067        assert_eq!(r.rows_written, 4);
2068
2069        let metadata = r.metadata;
2070        assert_eq!(
2071            metadata.encodings(),
2072            &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2073        );
2074        assert_eq!(metadata.num_values(), 4);
2075        assert_eq!(metadata.compressed_size(), 20);
2076        assert_eq!(metadata.uncompressed_size(), 20);
2077        assert_eq!(metadata.data_page_offset(), 0);
2078        assert_eq!(metadata.dictionary_page_offset(), Some(0));
2079        if let Some(stats) = metadata.statistics() {
2080            assert_eq!(stats.null_count_opt(), Some(0));
2081            assert_eq!(stats.distinct_count_opt(), None);
2082            if let Statistics::Int32(stats) = stats {
2083                assert_eq!(stats.min_opt().unwrap(), &1);
2084                assert_eq!(stats.max_opt().unwrap(), &4);
2085            } else {
2086                panic!("expecting Statistics::Int32");
2087            }
2088        } else {
2089            panic!("metadata missing statistics");
2090        }
2091    }
2092
2093    #[test]
2094    fn test_column_writer_check_byte_array_min_max() {
2095        let page_writer = get_test_page_writer();
2096        let props = Default::default();
2097        let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
2098        writer
2099            .write_batch(
2100                &[
2101                    ByteArray::from(vec![
2102                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2103                        35u8, 231u8, 90u8, 0u8, 0u8,
2104                    ]),
2105                    ByteArray::from(vec![
2106                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
2107                        152u8, 177u8, 56u8, 0u8, 0u8,
2108                    ]),
2109                    ByteArray::from(vec![
2110                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
2111                        0u8,
2112                    ]),
2113                    ByteArray::from(vec![
2114                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2115                        44u8, 0u8, 0u8,
2116                    ]),
2117                ],
2118                None,
2119                None,
2120            )
2121            .unwrap();
2122        let metadata = writer.close().unwrap().metadata;
2123        if let Some(stats) = metadata.statistics() {
2124            if let Statistics::ByteArray(stats) = stats {
2125                assert_eq!(
2126                    stats.min_opt().unwrap(),
2127                    &ByteArray::from(vec![
2128                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2129                        35u8, 231u8, 90u8, 0u8, 0u8,
2130                    ])
2131                );
2132                assert_eq!(
2133                    stats.max_opt().unwrap(),
2134                    &ByteArray::from(vec![
2135                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2136                        44u8, 0u8, 0u8,
2137                    ])
2138                );
2139            } else {
2140                panic!("expecting Statistics::ByteArray");
2141            }
2142        } else {
2143            panic!("metadata missing statistics");
2144        }
2145    }
2146
2147    #[test]
2148    fn test_column_writer_uint32_converted_type_min_max() {
2149        let page_writer = get_test_page_writer();
2150        let props = Default::default();
2151        let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
2152            page_writer,
2153            0,
2154            0,
2155            props,
2156        );
2157        writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
2158        let metadata = writer.close().unwrap().metadata;
2159        if let Some(stats) = metadata.statistics() {
2160            if let Statistics::Int32(stats) = stats {
2161                assert_eq!(stats.min_opt().unwrap(), &0,);
2162                assert_eq!(stats.max_opt().unwrap(), &5,);
2163            } else {
2164                panic!("expecting Statistics::Int32");
2165            }
2166        } else {
2167            panic!("metadata missing statistics");
2168        }
2169    }
2170
2171    #[test]
2172    fn test_column_writer_precalculated_statistics() {
2173        let page_writer = get_test_page_writer();
2174        let props = Arc::new(
2175            WriterProperties::builder()
2176                .set_statistics_enabled(EnabledStatistics::Chunk)
2177                .build(),
2178        );
2179        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2180        writer
2181            .write_batch_with_statistics(
2182                &[1, 2, 3, 4],
2183                None,
2184                None,
2185                Some(&-17),
2186                Some(&9000),
2187                Some(55),
2188            )
2189            .unwrap();
2190
2191        let r = writer.close().unwrap();
2192        assert_eq!(r.bytes_written, 20);
2193        assert_eq!(r.rows_written, 4);
2194
2195        let metadata = r.metadata;
2196        assert_eq!(
2197            metadata.encodings(),
2198            &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2199        );
2200        assert_eq!(metadata.num_values(), 4);
2201        assert_eq!(metadata.compressed_size(), 20);
2202        assert_eq!(metadata.uncompressed_size(), 20);
2203        assert_eq!(metadata.data_page_offset(), 0);
2204        assert_eq!(metadata.dictionary_page_offset(), Some(0));
2205        if let Some(stats) = metadata.statistics() {
2206            assert_eq!(stats.null_count_opt(), Some(0));
2207            assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
2208            if let Statistics::Int32(stats) = stats {
2209                assert_eq!(stats.min_opt().unwrap(), &-17);
2210                assert_eq!(stats.max_opt().unwrap(), &9000);
2211            } else {
2212                panic!("expecting Statistics::Int32");
2213            }
2214        } else {
2215            panic!("metadata missing statistics");
2216        }
2217    }
2218
2219    #[test]
2220    fn test_mixed_precomputed_statistics() {
2221        let mut buf = Vec::with_capacity(100);
2222        let mut write = TrackedWrite::new(&mut buf);
2223        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2224        let props = Arc::new(
2225            WriterProperties::builder()
2226                .set_write_page_header_statistics(true)
2227                .build(),
2228        );
2229        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2230
2231        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2232        writer
2233            .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
2234            .unwrap();
2235
2236        let r = writer.close().unwrap();
2237
2238        let stats = r.metadata.statistics().unwrap();
2239        assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
2240        assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
2241        assert_eq!(stats.null_count_opt(), Some(0));
2242        assert!(stats.distinct_count_opt().is_none());
2243
2244        drop(write);
2245
2246        let props = ReaderProperties::builder()
2247            .set_backward_compatible_lz4(false)
2248            .build();
2249        let reader = SerializedPageReader::new_with_properties(
2250            Arc::new(Bytes::from(buf)),
2251            &r.metadata,
2252            r.rows_written as usize,
2253            None,
2254            Arc::new(props),
2255        )
2256        .unwrap();
2257
2258        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2259        assert_eq!(pages.len(), 2);
2260
2261        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2262        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);
2263
2264        let page_statistics = pages[1].statistics().unwrap();
2265        assert_eq!(
2266            page_statistics.min_bytes_opt().unwrap(),
2267            1_i32.to_le_bytes()
2268        );
2269        assert_eq!(
2270            page_statistics.max_bytes_opt().unwrap(),
2271            7_i32.to_le_bytes()
2272        );
2273        assert_eq!(page_statistics.null_count_opt(), Some(0));
2274        assert!(page_statistics.distinct_count_opt().is_none());
2275    }
2276
2277    #[test]
2278    fn test_disabled_statistics() {
2279        let mut buf = Vec::with_capacity(100);
2280        let mut write = TrackedWrite::new(&mut buf);
2281        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2282        let props = WriterProperties::builder()
2283            .set_statistics_enabled(EnabledStatistics::None)
2284            .set_writer_version(WriterVersion::PARQUET_2_0)
2285            .build();
2286        let props = Arc::new(props);
2287
2288        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2289        writer
2290            .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
2291            .unwrap();
2292
2293        let r = writer.close().unwrap();
2294        assert!(r.metadata.statistics().is_none());
2295
2296        drop(write);
2297
2298        let props = ReaderProperties::builder()
2299            .set_backward_compatible_lz4(false)
2300            .build();
2301        let reader = SerializedPageReader::new_with_properties(
2302            Arc::new(Bytes::from(buf)),
2303            &r.metadata,
2304            r.rows_written as usize,
2305            None,
2306            Arc::new(props),
2307        )
2308        .unwrap();
2309
2310        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2311        assert_eq!(pages.len(), 2);
2312
2313        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2314        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);
2315
2316        match &pages[1] {
2317            Page::DataPageV2 {
2318                num_values,
2319                num_nulls,
2320                num_rows,
2321                statistics,
2322                ..
2323            } => {
2324                assert_eq!(*num_values, 6);
2325                assert_eq!(*num_nulls, 2);
2326                assert_eq!(*num_rows, 6);
2327                assert!(statistics.is_none());
2328            }
2329            _ => unreachable!(),
2330        }
2331    }
2332
2333    #[test]
2334    fn test_column_writer_empty_column_roundtrip() {
2335        let props = Default::default();
2336        column_roundtrip::<Int32Type>(props, &[], None, None);
2337    }
2338
2339    #[test]
2340    fn test_column_writer_non_nullable_values_roundtrip() {
2341        let props = Default::default();
2342        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2343    }
2344
2345    #[test]
2346    fn test_column_writer_nullable_non_repeated_values_roundtrip() {
2347        let props = Default::default();
2348        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
2349    }
2350
2351    #[test]
2352    fn test_column_writer_nullable_repeated_values_roundtrip() {
2353        let props = Default::default();
2354        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2355    }
2356
2357    #[test]
2358    fn test_column_writer_dictionary_fallback_small_data_page() {
2359        let props = WriterProperties::builder()
2360            .set_dictionary_page_size_limit(32)
2361            .set_data_page_size_limit(32)
2362            .build();
2363        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2364    }
2365
2366    #[test]
2367    fn test_column_writer_small_write_batch_size() {
2368        for i in &[1usize, 2, 5, 10, 11, 1023] {
2369            let props = WriterProperties::builder().set_write_batch_size(*i).build();
2370
2371            column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2372        }
2373    }
2374
2375    #[test]
2376    fn test_column_writer_dictionary_disabled_v1() {
2377        let props = WriterProperties::builder()
2378            .set_writer_version(WriterVersion::PARQUET_1_0)
2379            .set_dictionary_enabled(false)
2380            .build();
2381        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2382    }
2383
2384    #[test]
2385    fn test_column_writer_dictionary_disabled_v2() {
2386        let props = WriterProperties::builder()
2387            .set_writer_version(WriterVersion::PARQUET_2_0)
2388            .set_dictionary_enabled(false)
2389            .build();
2390        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2391    }
2392
2393    #[test]
2394    fn test_column_writer_compression_v1() {
2395        let props = WriterProperties::builder()
2396            .set_writer_version(WriterVersion::PARQUET_1_0)
2397            .set_compression(Compression::SNAPPY)
2398            .build();
2399        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2400    }
2401
2402    #[test]
2403    fn test_column_writer_compression_v2() {
2404        let props = WriterProperties::builder()
2405            .set_writer_version(WriterVersion::PARQUET_2_0)
2406            .set_compression(Compression::SNAPPY)
2407            .build();
2408        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2409    }
2410
2411    #[test]
2412    fn test_column_writer_add_data_pages_with_dict() {
2413        // ARROW-5129: Test verifies that we add data page in case of dictionary encoding
2414        // and no fallback occurred so far.
2415        let mut file = tempfile::tempfile().unwrap();
2416        let mut write = TrackedWrite::new(&mut file);
2417        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2418        let props = Arc::new(
2419            WriterProperties::builder()
2420                .set_data_page_size_limit(10)
2421                .set_write_batch_size(3) // write 3 values at a time
2422                .build(),
2423        );
2424        let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
2425        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2426        writer.write_batch(data, None, None).unwrap();
2427        let r = writer.close().unwrap();
2428
2429        drop(write);
2430
2431        // Read pages and check the sequence
2432        let props = ReaderProperties::builder()
2433            .set_backward_compatible_lz4(false)
2434            .build();
2435        let mut page_reader = Box::new(
2436            SerializedPageReader::new_with_properties(
2437                Arc::new(file),
2438                &r.metadata,
2439                r.rows_written as usize,
2440                None,
2441                Arc::new(props),
2442            )
2443            .unwrap(),
2444        );
2445        let mut res = Vec::new();
2446        while let Some(page) = page_reader.get_next_page().unwrap() {
2447            res.push((page.page_type(), page.num_values(), page.buffer().len()));
2448        }
2449        assert_eq!(
2450            res,
2451            vec![
2452                (PageType::DICTIONARY_PAGE, 10, 40),
2453                (PageType::DATA_PAGE, 9, 10),
2454                (PageType::DATA_PAGE, 1, 3),
2455            ]
2456        );
2457        assert_eq!(
2458            r.metadata.page_encoding_stats(),
2459            Some(&vec![
2460                PageEncodingStats {
2461                    page_type: PageType::DICTIONARY_PAGE,
2462                    encoding: Encoding::PLAIN,
2463                    count: 1
2464                },
2465                PageEncodingStats {
2466                    page_type: PageType::DATA_PAGE,
2467                    encoding: Encoding::RLE_DICTIONARY,
2468                    count: 2,
2469                }
2470            ])
2471        );
2472    }
2473
2474    #[test]
2475    fn test_bool_statistics() {
2476        let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
2477        // Booleans have an unsigned sort order and so are not compatible
2478        // with the deprecated `min` and `max` statistics
2479        assert!(!stats.is_min_max_backwards_compatible());
2480        if let Statistics::Boolean(stats) = stats {
2481            assert_eq!(stats.min_opt().unwrap(), &false);
2482            assert_eq!(stats.max_opt().unwrap(), &true);
2483        } else {
2484            panic!("expecting Statistics::Boolean, got {stats:?}");
2485        }
2486    }
2487
2488    #[test]
2489    fn test_int32_statistics() {
2490        let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
2491        assert!(stats.is_min_max_backwards_compatible());
2492        if let Statistics::Int32(stats) = stats {
2493            assert_eq!(stats.min_opt().unwrap(), &-2);
2494            assert_eq!(stats.max_opt().unwrap(), &3);
2495        } else {
2496            panic!("expecting Statistics::Int32, got {stats:?}");
2497        }
2498    }
2499
2500    #[test]
2501    fn test_int64_statistics() {
2502        let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
2503        assert!(stats.is_min_max_backwards_compatible());
2504        if let Statistics::Int64(stats) = stats {
2505            assert_eq!(stats.min_opt().unwrap(), &-2);
2506            assert_eq!(stats.max_opt().unwrap(), &3);
2507        } else {
2508            panic!("expecting Statistics::Int64, got {stats:?}");
2509        }
2510    }
2511
2512    #[test]
2513    fn test_int96_statistics() {
2514        let input = vec![
2515            Int96::from(vec![1, 20, 30]),
2516            Int96::from(vec![3, 20, 10]),
2517            Int96::from(vec![0, 20, 30]),
2518            Int96::from(vec![2, 20, 30]),
2519        ]
2520        .into_iter()
2521        .collect::<Vec<Int96>>();
2522
2523        let stats = statistics_roundtrip::<Int96Type>(&input);
2524        assert!(!stats.is_min_max_backwards_compatible());
2525        if let Statistics::Int96(stats) = stats {
2526            assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![0, 20, 30]));
2527            assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
2528        } else {
2529            panic!("expecting Statistics::Int96, got {stats:?}");
2530        }
2531    }
2532
2533    #[test]
2534    fn test_float_statistics() {
2535        let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
2536        assert!(stats.is_min_max_backwards_compatible());
2537        if let Statistics::Float(stats) = stats {
2538            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2539            assert_eq!(stats.max_opt().unwrap(), &3.0);
2540        } else {
2541            panic!("expecting Statistics::Float, got {stats:?}");
2542        }
2543    }
2544
2545    #[test]
2546    fn test_double_statistics() {
2547        let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
2548        assert!(stats.is_min_max_backwards_compatible());
2549        if let Statistics::Double(stats) = stats {
2550            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2551            assert_eq!(stats.max_opt().unwrap(), &3.0);
2552        } else {
2553            panic!("expecting Statistics::Double, got {stats:?}");
2554        }
2555    }
2556
2557    #[test]
2558    fn test_byte_array_statistics() {
2559        let input = ["aawaa", "zz", "aaw", "m", "qrs"]
2560            .iter()
2561            .map(|&s| s.into())
2562            .collect::<Vec<_>>();
2563
2564        let stats = statistics_roundtrip::<ByteArrayType>(&input);
2565        assert!(!stats.is_min_max_backwards_compatible());
2566        if let Statistics::ByteArray(stats) = stats {
2567            assert_eq!(stats.min_opt().unwrap(), &ByteArray::from("aaw"));
2568            assert_eq!(stats.max_opt().unwrap(), &ByteArray::from("zz"));
2569        } else {
2570            panic!("expecting Statistics::ByteArray, got {stats:?}");
2571        }
2572    }
2573
2574    #[test]
2575    fn test_fixed_len_byte_array_statistics() {
2576        let input = ["aawaa", "zz   ", "aaw  ", "m    ", "qrs  "]
2577            .iter()
2578            .map(|&s| ByteArray::from(s).into())
2579            .collect::<Vec<_>>();
2580
2581        let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
2582        assert!(!stats.is_min_max_backwards_compatible());
2583        if let Statistics::FixedLenByteArray(stats) = stats {
2584            let expected_min: FixedLenByteArray = ByteArray::from("aaw  ").into();
2585            assert_eq!(stats.min_opt().unwrap(), &expected_min);
2586            let expected_max: FixedLenByteArray = ByteArray::from("zz   ").into();
2587            assert_eq!(stats.max_opt().unwrap(), &expected_max);
2588        } else {
2589            panic!("expecting Statistics::FixedLenByteArray, got {stats:?}");
2590        }
2591    }
2592
2593    #[test]
2594    fn test_column_writer_check_float16_min_max() {
2595        let input = [
2596            -f16::ONE,
2597            f16::from_f32(3.0),
2598            -f16::from_f32(2.0),
2599            f16::from_f32(2.0),
2600        ]
2601        .into_iter()
2602        .map(|s| ByteArray::from(s).into())
2603        .collect::<Vec<_>>();
2604
2605        let stats = float16_statistics_roundtrip(&input);
2606        assert!(stats.is_min_max_backwards_compatible());
2607        assert_eq!(
2608            stats.min_opt().unwrap(),
2609            &ByteArray::from(-f16::from_f32(2.0))
2610        );
2611        assert_eq!(
2612            stats.max_opt().unwrap(),
2613            &ByteArray::from(f16::from_f32(3.0))
2614        );
2615    }
2616
2617    #[test]
2618    fn test_column_writer_check_float16_nan_middle() {
2619        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2620            .into_iter()
2621            .map(|s| ByteArray::from(s).into())
2622            .collect::<Vec<_>>();
2623
2624        let stats = float16_statistics_roundtrip(&input);
2625        assert!(stats.is_min_max_backwards_compatible());
2626        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2627        assert_eq!(
2628            stats.max_opt().unwrap(),
2629            &ByteArray::from(f16::ONE + f16::ONE)
2630        );
2631    }
2632
2633    #[test]
2634    fn test_float16_statistics_nan_middle() {
2635        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2636            .into_iter()
2637            .map(|s| ByteArray::from(s).into())
2638            .collect::<Vec<_>>();
2639
2640        let stats = float16_statistics_roundtrip(&input);
2641        assert!(stats.is_min_max_backwards_compatible());
2642        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2643        assert_eq!(
2644            stats.max_opt().unwrap(),
2645            &ByteArray::from(f16::ONE + f16::ONE)
2646        );
2647    }
2648
2649    #[test]
2650    fn test_float16_statistics_nan_start() {
2651        let input = [f16::NAN, f16::ONE, f16::ONE + f16::ONE]
2652            .into_iter()
2653            .map(|s| ByteArray::from(s).into())
2654            .collect::<Vec<_>>();
2655
2656        let stats = float16_statistics_roundtrip(&input);
2657        assert!(stats.is_min_max_backwards_compatible());
2658        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2659        assert_eq!(
2660            stats.max_opt().unwrap(),
2661            &ByteArray::from(f16::ONE + f16::ONE)
2662        );
2663    }
2664
2665    #[test]
2666    fn test_float16_statistics_nan_only() {
2667        let input = [f16::NAN, f16::NAN]
2668            .into_iter()
2669            .map(|s| ByteArray::from(s).into())
2670            .collect::<Vec<_>>();
2671
2672        let stats = float16_statistics_roundtrip(&input);
2673        assert!(stats.min_bytes_opt().is_none());
2674        assert!(stats.max_bytes_opt().is_none());
2675        assert!(stats.is_min_max_backwards_compatible());
2676    }
2677
2678    #[test]
2679    fn test_float16_statistics_zero_only() {
2680        let input = [f16::ZERO]
2681            .into_iter()
2682            .map(|s| ByteArray::from(s).into())
2683            .collect::<Vec<_>>();
2684
2685        let stats = float16_statistics_roundtrip(&input);
2686        assert!(stats.is_min_max_backwards_compatible());
2687        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2688        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2689    }
2690
2691    #[test]
2692    fn test_float16_statistics_neg_zero_only() {
2693        let input = [f16::NEG_ZERO]
2694            .into_iter()
2695            .map(|s| ByteArray::from(s).into())
2696            .collect::<Vec<_>>();
2697
2698        let stats = float16_statistics_roundtrip(&input);
2699        assert!(stats.is_min_max_backwards_compatible());
2700        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2701        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2702    }
2703
2704    #[test]
2705    fn test_float16_statistics_zero_min() {
2706        let input = [f16::ZERO, f16::ONE, f16::NAN, f16::PI]
2707            .into_iter()
2708            .map(|s| ByteArray::from(s).into())
2709            .collect::<Vec<_>>();
2710
2711        let stats = float16_statistics_roundtrip(&input);
2712        assert!(stats.is_min_max_backwards_compatible());
2713        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2714        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI));
2715    }
2716
2717    #[test]
2718    fn test_float16_statistics_neg_zero_max() {
2719        let input = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI]
2720            .into_iter()
2721            .map(|s| ByteArray::from(s).into())
2722            .collect::<Vec<_>>();
2723
2724        let stats = float16_statistics_roundtrip(&input);
2725        assert!(stats.is_min_max_backwards_compatible());
2726        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI));
2727        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2728    }
2729
2730    #[test]
2731    fn test_float_statistics_nan_middle() {
2732        let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
2733        assert!(stats.is_min_max_backwards_compatible());
2734        if let Statistics::Float(stats) = stats {
2735            assert_eq!(stats.min_opt().unwrap(), &1.0);
2736            assert_eq!(stats.max_opt().unwrap(), &2.0);
2737        } else {
2738            panic!("expecting Statistics::Float");
2739        }
2740    }
2741
2742    #[test]
2743    fn test_float_statistics_nan_start() {
2744        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
2745        assert!(stats.is_min_max_backwards_compatible());
2746        if let Statistics::Float(stats) = stats {
2747            assert_eq!(stats.min_opt().unwrap(), &1.0);
2748            assert_eq!(stats.max_opt().unwrap(), &2.0);
2749        } else {
2750            panic!("expecting Statistics::Float");
2751        }
2752    }
2753
2754    #[test]
2755    fn test_float_statistics_nan_only() {
2756        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]);
2757        assert!(stats.min_bytes_opt().is_none());
2758        assert!(stats.max_bytes_opt().is_none());
2759        assert!(stats.is_min_max_backwards_compatible());
2760        assert!(matches!(stats, Statistics::Float(_)));
2761    }
2762
2763    #[test]
2764    fn test_float_statistics_zero_only() {
2765        let stats = statistics_roundtrip::<FloatType>(&[0.0]);
2766        assert!(stats.is_min_max_backwards_compatible());
2767        if let Statistics::Float(stats) = stats {
2768            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2769            assert!(stats.min_opt().unwrap().is_sign_negative());
2770            assert_eq!(stats.max_opt().unwrap(), &0.0);
2771            assert!(stats.max_opt().unwrap().is_sign_positive());
2772        } else {
2773            panic!("expecting Statistics::Float");
2774        }
2775    }
2776
2777    #[test]
2778    fn test_float_statistics_neg_zero_only() {
2779        let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
2780        assert!(stats.is_min_max_backwards_compatible());
2781        if let Statistics::Float(stats) = stats {
2782            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2783            assert!(stats.min_opt().unwrap().is_sign_negative());
2784            assert_eq!(stats.max_opt().unwrap(), &0.0);
2785            assert!(stats.max_opt().unwrap().is_sign_positive());
2786        } else {
2787            panic!("expecting Statistics::Float");
2788        }
2789    }
2790
2791    #[test]
2792    fn test_float_statistics_zero_min() {
2793        let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
2794        assert!(stats.is_min_max_backwards_compatible());
2795        if let Statistics::Float(stats) = stats {
2796            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2797            assert!(stats.min_opt().unwrap().is_sign_negative());
2798            assert_eq!(stats.max_opt().unwrap(), &2.0);
2799        } else {
2800            panic!("expecting Statistics::Float");
2801        }
2802    }
2803
2804    #[test]
2805    fn test_float_statistics_neg_zero_max() {
2806        let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
2807        assert!(stats.is_min_max_backwards_compatible());
2808        if let Statistics::Float(stats) = stats {
2809            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2810            assert_eq!(stats.max_opt().unwrap(), &0.0);
2811            assert!(stats.max_opt().unwrap().is_sign_positive());
2812        } else {
2813            panic!("expecting Statistics::Float");
2814        }
2815    }
2816
2817    #[test]
2818    fn test_double_statistics_nan_middle() {
2819        let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
2820        assert!(stats.is_min_max_backwards_compatible());
2821        if let Statistics::Double(stats) = stats {
2822            assert_eq!(stats.min_opt().unwrap(), &1.0);
2823            assert_eq!(stats.max_opt().unwrap(), &2.0);
2824        } else {
2825            panic!("expecting Statistics::Double");
2826        }
2827    }
2828
2829    #[test]
2830    fn test_double_statistics_nan_start() {
2831        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
2832        assert!(stats.is_min_max_backwards_compatible());
2833        if let Statistics::Double(stats) = stats {
2834            assert_eq!(stats.min_opt().unwrap(), &1.0);
2835            assert_eq!(stats.max_opt().unwrap(), &2.0);
2836        } else {
2837            panic!("expecting Statistics::Double");
2838        }
2839    }
2840
2841    #[test]
2842    fn test_double_statistics_nan_only() {
2843        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]);
2844        assert!(stats.min_bytes_opt().is_none());
2845        assert!(stats.max_bytes_opt().is_none());
2846        assert!(matches!(stats, Statistics::Double(_)));
2847        assert!(stats.is_min_max_backwards_compatible());
2848    }
2849
2850    #[test]
2851    fn test_double_statistics_zero_only() {
2852        let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
2853        assert!(stats.is_min_max_backwards_compatible());
2854        if let Statistics::Double(stats) = stats {
2855            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2856            assert!(stats.min_opt().unwrap().is_sign_negative());
2857            assert_eq!(stats.max_opt().unwrap(), &0.0);
2858            assert!(stats.max_opt().unwrap().is_sign_positive());
2859        } else {
2860            panic!("expecting Statistics::Double");
2861        }
2862    }
2863
2864    #[test]
2865    fn test_double_statistics_neg_zero_only() {
2866        let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
2867        assert!(stats.is_min_max_backwards_compatible());
2868        if let Statistics::Double(stats) = stats {
2869            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2870            assert!(stats.min_opt().unwrap().is_sign_negative());
2871            assert_eq!(stats.max_opt().unwrap(), &0.0);
2872            assert!(stats.max_opt().unwrap().is_sign_positive());
2873        } else {
2874            panic!("expecting Statistics::Double");
2875        }
2876    }
2877
2878    #[test]
2879    fn test_double_statistics_zero_min() {
2880        let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
2881        assert!(stats.is_min_max_backwards_compatible());
2882        if let Statistics::Double(stats) = stats {
2883            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2884            assert!(stats.min_opt().unwrap().is_sign_negative());
2885            assert_eq!(stats.max_opt().unwrap(), &2.0);
2886        } else {
2887            panic!("expecting Statistics::Double");
2888        }
2889    }
2890
2891    #[test]
2892    fn test_double_statistics_neg_zero_max() {
2893        let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
2894        assert!(stats.is_min_max_backwards_compatible());
2895        if let Statistics::Double(stats) = stats {
2896            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2897            assert_eq!(stats.max_opt().unwrap(), &0.0);
2898            assert!(stats.max_opt().unwrap().is_sign_positive());
2899        } else {
2900            panic!("expecting Statistics::Double");
2901        }
2902    }
2903
2904    #[test]
2905    fn test_compare_greater_byte_array_decimals() {
2906        assert!(!compare_greater_byte_array_decimals(&[], &[],),);
2907        assert!(compare_greater_byte_array_decimals(&[1u8,], &[],),);
2908        assert!(!compare_greater_byte_array_decimals(&[], &[1u8,],),);
2909        assert!(compare_greater_byte_array_decimals(&[1u8,], &[0u8,],),);
2910        assert!(!compare_greater_byte_array_decimals(&[1u8,], &[1u8,],),);
2911        assert!(compare_greater_byte_array_decimals(&[1u8, 0u8,], &[0u8,],),);
2912        assert!(!compare_greater_byte_array_decimals(
2913            &[0u8, 1u8,],
2914            &[1u8, 0u8,],
2915        ),);
2916        assert!(!compare_greater_byte_array_decimals(
2917            &[255u8, 35u8, 0u8, 0u8,],
2918            &[0u8,],
2919        ),);
2920        assert!(compare_greater_byte_array_decimals(
2921            &[0u8,],
2922            &[255u8, 35u8, 0u8, 0u8,],
2923        ),);
2924    }
2925
2926    #[test]
2927    fn test_column_index_with_null_pages() {
2928        // write a single page of all nulls
2929        let page_writer = get_test_page_writer();
2930        let props = Default::default();
2931        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2932        writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();
2933
2934        let r = writer.close().unwrap();
2935        assert!(r.column_index.is_some());
2936        let col_idx = r.column_index.unwrap();
2937        // null_pages should be true for page 0
2938        assert!(col_idx.null_pages[0]);
2939        // min and max should be empty byte arrays
2940        assert_eq!(col_idx.min_values[0].len(), 0);
2941        assert_eq!(col_idx.max_values[0].len(), 0);
2942        // null_counts should be defined and be 4 for page 0
2943        assert!(col_idx.null_counts.is_some());
2944        assert_eq!(col_idx.null_counts.as_ref().unwrap()[0], 4);
2945        // there is no repetition so rep histogram should be absent
2946        assert!(col_idx.repetition_level_histograms.is_none());
2947        // definition_level_histogram should be present and should be 0:4, 1:0
2948        assert!(col_idx.definition_level_histograms.is_some());
2949        assert_eq!(col_idx.definition_level_histograms.unwrap(), &[4, 0]);
2950    }
2951
2952    #[test]
2953    fn test_column_offset_index_metadata() {
2954        // write data
2955        // and check the offset index and column index
2956        let page_writer = get_test_page_writer();
2957        let props = Default::default();
2958        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2959        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2960        // first page
2961        writer.flush_data_pages().unwrap();
2962        // second page
2963        writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
2964
2965        let r = writer.close().unwrap();
2966        let column_index = r.column_index.unwrap();
2967        let offset_index = r.offset_index.unwrap();
2968
2969        assert_eq!(8, r.rows_written);
2970
2971        // column index
2972        assert_eq!(2, column_index.null_pages.len());
2973        assert_eq!(2, offset_index.page_locations.len());
2974        assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
2975        for idx in 0..2 {
2976            assert!(!column_index.null_pages[idx]);
2977            assert_eq!(0, column_index.null_counts.as_ref().unwrap()[idx]);
2978        }
2979
2980        if let Some(stats) = r.metadata.statistics() {
2981            assert_eq!(stats.null_count_opt(), Some(0));
2982            assert_eq!(stats.distinct_count_opt(), None);
2983            if let Statistics::Int32(stats) = stats {
2984                // first page is [1,2,3,4]
2985                // second page is [-5,2,4,8]
2986                // note that we don't increment here, as this is a non BinaryArray type.
2987                assert_eq!(
2988                    stats.min_bytes_opt(),
2989                    Some(column_index.min_values[1].as_slice())
2990                );
2991                assert_eq!(
2992                    stats.max_bytes_opt(),
2993                    column_index.max_values.get(1).map(Vec::as_slice)
2994                );
2995            } else {
2996                panic!("expecting Statistics::Int32");
2997            }
2998        } else {
2999            panic!("metadata missing statistics");
3000        }
3001
3002        // page location
3003        assert_eq!(0, offset_index.page_locations[0].first_row_index);
3004        assert_eq!(4, offset_index.page_locations[1].first_row_index);
3005    }
3006
3007    /// Verify min/max value truncation in the column index works as expected
3008    #[test]
3009    fn test_column_offset_index_metadata_truncating() {
3010        // write data
3011        // and check the offset index and column index
3012        let page_writer = get_test_page_writer();
3013        let props = WriterProperties::builder()
3014            .set_statistics_truncate_length(None) // disable column index truncation
3015            .build()
3016            .into();
3017        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3018
3019        let mut data = vec![FixedLenByteArray::default(); 3];
3020        // This is the expected min value - "aaa..."
3021        data[0].set_data(Bytes::from(vec![97_u8; 200]));
3022        // This is the expected max value - "ZZZ..."
3023        data[1].set_data(Bytes::from(vec![112_u8; 200]));
3024        data[2].set_data(Bytes::from(vec![98_u8; 200]));
3025
3026        writer.write_batch(&data, None, None).unwrap();
3027
3028        writer.flush_data_pages().unwrap();
3029
3030        let r = writer.close().unwrap();
3031        let column_index = r.column_index.unwrap();
3032        let offset_index = r.offset_index.unwrap();
3033
3034        assert_eq!(3, r.rows_written);
3035
3036        // column index
3037        assert_eq!(1, column_index.null_pages.len());
3038        assert_eq!(1, offset_index.page_locations.len());
3039        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3040        assert!(!column_index.null_pages[0]);
3041        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
3042
3043        if let Some(stats) = r.metadata.statistics() {
3044            assert_eq!(stats.null_count_opt(), Some(0));
3045            assert_eq!(stats.distinct_count_opt(), None);
3046            if let Statistics::FixedLenByteArray(stats) = stats {
3047                let column_index_min_value = &column_index.min_values[0];
3048                let column_index_max_value = &column_index.max_values[0];
3049
3050                // Column index stats are truncated, while the column chunk's aren't.
3051                assert_ne!(
3052                    stats.min_bytes_opt(),
3053                    Some(column_index_min_value.as_slice())
3054                );
3055                assert_ne!(
3056                    stats.max_bytes_opt(),
3057                    Some(column_index_max_value.as_slice())
3058                );
3059
3060                assert_eq!(
3061                    column_index_min_value.len(),
3062                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
3063                );
3064                assert_eq!(column_index_min_value.as_slice(), &[97_u8; 64]);
3065                assert_eq!(
3066                    column_index_max_value.len(),
3067                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
3068                );
3069
3070                // We expect the last byte to be incremented
3071                assert_eq!(
3072                    *column_index_max_value.last().unwrap(),
3073                    *column_index_max_value.first().unwrap() + 1
3074                );
3075            } else {
3076                panic!("expecting Statistics::FixedLenByteArray");
3077            }
3078        } else {
3079            panic!("metadata missing statistics");
3080        }
3081    }
3082
3083    #[test]
3084    fn test_column_offset_index_truncating_spec_example() {
3085        // write data
3086        // and check the offset index and column index
3087        let page_writer = get_test_page_writer();
3088
3089        // Truncate values at 1 byte
3090        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3091        let props = Arc::new(builder.build());
3092        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3093
3094        let mut data = vec![FixedLenByteArray::default(); 1];
3095        // This is the expected min value
3096        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3097
3098        writer.write_batch(&data, None, None).unwrap();
3099
3100        writer.flush_data_pages().unwrap();
3101
3102        let r = writer.close().unwrap();
3103        let column_index = r.column_index.unwrap();
3104        let offset_index = r.offset_index.unwrap();
3105
3106        assert_eq!(1, r.rows_written);
3107
3108        // column index
3109        assert_eq!(1, column_index.null_pages.len());
3110        assert_eq!(1, offset_index.page_locations.len());
3111        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3112        assert!(!column_index.null_pages[0]);
3113        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
3114
3115        if let Some(stats) = r.metadata.statistics() {
3116            assert_eq!(stats.null_count_opt(), Some(0));
3117            assert_eq!(stats.distinct_count_opt(), None);
3118            if let Statistics::FixedLenByteArray(_stats) = stats {
3119                let column_index_min_value = &column_index.min_values[0];
3120                let column_index_max_value = &column_index.max_values[0];
3121
3122                assert_eq!(column_index_min_value.len(), 1);
3123                assert_eq!(column_index_max_value.len(), 1);
3124
3125                assert_eq!("B".as_bytes(), column_index_min_value.as_slice());
3126                assert_eq!("C".as_bytes(), column_index_max_value.as_slice());
3127
3128                assert_ne!(column_index_min_value, stats.min_bytes_opt().unwrap());
3129                assert_ne!(column_index_max_value, stats.max_bytes_opt().unwrap());
3130            } else {
3131                panic!("expecting Statistics::FixedLenByteArray");
3132            }
3133        } else {
3134            panic!("metadata missing statistics");
3135        }
3136    }
3137
3138    #[test]
3139    fn test_float16_min_max_no_truncation() {
3140        // Even if we set truncation to occur at 1 byte, we should not truncate for Float16
3141        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3142        let props = Arc::new(builder.build());
3143        let page_writer = get_test_page_writer();
3144        let mut writer = get_test_float16_column_writer(page_writer, props);
3145
3146        let expected_value = f16::PI.to_le_bytes().to_vec();
3147        let data = vec![ByteArray::from(expected_value.clone()).into()];
3148        writer.write_batch(&data, None, None).unwrap();
3149        writer.flush_data_pages().unwrap();
3150
3151        let r = writer.close().unwrap();
3152
3153        // stats should still be written
3154        // ensure bytes weren't truncated for column index
3155        let column_index = r.column_index.unwrap();
3156        let column_index_min_bytes = column_index.min_values[0].as_slice();
3157        let column_index_max_bytes = column_index.max_values[0].as_slice();
3158        assert_eq!(expected_value, column_index_min_bytes);
3159        assert_eq!(expected_value, column_index_max_bytes);
3160
3161        // ensure bytes weren't truncated for statistics
3162        let stats = r.metadata.statistics().unwrap();
3163        if let Statistics::FixedLenByteArray(stats) = stats {
3164            let stats_min_bytes = stats.min_bytes_opt().unwrap();
3165            let stats_max_bytes = stats.max_bytes_opt().unwrap();
3166            assert_eq!(expected_value, stats_min_bytes);
3167            assert_eq!(expected_value, stats_max_bytes);
3168        } else {
3169            panic!("expecting Statistics::FixedLenByteArray");
3170        }
3171    }
3172
3173    #[test]
3174    fn test_decimal_min_max_no_truncation() {
3175        // Even if we set truncation to occur at 1 byte, we should not truncate for Decimal
3176        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3177        let props = Arc::new(builder.build());
3178        let page_writer = get_test_page_writer();
3179        let mut writer =
3180            get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3181
3182        let expected_value = vec![
3183            255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
3184            231u8, 90u8, 0u8, 0u8,
3185        ];
3186        let data = vec![ByteArray::from(expected_value.clone()).into()];
3187        writer.write_batch(&data, None, None).unwrap();
3188        writer.flush_data_pages().unwrap();
3189
3190        let r = writer.close().unwrap();
3191
3192        // stats should still be written
3193        // ensure bytes weren't truncated for column index
3194        let column_index = r.column_index.unwrap();
3195        let column_index_min_bytes = column_index.min_values[0].as_slice();
3196        let column_index_max_bytes = column_index.max_values[0].as_slice();
3197        assert_eq!(expected_value, column_index_min_bytes);
3198        assert_eq!(expected_value, column_index_max_bytes);
3199
3200        // ensure bytes weren't truncated for statistics
3201        let stats = r.metadata.statistics().unwrap();
3202        if let Statistics::FixedLenByteArray(stats) = stats {
3203            let stats_min_bytes = stats.min_bytes_opt().unwrap();
3204            let stats_max_bytes = stats.max_bytes_opt().unwrap();
3205            assert_eq!(expected_value, stats_min_bytes);
3206            assert_eq!(expected_value, stats_max_bytes);
3207        } else {
3208            panic!("expecting Statistics::FixedLenByteArray");
3209        }
3210    }
3211
3212    #[test]
3213    fn test_statistics_truncating_byte_array_default() {
3214        let page_writer = get_test_page_writer();
3215
3216        // The default truncate length is 64 bytes
3217        let props = WriterProperties::builder().build().into();
3218        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3219
3220        let mut data = vec![ByteArray::default(); 1];
3221        data[0].set_data(Bytes::from(String::from(
3222            "This string is longer than 64 bytes, so it will almost certainly be truncated.",
3223        )));
3224        writer.write_batch(&data, None, None).unwrap();
3225        writer.flush_data_pages().unwrap();
3226
3227        let r = writer.close().unwrap();
3228
3229        assert_eq!(1, r.rows_written);
3230
3231        let stats = r.metadata.statistics().expect("statistics");
3232        if let Statistics::ByteArray(_stats) = stats {
3233            let min_value = _stats.min_opt().unwrap();
3234            let max_value = _stats.max_opt().unwrap();
3235
3236            assert!(!_stats.min_is_exact());
3237            assert!(!_stats.max_is_exact());
3238
3239            let expected_len = 64;
3240            assert_eq!(min_value.len(), expected_len);
3241            assert_eq!(max_value.len(), expected_len);
3242
3243            let expected_min =
3244                "This string is longer than 64 bytes, so it will almost certainly".as_bytes();
3245            assert_eq!(expected_min, min_value.as_bytes());
3246            // note the max value is different from the min value: the last byte is incremented
3247            let expected_max =
3248                "This string is longer than 64 bytes, so it will almost certainlz".as_bytes();
3249            assert_eq!(expected_max, max_value.as_bytes());
3250        } else {
3251            panic!("expecting Statistics::ByteArray");
3252        }
3253    }
3254
3255    #[test]
3256    fn test_statistics_truncating_byte_array() {
3257        let page_writer = get_test_page_writer();
3258
3259        const TEST_TRUNCATE_LENGTH: usize = 1;
3260
3261        // Truncate values at 1 byte
3262        let builder =
3263            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3264        let props = Arc::new(builder.build());
3265        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3266
3267        let mut data = vec![ByteArray::default(); 1];
3268        // This is the expected min value
3269        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3270
3271        writer.write_batch(&data, None, None).unwrap();
3272
3273        writer.flush_data_pages().unwrap();
3274
3275        let r = writer.close().unwrap();
3276
3277        assert_eq!(1, r.rows_written);
3278
3279        let stats = r.metadata.statistics().expect("statistics");
3280        assert_eq!(stats.null_count_opt(), Some(0));
3281        assert_eq!(stats.distinct_count_opt(), None);
3282        if let Statistics::ByteArray(_stats) = stats {
3283            let min_value = _stats.min_opt().unwrap();
3284            let max_value = _stats.max_opt().unwrap();
3285
3286            assert!(!_stats.min_is_exact());
3287            assert!(!_stats.max_is_exact());
3288
3289            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3290            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3291
3292            assert_eq!("B".as_bytes(), min_value.as_bytes());
3293            assert_eq!("C".as_bytes(), max_value.as_bytes());
3294        } else {
3295            panic!("expecting Statistics::ByteArray");
3296        }
3297    }
3298
3299    #[test]
3300    fn test_statistics_truncating_fixed_len_byte_array() {
3301        let page_writer = get_test_page_writer();
3302
3303        const TEST_TRUNCATE_LENGTH: usize = 1;
3304
3305        // Truncate values at 1 byte
3306        let builder =
3307            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3308        let props = Arc::new(builder.build());
3309        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3310
3311        let mut data = vec![FixedLenByteArray::default(); 1];
3312
3313        const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
3314        const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();
3315
3316        const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]]; // parquet specifies big-endian order for decimals
3317        const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
3318            [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];
3319
3320        // This is the expected min value
3321        data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));
3322
3323        writer.write_batch(&data, None, None).unwrap();
3324
3325        writer.flush_data_pages().unwrap();
3326
3327        let r = writer.close().unwrap();
3328
3329        assert_eq!(1, r.rows_written);
3330
3331        let stats = r.metadata.statistics().expect("statistics");
3332        assert_eq!(stats.null_count_opt(), Some(0));
3333        assert_eq!(stats.distinct_count_opt(), None);
3334        if let Statistics::FixedLenByteArray(_stats) = stats {
3335            let min_value = _stats.min_opt().unwrap();
3336            let max_value = _stats.max_opt().unwrap();
3337
3338            assert!(!_stats.min_is_exact());
3339            assert!(!_stats.max_is_exact());
3340
3341            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3342            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3343
3344            assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
3345            assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());
3346
3347            let reconstructed_min = i128::from_be_bytes([
3348                min_value.as_bytes()[0],
3349                0,
3350                0,
3351                0,
3352                0,
3353                0,
3354                0,
3355                0,
3356                0,
3357                0,
3358                0,
3359                0,
3360                0,
3361                0,
3362                0,
3363                0,
3364            ]);
3365
3366            let reconstructed_max = i128::from_be_bytes([
3367                max_value.as_bytes()[0],
3368                0,
3369                0,
3370                0,
3371                0,
3372                0,
3373                0,
3374                0,
3375                0,
3376                0,
3377                0,
3378                0,
3379                0,
3380                0,
3381                0,
3382                0,
3383            ]);
3384
3385            // check that the inner value is correctly bounded by the min/max
3386            println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
3387            assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
3388            println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
3389            assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
3390        } else {
3391            panic!("expecting Statistics::FixedLenByteArray");
3392        }
3393    }
3394
3395    #[test]
3396    fn test_send() {
3397        fn test<T: Send>() {}
3398        test::<ColumnWriterImpl<Int32Type>>();
3399    }
3400
3401    #[test]
3402    fn test_increment() {
3403        let v = increment(vec![0, 0, 0]).unwrap();
3404        assert_eq!(&v, &[0, 0, 1]);
3405
3406        // Handle overflow
3407        let v = increment(vec![0, 255, 255]).unwrap();
3408        assert_eq!(&v, &[1, 0, 0]);
3409
3410        // Return `None` if all bytes are u8::MAX
3411        let v = increment(vec![255, 255, 255]);
3412        assert!(v.is_none());
3413    }
3414
3415    #[test]
3416    fn test_increment_utf8() {
3417        let test_inc = |o: &str, expected: &str| {
3418            if let Ok(v) = String::from_utf8(increment_utf8(o).unwrap()) {
3419                // Got the expected result...
3420                assert_eq!(v, expected);
3421                // and it's greater than the original string
3422                assert!(*v > *o);
3423                // Also show that BinaryArray level comparison works here
3424                let mut greater = ByteArray::new();
3425                greater.set_data(Bytes::from(v));
3426                let mut original = ByteArray::new();
3427                original.set_data(Bytes::from(o.as_bytes().to_vec()));
3428                assert!(greater > original);
3429            } else {
3430                panic!("Expected incremented UTF8 string to also be valid.");
3431            }
3432        };
3433
3434        // Basic ASCII case
3435        test_inc("hello", "hellp");
3436
3437        // 1-byte ending in max 1-byte
3438        test_inc("a\u{7f}", "b");
3439
3440        // 1-byte max should not truncate as it would need 2-byte code points
3441        assert!(increment_utf8("\u{7f}\u{7f}").is_none());
3442
3443        // UTF8 string
3444        test_inc("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝");
3445
3446        // 2-byte without overflow
3447        test_inc("éééé", "éééê");
3448
3449        // 2-byte that overflows lowest byte
3450        test_inc("\u{ff}\u{ff}", "\u{ff}\u{100}");
3451
3452        // 2-byte ending in max 2-byte
3453        test_inc("a\u{7ff}", "b");
3454
3455        // Max 2-byte should not truncate as it would need 3-byte code points
3456        assert!(increment_utf8("\u{7ff}\u{7ff}").is_none());
3457
3458        // 3-byte without overflow [U+800, U+800] -> [U+800, U+801] (note that these
3459        // characters should render right to left).
3460        test_inc("ࠀࠀ", "ࠀࠁ");
3461
3462        // 3-byte ending in max 3-byte
3463        test_inc("a\u{ffff}", "b");
3464
3465        // Max 3-byte should not truncate as it would need 4-byte code points
3466        assert!(increment_utf8("\u{ffff}\u{ffff}").is_none());
3467
3468        // 4-byte without overflow
3469        test_inc("𐀀𐀀", "𐀀𐀁");
3470
3471        // 4-byte ending in max unicode
3472        test_inc("a\u{10ffff}", "b");
3473
3474        // Max 4-byte should not truncate
3475        assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none());
3476
3477        // Skip over surrogate pair range (0xD800..=0xDFFF)
3478        //test_inc("a\u{D7FF}", "a\u{e000}");
3479        test_inc("a\u{D7FF}", "b");
3480    }
3481
3482    #[test]
3483    fn test_truncate_utf8() {
3484        // No-op
3485        let data = "❤️🧡💛💚💙💜";
3486        let r = truncate_utf8(data, data.len()).unwrap();
3487        assert_eq!(r.len(), data.len());
3488        assert_eq!(&r, data.as_bytes());
3489
3490        // We slice it away from the UTF8 boundary
3491        let r = truncate_utf8(data, 13).unwrap();
3492        assert_eq!(r.len(), 10);
3493        assert_eq!(&r, "❤️🧡".as_bytes());
3494
3495        // One multi-byte code point, and a length shorter than it, so we can't slice it
3496        let r = truncate_utf8("\u{0836}", 1);
3497        assert!(r.is_none());
3498
3499        // Test truncate and increment for max bounds on UTF-8 statistics
3500        // 7-bit (i.e. ASCII)
3501        let r = truncate_and_increment_utf8("yyyyyyyyy", 8).unwrap();
3502        assert_eq!(&r, "yyyyyyyz".as_bytes());
3503
3504        // 2-byte without overflow
3505        let r = truncate_and_increment_utf8("ééééé", 7).unwrap();
3506        assert_eq!(&r, "ééê".as_bytes());
3507
3508        // 2-byte that overflows lowest byte
3509        let r = truncate_and_increment_utf8("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8).unwrap();
3510        assert_eq!(&r, "\u{ff}\u{ff}\u{ff}\u{100}".as_bytes());
3511
3512        // max 2-byte should not truncate as it would need 3-byte code points
3513        let r = truncate_and_increment_utf8("߿߿߿߿߿", 8);
3514        assert!(r.is_none());
3515
3516        // 3-byte without overflow [U+800, U+800, U+800] -> [U+800, U+801] (note that these
3517        // characters should render right to left).
3518        let r = truncate_and_increment_utf8("ࠀࠀࠀࠀ", 8).unwrap();
3519        assert_eq!(&r, "ࠀࠁ".as_bytes());
3520
3521        // max 3-byte should not truncate as it would need 4-byte code points
3522        let r = truncate_and_increment_utf8("\u{ffff}\u{ffff}\u{ffff}", 8);
3523        assert!(r.is_none());
3524
3525        // 4-byte without overflow
3526        let r = truncate_and_increment_utf8("𐀀𐀀𐀀𐀀", 9).unwrap();
3527        assert_eq!(&r, "𐀀𐀁".as_bytes());
3528
3529        // max 4-byte should not truncate
3530        let r = truncate_and_increment_utf8("\u{10ffff}\u{10ffff}", 8);
3531        assert!(r.is_none());
3532    }
3533
3534    #[test]
3535    // Check fallback truncation of statistics that should be UTF-8, but aren't
3536    // (see https://github.com/apache/arrow-rs/pull/6870).
3537    fn test_byte_array_truncate_invalid_utf8_statistics() {
3538        let message_type = "
3539            message test_schema {
3540                OPTIONAL BYTE_ARRAY a (UTF8);
3541            }
3542        ";
3543        let schema = Arc::new(parse_message_type(message_type).unwrap());
3544
3545        // Create Vec<ByteArray> containing non-UTF8 bytes
3546        let data = vec![ByteArray::from(vec![128u8; 32]); 7];
3547        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
3548        let file: File = tempfile::tempfile().unwrap();
3549        let props = Arc::new(
3550            WriterProperties::builder()
3551                .set_statistics_enabled(EnabledStatistics::Chunk)
3552                .set_statistics_truncate_length(Some(8))
3553                .build(),
3554        );
3555
3556        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
3557        let mut row_group_writer = writer.next_row_group().unwrap();
3558
3559        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
3560        col_writer
3561            .typed::<ByteArrayType>()
3562            .write_batch(&data, Some(&def_levels), None)
3563            .unwrap();
3564        col_writer.close().unwrap();
3565        row_group_writer.close().unwrap();
3566        let file_metadata = writer.close().unwrap();
3567        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
3568        let stats = file_metadata.row_groups[0].columns[0]
3569            .meta_data
3570            .as_ref()
3571            .unwrap()
3572            .statistics
3573            .as_ref()
3574            .unwrap();
3575        assert!(!stats.is_max_value_exact.unwrap());
3576        // Truncation of invalid UTF-8 should fall back to binary truncation, so last byte should
3577        // be incremented by 1.
3578        assert_eq!(
3579            stats.max_value,
3580            Some([128, 128, 128, 128, 128, 128, 128, 129].to_vec())
3581        );
3582    }
3583
3584    #[test]
3585    fn test_increment_max_binary_chars() {
3586        let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
3587        assert_eq!(&r.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);
3588
3589        let incremented = increment(vec![0xFF, 0xFF, 0xFF]);
3590        assert!(incremented.is_none())
3591    }
3592
3593    #[test]
3594    fn test_no_column_index_when_stats_disabled() {
3595        // https://github.com/apache/arrow-rs/issues/6010
3596        // Test that column index is not created/written for all-nulls column when page
3597        // statistics are disabled.
3598        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3599        let props = Arc::new(
3600            WriterProperties::builder()
3601                .set_statistics_enabled(EnabledStatistics::None)
3602                .build(),
3603        );
3604        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3605        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3606
3607        let data = Vec::new();
3608        let def_levels = vec![0; 10];
3609        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3610        writer.flush_data_pages().unwrap();
3611
3612        let column_close_result = writer.close().unwrap();
3613        assert!(column_close_result.offset_index.is_some());
3614        assert!(column_close_result.column_index.is_none());
3615    }
3616
3617    #[test]
3618    fn test_no_offset_index_when_disabled() {
3619        // Test that offset indexes can be disabled
3620        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3621        let props = Arc::new(
3622            WriterProperties::builder()
3623                .set_statistics_enabled(EnabledStatistics::None)
3624                .set_offset_index_disabled(true)
3625                .build(),
3626        );
3627        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3628        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3629
3630        let data = Vec::new();
3631        let def_levels = vec![0; 10];
3632        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3633        writer.flush_data_pages().unwrap();
3634
3635        let column_close_result = writer.close().unwrap();
3636        assert!(column_close_result.offset_index.is_none());
3637        assert!(column_close_result.column_index.is_none());
3638    }
3639
3640    #[test]
3641    fn test_offset_index_overridden() {
3642        // Test that offset indexes are not disabled when gathering page statistics
3643        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3644        let props = Arc::new(
3645            WriterProperties::builder()
3646                .set_statistics_enabled(EnabledStatistics::Page)
3647                .set_offset_index_disabled(true)
3648                .build(),
3649        );
3650        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3651        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3652
3653        let data = Vec::new();
3654        let def_levels = vec![0; 10];
3655        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3656        writer.flush_data_pages().unwrap();
3657
3658        let column_close_result = writer.close().unwrap();
3659        assert!(column_close_result.offset_index.is_some());
3660        assert!(column_close_result.column_index.is_some());
3661    }
3662
3663    #[test]
3664    fn test_boundary_order() -> Result<()> {
3665        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3666        // min max both ascending
3667        let column_close_result = write_multiple_pages::<Int32Type>(
3668            &descr,
3669            &[
3670                &[Some(-10), Some(10)],
3671                &[Some(-5), Some(11)],
3672                &[None],
3673                &[Some(-5), Some(11)],
3674            ],
3675        )?;
3676        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3677        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3678
3679        // min max both descending
3680        let column_close_result = write_multiple_pages::<Int32Type>(
3681            &descr,
3682            &[
3683                &[Some(10), Some(11)],
3684                &[Some(5), Some(11)],
3685                &[None],
3686                &[Some(-5), Some(0)],
3687            ],
3688        )?;
3689        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3690        assert_eq!(boundary_order, BoundaryOrder::DESCENDING);
3691
3692        // min max both equal
3693        let column_close_result = write_multiple_pages::<Int32Type>(
3694            &descr,
3695            &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
3696        )?;
3697        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3698        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3699
3700        // only nulls
3701        let column_close_result =
3702            write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
3703        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3704        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3705
3706        // one page
3707        let column_close_result =
3708            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
3709        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3710        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3711
3712        // one non-null page
3713        let column_close_result =
3714            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
3715        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3716        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3717
3718        // min max both unordered
3719        let column_close_result = write_multiple_pages::<Int32Type>(
3720            &descr,
3721            &[
3722                &[Some(10), Some(11)],
3723                &[Some(11), Some(16)],
3724                &[None],
3725                &[Some(-5), Some(0)],
3726            ],
3727        )?;
3728        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3729        assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3730
3731        // min max both ordered in different orders
3732        let column_close_result = write_multiple_pages::<Int32Type>(
3733            &descr,
3734            &[
3735                &[Some(1), Some(9)],
3736                &[Some(2), Some(8)],
3737                &[None],
3738                &[Some(3), Some(7)],
3739            ],
3740        )?;
3741        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3742        assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3743
3744        Ok(())
3745    }
3746
3747    #[test]
3748    fn test_boundary_order_logical_type() -> Result<()> {
3749        // ensure that logical types account for different sort order than underlying
3750        // physical type representation
3751        let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
3752        let fba_descr = {
3753            let tpe = SchemaType::primitive_type_builder(
3754                "col",
3755                FixedLenByteArrayType::get_physical_type(),
3756            )
3757            .with_length(2)
3758            .build()?;
3759            Arc::new(ColumnDescriptor::new(
3760                Arc::new(tpe),
3761                1,
3762                0,
3763                ColumnPath::from("col"),
3764            ))
3765        };
3766
3767        let values: &[&[Option<FixedLenByteArray>]] = &[
3768            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
3769            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
3770            &[Some(FixedLenByteArray::from(ByteArray::from(
3771                f16::NEG_ZERO,
3772            )))],
3773            &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
3774        ];
3775
3776        // f16 descending
3777        let column_close_result =
3778            write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
3779        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3780        assert_eq!(boundary_order, BoundaryOrder::DESCENDING);
3781
3782        // same bytes, but fba unordered
3783        let column_close_result =
3784            write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
3785        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3786        assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3787
3788        Ok(())
3789    }
3790
3791    #[test]
3792    fn test_interval_stats_should_not_have_min_max() {
3793        let input = [
3794            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
3795            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
3796            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
3797        ]
3798        .into_iter()
3799        .map(|s| ByteArray::from(s).into())
3800        .collect::<Vec<_>>();
3801
3802        let page_writer = get_test_page_writer();
3803        let mut writer = get_test_interval_column_writer(page_writer);
3804        writer.write_batch(&input, None, None).unwrap();
3805
3806        let metadata = writer.close().unwrap().metadata;
3807        let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
3808            stats.clone()
3809        } else {
3810            panic!("metadata missing statistics");
3811        };
3812        assert!(stats.min_bytes_opt().is_none());
3813        assert!(stats.max_bytes_opt().is_none());
3814    }
3815
3816    #[test]
3817    #[cfg(feature = "arrow")]
3818    fn test_column_writer_get_estimated_total_bytes() {
3819        let page_writer = get_test_page_writer();
3820        let props = Default::default();
3821        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
3822        assert_eq!(writer.get_estimated_total_bytes(), 0);
3823
3824        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
3825        writer.add_data_page().unwrap();
3826        let size_with_one_page = writer.get_estimated_total_bytes();
3827        assert_eq!(size_with_one_page, 20);
3828
3829        writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
3830        writer.add_data_page().unwrap();
3831        let size_with_two_pages = writer.get_estimated_total_bytes();
3832        // different pages have different compressed lengths
3833        assert_eq!(size_with_two_pages, 20 + 21);
3834    }
3835
3836    fn write_multiple_pages<T: DataType>(
3837        column_descr: &Arc<ColumnDescriptor>,
3838        pages: &[&[Option<T::T>]],
3839    ) -> Result<ColumnCloseResult> {
3840        let column_writer = get_column_writer(
3841            column_descr.clone(),
3842            Default::default(),
3843            get_test_page_writer(),
3844        );
3845        let mut writer = get_typed_column_writer::<T>(column_writer);
3846
3847        for &page in pages {
3848            let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
3849            let def_levels = page
3850                .iter()
3851                .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
3852                .collect::<Vec<_>>();
3853            writer.write_batch(&values, Some(&def_levels), None)?;
3854            writer.flush_data_pages()?;
3855        }
3856
3857        writer.close()
3858    }
3859
3860    /// Performs write-read roundtrip with randomly generated values and levels.
3861    /// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
3862    /// for a column.
3863    fn column_roundtrip_random<T: DataType>(
3864        props: WriterProperties,
3865        max_size: usize,
3866        min_value: T::T,
3867        max_value: T::T,
3868        max_def_level: i16,
3869        max_rep_level: i16,
3870    ) where
3871        T::T: PartialOrd + SampleUniform + Copy,
3872    {
3873        let mut num_values: usize = 0;
3874
3875        let mut buf: Vec<i16> = Vec::new();
3876        let def_levels = if max_def_level > 0 {
3877            random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
3878            for &dl in &buf[..] {
3879                if dl == max_def_level {
3880                    num_values += 1;
3881                }
3882            }
3883            Some(&buf[..])
3884        } else {
3885            num_values = max_size;
3886            None
3887        };
3888
3889        let mut buf: Vec<i16> = Vec::new();
3890        let rep_levels = if max_rep_level > 0 {
3891            random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
3892            buf[0] = 0; // Must start on record boundary
3893            Some(&buf[..])
3894        } else {
3895            None
3896        };
3897
3898        let mut values: Vec<T::T> = Vec::new();
3899        random_numbers_range(num_values, min_value, max_value, &mut values);
3900
3901        column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
3902    }
3903
3904    /// Performs write-read roundtrip and asserts written values and levels.
3905    fn column_roundtrip<T: DataType>(
3906        props: WriterProperties,
3907        values: &[T::T],
3908        def_levels: Option<&[i16]>,
3909        rep_levels: Option<&[i16]>,
3910    ) {
3911        let mut file = tempfile::tempfile().unwrap();
3912        let mut write = TrackedWrite::new(&mut file);
3913        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
3914
3915        let max_def_level = match def_levels {
3916            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3917            None => 0i16,
3918        };
3919
3920        let max_rep_level = match rep_levels {
3921            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3922            None => 0i16,
3923        };
3924
3925        let mut max_batch_size = values.len();
3926        if let Some(levels) = def_levels {
3927            max_batch_size = max_batch_size.max(levels.len());
3928        }
3929        if let Some(levels) = rep_levels {
3930            max_batch_size = max_batch_size.max(levels.len());
3931        }
3932
3933        let mut writer =
3934            get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));
3935
3936        let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
3937        assert_eq!(values_written, values.len());
3938        let result = writer.close().unwrap();
3939
3940        drop(write);
3941
3942        let props = ReaderProperties::builder()
3943            .set_backward_compatible_lz4(false)
3944            .build();
3945        let page_reader = Box::new(
3946            SerializedPageReader::new_with_properties(
3947                Arc::new(file),
3948                &result.metadata,
3949                result.rows_written as usize,
3950                None,
3951                Arc::new(props),
3952            )
3953            .unwrap(),
3954        );
3955        let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);
3956
3957        let mut actual_values = Vec::with_capacity(max_batch_size);
3958        let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
3959        let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));
3960
3961        let (_, values_read, levels_read) = reader
3962            .read_records(
3963                max_batch_size,
3964                actual_def_levels.as_mut(),
3965                actual_rep_levels.as_mut(),
3966                &mut actual_values,
3967            )
3968            .unwrap();
3969
3970        // Assert values, definition and repetition levels.
3971
3972        assert_eq!(&actual_values[..values_read], values);
3973        match actual_def_levels {
3974            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
3975            None => assert_eq!(None, def_levels),
3976        }
3977        match actual_rep_levels {
3978            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
3979            None => assert_eq!(None, rep_levels),
3980        }
3981
3982        // Assert written rows.
3983
3984        if let Some(levels) = actual_rep_levels {
3985            let mut actual_rows_written = 0;
3986            for l in levels {
3987                if l == 0 {
3988                    actual_rows_written += 1;
3989                }
3990            }
3991            assert_eq!(actual_rows_written, result.rows_written);
3992        } else if actual_def_levels.is_some() {
3993            assert_eq!(levels_read as u64, result.rows_written);
3994        } else {
3995            assert_eq!(values_read as u64, result.rows_written);
3996        }
3997    }
3998
3999    /// Performs write of provided values and returns column metadata of those values.
4000    /// Used to test encoding support for column writer.
4001    fn column_write_and_get_metadata<T: DataType>(
4002        props: WriterProperties,
4003        values: &[T::T],
4004    ) -> ColumnChunkMetaData {
4005        let page_writer = get_test_page_writer();
4006        let props = Arc::new(props);
4007        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4008        writer.write_batch(values, None, None).unwrap();
4009        writer.close().unwrap().metadata
4010    }
4011
4012    // Helper function to more compactly create a PageEncodingStats struct.
4013    fn encoding_stats(page_type: PageType, encoding: Encoding, count: i32) -> PageEncodingStats {
4014        PageEncodingStats {
4015            page_type,
4016            encoding,
4017            count,
4018        }
4019    }
4020
4021    // Function to use in tests for EncodingWriteSupport. This checks that dictionary
4022    // offset and encodings to make sure that column writer uses provided by trait
4023    // encodings.
4024    fn check_encoding_write_support<T: DataType>(
4025        version: WriterVersion,
4026        dict_enabled: bool,
4027        data: &[T::T],
4028        dictionary_page_offset: Option<i64>,
4029        encodings: &[Encoding],
4030        page_encoding_stats: &[PageEncodingStats],
4031    ) {
4032        let props = WriterProperties::builder()
4033            .set_writer_version(version)
4034            .set_dictionary_enabled(dict_enabled)
4035            .build();
4036        let meta = column_write_and_get_metadata::<T>(props, data);
4037        assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
4038        assert_eq!(meta.encodings(), encodings);
4039        assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
4040    }
4041
4042    /// Returns column writer.
4043    fn get_test_column_writer<'a, T: DataType>(
4044        page_writer: Box<dyn PageWriter + 'a>,
4045        max_def_level: i16,
4046        max_rep_level: i16,
4047        props: WriterPropertiesPtr,
4048    ) -> ColumnWriterImpl<'a, T> {
4049        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4050        let column_writer = get_column_writer(descr, props, page_writer);
4051        get_typed_column_writer::<T>(column_writer)
4052    }
4053
4054    /// Returns column reader.
4055    fn get_test_column_reader<T: DataType>(
4056        page_reader: Box<dyn PageReader>,
4057        max_def_level: i16,
4058        max_rep_level: i16,
4059    ) -> ColumnReaderImpl<T> {
4060        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4061        let column_reader = get_column_reader(descr, page_reader);
4062        get_typed_column_reader::<T>(column_reader)
4063    }
4064
4065    /// Returns descriptor for primitive column.
4066    fn get_test_column_descr<T: DataType>(
4067        max_def_level: i16,
4068        max_rep_level: i16,
4069    ) -> ColumnDescriptor {
4070        let path = ColumnPath::from("col");
4071        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4072            // length is set for "encoding support" tests for FIXED_LEN_BYTE_ARRAY type,
4073            // it should be no-op for other types
4074            .with_length(1)
4075            .build()
4076            .unwrap();
4077        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4078    }
4079
4080    /// Returns page writer that collects pages without serializing them.
4081    fn get_test_page_writer() -> Box<dyn PageWriter> {
4082        Box::new(TestPageWriter {})
4083    }
4084
4085    struct TestPageWriter {}
4086
4087    impl PageWriter for TestPageWriter {
4088        fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
4089            let mut res = PageWriteSpec::new();
4090            res.page_type = page.page_type();
4091            res.uncompressed_size = page.uncompressed_size();
4092            res.compressed_size = page.compressed_size();
4093            res.num_values = page.num_values();
4094            res.offset = 0;
4095            res.bytes_written = page.data().len() as u64;
4096            Ok(res)
4097        }
4098
4099        fn close(&mut self) -> Result<()> {
4100            Ok(())
4101        }
4102    }
4103
4104    /// Write data into parquet using [`get_test_page_writer`] and [`get_test_column_writer`] and returns generated statistics.
4105    fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
4106        let page_writer = get_test_page_writer();
4107        let props = Default::default();
4108        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4109        writer.write_batch(values, None, None).unwrap();
4110
4111        let metadata = writer.close().unwrap().metadata;
4112        if let Some(stats) = metadata.statistics() {
4113            stats.clone()
4114        } else {
4115            panic!("metadata missing statistics");
4116        }
4117    }
4118
4119    /// Returns Decimals column writer.
4120    fn get_test_decimals_column_writer<T: DataType>(
4121        page_writer: Box<dyn PageWriter>,
4122        max_def_level: i16,
4123        max_rep_level: i16,
4124        props: WriterPropertiesPtr,
4125    ) -> ColumnWriterImpl<'static, T> {
4126        let descr = Arc::new(get_test_decimals_column_descr::<T>(
4127            max_def_level,
4128            max_rep_level,
4129        ));
4130        let column_writer = get_column_writer(descr, props, page_writer);
4131        get_typed_column_writer::<T>(column_writer)
4132    }
4133
4134    /// Returns descriptor for Decimal type with primitive column.
4135    fn get_test_decimals_column_descr<T: DataType>(
4136        max_def_level: i16,
4137        max_rep_level: i16,
4138    ) -> ColumnDescriptor {
4139        let path = ColumnPath::from("col");
4140        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4141            .with_length(16)
4142            .with_logical_type(Some(LogicalType::Decimal {
4143                scale: 2,
4144                precision: 3,
4145            }))
4146            .with_scale(2)
4147            .with_precision(3)
4148            .build()
4149            .unwrap();
4150        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4151    }
4152
4153    fn float16_statistics_roundtrip(
4154        values: &[FixedLenByteArray],
4155    ) -> ValueStatistics<FixedLenByteArray> {
4156        let page_writer = get_test_page_writer();
4157        let mut writer = get_test_float16_column_writer(page_writer, Default::default());
4158        writer.write_batch(values, None, None).unwrap();
4159
4160        let metadata = writer.close().unwrap().metadata;
4161        if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
4162            stats.clone()
4163        } else {
4164            panic!("metadata missing statistics");
4165        }
4166    }
4167
4168    fn get_test_float16_column_writer(
4169        page_writer: Box<dyn PageWriter>,
4170        props: WriterPropertiesPtr,
4171    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4172        let descr = Arc::new(get_test_float16_column_descr(0, 0));
4173        let column_writer = get_column_writer(descr, props, page_writer);
4174        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4175    }
4176
4177    fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
4178        let path = ColumnPath::from("col");
4179        let tpe =
4180            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4181                .with_length(2)
4182                .with_logical_type(Some(LogicalType::Float16))
4183                .build()
4184                .unwrap();
4185        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4186    }
4187
4188    fn get_test_interval_column_writer(
4189        page_writer: Box<dyn PageWriter>,
4190    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4191        let descr = Arc::new(get_test_interval_column_descr());
4192        let column_writer = get_column_writer(descr, Default::default(), page_writer);
4193        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4194    }
4195
4196    fn get_test_interval_column_descr() -> ColumnDescriptor {
4197        let path = ColumnPath::from("col");
4198        let tpe =
4199            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4200                .with_length(12)
4201                .with_converted_type(ConvertedType::INTERVAL)
4202                .build()
4203                .unwrap();
4204        ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
4205    }
4206
4207    /// Returns column writer for UINT32 Column provided as ConvertedType only
4208    fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
4209        page_writer: Box<dyn PageWriter + 'a>,
4210        max_def_level: i16,
4211        max_rep_level: i16,
4212        props: WriterPropertiesPtr,
4213    ) -> ColumnWriterImpl<'a, T> {
4214        let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
4215            max_def_level,
4216            max_rep_level,
4217        ));
4218        let column_writer = get_column_writer(descr, props, page_writer);
4219        get_typed_column_writer::<T>(column_writer)
4220    }
4221
4222    /// Returns column descriptor for UINT32 Column provided as ConvertedType only
4223    fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
4224        max_def_level: i16,
4225        max_rep_level: i16,
4226    ) -> ColumnDescriptor {
4227        let path = ColumnPath::from("col");
4228        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4229            .with_converted_type(ConvertedType::UINT_32)
4230            .build()
4231            .unwrap();
4232        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4233    }
4234}