parquet/column/writer/
mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Contains column writer API.

use bytes::Bytes;
use half::f16;

use crate::bloom_filter::Sbbf;
use crate::format::{BoundaryOrder, ColumnIndex, OffsetIndex};
use std::collections::{BTreeSet, VecDeque};
use std::str;

use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
use crate::data_type::private::ParquetValueType;
use crate::data_type::*;
use crate::encodings::levels::LevelEncoder;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ColumnIndexBuilder, LevelHistogram, OffsetIndexBuilder};
use crate::file::properties::EnabledStatistics;
use crate::file::statistics::{Statistics, ValueStatistics};
use crate::file::{
    metadata::ColumnChunkMetaData,
    properties::{WriterProperties, WriterPropertiesPtr, WriterVersion},
};
use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};

pub(crate) mod encoder;

macro_rules! downcast_writer {
    ($e:expr, $i:ident, $b:expr) => {
        match $e {
            Self::BoolColumnWriter($i) => $b,
            Self::Int32ColumnWriter($i) => $b,
            Self::Int64ColumnWriter($i) => $b,
            Self::Int96ColumnWriter($i) => $b,
            Self::FloatColumnWriter($i) => $b,
            Self::DoubleColumnWriter($i) => $b,
            Self::ByteArrayColumnWriter($i) => $b,
            Self::FixedLenByteArrayColumnWriter($i) => $b,
        }
    };
}

/// Column writer for a Parquet type.
pub enum ColumnWriter<'a> {
    /// Column writer for boolean type
    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
    /// Column writer for int32 type
    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
    /// Column writer for int64 type
    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
    /// Column writer for int96 (timestamp) type
    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
    /// Column writer for float type
    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
    /// Column writer for double type
    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
    /// Column writer for byte array type
    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
    /// Column writer for fixed length byte array type
    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
}

impl ColumnWriter<'_> {
    /// Returns the estimated total memory usage
    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        downcast_writer!(self, typed, typed.memory_size())
    }

    /// Returns the estimated total encoded bytes for this column writer
    #[cfg(feature = "arrow")]
    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
        downcast_writer!(self, typed, typed.get_estimated_total_bytes())
    }

    /// Close this [`ColumnWriter`]
    pub fn close(self) -> Result<ColumnCloseResult> {
        downcast_writer!(self, typed, typed.close())
    }
}

#[deprecated(
    since = "54.0.0",
    note = "Seems like a stray and nobody knows what it's for. Will be removed in the next release."
)]
#[allow(missing_docs)]
pub enum Level {
    Page,
    Column,
}

/// Gets a specific column writer corresponding to column descriptor `descr`.
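///
/// The sketch below is illustrative only; it assumes the column descriptor,
/// writer properties, and page writer come from the surrounding file writer
/// machinery (e.g. a `SerializedFileWriter`) rather than from this module:
///
/// ```ignore
/// // `descr`, `props` and `page_writer` are assumed to already exist.
/// let writer = get_column_writer(descr, props, page_writer);
/// if let ColumnWriter::Int32ColumnWriter(mut typed) = writer {
///     typed.write_batch(&[1, 2, 3], None, None)?;
/// }
/// ```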
pub fn get_column_writer<'a>(
    descr: ColumnDescPtr,
    props: WriterPropertiesPtr,
    page_writer: Box<dyn PageWriter + 'a>,
) -> ColumnWriter<'a> {
    match descr.physical_type() {
        Type::BOOLEAN => {
            ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::INT32 => {
            ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::INT64 => {
            ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::INT96 => {
            ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::FLOAT => {
            ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::DOUBLE => {
            ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::BYTE_ARRAY => {
            ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
            ColumnWriterImpl::new(descr, props, page_writer),
        ),
    }
}

/// Gets a typed column writer for the specific type `T`, by "up-casting" `col_writer` of
/// non-generic type to a generic column writer type `ColumnWriterImpl`.
///
/// Panics if the actual enum value for `col_writer` does not match the type `T`.
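///
/// A hedged usage sketch (assumes `col_writer` wraps an INT32 column):
///
/// ```ignore
/// let mut typed = get_typed_column_writer::<Int32Type>(col_writer);
/// typed.write_batch(&[1, 2, 3], None, None)?;
/// ```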
pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
    T::get_column_writer(col_writer).unwrap_or_else(|| {
        panic!(
            "Failed to convert column writer into a typed column writer for `{}` type",
            T::get_physical_type()
        )
    })
}

/// Similar to `get_typed_column_writer` but returns a reference.
pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
    col_writer: &'b ColumnWriter<'a>,
) -> &'b ColumnWriterImpl<'a, T> {
    T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
        panic!(
            "Failed to convert column writer into a typed column writer for `{}` type",
            T::get_physical_type()
        )
    })
}

/// Similar to `get_typed_column_writer` but returns a mutable reference.
pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
    col_writer: &'a mut ColumnWriter<'b>,
) -> &'a mut ColumnWriterImpl<'b, T> {
    T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
        panic!(
            "Failed to convert column writer into a typed column writer for `{}` type",
            T::get_physical_type()
        )
    })
}

/// Metadata returned by [`GenericColumnWriter::close`]
#[derive(Debug, Clone)]
pub struct ColumnCloseResult {
    /// The total number of bytes written
    pub bytes_written: u64,
    /// The total number of rows written
    pub rows_written: u64,
    /// Metadata for this column chunk
    pub metadata: ColumnChunkMetaData,
    /// Optional bloom filter for this column
    pub bloom_filter: Option<Sbbf>,
    /// Optional column index, for filtering
    pub column_index: Option<ColumnIndex>,
    /// Optional offset index, identifying page locations
    pub offset_index: Option<OffsetIndex>,
}

// Metrics per page
#[derive(Default)]
struct PageMetrics {
    num_buffered_values: u32,
    num_buffered_rows: u32,
    num_page_nulls: u64,
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
}

impl PageMetrics {
    fn new() -> Self {
        Default::default()
    }

    /// Initialize the repetition level histogram
    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
        self
    }

    /// Initialize the definition level histogram
    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
        self.definition_level_histogram = LevelHistogram::try_new(max_level);
        self
    }

    /// Resets the state of this `PageMetrics` to the initial state.
    /// If histograms have been initialized their contents will be reset to zero.
    fn new_page(&mut self) {
        self.num_buffered_values = 0;
        self.num_buffered_rows = 0;
        self.num_page_nulls = 0;
        self.repetition_level_histogram
            .as_mut()
            .map(LevelHistogram::reset);
        self.definition_level_histogram
            .as_mut()
            .map(LevelHistogram::reset);
    }

    /// Updates histogram values using provided repetition levels
    fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
        if let Some(ref mut rep_hist) = self.repetition_level_histogram {
            rep_hist.update_from_levels(levels);
        }
    }

    /// Updates histogram values using provided definition levels
    fn update_definition_level_histogram(&mut self, levels: &[i16]) {
        if let Some(ref mut def_hist) = self.definition_level_histogram {
            def_hist.update_from_levels(levels);
        }
    }
}

// Metrics per column writer
#[derive(Default)]
struct ColumnMetrics<T: Default> {
    total_bytes_written: u64,
    total_rows_written: u64,
    total_uncompressed_size: u64,
    total_compressed_size: u64,
    total_num_values: u64,
    dictionary_page_offset: Option<u64>,
    data_page_offset: Option<u64>,
    min_column_value: Option<T>,
    max_column_value: Option<T>,
    num_column_nulls: u64,
    column_distinct_count: Option<u64>,
    variable_length_bytes: Option<i64>,
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
}

impl<T: Default> ColumnMetrics<T> {
    fn new() -> Self {
        Default::default()
    }

    /// Initialize the repetition level histogram
    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
        self
    }

    /// Initialize the definition level histogram
    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
        self.definition_level_histogram = LevelHistogram::try_new(max_level);
        self
    }

    /// Sum `page_histogram` into `chunk_histogram`
    fn update_histogram(
        chunk_histogram: &mut Option<LevelHistogram>,
        page_histogram: &Option<LevelHistogram>,
    ) {
        if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
            chunk_hist.add(page_hist);
        }
    }

    /// Sum the provided PageMetrics histograms into the chunk histograms. Does nothing if
    /// page histograms are not initialized.
    fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
        ColumnMetrics::<T>::update_histogram(
            &mut self.definition_level_histogram,
            &page_metrics.definition_level_histogram,
        );
        ColumnMetrics::<T>::update_histogram(
            &mut self.repetition_level_histogram,
            &page_metrics.repetition_level_histogram,
        );
    }

    /// Sum the provided page variable_length_bytes into the chunk variable_length_bytes
    fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
        if let Some(var_bytes) = variable_length_bytes {
            *self.variable_length_bytes.get_or_insert(0) += var_bytes;
        }
    }
}

/// Typed column writer for a primitive column.
pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;

/// Generic column writer for a primitive column.
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
    // Column writer properties
    descr: ColumnDescPtr,
    props: WriterPropertiesPtr,
    statistics_enabled: EnabledStatistics,

    page_writer: Box<dyn PageWriter + 'a>,
    codec: Compression,
    compressor: Option<Box<dyn Codec>>,
    encoder: E,

    page_metrics: PageMetrics,
    // Metrics per column writer
    column_metrics: ColumnMetrics<E::T>,

    /// The order of encodings within the generated metadata does not impact its meaning,
    /// but we use a BTreeSet so that the output is deterministic
    encodings: BTreeSet<Encoding>,
    // Reused buffers
    def_levels_sink: Vec<i16>,
    rep_levels_sink: Vec<i16>,
    data_pages: VecDeque<CompressedPage>,
    // column index and offset index
    column_index_builder: ColumnIndexBuilder,
    offset_index_builder: Option<OffsetIndexBuilder>,

    // Below fields used to incrementally check boundary order across data pages.
    // We assume they are ascending/descending until proven wrong.
    data_page_boundary_ascending: bool,
    data_page_boundary_descending: bool,
    /// (min, max)
    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}

impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
    /// Returns a new instance of [`GenericColumnWriter`].
    pub fn new(
        descr: ColumnDescPtr,
        props: WriterPropertiesPtr,
        page_writer: Box<dyn PageWriter + 'a>,
    ) -> Self {
        let codec = props.compression(descr.path());
        let codec_options = CodecOptionsBuilder::default().build();
        let compressor = create_codec(codec, &codec_options).unwrap();
        let encoder = E::try_new(&descr, props.as_ref()).unwrap();

        let statistics_enabled = props.statistics_enabled(descr.path());

        let mut encodings = BTreeSet::new();
        // Used for level information
        encodings.insert(Encoding::RLE);

        let mut page_metrics = PageMetrics::new();
        let mut column_metrics = ColumnMetrics::<E::T>::new();

        // Initialize level histograms if collecting page or chunk statistics
        if statistics_enabled != EnabledStatistics::None {
            page_metrics = page_metrics
                .with_repetition_level_histogram(descr.max_rep_level())
                .with_definition_level_histogram(descr.max_def_level());
            column_metrics = column_metrics
                .with_repetition_level_histogram(descr.max_rep_level())
                .with_definition_level_histogram(descr.max_def_level())
        }

        // Disable column_index_builder if not collecting page statistics.
        let mut column_index_builder = ColumnIndexBuilder::new();
        if statistics_enabled != EnabledStatistics::Page {
            column_index_builder.to_invalid()
        }

        // Disable offset_index_builder if requested by user.
        let offset_index_builder = match props.offset_index_disabled() {
            false => Some(OffsetIndexBuilder::new()),
            _ => None,
        };

        Self {
            descr,
            props,
            statistics_enabled,
            page_writer,
            codec,
            compressor,
            encoder,
            def_levels_sink: vec![],
            rep_levels_sink: vec![],
            data_pages: VecDeque::new(),
            page_metrics,
            column_metrics,
            column_index_builder,
            offset_index_builder,
            encodings,
            data_page_boundary_ascending: true,
            data_page_boundary_descending: true,
            last_non_null_data_page_min_max: None,
        }
    }

    #[allow(clippy::too_many_arguments)]
    pub(crate) fn write_batch_internal(
        &mut self,
        values: &E::Values,
        value_indices: Option<&[usize]>,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        min: Option<&E::T>,
        max: Option<&E::T>,
        distinct_count: Option<u64>,
    ) -> Result<usize> {
        // Check if number of definition levels is the same as number of repetition levels.
        if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
            if def.len() != rep.len() {
                return Err(general_err!(
                    "Inconsistent length of definition and repetition levels: {} != {}",
                    def.len(),
                    rep.len()
                ));
            }
        }

        // We check for DataPage limits only after we have inserted the values. If a user
        // writes a large number of values, the DataPage size can be well above the limit.
        //
        // The purpose of this chunking is to bound this. Even if a user writes a large
        // number of values, the chunking will ensure that we add data pages at a
        // reasonable page size limit.

        // TODO: find out why we don't account for size of levels when we estimate page
        // size.

        let num_levels = match def_levels {
            Some(def_levels) => def_levels.len(),
            None => values.len(),
        };

        if let Some(min) = min {
            update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
        }
        if let Some(max) = max {
            update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
        }

        // We can only set the distinct count if there are no other writes
        if self.encoder.num_values() == 0 {
            self.column_metrics.column_distinct_count = distinct_count;
        } else {
            self.column_metrics.column_distinct_count = None;
        }

        let mut values_offset = 0;
        let mut levels_offset = 0;
        let base_batch_size = self.props.write_batch_size();
        while levels_offset < num_levels {
            let mut end_offset = num_levels.min(levels_offset + base_batch_size);

            // Split at record boundary
            if let Some(r) = rep_levels {
                while end_offset < r.len() && r[end_offset] != 0 {
                    end_offset += 1;
                }
            }

            values_offset += self.write_mini_batch(
                values,
                values_offset,
                value_indices,
                end_offset - levels_offset,
                def_levels.map(|lv| &lv[levels_offset..end_offset]),
                rep_levels.map(|lv| &lv[levels_offset..end_offset]),
            )?;
            levels_offset = end_offset;
        }

        // Return total number of values processed.
        Ok(values_offset)
    }

    /// Writes batch of values, definition levels and repetition levels.
    /// Returns number of values processed (written).
    ///
    /// If definition and repetition levels are provided, we write those levels in full and
    /// select how many values to write (this number will be returned), since the number of
    /// actually written values may be smaller than the number of provided values.
    ///
    /// If only values are provided, then all values are written and the length
    /// of the values buffer is returned.
    ///
    /// Definition and/or repetition levels can be omitted if values are
    /// non-nullable and/or non-repeated.
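    ///
    /// An illustrative sketch (assumes an optional, non-repeated INT32 column
    /// with max definition level 1, written through a typed writer):
    ///
    /// ```ignore
    /// // Definition level 1 = value present, 0 = null; only the present
    /// // values are passed in the values slice.
    /// let written = typed_writer.write_batch(&[10, 30], Some(&[1, 0, 1]), None)?;
    /// assert_eq!(written, 2); // two values written for three logical rows
    /// ```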
    pub fn write_batch(
        &mut self,
        values: &E::Values,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) -> Result<usize> {
        self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
    }

    /// Writer may optionally provide pre-calculated statistics for use when computing
    /// chunk-level statistics
    ///
    /// NB: [`WriterProperties::statistics_enabled`] must be set to [`EnabledStatistics::Chunk`]
    /// for these statistics to take effect. If [`EnabledStatistics::None`] they will be ignored,
    /// and if [`EnabledStatistics::Page`] the chunk statistics will instead be derived from the
    /// computed page statistics
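    ///
    /// A hedged sketch, assuming chunk statistics are enabled in the writer
    /// properties and the min/max/distinct values were computed by the caller:
    ///
    /// ```ignore
    /// typed_writer.write_batch_with_statistics(
    ///     &[3, 1, 2],
    ///     None,
    ///     None,
    ///     Some(&1), // pre-computed minimum
    ///     Some(&3), // pre-computed maximum
    ///     Some(3),  // pre-computed distinct count
    /// )?;
    /// ```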
    pub fn write_batch_with_statistics(
        &mut self,
        values: &E::Values,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        min: Option<&E::T>,
        max: Option<&E::T>,
        distinct_count: Option<u64>,
    ) -> Result<usize> {
        self.write_batch_internal(
            values,
            None,
            def_levels,
            rep_levels,
            min,
            max,
            distinct_count,
        )
    }

    /// Returns the estimated total memory usage.
    ///
    /// Unlike [`Self::get_estimated_total_bytes`] this is an estimate
    /// of the current memory usage and not the final anticipated encoded size.
    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
    }

    /// Returns total number of bytes written by this column writer so far.
    /// This value is also returned when column writer is closed.
    ///
    /// Note: this value does not include any buffered data that has not
    /// yet been flushed to a page.
    pub fn get_total_bytes_written(&self) -> u64 {
        self.column_metrics.total_bytes_written
    }

    /// Returns the estimated total encoded bytes for this column writer.
    ///
    /// Unlike [`Self::get_total_bytes_written`] this includes an estimate
    /// of any data that has not yet been flushed to a page, based on its
    /// anticipated encoded size.
    #[cfg(feature = "arrow")]
    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
        self.data_pages
            .iter()
            .map(|page| page.data().len() as u64)
            .sum::<u64>()
            + self.column_metrics.total_bytes_written
            + self.encoder.estimated_data_page_size() as u64
            + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
    }

    /// Returns total number of rows written by this column writer so far.
    /// This value is also returned when column writer is closed.
    pub fn get_total_rows_written(&self) -> u64 {
        self.column_metrics.total_rows_written
    }

    /// Returns a reference to a [`ColumnDescPtr`]
    pub fn get_descriptor(&self) -> &ColumnDescPtr {
        &self.descr
    }

    /// Finalizes writes and closes the column writer.
    /// Returns total bytes written, total rows written and column chunk metadata.
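    ///
    /// A minimal usage sketch (error handling elided):
    ///
    /// ```ignore
    /// let r = typed_writer.close()?;
    /// println!("wrote {} rows in {} bytes", r.rows_written, r.bytes_written);
    /// ```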
    pub fn close(mut self) -> Result<ColumnCloseResult> {
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }
        if self.encoder.has_dictionary() {
            self.write_dictionary_page()?;
        }
        self.flush_data_pages()?;
        let metadata = self.build_column_metadata()?;
        self.page_writer.close()?;

        let boundary_order = match (
            self.data_page_boundary_ascending,
            self.data_page_boundary_descending,
        ) {
            // If the pages are composed of equal elements, the order will be marked as
            // ascending (this is also the case if all pages are null pages)
            (true, _) => BoundaryOrder::ASCENDING,
            (false, true) => BoundaryOrder::DESCENDING,
            (false, false) => BoundaryOrder::UNORDERED,
        };
        self.column_index_builder.set_boundary_order(boundary_order);

        let column_index = self
            .column_index_builder
            .valid()
            .then(|| self.column_index_builder.build_to_thrift());

        let offset_index = self.offset_index_builder.map(|b| b.build_to_thrift());

        Ok(ColumnCloseResult {
            bytes_written: self.column_metrics.total_bytes_written,
            rows_written: self.column_metrics.total_rows_written,
            bloom_filter: self.encoder.flush_bloom_filter(),
            metadata,
            column_index,
            offset_index,
        })
    }

    /// Writes mini batch of values, definition and repetition levels.
    /// This allows fine-grained processing of values and maintaining a reasonable
    /// page size.
    fn write_mini_batch(
        &mut self,
        values: &E::Values,
        values_offset: usize,
        value_indices: Option<&[usize]>,
        num_levels: usize,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) -> Result<usize> {
        // Process definition levels and determine how many values to write.
        let values_to_write = if self.descr.max_def_level() > 0 {
            let levels = def_levels.ok_or_else(|| {
                general_err!(
                    "Definition levels are required, because max definition level = {}",
                    self.descr.max_def_level()
                )
            })?;

            let mut values_to_write = 0;
            for &level in levels {
                if level == self.descr.max_def_level() {
                    values_to_write += 1;
                } else {
                    // We must always compute this as it is used to populate v2 pages
                    self.page_metrics.num_page_nulls += 1
                }
            }

            // Update histogram
            self.page_metrics.update_definition_level_histogram(levels);

            self.def_levels_sink.extend_from_slice(levels);
            values_to_write
        } else {
            num_levels
        };

        // Process repetition levels and determine how many rows we are about to process.
        if self.descr.max_rep_level() > 0 {
            // A row could contain more than one value.
            let levels = rep_levels.ok_or_else(|| {
                general_err!(
                    "Repetition levels are required, because max repetition level = {}",
                    self.descr.max_rep_level()
                )
            })?;

            if !levels.is_empty() && levels[0] != 0 {
                return Err(general_err!(
                    "Write must start at a record boundary, got non-zero repetition level of {}",
                    levels[0]
                ));
            }

            // Count the occasions where we start a new row
            for &level in levels {
                self.page_metrics.num_buffered_rows += (level == 0) as u32
            }

            // Update histogram
            self.page_metrics.update_repetition_level_histogram(levels);

            self.rep_levels_sink.extend_from_slice(levels);
        } else {
            // Each value is exactly one row.
            // This equals the number of values; nulls are counted as well.
            self.page_metrics.num_buffered_rows += num_levels as u32;
        }

        match value_indices {
            Some(indices) => {
                let indices = &indices[values_offset..values_offset + values_to_write];
                self.encoder.write_gather(values, indices)?;
            }
            None => self.encoder.write(values, values_offset, values_to_write)?,
        }

        self.page_metrics.num_buffered_values += num_levels as u32;

        if self.should_add_data_page() {
            self.add_data_page()?;
        }

        if self.should_dict_fallback() {
            self.dict_fallback()?;
        }

        Ok(values_to_write)
    }

    /// Returns true if we need to fall back to non-dictionary encoding.
    ///
    /// We can only fall back if dictionary encoder is set and we have exceeded dictionary
    /// size.
    #[inline]
    fn should_dict_fallback(&self) -> bool {
        match self.encoder.estimated_dict_page_size() {
            Some(size) => size >= self.props.dictionary_page_size_limit(),
            None => false,
        }
    }

    /// Returns true if there is enough data for a data page, false otherwise.
    #[inline]
    fn should_add_data_page(&self) -> bool {
        // This is necessary in the event of a much larger dictionary size than page size
        //
        // In such a scenario the dictionary encoder may return an estimated encoded
        // size in excess of the page size limit, even when there are no buffered values
        if self.page_metrics.num_buffered_values == 0 {
            return false;
        }

        self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
            || self.encoder.estimated_data_page_size() >= self.props.data_page_size_limit()
    }

    /// Performs dictionary fallback.
    /// Prepares and writes dictionary and all data pages into page writer.
    fn dict_fallback(&mut self) -> Result<()> {
        // At this point we know that we need to fall back.
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }
        self.write_dictionary_page()?;
        self.flush_data_pages()?;
        Ok(())
    }

    /// Update the column index and offset index when adding the data page
    fn update_column_offset_index(
        &mut self,
        page_statistics: Option<&ValueStatistics<E::T>>,
        page_variable_length_bytes: Option<i64>,
    ) {
        // update the column index
        let null_page =
            (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
        // a page contains only null values,
        // and writers have to set the corresponding entries in min_values and max_values to byte[0]
        if null_page && self.column_index_builder.valid() {
            self.column_index_builder.append(
                null_page,
                vec![],
                vec![],
                self.page_metrics.num_page_nulls as i64,
            );
        } else if self.column_index_builder.valid() {
            // from page statistics
            // If the page statistics can't be obtained, ignore the column/offset index for this column chunk
            match &page_statistics {
                None => {
                    self.column_index_builder.to_invalid();
                }
                Some(stat) => {
                    // Check if min/max are still ascending/descending across pages
                    let new_min = stat.min_opt().unwrap();
                    let new_max = stat.max_opt().unwrap();
                    if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
                        if self.data_page_boundary_ascending {
                            // If last min/max are greater than new min/max then not ascending anymore
                            let not_ascending = compare_greater(&self.descr, last_min, new_min)
                                || compare_greater(&self.descr, last_max, new_max);
                            if not_ascending {
                                self.data_page_boundary_ascending = false;
                            }
                        }

                        if self.data_page_boundary_descending {
                            // If new min/max are greater than last min/max then not descending anymore
                            let not_descending = compare_greater(&self.descr, new_min, last_min)
                                || compare_greater(&self.descr, new_max, last_max);
                            if not_descending {
                                self.data_page_boundary_descending = false;
                            }
                        }
                    }
                    self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));

                    if self.can_truncate_value() {
                        self.column_index_builder.append(
                            null_page,
                            self.truncate_min_value(
                                self.props.column_index_truncate_length(),
                                stat.min_bytes_opt().unwrap(),
                            )
                            .0,
                            self.truncate_max_value(
                                self.props.column_index_truncate_length(),
                                stat.max_bytes_opt().unwrap(),
                            )
                            .0,
                            self.page_metrics.num_page_nulls as i64,
                        );
                    } else {
                        self.column_index_builder.append(
                            null_page,
                            stat.min_bytes_opt().unwrap().to_vec(),
                            stat.max_bytes_opt().unwrap().to_vec(),
                            self.page_metrics.num_page_nulls as i64,
                        );
                    }
                }
            }
        }

        // Append page histograms to the `ColumnIndex` histograms
        self.column_index_builder.append_histograms(
            &self.page_metrics.repetition_level_histogram,
            &self.page_metrics.definition_level_histogram,
        );

        // Update the offset index
        if let Some(builder) = self.offset_index_builder.as_mut() {
            builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
            builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
        }
    }

    /// Determine if we should allow truncating min/max values for this column's statistics
    fn can_truncate_value(&self) -> bool {
        match self.descr.physical_type() {
            // Don't truncate for Float16 and Decimal because their sort order is different
            // from the FIXED_LEN_BYTE_ARRAY sort order.
            // So truncation of those types could lead to inaccurate min/max statistics
            Type::FIXED_LEN_BYTE_ARRAY
                if !matches!(
                    self.descr.logical_type(),
                    Some(LogicalType::Decimal { .. }) | Some(LogicalType::Float16)
                ) =>
            {
                true
            }
            Type::BYTE_ARRAY => true,
            // Truncation only applies for fba/binary physical types
            _ => false,
        }
    }

    /// Returns `true` if this column's logical type is a UTF-8 string.
    fn is_utf8(&self) -> bool {
        self.get_descriptor().logical_type() == Some(LogicalType::String)
            || self.get_descriptor().converted_type() == ConvertedType::UTF8
    }

    /// Truncates a binary statistic to at most `truncation_length` bytes.
    ///
    /// If truncation is not possible, returns `data`.
    ///
    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
    ///
    /// UTF-8 Note:
    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
    /// also remain valid UTF-8, but may be less than `truncation_length` bytes to avoid splitting
    /// on non-character boundaries.
    fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
        truncation_length
            .filter(|l| data.len() > *l)
            .and_then(|l|
                // don't do extra work if this column isn't UTF-8
                if self.is_utf8() {
                    match str::from_utf8(data) {
                        Ok(str_data) => truncate_utf8(str_data, l),
                        Err(_) => Some(data[..l].to_vec()),
                    }
                } else {
                    Some(data[..l].to_vec())
                }
            )
            .map(|truncated| (truncated, true))
            .unwrap_or_else(|| (data.to_vec(), false))
    }

    /// Truncates a binary statistic to at most `truncation_length` bytes, and then increments the
    /// final byte(s) to yield a valid upper bound. The result may be shorter than
    /// `truncation_length` bytes if the last byte(s) overflow.
    ///
    /// If truncation is not possible, returns `data`.
    ///
    /// The `bool` in the returned tuple indicates whether truncation occurred or not.
    ///
    /// UTF-8 Note:
    /// If the column type indicates UTF-8, and `data` contains valid UTF-8, then the result will
    /// also remain valid UTF-8 (but again may be less than `truncation_length` bytes). If `data`
    /// does not contain valid UTF-8, then truncation will occur as if the column is non-string
    /// binary.
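    ///
    /// An illustrative sketch (assuming a non-UTF-8 BYTE_ARRAY column):
    ///
    /// ```ignore
    /// // The 3-byte prefix b"abc" is incremented to b"abd" so that the
    /// // truncated statistic remains a valid upper bound for b"abcdef".
    /// let (max, truncated) = writer.truncate_max_value(Some(3), b"abcdef");
    /// assert_eq!(max, b"abd");
    /// assert!(truncated);
    /// ```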
    fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
        truncation_length
            .filter(|l| data.len() > *l)
            .and_then(|l|
                // don't do extra work if this column isn't UTF-8
                if self.is_utf8() {
                    match str::from_utf8(data) {
                        Ok(str_data) => truncate_and_increment_utf8(str_data, l),
                        Err(_) => increment(data[..l].to_vec()),
                    }
                } else {
                    increment(data[..l].to_vec())
                }
            )
            .map(|truncated| (truncated, true))
            .unwrap_or_else(|| (data.to_vec(), false))
    }

    /// Adds a data page.
    /// The data page is either buffered (when dictionary encoding is in use) or written directly.
    fn add_data_page(&mut self) -> Result<()> {
        // Extract encoded values
        let values_data = self.encoder.flush_data_page()?;

        let max_def_level = self.descr.max_def_level();
        let max_rep_level = self.descr.max_rep_level();

        self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;

        let page_statistics = match (values_data.min_value, values_data.max_value) {
            (Some(min), Some(max)) => {
                // Update chunk level statistics
                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

                (self.statistics_enabled == EnabledStatistics::Page).then_some(
                    ValueStatistics::new(
                        Some(min),
                        Some(max),
                        None,
                        Some(self.page_metrics.num_page_nulls),
                        false,
                    ),
                )
            }
            _ => None,
        };

        // update column and offset index
        self.update_column_offset_index(
            page_statistics.as_ref(),
            values_data.variable_length_bytes,
        );

        // Update histograms and variable_length_bytes in column_metrics
        self.column_metrics
            .update_from_page_metrics(&self.page_metrics);
        self.column_metrics
            .update_variable_length_bytes(values_data.variable_length_bytes);

        let page_statistics = page_statistics.map(Statistics::from);

        let compressed_page = match self.props.writer_version() {
            WriterVersion::PARQUET_1_0 => {
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.rep_levels_sink[..],
                            max_rep_level,
                        )[..],
                    );
                }

                if max_def_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.def_levels_sink[..],
                            max_def_level,
                        )[..],
                    );
                }

                buffer.extend_from_slice(&values_data.buf);
                let uncompressed_size = buffer.len();

                if let Some(ref mut cmpr) = self.compressor {
                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
                    buffer = compressed_buf;
                }

                let data_page = Page::DataPage {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    def_level_encoding: Encoding::RLE,
                    rep_level_encoding: Encoding::RLE,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
            WriterVersion::PARQUET_2_0 => {
                let mut rep_levels_byte_len = 0;
                let mut def_levels_byte_len = 0;
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
                    rep_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                if max_def_level > 0 {
                    let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
                    def_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                let uncompressed_size =
                    rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();

                // Data Page v2 compresses values only.
                match self.compressor {
                    Some(ref mut cmpr) => {
                        cmpr.compress(&values_data.buf, &mut buffer)?;
                    }
                    None => buffer.extend_from_slice(&values_data.buf),
                }

                let data_page = Page::DataPageV2 {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    num_nulls: self.page_metrics.num_page_nulls as u32,
                    num_rows: self.page_metrics.num_buffered_rows,
                    def_levels_byte_len: def_levels_byte_len as u32,
                    rep_levels_byte_len: rep_levels_byte_len as u32,
                    is_compressed: self.compressor.is_some(),
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
        };

        // Check if we need to buffer data page or flush it to the sink directly.
        if self.encoder.has_dictionary() {
            self.data_pages.push_back(compressed_page);
        } else {
            self.write_data_page(compressed_page)?;
        }

        // Update total number of rows.
        self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;

        // Reset state.
        self.rep_levels_sink.clear();
        self.def_levels_sink.clear();
        self.page_metrics.new_page();

        Ok(())
    }

    /// Finalises any outstanding data pages and flushes buffered data pages from
    /// dictionary encoding into underlying sink.
    #[inline]
    fn flush_data_pages(&mut self) -> Result<()> {
        // Write all outstanding data to a new page.
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }

        while let Some(page) = self.data_pages.pop_front() {
            self.write_data_page(page)?;
        }

        Ok(())
    }

    /// Assembles column chunk metadata.
    fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
        let total_compressed_size = self.column_metrics.total_compressed_size as i64;
        let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
        let num_values = self.column_metrics.total_num_values as i64;
        let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
        // If data page offset is not set, then no pages have been written
        let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;

        let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
            .set_compression(self.codec)
            .set_encodings(self.encodings.iter().cloned().collect())
            .set_total_compressed_size(total_compressed_size)
            .set_total_uncompressed_size(total_uncompressed_size)
            .set_num_values(num_values)
            .set_data_page_offset(data_page_offset)
            .set_dictionary_page_offset(dict_page_offset);

        if self.statistics_enabled != EnabledStatistics::None {
            let backwards_compatible_min_max = self.descr.sort_order().is_signed();

            let statistics = ValueStatistics::<E::T>::new(
                self.column_metrics.min_column_value.clone(),
                self.column_metrics.max_column_value.clone(),
                self.column_metrics.column_distinct_count,
                Some(self.column_metrics.num_column_nulls),
                false,
            )
            .with_backwards_compatible_min_max(backwards_compatible_min_max)
            .into();

            let statistics = match statistics {
                Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
                    let (min, did_truncate_min) = self.truncate_min_value(
                        self.props.statistics_truncate_length(),
                        stats.min_bytes_opt().unwrap(),
                    );
                    let (max, did_truncate_max) = self.truncate_max_value(
                        self.props.statistics_truncate_length(),
                        stats.max_bytes_opt().unwrap(),
                    );
                    Statistics::ByteArray(
                        ValueStatistics::new(
                            Some(min.into()),
                            Some(max.into()),
                            stats.distinct_count(),
                            stats.null_count_opt(),
                            backwards_compatible_min_max,
                        )
                        .with_max_is_exact(!did_truncate_max)
                        .with_min_is_exact(!did_truncate_min),
                    )
                }
                Statistics::FixedLenByteArray(stats)
                    if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
                {
                    let (min, did_truncate_min) = self.truncate_min_value(
                        self.props.statistics_truncate_length(),
                        stats.min_bytes_opt().unwrap(),
                    );
                    let (max, did_truncate_max) = self.truncate_max_value(
                        self.props.statistics_truncate_length(),
                        stats.max_bytes_opt().unwrap(),
                    );
                    Statistics::FixedLenByteArray(
                        ValueStatistics::new(
                            Some(min.into()),
                            Some(max.into()),
                            stats.distinct_count(),
                            stats.null_count_opt(),
                            backwards_compatible_min_max,
                        )
                        .with_max_is_exact(!did_truncate_max)
                        .with_min_is_exact(!did_truncate_min),
                    )
                }
                stats => stats,
            };

            builder = builder
                .set_statistics(statistics)
                .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
                .set_repetition_level_histogram(
                    self.column_metrics.repetition_level_histogram.take(),
                )
                .set_definition_level_histogram(
                    self.column_metrics.definition_level_histogram.take(),
                );
        }

        let metadata = builder.build()?;
        Ok(metadata)
    }

    /// Encodes definition or repetition levels for Data Page v1.
    #[inline]
    fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> {
        let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
        encoder.put(levels);
        encoder.consume()
    }

    /// Encodes definition or repetition levels for Data Page v2.
    /// Encoding is always RLE.
    #[inline]
    fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
        let mut encoder = LevelEncoder::v2(max_level, levels.len());
        encoder.put(levels);
        encoder.consume()
    }

    /// Writes compressed data page into underlying sink and updates global metrics.
    #[inline]
    fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
        self.encodings.insert(page.encoding());
        let page_spec = self.page_writer.write_page(page)?;
        // update offset index
        // compressed_size = header_size + compressed_data_size
        if let Some(builder) = self.offset_index_builder.as_mut() {
            builder
                .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
        }
        self.update_metrics_for_page(page_spec);
        Ok(())
    }

    /// Writes dictionary page into underlying sink.
    #[inline]
    fn write_dictionary_page(&mut self) -> Result<()> {
        let compressed_page = {
            let mut page = self
                .encoder
                .flush_dict_page()?
                .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;

            let uncompressed_size = page.buf.len();

            if let Some(ref mut cmpr) = self.compressor {
                let mut output_buf = Vec::with_capacity(uncompressed_size);
                cmpr.compress(&page.buf, &mut output_buf)?;
                page.buf = Bytes::from(output_buf);
            }

            let dict_page = Page::DictionaryPage {
                buf: page.buf,
                num_values: page.num_values as u32,
                encoding: self.props.dictionary_page_encoding(),
                is_sorted: page.is_sorted,
            };
            CompressedPage::new(dict_page, uncompressed_size)
        };

        self.encodings.insert(compressed_page.encoding());
        let page_spec = self.page_writer.write_page(compressed_page)?;
        self.update_metrics_for_page(page_spec);
        // For the dictionary page, we don't need to update the column/offset index.
        Ok(())
    }

    /// Updates column writer metrics with each page metadata.
    #[inline]
    fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
        self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
        self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
        self.column_metrics.total_bytes_written += page_spec.bytes_written;

        match page_spec.page_type {
            PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
                self.column_metrics.total_num_values += page_spec.num_values as u64;
                if self.column_metrics.data_page_offset.is_none() {
                    self.column_metrics.data_page_offset = Some(page_spec.offset);
                }
            }
            PageType::DICTIONARY_PAGE => {
                assert!(
                    self.column_metrics.dictionary_page_offset.is_none(),
                    "Dictionary offset is already set"
                );
                self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
            }
            _ => {}
        }
    }
}

fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
    update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
}

fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
    update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
}

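/// Returns `true` if `val` is NaN, either as a native FLOAT/DOUBLE or as a
/// Float16 stored in a FIXED_LEN_BYTE_ARRAY. NaN values are skipped when
/// updating min/max statistics. Note that `val != val` is the standard
/// IEEE 754 NaN test, hence the `clippy::eq_op` allow below.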
#[inline]
#[allow(clippy::eq_op)]
fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
    match T::PHYSICAL_TYPE {
        Type::FLOAT | Type::DOUBLE => val != val,
        Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type() == Some(LogicalType::Float16) => {
            let val = val.as_bytes();
            let val = f16::from_le_bytes([val[0], val[1]]);
            val.is_nan()
        }
        _ => false,
    }
}

/// Perform a conditional update of `cur`, skipping any NaN values
///
/// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with
/// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true`
fn update_stat<T: ParquetValueType, F>(
    descr: &ColumnDescriptor,
    val: &T,
    cur: &mut Option<T>,
    should_update: F,
) where
    F: Fn(&T) -> bool,
{
    if is_nan(descr, val) {
        return;
    }

    if cur.as_ref().map_or(true, should_update) {
        *cur = Some(val.clone());
    }
}

/// Evaluate `a > b` according to underlying logical type.
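///
/// For example, for a column declared as an unsigned integer logical type, the
/// comparison is performed on the unsigned reinterpretation of the stored
/// bits, so a raw `-1i32` compares greater than `1`.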
1341fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
1342    if let Some(LogicalType::Integer { is_signed, .. }) = descr.logical_type() {
1343        if !is_signed {
1344            // need to compare unsigned
1345            return a.as_u64().unwrap() > b.as_u64().unwrap();
1346        }
1347    }
1348
1349    match descr.converted_type() {
1350        ConvertedType::UINT_8
1351        | ConvertedType::UINT_16
1352        | ConvertedType::UINT_32
1353        | ConvertedType::UINT_64 => {
1354            return a.as_u64().unwrap() > b.as_u64().unwrap();
1355        }
1356        _ => {}
1357    };
1358
1359    if let Some(LogicalType::Decimal { .. }) = descr.logical_type() {
1360        match T::PHYSICAL_TYPE {
1361            Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1362                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1363            }
1364            _ => {}
1365        };
1366    }
1367
1368    if descr.converted_type() == ConvertedType::DECIMAL {
1369        match T::PHYSICAL_TYPE {
1370            Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1371                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1372            }
1373            _ => {}
1374        };
1375    };
1376
1377    if let Some(LogicalType::Float16) = descr.logical_type() {
1378        let a = a.as_bytes();
1379        let a = f16::from_le_bytes([a[0], a[1]]);
1380        let b = b.as_bytes();
1381        let b = f16::from_le_bytes([b[0], b[1]]);
1382        return a > b;
1383    }
1384
1385    a > b
1386}
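
// For example (illustrative): on an INT32 column declared as
// `LogicalType::Integer { is_signed: false, .. }`, the value -1i32 is
// reinterpreted through `as_u64` as 4294967295, so
// `compare_greater(&descr, &-1i32, &1i32)` is true, matching Parquet's
// unsigned sort order rather than Rust's signed `>`.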
1387
1388// ----------------------------------------------------------------------
1389// Encoding support for column writer.
1390// This mirrors parquet-mr default encodings for writes. See:
1391// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java
1392// https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java
1393
1394/// Returns encoding for a column when no other encoding is provided in writer properties.
1395fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1396    match (kind, props.writer_version()) {
1397        (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1398        (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1399        (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1400        (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1401        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1402        _ => Encoding::PLAIN,
1403    }
1404}
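
// For instance, a `WriterVersion::PARQUET_2_0` writer with no per-column
// encoding override falls back to DELTA_BINARY_PACKED for INT32 data, while a
// PARQUET_1_0 writer falls back to PLAIN (see the encoding-support tests
// below).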
1405
/// Returns `true` if dictionary encoding is supported for the column writer, `false` otherwise.
1407fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1408    match (kind, props.writer_version()) {
1409        // Booleans do not support dict encoding and should use a fallback encoding.
1410        (Type::BOOLEAN, _) => false,
        // Dictionary encoding for FIXED_LEN_BYTE_ARRAY was only added in PARQUET 2.0
1412        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1413        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1414        _ => true,
1415    }
1416}
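
// For instance, enabling dictionary encoding on a BOOLEAN column is silently
// ineffective: `has_dictionary_support` returns false and the writer uses the
// fallback encoding instead (see
// `test_column_writer_boolean_type_does_not_support_dictionary` below).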
1417
/// Signed comparison of byte arrays
1419fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
1420    let a_length = a.len();
1421    let b_length = b.len();
1422
1423    if a_length == 0 || b_length == 0 {
1424        return a_length > 0;
1425    }
1426
1427    let first_a: u8 = a[0];
1428    let first_b: u8 = b[0];
1429
    // We can short circuit for different signed numbers or
    // for equal length byte arrays that have different first bytes.
    // The equality requirement is necessary for sign extension cases.
    // 0xFF90 should be equal to 0x90 (due to big endian sign extension).
1434    if (0x80 & first_a) != (0x80 & first_b) || (a_length == b_length && first_a != first_b) {
1435        return (first_a as i8) > (first_b as i8);
1436    }
1437
    // When the lengths are unequal and the numbers are of the same sign, we
    // compare as if the shorter value were sign-extended: its leading extension
    // bytes are checked below, after which an unsigned lexicographic comparison
    // of the remaining, equally sized suffixes is sufficient.
1442
1443    let extension: u8 = if (first_a as i8) < 0 { 0xFF } else { 0 };
1444
1445    if a_length != b_length {
1446        let not_equal = if a_length > b_length {
1447            let lead_length = a_length - b_length;
1448            a[0..lead_length].iter().any(|&x| x != extension)
1449        } else {
1450            let lead_length = b_length - a_length;
1451            b[0..lead_length].iter().any(|&x| x != extension)
1452        };
1453
1454        if not_equal {
1455            let negative_values: bool = (first_a as i8) < 0;
1456            let a_longer: bool = a_length > b_length;
1457            return if negative_values { !a_longer } else { a_longer };
1458        }
1459    }

    // All leading bytes examined above matched the sign-extension byte and the
    // signs are equal, so unsigned lexicographic comparison of the suffixes,
    // aligned by significance, decides the order.
    a[a_length.saturating_sub(b_length)..] > b[b_length.saturating_sub(a_length)..]
1462}
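
// Worked example (illustrative): a = [0xFF, 0x90] (-112 sign-extended to two
// bytes) vs b = [0x90] (-112 as a single byte). Signs match, so the first
// short circuit does not fire; the one lead byte 0xFF equals the 0xFF
// extension byte, so the aligned suffix comparison [0x90] > [0x90] decides,
// correctly reporting the two encodings as equal rather than greater.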
1463
/// Truncate a UTF-8 slice to the longest prefix that is still a valid UTF-8 string,
/// while being no more than `length` bytes and non-empty. Returns `None` if truncation
/// is not possible within those constraints.
///
/// The caller guarantees that `data.len() > length`.
1469fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1470    let split = (1..=length).rfind(|x| data.is_char_boundary(*x))?;
1471    Some(data.as_bytes()[..split].to_vec())
1472}
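
// For example (illustrative): "ab€" is 5 bytes (the euro sign is 3 bytes), so
// `truncate_utf8("ab€", 4)` cannot split inside the euro sign and returns
// `Some(b"ab".to_vec())`, the longest valid prefix of at most 4 bytes.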
1473
/// Truncate a UTF-8 slice and increment its final character. The returned value is the
/// longest such slice that is still a valid UTF-8 string while being no more than `length`
/// bytes and non-empty. Returns `None` if no such transformation is possible.
///
/// The caller guarantees that `data.len() > length`.
1479fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1480    // UTF-8 is max 4 bytes, so start search 3 back from desired length
1481    let lower_bound = length.saturating_sub(3);
1482    let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
1483    increment_utf8(data.get(..split)?)
1484}
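
// For example (illustrative): `truncate_and_increment_utf8("abcdef", 3)`
// truncates to "abc" and increments the final character, returning
// `Some(b"abd".to_vec())`, an upper bound lexicographically greater than every
// string that starts with "abc".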
1485
1486/// Increment the final character in a UTF-8 string in such a way that the returned result
1487/// is still a valid UTF-8 string. The returned string may be shorter than the input if the
1488/// last character(s) cannot be incremented (due to overflow or producing invalid code points).
1489/// Returns `None` if the string cannot be incremented.
1490///
1491/// Note that this implementation will not promote an N-byte code point to (N+1) bytes.
1492fn increment_utf8(data: &str) -> Option<Vec<u8>> {
1493    for (idx, original_char) in data.char_indices().rev() {
1494        let original_len = original_char.len_utf8();
1495        if let Some(next_char) = char::from_u32(original_char as u32 + 1) {
1496            // do not allow increasing byte width of incremented char
1497            if next_char.len_utf8() == original_len {
1498                let mut result = data.as_bytes()[..idx + original_len].to_vec();
1499                next_char.encode_utf8(&mut result[idx..]);
1500                return Some(result);
1501            }
1502        }
1503    }
1504
1505    None
1506}
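
// For example (illustrative): `increment_utf8("aÿ")` bumps U+00FF to U+0100
// (both two bytes in UTF-8) and returns the bytes of "aĀ", whereas a final
// character whose successor would widen (e.g. U+007F to U+0080, one byte to
// two) is skipped, the characters after it are dropped, and the preceding
// character is incremented instead.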
1507
1508/// Try and increment the bytes from right to left.
1509///
1510/// Returns `None` if all bytes are set to `u8::MAX`.
1511fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
1512    for byte in data.iter_mut().rev() {
1513        let (incremented, overflow) = byte.overflowing_add(1);
1514        *byte = incremented;
1515
1516        if !overflow {
1517            return Some(data);
1518        }
1519    }
1520
1521    None
1522}
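
// For example (illustrative): `increment(vec![0x00, 0xFF])` carries into
// `Some(vec![0x01, 0x00])`, while `increment(vec![0xFF, 0xFF])` overflows
// every byte and returns `None`.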
1523
1524#[cfg(test)]
1525mod tests {
1526    use crate::{
1527        file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1528        schema::parser::parse_message_type,
1529    };
1530    use core::str;
1531    use rand::distributions::uniform::SampleUniform;
1532    use std::{fs::File, sync::Arc};
1533
1534    use crate::column::{
1535        page::PageReader,
1536        reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl},
1537    };
1538    use crate::file::writer::TrackedWrite;
1539    use crate::file::{
1540        properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1541    };
1542    use crate::schema::types::{ColumnPath, Type as SchemaType};
1543    use crate::util::test_common::rand_gen::random_numbers_range;
1544
1545    use super::*;
1546
1547    #[test]
1548    fn test_column_writer_inconsistent_def_rep_length() {
1549        let page_writer = get_test_page_writer();
1550        let props = Default::default();
1551        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
1552        let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
1553        assert!(res.is_err());
1554        if let Err(err) = res {
1555            assert_eq!(
1556                format!("{err}"),
1557                "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
1558            );
1559        }
1560    }
1561
1562    #[test]
1563    fn test_column_writer_invalid_def_levels() {
1564        let page_writer = get_test_page_writer();
1565        let props = Default::default();
1566        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1567        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1568        assert!(res.is_err());
1569        if let Err(err) = res {
1570            assert_eq!(
1571                format!("{err}"),
1572                "Parquet error: Definition levels are required, because max definition level = 1"
1573            );
1574        }
1575    }
1576
1577    #[test]
1578    fn test_column_writer_invalid_rep_levels() {
1579        let page_writer = get_test_page_writer();
1580        let props = Default::default();
1581        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
1582        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1583        assert!(res.is_err());
1584        if let Err(err) = res {
1585            assert_eq!(
1586                format!("{err}"),
1587                "Parquet error: Repetition levels are required, because max repetition level = 1"
1588            );
1589        }
1590    }
1591
1592    #[test]
1593    fn test_column_writer_not_enough_values_to_write() {
1594        let page_writer = get_test_page_writer();
1595        let props = Default::default();
1596        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1597        let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
1598        assert!(res.is_err());
1599        if let Err(err) = res {
1600            assert_eq!(
1601                format!("{err}"),
1602                "Parquet error: Expected to write 4 values, but have only 2"
1603            );
1604        }
1605    }
1606
1607    #[test]
1608    fn test_column_writer_write_only_one_dictionary_page() {
1609        let page_writer = get_test_page_writer();
1610        let props = Default::default();
1611        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1612        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1613        // First page should be correctly written.
1614        writer.add_data_page().unwrap();
1615        writer.write_dictionary_page().unwrap();
1616        let err = writer.write_dictionary_page().unwrap_err().to_string();
1617        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1618    }
1619
1620    #[test]
1621    fn test_column_writer_error_when_writing_disabled_dictionary() {
1622        let page_writer = get_test_page_writer();
1623        let props = Arc::new(
1624            WriterProperties::builder()
1625                .set_dictionary_enabled(false)
1626                .build(),
1627        );
1628        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1629        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1630        let err = writer.write_dictionary_page().unwrap_err().to_string();
1631        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1632    }
1633
1634    #[test]
1635    fn test_column_writer_boolean_type_does_not_support_dictionary() {
1636        let page_writer = get_test_page_writer();
1637        let props = Arc::new(
1638            WriterProperties::builder()
1639                .set_dictionary_enabled(true)
1640                .build(),
1641        );
1642        let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
1643        writer
1644            .write_batch(&[true, false, true, false], None, None)
1645            .unwrap();
1646
1647        let r = writer.close().unwrap();
1648        // PlainEncoder uses bit writer to write boolean values, which all fit into 1
1649        // byte.
1650        assert_eq!(r.bytes_written, 1);
1651        assert_eq!(r.rows_written, 4);
1652
1653        let metadata = r.metadata;
1654        assert_eq!(metadata.encodings(), &vec![Encoding::PLAIN, Encoding::RLE]);
1655        assert_eq!(metadata.num_values(), 4); // just values
1656        assert_eq!(metadata.dictionary_page_offset(), None);
1657    }
1658
1659    #[test]
1660    fn test_column_writer_default_encoding_support_bool() {
1661        check_encoding_write_support::<BoolType>(
1662            WriterVersion::PARQUET_1_0,
1663            true,
1664            &[true, false],
1665            None,
1666            &[Encoding::PLAIN, Encoding::RLE],
1667        );
1668        check_encoding_write_support::<BoolType>(
1669            WriterVersion::PARQUET_1_0,
1670            false,
1671            &[true, false],
1672            None,
1673            &[Encoding::PLAIN, Encoding::RLE],
1674        );
1675        check_encoding_write_support::<BoolType>(
1676            WriterVersion::PARQUET_2_0,
1677            true,
1678            &[true, false],
1679            None,
1680            &[Encoding::RLE],
1681        );
1682        check_encoding_write_support::<BoolType>(
1683            WriterVersion::PARQUET_2_0,
1684            false,
1685            &[true, false],
1686            None,
1687            &[Encoding::RLE],
1688        );
1689    }
1690
1691    #[test]
1692    fn test_column_writer_default_encoding_support_int32() {
1693        check_encoding_write_support::<Int32Type>(
1694            WriterVersion::PARQUET_1_0,
1695            true,
1696            &[1, 2],
1697            Some(0),
1698            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1699        );
1700        check_encoding_write_support::<Int32Type>(
1701            WriterVersion::PARQUET_1_0,
1702            false,
1703            &[1, 2],
1704            None,
1705            &[Encoding::PLAIN, Encoding::RLE],
1706        );
1707        check_encoding_write_support::<Int32Type>(
1708            WriterVersion::PARQUET_2_0,
1709            true,
1710            &[1, 2],
1711            Some(0),
1712            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1713        );
1714        check_encoding_write_support::<Int32Type>(
1715            WriterVersion::PARQUET_2_0,
1716            false,
1717            &[1, 2],
1718            None,
1719            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1720        );
1721    }
1722
1723    #[test]
1724    fn test_column_writer_default_encoding_support_int64() {
1725        check_encoding_write_support::<Int64Type>(
1726            WriterVersion::PARQUET_1_0,
1727            true,
1728            &[1, 2],
1729            Some(0),
1730            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1731        );
1732        check_encoding_write_support::<Int64Type>(
1733            WriterVersion::PARQUET_1_0,
1734            false,
1735            &[1, 2],
1736            None,
1737            &[Encoding::PLAIN, Encoding::RLE],
1738        );
1739        check_encoding_write_support::<Int64Type>(
1740            WriterVersion::PARQUET_2_0,
1741            true,
1742            &[1, 2],
1743            Some(0),
1744            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1745        );
1746        check_encoding_write_support::<Int64Type>(
1747            WriterVersion::PARQUET_2_0,
1748            false,
1749            &[1, 2],
1750            None,
1751            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1752        );
1753    }
1754
1755    #[test]
1756    fn test_column_writer_default_encoding_support_int96() {
1757        check_encoding_write_support::<Int96Type>(
1758            WriterVersion::PARQUET_1_0,
1759            true,
1760            &[Int96::from(vec![1, 2, 3])],
1761            Some(0),
1762            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1763        );
1764        check_encoding_write_support::<Int96Type>(
1765            WriterVersion::PARQUET_1_0,
1766            false,
1767            &[Int96::from(vec![1, 2, 3])],
1768            None,
1769            &[Encoding::PLAIN, Encoding::RLE],
1770        );
1771        check_encoding_write_support::<Int96Type>(
1772            WriterVersion::PARQUET_2_0,
1773            true,
1774            &[Int96::from(vec![1, 2, 3])],
1775            Some(0),
1776            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1777        );
1778        check_encoding_write_support::<Int96Type>(
1779            WriterVersion::PARQUET_2_0,
1780            false,
1781            &[Int96::from(vec![1, 2, 3])],
1782            None,
1783            &[Encoding::PLAIN, Encoding::RLE],
1784        );
1785    }
1786
1787    #[test]
1788    fn test_column_writer_default_encoding_support_float() {
1789        check_encoding_write_support::<FloatType>(
1790            WriterVersion::PARQUET_1_0,
1791            true,
1792            &[1.0, 2.0],
1793            Some(0),
1794            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1795        );
1796        check_encoding_write_support::<FloatType>(
1797            WriterVersion::PARQUET_1_0,
1798            false,
1799            &[1.0, 2.0],
1800            None,
1801            &[Encoding::PLAIN, Encoding::RLE],
1802        );
1803        check_encoding_write_support::<FloatType>(
1804            WriterVersion::PARQUET_2_0,
1805            true,
1806            &[1.0, 2.0],
1807            Some(0),
1808            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1809        );
1810        check_encoding_write_support::<FloatType>(
1811            WriterVersion::PARQUET_2_0,
1812            false,
1813            &[1.0, 2.0],
1814            None,
1815            &[Encoding::PLAIN, Encoding::RLE],
1816        );
1817    }
1818
1819    #[test]
1820    fn test_column_writer_default_encoding_support_double() {
1821        check_encoding_write_support::<DoubleType>(
1822            WriterVersion::PARQUET_1_0,
1823            true,
1824            &[1.0, 2.0],
1825            Some(0),
1826            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1827        );
1828        check_encoding_write_support::<DoubleType>(
1829            WriterVersion::PARQUET_1_0,
1830            false,
1831            &[1.0, 2.0],
1832            None,
1833            &[Encoding::PLAIN, Encoding::RLE],
1834        );
1835        check_encoding_write_support::<DoubleType>(
1836            WriterVersion::PARQUET_2_0,
1837            true,
1838            &[1.0, 2.0],
1839            Some(0),
1840            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1841        );
1842        check_encoding_write_support::<DoubleType>(
1843            WriterVersion::PARQUET_2_0,
1844            false,
1845            &[1.0, 2.0],
1846            None,
1847            &[Encoding::PLAIN, Encoding::RLE],
1848        );
1849    }
1850
1851    #[test]
1852    fn test_column_writer_default_encoding_support_byte_array() {
1853        check_encoding_write_support::<ByteArrayType>(
1854            WriterVersion::PARQUET_1_0,
1855            true,
1856            &[ByteArray::from(vec![1u8])],
1857            Some(0),
1858            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1859        );
1860        check_encoding_write_support::<ByteArrayType>(
1861            WriterVersion::PARQUET_1_0,
1862            false,
1863            &[ByteArray::from(vec![1u8])],
1864            None,
1865            &[Encoding::PLAIN, Encoding::RLE],
1866        );
1867        check_encoding_write_support::<ByteArrayType>(
1868            WriterVersion::PARQUET_2_0,
1869            true,
1870            &[ByteArray::from(vec![1u8])],
1871            Some(0),
1872            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1873        );
1874        check_encoding_write_support::<ByteArrayType>(
1875            WriterVersion::PARQUET_2_0,
1876            false,
1877            &[ByteArray::from(vec![1u8])],
1878            None,
1879            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
1880        );
1881    }
1882
1883    #[test]
1884    fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
1885        check_encoding_write_support::<FixedLenByteArrayType>(
1886            WriterVersion::PARQUET_1_0,
1887            true,
1888            &[ByteArray::from(vec![1u8]).into()],
1889            None,
1890            &[Encoding::PLAIN, Encoding::RLE],
1891        );
1892        check_encoding_write_support::<FixedLenByteArrayType>(
1893            WriterVersion::PARQUET_1_0,
1894            false,
1895            &[ByteArray::from(vec![1u8]).into()],
1896            None,
1897            &[Encoding::PLAIN, Encoding::RLE],
1898        );
1899        check_encoding_write_support::<FixedLenByteArrayType>(
1900            WriterVersion::PARQUET_2_0,
1901            true,
1902            &[ByteArray::from(vec![1u8]).into()],
1903            Some(0),
1904            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1905        );
1906        check_encoding_write_support::<FixedLenByteArrayType>(
1907            WriterVersion::PARQUET_2_0,
1908            false,
1909            &[ByteArray::from(vec![1u8]).into()],
1910            None,
1911            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
1912        );
1913    }
1914
1915    #[test]
1916    fn test_column_writer_check_metadata() {
1917        let page_writer = get_test_page_writer();
1918        let props = Default::default();
1919        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1920        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1921
1922        let r = writer.close().unwrap();
1923        assert_eq!(r.bytes_written, 20);
1924        assert_eq!(r.rows_written, 4);
1925
1926        let metadata = r.metadata;
1927        assert_eq!(
1928            metadata.encodings(),
1929            &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
1930        );
1931        assert_eq!(metadata.num_values(), 4);
1932        assert_eq!(metadata.compressed_size(), 20);
1933        assert_eq!(metadata.uncompressed_size(), 20);
1934        assert_eq!(metadata.data_page_offset(), 0);
1935        assert_eq!(metadata.dictionary_page_offset(), Some(0));
1936        if let Some(stats) = metadata.statistics() {
1937            assert_eq!(stats.null_count_opt(), Some(0));
1938            assert_eq!(stats.distinct_count_opt(), None);
1939            if let Statistics::Int32(stats) = stats {
1940                assert_eq!(stats.min_opt().unwrap(), &1);
1941                assert_eq!(stats.max_opt().unwrap(), &4);
1942            } else {
1943                panic!("expecting Statistics::Int32");
1944            }
1945        } else {
1946            panic!("metadata missing statistics");
1947        }
1948    }
1949
1950    #[test]
1951    fn test_column_writer_check_byte_array_min_max() {
1952        let page_writer = get_test_page_writer();
1953        let props = Default::default();
1954        let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
1955        writer
1956            .write_batch(
1957                &[
1958                    ByteArray::from(vec![
1959                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
1960                        35u8, 231u8, 90u8, 0u8, 0u8,
1961                    ]),
1962                    ByteArray::from(vec![
1963                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
1964                        152u8, 177u8, 56u8, 0u8, 0u8,
1965                    ]),
1966                    ByteArray::from(vec![
1967                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
1968                        0u8,
1969                    ]),
1970                    ByteArray::from(vec![
1971                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
1972                        44u8, 0u8, 0u8,
1973                    ]),
1974                ],
1975                None,
1976                None,
1977            )
1978            .unwrap();
1979        let metadata = writer.close().unwrap().metadata;
1980        if let Some(stats) = metadata.statistics() {
1981            if let Statistics::ByteArray(stats) = stats {
1982                assert_eq!(
1983                    stats.min_opt().unwrap(),
1984                    &ByteArray::from(vec![
1985                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
1986                        35u8, 231u8, 90u8, 0u8, 0u8,
1987                    ])
1988                );
1989                assert_eq!(
1990                    stats.max_opt().unwrap(),
1991                    &ByteArray::from(vec![
1992                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
1993                        44u8, 0u8, 0u8,
1994                    ])
1995                );
1996            } else {
1997                panic!("expecting Statistics::ByteArray");
1998            }
1999        } else {
2000            panic!("metadata missing statistics");
2001        }
2002    }
2003
2004    #[test]
2005    fn test_column_writer_uint32_converted_type_min_max() {
2006        let page_writer = get_test_page_writer();
2007        let props = Default::default();
2008        let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
2009            page_writer,
2010            0,
2011            0,
2012            props,
2013        );
2014        writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
2015        let metadata = writer.close().unwrap().metadata;
2016        if let Some(stats) = metadata.statistics() {
2017            if let Statistics::Int32(stats) = stats {
                assert_eq!(stats.min_opt().unwrap(), &0);
                assert_eq!(stats.max_opt().unwrap(), &5);
2020            } else {
2021                panic!("expecting Statistics::Int32");
2022            }
2023        } else {
2024            panic!("metadata missing statistics");
2025        }
2026    }
2027
2028    #[test]
2029    fn test_column_writer_precalculated_statistics() {
2030        let page_writer = get_test_page_writer();
2031        let props = Arc::new(
2032            WriterProperties::builder()
2033                .set_statistics_enabled(EnabledStatistics::Chunk)
2034                .build(),
2035        );
2036        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2037        writer
2038            .write_batch_with_statistics(
2039                &[1, 2, 3, 4],
2040                None,
2041                None,
2042                Some(&-17),
2043                Some(&9000),
2044                Some(55),
2045            )
2046            .unwrap();
2047
2048        let r = writer.close().unwrap();
2049        assert_eq!(r.bytes_written, 20);
2050        assert_eq!(r.rows_written, 4);
2051
2052        let metadata = r.metadata;
2053        assert_eq!(
2054            metadata.encodings(),
2055            &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2056        );
2057        assert_eq!(metadata.num_values(), 4);
2058        assert_eq!(metadata.compressed_size(), 20);
2059        assert_eq!(metadata.uncompressed_size(), 20);
2060        assert_eq!(metadata.data_page_offset(), 0);
2061        assert_eq!(metadata.dictionary_page_offset(), Some(0));
2062        if let Some(stats) = metadata.statistics() {
2063            assert_eq!(stats.null_count_opt(), Some(0));
2064            assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
2065            if let Statistics::Int32(stats) = stats {
2066                assert_eq!(stats.min_opt().unwrap(), &-17);
2067                assert_eq!(stats.max_opt().unwrap(), &9000);
2068            } else {
2069                panic!("expecting Statistics::Int32");
2070            }
2071        } else {
2072            panic!("metadata missing statistics");
2073        }
2074    }
2075
2076    #[test]
2077    fn test_mixed_precomputed_statistics() {
2078        let mut buf = Vec::with_capacity(100);
2079        let mut write = TrackedWrite::new(&mut buf);
2080        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2081        let props = Default::default();
2082        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2083
2084        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2085        writer
2086            .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
2087            .unwrap();
2088
2089        let r = writer.close().unwrap();
2090
2091        let stats = r.metadata.statistics().unwrap();
2092        assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
2093        assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
2094        assert_eq!(stats.null_count_opt(), Some(0));
2095        assert!(stats.distinct_count_opt().is_none());
2096
2097        drop(write);
2098
2099        let props = ReaderProperties::builder()
2100            .set_backward_compatible_lz4(false)
2101            .build();
2102        let reader = SerializedPageReader::new_with_properties(
2103            Arc::new(Bytes::from(buf)),
2104            &r.metadata,
2105            r.rows_written as usize,
2106            None,
2107            Arc::new(props),
2108        )
2109        .unwrap();
2110
2111        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2112        assert_eq!(pages.len(), 2);
2113
2114        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2115        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);
2116
2117        let page_statistics = pages[1].statistics().unwrap();
2118        assert_eq!(
2119            page_statistics.min_bytes_opt().unwrap(),
2120            1_i32.to_le_bytes()
2121        );
2122        assert_eq!(
2123            page_statistics.max_bytes_opt().unwrap(),
2124            7_i32.to_le_bytes()
2125        );
2126        assert_eq!(page_statistics.null_count_opt(), Some(0));
2127        assert!(page_statistics.distinct_count_opt().is_none());
2128    }
2129
2130    #[test]
2131    fn test_disabled_statistics() {
2132        let mut buf = Vec::with_capacity(100);
2133        let mut write = TrackedWrite::new(&mut buf);
2134        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2135        let props = WriterProperties::builder()
2136            .set_statistics_enabled(EnabledStatistics::None)
2137            .set_writer_version(WriterVersion::PARQUET_2_0)
2138            .build();
2139        let props = Arc::new(props);
2140
2141        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2142        writer
2143            .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
2144            .unwrap();
2145
2146        let r = writer.close().unwrap();
2147        assert!(r.metadata.statistics().is_none());
2148
2149        drop(write);
2150
2151        let props = ReaderProperties::builder()
2152            .set_backward_compatible_lz4(false)
2153            .build();
2154        let reader = SerializedPageReader::new_with_properties(
2155            Arc::new(Bytes::from(buf)),
2156            &r.metadata,
2157            r.rows_written as usize,
2158            None,
2159            Arc::new(props),
2160        )
2161        .unwrap();
2162
2163        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2164        assert_eq!(pages.len(), 2);
2165
2166        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2167        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);
2168
2169        match &pages[1] {
2170            Page::DataPageV2 {
2171                num_values,
2172                num_nulls,
2173                num_rows,
2174                statistics,
2175                ..
2176            } => {
2177                assert_eq!(*num_values, 6);
2178                assert_eq!(*num_nulls, 2);
2179                assert_eq!(*num_rows, 6);
2180                assert!(statistics.is_none());
2181            }
2182            _ => unreachable!(),
2183        }
2184    }
2185
2186    #[test]
2187    fn test_column_writer_empty_column_roundtrip() {
2188        let props = Default::default();
2189        column_roundtrip::<Int32Type>(props, &[], None, None);
2190    }
2191
2192    #[test]
2193    fn test_column_writer_non_nullable_values_roundtrip() {
2194        let props = Default::default();
2195        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2196    }
2197
2198    #[test]
2199    fn test_column_writer_nullable_non_repeated_values_roundtrip() {
2200        let props = Default::default();
2201        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
2202    }
2203
2204    #[test]
2205    fn test_column_writer_nullable_repeated_values_roundtrip() {
2206        let props = Default::default();
2207        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2208    }
2209
2210    #[test]
2211    fn test_column_writer_dictionary_fallback_small_data_page() {
2212        let props = WriterProperties::builder()
2213            .set_dictionary_page_size_limit(32)
2214            .set_data_page_size_limit(32)
2215            .build();
2216        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2217    }
2218
2219    #[test]
2220    fn test_column_writer_small_write_batch_size() {
2221        for i in &[1usize, 2, 5, 10, 11, 1023] {
2222            let props = WriterProperties::builder().set_write_batch_size(*i).build();
2223
2224            column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2225        }
2226    }
2227
2228    #[test]
2229    fn test_column_writer_dictionary_disabled_v1() {
2230        let props = WriterProperties::builder()
2231            .set_writer_version(WriterVersion::PARQUET_1_0)
2232            .set_dictionary_enabled(false)
2233            .build();
2234        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2235    }
2236
2237    #[test]
2238    fn test_column_writer_dictionary_disabled_v2() {
2239        let props = WriterProperties::builder()
2240            .set_writer_version(WriterVersion::PARQUET_2_0)
2241            .set_dictionary_enabled(false)
2242            .build();
2243        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2244    }
2245
2246    #[test]
2247    fn test_column_writer_compression_v1() {
2248        let props = WriterProperties::builder()
2249            .set_writer_version(WriterVersion::PARQUET_1_0)
2250            .set_compression(Compression::SNAPPY)
2251            .build();
2252        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2253    }
2254
2255    #[test]
2256    fn test_column_writer_compression_v2() {
2257        let props = WriterProperties::builder()
2258            .set_writer_version(WriterVersion::PARQUET_2_0)
2259            .set_compression(Compression::SNAPPY)
2260            .build();
2261        column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2262    }
2263
2264    #[test]
2265    fn test_column_writer_add_data_pages_with_dict() {
2266        // ARROW-5129: Test verifies that we add data page in case of dictionary encoding
2267        // and no fallback occurred so far.
2268        let mut file = tempfile::tempfile().unwrap();
2269        let mut write = TrackedWrite::new(&mut file);
2270        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2271        let props = Arc::new(
2272            WriterProperties::builder()
2273                .set_data_page_size_limit(10)
2274                .set_write_batch_size(3) // write 3 values at a time
2275                .build(),
2276        );
2277        let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
2278        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2279        writer.write_batch(data, None, None).unwrap();
2280        let r = writer.close().unwrap();
2281
2282        drop(write);
2283
2284        // Read pages and check the sequence
2285        let props = ReaderProperties::builder()
2286            .set_backward_compatible_lz4(false)
2287            .build();
2288        let mut page_reader = Box::new(
2289            SerializedPageReader::new_with_properties(
2290                Arc::new(file),
2291                &r.metadata,
2292                r.rows_written as usize,
2293                None,
2294                Arc::new(props),
2295            )
2296            .unwrap(),
2297        );
2298        let mut res = Vec::new();
2299        while let Some(page) = page_reader.get_next_page().unwrap() {
2300            res.push((page.page_type(), page.num_values(), page.buffer().len()));
2301        }
2302        assert_eq!(
2303            res,
2304            vec![
2305                (PageType::DICTIONARY_PAGE, 10, 40),
2306                (PageType::DATA_PAGE, 9, 10),
2307                (PageType::DATA_PAGE, 1, 3),
2308            ]
2309        );
2310    }
2311
2312    #[test]
2313    fn test_bool_statistics() {
2314        let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
2315        // Booleans have an unsigned sort order and so are not compatible
2316        // with the deprecated `min` and `max` statistics
2317        assert!(!stats.is_min_max_backwards_compatible());
2318        if let Statistics::Boolean(stats) = stats {
2319            assert_eq!(stats.min_opt().unwrap(), &false);
2320            assert_eq!(stats.max_opt().unwrap(), &true);
2321        } else {
2322            panic!("expecting Statistics::Boolean, got {stats:?}");
2323        }
2324    }
2325
2326    #[test]
2327    fn test_int32_statistics() {
2328        let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
2329        assert!(stats.is_min_max_backwards_compatible());
2330        if let Statistics::Int32(stats) = stats {
2331            assert_eq!(stats.min_opt().unwrap(), &-2);
2332            assert_eq!(stats.max_opt().unwrap(), &3);
2333        } else {
2334            panic!("expecting Statistics::Int32, got {stats:?}");
2335        }
2336    }
2337
2338    #[test]
2339    fn test_int64_statistics() {
2340        let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
2341        assert!(stats.is_min_max_backwards_compatible());
2342        if let Statistics::Int64(stats) = stats {
2343            assert_eq!(stats.min_opt().unwrap(), &-2);
2344            assert_eq!(stats.max_opt().unwrap(), &3);
2345        } else {
2346            panic!("expecting Statistics::Int64, got {stats:?}");
2347        }
2348    }
2349
2350    #[test]
2351    fn test_int96_statistics() {
        let input = vec![
            Int96::from(vec![1, 20, 30]),
            Int96::from(vec![3, 20, 10]),
            Int96::from(vec![0, 20, 30]),
            Int96::from(vec![2, 20, 30]),
        ];
2360
2361        let stats = statistics_roundtrip::<Int96Type>(&input);
2362        assert!(!stats.is_min_max_backwards_compatible());
2363        if let Statistics::Int96(stats) = stats {
2364            assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![0, 20, 30]));
2365            assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
2366        } else {
2367            panic!("expecting Statistics::Int96, got {stats:?}");
2368        }
2369    }
2370
2371    #[test]
2372    fn test_float_statistics() {
2373        let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
2374        assert!(stats.is_min_max_backwards_compatible());
2375        if let Statistics::Float(stats) = stats {
2376            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2377            assert_eq!(stats.max_opt().unwrap(), &3.0);
2378        } else {
2379            panic!("expecting Statistics::Float, got {stats:?}");
2380        }
2381    }
2382
2383    #[test]
2384    fn test_double_statistics() {
2385        let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
2386        assert!(stats.is_min_max_backwards_compatible());
2387        if let Statistics::Double(stats) = stats {
2388            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2389            assert_eq!(stats.max_opt().unwrap(), &3.0);
2390        } else {
2391            panic!("expecting Statistics::Double, got {stats:?}");
2392        }
2393    }
2394
2395    #[test]
2396    fn test_byte_array_statistics() {
2397        let input = ["aawaa", "zz", "aaw", "m", "qrs"]
2398            .iter()
2399            .map(|&s| s.into())
2400            .collect::<Vec<_>>();
2401
2402        let stats = statistics_roundtrip::<ByteArrayType>(&input);
2403        assert!(!stats.is_min_max_backwards_compatible());
2404        if let Statistics::ByteArray(stats) = stats {
2405            assert_eq!(stats.min_opt().unwrap(), &ByteArray::from("aaw"));
2406            assert_eq!(stats.max_opt().unwrap(), &ByteArray::from("zz"));
2407        } else {
2408            panic!("expecting Statistics::ByteArray, got {stats:?}");
2409        }
2410    }
2411
2412    #[test]
2413    fn test_fixed_len_byte_array_statistics() {
2414        let input = ["aawaa", "zz   ", "aaw  ", "m    ", "qrs  "]
2415            .iter()
2416            .map(|&s| ByteArray::from(s).into())
2417            .collect::<Vec<_>>();
2418
2419        let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
2420        assert!(!stats.is_min_max_backwards_compatible());
2421        if let Statistics::FixedLenByteArray(stats) = stats {
2422            let expected_min: FixedLenByteArray = ByteArray::from("aaw  ").into();
2423            assert_eq!(stats.min_opt().unwrap(), &expected_min);
2424            let expected_max: FixedLenByteArray = ByteArray::from("zz   ").into();
2425            assert_eq!(stats.max_opt().unwrap(), &expected_max);
2426        } else {
2427            panic!("expecting Statistics::FixedLenByteArray, got {stats:?}");
2428        }
2429    }
2430
2431    #[test]
2432    fn test_column_writer_check_float16_min_max() {
2433        let input = [
2434            -f16::ONE,
2435            f16::from_f32(3.0),
2436            -f16::from_f32(2.0),
2437            f16::from_f32(2.0),
2438        ]
2439        .into_iter()
2440        .map(|s| ByteArray::from(s).into())
2441        .collect::<Vec<_>>();
2442
2443        let stats = float16_statistics_roundtrip(&input);
2444        assert!(stats.is_min_max_backwards_compatible());
2445        assert_eq!(
2446            stats.min_opt().unwrap(),
2447            &ByteArray::from(-f16::from_f32(2.0))
2448        );
2449        assert_eq!(
2450            stats.max_opt().unwrap(),
2451            &ByteArray::from(f16::from_f32(3.0))
2452        );
2453    }
2454
2471    #[test]
2472    fn test_float16_statistics_nan_middle() {
2473        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2474            .into_iter()
2475            .map(|s| ByteArray::from(s).into())
2476            .collect::<Vec<_>>();
2477
2478        let stats = float16_statistics_roundtrip(&input);
2479        assert!(stats.is_min_max_backwards_compatible());
2480        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2481        assert_eq!(
2482            stats.max_opt().unwrap(),
2483            &ByteArray::from(f16::ONE + f16::ONE)
2484        );
2485    }
2486
2487    #[test]
2488    fn test_float16_statistics_nan_start() {
2489        let input = [f16::NAN, f16::ONE, f16::ONE + f16::ONE]
2490            .into_iter()
2491            .map(|s| ByteArray::from(s).into())
2492            .collect::<Vec<_>>();
2493
2494        let stats = float16_statistics_roundtrip(&input);
2495        assert!(stats.is_min_max_backwards_compatible());
2496        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2497        assert_eq!(
2498            stats.max_opt().unwrap(),
2499            &ByteArray::from(f16::ONE + f16::ONE)
2500        );
2501    }
2502
2503    #[test]
2504    fn test_float16_statistics_nan_only() {
2505        let input = [f16::NAN, f16::NAN]
2506            .into_iter()
2507            .map(|s| ByteArray::from(s).into())
2508            .collect::<Vec<_>>();
2509
2510        let stats = float16_statistics_roundtrip(&input);
2511        assert!(stats.min_bytes_opt().is_none());
2512        assert!(stats.max_bytes_opt().is_none());
2513        assert!(stats.is_min_max_backwards_compatible());
2514    }
2515
2516    #[test]
2517    fn test_float16_statistics_zero_only() {
2518        let input = [f16::ZERO]
2519            .into_iter()
2520            .map(|s| ByteArray::from(s).into())
2521            .collect::<Vec<_>>();
2522
2523        let stats = float16_statistics_roundtrip(&input);
2524        assert!(stats.is_min_max_backwards_compatible());
2525        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2526        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2527    }
2528
2529    #[test]
2530    fn test_float16_statistics_neg_zero_only() {
2531        let input = [f16::NEG_ZERO]
2532            .into_iter()
2533            .map(|s| ByteArray::from(s).into())
2534            .collect::<Vec<_>>();
2535
2536        let stats = float16_statistics_roundtrip(&input);
2537        assert!(stats.is_min_max_backwards_compatible());
2538        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2539        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2540    }
2541
2542    #[test]
2543    fn test_float16_statistics_zero_min() {
2544        let input = [f16::ZERO, f16::ONE, f16::NAN, f16::PI]
2545            .into_iter()
2546            .map(|s| ByteArray::from(s).into())
2547            .collect::<Vec<_>>();
2548
2549        let stats = float16_statistics_roundtrip(&input);
2550        assert!(stats.is_min_max_backwards_compatible());
2551        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2552        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI));
2553    }
2554
2555    #[test]
2556    fn test_float16_statistics_neg_zero_max() {
2557        let input = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI]
2558            .into_iter()
2559            .map(|s| ByteArray::from(s).into())
2560            .collect::<Vec<_>>();
2561
2562        let stats = float16_statistics_roundtrip(&input);
2563        assert!(stats.is_min_max_backwards_compatible());
2564        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI));
2565        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2566    }
2567
2568    #[test]
2569    fn test_float_statistics_nan_middle() {
2570        let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
2571        assert!(stats.is_min_max_backwards_compatible());
2572        if let Statistics::Float(stats) = stats {
2573            assert_eq!(stats.min_opt().unwrap(), &1.0);
2574            assert_eq!(stats.max_opt().unwrap(), &2.0);
2575        } else {
2576            panic!("expecting Statistics::Float");
2577        }
2578    }
2579
2580    #[test]
2581    fn test_float_statistics_nan_start() {
2582        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
2583        assert!(stats.is_min_max_backwards_compatible());
2584        if let Statistics::Float(stats) = stats {
2585            assert_eq!(stats.min_opt().unwrap(), &1.0);
2586            assert_eq!(stats.max_opt().unwrap(), &2.0);
2587        } else {
2588            panic!("expecting Statistics::Float");
2589        }
2590    }
2591
2592    #[test]
2593    fn test_float_statistics_nan_only() {
2594        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]);
2595        assert!(stats.min_bytes_opt().is_none());
2596        assert!(stats.max_bytes_opt().is_none());
2597        assert!(stats.is_min_max_backwards_compatible());
2598        assert!(matches!(stats, Statistics::Float(_)));
2599    }
2600
2601    #[test]
2602    fn test_float_statistics_zero_only() {
2603        let stats = statistics_roundtrip::<FloatType>(&[0.0]);
2604        assert!(stats.is_min_max_backwards_compatible());
2605        if let Statistics::Float(stats) = stats {
2606            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2607            assert!(stats.min_opt().unwrap().is_sign_negative());
2608            assert_eq!(stats.max_opt().unwrap(), &0.0);
2609            assert!(stats.max_opt().unwrap().is_sign_positive());
2610        } else {
2611            panic!("expecting Statistics::Float");
2612        }
2613    }
2614
2615    #[test]
2616    fn test_float_statistics_neg_zero_only() {
2617        let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
2618        assert!(stats.is_min_max_backwards_compatible());
2619        if let Statistics::Float(stats) = stats {
2620            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2621            assert!(stats.min_opt().unwrap().is_sign_negative());
2622            assert_eq!(stats.max_opt().unwrap(), &0.0);
2623            assert!(stats.max_opt().unwrap().is_sign_positive());
2624        } else {
2625            panic!("expecting Statistics::Float");
2626        }
2627    }
2628
2629    #[test]
2630    fn test_float_statistics_zero_min() {
2631        let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
2632        assert!(stats.is_min_max_backwards_compatible());
2633        if let Statistics::Float(stats) = stats {
2634            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2635            assert!(stats.min_opt().unwrap().is_sign_negative());
2636            assert_eq!(stats.max_opt().unwrap(), &2.0);
2637        } else {
2638            panic!("expecting Statistics::Float");
2639        }
2640    }
2641
2642    #[test]
2643    fn test_float_statistics_neg_zero_max() {
2644        let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
2645        assert!(stats.is_min_max_backwards_compatible());
2646        if let Statistics::Float(stats) = stats {
2647            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2648            assert_eq!(stats.max_opt().unwrap(), &0.0);
2649            assert!(stats.max_opt().unwrap().is_sign_positive());
2650        } else {
2651            panic!("expecting Statistics::Float");
2652        }
2653    }
2654
2655    #[test]
2656    fn test_double_statistics_nan_middle() {
2657        let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
2658        assert!(stats.is_min_max_backwards_compatible());
2659        if let Statistics::Double(stats) = stats {
2660            assert_eq!(stats.min_opt().unwrap(), &1.0);
2661            assert_eq!(stats.max_opt().unwrap(), &2.0);
2662        } else {
2663            panic!("expecting Statistics::Double");
2664        }
2665    }
2666
2667    #[test]
2668    fn test_double_statistics_nan_start() {
2669        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
2670        assert!(stats.is_min_max_backwards_compatible());
2671        if let Statistics::Double(stats) = stats {
2672            assert_eq!(stats.min_opt().unwrap(), &1.0);
2673            assert_eq!(stats.max_opt().unwrap(), &2.0);
2674        } else {
2675            panic!("expecting Statistics::Double");
2676        }
2677    }
2678
2679    #[test]
2680    fn test_double_statistics_nan_only() {
2681        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]);
2682        assert!(stats.min_bytes_opt().is_none());
2683        assert!(stats.max_bytes_opt().is_none());
2684        assert!(matches!(stats, Statistics::Double(_)));
2685        assert!(stats.is_min_max_backwards_compatible());
2686    }
2687
2688    #[test]
2689    fn test_double_statistics_zero_only() {
2690        let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
2691        assert!(stats.is_min_max_backwards_compatible());
2692        if let Statistics::Double(stats) = stats {
2693            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2694            assert!(stats.min_opt().unwrap().is_sign_negative());
2695            assert_eq!(stats.max_opt().unwrap(), &0.0);
2696            assert!(stats.max_opt().unwrap().is_sign_positive());
2697        } else {
2698            panic!("expecting Statistics::Double");
2699        }
2700    }
2701
2702    #[test]
2703    fn test_double_statistics_neg_zero_only() {
2704        let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
2705        assert!(stats.is_min_max_backwards_compatible());
2706        if let Statistics::Double(stats) = stats {
2707            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2708            assert!(stats.min_opt().unwrap().is_sign_negative());
2709            assert_eq!(stats.max_opt().unwrap(), &0.0);
2710            assert!(stats.max_opt().unwrap().is_sign_positive());
2711        } else {
2712            panic!("expecting Statistics::Double");
2713        }
2714    }
2715
2716    #[test]
2717    fn test_double_statistics_zero_min() {
2718        let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
2719        assert!(stats.is_min_max_backwards_compatible());
2720        if let Statistics::Double(stats) = stats {
2721            assert_eq!(stats.min_opt().unwrap(), &-0.0);
2722            assert!(stats.min_opt().unwrap().is_sign_negative());
2723            assert_eq!(stats.max_opt().unwrap(), &2.0);
2724        } else {
2725            panic!("expecting Statistics::Double");
2726        }
2727    }
2728
2729    #[test]
2730    fn test_double_statistics_neg_zero_max() {
2731        let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
2732        assert!(stats.is_min_max_backwards_compatible());
2733        if let Statistics::Double(stats) = stats {
2734            assert_eq!(stats.min_opt().unwrap(), &-2.0);
2735            assert_eq!(stats.max_opt().unwrap(), &0.0);
2736            assert!(stats.max_opt().unwrap().is_sign_positive());
2737        } else {
2738            panic!("expecting Statistics::Double");
2739        }
2740    }
2741
2742    #[test]
2743    fn test_compare_greater_byte_array_decimals() {
        assert!(!compare_greater_byte_array_decimals(&[], &[]));
        assert!(compare_greater_byte_array_decimals(&[1u8], &[]));
        assert!(!compare_greater_byte_array_decimals(&[], &[1u8]));
        assert!(compare_greater_byte_array_decimals(&[1u8], &[0u8]));
        assert!(!compare_greater_byte_array_decimals(&[1u8], &[1u8]));
        assert!(compare_greater_byte_array_decimals(&[1u8, 0u8], &[0u8]));
        assert!(!compare_greater_byte_array_decimals(&[0u8, 1u8], &[1u8, 0u8]));
        assert!(!compare_greater_byte_array_decimals(&[255u8, 35u8, 0u8, 0u8], &[0u8]));
        assert!(compare_greater_byte_array_decimals(&[0u8], &[255u8, 35u8, 0u8, 0u8]));

    #[test]
    fn test_column_index_with_null_pages() {
        // write a single page of all nulls
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();

        let r = writer.close().unwrap();
        assert!(r.column_index.is_some());
        let col_idx = r.column_index.unwrap();
        // null_pages should be true for page 0
        assert!(col_idx.null_pages[0]);
        // min and max should be empty byte arrays
        assert_eq!(col_idx.min_values[0].len(), 0);
        assert_eq!(col_idx.max_values[0].len(), 0);
        // null_counts should be defined and be 4 for page 0
        assert!(col_idx.null_counts.is_some());
        assert_eq!(col_idx.null_counts.as_ref().unwrap()[0], 4);
        // there is no repetition so rep histogram should be absent
        assert!(col_idx.repetition_level_histograms.is_none());
        // definition_level_histogram should be present and should be 0:4, 1:0
        assert!(col_idx.definition_level_histograms.is_some());
        assert_eq!(col_idx.definition_level_histograms.unwrap(), &[4, 0]);
    }

    #[test]
    fn test_column_offset_index_metadata() {
        // Write data and check the offset index and column index
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        // first page
        writer.flush_data_pages().unwrap();
        // second page
        writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();

        let r = writer.close().unwrap();
        let column_index = r.column_index.unwrap();
        let offset_index = r.offset_index.unwrap();

        assert_eq!(8, r.rows_written);

        // column index
        assert_eq!(2, column_index.null_pages.len());
        assert_eq!(2, offset_index.page_locations.len());
        assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
        for idx in 0..2 {
            assert!(!column_index.null_pages[idx]);
            assert_eq!(0, column_index.null_counts.as_ref().unwrap()[idx]);
        }

        if let Some(stats) = r.metadata.statistics() {
            assert_eq!(stats.null_count_opt(), Some(0));
            assert_eq!(stats.distinct_count_opt(), None);
            if let Statistics::Int32(stats) = stats {
                // first page is [1,2,3,4]
                // second page is [-5,2,4,8]
                // note that the column index values are not truncated or incremented
                // here, as that only applies to byte array types
                assert_eq!(
                    stats.min_bytes_opt(),
                    Some(column_index.min_values[1].as_slice())
                );
                assert_eq!(
                    stats.max_bytes_opt(),
                    column_index.max_values.get(1).map(Vec::as_slice)
                );
            } else {
                panic!("expecting Statistics::Int32");
            }
        } else {
            panic!("metadata missing statistics");
        }

        // page location
        assert_eq!(0, offset_index.page_locations[0].first_row_index);
        assert_eq!(4, offset_index.page_locations[1].first_row_index);
    }

    /// Verify min/max value truncation in the column index works as expected
    #[test]
    fn test_column_offset_index_metadata_truncating() {
        // Write data and check the offset index and column index
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

        let mut data = vec![FixedLenByteArray::default(); 3];
        // This is the expected min value - "aaa..."
        data[0].set_data(Bytes::from(vec![97_u8; 200]));
        // This is the expected max value - "ppp..."
        data[1].set_data(Bytes::from(vec![112_u8; 200]));
        data[2].set_data(Bytes::from(vec![98_u8; 200]));

        writer.write_batch(&data, None, None).unwrap();

        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();
        let column_index = r.column_index.unwrap();
        let offset_index = r.offset_index.unwrap();

        assert_eq!(3, r.rows_written);

        // column index
        assert_eq!(1, column_index.null_pages.len());
        assert_eq!(1, offset_index.page_locations.len());
        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
        assert!(!column_index.null_pages[0]);
        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

        if let Some(stats) = r.metadata.statistics() {
            assert_eq!(stats.null_count_opt(), Some(0));
            assert_eq!(stats.distinct_count_opt(), None);
            if let Statistics::FixedLenByteArray(stats) = stats {
                let column_index_min_value = &column_index.min_values[0];
                let column_index_max_value = &column_index.max_values[0];

                // Column index stats are truncated, while the column chunk's aren't.
                assert_ne!(
                    stats.min_bytes_opt(),
                    Some(column_index_min_value.as_slice())
                );
                assert_ne!(
                    stats.max_bytes_opt(),
                    Some(column_index_max_value.as_slice())
                );

                assert_eq!(
                    column_index_min_value.len(),
                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
                );
                assert_eq!(column_index_min_value.as_slice(), &[97_u8; 64]);
                assert_eq!(
                    column_index_max_value.len(),
                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
                );

                // We expect the last byte to be incremented
                assert_eq!(
                    *column_index_max_value.last().unwrap(),
                    *column_index_max_value.first().unwrap() + 1
                );
            } else {
                panic!("expecting Statistics::FixedLenByteArray");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }

    #[test]
    fn test_column_offset_index_truncating_spec_example() {
        // Write data and check the offset index and column index
        let page_writer = get_test_page_writer();

        // Truncate values at 1 byte
        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
        let props = Arc::new(builder.build());
        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

        let mut data = vec![FixedLenByteArray::default(); 1];
        // This is the expected min value
        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));

        writer.write_batch(&data, None, None).unwrap();

        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();
        let column_index = r.column_index.unwrap();
        let offset_index = r.offset_index.unwrap();

        assert_eq!(1, r.rows_written);

        // column index
        assert_eq!(1, column_index.null_pages.len());
        assert_eq!(1, offset_index.page_locations.len());
        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
        assert!(!column_index.null_pages[0]);
        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

        if let Some(stats) = r.metadata.statistics() {
            assert_eq!(stats.null_count_opt(), Some(0));
            assert_eq!(stats.distinct_count_opt(), None);
            if let Statistics::FixedLenByteArray(_stats) = stats {
                let column_index_min_value = &column_index.min_values[0];
                let column_index_max_value = &column_index.max_values[0];

                assert_eq!(column_index_min_value.len(), 1);
                assert_eq!(column_index_max_value.len(), 1);

                assert_eq!("B".as_bytes(), column_index_min_value.as_slice());
                assert_eq!("C".as_bytes(), column_index_max_value.as_slice());

                assert_ne!(column_index_min_value, stats.min_bytes_opt().unwrap());
                assert_ne!(column_index_max_value, stats.max_bytes_opt().unwrap());
            } else {
                panic!("expecting Statistics::FixedLenByteArray");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }

    #[test]
    fn test_float16_min_max_no_truncation() {
        // Even if we set truncation to occur at 1 byte, we should not truncate for Float16
        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
        let props = Arc::new(builder.build());
        let page_writer = get_test_page_writer();
        let mut writer = get_test_float16_column_writer(page_writer, props);

        let expected_value = f16::PI.to_le_bytes().to_vec();
        let data = vec![ByteArray::from(expected_value.clone()).into()];
        writer.write_batch(&data, None, None).unwrap();
        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();

        // stats should still be written
        // ensure bytes weren't truncated for column index
        let column_index = r.column_index.unwrap();
        let column_index_min_bytes = column_index.min_values[0].as_slice();
        let column_index_max_bytes = column_index.max_values[0].as_slice();
        assert_eq!(expected_value, column_index_min_bytes);
        assert_eq!(expected_value, column_index_max_bytes);

        // ensure bytes weren't truncated for statistics
        let stats = r.metadata.statistics().unwrap();
        if let Statistics::FixedLenByteArray(stats) = stats {
            let stats_min_bytes = stats.min_bytes_opt().unwrap();
            let stats_max_bytes = stats.max_bytes_opt().unwrap();
            assert_eq!(expected_value, stats_min_bytes);
            assert_eq!(expected_value, stats_max_bytes);
        } else {
            panic!("expecting Statistics::FixedLenByteArray");
        }
    }

    #[test]
    fn test_decimal_min_max_no_truncation() {
        // Even if we set truncation to occur at 1 byte, we should not truncate for Decimal
        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
        let props = Arc::new(builder.build());
        let page_writer = get_test_page_writer();
        let mut writer =
            get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

        let expected_value = vec![
            255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
            231u8, 90u8, 0u8, 0u8,
        ];
        let data = vec![ByteArray::from(expected_value.clone()).into()];
        writer.write_batch(&data, None, None).unwrap();
        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();

        // stats should still be written
        // ensure bytes weren't truncated for column index
        let column_index = r.column_index.unwrap();
        let column_index_min_bytes = column_index.min_values[0].as_slice();
        let column_index_max_bytes = column_index.max_values[0].as_slice();
        assert_eq!(expected_value, column_index_min_bytes);
        assert_eq!(expected_value, column_index_max_bytes);

        // ensure bytes weren't truncated for statistics
        let stats = r.metadata.statistics().unwrap();
        if let Statistics::FixedLenByteArray(stats) = stats {
            let stats_min_bytes = stats.min_bytes_opt().unwrap();
            let stats_max_bytes = stats.max_bytes_opt().unwrap();
            assert_eq!(expected_value, stats_min_bytes);
            assert_eq!(expected_value, stats_max_bytes);
        } else {
            panic!("expecting Statistics::FixedLenByteArray");
        }
    }

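    /// Statistics truncation for BYTE_ARRAY should shorten min/max to the requested
    /// length, incrementing the last byte of the truncated max so it remains an
    /// upper bound, and mark both values as inexact.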
    #[test]
    fn test_statistics_truncating_byte_array() {
        let page_writer = get_test_page_writer();

        const TEST_TRUNCATE_LENGTH: usize = 1;

        // Truncate values at 1 byte
        let builder =
            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
        let props = Arc::new(builder.build());
        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);

        let mut data = vec![ByteArray::default(); 1];
        // This is the expected min value
        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));

        writer.write_batch(&data, None, None).unwrap();

        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();

        assert_eq!(1, r.rows_written);

        let stats = r.metadata.statistics().expect("statistics");
        assert_eq!(stats.null_count_opt(), Some(0));
        assert_eq!(stats.distinct_count_opt(), None);
        if let Statistics::ByteArray(_stats) = stats {
            let min_value = _stats.min_opt().unwrap();
            let max_value = _stats.max_opt().unwrap();

            assert!(!_stats.min_is_exact());
            assert!(!_stats.max_is_exact());

            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);

            assert_eq!("B".as_bytes(), min_value.as_bytes());
            assert_eq!("C".as_bytes(), max_value.as_bytes());
        } else {
            panic!("expecting Statistics::ByteArray");
        }
    }

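    /// Same as above, but for FIXED_LEN_BYTE_ARRAY: the truncated min stays a lower
    /// bound and the truncated max (last byte incremented) stays an upper bound for
    /// the original big-endian decimal value.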
    #[test]
    fn test_statistics_truncating_fixed_len_byte_array() {
        let page_writer = get_test_page_writer();

        const TEST_TRUNCATE_LENGTH: usize = 1;

        // Truncate values at 1 byte
        let builder =
            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
        let props = Arc::new(builder.build());
        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

        let mut data = vec![FixedLenByteArray::default(); 1];

        const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
        // parquet specifies big-endian order for decimals
        const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();

        const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]];
        const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
            [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];

        // This is the expected min value
        data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));

        writer.write_batch(&data, None, None).unwrap();

        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();

        assert_eq!(1, r.rows_written);

        let stats = r.metadata.statistics().expect("statistics");
        assert_eq!(stats.null_count_opt(), Some(0));
        assert_eq!(stats.distinct_count_opt(), None);
        if let Statistics::FixedLenByteArray(_stats) = stats {
            let min_value = _stats.min_opt().unwrap();
            let max_value = _stats.max_opt().unwrap();

            assert!(!_stats.min_is_exact());
            assert!(!_stats.max_is_exact());

            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);

            assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
            assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());

            // Reconstruct i128 values from the truncated leading byte, padding the
            // remaining 15 bytes with zeros.
            let mut min_bytes = [0u8; 16];
            min_bytes[0] = min_value.as_bytes()[0];
            let reconstructed_min = i128::from_be_bytes(min_bytes);

            let mut max_bytes = [0u8; 16];
            max_bytes[0] = max_value.as_bytes()[0];
            let reconstructed_max = i128::from_be_bytes(max_bytes);

            // check that the inner value is correctly bounded by the min/max
            println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
            assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
            println!("max: {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
            assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
        } else {
            panic!("expecting Statistics::FixedLenByteArray");
        }
    }

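    /// Compile-time check that column writers are `Send`, so they can be moved
    /// between threads.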
    #[test]
    fn test_send() {
        fn test<T: Send>() {}
        test::<ColumnWriterImpl<Int32Type>>();
    }

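    /// `increment` treats its input as a big-endian unsigned integer and adds one,
    /// carrying through `0xFF` bytes; it returns `None` once every byte is `0xFF`.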
    #[test]
    fn test_increment() {
        let v = increment(vec![0, 0, 0]).unwrap();
        assert_eq!(&v, &[0, 0, 1]);

        // Handle overflow
        let v = increment(vec![0, 255, 255]).unwrap();
        assert_eq!(&v, &[1, 0, 0]);

        // Return `None` if all bytes are u8::MAX
        let v = increment(vec![255, 255, 255]);
        assert!(v.is_none());
    }

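    /// `increment_utf8` produces the next valid UTF-8 string: it increments the
    /// last code point that has headroom within its encoded byte length, dropping
    /// any trailing code points already at their maximum, and returns `None` when
    /// no code point can be incremented without growing the encoding.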
    #[test]
    fn test_increment_utf8() {
        let test_inc = |o: &str, expected: &str| {
            if let Ok(v) = String::from_utf8(increment_utf8(o).unwrap()) {
                // Got the expected result...
                assert_eq!(v, expected);
                // and it's greater than the original string
                assert!(*v > *o);
                // Also show that BinaryArray level comparison works here
                let mut greater = ByteArray::new();
                greater.set_data(Bytes::from(v));
                let mut original = ByteArray::new();
                original.set_data(Bytes::from(o.as_bytes().to_vec()));
                assert!(greater > original);
            } else {
                panic!("Expected incremented UTF8 string to also be valid.");
            }
        };

        // Basic ASCII case
        test_inc("hello", "hellp");

        // 1-byte ending in max 1-byte
        test_inc("a\u{7f}", "b");

        // 1-byte max should not truncate as it would need 2-byte code points
        assert!(increment_utf8("\u{7f}\u{7f}").is_none());

        // UTF8 string
        test_inc("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝");

        // 2-byte without overflow
        test_inc("éééé", "éééê");

        // 2-byte that overflows lowest byte
        test_inc("\u{ff}\u{ff}", "\u{ff}\u{100}");

        // 2-byte ending in max 2-byte
        test_inc("a\u{7ff}", "b");

        // Max 2-byte should not truncate as it would need 3-byte code points
        assert!(increment_utf8("\u{7ff}\u{7ff}").is_none());

        // 3-byte without overflow [U+800, U+800] -> [U+800, U+801] (note that these
        // characters should render right to left).
        test_inc("ࠀࠀ", "ࠀࠁ");

        // 3-byte ending in max 3-byte
        test_inc("a\u{ffff}", "b");

        // Max 3-byte should not truncate as it would need 4-byte code points
        assert!(increment_utf8("\u{ffff}\u{ffff}").is_none());

        // 4-byte without overflow
        test_inc("𐀀𐀀", "𐀀𐀁");

        // 4-byte ending in max unicode
        test_inc("a\u{10ffff}", "b");

        // Max 4-byte should not truncate
        assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none());

        // Skip over the surrogate pair range (0xD800..=0xDFFF): rather than mapping
        // U+D7FF to U+E000, the last code point is dropped and the previous one is
        // incremented.
        test_inc("a\u{D7FF}", "b");
    }

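    /// `truncate_utf8` truncates to the largest character boundary at or below the
    /// requested length (returning `None` if no such boundary exists); the
    /// `truncate_and_increment_utf8` cases then check that the truncated max is
    /// incremented so it remains a valid upper bound.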
    #[test]
    fn test_truncate_utf8() {
        // No-op
        let data = "❤️🧡💛💚💙💜";
        let r = truncate_utf8(data, data.len()).unwrap();
        assert_eq!(r.len(), data.len());
        assert_eq!(&r, data.as_bytes());

        // Slice shorter than the requested length so we end on a UTF-8 character boundary
        let r = truncate_utf8(data, 13).unwrap();
        assert_eq!(r.len(), 10);
        assert_eq!(&r, "❤️🧡".as_bytes());

        // One multi-byte code point, and a length shorter than it, so we can't slice it
        let r = truncate_utf8("\u{0836}", 1);
        assert!(r.is_none());

        // Test truncate and increment for max bounds on UTF-8 statistics
        // 7-bit (i.e. ASCII)
        let r = truncate_and_increment_utf8("yyyyyyyyy", 8).unwrap();
        assert_eq!(&r, "yyyyyyyz".as_bytes());

        // 2-byte without overflow
        let r = truncate_and_increment_utf8("ééééé", 7).unwrap();
        assert_eq!(&r, "ééê".as_bytes());

        // 2-byte that overflows lowest byte
        let r = truncate_and_increment_utf8("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8).unwrap();
        assert_eq!(&r, "\u{ff}\u{ff}\u{ff}\u{100}".as_bytes());

        // max 2-byte should not truncate as it would need 3-byte code points
        let r = truncate_and_increment_utf8("߿߿߿߿߿", 8);
        assert!(r.is_none());

        // 3-byte without overflow: four U+800s truncate to [U+800, U+800], which
        // increments to [U+800, U+801] (note that these characters should render
        // right to left).
        let r = truncate_and_increment_utf8("ࠀࠀࠀࠀ", 8).unwrap();
        assert_eq!(&r, "ࠀࠁ".as_bytes());

        // max 3-byte should not truncate as it would need 4-byte code points
        let r = truncate_and_increment_utf8("\u{ffff}\u{ffff}\u{ffff}", 8);
        assert!(r.is_none());

        // 4-byte without overflow
        let r = truncate_and_increment_utf8("𐀀𐀀𐀀𐀀", 9).unwrap();
        assert_eq!(&r, "𐀀𐀁".as_bytes());

        // max 4-byte should not truncate
        let r = truncate_and_increment_utf8("\u{10ffff}\u{10ffff}", 8);
        assert!(r.is_none());
    }

    #[test]
    // Check fallback truncation of statistics that should be UTF-8, but aren't
    // (see https://github.com/apache/arrow-rs/pull/6870).
    fn test_byte_array_truncate_invalid_utf8_statistics() {
        let message_type = "
            message test_schema {
                OPTIONAL BYTE_ARRAY a (UTF8);
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());

        // Create Vec<ByteArray> containing non-UTF8 bytes
        let data = vec![ByteArray::from(vec![128u8; 32]); 7];
        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
        let file: File = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Chunk)
                .set_statistics_truncate_length(Some(8))
                .build(),
        );

        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<ByteArrayType>()
            .write_batch(&data, Some(&def_levels), None)
            .unwrap();
        col_writer.close().unwrap();
        row_group_writer.close().unwrap();
        let file_metadata = writer.close().unwrap();
        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
        let stats = file_metadata.row_groups[0].columns[0]
            .meta_data
            .as_ref()
            .unwrap()
            .statistics
            .as_ref()
            .unwrap();
        assert!(!stats.is_max_value_exact.unwrap());
        // Truncation of invalid UTF-8 should fall back to binary truncation, so last byte should
        // be incremented by 1.
        assert_eq!(
            stats.max_value,
            Some([128, 128, 128, 128, 128, 128, 128, 129].to_vec())
        );
    }

    #[test]
    fn test_increment_max_binary_chars() {
        let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
        assert_eq!(&r.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);

        let incremented = increment(vec![0xFF, 0xFF, 0xFF]);
        assert!(incremented.is_none());
    }

    #[test]
    fn test_no_column_index_when_stats_disabled() {
        // https://github.com/apache/arrow-rs/issues/6010
        // Test that column index is not created/written for all-nulls column when page
        // statistics are disabled.
        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::None)
                .build(),
        );
        let column_writer = get_column_writer(descr, props, get_test_page_writer());
        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);

        let data = Vec::new();
        let def_levels = vec![0; 10];
        writer.write_batch(&data, Some(&def_levels), None).unwrap();
        writer.flush_data_pages().unwrap();

        let column_close_result = writer.close().unwrap();
        assert!(column_close_result.offset_index.is_some());
        assert!(column_close_result.column_index.is_none());
    }

    #[test]
    fn test_no_offset_index_when_disabled() {
        // Test that offset indexes can be disabled
        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::None)
                .set_offset_index_disabled(true)
                .build(),
        );
        let column_writer = get_column_writer(descr, props, get_test_page_writer());
        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);

        let data = Vec::new();
        let def_levels = vec![0; 10];
        writer.write_batch(&data, Some(&def_levels), None).unwrap();
        writer.flush_data_pages().unwrap();

        let column_close_result = writer.close().unwrap();
        assert!(column_close_result.offset_index.is_none());
        assert!(column_close_result.column_index.is_none());
    }

    #[test]
    fn test_offset_index_overridden() {
        // Test that offset indexes are not disabled when gathering page statistics
        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .set_offset_index_disabled(true)
                .build(),
        );
        let column_writer = get_column_writer(descr, props, get_test_page_writer());
        let mut writer = get_typed_column_writer::<Int32Type>(column_writer);

        let data = Vec::new();
        let def_levels = vec![0; 10];
        writer.write_batch(&data, Some(&def_levels), None).unwrap();
        writer.flush_data_pages().unwrap();

        let column_close_result = writer.close().unwrap();
        assert!(column_close_result.offset_index.is_some());
        assert!(column_close_result.column_index.is_some());
    }

    #[test]
    fn test_boundary_order() -> Result<()> {
        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
        // min max both ascending
        let column_close_result = write_multiple_pages::<Int32Type>(
            &descr,
            &[
                &[Some(-10), Some(10)],
                &[Some(-5), Some(11)],
                &[None],
                &[Some(-5), Some(11)],
            ],
        )?;
        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);

        // min max both descending
        let column_close_result = write_multiple_pages::<Int32Type>(
            &descr,
            &[
                &[Some(10), Some(11)],
                &[Some(5), Some(11)],
                &[None],
                &[Some(-5), Some(0)],
            ],
        )?;
        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
        assert_eq!(boundary_order, BoundaryOrder::DESCENDING);

        // min max both equal
        let column_close_result = write_multiple_pages::<Int32Type>(
            &descr,
            &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
        )?;
        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);

        // only nulls
        let column_close_result =
            write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);

        // one page
        let column_close_result =
            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);

        // one non-null page
        let column_close_result =
            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);

        // min max both unordered
        let column_close_result = write_multiple_pages::<Int32Type>(
            &descr,
            &[
                &[Some(10), Some(11)],
                &[Some(11), Some(16)],
                &[None],
                &[Some(-5), Some(0)],
            ],
        )?;
        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
        assert_eq!(boundary_order, BoundaryOrder::UNORDERED);

        // min and max each ordered, but in different directions
        let column_close_result = write_multiple_pages::<Int32Type>(
            &descr,
            &[
                &[Some(1), Some(9)],
                &[Some(2), Some(8)],
                &[None],
                &[Some(3), Some(7)],
            ],
        )?;
        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
        assert_eq!(boundary_order, BoundaryOrder::UNORDERED);

        Ok(())
    }

    #[test]
    fn test_boundary_order_logical_type() -> Result<()> {
        // ensure that logical types can have a different sort order than their
        // underlying physical type representation
        let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
        let fba_descr = {
            let tpe = SchemaType::primitive_type_builder(
                "col",
                FixedLenByteArrayType::get_physical_type(),
            )
            .with_length(2)
            .build()?;
            Arc::new(ColumnDescriptor::new(
                Arc::new(tpe),
                1,
                0,
                ColumnPath::from("col"),
            ))
        };

        let values: &[&[Option<FixedLenByteArray>]] = &[
            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
            &[Some(FixedLenByteArray::from(ByteArray::from(
                f16::NEG_ZERO,
            )))],
            &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
        ];

        // f16 descending
        let column_close_result =
            write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
        assert_eq!(boundary_order, BoundaryOrder::DESCENDING);

        // same bytes, but fba unordered
        let column_close_result =
            write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
        assert_eq!(boundary_order, BoundaryOrder::UNORDERED);

        Ok(())
    }

    #[test]
    fn test_interval_stats_should_not_have_min_max() {
        let input = [
            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
        ]
        .into_iter()
        .map(|s| ByteArray::from(s).into())
        .collect::<Vec<_>>();

        let page_writer = get_test_page_writer();
        let mut writer = get_test_interval_column_writer(page_writer);
        writer.write_batch(&input, None, None).unwrap();

        let metadata = writer.close().unwrap().metadata;
        let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
            stats.clone()
        } else {
            panic!("metadata missing statistics");
        };
        assert!(stats.min_bytes_opt().is_none());
        assert!(stats.max_bytes_opt().is_none());
    }

    #[test]
    #[cfg(feature = "arrow")]
    fn test_column_writer_get_estimated_total_bytes() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        assert_eq!(writer.get_estimated_total_bytes(), 0);

        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        writer.add_data_page().unwrap();
        let size_with_one_page = writer.get_estimated_total_bytes();
        assert_eq!(size_with_one_page, 20);

        writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
        writer.add_data_page().unwrap();
        let size_with_two_pages = writer.get_estimated_total_bytes();
        // different pages have different compressed lengths
        assert_eq!(size_with_two_pages, 20 + 21);
    }

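    /// Writes each slice in `pages` as its own data page, mapping `Some` to
    /// definition level 1 and `None` to 0, and returns the close result.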
    fn write_multiple_pages<T: DataType>(
        column_descr: &Arc<ColumnDescriptor>,
        pages: &[&[Option<T::T>]],
    ) -> Result<ColumnCloseResult> {
        let column_writer = get_column_writer(
            column_descr.clone(),
            Default::default(),
            get_test_page_writer(),
        );
        let mut writer = get_typed_column_writer::<T>(column_writer);

        for &page in pages {
            let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
            let def_levels = page
                .iter()
                .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
                .collect::<Vec<_>>();
            writer.write_batch(&values, Some(&def_levels), None)?;
            writer.flush_data_pages()?;
        }

        writer.close()
    }

    /// Performs a write-read roundtrip with randomly generated values and levels.
    /// `max_size` is the maximum number of values or levels (if `max_def_level` > 0)
    /// to write for a column.
    fn column_roundtrip_random<T: DataType>(
        props: WriterProperties,
        max_size: usize,
        min_value: T::T,
        max_value: T::T,
        max_def_level: i16,
        max_rep_level: i16,
    ) where
        T::T: PartialOrd + SampleUniform + Copy,
    {
        let mut num_values: usize = 0;

        let mut buf: Vec<i16> = Vec::new();
        let def_levels = if max_def_level > 0 {
            random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
            for &dl in &buf[..] {
                if dl == max_def_level {
                    num_values += 1;
                }
            }
            Some(&buf[..])
        } else {
            num_values = max_size;
            None
        };

        let mut buf: Vec<i16> = Vec::new();
        let rep_levels = if max_rep_level > 0 {
            random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
            buf[0] = 0; // Must start on record boundary
            Some(&buf[..])
        } else {
            None
        };

        let mut values: Vec<T::T> = Vec::new();
        random_numbers_range(num_values, min_value, max_value, &mut values);

        column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
    }

    /// Performs a write-read roundtrip and asserts the written values and levels.
    fn column_roundtrip<T: DataType>(
        props: WriterProperties,
        values: &[T::T],
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) {
        let mut file = tempfile::tempfile().unwrap();
        let mut write = TrackedWrite::new(&mut file);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));

        let max_def_level = match def_levels {
            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
            None => 0i16,
        };

        let max_rep_level = match rep_levels {
            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
            None => 0i16,
        };

        let mut max_batch_size = values.len();
        if let Some(levels) = def_levels {
            max_batch_size = max_batch_size.max(levels.len());
        }
        if let Some(levels) = rep_levels {
            max_batch_size = max_batch_size.max(levels.len());
        }

        let mut writer =
            get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));

        let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
        assert_eq!(values_written, values.len());
        let result = writer.close().unwrap();

        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let page_reader = Box::new(
            SerializedPageReader::new_with_properties(
                Arc::new(file),
                &result.metadata,
                result.rows_written as usize,
                None,
                Arc::new(props),
            )
            .unwrap(),
        );
        let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);

        let mut actual_values = Vec::with_capacity(max_batch_size);
        let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
        let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));

        let (_, values_read, levels_read) = reader
            .read_records(
                max_batch_size,
                actual_def_levels.as_mut(),
                actual_rep_levels.as_mut(),
                &mut actual_values,
            )
            .unwrap();

        // Assert values, definition and repetition levels.

        assert_eq!(&actual_values[..values_read], values);
        match actual_def_levels {
            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
            None => assert_eq!(None, def_levels),
        }
        match actual_rep_levels {
            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
            None => assert_eq!(None, rep_levels),
        }

        // Assert written rows.

        if let Some(levels) = actual_rep_levels {
            let mut actual_rows_written = 0;
            for l in levels {
                if l == 0 {
                    actual_rows_written += 1;
                }
            }
            assert_eq!(actual_rows_written, result.rows_written);
        } else if actual_def_levels.is_some() {
            assert_eq!(levels_read as u64, result.rows_written);
        } else {
            assert_eq!(values_read as u64, result.rows_written);
        }
    }

    /// Writes the provided values and returns the resulting column chunk metadata.
    /// Used to test encoding support for the column writer.
    fn column_write_and_get_metadata<T: DataType>(
        props: WriterProperties,
        values: &[T::T],
    ) -> ColumnChunkMetaData {
        let page_writer = get_test_page_writer();
        let props = Arc::new(props);
        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
        writer.write_batch(values, None, None).unwrap();
        writer.close().unwrap().metadata
    }

    // Helper for EncodingWriteSupport tests. Checks the dictionary page offset and
    // the set of encodings to make sure the column writer used the expected
    // encodings for the given writer version and dictionary setting.
    fn check_encoding_write_support<T: DataType>(
        version: WriterVersion,
        dict_enabled: bool,
        data: &[T::T],
        dictionary_page_offset: Option<i64>,
        encodings: &[Encoding],
    ) {
        let props = WriterProperties::builder()
            .set_writer_version(version)
            .set_dictionary_enabled(dict_enabled)
            .build();
        let meta = column_write_and_get_metadata::<T>(props, data);
        assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
        assert_eq!(meta.encodings(), &encodings);
    }

    /// Returns a column writer for the given levels and properties.
    fn get_test_column_writer<'a, T: DataType>(
        page_writer: Box<dyn PageWriter + 'a>,
        max_def_level: i16,
        max_rep_level: i16,
        props: WriterPropertiesPtr,
    ) -> ColumnWriterImpl<'a, T> {
        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
        let column_writer = get_column_writer(descr, props, page_writer);
        get_typed_column_writer::<T>(column_writer)
    }

    /// Returns a column reader for the given levels.
    fn get_test_column_reader<T: DataType>(
        page_reader: Box<dyn PageReader>,
        max_def_level: i16,
        max_rep_level: i16,
    ) -> ColumnReaderImpl<T> {
        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
        let column_reader = get_column_reader(descr, page_reader);
        get_typed_column_reader::<T>(column_reader)
    }

    /// Returns a descriptor for a primitive column.
    fn get_test_column_descr<T: DataType>(
        max_def_level: i16,
        max_rep_level: i16,
    ) -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
            // length is set for "encoding support" tests for FIXED_LEN_BYTE_ARRAY type,
            // it should be no-op for other types
            .with_length(1)
            .build()
            .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }

    /// Returns a page writer that collects pages without serializing them.
    fn get_test_page_writer() -> Box<dyn PageWriter> {
        Box::new(TestPageWriter {})
    }

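    /// A no-op [`PageWriter`] that records per-page metadata but discards the bytes.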
    struct TestPageWriter {}

    impl PageWriter for TestPageWriter {
        fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
            let mut res = PageWriteSpec::new();
            res.page_type = page.page_type();
            res.uncompressed_size = page.uncompressed_size();
            res.compressed_size = page.compressed_size();
            res.num_values = page.num_values();
            res.offset = 0;
            res.bytes_written = page.data().len() as u64;
            Ok(res)
        }

        fn close(&mut self) -> Result<()> {
            Ok(())
        }
    }

    /// Writes data to parquet using [`get_test_page_writer`] and
    /// [`get_test_column_writer`], and returns the generated statistics.
    fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
        writer.write_batch(values, None, None).unwrap();

        let metadata = writer.close().unwrap().metadata;
        if let Some(stats) = metadata.statistics() {
            stats.clone()
        } else {
            panic!("metadata missing statistics");
        }
    }

    /// Returns a decimals column writer.
    fn get_test_decimals_column_writer<T: DataType>(
        page_writer: Box<dyn PageWriter>,
        max_def_level: i16,
        max_rep_level: i16,
        props: WriterPropertiesPtr,
    ) -> ColumnWriterImpl<'static, T> {
        let descr = Arc::new(get_test_decimals_column_descr::<T>(
            max_def_level,
            max_rep_level,
        ));
        let column_writer = get_column_writer(descr, props, page_writer);
        get_typed_column_writer::<T>(column_writer)
    }

    /// Returns a descriptor for a primitive column with the Decimal logical type.
    fn get_test_decimals_column_descr<T: DataType>(
        max_def_level: i16,
        max_rep_level: i16,
    ) -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
            .with_length(16)
            .with_logical_type(Some(LogicalType::Decimal {
                scale: 2,
                precision: 3,
            }))
            .with_scale(2)
            .with_precision(3)
            .build()
            .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }

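    /// Writes `values` through a Float16 column writer and returns the resulting
    /// value statistics.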
    fn float16_statistics_roundtrip(
        values: &[FixedLenByteArray],
    ) -> ValueStatistics<FixedLenByteArray> {
        let page_writer = get_test_page_writer();
        let mut writer = get_test_float16_column_writer(page_writer, Default::default());
        writer.write_batch(values, None, None).unwrap();

        let metadata = writer.close().unwrap().metadata;
        if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
            stats.clone()
        } else {
            panic!("metadata missing statistics");
        }
    }

    fn get_test_float16_column_writer(
        page_writer: Box<dyn PageWriter>,
        props: WriterPropertiesPtr,
    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
        let descr = Arc::new(get_test_float16_column_descr(0, 0));
        let column_writer = get_column_writer(descr, props, page_writer);
        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
    }

    fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe =
            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
                .with_length(2)
                .with_logical_type(Some(LogicalType::Float16))
                .build()
                .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }

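    /// Returns a column writer for a 12-byte FIXED_LEN_BYTE_ARRAY column with the
    /// INTERVAL converted type.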
    fn get_test_interval_column_writer(
        page_writer: Box<dyn PageWriter>,
    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
        let descr = Arc::new(get_test_interval_column_descr());
        let column_writer = get_column_writer(descr, Default::default(), page_writer);
        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
    }

    fn get_test_interval_column_descr() -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe =
            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
                .with_length(12)
                .with_converted_type(ConvertedType::INTERVAL)
                .build()
                .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
    }

    /// Returns a column writer for a UINT_32 column declared via ConvertedType only.
    fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
        page_writer: Box<dyn PageWriter + 'a>,
        max_def_level: i16,
        max_rep_level: i16,
        props: WriterPropertiesPtr,
    ) -> ColumnWriterImpl<'a, T> {
        let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
            max_def_level,
            max_rep_level,
        ));
        let column_writer = get_column_writer(descr, props, page_writer);
        get_typed_column_writer::<T>(column_writer)
    }

    /// Returns a column descriptor for a UINT_32 column declared via ConvertedType only.
    fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
        max_def_level: i16,
        max_rep_level: i16,
    ) -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
            .with_converted_type(ConvertedType::UINT_32)
            .build()
            .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }
}