parquet/column/writer/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains column writer API.
19
20use bytes::Bytes;
21use half::f16;
22
23use crate::bloom_filter::Sbbf;
24use crate::format::{BoundaryOrder, ColumnIndex, OffsetIndex};
25use std::collections::{BTreeSet, VecDeque};
26use std::str;
27
28use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
29use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
30use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
31use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
32use crate::data_type::private::ParquetValueType;
33use crate::data_type::*;
34use crate::encodings::levels::LevelEncoder;
35#[cfg(feature = "encryption")]
36use crate::encryption::encrypt::get_column_crypto_metadata;
37use crate::errors::{ParquetError, Result};
38use crate::file::metadata::{
39    ColumnChunkMetaData, ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram,
40    OffsetIndexBuilder,
41};
42use crate::file::page_encoding_stats::PageEncodingStats;
43use crate::file::properties::{
44    EnabledStatistics, WriterProperties, WriterPropertiesPtr, WriterVersion,
45};
46use crate::file::statistics::{Statistics, ValueStatistics};
47use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
48
49pub(crate) mod encoder;
50
// Dispatches over every `ColumnWriter` variant.
//
// `$e` is the value to match on, `$i` is the identifier the wrapped typed writer is
// bound to, and `$b` is the expression evaluated in every arm with `$i` in scope.
// The arms are spelled `Self::...`, so the macro must be expanded where `Self`
// names an enum with these variants.
macro_rules! downcast_writer {
    ($e:expr, $i:ident, $b:expr) => {
        match $e {
            Self::BoolColumnWriter($i) => $b,
            Self::Int32ColumnWriter($i) => $b,
            Self::Int64ColumnWriter($i) => $b,
            Self::Int96ColumnWriter($i) => $b,
            Self::FloatColumnWriter($i) => $b,
            Self::DoubleColumnWriter($i) => $b,
            Self::ByteArrayColumnWriter($i) => $b,
            Self::FixedLenByteArrayColumnWriter($i) => $b,
        }
    };
}
65
/// Column writer for a Parquet type.
///
/// There is one variant per physical type, each wrapping the [`ColumnWriterImpl`]
/// specialised for the corresponding data type.
pub enum ColumnWriter<'a> {
    /// Column writer for boolean type
    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
    /// Column writer for int32 type
    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
    /// Column writer for int64 type
    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
    /// Column writer for int96 (timestamp) type
    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
    /// Column writer for float type
    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
    /// Column writer for double type
    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
    /// Column writer for byte array type
    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
    /// Column writer for fixed length byte array type
    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
}
85
86impl ColumnWriter<'_> {
87    /// Returns the estimated total memory usage
88    #[cfg(feature = "arrow")]
89    pub(crate) fn memory_size(&self) -> usize {
90        downcast_writer!(self, typed, typed.memory_size())
91    }
92
93    /// Returns the estimated total encoded bytes for this column writer
94    #[cfg(feature = "arrow")]
95    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
96        downcast_writer!(self, typed, typed.get_estimated_total_bytes())
97    }
98
99    /// Close this [`ColumnWriter`]
100    pub fn close(self) -> Result<ColumnCloseResult> {
101        downcast_writer!(self, typed, typed.close())
102    }
103}
104
// Deprecated since 54.0.0 and, per the deprecation note, scheduled for removal.
// Nothing in the reviewed portion of this module refers to this enum.
#[deprecated(
    since = "54.0.0",
    note = "Seems like a stray and nobody knows what's it for. Will be removed in the next release."
)]
#[allow(missing_docs)]
pub enum Level {
    Page,
    Column,
}
114
115/// Gets a specific column writer corresponding to column descriptor `descr`.
116pub fn get_column_writer<'a>(
117    descr: ColumnDescPtr,
118    props: WriterPropertiesPtr,
119    page_writer: Box<dyn PageWriter + 'a>,
120) -> ColumnWriter<'a> {
121    match descr.physical_type() {
122        Type::BOOLEAN => {
123            ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
124        }
125        Type::INT32 => {
126            ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
127        }
128        Type::INT64 => {
129            ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
130        }
131        Type::INT96 => {
132            ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
133        }
134        Type::FLOAT => {
135            ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
136        }
137        Type::DOUBLE => {
138            ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
139        }
140        Type::BYTE_ARRAY => {
141            ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
142        }
143        Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
144            ColumnWriterImpl::new(descr, props, page_writer),
145        ),
146    }
147}
148
149/// Gets a typed column writer for the specific type `T`, by "up-casting" `col_writer` of
150/// non-generic type to a generic column writer type `ColumnWriterImpl`.
151///
152/// Panics if actual enum value for `col_writer` does not match the type `T`.
153pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
154    T::get_column_writer(col_writer).unwrap_or_else(|| {
155        panic!(
156            "Failed to convert column writer into a typed column writer for `{}` type",
157            T::get_physical_type()
158        )
159    })
160}
161
162/// Similar to `get_typed_column_writer` but returns a reference.
163pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
164    col_writer: &'b ColumnWriter<'a>,
165) -> &'b ColumnWriterImpl<'a, T> {
166    T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
167        panic!(
168            "Failed to convert column writer into a typed column writer for `{}` type",
169            T::get_physical_type()
170        )
171    })
172}
173
174/// Similar to `get_typed_column_writer` but returns a reference.
175pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
176    col_writer: &'a mut ColumnWriter<'b>,
177) -> &'a mut ColumnWriterImpl<'b, T> {
178    T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
179        panic!(
180            "Failed to convert column writer into a typed column writer for `{}` type",
181            T::get_physical_type()
182        )
183    })
184}
185
/// Metadata returned by [`GenericColumnWriter::close`]
#[derive(Debug, Clone)]
pub struct ColumnCloseResult {
    /// The total number of bytes written, as tracked in the writer's column metrics
    pub bytes_written: u64,
    /// The total number of rows written, as tracked in the writer's column metrics
    pub rows_written: u64,
    /// Metadata for this column chunk
    pub metadata: ColumnChunkMetaData,
    /// Optional bloom filter for this column, as returned by the value encoder
    pub bloom_filter: Option<Sbbf>,
    /// Optional column index, for filtering. `None` when the writer's column index
    /// builder is not valid at close time.
    pub column_index: Option<ColumnIndex>,
    /// Optional offset index, identifying page locations. `None` when the writer
    /// was created with the offset index disabled.
    pub offset_index: Option<OffsetIndex>,
}
202
// Metrics per page
//
// Counters are accumulated while values are buffered and are zeroed again by
// `PageMetrics::new_page`.
#[derive(Default)]
struct PageMetrics {
    // Number of levels buffered for the page; grows by the level count of each
    // mini batch, so null slots are included.
    num_buffered_values: u32,
    // Number of rows buffered for the page: repetition levels equal to 0 for
    // repeated columns, otherwise one row per level.
    num_buffered_rows: u32,
    // Number of definition levels below the column's maximum definition level.
    num_page_nulls: u64,
    // `None` unless initialized through `with_repetition_level_histogram`.
    repetition_level_histogram: Option<LevelHistogram>,
    // `None` unless initialized through `with_definition_level_histogram`.
    definition_level_histogram: Option<LevelHistogram>,
}
212
213impl PageMetrics {
214    fn new() -> Self {
215        Default::default()
216    }
217
218    /// Initialize the repetition level histogram
219    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
220        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
221        self
222    }
223
224    /// Initialize the definition level histogram
225    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
226        self.definition_level_histogram = LevelHistogram::try_new(max_level);
227        self
228    }
229
230    /// Resets the state of this `PageMetrics` to the initial state.
231    /// If histograms have been initialized their contents will be reset to zero.
232    fn new_page(&mut self) {
233        self.num_buffered_values = 0;
234        self.num_buffered_rows = 0;
235        self.num_page_nulls = 0;
236        self.repetition_level_histogram
237            .as_mut()
238            .map(LevelHistogram::reset);
239        self.definition_level_histogram
240            .as_mut()
241            .map(LevelHistogram::reset);
242    }
243
244    /// Updates histogram values using provided repetition levels
245    fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
246        if let Some(ref mut rep_hist) = self.repetition_level_histogram {
247            rep_hist.update_from_levels(levels);
248        }
249    }
250
251    /// Updates histogram values using provided definition levels
252    fn update_definition_level_histogram(&mut self, levels: &[i16]) {
253        if let Some(ref mut def_hist) = self.definition_level_histogram {
254            def_hist.update_from_levels(levels);
255        }
256    }
257}
258
// Metrics per column writer
#[derive(Default)]
struct ColumnMetrics<T: Default> {
    // Reported by `get_total_bytes_written` and as `bytes_written` on close.
    total_bytes_written: u64,
    // Reported by `get_total_rows_written` and as `rows_written` on close.
    total_rows_written: u64,
    total_uncompressed_size: u64,
    total_compressed_size: u64,
    total_num_values: u64,
    dictionary_page_offset: Option<u64>,
    data_page_offset: Option<u64>,
    // Updated from the optional `min` passed in with each written batch.
    min_column_value: Option<T>,
    // Updated from the optional `max` passed in with each written batch.
    max_column_value: Option<T>,
    num_column_nulls: u64,
    // Only kept when supplied with a batch written while the encoder holds no
    // values; cleared by any later batch.
    column_distinct_count: Option<u64>,
    // Running sum of the per-page values; stays `None` until a page reports one.
    variable_length_bytes: Option<i64>,
    // Chunk-level histograms; page histograms are summed into them.
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
}
277
278impl<T: Default> ColumnMetrics<T> {
279    fn new() -> Self {
280        Default::default()
281    }
282
283    /// Initialize the repetition level histogram
284    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
285        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
286        self
287    }
288
289    /// Initialize the definition level histogram
290    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
291        self.definition_level_histogram = LevelHistogram::try_new(max_level);
292        self
293    }
294
295    /// Sum `page_histogram` into `chunk_histogram`
296    fn update_histogram(
297        chunk_histogram: &mut Option<LevelHistogram>,
298        page_histogram: &Option<LevelHistogram>,
299    ) {
300        if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
301            chunk_hist.add(page_hist);
302        }
303    }
304
305    /// Sum the provided PageMetrics histograms into the chunk histograms. Does nothing if
306    /// page histograms are not initialized.
307    fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
308        ColumnMetrics::<T>::update_histogram(
309            &mut self.definition_level_histogram,
310            &page_metrics.definition_level_histogram,
311        );
312        ColumnMetrics::<T>::update_histogram(
313            &mut self.repetition_level_histogram,
314            &page_metrics.repetition_level_histogram,
315        );
316    }
317
318    /// Sum the provided page variable_length_bytes into the chunk variable_length_bytes
319    fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
320        if let Some(var_bytes) = variable_length_bytes {
321            *self.variable_length_bytes.get_or_insert(0) += var_bytes;
322        }
323    }
324}
325
/// Typed column writer for a primitive column.
///
/// Alias for [`GenericColumnWriter`] using [`ColumnValueEncoderImpl`] as the value
/// encoder for the data type `T`.
pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
328
/// Generic column writer for a primitive column.
///
/// `E` is the value encoder; levels, page metrics, column metrics and the
/// column/offset index state are tracked by the writer itself.
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
    // Column writer properties
    descr: ColumnDescPtr,
    props: WriterPropertiesPtr,
    // Statistics level for this column, looked up from `props` at construction
    statistics_enabled: EnabledStatistics,

    page_writer: Box<dyn PageWriter + 'a>,
    // Compression for this column, looked up from `props` at construction
    codec: Compression,
    // Codec instance created for `codec`; may be `None`
    compressor: Option<Box<dyn Codec>>,
    encoder: E,

    // Metrics for the page currently being buffered
    page_metrics: PageMetrics,
    // Metrics per column writer
    column_metrics: ColumnMetrics<E::T>,

    /// The order of encodings within the generated metadata does not impact its meaning,
    /// but we use a BTreeSet so that the output is deterministic
    encodings: BTreeSet<Encoding>,
    encoding_stats: Vec<PageEncodingStats>,
    // Reused buffers
    def_levels_sink: Vec<i16>,
    rep_levels_sink: Vec<i16>,
    // Compressed pages held by the writer; their sizes count towards
    // `get_estimated_total_bytes`
    data_pages: VecDeque<CompressedPage>,
    // column index and offset index
    column_index_builder: ColumnIndexBuilder,
    // `None` when the offset index is disabled in the writer properties
    offset_index_builder: Option<OffsetIndexBuilder>,

    // Below fields used to incrementally check boundary order across data pages.
    // We assume they are ascending/descending until proven wrong.
    data_page_boundary_ascending: bool,
    data_page_boundary_descending: bool,
    /// (min, max)
    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}
364
365impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
366    /// Returns a new instance of [`GenericColumnWriter`].
367    pub fn new(
368        descr: ColumnDescPtr,
369        props: WriterPropertiesPtr,
370        page_writer: Box<dyn PageWriter + 'a>,
371    ) -> Self {
372        let codec = props.compression(descr.path());
373        let codec_options = CodecOptionsBuilder::default().build();
374        let compressor = create_codec(codec, &codec_options).unwrap();
375        let encoder = E::try_new(&descr, props.as_ref()).unwrap();
376
377        let statistics_enabled = props.statistics_enabled(descr.path());
378
379        let mut encodings = BTreeSet::new();
380        // Used for level information
381        encodings.insert(Encoding::RLE);
382
383        let mut page_metrics = PageMetrics::new();
384        let mut column_metrics = ColumnMetrics::<E::T>::new();
385
386        // Initialize level histograms if collecting page or chunk statistics
387        if statistics_enabled != EnabledStatistics::None {
388            page_metrics = page_metrics
389                .with_repetition_level_histogram(descr.max_rep_level())
390                .with_definition_level_histogram(descr.max_def_level());
391            column_metrics = column_metrics
392                .with_repetition_level_histogram(descr.max_rep_level())
393                .with_definition_level_histogram(descr.max_def_level())
394        }
395
396        // Disable column_index_builder if not collecting page statistics.
397        let mut column_index_builder = ColumnIndexBuilder::new();
398        if statistics_enabled != EnabledStatistics::Page {
399            column_index_builder.to_invalid()
400        }
401
402        // Disable offset_index_builder if requested by user.
403        let offset_index_builder = match props.offset_index_disabled() {
404            false => Some(OffsetIndexBuilder::new()),
405            _ => None,
406        };
407
408        Self {
409            descr,
410            props,
411            statistics_enabled,
412            page_writer,
413            codec,
414            compressor,
415            encoder,
416            def_levels_sink: vec![],
417            rep_levels_sink: vec![],
418            data_pages: VecDeque::new(),
419            page_metrics,
420            column_metrics,
421            column_index_builder,
422            offset_index_builder,
423            encodings,
424            encoding_stats: vec![],
425            data_page_boundary_ascending: true,
426            data_page_boundary_descending: true,
427            last_non_null_data_page_min_max: None,
428        }
429    }
430
431    #[allow(clippy::too_many_arguments)]
432    pub(crate) fn write_batch_internal(
433        &mut self,
434        values: &E::Values,
435        value_indices: Option<&[usize]>,
436        def_levels: Option<&[i16]>,
437        rep_levels: Option<&[i16]>,
438        min: Option<&E::T>,
439        max: Option<&E::T>,
440        distinct_count: Option<u64>,
441    ) -> Result<usize> {
442        // Check if number of definition levels is the same as number of repetition levels.
443        if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
444            if def.len() != rep.len() {
445                return Err(general_err!(
446                    "Inconsistent length of definition and repetition levels: {} != {}",
447                    def.len(),
448                    rep.len()
449                ));
450            }
451        }
452
453        // We check for DataPage limits only after we have inserted the values. If a user
454        // writes a large number of values, the DataPage size can be well above the limit.
455        //
456        // The purpose of this chunking is to bound this. Even if a user writes large
457        // number of values, the chunking will ensure that we add data page at a
458        // reasonable pagesize limit.
459
460        // TODO: find out why we don't account for size of levels when we estimate page
461        // size.
462
463        let num_levels = match def_levels {
464            Some(def_levels) => def_levels.len(),
465            None => values.len(),
466        };
467
468        if let Some(min) = min {
469            update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
470        }
471        if let Some(max) = max {
472            update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
473        }
474
475        // We can only set the distinct count if there are no other writes
476        if self.encoder.num_values() == 0 {
477            self.column_metrics.column_distinct_count = distinct_count;
478        } else {
479            self.column_metrics.column_distinct_count = None;
480        }
481
482        let mut values_offset = 0;
483        let mut levels_offset = 0;
484        let base_batch_size = self.props.write_batch_size();
485        while levels_offset < num_levels {
486            let mut end_offset = num_levels.min(levels_offset + base_batch_size);
487
488            // Split at record boundary
489            if let Some(r) = rep_levels {
490                while end_offset < r.len() && r[end_offset] != 0 {
491                    end_offset += 1;
492                }
493            }
494
495            values_offset += self.write_mini_batch(
496                values,
497                values_offset,
498                value_indices,
499                end_offset - levels_offset,
500                def_levels.map(|lv| &lv[levels_offset..end_offset]),
501                rep_levels.map(|lv| &lv[levels_offset..end_offset]),
502            )?;
503            levels_offset = end_offset;
504        }
505
506        // Return total number of values processed.
507        Ok(values_offset)
508    }
509
510    /// Writes batch of values, definition levels and repetition levels.
511    /// Returns number of values processed (written).
512    ///
513    /// If definition and repetition levels are provided, we write fully those levels and
514    /// select how many values to write (this number will be returned), since number of
515    /// actual written values may be smaller than provided values.
516    ///
517    /// If only values are provided, then all values are written and the length of
518    /// of the values buffer is returned.
519    ///
520    /// Definition and/or repetition levels can be omitted, if values are
521    /// non-nullable and/or non-repeated.
522    pub fn write_batch(
523        &mut self,
524        values: &E::Values,
525        def_levels: Option<&[i16]>,
526        rep_levels: Option<&[i16]>,
527    ) -> Result<usize> {
528        self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
529    }
530
531    /// Writer may optionally provide pre-calculated statistics for use when computing
532    /// chunk-level statistics
533    ///
534    /// NB: [`WriterProperties::statistics_enabled`] must be set to [`EnabledStatistics::Chunk`]
535    /// for these statistics to take effect. If [`EnabledStatistics::None`] they will be ignored,
536    /// and if [`EnabledStatistics::Page`] the chunk statistics will instead be computed from the
537    /// computed page statistics
538    pub fn write_batch_with_statistics(
539        &mut self,
540        values: &E::Values,
541        def_levels: Option<&[i16]>,
542        rep_levels: Option<&[i16]>,
543        min: Option<&E::T>,
544        max: Option<&E::T>,
545        distinct_count: Option<u64>,
546    ) -> Result<usize> {
547        self.write_batch_internal(
548            values,
549            None,
550            def_levels,
551            rep_levels,
552            min,
553            max,
554            distinct_count,
555        )
556    }
557
558    /// Returns the estimated total memory usage.
559    ///
560    /// Unlike [`Self::get_estimated_total_bytes`] this is an estimate
561    /// of the current memory usage and not the final anticipated encoded size.
562    #[cfg(feature = "arrow")]
563    pub(crate) fn memory_size(&self) -> usize {
564        self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
565    }
566
    /// Returns total number of bytes written by this column writer so far.
    /// This value is also returned (as `bytes_written`) when the column writer is
    /// closed.
    ///
    /// Note: this value does not include any buffered data that has not
    /// yet been flushed to a page.
    pub fn get_total_bytes_written(&self) -> u64 {
        self.column_metrics.total_bytes_written
    }
