parquet/file/writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`SerializedFileWriter`]: Low level Parquet writer API
19
20use crate::bloom_filter::Sbbf;
21use crate::file::metadata::thrift::PageHeader;
22use crate::file::page_index::column_index::ColumnIndexMetaData;
23use crate::file::page_index::offset_index::OffsetIndexMetaData;
24use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
25use std::fmt::Debug;
26use std::io::{BufWriter, IoSlice, Read};
27use std::{io::Write, sync::Arc};
28
29use crate::column::page_encryption::PageEncryptor;
30use crate::column::writer::{ColumnCloseResult, ColumnWriterImpl, get_typed_column_writer_mut};
31use crate::column::{
32    page::{CompressedPage, PageWriteSpec, PageWriter},
33    writer::{ColumnWriter, get_column_writer},
34};
35use crate::data_type::DataType;
36#[cfg(feature = "encryption")]
37use crate::encryption::encrypt::{
38    FileEncryptionProperties, FileEncryptor, get_column_crypto_metadata,
39};
40use crate::errors::{ParquetError, Result};
41#[cfg(feature = "encryption")]
42use crate::file::PARQUET_MAGIC_ENCR_FOOTER;
43use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
44use crate::file::reader::ChunkReader;
45use crate::file::{PARQUET_MAGIC, metadata::*};
46use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr};
47
48/// A wrapper around a [`Write`] that keeps track of the number
49/// of bytes that have been written. The given [`Write`] is wrapped
50/// with a [`BufWriter`] to optimize writing performance.
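///
/// # Example
///
/// A small illustrative sketch: wrap an in-memory buffer and observe the byte
/// count (the `Vec<u8>` sink here is just for demonstration).
///
/// ```no_run
/// use std::io::Write;
/// use parquet::file::writer::TrackedWrite;
///
/// let mut writer = TrackedWrite::new(Vec::<u8>::new());
/// writer.write_all(b"PAR1").unwrap();
/// assert_eq!(writer.bytes_written(), 4);
/// // Recover the underlying Vec (this flushes the internal BufWriter).
/// let _buffer: Vec<u8> = writer.into_inner().unwrap();
/// ```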
51pub struct TrackedWrite<W: Write> {
52    inner: BufWriter<W>,
53    bytes_written: usize,
54}
55
56impl<W: Write> TrackedWrite<W> {
57    /// Create a new [`TrackedWrite`] from a [`Write`]
58    pub fn new(inner: W) -> Self {
59        let buf_write = BufWriter::new(inner);
60        Self {
61            inner: buf_write,
62            bytes_written: 0,
63        }
64    }
65
66    /// Returns the number of bytes written to this instance
67    pub fn bytes_written(&self) -> usize {
68        self.bytes_written
69    }
70
71    /// Returns a reference to the underlying writer.
72    pub fn inner(&self) -> &W {
73        self.inner.get_ref()
74    }
75
76    /// Returns a mutable reference to the underlying writer.
77    ///
    /// It is inadvisable to write directly to the underlying writer, as doing so
    /// will likely result in data corruption.
80    pub fn inner_mut(&mut self) -> &mut W {
81        self.inner.get_mut()
82    }
83
84    /// Returns the underlying writer.
85    pub fn into_inner(self) -> Result<W> {
86        self.inner.into_inner().map_err(|err| {
            ParquetError::General(format!("failed to get inner writer: {:?}", err.to_string()))
88        })
89    }
90}
91
92impl<W: Write> Write for TrackedWrite<W> {
93    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
94        let bytes = self.inner.write(buf)?;
95        self.bytes_written += bytes;
96        Ok(bytes)
97    }
98
99    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> std::io::Result<usize> {
100        let bytes = self.inner.write_vectored(bufs)?;
101        self.bytes_written += bytes;
102        Ok(bytes)
103    }
104
105    fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
106        self.inner.write_all(buf)?;
107        self.bytes_written += buf.len();
108
109        Ok(())
110    }
111
112    fn flush(&mut self) -> std::io::Result<()> {
113        self.inner.flush()
114    }
115}
116
117/// Callback invoked on closing a column chunk
118pub type OnCloseColumnChunk<'a> = Box<dyn FnOnce(ColumnCloseResult) -> Result<()> + 'a>;
119
/// Callback invoked on closing a row group, arguments are:
///
/// - the writer the row group was written to (used to write out bloom filters)
/// - the row group metadata
/// - the bloom filter for each column chunk
/// - the column index for each column chunk
/// - the offset index for each column chunk
125pub type OnCloseRowGroup<'a, W> = Box<
126    dyn FnOnce(
127            &'a mut TrackedWrite<W>,
128            RowGroupMetaData,
129            Vec<Option<Sbbf>>,
130            Vec<Option<ColumnIndexMetaData>>,
131            Vec<Option<OffsetIndexMetaData>>,
132        ) -> Result<()>
133        + 'a
134        + Send,
135>;
136
137// ----------------------------------------------------------------------
138// Serialized impl for file & row group writers
139
140/// Parquet file writer API.
141///
142/// This is a low level API for writing Parquet files directly, and handles
143/// tracking the location of file structures such as row groups and column
144/// chunks, and writing the metadata and file footer.
145///
146/// Data is written to row groups using  [`SerializedRowGroupWriter`] and
147/// columns using [`SerializedColumnWriter`]. The `SerializedFileWriter` tracks
148/// where all the data is written, and assembles the final file metadata.
149///
/// The main workflow is as follows (see the example sketch below):
/// - Create the file writer; this writes the file header (magic bytes) to the underlying writer.
/// - Request a new row group writer by calling `next_row_group`.
/// - Once finished writing a row group, close the row group writer by calling `close`.
/// - Write subsequent row groups, if necessary.
/// - After all row groups have been written, close the file writer using the `close` method.
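///
/// # Example
///
/// A minimal, illustrative sketch of the workflow above (assumes a single
/// required INT32 column named `id`; error handling is elided with `unwrap`):
///
/// ```no_run
/// use std::sync::Arc;
/// use parquet::data_type::Int32Type;
/// use parquet::file::properties::WriterProperties;
/// use parquet::file::writer::SerializedFileWriter;
/// use parquet::schema::parser::parse_message_type;
///
/// let schema = Arc::new(parse_message_type("message schema { REQUIRED INT32 id; }").unwrap());
/// let props = Arc::new(WriterProperties::builder().build());
/// let file = std::fs::File::create("example.parquet").unwrap();
///
/// let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
/// let mut row_group = writer.next_row_group().unwrap();
/// while let Some(mut column) = row_group.next_column().unwrap() {
///     column
///         .typed::<Int32Type>()
///         .write_batch(&[1, 2, 3], None, None)
///         .unwrap();
///     column.close().unwrap();
/// }
/// row_group.close().unwrap();
/// writer.close().unwrap();
/// ```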
156pub struct SerializedFileWriter<W: Write> {
157    buf: TrackedWrite<W>,
158    descr: SchemaDescPtr,
159    props: WriterPropertiesPtr,
160    row_groups: Vec<RowGroupMetaData>,
161    bloom_filters: Vec<Vec<Option<Sbbf>>>,
162    column_indexes: Vec<Vec<Option<ColumnIndexMetaData>>>,
163    offset_indexes: Vec<Vec<Option<OffsetIndexMetaData>>>,
164    row_group_index: usize,
    // kv_metadatas will be appended to the key-value metadata from `props` when `write_metadata` is called
166    kv_metadatas: Vec<KeyValue>,
167    finished: bool,
168    #[cfg(feature = "encryption")]
169    file_encryptor: Option<Arc<FileEncryptor>>,
170}
171
172impl<W: Write> Debug for SerializedFileWriter<W> {
173    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174        // implement Debug so this can be used with #[derive(Debug)]
175        // in client code rather than actually listing all the fields
176        f.debug_struct("SerializedFileWriter")
177            .field("descr", &self.descr)
178            .field("row_group_index", &self.row_group_index)
179            .field("kv_metadatas", &self.kv_metadatas)
180            .finish_non_exhaustive()
181    }
182}
183
184impl<W: Write + Send> SerializedFileWriter<W> {
    /// Creates a new file writer.
186    pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
187        let mut buf = TrackedWrite::new(buf);
188
189        let schema_descriptor = SchemaDescriptor::new(schema.clone());
190
191        #[cfg(feature = "encryption")]
192        let file_encryptor = Self::get_file_encryptor(&properties, &schema_descriptor)?;
193
194        Self::start_file(&properties, &mut buf)?;
195        Ok(Self {
196            buf,
197            descr: Arc::new(schema_descriptor),
198            props: properties,
199            row_groups: vec![],
200            bloom_filters: vec![],
201            column_indexes: Vec::new(),
202            offset_indexes: Vec::new(),
203            row_group_index: 0,
204            kv_metadatas: Vec::new(),
205            finished: false,
206            #[cfg(feature = "encryption")]
207            file_encryptor,
208        })
209    }
210
211    #[cfg(feature = "encryption")]
212    fn get_file_encryptor(
213        properties: &WriterPropertiesPtr,
214        schema_descriptor: &SchemaDescriptor,
215    ) -> Result<Option<Arc<FileEncryptor>>> {
216        if let Some(file_encryption_properties) = properties.file_encryption_properties() {
217            file_encryption_properties.validate_encrypted_column_names(schema_descriptor)?;
218
219            Ok(Some(Arc::new(FileEncryptor::new(Arc::clone(
220                file_encryption_properties,
221            ))?)))
222        } else {
223            Ok(None)
224        }
225    }
226
    /// Creates a new row group writer for the next row group in this file.
228    ///
    /// Note: Parquet files are limited to at most 2^15 row groups, and row groups must
    /// be written sequentially.
231    ///
232    /// Every time the next row group is requested, the previous row group must
233    /// be finalised and closed using the [`SerializedRowGroupWriter::close`]
234    /// method or an error will be returned.
235    pub fn next_row_group(&mut self) -> Result<SerializedRowGroupWriter<'_, W>> {
236        self.assert_previous_writer_closed()?;
237        let ordinal = self.row_group_index;
238
239        let ordinal: i16 = ordinal.try_into().map_err(|_| {
240            ParquetError::General(format!(
241                "Parquet does not support more than {} row groups per file (currently: {})",
242                i16::MAX,
243                ordinal
244            ))
245        })?;
246
247        self.row_group_index = self
248            .row_group_index
249            .checked_add(1)
250            .expect("SerializedFileWriter::row_group_index overflowed");
251
252        let bloom_filter_position = self.properties().bloom_filter_position();
253        let row_groups = &mut self.row_groups;
254        let row_bloom_filters = &mut self.bloom_filters;
255        let row_column_indexes = &mut self.column_indexes;
256        let row_offset_indexes = &mut self.offset_indexes;
257        let on_close = move |buf,
258                             mut metadata,
259                             row_group_bloom_filter,
260                             row_group_column_index,
261                             row_group_offset_index| {
262            row_bloom_filters.push(row_group_bloom_filter);
263            row_column_indexes.push(row_group_column_index);
264            row_offset_indexes.push(row_group_offset_index);
265            // write bloom filters out immediately after the row group if requested
266            match bloom_filter_position {
267                BloomFilterPosition::AfterRowGroup => {
268                    write_bloom_filters(buf, row_bloom_filters, &mut metadata)?
269                }
270                BloomFilterPosition::End => (),
271            };
272            row_groups.push(metadata);
273            Ok(())
274        };
275
276        let row_group_writer = SerializedRowGroupWriter::new(
277            self.descr.clone(),
278            self.props.clone(),
279            &mut self.buf,
280            ordinal,
281            Some(Box::new(on_close)),
282        );
283        #[cfg(feature = "encryption")]
284        let row_group_writer = row_group_writer.with_file_encryptor(self.file_encryptor.clone());
285
286        Ok(row_group_writer)
287    }
288
289    /// Returns metadata for any flushed row groups
290    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
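    ///
    /// # Example
    ///
    /// An illustrative sketch (assumes `writer` is a [`SerializedFileWriter`]
    /// with at least one closed row group):
    ///
    /// ```ignore
    /// for row_group in writer.flushed_row_groups() {
    ///     println!("flushed row group with {} rows", row_group.num_rows());
    /// }
    /// ```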
291        &self.row_groups
292    }
293
294    /// Close and finalize the underlying Parquet writer
295    ///
296    /// Unlike [`Self::close`] this does not consume self
297    ///
298    /// Attempting to write after calling finish will result in an error
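    ///
    /// # Example
    ///
    /// An illustrative sketch (assumes `writer` holds a fully written
    /// [`SerializedFileWriter`]): finish the file but keep the writer handle,
    /// for example to inspect how much data was written.
    ///
    /// ```ignore
    /// let metadata = writer.finish()?;
    /// println!(
    ///     "wrote {} row groups, {} bytes",
    ///     metadata.num_row_groups(),
    ///     writer.bytes_written()
    /// );
    /// ```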
299    pub fn finish(&mut self) -> Result<ParquetMetaData> {
300        self.assert_previous_writer_closed()?;
301        let metadata = self.write_metadata()?;
302        self.buf.flush()?;
303        Ok(metadata)
304    }
305
306    /// Closes and finalises file writer, returning the file metadata.
307    pub fn close(mut self) -> Result<ParquetMetaData> {
308        self.finish()
309    }
310
311    /// Writes magic bytes at the beginning of the file.
312    #[cfg(not(feature = "encryption"))]
313    fn start_file(_properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
314        buf.write_all(get_file_magic())?;
315        Ok(())
316    }
317
318    /// Writes magic bytes at the beginning of the file.
319    #[cfg(feature = "encryption")]
320    fn start_file(properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
321        let magic = get_file_magic(properties.file_encryption_properties.as_ref());
322
323        buf.write_all(magic)?;
324        Ok(())
325    }
326
327    /// Assembles and writes metadata at the end of the file. This will take ownership
328    /// of `row_groups` and the page index structures.
329    fn write_metadata(&mut self) -> Result<ParquetMetaData> {
330        self.finished = true;
331
332        // write out any remaining bloom filters after all row groups
333        for row_group in &mut self.row_groups {
334            write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?;
335        }
336
337        let key_value_metadata = match self.props.key_value_metadata() {
338            Some(kv) => Some(kv.iter().chain(&self.kv_metadatas).cloned().collect()),
339            None if self.kv_metadatas.is_empty() => None,
340            None => Some(self.kv_metadatas.clone()),
341        };
342
343        // take ownership of metadata
344        let row_groups = std::mem::take(&mut self.row_groups);
345        let column_indexes = std::mem::take(&mut self.column_indexes);
346        let offset_indexes = std::mem::take(&mut self.offset_indexes);
347
348        let mut encoder = ThriftMetadataWriter::new(
349            &mut self.buf,
350            &self.descr,
351            row_groups,
352            Some(self.props.created_by().to_string()),
353            self.props.writer_version().as_num(),
354        );
355
356        #[cfg(feature = "encryption")]
357        {
358            encoder = encoder.with_file_encryptor(self.file_encryptor.clone());
359        }
360
361        if let Some(key_value_metadata) = key_value_metadata {
362            encoder = encoder.with_key_value_metadata(key_value_metadata)
363        }
364
365        encoder = encoder.with_column_indexes(column_indexes);
366        if !self.props.offset_index_disabled() {
367            encoder = encoder.with_offset_indexes(offset_indexes);
368        }
369        encoder.finish()
370    }
371
372    #[inline]
373    fn assert_previous_writer_closed(&self) -> Result<()> {
374        if self.finished {
375            return Err(general_err!("SerializedFileWriter already finished"));
376        }
377
378        if self.row_group_index != self.row_groups.len() {
379            Err(general_err!("Previous row group writer was not closed"))
380        } else {
381            Ok(())
382        }
383    }
384
385    /// Add a [`KeyValue`] to the file writer's metadata
386    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
387        self.kv_metadatas.push(kv_metadata);
388    }
389
390    /// Returns a reference to schema descriptor.
391    pub fn schema_descr(&self) -> &SchemaDescriptor {
392        &self.descr
393    }
394
395    /// Returns a reference to schema descriptor Arc.
396    #[cfg(feature = "arrow")]
397    pub(crate) fn schema_descr_ptr(&self) -> &SchemaDescPtr {
398        &self.descr
399    }
400
401    /// Returns a reference to the writer properties
402    pub fn properties(&self) -> &WriterPropertiesPtr {
403        &self.props
404    }
405
406    /// Returns a reference to the underlying writer.
407    pub fn inner(&self) -> &W {
408        self.buf.inner()
409    }
410
    /// Writes the bytes in `buf` to the internal buffer.
    ///
    /// This can be used to write raw data to an in-progress Parquet file, for
    /// example, custom index structures or other payloads. Parquet readers
    /// will skip this data when reading the file.
    ///
    /// It is safe to use this method to write data to the underlying writer,
    /// because it ensures that the buffering and byte-counting layers are used.
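    ///
    /// # Example
    ///
    /// An illustrative sketch (assumes `writer` is an open [`SerializedFileWriter`]
    /// and `KeyValue` is in scope); the payload's offset is recorded so a custom
    /// reader can locate it later:
    ///
    /// ```ignore
    /// let payload_offset = writer.bytes_written();
    /// writer.write_all(b"my custom payload")?;
    /// writer.append_key_value_metadata(KeyValue::new(
    ///     "my_payload_offset".to_string(),
    ///     payload_offset.to_string(),
    /// ));
    /// ```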
419    pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
420        self.buf.write_all(buf)
421    }
422
423    /// Flushes underlying writer
424    pub fn flush(&mut self) -> std::io::Result<()> {
425        self.buf.flush()
426    }
427
428    /// Returns a mutable reference to the underlying writer.
429    ///
    /// **Warning**: if you write directly to this writer, you will skip
    /// the `TrackedWrite` buffering and byte-counting layers, which can cause
    /// the file footer's recorded offsets and sizes to diverge from reality,
    /// resulting in an unreadable or corrupted Parquet file.
434    ///
435    /// If you want to write safely to the underlying writer, use [`Self::write_all`].
436    pub fn inner_mut(&mut self) -> &mut W {
437        self.buf.inner_mut()
438    }
439
440    /// Writes the file footer and returns the underlying writer.
441    pub fn into_inner(mut self) -> Result<W> {
442        self.assert_previous_writer_closed()?;
443        let _ = self.write_metadata()?;
444
445        self.buf.into_inner()
446    }
447
448    /// Returns the number of bytes written to this instance
449    pub fn bytes_written(&self) -> usize {
450        self.buf.bytes_written()
451    }
452
453    /// Get the file encryptor used by this instance to encrypt data
454    #[cfg(feature = "encryption")]
455    pub(crate) fn file_encryptor(&self) -> Option<Arc<FileEncryptor>> {
456        self.file_encryptor.clone()
457    }
458}
459
/// Serializes all the bloom filters of the given row group to the given buffer,
/// updating the row group metadata in place with each bloom filter's offset and length.
462fn write_bloom_filters<W: Write + Send>(
463    buf: &mut TrackedWrite<W>,
464    bloom_filters: &mut [Vec<Option<Sbbf>>],
465    row_group: &mut RowGroupMetaData,
466) -> Result<()> {
    // For each column chunk in the row group, write its bloom filter (if any) to the
    // file and record the filter's offset and length in the column chunk metadata.
470
471    let row_group_idx: u16 = row_group
472        .ordinal()
473        .expect("Missing row group ordinal")
474        .try_into()
475        .map_err(|_| {
476            ParquetError::General(format!(
                "Negative row group ordinal: {}",
478                row_group.ordinal().unwrap()
479            ))
480        })?;
481    let row_group_idx = row_group_idx as usize;
482    for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() {
483        if let Some(bloom_filter) = bloom_filters[row_group_idx][column_idx].take() {
484            let start_offset = buf.bytes_written();
485            bloom_filter.write(&mut *buf)?;
486            let end_offset = buf.bytes_written();
487            // set offset and index for bloom filter
488            *column_chunk = column_chunk
489                .clone()
490                .into_builder()
491                .set_bloom_filter_offset(Some(start_offset as i64))
492                .set_bloom_filter_length(Some((end_offset - start_offset) as i32))
493                .build()?;
494        }
495    }
496    Ok(())
497}
498
499/// Parquet row group writer API.
500///
/// Provides methods to access column writers in an iterator-like fashion; the order is
/// guaranteed to match the order of schema leaves (column descriptors).
503///
/// All columns should be written sequentially; the main workflow is (see the sketch below):
/// - Request the next column using the `next_column` method - this will return `None` if no
///   more columns are available to write.
/// - Once done writing a column, close the column writer with `close`.
/// - Once all columns have been written, close the row group writer with the `close`
///   method. The close method returns the row group metadata and is a no-op
///   on an already closed row group.
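///
/// # Example
///
/// A hedged sketch of the column loop (assumes two INT32 leaf columns, an open
/// file writer named `writer`, and error handling elided):
///
/// ```ignore
/// use parquet::data_type::Int32Type;
///
/// let mut row_group = writer.next_row_group()?;
/// let mut column_index = 0;
/// while let Some(mut column) = row_group.next_column()? {
///     // Columns are yielded in schema-leaf order, so `column_index` matches the
///     // corresponding column descriptor.
///     let values: &[i32] = if column_index == 0 { &[1, 2, 3] } else { &[4, 5, 6] };
///     column.typed::<Int32Type>().write_batch(values, None, None)?;
///     column.close()?;
///     column_index += 1;
/// }
/// let _row_group_metadata = row_group.close()?;
/// ```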
511pub struct SerializedRowGroupWriter<'a, W: Write> {
512    descr: SchemaDescPtr,
513    props: WriterPropertiesPtr,
514    buf: &'a mut TrackedWrite<W>,
515    total_rows_written: Option<u64>,
516    total_bytes_written: u64,
517    total_uncompressed_bytes: i64,
518    column_index: usize,
519    row_group_metadata: Option<RowGroupMetaDataPtr>,
520    column_chunks: Vec<ColumnChunkMetaData>,
521    bloom_filters: Vec<Option<Sbbf>>,
522    column_indexes: Vec<Option<ColumnIndexMetaData>>,
523    offset_indexes: Vec<Option<OffsetIndexMetaData>>,
524    row_group_index: i16,
525    file_offset: i64,
526    on_close: Option<OnCloseRowGroup<'a, W>>,
527    #[cfg(feature = "encryption")]
528    file_encryptor: Option<Arc<FileEncryptor>>,
529}
530
531impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
532    /// Creates a new `SerializedRowGroupWriter` with:
533    ///
534    /// - `schema_descr` - the schema to write
535    /// - `properties` - writer properties
536    /// - `buf` - the buffer to write data to
537    /// - `row_group_index` - row group index in this parquet file.
    /// - `on_close` - an optional callback that will be invoked on [`Self::close`]
540    pub fn new(
541        schema_descr: SchemaDescPtr,
542        properties: WriterPropertiesPtr,
543        buf: &'a mut TrackedWrite<W>,
544        row_group_index: i16,
545        on_close: Option<OnCloseRowGroup<'a, W>>,
546    ) -> Self {
547        let num_columns = schema_descr.num_columns();
548        let file_offset = buf.bytes_written() as i64;
549        Self {
550            buf,
551            row_group_index,
552            file_offset,
553            on_close,
554            total_rows_written: None,
555            descr: schema_descr,
556            props: properties,
557            column_index: 0,
558            row_group_metadata: None,
559            column_chunks: Vec::with_capacity(num_columns),
560            bloom_filters: Vec::with_capacity(num_columns),
561            column_indexes: Vec::with_capacity(num_columns),
562            offset_indexes: Vec::with_capacity(num_columns),
563            total_bytes_written: 0,
564            total_uncompressed_bytes: 0,
565            #[cfg(feature = "encryption")]
566            file_encryptor: None,
567        }
568    }
569
570    #[cfg(feature = "encryption")]
571    /// Set the file encryptor to use for encrypting row group data and metadata
572    pub(crate) fn with_file_encryptor(
573        mut self,
574        file_encryptor: Option<Arc<FileEncryptor>>,
575    ) -> Self {
576        self.file_encryptor = file_encryptor;
577        self
578    }
579
580    /// Advance `self.column_index` returning the next [`ColumnDescPtr`] if any
581    fn next_column_desc(&mut self) -> Option<ColumnDescPtr> {
582        let ret = self.descr.columns().get(self.column_index)?.clone();
583        self.column_index += 1;
584        Some(ret)
585    }
586
587    /// Returns [`OnCloseColumnChunk`] for the next writer
588    fn get_on_close(&mut self) -> (&mut TrackedWrite<W>, OnCloseColumnChunk<'_>) {
589        let total_bytes_written = &mut self.total_bytes_written;
590        let total_uncompressed_bytes = &mut self.total_uncompressed_bytes;
591        let total_rows_written = &mut self.total_rows_written;
592        let column_chunks = &mut self.column_chunks;
593        let column_indexes = &mut self.column_indexes;
594        let offset_indexes = &mut self.offset_indexes;
595        let bloom_filters = &mut self.bloom_filters;
596
597        let on_close = |r: ColumnCloseResult| {
598            // Update row group writer metrics
599            *total_bytes_written += r.bytes_written;
600            *total_uncompressed_bytes += r.metadata.uncompressed_size();
601            column_chunks.push(r.metadata);
602            bloom_filters.push(r.bloom_filter);
603            column_indexes.push(r.column_index);
604            offset_indexes.push(r.offset_index);
605
606            if let Some(rows) = *total_rows_written {
607                if rows != r.rows_written {
608                    return Err(general_err!(
609                        "Incorrect number of rows, expected {} != {} rows",
610                        rows,
611                        r.rows_written
612                    ));
613                }
614            } else {
615                *total_rows_written = Some(r.rows_written);
616            }
617
618            Ok(())
619        };
620        (self.buf, Box::new(on_close))
621    }
622
623    /// Returns the next column writer, if available, using the factory function;
624    /// otherwise returns `None`.
625    pub(crate) fn next_column_with_factory<'b, F, C>(&'b mut self, factory: F) -> Result<Option<C>>
626    where
627        F: FnOnce(
628            ColumnDescPtr,
629            WriterPropertiesPtr,
630            Box<dyn PageWriter + 'b>,
631            OnCloseColumnChunk<'b>,
632        ) -> Result<C>,
633    {
634        self.assert_previous_writer_closed()?;
635
636        let encryptor_context = self.get_page_encryptor_context();
637
638        Ok(match self.next_column_desc() {
639            Some(column) => {
640                let props = self.props.clone();
641                let (buf, on_close) = self.get_on_close();
642
643                let page_writer = SerializedPageWriter::new(buf);
644                let page_writer =
645                    Self::set_page_writer_encryptor(&column, encryptor_context, page_writer)?;
646
647                Some(factory(
648                    column,
649                    props,
650                    Box::new(page_writer),
651                    Box::new(on_close),
652                )?)
653            }
654            None => None,
655        })
656    }
657
    /// Returns the next column writer, if available; otherwise returns `None`.
    /// Returns `Err` in the case of an IO or Thrift error, or if the row group
    /// writer has already been closed.
661    pub fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>> {
662        self.next_column_with_factory(|descr, props, page_writer, on_close| {
663            let column_writer = get_column_writer(descr, props, page_writer);
664            Ok(SerializedColumnWriter::new(column_writer, Some(on_close)))
665        })
666    }
667
668    /// Append an encoded column chunk from `reader` directly to the underlying
669    /// writer.
670    ///
671    /// This method can be used for efficiently concatenating or projecting
672    /// Parquet data, or encoding Parquet data to temporary in-memory buffers.
673    ///
674    /// Arguments:
675    /// - `reader`: a [`ChunkReader`] containing the encoded column data
676    /// - `close`: the [`ColumnCloseResult`] metadata returned from closing
677    ///   the column writer that wrote the data in `reader`.
678    ///
679    /// See Also:
680    /// 1. [`get_column_writer`]  for creating writers that can encode data.
681    /// 2. [`Self::next_column`] for writing data that isn't already encoded
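    ///
    /// # Example
    ///
    /// A rough, untested sketch (assumes `writer` is a [`SerializedFileWriter`]
    /// whose schema has a single INT32 leaf column): encode a chunk into an
    /// in-memory buffer, then splice it into the file.
    ///
    /// ```ignore
    /// use bytes::Bytes;
    /// use parquet::column::writer::{get_column_writer, get_typed_column_writer};
    /// use parquet::data_type::Int32Type;
    /// use parquet::file::writer::{SerializedPageWriter, TrackedWrite};
    ///
    /// let descr = writer.schema_descr().column(0);
    /// let props = writer.properties().clone();
    ///
    /// // Encode the column chunk into a standalone in-memory buffer.
    /// let mut chunk_buf = TrackedWrite::new(Vec::new());
    /// let page_writer = Box::new(SerializedPageWriter::new(&mut chunk_buf));
    /// let column_writer = get_column_writer(descr, props, page_writer);
    /// let mut typed = get_typed_column_writer::<Int32Type>(column_writer);
    /// typed.write_batch(&[1, 2, 3], None, None)?;
    /// let close_result = typed.close()?;
    /// let encoded = Bytes::from(chunk_buf.into_inner()?);
    ///
    /// // Splice the already-encoded chunk into the row group being written.
    /// let mut row_group = writer.next_row_group()?;
    /// row_group.append_column(&encoded, close_result)?;
    /// row_group.close()?;
    /// ```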
682    pub fn append_column<R: ChunkReader>(
683        &mut self,
684        reader: &R,
685        mut close: ColumnCloseResult,
686    ) -> Result<()> {
687        self.assert_previous_writer_closed()?;
688        let desc = self
689            .next_column_desc()
690            .ok_or_else(|| general_err!("exhausted columns in SerializedRowGroupWriter"))?;
691
692        let metadata = close.metadata;
693
694        if metadata.column_descr() != desc.as_ref() {
695            return Err(general_err!(
696                "column descriptor mismatch, expected {:?} got {:?}",
697                desc,
698                metadata.column_descr()
699            ));
700        }
701
702        let src_dictionary_offset = metadata.dictionary_page_offset();
703        let src_data_offset = metadata.data_page_offset();
704        let src_offset = src_dictionary_offset.unwrap_or(src_data_offset);
705        let src_length = metadata.compressed_size();
706
707        let write_offset = self.buf.bytes_written();
708        let mut read = reader.get_read(src_offset as _)?.take(src_length as _);
709        let write_length = std::io::copy(&mut read, &mut self.buf)?;
710
711        if src_length as u64 != write_length {
712            return Err(general_err!(
                "Failed to splice column data, expected {src_length} got {write_length}"
714            ));
715        }
716
717        let map_offset = |x| x - src_offset + write_offset as i64;
718        let mut builder = ColumnChunkMetaData::builder(metadata.column_descr_ptr())
719            .set_compression(metadata.compression())
720            .set_encodings_mask(*metadata.encodings_mask())
721            .set_total_compressed_size(metadata.compressed_size())
722            .set_total_uncompressed_size(metadata.uncompressed_size())
723            .set_num_values(metadata.num_values())
724            .set_data_page_offset(map_offset(src_data_offset))
725            .set_dictionary_page_offset(src_dictionary_offset.map(map_offset))
726            .set_unencoded_byte_array_data_bytes(metadata.unencoded_byte_array_data_bytes());
727
728        if let Some(rep_hist) = metadata.repetition_level_histogram() {
729            builder = builder.set_repetition_level_histogram(Some(rep_hist.clone()))
730        }
731        if let Some(def_hist) = metadata.definition_level_histogram() {
732            builder = builder.set_definition_level_histogram(Some(def_hist.clone()))
733        }
734        if let Some(statistics) = metadata.statistics() {
735            builder = builder.set_statistics(statistics.clone())
736        }
737        if let Some(geo_statistics) = metadata.geo_statistics() {
738            builder = builder.set_geo_statistics(Box::new(geo_statistics.clone()))
739        }
740        if let Some(page_encoding_stats) = metadata.page_encoding_stats() {
741            builder = builder.set_page_encoding_stats(page_encoding_stats.clone())
742        }
743        builder = self.set_column_crypto_metadata(builder, &metadata);
744        close.metadata = builder.build()?;
745
746        if let Some(offsets) = close.offset_index.as_mut() {
747            for location in &mut offsets.page_locations {
748                location.offset = map_offset(location.offset)
749            }
750        }
751
752        let (_, on_close) = self.get_on_close();
753        on_close(close)
754    }
755
756    /// Closes this row group writer and returns row group metadata.
757    pub fn close(mut self) -> Result<RowGroupMetaDataPtr> {
758        if self.row_group_metadata.is_none() {
759            self.assert_previous_writer_closed()?;
760
761            let column_chunks = std::mem::take(&mut self.column_chunks);
762            let row_group_metadata = RowGroupMetaData::builder(self.descr.clone())
763                .set_column_metadata(column_chunks)
764                .set_total_byte_size(self.total_uncompressed_bytes)
765                .set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
766                .set_sorting_columns(self.props.sorting_columns().cloned())
767                .set_ordinal(self.row_group_index)
768                .set_file_offset(self.file_offset)
769                .build()?;
770
771            self.row_group_metadata = Some(Arc::new(row_group_metadata.clone()));
772
773            if let Some(on_close) = self.on_close.take() {
774                on_close(
775                    self.buf,
776                    row_group_metadata,
777                    self.bloom_filters,
778                    self.column_indexes,
779                    self.offset_indexes,
780                )?
781            }
782        }
783
784        let metadata = self.row_group_metadata.as_ref().unwrap().clone();
785        Ok(metadata)
786    }
787
788    /// Set the column crypto metadata for a column chunk
789    #[cfg(feature = "encryption")]
790    fn set_column_crypto_metadata(
791        &self,
792        builder: ColumnChunkMetaDataBuilder,
793        metadata: &ColumnChunkMetaData,
794    ) -> ColumnChunkMetaDataBuilder {
795        if let Some(file_encryptor) = self.file_encryptor.as_ref() {
796            builder.set_column_crypto_metadata(get_column_crypto_metadata(
797                file_encryptor.properties(),
798                &metadata.column_descr_ptr(),
799            ))
800        } else {
801            builder
802        }
803    }
804
805    /// Get context required to create a [`PageEncryptor`] for a column
806    #[cfg(feature = "encryption")]
807    fn get_page_encryptor_context(&self) -> PageEncryptorContext {
808        PageEncryptorContext {
809            file_encryptor: self.file_encryptor.clone(),
810            row_group_index: self.row_group_index as usize,
811            column_index: self.column_index,
812        }
813    }
814
815    /// Set the [`PageEncryptor`] on a page writer if a column is encrypted
816    #[cfg(feature = "encryption")]
817    fn set_page_writer_encryptor<'b>(
818        column: &ColumnDescPtr,
819        context: PageEncryptorContext,
820        page_writer: SerializedPageWriter<'b, W>,
821    ) -> Result<SerializedPageWriter<'b, W>> {
822        let page_encryptor = PageEncryptor::create_if_column_encrypted(
823            &context.file_encryptor,
824            context.row_group_index,
825            context.column_index,
826            &column.path().string(),
827        )?;
828
829        Ok(page_writer.with_page_encryptor(page_encryptor))
830    }
831
832    /// No-op implementation of setting the column crypto metadata for a column chunk
833    #[cfg(not(feature = "encryption"))]
834    fn set_column_crypto_metadata(
835        &self,
836        builder: ColumnChunkMetaDataBuilder,
837        _metadata: &ColumnChunkMetaData,
838    ) -> ColumnChunkMetaDataBuilder {
839        builder
840    }
841
842    #[cfg(not(feature = "encryption"))]
843    fn get_page_encryptor_context(&self) -> PageEncryptorContext {
844        PageEncryptorContext {}
845    }
846
847    /// No-op implementation of setting a [`PageEncryptor`] for when encryption is disabled
848    #[cfg(not(feature = "encryption"))]
849    fn set_page_writer_encryptor<'b>(
850        _column: &ColumnDescPtr,
851        _context: PageEncryptorContext,
852        page_writer: SerializedPageWriter<'b, W>,
853    ) -> Result<SerializedPageWriter<'b, W>> {
854        Ok(page_writer)
855    }
856
857    #[inline]
858    fn assert_previous_writer_closed(&self) -> Result<()> {
859        if self.column_index != self.column_chunks.len() {
860            Err(general_err!("Previous column writer was not closed"))
861        } else {
862            Ok(())
863        }
864    }
865}
866
867/// Context required to create a [`PageEncryptor`] for a column
868#[cfg(feature = "encryption")]
869struct PageEncryptorContext {
870    file_encryptor: Option<Arc<FileEncryptor>>,
871    row_group_index: usize,
872    column_index: usize,
873}
874
875#[cfg(not(feature = "encryption"))]
876struct PageEncryptorContext {}
877
878/// A wrapper around a [`ColumnWriter`] that invokes a callback on [`Self::close`]
879pub struct SerializedColumnWriter<'a> {
880    inner: ColumnWriter<'a>,
881    on_close: Option<OnCloseColumnChunk<'a>>,
882}
883
884impl<'a> SerializedColumnWriter<'a> {
885    /// Create a new [`SerializedColumnWriter`] from a [`ColumnWriter`] and an
886    /// optional callback to be invoked on [`Self::close`]
887    pub fn new(inner: ColumnWriter<'a>, on_close: Option<OnCloseColumnChunk<'a>>) -> Self {
888        Self { inner, on_close }
889    }
890
891    /// Returns a reference to an untyped [`ColumnWriter`]
892    pub fn untyped(&mut self) -> &mut ColumnWriter<'a> {
893        &mut self.inner
894    }
895
896    /// Returns a reference to a typed [`ColumnWriterImpl`]
897    pub fn typed<T: DataType>(&mut self) -> &mut ColumnWriterImpl<'a, T> {
898        get_typed_column_writer_mut(&mut self.inner)
899    }
900
901    /// Close this [`SerializedColumnWriter`]
902    pub fn close(mut self) -> Result<()> {
903        let r = self.inner.close()?;
904        if let Some(on_close) = self.on_close.take() {
905            on_close(r)?
906        }
907
908        Ok(())
909    }
910}
911
912/// A serialized implementation for Parquet [`PageWriter`].
/// Writes and serializes pages and metadata into the output stream.
914///
915/// `SerializedPageWriter` should not be used after calling `close()`.
916pub struct SerializedPageWriter<'a, W: Write> {
917    sink: &'a mut TrackedWrite<W>,
918    #[cfg(feature = "encryption")]
919    page_encryptor: Option<PageEncryptor>,
920}
921
922impl<'a, W: Write> SerializedPageWriter<'a, W> {
923    /// Creates new page writer.
924    pub fn new(sink: &'a mut TrackedWrite<W>) -> Self {
925        Self {
926            sink,
927            #[cfg(feature = "encryption")]
928            page_encryptor: None,
929        }
930    }
931
932    /// Serializes page header into Thrift.
933    /// Returns number of bytes that have been written into the sink.
934    #[inline]
935    fn serialize_page_header(&mut self, header: PageHeader) -> Result<usize> {
936        let start_pos = self.sink.bytes_written();
937        match self.page_encryptor_and_sink_mut() {
938            Some((page_encryptor, sink)) => {
939                page_encryptor.encrypt_page_header(&header, sink)?;
940            }
941            None => {
942                let mut protocol = ThriftCompactOutputProtocol::new(&mut self.sink);
943                header.write_thrift(&mut protocol)?;
944            }
945        }
946        Ok(self.sink.bytes_written() - start_pos)
947    }
948}
949
950#[cfg(feature = "encryption")]
951impl<'a, W: Write> SerializedPageWriter<'a, W> {
952    /// Set the encryptor to use to encrypt page data
953    fn with_page_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
954        self.page_encryptor = page_encryptor;
955        self
956    }
957
958    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
959        self.page_encryptor.as_mut()
960    }
961
962    fn page_encryptor_and_sink_mut(
963        &mut self,
964    ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
965        self.page_encryptor.as_mut().map(|pe| (pe, &mut self.sink))
966    }
967}
968
969#[cfg(not(feature = "encryption"))]
970impl<'a, W: Write> SerializedPageWriter<'a, W> {
971    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
972        None
973    }
974
975    fn page_encryptor_and_sink_mut(
976        &mut self,
977    ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
978        None
979    }
980}
981
982impl<W: Write + Send> PageWriter for SerializedPageWriter<'_, W> {
983    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
984        let page = match self.page_encryptor_mut() {
985            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
986            None => page,
987        };
988
989        let page_type = page.page_type();
990        let start_pos = self.sink.bytes_written() as u64;
991
992        let page_header = page.to_thrift_header()?;
993        let header_size = self.serialize_page_header(page_header)?;
994
995        self.sink.write_all(page.data())?;
996
997        let mut spec = PageWriteSpec::new();
998        spec.page_type = page_type;
999        spec.uncompressed_size = page.uncompressed_size() + header_size;
1000        spec.compressed_size = page.compressed_size() + header_size;
1001        spec.offset = start_pos;
1002        spec.bytes_written = self.sink.bytes_written() as u64 - start_pos;
1003        spec.num_values = page.num_values();
1004
1005        if let Some(page_encryptor) = self.page_encryptor_mut() {
1006            if page.compressed_page().is_data_page() {
1007                page_encryptor.increment_page();
1008            }
1009        }
1010        Ok(spec)
1011    }
1012
1013    fn close(&mut self) -> Result<()> {
1014        self.sink.flush()?;
1015        Ok(())
1016    }
1017}
1018
1019/// Get the magic bytes at the start and end of the file that identify this
1020/// as a Parquet file.
1021#[cfg(feature = "encryption")]
1022pub(crate) fn get_file_magic(
1023    file_encryption_properties: Option<&Arc<FileEncryptionProperties>>,
1024) -> &'static [u8; 4] {
1025    match file_encryption_properties.as_ref() {
1026        Some(encryption_properties) if encryption_properties.encrypt_footer() => {
1027            &PARQUET_MAGIC_ENCR_FOOTER
1028        }
1029        _ => &PARQUET_MAGIC,
1030    }
1031}
1032
1033#[cfg(not(feature = "encryption"))]
1034pub(crate) fn get_file_magic() -> &'static [u8; 4] {
1035    &PARQUET_MAGIC
1036}
1037
1038#[cfg(test)]
1039mod tests {
1040    use super::*;
1041
1042    #[cfg(feature = "arrow")]
1043    use arrow_array::RecordBatchReader;
1044    use bytes::Bytes;
1045    use std::fs::File;
1046
1047    #[cfg(feature = "arrow")]
1048    use crate::arrow::ArrowWriter;
1049    #[cfg(feature = "arrow")]
1050    use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
1051    use crate::basic::{
1052        ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, Repetition, SortOrder, Type,
1053    };
1054    use crate::column::page::{Page, PageReader};
1055    use crate::column::reader::get_typed_column_reader;
1056    use crate::compression::{Codec, CodecOptionsBuilder, create_codec};
1057    use crate::data_type::{BoolType, ByteArrayType, Int32Type};
1058    use crate::file::page_index::column_index::ColumnIndexMetaData;
1059    use crate::file::properties::EnabledStatistics;
1060    use crate::file::serialized_reader::ReadOptionsBuilder;
1061    use crate::file::statistics::{from_thrift_page_stats, page_stats_to_thrift};
1062    use crate::file::{
1063        properties::{ReaderProperties, WriterProperties, WriterVersion},
1064        reader::{FileReader, SerializedFileReader, SerializedPageReader},
1065        statistics::Statistics,
1066    };
1067    use crate::record::{Row, RowAccessor};
1068    use crate::schema::parser::parse_message_type;
1069    use crate::schema::types;
1070    use crate::schema::types::{ColumnDescriptor, ColumnPath};
1071    use crate::util::test_common::file_util::get_test_file;
1072    use crate::util::test_common::rand_gen::RandGen;
1073
1074    #[test]
1075    fn test_row_group_writer_error_not_all_columns_written() {
1076        let file = tempfile::tempfile().unwrap();
1077        let schema = Arc::new(
1078            types::Type::group_type_builder("schema")
1079                .with_fields(vec![Arc::new(
1080                    types::Type::primitive_type_builder("col1", Type::INT32)
1081                        .build()
1082                        .unwrap(),
1083                )])
1084                .build()
1085                .unwrap(),
1086        );
1087        let props = Default::default();
1088        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
1089        let row_group_writer = writer.next_row_group().unwrap();
1090        let res = row_group_writer.close();
1091        assert!(res.is_err());
1092        if let Err(err) = res {
1093            assert_eq!(
1094                format!("{err}"),
1095                "Parquet error: Column length mismatch: 1 != 0"
1096            );
1097        }
1098    }
1099
1100    #[test]
1101    fn test_row_group_writer_num_records_mismatch() {
1102        let file = tempfile::tempfile().unwrap();
1103        let schema = Arc::new(
1104            types::Type::group_type_builder("schema")
1105                .with_fields(vec![
1106                    Arc::new(
1107                        types::Type::primitive_type_builder("col1", Type::INT32)
1108                            .with_repetition(Repetition::REQUIRED)
1109                            .build()
1110                            .unwrap(),
1111                    ),
1112                    Arc::new(
1113                        types::Type::primitive_type_builder("col2", Type::INT32)
1114                            .with_repetition(Repetition::REQUIRED)
1115                            .build()
1116                            .unwrap(),
1117                    ),
1118                ])
1119                .build()
1120                .unwrap(),
1121        );
1122        let props = Default::default();
1123        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
1124        let mut row_group_writer = writer.next_row_group().unwrap();
1125
1126        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
1127        col_writer
1128            .typed::<Int32Type>()
1129            .write_batch(&[1, 2, 3], None, None)
1130            .unwrap();
1131        col_writer.close().unwrap();
1132
1133        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
1134        col_writer
1135            .typed::<Int32Type>()
1136            .write_batch(&[1, 2], None, None)
1137            .unwrap();
1138
1139        let err = col_writer.close().unwrap_err();
1140        assert_eq!(
1141            err.to_string(),
1142            "Parquet error: Incorrect number of rows, expected 3 != 2 rows"
1143        );
1144    }
1145
1146    #[test]
1147    fn test_file_writer_empty_file() {
1148        let file = tempfile::tempfile().unwrap();
1149
1150        let schema = Arc::new(
1151            types::Type::group_type_builder("schema")
1152                .with_fields(vec![Arc::new(
1153                    types::Type::primitive_type_builder("col1", Type::INT32)
1154                        .build()
1155                        .unwrap(),
1156                )])
1157                .build()
1158                .unwrap(),
1159        );
1160        let props = Default::default();
1161        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1162        writer.close().unwrap();
1163
1164        let reader = SerializedFileReader::new(file).unwrap();
1165        assert_eq!(reader.get_row_iter(None).unwrap().count(), 0);
1166    }
1167
1168    #[test]
1169    fn test_file_writer_column_orders_populated() {
1170        let file = tempfile::tempfile().unwrap();
1171
1172        let schema = Arc::new(
1173            types::Type::group_type_builder("schema")
1174                .with_fields(vec![
1175                    Arc::new(
1176                        types::Type::primitive_type_builder("col1", Type::INT32)
1177                            .build()
1178                            .unwrap(),
1179                    ),
1180                    Arc::new(
1181                        types::Type::primitive_type_builder("col2", Type::FIXED_LEN_BYTE_ARRAY)
1182                            .with_converted_type(ConvertedType::INTERVAL)
1183                            .with_length(12)
1184                            .build()
1185                            .unwrap(),
1186                    ),
1187                    Arc::new(
1188                        types::Type::group_type_builder("nested")
1189                            .with_repetition(Repetition::REQUIRED)
1190                            .with_fields(vec![
1191                                Arc::new(
1192                                    types::Type::primitive_type_builder(
1193                                        "col3",
1194                                        Type::FIXED_LEN_BYTE_ARRAY,
1195                                    )
1196                                    .with_logical_type(Some(LogicalType::Float16))
1197                                    .with_length(2)
1198                                    .build()
1199                                    .unwrap(),
1200                                ),
1201                                Arc::new(
1202                                    types::Type::primitive_type_builder("col4", Type::BYTE_ARRAY)
1203                                        .with_logical_type(Some(LogicalType::String))
1204                                        .build()
1205                                        .unwrap(),
1206                                ),
1207                            ])
1208                            .build()
1209                            .unwrap(),
1210                    ),
1211                ])
1212                .build()
1213                .unwrap(),
1214        );
1215
1216        let props = Default::default();
1217        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1218        writer.close().unwrap();
1219
1220        let reader = SerializedFileReader::new(file).unwrap();
1221
1222        // only leaves
1223        let expected = vec![
1224            // INT32
1225            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
1226            // INTERVAL
1227            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED),
1228            // Float16
1229            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
1230            // String
1231            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED),
1232        ];
1233        let actual = reader.metadata().file_metadata().column_orders();
1234
1235        assert!(actual.is_some());
1236        let actual = actual.unwrap();
1237        assert_eq!(*actual, expected);
1238    }
1239
1240    #[test]
1241    fn test_file_writer_with_metadata() {
1242        let file = tempfile::tempfile().unwrap();
1243
1244        let schema = Arc::new(
1245            types::Type::group_type_builder("schema")
1246                .with_fields(vec![Arc::new(
1247                    types::Type::primitive_type_builder("col1", Type::INT32)
1248                        .build()
1249                        .unwrap(),
1250                )])
1251                .build()
1252                .unwrap(),
1253        );
1254        let props = Arc::new(
1255            WriterProperties::builder()
1256                .set_key_value_metadata(Some(vec![KeyValue::new(
1257                    "key".to_string(),
1258                    "value".to_string(),
1259                )]))
1260                .build(),
1261        );
1262        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1263        writer.close().unwrap();
1264
1265        let reader = SerializedFileReader::new(file).unwrap();
1266        assert_eq!(
1267            reader
1268                .metadata()
1269                .file_metadata()
1270                .key_value_metadata()
1271                .to_owned()
1272                .unwrap()
1273                .len(),
1274            1
1275        );
1276    }
1277
1278    #[test]
1279    fn test_file_writer_v2_with_metadata() {
1280        let file = tempfile::tempfile().unwrap();
1281        let field_logical_type = Some(LogicalType::Integer {
1282            bit_width: 8,
1283            is_signed: false,
1284        });
1285        let field = Arc::new(
1286            types::Type::primitive_type_builder("col1", Type::INT32)
1287                .with_logical_type(field_logical_type.clone())
1288                .with_converted_type(field_logical_type.into())
1289                .build()
1290                .unwrap(),
1291        );
1292        let schema = Arc::new(
1293            types::Type::group_type_builder("schema")
1294                .with_fields(vec![field.clone()])
1295                .build()
1296                .unwrap(),
1297        );
1298        let props = Arc::new(
1299            WriterProperties::builder()
1300                .set_key_value_metadata(Some(vec![KeyValue::new(
1301                    "key".to_string(),
1302                    "value".to_string(),
1303                )]))
1304                .set_writer_version(WriterVersion::PARQUET_2_0)
1305                .build(),
1306        );
1307        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1308        writer.close().unwrap();
1309
1310        let reader = SerializedFileReader::new(file).unwrap();
1311
1312        assert_eq!(
1313            reader
1314                .metadata()
1315                .file_metadata()
1316                .key_value_metadata()
1317                .to_owned()
1318                .unwrap()
1319                .len(),
1320            1
1321        );
1322
1323        // ARROW-11803: Test that the converted and logical types have been populated
1324        let fields = reader.metadata().file_metadata().schema().get_fields();
1325        assert_eq!(fields.len(), 1);
1326        assert_eq!(fields[0], field);
1327    }
1328
1329    #[test]
1330    fn test_file_writer_with_sorting_columns_metadata() {
1331        let file = tempfile::tempfile().unwrap();
1332
1333        let schema = Arc::new(
1334            types::Type::group_type_builder("schema")
1335                .with_fields(vec![
1336                    Arc::new(
1337                        types::Type::primitive_type_builder("col1", Type::INT32)
1338                            .build()
1339                            .unwrap(),
1340                    ),
1341                    Arc::new(
1342                        types::Type::primitive_type_builder("col2", Type::INT32)
1343                            .build()
1344                            .unwrap(),
1345                    ),
1346                ])
1347                .build()
1348                .unwrap(),
1349        );
1350        let expected_result = Some(vec![SortingColumn {
1351            column_idx: 0,
1352            descending: false,
1353            nulls_first: true,
1354        }]);
1355        let props = Arc::new(
1356            WriterProperties::builder()
1357                .set_key_value_metadata(Some(vec![KeyValue::new(
1358                    "key".to_string(),
1359                    "value".to_string(),
1360                )]))
1361                .set_sorting_columns(expected_result.clone())
1362                .build(),
1363        );
1364        let mut writer =
1365            SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1366        let mut row_group_writer = writer.next_row_group().expect("get row group writer");
1367
1368        let col_writer = row_group_writer.next_column().unwrap().unwrap();
1369        col_writer.close().unwrap();
1370
1371        let col_writer = row_group_writer.next_column().unwrap().unwrap();
1372        col_writer.close().unwrap();
1373
1374        row_group_writer.close().unwrap();
1375        writer.close().unwrap();
1376
1377        let reader = SerializedFileReader::new(file).unwrap();
1378        let result: Vec<Option<&Vec<SortingColumn>>> = reader
1379            .metadata()
1380            .row_groups()
1381            .iter()
1382            .map(|f| f.sorting_columns())
1383            .collect();
1384        // validate the sorting column read match the one written above
1385        assert_eq!(expected_result.as_ref(), result[0]);
1386    }
1387
1388    #[test]
1389    fn test_file_writer_empty_row_groups() {
1390        let file = tempfile::tempfile().unwrap();
1391        test_file_roundtrip(file, vec![]);
1392    }
1393
1394    #[test]
1395    fn test_file_writer_single_row_group() {
1396        let file = tempfile::tempfile().unwrap();
1397        test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
1398    }
1399
1400    #[test]
1401    fn test_file_writer_multiple_row_groups() {
1402        let file = tempfile::tempfile().unwrap();
1403        test_file_roundtrip(
1404            file,
1405            vec![
1406                vec![1, 2, 3, 4, 5],
1407                vec![1, 2, 3],
1408                vec![1],
1409                vec![1, 2, 3, 4, 5, 6],
1410            ],
1411        );
1412    }
1413
1414    #[test]
1415    fn test_file_writer_multiple_large_row_groups() {
1416        let file = tempfile::tempfile().unwrap();
1417        test_file_roundtrip(
1418            file,
1419            vec![vec![123; 1024], vec![124; 1000], vec![125; 15], vec![]],
1420        );
1421    }
1422
1423    #[test]
1424    fn test_page_writer_data_pages() {
1425        let pages = [
1426            Page::DataPage {
1427                buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
1428                num_values: 10,
1429                encoding: Encoding::DELTA_BINARY_PACKED,
1430                def_level_encoding: Encoding::RLE,
1431                rep_level_encoding: Encoding::RLE,
1432                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1433            },
1434            Page::DataPageV2 {
1435                buf: Bytes::from(vec![4; 128]),
1436                num_values: 10,
1437                encoding: Encoding::DELTA_BINARY_PACKED,
1438                num_nulls: 2,
1439                num_rows: 12,
1440                def_levels_byte_len: 24,
1441                rep_levels_byte_len: 32,
1442                is_compressed: false,
1443                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1444            },
1445        ];
1446
1447        test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
1448        test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
1449    }
1450
1451    #[test]
1452    fn test_page_writer_dict_pages() {
1453        let pages = [
1454            Page::DictionaryPage {
1455                buf: Bytes::from(vec![1, 2, 3, 4, 5]),
1456                num_values: 5,
1457                encoding: Encoding::RLE_DICTIONARY,
1458                is_sorted: false,
1459            },
1460            Page::DataPage {
1461                buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
1462                num_values: 10,
1463                encoding: Encoding::DELTA_BINARY_PACKED,
1464                def_level_encoding: Encoding::RLE,
1465                rep_level_encoding: Encoding::RLE,
1466                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1467            },
1468            Page::DataPageV2 {
1469                buf: Bytes::from(vec![4; 128]),
1470                num_values: 10,
1471                encoding: Encoding::DELTA_BINARY_PACKED,
1472                num_nulls: 2,
1473                num_rows: 12,
1474                def_levels_byte_len: 24,
1475                rep_levels_byte_len: 32,
1476                is_compressed: false,
1477                statistics: None,
1478            },
1479        ];
1480
1481        test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
1482        test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
1483    }
1484
1485    /// Tests a write/read roundtrip of the given pages.
1486    /// The physical type is used only for statistics and must match the type of any
1487    /// statistics defined in the pages.
1488    fn test_page_roundtrip(pages: &[Page], codec: Compression, physical_type: Type) {
1489        let mut compressed_pages = vec![];
1490        let mut total_num_values = 0i64;
1491        let codec_options = CodecOptionsBuilder::default()
1492            .set_backward_compatible_lz4(false)
1493            .build();
1494        let mut compressor = create_codec(codec, &codec_options).unwrap();
1495
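        // Rebuild each input page with its buffer compressed; statistics are round-tripped
        // through their thrift form so they compare equal to what the reader later produces.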
1496        for page in pages {
1497            let uncompressed_len = page.buffer().len();
1498
1499            let compressed_page = match *page {
1500                Page::DataPage {
1501                    ref buf,
1502                    num_values,
1503                    encoding,
1504                    def_level_encoding,
1505                    rep_level_encoding,
1506                    ref statistics,
1507                } => {
1508                    total_num_values += num_values as i64;
1509                    let output_buf = compress_helper(compressor.as_mut(), buf);
1510
1511                    Page::DataPage {
1512                        buf: Bytes::from(output_buf),
1513                        num_values,
1514                        encoding,
1515                        def_level_encoding,
1516                        rep_level_encoding,
1517                        statistics: from_thrift_page_stats(
1518                            physical_type,
1519                            page_stats_to_thrift(statistics.as_ref()),
1520                        )
1521                        .unwrap(),
1522                    }
1523                }
1524                Page::DataPageV2 {
1525                    ref buf,
1526                    num_values,
1527                    encoding,
1528                    num_nulls,
1529                    num_rows,
1530                    def_levels_byte_len,
1531                    rep_levels_byte_len,
1532                    ref statistics,
1533                    ..
1534                } => {
1535                    total_num_values += num_values as i64;
1536                    let offset = (def_levels_byte_len + rep_levels_byte_len) as usize;
1537                    let cmp_buf = compress_helper(compressor.as_mut(), &buf[offset..]);
1538                    let mut output_buf = Vec::from(&buf[..offset]);
1539                    output_buf.extend_from_slice(&cmp_buf[..]);
1540
1541                    Page::DataPageV2 {
1542                        buf: Bytes::from(output_buf),
1543                        num_values,
1544                        encoding,
1545                        num_nulls,
1546                        num_rows,
1547                        def_levels_byte_len,
1548                        rep_levels_byte_len,
1549                        is_compressed: compressor.is_some(),
1550                        statistics: from_thrift_page_stats(
1551                            physical_type,
1552                            page_stats_to_thrift(statistics.as_ref()),
1553                        )
1554                        .unwrap(),
1555                    }
1556                }
1557                Page::DictionaryPage {
1558                    ref buf,
1559                    num_values,
1560                    encoding,
1561                    is_sorted,
1562                } => {
1563                    let output_buf = compress_helper(compressor.as_mut(), buf);
1564
1565                    Page::DictionaryPage {
1566                        buf: Bytes::from(output_buf),
1567                        num_values,
1568                        encoding,
1569                        is_sorted,
1570                    }
1571                }
1572            };
1573
1574            let compressed_page = CompressedPage::new(compressed_page, uncompressed_len);
1575            compressed_pages.push(compressed_page);
1576        }
1577
1578        let mut buffer: Vec<u8> = vec![];
1579        let mut result_pages: Vec<Page> = vec![];
1580        {
1581            let mut writer = TrackedWrite::new(&mut buffer);
1582            let mut page_writer = SerializedPageWriter::new(&mut writer);
1583
1584            for page in compressed_pages {
1585                page_writer.write_page(page).unwrap();
1586            }
1587            page_writer.close().unwrap();
1588        }
1589        {
1590            let reader = bytes::Bytes::from(buffer);
1591
1592            let t = types::Type::primitive_type_builder("t", physical_type)
1593                .build()
1594                .unwrap();
1595
1596            let desc = ColumnDescriptor::new(Arc::new(t), 0, 0, ColumnPath::new(vec![]));
1597            let meta = ColumnChunkMetaData::builder(Arc::new(desc))
1598                .set_compression(codec)
1599                .set_total_compressed_size(reader.len() as i64)
1600                .set_num_values(total_num_values)
1601                .build()
1602                .unwrap();
1603
1604            let props = ReaderProperties::builder()
1605                .set_backward_compatible_lz4(false)
1606                .set_read_page_statistics(true)
1607                .build();
1608            let mut page_reader = SerializedPageReader::new_with_properties(
1609                Arc::new(reader),
1610                &meta,
1611                total_num_values as usize,
1612                None,
1613                Arc::new(props),
1614            )
1615            .unwrap();
1616
1617            while let Some(page) = page_reader.get_next_page().unwrap() {
1618                result_pages.push(page);
1619            }
1620        }
1621
1622        assert_eq!(result_pages.len(), pages.len());
1623        for i in 0..result_pages.len() {
1624            assert_page(&result_pages[i], &pages[i]);
1625        }
1626    }
1627
1628    /// Compresses a slice with the given codec, or copies it unchanged when no codec is provided
1629    fn compress_helper(compressor: Option<&mut Box<dyn Codec>>, data: &[u8]) -> Vec<u8> {
1630        let mut output_buf = vec![];
1631        if let Some(cmpr) = compressor {
1632            cmpr.compress(data, &mut output_buf).unwrap();
1633        } else {
1634            output_buf.extend_from_slice(data);
1635        }
1636        output_buf
1637    }
1638
1639    /// Asserts that two pages match.
1640    fn assert_page(left: &Page, right: &Page) {
1641        assert_eq!(left.page_type(), right.page_type());
1642        assert_eq!(&left.buffer(), &right.buffer());
1643        assert_eq!(left.num_values(), right.num_values());
1644        assert_eq!(left.encoding(), right.encoding());
1645        assert_eq!(
1646            page_stats_to_thrift(left.statistics()),
1647            page_stats_to_thrift(right.statistics())
1648        );
1649    }
1650
1651    /// Tests roundtrip of i32 data written using `W` and read using `R`
1652    fn test_roundtrip_i32<W, R>(
1653        file: W,
1654        data: Vec<Vec<i32>>,
1655        compression: Compression,
1656    ) -> ParquetMetaData
1657    where
1658        W: Write + Send,
1659        R: ChunkReader + From<W> + 'static,
1660    {
1661        test_roundtrip::<W, R, Int32Type, _>(file, data, |r| r.get_int(0).unwrap(), compression)
1662    }
1663
1664    /// Tests roundtrip of data of type `D` written using `W` and read using `R`,
1665    /// extracting each read value with the provided `value` function
1666    fn test_roundtrip<W, R, D, F>(
1667        mut file: W,
1668        data: Vec<Vec<D::T>>,
1669        value: F,
1670        compression: Compression,
1671    ) -> ParquetMetaData
1672    where
1673        W: Write + Send,
1674        R: ChunkReader + From<W> + 'static,
1675        D: DataType,
1676        F: Fn(Row) -> D::T,
1677    {
1678        let schema = Arc::new(
1679            types::Type::group_type_builder("schema")
1680                .with_fields(vec![Arc::new(
1681                    types::Type::primitive_type_builder("col1", D::get_physical_type())
1682                        .with_repetition(Repetition::REQUIRED)
1683                        .build()
1684                        .unwrap(),
1685                )])
1686                .build()
1687                .unwrap(),
1688        );
1689        let props = Arc::new(
1690            WriterProperties::builder()
1691                .set_compression(compression)
1692                .build(),
1693        );
1694        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
1695        let mut rows: i64 = 0;
1696
1697        for (idx, subset) in data.iter().enumerate() {
1698            let row_group_file_offset = file_writer.buf.bytes_written();
1699            let mut row_group_writer = file_writer.next_row_group().unwrap();
1700            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
1701                rows += writer
1702                    .typed::<D>()
1703                    .write_batch(&subset[..], None, None)
1704                    .unwrap() as i64;
1705                writer.close().unwrap();
1706            }
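            // After closing, the row group should show up in `flushed_row_groups` with the
            // expected ordinal and the file offset captured before the group was started.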
1707            let last_group = row_group_writer.close().unwrap();
1708            let flushed = file_writer.flushed_row_groups();
1709            assert_eq!(flushed.len(), idx + 1);
1710            assert_eq!(Some(idx as i16), last_group.ordinal());
1711            assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset());
1712            assert_eq!(&flushed[idx], last_group.as_ref());
1713        }
1714        let file_metadata = file_writer.close().unwrap();
1715
1716        let reader = SerializedFileReader::new(R::from(file)).unwrap();
1717        assert_eq!(reader.num_row_groups(), data.len());
1718        assert_eq!(
1719            reader.metadata().file_metadata().num_rows(),
1720            rows,
1721            "row count in metadata not equal to number of rows written"
1722        );
1723        for (i, item) in data.iter().enumerate().take(reader.num_row_groups()) {
1724            let row_group_reader = reader.get_row_group(i).unwrap();
1725            let iter = row_group_reader.get_row_iter(None).unwrap();
1726            let res: Vec<_> = iter.map(|row| row.unwrap()).map(&value).collect();
1727            let row_group_size = row_group_reader.metadata().total_byte_size();
1728            let uncompressed_size: i64 = row_group_reader
1729                .metadata()
1730                .columns()
1731                .iter()
1732                .map(|v| v.uncompressed_size())
1733                .sum();
1734            assert_eq!(row_group_size, uncompressed_size);
1735            assert_eq!(res, *item);
1736        }
1737        file_metadata
1738    }
1739
1740    /// File write-read roundtrip.
1741    /// `data` contains one vector of values per row group.
1742    fn test_file_roundtrip(file: File, data: Vec<Vec<i32>>) -> ParquetMetaData {
1743        test_roundtrip_i32::<File, File>(file, data, Compression::UNCOMPRESSED)
1744    }
1745
1746    #[test]
1747    fn test_bytes_writer_empty_row_groups() {
1748        test_bytes_roundtrip(vec![], Compression::UNCOMPRESSED);
1749    }
1750
1751    #[test]
1752    fn test_bytes_writer_single_row_group() {
1753        test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::UNCOMPRESSED);
1754    }
1755
1756    #[test]
1757    fn test_bytes_writer_multiple_row_groups() {
1758        test_bytes_roundtrip(
1759            vec![
1760                vec![1, 2, 3, 4, 5],
1761                vec![1, 2, 3],
1762                vec![1],
1763                vec![1, 2, 3, 4, 5, 6],
1764            ],
1765            Compression::UNCOMPRESSED,
1766        );
1767    }
1768
1769    #[test]
1770    fn test_bytes_writer_single_row_group_compressed() {
1771        test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::SNAPPY);
1772    }
1773
1774    #[test]
1775    fn test_bytes_writer_multiple_row_groups_compressed() {
1776        test_bytes_roundtrip(
1777            vec![
1778                vec![1, 2, 3, 4, 5],
1779                vec![1, 2, 3],
1780                vec![1],
1781                vec![1, 2, 3, 4, 5, 6],
1782            ],
1783            Compression::SNAPPY,
1784        );
1785    }
1786
1787    fn test_bytes_roundtrip(data: Vec<Vec<i32>>, compression: Compression) {
1788        test_roundtrip_i32::<Vec<u8>, Bytes>(Vec::with_capacity(1024), data, compression);
1789    }
1790
1791    #[test]
1792    fn test_boolean_roundtrip() {
1793        let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
1794        test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
1795            Vec::with_capacity(1024),
1796            vec![my_bool_values],
1797            |r| r.get_bool(0).unwrap(),
1798            Compression::UNCOMPRESSED,
1799        );
1800    }
1801
1802    #[test]
1803    fn test_boolean_compressed_roundtrip() {
1804        let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
1805        test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
1806            Vec::with_capacity(1024),
1807            vec![my_bool_values],
1808            |r| r.get_bool(0).unwrap(),
1809            Compression::SNAPPY,
1810        );
1811    }
1812
1813    #[test]
1814    fn test_column_offset_index_file() {
1815        let file = tempfile::tempfile().unwrap();
1816        let file_metadata = test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
1817        file_metadata.row_groups().iter().for_each(|row_group| {
1818            row_group.columns().iter().for_each(|column_chunk| {
1819                assert!(column_chunk.column_index_offset().is_some());
1820                assert!(column_chunk.column_index_length().is_some());
1821                assert!(column_chunk.offset_index_offset().is_some());
1822                assert!(column_chunk.offset_index_length().is_some());
1823            })
1824        });
1825    }
1826
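    // Writes a file whose properties carry `initial_kv` key/value metadata, optionally appends
    // `final_kv` entries via `append_key_value_metadata`, and verifies the reader sees the
    // initial entries followed by the appended ones.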
1827    fn test_kv_metadata(initial_kv: Option<Vec<KeyValue>>, final_kv: Option<Vec<KeyValue>>) {
1828        let schema = Arc::new(
1829            types::Type::group_type_builder("schema")
1830                .with_fields(vec![Arc::new(
1831                    types::Type::primitive_type_builder("col1", Type::INT32)
1832                        .with_repetition(Repetition::REQUIRED)
1833                        .build()
1834                        .unwrap(),
1835                )])
1836                .build()
1837                .unwrap(),
1838        );
1839        let mut out = Vec::with_capacity(1024);
1840        let props = Arc::new(
1841            WriterProperties::builder()
1842                .set_key_value_metadata(initial_kv.clone())
1843                .build(),
1844        );
1845        let mut writer = SerializedFileWriter::new(&mut out, schema, props).unwrap();
1846        let mut row_group_writer = writer.next_row_group().unwrap();
1847        let column = row_group_writer.next_column().unwrap().unwrap();
1848        column.close().unwrap();
1849        row_group_writer.close().unwrap();
1850        if let Some(kvs) = &final_kv {
1851            for kv in kvs {
1852                writer.append_key_value_metadata(kv.clone())
1853            }
1854        }
1855        writer.close().unwrap();
1856
1857        let reader = SerializedFileReader::new(Bytes::from(out)).unwrap();
1858        let metadata = reader.metadata().file_metadata();
1859        let keys = metadata.key_value_metadata();
1860
1861        match (initial_kv, final_kv) {
1862            (Some(a), Some(b)) => {
1863                let keys = keys.unwrap();
1864                assert_eq!(keys.len(), a.len() + b.len());
1865                assert_eq!(&keys[..a.len()], a.as_slice());
1866                assert_eq!(&keys[a.len()..], b.as_slice());
1867            }
1868            (Some(v), None) => assert_eq!(keys.unwrap(), &v),
1869            (None, Some(v)) if !v.is_empty() => assert_eq!(keys.unwrap(), &v),
1870            _ => assert!(keys.is_none()),
1871        }
1872    }
1873
1874    #[test]
1875    fn test_append_metadata() {
1876        let kv1 = KeyValue::new("cupcakes".to_string(), "awesome".to_string());
1877        let kv2 = KeyValue::new("bingo".to_string(), "bongo".to_string());
1878
1879        test_kv_metadata(None, None);
1880        test_kv_metadata(Some(vec![kv1.clone()]), None);
1881        test_kv_metadata(None, Some(vec![kv2.clone()]));
1882        test_kv_metadata(Some(vec![kv1.clone()]), Some(vec![kv2.clone()]));
1883        test_kv_metadata(Some(vec![]), Some(vec![kv2]));
1884        test_kv_metadata(Some(vec![]), Some(vec![]));
1885        test_kv_metadata(Some(vec![kv1]), Some(vec![]));
1886        test_kv_metadata(None, Some(vec![]));
1887    }
1888
1889    #[test]
1890    fn test_backwards_compatible_statistics() {
1891        let message_type = "
1892            message test_schema {
1893                REQUIRED INT32 decimal1 (DECIMAL(8,2));
1894                REQUIRED INT32 i32 (INTEGER(32,true));
1895                REQUIRED INT32 u32 (INTEGER(32,false));
1896            }
1897        ";
1898
1899        let schema = Arc::new(parse_message_type(message_type).unwrap());
1900        let props = Default::default();
1901        let mut writer = SerializedFileWriter::new(vec![], schema, props).unwrap();
1902        let mut row_group_writer = writer.next_row_group().unwrap();
1903
1904        for _ in 0..3 {
1905            let mut writer = row_group_writer.next_column().unwrap().unwrap();
1906            writer
1907                .typed::<Int32Type>()
1908                .write_batch(&[1, 2, 3], None, None)
1909                .unwrap();
1910            writer.close().unwrap();
1911        }
1912        let metadata = row_group_writer.close().unwrap();
1913        writer.close().unwrap();
1914
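        // The deprecated `min`/`max` fields should only be populated for the columns with a
        // signed sort order (the decimal and signed int columns here); the unsigned column is
        // expected to carry only `min_value`/`max_value`.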
1915        // decimal
1916        let s = page_stats_to_thrift(metadata.column(0).statistics()).unwrap();
1917        assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1918        assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1919        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1920        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1921
1922        // i32
1923        let s = page_stats_to_thrift(metadata.column(1).statistics()).unwrap();
1924        assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1925        assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1926        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1927        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1928
1929        // u32
1930        let s = page_stats_to_thrift(metadata.column(2).statistics()).unwrap();
1931        assert_eq!(s.min.as_deref(), None);
1932        assert_eq!(s.max.as_deref(), None);
1933        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1934        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1935    }
1936
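    // Demonstrates "spliced" writes: each column is encoded independently into its own
    // in-memory buffer via `SerializedPageWriter`, and the finished chunks are then stitched
    // into a row group with `append_column`, as they could be when encoding columns in parallel.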
1937    #[test]
1938    fn test_spliced_write() {
1939        let message_type = "
1940            message test_schema {
1941                REQUIRED INT32 i32 (INTEGER(32,true));
1942                REQUIRED INT32 u32 (INTEGER(32,false));
1943            }
1944        ";
1945        let schema = Arc::new(parse_message_type(message_type).unwrap());
1946        let props = Arc::new(WriterProperties::builder().build());
1947
1948        let mut file = Vec::with_capacity(1024);
1949        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props.clone()).unwrap();
1950
1951        let columns = file_writer.descr.columns();
1952        let mut column_state: Vec<(_, Option<ColumnCloseResult>)> = columns
1953            .iter()
1954            .map(|_| (TrackedWrite::new(Vec::with_capacity(1024)), None))
1955            .collect();
1956
1957        let mut column_state_slice = column_state.as_mut_slice();
1958        let mut column_writers = Vec::with_capacity(columns.len());
1959        for c in columns {
1960            let ((buf, out), tail) = column_state_slice.split_first_mut().unwrap();
1961            column_state_slice = tail;
1962
1963            let page_writer = Box::new(SerializedPageWriter::new(buf));
1964            let col_writer = get_column_writer(c.clone(), props.clone(), page_writer);
1965            column_writers.push(SerializedColumnWriter::new(
1966                col_writer,
1967                Some(Box::new(|on_close| {
1968                    *out = Some(on_close);
1969                    Ok(())
1970                })),
1971            ));
1972        }
1973
1974        let column_data = [[1, 2, 3, 4], [7, 3, 7, 3]];
1975
1976        // Interleaved writing to the column writers
1977        for (writer, batch) in column_writers.iter_mut().zip(column_data) {
1978            let writer = writer.typed::<Int32Type>();
1979            writer.write_batch(&batch, None, None).unwrap();
1980        }
1981
1982        // Close the column writers
1983        for writer in column_writers {
1984            writer.close().unwrap()
1985        }
1986
1987        // Splice column data into a row group
1988        let mut row_group_writer = file_writer.next_row_group().unwrap();
1989        for (write, close) in column_state {
1990            let buf = Bytes::from(write.into_inner().unwrap());
1991            row_group_writer
1992                .append_column(&buf, close.unwrap())
1993                .unwrap();
1994        }
1995        row_group_writer.close().unwrap();
1996        file_writer.close().unwrap();
1997
1998        // Check data was written correctly
1999        let file = Bytes::from(file);
2000        let test_read = |reader: SerializedFileReader<Bytes>| {
2001            let row_group = reader.get_row_group(0).unwrap();
2002
2003            let mut out = Vec::with_capacity(4);
2004            let c1 = row_group.get_column_reader(0).unwrap();
2005            let mut c1 = get_typed_column_reader::<Int32Type>(c1);
2006            c1.read_records(4, None, None, &mut out).unwrap();
2007            assert_eq!(out, column_data[0]);
2008
2009            out.clear();
2010
2011            let c2 = row_group.get_column_reader(1).unwrap();
2012            let mut c2 = get_typed_column_reader::<Int32Type>(c2);
2013            c2.read_records(4, None, None, &mut out).unwrap();
2014            assert_eq!(out, column_data[1]);
2015        };
2016
2017        let reader = SerializedFileReader::new(file.clone()).unwrap();
2018        test_read(reader);
2019
2020        let options = ReadOptionsBuilder::new().with_page_index().build();
2021        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
2022        test_read(reader);
2023    }
2024
2025    #[test]
2026    fn test_disabled_statistics() {
2027        let message_type = "
2028            message test_schema {
2029                REQUIRED INT32 a;
2030                REQUIRED INT32 b;
2031            }
2032        ";
2033        let schema = Arc::new(parse_message_type(message_type).unwrap());
2034        let props = WriterProperties::builder()
2035            .set_statistics_enabled(EnabledStatistics::None)
2036            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
2037            .set_offset_index_disabled(true) // this should be ignored because of the line above
2038            .build();
2039        let mut file = Vec::with_capacity(1024);
2040        let mut file_writer =
2041            SerializedFileWriter::new(&mut file, schema, Arc::new(props)).unwrap();
2042
2043        let mut row_group_writer = file_writer.next_row_group().unwrap();
2044        let mut a_writer = row_group_writer.next_column().unwrap().unwrap();
2045        let col_writer = a_writer.typed::<Int32Type>();
2046        col_writer.write_batch(&[1, 2, 3], None, None).unwrap();
2047        a_writer.close().unwrap();
2048
2049        let mut b_writer = row_group_writer.next_column().unwrap().unwrap();
2050        let col_writer = b_writer.typed::<Int32Type>();
2051        col_writer.write_batch(&[4, 5, 6], None, None).unwrap();
2052        b_writer.close().unwrap();
2053        row_group_writer.close().unwrap();
2054
2055        let metadata = file_writer.finish().unwrap();
2056        assert_eq!(metadata.num_row_groups(), 1);
2057        let row_group = metadata.row_group(0);
2058        assert_eq!(row_group.num_columns(), 2);
2059        // Column "a" has both offset and column index, as requested
2060        assert!(row_group.column(0).offset_index_offset().is_some());
2061        assert!(row_group.column(0).column_index_offset().is_some());
2062        // Column "b" should only have an offset index
2063        assert!(row_group.column(1).offset_index_offset().is_some());
2064        assert!(row_group.column(1).column_index_offset().is_none());
2065
2066        let err = file_writer.next_row_group().err().unwrap().to_string();
2067        assert_eq!(err, "Parquet error: SerializedFileWriter already finished");
2068
2069        drop(file_writer);
2070
2071        let options = ReadOptionsBuilder::new().with_page_index().build();
2072        let reader = SerializedFileReader::new_with_options(Bytes::from(file), options).unwrap();
2073
2074        let offset_index = reader.metadata().offset_index().unwrap();
2075        assert_eq!(offset_index.len(), 1); // 1 row group
2076        assert_eq!(offset_index[0].len(), 2); // 2 columns
2077
2078        let column_index = reader.metadata().column_index().unwrap();
2079        assert_eq!(column_index.len(), 1); // 1 row group
2080        assert_eq!(column_index[0].len(), 2); // 2 columns
2081
2082        let a_idx = &column_index[0][0];
2083        assert!(matches!(a_idx, ColumnIndexMetaData::INT32(_)), "{a_idx:?}");
2084        let b_idx = &column_index[0][1];
2085        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
2086    }
2087
2088    #[test]
2089    fn test_byte_array_size_statistics() {
2090        let message_type = "
2091            message test_schema {
2092                OPTIONAL BYTE_ARRAY a (UTF8);
2093            }
2094        ";
2095        let schema = Arc::new(parse_message_type(message_type).unwrap());
2096        let data = ByteArrayType::gen_vec(32, 7);
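        // 10 rows: definition level 0 marks a null, 1 a present string, so the definition
        // level histogram checked below should record 3 nulls and 7 values.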
2097        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
2098        let unenc_size: i64 = data.iter().map(|x| x.len() as i64).sum();
2099        let file: File = tempfile::tempfile().unwrap();
2100        let props = Arc::new(
2101            WriterProperties::builder()
2102                .set_statistics_enabled(EnabledStatistics::Page)
2103                .build(),
2104        );
2105
2106        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
2107        let mut row_group_writer = writer.next_row_group().unwrap();
2108
2109        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
2110        col_writer
2111            .typed::<ByteArrayType>()
2112            .write_batch(&data, Some(&def_levels), None)
2113            .unwrap();
2114        col_writer.close().unwrap();
2115        row_group_writer.close().unwrap();
2116        let file_metadata = writer.close().unwrap();
2117
2118        assert_eq!(file_metadata.num_row_groups(), 1);
2119        assert_eq!(file_metadata.row_group(0).num_columns(), 1);
2120
2121        let check_def_hist = |def_hist: &[i64]| {
2122            assert_eq!(def_hist.len(), 2);
2123            assert_eq!(def_hist[0], 3);
2124            assert_eq!(def_hist[1], 7);
2125        };
2126
2127        let meta_data = file_metadata.row_group(0).column(0);
2128
2129        assert!(meta_data.repetition_level_histogram().is_none());
2130        assert!(meta_data.definition_level_histogram().is_some());
2131        assert!(meta_data.unencoded_byte_array_data_bytes().is_some());
2132        assert_eq!(
2133            unenc_size,
2134            meta_data.unencoded_byte_array_data_bytes().unwrap()
2135        );
2136        check_def_hist(meta_data.definition_level_histogram().unwrap().values());
2137
2138        // check that the read metadata is also correct
2139        let options = ReadOptionsBuilder::new().with_page_index().build();
2140        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
2141
2142        let rfile_metadata = reader.metadata().file_metadata();
2143        assert_eq!(
2144            rfile_metadata.num_rows(),
2145            file_metadata.file_metadata().num_rows()
2146        );
2147        assert_eq!(reader.num_row_groups(), 1);
2148        let rowgroup = reader.get_row_group(0).unwrap();
2149        assert_eq!(rowgroup.num_columns(), 1);
2150        let column = rowgroup.metadata().column(0);
2151        assert!(column.definition_level_histogram().is_some());
2152        assert!(column.repetition_level_histogram().is_none());
2153        assert!(column.unencoded_byte_array_data_bytes().is_some());
2154        check_def_hist(column.definition_level_histogram().unwrap().values());
2155        assert_eq!(
2156            unenc_size,
2157            column.unencoded_byte_array_data_bytes().unwrap()
2158        );
2159
2160        // check histogram in column index as well
2161        assert!(reader.metadata().column_index().is_some());
2162        let column_index = reader.metadata().column_index().unwrap();
2163        assert_eq!(column_index.len(), 1);
2164        assert_eq!(column_index[0].len(), 1);
2165        let col_idx = if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][0] {
2166            assert_eq!(index.num_pages(), 1);
2167            index
2168        } else {
2169            unreachable!()
2170        };
2171
2172        assert!(col_idx.repetition_level_histogram(0).is_none());
2173        assert!(col_idx.definition_level_histogram(0).is_some());
2174        check_def_hist(col_idx.definition_level_histogram(0).unwrap());
2175
2176        assert!(reader.metadata().offset_index().is_some());
2177        let offset_index = reader.metadata().offset_index().unwrap();
2178        assert_eq!(offset_index.len(), 1);
2179        assert_eq!(offset_index[0].len(), 1);
2180        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_some());
2181        let page_sizes = offset_index[0][0]
2182            .unencoded_byte_array_data_bytes
2183            .as_ref()
2184            .unwrap();
2185        assert_eq!(page_sizes.len(), 1);
2186        assert_eq!(page_sizes[0], unenc_size);
2187    }
2188
2189    #[test]
2190    fn test_too_many_rowgroups() {
2191        let message_type = "
2192            message test_schema {
2193                REQUIRED BYTE_ARRAY a (UTF8);
2194            }
2195        ";
2196        let schema = Arc::new(parse_message_type(message_type).unwrap());
2197        let file: File = tempfile::tempfile().unwrap();
2198        let props = Arc::new(
2199            WriterProperties::builder()
2200                .set_statistics_enabled(EnabledStatistics::None)
2201                .set_max_row_group_size(1)
2202                .build(),
2203        );
2204        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
2205
2206        // Attempt to create 32k+1 empty row groups; next_row_group() should error when i == 32768 (0x8000).
2207        for i in 0..0x8001 {
2208            match writer.next_row_group() {
2209                Ok(mut row_group_writer) => {
2210                    assert_ne!(i, 0x8000);
2211                    let col_writer = row_group_writer.next_column().unwrap().unwrap();
2212                    col_writer.close().unwrap();
2213                    row_group_writer.close().unwrap();
2214                }
2215                Err(e) => {
2216                    assert_eq!(i, 0x8000);
2217                    assert_eq!(
2218                        e.to_string(),
2219                        "Parquet error: Parquet does not support more than 32767 row groups per file (currently: 32768)"
2220                    );
2221                }
2222            }
2223        }
2224        writer.close().unwrap();
2225    }
2226
2227    #[test]
2228    fn test_size_statistics_with_repetition_and_nulls() {
2229        let message_type = "
2230            message test_schema {
2231                OPTIONAL group i32_list (LIST) {
2232                    REPEATED group list {
2233                        OPTIONAL INT32 element;
2234                    }
2235                }
2236            }
2237        ";
2238        // column is:
2239        // row 0: [1, 2]
2240        // row 1: NULL
2241        // row 2: [4, NULL]
2242        // row 3: []
2243        // row 4: [7, 8, 9, 10]
2244        let schema = Arc::new(parse_message_type(message_type).unwrap());
2245        let data = [1, 2, 4, 7, 8, 9, 10];
2246        let def_levels = [3, 3, 0, 3, 2, 1, 3, 3, 3, 3];
2247        let rep_levels = [0, 1, 0, 0, 1, 0, 0, 1, 1, 1];
2248        let file = tempfile::tempfile().unwrap();
2249        let props = Arc::new(
2250            WriterProperties::builder()
2251                .set_statistics_enabled(EnabledStatistics::Page)
2252                .build(),
2253        );
2254        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
2255        let mut row_group_writer = writer.next_row_group().unwrap();
2256
2257        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
2258        col_writer
2259            .typed::<Int32Type>()
2260            .write_batch(&data, Some(&def_levels), Some(&rep_levels))
2261            .unwrap();
2262        col_writer.close().unwrap();
2263        row_group_writer.close().unwrap();
2264        let file_metadata = writer.close().unwrap();
2265
2266        assert_eq!(file_metadata.num_row_groups(), 1);
2267        assert_eq!(file_metadata.row_group(0).num_columns(), 1);
2268
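        // From the levels above: definition levels count one null row (0), one empty list (1),
        // one null element (2) and seven non-null elements (3); repetition levels count five new
        // rows (0) and five continued lists (1). The histograms below should mirror those counts.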
2269        let check_def_hist = |def_hist: &[i64]| {
2270            assert_eq!(def_hist.len(), 4);
2271            assert_eq!(def_hist[0], 1);
2272            assert_eq!(def_hist[1], 1);
2273            assert_eq!(def_hist[2], 1);
2274            assert_eq!(def_hist[3], 7);
2275        };
2276
2277        let check_rep_hist = |rep_hist: &[i64]| {
2278            assert_eq!(rep_hist.len(), 2);
2279            assert_eq!(rep_hist[0], 5);
2280            assert_eq!(rep_hist[1], 5);
2281        };
2282
2283        // check that histograms are set properly in the write and read metadata
2284        // also check that unencoded_byte_array_data_bytes is not set
2285        let meta_data = file_metadata.row_group(0).column(0);
2286        assert!(meta_data.repetition_level_histogram().is_some());
2287        assert!(meta_data.definition_level_histogram().is_some());
2288        assert!(meta_data.unencoded_byte_array_data_bytes().is_none());
2289        check_def_hist(meta_data.definition_level_histogram().unwrap().values());
2290        check_rep_hist(meta_data.repetition_level_histogram().unwrap().values());
2291
2292        // check that the read metadata is also correct
2293        let options = ReadOptionsBuilder::new().with_page_index().build();
2294        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
2295
2296        let rfile_metadata = reader.metadata().file_metadata();
2297        assert_eq!(
2298            rfile_metadata.num_rows(),
2299            file_metadata.file_metadata().num_rows()
2300        );
2301        assert_eq!(reader.num_row_groups(), 1);
2302        let rowgroup = reader.get_row_group(0).unwrap();
2303        assert_eq!(rowgroup.num_columns(), 1);
2304        let column = rowgroup.metadata().column(0);
2305        assert!(column.definition_level_histogram().is_some());
2306        assert!(column.repetition_level_histogram().is_some());
2307        assert!(column.unencoded_byte_array_data_bytes().is_none());
2308        check_def_hist(column.definition_level_histogram().unwrap().values());
2309        check_rep_hist(column.repetition_level_histogram().unwrap().values());
2310
2311        // check histogram in column index as well
2312        assert!(reader.metadata().column_index().is_some());
2313        let column_index = reader.metadata().column_index().unwrap();
2314        assert_eq!(column_index.len(), 1);
2315        assert_eq!(column_index[0].len(), 1);
2316        let col_idx = if let ColumnIndexMetaData::INT32(index) = &column_index[0][0] {
2317            assert_eq!(index.num_pages(), 1);
2318            index
2319        } else {
2320            unreachable!()
2321        };
2322
2323        check_def_hist(col_idx.definition_level_histogram(0).unwrap());
2324        check_rep_hist(col_idx.repetition_level_histogram(0).unwrap());
2325
2326        assert!(reader.metadata().offset_index().is_some());
2327        let offset_index = reader.metadata().offset_index().unwrap();
2328        assert_eq!(offset_index.len(), 1);
2329        assert_eq!(offset_index[0].len(), 1);
2330        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_none());
2331    }
2332
2333    #[test]
2334    #[cfg(feature = "arrow")]
2335    fn test_byte_stream_split_extended_roundtrip() {
2336        let path = format!(
2337            "{}/byte_stream_split_extended.gzip.parquet",
2338            arrow::util::test_util::parquet_test_data(),
2339        );
2340        let file = File::open(path).unwrap();
2341
2342        // Read in the test file and rewrite it to a temp file
2343        let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)
2344            .expect("parquet open")
2345            .build()
2346            .expect("build record batch reader");
2347
2348        let file = tempfile::tempfile().unwrap();
2349        let props = WriterProperties::builder()
2350            .set_dictionary_enabled(false)
2351            .set_column_encoding(
2352                ColumnPath::from("float16_byte_stream_split"),
2353                Encoding::BYTE_STREAM_SPLIT,
2354            )
2355            .set_column_encoding(
2356                ColumnPath::from("float_byte_stream_split"),
2357                Encoding::BYTE_STREAM_SPLIT,
2358            )
2359            .set_column_encoding(
2360                ColumnPath::from("double_byte_stream_split"),
2361                Encoding::BYTE_STREAM_SPLIT,
2362            )
2363            .set_column_encoding(
2364                ColumnPath::from("int32_byte_stream_split"),
2365                Encoding::BYTE_STREAM_SPLIT,
2366            )
2367            .set_column_encoding(
2368                ColumnPath::from("int64_byte_stream_split"),
2369                Encoding::BYTE_STREAM_SPLIT,
2370            )
2371            .set_column_encoding(
2372                ColumnPath::from("flba5_byte_stream_split"),
2373                Encoding::BYTE_STREAM_SPLIT,
2374            )
2375            .set_column_encoding(
2376                ColumnPath::from("decimal_byte_stream_split"),
2377                Encoding::BYTE_STREAM_SPLIT,
2378            )
2379            .build();
2380
2381        let mut parquet_writer = ArrowWriter::try_new(
2382            file.try_clone().expect("cannot open file"),
2383            parquet_reader.schema(),
2384            Some(props),
2385        )
2386        .expect("create arrow writer");
2387
2388        for maybe_batch in parquet_reader {
2389            let batch = maybe_batch.expect("reading batch");
2390            parquet_writer.write(&batch).expect("writing data");
2391        }
2392
2393        parquet_writer.close().expect("finalizing file");
2394
2395        let reader = SerializedFileReader::new(file).expect("Failed to create reader");
2396        let filemeta = reader.metadata();
2397
2398        // Make sure byte_stream_split encoding was used
2399        let check_encoding = |x: usize, filemeta: &ParquetMetaData| {
2400            assert!(
2401                filemeta
2402                    .row_group(0)
2403                    .column(x)
2404                    .encodings()
2405                    .collect::<Vec<_>>()
2406                    .contains(&Encoding::BYTE_STREAM_SPLIT)
2407            );
2408        };
2409
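        // The *_byte_stream_split columns are expected at the odd indices, each paired with a
        // plain-encoded column holding the same values at the preceding even index.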
2410        check_encoding(1, filemeta);
2411        check_encoding(3, filemeta);
2412        check_encoding(5, filemeta);
2413        check_encoding(7, filemeta);
2414        check_encoding(9, filemeta);
2415        check_encoding(11, filemeta);
2416        check_encoding(13, filemeta);
2417
2418        // Read back tmpfile and make sure all values are correct
2419        let mut iter = reader
2420            .get_row_iter(None)
2421            .expect("Failed to create row iterator");
2422
2423        let mut start = 0;
2424        let end = reader.metadata().file_metadata().num_rows();
2425
2426        let check_row = |row: Result<Row, ParquetError>| {
2427            assert!(row.is_ok());
2428            let r = row.unwrap();
2429            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2430            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2431            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2432            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2433            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2434            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2435            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2436        };
2437
2438        while start < end {
2439            match iter.next() {
2440                Some(row) => check_row(row),
2441                None => break,
2442            };
2443            start += 1;
2444        }
2445    }
2446
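    // Rewrites an existing file by copying each column chunk from the source reader with
    // `append_column`, deliberately omitting bloom filters and page indexes from the new copy.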
2447    #[test]
2448    fn test_rewrite_no_page_indexes() {
2449        let file = get_test_file("alltypes_tiny_pages.parquet");
2450        let metadata = ParquetMetaDataReader::new()
2451            .with_page_index_policy(PageIndexPolicy::Optional)
2452            .parse_and_finish(&file)
2453            .unwrap();
2454
2455        let props = Arc::new(WriterProperties::builder().build());
2456        let schema = metadata.file_metadata().schema_descr().root_schema_ptr();
2457        let output = Vec::<u8>::new();
2458        let mut writer = SerializedFileWriter::new(output, schema, props).unwrap();
2459
2460        for rg in metadata.row_groups() {
2461            let mut rg_out = writer.next_row_group().unwrap();
2462            for column in rg.columns() {
2463                let result = ColumnCloseResult {
2464                    bytes_written: column.compressed_size() as _,
2465                    rows_written: rg.num_rows() as _,
2466                    metadata: column.clone(),
2467                    bloom_filter: None,
2468                    column_index: None,
2469                    offset_index: None,
2470                };
2471                rg_out.append_column(&file, result).unwrap();
2472            }
2473            rg_out.close().unwrap();
2474        }
2475        writer.close().unwrap();
2476    }
2477
2478    #[test]
2479    fn test_rewrite_missing_column_index() {
2480        // this file has an INT96 column that lacks a column index entry
2481        let file = get_test_file("alltypes_tiny_pages.parquet");
2482        let metadata = ParquetMetaDataReader::new()
2483            .with_page_index_policy(PageIndexPolicy::Optional)
2484            .parse_and_finish(&file)
2485            .unwrap();
2486
2487        let props = Arc::new(WriterProperties::builder().build());
2488        let schema = metadata.file_metadata().schema_descr().root_schema_ptr();
2489        let output = Vec::<u8>::new();
2490        let mut writer = SerializedFileWriter::new(output, schema, props).unwrap();
2491
2492        let column_indexes = metadata.column_index();
2493        let offset_indexes = metadata.offset_index();
2494
2495        for (rg_idx, rg) in metadata.row_groups().iter().enumerate() {
2496            let rg_column_indexes = column_indexes.and_then(|ci| ci.get(rg_idx));
2497            let rg_offset_indexes = offset_indexes.and_then(|oi| oi.get(rg_idx));
2498            let mut rg_out = writer.next_row_group().unwrap();
2499            for (col_idx, column) in rg.columns().iter().enumerate() {
2500                let column_index = rg_column_indexes.and_then(|row| row.get(col_idx)).cloned();
2501                let offset_index = rg_offset_indexes.and_then(|row| row.get(col_idx)).cloned();
2502                let result = ColumnCloseResult {
2503                    bytes_written: column.compressed_size() as _,
2504                    rows_written: rg.num_rows() as _,
2505                    metadata: column.clone(),
2506                    bloom_filter: None,
2507                    column_index,
2508                    offset_index,
2509                };
2510                rg_out.append_column(&file, result).unwrap();
2511            }
2512            rg_out.close().unwrap();
2513        }
2514        writer.close().unwrap();
2515    }
2516}