parquet/column/writer/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains column writer API.
19
20use bytes::Bytes;
21use half::f16;
22
23use crate::bloom_filter::Sbbf;
24use crate::format::{BoundaryOrder, ColumnIndex, OffsetIndex};
25use std::collections::{BTreeSet, VecDeque};
26use std::str;
27
28use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
29use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
30use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
31use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
32use crate::data_type::private::ParquetValueType;
33use crate::data_type::*;
34use crate::encodings::levels::LevelEncoder;
35#[cfg(feature = "encryption")]
36use crate::encryption::encrypt::get_column_crypto_metadata;
37use crate::errors::{ParquetError, Result};
38use crate::file::metadata::{
39    ColumnChunkMetaData, ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram,
40    OffsetIndexBuilder,
41};
42use crate::file::page_encoding_stats::PageEncodingStats;
43use crate::file::properties::{
44    EnabledStatistics, WriterProperties, WriterPropertiesPtr, WriterVersion,
45};
46use crate::file::statistics::{Statistics, ValueStatistics};
47use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
48
49pub(crate) mod encoder;
50
/// Runs `$body` against whichever typed writer is wrapped by `$target`.
///
/// The wrapped value is bound to `$writer` in every arm. The patterns are written as
/// `Self::<Variant>`, so this macro must be expanded inside an `impl` of an enum that
/// has exactly these variant names.
macro_rules! downcast_writer {
    ($target:expr, $writer:ident, $body:expr) => {
        match $target {
            Self::BoolColumnWriter($writer) => $body,
            Self::Int32ColumnWriter($writer) => $body,
            Self::Int64ColumnWriter($writer) => $body,
            Self::Int96ColumnWriter($writer) => $body,
            Self::FloatColumnWriter($writer) => $body,
            Self::DoubleColumnWriter($writer) => $body,
            Self::ByteArrayColumnWriter($writer) => $body,
            Self::FixedLenByteArrayColumnWriter($writer) => $body,
        }
    };
}
65
66/// Column writer for a Parquet type.
67pub enum ColumnWriter<'a> {
68    /// Column writer for boolean type
69    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
70    /// Column writer for int32 type
71    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
72    /// Column writer for int64 type
73    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
74    /// Column writer for int96 (timestamp) type
75    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
76    /// Column writer for float type
77    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
78    /// Column writer for double type
79    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
80    /// Column writer for byte array type
81    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
82    /// Column writer for fixed length byte array type
83    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
84}
85
86impl ColumnWriter<'_> {
87    /// Returns the estimated total memory usage
88    #[cfg(feature = "arrow")]
89    pub(crate) fn memory_size(&self) -> usize {
90        downcast_writer!(self, typed, typed.memory_size())
91    }
92
93    /// Returns the estimated total encoded bytes for this column writer
94    #[cfg(feature = "arrow")]
95    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
96        downcast_writer!(self, typed, typed.get_estimated_total_bytes())
97    }
98
99    /// Close this [`ColumnWriter`]
100    pub fn close(self) -> Result<ColumnCloseResult> {
101        downcast_writer!(self, typed, typed.close())
102    }
103}
104
105/// Gets a specific column writer corresponding to column descriptor `descr`.
106pub fn get_column_writer<'a>(
107    descr: ColumnDescPtr,
108    props: WriterPropertiesPtr,
109    page_writer: Box<dyn PageWriter + 'a>,
110) -> ColumnWriter<'a> {
111    match descr.physical_type() {
112        Type::BOOLEAN => {
113            ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
114        }
115        Type::INT32 => {
116            ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
117        }
118        Type::INT64 => {
119            ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
120        }
121        Type::INT96 => {
122            ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
123        }
124        Type::FLOAT => {
125            ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
126        }
127        Type::DOUBLE => {
128            ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
129        }
130        Type::BYTE_ARRAY => {
131            ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
132        }
133        Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
134            ColumnWriterImpl::new(descr, props, page_writer),
135        ),
136    }
137}
138
139/// Gets a typed column writer for the specific type `T`, by "up-casting" `col_writer` of
140/// non-generic type to a generic column writer type `ColumnWriterImpl`.
141///
142/// Panics if actual enum value for `col_writer` does not match the type `T`.
143pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
144    T::get_column_writer(col_writer).unwrap_or_else(|| {
145        panic!(
146            "Failed to convert column writer into a typed column writer for `{}` type",
147            T::get_physical_type()
148        )
149    })
150}
151
152/// Similar to `get_typed_column_writer` but returns a reference.
153pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
154    col_writer: &'b ColumnWriter<'a>,
155) -> &'b ColumnWriterImpl<'a, T> {
156    T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
157        panic!(
158            "Failed to convert column writer into a typed column writer for `{}` type",
159            T::get_physical_type()
160        )
161    })
162}
163
164/// Similar to `get_typed_column_writer` but returns a reference.
165pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
166    col_writer: &'a mut ColumnWriter<'b>,
167) -> &'a mut ColumnWriterImpl<'b, T> {
168    T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
169        panic!(
170            "Failed to convert column writer into a typed column writer for `{}` type",
171            T::get_physical_type()
172        )
173    })
174}
175
176/// Metadata returned by [`GenericColumnWriter::close`]
177#[derive(Debug, Clone)]
178pub struct ColumnCloseResult {
179    /// The total number of bytes written
180    pub bytes_written: u64,
181    /// The total number of rows written
182    pub rows_written: u64,
183    /// Metadata for this column chunk
184    pub metadata: ColumnChunkMetaData,
185    /// Optional bloom filter for this column
186    pub bloom_filter: Option<Sbbf>,
187    /// Optional column index, for filtering
188    pub column_index: Option<ColumnIndex>,
189    /// Optional offset index, identifying page locations
190    pub offset_index: Option<OffsetIndex>,
191}
192
193// Metrics per page
194#[derive(Default)]
195struct PageMetrics {
196    num_buffered_values: u32,
197    num_buffered_rows: u32,
198    num_page_nulls: u64,
199    repetition_level_histogram: Option<LevelHistogram>,
200    definition_level_histogram: Option<LevelHistogram>,
201}
202
203impl PageMetrics {
204    fn new() -> Self {
205        Default::default()
206    }
207
208    /// Initialize the repetition level histogram
209    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
210        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
211        self
212    }
213
214    /// Initialize the definition level histogram
215    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
216        self.definition_level_histogram = LevelHistogram::try_new(max_level);
217        self
218    }
219
220    /// Resets the state of this `PageMetrics` to the initial state.
221    /// If histograms have been initialized their contents will be reset to zero.
222    fn new_page(&mut self) {
223        self.num_buffered_values = 0;
224        self.num_buffered_rows = 0;
225        self.num_page_nulls = 0;
226        self.repetition_level_histogram
227            .as_mut()
228            .map(LevelHistogram::reset);
229        self.definition_level_histogram
230            .as_mut()
231            .map(LevelHistogram::reset);
232    }
233
234    /// Updates histogram values using provided repetition levels
235    fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
236        if let Some(ref mut rep_hist) = self.repetition_level_histogram {
237            rep_hist.update_from_levels(levels);
238        }
239    }
240
241    /// Updates histogram values using provided definition levels
242    fn update_definition_level_histogram(&mut self, levels: &[i16]) {
243        if let Some(ref mut def_hist) = self.definition_level_histogram {
244            def_hist.update_from_levels(levels);
245        }
246    }
247}
248
249// Metrics per column writer
250#[derive(Default)]
251struct ColumnMetrics<T: Default> {
252    total_bytes_written: u64,
253    total_rows_written: u64,
254    total_uncompressed_size: u64,
255    total_compressed_size: u64,
256    total_num_values: u64,
257    dictionary_page_offset: Option<u64>,
258    data_page_offset: Option<u64>,
259    min_column_value: Option<T>,
260    max_column_value: Option<T>,
261    num_column_nulls: u64,
262    column_distinct_count: Option<u64>,
263    variable_length_bytes: Option<i64>,
264    repetition_level_histogram: Option<LevelHistogram>,
265    definition_level_histogram: Option<LevelHistogram>,
266}
267
268impl<T: Default> ColumnMetrics<T> {
269    fn new() -> Self {
270        Default::default()
271    }
272
273    /// Initialize the repetition level histogram
274    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
275        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
276        self
277    }
278
279    /// Initialize the definition level histogram
280    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
281        self.definition_level_histogram = LevelHistogram::try_new(max_level);
282        self
283    }
284
285    /// Sum `page_histogram` into `chunk_histogram`
286    fn update_histogram(
287        chunk_histogram: &mut Option<LevelHistogram>,
288        page_histogram: &Option<LevelHistogram>,
289    ) {
290        if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
291            chunk_hist.add(page_hist);
292        }
293    }
294
295    /// Sum the provided PageMetrics histograms into the chunk histograms. Does nothing if
296    /// page histograms are not initialized.
297    fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
298        ColumnMetrics::<T>::update_histogram(
299            &mut self.definition_level_histogram,
300            &page_metrics.definition_level_histogram,
301        );
302        ColumnMetrics::<T>::update_histogram(
303            &mut self.repetition_level_histogram,
304            &page_metrics.repetition_level_histogram,
305        );
306    }
307
308    /// Sum the provided page variable_length_bytes into the chunk variable_length_bytes
309    fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
310        if let Some(var_bytes) = variable_length_bytes {
311            *self.variable_length_bytes.get_or_insert(0) += var_bytes;
312        }
313    }
314}
315
316/// Typed column writer for a primitive column.
317pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
318
319/// Generic column writer for a primitive column.
320pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
321    // Column writer properties
322    descr: ColumnDescPtr,
323    props: WriterPropertiesPtr,
324    statistics_enabled: EnabledStatistics,
325
326    page_writer: Box<dyn PageWriter + 'a>,
327    codec: Compression,
328    compressor: Option<Box<dyn Codec>>,
329    encoder: E,
330
331    page_metrics: PageMetrics,
332    // Metrics per column writer
333    column_metrics: ColumnMetrics<E::T>,
334
335    /// The order of encodings within the generated metadata does not impact its meaning,
336    /// but we use a BTreeSet so that the output is deterministic
337    encodings: BTreeSet<Encoding>,
338    encoding_stats: Vec<PageEncodingStats>,
339    // Reused buffers
340    def_levels_sink: Vec<i16>,
341    rep_levels_sink: Vec<i16>,
342    data_pages: VecDeque<CompressedPage>,
343    // column index and offset index
344    column_index_builder: ColumnIndexBuilder,
345    offset_index_builder: Option<OffsetIndexBuilder>,
346
347    // Below fields used to incrementally check boundary order across data pages.
348    // We assume they are ascending/descending until proven wrong.
349    data_page_boundary_ascending: bool,
350    data_page_boundary_descending: bool,
351    /// (min, max)
352    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
353}
354
355impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
356    /// Returns a new instance of [`GenericColumnWriter`].
357    pub fn new(
358        descr: ColumnDescPtr,
359        props: WriterPropertiesPtr,
360        page_writer: Box<dyn PageWriter + 'a>,
361    ) -> Self {
362        let codec = props.compression(descr.path());
363        let codec_options = CodecOptionsBuilder::default().build();
364        let compressor = create_codec(codec, &codec_options).unwrap();
365        let encoder = E::try_new(&descr, props.as_ref()).unwrap();
366
367        let statistics_enabled = props.statistics_enabled(descr.path());
368
369        let mut encodings = BTreeSet::new();
370        // Used for level information
371        encodings.insert(Encoding::RLE);
372
373        let mut page_metrics = PageMetrics::new();
374        let mut column_metrics = ColumnMetrics::<E::T>::new();
375
376        // Initialize level histograms if collecting page or chunk statistics
377        if statistics_enabled != EnabledStatistics::None {
378            page_metrics = page_metrics
379                .with_repetition_level_histogram(descr.max_rep_level())
380                .with_definition_level_histogram(descr.max_def_level());
381            column_metrics = column_metrics
382                .with_repetition_level_histogram(descr.max_rep_level())
383                .with_definition_level_histogram(descr.max_def_level())
384        }
385
386        // Disable column_index_builder if not collecting page statistics.
387        let mut column_index_builder = ColumnIndexBuilder::new();
388        if statistics_enabled != EnabledStatistics::Page {
389            column_index_builder.to_invalid()
390        }
391
392        // Disable offset_index_builder if requested by user.
393        let offset_index_builder = match props.offset_index_disabled() {
394            false => Some(OffsetIndexBuilder::new()),
395            _ => None,
396        };
397
398        Self {
399            descr,
400            props,
401            statistics_enabled,
402            page_writer,
403            codec,
404            compressor,
405            encoder,
406            def_levels_sink: vec![],
407            rep_levels_sink: vec![],
408            data_pages: VecDeque::new(),
409            page_metrics,
410            column_metrics,
411            column_index_builder,
412            offset_index_builder,
413            encodings,
414            encoding_stats: vec![],
415            data_page_boundary_ascending: true,
416            data_page_boundary_descending: true,
417            last_non_null_data_page_min_max: None,
418        }
419    }
420
421    #[allow(clippy::too_many_arguments)]
422    pub(crate) fn write_batch_internal(
423        &mut self,
424        values: &E::Values,
425        value_indices: Option<&[usize]>,
426        def_levels: Option<&[i16]>,
427        rep_levels: Option<&[i16]>,
428        min: Option<&E::T>,
429        max: Option<&E::T>,
430        distinct_count: Option<u64>,
431    ) -> Result<usize> {
432        // Check if number of definition levels is the same as number of repetition levels.
433        if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
434            if def.len() != rep.len() {
435                return Err(general_err!(
436                    "Inconsistent length of definition and repetition levels: {} != {}",
437                    def.len(),
438                    rep.len()
439                ));
440            }
441        }
442
443        // We check for DataPage limits only after we have inserted the values. If a user
444        // writes a large number of values, the DataPage size can be well above the limit.
445        //
446        // The purpose of this chunking is to bound this. Even if a user writes large
447        // number of values, the chunking will ensure that we add data page at a
448        // reasonable pagesize limit.
449
450        // TODO: find out why we don't account for size of levels when we estimate page
451        // size.
452
453        let num_levels = match def_levels {
454            Some(def_levels) => def_levels.len(),
455            None => values.len(),
456        };
457
458        if let Some(min) = min {
459            update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
460        }
461        if let Some(max) = max {
462            update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
463        }
464
465        // We can only set the distinct count if there are no other writes
466        if self.encoder.num_values() == 0 {
467            self.column_metrics.column_distinct_count = distinct_count;
468        } else {
469            self.column_metrics.column_distinct_count = None;
470        }
471
472        let mut values_offset = 0;
473        let mut levels_offset = 0;
474        let base_batch_size = self.props.write_batch_size();
475        while levels_offset < num_levels {
476            let mut end_offset = num_levels.min(levels_offset + base_batch_size);
477
478            // Split at record boundary
479            if let Some(r) = rep_levels {
480                while end_offset < r.len() && r[end_offset] != 0 {
481                    end_offset += 1;
482                }
483            }
484
485            values_offset += self.write_mini_batch(
486                values,
487                values_offset,
488                value_indices,
489                end_offset - levels_offset,
490                def_levels.map(|lv| &lv[levels_offset..end_offset]),
491                rep_levels.map(|lv| &lv[levels_offset..end_offset]),
492            )?;
493            levels_offset = end_offset;
494        }
495
496        // Return total number of values processed.
497        Ok(values_offset)
498    }
499
500    /// Writes batch of values, definition levels and repetition levels.
501    /// Returns number of values processed (written).
502    ///
503    /// If definition and repetition levels are provided, we write fully those levels and
504    /// select how many values to write (this number will be returned), since number of
505    /// actual written values may be smaller than provided values.
506    ///
507    /// If only values are provided, then all values are written and the length of
508    /// of the values buffer is returned.
509    ///
510    /// Definition and/or repetition levels can be omitted, if values are
511    /// non-nullable and/or non-repeated.
512    pub fn write_batch(
513        &mut self,
514        values: &E::Values,
515        def_levels: Option<&[i16]>,
516        rep_levels: Option<&[i16]>,
517    ) -> Result<usize> {
518        self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
519    }
520
521    /// Writer may optionally provide pre-calculated statistics for use when computing
522    /// chunk-level statistics
523    ///
524    /// NB: [`WriterProperties::statistics_enabled`] must be set to [`EnabledStatistics::Chunk`]
525    /// for these statistics to take effect. If [`EnabledStatistics::None`] they will be ignored,
526    /// and if [`EnabledStatistics::Page`] the chunk statistics will instead be computed from the
527    /// computed page statistics
528    pub fn write_batch_with_statistics(
529        &mut self,
530        values: &E::Values,
531        def_levels: Option<&[i16]>,
532        rep_levels: Option<&[i16]>,
533        min: Option<&E::T>,
534        max: Option<&E::T>,
535        distinct_count: Option<u64>,
536    ) -> Result<usize> {
537        self.write_batch_internal(
538            values,
539            None,
540            def_levels,
541            rep_levels,
542            min,
543            max,
544            distinct_count,
545        )
546    }
547
548    /// Returns the estimated total memory usage.
549    ///
550    /// Unlike [`Self::get_estimated_total_bytes`] this is an estimate
551    /// of the current memory usage and not the final anticipated encoded size.
552    #[cfg(feature = "arrow")]
553    pub(crate) fn memory_size(&self) -> usize {
554        self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
555    }
556
557    /// Returns total number of bytes written by this column writer so far.
558    /// This value is also returned when column writer is closed.
559    ///
560    /// Note: this value does not include any buffered data that has not
561    /// yet been flushed to a page.
562    pub fn get_total_bytes_written(&self) -> u64 {
563        self.column_metrics.total_bytes_written
564    }
565
566    /// Returns the estimated total encoded bytes for this column writer.
567    ///
568    /// Unlike [`Self::get_total_bytes_written`] this includes an estimate
569    /// of any data that has not yet been flushed to a page, based on it's
570    /// anticipated encoded size.
571    #[cfg(feature = "arrow")]
572    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
573        self.data_pages
574            .iter()
575            .map(|page| page.data().len() as u64)
576            .sum::<u64>()
577            + self.column_metrics.total_bytes_written
578            + self.encoder.estimated_data_page_size() as u64
579            + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
580    }
581
582    /// Returns total number of rows written by this column writer so far.
583    /// This value is also returned when column writer is closed.
584    pub fn get_total_rows_written(&self) -> u64 {
585        self.column_metrics.total_rows_written
586    }
587
588    /// Returns a reference to a [`ColumnDescPtr`]
589    pub fn get_descriptor(&self) -> &ColumnDescPtr {
590        &self.descr
591    }
592
593    /// Finalizes writes and closes the column writer.
594    /// Returns total bytes written, total rows written and column chunk metadata.
595    pub fn close(mut self) -> Result<ColumnCloseResult> {
596        if self.page_metrics.num_buffered_values > 0 {
597            self.add_data_page()?;
598        }
599        if self.encoder.has_dictionary() {
600            self.write_dictionary_page()?;
601        }
602        self.flush_data_pages()?;
603        let metadata = self.build_column_metadata()?;
604        self.page_writer.close()?;
605
606        let boundary_order = match (
607            self.data_page_boundary_ascending,
608            self.data_page_boundary_descending,
609        ) {
610            // If the lists are composed of equal elements then will be marked as ascending
611            // (Also the case if all pages are null pages)
612            (true, _) => BoundaryOrder::ASCENDING,
613            (false, true) => BoundaryOrder::DESCENDING,
614            (false, false) => BoundaryOrder::UNORDERED,
615        };
616        self.column_index_builder.set_boundary_order(boundary_order);
617
618        let column_index = self
619            .column_index_builder
620            .valid()
621            .then(|| self.column_index_builder.build_to_thrift());
622
623        let offset_index = self.offset_index_builder.map(|b| b.build_to_thrift());
624
625        Ok(ColumnCloseResult {
626            bytes_written: self.column_metrics.total_bytes_written,
627            rows_written: self.column_metrics.total_rows_written,
628            bloom_filter: self.encoder.flush_bloom_filter(),
629            metadata,
630            column_index,
631            offset_index,
632        })
633    }
634
635    /// Writes mini batch of values, definition and repetition levels.
636    /// This allows fine-grained processing of values and maintaining a reasonable
637    /// page size.
638    fn write_mini_batch(
639        &mut self,
640        values: &E::Values,
641        values_offset: usize,
642        value_indices: Option<&[usize]>,
643        num_levels: usize,
644        def_levels: Option<&[i16]>,
645        rep_levels: Option<&[i16]>,
646    ) -> Result<usize> {
647        // Process definition levels and determine how many values to write.
648        let values_to_write = if self.descr.max_def_level() > 0 {
649            let levels = def_levels.ok_or_else(|| {
650                general_err!(
651                    "Definition levels are required, because max definition level = {}",
652                    self.descr.max_def_level()
653                )
654            })?;
655
656            let values_to_write = levels
657                .iter()
658                .map(|level| (*level == self.descr.max_def_level()) as usize)
659                .sum();
660            self.page_metrics.num_page_nulls += (levels.len() - values_to_write) as u64;
661
662            // Update histogram
663            self.page_metrics.update_definition_level_histogram(levels);
664
665            self.def_levels_sink.extend_from_slice(levels);
666            values_to_write
667        } else {
668            num_levels
669        };
670
671        // Process repetition levels and determine how many rows we are about to process.
672        if self.descr.max_rep_level() > 0 {
673            // A row could contain more than one value.
674            let levels = rep_levels.ok_or_else(|| {
675                general_err!(
676                    "Repetition levels are required, because max repetition level = {}",
677                    self.descr.max_rep_level()
678                )
679            })?;
680
681            if !levels.is_empty() && levels[0] != 0 {
682                return Err(general_err!(
683                    "Write must start at a record boundary, got non-zero repetition level of {}",
684                    levels[0]
685                ));
686            }
687
688            // Count the occasions where we start a new row
689            for &level in levels {
690                self.page_metrics.num_buffered_rows += (level == 0) as u32
691            }
692
693            // Update histogram
694            self.page_metrics.update_repetition_level_histogram(levels);
695
696            self.rep_levels_sink.extend_from_slice(levels);
697        } else {
698            // Each value is exactly one row.
699            // Equals to the number of values, we count nulls as well.
700            self.page_metrics.num_buffered_rows += num_levels as u32;
701        }
702
703        match value_indices {
704            Some(indices) => {
705                let indices = &indices[values_offset..values_offset + values_to_write];
706                self.encoder.write_gather(values, indices)?;
707            }
708            None => self.encoder.write(values, values_offset, values_to_write)?,
709        }
710
711        self.page_metrics.num_buffered_values += num_levels as u32;
712
713        if self.should_add_data_page() {
714            self.add_data_page()?;
715        }
716
717        if self.should_dict_fallback() {
718            self.dict_fallback()?;
719        }
720
721        Ok(values_to_write)
722    }
723
724    /// Returns true if we need to fall back to non-dictionary encoding.
725    ///
726    /// We can only fall back if dictionary encoder is set and we have exceeded dictionary
727    /// size.
728    #[inline]
729    fn should_dict_fallback(&self) -> bool {
730        match self.encoder.estimated_dict_page_size() {
731            Some(size) => {
732                size >= self
733                    .props
734                    .column_dictionary_page_size_limit(self.descr.path())
735            }
736            None => false,
737        }
738    }
739
740    /// Returns true if there is enough data for a data page, false otherwise.
741    #[inline]
742    fn should_add_data_page(&self) -> bool {
743        // This is necessary in the event of a much larger dictionary size than page size
744        //
745        // In such a scenario the dictionary decoder may return an estimated encoded
746        // size in excess of the page size limit, even when there are no buffered values
747        if self.page_metrics.num_buffered_values == 0 {
748            return false;
749        }
750
751        self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
752            || self.encoder.estimated_data_page_size() >= self.props.data_page_size_limit()
753    }
754
755    /// Performs dictionary fallback.
756    /// Prepares and writes dictionary and all data pages into page writer.
757    fn dict_fallback(&mut self) -> Result<()> {
758        // At this point we know that we need to fall back.
759        if self.page_metrics.num_buffered_values > 0 {
760            self.add_data_page()?;
761        }
762        self.write_dictionary_page()?;
763        self.flush_data_pages()?;
764        Ok(())
765    }
766
    /// Update the column index and offset index when adding the data page
    ///
    /// Called from `add_data_page` before the per-page metrics are reset, so it reads the
    /// current `page_metrics` (buffered rows, page null count, level histograms).
    ///
    /// * `page_statistics` - min/max statistics of the page, or `None` if they were not built.
    ///   A page that is not all-null but has no statistics invalidates the column index for
    ///   the whole column chunk.
    /// * `page_variable_length_bytes` - unencoded byte-array size of the page, forwarded to the
    ///   offset index builder.
    ///
    /// Also tracks whether page min/max values stay ascending and/or descending across pages
    /// (`data_page_boundary_ascending` / `data_page_boundary_descending`).
    fn update_column_offset_index(
        &mut self,
        page_statistics: Option<&ValueStatistics<E::T>>,
        page_variable_length_bytes: Option<i64>,
    ) {
        // update the column index
        // The page counts as all-null when its null count equals its row count.
        // NOTE(review): for repeated columns one row can hold several leaf values, so this
        // equality may not strictly imply that every value is null — confirm.
        let null_page =
            (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
        // a page contains only null values,
        // and writers have to set the corresponding entries in min_values and max_values to byte[0]
        if null_page && self.column_index_builder.valid() {
            self.column_index_builder.append(
                null_page,
                vec![],
                vec![],
                self.page_metrics.num_page_nulls as i64,
            );
        } else if self.column_index_builder.valid() {
            // from page statistics
            // If can't get the page statistics, ignore this column/offset index for this column chunk
            match &page_statistics {
                None => {
                    self.column_index_builder.to_invalid();
                }
                Some(stat) => {
                    // Page statistics built by `add_data_page` always carry both a min and a
                    // max, which is what these unwraps rely on.
                    // Check if min/max are still ascending/descending across pages
                    let new_min = stat.min_opt().unwrap();
                    let new_max = stat.max_opt().unwrap();
                    if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
                        if self.data_page_boundary_ascending {
                            // If last min/max are greater than new min/max then not ascending anymore
                            let not_ascending = compare_greater(&self.descr, last_min, new_min)
                                || compare_greater(&self.descr, last_max, new_max);
                            if not_ascending {
                                self.data_page_boundary_ascending = false;
                            }
                        }

                        if self.data_page_boundary_descending {
                            // If new min/max are greater than last min/max then not descending anymore
                            let not_descending = compare_greater(&self.descr, new_min, last_min)
                                || compare_greater(&self.descr, new_max, last_max);
                            if not_descending {
                                self.data_page_boundary_descending = false;
                            }
                        }
                    }
                    // Remember this page's bounds for the next non-null page's ordering check.
                    self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));

                    // Binary bounds are truncated to `column_index_truncate_length` when the
                    // column type allows it; `.0` keeps the bytes and drops the
                    // "did truncate" flag.
                    if self.can_truncate_value() {
                        self.column_index_builder.append(
                            null_page,
                            self.truncate_min_value(
                                self.props.column_index_truncate_length(),
                                stat.min_bytes_opt().unwrap(),
                            )
                            .0,
                            self.truncate_max_value(
                                self.props.column_index_truncate_length(),
                                stat.max_bytes_opt().unwrap(),
                            )
                            .0,
                            self.page_metrics.num_page_nulls as i64,
                        );
                    } else {
                        self.column_index_builder.append(
                            null_page,
                            stat.min_bytes_opt().unwrap().to_vec(),
                            stat.max_bytes_opt().unwrap().to_vec(),
                            self.page_metrics.num_page_nulls as i64,
                        );
                    }
                }
            }
        }

        // Append page histograms to the `ColumnIndex` histograms
        // (done for every page, whether or not the column index is still valid)
        self.column_index_builder.append_histograms(
            &self.page_metrics.repetition_level_histogram,
            &self.page_metrics.definition_level_histogram,
        );

        // Update the offset index
        // (page offset and size are appended later, once the page is actually written)
        if let Some(builder) = self.offset_index_builder.as_mut() {
            builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
            builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
        }
    }
