Skip to main content

parquet/file/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`SerializedFileWriter`]: Low level Parquet writer API
19
20use crate::bloom_filter::Sbbf;
21use crate::file::metadata::thrift::PageHeader;
22use crate::file::page_index::column_index::ColumnIndexMetaData;
23use crate::file::page_index::offset_index::OffsetIndexMetaData;
24use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
25use std::fmt::Debug;
26use std::io::{BufWriter, IoSlice, Read};
27use std::{io::Write, sync::Arc};
28
29use crate::column::page_encryption::PageEncryptor;
30use crate::column::writer::{ColumnCloseResult, ColumnWriterImpl, get_typed_column_writer_mut};
31use crate::column::{
32    page::{CompressedPage, PageWriteSpec, PageWriter},
33    writer::{ColumnWriter, get_column_writer},
34};
35use crate::data_type::DataType;
36#[cfg(feature = "encryption")]
37use crate::encryption::encrypt::{
38    FileEncryptionProperties, FileEncryptor, get_column_crypto_metadata,
39};
40use crate::errors::{ParquetError, Result};
41#[cfg(feature = "encryption")]
42use crate::file::PARQUET_MAGIC_ENCR_FOOTER;
43use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
44use crate::file::reader::ChunkReader;
45use crate::file::{PARQUET_MAGIC, metadata::*};
46use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr};
47
/// Byte-counting adapter over a [`Write`].
///
/// All output is routed through an internal [`BufWriter`] to batch small
/// writes, and the number of bytes accepted so far is accumulated so callers
/// can tell how far into the output stream they currently are.
pub struct TrackedWrite<W: Write> {
    /// Buffered wrapper around the caller-supplied writer
    inner: BufWriter<W>,
    /// Running total of bytes accepted by `inner`
    bytes_written: usize,
}
55
56impl<W: Write> TrackedWrite<W> {
57    /// Create a new [`TrackedWrite`] from a [`Write`]
58    pub fn new(inner: W) -> Self {
59        let buf_write = BufWriter::new(inner);
60        Self {
61            inner: buf_write,
62            bytes_written: 0,
63        }
64    }
65
66    /// Returns the number of bytes written to this instance
67    pub fn bytes_written(&self) -> usize {
68        self.bytes_written
69    }
70
71    /// Returns a reference to the underlying writer.
72    pub fn inner(&self) -> &W {
73        self.inner.get_ref()
74    }
75
76    /// Returns a mutable reference to the underlying writer.
77    ///
78    /// It is inadvisable to directly write to the underlying writer, doing so
79    /// will likely result in data corruption
80    pub fn inner_mut(&mut self) -> &mut W {
81        self.inner.get_mut()
82    }
83
84    /// Returns the underlying writer.
85    pub fn into_inner(self) -> Result<W> {
86        self.inner.into_inner().map_err(|err| {
87            ParquetError::General(format!("fail to get inner writer: {:?}", err.to_string()))
88        })
89    }
90}
91
92impl<W: Write> Write for TrackedWrite<W> {
93    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
94        let bytes = self.inner.write(buf)?;
95        self.bytes_written += bytes;
96        Ok(bytes)
97    }
98
99    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> std::io::Result<usize> {
100        let bytes = self.inner.write_vectored(bufs)?;
101        self.bytes_written += bytes;
102        Ok(bytes)
103    }
104
105    fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
106        self.inner.write_all(buf)?;
107        self.bytes_written += buf.len();
108
109        Ok(())
110    }
111
112    fn flush(&mut self) -> std::io::Result<()> {
113        self.inner.flush()
114    }
115}
116
117/// Callback invoked on closing a column chunk
118pub type OnCloseColumnChunk<'a> = Box<dyn FnOnce(ColumnCloseResult) -> Result<()> + 'a>;
119
120/// Callback invoked on closing a row group, arguments are:
121///
122/// - the row group metadata
123/// - the column index for each column chunk
124/// - the offset index for each column chunk
125pub type OnCloseRowGroup<'a, W> = Box<
126    dyn FnOnce(
127            &'a mut TrackedWrite<W>,
128            RowGroupMetaData,
129            Vec<Option<Sbbf>>,
130            Vec<Option<ColumnIndexMetaData>>,
131            Vec<Option<OffsetIndexMetaData>>,
132        ) -> Result<()>
133        + 'a
134        + Send,
135>;
136
137// ----------------------------------------------------------------------
138// Serialized impl for file & row group writers
139
140/// Parquet file writer API.
141///
142/// This is a low level API for writing Parquet files directly, and handles
143/// tracking the location of file structures such as row groups and column
144/// chunks, and writing the metadata and file footer.
145///
146/// Data is written to row groups using  [`SerializedRowGroupWriter`] and
147/// columns using [`SerializedColumnWriter`]. The `SerializedFileWriter` tracks
148/// where all the data is written, and assembles the final file metadata.
149///
150/// The main workflow should be as following:
151/// - Create file writer, this will open a new file and potentially write some metadata.
152/// - Request a new row group writer by calling `next_row_group`.
153/// - Once finished writing row group, close row group writer by calling `close`
154/// - Write subsequent row groups, if necessary.
155/// - After all row groups have been written, close the file writer using `close` method.
156pub struct SerializedFileWriter<W: Write> {
157    buf: TrackedWrite<W>,
158    descr: SchemaDescPtr,
159    props: WriterPropertiesPtr,
160    row_groups: Vec<RowGroupMetaData>,
161    bloom_filters: Vec<Vec<Option<Sbbf>>>,
162    column_indexes: Vec<Vec<Option<ColumnIndexMetaData>>>,
163    offset_indexes: Vec<Vec<Option<OffsetIndexMetaData>>>,
164    row_group_index: usize,
165    // kv_metadatas will be appended to `props` when `write_metadata`
166    kv_metadatas: Vec<KeyValue>,
167    finished: bool,
168    #[cfg(feature = "encryption")]
169    file_encryptor: Option<Arc<FileEncryptor>>,
170}
171
172impl<W: Write> Debug for SerializedFileWriter<W> {
173    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174        // implement Debug so this can be used with #[derive(Debug)]
175        // in client code rather than actually listing all the fields
176        f.debug_struct("SerializedFileWriter")
177            .field("descr", &self.descr)
178            .field("row_group_index", &self.row_group_index)
179            .field("kv_metadatas", &self.kv_metadatas)
180            .finish_non_exhaustive()
181    }
182}
183
184impl<W: Write + Send> SerializedFileWriter<W> {
185    /// Creates new file writer.
186    pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
187        let mut buf = TrackedWrite::new(buf);
188
189        let schema_descriptor = SchemaDescriptor::new(schema.clone());
190
191        #[cfg(feature = "encryption")]
192        let file_encryptor = Self::get_file_encryptor(&properties, &schema_descriptor)?;
193
194        Self::start_file(&properties, &mut buf)?;
195        Ok(Self {
196            buf,
197            descr: Arc::new(schema_descriptor),
198            props: properties,
199            row_groups: vec![],
200            bloom_filters: vec![],
201            column_indexes: Vec::new(),
202            offset_indexes: Vec::new(),
203            row_group_index: 0,
204            kv_metadatas: Vec::new(),
205            finished: false,
206            #[cfg(feature = "encryption")]
207            file_encryptor,
208        })
209    }
210
211    #[cfg(feature = "encryption")]
212    fn get_file_encryptor(
213        properties: &WriterPropertiesPtr,
214        schema_descriptor: &SchemaDescriptor,
215    ) -> Result<Option<Arc<FileEncryptor>>> {
216        if let Some(file_encryption_properties) = properties.file_encryption_properties() {
217            file_encryption_properties.validate_encrypted_column_names(schema_descriptor)?;
218
219            Ok(Some(Arc::new(FileEncryptor::new(Arc::clone(
220                file_encryption_properties,
221            ))?)))
222        } else {
223            Ok(None)
224        }
225    }
226
227    /// Creates new row group from this file writer.
228    ///
229    /// Note: Parquet files are limited to at most 2^15 row groups in a file; and row groups must
230    /// be written sequentially.
231    ///
232    /// Every time the next row group is requested, the previous row group must
233    /// be finalised and closed using the [`SerializedRowGroupWriter::close`]
234    /// method or an error will be returned.
235    pub fn next_row_group(&mut self) -> Result<SerializedRowGroupWriter<'_, W>> {
236        self.assert_previous_writer_closed()?;
237        let ordinal = self.row_group_index;
238
239        let ordinal: i16 = ordinal.try_into().map_err(|_| {
240            ParquetError::General(format!(
241                "Parquet does not support more than {} row groups per file (currently: {})",
242                i16::MAX,
243                ordinal
244            ))
245        })?;
246
247        self.row_group_index = self
248            .row_group_index
249            .checked_add(1)
250            .expect("SerializedFileWriter::row_group_index overflowed");
251
252        let bloom_filter_position = self.properties().bloom_filter_position();
253        let row_groups = &mut self.row_groups;
254        let row_bloom_filters = &mut self.bloom_filters;
255        let row_column_indexes = &mut self.column_indexes;
256        let row_offset_indexes = &mut self.offset_indexes;
257        let on_close = move |buf,
258                             mut metadata,
259                             row_group_bloom_filter,
260                             row_group_column_index,
261                             row_group_offset_index| {
262            row_bloom_filters.push(row_group_bloom_filter);
263            row_column_indexes.push(row_group_column_index);
264            row_offset_indexes.push(row_group_offset_index);
265            // write bloom filters out immediately after the row group if requested
266            match bloom_filter_position {
267                BloomFilterPosition::AfterRowGroup => {
268                    write_bloom_filters(buf, row_bloom_filters, &mut metadata)?
269                }
270                BloomFilterPosition::End => (),
271            };
272            row_groups.push(metadata);
273            Ok(())
274        };
275
276        let row_group_writer = SerializedRowGroupWriter::new(
277            self.descr.clone(),
278            self.props.clone(),
279            &mut self.buf,
280            ordinal,
281            Some(Box::new(on_close)),
282        );
283        #[cfg(feature = "encryption")]
284        let row_group_writer = row_group_writer.with_file_encryptor(self.file_encryptor.clone());
285
286        Ok(row_group_writer)
287    }
288
289    /// Returns metadata for any flushed row groups
290    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
291        &self.row_groups
292    }
293
294    /// Close and finalize the underlying Parquet writer
295    ///
296    /// Unlike [`Self::close`] this does not consume self
297    ///
298    /// Attempting to write after calling finish will result in an error
299    pub fn finish(&mut self) -> Result<ParquetMetaData> {
300        self.assert_previous_writer_closed()?;
301        let metadata = self.write_metadata()?;
302        self.buf.flush()?;
303        Ok(metadata)
304    }
305
306    /// Closes and finalises file writer, returning the file metadata.
307    pub fn close(mut self) -> Result<ParquetMetaData> {
308        self.finish()
309    }
310
311    /// Writes magic bytes at the beginning of the file.
312    #[cfg(not(feature = "encryption"))]
313    fn start_file(_properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
314        buf.write_all(get_file_magic())?;
315        Ok(())
316    }
317
318    /// Writes magic bytes at the beginning of the file.
319    #[cfg(feature = "encryption")]
320    fn start_file(properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
321        let magic = get_file_magic(properties.file_encryption_properties.as_ref());
322
323        buf.write_all(magic)?;
324        Ok(())
325    }
326
327    /// Assembles and writes metadata at the end of the file. This will take ownership
328    /// of `row_groups` and the page index structures.
329    fn write_metadata(&mut self) -> Result<ParquetMetaData> {
330        self.finished = true;
331
332        // write out any remaining bloom filters after all row groups
333        for row_group in &mut self.row_groups {
334            write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?;
335        }
336
337        let key_value_metadata = match self.props.key_value_metadata() {
338            Some(kv) => Some(kv.iter().chain(&self.kv_metadatas).cloned().collect()),
339            None if self.kv_metadatas.is_empty() => None,
340            None => Some(self.kv_metadatas.clone()),
341        };
342
343        // take ownership of metadata
344        let row_groups = std::mem::take(&mut self.row_groups);
345        let column_indexes = std::mem::take(&mut self.column_indexes);
346        let offset_indexes = std::mem::take(&mut self.offset_indexes);
347
348        let write_path_in_schema = self.props.write_path_in_schema();
349        let mut encoder = ThriftMetadataWriter::new(
350            &mut self.buf,
351            &self.descr,
352            row_groups,
353            Some(self.props.created_by().to_string()),
354            self.props.writer_version().as_num(),
355            write_path_in_schema,
356        );
357
358        #[cfg(feature = "encryption")]
359        {
360            encoder = encoder.with_file_encryptor(self.file_encryptor.clone());
361        }
362
363        if let Some(key_value_metadata) = key_value_metadata {
364            encoder = encoder.with_key_value_metadata(key_value_metadata)
365        }
366
367        encoder = encoder.with_column_indexes(column_indexes);
368        if !self.props.offset_index_disabled() {
369            encoder = encoder.with_offset_indexes(offset_indexes);
370        }
371        encoder.finish()
372    }
373
374    #[inline]
375    fn assert_previous_writer_closed(&self) -> Result<()> {
376        if self.finished {
377            return Err(general_err!("SerializedFileWriter already finished"));
378        }
379
380        if self.row_group_index != self.row_groups.len() {
381            Err(general_err!("Previous row group writer was not closed"))
382        } else {
383            Ok(())
384        }
385    }
386
387    /// Add a [`KeyValue`] to the file writer's metadata
388    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
389        self.kv_metadatas.push(kv_metadata);
390    }
391
392    /// Returns a reference to schema descriptor.
393    pub fn schema_descr(&self) -> &SchemaDescriptor {
394        &self.descr
395    }
396
397    /// Returns a reference to schema descriptor Arc.
398    #[cfg(feature = "arrow")]
399    pub(crate) fn schema_descr_ptr(&self) -> &SchemaDescPtr {
400        &self.descr
401    }
402
403    /// Returns a reference to the writer properties
404    pub fn properties(&self) -> &WriterPropertiesPtr {
405        &self.props
406    }
407
408    /// Returns a reference to the underlying writer.
409    pub fn inner(&self) -> &W {
410        self.buf.inner()
411    }
412
413    /// Writes the given buf bytes to the internal buffer.
414    ///
415    /// This can be used to write raw data to an in-progress Parquet file, for
416    /// example, custom index structures or other payloads. Other Parquet readers
417    /// will skip this data when reading the files.
418    ///
419    /// It's safe to use this method to write data to the underlying writer,
420    /// because it will ensure that the buffering and byte‐counting layers are used.
421    pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
422        self.buf.write_all(buf)
423    }
424
425    /// Flushes underlying writer
426    pub fn flush(&mut self) -> std::io::Result<()> {
427        self.buf.flush()
428    }
429
430    /// Returns a mutable reference to the underlying writer.
431    ///
432    /// **Warning**: if you write directly to this writer, you will skip
433    /// the `TrackedWrite` buffering and byte‐counting layers, which can cause
434    /// the file footer’s recorded offsets and sizes to diverge from reality,
435    /// resulting in an unreadable or corrupted Parquet file.
436    ///
437    /// If you want to write safely to the underlying writer, use [`Self::write_all`].
438    pub fn inner_mut(&mut self) -> &mut W {
439        self.buf.inner_mut()
440    }
441
442    /// Writes the file footer and returns the underlying writer.
443    pub fn into_inner(mut self) -> Result<W> {
444        self.assert_previous_writer_closed()?;
445        let _ = self.write_metadata()?;
446
447        self.buf.into_inner()
448    }
449
450    /// Returns the number of bytes written to this instance
451    pub fn bytes_written(&self) -> usize {
452        self.buf.bytes_written()
453    }
454
455    /// Get the file encryptor used by this instance to encrypt data
456    #[cfg(feature = "encryption")]
457    pub(crate) fn file_encryptor(&self) -> Option<Arc<FileEncryptor>> {
458        self.file_encryptor.clone()
459    }
460}
461
462/// Serialize all the bloom filters of the given row group to the given buffer,
463/// and returns the updated row group metadata.
464fn write_bloom_filters<W: Write + Send>(
465    buf: &mut TrackedWrite<W>,
466    bloom_filters: &mut [Vec<Option<Sbbf>>],
467    row_group: &mut RowGroupMetaData,
468) -> Result<()> {
469    // iter row group
470    // iter each column
471    // write bloom filter to the file
472
473    let row_group_idx: u16 = row_group
474        .ordinal()
475        .expect("Missing row group ordinal")
476        .try_into()
477        .map_err(|_| {
478            ParquetError::General(format!(
479                "Negative row group ordinal: {})",
480                row_group.ordinal().unwrap()
481            ))
482        })?;
483    let row_group_idx = row_group_idx as usize;
484    for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() {
485        if let Some(bloom_filter) = bloom_filters[row_group_idx][column_idx].take() {
486            let start_offset = buf.bytes_written();
487            bloom_filter.write(&mut *buf)?;
488            let end_offset = buf.bytes_written();
489            // set offset and index for bloom filter
490            *column_chunk = column_chunk
491                .clone()
492                .into_builder()
493                .set_bloom_filter_offset(Some(start_offset as i64))
494                .set_bloom_filter_length(Some((end_offset - start_offset) as i32))
495                .build()?;
496        }
497    }
498    Ok(())
499}
500
501/// Parquet row group writer API.
502///
503/// Provides methods to access column writers in an iterator-like fashion, order is
504/// guaranteed to match the order of schema leaves (column descriptors).
505///
506/// All columns should be written sequentially; the main workflow is:
507/// - Request the next column using `next_column` method - this will return `None` if no
508///   more columns are available to write.
509/// - Once done writing a column, close column writer with `close`
510/// - Once all columns have been written, close row group writer with `close`
511///   method. The close method will return row group metadata and is no-op
512///   on already closed row group.
513pub struct SerializedRowGroupWriter<'a, W: Write> {
514    descr: SchemaDescPtr,
515    props: WriterPropertiesPtr,
516    buf: &'a mut TrackedWrite<W>,
517    total_rows_written: Option<u64>,
518    total_bytes_written: u64,
519    total_uncompressed_bytes: i64,
520    column_index: usize,
521    row_group_metadata: Option<RowGroupMetaDataPtr>,
522    column_chunks: Vec<ColumnChunkMetaData>,
523    bloom_filters: Vec<Option<Sbbf>>,
524    column_indexes: Vec<Option<ColumnIndexMetaData>>,
525    offset_indexes: Vec<Option<OffsetIndexMetaData>>,
526    row_group_index: i16,
527    file_offset: i64,
528    on_close: Option<OnCloseRowGroup<'a, W>>,
529    #[cfg(feature = "encryption")]
530    file_encryptor: Option<Arc<FileEncryptor>>,
531}
532
533impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
534    /// Creates a new `SerializedRowGroupWriter` with:
535    ///
536    /// - `schema_descr` - the schema to write
537    /// - `properties` - writer properties
538    /// - `buf` - the buffer to write data to
539    /// - `row_group_index` - row group index in this parquet file.
540    /// - `file_offset` - file offset of this row group in this parquet file.
541    /// - `on_close` - an optional callback that will invoked on [`Self::close`]
542    pub fn new(
543        schema_descr: SchemaDescPtr,
544        properties: WriterPropertiesPtr,
545        buf: &'a mut TrackedWrite<W>,
546        row_group_index: i16,
547        on_close: Option<OnCloseRowGroup<'a, W>>,
548    ) -> Self {
549        let num_columns = schema_descr.num_columns();
550        let file_offset = buf.bytes_written() as i64;
551        Self {
552            buf,
553            row_group_index,
554            file_offset,
555            on_close,
556            total_rows_written: None,
557            descr: schema_descr,
558            props: properties,
559            column_index: 0,
560            row_group_metadata: None,
561            column_chunks: Vec::with_capacity(num_columns),
562            bloom_filters: Vec::with_capacity(num_columns),
563            column_indexes: Vec::with_capacity(num_columns),
564            offset_indexes: Vec::with_capacity(num_columns),
565            total_bytes_written: 0,
566            total_uncompressed_bytes: 0,
567            #[cfg(feature = "encryption")]
568            file_encryptor: None,
569        }
570    }
571
572    #[cfg(feature = "encryption")]
573    /// Set the file encryptor to use for encrypting row group data and metadata
574    pub(crate) fn with_file_encryptor(
575        mut self,
576        file_encryptor: Option<Arc<FileEncryptor>>,
577    ) -> Self {
578        self.file_encryptor = file_encryptor;
579        self
580    }
581
582    /// Advance `self.column_index` returning the next [`ColumnDescPtr`] if any
583    fn next_column_desc(&mut self) -> Option<ColumnDescPtr> {
584        let ret = self.descr.columns().get(self.column_index)?.clone();
585        self.column_index += 1;
586        Some(ret)
587    }
588
589    /// Returns [`OnCloseColumnChunk`] for the next writer
590    fn get_on_close(&mut self) -> (&mut TrackedWrite<W>, OnCloseColumnChunk<'_>) {
591        let total_bytes_written = &mut self.total_bytes_written;
592        let total_uncompressed_bytes = &mut self.total_uncompressed_bytes;
593        let total_rows_written = &mut self.total_rows_written;
594        let column_chunks = &mut self.column_chunks;
595        let column_indexes = &mut self.column_indexes;
596        let offset_indexes = &mut self.offset_indexes;
597        let bloom_filters = &mut self.bloom_filters;
598
599        let on_close = |r: ColumnCloseResult| {
600            // Update row group writer metrics
601            *total_bytes_written += r.bytes_written;
602            *total_uncompressed_bytes += r.metadata.uncompressed_size();
603            column_chunks.push(r.metadata);
604            bloom_filters.push(r.bloom_filter);
605            column_indexes.push(r.column_index);
606            offset_indexes.push(r.offset_index);
607
608            if let Some(rows) = *total_rows_written {
609                if rows != r.rows_written {
610                    return Err(general_err!(
611                        "Incorrect number of rows, expected {} != {} rows",
612                        rows,
613                        r.rows_written
614                    ));
615                }
616            } else {
617                *total_rows_written = Some(r.rows_written);
618            }
619
620            Ok(())
621        };
622        (self.buf, Box::new(on_close))
623    }
624
625    /// Returns the next column writer, if available, using the factory function;
626    /// otherwise returns `None`.
627    pub(crate) fn next_column_with_factory<'b, F, C>(&'b mut self, factory: F) -> Result<Option<C>>
628    where
629        F: FnOnce(
630            ColumnDescPtr,
631            WriterPropertiesPtr,
632            Box<dyn PageWriter + 'b>,
633            OnCloseColumnChunk<'b>,
634        ) -> Result<C>,
635    {
636        self.assert_previous_writer_closed()?;
637
638        let encryptor_context = self.get_page_encryptor_context();
639
640        Ok(match self.next_column_desc() {
641            Some(column) => {
642                let props = self.props.clone();
643                let (buf, on_close) = self.get_on_close();
644
645                let page_writer = SerializedPageWriter::new(buf);
646                let page_writer =
647                    Self::set_page_writer_encryptor(&column, encryptor_context, page_writer)?;
648
649                Some(factory(
650                    column,
651                    props,
652                    Box::new(page_writer),
653                    Box::new(on_close),
654                )?)
655            }
656            None => None,
657        })
658    }
659
660    /// Returns the next column writer, if available; otherwise returns `None`.
661    /// In case of any IO error or Thrift error, or if row group writer has already been
662    /// closed returns `Err`.
663    pub fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>> {
664        self.next_column_with_factory(|descr, props, page_writer, on_close| {
665            let column_writer = get_column_writer(descr, props, page_writer);
666            Ok(SerializedColumnWriter::new(column_writer, Some(on_close)))
667        })
668    }
669
670    /// Append an encoded column chunk from `reader` directly to the underlying
671    /// writer.
672    ///
673    /// This method can be used for efficiently concatenating or projecting
674    /// Parquet data, or encoding Parquet data to temporary in-memory buffers.
675    ///
676    /// Arguments:
677    /// - `reader`: a [`ChunkReader`] containing the encoded column data
678    /// - `close`: the [`ColumnCloseResult`] metadata returned from closing
679    ///   the column writer that wrote the data in `reader`.
680    ///
681    /// See Also:
682    /// 1. [`get_column_writer`]  for creating writers that can encode data.
683    /// 2. [`Self::next_column`] for writing data that isn't already encoded
684    pub fn append_column<R: ChunkReader>(
685        &mut self,
686        reader: &R,
687        close: ColumnCloseResult,
688    ) -> Result<()> {
689        // Position a reader at the start of the buffered chunk, then splice the
690        // bytes through the shared streaming path.
691        let metadata = &close.metadata;
692        let src_offset = metadata
693            .dictionary_page_offset()
694            .unwrap_or_else(|| metadata.data_page_offset());
695        let read = reader.get_read(src_offset as _)?;
696        self.append_column_from_read(read, close)
697    }
698
699    /// Splice an already-encoded column chunk into the row group, reading its
700    /// bytes sequentially from `read`.
701    ///
702    /// `read` must be positioned at the start of the chunk (the dictionary page
703    /// if present, otherwise the first data page — i.e. `src_offset` below) and
704    /// yield exactly the chunk's compressed bytes. Unlike [`Self::append_column`]
705    /// this consumes an owned [`Read`], which lets the caller stream the bytes
706    /// back from a [`PageStore`](crate::column::page_store::PageStore) one page
707    /// at a time without materializing the whole chunk in memory.
708    pub(crate) fn append_column_from_read<R: Read>(
709        &mut self,
710        read: R,
711        mut close: ColumnCloseResult,
712    ) -> Result<()> {
713        self.assert_previous_writer_closed()?;
714        let desc = self
715            .next_column_desc()
716            .ok_or_else(|| general_err!("exhausted columns in SerializedRowGroupWriter"))?;
717
718        let metadata = close.metadata;
719
720        if metadata.column_descr() != desc.as_ref() {
721            return Err(general_err!(
722                "column descriptor mismatch, expected {:?} got {:?}",
723                desc,
724                metadata.column_descr()
725            ));
726        }
727
728        let src_dictionary_offset = metadata.dictionary_page_offset();
729        let src_data_offset = metadata.data_page_offset();
730        let src_offset = src_dictionary_offset.unwrap_or(src_data_offset);
731        let src_length = metadata.compressed_size();
732
733        let write_offset = self.buf.bytes_written();
734        let mut read = read.take(src_length as _);
735        let write_length = std::io::copy(&mut read, &mut self.buf)?;
736
737        if src_length as u64 != write_length {
738            return Err(general_err!(
739                "Failed to splice column data, expected {read_length} got {write_length}"
740            ));
741        }
742
743        let map_offset = |x| x - src_offset + write_offset as i64;
744        let mut builder = ColumnChunkMetaData::builder(metadata.column_descr_ptr())
745            .set_compression_codec(metadata.compression_codec())
746            .set_encodings_mask(*metadata.encodings_mask())
747            .set_total_compressed_size(metadata.compressed_size())
748            .set_total_uncompressed_size(metadata.uncompressed_size())
749            .set_num_values(metadata.num_values())
750            .set_data_page_offset(map_offset(src_data_offset))
751            .set_dictionary_page_offset(src_dictionary_offset.map(map_offset))
752            .set_unencoded_byte_array_data_bytes(metadata.unencoded_byte_array_data_bytes());
753
754        if let Some(rep_hist) = metadata.repetition_level_histogram() {
755            builder = builder.set_repetition_level_histogram(Some(rep_hist.clone()))
756        }
757        if let Some(def_hist) = metadata.definition_level_histogram() {
758            builder = builder.set_definition_level_histogram(Some(def_hist.clone()))
759        }
760        if let Some(statistics) = metadata.statistics() {
761            builder = builder.set_statistics(statistics.clone())
762        }
763        if let Some(geo_statistics) = metadata.geo_statistics() {
764            builder = builder.set_geo_statistics(Box::new(geo_statistics.clone()))
765        }
766        if let Some(page_encoding_stats) = metadata.page_encoding_stats() {
767            builder = builder.set_page_encoding_stats(page_encoding_stats.clone())
768        }
769        builder = self.set_column_crypto_metadata(builder, &metadata);
770        close.metadata = builder.build()?;
771
772        if let Some(offsets) = close.offset_index.as_mut() {
773            for location in &mut offsets.page_locations {
774                location.offset = map_offset(location.offset)
775            }
776        }
777
778        let (_, on_close) = self.get_on_close();
779        on_close(close)
780    }
781
782    /// Closes this row group writer and returns row group metadata.
783    pub fn close(mut self) -> Result<RowGroupMetaDataPtr> {
784        if self.row_group_metadata.is_none() {
785            self.assert_previous_writer_closed()?;
786
787            let column_chunks = std::mem::take(&mut self.column_chunks);
788            let row_group_metadata = RowGroupMetaData::builder(self.descr.clone())
789                .set_column_metadata(column_chunks)
790                .set_total_byte_size(self.total_uncompressed_bytes)
791                .set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
792                .set_sorting_columns(self.props.sorting_columns().cloned())
793                .set_ordinal(self.row_group_index)
794                .set_file_offset(self.file_offset)
795                .build()?;
796
797            self.row_group_metadata = Some(Arc::new(row_group_metadata.clone()));
798
799            if let Some(on_close) = self.on_close.take() {
800                on_close(
801                    self.buf,
802                    row_group_metadata,
803                    self.bloom_filters,
804                    self.column_indexes,
805                    self.offset_indexes,
806                )?
807            }
808        }
809
810        let metadata = self.row_group_metadata.as_ref().unwrap().clone();
811        Ok(metadata)
812    }
813
814    /// Set the column crypto metadata for a column chunk
815    #[cfg(feature = "encryption")]
816    fn set_column_crypto_metadata(
817        &self,
818        builder: ColumnChunkMetaDataBuilder,
819        metadata: &ColumnChunkMetaData,
820    ) -> ColumnChunkMetaDataBuilder {
821        if let Some(file_encryptor) = self.file_encryptor.as_ref() {
822            builder.set_column_crypto_metadata(get_column_crypto_metadata(
823                file_encryptor.properties(),
824                &metadata.column_descr_ptr(),
825            ))
826        } else {
827            builder
828        }
829    }
830
831    /// Get context required to create a [`PageEncryptor`] for a column
832    #[cfg(feature = "encryption")]
833    fn get_page_encryptor_context(&self) -> PageEncryptorContext {
834        PageEncryptorContext {
835            file_encryptor: self.file_encryptor.clone(),
836            row_group_index: self.row_group_index as usize,
837            column_index: self.column_index,
838        }
839    }
840
841    /// Set the [`PageEncryptor`] on a page writer if a column is encrypted
842    #[cfg(feature = "encryption")]
843    fn set_page_writer_encryptor<'b>(
844        column: &ColumnDescPtr,
845        context: PageEncryptorContext,
846        page_writer: SerializedPageWriter<'b, W>,
847    ) -> Result<SerializedPageWriter<'b, W>> {
848        let page_encryptor = PageEncryptor::create_if_column_encrypted(
849            &context.file_encryptor,
850            context.row_group_index,
851            context.column_index,
852            &column.path().string(),
853        )?;
854
855        Ok(page_writer.with_page_encryptor(page_encryptor))
856    }
857
    /// No-op implementation of setting the column crypto metadata for a column chunk.
    ///
    /// Used when the `encryption` feature is disabled: `builder` is returned unchanged.
    #[cfg(not(feature = "encryption"))]
    fn set_column_crypto_metadata(
        &self,
        builder: ColumnChunkMetaDataBuilder,
        _metadata: &ColumnChunkMetaData,
    ) -> ColumnChunkMetaDataBuilder {
        builder
    }

    /// Returns the empty [`PageEncryptorContext`] used when the `encryption` feature is
    /// disabled; it carries no state.
    #[cfg(not(feature = "encryption"))]
    fn get_page_encryptor_context(&self) -> PageEncryptorContext {
        PageEncryptorContext {}
    }

    /// No-op implementation of setting a [`PageEncryptor`] for when encryption is disabled.
    ///
    /// `page_writer` is returned unchanged and this never fails.
    #[cfg(not(feature = "encryption"))]
    fn set_page_writer_encryptor<'b>(
        _column: &ColumnDescPtr,
        _context: PageEncryptorContext,
        page_writer: SerializedPageWriter<'b, W>,
    ) -> Result<SerializedPageWriter<'b, W>> {
        Ok(page_writer)
    }
