// parquet/column/writer/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
//! Contains column writer API.
19
use bytes::Bytes;
use half::f16;

use crate::bloom_filter::Sbbf;
use crate::file::page_index::column_index::ColumnIndexMetaData;
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use std::collections::{BTreeSet, VecDeque};
use std::str;

use crate::basic::{
    BoundaryOrder, Compression, ConvertedType, Encoding, EncodingMask, LogicalType, PageType, Type,
};
use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
use crate::compression::{Codec, CodecOptionsBuilder, create_codec};
use crate::data_type::private::ParquetValueType;
use crate::data_type::*;
use crate::encodings::levels::LevelEncoder;
#[cfg(feature = "encryption")]
use crate::encryption::encrypt::get_column_crypto_metadata;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{
    ColumnChunkMetaData, ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram,
    OffsetIndexBuilder, PageEncodingStats,
};
use crate::file::properties::{
    EnabledStatistics, WriterProperties, WriterPropertiesPtr, WriterVersion,
};
use crate::file::statistics::{Statistics, ValueStatistics};
use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
50
51pub(crate) mod encoder;
52
/// Dispatches `$b` to the typed writer held by a [`ColumnWriter`] value `$e`,
/// binding the inner `ColumnWriterImpl` to `$i` for every physical-type variant.
macro_rules! downcast_writer {
    ($e:expr, $i:ident, $b:expr) => {
        match $e {
            Self::BoolColumnWriter($i) => $b,
            Self::Int32ColumnWriter($i) => $b,
            Self::Int64ColumnWriter($i) => $b,
            Self::Int96ColumnWriter($i) => $b,
            Self::FloatColumnWriter($i) => $b,
            Self::DoubleColumnWriter($i) => $b,
            Self::ByteArrayColumnWriter($i) => $b,
            Self::FixedLenByteArrayColumnWriter($i) => $b,
        }
    };
}
67
68/// Column writer for a Parquet type.
69///
70/// See [`get_column_writer`] to create instances of this type
71pub enum ColumnWriter<'a> {
72    /// Column writer for boolean type
73    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
74    /// Column writer for int32 type
75    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
76    /// Column writer for int64 type
77    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
78    /// Column writer for int96 (timestamp) type
79    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
80    /// Column writer for float type
81    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
82    /// Column writer for double type
83    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
84    /// Column writer for byte array type
85    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
86    /// Column writer for fixed length byte array type
87    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
88}
89
90impl ColumnWriter<'_> {
91    /// Returns the estimated total memory usage
92    #[cfg(feature = "arrow")]
93    pub(crate) fn memory_size(&self) -> usize {
94        downcast_writer!(self, typed, typed.memory_size())
95    }
96
97    /// Returns the estimated total encoded bytes for this column writer
98    #[cfg(feature = "arrow")]
99    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
100        downcast_writer!(self, typed, typed.get_estimated_total_bytes())
101    }
102
103    /// Close this [`ColumnWriter`], returning the metadata for the column chunk.
104    pub fn close(self) -> Result<ColumnCloseResult> {
105        downcast_writer!(self, typed, typed.close())
106    }
107}
108
109/// Create a specific column writer corresponding to column descriptor `descr`.
110pub fn get_column_writer<'a>(
111    descr: ColumnDescPtr,
112    props: WriterPropertiesPtr,
113    page_writer: Box<dyn PageWriter + 'a>,
114) -> ColumnWriter<'a> {
115    match descr.physical_type() {
116        Type::BOOLEAN => {
117            ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
118        }
119        Type::INT32 => {
120            ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
121        }
122        Type::INT64 => {
123            ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
124        }
125        Type::INT96 => {
126            ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
127        }
128        Type::FLOAT => {
129            ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
130        }
131        Type::DOUBLE => {
132            ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
133        }
134        Type::BYTE_ARRAY => {
135            ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
136        }
137        Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
138            ColumnWriterImpl::new(descr, props, page_writer),
139        ),
140    }
141}
142
143/// Gets a typed column writer for the specific type `T`, by "up-casting" `col_writer` of
144/// non-generic type to a generic column writer type `ColumnWriterImpl`.
145///
146/// Panics if actual enum value for `col_writer` does not match the type `T`.
147pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
148    T::get_column_writer(col_writer).unwrap_or_else(|| {
149        panic!(
150            "Failed to convert column writer into a typed column writer for `{}` type",
151            T::get_physical_type()
152        )
153    })
154}
155
156/// Similar to `get_typed_column_writer` but returns a reference.
157pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
158    col_writer: &'b ColumnWriter<'a>,
159) -> &'b ColumnWriterImpl<'a, T> {
160    T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
161        panic!(
162            "Failed to convert column writer into a typed column writer for `{}` type",
163            T::get_physical_type()
164        )
165    })
166}
167
168/// Similar to `get_typed_column_writer` but returns a reference.
169pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
170    col_writer: &'a mut ColumnWriter<'b>,
171) -> &'a mut ColumnWriterImpl<'b, T> {
172    T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
173        panic!(
174            "Failed to convert column writer into a typed column writer for `{}` type",
175            T::get_physical_type()
176        )
177    })
178}
179
180/// Metadata for a column chunk of a Parquet file.
181///
182/// Note this structure is returned by [`ColumnWriter::close`].
183#[derive(Debug, Clone)]
184pub struct ColumnCloseResult {
185    /// The total number of bytes written
186    pub bytes_written: u64,
187    /// The total number of rows written
188    pub rows_written: u64,
189    /// Metadata for this column chunk
190    pub metadata: ColumnChunkMetaData,
191    /// Optional bloom filter for this column
192    pub bloom_filter: Option<Sbbf>,
193    /// Optional column index, for filtering
194    pub column_index: Option<ColumnIndexMetaData>,
195    /// Optional offset index, identifying page locations
196    pub offset_index: Option<OffsetIndexMetaData>,
197}
198
199// Metrics per page
200#[derive(Default)]
201struct PageMetrics {
202    num_buffered_values: u32,
203    num_buffered_rows: u32,
204    num_page_nulls: u64,
205    repetition_level_histogram: Option<LevelHistogram>,
206    definition_level_histogram: Option<LevelHistogram>,
207}
208
209impl PageMetrics {
210    fn new() -> Self {
211        Default::default()
212    }
213
214    /// Initialize the repetition level histogram
215    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
216        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
217        self
218    }
219
220    /// Initialize the definition level histogram
221    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
222        self.definition_level_histogram = LevelHistogram::try_new(max_level);
223        self
224    }
225
226    /// Resets the state of this `PageMetrics` to the initial state.
227    /// If histograms have been initialized their contents will be reset to zero.
228    fn new_page(&mut self) {
229        self.num_buffered_values = 0;
230        self.num_buffered_rows = 0;
231        self.num_page_nulls = 0;
232        self.repetition_level_histogram
233            .as_mut()
234            .map(LevelHistogram::reset);
235        self.definition_level_histogram
236            .as_mut()
237            .map(LevelHistogram::reset);
238    }
239
240    /// Updates histogram values using provided repetition levels
241    fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
242        if let Some(ref mut rep_hist) = self.repetition_level_histogram {
243            rep_hist.update_from_levels(levels);
244        }
245    }
246
247    /// Updates histogram values using provided definition levels
248    fn update_definition_level_histogram(&mut self, levels: &[i16]) {
249        if let Some(ref mut def_hist) = self.definition_level_histogram {
250            def_hist.update_from_levels(levels);
251        }
252    }
253}
254
255// Metrics per column writer
256#[derive(Default)]
257struct ColumnMetrics<T: Default> {
258    total_bytes_written: u64,
259    total_rows_written: u64,
260    total_uncompressed_size: u64,
261    total_compressed_size: u64,
262    total_num_values: u64,
263    dictionary_page_offset: Option<u64>,
264    data_page_offset: Option<u64>,
265    min_column_value: Option<T>,
266    max_column_value: Option<T>,
267    num_column_nulls: u64,
268    column_distinct_count: Option<u64>,
269    variable_length_bytes: Option<i64>,
270    repetition_level_histogram: Option<LevelHistogram>,
271    definition_level_histogram: Option<LevelHistogram>,
272}
273
274impl<T: Default> ColumnMetrics<T> {
275    fn new() -> Self {
276        Default::default()
277    }
278
279    /// Initialize the repetition level histogram
280    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
281        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
282        self
283    }
284
285    /// Initialize the definition level histogram
286    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
287        self.definition_level_histogram = LevelHistogram::try_new(max_level);
288        self
289    }
290
291    /// Sum `page_histogram` into `chunk_histogram`
292    fn update_histogram(
293        chunk_histogram: &mut Option<LevelHistogram>,
294        page_histogram: &Option<LevelHistogram>,
295    ) {
296        if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
297            chunk_hist.add(page_hist);
298        }
299    }
300
301    /// Sum the provided PageMetrics histograms into the chunk histograms. Does nothing if
302    /// page histograms are not initialized.
303    fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
304        ColumnMetrics::<T>::update_histogram(
305            &mut self.definition_level_histogram,
306            &page_metrics.definition_level_histogram,
307        );
308        ColumnMetrics::<T>::update_histogram(
309            &mut self.repetition_level_histogram,
310            &page_metrics.repetition_level_histogram,
311        );
312    }
313
314    /// Sum the provided page variable_length_bytes into the chunk variable_length_bytes
315    fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
316        if let Some(var_bytes) = variable_length_bytes {
317            *self.variable_length_bytes.get_or_insert(0) += var_bytes;
318        }
319    }
320}
321
322/// Typed column writer for a primitive column.
323pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
324
325/// Generic column writer for a primitive Parquet column
326pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
327    // Column writer properties
328    descr: ColumnDescPtr,
329    props: WriterPropertiesPtr,
330    statistics_enabled: EnabledStatistics,
331
332    page_writer: Box<dyn PageWriter + 'a>,
333    codec: Compression,
334    compressor: Option<Box<dyn Codec>>,
335    encoder: E,
336
337    page_metrics: PageMetrics,
338    // Metrics per column writer
339    column_metrics: ColumnMetrics<E::T>,
340
341    /// The order of encodings within the generated metadata does not impact its meaning,
342    /// but we use a BTreeSet so that the output is deterministic
343    encodings: BTreeSet<Encoding>,
344    encoding_stats: Vec<PageEncodingStats>,
345    // Reused buffers
346    def_levels_sink: Vec<i16>,
347    rep_levels_sink: Vec<i16>,
348    data_pages: VecDeque<CompressedPage>,
349    // column index and offset index
350    column_index_builder: ColumnIndexBuilder,
351    offset_index_builder: Option<OffsetIndexBuilder>,
352
353    // Below fields used to incrementally check boundary order across data pages.
354    // We assume they are ascending/descending until proven wrong.
355    data_page_boundary_ascending: bool,
356    data_page_boundary_descending: bool,
357    /// (min, max)
358    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
359}
360
361impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
362    /// Returns a new instance of [`GenericColumnWriter`].
363    pub fn new(
364        descr: ColumnDescPtr,
365        props: WriterPropertiesPtr,
366        page_writer: Box<dyn PageWriter + 'a>,
367    ) -> Self {
368        let codec = props.compression(descr.path());
369        let codec_options = CodecOptionsBuilder::default().build();
370        let compressor = create_codec(codec, &codec_options).unwrap();
371        let encoder = E::try_new(&descr, props.as_ref()).unwrap();
372
373        let statistics_enabled = props.statistics_enabled(descr.path());
374
375        let mut encodings = BTreeSet::new();
376        // Used for level information
377        encodings.insert(Encoding::RLE);
378
379        let mut page_metrics = PageMetrics::new();
380        let mut column_metrics = ColumnMetrics::<E::T>::new();
381
382        // Initialize level histograms if collecting page or chunk statistics
383        if statistics_enabled != EnabledStatistics::None {
384            page_metrics = page_metrics
385                .with_repetition_level_histogram(descr.max_rep_level())
386                .with_definition_level_histogram(descr.max_def_level());
387            column_metrics = column_metrics
388                .with_repetition_level_histogram(descr.max_rep_level())
389                .with_definition_level_histogram(descr.max_def_level())
390        }
391
392        // Disable column_index_builder if not collecting page statistics.
393        let mut column_index_builder = ColumnIndexBuilder::new(descr.physical_type());
394        if statistics_enabled != EnabledStatistics::Page {
395            column_index_builder.to_invalid()
396        }
397
398        // Disable offset_index_builder if requested by user.
399        let offset_index_builder = match props.offset_index_disabled() {
400            false => Some(OffsetIndexBuilder::new()),
401            _ => None,
402        };
403
404        Self {
405            descr,
406            props,
407            statistics_enabled,
408            page_writer,
409            codec,
410            compressor,
411            encoder,
412            def_levels_sink: vec![],
413            rep_levels_sink: vec![],
414            data_pages: VecDeque::new(),
415            page_metrics,
416            column_metrics,
417            column_index_builder,
418            offset_index_builder,
419            encodings,
420            encoding_stats: vec![],
421            data_page_boundary_ascending: true,
422            data_page_boundary_descending: true,
423            last_non_null_data_page_min_max: None,
424        }
425    }
426
427    #[allow(clippy::too_many_arguments)]
428    pub(crate) fn write_batch_internal(
429        &mut self,
430        values: &E::Values,
431        value_indices: Option<&[usize]>,
432        def_levels: Option<&[i16]>,
433        rep_levels: Option<&[i16]>,
434        min: Option<&E::T>,
435        max: Option<&E::T>,
436        distinct_count: Option<u64>,
437    ) -> Result<usize> {
438        // Check if number of definition levels is the same as number of repetition levels.
439        if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
440            if def.len() != rep.len() {
441                return Err(general_err!(
442                    "Inconsistent length of definition and repetition levels: {} != {}",
443                    def.len(),
444                    rep.len()
445                ));
446            }
447        }
448
449        // We check for DataPage limits only after we have inserted the values. If a user
450        // writes a large number of values, the DataPage size can be well above the limit.
451        //
452        // The purpose of this chunking is to bound this. Even if a user writes large
453        // number of values, the chunking will ensure that we add data page at a
454        // reasonable pagesize limit.
455
456        // TODO: find out why we don't account for size of levels when we estimate page
457        // size.
458
459        let num_levels = match def_levels {
460            Some(def_levels) => def_levels.len(),
461            None => values.len(),
462        };
463
464        if let Some(min) = min {
465            update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
466        }
467        if let Some(max) = max {
468            update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
469        }
470
471        // We can only set the distinct count if there are no other writes
472        if self.encoder.num_values() == 0 {
473            self.column_metrics.column_distinct_count = distinct_count;
474        } else {
475            self.column_metrics.column_distinct_count = None;
476        }
477
478        let mut values_offset = 0;
479        let mut levels_offset = 0;
480        let base_batch_size = self.props.write_batch_size();
481        while levels_offset < num_levels {
482            let mut end_offset = num_levels.min(levels_offset + base_batch_size);
483
484            // Split at record boundary
485            if let Some(r) = rep_levels {
486                while end_offset < r.len() && r[end_offset] != 0 {
487                    end_offset += 1;
488                }
489            }
490
491            values_offset += self.write_mini_batch(
492                values,
493                values_offset,
494                value_indices,
495                end_offset - levels_offset,
496                def_levels.map(|lv| &lv[levels_offset..end_offset]),
497                rep_levels.map(|lv| &lv[levels_offset..end_offset]),
498            )?;
499            levels_offset = end_offset;
500        }
501
502        // Return total number of values processed.
503        Ok(values_offset)
504    }
505
506    /// Writes batch of values, definition levels and repetition levels.
507    /// Returns number of values processed (written).
508    ///
509    /// If definition and repetition levels are provided, we write fully those levels and
510    /// select how many values to write (this number will be returned), since number of
511    /// actual written values may be smaller than provided values.
512    ///
513    /// If only values are provided, then all values are written and the length of
514    /// of the values buffer is returned.
515    ///
516    /// Definition and/or repetition levels can be omitted, if values are
517    /// non-nullable and/or non-repeated.
518    pub fn write_batch(
519        &mut self,
520        values: &E::Values,
521        def_levels: Option<&[i16]>,
522        rep_levels: Option<&[i16]>,
523    ) -> Result<usize> {
524        self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
525    }
526
527    /// Writer may optionally provide pre-calculated statistics for use when computing
528    /// chunk-level statistics
529    ///
530    /// NB: [`WriterProperties::statistics_enabled`] must be set to [`EnabledStatistics::Chunk`]
531    /// for these statistics to take effect. If [`EnabledStatistics::None`] they will be ignored,
532    /// and if [`EnabledStatistics::Page`] the chunk statistics will instead be computed from the
533    /// computed page statistics
534    pub fn write_batch_with_statistics(
535        &mut self,
536        values: &E::Values,
537        def_levels: Option<&[i16]>,
538        rep_levels: Option<&[i16]>,
539        min: Option<&E::T>,
540        max: Option<&E::T>,
541        distinct_count: Option<u64>,
542    ) -> Result<usize> {
543        self.write_batch_internal(
544            values,
545            None,
546            def_levels,
547            rep_levels,
548            min,
549            max,
550            distinct_count,
551        )
552    }
553
554    /// Returns the estimated total memory usage.
555    ///
556    /// Unlike [`Self::get_estimated_total_bytes`] this is an estimate
557    /// of the current memory usage and not the final anticipated encoded size.
558    #[cfg(feature = "arrow")]
559    pub(crate) fn memory_size(&self) -> usize {
560        self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
561    }
562
563    /// Returns total number of bytes written by this column writer so far.
564    /// This value is also returned when column writer is closed.
565    ///
566    /// Note: this value does not include any buffered data that has not
567    /// yet been flushed to a page.
568    pub fn get_total_bytes_written(&self) -> u64 {
569        self.column_metrics.total_bytes_written
570    }
571
572    /// Returns the estimated total encoded bytes for this column writer.
573    ///
574    /// Unlike [`Self::get_total_bytes_written`] this includes an estimate
575    /// of any data that has not yet been flushed to a page, based on it's
576    /// anticipated encoded size.
577    #[cfg(feature = "arrow")]
578    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
579        self.data_pages
580            .iter()
581            .map(|page| page.data().len() as u64)
582            .sum::<u64>()
583            + self.column_metrics.total_bytes_written
584            + self.encoder.estimated_data_page_size() as u64
585            + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
586    }
587
588    /// Returns total number of rows written by this column writer so far.
589    /// This value is also returned when column writer is closed.
590    pub fn get_total_rows_written(&self) -> u64 {
591        self.column_metrics.total_rows_written
592    }
593
594    /// Returns a reference to a [`ColumnDescPtr`]
595    pub fn get_descriptor(&self) -> &ColumnDescPtr {
596        &self.descr
597    }
598
599    /// Finalizes writes and closes the column writer.
600    /// Returns total bytes written, total rows written and column chunk metadata.
601    pub fn close(mut self) -> Result<ColumnCloseResult> {
602        if self.page_metrics.num_buffered_values > 0 {
603            self.add_data_page()?;
604        }
605        if self.encoder.has_dictionary() {
606            self.write_dictionary_page()?;
607        }
608        self.flush_data_pages()?;
609        let metadata = self.build_column_metadata()?;
610        self.page_writer.close()?;
611
612        let boundary_order = match (
613            self.data_page_boundary_ascending,
614            self.data_page_boundary_descending,
615        ) {
616            // If the lists are composed of equal elements then will be marked as ascending
617            // (Also the case if all pages are null pages)
618            (true, _) => BoundaryOrder::ASCENDING,
619            (false, true) => BoundaryOrder::DESCENDING,
620            (false, false) => BoundaryOrder::UNORDERED,
621        };
622        self.column_index_builder.set_boundary_order(boundary_order);
623
624        let column_index = match self.column_index_builder.valid() {
625            true => Some(self.column_index_builder.build()?),
626            false => None,
627        };
628
629        let offset_index = self.offset_index_builder.map(|b| b.build());
630
631        Ok(ColumnCloseResult {
632            bytes_written: self.column_metrics.total_bytes_written,
633            rows_written: self.column_metrics.total_rows_written,
634            bloom_filter: self.encoder.flush_bloom_filter(),
635            metadata,
636            column_index,
637            offset_index,
638        })
639    }
640
641    /// Writes mini batch of values, definition and repetition levels.
642    /// This allows fine-grained processing of values and maintaining a reasonable
643    /// page size.
644    fn write_mini_batch(
645        &mut self,
646        values: &E::Values,
647        values_offset: usize,
648        value_indices: Option<&[usize]>,
649        num_levels: usize,
650        def_levels: Option<&[i16]>,
651        rep_levels: Option<&[i16]>,
652    ) -> Result<usize> {
653        // Process definition levels and determine how many values to write.
654        let values_to_write = if self.descr.max_def_level() > 0 {
655            let levels = def_levels.ok_or_else(|| {
656                general_err!(
657                    "Definition levels are required, because max definition level = {}",
658                    self.descr.max_def_level()
659                )
660            })?;
661
662            let values_to_write = levels
663                .iter()
664                .map(|level| (*level == self.descr.max_def_level()) as usize)
665                .sum();
666            self.page_metrics.num_page_nulls += (levels.len() - values_to_write) as u64;
667
668            // Update histogram
669            self.page_metrics.update_definition_level_histogram(levels);
670
671            self.def_levels_sink.extend_from_slice(levels);
672            values_to_write
673        } else {
674            num_levels
675        };
676
677        // Process repetition levels and determine how many rows we are about to process.
678        if self.descr.max_rep_level() > 0 {
679            // A row could contain more than one value.
680            let levels = rep_levels.ok_or_else(|| {
681                general_err!(
682                    "Repetition levels are required, because max repetition level = {}",
683                    self.descr.max_rep_level()
684                )
685            })?;
686
687            if !levels.is_empty() && levels[0] != 0 {
688                return Err(general_err!(
689                    "Write must start at a record boundary, got non-zero repetition level of {}",
690                    levels[0]
691                ));
692            }
693
694            // Count the occasions where we start a new row
695            for &level in levels {
696                self.page_metrics.num_buffered_rows += (level == 0) as u32
697            }
698
699            // Update histogram
700            self.page_metrics.update_repetition_level_histogram(levels);
701
702            self.rep_levels_sink.extend_from_slice(levels);
703        } else {
704            // Each value is exactly one row.
705            // Equals to the number of values, we count nulls as well.
706            self.page_metrics.num_buffered_rows += num_levels as u32;
707        }
708
709        match value_indices {
710            Some(indices) => {
711                let indices = &indices[values_offset..values_offset + values_to_write];
712                self.encoder.write_gather(values, indices)?;
713            }
714            None => self.encoder.write(values, values_offset, values_to_write)?,
715        }
716
717        self.page_metrics.num_buffered_values += num_levels as u32;
718
719        if self.should_add_data_page() {
720            self.add_data_page()?;
721        }
722
723        if self.should_dict_fallback() {
724            self.dict_fallback()?;
725        }
726
727        Ok(values_to_write)
728    }
729
730    /// Returns true if we need to fall back to non-dictionary encoding.
731    ///
732    /// We can only fall back if dictionary encoder is set and we have exceeded dictionary
733    /// size.
734    #[inline]
735    fn should_dict_fallback(&self) -> bool {
736        match self.encoder.estimated_dict_page_size() {
737            Some(size) => {
738                size >= self
739                    .props
740                    .column_dictionary_page_size_limit(self.descr.path())
741            }
742            None => false,
743        }
744    }
745
746    /// Returns true if there is enough data for a data page, false otherwise.
747    #[inline]
748    fn should_add_data_page(&self) -> bool {
749        // This is necessary in the event of a much larger dictionary size than page size
750        //
751        // In such a scenario the dictionary decoder may return an estimated encoded
752        // size in excess of the page size limit, even when there are no buffered values
753        if self.page_metrics.num_buffered_values == 0 {
754            return false;
755        }
756
757        self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
758            || self.encoder.estimated_data_page_size()
759                >= self.props.column_data_page_size_limit(self.descr.path())
760    }
761
762    /// Performs dictionary fallback.
763    /// Prepares and writes dictionary and all data pages into page writer.
764    fn dict_fallback(&mut self) -> Result<()> {
765        // At this point we know that we need to fall back.
766        if self.page_metrics.num_buffered_values > 0 {
767            self.add_data_page()?;
768        }
769        self.write_dictionary_page()?;
770        self.flush_data_pages()?;
771        Ok(())
772    }
773
    /// Update the column index and offset index when adding the data page
    ///
    /// Called once per completed page (before page metrics are reset):
    /// appends this page's min/max (possibly truncated) and null count to the
    /// column index, tracks cross-page boundary order, appends the page's
    /// level histograms, and records row count / unencoded byte counts in the
    /// offset index.
    fn update_column_offset_index(
        &mut self,
        page_statistics: Option<&ValueStatistics<E::T>>,
        page_variable_length_bytes: Option<i64>,
    ) {
        // update the column index
        let null_page =
            (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
        // a page contains only null values,
        // and writers have to set the corresponding entries in min_values and max_values to byte[0]
        if null_page && self.column_index_builder.valid() {
            self.column_index_builder.append(
                null_page,
                vec![],
                vec![],
                self.page_metrics.num_page_nulls as i64,
            );
        } else if self.column_index_builder.valid() {
            // from page statistics
            // If can't get the page statistics, ignore this column/offset index for this column chunk
            match &page_statistics {
                None => {
                    self.column_index_builder.to_invalid();
                }
                Some(stat) => {
                    // Check if min/max are still ascending/descending across pages
                    let new_min = stat.min_opt().unwrap();
                    let new_max = stat.max_opt().unwrap();
                    if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
                        if self.data_page_boundary_ascending {
                            // If last min/max are greater than new min/max then not ascending anymore
                            let not_ascending = compare_greater(&self.descr, last_min, new_min)
                                || compare_greater(&self.descr, last_max, new_max);
                            if not_ascending {
                                self.data_page_boundary_ascending = false;
                            }
                        }

                        if self.data_page_boundary_descending {
                            // If new min/max are greater than last min/max then not descending anymore
                            let not_descending = compare_greater(&self.descr, new_min, last_min)
                                || compare_greater(&self.descr, new_max, last_max);
                            if not_descending {
                                self.data_page_boundary_descending = false;
                            }
                        }
                    }
                    // Remember this page's bounds for the next boundary-order check.
                    self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));

                    if self.can_truncate_value() {
                        // Truncated min stays a lower bound (prefix) and truncated max an
                        // upper bound (incremented), so the index remains valid for pruning.
                        self.column_index_builder.append(
                            null_page,
                            self.truncate_min_value(
                                self.props.column_index_truncate_length(),
                                stat.min_bytes_opt().unwrap(),
                            )
                            .0,
                            self.truncate_max_value(
                                self.props.column_index_truncate_length(),
                                stat.max_bytes_opt().unwrap(),
                            )
                            .0,
                            self.page_metrics.num_page_nulls as i64,
                        );
                    } else {
                        // Truncation is not safe for this column's type; store full bounds.
                        self.column_index_builder.append(
                            null_page,
                            stat.min_bytes_opt().unwrap().to_vec(),
                            stat.max_bytes_opt().unwrap().to_vec(),
                            self.page_metrics.num_page_nulls as i64,
                        );
                    }
                }
            }
        }

        // Append page histograms to the `ColumnIndex` histograms
        self.column_index_builder.append_histograms(
            &self.page_metrics.repetition_level_histogram,
            &self.page_metrics.definition_level_histogram,
        );

        // Update the offset index
        if let Some(builder) = self.offset_index_builder.as_mut() {
            builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
            builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
        }
    }
