parquet/column/writer/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains column writer API.
19
20use bytes::Bytes;
21use half::f16;
22
23use crate::bloom_filter::Sbbf;
24use crate::format::{BoundaryOrder, ColumnIndex, OffsetIndex};
25use std::collections::{BTreeSet, VecDeque};
26use std::str;
27
28use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
29use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
30use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
31use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
32use crate::data_type::private::ParquetValueType;
33use crate::data_type::*;
34use crate::encodings::levels::LevelEncoder;
35#[cfg(feature = "encryption")]
36use crate::encryption::encrypt::get_column_crypto_metadata;
37use crate::errors::{ParquetError, Result};
38use crate::file::metadata::{
39    ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram, OffsetIndexBuilder,
40};
41use crate::file::properties::EnabledStatistics;
42use crate::file::statistics::{Statistics, ValueStatistics};
43use crate::file::{
44    metadata::ColumnChunkMetaData,
45    properties::{WriterProperties, WriterPropertiesPtr, WriterVersion},
46};
47use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
48
49pub(crate) mod encoder;
50
// Dispatches over every `ColumnWriter` variant.
//
// `$writer` is matched against each variant, the wrapped typed writer is bound to the
// identifier `$typed`, and `$body` is evaluated with that binding in scope. The macro
// refers to the variants through `Self`, so it must be invoked where `Self` names the
// enum owning these variants.
macro_rules! downcast_writer {
    ($writer:expr, $typed:ident, $body:expr) => {
        match $writer {
            Self::BoolColumnWriter($typed) => $body,
            Self::Int32ColumnWriter($typed) => $body,
            Self::Int64ColumnWriter($typed) => $body,
            Self::Int96ColumnWriter($typed) => $body,
            Self::FloatColumnWriter($typed) => $body,
            Self::DoubleColumnWriter($typed) => $body,
            Self::ByteArrayColumnWriter($typed) => $body,
            Self::FixedLenByteArrayColumnWriter($typed) => $body,
        }
    };
}
65
66/// Column writer for a Parquet type.
67pub enum ColumnWriter<'a> {
68    /// Column writer for boolean type
69    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
70    /// Column writer for int32 type
71    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
72    /// Column writer for int64 type
73    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
74    /// Column writer for int96 (timestamp) type
75    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
76    /// Column writer for float type
77    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
78    /// Column writer for double type
79    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
80    /// Column writer for byte array type
81    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
82    /// Column writer for fixed length byte array type
83    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
84}
85
86impl ColumnWriter<'_> {
87    /// Returns the estimated total memory usage
88    #[cfg(feature = "arrow")]
89    pub(crate) fn memory_size(&self) -> usize {
90        downcast_writer!(self, typed, typed.memory_size())
91    }
92
93    /// Returns the estimated total encoded bytes for this column writer
94    #[cfg(feature = "arrow")]
95    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
96        downcast_writer!(self, typed, typed.get_estimated_total_bytes())
97    }
98
99    /// Close this [`ColumnWriter`]
100    pub fn close(self) -> Result<ColumnCloseResult> {
101        downcast_writer!(self, typed, typed.close())
102    }
103}
104
/// Deprecated two-variant marker enum (`Page` / `Column`).
///
/// NOTE(review): no use of this type is visible in this part of the file; per the
/// deprecation note below it is scheduled for removal.
#[deprecated(
    since = "54.0.0",
    note = "Seems like a stray and nobody knows what's it for. Will be removed in the next release."
)]
#[allow(missing_docs)]
pub enum Level {
    Page,
    Column,
}
114
115/// Gets a specific column writer corresponding to column descriptor `descr`.
116pub fn get_column_writer<'a>(
117    descr: ColumnDescPtr,
118    props: WriterPropertiesPtr,
119    page_writer: Box<dyn PageWriter + 'a>,
120) -> ColumnWriter<'a> {
121    match descr.physical_type() {
122        Type::BOOLEAN => {
123            ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
124        }
125        Type::INT32 => {
126            ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
127        }
128        Type::INT64 => {
129            ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
130        }
131        Type::INT96 => {
132            ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
133        }
134        Type::FLOAT => {
135            ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
136        }
137        Type::DOUBLE => {
138            ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
139        }
140        Type::BYTE_ARRAY => {
141            ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
142        }
143        Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
144            ColumnWriterImpl::new(descr, props, page_writer),
145        ),
146    }
147}
148
149/// Gets a typed column writer for the specific type `T`, by "up-casting" `col_writer` of
150/// non-generic type to a generic column writer type `ColumnWriterImpl`.
151///
152/// Panics if actual enum value for `col_writer` does not match the type `T`.
153pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
154    T::get_column_writer(col_writer).unwrap_or_else(|| {
155        panic!(
156            "Failed to convert column writer into a typed column writer for `{}` type",
157            T::get_physical_type()
158        )
159    })
160}
161
162/// Similar to `get_typed_column_writer` but returns a reference.
163pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
164    col_writer: &'b ColumnWriter<'a>,
165) -> &'b ColumnWriterImpl<'a, T> {
166    T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
167        panic!(
168            "Failed to convert column writer into a typed column writer for `{}` type",
169            T::get_physical_type()
170        )
171    })
172}
173
174/// Similar to `get_typed_column_writer` but returns a reference.
175pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
176    col_writer: &'a mut ColumnWriter<'b>,
177) -> &'a mut ColumnWriterImpl<'b, T> {
178    T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
179        panic!(
180            "Failed to convert column writer into a typed column writer for `{}` type",
181            T::get_physical_type()
182        )
183    })
184}
185
/// Metadata returned by [`GenericColumnWriter::close`]
///
/// Bundles the byte/row totals accumulated by the writer with the column chunk
/// metadata and the optional bloom filter, column index and offset index produced
/// when the writer is closed.
#[derive(Debug, Clone)]
pub struct ColumnCloseResult {
    /// The total number of bytes written
    pub bytes_written: u64,
    /// The total number of rows written
    pub rows_written: u64,
    /// Metadata for this column chunk
    pub metadata: ColumnChunkMetaData,
    /// Optional bloom filter for this column
    pub bloom_filter: Option<Sbbf>,
    /// Optional column index, for filtering
    /// (`None` when the writer's column index builder is not valid at close time)
    pub column_index: Option<ColumnIndex>,
    /// Optional offset index, identifying page locations
    /// (`None` when the writer has no offset index builder)
    pub offset_index: Option<OffsetIndex>,
}
202
203// Metrics per page
204#[derive(Default)]
205struct PageMetrics {
206    num_buffered_values: u32,
207    num_buffered_rows: u32,
208    num_page_nulls: u64,
209    repetition_level_histogram: Option<LevelHistogram>,
210    definition_level_histogram: Option<LevelHistogram>,
211}
212
213impl PageMetrics {
214    fn new() -> Self {
215        Default::default()
216    }
217
218    /// Initialize the repetition level histogram
219    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
220        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
221        self
222    }
223
224    /// Initialize the definition level histogram
225    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
226        self.definition_level_histogram = LevelHistogram::try_new(max_level);
227        self
228    }
229
230    /// Resets the state of this `PageMetrics` to the initial state.
231    /// If histograms have been initialized their contents will be reset to zero.
232    fn new_page(&mut self) {
233        self.num_buffered_values = 0;
234        self.num_buffered_rows = 0;
235        self.num_page_nulls = 0;
236        self.repetition_level_histogram
237            .as_mut()
238            .map(LevelHistogram::reset);
239        self.definition_level_histogram
240            .as_mut()
241            .map(LevelHistogram::reset);
242    }
243
244    /// Updates histogram values using provided repetition levels
245    fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
246        if let Some(ref mut rep_hist) = self.repetition_level_histogram {
247            rep_hist.update_from_levels(levels);
248        }
249    }
250
251    /// Updates histogram values using provided definition levels
252    fn update_definition_level_histogram(&mut self, levels: &[i16]) {
253        if let Some(ref mut def_hist) = self.definition_level_histogram {
254            def_hist.update_from_levels(levels);
255        }
256    }
257}
258
259// Metrics per column writer
260#[derive(Default)]
261struct ColumnMetrics<T: Default> {
262    total_bytes_written: u64,
263    total_rows_written: u64,
264    total_uncompressed_size: u64,
265    total_compressed_size: u64,
266    total_num_values: u64,
267    dictionary_page_offset: Option<u64>,
268    data_page_offset: Option<u64>,
269    min_column_value: Option<T>,
270    max_column_value: Option<T>,
271    num_column_nulls: u64,
272    column_distinct_count: Option<u64>,
273    variable_length_bytes: Option<i64>,
274    repetition_level_histogram: Option<LevelHistogram>,
275    definition_level_histogram: Option<LevelHistogram>,
276}
277
278impl<T: Default> ColumnMetrics<T> {
279    fn new() -> Self {
280        Default::default()
281    }
282
283    /// Initialize the repetition level histogram
284    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
285        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
286        self
287    }
288
289    /// Initialize the definition level histogram
290    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
291        self.definition_level_histogram = LevelHistogram::try_new(max_level);
292        self
293    }
294
295    /// Sum `page_histogram` into `chunk_histogram`
296    fn update_histogram(
297        chunk_histogram: &mut Option<LevelHistogram>,
298        page_histogram: &Option<LevelHistogram>,
299    ) {
300        if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
301            chunk_hist.add(page_hist);
302        }
303    }
304
305    /// Sum the provided PageMetrics histograms into the chunk histograms. Does nothing if
306    /// page histograms are not initialized.
307    fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
308        ColumnMetrics::<T>::update_histogram(
309            &mut self.definition_level_histogram,
310            &page_metrics.definition_level_histogram,
311        );
312        ColumnMetrics::<T>::update_histogram(
313            &mut self.repetition_level_histogram,
314            &page_metrics.repetition_level_histogram,
315        );
316    }
317
318    /// Sum the provided page variable_length_bytes into the chunk variable_length_bytes
319    fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
320        if let Some(var_bytes) = variable_length_bytes {
321            *self.variable_length_bytes.get_or_insert(0) += var_bytes;
322        }
323    }
324}
325
/// Typed column writer for a primitive column.
///
/// Alias for [`GenericColumnWriter`] using [`ColumnValueEncoderImpl`] for `T` as its
/// value encoder.
pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
328
/// Generic column writer for a primitive column.
///
/// Values are handed to the value encoder `E`; definition and repetition levels are
/// collected in the level sinks, and finished pages go through `page_writer`.
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
    // Column writer properties
    descr: ColumnDescPtr,
    props: WriterPropertiesPtr,
    // Statistics level configured in `props` for this column's path
    statistics_enabled: EnabledStatistics,

    page_writer: Box<dyn PageWriter + 'a>,
    // Compression configured in `props` for this column's path
    codec: Compression,
    // Codec instance created for `codec`
    compressor: Option<Box<dyn Codec>>,
    encoder: E,

    // Metrics for the page currently being buffered
    page_metrics: PageMetrics,
    // Metrics per column writer
    column_metrics: ColumnMetrics<E::T>,

    /// The order of encodings within the generated metadata does not impact its meaning,
    /// but we use a BTreeSet so that the output is deterministic
    encodings: BTreeSet<Encoding>,
    // Reused buffers
    def_levels_sink: Vec<i16>,
    rep_levels_sink: Vec<i16>,
    // Compressed data pages held in memory
    // NOTE(review): presumably pages waiting to be handed to `page_writer` — confirm
    data_pages: VecDeque<CompressedPage>,
    // column index and offset index
    // (the column index builder is marked invalid unless page statistics are enabled;
    // the offset index builder is `None` when the offset index is disabled in `props`)
    column_index_builder: ColumnIndexBuilder,
    offset_index_builder: Option<OffsetIndexBuilder>,

    // Below fields used to incrementally check boundary order across data pages.
    // We assume they are ascending/descending until proven wrong.
    data_page_boundary_ascending: bool,
    data_page_boundary_descending: bool,
    /// (min, max)
    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}
