Skip to main content

parquet/column/writer/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains column writer API.
19
20use bytes::Bytes;
21use half::f16;
22
23use crate::bloom_filter::Sbbf;
24use crate::file::page_index::column_index::ColumnIndexMetaData;
25use crate::file::page_index::offset_index::OffsetIndexMetaData;
26use std::collections::{BTreeSet, VecDeque};
27use std::str;
28
29use crate::basic::{
30    BoundaryOrder, Compression, ConvertedType, Encoding, EncodingMask, LogicalType, PageType, Type,
31};
32use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
33use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
34use crate::compression::{Codec, CodecOptionsBuilder, create_codec};
35use crate::data_type::private::ParquetValueType;
36use crate::data_type::*;
37use crate::encodings::levels::LevelEncoder;
38#[cfg(feature = "encryption")]
39use crate::encryption::encrypt::get_column_crypto_metadata;
40use crate::errors::{ParquetError, Result};
41use crate::file::metadata::{
42    ColumnChunkMetaData, ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram,
43    OffsetIndexBuilder, PageEncodingStats,
44};
45use crate::file::properties::{
46    EnabledStatistics, WriterProperties, WriterPropertiesPtr, WriterVersion,
47};
48use crate::file::statistics::{Statistics, ValueStatistics};
49use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
50
51pub(crate) mod encoder;
52
// Invokes expression `$b` with `$i` bound to the typed writer inside `$e`.
//
// Expands to an exhaustive `match` over every `ColumnWriter` variant so the
// same expression can be applied regardless of the column's physical type.
macro_rules! downcast_writer {
    ($e:expr, $i:ident, $b:expr) => {
        match $e {
            Self::BoolColumnWriter($i) => $b,
            Self::Int32ColumnWriter($i) => $b,
            Self::Int64ColumnWriter($i) => $b,
            Self::Int96ColumnWriter($i) => $b,
            Self::FloatColumnWriter($i) => $b,
            Self::DoubleColumnWriter($i) => $b,
            Self::ByteArrayColumnWriter($i) => $b,
            Self::FixedLenByteArrayColumnWriter($i) => $b,
        }
    };
}
67
/// Column writer for a Parquet type.
///
/// Each variant wraps a [`ColumnWriterImpl`] specialized for one Parquet
/// physical type.
///
/// See [`get_column_writer`] to create instances of this type
pub enum ColumnWriter<'a> {
    /// Column writer for boolean type
    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
    /// Column writer for int32 type
    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
    /// Column writer for int64 type
    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
    /// Column writer for int96 (timestamp) type
    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
    /// Column writer for float type
    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
    /// Column writer for double type
    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
    /// Column writer for byte array type
    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
    /// Column writer for fixed length byte array type
    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
}
89
impl ColumnWriter<'_> {
    /// Returns the estimated total memory usage
    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        // Delegate to the typed writer via `downcast_writer!`.
        downcast_writer!(self, typed, typed.memory_size())
    }

    /// Returns the estimated total encoded bytes for this column writer
    #[cfg(feature = "arrow")]
    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
        downcast_writer!(self, typed, typed.get_estimated_total_bytes())
    }

    /// Finalize the currently buffered values as a data page.
    ///
    /// This is used by content-defined chunking to force a page boundary at
    /// content-determined positions.
    #[cfg(feature = "arrow")]
    pub(crate) fn add_data_page(&mut self) -> Result<()> {
        downcast_writer!(self, typed, typed.add_data_page())
    }

    /// Close this [`ColumnWriter`], returning the metadata for the column chunk.
    ///
    /// Consumes `self`; no further writes are possible afterwards.
    pub fn close(self) -> Result<ColumnCloseResult> {
        downcast_writer!(self, typed, typed.close())
    }
}
117
118/// Create a specific column writer corresponding to column descriptor `descr`.
119pub fn get_column_writer<'a>(
120    descr: ColumnDescPtr,
121    props: WriterPropertiesPtr,
122    page_writer: Box<dyn PageWriter + 'a>,
123) -> ColumnWriter<'a> {
124    match descr.physical_type() {
125        Type::BOOLEAN => {
126            ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
127        }
128        Type::INT32 => {
129            ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
130        }
131        Type::INT64 => {
132            ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
133        }
134        Type::INT96 => {
135            ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
136        }
137        Type::FLOAT => {
138            ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
139        }
140        Type::DOUBLE => {
141            ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
142        }
143        Type::BYTE_ARRAY => {
144            ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
145        }
146        Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
147            ColumnWriterImpl::new(descr, props, page_writer),
148        ),
149    }
150}
151
152/// Gets a typed column writer for the specific type `T`, by "up-casting" `col_writer` of
153/// non-generic type to a generic column writer type `ColumnWriterImpl`.
154///
155/// Panics if actual enum value for `col_writer` does not match the type `T`.
156pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
157    T::get_column_writer(col_writer).unwrap_or_else(|| {
158        panic!(
159            "Failed to convert column writer into a typed column writer for `{}` type",
160            T::get_physical_type()
161        )
162    })
163}
164
165/// Similar to `get_typed_column_writer` but returns a reference.
166pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
167    col_writer: &'b ColumnWriter<'a>,
168) -> &'b ColumnWriterImpl<'a, T> {
169    T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
170        panic!(
171            "Failed to convert column writer into a typed column writer for `{}` type",
172            T::get_physical_type()
173        )
174    })
175}
176
/// Similar to `get_typed_column_writer` but returns a mutable reference.
///
/// Panics if actual enum value for `col_writer` does not match the type `T`.
pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
    col_writer: &'a mut ColumnWriter<'b>,
) -> &'a mut ColumnWriterImpl<'b, T> {
    T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
        panic!(
            "Failed to convert column writer into a typed column writer for `{}` type",
            T::get_physical_type()
        )
    })
}
188
/// Metadata for a column chunk of a Parquet file.
///
/// Note this structure is returned by [`ColumnWriter::close`].
#[derive(Debug, Clone)]
pub struct ColumnCloseResult {
    /// The total number of bytes written
    pub bytes_written: u64,
    /// The total number of rows written
    pub rows_written: u64,
    /// Metadata for this column chunk
    pub metadata: ColumnChunkMetaData,
    /// Optional bloom filter for this column
    pub bloom_filter: Option<Sbbf>,
    /// Optional column index, for filtering
    pub column_index: Option<ColumnIndexMetaData>,
    /// Optional offset index, identifying page locations
    pub offset_index: Option<OffsetIndexMetaData>,
}
207
// Metrics per page: counters (and optional level histograms) accumulated while
// buffering values for the current data page; reset via `new_page`.
#[derive(Default)]
struct PageMetrics {
    // Number of values (incl. nulls) buffered for the current page
    num_buffered_values: u32,
    // Number of rows started in the current page (rep level == 0)
    num_buffered_rows: u32,
    // Number of null values buffered for the current page
    num_page_nulls: u64,
    // Per-page repetition level histogram; None when statistics are disabled
    repetition_level_histogram: Option<LevelHistogram>,
    // Per-page definition level histogram; None when statistics are disabled
    definition_level_histogram: Option<LevelHistogram>,
}
217
218impl PageMetrics {
219    fn new() -> Self {
220        Default::default()
221    }
222
223    /// Initialize the repetition level histogram
224    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
225        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
226        self
227    }
228
229    /// Initialize the definition level histogram
230    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
231        self.definition_level_histogram = LevelHistogram::try_new(max_level);
232        self
233    }
234
235    /// Resets the state of this `PageMetrics` to the initial state.
236    /// If histograms have been initialized their contents will be reset to zero.
237    fn new_page(&mut self) {
238        self.num_buffered_values = 0;
239        self.num_buffered_rows = 0;
240        self.num_page_nulls = 0;
241        self.repetition_level_histogram
242            .as_mut()
243            .map(LevelHistogram::reset);
244        self.definition_level_histogram
245            .as_mut()
246            .map(LevelHistogram::reset);
247    }
248
249    /// Updates histogram values using provided repetition levels
250    fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
251        if let Some(ref mut rep_hist) = self.repetition_level_histogram {
252            rep_hist.update_from_levels(levels);
253        }
254    }
255
256    /// Updates histogram values using provided definition levels
257    fn update_definition_level_histogram(&mut self, levels: &[i16]) {
258        if let Some(ref mut def_hist) = self.definition_level_histogram {
259            def_hist.update_from_levels(levels);
260        }
261    }
262}
263
// Metrics per column writer: totals accumulated across all pages of the
// column chunk; used to build the final `ColumnChunkMetaData`.
#[derive(Default)]
struct ColumnMetrics<T: Default> {
    total_bytes_written: u64,
    total_rows_written: u64,
    total_uncompressed_size: u64,
    total_compressed_size: u64,
    total_num_values: u64,
    // File offsets; None until the corresponding page kind is first written
    dictionary_page_offset: Option<u64>,
    data_page_offset: Option<u64>,
    // Running chunk-level min/max values; None until a value is observed
    min_column_value: Option<T>,
    max_column_value: Option<T>,
    num_column_nulls: u64,
    // Only retained when the caller supplies it before any values are written
    column_distinct_count: Option<u64>,
    variable_length_bytes: Option<i64>,
    // Chunk-level level histograms; None when statistics are disabled
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
}
282
283impl<T: Default> ColumnMetrics<T> {
284    fn new() -> Self {
285        Default::default()
286    }
287
288    /// Initialize the repetition level histogram
289    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
290        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
291        self
292    }
293
294    /// Initialize the definition level histogram
295    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
296        self.definition_level_histogram = LevelHistogram::try_new(max_level);
297        self
298    }
299
300    /// Sum `page_histogram` into `chunk_histogram`
301    fn update_histogram(
302        chunk_histogram: &mut Option<LevelHistogram>,
303        page_histogram: &Option<LevelHistogram>,
304    ) {
305        if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
306            chunk_hist.add(page_hist);
307        }
308    }
309
310    /// Sum the provided PageMetrics histograms into the chunk histograms. Does nothing if
311    /// page histograms are not initialized.
312    fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
313        ColumnMetrics::<T>::update_histogram(
314            &mut self.definition_level_histogram,
315            &page_metrics.definition_level_histogram,
316        );
317        ColumnMetrics::<T>::update_histogram(
318            &mut self.repetition_level_histogram,
319            &page_metrics.repetition_level_histogram,
320        );
321    }
322
323    /// Sum the provided page variable_length_bytes into the chunk variable_length_bytes
324    fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
325        if let Some(var_bytes) = variable_length_bytes {
326            *self.variable_length_bytes.get_or_insert(0) += var_bytes;
327        }
328    }
329}
330
/// Typed column writer for a primitive column.
///
/// Alias for [`GenericColumnWriter`] using the default value encoder for `T`.
pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
333
/// Generic column writer for a primitive Parquet column
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
    // Column writer properties
    descr: ColumnDescPtr,
    props: WriterPropertiesPtr,
    statistics_enabled: EnabledStatistics,

    page_writer: Box<dyn PageWriter + 'a>,
    codec: Compression,
    // None when the codec performs no compression
    compressor: Option<Box<dyn Codec>>,
    encoder: E,

    // Metrics for the page currently being buffered
    page_metrics: PageMetrics,
    // Metrics per column writer
    column_metrics: ColumnMetrics<E::T>,

    /// The order of encodings within the generated metadata does not impact its meaning,
    /// but we use a BTreeSet so that the output is deterministic
    encodings: BTreeSet<Encoding>,
    encoding_stats: Vec<PageEncodingStats>,
    // Streaming level encoders for definition/repetition levels.
    def_levels_encoder: LevelEncoder,
    rep_levels_encoder: LevelEncoder,
    // Compressed pages buffered until flushed to the page writer
    data_pages: VecDeque<CompressedPage>,
    // column index and offset index
    column_index_builder: ColumnIndexBuilder,
    // None when the user disabled the offset index
    offset_index_builder: Option<OffsetIndexBuilder>,

    // Below fields used to incrementally check boundary order across data pages.
    // We assume they are ascending/descending until proven wrong.
    data_page_boundary_ascending: bool,
    data_page_boundary_descending: bool,
    /// (min, max)
    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}
369
370impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
    /// Returns a new instance of [`GenericColumnWriter`].
    ///
    /// Panics if a compression codec cannot be created for `codec` or the
    /// value encoder cannot be constructed from the properties.
    pub fn new(
        descr: ColumnDescPtr,
        props: WriterPropertiesPtr,
        page_writer: Box<dyn PageWriter + 'a>,
    ) -> Self {
        let codec = props.compression(descr.path());
        let codec_options = CodecOptionsBuilder::default().build();
        let compressor = create_codec(codec, &codec_options).unwrap();
        let encoder = E::try_new(&descr, props.as_ref()).unwrap();

        let statistics_enabled = props.statistics_enabled(descr.path());

        let mut encodings = BTreeSet::new();
        // Used for level information
        encodings.insert(Encoding::RLE);

        let mut page_metrics = PageMetrics::new();
        let mut column_metrics = ColumnMetrics::<E::T>::new();

        // Initialize level histograms if collecting page or chunk statistics
        if statistics_enabled != EnabledStatistics::None {
            page_metrics = page_metrics
                .with_repetition_level_histogram(descr.max_rep_level())
                .with_definition_level_histogram(descr.max_def_level());
            column_metrics = column_metrics
                .with_repetition_level_histogram(descr.max_rep_level())
                .with_definition_level_histogram(descr.max_def_level())
        }

        // Disable column_index_builder if not collecting page statistics.
        let mut column_index_builder = ColumnIndexBuilder::new(descr.physical_type());
        if statistics_enabled != EnabledStatistics::Page {
            column_index_builder.to_invalid()
        }

        // Disable offset_index_builder if requested by user.
        let offset_index_builder = match props.offset_index_disabled() {
            false => Some(OffsetIndexBuilder::new()),
            _ => None,
        };

        Self {
            // Level encoders are built first, while `descr` and `props` can
            // still be borrowed; both are moved into the struct just below.
            def_levels_encoder: Self::create_level_encoder(descr.max_def_level(), &props),
            rep_levels_encoder: Self::create_level_encoder(descr.max_rep_level(), &props),
            descr,
            props,
            statistics_enabled,
            page_writer,
            codec,
            compressor,
            encoder,
            data_pages: VecDeque::new(),
            page_metrics,
            column_metrics,
            column_index_builder,
            offset_index_builder,
            encodings,
            encoding_stats: vec![],
            // Assume ascending/descending page boundaries until proven wrong.
            data_page_boundary_ascending: true,
            data_page_boundary_descending: true,
            last_non_null_data_page_min_max: None,
        }
    }
435
    /// Writes a batch of values with optional levels and caller-supplied
    /// statistics, splitting the work into page-size-bounded mini batches.
    ///
    /// Returns the total number of values written.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn write_batch_internal(
        &mut self,
        values: &E::Values,
        value_indices: Option<&[usize]>,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        min: Option<&E::T>,
        max: Option<&E::T>,
        distinct_count: Option<u64>,
    ) -> Result<usize> {
        // Check if number of definition levels is the same as number of repetition levels.
        if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
            if def.len() != rep.len() {
                return Err(general_err!(
                    "Inconsistent length of definition and repetition levels: {} != {}",
                    def.len(),
                    rep.len()
                ));
            }
        }

        // We check for DataPage limits only after we have inserted the values. If a user
        // writes a large number of values, the DataPage size can be well above the limit.
        //
        // The purpose of this chunking is to bound this. Even if a user writes large
        // number of values, the chunking will ensure that we add data page at a
        // reasonable pagesize limit.

        // TODO: find out why we don't account for size of levels when we estimate page
        // size.

        // When definition levels are present they determine the batch length;
        // otherwise every value is present and the value count is used.
        let num_levels = match def_levels {
            Some(def_levels) => def_levels.len(),
            None => values.len(),
        };

        if let Some(min) = min {
            update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
        }
        if let Some(max) = max {
            update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
        }

        // We can only set the distinct count if there are no other writes
        if self.encoder.num_values() == 0 {
            self.column_metrics.column_distinct_count = distinct_count;
        } else {
            self.column_metrics.column_distinct_count = None;
        }

        let mut values_offset = 0;
        let mut levels_offset = 0;
        let base_batch_size = self.props.write_batch_size();
        while levels_offset < num_levels {
            let mut end_offset = num_levels.min(levels_offset + base_batch_size);

            // Split at record boundary: extend the batch until the next
            // repetition level of 0 so a record is never split across batches.
            if let Some(r) = rep_levels {
                while end_offset < r.len() && r[end_offset] != 0 {
                    end_offset += 1;
                }
            }

            values_offset += self.write_mini_batch(
                values,
                values_offset,
                value_indices,
                end_offset - levels_offset,
                def_levels.map(|lv| &lv[levels_offset..end_offset]),
                rep_levels.map(|lv| &lv[levels_offset..end_offset]),
            )?;
            levels_offset = end_offset;
        }

        // Return total number of values processed.
        Ok(values_offset)
    }
514
    /// Writes batch of values, definition levels and repetition levels.
    /// Returns number of values processed (written).
    ///
    /// If definition and repetition levels are provided, we write fully those levels and
    /// select how many values to write (this number will be returned), since number of
    /// actual written values may be smaller than provided values.
    ///
    /// If only values are provided, then all values are written and the length of
    /// the values buffer is returned.
    ///
    /// Definition and/or repetition levels can be omitted, if values are
    /// non-nullable and/or non-repeated.
    pub fn write_batch(
        &mut self,
        values: &E::Values,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) -> Result<usize> {
        // Delegate with no caller-supplied statistics.
        self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
    }
