Skip to main content

parquet/column/writer/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains column writer API.
19
20use bytes::Bytes;
21use half::f16;
22
23use crate::bloom_filter::Sbbf;
24use crate::file::page_index::column_index::ColumnIndexMetaData;
25use crate::file::page_index::offset_index::OffsetIndexMetaData;
26use std::collections::{BTreeSet, VecDeque};
27use std::str;
28
29use crate::basic::{
30    BoundaryOrder, Compression, ConvertedType, Encoding, EncodingMask, LogicalType, PageType, Type,
31};
32use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
33use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
34use crate::compression::{Codec, CodecOptionsBuilder, create_codec};
35use crate::data_type::private::ParquetValueType;
36use crate::data_type::*;
37use crate::encodings::levels::LevelEncoder;
38#[cfg(feature = "encryption")]
39use crate::encryption::encrypt::get_column_crypto_metadata;
40use crate::errors::{ParquetError, Result};
41use crate::file::metadata::{
42    ColumnChunkMetaData, ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram,
43    OffsetIndexBuilder, PageEncodingStats,
44};
45use crate::file::properties::{
46    EnabledStatistics, WriterProperties, WriterPropertiesPtr, WriterVersion,
47};
48use crate::file::statistics::{Statistics, ValueStatistics};
49use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
50
51pub(crate) mod encoder;
52
/// Dispatches expression `$b` against the typed writer bound to `$i`,
/// whichever physical-type variant of the enum `$e` currently holds.
///
/// Used by `ColumnWriter` methods to forward calls to the underlying
/// `ColumnWriterImpl` without repeating the match for every variant.
macro_rules! downcast_writer {
    ($e:expr, $i:ident, $b:expr) => {
        match $e {
            Self::BoolColumnWriter($i) => $b,
            Self::Int32ColumnWriter($i) => $b,
            Self::Int64ColumnWriter($i) => $b,
            Self::Int96ColumnWriter($i) => $b,
            Self::FloatColumnWriter($i) => $b,
            Self::DoubleColumnWriter($i) => $b,
            Self::ByteArrayColumnWriter($i) => $b,
            Self::FixedLenByteArrayColumnWriter($i) => $b,
        }
    };
}
67
/// Column writer for a Parquet type.
///
/// Wraps a [`ColumnWriterImpl`] for each supported physical type, allowing
/// code to handle writers of different types uniformly.
///
/// See [`get_column_writer`] to create instances of this type
pub enum ColumnWriter<'a> {
    /// Column writer for boolean type
    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
    /// Column writer for int32 type
    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
    /// Column writer for int64 type
    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
    /// Column writer for int96 (timestamp) type
    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
    /// Column writer for float type
    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
    /// Column writer for double type
    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
    /// Column writer for byte array type
    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
    /// Column writer for fixed length byte array type
    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
}
89
impl ColumnWriter<'_> {
    /// Returns the estimated total memory usage.
    ///
    /// Forwards to [`GenericColumnWriter::memory_size`] on the typed writer.
    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        downcast_writer!(self, typed, typed.memory_size())
    }

    /// Returns the estimated total encoded bytes for this column writer.
    ///
    /// Forwards to [`GenericColumnWriter::get_estimated_total_bytes`].
    #[cfg(feature = "arrow")]
    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
        downcast_writer!(self, typed, typed.get_estimated_total_bytes())
    }

    /// Finalize the currently buffered values as a data page.
    ///
    /// This is used by content-defined chunking to force a page boundary at
    /// content-determined positions.
    #[cfg(feature = "arrow")]
    pub(crate) fn add_data_page(&mut self) -> Result<()> {
        downcast_writer!(self, typed, typed.add_data_page())
    }

    /// Close this [`ColumnWriter`], returning the metadata for the column chunk.
    ///
    /// Consumes the writer; forwards to [`GenericColumnWriter::close`].
    pub fn close(self) -> Result<ColumnCloseResult> {
        downcast_writer!(self, typed, typed.close())
    }
}
117
118/// Create a specific column writer corresponding to column descriptor `descr`.
119pub fn get_column_writer<'a>(
120    descr: ColumnDescPtr,
121    props: WriterPropertiesPtr,
122    page_writer: Box<dyn PageWriter + 'a>,
123) -> ColumnWriter<'a> {
124    match descr.physical_type() {
125        Type::BOOLEAN => {
126            ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
127        }
128        Type::INT32 => {
129            ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
130        }
131        Type::INT64 => {
132            ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
133        }
134        Type::INT96 => {
135            ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
136        }
137        Type::FLOAT => {
138            ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
139        }
140        Type::DOUBLE => {
141            ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
142        }
143        Type::BYTE_ARRAY => {
144            ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
145        }
146        Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
147            ColumnWriterImpl::new(descr, props, page_writer),
148        ),
149    }
150}
151
152/// Gets a typed column writer for the specific type `T`, by "up-casting" `col_writer` of
153/// non-generic type to a generic column writer type `ColumnWriterImpl`.
154///
155/// Panics if actual enum value for `col_writer` does not match the type `T`.
156pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
157    T::get_column_writer(col_writer).unwrap_or_else(|| {
158        panic!(
159            "Failed to convert column writer into a typed column writer for `{}` type",
160            T::get_physical_type()
161        )
162    })
163}
164
165/// Similar to `get_typed_column_writer` but returns a reference.
166pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
167    col_writer: &'b ColumnWriter<'a>,
168) -> &'b ColumnWriterImpl<'a, T> {
169    T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
170        panic!(
171            "Failed to convert column writer into a typed column writer for `{}` type",
172            T::get_physical_type()
173        )
174    })
175}
176
/// Similar to `get_typed_column_writer` but returns a mutable reference.
pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
    col_writer: &'a mut ColumnWriter<'b>,
) -> &'a mut ColumnWriterImpl<'b, T> {
    T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
        panic!(
            "Failed to convert column writer into a typed column writer for `{}` type",
            T::get_physical_type()
        )
    })
}
188
/// Metadata for a column chunk of a Parquet file.
///
/// Note this structure is returned by [`ColumnWriter::close`].
#[derive(Debug, Clone)]
pub struct ColumnCloseResult {
    /// The total number of bytes written
    pub bytes_written: u64,
    /// The total number of rows written
    pub rows_written: u64,
    /// Metadata for this column chunk
    pub metadata: ColumnChunkMetaData,
    /// Optional bloom filter for this column
    pub bloom_filter: Option<Sbbf>,
    /// Optional column index, for filtering
    pub column_index: Option<ColumnIndexMetaData>,
    /// Optional offset index, identifying page locations
    pub offset_index: Option<OffsetIndexMetaData>,
}
207
// Metrics per page
#[derive(Default)]
struct PageMetrics {
    // Number of values (including nulls) buffered for the current page
    num_buffered_values: u32,
    // Number of rows buffered for the current page
    num_buffered_rows: u32,
    // Number of null values in the current page
    num_page_nulls: u64,
    // Per-page repetition level histogram; `None` if not collected
    repetition_level_histogram: Option<LevelHistogram>,
    // Per-page definition level histogram; `None` if not collected
    definition_level_histogram: Option<LevelHistogram>,
}
217
218impl PageMetrics {
219    fn new() -> Self {
220        Default::default()
221    }
222
223    /// Initialize the repetition level histogram
224    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
225        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
226        self
227    }
228
229    /// Initialize the definition level histogram
230    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
231        self.definition_level_histogram = LevelHistogram::try_new(max_level);
232        self
233    }
234
235    /// Resets the state of this `PageMetrics` to the initial state.
236    /// If histograms have been initialized their contents will be reset to zero.
237    fn new_page(&mut self) {
238        self.num_buffered_values = 0;
239        self.num_buffered_rows = 0;
240        self.num_page_nulls = 0;
241        self.repetition_level_histogram
242            .as_mut()
243            .map(LevelHistogram::reset);
244        self.definition_level_histogram
245            .as_mut()
246            .map(LevelHistogram::reset);
247    }
248
249    /// Updates histogram values using provided repetition levels
250    fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
251        if let Some(ref mut rep_hist) = self.repetition_level_histogram {
252            rep_hist.update_from_levels(levels);
253        }
254    }
255
256    /// Updates histogram values using provided definition levels
257    fn update_definition_level_histogram(&mut self, levels: &[i16]) {
258        if let Some(ref mut def_hist) = self.definition_level_histogram {
259            def_hist.update_from_levels(levels);
260        }
261    }
262}
263
// Metrics per column writer
#[derive(Default)]
struct ColumnMetrics<T: Default> {
    // Running totals across all pages written so far
    total_bytes_written: u64,
    total_rows_written: u64,
    total_uncompressed_size: u64,
    total_compressed_size: u64,
    total_num_values: u64,
    // File offsets of the dictionary page / first data page, once written
    dictionary_page_offset: Option<u64>,
    data_page_offset: Option<u64>,
    // Chunk-level min/max values, if statistics are collected
    min_column_value: Option<T>,
    max_column_value: Option<T>,
    num_column_nulls: u64,
    // Caller-supplied distinct count; cleared if multiple writes occur
    column_distinct_count: Option<u64>,
    // Total variable-length bytes, if tracked by the encoder
    variable_length_bytes: Option<i64>,
    // Chunk-level level histograms; `None` if not collected
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
}
282
283impl<T: Default> ColumnMetrics<T> {
284    fn new() -> Self {
285        Default::default()
286    }
287
288    /// Initialize the repetition level histogram
289    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
290        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
291        self
292    }
293
294    /// Initialize the definition level histogram
295    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
296        self.definition_level_histogram = LevelHistogram::try_new(max_level);
297        self
298    }
299
300    /// Sum `page_histogram` into `chunk_histogram`
301    fn update_histogram(
302        chunk_histogram: &mut Option<LevelHistogram>,
303        page_histogram: &Option<LevelHistogram>,
304    ) {
305        if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
306            chunk_hist.add(page_hist);
307        }
308    }
309
310    /// Sum the provided PageMetrics histograms into the chunk histograms. Does nothing if
311    /// page histograms are not initialized.
312    fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
313        ColumnMetrics::<T>::update_histogram(
314            &mut self.definition_level_histogram,
315            &page_metrics.definition_level_histogram,
316        );
317        ColumnMetrics::<T>::update_histogram(
318            &mut self.repetition_level_histogram,
319            &page_metrics.repetition_level_histogram,
320        );
321    }
322
323    /// Sum the provided page variable_length_bytes into the chunk variable_length_bytes
324    fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
325        if let Some(var_bytes) = variable_length_bytes {
326            *self.variable_length_bytes.get_or_insert(0) += var_bytes;
327        }
328    }
329}
330
/// Typed column writer for a primitive column.
///
/// Alias of [`GenericColumnWriter`] specialized with the default value
/// encoder for data type `T`.
pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
333
/// Generic column writer for a primitive Parquet column
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
    // Column writer properties
    descr: ColumnDescPtr,
    props: WriterPropertiesPtr,
    statistics_enabled: EnabledStatistics,

    // Sink for completed pages
    page_writer: Box<dyn PageWriter + 'a>,
    codec: Compression,
    compressor: Option<Box<dyn Codec>>,
    // Encodes values into pages; may buffer a dictionary
    encoder: E,

    // Metrics for the page currently being built
    page_metrics: PageMetrics,
    // Metrics per column writer
    column_metrics: ColumnMetrics<E::T>,

    /// The order of encodings within the generated metadata does not impact its meaning,
    /// but we use a BTreeSet so that the output is deterministic
    encodings: BTreeSet<Encoding>,
    encoding_stats: Vec<PageEncodingStats>,
    // Reused buffers
    def_levels_sink: Vec<i16>,
    rep_levels_sink: Vec<i16>,
    // Compressed pages buffered until flushed to the page writer
    data_pages: VecDeque<CompressedPage>,
    // column index and offset index
    column_index_builder: ColumnIndexBuilder,
    offset_index_builder: Option<OffsetIndexBuilder>,

    // Below fields used to incrementally check boundary order across data pages.
    // We assume they are ascending/descending until proven wrong.
    data_page_boundary_ascending: bool,
    data_page_boundary_descending: bool,
    /// (min, max)
    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}
369
370impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
    /// Returns a new instance of [`GenericColumnWriter`].
    ///
    /// # Panics
    ///
    /// Panics if the codec configured for this column cannot be created, or if
    /// the value encoder `E` cannot be constructed from `descr` and `props`.
    pub fn new(
        descr: ColumnDescPtr,
        props: WriterPropertiesPtr,
        page_writer: Box<dyn PageWriter + 'a>,
    ) -> Self {
        let codec = props.compression(descr.path());
        let codec_options = CodecOptionsBuilder::default().build();
        let compressor = create_codec(codec, &codec_options).unwrap();
        let encoder = E::try_new(&descr, props.as_ref()).unwrap();

        let statistics_enabled = props.statistics_enabled(descr.path());

        let mut encodings = BTreeSet::new();
        // Used for level information
        encodings.insert(Encoding::RLE);

        let mut page_metrics = PageMetrics::new();
        let mut column_metrics = ColumnMetrics::<E::T>::new();

        // Initialize level histograms if collecting page or chunk statistics
        if statistics_enabled != EnabledStatistics::None {
            page_metrics = page_metrics
                .with_repetition_level_histogram(descr.max_rep_level())
                .with_definition_level_histogram(descr.max_def_level());
            column_metrics = column_metrics
                .with_repetition_level_histogram(descr.max_rep_level())
                .with_definition_level_histogram(descr.max_def_level())
        }

        // Disable column_index_builder if not collecting page statistics.
        let mut column_index_builder = ColumnIndexBuilder::new(descr.physical_type());
        if statistics_enabled != EnabledStatistics::Page {
            column_index_builder.to_invalid()
        }

        // Disable offset_index_builder if requested by user.
        let offset_index_builder = match props.offset_index_disabled() {
            false => Some(OffsetIndexBuilder::new()),
            _ => None,
        };

        Self {
            descr,
            props,
            statistics_enabled,
            page_writer,
            codec,
            compressor,
            encoder,
            def_levels_sink: vec![],
            rep_levels_sink: vec![],
            data_pages: VecDeque::new(),
            page_metrics,
            column_metrics,
            column_index_builder,
            offset_index_builder,
            encodings,
            encoding_stats: vec![],
            // Pages are assumed ascending AND descending until a counterexample is seen
            data_page_boundary_ascending: true,
            data_page_boundary_descending: true,
            last_non_null_data_page_min_max: None,
        }
    }
435
    /// Writes a batch of values with optional definition/repetition levels and
    /// optional caller-supplied statistics, splitting the work into mini
    /// batches bounded by the configured write batch size.
    ///
    /// `value_indices`, when provided, selects which entries of `values` to
    /// write (gather-style). `min`/`max`/`distinct_count` are folded into the
    /// chunk-level metrics. Returns the total number of values written.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn write_batch_internal(
        &mut self,
        values: &E::Values,
        value_indices: Option<&[usize]>,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        min: Option<&E::T>,
        max: Option<&E::T>,
        distinct_count: Option<u64>,
    ) -> Result<usize> {
        // Check if number of definition levels is the same as number of repetition levels.
        if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
            if def.len() != rep.len() {
                return Err(general_err!(
                    "Inconsistent length of definition and repetition levels: {} != {}",
                    def.len(),
                    rep.len()
                ));
            }
        }

        // We check for DataPage limits only after we have inserted the values. If a user
        // writes a large number of values, the DataPage size can be well above the limit.
        //
        // The purpose of this chunking is to bound this. Even if a user writes large
        // number of values, the chunking will ensure that we add data page at a
        // reasonable pagesize limit.

        // TODO: find out why we don't account for size of levels when we estimate page
        // size.

        // With no definition levels there is exactly one level per value.
        let num_levels = match def_levels {
            Some(def_levels) => def_levels.len(),
            None => values.len(),
        };

        if let Some(min) = min {
            update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
        }
        if let Some(max) = max {
            update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
        }

        // We can only set the distinct count if there are no other writes
        if self.encoder.num_values() == 0 {
            self.column_metrics.column_distinct_count = distinct_count;
        } else {
            self.column_metrics.column_distinct_count = None;
        }

        let mut values_offset = 0;
        let mut levels_offset = 0;
        let base_batch_size = self.props.write_batch_size();
        while levels_offset < num_levels {
            let mut end_offset = num_levels.min(levels_offset + base_batch_size);

            // Split at record boundary: extend the batch until the next level
            // with repetition level 0 (the start of a new record).
            if let Some(r) = rep_levels {
                while end_offset < r.len() && r[end_offset] != 0 {
                    end_offset += 1;
                }
            }

            values_offset += self.write_mini_batch(
                values,
                values_offset,
                value_indices,
                end_offset - levels_offset,
                def_levels.map(|lv| &lv[levels_offset..end_offset]),
                rep_levels.map(|lv| &lv[levels_offset..end_offset]),
            )?;
            levels_offset = end_offset;
        }

        // Return total number of values processed.
        Ok(values_offset)
    }
514
    /// Writes batch of values, definition levels and repetition levels.
    /// Returns number of values processed (written).
    ///
    /// If definition and repetition levels are provided, we write fully those levels and
    /// select how many values to write (this number will be returned), since number of
    /// actual written values may be smaller than provided values.
    ///
    /// If only values are provided, then all values are written and the length
    /// of the values buffer is returned.
    ///
    /// Definition and/or repetition levels can be omitted, if values are
    /// non-nullable and/or non-repeated.
    pub fn write_batch(
        &mut self,
        values: &E::Values,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) -> Result<usize> {
        self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
    }
535
    /// Writer may optionally provide pre-calculated statistics for use when computing
    /// chunk-level statistics
    ///
    /// NB: [`WriterProperties::statistics_enabled`] must be set to [`EnabledStatistics::Chunk`]
    /// for these statistics to take effect. If [`EnabledStatistics::None`] they will be ignored,
    /// and if [`EnabledStatistics::Page`] the chunk statistics will instead be computed from the
    /// computed page statistics
    pub fn write_batch_with_statistics(
        &mut self,
        values: &E::Values,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        min: Option<&E::T>,
        max: Option<&E::T>,
        distinct_count: Option<u64>,
    ) -> Result<usize> {
        self.write_batch_internal(
            values,
            None,
            def_levels,
            rep_levels,
            min,
            max,
            distinct_count,
        )
    }