882
883    #[inline]
884    fn assert_previous_writer_closed(&self) -> Result<()> {
885        if self.column_index != self.column_chunks.len() {
886            Err(general_err!("Previous column writer was not closed"))
887        } else {
888            Ok(())
889        }
890    }
891}
892
/// Context required to create a [`PageEncryptor`] for a column
#[cfg(feature = "encryption")]
struct PageEncryptorContext {
    /// File-level encryptor, or `None` when the writer has no encryptor configured.
    file_encryptor: Option<Arc<FileEncryptor>>,
    /// Ordinal of the row group the column chunk belongs to.
    row_group_index: usize,
    /// Index of the column within the row group.
    column_index: usize,
}

/// Empty stand-in for the encryption context when the `encryption` feature is disabled,
/// so both feature configurations can pass the same type around.
#[cfg(not(feature = "encryption"))]
struct PageEncryptorContext {}
903
/// A wrapper around a [`ColumnWriter`] that invokes a callback on [`Self::close`]
pub struct SerializedColumnWriter<'a> {
    /// The wrapped, untyped column writer.
    inner: ColumnWriter<'a>,
    /// Callback given the result of closing `inner`; `None` means nothing is invoked.
    on_close: Option<OnCloseColumnChunk<'a>>,
}
909
910impl<'a> SerializedColumnWriter<'a> {
911    /// Create a new [`SerializedColumnWriter`] from a [`ColumnWriter`] and an
912    /// optional callback to be invoked on [`Self::close`]
913    pub fn new(inner: ColumnWriter<'a>, on_close: Option<OnCloseColumnChunk<'a>>) -> Self {
914        Self { inner, on_close }
915    }
916
917    /// Returns a reference to an untyped [`ColumnWriter`]
918    pub fn untyped(&mut self) -> &mut ColumnWriter<'a> {
919        &mut self.inner
920    }
921
922    /// Returns a reference to a typed [`ColumnWriterImpl`]
923    pub fn typed<T: DataType>(&mut self) -> &mut ColumnWriterImpl<'a, T> {
924        get_typed_column_writer_mut(&mut self.inner)
925    }
926
927    /// Close this [`SerializedColumnWriter`]
928    pub fn close(mut self) -> Result<()> {
929        let r = self.inner.close()?;
930        if let Some(on_close) = self.on_close.take() {
931            on_close(r)?
932        }
933
934        Ok(())
935    }
936}
937
/// A serialized implementation for Parquet [`PageWriter`].
/// Writes and serializes pages and metadata into the output stream.
///
/// `SerializedPageWriter` should not be used after calling `close()`.
pub struct SerializedPageWriter<'a, W: Write> {
    /// Byte-counting sink that pages are appended to; its `bytes_written` is used to
    /// compute page offsets and serialized header sizes.
    sink: &'a mut TrackedWrite<W>,
    /// Encryptor applied to page headers and page data; `None` writes plaintext pages.
    #[cfg(feature = "encryption")]
    page_encryptor: Option<PageEncryptor>,
}
947
948impl<'a, W: Write> SerializedPageWriter<'a, W> {
949    /// Creates new page writer.
950    pub fn new(sink: &'a mut TrackedWrite<W>) -> Self {
951        Self {
952            sink,
953            #[cfg(feature = "encryption")]
954            page_encryptor: None,
955        }
956    }
957
958    /// Serializes page header into Thrift.
959    /// Returns number of bytes that have been written into the sink.
960    #[inline]
961    fn serialize_page_header(&mut self, header: PageHeader) -> Result<usize> {
962        let start_pos = self.sink.bytes_written();
963        match self.page_encryptor_and_sink_mut() {
964            Some((page_encryptor, sink)) => {
965                page_encryptor.encrypt_page_header(&header, sink)?;
966            }
967            None => {
968                let mut protocol = ThriftCompactOutputProtocol::new(&mut self.sink);
969                header.write_thrift(&mut protocol)?;
970            }
971        }
972        Ok(self.sink.bytes_written() - start_pos)
973    }
974}
975
#[cfg(feature = "encryption")]
impl<'a, W: Write> SerializedPageWriter<'a, W> {
    /// Replaces the page encryptor (possibly with `None`) and returns the updated writer.
    fn with_page_encryptor(self, page_encryptor: Option<PageEncryptor>) -> Self {
        Self {
            page_encryptor,
            ..self
        }
    }