863
864    /// Determine if we should allow truncating min/max values for this column's statistics
865    fn can_truncate_value(&self) -> bool {
866        match self.descr.physical_type() {
867            // Don't truncate for Float16 and Decimal because their sort order is different
868            // from that of FIXED_LEN_BYTE_ARRAY sort order.
869            // So truncation of those types could lead to inaccurate min/max statistics
870            Type::FIXED_LEN_BYTE_ARRAY
871                if !matches!(
872                    self.descr.logical_type_ref(),
873                    Some(&LogicalType::Decimal { .. }) | Some(&LogicalType::Float16)
874                ) =>
875            {
876                true
877            }
878            Type::BYTE_ARRAY => true,
879            // Truncation only applies for fba/binary physical types
880            _ => false,
881        }
882    }
883
884    /// Returns `true` if this column's logical type is a UTF-8 string.
885    fn is_utf8(&self) -> bool {
886        self.get_descriptor().logical_type_ref() == Some(&LogicalType::String)
887            || self.get_descriptor().converted_type() == ConvertedType::UTF8
888    }
889
890    /// Truncates a binary statistic to at most `truncation_length` bytes.
891    ///
892    /// If truncation is not possible, returns `data`.
893    ///
894    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
895    ///
896    /// UTF-8 Note:
897    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
898    /// also remain valid UTF-8, but may be less tnan `truncation_length` bytes to avoid splitting
899    /// on non-character boundaries.
900    fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
901        truncation_length
902            .filter(|l| data.len() > *l)
903            .and_then(|l|
904                // don't do extra work if this column isn't UTF-8
905                if self.is_utf8() {
906                    match str::from_utf8(data) {
907                        Ok(str_data) => truncate_utf8(str_data, l),
908                        Err(_) => Some(data[..l].to_vec()),
909                    }
910                } else {
911                    Some(data[..l].to_vec())
912                }
913            )
914            .map(|truncated| (truncated, true))
915            .unwrap_or_else(|| (data.to_vec(), false))
916    }
917
918    /// Truncates a binary statistic to at most `truncation_length` bytes, and then increment the
919    /// final byte(s) to yield a valid upper bound. This may result in a result of less than
920    /// `truncation_length` bytes if the last byte(s) overflows.
921    ///
922    /// If truncation is not possible, returns `data`.
923    ///
924    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
925    ///
926    /// UTF-8 Note:
927    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
928    /// also remain valid UTF-8 (but again may be less than `truncation_length` bytes). If `data`
929    /// does not contain valid UTF-8, then truncation will occur as if the column is non-string
930    /// binary.
931    fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
932        truncation_length
933            .filter(|l| data.len() > *l)
934            .and_then(|l|
935                // don't do extra work if this column isn't UTF-8
936                if self.is_utf8() {
937                    match str::from_utf8(data) {
938                        Ok(str_data) => truncate_and_increment_utf8(str_data, l),
939                        Err(_) => increment(data[..l].to_vec()),
940                    }
941                } else {
942                    increment(data[..l].to_vec())
943                }
944            )
945            .map(|truncated| (truncated, true))
946            .unwrap_or_else(|| (data.to_vec(), false))
947    }
948
    /// Truncate the min and max values that will be written to a data page
    /// header or column chunk Statistics
    ///
    /// Only `ByteArray`, and `FixedLenByteArray` values for which truncation is
    /// safe (see [`Self::can_truncate_value`]), are affected; all other
    /// statistics pass through unchanged. When a bound is truncated the
    /// corresponding `is_exact` flag is cleared.
    fn truncate_statistics(&self, statistics: Statistics) -> Statistics {
        // `true` only when the column's sort order is signed; forwarded to
        // `ValueStatistics` to control the backwards-compatible min/max fields.
        let backwards_compatible_min_max = self.descr.sort_order().is_signed();
        match statistics {
            Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
                let (min, did_truncate_min) = self.truncate_min_value(
                    self.props.statistics_truncate_length(),
                    stats.min_bytes_opt().unwrap(),
                );
                let (max, did_truncate_max) = self.truncate_max_value(
                    self.props.statistics_truncate_length(),
                    stats.max_bytes_opt().unwrap(),
                );
                Statistics::ByteArray(
                    ValueStatistics::new(
                        Some(min.into()),
                        Some(max.into()),
                        stats.distinct_count(),
                        stats.null_count_opt(),
                        backwards_compatible_min_max,
                    )
                    // Truncated bounds are no longer the exact min/max values.
                    .with_max_is_exact(!did_truncate_max)
                    .with_min_is_exact(!did_truncate_min),
                )
            }
            Statistics::FixedLenByteArray(stats)
                if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
            {
                let (min, did_truncate_min) = self.truncate_min_value(
                    self.props.statistics_truncate_length(),
                    stats.min_bytes_opt().unwrap(),
                );
                let (max, did_truncate_max) = self.truncate_max_value(
                    self.props.statistics_truncate_length(),
                    stats.max_bytes_opt().unwrap(),
                );
                Statistics::FixedLenByteArray(
                    ValueStatistics::new(
                        Some(min.into()),
                        Some(max.into()),
                        stats.distinct_count(),
                        stats.null_count_opt(),
                        backwards_compatible_min_max,
                    )
                    .with_max_is_exact(!did_truncate_max)
                    .with_min_is_exact(!did_truncate_min),
                )
            }
            // Non-binary statistics (and FLBA types unsafe to truncate) are untouched.
            stats => stats,
        }
    }
