parquet/file/writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Contains the file writer API, providing methods to write row groups and columns
//! using row group writers and column writers respectively.
20
21use crate::bloom_filter::Sbbf;
22use crate::format as parquet;
23use crate::format::{ColumnIndex, OffsetIndex};
24use crate::thrift::TSerializable;
25use std::fmt::Debug;
26use std::io::{BufWriter, IoSlice, Read};
27use std::{io::Write, sync::Arc};
28use thrift::protocol::TCompactOutputProtocol;
29
30use crate::column::page_encryption::PageEncryptor;
31use crate::column::writer::{get_typed_column_writer_mut, ColumnCloseResult, ColumnWriterImpl};
32use crate::column::{
33    page::{CompressedPage, PageWriteSpec, PageWriter},
34    writer::{get_column_writer, ColumnWriter},
35};
36use crate::data_type::DataType;
37#[cfg(feature = "encryption")]
38use crate::encryption::encrypt::{
39    get_column_crypto_metadata, FileEncryptionProperties, FileEncryptor,
40};
41use crate::errors::{ParquetError, Result};
42use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
43use crate::file::reader::ChunkReader;
44#[cfg(feature = "encryption")]
45use crate::file::PARQUET_MAGIC_ENCR_FOOTER;
46use crate::file::{metadata::*, PARQUET_MAGIC};
47use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr};
48
49/// A wrapper around a [`Write`] that keeps track of the number
50/// of bytes that have been written. The given [`Write`] is wrapped
51/// with a [`BufWriter`] to optimize writing performance.
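///
/// # Example
///
/// A small sketch of byte tracking, using an in-memory `Vec<u8>` as the sink:
///
/// ```
/// use std::io::Write;
/// use parquet::file::writer::TrackedWrite;
///
/// let mut writer = TrackedWrite::new(Vec::new());
/// writer.write_all(b"PAR1").unwrap();
/// assert_eq!(writer.bytes_written(), 4);
/// ```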
52pub struct TrackedWrite<W: Write> {
53    inner: BufWriter<W>,
54    bytes_written: usize,
55}
56
57impl<W: Write> TrackedWrite<W> {
58    /// Create a new [`TrackedWrite`] from a [`Write`]
59    pub fn new(inner: W) -> Self {
60        let buf_write = BufWriter::new(inner);
61        Self {
62            inner: buf_write,
63            bytes_written: 0,
64        }
65    }
66
67    /// Returns the number of bytes written to this instance
68    pub fn bytes_written(&self) -> usize {
69        self.bytes_written
70    }
71
72    /// Returns a reference to the underlying writer.
73    pub fn inner(&self) -> &W {
74        self.inner.get_ref()
75    }
76
77    /// Returns a mutable reference to the underlying writer.
78    ///
    /// It is inadvisable to write directly to the underlying writer, as doing so
    /// will likely result in data corruption.
81    pub fn inner_mut(&mut self) -> &mut W {
82        self.inner.get_mut()
83    }
84
85    /// Returns the underlying writer.
86    pub fn into_inner(self) -> Result<W> {
87        self.inner.into_inner().map_err(|err| {
            ParquetError::General(format!("failed to get inner writer: {:?}", err.to_string()))
89        })
90    }
91}
92
93impl<W: Write> Write for TrackedWrite<W> {
94    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
95        let bytes = self.inner.write(buf)?;
96        self.bytes_written += bytes;
97        Ok(bytes)
98    }
99
100    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> std::io::Result<usize> {
101        let bytes = self.inner.write_vectored(bufs)?;
102        self.bytes_written += bytes;
103        Ok(bytes)
104    }
105
106    fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
107        self.inner.write_all(buf)?;
108        self.bytes_written += buf.len();
109
110        Ok(())
111    }
112
113    fn flush(&mut self) -> std::io::Result<()> {
114        self.inner.flush()
115    }
116}
117
118/// Callback invoked on closing a column chunk
119pub type OnCloseColumnChunk<'a> = Box<dyn FnOnce(ColumnCloseResult) -> Result<()> + 'a>;
120
/// Callback invoked on closing a row group; its arguments are:
///
/// - the writer used to write the row group
/// - the row group metadata
/// - the bloom filter for each column chunk
/// - the column index for each column chunk
/// - the offset index for each column chunk
126pub type OnCloseRowGroup<'a, W> = Box<
127    dyn FnOnce(
128            &'a mut TrackedWrite<W>,
129            RowGroupMetaData,
130            Vec<Option<Sbbf>>,
131            Vec<Option<ColumnIndex>>,
132            Vec<Option<OffsetIndex>>,
133        ) -> Result<()>
134        + 'a
135        + Send,
136>;
137
138// ----------------------------------------------------------------------
139// Serialized impl for file & row group writers
140
141/// Parquet file writer API.
142/// Provides methods to write row groups sequentially.
143///
/// The main workflow is as follows:
/// - Create a file writer; this will open a new file and potentially write some metadata.
/// - Request a new row group writer by calling `next_row_group`.
/// - Once finished writing a row group, close the row group writer by calling `close`.
/// - Write subsequent row groups, if necessary.
/// - After all row groups have been written, close the file writer using the `close` method.
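///
/// # Example
///
/// A minimal sketch of this workflow, writing a single row group with one
/// INT32 column (the output path is a placeholder):
///
/// ```no_run
/// use std::fs::File;
/// use std::sync::Arc;
/// use parquet::data_type::Int32Type;
/// use parquet::file::properties::WriterProperties;
/// use parquet::file::writer::SerializedFileWriter;
/// use parquet::schema::parser::parse_message_type;
///
/// let schema = Arc::new(
///     parse_message_type("message schema { REQUIRED INT32 col1; }").unwrap(),
/// );
/// let props = Arc::new(WriterProperties::builder().build());
/// let file = File::create("example.parquet").unwrap();
///
/// let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
/// let mut row_group = writer.next_row_group().unwrap();
/// while let Some(mut column) = row_group.next_column().unwrap() {
///     column
///         .typed::<Int32Type>()
///         .write_batch(&[1, 2, 3], None, None)
///         .unwrap();
///     column.close().unwrap();
/// }
/// row_group.close().unwrap();
/// writer.close().unwrap();
/// ```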
150pub struct SerializedFileWriter<W: Write> {
151    buf: TrackedWrite<W>,
152    schema: TypePtr,
153    descr: SchemaDescPtr,
154    props: WriterPropertiesPtr,
155    row_groups: Vec<RowGroupMetaData>,
156    bloom_filters: Vec<Vec<Option<Sbbf>>>,
157    column_indexes: Vec<Vec<Option<ColumnIndex>>>,
158    offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
159    row_group_index: usize,
    // kv_metadatas will be appended to the key-value metadata from `props` when `write_metadata` is called
161    kv_metadatas: Vec<KeyValue>,
162    finished: bool,
163    #[cfg(feature = "encryption")]
164    file_encryptor: Option<Arc<FileEncryptor>>,
165}
166
167impl<W: Write> Debug for SerializedFileWriter<W> {
168    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169        // implement Debug so this can be used with #[derive(Debug)]
170        // in client code rather than actually listing all the fields
171        f.debug_struct("SerializedFileWriter")
172            .field("descr", &self.descr)
173            .field("row_group_index", &self.row_group_index)
174            .field("kv_metadatas", &self.kv_metadatas)
175            .finish_non_exhaustive()
176    }
177}
178
179impl<W: Write + Send> SerializedFileWriter<W> {
180    /// Creates new file writer.
181    pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
182        let mut buf = TrackedWrite::new(buf);
183
184        let schema_descriptor = SchemaDescriptor::new(schema.clone());
185
186        #[cfg(feature = "encryption")]
187        let file_encryptor = Self::get_file_encryptor(&properties, &schema_descriptor)?;
188
189        Self::start_file(&properties, &mut buf)?;
190        Ok(Self {
191            buf,
192            schema,
193            descr: Arc::new(schema_descriptor),
194            props: properties,
195            row_groups: vec![],
196            bloom_filters: vec![],
197            column_indexes: Vec::new(),
198            offset_indexes: Vec::new(),
199            row_group_index: 0,
200            kv_metadatas: Vec::new(),
201            finished: false,
202            #[cfg(feature = "encryption")]
203            file_encryptor,
204        })
205    }
206
207    #[cfg(feature = "encryption")]
208    fn get_file_encryptor(
209        properties: &WriterPropertiesPtr,
210        schema_descriptor: &SchemaDescriptor,
211    ) -> Result<Option<Arc<FileEncryptor>>> {
212        if let Some(file_encryption_properties) = &properties.file_encryption_properties {
213            file_encryption_properties.validate_encrypted_column_names(schema_descriptor)?;
214
215            Ok(Some(Arc::new(FileEncryptor::new(
216                file_encryption_properties.clone(),
217            )?)))
218        } else {
219            Ok(None)
220        }
221    }
222
    /// Creates a new row group writer for this file writer.
    /// In case of an IO error or Thrift error, returns `Err`.
    ///
    /// There can be at most 2^15 row groups in a file, and row groups have
    /// to be written sequentially. Every time the next row group is requested, the
    /// previous row group must be finalised and closed using the `SerializedRowGroupWriter::close` method.
229    pub fn next_row_group(&mut self) -> Result<SerializedRowGroupWriter<'_, W>> {
230        self.assert_previous_writer_closed()?;
231        let ordinal = self.row_group_index;
232
233        let ordinal: i16 = ordinal.try_into().map_err(|_| {
234            ParquetError::General(format!(
235                "Parquet does not support more than {} row groups per file (currently: {})",
236                i16::MAX,
237                ordinal
238            ))
239        })?;
240
241        self.row_group_index = self
242            .row_group_index
243            .checked_add(1)
244            .expect("SerializedFileWriter::row_group_index overflowed");
245
246        let bloom_filter_position = self.properties().bloom_filter_position();
247        let row_groups = &mut self.row_groups;
248        let row_bloom_filters = &mut self.bloom_filters;
249        let row_column_indexes = &mut self.column_indexes;
250        let row_offset_indexes = &mut self.offset_indexes;
251        let on_close = move |buf,
252                             mut metadata,
253                             row_group_bloom_filter,
254                             row_group_column_index,
255                             row_group_offset_index| {
256            row_bloom_filters.push(row_group_bloom_filter);
257            row_column_indexes.push(row_group_column_index);
258            row_offset_indexes.push(row_group_offset_index);
259            // write bloom filters out immediately after the row group if requested
260            match bloom_filter_position {
261                BloomFilterPosition::AfterRowGroup => {
262                    write_bloom_filters(buf, row_bloom_filters, &mut metadata)?
263                }
264                BloomFilterPosition::End => (),
265            };
266            row_groups.push(metadata);
267            Ok(())
268        };
269
270        let row_group_writer = SerializedRowGroupWriter::new(
271            self.descr.clone(),
272            self.props.clone(),
273            &mut self.buf,
274            ordinal,
275            Some(Box::new(on_close)),
276        );
277        #[cfg(feature = "encryption")]
278        let row_group_writer = row_group_writer.with_file_encryptor(self.file_encryptor.clone());
279
280        Ok(row_group_writer)
281    }
282
283    /// Returns metadata for any flushed row groups
284    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
285        &self.row_groups
286    }
287
288    /// Close and finalize the underlying Parquet writer
289    ///
290    /// Unlike [`Self::close`] this does not consume self
291    ///
292    /// Attempting to write after calling finish will result in an error
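    ///
    /// For example (a sketch writing to an in-memory buffer):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use parquet::file::properties::WriterProperties;
    /// # use parquet::file::writer::SerializedFileWriter;
    /// # use parquet::schema::parser::parse_message_type;
    /// let schema = Arc::new(parse_message_type("message schema { REQUIRED INT32 a; }").unwrap());
    /// let mut writer = SerializedFileWriter::new(
    ///     Vec::new(),
    ///     schema,
    ///     Arc::new(WriterProperties::builder().build()),
    /// ).unwrap();
    /// writer.finish().unwrap();
    /// // further writes now fail
    /// assert!(writer.next_row_group().is_err());
    /// ```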
293    pub fn finish(&mut self) -> Result<parquet::FileMetaData> {
294        self.assert_previous_writer_closed()?;
295        let metadata = self.write_metadata()?;
296        self.buf.flush()?;
297        Ok(metadata)
298    }
299
    /// Closes and finalises the file writer, returning the file metadata.
301    pub fn close(mut self) -> Result<parquet::FileMetaData> {
302        self.finish()
303    }
304
305    /// Writes magic bytes at the beginning of the file.
306    #[cfg(not(feature = "encryption"))]
307    fn start_file(_properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
308        buf.write_all(get_file_magic())?;
309        Ok(())
310    }
311
312    /// Writes magic bytes at the beginning of the file.
313    #[cfg(feature = "encryption")]
314    fn start_file(properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
315        let magic = get_file_magic(properties.file_encryption_properties.as_ref());
316
317        buf.write_all(magic)?;
318        Ok(())
319    }
320
321    /// Assembles and writes metadata at the end of the file.
322    fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
323        self.finished = true;
324
325        // write out any remaining bloom filters after all row groups
326        for row_group in &mut self.row_groups {
327            write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?;
328        }
329
330        let key_value_metadata = match self.props.key_value_metadata() {
331            Some(kv) => Some(kv.iter().chain(&self.kv_metadatas).cloned().collect()),
332            None if self.kv_metadatas.is_empty() => None,
333            None => Some(self.kv_metadatas.clone()),
334        };
335
336        let row_groups = self
337            .row_groups
338            .iter()
339            .map(|v| v.to_thrift())
340            .collect::<Vec<_>>();
341
342        let mut encoder = ThriftMetadataWriter::new(
343            &mut self.buf,
344            &self.schema,
345            &self.descr,
346            row_groups,
347            Some(self.props.created_by().to_string()),
348            self.props.writer_version().as_num(),
349        );
350
351        #[cfg(feature = "encryption")]
352        {
353            encoder = encoder.with_file_encryptor(self.file_encryptor.clone());
354        }
355
356        if let Some(key_value_metadata) = key_value_metadata {
357            encoder = encoder.with_key_value_metadata(key_value_metadata)
358        }
359        encoder = encoder.with_column_indexes(&self.column_indexes);
360        encoder = encoder.with_offset_indexes(&self.offset_indexes);
361        encoder.finish()
362    }
363
364    #[inline]
365    fn assert_previous_writer_closed(&self) -> Result<()> {
366        if self.finished {
367            return Err(general_err!("SerializedFileWriter already finished"));
368        }
369
370        if self.row_group_index != self.row_groups.len() {
371            Err(general_err!("Previous row group writer was not closed"))
372        } else {
373            Ok(())
374        }
375    }
376
377    /// Add a [`KeyValue`] to the file writer's metadata
378    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
379        self.kv_metadatas.push(kv_metadata);
380    }
381
382    /// Returns a reference to schema descriptor.
383    pub fn schema_descr(&self) -> &SchemaDescriptor {
384        &self.descr
385    }
386
387    /// Returns a reference to the writer properties
388    pub fn properties(&self) -> &WriterPropertiesPtr {
389        &self.props
390    }
391
392    /// Returns a reference to the underlying writer.
393    pub fn inner(&self) -> &W {
394        self.buf.inner()
395    }
396
    /// Writes the given bytes to the internal buffer.
    ///
    /// This can be used to write raw data to an in-progress parquet file, for
    /// example, custom index structures or other payloads. Other parquet readers
    /// will skip this data when reading the file.
    ///
    /// It is safe to use this method to write data to the underlying writer,
    /// because it ensures that the buffering and byte-counting layers are used.
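    ///
    /// For example (a sketch writing a custom payload to an in-memory buffer):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use parquet::file::properties::WriterProperties;
    /// # use parquet::file::writer::SerializedFileWriter;
    /// # use parquet::schema::parser::parse_message_type;
    /// let schema = Arc::new(parse_message_type("message schema { REQUIRED INT32 a; }").unwrap());
    /// let mut writer = SerializedFileWriter::new(
    ///     Vec::new(),
    ///     schema,
    ///     Arc::new(WriterProperties::builder().build()),
    /// ).unwrap();
    /// let offset = writer.bytes_written();
    /// writer.write_all(b"custom index payload").unwrap();
    /// assert_eq!(writer.bytes_written(), offset + 20);
    /// writer.close().unwrap();
    /// ```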
405    pub fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
406        self.buf.write_all(buf)
407    }
408
409    /// Returns a mutable reference to the underlying writer.
410    ///
    /// **Warning**: if you write directly to this writer, you will skip
    /// the `TrackedWrite` buffering and byte-counting layers. This will cause
    /// the offsets and sizes recorded in the file footer to diverge from what was
    /// actually written, resulting in an unreadable or corrupted Parquet file.
415    ///
416    /// If you want to write safely to the underlying writer, use [`Self::write_all`].
417    pub fn inner_mut(&mut self) -> &mut W {
418        self.buf.inner_mut()
419    }
420
421    /// Writes the file footer and returns the underlying writer.
422    pub fn into_inner(mut self) -> Result<W> {
423        self.assert_previous_writer_closed()?;
424        let _ = self.write_metadata()?;
425
426        self.buf.into_inner()
427    }
428
429    /// Returns the number of bytes written to this instance
430    pub fn bytes_written(&self) -> usize {
431        self.buf.bytes_written()
432    }
433
434    /// Get the file encryptor used by this instance to encrypt data
435    #[cfg(feature = "encryption")]
436    pub(crate) fn file_encryptor(&self) -> Option<Arc<FileEncryptor>> {
437        self.file_encryptor.clone()
438    }
439}
440
/// Serializes all the bloom filters of the given row group to the given buffer,
/// updating the bloom filter offsets and lengths in the row group metadata in place.
443fn write_bloom_filters<W: Write + Send>(
444    buf: &mut TrackedWrite<W>,
445    bloom_filters: &mut [Vec<Option<Sbbf>>],
446    row_group: &mut RowGroupMetaData,
447) -> Result<()> {
    // For each column in this row group, write its bloom filter (if any) to the
    // file and record the filter's offset and length in the column chunk metadata.
451
452    let row_group_idx: u16 = row_group
453        .ordinal()
454        .expect("Missing row group ordinal")
455        .try_into()
456        .map_err(|_| {
457            ParquetError::General(format!(
                "Negative row group ordinal: {}",
459                row_group.ordinal().unwrap()
460            ))
461        })?;
462    let row_group_idx = row_group_idx as usize;
463    for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() {
464        if let Some(bloom_filter) = bloom_filters[row_group_idx][column_idx].take() {
465            let start_offset = buf.bytes_written();
466            bloom_filter.write(&mut *buf)?;
467            let end_offset = buf.bytes_written();
468            // set offset and index for bloom filter
469            *column_chunk = column_chunk
470                .clone()
471                .into_builder()
472                .set_bloom_filter_offset(Some(start_offset as i64))
473                .set_bloom_filter_length(Some((end_offset - start_offset) as i32))
474                .build()?;
475        }
476    }
477    Ok(())
478}
479
480/// Parquet row group writer API.
/// Provides methods to access column writers in an iterator-like fashion; the order is
/// guaranteed to match the order of schema leaves (column descriptors).
483///
484/// All columns should be written sequentially; the main workflow is:
485/// - Request the next column using `next_column` method - this will return `None` if no
486///   more columns are available to write.
487/// - Once done writing a column, close column writer with `close`
/// - Once all columns have been written, close the row group writer with the `close`
///   method. The `close` method returns the row group metadata and is a no-op
///   on an already closed row group.
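///
/// # Example
///
/// A minimal sketch writing one BOOLEAN column into an in-memory buffer and
/// inspecting the returned row group metadata:
///
/// ```
/// use std::sync::Arc;
/// use parquet::data_type::BoolType;
/// use parquet::file::properties::WriterProperties;
/// use parquet::file::writer::SerializedFileWriter;
/// use parquet::schema::parser::parse_message_type;
///
/// let schema = Arc::new(
///     parse_message_type("message schema { REQUIRED BOOLEAN flag; }").unwrap(),
/// );
/// let props = Arc::new(WriterProperties::builder().build());
/// let mut writer = SerializedFileWriter::new(Vec::new(), schema, props).unwrap();
///
/// let mut row_group = writer.next_row_group().unwrap();
/// let mut column = row_group.next_column().unwrap().unwrap();
/// column
///     .typed::<BoolType>()
///     .write_batch(&[true, false, true], None, None)
///     .unwrap();
/// column.close().unwrap();
///
/// let metadata = row_group.close().unwrap();
/// assert_eq!(metadata.num_rows(), 3);
/// writer.close().unwrap();
/// ```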
491pub struct SerializedRowGroupWriter<'a, W: Write> {
492    descr: SchemaDescPtr,
493    props: WriterPropertiesPtr,
494    buf: &'a mut TrackedWrite<W>,
495    total_rows_written: Option<u64>,
496    total_bytes_written: u64,
497    total_uncompressed_bytes: i64,
498    column_index: usize,
499    row_group_metadata: Option<RowGroupMetaDataPtr>,
500    column_chunks: Vec<ColumnChunkMetaData>,
501    bloom_filters: Vec<Option<Sbbf>>,
502    column_indexes: Vec<Option<ColumnIndex>>,
503    offset_indexes: Vec<Option<OffsetIndex>>,
504    row_group_index: i16,
505    file_offset: i64,
506    on_close: Option<OnCloseRowGroup<'a, W>>,
507    #[cfg(feature = "encryption")]
508    file_encryptor: Option<Arc<FileEncryptor>>,
509}
510
511impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
    /// Creates a new `SerializedRowGroupWriter` with:
    ///
    /// - `schema_descr` - the schema to write
    /// - `properties` - writer properties
    /// - `buf` - the buffer to write data to
    /// - `row_group_index` - the index of this row group in the parquet file
    /// - `on_close` - an optional callback that will be invoked on [`Self::close`]
    ///
    /// The file offset of the row group is derived from the number of bytes
    /// already written to `buf`.
520    pub fn new(
521        schema_descr: SchemaDescPtr,
522        properties: WriterPropertiesPtr,
523        buf: &'a mut TrackedWrite<W>,
524        row_group_index: i16,
525        on_close: Option<OnCloseRowGroup<'a, W>>,
526    ) -> Self {
527        let num_columns = schema_descr.num_columns();
528        let file_offset = buf.bytes_written() as i64;
529        Self {
530            buf,
531            row_group_index,
532            file_offset,
533            on_close,
534            total_rows_written: None,
535            descr: schema_descr,
536            props: properties,
537            column_index: 0,
538            row_group_metadata: None,
539            column_chunks: Vec::with_capacity(num_columns),
540            bloom_filters: Vec::with_capacity(num_columns),
541            column_indexes: Vec::with_capacity(num_columns),
542            offset_indexes: Vec::with_capacity(num_columns),
543            total_bytes_written: 0,
544            total_uncompressed_bytes: 0,
545            #[cfg(feature = "encryption")]
546            file_encryptor: None,
547        }
548    }
549
550    #[cfg(feature = "encryption")]
551    /// Set the file encryptor to use for encrypting row group data and metadata
552    pub(crate) fn with_file_encryptor(
553        mut self,
554        file_encryptor: Option<Arc<FileEncryptor>>,
555    ) -> Self {
556        self.file_encryptor = file_encryptor;
557        self
558    }
559
560    /// Advance `self.column_index` returning the next [`ColumnDescPtr`] if any
561    fn next_column_desc(&mut self) -> Option<ColumnDescPtr> {
562        let ret = self.descr.columns().get(self.column_index)?.clone();
563        self.column_index += 1;
564        Some(ret)
565    }
566
    /// Returns the output buffer and an [`OnCloseColumnChunk`] callback for the next column writer
568    fn get_on_close(&mut self) -> (&mut TrackedWrite<W>, OnCloseColumnChunk<'_>) {
569        let total_bytes_written = &mut self.total_bytes_written;
570        let total_uncompressed_bytes = &mut self.total_uncompressed_bytes;
571        let total_rows_written = &mut self.total_rows_written;
572        let column_chunks = &mut self.column_chunks;
573        let column_indexes = &mut self.column_indexes;
574        let offset_indexes = &mut self.offset_indexes;
575        let bloom_filters = &mut self.bloom_filters;
576
577        let on_close = |r: ColumnCloseResult| {
578            // Update row group writer metrics
579            *total_bytes_written += r.bytes_written;
580            *total_uncompressed_bytes += r.metadata.uncompressed_size();
581            column_chunks.push(r.metadata);
582            bloom_filters.push(r.bloom_filter);
583            column_indexes.push(r.column_index);
584            offset_indexes.push(r.offset_index);
585
586            if let Some(rows) = *total_rows_written {
587                if rows != r.rows_written {
588                    return Err(general_err!(
589                        "Incorrect number of rows, expected {} != {} rows",
590                        rows,
591                        r.rows_written
592                    ));
593                }
594            } else {
595                *total_rows_written = Some(r.rows_written);
596            }
597
598            Ok(())
599        };
600        (self.buf, Box::new(on_close))
601    }
602
603    /// Returns the next column writer, if available, using the factory function;
604    /// otherwise returns `None`.
605    pub(crate) fn next_column_with_factory<'b, F, C>(&'b mut self, factory: F) -> Result<Option<C>>
606    where
607        F: FnOnce(
608            ColumnDescPtr,
609            WriterPropertiesPtr,
610            Box<dyn PageWriter + 'b>,
611            OnCloseColumnChunk<'b>,
612        ) -> Result<C>,
613    {
614        self.assert_previous_writer_closed()?;
615
616        let encryptor_context = self.get_page_encryptor_context();
617
618        Ok(match self.next_column_desc() {
619            Some(column) => {
620                let props = self.props.clone();
621                let (buf, on_close) = self.get_on_close();
622
623                let page_writer = SerializedPageWriter::new(buf);
624                let page_writer =
625                    Self::set_page_writer_encryptor(&column, encryptor_context, page_writer)?;
626
627                Some(factory(
628                    column,
629                    props,
630                    Box::new(page_writer),
631                    Box::new(on_close),
632                )?)
633            }
634            None => None,
635        })
636    }
637
    /// Returns the next column writer, if available; otherwise returns `None`.
    /// Returns `Err` in case of an IO error or Thrift error, or if the row group
    /// writer has already been closed.
641    pub fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>> {
642        self.next_column_with_factory(|descr, props, page_writer, on_close| {
643            let column_writer = get_column_writer(descr, props, page_writer);
644            Ok(SerializedColumnWriter::new(column_writer, Some(on_close)))
645        })
646    }
647
648    /// Append an encoded column chunk from another source without decoding it
649    ///
650    /// This can be used for efficiently concatenating or projecting parquet data,
651    /// or encoding parquet data to temporary in-memory buffers
652    ///
653    /// See [`Self::next_column`] for writing data that isn't already encoded
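    ///
    /// # Example
    ///
    /// A minimal sketch of concatenating the row groups of an existing file into
    /// a new file without decoding the pages (the input and output paths are
    /// placeholders):
    ///
    /// ```no_run
    /// use std::fs::File;
    /// use std::sync::Arc;
    /// use parquet::column::writer::ColumnCloseResult;
    /// use parquet::file::properties::WriterProperties;
    /// use parquet::file::reader::{FileReader, SerializedFileReader};
    /// use parquet::file::writer::SerializedFileWriter;
    ///
    /// let input = File::open("input.parquet").unwrap();
    /// let reader = SerializedFileReader::new(input.try_clone().unwrap()).unwrap();
    /// let metadata = reader.metadata();
    ///
    /// let schema = metadata.file_metadata().schema_descr().root_schema_ptr();
    /// let props = Arc::new(WriterProperties::builder().build());
    /// let output = File::create("output.parquet").unwrap();
    /// let mut writer = SerializedFileWriter::new(output, schema, props).unwrap();
    ///
    /// for rg in metadata.row_groups() {
    ///     let mut rg_writer = writer.next_row_group().unwrap();
    ///     for column in rg.columns() {
    ///         let close = ColumnCloseResult {
    ///             bytes_written: column.compressed_size() as u64,
    ///             rows_written: rg.num_rows() as u64,
    ///             metadata: column.clone(),
    ///             bloom_filter: None,
    ///             column_index: None,
    ///             offset_index: None,
    ///         };
    ///         rg_writer.append_column(&input, close).unwrap();
    ///     }
    ///     rg_writer.close().unwrap();
    /// }
    /// writer.close().unwrap();
    /// ```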
654    pub fn append_column<R: ChunkReader>(
655        &mut self,
656        reader: &R,
657        mut close: ColumnCloseResult,
658    ) -> Result<()> {
659        self.assert_previous_writer_closed()?;
660        let desc = self
661            .next_column_desc()
662            .ok_or_else(|| general_err!("exhausted columns in SerializedRowGroupWriter"))?;
663
664        let metadata = close.metadata;
665
666        if metadata.column_descr() != desc.as_ref() {
667            return Err(general_err!(
668                "column descriptor mismatch, expected {:?} got {:?}",
669                desc,
670                metadata.column_descr()
671            ));
672        }
673
674        let src_dictionary_offset = metadata.dictionary_page_offset();
675        let src_data_offset = metadata.data_page_offset();
676        let src_offset = src_dictionary_offset.unwrap_or(src_data_offset);
677        let src_length = metadata.compressed_size();
678
679        let write_offset = self.buf.bytes_written();
680        let mut read = reader.get_read(src_offset as _)?.take(src_length as _);
681        let write_length = std::io::copy(&mut read, &mut self.buf)?;
682
683        if src_length as u64 != write_length {
684            return Err(general_err!(
                "Failed to splice column data, expected {src_length} got {write_length}"
686            ));
687        }
688
689        let map_offset = |x| x - src_offset + write_offset as i64;
690        let mut builder = ColumnChunkMetaData::builder(metadata.column_descr_ptr())
691            .set_compression(metadata.compression())
692            .set_encodings(metadata.encodings().clone())
693            .set_total_compressed_size(metadata.compressed_size())
694            .set_total_uncompressed_size(metadata.uncompressed_size())
695            .set_num_values(metadata.num_values())
696            .set_data_page_offset(map_offset(src_data_offset))
697            .set_dictionary_page_offset(src_dictionary_offset.map(map_offset))
698            .set_unencoded_byte_array_data_bytes(metadata.unencoded_byte_array_data_bytes());
699
700        if let Some(rep_hist) = metadata.repetition_level_histogram() {
701            builder = builder.set_repetition_level_histogram(Some(rep_hist.clone()))
702        }
703        if let Some(def_hist) = metadata.definition_level_histogram() {
704            builder = builder.set_definition_level_histogram(Some(def_hist.clone()))
705        }
706        if let Some(statistics) = metadata.statistics() {
707            builder = builder.set_statistics(statistics.clone())
708        }
709        if let Some(page_encoding_stats) = metadata.page_encoding_stats() {
710            builder = builder.set_page_encoding_stats(page_encoding_stats.clone())
711        }
712        builder = self.set_column_crypto_metadata(builder, &metadata);
713        close.metadata = builder.build()?;
714
715        if let Some(offsets) = close.offset_index.as_mut() {
716            for location in &mut offsets.page_locations {
717                location.offset = map_offset(location.offset)
718            }
719        }
720
721        let (_, on_close) = self.get_on_close();
722        on_close(close)
723    }
724
725    /// Closes this row group writer and returns row group metadata.
726    pub fn close(mut self) -> Result<RowGroupMetaDataPtr> {
727        if self.row_group_metadata.is_none() {
728            self.assert_previous_writer_closed()?;
729
730            let column_chunks = std::mem::take(&mut self.column_chunks);
731            let row_group_metadata = RowGroupMetaData::builder(self.descr.clone())
732                .set_column_metadata(column_chunks)
733                .set_total_byte_size(self.total_uncompressed_bytes)
734                .set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
735                .set_sorting_columns(self.props.sorting_columns().cloned())
736                .set_ordinal(self.row_group_index)
737                .set_file_offset(self.file_offset)
738                .build()?;
739
740            self.row_group_metadata = Some(Arc::new(row_group_metadata.clone()));
741
742            if let Some(on_close) = self.on_close.take() {
743                on_close(
744                    self.buf,
745                    row_group_metadata,
746                    self.bloom_filters,
747                    self.column_indexes,
748                    self.offset_indexes,
749                )?
750            }
751        }
752
753        let metadata = self.row_group_metadata.as_ref().unwrap().clone();
754        Ok(metadata)
755    }
756
757    /// Set the column crypto metadata for a column chunk
758    #[cfg(feature = "encryption")]
759    fn set_column_crypto_metadata(
760        &self,
761        builder: ColumnChunkMetaDataBuilder,
762        metadata: &ColumnChunkMetaData,
763    ) -> ColumnChunkMetaDataBuilder {
764        if let Some(file_encryptor) = self.file_encryptor.as_ref() {
765            builder.set_column_crypto_metadata(get_column_crypto_metadata(
766                file_encryptor.properties(),
767                &metadata.column_descr_ptr(),
768            ))
769        } else {
770            builder
771        }
772    }
773
774    /// Get context required to create a [`PageEncryptor`] for a column
775    #[cfg(feature = "encryption")]
776    fn get_page_encryptor_context(&self) -> PageEncryptorContext {
777        PageEncryptorContext {
778            file_encryptor: self.file_encryptor.clone(),
779            row_group_index: self.row_group_index as usize,
780            column_index: self.column_index,
781        }
782    }
783
784    /// Set the [`PageEncryptor`] on a page writer if a column is encrypted
785    #[cfg(feature = "encryption")]
786    fn set_page_writer_encryptor<'b>(
787        column: &ColumnDescPtr,
788        context: PageEncryptorContext,
789        page_writer: SerializedPageWriter<'b, W>,
790    ) -> Result<SerializedPageWriter<'b, W>> {
791        let page_encryptor = PageEncryptor::create_if_column_encrypted(
792            &context.file_encryptor,
793            context.row_group_index,
794            context.column_index,
795            &column.path().string(),
796        )?;
797
798        Ok(page_writer.with_page_encryptor(page_encryptor))
799    }
800
801    /// No-op implementation of setting the column crypto metadata for a column chunk
802    #[cfg(not(feature = "encryption"))]
803    fn set_column_crypto_metadata(
804        &self,
805        builder: ColumnChunkMetaDataBuilder,
806        _metadata: &ColumnChunkMetaData,
807    ) -> ColumnChunkMetaDataBuilder {
808        builder
809    }
810
811    #[cfg(not(feature = "encryption"))]
812    fn get_page_encryptor_context(&self) -> PageEncryptorContext {
813        PageEncryptorContext {}
814    }
815
816    /// No-op implementation of setting a [`PageEncryptor`] for when encryption is disabled
817    #[cfg(not(feature = "encryption"))]
818    fn set_page_writer_encryptor<'b>(
819        _column: &ColumnDescPtr,
820        _context: PageEncryptorContext,
821        page_writer: SerializedPageWriter<'b, W>,
822    ) -> Result<SerializedPageWriter<'b, W>> {
823        Ok(page_writer)
824    }
825
826    #[inline]
827    fn assert_previous_writer_closed(&self) -> Result<()> {
828        if self.column_index != self.column_chunks.len() {
829            Err(general_err!("Previous column writer was not closed"))
830        } else {
831            Ok(())
832        }
833    }
834}
835
836/// Context required to create a [`PageEncryptor`] for a column
837#[cfg(feature = "encryption")]
838struct PageEncryptorContext {
839    file_encryptor: Option<Arc<FileEncryptor>>,
840    row_group_index: usize,
841    column_index: usize,
842}
843
844#[cfg(not(feature = "encryption"))]
845struct PageEncryptorContext {}
846
847/// A wrapper around a [`ColumnWriter`] that invokes a callback on [`Self::close`]
848pub struct SerializedColumnWriter<'a> {
849    inner: ColumnWriter<'a>,
850    on_close: Option<OnCloseColumnChunk<'a>>,
851}
852
853impl<'a> SerializedColumnWriter<'a> {
854    /// Create a new [`SerializedColumnWriter`] from a [`ColumnWriter`] and an
855    /// optional callback to be invoked on [`Self::close`]
856    pub fn new(inner: ColumnWriter<'a>, on_close: Option<OnCloseColumnChunk<'a>>) -> Self {
857        Self { inner, on_close }
858    }
859
860    /// Returns a reference to an untyped [`ColumnWriter`]
861    pub fn untyped(&mut self) -> &mut ColumnWriter<'a> {
862        &mut self.inner
863    }
864
865    /// Returns a reference to a typed [`ColumnWriterImpl`]
866    pub fn typed<T: DataType>(&mut self) -> &mut ColumnWriterImpl<'a, T> {
867        get_typed_column_writer_mut(&mut self.inner)
868    }
869
870    /// Close this [`SerializedColumnWriter`]
871    pub fn close(mut self) -> Result<()> {
872        let r = self.inner.close()?;
873        if let Some(on_close) = self.on_close.take() {
874            on_close(r)?
875        }
876
877        Ok(())
878    }
879}
880
881/// A serialized implementation for Parquet [`PageWriter`].
882/// Writes and serializes pages and metadata into output stream.
883///
884/// `SerializedPageWriter` should not be used after calling `close()`.
885pub struct SerializedPageWriter<'a, W: Write> {
886    sink: &'a mut TrackedWrite<W>,
887    #[cfg(feature = "encryption")]
888    page_encryptor: Option<PageEncryptor>,
889}
890
891impl<'a, W: Write> SerializedPageWriter<'a, W> {
892    /// Creates new page writer.
893    pub fn new(sink: &'a mut TrackedWrite<W>) -> Self {
894        Self {
895            sink,
896            #[cfg(feature = "encryption")]
897            page_encryptor: None,
898        }
899    }
900
    /// Serializes the page header into Thrift and writes it to the sink.
    /// Returns the number of bytes that have been written to the sink.
903    #[inline]
904    fn serialize_page_header(&mut self, header: parquet::PageHeader) -> Result<usize> {
905        let start_pos = self.sink.bytes_written();
906        match self.page_encryptor_and_sink_mut() {
907            Some((page_encryptor, sink)) => {
908                page_encryptor.encrypt_page_header(&header, sink)?;
909            }
910            None => {
911                let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
912                header.write_to_out_protocol(&mut protocol)?;
913            }
914        }
915        Ok(self.sink.bytes_written() - start_pos)
916    }
917}
918
919#[cfg(feature = "encryption")]
920impl<'a, W: Write> SerializedPageWriter<'a, W> {
921    /// Set the encryptor to use to encrypt page data
922    fn with_page_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
923        self.page_encryptor = page_encryptor;
924        self
925    }
926
927    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
928        self.page_encryptor.as_mut()
929    }
930
931    fn page_encryptor_and_sink_mut(
932        &mut self,
933    ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
934        self.page_encryptor.as_mut().map(|pe| (pe, &mut self.sink))
935    }
936}
937
938#[cfg(not(feature = "encryption"))]
939impl<'a, W: Write> SerializedPageWriter<'a, W> {
940    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
941        None
942    }
943
944    fn page_encryptor_and_sink_mut(
945        &mut self,
946    ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
947        None
948    }
949}
950
951impl<W: Write + Send> PageWriter for SerializedPageWriter<'_, W> {
952    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
953        let page = match self.page_encryptor_mut() {
954            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
955            None => page,
956        };
957
958        let page_type = page.page_type();
959        let start_pos = self.sink.bytes_written() as u64;
960
961        let page_header = page.to_thrift_header();
962        let header_size = self.serialize_page_header(page_header)?;
963
964        self.sink.write_all(page.data())?;
965
966        let mut spec = PageWriteSpec::new();
967        spec.page_type = page_type;
968        spec.uncompressed_size = page.uncompressed_size() + header_size;
969        spec.compressed_size = page.compressed_size() + header_size;
970        spec.offset = start_pos;
971        spec.bytes_written = self.sink.bytes_written() as u64 - start_pos;
972        spec.num_values = page.num_values();
973
974        if let Some(page_encryptor) = self.page_encryptor_mut() {
975            if page.compressed_page().is_data_page() {
976                page_encryptor.increment_page();
977            }
978        }
979        Ok(spec)
980    }
981
982    fn close(&mut self) -> Result<()> {
983        self.sink.flush()?;
984        Ok(())
985    }
986}
987
988/// Get the magic bytes at the start and end of the file that identify this
989/// as a Parquet file.
990#[cfg(feature = "encryption")]
991pub(crate) fn get_file_magic(
992    file_encryption_properties: Option<&FileEncryptionProperties>,
993) -> &'static [u8; 4] {
994    match file_encryption_properties.as_ref() {
995        Some(encryption_properties) if encryption_properties.encrypt_footer() => {
996            &PARQUET_MAGIC_ENCR_FOOTER
997        }
998        _ => &PARQUET_MAGIC,
999    }
1000}
1001
1002#[cfg(not(feature = "encryption"))]
1003pub(crate) fn get_file_magic() -> &'static [u8; 4] {
1004    &PARQUET_MAGIC
1005}
1006
1007#[cfg(test)]
1008mod tests {
1009    use super::*;
1010
1011    #[cfg(feature = "arrow")]
1012    use arrow_array::RecordBatchReader;
1013    use bytes::Bytes;
1014    use std::fs::File;
1015
1016    #[cfg(feature = "arrow")]
1017    use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
1018    #[cfg(feature = "arrow")]
1019    use crate::arrow::ArrowWriter;
1020    use crate::basic::{
1021        ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, Repetition, SortOrder, Type,
1022    };
1023    use crate::column::page::{Page, PageReader};
1024    use crate::column::reader::get_typed_column_reader;
1025    use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
1026    use crate::data_type::{BoolType, ByteArrayType, Int32Type};
1027    use crate::file::page_index::index::Index;
1028    use crate::file::properties::EnabledStatistics;
1029    use crate::file::serialized_reader::ReadOptionsBuilder;
1030    use crate::file::{
1031        properties::{ReaderProperties, WriterProperties, WriterVersion},
1032        reader::{FileReader, SerializedFileReader, SerializedPageReader},
1033        statistics::{from_thrift, to_thrift, Statistics},
1034    };
1035    use crate::format::SortingColumn;
1036    use crate::record::{Row, RowAccessor};
1037    use crate::schema::parser::parse_message_type;
1038    use crate::schema::types;
1039    use crate::schema::types::{ColumnDescriptor, ColumnPath};
1040    use crate::util::test_common::rand_gen::RandGen;
1041
1042    #[test]
1043    fn test_row_group_writer_error_not_all_columns_written() {
1044        let file = tempfile::tempfile().unwrap();
1045        let schema = Arc::new(
1046            types::Type::group_type_builder("schema")
1047                .with_fields(vec![Arc::new(
1048                    types::Type::primitive_type_builder("col1", Type::INT32)
1049                        .build()
1050                        .unwrap(),
1051                )])
1052                .build()
1053                .unwrap(),
1054        );
1055        let props = Default::default();
1056        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
1057        let row_group_writer = writer.next_row_group().unwrap();
1058        let res = row_group_writer.close();
1059        assert!(res.is_err());
1060        if let Err(err) = res {
1061            assert_eq!(
1062                format!("{err}"),
1063                "Parquet error: Column length mismatch: 1 != 0"
1064            );
1065        }
1066    }
1067
1068    #[test]
1069    fn test_row_group_writer_num_records_mismatch() {
1070        let file = tempfile::tempfile().unwrap();
1071        let schema = Arc::new(
1072            types::Type::group_type_builder("schema")
1073                .with_fields(vec![
1074                    Arc::new(
1075                        types::Type::primitive_type_builder("col1", Type::INT32)
1076                            .with_repetition(Repetition::REQUIRED)
1077                            .build()
1078                            .unwrap(),
1079                    ),
1080                    Arc::new(
1081                        types::Type::primitive_type_builder("col2", Type::INT32)
1082                            .with_repetition(Repetition::REQUIRED)
1083                            .build()
1084                            .unwrap(),
1085                    ),
1086                ])
1087                .build()
1088                .unwrap(),
1089        );
1090        let props = Default::default();
1091        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
1092        let mut row_group_writer = writer.next_row_group().unwrap();
1093
1094        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
1095        col_writer
1096            .typed::<Int32Type>()
1097            .write_batch(&[1, 2, 3], None, None)
1098            .unwrap();
1099        col_writer.close().unwrap();
1100
1101        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
1102        col_writer
1103            .typed::<Int32Type>()
1104            .write_batch(&[1, 2], None, None)
1105            .unwrap();
1106
1107        let err = col_writer.close().unwrap_err();
1108        assert_eq!(
1109            err.to_string(),
1110            "Parquet error: Incorrect number of rows, expected 3 != 2 rows"
1111        );
1112    }
1113
1114    #[test]
1115    fn test_file_writer_empty_file() {
1116        let file = tempfile::tempfile().unwrap();
1117
1118        let schema = Arc::new(
1119            types::Type::group_type_builder("schema")
1120                .with_fields(vec![Arc::new(
1121                    types::Type::primitive_type_builder("col1", Type::INT32)
1122                        .build()
1123                        .unwrap(),
1124                )])
1125                .build()
1126                .unwrap(),
1127        );
1128        let props = Default::default();
1129        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1130        writer.close().unwrap();
1131
1132        let reader = SerializedFileReader::new(file).unwrap();
1133        assert_eq!(reader.get_row_iter(None).unwrap().count(), 0);
1134    }
1135
1136    #[test]
1137    fn test_file_writer_column_orders_populated() {
1138        let file = tempfile::tempfile().unwrap();
1139
1140        let schema = Arc::new(
1141            types::Type::group_type_builder("schema")
1142                .with_fields(vec![
1143                    Arc::new(
1144                        types::Type::primitive_type_builder("col1", Type::INT32)
1145                            .build()
1146                            .unwrap(),
1147                    ),
1148                    Arc::new(
1149                        types::Type::primitive_type_builder("col2", Type::FIXED_LEN_BYTE_ARRAY)
1150                            .with_converted_type(ConvertedType::INTERVAL)
1151                            .with_length(12)
1152                            .build()
1153                            .unwrap(),
1154                    ),
1155                    Arc::new(
1156                        types::Type::group_type_builder("nested")
1157                            .with_repetition(Repetition::REQUIRED)
1158                            .with_fields(vec![
1159                                Arc::new(
1160                                    types::Type::primitive_type_builder(
1161                                        "col3",
1162                                        Type::FIXED_LEN_BYTE_ARRAY,
1163                                    )
1164                                    .with_logical_type(Some(LogicalType::Float16))
1165                                    .with_length(2)
1166                                    .build()
1167                                    .unwrap(),
1168                                ),
1169                                Arc::new(
1170                                    types::Type::primitive_type_builder("col4", Type::BYTE_ARRAY)
1171                                        .with_logical_type(Some(LogicalType::String))
1172                                        .build()
1173                                        .unwrap(),
1174                                ),
1175                            ])
1176                            .build()
1177                            .unwrap(),
1178                    ),
1179                ])
1180                .build()
1181                .unwrap(),
1182        );
1183
1184        let props = Default::default();
1185        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1186        writer.close().unwrap();
1187
1188        let reader = SerializedFileReader::new(file).unwrap();
1189
1190        // only leaves
1191        let expected = vec![
1192            // INT32
1193            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
1194            // INTERVAL
1195            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED),
1196            // Float16
1197            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
1198            // String
1199            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED),
1200        ];
1201        let actual = reader.metadata().file_metadata().column_orders();
1202
1203        assert!(actual.is_some());
1204        let actual = actual.unwrap();
1205        assert_eq!(*actual, expected);
1206    }
1207
1208    #[test]
1209    fn test_file_writer_with_metadata() {
1210        let file = tempfile::tempfile().unwrap();
1211
1212        let schema = Arc::new(
1213            types::Type::group_type_builder("schema")
1214                .with_fields(vec![Arc::new(
1215                    types::Type::primitive_type_builder("col1", Type::INT32)
1216                        .build()
1217                        .unwrap(),
1218                )])
1219                .build()
1220                .unwrap(),
1221        );
1222        let props = Arc::new(
1223            WriterProperties::builder()
1224                .set_key_value_metadata(Some(vec![KeyValue::new(
1225                    "key".to_string(),
1226                    "value".to_string(),
1227                )]))
1228                .build(),
1229        );
1230        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1231        writer.close().unwrap();
1232
1233        let reader = SerializedFileReader::new(file).unwrap();
1234        assert_eq!(
1235            reader
1236                .metadata()
1237                .file_metadata()
1238                .key_value_metadata()
1239                .to_owned()
1240                .unwrap()
1241                .len(),
1242            1
1243        );
1244    }
1245
1246    #[test]
1247    fn test_file_writer_v2_with_metadata() {
1248        let file = tempfile::tempfile().unwrap();
1249        let field_logical_type = Some(LogicalType::Integer {
1250            bit_width: 8,
1251            is_signed: false,
1252        });
1253        let field = Arc::new(
1254            types::Type::primitive_type_builder("col1", Type::INT32)
1255                .with_logical_type(field_logical_type.clone())
1256                .with_converted_type(field_logical_type.into())
1257                .build()
1258                .unwrap(),
1259        );
1260        let schema = Arc::new(
1261            types::Type::group_type_builder("schema")
1262                .with_fields(vec![field.clone()])
1263                .build()
1264                .unwrap(),
1265        );
1266        let props = Arc::new(
1267            WriterProperties::builder()
1268                .set_key_value_metadata(Some(vec![KeyValue::new(
1269                    "key".to_string(),
1270                    "value".to_string(),
1271                )]))
1272                .set_writer_version(WriterVersion::PARQUET_2_0)
1273                .build(),
1274        );
1275        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1276        writer.close().unwrap();
1277
1278        let reader = SerializedFileReader::new(file).unwrap();
1279
1280        assert_eq!(
1281            reader
1282                .metadata()
1283                .file_metadata()
1284                .key_value_metadata()
1285                .to_owned()
1286                .unwrap()
1287                .len(),
1288            1
1289        );
1290
1291        // ARROW-11803: Test that the converted and logical types have been populated
1292        let fields = reader.metadata().file_metadata().schema().get_fields();
1293        assert_eq!(fields.len(), 1);
1294        assert_eq!(fields[0], field);
1295    }
1296
1297    #[test]
1298    fn test_file_writer_with_sorting_columns_metadata() {
1299        let file = tempfile::tempfile().unwrap();
1300
1301        let schema = Arc::new(
1302            types::Type::group_type_builder("schema")
1303                .with_fields(vec![
1304                    Arc::new(
1305                        types::Type::primitive_type_builder("col1", Type::INT32)
1306                            .build()
1307                            .unwrap(),
1308                    ),
1309                    Arc::new(
1310                        types::Type::primitive_type_builder("col2", Type::INT32)
1311                            .build()
1312                            .unwrap(),
1313                    ),
1314                ])
1315                .build()
1316                .unwrap(),
1317        );
1318        let expected_result = Some(vec![SortingColumn {
1319            column_idx: 0,
1320            descending: false,
1321            nulls_first: true,
1322        }]);
1323        let props = Arc::new(
1324            WriterProperties::builder()
1325                .set_key_value_metadata(Some(vec![KeyValue::new(
1326                    "key".to_string(),
1327                    "value".to_string(),
1328                )]))
1329                .set_sorting_columns(expected_result.clone())
1330                .build(),
1331        );
1332        let mut writer =
1333            SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1334        let mut row_group_writer = writer.next_row_group().expect("get row group writer");
1335
1336        let col_writer = row_group_writer.next_column().unwrap().unwrap();
1337        col_writer.close().unwrap();
1338
1339        let col_writer = row_group_writer.next_column().unwrap().unwrap();
1340        col_writer.close().unwrap();
1341
1342        row_group_writer.close().unwrap();
1343        writer.close().unwrap();
1344
1345        let reader = SerializedFileReader::new(file).unwrap();
1346        let result: Vec<Option<&Vec<SortingColumn>>> = reader
1347            .metadata()
1348            .row_groups()
1349            .iter()
1350            .map(|f| f.sorting_columns())
1351            .collect();
        // validate that the sorting columns read back match the ones written above
1353        assert_eq!(expected_result.as_ref(), result[0]);
1354    }
1355
1356    #[test]
1357    fn test_file_writer_empty_row_groups() {
1358        let file = tempfile::tempfile().unwrap();
1359        test_file_roundtrip(file, vec![]);
1360    }
1361
1362    #[test]
1363    fn test_file_writer_single_row_group() {
1364        let file = tempfile::tempfile().unwrap();
1365        test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
1366    }
1367
1368    #[test]
1369    fn test_file_writer_multiple_row_groups() {
1370        let file = tempfile::tempfile().unwrap();
1371        test_file_roundtrip(
1372            file,
1373            vec![
1374                vec![1, 2, 3, 4, 5],
1375                vec![1, 2, 3],
1376                vec![1],
1377                vec![1, 2, 3, 4, 5, 6],
1378            ],
1379        );
1380    }
1381
1382    #[test]
1383    fn test_file_writer_multiple_large_row_groups() {
1384        let file = tempfile::tempfile().unwrap();
1385        test_file_roundtrip(
1386            file,
1387            vec![vec![123; 1024], vec![124; 1000], vec![125; 15], vec![]],
1388        );
1389    }
1390
1391    #[test]
1392    fn test_page_writer_data_pages() {
1393        let pages = vec![
1394            Page::DataPage {
1395                buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
1396                num_values: 10,
1397                encoding: Encoding::DELTA_BINARY_PACKED,
1398                def_level_encoding: Encoding::RLE,
1399                rep_level_encoding: Encoding::RLE,
1400                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1401            },
1402            Page::DataPageV2 {
1403                buf: Bytes::from(vec![4; 128]),
1404                num_values: 10,
1405                encoding: Encoding::DELTA_BINARY_PACKED,
1406                num_nulls: 2,
1407                num_rows: 12,
1408                def_levels_byte_len: 24,
1409                rep_levels_byte_len: 32,
1410                is_compressed: false,
1411                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1412            },
1413        ];
1414
1415        test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
1416        test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
1417    }
1418
1419    #[test]
1420    fn test_page_writer_dict_pages() {
1421        let pages = vec![
1422            Page::DictionaryPage {
1423                buf: Bytes::from(vec![1, 2, 3, 4, 5]),
1424                num_values: 5,
1425                encoding: Encoding::RLE_DICTIONARY,
1426                is_sorted: false,
1427            },
1428            Page::DataPage {
1429                buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
1430                num_values: 10,
1431                encoding: Encoding::DELTA_BINARY_PACKED,
1432                def_level_encoding: Encoding::RLE,
1433                rep_level_encoding: Encoding::RLE,
1434                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1435            },
1436            Page::DataPageV2 {
1437                buf: Bytes::from(vec![4; 128]),
1438                num_values: 10,
1439                encoding: Encoding::DELTA_BINARY_PACKED,
1440                num_nulls: 2,
1441                num_rows: 12,
1442                def_levels_byte_len: 24,
1443                rep_levels_byte_len: 32,
1444                is_compressed: false,
1445                statistics: None,
1446            },
1447        ];
1448
1449        test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
1450        test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
1451    }
1452
    /// Tests writing and reading pages.
    /// The physical type is used only for statistics and must match the type of any
    /// statistics defined in the pages.
1456    fn test_page_roundtrip(pages: &[Page], codec: Compression, physical_type: Type) {
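        // Compress each input page, write the compressed pages with a SerializedPageWriter,
        // then read them back with a SerializedPageReader and compare against the originals.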
1457        let mut compressed_pages = vec![];
1458        let mut total_num_values = 0i64;
1459        let codec_options = CodecOptionsBuilder::default()
1460            .set_backward_compatible_lz4(false)
1461            .build();
1462        let mut compressor = create_codec(codec, &codec_options).unwrap();
1463
1464        for page in pages {
1465            let uncompressed_len = page.buffer().len();
1466
1467            let compressed_page = match *page {
1468                Page::DataPage {
1469                    ref buf,
1470                    num_values,
1471                    encoding,
1472                    def_level_encoding,
1473                    rep_level_encoding,
1474                    ref statistics,
1475                } => {
1476                    total_num_values += num_values as i64;
1477                    let output_buf = compress_helper(compressor.as_mut(), buf);
1478
1479                    Page::DataPage {
1480                        buf: Bytes::from(output_buf),
1481                        num_values,
1482                        encoding,
1483                        def_level_encoding,
1484                        rep_level_encoding,
1485                        statistics: from_thrift(physical_type, to_thrift(statistics.as_ref()))
1486                            .unwrap(),
1487                    }
1488                }
1489                Page::DataPageV2 {
1490                    ref buf,
1491                    num_values,
1492                    encoding,
1493                    num_nulls,
1494                    num_rows,
1495                    def_levels_byte_len,
1496                    rep_levels_byte_len,
1497                    ref statistics,
1498                    ..
1499                } => {
1500                    total_num_values += num_values as i64;
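                    // Only the values section of a V2 page is compressed; the rep/def
                    // level bytes at the front of the buffer are copied through as-is.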
1501                    let offset = (def_levels_byte_len + rep_levels_byte_len) as usize;
1502                    let cmp_buf = compress_helper(compressor.as_mut(), &buf[offset..]);
1503                    let mut output_buf = Vec::from(&buf[..offset]);
1504                    output_buf.extend_from_slice(&cmp_buf[..]);
1505
1506                    Page::DataPageV2 {
1507                        buf: Bytes::from(output_buf),
1508                        num_values,
1509                        encoding,
1510                        num_nulls,
1511                        num_rows,
1512                        def_levels_byte_len,
1513                        rep_levels_byte_len,
1514                        is_compressed: compressor.is_some(),
1515                        statistics: from_thrift(physical_type, to_thrift(statistics.as_ref()))
1516                            .unwrap(),
1517                    }
1518                }
1519                Page::DictionaryPage {
1520                    ref buf,
1521                    num_values,
1522                    encoding,
1523                    is_sorted,
1524                } => {
1525                    let output_buf = compress_helper(compressor.as_mut(), buf);
1526
1527                    Page::DictionaryPage {
1528                        buf: Bytes::from(output_buf),
1529                        num_values,
1530                        encoding,
1531                        is_sorted,
1532                    }
1533                }
1534            };
1535
1536            let compressed_page = CompressedPage::new(compressed_page, uncompressed_len);
1537            compressed_pages.push(compressed_page);
1538        }
1539
1540        let mut buffer: Vec<u8> = vec![];
1541        let mut result_pages: Vec<Page> = vec![];
1542        {
1543            let mut writer = TrackedWrite::new(&mut buffer);
1544            let mut page_writer = SerializedPageWriter::new(&mut writer);
1545
1546            for page in compressed_pages {
1547                page_writer.write_page(page).unwrap();
1548            }
1549            page_writer.close().unwrap();
1550        }
1551        {
1552            let reader = bytes::Bytes::from(buffer);
1553
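            // Build the minimal column descriptor and chunk metadata that the
            // SerializedPageReader needs to locate and decode the pages.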
1554            let t = types::Type::primitive_type_builder("t", physical_type)
1555                .build()
1556                .unwrap();
1557
1558            let desc = ColumnDescriptor::new(Arc::new(t), 0, 0, ColumnPath::new(vec![]));
1559            let meta = ColumnChunkMetaData::builder(Arc::new(desc))
1560                .set_compression(codec)
1561                .set_total_compressed_size(reader.len() as i64)
1562                .set_num_values(total_num_values)
1563                .build()
1564                .unwrap();
1565
1566            let props = ReaderProperties::builder()
1567                .set_backward_compatible_lz4(false)
1568                .build();
1569            let mut page_reader = SerializedPageReader::new_with_properties(
1570                Arc::new(reader),
1571                &meta,
1572                total_num_values as usize,
1573                None,
1574                Arc::new(props),
1575            )
1576            .unwrap();
1577
1578            while let Some(page) = page_reader.get_next_page().unwrap() {
1579                result_pages.push(page);
1580            }
1581        }
1582
1583        assert_eq!(result_pages.len(), pages.len());
1584        for i in 0..result_pages.len() {
1585            assert_page(&result_pages[i], &pages[i]);
1586        }
1587    }
1588
1589    /// Helper function to compress a slice
1590    fn compress_helper(compressor: Option<&mut Box<dyn Codec>>, data: &[u8]) -> Vec<u8> {
1591        let mut output_buf = vec![];
1592        if let Some(cmpr) = compressor {
1593            cmpr.compress(data, &mut output_buf).unwrap();
1594        } else {
1595            output_buf.extend_from_slice(data);
1596        }
1597        output_buf
1598    }
1599
    /// Asserts that two pages match (type, buffer, value count, encoding and statistics).
1601    fn assert_page(left: &Page, right: &Page) {
1602        assert_eq!(left.page_type(), right.page_type());
1603        assert_eq!(&left.buffer(), &right.buffer());
1604        assert_eq!(left.num_values(), right.num_values());
1605        assert_eq!(left.encoding(), right.encoding());
1606        assert_eq!(to_thrift(left.statistics()), to_thrift(right.statistics()));
1607    }
1608
1609    /// Tests roundtrip of i32 data written using `W` and read using `R`
1610    fn test_roundtrip_i32<W, R>(
1611        file: W,
1612        data: Vec<Vec<i32>>,
1613        compression: Compression,
1614    ) -> crate::format::FileMetaData
1615    where
1616        W: Write + Send,
1617        R: ChunkReader + From<W> + 'static,
1618    {
1619        test_roundtrip::<W, R, Int32Type, _>(file, data, |r| r.get_int(0).unwrap(), compression)
1620    }
1621
    /// Tests roundtrip of data of type `D` written using `W` and read using `R`,
    /// extracting each value from a row with the provided `value` function
1624    fn test_roundtrip<W, R, D, F>(
1625        mut file: W,
1626        data: Vec<Vec<D::T>>,
1627        value: F,
1628        compression: Compression,
1629    ) -> crate::format::FileMetaData
1630    where
1631        W: Write + Send,
1632        R: ChunkReader + From<W> + 'static,
1633        D: DataType,
1634        F: Fn(Row) -> D::T,
1635    {
1636        let schema = Arc::new(
1637            types::Type::group_type_builder("schema")
1638                .with_fields(vec![Arc::new(
1639                    types::Type::primitive_type_builder("col1", D::get_physical_type())
1640                        .with_repetition(Repetition::REQUIRED)
1641                        .build()
1642                        .unwrap(),
1643                )])
1644                .build()
1645                .unwrap(),
1646        );
1647        let props = Arc::new(
1648            WriterProperties::builder()
1649                .set_compression(compression)
1650                .build(),
1651        );
1652        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
1653        let mut rows: i64 = 0;
1654
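        // Write each subset as its own row group, verifying the flushed row group
        // metadata (ordinal and file offset) as we go.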
1655        for (idx, subset) in data.iter().enumerate() {
1656            let row_group_file_offset = file_writer.buf.bytes_written();
1657            let mut row_group_writer = file_writer.next_row_group().unwrap();
1658            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
1659                rows += writer
1660                    .typed::<D>()
1661                    .write_batch(&subset[..], None, None)
1662                    .unwrap() as i64;
1663                writer.close().unwrap();
1664            }
1665            let last_group = row_group_writer.close().unwrap();
1666            let flushed = file_writer.flushed_row_groups();
1667            assert_eq!(flushed.len(), idx + 1);
1668            assert_eq!(Some(idx as i16), last_group.ordinal());
1669            assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset());
1670            assert_eq!(&flushed[idx], last_group.as_ref());
1671        }
1672        let file_metadata = file_writer.close().unwrap();
1673
1674        let reader = SerializedFileReader::new(R::from(file)).unwrap();
1675        assert_eq!(reader.num_row_groups(), data.len());
1676        assert_eq!(
1677            reader.metadata().file_metadata().num_rows(),
1678            rows,
1679            "row count in metadata not equal to number of rows written"
1680        );
1681        for (i, item) in data.iter().enumerate().take(reader.num_row_groups()) {
1682            let row_group_reader = reader.get_row_group(i).unwrap();
1683            let iter = row_group_reader.get_row_iter(None).unwrap();
1684            let res: Vec<_> = iter.map(|row| row.unwrap()).map(&value).collect();
1685            let row_group_size = row_group_reader.metadata().total_byte_size();
1686            let uncompressed_size: i64 = row_group_reader
1687                .metadata()
1688                .columns()
1689                .iter()
1690                .map(|v| v.uncompressed_size())
1691                .sum();
1692            assert_eq!(row_group_size, uncompressed_size);
1693            assert_eq!(res, *item);
1694        }
1695        file_metadata
1696    }
1697
1698    /// File write-read roundtrip.
1699    /// `data` consists of arrays of values for each row group.
1700    fn test_file_roundtrip(file: File, data: Vec<Vec<i32>>) -> crate::format::FileMetaData {
1701        test_roundtrip_i32::<File, File>(file, data, Compression::UNCOMPRESSED)
1702    }
1703
1704    #[test]
1705    fn test_bytes_writer_empty_row_groups() {
1706        test_bytes_roundtrip(vec![], Compression::UNCOMPRESSED);
1707    }
1708
1709    #[test]
1710    fn test_bytes_writer_single_row_group() {
1711        test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::UNCOMPRESSED);
1712    }
1713
1714    #[test]
1715    fn test_bytes_writer_multiple_row_groups() {
1716        test_bytes_roundtrip(
1717            vec![
1718                vec![1, 2, 3, 4, 5],
1719                vec![1, 2, 3],
1720                vec![1],
1721                vec![1, 2, 3, 4, 5, 6],
1722            ],
1723            Compression::UNCOMPRESSED,
1724        );
1725    }
1726
1727    #[test]
1728    fn test_bytes_writer_single_row_group_compressed() {
1729        test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::SNAPPY);
1730    }
1731
1732    #[test]
1733    fn test_bytes_writer_multiple_row_groups_compressed() {
1734        test_bytes_roundtrip(
1735            vec![
1736                vec![1, 2, 3, 4, 5],
1737                vec![1, 2, 3],
1738                vec![1],
1739                vec![1, 2, 3, 4, 5, 6],
1740            ],
1741            Compression::SNAPPY,
1742        );
1743    }
1744
1745    fn test_bytes_roundtrip(data: Vec<Vec<i32>>, compression: Compression) {
1746        test_roundtrip_i32::<Vec<u8>, Bytes>(Vec::with_capacity(1024), data, compression);
1747    }
1748
1749    #[test]
1750    fn test_boolean_roundtrip() {
1751        let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
1752        test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
1753            Vec::with_capacity(1024),
1754            vec![my_bool_values],
1755            |r| r.get_bool(0).unwrap(),
1756            Compression::UNCOMPRESSED,
1757        );
1758    }
1759
1760    #[test]
1761    fn test_boolean_compressed_roundtrip() {
1762        let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
1763        test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
1764            Vec::with_capacity(1024),
1765            vec![my_bool_values],
1766            |r| r.get_bool(0).unwrap(),
1767            Compression::SNAPPY,
1768        );
1769    }
1770
1771    #[test]
1772    fn test_column_offset_index_file() {
1773        let file = tempfile::tempfile().unwrap();
1774        let file_metadata = test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
1775        file_metadata.row_groups.iter().for_each(|row_group| {
1776            row_group.columns.iter().for_each(|column_chunk| {
1777                assert_ne!(None, column_chunk.column_index_offset);
1778                assert_ne!(None, column_chunk.column_index_length);
1779
1780                assert_ne!(None, column_chunk.offset_index_offset);
1781                assert_ne!(None, column_chunk.offset_index_length);
1782            })
1783        });
1784    }
1785
1786    fn test_kv_metadata(initial_kv: Option<Vec<KeyValue>>, final_kv: Option<Vec<KeyValue>>) {
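        // Writes a file with `initial_kv` set via the writer properties, appends the
        // `final_kv` entries before closing, then checks the merged metadata read back.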
1787        let schema = Arc::new(
1788            types::Type::group_type_builder("schema")
1789                .with_fields(vec![Arc::new(
1790                    types::Type::primitive_type_builder("col1", Type::INT32)
1791                        .with_repetition(Repetition::REQUIRED)
1792                        .build()
1793                        .unwrap(),
1794                )])
1795                .build()
1796                .unwrap(),
1797        );
1798        let mut out = Vec::with_capacity(1024);
1799        let props = Arc::new(
1800            WriterProperties::builder()
1801                .set_key_value_metadata(initial_kv.clone())
1802                .build(),
1803        );
1804        let mut writer = SerializedFileWriter::new(&mut out, schema, props).unwrap();
1805        let mut row_group_writer = writer.next_row_group().unwrap();
1806        let column = row_group_writer.next_column().unwrap().unwrap();
1807        column.close().unwrap();
1808        row_group_writer.close().unwrap();
1809        if let Some(kvs) = &final_kv {
1810            for kv in kvs {
1811                writer.append_key_value_metadata(kv.clone())
1812            }
1813        }
1814        writer.close().unwrap();
1815
1816        let reader = SerializedFileReader::new(Bytes::from(out)).unwrap();
1817        let metadata = reader.metadata().file_metadata();
1818        let keys = metadata.key_value_metadata();
1819
1820        match (initial_kv, final_kv) {
1821            (Some(a), Some(b)) => {
1822                let keys = keys.unwrap();
1823                assert_eq!(keys.len(), a.len() + b.len());
1824                assert_eq!(&keys[..a.len()], a.as_slice());
1825                assert_eq!(&keys[a.len()..], b.as_slice());
1826            }
1827            (Some(v), None) => assert_eq!(keys.unwrap(), &v),
1828            (None, Some(v)) if !v.is_empty() => assert_eq!(keys.unwrap(), &v),
1829            _ => assert!(keys.is_none()),
1830        }
1831    }
1832
1833    #[test]
1834    fn test_append_metadata() {
1835        let kv1 = KeyValue::new("cupcakes".to_string(), "awesome".to_string());
1836        let kv2 = KeyValue::new("bingo".to_string(), "bongo".to_string());
1837
1838        test_kv_metadata(None, None);
1839        test_kv_metadata(Some(vec![kv1.clone()]), None);
1840        test_kv_metadata(None, Some(vec![kv2.clone()]));
1841        test_kv_metadata(Some(vec![kv1.clone()]), Some(vec![kv2.clone()]));
1842        test_kv_metadata(Some(vec![]), Some(vec![kv2]));
1843        test_kv_metadata(Some(vec![]), Some(vec![]));
1844        test_kv_metadata(Some(vec![kv1]), Some(vec![]));
1845        test_kv_metadata(None, Some(vec![]));
1846    }
1847
1848    #[test]
1849    fn test_backwards_compatible_statistics() {
1850        let message_type = "
1851            message test_schema {
1852                REQUIRED INT32 decimal1 (DECIMAL(8,2));
1853                REQUIRED INT32 i32 (INTEGER(32,true));
1854                REQUIRED INT32 u32 (INTEGER(32,false));
1855            }
1856        ";
1857
1858        let schema = Arc::new(parse_message_type(message_type).unwrap());
1859        let props = Default::default();
1860        let mut writer = SerializedFileWriter::new(vec![], schema, props).unwrap();
1861        let mut row_group_writer = writer.next_row_group().unwrap();
1862
1863        for _ in 0..3 {
1864            let mut writer = row_group_writer.next_column().unwrap().unwrap();
1865            writer
1866                .typed::<Int32Type>()
1867                .write_batch(&[1, 2, 3], None, None)
1868                .unwrap();
1869            writer.close().unwrap();
1870        }
1871        let metadata = row_group_writer.close().unwrap();
1872        writer.close().unwrap();
1873
1874        let thrift = metadata.to_thrift();
1875        let encoded_stats: Vec<_> = thrift
1876            .columns
1877            .into_iter()
1878            .map(|x| x.meta_data.unwrap().statistics.unwrap())
1879            .collect();
1880
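        // The deprecated `min`/`max` fields are only expected for columns whose sort
        // order matches the legacy (signed) ordering; `min_value`/`max_value` are
        // always expected.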
1881        // decimal
1882        let s = &encoded_stats[0];
1883        assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1884        assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1885        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1886        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1887
1888        // i32
1889        let s = &encoded_stats[1];
1890        assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1891        assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1892        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1893        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1894
1895        // u32
1896        let s = &encoded_stats[2];
1897        assert_eq!(s.min.as_deref(), None);
1898        assert_eq!(s.max.as_deref(), None);
1899        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1900        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1901    }
1902
1903    #[test]
1904    fn test_spliced_write() {
1905        let message_type = "
1906            message test_schema {
1907                REQUIRED INT32 i32 (INTEGER(32,true));
1908                REQUIRED INT32 u32 (INTEGER(32,false));
1909            }
1910        ";
1911        let schema = Arc::new(parse_message_type(message_type).unwrap());
1912        let props = Arc::new(WriterProperties::builder().build());
1913
1914        let mut file = Vec::with_capacity(1024);
1915        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props.clone()).unwrap();
1916
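        // Encode each column into its own in-memory buffer first, then splice the
        // encoded bytes into a single row group via `append_column` below.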
1917        let columns = file_writer.descr.columns();
1918        let mut column_state: Vec<(_, Option<ColumnCloseResult>)> = columns
1919            .iter()
1920            .map(|_| (TrackedWrite::new(Vec::with_capacity(1024)), None))
1921            .collect();
1922
1923        let mut column_state_slice = column_state.as_mut_slice();
1924        let mut column_writers = Vec::with_capacity(columns.len());
1925        for c in columns {
1926            let ((buf, out), tail) = column_state_slice.split_first_mut().unwrap();
1927            column_state_slice = tail;
1928
1929            let page_writer = Box::new(SerializedPageWriter::new(buf));
1930            let col_writer = get_column_writer(c.clone(), props.clone(), page_writer);
1931            column_writers.push(SerializedColumnWriter::new(
1932                col_writer,
1933                Some(Box::new(|on_close| {
1934                    *out = Some(on_close);
1935                    Ok(())
1936                })),
1937            ));
1938        }
1939
1940        let column_data = [[1, 2, 3, 4], [7, 3, 7, 3]];
1941
1942        // Interleaved writing to the column writers
1943        for (writer, batch) in column_writers.iter_mut().zip(column_data) {
1944            let writer = writer.typed::<Int32Type>();
1945            writer.write_batch(&batch, None, None).unwrap();
1946        }
1947
1948        // Close the column writers
1949        for writer in column_writers {
1950            writer.close().unwrap()
1951        }
1952
1953        // Splice column data into a row group
1954        let mut row_group_writer = file_writer.next_row_group().unwrap();
1955        for (write, close) in column_state {
1956            let buf = Bytes::from(write.into_inner().unwrap());
1957            row_group_writer
1958                .append_column(&buf, close.unwrap())
1959                .unwrap();
1960        }
1961        row_group_writer.close().unwrap();
1962        file_writer.close().unwrap();
1963
1964        // Check data was written correctly
1965        let file = Bytes::from(file);
1966        let test_read = |reader: SerializedFileReader<Bytes>| {
1967            let row_group = reader.get_row_group(0).unwrap();
1968
1969            let mut out = Vec::with_capacity(4);
1970            let c1 = row_group.get_column_reader(0).unwrap();
1971            let mut c1 = get_typed_column_reader::<Int32Type>(c1);
1972            c1.read_records(4, None, None, &mut out).unwrap();
1973            assert_eq!(out, column_data[0]);
1974
1975            out.clear();
1976
1977            let c2 = row_group.get_column_reader(1).unwrap();
1978            let mut c2 = get_typed_column_reader::<Int32Type>(c2);
1979            c2.read_records(4, None, None, &mut out).unwrap();
1980            assert_eq!(out, column_data[1]);
1981        };
1982
1983        let reader = SerializedFileReader::new(file.clone()).unwrap();
1984        test_read(reader);
1985
1986        let options = ReadOptionsBuilder::new().with_page_index().build();
1987        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
1988        test_read(reader);
1989    }
1990
1991    #[test]
1992    fn test_disabled_statistics() {
1993        let message_type = "
1994            message test_schema {
1995                REQUIRED INT32 a;
1996                REQUIRED INT32 b;
1997            }
1998        ";
1999        let schema = Arc::new(parse_message_type(message_type).unwrap());
2000        let props = WriterProperties::builder()
2001            .set_statistics_enabled(EnabledStatistics::None)
2002            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
            .set_offset_index_disabled(true) // should be ignored because page statistics are enabled above
2004            .build();
2005        let mut file = Vec::with_capacity(1024);
2006        let mut file_writer =
2007            SerializedFileWriter::new(&mut file, schema, Arc::new(props)).unwrap();
2008
2009        let mut row_group_writer = file_writer.next_row_group().unwrap();
2010        let mut a_writer = row_group_writer.next_column().unwrap().unwrap();
2011        let col_writer = a_writer.typed::<Int32Type>();
2012        col_writer.write_batch(&[1, 2, 3], None, None).unwrap();
2013        a_writer.close().unwrap();
2014
2015        let mut b_writer = row_group_writer.next_column().unwrap().unwrap();
2016        let col_writer = b_writer.typed::<Int32Type>();
2017        col_writer.write_batch(&[4, 5, 6], None, None).unwrap();
2018        b_writer.close().unwrap();
2019        row_group_writer.close().unwrap();
2020
2021        let metadata = file_writer.finish().unwrap();
2022        assert_eq!(metadata.row_groups.len(), 1);
2023        let row_group = &metadata.row_groups[0];
2024        assert_eq!(row_group.columns.len(), 2);
2025        // Column "a" has both offset and column index, as requested
2026        assert!(row_group.columns[0].offset_index_offset.is_some());
2027        assert!(row_group.columns[0].column_index_offset.is_some());
        // Column "b" should only have an offset index, not a column index
2029        assert!(row_group.columns[1].offset_index_offset.is_some());
2030        assert!(row_group.columns[1].column_index_offset.is_none());
2031
2032        let err = file_writer.next_row_group().err().unwrap().to_string();
2033        assert_eq!(err, "Parquet error: SerializedFileWriter already finished");
2034
2035        drop(file_writer);
2036
2037        let options = ReadOptionsBuilder::new().with_page_index().build();
2038        let reader = SerializedFileReader::new_with_options(Bytes::from(file), options).unwrap();
2039
2040        let offset_index = reader.metadata().offset_index().unwrap();
2041        assert_eq!(offset_index.len(), 1); // 1 row group
2042        assert_eq!(offset_index[0].len(), 2); // 2 columns
2043
2044        let column_index = reader.metadata().column_index().unwrap();
2045        assert_eq!(column_index.len(), 1); // 1 row group
        assert_eq!(column_index[0].len(), 2); // 2 columns
2047
2048        let a_idx = &column_index[0][0];
2049        assert!(matches!(a_idx, Index::INT32(_)), "{a_idx:?}");
2050        let b_idx = &column_index[0][1];
2051        assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
2052    }
2053
2054    #[test]
2055    fn test_byte_array_size_statistics() {
2056        let message_type = "
2057            message test_schema {
2058                OPTIONAL BYTE_ARRAY a (UTF8);
2059            }
2060        ";
2061        let schema = Arc::new(parse_message_type(message_type).unwrap());
2062        let data = ByteArrayType::gen_vec(32, 7);
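        // 10 logical rows: seven non-null values (def level 1) and three nulls
        // (def level 0), so the definition level histogram should be [3, 7].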
2063        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
2064        let unenc_size: i64 = data.iter().map(|x| x.len() as i64).sum();
2065        let file: File = tempfile::tempfile().unwrap();
2066        let props = Arc::new(
2067            WriterProperties::builder()
2068                .set_statistics_enabled(EnabledStatistics::Page)
2069                .build(),
2070        );
2071
2072        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
2073        let mut row_group_writer = writer.next_row_group().unwrap();
2074
2075        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
2076        col_writer
2077            .typed::<ByteArrayType>()
2078            .write_batch(&data, Some(&def_levels), None)
2079            .unwrap();
2080        col_writer.close().unwrap();
2081        row_group_writer.close().unwrap();
2082        let file_metadata = writer.close().unwrap();
2083
2084        assert_eq!(file_metadata.row_groups.len(), 1);
2085        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
2086        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
2087
2088        let check_def_hist = |def_hist: &[i64]| {
2089            assert_eq!(def_hist.len(), 2);
2090            assert_eq!(def_hist[0], 3);
2091            assert_eq!(def_hist[1], 7);
2092        };
2093
2094        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
2095        let meta_data = file_metadata.row_groups[0].columns[0]
2096            .meta_data
2097            .as_ref()
2098            .unwrap();
2099        assert!(meta_data.size_statistics.is_some());
2100        let size_stats = meta_data.size_statistics.as_ref().unwrap();
2101
2102        assert!(size_stats.repetition_level_histogram.is_none());
2103        assert!(size_stats.definition_level_histogram.is_some());
2104        assert!(size_stats.unencoded_byte_array_data_bytes.is_some());
2105        assert_eq!(
2106            unenc_size,
2107            size_stats.unencoded_byte_array_data_bytes.unwrap()
2108        );
2109        check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());
2110
2111        // check that the read metadata is also correct
2112        let options = ReadOptionsBuilder::new().with_page_index().build();
2113        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
2114
2115        let rfile_metadata = reader.metadata().file_metadata();
2116        assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
2117        assert_eq!(reader.num_row_groups(), 1);
2118        let rowgroup = reader.get_row_group(0).unwrap();
2119        assert_eq!(rowgroup.num_columns(), 1);
2120        let column = rowgroup.metadata().column(0);
2121        assert!(column.definition_level_histogram().is_some());
2122        assert!(column.repetition_level_histogram().is_none());
2123        assert!(column.unencoded_byte_array_data_bytes().is_some());
2124        check_def_hist(column.definition_level_histogram().unwrap().values());
2125        assert_eq!(
2126            unenc_size,
2127            column.unencoded_byte_array_data_bytes().unwrap()
2128        );
2129
2130        // check histogram in column index as well
2131        assert!(reader.metadata().column_index().is_some());
2132        let column_index = reader.metadata().column_index().unwrap();
2133        assert_eq!(column_index.len(), 1);
2134        assert_eq!(column_index[0].len(), 1);
2135        let col_idx = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
2136            assert_eq!(index.indexes.len(), 1);
2137            &index.indexes[0]
2138        } else {
2139            unreachable!()
2140        };
2141
2142        assert!(col_idx.repetition_level_histogram().is_none());
2143        assert!(col_idx.definition_level_histogram().is_some());
2144        check_def_hist(col_idx.definition_level_histogram().unwrap().values());
2145
2146        assert!(reader.metadata().offset_index().is_some());
2147        let offset_index = reader.metadata().offset_index().unwrap();
2148        assert_eq!(offset_index.len(), 1);
2149        assert_eq!(offset_index[0].len(), 1);
2150        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_some());
2151        let page_sizes = offset_index[0][0]
2152            .unencoded_byte_array_data_bytes
2153            .as_ref()
2154            .unwrap();
2155        assert_eq!(page_sizes.len(), 1);
2156        assert_eq!(page_sizes[0], unenc_size);
2157    }
2158
2159    #[test]
2160    fn test_too_many_rowgroups() {
2161        let message_type = "
2162            message test_schema {
2163                REQUIRED BYTE_ARRAY a (UTF8);
2164            }
2165        ";
2166        let schema = Arc::new(parse_message_type(message_type).unwrap());
2167        let file: File = tempfile::tempfile().unwrap();
2168        let props = Arc::new(
2169            WriterProperties::builder()
2170                .set_statistics_enabled(EnabledStatistics::None)
2171                .set_max_row_group_size(1)
2172                .build(),
2173        );
2174        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
2175
        // Create 32k empty row groups; the writer should error when i == 0x8000 (32768).
2177        for i in 0..0x8001 {
2178            match writer.next_row_group() {
2179                Ok(mut row_group_writer) => {
2180                    assert_ne!(i, 0x8000);
2181                    let col_writer = row_group_writer.next_column().unwrap().unwrap();
2182                    col_writer.close().unwrap();
2183                    row_group_writer.close().unwrap();
2184                }
2185                Err(e) => {
2186                    assert_eq!(i, 0x8000);
2187                    assert_eq!(
2188                        e.to_string(),
2189                        "Parquet error: Parquet does not support more than 32767 row groups per file (currently: 32768)"
2190                    );
2191                }
2192            }
2193        }
2194        writer.close().unwrap();
2195    }
2196
2197    #[test]
2198    fn test_size_statistics_with_repetition_and_nulls() {
2199        let message_type = "
2200            message test_schema {
2201                OPTIONAL group i32_list (LIST) {
2202                    REPEATED group list {
2203                        OPTIONAL INT32 element;
2204                    }
2205                }
2206            }
2207        ";
2208        // column is:
2209        // row 0: [1, 2]
2210        // row 1: NULL
2211        // row 2: [4, NULL]
2212        // row 3: []
2213        // row 4: [7, 8, 9, 10]
2214        let schema = Arc::new(parse_message_type(message_type).unwrap());
2215        let data = [1, 2, 4, 7, 8, 9, 10];
2216        let def_levels = [3, 3, 0, 3, 2, 1, 3, 3, 3, 3];
2217        let rep_levels = [0, 1, 0, 0, 1, 0, 0, 1, 1, 1];
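        // Expected histograms: def levels [1, 1, 1, 7] (null list, empty list, null
        // element, non-null elements) and rep levels [5, 5] (5 rows, 5 continued lists).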
2218        let file = tempfile::tempfile().unwrap();
2219        let props = Arc::new(
2220            WriterProperties::builder()
2221                .set_statistics_enabled(EnabledStatistics::Page)
2222                .build(),
2223        );
2224        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
2225        let mut row_group_writer = writer.next_row_group().unwrap();
2226
2227        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
2228        col_writer
2229            .typed::<Int32Type>()
2230            .write_batch(&data, Some(&def_levels), Some(&rep_levels))
2231            .unwrap();
2232        col_writer.close().unwrap();
2233        row_group_writer.close().unwrap();
2234        let file_metadata = writer.close().unwrap();
2235
2236        assert_eq!(file_metadata.row_groups.len(), 1);
2237        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
2238        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
2239
2240        let check_def_hist = |def_hist: &[i64]| {
2241            assert_eq!(def_hist.len(), 4);
2242            assert_eq!(def_hist[0], 1);
2243            assert_eq!(def_hist[1], 1);
2244            assert_eq!(def_hist[2], 1);
2245            assert_eq!(def_hist[3], 7);
2246        };
2247
2248        let check_rep_hist = |rep_hist: &[i64]| {
2249            assert_eq!(rep_hist.len(), 2);
2250            assert_eq!(rep_hist[0], 5);
2251            assert_eq!(rep_hist[1], 5);
2252        };
2253
2254        // check that histograms are set properly in the write and read metadata
2255        // also check that unencoded_byte_array_data_bytes is not set
2256        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
2257        let meta_data = file_metadata.row_groups[0].columns[0]
2258            .meta_data
2259            .as_ref()
2260            .unwrap();
2261        assert!(meta_data.size_statistics.is_some());
2262        let size_stats = meta_data.size_statistics.as_ref().unwrap();
2263        assert!(size_stats.repetition_level_histogram.is_some());
2264        assert!(size_stats.definition_level_histogram.is_some());
2265        assert!(size_stats.unencoded_byte_array_data_bytes.is_none());
2266        check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());
2267        check_rep_hist(size_stats.repetition_level_histogram.as_ref().unwrap());
2268
2269        // check that the read metadata is also correct
2270        let options = ReadOptionsBuilder::new().with_page_index().build();
2271        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
2272
2273        let rfile_metadata = reader.metadata().file_metadata();
2274        assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
2275        assert_eq!(reader.num_row_groups(), 1);
2276        let rowgroup = reader.get_row_group(0).unwrap();
2277        assert_eq!(rowgroup.num_columns(), 1);
2278        let column = rowgroup.metadata().column(0);
2279        assert!(column.definition_level_histogram().is_some());
2280        assert!(column.repetition_level_histogram().is_some());
2281        assert!(column.unencoded_byte_array_data_bytes().is_none());
2282        check_def_hist(column.definition_level_histogram().unwrap().values());
2283        check_rep_hist(column.repetition_level_histogram().unwrap().values());
2284
2285        // check histogram in column index as well
2286        assert!(reader.metadata().column_index().is_some());
2287        let column_index = reader.metadata().column_index().unwrap();
2288        assert_eq!(column_index.len(), 1);
2289        assert_eq!(column_index[0].len(), 1);
2290        let col_idx = if let Index::INT32(index) = &column_index[0][0] {
2291            assert_eq!(index.indexes.len(), 1);
2292            &index.indexes[0]
2293        } else {
2294            unreachable!()
2295        };
2296
2297        check_def_hist(col_idx.definition_level_histogram().unwrap().values());
2298        check_rep_hist(col_idx.repetition_level_histogram().unwrap().values());
2299
2300        assert!(reader.metadata().offset_index().is_some());
2301        let offset_index = reader.metadata().offset_index().unwrap();
2302        assert_eq!(offset_index.len(), 1);
2303        assert_eq!(offset_index[0].len(), 1);
2304        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_none());
2305    }
2306
2307    #[test]
2308    #[cfg(feature = "arrow")]
2309    fn test_byte_stream_split_extended_roundtrip() {
2310        let path = format!(
2311            "{}/byte_stream_split_extended.gzip.parquet",
2312            arrow::util::test_util::parquet_test_data(),
2313        );
2314        let file = File::open(path).unwrap();
2315
        // Read in the test file and rewrite it to a temporary file
2317        let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)
2318            .expect("parquet open")
2319            .build()
2320            .expect("parquet open");
2321
2322        let file = tempfile::tempfile().unwrap();
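        // Disable dictionary encoding so the BYTE_STREAM_SPLIT encoding requested for
        // each column below is actually used for the data pages.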
2323        let props = WriterProperties::builder()
2324            .set_dictionary_enabled(false)
2325            .set_column_encoding(
2326                ColumnPath::from("float16_byte_stream_split"),
2327                Encoding::BYTE_STREAM_SPLIT,
2328            )
2329            .set_column_encoding(
2330                ColumnPath::from("float_byte_stream_split"),
2331                Encoding::BYTE_STREAM_SPLIT,
2332            )
2333            .set_column_encoding(
2334                ColumnPath::from("double_byte_stream_split"),
2335                Encoding::BYTE_STREAM_SPLIT,
2336            )
2337            .set_column_encoding(
2338                ColumnPath::from("int32_byte_stream_split"),
2339                Encoding::BYTE_STREAM_SPLIT,
2340            )
2341            .set_column_encoding(
2342                ColumnPath::from("int64_byte_stream_split"),
2343                Encoding::BYTE_STREAM_SPLIT,
2344            )
2345            .set_column_encoding(
2346                ColumnPath::from("flba5_byte_stream_split"),
2347                Encoding::BYTE_STREAM_SPLIT,
2348            )
2349            .set_column_encoding(
2350                ColumnPath::from("decimal_byte_stream_split"),
2351                Encoding::BYTE_STREAM_SPLIT,
2352            )
2353            .build();
2354
2355        let mut parquet_writer = ArrowWriter::try_new(
2356            file.try_clone().expect("cannot open file"),
2357            parquet_reader.schema(),
2358            Some(props),
2359        )
2360        .expect("create arrow writer");
2361
2362        for maybe_batch in parquet_reader {
2363            let batch = maybe_batch.expect("reading batch");
2364            parquet_writer.write(&batch).expect("writing data");
2365        }
2366
2367        parquet_writer.close().expect("finalizing file");
2368
2369        let reader = SerializedFileReader::new(file).expect("Failed to create reader");
2370        let filemeta = reader.metadata();
2371
2372        // Make sure byte_stream_split encoding was used
2373        let check_encoding = |x: usize, filemeta: &ParquetMetaData| {
2374            assert!(filemeta
2375                .row_group(0)
2376                .column(x)
2377                .encodings()
2378                .contains(&Encoding::BYTE_STREAM_SPLIT));
2379        };
2380
2381        check_encoding(1, filemeta);
2382        check_encoding(3, filemeta);
2383        check_encoding(5, filemeta);
2384        check_encoding(7, filemeta);
2385        check_encoding(9, filemeta);
2386        check_encoding(11, filemeta);
2387        check_encoding(13, filemeta);
2388
2389        // Read back tmpfile and make sure all values are correct
2390        let mut iter = reader
2391            .get_row_iter(None)
2392            .expect("Failed to create row iterator");
2393
2394        let mut start = 0;
2395        let end = reader.metadata().file_metadata().num_rows();
2396
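        // Each pair of adjacent columns in the test file holds the same values; after
        // the rewrite the two copies of every value must still compare equal.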
2397        let check_row = |row: Result<Row, ParquetError>| {
2398            assert!(row.is_ok());
2399            let r = row.unwrap();
2400            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2401            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2402            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2403            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2404            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2405            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2406            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2407        };
2408
2409        while start < end {
2410            match iter.next() {
2411                Some(row) => check_row(row),
2412                None => break,
2413            };
2414            start += 1;
2415        }
2416    }
2417}