535
    /// Writer may optionally provide pre-calculated statistics for use when computing
    /// chunk-level statistics
    ///
    /// NB: [`WriterProperties::statistics_enabled`] must be set to [`EnabledStatistics::Chunk`]
    /// for these statistics to take effect. If [`EnabledStatistics::None`] they will be ignored,
    /// and if [`EnabledStatistics::Page`] the chunk statistics will instead be computed from the
    /// computed page statistics
    pub fn write_batch_with_statistics(
        &mut self,
        values: &E::Values,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        min: Option<&E::T>,
        max: Option<&E::T>,
        distinct_count: Option<u64>,
    ) -> Result<usize> {
        // Delegate with no value indices; statistics are forwarded as given.
        self.write_batch_internal(
            values,
            None,
            def_levels,
            rep_levels,
            min,
            max,
            distinct_count,
        )
    }
562
    /// Returns the estimated total memory usage.
    ///
    /// Unlike [`Self::get_estimated_total_bytes`] this is an estimate
    /// of the current memory usage and not the final anticipated encoded size.
    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        // Written bytes plus whatever the encoder currently holds in memory.
        self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
    }
571
    /// Returns total number of bytes written by this column writer so far.
    /// This value is also returned when column writer is closed.
    ///
    /// Note: this value does not include any buffered data that has not
    /// yet been flushed to a page.
    pub fn get_total_bytes_written(&self) -> u64 {
        self.column_metrics.total_bytes_written
    }
580
581    /// Returns the estimated total encoded bytes for this column writer.
582    ///
583    /// Unlike [`Self::get_total_bytes_written`] this includes an estimate
584    /// of any data that has not yet been flushed to a page, based on it's
585    /// anticipated encoded size.
586    #[cfg(feature = "arrow")]
587    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
588        self.data_pages
589            .iter()
590            .map(|page| page.data().len() as u64)
591            .sum::<u64>()
592            + self.column_metrics.total_bytes_written
593            + self.encoder.estimated_data_page_size() as u64
594            + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
595    }
596
    /// Returns total number of rows written by this column writer so far.
    /// This value is also returned when column writer is closed.
    pub fn get_total_rows_written(&self) -> u64 {
        self.column_metrics.total_rows_written
    }
602
    /// Returns a reference to a [`ColumnDescPtr`] describing this column.
    pub fn get_descriptor(&self) -> &ColumnDescPtr {
        &self.descr
    }
607
    /// Finalizes writes and closes the column writer.
    /// Returns total bytes written, total rows written and column chunk metadata.
    ///
    /// Flush order matters: remaining buffered values become a final data page,
    /// the dictionary page (if any) is written first, then all buffered data
    /// pages are flushed before metadata is built.
    pub fn close(mut self) -> Result<ColumnCloseResult> {
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }
        if self.encoder.has_dictionary() {
            self.write_dictionary_page()?;
        }
        self.flush_data_pages()?;
        let metadata = self.build_column_metadata()?;
        self.page_writer.close()?;

        let boundary_order = match (
            self.data_page_boundary_ascending,
            self.data_page_boundary_descending,
        ) {
            // If the lists are composed of equal elements then will be marked as ascending
            // (Also the case if all pages are null pages)
            (true, _) => BoundaryOrder::ASCENDING,
            (false, true) => BoundaryOrder::DESCENDING,
            (false, false) => BoundaryOrder::UNORDERED,
        };
        self.column_index_builder.set_boundary_order(boundary_order);

        // The builder is marked invalid when page statistics were unavailable
        // or disabled; in that case no column index is produced.
        let column_index = match self.column_index_builder.valid() {
            true => Some(self.column_index_builder.build()?),
            false => None,
        };

        let offset_index = self.offset_index_builder.map(|b| b.build());

        Ok(ColumnCloseResult {
            bytes_written: self.column_metrics.total_bytes_written,
            rows_written: self.column_metrics.total_rows_written,
            bloom_filter: self.encoder.flush_bloom_filter(),
            metadata,
            column_index,
            offset_index,
        })
    }
649
    /// Creates a new streaming level encoder appropriate for the writer version.
    ///
    /// Parquet v1 and v2 use different level encodings, hence the dispatch.
    fn create_level_encoder(max_level: i16, props: &WriterProperties) -> LevelEncoder {
        match props.writer_version() {
            WriterVersion::PARQUET_1_0 => LevelEncoder::v1_streaming(max_level),
            WriterVersion::PARQUET_2_0 => LevelEncoder::v2_streaming(max_level),
        }
    }
657
    /// Writes mini batch of values, definition and repetition levels.
    /// This allows fine-grained processing of values and maintaining a reasonable
    /// page size.
    ///
    /// Returns the number of values actually written (values with the maximum
    /// definition level), which may be fewer than `num_levels` when nulls are
    /// present. Errors if required levels are missing or the batch does not
    /// start at a record boundary.
    fn write_mini_batch(
        &mut self,
        values: &E::Values,
        values_offset: usize,
        value_indices: Option<&[usize]>,
        num_levels: usize,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) -> Result<usize> {
        // Process definition levels and determine how many values to write.
        let values_to_write = if self.descr.max_def_level() > 0 {
            let levels = def_levels.ok_or_else(|| {
                general_err!(
                    "Definition levels are required, because max definition level = {}",
                    self.descr.max_def_level()
                )
            })?;

            // A value is physically present only at the maximum definition
            // level; everything below it is a null at some nesting depth.
            let values_to_write = levels
                .iter()
                .map(|level| (*level == self.descr.max_def_level()) as usize)
                .sum();
            self.page_metrics.num_page_nulls += (levels.len() - values_to_write) as u64;

            // Update histogram
            self.page_metrics.update_definition_level_histogram(levels);

            self.def_levels_encoder.put(levels);
            values_to_write
        } else {
            num_levels
        };

        // Process repetition levels and determine how many rows we are about to process.
        if self.descr.max_rep_level() > 0 {
            // A row could contain more than one value.
            let levels = rep_levels.ok_or_else(|| {
                general_err!(
                    "Repetition levels are required, because max repetition level = {}",
                    self.descr.max_rep_level()
                )
            })?;

            if !levels.is_empty() && levels[0] != 0 {
                return Err(general_err!(
                    "Write must start at a record boundary, got non-zero repetition level of {}",
                    levels[0]
                ));
            }

            // Count the occasions where we start a new row
            for &level in levels {
                self.page_metrics.num_buffered_rows += (level == 0) as u32
            }

            // Update histogram
            self.page_metrics.update_repetition_level_histogram(levels);

            self.rep_levels_encoder.put(levels);
        } else {
            // Each value is exactly one row.
            // Equals to the number of values, we count nulls as well.
            self.page_metrics.num_buffered_rows += num_levels as u32;
        }

        // Either gather values through the provided indices or write a
        // contiguous run starting at `values_offset`.
        match value_indices {
            Some(indices) => {
                let indices = &indices[values_offset..values_offset + values_to_write];
                self.encoder.write_gather(values, indices)?;
            }
            None => self.encoder.write(values, values_offset, values_to_write)?,
        }

        self.page_metrics.num_buffered_values += num_levels as u32;

        // Page/dictionary limits are checked after the write, so a single
        // mini batch can exceed the configured page size by at most one batch.
        if self.should_add_data_page() {
            self.add_data_page()?;
        }

        if self.should_dict_fallback() {
            self.dict_fallback()?;
        }

        Ok(values_to_write)
    }
746
747    /// Returns true if we need to fall back to non-dictionary encoding.
748    ///
749    /// We can only fall back if dictionary encoder is set and we have exceeded dictionary
750    /// size.
751    #[inline]
752    fn should_dict_fallback(&self) -> bool {
753        match self.encoder.estimated_dict_page_size() {
754            Some(size) => {
755                size >= self
756                    .props
757                    .column_dictionary_page_size_limit(self.descr.path())
758            }
759            None => false,
760        }
761    }
762
763    /// Returns true if there is enough data for a data page, false otherwise.
764    #[inline]
765    fn should_add_data_page(&self) -> bool {
766        // This is necessary in the event of a much larger dictionary size than page size
767        //
768        // In such a scenario the dictionary decoder may return an estimated encoded
769        // size in excess of the page size limit, even when there are no buffered values
770        if self.page_metrics.num_buffered_values == 0 {
771            return false;
772        }
773
774        self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
775            || self.encoder.estimated_data_page_size()
776                >= self.props.column_data_page_size_limit(self.descr.path())
777    }
778
    /// Performs dictionary fallback.
    /// Prepares and writes dictionary and all data pages into page writer.
    ///
    /// After this the encoder no longer dictionary-encodes subsequent values.
    fn dict_fallback(&mut self) -> Result<()> {
        // At this point we know that we need to fall back.
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }
        // The dictionary page must precede the data pages in the file.
        self.write_dictionary_page()?;
        self.flush_data_pages()?;
        Ok(())
    }
790
    /// Update the column index and offset index when adding the data page
    ///
    /// Appends this page's min/max (or a null-page entry) to the column index,
    /// tracks whether page boundaries remain ascending/descending across pages,
    /// appends level histograms, and records the page's row count in the
    /// offset index.
    fn update_column_offset_index(
        &mut self,
        page_statistics: Option<&ValueStatistics<E::T>>,
        page_variable_length_bytes: Option<i64>,
    ) {
        // update the column index
        // A page is a "null page" when every buffered row is null.
        let null_page =
            (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
        // a page contains only null values,
        // and writers have to set the corresponding entries in min_values and max_values to byte[0]
        if null_page && self.column_index_builder.valid() {
            self.column_index_builder.append(
                null_page,
                vec![],
                vec![],
                self.page_metrics.num_page_nulls as i64,
            );
        } else if self.column_index_builder.valid() {
            // from page statistics
            // If can't get the page statistics, ignore this column/offset index for this column chunk
            match &page_statistics {
                None => {
                    self.column_index_builder.to_invalid();
                }
                Some(stat) => {
                    // Check if min/max are still ascending/descending across pages
                    let new_min = stat.min_opt().unwrap();
                    let new_max = stat.max_opt().unwrap();
                    if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
                        if self.data_page_boundary_ascending {
                            // If last min/max are greater than new min/max then not ascending anymore
                            let not_ascending = compare_greater(&self.descr, last_min, new_min)
                                || compare_greater(&self.descr, last_max, new_max);
                            if not_ascending {
                                self.data_page_boundary_ascending = false;
                            }
                        }

                        if self.data_page_boundary_descending {
                            // If new min/max are greater than last min/max then not descending anymore
                            let not_descending = compare_greater(&self.descr, new_min, last_min)
                                || compare_greater(&self.descr, new_max, last_max);
                            if not_descending {
                                self.data_page_boundary_descending = false;
                            }
                        }
                    }
                    // Remember this page's bounds for the next boundary check.
                    self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));

                    if self.can_truncate_value() {
                        // Store (possibly) truncated min/max; `.0` discards the
                        // "was truncated" flag, which the column index does not record.
                        self.column_index_builder.append(
                            null_page,
                            self.truncate_min_value(
                                self.props.column_index_truncate_length(),
                                stat.min_bytes_opt().unwrap(),
                            )
                            .0,
                            self.truncate_max_value(
                                self.props.column_index_truncate_length(),
                                stat.max_bytes_opt().unwrap(),
                            )
                            .0,
                            self.page_metrics.num_page_nulls as i64,
                        );
                    } else {
                        // Truncation is unsafe for this type; store bounds verbatim.
                        self.column_index_builder.append(
                            null_page,
                            stat.min_bytes_opt().unwrap().to_vec(),
                            stat.max_bytes_opt().unwrap().to_vec(),
                            self.page_metrics.num_page_nulls as i64,
                        );
                    }
                }
            }
        }

        // Append page histograms to the `ColumnIndex` histograms
        self.column_index_builder.append_histograms(
            &self.page_metrics.repetition_level_histogram,
            &self.page_metrics.definition_level_histogram,
        );

        // Update the offset index
        if let Some(builder) = self.offset_index_builder.as_mut() {
            builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
            builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
        }
    }
880
881    /// Determine if we should allow truncating min/max values for this column's statistics
882    fn can_truncate_value(&self) -> bool {
883        match self.descr.physical_type() {
884            // Don't truncate for Float16 and Decimal because their sort order is different
885            // from that of FIXED_LEN_BYTE_ARRAY sort order.
886            // So truncation of those types could lead to inaccurate min/max statistics
887            Type::FIXED_LEN_BYTE_ARRAY
888                if !matches!(
889                    self.descr.logical_type_ref(),
890                    Some(&LogicalType::Decimal { .. }) | Some(&LogicalType::Float16)
891                ) =>
892            {
893                true
894            }
895            Type::BYTE_ARRAY => true,
896            // Truncation only applies for fba/binary physical types
897            _ => false,
898        }
899    }
900
901    /// Returns `true` if this column's logical type is a UTF-8 string.
902    fn is_utf8(&self) -> bool {
903        self.get_descriptor().logical_type_ref() == Some(&LogicalType::String)
904            || self.get_descriptor().converted_type() == ConvertedType::UTF8
905    }
906
907    /// Truncates a binary statistic to at most `truncation_length` bytes.
908    ///
909    /// If truncation is not possible, returns `data`.
910    ///
911    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
912    ///
913    /// UTF-8 Note:
914    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
915    /// also remain valid UTF-8, but may be less tnan `truncation_length` bytes to avoid splitting
916    /// on non-character boundaries.
917    fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
918        truncation_length
919            .filter(|l| data.len() > *l)
920            .and_then(|l|
921                // don't do extra work if this column isn't UTF-8
922                if self.is_utf8() {
923                    match str::from_utf8(data) {
924                        Ok(str_data) => truncate_utf8(str_data, l),
925                        Err(_) => Some(data[..l].to_vec()),
926                    }
927                } else {
928                    Some(data[..l].to_vec())
929                }
930            )
931            .map(|truncated| (truncated, true))
932            .unwrap_or_else(|| (data.to_vec(), false))
933    }
934
935    /// Truncates a binary statistic to at most `truncation_length` bytes, and then increment the
936    /// final byte(s) to yield a valid upper bound. This may result in a result of less than
937    /// `truncation_length` bytes if the last byte(s) overflows.
938    ///
939    /// If truncation is not possible, returns `data`.
940    ///
941    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
942    ///
943    /// UTF-8 Note:
944    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
945    /// also remain valid UTF-8 (but again may be less than `truncation_length` bytes). If `data`
946    /// does not contain valid UTF-8, then truncation will occur as if the column is non-string
947    /// binary.
948    fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
949        truncation_length
950            .filter(|l| data.len() > *l)
951            .and_then(|l|
952                // don't do extra work if this column isn't UTF-8
953                if self.is_utf8() {
954                    match str::from_utf8(data) {
955                        Ok(str_data) => truncate_and_increment_utf8(str_data, l),
956                        Err(_) => increment(data[..l].to_vec()),
957                    }
958                } else {
959                    increment(data[..l].to_vec())
960                }
961            )
962            .map(|truncated| (truncated, true))
963            .unwrap_or_else(|| (data.to_vec(), false))
964    }
965
    /// Truncate the min and max values that will be written to a data page
    /// header or column chunk Statistics
    fn truncate_statistics(&self, statistics: Statistics) -> Statistics {
        // Preserved so the rebuilt statistics keep the same compatibility flag.
        let backwards_compatible_min_max = self.descr.sort_order().is_signed();
        match statistics {
            Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
                let (min, did_truncate_min) = self.truncate_min_value(
                    self.props.statistics_truncate_length(),
                    stats.min_bytes_opt().unwrap(),
                );
                let (max, did_truncate_max) = self.truncate_max_value(
                    self.props.statistics_truncate_length(),
                    stats.max_bytes_opt().unwrap(),
                );
                // Truncated bounds are no longer exact values from the data;
                // record that via the is_exact flags.
                Statistics::ByteArray(
                    ValueStatistics::new(
                        Some(min.into()),
                        Some(max.into()),
                        stats.distinct_count(),
                        stats.null_count_opt(),
                        backwards_compatible_min_max,
                    )
                    .with_max_is_exact(!did_truncate_max)
                    .with_min_is_exact(!did_truncate_min),
                )
            }
            // FIXED_LEN_BYTE_ARRAY is only truncated when safe for its logical
            // type (see `can_truncate_value`).
            Statistics::FixedLenByteArray(stats)
                if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
            {
                let (min, did_truncate_min) = self.truncate_min_value(
                    self.props.statistics_truncate_length(),
                    stats.min_bytes_opt().unwrap(),
                );
                let (max, did_truncate_max) = self.truncate_max_value(
                    self.props.statistics_truncate_length(),
                    stats.max_bytes_opt().unwrap(),
                );
                Statistics::FixedLenByteArray(
                    ValueStatistics::new(
                        Some(min.into()),
                        Some(max.into()),
                        stats.distinct_count(),
                        stats.null_count_opt(),
                        backwards_compatible_min_max,
                    )
                    .with_max_is_exact(!did_truncate_max)
                    .with_min_is_exact(!did_truncate_min),
                )
            }
            // All other statistics (numeric types, or missing min/max) pass through.
            stats => stats,
        }
    }
