parquet/file/writer.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`SerializedFileWriter`]: Low level Parquet writer API

use crate::bloom_filter::Sbbf;
use crate::format as parquet;
use crate::format::{ColumnIndex, OffsetIndex};
use crate::thrift::TSerializable;
use std::fmt::Debug;
use std::io::{BufWriter, IoSlice, Read};
use std::{io::Write, sync::Arc};
use thrift::protocol::TCompactOutputProtocol;

use crate::column::page_encryption::PageEncryptor;
use crate::column::writer::{get_typed_column_writer_mut, ColumnCloseResult, ColumnWriterImpl};
use crate::column::{
    page::{CompressedPage, PageWriteSpec, PageWriter},
    writer::{get_column_writer, ColumnWriter},
};
use crate::data_type::DataType;
#[cfg(feature = "encryption")]
use crate::encryption::encrypt::{
    get_column_crypto_metadata, FileEncryptionProperties, FileEncryptor,
};
use crate::errors::{ParquetError, Result};
use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
use crate::file::reader::ChunkReader;
#[cfg(feature = "encryption")]
use crate::file::PARQUET_MAGIC_ENCR_FOOTER;
use crate::file::{metadata::*, PARQUET_MAGIC};
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr};

/// A wrapper around a [`Write`] that keeps track of the number
/// of bytes that have been written. The given [`Write`] is wrapped
/// with a [`BufWriter`] to optimize writing performance.
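///
/// # Example (sketch)
///
/// A minimal sketch of the byte tracking; the in-memory `Vec<u8>` sink is an
/// illustrative assumption rather than a typical Parquet output target.
///
/// ```
/// # use std::io::Write;
/// # use parquet::file::writer::TrackedWrite;
/// let mut tracked = TrackedWrite::new(Vec::<u8>::new());
/// tracked.write_all(b"PAR1").unwrap();
/// // the wrapper counts bytes as they pass through the buffered layer
/// assert_eq!(tracked.bytes_written(), 4);
/// ```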
pub struct TrackedWrite<W: Write> {
    inner: BufWriter<W>,
    bytes_written: usize,
}

impl<W: Write> TrackedWrite<W> {
    /// Create a new [`TrackedWrite`] from a [`Write`]
    pub fn new(inner: W) -> Self {
        let buf_write = BufWriter::new(inner);
        Self {
            inner: buf_write,
            bytes_written: 0,
        }
    }

    /// Returns the number of bytes written to this instance
    pub fn bytes_written(&self) -> usize {
        self.bytes_written
    }

    /// Returns a reference to the underlying writer.
    pub fn inner(&self) -> &W {
        self.inner.get_ref()
    }

    /// Returns a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to write directly to the underlying writer; doing so
    /// will likely result in data corruption.
    pub fn inner_mut(&mut self) -> &mut W {
        self.inner.get_mut()
    }

    /// Returns the underlying writer.
    pub fn into_inner(self) -> Result<W> {
        self.inner.into_inner().map_err(|err| {
            ParquetError::General(format!("fail to get inner writer: {:?}", err.to_string()))
        })
    }
}

impl<W: Write> Write for TrackedWrite<W> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let bytes = self.inner.write(buf)?;
        self.bytes_written += bytes;
        Ok(bytes)
    }

    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> std::io::Result<usize> {
        let bytes = self.inner.write_vectored(bufs)?;
        self.bytes_written += bytes;
        Ok(bytes)
    }

    fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
        self.inner.write_all(buf)?;
        self.bytes_written += buf.len();

        Ok(())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.inner.flush()
    }
}

/// Callback invoked on closing a column chunk
pub type OnCloseColumnChunk<'a> = Box<dyn FnOnce(ColumnCloseResult) -> Result<()> + 'a>;

/// Callback invoked on closing a row group. Arguments are:
///
/// - the [`TrackedWrite`] used by the file writer
/// - the row group metadata
/// - the bloom filter for each column chunk
/// - the column index for each column chunk
/// - the offset index for each column chunk
pub type OnCloseRowGroup<'a, W> = Box<
    dyn FnOnce(
            &'a mut TrackedWrite<W>,
            RowGroupMetaData,
            Vec<Option<Sbbf>>,
            Vec<Option<ColumnIndex>>,
            Vec<Option<OffsetIndex>>,
        ) -> Result<()>
        + 'a
        + Send,
>;

// ----------------------------------------------------------------------
// Serialized impl for file & row group writers

/// Parquet file writer API.
///
/// This is a low level API for writing Parquet files directly. It tracks the
/// location of file structures such as row groups and column chunks, and
/// writes the metadata and file footer.
///
/// Data is written to row groups using [`SerializedRowGroupWriter`] and to
/// columns using [`SerializedColumnWriter`]. The `SerializedFileWriter` tracks
/// where all the data is written, and assembles the final file metadata.
///
/// The main workflow is as follows (see the example sketch below):
/// - Create the file writer; this starts a new file and writes the header magic bytes.
/// - Request a new row group writer by calling `next_row_group`.
/// - Once finished writing a row group, close the row group writer by calling `close`.
/// - Write subsequent row groups, if necessary.
/// - After all row groups have been written, close the file writer using the `close` method.
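///
/// # Example (sketch)
///
/// A minimal sketch of the workflow above, writing a single `INT32` column.
/// The schema, values, and output path are illustrative assumptions only.
///
/// ```no_run
/// # use std::{fs::File, sync::Arc};
/// # use parquet::data_type::Int32Type;
/// # use parquet::file::properties::WriterProperties;
/// # use parquet::file::writer::SerializedFileWriter;
/// # use parquet::schema::parser::parse_message_type;
/// let schema = Arc::new(
///     parse_message_type("message schema { REQUIRED INT32 id; }").unwrap(),
/// );
/// let props = Arc::new(WriterProperties::builder().build());
/// let file = File::create("example.parquet").unwrap();
///
/// let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
/// let mut row_group = writer.next_row_group().unwrap();
/// while let Some(mut col_writer) = row_group.next_column().unwrap() {
///     // write a small batch of values to each leaf column
///     col_writer
///         .typed::<Int32Type>()
///         .write_batch(&[1, 2, 3], None, None)
///         .unwrap();
///     col_writer.close().unwrap();
/// }
/// row_group.close().unwrap();
/// writer.close().unwrap();
/// ```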
pub struct SerializedFileWriter<W: Write> {
    buf: TrackedWrite<W>,
    schema: TypePtr,
    descr: SchemaDescPtr,
    props: WriterPropertiesPtr,
    row_groups: Vec<RowGroupMetaData>,
    bloom_filters: Vec<Vec<Option<Sbbf>>>,
    column_indexes: Vec<Vec<Option<ColumnIndex>>>,
    offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
    row_group_index: usize,
    // kv_metadatas will be appended to `props` when `write_metadata` is called
    kv_metadatas: Vec<KeyValue>,
    finished: bool,
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}

impl<W: Write> Debug for SerializedFileWriter<W> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // implement Debug so this can be used with #[derive(Debug)]
        // in client code rather than actually listing all the fields
        f.debug_struct("SerializedFileWriter")
            .field("descr", &self.descr)
            .field("row_group_index", &self.row_group_index)
            .field("kv_metadatas", &self.kv_metadatas)
            .finish_non_exhaustive()
    }
}

impl<W: Write + Send> SerializedFileWriter<W> {
    /// Creates a new file writer.
    pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
        let mut buf = TrackedWrite::new(buf);

        let schema_descriptor = SchemaDescriptor::new(schema.clone());

        #[cfg(feature = "encryption")]
        let file_encryptor = Self::get_file_encryptor(&properties, &schema_descriptor)?;

        Self::start_file(&properties, &mut buf)?;
        Ok(Self {
            buf,
            schema,
            descr: Arc::new(schema_descriptor),
            props: properties,
            row_groups: vec![],
            bloom_filters: vec![],
            column_indexes: Vec::new(),
            offset_indexes: Vec::new(),
            row_group_index: 0,
            kv_metadatas: Vec::new(),
            finished: false,
            #[cfg(feature = "encryption")]
            file_encryptor,
        })
    }

    #[cfg(feature = "encryption")]
    fn get_file_encryptor(
        properties: &WriterPropertiesPtr,
        schema_descriptor: &SchemaDescriptor,
    ) -> Result<Option<Arc<FileEncryptor>>> {
        if let Some(file_encryption_properties) = &properties.file_encryption_properties {
            file_encryption_properties.validate_encrypted_column_names(schema_descriptor)?;

            Ok(Some(Arc::new(FileEncryptor::new(
                file_encryption_properties.clone(),
            )?)))
        } else {
            Ok(None)
        }
    }

    /// Creates a new row group from this file writer.
    ///
    /// Note: Parquet files are limited to at most 2^15 row groups in a file, and row
    /// groups must be written sequentially.
    ///
    /// Every time the next row group is requested, the previous row group must
    /// be finalized and closed using the [`SerializedRowGroupWriter::close`]
    /// method, or an error will be returned.
    pub fn next_row_group(&mut self) -> Result<SerializedRowGroupWriter<'_, W>> {
        self.assert_previous_writer_closed()?;
        let ordinal = self.row_group_index;

        let ordinal: i16 = ordinal.try_into().map_err(|_| {
            ParquetError::General(format!(
                "Parquet does not support more than {} row groups per file (currently: {})",
                i16::MAX,
                ordinal
            ))
        })?;

        self.row_group_index = self
            .row_group_index
            .checked_add(1)
            .expect("SerializedFileWriter::row_group_index overflowed");

        let bloom_filter_position = self.properties().bloom_filter_position();
        let row_groups = &mut self.row_groups;
        let row_bloom_filters = &mut self.bloom_filters;
        let row_column_indexes = &mut self.column_indexes;
        let row_offset_indexes = &mut self.offset_indexes;
        let on_close = move |buf,
                             mut metadata,
                             row_group_bloom_filter,
                             row_group_column_index,
                             row_group_offset_index| {
            row_bloom_filters.push(row_group_bloom_filter);
            row_column_indexes.push(row_group_column_index);
            row_offset_indexes.push(row_group_offset_index);
            // write bloom filters out immediately after the row group if requested
            match bloom_filter_position {
                BloomFilterPosition::AfterRowGroup => {
                    write_bloom_filters(buf, row_bloom_filters, &mut metadata)?
                }
                BloomFilterPosition::End => (),
            };
            row_groups.push(metadata);
            Ok(())
        };

        let row_group_writer = SerializedRowGroupWriter::new(
            self.descr.clone(),
            self.props.clone(),
            &mut self.buf,
            ordinal,
            Some(Box::new(on_close)),
        );
        #[cfg(feature = "encryption")]
        let row_group_writer = row_group_writer.with_file_encryptor(self.file_encryptor.clone());

        Ok(row_group_writer)
    }

    /// Returns metadata for any flushed row groups
    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        &self.row_groups
    }

    /// Close and finalize the underlying Parquet writer
    ///
    /// Unlike [`Self::close`] this does not consume self
    ///
    /// Attempting to write after calling finish will result in an error
    pub fn finish(&mut self) -> Result<parquet::FileMetaData> {
        self.assert_previous_writer_closed()?;
        let metadata = self.write_metadata()?;
        self.buf.flush()?;
        Ok(metadata)
    }

    /// Closes and finalizes the file writer, returning the file metadata.
    pub fn close(mut self) -> Result<parquet::FileMetaData> {
        self.finish()
    }

    /// Writes magic bytes at the beginning of the file.
    #[cfg(not(feature = "encryption"))]
    fn start_file(_properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
        buf.write_all(get_file_magic())?;
        Ok(())
    }

    /// Writes magic bytes at the beginning of the file.
    #[cfg(feature = "encryption")]
    fn start_file(properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
        let magic = get_file_magic(properties.file_encryption_properties.as_ref());

        buf.write_all(magic)?;
        Ok(())
    }

    /// Assembles and writes metadata at the end of the file.
    fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
        self.finished = true;

        // write out any remaining bloom filters after all row groups
        for row_group in &mut self.row_groups {
            write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?;
        }

        let key_value_metadata = match self.props.key_value_metadata() {
            Some(kv) => Some(kv.iter().chain(&self.kv_metadatas).cloned().collect()),
            None if self.kv_metadatas.is_empty() => None,
            None => Some(self.kv_metadatas.clone()),
        };

        let row_groups = self
            .row_groups
            .iter()
            .map(|v| v.to_thrift())
            .collect::<Vec<_>>();

        let mut encoder = ThriftMetadataWriter::new(
            &mut self.buf,
            &self.schema,
            &self.descr,
            row_groups,
            Some(self.props.created_by().to_string()),
            self.props.writer_version().as_num(),
        );

        #[cfg(feature = "encryption")]
        {
            encoder = encoder.with_file_encryptor(self.file_encryptor.clone());
        }

        if let Some(key_value_metadata) = key_value_metadata {
            encoder = encoder.with_key_value_metadata(key_value_metadata)
        }
        encoder = encoder.with_column_indexes(&self.column_indexes);
        encoder = encoder.with_offset_indexes(&self.offset_indexes);
        encoder.finish()
    }

    #[inline]
    fn assert_previous_writer_closed(&self) -> Result<()> {
        if self.finished {
            return Err(general_err!("SerializedFileWriter already finished"));
        }

        if self.row_group_index != self.row_groups.len() {
            Err(general_err!("Previous row group writer was not closed"))
        } else {
            Ok(())
        }
    }

    /// Add a [`KeyValue`] to the file writer's metadata
    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
        self.kv_metadatas.push(kv_metadata);
    }

    /// Returns a reference to the schema descriptor.
    pub fn schema_descr(&self) -> &SchemaDescriptor {
        &self.descr
    }

    /// Returns a reference to the writer properties
    pub fn properties(&self) -> &WriterPropertiesPtr {
        &self.props
    }

    /// Returns a reference to the underlying writer.
    pub fn inner(&self) -> &W {
        self.buf.inner()
    }

    /// Writes the given `buf` bytes to the internal buffer.
    ///
    /// This can be used to write raw data to an in-progress Parquet file, for
    /// example, custom index structures or other payloads. Other Parquet readers
    /// will skip this data when reading the files.
    ///
    /// It's safe to use this method to write data to the underlying writer,
    /// because it ensures that the buffering and byte-counting layers are used.
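    ///
    /// # Example (sketch)
    ///
    /// A hedged sketch: `writer` is assumed to be an open [`SerializedFileWriter`],
    /// typically between closing one row group and starting the next. The payload
    /// bytes are purely illustrative.
    ///
    /// ```ignore
    /// // bytes written here are counted by the tracked writer, so subsequent
    /// // row group and footer offsets remain correct; readers skip them
    /// writer.write_all(b"application specific payload")?;
    /// ```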
    pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
        self.buf.write_all(buf)
    }

    /// Returns a mutable reference to the underlying writer.
    ///
    /// **Warning**: if you write directly to this writer, you will skip
    /// the `TrackedWrite` buffering and byte-counting layers, which can cause
    /// the file footer's recorded offsets and sizes to diverge from reality,
    /// resulting in an unreadable or corrupted Parquet file.
    ///
    /// If you want to write safely to the underlying writer, use [`Self::write_all`].
    pub fn inner_mut(&mut self) -> &mut W {
        self.buf.inner_mut()
    }

    /// Writes the file footer and returns the underlying writer.
    pub fn into_inner(mut self) -> Result<W> {
        self.assert_previous_writer_closed()?;
        let _ = self.write_metadata()?;

        self.buf.into_inner()
    }

    /// Returns the number of bytes written to this instance
    pub fn bytes_written(&self) -> usize {
        self.buf.bytes_written()
    }

    /// Get the file encryptor used by this instance to encrypt data
    #[cfg(feature = "encryption")]
    pub(crate) fn file_encryptor(&self) -> Option<Arc<FileEncryptor>> {
        self.file_encryptor.clone()
    }
}

/// Serializes all the bloom filters of the given row group to the given buffer,
/// updating the row group metadata in place with each bloom filter's offset and length.
fn write_bloom_filters<W: Write + Send>(
    buf: &mut TrackedWrite<W>,
    bloom_filters: &mut [Vec<Option<Sbbf>>],
    row_group: &mut RowGroupMetaData,
) -> Result<()> {
    // iterate over each column in the row group and write any remaining
    // bloom filter to the file, recording where it was written

    let row_group_idx: u16 = row_group
        .ordinal()
        .expect("Missing row group ordinal")
        .try_into()
        .map_err(|_| {
            ParquetError::General(format!(
                "Negative row group ordinal: {}",
                row_group.ordinal().unwrap()
            ))
        })?;
    let row_group_idx = row_group_idx as usize;
    for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() {
        if let Some(bloom_filter) = bloom_filters[row_group_idx][column_idx].take() {
            let start_offset = buf.bytes_written();
            bloom_filter.write(&mut *buf)?;
            let end_offset = buf.bytes_written();
            // record the bloom filter's offset and length in the column chunk metadata
            *column_chunk = column_chunk
                .clone()
                .into_builder()
                .set_bloom_filter_offset(Some(start_offset as i64))
                .set_bloom_filter_length(Some((end_offset - start_offset) as i32))
                .build()?;
        }
    }
    Ok(())
}

/// Parquet row group writer API.
///
/// Provides methods to access column writers in an iterator-like fashion; the order is
/// guaranteed to match the order of schema leaves (column descriptors).
///
/// All columns should be written sequentially; the main workflow is (see the example sketch below):
/// - Request the next column using the `next_column` method - this will return `None` if no
///   more columns are available to write.
/// - Once done writing a column, close the column writer with `close`.
/// - Once all columns have been written, close the row group writer with the `close`
///   method. The close method returns the row group metadata and is a no-op on an
///   already closed row group.
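///
/// # Example (sketch)
///
/// A minimal sketch of the column loop; `row_group_writer` is assumed to come from
/// [`SerializedFileWriter::next_row_group`] and the schema is assumed to contain only
/// `INT32` leaves.
///
/// ```ignore
/// while let Some(mut col_writer) = row_group_writer.next_column()? {
///     col_writer
///         .typed::<Int32Type>()
///         .write_batch(&[1, 2, 3], None, None)?;
///     col_writer.close()?;
/// }
/// row_group_writer.close()?;
/// ```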
pub struct SerializedRowGroupWriter<'a, W: Write> {
    descr: SchemaDescPtr,
    props: WriterPropertiesPtr,
    buf: &'a mut TrackedWrite<W>,
    total_rows_written: Option<u64>,
    total_bytes_written: u64,
    total_uncompressed_bytes: i64,
    column_index: usize,
    row_group_metadata: Option<RowGroupMetaDataPtr>,
    column_chunks: Vec<ColumnChunkMetaData>,
    bloom_filters: Vec<Option<Sbbf>>,
    column_indexes: Vec<Option<ColumnIndex>>,
    offset_indexes: Vec<Option<OffsetIndex>>,
    row_group_index: i16,
    file_offset: i64,
    on_close: Option<OnCloseRowGroup<'a, W>>,
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}

impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
    /// Creates a new `SerializedRowGroupWriter` with:
    ///
    /// - `schema_descr` - the schema to write
    /// - `properties` - writer properties
    /// - `buf` - the buffer to write data to
    /// - `row_group_index` - row group index in this parquet file.
    /// - `on_close` - an optional callback that will be invoked on [`Self::close`]
    ///
    /// The file offset of this row group is derived from the number of bytes
    /// already written to `buf`.
    pub fn new(
        schema_descr: SchemaDescPtr,
        properties: WriterPropertiesPtr,
        buf: &'a mut TrackedWrite<W>,
        row_group_index: i16,
        on_close: Option<OnCloseRowGroup<'a, W>>,
    ) -> Self {
        let num_columns = schema_descr.num_columns();
        let file_offset = buf.bytes_written() as i64;
        Self {
            buf,
            row_group_index,
            file_offset,
            on_close,
            total_rows_written: None,
            descr: schema_descr,
            props: properties,
            column_index: 0,
            row_group_metadata: None,
            column_chunks: Vec::with_capacity(num_columns),
            bloom_filters: Vec::with_capacity(num_columns),
            column_indexes: Vec::with_capacity(num_columns),
            offset_indexes: Vec::with_capacity(num_columns),
            total_bytes_written: 0,
            total_uncompressed_bytes: 0,
            #[cfg(feature = "encryption")]
            file_encryptor: None,
        }
    }

    #[cfg(feature = "encryption")]
    /// Set the file encryptor to use for encrypting row group data and metadata
    pub(crate) fn with_file_encryptor(
        mut self,
        file_encryptor: Option<Arc<FileEncryptor>>,
    ) -> Self {
        self.file_encryptor = file_encryptor;
        self
    }

    /// Advance `self.column_index` returning the next [`ColumnDescPtr`] if any
    fn next_column_desc(&mut self) -> Option<ColumnDescPtr> {
        let ret = self.descr.columns().get(self.column_index)?.clone();
        self.column_index += 1;
        Some(ret)
    }

    /// Returns [`OnCloseColumnChunk`] for the next writer
    fn get_on_close(&mut self) -> (&mut TrackedWrite<W>, OnCloseColumnChunk<'_>) {
        let total_bytes_written = &mut self.total_bytes_written;
        let total_uncompressed_bytes = &mut self.total_uncompressed_bytes;
        let total_rows_written = &mut self.total_rows_written;
        let column_chunks = &mut self.column_chunks;
        let column_indexes = &mut self.column_indexes;
        let offset_indexes = &mut self.offset_indexes;
        let bloom_filters = &mut self.bloom_filters;

        let on_close = |r: ColumnCloseResult| {
            // Update row group writer metrics
            *total_bytes_written += r.bytes_written;
            *total_uncompressed_bytes += r.metadata.uncompressed_size();
            column_chunks.push(r.metadata);
            bloom_filters.push(r.bloom_filter);
            column_indexes.push(r.column_index);
            offset_indexes.push(r.offset_index);

            if let Some(rows) = *total_rows_written {
                if rows != r.rows_written {
                    return Err(general_err!(
                        "Incorrect number of rows, expected {} != {} rows",
                        rows,
                        r.rows_written
                    ));
                }
            } else {
                *total_rows_written = Some(r.rows_written);
            }

            Ok(())
        };
        (self.buf, Box::new(on_close))
    }

    /// Returns the next column writer, if available, using the factory function;
    /// otherwise returns `None`.
    pub(crate) fn next_column_with_factory<'b, F, C>(&'b mut self, factory: F) -> Result<Option<C>>
    where
        F: FnOnce(
            ColumnDescPtr,
            WriterPropertiesPtr,
            Box<dyn PageWriter + 'b>,
            OnCloseColumnChunk<'b>,
        ) -> Result<C>,
    {
        self.assert_previous_writer_closed()?;

        let encryptor_context = self.get_page_encryptor_context();

        Ok(match self.next_column_desc() {
            Some(column) => {
                let props = self.props.clone();
                let (buf, on_close) = self.get_on_close();

                let page_writer = SerializedPageWriter::new(buf);
                let page_writer =
                    Self::set_page_writer_encryptor(&column, encryptor_context, page_writer)?;

                Some(factory(
                    column,
                    props,
                    Box::new(page_writer),
                    Box::new(on_close),
                )?)
            }
            None => None,
        })
    }

    /// Returns the next column writer, if available; otherwise returns `None`.
    /// Returns `Err` on IO or Thrift errors, or if the row group writer has
    /// already been closed.
    pub fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>> {
        self.next_column_with_factory(|descr, props, page_writer, on_close| {
            let column_writer = get_column_writer(descr, props, page_writer);
            Ok(SerializedColumnWriter::new(column_writer, Some(on_close)))
        })
    }

    /// Append an encoded column chunk from `reader` directly to the underlying
    /// writer (see the example sketch below).
    ///
    /// This method can be used for efficiently concatenating or projecting
    /// Parquet data, or encoding Parquet data to temporary in-memory buffers.
    ///
    /// Arguments:
    /// - `reader`: a [`ChunkReader`] containing the encoded column data
    /// - `close`: the [`ColumnCloseResult`] metadata returned from closing
    ///   the column writer that wrote the data in `reader`.
    ///
    /// See Also:
    /// 1. [`get_column_writer`] for creating writers that can encode data.
    /// 2. [`Self::next_column`] for writing data that isn't already encoded
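    ///
    /// # Example (sketch)
    ///
    /// A hedged sketch of splicing an already-encoded column chunk. `encoded`
    /// (a `Bytes` buffer written by another column writer) and `close` (the
    /// [`ColumnCloseResult`] returned when that writer was closed) are both
    /// assumed to exist; they are not produced by this snippet.
    ///
    /// ```ignore
    /// // `Bytes` implements `ChunkReader`, so an in-memory buffer can be spliced
    /// // directly into the row group without re-encoding the pages
    /// row_group_writer.append_column(&encoded, close)?;
    /// ```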
    pub fn append_column<R: ChunkReader>(
        &mut self,
        reader: &R,
        mut close: ColumnCloseResult,
    ) -> Result<()> {
        self.assert_previous_writer_closed()?;
        let desc = self
            .next_column_desc()
            .ok_or_else(|| general_err!("exhausted columns in SerializedRowGroupWriter"))?;

        let metadata = close.metadata;

        if metadata.column_descr() != desc.as_ref() {
            return Err(general_err!(
                "column descriptor mismatch, expected {:?} got {:?}",
                desc,
                metadata.column_descr()
            ));
        }

        let src_dictionary_offset = metadata.dictionary_page_offset();
        let src_data_offset = metadata.data_page_offset();
        let src_offset = src_dictionary_offset.unwrap_or(src_data_offset);
        let src_length = metadata.compressed_size();

        let write_offset = self.buf.bytes_written();
        let mut read = reader.get_read(src_offset as _)?.take(src_length as _);
        let write_length = std::io::copy(&mut read, &mut self.buf)?;
        if src_length as u64 != write_length {
            return Err(general_err!(
                "Failed to splice column data, expected {src_length} got {write_length}"
            ));
        }

        let map_offset = |x| x - src_offset + write_offset as i64;
        let mut builder = ColumnChunkMetaData::builder(metadata.column_descr_ptr())
            .set_compression(metadata.compression())
            .set_encodings(metadata.encodings().clone())
            .set_total_compressed_size(metadata.compressed_size())
            .set_total_uncompressed_size(metadata.uncompressed_size())
            .set_num_values(metadata.num_values())
            .set_data_page_offset(map_offset(src_data_offset))
            .set_dictionary_page_offset(src_dictionary_offset.map(map_offset))
            .set_unencoded_byte_array_data_bytes(metadata.unencoded_byte_array_data_bytes());

        if let Some(rep_hist) = metadata.repetition_level_histogram() {
            builder = builder.set_repetition_level_histogram(Some(rep_hist.clone()))
        }
        if let Some(def_hist) = metadata.definition_level_histogram() {
            builder = builder.set_definition_level_histogram(Some(def_hist.clone()))
        }
        if let Some(statistics) = metadata.statistics() {
            builder = builder.set_statistics(statistics.clone())
        }
        if let Some(page_encoding_stats) = metadata.page_encoding_stats() {
            builder = builder.set_page_encoding_stats(page_encoding_stats.clone())
        }
        builder = self.set_column_crypto_metadata(builder, &metadata);
        close.metadata = builder.build()?;

        if let Some(offsets) = close.offset_index.as_mut() {
            for location in &mut offsets.page_locations {
                location.offset = map_offset(location.offset)
            }
        }

        let (_, on_close) = self.get_on_close();
        on_close(close)
    }

    /// Closes this row group writer and returns row group metadata.
    pub fn close(mut self) -> Result<RowGroupMetaDataPtr> {
        if self.row_group_metadata.is_none() {
            self.assert_previous_writer_closed()?;

            let column_chunks = std::mem::take(&mut self.column_chunks);
            let row_group_metadata = RowGroupMetaData::builder(self.descr.clone())
                .set_column_metadata(column_chunks)
                .set_total_byte_size(self.total_uncompressed_bytes)
                .set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
                .set_sorting_columns(self.props.sorting_columns().cloned())
                .set_ordinal(self.row_group_index)
                .set_file_offset(self.file_offset)
                .build()?;

            self.row_group_metadata = Some(Arc::new(row_group_metadata.clone()));

            if let Some(on_close) = self.on_close.take() {
                on_close(
                    self.buf,
                    row_group_metadata,
                    self.bloom_filters,
                    self.column_indexes,
                    self.offset_indexes,
                )?
            }
        }

        let metadata = self.row_group_metadata.as_ref().unwrap().clone();
        Ok(metadata)
    }

    /// Set the column crypto metadata for a column chunk
    #[cfg(feature = "encryption")]
    fn set_column_crypto_metadata(
        &self,
        builder: ColumnChunkMetaDataBuilder,
        metadata: &ColumnChunkMetaData,
    ) -> ColumnChunkMetaDataBuilder {
        if let Some(file_encryptor) = self.file_encryptor.as_ref() {
            builder.set_column_crypto_metadata(get_column_crypto_metadata(
                file_encryptor.properties(),
                &metadata.column_descr_ptr(),
            ))
        } else {
            builder
        }
    }

    /// Get context required to create a [`PageEncryptor`] for a column
    #[cfg(feature = "encryption")]
    fn get_page_encryptor_context(&self) -> PageEncryptorContext {
        PageEncryptorContext {
            file_encryptor: self.file_encryptor.clone(),
            row_group_index: self.row_group_index as usize,
            column_index: self.column_index,
        }
    }

    /// Set the [`PageEncryptor`] on a page writer if a column is encrypted
    #[cfg(feature = "encryption")]
    fn set_page_writer_encryptor<'b>(
        column: &ColumnDescPtr,
        context: PageEncryptorContext,
        page_writer: SerializedPageWriter<'b, W>,
    ) -> Result<SerializedPageWriter<'b, W>> {
        let page_encryptor = PageEncryptor::create_if_column_encrypted(
            &context.file_encryptor,
            context.row_group_index,
            context.column_index,
            &column.path().string(),
        )?;

        Ok(page_writer.with_page_encryptor(page_encryptor))
    }

    /// No-op implementation of setting the column crypto metadata for a column chunk
    #[cfg(not(feature = "encryption"))]
    fn set_column_crypto_metadata(
        &self,
        builder: ColumnChunkMetaDataBuilder,
        _metadata: &ColumnChunkMetaData,
    ) -> ColumnChunkMetaDataBuilder {
        builder
    }

    #[cfg(not(feature = "encryption"))]
    fn get_page_encryptor_context(&self) -> PageEncryptorContext {
        PageEncryptorContext {}
    }

    /// No-op implementation of setting a [`PageEncryptor`] for when encryption is disabled
    #[cfg(not(feature = "encryption"))]
    fn set_page_writer_encryptor<'b>(
        _column: &ColumnDescPtr,
        _context: PageEncryptorContext,
        page_writer: SerializedPageWriter<'b, W>,
    ) -> Result<SerializedPageWriter<'b, W>> {
        Ok(page_writer)
    }

    #[inline]
    fn assert_previous_writer_closed(&self) -> Result<()> {
        if self.column_index != self.column_chunks.len() {
            Err(general_err!("Previous column writer was not closed"))
        } else {
            Ok(())
        }
    }
}

/// Context required to create a [`PageEncryptor`] for a column
#[cfg(feature = "encryption")]
struct PageEncryptorContext {
    file_encryptor: Option<Arc<FileEncryptor>>,
    row_group_index: usize,
    column_index: usize,
}

#[cfg(not(feature = "encryption"))]
struct PageEncryptorContext {}

/// A wrapper around a [`ColumnWriter`] that invokes a callback on [`Self::close`]
pub struct SerializedColumnWriter<'a> {
    inner: ColumnWriter<'a>,
    on_close: Option<OnCloseColumnChunk<'a>>,
}

impl<'a> SerializedColumnWriter<'a> {
    /// Create a new [`SerializedColumnWriter`] from a [`ColumnWriter`] and an
    /// optional callback to be invoked on [`Self::close`]
    pub fn new(inner: ColumnWriter<'a>, on_close: Option<OnCloseColumnChunk<'a>>) -> Self {
        Self { inner, on_close }
    }

    /// Returns a reference to an untyped [`ColumnWriter`]
    pub fn untyped(&mut self) -> &mut ColumnWriter<'a> {
        &mut self.inner
    }

    /// Returns a reference to a typed [`ColumnWriterImpl`]
    pub fn typed<T: DataType>(&mut self) -> &mut ColumnWriterImpl<'a, T> {
        get_typed_column_writer_mut(&mut self.inner)
    }

    /// Close this [`SerializedColumnWriter`]
    pub fn close(mut self) -> Result<()> {
        let r = self.inner.close()?;
        if let Some(on_close) = self.on_close.take() {
            on_close(r)?
        }

        Ok(())
    }
}

/// A serialized implementation for Parquet [`PageWriter`].
/// Writes and serializes pages and metadata into an output stream.
///
/// `SerializedPageWriter` should not be used after calling `close()`.
pub struct SerializedPageWriter<'a, W: Write> {
    sink: &'a mut TrackedWrite<W>,
    #[cfg(feature = "encryption")]
    page_encryptor: Option<PageEncryptor>,
}

impl<'a, W: Write> SerializedPageWriter<'a, W> {
    /// Creates new page writer.
    pub fn new(sink: &'a mut TrackedWrite<W>) -> Self {
        Self {
            sink,
            #[cfg(feature = "encryption")]
            page_encryptor: None,
        }
    }

    /// Serializes page header into Thrift.
    /// Returns the number of bytes that have been written into the sink.
    #[inline]
    fn serialize_page_header(&mut self, header: parquet::PageHeader) -> Result<usize> {
        let start_pos = self.sink.bytes_written();
        match self.page_encryptor_and_sink_mut() {
            Some((page_encryptor, sink)) => {
                page_encryptor.encrypt_page_header(&header, sink)?;
            }
            None => {
                let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
                header.write_to_out_protocol(&mut protocol)?;
            }
        }
        Ok(self.sink.bytes_written() - start_pos)
    }
}

#[cfg(feature = "encryption")]
impl<'a, W: Write> SerializedPageWriter<'a, W> {
    /// Set the encryptor to use to encrypt page data
    fn with_page_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
        self.page_encryptor = page_encryptor;
        self
    }

    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        self.page_encryptor.as_mut()
    }

    fn page_encryptor_and_sink_mut(
        &mut self,
    ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
        self.page_encryptor.as_mut().map(|pe| (pe, &mut self.sink))
    }
}

#[cfg(not(feature = "encryption"))]
impl<'a, W: Write> SerializedPageWriter<'a, W> {
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        None
    }

    fn page_encryptor_and_sink_mut(
        &mut self,
    ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
        None
    }
}

impl<W: Write + Send> PageWriter for SerializedPageWriter<'_, W> {
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
        let page = match self.page_encryptor_mut() {
            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
            None => page,
        };

        let page_type = page.page_type();
        let start_pos = self.sink.bytes_written() as u64;

        let page_header = page.to_thrift_header()?;
        let header_size = self.serialize_page_header(page_header)?;

        self.sink.write_all(page.data())?;

        let mut spec = PageWriteSpec::new();
        spec.page_type = page_type;
        spec.uncompressed_size = page.uncompressed_size() + header_size;
        spec.compressed_size = page.compressed_size() + header_size;
        spec.offset = start_pos;
        spec.bytes_written = self.sink.bytes_written() as u64 - start_pos;
        spec.num_values = page.num_values();

        if let Some(page_encryptor) = self.page_encryptor_mut() {
            if page.compressed_page().is_data_page() {
                page_encryptor.increment_page();
            }
        }
        Ok(spec)
    }

    fn close(&mut self) -> Result<()> {
        self.sink.flush()?;
        Ok(())
    }
}

/// Get the magic bytes at the start and end of the file that identify this
/// as a Parquet file.
#[cfg(feature = "encryption")]
pub(crate) fn get_file_magic(
    file_encryption_properties: Option<&FileEncryptionProperties>,
) -> &'static [u8; 4] {
    match file_encryption_properties.as_ref() {
        Some(encryption_properties) if encryption_properties.encrypt_footer() => {
            &PARQUET_MAGIC_ENCR_FOOTER
        }
        _ => &PARQUET_MAGIC,
    }
}

#[cfg(not(feature = "encryption"))]
pub(crate) fn get_file_magic() -> &'static [u8; 4] {
    &PARQUET_MAGIC
}

#[cfg(test)]
mod tests {
    use super::*;

    #[cfg(feature = "arrow")]
    use arrow_array::RecordBatchReader;
    use bytes::Bytes;
    use std::fs::File;

    #[cfg(feature = "arrow")]
    use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    #[cfg(feature = "arrow")]
    use crate::arrow::ArrowWriter;
    use crate::basic::{
        ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, Repetition, SortOrder, Type,
    };
    use crate::column::page::{Page, PageReader};
    use crate::column::reader::get_typed_column_reader;
    use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
    use crate::data_type::{BoolType, ByteArrayType, Int32Type};
    use crate::file::page_index::index::Index;
    use crate::file::properties::EnabledStatistics;
    use crate::file::serialized_reader::ReadOptionsBuilder;
    use crate::file::{
        properties::{ReaderProperties, WriterProperties, WriterVersion},
        reader::{FileReader, SerializedFileReader, SerializedPageReader},
        statistics::{from_thrift, to_thrift, Statistics},
    };
    use crate::format::SortingColumn;
    use crate::record::{Row, RowAccessor};
    use crate::schema::parser::parse_message_type;
    use crate::schema::types;
    use crate::schema::types::{ColumnDescriptor, ColumnPath};
    use crate::util::test_common::rand_gen::RandGen;

    #[test]
    fn test_row_group_writer_error_not_all_columns_written() {
        let file = tempfile::tempfile().unwrap();
        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![Arc::new(
                    types::Type::primitive_type_builder("col1", Type::INT32)
                        .build()
                        .unwrap(),
                )])
                .build()
                .unwrap(),
        );
        let props = Default::default();
        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
        let row_group_writer = writer.next_row_group().unwrap();
        let res = row_group_writer.close();
        assert!(res.is_err());
        if let Err(err) = res {
            assert_eq!(
                format!("{err}"),
                "Parquet error: Column length mismatch: 1 != 0"
            );
        }
    }

    #[test]
    fn test_row_group_writer_num_records_mismatch() {
        let file = tempfile::tempfile().unwrap();
        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![
                    Arc::new(
                        types::Type::primitive_type_builder("col1", Type::INT32)
                            .with_repetition(Repetition::REQUIRED)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        types::Type::primitive_type_builder("col2", Type::INT32)
                            .with_repetition(Repetition::REQUIRED)
                            .build()
                            .unwrap(),
                    ),
                ])
                .build()
                .unwrap(),
        );
        let props = Default::default();
        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 2, 3], None, None)
            .unwrap();
        col_writer.close().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 2], None, None)
            .unwrap();

        let err = col_writer.close().unwrap_err();
        assert_eq!(
            err.to_string(),
            "Parquet error: Incorrect number of rows, expected 3 != 2 rows"
        );
    }

    #[test]
    fn test_file_writer_empty_file() {
        let file = tempfile::tempfile().unwrap();

        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![Arc::new(
                    types::Type::primitive_type_builder("col1", Type::INT32)
                        .build()
                        .unwrap(),
                )])
                .build()
                .unwrap(),
        );
        let props = Default::default();
        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file).unwrap();
        assert_eq!(reader.get_row_iter(None).unwrap().count(), 0);
    }

    #[test]
    fn test_file_writer_column_orders_populated() {
        let file = tempfile::tempfile().unwrap();

        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![
                    Arc::new(
                        types::Type::primitive_type_builder("col1", Type::INT32)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        types::Type::primitive_type_builder("col2", Type::FIXED_LEN_BYTE_ARRAY)
                            .with_converted_type(ConvertedType::INTERVAL)
                            .with_length(12)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        types::Type::group_type_builder("nested")
                            .with_repetition(Repetition::REQUIRED)
                            .with_fields(vec![
                                Arc::new(
                                    types::Type::primitive_type_builder(
                                        "col3",
                                        Type::FIXED_LEN_BYTE_ARRAY,
                                    )
                                    .with_logical_type(Some(LogicalType::Float16))
                                    .with_length(2)
                                    .build()
                                    .unwrap(),
                                ),
                                Arc::new(
                                    types::Type::primitive_type_builder("col4", Type::BYTE_ARRAY)
                                        .with_logical_type(Some(LogicalType::String))
                                        .build()
                                        .unwrap(),
                                ),
                            ])
                            .build()
                            .unwrap(),
                    ),
                ])
                .build()
                .unwrap(),
        );

        let props = Default::default();
        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file).unwrap();

        // only leaves
        let expected = vec![
            // INT32
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
            // INTERVAL
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED),
            // Float16
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
            // String
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED),
        ];
        let actual = reader.metadata().file_metadata().column_orders();

        assert!(actual.is_some());
        let actual = actual.unwrap();
        assert_eq!(*actual, expected);
    }

    #[test]
    fn test_file_writer_with_metadata() {
        let file = tempfile::tempfile().unwrap();

        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![Arc::new(
                    types::Type::primitive_type_builder("col1", Type::INT32)
                        .build()
                        .unwrap(),
                )])
                .build()
                .unwrap(),
        );
        let props = Arc::new(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![KeyValue::new(
                    "key".to_string(),
                    "value".to_string(),
                )]))
                .build(),
        );
        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file).unwrap();
        assert_eq!(
            reader
                .metadata()
                .file_metadata()
                .key_value_metadata()
                .to_owned()
                .unwrap()
                .len(),
            1
        );
    }

    #[test]
    fn test_file_writer_v2_with_metadata() {
        let file = tempfile::tempfile().unwrap();
        let field_logical_type = Some(LogicalType::Integer {
            bit_width: 8,
            is_signed: false,
        });
        let field = Arc::new(
            types::Type::primitive_type_builder("col1", Type::INT32)
                .with_logical_type(field_logical_type.clone())
                .with_converted_type(field_logical_type.into())
                .build()
                .unwrap(),
        );
        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![field.clone()])
                .build()
                .unwrap(),
        );
        let props = Arc::new(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![KeyValue::new(
                    "key".to_string(),
                    "value".to_string(),
                )]))
                .set_writer_version(WriterVersion::PARQUET_2_0)
                .build(),
        );
        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file).unwrap();

        assert_eq!(
            reader
                .metadata()
                .file_metadata()
                .key_value_metadata()
                .to_owned()
                .unwrap()
                .len(),
            1
        );

        // ARROW-11803: Test that the converted and logical types have been populated
        let fields = reader.metadata().file_metadata().schema().get_fields();
        assert_eq!(fields.len(), 1);
        assert_eq!(fields[0], field);
    }

    #[test]
    fn test_file_writer_with_sorting_columns_metadata() {
        let file = tempfile::tempfile().unwrap();

        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![
                    Arc::new(
                        types::Type::primitive_type_builder("col1", Type::INT32)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        types::Type::primitive_type_builder("col2", Type::INT32)
                            .build()
                            .unwrap(),
                    ),
                ])
                .build()
                .unwrap(),
        );
        let expected_result = Some(vec![SortingColumn {
            column_idx: 0,
            descending: false,
            nulls_first: true,
        }]);
        let props = Arc::new(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![KeyValue::new(
                    "key".to_string(),
                    "value".to_string(),
                )]))
                .set_sorting_columns(expected_result.clone())
                .build(),
        );
        let mut writer =
            SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().expect("get row group writer");

        let col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer.close().unwrap();

        let col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer.close().unwrap();

        row_group_writer.close().unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file).unwrap();
        let result: Vec<Option<&Vec<SortingColumn>>> = reader
            .metadata()
            .row_groups()
            .iter()
            .map(|f| f.sorting_columns())
            .collect();
        // validate that the sorting columns read back match the ones written above
        assert_eq!(expected_result.as_ref(), result[0]);
    }

1373    #[test]
1374    fn test_file_writer_empty_row_groups() {
1375        let file = tempfile::tempfile().unwrap();
1376        test_file_roundtrip(file, vec![]);
1377    }
1378
1379    #[test]
1380    fn test_file_writer_single_row_group() {
1381        let file = tempfile::tempfile().unwrap();
1382        test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
1383    }
1384
1385    #[test]
1386    fn test_file_writer_multiple_row_groups() {
1387        let file = tempfile::tempfile().unwrap();
1388        test_file_roundtrip(
1389            file,
1390            vec![
1391                vec![1, 2, 3, 4, 5],
1392                vec![1, 2, 3],
1393                vec![1],
1394                vec![1, 2, 3, 4, 5, 6],
1395            ],
1396        );
1397    }
1398
1399    #[test]
1400    fn test_file_writer_multiple_large_row_groups() {
1401        let file = tempfile::tempfile().unwrap();
1402        test_file_roundtrip(
1403            file,
1404            vec![vec![123; 1024], vec![124; 1000], vec![125; 15], vec![]],
1405        );
1406    }
1407
1408    #[test]
1409    fn test_page_writer_data_pages() {
1410        let pages = vec![
1411            Page::DataPage {
1412                buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
1413                num_values: 10,
1414                encoding: Encoding::DELTA_BINARY_PACKED,
1415                def_level_encoding: Encoding::RLE,
1416                rep_level_encoding: Encoding::RLE,
1417                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1418            },
1419            Page::DataPageV2 {
1420                buf: Bytes::from(vec![4; 128]),
1421                num_values: 10,
1422                encoding: Encoding::DELTA_BINARY_PACKED,
1423                num_nulls: 2,
1424                num_rows: 12,
1425                def_levels_byte_len: 24,
1426                rep_levels_byte_len: 32,
1427                is_compressed: false,
1428                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1429            },
1430        ];
1431
1432        test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
1433        test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
1434    }
1435
1436    #[test]
1437    fn test_page_writer_dict_pages() {
1438        let pages = vec![
1439            Page::DictionaryPage {
1440                buf: Bytes::from(vec![1, 2, 3, 4, 5]),
1441                num_values: 5,
1442                encoding: Encoding::RLE_DICTIONARY,
1443                is_sorted: false,
1444            },
1445            Page::DataPage {
1446                buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
1447                num_values: 10,
1448                encoding: Encoding::DELTA_BINARY_PACKED,
1449                def_level_encoding: Encoding::RLE,
1450                rep_level_encoding: Encoding::RLE,
1451                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1452            },
1453            Page::DataPageV2 {
1454                buf: Bytes::from(vec![4; 128]),
1455                num_values: 10,
1456                encoding: Encoding::DELTA_BINARY_PACKED,
1457                num_nulls: 2,
1458                num_rows: 12,
1459                def_levels_byte_len: 24,
1460                rep_levels_byte_len: 32,
1461                is_compressed: false,
1462                statistics: None,
1463            },
1464        ];
1465
1466        test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
1467        test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
1468    }
1469
1470    /// Tests writing and reading pages.
1471    /// The physical type is used only for statistics and must match the type of any
1472    /// statistics defined in the pages.
1473    fn test_page_roundtrip(pages: &[Page], codec: Compression, physical_type: Type) {
1474        let mut compressed_pages = vec![];
1475        let mut total_num_values = 0i64;
1476        let codec_options = CodecOptionsBuilder::default()
1477            .set_backward_compatible_lz4(false)
1478            .build();
1479        let mut compressor = create_codec(codec, &codec_options).unwrap();
1480
1481        for page in pages {
1482            let uncompressed_len = page.buffer().len();
1483
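            // Rebuild each page with its buffer compressed; statistics are round-tripped
            // through their thrift representation along the way.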
1484            let compressed_page = match *page {
1485                Page::DataPage {
1486                    ref buf,
1487                    num_values,
1488                    encoding,
1489                    def_level_encoding,
1490                    rep_level_encoding,
1491                    ref statistics,
1492                } => {
1493                    total_num_values += num_values as i64;
1494                    let output_buf = compress_helper(compressor.as_mut(), buf);
1495
1496                    Page::DataPage {
1497                        buf: Bytes::from(output_buf),
1498                        num_values,
1499                        encoding,
1500                        def_level_encoding,
1501                        rep_level_encoding,
1502                        statistics: from_thrift(physical_type, to_thrift(statistics.as_ref()))
1503                            .unwrap(),
1504                    }
1505                }
1506                Page::DataPageV2 {
1507                    ref buf,
1508                    num_values,
1509                    encoding,
1510                    num_nulls,
1511                    num_rows,
1512                    def_levels_byte_len,
1513                    rep_levels_byte_len,
1514                    ref statistics,
1515                    ..
1516                } => {
1517                    total_num_values += num_values as i64;
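                    // For V2 data pages the repetition and definition levels are stored
                    // uncompressed at the front of the buffer, so only the bytes after
                    // them are compressed.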
1518                    let offset = (def_levels_byte_len + rep_levels_byte_len) as usize;
1519                    let cmp_buf = compress_helper(compressor.as_mut(), &buf[offset..]);
1520                    let mut output_buf = Vec::from(&buf[..offset]);
1521                    output_buf.extend_from_slice(&cmp_buf[..]);
1522
1523                    Page::DataPageV2 {
1524                        buf: Bytes::from(output_buf),
1525                        num_values,
1526                        encoding,
1527                        num_nulls,
1528                        num_rows,
1529                        def_levels_byte_len,
1530                        rep_levels_byte_len,
1531                        is_compressed: compressor.is_some(),
1532                        statistics: from_thrift(physical_type, to_thrift(statistics.as_ref()))
1533                            .unwrap(),
1534                    }
1535                }
1536                Page::DictionaryPage {
1537                    ref buf,
1538                    num_values,
1539                    encoding,
1540                    is_sorted,
1541                } => {
1542                    let output_buf = compress_helper(compressor.as_mut(), buf);
1543
1544                    Page::DictionaryPage {
1545                        buf: Bytes::from(output_buf),
1546                        num_values,
1547                        encoding,
1548                        is_sorted,
1549                    }
1550                }
1551            };
1552
1553            let compressed_page = CompressedPage::new(compressed_page, uncompressed_len);
1554            compressed_pages.push(compressed_page);
1555        }
1556
1557        let mut buffer: Vec<u8> = vec![];
1558        let mut result_pages: Vec<Page> = vec![];
1559        {
1560            let mut writer = TrackedWrite::new(&mut buffer);
1561            let mut page_writer = SerializedPageWriter::new(&mut writer);
1562
1563            for page in compressed_pages {
1564                page_writer.write_page(page).unwrap();
1565            }
1566            page_writer.close().unwrap();
1567        }
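        // Read the pages back with a SerializedPageReader built from minimal column chunk
        // metadata (codec, total compressed size, and number of values).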
1568        {
1569            let reader = bytes::Bytes::from(buffer);
1570
1571            let t = types::Type::primitive_type_builder("t", physical_type)
1572                .build()
1573                .unwrap();
1574
1575            let desc = ColumnDescriptor::new(Arc::new(t), 0, 0, ColumnPath::new(vec![]));
1576            let meta = ColumnChunkMetaData::builder(Arc::new(desc))
1577                .set_compression(codec)
1578                .set_total_compressed_size(reader.len() as i64)
1579                .set_num_values(total_num_values)
1580                .build()
1581                .unwrap();
1582
1583            let props = ReaderProperties::builder()
1584                .set_backward_compatible_lz4(false)
1585                .build();
1586            let mut page_reader = SerializedPageReader::new_with_properties(
1587                Arc::new(reader),
1588                &meta,
1589                total_num_values as usize,
1590                None,
1591                Arc::new(props),
1592            )
1593            .unwrap();
1594
1595            while let Some(page) = page_reader.get_next_page().unwrap() {
1596                result_pages.push(page);
1597            }
1598        }
1599
1600        assert_eq!(result_pages.len(), pages.len());
1601        for i in 0..result_pages.len() {
1602            assert_page(&result_pages[i], &pages[i]);
1603        }
1604    }
1605
1606    /// Helper function to compress a slice
1607    fn compress_helper(compressor: Option<&mut Box<dyn Codec>>, data: &[u8]) -> Vec<u8> {
1608        let mut output_buf = vec![];
1609        if let Some(cmpr) = compressor {
1610            cmpr.compress(data, &mut output_buf).unwrap();
1611        } else {
1612            output_buf.extend_from_slice(data);
1613        }
1614        output_buf
1615    }
1616
1617    /// Asserts that two pages match on type, buffer, value count, encoding, and statistics.
1618    fn assert_page(left: &Page, right: &Page) {
1619        assert_eq!(left.page_type(), right.page_type());
1620        assert_eq!(&left.buffer(), &right.buffer());
1621        assert_eq!(left.num_values(), right.num_values());
1622        assert_eq!(left.encoding(), right.encoding());
1623        assert_eq!(to_thrift(left.statistics()), to_thrift(right.statistics()));
1624    }
1625
1626    /// Tests roundtrip of i32 data written using `W` and read using `R`
1627    fn test_roundtrip_i32<W, R>(
1628        file: W,
1629        data: Vec<Vec<i32>>,
1630        compression: Compression,
1631    ) -> crate::format::FileMetaData
1632    where
1633        W: Write + Send,
1634        R: ChunkReader + From<W> + 'static,
1635    {
1636        test_roundtrip::<W, R, Int32Type, _>(file, data, |r| r.get_int(0).unwrap(), compression)
1637    }
1638
1639    /// Tests roundtrip of data of type `D` written using `W` and read using `R`
1640    /// and the provided `value` extraction function
1641    fn test_roundtrip<W, R, D, F>(
1642        mut file: W,
1643        data: Vec<Vec<D::T>>,
1644        value: F,
1645        compression: Compression,
1646    ) -> crate::format::FileMetaData
1647    where
1648        W: Write + Send,
1649        R: ChunkReader + From<W> + 'static,
1650        D: DataType,
1651        F: Fn(Row) -> D::T,
1652    {
1653        let schema = Arc::new(
1654            types::Type::group_type_builder("schema")
1655                .with_fields(vec![Arc::new(
1656                    types::Type::primitive_type_builder("col1", D::get_physical_type())
1657                        .with_repetition(Repetition::REQUIRED)
1658                        .build()
1659                        .unwrap(),
1660                )])
1661                .build()
1662                .unwrap(),
1663        );
1664        let props = Arc::new(
1665            WriterProperties::builder()
1666                .set_compression(compression)
1667                .build(),
1668        );
1669        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
1670        let mut rows: i64 = 0;
1671
1672        for (idx, subset) in data.iter().enumerate() {
1673            let row_group_file_offset = file_writer.buf.bytes_written();
1674            let mut row_group_writer = file_writer.next_row_group().unwrap();
1675            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
1676                rows += writer
1677                    .typed::<D>()
1678                    .write_batch(&subset[..], None, None)
1679                    .unwrap() as i64;
1680                writer.close().unwrap();
1681            }
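            // Closing the row group should expose it via flushed_row_groups() along with
            // its ordinal and starting file offset.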
1682            let last_group = row_group_writer.close().unwrap();
1683            let flushed = file_writer.flushed_row_groups();
1684            assert_eq!(flushed.len(), idx + 1);
1685            assert_eq!(Some(idx as i16), last_group.ordinal());
1686            assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset());
1687            assert_eq!(&flushed[idx], last_group.as_ref());
1688        }
1689        let file_metadata = file_writer.close().unwrap();
1690
1691        let reader = SerializedFileReader::new(R::from(file)).unwrap();
1692        assert_eq!(reader.num_row_groups(), data.len());
1693        assert_eq!(
1694            reader.metadata().file_metadata().num_rows(),
1695            rows,
1696            "row count in metadata not equal to number of rows written"
1697        );
1698        for (i, item) in data.iter().enumerate().take(reader.num_row_groups()) {
1699            let row_group_reader = reader.get_row_group(i).unwrap();
1700            let iter = row_group_reader.get_row_iter(None).unwrap();
1701            let res: Vec<_> = iter.map(|row| row.unwrap()).map(&value).collect();
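            // total_byte_size reports uncompressed bytes, so it should equal the sum of the
            // columns' uncompressed sizes.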
1702            let row_group_size = row_group_reader.metadata().total_byte_size();
1703            let uncompressed_size: i64 = row_group_reader
1704                .metadata()
1705                .columns()
1706                .iter()
1707                .map(|v| v.uncompressed_size())
1708                .sum();
1709            assert_eq!(row_group_size, uncompressed_size);
1710            assert_eq!(res, *item);
1711        }
1712        file_metadata
1713    }
1714
1715    /// File write-read roundtrip.
1716    /// `data` consists of arrays of values for each row group.
1717    fn test_file_roundtrip(file: File, data: Vec<Vec<i32>>) -> crate::format::FileMetaData {
1718        test_roundtrip_i32::<File, File>(file, data, Compression::UNCOMPRESSED)
1719    }
1720
1721    #[test]
1722    fn test_bytes_writer_empty_row_groups() {
1723        test_bytes_roundtrip(vec![], Compression::UNCOMPRESSED);
1724    }
1725
1726    #[test]
1727    fn test_bytes_writer_single_row_group() {
1728        test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::UNCOMPRESSED);
1729    }
1730
1731    #[test]
1732    fn test_bytes_writer_multiple_row_groups() {
1733        test_bytes_roundtrip(
1734            vec![
1735                vec![1, 2, 3, 4, 5],
1736                vec![1, 2, 3],
1737                vec![1],
1738                vec![1, 2, 3, 4, 5, 6],
1739            ],
1740            Compression::UNCOMPRESSED,
1741        );
1742    }
1743
1744    #[test]
1745    fn test_bytes_writer_single_row_group_compressed() {
1746        test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::SNAPPY);
1747    }
1748
1749    #[test]
1750    fn test_bytes_writer_multiple_row_groups_compressed() {
1751        test_bytes_roundtrip(
1752            vec![
1753                vec![1, 2, 3, 4, 5],
1754                vec![1, 2, 3],
1755                vec![1],
1756                vec![1, 2, 3, 4, 5, 6],
1757            ],
1758            Compression::SNAPPY,
1759        );
1760    }
1761
1762    fn test_bytes_roundtrip(data: Vec<Vec<i32>>, compression: Compression) {
1763        test_roundtrip_i32::<Vec<u8>, Bytes>(Vec::with_capacity(1024), data, compression);
1764    }
1765
1766    #[test]
1767    fn test_boolean_roundtrip() {
1768        let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
1769        test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
1770            Vec::with_capacity(1024),
1771            vec![my_bool_values],
1772            |r| r.get_bool(0).unwrap(),
1773            Compression::UNCOMPRESSED,
1774        );
1775    }
1776
1777    #[test]
1778    fn test_boolean_compressed_roundtrip() {
1779        let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
1780        test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
1781            Vec::with_capacity(1024),
1782            vec![my_bool_values],
1783            |r| r.get_bool(0).unwrap(),
1784            Compression::SNAPPY,
1785        );
1786    }
1787
1788    #[test]
1789    fn test_column_offset_index_file() {
1790        let file = tempfile::tempfile().unwrap();
1791        let file_metadata = test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
1792        file_metadata.row_groups.iter().for_each(|row_group| {
1793            row_group.columns.iter().for_each(|column_chunk| {
1794                assert_ne!(None, column_chunk.column_index_offset);
1795                assert_ne!(None, column_chunk.column_index_length);
1796
1797                assert_ne!(None, column_chunk.offset_index_offset);
1798                assert_ne!(None, column_chunk.offset_index_length);
1799            })
1800        });
1801    }
1802
1803    fn test_kv_metadata(initial_kv: Option<Vec<KeyValue>>, final_kv: Option<Vec<KeyValue>>) {
1804        let schema = Arc::new(
1805            types::Type::group_type_builder("schema")
1806                .with_fields(vec![Arc::new(
1807                    types::Type::primitive_type_builder("col1", Type::INT32)
1808                        .with_repetition(Repetition::REQUIRED)
1809                        .build()
1810                        .unwrap(),
1811                )])
1812                .build()
1813                .unwrap(),
1814        );
1815        let mut out = Vec::with_capacity(1024);
1816        let props = Arc::new(
1817            WriterProperties::builder()
1818                .set_key_value_metadata(initial_kv.clone())
1819                .build(),
1820        );
1821        let mut writer = SerializedFileWriter::new(&mut out, schema, props).unwrap();
1822        let mut row_group_writer = writer.next_row_group().unwrap();
1823        let column = row_group_writer.next_column().unwrap().unwrap();
1824        column.close().unwrap();
1825        row_group_writer.close().unwrap();
1826        if let Some(kvs) = &final_kv {
1827            for kv in kvs {
1828                writer.append_key_value_metadata(kv.clone())
1829            }
1830        }
1831        writer.close().unwrap();
1832
1833        let reader = SerializedFileReader::new(Bytes::from(out)).unwrap();
1834        let metadata = reader.metadata().file_metadata();
1835        let keys = metadata.key_value_metadata();
1836
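        // Metadata set via the writer properties and metadata appended with
        // append_key_value_metadata should be concatenated, initial entries first.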
1837        match (initial_kv, final_kv) {
1838            (Some(a), Some(b)) => {
1839                let keys = keys.unwrap();
1840                assert_eq!(keys.len(), a.len() + b.len());
1841                assert_eq!(&keys[..a.len()], a.as_slice());
1842                assert_eq!(&keys[a.len()..], b.as_slice());
1843            }
1844            (Some(v), None) => assert_eq!(keys.unwrap(), &v),
1845            (None, Some(v)) if !v.is_empty() => assert_eq!(keys.unwrap(), &v),
1846            _ => assert!(keys.is_none()),
1847        }
1848    }
1849
1850    #[test]
1851    fn test_append_metadata() {
1852        let kv1 = KeyValue::new("cupcakes".to_string(), "awesome".to_string());
1853        let kv2 = KeyValue::new("bingo".to_string(), "bongo".to_string());
1854
1855        test_kv_metadata(None, None);
1856        test_kv_metadata(Some(vec![kv1.clone()]), None);
1857        test_kv_metadata(None, Some(vec![kv2.clone()]));
1858        test_kv_metadata(Some(vec![kv1.clone()]), Some(vec![kv2.clone()]));
1859        test_kv_metadata(Some(vec![]), Some(vec![kv2]));
1860        test_kv_metadata(Some(vec![]), Some(vec![]));
1861        test_kv_metadata(Some(vec![kv1]), Some(vec![]));
1862        test_kv_metadata(None, Some(vec![]));
1863    }
1864
1865    #[test]
1866    fn test_backwards_compatible_statistics() {
1867        let message_type = "
1868            message test_schema {
1869                REQUIRED INT32 decimal1 (DECIMAL(8,2));
1870                REQUIRED INT32 i32 (INTEGER(32,true));
1871                REQUIRED INT32 u32 (INTEGER(32,false));
1872            }
1873        ";
1874
1875        let schema = Arc::new(parse_message_type(message_type).unwrap());
1876        let props = Default::default();
1877        let mut writer = SerializedFileWriter::new(vec![], schema, props).unwrap();
1878        let mut row_group_writer = writer.next_row_group().unwrap();
1879
1880        for _ in 0..3 {
1881            let mut writer = row_group_writer.next_column().unwrap().unwrap();
1882            writer
1883                .typed::<Int32Type>()
1884                .write_batch(&[1, 2, 3], None, None)
1885                .unwrap();
1886            writer.close().unwrap();
1887        }
1888        let metadata = row_group_writer.close().unwrap();
1889        writer.close().unwrap();
1890
1891        let thrift = metadata.to_thrift();
1892        let encoded_stats: Vec<_> = thrift
1893            .columns
1894            .into_iter()
1895            .map(|x| x.meta_data.unwrap().statistics.unwrap())
1896            .collect();
1897
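        // The deprecated min/max statistics fields are only written for columns with a
        // signed sort order, so the unsigned u32 column should only set min_value/max_value.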
1898        // decimal
1899        let s = &encoded_stats[0];
1900        assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1901        assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1902        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1903        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1904
1905        // i32
1906        let s = &encoded_stats[1];
1907        assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1908        assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1909        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1910        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1911
1912        // u32
1913        let s = &encoded_stats[2];
1914        assert_eq!(s.min.as_deref(), None);
1915        assert_eq!(s.max.as_deref(), None);
1916        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1917        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1918    }
1919
1920    #[test]
1921    fn test_spliced_write() {
1922        let message_type = "
1923            message test_schema {
1924                REQUIRED INT32 i32 (INTEGER(32,true));
1925                REQUIRED INT32 u32 (INTEGER(32,false));
1926            }
1927        ";
1928        let schema = Arc::new(parse_message_type(message_type).unwrap());
1929        let props = Arc::new(WriterProperties::builder().build());
1930
1931        let mut file = Vec::with_capacity(1024);
1932        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props.clone()).unwrap();
1933
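        // Encode each column into its own in-memory buffer so the finished column chunks
        // can later be spliced into a row group with append_column.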
1934        let columns = file_writer.descr.columns();
1935        let mut column_state: Vec<(_, Option<ColumnCloseResult>)> = columns
1936            .iter()
1937            .map(|_| (TrackedWrite::new(Vec::with_capacity(1024)), None))
1938            .collect();
1939
1940        let mut column_state_slice = column_state.as_mut_slice();
1941        let mut column_writers = Vec::with_capacity(columns.len());
1942        for c in columns {
1943            let ((buf, out), tail) = column_state_slice.split_first_mut().unwrap();
1944            column_state_slice = tail;
1945
1946            let page_writer = Box::new(SerializedPageWriter::new(buf));
1947            let col_writer = get_column_writer(c.clone(), props.clone(), page_writer);
1948            column_writers.push(SerializedColumnWriter::new(
1949                col_writer,
1950                Some(Box::new(|on_close| {
1951                    *out = Some(on_close);
1952                    Ok(())
1953                })),
1954            ));
1955        }
1956
1957        let column_data = [[1, 2, 3, 4], [7, 3, 7, 3]];
1958
1959        // Interleaved writing to the column writers
1960        for (writer, batch) in column_writers.iter_mut().zip(column_data) {
1961            let writer = writer.typed::<Int32Type>();
1962            writer.write_batch(&batch, None, None).unwrap();
1963        }
1964
1965        // Close the column writers
1966        for writer in column_writers {
1967            writer.close().unwrap()
1968        }
1969
1970        // Splice column data into a row group
1971        let mut row_group_writer = file_writer.next_row_group().unwrap();
1972        for (write, close) in column_state {
1973            let buf = Bytes::from(write.into_inner().unwrap());
1974            row_group_writer
1975                .append_column(&buf, close.unwrap())
1976                .unwrap();
1977        }
1978        row_group_writer.close().unwrap();
1979        file_writer.close().unwrap();
1980
1981        // Check data was written correctly
1982        let file = Bytes::from(file);
1983        let test_read = |reader: SerializedFileReader<Bytes>| {
1984            let row_group = reader.get_row_group(0).unwrap();
1985
1986            let mut out = Vec::with_capacity(4);
1987            let c1 = row_group.get_column_reader(0).unwrap();
1988            let mut c1 = get_typed_column_reader::<Int32Type>(c1);
1989            c1.read_records(4, None, None, &mut out).unwrap();
1990            assert_eq!(out, column_data[0]);
1991
1992            out.clear();
1993
1994            let c2 = row_group.get_column_reader(1).unwrap();
1995            let mut c2 = get_typed_column_reader::<Int32Type>(c2);
1996            c2.read_records(4, None, None, &mut out).unwrap();
1997            assert_eq!(out, column_data[1]);
1998        };
1999
2000        let reader = SerializedFileReader::new(file.clone()).unwrap();
2001        test_read(reader);
2002
2003        let options = ReadOptionsBuilder::new().with_page_index().build();
2004        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
2005        test_read(reader);
2006    }
2007
2008    #[test]
2009    fn test_disabled_statistics() {
2010        let message_type = "
2011            message test_schema {
2012                REQUIRED INT32 a;
2013                REQUIRED INT32 b;
2014            }
2015        ";
2016        let schema = Arc::new(parse_message_type(message_type).unwrap());
2017        let props = WriterProperties::builder()
2018            .set_statistics_enabled(EnabledStatistics::None)
2019            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
2020            .set_offset_index_disabled(true) // ignored because page statistics are enabled for column "a" above
2021            .build();
2022        let mut file = Vec::with_capacity(1024);
2023        let mut file_writer =
2024            SerializedFileWriter::new(&mut file, schema, Arc::new(props)).unwrap();
2025
2026        let mut row_group_writer = file_writer.next_row_group().unwrap();
2027        let mut a_writer = row_group_writer.next_column().unwrap().unwrap();
2028        let col_writer = a_writer.typed::<Int32Type>();
2029        col_writer.write_batch(&[1, 2, 3], None, None).unwrap();
2030        a_writer.close().unwrap();
2031
2032        let mut b_writer = row_group_writer.next_column().unwrap().unwrap();
2033        let col_writer = b_writer.typed::<Int32Type>();
2034        col_writer.write_batch(&[4, 5, 6], None, None).unwrap();
2035        b_writer.close().unwrap();
2036        row_group_writer.close().unwrap();
2037
2038        let metadata = file_writer.finish().unwrap();
2039        assert_eq!(metadata.row_groups.len(), 1);
2040        let row_group = &metadata.row_groups[0];
2041        assert_eq!(row_group.columns.len(), 2);
2042        // Column "a" has both offset and column index, as requested
2043        assert!(row_group.columns[0].offset_index_offset.is_some());
2044        assert!(row_group.columns[0].column_index_offset.is_some());
2045        // Column "b" should only have an offset index
2046        assert!(row_group.columns[1].offset_index_offset.is_some());
2047        assert!(row_group.columns[1].column_index_offset.is_none());
2048
2049        let err = file_writer.next_row_group().err().unwrap().to_string();
2050        assert_eq!(err, "Parquet error: SerializedFileWriter already finished");
2051
2052        drop(file_writer);
2053
2054        let options = ReadOptionsBuilder::new().with_page_index().build();
2055        let reader = SerializedFileReader::new_with_options(Bytes::from(file), options).unwrap();
2056
2057        let offset_index = reader.metadata().offset_index().unwrap();
2058        assert_eq!(offset_index.len(), 1); // 1 row group
2059        assert_eq!(offset_index[0].len(), 2); // 2 columns
2060
2061        let column_index = reader.metadata().column_index().unwrap();
2062        assert_eq!(column_index.len(), 1); // 1 row group
2063        assert_eq!(column_index[0].len(), 2); // 2 columns
2064
2065        let a_idx = &column_index[0][0];
2066        assert!(matches!(a_idx, Index::INT32(_)), "{a_idx:?}");
2067        let b_idx = &column_index[0][1];
2068        assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
2069    }
2070
2071    #[test]
2072    fn test_byte_array_size_statistics() {
2073        let message_type = "
2074            message test_schema {
2075                OPTIONAL BYTE_ARRAY a (UTF8);
2076            }
2077        ";
2078        let schema = Arc::new(parse_message_type(message_type).unwrap());
2079        let data = ByteArrayType::gen_vec(32, 7);
2080        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
2081        let unenc_size: i64 = data.iter().map(|x| x.len() as i64).sum();
2082        let file: File = tempfile::tempfile().unwrap();
2083        let props = Arc::new(
2084            WriterProperties::builder()
2085                .set_statistics_enabled(EnabledStatistics::Page)
2086                .build(),
2087        );
2088
2089        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
2090        let mut row_group_writer = writer.next_row_group().unwrap();
2091
2092        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
2093        col_writer
2094            .typed::<ByteArrayType>()
2095            .write_batch(&data, Some(&def_levels), None)
2096            .unwrap();
2097        col_writer.close().unwrap();
2098        row_group_writer.close().unwrap();
2099        let file_metadata = writer.close().unwrap();
2100
2101        assert_eq!(file_metadata.row_groups.len(), 1);
2102        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
2103        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
2104
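        // The def_levels array above contains three 0s (nulls) and seven 1s (non-null
        // values), so the definition level histogram should be [3, 7].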
2105        let check_def_hist = |def_hist: &[i64]| {
2106            assert_eq!(def_hist.len(), 2);
2107            assert_eq!(def_hist[0], 3);
2108            assert_eq!(def_hist[1], 7);
2109        };
2110
2111        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
2112        let meta_data = file_metadata.row_groups[0].columns[0]
2113            .meta_data
2114            .as_ref()
2115            .unwrap();
2116        assert!(meta_data.size_statistics.is_some());
2117        let size_stats = meta_data.size_statistics.as_ref().unwrap();
2118
2119        assert!(size_stats.repetition_level_histogram.is_none());
2120        assert!(size_stats.definition_level_histogram.is_some());
2121        assert!(size_stats.unencoded_byte_array_data_bytes.is_some());
2122        assert_eq!(
2123            unenc_size,
2124            size_stats.unencoded_byte_array_data_bytes.unwrap()
2125        );
2126        check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());
2127
2128        // check that the read metadata is also correct
2129        let options = ReadOptionsBuilder::new().with_page_index().build();
2130        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
2131
2132        let rfile_metadata = reader.metadata().file_metadata();
2133        assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
2134        assert_eq!(reader.num_row_groups(), 1);
2135        let rowgroup = reader.get_row_group(0).unwrap();
2136        assert_eq!(rowgroup.num_columns(), 1);
2137        let column = rowgroup.metadata().column(0);
2138        assert!(column.definition_level_histogram().is_some());
2139        assert!(column.repetition_level_histogram().is_none());
2140        assert!(column.unencoded_byte_array_data_bytes().is_some());
2141        check_def_hist(column.definition_level_histogram().unwrap().values());
2142        assert_eq!(
2143            unenc_size,
2144            column.unencoded_byte_array_data_bytes().unwrap()
2145        );
2146
2147        // check histogram in column index as well
2148        assert!(reader.metadata().column_index().is_some());
2149        let column_index = reader.metadata().column_index().unwrap();
2150        assert_eq!(column_index.len(), 1);
2151        assert_eq!(column_index[0].len(), 1);
2152        let col_idx = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
2153            assert_eq!(index.indexes.len(), 1);
2154            &index.indexes[0]
2155        } else {
2156            unreachable!()
2157        };
2158
2159        assert!(col_idx.repetition_level_histogram().is_none());
2160        assert!(col_idx.definition_level_histogram().is_some());
2161        check_def_hist(col_idx.definition_level_histogram().unwrap().values());
2162
2163        assert!(reader.metadata().offset_index().is_some());
2164        let offset_index = reader.metadata().offset_index().unwrap();
2165        assert_eq!(offset_index.len(), 1);
2166        assert_eq!(offset_index[0].len(), 1);
2167        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_some());
2168        let page_sizes = offset_index[0][0]
2169            .unencoded_byte_array_data_bytes
2170            .as_ref()
2171            .unwrap();
2172        assert_eq!(page_sizes.len(), 1);
2173        assert_eq!(page_sizes[0], unenc_size);
2174    }
2175
2176    #[test]
2177    fn test_too_many_rowgroups() {
2178        let message_type = "
2179            message test_schema {
2180                REQUIRED BYTE_ARRAY a (UTF8);
2181            }
2182        ";
2183        let schema = Arc::new(parse_message_type(message_type).unwrap());
2184        let file: File = tempfile::tempfile().unwrap();
2185        let props = Arc::new(
2186            WriterProperties::builder()
2187                .set_statistics_enabled(EnabledStatistics::None)
2188                .set_max_row_group_size(1)
2189                .build(),
2190        );
2191        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
2192
2193        // Create 32k empty row groups; the writer should error once i == 0x8000 (32768).
2194        for i in 0..0x8001 {
2195            match writer.next_row_group() {
2196                Ok(mut row_group_writer) => {
2197                    assert_ne!(i, 0x8000);
2198                    let col_writer = row_group_writer.next_column().unwrap().unwrap();
2199                    col_writer.close().unwrap();
2200                    row_group_writer.close().unwrap();
2201                }
2202                Err(e) => {
2203                    assert_eq!(i, 0x8000);
2204                    assert_eq!(
2205                        e.to_string(),
2206                        "Parquet error: Parquet does not support more than 32767 row groups per file (currently: 32768)"
2207                    );
2208                }
2209            }
2210        }
2211        writer.close().unwrap();
2212    }
2213
2214    #[test]
2215    fn test_size_statistics_with_repetition_and_nulls() {
2216        let message_type = "
2217            message test_schema {
2218                OPTIONAL group i32_list (LIST) {
2219                    REPEATED group list {
2220                        OPTIONAL INT32 element;
2221                    }
2222                }
2223            }
2224        ";
2225        // column is:
2226        // row 0: [1, 2]
2227        // row 1: NULL
2228        // row 2: [4, NULL]
2229        // row 3: []
2230        // row 4: [7, 8, 9, 10]
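        // With max definition level 3 and max repetition level 1, the levels below contain
        // one def level 0 (null list), one def level 1 (empty list), one def level 2 (null
        // element), and seven def level 3 (non-null values); repetition level 0 marks each
        // of the five row starts and level 1 the remaining five values.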
2231        let schema = Arc::new(parse_message_type(message_type).unwrap());
2232        let data = [1, 2, 4, 7, 8, 9, 10];
2233        let def_levels = [3, 3, 0, 3, 2, 1, 3, 3, 3, 3];
2234        let rep_levels = [0, 1, 0, 0, 1, 0, 0, 1, 1, 1];
2235        let file = tempfile::tempfile().unwrap();
2236        let props = Arc::new(
2237            WriterProperties::builder()
2238                .set_statistics_enabled(EnabledStatistics::Page)
2239                .build(),
2240        );
2241        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
2242        let mut row_group_writer = writer.next_row_group().unwrap();
2243
2244        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
2245        col_writer
2246            .typed::<Int32Type>()
2247            .write_batch(&data, Some(&def_levels), Some(&rep_levels))
2248            .unwrap();
2249        col_writer.close().unwrap();
2250        row_group_writer.close().unwrap();
2251        let file_metadata = writer.close().unwrap();
2252
2253        assert_eq!(file_metadata.row_groups.len(), 1);
2254        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
2255        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
2256
2257        let check_def_hist = |def_hist: &[i64]| {
2258            assert_eq!(def_hist.len(), 4);
2259            assert_eq!(def_hist[0], 1);
2260            assert_eq!(def_hist[1], 1);
2261            assert_eq!(def_hist[2], 1);
2262            assert_eq!(def_hist[3], 7);
2263        };
2264
2265        let check_rep_hist = |rep_hist: &[i64]| {
2266            assert_eq!(rep_hist.len(), 2);
2267            assert_eq!(rep_hist[0], 5);
2268            assert_eq!(rep_hist[1], 5);
2269        };
2270
2271        // check that histograms are set properly in the write and read metadata
2272        // also check that unencoded_byte_array_data_bytes is not set
2273        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
2274        let meta_data = file_metadata.row_groups[0].columns[0]
2275            .meta_data
2276            .as_ref()
2277            .unwrap();
2278        assert!(meta_data.size_statistics.is_some());
2279        let size_stats = meta_data.size_statistics.as_ref().unwrap();
2280        assert!(size_stats.repetition_level_histogram.is_some());
2281        assert!(size_stats.definition_level_histogram.is_some());
2282        assert!(size_stats.unencoded_byte_array_data_bytes.is_none());
2283        check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());
2284        check_rep_hist(size_stats.repetition_level_histogram.as_ref().unwrap());
2285
2286        // check that the read metadata is also correct
2287        let options = ReadOptionsBuilder::new().with_page_index().build();
2288        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
2289
2290        let rfile_metadata = reader.metadata().file_metadata();
2291        assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
2292        assert_eq!(reader.num_row_groups(), 1);
2293        let rowgroup = reader.get_row_group(0).unwrap();
2294        assert_eq!(rowgroup.num_columns(), 1);
2295        let column = rowgroup.metadata().column(0);
2296        assert!(column.definition_level_histogram().is_some());
2297        assert!(column.repetition_level_histogram().is_some());
2298        assert!(column.unencoded_byte_array_data_bytes().is_none());
2299        check_def_hist(column.definition_level_histogram().unwrap().values());
2300        check_rep_hist(column.repetition_level_histogram().unwrap().values());
2301
2302        // check histogram in column index as well
2303        assert!(reader.metadata().column_index().is_some());
2304        let column_index = reader.metadata().column_index().unwrap();
2305        assert_eq!(column_index.len(), 1);
2306        assert_eq!(column_index[0].len(), 1);
2307        let col_idx = if let Index::INT32(index) = &column_index[0][0] {
2308            assert_eq!(index.indexes.len(), 1);
2309            &index.indexes[0]
2310        } else {
2311            unreachable!()
2312        };
2313
2314        check_def_hist(col_idx.definition_level_histogram().unwrap().values());
2315        check_rep_hist(col_idx.repetition_level_histogram().unwrap().values());
2316
2317        assert!(reader.metadata().offset_index().is_some());
2318        let offset_index = reader.metadata().offset_index().unwrap();
2319        assert_eq!(offset_index.len(), 1);
2320        assert_eq!(offset_index[0].len(), 1);
2321        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_none());
2322    }
2323
2324    #[test]
2325    #[cfg(feature = "arrow")]
2326    fn test_byte_stream_split_extended_roundtrip() {
2327        let path = format!(
2328            "{}/byte_stream_split_extended.gzip.parquet",
2329            arrow::util::test_util::parquet_test_data(),
2330        );
2331        let file = File::open(path).unwrap();
2332
2333        // Read in test file and rewrite to tmp
2334        let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)
2335            .expect("parquet open")
2336            .build()
2337            .expect("parquet open");
2338
2339        let file = tempfile::tempfile().unwrap();
2340        let props = WriterProperties::builder()
2341            .set_dictionary_enabled(false)
2342            .set_column_encoding(
2343                ColumnPath::from("float16_byte_stream_split"),
2344                Encoding::BYTE_STREAM_SPLIT,
2345            )
2346            .set_column_encoding(
2347                ColumnPath::from("float_byte_stream_split"),
2348                Encoding::BYTE_STREAM_SPLIT,
2349            )
2350            .set_column_encoding(
2351                ColumnPath::from("double_byte_stream_split"),
2352                Encoding::BYTE_STREAM_SPLIT,
2353            )
2354            .set_column_encoding(
2355                ColumnPath::from("int32_byte_stream_split"),
2356                Encoding::BYTE_STREAM_SPLIT,
2357            )
2358            .set_column_encoding(
2359                ColumnPath::from("int64_byte_stream_split"),
2360                Encoding::BYTE_STREAM_SPLIT,
2361            )
2362            .set_column_encoding(
2363                ColumnPath::from("flba5_byte_stream_split"),
2364                Encoding::BYTE_STREAM_SPLIT,
2365            )
2366            .set_column_encoding(
2367                ColumnPath::from("decimal_byte_stream_split"),
2368                Encoding::BYTE_STREAM_SPLIT,
2369            )
2370            .build();
2371
2372        let mut parquet_writer = ArrowWriter::try_new(
2373            file.try_clone().expect("cannot open file"),
2374            parquet_reader.schema(),
2375            Some(props),
2376        )
2377        .expect("create arrow writer");
2378
2379        for maybe_batch in parquet_reader {
2380            let batch = maybe_batch.expect("reading batch");
2381            parquet_writer.write(&batch).expect("writing data");
2382        }
2383
2384        parquet_writer.close().expect("finalizing file");
2385
2386        let reader = SerializedFileReader::new(file).expect("Failed to create reader");
2387        let filemeta = reader.metadata();
2388
2389        // Make sure byte_stream_split encoding was used
2390        let check_encoding = |x: usize, filemeta: &ParquetMetaData| {
2391            assert!(filemeta
2392                .row_group(0)
2393                .column(x)
2394                .encodings()
2395                .contains(&Encoding::BYTE_STREAM_SPLIT));
2396        };
2397
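        // The test file pairs each plain-encoded column with a *_byte_stream_split
        // counterpart (compared pairwise below), so the odd column indices are the ones
        // expected to use BYTE_STREAM_SPLIT.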
2398        check_encoding(1, filemeta);
2399        check_encoding(3, filemeta);
2400        check_encoding(5, filemeta);
2401        check_encoding(7, filemeta);
2402        check_encoding(9, filemeta);
2403        check_encoding(11, filemeta);
2404        check_encoding(13, filemeta);
2405
2406        // Read back tmpfile and make sure all values are correct
2407        let mut iter = reader
2408            .get_row_iter(None)
2409            .expect("Failed to create row iterator");
2410
2411        let mut start = 0;
2412        let end = reader.metadata().file_metadata().num_rows();
2413
2414        let check_row = |row: Result<Row, ParquetError>| {
2415            assert!(row.is_ok());
2416            let r = row.unwrap();
2417            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2418            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2419            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2420            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2421            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2422            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2423            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2424        };
2425
2426        while start < end {
2427            match iter.next() {
2428                Some(row) => check_row(row),
2429                None => break,
2430            };
2431            start += 1;
2432        }
2433    }
2434}