// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Contains column writer API.
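//!
//! # Example
//!
//! A minimal sketch of driving a column writer by hand, assuming `descr`,
//! `props` and `page_writer` have already been constructed (in practice they
//! are usually supplied by the file writer in `crate::file::writer`):
//!
//! ```ignore
//! use parquet::column::writer::{get_column_writer, get_typed_column_writer};
//! use parquet::data_type::Int32Type;
//!
//! // Obtain a writer for the column's physical type and downcast it
//! let col_writer = get_column_writer(descr, props, page_writer);
//! let mut typed = get_typed_column_writer::<Int32Type>(col_writer);
//!
//! // Write a batch of values for a required INT32 column (no levels needed)
//! typed.write_batch(&[1, 2, 3], None, None)?;
//!
//! // Closing the writer flushes remaining pages and returns chunk metadata
//! let result = typed.close()?;
//! assert_eq!(result.rows_written, 3);
//! ```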

use bytes::Bytes;
use half::f16;

use crate::bloom_filter::Sbbf;
use crate::format::{BoundaryOrder, ColumnIndex, OffsetIndex};
use std::collections::{BTreeSet, VecDeque};
use std::str;

use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
use crate::data_type::private::ParquetValueType;
use crate::data_type::*;
use crate::encodings::levels::LevelEncoder;
#[cfg(feature = "encryption")]
use crate::encryption::encrypt::get_column_crypto_metadata;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{
    ColumnChunkMetaData, ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram,
    OffsetIndexBuilder,
};
use crate::file::page_encoding_stats::PageEncodingStats;
use crate::file::properties::{
    EnabledStatistics, WriterProperties, WriterPropertiesPtr, WriterVersion,
};
use crate::file::statistics::{Statistics, ValueStatistics};
use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};

pub(crate) mod encoder;

macro_rules! downcast_writer {
    ($e:expr, $i:ident, $b:expr) => {
        match $e {
            Self::BoolColumnWriter($i) => $b,
            Self::Int32ColumnWriter($i) => $b,
            Self::Int64ColumnWriter($i) => $b,
            Self::Int96ColumnWriter($i) => $b,
            Self::FloatColumnWriter($i) => $b,
            Self::DoubleColumnWriter($i) => $b,
            Self::ByteArrayColumnWriter($i) => $b,
            Self::FixedLenByteArrayColumnWriter($i) => $b,
        }
    };
}

/// Column writer for a Parquet type.
pub enum ColumnWriter<'a> {
    /// Column writer for boolean type
    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
    /// Column writer for int32 type
    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
    /// Column writer for int64 type
    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
    /// Column writer for int96 (timestamp) type
    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
    /// Column writer for float type
    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
    /// Column writer for double type
    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
    /// Column writer for byte array type
    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
    /// Column writer for fixed length byte array type
    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
}

impl ColumnWriter<'_> {
    /// Returns the estimated total memory usage
    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        downcast_writer!(self, typed, typed.memory_size())
    }

    /// Returns the estimated total encoded bytes for this column writer
    #[cfg(feature = "arrow")]
    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
        downcast_writer!(self, typed, typed.get_estimated_total_bytes())
    }

    /// Close this [`ColumnWriter`]
    pub fn close(self) -> Result<ColumnCloseResult> {
        downcast_writer!(self, typed, typed.close())
    }
}

#[deprecated(
    since = "54.0.0",
    note = "Seems like a stray and nobody knows what it's for. Will be removed in the next release."
)]
#[allow(missing_docs)]
pub enum Level {
    Page,
    Column,
}

/// Gets a specific column writer corresponding to column descriptor `descr`.
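///
/// # Example
///
/// A sketch, assuming `descr`, `props` and `page_writer` have already been
/// constructed (e.g. by the enclosing file writer):
///
/// ```ignore
/// // Dispatches on the column's physical type
/// let col_writer = get_column_writer(descr, props, page_writer);
/// match col_writer {
///     ColumnWriter::Int32ColumnWriter(_) => { /* INT32 column */ }
///     _ => { /* other physical types */ }
/// }
/// ```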
pub fn get_column_writer<'a>(
    descr: ColumnDescPtr,
    props: WriterPropertiesPtr,
    page_writer: Box<dyn PageWriter + 'a>,
) -> ColumnWriter<'a> {
    match descr.physical_type() {
        Type::BOOLEAN => {
            ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::INT32 => {
            ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::INT64 => {
            ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::INT96 => {
            ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::FLOAT => {
            ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::DOUBLE => {
            ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::BYTE_ARRAY => {
            ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
            ColumnWriterImpl::new(descr, props, page_writer),
        ),
    }
}

/// Gets a typed column writer for the specific type `T`, by "up-casting" `col_writer` of
/// non-generic type to a generic column writer type `ColumnWriterImpl`.
///
/// Panics if actual enum value for `col_writer` does not match the type `T`.
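///
/// # Example
///
/// A sketch, assuming `col_writer` wraps an INT32 column (any other physical
/// type would panic here):
///
/// ```ignore
/// let mut typed = get_typed_column_writer::<Int32Type>(col_writer);
/// typed.write_batch(&[1, 2, 3], None, None)?;
/// ```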
pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
    T::get_column_writer(col_writer).unwrap_or_else(|| {
        panic!(
            "Failed to convert column writer into a typed column writer for `{}` type",
            T::get_physical_type()
        )
    })
}

/// Similar to `get_typed_column_writer` but returns a reference.
pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
    col_writer: &'b ColumnWriter<'a>,
) -> &'b ColumnWriterImpl<'a, T> {
    T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
        panic!(
            "Failed to convert column writer into a typed column writer for `{}` type",
            T::get_physical_type()
        )
    })
}

/// Similar to `get_typed_column_writer` but returns a mutable reference.
pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
    col_writer: &'a mut ColumnWriter<'b>,
) -> &'a mut ColumnWriterImpl<'b, T> {
    T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
        panic!(
            "Failed to convert column writer into a typed column writer for `{}` type",
            T::get_physical_type()
        )
    })
}

/// Metadata returned by [`GenericColumnWriter::close`]
#[derive(Debug, Clone)]
pub struct ColumnCloseResult {
    /// The total number of bytes written
    pub bytes_written: u64,
    /// The total number of rows written
    pub rows_written: u64,
    /// Metadata for this column chunk
    pub metadata: ColumnChunkMetaData,
    /// Optional bloom filter for this column
    pub bloom_filter: Option<Sbbf>,
    /// Optional column index, for filtering
    pub column_index: Option<ColumnIndex>,
    /// Optional offset index, identifying page locations
    pub offset_index: Option<OffsetIndex>,
}

// Metrics per page
#[derive(Default)]
struct PageMetrics {
    num_buffered_values: u32,
    num_buffered_rows: u32,
    num_page_nulls: u64,
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
}

impl PageMetrics {
    fn new() -> Self {
        Default::default()
    }

    /// Initialize the repetition level histogram
    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
        self
    }

    /// Initialize the definition level histogram
    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
        self.definition_level_histogram = LevelHistogram::try_new(max_level);
        self
    }

    /// Resets the state of this `PageMetrics` to the initial state.
    /// If histograms have been initialized their contents will be reset to zero.
    fn new_page(&mut self) {
        self.num_buffered_values = 0;
        self.num_buffered_rows = 0;
        self.num_page_nulls = 0;
        self.repetition_level_histogram
            .as_mut()
            .map(LevelHistogram::reset);
        self.definition_level_histogram
            .as_mut()
            .map(LevelHistogram::reset);
    }

    /// Updates histogram values using provided repetition levels
    fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
        if let Some(ref mut rep_hist) = self.repetition_level_histogram {
            rep_hist.update_from_levels(levels);
        }
    }

    /// Updates histogram values using provided definition levels
    fn update_definition_level_histogram(&mut self, levels: &[i16]) {
        if let Some(ref mut def_hist) = self.definition_level_histogram {
            def_hist.update_from_levels(levels);
        }
    }
}

// Metrics per column writer
#[derive(Default)]
struct ColumnMetrics<T: Default> {
    total_bytes_written: u64,
    total_rows_written: u64,
    total_uncompressed_size: u64,
    total_compressed_size: u64,
    total_num_values: u64,
    dictionary_page_offset: Option<u64>,
    data_page_offset: Option<u64>,
    min_column_value: Option<T>,
    max_column_value: Option<T>,
    num_column_nulls: u64,
    column_distinct_count: Option<u64>,
    variable_length_bytes: Option<i64>,
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
}

impl<T: Default> ColumnMetrics<T> {
    fn new() -> Self {
        Default::default()
    }

    /// Initialize the repetition level histogram
    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
        self
    }

    /// Initialize the definition level histogram
    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
        self.definition_level_histogram = LevelHistogram::try_new(max_level);
        self
    }

    /// Sum `page_histogram` into `chunk_histogram`
    fn update_histogram(
        chunk_histogram: &mut Option<LevelHistogram>,
        page_histogram: &Option<LevelHistogram>,
    ) {
        if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
            chunk_hist.add(page_hist);
        }
    }

    /// Sum the provided PageMetrics histograms into the chunk histograms. Does nothing if
    /// page histograms are not initialized.
    fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
        ColumnMetrics::<T>::update_histogram(
            &mut self.definition_level_histogram,
            &page_metrics.definition_level_histogram,
        );
        ColumnMetrics::<T>::update_histogram(
            &mut self.repetition_level_histogram,
            &page_metrics.repetition_level_histogram,
        );
    }

    /// Sum the provided page variable_length_bytes into the chunk variable_length_bytes
    fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
        if let Some(var_bytes) = variable_length_bytes {
            *self.variable_length_bytes.get_or_insert(0) += var_bytes;
        }
    }
}

/// Typed column writer for a primitive column.
pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;

/// Generic column writer for a primitive column.
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
    // Column writer properties
    descr: ColumnDescPtr,
    props: WriterPropertiesPtr,
    statistics_enabled: EnabledStatistics,

    page_writer: Box<dyn PageWriter + 'a>,
    codec: Compression,
    compressor: Option<Box<dyn Codec>>,
    encoder: E,

    page_metrics: PageMetrics,
    // Metrics per column writer
    column_metrics: ColumnMetrics<E::T>,

    /// The order of encodings within the generated metadata does not impact its meaning,
    /// but we use a BTreeSet so that the output is deterministic
    encodings: BTreeSet<Encoding>,
    encoding_stats: Vec<PageEncodingStats>,
    // Reused buffers
    def_levels_sink: Vec<i16>,
    rep_levels_sink: Vec<i16>,
    data_pages: VecDeque<CompressedPage>,
    // column index and offset index
    column_index_builder: ColumnIndexBuilder,
    offset_index_builder: Option<OffsetIndexBuilder>,

    // The fields below are used to incrementally check boundary order across data pages.
    // We assume ascending/descending order until proven wrong.
    data_page_boundary_ascending: bool,
    data_page_boundary_descending: bool,
    /// (min, max)
    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}

impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
    /// Returns a new instance of [`GenericColumnWriter`].
    pub fn new(
        descr: ColumnDescPtr,
        props: WriterPropertiesPtr,
        page_writer: Box<dyn PageWriter + 'a>,
    ) -> Self {
        let codec = props.compression(descr.path());
        let codec_options = CodecOptionsBuilder::default().build();
        let compressor = create_codec(codec, &codec_options).unwrap();
        let encoder = E::try_new(&descr, props.as_ref()).unwrap();

        let statistics_enabled = props.statistics_enabled(descr.path());

        let mut encodings = BTreeSet::new();
        // Used for level information
        encodings.insert(Encoding::RLE);

        let mut page_metrics = PageMetrics::new();
        let mut column_metrics = ColumnMetrics::<E::T>::new();

        // Initialize level histograms if collecting page or chunk statistics
        if statistics_enabled != EnabledStatistics::None {
            page_metrics = page_metrics
                .with_repetition_level_histogram(descr.max_rep_level())
                .with_definition_level_histogram(descr.max_def_level());
            column_metrics = column_metrics
                .with_repetition_level_histogram(descr.max_rep_level())
                .with_definition_level_histogram(descr.max_def_level())
        }

        // Disable column_index_builder if not collecting page statistics.
        let mut column_index_builder = ColumnIndexBuilder::new();
        if statistics_enabled != EnabledStatistics::Page {
            column_index_builder.to_invalid()
        }

        // Disable offset_index_builder if requested by user.
        let offset_index_builder = match props.offset_index_disabled() {
            false => Some(OffsetIndexBuilder::new()),
            _ => None,
        };

        Self {
            descr,
            props,
            statistics_enabled,
            page_writer,
            codec,
            compressor,
            encoder,
            def_levels_sink: vec![],
            rep_levels_sink: vec![],
            data_pages: VecDeque::new(),
            page_metrics,
            column_metrics,
            column_index_builder,
            offset_index_builder,
            encodings,
            encoding_stats: vec![],
            data_page_boundary_ascending: true,
            data_page_boundary_descending: true,
            last_non_null_data_page_min_max: None,
        }
    }

    #[allow(clippy::too_many_arguments)]
    pub(crate) fn write_batch_internal(
        &mut self,
        values: &E::Values,
        value_indices: Option<&[usize]>,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        min: Option<&E::T>,
        max: Option<&E::T>,
        distinct_count: Option<u64>,
    ) -> Result<usize> {
        // Check if number of definition levels is the same as number of repetition levels.
        if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
            if def.len() != rep.len() {
                return Err(general_err!(
                    "Inconsistent length of definition and repetition levels: {} != {}",
                    def.len(),
                    rep.len()
                ));
            }
        }

        // We check for DataPage limits only after we have inserted the values. If a user
        // writes a large number of values, the DataPage size can be well above the limit.
        //
        // The purpose of this chunking is to bound this. Even if a user writes a large
        // number of values, the chunking ensures that we add data pages at a
        // reasonable page size limit.

        // TODO: find out why we don't account for size of levels when we estimate page
        // size.

        let num_levels = match def_levels {
            Some(def_levels) => def_levels.len(),
            None => values.len(),
        };

        if let Some(min) = min {
            update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
        }
        if let Some(max) = max {
            update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
        }

        // We can only set the distinct count if there are no other writes
        if self.encoder.num_values() == 0 {
            self.column_metrics.column_distinct_count = distinct_count;
        } else {
            self.column_metrics.column_distinct_count = None;
        }

        let mut values_offset = 0;
        let mut levels_offset = 0;
        let base_batch_size = self.props.write_batch_size();
        while levels_offset < num_levels {
            let mut end_offset = num_levels.min(levels_offset + base_batch_size);

            // Split at record boundary
            if let Some(r) = rep_levels {
                while end_offset < r.len() && r[end_offset] != 0 {
                    end_offset += 1;
                }
            }

            values_offset += self.write_mini_batch(
                values,
                values_offset,
                value_indices,
                end_offset - levels_offset,
                def_levels.map(|lv| &lv[levels_offset..end_offset]),
                rep_levels.map(|lv| &lv[levels_offset..end_offset]),
            )?;
            levels_offset = end_offset;
        }

        // Return total number of values processed.
        Ok(values_offset)
    }

    /// Writes batch of values, definition levels and repetition levels.
    /// Returns number of values processed (written).
    ///
    /// If definition and repetition levels are provided, the levels are written in
    /// full and determine how many values are written (this number is returned),
    /// since the number of actually written values may be smaller than the number
    /// of provided values.
    ///
    /// If only values are provided, then all values are written and the length
    /// of the values buffer is returned.
    ///
    /// Definition and/or repetition levels can be omitted if values are
    /// non-nullable and/or non-repeated.
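    ///
    /// # Example
    ///
    /// A sketch for a nullable (max definition level 1) INT32 column, where
    /// definition level 1 marks a present value and 0 marks a null (assumes
    /// `writer` is a `ColumnWriterImpl<Int32Type>` for such a column):
    ///
    /// ```ignore
    /// // Three logical rows: 10, NULL, 20 -- only non-null values are passed
    /// let written = writer.write_batch(&[10, 20], Some(&[1, 0, 1]), None)?;
    /// assert_eq!(written, 2); // values (not levels) consumed
    /// ```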
    pub fn write_batch(
        &mut self,
        values: &E::Values,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) -> Result<usize> {
        self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
    }

    /// Like [`Self::write_batch`], but the caller may provide pre-calculated statistics
    /// for use when computing chunk-level statistics.
    ///
    /// NB: [`WriterProperties::statistics_enabled`] must be set to [`EnabledStatistics::Chunk`]
    /// for these statistics to take effect. If [`EnabledStatistics::None`] they will be ignored,
    /// and if [`EnabledStatistics::Page`] the chunk statistics will instead be derived from the
    /// computed page statistics.
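    ///
    /// # Example
    ///
    /// A sketch, assuming `writer` is a `ColumnWriterImpl<Int32Type>` and chunk
    /// statistics are enabled in the writer properties:
    ///
    /// ```ignore
    /// // min = 1, max = 3, distinct count = 3, all pre-computed by the caller
    /// writer.write_batch_with_statistics(&[3, 1, 2], None, None, Some(&1), Some(&3), Some(3))?;
    /// ```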
    pub fn write_batch_with_statistics(
        &mut self,
        values: &E::Values,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        min: Option<&E::T>,
        max: Option<&E::T>,
        distinct_count: Option<u64>,
    ) -> Result<usize> {
        self.write_batch_internal(
            values,
            None,
            def_levels,
            rep_levels,
            min,
            max,
            distinct_count,
        )
    }

    /// Returns the estimated total memory usage.
    ///
    /// Unlike [`Self::get_estimated_total_bytes`] this is an estimate
    /// of the current memory usage and not the final anticipated encoded size.
    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
    }

    /// Returns total number of bytes written by this column writer so far.
    /// This value is also returned when column writer is closed.
    ///
    /// Note: this value does not include any buffered data that has not
    /// yet been flushed to a page.
    pub fn get_total_bytes_written(&self) -> u64 {
        self.column_metrics.total_bytes_written
    }

    /// Returns the estimated total encoded bytes for this column writer.
    ///
    /// Unlike [`Self::get_total_bytes_written`] this includes an estimate
    /// of any data that has not yet been flushed to a page, based on its
    /// anticipated encoded size.
    #[cfg(feature = "arrow")]
    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
        self.data_pages
            .iter()
            .map(|page| page.data().len() as u64)
            .sum::<u64>()
            + self.column_metrics.total_bytes_written
            + self.encoder.estimated_data_page_size() as u64
            + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
    }

    /// Returns total number of rows written by this column writer so far.
    /// This value is also returned when column writer is closed.
    pub fn get_total_rows_written(&self) -> u64 {
        self.column_metrics.total_rows_written
    }

    /// Returns a reference to a [`ColumnDescPtr`]
    pub fn get_descriptor(&self) -> &ColumnDescPtr {
        &self.descr
    }

    /// Finalizes writes and closes the column writer.
    /// Returns total bytes written, total rows written and column chunk metadata.
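    ///
    /// # Example
    ///
    /// A sketch of consuming the writer and inspecting the result (assumes
    /// values were written with `write_batch` beforehand):
    ///
    /// ```ignore
    /// let result = writer.close()?;
    /// println!("{} rows, {} bytes", result.rows_written, result.bytes_written);
    /// ```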
    pub fn close(mut self) -> Result<ColumnCloseResult> {
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }
        if self.encoder.has_dictionary() {
            self.write_dictionary_page()?;
        }
        self.flush_data_pages()?;
        let metadata = self.build_column_metadata()?;
        self.page_writer.close()?;

        let boundary_order = match (
            self.data_page_boundary_ascending,
            self.data_page_boundary_descending,
        ) {
            // If all pages have equal min/max (also the case if all pages are
            // null pages) the order is marked as ascending
            (true, _) => BoundaryOrder::ASCENDING,
            (false, true) => BoundaryOrder::DESCENDING,
            (false, false) => BoundaryOrder::UNORDERED,
        };
        self.column_index_builder.set_boundary_order(boundary_order);

        let column_index = self
            .column_index_builder
            .valid()
            .then(|| self.column_index_builder.build_to_thrift());

        let offset_index = self.offset_index_builder.map(|b| b.build_to_thrift());

        Ok(ColumnCloseResult {
            bytes_written: self.column_metrics.total_bytes_written,
            rows_written: self.column_metrics.total_rows_written,
            bloom_filter: self.encoder.flush_bloom_filter(),
            metadata,
            column_index,
            offset_index,
        })
    }

    /// Writes a mini batch of values, definition and repetition levels.
    /// This allows fine-grained processing of values while maintaining a reasonable
    /// page size.
    fn write_mini_batch(
        &mut self,
        values: &E::Values,
        values_offset: usize,
        value_indices: Option<&[usize]>,
        num_levels: usize,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) -> Result<usize> {
        // Process definition levels and determine how many values to write.
        let values_to_write = if self.descr.max_def_level() > 0 {
            let levels = def_levels.ok_or_else(|| {
                general_err!(
                    "Definition levels are required, because max definition level = {}",
                    self.descr.max_def_level()
                )
            })?;

            let mut values_to_write = 0;
            for &level in levels {
                if level == self.descr.max_def_level() {
                    values_to_write += 1;
                } else {
                    // We must always compute this as it is used to populate v2 pages
                    self.page_metrics.num_page_nulls += 1
                }
            }

            // Update histogram
            self.page_metrics.update_definition_level_histogram(levels);

            self.def_levels_sink.extend_from_slice(levels);
            values_to_write
        } else {
            num_levels
        };

        // Process repetition levels and determine how many rows we are about to process.
        if self.descr.max_rep_level() > 0 {
            // A row could contain more than one value.
            let levels = rep_levels.ok_or_else(|| {
                general_err!(
                    "Repetition levels are required, because max repetition level = {}",
                    self.descr.max_rep_level()
                )
            })?;

            if !levels.is_empty() && levels[0] != 0 {
                return Err(general_err!(
                    "Write must start at a record boundary, got non-zero repetition level of {}",
                    levels[0]
                ));
            }

            // Count the occasions where we start a new row
            for &level in levels {
                self.page_metrics.num_buffered_rows += (level == 0) as u32
            }

            // Update histogram
            self.page_metrics.update_repetition_level_histogram(levels);

            self.rep_levels_sink.extend_from_slice(levels);
        } else {
            // Each value is exactly one row, so the row count equals the
            // number of values; nulls are counted as well.
            self.page_metrics.num_buffered_rows += num_levels as u32;
        }

        match value_indices {
            Some(indices) => {
                let indices = &indices[values_offset..values_offset + values_to_write];
                self.encoder.write_gather(values, indices)?;
            }
            None => self.encoder.write(values, values_offset, values_to_write)?,
        }

        self.page_metrics.num_buffered_values += num_levels as u32;

        if self.should_add_data_page() {
            self.add_data_page()?;
        }

        if self.should_dict_fallback() {
            self.dict_fallback()?;
        }

        Ok(values_to_write)
    }

    /// Returns true if we need to fall back to non-dictionary encoding.
    ///
    /// We can only fall back if dictionary encoder is set and we have exceeded dictionary
    /// size.
    #[inline]
    fn should_dict_fallback(&self) -> bool {
        match self.encoder.estimated_dict_page_size() {
            Some(size) => size >= self.props.dictionary_page_size_limit(),
            None => false,
        }
    }

    /// Returns true if there is enough data for a data page, false otherwise.
    #[inline]
    fn should_add_data_page(&self) -> bool {
        // This is necessary in the event of a much larger dictionary size than page size
        //
        // In such a scenario the dictionary encoder may return an estimated encoded
        // size in excess of the page size limit, even when there are no buffered values
        if self.page_metrics.num_buffered_values == 0 {
            return false;
        }

        self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
            || self.encoder.estimated_data_page_size() >= self.props.data_page_size_limit()
    }

    /// Performs dictionary fallback.
    /// Prepares and writes dictionary and all data pages into page writer.
    fn dict_fallback(&mut self) -> Result<()> {
        // At this point we know that we need to fall back.
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }
        self.write_dictionary_page()?;
        self.flush_data_pages()?;
        Ok(())
    }

    /// Update the column index and offset index when adding the data page
    fn update_column_offset_index(
        &mut self,
        page_statistics: Option<&ValueStatistics<E::T>>,
        page_variable_length_bytes: Option<i64>,
    ) {
        // update the column index
        let null_page =
            (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
        // If a page contains only null values, writers must set the corresponding
        // entries in min_values and max_values to a zero-length byte array
        if null_page && self.column_index_builder.valid() {
            self.column_index_builder.append(
                null_page,
                vec![],
                vec![],
                self.page_metrics.num_page_nulls as i64,
            );
        } else if self.column_index_builder.valid() {
            // Compute the entries from page statistics.
            // If the page statistics cannot be obtained, invalidate the column index for this column chunk.
            match &page_statistics {
                None => {
                    self.column_index_builder.to_invalid();
                }
                Some(stat) => {
                    // Check if min/max are still ascending/descending across pages
                    let new_min = stat.min_opt().unwrap();
                    let new_max = stat.max_opt().unwrap();
                    if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
                        if self.data_page_boundary_ascending {
                            // If last min/max are greater than new min/max then not ascending anymore
                            let not_ascending = compare_greater(&self.descr, last_min, new_min)
                                || compare_greater(&self.descr, last_max, new_max);
                            if not_ascending {
                                self.data_page_boundary_ascending = false;
                            }
                        }

                        if self.data_page_boundary_descending {
                            // If new min/max are greater than last min/max then not descending anymore
                            let not_descending = compare_greater(&self.descr, new_min, last_min)
                                || compare_greater(&self.descr, new_max, last_max);
                            if not_descending {
                                self.data_page_boundary_descending = false;
                            }
                        }
                    }
                    self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));

                    if self.can_truncate_value() {
                        self.column_index_builder.append(
                            null_page,
                            self.truncate_min_value(
                                self.props.column_index_truncate_length(),
                                stat.min_bytes_opt().unwrap(),
                            )
                            .0,
                            self.truncate_max_value(
                                self.props.column_index_truncate_length(),
                                stat.max_bytes_opt().unwrap(),
                            )
                            .0,
                            self.page_metrics.num_page_nulls as i64,
                        );
                    } else {
                        self.column_index_builder.append(
                            null_page,
                            stat.min_bytes_opt().unwrap().to_vec(),
                            stat.max_bytes_opt().unwrap().to_vec(),
                            self.page_metrics.num_page_nulls as i64,
                        );
                    }
                }
            }
        }

        // Append page histograms to the `ColumnIndex` histograms
        self.column_index_builder.append_histograms(
            &self.page_metrics.repetition_level_histogram,
            &self.page_metrics.definition_level_histogram,
        );

        // Update the offset index
        if let Some(builder) = self.offset_index_builder.as_mut() {
            builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
            builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
        }
    }

    /// Determine if we should allow truncating min/max values for this column's statistics
    fn can_truncate_value(&self) -> bool {
        match self.descr.physical_type() {
            // Don't truncate for Float16 and Decimal because their sort order differs
            // from the byte-wise sort order of FIXED_LEN_BYTE_ARRAY, so truncation of
            // those types could lead to inaccurate min/max statistics
            Type::FIXED_LEN_BYTE_ARRAY
                if !matches!(
                    self.descr.logical_type(),
                    Some(LogicalType::Decimal { .. }) | Some(LogicalType::Float16)
                ) =>
            {
                true
            }
            Type::BYTE_ARRAY => true,
            // Truncation only applies for fba/binary physical types
            _ => false,
        }
    }

    /// Returns `true` if this column's logical type is a UTF-8 string.
    fn is_utf8(&self) -> bool {
        self.get_descriptor().logical_type() == Some(LogicalType::String)
            || self.get_descriptor().converted_type() == ConvertedType::UTF8
    }

    /// Truncates a binary statistic to at most `truncation_length` bytes.
    ///
    /// If truncation is not possible, returns `data`.
    ///
    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
    ///
    /// UTF-8 Note:
    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
    /// also remain valid UTF-8, but may be less than `truncation_length` bytes to avoid splitting
    /// on non-character boundaries.
    fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
        truncation_length
            .filter(|l| data.len() > *l)
            .and_then(|l|
                // don't do extra work if this column isn't UTF-8
                if self.is_utf8() {
                    match str::from_utf8(data) {
                        Ok(str_data) => truncate_utf8(str_data, l),
                        Err(_) => Some(data[..l].to_vec()),
                    }
                } else {
                    Some(data[..l].to_vec())
                }
            )
            .map(|truncated| (truncated, true))
            .unwrap_or_else(|| (data.to_vec(), false))
    }

    /// Truncates a binary statistic to at most `truncation_length` bytes, and then increments the
    /// final byte(s) to yield a valid upper bound. The result may be shorter than
    /// `truncation_length` bytes if the last byte(s) overflows.
    ///
    /// If truncation is not possible, returns `data`.
    ///
    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
    ///
    /// UTF-8 Note:
    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
    /// also remain valid UTF-8 (but again may be less than `truncation_length` bytes). If `data`
    /// does not contain valid UTF-8, then truncation will occur as if the column is non-string
    /// binary.
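    ///
    /// For example (a sketch): truncating `b"abcdef"` to 3 bytes keeps the prefix
    /// `b"abc"` and increments its final byte, yielding `b"abd"`, which is a
    /// valid upper bound for the original value.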
    fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
        truncation_length
            .filter(|l| data.len() > *l)
            .and_then(|l|
                // don't do extra work if this column isn't UTF-8
                if self.is_utf8() {
                    match str::from_utf8(data) {
                        Ok(str_data) => truncate_and_increment_utf8(str_data, l),
                        Err(_) => increment(data[..l].to_vec()),
                    }
                } else {
                    increment(data[..l].to_vec())
                }
            )
            .map(|truncated| (truncated, true))
            .unwrap_or_else(|| (data.to_vec(), false))
    }

    /// Truncate the min and max values that will be written to a data page
    /// header or column chunk Statistics
    fn truncate_statistics(&self, statistics: Statistics) -> Statistics {
        let backwards_compatible_min_max = self.descr.sort_order().is_signed();
        match statistics {
            Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
                let (min, did_truncate_min) = self.truncate_min_value(
                    self.props.statistics_truncate_length(),
                    stats.min_bytes_opt().unwrap(),
                );
                let (max, did_truncate_max) = self.truncate_max_value(
                    self.props.statistics_truncate_length(),
                    stats.max_bytes_opt().unwrap(),
                );
                Statistics::ByteArray(
                    ValueStatistics::new(
                        Some(min.into()),
                        Some(max.into()),
                        stats.distinct_count(),
                        stats.null_count_opt(),
                        backwards_compatible_min_max,
                    )
                    .with_max_is_exact(!did_truncate_max)
                    .with_min_is_exact(!did_truncate_min),
                )
            }
            Statistics::FixedLenByteArray(stats)
                if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
            {
                let (min, did_truncate_min) = self.truncate_min_value(
                    self.props.statistics_truncate_length(),
                    stats.min_bytes_opt().unwrap(),
                );
                let (max, did_truncate_max) = self.truncate_max_value(
                    self.props.statistics_truncate_length(),
                    stats.max_bytes_opt().unwrap(),
                );
                Statistics::FixedLenByteArray(
                    ValueStatistics::new(
                        Some(min.into()),
                        Some(max.into()),
                        stats.distinct_count(),
                        stats.null_count_opt(),
                        backwards_compatible_min_max,
                    )
                    .with_max_is_exact(!did_truncate_max)
                    .with_min_is_exact(!did_truncate_min),
                )
            }
            stats => stats,
        }
    }

    /// Adds data page.
    /// Data page is either buffered in case of dictionary encoding or written directly.
    fn add_data_page(&mut self) -> Result<()> {
        // Extract encoded values
        let values_data = self.encoder.flush_data_page()?;

        let max_def_level = self.descr.max_def_level();
        let max_rep_level = self.descr.max_rep_level();

        self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;

        let page_statistics = match (values_data.min_value, values_data.max_value) {
            (Some(min), Some(max)) => {
                // Update chunk level statistics
                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

                (self.statistics_enabled == EnabledStatistics::Page).then_some(
                    ValueStatistics::new(
                        Some(min),
                        Some(max),
                        None,
                        Some(self.page_metrics.num_page_nulls),
                        false,
                    ),
                )
            }
            _ => None,
        };

        // update column and offset index
        self.update_column_offset_index(
            page_statistics.as_ref(),
            values_data.variable_length_bytes,
        );

        // Update histograms and variable_length_bytes in column_metrics
        self.column_metrics
            .update_from_page_metrics(&self.page_metrics);
        self.column_metrics
            .update_variable_length_bytes(values_data.variable_length_bytes);

        let page_statistics = page_statistics.map(Statistics::from);
        let page_statistics = page_statistics.map(|stats| self.truncate_statistics(stats));

        let compressed_page = match self.props.writer_version() {
            WriterVersion::PARQUET_1_0 => {
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.rep_levels_sink[..],
                            max_rep_level,
                        )[..],
                    );
                }

                if max_def_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.def_levels_sink[..],
                            max_def_level,
                        )[..],
                    );
                }

                buffer.extend_from_slice(&values_data.buf);
                let uncompressed_size = buffer.len();

                if let Some(ref mut cmpr) = self.compressor {
                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
                    buffer = compressed_buf;
                }

                let data_page = Page::DataPage {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    def_level_encoding: Encoding::RLE,
                    rep_level_encoding: Encoding::RLE,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
            WriterVersion::PARQUET_2_0 => {
                let mut rep_levels_byte_len = 0;
                let mut def_levels_byte_len = 0;
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
                    rep_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                if max_def_level > 0 {
                    let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
                    def_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                let uncompressed_size =
                    rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();

                // Data Page v2 compresses values only.
                match self.compressor {
                    Some(ref mut cmpr) => {
                        cmpr.compress(&values_data.buf, &mut buffer)?;
                    }
                    None => buffer.extend_from_slice(&values_data.buf),
                }

                let data_page = Page::DataPageV2 {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    num_nulls: self.page_metrics.num_page_nulls as u32,
                    num_rows: self.page_metrics.num_buffered_rows,
                    def_levels_byte_len: def_levels_byte_len as u32,
                    rep_levels_byte_len: rep_levels_byte_len as u32,
                    is_compressed: self.compressor.is_some(),
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
        };

        // Check if we need to buffer data page or flush it to the sink directly.
        if self.encoder.has_dictionary() {
            self.data_pages.push_back(compressed_page);
        } else {
            self.write_data_page(compressed_page)?;
        }

        // Update total number of rows.
        self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;

        // Reset state.
        self.rep_levels_sink.clear();
        self.def_levels_sink.clear();
        self.page_metrics.new_page();

        Ok(())
    }

    /// Finalises any outstanding data pages and flushes buffered data pages from
    /// dictionary encoding into underlying sink.
    #[inline]
    fn flush_data_pages(&mut self) -> Result<()> {
        // Write all outstanding data to a new page.
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }

        while let Some(page) = self.data_pages.pop_front() {
            self.write_data_page(page)?;
        }

        Ok(())
    }

    /// Assembles column chunk metadata.
    fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
        let total_compressed_size = self.column_metrics.total_compressed_size as i64;
        let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
        let num_values = self.column_metrics.total_num_values as i64;
        let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
        // If data page offset is not set, then no pages have been written
        let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;

        let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
            .set_compression(self.codec)
            .set_encodings(self.encodings.iter().cloned().collect())
            .set_page_encoding_stats(self.encoding_stats.clone())
            .set_total_compressed_size(total_compressed_size)
            .set_total_uncompressed_size(total_uncompressed_size)
            .set_num_values(num_values)
            .set_data_page_offset(data_page_offset)
            .set_dictionary_page_offset(dict_page_offset);

        if self.statistics_enabled != EnabledStatistics::None {
            let backwards_compatible_min_max = self.descr.sort_order().is_signed();

            let statistics = ValueStatistics::<E::T>::new(
                self.column_metrics.min_column_value.clone(),
                self.column_metrics.max_column_value.clone(),
                self.column_metrics.column_distinct_count,
                Some(self.column_metrics.num_column_nulls),
                false,
            )
            .with_backwards_compatible_min_max(backwards_compatible_min_max)
            .into();

            let statistics = self.truncate_statistics(statistics);

            builder = builder
                .set_statistics(statistics)
                .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
                .set_repetition_level_histogram(
                    self.column_metrics.repetition_level_histogram.take(),
                )
                .set_definition_level_histogram(
                    self.column_metrics.definition_level_histogram.take(),
                );
        }

        builder = self.set_column_chunk_encryption_properties(builder);

        let metadata = builder.build()?;
        Ok(metadata)
    }

    /// Encodes definition or repetition levels for Data Page v1.
    #[inline]
    fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> {
        let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
        encoder.put(levels);
        encoder.consume()
    }

    /// Encodes definition or repetition levels for Data Page v2.
    /// Encoding is always RLE.
    #[inline]
    fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
        let mut encoder = LevelEncoder::v2(max_level, levels.len());
        encoder.put(levels);
        encoder.consume()
    }

    /// Writes compressed data page into underlying sink and updates global metrics.
    #[inline]
    fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
        self.encodings.insert(page.encoding());
        match self.encoding_stats.last_mut() {
            Some(encoding_stats)
                if encoding_stats.page_type == page.page_type()
                    && encoding_stats.encoding == page.encoding() =>
            {
                encoding_stats.count += 1;
            }
            _ => {
                // data page type does not change inside a file
                // encoding can currently only change from dictionary to non-dictionary once
                self.encoding_stats.push(PageEncodingStats {
                    page_type: page.page_type(),
                    encoding: page.encoding(),
                    count: 1,
                });
            }
        }
        let page_spec = self.page_writer.write_page(page)?;
        // update offset index
        // compressed_size = header_size + compressed_data_size
        if let Some(builder) = self.offset_index_builder.as_mut() {
            builder
                .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
        }
        self.update_metrics_for_page(page_spec);
        Ok(())
    }

    /// Writes dictionary page into underlying sink.
    #[inline]
    fn write_dictionary_page(&mut self) -> Result<()> {
        let compressed_page = {
            let mut page = self
                .encoder
                .flush_dict_page()?
                .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;

            let uncompressed_size = page.buf.len();

            if let Some(ref mut cmpr) = self.compressor {
                let mut output_buf = Vec::with_capacity(uncompressed_size);
                cmpr.compress(&page.buf, &mut output_buf)?;
                page.buf = Bytes::from(output_buf);
            }

            let dict_page = Page::DictionaryPage {
                buf: page.buf,
                num_values: page.num_values as u32,
                encoding: self.props.dictionary_page_encoding(),
                is_sorted: page.is_sorted,
            };
            CompressedPage::new(dict_page, uncompressed_size)
        };

        self.encodings.insert(compressed_page.encoding());
        self.encoding_stats.push(PageEncodingStats {
            page_type: PageType::DICTIONARY_PAGE,
            encoding: compressed_page.encoding(),
            count: 1,
        });
        let page_spec = self.page_writer.write_page(compressed_page)?;
        self.update_metrics_for_page(page_spec);
        // For the dictionary page, there is no need to update the column/offset index.
        Ok(())
    }

    /// Updates column writer metrics using the metadata of each written page.
1311    #[inline]
1312    fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1313        self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1314        self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1315        self.column_metrics.total_bytes_written += page_spec.bytes_written;
1316
1317        match page_spec.page_type {
1318            PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1319                self.column_metrics.total_num_values += page_spec.num_values as u64;
1320                if self.column_metrics.data_page_offset.is_none() {
1321                    self.column_metrics.data_page_offset = Some(page_spec.offset);
1322                }
1323            }
1324            PageType::DICTIONARY_PAGE => {
1325                assert!(
1326                    self.column_metrics.dictionary_page_offset.is_none(),
1327                    "Dictionary offset is already set"
1328                );
1329                self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1330            }
1331            _ => {}
1332        }
1333    }
1334
1335    #[inline]
1336    #[cfg(feature = "encryption")]
1337    fn set_column_chunk_encryption_properties(
1338        &self,
1339        builder: ColumnChunkMetaDataBuilder,
1340    ) -> ColumnChunkMetaDataBuilder {
1341        if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() {
1342            builder.set_column_crypto_metadata(get_column_crypto_metadata(
1343                encryption_properties,
1344                &self.descr,
1345            ))
1346        } else {
1347            builder
1348        }
1349    }
1350
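    /// No-op counterpart of the above, used when the `encryption` feature is disabled.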
1351    #[inline]
1352    #[cfg(not(feature = "encryption"))]
1353    fn set_column_chunk_encryption_properties(
1354        &self,
1355        builder: ColumnChunkMetaDataBuilder,
1356    ) -> ColumnChunkMetaDataBuilder {
1357        builder
1358    }
1359}
1360
1361fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
1362    update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
1363}
1364
1365fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
1366    update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
1367}
1368
1369#[inline]
1370#[allow(clippy::eq_op)]
1371fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
1372    match T::PHYSICAL_TYPE {
1373        Type::FLOAT | Type::DOUBLE => val != val,
1374        Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type() == Some(LogicalType::Float16) => {
1375            let val = val.as_bytes();
1376            let val = f16::from_le_bytes([val[0], val[1]]);
1377            val.is_nan()
1378        }
1379        _ => false,
1380    }
1381}
1382
1383/// Perform a conditional update of `cur`, skipping any NaN values.
1384///
1385/// If `val` is NaN, `cur` is left unchanged. Otherwise `cur` is set to `Some(val)` if it
1386/// is `None`, or if `should_update` returns `true` when called with its current value.
1387fn update_stat<T: ParquetValueType, F>(
1388    descr: &ColumnDescriptor,
1389    val: &T,
1390    cur: &mut Option<T>,
1391    should_update: F,
1392) where
1393    F: Fn(&T) -> bool,
1394{
1395    if is_nan(descr, val) {
1396        return;
1397    }
1398
1399    if cur.as_ref().map_or(true, should_update) {
1400        *cur = Some(val.clone());
1401    }
1402}
1403
1404/// Evaluate `a > b` according to underlying logical type.
1405fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
1406    if let Some(LogicalType::Integer { is_signed, .. }) = descr.logical_type() {
1407        if !is_signed {
1408            // need to compare unsigned
1409            return a.as_u64().unwrap() > b.as_u64().unwrap();
1410        }
1411    }
1412
1413    match descr.converted_type() {
1414        ConvertedType::UINT_8
1415        | ConvertedType::UINT_16
1416        | ConvertedType::UINT_32
1417        | ConvertedType::UINT_64 => {
1418            return a.as_u64().unwrap() > b.as_u64().unwrap();
1419        }
1420        _ => {}
1421    };
1422
1423    if let Some(LogicalType::Decimal { .. }) = descr.logical_type() {
1424        match T::PHYSICAL_TYPE {
1425            Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1426                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1427            }
1428            _ => {}
1429        };
1430    }
1431
1432    if descr.converted_type() == ConvertedType::DECIMAL {
1433        match T::PHYSICAL_TYPE {
1434            Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1435                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1436            }
1437            _ => {}
1438        };
1439    };
1440
1441    if let Some(LogicalType::Float16) = descr.logical_type() {
1442        let a = a.as_bytes();
1443        let a = f16::from_le_bytes([a[0], a[1]]);
1444        let b = b.as_bytes();
1445        let b = f16::from_le_bytes([b[0], b[1]]);
1446        return a > b;
1447    }
1448
1449    a > b
1450}
1451
1452// ----------------------------------------------------------------------
1453// Encoding support for column writer.
1454// This mirrors parquet-mr default encodings for writes. See:
1455// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java
1456// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java
1457
1458/// Returns the fallback encoding for a column when no encoding is explicitly set in the writer properties.
1459fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1460    match (kind, props.writer_version()) {
1461        (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1462        (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1463        (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1464        (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1465        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1466        _ => Encoding::PLAIN,
1467    }
1468}
1469
1470/// Returns `true` if dictionary encoding is supported for the column writer, `false` otherwise.
1471fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1472    match (kind, props.writer_version()) {
1473        // Booleans do not support dict encoding and should use a fallback encoding.
1474        (Type::BOOLEAN, _) => false,
1475        // Dictionary encoding of FIXED_LEN_BYTE_ARRAY was not supported in PARQUET 1.0
1476        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1477        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1478        _ => true,
1479    }
1480}
1481
1482/// Signed comparison of byte arrays, interpreted as big-endian two's-complement decimal values.
1483fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
1484    let a_length = a.len();
1485    let b_length = b.len();
1486
1487    if a_length == 0 || b_length == 0 {
1488        return a_length > 0;
1489    }
1490
1491    let first_a: u8 = a[0];
1492    let first_b: u8 = b[0];
1493
1494    // We can short circuit for differently signed numbers or
1495    // for equal length byte arrays that have different first bytes.
1496    // The length-equality requirement is necessary for sign extension cases:
1497    // 0xFF90 should be equal to 0x90 (0xFF is the big-endian sign extension byte).
1498    if (0x80 & first_a) != (0x80 & first_b) || (a_length == b_length && first_a != first_b) {
1499        return (first_a as i8) > (first_b as i8);
1500    }
1501
1502    // When the lengths are unequal and the numbers are of the same
1503    // sign we need to do comparison by sign extending the shorter
1504    // value first, and once we get to equal sized arrays, lexicographical
1505    // unsigned comparison of everything but the first byte is sufficient.
1506
1507    let extension: u8 = if (first_a as i8) < 0 { 0xFF } else { 0 };
1508
1509    if a_length != b_length {
1510        let not_equal = if a_length > b_length {
1511            let lead_length = a_length - b_length;
1512            a[0..lead_length].iter().any(|&x| x != extension)
1513        } else {
1514            let lead_length = b_length - a_length;
1515            b[0..lead_length].iter().any(|&x| x != extension)
1516        };
1517
1518        if not_equal {
1519            let negative_values: bool = (first_a as i8) < 0;
1520            let a_longer: bool = a_length > b_length;
1521            return if negative_values { !a_longer } else { a_longer };
1522        }
1523    }
1524
1525    (a[1..]) > (b[1..])
1526}
1527
1528/// Truncate a UTF-8 slice to the longest prefix that is still a valid UTF-8 string,
1529/// while being at most `length` bytes and non-empty. Returns `None` if truncation
1530/// is not possible within those constraints.
1531///
1532/// The caller guarantees that `data.len() > length`.
1533fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1534    let split = (1..=length).rfind(|x| data.is_char_boundary(*x))?;
1535    Some(data.as_bytes()[..split].to_vec())
1536}
1537
1538/// Truncate a UTF-8 slice and increment its final character. The returned value is the
1539/// longest such slice that is still a valid UTF-8 string while being at most `length`
1540/// bytes and non-empty. Returns `None` if no such transformation is possible.
1541///
1542/// The caller guarantees that `data.len() > length`.
1543fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1544    // A UTF-8 code point is at most 4 bytes, so start the boundary search 3 bytes back
1545    let lower_bound = length.saturating_sub(3);
1546    let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
1547    increment_utf8(data.get(..split)?)
1548}
1549
1550/// Increment the final character in a UTF-8 string in such a way that the returned result
1551/// is still a valid UTF-8 string. The returned string may be shorter than the input if the
1552/// last character(s) cannot be incremented (due to overflow or producing invalid code points).
1553/// Returns `None` if the string cannot be incremented.
1554///
1555/// Note that this implementation will not promote an N-byte code point to (N+1) bytes.
1556fn increment_utf8(data: &str) -> Option<Vec<u8>> {
1557    for (idx, original_char) in data.char_indices().rev() {
1558        let original_len = original_char.len_utf8();
1559        if let Some(next_char) = char::from_u32(original_char as u32 + 1) {
1560            // do not allow increasing byte width of incremented char
1561            if next_char.len_utf8() == original_len {
1562                let mut result = data.as_bytes()[..idx + original_len].to_vec();
1563                next_char.encode_utf8(&mut result[idx..]);
1564                return Some(result);
1565            }
1566        }
1567    }
1568
1569    None
1570}
1571
1572/// Try to increment the bytes from right to left (the carry propagates leftwards).
1573///
1574/// Returns `None` if all bytes are `u8::MAX`, i.e. the increment would overflow.
1575fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
1576    for byte in data.iter_mut().rev() {
1577        let (incremented, overflow) = byte.overflowing_add(1);
1578        *byte = incremented;
1579
1580        if !overflow {
1581            return Some(data);
1582        }
1583    }
1584
1585    None
1586}
1587
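// Hedged, illustrative sketches (not part of the original test suite) for the
// truncation/increment helpers above, exercising a few representative inputs.
#[cfg(test)]
mod truncate_increment_sketches {
    use super::*;

    #[test]
    fn truncate_utf8_respects_char_boundaries() {
        // "noël" is 5 bytes; byte 3 falls inside the 2-byte 'ë', so truncating to
        // 3 bytes backs off to the previous character boundary.
        assert_eq!(truncate_utf8("noël", 4).unwrap(), "noë".as_bytes());
        assert_eq!(truncate_utf8("noël", 3).unwrap(), "no".as_bytes());
        // A 2-byte character cannot be truncated to a non-empty 1-byte prefix.
        assert!(truncate_utf8("éé", 1).is_none());
    }

    #[test]
    fn increment_utf8_skips_width_changing_chars() {
        assert_eq!(increment_utf8("aa").unwrap(), "ab".as_bytes());
        // U+007F + 1 is U+0080, which needs 2 bytes, so the final character is
        // dropped and the preceding 'a' is incremented instead.
        assert_eq!(increment_utf8("a\u{7F}").unwrap(), "b".as_bytes());
    }

    #[test]
    fn truncate_and_increment_utf8_sketch() {
        // "ééé" is 6 bytes; the boundary at 4 keeps "éé", whose final character
        // then increments to 'ê'.
        assert_eq!(truncate_and_increment_utf8("ééé", 4).unwrap(), "éê".as_bytes());
    }

    #[test]
    fn increment_carries_right_to_left() {
        assert_eq!(increment(vec![0x00, 0xFF]).unwrap(), vec![0x01, 0x00]);
        // All bytes at u8::MAX means the increment would overflow.
        assert!(increment(vec![0xFF, 0xFF]).is_none());
    }
}
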
1588#[cfg(test)]
1589mod tests {
1590    use crate::{
1591        file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1592        schema::parser::parse_message_type,
1593    };
1594    use core::str;
1595    use rand::distr::uniform::SampleUniform;
1596    use std::{fs::File, sync::Arc};
1597
1598    use crate::column::{
1599        page::PageReader,
1600        reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl},
1601    };
1602    use crate::file::writer::TrackedWrite;
1603    use crate::file::{
1604        properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1605    };
1606    use crate::schema::types::{ColumnPath, Type as SchemaType};
1607    use crate::util::test_common::rand_gen::random_numbers_range;
1608
1609    use super::*;
1610
1611    #[test]
1612    fn test_column_writer_inconsistent_def_rep_length() {
1613        let page_writer = get_test_page_writer();
1614        let props = Default::default();
1615        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
1616        let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
1617        assert!(res.is_err());
1618        if let Err(err) = res {
1619            assert_eq!(
1620                format!("{err}"),
1621                "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
1622            );
1623        }
1624    }
1625
1626    #[test]
1627    fn test_column_writer_invalid_def_levels() {
1628        let page_writer = get_test_page_writer();
1629        let props = Default::default();
1630        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1631        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1632        assert!(res.is_err());
1633        if let Err(err) = res {
1634            assert_eq!(
1635                format!("{err}"),
1636                "Parquet error: Definition levels are required, because max definition level = 1"
1637            );
1638        }
1639    }
1640
1641    #[test]
1642    fn test_column_writer_invalid_rep_levels() {
1643        let page_writer = get_test_page_writer();
1644        let props = Default::default();
1645        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
1646        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1647        assert!(res.is_err());
1648        if let Err(err) = res {
1649            assert_eq!(
1650                format!("{err}"),
1651                "Parquet error: Repetition levels are required, because max repetition level = 1"
1652            );
1653        }
1654    }
1655
1656    #[test]
1657    fn test_column_writer_not_enough_values_to_write() {
1658        let page_writer = get_test_page_writer();
1659        let props = Default::default();
1660        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1661        let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
1662        assert!(res.is_err());
1663        if let Err(err) = res {
1664            assert_eq!(
1665                format!("{err}"),
1666                "Parquet error: Expected to write 4 values, but have only 2"
1667            );
1668        }
1669    }
1670
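    // Hedged companion to the error cases above (added for illustration): with
    // max_def_level = 1, four definition levels and two non-null values write cleanly.
    #[test]
    fn test_column_writer_def_levels_happy_path_sketch() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        writer.write_batch(&[1, 2], Some(&[1, 0, 0, 1]), None).unwrap();
        let r = writer.close().unwrap();
        assert_eq!(r.rows_written, 4);
    }
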
1671    #[test]
1672    fn test_column_writer_write_only_one_dictionary_page() {
1673        let page_writer = get_test_page_writer();
1674        let props = Default::default();
1675        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1676        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1677        // First page should be correctly written.
1678        writer.add_data_page().unwrap();
1679        writer.write_dictionary_page().unwrap();
1680        let err = writer.write_dictionary_page().unwrap_err().to_string();
1681        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1682    }
1683
1684    #[test]
1685    fn test_column_writer_error_when_writing_disabled_dictionary() {
1686        let page_writer = get_test_page_writer();
1687        let props = Arc::new(
1688            WriterProperties::builder()
1689                .set_dictionary_enabled(false)
1690                .build(),
1691        );
1692        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1693        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1694        let err = writer.write_dictionary_page().unwrap_err().to_string();
1695        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1696    }
1697
1698    #[test]
1699    fn test_column_writer_boolean_type_does_not_support_dictionary() {
1700        let page_writer = get_test_page_writer();
1701        let props = Arc::new(
1702            WriterProperties::builder()
1703                .set_dictionary_enabled(true)
1704                .build(),
1705        );
1706        let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
1707        writer
1708            .write_batch(&[true, false, true, false], None, None)
1709            .unwrap();
1710
1711        let r = writer.close().unwrap();
1712        // PlainEncoder uses bit writer to write boolean values, which all fit into 1
1713        // byte.
1714        assert_eq!(r.bytes_written, 1);
1715        assert_eq!(r.rows_written, 4);
1716
1717        let metadata = r.metadata;
1718        assert_eq!(metadata.encodings(), &vec![Encoding::PLAIN, Encoding::RLE]);
1719        assert_eq!(metadata.num_values(), 4); // just values
1720        assert_eq!(metadata.dictionary_page_offset(), None);
1721    }
1722
1723    #[test]
1724    fn test_column_writer_default_encoding_support_bool() {
1725        check_encoding_write_support::<BoolType>(
1726            WriterVersion::PARQUET_1_0,
1727            true,
1728            &[true, false],
1729            None,
1730            &[Encoding::PLAIN, Encoding::RLE],
1731            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1732        );
1733        check_encoding_write_support::<BoolType>(
1734            WriterVersion::PARQUET_1_0,
1735            false,
1736            &[true, false],
1737            None,
1738            &[Encoding::PLAIN, Encoding::RLE],
1739            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1740        );
1741        check_encoding_write_support::<BoolType>(
1742            WriterVersion::PARQUET_2_0,
1743            true,
1744            &[true, false],
1745            None,
1746            &[Encoding::RLE],
1747            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
1748        );
1749        check_encoding_write_support::<BoolType>(
1750            WriterVersion::PARQUET_2_0,
1751            false,
1752            &[true, false],
1753            None,
1754            &[Encoding::RLE],
1755            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
1756        );
1757    }
1758
1759    #[test]
1760    fn test_column_writer_default_encoding_support_int32() {
1761        check_encoding_write_support::<Int32Type>(
1762            WriterVersion::PARQUET_1_0,
1763            true,
1764            &[1, 2],
1765            Some(0),
1766            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1767            &[
1768                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1769                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1770            ],
1771        );
1772        check_encoding_write_support::<Int32Type>(
1773            WriterVersion::PARQUET_1_0,
1774            false,
1775            &[1, 2],
1776            None,
1777            &[Encoding::PLAIN, Encoding::RLE],
1778            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1779        );
1780        check_encoding_write_support::<Int32Type>(
1781            WriterVersion::PARQUET_2_0,
1782            true,
1783            &[1, 2],
1784            Some(0),
1785            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1786            &[
1787                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1788                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1789            ],
1790        );
1791        check_encoding_write_support::<Int32Type>(
1792            WriterVersion::PARQUET_2_0,
1793            false,
1794            &[1, 2],
1795            None,
1796            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1797            &[encoding_stats(
1798                PageType::DATA_PAGE_V2,
1799                Encoding::DELTA_BINARY_PACKED,
1800                1,
1801            )],
1802        );
1803    }
1804
1805    #[test]
1806    fn test_column_writer_default_encoding_support_int64() {
1807        check_encoding_write_support::<Int64Type>(
1808            WriterVersion::PARQUET_1_0,
1809            true,
1810            &[1, 2],
1811            Some(0),
1812            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1813            &[
1814                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1815                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1816            ],
1817        );
1818        check_encoding_write_support::<Int64Type>(
1819            WriterVersion::PARQUET_1_0,
1820            false,
1821            &[1, 2],
1822            None,
1823            &[Encoding::PLAIN, Encoding::RLE],
1824            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1825        );
1826        check_encoding_write_support::<Int64Type>(
1827            WriterVersion::PARQUET_2_0,
1828            true,
1829            &[1, 2],
1830            Some(0),
1831            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1832            &[
1833                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1834                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1835            ],
1836        );
1837        check_encoding_write_support::<Int64Type>(
1838            WriterVersion::PARQUET_2_0,
1839            false,
1840            &[1, 2],
1841            None,
1842            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1843            &[encoding_stats(
1844                PageType::DATA_PAGE_V2,
1845                Encoding::DELTA_BINARY_PACKED,
1846                1,
1847            )],
1848        );
1849    }
1850
1851    #[test]
1852    fn test_column_writer_default_encoding_support_int96() {
1853        check_encoding_write_support::<Int96Type>(
1854            WriterVersion::PARQUET_1_0,
1855            true,
1856            &[Int96::from(vec![1, 2, 3])],
1857            Some(0),
1858            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1859            &[
1860                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1861                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1862            ],
1863        );
1864        check_encoding_write_support::<Int96Type>(
1865            WriterVersion::PARQUET_1_0,
1866            false,
1867            &[Int96::from(vec![1, 2, 3])],
1868            None,
1869            &[Encoding::PLAIN, Encoding::RLE],
1870            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1871        );
1872        check_encoding_write_support::<Int96Type>(
1873            WriterVersion::PARQUET_2_0,
1874            true,
1875            &[Int96::from(vec![1, 2, 3])],
1876            Some(0),
1877            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1878            &[
1879                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1880                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1881            ],
1882        );
1883        check_encoding_write_support::<Int96Type>(
1884            WriterVersion::PARQUET_2_0,
1885            false,
1886            &[Int96::from(vec![1, 2, 3])],
1887            None,
1888            &[Encoding::PLAIN, Encoding::RLE],
1889            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1890        );
1891    }
1892
1893    #[test]
1894    fn test_column_writer_default_encoding_support_float() {
1895        check_encoding_write_support::<FloatType>(
1896            WriterVersion::PARQUET_1_0,
1897            true,
1898            &[1.0, 2.0],
1899            Some(0),
1900            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1901            &[
1902                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1903                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1904            ],
1905        );
1906        check_encoding_write_support::<FloatType>(
1907            WriterVersion::PARQUET_1_0,
1908            false,
1909            &[1.0, 2.0],
1910            None,
1911            &[Encoding::PLAIN, Encoding::RLE],
1912            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1913        );
1914        check_encoding_write_support::<FloatType>(
1915            WriterVersion::PARQUET_2_0,
1916            true,
1917            &[1.0, 2.0],
1918            Some(0),
1919            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1920            &[
1921                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1922                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1923            ],
1924        );
1925        check_encoding_write_support::<FloatType>(
1926            WriterVersion::PARQUET_2_0,
1927            false,
1928            &[1.0, 2.0],
1929            None,
1930            &[Encoding::PLAIN, Encoding::RLE],
1931            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1932        );
1933    }
1934
1935    #[test]
1936    fn test_column_writer_default_encoding_support_double() {
1937        check_encoding_write_support::<DoubleType>(
1938            WriterVersion::PARQUET_1_0,
1939            true,
1940            &[1.0, 2.0],
1941            Some(0),
1942            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1943            &[
1944                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1945                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1946            ],
1947        );
1948        check_encoding_write_support::<DoubleType>(
1949            WriterVersion::PARQUET_1_0,
1950            false,
1951            &[1.0, 2.0],
1952            None,
1953            &[Encoding::PLAIN, Encoding::RLE],
1954            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1955        );
1956        check_encoding_write_support::<DoubleType>(
1957            WriterVersion::PARQUET_2_0,
1958            true,
1959            &[1.0, 2.0],
1960            Some(0),
1961            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1962            &[
1963                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1964                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1965            ],
1966        );
1967        check_encoding_write_support::<DoubleType>(
1968            WriterVersion::PARQUET_2_0,
1969            false,
1970            &[1.0, 2.0],
1971            None,
1972            &[Encoding::PLAIN, Encoding::RLE],
1973            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1974        );
1975    }
1976
1977    #[test]
1978    fn test_column_writer_default_encoding_support_byte_array() {
1979        check_encoding_write_support::<ByteArrayType>(
1980            WriterVersion::PARQUET_1_0,
1981            true,
1982            &[ByteArray::from(vec![1u8])],
1983            Some(0),
1984            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1985            &[
1986                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1987                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1988            ],
1989        );
1990        check_encoding_write_support::<ByteArrayType>(
1991            WriterVersion::PARQUET_1_0,
1992            false,
1993            &[ByteArray::from(vec![1u8])],
1994            None,
1995            &[Encoding::PLAIN, Encoding::RLE],
1996            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1997        );
1998        check_encoding_write_support::<ByteArrayType>(
1999            WriterVersion::PARQUET_2_0,
2000            true,
2001            &[ByteArray::from(vec![1u8])],
2002            Some(0),
2003            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2004            &[
2005                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2006                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2007            ],
2008        );
2009        check_encoding_write_support::<ByteArrayType>(
2010            WriterVersion::PARQUET_2_0,
2011            false,
2012            &[ByteArray::from(vec![1u8])],
2013            None,
2014            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
2015            &[encoding_stats(
2016                PageType::DATA_PAGE_V2,
2017                Encoding::DELTA_BYTE_ARRAY,
2018                1,
2019            )],
2020        );
2021    }
2022
2023    #[test]
2024    fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
2025        check_encoding_write_support::<FixedLenByteArrayType>(
2026            WriterVersion::PARQUET_1_0,
2027            true,
2028            &[ByteArray::from(vec![1u8]).into()],
2029            None,
2030            &[Encoding::PLAIN, Encoding::RLE],
2031            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2032        );
2033        check_encoding_write_support::<FixedLenByteArrayType>(
2034            WriterVersion::PARQUET_1_0,
2035            false,
2036            &[ByteArray::from(vec![1u8]).into()],
2037            None,
2038            &[Encoding::PLAIN, Encoding::RLE],
2039            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2040        );
2041        check_encoding_write_support::<FixedLenByteArrayType>(
2042            WriterVersion::PARQUET_2_0,
2043            true,
2044            &[ByteArray::from(vec![1u8]).into()],
2045            Some(0),
2046            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2047            &[
2048                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2049                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2050            ],
2051        );
2052        check_encoding_write_support::<FixedLenByteArrayType>(
2053            WriterVersion::PARQUET_2_0,
2054            false,
2055            &[ByteArray::from(vec![1u8]).into()],
2056            None,
2057            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
2058            &[encoding_stats(
2059                PageType::DATA_PAGE_V2,
2060                Encoding::DELTA_BYTE_ARRAY,
2061                1,
2062            )],
2063        );
2064    }
2065
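    // Hedged sketch (not from the original suite): exercises `fallback_encoding`
    // and `has_dictionary_support` directly, mirroring the expectations encoded in
    // the write-support tests above.
    #[test]
    fn test_fallback_encoding_and_dictionary_support_sketch() {
        let v1 = WriterProperties::builder()
            .set_writer_version(WriterVersion::PARQUET_1_0)
            .build();
        let v2 = WriterProperties::builder()
            .set_writer_version(WriterVersion::PARQUET_2_0)
            .build();

        // V1 falls back to PLAIN everywhere; V2 prefers the delta encodings.
        assert_eq!(fallback_encoding(Type::INT32, &v1), Encoding::PLAIN);
        assert_eq!(fallback_encoding(Type::INT32, &v2), Encoding::DELTA_BINARY_PACKED);
        assert_eq!(fallback_encoding(Type::BYTE_ARRAY, &v2), Encoding::DELTA_BYTE_ARRAY);

        // Booleans never use dictionaries; FIXED_LEN_BYTE_ARRAY only gained
        // dictionary support with the V2 writer.
        assert!(!has_dictionary_support(Type::BOOLEAN, &v2));
        assert!(!has_dictionary_support(Type::FIXED_LEN_BYTE_ARRAY, &v1));
        assert!(has_dictionary_support(Type::FIXED_LEN_BYTE_ARRAY, &v2));
    }
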
2066    #[test]
2067    fn test_column_writer_check_metadata() {
2068        let page_writer = get_test_page_writer();
2069        let props = Default::default();
2070        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2071        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2072
2073        let r = writer.close().unwrap();
2074        assert_eq!(r.bytes_written, 20);
2075        assert_eq!(r.rows_written, 4);
2076
2077        let metadata = r.metadata;
2078        assert_eq!(
2079            metadata.encodings(),
2080            &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2081        );
2082        assert_eq!(metadata.num_values(), 4);
2083        assert_eq!(metadata.compressed_size(), 20);
2084        assert_eq!(metadata.uncompressed_size(), 20);
2085        assert_eq!(metadata.data_page_offset(), 0);
2086        assert_eq!(metadata.dictionary_page_offset(), Some(0));
2087        if let Some(stats) = metadata.statistics() {
2088            assert_eq!(stats.null_count_opt(), Some(0));
2089            assert_eq!(stats.distinct_count_opt(), None);
2090            if let Statistics::Int32(stats) = stats {
2091                assert_eq!(stats.min_opt().unwrap(), &1);
2092                assert_eq!(stats.max_opt().unwrap(), &4);
2093            } else {
2094                panic!("expecting Statistics::Int32");
2095            }
2096        } else {
2097            panic!("metadata missing statistics");
2098        }
2099    }
2100
2101    #[test]
2102    fn test_column_writer_check_byte_array_min_max() {
2103        let page_writer = get_test_page_writer();
2104        let props = Default::default();
2105        let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
2106        writer
2107            .write_batch(
2108                &[
2109                    ByteArray::from(vec![
2110                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2111                        35u8, 231u8, 90u8, 0u8, 0u8,
2112                    ]),
2113                    ByteArray::from(vec![
2114                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
2115                        152u8, 177u8, 56u8, 0u8, 0u8,
2116                    ]),
2117                    ByteArray::from(vec![
2118                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
2119                        0u8,
2120                    ]),
2121                    ByteArray::from(vec![
2122                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2123                        44u8, 0u8, 0u8,
2124                    ]),
2125                ],
2126                None,
2127                None,
2128            )
2129            .unwrap();
2130        let metadata = writer.close().unwrap().metadata;
2131        if let Some(stats) = metadata.statistics() {
2132            if let Statistics::ByteArray(stats) = stats {
2133                assert_eq!(
2134                    stats.min_opt().unwrap(),
2135                    &ByteArray::from(vec![
2136                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2137                        35u8, 231u8, 90u8, 0u8, 0u8,
2138                    ])
2139                );
2140                assert_eq!(
2141                    stats.max_opt().unwrap(),
2142                    &ByteArray::from(vec![
2143                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2144                        44u8, 0u8, 0u8,
2145                    ])
2146                );
2147            } else {
2148                panic!("expecting Statistics::ByteArray");
2149            }
2150        } else {
2151            panic!("metadata missing statistics");
2152        }
2153    }
2154
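    // Hedged sketch (added for illustration): `compare_greater_byte_array_decimals`
    // orders big-endian two's-complement values, including unequal lengths via sign
    // extension.
    #[test]
    fn test_compare_greater_byte_array_decimals_sketch() {
        // Negative (0xFF80 = -128) never exceeds positive (0x01 = 1), and vice versa.
        assert!(!compare_greater_byte_array_decimals(&[0xFF, 0x80], &[0x01]));
        assert!(compare_greater_byte_array_decimals(&[0x01], &[0xFF, 0x80]));
        // Equal lengths with differing first bytes: -1 > -2.
        assert!(compare_greater_byte_array_decimals(&[0xFF], &[0xFE]));
        // A longer value with a non-extension lead byte wins: 0x0100 (256) > 0x02 (2).
        assert!(compare_greater_byte_array_decimals(&[0x01, 0x00], &[0x02]));
    }
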
2155    #[test]
2156    fn test_column_writer_uint32_converted_type_min_max() {
2157        let page_writer = get_test_page_writer();
2158        let props = Default::default();
2159        let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
2160            page_writer,
2161            0,
2162            0,
2163            props,
2164        );
2165        writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
2166        let metadata = writer.close().unwrap().metadata;
2167        if let Some(stats) = metadata.statistics() {
2168            if let Statistics::Int32(stats) = stats {
2169                assert_eq!(stats.min_opt().unwrap(), &0);
2170                assert_eq!(stats.max_opt().unwrap(), &5);
2171            } else {
2172                panic!("expecting Statistics::Int32");
2173            }
2174        } else {
2175            panic!("metadata missing statistics");
2176        }
2177    }
2178
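    // Hedged sketch (added for illustration): `compare_greater` honors unsigned
    // converted types, so the i32 bit pattern 0xFFFF_FFFF (-1 signed, u32::MAX
    // unsigned) sorts above 1.
    #[test]
    fn test_compare_greater_unsigned_sketch() {
        let tpe = SchemaType::primitive_type_builder("col", Type::INT32)
            .with_converted_type(ConvertedType::UINT_32)
            .build()
            .unwrap();
        let descr = ColumnDescriptor::new(Arc::new(tpe), 0, 0, ColumnPath::from("col"));

        assert!(compare_greater(&descr, &-1i32, &1i32));
        assert!(!compare_greater(&descr, &1i32, &-1i32));
    }
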
2179    #[test]
2180    fn test_column_writer_precalculated_statistics() {
2181        let page_writer = get_test_page_writer();
2182        let props = Arc::new(
2183            WriterProperties::builder()
2184                .set_statistics_enabled(EnabledStatistics::Chunk)
2185                .build(),
2186        );
2187        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2188        writer
2189            .write_batch_with_statistics(
2190                &[1, 2, 3, 4],
2191                None,
2192                None,
2193                Some(&-17),
2194                Some(&9000),
2195                Some(55),
2196            )
2197            .unwrap();
2198
2199        let r = writer.close().unwrap();
2200        assert_eq!(r.bytes_written, 20);
2201        assert_eq!(r.rows_written, 4);
2202
2203        let metadata = r.metadata;
2204        assert_eq!(
2205            metadata.encodings(),
2206            &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2207        );
2208        assert_eq!(metadata.num_values(), 4);
2209        assert_eq!(metadata.compressed_size(), 20);
2210        assert_eq!(metadata.uncompressed_size(), 20);
2211        assert_eq!(metadata.data_page_offset(), 0);
2212        assert_eq!(metadata.dictionary_page_offset(), Some(0));
2213        if let Some(stats) = metadata.statistics() {
2214            assert_eq!(stats.null_count_opt(), Some(0));
2215        assert_eq!(stats.distinct_count_opt(), Some(55));
2216            if let Statistics::Int32(stats) = stats {
2217                assert_eq!(stats.min_opt().unwrap(), &-17);
2218                assert_eq!(stats.max_opt().unwrap(), &9000);
2219            } else {
2220                panic!("expecting Statistics::Int32");
2221            }
2222        } else {
2223            panic!("metadata missing statistics");
2224        }
2225    }
2226
2227    #[test]
2228    fn test_mixed_precomputed_statistics() {
2229        let mut buf = Vec::with_capacity(100);
2230        let mut write = TrackedWrite::new(&mut buf);
2231        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2232        let props = Default::default();
2233        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2234
2235        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2236        writer
2237            .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
2238            .unwrap();
2239
2240        let r = writer.close().unwrap();
2241
2242        let stats = r.metadata.statistics().unwrap();
2243        assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
2244        assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
2245        assert_eq!(stats.null_count_opt(), Some(0));
2246        assert!(stats.distinct_count_opt().is_none());
2247
2248        drop(write);
2249
2250        let props = ReaderProperties::builder()
2251            .set_backward_compatible_lz4(false)
2252            .build();
2253        let reader = SerializedPageReader::new_with_properties(
2254            Arc::new(Bytes::from(buf)),
2255            &r.metadata,
2256            r.rows_written as usize,
2257            None,
2258            Arc::new(props),
2259        )
2260        .unwrap();
2261
2262        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2263        assert_eq!(pages.len(), 2);
2264
2265        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2266        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);
2267
2268        let page_statistics = pages[1].statistics().unwrap();
2269        assert_eq!(
2270            page_statistics.min_bytes_opt().unwrap(),
2271            1_i32.to_le_bytes()
2272        );
2273        assert_eq!(
2274            page_statistics.max_bytes_opt().unwrap(),
2275            7_i32.to_le_bytes()
2276        );
2277        assert_eq!(page_statistics.null_count_opt(), Some(0));
2278        assert!(page_statistics.distinct_count_opt().is_none());
2279    }
2280
2281    #[test]
2282    fn test_disabled_statistics() {
2283        let mut buf = Vec::with_capacity(100);
2284        let mut write = TrackedWrite::new(&mut buf);
2285        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2286        let props = WriterProperties::builder()
2287            .set_statistics_enabled(EnabledStatistics::None)
2288            .set_writer_version(WriterVersion::PARQUET_2_0)
2289            .build();
2290        let props = Arc::new(props);
2291
2292        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2293        writer
2294            .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
2295            .unwrap();
2296
2297        let r = writer.close().unwrap();
2298        assert!(r.metadata.statistics().is_none());
2299
2300        drop(write);
2301
2302        let props = ReaderProperties::builder()
2303            .set_backward_compatible_lz4(false)
2304            .build();
2305        let reader = SerializedPageReader::new_with_properties(
2306            Arc::new(Bytes::from(buf)),
2307            &r.metadata,
2308            r.rows_written as usize,
2309            None,
2310            Arc::new(props),
2311        )
2312        .unwrap();
2313
2314        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2315        assert_eq!(pages.len(), 2);
2316
2317        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2318        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);
2319
2320        match &pages[1] {
2321            Page::DataPageV2 {
2322                num_values,
2323                num_nulls,
2324                num_rows,
2325                statistics,
2326                ..
2327            } => {
2328                assert_eq!(*num_values, 6);
2329                assert_eq!(*num_nulls, 2);
2330                assert_eq!(*num_rows, 6);
2331                assert!(statistics.is_none());
2332            }
2333            _ => unreachable!(),
2334        }
2335    }
2336
2337    #[test]
2338    fn test_column_writer_empty_column_roundtrip() {
2339        let props = Default::default();
2340        column_roundtrip::<Int32Type>(props, &[], None, None);
2341    }
2342
2343    #[test]
2344    fn test_column_writer_non_nullable_values_roundtrip() {
2345        let props = Default::default();
2346        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2347    }
2348
2349    #[test]
2350    fn test_column_writer_nullable_non_repeated_values_roundtrip() {
2351        let props = Default::default();
2352        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
2353    }
2354
2355    #[test]
2356    fn test_column_writer_nullable_repeated_values_roundtrip() {
2357        let props = Default::default();
2358        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2359    }
2360
2361    #[test]
2362    fn test_column_writer_dictionary_fallback_small_data_page() {
2363        let props = WriterProperties::builder()
2364            .set_dictionary_page_size_limit(32)
2365            .set_data_page_size_limit(32)
2366            .build();
2367        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2368    }
2369
2370    #[test]
2371    fn test_column_writer_small_write_batch_size() {
2372        for i in &[1usize, 2, 5, 10, 11, 1023] {
2373            let props = WriterProperties::builder().set_write_batch_size(*i).build();
2374
2375            column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2376        }
2377    }
2378
2379    #[test]
2380    fn test_column_writer_dictionary_disabled_v1() {
2381        let props = WriterProperties::builder()
2382            .set_writer_version(WriterVersion::PARQUET_1_0)
2383            .set_dictionary_enabled(false)
2384            .build();
2385        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2386    }
2387
2388    #[test]
2389    fn test_column_writer_dictionary_disabled_v2() {
2390        let props = WriterProperties::builder()
2391            .set_writer_version(WriterVersion::PARQUET_2_0)
2392            .set_dictionary_enabled(false)
2393            .build();
2394        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2395    }
2396
2397    #[test]
2398    fn test_column_writer_compression_v1() {
2399        let props = WriterProperties::builder()
2400            .set_writer_version(WriterVersion::PARQUET_1_0)
2401            .set_compression(Compression::SNAPPY)
2402            .build();
2403        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2404    }
2405
2406    #[test]
2407    fn test_column_writer_compression_v2() {
2408        let props = WriterProperties::builder()
2409            .set_writer_version(WriterVersion::PARQUET_2_0)
2410            .set_compression(Compression::SNAPPY)
2411            .build();
2412        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2413    }
2414
2415    #[test]
2416    fn test_column_writer_add_data_pages_with_dict() {
2417        // ARROW-5129: Verifies that a data page is added when dictionary encoding is
2418        // in use and no fallback to plain encoding has occurred so far.
2419        let mut file = tempfile::tempfile().unwrap();
2420        let mut write = TrackedWrite::new(&mut file);
2421        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2422        let props = Arc::new(
2423            WriterProperties::builder()
2424                .set_data_page_size_limit(10)
2425                .set_write_batch_size(3) // write 3 values at a time
2426                .build(),
2427        );
2428        let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
2429        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2430        writer.write_batch(data, None, None).unwrap();
2431        let r = writer.close().unwrap();
2432
2433        drop(write);
2434
2435        // Read pages and check the sequence
2436        let props = ReaderProperties::builder()
2437            .set_backward_compatible_lz4(false)
2438            .build();
2439        let mut page_reader = Box::new(
2440            SerializedPageReader::new_with_properties(
2441                Arc::new(file),
2442                &r.metadata,
2443                r.rows_written as usize,
2444                None,
2445                Arc::new(props),
2446            )
2447            .unwrap(),
2448        );
2449        let mut res = Vec::new();
2450        while let Some(page) = page_reader.get_next_page().unwrap() {
2451            res.push((page.page_type(), page.num_values(), page.buffer().len()));
2452        }
2453        assert_eq!(
2454            res,
2455            vec![
2456                (PageType::DICTIONARY_PAGE, 10, 40),
2457                (PageType::DATA_PAGE, 9, 10),
2458                (PageType::DATA_PAGE, 1, 3),
2459            ]
2460        );
2461        assert_eq!(
2462            r.metadata.page_encoding_stats(),
2463            Some(&vec![
2464                PageEncodingStats {
2465                    page_type: PageType::DICTIONARY_PAGE,
2466                    encoding: Encoding::PLAIN,
2467                    count: 1
2468                },
2469                PageEncodingStats {
2470                    page_type: PageType::DATA_PAGE,
2471                    encoding: Encoding::RLE_DICTIONARY,
2472                    count: 2,
2473                }
2474            ])
2475        );
2476    }
2477
2478    #[test]
2479    fn test_bool_statistics() {
2480        let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
2481        // Booleans have an unsigned sort order and so are not compatible
2482        // with the deprecated `min` and `max` statistics
2483        assert!(!stats.is_min_max_backwards_compatible());
2484        if let Statistics::Boolean(stats) = stats {
2485            assert_eq!(stats.min_opt().unwrap(), &false);
2486            assert_eq!(stats.max_opt().unwrap(), &true);
2487        } else {
2488            panic!("expecting Statistics::Boolean, got {stats:?}");
2489        }
2490    }
2491
2492    #[test]
2493    fn test_int32_statistics() {
2494        let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
2495        assert!(stats.is_min_max_backwards_compatible());
2496        if let Statistics::Int32(stats) = stats {
2497            assert_eq!(stats.min_opt().unwrap(), &-2);
2498            assert_eq!(stats.max_opt().unwrap(), &3);
2499        } else {
2500            panic!("expecting Statistics::Int32, got {stats:?}");
2501        }
2502    }
2503
2504    #[test]
2505    fn test_int64_statistics() {
2506        let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
2507        assert!(stats.is_min_max_backwards_compatible());
2508        if let Statistics::Int64(stats) = stats {
2509            assert_eq!(stats.min_opt().unwrap(), &-2);
2510            assert_eq!(stats.max_opt().unwrap(), &3);
2511        } else {
2512            panic!("expecting Statistics::Int64, got {stats:?}");
2513        }
2514    }
2515
2516    #[test]
2517    fn test_int96_statistics() {
2518        let input = vec![
2519            Int96::from(vec![1, 20, 30]),
2520            Int96::from(vec![3, 20, 10]),
2521            Int96::from(vec![0, 20, 30]),
2522            Int96::from(vec![2, 20, 30]),
2523        ];
2526
2527        let stats = statistics_roundtrip::<Int96Type>(&input);
2528        assert!(!stats.is_min_max_backwards_compatible());
2529        if let Statistics::Int96(stats) = stats {
2530            assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![0, 20, 30]));
2531            assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
2532        } else {
2533            panic!("expecting Statistics::Int96, got {stats:?}");
2534        }
2535    }
2536
2537    #[test]
2538    fn test_float_statistics() {
2539        let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
2540        assert!(stats.is_min_max_backwards_compatible());
2541        if let Statistics::Float(stats) = stats {
2542            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2543            assert_eq!(stats.max_opt().unwrap(), &3.0);
2544        } else {
2545            panic!("expecting Statistics::Float, got {stats:?}");
2546        }
2547    }
2548
2549    #[test]
2550    fn test_double_statistics() {
2551        let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
2552        assert!(stats.is_min_max_backwards_compatible());
2553        if let Statistics::Double(stats) = stats {
2554            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2555            assert_eq!(stats.max_opt().unwrap(), &3.0);
2556        } else {
2557            panic!("expecting Statistics::Double, got {stats:?}");
2558        }
2559    }
2560
2561    #[test]
2562    fn test_byte_array_statistics() {
2563        let input = ["aawaa", "zz", "aaw", "m", "qrs"]
2564            .iter()
2565            .map(|&s| s.into())
2566            .collect::<Vec<_>>();
2567
2568        let stats = statistics_roundtrip::<ByteArrayType>(&input);
2569        assert!(!stats.is_min_max_backwards_compatible());
2570        if let Statistics::ByteArray(stats) = stats {
2571            assert_eq!(stats.min_opt().unwrap(), &ByteArray::from("aaw"));
2572            assert_eq!(stats.max_opt().unwrap(), &ByteArray::from("zz"));
2573        } else {
2574            panic!("expecting Statistics::ByteArray, got {stats:?}");
2575        }
2576    }
2577
2578    #[test]
2579    fn test_fixed_len_byte_array_statistics() {
2580        let input = ["aawaa", "zz   ", "aaw  ", "m    ", "qrs  "]
2581            .iter()
2582            .map(|&s| ByteArray::from(s).into())
2583            .collect::<Vec<_>>();
2584
2585        let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
2586        assert!(!stats.is_min_max_backwards_compatible());
2587        if let Statistics::FixedLenByteArray(stats) = stats {
2588            let expected_min: FixedLenByteArray = ByteArray::from("aaw  ").into();
2589            assert_eq!(stats.min_opt().unwrap(), &expected_min);
2590            let expected_max: FixedLenByteArray = ByteArray::from("zz   ").into();
2591            assert_eq!(stats.max_opt().unwrap(), &expected_max);
2592        } else {
2593            panic!("expecting Statistics::FixedLenByteArray, got {stats:?}");
2594        }
2595    }
2596
2597    #[test]
2598    fn test_column_writer_check_float16_min_max() {
2599        let input = [
2600            -f16::ONE,
2601            f16::from_f32(3.0),
2602            -f16::from_f32(2.0),
2603            f16::from_f32(2.0),
2604        ]
2605        .into_iter()
2606        .map(|s| ByteArray::from(s).into())
2607        .collect::<Vec<_>>();
2608
2609        let stats = float16_statistics_roundtrip(&input);
2610        assert!(stats.is_min_max_backwards_compatible());
2611        assert_eq!(
2612            stats.min_opt().unwrap(),
2613            &ByteArray::from(-f16::from_f32(2.0))
2614        );
2615        assert_eq!(
2616            stats.max_opt().unwrap(),
2617            &ByteArray::from(f16::from_f32(3.0))
2618        );
2619    }
2620
2637    #[test]
2638    fn test_float16_statistics_nan_middle() {
2639        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2640            .into_iter()
2641            .map(|s| ByteArray::from(s).into())
2642            .collect::<Vec<_>>();
2643
2644        let stats = float16_statistics_roundtrip(&input);
2645        assert!(stats.is_min_max_backwards_compatible());
2646        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2647        assert_eq!(
2648            stats.max_opt().unwrap(),
2649            &ByteArray::from(f16::ONE + f16::ONE)
2650        );
2651    }
2652
2653    #[test]
2654    fn test_float16_statistics_nan_start() {
2655        let input = [f16::NAN, f16::ONE, f16::ONE + f16::ONE]
2656            .into_iter()
2657            .map(|s| ByteArray::from(s).into())
2658            .collect::<Vec<_>>();
2659
2660        let stats = float16_statistics_roundtrip(&input);
2661        assert!(stats.is_min_max_backwards_compatible());
2662        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2663        assert_eq!(
2664            stats.max_opt().unwrap(),
2665            &ByteArray::from(f16::ONE + f16::ONE)
2666        );
2667    }
2668
2669    #[test]
2670    fn test_float16_statistics_nan_only() {
2671        let input = [f16::NAN, f16::NAN]
2672            .into_iter()
2673            .map(|s| ByteArray::from(s).into())
2674            .collect::<Vec<_>>();
2675
2676        let stats = float16_statistics_roundtrip(&input);
2677        assert!(stats.min_bytes_opt().is_none());
2678        assert!(stats.max_bytes_opt().is_none());
2679        assert!(stats.is_min_max_backwards_compatible());
2680    }
2681
2682    #[test]
2683    fn test_float16_statistics_zero_only() {
2684        let input = [f16::ZERO]
2685            .into_iter()
2686            .map(|s| ByteArray::from(s).into())
2687            .collect::<Vec<_>>();
2688
2689        let stats = float16_statistics_roundtrip(&input);
2690        assert!(stats.is_min_max_backwards_compatible());
2691        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2692        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2693    }
2694
2695    #[test]
2696    fn test_float16_statistics_neg_zero_only() {
2697        let input = [f16::NEG_ZERO]
2698            .into_iter()
2699            .map(|s| ByteArray::from(s).into())
2700            .collect::<Vec<_>>();
2701
2702        let stats = float16_statistics_roundtrip(&input);
2703        assert!(stats.is_min_max_backwards_compatible());
2704        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2705        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2706    }
2707
2708    #[test]
2709    fn test_float16_statistics_zero_min() {
2710        let input = [f16::ZERO, f16::ONE, f16::NAN, f16::PI]
2711            .into_iter()
2712            .map(|s| ByteArray::from(s).into())
2713            .collect::<Vec<_>>();
2714
2715        let stats = float16_statistics_roundtrip(&input);
2716        assert!(stats.is_min_max_backwards_compatible());
2717        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2718        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI));
2719    }
2720
2721    #[test]
2722    fn test_float16_statistics_neg_zero_max() {
2723        let input = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI]
2724            .into_iter()
2725            .map(|s| ByteArray::from(s).into())
2726            .collect::<Vec<_>>();
2727
2728        let stats = float16_statistics_roundtrip(&input);
2729        assert!(stats.is_min_max_backwards_compatible());
2730        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI));
2731        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2732    }
2733
2734    #[test]
2735    fn test_float_statistics_nan_middle() {
2736        let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
2737        assert!(stats.is_min_max_backwards_compatible());
2738        if let Statistics::Float(stats) = stats {
2739            assert_eq!(stats.min_opt().unwrap(), &1.0);
2740            assert_eq!(stats.max_opt().unwrap(), &2.0);
2741        } else {
2742            panic!("expecting Statistics::Float");
2743        }
2744    }
2745
2746    #[test]
2747    fn test_float_statistics_nan_start() {
2748        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
2749        assert!(stats.is_min_max_backwards_compatible());
2750        if let Statistics::Float(stats) = stats {
2751            assert_eq!(stats.min_opt().unwrap(), &1.0);
2752            assert_eq!(stats.max_opt().unwrap(), &2.0);
2753        } else {
2754            panic!("expecting Statistics::Float");
2755        }
2756    }
2757
2758    #[test]
2759    fn test_float_statistics_nan_only() {
2760        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]);
2761        assert!(stats.min_bytes_opt().is_none());
2762        assert!(stats.max_bytes_opt().is_none());
2763        assert!(stats.is_min_max_backwards_compatible());
2764        assert!(matches!(stats, Statistics::Float(_)));
2765    }
2766
2767    #[test]
2768    fn test_float_statistics_zero_only() {
2769        let stats = statistics_roundtrip::<FloatType>(&[0.0]);
2770        assert!(stats.is_min_max_backwards_compatible());
2771        if let Statistics::Float(stats) = stats {
2772            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2773            assert!(stats.min_opt().unwrap().is_sign_negative());
2774            assert_eq!(stats.max_opt().unwrap(), &0.0);
2775            assert!(stats.max_opt().unwrap().is_sign_positive());
2776        } else {
2777            panic!("expecting Statistics::Float");
2778        }
2779    }
2780
2781    #[test]
2782    fn test_float_statistics_neg_zero_only() {
2783        let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
2784        assert!(stats.is_min_max_backwards_compatible());
2785        if let Statistics::Float(stats) = stats {
2786            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2787            assert!(stats.min_opt().unwrap().is_sign_negative());
2788            assert_eq!(stats.max_opt().unwrap(), &0.0);
2789            assert!(stats.max_opt().unwrap().is_sign_positive());
2790        } else {
2791            panic!("expecting Statistics::Float");
2792        }
2793    }
2794
2795    #[test]
2796    fn test_float_statistics_zero_min() {
2797        let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
2798        assert!(stats.is_min_max_backwards_compatible());
2799        if let Statistics::Float(stats) = stats {
2800            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2801            assert!(stats.min_opt().unwrap().is_sign_negative());
2802            assert_eq!(stats.max_opt().unwrap(), &2.0);
2803        } else {
2804            panic!("expecting Statistics::Float");
2805        }
2806    }
2807
2808    #[test]
2809    fn test_float_statistics_neg_zero_max() {
2810        let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
2811        assert!(stats.is_min_max_backwards_compatible());
2812        if let Statistics::Float(stats) = stats {
2813            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2814            assert_eq!(stats.max_opt().unwrap(), &0.0);
2815            assert!(stats.max_opt().unwrap().is_sign_positive());
2816        } else {
2817            panic!("expecting Statistics::Float");
2818        }
2819    }
2820
2821    #[test]
2822    fn test_double_statistics_nan_middle() {
2823        let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
2824        assert!(stats.is_min_max_backwards_compatible());
2825        if let Statistics::Double(stats) = stats {
2826            assert_eq!(stats.min_opt().unwrap(), &1.0);
2827            assert_eq!(stats.max_opt().unwrap(), &2.0);
2828        } else {
2829            panic!("expecting Statistics::Double");
2830        }
2831    }
2832
2833    #[test]
2834    fn test_double_statistics_nan_start() {
2835        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
2836        assert!(stats.is_min_max_backwards_compatible());
2837        if let Statistics::Double(stats) = stats {
2838            assert_eq!(stats.min_opt().unwrap(), &1.0);
2839            assert_eq!(stats.max_opt().unwrap(), &2.0);
2840        } else {
2841            panic!("expecting Statistics::Double");
2842        }
2843    }
2844
2845    #[test]
2846    fn test_double_statistics_nan_only() {
2847        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]);
2848        assert!(stats.min_bytes_opt().is_none());
2849        assert!(stats.max_bytes_opt().is_none());
2850        assert!(matches!(stats, Statistics::Double(_)));
2851        assert!(stats.is_min_max_backwards_compatible());
2852    }
2853
2854    #[test]
2855    fn test_double_statistics_zero_only() {
2856        let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
2857        assert!(stats.is_min_max_backwards_compatible());
2858        if let Statistics::Double(stats) = stats {
2859            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2860            assert!(stats.min_opt().unwrap().is_sign_negative());
2861            assert_eq!(stats.max_opt().unwrap(), &0.0);
2862            assert!(stats.max_opt().unwrap().is_sign_positive());
2863        } else {
2864            panic!("expecting Statistics::Double");
2865        }
2866    }
2867
2868    #[test]
2869    fn test_double_statistics_neg_zero_only() {
2870        let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
2871        assert!(stats.is_min_max_backwards_compatible());
2872        if let Statistics::Double(stats) = stats {
2873            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2874            assert!(stats.min_opt().unwrap().is_sign_negative());
2875            assert_eq!(stats.max_opt().unwrap(), &0.0);
2876            assert!(stats.max_opt().unwrap().is_sign_positive());
2877        } else {
2878            panic!("expecting Statistics::Double");
2879        }
2880    }
2881
2882    #[test]
2883    fn test_double_statistics_zero_min() {
2884        let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
2885        assert!(stats.is_min_max_backwards_compatible());
2886        if let Statistics::Double(stats) = stats {
2887            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2888            assert!(stats.min_opt().unwrap().is_sign_negative());
2889            assert_eq!(stats.max_opt().unwrap(), &2.0);
2890        } else {
2891            panic!("expecting Statistics::Double");
2892        }
2893    }
2894
2895    #[test]
2896    fn test_double_statistics_neg_zero_max() {
2897        let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
2898        assert!(stats.is_min_max_backwards_compatible());
2899        if let Statistics::Double(stats) = stats {
2900            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2901            assert_eq!(stats.max_opt().unwrap(), &0.0);
2902            assert!(stats.max_opt().unwrap().is_sign_positive());
2903        } else {
2904            panic!("expecting Statistics::Double");
2905        }
2906    }
2907
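    // Added sketch (not part of the original suite): the zero-widening that the
    // float tests above rely on. Writing min = -0.0 and max = +0.0 is safe
    // because IEEE 754 comparison treats the two zeros as equal even though
    // their bit patterns differ, so readers pruning on either bound still match
    // rows containing the opposite-signed zero.
    #[test]
    fn test_zero_sign_widening_sketch() {
        // Equal under IEEE 754 comparison...
        assert_eq!(-0.0_f64, 0.0_f64);
        // ...but distinct at the byte level, which is why the writer widens the
        // bounds rather than recording whichever zero happened to appear.
        assert_ne!((-0.0_f64).to_bits(), 0.0_f64.to_bits());
    }
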
2908    #[test]
2909    fn test_compare_greater_byte_array_decimals() {
2910        assert!(!compare_greater_byte_array_decimals(&[], &[]));
2911        assert!(compare_greater_byte_array_decimals(&[1u8], &[]));
2912        assert!(!compare_greater_byte_array_decimals(&[], &[1u8]));
2913        assert!(compare_greater_byte_array_decimals(&[1u8], &[0u8]));
2914        assert!(!compare_greater_byte_array_decimals(&[1u8], &[1u8]));
2915        assert!(compare_greater_byte_array_decimals(&[1u8, 0u8], &[0u8]));
2916        assert!(!compare_greater_byte_array_decimals(&[0u8, 1u8], &[1u8, 0u8]));
2917        assert!(!compare_greater_byte_array_decimals(&[255u8, 35u8, 0u8, 0u8], &[0u8]));
2918        assert!(compare_greater_byte_array_decimals(&[0u8], &[255u8, 35u8, 0u8, 0u8]));
2928    }
2929
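    // Added sketch making the sign convention exercised above concrete: decimal
    // byte arrays are compared as signed, big-endian, two's-complement
    // integers, so a longer buffer whose leading byte has the high bit set is a
    // negative value and compares less than single-byte zero.
    #[test]
    fn test_byte_array_decimal_sign_sketch() {
        // [255, 35, 0, 0] is negative in two's complement...
        assert!(i32::from_be_bytes([255u8, 35u8, 0u8, 0u8]) < 0);
        // ...so single-byte zero compares greater, matching the asserts above.
        assert!(compare_greater_byte_array_decimals(
            &[0u8],
            &[255u8, 35u8, 0u8, 0u8]
        ));
    }
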
2930    #[test]
2931    fn test_column_index_with_null_pages() {
2932        // write a single page of all nulls
2933        let page_writer = get_test_page_writer();
2934        let props = Default::default();
2935        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2936        writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();
2937
2938        let r = writer.close().unwrap();
2939        assert!(r.column_index.is_some());
2940        let col_idx = r.column_index.unwrap();
2941        // null_pages should be true for page 0
2942        assert!(col_idx.null_pages[0]);
2943        // min and max should be empty byte arrays
2944        assert_eq!(col_idx.min_values[0].len(), 0);
2945        assert_eq!(col_idx.max_values[0].len(), 0);
2946        // null_counts should be present and equal 4 for page 0
2947        assert!(col_idx.null_counts.is_some());
2948        assert_eq!(col_idx.null_counts.as_ref().unwrap()[0], 4);
2949        // there is no repetition so rep histogram should be absent
2950        assert!(col_idx.repetition_level_histograms.is_none());
2951        // definition_level_histogram should be present, with 4 values at level 0 and none at level 1
2952        assert!(col_idx.definition_level_histograms.is_some());
2953        assert_eq!(col_idx.definition_level_histograms.unwrap(), &[4, 0]);
2954    }
2955
2956    #[test]
2957    fn test_column_offset_index_metadata() {
2958        // write data and check the offset index and column index
2960        let page_writer = get_test_page_writer();
2961        let props = Default::default();
2962        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2963        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2964        // first page
2965        writer.flush_data_pages().unwrap();
2966        // second page
2967        writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
2968
2969        let r = writer.close().unwrap();
2970        let column_index = r.column_index.unwrap();
2971        let offset_index = r.offset_index.unwrap();
2972
2973        assert_eq!(8, r.rows_written);
2974
2975        // column index
2976        assert_eq!(2, column_index.null_pages.len());
2977        assert_eq!(2, offset_index.page_locations.len());
2978        assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
2979        for idx in 0..2 {
2980            assert!(!column_index.null_pages[idx]);
2981            assert_eq!(0, column_index.null_counts.as_ref().unwrap()[idx]);
2982        }
2983
2984        if let Some(stats) = r.metadata.statistics() {
2985            assert_eq!(stats.null_count_opt(), Some(0));
2986            assert_eq!(stats.distinct_count_opt(), None);
2987            if let Statistics::Int32(stats) = stats {
2988                // first page is [1,2,3,4]
2989                // second page is [-5,2,4,8]
2990                // the max is not incremented here, since Int32 is not a byte-array type subject to truncation
2991                assert_eq!(
2992                    stats.min_bytes_opt(),
2993                    Some(column_index.min_values[1].as_slice())
2994                );
2995                assert_eq!(
2996                    stats.max_bytes_opt(),
2997                    column_index.max_values.get(1).map(Vec::as_slice)
2998                );
2999            } else {
3000                panic!("expecting Statistics::Int32");
3001            }
3002        } else {
3003            panic!("metadata missing statistics");
3004        }
3005
3006        // page location
3007        assert_eq!(0, offset_index.page_locations[0].first_row_index);
3008        assert_eq!(4, offset_index.page_locations[1].first_row_index);
3009    }
3010
3011    /// Verify min/max value truncation in the column index works as expected
3012    #[test]
3013    fn test_column_offset_index_metadata_truncating() {
3014        // write data and check the offset index and column index
3016        let page_writer = get_test_page_writer();
3017        let props = Default::default();
3018        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3019
3020        let mut data = vec![FixedLenByteArray::default(); 3];
3021        // This is the expected min value - "aaa..."
3022        data[0].set_data(Bytes::from(vec![97_u8; 200]));
3023        // This is the expected max value - "ppp..."
3024        data[1].set_data(Bytes::from(vec![112_u8; 200]));
3025        data[2].set_data(Bytes::from(vec![98_u8; 200]));
3026
3027        writer.write_batch(&data, None, None).unwrap();
3028
3029        writer.flush_data_pages().unwrap();
3030
3031        let r = writer.close().unwrap();
3032        let column_index = r.column_index.unwrap();
3033        let offset_index = r.offset_index.unwrap();
3034
3035        assert_eq!(3, r.rows_written);
3036
3037        // column index
3038        assert_eq!(1, column_index.null_pages.len());
3039        assert_eq!(1, offset_index.page_locations.len());
3040        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3041        assert!(!column_index.null_pages[0]);
3042        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
3043
3044        if let Some(stats) = r.metadata.statistics() {
3045            assert_eq!(stats.null_count_opt(), Some(0));
3046            assert_eq!(stats.distinct_count_opt(), None);
3047            if let Statistics::FixedLenByteArray(stats) = stats {
3048                let column_index_min_value = &column_index.min_values[0];
3049                let column_index_max_value = &column_index.max_values[0];
3050
3051                // Column index stats are truncated, while the column chunk's aren't.
3052                assert_ne!(
3053                    stats.min_bytes_opt(),
3054                    Some(column_index_min_value.as_slice())
3055                );
3056                assert_ne!(
3057                    stats.max_bytes_opt(),
3058                    Some(column_index_max_value.as_slice())
3059                );
3060
3061                assert_eq!(
3062                    column_index_min_value.len(),
3063                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
3064                );
3065                assert_eq!(column_index_min_value.as_slice(), &[97_u8; 64]);
3066                assert_eq!(
3067                    column_index_max_value.len(),
3068                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
3069                );
3070
3071                // We expect the last byte to be incremented
3072                assert_eq!(
3073                    *column_index_max_value.last().unwrap(),
3074                    *column_index_max_value.first().unwrap() + 1
3075                );
3076            } else {
3077                panic!("expecting Statistics::FixedLenByteArray");
3078            }
3079        } else {
3080            panic!("metadata missing statistics");
3081        }
3082    }
3083
3084    #[test]
3085    fn test_column_offset_index_truncating_spec_example() {
3086        // write data and check the offset index and column index
3088        let page_writer = get_test_page_writer();
3089
3090        // Truncate values at 1 byte
3091        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3092        let props = Arc::new(builder.build());
3093        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3094
3095        let mut data = vec![FixedLenByteArray::default(); 1];
3096        // This is the expected min value
3097        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3098
3099        writer.write_batch(&data, None, None).unwrap();
3100
3101        writer.flush_data_pages().unwrap();
3102
3103        let r = writer.close().unwrap();
3104        let column_index = r.column_index.unwrap();
3105        let offset_index = r.offset_index.unwrap();
3106
3107        assert_eq!(1, r.rows_written);
3108
3109        // column index
3110        assert_eq!(1, column_index.null_pages.len());
3111        assert_eq!(1, offset_index.page_locations.len());
3112        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3113        assert!(!column_index.null_pages[0]);
3114        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
3115
3116        if let Some(stats) = r.metadata.statistics() {
3117            assert_eq!(stats.null_count_opt(), Some(0));
3118            assert_eq!(stats.distinct_count_opt(), None);
3119            if let Statistics::FixedLenByteArray(_stats) = stats {
3120                let column_index_min_value = &column_index.min_values[0];
3121                let column_index_max_value = &column_index.max_values[0];
3122
3123                assert_eq!(column_index_min_value.len(), 1);
3124                assert_eq!(column_index_max_value.len(), 1);
3125
3126                assert_eq!("B".as_bytes(), column_index_min_value.as_slice());
3127                assert_eq!("C".as_bytes(), column_index_max_value.as_slice());
3128
3129                assert_ne!(column_index_min_value, stats.min_bytes_opt().unwrap());
3130                assert_ne!(column_index_max_value, stats.max_bytes_opt().unwrap());
3131            } else {
3132                panic!("expecting Statistics::FixedLenByteArray");
3133            }
3134        } else {
3135            panic!("metadata missing statistics");
3136        }
3137    }
3138
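    // Added sketch of how the "B"/"C" bounds above arise: the column index
    // truncates the min downward, while the max is truncated and then
    // incremented so that it still upper-bounds the original value.
    #[test]
    fn test_truncate_then_increment_bounds_sketch() {
        let value = "Blart Versenwald III".as_bytes();
        // Truncating to one byte yields a valid lower bound directly.
        assert_eq!(&value[..1], "B".as_bytes());
        // The truncated max must be bumped past the original value.
        let max = increment(value[..1].to_vec()).unwrap();
        assert_eq!(max.as_slice(), "C".as_bytes());
    }
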
3139    #[test]
3140    fn test_float16_min_max_no_truncation() {
3141        // Even if we set truncation to occur at 1 byte, we should not truncate for Float16
3142        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3143        let props = Arc::new(builder.build());
3144        let page_writer = get_test_page_writer();
3145        let mut writer = get_test_float16_column_writer(page_writer, props);
3146
3147        let expected_value = f16::PI.to_le_bytes().to_vec();
3148        let data = vec![ByteArray::from(expected_value.clone()).into()];
3149        writer.write_batch(&data, None, None).unwrap();
3150        writer.flush_data_pages().unwrap();
3151
3152        let r = writer.close().unwrap();
3153
3154        // stats should still be written
3155        // ensure bytes weren't truncated for column index
3156        let column_index = r.column_index.unwrap();
3157        let column_index_min_bytes = column_index.min_values[0].as_slice();
3158        let column_index_max_bytes = column_index.max_values[0].as_slice();
3159        assert_eq!(expected_value, column_index_min_bytes);
3160        assert_eq!(expected_value, column_index_max_bytes);
3161
3162        // ensure bytes weren't truncated for statistics
3163        let stats = r.metadata.statistics().unwrap();
3164        if let Statistics::FixedLenByteArray(stats) = stats {
3165            let stats_min_bytes = stats.min_bytes_opt().unwrap();
3166            let stats_max_bytes = stats.max_bytes_opt().unwrap();
3167            assert_eq!(expected_value, stats_min_bytes);
3168            assert_eq!(expected_value, stats_max_bytes);
3169        } else {
3170            panic!("expecting Statistics::FixedLenByteArray");
3171        }
3172    }
3173
3174    #[test]
3175    fn test_decimal_min_max_no_truncation() {
3176        // Even if we set truncation to occur at 1 byte, we should not truncate for Decimal
3177        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3178        let props = Arc::new(builder.build());
3179        let page_writer = get_test_page_writer();
3180        let mut writer =
3181            get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3182
3183        let expected_value = vec![
3184            255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
3185            231u8, 90u8, 0u8, 0u8,
3186        ];
3187        let data = vec![ByteArray::from(expected_value.clone()).into()];
3188        writer.write_batch(&data, None, None).unwrap();
3189        writer.flush_data_pages().unwrap();
3190
3191        let r = writer.close().unwrap();
3192
3193        // stats should still be written
3194        // ensure bytes weren't truncated for column index
3195        let column_index = r.column_index.unwrap();
3196        let column_index_min_bytes = column_index.min_values[0].as_slice();
3197        let column_index_max_bytes = column_index.max_values[0].as_slice();
3198        assert_eq!(expected_value, column_index_min_bytes);
3199        assert_eq!(expected_value, column_index_max_bytes);
3200
3201        // ensure bytes weren't truncated for statistics
3202        let stats = r.metadata.statistics().unwrap();
3203        if let Statistics::FixedLenByteArray(stats) = stats {
3204            let stats_min_bytes = stats.min_bytes_opt().unwrap();
3205            let stats_max_bytes = stats.max_bytes_opt().unwrap();
3206            assert_eq!(expected_value, stats_min_bytes);
3207            assert_eq!(expected_value, stats_max_bytes);
3208        } else {
3209            panic!("expecting Statistics::FixedLenByteArray");
3210        }
3211    }
3212
3213    #[test]
3214    fn test_statistics_truncating_byte_array() {
3215        let page_writer = get_test_page_writer();
3216
3217        const TEST_TRUNCATE_LENGTH: usize = 1;
3218
3219        // Truncate values at 1 byte
3220        let builder =
3221            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3222        let props = Arc::new(builder.build());
3223        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3224
3225        let mut data = vec![ByteArray::default(); 1];
3226        // This is the expected min value
3227        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3228
3229        writer.write_batch(&data, None, None).unwrap();
3230
3231        writer.flush_data_pages().unwrap();
3232
3233        let r = writer.close().unwrap();
3234
3235        assert_eq!(1, r.rows_written);
3236
3237        let stats = r.metadata.statistics().expect("statistics");
3238        assert_eq!(stats.null_count_opt(), Some(0));
3239        assert_eq!(stats.distinct_count_opt(), None);
3240        if let Statistics::ByteArray(_stats) = stats {
3241            let min_value = _stats.min_opt().unwrap();
3242            let max_value = _stats.max_opt().unwrap();
3243
3244            assert!(!_stats.min_is_exact());
3245            assert!(!_stats.max_is_exact());
3246
3247            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3248            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3249
3250            assert_eq!("B".as_bytes(), min_value.as_bytes());
3251            assert_eq!("C".as_bytes(), max_value.as_bytes());
3252        } else {
3253            panic!("expecting Statistics::ByteArray");
3254        }
3255    }
3256
3257    #[test]
3258    fn test_statistics_truncating_fixed_len_byte_array() {
3259        let page_writer = get_test_page_writer();
3260
3261        const TEST_TRUNCATE_LENGTH: usize = 1;
3262
3263        // Truncate values at 1 byte
3264        let builder =
3265            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3266        let props = Arc::new(builder.build());
3267        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3268
3269        let mut data = vec![FixedLenByteArray::default(); 1];
3270
3271        const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
3272        const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();
3273
3274        const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]]; // parquet specifies big-endian order for decimals
3275        const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
3276            [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];
3277
3278        // This is the expected min value
3279        data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));
3280
3281        writer.write_batch(&data, None, None).unwrap();
3282
3283        writer.flush_data_pages().unwrap();
3284
3285        let r = writer.close().unwrap();
3286
3287        assert_eq!(1, r.rows_written);
3288
3289        let stats = r.metadata.statistics().expect("statistics");
3290        assert_eq!(stats.null_count_opt(), Some(0));
3291        assert_eq!(stats.distinct_count_opt(), None);
3292        if let Statistics::FixedLenByteArray(_stats) = stats {
3293            let min_value = _stats.min_opt().unwrap();
3294            let max_value = _stats.max_opt().unwrap();
3295
3296            assert!(!_stats.min_is_exact());
3297            assert!(!_stats.max_is_exact());
3298
3299            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3300            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3301
3302            assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
3303            assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());
3304
3305            // Reconstruct the decimals from the truncated leading byte, padding
3306            // the remaining 15 (big-endian) bytes with zeros.
3307            let reconstructed_min = i128::from_be_bytes([
3308                min_value.as_bytes()[0],
3309                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
3310            ]);
3311            let reconstructed_max = i128::from_be_bytes([
3312                max_value.as_bytes()[0],
3313                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
3314            ]);
3342
3343            // check that the inner value is correctly bounded by the min/max
3344            println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
3345            assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
3346            println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
3347            assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
3348        } else {
3349            panic!("expecting Statistics::FixedLenByteArray");
3350        }
3351    }
3352
3353    #[test]
3354    fn test_send() {
3355        fn test<T: Send>() {}
3356        test::<ColumnWriterImpl<Int32Type>>();
3357    }
3358
3359    #[test]
3360    fn test_increment() {
3361        let v = increment(vec![0, 0, 0]).unwrap();
3362        assert_eq!(&v, &[0, 0, 1]);
3363
3364        // Handle overflow
3365        let v = increment(vec![0, 255, 255]).unwrap();
3366        assert_eq!(&v, &[1, 0, 0]);
3367
3368        // Return `None` if all bytes are u8::MAX
3369        let v = increment(vec![255, 255, 255]);
3370        assert!(v.is_none());
3371    }
3372
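    // Added sketch (for exposition only; `increment` above is the function
    // actually under test): a minimal ripple-carry increment matching the
    // semantics asserted in test_increment.
    #[test]
    fn test_increment_ripple_carry_sketch() {
        fn increment_sketch(mut data: Vec<u8>) -> Option<Vec<u8>> {
            // Walk from the least significant byte, carrying overflow left.
            for byte in data.iter_mut().rev() {
                let (sum, overflow) = byte.overflowing_add(1);
                *byte = sum;
                if !overflow {
                    return Some(data);
                }
            }
            // Every byte was u8::MAX: no larger value of this length exists.
            None
        }
        assert_eq!(&increment_sketch(vec![0, 255, 255]).unwrap(), &[1, 0, 0]);
        assert!(increment_sketch(vec![255, 255, 255]).is_none());
    }
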
3373    #[test]
3374    fn test_increment_utf8() {
3375        let test_inc = |o: &str, expected: &str| {
3376            if let Ok(v) = String::from_utf8(increment_utf8(o).unwrap()) {
3377                // Got the expected result...
3378                assert_eq!(v, expected);
3379                // and it's greater than the original string
3380                assert!(*v > *o);
3381                // Also show that ByteArray-level comparison works here
3382                let mut greater = ByteArray::new();
3383                greater.set_data(Bytes::from(v));
3384                let mut original = ByteArray::new();
3385                original.set_data(Bytes::from(o.as_bytes().to_vec()));
3386                assert!(greater > original);
3387            } else {
3388                panic!("Expected incremented UTF8 string to also be valid.");
3389            }
3390        };
3391
3392        // Basic ASCII case
3393        test_inc("hello", "hellp");
3394
3395        // 1-byte ending in max 1-byte
3396        test_inc("a\u{7f}", "b");
3397
3398        // Incrementing the max 1-byte code point fails, as it would need a 2-byte code point
3399        assert!(increment_utf8("\u{7f}\u{7f}").is_none());
3400
3401        // UTF8 string
3402        test_inc("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝");
3403
3404        // 2-byte without overflow
3405        test_inc("éééé", "éééê");
3406
3407        // 2-byte that overflows lowest byte
3408        test_inc("\u{ff}\u{ff}", "\u{ff}\u{100}");
3409
3410        // 2-byte ending in max 2-byte
3411        test_inc("a\u{7ff}", "b");
3412
3413        // Incrementing the max 2-byte code point fails, as it would need a 3-byte code point
3414        assert!(increment_utf8("\u{7ff}\u{7ff}").is_none());
3415
3416        // 3-byte without overflow [U+800, U+800] -> [U+800, U+801] (note that these
3417        // characters should render right to left).
3418        test_inc("ࠀࠀ", "ࠀࠁ");
3419
3420        // 3-byte ending in max 3-byte
3421        test_inc("a\u{ffff}", "b");
3422
3423        // Incrementing the max 3-byte code point fails, as it would need a 4-byte code point
3424        assert!(increment_utf8("\u{ffff}\u{ffff}").is_none());
3425
3426        // 4-byte without overflow
3427        test_inc("𐀀𐀀", "𐀀𐀁");
3428
3429        // 4-byte ending in max unicode
3430        test_inc("a\u{10ffff}", "b");
3431
3432        // The max 4-byte code point is the top of the Unicode range and cannot be incremented
3433        assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none());
3434
3435        // Incrementing U+D7FF would land in the surrogate range (0xD800..=0xDFFF),
3436        // invalid in UTF-8; rather than skipping ahead to U+E000, the last
3437        test_inc("a\u{D7FF}", "b");
3438    }
3439
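    // Added note in code form: the `is_none` cases above all stem from UTF-8
    // encoded-length growth. Incrementing the largest code point of a given
    // encoded width yields a code point one byte wider, so no upper bound of
    // equal or shorter byte length exists.
    #[test]
    fn test_increment_utf8_width_growth_sketch() {
        assert_eq!('\u{7f}'.len_utf8(), 1);
        assert_eq!('\u{80}'.len_utf8(), 2);
        assert_eq!('\u{7ff}'.len_utf8(), 2);
        assert_eq!('\u{800}'.len_utf8(), 3);
        assert_eq!('\u{ffff}'.len_utf8(), 3);
        assert_eq!('\u{10000}'.len_utf8(), 4);
    }
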
3440    #[test]
3441    fn test_truncate_utf8() {
3442        // No-op
3443        let data = "❤️🧡💛💚💙💜";
3444        let r = truncate_utf8(data, data.len()).unwrap();
3445        assert_eq!(r.len(), data.len());
3446        assert_eq!(&r, data.as_bytes());
3447
3448        // Truncate down to the nearest UTF-8 character boundary: 13 bytes falls mid-code-point, leaving 10
3449        let r = truncate_utf8(data, 13).unwrap();
3450        assert_eq!(r.len(), 10);
3451        assert_eq!(&r, "❤️🧡".as_bytes());
3452
3453        // A single multi-byte code point with a byte limit shorter than it cannot be truncated
3454        let r = truncate_utf8("\u{0836}", 1);
3455        assert!(r.is_none());
3456
3457        // Test truncate and increment for max bounds on UTF-8 statistics
3458        // 7-bit (i.e. ASCII)
3459        let r = truncate_and_increment_utf8("yyyyyyyyy", 8).unwrap();
3460        assert_eq!(&r, "yyyyyyyz".as_bytes());
3461
3462        // 2-byte without overflow
3463        let r = truncate_and_increment_utf8("ééééé", 7).unwrap();
3464        assert_eq!(&r, "ééê".as_bytes());
3465
3466        // 2-byte that overflows lowest byte
3467        let r = truncate_and_increment_utf8("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8).unwrap();
3468        assert_eq!(&r, "\u{ff}\u{ff}\u{ff}\u{100}".as_bytes());
3469
3470        // max 2-byte should not truncate as it would need 3-byte code points
3471        let r = truncate_and_increment_utf8("߿߿߿߿߿", 8);
3472        assert!(r.is_none());
3473
3474        // 3-byte without overflow: [U+800; 4] truncates to [U+800, U+800], which then
3475        // increments to [U+800, U+801] (note these characters may render right to left)
3476        let r = truncate_and_increment_utf8("ࠀࠀࠀࠀ", 8).unwrap();
3477        assert_eq!(&r, "ࠀࠁ".as_bytes());
3478
3479        // max 3-byte should not truncate as it would need 4-byte code points
3480        let r = truncate_and_increment_utf8("\u{ffff}\u{ffff}\u{ffff}", 8);
3481        assert!(r.is_none());
3482
3483        // 4-byte without overflow
3484        let r = truncate_and_increment_utf8("𐀀𐀀𐀀𐀀", 9).unwrap();
3485        assert_eq!(&r, "𐀀𐀁".as_bytes());
3486
3487        // max 4-byte should not truncate
3488        let r = truncate_and_increment_utf8("\u{10ffff}\u{10ffff}", 8);
3489        assert!(r.is_none());
3490    }
3491
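    // Added sketch of the boundary search that test_truncate_utf8 relies on
    // (assumed behavior, for exposition): back off from the byte budget to the
    // nearest UTF-8 character boundary.
    #[test]
    fn test_truncate_utf8_boundary_sketch() {
        let data = "❤️🧡💛💚💙💜";
        let mut len = 13;
        while !data.is_char_boundary(len) {
            len -= 1;
        }
        // Matches the 10-byte result asserted in test_truncate_utf8.
        assert_eq!(len, 10);
        assert_eq!(&data.as_bytes()[..len], "❤️🧡".as_bytes());
    }
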
3492    // Check fallback truncation of statistics that should be UTF-8, but aren't
3493    // (see https://github.com/apache/arrow-rs/pull/6870).
3494    #[test]
3495    fn test_byte_array_truncate_invalid_utf8_statistics() {
3496        let message_type = "
3497            message test_schema {
3498                OPTIONAL BYTE_ARRAY a (UTF8);
3499            }
3500        ";
3501        let schema = Arc::new(parse_message_type(message_type).unwrap());
3502
3503        // Create Vec<ByteArray> containing non-UTF8 bytes
3504        let data = vec![ByteArray::from(vec![128u8; 32]); 7];
3505        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
3506        let file: File = tempfile::tempfile().unwrap();
3507        let props = Arc::new(
3508            WriterProperties::builder()
3509                .set_statistics_enabled(EnabledStatistics::Chunk)
3510                .set_statistics_truncate_length(Some(8))
3511                .build(),
3512        );
3513
3514        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
3515        let mut row_group_writer = writer.next_row_group().unwrap();
3516
3517        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
3518        col_writer
3519            .typed::<ByteArrayType>()
3520            .write_batch(&data, Some(&def_levels), None)
3521            .unwrap();
3522        col_writer.close().unwrap();
3523        row_group_writer.close().unwrap();
3524        let file_metadata = writer.close().unwrap();
3525        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
3526        let stats = file_metadata.row_groups[0].columns[0]
3527            .meta_data
3528            .as_ref()
3529            .unwrap()
3530            .statistics
3531            .as_ref()
3532            .unwrap();
3533        assert!(!stats.is_max_value_exact.unwrap());
3534        // Truncation of invalid UTF-8 should fall back to binary truncation, so last byte should
3535        // be incremented by 1.
3536        assert_eq!(
3537            stats.max_value,
3538            Some([128, 128, 128, 128, 128, 128, 128, 129].to_vec())
3539        );
3540    }
3541
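    // Added sketch: the bytes written in the test above are not valid UTF-8
    // (0x80 is a bare continuation byte), which is what forces the fallback
    // from UTF-8 truncation to binary truncate-and-increment.
    #[test]
    fn test_invalid_utf8_bytes_sketch() {
        assert!(std::str::from_utf8(&[128u8; 32]).is_err());
    }
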
3542    #[test]
3543    fn test_increment_max_binary_chars() {
3544        let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
3545        assert_eq!(&r.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);
3546
3547        let incremented = increment(vec![0xFF, 0xFF, 0xFF]);
3548        assert!(incremented.is_none());
3549    }
3550
3551    #[test]
3552    fn test_no_column_index_when_stats_disabled() {
3553        // https://github.com/apache/arrow-rs/issues/6010
3554        // Test that column index is not created/written for all-nulls column when page
3555        // statistics are disabled.
3556        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3557        let props = Arc::new(
3558            WriterProperties::builder()
3559                .set_statistics_enabled(EnabledStatistics::None)
3560                .build(),
3561        );
3562        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3563        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3564
3565        let data = Vec::new();
3566        let def_levels = vec![0; 10];
3567        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3568        writer.flush_data_pages().unwrap();
3569
3570        let column_close_result = writer.close().unwrap();
3571        assert!(column_close_result.offset_index.is_some());
3572        assert!(column_close_result.column_index.is_none());
3573    }
3574
3575    #[test]
3576    fn test_no_offset_index_when_disabled() {
3577        // Test that offset indexes can be disabled
3578        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3579        let props = Arc::new(
3580            WriterProperties::builder()
3581                .set_statistics_enabled(EnabledStatistics::None)
3582                .set_offset_index_disabled(true)
3583                .build(),
3584        );
3585        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3586        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3587
3588        let data = Vec::new();
3589        let def_levels = vec![0; 10];
3590        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3591        writer.flush_data_pages().unwrap();
3592
3593        let column_close_result = writer.close().unwrap();
3594        assert!(column_close_result.offset_index.is_none());
3595        assert!(column_close_result.column_index.is_none());
3596    }
3597
3598    #[test]
3599    fn test_offset_index_overridden() {
3600        // Test that offset indexes are not disabled when gathering page statistics
3601        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3602        let props = Arc::new(
3603            WriterProperties::builder()
3604                .set_statistics_enabled(EnabledStatistics::Page)
3605                .set_offset_index_disabled(true)
3606                .build(),
3607        );
3608        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3609        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3610
3611        let data = Vec::new();
3612        let def_levels = vec![0; 10];
3613        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3614        writer.flush_data_pages().unwrap();
3615
3616        let column_close_result = writer.close().unwrap();
3617        assert!(column_close_result.offset_index.is_some());
3618        assert!(column_close_result.column_index.is_some());
3619    }
3620
3621    #[test]
3622    fn test_boundary_order() -> Result<()> {
3623        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3624        // min max both ascending
3625        let column_close_result = write_multiple_pages::<Int32Type>(
3626            &descr,
3627            &[
3628                &[Some(-10), Some(10)],
3629                &[Some(-5), Some(11)],
3630                &[None],
3631                &[Some(-5), Some(11)],
3632            ],
3633        )?;
3634        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3635        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3636
3637        // min max both descending
3638        let column_close_result = write_multiple_pages::<Int32Type>(
3639            &descr,
3640            &[
3641                &[Some(10), Some(11)],
3642                &[Some(5), Some(11)],
3643                &[None],
3644                &[Some(-5), Some(0)],
3645            ],
3646        )?;
3647        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3648        assert_eq!(boundary_order, BoundaryOrder::DESCENDING);
3649
3650        // min max both equal
3651        let column_close_result = write_multiple_pages::<Int32Type>(
3652            &descr,
3653            &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
3654        )?;
3655        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3656        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3657
3658        // only nulls
3659        let column_close_result =
3660            write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
3661        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3662        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3663
3664        // one page
3665        let column_close_result =
3666            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
3667        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3668        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3669
3670        // one non-null page
3671        let column_close_result =
3672            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
3673        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3674        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3675
3676        // min max both unordered
3677        let column_close_result = write_multiple_pages::<Int32Type>(
3678            &descr,
3679            &[
3680                &[Some(10), Some(11)],
3681                &[Some(11), Some(16)],
3682                &[None],
3683                &[Some(-5), Some(0)],
3684            ],
3685        )?;
3686        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3687        assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3688
3689        // min max both ordered in different orders
3690        let column_close_result = write_multiple_pages::<Int32Type>(
3691            &descr,
3692            &[
3693                &[Some(1), Some(9)],
3694                &[Some(2), Some(8)],
3695                &[None],
3696                &[Some(3), Some(7)],
3697            ],
3698        )?;
3699        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3700        assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3701
3702        Ok(())
3703    }
3704
3705    #[test]
3706    fn test_boundary_order_logical_type() -> Result<()> {
3707        // ensure that logical types can impose a different sort order than the
3708        // underlying physical type representation
3709        let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
3710        let fba_descr = {
3711            let tpe = SchemaType::primitive_type_builder(
3712                "col",
3713                FixedLenByteArrayType::get_physical_type(),
3714            )
3715            .with_length(2)
3716            .build()?;
3717            Arc::new(ColumnDescriptor::new(
3718                Arc::new(tpe),
3719                1,
3720                0,
3721                ColumnPath::from("col"),
3722            ))
3723        };
3724
3725        let values: &[&[Option<FixedLenByteArray>]] = &[
3726            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
3727            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
3728            &[Some(FixedLenByteArray::from(ByteArray::from(
3729                f16::NEG_ZERO,
3730            )))],
3731            &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
3732        ];
3733
3734        // f16 descending
3735        let column_close_result =
3736            write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
3737        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3738        assert_eq!(boundary_order, BoundaryOrder::DESCENDING);
3739
3740        // same bytes, but as plain FIXED_LEN_BYTE_ARRAY the pages are unordered
3741        let column_close_result =
3742            write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
3743        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3744        assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3745
3746        Ok(())
3747    }
3748
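    // Added sketch of why identical bytes produce different boundary orders
    // above: as Float16 the page minima descend (1.0, 0.0, -0.0, -1.0), but
    // compared as unsigned byte strings the sign bit makes negative values sort
    // greater, so the plain FIXED_LEN_BYTE_ARRAY order is neither ascending nor
    // descending.
    #[test]
    fn test_float16_byte_order_sketch() {
        // Logically -1.0 < 1.0...
        assert!(f16::NEG_ONE < f16::ONE);
        // ...but its little-endian bytes [0x00, 0xBC] compare greater than
        // those of 1.0, [0x00, 0x3C], under unsigned lexicographic order.
        assert!(f16::NEG_ONE.to_le_bytes() > f16::ONE.to_le_bytes());
    }
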
3749    #[test]
3750    fn test_interval_stats_should_not_have_min_max() {
3751        let input = [
3752            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
3753            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
3754            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
3755        ]
3756        .into_iter()
3757        .map(|s| ByteArray::from(s).into())
3758        .collect::<Vec<_>>();
3759
3760        let page_writer = get_test_page_writer();
3761        let mut writer = get_test_interval_column_writer(page_writer);
3762        writer.write_batch(&input, None, None).unwrap();
3763
3764        let metadata = writer.close().unwrap().metadata;
3765        let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
3766            stats.clone()
3767        } else {
3768            panic!("metadata missing statistics");
3769        };
3770        assert!(stats.min_bytes_opt().is_none());
3771        assert!(stats.max_bytes_opt().is_none());
3772    }
3773
3774    #[test]
3775    #[cfg(feature = "arrow")]
3776    fn test_column_writer_get_estimated_total_bytes() {
3777        let page_writer = get_test_page_writer();
3778        let props = Default::default();
3779        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
3780        assert_eq!(writer.get_estimated_total_bytes(), 0);
3781
3782        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
3783        writer.add_data_page().unwrap();
3784        let size_with_one_page = writer.get_estimated_total_bytes();
3785        assert_eq!(size_with_one_page, 20);
3786
3787        writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
3788        writer.add_data_page().unwrap();
3789        let size_with_two_pages = writer.get_estimated_total_bytes();
3790        // different pages have different compressed lengths
3791        assert_eq!(size_with_two_pages, 20 + 21);
3792    }
3793
3794    fn write_multiple_pages<T: DataType>(
3795        column_descr: &Arc<ColumnDescriptor>,
3796        pages: &[&[Option<T::T>]],
3797    ) -> Result<ColumnCloseResult> {
3798        let column_writer = get_column_writer(
3799            column_descr.clone(),
3800            Default::default(),
3801            get_test_page_writer(),
3802        );
3803        let mut writer = get_typed_column_writer::<T>(column_writer);
3804
3805        for &page in pages {
3806            let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
3807            let def_levels = page
3808                .iter()
3809                .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
3810                .collect::<Vec<_>>();
3811            writer.write_batch(&values, Some(&def_levels), None)?;
3812            writer.flush_data_pages()?;
3813        }
3814
3815        writer.close()
3816    }
3817
3818    /// Performs a write-read roundtrip with randomly generated values and levels.
3819    /// `max_size` is the maximum number of values or levels (if `max_def_level` > 0)
3820    /// to write for a column.
3821    fn column_roundtrip_random<T: DataType>(
3822        props: WriterProperties,
3823        max_size: usize,
3824        min_value: T::T,
3825        max_value: T::T,
3826        max_def_level: i16,
3827        max_rep_level: i16,
3828    ) where
3829        T::T: PartialOrd + SampleUniform + Copy,
3830    {
3831        let mut num_values: usize = 0;
3832
3833        let mut buf: Vec<i16> = Vec::new();
3834        let def_levels = if max_def_level > 0 {
3835            random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
3836            for &dl in &buf[..] {
3837                if dl == max_def_level {
3838                    num_values += 1;
3839                }
3840            }
3841            Some(&buf[..])
3842        } else {
3843            num_values = max_size;
3844            None
3845        };
3846
3847        let mut buf: Vec<i16> = Vec::new();
3848        let rep_levels = if max_rep_level > 0 {
3849            random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
3850            buf[0] = 0; // Must start on record boundary
3851            Some(&buf[..])
3852        } else {
3853            None
3854        };
3855
3856        let mut values: Vec<T::T> = Vec::new();
3857        random_numbers_range(num_values, min_value, max_value, &mut values);
3858
3859        column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
3860    }
3861
3862    /// Performs write-read roundtrip and asserts written values and levels.
3863    fn column_roundtrip<T: DataType>(
3864        props: WriterProperties,
3865        values: &[T::T],
3866        def_levels: Option<&[i16]>,
3867        rep_levels: Option<&[i16]>,
3868    ) {
3869        let mut file = tempfile::tempfile().unwrap();
3870        let mut write = TrackedWrite::new(&mut file);
3871        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
3872
3873        let max_def_level = match def_levels {
3874            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3875            None => 0i16,
3876        };
3877
3878        let max_rep_level = match rep_levels {
3879            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3880            None => 0i16,
3881        };
3882
3883        let mut max_batch_size = values.len();
3884        if let Some(levels) = def_levels {
3885            max_batch_size = max_batch_size.max(levels.len());
3886        }
3887        if let Some(levels) = rep_levels {
3888            max_batch_size = max_batch_size.max(levels.len());
3889        }
3890
3891        let mut writer =
3892            get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));
3893
3894        let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
3895        assert_eq!(values_written, values.len());
3896        let result = writer.close().unwrap();
3897
3898        drop(write);
3899
3900        let props = ReaderProperties::builder()
3901            .set_backward_compatible_lz4(false)
3902            .build();
3903        let page_reader = Box::new(
3904            SerializedPageReader::new_with_properties(
3905                Arc::new(file),
3906                &result.metadata,
3907                result.rows_written as usize,
3908                None,
3909                Arc::new(props),
3910            )
3911            .unwrap(),
3912        );
3913        let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);
3914
3915        let mut actual_values = Vec::with_capacity(max_batch_size);
3916        let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
3917        let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));
3918
3919        let (_, values_read, levels_read) = reader
3920            .read_records(
3921                max_batch_size,
3922                actual_def_levels.as_mut(),
3923                actual_rep_levels.as_mut(),
3924                &mut actual_values,
3925            )
3926            .unwrap();
3927
3928        // Assert values, definition and repetition levels.
3929
3930        assert_eq!(&actual_values[..values_read], values);
3931        match actual_def_levels {
3932            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
3933            None => assert_eq!(None, def_levels),
3934        }
3935        match actual_rep_levels {
3936            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
3937            None => assert_eq!(None, rep_levels),
3938        }
3939
3940        // Assert written rows.
3941
3942        if let Some(levels) = actual_rep_levels {
3943            let mut actual_rows_written = 0;
3944            for l in levels {
3945                if l == 0 {
3946                    actual_rows_written += 1;
3947                }
3948            }
3949            assert_eq!(actual_rows_written, result.rows_written);
3950        } else if actual_def_levels.is_some() {
3951            assert_eq!(levels_read as u64, result.rows_written);
3952        } else {
3953            assert_eq!(values_read as u64, result.rows_written);
3954        }
3955    }
3956
3957    /// Writes the provided values and returns the resulting column metadata.
3958    /// Used to test encoding support for the column writer.
3959    fn column_write_and_get_metadata<T: DataType>(
3960        props: WriterProperties,
3961        values: &[T::T],
3962    ) -> ColumnChunkMetaData {
3963        let page_writer = get_test_page_writer();
3964        let props = Arc::new(props);
3965        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
3966        writer.write_batch(values, None, None).unwrap();
3967        writer.close().unwrap().metadata
3968    }
3969
3970    // Helper function to more compactly create a PageEncodingStats struct.
3971    fn encoding_stats(page_type: PageType, encoding: Encoding, count: i32) -> PageEncodingStats {
3972        PageEncodingStats {
3973            page_type,
3974            encoding,
3975            count,
3976        }
3977    }
3978
3979    // Function to use in tests for EncodingWriteSupport. This checks the dictionary
3980    // page offset and the encodings to make sure that the column writer uses the
3981    // encodings provided by the trait.
3982    fn check_encoding_write_support<T: DataType>(
3983        version: WriterVersion,
3984        dict_enabled: bool,
3985        data: &[T::T],
3986        dictionary_page_offset: Option<i64>,
3987        encodings: &[Encoding],
3988        page_encoding_stats: &[PageEncodingStats],
3989    ) {
3990        let props = WriterProperties::builder()
3991            .set_writer_version(version)
3992            .set_dictionary_enabled(dict_enabled)
3993            .build();
3994        let meta = column_write_and_get_metadata::<T>(props, data);
3995        assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
3996        assert_eq!(meta.encodings(), encodings);
3997        assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
3998    }
3999
4000    /// Returns column writer.
4001    fn get_test_column_writer<'a, T: DataType>(
4002        page_writer: Box<dyn PageWriter + 'a>,
4003        max_def_level: i16,
4004        max_rep_level: i16,
4005        props: WriterPropertiesPtr,
4006    ) -> ColumnWriterImpl<'a, T> {
4007        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4008        let column_writer = get_column_writer(descr, props, page_writer);
4009        get_typed_column_writer::<T>(column_writer)
4010    }
4011
4012    /// Returns column reader.
4013    fn get_test_column_reader<T: DataType>(
4014        page_reader: Box<dyn PageReader>,
4015        max_def_level: i16,
4016        max_rep_level: i16,
4017    ) -> ColumnReaderImpl<T> {
4018        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4019        let column_reader = get_column_reader(descr, page_reader);
4020        get_typed_column_reader::<T>(column_reader)
4021    }
4022
4023    /// Returns descriptor for primitive column.
4024    fn get_test_column_descr<T: DataType>(
4025        max_def_level: i16,
4026        max_rep_level: i16,
4027    ) -> ColumnDescriptor {
4028        let path = ColumnPath::from("col");
4029        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4030            // length is set for "encoding support" tests of the FIXED_LEN_BYTE_ARRAY type;
4031            // it should be a no-op for other types
4032            .with_length(1)
4033            .build()
4034            .unwrap();
4035        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4036    }
4037
4038    /// Returns a page writer that computes page write specs without serializing or storing the pages.
4039    fn get_test_page_writer() -> Box<dyn PageWriter> {
4040        Box::new(TestPageWriter {})
4041    }
4042
4043    struct TestPageWriter {}
4044
4045    impl PageWriter for TestPageWriter {
4046        fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
4047            let mut res = PageWriteSpec::new();
4048            res.page_type = page.page_type();
4049            res.uncompressed_size = page.uncompressed_size();
4050            res.compressed_size = page.compressed_size();
4051            res.num_values = page.num_values();
4052            res.offset = 0;
4053            res.bytes_written = page.data().len() as u64;
4054            Ok(res)
4055        }
4056
4057        fn close(&mut self) -> Result<()> {
4058            Ok(())
4059        }
4060    }
4061
4062    /// Writes data into parquet using [`get_test_page_writer`] and [`get_test_column_writer`], returning the generated statistics.
4063    fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
4064        let page_writer = get_test_page_writer();
4065        let props = Default::default();
4066        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4067        writer.write_batch(values, None, None).unwrap();
4068
4069        let metadata = writer.close().unwrap().metadata;
4070        if let Some(stats) = metadata.statistics() {
4071            stats.clone()
4072        } else {
4073            panic!("metadata missing statistics");
4074        }
4075    }
4076
4077    /// Returns a column writer for Decimal values.
4078    fn get_test_decimals_column_writer<T: DataType>(
4079        page_writer: Box<dyn PageWriter>,
4080        max_def_level: i16,
4081        max_rep_level: i16,
4082        props: WriterPropertiesPtr,
4083    ) -> ColumnWriterImpl<'static, T> {
4084        let descr = Arc::new(get_test_decimals_column_descr::<T>(
4085            max_def_level,
4086            max_rep_level,
4087        ));
4088        let column_writer = get_column_writer(descr, props, page_writer);
4089        get_typed_column_writer::<T>(column_writer)
4090    }

    /// Returns a descriptor for a primitive column with the Decimal logical type.
    fn get_test_decimals_column_descr<T: DataType>(
        max_def_level: i16,
        max_rep_level: i16,
    ) -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
            .with_length(16)
            .with_logical_type(Some(LogicalType::Decimal {
                scale: 2,
                precision: 3,
            }))
            .with_scale(2)
            .with_precision(3)
            .build()
            .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }
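
    /// Minimal sketch for the Decimal helpers above (an illustrative addition,
    /// not one of the original tests): the descriptor declares precision 3 and
    /// scale 2, which an INT32 column can carry, so plain i32 values written
    /// through the typed writer are interpreted as unscaled decimals.
    #[test]
    fn test_decimals_column_writer_sketch() {
        let page_writer = get_test_page_writer();
        let mut writer =
            get_test_decimals_column_writer::<Int32Type>(page_writer, 0, 0, Default::default());
        // Unscaled values, e.g. 1.23 and -0.45 at scale 2.
        writer.write_batch(&[123, -45], None, None).unwrap();
        assert_eq!(writer.close().unwrap().rows_written, 2);
    }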

    fn float16_statistics_roundtrip(
        values: &[FixedLenByteArray],
    ) -> ValueStatistics<FixedLenByteArray> {
        let page_writer = get_test_page_writer();
        let mut writer = get_test_float16_column_writer(page_writer, Default::default());
        writer.write_batch(values, None, None).unwrap();

        let metadata = writer.close().unwrap().metadata;
        if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
            stats.clone()
        } else {
            panic!("metadata missing statistics");
        }
    }
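
    /// Minimal sketch of [`float16_statistics_roundtrip`] (an illustrative
    /// addition, not one of the original tests): Float16 values are stored as
    /// 2-byte little-endian FIXED_LEN_BYTE_ARRAY, so `f16` values are encoded
    /// that way before being handed to the helper. The helper itself panics if
    /// statistics are missing, so completing the round-trip is the assertion.
    #[test]
    fn test_float16_statistics_roundtrip_sketch() {
        let values: Vec<FixedLenByteArray> = [f16::ONE, f16::ZERO]
            .iter()
            .map(|v| FixedLenByteArray::from(ByteArray::from(v.to_le_bytes().to_vec())))
            .collect();
        let _stats = float16_statistics_roundtrip(&values);
    }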

    fn get_test_float16_column_writer(
        page_writer: Box<dyn PageWriter>,
        props: WriterPropertiesPtr,
    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
        let descr = Arc::new(get_test_float16_column_descr(0, 0));
        let column_writer = get_column_writer(descr, props, page_writer);
        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
    }

    fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe =
            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
                .with_length(2)
                .with_logical_type(Some(LogicalType::Float16))
                .build()
                .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }

    fn get_test_interval_column_writer(
        page_writer: Box<dyn PageWriter>,
    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
        let descr = Arc::new(get_test_interval_column_descr());
        let column_writer = get_column_writer(descr, Default::default(), page_writer);
        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
    }

    fn get_test_interval_column_descr() -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe =
            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
                .with_length(12)
                .with_converted_type(ConvertedType::INTERVAL)
                .build()
                .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
    }
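
    /// Minimal sketch of the INTERVAL helpers above (an illustrative addition,
    /// not one of the original tests): an INTERVAL value is a 12-byte
    /// FIXED_LEN_BYTE_ARRAY holding three little-endian u32s for months, days,
    /// and milliseconds.
    #[test]
    fn test_interval_column_writer_sketch() {
        let mut writer = get_test_interval_column_writer(get_test_page_writer());
        // 1 month, 2 days, 3 milliseconds.
        let mut buf = Vec::with_capacity(12);
        for part in [1u32, 2, 3] {
            buf.extend_from_slice(&part.to_le_bytes());
        }
        let value = FixedLenByteArray::from(ByteArray::from(buf));
        writer.write_batch(&[value], None, None).unwrap();
        assert_eq!(writer.close().unwrap().rows_written, 1);
    }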

    /// Returns a column writer for an unsigned integer column described only by
    /// the `UINT_32` converted type.
    fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
        page_writer: Box<dyn PageWriter + 'a>,
        max_def_level: i16,
        max_rep_level: i16,
        props: WriterPropertiesPtr,
    ) -> ColumnWriterImpl<'a, T> {
        let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
            max_def_level,
            max_rep_level,
        ));
        let column_writer = get_column_writer(descr, props, page_writer);
        get_typed_column_writer::<T>(column_writer)
    }

    /// Returns a column descriptor for an unsigned integer column described only
    /// by the `UINT_32` converted type.
    fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
        max_def_level: i16,
        max_rep_level: i16,
    ) -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
            .with_converted_type(ConvertedType::UINT_32)
            .build()
            .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }
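
    /// Minimal sketch of the UINT_32 helpers above (an illustrative addition,
    /// not one of the original tests): the column carries only
    /// `ConvertedType::UINT_32` on an INT32 physical type, and values are
    /// written as plain i32.
    #[test]
    fn test_unsigned_converted_type_writer_sketch() {
        let page_writer = get_test_page_writer();
        let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
            page_writer,
            0,
            0,
            Default::default(),
        );
        writer.write_batch(&[0, 1, i32::MAX], None, None).unwrap();
        assert_eq!(writer.close().unwrap().rows_written, 3);
    }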
}