562
    /// Returns the estimated total memory usage.
    ///
    /// Unlike [`Self::get_estimated_total_bytes`] this is an estimate
    /// of the current memory usage and not the final anticipated encoded size.
    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
    }
571
    /// Returns total number of bytes written by this column writer so far.
    /// This value is also returned when column writer is closed.
    ///
    /// Note: this value does not include any buffered data that has not
    /// yet been flushed to a page.
    pub fn get_total_bytes_written(&self) -> u64 {
        self.column_metrics.total_bytes_written
    }
580
    /// Returns the estimated total encoded bytes for this column writer.
    ///
    /// Unlike [`Self::get_total_bytes_written`] this includes an estimate
    /// of any data that has not yet been flushed to a page, based on its
    /// anticipated encoded size.
    #[cfg(feature = "arrow")]
    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
        // Buffered-but-unflushed pages + bytes already written + encoder estimates
        self.data_pages
            .iter()
            .map(|page| page.data().len() as u64)
            .sum::<u64>()
            + self.column_metrics.total_bytes_written
            + self.encoder.estimated_data_page_size() as u64
            + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
    }
596
    /// Returns total number of rows written by this column writer so far.
    /// This value is also returned when column writer is closed.
    pub fn get_total_rows_written(&self) -> u64 {
        self.column_metrics.total_rows_written
    }
602
    /// Returns a reference to this writer's [`ColumnDescPtr`]
    pub fn get_descriptor(&self) -> &ColumnDescPtr {
        &self.descr
    }
607
    /// Finalizes writes and closes the column writer.
    /// Returns total bytes written, total rows written and column chunk metadata.
    ///
    /// Flushes any buffered values as a final data page, writes the dictionary
    /// page (if any), flushes all pages to the page writer, then builds the
    /// chunk metadata and optional column/offset indexes.
    pub fn close(mut self) -> Result<ColumnCloseResult> {
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }
        // Dictionary page must be written before the data pages are flushed
        if self.encoder.has_dictionary() {
            self.write_dictionary_page()?;
        }
        self.flush_data_pages()?;
        let metadata = self.build_column_metadata()?;
        self.page_writer.close()?;

        let boundary_order = match (
            self.data_page_boundary_ascending,
            self.data_page_boundary_descending,
        ) {
            // If the lists are composed of equal elements then will be marked as ascending
            // (Also the case if all pages are null pages)
            (true, _) => BoundaryOrder::ASCENDING,
            (false, true) => BoundaryOrder::DESCENDING,
            (false, false) => BoundaryOrder::UNORDERED,
        };
        self.column_index_builder.set_boundary_order(boundary_order);

        // The column index is only produced if it remained valid (page
        // statistics were collected for every page)
        let column_index = match self.column_index_builder.valid() {
            true => Some(self.column_index_builder.build()?),
            false => None,
        };

        let offset_index = self.offset_index_builder.map(|b| b.build());

        Ok(ColumnCloseResult {
            bytes_written: self.column_metrics.total_bytes_written,
            rows_written: self.column_metrics.total_rows_written,
            bloom_filter: self.encoder.flush_bloom_filter(),
            metadata,
            column_index,
            offset_index,
        })
    }
649
    /// Writes mini batch of values, definition and repetition levels.
    /// This allows fine-grained processing of values and maintaining a reasonable
    /// page size.
    ///
    /// Returns the number of values written (which may be less than `num_levels`
    /// when some levels represent nulls). May flush a data page and/or fall back
    /// from dictionary encoding as a side effect.
    fn write_mini_batch(
        &mut self,
        values: &E::Values,
        values_offset: usize,
        value_indices: Option<&[usize]>,
        num_levels: usize,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) -> Result<usize> {
        // Process definition levels and determine how many values to write.
        let values_to_write = if self.descr.max_def_level() > 0 {
            let levels = def_levels.ok_or_else(|| {
                general_err!(
                    "Definition levels are required, because max definition level = {}",
                    self.descr.max_def_level()
                )
            })?;

            // Only levels equal to the max definition level carry a value;
            // everything else is a null at some nesting depth.
            let values_to_write = levels
                .iter()
                .map(|level| (*level == self.descr.max_def_level()) as usize)
                .sum();
            self.page_metrics.num_page_nulls += (levels.len() - values_to_write) as u64;

            // Update histogram
            self.page_metrics.update_definition_level_histogram(levels);

            self.def_levels_sink.extend_from_slice(levels);
            values_to_write
        } else {
            num_levels
        };

        // Process repetition levels and determine how many rows we are about to process.
        if self.descr.max_rep_level() > 0 {
            // A row could contain more than one value.
            let levels = rep_levels.ok_or_else(|| {
                general_err!(
                    "Repetition levels are required, because max repetition level = {}",
                    self.descr.max_rep_level()
                )
            })?;

            if !levels.is_empty() && levels[0] != 0 {
                return Err(general_err!(
                    "Write must start at a record boundary, got non-zero repetition level of {}",
                    levels[0]
                ));
            }

            // Count the occasions where we start a new row
            for &level in levels {
                self.page_metrics.num_buffered_rows += (level == 0) as u32
            }

            // Update histogram
            self.page_metrics.update_repetition_level_histogram(levels);

            self.rep_levels_sink.extend_from_slice(levels);
        } else {
            // Each value is exactly one row.
            // Equals to the number of values, we count nulls as well.
            self.page_metrics.num_buffered_rows += num_levels as u32;
        }

        // Write either the gathered subset of values or a contiguous range.
        match value_indices {
            Some(indices) => {
                let indices = &indices[values_offset..values_offset + values_to_write];
                self.encoder.write_gather(values, indices)?;
            }
            None => self.encoder.write(values, values_offset, values_to_write)?,
        }

        self.page_metrics.num_buffered_values += num_levels as u32;

        if self.should_add_data_page() {
            self.add_data_page()?;
        }

        if self.should_dict_fallback() {
            self.dict_fallback()?;
        }

        Ok(values_to_write)
    }
738
739    /// Returns true if we need to fall back to non-dictionary encoding.
740    ///
741    /// We can only fall back if dictionary encoder is set and we have exceeded dictionary
742    /// size.
743    #[inline]
744    fn should_dict_fallback(&self) -> bool {
745        match self.encoder.estimated_dict_page_size() {
746            Some(size) => {
747                size >= self
748                    .props
749                    .column_dictionary_page_size_limit(self.descr.path())
750            }
751            None => false,
752        }
753    }
754
755    /// Returns true if there is enough data for a data page, false otherwise.
756    #[inline]
757    fn should_add_data_page(&self) -> bool {
758        // This is necessary in the event of a much larger dictionary size than page size
759        //
760        // In such a scenario the dictionary decoder may return an estimated encoded
761        // size in excess of the page size limit, even when there are no buffered values
762        if self.page_metrics.num_buffered_values == 0 {
763            return false;
764        }
765
766        self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
767            || self.encoder.estimated_data_page_size()
768                >= self.props.column_data_page_size_limit(self.descr.path())
769    }
770
    /// Performs dictionary fallback.
    /// Prepares and writes dictionary and all data pages into page writer.
    fn dict_fallback(&mut self) -> Result<()> {
        // At this point we know that we need to fall back.
        // Flush any buffered values before abandoning dictionary encoding.
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }
        self.write_dictionary_page()?;
        self.flush_data_pages()?;
        Ok(())
    }
782
    /// Update the column index and offset index when adding the data page.
    ///
    /// `page_statistics` holds the min/max of the page being added (when page
    /// statistics were gathered); `page_variable_length_bytes` is forwarded to
    /// the offset index as this page's unencoded byte-array data size.
    ///
    /// Also tracks whether page boundaries remain ascending/descending so the
    /// column index `BoundaryOrder` can be determined later.
    fn update_column_offset_index(
        &mut self,
        page_statistics: Option<&ValueStatistics<E::T>>,
        page_variable_length_bytes: Option<i64>,
    ) {
        // update the column index
        // A page is a "null page" when every buffered row is null.
        let null_page =
            (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
        // a page contains only null values,
        // and writers have to set the corresponding entries in min_values and max_values to byte[0]
        if null_page && self.column_index_builder.valid() {
            self.column_index_builder.append(
                null_page,
                vec![],
                vec![],
                self.page_metrics.num_page_nulls as i64,
            );
        } else if self.column_index_builder.valid() {
            // from page statistics
            // If can't get the page statistics, ignore this column/offset index for this column chunk
            match &page_statistics {
                None => {
                    self.column_index_builder.to_invalid();
                }
                Some(stat) => {
                    // Check if min/max are still ascending/descending across pages
                    let new_min = stat.min_opt().unwrap();
                    let new_max = stat.max_opt().unwrap();
                    // Compare against the previous non-null page's bounds; null
                    // pages are skipped for boundary-order purposes.
                    if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
                        if self.data_page_boundary_ascending {
                            // If last min/max are greater than new min/max then not ascending anymore
                            let not_ascending = compare_greater(&self.descr, last_min, new_min)
                                || compare_greater(&self.descr, last_max, new_max);
                            if not_ascending {
                                self.data_page_boundary_ascending = false;
                            }
                        }

                        if self.data_page_boundary_descending {
                            // If new min/max are greater than last min/max then not descending anymore
                            let not_descending = compare_greater(&self.descr, new_min, last_min)
                                || compare_greater(&self.descr, new_max, last_max);
                            if not_descending {
                                self.data_page_boundary_descending = false;
                            }
                        }
                    }
                    self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));

                    // Store (possibly truncated) min/max bytes in the column index.
                    // Truncation is only safe for types whose sort order survives a
                    // byte-wise cut (see `can_truncate_value`).
                    if self.can_truncate_value() {
                        self.column_index_builder.append(
                            null_page,
                            self.truncate_min_value(
                                self.props.column_index_truncate_length(),
                                stat.min_bytes_opt().unwrap(),
                            )
                            .0,
                            self.truncate_max_value(
                                self.props.column_index_truncate_length(),
                                stat.max_bytes_opt().unwrap(),
                            )
                            .0,
                            self.page_metrics.num_page_nulls as i64,
                        );
                    } else {
                        self.column_index_builder.append(
                            null_page,
                            stat.min_bytes_opt().unwrap().to_vec(),
                            stat.max_bytes_opt().unwrap().to_vec(),
                            self.page_metrics.num_page_nulls as i64,
                        );
                    }
                }
            }
        }

        // Append page histograms to the `ColumnIndex` histograms
        self.column_index_builder.append_histograms(
            &self.page_metrics.repetition_level_histogram,
            &self.page_metrics.definition_level_histogram,
        );

        // Update the offset index
        if let Some(builder) = self.offset_index_builder.as_mut() {
            builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
            builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
        }
    }
872
873    /// Determine if we should allow truncating min/max values for this column's statistics
874    fn can_truncate_value(&self) -> bool {
875        match self.descr.physical_type() {
876            // Don't truncate for Float16 and Decimal because their sort order is different
877            // from that of FIXED_LEN_BYTE_ARRAY sort order.
878            // So truncation of those types could lead to inaccurate min/max statistics
879            Type::FIXED_LEN_BYTE_ARRAY
880                if !matches!(
881                    self.descr.logical_type_ref(),
882                    Some(&LogicalType::Decimal { .. }) | Some(&LogicalType::Float16)
883                ) =>
884            {
885                true
886            }
887            Type::BYTE_ARRAY => true,
888            // Truncation only applies for fba/binary physical types
889            _ => false,
890        }
891    }
892
893    /// Returns `true` if this column's logical type is a UTF-8 string.
894    fn is_utf8(&self) -> bool {
895        self.get_descriptor().logical_type_ref() == Some(&LogicalType::String)
896            || self.get_descriptor().converted_type() == ConvertedType::UTF8
897    }
898
899    /// Truncates a binary statistic to at most `truncation_length` bytes.
900    ///
901    /// If truncation is not possible, returns `data`.
902    ///
903    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
904    ///
905    /// UTF-8 Note:
906    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
907    /// also remain valid UTF-8, but may be less tnan `truncation_length` bytes to avoid splitting
908    /// on non-character boundaries.
909    fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
910        truncation_length
911            .filter(|l| data.len() > *l)
912            .and_then(|l|
913                // don't do extra work if this column isn't UTF-8
914                if self.is_utf8() {
915                    match str::from_utf8(data) {
916                        Ok(str_data) => truncate_utf8(str_data, l),
917                        Err(_) => Some(data[..l].to_vec()),
918                    }
919                } else {
920                    Some(data[..l].to_vec())
921                }
922            )
923            .map(|truncated| (truncated, true))
924            .unwrap_or_else(|| (data.to_vec(), false))
925    }
926
927    /// Truncates a binary statistic to at most `truncation_length` bytes, and then increment the
928    /// final byte(s) to yield a valid upper bound. This may result in a result of less than
929    /// `truncation_length` bytes if the last byte(s) overflows.
930    ///
931    /// If truncation is not possible, returns `data`.
932    ///
933    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
934    ///
935    /// UTF-8 Note:
936    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
937    /// also remain valid UTF-8 (but again may be less than `truncation_length` bytes). If `data`
938    /// does not contain valid UTF-8, then truncation will occur as if the column is non-string
939    /// binary.
940    fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
941        truncation_length
942            .filter(|l| data.len() > *l)
943            .and_then(|l|
944                // don't do extra work if this column isn't UTF-8
945                if self.is_utf8() {
946                    match str::from_utf8(data) {
947                        Ok(str_data) => truncate_and_increment_utf8(str_data, l),
948                        Err(_) => increment(data[..l].to_vec()),
949                    }
950                } else {
951                    increment(data[..l].to_vec())
952                }
953            )
954            .map(|truncated| (truncated, true))
955            .unwrap_or_else(|| (data.to_vec(), false))
956    }
957
    /// Truncate the min and max values that will be written to a data page
    /// header or column chunk Statistics
    ///
    /// ByteArray statistics are truncated whenever min/max are set;
    /// FixedLenByteArray statistics are only truncated when
    /// `can_truncate_value` permits (i.e. not Decimal/Float16). Everything
    /// else passes through unchanged.
    fn truncate_statistics(&self, statistics: Statistics) -> Statistics {
        // `true` only for signed sort orders; passed straight through to
        // `ValueStatistics::new`.
        let backwards_compatible_min_max = self.descr.sort_order().is_signed();
        match statistics {
            Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
                let (min, did_truncate_min) = self.truncate_min_value(
                    self.props.statistics_truncate_length(),
                    stats.min_bytes_opt().unwrap(),
                );
                let (max, did_truncate_max) = self.truncate_max_value(
                    self.props.statistics_truncate_length(),
                    stats.max_bytes_opt().unwrap(),
                );
                // A truncated bound is no longer an exact value from the data,
                // so flip the corresponding `is_exact` flag off.
                Statistics::ByteArray(
                    ValueStatistics::new(
                        Some(min.into()),
                        Some(max.into()),
                        stats.distinct_count(),
                        stats.null_count_opt(),
                        backwards_compatible_min_max,
                    )
                    .with_max_is_exact(!did_truncate_max)
                    .with_min_is_exact(!did_truncate_min),
                )
            }
            Statistics::FixedLenByteArray(stats)
                if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
            {
                let (min, did_truncate_min) = self.truncate_min_value(
                    self.props.statistics_truncate_length(),
                    stats.min_bytes_opt().unwrap(),
                );
                let (max, did_truncate_max) = self.truncate_max_value(
                    self.props.statistics_truncate_length(),
                    stats.max_bytes_opt().unwrap(),
                );
                Statistics::FixedLenByteArray(
                    ValueStatistics::new(
                        Some(min.into()),
                        Some(max.into()),
                        stats.distinct_count(),
                        stats.null_count_opt(),
                        backwards_compatible_min_max,
                    )
                    .with_max_is_exact(!did_truncate_max)
                    .with_min_is_exact(!did_truncate_min),
                )
            }
            // Non-binary statistics (and FLBA types excluded from truncation)
            // are returned untouched.
            stats => stats,
        }
    }