    /// Mutable access to the attached page encryptor, if any.
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        self.page_encryptor.as_mut()
    }

    /// Borrows the page encryptor and the sink together.
    ///
    /// This lets a header be encrypted straight into the sink. Returns `None` when no
    /// encryptor is attached.
    fn page_encryptor_and_sink_mut(
        &mut self,
    ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
        // Destructure to borrow the two fields independently.
        let Self {
            sink,
            page_encryptor,
        } = self;
        page_encryptor.as_mut().map(|encryptor| (encryptor, sink))
    }
}
994
// Accessors used when the `encryption` feature is disabled: there is never a page
// encryptor, so both always return `None` and callers take their plaintext paths.
#[cfg(not(feature = "encryption"))]
impl<'a, W: Write> SerializedPageWriter<'a, W> {
    /// Always `None`: no page encryptor exists without the `encryption` feature.
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        None
    }

    /// Always `None`: no page encryptor exists without the `encryption` feature.
    fn page_encryptor_and_sink_mut(
        &mut self,
    ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
        None
    }
}
1007
1008impl<W: Write + Send> PageWriter for SerializedPageWriter<'_, W> {
1009    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
1010        let page = match self.page_encryptor_mut() {
1011            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
1012            None => page,
1013        };
1014
1015        let page_type = page.page_type();
1016        let start_pos = self.sink.bytes_written() as u64;
1017
1018        let page_header = page.to_thrift_header()?;
1019        let header_size = self.serialize_page_header(page_header)?;
1020
1021        self.sink.write_all(page.data())?;
1022
1023        let mut spec = PageWriteSpec::new();
1024        spec.page_type = page_type;
1025        spec.uncompressed_size = page.uncompressed_size() + header_size;
1026        spec.compressed_size = page.compressed_size() + header_size;
1027        spec.offset = start_pos;
1028        spec.bytes_written = self.sink.bytes_written() as u64 - start_pos;
1029        spec.num_values = page.num_values();
1030
1031        if let Some(page_encryptor) = self.page_encryptor_mut() {
1032            if page.compressed_page().is_data_page() {
1033                page_encryptor.increment_page();
1034            }
1035        }
1036        Ok(spec)
1037    }
1038
1039    fn close(&mut self) -> Result<()> {
1040        self.sink.flush()?;
1041        Ok(())
1042    }
1043}
1044
/// Returns the 4-byte magic written at the start and end of a Parquet file.
///
/// When encryption properties are supplied and `encrypt_footer()` is true, this is
/// `PARQUET_MAGIC_ENCR_FOOTER`. In every other case, including no properties, it is
/// `PARQUET_MAGIC`.
#[cfg(feature = "encryption")]
pub(crate) fn get_file_magic(
    file_encryption_properties: Option<&Arc<FileEncryptionProperties>>,
) -> &'static [u8; 4] {
    let encrypted_footer =
        file_encryption_properties.is_some_and(|properties| properties.encrypt_footer());
    if encrypted_footer {
        &PARQUET_MAGIC_ENCR_FOOTER
    } else {
        &PARQUET_MAGIC
    }
}
1058
/// Get the magic bytes at the start and end of the file that identify this
/// as a Parquet file; without the `encryption` feature this is always `PARQUET_MAGIC`.
#[cfg(not(feature = "encryption"))]
pub(crate) fn get_file_magic() -> &'static [u8; 4] {
    &PARQUET_MAGIC
}
1063
1064#[cfg(test)]
1065mod tests {
1066    use super::*;
1067
1068    #[cfg(feature = "arrow")]
1069    use arrow_array::RecordBatchReader;
1070    use bytes::Bytes;
1071    use std::fs::File;
1072
1073    #[cfg(feature = "arrow")]
1074    use crate::arrow::ArrowWriter;
1075    #[cfg(feature = "arrow")]
1076    use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
1077    use crate::basic::{
1078        ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, Repetition, SortOrder, Type,
1079    };
1080    use crate::column::page::{Page, PageReader};
1081    use crate::column::reader::get_typed_column_reader;
1082    use crate::compression::{Codec, CodecOptionsBuilder, create_codec};
1083    use crate::data_type::{BoolType, ByteArrayType, Int32Type};
1084    use crate::file::page_index::column_index::ColumnIndexMetaData;
1085    use crate::file::properties::EnabledStatistics;
1086    use crate::file::serialized_reader::ReadOptionsBuilder;
1087    use crate::file::statistics::{from_thrift_page_stats, page_stats_to_thrift};
1088    use crate::file::{
1089        properties::{ReaderProperties, WriterProperties, WriterVersion},
1090        reader::{FileReader, SerializedFileReader, SerializedPageReader},
1091        statistics::Statistics,
1092    };
1093    use crate::record::{Row, RowAccessor};
1094    use crate::schema::parser::parse_message_type;
1095    use crate::schema::types;
1096    use crate::schema::types::{ColumnDescriptor, ColumnPath};
1097    use crate::util::test_common::file_util::get_test_file;
1098    use crate::util::test_common::rand_gen::RandGen;
1099
1100    #[test]
1101    fn test_row_group_writer_error_not_all_columns_written() {
1102        let file = tempfile::tempfile().unwrap();
1103        let schema = Arc::new(
1104            types::Type::group_type_builder("schema")
1105                .with_fields(vec![Arc::new(
1106                    types::Type::primitive_type_builder("col1", Type::INT32)
1107                        .build()
1108                        .unwrap(),
1109                )])
1110                .build()
1111                .unwrap(),
1112        );
1113        let props = Default::default();
1114        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
1115        let row_group_writer = writer.next_row_group().unwrap();
1116        let res = row_group_writer.close();
1117        assert!(res.is_err());
1118        if let Err(err) = res {
1119            assert_eq!(
1120                format!("{err}"),
1121                "Parquet error: Column length mismatch: 1 != 0"
1122            );
1123        }
1124    }
1125
1126    #[test]
1127    fn test_row_group_writer_num_records_mismatch() {
1128        let file = tempfile::tempfile().unwrap();
1129        let schema = Arc::new(
1130            types::Type::group_type_builder("schema")
1131                .with_fields(vec![
1132                    Arc::new(
1133                        types::Type::primitive_type_builder("col1", Type::INT32)
1134                            .with_repetition(Repetition::REQUIRED)
1135                            .build()
1136                            .unwrap(),
1137                    ),
1138                    Arc::new(
1139                        types::Type::primitive_type_builder("col2", Type::INT32)
1140                            .with_repetition(Repetition::REQUIRED)
1141                            .build()
1142                            .unwrap(),
1143                    ),
1144                ])
1145                .build()
1146                .unwrap(),
1147        );
1148        let props = Default::default();
1149        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
1150        let mut row_group_writer = writer.next_row_group().unwrap();
1151
1152        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
1153        col_writer
1154            .typed::<Int32Type>()
1155            .write_batch(&[1, 2, 3], None, None)
1156            .unwrap();
1157        col_writer.close().unwrap();
1158
1159        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
1160        col_writer
1161            .typed::<Int32Type>()
1162            .write_batch(&[1, 2], None, None)
1163            .unwrap();
1164
1165        let err = col_writer.close().unwrap_err();
1166        assert_eq!(
1167            err.to_string(),
1168            "Parquet error: Incorrect number of rows, expected 3 != 2 rows"
1169        );
1170    }
1171
1172    #[test]
1173    fn test_file_writer_empty_file() {
1174        let file = tempfile::tempfile().unwrap();
1175
1176        let schema = Arc::new(
1177            types::Type::group_type_builder("schema")
1178                .with_fields(vec![Arc::new(
1179                    types::Type::primitive_type_builder("col1", Type::INT32)
1180                        .build()
1181                        .unwrap(),
1182                )])
1183                .build()
1184                .unwrap(),
1185        );
1186        let props = Default::default();
1187        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1188        writer.close().unwrap();
1189
1190        let reader = SerializedFileReader::new(file).unwrap();
1191        assert_eq!(reader.get_row_iter(None).unwrap().count(), 0);
1192    }
1193
    /// Column orders written to the footer must cover every leaf column, including
    /// leaves nested inside a group. Each leaf gets the sort order expected for its
    /// physical/logical type.
    #[test]
    fn test_file_writer_column_orders_populated() {
        let file = tempfile::tempfile().unwrap();

        // Four leaves: INT32, FLBA(12) INTERVAL, and inside `nested` a FLBA(2) Float16
        // plus a BYTE_ARRAY String.
        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![
                    Arc::new(
                        types::Type::primitive_type_builder("col1", Type::INT32)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        types::Type::primitive_type_builder("col2", Type::FIXED_LEN_BYTE_ARRAY)
                            .with_converted_type(ConvertedType::INTERVAL)
                            .with_length(12)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        types::Type::group_type_builder("nested")
                            .with_repetition(Repetition::REQUIRED)
                            .with_fields(vec![
                                Arc::new(
                                    types::Type::primitive_type_builder(
                                        "col3",
                                        Type::FIXED_LEN_BYTE_ARRAY,
                                    )
                                    .with_logical_type(Some(LogicalType::Float16))
                                    .with_length(2)
                                    .build()
                                    .unwrap(),
                                ),
                                Arc::new(
                                    types::Type::primitive_type_builder("col4", Type::BYTE_ARRAY)
                                        .with_logical_type(Some(LogicalType::String))
                                        .build()
                                        .unwrap(),
                                ),
                            ])
                            .build()
                            .unwrap(),
                    ),
                ])
                .build()
                .unwrap(),
        );

        // No row groups are needed: column orders live in the file footer.
        let props = Default::default();
        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file).unwrap();

        // only leaves
        let expected = vec![
            // INT32
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
            // INTERVAL
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED),
            // Float16
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
            // String
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED),
        ];
        let actual = reader.metadata().file_metadata().column_orders();

        assert!(actual.is_some());
        let actual = actual.unwrap();
        assert_eq!(*actual, expected);
    }