856
857    /// Determine if we should allow truncating min/max values for this column's statistics
858    fn can_truncate_value(&self) -> bool {
859        match self.descr.physical_type() {
860            // Don't truncate for Float16 and Decimal because their sort order is different
861            // from that of FIXED_LEN_BYTE_ARRAY sort order.
862            // So truncation of those types could lead to inaccurate min/max statistics
863            Type::FIXED_LEN_BYTE_ARRAY
864                if !matches!(
865                    self.descr.logical_type(),
866                    Some(LogicalType::Decimal { .. }) | Some(LogicalType::Float16)
867                ) =>
868            {
869                true
870            }
871            Type::BYTE_ARRAY => true,
872            // Truncation only applies for fba/binary physical types
873            _ => false,
874        }
875    }
876
877    /// Returns `true` if this column's logical type is a UTF-8 string.
878    fn is_utf8(&self) -> bool {
879        self.get_descriptor().logical_type() == Some(LogicalType::String)
880            || self.get_descriptor().converted_type() == ConvertedType::UTF8
881    }
882
883    /// Truncates a binary statistic to at most `truncation_length` bytes.
884    ///
885    /// If truncation is not possible, returns `data`.
886    ///
887    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
888    ///
889    /// UTF-8 Note:
890    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
891    /// also remain valid UTF-8, but may be less tnan `truncation_length` bytes to avoid splitting
892    /// on non-character boundaries.
893    fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
894        truncation_length
895            .filter(|l| data.len() > *l)
896            .and_then(|l|
897                // don't do extra work if this column isn't UTF-8
898                if self.is_utf8() {
899                    match str::from_utf8(data) {
900                        Ok(str_data) => truncate_utf8(str_data, l),
901                        Err(_) => Some(data[..l].to_vec()),
902                    }
903                } else {
904                    Some(data[..l].to_vec())
905                }
906            )
907            .map(|truncated| (truncated, true))
908            .unwrap_or_else(|| (data.to_vec(), false))
909    }
910
911    /// Truncates a binary statistic to at most `truncation_length` bytes, and then increment the
912    /// final byte(s) to yield a valid upper bound. This may result in a result of less than
913    /// `truncation_length` bytes if the last byte(s) overflows.
914    ///
915    /// If truncation is not possible, returns `data`.
916    ///
917    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
918    ///
919    /// UTF-8 Note:
920    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
921    /// also remain valid UTF-8 (but again may be less than `truncation_length` bytes). If `data`
922    /// does not contain valid UTF-8, then truncation will occur as if the column is non-string
923    /// binary.
924    fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
925        truncation_length
926            .filter(|l| data.len() > *l)
927            .and_then(|l|
928                // don't do extra work if this column isn't UTF-8
929                if self.is_utf8() {
930                    match str::from_utf8(data) {
931                        Ok(str_data) => truncate_and_increment_utf8(str_data, l),
932                        Err(_) => increment(data[..l].to_vec()),
933                    }
934                } else {
935                    increment(data[..l].to_vec())
936                }
937            )
938            .map(|truncated| (truncated, true))
939            .unwrap_or_else(|| (data.to_vec(), false))
940    }
941
942    /// Truncate the min and max values that will be written to a data page
943    /// header or column chunk Statistics
944    fn truncate_statistics(&self, statistics: Statistics) -> Statistics {
945        let backwards_compatible_min_max = self.descr.sort_order().is_signed();
946        match statistics {
947            Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
948                let (min, did_truncate_min) = self.truncate_min_value(
949                    self.props.statistics_truncate_length(),
950                    stats.min_bytes_opt().unwrap(),
951                );
952                let (max, did_truncate_max) = self.truncate_max_value(
953                    self.props.statistics_truncate_length(),
954                    stats.max_bytes_opt().unwrap(),
955                );
956                Statistics::ByteArray(
957                    ValueStatistics::new(
958                        Some(min.into()),
959                        Some(max.into()),
960                        stats.distinct_count(),
961                        stats.null_count_opt(),
962                        backwards_compatible_min_max,
963                    )
964                    .with_max_is_exact(!did_truncate_max)
965                    .with_min_is_exact(!did_truncate_min),
966                )
967            }
968            Statistics::FixedLenByteArray(stats)
969                if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
970            {
971                let (min, did_truncate_min) = self.truncate_min_value(
972                    self.props.statistics_truncate_length(),
973                    stats.min_bytes_opt().unwrap(),
974                );
975                let (max, did_truncate_max) = self.truncate_max_value(
976                    self.props.statistics_truncate_length(),
977                    stats.max_bytes_opt().unwrap(),
978                );
979                Statistics::FixedLenByteArray(
980                    ValueStatistics::new(
981                        Some(min.into()),
982                        Some(max.into()),
983                        stats.distinct_count(),
984                        stats.null_count_opt(),
985                        backwards_compatible_min_max,
986                    )
987                    .with_max_is_exact(!did_truncate_max)
988                    .with_min_is_exact(!did_truncate_min),
989                )
990            }
991            stats => stats,
992        }
993    }
994
    /// Adds data page.
    /// Data page is either buffered in case of dictionary encoding or written directly.
    ///
    /// Steps, in order:
    /// 1. Flush the encoder's buffered values (with their min/max) for this page.
    /// 2. Fold the page's min/max and null count into the column-chunk metrics, and build
    ///    page-level statistics when statistics are enabled at `Page` granularity.
    /// 3. Update the column index / offset index and the chunk-level histograms.
    /// 4. Assemble the page body (levels followed by values) for the configured writer
    ///    version, compressing it as that version requires.
    /// 5. Buffer the page while the encoder still has a dictionary, so that the dictionary
    ///    page can be written ahead of it (see `dict_fallback`); otherwise write it now.
    /// 6. Reset the per-page level sinks and metrics.
    ///
    /// # Errors
    /// Propagates errors from flushing the encoder, compressing, or writing the page.
    fn add_data_page(&mut self) -> Result<()> {
        // Extract encoded values
        let values_data = self.encoder.flush_data_page()?;

        let max_def_level = self.descr.max_def_level();
        let max_rep_level = self.descr.max_rep_level();

        self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;

        // Statistics exist only when the encoder reported both a min and a max.
        let page_statistics = match (values_data.min_value, values_data.max_value) {
            (Some(min), Some(max)) => {
                // Update chunk level statistics
                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

                // Page-level statistics are only kept with `EnabledStatistics::Page`.
                (self.statistics_enabled == EnabledStatistics::Page).then_some(
                    ValueStatistics::new(
                        Some(min),
                        Some(max),
                        None,
                        Some(self.page_metrics.num_page_nulls),
                        false,
                    ),
                )
            }
            _ => None,
        };

        // update column and offset index
        self.update_column_offset_index(
            page_statistics.as_ref(),
            values_data.variable_length_bytes,
        );

        // Update histograms and variable_length_bytes in column_metrics
        self.column_metrics
            .update_from_page_metrics(&self.page_metrics);
        self.column_metrics
            .update_variable_length_bytes(values_data.variable_length_bytes);

        // From here on, we only need page statistics if they will be written to the page header.
        // Header statistics are truncated via `truncate_statistics`.
        let page_statistics = page_statistics
            .filter(|_| self.props.write_page_header_statistics(self.descr.path()))
            .map(|stats| self.truncate_statistics(Statistics::from(stats)));

        let compressed_page = match self.props.writer_version() {
            WriterVersion::PARQUET_1_0 => {
                // Data Page v1: RLE-encoded levels and the values are concatenated, and the
                // whole buffer is compressed together.
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.rep_levels_sink[..],
                            max_rep_level,
                        )[..],
                    );
                }

                if max_def_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.def_levels_sink[..],
                            max_def_level,
                        )[..],
                    );
                }

                buffer.extend_from_slice(&values_data.buf);
                // Measured before compression; passed to `CompressedPage::new` below.
                let uncompressed_size = buffer.len();

                if let Some(ref mut cmpr) = self.compressor {
                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
                    buffer = compressed_buf;
                }

                let data_page = Page::DataPage {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    def_level_encoding: Encoding::RLE,
                    rep_level_encoding: Encoding::RLE,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
            WriterVersion::PARQUET_2_0 => {
                // Data Page v2: level bytes are stored uncompressed at the front of the buffer
                // and their lengths are recorded separately in the page.
                let mut rep_levels_byte_len = 0;
                let mut def_levels_byte_len = 0;
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
                    rep_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                if max_def_level > 0 {
                    let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
                    def_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                let uncompressed_size =
                    rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();

                // Data Page v2 compresses values only.
                // The compressed values go into `buffer` after the level bytes (presumably
                // `Codec::compress` appends to its output buffer — confirm).
                match self.compressor {
                    Some(ref mut cmpr) => {
                        cmpr.compress(&values_data.buf, &mut buffer)?;
                    }
                    None => buffer.extend_from_slice(&values_data.buf),
                }

                let data_page = Page::DataPageV2 {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    num_nulls: self.page_metrics.num_page_nulls as u32,
                    num_rows: self.page_metrics.num_buffered_rows,
                    def_levels_byte_len: def_levels_byte_len as u32,
                    rep_levels_byte_len: rep_levels_byte_len as u32,
                    is_compressed: self.compressor.is_some(),
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
        };

        // Check if we need to buffer data page or flush it to the sink directly.
        // While a dictionary is active, pages are held in `data_pages` until the dictionary
        // page has been written.
        if self.encoder.has_dictionary() {
            self.data_pages.push_back(compressed_page);
        } else {
            self.write_data_page(compressed_page)?;
        }

        // Update total number of rows.
        self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;

        // Reset state.
        self.rep_levels_sink.clear();
        self.def_levels_sink.clear();
        self.page_metrics.new_page();

        Ok(())
    }
1147
1148    /// Finalises any outstanding data pages and flushes buffered data pages from
1149    /// dictionary encoding into underlying sink.
1150    #[inline]
1151    fn flush_data_pages(&mut self) -> Result<()> {
1152        // Write all outstanding data to a new page.
1153        if self.page_metrics.num_buffered_values > 0 {
1154            self.add_data_page()?;
1155        }
1156
1157        while let Some(page) = self.data_pages.pop_front() {
1158            self.write_data_page(page)?;
1159        }
1160
1161        Ok(())
1162    }
1163
1164    /// Assembles column chunk metadata.
1165    fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
1166        let total_compressed_size = self.column_metrics.total_compressed_size as i64;
1167        let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
1168        let num_values = self.column_metrics.total_num_values as i64;
1169        let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
1170        // If data page offset is not set, then no pages have been written
1171        let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;
1172
1173        let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
1174            .set_compression(self.codec)
1175            .set_encodings(self.encodings.iter().cloned().collect())
1176            .set_page_encoding_stats(self.encoding_stats.clone())
1177            .set_total_compressed_size(total_compressed_size)
1178            .set_total_uncompressed_size(total_uncompressed_size)
1179            .set_num_values(num_values)
1180            .set_data_page_offset(data_page_offset)
1181            .set_dictionary_page_offset(dict_page_offset);
1182
1183        if self.statistics_enabled != EnabledStatistics::None {
1184            let backwards_compatible_min_max = self.descr.sort_order().is_signed();
1185
1186            let statistics = ValueStatistics::<E::T>::new(
1187                self.column_metrics.min_column_value.clone(),
1188                self.column_metrics.max_column_value.clone(),
1189                self.column_metrics.column_distinct_count,
1190                Some(self.column_metrics.num_column_nulls),
1191                false,
1192            )
1193            .with_backwards_compatible_min_max(backwards_compatible_min_max)
1194            .into();
1195
1196            let statistics = self.truncate_statistics(statistics);
1197
1198            builder = builder
1199                .set_statistics(statistics)
1200                .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
1201                .set_repetition_level_histogram(
1202                    self.column_metrics.repetition_level_histogram.take(),
1203                )
1204                .set_definition_level_histogram(
1205                    self.column_metrics.definition_level_histogram.take(),
1206                );
1207        }
1208
1209        builder = self.set_column_chunk_encryption_properties(builder);
1210
1211        let metadata = builder.build()?;
1212        Ok(metadata)
1213    }
1214
1215    /// Encodes definition or repetition levels for Data Page v1.
1216    #[inline]
1217    fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> {
1218        let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
1219        encoder.put(levels);
1220        encoder.consume()
1221    }
1222
1223    /// Encodes definition or repetition levels for Data Page v2.
1224    /// Encoding is always RLE.
1225    #[inline]
1226    fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
1227        let mut encoder = LevelEncoder::v2(max_level, levels.len());
1228        encoder.put(levels);
1229        encoder.consume()
1230    }
1231
1232    /// Writes compressed data page into underlying sink and updates global metrics.
1233    #[inline]
1234    fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
1235        self.encodings.insert(page.encoding());
1236        match self.encoding_stats.last_mut() {
1237            Some(encoding_stats)
1238                if encoding_stats.page_type == page.page_type()
1239                    && encoding_stats.encoding == page.encoding() =>
1240            {
1241                encoding_stats.count += 1;
1242            }
1243            _ => {
1244                // data page type does not change inside a file
1245                // encoding can currently only change from dictionary to non-dictionary once
1246                self.encoding_stats.push(PageEncodingStats {
1247                    page_type: page.page_type(),
1248                    encoding: page.encoding(),
1249                    count: 1,
1250                });
1251            }
1252        }
1253        let page_spec = self.page_writer.write_page(page)?;
1254        // update offset index
1255        // compressed_size = header_size + compressed_data_size
1256        if let Some(builder) = self.offset_index_builder.as_mut() {
1257            builder
1258                .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
1259        }
1260        self.update_metrics_for_page(page_spec);
1261        Ok(())
1262    }
1263
1264    /// Writes dictionary page into underlying sink.
1265    #[inline]
1266    fn write_dictionary_page(&mut self) -> Result<()> {
1267        let compressed_page = {
1268            let mut page = self
1269                .encoder
1270                .flush_dict_page()?
1271                .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;
1272
1273            let uncompressed_size = page.buf.len();
1274
1275            if let Some(ref mut cmpr) = self.compressor {
1276                let mut output_buf = Vec::with_capacity(uncompressed_size);
1277                cmpr.compress(&page.buf, &mut output_buf)?;
1278                page.buf = Bytes::from(output_buf);
1279            }
1280
1281            let dict_page = Page::DictionaryPage {
1282                buf: page.buf,
1283                num_values: page.num_values as u32,
1284                encoding: self.props.dictionary_page_encoding(),
1285                is_sorted: page.is_sorted,
1286            };
1287            CompressedPage::new(dict_page, uncompressed_size)
1288        };
1289
1290        self.encodings.insert(compressed_page.encoding());
1291        self.encoding_stats.push(PageEncodingStats {
1292            page_type: PageType::DICTIONARY_PAGE,
1293            encoding: compressed_page.encoding(),
1294            count: 1,
1295        });
1296        let page_spec = self.page_writer.write_page(compressed_page)?;
1297        self.update_metrics_for_page(page_spec);
1298        // For the directory page, don't need to update column/offset index.
1299        Ok(())
1300    }
1301
1302    /// Updates column writer metrics with each page metadata.
1303    #[inline]
1304    fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1305        self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1306        self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1307        self.column_metrics.total_bytes_written += page_spec.bytes_written;
1308
1309        match page_spec.page_type {
1310            PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1311                self.column_metrics.total_num_values += page_spec.num_values as u64;
1312                if self.column_metrics.data_page_offset.is_none() {
1313                    self.column_metrics.data_page_offset = Some(page_spec.offset);
1314                }
1315            }
1316            PageType::DICTIONARY_PAGE => {
1317                assert!(
1318                    self.column_metrics.dictionary_page_offset.is_none(),
1319                    "Dictionary offset is already set"
1320                );
1321                self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1322            }
1323            _ => {}
1324        }
1325    }
1326
1327    #[inline]
1328    #[cfg(feature = "encryption")]
1329    fn set_column_chunk_encryption_properties(
1330        &self,
1331        builder: ColumnChunkMetaDataBuilder,
1332    ) -> ColumnChunkMetaDataBuilder {
1333        if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() {
1334            builder.set_column_crypto_metadata(get_column_crypto_metadata(
1335                encryption_properties,
1336                &self.descr,
1337            ))
1338        } else {
1339            builder
1340        }
1341    }
1342
1343    #[inline]
1344    #[cfg(not(feature = "encryption"))]
1345    fn set_column_chunk_encryption_properties(
1346        &self,
1347        builder: ColumnChunkMetaDataBuilder,
1348    ) -> ColumnChunkMetaDataBuilder {
1349        builder
1350    }
1351}
1352
1353fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
1354    update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
1355}
1356
1357fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
1358    update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
1359}
1360
1361#[inline]
1362#[allow(clippy::eq_op)]
1363fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
1364    match T::PHYSICAL_TYPE {
1365        Type::FLOAT | Type::DOUBLE => val != val,
1366        Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type() == Some(LogicalType::Float16) => {
1367            let val = val.as_bytes();
1368            let val = f16::from_le_bytes([val[0], val[1]]);
1369            val.is_nan()
1370        }
1371        _ => false,
1372    }
1373}
1374
1375/// Perform a conditional update of `cur`, skipping any NaN values
1376///
1377/// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with
1378/// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true`
1379fn update_stat<T: ParquetValueType, F>(
1380    descr: &ColumnDescriptor,
1381    val: &T,
1382    cur: &mut Option<T>,
1383    should_update: F,
1384) where
1385    F: Fn(&T) -> bool,
1386{
1387    if is_nan(descr, val) {
1388        return;
1389    }
1390
1391    if cur.as_ref().is_none_or(should_update) {
1392        *cur = Some(val.clone());
1393    }
1394}
1395
1396/// Evaluate `a > b` according to underlying logical type.
1397fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
1398    match T::PHYSICAL_TYPE {
1399        Type::INT32 | Type::INT64 => {
1400            if let Some(LogicalType::Integer {
1401                is_signed: false, ..
1402            }) = descr.logical_type()
1403            {
1404                // need to compare unsigned
1405                return compare_greater_unsigned_int(a, b);
1406            }
1407
1408            match descr.converted_type() {
1409                ConvertedType::UINT_8
1410                | ConvertedType::UINT_16
1411                | ConvertedType::UINT_32
1412                | ConvertedType::UINT_64 => {
1413                    return compare_greater_unsigned_int(a, b);
1414                }
1415                _ => {}
1416            };
1417        }
1418        Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1419            if let Some(LogicalType::Decimal { .. }) = descr.logical_type() {
1420                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1421            }
1422            if let ConvertedType::DECIMAL = descr.converted_type() {
1423                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1424            }
1425            if let Some(LogicalType::Float16) = descr.logical_type() {
1426                return compare_greater_f16(a.as_bytes(), b.as_bytes());
1427            }
1428        }
1429
1430        _ => {}
1431    }
1432
1433    // compare independent of logical / converted type
1434    a > b
1435}
1436
1437// ----------------------------------------------------------------------
1438// Encoding support for column writer.
1439// This mirrors parquet-mr default encodings for writes. See:
1440// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java
1441// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java
1442
1443/// Returns encoding for a column when no other encoding is provided in writer properties.
1444fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1445    match (kind, props.writer_version()) {
1446        (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1447        (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1448        (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1449        (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1450        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1451        _ => Encoding::PLAIN,
1452    }
1453}
1454
1455/// Returns true if dictionary is supported for column writer, false otherwise.
1456fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1457    match (kind, props.writer_version()) {
1458        // Booleans do not support dict encoding and should use a fallback encoding.
1459        (Type::BOOLEAN, _) => false,
1460        // Dictionary encoding was not enabled in PARQUET 1.0
1461        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1462        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1463        _ => true,
1464    }
1465}
1466
1467#[inline]
1468fn compare_greater_unsigned_int<T: ParquetValueType>(a: &T, b: &T) -> bool {
1469    a.as_u64().unwrap() > b.as_u64().unwrap()
1470}
1471
1472#[inline]
1473fn compare_greater_f16(a: &[u8], b: &[u8]) -> bool {
1474    let a = f16::from_le_bytes(a.try_into().unwrap());
1475    let b = f16::from_le_bytes(b.try_into().unwrap());
1476    a > b
1477}
1478
/// Signed comparison of byte arrays holding big-endian two's-complement integers, which is
/// how `DECIMAL` values are stored in `BYTE_ARRAY` / `FIXED_LEN_BYTE_ARRAY` columns.
///
/// Returns `true` if `a > b`. Arrays of different lengths are compared as if the shorter one
/// were sign-extended to the length of the longer one, so `[0x00, 0x05]` equals `[0x05]` and
/// `[0xFF, 0xFE]` equals `[0xFE]`. An empty array is less than any non-empty array.
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    let a_length = a.len();
    let b_length = b.len();

    if a_length == 0 || b_length == 0 {
        return a_length > 0;
    }

    let first_a: u8 = a[0];
    let first_b: u8 = b[0];

    // We can short circuit for different signed numbers or
    // for equal length bytes arrays that have different first bytes.
    // The equality requirement is necessary for sign extension cases.
    // 0xFF10 should be equal to 0x10 (due to big endian sign extension).
    if (0x80 & first_a) != (0x80 & first_b) || (a_length == b_length && first_a != first_b) {
        return (first_a as i8) > (first_b as i8);
    }

    // Both numbers have the same sign from here on.
    let negative = (first_a as i8) < 0;
    let extension: u8 = if negative { 0xFF } else { 0 };

    // Right-align the two arrays: the longer one has a lead of `len - common` bytes that the
    // shorter one lacks. For equal lengths both leads are empty.
    let common = a_length.min(b_length);
    let (a_lead, a_tail) = a.split_at(a_length - common);
    let (b_lead, b_tail) = b.split_at(b_length - common);

    // If the lead holds anything besides sign extension, the longer array's magnitude exceeds
    // what the shorter one can represent. It is therefore the greater value when positive and
    // the lesser one when negative.
    if a_lead.iter().chain(b_lead).any(|&x| x != extension) {
        let a_longer = a_length > b_length;
        return a_longer != negative;
    }

    // Otherwise the remaining equal-length, same-sign tails order numerically exactly as they
    // do under unsigned lexicographic comparison. Comparing the aligned tails (not `[1..]` of
    // each) is what makes unequal-length inputs correct.
    a_tail > b_tail
}
1524
/// Returns the longest non-empty prefix of `data` that is at most `length` bytes long
/// and ends on a UTF-8 character boundary, or `None` if no such prefix exists
/// (e.g. `length` is 0 or smaller than the width of the first character).
///
/// The caller guarantees that data.len() > length.
fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    let mut end = length;
    while end > 0 && !data.is_char_boundary(end) {
        end -= 1;
    }
    (end > 0).then(|| data.as_bytes()[..end].to_vec())
}
1534
1535/// Truncate a UTF-8 slice and increment it's final character. The returned value is the
1536/// longest such slice that is still a valid UTF-8 string while being less than `length`
1537/// bytes and non-empty. Returns `None` if no such transformation is possible.
1538///
1539/// The caller guarantees that data.len() > length.
1540fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1541    // UTF-8 is max 4 bytes, so start search 3 back from desired length
1542    let lower_bound = length.saturating_sub(3);
1543    let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
1544    increment_utf8(data.get(..split)?)
1545}
1546
/// Increments the last character of `data` that can be incremented without changing its
/// UTF-8 encoded width and without producing an invalid code point. Everything after
/// that character is dropped, so the result may be shorter than the input.
///
/// Returns `None` if no character in `data` can be incremented (including when `data`
/// is empty).
///
/// Note that this implementation will not promote an N-byte code point to (N+1) bytes.
fn increment_utf8(data: &str) -> Option<Vec<u8>> {
    data.char_indices().rev().find_map(|(start, ch)| {
        let width = ch.len_utf8();
        // `char::from_u32` rejects surrogates and values past U+10FFFF.
        let bumped = char::from_u32(ch as u32 + 1).filter(|next| next.len_utf8() == width)?;
        let mut out = data.as_bytes()[..start + width].to_vec();
        bumped.encode_utf8(&mut out[start..]);
        Some(out)
    })
}
1568
/// Treats `data` as a big-endian unsigned integer and adds one to it, carrying from the
/// last byte towards the first.
///
/// Returns `None` if every byte is `u8::MAX` (including when `data` is empty), because
/// the increment would overflow.
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    // The rightmost byte that can absorb the +1 without wrapping; every byte after it
    // is u8::MAX and wraps to zero.
    let pivot = data.iter().rposition(|&byte| byte != u8::MAX)?;
    data[pivot] += 1;
    data[pivot + 1..].fill(0);
    Some(data)
}
1584
1585#[cfg(test)]
1586mod tests {
1587    use crate::{
1588        file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1589        schema::parser::parse_message_type,
1590    };
1591    use core::str;
1592    use rand::distr::uniform::SampleUniform;
1593    use std::{fs::File, sync::Arc};
1594
1595    use crate::column::{
1596        page::PageReader,
1597        reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl},
1598    };
1599    use crate::file::writer::TrackedWrite;
1600    use crate::file::{
1601        properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1602    };
1603    use crate::schema::types::{ColumnPath, Type as SchemaType};
1604    use crate::util::test_common::rand_gen::random_numbers_range;
1605
1606    use super::*;
1607
1608    #[test]
1609    fn test_column_writer_inconsistent_def_rep_length() {
1610        let page_writer = get_test_page_writer();
1611        let props = Default::default();
1612        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
1613        let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
1614        assert!(res.is_err());
1615        if let Err(err) = res {
1616            assert_eq!(
1617                format!("{err}"),
1618                "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
1619            );
1620        }
1621    }
1622
1623    #[test]
1624    fn test_column_writer_invalid_def_levels() {
1625        let page_writer = get_test_page_writer();
1626        let props = Default::default();
1627        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1628        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1629        assert!(res.is_err());
1630        if let Err(err) = res {
1631            assert_eq!(
1632                format!("{err}"),
1633                "Parquet error: Definition levels are required, because max definition level = 1"
1634            );
1635        }
1636    }
1637
1638    #[test]
1639    fn test_column_writer_invalid_rep_levels() {
1640        let page_writer = get_test_page_writer();
1641        let props = Default::default();
1642        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
1643        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1644        assert!(res.is_err());
1645        if let Err(err) = res {
1646            assert_eq!(
1647                format!("{err}"),
1648                "Parquet error: Repetition levels are required, because max repetition level = 1"
1649            );
1650        }
1651    }
1652
1653    #[test]
1654    fn test_column_writer_not_enough_values_to_write() {
1655        let page_writer = get_test_page_writer();
1656        let props = Default::default();
1657        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1658        let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
1659        assert!(res.is_err());
1660        if let Err(err) = res {
1661            assert_eq!(
1662                format!("{err}"),
1663                "Parquet error: Expected to write 4 values, but have only 2"
1664            );
1665        }
1666    }
1667
1668    #[test]
1669    fn test_column_writer_write_only_one_dictionary_page() {
1670        let page_writer = get_test_page_writer();
1671        let props = Default::default();
1672        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1673        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1674        // First page should be correctly written.
1675        writer.add_data_page().unwrap();
1676        writer.write_dictionary_page().unwrap();
1677        let err = writer.write_dictionary_page().unwrap_err().to_string();
1678        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1679    }
1680
1681    #[test]
1682    fn test_column_writer_error_when_writing_disabled_dictionary() {
1683        let page_writer = get_test_page_writer();
1684        let props = Arc::new(
1685            WriterProperties::builder()
1686                .set_dictionary_enabled(false)
1687                .build(),
1688        );
1689        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1690        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1691        let err = writer.write_dictionary_page().unwrap_err().to_string();
1692        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1693    }
1694
1695    #[test]
1696    fn test_column_writer_boolean_type_does_not_support_dictionary() {
1697        let page_writer = get_test_page_writer();
1698        let props = Arc::new(
1699            WriterProperties::builder()
1700                .set_dictionary_enabled(true)
1701                .build(),
1702        );
1703        let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
1704        writer
1705            .write_batch(&[true, false, true, false], None, None)
1706            .unwrap();
1707
1708        let r = writer.close().unwrap();
1709        // PlainEncoder uses bit writer to write boolean values, which all fit into 1
1710        // byte.
1711        assert_eq!(r.bytes_written, 1);
1712        assert_eq!(r.rows_written, 4);
1713
1714        let metadata = r.metadata;
1715        assert_eq!(metadata.encodings(), &vec![Encoding::PLAIN, Encoding::RLE]);
1716        assert_eq!(metadata.num_values(), 4); // just values
1717        assert_eq!(metadata.dictionary_page_offset(), None);
1718    }
1719
    /// BOOLEAN columns never get a dictionary. The expectations are identical whether or
    /// not dictionary encoding is enabled: PLAIN `DATA_PAGE`s for PARQUET_1_0 and RLE
    /// `DATA_PAGE_V2`s for PARQUET_2_0, with no dictionary page offset.
    ///
    /// NOTE(review): `check_encoding_write_support` is defined outside this view. Its
    /// arguments appear to be (writer version, dictionary enabled, values, expected
    /// dictionary page offset, expected chunk encodings, expected page encoding stats).
    #[test]
    fn test_column_writer_default_encoding_support_bool() {
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[true, false],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[true, false],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[true, false],
            None,
            &[Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
        );
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[true, false],
            None,
            &[Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
        );
    }