1001
    /// Adds data page.
    /// Data page is either buffered in case of dictionary encoding or written directly.
    ///
    /// Flushes the currently buffered values from the encoder into a page:
    /// updates chunk-level statistics and the column/offset indexes, encodes
    /// levels and values per the configured writer version, compresses, and
    /// finally resets the per-page state.
    fn add_data_page(&mut self) -> Result<()> {
        // Extract encoded values
        let values_data = self.encoder.flush_data_page()?;

        let max_def_level = self.descr.max_def_level();
        let max_rep_level = self.descr.max_rep_level();

        self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;

        // Build page-level statistics (and fold min/max into the chunk-level
        // metrics) when the encoder produced both bounds.
        let page_statistics = match (values_data.min_value, values_data.max_value) {
            (Some(min), Some(max)) => {
                // Update chunk level statistics
                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

                (self.statistics_enabled == EnabledStatistics::Page).then_some(
                    ValueStatistics::new(
                        Some(min),
                        Some(max),
                        None,
                        Some(self.page_metrics.num_page_nulls),
                        false,
                    ),
                )
            }
            _ => None,
        };

        // update column and offset index
        self.update_column_offset_index(
            page_statistics.as_ref(),
            values_data.variable_length_bytes,
        );

        // Update histograms and variable_length_bytes in column_metrics
        self.column_metrics
            .update_from_page_metrics(&self.page_metrics);
        self.column_metrics
            .update_variable_length_bytes(values_data.variable_length_bytes);

        // From here on, we only need page statistics if they will be written to the page header.
        let page_statistics = page_statistics
            .filter(|_| self.props.write_page_header_statistics(self.descr.path()))
            .map(|stats| self.truncate_statistics(Statistics::from(stats)));

        let compressed_page = match self.props.writer_version() {
            WriterVersion::PARQUET_1_0 => {
                // v1 layout: rep levels, def levels and values are concatenated
                // and compressed together as a single buffer.
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.rep_levels_sink[..],
                            max_rep_level,
                        )[..],
                    );
                }

                if max_def_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.def_levels_sink[..],
                            max_def_level,
                        )[..],
                    );
                }

                buffer.extend_from_slice(&values_data.buf);
                let uncompressed_size = buffer.len();

                if let Some(ref mut cmpr) = self.compressor {
                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
                    compressed_buf.shrink_to_fit();
                    buffer = compressed_buf;
                }

                let data_page = Page::DataPage {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    def_level_encoding: Encoding::RLE,
                    rep_level_encoding: Encoding::RLE,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
            WriterVersion::PARQUET_2_0 => {
                // v2 layout: levels are stored uncompressed ahead of the
                // (optionally compressed) values, with their lengths recorded
                // in the page header.
                let mut rep_levels_byte_len = 0;
                let mut def_levels_byte_len = 0;
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
                    rep_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                if max_def_level > 0 {
                    let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
                    def_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                let uncompressed_size =
                    rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();

                // Data Page v2 compresses values only.
                let is_compressed = match self.compressor {
                    Some(ref mut cmpr) => {
                        let buffer_len = buffer.len();
                        cmpr.compress(&values_data.buf, &mut buffer)?;
                        // If the compressed values are no smaller than the whole
                        // uncompressed page, store the values uncompressed instead.
                        if uncompressed_size <= buffer.len() - buffer_len {
                            buffer.truncate(buffer_len);
                            buffer.extend_from_slice(&values_data.buf);
                            false
                        } else {
                            true
                        }
                    }
                    None => {
                        buffer.extend_from_slice(&values_data.buf);
                        false
                    }
                };

                let data_page = Page::DataPageV2 {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    num_nulls: self.page_metrics.num_page_nulls as u32,
                    num_rows: self.page_metrics.num_buffered_rows,
                    def_levels_byte_len: def_levels_byte_len as u32,
                    rep_levels_byte_len: rep_levels_byte_len as u32,
                    is_compressed,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
        };

        // Check if we need to buffer data page or flush it to the sink directly.
        if self.encoder.has_dictionary() {
            self.data_pages.push_back(compressed_page);
        } else {
            self.write_data_page(compressed_page)?;
        }

        // Update total number of rows.
        self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;

        // Reset state.
        self.rep_levels_sink.clear();
        self.def_levels_sink.clear();
        self.page_metrics.new_page();

        Ok(())
    }
1166
1167    /// Finalises any outstanding data pages and flushes buffered data pages from
1168    /// dictionary encoding into underlying sink.
1169    #[inline]
1170    fn flush_data_pages(&mut self) -> Result<()> {
1171        // Write all outstanding data to a new page.
1172        if self.page_metrics.num_buffered_values > 0 {
1173            self.add_data_page()?;
1174        }
1175
1176        while let Some(page) = self.data_pages.pop_front() {
1177            self.write_data_page(page)?;
1178        }
1179
1180        Ok(())
1181    }
1182
    /// Assembles column chunk metadata.
    ///
    /// Gathers the accumulated column metrics (sizes, offsets, encodings) and,
    /// when statistics are enabled, the chunk-level statistics (truncated per
    /// the writer properties), level histograms, geospatial statistics and —
    /// with the `encryption` feature — the column crypto metadata.
    fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
        let total_compressed_size = self.column_metrics.total_compressed_size as i64;
        let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
        let num_values = self.column_metrics.total_num_values as i64;
        let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
        // If data page offset is not set, then no pages have been written
        let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;

        let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
            .set_compression(self.codec)
            .set_encodings_mask(EncodingMask::new_from_encodings(self.encodings.iter()))
            .set_page_encoding_stats(self.encoding_stats.clone())
            .set_total_compressed_size(total_compressed_size)
            .set_total_uncompressed_size(total_uncompressed_size)
            .set_num_values(num_values)
            .set_data_page_offset(data_page_offset)
            .set_dictionary_page_offset(dict_page_offset);

        if self.statistics_enabled != EnabledStatistics::None {
            // `true` only when the column's sort order is signed; forwarded to
            // the statistics via `with_backwards_compatible_min_max`.
            let backwards_compatible_min_max = self.descr.sort_order().is_signed();

            let statistics = ValueStatistics::<E::T>::new(
                self.column_metrics.min_column_value.clone(),
                self.column_metrics.max_column_value.clone(),
                self.column_metrics.column_distinct_count,
                Some(self.column_metrics.num_column_nulls),
                false,
            )
            .with_backwards_compatible_min_max(backwards_compatible_min_max)
            .into();

            // Apply the configured statistics truncation before writing.
            let statistics = self.truncate_statistics(statistics);

            builder = builder
                .set_statistics(statistics)
                .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
                .set_repetition_level_histogram(
                    self.column_metrics.repetition_level_histogram.take(),
                )
                .set_definition_level_histogram(
                    self.column_metrics.definition_level_histogram.take(),
                );

            if let Some(geo_stats) = self.encoder.flush_geospatial_statistics() {
                builder = builder.set_geo_statistics(geo_stats);
            }
        }

        builder = self.set_column_chunk_encryption_properties(builder);

        let metadata = builder.build()?;
        Ok(metadata)
    }
1237
    /// Encodes definition or repetition levels for Data Page v1.
    ///
    /// `encoding` selects the level encoding (the v1 write path above always
    /// passes `Encoding::RLE`); the encoder is sized for `levels.len()` values
    /// and the encoded bytes are returned.
    #[inline]
    fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> {
        let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
        encoder.put(levels);
        encoder.consume()
    }
1245
    /// Encodes definition or repetition levels for Data Page v2.
    /// Encoding is always RLE.
    ///
    /// Returns the raw encoded bytes; the caller records their length in the
    /// page header (`rep_levels_byte_len` / `def_levels_byte_len`).
    #[inline]
    fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
        let mut encoder = LevelEncoder::v2(max_level, levels.len());
        encoder.put(levels);
        encoder.consume()
    }
1254
1255    /// Writes compressed data page into underlying sink and updates global metrics.
1256    #[inline]
1257    fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
1258        self.encodings.insert(page.encoding());
1259        match self.encoding_stats.last_mut() {
1260            Some(encoding_stats)
1261                if encoding_stats.page_type == page.page_type()
1262                    && encoding_stats.encoding == page.encoding() =>
1263            {
1264                encoding_stats.count += 1;
1265            }
1266            _ => {
1267                // data page type does not change inside a file
1268                // encoding can currently only change from dictionary to non-dictionary once
1269                self.encoding_stats.push(PageEncodingStats {
1270                    page_type: page.page_type(),
1271                    encoding: page.encoding(),
1272                    count: 1,
1273                });
1274            }
1275        }
1276        let page_spec = self.page_writer.write_page(page)?;
1277        // update offset index
1278        // compressed_size = header_size + compressed_data_size
1279        if let Some(builder) = self.offset_index_builder.as_mut() {
1280            builder
1281                .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
1282        }
1283        self.update_metrics_for_page(page_spec);
1284        Ok(())
1285    }
1286
1287    /// Writes dictionary page into underlying sink.
1288    #[inline]
1289    fn write_dictionary_page(&mut self) -> Result<()> {
1290        let compressed_page = {
1291            let mut page = self
1292                .encoder
1293                .flush_dict_page()?
1294                .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;
1295
1296            let uncompressed_size = page.buf.len();
1297
1298            if let Some(ref mut cmpr) = self.compressor {
1299                let mut output_buf = Vec::with_capacity(uncompressed_size);
1300                cmpr.compress(&page.buf, &mut output_buf)?;
1301                page.buf = Bytes::from(output_buf);
1302            }
1303
1304            let dict_page = Page::DictionaryPage {
1305                buf: page.buf,
1306                num_values: page.num_values as u32,
1307                encoding: self.props.dictionary_page_encoding(),
1308                is_sorted: page.is_sorted,
1309            };
1310            CompressedPage::new(dict_page, uncompressed_size)
1311        };
1312
1313        self.encodings.insert(compressed_page.encoding());
1314        self.encoding_stats.push(PageEncodingStats {
1315            page_type: PageType::DICTIONARY_PAGE,
1316            encoding: compressed_page.encoding(),
1317            count: 1,
1318        });
1319        let page_spec = self.page_writer.write_page(compressed_page)?;
1320        self.update_metrics_for_page(page_spec);
1321        // For the directory page, don't need to update column/offset index.
1322        Ok(())
1323    }
1324
1325    /// Updates column writer metrics with each page metadata.
1326    #[inline]
1327    fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1328        self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1329        self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1330        self.column_metrics.total_bytes_written += page_spec.bytes_written;
1331
1332        match page_spec.page_type {
1333            PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1334                self.column_metrics.total_num_values += page_spec.num_values as u64;
1335                if self.column_metrics.data_page_offset.is_none() {
1336                    self.column_metrics.data_page_offset = Some(page_spec.offset);
1337                }
1338            }
1339            PageType::DICTIONARY_PAGE => {
1340                assert!(
1341                    self.column_metrics.dictionary_page_offset.is_none(),
1342                    "Dictionary offset is already set"
1343                );
1344                self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1345            }
1346            _ => {}
1347        }
1348    }
1349
1350    #[inline]
1351    #[cfg(feature = "encryption")]
1352    fn set_column_chunk_encryption_properties(
1353        &self,
1354        builder: ColumnChunkMetaDataBuilder,
1355    ) -> ColumnChunkMetaDataBuilder {
1356        if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() {
1357            builder.set_column_crypto_metadata(get_column_crypto_metadata(
1358                encryption_properties,
1359                &self.descr,
1360            ))
1361        } else {
1362            builder
1363        }
1364    }
1365
    #[inline]
    #[cfg(not(feature = "encryption"))]
    fn set_column_chunk_encryption_properties(
        &self,
        builder: ColumnChunkMetaDataBuilder,
    ) -> ColumnChunkMetaDataBuilder {
        // No-op when the `encryption` feature is disabled: the builder passes
        // through unchanged.
        builder
    }
1374}
1375
/// Replaces `min` with `val` when the current minimum compares greater than
/// `val` (or when `min` is unset). NaN values are skipped via `update_stat`.
fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
    update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
}
1379
/// Replaces `max` with `val` when `val` compares greater than the current
/// maximum (or when `max` is unset). NaN values are skipped via `update_stat`.
fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
    update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
}
1383
1384#[inline]
1385#[allow(clippy::eq_op)]
1386fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
1387    match T::PHYSICAL_TYPE {
1388        Type::FLOAT | Type::DOUBLE => val != val,
1389        Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type_ref() == Some(&LogicalType::Float16) => {
1390            let val = val.as_bytes();
1391            let val = f16::from_le_bytes([val[0], val[1]]);
1392            val.is_nan()
1393        }
1394        _ => false,
1395    }
1396}
1397
1398/// Perform a conditional update of `cur`, skipping any NaN values
1399///
1400/// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with
1401/// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true`
1402fn update_stat<T: ParquetValueType, F>(
1403    descr: &ColumnDescriptor,
1404    val: &T,
1405    cur: &mut Option<T>,
1406    should_update: F,
1407) where
1408    F: Fn(&T) -> bool,
1409{
1410    if is_nan(descr, val) {
1411        return;
1412    }
1413
1414    if cur.as_ref().is_none_or(should_update) {
1415        *cur = Some(val.clone());
1416    }
1417}
1418
/// Evaluate `a > b` according to underlying logical type.
///
/// Integer columns annotated as unsigned (logical or converted type) compare
/// as `u64`; BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY decimals use signed big-endian
/// byte comparison, and Float16 compares as `f16`. Everything else falls back
/// to `T`'s own `Ord`/`PartialOrd`.
fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
    match T::PHYSICAL_TYPE {
        Type::INT32 | Type::INT64 => {
            if let Some(LogicalType::Integer {
                is_signed: false, ..
            }) = descr.logical_type_ref()
            {
                // need to compare unsigned
                return compare_greater_unsigned_int(a, b);
            }

            // Legacy converted types can also mark the column as unsigned.
            match descr.converted_type() {
                ConvertedType::UINT_8
                | ConvertedType::UINT_16
                | ConvertedType::UINT_32
                | ConvertedType::UINT_64 => {
                    return compare_greater_unsigned_int(a, b);
                }
                _ => {}
            };
        }
        Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
            // Decimal may be declared via the logical or the converted type.
            if let Some(LogicalType::Decimal { .. }) = descr.logical_type_ref() {
                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
            }
            if let ConvertedType::DECIMAL = descr.converted_type() {
                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
            }
            if let Some(LogicalType::Float16) = descr.logical_type_ref() {
                return compare_greater_f16(a.as_bytes(), b.as_bytes());
            }
        }

        _ => {}
    }

    // compare independent of logical / converted type
    a > b
}
1459
1460// ----------------------------------------------------------------------
1461// Encoding support for column writer.
1462// This mirrors parquet-mr default encodings for writes. See:
1463// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java
1464// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java
1465
1466/// Returns encoding for a column when no other encoding is provided in writer properties.
1467fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1468    match (kind, props.writer_version()) {
1469        (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1470        (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1471        (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1472        (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1473        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1474        _ => Encoding::PLAIN,
1475    }
1476}
1477
1478/// Returns true if dictionary is supported for column writer, false otherwise.
1479fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1480    match (kind, props.writer_version()) {
1481        // Booleans do not support dict encoding and should use a fallback encoding.
1482        (Type::BOOLEAN, _) => false,
1483        // Dictionary encoding was not enabled in PARQUET 1.0
1484        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1485        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1486        _ => true,
1487    }
1488}
1489
/// Unsigned `a > b` for integer columns annotated as unsigned.
#[inline]
fn compare_greater_unsigned_int<T: ParquetValueType>(a: &T, b: &T) -> bool {
    // Reinterpret the stored physical value as u64 before comparing.
    // NOTE(review): `as_u64` is expected to succeed for the integer physical
    // types this is called with — the unwraps panic otherwise.
    a.as_u64().unwrap() > b.as_u64().unwrap()
}
1494
/// `a > b` for Float16 values stored as 2-byte little-endian FLBA.
#[inline]
fn compare_greater_f16(a: &[u8], b: &[u8]) -> bool {
    // Panics if the slices are not exactly 2 bytes long.
    let a = f16::from_le_bytes(a.try_into().unwrap());
    let b = f16::from_le_bytes(b.try_into().unwrap());
    a > b
}
1501
/// Signed comparison of big-endian two's complement byte arrays (the on-disk
/// representation of DECIMAL values).
///
/// Returns true if `a` is strictly greater than `b`, treating the shorter
/// array as if it were sign-extended to the length of the longer one.
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    let a_length = a.len();
    let b_length = b.len();

    // An empty array sorts below everything except another empty array.
    if a_length == 0 || b_length == 0 {
        return a_length > 0;
    }

    let first_a: u8 = a[0];
    let first_b: u8 = b[0];

    // We can short circuit for different signed numbers or
    // for equal length bytes arrays that have different first bytes.
    // The equality requirement is necessary for sign extension cases:
    // 0xFF80 must compare equal to 0x80 (big endian sign extension), so a
    // differing first byte is only decisive when the lengths match.
    if (0x80 & first_a) != (0x80 & first_b) || (a_length == b_length && first_a != first_b) {
        return (first_a as i8) > (first_b as i8);
    }

    // Both numbers have the same sign from here on. The shorter array is
    // conceptually padded on the left with this sign-extension byte.
    let extension: u8 = if (first_a as i8) < 0 { 0xFF } else { 0 };

    if a_length != b_length {
        // If the longer array's lead bytes are not pure sign-extension
        // padding, its extra length carries real magnitude and decides the
        // comparison outright.
        let not_equal = if a_length > b_length {
            let lead_length = a_length - b_length;
            a[0..lead_length].iter().any(|&x| x != extension)
        } else {
            let lead_length = b_length - a_length;
            b[0..lead_length].iter().any(|&x| x != extension)
        };

        if not_equal {
            let negative_values: bool = (first_a as i8) < 0;
            let a_longer: bool = a_length > b_length;
            // A longer magnitude is greater for positives, smaller for negatives.
            return if negative_values { !a_longer } else { a_longer };
        }
    }

    // Any lead bytes are sign-extension padding and the sign bytes are equal,
    // so an unsigned lexicographical comparison of the remaining bytes decides
    // the order — but the arrays must first be aligned on their least
    // significant byte. Comparing the unaligned `a[1..] > b[1..]` for unequal
    // lengths would wrongly report e.g. 0x0005 as greater than 0x05.
    if a_length == b_length {
        a[1..] > b[1..]
    } else if a_length > b_length {
        a[a_length - b_length..] > b[..]
    } else {
        a[..] > b[b_length - a_length..]
    }
}
1547
/// Truncate a UTF-8 slice to the longest prefix that is still a valid UTF-8 string,
/// while being less than `length` bytes and non-empty. Returns `None` if truncation
/// is not possible within those constraints.
///
/// The caller guarantees that data.len() > length.
fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    // Walk backwards from `length` until a char boundary is found. Reaching
    // position 0 would produce an empty (disallowed) prefix, hence `None`.
    let mut split = length;
    while split > 0 && !data.is_char_boundary(split) {
        split -= 1;
    }
    (split > 0).then(|| data.as_bytes()[..split].to_vec())
}
1557
1558/// Truncate a UTF-8 slice and increment it's final character. The returned value is the
1559/// longest such slice that is still a valid UTF-8 string while being less than `length`
1560/// bytes and non-empty. Returns `None` if no such transformation is possible.
1561///
1562/// The caller guarantees that data.len() > length.
1563fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1564    // UTF-8 is max 4 bytes, so start search 3 back from desired length
1565    let lower_bound = length.saturating_sub(3);
1566    let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
1567    increment_utf8(data.get(..split)?)
1568}
1569
/// Increment the final character in a UTF-8 string in such a way that the returned result
/// is still a valid UTF-8 string. The returned string may be shorter than the input if the
/// last character(s) cannot be incremented (due to overflow or producing invalid code points).
/// Returns `None` if the string cannot be incremented.
///
/// Note that this implementation will not promote an N-byte code point to (N+1) bytes.
fn increment_utf8(data: &str) -> Option<Vec<u8>> {
    data.char_indices().rev().find_map(|(idx, ch)| {
        let width = ch.len_utf8();
        // `from_u32` yields `None` for invalid code points (e.g. surrogates);
        // the filter rejects increments that would widen the encoding.
        char::from_u32(ch as u32 + 1)
            .filter(|next| next.len_utf8() == width)
            .map(|next| {
                // Keep everything up to and including this character, then
                // overwrite it in place with its incremented form.
                let mut bytes = data.as_bytes()[..idx + width].to_vec();
                next.encode_utf8(&mut bytes[idx..]);
                bytes
            })
    })
}
1591
/// Try and increment the bytes from right to left.
///
/// Returns `None` if all bytes are set to `u8::MAX`.
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    // The rightmost byte below u8::MAX absorbs the carry; every byte to its
    // right overflows and wraps to zero.
    let idx = data.iter().rposition(|&byte| byte != u8::MAX)?;
    data[idx] += 1;
    for byte in &mut data[idx + 1..] {
        *byte = 0;
    }
    Some(data)
}
1607
1608#[cfg(test)]
1609mod tests {
1610    use crate::{
1611        file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1612        schema::parser::parse_message_type,
1613    };
1614    use core::str;
1615    use rand::distr::uniform::SampleUniform;
1616    use std::{fs::File, sync::Arc};
1617
1618    use crate::column::{
1619        page::PageReader,
1620        reader::{ColumnReaderImpl, get_column_reader, get_typed_column_reader},
1621    };
1622    use crate::file::writer::TrackedWrite;
1623    use crate::file::{
1624        properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1625    };
1626    use crate::schema::types::{ColumnPath, Type as SchemaType};
1627    use crate::util::test_common::rand_gen::random_numbers_range;
1628
1629    use super::*;
1630
1631    #[test]
1632    fn test_column_writer_inconsistent_def_rep_length() {
1633        let page_writer = get_test_page_writer();
1634        let props = Default::default();
1635        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
1636        let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
1637        assert!(res.is_err());
1638        if let Err(err) = res {
1639            assert_eq!(
1640                format!("{err}"),
1641                "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
1642            );
1643        }
1644    }
1645
1646    #[test]
1647    fn test_column_writer_invalid_def_levels() {
1648        let page_writer = get_test_page_writer();
1649        let props = Default::default();
1650        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1651        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1652        assert!(res.is_err());
1653        if let Err(err) = res {
1654            assert_eq!(
1655                format!("{err}"),
1656                "Parquet error: Definition levels are required, because max definition level = 1"
1657            );
1658        }
1659    }
1660
1661    #[test]
1662    fn test_column_writer_invalid_rep_levels() {
1663        let page_writer = get_test_page_writer();
1664        let props = Default::default();
1665        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
1666        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1667        assert!(res.is_err());
1668        if let Err(err) = res {
1669            assert_eq!(
1670                format!("{err}"),
1671                "Parquet error: Repetition levels are required, because max repetition level = 1"
1672            );
1673        }
1674    }
1675
1676    #[test]
1677    fn test_column_writer_not_enough_values_to_write() {
1678        let page_writer = get_test_page_writer();
1679        let props = Default::default();
1680        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1681        let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
1682        assert!(res.is_err());
1683        if let Err(err) = res {
1684            assert_eq!(
1685                format!("{err}"),
1686                "Parquet error: Expected to write 4 values, but have only 2"
1687            );
1688        }
1689    }
1690
1691    #[test]
1692    fn test_column_writer_write_only_one_dictionary_page() {
1693        let page_writer = get_test_page_writer();
1694        let props = Default::default();
1695        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1696        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1697        // First page should be correctly written.
1698        writer.add_data_page().unwrap();
1699        writer.write_dictionary_page().unwrap();
1700        let err = writer.write_dictionary_page().unwrap_err().to_string();
1701        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1702    }
1703
1704    #[test]
1705    fn test_column_writer_error_when_writing_disabled_dictionary() {
1706        let page_writer = get_test_page_writer();
1707        let props = Arc::new(
1708            WriterProperties::builder()
1709                .set_dictionary_enabled(false)
1710                .build(),
1711        );
1712        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1713        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1714        let err = writer.write_dictionary_page().unwrap_err().to_string();
1715        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1716    }
1717
1718    #[test]
1719    fn test_column_writer_boolean_type_does_not_support_dictionary() {
1720        let page_writer = get_test_page_writer();
1721        let props = Arc::new(
1722            WriterProperties::builder()
1723                .set_dictionary_enabled(true)
1724                .build(),
1725        );
1726        let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
1727        writer
1728            .write_batch(&[true, false, true, false], None, None)
1729            .unwrap();
1730
1731        let r = writer.close().unwrap();
1732        // PlainEncoder uses bit writer to write boolean values, which all fit into 1
1733        // byte.
1734        assert_eq!(r.bytes_written, 1);
1735        assert_eq!(r.rows_written, 4);
1736
1737        let metadata = r.metadata;
1738        assert_eq!(
1739            metadata.encodings().collect::<Vec<_>>(),
1740            vec![Encoding::PLAIN, Encoding::RLE]
1741        );
1742        assert_eq!(metadata.num_values(), 4); // just values
1743        assert_eq!(metadata.dictionary_page_offset(), None);
1744    }
1745
    #[test]
    fn test_column_writer_default_encoding_support_bool() {
        // Each case: writer version, dictionary enabled flag, values, expected
        // dictionary page offset, expected chunk encodings, expected per-page
        // encoding stats.
        // v1 writer, dictionary requested: booleans cannot use a dictionary,
        // so data pages are PLAIN.
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[true, false],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        // v1 writer, dictionary disabled: same PLAIN fallback.
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[true, false],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        // v2 writer, dictionary requested: booleans use RLE-encoded v2 data pages.
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[true, false],
            None,
            &[Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
        );
        // v2 writer, dictionary disabled: identical expectations.
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[true, false],
            None,
            &[Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
        );
    }
1781
    #[test]
    fn test_column_writer_default_encoding_support_int32() {
        // v1 + dictionary: PLAIN dictionary page plus RLE_DICTIONARY data pages.
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        // v1, no dictionary: plain-encoded data pages only.
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1, 2],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        // v2 + dictionary: as v1 but with v2 data pages.
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        // v2, no dictionary: INT32 falls back to DELTA_BINARY_PACKED.
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1, 2],
            None,
            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BINARY_PACKED,
                1,
            )],
        );
    }