1010
    /// Adds data page.
    /// Data page is either buffered in case of dictionary encoding or written directly.
    ///
    /// Flushes the encoder's buffered values into a (possibly compressed) v1
    /// or v2 data page, updates chunk-level statistics, the column/offset
    /// index, and the page metrics, then resets per-page state.
    pub(crate) fn add_data_page(&mut self) -> Result<()> {
        // Extract encoded values
        let values_data = self.encoder.flush_data_page()?;

        let max_def_level = self.descr.max_def_level();
        let max_rep_level = self.descr.max_rep_level();

        self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;

        // Chunk-level min/max are always maintained; page-level statistics are
        // only materialized when page statistics are enabled.
        let page_statistics = match (values_data.min_value, values_data.max_value) {
            (Some(min), Some(max)) => {
                // Update chunk level statistics
                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

                (self.statistics_enabled == EnabledStatistics::Page).then_some(
                    ValueStatistics::new(
                        Some(min),
                        Some(max),
                        None,
                        Some(self.page_metrics.num_page_nulls),
                        false,
                    ),
                )
            }
            _ => None,
        };

        // update column and offset index
        self.update_column_offset_index(
            page_statistics.as_ref(),
            values_data.variable_length_bytes,
        );

        // Update histograms and variable_length_bytes in column_metrics
        self.column_metrics
            .update_from_page_metrics(&self.page_metrics);
        self.column_metrics
            .update_variable_length_bytes(values_data.variable_length_bytes);

        // From here on, we only need page statistics if they will be written to the page header.
        let page_statistics = page_statistics
            .filter(|_| self.props.write_page_header_statistics(self.descr.path()))
            .map(|stats| self.truncate_statistics(Statistics::from(stats)));

        let compressed_page = match self.props.writer_version() {
            WriterVersion::PARQUET_1_0 => {
                // v1 layout: [rep levels][def levels][values], compressed as one
                // buffer.
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.rep_levels_sink[..],
                            max_rep_level,
                        )[..],
                    );
                }

                if max_def_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.def_levels_sink[..],
                            max_def_level,
                        )[..],
                    );
                }

                buffer.extend_from_slice(&values_data.buf);
                let uncompressed_size = buffer.len();

                if let Some(ref mut cmpr) = self.compressor {
                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
                    compressed_buf.shrink_to_fit();
                    buffer = compressed_buf;
                }

                let data_page = Page::DataPage {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    def_level_encoding: Encoding::RLE,
                    rep_level_encoding: Encoding::RLE,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
            WriterVersion::PARQUET_2_0 => {
                // v2 layout: levels are stored uncompressed ahead of the
                // (separately compressed) values.
                let mut rep_levels_byte_len = 0;
                let mut def_levels_byte_len = 0;
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
                    rep_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                if max_def_level > 0 {
                    let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
                    def_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                let uncompressed_size =
                    rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();

                // Data Page v2 compresses values only.
                let is_compressed = match self.compressor {
                    Some(ref mut cmpr) => {
                        let buffer_len = buffer.len();
                        cmpr.compress(&values_data.buf, &mut buffer)?;
                        // If compression didn't actually shrink the page, keep
                        // the uncompressed values instead.
                        if uncompressed_size <= buffer.len() - buffer_len {
                            buffer.truncate(buffer_len);
                            buffer.extend_from_slice(&values_data.buf);
                            false
                        } else {
                            true
                        }
                    }
                    None => {
                        buffer.extend_from_slice(&values_data.buf);
                        false
                    }
                };

                let data_page = Page::DataPageV2 {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    num_nulls: self.page_metrics.num_page_nulls as u32,
                    num_rows: self.page_metrics.num_buffered_rows,
                    def_levels_byte_len: def_levels_byte_len as u32,
                    rep_levels_byte_len: rep_levels_byte_len as u32,
                    is_compressed,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
        };

        // Check if we need to buffer data page or flush it to the sink directly.
        // Pages are buffered while dictionary encoding is active so they can
        // be written after the dictionary page (or re-encoded on fallback).
        if self.encoder.has_dictionary() {
            self.data_pages.push_back(compressed_page);
        } else {
            self.write_data_page(compressed_page)?;
        }

        // Update total number of rows.
        self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;

        // Reset state.
        self.rep_levels_sink.clear();
        self.def_levels_sink.clear();
        self.page_metrics.new_page();

        Ok(())
    }
1175
1176    /// Finalises any outstanding data pages and flushes buffered data pages from
1177    /// dictionary encoding into underlying sink.
1178    #[inline]
1179    fn flush_data_pages(&mut self) -> Result<()> {
1180        // Write all outstanding data to a new page.
1181        if self.page_metrics.num_buffered_values > 0 {
1182            self.add_data_page()?;
1183        }
1184
1185        while let Some(page) = self.data_pages.pop_front() {
1186            self.write_data_page(page)?;
1187        }
1188
1189        Ok(())
1190    }
1191
    /// Assembles column chunk metadata.
    ///
    /// Combines the accumulated column metrics, encodings, encoding stats,
    /// chunk-level statistics (when enabled) and optional encryption metadata
    /// into a [`ColumnChunkMetaData`].
    fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
        let total_compressed_size = self.column_metrics.total_compressed_size as i64;
        let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
        let num_values = self.column_metrics.total_num_values as i64;
        let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
        // If data page offset is not set, then no pages have been written
        let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;

        let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
            .set_compression(self.codec)
            .set_encodings_mask(EncodingMask::new_from_encodings(self.encodings.iter()))
            .set_page_encoding_stats(self.encoding_stats.clone())
            .set_total_compressed_size(total_compressed_size)
            .set_total_uncompressed_size(total_uncompressed_size)
            .set_num_values(num_values)
            .set_data_page_offset(data_page_offset)
            .set_dictionary_page_offset(dict_page_offset);

        if self.statistics_enabled != EnabledStatistics::None {
            // Legacy min/max fields are only backwards compatible for signed
            // sort orders.
            let backwards_compatible_min_max = self.descr.sort_order().is_signed();

            let statistics = ValueStatistics::<E::T>::new(
                self.column_metrics.min_column_value.clone(),
                self.column_metrics.max_column_value.clone(),
                self.column_metrics.column_distinct_count,
                Some(self.column_metrics.num_column_nulls),
                false,
            )
            .with_backwards_compatible_min_max(backwards_compatible_min_max)
            .into();

            // Apply the configured statistics truncation before writing.
            let statistics = self.truncate_statistics(statistics);

            builder = builder
                .set_statistics(statistics)
                .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
                .set_repetition_level_histogram(
                    self.column_metrics.repetition_level_histogram.take(),
                )
                .set_definition_level_histogram(
                    self.column_metrics.definition_level_histogram.take(),
                );

            if let Some(geo_stats) = self.encoder.flush_geospatial_statistics() {
                builder = builder.set_geo_statistics(geo_stats);
            }
        }

        builder = self.set_column_chunk_encryption_properties(builder);

        let metadata = builder.build()?;
        Ok(metadata)
    }
1246
1247    /// Encodes definition or repetition levels for Data Page v1.
1248    #[inline]
1249    fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> {
1250        let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
1251        encoder.put(levels);
1252        encoder.consume()
1253    }
1254
1255    /// Encodes definition or repetition levels for Data Page v2.
1256    /// Encoding is always RLE.
1257    #[inline]
1258    fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
1259        let mut encoder = LevelEncoder::v2(max_level, levels.len());
1260        encoder.put(levels);
1261        encoder.consume()
1262    }
1263
1264    /// Writes compressed data page into underlying sink and updates global metrics.
1265    #[inline]
1266    fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
1267        self.encodings.insert(page.encoding());
1268        match self.encoding_stats.last_mut() {
1269            Some(encoding_stats)
1270                if encoding_stats.page_type == page.page_type()
1271                    && encoding_stats.encoding == page.encoding() =>
1272            {
1273                encoding_stats.count += 1;
1274            }
1275            _ => {
1276                // data page type does not change inside a file
1277                // encoding can currently only change from dictionary to non-dictionary once
1278                self.encoding_stats.push(PageEncodingStats {
1279                    page_type: page.page_type(),
1280                    encoding: page.encoding(),
1281                    count: 1,
1282                });
1283            }
1284        }
1285        let page_spec = self.page_writer.write_page(page)?;
1286        // update offset index
1287        // compressed_size = header_size + compressed_data_size
1288        if let Some(builder) = self.offset_index_builder.as_mut() {
1289            builder
1290                .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
1291        }
1292        self.update_metrics_for_page(page_spec);
1293        Ok(())
1294    }
1295
    /// Writes dictionary page into underlying sink.
    ///
    /// Compresses the dictionary buffer when a compressor is configured,
    /// records the dictionary encoding in the chunk's encoding stats, and
    /// updates column metrics from the written page spec.
    ///
    /// # Errors
    /// Returns an error if no dictionary encoder is set, or if compression or
    /// the page write fails.
    #[inline]
    fn write_dictionary_page(&mut self) -> Result<()> {
        let compressed_page = {
            let mut page = self
                .encoder
                .flush_dict_page()?
                .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;

            let uncompressed_size = page.buf.len();

            if let Some(ref mut cmpr) = self.compressor {
                let mut output_buf = Vec::with_capacity(uncompressed_size);
                cmpr.compress(&page.buf, &mut output_buf)?;
                page.buf = Bytes::from(output_buf);
            }

            let dict_page = Page::DictionaryPage {
                buf: page.buf,
                num_values: page.num_values as u32,
                encoding: self.props.dictionary_page_encoding(),
                is_sorted: page.is_sorted,
            };
            CompressedPage::new(dict_page, uncompressed_size)
        };

        self.encodings.insert(compressed_page.encoding());
        self.encoding_stats.push(PageEncodingStats {
            page_type: PageType::DICTIONARY_PAGE,
            encoding: compressed_page.encoding(),
            count: 1,
        });
        let page_spec = self.page_writer.write_page(compressed_page)?;
        self.update_metrics_for_page(page_spec);
        // For the dictionary page, there's no need to update the column/offset index.
        Ok(())
    }
1333
1334    /// Updates column writer metrics with each page metadata.
1335    #[inline]
1336    fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1337        self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1338        self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1339        self.column_metrics.total_bytes_written += page_spec.bytes_written;
1340
1341        match page_spec.page_type {
1342            PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1343                self.column_metrics.total_num_values += page_spec.num_values as u64;
1344                if self.column_metrics.data_page_offset.is_none() {
1345                    self.column_metrics.data_page_offset = Some(page_spec.offset);
1346                }
1347            }
1348            PageType::DICTIONARY_PAGE => {
1349                assert!(
1350                    self.column_metrics.dictionary_page_offset.is_none(),
1351                    "Dictionary offset is already set"
1352                );
1353                self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1354            }
1355            _ => {}
1356        }
1357    }
1358
1359    #[inline]
1360    #[cfg(feature = "encryption")]
1361    fn set_column_chunk_encryption_properties(
1362        &self,
1363        builder: ColumnChunkMetaDataBuilder,
1364    ) -> ColumnChunkMetaDataBuilder {
1365        if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() {
1366            builder.set_column_crypto_metadata(get_column_crypto_metadata(
1367                encryption_properties,
1368                &self.descr,
1369            ))
1370        } else {
1371            builder
1372        }
1373    }
1374
    /// No-op counterpart used when the "encryption" feature is disabled:
    /// returns the builder unchanged.
    #[inline]
    #[cfg(not(feature = "encryption"))]
    fn set_column_chunk_encryption_properties(
        &self,
        builder: ColumnChunkMetaDataBuilder,
    ) -> ColumnChunkMetaDataBuilder {
        builder
    }
1383}
1384
1385fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
1386    update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
1387}
1388
1389fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
1390    update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
1391}
1392
1393#[inline]
1394#[allow(clippy::eq_op)]
1395fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
1396    match T::PHYSICAL_TYPE {
1397        Type::FLOAT | Type::DOUBLE => val != val,
1398        Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type_ref() == Some(&LogicalType::Float16) => {
1399            let val = val.as_bytes();
1400            let val = f16::from_le_bytes([val[0], val[1]]);
1401            val.is_nan()
1402        }
1403        _ => false,
1404    }
1405}
1406
1407/// Perform a conditional update of `cur`, skipping any NaN values
1408///
1409/// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with
1410/// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true`
1411fn update_stat<T: ParquetValueType, F>(
1412    descr: &ColumnDescriptor,
1413    val: &T,
1414    cur: &mut Option<T>,
1415    should_update: F,
1416) where
1417    F: Fn(&T) -> bool,
1418{
1419    if is_nan(descr, val) {
1420        return;
1421    }
1422
1423    if cur.as_ref().is_none_or(should_update) {
1424        *cur = Some(val.clone());
1425    }
1426}
1427
1428/// Evaluate `a > b` according to underlying logical type.
1429fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
1430    match T::PHYSICAL_TYPE {
1431        Type::INT32 | Type::INT64 => {
1432            if let Some(LogicalType::Integer {
1433                is_signed: false, ..
1434            }) = descr.logical_type_ref()
1435            {
1436                // need to compare unsigned
1437                return compare_greater_unsigned_int(a, b);
1438            }
1439
1440            match descr.converted_type() {
1441                ConvertedType::UINT_8
1442                | ConvertedType::UINT_16
1443                | ConvertedType::UINT_32
1444                | ConvertedType::UINT_64 => {
1445                    return compare_greater_unsigned_int(a, b);
1446                }
1447                _ => {}
1448            };
1449        }
1450        Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1451            if let Some(LogicalType::Decimal { .. }) = descr.logical_type_ref() {
1452                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1453            }
1454            if let ConvertedType::DECIMAL = descr.converted_type() {
1455                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1456            }
1457            if let Some(LogicalType::Float16) = descr.logical_type_ref() {
1458                return compare_greater_f16(a.as_bytes(), b.as_bytes());
1459            }
1460        }
1461
1462        _ => {}
1463    }
1464
1465    // compare independent of logical / converted type
1466    a > b
1467}
1468
1469// ----------------------------------------------------------------------
1470// Encoding support for column writer.
1471// This mirrors parquet-mr default encodings for writes. See:
1472// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java
1473// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java
1474
1475/// Returns encoding for a column when no other encoding is provided in writer properties.
1476fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1477    match (kind, props.writer_version()) {
1478        (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1479        (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1480        (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1481        (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1482        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1483        _ => Encoding::PLAIN,
1484    }
1485}
1486
1487/// Returns true if dictionary is supported for column writer, false otherwise.
1488fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1489    match (kind, props.writer_version()) {
1490        // Booleans do not support dict encoding and should use a fallback encoding.
1491        (Type::BOOLEAN, _) => false,
1492        // Dictionary encoding was not enabled in PARQUET 1.0
1493        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1494        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1495        _ => true,
1496    }
1497}
1498
1499#[inline]
1500fn compare_greater_unsigned_int<T: ParquetValueType>(a: &T, b: &T) -> bool {
1501    a.as_u64().unwrap() > b.as_u64().unwrap()
1502}
1503
1504#[inline]
1505fn compare_greater_f16(a: &[u8], b: &[u8]) -> bool {
1506    let a = f16::from_le_bytes(a.try_into().unwrap());
1507    let b = f16::from_le_bytes(b.try_into().unwrap());
1508    a > b
1509}
1510
/// Signed comparison of bytes arrays.
///
/// Both inputs are interpreted as big-endian two's-complement integers (the
/// representation used for DECIMAL values stored in BYTE_ARRAY /
/// FIXED_LEN_BYTE_ARRAY columns). Returns `true` iff the value encoded by `a`
/// is strictly greater than the value encoded by `b`. The arrays may have
/// different lengths; the shorter one is treated as if sign-extended to the
/// longer length.
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    let a_length = a.len();
    let b_length = b.len();

    // An empty array sorts below any non-empty array.
    if a_length == 0 || b_length == 0 {
        return a_length > 0;
    }

    let first_a: u8 = a[0];
    let first_b: u8 = b[0];

    // We can short circuit for different signed numbers or
    // for equal length bytes arrays that have different first bytes.
    // The equality requirement is necessary for sign extension cases:
    // 0x0010 should be equal to 0x10 (and 0xFF90 equal to 0x90) due to
    // big endian sign extension, so unequal lengths cannot short circuit
    // on the first byte alone.
    if (0x80 & first_a) != (0x80 & first_b) || (a_length == b_length && first_a != first_b) {
        return (first_a as i8) > (first_b as i8);
    }

    // When the lengths are unequal and the numbers are of the same
    // sign we need to do comparison by sign extending the shorter
    // value first, and once we get to equal sized arrays, lexicographical
    // unsigned comparison of the aligned bytes is sufficient.

    let extension: u8 = if (first_a as i8) < 0 { 0xFF } else { 0 };

    if a_length != b_length {
        // Check whether the longer array carries magnitude beyond what sign
        // extension of the shorter one could produce.
        let not_equal = if a_length > b_length {
            let lead_length = a_length - b_length;
            a[0..lead_length].iter().any(|&x| x != extension)
        } else {
            let lead_length = b_length - a_length;
            b[0..lead_length].iter().any(|&x| x != extension)
        };

        if not_equal {
            // Extra significant bytes exist, so length alone decides: a longer
            // positive is bigger, a longer negative is smaller.
            let negative_values: bool = (first_a as i8) < 0;
            let a_longer: bool = a_length > b_length;
            return if negative_values { !a_longer } else { a_longer };
        }
    }

    // At this point either the lengths are equal (and the first bytes are
    // equal, otherwise we would have returned above), or the longer array's
    // leading bytes are all sign-extension filler. Compare the equal-length
    // suffixes with unsigned lexicographic ordering.
    //
    // BUG FIX: the previous implementation compared `a[1..] > b[1..]`, which is
    // only correct for equal lengths. For unequal lengths it compared
    // misaligned (and differently sized) slices, e.g. it reported
    // [0x00, 0x01] (= 1) as greater than [0x02] (= 2). Align both suffixes on
    // the shorter length instead.
    let min_length = a_length.min(b_length);
    a[a_length - min_length..] > b[b_length - min_length..]
}
1556
/// Truncate a UTF-8 slice to the longest prefix that is still a valid UTF-8 string,
/// while being less than `length` bytes and non-empty. Returns `None` if truncation
/// is not possible within those constraints.
///
/// The caller guarantees that data.len() > length.
fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    // Walk backwards from `length` to the nearest char boundary; a split of 0
    // would produce an empty result, so it is rejected.
    let mut split = length;
    while split > 0 && !data.is_char_boundary(split) {
        split -= 1;
    }
    (split > 0).then(|| data.as_bytes()[..split].to_vec())
}
1566
1567/// Truncate a UTF-8 slice and increment it's final character. The returned value is the
1568/// longest such slice that is still a valid UTF-8 string while being less than `length`
1569/// bytes and non-empty. Returns `None` if no such transformation is possible.
1570///
1571/// The caller guarantees that data.len() > length.
1572fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1573    // UTF-8 is max 4 bytes, so start search 3 back from desired length
1574    let lower_bound = length.saturating_sub(3);
1575    let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
1576    increment_utf8(data.get(..split)?)
1577}
1578
/// Increment the final character in a UTF-8 string in such a way that the returned result
/// is still a valid UTF-8 string. The returned string may be shorter than the input if the
/// last character(s) cannot be incremented (due to overflow or producing invalid code points).
/// Returns `None` if the string cannot be incremented.
///
/// Note that this implementation will not promote an N-byte code point to (N+1) bytes.
fn increment_utf8(data: &str) -> Option<Vec<u8>> {
    // Scan characters from the end, looking for the first one that can absorb
    // an increment without changing its encoded width.
    for (idx, ch) in data.char_indices().rev() {
        let width = ch.len_utf8();
        match char::from_u32(ch as u32 + 1) {
            // `from_u32` rejects surrogates / out-of-range values; also skip
            // any increment that would widen the character's UTF-8 encoding.
            Some(next) if next.len_utf8() == width => {
                // Keep all bytes up to and including this character, then
                // overwrite the character's bytes in place with the successor.
                let mut out = data.as_bytes()[..idx + width].to_vec();
                next.encode_utf8(&mut out[idx..]);
                return Some(out);
            }
            _ => {}
        }
    }

    // Every character overflowed or would have changed width.
    None
}
1600
/// Try and increment the bytes from right to left.
///
/// Returns `None` if all bytes are set to `u8::MAX`.
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    // Propagate the carry from the least-significant byte towards the front;
    // every byte that wraps is left at zero.
    let mut i = data.len();
    while i > 0 {
        i -= 1;
        let (next, wrapped) = data[i].overflowing_add(1);
        data[i] = next;
        if !wrapped {
            return Some(data);
        }
    }

    // The carry ran off the front: every byte was u8::MAX.
    None
}
1616
1617#[cfg(test)]
1618mod tests {
1619    use crate::{
1620        file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1621        schema::parser::parse_message_type,
1622    };
1623    use core::str;
1624    use rand::distr::uniform::SampleUniform;
1625    use std::{fs::File, sync::Arc};
1626
1627    use crate::column::{
1628        page::PageReader,
1629        reader::{ColumnReaderImpl, get_column_reader, get_typed_column_reader},
1630    };
1631    use crate::file::writer::TrackedWrite;
1632    use crate::file::{
1633        properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1634    };
1635    use crate::schema::types::{ColumnPath, Type as SchemaType};
1636    use crate::util::test_common::rand_gen::random_numbers_range;
1637
1638    use super::*;
1639
1640    #[test]
1641    fn test_column_writer_inconsistent_def_rep_length() {
1642        let page_writer = get_test_page_writer();
1643        let props = Default::default();
1644        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
1645        let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
1646        assert!(res.is_err());
1647        if let Err(err) = res {
1648            assert_eq!(
1649                format!("{err}"),
1650                "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
1651            );
1652        }
1653    }
1654
1655    #[test]
1656    fn test_column_writer_invalid_def_levels() {
1657        let page_writer = get_test_page_writer();
1658        let props = Default::default();
1659        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1660        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1661        assert!(res.is_err());
1662        if let Err(err) = res {
1663            assert_eq!(
1664                format!("{err}"),
1665                "Parquet error: Definition levels are required, because max definition level = 1"
1666            );
1667        }
1668    }
1669
1670    #[test]
1671    fn test_column_writer_invalid_rep_levels() {
1672        let page_writer = get_test_page_writer();
1673        let props = Default::default();
1674        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
1675        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1676        assert!(res.is_err());
1677        if let Err(err) = res {
1678            assert_eq!(
1679                format!("{err}"),
1680                "Parquet error: Repetition levels are required, because max repetition level = 1"
1681            );
1682        }
1683    }
1684
1685    #[test]
1686    fn test_column_writer_not_enough_values_to_write() {
1687        let page_writer = get_test_page_writer();
1688        let props = Default::default();
1689        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1690        let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
1691        assert!(res.is_err());
1692        if let Err(err) = res {
1693            assert_eq!(
1694                format!("{err}"),
1695                "Parquet error: Expected to write 4 values, but have only 2"
1696            );
1697        }
1698    }
1699
1700    #[test]
1701    fn test_column_writer_write_only_one_dictionary_page() {
1702        let page_writer = get_test_page_writer();
1703        let props = Default::default();
1704        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1705        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1706        // First page should be correctly written.
1707        writer.add_data_page().unwrap();
1708        writer.write_dictionary_page().unwrap();
1709        let err = writer.write_dictionary_page().unwrap_err().to_string();
1710        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1711    }
1712
1713    #[test]
1714    fn test_column_writer_error_when_writing_disabled_dictionary() {
1715        let page_writer = get_test_page_writer();
1716        let props = Arc::new(
1717            WriterProperties::builder()
1718                .set_dictionary_enabled(false)
1719                .build(),
1720        );
1721        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1722        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1723        let err = writer.write_dictionary_page().unwrap_err().to_string();
1724        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1725    }
1726
1727    #[test]
1728    fn test_column_writer_boolean_type_does_not_support_dictionary() {
1729        let page_writer = get_test_page_writer();
1730        let props = Arc::new(
1731            WriterProperties::builder()
1732                .set_dictionary_enabled(true)
1733                .build(),
1734        );
1735        let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
1736        writer
1737            .write_batch(&[true, false, true, false], None, None)
1738            .unwrap();
1739
1740        let r = writer.close().unwrap();
1741        // PlainEncoder uses bit writer to write boolean values, which all fit into 1
1742        // byte.
1743        assert_eq!(r.bytes_written, 1);
1744        assert_eq!(r.rows_written, 4);
1745
1746        let metadata = r.metadata;
1747        assert_eq!(
1748            metadata.encodings().collect::<Vec<_>>(),
1749            vec![Encoding::PLAIN, Encoding::RLE]
1750        );
1751        assert_eq!(metadata.num_values(), 4); // just values
1752        assert_eq!(metadata.dictionary_page_offset(), None);
1753    }
1754
    /// Default encoding selection for BOOLEAN, across (writer version,
    /// dictionary enabled) combinations. Booleans have no dictionary support,
    /// so the dictionary flag makes no difference: v1 writes PLAIN data pages,
    /// v2 writes RLE data pages.
    #[test]
    fn test_column_writer_default_encoding_support_bool() {
        // v1 writer, dictionary enabled (ignored for booleans).
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[true, false],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        // v1 writer, dictionary disabled — same outcome.
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[true, false],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        // v2 writer, dictionary enabled — RLE data pages (v2 fallback for booleans).
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[true, false],
            None,
            &[Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
        );
        // v2 writer, dictionary disabled — same outcome.
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[true, false],
            None,
            &[Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
        );
    }
1790
    /// Default encoding selection for INT32. With dictionary enabled a
    /// dictionary page is produced for both writer versions; without it, v1
    /// falls back to PLAIN and v2 to DELTA_BINARY_PACKED.
    #[test]
    fn test_column_writer_default_encoding_support_int32() {
        // v1 writer, dictionary enabled — dictionary page expected.
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        // v1 writer, dictionary disabled — PLAIN fallback.
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1, 2],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        // v2 writer, dictionary enabled — dictionary page with v2 data pages.
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        // v2 writer, dictionary disabled — DELTA_BINARY_PACKED fallback.
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1, 2],
            None,
            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BINARY_PACKED,
                1,
            )],
        );
    }
