parquet/file/writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Contains the file writer API, and provides methods to write row groups and columns
//! using row group writers and column writers respectively.
20
21use crate::bloom_filter::Sbbf;
22use crate::format as parquet;
23use crate::format::{ColumnIndex, OffsetIndex};
24use crate::thrift::TSerializable;
25use std::fmt::Debug;
26use std::io::{BufWriter, IoSlice, Read};
27use std::{io::Write, sync::Arc};
28use thrift::protocol::TCompactOutputProtocol;
29
30use crate::column::writer::{get_typed_column_writer_mut, ColumnCloseResult, ColumnWriterImpl};
31use crate::column::{
32    page::{CompressedPage, PageWriteSpec, PageWriter},
33    writer::{get_column_writer, ColumnWriter},
34};
35use crate::data_type::DataType;
36use crate::errors::{ParquetError, Result};
37use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
38use crate::file::reader::ChunkReader;
39use crate::file::{metadata::*, PARQUET_MAGIC};
40use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr};
41
42/// A wrapper around a [`Write`] that keeps track of the number
43/// of bytes that have been written. The given [`Write`] is wrapped
44/// with a [`BufWriter`] to optimize writing performance.
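///
/// A minimal sketch of tracking the bytes written to an in-memory buffer
/// (a `Vec<u8>` sink is assumed purely for illustration):
///
/// ```
/// # use std::io::Write;
/// # use parquet::file::writer::TrackedWrite;
/// let mut writer = TrackedWrite::new(Vec::new());
/// writer.write_all(b"some bytes").unwrap();
/// assert_eq!(writer.bytes_written(), 10);
/// ```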
45pub struct TrackedWrite<W: Write> {
46    inner: BufWriter<W>,
47    bytes_written: usize,
48}
49
50impl<W: Write> TrackedWrite<W> {
51    /// Create a new [`TrackedWrite`] from a [`Write`]
52    pub fn new(inner: W) -> Self {
53        let buf_write = BufWriter::new(inner);
54        Self {
55            inner: buf_write,
56            bytes_written: 0,
57        }
58    }
59
60    /// Returns the number of bytes written to this instance
61    pub fn bytes_written(&self) -> usize {
62        self.bytes_written
63    }
64
65    /// Returns a reference to the underlying writer.
66    pub fn inner(&self) -> &W {
67        self.inner.get_ref()
68    }
69
70    /// Returns a mutable reference to the underlying writer.
71    ///
    /// It is inadvisable to write directly to the underlying writer, as doing so
    /// will likely result in data corruption
74    pub fn inner_mut(&mut self) -> &mut W {
75        self.inner.get_mut()
76    }
77
78    /// Returns the underlying writer.
79    pub fn into_inner(self) -> Result<W> {
80        self.inner.into_inner().map_err(|err| {
81            ParquetError::General(format!("fail to get inner writer: {:?}", err.to_string()))
82        })
83    }
84}
85
86impl<W: Write> Write for TrackedWrite<W> {
87    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
88        let bytes = self.inner.write(buf)?;
89        self.bytes_written += bytes;
90        Ok(bytes)
91    }
92
93    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> std::io::Result<usize> {
94        let bytes = self.inner.write_vectored(bufs)?;
95        self.bytes_written += bytes;
96        Ok(bytes)
97    }
98
99    fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
100        self.inner.write_all(buf)?;
101        self.bytes_written += buf.len();
102
103        Ok(())
104    }
105
106    fn flush(&mut self) -> std::io::Result<()> {
107        self.inner.flush()
108    }
109}
110
111/// Callback invoked on closing a column chunk
112pub type OnCloseColumnChunk<'a> = Box<dyn FnOnce(ColumnCloseResult) -> Result<()> + 'a>;
113
/// Callback invoked on closing a row group, arguments are:
///
/// - the writer used to write the row group
/// - the row group metadata
/// - the bloom filter for each column chunk
/// - the column index for each column chunk
/// - the offset index for each column chunk
119pub type OnCloseRowGroup<'a, W> = Box<
120    dyn FnOnce(
121            &'a mut TrackedWrite<W>,
122            RowGroupMetaData,
123            Vec<Option<Sbbf>>,
124            Vec<Option<ColumnIndex>>,
125            Vec<Option<OffsetIndex>>,
126        ) -> Result<()>
127        + 'a
128        + Send,
129>;
130
131// ----------------------------------------------------------------------
132// Serialized impl for file & row group writers
133
134/// Parquet file writer API.
135/// Provides methods to write row groups sequentially.
136///
/// The main workflow is as follows (see the sketch below):
/// - Create a file writer; this will write the Parquet magic bytes at the start of the output.
/// - Request a new row group writer by calling `next_row_group`.
/// - Once finished writing a row group, close the row group writer by calling `close`.
/// - Write subsequent row groups, if necessary.
/// - After all row groups have been written, close the file writer using the `close` method.
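///
/// A minimal end-to-end sketch, writing a single required `INT32` column to an
/// in-memory buffer and panicking on error for brevity:
///
/// ```
/// # use std::sync::Arc;
/// # use parquet::data_type::Int32Type;
/// # use parquet::file::properties::WriterProperties;
/// # use parquet::file::writer::SerializedFileWriter;
/// # use parquet::schema::parser::parse_message_type;
/// let schema = Arc::new(parse_message_type("message schema { REQUIRED INT32 col1; }").unwrap());
/// let props = Arc::new(WriterProperties::builder().build());
/// let mut writer = SerializedFileWriter::new(Vec::new(), schema, props).unwrap();
///
/// let mut row_group_writer = writer.next_row_group().unwrap();
/// while let Some(mut col_writer) = row_group_writer.next_column().unwrap() {
///     col_writer
///         .typed::<Int32Type>()
///         .write_batch(&[1, 2, 3], None, None)
///         .unwrap();
///     col_writer.close().unwrap();
/// }
/// row_group_writer.close().unwrap();
/// writer.close().unwrap();
/// ```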
143pub struct SerializedFileWriter<W: Write> {
144    buf: TrackedWrite<W>,
145    schema: TypePtr,
146    descr: SchemaDescPtr,
147    props: WriterPropertiesPtr,
148    row_groups: Vec<RowGroupMetaData>,
149    bloom_filters: Vec<Vec<Option<Sbbf>>>,
150    column_indexes: Vec<Vec<Option<ColumnIndex>>>,
151    offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
152    row_group_index: usize,
    // kv_metadatas will be appended to the key/value metadata from `props` when `write_metadata` is called
154    kv_metadatas: Vec<KeyValue>,
155    finished: bool,
156}
157
158impl<W: Write> Debug for SerializedFileWriter<W> {
159    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160        // implement Debug so this can be used with #[derive(Debug)]
161        // in client code rather than actually listing all the fields
162        f.debug_struct("SerializedFileWriter")
163            .field("descr", &self.descr)
164            .field("row_group_index", &self.row_group_index)
165            .field("kv_metadatas", &self.kv_metadatas)
166            .finish_non_exhaustive()
167    }
168}
169
170impl<W: Write + Send> SerializedFileWriter<W> {
171    /// Creates new file writer.
172    pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
173        let mut buf = TrackedWrite::new(buf);
174        Self::start_file(&mut buf)?;
175        Ok(Self {
176            buf,
177            schema: schema.clone(),
178            descr: Arc::new(SchemaDescriptor::new(schema)),
179            props: properties,
180            row_groups: vec![],
181            bloom_filters: vec![],
182            column_indexes: Vec::new(),
183            offset_indexes: Vec::new(),
184            row_group_index: 0,
185            kv_metadatas: Vec::new(),
186            finished: false,
187        })
188    }
189
    /// Creates a new row group from this file writer.
    /// In case of an IO error or Thrift error, returns `Err`.
    ///
    /// There can be at most 2^15 row groups in a file, and row groups have
    /// to be written sequentially. Every time a new row group is requested, the
    /// previous row group must be finalised and closed using `SerializedRowGroupWriter::close`.
196    pub fn next_row_group(&mut self) -> Result<SerializedRowGroupWriter<'_, W>> {
197        self.assert_previous_writer_closed()?;
198        let ordinal = self.row_group_index;
199
200        let ordinal: i16 = ordinal.try_into().map_err(|_| {
201            ParquetError::General(format!(
202                "Parquet does not support more than {} row groups per file (currently: {})",
203                i16::MAX,
204                ordinal
205            ))
206        })?;
207
208        self.row_group_index = self
209            .row_group_index
210            .checked_add(1)
211            .expect("SerializedFileWriter::row_group_index overflowed");
212
213        let bloom_filter_position = self.properties().bloom_filter_position();
214        let row_groups = &mut self.row_groups;
215        let row_bloom_filters = &mut self.bloom_filters;
216        let row_column_indexes = &mut self.column_indexes;
217        let row_offset_indexes = &mut self.offset_indexes;
218        let on_close = move |buf,
219                             mut metadata,
220                             row_group_bloom_filter,
221                             row_group_column_index,
222                             row_group_offset_index| {
223            row_bloom_filters.push(row_group_bloom_filter);
224            row_column_indexes.push(row_group_column_index);
225            row_offset_indexes.push(row_group_offset_index);
226            // write bloom filters out immediately after the row group if requested
227            match bloom_filter_position {
228                BloomFilterPosition::AfterRowGroup => {
229                    write_bloom_filters(buf, row_bloom_filters, &mut metadata)?
230                }
231                BloomFilterPosition::End => (),
232            };
233            row_groups.push(metadata);
234            Ok(())
235        };
236
237        let row_group_writer = SerializedRowGroupWriter::new(
238            self.descr.clone(),
239            self.props.clone(),
240            &mut self.buf,
241            ordinal,
242            Some(Box::new(on_close)),
243        );
244        Ok(row_group_writer)
245    }
246
247    /// Returns metadata for any flushed row groups
248    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
249        &self.row_groups
250    }
251
252    /// Close and finalize the underlying Parquet writer
253    ///
254    /// Unlike [`Self::close`] this does not consume self
255    ///
256    /// Attempting to write after calling finish will result in an error
257    pub fn finish(&mut self) -> Result<parquet::FileMetaData> {
258        self.assert_previous_writer_closed()?;
259        let metadata = self.write_metadata()?;
260        self.buf.flush()?;
261        Ok(metadata)
262    }
263
264    /// Closes and finalises file writer, returning the file metadata.
265    pub fn close(mut self) -> Result<parquet::FileMetaData> {
266        self.finish()
267    }
268
269    /// Writes magic bytes at the beginning of the file.
270    fn start_file(buf: &mut TrackedWrite<W>) -> Result<()> {
271        buf.write_all(&PARQUET_MAGIC)?;
272        Ok(())
273    }
274
275    /// Assembles and writes metadata at the end of the file.
276    fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
277        self.finished = true;
278
279        // write out any remaining bloom filters after all row groups
280        for row_group in &mut self.row_groups {
281            write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?;
282        }
283
284        let key_value_metadata = match self.props.key_value_metadata() {
285            Some(kv) => Some(kv.iter().chain(&self.kv_metadatas).cloned().collect()),
286            None if self.kv_metadatas.is_empty() => None,
287            None => Some(self.kv_metadatas.clone()),
288        };
289
290        let row_groups = self
291            .row_groups
292            .iter()
293            .map(|v| v.to_thrift())
294            .collect::<Vec<_>>();
295
296        let mut encoder = ThriftMetadataWriter::new(
297            &mut self.buf,
298            &self.schema,
299            &self.descr,
300            row_groups,
301            Some(self.props.created_by().to_string()),
302            self.props.writer_version().as_num(),
303        );
304        if let Some(key_value_metadata) = key_value_metadata {
305            encoder = encoder.with_key_value_metadata(key_value_metadata)
306        }
307        encoder = encoder.with_column_indexes(&self.column_indexes);
308        encoder = encoder.with_offset_indexes(&self.offset_indexes);
309        encoder.finish()
310    }
311
312    #[inline]
313    fn assert_previous_writer_closed(&self) -> Result<()> {
314        if self.finished {
315            return Err(general_err!("SerializedFileWriter already finished"));
316        }
317
318        if self.row_group_index != self.row_groups.len() {
319            Err(general_err!("Previous row group writer was not closed"))
320        } else {
321            Ok(())
322        }
323    }
324
325    /// Add a [`KeyValue`] to the file writer's metadata
326    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
327        self.kv_metadatas.push(kv_metadata);
328    }
329
330    /// Returns a reference to schema descriptor.
331    pub fn schema_descr(&self) -> &SchemaDescriptor {
332        &self.descr
333    }
334
335    /// Returns a reference to the writer properties
336    pub fn properties(&self) -> &WriterPropertiesPtr {
337        &self.props
338    }
339
340    /// Returns a reference to the underlying writer.
341    pub fn inner(&self) -> &W {
342        self.buf.inner()
343    }
344
345    /// Returns a mutable reference to the underlying writer.
346    ///
347    /// It is inadvisable to directly write to the underlying writer.
348    pub fn inner_mut(&mut self) -> &mut W {
349        self.buf.inner_mut()
350    }
351
352    /// Writes the file footer and returns the underlying writer.
353    pub fn into_inner(mut self) -> Result<W> {
354        self.assert_previous_writer_closed()?;
355        let _ = self.write_metadata()?;
356
357        self.buf.into_inner()
358    }
359
360    /// Returns the number of bytes written to this instance
361    pub fn bytes_written(&self) -> usize {
362        self.buf.bytes_written()
363    }
364}
365
/// Serialize all the bloom filters of the given row group to the given buffer,
/// updating the row group metadata in place with each filter's offset and length.
368fn write_bloom_filters<W: Write + Send>(
369    buf: &mut TrackedWrite<W>,
370    bloom_filters: &mut [Vec<Option<Sbbf>>],
371    row_group: &mut RowGroupMetaData,
372) -> Result<()> {
373    // iter row group
374    // iter each column
375    // write bloom filter to the file
376
377    let row_group_idx: u16 = row_group
378        .ordinal()
379        .expect("Missing row group ordinal")
380        .try_into()
381        .map_err(|_| {
382            ParquetError::General(format!(
383                "Negative row group ordinal: {})",
384                row_group.ordinal().unwrap()
385            ))
386        })?;
387    let row_group_idx = row_group_idx as usize;
388    for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() {
389        if let Some(bloom_filter) = bloom_filters[row_group_idx][column_idx].take() {
390            let start_offset = buf.bytes_written();
391            bloom_filter.write(&mut *buf)?;
392            let end_offset = buf.bytes_written();
393            // set offset and index for bloom filter
394            *column_chunk = column_chunk
395                .clone()
396                .into_builder()
397                .set_bloom_filter_offset(Some(start_offset as i64))
398                .set_bloom_filter_length(Some((end_offset - start_offset) as i32))
399                .build()?;
400        }
401    }
402    Ok(())
403}
404
405/// Parquet row group writer API.
/// Provides methods to access column writers in an iterator-like fashion; the order is
/// guaranteed to match the order of schema leaves (column descriptors).
408///
409/// All columns should be written sequentially; the main workflow is:
/// - Request the next column using the `next_column` method - this will return `None` if no
///   more columns are available to write.
/// - Once done writing a column, close the column writer with `close`.
/// - Once all columns have been written, close the row group writer with the `close`
///   method. The close method returns the row group metadata and is a no-op
///   on an already closed row group.
416pub struct SerializedRowGroupWriter<'a, W: Write> {
417    descr: SchemaDescPtr,
418    props: WriterPropertiesPtr,
419    buf: &'a mut TrackedWrite<W>,
420    total_rows_written: Option<u64>,
421    total_bytes_written: u64,
422    total_uncompressed_bytes: i64,
423    column_index: usize,
424    row_group_metadata: Option<RowGroupMetaDataPtr>,
425    column_chunks: Vec<ColumnChunkMetaData>,
426    bloom_filters: Vec<Option<Sbbf>>,
427    column_indexes: Vec<Option<ColumnIndex>>,
428    offset_indexes: Vec<Option<OffsetIndex>>,
429    row_group_index: i16,
430    file_offset: i64,
431    on_close: Option<OnCloseRowGroup<'a, W>>,
432}
433
434impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
    /// Creates a new `SerializedRowGroupWriter` with:
    ///
    /// - `schema_descr` - the schema to write
    /// - `properties` - writer properties
    /// - `buf` - the buffer to write data to
    /// - `row_group_index` - the index of this row group in the parquet file
    /// - `on_close` - an optional callback that will be invoked on [`Self::close`]
    ///
    /// The file offset of this row group is captured from `buf` when the writer is created.
443    pub fn new(
444        schema_descr: SchemaDescPtr,
445        properties: WriterPropertiesPtr,
446        buf: &'a mut TrackedWrite<W>,
447        row_group_index: i16,
448        on_close: Option<OnCloseRowGroup<'a, W>>,
449    ) -> Self {
450        let num_columns = schema_descr.num_columns();
451        let file_offset = buf.bytes_written() as i64;
452        Self {
453            buf,
454            row_group_index,
455            file_offset,
456            on_close,
457            total_rows_written: None,
458            descr: schema_descr,
459            props: properties,
460            column_index: 0,
461            row_group_metadata: None,
462            column_chunks: Vec::with_capacity(num_columns),
463            bloom_filters: Vec::with_capacity(num_columns),
464            column_indexes: Vec::with_capacity(num_columns),
465            offset_indexes: Vec::with_capacity(num_columns),
466            total_bytes_written: 0,
467            total_uncompressed_bytes: 0,
468        }
469    }
470
471    /// Advance `self.column_index` returning the next [`ColumnDescPtr`] if any
472    fn next_column_desc(&mut self) -> Option<ColumnDescPtr> {
473        let ret = self.descr.columns().get(self.column_index)?.clone();
474        self.column_index += 1;
475        Some(ret)
476    }
477
    /// Returns the underlying writer and an [`OnCloseColumnChunk`] callback for the next column writer
479    fn get_on_close(&mut self) -> (&mut TrackedWrite<W>, OnCloseColumnChunk<'_>) {
480        let total_bytes_written = &mut self.total_bytes_written;
481        let total_uncompressed_bytes = &mut self.total_uncompressed_bytes;
482        let total_rows_written = &mut self.total_rows_written;
483        let column_chunks = &mut self.column_chunks;
484        let column_indexes = &mut self.column_indexes;
485        let offset_indexes = &mut self.offset_indexes;
486        let bloom_filters = &mut self.bloom_filters;
487
488        let on_close = |r: ColumnCloseResult| {
489            // Update row group writer metrics
490            *total_bytes_written += r.bytes_written;
491            *total_uncompressed_bytes += r.metadata.uncompressed_size();
492            column_chunks.push(r.metadata);
493            bloom_filters.push(r.bloom_filter);
494            column_indexes.push(r.column_index);
495            offset_indexes.push(r.offset_index);
496
497            if let Some(rows) = *total_rows_written {
498                if rows != r.rows_written {
499                    return Err(general_err!(
500                        "Incorrect number of rows, expected {} != {} rows",
501                        rows,
502                        r.rows_written
503                    ));
504                }
505            } else {
506                *total_rows_written = Some(r.rows_written);
507            }
508
509            Ok(())
510        };
511        (self.buf, Box::new(on_close))
512    }
513
514    /// Returns the next column writer, if available, using the factory function;
515    /// otherwise returns `None`.
516    pub(crate) fn next_column_with_factory<'b, F, C>(&'b mut self, factory: F) -> Result<Option<C>>
517    where
518        F: FnOnce(
519            ColumnDescPtr,
520            WriterPropertiesPtr,
521            Box<dyn PageWriter + 'b>,
522            OnCloseColumnChunk<'b>,
523        ) -> Result<C>,
524    {
525        self.assert_previous_writer_closed()?;
526        Ok(match self.next_column_desc() {
527            Some(column) => {
528                let props = self.props.clone();
529                let (buf, on_close) = self.get_on_close();
530                let page_writer = Box::new(SerializedPageWriter::new(buf));
531                Some(factory(column, props, page_writer, Box::new(on_close))?)
532            }
533            None => None,
534        })
535    }
536
537    /// Returns the next column writer, if available; otherwise returns `None`.
    /// In case of an IO error or Thrift error, or if the row group writer has already been
    /// closed, returns `Err`.
540    pub fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>> {
541        self.next_column_with_factory(|descr, props, page_writer, on_close| {
542            let column_writer = get_column_writer(descr, props, page_writer);
543            Ok(SerializedColumnWriter::new(column_writer, Some(on_close)))
544        })
545    }
546
547    /// Append an encoded column chunk from another source without decoding it
548    ///
549    /// This can be used for efficiently concatenating or projecting parquet data,
550    /// or encoding parquet data to temporary in-memory buffers
551    ///
552    /// See [`Self::next_column`] for writing data that isn't already encoded
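    ///
    /// A minimal sketch, assuming `source` holds the bytes of a valid Parquet file whose
    /// schema matches this writer's, splicing every column of its first row group into the
    /// current row group and dropping bloom filters and page indexes for brevity:
    ///
    /// ```no_run
    /// # use bytes::Bytes;
    /// # use parquet::column::writer::ColumnCloseResult;
    /// # use parquet::errors::Result;
    /// # use parquet::file::reader::FileReader;
    /// # use parquet::file::serialized_reader::SerializedFileReader;
    /// # use parquet::file::writer::SerializedRowGroupWriter;
    /// fn splice_first_row_group<W: std::io::Write + Send>(
    ///     source: &Bytes,
    ///     row_group_writer: &mut SerializedRowGroupWriter<'_, W>,
    /// ) -> Result<()> {
    ///     let reader = SerializedFileReader::new(source.clone())?;
    ///     let row_group = reader.metadata().row_group(0);
    ///     for column in row_group.columns() {
    ///         // Rebuild the close result from the source column chunk metadata
    ///         let close = ColumnCloseResult {
    ///             bytes_written: column.compressed_size() as u64,
    ///             rows_written: row_group.num_rows() as u64,
    ///             metadata: column.clone(),
    ///             bloom_filter: None,
    ///             column_index: None,
    ///             offset_index: None,
    ///         };
    ///         row_group_writer.append_column(source, close)?;
    ///     }
    ///     Ok(())
    /// }
    /// ```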
553    pub fn append_column<R: ChunkReader>(
554        &mut self,
555        reader: &R,
556        mut close: ColumnCloseResult,
557    ) -> Result<()> {
558        self.assert_previous_writer_closed()?;
559        let desc = self
560            .next_column_desc()
561            .ok_or_else(|| general_err!("exhausted columns in SerializedRowGroupWriter"))?;
562
563        let metadata = close.metadata;
564
565        if metadata.column_descr() != desc.as_ref() {
566            return Err(general_err!(
567                "column descriptor mismatch, expected {:?} got {:?}",
568                desc,
569                metadata.column_descr()
570            ));
571        }
572
573        let src_dictionary_offset = metadata.dictionary_page_offset();
574        let src_data_offset = metadata.data_page_offset();
575        let src_offset = src_dictionary_offset.unwrap_or(src_data_offset);
576        let src_length = metadata.compressed_size();
577
578        let write_offset = self.buf.bytes_written();
579        let mut read = reader.get_read(src_offset as _)?.take(src_length as _);
580        let write_length = std::io::copy(&mut read, &mut self.buf)?;
581
582        if src_length as u64 != write_length {
583            return Err(general_err!(
584                "Failed to splice column data, expected {read_length} got {write_length}"
585            ));
586        }
587
588        let map_offset = |x| x - src_offset + write_offset as i64;
589        let mut builder = ColumnChunkMetaData::builder(metadata.column_descr_ptr())
590            .set_compression(metadata.compression())
591            .set_encodings(metadata.encodings().clone())
592            .set_total_compressed_size(metadata.compressed_size())
593            .set_total_uncompressed_size(metadata.uncompressed_size())
594            .set_num_values(metadata.num_values())
595            .set_data_page_offset(map_offset(src_data_offset))
596            .set_dictionary_page_offset(src_dictionary_offset.map(map_offset))
597            .set_unencoded_byte_array_data_bytes(metadata.unencoded_byte_array_data_bytes());
598
599        if let Some(rep_hist) = metadata.repetition_level_histogram() {
600            builder = builder.set_repetition_level_histogram(Some(rep_hist.clone()))
601        }
602        if let Some(def_hist) = metadata.definition_level_histogram() {
603            builder = builder.set_definition_level_histogram(Some(def_hist.clone()))
604        }
605        if let Some(statistics) = metadata.statistics() {
606            builder = builder.set_statistics(statistics.clone())
607        }
608        close.metadata = builder.build()?;
609
610        if let Some(offsets) = close.offset_index.as_mut() {
611            for location in &mut offsets.page_locations {
612                location.offset = map_offset(location.offset)
613            }
614        }
615
616        let (_, on_close) = self.get_on_close();
617        on_close(close)
618    }
619
620    /// Closes this row group writer and returns row group metadata.
621    pub fn close(mut self) -> Result<RowGroupMetaDataPtr> {
622        if self.row_group_metadata.is_none() {
623            self.assert_previous_writer_closed()?;
624
625            let column_chunks = std::mem::take(&mut self.column_chunks);
626            let row_group_metadata = RowGroupMetaData::builder(self.descr.clone())
627                .set_column_metadata(column_chunks)
628                .set_total_byte_size(self.total_uncompressed_bytes)
629                .set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
630                .set_sorting_columns(self.props.sorting_columns().cloned())
631                .set_ordinal(self.row_group_index)
632                .set_file_offset(self.file_offset)
633                .build()?;
634
635            self.row_group_metadata = Some(Arc::new(row_group_metadata.clone()));
636
637            if let Some(on_close) = self.on_close.take() {
638                on_close(
639                    self.buf,
640                    row_group_metadata,
641                    self.bloom_filters,
642                    self.column_indexes,
643                    self.offset_indexes,
644                )?
645            }
646        }
647
648        let metadata = self.row_group_metadata.as_ref().unwrap().clone();
649        Ok(metadata)
650    }
651
652    #[inline]
653    fn assert_previous_writer_closed(&self) -> Result<()> {
654        if self.column_index != self.column_chunks.len() {
655            Err(general_err!("Previous column writer was not closed"))
656        } else {
657            Ok(())
658        }
659    }
660}
661
662/// A wrapper around a [`ColumnWriter`] that invokes a callback on [`Self::close`]
663pub struct SerializedColumnWriter<'a> {
664    inner: ColumnWriter<'a>,
665    on_close: Option<OnCloseColumnChunk<'a>>,
666}
667
668impl<'a> SerializedColumnWriter<'a> {
669    /// Create a new [`SerializedColumnWriter`] from a [`ColumnWriter`] and an
670    /// optional callback to be invoked on [`Self::close`]
671    pub fn new(inner: ColumnWriter<'a>, on_close: Option<OnCloseColumnChunk<'a>>) -> Self {
672        Self { inner, on_close }
673    }
674
675    /// Returns a reference to an untyped [`ColumnWriter`]
676    pub fn untyped(&mut self) -> &mut ColumnWriter<'a> {
677        &mut self.inner
678    }
679
680    /// Returns a reference to a typed [`ColumnWriterImpl`]
681    pub fn typed<T: DataType>(&mut self) -> &mut ColumnWriterImpl<'a, T> {
682        get_typed_column_writer_mut(&mut self.inner)
683    }
684
685    /// Close this [`SerializedColumnWriter`]
686    pub fn close(mut self) -> Result<()> {
687        let r = self.inner.close()?;
688        if let Some(on_close) = self.on_close.take() {
689            on_close(r)?
690        }
691
692        Ok(())
693    }
694}
695
696/// A serialized implementation for Parquet [`PageWriter`].
/// Writes and serializes pages and metadata into an output stream.
698///
699/// `SerializedPageWriter` should not be used after calling `close()`.
700pub struct SerializedPageWriter<'a, W: Write> {
701    sink: &'a mut TrackedWrite<W>,
702}
703
704impl<'a, W: Write> SerializedPageWriter<'a, W> {
705    /// Creates new page writer.
706    pub fn new(sink: &'a mut TrackedWrite<W>) -> Self {
707        Self { sink }
708    }
709
710    /// Serializes page header into Thrift.
711    /// Returns number of bytes that have been written into the sink.
712    #[inline]
713    fn serialize_page_header(&mut self, header: parquet::PageHeader) -> Result<usize> {
714        let start_pos = self.sink.bytes_written();
715        {
716            let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
717            header.write_to_out_protocol(&mut protocol)?;
718        }
719        Ok(self.sink.bytes_written() - start_pos)
720    }
721}
722
723impl<W: Write + Send> PageWriter for SerializedPageWriter<'_, W> {
724    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
725        let page_type = page.page_type();
726        let start_pos = self.sink.bytes_written() as u64;
727
728        let page_header = page.to_thrift_header();
729        let header_size = self.serialize_page_header(page_header)?;
730        self.sink.write_all(page.data())?;
731
732        let mut spec = PageWriteSpec::new();
733        spec.page_type = page_type;
734        spec.uncompressed_size = page.uncompressed_size() + header_size;
735        spec.compressed_size = page.compressed_size() + header_size;
736        spec.offset = start_pos;
737        spec.bytes_written = self.sink.bytes_written() as u64 - start_pos;
738        spec.num_values = page.num_values();
739
740        Ok(spec)
741    }
742
743    fn close(&mut self) -> Result<()> {
744        self.sink.flush()?;
745        Ok(())
746    }
747}
748
749#[cfg(test)]
750mod tests {
751    use super::*;
752
753    #[cfg(feature = "arrow")]
754    use arrow_array::RecordBatchReader;
755    use bytes::Bytes;
756    use std::fs::File;
757
758    #[cfg(feature = "arrow")]
759    use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
760    #[cfg(feature = "arrow")]
761    use crate::arrow::ArrowWriter;
762    use crate::basic::{
763        ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, Repetition, SortOrder, Type,
764    };
765    use crate::column::page::{Page, PageReader};
766    use crate::column::reader::get_typed_column_reader;
767    use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
768    use crate::data_type::{BoolType, ByteArrayType, Int32Type};
769    use crate::file::page_index::index::Index;
770    use crate::file::properties::EnabledStatistics;
771    use crate::file::serialized_reader::ReadOptionsBuilder;
772    use crate::file::{
773        properties::{ReaderProperties, WriterProperties, WriterVersion},
774        reader::{FileReader, SerializedFileReader, SerializedPageReader},
775        statistics::{from_thrift, to_thrift, Statistics},
776    };
777    use crate::format::SortingColumn;
778    use crate::record::{Row, RowAccessor};
779    use crate::schema::parser::parse_message_type;
780    use crate::schema::types;
781    use crate::schema::types::{ColumnDescriptor, ColumnPath};
782    use crate::util::test_common::rand_gen::RandGen;
783
784    #[test]
785    fn test_row_group_writer_error_not_all_columns_written() {
786        let file = tempfile::tempfile().unwrap();
787        let schema = Arc::new(
788            types::Type::group_type_builder("schema")
789                .with_fields(vec![Arc::new(
790                    types::Type::primitive_type_builder("col1", Type::INT32)
791                        .build()
792                        .unwrap(),
793                )])
794                .build()
795                .unwrap(),
796        );
797        let props = Default::default();
798        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
799        let row_group_writer = writer.next_row_group().unwrap();
800        let res = row_group_writer.close();
801        assert!(res.is_err());
802        if let Err(err) = res {
803            assert_eq!(
804                format!("{err}"),
805                "Parquet error: Column length mismatch: 1 != 0"
806            );
807        }
808    }
809
810    #[test]
811    fn test_row_group_writer_num_records_mismatch() {
812        let file = tempfile::tempfile().unwrap();
813        let schema = Arc::new(
814            types::Type::group_type_builder("schema")
815                .with_fields(vec![
816                    Arc::new(
817                        types::Type::primitive_type_builder("col1", Type::INT32)
818                            .with_repetition(Repetition::REQUIRED)
819                            .build()
820                            .unwrap(),
821                    ),
822                    Arc::new(
823                        types::Type::primitive_type_builder("col2", Type::INT32)
824                            .with_repetition(Repetition::REQUIRED)
825                            .build()
826                            .unwrap(),
827                    ),
828                ])
829                .build()
830                .unwrap(),
831        );
832        let props = Default::default();
833        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
834        let mut row_group_writer = writer.next_row_group().unwrap();
835
836        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
837        col_writer
838            .typed::<Int32Type>()
839            .write_batch(&[1, 2, 3], None, None)
840            .unwrap();
841        col_writer.close().unwrap();
842
843        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
844        col_writer
845            .typed::<Int32Type>()
846            .write_batch(&[1, 2], None, None)
847            .unwrap();
848
849        let err = col_writer.close().unwrap_err();
850        assert_eq!(
851            err.to_string(),
852            "Parquet error: Incorrect number of rows, expected 3 != 2 rows"
853        );
854    }
855
856    #[test]
857    fn test_file_writer_empty_file() {
858        let file = tempfile::tempfile().unwrap();
859
860        let schema = Arc::new(
861            types::Type::group_type_builder("schema")
862                .with_fields(vec![Arc::new(
863                    types::Type::primitive_type_builder("col1", Type::INT32)
864                        .build()
865                        .unwrap(),
866                )])
867                .build()
868                .unwrap(),
869        );
870        let props = Default::default();
871        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
872        writer.close().unwrap();
873
874        let reader = SerializedFileReader::new(file).unwrap();
875        assert_eq!(reader.get_row_iter(None).unwrap().count(), 0);
876    }
877
878    #[test]
879    fn test_file_writer_column_orders_populated() {
880        let file = tempfile::tempfile().unwrap();
881
882        let schema = Arc::new(
883            types::Type::group_type_builder("schema")
884                .with_fields(vec![
885                    Arc::new(
886                        types::Type::primitive_type_builder("col1", Type::INT32)
887                            .build()
888                            .unwrap(),
889                    ),
890                    Arc::new(
891                        types::Type::primitive_type_builder("col2", Type::FIXED_LEN_BYTE_ARRAY)
892                            .with_converted_type(ConvertedType::INTERVAL)
893                            .with_length(12)
894                            .build()
895                            .unwrap(),
896                    ),
897                    Arc::new(
898                        types::Type::group_type_builder("nested")
899                            .with_repetition(Repetition::REQUIRED)
900                            .with_fields(vec![
901                                Arc::new(
902                                    types::Type::primitive_type_builder(
903                                        "col3",
904                                        Type::FIXED_LEN_BYTE_ARRAY,
905                                    )
906                                    .with_logical_type(Some(LogicalType::Float16))
907                                    .with_length(2)
908                                    .build()
909                                    .unwrap(),
910                                ),
911                                Arc::new(
912                                    types::Type::primitive_type_builder("col4", Type::BYTE_ARRAY)
913                                        .with_logical_type(Some(LogicalType::String))
914                                        .build()
915                                        .unwrap(),
916                                ),
917                            ])
918                            .build()
919                            .unwrap(),
920                    ),
921                ])
922                .build()
923                .unwrap(),
924        );
925
926        let props = Default::default();
927        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
928        writer.close().unwrap();
929
930        let reader = SerializedFileReader::new(file).unwrap();
931
932        // only leaves
933        let expected = vec![
934            // INT32
935            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
936            // INTERVAL
937            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED),
938            // Float16
939            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
940            // String
941            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED),
942        ];
943        let actual = reader.metadata().file_metadata().column_orders();
944
945        assert!(actual.is_some());
946        let actual = actual.unwrap();
947        assert_eq!(*actual, expected);
948    }
949
950    #[test]
951    fn test_file_writer_with_metadata() {
952        let file = tempfile::tempfile().unwrap();
953
954        let schema = Arc::new(
955            types::Type::group_type_builder("schema")
956                .with_fields(vec![Arc::new(
957                    types::Type::primitive_type_builder("col1", Type::INT32)
958                        .build()
959                        .unwrap(),
960                )])
961                .build()
962                .unwrap(),
963        );
964        let props = Arc::new(
965            WriterProperties::builder()
966                .set_key_value_metadata(Some(vec![KeyValue::new(
967                    "key".to_string(),
968                    "value".to_string(),
969                )]))
970                .build(),
971        );
972        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
973        writer.close().unwrap();
974
975        let reader = SerializedFileReader::new(file).unwrap();
976        assert_eq!(
977            reader
978                .metadata()
979                .file_metadata()
980                .key_value_metadata()
981                .to_owned()
982                .unwrap()
983                .len(),
984            1
985        );
986    }
987
988    #[test]
989    fn test_file_writer_v2_with_metadata() {
990        let file = tempfile::tempfile().unwrap();
991        let field_logical_type = Some(LogicalType::Integer {
992            bit_width: 8,
993            is_signed: false,
994        });
995        let field = Arc::new(
996            types::Type::primitive_type_builder("col1", Type::INT32)
997                .with_logical_type(field_logical_type.clone())
998                .with_converted_type(field_logical_type.into())
999                .build()
1000                .unwrap(),
1001        );
1002        let schema = Arc::new(
1003            types::Type::group_type_builder("schema")
1004                .with_fields(vec![field.clone()])
1005                .build()
1006                .unwrap(),
1007        );
1008        let props = Arc::new(
1009            WriterProperties::builder()
1010                .set_key_value_metadata(Some(vec![KeyValue::new(
1011                    "key".to_string(),
1012                    "value".to_string(),
1013                )]))
1014                .set_writer_version(WriterVersion::PARQUET_2_0)
1015                .build(),
1016        );
1017        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1018        writer.close().unwrap();
1019
1020        let reader = SerializedFileReader::new(file).unwrap();
1021
1022        assert_eq!(
1023            reader
1024                .metadata()
1025                .file_metadata()
1026                .key_value_metadata()
1027                .to_owned()
1028                .unwrap()
1029                .len(),
1030            1
1031        );
1032
1033        // ARROW-11803: Test that the converted and logical types have been populated
1034        let fields = reader.metadata().file_metadata().schema().get_fields();
1035        assert_eq!(fields.len(), 1);
1036        assert_eq!(fields[0], field);
1037    }
1038
1039    #[test]
1040    fn test_file_writer_with_sorting_columns_metadata() {
1041        let file = tempfile::tempfile().unwrap();
1042
1043        let schema = Arc::new(
1044            types::Type::group_type_builder("schema")
1045                .with_fields(vec![
1046                    Arc::new(
1047                        types::Type::primitive_type_builder("col1", Type::INT32)
1048                            .build()
1049                            .unwrap(),
1050                    ),
1051                    Arc::new(
1052                        types::Type::primitive_type_builder("col2", Type::INT32)
1053                            .build()
1054                            .unwrap(),
1055                    ),
1056                ])
1057                .build()
1058                .unwrap(),
1059        );
1060        let expected_result = Some(vec![SortingColumn {
1061            column_idx: 0,
1062            descending: false,
1063            nulls_first: true,
1064        }]);
1065        let props = Arc::new(
1066            WriterProperties::builder()
1067                .set_key_value_metadata(Some(vec![KeyValue::new(
1068                    "key".to_string(),
1069                    "value".to_string(),
1070                )]))
1071                .set_sorting_columns(expected_result.clone())
1072                .build(),
1073        );
1074        let mut writer =
1075            SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1076        let mut row_group_writer = writer.next_row_group().expect("get row group writer");
1077
1078        let col_writer = row_group_writer.next_column().unwrap().unwrap();
1079        col_writer.close().unwrap();
1080
1081        let col_writer = row_group_writer.next_column().unwrap().unwrap();
1082        col_writer.close().unwrap();
1083
1084        row_group_writer.close().unwrap();
1085        writer.close().unwrap();
1086
1087        let reader = SerializedFileReader::new(file).unwrap();
1088        let result: Vec<Option<&Vec<SortingColumn>>> = reader
1089            .metadata()
1090            .row_groups()
1091            .iter()
1092            .map(|f| f.sorting_columns())
1093            .collect();
1094        // validate the sorting column read match the one written above
1095        assert_eq!(expected_result.as_ref(), result[0]);
1096    }
1097
1098    #[test]
1099    fn test_file_writer_empty_row_groups() {
1100        let file = tempfile::tempfile().unwrap();
1101        test_file_roundtrip(file, vec![]);
1102    }
1103
1104    #[test]
1105    fn test_file_writer_single_row_group() {
1106        let file = tempfile::tempfile().unwrap();
1107        test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
1108    }
1109
1110    #[test]
1111    fn test_file_writer_multiple_row_groups() {
1112        let file = tempfile::tempfile().unwrap();
1113        test_file_roundtrip(
1114            file,
1115            vec![
1116                vec![1, 2, 3, 4, 5],
1117                vec![1, 2, 3],
1118                vec![1],
1119                vec![1, 2, 3, 4, 5, 6],
1120            ],
1121        );
1122    }
1123
1124    #[test]
1125    fn test_file_writer_multiple_large_row_groups() {
1126        let file = tempfile::tempfile().unwrap();
1127        test_file_roundtrip(
1128            file,
1129            vec![vec![123; 1024], vec![124; 1000], vec![125; 15], vec![]],
1130        );
1131    }
1132
1133    #[test]
1134    fn test_page_writer_data_pages() {
1135        let pages = vec![
1136            Page::DataPage {
1137                buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
1138                num_values: 10,
1139                encoding: Encoding::DELTA_BINARY_PACKED,
1140                def_level_encoding: Encoding::RLE,
1141                rep_level_encoding: Encoding::RLE,
1142                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1143            },
1144            Page::DataPageV2 {
1145                buf: Bytes::from(vec![4; 128]),
1146                num_values: 10,
1147                encoding: Encoding::DELTA_BINARY_PACKED,
1148                num_nulls: 2,
1149                num_rows: 12,
1150                def_levels_byte_len: 24,
1151                rep_levels_byte_len: 32,
1152                is_compressed: false,
1153                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1154            },
1155        ];
1156
1157        test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
1158        test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
1159    }
1160
1161    #[test]
1162    fn test_page_writer_dict_pages() {
1163        let pages = vec![
1164            Page::DictionaryPage {
1165                buf: Bytes::from(vec![1, 2, 3, 4, 5]),
1166                num_values: 5,
1167                encoding: Encoding::RLE_DICTIONARY,
1168                is_sorted: false,
1169            },
1170            Page::DataPage {
1171                buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
1172                num_values: 10,
1173                encoding: Encoding::DELTA_BINARY_PACKED,
1174                def_level_encoding: Encoding::RLE,
1175                rep_level_encoding: Encoding::RLE,
1176                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1177            },
1178            Page::DataPageV2 {
1179                buf: Bytes::from(vec![4; 128]),
1180                num_values: 10,
1181                encoding: Encoding::DELTA_BINARY_PACKED,
1182                num_nulls: 2,
1183                num_rows: 12,
1184                def_levels_byte_len: 24,
1185                rep_levels_byte_len: 32,
1186                is_compressed: false,
1187                statistics: None,
1188            },
1189        ];
1190
1191        test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
1192        test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
1193    }
1194
1195    /// Tests writing and reading pages.
    /// The physical type is used for statistics only; it should match the type of any
    /// statistics defined in the pages.
1198    fn test_page_roundtrip(pages: &[Page], codec: Compression, physical_type: Type) {
1199        let mut compressed_pages = vec![];
1200        let mut total_num_values = 0i64;
1201        let codec_options = CodecOptionsBuilder::default()
1202            .set_backward_compatible_lz4(false)
1203            .build();
1204        let mut compressor = create_codec(codec, &codec_options).unwrap();
1205
1206        for page in pages {
1207            let uncompressed_len = page.buffer().len();
1208
1209            let compressed_page = match *page {
1210                Page::DataPage {
1211                    ref buf,
1212                    num_values,
1213                    encoding,
1214                    def_level_encoding,
1215                    rep_level_encoding,
1216                    ref statistics,
1217                } => {
1218                    total_num_values += num_values as i64;
1219                    let output_buf = compress_helper(compressor.as_mut(), buf);
1220
1221                    Page::DataPage {
1222                        buf: Bytes::from(output_buf),
1223                        num_values,
1224                        encoding,
1225                        def_level_encoding,
1226                        rep_level_encoding,
1227                        statistics: from_thrift(physical_type, to_thrift(statistics.as_ref()))
1228                            .unwrap(),
1229                    }
1230                }
1231                Page::DataPageV2 {
1232                    ref buf,
1233                    num_values,
1234                    encoding,
1235                    num_nulls,
1236                    num_rows,
1237                    def_levels_byte_len,
1238                    rep_levels_byte_len,
1239                    ref statistics,
1240                    ..
1241                } => {
1242                    total_num_values += num_values as i64;
1243                    let offset = (def_levels_byte_len + rep_levels_byte_len) as usize;
1244                    let cmp_buf = compress_helper(compressor.as_mut(), &buf[offset..]);
1245                    let mut output_buf = Vec::from(&buf[..offset]);
1246                    output_buf.extend_from_slice(&cmp_buf[..]);
1247
1248                    Page::DataPageV2 {
1249                        buf: Bytes::from(output_buf),
1250                        num_values,
1251                        encoding,
1252                        num_nulls,
1253                        num_rows,
1254                        def_levels_byte_len,
1255                        rep_levels_byte_len,
1256                        is_compressed: compressor.is_some(),
1257                        statistics: from_thrift(physical_type, to_thrift(statistics.as_ref()))
1258                            .unwrap(),
1259                    }
1260                }
1261                Page::DictionaryPage {
1262                    ref buf,
1263                    num_values,
1264                    encoding,
1265                    is_sorted,
1266                } => {
1267                    let output_buf = compress_helper(compressor.as_mut(), buf);
1268
1269                    Page::DictionaryPage {
1270                        buf: Bytes::from(output_buf),
1271                        num_values,
1272                        encoding,
1273                        is_sorted,
1274                    }
1275                }
1276            };
1277
1278            let compressed_page = CompressedPage::new(compressed_page, uncompressed_len);
1279            compressed_pages.push(compressed_page);
1280        }
1281
1282        let mut buffer: Vec<u8> = vec![];
1283        let mut result_pages: Vec<Page> = vec![];
1284        {
1285            let mut writer = TrackedWrite::new(&mut buffer);
1286            let mut page_writer = SerializedPageWriter::new(&mut writer);
1287
1288            for page in compressed_pages {
1289                page_writer.write_page(page).unwrap();
1290            }
1291            page_writer.close().unwrap();
1292        }
1293        {
1294            let reader = bytes::Bytes::from(buffer);
1295
1296            let t = types::Type::primitive_type_builder("t", physical_type)
1297                .build()
1298                .unwrap();
1299
1300            let desc = ColumnDescriptor::new(Arc::new(t), 0, 0, ColumnPath::new(vec![]));
1301            let meta = ColumnChunkMetaData::builder(Arc::new(desc))
1302                .set_compression(codec)
1303                .set_total_compressed_size(reader.len() as i64)
1304                .set_num_values(total_num_values)
1305                .build()
1306                .unwrap();
1307
1308            let props = ReaderProperties::builder()
1309                .set_backward_compatible_lz4(false)
1310                .build();
1311            let mut page_reader = SerializedPageReader::new_with_properties(
1312                Arc::new(reader),
1313                &meta,
1314                total_num_values as usize,
1315                None,
1316                Arc::new(props),
1317            )
1318            .unwrap();
1319
1320            while let Some(page) = page_reader.get_next_page().unwrap() {
1321                result_pages.push(page);
1322            }
1323        }
1324
1325        assert_eq!(result_pages.len(), pages.len());
1326        for i in 0..result_pages.len() {
1327            assert_page(&result_pages[i], &pages[i]);
1328        }
1329    }
1330
1331    /// Helper function to compress a slice
1332    fn compress_helper(compressor: Option<&mut Box<dyn Codec>>, data: &[u8]) -> Vec<u8> {
1333        let mut output_buf = vec![];
1334        if let Some(cmpr) = compressor {
1335            cmpr.compress(data, &mut output_buf).unwrap();
1336        } else {
1337            output_buf.extend_from_slice(data);
1338        }
1339        output_buf
1340    }
1341
1342    /// Check if pages match.
1343    fn assert_page(left: &Page, right: &Page) {
1344        assert_eq!(left.page_type(), right.page_type());
1345        assert_eq!(&left.buffer(), &right.buffer());
1346        assert_eq!(left.num_values(), right.num_values());
1347        assert_eq!(left.encoding(), right.encoding());
1348        assert_eq!(to_thrift(left.statistics()), to_thrift(right.statistics()));
1349    }
1350
1351    /// Tests roundtrip of i32 data written using `W` and read using `R`
1352    fn test_roundtrip_i32<W, R>(
1353        file: W,
1354        data: Vec<Vec<i32>>,
1355        compression: Compression,
1356    ) -> crate::format::FileMetaData
1357    where
1358        W: Write + Send,
1359        R: ChunkReader + From<W> + 'static,
1360    {
1361        test_roundtrip::<W, R, Int32Type, _>(file, data, |r| r.get_int(0).unwrap(), compression)
1362    }
1363
1364    /// Tests roundtrip of data of type `D` written using `W` and read using `R`
    /// and the provided `value` function
1366    fn test_roundtrip<W, R, D, F>(
1367        mut file: W,
1368        data: Vec<Vec<D::T>>,
1369        value: F,
1370        compression: Compression,
1371    ) -> crate::format::FileMetaData
1372    where
1373        W: Write + Send,
1374        R: ChunkReader + From<W> + 'static,
1375        D: DataType,
1376        F: Fn(Row) -> D::T,
1377    {
1378        let schema = Arc::new(
1379            types::Type::group_type_builder("schema")
1380                .with_fields(vec![Arc::new(
1381                    types::Type::primitive_type_builder("col1", D::get_physical_type())
1382                        .with_repetition(Repetition::REQUIRED)
1383                        .build()
1384                        .unwrap(),
1385                )])
1386                .build()
1387                .unwrap(),
1388        );
1389        let props = Arc::new(
1390            WriterProperties::builder()
1391                .set_compression(compression)
1392                .build(),
1393        );
1394        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
1395        let mut rows: i64 = 0;
1396
1397        for (idx, subset) in data.iter().enumerate() {
1398            let row_group_file_offset = file_writer.buf.bytes_written();
1399            let mut row_group_writer = file_writer.next_row_group().unwrap();
1400            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
1401                rows += writer
1402                    .typed::<D>()
1403                    .write_batch(&subset[..], None, None)
1404                    .unwrap() as i64;
1405                writer.close().unwrap();
1406            }
1407            let last_group = row_group_writer.close().unwrap();
1408            let flushed = file_writer.flushed_row_groups();
1409            assert_eq!(flushed.len(), idx + 1);
1410            assert_eq!(Some(idx as i16), last_group.ordinal());
1411            assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset());
1412            assert_eq!(&flushed[idx], last_group.as_ref());
1413        }
1414        let file_metadata = file_writer.close().unwrap();
1415
1416        let reader = SerializedFileReader::new(R::from(file)).unwrap();
1417        assert_eq!(reader.num_row_groups(), data.len());
1418        assert_eq!(
1419            reader.metadata().file_metadata().num_rows(),
1420            rows,
1421            "row count in metadata not equal to number of rows written"
1422        );
1423        for (i, item) in data.iter().enumerate().take(reader.num_row_groups()) {
1424            let row_group_reader = reader.get_row_group(i).unwrap();
1425            let iter = row_group_reader.get_row_iter(None).unwrap();
1426            let res: Vec<_> = iter.map(|row| row.unwrap()).map(&value).collect();
1427            let row_group_size = row_group_reader.metadata().total_byte_size();
1428            let uncompressed_size: i64 = row_group_reader
1429                .metadata()
1430                .columns()
1431                .iter()
1432                .map(|v| v.uncompressed_size())
1433                .sum();
1434            assert_eq!(row_group_size, uncompressed_size);
1435            assert_eq!(res, *item);
1436        }
1437        file_metadata
1438    }
1439
1440    /// File write-read roundtrip.
1441    /// `data` consists of arrays of values for each row group.
1442    fn test_file_roundtrip(file: File, data: Vec<Vec<i32>>) -> crate::format::FileMetaData {
1443        test_roundtrip_i32::<File, File>(file, data, Compression::UNCOMPRESSED)
1444    }
1445
1446    #[test]
1447    fn test_bytes_writer_empty_row_groups() {
1448        test_bytes_roundtrip(vec![], Compression::UNCOMPRESSED);
1449    }
1450
1451    #[test]
1452    fn test_bytes_writer_single_row_group() {
1453        test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::UNCOMPRESSED);
1454    }
1455
1456    #[test]
1457    fn test_bytes_writer_multiple_row_groups() {
1458        test_bytes_roundtrip(
1459            vec![
1460                vec![1, 2, 3, 4, 5],
1461                vec![1, 2, 3],
1462                vec![1],
1463                vec![1, 2, 3, 4, 5, 6],
1464            ],
1465            Compression::UNCOMPRESSED,
1466        );
1467    }
1468
1469    #[test]
1470    fn test_bytes_writer_single_row_group_compressed() {
1471        test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::SNAPPY);
1472    }
1473
1474    #[test]
1475    fn test_bytes_writer_multiple_row_groups_compressed() {
1476        test_bytes_roundtrip(
1477            vec![
1478                vec![1, 2, 3, 4, 5],
1479                vec![1, 2, 3],
1480                vec![1],
1481                vec![1, 2, 3, 4, 5, 6],
1482            ],
1483            Compression::SNAPPY,
1484        );
1485    }
1486
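    /// Roundtrips `data` by writing to an in-memory `Vec<u8>` and reading it back via `Bytes`.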
1487    fn test_bytes_roundtrip(data: Vec<Vec<i32>>, compression: Compression) {
1488        test_roundtrip_i32::<Vec<u8>, Bytes>(Vec::with_capacity(1024), data, compression);
1489    }
1490
1491    #[test]
1492    fn test_boolean_roundtrip() {
1493        let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
1494        test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
1495            Vec::with_capacity(1024),
1496            vec![my_bool_values],
1497            |r| r.get_bool(0).unwrap(),
1498            Compression::UNCOMPRESSED,
1499        );
1500    }
1501
1502    #[test]
1503    fn test_boolean_compressed_roundtrip() {
1504        let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
1505        test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
1506            Vec::with_capacity(1024),
1507            vec![my_bool_values],
1508            |r| r.get_bool(0).unwrap(),
1509            Compression::SNAPPY,
1510        );
1511    }
1512
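    /// Verifies that a simple file roundtrip records column index and offset index
    /// offsets and lengths for every column chunk in the footer metadata.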
1513    #[test]
1514    fn test_column_offset_index_file() {
1515        let file = tempfile::tempfile().unwrap();
1516        let file_metadata = test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
1517        file_metadata.row_groups.iter().for_each(|row_group| {
1518            row_group.columns.iter().for_each(|column_chunk| {
1519                assert_ne!(None, column_chunk.column_index_offset);
1520                assert_ne!(None, column_chunk.column_index_length);
1521
1522                assert_ne!(None, column_chunk.offset_index_offset);
1523                assert_ne!(None, column_chunk.offset_index_length);
1524            })
1525        });
1526    }
1527
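    /// Writes a single-column file with `initial_kv` set via the writer properties and
    /// `final_kv` appended through `append_key_value_metadata` before closing, then
    /// asserts the reader sees the initial entries followed by the appended ones,
    /// or `None` when nothing was written.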
1528    fn test_kv_metadata(initial_kv: Option<Vec<KeyValue>>, final_kv: Option<Vec<KeyValue>>) {
1529        let schema = Arc::new(
1530            types::Type::group_type_builder("schema")
1531                .with_fields(vec![Arc::new(
1532                    types::Type::primitive_type_builder("col1", Type::INT32)
1533                        .with_repetition(Repetition::REQUIRED)
1534                        .build()
1535                        .unwrap(),
1536                )])
1537                .build()
1538                .unwrap(),
1539        );
1540        let mut out = Vec::with_capacity(1024);
1541        let props = Arc::new(
1542            WriterProperties::builder()
1543                .set_key_value_metadata(initial_kv.clone())
1544                .build(),
1545        );
1546        let mut writer = SerializedFileWriter::new(&mut out, schema, props).unwrap();
1547        let mut row_group_writer = writer.next_row_group().unwrap();
1548        let column = row_group_writer.next_column().unwrap().unwrap();
1549        column.close().unwrap();
1550        row_group_writer.close().unwrap();
1551        if let Some(kvs) = &final_kv {
1552            for kv in kvs {
1553                writer.append_key_value_metadata(kv.clone())
1554            }
1555        }
1556        writer.close().unwrap();
1557
1558        let reader = SerializedFileReader::new(Bytes::from(out)).unwrap();
1559        let metadata = reader.metadata().file_metadata();
1560        let keys = metadata.key_value_metadata();
1561
1562        match (initial_kv, final_kv) {
1563            (Some(a), Some(b)) => {
1564                let keys = keys.unwrap();
1565                assert_eq!(keys.len(), a.len() + b.len());
1566                assert_eq!(&keys[..a.len()], a.as_slice());
1567                assert_eq!(&keys[a.len()..], b.as_slice());
1568            }
1569            (Some(v), None) => assert_eq!(keys.unwrap(), &v),
1570            (None, Some(v)) if !v.is_empty() => assert_eq!(keys.unwrap(), &v),
1571            _ => assert!(keys.is_none()),
1572        }
1573    }
1574
1575    #[test]
1576    fn test_append_metadata() {
1577        let kv1 = KeyValue::new("cupcakes".to_string(), "awesome".to_string());
1578        let kv2 = KeyValue::new("bingo".to_string(), "bongo".to_string());
1579
1580        test_kv_metadata(None, None);
1581        test_kv_metadata(Some(vec![kv1.clone()]), None);
1582        test_kv_metadata(None, Some(vec![kv2.clone()]));
1583        test_kv_metadata(Some(vec![kv1.clone()]), Some(vec![kv2.clone()]));
1584        test_kv_metadata(Some(vec![]), Some(vec![kv2]));
1585        test_kv_metadata(Some(vec![]), Some(vec![]));
1586        test_kv_metadata(Some(vec![kv1]), Some(vec![]));
1587        test_kv_metadata(None, Some(vec![]));
1588    }
1589
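    /// Checks the deprecated `min`/`max` statistics fields: they are written for the
    /// signed-ordered columns (decimal and signed INT32) but omitted for the unsigned
    /// INT32 column, while `min_value`/`max_value` are populated for all three.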
1590    #[test]
1591    fn test_backwards_compatible_statistics() {
1592        let message_type = "
1593            message test_schema {
1594                REQUIRED INT32 decimal1 (DECIMAL(8,2));
1595                REQUIRED INT32 i32 (INTEGER(32,true));
1596                REQUIRED INT32 u32 (INTEGER(32,false));
1597            }
1598        ";
1599
1600        let schema = Arc::new(parse_message_type(message_type).unwrap());
1601        let props = Default::default();
1602        let mut writer = SerializedFileWriter::new(vec![], schema, props).unwrap();
1603        let mut row_group_writer = writer.next_row_group().unwrap();
1604
1605        for _ in 0..3 {
1606            let mut writer = row_group_writer.next_column().unwrap().unwrap();
1607            writer
1608                .typed::<Int32Type>()
1609                .write_batch(&[1, 2, 3], None, None)
1610                .unwrap();
1611            writer.close().unwrap();
1612        }
1613        let metadata = row_group_writer.close().unwrap();
1614        writer.close().unwrap();
1615
1616        let thrift = metadata.to_thrift();
1617        let encoded_stats: Vec<_> = thrift
1618            .columns
1619            .into_iter()
1620            .map(|x| x.meta_data.unwrap().statistics.unwrap())
1621            .collect();
1622
1623        // decimal
1624        let s = &encoded_stats[0];
1625        assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1626        assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1627        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1628        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1629
1630        // i32
1631        let s = &encoded_stats[1];
1632        assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1633        assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1634        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1635        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1636
1637        // u32
1638        let s = &encoded_stats[2];
1639        assert_eq!(s.min.as_deref(), None);
1640        assert_eq!(s.max.as_deref(), None);
1641        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1642        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1643    }
1644
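    /// Writes each column chunk to its own in-memory buffer using `SerializedPageWriter`,
    /// then splices the encoded chunks into a row group with `append_column` and verifies
    /// the resulting file reads back correctly, both with and without the page index.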
1645    #[test]
1646    fn test_spliced_write() {
1647        let message_type = "
1648            message test_schema {
1649                REQUIRED INT32 i32 (INTEGER(32,true));
1650                REQUIRED INT32 u32 (INTEGER(32,false));
1651            }
1652        ";
1653        let schema = Arc::new(parse_message_type(message_type).unwrap());
1654        let props = Arc::new(WriterProperties::builder().build());
1655
1656        let mut file = Vec::with_capacity(1024);
1657        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props.clone()).unwrap();
1658
1659        let columns = file_writer.descr.columns();
1660        let mut column_state: Vec<(_, Option<ColumnCloseResult>)> = columns
1661            .iter()
1662            .map(|_| (TrackedWrite::new(Vec::with_capacity(1024)), None))
1663            .collect();
1664
1665        let mut column_state_slice = column_state.as_mut_slice();
1666        let mut column_writers = Vec::with_capacity(columns.len());
1667        for c in columns {
1668            let ((buf, out), tail) = column_state_slice.split_first_mut().unwrap();
1669            column_state_slice = tail;
1670
1671            let page_writer = Box::new(SerializedPageWriter::new(buf));
1672            let col_writer = get_column_writer(c.clone(), props.clone(), page_writer);
1673            column_writers.push(SerializedColumnWriter::new(
1674                col_writer,
1675                Some(Box::new(|on_close| {
1676                    *out = Some(on_close);
1677                    Ok(())
1678                })),
1679            ));
1680        }
1681
1682        let column_data = [[1, 2, 3, 4], [7, 3, 7, 3]];
1683
1684        // Interleaved writing to the column writers
1685        for (writer, batch) in column_writers.iter_mut().zip(column_data) {
1686            let writer = writer.typed::<Int32Type>();
1687            writer.write_batch(&batch, None, None).unwrap();
1688        }
1689
1690        // Close the column writers
1691        for writer in column_writers {
1692            writer.close().unwrap()
1693        }
1694
1695        // Splice column data into a row group
1696        let mut row_group_writer = file_writer.next_row_group().unwrap();
1697        for (write, close) in column_state {
1698            let buf = Bytes::from(write.into_inner().unwrap());
1699            row_group_writer
1700                .append_column(&buf, close.unwrap())
1701                .unwrap();
1702        }
1703        row_group_writer.close().unwrap();
1704        file_writer.close().unwrap();
1705
1706        // Check data was written correctly
1707        let file = Bytes::from(file);
1708        let test_read = |reader: SerializedFileReader<Bytes>| {
1709            let row_group = reader.get_row_group(0).unwrap();
1710
1711            let mut out = Vec::with_capacity(4);
1712            let c1 = row_group.get_column_reader(0).unwrap();
1713            let mut c1 = get_typed_column_reader::<Int32Type>(c1);
1714            c1.read_records(4, None, None, &mut out).unwrap();
1715            assert_eq!(out, column_data[0]);
1716
1717            out.clear();
1718
1719            let c2 = row_group.get_column_reader(1).unwrap();
1720            let mut c2 = get_typed_column_reader::<Int32Type>(c2);
1721            c2.read_records(4, None, None, &mut out).unwrap();
1722            assert_eq!(out, column_data[1]);
1723        };
1724
1725        let reader = SerializedFileReader::new(file.clone()).unwrap();
1726        test_read(reader);
1727
1728        let options = ReadOptionsBuilder::new().with_page_index().build();
1729        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
1730        test_read(reader);
1731    }
1732
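    /// With statistics disabled globally but enabled at `Page` level for column "a",
    /// verifies that both columns still get an offset index (the explicit
    /// `set_offset_index_disabled(true)` is overridden), that only "a" gets a column
    /// index, and that calling `next_row_group` after `finish` returns an error.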
1733    #[test]
1734    fn test_disabled_statistics() {
1735        let message_type = "
1736            message test_schema {
1737                REQUIRED INT32 a;
1738                REQUIRED INT32 b;
1739            }
1740        ";
1741        let schema = Arc::new(parse_message_type(message_type).unwrap());
1742        let props = WriterProperties::builder()
1743            .set_statistics_enabled(EnabledStatistics::None)
1744            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
1745            .set_offset_index_disabled(true) // this should be ignored because of the line above
1746            .build();
1747        let mut file = Vec::with_capacity(1024);
1748        let mut file_writer =
1749            SerializedFileWriter::new(&mut file, schema, Arc::new(props)).unwrap();
1750
1751        let mut row_group_writer = file_writer.next_row_group().unwrap();
1752        let mut a_writer = row_group_writer.next_column().unwrap().unwrap();
1753        let col_writer = a_writer.typed::<Int32Type>();
1754        col_writer.write_batch(&[1, 2, 3], None, None).unwrap();
1755        a_writer.close().unwrap();
1756
1757        let mut b_writer = row_group_writer.next_column().unwrap().unwrap();
1758        let col_writer = b_writer.typed::<Int32Type>();
1759        col_writer.write_batch(&[4, 5, 6], None, None).unwrap();
1760        b_writer.close().unwrap();
1761        row_group_writer.close().unwrap();
1762
1763        let metadata = file_writer.finish().unwrap();
1764        assert_eq!(metadata.row_groups.len(), 1);
1765        let row_group = &metadata.row_groups[0];
1766        assert_eq!(row_group.columns.len(), 2);
1767        // Column "a" has both offset and column index, as requested
1768        assert!(row_group.columns[0].offset_index_offset.is_some());
1769        assert!(row_group.columns[0].column_index_offset.is_some());
1770        // Column "b" should only have offset index
1771        assert!(row_group.columns[1].offset_index_offset.is_some());
1772        assert!(row_group.columns[1].column_index_offset.is_none());
1773
1774        let err = file_writer.next_row_group().err().unwrap().to_string();
1775        assert_eq!(err, "Parquet error: SerializedFileWriter already finished");
1776
1777        drop(file_writer);
1778
1779        let options = ReadOptionsBuilder::new().with_page_index().build();
1780        let reader = SerializedFileReader::new_with_options(Bytes::from(file), options).unwrap();
1781
1782        let offset_index = reader.metadata().offset_index().unwrap();
1783        assert_eq!(offset_index.len(), 1); // 1 row group
1784        assert_eq!(offset_index[0].len(), 2); // 2 columns
1785
1786        let column_index = reader.metadata().column_index().unwrap();
1787        assert_eq!(column_index.len(), 1); // 1 row group
1788        assert_eq!(column_index[0].len(), 2); // 2 columns
1789
1790        let a_idx = &column_index[0][0];
1791        assert!(matches!(a_idx, Index::INT32(_)), "{a_idx:?}");
1792        let b_idx = &column_index[0][1];
1793        assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
1794    }
1795
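    /// Verifies `SizeStatistics` for an optional BYTE_ARRAY column: the definition level
    /// histogram and `unencoded_byte_array_data_bytes` are written (no repetition level
    /// histogram for a flat column), and the same values are exposed through the reader's
    /// column metadata, column index, and offset index.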
1796    #[test]
1797    fn test_byte_array_size_statistics() {
1798        let message_type = "
1799            message test_schema {
1800                OPTIONAL BYTE_ARRAY a (UTF8);
1801            }
1802        ";
1803        let schema = Arc::new(parse_message_type(message_type).unwrap());
1804        let data = ByteArrayType::gen_vec(32, 7);
1805        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
1806        let unenc_size: i64 = data.iter().map(|x| x.len() as i64).sum();
1807        let file: File = tempfile::tempfile().unwrap();
1808        let props = Arc::new(
1809            WriterProperties::builder()
1810                .set_statistics_enabled(EnabledStatistics::Page)
1811                .build(),
1812        );
1813
1814        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
1815        let mut row_group_writer = writer.next_row_group().unwrap();
1816
1817        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
1818        col_writer
1819            .typed::<ByteArrayType>()
1820            .write_batch(&data, Some(&def_levels), None)
1821            .unwrap();
1822        col_writer.close().unwrap();
1823        row_group_writer.close().unwrap();
1824        let file_metadata = writer.close().unwrap();
1825
1826        assert_eq!(file_metadata.row_groups.len(), 1);
1827        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
1828        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
1829
1830        let check_def_hist = |def_hist: &[i64]| {
1831            assert_eq!(def_hist.len(), 2);
1832            assert_eq!(def_hist[0], 3);
1833            assert_eq!(def_hist[1], 7);
1834        };
1835
1836        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
1837        let meta_data = file_metadata.row_groups[0].columns[0]
1838            .meta_data
1839            .as_ref()
1840            .unwrap();
1841        assert!(meta_data.size_statistics.is_some());
1842        let size_stats = meta_data.size_statistics.as_ref().unwrap();
1843
1844        assert!(size_stats.repetition_level_histogram.is_none());
1845        assert!(size_stats.definition_level_histogram.is_some());
1846        assert!(size_stats.unencoded_byte_array_data_bytes.is_some());
1847        assert_eq!(
1848            unenc_size,
1849            size_stats.unencoded_byte_array_data_bytes.unwrap()
1850        );
1851        check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());
1852
1853        // check that the read metadata is also correct
1854        let options = ReadOptionsBuilder::new().with_page_index().build();
1855        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
1856
1857        let rfile_metadata = reader.metadata().file_metadata();
1858        assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
1859        assert_eq!(reader.num_row_groups(), 1);
1860        let rowgroup = reader.get_row_group(0).unwrap();
1861        assert_eq!(rowgroup.num_columns(), 1);
1862        let column = rowgroup.metadata().column(0);
1863        assert!(column.definition_level_histogram().is_some());
1864        assert!(column.repetition_level_histogram().is_none());
1865        assert!(column.unencoded_byte_array_data_bytes().is_some());
1866        check_def_hist(column.definition_level_histogram().unwrap().values());
1867        assert_eq!(
1868            unenc_size,
1869            column.unencoded_byte_array_data_bytes().unwrap()
1870        );
1871
1872        // check histogram in column index as well
1873        assert!(reader.metadata().column_index().is_some());
1874        let column_index = reader.metadata().column_index().unwrap();
1875        assert_eq!(column_index.len(), 1);
1876        assert_eq!(column_index[0].len(), 1);
1877        let col_idx = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
1878            assert_eq!(index.indexes.len(), 1);
1879            &index.indexes[0]
1880        } else {
1881            unreachable!()
1882        };
1883
1884        assert!(col_idx.repetition_level_histogram().is_none());
1885        assert!(col_idx.definition_level_histogram().is_some());
1886        check_def_hist(col_idx.definition_level_histogram().unwrap().values());
1887
1888        assert!(reader.metadata().offset_index().is_some());
1889        let offset_index = reader.metadata().offset_index().unwrap();
1890        assert_eq!(offset_index.len(), 1);
1891        assert_eq!(offset_index[0].len(), 1);
1892        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_some());
1893        let page_sizes = offset_index[0][0]
1894            .unencoded_byte_array_data_bytes
1895            .as_ref()
1896            .unwrap();
1897        assert_eq!(page_sizes.len(), 1);
1898        assert_eq!(page_sizes[0], unenc_size);
1899    }
1900
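    /// Attempts to create more than 32767 row groups (the maximum row group ordinal)
    /// and asserts that `next_row_group` fails with the documented error once the
    /// limit is exceeded.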
1901    #[test]
1902    fn test_too_many_rowgroups() {
1903        let message_type = "
1904            message test_schema {
1905                REQUIRED BYTE_ARRAY a (UTF8);
1906            }
1907        ";
1908        let schema = Arc::new(parse_message_type(message_type).unwrap());
1909        let file: File = tempfile::tempfile().unwrap();
1910        let props = Arc::new(
1911            WriterProperties::builder()
1912                .set_statistics_enabled(EnabledStatistics::None)
1913                .set_max_row_group_size(1)
1914                .build(),
1915        );
1916        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
1917
1918        // Create 32k empty row groups. The writer should error when i == 32768.
1919        for i in 0..0x8001 {
1920            match writer.next_row_group() {
1921                Ok(mut row_group_writer) => {
1922                    assert_ne!(i, 0x8000);
1923                    let col_writer = row_group_writer.next_column().unwrap().unwrap();
1924                    col_writer.close().unwrap();
1925                    row_group_writer.close().unwrap();
1926                }
1927                Err(e) => {
1928                    assert_eq!(i, 0x8000);
1929                    assert_eq!(
1930                        e.to_string(),
1931                        "Parquet error: Parquet does not support more than 32767 row groups per file (currently: 32768)"
1932                    );
1933                }
1934            }
1935        }
1936        writer.close().unwrap();
1937    }
1938
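    /// Verifies `SizeStatistics` for a nested, nullable LIST column: both the definition
    /// and repetition level histograms are written, `unencoded_byte_array_data_bytes` is
    /// absent for a non-BYTE_ARRAY column, and the same histograms round-trip through the
    /// reader's column metadata and column index.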
1939    #[test]
1940    fn test_size_statistics_with_repetition_and_nulls() {
1941        let message_type = "
1942            message test_schema {
1943                OPTIONAL group i32_list (LIST) {
1944                    REPEATED group list {
1945                        OPTIONAL INT32 element;
1946                    }
1947                }
1948            }
1949        ";
1950        // column is:
1951        // row 0: [1, 2]
1952        // row 1: NULL
1953        // row 2: [4, NULL]
1954        // row 3: []
1955        // row 4: [7, 8, 9, 10]
1956        let schema = Arc::new(parse_message_type(message_type).unwrap());
1957        let data = [1, 2, 4, 7, 8, 9, 10];
1958        let def_levels = [3, 3, 0, 3, 2, 1, 3, 3, 3, 3];
1959        let rep_levels = [0, 1, 0, 0, 1, 0, 0, 1, 1, 1];
1960        let file = tempfile::tempfile().unwrap();
1961        let props = Arc::new(
1962            WriterProperties::builder()
1963                .set_statistics_enabled(EnabledStatistics::Page)
1964                .build(),
1965        );
1966        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
1967        let mut row_group_writer = writer.next_row_group().unwrap();
1968
1969        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
1970        col_writer
1971            .typed::<Int32Type>()
1972            .write_batch(&data, Some(&def_levels), Some(&rep_levels))
1973            .unwrap();
1974        col_writer.close().unwrap();
1975        row_group_writer.close().unwrap();
1976        let file_metadata = writer.close().unwrap();
1977
1978        assert_eq!(file_metadata.row_groups.len(), 1);
1979        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
1980        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
1981
1982        let check_def_hist = |def_hist: &[i64]| {
1983            assert_eq!(def_hist.len(), 4);
1984            assert_eq!(def_hist[0], 1);
1985            assert_eq!(def_hist[1], 1);
1986            assert_eq!(def_hist[2], 1);
1987            assert_eq!(def_hist[3], 7);
1988        };
1989
1990        let check_rep_hist = |rep_hist: &[i64]| {
1991            assert_eq!(rep_hist.len(), 2);
1992            assert_eq!(rep_hist[0], 5);
1993            assert_eq!(rep_hist[1], 5);
1994        };
1995
1996        // check that histograms are set properly in the write and read metadata
1997        // also check that unencoded_byte_array_data_bytes is not set
1998        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
1999        let meta_data = file_metadata.row_groups[0].columns[0]
2000            .meta_data
2001            .as_ref()
2002            .unwrap();
2003        assert!(meta_data.size_statistics.is_some());
2004        let size_stats = meta_data.size_statistics.as_ref().unwrap();
2005        assert!(size_stats.repetition_level_histogram.is_some());
2006        assert!(size_stats.definition_level_histogram.is_some());
2007        assert!(size_stats.unencoded_byte_array_data_bytes.is_none());
2008        check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());
2009        check_rep_hist(size_stats.repetition_level_histogram.as_ref().unwrap());
2010
2011        // check that the read metadata is also correct
2012        let options = ReadOptionsBuilder::new().with_page_index().build();
2013        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
2014
2015        let rfile_metadata = reader.metadata().file_metadata();
2016        assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
2017        assert_eq!(reader.num_row_groups(), 1);
2018        let rowgroup = reader.get_row_group(0).unwrap();
2019        assert_eq!(rowgroup.num_columns(), 1);
2020        let column = rowgroup.metadata().column(0);
2021        assert!(column.definition_level_histogram().is_some());
2022        assert!(column.repetition_level_histogram().is_some());
2023        assert!(column.unencoded_byte_array_data_bytes().is_none());
2024        check_def_hist(column.definition_level_histogram().unwrap().values());
2025        check_rep_hist(column.repetition_level_histogram().unwrap().values());
2026
2027        // check histogram in column index as well
2028        assert!(reader.metadata().column_index().is_some());
2029        let column_index = reader.metadata().column_index().unwrap();
2030        assert_eq!(column_index.len(), 1);
2031        assert_eq!(column_index[0].len(), 1);
2032        let col_idx = if let Index::INT32(index) = &column_index[0][0] {
2033            assert_eq!(index.indexes.len(), 1);
2034            &index.indexes[0]
2035        } else {
2036            unreachable!()
2037        };
2038
2039        check_def_hist(col_idx.definition_level_histogram().unwrap().values());
2040        check_rep_hist(col_idx.repetition_level_histogram().unwrap().values());
2041
2042        assert!(reader.metadata().offset_index().is_some());
2043        let offset_index = reader.metadata().offset_index().unwrap();
2044        assert_eq!(offset_index.len(), 1);
2045        assert_eq!(offset_index[0].len(), 1);
2046        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_none());
2047    }
2048
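    /// Reads `byte_stream_split_extended.gzip.parquet` from the test data, rewrites it
    /// with BYTE_STREAM_SPLIT forced on the `*_byte_stream_split` columns, then checks
    /// that the encoding was recorded in the column chunk metadata and that every
    /// split-encoded column matches its companion (non-split) column row by row.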
2049    #[test]
2050    #[cfg(feature = "arrow")]
2051    fn test_byte_stream_split_extended_roundtrip() {
2052        let path = format!(
2053            "{}/byte_stream_split_extended.gzip.parquet",
2054            arrow::util::test_util::parquet_test_data(),
2055        );
2056        let file = File::open(path).unwrap();
2057
2058        // Read in test file and rewrite to tmp
2059        let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)
2060            .expect("parquet open")
2061            .build()
2062            .expect("parquet open");
2063
2064        let file = tempfile::tempfile().unwrap();
2065        let props = WriterProperties::builder()
2066            .set_dictionary_enabled(false)
2067            .set_column_encoding(
2068                ColumnPath::from("float16_byte_stream_split"),
2069                Encoding::BYTE_STREAM_SPLIT,
2070            )
2071            .set_column_encoding(
2072                ColumnPath::from("float_byte_stream_split"),
2073                Encoding::BYTE_STREAM_SPLIT,
2074            )
2075            .set_column_encoding(
2076                ColumnPath::from("double_byte_stream_split"),
2077                Encoding::BYTE_STREAM_SPLIT,
2078            )
2079            .set_column_encoding(
2080                ColumnPath::from("int32_byte_stream_split"),
2081                Encoding::BYTE_STREAM_SPLIT,
2082            )
2083            .set_column_encoding(
2084                ColumnPath::from("int64_byte_stream_split"),
2085                Encoding::BYTE_STREAM_SPLIT,
2086            )
2087            .set_column_encoding(
2088                ColumnPath::from("flba5_byte_stream_split"),
2089                Encoding::BYTE_STREAM_SPLIT,
2090            )
2091            .set_column_encoding(
2092                ColumnPath::from("decimal_byte_stream_split"),
2093                Encoding::BYTE_STREAM_SPLIT,
2094            )
2095            .build();
2096
2097        let mut parquet_writer = ArrowWriter::try_new(
2098            file.try_clone().expect("cannot open file"),
2099            parquet_reader.schema(),
2100            Some(props),
2101        )
2102        .expect("create arrow writer");
2103
2104        for maybe_batch in parquet_reader {
2105            let batch = maybe_batch.expect("reading batch");
2106            parquet_writer.write(&batch).expect("writing data");
2107        }
2108
2109        parquet_writer.close().expect("finalizing file");
2110
2111        let reader = SerializedFileReader::new(file).expect("Failed to create reader");
2112        let filemeta = reader.metadata();
2113
2114        // Make sure byte_stream_split encoding was used
2115        let check_encoding = |x: usize, filemeta: &ParquetMetaData| {
2116            assert!(filemeta
2117                .row_group(0)
2118                .column(x)
2119                .encodings()
2120                .contains(&Encoding::BYTE_STREAM_SPLIT));
2121        };
2122
2123        check_encoding(1, filemeta);
2124        check_encoding(3, filemeta);
2125        check_encoding(5, filemeta);
2126        check_encoding(7, filemeta);
2127        check_encoding(9, filemeta);
2128        check_encoding(11, filemeta);
2129        check_encoding(13, filemeta);
2130
2131        // Read back tmpfile and make sure all values are correct
2132        let mut iter = reader
2133            .get_row_iter(None)
2134            .expect("Failed to create row iterator");
2135
2136        let mut start = 0;
2137        let end = reader.metadata().file_metadata().num_rows();
2138
2139        let check_row = |row: Result<Row, ParquetError>| {
2140            assert!(row.is_ok());
2141            let r = row.unwrap();
2142            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2143            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2144            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2145            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2146            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2147            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2148            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2149        };
2150
2151        while start < end {
2152            match iter.next() {
2153                Some(row) => check_row(row),
2154                None => break,
2155            };
2156            start += 1;
2157        }
2158    }
2159}