1265
1266    #[test]
1267    fn test_file_writer_with_metadata() {
1268        let file = tempfile::tempfile().unwrap();
1269
1270        let schema = Arc::new(
1271            types::Type::group_type_builder("schema")
1272                .with_fields(vec![Arc::new(
1273                    types::Type::primitive_type_builder("col1", Type::INT32)
1274                        .build()
1275                        .unwrap(),
1276                )])
1277                .build()
1278                .unwrap(),
1279        );
1280        let props = Arc::new(
1281            WriterProperties::builder()
1282                .set_key_value_metadata(Some(vec![KeyValue::new(
1283                    "key".to_string(),
1284                    "value".to_string(),
1285                )]))
1286                .build(),
1287        );
1288        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1289        writer.close().unwrap();
1290
1291        let reader = SerializedFileReader::new(file).unwrap();
1292        assert_eq!(
1293            reader
1294                .metadata()
1295                .file_metadata()
1296                .key_value_metadata()
1297                .to_owned()
1298                .unwrap()
1299                .len(),
1300            1
1301        );
1302    }
1303
    /// With `WriterVersion::PARQUET_2_0`, key/value metadata must round-trip. A field
    /// carrying both a logical and a converted type must also read back equal to the
    /// field that was written.
    #[test]
    fn test_file_writer_v2_with_metadata() {
        let file = tempfile::tempfile().unwrap();
        let field_logical_type = Some(LogicalType::integer(8, false));
        let field = Arc::new(
            types::Type::primitive_type_builder("col1", Type::INT32)
                .with_logical_type(field_logical_type.clone())
                // Derive the matching converted type from the same logical type.
                .with_converted_type(field_logical_type.into())
                .build()
                .unwrap(),
        );
        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![field.clone()])
                .build()
                .unwrap(),
        );
        let props = Arc::new(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![KeyValue::new(
                    "key".to_string(),
                    "value".to_string(),
                )]))
                .set_writer_version(WriterVersion::PARQUET_2_0)
                .build(),
        );
        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file).unwrap();

        assert_eq!(
            reader
                .metadata()
                .file_metadata()
                .key_value_metadata()
                .to_owned()
                .unwrap()
                .len(),
            1
        );

        // ARROW-11803: Test that the converted and logical types have been populated
        let fields = reader.metadata().file_metadata().schema().get_fields();
        assert_eq!(fields.len(), 1);
        assert_eq!(fields[0], field);
    }