1836
    /// Default encoding selection for INT64 — mirrors the INT32 expectations:
    /// dictionary when enabled, otherwise PLAIN (v1) / DELTA_BINARY_PACKED (v2).
    #[test]
    fn test_column_writer_default_encoding_support_int64() {
        // v1 writer, dictionary enabled — dictionary page expected.
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        // v1 writer, dictionary disabled — PLAIN fallback.
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1, 2],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        // v2 writer, dictionary enabled — dictionary page with v2 data pages.
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        // v2 writer, dictionary disabled — DELTA_BINARY_PACKED fallback.
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1, 2],
            None,
            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BINARY_PACKED,
                1,
            )],
        );
    }
1882
    /// Default encoding selection for INT96. INT96 has no DELTA fallback, so
    /// without a dictionary both writer versions use PLAIN data pages.
    #[test]
    fn test_column_writer_default_encoding_support_int96() {
        // v1 writer, dictionary enabled — dictionary page expected.
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[Int96::from(vec![1, 2, 3])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        // v1 writer, dictionary disabled — PLAIN fallback.
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[Int96::from(vec![1, 2, 3])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        // v2 writer, dictionary enabled — dictionary page with v2 data pages.
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[Int96::from(vec![1, 2, 3])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        // v2 writer, dictionary disabled — still PLAIN (no DELTA for INT96).
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[Int96::from(vec![1, 2, 3])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
1924
    /// Default encoding selection for FLOAT. Floats have no DELTA fallback,
    /// so without a dictionary both writer versions use PLAIN data pages.
    #[test]
    fn test_column_writer_default_encoding_support_float() {
        // v1 writer, dictionary enabled — dictionary page expected.
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        // v1 writer, dictionary disabled — PLAIN fallback.
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        // v2 writer, dictionary enabled — dictionary page with v2 data pages.
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        // v2 writer, dictionary disabled — still PLAIN (no DELTA for floats).
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
1966
    /// Default encoding selection for DOUBLE — mirrors the FLOAT expectations.
    #[test]
    fn test_column_writer_default_encoding_support_double() {
        // v1 writer, dictionary enabled — dictionary page expected.
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        // v1 writer, dictionary disabled — PLAIN fallback.
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        // v2 writer, dictionary enabled — dictionary page with v2 data pages.
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        // v2 writer, dictionary disabled — still PLAIN (no DELTA for doubles).
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
2008
    /// Default encoding selection for BYTE_ARRAY: dictionary when enabled,
    /// otherwise PLAIN (v1) / DELTA_BYTE_ARRAY (v2).
    #[test]
    fn test_column_writer_default_encoding_support_byte_array() {
        // v1 writer, dictionary enabled — dictionary page expected.
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[ByteArray::from(vec![1u8])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        // v1 writer, dictionary disabled — PLAIN fallback.
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[ByteArray::from(vec![1u8])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        // v2 writer, dictionary enabled — dictionary page with v2 data pages.
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[ByteArray::from(vec![1u8])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        // v2 writer, dictionary disabled — DELTA_BYTE_ARRAY fallback.
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[ByteArray::from(vec![1u8])],
            None,
            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BYTE_ARRAY,
                1,
            )],
        );
    }
2054
    /// Default encoding selection for FIXED_LEN_BYTE_ARRAY. Note the v1
    /// asymmetry: FLBA had no dictionary support in PARQUET 1.0 (see
    /// `has_dictionary_support`), so even with the dictionary enabled the v1
    /// writer falls back to PLAIN; v2 supports the dictionary.
    #[test]
    fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
        // v1 writer, dictionary enabled — still no dictionary page for FLBA.
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        // v1 writer, dictionary disabled — same PLAIN outcome.
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        // v2 writer, dictionary enabled — dictionary page expected.
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[ByteArray::from(vec![1u8]).into()],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        // v2 writer, dictionary disabled — DELTA_BYTE_ARRAY fallback.
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BYTE_ARRAY,
                1,
            )],
        );
    }
2097
2098    #[test]
2099    fn test_column_writer_check_metadata() {
2100        let page_writer = get_test_page_writer();
2101        let props = Default::default();
2102        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2103        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2104
2105        let r = writer.close().unwrap();
2106        assert_eq!(r.bytes_written, 20);
2107        assert_eq!(r.rows_written, 4);
2108
2109        let metadata = r.metadata;
2110        assert_eq!(
2111            metadata.encodings().collect::<Vec<_>>(),
2112            vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2113        );
2114        assert_eq!(metadata.num_values(), 4);
2115        assert_eq!(metadata.compressed_size(), 20);
2116        assert_eq!(metadata.uncompressed_size(), 20);
2117        assert_eq!(metadata.data_page_offset(), 0);
2118        assert_eq!(metadata.dictionary_page_offset(), Some(0));
2119        if let Some(stats) = metadata.statistics() {
2120            assert_eq!(stats.null_count_opt(), Some(0));
2121            assert_eq!(stats.distinct_count_opt(), None);
2122            if let Statistics::Int32(stats) = stats {
2123                assert_eq!(stats.min_opt().unwrap(), &1);
2124                assert_eq!(stats.max_opt().unwrap(), &4);
2125            } else {
2126                panic!("expecting Statistics::Int32");
2127            }
2128        } else {
2129            panic!("metadata missing statistics");
2130        }
2131    }
2132
    /// Byte-array decimal statistics must use SIGNED big-endian comparison:
    /// the 16-byte values below include two negative numbers (leading 0xFF
    /// sign bytes), zero, and a positive number, and the expected min is the
    /// first (most negative) value while the expected max is the last
    /// (positive) value — not what unsigned lexicographic order would give.
    #[test]
    fn test_column_writer_check_byte_array_min_max() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
        writer
            .write_batch(
                &[
                    // negative value (leading 0xFF) — expected min
                    ByteArray::from(vec![
                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
                        35u8, 231u8, 90u8, 0u8, 0u8,
                    ]),
                    // negative value, closer to zero than the first
                    ByteArray::from(vec![
                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
                        152u8, 177u8, 56u8, 0u8, 0u8,
                    ]),
                    // zero
                    ByteArray::from(vec![
                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
                        0u8,
                    ]),
                    // positive value — expected max
                    ByteArray::from(vec![
                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
                        44u8, 0u8, 0u8,
                    ]),
                ],
                None,
                None,
            )
            .unwrap();
        let metadata = writer.close().unwrap().metadata;
        if let Some(stats) = metadata.statistics() {
            if let Statistics::ByteArray(stats) = stats {
                assert_eq!(
                    stats.min_opt().unwrap(),
                    &ByteArray::from(vec![
                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
                        35u8, 231u8, 90u8, 0u8, 0u8,
                    ])
                );
                assert_eq!(
                    stats.max_opt().unwrap(),
                    &ByteArray::from(vec![
                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
                        44u8, 0u8, 0u8,
                    ])
                );
            } else {
                panic!("expecting Statistics::ByteArray");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
2186
2187    #[test]
2188    fn test_column_writer_uint32_converted_type_min_max() {
2189        let page_writer = get_test_page_writer();
2190        let props = Default::default();
2191        let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
2192            page_writer,
2193            0,
2194            0,
2195            props,
2196        );
2197        writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
2198        let metadata = writer.close().unwrap().metadata;
2199        if let Some(stats) = metadata.statistics() {
2200            if let Statistics::Int32(stats) = stats {
2201                assert_eq!(stats.min_opt().unwrap(), &0,);
2202                assert_eq!(stats.max_opt().unwrap(), &5,);
2203            } else {
2204                panic!("expecting Statistics::Int32");
2205            }
2206        } else {
2207            panic!("metadata missing statistics");
2208        }
2209    }
2210
2211    #[test]
2212    fn test_column_writer_precalculated_statistics() {
2213        let page_writer = get_test_page_writer();
2214        let props = Arc::new(
2215            WriterProperties::builder()
2216                .set_statistics_enabled(EnabledStatistics::Chunk)
2217                .build(),
2218        );
2219        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2220        writer
2221            .write_batch_with_statistics(
2222                &[1, 2, 3, 4],
2223                None,
2224                None,
2225                Some(&-17),
2226                Some(&9000),
2227                Some(55),
2228            )
2229            .unwrap();
2230
2231        let r = writer.close().unwrap();
2232        assert_eq!(r.bytes_written, 20);
2233        assert_eq!(r.rows_written, 4);
2234
2235        let metadata = r.metadata;
2236        assert_eq!(
2237            metadata.encodings().collect::<Vec<_>>(),
2238            vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2239        );
2240        assert_eq!(metadata.num_values(), 4);
2241        assert_eq!(metadata.compressed_size(), 20);
2242        assert_eq!(metadata.uncompressed_size(), 20);
2243        assert_eq!(metadata.data_page_offset(), 0);
2244        assert_eq!(metadata.dictionary_page_offset(), Some(0));
2245        if let Some(stats) = metadata.statistics() {
2246            assert_eq!(stats.null_count_opt(), Some(0));
2247            assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
2248            if let Statistics::Int32(stats) = stats {
2249                assert_eq!(stats.min_opt().unwrap(), &-17);
2250                assert_eq!(stats.max_opt().unwrap(), &9000);
2251            } else {
2252                panic!("expecting Statistics::Int32");
2253            }
2254        } else {
2255            panic!("metadata missing statistics");
2256        }
2257    }
2258
    /// Mixes a plain batch (1..=4, stats computed by the writer) with a batch
    /// carrying precomputed statistics (5..=7, min 5 / max 7 / distinct 3),
    /// then checks both the chunk-level statistics and the statistics written
    /// into the data page header.
    #[test]
    fn test_mixed_precomputed_statistics() {
        let mut buf = Vec::with_capacity(100);
        let mut write = TrackedWrite::new(&mut buf);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let props = Arc::new(
            WriterProperties::builder()
                .set_write_page_header_statistics(true)
                .build(),
        );
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);

        // First batch: no precomputed stats; second batch: precomputed stats.
        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        writer
            .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
            .unwrap();

        let r = writer.close().unwrap();

        // Chunk statistics merge both batches: min 1 (computed) and max 7
        // (precomputed). The distinct count is None even though Some(3) was
        // supplied, because the first batch had no precomputed distinct count.
        let stats = r.metadata.statistics().unwrap();
        assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
        assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
        assert_eq!(stats.null_count_opt(), Some(0));
        assert!(stats.distinct_count_opt().is_none());

        // Release the TrackedWrite's mutable borrow of `buf` so the buffer
        // can be read back below.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .set_read_page_statistics(true)
            .build();
        let reader = SerializedPageReader::new_with_properties(
            Arc::new(Bytes::from(buf)),
            &r.metadata,
            r.rows_written as usize,
            None,
            Arc::new(props),
        )
        .unwrap();

        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
        assert_eq!(pages.len(), 2);

        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);

        // The single data page covers all seven values, so its header stats
        // span both batches as well.
        let page_statistics = pages[1].statistics().unwrap();
        assert_eq!(
            page_statistics.min_bytes_opt().unwrap(),
            1_i32.to_le_bytes()
        );
        assert_eq!(
            page_statistics.max_bytes_opt().unwrap(),
            7_i32.to_le_bytes()
        );
        assert_eq!(page_statistics.null_count_opt(), Some(0));
        assert!(page_statistics.distinct_count_opt().is_none());
    }
2317
    #[test]
    fn test_disabled_statistics() {
        // With `EnabledStatistics::None`, neither the chunk metadata nor the
        // v2 data page headers may carry statistics.
        let mut buf = Vec::with_capacity(100);
        let mut write = TrackedWrite::new(&mut buf);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_writer_version(WriterVersion::PARQUET_2_0)
            .build();
        let props = Arc::new(props);

        // `1, 0` configures an optional, non-repeated column (max def/rep
        // level — see `get_test_column_writer`), so the definition levels
        // below determine which slots are null.
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        // 6 levels, 2 of them zero => 4 values and 2 nulls.
        writer
            .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
            .unwrap();

        let r = writer.close().unwrap();
        assert!(r.metadata.statistics().is_none());

        // Release the mutable borrow on `buf` so it can be read back below.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let reader = SerializedPageReader::new_with_properties(
            Arc::new(Bytes::from(buf)),
            &r.metadata,
            r.rows_written as usize,
            None,
            Arc::new(props),
        )
        .unwrap();

        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
        assert_eq!(pages.len(), 2);

        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);

        // V2 page headers always carry value/null/row counts, but the
        // optional statistics field must be absent.
        match &pages[1] {
            Page::DataPageV2 {
                num_values,
                num_nulls,
                num_rows,
                statistics,
                ..
            } => {
                assert_eq!(*num_values, 6);
                assert_eq!(*num_nulls, 2);
                assert_eq!(*num_rows, 6);
                assert!(statistics.is_none());
            }
            _ => unreachable!(),
        }
    }