575
576    /// Returns the estimated total encoded bytes for this column writer.
577    ///
578    /// Unlike [`Self::get_total_bytes_written`] this includes an estimate
579    /// of any data that has not yet been flushed to a page, based on it's
580    /// anticipated encoded size.
581    #[cfg(feature = "arrow")]
582    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
583        self.data_pages
584            .iter()
585            .map(|page| page.data().len() as u64)
586            .sum::<u64>()
587            + self.column_metrics.total_bytes_written
588            + self.encoder.estimated_data_page_size() as u64
589            + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
590    }
591
    /// Returns total number of rows written by this column writer so far.
    /// This value is also returned (as `rows_written`) when the column writer is
    /// closed.
    pub fn get_total_rows_written(&self) -> u64 {
        self.column_metrics.total_rows_written
    }
597
    /// Returns a reference to the [`ColumnDescPtr`] this writer was created with.
    pub fn get_descriptor(&self) -> &ColumnDescPtr {
        &self.descr
    }
602
603    /// Finalizes writes and closes the column writer.
604    /// Returns total bytes written, total rows written and column chunk metadata.
605    pub fn close(mut self) -> Result<ColumnCloseResult> {
606        if self.page_metrics.num_buffered_values > 0 {
607            self.add_data_page()?;
608        }
609        if self.encoder.has_dictionary() {
610            self.write_dictionary_page()?;
611        }
612        self.flush_data_pages()?;
613        let metadata = self.build_column_metadata()?;
614        self.page_writer.close()?;
615
616        let boundary_order = match (
617            self.data_page_boundary_ascending,
618            self.data_page_boundary_descending,
619        ) {
620            // If the lists are composed of equal elements then will be marked as ascending
621            // (Also the case if all pages are null pages)
622            (true, _) => BoundaryOrder::ASCENDING,
623            (false, true) => BoundaryOrder::DESCENDING,
624            (false, false) => BoundaryOrder::UNORDERED,
625        };
626        self.column_index_builder.set_boundary_order(boundary_order);
627
628        let column_index = self
629            .column_index_builder
630            .valid()
631            .then(|| self.column_index_builder.build_to_thrift());
632
633        let offset_index = self.offset_index_builder.map(|b| b.build_to_thrift());
634
635        Ok(ColumnCloseResult {
636            bytes_written: self.column_metrics.total_bytes_written,
637            rows_written: self.column_metrics.total_rows_written,
638            bloom_filter: self.encoder.flush_bloom_filter(),
639            metadata,
640            column_index,
641            offset_index,
642        })
643    }
644
645    /// Writes mini batch of values, definition and repetition levels.
646    /// This allows fine-grained processing of values and maintaining a reasonable
647    /// page size.
648    fn write_mini_batch(
649        &mut self,
650        values: &E::Values,
651        values_offset: usize,
652        value_indices: Option<&[usize]>,
653        num_levels: usize,
654        def_levels: Option<&[i16]>,
655        rep_levels: Option<&[i16]>,
656    ) -> Result<usize> {
657        // Process definition levels and determine how many values to write.
658        let values_to_write = if self.descr.max_def_level() > 0 {
659            let levels = def_levels.ok_or_else(|| {
660                general_err!(
661                    "Definition levels are required, because max definition level = {}",
662                    self.descr.max_def_level()
663                )
664            })?;
665
666            let mut values_to_write = 0;
667            for &level in levels {
668                if level == self.descr.max_def_level() {
669                    values_to_write += 1;
670                } else {
671                    // We must always compute this as it is used to populate v2 pages
672                    self.page_metrics.num_page_nulls += 1
673                }
674            }
675
676            // Update histogram
677            self.page_metrics.update_definition_level_histogram(levels);
678
679            self.def_levels_sink.extend_from_slice(levels);
680            values_to_write
681        } else {
682            num_levels
683        };
684
685        // Process repetition levels and determine how many rows we are about to process.
686        if self.descr.max_rep_level() > 0 {
687            // A row could contain more than one value.
688            let levels = rep_levels.ok_or_else(|| {
689                general_err!(
690                    "Repetition levels are required, because max repetition level = {}",
691                    self.descr.max_rep_level()
692                )
693            })?;
694
695            if !levels.is_empty() && levels[0] != 0 {
696                return Err(general_err!(
697                    "Write must start at a record boundary, got non-zero repetition level of {}",
698                    levels[0]
699                ));
700            }
701
702            // Count the occasions where we start a new row
703            for &level in levels {
704                self.page_metrics.num_buffered_rows += (level == 0) as u32
705            }
706
707            // Update histogram
708            self.page_metrics.update_repetition_level_histogram(levels);
709
710            self.rep_levels_sink.extend_from_slice(levels);
711        } else {
712            // Each value is exactly one row.
713            // Equals to the number of values, we count nulls as well.
714            self.page_metrics.num_buffered_rows += num_levels as u32;
715        }
716
717        match value_indices {
718            Some(indices) => {
719                let indices = &indices[values_offset..values_offset + values_to_write];
720                self.encoder.write_gather(values, indices)?;
721            }
722            None => self.encoder.write(values, values_offset, values_to_write)?,
723        }
724
725        self.page_metrics.num_buffered_values += num_levels as u32;
726
727        if self.should_add_data_page() {
728            self.add_data_page()?;
729        }
730
731        if self.should_dict_fallback() {
732            self.dict_fallback()?;
733        }
734
735        Ok(values_to_write)
736    }
737
738    /// Returns true if we need to fall back to non-dictionary encoding.
739    ///
740    /// We can only fall back if dictionary encoder is set and we have exceeded dictionary
741    /// size.
742    #[inline]
743    fn should_dict_fallback(&self) -> bool {
744        match self.encoder.estimated_dict_page_size() {
745            Some(size) => size >= self.props.dictionary_page_size_limit(),
746            None => false,
747        }
748    }
749
750    /// Returns true if there is enough data for a data page, false otherwise.
751    #[inline]
752    fn should_add_data_page(&self) -> bool {
753        // This is necessary in the event of a much larger dictionary size than page size
754        //
755        // In such a scenario the dictionary decoder may return an estimated encoded
756        // size in excess of the page size limit, even when there are no buffered values
757        if self.page_metrics.num_buffered_values == 0 {
758            return false;
759        }
760
761        self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
762            || self.encoder.estimated_data_page_size() >= self.props.data_page_size_limit()
763    }
764
765    /// Performs dictionary fallback.
766    /// Prepares and writes dictionary and all data pages into page writer.
767    fn dict_fallback(&mut self) -> Result<()> {
768        // At this point we know that we need to fall back.
769        if self.page_metrics.num_buffered_values > 0 {
770            self.add_data_page()?;
771        }
772        self.write_dictionary_page()?;
773        self.flush_data_pages()?;
774        Ok(())
775    }
776
777    /// Update the column index and offset index when adding the data page
778    fn update_column_offset_index(
779        &mut self,
780        page_statistics: Option<&ValueStatistics<E::T>>,
781        page_variable_length_bytes: Option<i64>,
782    ) {
783        // update the column index
784        let null_page =
785            (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
786        // a page contains only null values,
787        // and writers have to set the corresponding entries in min_values and max_values to byte[0]
788        if null_page && self.column_index_builder.valid() {
789            self.column_index_builder.append(
790                null_page,
791                vec![],
792                vec![],
793                self.page_metrics.num_page_nulls as i64,
794            );
795        } else if self.column_index_builder.valid() {
796            // from page statistics
797            // If can't get the page statistics, ignore this column/offset index for this column chunk
798            match &page_statistics {
799                None => {
800                    self.column_index_builder.to_invalid();
801                }
802                Some(stat) => {
803                    // Check if min/max are still ascending/descending across pages
804                    let new_min = stat.min_opt().unwrap();
805                    let new_max = stat.max_opt().unwrap();
806                    if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
807                        if self.data_page_boundary_ascending {
808                            // If last min/max are greater than new min/max then not ascending anymore
809                            let not_ascending = compare_greater(&self.descr, last_min, new_min)
810                                || compare_greater(&self.descr, last_max, new_max);
811                            if not_ascending {
812                                self.data_page_boundary_ascending = false;
813                            }
814                        }
815
816                        if self.data_page_boundary_descending {
817                            // If new min/max are greater than last min/max then not descending anymore
818                            let not_descending = compare_greater(&self.descr, new_min, last_min)
819                                || compare_greater(&self.descr, new_max, last_max);
820                            if not_descending {
821                                self.data_page_boundary_descending = false;
822                            }
823                        }
824                    }
825                    self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));
826
827                    if self.can_truncate_value() {
828                        self.column_index_builder.append(
829                            null_page,
830                            self.truncate_min_value(
831                                self.props.column_index_truncate_length(),
832                                stat.min_bytes_opt().unwrap(),
833                            )
834                            .0,
835                            self.truncate_max_value(
836                                self.props.column_index_truncate_length(),
837                                stat.max_bytes_opt().unwrap(),
838                            )
839                            .0,
840                            self.page_metrics.num_page_nulls as i64,
841                        );
842                    } else {
843                        self.column_index_builder.append(
844                            null_page,
845                            stat.min_bytes_opt().unwrap().to_vec(),
846                            stat.max_bytes_opt().unwrap().to_vec(),
847                            self.page_metrics.num_page_nulls as i64,
848                        );
849                    }
850                }
851            }
852        }
853
854        // Append page histograms to the `ColumnIndex` histograms
855        self.column_index_builder.append_histograms(
856            &self.page_metrics.repetition_level_histogram,
857            &self.page_metrics.definition_level_histogram,
858        );
859
860        // Update the offset index
861        if let Some(builder) = self.offset_index_builder.as_mut() {
862            builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
863            builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
864        }
865    }
866
867    /// Determine if we should allow truncating min/max values for this column's statistics
868    fn can_truncate_value(&self) -> bool {
869        match self.descr.physical_type() {
870            // Don't truncate for Float16 and Decimal because their sort order is different
871            // from that of FIXED_LEN_BYTE_ARRAY sort order.
872            // So truncation of those types could lead to inaccurate min/max statistics
873            Type::FIXED_LEN_BYTE_ARRAY
874                if !matches!(
875                    self.descr.logical_type(),
876                    Some(LogicalType::Decimal { .. }) | Some(LogicalType::Float16)
877                ) =>
878            {
879                true
880            }
881            Type::BYTE_ARRAY => true,
882            // Truncation only applies for fba/binary physical types
883            _ => false,
884        }
885    }
886
887    /// Returns `true` if this column's logical type is a UTF-8 string.
888    fn is_utf8(&self) -> bool {
889        self.get_descriptor().logical_type() == Some(LogicalType::String)
890            || self.get_descriptor().converted_type() == ConvertedType::UTF8
891    }
892
893    /// Truncates a binary statistic to at most `truncation_length` bytes.
894    ///
895    /// If truncation is not possible, returns `data`.
896    ///
897    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
898    ///
899    /// UTF-8 Note:
900    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
901    /// also remain valid UTF-8, but may be less tnan `truncation_length` bytes to avoid splitting
902    /// on non-character boundaries.
903    fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
904        truncation_length
905            .filter(|l| data.len() > *l)
906            .and_then(|l|
907                // don't do extra work if this column isn't UTF-8
908                if self.is_utf8() {
909                    match str::from_utf8(data) {
910                        Ok(str_data) => truncate_utf8(str_data, l),
911                        Err(_) => Some(data[..l].to_vec()),
912                    }
913                } else {
914                    Some(data[..l].to_vec())
915                }
916            )
917            .map(|truncated| (truncated, true))
918            .unwrap_or_else(|| (data.to_vec(), false))
919    }
920
921    /// Truncates a binary statistic to at most `truncation_length` bytes, and then increment the
922    /// final byte(s) to yield a valid upper bound. This may result in a result of less than
923    /// `truncation_length` bytes if the last byte(s) overflows.
924    ///
925    /// If truncation is not possible, returns `data`.
926    ///
927    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
928    ///
929    /// UTF-8 Note:
930    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
931    /// also remain valid UTF-8 (but again may be less than `truncation_length` bytes). If `data`
932    /// does not contain valid UTF-8, then truncation will occur as if the column is non-string
933    /// binary.
934    fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
935        truncation_length
936            .filter(|l| data.len() > *l)
937            .and_then(|l|
938                // don't do extra work if this column isn't UTF-8
939                if self.is_utf8() {
940                    match str::from_utf8(data) {
941                        Ok(str_data) => truncate_and_increment_utf8(str_data, l),
942                        Err(_) => increment(data[..l].to_vec()),
943                    }
944                } else {
945                    increment(data[..l].to_vec())
946                }
947            )
948            .map(|truncated| (truncated, true))
949            .unwrap_or_else(|| (data.to_vec(), false))
950    }
951
952    /// Adds data page.
953    /// Data page is either buffered in case of dictionary encoding or written directly.
954    fn add_data_page(&mut self) -> Result<()> {
955        // Extract encoded values
956        let values_data = self.encoder.flush_data_page()?;
957
958        let max_def_level = self.descr.max_def_level();
959        let max_rep_level = self.descr.max_rep_level();
960
961        self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;
962
963        let page_statistics = match (values_data.min_value, values_data.max_value) {
964            (Some(min), Some(max)) => {
965                // Update chunk level statistics
966                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
967                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
968
969                (self.statistics_enabled == EnabledStatistics::Page).then_some(
970                    ValueStatistics::new(
971                        Some(min),
972                        Some(max),
973                        None,
974                        Some(self.page_metrics.num_page_nulls),
975                        false,
976                    ),
977                )
978            }
979            _ => None,
980        };
981
982        // update column and offset index
983        self.update_column_offset_index(
984            page_statistics.as_ref(),
985            values_data.variable_length_bytes,
986        );
987
988        // Update histograms and variable_length_bytes in column_metrics
989        self.column_metrics
990            .update_from_page_metrics(&self.page_metrics);
991        self.column_metrics
992            .update_variable_length_bytes(values_data.variable_length_bytes);
993
994        let page_statistics = page_statistics.map(Statistics::from);
995
996        let compressed_page = match self.props.writer_version() {
997            WriterVersion::PARQUET_1_0 => {
998                let mut buffer = vec![];
999
1000                if max_rep_level > 0 {
1001                    buffer.extend_from_slice(
1002                        &self.encode_levels_v1(
1003                            Encoding::RLE,
1004                            &self.rep_levels_sink[..],
1005                            max_rep_level,
1006                        )[..],
1007                    );
1008                }
1009
1010                if max_def_level > 0 {
1011                    buffer.extend_from_slice(
1012                        &self.encode_levels_v1(
1013                            Encoding::RLE,
1014                            &self.def_levels_sink[..],
1015                            max_def_level,
1016                        )[..],
1017                    );
1018                }
1019
1020                buffer.extend_from_slice(&values_data.buf);
1021                let uncompressed_size = buffer.len();
1022
1023                if let Some(ref mut cmpr) = self.compressor {
1024                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
1025                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
1026                    buffer = compressed_buf;
1027                }
1028
1029                let data_page = Page::DataPage {
1030                    buf: buffer.into(),
1031                    num_values: self.page_metrics.num_buffered_values,
1032                    encoding: values_data.encoding,
1033                    def_level_encoding: Encoding::RLE,
1034                    rep_level_encoding: Encoding::RLE,
1035                    statistics: page_statistics,
1036                };
1037
1038                CompressedPage::new(data_page, uncompressed_size)
1039            }
1040            WriterVersion::PARQUET_2_0 => {
1041                let mut rep_levels_byte_len = 0;
1042                let mut def_levels_byte_len = 0;
1043                let mut buffer = vec![];
1044
1045                if max_rep_level > 0 {
1046                    let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
1047                    rep_levels_byte_len = levels.len();
1048                    buffer.extend_from_slice(&levels[..]);
1049                }
1050
1051                if max_def_level > 0 {
1052                    let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
1053                    def_levels_byte_len = levels.len();
1054                    buffer.extend_from_slice(&levels[..]);
1055                }
1056
1057                let uncompressed_size =
1058                    rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();
1059
1060                // Data Page v2 compresses values only.
1061                match self.compressor {
1062                    Some(ref mut cmpr) => {
1063                        cmpr.compress(&values_data.buf, &mut buffer)?;
1064                    }
1065                    None => buffer.extend_from_slice(&values_data.buf),
1066                }
1067
1068                let data_page = Page::DataPageV2 {
1069                    buf: buffer.into(),
1070                    num_values: self.page_metrics.num_buffered_values,
1071                    encoding: values_data.encoding,
1072                    num_nulls: self.page_metrics.num_page_nulls as u32,
1073                    num_rows: self.page_metrics.num_buffered_rows,
1074                    def_levels_byte_len: def_levels_byte_len as u32,
1075                    rep_levels_byte_len: rep_levels_byte_len as u32,
1076                    is_compressed: self.compressor.is_some(),
1077                    statistics: page_statistics,
1078                };
1079
1080                CompressedPage::new(data_page, uncompressed_size)
1081            }
1082        };
1083
1084        // Check if we need to buffer data page or flush it to the sink directly.
1085        if self.encoder.has_dictionary() {
1086            self.data_pages.push_back(compressed_page);
1087        } else {
1088            self.write_data_page(compressed_page)?;
1089        }
1090
1091        // Update total number of rows.
1092        self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;
1093
1094        // Reset state.
1095        self.rep_levels_sink.clear();
1096        self.def_levels_sink.clear();
1097        self.page_metrics.new_page();
1098
1099        Ok(())
1100    }
1101
1102    /// Finalises any outstanding data pages and flushes buffered data pages from
1103    /// dictionary encoding into underlying sink.
1104    #[inline]
1105    fn flush_data_pages(&mut self) -> Result<()> {
1106        // Write all outstanding data to a new page.
1107        if self.page_metrics.num_buffered_values > 0 {
1108            self.add_data_page()?;
1109        }
1110
1111        while let Some(page) = self.data_pages.pop_front() {
1112            self.write_data_page(page)?;
1113        }
1114
1115        Ok(())
1116    }
1117
1118    /// Assembles column chunk metadata.
1119    fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
1120        let total_compressed_size = self.column_metrics.total_compressed_size as i64;
1121        let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
1122        let num_values = self.column_metrics.total_num_values as i64;
1123        let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
1124        // If data page offset is not set, then no pages have been written
1125        let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;
1126
1127        let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
1128            .set_compression(self.codec)
1129            .set_encodings(self.encodings.iter().cloned().collect())
1130            .set_page_encoding_stats(self.encoding_stats.clone())
1131            .set_total_compressed_size(total_compressed_size)
1132            .set_total_uncompressed_size(total_uncompressed_size)
1133            .set_num_values(num_values)
1134            .set_data_page_offset(data_page_offset)
1135            .set_dictionary_page_offset(dict_page_offset);
1136
1137        if self.statistics_enabled != EnabledStatistics::None {
1138            let backwards_compatible_min_max = self.descr.sort_order().is_signed();
1139
1140            let statistics = ValueStatistics::<E::T>::new(
1141                self.column_metrics.min_column_value.clone(),
1142                self.column_metrics.max_column_value.clone(),
1143                self.column_metrics.column_distinct_count,
1144                Some(self.column_metrics.num_column_nulls),
1145                false,
1146            )
1147            .with_backwards_compatible_min_max(backwards_compatible_min_max)
1148            .into();
1149
1150            let statistics = match statistics {
1151                Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
1152                    let (min, did_truncate_min) = self.truncate_min_value(
1153                        self.props.statistics_truncate_length(),
1154                        stats.min_bytes_opt().unwrap(),
1155                    );
1156                    let (max, did_truncate_max) = self.truncate_max_value(
1157                        self.props.statistics_truncate_length(),
1158                        stats.max_bytes_opt().unwrap(),
1159                    );
1160                    Statistics::ByteArray(
1161                        ValueStatistics::new(
1162                            Some(min.into()),
1163                            Some(max.into()),
1164                            stats.distinct_count(),
1165                            stats.null_count_opt(),
1166                            backwards_compatible_min_max,
1167                        )
1168                        .with_max_is_exact(!did_truncate_max)
1169                        .with_min_is_exact(!did_truncate_min),
1170                    )
1171                }
1172                Statistics::FixedLenByteArray(stats)
1173                    if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
1174                {
1175                    let (min, did_truncate_min) = self.truncate_min_value(
1176                        self.props.statistics_truncate_length(),
1177                        stats.min_bytes_opt().unwrap(),
1178                    );
1179                    let (max, did_truncate_max) = self.truncate_max_value(
1180                        self.props.statistics_truncate_length(),
1181                        stats.max_bytes_opt().unwrap(),
1182                    );
1183                    Statistics::FixedLenByteArray(
1184                        ValueStatistics::new(
1185                            Some(min.into()),
1186                            Some(max.into()),
1187                            stats.distinct_count(),
1188                            stats.null_count_opt(),
1189                            backwards_compatible_min_max,
1190                        )
1191                        .with_max_is_exact(!did_truncate_max)
1192                        .with_min_is_exact(!did_truncate_min),
1193                    )
1194                }
1195                stats => stats,
1196            };
1197
1198            builder = builder
1199                .set_statistics(statistics)
1200                .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
1201                .set_repetition_level_histogram(
1202                    self.column_metrics.repetition_level_histogram.take(),
1203                )
1204                .set_definition_level_histogram(
1205                    self.column_metrics.definition_level_histogram.take(),
1206                );
1207        }
1208
1209        builder = self.set_column_chunk_encryption_properties(builder);
1210
1211        let metadata = builder.build()?;
1212        Ok(metadata)
1213    }
1214
1215    /// Encodes definition or repetition levels for Data Page v1.
1216    #[inline]
1217    fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> {
1218        let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
1219        encoder.put(levels);
1220        encoder.consume()
1221    }
1222
1223    /// Encodes definition or repetition levels for Data Page v2.
1224    /// Encoding is always RLE.
1225    #[inline]
1226    fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
1227        let mut encoder = LevelEncoder::v2(max_level, levels.len());
1228        encoder.put(levels);
1229        encoder.consume()
1230    }
1231
1232    /// Writes compressed data page into underlying sink and updates global metrics.
1233    #[inline]
1234    fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
1235        self.encodings.insert(page.encoding());
1236        match self.encoding_stats.last_mut() {
1237            Some(encoding_stats)
1238                if encoding_stats.page_type == page.page_type()
1239                    && encoding_stats.encoding == page.encoding() =>
1240            {
1241                encoding_stats.count += 1;
1242            }
1243            _ => {
1244                // data page type does not change inside a file
1245                // encoding can currently only change from dictionary to non-dictionary once
1246                self.encoding_stats.push(PageEncodingStats {
1247                    page_type: page.page_type(),
1248                    encoding: page.encoding(),
1249                    count: 1,
1250                });
1251            }
1252        }
1253        let page_spec = self.page_writer.write_page(page)?;
1254        // update offset index
1255        // compressed_size = header_size + compressed_data_size
1256        if let Some(builder) = self.offset_index_builder.as_mut() {
1257            builder
1258                .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
1259        }
1260        self.update_metrics_for_page(page_spec);
1261        Ok(())
1262    }
1263
1264    /// Writes dictionary page into underlying sink.
1265    #[inline]
1266    fn write_dictionary_page(&mut self) -> Result<()> {
1267        let compressed_page = {
1268            let mut page = self
1269                .encoder
1270                .flush_dict_page()?
1271                .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;
1272
1273            let uncompressed_size = page.buf.len();
1274
1275            if let Some(ref mut cmpr) = self.compressor {
1276                let mut output_buf = Vec::with_capacity(uncompressed_size);
1277                cmpr.compress(&page.buf, &mut output_buf)?;
1278                page.buf = Bytes::from(output_buf);
1279            }
1280
1281            let dict_page = Page::DictionaryPage {
1282                buf: page.buf,
1283                num_values: page.num_values as u32,
1284                encoding: self.props.dictionary_page_encoding(),
1285                is_sorted: page.is_sorted,
1286            };
1287            CompressedPage::new(dict_page, uncompressed_size)
1288        };
1289
1290        self.encodings.insert(compressed_page.encoding());
1291        self.encoding_stats.push(PageEncodingStats {
1292            page_type: PageType::DICTIONARY_PAGE,
1293            encoding: compressed_page.encoding(),
1294            count: 1,
1295        });
1296        let page_spec = self.page_writer.write_page(compressed_page)?;
1297        self.update_metrics_for_page(page_spec);
1298        // For the directory page, don't need to update column/offset index.
1299        Ok(())
1300    }
1301
1302    /// Updates column writer metrics with each page metadata.
1303    #[inline]
1304    fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1305        self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1306        self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1307        self.column_metrics.total_bytes_written += page_spec.bytes_written;
1308
1309        match page_spec.page_type {
1310            PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1311                self.column_metrics.total_num_values += page_spec.num_values as u64;
1312                if self.column_metrics.data_page_offset.is_none() {
1313                    self.column_metrics.data_page_offset = Some(page_spec.offset);
1314                }
1315            }
1316            PageType::DICTIONARY_PAGE => {
1317                assert!(
1318                    self.column_metrics.dictionary_page_offset.is_none(),
1319                    "Dictionary offset is already set"
1320                );
1321                self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1322            }
1323            _ => {}
1324        }
1325    }
1326
1327    #[inline]
1328    #[cfg(feature = "encryption")]
1329    fn set_column_chunk_encryption_properties(
1330        &self,
1331        builder: ColumnChunkMetaDataBuilder,
1332    ) -> ColumnChunkMetaDataBuilder {
1333        if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() {
1334            builder.set_column_crypto_metadata(get_column_crypto_metadata(
1335                encryption_properties,
1336                &self.descr,
1337            ))
1338        } else {
1339            builder
1340        }
1341    }
1342
1343    #[inline]
1344    #[cfg(not(feature = "encryption"))]
1345    fn set_column_chunk_encryption_properties(
1346        &self,
1347        builder: ColumnChunkMetaDataBuilder,
1348    ) -> ColumnChunkMetaDataBuilder {
1349        builder
1350    }
1351}
1352
1353fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
1354    update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
1355}
1356
1357fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
1358    update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
1359}
1360
1361#[inline]
1362#[allow(clippy::eq_op)]
1363fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
1364    match T::PHYSICAL_TYPE {
1365        Type::FLOAT | Type::DOUBLE => val != val,
1366        Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type() == Some(LogicalType::Float16) => {
1367            let val = val.as_bytes();
1368            let val = f16::from_le_bytes([val[0], val[1]]);
1369            val.is_nan()
1370        }
1371        _ => false,
1372    }
1373}
1374
1375/// Perform a conditional update of `cur`, skipping any NaN values
1376///
1377/// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with
1378/// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true`
1379fn update_stat<T: ParquetValueType, F>(
1380    descr: &ColumnDescriptor,
1381    val: &T,
1382    cur: &mut Option<T>,
1383    should_update: F,
1384) where
1385    F: Fn(&T) -> bool,
1386{
1387    if is_nan(descr, val) {
1388        return;
1389    }
1390
1391    if cur.as_ref().map_or(true, should_update) {
1392        *cur = Some(val.clone());
1393    }
1394}
1395
1396/// Evaluate `a > b` according to underlying logical type.
1397fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
1398    if let Some(LogicalType::Integer { is_signed, .. }) = descr.logical_type() {
1399        if !is_signed {
1400            // need to compare unsigned
1401            return a.as_u64().unwrap() > b.as_u64().unwrap();
1402        }
1403    }
1404
1405    match descr.converted_type() {
1406        ConvertedType::UINT_8
1407        | ConvertedType::UINT_16
1408        | ConvertedType::UINT_32
1409        | ConvertedType::UINT_64 => {
1410            return a.as_u64().unwrap() > b.as_u64().unwrap();
1411        }
1412        _ => {}
1413    };
1414
1415    if let Some(LogicalType::Decimal { .. }) = descr.logical_type() {
1416        match T::PHYSICAL_TYPE {
1417            Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1418                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1419            }
1420            _ => {}
1421        };
1422    }
1423
1424    if descr.converted_type() == ConvertedType::DECIMAL {
1425        match T::PHYSICAL_TYPE {
1426            Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1427                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1428            }
1429            _ => {}
1430        };
1431    };
1432
1433    if let Some(LogicalType::Float16) = descr.logical_type() {
1434        let a = a.as_bytes();
1435        let a = f16::from_le_bytes([a[0], a[1]]);
1436        let b = b.as_bytes();
1437        let b = f16::from_le_bytes([b[0], b[1]]);
1438        return a > b;
1439    }
1440
1441    a > b
1442}
1443
1444// ----------------------------------------------------------------------
1445// Encoding support for column writer.
1446// This mirrors parquet-mr default encodings for writes. See:
1447// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java
1448// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java
1449
1450/// Returns encoding for a column when no other encoding is provided in writer properties.
1451fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1452    match (kind, props.writer_version()) {
1453        (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1454        (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1455        (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1456        (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1457        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1458        _ => Encoding::PLAIN,
1459    }
1460}
1461
1462/// Returns true if dictionary is supported for column writer, false otherwise.
1463fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1464    match (kind, props.writer_version()) {
1465        // Booleans do not support dict encoding and should use a fallback encoding.
1466        (Type::BOOLEAN, _) => false,
1467        // Dictionary encoding was not enabled in PARQUET 1.0
1468        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1469        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1470        _ => true,
1471    }
1472}
1473
/// Signed comparison of two byte arrays holding big-endian two's complement integers
/// (the representation of byte array decimals). Returns `true` if `a > b`.
///
/// The arrays may differ in length; the shorter one is then treated as if it had been
/// sign extended to the length of the longer one, so `[0x00, 0x05]` and `[0x05]`
/// compare as equal.
///
/// If either array is empty, the result is `true` only when `a` is non-empty.
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    if a.is_empty() || b.is_empty() {
        return !a.is_empty();
    }

    let a_negative = (a[0] & 0x80) != 0;
    let b_negative = (b[0] & 0x80) != 0;

    // Values of different sign are ordered by sign alone: `a` is greater exactly when
    // `b` is the negative one.
    if a_negative != b_negative {
        return b_negative;
    }

    // Both values have the same sign. Split the longer array into its extra leading
    // bytes and a tail that lines up byte-for-byte with the shorter array.
    let extension: u8 = if a_negative { 0xFF } else { 0x00 };
    let a_longer = a.len() > b.len();
    let (lead, a_tail, b_tail) = if a_longer {
        let (lead, tail) = a.split_at(a.len() - b.len());
        (lead, tail, b)
    } else {
        let (lead, tail) = b.split_at(b.len() - a.len());
        (lead, a, tail)
    };

    // If the extra leading bytes are not pure sign extension, the longer value has the
    // larger magnitude: it is the greater one for positive values and the smaller one
    // for negative values.
    if lead.iter().any(|&byte| byte != extension) {
        return a_longer != a_negative;
    }

    // Otherwise the aligned tails hold the complete values. For two's complement
    // numbers of equal width and equal sign, unsigned lexicographic order matches
    // numeric order.
    a_tail > b_tail
}
1519
/// Shorten a UTF-8 string to its longest non-empty prefix that ends on a character
/// boundary and occupies at most `length` bytes. Yields `None` when no such prefix
/// exists.
///
/// The caller guarantees that data.len() > length.
fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    // Walk backwards from `length` and stop at the first position that does not
    // split a character; position 0 is never accepted because the result must be
    // non-empty.
    let mut cut = length;
    while cut > 0 {
        if data.is_char_boundary(cut) {
            return Some(data.as_bytes()[..cut].to_vec());
        }
        cut -= 1;
    }
    None
}
1529
1530/// Truncate a UTF-8 slice and increment it's final character. The returned value is the
1531/// longest such slice that is still a valid UTF-8 string while being less than `length`
1532/// bytes and non-empty. Returns `None` if no such transformation is possible.
1533///
1534/// The caller guarantees that data.len() > length.
1535fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1536    // UTF-8 is max 4 bytes, so start search 3 back from desired length
1537    let lower_bound = length.saturating_sub(3);
1538    let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
1539    increment_utf8(data.get(..split)?)
1540}
1541
/// Produce the bytes of a valid UTF-8 string obtained by incrementing the last
/// character of `data` that can be incremented. Characters are tried from the end of
/// the string towards the start; a character is skipped when the next code point is
/// invalid or would need a different number of bytes, and everything after the
/// character that gets incremented is dropped. Returns `None` if no character can be
/// incremented.
///
/// Note that this implementation will not promote an N-byte code point to (N+1) bytes.
fn increment_utf8(data: &str) -> Option<Vec<u8>> {
    data.char_indices().rev().find_map(|(start, current)| {
        let width = current.len_utf8();
        // Reject successors that are not valid chars or that change the encoded width.
        let successor = char::from_u32(current as u32 + 1).filter(|c| c.len_utf8() == width)?;

        // Keep everything before the character and append the re-encoded successor.
        let mut scratch = [0u8; 4];
        let mut bumped = Vec::with_capacity(start + width);
        bumped.extend_from_slice(&data.as_bytes()[..start]);
        bumped.extend_from_slice(successor.encode_utf8(&mut scratch).as_bytes());
        Some(bumped)
    })
}
1563
/// Add one to a byte string, treating it as a big-endian number and carrying from
/// right to left.
///
/// Returns `None` if every byte is `u8::MAX` (which includes the empty input), since
/// the carry would run off the front.
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    // The carry stops at the rightmost byte that is not already saturated.
    let carry_stop = data.iter().rposition(|&byte| byte != u8::MAX)?;
    data[carry_stop] += 1;
    // Every byte to the right of it was `u8::MAX` and wraps around to zero.
    data[carry_stop + 1..].fill(0);
    Some(data)
}
1579
1580#[cfg(test)]
1581mod tests {
1582    use crate::{
1583        file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1584        schema::parser::parse_message_type,
1585    };
1586    use core::str;
1587    use rand::distr::uniform::SampleUniform;
1588    use std::{fs::File, sync::Arc};
1589
1590    use crate::column::{
1591        page::PageReader,
1592        reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl},
1593    };
1594    use crate::file::writer::TrackedWrite;
1595    use crate::file::{
1596        properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1597    };
1598    use crate::schema::types::{ColumnPath, Type as SchemaType};
1599    use crate::util::test_common::rand_gen::random_numbers_range;
1600
1601    use super::*;
1602
1603    #[test]
1604    fn test_column_writer_inconsistent_def_rep_length() {
1605        let page_writer = get_test_page_writer();
1606        let props = Default::default();
1607        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
1608        let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
1609        assert!(res.is_err());
1610        if let Err(err) = res {
1611            assert_eq!(
1612                format!("{err}"),
1613                "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
1614            );
1615        }
1616    }
1617
1618    #[test]
1619    fn test_column_writer_invalid_def_levels() {
1620        let page_writer = get_test_page_writer();
1621        let props = Default::default();
1622        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1623        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1624        assert!(res.is_err());
1625        if let Err(err) = res {
1626            assert_eq!(
1627                format!("{err}"),
1628                "Parquet error: Definition levels are required, because max definition level = 1"
1629            );
1630        }
1631    }
1632
1633    #[test]
1634    fn test_column_writer_invalid_rep_levels() {
1635        let page_writer = get_test_page_writer();
1636        let props = Default::default();
1637        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
1638        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1639        assert!(res.is_err());
1640        if let Err(err) = res {
1641            assert_eq!(
1642                format!("{err}"),
1643                "Parquet error: Repetition levels are required, because max repetition level = 1"
1644            );
1645        }
1646    }
1647
1648    #[test]
1649    fn test_column_writer_not_enough_values_to_write() {
1650        let page_writer = get_test_page_writer();
1651        let props = Default::default();
1652        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1653        let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
1654        assert!(res.is_err());
1655        if let Err(err) = res {
1656            assert_eq!(
1657                format!("{err}"),
1658                "Parquet error: Expected to write 4 values, but have only 2"
1659            );
1660        }
1661    }
1662
1663    #[test]
1664    fn test_column_writer_write_only_one_dictionary_page() {
1665        let page_writer = get_test_page_writer();
1666        let props = Default::default();
1667        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1668        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1669        // First page should be correctly written.
1670        writer.add_data_page().unwrap();
1671        writer.write_dictionary_page().unwrap();
1672        let err = writer.write_dictionary_page().unwrap_err().to_string();
1673        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1674    }
1675
1676    #[test]
1677    fn test_column_writer_error_when_writing_disabled_dictionary() {
1678        let page_writer = get_test_page_writer();
1679        let props = Arc::new(
1680            WriterProperties::builder()
1681                .set_dictionary_enabled(false)
1682                .build(),
1683        );
1684        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1685        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1686        let err = writer.write_dictionary_page().unwrap_err().to_string();
1687        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1688    }
1689
1690    #[test]
1691    fn test_column_writer_boolean_type_does_not_support_dictionary() {
1692        let page_writer = get_test_page_writer();
1693        let props = Arc::new(
1694            WriterProperties::builder()
1695                .set_dictionary_enabled(true)
1696                .build(),
1697        );
1698        let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
1699        writer
1700            .write_batch(&[true, false, true, false], None, None)
1701            .unwrap();
1702
1703        let r = writer.close().unwrap();
1704        // PlainEncoder uses bit writer to write boolean values, which all fit into 1
1705        // byte.
1706        assert_eq!(r.bytes_written, 1);
1707        assert_eq!(r.rows_written, 4);
1708
1709        let metadata = r.metadata;
1710        assert_eq!(metadata.encodings(), &vec![Encoding::PLAIN, Encoding::RLE]);
1711        assert_eq!(metadata.num_values(), 4); // just values
1712        assert_eq!(metadata.dictionary_page_offset(), None);
1713    }
1714
1715    #[test]
1716    fn test_column_writer_default_encoding_support_bool() {
1717        check_encoding_write_support::<BoolType>(
1718            WriterVersion::PARQUET_1_0,
1719            true,
1720            &[true, false],
1721            None,
1722            &[Encoding::PLAIN, Encoding::RLE],
1723            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1724        );
1725        check_encoding_write_support::<BoolType>(
1726            WriterVersion::PARQUET_1_0,
1727            false,
1728            &[true, false],
1729            None,
1730            &[Encoding::PLAIN, Encoding::RLE],
1731            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1732        );
1733        check_encoding_write_support::<BoolType>(
1734            WriterVersion::PARQUET_2_0,
1735            true,
1736            &[true, false],
1737            None,
1738            &[Encoding::RLE],
1739            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
1740        );
1741        check_encoding_write_support::<BoolType>(
1742            WriterVersion::PARQUET_2_0,
1743            false,
1744            &[true, false],
1745            None,
1746            &[Encoding::RLE],
1747            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
1748        );
1749    }
1750
1751    #[test]
1752    fn test_column_writer_default_encoding_support_int32() {
1753        check_encoding_write_support::<Int32Type>(
1754            WriterVersion::PARQUET_1_0,
1755            true,
1756            &[1, 2],
1757            Some(0),
1758            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1759            &[
1760                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1761                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1762            ],
1763        );
1764        check_encoding_write_support::<Int32Type>(
1765            WriterVersion::PARQUET_1_0,
1766            false,
1767            &[1, 2],
1768            None,
1769            &[Encoding::PLAIN, Encoding::RLE],
1770            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1771        );
1772        check_encoding_write_support::<Int32Type>(
1773            WriterVersion::PARQUET_2_0,
1774            true,
1775            &[1, 2],
1776            Some(0),
1777            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1778            &[
1779                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1780                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1781            ],
1782        );
1783        check_encoding_write_support::<Int32Type>(
1784            WriterVersion::PARQUET_2_0,
1785            false,
1786            &[1, 2],
1787            None,
1788            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1789            &[encoding_stats(
1790                PageType::DATA_PAGE_V2,
1791                Encoding::DELTA_BINARY_PACKED,
1792                1,
1793            )],
1794        );
1795    }
1796
1797    #[test]
1798    fn test_column_writer_default_encoding_support_int64() {
1799        check_encoding_write_support::<Int64Type>(
1800            WriterVersion::PARQUET_1_0,
1801            true,
1802            &[1, 2],
1803            Some(0),
1804            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1805            &[
1806                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1807                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1808            ],
1809        );
1810        check_encoding_write_support::<Int64Type>(
1811            WriterVersion::PARQUET_1_0,
1812            false,
1813            &[1, 2],
1814            None,
1815            &[Encoding::PLAIN, Encoding::RLE],
1816            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1817        );
1818        check_encoding_write_support::<Int64Type>(
1819            WriterVersion::PARQUET_2_0,
1820            true,
1821            &[1, 2],
1822            Some(0),
1823            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1824            &[
1825                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1826                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1827            ],
1828        );
1829        check_encoding_write_support::<Int64Type>(
1830            WriterVersion::PARQUET_2_0,
1831            false,
1832            &[1, 2],
1833            None,
1834            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1835            &[encoding_stats(
1836                PageType::DATA_PAGE_V2,
1837                Encoding::DELTA_BINARY_PACKED,
1838                1,
1839            )],
1840        );
1841    }
1842
1843    #[test]
1844    fn test_column_writer_default_encoding_support_int96() {
1845        check_encoding_write_support::<Int96Type>(
1846            WriterVersion::PARQUET_1_0,
1847            true,
1848            &[Int96::from(vec![1, 2, 3])],
1849            Some(0),
1850            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1851            &[
1852                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1853                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1854            ],
1855        );
1856        check_encoding_write_support::<Int96Type>(
1857            WriterVersion::PARQUET_1_0,
1858            false,
1859            &[Int96::from(vec![1, 2, 3])],
1860            None,
1861            &[Encoding::PLAIN, Encoding::RLE],
1862            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1863        );
1864        check_encoding_write_support::<Int96Type>(
1865            WriterVersion::PARQUET_2_0,
1866            true,
1867            &[Int96::from(vec![1, 2, 3])],
1868            Some(0),
1869            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1870            &[
1871                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1872                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1873            ],
1874        );
1875        check_encoding_write_support::<Int96Type>(
1876            WriterVersion::PARQUET_2_0,
1877            false,
1878            &[Int96::from(vec![1, 2, 3])],
1879            None,
1880            &[Encoding::PLAIN, Encoding::RLE],
1881            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1882        );
1883    }
1884
1885    #[test]
1886    fn test_column_writer_default_encoding_support_float() {
1887        check_encoding_write_support::<FloatType>(
1888            WriterVersion::PARQUET_1_0,
1889            true,
1890            &[1.0, 2.0],
1891            Some(0),
1892            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1893            &[
1894                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1895                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1896            ],
1897        );
1898        check_encoding_write_support::<FloatType>(
1899            WriterVersion::PARQUET_1_0,
1900            false,
1901            &[1.0, 2.0],
1902            None,
1903            &[Encoding::PLAIN, Encoding::RLE],
1904            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1905        );
1906        check_encoding_write_support::<FloatType>(
1907            WriterVersion::PARQUET_2_0,
1908            true,
1909            &[1.0, 2.0],
1910            Some(0),
1911            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1912            &[
1913                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1914                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1915            ],
1916        );
1917        check_encoding_write_support::<FloatType>(
1918            WriterVersion::PARQUET_2_0,
1919            false,
1920            &[1.0, 2.0],
1921            None,
1922            &[Encoding::PLAIN, Encoding::RLE],
1923            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1924        );
1925    }
1926
1927    #[test]
1928    fn test_column_writer_default_encoding_support_double() {
1929        check_encoding_write_support::<DoubleType>(
1930            WriterVersion::PARQUET_1_0,
1931            true,
1932            &[1.0, 2.0],
1933            Some(0),
1934            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1935            &[
1936                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1937                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1938            ],
1939        );
1940        check_encoding_write_support::<DoubleType>(
1941            WriterVersion::PARQUET_1_0,
1942            false,
1943            &[1.0, 2.0],
1944            None,
1945            &[Encoding::PLAIN, Encoding::RLE],
1946            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1947        );
1948        check_encoding_write_support::<DoubleType>(
1949            WriterVersion::PARQUET_2_0,
1950            true,
1951            &[1.0, 2.0],
1952            Some(0),
1953            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1954            &[
1955                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1956                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1957            ],
1958        );
1959        check_encoding_write_support::<DoubleType>(
1960            WriterVersion::PARQUET_2_0,
1961            false,
1962            &[1.0, 2.0],
1963            None,
1964            &[Encoding::PLAIN, Encoding::RLE],
1965            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1966        );
1967    }
1968
1969    #[test]
1970    fn test_column_writer_default_encoding_support_byte_array() {
1971        check_encoding_write_support::<ByteArrayType>(
1972            WriterVersion::PARQUET_1_0,
1973            true,
1974            &[ByteArray::from(vec![1u8])],
1975            Some(0),
1976            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1977            &[
1978                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1979                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1980            ],
1981        );
1982        check_encoding_write_support::<ByteArrayType>(
1983            WriterVersion::PARQUET_1_0,
1984            false,
1985            &[ByteArray::from(vec![1u8])],
1986            None,
1987            &[Encoding::PLAIN, Encoding::RLE],
1988            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1989        );
1990        check_encoding_write_support::<ByteArrayType>(
1991            WriterVersion::PARQUET_2_0,
1992            true,
1993            &[ByteArray::from(vec![1u8])],
1994            Some(0),
1995            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1996            &[
1997                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1998                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1999            ],
2000        );
2001        check_encoding_write_support::<ByteArrayType>(
2002            WriterVersion::PARQUET_2_0,
2003            false,
2004            &[ByteArray::from(vec![1u8])],
2005            None,
2006            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
2007            &[encoding_stats(
2008                PageType::DATA_PAGE_V2,
2009                Encoding::DELTA_BYTE_ARRAY,
2010                1,
2011            )],
2012        );
2013    }
2014
2015    #[test]
2016    fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
2017        check_encoding_write_support::<FixedLenByteArrayType>(
2018            WriterVersion::PARQUET_1_0,
2019            true,
2020            &[ByteArray::from(vec![1u8]).into()],
2021            None,
2022            &[Encoding::PLAIN, Encoding::RLE],
2023            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2024        );
2025        check_encoding_write_support::<FixedLenByteArrayType>(
2026            WriterVersion::PARQUET_1_0,
2027            false,
2028            &[ByteArray::from(vec![1u8]).into()],
2029            None,
2030            &[Encoding::PLAIN, Encoding::RLE],
2031            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2032        );
2033        check_encoding_write_support::<FixedLenByteArrayType>(
2034            WriterVersion::PARQUET_2_0,
2035            true,
2036            &[ByteArray::from(vec![1u8]).into()],
2037            Some(0),
2038            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2039            &[
2040                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2041                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2042            ],
2043        );
2044        check_encoding_write_support::<FixedLenByteArrayType>(
2045            WriterVersion::PARQUET_2_0,
2046            false,
2047            &[ByteArray::from(vec![1u8]).into()],
2048            None,
2049            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
2050            &[encoding_stats(
2051                PageType::DATA_PAGE_V2,
2052                Encoding::DELTA_BYTE_ARRAY,
2053                1,
2054            )],
2055        );
2056    }
2057
2058    #[test]
2059    fn test_column_writer_check_metadata() {
2060        let page_writer = get_test_page_writer();
2061        let props = Default::default();
2062        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2063        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2064
2065        let r = writer.close().unwrap();
2066        assert_eq!(r.bytes_written, 20);
2067        assert_eq!(r.rows_written, 4);
2068
2069        let metadata = r.metadata;
2070        assert_eq!(
2071            metadata.encodings(),
2072            &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2073        );
2074        assert_eq!(metadata.num_values(), 4);
2075        assert_eq!(metadata.compressed_size(), 20);
2076        assert_eq!(metadata.uncompressed_size(), 20);
2077        assert_eq!(metadata.data_page_offset(), 0);
2078        assert_eq!(metadata.dictionary_page_offset(), Some(0));
2079        if let Some(stats) = metadata.statistics() {
2080            assert_eq!(stats.null_count_opt(), Some(0));
2081            assert_eq!(stats.distinct_count_opt(), None);
2082            if let Statistics::Int32(stats) = stats {
2083                assert_eq!(stats.min_opt().unwrap(), &1);
2084                assert_eq!(stats.max_opt().unwrap(), &4);
2085            } else {
2086                panic!("expecting Statistics::Int32");
2087            }
2088        } else {
2089            panic!("metadata missing statistics");
2090        }
2091    }
2092
2093    #[test]
2094    fn test_column_writer_check_byte_array_min_max() {
2095        let page_writer = get_test_page_writer();
2096        let props = Default::default();
2097        let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
2098        writer
2099            .write_batch(
2100                &[
2101                    ByteArray::from(vec![
2102                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2103                        35u8, 231u8, 90u8, 0u8, 0u8,
2104                    ]),
2105                    ByteArray::from(vec![
2106                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
2107                        152u8, 177u8, 56u8, 0u8, 0u8,
2108                    ]),
2109                    ByteArray::from(vec![
2110                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
2111                        0u8,
2112                    ]),
2113                    ByteArray::from(vec![
2114                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2115                        44u8, 0u8, 0u8,
2116                    ]),
2117                ],
2118                None,
2119                None,
2120            )
2121            .unwrap();
2122        let metadata = writer.close().unwrap().metadata;
2123        if let Some(stats) = metadata.statistics() {
2124            if let Statistics::ByteArray(stats) = stats {
2125                assert_eq!(
2126                    stats.min_opt().unwrap(),
2127                    &ByteArray::from(vec![
2128                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2129                        35u8, 231u8, 90u8, 0u8, 0u8,
2130                    ])
2131                );
2132                assert_eq!(
2133                    stats.max_opt().unwrap(),
2134                    &ByteArray::from(vec![
2135                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2136                        44u8, 0u8, 0u8,
2137                    ])
2138                );
2139            } else {
2140                panic!("expecting Statistics::ByteArray");
2141            }
2142        } else {
2143            panic!("metadata missing statistics");
2144        }
2145    }
2146
2147    #[test]
2148    fn test_column_writer_uint32_converted_type_min_max() {
2149        let page_writer = get_test_page_writer();
2150        let props = Default::default();
2151        let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
2152            page_writer,
2153            0,
2154            0,
2155            props,
2156        );
2157        writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
2158        let metadata = writer.close().unwrap().metadata;
2159        if let Some(stats) = metadata.statistics() {
2160            if let Statistics::Int32(stats) = stats {
2161                assert_eq!(stats.min_opt().unwrap(), &0,);
2162                assert_eq!(stats.max_opt().unwrap(), &5,);
2163            } else {
2164                panic!("expecting Statistics::Int32");
2165            }
2166        } else {
2167            panic!("metadata missing statistics");
2168        }
2169    }
2170
2171    #[test]
2172    fn test_column_writer_precalculated_statistics() {
2173        let page_writer = get_test_page_writer();
2174        let props = Arc::new(
2175            WriterProperties::builder()
2176                .set_statistics_enabled(EnabledStatistics::Chunk)
2177                .build(),
2178        );
2179        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2180        writer
2181            .write_batch_with_statistics(
2182                &[1, 2, 3, 4],
2183                None,
2184                None,
2185                Some(&-17),
2186                Some(&9000),
2187                Some(55),
2188            )
2189            .unwrap();
2190
2191        let r = writer.close().unwrap();
2192        assert_eq!(r.bytes_written, 20);
2193        assert_eq!(r.rows_written, 4);
2194
2195        let metadata = r.metadata;
2196        assert_eq!(
2197            metadata.encodings(),
2198            &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2199        );
2200        assert_eq!(metadata.num_values(), 4);
2201        assert_eq!(metadata.compressed_size(), 20);
2202        assert_eq!(metadata.uncompressed_size(), 20);
2203        assert_eq!(metadata.data_page_offset(), 0);
2204        assert_eq!(metadata.dictionary_page_offset(), Some(0));
2205        if let Some(stats) = metadata.statistics() {
2206            assert_eq!(stats.null_count_opt(), Some(0));
2207            assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
2208            if let Statistics::Int32(stats) = stats {
2209                assert_eq!(stats.min_opt().unwrap(), &-17);
2210                assert_eq!(stats.max_opt().unwrap(), &9000);
2211            } else {
2212                panic!("expecting Statistics::Int32");
2213            }
2214        } else {
2215            panic!("metadata missing statistics");
2216        }
2217    }
2218
2219    #[test]
2220    fn test_mixed_precomputed_statistics() {
2221        let mut buf = Vec::with_capacity(100);
2222        let mut write = TrackedWrite::new(&mut buf);
2223        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2224        let props = Default::default();
2225        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2226
2227        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2228        writer
2229            .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
2230            .unwrap();
2231
2232        let r = writer.close().unwrap();
2233
2234        let stats = r.metadata.statistics().unwrap();
2235        assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
2236        assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
2237        assert_eq!(stats.null_count_opt(), Some(0));
2238        assert!(stats.distinct_count_opt().is_none());
2239
2240        drop(write);
2241
2242        let props = ReaderProperties::builder()
2243            .set_backward_compatible_lz4(false)
2244            .build();
2245        let reader = SerializedPageReader::new_with_properties(
2246            Arc::new(Bytes::from(buf)),
2247            &r.metadata,
2248            r.rows_written as usize,
2249            None,
2250            Arc::new(props),
2251        )
2252        .unwrap();
2253
2254        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2255        assert_eq!(pages.len(), 2);
2256
2257        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2258        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);
2259
2260        let page_statistics = pages[1].statistics().unwrap();
2261        assert_eq!(
2262            page_statistics.min_bytes_opt().unwrap(),
2263            1_i32.to_le_bytes()
2264        );
2265        assert_eq!(
2266            page_statistics.max_bytes_opt().unwrap(),
2267            7_i32.to_le_bytes()
2268        );
2269        assert_eq!(page_statistics.null_count_opt(), Some(0));
2270        assert!(page_statistics.distinct_count_opt().is_none());
2271    }
2272
2273    #[test]
2274    fn test_disabled_statistics() {
2275        let mut buf = Vec::with_capacity(100);
2276        let mut write = TrackedWrite::new(&mut buf);
2277        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2278        let props = WriterProperties::builder()
2279            .set_statistics_enabled(EnabledStatistics::None)
2280            .set_writer_version(WriterVersion::PARQUET_2_0)
2281            .build();
2282        let props = Arc::new(props);
2283
2284        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2285        writer
2286            .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
2287            .unwrap();
2288
2289        let r = writer.close().unwrap();
2290        assert!(r.metadata.statistics().is_none());
2291
2292        drop(write);
2293
2294        let props = ReaderProperties::builder()
2295            .set_backward_compatible_lz4(false)
2296            .build();
2297        let reader = SerializedPageReader::new_with_properties(
2298            Arc::new(Bytes::from(buf)),
2299            &r.metadata,
2300            r.rows_written as usize,
2301            None,
2302            Arc::new(props),
2303        )
2304        .unwrap();
2305
2306        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2307        assert_eq!(pages.len(), 2);
2308
2309        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2310        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);
2311
2312        match &pages[1] {
2313            Page::DataPageV2 {
2314                num_values,
2315                num_nulls,
2316                num_rows,
2317                statistics,
2318                ..
2319            } => {
2320                assert_eq!(*num_values, 6);
2321                assert_eq!(*num_nulls, 2);
2322                assert_eq!(*num_rows, 6);
2323                assert!(statistics.is_none());
2324            }
2325            _ => unreachable!(),
2326        }
2327    }
2328
2329    #[test]
2330    fn test_column_writer_empty_column_roundtrip() {
2331        let props = Default::default();
2332        column_roundtrip::<Int32Type>(props, &[], None, None);
2333    }
2334
2335    #[test]
2336    fn test_column_writer_non_nullable_values_roundtrip() {
2337        let props = Default::default();
2338        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2339    }
2340
2341    #[test]
2342    fn test_column_writer_nullable_non_repeated_values_roundtrip() {
2343        let props = Default::default();
2344        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
2345    }
2346
2347    #[test]
2348    fn test_column_writer_nullable_repeated_values_roundtrip() {
2349        let props = Default::default();
2350        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2351    }
2352
2353    #[test]
2354    fn test_column_writer_dictionary_fallback_small_data_page() {
2355        let props = WriterProperties::builder()
2356            .set_dictionary_page_size_limit(32)
2357            .set_data_page_size_limit(32)
2358            .build();
2359        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2360    }
2361
2362    #[test]
2363    fn test_column_writer_small_write_batch_size() {
2364        for i in &[1usize, 2, 5, 10, 11, 1023] {
2365            let props = WriterProperties::builder().set_write_batch_size(*i).build();
2366
2367            column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2368        }
2369    }
2370
2371    #[test]
2372    fn test_column_writer_dictionary_disabled_v1() {
2373        let props = WriterProperties::builder()
2374            .set_writer_version(WriterVersion::PARQUET_1_0)
2375            .set_dictionary_enabled(false)
2376            .build();
2377        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2378    }
2379
2380    #[test]
2381    fn test_column_writer_dictionary_disabled_v2() {
2382        let props = WriterProperties::builder()
2383            .set_writer_version(WriterVersion::PARQUET_2_0)
2384            .set_dictionary_enabled(false)
2385            .build();
2386        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2387    }
2388
2389    #[test]
2390    fn test_column_writer_compression_v1() {
2391        let props = WriterProperties::builder()
2392            .set_writer_version(WriterVersion::PARQUET_1_0)
2393            .set_compression(Compression::SNAPPY)
2394            .build();
2395        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2396    }
2397
2398    #[test]
2399    fn test_column_writer_compression_v2() {
2400        let props = WriterProperties::builder()
2401            .set_writer_version(WriterVersion::PARQUET_2_0)
2402            .set_compression(Compression::SNAPPY)
2403            .build();
2404        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2405    }
2406
2407    #[test]
2408    fn test_column_writer_add_data_pages_with_dict() {
2409        // ARROW-5129: Test verifies that we add data page in case of dictionary encoding
2410        // and no fallback occurred so far.
2411        let mut file = tempfile::tempfile().unwrap();
2412        let mut write = TrackedWrite::new(&mut file);
2413        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2414        let props = Arc::new(
2415            WriterProperties::builder()
2416                .set_data_page_size_limit(10)
2417                .set_write_batch_size(3) // write 3 values at a time
2418                .build(),
2419        );
2420        let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
2421        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2422        writer.write_batch(data, None, None).unwrap();
2423        let r = writer.close().unwrap();
2424
2425        drop(write);
2426
2427        // Read pages and check the sequence
2428        let props = ReaderProperties::builder()
2429            .set_backward_compatible_lz4(false)
2430            .build();
2431        let mut page_reader = Box::new(
2432            SerializedPageReader::new_with_properties(
2433                Arc::new(file),
2434                &r.metadata,
2435                r.rows_written as usize,
2436                None,
2437                Arc::new(props),
2438            )
2439            .unwrap(),
2440        );
2441        let mut res = Vec::new();
2442        while let Some(page) = page_reader.get_next_page().unwrap() {
2443            res.push((page.page_type(), page.num_values(), page.buffer().len()));
2444        }
2445        assert_eq!(
2446            res,
2447            vec![
2448                (PageType::DICTIONARY_PAGE, 10, 40),
2449                (PageType::DATA_PAGE, 9, 10),
2450                (PageType::DATA_PAGE, 1, 3),
2451            ]
2452        );
2453        assert_eq!(
2454            r.metadata.page_encoding_stats(),
2455            Some(&vec![
2456                PageEncodingStats {
2457                    page_type: PageType::DICTIONARY_PAGE,
2458                    encoding: Encoding::PLAIN,
2459                    count: 1
2460                },
2461                PageEncodingStats {
2462                    page_type: PageType::DATA_PAGE,
2463                    encoding: Encoding::RLE_DICTIONARY,
2464                    count: 2,
2465                }
2466            ])
2467        );
2468    }
2469
2470    #[test]
2471    fn test_bool_statistics() {
2472        let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
2473        // Booleans have an unsigned sort order and so are not compatible
2474        // with the deprecated `min` and `max` statistics
2475        assert!(!stats.is_min_max_backwards_compatible());
2476        if let Statistics::Boolean(stats) = stats {
2477            assert_eq!(stats.min_opt().unwrap(), &false);
2478            assert_eq!(stats.max_opt().unwrap(), &true);
2479        } else {
2480            panic!("expecting Statistics::Boolean, got {stats:?}");
2481        }
2482    }
2483
2484    #[test]
2485    fn test_int32_statistics() {
2486        let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
2487        assert!(stats.is_min_max_backwards_compatible());
2488        if let Statistics::Int32(stats) = stats {
2489            assert_eq!(stats.min_opt().unwrap(), &-2);
2490            assert_eq!(stats.max_opt().unwrap(), &3);
2491        } else {
2492            panic!("expecting Statistics::Int32, got {stats:?}");
2493        }
2494    }
2495
2496    #[test]
2497    fn test_int64_statistics() {
2498        let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
2499        assert!(stats.is_min_max_backwards_compatible());
2500        if let Statistics::Int64(stats) = stats {
2501            assert_eq!(stats.min_opt().unwrap(), &-2);
2502            assert_eq!(stats.max_opt().unwrap(), &3);
2503        } else {
2504            panic!("expecting Statistics::Int64, got {stats:?}");
2505        }
2506    }
2507
2508    #[test]
2509    fn test_int96_statistics() {
2510        let input = vec![
2511            Int96::from(vec![1, 20, 30]),
2512            Int96::from(vec![3, 20, 10]),
2513            Int96::from(vec![0, 20, 30]),
2514            Int96::from(vec![2, 20, 30]),
2515        ]
2516        .into_iter()
2517        .collect::<Vec<Int96>>();
2518
2519        let stats = statistics_roundtrip::<Int96Type>(&input);
2520        assert!(!stats.is_min_max_backwards_compatible());
2521        if let Statistics::Int96(stats) = stats {
2522            assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![0, 20, 30]));
2523            assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
2524        } else {
2525            panic!("expecting Statistics::Int96, got {stats:?}");
2526        }
2527    }
2528
2529    #[test]
2530    fn test_float_statistics() {
2531        let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
2532        assert!(stats.is_min_max_backwards_compatible());
2533        if let Statistics::Float(stats) = stats {
2534            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2535            assert_eq!(stats.max_opt().unwrap(), &3.0);
2536        } else {
2537            panic!("expecting Statistics::Float, got {stats:?}");
2538        }
2539    }
2540
2541    #[test]
2542    fn test_double_statistics() {
2543        let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
2544        assert!(stats.is_min_max_backwards_compatible());
2545        if let Statistics::Double(stats) = stats {
2546            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2547            assert_eq!(stats.max_opt().unwrap(), &3.0);
2548        } else {
2549            panic!("expecting Statistics::Double, got {stats:?}");
2550        }
2551    }
2552
2553    #[test]
2554    fn test_byte_array_statistics() {
2555        let input = ["aawaa", "zz", "aaw", "m", "qrs"]
2556            .iter()
2557            .map(|&s| s.into())
2558            .collect::<Vec<_>>();
2559
2560        let stats = statistics_roundtrip::<ByteArrayType>(&input);
2561        assert!(!stats.is_min_max_backwards_compatible());
2562        if let Statistics::ByteArray(stats) = stats {
2563            assert_eq!(stats.min_opt().unwrap(), &ByteArray::from("aaw"));
2564            assert_eq!(stats.max_opt().unwrap(), &ByteArray::from("zz"));
2565        } else {
2566            panic!("expecting Statistics::ByteArray, got {stats:?}");
2567        }
2568    }
2569
2570    #[test]
2571    fn test_fixed_len_byte_array_statistics() {
2572        let input = ["aawaa", "zz   ", "aaw  ", "m    ", "qrs  "]
2573            .iter()
2574            .map(|&s| ByteArray::from(s).into())
2575            .collect::<Vec<_>>();
2576
2577        let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
2578        assert!(!stats.is_min_max_backwards_compatible());
2579        if let Statistics::FixedLenByteArray(stats) = stats {
2580            let expected_min: FixedLenByteArray = ByteArray::from("aaw  ").into();
2581            assert_eq!(stats.min_opt().unwrap(), &expected_min);
2582            let expected_max: FixedLenByteArray = ByteArray::from("zz   ").into();
2583            assert_eq!(stats.max_opt().unwrap(), &expected_max);
2584        } else {
2585            panic!("expecting Statistics::FixedLenByteArray, got {stats:?}");
2586        }
2587    }
2588
2589    #[test]
2590    fn test_column_writer_check_float16_min_max() {
2591        let input = [
2592            -f16::ONE,
2593            f16::from_f32(3.0),
2594            -f16::from_f32(2.0),
2595            f16::from_f32(2.0),
2596        ]
2597        .into_iter()
2598        .map(|s| ByteArray::from(s).into())
2599        .collect::<Vec<_>>();
2600
2601        let stats = float16_statistics_roundtrip(&input);
2602        assert!(stats.is_min_max_backwards_compatible());
2603        assert_eq!(
2604            stats.min_opt().unwrap(),
2605            &ByteArray::from(-f16::from_f32(2.0))
2606        );
2607        assert_eq!(
2608            stats.max_opt().unwrap(),
2609            &ByteArray::from(f16::from_f32(3.0))
2610        );
2611    }
2612
2613    #[test]
2614    fn test_column_writer_check_float16_nan_middle() {
2615        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2616            .into_iter()
2617            .map(|s| ByteArray::from(s).into())
2618            .collect::<Vec<_>>();
2619
2620        let stats = float16_statistics_roundtrip(&input);
2621        assert!(stats.is_min_max_backwards_compatible());
2622        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2623        assert_eq!(
2624            stats.max_opt().unwrap(),
2625            &ByteArray::from(f16::ONE + f16::ONE)
2626        );
2627    }
2628
2629    #[test]
2630    fn test_float16_statistics_nan_middle() {
2631        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2632            .into_iter()
2633            .map(|s| ByteArray::from(s).into())
2634            .collect::<Vec<_>>();
2635
2636        let stats = float16_statistics_roundtrip(&input);
2637        assert!(stats.is_min_max_backwards_compatible());
2638        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2639        assert_eq!(
2640            stats.max_opt().unwrap(),
2641            &ByteArray::from(f16::ONE + f16::ONE)
2642        );
2643    }
2644
2645    #[test]
2646    fn test_float16_statistics_nan_start() {
2647        let input = [f16::NAN, f16::ONE, f16::ONE + f16::ONE]
2648            .into_iter()
2649            .map(|s| ByteArray::from(s).into())
2650            .collect::<Vec<_>>();
2651
2652        let stats = float16_statistics_roundtrip(&input);
2653        assert!(stats.is_min_max_backwards_compatible());
2654        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2655        assert_eq!(
2656            stats.max_opt().unwrap(),
2657            &ByteArray::from(f16::ONE + f16::ONE)
2658        );
2659    }
2660
2661    #[test]
2662    fn test_float16_statistics_nan_only() {
2663        let input = [f16::NAN, f16::NAN]
2664            .into_iter()
2665            .map(|s| ByteArray::from(s).into())
2666            .collect::<Vec<_>>();
2667
2668        let stats = float16_statistics_roundtrip(&input);
2669        assert!(stats.min_bytes_opt().is_none());
2670        assert!(stats.max_bytes_opt().is_none());
2671        assert!(stats.is_min_max_backwards_compatible());
2672    }
2673
2674    #[test]
2675    fn test_float16_statistics_zero_only() {
2676        let input = [f16::ZERO]
2677            .into_iter()
2678            .map(|s| ByteArray::from(s).into())
2679            .collect::<Vec<_>>();
2680
2681        let stats = float16_statistics_roundtrip(&input);
2682        assert!(stats.is_min_max_backwards_compatible());
2683        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2684        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2685    }
2686
2687    #[test]
2688    fn test_float16_statistics_neg_zero_only() {
2689        let input = [f16::NEG_ZERO]
2690            .into_iter()
2691            .map(|s| ByteArray::from(s).into())
2692            .collect::<Vec<_>>();
2693
2694        let stats = float16_statistics_roundtrip(&input);
2695        assert!(stats.is_min_max_backwards_compatible());
2696        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2697        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2698    }
2699
2700    #[test]
2701    fn test_float16_statistics_zero_min() {
2702        let input = [f16::ZERO, f16::ONE, f16::NAN, f16::PI]
2703            .into_iter()
2704            .map(|s| ByteArray::from(s).into())
2705            .collect::<Vec<_>>();
2706
2707        let stats = float16_statistics_roundtrip(&input);
2708        assert!(stats.is_min_max_backwards_compatible());
2709        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2710        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI));
2711    }
2712
2713    #[test]
2714    fn test_float16_statistics_neg_zero_max() {
2715        let input = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI]
2716            .into_iter()
2717            .map(|s| ByteArray::from(s).into())
2718            .collect::<Vec<_>>();
2719
2720        let stats = float16_statistics_roundtrip(&input);
2721        assert!(stats.is_min_max_backwards_compatible());
2722        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI));
2723        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2724    }
2725
2726    #[test]
2727    fn test_float_statistics_nan_middle() {
2728        let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
2729        assert!(stats.is_min_max_backwards_compatible());
2730        if let Statistics::Float(stats) = stats {
2731            assert_eq!(stats.min_opt().unwrap(), &1.0);
2732            assert_eq!(stats.max_opt().unwrap(), &2.0);
2733        } else {
2734            panic!("expecting Statistics::Float");
2735        }
2736    }
2737
2738    #[test]
2739    fn test_float_statistics_nan_start() {
2740        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
2741        assert!(stats.is_min_max_backwards_compatible());
2742        if let Statistics::Float(stats) = stats {
2743            assert_eq!(stats.min_opt().unwrap(), &1.0);
2744            assert_eq!(stats.max_opt().unwrap(), &2.0);
2745        } else {
2746            panic!("expecting Statistics::Float");
2747        }
2748    }
2749
2750    #[test]
2751    fn test_float_statistics_nan_only() {
2752        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]);
2753        assert!(stats.min_bytes_opt().is_none());
2754        assert!(stats.max_bytes_opt().is_none());
2755        assert!(stats.is_min_max_backwards_compatible());
2756        assert!(matches!(stats, Statistics::Float(_)));
2757    }
2758
2759    #[test]
2760    fn test_float_statistics_zero_only() {
2761        let stats = statistics_roundtrip::<FloatType>(&[0.0]);
2762        assert!(stats.is_min_max_backwards_compatible());
2763        if let Statistics::Float(stats) = stats {
2764            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2765            assert!(stats.min_opt().unwrap().is_sign_negative());
2766            assert_eq!(stats.max_opt().unwrap(), &0.0);
2767            assert!(stats.max_opt().unwrap().is_sign_positive());
2768        } else {
2769            panic!("expecting Statistics::Float");
2770        }
2771    }
2772
2773    #[test]
2774    fn test_float_statistics_neg_zero_only() {
2775        let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
2776        assert!(stats.is_min_max_backwards_compatible());
2777        if let Statistics::Float(stats) = stats {
2778            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2779            assert!(stats.min_opt().unwrap().is_sign_negative());
2780            assert_eq!(stats.max_opt().unwrap(), &0.0);
2781            assert!(stats.max_opt().unwrap().is_sign_positive());
2782        } else {
2783            panic!("expecting Statistics::Float");
2784        }
2785    }
2786
2787    #[test]
2788    fn test_float_statistics_zero_min() {
2789        let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
2790        assert!(stats.is_min_max_backwards_compatible());
2791        if let Statistics::Float(stats) = stats {
2792            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2793            assert!(stats.min_opt().unwrap().is_sign_negative());
2794            assert_eq!(stats.max_opt().unwrap(), &2.0);
2795        } else {
2796            panic!("expecting Statistics::Float");
2797        }
2798    }
2799
2800    #[test]
2801    fn test_float_statistics_neg_zero_max() {
2802        let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
2803        assert!(stats.is_min_max_backwards_compatible());
2804        if let Statistics::Float(stats) = stats {
2805            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2806            assert_eq!(stats.max_opt().unwrap(), &0.0);
2807            assert!(stats.max_opt().unwrap().is_sign_positive());
2808        } else {
2809            panic!("expecting Statistics::Float");
2810        }
2811    }
2812
2813    #[test]
2814    fn test_double_statistics_nan_middle() {
2815        let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
2816        assert!(stats.is_min_max_backwards_compatible());
2817        if let Statistics::Double(stats) = stats {
2818            assert_eq!(stats.min_opt().unwrap(), &1.0);
2819            assert_eq!(stats.max_opt().unwrap(), &2.0);
2820        } else {
2821            panic!("expecting Statistics::Double");
2822        }
2823    }
2824
2825    #[test]
2826    fn test_double_statistics_nan_start() {
2827        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
2828        assert!(stats.is_min_max_backwards_compatible());
2829        if let Statistics::Double(stats) = stats {
2830            assert_eq!(stats.min_opt().unwrap(), &1.0);
2831            assert_eq!(stats.max_opt().unwrap(), &2.0);
2832        } else {
2833            panic!("expecting Statistics::Double");
2834        }
2835    }
2836
2837    #[test]
2838    fn test_double_statistics_nan_only() {
2839        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]);
2840        assert!(stats.min_bytes_opt().is_none());
2841        assert!(stats.max_bytes_opt().is_none());
2842        assert!(matches!(stats, Statistics::Double(_)));
2843        assert!(stats.is_min_max_backwards_compatible());
2844    }
2845
2846    #[test]
2847    fn test_double_statistics_zero_only() {
2848        let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
2849        assert!(stats.is_min_max_backwards_compatible());
2850        if let Statistics::Double(stats) = stats {
2851            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2852            assert!(stats.min_opt().unwrap().is_sign_negative());
2853            assert_eq!(stats.max_opt().unwrap(), &0.0);
2854            assert!(stats.max_opt().unwrap().is_sign_positive());
2855        } else {
2856            panic!("expecting Statistics::Double");
2857        }
2858    }
2859
2860    #[test]
2861    fn test_double_statistics_neg_zero_only() {
2862        let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
2863        assert!(stats.is_min_max_backwards_compatible());
2864        if let Statistics::Double(stats) = stats {
2865            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2866            assert!(stats.min_opt().unwrap().is_sign_negative());
2867            assert_eq!(stats.max_opt().unwrap(), &0.0);
2868            assert!(stats.max_opt().unwrap().is_sign_positive());
2869        } else {
2870            panic!("expecting Statistics::Double");
2871        }
2872    }
2873
2874    #[test]
2875    fn test_double_statistics_zero_min() {
2876        let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
2877        assert!(stats.is_min_max_backwards_compatible());
2878        if let Statistics::Double(stats) = stats {
2879            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2880            assert!(stats.min_opt().unwrap().is_sign_negative());
2881            assert_eq!(stats.max_opt().unwrap(), &2.0);
2882        } else {
2883            panic!("expecting Statistics::Double");
2884        }
2885    }
2886
2887    #[test]
2888    fn test_double_statistics_neg_zero_max() {
2889        let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
2890        assert!(stats.is_min_max_backwards_compatible());
2891        if let Statistics::Double(stats) = stats {
2892            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2893            assert_eq!(stats.max_opt().unwrap(), &0.0);
2894            assert!(stats.max_opt().unwrap().is_sign_positive());
2895        } else {
2896            panic!("expecting Statistics::Double");
2897        }
2898    }
2899
2900    #[test]
2901    fn test_compare_greater_byte_array_decimals() {
2902        assert!(!compare_greater_byte_array_decimals(&[], &[],),);
2903        assert!(compare_greater_byte_array_decimals(&[1u8,], &[],),);
2904        assert!(!compare_greater_byte_array_decimals(&[], &[1u8,],),);
2905        assert!(compare_greater_byte_array_decimals(&[1u8,], &[0u8,],),);
2906        assert!(!compare_greater_byte_array_decimals(&[1u8,], &[1u8,],),);
2907        assert!(compare_greater_byte_array_decimals(&[1u8, 0u8,], &[0u8,],),);
2908        assert!(!compare_greater_byte_array_decimals(
2909            &[0u8, 1u8,],
2910            &[1u8, 0u8,],
2911        ),);
2912        assert!(!compare_greater_byte_array_decimals(
2913            &[255u8, 35u8, 0u8, 0u8,],
2914            &[0u8,],
2915        ),);
2916        assert!(compare_greater_byte_array_decimals(
2917            &[0u8,],
2918            &[255u8, 35u8, 0u8, 0u8,],
2919        ),);
2920    }
2921
2922    #[test]
2923    fn test_column_index_with_null_pages() {
2924        // write a single page of all nulls
2925        let page_writer = get_test_page_writer();
2926        let props = Default::default();
2927        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2928        writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();
2929
2930        let r = writer.close().unwrap();
2931        assert!(r.column_index.is_some());
2932        let col_idx = r.column_index.unwrap();
2933        // null_pages should be true for page 0
2934        assert!(col_idx.null_pages[0]);
2935        // min and max should be empty byte arrays
2936        assert_eq!(col_idx.min_values[0].len(), 0);
2937        assert_eq!(col_idx.max_values[0].len(), 0);
2938        // null_counts should be defined and be 4 for page 0
2939        assert!(col_idx.null_counts.is_some());
2940        assert_eq!(col_idx.null_counts.as_ref().unwrap()[0], 4);
2941        // there is no repetition so rep histogram should be absent
2942        assert!(col_idx.repetition_level_histograms.is_none());
2943        // definition_level_histogram should be present and should be 0:4, 1:0
2944        assert!(col_idx.definition_level_histograms.is_some());
2945        assert_eq!(col_idx.definition_level_histograms.unwrap(), &[4, 0]);
2946    }
2947
2948    #[test]
2949    fn test_column_offset_index_metadata() {
2950        // write data
2951        // and check the offset index and column index
2952        let page_writer = get_test_page_writer();
2953        let props = Default::default();
2954        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2955        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2956        // first page
2957        writer.flush_data_pages().unwrap();
2958        // second page
2959        writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
2960
2961        let r = writer.close().unwrap();
2962        let column_index = r.column_index.unwrap();
2963        let offset_index = r.offset_index.unwrap();
2964
2965        assert_eq!(8, r.rows_written);
2966
2967        // column index
2968        assert_eq!(2, column_index.null_pages.len());
2969        assert_eq!(2, offset_index.page_locations.len());
2970        assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
2971        for idx in 0..2 {
2972            assert!(!column_index.null_pages[idx]);
2973            assert_eq!(0, column_index.null_counts.as_ref().unwrap()[idx]);
2974        }
2975
2976        if let Some(stats) = r.metadata.statistics() {
2977            assert_eq!(stats.null_count_opt(), Some(0));
2978            assert_eq!(stats.distinct_count_opt(), None);
2979            if let Statistics::Int32(stats) = stats {
2980                // first page is [1,2,3,4]
2981                // second page is [-5,2,4,8]
2982                // note that we don't increment here, as this is a non BinaryArray type.
2983                assert_eq!(
2984                    stats.min_bytes_opt(),
2985                    Some(column_index.min_values[1].as_slice())
2986                );
2987                assert_eq!(
2988                    stats.max_bytes_opt(),
2989                    column_index.max_values.get(1).map(Vec::as_slice)
2990                );
2991            } else {
2992                panic!("expecting Statistics::Int32");
2993            }
2994        } else {
2995            panic!("metadata missing statistics");
2996        }
2997
2998        // page location
2999        assert_eq!(0, offset_index.page_locations[0].first_row_index);
3000        assert_eq!(4, offset_index.page_locations[1].first_row_index);
3001    }
3002
3003    /// Verify min/max value truncation in the column index works as expected
3004    #[test]
3005    fn test_column_offset_index_metadata_truncating() {
3006        // write data
3007        // and check the offset index and column index
3008        let page_writer = get_test_page_writer();
3009        let props = Default::default();
3010        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3011
3012        let mut data = vec![FixedLenByteArray::default(); 3];
3013        // This is the expected min value - "aaa..."
3014        data[0].set_data(Bytes::from(vec![97_u8; 200]));
3015        // This is the expected max value - "ZZZ..."
3016        data[1].set_data(Bytes::from(vec![112_u8; 200]));
3017        data[2].set_data(Bytes::from(vec![98_u8; 200]));
3018
3019        writer.write_batch(&data, None, None).unwrap();
3020
3021        writer.flush_data_pages().unwrap();
3022
3023        let r = writer.close().unwrap();
3024        let column_index = r.column_index.unwrap();
3025        let offset_index = r.offset_index.unwrap();
3026
3027        assert_eq!(3, r.rows_written);
3028
3029        // column index
3030        assert_eq!(1, column_index.null_pages.len());
3031        assert_eq!(1, offset_index.page_locations.len());
3032        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3033        assert!(!column_index.null_pages[0]);
3034        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
3035
3036        if let Some(stats) = r.metadata.statistics() {
3037            assert_eq!(stats.null_count_opt(), Some(0));
3038            assert_eq!(stats.distinct_count_opt(), None);
3039            if let Statistics::FixedLenByteArray(stats) = stats {
3040                let column_index_min_value = &column_index.min_values[0];
3041                let column_index_max_value = &column_index.max_values[0];
3042
3043                // Column index stats are truncated, while the column chunk's aren't.
3044                assert_ne!(
3045                    stats.min_bytes_opt(),
3046                    Some(column_index_min_value.as_slice())
3047                );
3048                assert_ne!(
3049                    stats.max_bytes_opt(),
3050                    Some(column_index_max_value.as_slice())
3051                );
3052
3053                assert_eq!(
3054                    column_index_min_value.len(),
3055                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
3056                );
3057                assert_eq!(column_index_min_value.as_slice(), &[97_u8; 64]);
3058                assert_eq!(
3059                    column_index_max_value.len(),
3060                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
3061                );
3062
3063                // We expect the last byte to be incremented
3064                assert_eq!(
3065                    *column_index_max_value.last().unwrap(),
3066                    *column_index_max_value.first().unwrap() + 1
3067                );
3068            } else {
3069                panic!("expecting Statistics::FixedLenByteArray");
3070            }
3071        } else {
3072            panic!("metadata missing statistics");
3073        }
3074    }
3075
3076    #[test]
3077    fn test_column_offset_index_truncating_spec_example() {
3078        // write data
3079        // and check the offset index and column index
3080        let page_writer = get_test_page_writer();
3081
3082        // Truncate values at 1 byte
3083        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3084        let props = Arc::new(builder.build());
3085        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3086
3087        let mut data = vec![FixedLenByteArray::default(); 1];
3088        // This is the expected min value
3089        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3090
3091        writer.write_batch(&data, None, None).unwrap();
3092
3093        writer.flush_data_pages().unwrap();
3094
3095        let r = writer.close().unwrap();
3096        let column_index = r.column_index.unwrap();
3097        let offset_index = r.offset_index.unwrap();
3098
3099        assert_eq!(1, r.rows_written);
3100
3101        // column index
3102        assert_eq!(1, column_index.null_pages.len());
3103        assert_eq!(1, offset_index.page_locations.len());
3104        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3105        assert!(!column_index.null_pages[0]);
3106        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
3107
3108        if let Some(stats) = r.metadata.statistics() {
3109            assert_eq!(stats.null_count_opt(), Some(0));
3110            assert_eq!(stats.distinct_count_opt(), None);
3111            if let Statistics::FixedLenByteArray(_stats) = stats {
3112                let column_index_min_value = &column_index.min_values[0];
3113                let column_index_max_value = &column_index.max_values[0];
3114
3115                assert_eq!(column_index_min_value.len(), 1);
3116                assert_eq!(column_index_max_value.len(), 1);
3117
3118                assert_eq!("B".as_bytes(), column_index_min_value.as_slice());
3119                assert_eq!("C".as_bytes(), column_index_max_value.as_slice());
3120
3121                assert_ne!(column_index_min_value, stats.min_bytes_opt().unwrap());
3122                assert_ne!(column_index_max_value, stats.max_bytes_opt().unwrap());
3123            } else {
3124                panic!("expecting Statistics::FixedLenByteArray");
3125            }
3126        } else {
3127            panic!("metadata missing statistics");
3128        }
3129    }
3130
3131    #[test]
3132    fn test_float16_min_max_no_truncation() {
3133        // Even if we set truncation to occur at 1 byte, we should not truncate for Float16
3134        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3135        let props = Arc::new(builder.build());
3136        let page_writer = get_test_page_writer();
3137        let mut writer = get_test_float16_column_writer(page_writer, props);
3138
3139        let expected_value = f16::PI.to_le_bytes().to_vec();
3140        let data = vec![ByteArray::from(expected_value.clone()).into()];
3141        writer.write_batch(&data, None, None).unwrap();
3142        writer.flush_data_pages().unwrap();
3143
3144        let r = writer.close().unwrap();
3145
3146        // stats should still be written
3147        // ensure bytes weren't truncated for column index
3148        let column_index = r.column_index.unwrap();
3149        let column_index_min_bytes = column_index.min_values[0].as_slice();
3150        let column_index_max_bytes = column_index.max_values[0].as_slice();
3151        assert_eq!(expected_value, column_index_min_bytes);
3152        assert_eq!(expected_value, column_index_max_bytes);
3153
3154        // ensure bytes weren't truncated for statistics
3155        let stats = r.metadata.statistics().unwrap();
3156        if let Statistics::FixedLenByteArray(stats) = stats {
3157            let stats_min_bytes = stats.min_bytes_opt().unwrap();
3158            let stats_max_bytes = stats.max_bytes_opt().unwrap();
3159            assert_eq!(expected_value, stats_min_bytes);
3160            assert_eq!(expected_value, stats_max_bytes);
3161        } else {
3162            panic!("expecting Statistics::FixedLenByteArray");
3163        }
3164    }
3165
3166    #[test]
3167    fn test_decimal_min_max_no_truncation() {
3168        // Even if we set truncation to occur at 1 byte, we should not truncate for Decimal
3169        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3170        let props = Arc::new(builder.build());
3171        let page_writer = get_test_page_writer();
3172        let mut writer =
3173            get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3174
3175        let expected_value = vec![
3176            255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
3177            231u8, 90u8, 0u8, 0u8,
3178        ];
3179        let data = vec![ByteArray::from(expected_value.clone()).into()];
3180        writer.write_batch(&data, None, None).unwrap();
3181        writer.flush_data_pages().unwrap();
3182
3183        let r = writer.close().unwrap();
3184
3185        // stats should still be written
3186        // ensure bytes weren't truncated for column index
3187        let column_index = r.column_index.unwrap();
3188        let column_index_min_bytes = column_index.min_values[0].as_slice();
3189        let column_index_max_bytes = column_index.max_values[0].as_slice();
3190        assert_eq!(expected_value, column_index_min_bytes);
3191        assert_eq!(expected_value, column_index_max_bytes);
3192
3193        // ensure bytes weren't truncated for statistics
3194        let stats = r.metadata.statistics().unwrap();
3195        if let Statistics::FixedLenByteArray(stats) = stats {
3196            let stats_min_bytes = stats.min_bytes_opt().unwrap();
3197            let stats_max_bytes = stats.max_bytes_opt().unwrap();
3198            assert_eq!(expected_value, stats_min_bytes);
3199            assert_eq!(expected_value, stats_max_bytes);
3200        } else {
3201            panic!("expecting Statistics::FixedLenByteArray");
3202        }
3203    }
3204
3205    #[test]
3206    fn test_statistics_truncating_byte_array() {
3207        let page_writer = get_test_page_writer();
3208
3209        const TEST_TRUNCATE_LENGTH: usize = 1;
3210
3211        // Truncate values at 1 byte
3212        let builder =
3213            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3214        let props = Arc::new(builder.build());
3215        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3216
3217        let mut data = vec![ByteArray::default(); 1];
3218        // This is the expected min value
3219        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3220
3221        writer.write_batch(&data, None, None).unwrap();
3222
3223        writer.flush_data_pages().unwrap();
3224
3225        let r = writer.close().unwrap();
3226
3227        assert_eq!(1, r.rows_written);
3228
3229        let stats = r.metadata.statistics().expect("statistics");
3230        assert_eq!(stats.null_count_opt(), Some(0));
3231        assert_eq!(stats.distinct_count_opt(), None);
3232        if let Statistics::ByteArray(_stats) = stats {
3233            let min_value = _stats.min_opt().unwrap();
3234            let max_value = _stats.max_opt().unwrap();
3235
3236            assert!(!_stats.min_is_exact());
3237            assert!(!_stats.max_is_exact());
3238
3239            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3240            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3241
3242            assert_eq!("B".as_bytes(), min_value.as_bytes());
3243            assert_eq!("C".as_bytes(), max_value.as_bytes());
3244        } else {
3245            panic!("expecting Statistics::ByteArray");
3246        }
3247    }
3248
3249    #[test]
3250    fn test_statistics_truncating_fixed_len_byte_array() {
3251        let page_writer = get_test_page_writer();
3252
3253        const TEST_TRUNCATE_LENGTH: usize = 1;
3254
3255        // Truncate values at 1 byte
3256        let builder =
3257            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3258        let props = Arc::new(builder.build());
3259        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3260
3261        let mut data = vec![FixedLenByteArray::default(); 1];
3262
3263        const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
3264        const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();
3265
3266        const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]]; // parquet specifies big-endian order for decimals
3267        const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
3268            [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];
3269
3270        // This is the expected min value
3271        data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));
3272
3273        writer.write_batch(&data, None, None).unwrap();
3274
3275        writer.flush_data_pages().unwrap();
3276
3277        let r = writer.close().unwrap();
3278
3279        assert_eq!(1, r.rows_written);
3280
3281        let stats = r.metadata.statistics().expect("statistics");
3282        assert_eq!(stats.null_count_opt(), Some(0));
3283        assert_eq!(stats.distinct_count_opt(), None);
3284        if let Statistics::FixedLenByteArray(_stats) = stats {
3285            let min_value = _stats.min_opt().unwrap();
3286            let max_value = _stats.max_opt().unwrap();
3287
3288            assert!(!_stats.min_is_exact());
3289            assert!(!_stats.max_is_exact());
3290
3291            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3292            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3293
3294            assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
3295            assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());
3296
3297            let reconstructed_min = i128::from_be_bytes([
3298                min_value.as_bytes()[0],
3299                0,
3300                0,
3301                0,
3302                0,
3303                0,
3304                0,
3305                0,
3306                0,
3307                0,
3308                0,
3309                0,
3310                0,
3311                0,
3312                0,
3313                0,
3314            ]);
3315
3316            let reconstructed_max = i128::from_be_bytes([
3317                max_value.as_bytes()[0],
3318                0,
3319                0,
3320                0,
3321                0,
3322                0,
3323                0,
3324                0,
3325                0,
3326                0,
3327                0,
3328                0,
3329                0,
3330                0,
3331                0,
3332                0,
3333            ]);
3334
3335            // check that the inner value is correctly bounded by the min/max
3336            println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
3337            assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
3338            println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
3339            assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
3340        } else {
3341            panic!("expecting Statistics::FixedLenByteArray");
3342        }
3343    }
3344
3345    #[test]
3346    fn test_send() {
3347        fn test<T: Send>() {}
3348        test::<ColumnWriterImpl<Int32Type>>();
3349    }
3350
3351    #[test]
3352    fn test_increment() {
3353        let v = increment(vec![0, 0, 0]).unwrap();
3354        assert_eq!(&v, &[0, 0, 1]);
3355
3356        // Handle overflow
3357        let v = increment(vec![0, 255, 255]).unwrap();
3358        assert_eq!(&v, &[1, 0, 0]);
3359
3360        // Return `None` if all bytes are u8::MAX
3361        let v = increment(vec![255, 255, 255]);
3362        assert!(v.is_none());
3363    }
3364
3365    #[test]
3366    fn test_increment_utf8() {
3367        let test_inc = |o: &str, expected: &str| {
3368            if let Ok(v) = String::from_utf8(increment_utf8(o).unwrap()) {
3369                // Got the expected result...
3370                assert_eq!(v, expected);
3371                // and it's greater than the original string
3372                assert!(*v > *o);
3373                // Also show that BinaryArray level comparison works here
3374                let mut greater = ByteArray::new();
3375                greater.set_data(Bytes::from(v));
3376                let mut original = ByteArray::new();
3377                original.set_data(Bytes::from(o.as_bytes().to_vec()));
3378                assert!(greater > original);
3379            } else {
3380                panic!("Expected incremented UTF8 string to also be valid.");
3381            }
3382        };
3383
3384        // Basic ASCII case
3385        test_inc("hello", "hellp");
3386
3387        // 1-byte ending in max 1-byte
3388        test_inc("a\u{7f}", "b");
3389
3390        // 1-byte max should not truncate as it would need 2-byte code points
3391        assert!(increment_utf8("\u{7f}\u{7f}").is_none());
3392
3393        // UTF8 string
3394        test_inc("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝");
3395
3396        // 2-byte without overflow
3397        test_inc("éééé", "éééê");
3398
3399        // 2-byte that overflows lowest byte
3400        test_inc("\u{ff}\u{ff}", "\u{ff}\u{100}");
3401
3402        // 2-byte ending in max 2-byte
3403        test_inc("a\u{7ff}", "b");
3404
3405        // Max 2-byte should not truncate as it would need 3-byte code points
3406        assert!(increment_utf8("\u{7ff}\u{7ff}").is_none());
3407
3408        // 3-byte without overflow [U+800, U+800] -> [U+800, U+801] (note that these
3409        // characters should render right to left).
3410        test_inc("ࠀࠀ", "ࠀࠁ");
3411
3412        // 3-byte ending in max 3-byte
3413        test_inc("a\u{ffff}", "b");
3414
3415        // Max 3-byte should not truncate as it would need 4-byte code points
3416        assert!(increment_utf8("\u{ffff}\u{ffff}").is_none());
3417
3418        // 4-byte without overflow
3419        test_inc("𐀀𐀀", "𐀀𐀁");
3420
3421        // 4-byte ending in max unicode
3422        test_inc("a\u{10ffff}", "b");
3423
3424        // Max 4-byte should not truncate
3425        assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none());
3426
3427        // Skip over surrogate pair range (0xD800..=0xDFFF)
3428        //test_inc("a\u{D7FF}", "a\u{e000}");
3429        test_inc("a\u{D7FF}", "b");
3430    }
3431
3432    #[test]
3433    fn test_truncate_utf8() {
3434        // No-op
3435        let data = "❤️🧡💛💚💙💜";
3436        let r = truncate_utf8(data, data.len()).unwrap();
3437        assert_eq!(r.len(), data.len());
3438        assert_eq!(&r, data.as_bytes());
3439
3440        // We slice it away from the UTF8 boundary
3441        let r = truncate_utf8(data, 13).unwrap();
3442        assert_eq!(r.len(), 10);
3443        assert_eq!(&r, "❤️🧡".as_bytes());
3444
3445        // One multi-byte code point, and a length shorter than it, so we can't slice it
3446        let r = truncate_utf8("\u{0836}", 1);
3447        assert!(r.is_none());
3448
3449        // Test truncate and increment for max bounds on UTF-8 statistics
3450        // 7-bit (i.e. ASCII)
3451        let r = truncate_and_increment_utf8("yyyyyyyyy", 8).unwrap();
3452        assert_eq!(&r, "yyyyyyyz".as_bytes());
3453
3454        // 2-byte without overflow
3455        let r = truncate_and_increment_utf8("ééééé", 7).unwrap();
3456        assert_eq!(&r, "ééê".as_bytes());
3457
3458        // 2-byte that overflows lowest byte
3459        let r = truncate_and_increment_utf8("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8).unwrap();
3460        assert_eq!(&r, "\u{ff}\u{ff}\u{ff}\u{100}".as_bytes());
3461
3462        // max 2-byte should not truncate as it would need 3-byte code points
3463        let r = truncate_and_increment_utf8("߿߿߿߿߿", 8);
3464        assert!(r.is_none());
3465
3466        // 3-byte without overflow [U+800, U+800, U+800] -> [U+800, U+801] (note that these
3467        // characters should render right to left).
3468        let r = truncate_and_increment_utf8("ࠀࠀࠀࠀ", 8).unwrap();
3469        assert_eq!(&r, "ࠀࠁ".as_bytes());
3470
3471        // max 3-byte should not truncate as it would need 4-byte code points
3472        let r = truncate_and_increment_utf8("\u{ffff}\u{ffff}\u{ffff}", 8);
3473        assert!(r.is_none());
3474
3475        // 4-byte without overflow
3476        let r = truncate_and_increment_utf8("𐀀𐀀𐀀𐀀", 9).unwrap();
3477        assert_eq!(&r, "𐀀𐀁".as_bytes());
3478
3479        // max 4-byte should not truncate
3480        let r = truncate_and_increment_utf8("\u{10ffff}\u{10ffff}", 8);
3481        assert!(r.is_none());
3482    }
3483
3484    #[test]
3485    // Check fallback truncation of statistics that should be UTF-8, but aren't
3486    // (see https://github.com/apache/arrow-rs/pull/6870).
3487    fn test_byte_array_truncate_invalid_utf8_statistics() {
3488        let message_type = "
3489            message test_schema {
3490                OPTIONAL BYTE_ARRAY a (UTF8);
3491            }
3492        ";
3493        let schema = Arc::new(parse_message_type(message_type).unwrap());
3494
3495        // Create Vec<ByteArray> containing non-UTF8 bytes
3496        let data = vec![ByteArray::from(vec![128u8; 32]); 7];
3497        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
3498        let file: File = tempfile::tempfile().unwrap();
3499        let props = Arc::new(
3500            WriterProperties::builder()
3501                .set_statistics_enabled(EnabledStatistics::Chunk)
3502                .set_statistics_truncate_length(Some(8))
3503                .build(),
3504        );
3505
3506        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
3507        let mut row_group_writer = writer.next_row_group().unwrap();
3508
3509        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
3510        col_writer
3511            .typed::<ByteArrayType>()
3512            .write_batch(&data, Some(&def_levels), None)
3513            .unwrap();
3514        col_writer.close().unwrap();
3515        row_group_writer.close().unwrap();
3516        let file_metadata = writer.close().unwrap();
3517        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
3518        let stats = file_metadata.row_groups[0].columns[0]
3519            .meta_data
3520            .as_ref()
3521            .unwrap()
3522            .statistics
3523            .as_ref()
3524            .unwrap();
3525        assert!(!stats.is_max_value_exact.unwrap());
3526        // Truncation of invalid UTF-8 should fall back to binary truncation, so last byte should
3527        // be incremented by 1.
3528        assert_eq!(
3529            stats.max_value,
3530            Some([128, 128, 128, 128, 128, 128, 128, 129].to_vec())
3531        );
3532    }
3533
3534    #[test]
3535    fn test_increment_max_binary_chars() {
3536        let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
3537        assert_eq!(&r.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);
3538
3539        let incremented = increment(vec![0xFF, 0xFF, 0xFF]);
3540        assert!(incremented.is_none())
3541    }
3542
3543    #[test]
3544    fn test_no_column_index_when_stats_disabled() {
3545        // https://github.com/apache/arrow-rs/issues/6010
3546        // Test that column index is not created/written for all-nulls column when page
3547        // statistics are disabled.
3548        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3549        let props = Arc::new(
3550            WriterProperties::builder()
3551                .set_statistics_enabled(EnabledStatistics::None)
3552                .build(),
3553        );
3554        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3555        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3556
3557        let data = Vec::new();
3558        let def_levels = vec![0; 10];
3559        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3560        writer.flush_data_pages().unwrap();
3561
3562        let column_close_result = writer.close().unwrap();
3563        assert!(column_close_result.offset_index.is_some());
3564        assert!(column_close_result.column_index.is_none());
3565    }
3566
3567    #[test]
3568    fn test_no_offset_index_when_disabled() {
3569        // Test that offset indexes can be disabled
3570        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3571        let props = Arc::new(
3572            WriterProperties::builder()
3573                .set_statistics_enabled(EnabledStatistics::None)
3574                .set_offset_index_disabled(true)
3575                .build(),
3576        );
3577        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3578        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3579
3580        let data = Vec::new();
3581        let def_levels = vec![0; 10];
3582        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3583        writer.flush_data_pages().unwrap();
3584
3585        let column_close_result = writer.close().unwrap();
3586        assert!(column_close_result.offset_index.is_none());
3587        assert!(column_close_result.column_index.is_none());
3588    }
3589
3590    #[test]
3591    fn test_offset_index_overridden() {
3592        // Test that offset indexes are not disabled when gathering page statistics
3593        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3594        let props = Arc::new(
3595            WriterProperties::builder()
3596                .set_statistics_enabled(EnabledStatistics::Page)
3597                .set_offset_index_disabled(true)
3598                .build(),
3599        );
3600        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3601        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3602
3603        let data = Vec::new();
3604        let def_levels = vec![0; 10];
3605        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3606        writer.flush_data_pages().unwrap();
3607
3608        let column_close_result = writer.close().unwrap();
3609        assert!(column_close_result.offset_index.is_some());
3610        assert!(column_close_result.column_index.is_some());
3611    }
3612
3613    #[test]
3614    fn test_boundary_order() -> Result<()> {
3615        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3616        // min max both ascending
3617        let column_close_result = write_multiple_pages::<Int32Type>(
3618            &descr,
3619            &[
3620                &[Some(-10), Some(10)],
3621                &[Some(-5), Some(11)],
3622                &[None],
3623                &[Some(-5), Some(11)],
3624            ],
3625        )?;
3626        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3627        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3628
3629        // min max both descending
3630        let column_close_result = write_multiple_pages::<Int32Type>(
3631            &descr,
3632            &[
3633                &[Some(10), Some(11)],
3634                &[Some(5), Some(11)],
3635                &[None],
3636                &[Some(-5), Some(0)],
3637            ],
3638        )?;
3639        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3640        assert_eq!(boundary_order, BoundaryOrder::DESCENDING);
3641
3642        // min max both equal
3643        let column_close_result = write_multiple_pages::<Int32Type>(
3644            &descr,
3645            &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
3646        )?;
3647        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3648        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3649
3650        // only nulls
3651        let column_close_result =
3652            write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
3653        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3654        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3655
3656        // one page
3657        let column_close_result =
3658            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
3659        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3660        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3661
3662        // one non-null page
3663        let column_close_result =
3664            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
3665        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3666        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3667
3668        // min max both unordered
3669        let column_close_result = write_multiple_pages::<Int32Type>(
3670            &descr,
3671            &[
3672                &[Some(10), Some(11)],
3673                &[Some(11), Some(16)],
3674                &[None],
3675                &[Some(-5), Some(0)],
3676            ],
3677        )?;
3678        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3679        assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3680
3681        // min max both ordered in different orders
3682        let column_close_result = write_multiple_pages::<Int32Type>(
3683            &descr,
3684            &[
3685                &[Some(1), Some(9)],
3686                &[Some(2), Some(8)],
3687                &[None],
3688                &[Some(3), Some(7)],
3689            ],
3690        )?;
3691        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3692        assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3693
3694        Ok(())
3695    }
3696
3697    #[test]
3698    fn test_boundary_order_logical_type() -> Result<()> {
3699        // ensure that logical types account for different sort order than underlying
3700        // physical type representation
3701        let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
3702        let fba_descr = {
3703            let tpe = SchemaType::primitive_type_builder(
3704                "col",
3705                FixedLenByteArrayType::get_physical_type(),
3706            )
3707            .with_length(2)
3708            .build()?;
3709            Arc::new(ColumnDescriptor::new(
3710                Arc::new(tpe),
3711                1,
3712                0,
3713                ColumnPath::from("col"),
3714            ))
3715        };
3716
3717        let values: &[&[Option<FixedLenByteArray>]] = &[
3718            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
3719            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
3720            &[Some(FixedLenByteArray::from(ByteArray::from(
3721                f16::NEG_ZERO,
3722            )))],
3723            &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
3724        ];
3725
3726        // f16 descending
3727        let column_close_result =
3728            write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
3729        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3730        assert_eq!(boundary_order, BoundaryOrder::DESCENDING);
3731
3732        // same bytes, but fba unordered
3733        let column_close_result =
3734            write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
3735        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3736        assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3737
3738        Ok(())
3739    }
3740
3741    #[test]
3742    fn test_interval_stats_should_not_have_min_max() {
3743        let input = [
3744            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
3745            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
3746            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
3747        ]
3748        .into_iter()
3749        .map(|s| ByteArray::from(s).into())
3750        .collect::<Vec<_>>();
3751
3752        let page_writer = get_test_page_writer();
3753        let mut writer = get_test_interval_column_writer(page_writer);
3754        writer.write_batch(&input, None, None).unwrap();
3755
3756        let metadata = writer.close().unwrap().metadata;
3757        let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
3758            stats.clone()
3759        } else {
3760            panic!("metadata missing statistics");
3761        };
3762        assert!(stats.min_bytes_opt().is_none());
3763        assert!(stats.max_bytes_opt().is_none());
3764    }
3765
3766    #[test]
3767    #[cfg(feature = "arrow")]
3768    fn test_column_writer_get_estimated_total_bytes() {
3769        let page_writer = get_test_page_writer();
3770        let props = Default::default();
3771        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
3772        assert_eq!(writer.get_estimated_total_bytes(), 0);
3773
3774        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
3775        writer.add_data_page().unwrap();
3776        let size_with_one_page = writer.get_estimated_total_bytes();
3777        assert_eq!(size_with_one_page, 20);
3778
3779        writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
3780        writer.add_data_page().unwrap();
3781        let size_with_two_pages = writer.get_estimated_total_bytes();
3782        // different pages have different compressed lengths
3783        assert_eq!(size_with_two_pages, 20 + 21);
3784    }
3785
3786    fn write_multiple_pages<T: DataType>(
3787        column_descr: &Arc<ColumnDescriptor>,
3788        pages: &[&[Option<T::T>]],
3789    ) -> Result<ColumnCloseResult> {
3790        let column_writer = get_column_writer(
3791            column_descr.clone(),
3792            Default::default(),
3793            get_test_page_writer(),
3794        );
3795        let mut writer = get_typed_column_writer::<T>(column_writer);
3796
3797        for &page in pages {
3798            let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
3799            let def_levels = page
3800                .iter()
3801                .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
3802                .collect::<Vec<_>>();
3803            writer.write_batch(&values, Some(&def_levels), None)?;
3804            writer.flush_data_pages()?;
3805        }
3806
3807        writer.close()
3808    }
3809
3810    /// Performs write-read roundtrip with randomly generated values and levels.
3811    /// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
3812    /// for a column.
3813    fn column_roundtrip_random<T: DataType>(
3814        props: WriterProperties,
3815        max_size: usize,
3816        min_value: T::T,
3817        max_value: T::T,
3818        max_def_level: i16,
3819        max_rep_level: i16,
3820    ) where
3821        T::T: PartialOrd + SampleUniform + Copy,
3822    {
3823        let mut num_values: usize = 0;
3824
3825        let mut buf: Vec<i16> = Vec::new();
3826        let def_levels = if max_def_level > 0 {
3827            random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
3828            for &dl in &buf[..] {
3829                if dl == max_def_level {
3830                    num_values += 1;
3831                }
3832            }
3833            Some(&buf[..])
3834        } else {
3835            num_values = max_size;
3836            None
3837        };
3838
3839        let mut buf: Vec<i16> = Vec::new();
3840        let rep_levels = if max_rep_level > 0 {
3841            random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
3842            buf[0] = 0; // Must start on record boundary
3843            Some(&buf[..])
3844        } else {
3845            None
3846        };
3847
3848        let mut values: Vec<T::T> = Vec::new();
3849        random_numbers_range(num_values, min_value, max_value, &mut values);
3850
3851        column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
3852    }
3853
3854    /// Performs write-read roundtrip and asserts written values and levels.
3855    fn column_roundtrip<T: DataType>(
3856        props: WriterProperties,
3857        values: &[T::T],
3858        def_levels: Option<&[i16]>,
3859        rep_levels: Option<&[i16]>,
3860    ) {
3861        let mut file = tempfile::tempfile().unwrap();
3862        let mut write = TrackedWrite::new(&mut file);
3863        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
3864
3865        let max_def_level = match def_levels {
3866            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3867            None => 0i16,
3868        };
3869
3870        let max_rep_level = match rep_levels {
3871            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3872            None => 0i16,
3873        };
3874
3875        let mut max_batch_size = values.len();
3876        if let Some(levels) = def_levels {
3877            max_batch_size = max_batch_size.max(levels.len());
3878        }
3879        if let Some(levels) = rep_levels {
3880            max_batch_size = max_batch_size.max(levels.len());
3881        }
3882
3883        let mut writer =
3884            get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));
3885
3886        let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
3887        assert_eq!(values_written, values.len());
3888        let result = writer.close().unwrap();
3889
3890        drop(write);
3891
3892        let props = ReaderProperties::builder()
3893            .set_backward_compatible_lz4(false)
3894            .build();
3895        let page_reader = Box::new(
3896            SerializedPageReader::new_with_properties(
3897                Arc::new(file),
3898                &result.metadata,
3899                result.rows_written as usize,
3900                None,
3901                Arc::new(props),
3902            )
3903            .unwrap(),
3904        );
3905        let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);
3906
3907        let mut actual_values = Vec::with_capacity(max_batch_size);
3908        let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
3909        let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));
3910
3911        let (_, values_read, levels_read) = reader
3912            .read_records(
3913                max_batch_size,
3914                actual_def_levels.as_mut(),
3915                actual_rep_levels.as_mut(),
3916                &mut actual_values,
3917            )
3918            .unwrap();
3919
3920        // Assert values, definition and repetition levels.
3921
3922        assert_eq!(&actual_values[..values_read], values);
3923        match actual_def_levels {
3924            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
3925            None => assert_eq!(None, def_levels),
3926        }
3927        match actual_rep_levels {
3928            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
3929            None => assert_eq!(None, rep_levels),
3930        }
3931
3932        // Assert written rows.
3933
3934        if let Some(levels) = actual_rep_levels {
3935            let mut actual_rows_written = 0;
3936            for l in levels {
3937                if l == 0 {
3938                    actual_rows_written += 1;
3939                }
3940            }
3941            assert_eq!(actual_rows_written, result.rows_written);
3942        } else if actual_def_levels.is_some() {
3943            assert_eq!(levels_read as u64, result.rows_written);
3944        } else {
3945            assert_eq!(values_read as u64, result.rows_written);
3946        }
3947    }
3948
3949    /// Performs write of provided values and returns column metadata of those values.
3950    /// Used to test encoding support for column writer.
3951    fn column_write_and_get_metadata<T: DataType>(
3952        props: WriterProperties,
3953        values: &[T::T],
3954    ) -> ColumnChunkMetaData {
3955        let page_writer = get_test_page_writer();
3956        let props = Arc::new(props);
3957        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
3958        writer.write_batch(values, None, None).unwrap();
3959        writer.close().unwrap().metadata
3960    }
3961
3962    // Helper function to more compactly create a PageEncodingStats struct.
3963    fn encoding_stats(page_type: PageType, encoding: Encoding, count: i32) -> PageEncodingStats {
3964        PageEncodingStats {
3965            page_type,
3966            encoding,
3967            count,
3968        }
3969    }
3970
3971    // Function to use in tests for EncodingWriteSupport. This checks that dictionary
3972    // offset and encodings to make sure that column writer uses provided by trait
3973    // encodings.
3974    fn check_encoding_write_support<T: DataType>(
3975        version: WriterVersion,
3976        dict_enabled: bool,
3977        data: &[T::T],
3978        dictionary_page_offset: Option<i64>,
3979        encodings: &[Encoding],
3980        page_encoding_stats: &[PageEncodingStats],
3981    ) {
3982        let props = WriterProperties::builder()
3983            .set_writer_version(version)
3984            .set_dictionary_enabled(dict_enabled)
3985            .build();
3986        let meta = column_write_and_get_metadata::<T>(props, data);
3987        assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
3988        assert_eq!(meta.encodings(), encodings);
3989        assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
3990    }
3991
3992    /// Returns column writer.
3993    fn get_test_column_writer<'a, T: DataType>(
3994        page_writer: Box<dyn PageWriter + 'a>,
3995        max_def_level: i16,
3996        max_rep_level: i16,
3997        props: WriterPropertiesPtr,
3998    ) -> ColumnWriterImpl<'a, T> {
3999        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4000        let column_writer = get_column_writer(descr, props, page_writer);
4001        get_typed_column_writer::<T>(column_writer)
4002    }
4003
4004    /// Returns column reader.
4005    fn get_test_column_reader<T: DataType>(
4006        page_reader: Box<dyn PageReader>,
4007        max_def_level: i16,
4008        max_rep_level: i16,
4009    ) -> ColumnReaderImpl<T> {
4010        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4011        let column_reader = get_column_reader(descr, page_reader);
4012        get_typed_column_reader::<T>(column_reader)
4013    }
4014
4015    /// Returns descriptor for primitive column.
4016    fn get_test_column_descr<T: DataType>(
4017        max_def_level: i16,
4018        max_rep_level: i16,
4019    ) -> ColumnDescriptor {
4020        let path = ColumnPath::from("col");
4021        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4022            // length is set for "encoding support" tests for FIXED_LEN_BYTE_ARRAY type,
4023            // it should be no-op for other types
4024            .with_length(1)
4025            .build()
4026            .unwrap();
4027        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4028    }
4029
4030    /// Returns page writer that collects pages without serializing them.
4031    fn get_test_page_writer() -> Box<dyn PageWriter> {
4032        Box::new(TestPageWriter {})
4033    }
4034
4035    struct TestPageWriter {}
4036
4037    impl PageWriter for TestPageWriter {
4038        fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
4039            let mut res = PageWriteSpec::new();
4040            res.page_type = page.page_type();
4041            res.uncompressed_size = page.uncompressed_size();
4042            res.compressed_size = page.compressed_size();
4043            res.num_values = page.num_values();
4044            res.offset = 0;
4045            res.bytes_written = page.data().len() as u64;
4046            Ok(res)
4047        }
4048
4049        fn close(&mut self) -> Result<()> {
4050            Ok(())
4051        }
4052    }
4053
4054    /// Write data into parquet using [`get_test_page_writer`] and [`get_test_column_writer`] and returns generated statistics.
4055    fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
4056        let page_writer = get_test_page_writer();
4057        let props = Default::default();
4058        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4059        writer.write_batch(values, None, None).unwrap();
4060
4061        let metadata = writer.close().unwrap().metadata;
4062        if let Some(stats) = metadata.statistics() {
4063            stats.clone()
4064        } else {
4065            panic!("metadata missing statistics");
4066        }
4067    }
4068
4069    /// Returns Decimals column writer.
4070    fn get_test_decimals_column_writer<T: DataType>(
4071        page_writer: Box<dyn PageWriter>,
4072        max_def_level: i16,
4073        max_rep_level: i16,
4074        props: WriterPropertiesPtr,
4075    ) -> ColumnWriterImpl<'static, T> {
4076        let descr = Arc::new(get_test_decimals_column_descr::<T>(
4077            max_def_level,
4078            max_rep_level,
4079        ));
4080        let column_writer = get_column_writer(descr, props, page_writer);
4081        get_typed_column_writer::<T>(column_writer)
4082    }
4083
4084    /// Returns descriptor for Decimal type with primitive column.
4085    fn get_test_decimals_column_descr<T: DataType>(
4086        max_def_level: i16,
4087        max_rep_level: i16,
4088    ) -> ColumnDescriptor {
4089        let path = ColumnPath::from("col");
4090        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4091            .with_length(16)
4092            .with_logical_type(Some(LogicalType::Decimal {
4093                scale: 2,
4094                precision: 3,
4095            }))
4096            .with_scale(2)
4097            .with_precision(3)
4098            .build()
4099            .unwrap();
4100        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4101    }
4102
4103    fn float16_statistics_roundtrip(
4104        values: &[FixedLenByteArray],
4105    ) -> ValueStatistics<FixedLenByteArray> {
4106        let page_writer = get_test_page_writer();
4107        let mut writer = get_test_float16_column_writer(page_writer, Default::default());
4108        writer.write_batch(values, None, None).unwrap();
4109
4110        let metadata = writer.close().unwrap().metadata;
4111        if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
4112            stats.clone()
4113        } else {
4114            panic!("metadata missing statistics");
4115        }
4116    }
4117
4118    fn get_test_float16_column_writer(
4119        page_writer: Box<dyn PageWriter>,
4120        props: WriterPropertiesPtr,
4121    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4122        let descr = Arc::new(get_test_float16_column_descr(0, 0));
4123        let column_writer = get_column_writer(descr, props, page_writer);
4124        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4125    }
4126
4127    fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
4128        let path = ColumnPath::from("col");
4129        let tpe =
4130            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4131                .with_length(2)
4132                .with_logical_type(Some(LogicalType::Float16))
4133                .build()
4134                .unwrap();
4135        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4136    }
4137
4138    fn get_test_interval_column_writer(
4139        page_writer: Box<dyn PageWriter>,
4140    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4141        let descr = Arc::new(get_test_interval_column_descr());
4142        let column_writer = get_column_writer(descr, Default::default(), page_writer);
4143        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4144    }
4145
4146    fn get_test_interval_column_descr() -> ColumnDescriptor {
4147        let path = ColumnPath::from("col");
4148        let tpe =
4149            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4150                .with_length(12)
4151                .with_converted_type(ConvertedType::INTERVAL)
4152                .build()
4153                .unwrap();
4154        ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
4155    }
4156
4157    /// Returns column writer for UINT32 Column provided as ConvertedType only
4158    fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
4159        page_writer: Box<dyn PageWriter + 'a>,
4160        max_def_level: i16,
4161        max_rep_level: i16,
4162        props: WriterPropertiesPtr,
4163    ) -> ColumnWriterImpl<'a, T> {
4164        let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
4165            max_def_level,
4166            max_rep_level,
4167        ));
4168        let column_writer = get_column_writer(descr, props, page_writer);
4169        get_typed_column_writer::<T>(column_writer)
4170    }
4171
4172    /// Returns column descriptor for UINT32 Column provided as ConvertedType only
4173    fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
4174        max_def_level: i16,
4175        max_rep_level: i16,
4176    ) -> ColumnDescriptor {
4177        let path = ColumnPath::from("col");
4178        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4179            .with_converted_type(ConvertedType::UINT_32)
4180            .build()
4181            .unwrap();
4182        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4183    }
4184}