363
364impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
365    /// Returns a new instance of [`GenericColumnWriter`].
366    pub fn new(
367        descr: ColumnDescPtr,
368        props: WriterPropertiesPtr,
369        page_writer: Box<dyn PageWriter + 'a>,
370    ) -> Self {
371        let codec = props.compression(descr.path());
372        let codec_options = CodecOptionsBuilder::default().build();
373        let compressor = create_codec(codec, &codec_options).unwrap();
374        let encoder = E::try_new(&descr, props.as_ref()).unwrap();
375
376        let statistics_enabled = props.statistics_enabled(descr.path());
377
378        let mut encodings = BTreeSet::new();
379        // Used for level information
380        encodings.insert(Encoding::RLE);
381
382        let mut page_metrics = PageMetrics::new();
383        let mut column_metrics = ColumnMetrics::<E::T>::new();
384
385        // Initialize level histograms if collecting page or chunk statistics
386        if statistics_enabled != EnabledStatistics::None {
387            page_metrics = page_metrics
388                .with_repetition_level_histogram(descr.max_rep_level())
389                .with_definition_level_histogram(descr.max_def_level());
390            column_metrics = column_metrics
391                .with_repetition_level_histogram(descr.max_rep_level())
392                .with_definition_level_histogram(descr.max_def_level())
393        }
394
395        // Disable column_index_builder if not collecting page statistics.
396        let mut column_index_builder = ColumnIndexBuilder::new();
397        if statistics_enabled != EnabledStatistics::Page {
398            column_index_builder.to_invalid()
399        }
400
401        // Disable offset_index_builder if requested by user.
402        let offset_index_builder = match props.offset_index_disabled() {
403            false => Some(OffsetIndexBuilder::new()),
404            _ => None,
405        };
406
407        Self {
408            descr,
409            props,
410            statistics_enabled,
411            page_writer,
412            codec,
413            compressor,
414            encoder,
415            def_levels_sink: vec![],
416            rep_levels_sink: vec![],
417            data_pages: VecDeque::new(),
418            page_metrics,
419            column_metrics,
420            column_index_builder,
421            offset_index_builder,
422            encodings,
423            data_page_boundary_ascending: true,
424            data_page_boundary_descending: true,
425            last_non_null_data_page_min_max: None,
426        }
427    }
428
429    #[allow(clippy::too_many_arguments)]
430    pub(crate) fn write_batch_internal(
431        &mut self,
432        values: &E::Values,
433        value_indices: Option<&[usize]>,
434        def_levels: Option<&[i16]>,
435        rep_levels: Option<&[i16]>,
436        min: Option<&E::T>,
437        max: Option<&E::T>,
438        distinct_count: Option<u64>,
439    ) -> Result<usize> {
440        // Check if number of definition levels is the same as number of repetition levels.
441        if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
442            if def.len() != rep.len() {
443                return Err(general_err!(
444                    "Inconsistent length of definition and repetition levels: {} != {}",
445                    def.len(),
446                    rep.len()
447                ));
448            }
449        }
450
451        // We check for DataPage limits only after we have inserted the values. If a user
452        // writes a large number of values, the DataPage size can be well above the limit.
453        //
454        // The purpose of this chunking is to bound this. Even if a user writes large
455        // number of values, the chunking will ensure that we add data page at a
456        // reasonable pagesize limit.
457
458        // TODO: find out why we don't account for size of levels when we estimate page
459        // size.
460
461        let num_levels = match def_levels {
462            Some(def_levels) => def_levels.len(),
463            None => values.len(),
464        };
465
466        if let Some(min) = min {
467            update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
468        }
469        if let Some(max) = max {
470            update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
471        }
472
473        // We can only set the distinct count if there are no other writes
474        if self.encoder.num_values() == 0 {
475            self.column_metrics.column_distinct_count = distinct_count;
476        } else {
477            self.column_metrics.column_distinct_count = None;
478        }
479
480        let mut values_offset = 0;
481        let mut levels_offset = 0;
482        let base_batch_size = self.props.write_batch_size();
483        while levels_offset < num_levels {
484            let mut end_offset = num_levels.min(levels_offset + base_batch_size);
485
486            // Split at record boundary
487            if let Some(r) = rep_levels {
488                while end_offset < r.len() && r[end_offset] != 0 {
489                    end_offset += 1;
490                }
491            }
492
493            values_offset += self.write_mini_batch(
494                values,
495                values_offset,
496                value_indices,
497                end_offset - levels_offset,
498                def_levels.map(|lv| &lv[levels_offset..end_offset]),
499                rep_levels.map(|lv| &lv[levels_offset..end_offset]),
500            )?;
501            levels_offset = end_offset;
502        }
503
504        // Return total number of values processed.
505        Ok(values_offset)
506    }
507
508    /// Writes batch of values, definition levels and repetition levels.
509    /// Returns number of values processed (written).
510    ///
511    /// If definition and repetition levels are provided, we write fully those levels and
512    /// select how many values to write (this number will be returned), since number of
513    /// actual written values may be smaller than provided values.
514    ///
515    /// If only values are provided, then all values are written and the length of
516    /// of the values buffer is returned.
517    ///
518    /// Definition and/or repetition levels can be omitted, if values are
519    /// non-nullable and/or non-repeated.
520    pub fn write_batch(
521        &mut self,
522        values: &E::Values,
523        def_levels: Option<&[i16]>,
524        rep_levels: Option<&[i16]>,
525    ) -> Result<usize> {
526        self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
527    }
528
529    /// Writer may optionally provide pre-calculated statistics for use when computing
530    /// chunk-level statistics
531    ///
532    /// NB: [`WriterProperties::statistics_enabled`] must be set to [`EnabledStatistics::Chunk`]
533    /// for these statistics to take effect. If [`EnabledStatistics::None`] they will be ignored,
534    /// and if [`EnabledStatistics::Page`] the chunk statistics will instead be computed from the
535    /// computed page statistics
536    pub fn write_batch_with_statistics(
537        &mut self,
538        values: &E::Values,
539        def_levels: Option<&[i16]>,
540        rep_levels: Option<&[i16]>,
541        min: Option<&E::T>,
542        max: Option<&E::T>,
543        distinct_count: Option<u64>,
544    ) -> Result<usize> {
545        self.write_batch_internal(
546            values,
547            None,
548            def_levels,
549            rep_levels,
550            min,
551            max,
552            distinct_count,
553        )
554    }
555
556    /// Returns the estimated total memory usage.
557    ///
558    /// Unlike [`Self::get_estimated_total_bytes`] this is an estimate
559    /// of the current memory usage and not the final anticipated encoded size.
560    #[cfg(feature = "arrow")]
561    pub(crate) fn memory_size(&self) -> usize {
562        self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
563    }
564
565    /// Returns total number of bytes written by this column writer so far.
566    /// This value is also returned when column writer is closed.
567    ///
568    /// Note: this value does not include any buffered data that has not
569    /// yet been flushed to a page.
570    pub fn get_total_bytes_written(&self) -> u64 {
571        self.column_metrics.total_bytes_written
572    }
573
574    /// Returns the estimated total encoded bytes for this column writer.
575    ///
576    /// Unlike [`Self::get_total_bytes_written`] this includes an estimate
577    /// of any data that has not yet been flushed to a page, based on it's
578    /// anticipated encoded size.
579    #[cfg(feature = "arrow")]
580    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
581        self.data_pages
582            .iter()
583            .map(|page| page.data().len() as u64)
584            .sum::<u64>()
585            + self.column_metrics.total_bytes_written
586            + self.encoder.estimated_data_page_size() as u64
587            + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
588    }
589
590    /// Returns total number of rows written by this column writer so far.
591    /// This value is also returned when column writer is closed.
592    pub fn get_total_rows_written(&self) -> u64 {
593        self.column_metrics.total_rows_written
594    }
595
596    /// Returns a reference to a [`ColumnDescPtr`]
597    pub fn get_descriptor(&self) -> &ColumnDescPtr {
598        &self.descr
599    }
600
601    /// Finalizes writes and closes the column writer.
602    /// Returns total bytes written, total rows written and column chunk metadata.
603    pub fn close(mut self) -> Result<ColumnCloseResult> {
604        if self.page_metrics.num_buffered_values > 0 {
605            self.add_data_page()?;
606        }
607        if self.encoder.has_dictionary() {
608            self.write_dictionary_page()?;
609        }
610        self.flush_data_pages()?;
611        let metadata = self.build_column_metadata()?;
612        self.page_writer.close()?;
613
614        let boundary_order = match (
615            self.data_page_boundary_ascending,
616            self.data_page_boundary_descending,
617        ) {
618            // If the lists are composed of equal elements then will be marked as ascending
619            // (Also the case if all pages are null pages)
620            (true, _) => BoundaryOrder::ASCENDING,
621            (false, true) => BoundaryOrder::DESCENDING,
622            (false, false) => BoundaryOrder::UNORDERED,
623        };
624        self.column_index_builder.set_boundary_order(boundary_order);
625
626        let column_index = self
627            .column_index_builder
628            .valid()
629            .then(|| self.column_index_builder.build_to_thrift());
630
631        let offset_index = self.offset_index_builder.map(|b| b.build_to_thrift());
632
633        Ok(ColumnCloseResult {
634            bytes_written: self.column_metrics.total_bytes_written,
635            rows_written: self.column_metrics.total_rows_written,
636            bloom_filter: self.encoder.flush_bloom_filter(),
637            metadata,
638            column_index,
639            offset_index,
640        })
641    }
642
643    /// Writes mini batch of values, definition and repetition levels.
644    /// This allows fine-grained processing of values and maintaining a reasonable
645    /// page size.
646    fn write_mini_batch(
647        &mut self,
648        values: &E::Values,
649        values_offset: usize,
650        value_indices: Option<&[usize]>,
651        num_levels: usize,
652        def_levels: Option<&[i16]>,
653        rep_levels: Option<&[i16]>,
654    ) -> Result<usize> {
655        // Process definition levels and determine how many values to write.
656        let values_to_write = if self.descr.max_def_level() > 0 {
657            let levels = def_levels.ok_or_else(|| {
658                general_err!(
659                    "Definition levels are required, because max definition level = {}",
660                    self.descr.max_def_level()
661                )
662            })?;
663
664            let mut values_to_write = 0;
665            for &level in levels {
666                if level == self.descr.max_def_level() {
667                    values_to_write += 1;
668                } else {
669                    // We must always compute this as it is used to populate v2 pages
670                    self.page_metrics.num_page_nulls += 1
671                }
672            }
673
674            // Update histogram
675            self.page_metrics.update_definition_level_histogram(levels);
676
677            self.def_levels_sink.extend_from_slice(levels);
678            values_to_write
679        } else {
680            num_levels
681        };
682
683        // Process repetition levels and determine how many rows we are about to process.
684        if self.descr.max_rep_level() > 0 {
685            // A row could contain more than one value.
686            let levels = rep_levels.ok_or_else(|| {
687                general_err!(
688                    "Repetition levels are required, because max repetition level = {}",
689                    self.descr.max_rep_level()
690                )
691            })?;
692
693            if !levels.is_empty() && levels[0] != 0 {
694                return Err(general_err!(
695                    "Write must start at a record boundary, got non-zero repetition level of {}",
696                    levels[0]
697                ));
698            }
699
700            // Count the occasions where we start a new row
701            for &level in levels {
702                self.page_metrics.num_buffered_rows += (level == 0) as u32
703            }
704
705            // Update histogram
706            self.page_metrics.update_repetition_level_histogram(levels);
707
708            self.rep_levels_sink.extend_from_slice(levels);
709        } else {
710            // Each value is exactly one row.
711            // Equals to the number of values, we count nulls as well.
712            self.page_metrics.num_buffered_rows += num_levels as u32;
713        }
714
715        match value_indices {
716            Some(indices) => {
717                let indices = &indices[values_offset..values_offset + values_to_write];
718                self.encoder.write_gather(values, indices)?;
719            }
720            None => self.encoder.write(values, values_offset, values_to_write)?,
721        }
722
723        self.page_metrics.num_buffered_values += num_levels as u32;
724
725        if self.should_add_data_page() {
726            self.add_data_page()?;
727        }
728
729        if self.should_dict_fallback() {
730            self.dict_fallback()?;
731        }
732
733        Ok(values_to_write)
734    }
735
736    /// Returns true if we need to fall back to non-dictionary encoding.
737    ///
738    /// We can only fall back if dictionary encoder is set and we have exceeded dictionary
739    /// size.
740    #[inline]
741    fn should_dict_fallback(&self) -> bool {
742        match self.encoder.estimated_dict_page_size() {
743            Some(size) => size >= self.props.dictionary_page_size_limit(),
744            None => false,
745        }
746    }
747
748    /// Returns true if there is enough data for a data page, false otherwise.
749    #[inline]
750    fn should_add_data_page(&self) -> bool {
751        // This is necessary in the event of a much larger dictionary size than page size
752        //
753        // In such a scenario the dictionary decoder may return an estimated encoded
754        // size in excess of the page size limit, even when there are no buffered values
755        if self.page_metrics.num_buffered_values == 0 {
756            return false;
757        }
758
759        self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
760            || self.encoder.estimated_data_page_size() >= self.props.data_page_size_limit()
761    }
762
763    /// Performs dictionary fallback.
764    /// Prepares and writes dictionary and all data pages into page writer.
765    fn dict_fallback(&mut self) -> Result<()> {
766        // At this point we know that we need to fall back.
767        if self.page_metrics.num_buffered_values > 0 {
768            self.add_data_page()?;
769        }
770        self.write_dictionary_page()?;
771        self.flush_data_pages()?;
772        Ok(())
773    }
774
775    /// Update the column index and offset index when adding the data page
776    fn update_column_offset_index(
777        &mut self,
778        page_statistics: Option<&ValueStatistics<E::T>>,
779        page_variable_length_bytes: Option<i64>,
780    ) {
781        // update the column index
782        let null_page =
783            (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
784        // a page contains only null values,
785        // and writers have to set the corresponding entries in min_values and max_values to byte[0]
786        if null_page && self.column_index_builder.valid() {
787            self.column_index_builder.append(
788                null_page,
789                vec![],
790                vec![],
791                self.page_metrics.num_page_nulls as i64,
792            );
793        } else if self.column_index_builder.valid() {
794            // from page statistics
795            // If can't get the page statistics, ignore this column/offset index for this column chunk
796            match &page_statistics {
797                None => {
798                    self.column_index_builder.to_invalid();
799                }
800                Some(stat) => {
801                    // Check if min/max are still ascending/descending across pages
802                    let new_min = stat.min_opt().unwrap();
803                    let new_max = stat.max_opt().unwrap();
804                    if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
805                        if self.data_page_boundary_ascending {
806                            // If last min/max are greater than new min/max then not ascending anymore
807                            let not_ascending = compare_greater(&self.descr, last_min, new_min)
808                                || compare_greater(&self.descr, last_max, new_max);
809                            if not_ascending {
810                                self.data_page_boundary_ascending = false;
811                            }
812                        }
813
814                        if self.data_page_boundary_descending {
815                            // If new min/max are greater than last min/max then not descending anymore
816                            let not_descending = compare_greater(&self.descr, new_min, last_min)
817                                || compare_greater(&self.descr, new_max, last_max);
818                            if not_descending {
819                                self.data_page_boundary_descending = false;
820                            }
821                        }
822                    }
823                    self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));
824
825                    if self.can_truncate_value() {
826                        self.column_index_builder.append(
827                            null_page,
828                            self.truncate_min_value(
829                                self.props.column_index_truncate_length(),
830                                stat.min_bytes_opt().unwrap(),
831                            )
832                            .0,
833                            self.truncate_max_value(
834                                self.props.column_index_truncate_length(),
835                                stat.max_bytes_opt().unwrap(),
836                            )
837                            .0,
838                            self.page_metrics.num_page_nulls as i64,
839                        );
840                    } else {
841                        self.column_index_builder.append(
842                            null_page,
843                            stat.min_bytes_opt().unwrap().to_vec(),
844                            stat.max_bytes_opt().unwrap().to_vec(),
845                            self.page_metrics.num_page_nulls as i64,
846                        );
847                    }
848                }
849            }
850        }
851
852        // Append page histograms to the `ColumnIndex` histograms
853        self.column_index_builder.append_histograms(
854            &self.page_metrics.repetition_level_histogram,
855            &self.page_metrics.definition_level_histogram,
856        );
857
858        // Update the offset index
859        if let Some(builder) = self.offset_index_builder.as_mut() {
860            builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
861            builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
862        }
863    }
864
865    /// Determine if we should allow truncating min/max values for this column's statistics
866    fn can_truncate_value(&self) -> bool {
867        match self.descr.physical_type() {
868            // Don't truncate for Float16 and Decimal because their sort order is different
869            // from that of FIXED_LEN_BYTE_ARRAY sort order.
870            // So truncation of those types could lead to inaccurate min/max statistics
871            Type::FIXED_LEN_BYTE_ARRAY
872                if !matches!(
873                    self.descr.logical_type(),
874                    Some(LogicalType::Decimal { .. }) | Some(LogicalType::Float16)
875                ) =>
876            {
877                true
878            }
879            Type::BYTE_ARRAY => true,
880            // Truncation only applies for fba/binary physical types
881            _ => false,
882        }
883    }
884
885    /// Returns `true` if this column's logical type is a UTF-8 string.
886    fn is_utf8(&self) -> bool {
887        self.get_descriptor().logical_type() == Some(LogicalType::String)
888            || self.get_descriptor().converted_type() == ConvertedType::UTF8
889    }
890
891    /// Truncates a binary statistic to at most `truncation_length` bytes.
892    ///
893    /// If truncation is not possible, returns `data`.
894    ///
895    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
896    ///
897    /// UTF-8 Note:
898    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
899    /// also remain valid UTF-8, but may be less tnan `truncation_length` bytes to avoid splitting
900    /// on non-character boundaries.
901    fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
902        truncation_length
903            .filter(|l| data.len() > *l)
904            .and_then(|l|
905                // don't do extra work if this column isn't UTF-8
906                if self.is_utf8() {
907                    match str::from_utf8(data) {
908                        Ok(str_data) => truncate_utf8(str_data, l),
909                        Err(_) => Some(data[..l].to_vec()),
910                    }
911                } else {
912                    Some(data[..l].to_vec())
913                }
914            )
915            .map(|truncated| (truncated, true))
916            .unwrap_or_else(|| (data.to_vec(), false))
917    }
918
919    /// Truncates a binary statistic to at most `truncation_length` bytes, and then increment the
920    /// final byte(s) to yield a valid upper bound. This may result in a result of less than
921    /// `truncation_length` bytes if the last byte(s) overflows.
922    ///
923    /// If truncation is not possible, returns `data`.
924    ///
925    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
926    ///
927    /// UTF-8 Note:
928    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
929    /// also remain valid UTF-8 (but again may be less than `truncation_length` bytes). If `data`
930    /// does not contain valid UTF-8, then truncation will occur as if the column is non-string
931    /// binary.
932    fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
933        truncation_length
934            .filter(|l| data.len() > *l)
935            .and_then(|l|
936                // don't do extra work if this column isn't UTF-8
937                if self.is_utf8() {
938                    match str::from_utf8(data) {
939                        Ok(str_data) => truncate_and_increment_utf8(str_data, l),
940                        Err(_) => increment(data[..l].to_vec()),
941                    }
942                } else {
943                    increment(data[..l].to_vec())
944                }
945            )
946            .map(|truncated| (truncated, true))
947            .unwrap_or_else(|| (data.to_vec(), false))
948    }
949
    /// Adds data page.
    /// Data page is either buffered in case of dictionary encoding or written directly.
    ///
    /// The steps, in order:
    /// 1. flush the encoded values of the current page from the encoder,
    /// 2. fold the page's nulls and min/max into the column chunk metrics,
    /// 3. update the column index and offset index,
    /// 4. assemble (and optionally compress) a v1 or v2 data page depending on the
    ///    configured writer version,
    /// 5. queue the page in `data_pages` while the encoder still has a dictionary,
    ///    otherwise write it out,
    /// 6. reset the level buffers and the per-page metrics.
    ///
    /// Returns an error if flushing the encoder, compressing, or writing the page fails.
    fn add_data_page(&mut self) -> Result<()> {
        // Extract encoded values
        let values_data = self.encoder.flush_data_page()?;

        let max_def_level = self.descr.max_def_level();
        let max_rep_level = self.descr.max_rep_level();

        self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;

        // Page statistics only exist when the encoder reported both a min and a max value.
        let page_statistics = match (values_data.min_value, values_data.max_value) {
            (Some(min), Some(max)) => {
                // Update chunk level statistics
                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

                // Chunk level min/max are always maintained above; the per-page statistics
                // object is only built when page level statistics are enabled.
                (self.statistics_enabled == EnabledStatistics::Page).then_some(
                    ValueStatistics::new(
                        Some(min),
                        Some(max),
                        None,
                        Some(self.page_metrics.num_page_nulls),
                        false,
                    ),
                )
            }
            _ => None,
        };

        // update column and offset index
        self.update_column_offset_index(
            page_statistics.as_ref(),
            values_data.variable_length_bytes,
        );

        // Update histograms and variable_length_bytes in column_metrics
        self.column_metrics
            .update_from_page_metrics(&self.page_metrics);
        self.column_metrics
            .update_variable_length_bytes(values_data.variable_length_bytes);

        let page_statistics = page_statistics.map(Statistics::from);

        let compressed_page = match self.props.writer_version() {
            WriterVersion::PARQUET_1_0 => {
                // Data Page v1: repetition levels, definition levels and values are
                // concatenated into one buffer which is then compressed as a whole.
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.rep_levels_sink[..],
                            max_rep_level,
                        )[..],
                    );
                }

                if max_def_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.def_levels_sink[..],
                            max_def_level,
                        )[..],
                    );
                }

                buffer.extend_from_slice(&values_data.buf);
                let uncompressed_size = buffer.len();

                if let Some(ref mut cmpr) = self.compressor {
                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
                    buffer = compressed_buf;
                }

                let data_page = Page::DataPage {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    def_level_encoding: Encoding::RLE,
                    rep_level_encoding: Encoding::RLE,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
            WriterVersion::PARQUET_2_0 => {
                let mut rep_levels_byte_len = 0;
                let mut def_levels_byte_len = 0;
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
                    rep_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                if max_def_level > 0 {
                    let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
                    def_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                // Computed from the raw value bytes, before any compression is applied.
                let uncompressed_size =
                    rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();

                // Data Page v2 compresses values only.
                match self.compressor {
                    Some(ref mut cmpr) => {
                        cmpr.compress(&values_data.buf, &mut buffer)?;
                    }
                    None => buffer.extend_from_slice(&values_data.buf),
                }

                let data_page = Page::DataPageV2 {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    num_nulls: self.page_metrics.num_page_nulls as u32,
                    num_rows: self.page_metrics.num_buffered_rows,
                    def_levels_byte_len: def_levels_byte_len as u32,
                    rep_levels_byte_len: rep_levels_byte_len as u32,
                    is_compressed: self.compressor.is_some(),
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
        };

        // Check if we need to buffer data page or flush it to the sink directly.
        if self.encoder.has_dictionary() {
            self.data_pages.push_back(compressed_page);
        } else {
            self.write_data_page(compressed_page)?;
        }

        // Update total number of rows.
        self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;

        // Reset state.
        self.rep_levels_sink.clear();
        self.def_levels_sink.clear();
        self.page_metrics.new_page();

        Ok(())
    }
1099
1100    /// Finalises any outstanding data pages and flushes buffered data pages from
1101    /// dictionary encoding into underlying sink.
1102    #[inline]
1103    fn flush_data_pages(&mut self) -> Result<()> {
1104        // Write all outstanding data to a new page.
1105        if self.page_metrics.num_buffered_values > 0 {
1106            self.add_data_page()?;
1107        }
1108
1109        while let Some(page) = self.data_pages.pop_front() {
1110            self.write_data_page(page)?;
1111        }
1112
1113        Ok(())
1114    }
1115
    /// Assembles column chunk metadata.
    ///
    /// Sizes, value count, page offsets, compression and encodings are taken from the
    /// metrics accumulated while writing pages. Unless statistics are disabled, chunk level
    /// statistics, the unencoded byte array size and the level histograms are attached too.
    /// Byte array (and truncatable fixed-length byte array) min/max values are truncated
    /// according to `statistics_truncate_length`, and the statistics are flagged as inexact
    /// when truncation happened.
    ///
    /// Returns an error if the metadata builder rejects the assembled values.
    fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
        let total_compressed_size = self.column_metrics.total_compressed_size as i64;
        let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
        let num_values = self.column_metrics.total_num_values as i64;
        let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
        // If data page offset is not set, then no pages have been written
        let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;

        let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
            .set_compression(self.codec)
            .set_encodings(self.encodings.iter().cloned().collect())
            .set_total_compressed_size(total_compressed_size)
            .set_total_uncompressed_size(total_uncompressed_size)
            .set_num_values(num_values)
            .set_data_page_offset(data_page_offset)
            .set_dictionary_page_offset(dict_page_offset);

        if self.statistics_enabled != EnabledStatistics::None {
            // Passed as the last argument of `ValueStatistics::new` when the statistics are
            // rebuilt below, so the flag survives truncation.
            let backwards_compatible_min_max = self.descr.sort_order().is_signed();

            let statistics = ValueStatistics::<E::T>::new(
                self.column_metrics.min_column_value.clone(),
                self.column_metrics.max_column_value.clone(),
                self.column_metrics.column_distinct_count,
                Some(self.column_metrics.num_column_nulls),
                false,
            )
            .with_backwards_compatible_min_max(backwards_compatible_min_max)
            .into();

            let statistics = match statistics {
                // Variable length byte arrays: always eligible for truncation.
                Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
                    let (min, did_truncate_min) = self.truncate_min_value(
                        self.props.statistics_truncate_length(),
                        stats.min_bytes_opt().unwrap(),
                    );
                    let (max, did_truncate_max) = self.truncate_max_value(
                        self.props.statistics_truncate_length(),
                        stats.max_bytes_opt().unwrap(),
                    );
                    Statistics::ByteArray(
                        ValueStatistics::new(
                            Some(min.into()),
                            Some(max.into()),
                            stats.distinct_count(),
                            stats.null_count_opt(),
                            backwards_compatible_min_max,
                        )
                        .with_max_is_exact(!did_truncate_max)
                        .with_min_is_exact(!did_truncate_min),
                    )
                }
                // Fixed length byte arrays: only truncated when `can_truncate_value` allows
                // it (it excludes Decimal and Float16 logical types).
                Statistics::FixedLenByteArray(stats)
                    if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
                {
                    let (min, did_truncate_min) = self.truncate_min_value(
                        self.props.statistics_truncate_length(),
                        stats.min_bytes_opt().unwrap(),
                    );
                    let (max, did_truncate_max) = self.truncate_max_value(
                        self.props.statistics_truncate_length(),
                        stats.max_bytes_opt().unwrap(),
                    );
                    Statistics::FixedLenByteArray(
                        ValueStatistics::new(
                            Some(min.into()),
                            Some(max.into()),
                            stats.distinct_count(),
                            stats.null_count_opt(),
                            backwards_compatible_min_max,
                        )
                        .with_max_is_exact(!did_truncate_max)
                        .with_min_is_exact(!did_truncate_min),
                    )
                }
                // Everything else is passed through unchanged.
                stats => stats,
            };

            // NOTE(review): `take()` presumably moves the histograms out of `column_metrics`,
            // leaving nothing behind for a second call - confirm against `ColumnMetrics`.
            builder = builder
                .set_statistics(statistics)
                .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
                .set_repetition_level_histogram(
                    self.column_metrics.repetition_level_histogram.take(),
                )
                .set_definition_level_histogram(
                    self.column_metrics.definition_level_histogram.take(),
                );
        }

        builder = self.set_column_chunk_encryption_properties(builder);

        let metadata = builder.build()?;
        Ok(metadata)
    }