2373
2374    #[test]
2375    fn test_column_writer_empty_column_roundtrip() {
2376        let props = Default::default();
2377        column_roundtrip::<Int32Type>(props, &[], None, None);
2378    }
2379
2380    #[test]
2381    fn test_column_writer_non_nullable_values_roundtrip() {
2382        let props = Default::default();
2383        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2384    }
2385
2386    #[test]
2387    fn test_column_writer_nullable_non_repeated_values_roundtrip() {
2388        let props = Default::default();
2389        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
2390    }
2391
2392    #[test]
2393    fn test_column_writer_nullable_repeated_values_roundtrip() {
2394        let props = Default::default();
2395        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2396    }
2397
2398    #[test]
2399    fn test_column_writer_dictionary_fallback_small_data_page() {
2400        let props = WriterProperties::builder()
2401            .set_dictionary_page_size_limit(32)
2402            .set_data_page_size_limit(32)
2403            .build();
2404        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2405    }
2406
2407    #[test]
2408    fn test_column_writer_small_write_batch_size() {
2409        for i in &[1usize, 2, 5, 10, 11, 1023] {
2410            let props = WriterProperties::builder().set_write_batch_size(*i).build();
2411
2412            column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2413        }
2414    }
2415
2416    #[test]
2417    fn test_column_writer_dictionary_disabled_v1() {
2418        let props = WriterProperties::builder()
2419            .set_writer_version(WriterVersion::PARQUET_1_0)
2420            .set_dictionary_enabled(false)
2421            .build();
2422        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2423    }
2424
2425    #[test]
2426    fn test_column_writer_dictionary_disabled_v2() {
2427        let props = WriterProperties::builder()
2428            .set_writer_version(WriterVersion::PARQUET_2_0)
2429            .set_dictionary_enabled(false)
2430            .build();
2431        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2432    }
2433
2434    #[test]
2435    fn test_column_writer_compression_v1() {
2436        let props = WriterProperties::builder()
2437            .set_writer_version(WriterVersion::PARQUET_1_0)
2438            .set_compression(Compression::SNAPPY)
2439            .build();
2440        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2441    }
2442
2443    #[test]
2444    fn test_column_writer_compression_v2() {
2445        let props = WriterProperties::builder()
2446            .set_writer_version(WriterVersion::PARQUET_2_0)
2447            .set_compression(Compression::SNAPPY)
2448            .build();
2449        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2450    }
2451
    #[test]
    fn test_column_writer_add_data_pages_with_dict() {
        // ARROW-5129: Test verifies that we add data page in case of dictionary encoding
        // and no fallback occurred so far.
        let mut file = tempfile::tempfile().unwrap();
        let mut write = TrackedWrite::new(&mut file);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let props = Arc::new(
            WriterProperties::builder()
                .set_data_page_size_limit(10)
                .set_write_batch_size(3) // write 3 values at a time
                .build(),
        );
        let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer.write_batch(data, None, None).unwrap();
        let r = writer.close().unwrap();

        // Release the mutable borrow on `file` so it can be read back below.
        drop(write);

        // Read pages and check the sequence
        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let mut page_reader = Box::new(
            SerializedPageReader::new_with_properties(
                Arc::new(file),
                &r.metadata,
                r.rows_written as usize,
                None,
                Arc::new(props),
            )
            .unwrap(),
        );
        let mut res = Vec::new();
        while let Some(page) = page_reader.get_next_page().unwrap() {
            res.push((page.page_type(), page.num_values(), page.buffer().len()));
        }
        // Expected layout: one dictionary page holding all 10 distinct values
        // (40 bytes = 10 * 4), then two dictionary-encoded data pages split by
        // the 10-byte data page size limit.
        assert_eq!(
            res,
            vec![
                (PageType::DICTIONARY_PAGE, 10, 40),
                (PageType::DATA_PAGE, 9, 10),
                (PageType::DATA_PAGE, 1, 3),
            ]
        );
        // Encoding stats must agree: 1 PLAIN dictionary page, 2
        // RLE_DICTIONARY data pages (no fallback happened).
        assert_eq!(
            r.metadata.page_encoding_stats(),
            Some(&vec![
                PageEncodingStats {
                    page_type: PageType::DICTIONARY_PAGE,
                    encoding: Encoding::PLAIN,
                    count: 1
                },
                PageEncodingStats {
                    page_type: PageType::DATA_PAGE,
                    encoding: Encoding::RLE_DICTIONARY,
                    count: 2,
                }
            ])
        );
    }
2514
2515    #[test]
2516    fn test_column_writer_column_data_page_size_limit() {
2517        let props = Arc::new(
2518            WriterProperties::builder()
2519                .set_writer_version(WriterVersion::PARQUET_1_0)
2520                .set_dictionary_enabled(false)
2521                .set_data_page_size_limit(1000)
2522                .set_column_data_page_size_limit(ColumnPath::from("col"), 10)
2523                .set_write_batch_size(3)
2524                .build(),
2525        );
2526        let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
2527
2528        let col_values =
2529            write_and_collect_page_values(ColumnPath::from("col"), Arc::clone(&props), data);
2530        let other_values = write_and_collect_page_values(ColumnPath::from("other"), props, data);
2531
2532        assert_eq!(col_values, vec![3, 3, 3, 1]);
2533        assert_eq!(other_values, vec![10]);
2534    }
2535
2536    #[test]
2537    fn test_bool_statistics() {
2538        let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
2539        // Booleans have an unsigned sort order and so are not compatible
2540        // with the deprecated `min` and `max` statistics
2541        assert!(!stats.is_min_max_backwards_compatible());
2542        if let Statistics::Boolean(stats) = stats {
2543            assert_eq!(stats.min_opt().unwrap(), &false);
2544            assert_eq!(stats.max_opt().unwrap(), &true);
2545        } else {
2546            panic!("expecting Statistics::Boolean, got {stats:?}");
2547        }
2548    }
2549
2550    #[test]
2551    fn test_int32_statistics() {
2552        let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
2553        assert!(stats.is_min_max_backwards_compatible());
2554        if let Statistics::Int32(stats) = stats {
2555            assert_eq!(stats.min_opt().unwrap(), &-2);
2556            assert_eq!(stats.max_opt().unwrap(), &3);
2557        } else {
2558            panic!("expecting Statistics::Int32, got {stats:?}");
2559        }
2560    }
2561
2562    #[test]
2563    fn test_int64_statistics() {
2564        let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
2565        assert!(stats.is_min_max_backwards_compatible());
2566        if let Statistics::Int64(stats) = stats {
2567            assert_eq!(stats.min_opt().unwrap(), &-2);
2568            assert_eq!(stats.max_opt().unwrap(), &3);
2569        } else {
2570            panic!("expecting Statistics::Int64, got {stats:?}");
2571        }
2572    }
2573
2574    #[test]
2575    fn test_int96_statistics() {
2576        let input = vec![
2577            Int96::from(vec![1, 20, 30]),
2578            Int96::from(vec![3, 20, 10]),
2579            Int96::from(vec![0, 20, 30]),
2580            Int96::from(vec![2, 20, 30]),
2581        ]
2582        .into_iter()
2583        .collect::<Vec<Int96>>();
2584
2585        let stats = statistics_roundtrip::<Int96Type>(&input);
2586        assert!(!stats.is_min_max_backwards_compatible());
2587        if let Statistics::Int96(stats) = stats {
2588            assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
2589            assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![2, 20, 30]));
2590        } else {
2591            panic!("expecting Statistics::Int96, got {stats:?}");
2592        }
2593    }
2594
2595    #[test]
2596    fn test_float_statistics() {
2597        let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
2598        assert!(stats.is_min_max_backwards_compatible());
2599        if let Statistics::Float(stats) = stats {
2600            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2601            assert_eq!(stats.max_opt().unwrap(), &3.0);
2602        } else {
2603            panic!("expecting Statistics::Float, got {stats:?}");
2604        }
2605    }
2606
2607    #[test]
2608    fn test_double_statistics() {
2609        let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
2610        assert!(stats.is_min_max_backwards_compatible());
2611        if let Statistics::Double(stats) = stats {
2612            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2613            assert_eq!(stats.max_opt().unwrap(), &3.0);
2614        } else {
2615            panic!("expecting Statistics::Double, got {stats:?}");
2616        }
2617    }
2618
2619    #[test]
2620    fn test_byte_array_statistics() {
2621        let input = ["aawaa", "zz", "aaw", "m", "qrs"]
2622            .iter()
2623            .map(|&s| s.into())
2624            .collect::<Vec<_>>();
2625
2626        let stats = statistics_roundtrip::<ByteArrayType>(&input);
2627        assert!(!stats.is_min_max_backwards_compatible());
2628        if let Statistics::ByteArray(stats) = stats {
2629            assert_eq!(stats.min_opt().unwrap(), &ByteArray::from("aaw"));
2630            assert_eq!(stats.max_opt().unwrap(), &ByteArray::from("zz"));
2631        } else {
2632            panic!("expecting Statistics::ByteArray, got {stats:?}");
2633        }
2634    }
2635
2636    #[test]
2637    fn test_fixed_len_byte_array_statistics() {
2638        let input = ["aawaa", "zz   ", "aaw  ", "m    ", "qrs  "]
2639            .iter()
2640            .map(|&s| ByteArray::from(s).into())
2641            .collect::<Vec<_>>();
2642
2643        let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
2644        assert!(!stats.is_min_max_backwards_compatible());
2645        if let Statistics::FixedLenByteArray(stats) = stats {
2646            let expected_min: FixedLenByteArray = ByteArray::from("aaw  ").into();
2647            assert_eq!(stats.min_opt().unwrap(), &expected_min);
2648            let expected_max: FixedLenByteArray = ByteArray::from("zz   ").into();
2649            assert_eq!(stats.max_opt().unwrap(), &expected_max);
2650        } else {
2651            panic!("expecting Statistics::FixedLenByteArray, got {stats:?}");
2652        }
2653    }
2654
2655    #[test]
2656    fn test_column_writer_check_float16_min_max() {
2657        let input = [
2658            -f16::ONE,
2659            f16::from_f32(3.0),
2660            -f16::from_f32(2.0),
2661            f16::from_f32(2.0),
2662        ]
2663        .into_iter()
2664        .map(|s| ByteArray::from(s).into())
2665        .collect::<Vec<_>>();
2666
2667        let stats = float16_statistics_roundtrip(&input);
2668        assert!(stats.is_min_max_backwards_compatible());
2669        assert_eq!(
2670            stats.min_opt().unwrap(),
2671            &ByteArray::from(-f16::from_f32(2.0))
2672        );
2673        assert_eq!(
2674            stats.max_opt().unwrap(),
2675            &ByteArray::from(f16::from_f32(3.0))
2676        );
2677    }
2678
    #[test]
    fn test_column_writer_check_float16_nan_middle() {
        // NOTE(review): this test is byte-for-byte identical to
        // `test_float16_statistics_nan_middle` below — likely a leftover
        // duplicate; consider removing one of the two.
        // A NaN in the middle of the data must not affect Float16 min/max.
        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
            .into_iter()
            .map(|s| ByteArray::from(s).into())
            .collect::<Vec<_>>();

        let stats = float16_statistics_roundtrip(&input);
        assert!(stats.is_min_max_backwards_compatible());
        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
        assert_eq!(
            stats.max_opt().unwrap(),
            &ByteArray::from(f16::ONE + f16::ONE)
        );
    }