1018
    /// Adds data page.
    /// Data page is either buffered in case of dictionary encoding or written directly.
    pub(crate) fn add_data_page(&mut self) -> Result<()> {
        // Extract encoded values
        let values_data = self.encoder.flush_data_page()?;

        let max_def_level = self.descr.max_def_level();
        let max_rep_level = self.descr.max_rep_level();

        // Roll this page's null count into the chunk-level total.
        self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;

        // Page statistics exist only when both min and max were observed,
        // i.e. the page has at least one non-null value.
        let page_statistics = match (values_data.min_value, values_data.max_value) {
            (Some(min), Some(max)) => {
                // Update chunk level statistics
                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

                // Keep per-page statistics only if page-level stats are enabled.
                (self.statistics_enabled == EnabledStatistics::Page).then_some(
                    ValueStatistics::new(
                        Some(min),
                        Some(max),
                        None,
                        Some(self.page_metrics.num_page_nulls),
                        false,
                    ),
                )
            }
            _ => None,
        };

        // update column and offset index
        self.update_column_offset_index(
            page_statistics.as_ref(),
            values_data.variable_length_bytes,
        );

        // Update histograms and variable_length_bytes in column_metrics
        self.column_metrics
            .update_from_page_metrics(&self.page_metrics);
        self.column_metrics
            .update_variable_length_bytes(values_data.variable_length_bytes);

        // From here on, we only need page statistics if they will be written to the page header.
        let page_statistics = page_statistics
            .filter(|_| self.props.write_page_header_statistics(self.descr.path()))
            .map(|stats| self.truncate_statistics(Statistics::from(stats)));

        let compressed_page = match self.props.writer_version() {
            WriterVersion::PARQUET_1_0 => {
                let mut buffer = vec![];

                // V1 data pages concatenate rep levels, def levels and values
                // into one buffer that is compressed as a single unit.
                if max_rep_level > 0 {
                    self.rep_levels_encoder
                        .flush_to(|data| buffer.extend_from_slice(data));
                }

                if max_def_level > 0 {
                    self.def_levels_encoder
                        .flush_to(|data| buffer.extend_from_slice(data));
                }

                buffer.extend_from_slice(&values_data.buf);
                let uncompressed_size = buffer.len();

                if let Some(ref mut cmpr) = self.compressor {
                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
                    compressed_buf.shrink_to_fit();
                    buffer = compressed_buf;
                }

                let data_page = Page::DataPage {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    def_level_encoding: Encoding::RLE,
                    rep_level_encoding: Encoding::RLE,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
            WriterVersion::PARQUET_2_0 => {
                let mut rep_levels_byte_len = 0;
                let mut def_levels_byte_len = 0;
                let mut buffer = vec![];

                // V2 levels are stored uncompressed at the front of the page;
                // their byte lengths are recorded in the page header.
                if max_rep_level > 0 {
                    self.rep_levels_encoder
                        .flush_to(|data| buffer.extend_from_slice(data));
                    rep_levels_byte_len = buffer.len();
                }

                if max_def_level > 0 {
                    self.def_levels_encoder
                        .flush_to(|data| buffer.extend_from_slice(data));
                    def_levels_byte_len = buffer.len() - rep_levels_byte_len;
                }

                let uncompressed_size =
                    rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();

                // Data Page v2 compresses values only.
                let is_compressed = match self.compressor {
                    Some(ref mut cmpr) => {
                        let buffer_len = buffer.len();
                        cmpr.compress(&values_data.buf, &mut buffer)?;
                        // If the compressed values are at least as large as the
                        // whole uncompressed page, keep the values uncompressed
                        // and flag the page accordingly.
                        if uncompressed_size <= buffer.len() - buffer_len {
                            buffer.truncate(buffer_len);
                            buffer.extend_from_slice(&values_data.buf);
                            false
                        } else {
                            true
                        }
                    }
                    None => {
                        buffer.extend_from_slice(&values_data.buf);
                        false
                    }
                };

                let data_page = Page::DataPageV2 {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    num_nulls: self.page_metrics.num_page_nulls as u32,
                    num_rows: self.page_metrics.num_buffered_rows,
                    def_levels_byte_len: def_levels_byte_len as u32,
                    rep_levels_byte_len: rep_levels_byte_len as u32,
                    is_compressed,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
        };

        // Check if we need to buffer data page or flush it to the sink directly.
        // Buffering is required while dictionary encoding is active, since the
        // dictionary page must precede all data pages in the chunk.
        if self.encoder.has_dictionary() {
            self.data_pages.push_back(compressed_page);
        } else {
            self.write_data_page(compressed_page)?;
        }

        // Update total number of rows.
        self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;
        // Reset per-page metrics for the next page.
        self.page_metrics.new_page();

        Ok(())
    }
1169
1170    /// Finalises any outstanding data pages and flushes buffered data pages from
1171    /// dictionary encoding into underlying sink.
1172    #[inline]
1173    fn flush_data_pages(&mut self) -> Result<()> {
1174        // Write all outstanding data to a new page.
1175        if self.page_metrics.num_buffered_values > 0 {
1176            self.add_data_page()?;
1177        }
1178
1179        while let Some(page) = self.data_pages.pop_front() {
1180            self.write_data_page(page)?;
1181        }
1182
1183        Ok(())
1184    }
1185
    /// Assembles column chunk metadata.
    fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
        let total_compressed_size = self.column_metrics.total_compressed_size as i64;
        let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
        let num_values = self.column_metrics.total_num_values as i64;
        let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
        // If data page offset is not set, then no pages have been written
        let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;

        let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
            .set_compression(self.codec)
            .set_encodings_mask(EncodingMask::new_from_encodings(self.encodings.iter()))
            .set_page_encoding_stats(self.encoding_stats.clone())
            .set_total_compressed_size(total_compressed_size)
            .set_total_uncompressed_size(total_uncompressed_size)
            .set_num_values(num_values)
            .set_data_page_offset(data_page_offset)
            .set_dictionary_page_offset(dict_page_offset);

        if self.statistics_enabled != EnabledStatistics::None {
            let backwards_compatible_min_max = self.descr.sort_order().is_signed();

            // Chunk-level statistics accumulated while pages were written.
            let statistics = ValueStatistics::<E::T>::new(
                self.column_metrics.min_column_value.clone(),
                self.column_metrics.max_column_value.clone(),
                self.column_metrics.column_distinct_count,
                Some(self.column_metrics.num_column_nulls),
                false,
            )
            .with_backwards_compatible_min_max(backwards_compatible_min_max)
            .into();

            // Min/max may be truncated before being persisted.
            let statistics = self.truncate_statistics(statistics);

            builder = builder
                .set_statistics(statistics)
                .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
                .set_repetition_level_histogram(
                    self.column_metrics.repetition_level_histogram.take(),
                )
                .set_definition_level_histogram(
                    self.column_metrics.definition_level_histogram.take(),
                );

            if let Some(geo_stats) = self.encoder.flush_geospatial_statistics() {
                builder = builder.set_geo_statistics(geo_stats);
            }
        }

        // Attach column crypto metadata when encryption is configured
        // (no-op when the `encryption` feature is disabled).
        builder = self.set_column_chunk_encryption_properties(builder);

        let metadata = builder.build()?;
        Ok(metadata)
    }
1240
1241    /// Writes compressed data page into underlying sink and updates global metrics.
1242    #[inline]
1243    fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
1244        self.encodings.insert(page.encoding());
1245        match self.encoding_stats.last_mut() {
1246            Some(encoding_stats)
1247                if encoding_stats.page_type == page.page_type()
1248                    && encoding_stats.encoding == page.encoding() =>
1249            {
1250                encoding_stats.count += 1;
1251            }
1252            _ => {
1253                // data page type does not change inside a file
1254                // encoding can currently only change from dictionary to non-dictionary once
1255                self.encoding_stats.push(PageEncodingStats {
1256                    page_type: page.page_type(),
1257                    encoding: page.encoding(),
1258                    count: 1,
1259                });
1260            }
1261        }
1262        let page_spec = self.page_writer.write_page(page)?;
1263        // update offset index
1264        // compressed_size = header_size + compressed_data_size
1265        if let Some(builder) = self.offset_index_builder.as_mut() {
1266            builder
1267                .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
1268        }
1269        self.update_metrics_for_page(page_spec);
1270        Ok(())
1271    }
1272
    /// Writes dictionary page into underlying sink.
    #[inline]
    fn write_dictionary_page(&mut self) -> Result<()> {
        let compressed_page = {
            // Errors if no dictionary encoder is configured for this column.
            let mut page = self
                .encoder
                .flush_dict_page()?
                .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;

            let uncompressed_size = page.buf.len();

            // Compress the dictionary buffer in place when a codec is set.
            if let Some(ref mut cmpr) = self.compressor {
                let mut output_buf = Vec::with_capacity(uncompressed_size);
                cmpr.compress(&page.buf, &mut output_buf)?;
                page.buf = Bytes::from(output_buf);
            }

            let dict_page = Page::DictionaryPage {
                buf: page.buf,
                num_values: page.num_values as u32,
                encoding: self.props.dictionary_page_encoding(),
                is_sorted: page.is_sorted,
            };
            CompressedPage::new(dict_page, uncompressed_size)
        };

        self.encodings.insert(compressed_page.encoding());
        self.encoding_stats.push(PageEncodingStats {
            page_type: PageType::DICTIONARY_PAGE,
            encoding: compressed_page.encoding(),
            count: 1,
        });
        let page_spec = self.page_writer.write_page(compressed_page)?;
        self.update_metrics_for_page(page_spec);
        // For the dictionary page, there is no need to update the column/offset index.
        Ok(())
    }
1310
1311    /// Updates column writer metrics with each page metadata.
1312    #[inline]
1313    fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1314        self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1315        self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1316        self.column_metrics.total_bytes_written += page_spec.bytes_written;
1317
1318        match page_spec.page_type {
1319            PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1320                self.column_metrics.total_num_values += page_spec.num_values as u64;
1321                if self.column_metrics.data_page_offset.is_none() {
1322                    self.column_metrics.data_page_offset = Some(page_spec.offset);
1323                }
1324            }
1325            PageType::DICTIONARY_PAGE => {
1326                assert!(
1327                    self.column_metrics.dictionary_page_offset.is_none(),
1328                    "Dictionary offset is already set"
1329                );
1330                self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1331            }
1332            _ => {}
1333        }
1334    }
1335
1336    #[inline]
1337    #[cfg(feature = "encryption")]
1338    fn set_column_chunk_encryption_properties(
1339        &self,
1340        builder: ColumnChunkMetaDataBuilder,
1341    ) -> ColumnChunkMetaDataBuilder {
1342        if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() {
1343            builder.set_column_crypto_metadata(get_column_crypto_metadata(
1344                encryption_properties,
1345                &self.descr,
1346            ))
1347        } else {
1348            builder
1349        }
1350    }
1351
    /// No-op counterpart used when the `encryption` feature is disabled;
    /// returns the builder unchanged.
    #[inline]
    #[cfg(not(feature = "encryption"))]
    fn set_column_chunk_encryption_properties(
        &self,
        builder: ColumnChunkMetaDataBuilder,
    ) -> ColumnChunkMetaDataBuilder {
        builder
    }
1360}
1361
/// Updates `min` to `val` when `min` is unset or the current minimum compares
/// greater than `val` (per the column's logical type). NaN values are skipped
/// by `update_stat`.
fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
    update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
}
1365
/// Updates `max` to `val` when `max` is unset or `val` compares greater than
/// the current maximum (per the column's logical type). NaN values are skipped
/// by `update_stat`.
fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
    update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
}
1369
/// Returns `true` if `val` is a NaN for floating-point physical types
/// (FLOAT, DOUBLE, or FIXED_LEN_BYTE_ARRAY carrying a Float16 logical type).
#[inline]
#[allow(clippy::eq_op)] // `val != val` is the standard IEEE 754 NaN self-inequality test
fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
    match T::PHYSICAL_TYPE {
        // NaN is the only value that compares unequal to itself.
        Type::FLOAT | Type::DOUBLE => val != val,
        Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type_ref() == Some(&LogicalType::Float16) => {
            // Float16 is stored as a 2-byte little-endian fixed-length array.
            let val = val.as_bytes();
            let val = f16::from_le_bytes([val[0], val[1]]);
            val.is_nan()
        }
        // Integer and byte-array types have no NaN representation.
        _ => false,
    }
}
1383
1384/// Perform a conditional update of `cur`, skipping any NaN values
1385///
1386/// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with
1387/// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true`
1388fn update_stat<T: ParquetValueType, F>(
1389    descr: &ColumnDescriptor,
1390    val: &T,
1391    cur: &mut Option<T>,
1392    should_update: F,
1393) where
1394    F: Fn(&T) -> bool,
1395{
1396    if is_nan(descr, val) {
1397        return;
1398    }
1399
1400    if cur.as_ref().is_none_or(should_update) {
1401        *cur = Some(val.clone());
1402    }
1403}
1404
/// Evaluate `a > b` according to underlying logical type.
fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
    match T::PHYSICAL_TYPE {
        Type::INT32 | Type::INT64 => {
            // Unsigned logical/converted integer types must not be compared
            // using the signed physical representation.
            if let Some(LogicalType::Integer {
                is_signed: false, ..
            }) = descr.logical_type_ref()
            {
                // need to compare unsigned
                return compare_greater_unsigned_int(a, b);
            }

            match descr.converted_type() {
                ConvertedType::UINT_8
                | ConvertedType::UINT_16
                | ConvertedType::UINT_32
                | ConvertedType::UINT_64 => {
                    return compare_greater_unsigned_int(a, b);
                }
                _ => {}
            };
        }
        Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
            // Decimals need a signed byte-array comparison, whether declared
            // via logical or converted type.
            if let Some(LogicalType::Decimal { .. }) = descr.logical_type_ref() {
                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
            }
            if let ConvertedType::DECIMAL = descr.converted_type() {
                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
            }
            // Float16 compares by half-precision value, not by raw bytes.
            if let Some(LogicalType::Float16) = descr.logical_type_ref() {
                return compare_greater_f16(a.as_bytes(), b.as_bytes());
            }
        }

        _ => {}
    }

    // compare independent of logical / converted type
    a > b
}
1445
1446// ----------------------------------------------------------------------
1447// Encoding support for column writer.
1448// This mirrors parquet-mr default encodings for writes. See:
1449// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java
1450// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java
1451
1452/// Returns encoding for a column when no other encoding is provided in writer properties.
1453fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1454    match (kind, props.writer_version()) {
1455        (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1456        (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1457        (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1458        (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1459        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1460        _ => Encoding::PLAIN,
1461    }
1462}
1463
1464/// Returns true if dictionary is supported for column writer, false otherwise.
1465fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1466    match (kind, props.writer_version()) {
1467        // Booleans do not support dict encoding and should use a fallback encoding.
1468        (Type::BOOLEAN, _) => false,
1469        // Dictionary encoding was not enabled in PARQUET 1.0
1470        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1471        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1472        _ => true,
1473    }
1474}
1475
/// Compares `a > b` after reinterpreting both values as unsigned 64-bit integers.
///
/// Panics if `as_u64` fails for either value; callers only invoke this for
/// INT32/INT64 physical types (see `compare_greater`).
#[inline]
fn compare_greater_unsigned_int<T: ParquetValueType>(a: &T, b: &T) -> bool {
    a.as_u64().unwrap() > b.as_u64().unwrap()
}
1480
/// Compares `a > b` by interpreting both byte slices as little-endian `f16` values.
///
/// Panics if either slice is not exactly 2 bytes long.
#[inline]
fn compare_greater_f16(a: &[u8], b: &[u8]) -> bool {
    let a = f16::from_le_bytes(a.try_into().unwrap());
    let b = f16::from_le_bytes(b.try_into().unwrap());
    a > b
}
1487
/// Signed comparison of bytes arrays
///
/// Interprets both slices as big-endian two's-complement integers and
/// returns `a > b`, accounting for sign extension when lengths differ.
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    // Empty arrays sort below everything else; `a` wins only if non-empty.
    if a.is_empty() || b.is_empty() {
        return !a.is_empty();
    }

    let first_a = a[0];
    let first_b = b[0];
    let a_negative = (first_a as i8) < 0;
    let b_negative = (first_b as i8) < 0;

    // Different signs, or equal-length arrays with differing first bytes, are
    // decided by a signed comparison of the first byte alone. The equal-length
    // requirement matters for sign extension: 0xFF10 should be equal to 0x10.
    if a_negative != b_negative || (a.len() == b.len() && first_a != first_b) {
        return (first_a as i8) > (first_b as i8);
    }

    // Same sign, possibly unequal lengths: the longer array's extra leading
    // bytes may be pure sign extension. If any of them is NOT the extension
    // byte, the longer value has the larger magnitude.
    let extension: u8 = if a_negative { 0xFF } else { 0 };
    if a.len() != b.len() {
        let (longer, shorter_len) = if a.len() > b.len() {
            (a, b.len())
        } else {
            (b, a.len())
        };
        let lead_is_significant = longer[..longer.len() - shorter_len]
            .iter()
            .any(|&x| x != extension);

        if lead_is_significant {
            let a_longer = a.len() > b.len();
            // Larger magnitude means greater for positives, lesser for negatives.
            return if a_negative { !a_longer } else { a_longer };
        }
    }

    // Lexicographical unsigned comparison of everything past the first byte.
    (a[1..]) > (b[1..])
}
1533
/// Truncate a UTF-8 slice to the longest prefix that is still a valid UTF-8 string,
/// while being less than `length` bytes and non-empty. Returns `None` if truncation
/// is not possible within those constraints.
///
/// The caller guarantees that data.len() > length.
fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    // Walk backwards from `length` to find the largest byte offset that falls
    // on a character boundary; offsets start at 1 so the result is non-empty.
    let cut = (1..=length).rev().find(|&i| data.is_char_boundary(i))?;
    Some(data[..cut].as_bytes().to_vec())
}
1543
1544/// Truncate a UTF-8 slice and increment it's final character. The returned value is the
1545/// longest such slice that is still a valid UTF-8 string while being less than `length`
1546/// bytes and non-empty. Returns `None` if no such transformation is possible.
1547///
1548/// The caller guarantees that data.len() > length.
1549fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1550    // UTF-8 is max 4 bytes, so start search 3 back from desired length
1551    let lower_bound = length.saturating_sub(3);
1552    let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
1553    increment_utf8(data.get(..split)?)
1554}
1555
/// Increment the final character in a UTF-8 string in such a way that the returned result
/// is still a valid UTF-8 string. The returned string may be shorter than the input if the
/// last character(s) cannot be incremented (due to overflow or producing invalid code points).
/// Returns `None` if the string cannot be incremented.
///
/// Note that this implementation will not promote an N-byte code point to (N+1) bytes.
fn increment_utf8(data: &str) -> Option<Vec<u8>> {
    // Scan characters from the end, looking for the first one that can be
    // bumped to its successor without changing its encoded byte width.
    for (offset, ch) in data.char_indices().rev() {
        let width = ch.len_utf8();
        match char::from_u32(ch as u32 + 1) {
            // Accept only successors that keep the same encoded width; a wider
            // (or invalid) successor means we must try the previous character.
            Some(next) if next.len_utf8() == width => {
                // Keep everything up to and including this character, then
                // overwrite this character's bytes with the incremented one.
                let mut out = data.as_bytes()[..offset + width].to_vec();
                next.encode_utf8(&mut out[offset..]);
                return Some(out);
            }
            _ => {}
        }
    }

    // No character could be incremented in place.
    None
}
1577
/// Try and increment the bytes from right to left.
///
/// Returns `None` if all bytes are set to `u8::MAX`.
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    // Walk from the least-significant (rightmost) byte, propagating the carry
    // until an addition does not wrap.
    for i in (0..data.len()).rev() {
        let (value, wrapped) = data[i].overflowing_add(1);
        data[i] = value;

        if !wrapped {
            return Some(data);
        }
    }

    // Every byte overflowed (or the input was empty): no larger value exists
    // at this width.
    None
}
1593
1594#[cfg(test)]
1595mod tests {
1596    use crate::{
1597        file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1598        schema::parser::parse_message_type,
1599    };
1600    use core::str;
1601    use rand::distr::uniform::SampleUniform;
1602    use std::{fs::File, sync::Arc};
1603
1604    use crate::column::{
1605        page::PageReader,
1606        reader::{ColumnReaderImpl, get_column_reader, get_typed_column_reader},
1607    };
1608    use crate::file::writer::TrackedWrite;
1609    use crate::file::{
1610        properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1611    };
1612    use crate::schema::types::{ColumnPath, Type as SchemaType};
1613    use crate::util::test_common::rand_gen::random_numbers_range;
1614
1615    use super::*;
1616
    // Definition and repetition level slices must have equal lengths;
    // mismatched slices (3 defs vs 2 reps) should yield a descriptive error.
    #[test]
    fn test_column_writer_inconsistent_def_rep_length() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
        let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
        assert!(res.is_err());
        if let Err(err) = res {
            assert_eq!(
                format!("{err}"),
                "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
            );
        }
    }
