
parquet/file/writer.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`SerializedFileWriter`]: Low level Parquet writer API

use crate::bloom_filter::Sbbf;
use crate::file::metadata::thrift::PageHeader;
use crate::file::page_index::column_index::ColumnIndexMetaData;
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
use std::fmt::Debug;
use std::io::{BufWriter, IoSlice, Read};
use std::{io::Write, sync::Arc};

use crate::column::page_encryption::PageEncryptor;
use crate::column::writer::{ColumnCloseResult, ColumnWriterImpl, get_typed_column_writer_mut};
use crate::column::{
    page::{CompressedPage, PageWriteSpec, PageWriter},
    writer::{ColumnWriter, get_column_writer},
};
use crate::data_type::DataType;
#[cfg(feature = "encryption")]
use crate::encryption::encrypt::{
    FileEncryptionProperties, FileEncryptor, get_column_crypto_metadata,
};
use crate::errors::{ParquetError, Result};
#[cfg(feature = "encryption")]
use crate::file::PARQUET_MAGIC_ENCR_FOOTER;
use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
use crate::file::reader::ChunkReader;
use crate::file::{PARQUET_MAGIC, metadata::*};
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr};

/// A wrapper around a [`Write`] that keeps track of the number
/// of bytes that have been written. The given [`Write`] is wrapped
/// with a [`BufWriter`] to optimize writing performance.
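///
/// A minimal usage sketch, writing to an in-memory `Vec<u8>` (error handling
/// elided with `unwrap` for brevity):
///
/// ```
/// # use std::io::Write;
/// # use parquet::file::writer::TrackedWrite;
/// let mut writer = TrackedWrite::new(Vec::new());
/// writer.write_all(b"hello").unwrap();
/// assert_eq!(writer.bytes_written(), 5);
/// // `into_inner` flushes the internal `BufWriter` and returns the wrapped writer
/// let buf = writer.into_inner().unwrap();
/// assert_eq!(buf, b"hello".to_vec());
/// ```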
pub struct TrackedWrite<W: Write> {
    inner: BufWriter<W>,
    bytes_written: usize,
}

impl<W: Write> TrackedWrite<W> {
    /// Create a new [`TrackedWrite`] from a [`Write`]
    pub fn new(inner: W) -> Self {
        let buf_write = BufWriter::new(inner);
        Self {
            inner: buf_write,
            bytes_written: 0,
        }
    }

    /// Returns the number of bytes written to this instance
    pub fn bytes_written(&self) -> usize {
        self.bytes_written
    }

    /// Returns a reference to the underlying writer.
    pub fn inner(&self) -> &W {
        self.inner.get_ref()
    }

    /// Returns a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to write directly to the underlying writer, as doing
    /// so will likely result in data corruption.
    pub fn inner_mut(&mut self) -> &mut W {
        self.inner.get_mut()
    }

    /// Returns the underlying writer.
    pub fn into_inner(self) -> Result<W> {
        self.inner.into_inner().map_err(|err| {
            ParquetError::General(format!("failed to get inner writer: {err}"))
        })
    }
}

impl<W: Write> Write for TrackedWrite<W> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let bytes = self.inner.write(buf)?;
        self.bytes_written += bytes;
        Ok(bytes)
    }

    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> std::io::Result<usize> {
        let bytes = self.inner.write_vectored(bufs)?;
        self.bytes_written += bytes;
        Ok(bytes)
    }

    fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
        self.inner.write_all(buf)?;
        self.bytes_written += buf.len();

        Ok(())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.inner.flush()
    }
}

/// Callback invoked on closing a column chunk
pub type OnCloseColumnChunk<'a> = Box<dyn FnOnce(ColumnCloseResult) -> Result<()> + 'a>;

/// Callback invoked on closing a row group, arguments are:
///
/// - the writer to write any remaining data to
/// - the row group metadata
/// - the bloom filter for each column chunk
/// - the column index for each column chunk
/// - the offset index for each column chunk
pub type OnCloseRowGroup<'a, W> = Box<
    dyn FnOnce(
            &'a mut TrackedWrite<W>,
            RowGroupMetaData,
            Vec<Option<Sbbf>>,
            Vec<Option<ColumnIndexMetaData>>,
            Vec<Option<OffsetIndexMetaData>>,
        ) -> Result<()>
        + 'a
        + Send,
>;

// ----------------------------------------------------------------------
// Serialized impl for file & row group writers

/// Parquet file writer API.
///
/// This is a low level API for writing Parquet files directly, and handles
/// tracking the location of file structures such as row groups and column
/// chunks, and writing the metadata and file footer.
///
/// Data is written to row groups using [`SerializedRowGroupWriter`] and
/// columns using [`SerializedColumnWriter`]. The `SerializedFileWriter` tracks
/// where all the data is written, and assembles the final file metadata.
///
/// The main workflow is as follows:
/// - Create a file writer; this will open a new file and potentially write some metadata.
/// - Request a new row group writer by calling `next_row_group`.
/// - Once finished writing a row group, close the row group writer by calling `close`.
/// - Write subsequent row groups, if necessary.
/// - After all row groups have been written, close the file writer using the `close` method.
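///
/// A minimal end-to-end sketch of this workflow, assuming a single required
/// INT32 column (error handling via `unwrap` for brevity):
///
/// ```no_run
/// # use std::sync::Arc;
/// # use parquet::basic::{Repetition, Type};
/// # use parquet::data_type::Int32Type;
/// # use parquet::file::properties::WriterProperties;
/// # use parquet::file::writer::SerializedFileWriter;
/// # use parquet::schema::types;
/// let schema = Arc::new(
///     types::Type::group_type_builder("schema")
///         .with_fields(vec![Arc::new(
///             types::Type::primitive_type_builder("col1", Type::INT32)
///                 .with_repetition(Repetition::REQUIRED)
///                 .build()
///                 .unwrap(),
///         )])
///         .build()
///         .unwrap(),
/// );
/// let file = std::fs::File::create("example.parquet").unwrap();
/// let props = Arc::new(WriterProperties::builder().build());
/// let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
///
/// let mut row_group = writer.next_row_group().unwrap();
/// while let Some(mut column) = row_group.next_column().unwrap() {
///     column
///         .typed::<Int32Type>()
///         .write_batch(&[1, 2, 3], None, None)
///         .unwrap();
///     column.close().unwrap();
/// }
/// row_group.close().unwrap();
/// writer.close().unwrap();
/// ```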
pub struct SerializedFileWriter<W: Write> {
    buf: TrackedWrite<W>,
    descr: SchemaDescPtr,
    props: WriterPropertiesPtr,
    row_groups: Vec<RowGroupMetaData>,
    bloom_filters: Vec<Vec<Option<Sbbf>>>,
    column_indexes: Vec<Vec<Option<ColumnIndexMetaData>>>,
    offset_indexes: Vec<Vec<Option<OffsetIndexMetaData>>>,
    row_group_index: usize,
    // kv_metadatas are appended to the key-value metadata from `props` in `write_metadata`
    kv_metadatas: Vec<KeyValue>,
    finished: bool,
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}

impl<W: Write> Debug for SerializedFileWriter<W> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // implement Debug so this can be used with #[derive(Debug)]
        // in client code rather than actually listing all the fields
        f.debug_struct("SerializedFileWriter")
            .field("descr", &self.descr)
            .field("row_group_index", &self.row_group_index)
            .field("kv_metadatas", &self.kv_metadatas)
            .finish_non_exhaustive()
    }
}

impl<W: Write + Send> SerializedFileWriter<W> {
    /// Creates a new file writer.
    pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
        let mut buf = TrackedWrite::new(buf);

        let schema_descriptor = SchemaDescriptor::new(schema.clone());

        #[cfg(feature = "encryption")]
        let file_encryptor = Self::get_file_encryptor(&properties, &schema_descriptor)?;

        Self::start_file(&properties, &mut buf)?;
        Ok(Self {
            buf,
            descr: Arc::new(schema_descriptor),
            props: properties,
            row_groups: vec![],
            bloom_filters: vec![],
            column_indexes: Vec::new(),
            offset_indexes: Vec::new(),
            row_group_index: 0,
            kv_metadatas: Vec::new(),
            finished: false,
            #[cfg(feature = "encryption")]
            file_encryptor,
        })
    }

    #[cfg(feature = "encryption")]
    fn get_file_encryptor(
        properties: &WriterPropertiesPtr,
        schema_descriptor: &SchemaDescriptor,
    ) -> Result<Option<Arc<FileEncryptor>>> {
        if let Some(file_encryption_properties) = properties.file_encryption_properties() {
            file_encryption_properties.validate_encrypted_column_names(schema_descriptor)?;

            Ok(Some(Arc::new(FileEncryptor::new(Arc::clone(
                file_encryption_properties,
            ))?)))
        } else {
            Ok(None)
        }
    }

    /// Creates a new row group from this file writer.
    ///
    /// Note: Parquet files are limited to at most 2^15 row groups in a file, and row groups
    /// must be written sequentially.
    ///
    /// Every time the next row group is requested, the previous row group must
    /// be finalized and closed using the [`SerializedRowGroupWriter::close`]
    /// method or an error will be returned.
    pub fn next_row_group(&mut self) -> Result<SerializedRowGroupWriter<'_, W>> {
        self.assert_previous_writer_closed()?;
        let ordinal = self.row_group_index;

        let ordinal: i16 = ordinal.try_into().map_err(|_| {
            ParquetError::General(format!(
                "Parquet does not support more than {} row groups per file (currently: {})",
                i16::MAX,
                ordinal
            ))
        })?;

        self.row_group_index = self
            .row_group_index
            .checked_add(1)
            .expect("SerializedFileWriter::row_group_index overflowed");

        let bloom_filter_position = self.properties().bloom_filter_position();
        let row_groups = &mut self.row_groups;
        let row_bloom_filters = &mut self.bloom_filters;
        let row_column_indexes = &mut self.column_indexes;
        let row_offset_indexes = &mut self.offset_indexes;
        let on_close = move |buf,
                             mut metadata,
                             row_group_bloom_filter,
                             row_group_column_index,
                             row_group_offset_index| {
            row_bloom_filters.push(row_group_bloom_filter);
            row_column_indexes.push(row_group_column_index);
            row_offset_indexes.push(row_group_offset_index);
            // write bloom filters out immediately after the row group if requested
            match bloom_filter_position {
                BloomFilterPosition::AfterRowGroup => {
                    write_bloom_filters(buf, row_bloom_filters, &mut metadata)?
                }
                BloomFilterPosition::End => (),
            };
            row_groups.push(metadata);
            Ok(())
        };

        let row_group_writer = SerializedRowGroupWriter::new(
            self.descr.clone(),
            self.props.clone(),
            &mut self.buf,
            ordinal,
            Some(Box::new(on_close)),
        );
        #[cfg(feature = "encryption")]
        let row_group_writer = row_group_writer.with_file_encryptor(self.file_encryptor.clone());

        Ok(row_group_writer)
    }

    /// Returns metadata for any flushed row groups
    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        &self.row_groups
    }

    /// Close and finalize the underlying Parquet writer
    ///
    /// Unlike [`Self::close`] this does not consume self
    ///
    /// Attempting to write after calling finish will result in an error
    pub fn finish(&mut self) -> Result<ParquetMetaData> {
        self.assert_previous_writer_closed()?;
        let metadata = self.write_metadata()?;
        self.buf.flush()?;
        Ok(metadata)
    }

    /// Closes and finalizes the file writer, returning the file metadata.
    pub fn close(mut self) -> Result<ParquetMetaData> {
        self.finish()
    }

    /// Writes magic bytes at the beginning of the file.
    #[cfg(not(feature = "encryption"))]
    fn start_file(_properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
        buf.write_all(get_file_magic())?;
        Ok(())
    }

    /// Writes magic bytes at the beginning of the file.
    #[cfg(feature = "encryption")]
    fn start_file(properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
        let magic = get_file_magic(properties.file_encryption_properties.as_ref());

        buf.write_all(magic)?;
        Ok(())
    }

    /// Assembles and writes metadata at the end of the file. This will take ownership
    /// of `row_groups` and the page index structures.
    fn write_metadata(&mut self) -> Result<ParquetMetaData> {
        self.finished = true;

        // write out any remaining bloom filters after all row groups
        for row_group in &mut self.row_groups {
            write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?;
        }

        let key_value_metadata = match self.props.key_value_metadata() {
            Some(kv) => Some(kv.iter().chain(&self.kv_metadatas).cloned().collect()),
            None if self.kv_metadatas.is_empty() => None,
            None => Some(self.kv_metadatas.clone()),
        };

        // take ownership of metadata
        let row_groups = std::mem::take(&mut self.row_groups);
        let column_indexes = std::mem::take(&mut self.column_indexes);
        let offset_indexes = std::mem::take(&mut self.offset_indexes);

        let write_path_in_schema = self.props.write_path_in_schema();
        let mut encoder = ThriftMetadataWriter::new(
            &mut self.buf,
            &self.descr,
            row_groups,
            Some(self.props.created_by().to_string()),
            self.props.writer_version().as_num(),
            write_path_in_schema,
        );

        #[cfg(feature = "encryption")]
        {
            encoder = encoder.with_file_encryptor(self.file_encryptor.clone());
        }

        if let Some(key_value_metadata) = key_value_metadata {
            encoder = encoder.with_key_value_metadata(key_value_metadata)
        }

        encoder = encoder.with_column_indexes(column_indexes);
        if !self.props.offset_index_disabled() {
            encoder = encoder.with_offset_indexes(offset_indexes);
        }
        encoder.finish()
    }

    #[inline]
    fn assert_previous_writer_closed(&self) -> Result<()> {
        if self.finished {
            return Err(general_err!("SerializedFileWriter already finished"));
        }

        if self.row_group_index != self.row_groups.len() {
            Err(general_err!("Previous row group writer was not closed"))
        } else {
            Ok(())
        }
    }

    /// Add a [`KeyValue`] to the file writer's metadata
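    ///
    /// For example (a sketch; `writer` is an open [`SerializedFileWriter`]):
    ///
    /// ```ignore
    /// writer.append_key_value_metadata(KeyValue::new("key".to_string(), "value".to_string()));
    /// ```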
    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
        self.kv_metadatas.push(kv_metadata);
    }

    /// Returns a reference to schema descriptor.
    pub fn schema_descr(&self) -> &SchemaDescriptor {
        &self.descr
    }

    /// Returns a reference to schema descriptor Arc.
    #[cfg(feature = "arrow")]
    pub(crate) fn schema_descr_ptr(&self) -> &SchemaDescPtr {
        &self.descr
    }

    /// Returns a reference to the writer properties
    pub fn properties(&self) -> &WriterPropertiesPtr {
        &self.props
    }

    /// Returns a reference to the underlying writer.
    pub fn inner(&self) -> &W {
        self.buf.inner()
    }

    /// Writes the given bytes to the internal buffer.
    ///
    /// This can be used to write raw data to an in-progress Parquet file, for
    /// example, custom index structures or other payloads. Other Parquet readers
    /// will skip this data when reading the files.
    ///
    /// It's safe to use this method to write data to the underlying writer,
    /// because it ensures that the buffering and byte-counting layers are used.
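    ///
    /// For example (a sketch; `writer` is an open [`SerializedFileWriter`]):
    ///
    /// ```ignore
    /// writer.write_all(b"application-specific index bytes")?;
    /// ```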
    pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
        self.buf.write_all(buf)
    }

    /// Flushes underlying writer
    pub fn flush(&mut self) -> std::io::Result<()> {
        self.buf.flush()
    }

    /// Returns a mutable reference to the underlying writer.
    ///
    /// **Warning**: if you write directly to this writer, you will skip
    /// the `TrackedWrite` buffering and byte-counting layers, which can cause
    /// the file footer's recorded offsets and sizes to diverge from reality,
    /// resulting in an unreadable or corrupted Parquet file.
    ///
    /// If you want to write safely to the underlying writer, use [`Self::write_all`].
    pub fn inner_mut(&mut self) -> &mut W {
        self.buf.inner_mut()
    }

    /// Writes the file footer and returns the underlying writer.
    pub fn into_inner(mut self) -> Result<W> {
        self.assert_previous_writer_closed()?;
        let _ = self.write_metadata()?;

        self.buf.into_inner()
    }

    /// Returns the number of bytes written to this instance
    pub fn bytes_written(&self) -> usize {
        self.buf.bytes_written()
    }

    /// Get the file encryptor used by this instance to encrypt data
    #[cfg(feature = "encryption")]
    pub(crate) fn file_encryptor(&self) -> Option<Arc<FileEncryptor>> {
        self.file_encryptor.clone()
    }
}

/// Serializes all the bloom filters of the given row group to the given buffer,
/// updating the row group metadata with the offset and length of each filter.
fn write_bloom_filters<W: Write + Send>(
    buf: &mut TrackedWrite<W>,
    bloom_filters: &mut [Vec<Option<Sbbf>>],
    row_group: &mut RowGroupMetaData,
) -> Result<()> {
    // For each column in this row group, write its bloom filter (if any) to
    // the buffer and record its location in the column chunk metadata.

    let row_group_idx: u16 = row_group
        .ordinal()
        .expect("Missing row group ordinal")
        .try_into()
        .map_err(|_| {
            ParquetError::General(format!(
                "Negative row group ordinal: {}",
                row_group.ordinal().unwrap()
            ))
        })?;
    let row_group_idx = row_group_idx as usize;
    for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() {
        if let Some(bloom_filter) = bloom_filters[row_group_idx][column_idx].take() {
            let start_offset = buf.bytes_written();
            bloom_filter.write(&mut *buf)?;
            let end_offset = buf.bytes_written();
            // set the offset and length for the bloom filter
            *column_chunk = column_chunk
                .clone()
                .into_builder()
                .set_bloom_filter_offset(Some(start_offset as i64))
                .set_bloom_filter_length(Some((end_offset - start_offset) as i32))
                .build()?;
        }
    }
    Ok(())
}

/// Parquet row group writer API.
///
/// Provides methods to access column writers in an iterator-like fashion; the order is
/// guaranteed to match the order of schema leaves (column descriptors).
///
/// All columns should be written sequentially; the main workflow is:
/// - Request the next column using the `next_column` method - this will return `None` if no
///   more columns are available to write.
/// - Once done writing a column, close the column writer with `close`.
/// - Once all columns have been written, close the row group writer with the `close`
///   method. The close method will return the row group metadata and is a no-op
///   on an already closed row group.
pub struct SerializedRowGroupWriter<'a, W: Write> {
    descr: SchemaDescPtr,
    props: WriterPropertiesPtr,
    buf: &'a mut TrackedWrite<W>,
    total_rows_written: Option<u64>,
    total_bytes_written: u64,
    total_uncompressed_bytes: i64,
    column_index: usize,
    row_group_metadata: Option<RowGroupMetaDataPtr>,
    column_chunks: Vec<ColumnChunkMetaData>,
    bloom_filters: Vec<Option<Sbbf>>,
    column_indexes: Vec<Option<ColumnIndexMetaData>>,
    offset_indexes: Vec<Option<OffsetIndexMetaData>>,
    row_group_index: i16,
    file_offset: i64,
    on_close: Option<OnCloseRowGroup<'a, W>>,
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}

impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
    /// Creates a new `SerializedRowGroupWriter` with:
    ///
    /// - `schema_descr` - the schema to write
    /// - `properties` - writer properties
    /// - `buf` - the buffer to write data to (the row group's file offset is
    ///   derived from the bytes already written to it)
    /// - `row_group_index` - the index of this row group in the parquet file
    /// - `on_close` - an optional callback that will be invoked on [`Self::close`]
    pub fn new(
        schema_descr: SchemaDescPtr,
        properties: WriterPropertiesPtr,
        buf: &'a mut TrackedWrite<W>,
        row_group_index: i16,
        on_close: Option<OnCloseRowGroup<'a, W>>,
    ) -> Self {
        let num_columns = schema_descr.num_columns();
        let file_offset = buf.bytes_written() as i64;
        Self {
            buf,
            row_group_index,
            file_offset,
            on_close,
            total_rows_written: None,
            descr: schema_descr,
            props: properties,
            column_index: 0,
            row_group_metadata: None,
            column_chunks: Vec::with_capacity(num_columns),
            bloom_filters: Vec::with_capacity(num_columns),
            column_indexes: Vec::with_capacity(num_columns),
            offset_indexes: Vec::with_capacity(num_columns),
            total_bytes_written: 0,
            total_uncompressed_bytes: 0,
            #[cfg(feature = "encryption")]
            file_encryptor: None,
        }
    }

    #[cfg(feature = "encryption")]
    /// Set the file encryptor to use for encrypting row group data and metadata
    pub(crate) fn with_file_encryptor(
        mut self,
        file_encryptor: Option<Arc<FileEncryptor>>,
    ) -> Self {
        self.file_encryptor = file_encryptor;
        self
    }

    /// Advance `self.column_index` returning the next [`ColumnDescPtr`] if any
    fn next_column_desc(&mut self) -> Option<ColumnDescPtr> {
        let ret = self.descr.columns().get(self.column_index)?.clone();
        self.column_index += 1;
        Some(ret)
    }

    /// Returns [`OnCloseColumnChunk`] for the next writer
    fn get_on_close(&mut self) -> (&mut TrackedWrite<W>, OnCloseColumnChunk<'_>) {
        let total_bytes_written = &mut self.total_bytes_written;
        let total_uncompressed_bytes = &mut self.total_uncompressed_bytes;
        let total_rows_written = &mut self.total_rows_written;
        let column_chunks = &mut self.column_chunks;
        let column_indexes = &mut self.column_indexes;
        let offset_indexes = &mut self.offset_indexes;
        let bloom_filters = &mut self.bloom_filters;

        let on_close = |r: ColumnCloseResult| {
            // Update row group writer metrics
            *total_bytes_written += r.bytes_written;
            *total_uncompressed_bytes += r.metadata.uncompressed_size();
            column_chunks.push(r.metadata);
            bloom_filters.push(r.bloom_filter);
            column_indexes.push(r.column_index);
            offset_indexes.push(r.offset_index);

            if let Some(rows) = *total_rows_written {
                if rows != r.rows_written {
                    return Err(general_err!(
                        "Incorrect number of rows, expected {} != {} rows",
                        rows,
                        r.rows_written
                    ));
                }
            } else {
                *total_rows_written = Some(r.rows_written);
            }

            Ok(())
        };
        (self.buf, Box::new(on_close))
    }

    /// Returns the next column writer, if available, using the factory function;
    /// otherwise returns `None`.
    pub(crate) fn next_column_with_factory<'b, F, C>(&'b mut self, factory: F) -> Result<Option<C>>
    where
        F: FnOnce(
            ColumnDescPtr,
            WriterPropertiesPtr,
            Box<dyn PageWriter + 'b>,
            OnCloseColumnChunk<'b>,
        ) -> Result<C>,
    {
        self.assert_previous_writer_closed()?;

        let encryptor_context = self.get_page_encryptor_context();

        Ok(match self.next_column_desc() {
            Some(column) => {
                let props = self.props.clone();
                let (buf, on_close) = self.get_on_close();

                let page_writer = SerializedPageWriter::new(buf);
                let page_writer =
                    Self::set_page_writer_encryptor(&column, encryptor_context, page_writer)?;

                Some(factory(
                    column,
                    props,
                    Box::new(page_writer),
                    Box::new(on_close),
                )?)
            }
            None => None,
        })
    }

    /// Returns the next column writer, if available; otherwise returns `None`.
    /// Returns `Err` in case of any IO or Thrift error, or if the row group
    /// writer has already been closed.
    pub fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>> {
        self.next_column_with_factory(|descr, props, page_writer, on_close| {
            let column_writer = get_column_writer(descr, props, page_writer);
            Ok(SerializedColumnWriter::new(column_writer, Some(on_close)))
        })
    }

    /// Append an encoded column chunk from `reader` directly to the underlying
    /// writer.
    ///
    /// This method can be used for efficiently concatenating or projecting
    /// Parquet data, or encoding Parquet data to temporary in-memory buffers.
    ///
    /// Arguments:
    /// - `reader`: a [`ChunkReader`] containing the encoded column data
    /// - `close`: the [`ColumnCloseResult`] metadata returned from closing
    ///   the column writer that wrote the data in `reader`.
    ///
    /// See Also:
    /// 1. [`get_column_writer`] for creating writers that can encode data.
    /// 2. [`Self::next_column`] for writing data that isn't already encoded
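    ///
    /// A sketch of splicing a previously-encoded chunk (assumes `bytes` is a
    /// `Bytes` buffer holding the encoded chunk and `close` is the
    /// [`ColumnCloseResult`] from the writer that produced it):
    ///
    /// ```ignore
    /// row_group_writer.append_column(&bytes, close)?;
    /// ```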
    pub fn append_column<R: ChunkReader>(
        &mut self,
        reader: &R,
        mut close: ColumnCloseResult,
    ) -> Result<()> {
        self.assert_previous_writer_closed()?;
        let desc = self
            .next_column_desc()
            .ok_or_else(|| general_err!("exhausted columns in SerializedRowGroupWriter"))?;

        let metadata = close.metadata;

        if metadata.column_descr() != desc.as_ref() {
            return Err(general_err!(
                "column descriptor mismatch, expected {:?} got {:?}",
                desc,
                metadata.column_descr()
            ));
        }

        let src_dictionary_offset = metadata.dictionary_page_offset();
        let src_data_offset = metadata.data_page_offset();
        let src_offset = src_dictionary_offset.unwrap_or(src_data_offset);
        let src_length = metadata.compressed_size();

        let write_offset = self.buf.bytes_written();
        let mut read = reader.get_read(src_offset as _)?.take(src_length as _);
        let write_length = std::io::copy(&mut read, &mut self.buf)?;

        if src_length as u64 != write_length {
            return Err(general_err!(
                "Failed to splice column data, expected {src_length} got {write_length}"
            ));
        }

        let map_offset = |x| x - src_offset + write_offset as i64;
        let mut builder = ColumnChunkMetaData::builder(metadata.column_descr_ptr())
            .set_compression_codec(metadata.compression_codec())
            .set_encodings_mask(*metadata.encodings_mask())
            .set_total_compressed_size(metadata.compressed_size())
            .set_total_uncompressed_size(metadata.uncompressed_size())
            .set_num_values(metadata.num_values())
            .set_data_page_offset(map_offset(src_data_offset))
            .set_dictionary_page_offset(src_dictionary_offset.map(map_offset))
            .set_unencoded_byte_array_data_bytes(metadata.unencoded_byte_array_data_bytes());

        if let Some(rep_hist) = metadata.repetition_level_histogram() {
            builder = builder.set_repetition_level_histogram(Some(rep_hist.clone()))
        }
        if let Some(def_hist) = metadata.definition_level_histogram() {
            builder = builder.set_definition_level_histogram(Some(def_hist.clone()))
        }
        if let Some(statistics) = metadata.statistics() {
            builder = builder.set_statistics(statistics.clone())
        }
        if let Some(geo_statistics) = metadata.geo_statistics() {
            builder = builder.set_geo_statistics(Box::new(geo_statistics.clone()))
        }
        if let Some(page_encoding_stats) = metadata.page_encoding_stats() {
            builder = builder.set_page_encoding_stats(page_encoding_stats.clone())
        }
        builder = self.set_column_crypto_metadata(builder, &metadata);
        close.metadata = builder.build()?;

        if let Some(offsets) = close.offset_index.as_mut() {
            for location in &mut offsets.page_locations {
                location.offset = map_offset(location.offset)
            }
        }

        let (_, on_close) = self.get_on_close();
        on_close(close)
    }

    /// Closes this row group writer and returns row group metadata.
    pub fn close(mut self) -> Result<RowGroupMetaDataPtr> {
        if self.row_group_metadata.is_none() {
            self.assert_previous_writer_closed()?;

            let column_chunks = std::mem::take(&mut self.column_chunks);
            let row_group_metadata = RowGroupMetaData::builder(self.descr.clone())
                .set_column_metadata(column_chunks)
                .set_total_byte_size(self.total_uncompressed_bytes)
                .set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
                .set_sorting_columns(self.props.sorting_columns().cloned())
                .set_ordinal(self.row_group_index)
                .set_file_offset(self.file_offset)
                .build()?;

            self.row_group_metadata = Some(Arc::new(row_group_metadata.clone()));

            if let Some(on_close) = self.on_close.take() {
                on_close(
                    self.buf,
                    row_group_metadata,
                    self.bloom_filters,
                    self.column_indexes,
                    self.offset_indexes,
                )?
            }
        }

        let metadata = self.row_group_metadata.as_ref().unwrap().clone();
        Ok(metadata)
    }

    /// Set the column crypto metadata for a column chunk
    #[cfg(feature = "encryption")]
    fn set_column_crypto_metadata(
        &self,
        builder: ColumnChunkMetaDataBuilder,
        metadata: &ColumnChunkMetaData,
    ) -> ColumnChunkMetaDataBuilder {
        if let Some(file_encryptor) = self.file_encryptor.as_ref() {
            builder.set_column_crypto_metadata(get_column_crypto_metadata(
                file_encryptor.properties(),
                &metadata.column_descr_ptr(),
            ))
        } else {
            builder
        }
    }

    /// Get context required to create a [`PageEncryptor`] for a column
    #[cfg(feature = "encryption")]
    fn get_page_encryptor_context(&self) -> PageEncryptorContext {
        PageEncryptorContext {
            file_encryptor: self.file_encryptor.clone(),
            row_group_index: self.row_group_index as usize,
            column_index: self.column_index,
        }
    }

    /// Set the [`PageEncryptor`] on a page writer if a column is encrypted
    #[cfg(feature = "encryption")]
    fn set_page_writer_encryptor<'b>(
        column: &ColumnDescPtr,
        context: PageEncryptorContext,
        page_writer: SerializedPageWriter<'b, W>,
    ) -> Result<SerializedPageWriter<'b, W>> {
        let page_encryptor = PageEncryptor::create_if_column_encrypted(
            &context.file_encryptor,
            context.row_group_index,
            context.column_index,
            &column.path().string(),
        )?;

        Ok(page_writer.with_page_encryptor(page_encryptor))
    }

    /// No-op implementation of setting the column crypto metadata for a column chunk
    #[cfg(not(feature = "encryption"))]
    fn set_column_crypto_metadata(
        &self,
        builder: ColumnChunkMetaDataBuilder,
        _metadata: &ColumnChunkMetaData,
    ) -> ColumnChunkMetaDataBuilder {
        builder
    }

    #[cfg(not(feature = "encryption"))]
    fn get_page_encryptor_context(&self) -> PageEncryptorContext {
        PageEncryptorContext {}
    }

    /// No-op implementation of setting a [`PageEncryptor`] for when encryption is disabled
    #[cfg(not(feature = "encryption"))]
    fn set_page_writer_encryptor<'b>(
        _column: &ColumnDescPtr,
        _context: PageEncryptorContext,
        page_writer: SerializedPageWriter<'b, W>,
    ) -> Result<SerializedPageWriter<'b, W>> {
        Ok(page_writer)
    }

    #[inline]
    fn assert_previous_writer_closed(&self) -> Result<()> {
        if self.column_index != self.column_chunks.len() {
            Err(general_err!("Previous column writer was not closed"))
        } else {
            Ok(())
        }
    }
}

/// Context required to create a [`PageEncryptor`] for a column
#[cfg(feature = "encryption")]
struct PageEncryptorContext {
    file_encryptor: Option<Arc<FileEncryptor>>,
    row_group_index: usize,
    column_index: usize,
}

#[cfg(not(feature = "encryption"))]
struct PageEncryptorContext {}

/// A wrapper around a [`ColumnWriter`] that invokes a callback on [`Self::close`]
pub struct SerializedColumnWriter<'a> {
    inner: ColumnWriter<'a>,
    on_close: Option<OnCloseColumnChunk<'a>>,
}

impl<'a> SerializedColumnWriter<'a> {
    /// Create a new [`SerializedColumnWriter`] from a [`ColumnWriter`] and an
    /// optional callback to be invoked on [`Self::close`]
    pub fn new(inner: ColumnWriter<'a>, on_close: Option<OnCloseColumnChunk<'a>>) -> Self {
        Self { inner, on_close }
    }

    /// Returns a mutable reference to the untyped [`ColumnWriter`]
    pub fn untyped(&mut self) -> &mut ColumnWriter<'a> {
        &mut self.inner
    }

    /// Returns a mutable reference to the typed [`ColumnWriterImpl`]
    pub fn typed<T: DataType>(&mut self) -> &mut ColumnWriterImpl<'a, T> {
        get_typed_column_writer_mut(&mut self.inner)
    }

    /// Close this [`SerializedColumnWriter`]
    pub fn close(mut self) -> Result<()> {
        let r = self.inner.close()?;
        if let Some(on_close) = self.on_close.take() {
            on_close(r)?
        }

        Ok(())
    }
}

/// A serialized implementation of the Parquet [`PageWriter`].
/// Writes and serializes pages and metadata into an output stream.
///
/// `SerializedPageWriter` should not be used after calling `close()`.
pub struct SerializedPageWriter<'a, W: Write> {
    sink: &'a mut TrackedWrite<W>,
    #[cfg(feature = "encryption")]
    page_encryptor: Option<PageEncryptor>,
}

impl<'a, W: Write> SerializedPageWriter<'a, W> {
    /// Creates a new page writer.
    pub fn new(sink: &'a mut TrackedWrite<W>) -> Self {
        Self {
            sink,
            #[cfg(feature = "encryption")]
            page_encryptor: None,
        }
    }

    /// Serializes the page header into Thrift.
    /// Returns the number of bytes written into the sink.
    #[inline]
    fn serialize_page_header(&mut self, header: PageHeader) -> Result<usize> {
        let start_pos = self.sink.bytes_written();
        match self.page_encryptor_and_sink_mut() {
            Some((page_encryptor, sink)) => {
                page_encryptor.encrypt_page_header(&header, sink)?;
            }
            None => {
                let mut protocol = ThriftCompactOutputProtocol::new(&mut self.sink);
                header.write_thrift(&mut protocol)?;
            }
        }
        Ok(self.sink.bytes_written() - start_pos)
    }
}

#[cfg(feature = "encryption")]
impl<'a, W: Write> SerializedPageWriter<'a, W> {
    /// Set the encryptor to use to encrypt page data
    fn with_page_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
        self.page_encryptor = page_encryptor;
        self
    }

    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        self.page_encryptor.as_mut()
    }

    fn page_encryptor_and_sink_mut(
        &mut self,
    ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
        self.page_encryptor.as_mut().map(|pe| (pe, &mut self.sink))
    }
}

#[cfg(not(feature = "encryption"))]
impl<'a, W: Write> SerializedPageWriter<'a, W> {
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        None
    }

    fn page_encryptor_and_sink_mut(
        &mut self,
    ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
        None
    }
}

impl<W: Write + Send> PageWriter for SerializedPageWriter<'_, W> {
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
        let page = match self.page_encryptor_mut() {
            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
            None => page,
        };

        let page_type = page.page_type();
        let start_pos = self.sink.bytes_written() as u64;

        let page_header = page.to_thrift_header()?;
        let header_size = self.serialize_page_header(page_header)?;

        self.sink.write_all(page.data())?;

        let mut spec = PageWriteSpec::new();
        spec.page_type = page_type;
        spec.uncompressed_size = page.uncompressed_size() + header_size;
        spec.compressed_size = page.compressed_size() + header_size;
        spec.offset = start_pos;
        spec.bytes_written = self.sink.bytes_written() as u64 - start_pos;
        spec.num_values = page.num_values();

        if let Some(page_encryptor) = self.page_encryptor_mut() {
            if page.compressed_page().is_data_page() {
                page_encryptor.increment_page();
            }
        }
        Ok(spec)
    }

    fn close(&mut self) -> Result<()> {
        self.sink.flush()?;
        Ok(())
    }
}

/// Get the magic bytes at the start and end of the file that identify this
/// as a Parquet file.
#[cfg(feature = "encryption")]
pub(crate) fn get_file_magic(
    file_encryption_properties: Option<&Arc<FileEncryptionProperties>>,
) -> &'static [u8; 4] {
    match file_encryption_properties.as_ref() {
        Some(encryption_properties) if encryption_properties.encrypt_footer() => {
            &PARQUET_MAGIC_ENCR_FOOTER
        }
        _ => &PARQUET_MAGIC,
    }
}

#[cfg(not(feature = "encryption"))]
pub(crate) fn get_file_magic() -> &'static [u8; 4] {
    &PARQUET_MAGIC
}

#[cfg(test)]
mod tests {
    use super::*;

    #[cfg(feature = "arrow")]
    use arrow_array::RecordBatchReader;
    use bytes::Bytes;
    use std::fs::File;

    #[cfg(feature = "arrow")]
    use crate::arrow::ArrowWriter;
    #[cfg(feature = "arrow")]
    use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use crate::basic::{
        ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, Repetition, SortOrder, Type,
    };
    use crate::column::page::{Page, PageReader};
    use crate::column::reader::get_typed_column_reader;
    use crate::compression::{Codec, CodecOptionsBuilder, create_codec};
    use crate::data_type::{BoolType, ByteArrayType, Int32Type};
    use crate::file::page_index::column_index::ColumnIndexMetaData;
    use crate::file::properties::EnabledStatistics;
    use crate::file::serialized_reader::ReadOptionsBuilder;
    use crate::file::statistics::{from_thrift_page_stats, page_stats_to_thrift};
    use crate::file::{
        properties::{ReaderProperties, WriterProperties, WriterVersion},
        reader::{FileReader, SerializedFileReader, SerializedPageReader},
        statistics::Statistics,
    };
    use crate::record::{Row, RowAccessor};
    use crate::schema::parser::parse_message_type;
    use crate::schema::types;
    use crate::schema::types::{ColumnDescriptor, ColumnPath};
    use crate::util::test_common::file_util::get_test_file;
    use crate::util::test_common::rand_gen::RandGen;

    #[test]
    fn test_row_group_writer_error_not_all_columns_written() {
        let file = tempfile::tempfile().unwrap();
        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![Arc::new(
                    types::Type::primitive_type_builder("col1", Type::INT32)
                        .build()
                        .unwrap(),
                )])
                .build()
                .unwrap(),
        );
        let props = Default::default();
        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
        let row_group_writer = writer.next_row_group().unwrap();
        let res = row_group_writer.close();
        assert!(res.is_err());
        if let Err(err) = res {
            assert_eq!(
                format!("{err}"),
                "Parquet error: Column length mismatch: 1 != 0"
            );
        }
    }

    #[test]
    fn test_row_group_writer_num_records_mismatch() {
        let file = tempfile::tempfile().unwrap();
        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![
                    Arc::new(
                        types::Type::primitive_type_builder("col1", Type::INT32)
                            .with_repetition(Repetition::REQUIRED)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        types::Type::primitive_type_builder("col2", Type::INT32)
                            .with_repetition(Repetition::REQUIRED)
                            .build()
                            .unwrap(),
                    ),
                ])
                .build()
                .unwrap(),
        );
        let props = Default::default();
        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 2, 3], None, None)
            .unwrap();
        col_writer.close().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 2], None, None)
            .unwrap();

        let err = col_writer.close().unwrap_err();
        assert_eq!(
            err.to_string(),
            "Parquet error: Incorrect number of rows, expected 3 != 2 rows"
        );
    }

    #[test]
    fn test_file_writer_empty_file() {
        let file = tempfile::tempfile().unwrap();

        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![Arc::new(
                    types::Type::primitive_type_builder("col1", Type::INT32)
                        .build()
                        .unwrap(),
                )])
                .build()
                .unwrap(),
        );
        let props = Default::default();
        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file).unwrap();
        assert_eq!(reader.get_row_iter(None).unwrap().count(), 0);
    }

    #[test]
    fn test_file_writer_column_orders_populated() {
        let file = tempfile::tempfile().unwrap();

        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![
                    Arc::new(
                        types::Type::primitive_type_builder("col1", Type::INT32)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        types::Type::primitive_type_builder("col2", Type::FIXED_LEN_BYTE_ARRAY)
                            .with_converted_type(ConvertedType::INTERVAL)
                            .with_length(12)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        types::Type::group_type_builder("nested")
                            .with_repetition(Repetition::REQUIRED)
                            .with_fields(vec![
                                Arc::new(
                                    types::Type::primitive_type_builder(
                                        "col3",
                                        Type::FIXED_LEN_BYTE_ARRAY,
                                    )
                                    .with_logical_type(Some(LogicalType::Float16))
                                    .with_length(2)
                                    .build()
                                    .unwrap(),
                                ),
                                Arc::new(
                                    types::Type::primitive_type_builder("col4", Type::BYTE_ARRAY)
                                        .with_logical_type(Some(LogicalType::String))
                                        .build()
                                        .unwrap(),
                                ),
                            ])
                            .build()
                            .unwrap(),
                    ),
                ])
                .build()
                .unwrap(),
        );

        let props = Default::default();
        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file).unwrap();

        // only leaves
        let expected = vec![
            // INT32
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
            // INTERVAL
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED),
            // Float16
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
            // String
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED),
        ];
        let actual = reader.metadata().file_metadata().column_orders();

        assert!(actual.is_some());
        let actual = actual.unwrap();
        assert_eq!(*actual, expected);
    }

    #[test]
    fn test_file_writer_with_metadata() {
        let file = tempfile::tempfile().unwrap();

        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![Arc::new(
                    types::Type::primitive_type_builder("col1", Type::INT32)
                        .build()
                        .unwrap(),
                )])
                .build()
                .unwrap(),
        );
        let props = Arc::new(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![KeyValue::new(
                    "key".to_string(),
                    "value".to_string(),
                )]))
                .build(),
        );
        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file).unwrap();
        assert_eq!(
            reader
                .metadata()
                .file_metadata()
                .key_value_metadata()
                .to_owned()
                .unwrap()
                .len(),
            1
        );
    }

    #[test]
    fn test_file_writer_v2_with_metadata() {
        let file = tempfile::tempfile().unwrap();
        let field_logical_type = Some(LogicalType::Integer {
            bit_width: 8,
            is_signed: false,
        });
        let field = Arc::new(
            types::Type::primitive_type_builder("col1", Type::INT32)
                .with_logical_type(field_logical_type.clone())
                .with_converted_type(field_logical_type.into())
                .build()
                .unwrap(),
        );
        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![field.clone()])
                .build()
                .unwrap(),
        );
        let props = Arc::new(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![KeyValue::new(
                    "key".to_string(),
                    "value".to_string(),
                )]))
                .set_writer_version(WriterVersion::PARQUET_2_0)
                .build(),
        );
        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file).unwrap();

        assert_eq!(
            reader
                .metadata()
                .file_metadata()
                .key_value_metadata()
                .to_owned()
                .unwrap()
                .len(),
            1
        );

        // ARROW-11803: Test that the converted and logical types have been populated
        let fields = reader.metadata().file_metadata().schema().get_fields();
        assert_eq!(fields.len(), 1);
        assert_eq!(fields[0], field);
    }

    #[test]
    fn test_file_writer_with_sorting_columns_metadata() {
        let file = tempfile::tempfile().unwrap();

        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![
                    Arc::new(
                        types::Type::primitive_type_builder("col1", Type::INT32)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        types::Type::primitive_type_builder("col2", Type::INT32)
                            .build()
                            .unwrap(),
                    ),
                ])
                .build()
                .unwrap(),
        );
        let expected_result = Some(vec![SortingColumn {
            column_idx: 0,
            descending: false,
            nulls_first: true,
        }]);
        let props = Arc::new(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![KeyValue::new(
                    "key".to_string(),
                    "value".to_string(),
                )]))
                .set_sorting_columns(expected_result.clone())
                .build(),
        );
        let mut writer =
            SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().expect("get row group writer");

        let col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer.close().unwrap();

        let col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer.close().unwrap();

        row_group_writer.close().unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file).unwrap();
        let result: Vec<Option<&Vec<SortingColumn>>> = reader
            .metadata()
            .row_groups()
            .iter()
            .map(|f| f.sorting_columns())
            .collect();
        // validate that the sorting columns read match the ones written above
        assert_eq!(expected_result.as_ref(), result[0]);
    }

    #[test]
    fn test_file_writer_empty_row_groups() {
        let file = tempfile::tempfile().unwrap();
        test_file_roundtrip(file, vec![]);
    }

    #[test]
    fn test_file_writer_single_row_group() {
        let file = tempfile::tempfile().unwrap();
        test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
    }

    #[test]
    fn test_file_writer_multiple_row_groups() {
        let file = tempfile::tempfile().unwrap();
        test_file_roundtrip(
            file,
            vec![
                vec![1, 2, 3, 4, 5],
                vec![1, 2, 3],
                vec![1],
                vec![1, 2, 3, 4, 5, 6],
            ],
        );
    }

    #[test]
    fn test_file_writer_multiple_large_row_groups() {
        let file = tempfile::tempfile().unwrap();
        test_file_roundtrip(
            file,
            vec![vec![123; 1024], vec![124; 1000], vec![125; 15], vec![]],
        );
    }

    #[test]
    fn test_page_writer_data_pages() {
        let pages = [
            Page::DataPage {
                buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
                num_values: 10,
                encoding: Encoding::DELTA_BINARY_PACKED,
                def_level_encoding: Encoding::RLE,
                rep_level_encoding: Encoding::RLE,
                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
            },
            Page::DataPageV2 {
                buf: Bytes::from(vec![4; 128]),
                num_values: 10,
                encoding: Encoding::DELTA_BINARY_PACKED,
                num_nulls: 2,
                num_rows: 12,
                def_levels_byte_len: 24,
                rep_levels_byte_len: 32,
                is_compressed: false,
                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
            },
        ];

        test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
        test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
    }

    #[test]
    fn test_page_writer_dict_pages() {
        let pages = [
            Page::DictionaryPage {
                buf: Bytes::from(vec![1, 2, 3, 4, 5]),
                num_values: 5,
                encoding: Encoding::RLE_DICTIONARY,
                is_sorted: false,
            },
            Page::DataPage {
                buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
                num_values: 10,
                encoding: Encoding::DELTA_BINARY_PACKED,
                def_level_encoding: Encoding::RLE,
                rep_level_encoding: Encoding::RLE,
                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
            },
            Page::DataPageV2 {
                buf: Bytes::from(vec![4; 128]),
                num_values: 10,
                encoding: Encoding::DELTA_BINARY_PACKED,
                num_nulls: 2,
                num_rows: 12,
                def_levels_byte_len: 24,
                rep_levels_byte_len: 32,
                is_compressed: false,
                statistics: None,
            },
        ];

        test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
        test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
    }
1486
1487    /// Tests writing and reading pages.
1488    /// Physical type is for statistics only, should match any defined statistics type in
1489    /// pages.
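    ///
    /// The pages are compressed with `codec`, written to an in-memory buffer via
    /// `SerializedPageWriter`, then read back with `SerializedPageReader` and
    /// compared against the originals.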
    fn test_page_roundtrip(pages: &[Page], codec: Compression, physical_type: Type) {
        let mut compressed_pages = vec![];
        let mut total_num_values = 0i64;
        let codec_options = CodecOptionsBuilder::default()
            .set_backward_compatible_lz4(false)
            .build();
        let mut compressor = create_codec(codec, &codec_options).unwrap();

        for page in pages {
            let uncompressed_len = page.buffer().len();

            let compressed_page = match *page {
                Page::DataPage {
                    ref buf,
                    num_values,
                    encoding,
                    def_level_encoding,
                    rep_level_encoding,
                    ref statistics,
                } => {
                    total_num_values += num_values as i64;
                    let output_buf = compress_helper(compressor.as_mut(), buf);

                    Page::DataPage {
                        buf: Bytes::from(output_buf),
                        num_values,
                        encoding,
                        def_level_encoding,
                        rep_level_encoding,
                        statistics: from_thrift_page_stats(
                            physical_type,
                            page_stats_to_thrift(statistics.as_ref()),
                        )
                        .unwrap(),
                    }
                }
                Page::DataPageV2 {
                    ref buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    ref statistics,
                    ..
                } => {
                    total_num_values += num_values as i64;
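                    // In v2 data pages the repetition and definition levels are stored
                    // uncompressed at the front of the buffer, so only the values
                    // section that follows them is compressed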
                    let offset = (def_levels_byte_len + rep_levels_byte_len) as usize;
                    let cmp_buf = compress_helper(compressor.as_mut(), &buf[offset..]);
                    let mut output_buf = Vec::from(&buf[..offset]);
                    output_buf.extend_from_slice(&cmp_buf[..]);

                    Page::DataPageV2 {
                        buf: Bytes::from(output_buf),
                        num_values,
                        encoding,
                        num_nulls,
                        num_rows,
                        def_levels_byte_len,
                        rep_levels_byte_len,
                        is_compressed: compressor.is_some(),
                        statistics: from_thrift_page_stats(
                            physical_type,
                            page_stats_to_thrift(statistics.as_ref()),
                        )
                        .unwrap(),
                    }
                }
                Page::DictionaryPage {
                    ref buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    let output_buf = compress_helper(compressor.as_mut(), buf);

                    Page::DictionaryPage {
                        buf: Bytes::from(output_buf),
                        num_values,
                        encoding,
                        is_sorted,
                    }
                }
            };

            let compressed_page = CompressedPage::new(compressed_page, uncompressed_len);
            compressed_pages.push(compressed_page);
        }

        let mut buffer: Vec<u8> = vec![];
        let mut result_pages: Vec<Page> = vec![];
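        // Scope the writer so its mutable borrow of `buffer` ends before the buffer
        // is read back below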
        {
            let mut writer = TrackedWrite::new(&mut buffer);
            let mut page_writer = SerializedPageWriter::new(&mut writer);

            for page in compressed_pages {
                page_writer.write_page(page).unwrap();
            }
            page_writer.close().unwrap();
        }
        {
            let reader = bytes::Bytes::from(buffer);

            let t = types::Type::primitive_type_builder("t", physical_type)
                .build()
                .unwrap();

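            // Build just enough column chunk metadata for the page reader: the codec,
            // the total compressed size, and the number of values to expect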
            let desc = ColumnDescriptor::new(Arc::new(t), 0, 0, ColumnPath::new(vec![]));
            let meta = ColumnChunkMetaData::builder(Arc::new(desc))
                .set_compression_codec(codec.into())
                .set_total_compressed_size(reader.len() as i64)
                .set_num_values(total_num_values)
                .build()
                .unwrap();

            let props = ReaderProperties::builder()
                .set_backward_compatible_lz4(false)
                .set_read_page_statistics(true)
                .build();
            let mut page_reader = SerializedPageReader::new_with_properties(
                Arc::new(reader),
                &meta,
                total_num_values as usize,
                None,
                Arc::new(props),
            )
            .unwrap();

            while let Some(page) = page_reader.get_next_page().unwrap() {
                result_pages.push(page);
            }
        }

        assert_eq!(result_pages.len(), pages.len());
        for i in 0..result_pages.len() {
            assert_page(&result_pages[i], &pages[i]);
        }
    }

    /// Helper function to compress a slice
    fn compress_helper(compressor: Option<&mut Box<dyn Codec>>, data: &[u8]) -> Vec<u8> {
        let mut output_buf = vec![];
        if let Some(cmpr) = compressor {
            cmpr.compress(data, &mut output_buf).unwrap();
        } else {
            output_buf.extend_from_slice(data);
        }
        output_buf
    }

    /// Asserts that two pages match.
    fn assert_page(left: &Page, right: &Page) {
        assert_eq!(left.page_type(), right.page_type());
        assert_eq!(&left.buffer(), &right.buffer());
        assert_eq!(left.num_values(), right.num_values());
        assert_eq!(left.encoding(), right.encoding());
        assert_eq!(
            page_stats_to_thrift(left.statistics()),
            page_stats_to_thrift(right.statistics())
        );
    }

    /// Tests roundtrip of i32 data written using `W` and read using `R`
    fn test_roundtrip_i32<W, R>(
        file: W,
        data: Vec<Vec<i32>>,
        compression: Compression,
    ) -> ParquetMetaData
    where
        W: Write + Send,
        R: ChunkReader + From<W> + 'static,
    {
        test_roundtrip::<W, R, Int32Type, _>(file, data, |r| r.get_int(0).unwrap(), compression)
    }

    /// Tests roundtrip of data of type `D` written using `W` and read using `R`
    /// and the provided `value` function
    fn test_roundtrip<W, R, D, F>(
        mut file: W,
        data: Vec<Vec<D::T>>,
        value: F,
        compression: Compression,
    ) -> ParquetMetaData
    where
        W: Write + Send,
        R: ChunkReader + From<W> + 'static,
        D: DataType,
        F: Fn(Row) -> D::T,
    {
        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![Arc::new(
                    types::Type::primitive_type_builder("col1", D::get_physical_type())
                        .with_repetition(Repetition::REQUIRED)
                        .build()
                        .unwrap(),
                )])
                .build()
                .unwrap(),
        );
        let props = Arc::new(
            WriterProperties::builder()
                .set_compression(compression)
                .build(),
        );
        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
        let mut rows: i64 = 0;

        for (idx, subset) in data.iter().enumerate() {
            let row_group_file_offset = file_writer.buf.bytes_written();
            let mut row_group_writer = file_writer.next_row_group().unwrap();
            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
                rows += writer
                    .typed::<D>()
                    .write_batch(&subset[..], None, None)
                    .unwrap() as i64;
                writer.close().unwrap();
            }
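            // Closing the row group flushes it to the file; verify that the returned
            // metadata matches what `flushed_row_groups` reports and that the ordinal
            // and file offset were recorded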
            let last_group = row_group_writer.close().unwrap();
            let flushed = file_writer.flushed_row_groups();
            assert_eq!(flushed.len(), idx + 1);
            assert_eq!(Some(idx as i16), last_group.ordinal());
            assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset());
            assert_eq!(&flushed[idx], last_group.as_ref());
        }
        let file_metadata = file_writer.close().unwrap();

        let reader = SerializedFileReader::new(R::from(file)).unwrap();
        assert_eq!(reader.num_row_groups(), data.len());
        assert_eq!(
            reader.metadata().file_metadata().num_rows(),
            rows,
            "row count in metadata not equal to number of rows written"
        );
        for (i, item) in data.iter().enumerate().take(reader.num_row_groups()) {
            let row_group_reader = reader.get_row_group(i).unwrap();
            let iter = row_group_reader.get_row_iter(None).unwrap();
            let res: Vec<_> = iter.map(|row| row.unwrap()).map(&value).collect();
            let row_group_size = row_group_reader.metadata().total_byte_size();
            let uncompressed_size: i64 = row_group_reader
                .metadata()
                .columns()
                .iter()
                .map(|v| v.uncompressed_size())
                .sum();
            assert_eq!(row_group_size, uncompressed_size);
            assert_eq!(res, *item);
        }
        file_metadata
    }

    /// File write-read roundtrip.
    /// `data` consists of arrays of values for each row group.
    fn test_file_roundtrip(file: File, data: Vec<Vec<i32>>) -> ParquetMetaData {
        test_roundtrip_i32::<File, File>(file, data, Compression::UNCOMPRESSED)
    }

    #[test]
    fn test_bytes_writer_empty_row_groups() {
        test_bytes_roundtrip(vec![], Compression::UNCOMPRESSED);
    }

    #[test]
    fn test_bytes_writer_single_row_group() {
        test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::UNCOMPRESSED);
    }

    #[test]
    fn test_bytes_writer_multiple_row_groups() {
        test_bytes_roundtrip(
            vec![
                vec![1, 2, 3, 4, 5],
                vec![1, 2, 3],
                vec![1],
                vec![1, 2, 3, 4, 5, 6],
            ],
            Compression::UNCOMPRESSED,
        );
    }

    #[test]
    fn test_bytes_writer_single_row_group_compressed() {
        test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::SNAPPY);
    }

    #[test]
    fn test_bytes_writer_multiple_row_groups_compressed() {
        test_bytes_roundtrip(
            vec![
                vec![1, 2, 3, 4, 5],
                vec![1, 2, 3],
                vec![1],
                vec![1, 2, 3, 4, 5, 6],
            ],
            Compression::SNAPPY,
        );
    }

    fn test_bytes_roundtrip(data: Vec<Vec<i32>>, compression: Compression) {
        test_roundtrip_i32::<Vec<u8>, Bytes>(Vec::with_capacity(1024), data, compression);
    }

    #[test]
    fn test_boolean_roundtrip() {
        let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
        test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
            Vec::with_capacity(1024),
            vec![my_bool_values],
            |r| r.get_bool(0).unwrap(),
            Compression::UNCOMPRESSED,
        );
    }

    #[test]
    fn test_boolean_compressed_roundtrip() {
        let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
        test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
            Vec::with_capacity(1024),
            vec![my_bool_values],
            |r| r.get_bool(0).unwrap(),
            Compression::SNAPPY,
        );
    }

    #[test]
    fn test_column_offset_index_file() {
        let file = tempfile::tempfile().unwrap();
        let file_metadata = test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
        file_metadata.row_groups().iter().for_each(|row_group| {
            row_group.columns().iter().for_each(|column_chunk| {
                assert!(column_chunk.column_index_offset().is_some());
                assert!(column_chunk.column_index_length().is_some());
                assert!(column_chunk.offset_index_offset().is_some());
                assert!(column_chunk.offset_index_length().is_some());
            })
        });
    }

    fn test_kv_metadata(initial_kv: Option<Vec<KeyValue>>, final_kv: Option<Vec<KeyValue>>) {
        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![Arc::new(
                    types::Type::primitive_type_builder("col1", Type::INT32)
                        .with_repetition(Repetition::REQUIRED)
                        .build()
                        .unwrap(),
                )])
                .build()
                .unwrap(),
        );
        let mut out = Vec::with_capacity(1024);
        let props = Arc::new(
            WriterProperties::builder()
                .set_key_value_metadata(initial_kv.clone())
                .build(),
        );
        let mut writer = SerializedFileWriter::new(&mut out, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();
        let column = row_group_writer.next_column().unwrap().unwrap();
        column.close().unwrap();
        row_group_writer.close().unwrap();
        if let Some(kvs) = &final_kv {
            for kv in kvs {
                writer.append_key_value_metadata(kv.clone())
            }
        }
        writer.close().unwrap();

        let reader = SerializedFileReader::new(Bytes::from(out)).unwrap();
        let metadata = reader.metadata().file_metadata();
        let keys = metadata.key_value_metadata();

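        // Entries supplied via `WriterProperties` are written first, followed by any
        // entries added later with `append_key_value_metadata`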
        match (initial_kv, final_kv) {
            (Some(a), Some(b)) => {
                let keys = keys.unwrap();
                assert_eq!(keys.len(), a.len() + b.len());
                assert_eq!(&keys[..a.len()], a.as_slice());
                assert_eq!(&keys[a.len()..], b.as_slice());
            }
            (Some(v), None) => assert_eq!(keys.unwrap(), &v),
            (None, Some(v)) if !v.is_empty() => assert_eq!(keys.unwrap(), &v),
            _ => assert!(keys.is_none()),
        }
    }

    #[test]
    fn test_append_metadata() {
        let kv1 = KeyValue::new("cupcakes".to_string(), "awesome".to_string());
        let kv2 = KeyValue::new("bingo".to_string(), "bongo".to_string());

        test_kv_metadata(None, None);
        test_kv_metadata(Some(vec![kv1.clone()]), None);
        test_kv_metadata(None, Some(vec![kv2.clone()]));
        test_kv_metadata(Some(vec![kv1.clone()]), Some(vec![kv2.clone()]));
        test_kv_metadata(Some(vec![]), Some(vec![kv2]));
        test_kv_metadata(Some(vec![]), Some(vec![]));
        test_kv_metadata(Some(vec![kv1]), Some(vec![]));
        test_kv_metadata(None, Some(vec![]));
    }

    #[test]
    fn test_backwards_compatible_statistics() {
        let message_type = "
            message test_schema {
                REQUIRED INT32 decimal1 (DECIMAL(8,2));
                REQUIRED INT32 i32 (INTEGER(32,true));
                REQUIRED INT32 u32 (INTEGER(32,false));
            }
        ";

        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let props = Default::default();
        let mut writer = SerializedFileWriter::new(vec![], schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        for _ in 0..3 {
            let mut writer = row_group_writer.next_column().unwrap().unwrap();
            writer
                .typed::<Int32Type>()
                .write_batch(&[1, 2, 3], None, None)
                .unwrap();
            writer.close().unwrap();
        }
        let metadata = row_group_writer.close().unwrap();
        writer.close().unwrap();

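        // The deprecated `min`/`max` statistics fields are only written when the
        // column's sort order is signed; the unsigned u32 column below therefore
        // gets only the newer `min_value`/`max_value` fields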
        // decimal
        let s = page_stats_to_thrift(metadata.column(0).statistics()).unwrap();
        assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
        assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));

        // i32
        let s = page_stats_to_thrift(metadata.column(1).statistics()).unwrap();
        assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
        assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));

        // u32
        let s = page_stats_to_thrift(metadata.column(2).statistics()).unwrap();
        assert_eq!(s.min.as_deref(), None);
        assert_eq!(s.max.as_deref(), None);
        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
    }

    #[test]
    fn test_spliced_write() {
        let message_type = "
            message test_schema {
                REQUIRED INT32 i32 (INTEGER(32,true));
                REQUIRED INT32 u32 (INTEGER(32,false));
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let props = Arc::new(WriterProperties::builder().build());

        let mut file = Vec::with_capacity(1024);
        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props.clone()).unwrap();

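        // Encode each column into its own in-memory buffer, then splice the encoded
        // bytes into the row group with `append_column` below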
        let columns = file_writer.descr.columns();
        let mut column_state: Vec<(_, Option<ColumnCloseResult>)> = columns
            .iter()
            .map(|_| (TrackedWrite::new(Vec::with_capacity(1024)), None))
            .collect();

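        // Repeatedly splitting the slice hands each iteration a disjoint `&mut` into
        // `column_state`, so every on-close callback can record its
        // `ColumnCloseResult` without aliasing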
        let mut column_state_slice = column_state.as_mut_slice();
        let mut column_writers = Vec::with_capacity(columns.len());
        for c in columns {
            let ((buf, out), tail) = column_state_slice.split_first_mut().unwrap();
            column_state_slice = tail;

            let page_writer = Box::new(SerializedPageWriter::new(buf));
            let col_writer = get_column_writer(c.clone(), props.clone(), page_writer);
            column_writers.push(SerializedColumnWriter::new(
                col_writer,
                Some(Box::new(|on_close| {
                    *out = Some(on_close);
                    Ok(())
                })),
            ));
        }

        let column_data = [[1, 2, 3, 4], [7, 3, 7, 3]];

        // Interleaved writing to the column writers
        for (writer, batch) in column_writers.iter_mut().zip(column_data) {
            let writer = writer.typed::<Int32Type>();
            writer.write_batch(&batch, None, None).unwrap();
        }

        // Close the column writers
        for writer in column_writers {
            writer.close().unwrap()
        }

        // Splice column data into a row group
        let mut row_group_writer = file_writer.next_row_group().unwrap();
        for (write, close) in column_state {
            let buf = Bytes::from(write.into_inner().unwrap());
            row_group_writer
                .append_column(&buf, close.unwrap())
                .unwrap();
        }
        row_group_writer.close().unwrap();
        file_writer.close().unwrap();

        // Check data was written correctly
        let file = Bytes::from(file);
        let test_read = |reader: SerializedFileReader<Bytes>| {
            let row_group = reader.get_row_group(0).unwrap();

            let mut out = Vec::with_capacity(4);
            let c1 = row_group.get_column_reader(0).unwrap();
            let mut c1 = get_typed_column_reader::<Int32Type>(c1);
            c1.read_records(4, None, None, &mut out).unwrap();
            assert_eq!(out, column_data[0]);

            out.clear();

            let c2 = row_group.get_column_reader(1).unwrap();
            let mut c2 = get_typed_column_reader::<Int32Type>(c2);
            c2.read_records(4, None, None, &mut out).unwrap();
            assert_eq!(out, column_data[1]);
        };

        let reader = SerializedFileReader::new(file.clone()).unwrap();
        test_read(reader);

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
        test_read(reader);
    }

    #[test]
    fn test_disabled_statistics() {
        let message_type = "
            message test_schema {
                REQUIRED INT32 a;
                REQUIRED INT32 b;
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
            .set_offset_index_disabled(true) // ignored because page statistics are enabled for column "a" above
            .build();
        let mut file = Vec::with_capacity(1024);
        let mut file_writer =
            SerializedFileWriter::new(&mut file, schema, Arc::new(props)).unwrap();

        let mut row_group_writer = file_writer.next_row_group().unwrap();
        let mut a_writer = row_group_writer.next_column().unwrap().unwrap();
        let col_writer = a_writer.typed::<Int32Type>();
        col_writer.write_batch(&[1, 2, 3], None, None).unwrap();
        a_writer.close().unwrap();

        let mut b_writer = row_group_writer.next_column().unwrap().unwrap();
        let col_writer = b_writer.typed::<Int32Type>();
        col_writer.write_batch(&[4, 5, 6], None, None).unwrap();
        b_writer.close().unwrap();
        row_group_writer.close().unwrap();

        let metadata = file_writer.finish().unwrap();
        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        assert_eq!(row_group.num_columns(), 2);
        // Column "a" has both offset and column index, as requested
        assert!(row_group.column(0).offset_index_offset().is_some());
        assert!(row_group.column(0).column_index_offset().is_some());
        // Column "b" should only have an offset index
        assert!(row_group.column(1).offset_index_offset().is_some());
        assert!(row_group.column(1).column_index_offset().is_none());

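        // Once `finish` has been called the writer cannot start new row groups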
        let err = file_writer.next_row_group().err().unwrap().to_string();
        assert_eq!(err, "Parquet error: SerializedFileWriter already finished");

        drop(file_writer);

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(Bytes::from(file), options).unwrap();

        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1); // 1 row group
        assert_eq!(offset_index[0].len(), 2); // 2 columns

        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1); // 1 row group
        assert_eq!(column_index[0].len(), 2); // 2 columns

        let a_idx = &column_index[0][0];
        assert!(matches!(a_idx, ColumnIndexMetaData::INT32(_)), "{a_idx:?}");
        let b_idx = &column_index[0][1];
        assert!(matches!(b_idx, ColumnIndexMetaData::NONE), "{b_idx:?}");
    }

    #[test]
    fn test_byte_array_size_statistics() {
        let message_type = "
            message test_schema {
                OPTIONAL BYTE_ARRAY a (UTF8);
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let data = ByteArrayType::gen_vec(32, 7);
        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
        let unenc_size: i64 = data.iter().map(|x| x.len() as i64).sum();
        let file: File = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        );

        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<ByteArrayType>()
            .write_batch(&data, Some(&def_levels), None)
            .unwrap();
        col_writer.close().unwrap();
        row_group_writer.close().unwrap();
        let file_metadata = writer.close().unwrap();

        assert_eq!(file_metadata.num_row_groups(), 1);
        assert_eq!(file_metadata.row_group(0).num_columns(), 1);

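        // The definition levels above contain three 0s (nulls) and seven 1s (defined
        // values), which is exactly what the definition level histogram should record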
        let check_def_hist = |def_hist: &[i64]| {
            assert_eq!(def_hist.len(), 2);
            assert_eq!(def_hist[0], 3);
            assert_eq!(def_hist[1], 7);
        };

        let meta_data = file_metadata.row_group(0).column(0);

        assert!(meta_data.repetition_level_histogram().is_none());
        assert!(meta_data.definition_level_histogram().is_some());
        assert!(meta_data.unencoded_byte_array_data_bytes().is_some());
        assert_eq!(
            unenc_size,
            meta_data.unencoded_byte_array_data_bytes().unwrap()
        );
        check_def_hist(meta_data.definition_level_histogram().unwrap().values());

        // check that the read metadata is also correct
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rfile_metadata = reader.metadata().file_metadata();
        assert_eq!(
            rfile_metadata.num_rows(),
            file_metadata.file_metadata().num_rows()
        );
        assert_eq!(reader.num_row_groups(), 1);
        let rowgroup = reader.get_row_group(0).unwrap();
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.definition_level_histogram().is_some());
        assert!(column.repetition_level_histogram().is_none());
        assert!(column.unencoded_byte_array_data_bytes().is_some());
        check_def_hist(column.definition_level_histogram().unwrap().values());
        assert_eq!(
            unenc_size,
            column.unencoded_byte_array_data_bytes().unwrap()
        );

        // check histogram in column index as well
        assert!(reader.metadata().column_index().is_some());
        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1);
        assert_eq!(column_index[0].len(), 1);
        let col_idx = if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][0] {
            assert_eq!(index.num_pages(), 1);
            index
        } else {
            unreachable!()
        };

        assert!(col_idx.repetition_level_histogram(0).is_none());
        assert!(col_idx.definition_level_histogram(0).is_some());
        check_def_hist(col_idx.definition_level_histogram(0).unwrap());

        assert!(reader.metadata().offset_index().is_some());
        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1);
        assert_eq!(offset_index[0].len(), 1);
        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_some());
        let page_sizes = offset_index[0][0]
            .unencoded_byte_array_data_bytes
            .as_ref()
            .unwrap();
        assert_eq!(page_sizes.len(), 1);
        assert_eq!(page_sizes[0], unenc_size);
    }

    #[test]
    fn test_too_many_rowgroups() {
        let message_type = "
            message test_schema {
                REQUIRED BYTE_ARRAY a (UTF8);
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let file: File = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::None)
                .set_max_row_group_row_count(Some(1))
                .build(),
        );
        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();

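        // The limit exists because the row group ordinal is written as an i16 (note
        // the `idx as i16` assertion in `test_roundtrip` above), so a file can hold
        // at most i16::MAX = 32767 row groups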
        // Attempt to create 32769 empty row groups; creation should fail once i == 0x8000 (32768)
        for i in 0..0x8001 {
            match writer.next_row_group() {
                Ok(mut row_group_writer) => {
                    assert_ne!(i, 0x8000);
                    let col_writer = row_group_writer.next_column().unwrap().unwrap();
                    col_writer.close().unwrap();
                    row_group_writer.close().unwrap();
                }
                Err(e) => {
                    assert_eq!(i, 0x8000);
                    assert_eq!(
                        e.to_string(),
                        "Parquet error: Parquet does not support more than 32767 row groups per file (currently: 32768)"
                    );
                }
            }
        }
        writer.close().unwrap();
    }

    #[test]
    fn test_size_statistics_with_repetition_and_nulls() {
        let message_type = "
            message test_schema {
                OPTIONAL group i32_list (LIST) {
                    REPEATED group list {
                        OPTIONAL INT32 element;
                    }
                }
            }
        ";
        // column is:
        // row 0: [1, 2]
        // row 1: NULL
        // row 2: [4, NULL]
        // row 3: []
        // row 4: [7, 8, 9, 10]
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let data = [1, 2, 4, 7, 8, 9, 10];
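        // With this schema the maximum definition level is 3 and the maximum
        // repetition level is 1: def 0 = null list, 1 = empty list, 2 = null
        // element, 3 = present element; rep 0 starts a new row, rep 1 continues one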
        let def_levels = [3, 3, 0, 3, 2, 1, 3, 3, 3, 3];
        let rep_levels = [0, 1, 0, 0, 1, 0, 0, 1, 1, 1];
        let file = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        );
        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&data, Some(&def_levels), Some(&rep_levels))
            .unwrap();
        col_writer.close().unwrap();
        row_group_writer.close().unwrap();
        let file_metadata = writer.close().unwrap();

        assert_eq!(file_metadata.num_row_groups(), 1);
        assert_eq!(file_metadata.row_group(0).num_columns(), 1);

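        // The histograms should count one null list, one empty list, one null
        // element, and seven present elements; five levels start new rows and five
        // continue an existing row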
        let check_def_hist = |def_hist: &[i64]| {
            assert_eq!(def_hist.len(), 4);
            assert_eq!(def_hist[0], 1);
            assert_eq!(def_hist[1], 1);
            assert_eq!(def_hist[2], 1);
            assert_eq!(def_hist[3], 7);
        };

        let check_rep_hist = |rep_hist: &[i64]| {
            assert_eq!(rep_hist.len(), 2);
            assert_eq!(rep_hist[0], 5);
            assert_eq!(rep_hist[1], 5);
        };

        // check that histograms are set properly in the write and read metadata
        // also check that unencoded_byte_array_data_bytes is not set
        let meta_data = file_metadata.row_group(0).column(0);
        assert!(meta_data.repetition_level_histogram().is_some());
        assert!(meta_data.definition_level_histogram().is_some());
        assert!(meta_data.unencoded_byte_array_data_bytes().is_none());
        check_def_hist(meta_data.definition_level_histogram().unwrap().values());
        check_rep_hist(meta_data.repetition_level_histogram().unwrap().values());

        // check that the read metadata is also correct
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rfile_metadata = reader.metadata().file_metadata();
        assert_eq!(
            rfile_metadata.num_rows(),
            file_metadata.file_metadata().num_rows()
        );
        assert_eq!(reader.num_row_groups(), 1);
        let rowgroup = reader.get_row_group(0).unwrap();
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.definition_level_histogram().is_some());
        assert!(column.repetition_level_histogram().is_some());
        assert!(column.unencoded_byte_array_data_bytes().is_none());
        check_def_hist(column.definition_level_histogram().unwrap().values());
        check_rep_hist(column.repetition_level_histogram().unwrap().values());

        // check histogram in column index as well
        assert!(reader.metadata().column_index().is_some());
        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1);
        assert_eq!(column_index[0].len(), 1);
        let col_idx = if let ColumnIndexMetaData::INT32(index) = &column_index[0][0] {
            assert_eq!(index.num_pages(), 1);
            index
        } else {
            unreachable!()
        };

        check_def_hist(col_idx.definition_level_histogram(0).unwrap());
        check_rep_hist(col_idx.repetition_level_histogram(0).unwrap());

        assert!(reader.metadata().offset_index().is_some());
        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1);
        assert_eq!(offset_index[0].len(), 1);
        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_none());
    }

    #[test]
    #[cfg(feature = "arrow")]
    fn test_byte_stream_split_extended_roundtrip() {
        let path = format!(
            "{}/byte_stream_split_extended.gzip.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();

        // Read in the test file and rewrite it to a temporary file
        let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .expect("parquet open")
            .build()
            .expect("parquet open");

        let file = tempfile::tempfile().unwrap();
        let props = WriterProperties::builder()
            .set_dictionary_enabled(false)
            .set_column_encoding(
                ColumnPath::from("float16_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("float_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("double_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("int32_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("int64_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("flba5_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .set_column_encoding(
                ColumnPath::from("decimal_byte_stream_split"),
                Encoding::BYTE_STREAM_SPLIT,
            )
            .build();

        let mut parquet_writer = ArrowWriter::try_new(
            file.try_clone().expect("cannot open file"),
            parquet_reader.schema(),
            Some(props),
        )
        .expect("create arrow writer");

        for maybe_batch in parquet_reader {
            let batch = maybe_batch.expect("reading batch");
            parquet_writer.write(&batch).expect("writing data");
        }

        parquet_writer.close().expect("finalizing file");

        let reader = SerializedFileReader::new(file).expect("Failed to create reader");
        let filemeta = reader.metadata();

        // Make sure BYTE_STREAM_SPLIT encoding was used for the odd-numbered columns
        let check_encoding = |x: usize, filemeta: &ParquetMetaData| {
            assert!(
                filemeta
                    .row_group(0)
                    .column(x)
                    .encodings()
                    .collect::<Vec<_>>()
                    .contains(&Encoding::BYTE_STREAM_SPLIT)
            );
        };

        check_encoding(1, filemeta);
        check_encoding(3, filemeta);
        check_encoding(5, filemeta);
        check_encoding(7, filemeta);
        check_encoding(9, filemeta);
        check_encoding(11, filemeta);
        check_encoding(13, filemeta);

        // Read back the temporary file and make sure all values are correct
        let mut iter = reader
            .get_row_iter(None)
            .expect("Failed to create row iterator");

        let mut start = 0;
        let end = reader.metadata().file_metadata().num_rows();

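        // The test file stores each value twice, in a plain-encoded column and in a
        // *_byte_stream_split sibling; every row must agree across each pair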
        let check_row = |row: Result<Row, ParquetError>| {
            assert!(row.is_ok());
            let r = row.unwrap();
            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
        };

        while start < end {
            match iter.next() {
                Some(row) => check_row(row),
                None => break,
            };
            start += 1;
        }
    }

    #[test]
    fn test_rewrite_no_page_indexes() {
        let file = get_test_file("alltypes_tiny_pages.parquet");
        let metadata = ParquetMetaDataReader::new()
            .with_page_index_policy(PageIndexPolicy::Optional)
            .parse_and_finish(&file)
            .unwrap();

        let props = Arc::new(WriterProperties::builder().build());
        let schema = metadata.file_metadata().schema_descr().root_schema_ptr();
        let output = Vec::<u8>::new();
        let mut writer = SerializedFileWriter::new(output, schema, props).unwrap();

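        // `append_column` copies the raw column chunk bytes straight from the source
        // file; leaving `column_index`/`offset_index` as `None` drops the page
        // indexes from the rewritten file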
        for rg in metadata.row_groups() {
            let mut rg_out = writer.next_row_group().unwrap();
            for column in rg.columns() {
                let result = ColumnCloseResult {
                    bytes_written: column.compressed_size() as _,
                    rows_written: rg.num_rows() as _,
                    metadata: column.clone(),
                    bloom_filter: None,
                    column_index: None,
                    offset_index: None,
                };
                rg_out.append_column(&file, result).unwrap();
            }
            rg_out.close().unwrap();
        }
        writer.close().unwrap();
    }

    #[test]
    fn test_rewrite_missing_column_index() {
        // This file has an INT96 column that lacks a column index entry
        let file = get_test_file("alltypes_tiny_pages.parquet");
        let metadata = ParquetMetaDataReader::new()
            .with_page_index_policy(PageIndexPolicy::Optional)
            .parse_and_finish(&file)
            .unwrap();

        let props = Arc::new(WriterProperties::builder().build());
        let schema = metadata.file_metadata().schema_descr().root_schema_ptr();
        let output = Vec::<u8>::new();
        let mut writer = SerializedFileWriter::new(output, schema, props).unwrap();

        let column_indexes = metadata.column_index();
        let offset_indexes = metadata.offset_index();

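        // Carry over whichever per-column indexes the source file has; columns
        // without an index entry simply pass `None` through to `append_column`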
        for (rg_idx, rg) in metadata.row_groups().iter().enumerate() {
            let rg_column_indexes = column_indexes.and_then(|ci| ci.get(rg_idx));
            let rg_offset_indexes = offset_indexes.and_then(|oi| oi.get(rg_idx));
            let mut rg_out = writer.next_row_group().unwrap();
            for (col_idx, column) in rg.columns().iter().enumerate() {
                let column_index = rg_column_indexes.and_then(|row| row.get(col_idx)).cloned();
                let offset_index = rg_offset_indexes.and_then(|row| row.get(col_idx)).cloned();
                let result = ColumnCloseResult {
                    bytes_written: column.compressed_size() as _,
                    rows_written: rg.num_rows() as _,
                    metadata: column.clone(),
                    bloom_filter: None,
                    column_index,
                    offset_index,
                };
                rg_out.append_column(&file, result).unwrap();
            }
            rg_out.close().unwrap();
        }
        writer.close().unwrap();
    }
}