2694
2695    #[test]
2696    fn test_float16_statistics_nan_middle() {
2697        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2698            .into_iter()
2699            .map(|s| ByteArray::from(s).into())
2700            .collect::<Vec<_>>();
2701
2702        let stats = float16_statistics_roundtrip(&input);
2703        assert!(stats.is_min_max_backwards_compatible());
2704        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2705        assert_eq!(
2706            stats.max_opt().unwrap(),
2707            &ByteArray::from(f16::ONE + f16::ONE)
2708        );
2709    }
2710
2711    #[test]
2712    fn test_float16_statistics_nan_start() {
2713        let input = [f16::NAN, f16::ONE, f16::ONE + f16::ONE]
2714            .into_iter()
2715            .map(|s| ByteArray::from(s).into())
2716            .collect::<Vec<_>>();
2717
2718        let stats = float16_statistics_roundtrip(&input);
2719        assert!(stats.is_min_max_backwards_compatible());
2720        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2721        assert_eq!(
2722            stats.max_opt().unwrap(),
2723            &ByteArray::from(f16::ONE + f16::ONE)
2724        );
2725    }
2726
2727    #[test]
2728    fn test_float16_statistics_nan_only() {
2729        let input = [f16::NAN, f16::NAN]
2730            .into_iter()
2731            .map(|s| ByteArray::from(s).into())
2732            .collect::<Vec<_>>();
2733
2734        let stats = float16_statistics_roundtrip(&input);
2735        assert!(stats.min_bytes_opt().is_none());
2736        assert!(stats.max_bytes_opt().is_none());
2737        assert!(stats.is_min_max_backwards_compatible());
2738    }
2739
2740    #[test]
2741    fn test_float16_statistics_zero_only() {
2742        let input = [f16::ZERO]
2743            .into_iter()
2744            .map(|s| ByteArray::from(s).into())
2745            .collect::<Vec<_>>();
2746
2747        let stats = float16_statistics_roundtrip(&input);
2748        assert!(stats.is_min_max_backwards_compatible());
2749        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2750        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2751    }
2752
2753    #[test]
2754    fn test_float16_statistics_neg_zero_only() {
2755        let input = [f16::NEG_ZERO]
2756            .into_iter()
2757            .map(|s| ByteArray::from(s).into())
2758            .collect::<Vec<_>>();
2759
2760        let stats = float16_statistics_roundtrip(&input);
2761        assert!(stats.is_min_max_backwards_compatible());
2762        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2763        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2764    }
2765
2766    #[test]
2767    fn test_float16_statistics_zero_min() {
2768        let input = [f16::ZERO, f16::ONE, f16::NAN, f16::PI]
2769            .into_iter()
2770            .map(|s| ByteArray::from(s).into())
2771            .collect::<Vec<_>>();
2772
2773        let stats = float16_statistics_roundtrip(&input);
2774        assert!(stats.is_min_max_backwards_compatible());
2775        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2776        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI));
2777    }
2778
2779    #[test]
2780    fn test_float16_statistics_neg_zero_max() {
2781        let input = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI]
2782            .into_iter()
2783            .map(|s| ByteArray::from(s).into())
2784            .collect::<Vec<_>>();
2785
2786        let stats = float16_statistics_roundtrip(&input);
2787        assert!(stats.is_min_max_backwards_compatible());
2788        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI));
2789        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2790    }
2791
2792    #[test]
2793    fn test_float_statistics_nan_middle() {
2794        let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
2795        assert!(stats.is_min_max_backwards_compatible());
2796        if let Statistics::Float(stats) = stats {
2797            assert_eq!(stats.min_opt().unwrap(), &1.0);
2798            assert_eq!(stats.max_opt().unwrap(), &2.0);
2799        } else {
2800            panic!("expecting Statistics::Float");
2801        }
2802    }
2803
2804    #[test]
2805    fn test_float_statistics_nan_start() {
2806        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
2807        assert!(stats.is_min_max_backwards_compatible());
2808        if let Statistics::Float(stats) = stats {
2809            assert_eq!(stats.min_opt().unwrap(), &1.0);
2810            assert_eq!(stats.max_opt().unwrap(), &2.0);
2811        } else {
2812            panic!("expecting Statistics::Float");
2813        }
2814    }
2815
2816    #[test]
2817    fn test_float_statistics_nan_only() {
2818        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]);
2819        assert!(stats.min_bytes_opt().is_none());
2820        assert!(stats.max_bytes_opt().is_none());
2821        assert!(stats.is_min_max_backwards_compatible());
2822        assert!(matches!(stats, Statistics::Float(_)));
2823    }
2824
2825    #[test]
2826    fn test_float_statistics_zero_only() {
2827        let stats = statistics_roundtrip::<FloatType>(&[0.0]);
2828        assert!(stats.is_min_max_backwards_compatible());
2829        if let Statistics::Float(stats) = stats {
2830            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2831            assert!(stats.min_opt().unwrap().is_sign_negative());
2832            assert_eq!(stats.max_opt().unwrap(), &0.0);
2833            assert!(stats.max_opt().unwrap().is_sign_positive());
2834        } else {
2835            panic!("expecting Statistics::Float");
2836        }
2837    }
2838
2839    #[test]
2840    fn test_float_statistics_neg_zero_only() {
2841        let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
2842        assert!(stats.is_min_max_backwards_compatible());
2843        if let Statistics::Float(stats) = stats {
2844            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2845            assert!(stats.min_opt().unwrap().is_sign_negative());
2846            assert_eq!(stats.max_opt().unwrap(), &0.0);
2847            assert!(stats.max_opt().unwrap().is_sign_positive());
2848        } else {
2849            panic!("expecting Statistics::Float");
2850        }
2851    }
2852
2853    #[test]
2854    fn test_float_statistics_zero_min() {
2855        let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
2856        assert!(stats.is_min_max_backwards_compatible());
2857        if let Statistics::Float(stats) = stats {
2858            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2859            assert!(stats.min_opt().unwrap().is_sign_negative());
2860            assert_eq!(stats.max_opt().unwrap(), &2.0);
2861        } else {
2862            panic!("expecting Statistics::Float");
2863        }
2864    }
2865
2866    #[test]
2867    fn test_float_statistics_neg_zero_max() {
2868        let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
2869        assert!(stats.is_min_max_backwards_compatible());
2870        if let Statistics::Float(stats) = stats {
2871            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2872            assert_eq!(stats.max_opt().unwrap(), &0.0);
2873            assert!(stats.max_opt().unwrap().is_sign_positive());
2874        } else {
2875            panic!("expecting Statistics::Float");
2876        }
2877    }
2878
2879    #[test]
2880    fn test_double_statistics_nan_middle() {
2881        let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
2882        assert!(stats.is_min_max_backwards_compatible());
2883        if let Statistics::Double(stats) = stats {
2884            assert_eq!(stats.min_opt().unwrap(), &1.0);
2885            assert_eq!(stats.max_opt().unwrap(), &2.0);
2886        } else {
2887            panic!("expecting Statistics::Double");
2888        }
2889    }
2890
2891    #[test]
2892    fn test_double_statistics_nan_start() {
2893        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
2894        assert!(stats.is_min_max_backwards_compatible());
2895        if let Statistics::Double(stats) = stats {
2896            assert_eq!(stats.min_opt().unwrap(), &1.0);
2897            assert_eq!(stats.max_opt().unwrap(), &2.0);
2898        } else {
2899            panic!("expecting Statistics::Double");
2900        }
2901    }
2902
2903    #[test]
2904    fn test_double_statistics_nan_only() {
2905        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]);
2906        assert!(stats.min_bytes_opt().is_none());
2907        assert!(stats.max_bytes_opt().is_none());
2908        assert!(matches!(stats, Statistics::Double(_)));
2909        assert!(stats.is_min_max_backwards_compatible());
2910    }
2911
2912    #[test]
2913    fn test_double_statistics_zero_only() {
2914        let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
2915        assert!(stats.is_min_max_backwards_compatible());
2916        if let Statistics::Double(stats) = stats {
2917            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2918            assert!(stats.min_opt().unwrap().is_sign_negative());
2919            assert_eq!(stats.max_opt().unwrap(), &0.0);
2920            assert!(stats.max_opt().unwrap().is_sign_positive());
2921        } else {
2922            panic!("expecting Statistics::Double");
2923        }
2924    }
2925
2926    #[test]
2927    fn test_double_statistics_neg_zero_only() {
2928        let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
2929        assert!(stats.is_min_max_backwards_compatible());
2930        if let Statistics::Double(stats) = stats {
2931            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2932            assert!(stats.min_opt().unwrap().is_sign_negative());
2933            assert_eq!(stats.max_opt().unwrap(), &0.0);
2934            assert!(stats.max_opt().unwrap().is_sign_positive());
2935        } else {
2936            panic!("expecting Statistics::Double");
2937        }
2938    }
2939
2940    #[test]
2941    fn test_double_statistics_zero_min() {
2942        let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
2943        assert!(stats.is_min_max_backwards_compatible());
2944        if let Statistics::Double(stats) = stats {
2945            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2946            assert!(stats.min_opt().unwrap().is_sign_negative());
2947            assert_eq!(stats.max_opt().unwrap(), &2.0);
2948        } else {
2949            panic!("expecting Statistics::Double");
2950        }
2951    }
2952
2953    #[test]
2954    fn test_double_statistics_neg_zero_max() {
2955        let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
2956        assert!(stats.is_min_max_backwards_compatible());
2957        if let Statistics::Double(stats) = stats {
2958            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2959            assert_eq!(stats.max_opt().unwrap(), &0.0);
2960            assert!(stats.max_opt().unwrap().is_sign_positive());
2961        } else {
2962            panic!("expecting Statistics::Double");
2963        }
2964    }
2965
2966    #[test]
2967    fn test_compare_greater_byte_array_decimals() {
2968        assert!(!compare_greater_byte_array_decimals(&[], &[],),);
2969        assert!(compare_greater_byte_array_decimals(&[1u8,], &[],),);
2970        assert!(!compare_greater_byte_array_decimals(&[], &[1u8,],),);
2971        assert!(compare_greater_byte_array_decimals(&[1u8,], &[0u8,],),);
2972        assert!(!compare_greater_byte_array_decimals(&[1u8,], &[1u8,],),);
2973        assert!(compare_greater_byte_array_decimals(&[1u8, 0u8,], &[0u8,],),);
2974        assert!(!compare_greater_byte_array_decimals(
2975            &[0u8, 1u8,],
2976            &[1u8, 0u8,],
2977        ),);
2978        assert!(!compare_greater_byte_array_decimals(
2979            &[255u8, 35u8, 0u8, 0u8,],
2980            &[0u8,],
2981        ),);
2982        assert!(compare_greater_byte_array_decimals(
2983            &[0u8,],
2984            &[255u8, 35u8, 0u8, 0u8,],
2985        ),);
2986    }
2987
    #[test]
    fn test_column_index_with_null_pages() {
        // An all-null page must be flagged as a null page in the column
        // index, carry no min/max, and record the null count and a
        // definition-level histogram.
        // write a single page of all nulls
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        // 4 definition levels of 0 and no values => 4 nulls.
        writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();

        let r = writer.close().unwrap();
        assert!(r.column_index.is_some());
        let col_idx = r.column_index.unwrap();
        let col_idx = match col_idx {
            ColumnIndexMetaData::INT32(col_idx) => col_idx,
            _ => panic!("wrong stats type"),
        };
        // null_pages should be true for page 0
        assert!(col_idx.is_null_page(0));
        // min and max should be empty byte arrays
        assert!(col_idx.min_value(0).is_none());
        assert!(col_idx.max_value(0).is_none());
        // null_counts should be defined and be 4 for page 0
        assert!(col_idx.null_count(0).is_some());
        assert_eq!(col_idx.null_count(0), Some(4));
        // there is no repetition so rep histogram should be absent
        assert!(col_idx.repetition_level_histogram(0).is_none());
        // definition_level_histogram should be present and should be 0:4, 1:0
        assert!(col_idx.definition_level_histogram(0).is_some());
        assert_eq!(col_idx.definition_level_histogram(0).unwrap(), &[4, 0]);
    }
3017
3018    #[test]
3019    fn test_column_offset_index_metadata() {
3020        // write data
3021        // and check the offset index and column index
3022        let page_writer = get_test_page_writer();
3023        let props = Default::default();
3024        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
3025        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
3026        // first page
3027        writer.flush_data_pages().unwrap();
3028        // second page
3029        writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
3030
3031        let r = writer.close().unwrap();
3032        let column_index = r.column_index.unwrap();
3033        let offset_index = r.offset_index.unwrap();
3034
3035        assert_eq!(8, r.rows_written);
3036
3037        // column index
3038        let column_index = match column_index {
3039            ColumnIndexMetaData::INT32(column_index) => column_index,
3040            _ => panic!("wrong stats type"),
3041        };
3042        assert_eq!(2, column_index.num_pages());
3043        assert_eq!(2, offset_index.page_locations.len());
3044        assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
3045        for idx in 0..2 {
3046            assert!(!column_index.is_null_page(idx));
3047            assert_eq!(0, column_index.null_count(0).unwrap());
3048        }
3049
3050        if let Some(stats) = r.metadata.statistics() {
3051            assert_eq!(stats.null_count_opt(), Some(0));
3052            assert_eq!(stats.distinct_count_opt(), None);
3053            if let Statistics::Int32(stats) = stats {
3054                // first page is [1,2,3,4]
3055                // second page is [-5,2,4,8]
3056                // note that we don't increment here, as this is a non BinaryArray type.
3057                assert_eq!(stats.min_opt(), column_index.min_value(1));
3058                assert_eq!(stats.max_opt(), column_index.max_value(1));
3059            } else {
3060                panic!("expecting Statistics::Int32");
3061            }
3062        } else {
3063            panic!("metadata missing statistics");
3064        }
3065
3066        // page location
3067        assert_eq!(0, offset_index.page_locations[0].first_row_index);
3068        assert_eq!(4, offset_index.page_locations[1].first_row_index);
3069    }
3070
    /// Verify min/max value truncation in the column index works as expected
    #[test]
    fn test_column_offset_index_metadata_truncating() {
        // write data
        // and check the offset index and column index
        let page_writer = get_test_page_writer();
        let props = WriterProperties::builder()
            // NOTE(review): this disables *statistics* truncation; the column
            // index below is still truncated at its default length (see the
            // DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH assertions) — the previous
            // "disable column index truncation" wording looked wrong; confirm
            .set_statistics_truncate_length(None)
            .build()
            .into();
        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

        let mut data = vec![FixedLenByteArray::default(); 3];
        // This is the expected min value - "aaa..." (0x61 = 'a')
        data[0].set_data(Bytes::from(vec![97_u8; 200]));
        // This is the expected max value - "ppp..." (0x70 = 'p'; the old
        // comment said "ZZZ..." but 112 is 'p')
        data[1].set_data(Bytes::from(vec![112_u8; 200]));
        data[2].set_data(Bytes::from(vec![98_u8; 200]));

        writer.write_batch(&data, None, None).unwrap();

        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();
        let column_index = r.column_index.unwrap();
        let offset_index = r.offset_index.unwrap();

        let column_index = match column_index {
            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
            _ => panic!("wrong stats type"),
        };

        assert_eq!(3, r.rows_written);

        // column index: a single page containing all three values, no nulls
        assert_eq!(1, column_index.num_pages());
        assert_eq!(1, offset_index.page_locations.len());
        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
        assert!(!column_index.is_null_page(0));
        assert_eq!(Some(0), column_index.null_count(0));

        if let Some(stats) = r.metadata.statistics() {
            assert_eq!(stats.null_count_opt(), Some(0));
            assert_eq!(stats.distinct_count_opt(), None);
            if let Statistics::FixedLenByteArray(stats) = stats {
                let column_index_min_value = column_index.min_value(0).unwrap();
                let column_index_max_value = column_index.max_value(0).unwrap();

                // Column index stats are truncated, while the column chunk's aren't.
                assert_ne!(stats.min_bytes_opt().unwrap(), column_index_min_value);
                assert_ne!(stats.max_bytes_opt().unwrap(), column_index_max_value);

                assert_eq!(
                    column_index_min_value.len(),
                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
                );
                assert_eq!(column_index_min_value, &[97_u8; 64]);
                assert_eq!(
                    column_index_max_value.len(),
                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
                );

                // We expect the last byte to be incremented so the truncated
                // max remains an upper bound
                assert_eq!(
                    *column_index_max_value.last().unwrap(),
                    *column_index_max_value.first().unwrap() + 1
                );
            } else {
                panic!("expecting Statistics::FixedLenByteArray");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
3145
3146    #[test]
3147    fn test_column_offset_index_truncating_spec_example() {
3148        // write data
3149        // and check the offset index and column index
3150        let page_writer = get_test_page_writer();
3151
3152        // Truncate values at 1 byte
3153        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3154        let props = Arc::new(builder.build());
3155        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3156
3157        let mut data = vec![FixedLenByteArray::default(); 1];
3158        // This is the expected min value
3159        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3160
3161        writer.write_batch(&data, None, None).unwrap();
3162
3163        writer.flush_data_pages().unwrap();
3164
3165        let r = writer.close().unwrap();
3166        let column_index = r.column_index.unwrap();
3167        let offset_index = r.offset_index.unwrap();
3168
3169        let column_index = match column_index {
3170            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3171            _ => panic!("wrong stats type"),
3172        };
3173
3174        assert_eq!(1, r.rows_written);
3175
3176        // column index
3177        assert_eq!(1, column_index.num_pages());
3178        assert_eq!(1, offset_index.page_locations.len());
3179        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3180        assert!(!column_index.is_null_page(0));
3181        assert_eq!(Some(0), column_index.null_count(0));
3182
3183        if let Some(stats) = r.metadata.statistics() {
3184            assert_eq!(stats.null_count_opt(), Some(0));
3185            assert_eq!(stats.distinct_count_opt(), None);
3186            if let Statistics::FixedLenByteArray(_stats) = stats {
3187                let column_index_min_value = column_index.min_value(0).unwrap();
3188                let column_index_max_value = column_index.max_value(0).unwrap();
3189
3190                assert_eq!(column_index_min_value.len(), 1);
3191                assert_eq!(column_index_max_value.len(), 1);
3192
3193                assert_eq!("B".as_bytes(), column_index_min_value);
3194                assert_eq!("C".as_bytes(), column_index_max_value);
3195
3196                assert_ne!(column_index_min_value, stats.min_bytes_opt().unwrap());
3197                assert_ne!(column_index_max_value, stats.max_bytes_opt().unwrap());
3198            } else {
3199                panic!("expecting Statistics::FixedLenByteArray");
3200            }
3201        } else {
3202            panic!("metadata missing statistics");
3203        }
3204    }
3205
3206    #[test]
3207    fn test_float16_min_max_no_truncation() {
3208        // Even if we set truncation to occur at 1 byte, we should not truncate for Float16
3209        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3210        let props = Arc::new(builder.build());
3211        let page_writer = get_test_page_writer();
3212        let mut writer = get_test_float16_column_writer(page_writer, props);
3213
3214        let expected_value = f16::PI.to_le_bytes().to_vec();
3215        let data = vec![ByteArray::from(expected_value.clone()).into()];
3216        writer.write_batch(&data, None, None).unwrap();
3217        writer.flush_data_pages().unwrap();
3218
3219        let r = writer.close().unwrap();
3220
3221        // stats should still be written
3222        // ensure bytes weren't truncated for column index
3223        let column_index = r.column_index.unwrap();
3224        let column_index = match column_index {
3225            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3226            _ => panic!("wrong stats type"),
3227        };
3228        let column_index_min_bytes = column_index.min_value(0).unwrap();
3229        let column_index_max_bytes = column_index.max_value(0).unwrap();
3230        assert_eq!(expected_value, column_index_min_bytes);
3231        assert_eq!(expected_value, column_index_max_bytes);
3232
3233        // ensure bytes weren't truncated for statistics
3234        let stats = r.metadata.statistics().unwrap();
3235        if let Statistics::FixedLenByteArray(stats) = stats {
3236            let stats_min_bytes = stats.min_bytes_opt().unwrap();
3237            let stats_max_bytes = stats.max_bytes_opt().unwrap();
3238            assert_eq!(expected_value, stats_min_bytes);
3239            assert_eq!(expected_value, stats_max_bytes);
3240        } else {
3241            panic!("expecting Statistics::FixedLenByteArray");
3242        }
3243    }
3244
3245    #[test]
3246    fn test_decimal_min_max_no_truncation() {
3247        // Even if we set truncation to occur at 1 byte, we should not truncate for Decimal
3248        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3249        let props = Arc::new(builder.build());
3250        let page_writer = get_test_page_writer();
3251        let mut writer =
3252            get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3253
3254        let expected_value = vec![
3255            255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
3256            231u8, 90u8, 0u8, 0u8,
3257        ];
3258        let data = vec![ByteArray::from(expected_value.clone()).into()];
3259        writer.write_batch(&data, None, None).unwrap();
3260        writer.flush_data_pages().unwrap();
3261
3262        let r = writer.close().unwrap();
3263
3264        // stats should still be written
3265        // ensure bytes weren't truncated for column index
3266        let column_index = r.column_index.unwrap();
3267        let column_index = match column_index {
3268            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3269            _ => panic!("wrong stats type"),
3270        };
3271        let column_index_min_bytes = column_index.min_value(0).unwrap();
3272        let column_index_max_bytes = column_index.max_value(0).unwrap();
3273        assert_eq!(expected_value, column_index_min_bytes);
3274        assert_eq!(expected_value, column_index_max_bytes);
3275
3276        // ensure bytes weren't truncated for statistics
3277        let stats = r.metadata.statistics().unwrap();
3278        if let Statistics::FixedLenByteArray(stats) = stats {
3279            let stats_min_bytes = stats.min_bytes_opt().unwrap();
3280            let stats_max_bytes = stats.max_bytes_opt().unwrap();
3281            assert_eq!(expected_value, stats_min_bytes);
3282            assert_eq!(expected_value, stats_max_bytes);
3283        } else {
3284            panic!("expecting Statistics::FixedLenByteArray");
3285        }
3286    }
3287
3288    #[test]
3289    fn test_statistics_truncating_byte_array_default() {
3290        let page_writer = get_test_page_writer();
3291
3292        // The default truncate length is 64 bytes
3293        let props = WriterProperties::builder().build().into();
3294        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3295
3296        let mut data = vec![ByteArray::default(); 1];
3297        data[0].set_data(Bytes::from(String::from(
3298            "This string is longer than 64 bytes, so it will almost certainly be truncated.",
3299        )));
3300        writer.write_batch(&data, None, None).unwrap();
3301        writer.flush_data_pages().unwrap();
3302
3303        let r = writer.close().unwrap();
3304
3305        assert_eq!(1, r.rows_written);
3306
3307        let stats = r.metadata.statistics().expect("statistics");
3308        if let Statistics::ByteArray(_stats) = stats {
3309            let min_value = _stats.min_opt().unwrap();
3310            let max_value = _stats.max_opt().unwrap();
3311
3312            assert!(!_stats.min_is_exact());
3313            assert!(!_stats.max_is_exact());
3314
3315            let expected_len = 64;
3316            assert_eq!(min_value.len(), expected_len);
3317            assert_eq!(max_value.len(), expected_len);
3318
3319            let expected_min =
3320                "This string is longer than 64 bytes, so it will almost certainly".as_bytes();
3321            assert_eq!(expected_min, min_value.as_bytes());
3322            // note the max value is different from the min value: the last byte is incremented
3323            let expected_max =
3324                "This string is longer than 64 bytes, so it will almost certainlz".as_bytes();
3325            assert_eq!(expected_max, max_value.as_bytes());
3326        } else {
3327            panic!("expecting Statistics::ByteArray");
3328        }
3329    }
3330
3331    #[test]
3332    fn test_statistics_truncating_byte_array() {
3333        let page_writer = get_test_page_writer();
3334
3335        const TEST_TRUNCATE_LENGTH: usize = 1;
3336
3337        // Truncate values at 1 byte
3338        let builder =
3339            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3340        let props = Arc::new(builder.build());
3341        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3342
3343        let mut data = vec![ByteArray::default(); 1];
3344        // This is the expected min value
3345        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3346
3347        writer.write_batch(&data, None, None).unwrap();
3348
3349        writer.flush_data_pages().unwrap();
3350
3351        let r = writer.close().unwrap();
3352
3353        assert_eq!(1, r.rows_written);
3354
3355        let stats = r.metadata.statistics().expect("statistics");
3356        assert_eq!(stats.null_count_opt(), Some(0));
3357        assert_eq!(stats.distinct_count_opt(), None);
3358        if let Statistics::ByteArray(_stats) = stats {
3359            let min_value = _stats.min_opt().unwrap();
3360            let max_value = _stats.max_opt().unwrap();
3361
3362            assert!(!_stats.min_is_exact());
3363            assert!(!_stats.max_is_exact());
3364
3365            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3366            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3367
3368            assert_eq!("B".as_bytes(), min_value.as_bytes());
3369            assert_eq!("C".as_bytes(), max_value.as_bytes());
3370        } else {
3371            panic!("expecting Statistics::ByteArray");
3372        }
3373    }
3374
3375    #[test]
3376    fn test_statistics_truncating_fixed_len_byte_array() {
3377        let page_writer = get_test_page_writer();
3378
3379        const TEST_TRUNCATE_LENGTH: usize = 1;
3380
3381        // Truncate values at 1 byte
3382        let builder =
3383            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3384        let props = Arc::new(builder.build());
3385        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3386
3387        let mut data = vec![FixedLenByteArray::default(); 1];
3388
3389        const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
3390        const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();
3391
3392        const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]]; // parquet specifies big-endian order for decimals
3393        const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
3394            [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];
3395
3396        // This is the expected min value
3397        data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));
3398
3399        writer.write_batch(&data, None, None).unwrap();
3400
3401        writer.flush_data_pages().unwrap();
3402
3403        let r = writer.close().unwrap();
3404
3405        assert_eq!(1, r.rows_written);
3406
3407        let stats = r.metadata.statistics().expect("statistics");
3408        assert_eq!(stats.null_count_opt(), Some(0));
3409        assert_eq!(stats.distinct_count_opt(), None);
3410        if let Statistics::FixedLenByteArray(_stats) = stats {
3411            let min_value = _stats.min_opt().unwrap();
3412            let max_value = _stats.max_opt().unwrap();
3413
3414            assert!(!_stats.min_is_exact());
3415            assert!(!_stats.max_is_exact());
3416
3417            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3418            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3419
3420            assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
3421            assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());
3422
3423            let reconstructed_min = i128::from_be_bytes([
3424                min_value.as_bytes()[0],
3425                0,
3426                0,
3427                0,
3428                0,
3429                0,
3430                0,
3431                0,
3432                0,
3433                0,
3434                0,
3435                0,
3436                0,
3437                0,
3438                0,
3439                0,
3440            ]);
3441
3442            let reconstructed_max = i128::from_be_bytes([
3443                max_value.as_bytes()[0],
3444                0,
3445                0,
3446                0,
3447                0,
3448                0,
3449                0,
3450                0,
3451                0,
3452                0,
3453                0,
3454                0,
3455                0,
3456                0,
3457                0,
3458                0,
3459            ]);
3460
3461            // check that the inner value is correctly bounded by the min/max
3462            println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
3463            assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
3464            println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
3465            assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
3466        } else {
3467            panic!("expecting Statistics::FixedLenByteArray");
3468        }
3469    }
3470
3471    #[test]
3472    fn test_send() {
3473        fn test<T: Send>() {}
3474        test::<ColumnWriterImpl<Int32Type>>();
3475    }
3476
3477    #[test]
3478    fn test_increment() {
3479        let v = increment(vec![0, 0, 0]).unwrap();
3480        assert_eq!(&v, &[0, 0, 1]);
3481
3482        // Handle overflow
3483        let v = increment(vec![0, 255, 255]).unwrap();
3484        assert_eq!(&v, &[1, 0, 0]);
3485
3486        // Return `None` if all bytes are u8::MAX
3487        let v = increment(vec![255, 255, 255]);
3488        assert!(v.is_none());
3489    }
3490
3491    #[test]
3492    fn test_increment_utf8() {
3493        let test_inc = |o: &str, expected: &str| {
3494            if let Ok(v) = String::from_utf8(increment_utf8(o).unwrap()) {
3495                // Got the expected result...
3496                assert_eq!(v, expected);
3497                // and it's greater than the original string
3498                assert!(*v > *o);
3499                // Also show that BinaryArray level comparison works here
3500                let mut greater = ByteArray::new();
3501                greater.set_data(Bytes::from(v));
3502                let mut original = ByteArray::new();
3503                original.set_data(Bytes::from(o.as_bytes().to_vec()));
3504                assert!(greater > original);
3505            } else {
3506                panic!("Expected incremented UTF8 string to also be valid.");
3507            }
3508        };
3509
3510        // Basic ASCII case
3511        test_inc("hello", "hellp");
3512
3513        // 1-byte ending in max 1-byte
3514        test_inc("a\u{7f}", "b");
3515
3516        // 1-byte max should not truncate as it would need 2-byte code points
3517        assert!(increment_utf8("\u{7f}\u{7f}").is_none());
3518
3519        // UTF8 string
3520        test_inc("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝");
3521
3522        // 2-byte without overflow
3523        test_inc("éééé", "éééê");
3524
3525        // 2-byte that overflows lowest byte
3526        test_inc("\u{ff}\u{ff}", "\u{ff}\u{100}");
3527
3528        // 2-byte ending in max 2-byte
3529        test_inc("a\u{7ff}", "b");
3530
3531        // Max 2-byte should not truncate as it would need 3-byte code points
3532        assert!(increment_utf8("\u{7ff}\u{7ff}").is_none());
3533
3534        // 3-byte without overflow [U+800, U+800] -> [U+800, U+801] (note that these
3535        // characters should render right to left).
3536        test_inc("ࠀࠀ", "ࠀࠁ");
3537
3538        // 3-byte ending in max 3-byte
3539        test_inc("a\u{ffff}", "b");
3540
3541        // Max 3-byte should not truncate as it would need 4-byte code points
3542        assert!(increment_utf8("\u{ffff}\u{ffff}").is_none());
3543
3544        // 4-byte without overflow
3545        test_inc("𐀀𐀀", "𐀀𐀁");
3546
3547        // 4-byte ending in max unicode
3548        test_inc("a\u{10ffff}", "b");
3549
3550        // Max 4-byte should not truncate
3551        assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none());
3552
3553        // Skip over surrogate pair range (0xD800..=0xDFFF)
3554        //test_inc("a\u{D7FF}", "a\u{e000}");
3555        test_inc("a\u{D7FF}", "b");
3556    }
3557
    #[test]
    fn test_truncate_utf8() {
        // Exercises `truncate_utf8` (truncation that respects UTF-8 character
        // boundaries) and `truncate_and_increment_utf8` (truncation followed
        // by an increment, used when computing max bounds).

        // No-op: requested length equals the full byte length
        let data = "❤️🧡💛💚💙💜";
        let r = truncate_utf8(data, data.len()).unwrap();
        assert_eq!(r.len(), data.len());
        assert_eq!(&r, data.as_bytes());

        // We slice it away from the UTF8 boundary (13 falls mid-character, so
        // the result is cut back to 10 bytes)
        let r = truncate_utf8(data, 13).unwrap();
        assert_eq!(r.len(), 10);
        assert_eq!(&r, "❤️🧡".as_bytes());

        // One multi-byte code point, and a length shorter than it, so we can't slice it
        let r = truncate_utf8("\u{0836}", 1);
        assert!(r.is_none());

        // Test truncate and increment for max bounds on UTF-8 statistics
        // 7-bit (i.e. ASCII)
        let r = truncate_and_increment_utf8("yyyyyyyyy", 8).unwrap();
        assert_eq!(&r, "yyyyyyyz".as_bytes());

        // 2-byte without overflow
        let r = truncate_and_increment_utf8("ééééé", 7).unwrap();
        assert_eq!(&r, "ééê".as_bytes());

        // 2-byte that overflows lowest byte
        let r = truncate_and_increment_utf8("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8).unwrap();
        assert_eq!(&r, "\u{ff}\u{ff}\u{ff}\u{100}".as_bytes());

        // max 2-byte should not truncate as it would need 3-byte code points
        let r = truncate_and_increment_utf8("߿߿߿߿߿", 8);
        assert!(r.is_none());

        // 3-byte without overflow [U+800, U+800, U+800] -> [U+800, U+801] (note that these
        // characters should render right to left).
        let r = truncate_and_increment_utf8("ࠀࠀࠀࠀ", 8).unwrap();
        assert_eq!(&r, "ࠀࠁ".as_bytes());

        // max 3-byte should not truncate as it would need 4-byte code points
        let r = truncate_and_increment_utf8("\u{ffff}\u{ffff}\u{ffff}", 8);
        assert!(r.is_none());

        // 4-byte without overflow
        let r = truncate_and_increment_utf8("𐀀𐀀𐀀𐀀", 9).unwrap();
        assert_eq!(&r, "𐀀𐀁".as_bytes());

        // max 4-byte should not truncate
        let r = truncate_and_increment_utf8("\u{10ffff}\u{10ffff}", 8);
        assert!(r.is_none());
    }
3609
    #[test]
    // Check fallback truncation of statistics that should be UTF-8, but aren't
    // (see https://github.com/apache/arrow-rs/pull/6870).
    fn test_byte_array_truncate_invalid_utf8_statistics() {
        let message_type = "
            message test_schema {
                OPTIONAL BYTE_ARRAY a (UTF8);
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());

        // Create Vec<ByteArray> containing non-UTF8 bytes (0x80 is never a
        // valid leading byte in UTF-8)
        let data = vec![ByteArray::from(vec![128u8; 32]); 7];
        // seven 1s (one per value above) interleaved with three nulls
        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
        let file: File = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Chunk)
                .set_statistics_truncate_length(Some(8))
                .build(),
        );

        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<ByteArrayType>()
            .write_batch(&data, Some(&def_levels), None)
            .unwrap();
        col_writer.close().unwrap();
        row_group_writer.close().unwrap();
        let file_metadata = writer.close().unwrap();
        let stats = file_metadata.row_group(0).column(0).statistics().unwrap();
        assert!(!stats.max_is_exact());
        // Truncation of invalid UTF-8 should fall back to binary truncation, so last byte should
        // be incremented by 1.
        assert_eq!(
            stats.max_bytes_opt().map(|v| v.to_vec()),
            Some([128, 128, 128, 128, 128, 128, 128, 129].to_vec())
        );
    }