1827
    #[test]
    fn test_column_writer_default_encoding_support_int64() {
        // v1 + dictionary: PLAIN dictionary page plus RLE_DICTIONARY data pages.
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        // v1, no dictionary: plain-encoded data pages only.
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1, 2],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        // v2 + dictionary: as v1 but with v2 data pages.
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        // v2, no dictionary: INT64 falls back to DELTA_BINARY_PACKED.
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1, 2],
            None,
            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BINARY_PACKED,
                1,
            )],
        );
    }
1873
    #[test]
    fn test_column_writer_default_encoding_support_int96() {
        // v1 + dictionary: PLAIN dictionary page plus RLE_DICTIONARY data pages.
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[Int96::from(vec![1, 2, 3])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        // v1, no dictionary: plain-encoded data pages only.
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[Int96::from(vec![1, 2, 3])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        // v2 + dictionary: as v1 but with v2 data pages.
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[Int96::from(vec![1, 2, 3])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        // v2, no dictionary: INT96 has no delta fallback, so it stays PLAIN.
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[Int96::from(vec![1, 2, 3])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
1915
    #[test]
    fn test_column_writer_default_encoding_support_float() {
        // v1 + dictionary: PLAIN dictionary page plus RLE_DICTIONARY data pages.
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        // v1, no dictionary: plain-encoded data pages only.
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        // v2 + dictionary: as v1 but with v2 data pages.
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        // v2, no dictionary: FLOAT has no delta fallback, so it stays PLAIN.
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
1957
    #[test]
    fn test_column_writer_default_encoding_support_double() {
        // v1 + dictionary: PLAIN dictionary page plus RLE_DICTIONARY data pages.
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        // v1, no dictionary: plain-encoded data pages only.
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        // v2 + dictionary: as v1 but with v2 data pages.
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        // v2, no dictionary: DOUBLE has no delta fallback, so it stays PLAIN.
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
1999
    #[test]
    fn test_column_writer_default_encoding_support_byte_array() {
        // v1 + dictionary: PLAIN dictionary page plus RLE_DICTIONARY data pages.
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[ByteArray::from(vec![1u8])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        // v1, no dictionary: plain-encoded data pages only.
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[ByteArray::from(vec![1u8])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        // v2 + dictionary: as v1 but with v2 data pages.
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[ByteArray::from(vec![1u8])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        // v2, no dictionary: BYTE_ARRAY falls back to DELTA_BYTE_ARRAY.
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[ByteArray::from(vec![1u8])],
            None,
            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BYTE_ARRAY,
                1,
            )],
        );
    }
2045
    #[test]
    fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
        // v1 writer: FIXED_LEN_BYTE_ARRAY has no dictionary support, so both
        // dictionary settings produce plain-encoded data pages.
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        // v2 + dictionary: dictionary support is available under the v2 writer.
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[ByteArray::from(vec![1u8]).into()],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        // v2, no dictionary: falls back to DELTA_BYTE_ARRAY.
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BYTE_ARRAY,
                1,
            )],
        );
    }
2088
2089    #[test]
2090    fn test_column_writer_check_metadata() {
2091        let page_writer = get_test_page_writer();
2092        let props = Default::default();
2093        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2094        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2095
2096        let r = writer.close().unwrap();
2097        assert_eq!(r.bytes_written, 20);
2098        assert_eq!(r.rows_written, 4);
2099
2100        let metadata = r.metadata;
2101        assert_eq!(
2102            metadata.encodings().collect::<Vec<_>>(),
2103            vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2104        );
2105        assert_eq!(metadata.num_values(), 4);
2106        assert_eq!(metadata.compressed_size(), 20);
2107        assert_eq!(metadata.uncompressed_size(), 20);
2108        assert_eq!(metadata.data_page_offset(), 0);
2109        assert_eq!(metadata.dictionary_page_offset(), Some(0));
2110        if let Some(stats) = metadata.statistics() {
2111            assert_eq!(stats.null_count_opt(), Some(0));
2112            assert_eq!(stats.distinct_count_opt(), None);
2113            if let Statistics::Int32(stats) = stats {
2114                assert_eq!(stats.min_opt().unwrap(), &1);
2115                assert_eq!(stats.max_opt().unwrap(), &4);
2116            } else {
2117                panic!("expecting Statistics::Int32");
2118            }
2119        } else {
2120            panic!("metadata missing statistics");
2121        }
2122    }
2123
2124    #[test]
2125    fn test_column_writer_check_byte_array_min_max() {
2126        let page_writer = get_test_page_writer();
2127        let props = Default::default();
2128        let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
2129        writer
2130            .write_batch(
2131                &[
2132                    ByteArray::from(vec![
2133                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2134                        35u8, 231u8, 90u8, 0u8, 0u8,
2135                    ]),
2136                    ByteArray::from(vec![
2137                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
2138                        152u8, 177u8, 56u8, 0u8, 0u8,
2139                    ]),
2140                    ByteArray::from(vec![
2141                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
2142                        0u8,
2143                    ]),
2144                    ByteArray::from(vec![
2145                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2146                        44u8, 0u8, 0u8,
2147                    ]),
2148                ],
2149                None,
2150                None,
2151            )
2152            .unwrap();
2153        let metadata = writer.close().unwrap().metadata;
2154        if let Some(stats) = metadata.statistics() {
2155            if let Statistics::ByteArray(stats) = stats {
2156                assert_eq!(
2157                    stats.min_opt().unwrap(),
2158                    &ByteArray::from(vec![
2159                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2160                        35u8, 231u8, 90u8, 0u8, 0u8,
2161                    ])
2162                );
2163                assert_eq!(
2164                    stats.max_opt().unwrap(),
2165                    &ByteArray::from(vec![
2166                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2167                        44u8, 0u8, 0u8,
2168                    ])
2169                );
2170            } else {
2171                panic!("expecting Statistics::ByteArray");
2172            }
2173        } else {
2174            panic!("metadata missing statistics");
2175        }
2176    }
2177
2178    #[test]
2179    fn test_column_writer_uint32_converted_type_min_max() {
2180        let page_writer = get_test_page_writer();
2181        let props = Default::default();
2182        let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
2183            page_writer,
2184            0,
2185            0,
2186            props,
2187        );
2188        writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
2189        let metadata = writer.close().unwrap().metadata;
2190        if let Some(stats) = metadata.statistics() {
2191            if let Statistics::Int32(stats) = stats {
2192                assert_eq!(stats.min_opt().unwrap(), &0,);
2193                assert_eq!(stats.max_opt().unwrap(), &5,);
2194            } else {
2195                panic!("expecting Statistics::Int32");
2196            }
2197        } else {
2198            panic!("metadata missing statistics");
2199        }
2200    }
2201
2202    #[test]
2203    fn test_column_writer_precalculated_statistics() {
2204        let page_writer = get_test_page_writer();
2205        let props = Arc::new(
2206            WriterProperties::builder()
2207                .set_statistics_enabled(EnabledStatistics::Chunk)
2208                .build(),
2209        );
2210        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2211        writer
2212            .write_batch_with_statistics(
2213                &[1, 2, 3, 4],
2214                None,
2215                None,
2216                Some(&-17),
2217                Some(&9000),
2218                Some(55),
2219            )
2220            .unwrap();
2221
2222        let r = writer.close().unwrap();
2223        assert_eq!(r.bytes_written, 20);
2224        assert_eq!(r.rows_written, 4);
2225
2226        let metadata = r.metadata;
2227        assert_eq!(
2228            metadata.encodings().collect::<Vec<_>>(),
2229            vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2230        );
2231        assert_eq!(metadata.num_values(), 4);
2232        assert_eq!(metadata.compressed_size(), 20);
2233        assert_eq!(metadata.uncompressed_size(), 20);
2234        assert_eq!(metadata.data_page_offset(), 0);
2235        assert_eq!(metadata.dictionary_page_offset(), Some(0));
2236        if let Some(stats) = metadata.statistics() {
2237            assert_eq!(stats.null_count_opt(), Some(0));
2238            assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
2239            if let Statistics::Int32(stats) = stats {
2240                assert_eq!(stats.min_opt().unwrap(), &-17);
2241                assert_eq!(stats.max_opt().unwrap(), &9000);
2242            } else {
2243                panic!("expecting Statistics::Int32");
2244            }
2245        } else {
2246            panic!("metadata missing statistics");
2247        }
2248    }
2249
    #[test]
    fn test_mixed_precomputed_statistics() {
        let mut buf = Vec::with_capacity(100);
        let mut write = TrackedWrite::new(&mut buf);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let props = Arc::new(
            WriterProperties::builder()
                .set_write_page_header_statistics(true)
                .build(),
        );
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);

        // One batch computes statistics from the values, the other supplies
        // precomputed statistics; the chunk statistics must combine both.
        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        writer
            .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
            .unwrap();

        let r = writer.close().unwrap();

        // Min (1) comes from the first batch, max (7) from the second, and
        // the chunk-level distinct count ends up unset.
        let stats = r.metadata.statistics().unwrap();
        assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
        assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
        assert_eq!(stats.null_count_opt(), Some(0));
        assert!(stats.distinct_count_opt().is_none());

        // Release the mutable borrow of `buf` so it can be read back below.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .set_read_page_statistics(true)
            .build();
        let reader = SerializedPageReader::new_with_properties(
            Arc::new(Bytes::from(buf)),
            &r.metadata,
            r.rows_written as usize,
            None,
            Arc::new(props),
        )
        .unwrap();

        // Expect one dictionary page followed by one data page.
        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
        assert_eq!(pages.len(), 2);

        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);

        // Page-header statistics of the single data page span all values from
        // both batches.
        let page_statistics = pages[1].statistics().unwrap();
        assert_eq!(
            page_statistics.min_bytes_opt().unwrap(),
            1_i32.to_le_bytes()
        );
        assert_eq!(
            page_statistics.max_bytes_opt().unwrap(),
            7_i32.to_le_bytes()
        );
        assert_eq!(page_statistics.null_count_opt(), Some(0));
        assert!(page_statistics.distinct_count_opt().is_none());
    }
2308
    #[test]
    fn test_disabled_statistics() {
        // With statistics disabled, neither the chunk metadata nor the page
        // itself should carry any statistics.
        let mut buf = Vec::with_capacity(100);
        let mut write = TrackedWrite::new(&mut buf);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_writer_version(WriterVersion::PARQUET_2_0)
            .build();
        let props = Arc::new(props);

        // Max definition level 1: the column is nullable; the def levels
        // below mark positions 1 and 2 as null.
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        writer
            .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
            .unwrap();

        let r = writer.close().unwrap();
        assert!(r.metadata.statistics().is_none());

        // Release the mutable borrow of `buf` so it can be read back below.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let reader = SerializedPageReader::new_with_properties(
            Arc::new(Bytes::from(buf)),
            &r.metadata,
            r.rows_written as usize,
            None,
            Arc::new(props),
        )
        .unwrap();

        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
        assert_eq!(pages.len(), 2);

        // The PARQUET_2_0 writer emits a v2 data page after the dictionary.
        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);

        match &pages[1] {
            Page::DataPageV2 {
                num_values,
                num_nulls,
                num_rows,
                statistics,
                ..
            } => {
                // Six def levels, two of which are null; no stats written.
                assert_eq!(*num_values, 6);
                assert_eq!(*num_nulls, 2);
                assert_eq!(*num_rows, 6);
                assert!(statistics.is_none());
            }
            _ => unreachable!(),
        }
    }