1755
    /// INT32 default encodings.
    ///
    /// - Dictionary enabled (both writer versions): a PLAIN dictionary page followed by an
    ///   RLE_DICTIONARY data page.
    /// - Dictionary disabled: PARQUET_1_0 falls back to PLAIN, PARQUET_2_0 to
    ///   DELTA_BINARY_PACKED.
    #[test]
    fn test_column_writer_default_encoding_support_int32() {
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1, 2],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1, 2],
            None,
            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BINARY_PACKED,
                1,
            )],
        );
    }
1801
    /// INT64 default encodings.
    ///
    /// - Dictionary enabled (both writer versions): a PLAIN dictionary page followed by an
    ///   RLE_DICTIONARY data page.
    /// - Dictionary disabled: PARQUET_1_0 falls back to PLAIN, PARQUET_2_0 to
    ///   DELTA_BINARY_PACKED.
    #[test]
    fn test_column_writer_default_encoding_support_int64() {
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1, 2],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1, 2],
            None,
            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BINARY_PACKED,
                1,
            )],
        );
    }
1847
    /// INT96 default encodings.
    ///
    /// - Dictionary enabled (both writer versions): a PLAIN dictionary page followed by an
    ///   RLE_DICTIONARY data page.
    /// - Dictionary disabled: PLAIN data pages for both writer versions.
    #[test]
    fn test_column_writer_default_encoding_support_int96() {
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[Int96::from(vec![1, 2, 3])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[Int96::from(vec![1, 2, 3])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[Int96::from(vec![1, 2, 3])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[Int96::from(vec![1, 2, 3])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
1889
    /// FLOAT default encodings.
    ///
    /// - Dictionary enabled (both writer versions): a PLAIN dictionary page followed by an
    ///   RLE_DICTIONARY data page.
    /// - Dictionary disabled: PLAIN data pages for both writer versions.
    #[test]
    fn test_column_writer_default_encoding_support_float() {
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
1931
    /// DOUBLE default encodings.
    ///
    /// - Dictionary enabled (both writer versions): a PLAIN dictionary page followed by an
    ///   RLE_DICTIONARY data page.
    /// - Dictionary disabled: PLAIN data pages for both writer versions.
    #[test]
    fn test_column_writer_default_encoding_support_double() {
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
1973
    /// BYTE_ARRAY default encodings.
    ///
    /// - Dictionary enabled (both writer versions): a PLAIN dictionary page followed by an
    ///   RLE_DICTIONARY data page.
    /// - Dictionary disabled: PARQUET_1_0 falls back to PLAIN, PARQUET_2_0 to
    ///   DELTA_BYTE_ARRAY.
    #[test]
    fn test_column_writer_default_encoding_support_byte_array() {
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[ByteArray::from(vec![1u8])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[ByteArray::from(vec![1u8])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[ByteArray::from(vec![1u8])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[ByteArray::from(vec![1u8])],
            None,
            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BYTE_ARRAY,
                1,
            )],
        );
    }
2019
    /// FIXED_LEN_BYTE_ARRAY default encodings.
    ///
    /// - PARQUET_1_0 never dictionary-encodes this type, so both of its cases expect
    ///   PLAIN data pages.
    /// - PARQUET_2_0 with the dictionary enabled expects a PLAIN dictionary page followed
    ///   by an RLE_DICTIONARY data page.
    /// - PARQUET_2_0 with the dictionary disabled expects DELTA_BYTE_ARRAY.
    #[test]
    fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
        // Dictionary requested, but PARQUET_1_0 still writes no dictionary page.
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[ByteArray::from(vec![1u8]).into()],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BYTE_ARRAY,
                1,
            )],
        );
    }
2062
2063    #[test]
2064    fn test_column_writer_check_metadata() {
2065        let page_writer = get_test_page_writer();
2066        let props = Default::default();
2067        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2068        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2069
2070        let r = writer.close().unwrap();
2071        assert_eq!(r.bytes_written, 20);
2072        assert_eq!(r.rows_written, 4);
2073
2074        let metadata = r.metadata;
2075        assert_eq!(
2076            metadata.encodings(),
2077            &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2078        );
2079        assert_eq!(metadata.num_values(), 4);
2080        assert_eq!(metadata.compressed_size(), 20);
2081        assert_eq!(metadata.uncompressed_size(), 20);
2082        assert_eq!(metadata.data_page_offset(), 0);
2083        assert_eq!(metadata.dictionary_page_offset(), Some(0));
2084        if let Some(stats) = metadata.statistics() {
2085            assert_eq!(stats.null_count_opt(), Some(0));
2086            assert_eq!(stats.distinct_count_opt(), None);
2087            if let Statistics::Int32(stats) = stats {
2088                assert_eq!(stats.min_opt().unwrap(), &1);
2089                assert_eq!(stats.max_opt().unwrap(), &4);
2090            } else {
2091                panic!("expecting Statistics::Int32");
2092            }
2093        } else {
2094            panic!("metadata missing statistics");
2095        }
2096    }
2097
2098    #[test]
2099    fn test_column_writer_check_byte_array_min_max() {
2100        let page_writer = get_test_page_writer();
2101        let props = Default::default();
2102        let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
2103        writer
2104            .write_batch(
2105                &[
2106                    ByteArray::from(vec![
2107                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2108                        35u8, 231u8, 90u8, 0u8, 0u8,
2109                    ]),
2110                    ByteArray::from(vec![
2111                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
2112                        152u8, 177u8, 56u8, 0u8, 0u8,
2113                    ]),
2114                    ByteArray::from(vec![
2115                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
2116                        0u8,
2117                    ]),
2118                    ByteArray::from(vec![
2119                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2120                        44u8, 0u8, 0u8,
2121                    ]),
2122                ],
2123                None,
2124                None,
2125            )
2126            .unwrap();
2127        let metadata = writer.close().unwrap().metadata;
2128        if let Some(stats) = metadata.statistics() {
2129            if let Statistics::ByteArray(stats) = stats {
2130                assert_eq!(
2131                    stats.min_opt().unwrap(),
2132                    &ByteArray::from(vec![
2133                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2134                        35u8, 231u8, 90u8, 0u8, 0u8,
2135                    ])
2136                );
2137                assert_eq!(
2138                    stats.max_opt().unwrap(),
2139                    &ByteArray::from(vec![
2140                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2141                        44u8, 0u8, 0u8,
2142                    ])
2143                );
2144            } else {
2145                panic!("expecting Statistics::ByteArray");
2146            }
2147        } else {
2148            panic!("metadata missing statistics");
2149        }
2150    }
2151
2152    #[test]
2153    fn test_column_writer_uint32_converted_type_min_max() {
2154        let page_writer = get_test_page_writer();
2155        let props = Default::default();
2156        let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
2157            page_writer,
2158            0,
2159            0,
2160            props,
2161        );
2162        writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
2163        let metadata = writer.close().unwrap().metadata;
2164        if let Some(stats) = metadata.statistics() {
2165            if let Statistics::Int32(stats) = stats {
2166                assert_eq!(stats.min_opt().unwrap(), &0,);
2167                assert_eq!(stats.max_opt().unwrap(), &5,);
2168            } else {
2169                panic!("expecting Statistics::Int32");
2170            }
2171        } else {
2172            panic!("metadata missing statistics");
2173        }
2174    }
2175
2176    #[test]
2177    fn test_column_writer_precalculated_statistics() {
2178        let page_writer = get_test_page_writer();
2179        let props = Arc::new(
2180            WriterProperties::builder()
2181                .set_statistics_enabled(EnabledStatistics::Chunk)
2182                .build(),
2183        );
2184        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2185        writer
2186            .write_batch_with_statistics(
2187                &[1, 2, 3, 4],
2188                None,
2189                None,
2190                Some(&-17),
2191                Some(&9000),
2192                Some(55),
2193            )
2194            .unwrap();
2195
2196        let r = writer.close().unwrap();
2197        assert_eq!(r.bytes_written, 20);
2198        assert_eq!(r.rows_written, 4);
2199
2200        let metadata = r.metadata;
2201        assert_eq!(
2202            metadata.encodings(),
2203            &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2204        );
2205        assert_eq!(metadata.num_values(), 4);
2206        assert_eq!(metadata.compressed_size(), 20);
2207        assert_eq!(metadata.uncompressed_size(), 20);
2208        assert_eq!(metadata.data_page_offset(), 0);
2209        assert_eq!(metadata.dictionary_page_offset(), Some(0));
2210        if let Some(stats) = metadata.statistics() {
2211            assert_eq!(stats.null_count_opt(), Some(0));
2212            assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
2213            if let Statistics::Int32(stats) = stats {
2214                assert_eq!(stats.min_opt().unwrap(), &-17);
2215                assert_eq!(stats.max_opt().unwrap(), &9000);
2216            } else {
2217                panic!("expecting Statistics::Int32");
2218            }
2219        } else {
2220            panic!("metadata missing statistics");
2221        }
2222    }
2223
2224    #[test]
2225    fn test_mixed_precomputed_statistics() {
2226        let mut buf = Vec::with_capacity(100);
2227        let mut write = TrackedWrite::new(&mut buf);
2228        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2229        let props = Arc::new(
2230            WriterProperties::builder()
2231                .set_write_page_header_statistics(true)
2232                .build(),
2233        );
2234        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2235
2236        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2237        writer
2238            .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
2239            .unwrap();
2240
2241        let r = writer.close().unwrap();
2242
2243        let stats = r.metadata.statistics().unwrap();
2244        assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
2245        assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
2246        assert_eq!(stats.null_count_opt(), Some(0));
2247        assert!(stats.distinct_count_opt().is_none());
2248
2249        drop(write);
2250
2251        let props = ReaderProperties::builder()
2252            .set_backward_compatible_lz4(false)
2253            .build();
2254        let reader = SerializedPageReader::new_with_properties(
2255            Arc::new(Bytes::from(buf)),
2256            &r.metadata,
2257            r.rows_written as usize,
2258            None,
2259            Arc::new(props),
2260        )
2261        .unwrap();
2262
2263        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2264        assert_eq!(pages.len(), 2);
2265
2266        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2267        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);
2268
2269        let page_statistics = pages[1].statistics().unwrap();
2270        assert_eq!(
2271            page_statistics.min_bytes_opt().unwrap(),
2272            1_i32.to_le_bytes()
2273        );
2274        assert_eq!(
2275            page_statistics.max_bytes_opt().unwrap(),
2276            7_i32.to_le_bytes()
2277        );
2278        assert_eq!(page_statistics.null_count_opt(), Some(0));
2279        assert!(page_statistics.distinct_count_opt().is_none());
2280    }
2281
2282    #[test]
2283    fn test_disabled_statistics() {
2284        let mut buf = Vec::with_capacity(100);
2285        let mut write = TrackedWrite::new(&mut buf);
2286        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2287        let props = WriterProperties::builder()
2288            .set_statistics_enabled(EnabledStatistics::None)
2289            .set_writer_version(WriterVersion::PARQUET_2_0)
2290            .build();
2291        let props = Arc::new(props);
2292
2293        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2294        writer
2295            .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
2296            .unwrap();
2297
2298        let r = writer.close().unwrap();
2299        assert!(r.metadata.statistics().is_none());
2300
2301        drop(write);
2302
2303        let props = ReaderProperties::builder()
2304            .set_backward_compatible_lz4(false)
2305            .build();
2306        let reader = SerializedPageReader::new_with_properties(
2307            Arc::new(Bytes::from(buf)),
2308            &r.metadata,
2309            r.rows_written as usize,
2310            None,
2311            Arc::new(props),
2312        )
2313        .unwrap();
2314
2315        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2316        assert_eq!(pages.len(), 2);
2317
2318        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2319        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);
2320
2321        match &pages[1] {
2322            Page::DataPageV2 {
2323                num_values,
2324                num_nulls,
2325                num_rows,
2326                statistics,
2327                ..
2328            } => {
2329                assert_eq!(*num_values, 6);
2330                assert_eq!(*num_nulls, 2);
2331                assert_eq!(*num_rows, 6);
2332                assert!(statistics.is_none());
2333            }
2334            _ => unreachable!(),
2335        }
2336    }
2337
2338    #[test]
2339    fn test_column_writer_empty_column_roundtrip() {
2340        let props = Default::default();
2341        column_roundtrip::<Int32Type>(props, &[], None, None);
2342    }
2343
2344    #[test]
2345    fn test_column_writer_non_nullable_values_roundtrip() {
2346        let props = Default::default();
2347        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2348    }
2349
2350    #[test]
2351    fn test_column_writer_nullable_non_repeated_values_roundtrip() {
2352        let props = Default::default();
2353        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
2354    }
2355
2356    #[test]
2357    fn test_column_writer_nullable_repeated_values_roundtrip() {
2358        let props = Default::default();
2359        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2360    }
2361
2362    #[test]
2363    fn test_column_writer_dictionary_fallback_small_data_page() {
2364        let props = WriterProperties::builder()
2365            .set_dictionary_page_size_limit(32)
2366            .set_data_page_size_limit(32)
2367            .build();
2368        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2369    }
2370
2371    #[test]
2372    fn test_column_writer_small_write_batch_size() {
2373        for i in &[1usize, 2, 5, 10, 11, 1023] {
2374            let props = WriterProperties::builder().set_write_batch_size(*i).build();
2375
2376            column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2377        }
2378    }
2379
2380    #[test]
2381    fn test_column_writer_dictionary_disabled_v1() {
2382        let props = WriterProperties::builder()
2383            .set_writer_version(WriterVersion::PARQUET_1_0)
2384            .set_dictionary_enabled(false)
2385            .build();
2386        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2387    }
2388
2389    #[test]
2390    fn test_column_writer_dictionary_disabled_v2() {
2391        let props = WriterProperties::builder()
2392            .set_writer_version(WriterVersion::PARQUET_2_0)
2393            .set_dictionary_enabled(false)
2394            .build();
2395        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2396    }
2397
2398    #[test]
2399    fn test_column_writer_compression_v1() {
2400        let props = WriterProperties::builder()
2401            .set_writer_version(WriterVersion::PARQUET_1_0)
2402            .set_compression(Compression::SNAPPY)
2403            .build();
2404        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2405    }
2406
2407    #[test]
2408    fn test_column_writer_compression_v2() {
2409        let props = WriterProperties::builder()
2410            .set_writer_version(WriterVersion::PARQUET_2_0)
2411            .set_compression(Compression::SNAPPY)
2412            .build();
2413        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2414    }
2415
2416    #[test]
2417    fn test_column_writer_add_data_pages_with_dict() {
2418        // ARROW-5129: Test verifies that we add data page in case of dictionary encoding
2419        // and no fallback occurred so far.
2420        let mut file = tempfile::tempfile().unwrap();
2421        let mut write = TrackedWrite::new(&mut file);
2422        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2423        let props = Arc::new(
2424            WriterProperties::builder()
2425                .set_data_page_size_limit(10)
2426                .set_write_batch_size(3) // write 3 values at a time
2427                .build(),
2428        );
2429        let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
2430        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2431        writer.write_batch(data, None, None).unwrap();
2432        let r = writer.close().unwrap();
2433
2434        drop(write);
2435
2436        // Read pages and check the sequence
2437        let props = ReaderProperties::builder()
2438            .set_backward_compatible_lz4(false)
2439            .build();
2440        let mut page_reader = Box::new(
2441            SerializedPageReader::new_with_properties(
2442                Arc::new(file),
2443                &r.metadata,
2444                r.rows_written as usize,
2445                None,
2446                Arc::new(props),
2447            )
2448            .unwrap(),
2449        );
2450        let mut res = Vec::new();
2451        while let Some(page) = page_reader.get_next_page().unwrap() {
2452            res.push((page.page_type(), page.num_values(), page.buffer().len()));
2453        }
2454        assert_eq!(
2455            res,
2456            vec![
2457                (PageType::DICTIONARY_PAGE, 10, 40),
2458                (PageType::DATA_PAGE, 9, 10),
2459                (PageType::DATA_PAGE, 1, 3),
2460            ]
2461        );
2462        assert_eq!(
2463            r.metadata.page_encoding_stats(),
2464            Some(&vec![
2465                PageEncodingStats {
2466                    page_type: PageType::DICTIONARY_PAGE,
2467                    encoding: Encoding::PLAIN,
2468                    count: 1
2469                },
2470                PageEncodingStats {
2471                    page_type: PageType::DATA_PAGE,
2472                    encoding: Encoding::RLE_DICTIONARY,
2473                    count: 2,
2474                }
2475            ])
2476        );
2477    }
2478
2479    #[test]
2480    fn test_bool_statistics() {
2481        let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
2482        // Booleans have an unsigned sort order and so are not compatible
2483        // with the deprecated `min` and `max` statistics
2484        assert!(!stats.is_min_max_backwards_compatible());
2485        if let Statistics::Boolean(stats) = stats {
2486            assert_eq!(stats.min_opt().unwrap(), &false);
2487            assert_eq!(stats.max_opt().unwrap(), &true);
2488        } else {
2489            panic!("expecting Statistics::Boolean, got {stats:?}");
2490        }
2491    }
2492
2493    #[test]
2494    fn test_int32_statistics() {
2495        let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
2496        assert!(stats.is_min_max_backwards_compatible());
2497        if let Statistics::Int32(stats) = stats {
2498            assert_eq!(stats.min_opt().unwrap(), &-2);
2499            assert_eq!(stats.max_opt().unwrap(), &3);
2500        } else {
2501            panic!("expecting Statistics::Int32, got {stats:?}");
2502        }
2503    }
2504
2505    #[test]
2506    fn test_int64_statistics() {
2507        let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
2508        assert!(stats.is_min_max_backwards_compatible());
2509        if let Statistics::Int64(stats) = stats {
2510            assert_eq!(stats.min_opt().unwrap(), &-2);
2511            assert_eq!(stats.max_opt().unwrap(), &3);
2512        } else {
2513            panic!("expecting Statistics::Int64, got {stats:?}");
2514        }
2515    }
2516
2517    #[test]
2518    fn test_int96_statistics() {
2519        let input = vec![
2520            Int96::from(vec![1, 20, 30]),
2521            Int96::from(vec![3, 20, 10]),
2522            Int96::from(vec![0, 20, 30]),
2523            Int96::from(vec![2, 20, 30]),
2524        ]
2525        .into_iter()
2526        .collect::<Vec<Int96>>();
2527
2528        let stats = statistics_roundtrip::<Int96Type>(&input);
2529        assert!(!stats.is_min_max_backwards_compatible());
2530        if let Statistics::Int96(stats) = stats {
2531            assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
2532            assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![2, 20, 30]));
2533        } else {
2534            panic!("expecting Statistics::Int96, got {stats:?}");
2535        }
2536    }
2537
2538    #[test]
2539    fn test_float_statistics() {
2540        let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
2541        assert!(stats.is_min_max_backwards_compatible());
2542        if let Statistics::Float(stats) = stats {
2543            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2544            assert_eq!(stats.max_opt().unwrap(), &3.0);
2545        } else {
2546            panic!("expecting Statistics::Float, got {stats:?}");
2547        }
2548    }
2549
2550    #[test]
2551    fn test_double_statistics() {
2552        let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
2553        assert!(stats.is_min_max_backwards_compatible());
2554        if let Statistics::Double(stats) = stats {
2555            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2556            assert_eq!(stats.max_opt().unwrap(), &3.0);
2557        } else {
2558            panic!("expecting Statistics::Double, got {stats:?}");
2559        }
2560    }
2561
2562    #[test]
2563    fn test_byte_array_statistics() {
2564        let input = ["aawaa", "zz", "aaw", "m", "qrs"]
2565            .iter()
2566            .map(|&s| s.into())
2567            .collect::<Vec<_>>();
2568
2569        let stats = statistics_roundtrip::<ByteArrayType>(&input);
2570        assert!(!stats.is_min_max_backwards_compatible());
2571        if let Statistics::ByteArray(stats) = stats {
2572            assert_eq!(stats.min_opt().unwrap(), &ByteArray::from("aaw"));
2573            assert_eq!(stats.max_opt().unwrap(), &ByteArray::from("zz"));
2574        } else {
2575            panic!("expecting Statistics::ByteArray, got {stats:?}");
2576        }
2577    }
2578
2579    #[test]
2580    fn test_fixed_len_byte_array_statistics() {
2581        let input = ["aawaa", "zz   ", "aaw  ", "m    ", "qrs  "]
2582            .iter()
2583            .map(|&s| ByteArray::from(s).into())
2584            .collect::<Vec<_>>();
2585
2586        let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
2587        assert!(!stats.is_min_max_backwards_compatible());
2588        if let Statistics::FixedLenByteArray(stats) = stats {
2589            let expected_min: FixedLenByteArray = ByteArray::from("aaw  ").into();
2590            assert_eq!(stats.min_opt().unwrap(), &expected_min);
2591            let expected_max: FixedLenByteArray = ByteArray::from("zz   ").into();
2592            assert_eq!(stats.max_opt().unwrap(), &expected_max);
2593        } else {
2594            panic!("expecting Statistics::FixedLenByteArray, got {stats:?}");
2595        }
2596    }
2597
2598    #[test]
2599    fn test_column_writer_check_float16_min_max() {
2600        let input = [
2601            -f16::ONE,
2602            f16::from_f32(3.0),
2603            -f16::from_f32(2.0),
2604            f16::from_f32(2.0),
2605        ]
2606        .into_iter()
2607        .map(|s| ByteArray::from(s).into())
2608        .collect::<Vec<_>>();
2609
2610        let stats = float16_statistics_roundtrip(&input);
2611        assert!(stats.is_min_max_backwards_compatible());
2612        assert_eq!(
2613            stats.min_opt().unwrap(),
2614            &ByteArray::from(-f16::from_f32(2.0))
2615        );
2616        assert_eq!(
2617            stats.max_opt().unwrap(),
2618            &ByteArray::from(f16::from_f32(3.0))
2619        );
2620    }
2621
2622    #[test]
2623    fn test_column_writer_check_float16_nan_middle() {
2624        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2625            .into_iter()
2626            .map(|s| ByteArray::from(s).into())
2627            .collect::<Vec<_>>();
2628
2629        let stats = float16_statistics_roundtrip(&input);
2630        assert!(stats.is_min_max_backwards_compatible());
2631        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2632        assert_eq!(
2633            stats.max_opt().unwrap(),
2634            &ByteArray::from(f16::ONE + f16::ONE)
2635        );
2636    }
2637
2638    #[test]
2639    fn test_float16_statistics_nan_middle() {
2640        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2641            .into_iter()
2642            .map(|s| ByteArray::from(s).into())
2643            .collect::<Vec<_>>();
2644
2645        let stats = float16_statistics_roundtrip(&input);
2646        assert!(stats.is_min_max_backwards_compatible());
2647        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2648        assert_eq!(
2649            stats.max_opt().unwrap(),
2650            &ByteArray::from(f16::ONE + f16::ONE)
2651        );
2652    }
2653
2654    #[test]
2655    fn test_float16_statistics_nan_start() {
2656        let input = [f16::NAN, f16::ONE, f16::ONE + f16::ONE]
2657            .into_iter()
2658            .map(|s| ByteArray::from(s).into())
2659            .collect::<Vec<_>>();
2660
2661        let stats = float16_statistics_roundtrip(&input);
2662        assert!(stats.is_min_max_backwards_compatible());
2663        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2664        assert_eq!(
2665            stats.max_opt().unwrap(),
2666            &ByteArray::from(f16::ONE + f16::ONE)
2667        );
2668    }
2669
2670    #[test]
2671    fn test_float16_statistics_nan_only() {
2672        let input = [f16::NAN, f16::NAN]
2673            .into_iter()
2674            .map(|s| ByteArray::from(s).into())
2675            .collect::<Vec<_>>();
2676
2677        let stats = float16_statistics_roundtrip(&input);
2678        assert!(stats.min_bytes_opt().is_none());
2679        assert!(stats.max_bytes_opt().is_none());
2680        assert!(stats.is_min_max_backwards_compatible());
2681    }
2682
2683    #[test]
2684    fn test_float16_statistics_zero_only() {
2685        let input = [f16::ZERO]
2686            .into_iter()
2687            .map(|s| ByteArray::from(s).into())
2688            .collect::<Vec<_>>();
2689
2690        let stats = float16_statistics_roundtrip(&input);
2691        assert!(stats.is_min_max_backwards_compatible());
2692        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2693        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2694    }
2695
2696    #[test]
2697    fn test_float16_statistics_neg_zero_only() {
2698        let input = [f16::NEG_ZERO]
2699            .into_iter()
2700            .map(|s| ByteArray::from(s).into())
2701            .collect::<Vec<_>>();
2702
2703        let stats = float16_statistics_roundtrip(&input);
2704        assert!(stats.is_min_max_backwards_compatible());
2705        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2706        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2707    }
2708
2709    #[test]
2710    fn test_float16_statistics_zero_min() {
2711        let input = [f16::ZERO, f16::ONE, f16::NAN, f16::PI]
2712            .into_iter()
2713            .map(|s| ByteArray::from(s).into())
2714            .collect::<Vec<_>>();
2715
2716        let stats = float16_statistics_roundtrip(&input);
2717        assert!(stats.is_min_max_backwards_compatible());
2718        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2719        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI));
2720    }
2721
2722    #[test]
2723    fn test_float16_statistics_neg_zero_max() {
2724        let input = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI]
2725            .into_iter()
2726            .map(|s| ByteArray::from(s).into())
2727            .collect::<Vec<_>>();
2728
2729        let stats = float16_statistics_roundtrip(&input);
2730        assert!(stats.is_min_max_backwards_compatible());
2731        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI));
2732        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2733    }
2734
2735    #[test]
2736    fn test_float_statistics_nan_middle() {
2737        let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
2738        assert!(stats.is_min_max_backwards_compatible());
2739        if let Statistics::Float(stats) = stats {
2740            assert_eq!(stats.min_opt().unwrap(), &1.0);
2741            assert_eq!(stats.max_opt().unwrap(), &2.0);
2742        } else {
2743            panic!("expecting Statistics::Float");
2744        }
2745    }
2746
2747    #[test]
2748    fn test_float_statistics_nan_start() {
2749        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
2750        assert!(stats.is_min_max_backwards_compatible());
2751        if let Statistics::Float(stats) = stats {
2752            assert_eq!(stats.min_opt().unwrap(), &1.0);
2753            assert_eq!(stats.max_opt().unwrap(), &2.0);
2754        } else {
2755            panic!("expecting Statistics::Float");
2756        }
2757    }
2758
2759    #[test]
2760    fn test_float_statistics_nan_only() {
2761        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]);
2762        assert!(stats.min_bytes_opt().is_none());
2763        assert!(stats.max_bytes_opt().is_none());
2764        assert!(stats.is_min_max_backwards_compatible());
2765        assert!(matches!(stats, Statistics::Float(_)));
2766    }
2767
2768    #[test]
2769    fn test_float_statistics_zero_only() {
2770        let stats = statistics_roundtrip::<FloatType>(&[0.0]);
2771        assert!(stats.is_min_max_backwards_compatible());
2772        if let Statistics::Float(stats) = stats {
2773            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2774            assert!(stats.min_opt().unwrap().is_sign_negative());
2775            assert_eq!(stats.max_opt().unwrap(), &0.0);
2776            assert!(stats.max_opt().unwrap().is_sign_positive());
2777        } else {
2778            panic!("expecting Statistics::Float");
2779        }
2780    }
2781
2782    #[test]
2783    fn test_float_statistics_neg_zero_only() {
2784        let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
2785        assert!(stats.is_min_max_backwards_compatible());
2786        if let Statistics::Float(stats) = stats {
2787            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2788            assert!(stats.min_opt().unwrap().is_sign_negative());
2789            assert_eq!(stats.max_opt().unwrap(), &0.0);
2790            assert!(stats.max_opt().unwrap().is_sign_positive());
2791        } else {
2792            panic!("expecting Statistics::Float");
2793        }
2794    }
2795
2796    #[test]
2797    fn test_float_statistics_zero_min() {
2798        let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
2799        assert!(stats.is_min_max_backwards_compatible());
2800        if let Statistics::Float(stats) = stats {
2801            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2802            assert!(stats.min_opt().unwrap().is_sign_negative());
2803            assert_eq!(stats.max_opt().unwrap(), &2.0);
2804        } else {
2805            panic!("expecting Statistics::Float");
2806        }
2807    }
2808
2809    #[test]
2810    fn test_float_statistics_neg_zero_max() {
2811        let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
2812        assert!(stats.is_min_max_backwards_compatible());
2813        if let Statistics::Float(stats) = stats {
2814            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2815            assert_eq!(stats.max_opt().unwrap(), &0.0);
2816            assert!(stats.max_opt().unwrap().is_sign_positive());
2817        } else {
2818            panic!("expecting Statistics::Float");
2819        }
2820    }
2821
2822    #[test]
2823    fn test_double_statistics_nan_middle() {
2824        let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
2825        assert!(stats.is_min_max_backwards_compatible());
2826        if let Statistics::Double(stats) = stats {
2827            assert_eq!(stats.min_opt().unwrap(), &1.0);
2828            assert_eq!(stats.max_opt().unwrap(), &2.0);
2829        } else {
2830            panic!("expecting Statistics::Double");
2831        }
2832    }
2833
2834    #[test]
2835    fn test_double_statistics_nan_start() {
2836        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
2837        assert!(stats.is_min_max_backwards_compatible());
2838        if let Statistics::Double(stats) = stats {
2839            assert_eq!(stats.min_opt().unwrap(), &1.0);
2840            assert_eq!(stats.max_opt().unwrap(), &2.0);
2841        } else {
2842            panic!("expecting Statistics::Double");
2843        }
2844    }
2845
2846    #[test]
2847    fn test_double_statistics_nan_only() {
2848        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]);
2849        assert!(stats.min_bytes_opt().is_none());
2850        assert!(stats.max_bytes_opt().is_none());
2851        assert!(matches!(stats, Statistics::Double(_)));
2852        assert!(stats.is_min_max_backwards_compatible());
2853    }
2854
2855    #[test]
2856    fn test_double_statistics_zero_only() {
2857        let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
2858        assert!(stats.is_min_max_backwards_compatible());
2859        if let Statistics::Double(stats) = stats {
2860            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2861            assert!(stats.min_opt().unwrap().is_sign_negative());
2862            assert_eq!(stats.max_opt().unwrap(), &0.0);
2863            assert!(stats.max_opt().unwrap().is_sign_positive());
2864        } else {
2865            panic!("expecting Statistics::Double");
2866        }
2867    }
2868
2869    #[test]
2870    fn test_double_statistics_neg_zero_only() {
2871        let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
2872        assert!(stats.is_min_max_backwards_compatible());
2873        if let Statistics::Double(stats) = stats {
2874            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2875            assert!(stats.min_opt().unwrap().is_sign_negative());
2876            assert_eq!(stats.max_opt().unwrap(), &0.0);
2877            assert!(stats.max_opt().unwrap().is_sign_positive());
2878        } else {
2879            panic!("expecting Statistics::Double");
2880        }
2881    }
2882
2883    #[test]
2884    fn test_double_statistics_zero_min() {
2885        let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
2886        assert!(stats.is_min_max_backwards_compatible());
2887        if let Statistics::Double(stats) = stats {
2888            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2889            assert!(stats.min_opt().unwrap().is_sign_negative());
2890            assert_eq!(stats.max_opt().unwrap(), &2.0);
2891        } else {
2892            panic!("expecting Statistics::Double");
2893        }
2894    }
2895
2896    #[test]
2897    fn test_double_statistics_neg_zero_max() {
2898        let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
2899        assert!(stats.is_min_max_backwards_compatible());
2900        if let Statistics::Double(stats) = stats {
2901            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2902            assert_eq!(stats.max_opt().unwrap(), &0.0);
2903            assert!(stats.max_opt().unwrap().is_sign_positive());
2904        } else {
2905            panic!("expecting Statistics::Double");
2906        }
2907    }
2908
2909    #[test]
2910    fn test_compare_greater_byte_array_decimals() {
2911        assert!(!compare_greater_byte_array_decimals(&[], &[],),);
2912        assert!(compare_greater_byte_array_decimals(&[1u8,], &[],),);
2913        assert!(!compare_greater_byte_array_decimals(&[], &[1u8,],),);
2914        assert!(compare_greater_byte_array_decimals(&[1u8,], &[0u8,],),);
2915        assert!(!compare_greater_byte_array_decimals(&[1u8,], &[1u8,],),);
2916        assert!(compare_greater_byte_array_decimals(&[1u8, 0u8,], &[0u8,],),);
2917        assert!(!compare_greater_byte_array_decimals(
2918            &[0u8, 1u8,],
2919            &[1u8, 0u8,],
2920        ),);
2921        assert!(!compare_greater_byte_array_decimals(
2922            &[255u8, 35u8, 0u8, 0u8,],
2923            &[0u8,],
2924        ),);
2925        assert!(compare_greater_byte_array_decimals(
2926            &[0u8,],
2927            &[255u8, 35u8, 0u8, 0u8,],
2928        ),);
2929    }
2930
2931    #[test]
2932    fn test_column_index_with_null_pages() {
2933        // write a single page of all nulls
2934        let page_writer = get_test_page_writer();
2935        let props = Default::default();
2936        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2937        writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();
2938
2939        let r = writer.close().unwrap();
2940        assert!(r.column_index.is_some());
2941        let col_idx = r.column_index.unwrap();
2942        // null_pages should be true for page 0
2943        assert!(col_idx.null_pages[0]);
2944        // min and max should be empty byte arrays
2945        assert_eq!(col_idx.min_values[0].len(), 0);
2946        assert_eq!(col_idx.max_values[0].len(), 0);
2947        // null_counts should be defined and be 4 for page 0
2948        assert!(col_idx.null_counts.is_some());
2949        assert_eq!(col_idx.null_counts.as_ref().unwrap()[0], 4);
2950        // there is no repetition so rep histogram should be absent
2951        assert!(col_idx.repetition_level_histograms.is_none());
2952        // definition_level_histogram should be present and should be 0:4, 1:0
2953        assert!(col_idx.definition_level_histograms.is_some());
2954        assert_eq!(col_idx.definition_level_histograms.unwrap(), &[4, 0]);
2955    }
2956
2957    #[test]
2958    fn test_column_offset_index_metadata() {
2959        // write data
2960        // and check the offset index and column index
2961        let page_writer = get_test_page_writer();
2962        let props = Default::default();
2963        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2964        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2965        // first page
2966        writer.flush_data_pages().unwrap();
2967        // second page
2968        writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
2969
2970        let r = writer.close().unwrap();
2971        let column_index = r.column_index.unwrap();
2972        let offset_index = r.offset_index.unwrap();
2973
2974        assert_eq!(8, r.rows_written);
2975
2976        // column index
2977        assert_eq!(2, column_index.null_pages.len());
2978        assert_eq!(2, offset_index.page_locations.len());
2979        assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
2980        for idx in 0..2 {
2981            assert!(!column_index.null_pages[idx]);
2982            assert_eq!(0, column_index.null_counts.as_ref().unwrap()[idx]);
2983        }
2984
2985        if let Some(stats) = r.metadata.statistics() {
2986            assert_eq!(stats.null_count_opt(), Some(0));
2987            assert_eq!(stats.distinct_count_opt(), None);
2988            if let Statistics::Int32(stats) = stats {
2989                // first page is [1,2,3,4]
2990                // second page is [-5,2,4,8]
2991                // note that we don't increment here, as this is a non BinaryArray type.
2992                assert_eq!(
2993                    stats.min_bytes_opt(),
2994                    Some(column_index.min_values[1].as_slice())
2995                );
2996                assert_eq!(
2997                    stats.max_bytes_opt(),
2998                    column_index.max_values.get(1).map(Vec::as_slice)
2999                );
3000            } else {
3001                panic!("expecting Statistics::Int32");
3002            }
3003        } else {
3004            panic!("metadata missing statistics");
3005        }
3006
3007        // page location
3008        assert_eq!(0, offset_index.page_locations[0].first_row_index);
3009        assert_eq!(4, offset_index.page_locations[1].first_row_index);
3010    }
3011
3012    /// Verify min/max value truncation in the column index works as expected
3013    #[test]
3014    fn test_column_offset_index_metadata_truncating() {
3015        // write data
3016        // and check the offset index and column index
3017        let page_writer = get_test_page_writer();
3018        let props = WriterProperties::builder()
3019            .set_statistics_truncate_length(None) // disable column index truncation
3020            .build()
3021            .into();
3022        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3023
3024        let mut data = vec![FixedLenByteArray::default(); 3];
3025        // This is the expected min value - "aaa..."
3026        data[0].set_data(Bytes::from(vec![97_u8; 200]));
3027        // This is the expected max value - "ZZZ..."
3028        data[1].set_data(Bytes::from(vec![112_u8; 200]));
3029        data[2].set_data(Bytes::from(vec![98_u8; 200]));
3030
3031        writer.write_batch(&data, None, None).unwrap();
3032
3033        writer.flush_data_pages().unwrap();
3034
3035        let r = writer.close().unwrap();
3036        let column_index = r.column_index.unwrap();
3037        let offset_index = r.offset_index.unwrap();
3038
3039        assert_eq!(3, r.rows_written);
3040
3041        // column index
3042        assert_eq!(1, column_index.null_pages.len());
3043        assert_eq!(1, offset_index.page_locations.len());
3044        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3045        assert!(!column_index.null_pages[0]);
3046        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
3047
3048        if let Some(stats) = r.metadata.statistics() {
3049            assert_eq!(stats.null_count_opt(), Some(0));
3050            assert_eq!(stats.distinct_count_opt(), None);
3051            if let Statistics::FixedLenByteArray(stats) = stats {
3052                let column_index_min_value = &column_index.min_values[0];
3053                let column_index_max_value = &column_index.max_values[0];
3054
3055                // Column index stats are truncated, while the column chunk's aren't.
3056                assert_ne!(
3057                    stats.min_bytes_opt(),
3058                    Some(column_index_min_value.as_slice())
3059                );
3060                assert_ne!(
3061                    stats.max_bytes_opt(),
3062                    Some(column_index_max_value.as_slice())
3063                );
3064
3065                assert_eq!(
3066                    column_index_min_value.len(),
3067                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
3068                );
3069                assert_eq!(column_index_min_value.as_slice(), &[97_u8; 64]);
3070                assert_eq!(
3071                    column_index_max_value.len(),
3072                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
3073                );
3074
3075                // We expect the last byte to be incremented
3076                assert_eq!(
3077                    *column_index_max_value.last().unwrap(),
3078                    *column_index_max_value.first().unwrap() + 1
3079                );
3080            } else {
3081                panic!("expecting Statistics::FixedLenByteArray");
3082            }
3083        } else {
3084            panic!("metadata missing statistics");
3085        }
3086    }
3087
3088    #[test]
3089    fn test_column_offset_index_truncating_spec_example() {
3090        // write data
3091        // and check the offset index and column index
3092        let page_writer = get_test_page_writer();
3093
3094        // Truncate values at 1 byte
3095        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3096        let props = Arc::new(builder.build());
3097        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3098
3099        let mut data = vec![FixedLenByteArray::default(); 1];
3100        // This is the expected min value
3101        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3102
3103        writer.write_batch(&data, None, None).unwrap();
3104
3105        writer.flush_data_pages().unwrap();
3106
3107        let r = writer.close().unwrap();
3108        let column_index = r.column_index.unwrap();
3109        let offset_index = r.offset_index.unwrap();
3110
3111        assert_eq!(1, r.rows_written);
3112
3113        // column index
3114        assert_eq!(1, column_index.null_pages.len());
3115        assert_eq!(1, offset_index.page_locations.len());
3116        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3117        assert!(!column_index.null_pages[0]);
3118        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
3119
3120        if let Some(stats) = r.metadata.statistics() {
3121            assert_eq!(stats.null_count_opt(), Some(0));
3122            assert_eq!(stats.distinct_count_opt(), None);
3123            if let Statistics::FixedLenByteArray(_stats) = stats {
3124                let column_index_min_value = &column_index.min_values[0];
3125                let column_index_max_value = &column_index.max_values[0];
3126
3127                assert_eq!(column_index_min_value.len(), 1);
3128                assert_eq!(column_index_max_value.len(), 1);
3129
3130                assert_eq!("B".as_bytes(), column_index_min_value.as_slice());
3131                assert_eq!("C".as_bytes(), column_index_max_value.as_slice());
3132
3133                assert_ne!(column_index_min_value, stats.min_bytes_opt().unwrap());
3134                assert_ne!(column_index_max_value, stats.max_bytes_opt().unwrap());
3135            } else {
3136                panic!("expecting Statistics::FixedLenByteArray");
3137            }
3138        } else {
3139            panic!("metadata missing statistics");
3140        }
3141    }
3142
3143    #[test]
3144    fn test_float16_min_max_no_truncation() {
3145        // Even if we set truncation to occur at 1 byte, we should not truncate for Float16
3146        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3147        let props = Arc::new(builder.build());
3148        let page_writer = get_test_page_writer();
3149        let mut writer = get_test_float16_column_writer(page_writer, props);
3150
3151        let expected_value = f16::PI.to_le_bytes().to_vec();
3152        let data = vec![ByteArray::from(expected_value.clone()).into()];
3153        writer.write_batch(&data, None, None).unwrap();
3154        writer.flush_data_pages().unwrap();
3155
3156        let r = writer.close().unwrap();
3157
3158        // stats should still be written
3159        // ensure bytes weren't truncated for column index
3160        let column_index = r.column_index.unwrap();
3161        let column_index_min_bytes = column_index.min_values[0].as_slice();
3162        let column_index_max_bytes = column_index.max_values[0].as_slice();
3163        assert_eq!(expected_value, column_index_min_bytes);
3164        assert_eq!(expected_value, column_index_max_bytes);
3165
3166        // ensure bytes weren't truncated for statistics
3167        let stats = r.metadata.statistics().unwrap();
3168        if let Statistics::FixedLenByteArray(stats) = stats {
3169            let stats_min_bytes = stats.min_bytes_opt().unwrap();
3170            let stats_max_bytes = stats.max_bytes_opt().unwrap();
3171            assert_eq!(expected_value, stats_min_bytes);
3172            assert_eq!(expected_value, stats_max_bytes);
3173        } else {
3174            panic!("expecting Statistics::FixedLenByteArray");
3175        }
3176    }
3177
3178    #[test]
3179    fn test_decimal_min_max_no_truncation() {
3180        // Even if we set truncation to occur at 1 byte, we should not truncate for Decimal
3181        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3182        let props = Arc::new(builder.build());
3183        let page_writer = get_test_page_writer();
3184        let mut writer =
3185            get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3186
3187        let expected_value = vec![
3188            255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
3189            231u8, 90u8, 0u8, 0u8,
3190        ];
3191        let data = vec![ByteArray::from(expected_value.clone()).into()];
3192        writer.write_batch(&data, None, None).unwrap();
3193        writer.flush_data_pages().unwrap();
3194
3195        let r = writer.close().unwrap();
3196
3197        // stats should still be written
3198        // ensure bytes weren't truncated for column index
3199        let column_index = r.column_index.unwrap();
3200        let column_index_min_bytes = column_index.min_values[0].as_slice();
3201        let column_index_max_bytes = column_index.max_values[0].as_slice();
3202        assert_eq!(expected_value, column_index_min_bytes);
3203        assert_eq!(expected_value, column_index_max_bytes);
3204
3205        // ensure bytes weren't truncated for statistics
3206        let stats = r.metadata.statistics().unwrap();
3207        if let Statistics::FixedLenByteArray(stats) = stats {
3208            let stats_min_bytes = stats.min_bytes_opt().unwrap();
3209            let stats_max_bytes = stats.max_bytes_opt().unwrap();
3210            assert_eq!(expected_value, stats_min_bytes);
3211            assert_eq!(expected_value, stats_max_bytes);
3212        } else {
3213            panic!("expecting Statistics::FixedLenByteArray");
3214        }
3215    }
3216
3217    #[test]
3218    fn test_statistics_truncating_byte_array_default() {
3219        let page_writer = get_test_page_writer();
3220
3221        // The default truncate length is 64 bytes
3222        let props = WriterProperties::builder().build().into();
3223        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3224
3225        let mut data = vec![ByteArray::default(); 1];
3226        data[0].set_data(Bytes::from(String::from(
3227            "This string is longer than 64 bytes, so it will almost certainly be truncated.",
3228        )));
3229        writer.write_batch(&data, None, None).unwrap();
3230        writer.flush_data_pages().unwrap();
3231
3232        let r = writer.close().unwrap();
3233
3234        assert_eq!(1, r.rows_written);
3235
3236        let stats = r.metadata.statistics().expect("statistics");
3237        if let Statistics::ByteArray(_stats) = stats {
3238            let min_value = _stats.min_opt().unwrap();
3239            let max_value = _stats.max_opt().unwrap();
3240
3241            assert!(!_stats.min_is_exact());
3242            assert!(!_stats.max_is_exact());
3243
3244            let expected_len = 64;
3245            assert_eq!(min_value.len(), expected_len);
3246            assert_eq!(max_value.len(), expected_len);
3247
3248            let expected_min =
3249                "This string is longer than 64 bytes, so it will almost certainly".as_bytes();
3250            assert_eq!(expected_min, min_value.as_bytes());
3251            // note the max value is different from the min value: the last byte is incremented
3252            let expected_max =
3253                "This string is longer than 64 bytes, so it will almost certainlz".as_bytes();
3254            assert_eq!(expected_max, max_value.as_bytes());
3255        } else {
3256            panic!("expecting Statistics::ByteArray");
3257        }
3258    }
3259
3260    #[test]
3261    fn test_statistics_truncating_byte_array() {
3262        let page_writer = get_test_page_writer();
3263
3264        const TEST_TRUNCATE_LENGTH: usize = 1;
3265
3266        // Truncate values at 1 byte
3267        let builder =
3268            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3269        let props = Arc::new(builder.build());
3270        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3271
3272        let mut data = vec![ByteArray::default(); 1];
3273        // This is the expected min value
3274        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3275
3276        writer.write_batch(&data, None, None).unwrap();
3277
3278        writer.flush_data_pages().unwrap();
3279
3280        let r = writer.close().unwrap();
3281
3282        assert_eq!(1, r.rows_written);
3283
3284        let stats = r.metadata.statistics().expect("statistics");
3285        assert_eq!(stats.null_count_opt(), Some(0));
3286        assert_eq!(stats.distinct_count_opt(), None);
3287        if let Statistics::ByteArray(_stats) = stats {
3288            let min_value = _stats.min_opt().unwrap();
3289            let max_value = _stats.max_opt().unwrap();
3290
3291            assert!(!_stats.min_is_exact());
3292            assert!(!_stats.max_is_exact());
3293
3294            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3295            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3296
3297            assert_eq!("B".as_bytes(), min_value.as_bytes());
3298            assert_eq!("C".as_bytes(), max_value.as_bytes());
3299        } else {
3300            panic!("expecting Statistics::ByteArray");
3301        }
3302    }
3303
3304    #[test]
3305    fn test_statistics_truncating_fixed_len_byte_array() {
3306        let page_writer = get_test_page_writer();
3307
3308        const TEST_TRUNCATE_LENGTH: usize = 1;
3309
3310        // Truncate values at 1 byte
3311        let builder =
3312            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3313        let props = Arc::new(builder.build());
3314        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3315
3316        let mut data = vec![FixedLenByteArray::default(); 1];
3317
3318        const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
3319        const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();
3320
3321        const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]]; // parquet specifies big-endian order for decimals
3322        const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
3323            [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];
3324
3325        // This is the expected min value
3326        data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));
3327
3328        writer.write_batch(&data, None, None).unwrap();
3329
3330        writer.flush_data_pages().unwrap();
3331
3332        let r = writer.close().unwrap();
3333
3334        assert_eq!(1, r.rows_written);
3335
3336        let stats = r.metadata.statistics().expect("statistics");
3337        assert_eq!(stats.null_count_opt(), Some(0));
3338        assert_eq!(stats.distinct_count_opt(), None);
3339        if let Statistics::FixedLenByteArray(_stats) = stats {
3340            let min_value = _stats.min_opt().unwrap();
3341            let max_value = _stats.max_opt().unwrap();
3342
3343            assert!(!_stats.min_is_exact());
3344            assert!(!_stats.max_is_exact());
3345
3346            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3347            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3348
3349            assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
3350            assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());
3351
3352            let reconstructed_min = i128::from_be_bytes([
3353                min_value.as_bytes()[0],
3354                0,
3355                0,
3356                0,
3357                0,
3358                0,
3359                0,
3360                0,
3361                0,
3362                0,
3363                0,
3364                0,
3365                0,
3366                0,
3367                0,
3368                0,
3369            ]);
3370
3371            let reconstructed_max = i128::from_be_bytes([
3372                max_value.as_bytes()[0],
3373                0,
3374                0,
3375                0,
3376                0,
3377                0,
3378                0,
3379                0,
3380                0,
3381                0,
3382                0,
3383                0,
3384                0,
3385                0,
3386                0,
3387                0,
3388            ]);
3389
3390            // check that the inner value is correctly bounded by the min/max
3391            println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
3392            assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
3393            println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
3394            assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
3395        } else {
3396            panic!("expecting Statistics::FixedLenByteArray");
3397        }
3398    }
3399
3400    #[test]
3401    fn test_send() {
3402        fn test<T: Send>() {}
3403        test::<ColumnWriterImpl<Int32Type>>();
3404    }
3405
3406    #[test]
3407    fn test_increment() {
3408        let v = increment(vec![0, 0, 0]).unwrap();
3409        assert_eq!(&v, &[0, 0, 1]);
3410
3411        // Handle overflow
3412        let v = increment(vec![0, 255, 255]).unwrap();
3413        assert_eq!(&v, &[1, 0, 0]);
3414
3415        // Return `None` if all bytes are u8::MAX
3416        let v = increment(vec![255, 255, 255]);
3417        assert!(v.is_none());
3418    }
3419
3420    #[test]
3421    fn test_increment_utf8() {
3422        let test_inc = |o: &str, expected: &str| {
3423            if let Ok(v) = String::from_utf8(increment_utf8(o).unwrap()) {
3424                // Got the expected result...
3425                assert_eq!(v, expected);
3426                // and it's greater than the original string
3427                assert!(*v > *o);
3428                // Also show that BinaryArray level comparison works here
3429                let mut greater = ByteArray::new();
3430                greater.set_data(Bytes::from(v));
3431                let mut original = ByteArray::new();
3432                original.set_data(Bytes::from(o.as_bytes().to_vec()));
3433                assert!(greater > original);
3434            } else {
3435                panic!("Expected incremented UTF8 string to also be valid.");
3436            }
3437        };
3438
3439        // Basic ASCII case
3440        test_inc("hello", "hellp");
3441
3442        // 1-byte ending in max 1-byte
3443        test_inc("a\u{7f}", "b");
3444
3445        // 1-byte max should not truncate as it would need 2-byte code points
3446        assert!(increment_utf8("\u{7f}\u{7f}").is_none());
3447
3448        // UTF8 string
3449        test_inc("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝");
3450
3451        // 2-byte without overflow
3452        test_inc("éééé", "éééê");
3453
3454        // 2-byte that overflows lowest byte
3455        test_inc("\u{ff}\u{ff}", "\u{ff}\u{100}");
3456
3457        // 2-byte ending in max 2-byte
3458        test_inc("a\u{7ff}", "b");
3459
3460        // Max 2-byte should not truncate as it would need 3-byte code points
3461        assert!(increment_utf8("\u{7ff}\u{7ff}").is_none());
3462
3463        // 3-byte without overflow [U+800, U+800] -> [U+800, U+801] (note that these
3464        // characters should render right to left).
3465        test_inc("ࠀࠀ", "ࠀࠁ");
3466
3467        // 3-byte ending in max 3-byte
3468        test_inc("a\u{ffff}", "b");
3469
3470        // Max 3-byte should not truncate as it would need 4-byte code points
3471        assert!(increment_utf8("\u{ffff}\u{ffff}").is_none());
3472
3473        // 4-byte without overflow
3474        test_inc("𐀀𐀀", "𐀀𐀁");
3475
3476        // 4-byte ending in max unicode
3477        test_inc("a\u{10ffff}", "b");
3478
3479        // Max 4-byte should not truncate
3480        assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none());
3481
3482        // Skip over surrogate pair range (0xD800..=0xDFFF)
3483        //test_inc("a\u{D7FF}", "a\u{e000}");
3484        test_inc("a\u{D7FF}", "b");
3485    }
3486
3487    #[test]
3488    fn test_truncate_utf8() {
3489        // No-op
3490        let data = "❤️🧡💛💚💙💜";
3491        let r = truncate_utf8(data, data.len()).unwrap();
3492        assert_eq!(r.len(), data.len());
3493        assert_eq!(&r, data.as_bytes());
3494
3495        // We slice it away from the UTF8 boundary
3496        let r = truncate_utf8(data, 13).unwrap();
3497        assert_eq!(r.len(), 10);
3498        assert_eq!(&r, "❤️🧡".as_bytes());
3499
3500        // One multi-byte code point, and a length shorter than it, so we can't slice it
3501        let r = truncate_utf8("\u{0836}", 1);
3502        assert!(r.is_none());
3503
3504        // Test truncate and increment for max bounds on UTF-8 statistics
3505        // 7-bit (i.e. ASCII)
3506        let r = truncate_and_increment_utf8("yyyyyyyyy", 8).unwrap();
3507        assert_eq!(&r, "yyyyyyyz".as_bytes());
3508
3509        // 2-byte without overflow
3510        let r = truncate_and_increment_utf8("ééééé", 7).unwrap();
3511        assert_eq!(&r, "ééê".as_bytes());
3512
3513        // 2-byte that overflows lowest byte
3514        let r = truncate_and_increment_utf8("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8).unwrap();
3515        assert_eq!(&r, "\u{ff}\u{ff}\u{ff}\u{100}".as_bytes());
3516
3517        // max 2-byte should not truncate as it would need 3-byte code points
3518        let r = truncate_and_increment_utf8("߿߿߿߿߿", 8);
3519        assert!(r.is_none());
3520
3521        // 3-byte without overflow [U+800, U+800, U+800] -> [U+800, U+801] (note that these
3522        // characters should render right to left).
3523        let r = truncate_and_increment_utf8("ࠀࠀࠀࠀ", 8).unwrap();
3524        assert_eq!(&r, "ࠀࠁ".as_bytes());
3525
3526        // max 3-byte should not truncate as it would need 4-byte code points
3527        let r = truncate_and_increment_utf8("\u{ffff}\u{ffff}\u{ffff}", 8);
3528        assert!(r.is_none());
3529
3530        // 4-byte without overflow
3531        let r = truncate_and_increment_utf8("𐀀𐀀𐀀𐀀", 9).unwrap();
3532        assert_eq!(&r, "𐀀𐀁".as_bytes());
3533
3534        // max 4-byte should not truncate
3535        let r = truncate_and_increment_utf8("\u{10ffff}\u{10ffff}", 8);
3536        assert!(r.is_none());
3537    }
3538
3539    #[test]
3540    // Check fallback truncation of statistics that should be UTF-8, but aren't
3541    // (see https://github.com/apache/arrow-rs/pull/6870).
3542    fn test_byte_array_truncate_invalid_utf8_statistics() {
3543        let message_type = "
3544            message test_schema {
3545                OPTIONAL BYTE_ARRAY a (UTF8);
3546            }
3547        ";
3548        let schema = Arc::new(parse_message_type(message_type).unwrap());
3549
3550        // Create Vec<ByteArray> containing non-UTF8 bytes
3551        let data = vec![ByteArray::from(vec![128u8; 32]); 7];
3552        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
3553        let file: File = tempfile::tempfile().unwrap();
3554        let props = Arc::new(
3555            WriterProperties::builder()
3556                .set_statistics_enabled(EnabledStatistics::Chunk)
3557                .set_statistics_truncate_length(Some(8))
3558                .build(),
3559        );
3560
3561        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
3562        let mut row_group_writer = writer.next_row_group().unwrap();
3563
3564        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
3565        col_writer
3566            .typed::<ByteArrayType>()
3567            .write_batch(&data, Some(&def_levels), None)
3568            .unwrap();
3569        col_writer.close().unwrap();
3570        row_group_writer.close().unwrap();
3571        let file_metadata = writer.close().unwrap();
3572        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
3573        let stats = file_metadata.row_groups[0].columns[0]
3574            .meta_data
3575            .as_ref()
3576            .unwrap()
3577            .statistics
3578            .as_ref()
3579            .unwrap();
3580        assert!(!stats.is_max_value_exact.unwrap());
3581        // Truncation of invalid UTF-8 should fall back to binary truncation, so last byte should
3582        // be incremented by 1.
3583        assert_eq!(
3584            stats.max_value,
3585            Some([128, 128, 128, 128, 128, 128, 128, 129].to_vec())
3586        );
3587    }
3588
3589    #[test]
3590    fn test_increment_max_binary_chars() {
3591        let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
3592        assert_eq!(&r.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);
3593
3594        let incremented = increment(vec![0xFF, 0xFF, 0xFF]);
3595        assert!(incremented.is_none())
3596    }
3597
3598    #[test]
3599    fn test_no_column_index_when_stats_disabled() {
3600        // https://github.com/apache/arrow-rs/issues/6010
3601        // Test that column index is not created/written for all-nulls column when page
3602        // statistics are disabled.
3603        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3604        let props = Arc::new(
3605            WriterProperties::builder()
3606                .set_statistics_enabled(EnabledStatistics::None)
3607                .build(),
3608        );
3609        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3610        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3611
3612        let data = Vec::new();
3613        let def_levels = vec![0; 10];
3614        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3615        writer.flush_data_pages().unwrap();
3616
3617        let column_close_result = writer.close().unwrap();
3618        assert!(column_close_result.offset_index.is_some());
3619        assert!(column_close_result.column_index.is_none());
3620    }
3621
3622    #[test]
3623    fn test_no_offset_index_when_disabled() {
3624        // Test that offset indexes can be disabled
3625        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3626        let props = Arc::new(
3627            WriterProperties::builder()
3628                .set_statistics_enabled(EnabledStatistics::None)
3629                .set_offset_index_disabled(true)
3630                .build(),
3631        );
3632        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3633        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3634
3635        let data = Vec::new();
3636        let def_levels = vec![0; 10];
3637        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3638        writer.flush_data_pages().unwrap();
3639
3640        let column_close_result = writer.close().unwrap();
3641        assert!(column_close_result.offset_index.is_none());
3642        assert!(column_close_result.column_index.is_none());
3643    }
3644
3645    #[test]
3646    fn test_offset_index_overridden() {
3647        // Test that offset indexes are not disabled when gathering page statistics
3648        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3649        let props = Arc::new(
3650            WriterProperties::builder()
3651                .set_statistics_enabled(EnabledStatistics::Page)
3652                .set_offset_index_disabled(true)
3653                .build(),
3654        );
3655        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3656        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3657
3658        let data = Vec::new();
3659        let def_levels = vec![0; 10];
3660        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3661        writer.flush_data_pages().unwrap();
3662
3663        let column_close_result = writer.close().unwrap();
3664        assert!(column_close_result.offset_index.is_some());
3665        assert!(column_close_result.column_index.is_some());
3666    }
3667
3668    #[test]
3669    fn test_boundary_order() -> Result<()> {
3670        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3671        // min max both ascending
3672        let column_close_result = write_multiple_pages::<Int32Type>(
3673            &descr,
3674            &[
3675                &[Some(-10), Some(10)],
3676                &[Some(-5), Some(11)],
3677                &[None],
3678                &[Some(-5), Some(11)],
3679            ],
3680        )?;
3681        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3682        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3683
3684        // min max both descending
3685        let column_close_result = write_multiple_pages::<Int32Type>(
3686            &descr,
3687            &[
3688                &[Some(10), Some(11)],
3689                &[Some(5), Some(11)],
3690                &[None],
3691                &[Some(-5), Some(0)],
3692            ],
3693        )?;
3694        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3695        assert_eq!(boundary_order, BoundaryOrder::DESCENDING);
3696
3697        // min max both equal
3698        let column_close_result = write_multiple_pages::<Int32Type>(
3699            &descr,
3700            &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
3701        )?;
3702        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3703        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3704
3705        // only nulls
3706        let column_close_result =
3707            write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
3708        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3709        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3710
3711        // one page
3712        let column_close_result =
3713            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
3714        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3715        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3716
3717        // one non-null page
3718        let column_close_result =
3719            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
3720        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3721        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3722
3723        // min max both unordered
3724        let column_close_result = write_multiple_pages::<Int32Type>(
3725            &descr,
3726            &[
3727                &[Some(10), Some(11)],
3728                &[Some(11), Some(16)],
3729                &[None],
3730                &[Some(-5), Some(0)],
3731            ],
3732        )?;
3733        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3734        assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3735
3736        // min max both ordered in different orders
3737        let column_close_result = write_multiple_pages::<Int32Type>(
3738            &descr,
3739            &[
3740                &[Some(1), Some(9)],
3741                &[Some(2), Some(8)],
3742                &[None],
3743                &[Some(3), Some(7)],
3744            ],
3745        )?;
3746        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3747        assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3748
3749        Ok(())
3750    }
3751
3752    #[test]
3753    fn test_boundary_order_logical_type() -> Result<()> {
3754        // ensure that logical types account for different sort order than underlying
3755        // physical type representation
3756        let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
3757        let fba_descr = {
3758            let tpe = SchemaType::primitive_type_builder(
3759                "col",
3760                FixedLenByteArrayType::get_physical_type(),
3761            )
3762            .with_length(2)
3763            .build()?;
3764            Arc::new(ColumnDescriptor::new(
3765                Arc::new(tpe),
3766                1,
3767                0,
3768                ColumnPath::from("col"),
3769            ))
3770        };
3771
3772        let values: &[&[Option<FixedLenByteArray>]] = &[
3773            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
3774            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
3775            &[Some(FixedLenByteArray::from(ByteArray::from(
3776                f16::NEG_ZERO,
3777            )))],
3778            &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
3779        ];
3780
3781        // f16 descending
3782        let column_close_result =
3783            write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
3784        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3785        assert_eq!(boundary_order, BoundaryOrder::DESCENDING);
3786
3787        // same bytes, but fba unordered
3788        let column_close_result =
3789            write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
3790        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3791        assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3792
3793        Ok(())
3794    }
3795
3796    #[test]
3797    fn test_interval_stats_should_not_have_min_max() {
3798        let input = [
3799            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
3800            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
3801            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
3802        ]
3803        .into_iter()
3804        .map(|s| ByteArray::from(s).into())
3805        .collect::<Vec<_>>();
3806
3807        let page_writer = get_test_page_writer();
3808        let mut writer = get_test_interval_column_writer(page_writer);
3809        writer.write_batch(&input, None, None).unwrap();
3810
3811        let metadata = writer.close().unwrap().metadata;
3812        let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
3813            stats.clone()
3814        } else {
3815            panic!("metadata missing statistics");
3816        };
3817        assert!(stats.min_bytes_opt().is_none());
3818        assert!(stats.max_bytes_opt().is_none());
3819    }
3820
3821    #[test]
3822    #[cfg(feature = "arrow")]
3823    fn test_column_writer_get_estimated_total_bytes() {
3824        let page_writer = get_test_page_writer();
3825        let props = Default::default();
3826        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
3827        assert_eq!(writer.get_estimated_total_bytes(), 0);
3828
3829        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
3830        writer.add_data_page().unwrap();
3831        let size_with_one_page = writer.get_estimated_total_bytes();
3832        assert_eq!(size_with_one_page, 20);
3833
3834        writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
3835        writer.add_data_page().unwrap();
3836        let size_with_two_pages = writer.get_estimated_total_bytes();
3837        // different pages have different compressed lengths
3838        assert_eq!(size_with_two_pages, 20 + 21);
3839    }
3840
3841    fn write_multiple_pages<T: DataType>(
3842        column_descr: &Arc<ColumnDescriptor>,
3843        pages: &[&[Option<T::T>]],
3844    ) -> Result<ColumnCloseResult> {
3845        let column_writer = get_column_writer(
3846            column_descr.clone(),
3847            Default::default(),
3848            get_test_page_writer(),
3849        );
3850        let mut writer = get_typed_column_writer::<T>(column_writer);
3851
3852        for &page in pages {
3853            let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
3854            let def_levels = page
3855                .iter()
3856                .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
3857                .collect::<Vec<_>>();
3858            writer.write_batch(&values, Some(&def_levels), None)?;
3859            writer.flush_data_pages()?;
3860        }
3861
3862        writer.close()
3863    }
3864
3865    /// Performs write-read roundtrip with randomly generated values and levels.
3866    /// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
3867    /// for a column.
3868    fn column_roundtrip_random<T: DataType>(
3869        props: WriterProperties,
3870        max_size: usize,
3871        min_value: T::T,
3872        max_value: T::T,
3873        max_def_level: i16,
3874        max_rep_level: i16,
3875    ) where
3876        T::T: PartialOrd + SampleUniform + Copy,
3877    {
3878        let mut num_values: usize = 0;
3879
3880        let mut buf: Vec<i16> = Vec::new();
3881        let def_levels = if max_def_level > 0 {
3882            random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
3883            for &dl in &buf[..] {
3884                if dl == max_def_level {
3885                    num_values += 1;
3886                }
3887            }
3888            Some(&buf[..])
3889        } else {
3890            num_values = max_size;
3891            None
3892        };
3893
3894        let mut buf: Vec<i16> = Vec::new();
3895        let rep_levels = if max_rep_level > 0 {
3896            random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
3897            buf[0] = 0; // Must start on record boundary
3898            Some(&buf[..])
3899        } else {
3900            None
3901        };
3902
3903        let mut values: Vec<T::T> = Vec::new();
3904        random_numbers_range(num_values, min_value, max_value, &mut values);
3905
3906        column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
3907    }
3908
3909    /// Performs write-read roundtrip and asserts written values and levels.
3910    fn column_roundtrip<T: DataType>(
3911        props: WriterProperties,
3912        values: &[T::T],
3913        def_levels: Option<&[i16]>,
3914        rep_levels: Option<&[i16]>,
3915    ) {
3916        let mut file = tempfile::tempfile().unwrap();
3917        let mut write = TrackedWrite::new(&mut file);
3918        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
3919
3920        let max_def_level = match def_levels {
3921            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3922            None => 0i16,
3923        };
3924
3925        let max_rep_level = match rep_levels {
3926            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3927            None => 0i16,
3928        };
3929
3930        let mut max_batch_size = values.len();
3931        if let Some(levels) = def_levels {
3932            max_batch_size = max_batch_size.max(levels.len());
3933        }
3934        if let Some(levels) = rep_levels {
3935            max_batch_size = max_batch_size.max(levels.len());
3936        }
3937
3938        let mut writer =
3939            get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));
3940
3941        let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
3942        assert_eq!(values_written, values.len());
3943        let result = writer.close().unwrap();
3944
3945        drop(write);
3946
3947        let props = ReaderProperties::builder()
3948            .set_backward_compatible_lz4(false)
3949            .build();
3950        let page_reader = Box::new(
3951            SerializedPageReader::new_with_properties(
3952                Arc::new(file),
3953                &result.metadata,
3954                result.rows_written as usize,
3955                None,
3956                Arc::new(props),
3957            )
3958            .unwrap(),
3959        );
3960        let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);
3961
3962        let mut actual_values = Vec::with_capacity(max_batch_size);
3963        let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
3964        let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));
3965
3966        let (_, values_read, levels_read) = reader
3967            .read_records(
3968                max_batch_size,
3969                actual_def_levels.as_mut(),
3970                actual_rep_levels.as_mut(),
3971                &mut actual_values,
3972            )
3973            .unwrap();
3974
3975        // Assert values, definition and repetition levels.
3976
3977        assert_eq!(&actual_values[..values_read], values);
3978        match actual_def_levels {
3979            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
3980            None => assert_eq!(None, def_levels),
3981        }
3982        match actual_rep_levels {
3983            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
3984            None => assert_eq!(None, rep_levels),
3985        }
3986
3987        // Assert written rows.
3988
3989        if let Some(levels) = actual_rep_levels {
3990            let mut actual_rows_written = 0;
3991            for l in levels {
3992                if l == 0 {
3993                    actual_rows_written += 1;
3994                }
3995            }
3996            assert_eq!(actual_rows_written, result.rows_written);
3997        } else if actual_def_levels.is_some() {
3998            assert_eq!(levels_read as u64, result.rows_written);
3999        } else {
4000            assert_eq!(values_read as u64, result.rows_written);
4001        }
4002    }
4003
4004    /// Performs write of provided values and returns column metadata of those values.
4005    /// Used to test encoding support for column writer.
4006    fn column_write_and_get_metadata<T: DataType>(
4007        props: WriterProperties,
4008        values: &[T::T],
4009    ) -> ColumnChunkMetaData {
4010        let page_writer = get_test_page_writer();
4011        let props = Arc::new(props);
4012        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4013        writer.write_batch(values, None, None).unwrap();
4014        writer.close().unwrap().metadata
4015    }
4016
4017    // Helper function to more compactly create a PageEncodingStats struct.
4018    fn encoding_stats(page_type: PageType, encoding: Encoding, count: i32) -> PageEncodingStats {
4019        PageEncodingStats {
4020            page_type,
4021            encoding,
4022            count,
4023        }
4024    }
4025
4026    // Function to use in tests for EncodingWriteSupport. This checks that dictionary
4027    // offset and encodings to make sure that column writer uses provided by trait
4028    // encodings.
4029    fn check_encoding_write_support<T: DataType>(
4030        version: WriterVersion,
4031        dict_enabled: bool,
4032        data: &[T::T],
4033        dictionary_page_offset: Option<i64>,
4034        encodings: &[Encoding],
4035        page_encoding_stats: &[PageEncodingStats],
4036    ) {
4037        let props = WriterProperties::builder()
4038            .set_writer_version(version)
4039            .set_dictionary_enabled(dict_enabled)
4040            .build();
4041        let meta = column_write_and_get_metadata::<T>(props, data);
4042        assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
4043        assert_eq!(meta.encodings(), encodings);
4044        assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
4045    }
4046
4047    /// Returns column writer.
4048    fn get_test_column_writer<'a, T: DataType>(
4049        page_writer: Box<dyn PageWriter + 'a>,
4050        max_def_level: i16,
4051        max_rep_level: i16,
4052        props: WriterPropertiesPtr,
4053    ) -> ColumnWriterImpl<'a, T> {
4054        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4055        let column_writer = get_column_writer(descr, props, page_writer);
4056        get_typed_column_writer::<T>(column_writer)
4057    }
4058
4059    /// Returns column reader.
4060    fn get_test_column_reader<T: DataType>(
4061        page_reader: Box<dyn PageReader>,
4062        max_def_level: i16,
4063        max_rep_level: i16,
4064    ) -> ColumnReaderImpl<T> {
4065        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4066        let column_reader = get_column_reader(descr, page_reader);
4067        get_typed_column_reader::<T>(column_reader)
4068    }
4069
4070    /// Returns descriptor for primitive column.
4071    fn get_test_column_descr<T: DataType>(
4072        max_def_level: i16,
4073        max_rep_level: i16,
4074    ) -> ColumnDescriptor {
4075        let path = ColumnPath::from("col");
4076        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4077            // length is set for "encoding support" tests for FIXED_LEN_BYTE_ARRAY type,
4078            // it should be no-op for other types
4079            .with_length(1)
4080            .build()
4081            .unwrap();
4082        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4083    }
4084
4085    /// Returns page writer that collects pages without serializing them.
4086    fn get_test_page_writer() -> Box<dyn PageWriter> {
4087        Box::new(TestPageWriter {})
4088    }
4089
4090    struct TestPageWriter {}
4091
4092    impl PageWriter for TestPageWriter {
4093        fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
4094            let mut res = PageWriteSpec::new();
4095            res.page_type = page.page_type();
4096            res.uncompressed_size = page.uncompressed_size();
4097            res.compressed_size = page.compressed_size();
4098            res.num_values = page.num_values();
4099            res.offset = 0;
4100            res.bytes_written = page.data().len() as u64;
4101            Ok(res)
4102        }
4103
4104        fn close(&mut self) -> Result<()> {
4105            Ok(())
4106        }
4107    }
4108
4109    /// Write data into parquet using [`get_test_page_writer`] and [`get_test_column_writer`] and returns generated statistics.
4110    fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
4111        let page_writer = get_test_page_writer();
4112        let props = Default::default();
4113        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4114        writer.write_batch(values, None, None).unwrap();
4115
4116        let metadata = writer.close().unwrap().metadata;
4117        if let Some(stats) = metadata.statistics() {
4118            stats.clone()
4119        } else {
4120            panic!("metadata missing statistics");
4121        }
4122    }
4123
4124    /// Returns Decimals column writer.
4125    fn get_test_decimals_column_writer<T: DataType>(
4126        page_writer: Box<dyn PageWriter>,
4127        max_def_level: i16,
4128        max_rep_level: i16,
4129        props: WriterPropertiesPtr,
4130    ) -> ColumnWriterImpl<'static, T> {
4131        let descr = Arc::new(get_test_decimals_column_descr::<T>(
4132            max_def_level,
4133            max_rep_level,
4134        ));
4135        let column_writer = get_column_writer(descr, props, page_writer);
4136        get_typed_column_writer::<T>(column_writer)
4137    }
4138
4139    /// Returns descriptor for Decimal type with primitive column.
4140    fn get_test_decimals_column_descr<T: DataType>(
4141        max_def_level: i16,
4142        max_rep_level: i16,
4143    ) -> ColumnDescriptor {
4144        let path = ColumnPath::from("col");
4145        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4146            .with_length(16)
4147            .with_logical_type(Some(LogicalType::Decimal {
4148                scale: 2,
4149                precision: 3,
4150            }))
4151            .with_scale(2)
4152            .with_precision(3)
4153            .build()
4154            .unwrap();
4155        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4156    }
4157
4158    fn float16_statistics_roundtrip(
4159        values: &[FixedLenByteArray],
4160    ) -> ValueStatistics<FixedLenByteArray> {
4161        let page_writer = get_test_page_writer();
4162        let mut writer = get_test_float16_column_writer(page_writer, Default::default());
4163        writer.write_batch(values, None, None).unwrap();
4164
4165        let metadata = writer.close().unwrap().metadata;
4166        if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
4167            stats.clone()
4168        } else {
4169            panic!("metadata missing statistics");
4170        }
4171    }
4172
4173    fn get_test_float16_column_writer(
4174        page_writer: Box<dyn PageWriter>,
4175        props: WriterPropertiesPtr,
4176    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4177        let descr = Arc::new(get_test_float16_column_descr(0, 0));
4178        let column_writer = get_column_writer(descr, props, page_writer);
4179        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4180    }
4181
4182    fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
4183        let path = ColumnPath::from("col");
4184        let tpe =
4185            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4186                .with_length(2)
4187                .with_logical_type(Some(LogicalType::Float16))
4188                .build()
4189                .unwrap();
4190        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4191    }
4192
4193    fn get_test_interval_column_writer(
4194        page_writer: Box<dyn PageWriter>,
4195    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4196        let descr = Arc::new(get_test_interval_column_descr());
4197        let column_writer = get_column_writer(descr, Default::default(), page_writer);
4198        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4199    }
4200
4201    fn get_test_interval_column_descr() -> ColumnDescriptor {
4202        let path = ColumnPath::from("col");
4203        let tpe =
4204            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4205                .with_length(12)
4206                .with_converted_type(ConvertedType::INTERVAL)
4207                .build()
4208                .unwrap();
4209        ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
4210    }
4211
4212    /// Returns column writer for UINT32 Column provided as ConvertedType only
4213    fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
4214        page_writer: Box<dyn PageWriter + 'a>,
4215        max_def_level: i16,
4216        max_rep_level: i16,
4217        props: WriterPropertiesPtr,
4218    ) -> ColumnWriterImpl<'a, T> {
4219        let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
4220            max_def_level,
4221            max_rep_level,
4222        ));
4223        let column_writer = get_column_writer(descr, props, page_writer);
4224        get_typed_column_writer::<T>(column_writer)
4225    }
4226
4227    /// Returns column descriptor for UINT32 Column provided as ConvertedType only
4228    fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
4229        max_def_level: i16,
4230        max_rep_level: i16,
4231    ) -> ColumnDescriptor {
4232        let path = ColumnPath::from("col");
4233        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4234            .with_converted_type(ConvertedType::UINT_32)
4235            .build()
4236            .unwrap();
4237        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4238    }
4239}