3652
3653    #[test]
3654    fn test_increment_max_binary_chars() {
3655        let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
3656        assert_eq!(&r.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);
3657
3658        let incremented = increment(vec![0xFF, 0xFF, 0xFF]);
3659        assert!(incremented.is_none())
3660    }
3661
3662    #[test]
3663    fn test_no_column_index_when_stats_disabled() {
3664        // https://github.com/apache/arrow-rs/issues/6010
3665        // Test that column index is not created/written for all-nulls column when page
3666        // statistics are disabled.
3667        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3668        let props = Arc::new(
3669            WriterProperties::builder()
3670                .set_statistics_enabled(EnabledStatistics::None)
3671                .build(),
3672        );
3673        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3674        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3675
3676        let data = Vec::new();
3677        let def_levels = vec![0; 10];
3678        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3679        writer.flush_data_pages().unwrap();
3680
3681        let column_close_result = writer.close().unwrap();
3682        assert!(column_close_result.offset_index.is_some());
3683        assert!(column_close_result.column_index.is_none());
3684    }
3685
3686    #[test]
3687    fn test_no_offset_index_when_disabled() {
3688        // Test that offset indexes can be disabled
3689        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3690        let props = Arc::new(
3691            WriterProperties::builder()
3692                .set_statistics_enabled(EnabledStatistics::None)
3693                .set_offset_index_disabled(true)
3694                .build(),
3695        );
3696        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3697        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3698
3699        let data = Vec::new();
3700        let def_levels = vec![0; 10];
3701        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3702        writer.flush_data_pages().unwrap();
3703
3704        let column_close_result = writer.close().unwrap();
3705        assert!(column_close_result.offset_index.is_none());
3706        assert!(column_close_result.column_index.is_none());
3707    }
3708
3709    #[test]
3710    fn test_offset_index_overridden() {
3711        // Test that offset indexes are not disabled when gathering page statistics
3712        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3713        let props = Arc::new(
3714            WriterProperties::builder()
3715                .set_statistics_enabled(EnabledStatistics::Page)
3716                .set_offset_index_disabled(true)
3717                .build(),
3718        );
3719        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3720        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3721
3722        let data = Vec::new();
3723        let def_levels = vec![0; 10];
3724        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3725        writer.flush_data_pages().unwrap();
3726
3727        let column_close_result = writer.close().unwrap();
3728        assert!(column_close_result.offset_index.is_some());
3729        assert!(column_close_result.column_index.is_some());
3730    }
3731
3732    #[test]
3733    fn test_boundary_order() -> Result<()> {
3734        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3735        // min max both ascending
3736        let column_close_result = write_multiple_pages::<Int32Type>(
3737            &descr,
3738            &[
3739                &[Some(-10), Some(10)],
3740                &[Some(-5), Some(11)],
3741                &[None],
3742                &[Some(-5), Some(11)],
3743            ],
3744        )?;
3745        let boundary_order = column_close_result
3746            .column_index
3747            .unwrap()
3748            .get_boundary_order();
3749        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3750
3751        // min max both descending
3752        let column_close_result = write_multiple_pages::<Int32Type>(
3753            &descr,
3754            &[
3755                &[Some(10), Some(11)],
3756                &[Some(5), Some(11)],
3757                &[None],
3758                &[Some(-5), Some(0)],
3759            ],
3760        )?;
3761        let boundary_order = column_close_result
3762            .column_index
3763            .unwrap()
3764            .get_boundary_order();
3765        assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING));
3766
3767        // min max both equal
3768        let column_close_result = write_multiple_pages::<Int32Type>(
3769            &descr,
3770            &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
3771        )?;
3772        let boundary_order = column_close_result
3773            .column_index
3774            .unwrap()
3775            .get_boundary_order();
3776        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3777
3778        // only nulls
3779        let column_close_result =
3780            write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
3781        let boundary_order = column_close_result
3782            .column_index
3783            .unwrap()
3784            .get_boundary_order();
3785        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3786
3787        // one page
3788        let column_close_result =
3789            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
3790        let boundary_order = column_close_result
3791            .column_index
3792            .unwrap()
3793            .get_boundary_order();
3794        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3795
3796        // one non-null page
3797        let column_close_result =
3798            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
3799        let boundary_order = column_close_result
3800            .column_index
3801            .unwrap()
3802            .get_boundary_order();
3803        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3804
3805        // min max both unordered
3806        let column_close_result = write_multiple_pages::<Int32Type>(
3807            &descr,
3808            &[
3809                &[Some(10), Some(11)],
3810                &[Some(11), Some(16)],
3811                &[None],
3812                &[Some(-5), Some(0)],
3813            ],
3814        )?;
3815        let boundary_order = column_close_result
3816            .column_index
3817            .unwrap()
3818            .get_boundary_order();
3819        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
3820
3821        // min max both ordered in different orders
3822        let column_close_result = write_multiple_pages::<Int32Type>(
3823            &descr,
3824            &[
3825                &[Some(1), Some(9)],
3826                &[Some(2), Some(8)],
3827                &[None],
3828                &[Some(3), Some(7)],
3829            ],
3830        )?;
3831        let boundary_order = column_close_result
3832            .column_index
3833            .unwrap()
3834            .get_boundary_order();
3835        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
3836
3837        Ok(())
3838    }
3839
3840    #[test]
3841    fn test_boundary_order_logical_type() -> Result<()> {
3842        // ensure that logical types account for different sort order than underlying
3843        // physical type representation
3844        let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
3845        let fba_descr = {
3846            let tpe = SchemaType::primitive_type_builder(
3847                "col",
3848                FixedLenByteArrayType::get_physical_type(),
3849            )
3850            .with_length(2)
3851            .build()?;
3852            Arc::new(ColumnDescriptor::new(
3853                Arc::new(tpe),
3854                1,
3855                0,
3856                ColumnPath::from("col"),
3857            ))
3858        };
3859
3860        let values: &[&[Option<FixedLenByteArray>]] = &[
3861            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
3862            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
3863            &[Some(FixedLenByteArray::from(ByteArray::from(
3864                f16::NEG_ZERO,
3865            )))],
3866            &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
3867        ];
3868
3869        // f16 descending
3870        let column_close_result =
3871            write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
3872        let boundary_order = column_close_result
3873            .column_index
3874            .unwrap()
3875            .get_boundary_order();
3876        assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING));
3877
3878        // same bytes, but fba unordered
3879        let column_close_result =
3880            write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
3881        let boundary_order = column_close_result
3882            .column_index
3883            .unwrap()
3884            .get_boundary_order();
3885        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
3886
3887        Ok(())
3888    }
3889
3890    #[test]
3891    fn test_interval_stats_should_not_have_min_max() {
3892        let input = [
3893            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
3894            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
3895            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
3896        ]
3897        .into_iter()
3898        .map(|s| ByteArray::from(s).into())
3899        .collect::<Vec<_>>();
3900
3901        let page_writer = get_test_page_writer();
3902        let mut writer = get_test_interval_column_writer(page_writer);
3903        writer.write_batch(&input, None, None).unwrap();
3904
3905        let metadata = writer.close().unwrap().metadata;
3906        let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
3907            stats.clone()
3908        } else {
3909            panic!("metadata missing statistics");
3910        };
3911        assert!(stats.min_bytes_opt().is_none());
3912        assert!(stats.max_bytes_opt().is_none());
3913    }
3914
3915    #[test]
3916    #[cfg(feature = "arrow")]
3917    fn test_column_writer_get_estimated_total_bytes() {
3918        let page_writer = get_test_page_writer();
3919        let props = Default::default();
3920        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
3921        assert_eq!(writer.get_estimated_total_bytes(), 0);
3922
3923        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
3924        writer.add_data_page().unwrap();
3925        let size_with_one_page = writer.get_estimated_total_bytes();
3926        assert_eq!(size_with_one_page, 20);
3927
3928        writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
3929        writer.add_data_page().unwrap();
3930        let size_with_two_pages = writer.get_estimated_total_bytes();
3931        // different pages have different compressed lengths
3932        assert_eq!(size_with_two_pages, 20 + 21);
3933    }
3934
3935    fn write_multiple_pages<T: DataType>(
3936        column_descr: &Arc<ColumnDescriptor>,
3937        pages: &[&[Option<T::T>]],
3938    ) -> Result<ColumnCloseResult> {
3939        let column_writer = get_column_writer(
3940            column_descr.clone(),
3941            Default::default(),
3942            get_test_page_writer(),
3943        );
3944        let mut writer = get_typed_column_writer::<T>(column_writer);
3945
3946        for &page in pages {
3947            let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
3948            let def_levels = page
3949                .iter()
3950                .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
3951                .collect::<Vec<_>>();
3952            writer.write_batch(&values, Some(&def_levels), None)?;
3953            writer.flush_data_pages()?;
3954        }
3955
3956        writer.close()
3957    }
3958
3959    /// Performs write-read roundtrip with randomly generated values and levels.
3960    /// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
3961    /// for a column.
3962    fn column_roundtrip_random<T: DataType>(
3963        props: WriterProperties,
3964        max_size: usize,
3965        min_value: T::T,
3966        max_value: T::T,
3967        max_def_level: i16,
3968        max_rep_level: i16,
3969    ) where
3970        T::T: PartialOrd + SampleUniform + Copy,
3971    {
3972        let mut num_values: usize = 0;
3973
3974        let mut buf: Vec<i16> = Vec::new();
3975        let def_levels = if max_def_level > 0 {
3976            random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
3977            for &dl in &buf[..] {
3978                if dl == max_def_level {
3979                    num_values += 1;
3980                }
3981            }
3982            Some(&buf[..])
3983        } else {
3984            num_values = max_size;
3985            None
3986        };
3987
3988        let mut buf: Vec<i16> = Vec::new();
3989        let rep_levels = if max_rep_level > 0 {
3990            random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
3991            buf[0] = 0; // Must start on record boundary
3992            Some(&buf[..])
3993        } else {
3994            None
3995        };
3996
3997        let mut values: Vec<T::T> = Vec::new();
3998        random_numbers_range(num_values, min_value, max_value, &mut values);
3999
4000        column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
4001    }
4002
    /// Performs write-read roundtrip and asserts written values and levels.
    ///
    /// Writes `values` (with optional definition/repetition levels) to a
    /// temporary file via a `SerializedPageWriter`, reads the column back with
    /// a `SerializedPageReader`, and asserts values, levels, and row count all
    /// survive the trip unchanged.
    fn column_roundtrip<T: DataType>(
        props: WriterProperties,
        values: &[T::T],
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) {
        let mut file = tempfile::tempfile().unwrap();
        let mut write = TrackedWrite::new(&mut file);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));

        // Derive max levels from the data itself; 0 means the corresponding
        // level stream is absent for this column.
        let max_def_level = match def_levels {
            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
            None => 0i16,
        };

        let max_rep_level = match rep_levels {
            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
            None => 0i16,
        };

        // Read batch must be large enough to cover the longest of the three
        // streams (values, def levels, rep levels).
        let mut max_batch_size = values.len();
        if let Some(levels) = def_levels {
            max_batch_size = max_batch_size.max(levels.len());
        }
        if let Some(levels) = rep_levels {
            max_batch_size = max_batch_size.max(levels.len());
        }

        let mut writer =
            get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));

        let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
        assert_eq!(values_written, values.len());
        let result = writer.close().unwrap();

        // Drop the TrackedWrite to release its mutable borrow of `file` before
        // the file is moved into the reader below.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let page_reader = Box::new(
            SerializedPageReader::new_with_properties(
                Arc::new(file),
                &result.metadata,
                result.rows_written as usize,
                None,
                Arc::new(props),
            )
            .unwrap(),
        );
        let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);

        // Level buffers are only allocated when the corresponding stream was
        // written, so their presence mirrors the inputs.
        let mut actual_values = Vec::with_capacity(max_batch_size);
        let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
        let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));

        let (_, values_read, levels_read) = reader
            .read_records(
                max_batch_size,
                actual_def_levels.as_mut(),
                actual_rep_levels.as_mut(),
                &mut actual_values,
            )
            .unwrap();

        // Assert values, definition and repetition levels.

        assert_eq!(&actual_values[..values_read], values);
        match actual_def_levels {
            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
            None => assert_eq!(None, def_levels),
        }
        match actual_rep_levels {
            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
            None => assert_eq!(None, rep_levels),
        }

        // Assert written rows.

        if let Some(levels) = actual_rep_levels {
            // With repetition levels, a new row starts at each rep level == 0.
            let mut actual_rows_written = 0;
            for l in levels {
                if l == 0 {
                    actual_rows_written += 1;
                }
            }
            assert_eq!(actual_rows_written, result.rows_written);
        } else if actual_def_levels.is_some() {
            // Flat nullable column: one row per definition level.
            assert_eq!(levels_read as u64, result.rows_written);
        } else {
            // Flat required column: one row per value.
            assert_eq!(values_read as u64, result.rows_written);
        }
    }