1631
    // A column with max definition level 1 must reject a batch that supplies
    // no definition levels.
    #[test]
    fn test_column_writer_invalid_def_levels() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
        assert!(res.is_err());
        if let Err(err) = res {
            assert_eq!(
                format!("{err}"),
                "Parquet error: Definition levels are required, because max definition level = 1"
            );
        }
    }
1646
    // A column with max repetition level 1 must reject a batch that supplies
    // no repetition levels.
    #[test]
    fn test_column_writer_invalid_rep_levels() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
        assert!(res.is_err());
        if let Err(err) = res {
            assert_eq!(
                format!("{err}"),
                "Parquet error: Repetition levels are required, because max repetition level = 1"
            );
        }
    }
1661
    // Four non-null definition levels demand four values; providing only two
    // values must fail with an explicit count mismatch error.
    #[test]
    fn test_column_writer_not_enough_values_to_write() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
        assert!(res.is_err());
        if let Err(err) = res {
            assert_eq!(
                format!("{err}"),
                "Parquet error: Expected to write 4 values, but have only 2"
            );
        }
    }
1676
    // The dictionary encoder is consumed when the dictionary page is written,
    // so a second attempt to write a dictionary page must error.
    #[test]
    fn test_column_writer_write_only_one_dictionary_page() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        // First page should be correctly written.
        writer.add_data_page().unwrap();
        writer.write_dictionary_page().unwrap();
        let err = writer.write_dictionary_page().unwrap_err().to_string();
        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
    }
1689
    // With dictionary encoding disabled in the writer properties, requesting a
    // dictionary page must fail because no dictionary encoder exists.
    #[test]
    fn test_column_writer_error_when_writing_disabled_dictionary() {
        let page_writer = get_test_page_writer();
        let props = Arc::new(
            WriterProperties::builder()
                .set_dictionary_enabled(false)
                .build(),
        );
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        let err = writer.write_dictionary_page().unwrap_err().to_string();
        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
    }
1703
    // Even with dictionary encoding enabled, BOOLEAN columns must fall back to
    // PLAIN encoding and produce no dictionary page.
    #[test]
    fn test_column_writer_boolean_type_does_not_support_dictionary() {
        let page_writer = get_test_page_writer();
        let props = Arc::new(
            WriterProperties::builder()
                .set_dictionary_enabled(true)
                .build(),
        );
        let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
        writer
            .write_batch(&[true, false, true, false], None, None)
            .unwrap();

        let r = writer.close().unwrap();
        // PlainEncoder uses bit writer to write boolean values, which all fit into 1
        // byte.
        assert_eq!(r.bytes_written, 1);
        assert_eq!(r.rows_written, 4);

        let metadata = r.metadata;
        assert_eq!(
            metadata.encodings().collect::<Vec<_>>(),
            vec![Encoding::PLAIN, Encoding::RLE]
        );
        assert_eq!(metadata.num_values(), 4); // just values
        assert_eq!(metadata.dictionary_page_offset(), None);
    }
1731
    // Default encoding selection for BOOLEAN across writer versions and
    // dictionary on/off: v1 always PLAIN, v2 always RLE (booleans never use
    // dictionary encoding).
    #[test]
    fn test_column_writer_default_encoding_support_bool() {
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[true, false],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[true, false],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[true, false],
            None,
            &[Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
        );
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[true, false],
            None,
            &[Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
        );
    }
1767
    // Default encoding selection for INT32: dictionary-enabled columns emit a
    // PLAIN dictionary page plus RLE_DICTIONARY data pages; without dictionary,
    // v1 falls back to PLAIN and v2 to DELTA_BINARY_PACKED.
    #[test]
    fn test_column_writer_default_encoding_support_int32() {
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1, 2],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1, 2],
            None,
            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BINARY_PACKED,
                1,
            )],
        );
    }
1813
    // Default encoding selection for INT64: mirrors the INT32 expectations
    // (dictionary pages when enabled; PLAIN for v1 / DELTA_BINARY_PACKED for
    // v2 when disabled).
    #[test]
    fn test_column_writer_default_encoding_support_int64() {
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1, 2],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1, 2],
            None,
            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BINARY_PACKED,
                1,
            )],
        );
    }