1351
1352    #[test]
1353    fn test_file_writer_with_sorting_columns_metadata() {
1354        let file = tempfile::tempfile().unwrap();
1355
1356        let schema = Arc::new(
1357            types::Type::group_type_builder("schema")
1358                .with_fields(vec![
1359                    Arc::new(
1360                        types::Type::primitive_type_builder("col1", Type::INT32)
1361                            .build()
1362                            .unwrap(),
1363                    ),
1364                    Arc::new(
1365                        types::Type::primitive_type_builder("col2", Type::INT32)
1366                            .build()
1367                            .unwrap(),
1368                    ),
1369                ])
1370                .build()
1371                .unwrap(),
1372        );
1373        let expected_result = Some(vec![SortingColumn {
1374            column_idx: 0,
1375            descending: false,
1376            nulls_first: true,
1377        }]);
1378        let props = Arc::new(
1379            WriterProperties::builder()
1380                .set_key_value_metadata(Some(vec![KeyValue::new(
1381                    "key".to_string(),
1382                    "value".to_string(),
1383                )]))
1384                .set_sorting_columns(expected_result.clone())
1385                .build(),
1386        );
1387        let mut writer =
1388            SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1389        let mut row_group_writer = writer.next_row_group().expect("get row group writer");
1390
1391        let col_writer = row_group_writer.next_column().unwrap().unwrap();
1392        col_writer.close().unwrap();
1393
1394        let col_writer = row_group_writer.next_column().unwrap().unwrap();
1395        col_writer.close().unwrap();
1396
1397        row_group_writer.close().unwrap();
1398        writer.close().unwrap();
1399
1400        let reader = SerializedFileReader::new(file).unwrap();
1401        let result: Vec<Option<&Vec<SortingColumn>>> = reader
1402            .metadata()
1403            .row_groups()
1404            .iter()
1405            .map(|f| f.sorting_columns())
1406            .collect();
1407        // validate the sorting column read match the one written above
1408        assert_eq!(expected_result.as_ref(), result[0]);
1409    }
1410
    /// Round-trips an empty list of row-group data through `test_file_roundtrip`.
    #[test]
    fn test_file_writer_empty_row_groups() {
        let file = tempfile::tempfile().unwrap();
        test_file_roundtrip(file, vec![]);
    }
1416
    /// Round-trips a single group of five values through `test_file_roundtrip`.
    /// The group presumably becomes one row group.
    #[test]
    fn test_file_writer_single_row_group() {
        let file = tempfile::tempfile().unwrap();
        test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
    }
1422
    /// Round-trips several small groups of differing lengths through
    /// `test_file_roundtrip`.
    #[test]
    fn test_file_writer_multiple_row_groups() {
        let file = tempfile::tempfile().unwrap();
        test_file_roundtrip(
            file,
            vec![
                vec![1, 2, 3, 4, 5],
                vec![1, 2, 3],
                vec![1],
                vec![1, 2, 3, 4, 5, 6],
            ],
        );
    }
1436
    /// Round-trips larger groups of repeated values, including a trailing empty group,
    /// through `test_file_roundtrip`.
    #[test]
    fn test_file_writer_multiple_large_row_groups() {
        let file = tempfile::tempfile().unwrap();
        test_file_roundtrip(
            file,
            vec![vec![123; 1024], vec![124; 1000], vec![125; 15], vec![]],
        );
    }