4097
4098    /// Performs write of provided values and returns column metadata of those values.
4099    /// Used to test encoding support for column writer.
4100    fn column_write_and_get_metadata<T: DataType>(
4101        props: WriterProperties,
4102        values: &[T::T],
4103    ) -> ColumnChunkMetaData {
4104        let page_writer = get_test_page_writer();
4105        let props = Arc::new(props);
4106        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4107        writer.write_batch(values, None, None).unwrap();
4108        writer.close().unwrap().metadata
4109    }
4110
4111    // Helper function to more compactly create a PageEncodingStats struct.
4112    fn encoding_stats(page_type: PageType, encoding: Encoding, count: i32) -> PageEncodingStats {
4113        PageEncodingStats {
4114            page_type,
4115            encoding,
4116            count,
4117        }
4118    }
4119
4120    // Function to use in tests for EncodingWriteSupport. This checks that dictionary
4121    // offset and encodings to make sure that column writer uses provided by trait
4122    // encodings.
4123    fn check_encoding_write_support<T: DataType>(
4124        version: WriterVersion,
4125        dict_enabled: bool,
4126        data: &[T::T],
4127        dictionary_page_offset: Option<i64>,
4128        encodings: &[Encoding],
4129        page_encoding_stats: &[PageEncodingStats],
4130    ) {
4131        let props = WriterProperties::builder()
4132            .set_writer_version(version)
4133            .set_dictionary_enabled(dict_enabled)
4134            .build();
4135        let meta = column_write_and_get_metadata::<T>(props, data);
4136        assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
4137        assert_eq!(meta.encodings().collect::<Vec<_>>(), encodings);
4138        assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
4139    }
4140
4141    /// Returns column writer.
4142    fn get_test_column_writer<'a, T: DataType>(
4143        page_writer: Box<dyn PageWriter + 'a>,
4144        max_def_level: i16,
4145        max_rep_level: i16,
4146        props: WriterPropertiesPtr,
4147    ) -> ColumnWriterImpl<'a, T> {
4148        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4149        let column_writer = get_column_writer(descr, props, page_writer);
4150        get_typed_column_writer::<T>(column_writer)
4151    }
4152
4153    fn get_test_column_writer_with_path<'a, T: DataType>(
4154        page_writer: Box<dyn PageWriter + 'a>,
4155        max_def_level: i16,
4156        max_rep_level: i16,
4157        props: WriterPropertiesPtr,
4158        path: ColumnPath,
4159    ) -> ColumnWriterImpl<'a, T> {
4160        let descr = Arc::new(get_test_column_descr_with_path::<T>(
4161            max_def_level,
4162            max_rep_level,
4163            path,
4164        ));
4165        let column_writer = get_column_writer(descr, props, page_writer);
4166        get_typed_column_writer::<T>(column_writer)
4167    }
4168
4169    /// Returns column reader.
4170    fn get_test_column_reader<T: DataType>(
4171        page_reader: Box<dyn PageReader>,
4172        max_def_level: i16,
4173        max_rep_level: i16,
4174    ) -> ColumnReaderImpl<T> {
4175        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4176        let column_reader = get_column_reader(descr, page_reader);
4177        get_typed_column_reader::<T>(column_reader)
4178    }
4179
4180    /// Returns descriptor for primitive column.
4181    fn get_test_column_descr<T: DataType>(
4182        max_def_level: i16,
4183        max_rep_level: i16,
4184    ) -> ColumnDescriptor {
4185        let path = ColumnPath::from("col");
4186        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4187            // length is set for "encoding support" tests for FIXED_LEN_BYTE_ARRAY type,
4188            // it should be no-op for other types
4189            .with_length(1)
4190            .build()
4191            .unwrap();
4192        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4193    }
4194
4195    fn get_test_column_descr_with_path<T: DataType>(
4196        max_def_level: i16,
4197        max_rep_level: i16,
4198        path: ColumnPath,
4199    ) -> ColumnDescriptor {
4200        let name = path.string();
4201        let tpe = SchemaType::primitive_type_builder(&name, T::get_physical_type())
4202            // length is set for "encoding support" tests for FIXED_LEN_BYTE_ARRAY type,
4203            // it should be no-op for other types
4204            .with_length(1)
4205            .build()
4206            .unwrap();
4207        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4208    }
4209
4210    fn write_and_collect_page_values(
4211        path: ColumnPath,
4212        props: WriterPropertiesPtr,
4213        data: &[i32],
4214    ) -> Vec<u32> {
4215        let mut file = tempfile::tempfile().unwrap();
4216        let mut write = TrackedWrite::new(&mut file);
4217        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
4218        let mut writer =
4219            get_test_column_writer_with_path::<Int32Type>(page_writer, 0, 0, props, path);
4220        writer.write_batch(data, None, None).unwrap();
4221        let r = writer.close().unwrap();
4222
4223        drop(write);
4224
4225        let props = ReaderProperties::builder()
4226            .set_backward_compatible_lz4(false)
4227            .build();
4228        let mut page_reader = Box::new(
4229            SerializedPageReader::new_with_properties(
4230                Arc::new(file),
4231                &r.metadata,
4232                r.rows_written as usize,
4233                None,
4234                Arc::new(props),
4235            )
4236            .unwrap(),
4237        );
4238
4239        let mut values_per_page = Vec::new();
4240        while let Some(page) = page_reader.get_next_page().unwrap() {
4241            assert_eq!(page.page_type(), PageType::DATA_PAGE);
4242            values_per_page.push(page.num_values());
4243        }
4244
4245        values_per_page
4246    }
4247
    /// Returns page writer that collects pages without serializing them.
    ///
    /// Useful when a test only needs writer-side bookkeeping (metadata,
    /// statistics) and never reads pages back.
    fn get_test_page_writer() -> Box<dyn PageWriter> {
        Box::new(TestPageWriter {})
    }
4252
4253    struct TestPageWriter {}
4254
4255    impl PageWriter for TestPageWriter {
4256        fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
4257            let mut res = PageWriteSpec::new();
4258            res.page_type = page.page_type();
4259            res.uncompressed_size = page.uncompressed_size();
4260            res.compressed_size = page.compressed_size();
4261            res.num_values = page.num_values();
4262            res.offset = 0;
4263            res.bytes_written = page.data().len() as u64;
4264            Ok(res)
4265        }
4266
4267        fn close(&mut self) -> Result<()> {
4268            Ok(())
4269        }
4270    }
4271
4272    /// Write data into parquet using [`get_test_page_writer`] and [`get_test_column_writer`] and returns generated statistics.
4273    fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
4274        let page_writer = get_test_page_writer();
4275        let props = Default::default();
4276        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4277        writer.write_batch(values, None, None).unwrap();
4278
4279        let metadata = writer.close().unwrap().metadata;
4280        if let Some(stats) = metadata.statistics() {
4281            stats.clone()
4282        } else {
4283            panic!("metadata missing statistics");
4284        }
4285    }
4286
4287    /// Returns Decimals column writer.
4288    fn get_test_decimals_column_writer<T: DataType>(
4289        page_writer: Box<dyn PageWriter>,
4290        max_def_level: i16,
4291        max_rep_level: i16,
4292        props: WriterPropertiesPtr,
4293    ) -> ColumnWriterImpl<'static, T> {
4294        let descr = Arc::new(get_test_decimals_column_descr::<T>(
4295            max_def_level,
4296            max_rep_level,
4297        ));
4298        let column_writer = get_column_writer(descr, props, page_writer);
4299        get_typed_column_writer::<T>(column_writer)
4300    }
4301
4302    /// Returns descriptor for Decimal type with primitive column.
4303    fn get_test_decimals_column_descr<T: DataType>(
4304        max_def_level: i16,
4305        max_rep_level: i16,
4306    ) -> ColumnDescriptor {
4307        let path = ColumnPath::from("col");
4308        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4309            .with_length(16)
4310            .with_logical_type(Some(LogicalType::Decimal {
4311                scale: 2,
4312                precision: 3,
4313            }))
4314            .with_scale(2)
4315            .with_precision(3)
4316            .build()
4317            .unwrap();
4318        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4319    }
4320
4321    fn float16_statistics_roundtrip(
4322        values: &[FixedLenByteArray],
4323    ) -> ValueStatistics<FixedLenByteArray> {
4324        let page_writer = get_test_page_writer();
4325        let mut writer = get_test_float16_column_writer(page_writer, Default::default());
4326        writer.write_batch(values, None, None).unwrap();
4327
4328        let metadata = writer.close().unwrap().metadata;
4329        if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
4330            stats.clone()
4331        } else {
4332            panic!("metadata missing statistics");
4333        }
4334    }
4335
4336    fn get_test_float16_column_writer(
4337        page_writer: Box<dyn PageWriter>,
4338        props: WriterPropertiesPtr,
4339    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4340        let descr = Arc::new(get_test_float16_column_descr(0, 0));
4341        let column_writer = get_column_writer(descr, props, page_writer);
4342        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4343    }
4344
4345    fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
4346        let path = ColumnPath::from("col");
4347        let tpe =
4348            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4349                .with_length(2)
4350                .with_logical_type(Some(LogicalType::Float16))
4351                .build()
4352                .unwrap();
4353        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4354    }
4355
4356    fn get_test_interval_column_writer(
4357        page_writer: Box<dyn PageWriter>,
4358    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4359        let descr = Arc::new(get_test_interval_column_descr());
4360        let column_writer = get_column_writer(descr, Default::default(), page_writer);
4361        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4362    }
4363
4364    fn get_test_interval_column_descr() -> ColumnDescriptor {
4365        let path = ColumnPath::from("col");
4366        let tpe =
4367            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4368                .with_length(12)
4369                .with_converted_type(ConvertedType::INTERVAL)
4370                .build()
4371                .unwrap();
4372        ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
4373    }
4374
4375    /// Returns column writer for UINT32 Column provided as ConvertedType only
4376    fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
4377        page_writer: Box<dyn PageWriter + 'a>,
4378        max_def_level: i16,
4379        max_rep_level: i16,
4380        props: WriterPropertiesPtr,
4381    ) -> ColumnWriterImpl<'a, T> {
4382        let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
4383            max_def_level,
4384            max_rep_level,
4385        ));
4386        let column_writer = get_column_writer(descr, props, page_writer);
4387        get_typed_column_writer::<T>(column_writer)
4388    }
4389
4390    /// Returns column descriptor for UINT32 Column provided as ConvertedType only
4391    fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
4392        max_def_level: i16,
4393        max_rep_level: i16,
4394    ) -> ColumnDescriptor {
4395        let path = ColumnPath::from("col");
4396        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4397            .with_converted_type(ConvertedType::UINT_32)
4398            .build()
4399            .unwrap();
4400        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4401    }
4402
4403    #[test]
4404    fn test_page_v2_snappy_compression_fallback() {
4405        // Test that PageV2 sets is_compressed to false when Snappy compression increases data size
4406        let page_writer = TestPageWriter {};
4407
4408        // Create WriterProperties with PageV2 and Snappy compression
4409        let props = WriterProperties::builder()
4410            .set_writer_version(WriterVersion::PARQUET_2_0)
4411            // Disable dictionary to ensure data is written directly
4412            .set_dictionary_enabled(false)
4413            .set_compression(Compression::SNAPPY)
4414            .build();
4415
4416        let mut column_writer =
4417            get_test_column_writer::<ByteArrayType>(Box::new(page_writer), 0, 0, Arc::new(props));
4418
4419        // Create small, simple data that Snappy compression will likely increase in size
4420        // due to compression overhead for very small data
4421        let values = vec![ByteArray::from("a")];
4422
4423        column_writer.write_batch(&values, None, None).unwrap();
4424
4425        let result = column_writer.close().unwrap();
4426        assert_eq!(
4427            result.metadata.uncompressed_size(),
4428            result.metadata.compressed_size()
4429        );
4430    }
4431}