1859
    // Default encoding selection for INT96: dictionary when enabled; PLAIN in
    // both writer versions when disabled (INT96 has no delta fallback).
    #[test]
    fn test_column_writer_default_encoding_support_int96() {
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[Int96::from(vec![1, 2, 3])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[Int96::from(vec![1, 2, 3])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[Int96::from(vec![1, 2, 3])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[Int96::from(vec![1, 2, 3])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
1901
    // Default encoding selection for FLOAT: dictionary when enabled; PLAIN in
    // both writer versions when disabled.
    #[test]
    fn test_column_writer_default_encoding_support_float() {
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
1943
    // Default encoding selection for DOUBLE: mirrors FLOAT (dictionary when
    // enabled; PLAIN in both writer versions when disabled).
    #[test]
    fn test_column_writer_default_encoding_support_double() {
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
1985
    // Default encoding selection for BYTE_ARRAY: dictionary when enabled;
    // without dictionary, v1 uses PLAIN while v2 uses DELTA_BYTE_ARRAY.
    #[test]
    fn test_column_writer_default_encoding_support_byte_array() {
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[ByteArray::from(vec![1u8])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[ByteArray::from(vec![1u8])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[ByteArray::from(vec![1u8])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[ByteArray::from(vec![1u8])],
            None,
            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BYTE_ARRAY,
                1,
            )],
        );
    }
2031
    // Default encoding selection for FIXED_LEN_BYTE_ARRAY: dictionary support
    // only exists for writer v2 (v1 ignores the dictionary flag and writes
    // PLAIN); without dictionary, v2 uses DELTA_BYTE_ARRAY.
    #[test]
    fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[ByteArray::from(vec![1u8]).into()],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BYTE_ARRAY,
                1,
            )],
        );
    }
2074
    // End-to-end check of the chunk metadata a closed writer produces:
    // encodings, value counts, sizes, page offsets, and Int32 min/max stats.
    #[test]
    fn test_column_writer_check_metadata() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();

        let r = writer.close().unwrap();
        assert_eq!(r.bytes_written, 20);
        assert_eq!(r.rows_written, 4);

        let metadata = r.metadata;
        assert_eq!(
            metadata.encodings().collect::<Vec<_>>(),
            vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
        );
        assert_eq!(metadata.num_values(), 4);
        assert_eq!(metadata.compressed_size(), 20);
        assert_eq!(metadata.uncompressed_size(), 20);
        assert_eq!(metadata.data_page_offset(), 0);
        assert_eq!(metadata.dictionary_page_offset(), Some(0));
        if let Some(stats) = metadata.statistics() {
            assert_eq!(stats.null_count_opt(), Some(0));
            assert_eq!(stats.distinct_count_opt(), None);
            if let Statistics::Int32(stats) = stats {
                assert_eq!(stats.min_opt().unwrap(), &1);
                assert_eq!(stats.max_opt().unwrap(), &4);
            } else {
                panic!("expecting Statistics::Int32");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
2109
    // Decimal byte arrays must be compared as signed big-endian values when
    // computing min/max statistics: the all-0xFF-prefixed (negative) value is
    // the minimum and the positive value is the maximum, despite how the raw
    // bytes would order lexicographically.
    #[test]
    fn test_column_writer_check_byte_array_min_max() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
        writer
            .write_batch(
                &[
                    ByteArray::from(vec![
                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
                        35u8, 231u8, 90u8, 0u8, 0u8,
                    ]),
                    ByteArray::from(vec![
                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
                        152u8, 177u8, 56u8, 0u8, 0u8,
                    ]),
                    ByteArray::from(vec![
                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
                        0u8,
                    ]),
                    ByteArray::from(vec![
                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
                        44u8, 0u8, 0u8,
                    ]),
                ],
                None,
                None,
            )
            .unwrap();
        let metadata = writer.close().unwrap().metadata;
        if let Some(stats) = metadata.statistics() {
            if let Statistics::ByteArray(stats) = stats {
                assert_eq!(
                    stats.min_opt().unwrap(),
                    &ByteArray::from(vec![
                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
                        35u8, 231u8, 90u8, 0u8, 0u8,
                    ])
                );
                assert_eq!(
                    stats.max_opt().unwrap(),
                    &ByteArray::from(vec![
                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
                        44u8, 0u8, 0u8,
                    ])
                );
            } else {
                panic!("expecting Statistics::ByteArray");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
2163
    // INT32 columns with an unsigned converted type must compute min/max with
    // unsigned comparison semantics.
    #[test]
    fn test_column_writer_uint32_converted_type_min_max() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
            page_writer,
            0,
            0,
            props,
        );
        writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
        let metadata = writer.close().unwrap().metadata;
        if let Some(stats) = metadata.statistics() {
            if let Statistics::Int32(stats) = stats {
                assert_eq!(stats.min_opt().unwrap(), &0,);
                assert_eq!(stats.max_opt().unwrap(), &5,);
            } else {
                panic!("expecting Statistics::Int32");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
2187
    // With chunk-level statistics enabled, statistics supplied via
    // write_batch_with_statistics (min/max/distinct count) must be carried
    // through to the chunk metadata instead of being recomputed from values.
    #[test]
    fn test_column_writer_precalculated_statistics() {
        let page_writer = get_test_page_writer();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Chunk)
                .build(),
        );
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer
            .write_batch_with_statistics(
                &[1, 2, 3, 4],
                None,
                None,
                Some(&-17),
                Some(&9000),
                Some(55),
            )
            .unwrap();

        let r = writer.close().unwrap();
        assert_eq!(r.bytes_written, 20);
        assert_eq!(r.rows_written, 4);

        let metadata = r.metadata;
        assert_eq!(
            metadata.encodings().collect::<Vec<_>>(),
            vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
        );
        assert_eq!(metadata.num_values(), 4);
        assert_eq!(metadata.compressed_size(), 20);
        assert_eq!(metadata.uncompressed_size(), 20);
        assert_eq!(metadata.data_page_offset(), 0);
        assert_eq!(metadata.dictionary_page_offset(), Some(0));
        if let Some(stats) = metadata.statistics() {
            assert_eq!(stats.null_count_opt(), Some(0));
            assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
            if let Statistics::Int32(stats) = stats {
                assert_eq!(stats.min_opt().unwrap(), &-17);
                assert_eq!(stats.max_opt().unwrap(), &9000);
            } else {
                panic!("expecting Statistics::Int32");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
2235
    // Mixing plain write_batch with write_batch_with_statistics: chunk stats
    // must merge computed (1..=4) and precomputed (5..=7) min/max, drop the
    // distinct count (unknown for the computed part), and the same merged
    // statistics must round-trip through the serialized page headers.
    #[test]
    fn test_mixed_precomputed_statistics() {
        let mut buf = Vec::with_capacity(100);
        let mut write = TrackedWrite::new(&mut buf);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let props = Arc::new(
            WriterProperties::builder()
                .set_write_page_header_statistics(true)
                .build(),
        );
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);

        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        writer
            .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
            .unwrap();

        let r = writer.close().unwrap();

        let stats = r.metadata.statistics().unwrap();
        assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
        assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
        assert_eq!(stats.null_count_opt(), Some(0));
        assert!(stats.distinct_count_opt().is_none());

        // Release the borrow on `buf` so it can be read back below.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .set_read_page_statistics(true)
            .build();
        let reader = SerializedPageReader::new_with_properties(
            Arc::new(Bytes::from(buf)),
            &r.metadata,
            r.rows_written as usize,
            None,
            Arc::new(props),
        )
        .unwrap();

        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
        assert_eq!(pages.len(), 2);

        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);

        let page_statistics = pages[1].statistics().unwrap();
        assert_eq!(
            page_statistics.min_bytes_opt().unwrap(),
            1_i32.to_le_bytes()
        );
        assert_eq!(
            page_statistics.max_bytes_opt().unwrap(),
            7_i32.to_le_bytes()
        );
        assert_eq!(page_statistics.null_count_opt(), Some(0));
        assert!(page_statistics.distinct_count_opt().is_none());
    }
2294
    #[test]
    fn test_disabled_statistics() {
        // With EnabledStatistics::None neither chunk- nor page-level
        // statistics should be written; V2 data pages must still carry
        // correct value/null/row counts.
        let mut buf = Vec::with_capacity(100);
        let mut write = TrackedWrite::new(&mut buf);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_writer_version(WriterVersion::PARQUET_2_0)
            .build();
        let props = Arc::new(props);

        // max_def_level = 1: definition levels [1, 0, 0, 1, 1, 1] mark the
        // 2nd and 3rd slots as null, so only 4 values are supplied.
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        writer
            .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
            .unwrap();

        let r = writer.close().unwrap();
        // Chunk-level statistics suppressed.
        assert!(r.metadata.statistics().is_none());

        // Release the mutable borrow of `buf` so it can be read back below.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let reader = SerializedPageReader::new_with_properties(
            Arc::new(Bytes::from(buf)),
            &r.metadata,
            r.rows_written as usize,
            None,
            Arc::new(props),
        )
        .unwrap();

        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
        assert_eq!(pages.len(), 2);

        // PARQUET_2_0 produces V2 data pages.
        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);

        match &pages[1] {
            Page::DataPageV2 {
                num_values,
                num_nulls,
                num_rows,
                statistics,
                ..
            } => {
                // 6 level slots, 2 of them null; flat column so rows == values.
                assert_eq!(*num_values, 6);
                assert_eq!(*num_nulls, 2);
                assert_eq!(*num_rows, 6);
                // Page-level statistics suppressed as well.
                assert!(statistics.is_none());
            }
            _ => unreachable!(),
        }
    }
2350
2351    #[test]
2352    fn test_column_writer_empty_column_roundtrip() {
2353        let props = Default::default();
2354        column_roundtrip::<Int32Type>(props, &[], None, None);
2355    }
2356
2357    #[test]
2358    fn test_column_writer_non_nullable_values_roundtrip() {
2359        let props = Default::default();
2360        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2361    }
2362
2363    #[test]
2364    fn test_column_writer_nullable_non_repeated_values_roundtrip() {
2365        let props = Default::default();
2366        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
2367    }
2368
2369    #[test]
2370    fn test_column_writer_nullable_repeated_values_roundtrip() {
2371        let props = Default::default();
2372        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2373    }
2374
2375    #[test]
2376    fn test_column_writer_dictionary_fallback_small_data_page() {
2377        let props = WriterProperties::builder()
2378            .set_dictionary_page_size_limit(32)
2379            .set_data_page_size_limit(32)
2380            .build();
2381        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2382    }
2383
2384    #[test]
2385    fn test_column_writer_small_write_batch_size() {
2386        for i in &[1usize, 2, 5, 10, 11, 1023] {
2387            let props = WriterProperties::builder().set_write_batch_size(*i).build();
2388
2389            column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2390        }
2391    }
2392
2393    #[test]
2394    fn test_column_writer_dictionary_disabled_v1() {
2395        let props = WriterProperties::builder()
2396            .set_writer_version(WriterVersion::PARQUET_1_0)
2397            .set_dictionary_enabled(false)
2398            .build();
2399        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2400    }
2401
2402    #[test]
2403    fn test_column_writer_dictionary_disabled_v2() {
2404        let props = WriterProperties::builder()
2405            .set_writer_version(WriterVersion::PARQUET_2_0)
2406            .set_dictionary_enabled(false)
2407            .build();
2408        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2409    }
2410
2411    #[test]
2412    fn test_column_writer_compression_v1() {
2413        let props = WriterProperties::builder()
2414            .set_writer_version(WriterVersion::PARQUET_1_0)
2415            .set_compression(Compression::SNAPPY)
2416            .build();
2417        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2418    }
2419
2420    #[test]
2421    fn test_column_writer_compression_v2() {
2422        let props = WriterProperties::builder()
2423            .set_writer_version(WriterVersion::PARQUET_2_0)
2424            .set_compression(Compression::SNAPPY)
2425            .build();
2426        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2427    }
2428
    #[test]
    fn test_column_writer_add_data_pages_with_dict() {
        // ARROW-5129: Test verifies that we add data page in case of dictionary encoding
        // and no fallback occurred so far.
        let mut file = tempfile::tempfile().unwrap();
        let mut write = TrackedWrite::new(&mut file);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let props = Arc::new(
            WriterProperties::builder()
                .set_data_page_size_limit(10)
                .set_write_batch_size(3) // write 3 values at a time
                .build(),
        );
        let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer.write_batch(data, None, None).unwrap();
        let r = writer.close().unwrap();

        // Release the mutable borrow of `file` so it can be read back below.
        drop(write);

        // Read pages and check the sequence
        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let mut page_reader = Box::new(
            SerializedPageReader::new_with_properties(
                Arc::new(file),
                &r.metadata,
                r.rows_written as usize,
                None,
                Arc::new(props),
            )
            .unwrap(),
        );
        // Collect (page type, value count, page buffer size) for every page.
        let mut res = Vec::new();
        while let Some(page) = page_reader.get_next_page().unwrap() {
            res.push((page.page_type(), page.num_values(), page.buffer().len()));
        }
        // Expected: one dictionary page holding all 10 distinct values
        // (10 * 4 bytes), then two dictionary-encoded data pages — the tiny
        // page size limit flushes after 9 values, leaving 1 for the last page.
        assert_eq!(
            res,
            vec![
                (PageType::DICTIONARY_PAGE, 10, 40),
                (PageType::DATA_PAGE, 9, 10),
                (PageType::DATA_PAGE, 1, 3),
            ]
        );
        // Both data pages stay RLE_DICTIONARY-encoded, i.e. no fallback
        // to plain encoding occurred.
        assert_eq!(
            r.metadata.page_encoding_stats(),
            Some(&vec![
                PageEncodingStats {
                    page_type: PageType::DICTIONARY_PAGE,
                    encoding: Encoding::PLAIN,
                    count: 1
                },
                PageEncodingStats {
                    page_type: PageType::DATA_PAGE,
                    encoding: Encoding::RLE_DICTIONARY,
                    count: 2,
                }
            ])
        );
    }
2491
2492    #[test]
2493    fn test_column_writer_column_data_page_size_limit() {
2494        let props = Arc::new(
2495            WriterProperties::builder()
2496                .set_writer_version(WriterVersion::PARQUET_1_0)
2497                .set_dictionary_enabled(false)
2498                .set_data_page_size_limit(1000)
2499                .set_column_data_page_size_limit(ColumnPath::from("col"), 10)
2500                .set_write_batch_size(3)
2501                .build(),
2502        );
2503        let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
2504
2505        let col_values =
2506            write_and_collect_page_values(ColumnPath::from("col"), Arc::clone(&props), data);
2507        let other_values = write_and_collect_page_values(ColumnPath::from("other"), props, data);
2508
2509        assert_eq!(col_values, vec![3, 3, 3, 1]);
2510        assert_eq!(other_values, vec![10]);
2511    }
2512
2513    #[test]
2514    fn test_bool_statistics() {
2515        let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
2516        // Booleans have an unsigned sort order and so are not compatible
2517        // with the deprecated `min` and `max` statistics
2518        assert!(!stats.is_min_max_backwards_compatible());
2519        if let Statistics::Boolean(stats) = stats {
2520            assert_eq!(stats.min_opt().unwrap(), &false);
2521            assert_eq!(stats.max_opt().unwrap(), &true);
2522        } else {
2523            panic!("expecting Statistics::Boolean, got {stats:?}");
2524        }
2525    }
2526
2527    #[test]
2528    fn test_int32_statistics() {
2529        let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
2530        assert!(stats.is_min_max_backwards_compatible());
2531        if let Statistics::Int32(stats) = stats {
2532            assert_eq!(stats.min_opt().unwrap(), &-2);
2533            assert_eq!(stats.max_opt().unwrap(), &3);
2534        } else {
2535            panic!("expecting Statistics::Int32, got {stats:?}");
2536        }
2537    }
2538
2539    #[test]
2540    fn test_int64_statistics() {
2541        let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
2542        assert!(stats.is_min_max_backwards_compatible());
2543        if let Statistics::Int64(stats) = stats {
2544            assert_eq!(stats.min_opt().unwrap(), &-2);
2545            assert_eq!(stats.max_opt().unwrap(), &3);
2546        } else {
2547            panic!("expecting Statistics::Int64, got {stats:?}");
2548        }
2549    }
2550
2551    #[test]
2552    fn test_int96_statistics() {
2553        let input = vec![
2554            Int96::from(vec![1, 20, 30]),
2555            Int96::from(vec![3, 20, 10]),
2556            Int96::from(vec![0, 20, 30]),
2557            Int96::from(vec![2, 20, 30]),
2558        ]
2559        .into_iter()
2560        .collect::<Vec<Int96>>();
2561
2562        let stats = statistics_roundtrip::<Int96Type>(&input);
2563        assert!(!stats.is_min_max_backwards_compatible());
2564        if let Statistics::Int96(stats) = stats {
2565            assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
2566            assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![2, 20, 30]));
2567        } else {
2568            panic!("expecting Statistics::Int96, got {stats:?}");
2569        }
2570    }
2571
2572    #[test]
2573    fn test_float_statistics() {
2574        let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
2575        assert!(stats.is_min_max_backwards_compatible());
2576        if let Statistics::Float(stats) = stats {
2577            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2578            assert_eq!(stats.max_opt().unwrap(), &3.0);
2579        } else {
2580            panic!("expecting Statistics::Float, got {stats:?}");
2581        }
2582    }
2583
2584    #[test]
2585    fn test_double_statistics() {
2586        let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
2587        assert!(stats.is_min_max_backwards_compatible());
2588        if let Statistics::Double(stats) = stats {
2589            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2590            assert_eq!(stats.max_opt().unwrap(), &3.0);
2591        } else {
2592            panic!("expecting Statistics::Double, got {stats:?}");
2593        }
2594    }
2595
2596    #[test]
2597    fn test_byte_array_statistics() {
2598        let input = ["aawaa", "zz", "aaw", "m", "qrs"]
2599            .iter()
2600            .map(|&s| s.into())
2601            .collect::<Vec<_>>();
2602
2603        let stats = statistics_roundtrip::<ByteArrayType>(&input);
2604        assert!(!stats.is_min_max_backwards_compatible());
2605        if let Statistics::ByteArray(stats) = stats {
2606            assert_eq!(stats.min_opt().unwrap(), &ByteArray::from("aaw"));
2607            assert_eq!(stats.max_opt().unwrap(), &ByteArray::from("zz"));
2608        } else {
2609            panic!("expecting Statistics::ByteArray, got {stats:?}");
2610        }
2611    }
2612
2613    #[test]
2614    fn test_fixed_len_byte_array_statistics() {
2615        let input = ["aawaa", "zz   ", "aaw  ", "m    ", "qrs  "]
2616            .iter()
2617            .map(|&s| ByteArray::from(s).into())
2618            .collect::<Vec<_>>();
2619
2620        let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
2621        assert!(!stats.is_min_max_backwards_compatible());
2622        if let Statistics::FixedLenByteArray(stats) = stats {
2623            let expected_min: FixedLenByteArray = ByteArray::from("aaw  ").into();
2624            assert_eq!(stats.min_opt().unwrap(), &expected_min);
2625            let expected_max: FixedLenByteArray = ByteArray::from("zz   ").into();
2626            assert_eq!(stats.max_opt().unwrap(), &expected_max);
2627        } else {
2628            panic!("expecting Statistics::FixedLenByteArray, got {stats:?}");
2629        }
2630    }
2631
2632    #[test]
2633    fn test_column_writer_check_float16_min_max() {
2634        let input = [
2635            -f16::ONE,
2636            f16::from_f32(3.0),
2637            -f16::from_f32(2.0),
2638            f16::from_f32(2.0),
2639        ]
2640        .into_iter()
2641        .map(|s| ByteArray::from(s).into())
2642        .collect::<Vec<_>>();
2643
2644        let stats = float16_statistics_roundtrip(&input);
2645        assert!(stats.is_min_max_backwards_compatible());
2646        assert_eq!(
2647            stats.min_opt().unwrap(),
2648            &ByteArray::from(-f16::from_f32(2.0))
2649        );
2650        assert_eq!(
2651            stats.max_opt().unwrap(),
2652            &ByteArray::from(f16::from_f32(3.0))
2653        );
2654    }
2655
2656    #[test]
2657    fn test_column_writer_check_float16_nan_middle() {
2658        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2659            .into_iter()
2660            .map(|s| ByteArray::from(s).into())
2661            .collect::<Vec<_>>();
2662
2663        let stats = float16_statistics_roundtrip(&input);
2664        assert!(stats.is_min_max_backwards_compatible());
2665        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2666        assert_eq!(
2667            stats.max_opt().unwrap(),
2668            &ByteArray::from(f16::ONE + f16::ONE)
2669        );
2670    }
2671
2672    #[test]
2673    fn test_float16_statistics_nan_middle() {
2674        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2675            .into_iter()
2676            .map(|s| ByteArray::from(s).into())
2677            .collect::<Vec<_>>();
2678
2679        let stats = float16_statistics_roundtrip(&input);
2680        assert!(stats.is_min_max_backwards_compatible());
2681        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2682        assert_eq!(
2683            stats.max_opt().unwrap(),
2684            &ByteArray::from(f16::ONE + f16::ONE)
2685        );
2686    }
2687
2688    #[test]
2689    fn test_float16_statistics_nan_start() {
2690        let input = [f16::NAN, f16::ONE, f16::ONE + f16::ONE]
2691            .into_iter()
2692            .map(|s| ByteArray::from(s).into())
2693            .collect::<Vec<_>>();
2694
2695        let stats = float16_statistics_roundtrip(&input);
2696        assert!(stats.is_min_max_backwards_compatible());
2697        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2698        assert_eq!(
2699            stats.max_opt().unwrap(),
2700            &ByteArray::from(f16::ONE + f16::ONE)
2701        );
2702    }
2703
2704    #[test]
2705    fn test_float16_statistics_nan_only() {
2706        let input = [f16::NAN, f16::NAN]
2707            .into_iter()
2708            .map(|s| ByteArray::from(s).into())
2709            .collect::<Vec<_>>();
2710
2711        let stats = float16_statistics_roundtrip(&input);
2712        assert!(stats.min_bytes_opt().is_none());
2713        assert!(stats.max_bytes_opt().is_none());
2714        assert!(stats.is_min_max_backwards_compatible());
2715    }
2716
2717    #[test]
2718    fn test_float16_statistics_zero_only() {
2719        let input = [f16::ZERO]
2720            .into_iter()
2721            .map(|s| ByteArray::from(s).into())
2722            .collect::<Vec<_>>();
2723
2724        let stats = float16_statistics_roundtrip(&input);
2725        assert!(stats.is_min_max_backwards_compatible());
2726        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2727        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2728    }
2729
2730    #[test]
2731    fn test_float16_statistics_neg_zero_only() {
2732        let input = [f16::NEG_ZERO]
2733            .into_iter()
2734            .map(|s| ByteArray::from(s).into())
2735            .collect::<Vec<_>>();
2736
2737        let stats = float16_statistics_roundtrip(&input);
2738        assert!(stats.is_min_max_backwards_compatible());
2739        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2740        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2741    }
2742
2743    #[test]
2744    fn test_float16_statistics_zero_min() {
2745        let input = [f16::ZERO, f16::ONE, f16::NAN, f16::PI]
2746            .into_iter()
2747            .map(|s| ByteArray::from(s).into())
2748            .collect::<Vec<_>>();
2749
2750        let stats = float16_statistics_roundtrip(&input);
2751        assert!(stats.is_min_max_backwards_compatible());
2752        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2753        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI));
2754    }
2755
2756    #[test]
2757    fn test_float16_statistics_neg_zero_max() {
2758        let input = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI]
2759            .into_iter()
2760            .map(|s| ByteArray::from(s).into())
2761            .collect::<Vec<_>>();
2762
2763        let stats = float16_statistics_roundtrip(&input);
2764        assert!(stats.is_min_max_backwards_compatible());
2765        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI));
2766        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2767    }
2768
2769    #[test]
2770    fn test_float_statistics_nan_middle() {
2771        let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
2772        assert!(stats.is_min_max_backwards_compatible());
2773        if let Statistics::Float(stats) = stats {
2774            assert_eq!(stats.min_opt().unwrap(), &1.0);
2775            assert_eq!(stats.max_opt().unwrap(), &2.0);
2776        } else {
2777            panic!("expecting Statistics::Float");
2778        }
2779    }
2780
2781    #[test]
2782    fn test_float_statistics_nan_start() {
2783        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
2784        assert!(stats.is_min_max_backwards_compatible());
2785        if let Statistics::Float(stats) = stats {
2786            assert_eq!(stats.min_opt().unwrap(), &1.0);
2787            assert_eq!(stats.max_opt().unwrap(), &2.0);
2788        } else {
2789            panic!("expecting Statistics::Float");
2790        }
2791    }
2792
2793    #[test]
2794    fn test_float_statistics_nan_only() {
2795        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]);
2796        assert!(stats.min_bytes_opt().is_none());
2797        assert!(stats.max_bytes_opt().is_none());
2798        assert!(stats.is_min_max_backwards_compatible());
2799        assert!(matches!(stats, Statistics::Float(_)));
2800    }
2801
2802    #[test]
2803    fn test_float_statistics_zero_only() {
2804        let stats = statistics_roundtrip::<FloatType>(&[0.0]);
2805        assert!(stats.is_min_max_backwards_compatible());
2806        if let Statistics::Float(stats) = stats {
2807            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2808            assert!(stats.min_opt().unwrap().is_sign_negative());
2809            assert_eq!(stats.max_opt().unwrap(), &0.0);
2810            assert!(stats.max_opt().unwrap().is_sign_positive());
2811        } else {
2812            panic!("expecting Statistics::Float");
2813        }
2814    }
2815
2816    #[test]
2817    fn test_float_statistics_neg_zero_only() {
2818        let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
2819        assert!(stats.is_min_max_backwards_compatible());
2820        if let Statistics::Float(stats) = stats {
2821            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2822            assert!(stats.min_opt().unwrap().is_sign_negative());
2823            assert_eq!(stats.max_opt().unwrap(), &0.0);
2824            assert!(stats.max_opt().unwrap().is_sign_positive());
2825        } else {
2826            panic!("expecting Statistics::Float");
2827        }
2828    }
2829
2830    #[test]
2831    fn test_float_statistics_zero_min() {
2832        let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
2833        assert!(stats.is_min_max_backwards_compatible());
2834        if let Statistics::Float(stats) = stats {
2835            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2836            assert!(stats.min_opt().unwrap().is_sign_negative());
2837            assert_eq!(stats.max_opt().unwrap(), &2.0);
2838        } else {
2839            panic!("expecting Statistics::Float");
2840        }
2841    }
2842
2843    #[test]
2844    fn test_float_statistics_neg_zero_max() {
2845        let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
2846        assert!(stats.is_min_max_backwards_compatible());
2847        if let Statistics::Float(stats) = stats {
2848            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2849            assert_eq!(stats.max_opt().unwrap(), &0.0);
2850            assert!(stats.max_opt().unwrap().is_sign_positive());
2851        } else {
2852            panic!("expecting Statistics::Float");
2853        }
2854    }
2855
2856    #[test]
2857    fn test_double_statistics_nan_middle() {
2858        let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
2859        assert!(stats.is_min_max_backwards_compatible());
2860        if let Statistics::Double(stats) = stats {
2861            assert_eq!(stats.min_opt().unwrap(), &1.0);
2862            assert_eq!(stats.max_opt().unwrap(), &2.0);
2863        } else {
2864            panic!("expecting Statistics::Double");
2865        }
2866    }
2867
2868    #[test]
2869    fn test_double_statistics_nan_start() {
2870        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
2871        assert!(stats.is_min_max_backwards_compatible());
2872        if let Statistics::Double(stats) = stats {
2873            assert_eq!(stats.min_opt().unwrap(), &1.0);
2874            assert_eq!(stats.max_opt().unwrap(), &2.0);
2875        } else {
2876            panic!("expecting Statistics::Double");
2877        }
2878    }
2879
2880    #[test]
2881    fn test_double_statistics_nan_only() {
2882        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]);
2883        assert!(stats.min_bytes_opt().is_none());
2884        assert!(stats.max_bytes_opt().is_none());
2885        assert!(matches!(stats, Statistics::Double(_)));
2886        assert!(stats.is_min_max_backwards_compatible());
2887    }
2888
2889    #[test]
2890    fn test_double_statistics_zero_only() {
2891        let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
2892        assert!(stats.is_min_max_backwards_compatible());
2893        if let Statistics::Double(stats) = stats {
2894            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2895            assert!(stats.min_opt().unwrap().is_sign_negative());
2896            assert_eq!(stats.max_opt().unwrap(), &0.0);
2897            assert!(stats.max_opt().unwrap().is_sign_positive());
2898        } else {
2899            panic!("expecting Statistics::Double");
2900        }
2901    }
2902
2903    #[test]
2904    fn test_double_statistics_neg_zero_only() {
2905        let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
2906        assert!(stats.is_min_max_backwards_compatible());
2907        if let Statistics::Double(stats) = stats {
2908            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2909            assert!(stats.min_opt().unwrap().is_sign_negative());
2910            assert_eq!(stats.max_opt().unwrap(), &0.0);
2911            assert!(stats.max_opt().unwrap().is_sign_positive());
2912        } else {
2913            panic!("expecting Statistics::Double");
2914        }
2915    }
2916
2917    #[test]
2918    fn test_double_statistics_zero_min() {
2919        let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
2920        assert!(stats.is_min_max_backwards_compatible());
2921        if let Statistics::Double(stats) = stats {
2922            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2923            assert!(stats.min_opt().unwrap().is_sign_negative());
2924            assert_eq!(stats.max_opt().unwrap(), &2.0);
2925        } else {
2926            panic!("expecting Statistics::Double");
2927        }
2928    }
2929
2930    #[test]
2931    fn test_double_statistics_neg_zero_max() {
2932        let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
2933        assert!(stats.is_min_max_backwards_compatible());
2934        if let Statistics::Double(stats) = stats {
2935            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2936            assert_eq!(stats.max_opt().unwrap(), &0.0);
2937            assert!(stats.max_opt().unwrap().is_sign_positive());
2938        } else {
2939            panic!("expecting Statistics::Double");
2940        }
2941    }
2942
2943    #[test]
2944    fn test_compare_greater_byte_array_decimals() {
2945        assert!(!compare_greater_byte_array_decimals(&[], &[],),);
2946        assert!(compare_greater_byte_array_decimals(&[1u8,], &[],),);
2947        assert!(!compare_greater_byte_array_decimals(&[], &[1u8,],),);
2948        assert!(compare_greater_byte_array_decimals(&[1u8,], &[0u8,],),);
2949        assert!(!compare_greater_byte_array_decimals(&[1u8,], &[1u8,],),);
2950        assert!(compare_greater_byte_array_decimals(&[1u8, 0u8,], &[0u8,],),);
2951        assert!(!compare_greater_byte_array_decimals(
2952            &[0u8, 1u8,],
2953            &[1u8, 0u8,],
2954        ),);
2955        assert!(!compare_greater_byte_array_decimals(
2956            &[255u8, 35u8, 0u8, 0u8,],
2957            &[0u8,],
2958        ),);
2959        assert!(compare_greater_byte_array_decimals(
2960            &[0u8,],
2961            &[255u8, 35u8, 0u8, 0u8,],
2962        ),);
2963    }
2964
    #[test]
    fn test_column_index_with_null_pages() {
        // write a single page of all nulls
        let page_writer = get_test_page_writer();
        let props = Default::default();
        // max_def_level = 1; definition levels of all zeros mean every slot
        // is null, so the values slice is empty.
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();

        let r = writer.close().unwrap();
        assert!(r.column_index.is_some());
        let col_idx = r.column_index.unwrap();
        let col_idx = match col_idx {
            ColumnIndexMetaData::INT32(col_idx) => col_idx,
            _ => panic!("wrong stats type"),
        };
        // null_pages should be true for page 0
        assert!(col_idx.is_null_page(0));
        // min and max should be absent: there are no non-null values to bound
        assert!(col_idx.min_value(0).is_none());
        assert!(col_idx.max_value(0).is_none());
        // null_counts should be defined and be 4 for page 0
        assert!(col_idx.null_count(0).is_some());
        assert_eq!(col_idx.null_count(0), Some(4));
        // there is no repetition so rep histogram should be absent
        assert!(col_idx.repetition_level_histogram(0).is_none());
        // definition_level_histogram should be present and should be 0:4, 1:0
        assert!(col_idx.definition_level_histogram(0).is_some());
        assert_eq!(col_idx.definition_level_histogram(0).unwrap(), &[4, 0]);
    }