1445
1446    #[test]
1447    fn test_page_writer_data_pages() {
1448        let pages = [
1449            Page::DataPage {
1450                buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
1451                num_values: 10,
1452                encoding: Encoding::DELTA_BINARY_PACKED,
1453                def_level_encoding: Encoding::RLE,
1454                rep_level_encoding: Encoding::RLE,
1455                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1456            },
1457            Page::DataPageV2 {
1458                buf: Bytes::from(vec![4; 128]),
1459                num_values: 10,
1460                encoding: Encoding::DELTA_BINARY_PACKED,
1461                num_nulls: 2,
1462                num_rows: 12,
1463                def_levels_byte_len: 24,
1464                rep_levels_byte_len: 32,
1465                is_compressed: false,
1466                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1467            },
1468        ];
1469
1470        test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
1471        test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
1472    }
1473
1474    #[test]
1475    fn test_page_writer_dict_pages() {
1476        let pages = [
1477            Page::DictionaryPage {
1478                buf: Bytes::from(vec![1, 2, 3, 4, 5]),
1479                num_values: 5,
1480                encoding: Encoding::RLE_DICTIONARY,
1481                is_sorted: false,
1482            },
1483            Page::DataPage {
1484                buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
1485                num_values: 10,
1486                encoding: Encoding::DELTA_BINARY_PACKED,
1487                def_level_encoding: Encoding::RLE,
1488                rep_level_encoding: Encoding::RLE,
1489                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1490            },
1491            Page::DataPageV2 {
1492                buf: Bytes::from(vec![4; 128]),
1493                num_values: 10,
1494                encoding: Encoding::DELTA_BINARY_PACKED,
1495                num_nulls: 2,
1496                num_rows: 12,
1497                def_levels_byte_len: 24,
1498                rep_levels_byte_len: 32,
1499                is_compressed: false,
1500                statistics: None,
1501            },
1502        ];
1503
1504        test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
1505        test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
1506    }
1507
1508    /// Tests writing and reading pages.
1509    /// Physical type is for statistics only, should match any defined statistics type in
1510    /// pages.
1511    fn test_page_roundtrip(pages: &[Page], codec: Compression, physical_type: Type) {
1512        let mut compressed_pages = vec![];
1513        let mut total_num_values = 0i64;
1514        let codec_options = CodecOptionsBuilder::default()
1515            .set_backward_compatible_lz4(false)
1516            .build();
1517        let mut compressor = create_codec(codec, &codec_options).unwrap();
1518
1519        for page in pages {
1520            let uncompressed_len = page.buffer().len();
1521
1522            let compressed_page = match *page {
1523                Page::DataPage {
1524                    ref buf,
1525                    num_values,
1526                    encoding,
1527                    def_level_encoding,
1528                    rep_level_encoding,
1529                    ref statistics,
1530                } => {
1531                    total_num_values += num_values as i64;
1532                    let output_buf = compress_helper(compressor.as_mut(), buf);
1533
1534                    Page::DataPage {
1535                        buf: Bytes::from(output_buf),
1536                        num_values,
1537                        encoding,
1538                        def_level_encoding,
1539                        rep_level_encoding,
1540                        statistics: from_thrift_page_stats(
1541                            physical_type,
1542                            page_stats_to_thrift(statistics.as_ref()),
1543                        )
1544                        .unwrap(),
1545                    }
1546                }
1547                Page::DataPageV2 {
1548                    ref buf,
1549                    num_values,
1550                    encoding,
1551                    num_nulls,
1552                    num_rows,
1553                    def_levels_byte_len,
1554                    rep_levels_byte_len,
1555                    ref statistics,
1556                    ..
1557                } => {
1558                    total_num_values += num_values as i64;
1559                    let offset = (def_levels_byte_len + rep_levels_byte_len) as usize;
1560                    let cmp_buf = compress_helper(compressor.as_mut(), &buf[offset..]);
1561                    let mut output_buf = Vec::from(&buf[..offset]);
1562                    output_buf.extend_from_slice(&cmp_buf[..]);
1563
1564                    Page::DataPageV2 {
1565                        buf: Bytes::from(output_buf),
1566                        num_values,
1567                        encoding,
1568                        num_nulls,
1569                        num_rows,
1570                        def_levels_byte_len,
1571                        rep_levels_byte_len,
1572                        is_compressed: compressor.is_some(),
1573                        statistics: from_thrift_page_stats(
1574                            physical_type,
1575                            page_stats_to_thrift(statistics.as_ref()),
1576                        )
1577                        .unwrap(),
1578                    }
1579                }
1580                Page::DictionaryPage {
1581                    ref buf,
1582                    num_values,
1583                    encoding,
1584                    is_sorted,
1585                } => {
1586                    let output_buf = compress_helper(compressor.as_mut(), buf);
1587
1588                    Page::DictionaryPage {
1589                        buf: Bytes::from(output_buf),
1590                        num_values,
1591                        encoding,
1592                        is_sorted,
1593                    }
1594                }
1595            };
1596
1597            let compressed_page = CompressedPage::new(compressed_page, uncompressed_len);
1598            compressed_pages.push(compressed_page);
1599        }
1600
1601        let mut buffer: Vec<u8> = vec![];
1602        let mut result_pages: Vec<Page> = vec![];
1603        {
1604            let mut writer = TrackedWrite::new(&mut buffer);
1605            let mut page_writer = SerializedPageWriter::new(&mut writer);
1606
1607            for page in compressed_pages {
1608                page_writer.write_page(page).unwrap();
1609            }
1610            page_writer.close().unwrap();
1611        }
1612        {
1613            let reader = bytes::Bytes::from(buffer);
1614
1615            let t = types::Type::primitive_type_builder("t", physical_type)
1616                .build()
1617                .unwrap();
1618
1619            let desc = ColumnDescriptor::new(Arc::new(t), 0, 0, ColumnPath::new(vec![]));
1620            let meta = ColumnChunkMetaData::builder(Arc::new(desc))
1621                .set_compression_codec(codec.into())
1622                .set_total_compressed_size(reader.len() as i64)
1623                .set_num_values(total_num_values)
1624                .build()
1625                .unwrap();
1626
1627            let props = ReaderProperties::builder()
1628                .set_backward_compatible_lz4(false)
1629                .set_read_page_statistics(true)
1630                .build();
1631            let mut page_reader = SerializedPageReader::new_with_properties(
1632                Arc::new(reader),
1633                &meta,
1634                total_num_values as usize,
1635                None,
1636                Arc::new(props),
1637            )
1638            .unwrap();
1639
1640            while let Some(page) = page_reader.get_next_page().unwrap() {
1641                result_pages.push(page);
1642            }
1643        }
1644
1645        assert_eq!(result_pages.len(), pages.len());
1646        for i in 0..result_pages.len() {
1647            assert_page(&result_pages[i], &pages[i]);
1648        }
1649    }
1650
1651    /// Helper function to compress a slice
1652    fn compress_helper(compressor: Option<&mut Box<dyn Codec>>, data: &[u8]) -> Vec<u8> {
1653        let mut output_buf = vec![];
1654        if let Some(cmpr) = compressor {
1655            cmpr.compress(data, &mut output_buf).unwrap();
1656        } else {
1657            output_buf.extend_from_slice(data);
1658        }
1659        output_buf
1660    }
1661
1662    /// Check if pages match.
1663    fn assert_page(left: &Page, right: &Page) {
1664        assert_eq!(left.page_type(), right.page_type());
1665        assert_eq!(&left.buffer(), &right.buffer());
1666        assert_eq!(left.num_values(), right.num_values());
1667        assert_eq!(left.encoding(), right.encoding());
1668        assert_eq!(
1669            page_stats_to_thrift(left.statistics()),
1670            page_stats_to_thrift(right.statistics())
1671        );
1672    }
1673
1674    /// Tests roundtrip of i32 data written using `W` and read using `R`
1675    fn test_roundtrip_i32<W, R>(
1676        file: W,
1677        data: Vec<Vec<i32>>,
1678        compression: Compression,
1679    ) -> ParquetMetaData
1680    where
1681        W: Write + Send,
1682        R: ChunkReader + From<W> + 'static,
1683    {
1684        test_roundtrip::<W, R, Int32Type, _>(file, data, |r| r.get_int(0).unwrap(), compression)
1685    }
1686
1687    /// Tests roundtrip of data of type `D` written using `W` and read using `R`
1688    /// and the provided `values` function
1689    fn test_roundtrip<W, R, D, F>(
1690        mut file: W,
1691        data: Vec<Vec<D::T>>,
1692        value: F,
1693        compression: Compression,
1694    ) -> ParquetMetaData
1695    where
1696        W: Write + Send,
1697        R: ChunkReader + From<W> + 'static,
1698        D: DataType,
1699        F: Fn(Row) -> D::T,
1700    {
1701        let schema = Arc::new(
1702            types::Type::group_type_builder("schema")
1703                .with_fields(vec![Arc::new(
1704                    types::Type::primitive_type_builder("col1", D::get_physical_type())
1705                        .with_repetition(Repetition::REQUIRED)
1706                        .build()
1707                        .unwrap(),
1708                )])
1709                .build()
1710                .unwrap(),
1711        );
1712        let props = Arc::new(
1713            WriterProperties::builder()
1714                .set_compression(compression)
1715                .build(),
1716        );
1717        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
1718        let mut rows: i64 = 0;
1719
1720        for (idx, subset) in data.iter().enumerate() {
1721            let row_group_file_offset = file_writer.buf.bytes_written();
1722            let mut row_group_writer = file_writer.next_row_group().unwrap();
1723            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
1724                rows += writer
1725                    .typed::<D>()
1726                    .write_batch(&subset[..], None, None)
1727                    .unwrap() as i64;
1728                writer.close().unwrap();
1729            }
1730            let last_group = row_group_writer.close().unwrap();
1731            let flushed = file_writer.flushed_row_groups();
1732            assert_eq!(flushed.len(), idx + 1);
1733            assert_eq!(Some(idx as i16), last_group.ordinal());
1734            assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset());
1735            assert_eq!(&flushed[idx], last_group.as_ref());
1736        }
1737        let file_metadata = file_writer.close().unwrap();
1738
1739        let reader = SerializedFileReader::new(R::from(file)).unwrap();
1740        assert_eq!(reader.num_row_groups(), data.len());
1741        assert_eq!(
1742            reader.metadata().file_metadata().num_rows(),
1743            rows,
1744            "row count in metadata not equal to number of rows written"
1745        );
1746        for (i, item) in data.iter().enumerate().take(reader.num_row_groups()) {
1747            let row_group_reader = reader.get_row_group(i).unwrap();
1748            let iter = row_group_reader.get_row_iter(None).unwrap();
1749            let res: Vec<_> = iter.map(|row| row.unwrap()).map(&value).collect();
1750            let row_group_size = row_group_reader.metadata().total_byte_size();
1751            let uncompressed_size: i64 = row_group_reader
1752                .metadata()
1753                .columns()
1754                .iter()
1755                .map(|v| v.uncompressed_size())
1756                .sum();
1757            assert_eq!(row_group_size, uncompressed_size);
1758            assert_eq!(res, *item);
1759        }
1760        file_metadata
1761    }
1762
1763    /// File write-read roundtrip.
1764    /// `data` consists of arrays of values for each row group.
1765    fn test_file_roundtrip(file: File, data: Vec<Vec<i32>>) -> ParquetMetaData {
1766        test_roundtrip_i32::<File, File>(file, data, Compression::UNCOMPRESSED)
1767    }
1768
1769    #[test]
1770    fn test_bytes_writer_empty_row_groups() {
1771        test_bytes_roundtrip(vec![], Compression::UNCOMPRESSED);
1772    }
1773
1774    #[test]
1775    fn test_bytes_writer_single_row_group() {
1776        test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::UNCOMPRESSED);
1777    }
1778
1779    #[test]
1780    fn test_bytes_writer_multiple_row_groups() {
1781        test_bytes_roundtrip(
1782            vec![
1783                vec![1, 2, 3, 4, 5],
1784                vec![1, 2, 3],
1785                vec![1],
1786                vec![1, 2, 3, 4, 5, 6],
1787            ],
1788            Compression::UNCOMPRESSED,
1789        );
1790    }
1791
1792    #[test]
1793    fn test_bytes_writer_single_row_group_compressed() {
1794        test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::SNAPPY);
1795    }
1796
1797    #[test]
1798    fn test_bytes_writer_multiple_row_groups_compressed() {
1799        test_bytes_roundtrip(
1800            vec![
1801                vec![1, 2, 3, 4, 5],
1802                vec![1, 2, 3],
1803                vec![1],
1804                vec![1, 2, 3, 4, 5, 6],
1805            ],
1806            Compression::SNAPPY,
1807        );
1808    }
1809
1810    fn test_bytes_roundtrip(data: Vec<Vec<i32>>, compression: Compression) {
1811        test_roundtrip_i32::<Vec<u8>, Bytes>(Vec::with_capacity(1024), data, compression);
1812    }
1813
1814    #[test]
1815    fn test_boolean_roundtrip() {
1816        let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
1817        test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
1818            Vec::with_capacity(1024),
1819            vec![my_bool_values],
1820            |r| r.get_bool(0).unwrap(),
1821            Compression::UNCOMPRESSED,
1822        );
1823    }
1824
1825    #[test]
1826    fn test_boolean_compressed_roundtrip() {
1827        let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
1828        test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
1829            Vec::with_capacity(1024),
1830            vec![my_bool_values],
1831            |r| r.get_bool(0).unwrap(),
1832            Compression::SNAPPY,
1833        );
1834    }
1835
1836    #[test]
1837    fn test_column_offset_index_file() {
1838        let file = tempfile::tempfile().unwrap();
1839        let file_metadata = test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
1840        file_metadata.row_groups().iter().for_each(|row_group| {
1841            row_group.columns().iter().for_each(|column_chunk| {
1842                assert!(column_chunk.column_index_offset().is_some());
1843                assert!(column_chunk.column_index_length().is_some());
1844                assert!(column_chunk.offset_index_offset().is_some());
1845                assert!(column_chunk.offset_index_length().is_some());
1846            })
1847        });
1848    }
1849
1850    fn test_kv_metadata(initial_kv: Option<Vec<KeyValue>>, final_kv: Option<Vec<KeyValue>>) {
1851        let schema = Arc::new(
1852            types::Type::group_type_builder("schema")
1853                .with_fields(vec![Arc::new(
1854                    types::Type::primitive_type_builder("col1", Type::INT32)
1855                        .with_repetition(Repetition::REQUIRED)
1856                        .build()
1857                        .unwrap(),
1858                )])
1859                .build()
1860                .unwrap(),
1861        );
1862        let mut out = Vec::with_capacity(1024);
1863        let props = Arc::new(
1864            WriterProperties::builder()
1865                .set_key_value_metadata(initial_kv.clone())
1866                .build(),
1867        );
1868        let mut writer = SerializedFileWriter::new(&mut out, schema, props).unwrap();
1869        let mut row_group_writer = writer.next_row_group().unwrap();
1870        let column = row_group_writer.next_column().unwrap().unwrap();
1871        column.close().unwrap();
1872        row_group_writer.close().unwrap();
1873        if let Some(kvs) = &final_kv {
1874            for kv in kvs {
1875                writer.append_key_value_metadata(kv.clone())
1876            }
1877        }
1878        writer.close().unwrap();
1879
1880        let reader = SerializedFileReader::new(Bytes::from(out)).unwrap();
1881        let metadata = reader.metadata().file_metadata();
1882        let keys = metadata.key_value_metadata();
1883
1884        match (initial_kv, final_kv) {
1885            (Some(a), Some(b)) => {
1886                let keys = keys.unwrap();
1887                assert_eq!(keys.len(), a.len() + b.len());
1888                assert_eq!(&keys[..a.len()], a.as_slice());
1889                assert_eq!(&keys[a.len()..], b.as_slice());
1890            }
1891            (Some(v), None) => assert_eq!(keys.unwrap(), &v),
1892            (None, Some(v)) if !v.is_empty() => assert_eq!(keys.unwrap(), &v),
1893            _ => assert!(keys.is_none()),
1894        }
1895    }
1896
1897    #[test]
1898    fn test_append_metadata() {
1899        let kv1 = KeyValue::new("cupcakes".to_string(), "awesome".to_string());
1900        let kv2 = KeyValue::new("bingo".to_string(), "bongo".to_string());
1901
1902        test_kv_metadata(None, None);
1903        test_kv_metadata(Some(vec![kv1.clone()]), None);
1904        test_kv_metadata(None, Some(vec![kv2.clone()]));
1905        test_kv_metadata(Some(vec![kv1.clone()]), Some(vec![kv2.clone()]));
1906        test_kv_metadata(Some(vec![]), Some(vec![kv2]));
1907        test_kv_metadata(Some(vec![]), Some(vec![]));
1908        test_kv_metadata(Some(vec![kv1]), Some(vec![]));
1909        test_kv_metadata(None, Some(vec![]));
1910    }
1911
1912    #[test]
1913    fn test_backwards_compatible_statistics() {
1914        let message_type = "
1915            message test_schema {
1916                REQUIRED INT32 decimal1 (DECIMAL(8,2));
1917                REQUIRED INT32 i32 (INTEGER(32,true));
1918                REQUIRED INT32 u32 (INTEGER(32,false));
1919            }
1920        ";
1921
1922        let schema = Arc::new(parse_message_type(message_type).unwrap());
1923        let props = Default::default();
1924        let mut writer = SerializedFileWriter::new(vec![], schema, props).unwrap();
1925        let mut row_group_writer = writer.next_row_group().unwrap();
1926
1927        for _ in 0..3 {
1928            let mut writer = row_group_writer.next_column().unwrap().unwrap();
1929            writer
1930                .typed::<Int32Type>()
1931                .write_batch(&[1, 2, 3], None, None)
1932                .unwrap();
1933            writer.close().unwrap();
1934        }
1935        let metadata = row_group_writer.close().unwrap();
1936        writer.close().unwrap();
1937
1938        // decimal
1939        let s = page_stats_to_thrift(metadata.column(0).statistics()).unwrap();
1940        assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1941        assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1942        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1943        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1944
1945        // i32
1946        let s = page_stats_to_thrift(metadata.column(1).statistics()).unwrap();
1947        assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1948        assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1949        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1950        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1951
1952        // u32
1953        let s = page_stats_to_thrift(metadata.column(2).statistics()).unwrap();
1954        assert_eq!(s.min.as_deref(), None);
1955        assert_eq!(s.max.as_deref(), None);
1956        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1957        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1958    }
1959
    /// Writes each column of a two-column schema independently, into its own
    /// in-memory buffer, with the writes interleaved across columns.
    /// The finished column chunks are then spliced into a single row group
    /// with `append_column`. Finally, the values are read back twice:
    /// without the page index loaded, and with it loaded.
    #[test]
    fn test_spliced_write() {
        let message_type = "
            message test_schema {
                REQUIRED INT32 i32 (INTEGER(32,true));
                REQUIRED INT32 u32 (INTEGER(32,false));
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let props = Arc::new(WriterProperties::builder().build());

        let mut file = Vec::with_capacity(1024);
        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props.clone()).unwrap();

        // One (page buffer, close result) slot per leaf column. The close
        // result is filled in later by that column writer's on-close callback.
        let columns = file_writer.descr.columns();
        let mut column_state: Vec<(_, Option<ColumnCloseResult>)> = columns
            .iter()
            .map(|_| (TrackedWrite::new(Vec::with_capacity(1024)), None))
            .collect();

        // Give each column writer a disjoint `&mut` into `column_state`. This is
        // done by repeatedly splitting the first element off the remaining slice.
        let mut column_state_slice = column_state.as_mut_slice();
        let mut column_writers = Vec::with_capacity(columns.len());
        for c in columns {
            let ((buf, out), tail) = column_state_slice.split_first_mut().unwrap();
            column_state_slice = tail;

            let page_writer = Box::new(SerializedPageWriter::new(buf));
            let col_writer = get_column_writer(c.clone(), props.clone(), page_writer);
            column_writers.push(SerializedColumnWriter::new(
                col_writer,
                // Stash the close result so it can be passed to `append_column` below.
                Some(Box::new(|on_close| {
                    *out = Some(on_close);
                    Ok(())
                })),
            ));
        }

        let column_data = [[1, 2, 3, 4], [7, 3, 7, 3]];

        // Interleaved writing to the column writers
        for (writer, batch) in column_writers.iter_mut().zip(column_data) {
            let writer = writer.typed::<Int32Type>();
            writer.write_batch(&batch, None, None).unwrap();
        }

        // Close the column writers. Each close is expected to run its on-close
        // callback, which fills the matching `out` slot in `column_state`.
        for writer in column_writers {
            writer.close().unwrap()
        }

        // Splice column data into a row group
        let mut row_group_writer = file_writer.next_row_group().unwrap();
        for (write, close) in column_state {
            let buf = Bytes::from(write.into_inner().unwrap());
            row_group_writer
                .append_column(&buf, close.unwrap())
                .unwrap();
        }
        row_group_writer.close().unwrap();
        file_writer.close().unwrap();

        // Check data was written correctly
        let file = Bytes::from(file);
        let test_read = |reader: SerializedFileReader<Bytes>| {
            let row_group = reader.get_row_group(0).unwrap();

            let mut out = Vec::with_capacity(4);
            let c1 = row_group.get_column_reader(0).unwrap();
            let mut c1 = get_typed_column_reader::<Int32Type>(c1);
            c1.read_records(4, None, None, &mut out).unwrap();
            assert_eq!(out, column_data[0]);

            out.clear();

            let c2 = row_group.get_column_reader(1).unwrap();
            let mut c2 = get_typed_column_reader::<Int32Type>(c2);
            c2.read_records(4, None, None, &mut out).unwrap();
            assert_eq!(out, column_data[1]);
        };

        // Read back without the page index...
        let reader = SerializedFileReader::new(file.clone()).unwrap();
        test_read(reader);

        // ...and again with the page index loaded.
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
        test_read(reader);
    }