1211
1212    /// Encodes definition or repetition levels for Data Page v1.
1213    #[inline]
1214    fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> {
1215        let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
1216        encoder.put(levels);
1217        encoder.consume()
1218    }
1219
1220    /// Encodes definition or repetition levels for Data Page v2.
1221    /// Encoding is always RLE.
1222    #[inline]
1223    fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
1224        let mut encoder = LevelEncoder::v2(max_level, levels.len());
1225        encoder.put(levels);
1226        encoder.consume()
1227    }
1228
1229    /// Writes compressed data page into underlying sink and updates global metrics.
1230    #[inline]
1231    fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
1232        self.encodings.insert(page.encoding());
1233        let page_spec = self.page_writer.write_page(page)?;
1234        // update offset index
1235        // compressed_size = header_size + compressed_data_size
1236        if let Some(builder) = self.offset_index_builder.as_mut() {
1237            builder
1238                .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
1239        }
1240        self.update_metrics_for_page(page_spec);
1241        Ok(())
1242    }
1243
1244    /// Writes dictionary page into underlying sink.
1245    #[inline]
1246    fn write_dictionary_page(&mut self) -> Result<()> {
1247        let compressed_page = {
1248            let mut page = self
1249                .encoder
1250                .flush_dict_page()?
1251                .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;
1252
1253            let uncompressed_size = page.buf.len();
1254
1255            if let Some(ref mut cmpr) = self.compressor {
1256                let mut output_buf = Vec::with_capacity(uncompressed_size);
1257                cmpr.compress(&page.buf, &mut output_buf)?;
1258                page.buf = Bytes::from(output_buf);
1259            }
1260
1261            let dict_page = Page::DictionaryPage {
1262                buf: page.buf,
1263                num_values: page.num_values as u32,
1264                encoding: self.props.dictionary_page_encoding(),
1265                is_sorted: page.is_sorted,
1266            };
1267            CompressedPage::new(dict_page, uncompressed_size)
1268        };
1269
1270        self.encodings.insert(compressed_page.encoding());
1271        let page_spec = self.page_writer.write_page(compressed_page)?;
1272        self.update_metrics_for_page(page_spec);
1273        // For the directory page, don't need to update column/offset index.
1274        Ok(())
1275    }
1276
1277    /// Updates column writer metrics with each page metadata.
1278    #[inline]
1279    fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1280        self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1281        self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1282        self.column_metrics.total_bytes_written += page_spec.bytes_written;
1283
1284        match page_spec.page_type {
1285            PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1286                self.column_metrics.total_num_values += page_spec.num_values as u64;
1287                if self.column_metrics.data_page_offset.is_none() {
1288                    self.column_metrics.data_page_offset = Some(page_spec.offset);
1289                }
1290            }
1291            PageType::DICTIONARY_PAGE => {
1292                assert!(
1293                    self.column_metrics.dictionary_page_offset.is_none(),
1294                    "Dictionary offset is already set"
1295                );
1296                self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1297            }
1298            _ => {}
1299        }
1300    }
1301
1302    #[inline]
1303    #[cfg(feature = "encryption")]
1304    fn set_column_chunk_encryption_properties(
1305        &self,
1306        builder: ColumnChunkMetaDataBuilder,
1307    ) -> ColumnChunkMetaDataBuilder {
1308        if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() {
1309            builder.set_column_crypto_metadata(get_column_crypto_metadata(
1310                encryption_properties,
1311                &self.descr,
1312            ))
1313        } else {
1314            builder
1315        }
1316    }
1317
    /// Variant used when the `encryption` feature is disabled: there is nothing to add, so
    /// `builder` is returned unchanged.
    #[inline]
    #[cfg(not(feature = "encryption"))]
    fn set_column_chunk_encryption_properties(
        &self,
        builder: ColumnChunkMetaDataBuilder,
    ) -> ColumnChunkMetaDataBuilder {
        builder
    }