2994
2995    #[test]
2996    fn test_column_offset_index_metadata() {
2997        // write data
2998        // and check the offset index and column index
2999        let page_writer = get_test_page_writer();
3000        let props = Default::default();
3001        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
3002        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
3003        // first page
3004        writer.flush_data_pages().unwrap();
3005        // second page
3006        writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
3007
3008        let r = writer.close().unwrap();
3009        let column_index = r.column_index.unwrap();
3010        let offset_index = r.offset_index.unwrap();
3011
3012        assert_eq!(8, r.rows_written);
3013
3014        // column index
3015        let column_index = match column_index {
3016            ColumnIndexMetaData::INT32(column_index) => column_index,
3017            _ => panic!("wrong stats type"),
3018        };
3019        assert_eq!(2, column_index.num_pages());
3020        assert_eq!(2, offset_index.page_locations.len());
3021        assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
3022        for idx in 0..2 {
3023            assert!(!column_index.is_null_page(idx));
3024            assert_eq!(0, column_index.null_count(0).unwrap());
3025        }
3026
3027        if let Some(stats) = r.metadata.statistics() {
3028            assert_eq!(stats.null_count_opt(), Some(0));
3029            assert_eq!(stats.distinct_count_opt(), None);
3030            if let Statistics::Int32(stats) = stats {
3031                // first page is [1,2,3,4]
3032                // second page is [-5,2,4,8]
3033                // note that we don't increment here, as this is a non BinaryArray type.
3034                assert_eq!(stats.min_opt(), column_index.min_value(1));
3035                assert_eq!(stats.max_opt(), column_index.max_value(1));
3036            } else {
3037                panic!("expecting Statistics::Int32");
3038            }
3039        } else {
3040            panic!("metadata missing statistics");
3041        }
3042
3043        // page location
3044        assert_eq!(0, offset_index.page_locations[0].first_row_index);
3045        assert_eq!(4, offset_index.page_locations[1].first_row_index);
3046    }
3047
    /// Verify min/max value truncation in the column index works as expected
    #[test]
    fn test_column_offset_index_metadata_truncating() {
        // write data
        // and check the offset index and column index
        let page_writer = get_test_page_writer();
        let props = WriterProperties::builder()
            .set_statistics_truncate_length(None) // disable chunk statistics truncation; the column index still truncates at its own default length
            .build()
            .into();
        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

        let mut data = vec![FixedLenByteArray::default(); 3];
        // This is the expected min value - "aaa..." (0x61 = 'a', repeated 200 times)
        data[0].set_data(Bytes::from(vec![97_u8; 200]));
        // This is the expected max value - "ppp..." (0x70 = 'p', repeated 200 times)
        data[1].set_data(Bytes::from(vec![112_u8; 200]));
        data[2].set_data(Bytes::from(vec![98_u8; 200]));

        writer.write_batch(&data, None, None).unwrap();

        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();
        let column_index = r.column_index.unwrap();
        let offset_index = r.offset_index.unwrap();

        let column_index = match column_index {
            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
            _ => panic!("wrong stats type"),
        };

        assert_eq!(3, r.rows_written);

        // column index: a single non-null page holding all three values
        assert_eq!(1, column_index.num_pages());
        assert_eq!(1, offset_index.page_locations.len());
        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
        assert!(!column_index.is_null_page(0));
        assert_eq!(Some(0), column_index.null_count(0));

        if let Some(stats) = r.metadata.statistics() {
            assert_eq!(stats.null_count_opt(), Some(0));
            assert_eq!(stats.distinct_count_opt(), None);
            if let Statistics::FixedLenByteArray(stats) = stats {
                let column_index_min_value = column_index.min_value(0).unwrap();
                let column_index_max_value = column_index.max_value(0).unwrap();

                // Column index stats are truncated, while the column chunk's aren't.
                assert_ne!(stats.min_bytes_opt().unwrap(), column_index_min_value);
                assert_ne!(stats.max_bytes_opt().unwrap(), column_index_max_value);

                // Column index values are cut down to the default truncate length (64)
                assert_eq!(
                    column_index_min_value.len(),
                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
                );
                assert_eq!(column_index_min_value, &[97_u8; 64]);
                assert_eq!(
                    column_index_max_value.len(),
                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
                );

                // We expect the last byte to be incremented, so the truncated max
                // remains an upper bound for the actual values
                assert_eq!(
                    *column_index_max_value.last().unwrap(),
                    *column_index_max_value.first().unwrap() + 1
                );
            } else {
                panic!("expecting Statistics::FixedLenByteArray");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
3122
3123    #[test]
3124    fn test_column_offset_index_truncating_spec_example() {
3125        // write data
3126        // and check the offset index and column index
3127        let page_writer = get_test_page_writer();
3128
3129        // Truncate values at 1 byte
3130        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3131        let props = Arc::new(builder.build());
3132        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3133
3134        let mut data = vec![FixedLenByteArray::default(); 1];
3135        // This is the expected min value
3136        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3137
3138        writer.write_batch(&data, None, None).unwrap();
3139
3140        writer.flush_data_pages().unwrap();
3141
3142        let r = writer.close().unwrap();
3143        let column_index = r.column_index.unwrap();
3144        let offset_index = r.offset_index.unwrap();
3145
3146        let column_index = match column_index {
3147            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3148            _ => panic!("wrong stats type"),
3149        };
3150
3151        assert_eq!(1, r.rows_written);
3152
3153        // column index
3154        assert_eq!(1, column_index.num_pages());
3155        assert_eq!(1, offset_index.page_locations.len());
3156        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3157        assert!(!column_index.is_null_page(0));
3158        assert_eq!(Some(0), column_index.null_count(0));
3159
3160        if let Some(stats) = r.metadata.statistics() {
3161            assert_eq!(stats.null_count_opt(), Some(0));
3162            assert_eq!(stats.distinct_count_opt(), None);
3163            if let Statistics::FixedLenByteArray(_stats) = stats {
3164                let column_index_min_value = column_index.min_value(0).unwrap();
3165                let column_index_max_value = column_index.max_value(0).unwrap();
3166
3167                assert_eq!(column_index_min_value.len(), 1);
3168                assert_eq!(column_index_max_value.len(), 1);
3169
3170                assert_eq!("B".as_bytes(), column_index_min_value);
3171                assert_eq!("C".as_bytes(), column_index_max_value);
3172
3173                assert_ne!(column_index_min_value, stats.min_bytes_opt().unwrap());
3174                assert_ne!(column_index_max_value, stats.max_bytes_opt().unwrap());
3175            } else {
3176                panic!("expecting Statistics::FixedLenByteArray");
3177            }
3178        } else {
3179            panic!("metadata missing statistics");
3180        }
3181    }
3182
3183    #[test]
3184    fn test_float16_min_max_no_truncation() {
3185        // Even if we set truncation to occur at 1 byte, we should not truncate for Float16
3186        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3187        let props = Arc::new(builder.build());
3188        let page_writer = get_test_page_writer();
3189        let mut writer = get_test_float16_column_writer(page_writer, props);
3190
3191        let expected_value = f16::PI.to_le_bytes().to_vec();
3192        let data = vec![ByteArray::from(expected_value.clone()).into()];
3193        writer.write_batch(&data, None, None).unwrap();
3194        writer.flush_data_pages().unwrap();
3195
3196        let r = writer.close().unwrap();
3197
3198        // stats should still be written
3199        // ensure bytes weren't truncated for column index
3200        let column_index = r.column_index.unwrap();
3201        let column_index = match column_index {
3202            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3203            _ => panic!("wrong stats type"),
3204        };
3205        let column_index_min_bytes = column_index.min_value(0).unwrap();
3206        let column_index_max_bytes = column_index.max_value(0).unwrap();
3207        assert_eq!(expected_value, column_index_min_bytes);
3208        assert_eq!(expected_value, column_index_max_bytes);
3209
3210        // ensure bytes weren't truncated for statistics
3211        let stats = r.metadata.statistics().unwrap();
3212        if let Statistics::FixedLenByteArray(stats) = stats {
3213            let stats_min_bytes = stats.min_bytes_opt().unwrap();
3214            let stats_max_bytes = stats.max_bytes_opt().unwrap();
3215            assert_eq!(expected_value, stats_min_bytes);
3216            assert_eq!(expected_value, stats_max_bytes);
3217        } else {
3218            panic!("expecting Statistics::FixedLenByteArray");
3219        }
3220    }
3221
3222    #[test]
3223    fn test_decimal_min_max_no_truncation() {
3224        // Even if we set truncation to occur at 1 byte, we should not truncate for Decimal
3225        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3226        let props = Arc::new(builder.build());
3227        let page_writer = get_test_page_writer();
3228        let mut writer =
3229            get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3230
3231        let expected_value = vec![
3232            255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
3233            231u8, 90u8, 0u8, 0u8,
3234        ];
3235        let data = vec![ByteArray::from(expected_value.clone()).into()];
3236        writer.write_batch(&data, None, None).unwrap();
3237        writer.flush_data_pages().unwrap();
3238
3239        let r = writer.close().unwrap();
3240
3241        // stats should still be written
3242        // ensure bytes weren't truncated for column index
3243        let column_index = r.column_index.unwrap();
3244        let column_index = match column_index {
3245            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3246            _ => panic!("wrong stats type"),
3247        };
3248        let column_index_min_bytes = column_index.min_value(0).unwrap();
3249        let column_index_max_bytes = column_index.max_value(0).unwrap();
3250        assert_eq!(expected_value, column_index_min_bytes);
3251        assert_eq!(expected_value, column_index_max_bytes);
3252
3253        // ensure bytes weren't truncated for statistics
3254        let stats = r.metadata.statistics().unwrap();
3255        if let Statistics::FixedLenByteArray(stats) = stats {
3256            let stats_min_bytes = stats.min_bytes_opt().unwrap();
3257            let stats_max_bytes = stats.max_bytes_opt().unwrap();
3258            assert_eq!(expected_value, stats_min_bytes);
3259            assert_eq!(expected_value, stats_max_bytes);
3260        } else {
3261            panic!("expecting Statistics::FixedLenByteArray");
3262        }
3263    }
3264
3265    #[test]
3266    fn test_statistics_truncating_byte_array_default() {
3267        let page_writer = get_test_page_writer();
3268
3269        // The default truncate length is 64 bytes
3270        let props = WriterProperties::builder().build().into();
3271        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3272
3273        let mut data = vec![ByteArray::default(); 1];
3274        data[0].set_data(Bytes::from(String::from(
3275            "This string is longer than 64 bytes, so it will almost certainly be truncated.",
3276        )));
3277        writer.write_batch(&data, None, None).unwrap();
3278        writer.flush_data_pages().unwrap();
3279
3280        let r = writer.close().unwrap();
3281
3282        assert_eq!(1, r.rows_written);
3283
3284        let stats = r.metadata.statistics().expect("statistics");
3285        if let Statistics::ByteArray(_stats) = stats {
3286            let min_value = _stats.min_opt().unwrap();
3287            let max_value = _stats.max_opt().unwrap();
3288
3289            assert!(!_stats.min_is_exact());
3290            assert!(!_stats.max_is_exact());
3291
3292            let expected_len = 64;
3293            assert_eq!(min_value.len(), expected_len);
3294            assert_eq!(max_value.len(), expected_len);
3295
3296            let expected_min =
3297                "This string is longer than 64 bytes, so it will almost certainly".as_bytes();
3298            assert_eq!(expected_min, min_value.as_bytes());
3299            // note the max value is different from the min value: the last byte is incremented
3300            let expected_max =
3301                "This string is longer than 64 bytes, so it will almost certainlz".as_bytes();
3302            assert_eq!(expected_max, max_value.as_bytes());
3303        } else {
3304            panic!("expecting Statistics::ByteArray");
3305        }
3306    }
3307
3308    #[test]
3309    fn test_statistics_truncating_byte_array() {
3310        let page_writer = get_test_page_writer();
3311
3312        const TEST_TRUNCATE_LENGTH: usize = 1;
3313
3314        // Truncate values at 1 byte
3315        let builder =
3316            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3317        let props = Arc::new(builder.build());
3318        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3319
3320        let mut data = vec![ByteArray::default(); 1];
3321        // This is the expected min value
3322        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3323
3324        writer.write_batch(&data, None, None).unwrap();
3325
3326        writer.flush_data_pages().unwrap();
3327
3328        let r = writer.close().unwrap();
3329
3330        assert_eq!(1, r.rows_written);
3331
3332        let stats = r.metadata.statistics().expect("statistics");
3333        assert_eq!(stats.null_count_opt(), Some(0));
3334        assert_eq!(stats.distinct_count_opt(), None);
3335        if let Statistics::ByteArray(_stats) = stats {
3336            let min_value = _stats.min_opt().unwrap();
3337            let max_value = _stats.max_opt().unwrap();
3338
3339            assert!(!_stats.min_is_exact());
3340            assert!(!_stats.max_is_exact());
3341
3342            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3343            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3344
3345            assert_eq!("B".as_bytes(), min_value.as_bytes());
3346            assert_eq!("C".as_bytes(), max_value.as_bytes());
3347        } else {
3348            panic!("expecting Statistics::ByteArray");
3349        }
3350    }
3351
3352    #[test]
3353    fn test_statistics_truncating_fixed_len_byte_array() {
3354        let page_writer = get_test_page_writer();
3355
3356        const TEST_TRUNCATE_LENGTH: usize = 1;
3357
3358        // Truncate values at 1 byte
3359        let builder =
3360            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3361        let props = Arc::new(builder.build());
3362        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3363
3364        let mut data = vec![FixedLenByteArray::default(); 1];
3365
3366        const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
3367        const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();
3368
3369        const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]]; // parquet specifies big-endian order for decimals
3370        const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
3371            [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];
3372
3373        // This is the expected min value
3374        data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));
3375
3376        writer.write_batch(&data, None, None).unwrap();
3377
3378        writer.flush_data_pages().unwrap();
3379
3380        let r = writer.close().unwrap();
3381
3382        assert_eq!(1, r.rows_written);
3383
3384        let stats = r.metadata.statistics().expect("statistics");
3385        assert_eq!(stats.null_count_opt(), Some(0));
3386        assert_eq!(stats.distinct_count_opt(), None);
3387        if let Statistics::FixedLenByteArray(_stats) = stats {
3388            let min_value = _stats.min_opt().unwrap();
3389            let max_value = _stats.max_opt().unwrap();
3390
3391            assert!(!_stats.min_is_exact());
3392            assert!(!_stats.max_is_exact());
3393
3394            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3395            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3396
3397            assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
3398            assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());
3399
3400            let reconstructed_min = i128::from_be_bytes([
3401                min_value.as_bytes()[0],
3402                0,
3403                0,
3404                0,
3405                0,
3406                0,
3407                0,
3408                0,
3409                0,
3410                0,
3411                0,
3412                0,
3413                0,
3414                0,
3415                0,
3416                0,
3417            ]);
3418
3419            let reconstructed_max = i128::from_be_bytes([
3420                max_value.as_bytes()[0],
3421                0,
3422                0,
3423                0,
3424                0,
3425                0,
3426                0,
3427                0,
3428                0,
3429                0,
3430                0,
3431                0,
3432                0,
3433                0,
3434                0,
3435                0,
3436            ]);
3437
3438            // check that the inner value is correctly bounded by the min/max
3439            println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
3440            assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
3441            println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
3442            assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
3443        } else {
3444            panic!("expecting Statistics::FixedLenByteArray");
3445        }
3446    }
3447
3448    #[test]
3449    fn test_send() {
3450        fn test<T: Send>() {}
3451        test::<ColumnWriterImpl<Int32Type>>();
3452    }
3453
3454    #[test]
3455    fn test_increment() {
3456        let v = increment(vec![0, 0, 0]).unwrap();
3457        assert_eq!(&v, &[0, 0, 1]);
3458
3459        // Handle overflow
3460        let v = increment(vec![0, 255, 255]).unwrap();
3461        assert_eq!(&v, &[1, 0, 0]);
3462
3463        // Return `None` if all bytes are u8::MAX
3464        let v = increment(vec![255, 255, 255]);
3465        assert!(v.is_none());
3466    }
3467
    #[test]
    fn test_increment_utf8() {
        // Asserts that `increment_utf8(o)` produces `expected`, that the result is
        // valid UTF-8, and that it compares strictly greater than the input both as
        // a string and at the `ByteArray` level.
        let test_inc = |o: &str, expected: &str| {
            if let Ok(v) = String::from_utf8(increment_utf8(o).unwrap()) {
                // Got the expected result...
                assert_eq!(v, expected);
                // and it's greater than the original string
                assert!(*v > *o);
                // Also show that BinaryArray level comparison works here
                let mut greater = ByteArray::new();
                greater.set_data(Bytes::from(v));
                let mut original = ByteArray::new();
                original.set_data(Bytes::from(o.as_bytes().to_vec()));
                assert!(greater > original);
            } else {
                panic!("Expected incremented UTF8 string to also be valid.");
            }
        };

        // Basic ASCII case
        test_inc("hello", "hellp");

        // 1-byte ending in max 1-byte
        test_inc("a\u{7f}", "b");

        // 1-byte max should not truncate as it would need 2-byte code points
        assert!(increment_utf8("\u{7f}\u{7f}").is_none());

        // UTF8 string
        test_inc("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝");

        // 2-byte without overflow
        test_inc("éééé", "éééê");

        // 2-byte that overflows lowest byte
        test_inc("\u{ff}\u{ff}", "\u{ff}\u{100}");

        // 2-byte ending in max 2-byte
        test_inc("a\u{7ff}", "b");

        // Max 2-byte should not truncate as it would need 3-byte code points
        assert!(increment_utf8("\u{7ff}\u{7ff}").is_none());

        // 3-byte without overflow [U+800, U+800] -> [U+800, U+801] (note that these
        // characters should render right to left).
        test_inc("ࠀࠀ", "ࠀࠁ");

        // 3-byte ending in max 3-byte
        test_inc("a\u{ffff}", "b");

        // Max 3-byte should not truncate as it would need 4-byte code points
        assert!(increment_utf8("\u{ffff}\u{ffff}").is_none());

        // 4-byte without overflow
        test_inc("𐀀𐀀", "𐀀𐀁");

        // 4-byte ending in max unicode
        test_inc("a\u{10ffff}", "b");

        // Max 4-byte should not truncate
        assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none());

        // Skip over surrogate pair range (0xD800..=0xDFFF)
        // NOTE: rather than stepping to U+E000 (the first code point past the
        // surrogate range), the last character is dropped and the prior one bumped:
        //test_inc("a\u{D7FF}", "a\u{e000}");
        test_inc("a\u{D7FF}", "b");
    }