2047
    /// Disables statistics globally but enables page-level statistics for
    /// column "a", then checks which page indexes are written:
    /// - "a" gets both a column index and an offset index;
    /// - "b" gets only an offset index, even though
    ///   `set_offset_index_disabled(true)` was requested.
    ///
    /// Also checks that `next_row_group` fails once the writer is finished.
    #[test]
    fn test_disabled_statistics() {
        let message_type = "
            message test_schema {
                REQUIRED INT32 a;
                REQUIRED INT32 b;
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
            .set_offset_index_disabled(true) // ignored: page-level statistics are enabled for "a" above, so offset indexes are still written
            .build();
        let mut file = Vec::with_capacity(1024);
        let mut file_writer =
            SerializedFileWriter::new(&mut file, schema, Arc::new(props)).unwrap();

        let mut row_group_writer = file_writer.next_row_group().unwrap();
        let mut a_writer = row_group_writer.next_column().unwrap().unwrap();
        let col_writer = a_writer.typed::<Int32Type>();
        col_writer.write_batch(&[1, 2, 3], None, None).unwrap();
        a_writer.close().unwrap();

        let mut b_writer = row_group_writer.next_column().unwrap().unwrap();
        let col_writer = b_writer.typed::<Int32Type>();
        col_writer.write_batch(&[4, 5, 6], None, None).unwrap();
        b_writer.close().unwrap();
        row_group_writer.close().unwrap();

        // `finish` returns the metadata without consuming the writer, so the
        // writer is still usable for the error check below.
        let metadata = file_writer.finish().unwrap();
        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        assert_eq!(row_group.num_columns(), 2);
        // Column "a" has both offset and column index, as requested
        assert!(row_group.column(0).offset_index_offset().is_some());
        assert!(row_group.column(0).column_index_offset().is_some());
        // Column "b" should only have offset index
        assert!(row_group.column(1).offset_index_offset().is_some());
        assert!(row_group.column(1).column_index_offset().is_none());

        // A finished writer rejects further row groups.
        let err = file_writer.next_row_group().err().unwrap().to_string();
        assert_eq!(err, "Parquet error: SerializedFileWriter already finished");

        // Release the writer's `&mut file` borrow before reading the bytes back.
        drop(file_writer);

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(Bytes::from(file), options).unwrap();

        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1); // 1 row group
        assert_eq!(offset_index[0].len(), 2); // 2 columns

        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1); // 1 row group
        assert_eq!(column_index[0].len(), 2); // 2 columns

        // "a" has a typed INT32 column index; "b" reports `ColumnIndexMetaData::NONE`.
        let a_idx = &column_index[0][0];
        assert!(matches!(a_idx, ColumnIndexMetaData::INT32(_)), "{a_idx:?}");
        let b_idx = &column_index[0][1];
        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
    }
2110
    /// Writes one OPTIONAL BYTE_ARRAY column with page-level statistics.
    /// The column has 10 definition levels, 3 of which are null.
    ///
    /// Checks the size statistics in four places:
    /// - the writer's metadata;
    /// - the read-back column chunk metadata;
    /// - the column index;
    /// - the offset index.
    ///
    /// The size statistics are the definition-level histogram, the absent
    /// repetition-level histogram, and `unencoded_byte_array_data_bytes`.
    #[test]
    fn test_byte_array_size_statistics() {
        let message_type = "
            message test_schema {
                OPTIONAL BYTE_ARRAY a (UTF8);
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        // NOTE(review): assumes `gen_vec(32, 7)` yields 7 values, one per `1` in
        // `def_levels` below. Confirm against the helper's signature.
        let data = ByteArrayType::gen_vec(32, 7);
        // 10 slots: 7 non-null (level 1) and 3 null (level 0).
        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
        // Expected unencoded byte-array size: the total length of all non-null values.
        let unenc_size: i64 = data.iter().map(|x| x.len() as i64).sum();
        let file: File = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        );

        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<ByteArrayType>()
            .write_batch(&data, Some(&def_levels), None)
            .unwrap();
        col_writer.close().unwrap();
        row_group_writer.close().unwrap();
        let file_metadata = writer.close().unwrap();

        assert_eq!(file_metadata.num_row_groups(), 1);
        assert_eq!(file_metadata.row_group(0).num_columns(), 1);

        // Definition-level histogram: [count at level 0 (nulls), count at level 1 (values)].
        let check_def_hist = |def_hist: &[i64]| {
            assert_eq!(def_hist.len(), 2);
            assert_eq!(def_hist[0], 3);
            assert_eq!(def_hist[1], 7);
        };

        let meta_data = file_metadata.row_group(0).column(0);

        // The column is not repeated, so no repetition-level histogram is expected.
        assert!(meta_data.repetition_level_histogram().is_none());
        assert!(meta_data.definition_level_histogram().is_some());
        assert!(meta_data.unencoded_byte_array_data_bytes().is_some());
        assert_eq!(
            unenc_size,
            meta_data.unencoded_byte_array_data_bytes().unwrap()
        );
        check_def_hist(meta_data.definition_level_histogram().unwrap().values());

        // check that the read metadata is also correct
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rfile_metadata = reader.metadata().file_metadata();
        assert_eq!(
            rfile_metadata.num_rows(),
            file_metadata.file_metadata().num_rows()
        );
        assert_eq!(reader.num_row_groups(), 1);
        let rowgroup = reader.get_row_group(0).unwrap();
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.definition_level_histogram().is_some());
        assert!(column.repetition_level_histogram().is_none());
        assert!(column.unencoded_byte_array_data_bytes().is_some());
        check_def_hist(column.definition_level_histogram().unwrap().values());
        assert_eq!(
            unenc_size,
            column.unencoded_byte_array_data_bytes().unwrap()
        );

        // check histogram in column index as well
        assert!(reader.metadata().column_index().is_some());
        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1);
        assert_eq!(column_index[0].len(), 1);
        let col_idx = if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][0] {
            assert_eq!(index.num_pages(), 1);
            index
        } else {
            unreachable!()
        };

        assert!(col_idx.repetition_level_histogram(0).is_none());
        assert!(col_idx.definition_level_histogram(0).is_some());
        check_def_hist(col_idx.definition_level_histogram(0).unwrap());

        // The offset index carries the per-page unencoded sizes; with a single
        // page, that page's size must equal the column total.
        assert!(reader.metadata().offset_index().is_some());
        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1);
        assert_eq!(offset_index[0].len(), 1);
        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_some());
        let page_sizes = offset_index[0][0]
            .unencoded_byte_array_data_bytes
            .as_ref()
            .unwrap();
        assert_eq!(page_sizes.len(), 1);
        assert_eq!(page_sizes[0], unenc_size);
    }
