parquet/file/writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains the file writer API, and provides methods to write row groups and columns
19//! using row group writers and column writers, respectively.
20
21use crate::bloom_filter::Sbbf;
22use crate::format as parquet;
23use crate::format::{ColumnIndex, OffsetIndex};
24use crate::thrift::TSerializable;
25use std::fmt::Debug;
26use std::io::{BufWriter, IoSlice, Read};
27use std::{io::Write, sync::Arc};
28use thrift::protocol::TCompactOutputProtocol;
29
30use crate::column::page_encryption::PageEncryptor;
31use crate::column::writer::{get_typed_column_writer_mut, ColumnCloseResult, ColumnWriterImpl};
32use crate::column::{
33    page::{CompressedPage, PageWriteSpec, PageWriter},
34    writer::{get_column_writer, ColumnWriter},
35};
36use crate::data_type::DataType;
37#[cfg(feature = "encryption")]
38use crate::encryption::encrypt::{
39    get_column_crypto_metadata, FileEncryptionProperties, FileEncryptor,
40};
41use crate::errors::{ParquetError, Result};
42use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
43use crate::file::reader::ChunkReader;
44#[cfg(feature = "encryption")]
45use crate::file::PARQUET_MAGIC_ENCR_FOOTER;
46use crate::file::{metadata::*, PARQUET_MAGIC};
47use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr};
48
49/// A wrapper around a [`Write`] that keeps track of the number
50/// of bytes that have been written. The given [`Write`] is wrapped
51/// with a [`BufWriter`] to optimize writing performance.
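///
/// The example below is a minimal sketch of the byte-tracking behaviour
/// (illustrative only; it assumes the type is used via
/// `parquet::file::writer::TrackedWrite`):
///
/// ```ignore
/// use std::io::Write;
/// use parquet::file::writer::TrackedWrite;
///
/// let mut writer = TrackedWrite::new(Vec::new());
/// writer.write_all(b"PAR1").unwrap();
/// assert_eq!(writer.bytes_written(), 4);
/// ```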
52pub struct TrackedWrite<W: Write> {
53    inner: BufWriter<W>,
54    bytes_written: usize,
55}
56
57impl<W: Write> TrackedWrite<W> {
58    /// Create a new [`TrackedWrite`] from a [`Write`]
59    pub fn new(inner: W) -> Self {
60        let buf_write = BufWriter::new(inner);
61        Self {
62            inner: buf_write,
63            bytes_written: 0,
64        }
65    }
66
67    /// Returns the number of bytes written to this instance
68    pub fn bytes_written(&self) -> usize {
69        self.bytes_written
70    }
71
72    /// Returns a reference to the underlying writer.
73    pub fn inner(&self) -> &W {
74        self.inner.get_ref()
75    }
76
77    /// Returns a mutable reference to the underlying writer.
78    ///
79    /// It is inadvisable to write directly to the underlying writer, as doing so
80    /// will likely result in data corruption.
81    pub fn inner_mut(&mut self) -> &mut W {
82        self.inner.get_mut()
83    }
84
85    /// Returns the underlying writer.
86    pub fn into_inner(self) -> Result<W> {
87        self.inner.into_inner().map_err(|err| {
88            ParquetError::General(format!("fail to get inner writer: {:?}", err.to_string()))
89        })
90    }
91}
92
93impl<W: Write> Write for TrackedWrite<W> {
94    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
95        let bytes = self.inner.write(buf)?;
96        self.bytes_written += bytes;
97        Ok(bytes)
98    }
99
100    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> std::io::Result<usize> {
101        let bytes = self.inner.write_vectored(bufs)?;
102        self.bytes_written += bytes;
103        Ok(bytes)
104    }
105
106    fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
107        self.inner.write_all(buf)?;
108        self.bytes_written += buf.len();
109
110        Ok(())
111    }
112
113    fn flush(&mut self) -> std::io::Result<()> {
114        self.inner.flush()
115    }
116}
117
118/// Callback invoked on closing a column chunk
119pub type OnCloseColumnChunk<'a> = Box<dyn FnOnce(ColumnCloseResult) -> Result<()> + 'a>;
120
121/// Callback invoked on closing a row group. Its arguments are the file writer,
122/// the row group metadata, and then, for each column chunk:
123/// - the bloom filter, if any
124/// - the column index, if any
125/// - the offset index, if any
126pub type OnCloseRowGroup<'a, W> = Box<
127    dyn FnOnce(
128            &'a mut TrackedWrite<W>,
129            RowGroupMetaData,
130            Vec<Option<Sbbf>>,
131            Vec<Option<ColumnIndex>>,
132            Vec<Option<OffsetIndex>>,
133        ) -> Result<()>
134        + 'a
135        + Send,
136>;
137
138// ----------------------------------------------------------------------
139// Serialized impl for file & row group writers
140
141/// Parquet file writer API.
142/// Provides methods to write row groups sequentially.
143///
144/// The main workflow is as follows (see the sketch below):
145/// - Create a file writer; this will open a new file and potentially write some metadata.
146/// - Request a new row group writer by calling `next_row_group`.
147/// - Once finished writing a row group, close the row group writer by calling `close`.
148/// - Write subsequent row groups, if necessary.
149/// - After all row groups have been written, close the file writer using the `close` method.
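///
/// A minimal end-to-end sketch of this workflow (illustrative only; the schema
/// and values are arbitrary):
///
/// ```ignore
/// use std::sync::Arc;
/// use parquet::data_type::Int32Type;
/// use parquet::file::properties::WriterProperties;
/// use parquet::file::writer::SerializedFileWriter;
/// use parquet::schema::parser::parse_message_type;
///
/// let schema = Arc::new(parse_message_type("message schema { REQUIRED INT32 id; }")?);
/// let props = Arc::new(WriterProperties::builder().build());
/// let mut writer = SerializedFileWriter::new(Vec::new(), schema, props)?;
///
/// let mut row_group = writer.next_row_group()?;
/// while let Some(mut column) = row_group.next_column()? {
///     column.typed::<Int32Type>().write_batch(&[1, 2, 3], None, None)?;
///     column.close()?;
/// }
/// row_group.close()?;
/// writer.close()?;
/// ```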
150pub struct SerializedFileWriter<W: Write> {
151    buf: TrackedWrite<W>,
152    schema: TypePtr,
153    descr: SchemaDescPtr,
154    props: WriterPropertiesPtr,
155    row_groups: Vec<RowGroupMetaData>,
156    bloom_filters: Vec<Vec<Option<Sbbf>>>,
157    column_indexes: Vec<Vec<Option<ColumnIndex>>>,
158    offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
159    row_group_index: usize,
160    // kv_metadatas will be appended to the key-value metadata from `props` when `write_metadata` is called
161    kv_metadatas: Vec<KeyValue>,
162    finished: bool,
163    #[cfg(feature = "encryption")]
164    file_encryptor: Option<Arc<FileEncryptor>>,
165}
166
167impl<W: Write> Debug for SerializedFileWriter<W> {
168    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169        // implement Debug so this can be used with #[derive(Debug)]
170        // in client code rather than actually listing all the fields
171        f.debug_struct("SerializedFileWriter")
172            .field("descr", &self.descr)
173            .field("row_group_index", &self.row_group_index)
174            .field("kv_metadatas", &self.kv_metadatas)
175            .finish_non_exhaustive()
176    }
177}
178
179impl<W: Write + Send> SerializedFileWriter<W> {
180    /// Creates new file writer.
181    pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
182        let mut buf = TrackedWrite::new(buf);
183
184        let schema_descriptor = SchemaDescriptor::new(schema.clone());
185
186        #[cfg(feature = "encryption")]
187        let file_encryptor = Self::get_file_encryptor(&properties, &schema_descriptor)?;
188
189        Self::start_file(&properties, &mut buf)?;
190        Ok(Self {
191            buf,
192            schema,
193            descr: Arc::new(schema_descriptor),
194            props: properties,
195            row_groups: vec![],
196            bloom_filters: vec![],
197            column_indexes: Vec::new(),
198            offset_indexes: Vec::new(),
199            row_group_index: 0,
200            kv_metadatas: Vec::new(),
201            finished: false,
202            #[cfg(feature = "encryption")]
203            file_encryptor,
204        })
205    }
206
207    #[cfg(feature = "encryption")]
208    fn get_file_encryptor(
209        properties: &WriterPropertiesPtr,
210        schema_descriptor: &SchemaDescriptor,
211    ) -> Result<Option<Arc<FileEncryptor>>> {
212        if let Some(file_encryption_properties) = &properties.file_encryption_properties {
213            file_encryption_properties.validate_encrypted_column_names(schema_descriptor)?;
214
215            Ok(Some(Arc::new(FileEncryptor::new(
216                file_encryption_properties.clone(),
217            )?)))
218        } else {
219            Ok(None)
220        }
221    }
222
223    /// Creates a writer for the next row group in this file.
224    /// Returns `Err` in case of an IO or Thrift error.
225    ///
226    /// There can be at most 2^15 row groups in a file, and row groups have
227    /// to be written sequentially. Every time the next row group is requested, the
228    /// previous row group must be finalised and closed using its `close` method.
229    pub fn next_row_group(&mut self) -> Result<SerializedRowGroupWriter<'_, W>> {
230        self.assert_previous_writer_closed()?;
231        let ordinal = self.row_group_index;
232
233        let ordinal: i16 = ordinal.try_into().map_err(|_| {
234            ParquetError::General(format!(
235                "Parquet does not support more than {} row groups per file (currently: {})",
236                i16::MAX,
237                ordinal
238            ))
239        })?;
240
241        self.row_group_index = self
242            .row_group_index
243            .checked_add(1)
244            .expect("SerializedFileWriter::row_group_index overflowed");
245
246        let bloom_filter_position = self.properties().bloom_filter_position();
247        let row_groups = &mut self.row_groups;
248        let row_bloom_filters = &mut self.bloom_filters;
249        let row_column_indexes = &mut self.column_indexes;
250        let row_offset_indexes = &mut self.offset_indexes;
251        let on_close = move |buf,
252                             mut metadata,
253                             row_group_bloom_filter,
254                             row_group_column_index,
255                             row_group_offset_index| {
256            row_bloom_filters.push(row_group_bloom_filter);
257            row_column_indexes.push(row_group_column_index);
258            row_offset_indexes.push(row_group_offset_index);
259            // write bloom filters out immediately after the row group if requested
260            match bloom_filter_position {
261                BloomFilterPosition::AfterRowGroup => {
262                    write_bloom_filters(buf, row_bloom_filters, &mut metadata)?
263                }
264                BloomFilterPosition::End => (),
265            };
266            row_groups.push(metadata);
267            Ok(())
268        };
269
270        let row_group_writer = SerializedRowGroupWriter::new(
271            self.descr.clone(),
272            self.props.clone(),
273            &mut self.buf,
274            ordinal,
275            Some(Box::new(on_close)),
276        );
277        #[cfg(feature = "encryption")]
278        let row_group_writer = row_group_writer.with_file_encryptor(self.file_encryptor.clone());
279
280        Ok(row_group_writer)
281    }
282
283    /// Returns metadata for any flushed row groups
284    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
285        &self.row_groups
286    }
287
288    /// Close and finalize the underlying Parquet writer
289    ///
290    /// Unlike [`Self::close`], this does not consume `self`.
291    ///
292    /// Attempting to write after calling `finish` will result in an error.
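    ///
    /// A small sketch of how this differs from [`Self::close`] (illustrative only;
    /// it assumes `writer` is a `SerializedFileWriter` whose row groups have all
    /// been closed):
    ///
    /// ```ignore
    /// // Write the footer, but keep the writer around for inspection
    /// let file_metadata = writer.finish()?;
    /// let total_bytes = writer.bytes_written();
    /// ```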
293    pub fn finish(&mut self) -> Result<parquet::FileMetaData> {
294        self.assert_previous_writer_closed()?;
295        let metadata = self.write_metadata()?;
296        self.buf.flush()?;
297        Ok(metadata)
298    }
299
300    /// Closes and finalises file writer, returning the file metadata.
301    pub fn close(mut self) -> Result<parquet::FileMetaData> {
302        self.finish()
303    }
304
305    /// Writes magic bytes at the beginning of the file.
306    #[cfg(not(feature = "encryption"))]
307    fn start_file(_properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
308        buf.write_all(get_file_magic())?;
309        Ok(())
310    }
311
312    /// Writes magic bytes at the beginning of the file.
313    #[cfg(feature = "encryption")]
314    fn start_file(properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
315        let magic = get_file_magic(properties.file_encryption_properties.as_ref());
316
317        buf.write_all(magic)?;
318        Ok(())
319    }
320
321    /// Assembles and writes metadata at the end of the file.
322    fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
323        self.finished = true;
324
325        // write out any remaining bloom filters after all row groups
326        for row_group in &mut self.row_groups {
327            write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?;
328        }
329
330        let key_value_metadata = match self.props.key_value_metadata() {
331            Some(kv) => Some(kv.iter().chain(&self.kv_metadatas).cloned().collect()),
332            None if self.kv_metadatas.is_empty() => None,
333            None => Some(self.kv_metadatas.clone()),
334        };
335
336        let row_groups = self
337            .row_groups
338            .iter()
339            .map(|v| v.to_thrift())
340            .collect::<Vec<_>>();
341
342        let mut encoder = ThriftMetadataWriter::new(
343            &mut self.buf,
344            &self.schema,
345            &self.descr,
346            row_groups,
347            Some(self.props.created_by().to_string()),
348            self.props.writer_version().as_num(),
349        );
350
351        #[cfg(feature = "encryption")]
352        {
353            encoder = encoder.with_file_encryptor(self.file_encryptor.clone());
354        }
355
356        if let Some(key_value_metadata) = key_value_metadata {
357            encoder = encoder.with_key_value_metadata(key_value_metadata)
358        }
359        encoder = encoder.with_column_indexes(&self.column_indexes);
360        encoder = encoder.with_offset_indexes(&self.offset_indexes);
361        encoder.finish()
362    }
363
364    #[inline]
365    fn assert_previous_writer_closed(&self) -> Result<()> {
366        if self.finished {
367            return Err(general_err!("SerializedFileWriter already finished"));
368        }
369
370        if self.row_group_index != self.row_groups.len() {
371            Err(general_err!("Previous row group writer was not closed"))
372        } else {
373            Ok(())
374        }
375    }
376
377    /// Add a [`KeyValue`] to the file writer's metadata
378    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
379        self.kv_metadatas.push(kv_metadata);
380    }
381
382    /// Returns a reference to schema descriptor.
383    pub fn schema_descr(&self) -> &SchemaDescriptor {
384        &self.descr
385    }
386
387    /// Returns a reference to the writer properties
388    pub fn properties(&self) -> &WriterPropertiesPtr {
389        &self.props
390    }
391
392    /// Returns a reference to the underlying writer.
393    pub fn inner(&self) -> &W {
394        self.buf.inner()
395    }
396
397    /// Returns a mutable reference to the underlying writer.
398    ///
399    /// It is inadvisable to directly write to the underlying writer.
400    pub fn inner_mut(&mut self) -> &mut W {
401        self.buf.inner_mut()
402    }
403
404    /// Writes the file footer and returns the underlying writer.
405    pub fn into_inner(mut self) -> Result<W> {
406        self.assert_previous_writer_closed()?;
407        let _ = self.write_metadata()?;
408
409        self.buf.into_inner()
410    }
411
412    /// Returns the number of bytes written to this instance
413    pub fn bytes_written(&self) -> usize {
414        self.buf.bytes_written()
415    }
416
417    /// Get the file encryptor used by this instance to encrypt data
418    #[cfg(feature = "encryption")]
419    pub(crate) fn file_encryptor(&self) -> Option<Arc<FileEncryptor>> {
420        self.file_encryptor.clone()
421    }
422}
423
424/// Serializes all the bloom filters of the given row group to the given buffer
425/// and updates the row group metadata with the bloom filter offsets and lengths.
426fn write_bloom_filters<W: Write + Send>(
427    buf: &mut TrackedWrite<W>,
428    bloom_filters: &mut [Vec<Option<Sbbf>>],
429    row_group: &mut RowGroupMetaData,
430) -> Result<()> {
431    // Iterate over the columns of this row group and, for each column that has
432    // a bloom filter, write the filter to the file and record its offset and
433    // length in the column chunk metadata.
434
435    let row_group_idx: u16 = row_group
436        .ordinal()
437        .expect("Missing row group ordinal")
438        .try_into()
439        .map_err(|_| {
440            ParquetError::General(format!(
441                "Negative row group ordinal: {})",
442                row_group.ordinal().unwrap()
443            ))
444        })?;
445    let row_group_idx = row_group_idx as usize;
446    for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() {
447        if let Some(bloom_filter) = bloom_filters[row_group_idx][column_idx].take() {
448            let start_offset = buf.bytes_written();
449            bloom_filter.write(&mut *buf)?;
450            let end_offset = buf.bytes_written();
451            // set offset and index for bloom filter
452            *column_chunk = column_chunk
453                .clone()
454                .into_builder()
455                .set_bloom_filter_offset(Some(start_offset as i64))
456                .set_bloom_filter_length(Some((end_offset - start_offset) as i32))
457                .build()?;
458        }
459    }
460    Ok(())
461}
462
463/// Parquet row group writer API.
464/// Provides methods to access column writers in an iterator-like fashion, order is
465/// guaranteed to match the order of schema leaves (column descriptors).
466///
467/// All columns should be written sequentially; the main workflow is (see the sketch below):
468/// - Request the next column using the `next_column` method - this will return `None` if no
469///   more columns are available to write.
470/// - Once done writing a column, close the column writer with `close`.
471/// - Once all columns have been written, close the row group writer with the `close`
472///   method. The close method returns the row group metadata and is a no-op
473///   on an already closed row group.
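///
/// A sketch of writing two columns of different types (illustrative only; it
/// assumes the schema's leaves are a `REQUIRED` INT32 column followed by a
/// `REQUIRED` BYTE_ARRAY column):
///
/// ```ignore
/// use parquet::data_type::{ByteArray, ByteArrayType, Int32Type};
///
/// let mut row_group = writer.next_row_group()?;
///
/// // Columns are returned in schema-leaf order
/// let mut col = row_group.next_column()?.unwrap();
/// col.typed::<Int32Type>().write_batch(&[1, 2], None, None)?;
/// col.close()?;
///
/// let mut col = row_group.next_column()?.unwrap();
/// col.typed::<ByteArrayType>()
///     .write_batch(&[ByteArray::from("a"), ByteArray::from("b")], None, None)?;
/// col.close()?;
///
/// assert!(row_group.next_column()?.is_none());
/// row_group.close()?;
/// ```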
474pub struct SerializedRowGroupWriter<'a, W: Write> {
475    descr: SchemaDescPtr,
476    props: WriterPropertiesPtr,
477    buf: &'a mut TrackedWrite<W>,
478    total_rows_written: Option<u64>,
479    total_bytes_written: u64,
480    total_uncompressed_bytes: i64,
481    column_index: usize,
482    row_group_metadata: Option<RowGroupMetaDataPtr>,
483    column_chunks: Vec<ColumnChunkMetaData>,
484    bloom_filters: Vec<Option<Sbbf>>,
485    column_indexes: Vec<Option<ColumnIndex>>,
486    offset_indexes: Vec<Option<OffsetIndex>>,
487    row_group_index: i16,
488    file_offset: i64,
489    on_close: Option<OnCloseRowGroup<'a, W>>,
490    #[cfg(feature = "encryption")]
491    file_encryptor: Option<Arc<FileEncryptor>>,
492}
493
494impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
495    /// Creates a new `SerializedRowGroupWriter` with:
496    ///
497    /// - `schema_descr` - the schema to write
498    /// - `properties` - writer properties
499    /// - `buf` - the buffer to write data to
500    /// - `row_group_index` - row group index in this parquet file.
501    /// - `file_offset` - file offset of this row group in this parquet file.
502    /// - `on_close` - an optional callback that will be invoked on [`Self::close`]
503    pub fn new(
504        schema_descr: SchemaDescPtr,
505        properties: WriterPropertiesPtr,
506        buf: &'a mut TrackedWrite<W>,
507        row_group_index: i16,
508        on_close: Option<OnCloseRowGroup<'a, W>>,
509    ) -> Self {
510        let num_columns = schema_descr.num_columns();
511        let file_offset = buf.bytes_written() as i64;
512        Self {
513            buf,
514            row_group_index,
515            file_offset,
516            on_close,
517            total_rows_written: None,
518            descr: schema_descr,
519            props: properties,
520            column_index: 0,
521            row_group_metadata: None,
522            column_chunks: Vec::with_capacity(num_columns),
523            bloom_filters: Vec::with_capacity(num_columns),
524            column_indexes: Vec::with_capacity(num_columns),
525            offset_indexes: Vec::with_capacity(num_columns),
526            total_bytes_written: 0,
527            total_uncompressed_bytes: 0,
528            #[cfg(feature = "encryption")]
529            file_encryptor: None,
530        }
531    }
532
533    #[cfg(feature = "encryption")]
534    /// Set the file encryptor to use for encrypting row group data and metadata
535    pub(crate) fn with_file_encryptor(
536        mut self,
537        file_encryptor: Option<Arc<FileEncryptor>>,
538    ) -> Self {
539        self.file_encryptor = file_encryptor;
540        self
541    }
542
543    /// Advance `self.column_index` returning the next [`ColumnDescPtr`] if any
544    fn next_column_desc(&mut self) -> Option<ColumnDescPtr> {
545        let ret = self.descr.columns().get(self.column_index)?.clone();
546        self.column_index += 1;
547        Some(ret)
548    }
549
550    /// Returns [`OnCloseColumnChunk`] for the next writer
551    fn get_on_close(&mut self) -> (&mut TrackedWrite<W>, OnCloseColumnChunk<'_>) {
552        let total_bytes_written = &mut self.total_bytes_written;
553        let total_uncompressed_bytes = &mut self.total_uncompressed_bytes;
554        let total_rows_written = &mut self.total_rows_written;
555        let column_chunks = &mut self.column_chunks;
556        let column_indexes = &mut self.column_indexes;
557        let offset_indexes = &mut self.offset_indexes;
558        let bloom_filters = &mut self.bloom_filters;
559
560        let on_close = |r: ColumnCloseResult| {
561            // Update row group writer metrics
562            *total_bytes_written += r.bytes_written;
563            *total_uncompressed_bytes += r.metadata.uncompressed_size();
564            column_chunks.push(r.metadata);
565            bloom_filters.push(r.bloom_filter);
566            column_indexes.push(r.column_index);
567            offset_indexes.push(r.offset_index);
568
569            if let Some(rows) = *total_rows_written {
570                if rows != r.rows_written {
571                    return Err(general_err!(
572                        "Incorrect number of rows, expected {} != {} rows",
573                        rows,
574                        r.rows_written
575                    ));
576                }
577            } else {
578                *total_rows_written = Some(r.rows_written);
579            }
580
581            Ok(())
582        };
583        (self.buf, Box::new(on_close))
584    }
585
586    /// Returns the next column writer, if available, using the factory function;
587    /// otherwise returns `None`.
588    pub(crate) fn next_column_with_factory<'b, F, C>(&'b mut self, factory: F) -> Result<Option<C>>
589    where
590        F: FnOnce(
591            ColumnDescPtr,
592            WriterPropertiesPtr,
593            Box<dyn PageWriter + 'b>,
594            OnCloseColumnChunk<'b>,
595        ) -> Result<C>,
596    {
597        self.assert_previous_writer_closed()?;
598
599        let encryptor_context = self.get_page_encryptor_context();
600
601        Ok(match self.next_column_desc() {
602            Some(column) => {
603                let props = self.props.clone();
604                let (buf, on_close) = self.get_on_close();
605
606                let page_writer = SerializedPageWriter::new(buf);
607                let page_writer =
608                    Self::set_page_writer_encryptor(&column, encryptor_context, page_writer)?;
609
610                Some(factory(
611                    column,
612                    props,
613                    Box::new(page_writer),
614                    Box::new(on_close),
615                )?)
616            }
617            None => None,
618        })
619    }
620
621    /// Returns the next column writer, if available; otherwise returns `None`.
622    /// Returns `Err` in case of an IO or Thrift error, or if the row group writer
623    /// has already been closed.
624    pub fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>> {
625        self.next_column_with_factory(|descr, props, page_writer, on_close| {
626            let column_writer = get_column_writer(descr, props, page_writer);
627            Ok(SerializedColumnWriter::new(column_writer, Some(on_close)))
628        })
629    }
630
631    /// Append an encoded column chunk from another source without decoding it
632    ///
633    /// This can be used for efficiently concatenating or projecting parquet data,
634    /// or encoding parquet data to temporary in-memory buffers
635    ///
636    /// See [`Self::next_column`] for writing data that isn't already encoded
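    ///
    /// The sketch below is illustrative only: `bytes` is assumed to be a
    /// [`Bytes`](bytes::Bytes) value holding a complete Parquet file, and the
    /// spliced column must match the next column descriptor of this row group.
    ///
    /// ```ignore
    /// use parquet::column::writer::ColumnCloseResult;
    /// use parquet::file::reader::FileReader;
    /// use parquet::file::serialized_reader::SerializedFileReader;
    ///
    /// let reader = SerializedFileReader::new(bytes.clone())?;
    /// let src_row_group = reader.metadata().row_group(0);
    /// let src_column = src_row_group.column(0).clone();
    ///
    /// let close = ColumnCloseResult {
    ///     bytes_written: src_column.compressed_size() as u64,
    ///     rows_written: src_row_group.num_rows() as u64,
    ///     metadata: src_column,
    ///     bloom_filter: None,
    ///     column_index: None,
    ///     offset_index: None,
    /// };
    /// row_group_writer.append_column(&bytes, close)?;
    /// ```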
637    pub fn append_column<R: ChunkReader>(
638        &mut self,
639        reader: &R,
640        mut close: ColumnCloseResult,
641    ) -> Result<()> {
642        self.assert_previous_writer_closed()?;
643        let desc = self
644            .next_column_desc()
645            .ok_or_else(|| general_err!("exhausted columns in SerializedRowGroupWriter"))?;
646
647        let metadata = close.metadata;
648
649        if metadata.column_descr() != desc.as_ref() {
650            return Err(general_err!(
651                "column descriptor mismatch, expected {:?} got {:?}",
652                desc,
653                metadata.column_descr()
654            ));
655        }
656
657        let src_dictionary_offset = metadata.dictionary_page_offset();
658        let src_data_offset = metadata.data_page_offset();
659        let src_offset = src_dictionary_offset.unwrap_or(src_data_offset);
660        let src_length = metadata.compressed_size();
661
662        let write_offset = self.buf.bytes_written();
663        let mut read = reader.get_read(src_offset as _)?.take(src_length as _);
664        let write_length = std::io::copy(&mut read, &mut self.buf)?;
665
666        if src_length as u64 != write_length {
667            return Err(general_err!(
668                "Failed to splice column data, expected {read_length} got {write_length}"
669            ));
670        }
671
672        let map_offset = |x| x - src_offset + write_offset as i64;
673        let mut builder = ColumnChunkMetaData::builder(metadata.column_descr_ptr())
674            .set_compression(metadata.compression())
675            .set_encodings(metadata.encodings().clone())
676            .set_total_compressed_size(metadata.compressed_size())
677            .set_total_uncompressed_size(metadata.uncompressed_size())
678            .set_num_values(metadata.num_values())
679            .set_data_page_offset(map_offset(src_data_offset))
680            .set_dictionary_page_offset(src_dictionary_offset.map(map_offset))
681            .set_unencoded_byte_array_data_bytes(metadata.unencoded_byte_array_data_bytes());
682
683        if let Some(rep_hist) = metadata.repetition_level_histogram() {
684            builder = builder.set_repetition_level_histogram(Some(rep_hist.clone()))
685        }
686        if let Some(def_hist) = metadata.definition_level_histogram() {
687            builder = builder.set_definition_level_histogram(Some(def_hist.clone()))
688        }
689        if let Some(statistics) = metadata.statistics() {
690            builder = builder.set_statistics(statistics.clone())
691        }
692        builder = self.set_column_crypto_metadata(builder, &metadata);
693        close.metadata = builder.build()?;
694
695        if let Some(offsets) = close.offset_index.as_mut() {
696            for location in &mut offsets.page_locations {
697                location.offset = map_offset(location.offset)
698            }
699        }
700
701        let (_, on_close) = self.get_on_close();
702        on_close(close)
703    }
704
705    /// Closes this row group writer and returns row group metadata.
706    pub fn close(mut self) -> Result<RowGroupMetaDataPtr> {
707        if self.row_group_metadata.is_none() {
708            self.assert_previous_writer_closed()?;
709
710            let column_chunks = std::mem::take(&mut self.column_chunks);
711            let row_group_metadata = RowGroupMetaData::builder(self.descr.clone())
712                .set_column_metadata(column_chunks)
713                .set_total_byte_size(self.total_uncompressed_bytes)
714                .set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
715                .set_sorting_columns(self.props.sorting_columns().cloned())
716                .set_ordinal(self.row_group_index)
717                .set_file_offset(self.file_offset)
718                .build()?;
719
720            self.row_group_metadata = Some(Arc::new(row_group_metadata.clone()));
721
722            if let Some(on_close) = self.on_close.take() {
723                on_close(
724                    self.buf,
725                    row_group_metadata,
726                    self.bloom_filters,
727                    self.column_indexes,
728                    self.offset_indexes,
729                )?
730            }
731        }
732
733        let metadata = self.row_group_metadata.as_ref().unwrap().clone();
734        Ok(metadata)
735    }
736
737    /// Set the column crypto metadata for a column chunk
738    #[cfg(feature = "encryption")]
739    fn set_column_crypto_metadata(
740        &self,
741        builder: ColumnChunkMetaDataBuilder,
742        metadata: &ColumnChunkMetaData,
743    ) -> ColumnChunkMetaDataBuilder {
744        if let Some(file_encryptor) = self.file_encryptor.as_ref() {
745            builder.set_column_crypto_metadata(get_column_crypto_metadata(
746                file_encryptor.properties(),
747                &metadata.column_descr_ptr(),
748            ))
749        } else {
750            builder
751        }
752    }
753
754    /// Get context required to create a [`PageEncryptor`] for a column
755    #[cfg(feature = "encryption")]
756    fn get_page_encryptor_context(&self) -> PageEncryptorContext {
757        PageEncryptorContext {
758            file_encryptor: self.file_encryptor.clone(),
759            row_group_index: self.row_group_index as usize,
760            column_index: self.column_index,
761        }
762    }
763
764    /// Set the [`PageEncryptor`] on a page writer if a column is encrypted
765    #[cfg(feature = "encryption")]
766    fn set_page_writer_encryptor<'b>(
767        column: &ColumnDescPtr,
768        context: PageEncryptorContext,
769        page_writer: SerializedPageWriter<'b, W>,
770    ) -> Result<SerializedPageWriter<'b, W>> {
771        let page_encryptor = PageEncryptor::create_if_column_encrypted(
772            &context.file_encryptor,
773            context.row_group_index,
774            context.column_index,
775            &column.path().string(),
776        )?;
777
778        Ok(page_writer.with_page_encryptor(page_encryptor))
779    }
780
781    /// No-op implementation of setting the column crypto metadata for a column chunk
782    #[cfg(not(feature = "encryption"))]
783    fn set_column_crypto_metadata(
784        &self,
785        builder: ColumnChunkMetaDataBuilder,
786        _metadata: &ColumnChunkMetaData,
787    ) -> ColumnChunkMetaDataBuilder {
788        builder
789    }
790
791    #[cfg(not(feature = "encryption"))]
792    fn get_page_encryptor_context(&self) -> PageEncryptorContext {
793        PageEncryptorContext {}
794    }
795
796    /// No-op implementation of setting a [`PageEncryptor`] for when encryption is disabled
797    #[cfg(not(feature = "encryption"))]
798    fn set_page_writer_encryptor<'b>(
799        _column: &ColumnDescPtr,
800        _context: PageEncryptorContext,
801        page_writer: SerializedPageWriter<'b, W>,
802    ) -> Result<SerializedPageWriter<'b, W>> {
803        Ok(page_writer)
804    }
805
806    #[inline]
807    fn assert_previous_writer_closed(&self) -> Result<()> {
808        if self.column_index != self.column_chunks.len() {
809            Err(general_err!("Previous column writer was not closed"))
810        } else {
811            Ok(())
812        }
813    }
814}
815
816/// Context required to create a [`PageEncryptor`] for a column
817#[cfg(feature = "encryption")]
818struct PageEncryptorContext {
819    file_encryptor: Option<Arc<FileEncryptor>>,
820    row_group_index: usize,
821    column_index: usize,
822}
823
824#[cfg(not(feature = "encryption"))]
825struct PageEncryptorContext {}
826
827/// A wrapper around a [`ColumnWriter`] that invokes a callback on [`Self::close`]
828pub struct SerializedColumnWriter<'a> {
829    inner: ColumnWriter<'a>,
830    on_close: Option<OnCloseColumnChunk<'a>>,
831}
832
833impl<'a> SerializedColumnWriter<'a> {
834    /// Create a new [`SerializedColumnWriter`] from a [`ColumnWriter`] and an
835    /// optional callback to be invoked on [`Self::close`]
836    pub fn new(inner: ColumnWriter<'a>, on_close: Option<OnCloseColumnChunk<'a>>) -> Self {
837        Self { inner, on_close }
838    }
839
840    /// Returns a reference to an untyped [`ColumnWriter`]
841    pub fn untyped(&mut self) -> &mut ColumnWriter<'a> {
842        &mut self.inner
843    }
844
845    /// Returns a reference to a typed [`ColumnWriterImpl`]
846    pub fn typed<T: DataType>(&mut self) -> &mut ColumnWriterImpl<'a, T> {
847        get_typed_column_writer_mut(&mut self.inner)
848    }
849
850    /// Close this [`SerializedColumnWriter`]
851    pub fn close(mut self) -> Result<()> {
852        let r = self.inner.close()?;
853        if let Some(on_close) = self.on_close.take() {
854            on_close(r)?
855        }
856
857        Ok(())
858    }
859}
860
861/// A serialized implementation of the Parquet [`PageWriter`].
862/// Writes and serializes pages and metadata into the output stream.
863///
864/// `SerializedPageWriter` should not be used after calling `close()`.
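///
/// A minimal sketch of writing a single page (illustrative only; the page
/// contents below are arbitrary):
///
/// ```ignore
/// use bytes::Bytes;
/// use parquet::basic::Encoding;
/// use parquet::column::page::{CompressedPage, Page, PageWriter};
/// use parquet::file::writer::{SerializedPageWriter, TrackedWrite};
///
/// let mut sink = TrackedWrite::new(Vec::new());
/// let mut page_writer = SerializedPageWriter::new(&mut sink);
///
/// let page = Page::DictionaryPage {
///     buf: Bytes::from(vec![0u8; 4]),
///     num_values: 1,
///     encoding: Encoding::PLAIN,
///     is_sorted: false,
/// };
/// let spec = page_writer.write_page(CompressedPage::new(page, 4))?;
/// assert_eq!(spec.offset, 0);
/// page_writer.close()?;
/// ```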
865pub struct SerializedPageWriter<'a, W: Write> {
866    sink: &'a mut TrackedWrite<W>,
867    #[cfg(feature = "encryption")]
868    page_encryptor: Option<PageEncryptor>,
869}
870
871impl<'a, W: Write> SerializedPageWriter<'a, W> {
872    /// Creates new page writer.
873    pub fn new(sink: &'a mut TrackedWrite<W>) -> Self {
874        Self {
875            sink,
876            #[cfg(feature = "encryption")]
877            page_encryptor: None,
878        }
879    }
880
881    /// Serializes page header into Thrift.
882    /// Returns number of bytes that have been written into the sink.
883    #[inline]
884    fn serialize_page_header(&mut self, header: parquet::PageHeader) -> Result<usize> {
885        let start_pos = self.sink.bytes_written();
886        match self.page_encryptor_and_sink_mut() {
887            Some((page_encryptor, sink)) => {
888                page_encryptor.encrypt_page_header(&header, sink)?;
889            }
890            None => {
891                let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
892                header.write_to_out_protocol(&mut protocol)?;
893            }
894        }
895        Ok(self.sink.bytes_written() - start_pos)
896    }
897}
898
899#[cfg(feature = "encryption")]
900impl<'a, W: Write> SerializedPageWriter<'a, W> {
901    /// Set the encryptor to use to encrypt page data
902    fn with_page_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
903        self.page_encryptor = page_encryptor;
904        self
905    }
906
907    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
908        self.page_encryptor.as_mut()
909    }
910
911    fn page_encryptor_and_sink_mut(
912        &mut self,
913    ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
914        self.page_encryptor.as_mut().map(|pe| (pe, &mut self.sink))
915    }
916}
917
918#[cfg(not(feature = "encryption"))]
919impl<'a, W: Write> SerializedPageWriter<'a, W> {
920    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
921        None
922    }
923
924    fn page_encryptor_and_sink_mut(
925        &mut self,
926    ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
927        None
928    }
929}
930
931impl<W: Write + Send> PageWriter for SerializedPageWriter<'_, W> {
932    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
933        let page = match self.page_encryptor_mut() {
934            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
935            None => page,
936        };
937
938        let page_type = page.page_type();
939        let start_pos = self.sink.bytes_written() as u64;
940
941        let page_header = page.to_thrift_header();
942        let header_size = self.serialize_page_header(page_header)?;
943
944        self.sink.write_all(page.data())?;
945
946        let mut spec = PageWriteSpec::new();
947        spec.page_type = page_type;
948        spec.uncompressed_size = page.uncompressed_size() + header_size;
949        spec.compressed_size = page.compressed_size() + header_size;
950        spec.offset = start_pos;
951        spec.bytes_written = self.sink.bytes_written() as u64 - start_pos;
952        spec.num_values = page.num_values();
953
954        if let Some(page_encryptor) = self.page_encryptor_mut() {
955            if page.compressed_page().is_data_page() {
956                page_encryptor.increment_page();
957            }
958        }
959        Ok(spec)
960    }
961
962    fn close(&mut self) -> Result<()> {
963        self.sink.flush()?;
964        Ok(())
965    }
966}
967
968/// Get the magic bytes at the start and end of the file that identify this
969/// as a Parquet file.
970#[cfg(feature = "encryption")]
971pub(crate) fn get_file_magic(
972    file_encryption_properties: Option<&FileEncryptionProperties>,
973) -> &'static [u8; 4] {
974    match file_encryption_properties.as_ref() {
975        Some(encryption_properties) if encryption_properties.encrypt_footer() => {
976            &PARQUET_MAGIC_ENCR_FOOTER
977        }
978        _ => &PARQUET_MAGIC,
979    }
980}
981
982#[cfg(not(feature = "encryption"))]
983pub(crate) fn get_file_magic() -> &'static [u8; 4] {
984    &PARQUET_MAGIC
985}
986
987#[cfg(test)]
988mod tests {
989    use super::*;
990
991    #[cfg(feature = "arrow")]
992    use arrow_array::RecordBatchReader;
993    use bytes::Bytes;
994    use std::fs::File;
995
996    #[cfg(feature = "arrow")]
997    use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
998    #[cfg(feature = "arrow")]
999    use crate::arrow::ArrowWriter;
1000    use crate::basic::{
1001        ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, Repetition, SortOrder, Type,
1002    };
1003    use crate::column::page::{Page, PageReader};
1004    use crate::column::reader::get_typed_column_reader;
1005    use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
1006    use crate::data_type::{BoolType, ByteArrayType, Int32Type};
1007    use crate::file::page_index::index::Index;
1008    use crate::file::properties::EnabledStatistics;
1009    use crate::file::serialized_reader::ReadOptionsBuilder;
1010    use crate::file::{
1011        properties::{ReaderProperties, WriterProperties, WriterVersion},
1012        reader::{FileReader, SerializedFileReader, SerializedPageReader},
1013        statistics::{from_thrift, to_thrift, Statistics},
1014    };
1015    use crate::format::SortingColumn;
1016    use crate::record::{Row, RowAccessor};
1017    use crate::schema::parser::parse_message_type;
1018    use crate::schema::types;
1019    use crate::schema::types::{ColumnDescriptor, ColumnPath};
1020    use crate::util::test_common::rand_gen::RandGen;
1021
1022    #[test]
1023    fn test_row_group_writer_error_not_all_columns_written() {
1024        let file = tempfile::tempfile().unwrap();
1025        let schema = Arc::new(
1026            types::Type::group_type_builder("schema")
1027                .with_fields(vec![Arc::new(
1028                    types::Type::primitive_type_builder("col1", Type::INT32)
1029                        .build()
1030                        .unwrap(),
1031                )])
1032                .build()
1033                .unwrap(),
1034        );
1035        let props = Default::default();
1036        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
1037        let row_group_writer = writer.next_row_group().unwrap();
1038        let res = row_group_writer.close();
1039        assert!(res.is_err());
1040        if let Err(err) = res {
1041            assert_eq!(
1042                format!("{err}"),
1043                "Parquet error: Column length mismatch: 1 != 0"
1044            );
1045        }
1046    }
1047
1048    #[test]
1049    fn test_row_group_writer_num_records_mismatch() {
1050        let file = tempfile::tempfile().unwrap();
1051        let schema = Arc::new(
1052            types::Type::group_type_builder("schema")
1053                .with_fields(vec![
1054                    Arc::new(
1055                        types::Type::primitive_type_builder("col1", Type::INT32)
1056                            .with_repetition(Repetition::REQUIRED)
1057                            .build()
1058                            .unwrap(),
1059                    ),
1060                    Arc::new(
1061                        types::Type::primitive_type_builder("col2", Type::INT32)
1062                            .with_repetition(Repetition::REQUIRED)
1063                            .build()
1064                            .unwrap(),
1065                    ),
1066                ])
1067                .build()
1068                .unwrap(),
1069        );
1070        let props = Default::default();
1071        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
1072        let mut row_group_writer = writer.next_row_group().unwrap();
1073
1074        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
1075        col_writer
1076            .typed::<Int32Type>()
1077            .write_batch(&[1, 2, 3], None, None)
1078            .unwrap();
1079        col_writer.close().unwrap();
1080
1081        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
1082        col_writer
1083            .typed::<Int32Type>()
1084            .write_batch(&[1, 2], None, None)
1085            .unwrap();
1086
1087        let err = col_writer.close().unwrap_err();
1088        assert_eq!(
1089            err.to_string(),
1090            "Parquet error: Incorrect number of rows, expected 3 != 2 rows"
1091        );
1092    }
1093
1094    #[test]
1095    fn test_file_writer_empty_file() {
1096        let file = tempfile::tempfile().unwrap();
1097
1098        let schema = Arc::new(
1099            types::Type::group_type_builder("schema")
1100                .with_fields(vec![Arc::new(
1101                    types::Type::primitive_type_builder("col1", Type::INT32)
1102                        .build()
1103                        .unwrap(),
1104                )])
1105                .build()
1106                .unwrap(),
1107        );
1108        let props = Default::default();
1109        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1110        writer.close().unwrap();
1111
1112        let reader = SerializedFileReader::new(file).unwrap();
1113        assert_eq!(reader.get_row_iter(None).unwrap().count(), 0);
1114    }
1115
1116    #[test]
1117    fn test_file_writer_column_orders_populated() {
1118        let file = tempfile::tempfile().unwrap();
1119
1120        let schema = Arc::new(
1121            types::Type::group_type_builder("schema")
1122                .with_fields(vec![
1123                    Arc::new(
1124                        types::Type::primitive_type_builder("col1", Type::INT32)
1125                            .build()
1126                            .unwrap(),
1127                    ),
1128                    Arc::new(
1129                        types::Type::primitive_type_builder("col2", Type::FIXED_LEN_BYTE_ARRAY)
1130                            .with_converted_type(ConvertedType::INTERVAL)
1131                            .with_length(12)
1132                            .build()
1133                            .unwrap(),
1134                    ),
1135                    Arc::new(
1136                        types::Type::group_type_builder("nested")
1137                            .with_repetition(Repetition::REQUIRED)
1138                            .with_fields(vec![
1139                                Arc::new(
1140                                    types::Type::primitive_type_builder(
1141                                        "col3",
1142                                        Type::FIXED_LEN_BYTE_ARRAY,
1143                                    )
1144                                    .with_logical_type(Some(LogicalType::Float16))
1145                                    .with_length(2)
1146                                    .build()
1147                                    .unwrap(),
1148                                ),
1149                                Arc::new(
1150                                    types::Type::primitive_type_builder("col4", Type::BYTE_ARRAY)
1151                                        .with_logical_type(Some(LogicalType::String))
1152                                        .build()
1153                                        .unwrap(),
1154                                ),
1155                            ])
1156                            .build()
1157                            .unwrap(),
1158                    ),
1159                ])
1160                .build()
1161                .unwrap(),
1162        );
1163
1164        let props = Default::default();
1165        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1166        writer.close().unwrap();
1167
1168        let reader = SerializedFileReader::new(file).unwrap();
1169
1170        // only leaves
1171        let expected = vec![
1172            // INT32
1173            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
1174            // INTERVAL
1175            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED),
1176            // Float16
1177            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
1178            // String
1179            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED),
1180        ];
1181        let actual = reader.metadata().file_metadata().column_orders();
1182
1183        assert!(actual.is_some());
1184        let actual = actual.unwrap();
1185        assert_eq!(*actual, expected);
1186    }
1187
1188    #[test]
1189    fn test_file_writer_with_metadata() {
1190        let file = tempfile::tempfile().unwrap();
1191
1192        let schema = Arc::new(
1193            types::Type::group_type_builder("schema")
1194                .with_fields(vec![Arc::new(
1195                    types::Type::primitive_type_builder("col1", Type::INT32)
1196                        .build()
1197                        .unwrap(),
1198                )])
1199                .build()
1200                .unwrap(),
1201        );
1202        let props = Arc::new(
1203            WriterProperties::builder()
1204                .set_key_value_metadata(Some(vec![KeyValue::new(
1205                    "key".to_string(),
1206                    "value".to_string(),
1207                )]))
1208                .build(),
1209        );
1210        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1211        writer.close().unwrap();
1212
1213        let reader = SerializedFileReader::new(file).unwrap();
1214        assert_eq!(
1215            reader
1216                .metadata()
1217                .file_metadata()
1218                .key_value_metadata()
1219                .to_owned()
1220                .unwrap()
1221                .len(),
1222            1
1223        );
1224    }
1225
1226    #[test]
1227    fn test_file_writer_v2_with_metadata() {
1228        let file = tempfile::tempfile().unwrap();
1229        let field_logical_type = Some(LogicalType::Integer {
1230            bit_width: 8,
1231            is_signed: false,
1232        });
1233        let field = Arc::new(
1234            types::Type::primitive_type_builder("col1", Type::INT32)
1235                .with_logical_type(field_logical_type.clone())
1236                .with_converted_type(field_logical_type.into())
1237                .build()
1238                .unwrap(),
1239        );
1240        let schema = Arc::new(
1241            types::Type::group_type_builder("schema")
1242                .with_fields(vec![field.clone()])
1243                .build()
1244                .unwrap(),
1245        );
1246        let props = Arc::new(
1247            WriterProperties::builder()
1248                .set_key_value_metadata(Some(vec![KeyValue::new(
1249                    "key".to_string(),
1250                    "value".to_string(),
1251                )]))
1252                .set_writer_version(WriterVersion::PARQUET_2_0)
1253                .build(),
1254        );
1255        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1256        writer.close().unwrap();
1257
1258        let reader = SerializedFileReader::new(file).unwrap();
1259
1260        assert_eq!(
1261            reader
1262                .metadata()
1263                .file_metadata()
1264                .key_value_metadata()
1265                .to_owned()
1266                .unwrap()
1267                .len(),
1268            1
1269        );
1270
1271        // ARROW-11803: Test that the converted and logical types have been populated
1272        let fields = reader.metadata().file_metadata().schema().get_fields();
1273        assert_eq!(fields.len(), 1);
1274        assert_eq!(fields[0], field);
1275    }
1276
1277    #[test]
1278    fn test_file_writer_with_sorting_columns_metadata() {
1279        let file = tempfile::tempfile().unwrap();
1280
1281        let schema = Arc::new(
1282            types::Type::group_type_builder("schema")
1283                .with_fields(vec![
1284                    Arc::new(
1285                        types::Type::primitive_type_builder("col1", Type::INT32)
1286                            .build()
1287                            .unwrap(),
1288                    ),
1289                    Arc::new(
1290                        types::Type::primitive_type_builder("col2", Type::INT32)
1291                            .build()
1292                            .unwrap(),
1293                    ),
1294                ])
1295                .build()
1296                .unwrap(),
1297        );
1298        let expected_result = Some(vec![SortingColumn {
1299            column_idx: 0,
1300            descending: false,
1301            nulls_first: true,
1302        }]);
1303        let props = Arc::new(
1304            WriterProperties::builder()
1305                .set_key_value_metadata(Some(vec![KeyValue::new(
1306                    "key".to_string(),
1307                    "value".to_string(),
1308                )]))
1309                .set_sorting_columns(expected_result.clone())
1310                .build(),
1311        );
1312        let mut writer =
1313            SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
1314        let mut row_group_writer = writer.next_row_group().expect("get row group writer");
1315
1316        let col_writer = row_group_writer.next_column().unwrap().unwrap();
1317        col_writer.close().unwrap();
1318
1319        let col_writer = row_group_writer.next_column().unwrap().unwrap();
1320        col_writer.close().unwrap();
1321
1322        row_group_writer.close().unwrap();
1323        writer.close().unwrap();
1324
1325        let reader = SerializedFileReader::new(file).unwrap();
1326        let result: Vec<Option<&Vec<SortingColumn>>> = reader
1327            .metadata()
1328            .row_groups()
1329            .iter()
1330            .map(|f| f.sorting_columns())
1331            .collect();
1332        // validate the sorting column read match the one written above
1333        assert_eq!(expected_result.as_ref(), result[0]);
1334    }
1335
1336    #[test]
1337    fn test_file_writer_empty_row_groups() {
1338        let file = tempfile::tempfile().unwrap();
1339        test_file_roundtrip(file, vec![]);
1340    }
1341
1342    #[test]
1343    fn test_file_writer_single_row_group() {
1344        let file = tempfile::tempfile().unwrap();
1345        test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
1346    }
1347
1348    #[test]
1349    fn test_file_writer_multiple_row_groups() {
1350        let file = tempfile::tempfile().unwrap();
1351        test_file_roundtrip(
1352            file,
1353            vec![
1354                vec![1, 2, 3, 4, 5],
1355                vec![1, 2, 3],
1356                vec![1],
1357                vec![1, 2, 3, 4, 5, 6],
1358            ],
1359        );
1360    }
1361
1362    #[test]
1363    fn test_file_writer_multiple_large_row_groups() {
1364        let file = tempfile::tempfile().unwrap();
1365        test_file_roundtrip(
1366            file,
1367            vec![vec![123; 1024], vec![124; 1000], vec![125; 15], vec![]],
1368        );
1369    }
1370
1371    #[test]
1372    fn test_page_writer_data_pages() {
1373        let pages = vec![
1374            Page::DataPage {
1375                buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
1376                num_values: 10,
1377                encoding: Encoding::DELTA_BINARY_PACKED,
1378                def_level_encoding: Encoding::RLE,
1379                rep_level_encoding: Encoding::RLE,
1380                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1381            },
1382            Page::DataPageV2 {
1383                buf: Bytes::from(vec![4; 128]),
1384                num_values: 10,
1385                encoding: Encoding::DELTA_BINARY_PACKED,
1386                num_nulls: 2,
1387                num_rows: 12,
1388                def_levels_byte_len: 24,
1389                rep_levels_byte_len: 32,
1390                is_compressed: false,
1391                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1392            },
1393        ];
1394
1395        test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
1396        test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
1397    }
1398
1399    #[test]
1400    fn test_page_writer_dict_pages() {
1401        let pages = vec![
1402            Page::DictionaryPage {
1403                buf: Bytes::from(vec![1, 2, 3, 4, 5]),
1404                num_values: 5,
1405                encoding: Encoding::RLE_DICTIONARY,
1406                is_sorted: false,
1407            },
1408            Page::DataPage {
1409                buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
1410                num_values: 10,
1411                encoding: Encoding::DELTA_BINARY_PACKED,
1412                def_level_encoding: Encoding::RLE,
1413                rep_level_encoding: Encoding::RLE,
1414                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
1415            },
1416            Page::DataPageV2 {
1417                buf: Bytes::from(vec![4; 128]),
1418                num_values: 10,
1419                encoding: Encoding::DELTA_BINARY_PACKED,
1420                num_nulls: 2,
1421                num_rows: 12,
1422                def_levels_byte_len: 24,
1423                rep_levels_byte_len: 32,
1424                is_compressed: false,
1425                statistics: None,
1426            },
1427        ];
1428
1429        test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
1430        test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
1431    }
1432
1433    /// Tests writing and reading pages.
1434    /// The physical type is used for statistics only and should match the type of any
1435    /// statistics defined in the pages.
1436    fn test_page_roundtrip(pages: &[Page], codec: Compression, physical_type: Type) {
1437        let mut compressed_pages = vec![];
1438        let mut total_num_values = 0i64;
1439        let codec_options = CodecOptionsBuilder::default()
1440            .set_backward_compatible_lz4(false)
1441            .build();
1442        let mut compressor = create_codec(codec, &codec_options).unwrap();
1443
1444        for page in pages {
1445            let uncompressed_len = page.buffer().len();
1446
1447            let compressed_page = match *page {
1448                Page::DataPage {
1449                    ref buf,
1450                    num_values,
1451                    encoding,
1452                    def_level_encoding,
1453                    rep_level_encoding,
1454                    ref statistics,
1455                } => {
1456                    total_num_values += num_values as i64;
1457                    let output_buf = compress_helper(compressor.as_mut(), buf);
1458
1459                    Page::DataPage {
1460                        buf: Bytes::from(output_buf),
1461                        num_values,
1462                        encoding,
1463                        def_level_encoding,
1464                        rep_level_encoding,
1465                        statistics: from_thrift(physical_type, to_thrift(statistics.as_ref()))
1466                            .unwrap(),
1467                    }
1468                }
1469                Page::DataPageV2 {
1470                    ref buf,
1471                    num_values,
1472                    encoding,
1473                    num_nulls,
1474                    num_rows,
1475                    def_levels_byte_len,
1476                    rep_levels_byte_len,
1477                    ref statistics,
1478                    ..
1479                } => {
1480                    total_num_values += num_values as i64;
1481                    let offset = (def_levels_byte_len + rep_levels_byte_len) as usize;
1482                    let cmp_buf = compress_helper(compressor.as_mut(), &buf[offset..]);
1483                    let mut output_buf = Vec::from(&buf[..offset]);
1484                    output_buf.extend_from_slice(&cmp_buf[..]);
1485
1486                    Page::DataPageV2 {
1487                        buf: Bytes::from(output_buf),
1488                        num_values,
1489                        encoding,
1490                        num_nulls,
1491                        num_rows,
1492                        def_levels_byte_len,
1493                        rep_levels_byte_len,
1494                        is_compressed: compressor.is_some(),
1495                        statistics: from_thrift(physical_type, to_thrift(statistics.as_ref()))
1496                            .unwrap(),
1497                    }
1498                }
1499                Page::DictionaryPage {
1500                    ref buf,
1501                    num_values,
1502                    encoding,
1503                    is_sorted,
1504                } => {
1505                    let output_buf = compress_helper(compressor.as_mut(), buf);
1506
1507                    Page::DictionaryPage {
1508                        buf: Bytes::from(output_buf),
1509                        num_values,
1510                        encoding,
1511                        is_sorted,
1512                    }
1513                }
1514            };
1515
1516            let compressed_page = CompressedPage::new(compressed_page, uncompressed_len);
1517            compressed_pages.push(compressed_page);
1518        }
1519
1520        let mut buffer: Vec<u8> = vec![];
1521        let mut result_pages: Vec<Page> = vec![];
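        // Write the compressed pages to an in-memory buffer using a SerializedPageWriter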
1522        {
1523            let mut writer = TrackedWrite::new(&mut buffer);
1524            let mut page_writer = SerializedPageWriter::new(&mut writer);
1525
1526            for page in compressed_pages {
1527                page_writer.write_page(page).unwrap();
1528            }
1529            page_writer.close().unwrap();
1530        }
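        // Read the pages back from the buffer and collect them for comparison with the originals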
1531        {
1532            let reader = bytes::Bytes::from(buffer);
1533
1534            let t = types::Type::primitive_type_builder("t", physical_type)
1535                .build()
1536                .unwrap();
1537
1538            let desc = ColumnDescriptor::new(Arc::new(t), 0, 0, ColumnPath::new(vec![]));
1539            let meta = ColumnChunkMetaData::builder(Arc::new(desc))
1540                .set_compression(codec)
1541                .set_total_compressed_size(reader.len() as i64)
1542                .set_num_values(total_num_values)
1543                .build()
1544                .unwrap();
1545
1546            let props = ReaderProperties::builder()
1547                .set_backward_compatible_lz4(false)
1548                .build();
1549            let mut page_reader = SerializedPageReader::new_with_properties(
1550                Arc::new(reader),
1551                &meta,
1552                total_num_values as usize,
1553                None,
1554                Arc::new(props),
1555            )
1556            .unwrap();
1557
1558            while let Some(page) = page_reader.get_next_page().unwrap() {
1559                result_pages.push(page);
1560            }
1561        }
1562
1563        assert_eq!(result_pages.len(), pages.len());
1564        for i in 0..result_pages.len() {
1565            assert_page(&result_pages[i], &pages[i]);
1566        }
1567    }
1568
1569    /// Helper function to compress a slice
1570    fn compress_helper(compressor: Option<&mut Box<dyn Codec>>, data: &[u8]) -> Vec<u8> {
1571        let mut output_buf = vec![];
1572        if let Some(cmpr) = compressor {
1573            cmpr.compress(data, &mut output_buf).unwrap();
1574        } else {
1575            output_buf.extend_from_slice(data);
1576        }
1577        output_buf
1578    }
1579
1580    /// Asserts that two pages match.
1581    fn assert_page(left: &Page, right: &Page) {
1582        assert_eq!(left.page_type(), right.page_type());
1583        assert_eq!(&left.buffer(), &right.buffer());
1584        assert_eq!(left.num_values(), right.num_values());
1585        assert_eq!(left.encoding(), right.encoding());
1586        assert_eq!(to_thrift(left.statistics()), to_thrift(right.statistics()));
1587    }
1588
1589    /// Tests roundtrip of i32 data written using `W` and read using `R`
1590    fn test_roundtrip_i32<W, R>(
1591        file: W,
1592        data: Vec<Vec<i32>>,
1593        compression: Compression,
1594    ) -> crate::format::FileMetaData
1595    where
1596        W: Write + Send,
1597        R: ChunkReader + From<W> + 'static,
1598    {
1599        test_roundtrip::<W, R, Int32Type, _>(file, data, |r| r.get_int(0).unwrap(), compression)
1600    }
1601
1602    /// Tests roundtrip of data of type `D` written using `W` and read using `R`,
1603    /// extracting each value from the resulting rows with the provided `value` function
1604    fn test_roundtrip<W, R, D, F>(
1605        mut file: W,
1606        data: Vec<Vec<D::T>>,
1607        value: F,
1608        compression: Compression,
1609    ) -> crate::format::FileMetaData
1610    where
1611        W: Write + Send,
1612        R: ChunkReader + From<W> + 'static,
1613        D: DataType,
1614        F: Fn(Row) -> D::T,
1615    {
1616        let schema = Arc::new(
1617            types::Type::group_type_builder("schema")
1618                .with_fields(vec![Arc::new(
1619                    types::Type::primitive_type_builder("col1", D::get_physical_type())
1620                        .with_repetition(Repetition::REQUIRED)
1621                        .build()
1622                        .unwrap(),
1623                )])
1624                .build()
1625                .unwrap(),
1626        );
1627        let props = Arc::new(
1628            WriterProperties::builder()
1629                .set_compression(compression)
1630                .build(),
1631        );
1632        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
1633        let mut rows: i64 = 0;
1634
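        // Write each subset as its own row group, checking the flushed metadata after every close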
1635        for (idx, subset) in data.iter().enumerate() {
1636            let row_group_file_offset = file_writer.buf.bytes_written();
1637            let mut row_group_writer = file_writer.next_row_group().unwrap();
1638            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
1639                rows += writer
1640                    .typed::<D>()
1641                    .write_batch(&subset[..], None, None)
1642                    .unwrap() as i64;
1643                writer.close().unwrap();
1644            }
1645            let last_group = row_group_writer.close().unwrap();
1646            let flushed = file_writer.flushed_row_groups();
1647            assert_eq!(flushed.len(), idx + 1);
1648            assert_eq!(Some(idx as i16), last_group.ordinal());
1649            assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset());
1650            assert_eq!(&flushed[idx], last_group.as_ref());
1651        }
1652        let file_metadata = file_writer.close().unwrap();
1653
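        // Read the file back and verify the row count and the values in every row group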
1654        let reader = SerializedFileReader::new(R::from(file)).unwrap();
1655        assert_eq!(reader.num_row_groups(), data.len());
1656        assert_eq!(
1657            reader.metadata().file_metadata().num_rows(),
1658            rows,
1659            "row count in metadata not equal to number of rows written"
1660        );
1661        for (i, item) in data.iter().enumerate().take(reader.num_row_groups()) {
1662            let row_group_reader = reader.get_row_group(i).unwrap();
1663            let iter = row_group_reader.get_row_iter(None).unwrap();
1664            let res: Vec<_> = iter.map(|row| row.unwrap()).map(&value).collect();
1665            let row_group_size = row_group_reader.metadata().total_byte_size();
1666            let uncompressed_size: i64 = row_group_reader
1667                .metadata()
1668                .columns()
1669                .iter()
1670                .map(|v| v.uncompressed_size())
1671                .sum();
1672            assert_eq!(row_group_size, uncompressed_size);
1673            assert_eq!(res, *item);
1674        }
1675        file_metadata
1676    }
1677
1678    /// File write-read roundtrip.
1679    /// `data` contains one vector of values per row group.
1680    fn test_file_roundtrip(file: File, data: Vec<Vec<i32>>) -> crate::format::FileMetaData {
1681        test_roundtrip_i32::<File, File>(file, data, Compression::UNCOMPRESSED)
1682    }
1683
1684    #[test]
1685    fn test_bytes_writer_empty_row_groups() {
1686        test_bytes_roundtrip(vec![], Compression::UNCOMPRESSED);
1687    }
1688
1689    #[test]
1690    fn test_bytes_writer_single_row_group() {
1691        test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::UNCOMPRESSED);
1692    }
1693
1694    #[test]
1695    fn test_bytes_writer_multiple_row_groups() {
1696        test_bytes_roundtrip(
1697            vec![
1698                vec![1, 2, 3, 4, 5],
1699                vec![1, 2, 3],
1700                vec![1],
1701                vec![1, 2, 3, 4, 5, 6],
1702            ],
1703            Compression::UNCOMPRESSED,
1704        );
1705    }
1706
1707    #[test]
1708    fn test_bytes_writer_single_row_group_compressed() {
1709        test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::SNAPPY);
1710    }
1711
1712    #[test]
1713    fn test_bytes_writer_multiple_row_groups_compressed() {
1714        test_bytes_roundtrip(
1715            vec![
1716                vec![1, 2, 3, 4, 5],
1717                vec![1, 2, 3],
1718                vec![1],
1719                vec![1, 2, 3, 4, 5, 6],
1720            ],
1721            Compression::SNAPPY,
1722        );
1723    }
1724
1725    fn test_bytes_roundtrip(data: Vec<Vec<i32>>, compression: Compression) {
1726        test_roundtrip_i32::<Vec<u8>, Bytes>(Vec::with_capacity(1024), data, compression);
1727    }
1728
1729    #[test]
1730    fn test_boolean_roundtrip() {
1731        let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
1732        test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
1733            Vec::with_capacity(1024),
1734            vec![my_bool_values],
1735            |r| r.get_bool(0).unwrap(),
1736            Compression::UNCOMPRESSED,
1737        );
1738    }
1739
1740    #[test]
1741    fn test_boolean_compressed_roundtrip() {
1742        let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
1743        test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
1744            Vec::with_capacity(1024),
1745            vec![my_bool_values],
1746            |r| r.get_bool(0).unwrap(),
1747            Compression::SNAPPY,
1748        );
1749    }
1750
1751    #[test]
1752    fn test_column_offset_index_file() {
1753        let file = tempfile::tempfile().unwrap();
1754        let file_metadata = test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
1755        file_metadata.row_groups.iter().for_each(|row_group| {
1756            row_group.columns.iter().for_each(|column_chunk| {
1757                assert_ne!(None, column_chunk.column_index_offset);
1758                assert_ne!(None, column_chunk.column_index_length);
1759
1760                assert_ne!(None, column_chunk.offset_index_offset);
1761                assert_ne!(None, column_chunk.offset_index_length);
1762            })
1763        });
1764    }
1765
1766    fn test_kv_metadata(initial_kv: Option<Vec<KeyValue>>, final_kv: Option<Vec<KeyValue>>) {
1767        let schema = Arc::new(
1768            types::Type::group_type_builder("schema")
1769                .with_fields(vec![Arc::new(
1770                    types::Type::primitive_type_builder("col1", Type::INT32)
1771                        .with_repetition(Repetition::REQUIRED)
1772                        .build()
1773                        .unwrap(),
1774                )])
1775                .build()
1776                .unwrap(),
1777        );
1778        let mut out = Vec::with_capacity(1024);
1779        let props = Arc::new(
1780            WriterProperties::builder()
1781                .set_key_value_metadata(initial_kv.clone())
1782                .build(),
1783        );
1784        let mut writer = SerializedFileWriter::new(&mut out, schema, props).unwrap();
1785        let mut row_group_writer = writer.next_row_group().unwrap();
1786        let column = row_group_writer.next_column().unwrap().unwrap();
1787        column.close().unwrap();
1788        row_group_writer.close().unwrap();
1789        if let Some(kvs) = &final_kv {
1790            for kv in kvs {
1791                writer.append_key_value_metadata(kv.clone())
1792            }
1793        }
1794        writer.close().unwrap();
1795
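        // Read the file back and check that the key/value metadata round-trips as expected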
1796        let reader = SerializedFileReader::new(Bytes::from(out)).unwrap();
1797        let metadata = reader.metadata().file_metadata();
1798        let keys = metadata.key_value_metadata();
1799
1800        match (initial_kv, final_kv) {
1801            (Some(a), Some(b)) => {
1802                let keys = keys.unwrap();
1803                assert_eq!(keys.len(), a.len() + b.len());
1804                assert_eq!(&keys[..a.len()], a.as_slice());
1805                assert_eq!(&keys[a.len()..], b.as_slice());
1806            }
1807            (Some(v), None) => assert_eq!(keys.unwrap(), &v),
1808            (None, Some(v)) if !v.is_empty() => assert_eq!(keys.unwrap(), &v),
1809            _ => assert!(keys.is_none()),
1810        }
1811    }
1812
1813    #[test]
1814    fn test_append_metadata() {
1815        let kv1 = KeyValue::new("cupcakes".to_string(), "awesome".to_string());
1816        let kv2 = KeyValue::new("bingo".to_string(), "bongo".to_string());
1817
1818        test_kv_metadata(None, None);
1819        test_kv_metadata(Some(vec![kv1.clone()]), None);
1820        test_kv_metadata(None, Some(vec![kv2.clone()]));
1821        test_kv_metadata(Some(vec![kv1.clone()]), Some(vec![kv2.clone()]));
1822        test_kv_metadata(Some(vec![]), Some(vec![kv2]));
1823        test_kv_metadata(Some(vec![]), Some(vec![]));
1824        test_kv_metadata(Some(vec![kv1]), Some(vec![]));
1825        test_kv_metadata(None, Some(vec![]));
1826    }
1827
1828    #[test]
1829    fn test_backwards_compatible_statistics() {
1830        let message_type = "
1831            message test_schema {
1832                REQUIRED INT32 decimal1 (DECIMAL(8,2));
1833                REQUIRED INT32 i32 (INTEGER(32,true));
1834                REQUIRED INT32 u32 (INTEGER(32,false));
1835            }
1836        ";
1837
1838        let schema = Arc::new(parse_message_type(message_type).unwrap());
1839        let props = Default::default();
1840        let mut writer = SerializedFileWriter::new(vec![], schema, props).unwrap();
1841        let mut row_group_writer = writer.next_row_group().unwrap();
1842
1843        for _ in 0..3 {
1844            let mut writer = row_group_writer.next_column().unwrap().unwrap();
1845            writer
1846                .typed::<Int32Type>()
1847                .write_batch(&[1, 2, 3], None, None)
1848                .unwrap();
1849            writer.close().unwrap();
1850        }
1851        let metadata = row_group_writer.close().unwrap();
1852        writer.close().unwrap();
1853
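        // Convert the row group metadata to thrift and inspect the encoded statistics of each column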
1854        let thrift = metadata.to_thrift();
1855        let encoded_stats: Vec<_> = thrift
1856            .columns
1857            .into_iter()
1858            .map(|x| x.meta_data.unwrap().statistics.unwrap())
1859            .collect();
1860
1861        // decimal
1862        let s = &encoded_stats[0];
1863        assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1864        assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1865        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1866        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1867
1868        // i32
1869        let s = &encoded_stats[1];
1870        assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1871        assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1872        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1873        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1874
1875        // u32
1876        let s = &encoded_stats[2];
1877        assert_eq!(s.min.as_deref(), None);
1878        assert_eq!(s.max.as_deref(), None);
1879        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
1880        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
1881    }
1882
1883    #[test]
1884    fn test_spliced_write() {
1885        let message_type = "
1886            message test_schema {
1887                REQUIRED INT32 i32 (INTEGER(32,true));
1888                REQUIRED INT32 u32 (INTEGER(32,false));
1889            }
1890        ";
1891        let schema = Arc::new(parse_message_type(message_type).unwrap());
1892        let props = Arc::new(WriterProperties::builder().build());
1893
1894        let mut file = Vec::with_capacity(1024);
1895        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props.clone()).unwrap();
1896
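        // Give each column its own in-memory buffer and writer, capturing the close result via a callback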
1897        let columns = file_writer.descr.columns();
1898        let mut column_state: Vec<(_, Option<ColumnCloseResult>)> = columns
1899            .iter()
1900            .map(|_| (TrackedWrite::new(Vec::with_capacity(1024)), None))
1901            .collect();
1902
1903        let mut column_state_slice = column_state.as_mut_slice();
1904        let mut column_writers = Vec::with_capacity(columns.len());
1905        for c in columns {
1906            let ((buf, out), tail) = column_state_slice.split_first_mut().unwrap();
1907            column_state_slice = tail;
1908
1909            let page_writer = Box::new(SerializedPageWriter::new(buf));
1910            let col_writer = get_column_writer(c.clone(), props.clone(), page_writer);
1911            column_writers.push(SerializedColumnWriter::new(
1912                col_writer,
1913                Some(Box::new(|on_close| {
1914                    *out = Some(on_close);
1915                    Ok(())
1916                })),
1917            ));
1918        }
1919
1920        let column_data = [[1, 2, 3, 4], [7, 3, 7, 3]];
1921
1922        // Interleaved writing to the column writers
1923        for (writer, batch) in column_writers.iter_mut().zip(column_data) {
1924            let writer = writer.typed::<Int32Type>();
1925            writer.write_batch(&batch, None, None).unwrap();
1926        }
1927
1928        // Close the column writers
1929        for writer in column_writers {
1930            writer.close().unwrap()
1931        }
1932
1933        // Splice column data into a row group
1934        let mut row_group_writer = file_writer.next_row_group().unwrap();
1935        for (write, close) in column_state {
1936            let buf = Bytes::from(write.into_inner().unwrap());
1937            row_group_writer
1938                .append_column(&buf, close.unwrap())
1939                .unwrap();
1940        }
1941        row_group_writer.close().unwrap();
1942        file_writer.close().unwrap();
1943
1944        // Check data was written correctly
1945        let file = Bytes::from(file);
1946        let test_read = |reader: SerializedFileReader<Bytes>| {
1947            let row_group = reader.get_row_group(0).unwrap();
1948
1949            let mut out = Vec::with_capacity(4);
1950            let c1 = row_group.get_column_reader(0).unwrap();
1951            let mut c1 = get_typed_column_reader::<Int32Type>(c1);
1952            c1.read_records(4, None, None, &mut out).unwrap();
1953            assert_eq!(out, column_data[0]);
1954
1955            out.clear();
1956
1957            let c2 = row_group.get_column_reader(1).unwrap();
1958            let mut c2 = get_typed_column_reader::<Int32Type>(c2);
1959            c2.read_records(4, None, None, &mut out).unwrap();
1960            assert_eq!(out, column_data[1]);
1961        };
1962
1963        let reader = SerializedFileReader::new(file.clone()).unwrap();
1964        test_read(reader);
1965
1966        let options = ReadOptionsBuilder::new().with_page_index().build();
1967        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
1968        test_read(reader);
1969    }
1970
1971    #[test]
1972    fn test_disabled_statistics() {
1973        let message_type = "
1974            message test_schema {
1975                REQUIRED INT32 a;
1976                REQUIRED INT32 b;
1977            }
1978        ";
1979        let schema = Arc::new(parse_message_type(message_type).unwrap());
1980        let props = WriterProperties::builder()
1981            .set_statistics_enabled(EnabledStatistics::None)
1982            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
1983            .set_offset_index_disabled(true) // this should be ignored because of the line above
1984            .build();
1985        let mut file = Vec::with_capacity(1024);
1986        let mut file_writer =
1987            SerializedFileWriter::new(&mut file, schema, Arc::new(props)).unwrap();
1988
1989        let mut row_group_writer = file_writer.next_row_group().unwrap();
1990        let mut a_writer = row_group_writer.next_column().unwrap().unwrap();
1991        let col_writer = a_writer.typed::<Int32Type>();
1992        col_writer.write_batch(&[1, 2, 3], None, None).unwrap();
1993        a_writer.close().unwrap();
1994
1995        let mut b_writer = row_group_writer.next_column().unwrap().unwrap();
1996        let col_writer = b_writer.typed::<Int32Type>();
1997        col_writer.write_batch(&[4, 5, 6], None, None).unwrap();
1998        b_writer.close().unwrap();
1999        row_group_writer.close().unwrap();
2000
2001        let metadata = file_writer.finish().unwrap();
2002        assert_eq!(metadata.row_groups.len(), 1);
2003        let row_group = &metadata.row_groups[0];
2004        assert_eq!(row_group.columns.len(), 2);
2005        // Column "a" has both offset and column index, as requested
2006        assert!(row_group.columns[0].offset_index_offset.is_some());
2007        assert!(row_group.columns[0].column_index_offset.is_some());
2008        // Column "b" should only have offset index
2009        assert!(row_group.columns[1].offset_index_offset.is_some());
2010        assert!(row_group.columns[1].column_index_offset.is_none());
2011
2012        let err = file_writer.next_row_group().err().unwrap().to_string();
2013        assert_eq!(err, "Parquet error: SerializedFileWriter already finished");
2014
2015        drop(file_writer);
2016
2017        let options = ReadOptionsBuilder::new().with_page_index().build();
2018        let reader = SerializedFileReader::new_with_options(Bytes::from(file), options).unwrap();
2019
2020        let offset_index = reader.metadata().offset_index().unwrap();
2021        assert_eq!(offset_index.len(), 1); // 1 row group
2022        assert_eq!(offset_index[0].len(), 2); // 2 columns
2023
2024        let column_index = reader.metadata().column_index().unwrap();
2025        assert_eq!(column_index.len(), 1); // 1 row group
2026        assert_eq!(column_index[0].len(), 2); // 2 columns
2027
2028        let a_idx = &column_index[0][0];
2029        assert!(matches!(a_idx, Index::INT32(_)), "{a_idx:?}");
2030        let b_idx = &column_index[0][1];
2031        assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
2032    }
2033
2034    #[test]
2035    fn test_byte_array_size_statistics() {
2036        let message_type = "
2037            message test_schema {
2038                OPTIONAL BYTE_ARRAY a (UTF8);
2039            }
2040        ";
2041        let schema = Arc::new(parse_message_type(message_type).unwrap());
2042        let data = ByteArrayType::gen_vec(32, 7);
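        // 10 rows: 7 non-null byte array values and 3 nulls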
2043        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
2044        let unenc_size: i64 = data.iter().map(|x| x.len() as i64).sum();
2045        let file: File = tempfile::tempfile().unwrap();
2046        let props = Arc::new(
2047            WriterProperties::builder()
2048                .set_statistics_enabled(EnabledStatistics::Page)
2049                .build(),
2050        );
2051
2052        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
2053        let mut row_group_writer = writer.next_row_group().unwrap();
2054
2055        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
2056        col_writer
2057            .typed::<ByteArrayType>()
2058            .write_batch(&data, Some(&def_levels), None)
2059            .unwrap();
2060        col_writer.close().unwrap();
2061        row_group_writer.close().unwrap();
2062        let file_metadata = writer.close().unwrap();
2063
2064        assert_eq!(file_metadata.row_groups.len(), 1);
2065        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
2066        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
2067
2068        let check_def_hist = |def_hist: &[i64]| {
2069            assert_eq!(def_hist.len(), 2);
2070            assert_eq!(def_hist[0], 3);
2071            assert_eq!(def_hist[1], 7);
2072        };
2073
2074        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
2075        let meta_data = file_metadata.row_groups[0].columns[0]
2076            .meta_data
2077            .as_ref()
2078            .unwrap();
2079        assert!(meta_data.size_statistics.is_some());
2080        let size_stats = meta_data.size_statistics.as_ref().unwrap();
2081
2082        assert!(size_stats.repetition_level_histogram.is_none());
2083        assert!(size_stats.definition_level_histogram.is_some());
2084        assert!(size_stats.unencoded_byte_array_data_bytes.is_some());
2085        assert_eq!(
2086            unenc_size,
2087            size_stats.unencoded_byte_array_data_bytes.unwrap()
2088        );
2089        check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());
2090
2091        // check that the read metadata is also correct
2092        let options = ReadOptionsBuilder::new().with_page_index().build();
2093        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
2094
2095        let rfile_metadata = reader.metadata().file_metadata();
2096        assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
2097        assert_eq!(reader.num_row_groups(), 1);
2098        let rowgroup = reader.get_row_group(0).unwrap();
2099        assert_eq!(rowgroup.num_columns(), 1);
2100        let column = rowgroup.metadata().column(0);
2101        assert!(column.definition_level_histogram().is_some());
2102        assert!(column.repetition_level_histogram().is_none());
2103        assert!(column.unencoded_byte_array_data_bytes().is_some());
2104        check_def_hist(column.definition_level_histogram().unwrap().values());
2105        assert_eq!(
2106            unenc_size,
2107            column.unencoded_byte_array_data_bytes().unwrap()
2108        );
2109
2110        // check histogram in column index as well
2111        assert!(reader.metadata().column_index().is_some());
2112        let column_index = reader.metadata().column_index().unwrap();
2113        assert_eq!(column_index.len(), 1);
2114        assert_eq!(column_index[0].len(), 1);
2115        let col_idx = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
2116            assert_eq!(index.indexes.len(), 1);
2117            &index.indexes[0]
2118        } else {
2119            unreachable!()
2120        };
2121
2122        assert!(col_idx.repetition_level_histogram().is_none());
2123        assert!(col_idx.definition_level_histogram().is_some());
2124        check_def_hist(col_idx.definition_level_histogram().unwrap().values());
2125
2126        assert!(reader.metadata().offset_index().is_some());
2127        let offset_index = reader.metadata().offset_index().unwrap();
2128        assert_eq!(offset_index.len(), 1);
2129        assert_eq!(offset_index[0].len(), 1);
2130        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_some());
2131        let page_sizes = offset_index[0][0]
2132            .unencoded_byte_array_data_bytes
2133            .as_ref()
2134            .unwrap();
2135        assert_eq!(page_sizes.len(), 1);
2136        assert_eq!(page_sizes[0], unenc_size);
2137    }
2138
2139    #[test]
2140    fn test_too_many_rowgroups() {
2141        let message_type = "
2142            message test_schema {
2143                REQUIRED BYTE_ARRAY a (UTF8);
2144            }
2145        ";
2146        let schema = Arc::new(parse_message_type(message_type).unwrap());
2147        let file: File = tempfile::tempfile().unwrap();
2148        let props = Arc::new(
2149            WriterProperties::builder()
2150                .set_statistics_enabled(EnabledStatistics::None)
2151                .set_max_row_group_size(1)
2152                .build(),
2153        );
2154        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
2155
2156        // Attempt to create 32769 empty row groups; creation should fail once i == 0x8000 (32768).
2157        for i in 0..0x8001 {
2158            match writer.next_row_group() {
2159                Ok(mut row_group_writer) => {
2160                    assert_ne!(i, 0x8000);
2161                    let col_writer = row_group_writer.next_column().unwrap().unwrap();
2162                    col_writer.close().unwrap();
2163                    row_group_writer.close().unwrap();
2164                }
2165                Err(e) => {
2166                    assert_eq!(i, 0x8000);
2167                    assert_eq!(
2168                        e.to_string(),
2169                        "Parquet error: Parquet does not support more than 32767 row groups per file (currently: 32768)"
2170                    );
2171                }
2172            }
2173        }
2174        writer.close().unwrap();
2175    }
2176
2177    #[test]
2178    fn test_size_statistics_with_repetition_and_nulls() {
2179        let message_type = "
2180            message test_schema {
2181                OPTIONAL group i32_list (LIST) {
2182                    REPEATED group list {
2183                        OPTIONAL INT32 element;
2184                    }
2185                }
2186            }
2187        ";
2188        // column is:
2189        // row 0: [1, 2]
2190        // row 1: NULL
2191        // row 2: [4, NULL]
2192        // row 3: []
2193        // row 4: [7, 8, 9, 10]
2194        let schema = Arc::new(parse_message_type(message_type).unwrap());
2195        let data = [1, 2, 4, 7, 8, 9, 10];
2196        let def_levels = [3, 3, 0, 3, 2, 1, 3, 3, 3, 3];
2197        let rep_levels = [0, 1, 0, 0, 1, 0, 0, 1, 1, 1];
2198        let file = tempfile::tempfile().unwrap();
2199        let props = Arc::new(
2200            WriterProperties::builder()
2201                .set_statistics_enabled(EnabledStatistics::Page)
2202                .build(),
2203        );
2204        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
2205        let mut row_group_writer = writer.next_row_group().unwrap();
2206
2207        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
2208        col_writer
2209            .typed::<Int32Type>()
2210            .write_batch(&data, Some(&def_levels), Some(&rep_levels))
2211            .unwrap();
2212        col_writer.close().unwrap();
2213        row_group_writer.close().unwrap();
2214        let file_metadata = writer.close().unwrap();
2215
2216        assert_eq!(file_metadata.row_groups.len(), 1);
2217        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
2218        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
2219
2220        let check_def_hist = |def_hist: &[i64]| {
2221            assert_eq!(def_hist.len(), 4);
2222            assert_eq!(def_hist[0], 1);
2223            assert_eq!(def_hist[1], 1);
2224            assert_eq!(def_hist[2], 1);
2225            assert_eq!(def_hist[3], 7);
2226        };
2227
2228        let check_rep_hist = |rep_hist: &[i64]| {
2229            assert_eq!(rep_hist.len(), 2);
2230            assert_eq!(rep_hist[0], 5);
2231            assert_eq!(rep_hist[1], 5);
2232        };
2233
2234        // check that histograms are set properly in the write and read metadata
2235        // also check that unencoded_byte_array_data_bytes is not set
2236        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
2237        let meta_data = file_metadata.row_groups[0].columns[0]
2238            .meta_data
2239            .as_ref()
2240            .unwrap();
2241        assert!(meta_data.size_statistics.is_some());
2242        let size_stats = meta_data.size_statistics.as_ref().unwrap();
2243        assert!(size_stats.repetition_level_histogram.is_some());
2244        assert!(size_stats.definition_level_histogram.is_some());
2245        assert!(size_stats.unencoded_byte_array_data_bytes.is_none());
2246        check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());
2247        check_rep_hist(size_stats.repetition_level_histogram.as_ref().unwrap());
2248
2249        // check that the read metadata is also correct
2250        let options = ReadOptionsBuilder::new().with_page_index().build();
2251        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
2252
2253        let rfile_metadata = reader.metadata().file_metadata();
2254        assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
2255        assert_eq!(reader.num_row_groups(), 1);
2256        let rowgroup = reader.get_row_group(0).unwrap();
2257        assert_eq!(rowgroup.num_columns(), 1);
2258        let column = rowgroup.metadata().column(0);
2259        assert!(column.definition_level_histogram().is_some());
2260        assert!(column.repetition_level_histogram().is_some());
2261        assert!(column.unencoded_byte_array_data_bytes().is_none());
2262        check_def_hist(column.definition_level_histogram().unwrap().values());
2263        check_rep_hist(column.repetition_level_histogram().unwrap().values());
2264
2265        // check histogram in column index as well
2266        assert!(reader.metadata().column_index().is_some());
2267        let column_index = reader.metadata().column_index().unwrap();
2268        assert_eq!(column_index.len(), 1);
2269        assert_eq!(column_index[0].len(), 1);
2270        let col_idx = if let Index::INT32(index) = &column_index[0][0] {
2271            assert_eq!(index.indexes.len(), 1);
2272            &index.indexes[0]
2273        } else {
2274            unreachable!()
2275        };
2276
2277        check_def_hist(col_idx.definition_level_histogram().unwrap().values());
2278        check_rep_hist(col_idx.repetition_level_histogram().unwrap().values());
2279
2280        assert!(reader.metadata().offset_index().is_some());
2281        let offset_index = reader.metadata().offset_index().unwrap();
2282        assert_eq!(offset_index.len(), 1);
2283        assert_eq!(offset_index[0].len(), 1);
2284        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_none());
2285    }
2286
2287    #[test]
2288    #[cfg(feature = "arrow")]
2289    fn test_byte_stream_split_extended_roundtrip() {
2290        let path = format!(
2291            "{}/byte_stream_split_extended.gzip.parquet",
2292            arrow::util::test_util::parquet_test_data(),
2293        );
2294        let file = File::open(path).unwrap();
2295
2296        // Read in test file and rewrite to tmp
2297        let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)
2298            .expect("parquet open")
2299            .build()
2300            .expect("parquet open");
2301
2302        let file = tempfile::tempfile().unwrap();
2303        let props = WriterProperties::builder()
2304            .set_dictionary_enabled(false)
2305            .set_column_encoding(
2306                ColumnPath::from("float16_byte_stream_split"),
2307                Encoding::BYTE_STREAM_SPLIT,
2308            )
2309            .set_column_encoding(
2310                ColumnPath::from("float_byte_stream_split"),
2311                Encoding::BYTE_STREAM_SPLIT,
2312            )
2313            .set_column_encoding(
2314                ColumnPath::from("double_byte_stream_split"),
2315                Encoding::BYTE_STREAM_SPLIT,
2316            )
2317            .set_column_encoding(
2318                ColumnPath::from("int32_byte_stream_split"),
2319                Encoding::BYTE_STREAM_SPLIT,
2320            )
2321            .set_column_encoding(
2322                ColumnPath::from("int64_byte_stream_split"),
2323                Encoding::BYTE_STREAM_SPLIT,
2324            )
2325            .set_column_encoding(
2326                ColumnPath::from("flba5_byte_stream_split"),
2327                Encoding::BYTE_STREAM_SPLIT,
2328            )
2329            .set_column_encoding(
2330                ColumnPath::from("decimal_byte_stream_split"),
2331                Encoding::BYTE_STREAM_SPLIT,
2332            )
2333            .build();
2334
2335        let mut parquet_writer = ArrowWriter::try_new(
2336            file.try_clone().expect("cannot open file"),
2337            parquet_reader.schema(),
2338            Some(props),
2339        )
2340        .expect("create arrow writer");
2341
2342        for maybe_batch in parquet_reader {
2343            let batch = maybe_batch.expect("reading batch");
2344            parquet_writer.write(&batch).expect("writing data");
2345        }
2346
2347        parquet_writer.close().expect("finalizing file");
2348
2349        let reader = SerializedFileReader::new(file).expect("Failed to create reader");
2350        let filemeta = reader.metadata();
2351
2352        // Make sure byte_stream_split encoding was used
2353        let check_encoding = |x: usize, filemeta: &ParquetMetaData| {
2354            assert!(filemeta
2355                .row_group(0)
2356                .column(x)
2357                .encodings()
2358                .contains(&Encoding::BYTE_STREAM_SPLIT));
2359        };
2360
2361        check_encoding(1, filemeta);
2362        check_encoding(3, filemeta);
2363        check_encoding(5, filemeta);
2364        check_encoding(7, filemeta);
2365        check_encoding(9, filemeta);
2366        check_encoding(11, filemeta);
2367        check_encoding(13, filemeta);
2368
2369        // Read back tmpfile and make sure all values are correct
2370        let mut iter = reader
2371            .get_row_iter(None)
2372            .expect("Failed to create row iterator");
2373
2374        let mut start = 0;
2375        let end = reader.metadata().file_metadata().num_rows();
2376
2377        let check_row = |row: Result<Row, ParquetError>| {
2378            assert!(row.is_ok());
2379            let r = row.unwrap();
2380            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2381            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2382            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2383            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2384            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2385            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2386            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2387        };
2388
2389        while start < end {
2390            match iter.next() {
2391                Some(row) => check_row(row),
2392                None => break,
2393            };
2394            start += 1;
2395        }
2396    }
2397}