3534
    #[test]
    fn test_truncate_utf8() {
        // `truncate_utf8` must only cut at UTF-8 character boundaries, and
        // `truncate_and_increment_utf8` must additionally produce a valid upper bound.

        // No-op: requested length equals the full byte length
        let data = "❤️🧡💛💚💙💜";
        let r = truncate_utf8(data, data.len()).unwrap();
        assert_eq!(r.len(), data.len());
        assert_eq!(&r, data.as_bytes());

        // We slice it away from the UTF8 boundary: 13 bytes falls inside a code
        // point's encoding, so the cut backs up to the 10-byte boundary after "❤️🧡"
        let r = truncate_utf8(data, 13).unwrap();
        assert_eq!(r.len(), 10);
        assert_eq!(&r, "❤️🧡".as_bytes());

        // One multi-byte code point, and a length shorter than it, so we can't slice it
        let r = truncate_utf8("\u{0836}", 1);
        assert!(r.is_none());

        // Test truncate and increment for max bounds on UTF-8 statistics
        // 7-bit (i.e. ASCII)
        let r = truncate_and_increment_utf8("yyyyyyyyy", 8).unwrap();
        assert_eq!(&r, "yyyyyyyz".as_bytes());

        // 2-byte without overflow
        let r = truncate_and_increment_utf8("ééééé", 7).unwrap();
        assert_eq!(&r, "ééê".as_bytes());

        // 2-byte that overflows lowest byte
        let r = truncate_and_increment_utf8("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8).unwrap();
        assert_eq!(&r, "\u{ff}\u{ff}\u{ff}\u{100}".as_bytes());

        // max 2-byte should not truncate as it would need 3-byte code points
        let r = truncate_and_increment_utf8("߿߿߿߿߿", 8);
        assert!(r.is_none());

        // 3-byte without overflow [U+800, U+800, U+800] -> [U+800, U+801] (note that these
        // characters should render right to left).
        let r = truncate_and_increment_utf8("ࠀࠀࠀࠀ", 8).unwrap();
        assert_eq!(&r, "ࠀࠁ".as_bytes());

        // max 3-byte should not truncate as it would need 4-byte code points
        let r = truncate_and_increment_utf8("\u{ffff}\u{ffff}\u{ffff}", 8);
        assert!(r.is_none());

        // 4-byte without overflow
        let r = truncate_and_increment_utf8("𐀀𐀀𐀀𐀀", 9).unwrap();
        assert_eq!(&r, "𐀀𐀁".as_bytes());

        // max 4-byte should not truncate
        let r = truncate_and_increment_utf8("\u{10ffff}\u{10ffff}", 8);
        assert!(r.is_none());
    }
3586
3587    #[test]
3588    // Check fallback truncation of statistics that should be UTF-8, but aren't
3589    // (see https://github.com/apache/arrow-rs/pull/6870).
3590    fn test_byte_array_truncate_invalid_utf8_statistics() {
3591        let message_type = "
3592            message test_schema {
3593                OPTIONAL BYTE_ARRAY a (UTF8);
3594            }
3595        ";
3596        let schema = Arc::new(parse_message_type(message_type).unwrap());
3597
3598        // Create Vec<ByteArray> containing non-UTF8 bytes
3599        let data = vec![ByteArray::from(vec![128u8; 32]); 7];
3600        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
3601        let file: File = tempfile::tempfile().unwrap();
3602        let props = Arc::new(
3603            WriterProperties::builder()
3604                .set_statistics_enabled(EnabledStatistics::Chunk)
3605                .set_statistics_truncate_length(Some(8))
3606                .build(),
3607        );
3608
3609        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
3610        let mut row_group_writer = writer.next_row_group().unwrap();
3611
3612        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
3613        col_writer
3614            .typed::<ByteArrayType>()
3615            .write_batch(&data, Some(&def_levels), None)
3616            .unwrap();
3617        col_writer.close().unwrap();
3618        row_group_writer.close().unwrap();
3619        let file_metadata = writer.close().unwrap();
3620        let stats = file_metadata.row_group(0).column(0).statistics().unwrap();
3621        assert!(!stats.max_is_exact());
3622        // Truncation of invalid UTF-8 should fall back to binary truncation, so last byte should
3623        // be incremented by 1.
3624        assert_eq!(
3625            stats.max_bytes_opt().map(|v| v.to_vec()),
3626            Some([128, 128, 128, 128, 128, 128, 128, 129].to_vec())
3627        );
3628    }
3629
3630    #[test]
3631    fn test_increment_max_binary_chars() {
3632        let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
3633        assert_eq!(&r.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);
3634
3635        let incremented = increment(vec![0xFF, 0xFF, 0xFF]);
3636        assert!(incremented.is_none())
3637    }
3638
3639    #[test]
3640    fn test_no_column_index_when_stats_disabled() {
3641        // https://github.com/apache/arrow-rs/issues/6010
3642        // Test that column index is not created/written for all-nulls column when page
3643        // statistics are disabled.
3644        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3645        let props = Arc::new(
3646            WriterProperties::builder()
3647                .set_statistics_enabled(EnabledStatistics::None)
3648                .build(),
3649        );
3650        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3651        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3652
3653        let data = Vec::new();
3654        let def_levels = vec![0; 10];
3655        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3656        writer.flush_data_pages().unwrap();
3657
3658        let column_close_result = writer.close().unwrap();
3659        assert!(column_close_result.offset_index.is_some());
3660        assert!(column_close_result.column_index.is_none());
3661    }
3662
3663    #[test]
3664    fn test_no_offset_index_when_disabled() {
3665        // Test that offset indexes can be disabled
3666        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3667        let props = Arc::new(
3668            WriterProperties::builder()
3669                .set_statistics_enabled(EnabledStatistics::None)
3670                .set_offset_index_disabled(true)
3671                .build(),
3672        );
3673        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3674        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3675
3676        let data = Vec::new();
3677        let def_levels = vec![0; 10];
3678        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3679        writer.flush_data_pages().unwrap();
3680
3681        let column_close_result = writer.close().unwrap();
3682        assert!(column_close_result.offset_index.is_none());
3683        assert!(column_close_result.column_index.is_none());
3684    }
3685
3686    #[test]
3687    fn test_offset_index_overridden() {
3688        // Test that offset indexes are not disabled when gathering page statistics
3689        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3690        let props = Arc::new(
3691            WriterProperties::builder()
3692                .set_statistics_enabled(EnabledStatistics::Page)
3693                .set_offset_index_disabled(true)
3694                .build(),
3695        );
3696        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3697        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3698
3699        let data = Vec::new();
3700        let def_levels = vec![0; 10];
3701        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3702        writer.flush_data_pages().unwrap();
3703
3704        let column_close_result = writer.close().unwrap();
3705        assert!(column_close_result.offset_index.is_some());
3706        assert!(column_close_result.column_index.is_some());
3707    }
3708
3709    #[test]
3710    fn test_boundary_order() -> Result<()> {
3711        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3712        // min max both ascending
3713        let column_close_result = write_multiple_pages::<Int32Type>(
3714            &descr,
3715            &[
3716                &[Some(-10), Some(10)],
3717                &[Some(-5), Some(11)],
3718                &[None],
3719                &[Some(-5), Some(11)],
3720            ],
3721        )?;
3722        let boundary_order = column_close_result
3723            .column_index
3724            .unwrap()
3725            .get_boundary_order();
3726        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3727
3728        // min max both descending
3729        let column_close_result = write_multiple_pages::<Int32Type>(
3730            &descr,
3731            &[
3732                &[Some(10), Some(11)],
3733                &[Some(5), Some(11)],
3734                &[None],
3735                &[Some(-5), Some(0)],
3736            ],
3737        )?;
3738        let boundary_order = column_close_result
3739            .column_index
3740            .unwrap()
3741            .get_boundary_order();
3742        assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING));
3743
3744        // min max both equal
3745        let column_close_result = write_multiple_pages::<Int32Type>(
3746            &descr,
3747            &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
3748        )?;
3749        let boundary_order = column_close_result
3750            .column_index
3751            .unwrap()
3752            .get_boundary_order();
3753        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3754
3755        // only nulls
3756        let column_close_result =
3757            write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
3758        let boundary_order = column_close_result
3759            .column_index
3760            .unwrap()
3761            .get_boundary_order();
3762        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3763
3764        // one page
3765        let column_close_result =
3766            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
3767        let boundary_order = column_close_result
3768            .column_index
3769            .unwrap()
3770            .get_boundary_order();
3771        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3772
3773        // one non-null page
3774        let column_close_result =
3775            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
3776        let boundary_order = column_close_result
3777            .column_index
3778            .unwrap()
3779            .get_boundary_order();
3780        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3781
3782        // min max both unordered
3783        let column_close_result = write_multiple_pages::<Int32Type>(
3784            &descr,
3785            &[
3786                &[Some(10), Some(11)],
3787                &[Some(11), Some(16)],
3788                &[None],
3789                &[Some(-5), Some(0)],
3790            ],
3791        )?;
3792        let boundary_order = column_close_result
3793            .column_index
3794            .unwrap()
3795            .get_boundary_order();
3796        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
3797
3798        // min max both ordered in different orders
3799        let column_close_result = write_multiple_pages::<Int32Type>(
3800            &descr,
3801            &[
3802                &[Some(1), Some(9)],
3803                &[Some(2), Some(8)],
3804                &[None],
3805                &[Some(3), Some(7)],
3806            ],
3807        )?;
3808        let boundary_order = column_close_result
3809            .column_index
3810            .unwrap()
3811            .get_boundary_order();
3812        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
3813
3814        Ok(())
3815    }
3816
3817    #[test]
3818    fn test_boundary_order_logical_type() -> Result<()> {
3819        // ensure that logical types account for different sort order than underlying
3820        // physical type representation
3821        let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
3822        let fba_descr = {
3823            let tpe = SchemaType::primitive_type_builder(
3824                "col",
3825                FixedLenByteArrayType::get_physical_type(),
3826            )
3827            .with_length(2)
3828            .build()?;
3829            Arc::new(ColumnDescriptor::new(
3830                Arc::new(tpe),
3831                1,
3832                0,
3833                ColumnPath::from("col"),
3834            ))
3835        };
3836
3837        let values: &[&[Option<FixedLenByteArray>]] = &[
3838            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
3839            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
3840            &[Some(FixedLenByteArray::from(ByteArray::from(
3841                f16::NEG_ZERO,
3842            )))],
3843            &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
3844        ];
3845
3846        // f16 descending
3847        let column_close_result =
3848            write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
3849        let boundary_order = column_close_result
3850            .column_index
3851            .unwrap()
3852            .get_boundary_order();
3853        assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING));
3854
3855        // same bytes, but fba unordered
3856        let column_close_result =
3857            write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
3858        let boundary_order = column_close_result
3859            .column_index
3860            .unwrap()
3861            .get_boundary_order();
3862        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
3863
3864        Ok(())
3865    }
3866
3867    #[test]
3868    fn test_interval_stats_should_not_have_min_max() {
3869        let input = [
3870            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
3871            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
3872            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
3873        ]
3874        .into_iter()
3875        .map(|s| ByteArray::from(s).into())
3876        .collect::<Vec<_>>();
3877
3878        let page_writer = get_test_page_writer();
3879        let mut writer = get_test_interval_column_writer(page_writer);
3880        writer.write_batch(&input, None, None).unwrap();
3881
3882        let metadata = writer.close().unwrap().metadata;
3883        let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
3884            stats.clone()
3885        } else {
3886            panic!("metadata missing statistics");
3887        };
3888        assert!(stats.min_bytes_opt().is_none());
3889        assert!(stats.max_bytes_opt().is_none());
3890    }
3891
    #[test]
    #[cfg(feature = "arrow")]
    fn test_column_writer_get_estimated_total_bytes() {
        // Before any data is written the estimate must be zero.
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        assert_eq!(writer.get_estimated_total_bytes(), 0);

        // Flush one data page and check the estimate reflects its size.
        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        writer.add_data_page().unwrap();
        let size_with_one_page = writer.get_estimated_total_bytes();
        assert_eq!(size_with_one_page, 20);

        // A second page adds its own (slightly different) size on top.
        writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
        writer.add_data_page().unwrap();
        let size_with_two_pages = writer.get_estimated_total_bytes();
        // different pages have different compressed lengths
        assert_eq!(size_with_two_pages, 20 + 21);
    }