2364
2365    #[test]
2366    fn test_column_writer_empty_column_roundtrip() {
2367        let props = Default::default();
2368        column_roundtrip::<Int32Type>(props, &[], None, None);
2369    }
2370
2371    #[test]
2372    fn test_column_writer_non_nullable_values_roundtrip() {
2373        let props = Default::default();
2374        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2375    }
2376
2377    #[test]
2378    fn test_column_writer_nullable_non_repeated_values_roundtrip() {
2379        let props = Default::default();
2380        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
2381    }
2382
2383    #[test]
2384    fn test_column_writer_nullable_repeated_values_roundtrip() {
2385        let props = Default::default();
2386        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2387    }
2388
2389    #[test]
2390    fn test_column_writer_dictionary_fallback_small_data_page() {
2391        let props = WriterProperties::builder()
2392            .set_dictionary_page_size_limit(32)
2393            .set_data_page_size_limit(32)
2394            .build();
2395        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2396    }
2397
2398    #[test]
2399    fn test_column_writer_small_write_batch_size() {
2400        for i in &[1usize, 2, 5, 10, 11, 1023] {
2401            let props = WriterProperties::builder().set_write_batch_size(*i).build();
2402
2403            column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2404        }
2405    }
2406
2407    #[test]
2408    fn test_column_writer_dictionary_disabled_v1() {
2409        let props = WriterProperties::builder()
2410            .set_writer_version(WriterVersion::PARQUET_1_0)
2411            .set_dictionary_enabled(false)
2412            .build();
2413        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2414    }
2415
2416    #[test]
2417    fn test_column_writer_dictionary_disabled_v2() {
2418        let props = WriterProperties::builder()
2419            .set_writer_version(WriterVersion::PARQUET_2_0)
2420            .set_dictionary_enabled(false)
2421            .build();
2422        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2423    }
2424
2425    #[test]
2426    fn test_column_writer_compression_v1() {
2427        let props = WriterProperties::builder()
2428            .set_writer_version(WriterVersion::PARQUET_1_0)
2429            .set_compression(Compression::SNAPPY)
2430            .build();
2431        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2432    }
2433
2434    #[test]
2435    fn test_column_writer_compression_v2() {
2436        let props = WriterProperties::builder()
2437            .set_writer_version(WriterVersion::PARQUET_2_0)
2438            .set_compression(Compression::SNAPPY)
2439            .build();
2440        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2441    }
2442
    #[test]
    fn test_column_writer_add_data_pages_with_dict() {
        // ARROW-5129: Test verifies that we add data page in case of dictionary encoding
        // and no fallback occurred so far.
        let mut file = tempfile::tempfile().unwrap();
        let mut write = TrackedWrite::new(&mut file);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        // A tiny page limit plus 3-value batches forces page boundaries
        // while dictionary encoding is still in effect.
        let props = Arc::new(
            WriterProperties::builder()
                .set_data_page_size_limit(10)
                .set_write_batch_size(3) // write 3 values at a time
                .build(),
        );
        let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer.write_batch(data, None, None).unwrap();
        let r = writer.close().unwrap();

        // Release the mutable borrow of `file` so it can be reopened for
        // reading below.
        drop(write);

        // Read pages and check the sequence
        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let mut page_reader = Box::new(
            SerializedPageReader::new_with_properties(
                Arc::new(file),
                &r.metadata,
                r.rows_written as usize,
                None,
                Arc::new(props),
            )
            .unwrap(),
        );
        let mut res = Vec::new();
        while let Some(page) = page_reader.get_next_page().unwrap() {
            res.push((page.page_type(), page.num_values(), page.buffer().len()));
        }
        // Expect one dictionary page with all 10 distinct values (40 bytes:
        // 10 x 4-byte INT32) followed by two dictionary-encoded data pages
        // of 9 and 1 values.
        assert_eq!(
            res,
            vec![
                (PageType::DICTIONARY_PAGE, 10, 40),
                (PageType::DATA_PAGE, 9, 10),
                (PageType::DATA_PAGE, 1, 3),
            ]
        );
        // Encoding stats mirror the page sequence: one PLAIN dictionary
        // page, two RLE_DICTIONARY data pages.
        assert_eq!(
            r.metadata.page_encoding_stats(),
            Some(&vec![
                PageEncodingStats {
                    page_type: PageType::DICTIONARY_PAGE,
                    encoding: Encoding::PLAIN,
                    count: 1
                },
                PageEncodingStats {
                    page_type: PageType::DATA_PAGE,
                    encoding: Encoding::RLE_DICTIONARY,
                    count: 2,
                }
            ])
        );
    }
2505
2506    #[test]
2507    fn test_column_writer_column_data_page_size_limit() {
2508        let props = Arc::new(
2509            WriterProperties::builder()
2510                .set_writer_version(WriterVersion::PARQUET_1_0)
2511                .set_dictionary_enabled(false)
2512                .set_data_page_size_limit(1000)
2513                .set_column_data_page_size_limit(ColumnPath::from("col"), 10)
2514                .set_write_batch_size(3)
2515                .build(),
2516        );
2517        let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
2518
2519        let col_values =
2520            write_and_collect_page_values(ColumnPath::from("col"), Arc::clone(&props), data);
2521        let other_values = write_and_collect_page_values(ColumnPath::from("other"), props, data);
2522
2523        assert_eq!(col_values, vec![3, 3, 3, 1]);
2524        assert_eq!(other_values, vec![10]);
2525    }
2526
2527    #[test]
2528    fn test_bool_statistics() {
2529        let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
2530        // Booleans have an unsigned sort order and so are not compatible
2531        // with the deprecated `min` and `max` statistics
2532        assert!(!stats.is_min_max_backwards_compatible());
2533        if let Statistics::Boolean(stats) = stats {
2534            assert_eq!(stats.min_opt().unwrap(), &false);
2535            assert_eq!(stats.max_opt().unwrap(), &true);
2536        } else {
2537            panic!("expecting Statistics::Boolean, got {stats:?}");
2538        }
2539    }
2540
2541    #[test]
2542    fn test_int32_statistics() {
2543        let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
2544        assert!(stats.is_min_max_backwards_compatible());
2545        if let Statistics::Int32(stats) = stats {
2546            assert_eq!(stats.min_opt().unwrap(), &-2);
2547            assert_eq!(stats.max_opt().unwrap(), &3);
2548        } else {
2549            panic!("expecting Statistics::Int32, got {stats:?}");
2550        }
2551    }
2552
2553    #[test]
2554    fn test_int64_statistics() {
2555        let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
2556        assert!(stats.is_min_max_backwards_compatible());
2557        if let Statistics::Int64(stats) = stats {
2558            assert_eq!(stats.min_opt().unwrap(), &-2);
2559            assert_eq!(stats.max_opt().unwrap(), &3);
2560        } else {
2561            panic!("expecting Statistics::Int64, got {stats:?}");
2562        }
2563    }
2564
2565    #[test]
2566    fn test_int96_statistics() {
2567        let input = vec![
2568            Int96::from(vec![1, 20, 30]),
2569            Int96::from(vec![3, 20, 10]),
2570            Int96::from(vec![0, 20, 30]),
2571            Int96::from(vec![2, 20, 30]),
2572        ]
2573        .into_iter()
2574        .collect::<Vec<Int96>>();
2575
2576        let stats = statistics_roundtrip::<Int96Type>(&input);
2577        assert!(!stats.is_min_max_backwards_compatible());
2578        if let Statistics::Int96(stats) = stats {
2579            assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
2580            assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![2, 20, 30]));
2581        } else {
2582            panic!("expecting Statistics::Int96, got {stats:?}");
2583        }
2584    }
2585
2586    #[test]
2587    fn test_float_statistics() {
2588        let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
2589        assert!(stats.is_min_max_backwards_compatible());
2590        if let Statistics::Float(stats) = stats {
2591            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2592            assert_eq!(stats.max_opt().unwrap(), &3.0);
2593        } else {
2594            panic!("expecting Statistics::Float, got {stats:?}");
2595        }
2596    }
2597
2598    #[test]
2599    fn test_double_statistics() {
2600        let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
2601        assert!(stats.is_min_max_backwards_compatible());
2602        if let Statistics::Double(stats) = stats {
2603            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2604            assert_eq!(stats.max_opt().unwrap(), &3.0);
2605        } else {
2606            panic!("expecting Statistics::Double, got {stats:?}");
2607        }
2608    }
2609
2610    #[test]
2611    fn test_byte_array_statistics() {
2612        let input = ["aawaa", "zz", "aaw", "m", "qrs"]
2613            .iter()
2614            .map(|&s| s.into())
2615            .collect::<Vec<_>>();
2616
2617        let stats = statistics_roundtrip::<ByteArrayType>(&input);
2618        assert!(!stats.is_min_max_backwards_compatible());
2619        if let Statistics::ByteArray(stats) = stats {
2620            assert_eq!(stats.min_opt().unwrap(), &ByteArray::from("aaw"));
2621            assert_eq!(stats.max_opt().unwrap(), &ByteArray::from("zz"));
2622        } else {
2623            panic!("expecting Statistics::ByteArray, got {stats:?}");
2624        }
2625    }
2626
2627    #[test]
2628    fn test_fixed_len_byte_array_statistics() {
2629        let input = ["aawaa", "zz   ", "aaw  ", "m    ", "qrs  "]
2630            .iter()
2631            .map(|&s| ByteArray::from(s).into())
2632            .collect::<Vec<_>>();
2633
2634        let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
2635        assert!(!stats.is_min_max_backwards_compatible());
2636        if let Statistics::FixedLenByteArray(stats) = stats {
2637            let expected_min: FixedLenByteArray = ByteArray::from("aaw  ").into();
2638            assert_eq!(stats.min_opt().unwrap(), &expected_min);
2639            let expected_max: FixedLenByteArray = ByteArray::from("zz   ").into();
2640            assert_eq!(stats.max_opt().unwrap(), &expected_max);
2641        } else {
2642            panic!("expecting Statistics::FixedLenByteArray, got {stats:?}");
2643        }
2644    }
2645
2646    #[test]
2647    fn test_column_writer_check_float16_min_max() {
2648        let input = [
2649            -f16::ONE,
2650            f16::from_f32(3.0),
2651            -f16::from_f32(2.0),
2652            f16::from_f32(2.0),
2653        ]
2654        .into_iter()
2655        .map(|s| ByteArray::from(s).into())
2656        .collect::<Vec<_>>();
2657
2658        let stats = float16_statistics_roundtrip(&input);
2659        assert!(stats.is_min_max_backwards_compatible());
2660        assert_eq!(
2661            stats.min_opt().unwrap(),
2662            &ByteArray::from(-f16::from_f32(2.0))
2663        );
2664        assert_eq!(
2665            stats.max_opt().unwrap(),
2666            &ByteArray::from(f16::from_f32(3.0))
2667        );
2668    }
2669
    #[test]
    fn test_column_writer_check_float16_nan_middle() {
        // NOTE(review): this test is byte-for-byte identical to
        // `test_float16_statistics_nan_middle` below; one of the two is
        // likely redundant and could be removed.
        // A NaN sitting between two valid values must be ignored by the
        // min/max computation.
        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
            .into_iter()
            .map(|s| ByteArray::from(s).into())
            .collect::<Vec<_>>();

        let stats = float16_statistics_roundtrip(&input);
        assert!(stats.is_min_max_backwards_compatible());
        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
        assert_eq!(
            stats.max_opt().unwrap(),
            &ByteArray::from(f16::ONE + f16::ONE)
        );
    }