1326}
1327
1328fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
1329    update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
1330}
1331
1332fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
1333    update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
1334}
1335
1336#[inline]
1337#[allow(clippy::eq_op)]
1338fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
1339    match T::PHYSICAL_TYPE {
1340        Type::FLOAT | Type::DOUBLE => val != val,
1341        Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type() == Some(LogicalType::Float16) => {
1342            let val = val.as_bytes();
1343            let val = f16::from_le_bytes([val[0], val[1]]);
1344            val.is_nan()
1345        }
1346        _ => false,
1347    }
1348}
1349
1350/// Perform a conditional update of `cur`, skipping any NaN values
1351///
1352/// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with
1353/// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true`
1354fn update_stat<T: ParquetValueType, F>(
1355    descr: &ColumnDescriptor,
1356    val: &T,
1357    cur: &mut Option<T>,
1358    should_update: F,
1359) where
1360    F: Fn(&T) -> bool,
1361{
1362    if is_nan(descr, val) {
1363        return;
1364    }
1365
1366    if cur.as_ref().map_or(true, should_update) {
1367        *cur = Some(val.clone());
1368    }
1369}
1370
1371/// Evaluate `a > b` according to underlying logical type.
1372fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
1373    if let Some(LogicalType::Integer { is_signed, .. }) = descr.logical_type() {
1374        if !is_signed {
1375            // need to compare unsigned
1376            return a.as_u64().unwrap() > b.as_u64().unwrap();
1377        }
1378    }
1379
1380    match descr.converted_type() {
1381        ConvertedType::UINT_8
1382        | ConvertedType::UINT_16
1383        | ConvertedType::UINT_32
1384        | ConvertedType::UINT_64 => {
1385            return a.as_u64().unwrap() > b.as_u64().unwrap();
1386        }
1387        _ => {}
1388    };
1389
1390    if let Some(LogicalType::Decimal { .. }) = descr.logical_type() {
1391        match T::PHYSICAL_TYPE {
1392            Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1393                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1394            }
1395            _ => {}
1396        };
1397    }
1398
1399    if descr.converted_type() == ConvertedType::DECIMAL {
1400        match T::PHYSICAL_TYPE {
1401            Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1402                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1403            }
1404            _ => {}
1405        };
1406    };
1407
1408    if let Some(LogicalType::Float16) = descr.logical_type() {
1409        let a = a.as_bytes();
1410        let a = f16::from_le_bytes([a[0], a[1]]);
1411        let b = b.as_bytes();
1412        let b = f16::from_le_bytes([b[0], b[1]]);
1413        return a > b;
1414    }
1415
1416    a > b
1417}
1418
1419// ----------------------------------------------------------------------
1420// Encoding support for column writer.
1421// This mirrors parquet-mr default encodings for writes. See:
1422// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java
1423// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java
1424
1425/// Returns encoding for a column when no other encoding is provided in writer properties.
1426fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1427    match (kind, props.writer_version()) {
1428        (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1429        (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1430        (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1431        (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1432        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1433        _ => Encoding::PLAIN,
1434    }
1435}
1436
1437/// Returns true if dictionary is supported for column writer, false otherwise.
1438fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1439    match (kind, props.writer_version()) {
1440        // Booleans do not support dict encoding and should use a fallback encoding.
1441        (Type::BOOLEAN, _) => false,
1442        // Dictionary encoding was not enabled in PARQUET 1.0
1443        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1444        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1445        _ => true,
1446    }
1447}
1448
/// Signed `a > b` comparison of two big-endian two's-complement byte arrays, as used for
/// decimals stored in `BYTE_ARRAY` / `FIXED_LEN_BYTE_ARRAY` columns.
///
/// The arrays may differ in length; the shorter one is compared as if it were
/// sign-extended to the length of the longer one. An empty array is never greater than
/// anything, and any non-empty array is greater than an empty one.
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    let a_length = a.len();
    let b_length = b.len();

    if a_length == 0 || b_length == 0 {
        return a_length > 0;
    }

    let first_a: u8 = a[0];
    let first_b: u8 = b[0];

    // We can short circuit for numbers of different sign, or for equal length arrays
    // with different first bytes. The equal length requirement matters because of sign
    // extension: 0xFF80 and 0x80 encode the same value although their first bytes differ.
    if (0x80 & first_a) != (0x80 & first_b) || (a_length == b_length && first_a != first_b) {
        return (first_a as i8) > (first_b as i8);
    }

    // From here on both numbers have the same sign.
    let negative = (first_a as i8) < 0;
    let extension: u8 = if negative { 0xFF } else { 0 };

    // Strip the extra leading bytes of the longer value so that both sides have equal
    // length. If those bytes are not pure sign extension, the longer value has the larger
    // magnitude and the result follows from the sign alone.
    let (a_tail, b_tail) = if a_length > b_length {
        let (lead, tail) = a.split_at(a_length - b_length);
        if lead.iter().any(|&x| x != extension) {
            // `a` has the larger magnitude: greater when positive, smaller when negative.
            return !negative;
        }
        (tail, b)
    } else if b_length > a_length {
        let (lead, tail) = b.split_at(b_length - a_length);
        if lead.iter().any(|&x| x != extension) {
            // `b` has the larger magnitude: `a` is greater only when both are negative.
            return negative;
        }
        (a, tail)
    } else {
        (a, b)
    };

    // Equal length, same sign: unsigned lexicographic comparison gives the signed order.
    a_tail > b_tail
}
1494
/// Cuts a UTF-8 string down to its longest non-empty prefix that is at most `length` bytes
/// long and ends on a character boundary. Returns `None` when no such prefix exists.
///
/// The caller guarantees that data.len() > length.
fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    // Walk backwards from the requested length until a character boundary is hit.
    let mut split = length;
    while split > 0 && !data.is_char_boundary(split) {
        split -= 1;
    }

    if split == 0 {
        // An empty prefix is not an acceptable result.
        None
    } else {
        Some(data.as_bytes()[..split].to_vec())
    }
}
1504
1505/// Truncate a UTF-8 slice and increment it's final character. The returned value is the
1506/// longest such slice that is still a valid UTF-8 string while being less than `length`
1507/// bytes and non-empty. Returns `None` if no such transformation is possible.
1508///
1509/// The caller guarantees that data.len() > length.
1510fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1511    // UTF-8 is max 4 bytes, so start search 3 back from desired length
1512    let lower_bound = length.saturating_sub(3);
1513    let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
1514    increment_utf8(data.get(..split)?)
1515}
1516
/// Produce a valid UTF-8 byte sequence by incrementing the last character of `data`
/// that can be incremented, dropping every character after it.
///
/// Characters are tried from the end of the string towards the start. A character is
/// skipped when its code point plus one is not a valid `char`, or when the successor
/// would need a different number of bytes (an N-byte code point is never promoted to
/// N+1 bytes). The output may therefore be shorter than the input.
///
/// Returns `None` if no character in `data` can be incremented (including empty input).
fn increment_utf8(data: &str) -> Option<Vec<u8>> {
    data.char_indices().rev().find_map(|(start, current)| {
        let width = current.len_utf8();
        // Reject successors that are invalid code points or change the encoded width.
        let successor =
            char::from_u32(current as u32 + 1).filter(|next| next.len_utf8() == width)?;

        // Keep everything before the character, then append the encoded successor.
        let mut scratch = [0u8; 4];
        let mut bytes = Vec::with_capacity(start + width);
        bytes.extend_from_slice(&data.as_bytes()[..start]);
        bytes.extend_from_slice(successor.encode_utf8(&mut scratch).as_bytes());
        Some(bytes)
    })
}
1538
/// Add one to `data`, treating it as a big-endian number whose carry moves from the
/// last byte towards the first.
///
/// Returns `None` if every byte is `u8::MAX` (or `data` is empty), because the carry
/// would run off the front.
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    // The rightmost byte that is not 0xFF absorbs the carry ...
    let pivot = data.iter().rposition(|&byte| byte != u8::MAX)?;
    data[pivot] += 1;
    // ... and every 0xFF byte after it wraps around to zero.
    data[pivot + 1..].fill(0);
    Some(data)
}
1554
1555#[cfg(test)]
1556mod tests {
1557    use crate::{
1558        file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1559        schema::parser::parse_message_type,
1560    };
1561    use core::str;
1562    use rand::distr::uniform::SampleUniform;
1563    use std::{fs::File, sync::Arc};
1564
1565    use crate::column::{
1566        page::PageReader,
1567        reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl},
1568    };
1569    use crate::file::writer::TrackedWrite;
1570    use crate::file::{
1571        properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1572    };
1573    use crate::schema::types::{ColumnPath, Type as SchemaType};
1574    use crate::util::test_common::rand_gen::random_numbers_range;
1575
1576    use super::*;
1577
1578    #[test]
1579    fn test_column_writer_inconsistent_def_rep_length() {
1580        let page_writer = get_test_page_writer();
1581        let props = Default::default();
1582        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
1583        let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
1584        assert!(res.is_err());
1585        if let Err(err) = res {
1586            assert_eq!(
1587                format!("{err}"),
1588                "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
1589            );
1590        }
1591    }
1592
1593    #[test]
1594    fn test_column_writer_invalid_def_levels() {
1595        let page_writer = get_test_page_writer();
1596        let props = Default::default();
1597        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1598        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1599        assert!(res.is_err());
1600        if let Err(err) = res {
1601            assert_eq!(
1602                format!("{err}"),
1603                "Parquet error: Definition levels are required, because max definition level = 1"
1604            );
1605        }
1606    }
1607
1608    #[test]
1609    fn test_column_writer_invalid_rep_levels() {
1610        let page_writer = get_test_page_writer();
1611        let props = Default::default();
1612        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
1613        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1614        assert!(res.is_err());
1615        if let Err(err) = res {
1616            assert_eq!(
1617                format!("{err}"),
1618                "Parquet error: Repetition levels are required, because max repetition level = 1"
1619            );
1620        }
1621    }
1622
1623    #[test]
1624    fn test_column_writer_not_enough_values_to_write() {
1625        let page_writer = get_test_page_writer();
1626        let props = Default::default();
1627        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1628        let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
1629        assert!(res.is_err());
1630        if let Err(err) = res {
1631            assert_eq!(
1632                format!("{err}"),
1633                "Parquet error: Expected to write 4 values, but have only 2"
1634            );
1635        }
1636    }
1637
1638    #[test]
1639    fn test_column_writer_write_only_one_dictionary_page() {
1640        let page_writer = get_test_page_writer();
1641        let props = Default::default();
1642        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1643        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1644        // First page should be correctly written.
1645        writer.add_data_page().unwrap();
1646        writer.write_dictionary_page().unwrap();
1647        let err = writer.write_dictionary_page().unwrap_err().to_string();
1648        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1649    }
1650
1651    #[test]
1652    fn test_column_writer_error_when_writing_disabled_dictionary() {
1653        let page_writer = get_test_page_writer();
1654        let props = Arc::new(
1655            WriterProperties::builder()
1656                .set_dictionary_enabled(false)
1657                .build(),
1658        );
1659        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1660        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1661        let err = writer.write_dictionary_page().unwrap_err().to_string();
1662        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1663    }
1664
1665    #[test]
1666    fn test_column_writer_boolean_type_does_not_support_dictionary() {
1667        let page_writer = get_test_page_writer();
1668        let props = Arc::new(
1669            WriterProperties::builder()
1670                .set_dictionary_enabled(true)
1671                .build(),
1672        );
1673        let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
1674        writer
1675            .write_batch(&[true, false, true, false], None, None)
1676            .unwrap();
1677
1678        let r = writer.close().unwrap();
1679        // PlainEncoder uses bit writer to write boolean values, which all fit into 1
1680        // byte.
1681        assert_eq!(r.bytes_written, 1);
1682        assert_eq!(r.rows_written, 4);
1683
1684        let metadata = r.metadata;
1685        assert_eq!(metadata.encodings(), &vec![Encoding::PLAIN, Encoding::RLE]);
1686        assert_eq!(metadata.num_values(), 4); // just values
1687        assert_eq!(metadata.dictionary_page_offset(), None);
1688    }
1689
1690    #[test]
1691    fn test_column_writer_default_encoding_support_bool() {
1692        check_encoding_write_support::<BoolType>(
1693            WriterVersion::PARQUET_1_0,
1694            true,
1695            &[true, false],
1696            None,
1697            &[Encoding::PLAIN, Encoding::RLE],
1698        );
1699        check_encoding_write_support::<BoolType>(
1700            WriterVersion::PARQUET_1_0,
1701            false,
1702            &[true, false],
1703            None,
1704            &[Encoding::PLAIN, Encoding::RLE],
1705        );
1706        check_encoding_write_support::<BoolType>(
1707            WriterVersion::PARQUET_2_0,
1708            true,
1709            &[true, false],
1710            None,
1711            &[Encoding::RLE],
1712        );
1713        check_encoding_write_support::<BoolType>(
1714            WriterVersion::PARQUET_2_0,
1715            false,
1716            &[true, false],
1717            None,
1718            &[Encoding::RLE],
1719        );
1720    }
1721
1722    #[test]
1723    fn test_column_writer_default_encoding_support_int32() {
1724        check_encoding_write_support::<Int32Type>(
1725            WriterVersion::PARQUET_1_0,
1726            true,
1727            &[1, 2],
1728            Some(0),
1729            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1730        );
1731        check_encoding_write_support::<Int32Type>(
1732            WriterVersion::PARQUET_1_0,
1733            false,
1734            &[1, 2],
1735            None,
1736            &[Encoding::PLAIN, Encoding::RLE],
1737        );
1738        check_encoding_write_support::<Int32Type>(
1739            WriterVersion::PARQUET_2_0,
1740            true,
1741            &[1, 2],
1742            Some(0),
1743            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1744        );
1745        check_encoding_write_support::<Int32Type>(
1746            WriterVersion::PARQUET_2_0,
1747            false,
1748            &[1, 2],
1749            None,
1750            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1751        );
1752    }
1753
1754    #[test]
1755    fn test_column_writer_default_encoding_support_int64() {
1756        check_encoding_write_support::<Int64Type>(
1757            WriterVersion::PARQUET_1_0,
1758            true,
1759            &[1, 2],
1760            Some(0),
1761            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1762        );
1763        check_encoding_write_support::<Int64Type>(
1764            WriterVersion::PARQUET_1_0,
1765            false,
1766            &[1, 2],
1767            None,
1768            &[Encoding::PLAIN, Encoding::RLE],
1769        );
1770        check_encoding_write_support::<Int64Type>(
1771            WriterVersion::PARQUET_2_0,
1772            true,
1773            &[1, 2],
1774            Some(0),
1775            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1776        );
1777        check_encoding_write_support::<Int64Type>(
1778            WriterVersion::PARQUET_2_0,
1779            false,
1780            &[1, 2],
1781            None,
1782            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1783        );
1784    }
1785
1786    #[test]
1787    fn test_column_writer_default_encoding_support_int96() {
1788        check_encoding_write_support::<Int96Type>(
1789            WriterVersion::PARQUET_1_0,
1790            true,
1791            &[Int96::from(vec![1, 2, 3])],
1792            Some(0),
1793            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1794        );
1795        check_encoding_write_support::<Int96Type>(
1796            WriterVersion::PARQUET_1_0,
1797            false,
1798            &[Int96::from(vec![1, 2, 3])],
1799            None,
1800            &[Encoding::PLAIN, Encoding::RLE],
1801        );
1802        check_encoding_write_support::<Int96Type>(
1803            WriterVersion::PARQUET_2_0,
1804            true,
1805            &[Int96::from(vec![1, 2, 3])],
1806            Some(0),
1807            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1808        );
1809        check_encoding_write_support::<Int96Type>(
1810            WriterVersion::PARQUET_2_0,
1811            false,
1812            &[Int96::from(vec![1, 2, 3])],
1813            None,
1814            &[Encoding::PLAIN, Encoding::RLE],
1815        );
1816    }
1817
1818    #[test]
1819    fn test_column_writer_default_encoding_support_float() {
1820        check_encoding_write_support::<FloatType>(
1821            WriterVersion::PARQUET_1_0,
1822            true,
1823            &[1.0, 2.0],
1824            Some(0),
1825            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1826        );
1827        check_encoding_write_support::<FloatType>(
1828            WriterVersion::PARQUET_1_0,
1829            false,
1830            &[1.0, 2.0],
1831            None,
1832            &[Encoding::PLAIN, Encoding::RLE],
1833        );
1834        check_encoding_write_support::<FloatType>(
1835            WriterVersion::PARQUET_2_0,
1836            true,
1837            &[1.0, 2.0],
1838            Some(0),
1839            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1840        );
1841        check_encoding_write_support::<FloatType>(
1842            WriterVersion::PARQUET_2_0,
1843            false,
1844            &[1.0, 2.0],
1845            None,
1846            &[Encoding::PLAIN, Encoding::RLE],
1847        );
1848    }
1849
1850    #[test]
1851    fn test_column_writer_default_encoding_support_double() {
1852        check_encoding_write_support::<DoubleType>(
1853            WriterVersion::PARQUET_1_0,
1854            true,
1855            &[1.0, 2.0],
1856            Some(0),
1857            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1858        );
1859        check_encoding_write_support::<DoubleType>(
1860            WriterVersion::PARQUET_1_0,
1861            false,
1862            &[1.0, 2.0],
1863            None,
1864            &[Encoding::PLAIN, Encoding::RLE],
1865        );
1866        check_encoding_write_support::<DoubleType>(
1867            WriterVersion::PARQUET_2_0,
1868            true,
1869            &[1.0, 2.0],
1870            Some(0),
1871            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1872        );
1873        check_encoding_write_support::<DoubleType>(
1874            WriterVersion::PARQUET_2_0,
1875            false,
1876            &[1.0, 2.0],
1877            None,
1878            &[Encoding::PLAIN, Encoding::RLE],
1879        );
1880    }
1881
1882    #[test]
1883    fn test_column_writer_default_encoding_support_byte_array() {
1884        check_encoding_write_support::<ByteArrayType>(
1885            WriterVersion::PARQUET_1_0,
1886            true,
1887            &[ByteArray::from(vec![1u8])],
1888            Some(0),
1889            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1890        );
1891        check_encoding_write_support::<ByteArrayType>(
1892            WriterVersion::PARQUET_1_0,
1893            false,
1894            &[ByteArray::from(vec![1u8])],
1895            None,
1896            &[Encoding::PLAIN, Encoding::RLE],
1897        );
1898        check_encoding_write_support::<ByteArrayType>(
1899            WriterVersion::PARQUET_2_0,
1900            true,
1901            &[ByteArray::from(vec![1u8])],
1902            Some(0),
1903            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1904        );
1905        check_encoding_write_support::<ByteArrayType>(
1906            WriterVersion::PARQUET_2_0,
1907            false,
1908            &[ByteArray::from(vec![1u8])],
1909            None,
1910            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
1911        );
1912    }
1913
1914    #[test]
1915    fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
1916        check_encoding_write_support::<FixedLenByteArrayType>(
1917            WriterVersion::PARQUET_1_0,
1918            true,
1919            &[ByteArray::from(vec![1u8]).into()],
1920            None,
1921            &[Encoding::PLAIN, Encoding::RLE],
1922        );
1923        check_encoding_write_support::<FixedLenByteArrayType>(
1924            WriterVersion::PARQUET_1_0,
1925            false,
1926            &[ByteArray::from(vec![1u8]).into()],
1927            None,
1928            &[Encoding::PLAIN, Encoding::RLE],
1929        );
1930        check_encoding_write_support::<FixedLenByteArrayType>(
1931            WriterVersion::PARQUET_2_0,
1932            true,
1933            &[ByteArray::from(vec![1u8]).into()],
1934            Some(0),
1935            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1936        );
1937        check_encoding_write_support::<FixedLenByteArrayType>(
1938            WriterVersion::PARQUET_2_0,
1939            false,
1940            &[ByteArray::from(vec![1u8]).into()],
1941            None,
1942            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
1943        );
1944    }
1945
1946    #[test]
1947    fn test_column_writer_check_metadata() {
1948        let page_writer = get_test_page_writer();
1949        let props = Default::default();
1950        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1951        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1952
1953        let r = writer.close().unwrap();
1954        assert_eq!(r.bytes_written, 20);
1955        assert_eq!(r.rows_written, 4);
1956
1957        let metadata = r.metadata;
1958        assert_eq!(
1959            metadata.encodings(),
1960            &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
1961        );
1962        assert_eq!(metadata.num_values(), 4);
1963        assert_eq!(metadata.compressed_size(), 20);
1964        assert_eq!(metadata.uncompressed_size(), 20);
1965        assert_eq!(metadata.data_page_offset(), 0);
1966        assert_eq!(metadata.dictionary_page_offset(), Some(0));
1967        if let Some(stats) = metadata.statistics() {
1968            assert_eq!(stats.null_count_opt(), Some(0));
1969            assert_eq!(stats.distinct_count_opt(), None);
1970            if let Statistics::Int32(stats) = stats {
1971                assert_eq!(stats.min_opt().unwrap(), &1);
1972                assert_eq!(stats.max_opt().unwrap(), &4);
1973            } else {
1974                panic!("expecting Statistics::Int32");
1975            }
1976        } else {
1977            panic!("metadata missing statistics");
1978        }
1979    }
1980
1981    #[test]
1982    fn test_column_writer_check_byte_array_min_max() {
1983        let page_writer = get_test_page_writer();
1984        let props = Default::default();
1985        let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
1986        writer
1987            .write_batch(
1988                &[
1989                    ByteArray::from(vec![
1990                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
1991                        35u8, 231u8, 90u8, 0u8, 0u8,
1992                    ]),
1993                    ByteArray::from(vec![
1994                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
1995                        152u8, 177u8, 56u8, 0u8, 0u8,
1996                    ]),
1997                    ByteArray::from(vec![
1998                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
1999                        0u8,
2000                    ]),
2001                    ByteArray::from(vec![
2002                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2003                        44u8, 0u8, 0u8,
2004                    ]),
2005                ],
2006                None,
2007                None,
2008            )
2009            .unwrap();
2010        let metadata = writer.close().unwrap().metadata;
2011        if let Some(stats) = metadata.statistics() {
2012            if let Statistics::ByteArray(stats) = stats {
2013                assert_eq!(
2014                    stats.min_opt().unwrap(),
2015                    &ByteArray::from(vec![
2016                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2017                        35u8, 231u8, 90u8, 0u8, 0u8,
2018                    ])
2019                );
2020                assert_eq!(
2021                    stats.max_opt().unwrap(),
2022                    &ByteArray::from(vec![
2023                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2024                        44u8, 0u8, 0u8,
2025                    ])
2026                );
2027            } else {
2028                panic!("expecting Statistics::ByteArray");
2029            }
2030        } else {
2031            panic!("metadata missing statistics");
2032        }
2033    }
2034
2035    #[test]
2036    fn test_column_writer_uint32_converted_type_min_max() {
2037        let page_writer = get_test_page_writer();
2038        let props = Default::default();
2039        let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
2040            page_writer,
2041            0,
2042            0,
2043            props,
2044        );
2045        writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
2046        let metadata = writer.close().unwrap().metadata;
2047        if let Some(stats) = metadata.statistics() {
2048            if let Statistics::Int32(stats) = stats {
2049                assert_eq!(stats.min_opt().unwrap(), &0,);
2050                assert_eq!(stats.max_opt().unwrap(), &5,);
2051            } else {
2052                panic!("expecting Statistics::Int32");
2053            }
2054        } else {
2055            panic!("metadata missing statistics");
2056        }
2057    }
2058
2059    #[test]
2060    fn test_column_writer_precalculated_statistics() {
2061        let page_writer = get_test_page_writer();
2062        let props = Arc::new(
2063            WriterProperties::builder()
2064                .set_statistics_enabled(EnabledStatistics::Chunk)
2065                .build(),
2066        );
2067        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2068        writer
2069            .write_batch_with_statistics(
2070                &[1, 2, 3, 4],
2071                None,
2072                None,
2073                Some(&-17),
2074                Some(&9000),
2075                Some(55),
2076            )
2077            .unwrap();
2078
2079        let r = writer.close().unwrap();
2080        assert_eq!(r.bytes_written, 20);
2081        assert_eq!(r.rows_written, 4);
2082
2083        let metadata = r.metadata;
2084        assert_eq!(
2085            metadata.encodings(),
2086            &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2087        );
2088        assert_eq!(metadata.num_values(), 4);
2089        assert_eq!(metadata.compressed_size(), 20);
2090        assert_eq!(metadata.uncompressed_size(), 20);
2091        assert_eq!(metadata.data_page_offset(), 0);
2092        assert_eq!(metadata.dictionary_page_offset(), Some(0));
2093        if let Some(stats) = metadata.statistics() {
2094            assert_eq!(stats.null_count_opt(), Some(0));
2095            assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
2096            if let Statistics::Int32(stats) = stats {
2097                assert_eq!(stats.min_opt().unwrap(), &-17);
2098                assert_eq!(stats.max_opt().unwrap(), &9000);
2099            } else {
2100                panic!("expecting Statistics::Int32");
2101            }
2102        } else {
2103            panic!("metadata missing statistics");
2104        }
2105    }
2106
2107    #[test]
2108    fn test_mixed_precomputed_statistics() {
2109        let mut buf = Vec::with_capacity(100);
2110        let mut write = TrackedWrite::new(&mut buf);
2111        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2112        let props = Default::default();
2113        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2114
2115        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2116        writer
2117            .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
2118            .unwrap();
2119
2120        let r = writer.close().unwrap();
2121
2122        let stats = r.metadata.statistics().unwrap();
2123        assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
2124        assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
2125        assert_eq!(stats.null_count_opt(), Some(0));
2126        assert!(stats.distinct_count_opt().is_none());
2127
2128        drop(write);
2129
2130        let props = ReaderProperties::builder()
2131            .set_backward_compatible_lz4(false)
2132            .build();
2133        let reader = SerializedPageReader::new_with_properties(
2134            Arc::new(Bytes::from(buf)),
2135            &r.metadata,
2136            r.rows_written as usize,
2137            None,
2138            Arc::new(props),
2139        )
2140        .unwrap();
2141
2142        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2143        assert_eq!(pages.len(), 2);
2144
2145        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2146        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);
2147
2148        let page_statistics = pages[1].statistics().unwrap();
2149        assert_eq!(
2150            page_statistics.min_bytes_opt().unwrap(),
2151            1_i32.to_le_bytes()
2152        );
2153        assert_eq!(
2154            page_statistics.max_bytes_opt().unwrap(),
2155            7_i32.to_le_bytes()
2156        );
2157        assert_eq!(page_statistics.null_count_opt(), Some(0));
2158        assert!(page_statistics.distinct_count_opt().is_none());
2159    }
2160
2161    #[test]
2162    fn test_disabled_statistics() {
2163        let mut buf = Vec::with_capacity(100);
2164        let mut write = TrackedWrite::new(&mut buf);
2165        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2166        let props = WriterProperties::builder()
2167            .set_statistics_enabled(EnabledStatistics::None)
2168            .set_writer_version(WriterVersion::PARQUET_2_0)
2169            .build();
2170        let props = Arc::new(props);
2171
2172        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2173        writer
2174            .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
2175            .unwrap();
2176
2177        let r = writer.close().unwrap();
2178        assert!(r.metadata.statistics().is_none());
2179
2180        drop(write);
2181
2182        let props = ReaderProperties::builder()
2183            .set_backward_compatible_lz4(false)
2184            .build();
2185        let reader = SerializedPageReader::new_with_properties(
2186            Arc::new(Bytes::from(buf)),
2187            &r.metadata,
2188            r.rows_written as usize,
2189            None,
2190            Arc::new(props),
2191        )
2192        .unwrap();
2193
2194        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2195        assert_eq!(pages.len(), 2);
2196
2197        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2198        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);
2199
2200        match &pages[1] {
2201            Page::DataPageV2 {
2202                num_values,
2203                num_nulls,
2204                num_rows,
2205                statistics,
2206                ..
2207            } => {
2208                assert_eq!(*num_values, 6);
2209                assert_eq!(*num_nulls, 2);
2210                assert_eq!(*num_rows, 6);
2211                assert!(statistics.is_none());
2212            }
2213            _ => unreachable!(),
2214        }
2215    }
2216
2217    #[test]
2218    fn test_column_writer_empty_column_roundtrip() {
2219        let props = Default::default();
2220        column_roundtrip::<Int32Type>(props, &[], None, None);
2221    }
2222
2223    #[test]
2224    fn test_column_writer_non_nullable_values_roundtrip() {
2225        let props = Default::default();
2226        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2227    }
2228
2229    #[test]
2230    fn test_column_writer_nullable_non_repeated_values_roundtrip() {
2231        let props = Default::default();
2232        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
2233    }
2234
2235    #[test]
2236    fn test_column_writer_nullable_repeated_values_roundtrip() {
2237        let props = Default::default();
2238        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2239    }
2240
2241    #[test]
2242    fn test_column_writer_dictionary_fallback_small_data_page() {
2243        let props = WriterProperties::builder()
2244            .set_dictionary_page_size_limit(32)
2245            .set_data_page_size_limit(32)
2246            .build();
2247        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2248    }
2249
2250    #[test]
2251    fn test_column_writer_small_write_batch_size() {
2252        for i in &[1usize, 2, 5, 10, 11, 1023] {
2253            let props = WriterProperties::builder().set_write_batch_size(*i).build();
2254
2255            column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2256        }
2257    }
2258
2259    #[test]
2260    fn test_column_writer_dictionary_disabled_v1() {
2261        let props = WriterProperties::builder()
2262            .set_writer_version(WriterVersion::PARQUET_1_0)
2263            .set_dictionary_enabled(false)
2264            .build();
2265        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2266    }
2267
2268    #[test]
2269    fn test_column_writer_dictionary_disabled_v2() {
2270        let props = WriterProperties::builder()
2271            .set_writer_version(WriterVersion::PARQUET_2_0)
2272            .set_dictionary_enabled(false)
2273            .build();
2274        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2275    }
2276
2277    #[test]
2278    fn test_column_writer_compression_v1() {
2279        let props = WriterProperties::builder()
2280            .set_writer_version(WriterVersion::PARQUET_1_0)
2281            .set_compression(Compression::SNAPPY)
2282            .build();
2283        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2284    }
2285
2286    #[test]
2287    fn test_column_writer_compression_v2() {
2288        let props = WriterProperties::builder()
2289            .set_writer_version(WriterVersion::PARQUET_2_0)
2290            .set_compression(Compression::SNAPPY)
2291            .build();
2292        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2293    }
2294
2295    #[test]
2296    fn test_column_writer_add_data_pages_with_dict() {
2297        // ARROW-5129: Test verifies that we add data page in case of dictionary encoding
2298        // and no fallback occurred so far.
2299        let mut file = tempfile::tempfile().unwrap();
2300        let mut write = TrackedWrite::new(&mut file);
2301        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2302        let props = Arc::new(
2303            WriterProperties::builder()
2304                .set_data_page_size_limit(10)
2305                .set_write_batch_size(3) // write 3 values at a time
2306                .build(),
2307        );
2308        let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
2309        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2310        writer.write_batch(data, None, None).unwrap();
2311        let r = writer.close().unwrap();
2312
2313        drop(write);
2314
2315        // Read pages and check the sequence
2316        let props = ReaderProperties::builder()
2317            .set_backward_compatible_lz4(false)
2318            .build();
2319        let mut page_reader = Box::new(
2320            SerializedPageReader::new_with_properties(
2321                Arc::new(file),
2322                &r.metadata,
2323                r.rows_written as usize,
2324                None,
2325                Arc::new(props),
2326            )
2327            .unwrap(),
2328        );
2329        let mut res = Vec::new();
2330        while let Some(page) = page_reader.get_next_page().unwrap() {
2331            res.push((page.page_type(), page.num_values(), page.buffer().len()));
2332        }
2333        assert_eq!(
2334            res,
2335            vec![
2336                (PageType::DICTIONARY_PAGE, 10, 40),
2337                (PageType::DATA_PAGE, 9, 10),
2338                (PageType::DATA_PAGE, 1, 3),
2339            ]
2340        );
2341    }
2342
2343    #[test]
2344    fn test_bool_statistics() {
2345        let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
2346        // Booleans have an unsigned sort order and so are not compatible
2347        // with the deprecated `min` and `max` statistics
2348        assert!(!stats.is_min_max_backwards_compatible());
2349        if let Statistics::Boolean(stats) = stats {
2350            assert_eq!(stats.min_opt().unwrap(), &false);
2351            assert_eq!(stats.max_opt().unwrap(), &true);
2352        } else {
2353            panic!("expecting Statistics::Boolean, got {stats:?}");
2354        }
2355    }
2356
2357    #[test]
2358    fn test_int32_statistics() {
2359        let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
2360        assert!(stats.is_min_max_backwards_compatible());
2361        if let Statistics::Int32(stats) = stats {
2362            assert_eq!(stats.min_opt().unwrap(), &-2);
2363            assert_eq!(stats.max_opt().unwrap(), &3);
2364        } else {
2365            panic!("expecting Statistics::Int32, got {stats:?}");
2366        }
2367    }
2368
2369    #[test]
2370    fn test_int64_statistics() {
2371        let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
2372        assert!(stats.is_min_max_backwards_compatible());
2373        if let Statistics::Int64(stats) = stats {
2374            assert_eq!(stats.min_opt().unwrap(), &-2);
2375            assert_eq!(stats.max_opt().unwrap(), &3);
2376        } else {
2377            panic!("expecting Statistics::Int64, got {stats:?}");
2378        }
2379    }
2380
2381    #[test]
2382    fn test_int96_statistics() {
2383        let input = vec![
2384            Int96::from(vec![1, 20, 30]),
2385            Int96::from(vec![3, 20, 10]),
2386            Int96::from(vec![0, 20, 30]),
2387            Int96::from(vec![2, 20, 30]),
2388        ]
2389        .into_iter()
2390        .collect::<Vec<Int96>>();
2391
2392        let stats = statistics_roundtrip::<Int96Type>(&input);
2393        assert!(!stats.is_min_max_backwards_compatible());
2394        if let Statistics::Int96(stats) = stats {
2395            assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![0, 20, 30]));
2396            assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
2397        } else {
2398            panic!("expecting Statistics::Int96, got {stats:?}");
2399        }
2400    }
2401
2402    #[test]
2403    fn test_float_statistics() {
2404        let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
2405        assert!(stats.is_min_max_backwards_compatible());
2406        if let Statistics::Float(stats) = stats {
2407            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2408            assert_eq!(stats.max_opt().unwrap(), &3.0);
2409        } else {
2410            panic!("expecting Statistics::Float, got {stats:?}");
2411        }
2412    }
2413
2414    #[test]
2415    fn test_double_statistics() {
2416        let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
2417        assert!(stats.is_min_max_backwards_compatible());
2418        if let Statistics::Double(stats) = stats {
2419            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2420            assert_eq!(stats.max_opt().unwrap(), &3.0);
2421        } else {
2422            panic!("expecting Statistics::Double, got {stats:?}");
2423        }
2424    }
2425
2426    #[test]
2427    fn test_byte_array_statistics() {
2428        let input = ["aawaa", "zz", "aaw", "m", "qrs"]
2429            .iter()
2430            .map(|&s| s.into())
2431            .collect::<Vec<_>>();
2432
2433        let stats = statistics_roundtrip::<ByteArrayType>(&input);
2434        assert!(!stats.is_min_max_backwards_compatible());
2435        if let Statistics::ByteArray(stats) = stats {
2436            assert_eq!(stats.min_opt().unwrap(), &ByteArray::from("aaw"));
2437            assert_eq!(stats.max_opt().unwrap(), &ByteArray::from("zz"));
2438        } else {
2439            panic!("expecting Statistics::ByteArray, got {stats:?}");
2440        }
2441    }
2442
2443    #[test]
2444    fn test_fixed_len_byte_array_statistics() {
2445        let input = ["aawaa", "zz   ", "aaw  ", "m    ", "qrs  "]
2446            .iter()
2447            .map(|&s| ByteArray::from(s).into())
2448            .collect::<Vec<_>>();
2449
2450        let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
2451        assert!(!stats.is_min_max_backwards_compatible());
2452        if let Statistics::FixedLenByteArray(stats) = stats {
2453            let expected_min: FixedLenByteArray = ByteArray::from("aaw  ").into();
2454            assert_eq!(stats.min_opt().unwrap(), &expected_min);
2455            let expected_max: FixedLenByteArray = ByteArray::from("zz   ").into();
2456            assert_eq!(stats.max_opt().unwrap(), &expected_max);
2457        } else {
2458            panic!("expecting Statistics::FixedLenByteArray, got {stats:?}");
2459        }
2460    }
2461
2462    #[test]
2463    fn test_column_writer_check_float16_min_max() {
2464        let input = [
2465            -f16::ONE,
2466            f16::from_f32(3.0),
2467            -f16::from_f32(2.0),
2468            f16::from_f32(2.0),
2469        ]
2470        .into_iter()
2471        .map(|s| ByteArray::from(s).into())
2472        .collect::<Vec<_>>();
2473
2474        let stats = float16_statistics_roundtrip(&input);
2475        assert!(stats.is_min_max_backwards_compatible());
2476        assert_eq!(
2477            stats.min_opt().unwrap(),
2478            &ByteArray::from(-f16::from_f32(2.0))
2479        );
2480        assert_eq!(
2481            stats.max_opt().unwrap(),
2482            &ByteArray::from(f16::from_f32(3.0))
2483        );
2484    }
2485
2486    #[test]
2487    fn test_column_writer_check_float16_nan_middle() {
2488        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2489            .into_iter()
2490            .map(|s| ByteArray::from(s).into())
2491            .collect::<Vec<_>>();
2492
2493        let stats = float16_statistics_roundtrip(&input);
2494        assert!(stats.is_min_max_backwards_compatible());
2495        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2496        assert_eq!(
2497            stats.max_opt().unwrap(),
2498            &ByteArray::from(f16::ONE + f16::ONE)
2499        );
2500    }
2501
2502    #[test]
2503    fn test_float16_statistics_nan_middle() {
2504        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2505            .into_iter()
2506            .map(|s| ByteArray::from(s).into())
2507            .collect::<Vec<_>>();
2508
2509        let stats = float16_statistics_roundtrip(&input);
2510        assert!(stats.is_min_max_backwards_compatible());
2511        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2512        assert_eq!(
2513            stats.max_opt().unwrap(),
2514            &ByteArray::from(f16::ONE + f16::ONE)
2515        );
2516    }
2517
2518    #[test]
2519    fn test_float16_statistics_nan_start() {
2520        let input = [f16::NAN, f16::ONE, f16::ONE + f16::ONE]
2521            .into_iter()
2522            .map(|s| ByteArray::from(s).into())
2523            .collect::<Vec<_>>();
2524
2525        let stats = float16_statistics_roundtrip(&input);
2526        assert!(stats.is_min_max_backwards_compatible());
2527        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2528        assert_eq!(
2529            stats.max_opt().unwrap(),
2530            &ByteArray::from(f16::ONE + f16::ONE)
2531        );
2532    }
2533
2534    #[test]
2535    fn test_float16_statistics_nan_only() {
2536        let input = [f16::NAN, f16::NAN]
2537            .into_iter()
2538            .map(|s| ByteArray::from(s).into())
2539            .collect::<Vec<_>>();
2540
2541        let stats = float16_statistics_roundtrip(&input);
2542        assert!(stats.min_bytes_opt().is_none());
2543        assert!(stats.max_bytes_opt().is_none());
2544        assert!(stats.is_min_max_backwards_compatible());
2545    }
2546
2547    #[test]
2548    fn test_float16_statistics_zero_only() {
2549        let input = [f16::ZERO]
2550            .into_iter()
2551            .map(|s| ByteArray::from(s).into())
2552            .collect::<Vec<_>>();
2553
2554        let stats = float16_statistics_roundtrip(&input);
2555        assert!(stats.is_min_max_backwards_compatible());
2556        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2557        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2558    }
2559
2560    #[test]
2561    fn test_float16_statistics_neg_zero_only() {
2562        let input = [f16::NEG_ZERO]
2563            .into_iter()
2564            .map(|s| ByteArray::from(s).into())
2565            .collect::<Vec<_>>();
2566
2567        let stats = float16_statistics_roundtrip(&input);
2568        assert!(stats.is_min_max_backwards_compatible());
2569        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2570        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2571    }
2572
2573    #[test]
2574    fn test_float16_statistics_zero_min() {
2575        let input = [f16::ZERO, f16::ONE, f16::NAN, f16::PI]
2576            .into_iter()
2577            .map(|s| ByteArray::from(s).into())
2578            .collect::<Vec<_>>();
2579
2580        let stats = float16_statistics_roundtrip(&input);
2581        assert!(stats.is_min_max_backwards_compatible());
2582        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2583        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI));
2584    }
2585
2586    #[test]
2587    fn test_float16_statistics_neg_zero_max() {
2588        let input = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI]
2589            .into_iter()
2590            .map(|s| ByteArray::from(s).into())
2591            .collect::<Vec<_>>();
2592
2593        let stats = float16_statistics_roundtrip(&input);
2594        assert!(stats.is_min_max_backwards_compatible());
2595        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI));
2596        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2597    }
2598
2599    #[test]
2600    fn test_float_statistics_nan_middle() {
2601        let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
2602        assert!(stats.is_min_max_backwards_compatible());
2603        if let Statistics::Float(stats) = stats {
2604            assert_eq!(stats.min_opt().unwrap(), &1.0);
2605            assert_eq!(stats.max_opt().unwrap(), &2.0);
2606        } else {
2607            panic!("expecting Statistics::Float");
2608        }
2609    }
2610
2611    #[test]
2612    fn test_float_statistics_nan_start() {
2613        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
2614        assert!(stats.is_min_max_backwards_compatible());
2615        if let Statistics::Float(stats) = stats {
2616            assert_eq!(stats.min_opt().unwrap(), &1.0);
2617            assert_eq!(stats.max_opt().unwrap(), &2.0);
2618        } else {
2619            panic!("expecting Statistics::Float");
2620        }
2621    }
2622
2623    #[test]
2624    fn test_float_statistics_nan_only() {
2625        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]);
2626        assert!(stats.min_bytes_opt().is_none());
2627        assert!(stats.max_bytes_opt().is_none());
2628        assert!(stats.is_min_max_backwards_compatible());
2629        assert!(matches!(stats, Statistics::Float(_)));
2630    }
2631
2632    #[test]
2633    fn test_float_statistics_zero_only() {
2634        let stats = statistics_roundtrip::<FloatType>(&[0.0]);
2635        assert!(stats.is_min_max_backwards_compatible());
2636        if let Statistics::Float(stats) = stats {
2637            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2638            assert!(stats.min_opt().unwrap().is_sign_negative());
2639            assert_eq!(stats.max_opt().unwrap(), &0.0);
2640            assert!(stats.max_opt().unwrap().is_sign_positive());
2641        } else {
2642            panic!("expecting Statistics::Float");
2643        }
2644    }
2645
2646    #[test]
2647    fn test_float_statistics_neg_zero_only() {
2648        let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
2649        assert!(stats.is_min_max_backwards_compatible());
2650        if let Statistics::Float(stats) = stats {
2651            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2652            assert!(stats.min_opt().unwrap().is_sign_negative());
2653            assert_eq!(stats.max_opt().unwrap(), &0.0);
2654            assert!(stats.max_opt().unwrap().is_sign_positive());
2655        } else {
2656            panic!("expecting Statistics::Float");
2657        }
2658    }
2659
2660    #[test]
2661    fn test_float_statistics_zero_min() {
2662        let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
2663        assert!(stats.is_min_max_backwards_compatible());
2664        if let Statistics::Float(stats) = stats {
2665            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2666            assert!(stats.min_opt().unwrap().is_sign_negative());
2667            assert_eq!(stats.max_opt().unwrap(), &2.0);
2668        } else {
2669            panic!("expecting Statistics::Float");
2670        }
2671    }
2672
2673    #[test]
2674    fn test_float_statistics_neg_zero_max() {
2675        let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
2676        assert!(stats.is_min_max_backwards_compatible());
2677        if let Statistics::Float(stats) = stats {
2678            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2679            assert_eq!(stats.max_opt().unwrap(), &0.0);
2680            assert!(stats.max_opt().unwrap().is_sign_positive());
2681        } else {
2682            panic!("expecting Statistics::Float");
2683        }
2684    }
2685
2686    #[test]
2687    fn test_double_statistics_nan_middle() {
2688        let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
2689        assert!(stats.is_min_max_backwards_compatible());
2690        if let Statistics::Double(stats) = stats {
2691            assert_eq!(stats.min_opt().unwrap(), &1.0);
2692            assert_eq!(stats.max_opt().unwrap(), &2.0);
2693        } else {
2694            panic!("expecting Statistics::Double");
2695        }
2696    }
2697
2698    #[test]
2699    fn test_double_statistics_nan_start() {
2700        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
2701        assert!(stats.is_min_max_backwards_compatible());
2702        if let Statistics::Double(stats) = stats {
2703            assert_eq!(stats.min_opt().unwrap(), &1.0);
2704            assert_eq!(stats.max_opt().unwrap(), &2.0);
2705        } else {
2706            panic!("expecting Statistics::Double");
2707        }
2708    }
2709
2710    #[test]
2711    fn test_double_statistics_nan_only() {
2712        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]);
2713        assert!(stats.min_bytes_opt().is_none());
2714        assert!(stats.max_bytes_opt().is_none());
2715        assert!(matches!(stats, Statistics::Double(_)));
2716        assert!(stats.is_min_max_backwards_compatible());
2717    }
2718
2719    #[test]
2720    fn test_double_statistics_zero_only() {
2721        let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
2722        assert!(stats.is_min_max_backwards_compatible());
2723        if let Statistics::Double(stats) = stats {
2724            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2725            assert!(stats.min_opt().unwrap().is_sign_negative());
2726            assert_eq!(stats.max_opt().unwrap(), &0.0);
2727            assert!(stats.max_opt().unwrap().is_sign_positive());
2728        } else {
2729            panic!("expecting Statistics::Double");
2730        }
2731    }
2732
2733    #[test]
2734    fn test_double_statistics_neg_zero_only() {
2735        let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
2736        assert!(stats.is_min_max_backwards_compatible());
2737        if let Statistics::Double(stats) = stats {
2738            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2739            assert!(stats.min_opt().unwrap().is_sign_negative());
2740            assert_eq!(stats.max_opt().unwrap(), &0.0);
2741            assert!(stats.max_opt().unwrap().is_sign_positive());
2742        } else {
2743            panic!("expecting Statistics::Double");
2744        }
2745    }
2746
2747    #[test]
2748    fn test_double_statistics_zero_min() {
2749        let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
2750        assert!(stats.is_min_max_backwards_compatible());
2751        if let Statistics::Double(stats) = stats {
2752            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2753            assert!(stats.min_opt().unwrap().is_sign_negative());
2754            assert_eq!(stats.max_opt().unwrap(), &2.0);
2755        } else {
2756            panic!("expecting Statistics::Double");
2757        }
2758    }
2759
2760    #[test]
2761    fn test_double_statistics_neg_zero_max() {
2762        let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
2763        assert!(stats.is_min_max_backwards_compatible());
2764        if let Statistics::Double(stats) = stats {
2765            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2766            assert_eq!(stats.max_opt().unwrap(), &0.0);
2767            assert!(stats.max_opt().unwrap().is_sign_positive());
2768        } else {
2769            panic!("expecting Statistics::Double");
2770        }
2771    }
2772
2773    #[test]
2774    fn test_compare_greater_byte_array_decimals() {
2775        assert!(!compare_greater_byte_array_decimals(&[], &[],),);
2776        assert!(compare_greater_byte_array_decimals(&[1u8,], &[],),);
2777        assert!(!compare_greater_byte_array_decimals(&[], &[1u8,],),);
2778        assert!(compare_greater_byte_array_decimals(&[1u8,], &[0u8,],),);
2779        assert!(!compare_greater_byte_array_decimals(&[1u8,], &[1u8,],),);
2780        assert!(compare_greater_byte_array_decimals(&[1u8, 0u8,], &[0u8,],),);
2781        assert!(!compare_greater_byte_array_decimals(
2782            &[0u8, 1u8,],
2783            &[1u8, 0u8,],
2784        ),);
2785        assert!(!compare_greater_byte_array_decimals(
2786            &[255u8, 35u8, 0u8, 0u8,],
2787            &[0u8,],
2788        ),);
2789        assert!(compare_greater_byte_array_decimals(
2790            &[0u8,],
2791            &[255u8, 35u8, 0u8, 0u8,],
2792        ),);
2793    }
2794
2795    #[test]
2796    fn test_column_index_with_null_pages() {
2797        // write a single page of all nulls
2798        let page_writer = get_test_page_writer();
2799        let props = Default::default();
2800        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2801        writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();
2802
2803        let r = writer.close().unwrap();
2804        assert!(r.column_index.is_some());
2805        let col_idx = r.column_index.unwrap();
2806        // null_pages should be true for page 0
2807        assert!(col_idx.null_pages[0]);
2808        // min and max should be empty byte arrays
2809        assert_eq!(col_idx.min_values[0].len(), 0);
2810        assert_eq!(col_idx.max_values[0].len(), 0);
2811        // null_counts should be defined and be 4 for page 0
2812        assert!(col_idx.null_counts.is_some());
2813        assert_eq!(col_idx.null_counts.as_ref().unwrap()[0], 4);
2814        // there is no repetition so rep histogram should be absent
2815        assert!(col_idx.repetition_level_histograms.is_none());
2816        // definition_level_histogram should be present and should be 0:4, 1:0
2817        assert!(col_idx.definition_level_histograms.is_some());
2818        assert_eq!(col_idx.definition_level_histograms.unwrap(), &[4, 0]);
2819    }
2820
2821    #[test]
2822    fn test_column_offset_index_metadata() {
2823        // write data
2824        // and check the offset index and column index
2825        let page_writer = get_test_page_writer();
2826        let props = Default::default();
2827        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2828        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2829        // first page
2830        writer.flush_data_pages().unwrap();
2831        // second page
2832        writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
2833
2834        let r = writer.close().unwrap();
2835        let column_index = r.column_index.unwrap();
2836        let offset_index = r.offset_index.unwrap();
2837
2838        assert_eq!(8, r.rows_written);
2839
2840        // column index
2841        assert_eq!(2, column_index.null_pages.len());
2842        assert_eq!(2, offset_index.page_locations.len());
2843        assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
2844        for idx in 0..2 {
2845            assert!(!column_index.null_pages[idx]);
2846            assert_eq!(0, column_index.null_counts.as_ref().unwrap()[idx]);
2847        }
2848
2849        if let Some(stats) = r.metadata.statistics() {
2850            assert_eq!(stats.null_count_opt(), Some(0));
2851            assert_eq!(stats.distinct_count_opt(), None);
2852            if let Statistics::Int32(stats) = stats {
2853                // first page is [1,2,3,4]
2854                // second page is [-5,2,4,8]
2855                // note that we don't increment here, as this is a non BinaryArray type.
2856                assert_eq!(
2857                    stats.min_bytes_opt(),
2858                    Some(column_index.min_values[1].as_slice())
2859                );
2860                assert_eq!(
2861                    stats.max_bytes_opt(),
2862                    column_index.max_values.get(1).map(Vec::as_slice)
2863                );
2864            } else {
2865                panic!("expecting Statistics::Int32");
2866            }
2867        } else {
2868            panic!("metadata missing statistics");
2869        }
2870
2871        // page location
2872        assert_eq!(0, offset_index.page_locations[0].first_row_index);
2873        assert_eq!(4, offset_index.page_locations[1].first_row_index);
2874    }
2875
2876    /// Verify min/max value truncation in the column index works as expected
2877    #[test]
2878    fn test_column_offset_index_metadata_truncating() {
2879        // write data
2880        // and check the offset index and column index
2881        let page_writer = get_test_page_writer();
2882        let props = Default::default();
2883        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
2884
2885        let mut data = vec![FixedLenByteArray::default(); 3];
2886        // This is the expected min value - "aaa..."
2887        data[0].set_data(Bytes::from(vec![97_u8; 200]));
2888        // This is the expected max value - "ZZZ..."
2889        data[1].set_data(Bytes::from(vec![112_u8; 200]));
2890        data[2].set_data(Bytes::from(vec![98_u8; 200]));
2891
2892        writer.write_batch(&data, None, None).unwrap();
2893
2894        writer.flush_data_pages().unwrap();
2895
2896        let r = writer.close().unwrap();
2897        let column_index = r.column_index.unwrap();
2898        let offset_index = r.offset_index.unwrap();
2899
2900        assert_eq!(3, r.rows_written);
2901
2902        // column index
2903        assert_eq!(1, column_index.null_pages.len());
2904        assert_eq!(1, offset_index.page_locations.len());
2905        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
2906        assert!(!column_index.null_pages[0]);
2907        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
2908
2909        if let Some(stats) = r.metadata.statistics() {
2910            assert_eq!(stats.null_count_opt(), Some(0));
2911            assert_eq!(stats.distinct_count_opt(), None);
2912            if let Statistics::FixedLenByteArray(stats) = stats {
2913                let column_index_min_value = &column_index.min_values[0];
2914                let column_index_max_value = &column_index.max_values[0];
2915
2916                // Column index stats are truncated, while the column chunk's aren't.
2917                assert_ne!(
2918                    stats.min_bytes_opt(),
2919                    Some(column_index_min_value.as_slice())
2920                );
2921                assert_ne!(
2922                    stats.max_bytes_opt(),
2923                    Some(column_index_max_value.as_slice())
2924                );
2925
2926                assert_eq!(
2927                    column_index_min_value.len(),
2928                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
2929                );
2930                assert_eq!(column_index_min_value.as_slice(), &[97_u8; 64]);
2931                assert_eq!(
2932                    column_index_max_value.len(),
2933                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
2934                );
2935
2936                // We expect the last byte to be incremented
2937                assert_eq!(
2938                    *column_index_max_value.last().unwrap(),
2939                    *column_index_max_value.first().unwrap() + 1
2940                );
2941            } else {
2942                panic!("expecting Statistics::FixedLenByteArray");
2943            }
2944        } else {
2945            panic!("metadata missing statistics");
2946        }
2947    }
2948
2949    #[test]
2950    fn test_column_offset_index_truncating_spec_example() {
2951        // write data
2952        // and check the offset index and column index
2953        let page_writer = get_test_page_writer();
2954
2955        // Truncate values at 1 byte
2956        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
2957        let props = Arc::new(builder.build());
2958        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
2959
2960        let mut data = vec![FixedLenByteArray::default(); 1];
2961        // This is the expected min value
2962        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
2963
2964        writer.write_batch(&data, None, None).unwrap();
2965
2966        writer.flush_data_pages().unwrap();
2967
2968        let r = writer.close().unwrap();
2969        let column_index = r.column_index.unwrap();
2970        let offset_index = r.offset_index.unwrap();
2971
2972        assert_eq!(1, r.rows_written);
2973
2974        // column index
2975        assert_eq!(1, column_index.null_pages.len());
2976        assert_eq!(1, offset_index.page_locations.len());
2977        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
2978        assert!(!column_index.null_pages[0]);
2979        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
2980
2981        if let Some(stats) = r.metadata.statistics() {
2982            assert_eq!(stats.null_count_opt(), Some(0));
2983            assert_eq!(stats.distinct_count_opt(), None);
2984            if let Statistics::FixedLenByteArray(_stats) = stats {
2985                let column_index_min_value = &column_index.min_values[0];
2986                let column_index_max_value = &column_index.max_values[0];
2987
2988                assert_eq!(column_index_min_value.len(), 1);
2989                assert_eq!(column_index_max_value.len(), 1);
2990
2991                assert_eq!("B".as_bytes(), column_index_min_value.as_slice());
2992                assert_eq!("C".as_bytes(), column_index_max_value.as_slice());
2993
2994                assert_ne!(column_index_min_value, stats.min_bytes_opt().unwrap());
2995                assert_ne!(column_index_max_value, stats.max_bytes_opt().unwrap());
2996            } else {
2997                panic!("expecting Statistics::FixedLenByteArray");
2998            }
2999        } else {
3000            panic!("metadata missing statistics");
3001        }
3002    }
3003
3004    #[test]
3005    fn test_float16_min_max_no_truncation() {
3006        // Even if we set truncation to occur at 1 byte, we should not truncate for Float16
3007        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3008        let props = Arc::new(builder.build());
3009        let page_writer = get_test_page_writer();
3010        let mut writer = get_test_float16_column_writer(page_writer, props);
3011
3012        let expected_value = f16::PI.to_le_bytes().to_vec();
3013        let data = vec![ByteArray::from(expected_value.clone()).into()];
3014        writer.write_batch(&data, None, None).unwrap();
3015        writer.flush_data_pages().unwrap();
3016
3017        let r = writer.close().unwrap();
3018
3019        // stats should still be written
3020        // ensure bytes weren't truncated for column index
3021        let column_index = r.column_index.unwrap();
3022        let column_index_min_bytes = column_index.min_values[0].as_slice();
3023        let column_index_max_bytes = column_index.max_values[0].as_slice();
3024        assert_eq!(expected_value, column_index_min_bytes);
3025        assert_eq!(expected_value, column_index_max_bytes);
3026
3027        // ensure bytes weren't truncated for statistics
3028        let stats = r.metadata.statistics().unwrap();
3029        if let Statistics::FixedLenByteArray(stats) = stats {
3030            let stats_min_bytes = stats.min_bytes_opt().unwrap();
3031            let stats_max_bytes = stats.max_bytes_opt().unwrap();
3032            assert_eq!(expected_value, stats_min_bytes);
3033            assert_eq!(expected_value, stats_max_bytes);
3034        } else {
3035            panic!("expecting Statistics::FixedLenByteArray");
3036        }
3037    }
3038
3039    #[test]
3040    fn test_decimal_min_max_no_truncation() {
3041        // Even if we set truncation to occur at 1 byte, we should not truncate for Decimal
3042        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3043        let props = Arc::new(builder.build());
3044        let page_writer = get_test_page_writer();
3045        let mut writer =
3046            get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3047
3048        let expected_value = vec![
3049            255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
3050            231u8, 90u8, 0u8, 0u8,
3051        ];
3052        let data = vec![ByteArray::from(expected_value.clone()).into()];
3053        writer.write_batch(&data, None, None).unwrap();
3054        writer.flush_data_pages().unwrap();
3055
3056        let r = writer.close().unwrap();
3057
3058        // stats should still be written
3059        // ensure bytes weren't truncated for column index
3060        let column_index = r.column_index.unwrap();
3061        let column_index_min_bytes = column_index.min_values[0].as_slice();
3062        let column_index_max_bytes = column_index.max_values[0].as_slice();
3063        assert_eq!(expected_value, column_index_min_bytes);
3064        assert_eq!(expected_value, column_index_max_bytes);
3065
3066        // ensure bytes weren't truncated for statistics
3067        let stats = r.metadata.statistics().unwrap();
3068        if let Statistics::FixedLenByteArray(stats) = stats {
3069            let stats_min_bytes = stats.min_bytes_opt().unwrap();
3070            let stats_max_bytes = stats.max_bytes_opt().unwrap();
3071            assert_eq!(expected_value, stats_min_bytes);
3072            assert_eq!(expected_value, stats_max_bytes);
3073        } else {
3074            panic!("expecting Statistics::FixedLenByteArray");
3075        }
3076    }
3077
3078    #[test]
3079    fn test_statistics_truncating_byte_array() {
3080        let page_writer = get_test_page_writer();
3081
3082        const TEST_TRUNCATE_LENGTH: usize = 1;
3083
3084        // Truncate values at 1 byte
3085        let builder =
3086            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3087        let props = Arc::new(builder.build());
3088        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3089
3090        let mut data = vec![ByteArray::default(); 1];
3091        // This is the expected min value
3092        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3093
3094        writer.write_batch(&data, None, None).unwrap();
3095
3096        writer.flush_data_pages().unwrap();
3097
3098        let r = writer.close().unwrap();
3099
3100        assert_eq!(1, r.rows_written);
3101
3102        let stats = r.metadata.statistics().expect("statistics");
3103        assert_eq!(stats.null_count_opt(), Some(0));
3104        assert_eq!(stats.distinct_count_opt(), None);
3105        if let Statistics::ByteArray(_stats) = stats {
3106            let min_value = _stats.min_opt().unwrap();
3107            let max_value = _stats.max_opt().unwrap();
3108
3109            assert!(!_stats.min_is_exact());
3110            assert!(!_stats.max_is_exact());
3111
3112            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3113            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3114
3115            assert_eq!("B".as_bytes(), min_value.as_bytes());
3116            assert_eq!("C".as_bytes(), max_value.as_bytes());
3117        } else {
3118            panic!("expecting Statistics::ByteArray");
3119        }
3120    }
3121
3122    #[test]
3123    fn test_statistics_truncating_fixed_len_byte_array() {
3124        let page_writer = get_test_page_writer();
3125
3126        const TEST_TRUNCATE_LENGTH: usize = 1;
3127
3128        // Truncate values at 1 byte
3129        let builder =
3130            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3131        let props = Arc::new(builder.build());
3132        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3133
3134        let mut data = vec![FixedLenByteArray::default(); 1];
3135
3136        const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
3137        const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();
3138
3139        const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]]; // parquet specifies big-endian order for decimals
3140        const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
3141            [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];
3142
3143        // This is the expected min value
3144        data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));
3145
3146        writer.write_batch(&data, None, None).unwrap();
3147
3148        writer.flush_data_pages().unwrap();
3149
3150        let r = writer.close().unwrap();
3151
3152        assert_eq!(1, r.rows_written);
3153
3154        let stats = r.metadata.statistics().expect("statistics");
3155        assert_eq!(stats.null_count_opt(), Some(0));
3156        assert_eq!(stats.distinct_count_opt(), None);
3157        if let Statistics::FixedLenByteArray(_stats) = stats {
3158            let min_value = _stats.min_opt().unwrap();
3159            let max_value = _stats.max_opt().unwrap();
3160
3161            assert!(!_stats.min_is_exact());
3162            assert!(!_stats.max_is_exact());
3163
3164            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3165            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3166
3167            assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
3168            assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());
3169
3170            let reconstructed_min = i128::from_be_bytes([
3171                min_value.as_bytes()[0],
3172                0,
3173                0,
3174                0,
3175                0,
3176                0,
3177                0,
3178                0,
3179                0,
3180                0,
3181                0,
3182                0,
3183                0,
3184                0,
3185                0,
3186                0,
3187            ]);
3188
3189            let reconstructed_max = i128::from_be_bytes([
3190                max_value.as_bytes()[0],
3191                0,
3192                0,
3193                0,
3194                0,
3195                0,
3196                0,
3197                0,
3198                0,
3199                0,
3200                0,
3201                0,
3202                0,
3203                0,
3204                0,
3205                0,
3206            ]);
3207
3208            // check that the inner value is correctly bounded by the min/max
3209            println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
3210            assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
3211            println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
3212            assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
3213        } else {
3214            panic!("expecting Statistics::FixedLenByteArray");
3215        }
3216    }
3217
3218    #[test]
3219    fn test_send() {
3220        fn test<T: Send>() {}
3221        test::<ColumnWriterImpl<Int32Type>>();
3222    }
3223
3224    #[test]
3225    fn test_increment() {
3226        let v = increment(vec![0, 0, 0]).unwrap();
3227        assert_eq!(&v, &[0, 0, 1]);
3228
3229        // Handle overflow
3230        let v = increment(vec![0, 255, 255]).unwrap();
3231        assert_eq!(&v, &[1, 0, 0]);
3232
3233        // Return `None` if all bytes are u8::MAX
3234        let v = increment(vec![255, 255, 255]);
3235        assert!(v.is_none());
3236    }
3237
3238    #[test]
3239    fn test_increment_utf8() {
3240        let test_inc = |o: &str, expected: &str| {
3241            if let Ok(v) = String::from_utf8(increment_utf8(o).unwrap()) {
3242                // Got the expected result...
3243                assert_eq!(v, expected);
3244                // and it's greater than the original string
3245                assert!(*v > *o);
3246                // Also show that BinaryArray level comparison works here
3247                let mut greater = ByteArray::new();
3248                greater.set_data(Bytes::from(v));
3249                let mut original = ByteArray::new();
3250                original.set_data(Bytes::from(o.as_bytes().to_vec()));
3251                assert!(greater > original);
3252            } else {
3253                panic!("Expected incremented UTF8 string to also be valid.");
3254            }
3255        };
3256
3257        // Basic ASCII case
3258        test_inc("hello", "hellp");
3259
3260        // 1-byte ending in max 1-byte
3261        test_inc("a\u{7f}", "b");
3262
3263        // 1-byte max should not truncate as it would need 2-byte code points
3264        assert!(increment_utf8("\u{7f}\u{7f}").is_none());
3265
3266        // UTF8 string
3267        test_inc("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝");
3268
3269        // 2-byte without overflow
3270        test_inc("éééé", "éééê");
3271
3272        // 2-byte that overflows lowest byte
3273        test_inc("\u{ff}\u{ff}", "\u{ff}\u{100}");
3274
3275        // 2-byte ending in max 2-byte
3276        test_inc("a\u{7ff}", "b");
3277
3278        // Max 2-byte should not truncate as it would need 3-byte code points
3279        assert!(increment_utf8("\u{7ff}\u{7ff}").is_none());
3280
3281        // 3-byte without overflow [U+800, U+800] -> [U+800, U+801] (note that these
3282        // characters should render right to left).
3283        test_inc("ࠀࠀ", "ࠀࠁ");
3284
3285        // 3-byte ending in max 3-byte
3286        test_inc("a\u{ffff}", "b");
3287
3288        // Max 3-byte should not truncate as it would need 4-byte code points
3289        assert!(increment_utf8("\u{ffff}\u{ffff}").is_none());
3290
3291        // 4-byte without overflow
3292        test_inc("𐀀𐀀", "𐀀𐀁");
3293
3294        // 4-byte ending in max unicode
3295        test_inc("a\u{10ffff}", "b");
3296
3297        // Max 4-byte should not truncate
3298        assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none());
3299
3300        // Skip over surrogate pair range (0xD800..=0xDFFF)
3301        //test_inc("a\u{D7FF}", "a\u{e000}");
3302        test_inc("a\u{D7FF}", "b");
3303    }
3304
3305    #[test]
3306    fn test_truncate_utf8() {
3307        // No-op
3308        let data = "❤️🧡💛💚💙💜";
3309        let r = truncate_utf8(data, data.len()).unwrap();
3310        assert_eq!(r.len(), data.len());
3311        assert_eq!(&r, data.as_bytes());
3312
3313        // We slice it away from the UTF8 boundary
3314        let r = truncate_utf8(data, 13).unwrap();
3315        assert_eq!(r.len(), 10);
3316        assert_eq!(&r, "❤️🧡".as_bytes());
3317
3318        // One multi-byte code point, and a length shorter than it, so we can't slice it
3319        let r = truncate_utf8("\u{0836}", 1);
3320        assert!(r.is_none());
3321
3322        // Test truncate and increment for max bounds on UTF-8 statistics
3323        // 7-bit (i.e. ASCII)
3324        let r = truncate_and_increment_utf8("yyyyyyyyy", 8).unwrap();
3325        assert_eq!(&r, "yyyyyyyz".as_bytes());
3326
3327        // 2-byte without overflow
3328        let r = truncate_and_increment_utf8("ééééé", 7).unwrap();
3329        assert_eq!(&r, "ééê".as_bytes());
3330
3331        // 2-byte that overflows lowest byte
3332        let r = truncate_and_increment_utf8("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8).unwrap();
3333        assert_eq!(&r, "\u{ff}\u{ff}\u{ff}\u{100}".as_bytes());
3334
3335        // max 2-byte should not truncate as it would need 3-byte code points
3336        let r = truncate_and_increment_utf8("߿߿߿߿߿", 8);
3337        assert!(r.is_none());
3338
3339        // 3-byte without overflow [U+800, U+800, U+800] -> [U+800, U+801] (note that these
3340        // characters should render right to left).
3341        let r = truncate_and_increment_utf8("ࠀࠀࠀࠀ", 8).unwrap();
3342        assert_eq!(&r, "ࠀࠁ".as_bytes());
3343
3344        // max 3-byte should not truncate as it would need 4-byte code points
3345        let r = truncate_and_increment_utf8("\u{ffff}\u{ffff}\u{ffff}", 8);
3346        assert!(r.is_none());
3347
3348        // 4-byte without overflow
3349        let r = truncate_and_increment_utf8("𐀀𐀀𐀀𐀀", 9).unwrap();
3350        assert_eq!(&r, "𐀀𐀁".as_bytes());
3351
3352        // max 4-byte should not truncate
3353        let r = truncate_and_increment_utf8("\u{10ffff}\u{10ffff}", 8);
3354        assert!(r.is_none());
3355    }
3356
3357    #[test]
3358    // Check fallback truncation of statistics that should be UTF-8, but aren't
3359    // (see https://github.com/apache/arrow-rs/pull/6870).
3360    fn test_byte_array_truncate_invalid_utf8_statistics() {
3361        let message_type = "
3362            message test_schema {
3363                OPTIONAL BYTE_ARRAY a (UTF8);
3364            }
3365        ";
3366        let schema = Arc::new(parse_message_type(message_type).unwrap());
3367
3368        // Create Vec<ByteArray> containing non-UTF8 bytes
3369        let data = vec![ByteArray::from(vec![128u8; 32]); 7];
3370        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
3371        let file: File = tempfile::tempfile().unwrap();
3372        let props = Arc::new(
3373            WriterProperties::builder()
3374                .set_statistics_enabled(EnabledStatistics::Chunk)
3375                .set_statistics_truncate_length(Some(8))
3376                .build(),
3377        );
3378
3379        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
3380        let mut row_group_writer = writer.next_row_group().unwrap();
3381
3382        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
3383        col_writer
3384            .typed::<ByteArrayType>()
3385            .write_batch(&data, Some(&def_levels), None)
3386            .unwrap();
3387        col_writer.close().unwrap();
3388        row_group_writer.close().unwrap();
3389        let file_metadata = writer.close().unwrap();
3390        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
3391        let stats = file_metadata.row_groups[0].columns[0]
3392            .meta_data
3393            .as_ref()
3394            .unwrap()
3395            .statistics
3396            .as_ref()
3397            .unwrap();
3398        assert!(!stats.is_max_value_exact.unwrap());
3399        // Truncation of invalid UTF-8 should fall back to binary truncation, so last byte should
3400        // be incremented by 1.
3401        assert_eq!(
3402            stats.max_value,
3403            Some([128, 128, 128, 128, 128, 128, 128, 129].to_vec())
3404        );
3405    }
3406
3407    #[test]
3408    fn test_increment_max_binary_chars() {
3409        let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
3410        assert_eq!(&r.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);
3411
3412        let incremented = increment(vec![0xFF, 0xFF, 0xFF]);
3413        assert!(incremented.is_none())
3414    }
3415
3416    #[test]
3417    fn test_no_column_index_when_stats_disabled() {
3418        // https://github.com/apache/arrow-rs/issues/6010
3419        // Test that column index is not created/written for all-nulls column when page
3420        // statistics are disabled.
3421        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3422        let props = Arc::new(
3423            WriterProperties::builder()
3424                .set_statistics_enabled(EnabledStatistics::None)
3425                .build(),
3426        );
3427        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3428        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3429
3430        let data = Vec::new();
3431        let def_levels = vec![0; 10];
3432        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3433        writer.flush_data_pages().unwrap();
3434
3435        let column_close_result = writer.close().unwrap();
3436        assert!(column_close_result.offset_index.is_some());
3437        assert!(column_close_result.column_index.is_none());
3438    }
3439
3440    #[test]
3441    fn test_no_offset_index_when_disabled() {
3442        // Test that offset indexes can be disabled
3443        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3444        let props = Arc::new(
3445            WriterProperties::builder()
3446                .set_statistics_enabled(EnabledStatistics::None)
3447                .set_offset_index_disabled(true)
3448                .build(),
3449        );
3450        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3451        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3452
3453        let data = Vec::new();
3454        let def_levels = vec![0; 10];
3455        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3456        writer.flush_data_pages().unwrap();
3457
3458        let column_close_result = writer.close().unwrap();
3459        assert!(column_close_result.offset_index.is_none());
3460        assert!(column_close_result.column_index.is_none());
3461    }
3462
3463    #[test]
3464    fn test_offset_index_overridden() {
3465        // Test that offset indexes are not disabled when gathering page statistics
3466        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3467        let props = Arc::new(
3468            WriterProperties::builder()
3469                .set_statistics_enabled(EnabledStatistics::Page)
3470                .set_offset_index_disabled(true)
3471                .build(),
3472        );
3473        let column_writer = get_column_writer(descr, props, get_test_page_writer());
3474        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3475
3476        let data = Vec::new();
3477        let def_levels = vec![0; 10];
3478        writer.write_batch(&data, Some(&def_levels), None).unwrap();
3479        writer.flush_data_pages().unwrap();
3480
3481        let column_close_result = writer.close().unwrap();
3482        assert!(column_close_result.offset_index.is_some());
3483        assert!(column_close_result.column_index.is_some());
3484    }
3485
3486    #[test]
3487    fn test_boundary_order() -> Result<()> {
3488        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3489        // min max both ascending
3490        let column_close_result = write_multiple_pages::<Int32Type>(
3491            &descr,
3492            &[
3493                &[Some(-10), Some(10)],
3494                &[Some(-5), Some(11)],
3495                &[None],
3496                &[Some(-5), Some(11)],
3497            ],
3498        )?;
3499        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3500        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3501
3502        // min max both descending
3503        let column_close_result = write_multiple_pages::<Int32Type>(
3504            &descr,
3505            &[
3506                &[Some(10), Some(11)],
3507                &[Some(5), Some(11)],
3508                &[None],
3509                &[Some(-5), Some(0)],
3510            ],
3511        )?;
3512        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3513        assert_eq!(boundary_order, BoundaryOrder::DESCENDING);
3514
3515        // min max both equal
3516        let column_close_result = write_multiple_pages::<Int32Type>(
3517            &descr,
3518            &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
3519        )?;
3520        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3521        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3522
3523        // only nulls
3524        let column_close_result =
3525            write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
3526        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3527        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3528
3529        // one page
3530        let column_close_result =
3531            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
3532        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3533        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3534
3535        // one non-null page
3536        let column_close_result =
3537            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
3538        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3539        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3540
3541        // min max both unordered
3542        let column_close_result = write_multiple_pages::<Int32Type>(
3543            &descr,
3544            &[
3545                &[Some(10), Some(11)],
3546                &[Some(11), Some(16)],
3547                &[None],
3548                &[Some(-5), Some(0)],
3549            ],
3550        )?;
3551        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3552        assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3553
3554        // min max both ordered in different orders
3555        let column_close_result = write_multiple_pages::<Int32Type>(
3556            &descr,
3557            &[
3558                &[Some(1), Some(9)],
3559                &[Some(2), Some(8)],
3560                &[None],
3561                &[Some(3), Some(7)],
3562            ],
3563        )?;
3564        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3565        assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3566
3567        Ok(())
3568    }
3569
3570    #[test]
3571    fn test_boundary_order_logical_type() -> Result<()> {
3572        // ensure that logical types account for different sort order than underlying
3573        // physical type representation
3574        let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
3575        let fba_descr = {
3576            let tpe = SchemaType::primitive_type_builder(
3577                "col",
3578                FixedLenByteArrayType::get_physical_type(),
3579            )
3580            .with_length(2)
3581            .build()?;
3582            Arc::new(ColumnDescriptor::new(
3583                Arc::new(tpe),
3584                1,
3585                0,
3586                ColumnPath::from("col"),
3587            ))
3588        };
3589
3590        let values: &[&[Option<FixedLenByteArray>]] = &[
3591            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
3592            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
3593            &[Some(FixedLenByteArray::from(ByteArray::from(
3594                f16::NEG_ZERO,
3595            )))],
3596            &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
3597        ];
3598
3599        // f16 descending
3600        let column_close_result =
3601            write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
3602        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3603        assert_eq!(boundary_order, BoundaryOrder::DESCENDING);
3604
3605        // same bytes, but fba unordered
3606        let column_close_result =
3607            write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
3608        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3609        assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3610
3611        Ok(())
3612    }
3613
3614    #[test]
3615    fn test_interval_stats_should_not_have_min_max() {
3616        let input = [
3617            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
3618            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
3619            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
3620        ]
3621        .into_iter()
3622        .map(|s| ByteArray::from(s).into())
3623        .collect::<Vec<_>>();
3624
3625        let page_writer = get_test_page_writer();
3626        let mut writer = get_test_interval_column_writer(page_writer);
3627        writer.write_batch(&input, None, None).unwrap();
3628
3629        let metadata = writer.close().unwrap().metadata;
3630        let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
3631            stats.clone()
3632        } else {
3633            panic!("metadata missing statistics");
3634        };
3635        assert!(stats.min_bytes_opt().is_none());
3636        assert!(stats.max_bytes_opt().is_none());
3637    }
3638
3639    #[test]
3640    #[cfg(feature = "arrow")]
3641    fn test_column_writer_get_estimated_total_bytes() {
3642        let page_writer = get_test_page_writer();
3643        let props = Default::default();
3644        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
3645        assert_eq!(writer.get_estimated_total_bytes(), 0);
3646
3647        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
3648        writer.add_data_page().unwrap();
3649        let size_with_one_page = writer.get_estimated_total_bytes();
3650        assert_eq!(size_with_one_page, 20);
3651
3652        writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
3653        writer.add_data_page().unwrap();
3654        let size_with_two_pages = writer.get_estimated_total_bytes();
3655        // different pages have different compressed lengths
3656        assert_eq!(size_with_two_pages, 20 + 21);
3657    }
3658
3659    fn write_multiple_pages<T: DataType>(
3660        column_descr: &Arc<ColumnDescriptor>,
3661        pages: &[&[Option<T::T>]],
3662    ) -> Result<ColumnCloseResult> {
3663        let column_writer = get_column_writer(
3664            column_descr.clone(),
3665            Default::default(),
3666            get_test_page_writer(),
3667        );
3668        let mut writer = get_typed_column_writer::<T>(column_writer);
3669
3670        for &page in pages {
3671            let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
3672            let def_levels = page
3673                .iter()
3674                .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
3675                .collect::<Vec<_>>();
3676            writer.write_batch(&values, Some(&def_levels), None)?;
3677            writer.flush_data_pages()?;
3678        }
3679
3680        writer.close()
3681    }
3682
3683    /// Performs write-read roundtrip with randomly generated values and levels.
3684    /// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
3685    /// for a column.
3686    fn column_roundtrip_random<T: DataType>(
3687        props: WriterProperties,
3688        max_size: usize,
3689        min_value: T::T,
3690        max_value: T::T,
3691        max_def_level: i16,
3692        max_rep_level: i16,
3693    ) where
3694        T::T: PartialOrd + SampleUniform + Copy,
3695    {
3696        let mut num_values: usize = 0;
3697
3698        let mut buf: Vec<i16> = Vec::new();
3699        let def_levels = if max_def_level > 0 {
3700            random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
3701            for &dl in &buf[..] {
3702                if dl == max_def_level {
3703                    num_values += 1;
3704                }
3705            }
3706            Some(&buf[..])
3707        } else {
3708            num_values = max_size;
3709            None
3710        };
3711
3712        let mut buf: Vec<i16> = Vec::new();
3713        let rep_levels = if max_rep_level > 0 {
3714            random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
3715            buf[0] = 0; // Must start on record boundary
3716            Some(&buf[..])
3717        } else {
3718            None
3719        };
3720
3721        let mut values: Vec<T::T> = Vec::new();
3722        random_numbers_range(num_values, min_value, max_value, &mut values);
3723
3724        column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
3725    }
3726
3727    /// Performs write-read roundtrip and asserts written values and levels.
3728    fn column_roundtrip<T: DataType>(
3729        props: WriterProperties,
3730        values: &[T::T],
3731        def_levels: Option<&[i16]>,
3732        rep_levels: Option<&[i16]>,
3733    ) {
3734        let mut file = tempfile::tempfile().unwrap();
3735        let mut write = TrackedWrite::new(&mut file);
3736        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
3737
3738        let max_def_level = match def_levels {
3739            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3740            None => 0i16,
3741        };
3742
3743        let max_rep_level = match rep_levels {
3744            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3745            None => 0i16,
3746        };
3747
3748        let mut max_batch_size = values.len();
3749        if let Some(levels) = def_levels {
3750            max_batch_size = max_batch_size.max(levels.len());
3751        }
3752        if let Some(levels) = rep_levels {
3753            max_batch_size = max_batch_size.max(levels.len());
3754        }
3755
3756        let mut writer =
3757            get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));
3758
3759        let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
3760        assert_eq!(values_written, values.len());
3761        let result = writer.close().unwrap();
3762
3763        drop(write);
3764
3765        let props = ReaderProperties::builder()
3766            .set_backward_compatible_lz4(false)
3767            .build();
3768        let page_reader = Box::new(
3769            SerializedPageReader::new_with_properties(
3770                Arc::new(file),
3771                &result.metadata,
3772                result.rows_written as usize,
3773                None,
3774                Arc::new(props),
3775            )
3776            .unwrap(),
3777        );
3778        let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);
3779
3780        let mut actual_values = Vec::with_capacity(max_batch_size);
3781        let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
3782        let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));
3783
3784        let (_, values_read, levels_read) = reader
3785            .read_records(
3786                max_batch_size,
3787                actual_def_levels.as_mut(),
3788                actual_rep_levels.as_mut(),
3789                &mut actual_values,
3790            )
3791            .unwrap();
3792
3793        // Assert values, definition and repetition levels.
3794
3795        assert_eq!(&actual_values[..values_read], values);
3796        match actual_def_levels {
3797            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
3798            None => assert_eq!(None, def_levels),
3799        }
3800        match actual_rep_levels {
3801            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
3802            None => assert_eq!(None, rep_levels),
3803        }
3804
3805        // Assert written rows.
3806
3807        if let Some(levels) = actual_rep_levels {
3808            let mut actual_rows_written = 0;
3809            for l in levels {
3810                if l == 0 {
3811                    actual_rows_written += 1;
3812                }
3813            }
3814            assert_eq!(actual_rows_written, result.rows_written);
3815        } else if actual_def_levels.is_some() {
3816            assert_eq!(levels_read as u64, result.rows_written);
3817        } else {
3818            assert_eq!(values_read as u64, result.rows_written);
3819        }
3820    }
3821
3822    /// Performs write of provided values and returns column metadata of those values.
3823    /// Used to test encoding support for column writer.
3824    fn column_write_and_get_metadata<T: DataType>(
3825        props: WriterProperties,
3826        values: &[T::T],
3827    ) -> ColumnChunkMetaData {
3828        let page_writer = get_test_page_writer();
3829        let props = Arc::new(props);
3830        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
3831        writer.write_batch(values, None, None).unwrap();
3832        writer.close().unwrap().metadata
3833    }
3834
3835    // Function to use in tests for EncodingWriteSupport. This checks that dictionary
3836    // offset and encodings to make sure that column writer uses provided by trait
3837    // encodings.
3838    fn check_encoding_write_support<T: DataType>(
3839        version: WriterVersion,
3840        dict_enabled: bool,
3841        data: &[T::T],
3842        dictionary_page_offset: Option<i64>,
3843        encodings: &[Encoding],
3844    ) {
3845        let props = WriterProperties::builder()
3846            .set_writer_version(version)
3847            .set_dictionary_enabled(dict_enabled)
3848            .build();
3849        let meta = column_write_and_get_metadata::<T>(props, data);
3850        assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
3851        assert_eq!(meta.encodings(), &encodings);
3852    }
3853
3854    /// Returns column writer.
3855    fn get_test_column_writer<'a, T: DataType>(
3856        page_writer: Box<dyn PageWriter + 'a>,
3857        max_def_level: i16,
3858        max_rep_level: i16,
3859        props: WriterPropertiesPtr,
3860    ) -> ColumnWriterImpl<'a, T> {
3861        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
3862        let column_writer = get_column_writer(descr, props, page_writer);
3863        get_typed_column_writer::<T>(column_writer)
3864    }
3865
3866    /// Returns column reader.
3867    fn get_test_column_reader<T: DataType>(
3868        page_reader: Box<dyn PageReader>,
3869        max_def_level: i16,
3870        max_rep_level: i16,
3871    ) -> ColumnReaderImpl<T> {
3872        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
3873        let column_reader = get_column_reader(descr, page_reader);
3874        get_typed_column_reader::<T>(column_reader)
3875    }
3876
3877    /// Returns descriptor for primitive column.
3878    fn get_test_column_descr<T: DataType>(
3879        max_def_level: i16,
3880        max_rep_level: i16,
3881    ) -> ColumnDescriptor {
3882        let path = ColumnPath::from("col");
3883        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
3884            // length is set for "encoding support" tests for FIXED_LEN_BYTE_ARRAY type,
3885            // it should be no-op for other types
3886            .with_length(1)
3887            .build()
3888            .unwrap();
3889        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
3890    }
3891
3892    /// Returns page writer that collects pages without serializing them.
3893    fn get_test_page_writer() -> Box<dyn PageWriter> {
3894        Box::new(TestPageWriter {})
3895    }
3896
3897    struct TestPageWriter {}
3898
3899    impl PageWriter for TestPageWriter {
3900        fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
3901            let mut res = PageWriteSpec::new();
3902            res.page_type = page.page_type();
3903            res.uncompressed_size = page.uncompressed_size();
3904            res.compressed_size = page.compressed_size();
3905            res.num_values = page.num_values();
3906            res.offset = 0;
3907            res.bytes_written = page.data().len() as u64;
3908            Ok(res)
3909        }
3910
3911        fn close(&mut self) -> Result<()> {
3912            Ok(())
3913        }
3914    }
3915
3916    /// Write data into parquet using [`get_test_page_writer`] and [`get_test_column_writer`] and returns generated statistics.
3917    fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
3918        let page_writer = get_test_page_writer();
3919        let props = Default::default();
3920        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
3921        writer.write_batch(values, None, None).unwrap();
3922
3923        let metadata = writer.close().unwrap().metadata;
3924        if let Some(stats) = metadata.statistics() {
3925            stats.clone()
3926        } else {
3927            panic!("metadata missing statistics");
3928        }
3929    }
3930
3931    /// Returns Decimals column writer.
3932    fn get_test_decimals_column_writer<T: DataType>(
3933        page_writer: Box<dyn PageWriter>,
3934        max_def_level: i16,
3935        max_rep_level: i16,
3936        props: WriterPropertiesPtr,
3937    ) -> ColumnWriterImpl<'static, T> {
3938        let descr = Arc::new(get_test_decimals_column_descr::<T>(
3939            max_def_level,
3940            max_rep_level,
3941        ));
3942        let column_writer = get_column_writer(descr, props, page_writer);
3943        get_typed_column_writer::<T>(column_writer)
3944    }
3945
3946    /// Returns descriptor for Decimal type with primitive column.
3947    fn get_test_decimals_column_descr<T: DataType>(
3948        max_def_level: i16,
3949        max_rep_level: i16,
3950    ) -> ColumnDescriptor {
3951        let path = ColumnPath::from("col");
3952        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
3953            .with_length(16)
3954            .with_logical_type(Some(LogicalType::Decimal {
3955                scale: 2,
3956                precision: 3,
3957            }))
3958            .with_scale(2)
3959            .with_precision(3)
3960            .build()
3961            .unwrap();
3962        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
3963    }
3964
3965    fn float16_statistics_roundtrip(
3966        values: &[FixedLenByteArray],
3967    ) -> ValueStatistics<FixedLenByteArray> {
3968        let page_writer = get_test_page_writer();
3969        let mut writer = get_test_float16_column_writer(page_writer, Default::default());
3970        writer.write_batch(values, None, None).unwrap();
3971
3972        let metadata = writer.close().unwrap().metadata;
3973        if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
3974            stats.clone()
3975        } else {
3976            panic!("metadata missing statistics");
3977        }
3978    }
3979
3980    fn get_test_float16_column_writer(
3981        page_writer: Box<dyn PageWriter>,
3982        props: WriterPropertiesPtr,
3983    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
3984        let descr = Arc::new(get_test_float16_column_descr(0, 0));
3985        let column_writer = get_column_writer(descr, props, page_writer);
3986        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
3987    }
3988
3989    fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
3990        let path = ColumnPath::from("col");
3991        let tpe =
3992            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
3993                .with_length(2)
3994                .with_logical_type(Some(LogicalType::Float16))
3995                .build()
3996                .unwrap();
3997        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
3998    }
3999
4000    fn get_test_interval_column_writer(
4001        page_writer: Box<dyn PageWriter>,
4002    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4003        let descr = Arc::new(get_test_interval_column_descr());
4004        let column_writer = get_column_writer(descr, Default::default(), page_writer);
4005        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4006    }
4007
4008    fn get_test_interval_column_descr() -> ColumnDescriptor {
4009        let path = ColumnPath::from("col");
4010        let tpe =
4011            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4012                .with_length(12)
4013                .with_converted_type(ConvertedType::INTERVAL)
4014                .build()
4015                .unwrap();
4016        ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
4017    }
4018
4019    /// Returns column writer for UINT32 Column provided as ConvertedType only
4020    fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
4021        page_writer: Box<dyn PageWriter + 'a>,
4022        max_def_level: i16,
4023        max_rep_level: i16,
4024        props: WriterPropertiesPtr,
4025    ) -> ColumnWriterImpl<'a, T> {
4026        let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
4027            max_def_level,
4028            max_rep_level,
4029        ));
4030        let column_writer = get_column_writer(descr, props, page_writer);
4031        get_typed_column_writer::<T>(column_writer)
4032    }
4033
4034    /// Returns column descriptor for UINT32 Column provided as ConvertedType only
4035    fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
4036        max_def_level: i16,
4037        max_rep_level: i16,
4038    ) -> ColumnDescriptor {
4039        let path = ColumnPath::from("col");
4040        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4041            .with_converted_type(ConvertedType::UINT_32)
4042            .build()
4043            .unwrap();
4044        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4045    }
4046}