3911
3912    fn write_multiple_pages<T: DataType>(
3913        column_descr: &Arc<ColumnDescriptor>,
3914        pages: &[&[Option<T::T>]],
3915    ) -> Result<ColumnCloseResult> {
3916        let column_writer = get_column_writer(
3917            column_descr.clone(),
3918            Default::default(),
3919            get_test_page_writer(),
3920        );
3921        let mut writer = get_typed_column_writer::<T>(column_writer);
3922
3923        for &page in pages {
3924            let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
3925            let def_levels = page
3926                .iter()
3927                .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
3928                .collect::<Vec<_>>();
3929            writer.write_batch(&values, Some(&def_levels), None)?;
3930            writer.flush_data_pages()?;
3931        }
3932
3933        writer.close()
3934    }
3935
    /// Performs write-read roundtrip with randomly generated values and levels.
    /// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
    /// for a column.
    fn column_roundtrip_random<T: DataType>(
        props: WriterProperties,
        max_size: usize,
        min_value: T::T,
        max_value: T::T,
        max_def_level: i16,
        max_rep_level: i16,
    ) where
        T::T: PartialOrd + SampleUniform + Copy,
    {
        // Count of non-null values: only slots at the max definition level
        // carry an actual value.
        let mut num_values: usize = 0;

        let mut buf: Vec<i16> = Vec::new();
        let def_levels = if max_def_level > 0 {
            random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
            for &dl in &buf[..] {
                if dl == max_def_level {
                    num_values += 1;
                }
            }
            Some(&buf[..])
        } else {
            // Non-nullable column: every slot is a value.
            num_values = max_size;
            None
        };

        // `buf` is shadowed; the first buffer stays alive for `def_levels`.
        let mut buf: Vec<i16> = Vec::new();
        let rep_levels = if max_rep_level > 0 {
            random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
            buf[0] = 0; // Must start on record boundary
            Some(&buf[..])
        } else {
            None
        };

        // Only generate as many values as there are non-null slots.
        let mut values: Vec<T::T> = Vec::new();
        random_numbers_range(num_values, min_value, max_value, &mut values);

        column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
    }
3979
    /// Performs write-read roundtrip and asserts written values and levels.
    fn column_roundtrip<T: DataType>(
        props: WriterProperties,
        values: &[T::T],
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) {
        // Write side: serialize pages into a temporary file.
        let mut file = tempfile::tempfile().unwrap();
        let mut write = TrackedWrite::new(&mut file);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));

        // Derive the column's max levels from the data itself (0 when absent).
        let max_def_level = match def_levels {
            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
            None => 0i16,
        };

        let max_rep_level = match rep_levels {
            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
            None => 0i16,
        };

        // Batch size must cover the longest of the values/def/rep slices so a
        // single read_records call can drain everything.
        let mut max_batch_size = values.len();
        if let Some(levels) = def_levels {
            max_batch_size = max_batch_size.max(levels.len());
        }
        if let Some(levels) = rep_levels {
            max_batch_size = max_batch_size.max(levels.len());
        }

        let mut writer =
            get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));

        let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
        assert_eq!(values_written, values.len());
        let result = writer.close().unwrap();

        // Drop the TrackedWrite so its borrow of `file` ends and the reader
        // can take ownership of the file below.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let page_reader = Box::new(
            SerializedPageReader::new_with_properties(
                Arc::new(file),
                &result.metadata,
                result.rows_written as usize,
                None,
                Arc::new(props),
            )
            .unwrap(),
        );
        let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);

        // Read side: level buffers are only allocated when levels were written.
        let mut actual_values = Vec::with_capacity(max_batch_size);
        let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
        let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));

        let (_, values_read, levels_read) = reader
            .read_records(
                max_batch_size,
                actual_def_levels.as_mut(),
                actual_rep_levels.as_mut(),
                &mut actual_values,
            )
            .unwrap();

        // Assert values, definition and repetition levels.

        assert_eq!(&actual_values[..values_read], values);
        match actual_def_levels {
            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
            None => assert_eq!(None, def_levels),
        }
        match actual_rep_levels {
            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
            None => assert_eq!(None, rep_levels),
        }

        // Assert written rows.

        if let Some(levels) = actual_rep_levels {
            // With repetition levels a new row starts wherever the level is 0.
            let mut actual_rows_written = 0;
            for l in levels {
                if l == 0 {
                    actual_rows_written += 1;
                }
            }
            assert_eq!(actual_rows_written, result.rows_written);
        } else if actual_def_levels.is_some() {
            // Flat nullable column: each level corresponds to one row.
            assert_eq!(levels_read as u64, result.rows_written);
        } else {
            // Required column: each value corresponds to one row.
            assert_eq!(values_read as u64, result.rows_written);
        }
    }
4074
4075    /// Performs write of provided values and returns column metadata of those values.
4076    /// Used to test encoding support for column writer.
4077    fn column_write_and_get_metadata<T: DataType>(
4078        props: WriterProperties,
4079        values: &[T::T],
4080    ) -> ColumnChunkMetaData {
4081        let page_writer = get_test_page_writer();
4082        let props = Arc::new(props);
4083        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4084        writer.write_batch(values, None, None).unwrap();
4085        writer.close().unwrap().metadata
4086    }
4087
4088    // Helper function to more compactly create a PageEncodingStats struct.
4089    fn encoding_stats(page_type: PageType, encoding: Encoding, count: i32) -> PageEncodingStats {
4090        PageEncodingStats {
4091            page_type,
4092            encoding,
4093            count,
4094        }
4095    }
4096
4097    // Function to use in tests for EncodingWriteSupport. This checks that dictionary
4098    // offset and encodings to make sure that column writer uses provided by trait
4099    // encodings.
4100    fn check_encoding_write_support<T: DataType>(
4101        version: WriterVersion,
4102        dict_enabled: bool,
4103        data: &[T::T],
4104        dictionary_page_offset: Option<i64>,
4105        encodings: &[Encoding],
4106        page_encoding_stats: &[PageEncodingStats],
4107    ) {
4108        let props = WriterProperties::builder()
4109            .set_writer_version(version)
4110            .set_dictionary_enabled(dict_enabled)
4111            .build();
4112        let meta = column_write_and_get_metadata::<T>(props, data);
4113        assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
4114        assert_eq!(meta.encodings().collect::<Vec<_>>(), encodings);
4115        assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
4116    }
4117
4118    /// Returns column writer.
4119    fn get_test_column_writer<'a, T: DataType>(
4120        page_writer: Box<dyn PageWriter + 'a>,
4121        max_def_level: i16,
4122        max_rep_level: i16,
4123        props: WriterPropertiesPtr,
4124    ) -> ColumnWriterImpl<'a, T> {
4125        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4126        let column_writer = get_column_writer(descr, props, page_writer);
4127        get_typed_column_writer::<T>(column_writer)
4128    }
4129
4130    fn get_test_column_writer_with_path<'a, T: DataType>(
4131        page_writer: Box<dyn PageWriter + 'a>,
4132        max_def_level: i16,
4133        max_rep_level: i16,
4134        props: WriterPropertiesPtr,
4135        path: ColumnPath,
4136    ) -> ColumnWriterImpl<'a, T> {
4137        let descr = Arc::new(get_test_column_descr_with_path::<T>(
4138            max_def_level,
4139            max_rep_level,
4140            path,
4141        ));
4142        let column_writer = get_column_writer(descr, props, page_writer);
4143        get_typed_column_writer::<T>(column_writer)
4144    }
4145
4146    /// Returns column reader.
4147    fn get_test_column_reader<T: DataType>(
4148        page_reader: Box<dyn PageReader>,
4149        max_def_level: i16,
4150        max_rep_level: i16,
4151    ) -> ColumnReaderImpl<T> {
4152        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4153        let column_reader = get_column_reader(descr, page_reader);
4154        get_typed_column_reader::<T>(column_reader)
4155    }
4156
4157    /// Returns descriptor for primitive column.
4158    fn get_test_column_descr<T: DataType>(
4159        max_def_level: i16,
4160        max_rep_level: i16,
4161    ) -> ColumnDescriptor {
4162        let path = ColumnPath::from("col");
4163        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4164            // length is set for "encoding support" tests for FIXED_LEN_BYTE_ARRAY type,
4165            // it should be no-op for other types
4166            .with_length(1)
4167            .build()
4168            .unwrap();
4169        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4170    }
4171
4172    fn get_test_column_descr_with_path<T: DataType>(
4173        max_def_level: i16,
4174        max_rep_level: i16,
4175        path: ColumnPath,
4176    ) -> ColumnDescriptor {
4177        let name = path.string();
4178        let tpe = SchemaType::primitive_type_builder(&name, T::get_physical_type())
4179            // length is set for "encoding support" tests for FIXED_LEN_BYTE_ARRAY type,
4180            // it should be no-op for other types
4181            .with_length(1)
4182            .build()
4183            .unwrap();
4184        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4185    }
4186
    /// Writes `data` through a column writer for the given `path`/`props`,
    /// reads the pages back, and returns the number of values in each data page.
    fn write_and_collect_page_values(
        path: ColumnPath,
        props: WriterPropertiesPtr,
        data: &[i32],
    ) -> Vec<u32> {
        let mut file = tempfile::tempfile().unwrap();
        let mut write = TrackedWrite::new(&mut file);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let mut writer =
            get_test_column_writer_with_path::<Int32Type>(page_writer, 0, 0, props, path);
        writer.write_batch(data, None, None).unwrap();
        let r = writer.close().unwrap();

        // Drop the TrackedWrite so its borrow of `file` ends and the reader
        // can take ownership of the file below.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let mut page_reader = Box::new(
            SerializedPageReader::new_with_properties(
                Arc::new(file),
                &r.metadata,
                r.rows_written as usize,
                None,
                Arc::new(props),
            )
            .unwrap(),
        );

        // Collect per-page value counts; only data pages are expected here
        // (dictionary pages would fail the assertion).
        let mut values_per_page = Vec::new();
        while let Some(page) = page_reader.get_next_page().unwrap() {
            assert_eq!(page.page_type(), PageType::DATA_PAGE);
            values_per_page.push(page.num_values());
        }

        values_per_page
    }
4224
    /// Returns page writer that collects pages without serializing them.
    /// Useful when only chunk/page metadata (not serialized bytes) matters.
    fn get_test_page_writer() -> Box<dyn PageWriter> {
        Box::new(TestPageWriter {})
    }
4229
4230    struct TestPageWriter {}
4231
4232    impl PageWriter for TestPageWriter {
4233        fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
4234            let mut res = PageWriteSpec::new();
4235            res.page_type = page.page_type();
4236            res.uncompressed_size = page.uncompressed_size();
4237            res.compressed_size = page.compressed_size();
4238            res.num_values = page.num_values();
4239            res.offset = 0;
4240            res.bytes_written = page.data().len() as u64;
4241            Ok(res)
4242        }
4243
4244        fn close(&mut self) -> Result<()> {
4245            Ok(())
4246        }
4247    }
4248
4249    /// Write data into parquet using [`get_test_page_writer`] and [`get_test_column_writer`] and returns generated statistics.
4250    fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
4251        let page_writer = get_test_page_writer();
4252        let props = Default::default();
4253        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4254        writer.write_batch(values, None, None).unwrap();
4255
4256        let metadata = writer.close().unwrap().metadata;
4257        if let Some(stats) = metadata.statistics() {
4258            stats.clone()
4259        } else {
4260            panic!("metadata missing statistics");
4261        }
4262    }
4263
4264    /// Returns Decimals column writer.
4265    fn get_test_decimals_column_writer<T: DataType>(
4266        page_writer: Box<dyn PageWriter>,
4267        max_def_level: i16,
4268        max_rep_level: i16,
4269        props: WriterPropertiesPtr,
4270    ) -> ColumnWriterImpl<'static, T> {
4271        let descr = Arc::new(get_test_decimals_column_descr::<T>(
4272            max_def_level,
4273            max_rep_level,
4274        ));
4275        let column_writer = get_column_writer(descr, props, page_writer);
4276        get_typed_column_writer::<T>(column_writer)
4277    }
4278
4279    /// Returns descriptor for Decimal type with primitive column.
4280    fn get_test_decimals_column_descr<T: DataType>(
4281        max_def_level: i16,
4282        max_rep_level: i16,
4283    ) -> ColumnDescriptor {
4284        let path = ColumnPath::from("col");
4285        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4286            .with_length(16)
4287            .with_logical_type(Some(LogicalType::Decimal {
4288                scale: 2,
4289                precision: 3,
4290            }))
4291            .with_scale(2)
4292            .with_precision(3)
4293            .build()
4294            .unwrap();
4295        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4296    }
4297
4298    fn float16_statistics_roundtrip(
4299        values: &[FixedLenByteArray],
4300    ) -> ValueStatistics<FixedLenByteArray> {
4301        let page_writer = get_test_page_writer();
4302        let mut writer = get_test_float16_column_writer(page_writer, Default::default());
4303        writer.write_batch(values, None, None).unwrap();
4304
4305        let metadata = writer.close().unwrap().metadata;
4306        if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
4307            stats.clone()
4308        } else {
4309            panic!("metadata missing statistics");
4310        }
4311    }
4312
4313    fn get_test_float16_column_writer(
4314        page_writer: Box<dyn PageWriter>,
4315        props: WriterPropertiesPtr,
4316    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4317        let descr = Arc::new(get_test_float16_column_descr(0, 0));
4318        let column_writer = get_column_writer(descr, props, page_writer);
4319        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4320    }
4321
4322    fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
4323        let path = ColumnPath::from("col");
4324        let tpe =
4325            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4326                .with_length(2)
4327                .with_logical_type(Some(LogicalType::Float16))
4328                .build()
4329                .unwrap();
4330        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4331    }
4332
4333    fn get_test_interval_column_writer(
4334        page_writer: Box<dyn PageWriter>,
4335    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4336        let descr = Arc::new(get_test_interval_column_descr());
4337        let column_writer = get_column_writer(descr, Default::default(), page_writer);
4338        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4339    }
4340
4341    fn get_test_interval_column_descr() -> ColumnDescriptor {
4342        let path = ColumnPath::from("col");
4343        let tpe =
4344            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4345                .with_length(12)
4346                .with_converted_type(ConvertedType::INTERVAL)
4347                .build()
4348                .unwrap();
4349        ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
4350    }
4351
4352    /// Returns column writer for UINT32 Column provided as ConvertedType only
4353    fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
4354        page_writer: Box<dyn PageWriter + 'a>,
4355        max_def_level: i16,
4356        max_rep_level: i16,
4357        props: WriterPropertiesPtr,
4358    ) -> ColumnWriterImpl<'a, T> {
4359        let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
4360            max_def_level,
4361            max_rep_level,
4362        ));
4363        let column_writer = get_column_writer(descr, props, page_writer);
4364        get_typed_column_writer::<T>(column_writer)
4365    }
4366
4367    /// Returns column descriptor for UINT32 Column provided as ConvertedType only
4368    fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
4369        max_def_level: i16,
4370        max_rep_level: i16,
4371    ) -> ColumnDescriptor {
4372        let path = ColumnPath::from("col");
4373        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4374            .with_converted_type(ConvertedType::UINT_32)
4375            .build()
4376            .unwrap();
4377        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4378    }
4379
4380    #[test]
4381    fn test_page_v2_snappy_compression_fallback() {
4382        // Test that PageV2 sets is_compressed to false when Snappy compression increases data size
4383        let page_writer = TestPageWriter {};
4384
4385        // Create WriterProperties with PageV2 and Snappy compression
4386        let props = WriterProperties::builder()
4387            .set_writer_version(WriterVersion::PARQUET_2_0)
4388            // Disable dictionary to ensure data is written directly
4389            .set_dictionary_enabled(false)
4390            .set_compression(Compression::SNAPPY)
4391            .build();
4392
4393        let mut column_writer =
4394            get_test_column_writer::<ByteArrayType>(Box::new(page_writer), 0, 0, Arc::new(props));
4395
4396        // Create small, simple data that Snappy compression will likely increase in size
4397        // due to compression overhead for very small data
4398        let values = vec![ByteArray::from("a")];
4399
4400        column_writer.write_batch(&values, None, None).unwrap();
4401
4402        let result = column_writer.close().unwrap();
4403        assert_eq!(
4404            result.metadata.uncompressed_size(),
4405            result.metadata.compressed_size()
4406        );
4407    }
4408}