2685
2686    #[test]
2687    fn test_float16_statistics_nan_middle() {
2688        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2689            .into_iter()
2690            .map(|s| ByteArray::from(s).into())
2691            .collect::<Vec<_>>();
2692
2693        let stats = float16_statistics_roundtrip(&input);
2694        assert!(stats.is_min_max_backwards_compatible());
2695        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2696        assert_eq!(
2697            stats.max_opt().unwrap(),
2698            &ByteArray::from(f16::ONE + f16::ONE)
2699        );
2700    }
2701
2702    #[test]
2703    fn test_float16_statistics_nan_start() {
2704        let input = [f16::NAN, f16::ONE, f16::ONE + f16::ONE]
2705            .into_iter()
2706            .map(|s| ByteArray::from(s).into())
2707            .collect::<Vec<_>>();
2708
2709        let stats = float16_statistics_roundtrip(&input);
2710        assert!(stats.is_min_max_backwards_compatible());
2711        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2712        assert_eq!(
2713            stats.max_opt().unwrap(),
2714            &ByteArray::from(f16::ONE + f16::ONE)
2715        );
2716    }
2717
2718    #[test]
2719    fn test_float16_statistics_nan_only() {
2720        let input = [f16::NAN, f16::NAN]
2721            .into_iter()
2722            .map(|s| ByteArray::from(s).into())
2723            .collect::<Vec<_>>();
2724
2725        let stats = float16_statistics_roundtrip(&input);
2726        assert!(stats.min_bytes_opt().is_none());
2727        assert!(stats.max_bytes_opt().is_none());
2728        assert!(stats.is_min_max_backwards_compatible());
2729    }
2730
2731    #[test]
2732    fn test_float16_statistics_zero_only() {
2733        let input = [f16::ZERO]
2734            .into_iter()
2735            .map(|s| ByteArray::from(s).into())
2736            .collect::<Vec<_>>();
2737
2738        let stats = float16_statistics_roundtrip(&input);
2739        assert!(stats.is_min_max_backwards_compatible());
2740        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2741        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2742    }
2743
2744    #[test]
2745    fn test_float16_statistics_neg_zero_only() {
2746        let input = [f16::NEG_ZERO]
2747            .into_iter()
2748            .map(|s| ByteArray::from(s).into())
2749            .collect::<Vec<_>>();
2750
2751        let stats = float16_statistics_roundtrip(&input);
2752        assert!(stats.is_min_max_backwards_compatible());
2753        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2754        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2755    }
2756
2757    #[test]
2758    fn test_float16_statistics_zero_min() {
2759        let input = [f16::ZERO, f16::ONE, f16::NAN, f16::PI]
2760            .into_iter()
2761            .map(|s| ByteArray::from(s).into())
2762            .collect::<Vec<_>>();
2763
2764        let stats = float16_statistics_roundtrip(&input);
2765        assert!(stats.is_min_max_backwards_compatible());
2766        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2767        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI));
2768    }
2769
2770    #[test]
2771    fn test_float16_statistics_neg_zero_max() {
2772        let input = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI]
2773            .into_iter()
2774            .map(|s| ByteArray::from(s).into())
2775            .collect::<Vec<_>>();
2776
2777        let stats = float16_statistics_roundtrip(&input);
2778        assert!(stats.is_min_max_backwards_compatible());
2779        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI));
2780        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2781    }
2782
2783    #[test]
2784    fn test_float_statistics_nan_middle() {
2785        let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
2786        assert!(stats.is_min_max_backwards_compatible());
2787        if let Statistics::Float(stats) = stats {
2788            assert_eq!(stats.min_opt().unwrap(), &1.0);
2789            assert_eq!(stats.max_opt().unwrap(), &2.0);
2790        } else {
2791            panic!("expecting Statistics::Float");
2792        }
2793    }
2794
2795    #[test]
2796    fn test_float_statistics_nan_start() {
2797        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
2798        assert!(stats.is_min_max_backwards_compatible());
2799        if let Statistics::Float(stats) = stats {
2800            assert_eq!(stats.min_opt().unwrap(), &1.0);
2801            assert_eq!(stats.max_opt().unwrap(), &2.0);
2802        } else {
2803            panic!("expecting Statistics::Float");
2804        }
2805    }
2806
2807    #[test]
2808    fn test_float_statistics_nan_only() {
2809        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]);
2810        assert!(stats.min_bytes_opt().is_none());
2811        assert!(stats.max_bytes_opt().is_none());
2812        assert!(stats.is_min_max_backwards_compatible());
2813        assert!(matches!(stats, Statistics::Float(_)));
2814    }
2815
2816    #[test]
2817    fn test_float_statistics_zero_only() {
2818        let stats = statistics_roundtrip::<FloatType>(&[0.0]);
2819        assert!(stats.is_min_max_backwards_compatible());
2820        if let Statistics::Float(stats) = stats {
2821            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2822            assert!(stats.min_opt().unwrap().is_sign_negative());
2823            assert_eq!(stats.max_opt().unwrap(), &0.0);
2824            assert!(stats.max_opt().unwrap().is_sign_positive());
2825        } else {
2826            panic!("expecting Statistics::Float");
2827        }
2828    }
2829
2830    #[test]
2831    fn test_float_statistics_neg_zero_only() {
2832        let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
2833        assert!(stats.is_min_max_backwards_compatible());
2834        if let Statistics::Float(stats) = stats {
2835            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2836            assert!(stats.min_opt().unwrap().is_sign_negative());
2837            assert_eq!(stats.max_opt().unwrap(), &0.0);
2838            assert!(stats.max_opt().unwrap().is_sign_positive());
2839        } else {
2840            panic!("expecting Statistics::Float");
2841        }
2842    }
2843
2844    #[test]
2845    fn test_float_statistics_zero_min() {
2846        let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
2847        assert!(stats.is_min_max_backwards_compatible());
2848        if let Statistics::Float(stats) = stats {
2849            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2850            assert!(stats.min_opt().unwrap().is_sign_negative());
2851            assert_eq!(stats.max_opt().unwrap(), &2.0);
2852        } else {
2853            panic!("expecting Statistics::Float");
2854        }
2855    }
2856
2857    #[test]
2858    fn test_float_statistics_neg_zero_max() {
2859        let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
2860        assert!(stats.is_min_max_backwards_compatible());
2861        if let Statistics::Float(stats) = stats {
2862            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2863            assert_eq!(stats.max_opt().unwrap(), &0.0);
2864            assert!(stats.max_opt().unwrap().is_sign_positive());
2865        } else {
2866            panic!("expecting Statistics::Float");
2867        }
2868    }
2869
2870    #[test]
2871    fn test_double_statistics_nan_middle() {
2872        let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
2873        assert!(stats.is_min_max_backwards_compatible());
2874        if let Statistics::Double(stats) = stats {
2875            assert_eq!(stats.min_opt().unwrap(), &1.0);
2876            assert_eq!(stats.max_opt().unwrap(), &2.0);
2877        } else {
2878            panic!("expecting Statistics::Double");
2879        }
2880    }
2881
2882    #[test]
2883    fn test_double_statistics_nan_start() {
2884        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
2885        assert!(stats.is_min_max_backwards_compatible());
2886        if let Statistics::Double(stats) = stats {
2887            assert_eq!(stats.min_opt().unwrap(), &1.0);
2888            assert_eq!(stats.max_opt().unwrap(), &2.0);
2889        } else {
2890            panic!("expecting Statistics::Double");
2891        }
2892    }
2893
2894    #[test]
2895    fn test_double_statistics_nan_only() {
2896        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]);
2897        assert!(stats.min_bytes_opt().is_none());
2898        assert!(stats.max_bytes_opt().is_none());
2899        assert!(matches!(stats, Statistics::Double(_)));
2900        assert!(stats.is_min_max_backwards_compatible());
2901    }
2902
2903    #[test]
2904    fn test_double_statistics_zero_only() {
2905        let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
2906        assert!(stats.is_min_max_backwards_compatible());
2907        if let Statistics::Double(stats) = stats {
2908            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2909            assert!(stats.min_opt().unwrap().is_sign_negative());
2910            assert_eq!(stats.max_opt().unwrap(), &0.0);
2911            assert!(stats.max_opt().unwrap().is_sign_positive());
2912        } else {
2913            panic!("expecting Statistics::Double");
2914        }
2915    }
2916
2917    #[test]
2918    fn test_double_statistics_neg_zero_only() {
2919        let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
2920        assert!(stats.is_min_max_backwards_compatible());
2921        if let Statistics::Double(stats) = stats {
2922            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2923            assert!(stats.min_opt().unwrap().is_sign_negative());
2924            assert_eq!(stats.max_opt().unwrap(), &0.0);
2925            assert!(stats.max_opt().unwrap().is_sign_positive());
2926        } else {
2927            panic!("expecting Statistics::Double");
2928        }
2929    }
2930
2931    #[test]
2932    fn test_double_statistics_zero_min() {
2933        let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
2934        assert!(stats.is_min_max_backwards_compatible());
2935        if let Statistics::Double(stats) = stats {
2936            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2937            assert!(stats.min_opt().unwrap().is_sign_negative());
2938            assert_eq!(stats.max_opt().unwrap(), &2.0);
2939        } else {
2940            panic!("expecting Statistics::Double");
2941        }
2942    }
2943
2944    #[test]
2945    fn test_double_statistics_neg_zero_max() {
2946        let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
2947        assert!(stats.is_min_max_backwards_compatible());
2948        if let Statistics::Double(stats) = stats {
2949            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2950            assert_eq!(stats.max_opt().unwrap(), &0.0);
2951            assert!(stats.max_opt().unwrap().is_sign_positive());
2952        } else {
2953            panic!("expecting Statistics::Double");
2954        }
2955    }
2956
2957    #[test]
2958    fn test_compare_greater_byte_array_decimals() {
2959        assert!(!compare_greater_byte_array_decimals(&[], &[],),);
2960        assert!(compare_greater_byte_array_decimals(&[1u8,], &[],),);
2961        assert!(!compare_greater_byte_array_decimals(&[], &[1u8,],),);
2962        assert!(compare_greater_byte_array_decimals(&[1u8,], &[0u8,],),);
2963        assert!(!compare_greater_byte_array_decimals(&[1u8,], &[1u8,],),);
2964        assert!(compare_greater_byte_array_decimals(&[1u8, 0u8,], &[0u8,],),);
2965        assert!(!compare_greater_byte_array_decimals(
2966            &[0u8, 1u8,],
2967            &[1u8, 0u8,],
2968        ),);
2969        assert!(!compare_greater_byte_array_decimals(
2970            &[255u8, 35u8, 0u8, 0u8,],
2971            &[0u8,],
2972        ),);
2973        assert!(compare_greater_byte_array_decimals(
2974            &[0u8,],
2975            &[255u8, 35u8, 0u8, 0u8,],
2976        ),);
2977    }
2978
2979    #[test]
2980    fn test_column_index_with_null_pages() {
2981        // write a single page of all nulls
2982        let page_writer = get_test_page_writer();
2983        let props = Default::default();
2984        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2985        writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();
2986
2987        let r = writer.close().unwrap();
2988        assert!(r.column_index.is_some());
2989        let col_idx = r.column_index.unwrap();
2990        let col_idx = match col_idx {
2991            ColumnIndexMetaData::INT32(col_idx) => col_idx,
2992            _ => panic!("wrong stats type"),
2993        };
2994        // null_pages should be true for page 0
2995        assert!(col_idx.is_null_page(0));
2996        // min and max should be empty byte arrays
2997        assert!(col_idx.min_value(0).is_none());
2998        assert!(col_idx.max_value(0).is_none());
2999        // null_counts should be defined and be 4 for page 0
3000        assert!(col_idx.null_count(0).is_some());
3001        assert_eq!(col_idx.null_count(0), Some(4));
3002        // there is no repetition so rep histogram should be absent
3003        assert!(col_idx.repetition_level_histogram(0).is_none());
3004        // definition_level_histogram should be present and should be 0:4, 1:0
3005        assert!(col_idx.definition_level_histogram(0).is_some());
3006        assert_eq!(col_idx.definition_level_histogram(0).unwrap(), &[4, 0]);
3007    }
3008
3009    #[test]
3010    fn test_column_offset_index_metadata() {
3011        // write data
3012        // and check the offset index and column index
3013        let page_writer = get_test_page_writer();
3014        let props = Default::default();
3015        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
3016        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
3017        // first page
3018        writer.flush_data_pages().unwrap();
3019        // second page
3020        writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
3021
3022        let r = writer.close().unwrap();
3023        let column_index = r.column_index.unwrap();
3024        let offset_index = r.offset_index.unwrap();
3025
3026        assert_eq!(8, r.rows_written);
3027
3028        // column index
3029        let column_index = match column_index {
3030            ColumnIndexMetaData::INT32(column_index) => column_index,
3031            _ => panic!("wrong stats type"),
3032        };
3033        assert_eq!(2, column_index.num_pages());
3034        assert_eq!(2, offset_index.page_locations.len());
3035        assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
3036        for idx in 0..2 {
3037            assert!(!column_index.is_null_page(idx));
3038            assert_eq!(0, column_index.null_count(0).unwrap());
3039        }
3040
3041        if let Some(stats) = r.metadata.statistics() {
3042            assert_eq!(stats.null_count_opt(), Some(0));
3043            assert_eq!(stats.distinct_count_opt(), None);
3044            if let Statistics::Int32(stats) = stats {
3045                // first page is [1,2,3,4]
3046                // second page is [-5,2,4,8]
3047                // note that we don't increment here, as this is a non BinaryArray type.
3048                assert_eq!(stats.min_opt(), column_index.min_value(1));
3049                assert_eq!(stats.max_opt(), column_index.max_value(1));
3050            } else {
3051                panic!("expecting Statistics::Int32");
3052            }
3053        } else {
3054            panic!("metadata missing statistics");
3055        }
3056
3057        // page location
3058        assert_eq!(0, offset_index.page_locations[0].first_row_index);
3059        assert_eq!(4, offset_index.page_locations[1].first_row_index);
3060    }
3061
    /// Verify min/max value truncation in the column index works as expected
    #[test]
    fn test_column_offset_index_metadata_truncating() {
        // Write a single page of three 200-byte FIXED_LEN_BYTE_ARRAY values,
        // then check the offset index and column index. Chunk-statistics
        // truncation is disabled below, so only the column index min/max get
        // truncated (to DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH).
        let page_writer = get_test_page_writer();
        let props = WriterProperties::builder()
            // Disables *chunk statistics* truncation; the column index is still
            // truncated to its default length, as asserted below.
            .set_statistics_truncate_length(None)
            .build()
            .into();
        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

        let mut data = vec![FixedLenByteArray::default(); 3];
        // This is the expected min value - "aaa..." (200 bytes of 0x61)
        data[0].set_data(Bytes::from(vec![97_u8; 200]));
        // This is the expected max value - "ppp..." (200 bytes of 0x70; 112 is ASCII 'p')
        data[1].set_data(Bytes::from(vec![112_u8; 200]));
        // "bbb..." sorts between the min and max values above
        data[2].set_data(Bytes::from(vec![98_u8; 200]));

        writer.write_batch(&data, None, None).unwrap();

        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();
        let column_index = r.column_index.unwrap();
        let offset_index = r.offset_index.unwrap();

        let column_index = match column_index {
            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
            _ => panic!("wrong stats type"),
        };

        assert_eq!(3, r.rows_written);

        // column index: a single page with no nulls
        assert_eq!(1, column_index.num_pages());
        assert_eq!(1, offset_index.page_locations.len());
        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
        assert!(!column_index.is_null_page(0));
        assert_eq!(Some(0), column_index.null_count(0));

        if let Some(stats) = r.metadata.statistics() {
            assert_eq!(stats.null_count_opt(), Some(0));
            assert_eq!(stats.distinct_count_opt(), None);
            if let Statistics::FixedLenByteArray(stats) = stats {
                let column_index_min_value = column_index.min_value(0).unwrap();
                let column_index_max_value = column_index.max_value(0).unwrap();

                // Column index stats are truncated, while the column chunk's aren't.
                assert_ne!(stats.min_bytes_opt().unwrap(), column_index_min_value);
                assert_ne!(stats.max_bytes_opt().unwrap(), column_index_max_value);

                assert_eq!(
                    column_index_min_value.len(),
                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
                );
                assert_eq!(column_index_min_value, &[97_u8; 64]);
                assert_eq!(
                    column_index_max_value.len(),
                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
                );

                // We expect the last byte of the truncated max to be incremented
                // so the truncated value remains an upper bound.
                assert_eq!(
                    *column_index_max_value.last().unwrap(),
                    *column_index_max_value.first().unwrap() + 1
                );
            } else {
                panic!("expecting Statistics::FixedLenByteArray");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
3136
3137    #[test]
3138    fn test_column_offset_index_truncating_spec_example() {
3139        // write data
3140        // and check the offset index and column index
3141        let page_writer = get_test_page_writer();
3142
3143        // Truncate values at 1 byte
3144        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3145        let props = Arc::new(builder.build());
3146        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3147
3148        let mut data = vec![FixedLenByteArray::default(); 1];
3149        // This is the expected min value
3150        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3151
3152        writer.write_batch(&data, None, None).unwrap();
3153
3154        writer.flush_data_pages().unwrap();
3155
3156        let r = writer.close().unwrap();
3157        let column_index = r.column_index.unwrap();
3158        let offset_index = r.offset_index.unwrap();
3159
3160        let column_index = match column_index {
3161            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3162            _ => panic!("wrong stats type"),
3163        };
3164
3165        assert_eq!(1, r.rows_written);
3166
3167        // column index
3168        assert_eq!(1, column_index.num_pages());
3169        assert_eq!(1, offset_index.page_locations.len());
3170        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3171        assert!(!column_index.is_null_page(0));
3172        assert_eq!(Some(0), column_index.null_count(0));
3173
3174        if let Some(stats) = r.metadata.statistics() {
3175            assert_eq!(stats.null_count_opt(), Some(0));
3176            assert_eq!(stats.distinct_count_opt(), None);
3177            if let Statistics::FixedLenByteArray(_stats) = stats {
3178                let column_index_min_value = column_index.min_value(0).unwrap();
3179                let column_index_max_value = column_index.max_value(0).unwrap();
3180
3181                assert_eq!(column_index_min_value.len(), 1);
3182                assert_eq!(column_index_max_value.len(), 1);
3183
3184                assert_eq!("B".as_bytes(), column_index_min_value);
3185                assert_eq!("C".as_bytes(), column_index_max_value);
3186
3187                assert_ne!(column_index_min_value, stats.min_bytes_opt().unwrap());
3188                assert_ne!(column_index_max_value, stats.max_bytes_opt().unwrap());
3189            } else {
3190                panic!("expecting Statistics::FixedLenByteArray");
3191            }
3192        } else {
3193            panic!("metadata missing statistics");
3194        }
3195    }
3196
3197    #[test]
3198    fn test_float16_min_max_no_truncation() {
3199        // Even if we set truncation to occur at 1 byte, we should not truncate for Float16
3200        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3201        let props = Arc::new(builder.build());
3202        let page_writer = get_test_page_writer();
3203        let mut writer = get_test_float16_column_writer(page_writer, props);
3204
3205        let expected_value = f16::PI.to_le_bytes().to_vec();
3206        let data = vec![ByteArray::from(expected_value.clone()).into()];
3207        writer.write_batch(&data, None, None).unwrap();
3208        writer.flush_data_pages().unwrap();
3209
3210        let r = writer.close().unwrap();
3211
3212        // stats should still be written
3213        // ensure bytes weren't truncated for column index
3214        let column_index = r.column_index.unwrap();
3215        let column_index = match column_index {
3216            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3217            _ => panic!("wrong stats type"),
3218        };
3219        let column_index_min_bytes = column_index.min_value(0).unwrap();
3220        let column_index_max_bytes = column_index.max_value(0).unwrap();
3221        assert_eq!(expected_value, column_index_min_bytes);
3222        assert_eq!(expected_value, column_index_max_bytes);
3223
3224        // ensure bytes weren't truncated for statistics
3225        let stats = r.metadata.statistics().unwrap();
3226        if let Statistics::FixedLenByteArray(stats) = stats {
3227            let stats_min_bytes = stats.min_bytes_opt().unwrap();
3228            let stats_max_bytes = stats.max_bytes_opt().unwrap();
3229            assert_eq!(expected_value, stats_min_bytes);
3230            assert_eq!(expected_value, stats_max_bytes);
3231        } else {
3232            panic!("expecting Statistics::FixedLenByteArray");
3233        }
3234    }
3235
3236    #[test]
3237    fn test_decimal_min_max_no_truncation() {
3238        // Even if we set truncation to occur at 1 byte, we should not truncate for Decimal
3239        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3240        let props = Arc::new(builder.build());
3241        let page_writer = get_test_page_writer();
3242        let mut writer =
3243            get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3244
3245        let expected_value = vec![
3246            255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
3247            231u8, 90u8, 0u8, 0u8,
3248        ];
3249        let data = vec![ByteArray::from(expected_value.clone()).into()];
3250        writer.write_batch(&data, None, None).unwrap();
3251        writer.flush_data_pages().unwrap();
3252
3253        let r = writer.close().unwrap();
3254
3255        // stats should still be written
3256        // ensure bytes weren't truncated for column index
3257        let column_index = r.column_index.unwrap();
3258        let column_index = match column_index {
3259            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3260            _ => panic!("wrong stats type"),
3261        };
3262        let column_index_min_bytes = column_index.min_value(0).unwrap();
3263        let column_index_max_bytes = column_index.max_value(0).unwrap();
3264        assert_eq!(expected_value, column_index_min_bytes);
3265        assert_eq!(expected_value, column_index_max_bytes);
3266
3267        // ensure bytes weren't truncated for statistics
3268        let stats = r.metadata.statistics().unwrap();
3269        if let Statistics::FixedLenByteArray(stats) = stats {
3270            let stats_min_bytes = stats.min_bytes_opt().unwrap();
3271            let stats_max_bytes = stats.max_bytes_opt().unwrap();
3272            assert_eq!(expected_value, stats_min_bytes);
3273            assert_eq!(expected_value, stats_max_bytes);
3274        } else {
3275            panic!("expecting Statistics::FixedLenByteArray");
3276        }
3277    }
3278
3279    #[test]
3280    fn test_statistics_truncating_byte_array_default() {
3281        let page_writer = get_test_page_writer();
3282
3283        // The default truncate length is 64 bytes
3284        let props = WriterProperties::builder().build().into();
3285        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3286
3287        let mut data = vec![ByteArray::default(); 1];
3288        data[0].set_data(Bytes::from(String::from(
3289            "This string is longer than 64 bytes, so it will almost certainly be truncated.",
3290        )));
3291        writer.write_batch(&data, None, None).unwrap();
3292        writer.flush_data_pages().unwrap();
3293
3294        let r = writer.close().unwrap();
3295
3296        assert_eq!(1, r.rows_written);
3297
3298        let stats = r.metadata.statistics().expect("statistics");
3299        if let Statistics::ByteArray(_stats) = stats {
3300            let min_value = _stats.min_opt().unwrap();
3301            let max_value = _stats.max_opt().unwrap();
3302
3303            assert!(!_stats.min_is_exact());
3304            assert!(!_stats.max_is_exact());
3305
3306            let expected_len = 64;
3307            assert_eq!(min_value.len(), expected_len);
3308            assert_eq!(max_value.len(), expected_len);
3309
3310            let expected_min =
3311                "This string is longer than 64 bytes, so it will almost certainly".as_bytes();
3312            assert_eq!(expected_min, min_value.as_bytes());
3313            // note the max value is different from the min value: the last byte is incremented
3314            let expected_max =
3315                "This string is longer than 64 bytes, so it will almost certainlz".as_bytes();
3316            assert_eq!(expected_max, max_value.as_bytes());
3317        } else {
3318            panic!("expecting Statistics::ByteArray");
3319        }
3320    }
3321
3322    #[test]
3323    fn test_statistics_truncating_byte_array() {
3324        let page_writer = get_test_page_writer();
3325
3326        const TEST_TRUNCATE_LENGTH: usize = 1;
3327
3328        // Truncate values at 1 byte
3329        let builder =
3330            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3331        let props = Arc::new(builder.build());
3332        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3333
3334        let mut data = vec![ByteArray::default(); 1];
3335        // This is the expected min value
3336        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3337
3338        writer.write_batch(&data, None, None).unwrap();
3339
3340        writer.flush_data_pages().unwrap();
3341
3342        let r = writer.close().unwrap();
3343
3344        assert_eq!(1, r.rows_written);
3345
3346        let stats = r.metadata.statistics().expect("statistics");
3347        assert_eq!(stats.null_count_opt(), Some(0));
3348        assert_eq!(stats.distinct_count_opt(), None);
3349        if let Statistics::ByteArray(_stats) = stats {
3350            let min_value = _stats.min_opt().unwrap();
3351            let max_value = _stats.max_opt().unwrap();
3352
3353            assert!(!_stats.min_is_exact());
3354            assert!(!_stats.max_is_exact());
3355
3356            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3357            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3358
3359            assert_eq!("B".as_bytes(), min_value.as_bytes());
3360            assert_eq!("C".as_bytes(), max_value.as_bytes());
3361        } else {
3362            panic!("expecting Statistics::ByteArray");
3363        }
3364    }
3365
3366    #[test]
3367    fn test_statistics_truncating_fixed_len_byte_array() {
3368        let page_writer = get_test_page_writer();
3369
3370        const TEST_TRUNCATE_LENGTH: usize = 1;
3371
3372        // Truncate values at 1 byte
3373        let builder =
3374            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3375        let props = Arc::new(builder.build());
3376        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3377
3378        let mut data = vec![FixedLenByteArray::default(); 1];
3379
3380        const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
3381        const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();
3382
3383        const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]]; // parquet specifies big-endian order for decimals
3384        const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
3385            [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];
3386
3387        // This is the expected min value
3388        data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));
3389
3390        writer.write_batch(&data, None, None).unwrap();
3391
3392        writer.flush_data_pages().unwrap();
3393
3394        let r = writer.close().unwrap();
3395
3396        assert_eq!(1, r.rows_written);
3397
3398        let stats = r.metadata.statistics().expect("statistics");
3399        assert_eq!(stats.null_count_opt(), Some(0));
3400        assert_eq!(stats.distinct_count_opt(), None);
3401        if let Statistics::FixedLenByteArray(_stats) = stats {
3402            let min_value = _stats.min_opt().unwrap();
3403            let max_value = _stats.max_opt().unwrap();
3404
3405            assert!(!_stats.min_is_exact());
3406            assert!(!_stats.max_is_exact());
3407
3408            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3409            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3410
3411            assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
3412            assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());
3413
3414            let reconstructed_min = i128::from_be_bytes([
3415                min_value.as_bytes()[0],
3416                0,
3417                0,
3418                0,
3419                0,
3420                0,
3421                0,
3422                0,
3423                0,
3424                0,
3425                0,
3426                0,
3427                0,
3428                0,
3429                0,
3430                0,
3431            ]);
3432
3433            let reconstructed_max = i128::from_be_bytes([
3434                max_value.as_bytes()[0],
3435                0,
3436                0,
3437                0,
3438                0,
3439                0,
3440                0,
3441                0,
3442                0,
3443                0,
3444                0,
3445                0,
3446                0,
3447                0,
3448                0,
3449                0,
3450            ]);
3451
3452            // check that the inner value is correctly bounded by the min/max
3453            println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
3454            assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
3455            println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
3456            assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
3457        } else {
3458            panic!("expecting Statistics::FixedLenByteArray");
3459        }
3460    }
3461
3462    #[test]
3463    fn test_send() {
3464        fn test<T: Send>() {}
3465        test::<ColumnWriterImpl<Int32Type>>();
3466    }
3467
3468    #[test]
3469    fn test_increment() {
3470        let v = increment(vec![0, 0, 0]).unwrap();
3471        assert_eq!(&v, &[0, 0, 1]);
3472
3473        // Handle overflow
3474        let v = increment(vec![0, 255, 255]).unwrap();
3475        assert_eq!(&v, &[1, 0, 0]);
3476
3477        // Return `None` if all bytes are u8::MAX
3478        let v = increment(vec![255, 255, 255]);
3479        assert!(v.is_none());
3480    }
3481
    #[test]
    fn test_increment_utf8() {
        // Helper: increment `o`, assert the result equals `expected`, is valid
        // UTF-8, and compares greater than the original both as a &str and as
        // a ByteArray (the comparison used for statistics bounds).
        let test_inc = |o: &str, expected: &str| {
            if let Ok(v) = String::from_utf8(increment_utf8(o).unwrap()) {
                // Got the expected result...
                assert_eq!(v, expected);
                // and it's greater than the original string
                assert!(*v > *o);
                // Also show that BinaryArray level comparison works here
                let mut greater = ByteArray::new();
                greater.set_data(Bytes::from(v));
                let mut original = ByteArray::new();
                original.set_data(Bytes::from(o.as_bytes().to_vec()));
                assert!(greater > original);
            } else {
                panic!("Expected incremented UTF8 string to also be valid.");
            }
        };

        // Basic ASCII case
        test_inc("hello", "hellp");

        // 1-byte ending in max 1-byte
        test_inc("a\u{7f}", "b");

        // 1-byte max should not truncate as it would need 2-byte code points
        assert!(increment_utf8("\u{7f}\u{7f}").is_none());

        // UTF8 string
        test_inc("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝");

        // 2-byte without overflow
        test_inc("éééé", "éééê");

        // 2-byte that overflows lowest byte
        test_inc("\u{ff}\u{ff}", "\u{ff}\u{100}");

        // 2-byte ending in max 2-byte
        test_inc("a\u{7ff}", "b");

        // Max 2-byte should not truncate as it would need 3-byte code points
        assert!(increment_utf8("\u{7ff}\u{7ff}").is_none());

        // 3-byte without overflow [U+800, U+800] -> [U+800, U+801] (note that these
        // characters should render right to left).
        test_inc("ࠀࠀ", "ࠀࠁ");

        // 3-byte ending in max 3-byte
        test_inc("a\u{ffff}", "b");

        // Max 3-byte should not truncate as it would need 4-byte code points
        assert!(increment_utf8("\u{ffff}\u{ffff}").is_none());

        // 4-byte without overflow
        test_inc("𐀀𐀀", "𐀀𐀁");

        // 4-byte ending in max unicode
        test_inc("a\u{10ffff}", "b");

        // Max 4-byte should not truncate
        assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none());

        // Skip over surrogate pair range (0xD800..=0xDFFF): incrementing U+D7FF
        // would land in the surrogate block, so the final character is dropped
        // and the preceding 'a' is incremented instead (result "b", not
        // "a\u{e000}").
        test_inc("a\u{D7FF}", "b");
    }