2211
2212    #[test]
2213    fn test_too_many_rowgroups() {
2214        let message_type = "
2215            message test_schema {
2216                REQUIRED BYTE_ARRAY a (UTF8);
2217            }
2218        ";
2219        let schema = Arc::new(parse_message_type(message_type).unwrap());
2220        let file: File = tempfile::tempfile().unwrap();
2221        let props = Arc::new(
2222            WriterProperties::builder()
2223                .set_statistics_enabled(EnabledStatistics::None)
2224                .set_max_row_group_row_count(Some(1))
2225                .build(),
2226        );
2227        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
2228
2229        // Create 32k empty rowgroups. Should error when i == 32768.
2230        for i in 0..0x8001 {
2231            match writer.next_row_group() {
2232                Ok(mut row_group_writer) => {
2233                    assert_ne!(i, 0x8000);
2234                    let col_writer = row_group_writer.next_column().unwrap().unwrap();
2235                    col_writer.close().unwrap();
2236                    row_group_writer.close().unwrap();
2237                }
2238                Err(e) => {
2239                    assert_eq!(i, 0x8000);
2240                    assert_eq!(
2241                        e.to_string(),
2242                        "Parquet error: Parquet does not support more than 32767 row groups per file (currently: 32768)"
2243                    );
2244                }
2245            }
2246        }
2247        writer.close().unwrap();
2248    }
2249
    /// Writes a nullable list of nullable INT32 values: 5 rows and 10 level
    /// entries, with page-level statistics enabled.
    ///
    /// Checks the definition- and repetition-level histograms in three places:
    /// - the writer's metadata;
    /// - the read-back column chunk metadata;
    /// - the column index.
    ///
    /// `unencoded_byte_array_data_bytes` is expected to be absent for this
    /// INT32 column.
    #[test]
    fn test_size_statistics_with_repetition_and_nulls() {
        let message_type = "
            message test_schema {
                OPTIONAL group i32_list (LIST) {
                    REPEATED group list {
                        OPTIONAL INT32 element;
                    }
                }
            }
        ";
        // column is:
        // row 0: [1, 2]
        // row 1: NULL
        // row 2: [4, NULL]
        // row 3: []
        // row 4: [7, 8, 9, 10]
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let data = [1, 2, 4, 7, 8, 9, 10];
        // Definition levels, matched to the rows above:
        // - 0 = null list
        // - 1 = empty list
        // - 2 = null element
        // - 3 = non-null element
        let def_levels = [3, 3, 0, 3, 2, 1, 3, 3, 3, 3];
        // Repetition levels: 0 starts a new row, 1 continues the current list.
        let rep_levels = [0, 1, 0, 0, 1, 0, 0, 1, 1, 1];
        let file = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        );
        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&data, Some(&def_levels), Some(&rep_levels))
            .unwrap();
        col_writer.close().unwrap();
        row_group_writer.close().unwrap();
        let file_metadata = writer.close().unwrap();

        assert_eq!(file_metadata.num_row_groups(), 1);
        assert_eq!(file_metadata.row_group(0).num_columns(), 1);

        // Per-level counts of `def_levels`: one each of 0, 1 and 2, and seven 3s.
        let check_def_hist = |def_hist: &[i64]| {
            assert_eq!(def_hist.len(), 4);
            assert_eq!(def_hist[0], 1);
            assert_eq!(def_hist[1], 1);
            assert_eq!(def_hist[2], 1);
            assert_eq!(def_hist[3], 7);
        };

        // Per-level counts of `rep_levels`: five 0s (one per row) and five 1s.
        let check_rep_hist = |rep_hist: &[i64]| {
            assert_eq!(rep_hist.len(), 2);
            assert_eq!(rep_hist[0], 5);
            assert_eq!(rep_hist[1], 5);
        };

        // check that histograms are set properly in the write and read metadata
        // also check that unencoded_byte_array_data_bytes is not set
        let meta_data = file_metadata.row_group(0).column(0);
        assert!(meta_data.repetition_level_histogram().is_some());
        assert!(meta_data.definition_level_histogram().is_some());
        assert!(meta_data.unencoded_byte_array_data_bytes().is_none());
        check_def_hist(meta_data.definition_level_histogram().unwrap().values());
        check_rep_hist(meta_data.repetition_level_histogram().unwrap().values());

        // check that the read metadata is also correct
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rfile_metadata = reader.metadata().file_metadata();
        assert_eq!(
            rfile_metadata.num_rows(),
            file_metadata.file_metadata().num_rows()
        );
        assert_eq!(reader.num_row_groups(), 1);
        let rowgroup = reader.get_row_group(0).unwrap();
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.definition_level_histogram().is_some());
        assert!(column.repetition_level_histogram().is_some());
        assert!(column.unencoded_byte_array_data_bytes().is_none());
        check_def_hist(column.definition_level_histogram().unwrap().values());
        check_rep_hist(column.repetition_level_histogram().unwrap().values());

        // check histogram in column index as well
        assert!(reader.metadata().column_index().is_some());
        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1);
        assert_eq!(column_index[0].len(), 1);
        let col_idx = if let ColumnIndexMetaData::INT32(index) = &column_index[0][0] {
            assert_eq!(index.num_pages(), 1);
            index
        } else {
            unreachable!()
        };

        check_def_hist(col_idx.definition_level_histogram(0).unwrap());
        check_rep_hist(col_idx.repetition_level_histogram(0).unwrap());

        assert!(reader.metadata().offset_index().is_some());
        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1);
        assert_eq!(offset_index[0].len(), 1);
        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_none());
    }
2355
2356    #[test]
2357    #[cfg(feature = "arrow")]
2358    fn test_byte_stream_split_extended_roundtrip() {
2359        let path = format!(
2360            "{}/byte_stream_split_extended.gzip.parquet",
2361            arrow::util::test_util::parquet_test_data(),
2362        );
2363        let file = File::open(path).unwrap();
2364
2365        // Read in test file and rewrite to tmp
2366        let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)
2367            .expect("parquet open")
2368            .build()
2369            .expect("parquet open");
2370
2371        let file = tempfile::tempfile().unwrap();
2372        let props = WriterProperties::builder()
2373            .set_dictionary_enabled(false)
2374            .set_column_encoding(
2375                ColumnPath::from("float16_byte_stream_split"),
2376                Encoding::BYTE_STREAM_SPLIT,
2377            )
2378            .set_column_encoding(
2379                ColumnPath::from("float_byte_stream_split"),
2380                Encoding::BYTE_STREAM_SPLIT,
2381            )
2382            .set_column_encoding(
2383                ColumnPath::from("double_byte_stream_split"),
2384                Encoding::BYTE_STREAM_SPLIT,
2385            )
2386            .set_column_encoding(
2387                ColumnPath::from("int32_byte_stream_split"),
2388                Encoding::BYTE_STREAM_SPLIT,
2389            )
2390            .set_column_encoding(
2391                ColumnPath::from("int64_byte_stream_split"),
2392                Encoding::BYTE_STREAM_SPLIT,
2393            )
2394            .set_column_encoding(
2395                ColumnPath::from("flba5_byte_stream_split"),
2396                Encoding::BYTE_STREAM_SPLIT,
2397            )
2398            .set_column_encoding(
2399                ColumnPath::from("decimal_byte_stream_split"),
2400                Encoding::BYTE_STREAM_SPLIT,
2401            )
2402            .build();
2403
2404        let mut parquet_writer = ArrowWriter::try_new(
2405            file.try_clone().expect("cannot open file"),
2406            parquet_reader.schema(),
2407            Some(props),
2408        )
2409        .expect("create arrow writer");
2410
2411        for maybe_batch in parquet_reader {
2412            let batch = maybe_batch.expect("reading batch");
2413            parquet_writer.write(&batch).expect("writing data");
2414        }
2415
2416        parquet_writer.close().expect("finalizing file");
2417
2418        let reader = SerializedFileReader::new(file).expect("Failed to create reader");
2419        let filemeta = reader.metadata();
2420
2421        // Make sure byte_stream_split encoding was used
2422        let check_encoding = |x: usize, filemeta: &ParquetMetaData| {
2423            assert!(
2424                filemeta
2425                    .row_group(0)
2426                    .column(x)
2427                    .encodings()
2428                    .collect::<Vec<_>>()
2429                    .contains(&Encoding::BYTE_STREAM_SPLIT)
2430            );
2431        };
2432
2433        check_encoding(1, filemeta);
2434        check_encoding(3, filemeta);
2435        check_encoding(5, filemeta);
2436        check_encoding(7, filemeta);
2437        check_encoding(9, filemeta);
2438        check_encoding(11, filemeta);
2439        check_encoding(13, filemeta);
2440
2441        // Read back tmpfile and make sure all values are correct
2442        let mut iter = reader
2443            .get_row_iter(None)
2444            .expect("Failed to create row iterator");
2445
2446        let mut start = 0;
2447        let end = reader.metadata().file_metadata().num_rows();
2448
2449        let check_row = |row: Result<Row, ParquetError>| {
2450            assert!(row.is_ok());
2451            let r = row.unwrap();
2452            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2453            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2454            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2455            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2456            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2457            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2458            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2459        };
2460
2461        while start < end {
2462            match iter.next() {
2463                Some(row) => check_row(row),
2464                None => break,
2465            };
2466            start += 1;
2467        }
2468    }
2469
2470    #[test]
2471    fn test_rewrite_no_page_indexes() {
2472        let file = get_test_file("alltypes_tiny_pages.parquet");
2473        let metadata = ParquetMetaDataReader::new()
2474            .with_page_index_policy(PageIndexPolicy::Optional)
2475            .parse_and_finish(&file)
2476            .unwrap();
2477
2478        let props = Arc::new(WriterProperties::builder().build());
2479        let schema = metadata.file_metadata().schema_descr().root_schema_ptr();
2480        let output = Vec::<u8>::new();
2481        let mut writer = SerializedFileWriter::new(output, schema, props).unwrap();
2482
2483        for rg in metadata.row_groups() {
2484            let mut rg_out = writer.next_row_group().unwrap();
2485            for column in rg.columns() {
2486                let result = ColumnCloseResult {
2487                    bytes_written: column.compressed_size() as _,
2488                    rows_written: rg.num_rows() as _,
2489                    metadata: column.clone(),
2490                    bloom_filter: None,
2491                    column_index: None,
2492                    offset_index: None,
2493                };
2494                rg_out.append_column(&file, result).unwrap();
2495            }
2496            rg_out.close().unwrap();
2497        }
2498        writer.close().unwrap();
2499    }
2500
2501    #[test]
2502    fn test_rewrite_missing_column_index() {
2503        // this file has an INT96 column that lacks a column index entry
2504        let file = get_test_file("alltypes_tiny_pages.parquet");
2505        let metadata = ParquetMetaDataReader::new()
2506            .with_page_index_policy(PageIndexPolicy::Optional)
2507            .parse_and_finish(&file)
2508            .unwrap();
2509
2510        let props = Arc::new(WriterProperties::builder().build());
2511        let schema = metadata.file_metadata().schema_descr().root_schema_ptr();
2512        let output = Vec::<u8>::new();
2513        let mut writer = SerializedFileWriter::new(output, schema, props).unwrap();
2514
2515        let column_indexes = metadata.column_index();
2516        let offset_indexes = metadata.offset_index();
2517
2518        for (rg_idx, rg) in metadata.row_groups().iter().enumerate() {
2519            let rg_column_indexes = column_indexes.and_then(|ci| ci.get(rg_idx));
2520            let rg_offset_indexes = offset_indexes.and_then(|oi| oi.get(rg_idx));
2521            let mut rg_out = writer.next_row_group().unwrap();
2522            for (col_idx, column) in rg.columns().iter().enumerate() {
2523                let column_index = rg_column_indexes.and_then(|row| row.get(col_idx)).cloned();
2524                let offset_index = rg_offset_indexes.and_then(|row| row.get(col_idx)).cloned();
2525                let result = ColumnCloseResult {
2526                    bytes_written: column.compressed_size() as _,
2527                    rows_written: rg.num_rows() as _,
2528                    metadata: column.clone(),
2529                    bloom_filter: None,
2530                    column_index,
2531                    offset_index,
2532                };
2533                rg_out.append_column(&file, result).unwrap();
2534            }
2535            rg_out.close().unwrap();
2536        }
2537        writer.close().unwrap();
2538    }
2539}