3548
    #[test]
    fn test_truncate_utf8() {
        // Truncation of UTF-8 statistics must back off to a valid character
        // boundary; truncate_and_increment_utf8 must additionally produce a
        // valid upper bound or return None.

        // No-op: truncating to the full length returns the string unchanged
        let data = "❤️🧡💛💚💙💜";
        let r = truncate_utf8(data, data.len()).unwrap();
        assert_eq!(r.len(), data.len());
        assert_eq!(&r, data.as_bytes());

        // Truncating to 13 bytes lands inside a code point, so the result
        // backs off to the previous UTF-8 boundary (10 bytes)
        let r = truncate_utf8(data, 13).unwrap();
        assert_eq!(r.len(), 10);
        assert_eq!(&r, "❤️🧡".as_bytes());

        // One multi-byte code point, and a length shorter than it, so we can't slice it
        let r = truncate_utf8("\u{0836}", 1);
        assert!(r.is_none());

        // Test truncate and increment for max bounds on UTF-8 statistics
        // 7-bit (i.e. ASCII)
        let r = truncate_and_increment_utf8("yyyyyyyyy", 8).unwrap();
        assert_eq!(&r, "yyyyyyyz".as_bytes());

        // 2-byte without overflow
        let r = truncate_and_increment_utf8("ééééé", 7).unwrap();
        assert_eq!(&r, "ééê".as_bytes());

        // 2-byte that overflows lowest byte
        let r = truncate_and_increment_utf8("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8).unwrap();
        assert_eq!(&r, "\u{ff}\u{ff}\u{ff}\u{100}".as_bytes());

        // max 2-byte should not truncate as it would need 3-byte code points
        let r = truncate_and_increment_utf8("߿߿߿߿߿", 8);
        assert!(r.is_none());

        // 3-byte without overflow [U+800, U+800, U+800] -> [U+800, U+801] (note that these
        // characters should render right to left).
        let r = truncate_and_increment_utf8("ࠀࠀࠀࠀ", 8).unwrap();
        assert_eq!(&r, "ࠀࠁ".as_bytes());

        // max 3-byte should not truncate as it would need 4-byte code points
        let r = truncate_and_increment_utf8("\u{ffff}\u{ffff}\u{ffff}", 8);
        assert!(r.is_none());

        // 4-byte without overflow
        let r = truncate_and_increment_utf8("𐀀𐀀𐀀𐀀", 9).unwrap();
        assert_eq!(&r, "𐀀𐀁".as_bytes());

        // max 4-byte should not truncate
        let r = truncate_and_increment_utf8("\u{10ffff}\u{10ffff}", 8);
        assert!(r.is_none());
    }
3600
3601    #[test]
3602    // Check fallback truncation of statistics that should be UTF-8, but aren't
3603    // (see https://github.com/apache/arrow-rs/pull/6870).
3604    fn test_byte_array_truncate_invalid_utf8_statistics() {
3605        let message_type = "
3606            message test_schema {
3607                OPTIONAL BYTE_ARRAY a (UTF8);
3608            }
3609        ";
3610        let schema = Arc::new(parse_message_type(message_type).unwrap());
3611
3612        // Create Vec<ByteArray> containing non-UTF8 bytes
3613        let data = vec![ByteArray::from(vec![128u8; 32]); 7];
3614        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
3615        let file: File = tempfile::tempfile().unwrap();
3616        let props = Arc::new(
3617            WriterProperties::builder()
3618                .set_statistics_enabled(EnabledStatistics::Chunk)
3619                .set_statistics_truncate_length(Some(8))
3620                .build(),
3621        );
3622
3623        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
3624        let mut row_group_writer = writer.next_row_group().unwrap();
3625
3626        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
3627        col_writer
3628            .typed::<ByteArrayType>()
3629            .write_batch(&data, Some(&def_levels), None)
3630            .unwrap();
3631        col_writer.close().unwrap();
3632        row_group_writer.close().unwrap();
3633        let file_metadata = writer.close().unwrap();
3634        let stats = file_metadata.row_group(0).column(0).statistics().unwrap();
3635        assert!(!stats.max_is_exact());
3636        // Truncation of invalid UTF-8 should fall back to binary truncation, so last byte should
3637        // be incremented by 1.
3638        assert_eq!(
3639            stats.max_bytes_opt().map(|v| v.to_vec()),
3640            Some([128, 128, 128, 128, 128, 128, 128, 129].to_vec())
3641        );
3642    }
3643
3644    #[test]
3645    fn test_increment_max_binary_chars() {
3646        let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
3647        assert_eq!(&r.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);
3648
3649        let incremented = increment(vec![0xFF, 0xFF, 0xFF]);
3650        assert!(incremented.is_none())
3651    }
3652
3653    #[test]
3654    fn test_no_column_index_when_stats_disabled() {
3655        // https://github.com/apache/arrow-rs/issues/6010
3656        // Test that column index is not created/written for all-nulls column when page
3657        // statistics are disabled.
3658        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3659        let props = Arc::new(
3660            WriterProperties::builder()
3661                .set_statistics_enabled(EnabledStatistics::None)
3662                .build(),
3663        );
3664        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3665        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3666
3667        let data = Vec::new();
3668        let def_levels = vec![0; 10];
3669        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3670        writer.flush_data_pages().unwrap();
3671
3672        let column_close_result = writer.close().unwrap();
3673        assert!(column_close_result.offset_index.is_some());
3674        assert!(column_close_result.column_index.is_none());
3675    }
3676
3677    #[test]
3678    fn test_no_offset_index_when_disabled() {
3679        // Test that offset indexes can be disabled
3680        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3681        let props = Arc::new(
3682            WriterProperties::builder()
3683                .set_statistics_enabled(EnabledStatistics::None)
3684                .set_offset_index_disabled(true)
3685                .build(),
3686        );
3687        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3688        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3689
3690        let data = Vec::new();
3691        let def_levels = vec![0; 10];
3692        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3693        writer.flush_data_pages().unwrap();
3694
3695        let column_close_result = writer.close().unwrap();
3696        assert!(column_close_result.offset_index.is_none());
3697        assert!(column_close_result.column_index.is_none());
3698    }
3699
3700    #[test]
3701    fn test_offset_index_overridden() {
3702        // Test that offset indexes are not disabled when gathering page statistics
3703        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3704        let props = Arc::new(
3705            WriterProperties::builder()
3706                .set_statistics_enabled(EnabledStatistics::Page)
3707                .set_offset_index_disabled(true)
3708                .build(),
3709        );
3710        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3711        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3712
3713        let data = Vec::new();
3714        let def_levels = vec![0; 10];
3715        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3716        writer.flush_data_pages().unwrap();
3717
3718        let column_close_result = writer.close().unwrap();
3719        assert!(column_close_result.offset_index.is_some());
3720        assert!(column_close_result.column_index.is_some());
3721    }
3722
3723    #[test]
3724    fn test_boundary_order() -> Result<()> {
3725        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3726        // min max both ascending
3727        let column_close_result = write_multiple_pages::<Int32Type>(
3728            &descr,
3729            &[
3730                &[Some(-10), Some(10)],
3731                &[Some(-5), Some(11)],
3732                &[None],
3733                &[Some(-5), Some(11)],
3734            ],
3735        )?;
3736        let boundary_order = column_close_result
3737            .column_index
3738            .unwrap()
3739            .get_boundary_order();
3740        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3741
3742        // min max both descending
3743        let column_close_result = write_multiple_pages::<Int32Type>(
3744            &descr,
3745            &[
3746                &[Some(10), Some(11)],
3747                &[Some(5), Some(11)],
3748                &[None],
3749                &[Some(-5), Some(0)],
3750            ],
3751        )?;
3752        let boundary_order = column_close_result
3753            .column_index
3754            .unwrap()
3755            .get_boundary_order();
3756        assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING));
3757
3758        // min max both equal
3759        let column_close_result = write_multiple_pages::<Int32Type>(
3760            &descr,
3761            &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
3762        )?;
3763        let boundary_order = column_close_result
3764            .column_index
3765            .unwrap()
3766            .get_boundary_order();
3767        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3768
3769        // only nulls
3770        let column_close_result =
3771            write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
3772        let boundary_order = column_close_result
3773            .column_index
3774            .unwrap()
3775            .get_boundary_order();
3776        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3777
3778        // one page
3779        let column_close_result =
3780            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
3781        let boundary_order = column_close_result
3782            .column_index
3783            .unwrap()
3784            .get_boundary_order();
3785        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3786
3787        // one non-null page
3788        let column_close_result =
3789            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
3790        let boundary_order = column_close_result
3791            .column_index
3792            .unwrap()
3793            .get_boundary_order();
3794        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3795
3796        // min max both unordered
3797        let column_close_result = write_multiple_pages::<Int32Type>(
3798            &descr,
3799            &[
3800                &[Some(10), Some(11)],
3801                &[Some(11), Some(16)],
3802                &[None],
3803                &[Some(-5), Some(0)],
3804            ],
3805        )?;
3806        let boundary_order = column_close_result
3807            .column_index
3808            .unwrap()
3809            .get_boundary_order();
3810        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
3811
3812        // min max both ordered in different orders
3813        let column_close_result = write_multiple_pages::<Int32Type>(
3814            &descr,
3815            &[
3816                &[Some(1), Some(9)],
3817                &[Some(2), Some(8)],
3818                &[None],
3819                &[Some(3), Some(7)],
3820            ],
3821        )?;
3822        let boundary_order = column_close_result
3823            .column_index
3824            .unwrap()
3825            .get_boundary_order();
3826        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
3827
3828        Ok(())
3829    }
3830
3831    #[test]
3832    fn test_boundary_order_logical_type() -> Result<()> {
3833        // ensure that logical types account for different sort order than underlying
3834        // physical type representation
3835        let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
3836        let fba_descr = {
3837            let tpe = SchemaType::primitive_type_builder(
3838                "col",
3839                FixedLenByteArrayType::get_physical_type(),
3840            )
3841            .with_length(2)
3842            .build()?;
3843            Arc::new(ColumnDescriptor::new(
3844                Arc::new(tpe),
3845                1,
3846                0,
3847                ColumnPath::from("col"),
3848            ))
3849        };
3850
3851        let values: &[&[Option<FixedLenByteArray>]] = &[
3852            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
3853            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
3854            &[Some(FixedLenByteArray::from(ByteArray::from(
3855                f16::NEG_ZERO,
3856            )))],
3857            &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
3858        ];
3859
3860        // f16 descending
3861        let column_close_result =
3862            write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
3863        let boundary_order = column_close_result
3864            .column_index
3865            .unwrap()
3866            .get_boundary_order();
3867        assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING));
3868
3869        // same bytes, but fba unordered
3870        let column_close_result =
3871            write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
3872        let boundary_order = column_close_result
3873            .column_index
3874            .unwrap()
3875            .get_boundary_order();
3876        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
3877
3878        Ok(())
3879    }
3880
3881    #[test]
3882    fn test_interval_stats_should_not_have_min_max() {
3883        let input = [
3884            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
3885            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
3886            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
3887        ]
3888        .into_iter()
3889        .map(|s| ByteArray::from(s).into())
3890        .collect::<Vec<_>>();
3891
3892        let page_writer = get_test_page_writer();
3893        let mut writer = get_test_interval_column_writer(page_writer);
3894        writer.write_batch(&input, None, None).unwrap();
3895
3896        let metadata = writer.close().unwrap().metadata;
3897        let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
3898            stats.clone()
3899        } else {
3900            panic!("metadata missing statistics");
3901        };
3902        assert!(stats.min_bytes_opt().is_none());
3903        assert!(stats.max_bytes_opt().is_none());
3904    }
3905
    /// `get_estimated_total_bytes` should start at zero and grow as pages are
    /// buffered; the exact values pin the current page-size accounting.
    #[test]
    #[cfg(feature = "arrow")]
    fn test_column_writer_get_estimated_total_bytes() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        // Nothing written yet, so the estimate must be zero.
        assert_eq!(writer.get_estimated_total_bytes(), 0);

        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        writer.add_data_page().unwrap();
        let size_with_one_page = writer.get_estimated_total_bytes();
        assert_eq!(size_with_one_page, 20);

        writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
        writer.add_data_page().unwrap();
        let size_with_two_pages = writer.get_estimated_total_bytes();
        // different pages have different compressed lengths
        assert_eq!(size_with_two_pages, 20 + 21);
    }
3925
3926    fn write_multiple_pages<T: DataType>(
3927        column_descr: &Arc<ColumnDescriptor>,
3928        pages: &[&[Option<T::T>]],
3929    ) -> Result<ColumnCloseResult> {
3930        let column_writer = get_column_writer(
3931            column_descr.clone(),
3932            Default::default(),
3933            get_test_page_writer(),
3934        );
3935        let mut writer = get_typed_column_writer::<T>(column_writer);
3936
3937        for &page in pages {
3938            let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
3939            let def_levels = page
3940                .iter()
3941                .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
3942                .collect::<Vec<_>>();
3943            writer.write_batch(&values, Some(&def_levels), None)?;
3944            writer.flush_data_pages()?;
3945        }
3946
3947        writer.close()
3948    }
3949
3950    /// Performs write-read roundtrip with randomly generated values and levels.
3951    /// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
3952    /// for a column.
3953    fn column_roundtrip_random<T: DataType>(
3954        props: WriterProperties,
3955        max_size: usize,
3956        min_value: T::T,
3957        max_value: T::T,
3958        max_def_level: i16,
3959        max_rep_level: i16,
3960    ) where
3961        T::T: PartialOrd + SampleUniform + Copy,
3962    {
3963        let mut num_values: usize = 0;
3964
3965        let mut buf: Vec<i16> = Vec::new();
3966        let def_levels = if max_def_level > 0 {
3967            random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
3968            for &dl in &buf[..] {
3969                if dl == max_def_level {
3970                    num_values += 1;
3971                }
3972            }
3973            Some(&buf[..])
3974        } else {
3975            num_values = max_size;
3976            None
3977        };
3978
3979        let mut buf: Vec<i16> = Vec::new();
3980        let rep_levels = if max_rep_level > 0 {
3981            random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
3982            buf[0] = 0; // Must start on record boundary
3983            Some(&buf[..])
3984        } else {
3985            None
3986        };
3987
3988        let mut values: Vec<T::T> = Vec::new();
3989        random_numbers_range(num_values, min_value, max_value, &mut values);
3990
3991        column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
3992    }
3993
    /// Performs write-read roundtrip and asserts written values and levels.
    ///
    /// Writes `values` (with optional definition/repetition levels) through a
    /// serialized page writer into a temp file, reads everything back with a
    /// column reader, and asserts values, levels and the written row count.
    fn column_roundtrip<T: DataType>(
        props: WriterProperties,
        values: &[T::T],
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) {
        let mut file = tempfile::tempfile().unwrap();
        let mut write = TrackedWrite::new(&mut file);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));

        // Derive the schema's max levels from the supplied data so the test
        // column descriptor matches whatever levels the caller passed in.
        let max_def_level = match def_levels {
            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
            None => 0i16,
        };

        let max_rep_level = match rep_levels {
            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
            None => 0i16,
        };

        // A single read call must be able to consume everything that was
        // written, whether bounded by values or by levels.
        let mut max_batch_size = values.len();
        if let Some(levels) = def_levels {
            max_batch_size = max_batch_size.max(levels.len());
        }
        if let Some(levels) = rep_levels {
            max_batch_size = max_batch_size.max(levels.len());
        }

        let mut writer =
            get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));

        let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
        assert_eq!(values_written, values.len());
        let result = writer.close().unwrap();

        // Release the borrow on `file` so it can be handed to the page reader.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let page_reader = Box::new(
            SerializedPageReader::new_with_properties(
                Arc::new(file),
                &result.metadata,
                result.rows_written as usize,
                None,
                Arc::new(props),
            )
            .unwrap(),
        );
        let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);

        let mut actual_values = Vec::with_capacity(max_batch_size);
        let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
        let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));

        let (_, values_read, levels_read) = reader
            .read_records(
                max_batch_size,
                actual_def_levels.as_mut(),
                actual_rep_levels.as_mut(),
                &mut actual_values,
            )
            .unwrap();

        // Assert values, definition and repetition levels.

        assert_eq!(&actual_values[..values_read], values);
        match actual_def_levels {
            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
            None => assert_eq!(None, def_levels),
        }
        match actual_rep_levels {
            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
            None => assert_eq!(None, rep_levels),
        }

        // Assert written rows.

        // With repetition levels a new row starts at each level == 0;
        // otherwise every level (or value) corresponds to exactly one row.
        if let Some(levels) = actual_rep_levels {
            let mut actual_rows_written = 0;
            for l in levels {
                if l == 0 {
                    actual_rows_written += 1;
                }
            }
            assert_eq!(actual_rows_written, result.rows_written);
        } else if actual_def_levels.is_some() {
            assert_eq!(levels_read as u64, result.rows_written);
        } else {
            assert_eq!(values_read as u64, result.rows_written);
        }
    }
4088
4089    /// Performs write of provided values and returns column metadata of those values.
4090    /// Used to test encoding support for column writer.
4091    fn column_write_and_get_metadata<T: DataType>(
4092        props: WriterProperties,
4093        values: &[T::T],
4094    ) -> ColumnChunkMetaData {
4095        let page_writer = get_test_page_writer();
4096        let props = Arc::new(props);
4097        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4098        writer.write_batch(values, None, None).unwrap();
4099        writer.close().unwrap().metadata
4100    }
4101
4102    // Helper function to more compactly create a PageEncodingStats struct.
4103    fn encoding_stats(page_type: PageType, encoding: Encoding, count: i32) -> PageEncodingStats {
4104        PageEncodingStats {
4105            page_type,
4106            encoding,
4107            count,
4108        }
4109    }
4110
4111    // Function to use in tests for EncodingWriteSupport. This checks that dictionary
4112    // offset and encodings to make sure that column writer uses provided by trait
4113    // encodings.
4114    fn check_encoding_write_support<T: DataType>(
4115        version: WriterVersion,
4116        dict_enabled: bool,
4117        data: &[T::T],
4118        dictionary_page_offset: Option<i64>,
4119        encodings: &[Encoding],
4120        page_encoding_stats: &[PageEncodingStats],
4121    ) {
4122        let props = WriterProperties::builder()
4123            .set_writer_version(version)
4124            .set_dictionary_enabled(dict_enabled)
4125            .build();
4126        let meta = column_write_and_get_metadata::<T>(props, data);
4127        assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
4128        assert_eq!(meta.encodings().collect::<Vec<_>>(), encodings);
4129        assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
4130    }
4131
4132    /// Returns column writer.
4133    fn get_test_column_writer<'a, T: DataType>(
4134        page_writer: Box<dyn PageWriter + 'a>,
4135        max_def_level: i16,
4136        max_rep_level: i16,
4137        props: WriterPropertiesPtr,
4138    ) -> ColumnWriterImpl<'a, T> {
4139        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4140        let column_writer = get_column_writer(descr, props, page_writer);
4141        get_typed_column_writer::<T>(column_writer)
4142    }
4143
4144    fn get_test_column_writer_with_path<'a, T: DataType>(
4145        page_writer: Box<dyn PageWriter + 'a>,
4146        max_def_level: i16,
4147        max_rep_level: i16,
4148        props: WriterPropertiesPtr,
4149        path: ColumnPath,
4150    ) -> ColumnWriterImpl<'a, T> {
4151        let descr = Arc::new(get_test_column_descr_with_path::<T>(
4152            max_def_level,
4153            max_rep_level,
4154            path,
4155        ));
4156        let column_writer = get_column_writer(descr, props, page_writer);
4157        get_typed_column_writer::<T>(column_writer)
4158    }
4159
4160    /// Returns column reader.
4161    fn get_test_column_reader<T: DataType>(
4162        page_reader: Box<dyn PageReader>,
4163        max_def_level: i16,
4164        max_rep_level: i16,
4165    ) -> ColumnReaderImpl<T> {
4166        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4167        let column_reader = get_column_reader(descr, page_reader);
4168        get_typed_column_reader::<T>(column_reader)
4169    }
4170
4171    /// Returns descriptor for primitive column.
4172    fn get_test_column_descr<T: DataType>(
4173        max_def_level: i16,
4174        max_rep_level: i16,
4175    ) -> ColumnDescriptor {
4176        let path = ColumnPath::from("col");
4177        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4178            // length is set for "encoding support" tests for FIXED_LEN_BYTE_ARRAY type,
4179            // it should be no-op for other types
4180            .with_length(1)
4181            .build()
4182            .unwrap();
4183        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4184    }
4185
4186    fn get_test_column_descr_with_path<T: DataType>(
4187        max_def_level: i16,
4188        max_rep_level: i16,
4189        path: ColumnPath,
4190    ) -> ColumnDescriptor {
4191        let name = path.string();
4192        let tpe = SchemaType::primitive_type_builder(&name, T::get_physical_type())
4193            // length is set for "encoding support" tests for FIXED_LEN_BYTE_ARRAY type,
4194            // it should be no-op for other types
4195            .with_length(1)
4196            .build()
4197            .unwrap();
4198        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4199    }
4200
    /// Writes `data` as a single Int32 column to a temp file using the given
    /// properties, then reads the file back page by page and returns the
    /// number of values in each data page.
    fn write_and_collect_page_values(
        path: ColumnPath,
        props: WriterPropertiesPtr,
        data: &[i32],
    ) -> Vec<u32> {
        let mut file = tempfile::tempfile().unwrap();
        let mut write = TrackedWrite::new(&mut file);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let mut writer =
            get_test_column_writer_with_path::<Int32Type>(page_writer, 0, 0, props, path);
        writer.write_batch(data, None, None).unwrap();
        let r = writer.close().unwrap();

        // Release the borrow on `file` so it can be handed to the page reader.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let mut page_reader = Box::new(
            SerializedPageReader::new_with_properties(
                Arc::new(file),
                &r.metadata,
                r.rows_written as usize,
                None,
                Arc::new(props),
            )
            .unwrap(),
        );

        // Collect per-page value counts; only data pages are expected here.
        let mut values_per_page = Vec::new();
        while let Some(page) = page_reader.get_next_page().unwrap() {
            assert_eq!(page.page_type(), PageType::DATA_PAGE);
            values_per_page.push(page.num_values());
        }

        values_per_page
    }
4238
4239    /// Returns page writer that collects pages without serializing them.
4240    fn get_test_page_writer() -> Box<dyn PageWriter> {
4241        Box::new(TestPageWriter {})
4242    }
4243
4244    struct TestPageWriter {}
4245
4246    impl PageWriter for TestPageWriter {
4247        fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
4248            let mut res = PageWriteSpec::new();
4249            res.page_type = page.page_type();
4250            res.uncompressed_size = page.uncompressed_size();
4251            res.compressed_size = page.compressed_size();
4252            res.num_values = page.num_values();
4253            res.offset = 0;
4254            res.bytes_written = page.data().len() as u64;
4255            Ok(res)
4256        }
4257
4258        fn close(&mut self) -> Result<()> {
4259            Ok(())
4260        }
4261    }
4262
4263    /// Write data into parquet using [`get_test_page_writer`] and [`get_test_column_writer`] and returns generated statistics.
4264    fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
4265        let page_writer = get_test_page_writer();
4266        let props = Default::default();
4267        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4268        writer.write_batch(values, None, None).unwrap();
4269
4270        let metadata = writer.close().unwrap().metadata;
4271        if let Some(stats) = metadata.statistics() {
4272            stats.clone()
4273        } else {
4274            panic!("metadata missing statistics");
4275        }
4276    }
4277
4278    /// Returns Decimals column writer.
4279    fn get_test_decimals_column_writer<T: DataType>(
4280        page_writer: Box<dyn PageWriter>,
4281        max_def_level: i16,
4282        max_rep_level: i16,
4283        props: WriterPropertiesPtr,
4284    ) -> ColumnWriterImpl<'static, T> {
4285        let descr = Arc::new(get_test_decimals_column_descr::<T>(
4286            max_def_level,
4287            max_rep_level,
4288        ));
4289        let column_writer = get_column_writer(descr, props, page_writer);
4290        get_typed_column_writer::<T>(column_writer)
4291    }
4292
4293    /// Returns descriptor for Decimal type with primitive column.
4294    fn get_test_decimals_column_descr<T: DataType>(
4295        max_def_level: i16,
4296        max_rep_level: i16,
4297    ) -> ColumnDescriptor {
4298        let path = ColumnPath::from("col");
4299        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4300            .with_length(16)
4301            .with_logical_type(Some(LogicalType::Decimal {
4302                scale: 2,
4303                precision: 3,
4304            }))
4305            .with_scale(2)
4306            .with_precision(3)
4307            .build()
4308            .unwrap();
4309        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4310    }
4311
4312    fn float16_statistics_roundtrip(
4313        values: &[FixedLenByteArray],
4314    ) -> ValueStatistics<FixedLenByteArray> {
4315        let page_writer = get_test_page_writer();
4316        let mut writer = get_test_float16_column_writer(page_writer, Default::default());
4317        writer.write_batch(values, None, None).unwrap();
4318
4319        let metadata = writer.close().unwrap().metadata;
4320        if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
4321            stats.clone()
4322        } else {
4323            panic!("metadata missing statistics");
4324        }
4325    }
4326
4327    fn get_test_float16_column_writer(
4328        page_writer: Box<dyn PageWriter>,
4329        props: WriterPropertiesPtr,
4330    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4331        let descr = Arc::new(get_test_float16_column_descr(0, 0));
4332        let column_writer = get_column_writer(descr, props, page_writer);
4333        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4334    }
4335
4336    fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
4337        let path = ColumnPath::from("col");
4338        let tpe =
4339            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4340                .with_length(2)
4341                .with_logical_type(Some(LogicalType::Float16))
4342                .build()
4343                .unwrap();
4344        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4345    }
4346
4347    fn get_test_interval_column_writer(
4348        page_writer: Box<dyn PageWriter>,
4349    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4350        let descr = Arc::new(get_test_interval_column_descr());
4351        let column_writer = get_column_writer(descr, Default::default(), page_writer);
4352        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4353    }
4354
4355    fn get_test_interval_column_descr() -> ColumnDescriptor {
4356        let path = ColumnPath::from("col");
4357        let tpe =
4358            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4359                .with_length(12)
4360                .with_converted_type(ConvertedType::INTERVAL)
4361                .build()
4362                .unwrap();
4363        ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
4364    }
4365
4366    /// Returns column writer for UINT32 Column provided as ConvertedType only
4367    fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
4368        page_writer: Box<dyn PageWriter + 'a>,
4369        max_def_level: i16,
4370        max_rep_level: i16,
4371        props: WriterPropertiesPtr,
4372    ) -> ColumnWriterImpl<'a, T> {
4373        let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
4374            max_def_level,
4375            max_rep_level,
4376        ));
4377        let column_writer = get_column_writer(descr, props, page_writer);
4378        get_typed_column_writer::<T>(column_writer)
4379    }
4380
4381    /// Returns column descriptor for UINT32 Column provided as ConvertedType only
4382    fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
4383        max_def_level: i16,
4384        max_rep_level: i16,
4385    ) -> ColumnDescriptor {
4386        let path = ColumnPath::from("col");
4387        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4388            .with_converted_type(ConvertedType::UINT_32)
4389            .build()
4390            .unwrap();
4391        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4392    }
4393
4394    #[test]
4395    fn test_page_v2_snappy_compression_fallback() {
4396        // Test that PageV2 sets is_compressed to false when Snappy compression increases data size
4397        let page_writer = TestPageWriter {};
4398
4399        // Create WriterProperties with PageV2 and Snappy compression
4400        let props = WriterProperties::builder()
4401            .set_writer_version(WriterVersion::PARQUET_2_0)
4402            // Disable dictionary to ensure data is written directly
4403            .set_dictionary_enabled(false)
4404            .set_compression(Compression::SNAPPY)
4405            .build();
4406
4407        let mut column_writer =
4408            get_test_column_writer::<ByteArrayType>(Box::new(page_writer), 0, 0, Arc::new(props));
4409
4410        // Create small, simple data that Snappy compression will likely increase in size
4411        // due to compression overhead for very small data
4412        let values = vec![ByteArray::from("a")];
4413
4414        column_writer.write_batch(&values, None, None).unwrap();
4415
4416        let result = column_writer.close().unwrap();
4417        assert_eq!(
4418            result.metadata.uncompressed_size(),
4419            result.metadata.compressed_size()
4420        );
4421    }
4422}