parquet/file/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains file writer API, and provides methods to write row groups and columns by
19//! using row group writers and column writers respectively.
20
21use crate::bloom_filter::Sbbf;
22use crate::format as parquet;
23use crate::format::{ColumnIndex, OffsetIndex};
24use crate::thrift::TSerializable;
25use std::fmt::Debug;
26use std::io::{BufWriter, IoSlice, Read};
27use std::{io::Write, sync::Arc};
28use thrift::protocol::TCompactOutputProtocol;
29
30use crate::column::page_encryption::PageEncryptor;
31use crate::column::writer::{get_typed_column_writer_mut, ColumnCloseResult, ColumnWriterImpl};
32use crate::column::{
33    page::{CompressedPage, PageWriteSpec, PageWriter},
34    writer::{get_column_writer, ColumnWriter},
35};
36use crate::data_type::DataType;
37#[cfg(feature = "encryption")]
38use crate::encryption::encrypt::{
39    get_column_crypto_metadata, FileEncryptionProperties, FileEncryptor,
40};
41use crate::errors::{ParquetError, Result};
42use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
43use crate::file::reader::ChunkReader;
44#[cfg(feature = "encryption")]
45use crate::file::PARQUET_MAGIC_ENCR_FOOTER;
46use crate::file::{metadata::*, PARQUET_MAGIC};
47use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr};
48
/// A wrapper around a [`Write`] that keeps track of the number
/// of bytes that have been written. The given [`Write`] is wrapped
/// with a [`BufWriter`] to optimize writing performance.
pub struct TrackedWrite<W: Write> {
    // Buffered wrapper around the user-provided writer
    inner: BufWriter<W>,
    // Total number of bytes written through this instance so far
    bytes_written: usize,
}
56
57impl<W: Write> TrackedWrite<W> {
58    /// Create a new [`TrackedWrite`] from a [`Write`]
59    pub fn new(inner: W) -> Self {
60        let buf_write = BufWriter::new(inner);
61        Self {
62            inner: buf_write,
63            bytes_written: 0,
64        }
65    }
66
67    /// Returns the number of bytes written to this instance
68    pub fn bytes_written(&self) -> usize {
69        self.bytes_written
70    }
71
72    /// Returns a reference to the underlying writer.
73    pub fn inner(&self) -> &W {
74        self.inner.get_ref()
75    }
76
77    /// Returns a mutable reference to the underlying writer.
78    ///
79    /// It is inadvisable to directly write to the underlying writer, doing so
80    /// will likely result in data corruption
81    pub fn inner_mut(&mut self) -> &mut W {
82        self.inner.get_mut()
83    }
84
85    /// Returns the underlying writer.
86    pub fn into_inner(self) -> Result<W> {
87        self.inner.into_inner().map_err(|err| {
88            ParquetError::General(format!("fail to get inner writer: {:?}", err.to_string()))
89        })
90    }
91}
92
93impl<W: Write> Write for TrackedWrite<W> {
94    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
95        let bytes = self.inner.write(buf)?;
96        self.bytes_written += bytes;
97        Ok(bytes)
98    }
99
100    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> std::io::Result<usize> {
101        let bytes = self.inner.write_vectored(bufs)?;
102        self.bytes_written += bytes;
103        Ok(bytes)
104    }
105
106    fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
107        self.inner.write_all(buf)?;
108        self.bytes_written += buf.len();
109
110        Ok(())
111    }
112
113    fn flush(&mut self) -> std::io::Result<()> {
114        self.inner.flush()
115    }
116}
117
/// Callback invoked on closing a column chunk
///
/// Receives the [`ColumnCloseResult`] produced by the column writer so the
/// enclosing row group writer can accumulate metrics and metadata.
pub type OnCloseColumnChunk<'a> = Box<dyn FnOnce(ColumnCloseResult) -> Result<()> + 'a>;

/// Callback invoked on closing a row group, arguments are:
///
/// - the writer the row group data was written to
/// - the row group metadata
/// - the bloom filter for each column chunk, if any
/// - the column index for each column chunk
/// - the offset index for each column chunk
pub type OnCloseRowGroup<'a, W> = Box<
    dyn FnOnce(
            &'a mut TrackedWrite<W>,
            RowGroupMetaData,
            Vec<Option<Sbbf>>,
            Vec<Option<ColumnIndex>>,
            Vec<Option<OffsetIndex>>,
        ) -> Result<()>
        + 'a
        + Send,
>;
137
138// ----------------------------------------------------------------------
139// Serialized impl for file & row group writers
140
/// Parquet file writer API.
/// Provides methods to write row groups sequentially.
///
/// The main workflow should be as following:
/// - Create file writer, this will open a new file and potentially write some metadata.
/// - Request a new row group writer by calling `next_row_group`.
/// - Once finished writing row group, close row group writer by calling `close`
/// - Write subsequent row groups, if necessary.
/// - After all row groups have been written, close the file writer using `close` method.
pub struct SerializedFileWriter<W: Write> {
    // Byte-tracking sink all file data is written to
    buf: TrackedWrite<W>,
    // Root schema of the file being written
    schema: TypePtr,
    // Descriptor derived from `schema`, shared with row group writers
    descr: SchemaDescPtr,
    // Writer configuration (compression, statistics, bloom filters, ...)
    props: WriterPropertiesPtr,
    // Metadata for each row group flushed so far
    row_groups: Vec<RowGroupMetaData>,
    // Per row group, per column bloom filters awaiting serialization
    bloom_filters: Vec<Vec<Option<Sbbf>>>,
    // Per row group, per column chunk column indexes
    column_indexes: Vec<Vec<Option<ColumnIndex>>>,
    // Per row group, per column chunk offset indexes
    offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
    // Index of the next row group to be created
    row_group_index: usize,
    // kv_metadatas will be appended to `props` when `write_metadata`
    kv_metadatas: Vec<KeyValue>,
    // Set once the footer has been written; further writes are rejected
    finished: bool,
    // Encryptor used when file encryption is configured
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
166
167impl<W: Write> Debug for SerializedFileWriter<W> {
168    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169        // implement Debug so this can be used with #[derive(Debug)]
170        // in client code rather than actually listing all the fields
171        f.debug_struct("SerializedFileWriter")
172            .field("descr", &self.descr)
173            .field("row_group_index", &self.row_group_index)
174            .field("kv_metadatas", &self.kv_metadatas)
175            .finish_non_exhaustive()
176    }
177}
178
179impl<W: Write + Send> SerializedFileWriter<W> {
    /// Creates new file writer.
    ///
    /// Writes the Parquet magic bytes to `buf` immediately; returns `Err` if
    /// that write fails or if the encryption configuration is invalid.
    pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
        let mut buf = TrackedWrite::new(buf);

        let schema_descriptor = SchemaDescriptor::new(schema.clone());

        // Validate encryption configuration and build the encryptor up front
        // so misconfiguration surfaces before any data is written
        #[cfg(feature = "encryption")]
        let file_encryptor = Self::get_file_encryptor(&properties, &schema_descriptor)?;

        // Write the file header (magic bytes) before any row group data
        Self::start_file(&properties, &mut buf)?;
        Ok(Self {
            buf,
            schema,
            descr: Arc::new(schema_descriptor),
            props: properties,
            row_groups: vec![],
            bloom_filters: vec![],
            column_indexes: Vec::new(),
            offset_indexes: Vec::new(),
            row_group_index: 0,
            kv_metadatas: Vec::new(),
            finished: false,
            #[cfg(feature = "encryption")]
            file_encryptor,
        })
    }
206
    /// Builds a [`FileEncryptor`] from the writer properties, if file
    /// encryption is configured.
    ///
    /// Returns `Ok(None)` when no encryption properties are set, and `Err`
    /// when the configuration is invalid (encrypted column names not in the
    /// schema, or a plaintext footer, which is not supported yet).
    #[cfg(feature = "encryption")]
    fn get_file_encryptor(
        properties: &WriterPropertiesPtr,
        schema_descriptor: &SchemaDescriptor,
    ) -> Result<Option<Arc<FileEncryptor>>> {
        if let Some(file_encryption_properties) = &properties.file_encryption_properties {
            // Ensure every column selected for encryption exists in the schema
            file_encryption_properties.validate_encrypted_column_names(schema_descriptor)?;

            if !file_encryption_properties.encrypt_footer() {
                return Err(general_err!(
                    "Writing encrypted files with plaintext footers is not supported yet"
                ));
            }

            Ok(Some(Arc::new(FileEncryptor::new(
                file_encryption_properties.clone(),
            )?)))
        } else {
            Ok(None)
        }
    }
228
    /// Creates new row group from this file writer.
    /// In case of IO error or Thrift error, returns `Err`.
    ///
    /// There can be at most 2^15 row groups in a file; and row groups have
    /// to be written sequentially. Every time the next row group is requested, the
    /// previous row group must be finalised and closed using `RowGroupWriter::close` method.
    pub fn next_row_group(&mut self) -> Result<SerializedRowGroupWriter<'_, W>> {
        self.assert_previous_writer_closed()?;
        let ordinal = self.row_group_index;

        // Row group ordinals are stored as i16 in the file footer, hence the
        // 2^15 limit on the number of row groups per file
        let ordinal: i16 = ordinal.try_into().map_err(|_| {
            ParquetError::General(format!(
                "Parquet does not support more than {} row groups per file (currently: {})",
                i16::MAX,
                ordinal
            ))
        })?;

        self.row_group_index = self
            .row_group_index
            .checked_add(1)
            .expect("SerializedFileWriter::row_group_index overflowed");

        let bloom_filter_position = self.properties().bloom_filter_position();
        // Borrow accumulator fields individually so the `on_close` closure can
        // capture them without borrowing all of `self`
        let row_groups = &mut self.row_groups;
        let row_bloom_filters = &mut self.bloom_filters;
        let row_column_indexes = &mut self.column_indexes;
        let row_offset_indexes = &mut self.offset_indexes;
        // Invoked by the row group writer on close to hand back its results
        let on_close = move |buf,
                             mut metadata,
                             row_group_bloom_filter,
                             row_group_column_index,
                             row_group_offset_index| {
            row_bloom_filters.push(row_group_bloom_filter);
            row_column_indexes.push(row_group_column_index);
            row_offset_indexes.push(row_group_offset_index);
            // write bloom filters out immediately after the row group if requested
            match bloom_filter_position {
                BloomFilterPosition::AfterRowGroup => {
                    write_bloom_filters(buf, row_bloom_filters, &mut metadata)?
                }
                BloomFilterPosition::End => (),
            };
            row_groups.push(metadata);
            Ok(())
        };

        let row_group_writer = SerializedRowGroupWriter::new(
            self.descr.clone(),
            self.props.clone(),
            &mut self.buf,
            ordinal,
            Some(Box::new(on_close)),
        );
        #[cfg(feature = "encryption")]
        let row_group_writer = row_group_writer.with_file_encryptor(self.file_encryptor.clone());

        Ok(row_group_writer)
    }
288
    /// Returns metadata for any flushed row groups
    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        &self.row_groups
    }

    /// Close and finalize the underlying Parquet writer
    ///
    /// Unlike [`Self::close`] this does not consume self
    ///
    /// Attempting to write after calling finish will result in an error
    pub fn finish(&mut self) -> Result<parquet::FileMetaData> {
        self.assert_previous_writer_closed()?;
        let metadata = self.write_metadata()?;
        // Ensure all buffered bytes reach the underlying writer
        self.buf.flush()?;
        Ok(metadata)
    }

    /// Closes and finalises file writer, returning the file metadata.
    pub fn close(mut self) -> Result<parquet::FileMetaData> {
        self.finish()
    }
310
    /// Writes magic bytes at the beginning of the file.
    #[cfg(not(feature = "encryption"))]
    fn start_file(_properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
        buf.write_all(get_file_magic())?;
        Ok(())
    }

    /// Writes magic bytes at the beginning of the file.
    ///
    /// Selects the encrypted-footer magic when footer encryption is
    /// configured, the standard magic otherwise.
    #[cfg(feature = "encryption")]
    fn start_file(properties: &WriterPropertiesPtr, buf: &mut TrackedWrite<W>) -> Result<()> {
        let magic = get_file_magic(properties.file_encryption_properties.as_ref());

        buf.write_all(magic)?;
        Ok(())
    }
326
    /// Assembles and writes metadata at the end of the file.
    fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
        self.finished = true;

        // write out any remaining bloom filters after all row groups
        // (those deferred by `BloomFilterPosition::End`; already-written
        // filters were `take`n and are skipped)
        for row_group in &mut self.row_groups {
            write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?;
        }

        // Merge key/value metadata from the writer properties with entries
        // appended via `append_key_value_metadata`
        let key_value_metadata = match self.props.key_value_metadata() {
            Some(kv) => Some(kv.iter().chain(&self.kv_metadatas).cloned().collect()),
            None if self.kv_metadatas.is_empty() => None,
            None => Some(self.kv_metadatas.clone()),
        };

        let row_groups = self
            .row_groups
            .iter()
            .map(|v| v.to_thrift())
            .collect::<Vec<_>>();

        let mut encoder = ThriftMetadataWriter::new(
            &mut self.buf,
            &self.schema,
            &self.descr,
            row_groups,
            Some(self.props.created_by().to_string()),
            self.props.writer_version().as_num(),
        );

        #[cfg(feature = "encryption")]
        {
            encoder = encoder.with_file_encryptor(self.file_encryptor.clone());
        }

        if let Some(key_value_metadata) = key_value_metadata {
            encoder = encoder.with_key_value_metadata(key_value_metadata)
        }
        encoder = encoder.with_column_indexes(&self.column_indexes);
        encoder = encoder.with_offset_indexes(&self.offset_indexes);
        encoder.finish()
    }
369
370    #[inline]
371    fn assert_previous_writer_closed(&self) -> Result<()> {
372        if self.finished {
373            return Err(general_err!("SerializedFileWriter already finished"));
374        }
375
376        if self.row_group_index != self.row_groups.len() {
377            Err(general_err!("Previous row group writer was not closed"))
378        } else {
379            Ok(())
380        }
381    }
382
    /// Add a [`KeyValue`] to the file writer's metadata
    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
        self.kv_metadatas.push(kv_metadata);
    }

    /// Returns a reference to schema descriptor.
    pub fn schema_descr(&self) -> &SchemaDescriptor {
        &self.descr
    }

    /// Returns a reference to the writer properties
    pub fn properties(&self) -> &WriterPropertiesPtr {
        &self.props
    }

    /// Returns a reference to the underlying writer.
    pub fn inner(&self) -> &W {
        self.buf.inner()
    }

    /// Returns a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer.
    pub fn inner_mut(&mut self) -> &mut W {
        self.buf.inner_mut()
    }

    /// Writes the file footer and returns the underlying writer.
    pub fn into_inner(mut self) -> Result<W> {
        self.assert_previous_writer_closed()?;
        // Write the footer; the returned thrift metadata is not needed here
        let _ = self.write_metadata()?;

        self.buf.into_inner()
    }

    /// Returns the number of bytes written to this instance
    pub fn bytes_written(&self) -> usize {
        self.buf.bytes_written()
    }

    /// Get the file encryptor used by this instance to encrypt data
    #[cfg(feature = "encryption")]
    pub(crate) fn file_encryptor(&self) -> Option<Arc<FileEncryptor>> {
        self.file_encryptor.clone()
    }
428}
429
430/// Serialize all the bloom filters of the given row group to the given buffer,
431/// and returns the updated row group metadata.
432fn write_bloom_filters<W: Write + Send>(
433    buf: &mut TrackedWrite<W>,
434    bloom_filters: &mut [Vec<Option<Sbbf>>],
435    row_group: &mut RowGroupMetaData,
436) -> Result<()> {
437    // iter row group
438    // iter each column
439    // write bloom filter to the file
440
441    let row_group_idx: u16 = row_group
442        .ordinal()
443        .expect("Missing row group ordinal")
444        .try_into()
445        .map_err(|_| {
446            ParquetError::General(format!(
447                "Negative row group ordinal: {})",
448                row_group.ordinal().unwrap()
449            ))
450        })?;
451    let row_group_idx = row_group_idx as usize;
452    for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() {
453        if let Some(bloom_filter) = bloom_filters[row_group_idx][column_idx].take() {
454            let start_offset = buf.bytes_written();
455            bloom_filter.write(&mut *buf)?;
456            let end_offset = buf.bytes_written();
457            // set offset and index for bloom filter
458            *column_chunk = column_chunk
459                .clone()
460                .into_builder()
461                .set_bloom_filter_offset(Some(start_offset as i64))
462                .set_bloom_filter_length(Some((end_offset - start_offset) as i32))
463                .build()?;
464        }
465    }
466    Ok(())
467}
468
/// Parquet row group writer API.
/// Provides methods to access column writers in an iterator-like fashion, order is
/// guaranteed to match the order of schema leaves (column descriptors).
///
/// All columns should be written sequentially; the main workflow is:
/// - Request the next column using `next_column` method - this will return `None` if no
///   more columns are available to write.
/// - Once done writing a column, close column writer with `close`
/// - Once all columns have been written, close row group writer with `close`
///   method. The close method will return row group metadata and is no-op
///   on already closed row group.
pub struct SerializedRowGroupWriter<'a, W: Write> {
    // Schema descriptor shared with the parent file writer
    descr: SchemaDescPtr,
    // Writer configuration
    props: WriterPropertiesPtr,
    // Byte-tracking sink owned by the parent file writer
    buf: &'a mut TrackedWrite<W>,
    // Row count, set when the first column is closed
    total_rows_written: Option<u64>,
    // Total compressed bytes across all closed columns
    total_bytes_written: u64,
    // Total uncompressed bytes across all closed columns
    total_uncompressed_bytes: i64,
    // Index of the next column to hand out
    column_index: usize,
    // Populated by `close`; cached so repeated checks are cheap
    row_group_metadata: Option<RowGroupMetaDataPtr>,
    // Metadata for each closed column chunk
    column_chunks: Vec<ColumnChunkMetaData>,
    // Bloom filter for each closed column chunk, if any
    bloom_filters: Vec<Option<Sbbf>>,
    // Column index for each closed column chunk, if any
    column_indexes: Vec<Option<ColumnIndex>>,
    // Offset index for each closed column chunk, if any
    offset_indexes: Vec<Option<OffsetIndex>>,
    // Ordinal of this row group within the file
    row_group_index: i16,
    // Byte offset at which this row group starts in the file
    file_offset: i64,
    // Callback invoked on `close` to hand results back to the file writer
    on_close: Option<OnCloseRowGroup<'a, W>>,
    // Encryptor for the file, when encryption is enabled
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
499
500impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
    /// Creates a new `SerializedRowGroupWriter` with:
    ///
    /// - `schema_descr` - the schema to write
    /// - `properties` - writer properties
    /// - `buf` - the buffer to write data to
    /// - `row_group_index` - row group index in this parquet file.
    /// - `on_close` - an optional callback that will invoked on [`Self::close`]
    pub fn new(
        schema_descr: SchemaDescPtr,
        properties: WriterPropertiesPtr,
        buf: &'a mut TrackedWrite<W>,
        row_group_index: i16,
        on_close: Option<OnCloseRowGroup<'a, W>>,
    ) -> Self {
        let num_columns = schema_descr.num_columns();
        // The row group starts wherever the file writer currently is
        let file_offset = buf.bytes_written() as i64;
        Self {
            buf,
            row_group_index,
            file_offset,
            on_close,
            total_rows_written: None,
            descr: schema_descr,
            props: properties,
            column_index: 0,
            row_group_metadata: None,
            column_chunks: Vec::with_capacity(num_columns),
            bloom_filters: Vec::with_capacity(num_columns),
            column_indexes: Vec::with_capacity(num_columns),
            offset_indexes: Vec::with_capacity(num_columns),
            total_bytes_written: 0,
            total_uncompressed_bytes: 0,
            #[cfg(feature = "encryption")]
            file_encryptor: None,
        }
    }
538
    #[cfg(feature = "encryption")]
    /// Set the file encryptor to use for encrypting row group data and metadata
    ///
    /// Builder-style setter; consumes and returns `self`.
    pub(crate) fn with_file_encryptor(
        mut self,
        file_encryptor: Option<Arc<FileEncryptor>>,
    ) -> Self {
        self.file_encryptor = file_encryptor;
        self
    }
548
549    /// Advance `self.column_index` returning the next [`ColumnDescPtr`] if any
550    fn next_column_desc(&mut self) -> Option<ColumnDescPtr> {
551        let ret = self.descr.columns().get(self.column_index)?.clone();
552        self.column_index += 1;
553        Some(ret)
554    }
555
    /// Returns [`OnCloseColumnChunk`] for the next writer
    ///
    /// The callback accumulates the closed column's metrics and metadata into
    /// this row group writer, and validates that every column reports the
    /// same number of rows.
    fn get_on_close(&mut self) -> (&mut TrackedWrite<W>, OnCloseColumnChunk<'_>) {
        // Borrow each accumulator separately so the closure does not capture
        // `self`, leaving the buffer free to be returned alongside it
        let total_bytes_written = &mut self.total_bytes_written;
        let total_uncompressed_bytes = &mut self.total_uncompressed_bytes;
        let total_rows_written = &mut self.total_rows_written;
        let column_chunks = &mut self.column_chunks;
        let column_indexes = &mut self.column_indexes;
        let offset_indexes = &mut self.offset_indexes;
        let bloom_filters = &mut self.bloom_filters;

        let on_close = |r: ColumnCloseResult| {
            // Update row group writer metrics
            *total_bytes_written += r.bytes_written;
            *total_uncompressed_bytes += r.metadata.uncompressed_size();
            column_chunks.push(r.metadata);
            bloom_filters.push(r.bloom_filter);
            column_indexes.push(r.column_index);
            offset_indexes.push(r.offset_index);

            // All columns in a row group must contain the same number of
            // rows; the first closed column establishes the expected count
            if let Some(rows) = *total_rows_written {
                if rows != r.rows_written {
                    return Err(general_err!(
                        "Incorrect number of rows, expected {} != {} rows",
                        rows,
                        r.rows_written
                    ));
                }
            } else {
                *total_rows_written = Some(r.rows_written);
            }

            Ok(())
        };
        (self.buf, Box::new(on_close))
    }
591
    /// Returns the next column writer, if available, using the factory function;
    /// otherwise returns `None`.
    ///
    /// The factory receives the column descriptor, the writer properties, a
    /// page writer targeting this row group's buffer, and the close callback.
    pub(crate) fn next_column_with_factory<'b, F, C>(&'b mut self, factory: F) -> Result<Option<C>>
    where
        F: FnOnce(
            ColumnDescPtr,
            WriterPropertiesPtr,
            Box<dyn PageWriter + 'b>,
            OnCloseColumnChunk<'b>,
        ) -> Result<C>,
    {
        self.assert_previous_writer_closed()?;

        // Capture the encryption context before `self` is mutably borrowed below
        let encryptor_context = self.get_page_encryptor_context();

        Ok(match self.next_column_desc() {
            Some(column) => {
                let props = self.props.clone();
                let (buf, on_close) = self.get_on_close();

                let page_writer = SerializedPageWriter::new(buf);
                // Attach a page encryptor if this column is encrypted
                let page_writer =
                    Self::set_page_writer_encryptor(&column, encryptor_context, page_writer)?;

                Some(factory(
                    column,
                    props,
                    Box::new(page_writer),
                    Box::new(on_close),
                )?)
            }
            None => None,
        })
    }
626
    /// Returns the next column writer, if available; otherwise returns `None`.
    /// In case of any IO error or Thrift error, or if row group writer has already been
    /// closed returns `Err`.
    pub fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>> {
        // Wrap the generic column writer in the standard serialized wrapper
        self.next_column_with_factory(|descr, props, page_writer, on_close| {
            let column_writer = get_column_writer(descr, props, page_writer);
            Ok(SerializedColumnWriter::new(column_writer, Some(on_close)))
        })
    }
636
637    /// Append an encoded column chunk from another source without decoding it
638    ///
639    /// This can be used for efficiently concatenating or projecting parquet data,
640    /// or encoding parquet data to temporary in-memory buffers
641    ///
642    /// See [`Self::next_column`] for writing data that isn't already encoded
643    pub fn append_column<R: ChunkReader>(
644        &mut self,
645        reader: &R,
646        mut close: ColumnCloseResult,
647    ) -> Result<()> {
648        self.assert_previous_writer_closed()?;
649        let desc = self
650            .next_column_desc()
651            .ok_or_else(|| general_err!("exhausted columns in SerializedRowGroupWriter"))?;
652
653        let metadata = close.metadata;
654
655        if metadata.column_descr() != desc.as_ref() {
656            return Err(general_err!(
657                "column descriptor mismatch, expected {:?} got {:?}",
658                desc,
659                metadata.column_descr()
660            ));
661        }
662
663        let src_dictionary_offset = metadata.dictionary_page_offset();
664        let src_data_offset = metadata.data_page_offset();
665        let src_offset = src_dictionary_offset.unwrap_or(src_data_offset);
666        let src_length = metadata.compressed_size();
667
668        let write_offset = self.buf.bytes_written();
669        let mut read = reader.get_read(src_offset as _)?.take(src_length as _);
670        let write_length = std::io::copy(&mut read, &mut self.buf)?;
671
672        if src_length as u64 != write_length {
673            return Err(general_err!(
674                "Failed to splice column data, expected {read_length} got {write_length}"
675            ));
676        }
677
678        let map_offset = |x| x - src_offset + write_offset as i64;
679        let mut builder = ColumnChunkMetaData::builder(metadata.column_descr_ptr())
680            .set_compression(metadata.compression())
681            .set_encodings(metadata.encodings().clone())
682            .set_total_compressed_size(metadata.compressed_size())
683            .set_total_uncompressed_size(metadata.uncompressed_size())
684            .set_num_values(metadata.num_values())
685            .set_data_page_offset(map_offset(src_data_offset))
686            .set_dictionary_page_offset(src_dictionary_offset.map(map_offset))
687            .set_unencoded_byte_array_data_bytes(metadata.unencoded_byte_array_data_bytes());
688
689        if let Some(rep_hist) = metadata.repetition_level_histogram() {
690            builder = builder.set_repetition_level_histogram(Some(rep_hist.clone()))
691        }
692        if let Some(def_hist) = metadata.definition_level_histogram() {
693            builder = builder.set_definition_level_histogram(Some(def_hist.clone()))
694        }
695        if let Some(statistics) = metadata.statistics() {
696            builder = builder.set_statistics(statistics.clone())
697        }
698        builder = self.set_column_crypto_metadata(builder, &metadata);
699        close.metadata = builder.build()?;
700
701        if let Some(offsets) = close.offset_index.as_mut() {
702            for location in &mut offsets.page_locations {
703                location.offset = map_offset(location.offset)
704            }
705        }
706
707        let (_, on_close) = self.get_on_close();
708        on_close(close)
709    }
710
    /// Closes this row group writer and returns row group metadata.
    ///
    /// Builds the row group metadata from all closed column chunks and, on
    /// first construction, invokes the `on_close` callback (if any) to hand
    /// the results back to the parent file writer.
    pub fn close(mut self) -> Result<RowGroupMetaDataPtr> {
        if self.row_group_metadata.is_none() {
            self.assert_previous_writer_closed()?;

            let column_chunks = std::mem::take(&mut self.column_chunks);
            let row_group_metadata = RowGroupMetaData::builder(self.descr.clone())
                .set_column_metadata(column_chunks)
                .set_total_byte_size(self.total_uncompressed_bytes)
                .set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
                .set_sorting_columns(self.props.sorting_columns().cloned())
                .set_ordinal(self.row_group_index)
                .set_file_offset(self.file_offset)
                .build()?;

            // Cache the metadata before invoking the callback
            self.row_group_metadata = Some(Arc::new(row_group_metadata.clone()));

            if let Some(on_close) = self.on_close.take() {
                on_close(
                    self.buf,
                    row_group_metadata,
                    self.bloom_filters,
                    self.column_indexes,
                    self.offset_indexes,
                )?
            }
        }

        // Unwrap is safe: populated above when it was `None`
        let metadata = self.row_group_metadata.as_ref().unwrap().clone();
        Ok(metadata)
    }
742
    /// Set the column crypto metadata for a column chunk
    ///
    /// Returns the builder unchanged when no file encryptor is configured.
    #[cfg(feature = "encryption")]
    fn set_column_crypto_metadata(
        &self,
        builder: ColumnChunkMetaDataBuilder,
        metadata: &ColumnChunkMetaData,
    ) -> ColumnChunkMetaDataBuilder {
        if let Some(file_encryptor) = self.file_encryptor.as_ref() {
            builder.set_column_crypto_metadata(get_column_crypto_metadata(
                file_encryptor.properties(),
                &metadata.column_descr_ptr(),
            ))
        } else {
            builder
        }
    }

    /// Get context required to create a [`PageEncryptor`] for a column
    #[cfg(feature = "encryption")]
    fn get_page_encryptor_context(&self) -> PageEncryptorContext {
        PageEncryptorContext {
            file_encryptor: self.file_encryptor.clone(),
            row_group_index: self.row_group_index as usize,
            column_index: self.column_index,
        }
    }

    /// Set the [`PageEncryptor`] on a page writer if a column is encrypted
    ///
    /// The encryptor is created only when the named column is configured for
    /// encryption; otherwise the page writer is returned unchanged.
    #[cfg(feature = "encryption")]
    fn set_page_writer_encryptor<'b>(
        column: &ColumnDescPtr,
        context: PageEncryptorContext,
        page_writer: SerializedPageWriter<'b, W>,
    ) -> Result<SerializedPageWriter<'b, W>> {
        let page_encryptor = PageEncryptor::create_if_column_encrypted(
            &context.file_encryptor,
            context.row_group_index,
            context.column_index,
            &column.path().string(),
        )?;

        Ok(page_writer.with_page_encryptor(page_encryptor))
    }
786
787    /// No-op implementation of setting the column crypto metadata for a column chunk
788    #[cfg(not(feature = "encryption"))]
789    fn set_column_crypto_metadata(
790        &self,
791        builder: ColumnChunkMetaDataBuilder,
792        _metadata: &ColumnChunkMetaData,
793    ) -> ColumnChunkMetaDataBuilder {
794        builder
795    }
796
797    #[cfg(not(feature = "encryption"))]
798    fn get_page_encryptor_context(&self) -> PageEncryptorContext {
799        PageEncryptorContext {}
800    }
801
    /// No-op implementation of setting a [`PageEncryptor`] for when encryption is disabled
    ///
    /// Returns the page writer unchanged.
    #[cfg(not(feature = "encryption"))]
    fn set_page_writer_encryptor<'b>(
        _column: &ColumnDescPtr,
        _context: PageEncryptorContext,
        page_writer: SerializedPageWriter<'b, W>,
    ) -> Result<SerializedPageWriter<'b, W>> {
        Ok(page_writer)
    }
811
812    #[inline]
813    fn assert_previous_writer_closed(&self) -> Result<()> {
814        if self.column_index != self.column_chunks.len() {
815            Err(general_err!("Previous column writer was not closed"))
816        } else {
817            Ok(())
818        }
819    }
820}
821
/// Context required to create a [`PageEncryptor`] for a column
#[cfg(feature = "encryption")]
struct PageEncryptorContext {
    /// File-level encryptor; `None` when the file is not encrypted
    file_encryptor: Option<Arc<FileEncryptor>>,
    /// Index of the row group being written
    row_group_index: usize,
    /// Index of the column within the row group
    column_index: usize,
}
829
/// Stub context used when the `encryption` feature is disabled; carries no data
#[cfg(not(feature = "encryption"))]
struct PageEncryptorContext {}
832
/// A wrapper around a [`ColumnWriter`] that invokes a callback on [`Self::close`]
pub struct SerializedColumnWriter<'a> {
    /// The underlying (typed or untyped) column writer
    inner: ColumnWriter<'a>,
    /// Callback invoked with the [`ColumnCloseResult`] when this writer closes
    on_close: Option<OnCloseColumnChunk<'a>>,
}
838
839impl<'a> SerializedColumnWriter<'a> {
840    /// Create a new [`SerializedColumnWriter`] from a [`ColumnWriter`] and an
841    /// optional callback to be invoked on [`Self::close`]
842    pub fn new(inner: ColumnWriter<'a>, on_close: Option<OnCloseColumnChunk<'a>>) -> Self {
843        Self { inner, on_close }
844    }
845
846    /// Returns a reference to an untyped [`ColumnWriter`]
847    pub fn untyped(&mut self) -> &mut ColumnWriter<'a> {
848        &mut self.inner
849    }
850
851    /// Returns a reference to a typed [`ColumnWriterImpl`]
852    pub fn typed<T: DataType>(&mut self) -> &mut ColumnWriterImpl<'a, T> {
853        get_typed_column_writer_mut(&mut self.inner)
854    }
855
856    /// Close this [`SerializedColumnWriter`]
857    pub fn close(mut self) -> Result<()> {
858        let r = self.inner.close()?;
859        if let Some(on_close) = self.on_close.take() {
860            on_close(r)?
861        }
862
863        Ok(())
864    }
865}
866
/// A serialized implementation for Parquet [`PageWriter`].
/// Writes and serializes pages and metadata into output stream.
///
/// `SerializedPageWriter` should not be used after calling `close()`.
pub struct SerializedPageWriter<'a, W: Write> {
    /// Output sink; tracks the number of bytes written so far
    sink: &'a mut TrackedWrite<W>,
    /// Optional encryptor applied to page headers and bodies before writing
    #[cfg(feature = "encryption")]
    page_encryptor: Option<PageEncryptor>,
}
876
877impl<'a, W: Write> SerializedPageWriter<'a, W> {
878    /// Creates new page writer.
879    pub fn new(sink: &'a mut TrackedWrite<W>) -> Self {
880        Self {
881            sink,
882            #[cfg(feature = "encryption")]
883            page_encryptor: None,
884        }
885    }
886
887    /// Serializes page header into Thrift.
888    /// Returns number of bytes that have been written into the sink.
889    #[inline]
890    fn serialize_page_header(&mut self, header: parquet::PageHeader) -> Result<usize> {
891        let start_pos = self.sink.bytes_written();
892        match self.page_encryptor_and_sink_mut() {
893            Some((page_encryptor, sink)) => {
894                page_encryptor.encrypt_page_header(&header, sink)?;
895            }
896            None => {
897                let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
898                header.write_to_out_protocol(&mut protocol)?;
899            }
900        }
901        Ok(self.sink.bytes_written() - start_pos)
902    }
903}
904
#[cfg(feature = "encryption")]
impl<'a, W: Write> SerializedPageWriter<'a, W> {
    /// Set the encryptor to use to encrypt page data
    fn with_page_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
        self.page_encryptor = page_encryptor;
        self
    }

    /// Returns a mutable reference to the page encryptor, if one is set
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        self.page_encryptor.as_mut()
    }

    /// Returns mutable references to both the page encryptor and the sink,
    /// if an encryptor is set; needed when encrypting directly into the sink
    fn page_encryptor_and_sink_mut(
        &mut self,
    ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
        self.page_encryptor.as_mut().map(|pe| (pe, &mut self.sink))
    }
}
923
#[cfg(not(feature = "encryption"))]
impl<'a, W: Write> SerializedPageWriter<'a, W> {
    /// No-op accessor: there is never a page encryptor when encryption is
    /// disabled
    fn page_encryptor_mut(&mut self) -> Option<&mut PageEncryptor> {
        None
    }

    /// No-op accessor mirroring the encryption-enabled API; always `None`
    fn page_encryptor_and_sink_mut(
        &mut self,
    ) -> Option<(&mut PageEncryptor, &mut &'a mut TrackedWrite<W>)> {
        None
    }
}
936
937impl<W: Write + Send> PageWriter for SerializedPageWriter<'_, W> {
938    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
939        let page = match self.page_encryptor_mut() {
940            Some(page_encryptor) => page_encryptor.encrypt_compressed_page(page)?,
941            None => page,
942        };
943
944        let page_type = page.page_type();
945        let start_pos = self.sink.bytes_written() as u64;
946
947        let page_header = page.to_thrift_header();
948        let header_size = self.serialize_page_header(page_header)?;
949
950        self.sink.write_all(page.data())?;
951
952        let mut spec = PageWriteSpec::new();
953        spec.page_type = page_type;
954        spec.uncompressed_size = page.uncompressed_size() + header_size;
955        spec.compressed_size = page.compressed_size() + header_size;
956        spec.offset = start_pos;
957        spec.bytes_written = self.sink.bytes_written() as u64 - start_pos;
958        spec.num_values = page.num_values();
959
960        if let Some(page_encryptor) = self.page_encryptor_mut() {
961            if page.compressed_page().is_data_page() {
962                page_encryptor.increment_page();
963            }
964        }
965        Ok(spec)
966    }
967
968    fn close(&mut self) -> Result<()> {
969        self.sink.flush()?;
970        Ok(())
971    }
972}
973
/// Get the magic bytes at the start and end of the file that identify this
/// as a Parquet file.
///
/// Files whose footer is encrypted use a distinct magic value
/// ([`PARQUET_MAGIC_ENCR_FOOTER`]) so readers can detect footer encryption;
/// all other files (unencrypted, or encrypted with a plaintext footer) use
/// the standard [`PARQUET_MAGIC`].
#[cfg(feature = "encryption")]
pub(crate) fn get_file_magic(
    file_encryption_properties: Option<&FileEncryptionProperties>,
) -> &'static [u8; 4] {
    // The parameter is already `Option<&_>`, so match it directly; the
    // previous `.as_ref()` produced a redundant `Option<&&_>`.
    match file_encryption_properties {
        Some(encryption_properties) if encryption_properties.encrypt_footer() => {
            &PARQUET_MAGIC_ENCR_FOOTER
        }
        _ => &PARQUET_MAGIC,
    }
}
987
/// Get the magic bytes that identify this as a Parquet file when encryption
/// support is compiled out; always the standard [`PARQUET_MAGIC`]
#[cfg(not(feature = "encryption"))]
pub(crate) fn get_file_magic() -> &'static [u8; 4] {
    &PARQUET_MAGIC
}
992
993#[cfg(test)]
994mod tests {
995    use super::*;
996
997    #[cfg(feature = "arrow")]
998    use arrow_array::RecordBatchReader;
999    use bytes::Bytes;
1000    use std::fs::File;
1001
1002    #[cfg(feature = "arrow")]
1003    use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
1004    #[cfg(feature = "arrow")]
1005    use crate::arrow::ArrowWriter;
1006    use crate::basic::{
1007        ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, Repetition, SortOrder, Type,
1008    };
1009    use crate::column::page::{Page, PageReader};
1010    use crate::column::reader::get_typed_column_reader;
1011    use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
1012    use crate::data_type::{BoolType, ByteArrayType, Int32Type};
1013    use crate::file::page_index::index::Index;
1014    use crate::file::properties::EnabledStatistics;
1015    use crate::file::serialized_reader::ReadOptionsBuilder;
1016    use crate::file::{
1017        properties::{ReaderProperties, WriterProperties, WriterVersion},
1018        reader::{FileReader, SerializedFileReader, SerializedPageReader},
1019        statistics::{from_thrift, to_thrift, Statistics},
1020    };
1021    use crate::format::SortingColumn;
1022    use crate::record::{Row, RowAccessor};
1023    use crate::schema::parser::parse_message_type;
1024    use crate::schema::types;
1025    use crate::schema::types::{ColumnDescriptor, ColumnPath};
1026    use crate::util::test_common::rand_gen::RandGen;
1027
1028    #[test]
1029    fn test_row_group_writer_error_not_all_columns_written() {
1030        let file = tempfile::tempfile().unwrap();
1031        let schema = Arc::new(
1032            types::Type::group_type_builder("schema")
1033                .with_fields(vec![Arc::new(
1034                    types::Type::primitive_type_builder("col1", Type::INT32)
1035                        .build()
1036                        .unwrap(),
1037                )])
1038                .build()
1039                .unwrap(),
1040        );
1041        let props = Default::default();
1042        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
1043        let row_group_writer = writer.next_row_group().unwrap();
1044        let res = row_group_writer.close();
1045        assert!(res.is_err());
1046        if let Err(err) = res {
1047            assert_eq!(
1048                format!("{err}"),
1049                "Parquet error: Column length mismatch: 1 != 0"
1050            );
1051        }
1052    }
1053
    #[test]
    fn test_row_group_writer_num_records_mismatch() {
        // Writing different row counts to two columns of the same row group
        // must fail when the shorter column is closed.
        let file = tempfile::tempfile().unwrap();
        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![
                    Arc::new(
                        types::Type::primitive_type_builder("col1", Type::INT32)
                            .with_repetition(Repetition::REQUIRED)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        types::Type::primitive_type_builder("col2", Type::INT32)
                            .with_repetition(Repetition::REQUIRED)
                            .build()
                            .unwrap(),
                    ),
                ])
                .build()
                .unwrap(),
        );
        let props = Default::default();
        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        // First column: 3 rows
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 2, 3], None, None)
            .unwrap();
        col_writer.close().unwrap();

        // Second column: only 2 rows — mismatch detected on close
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 2], None, None)
            .unwrap();

        let err = col_writer.close().unwrap_err();
        assert_eq!(
            err.to_string(),
            "Parquet error: Incorrect number of rows, expected 3 != 2 rows"
        );
    }
1099
    #[test]
    fn test_file_writer_empty_file() {
        // A file closed with no row groups must still be readable and yield
        // zero rows.
        let file = tempfile::tempfile().unwrap();

        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![Arc::new(
                    types::Type::primitive_type_builder("col1", Type::INT32)
                        .build()
                        .unwrap(),
                )])
                .build()
                .unwrap(),
        );
        let props = Default::default();
        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file).unwrap();
        assert_eq!(reader.get_row_iter(None).unwrap().count(), 0);
    }
1121
    #[test]
    fn test_file_writer_column_orders_populated() {
        // Verifies the writer records one column order per *leaf* column,
        // with the sort order derived from each leaf's logical/converted type.
        let file = tempfile::tempfile().unwrap();

        // Mix of top-level and nested leaves covering signed, undefined,
        // and unsigned sort orders
        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![
                    Arc::new(
                        types::Type::primitive_type_builder("col1", Type::INT32)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        types::Type::primitive_type_builder("col2", Type::FIXED_LEN_BYTE_ARRAY)
                            .with_converted_type(ConvertedType::INTERVAL)
                            .with_length(12)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        types::Type::group_type_builder("nested")
                            .with_repetition(Repetition::REQUIRED)
                            .with_fields(vec![
                                Arc::new(
                                    types::Type::primitive_type_builder(
                                        "col3",
                                        Type::FIXED_LEN_BYTE_ARRAY,
                                    )
                                    .with_logical_type(Some(LogicalType::Float16))
                                    .with_length(2)
                                    .build()
                                    .unwrap(),
                                ),
                                Arc::new(
                                    types::Type::primitive_type_builder("col4", Type::BYTE_ARRAY)
                                        .with_logical_type(Some(LogicalType::String))
                                        .build()
                                        .unwrap(),
                                ),
                            ])
                            .build()
                            .unwrap(),
                    ),
                ])
                .build()
                .unwrap(),
        );

        let props = Default::default();
        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file).unwrap();

        // only leaves
        let expected = vec![
            // INT32
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
            // INTERVAL
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED),
            // Float16
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
            // String
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED),
        ];
        let actual = reader.metadata().file_metadata().column_orders();

        assert!(actual.is_some());
        let actual = actual.unwrap();
        assert_eq!(*actual, expected);
    }
1193
    #[test]
    fn test_file_writer_with_metadata() {
        // Key-value metadata set via WriterProperties must round-trip into
        // the file's footer metadata.
        let file = tempfile::tempfile().unwrap();

        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![Arc::new(
                    types::Type::primitive_type_builder("col1", Type::INT32)
                        .build()
                        .unwrap(),
                )])
                .build()
                .unwrap(),
        );
        let props = Arc::new(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![KeyValue::new(
                    "key".to_string(),
                    "value".to_string(),
                )]))
                .build(),
        );
        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
        writer.close().unwrap();

        // Exactly the one key/value pair written above should come back
        let reader = SerializedFileReader::new(file).unwrap();
        assert_eq!(
            reader
                .metadata()
                .file_metadata()
                .key_value_metadata()
                .to_owned()
                .unwrap()
                .len(),
            1
        );
    }
1231
    #[test]
    fn test_file_writer_v2_with_metadata() {
        // Same as test_file_writer_with_metadata but with a v2 writer, and
        // additionally checks logical/converted type round-trip (ARROW-11803).
        let file = tempfile::tempfile().unwrap();
        let field_logical_type = Some(LogicalType::Integer {
            bit_width: 8,
            is_signed: false,
        });
        let field = Arc::new(
            types::Type::primitive_type_builder("col1", Type::INT32)
                .with_logical_type(field_logical_type.clone())
                .with_converted_type(field_logical_type.into())
                .build()
                .unwrap(),
        );
        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![field.clone()])
                .build()
                .unwrap(),
        );
        let props = Arc::new(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![KeyValue::new(
                    "key".to_string(),
                    "value".to_string(),
                )]))
                .set_writer_version(WriterVersion::PARQUET_2_0)
                .build(),
        );
        let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file).unwrap();

        assert_eq!(
            reader
                .metadata()
                .file_metadata()
                .key_value_metadata()
                .to_owned()
                .unwrap()
                .len(),
            1
        );

        // ARROW-11803: Test that the converted and logical types have been populated
        let fields = reader.metadata().file_metadata().schema().get_fields();
        assert_eq!(fields.len(), 1);
        assert_eq!(fields[0], field);
    }
1282
    #[test]
    fn test_file_writer_with_sorting_columns_metadata() {
        // Sorting columns configured on WriterProperties must be recorded in
        // every row group's metadata and read back unchanged.
        let file = tempfile::tempfile().unwrap();

        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![
                    Arc::new(
                        types::Type::primitive_type_builder("col1", Type::INT32)
                            .build()
                            .unwrap(),
                    ),
                    Arc::new(
                        types::Type::primitive_type_builder("col2", Type::INT32)
                            .build()
                            .unwrap(),
                    ),
                ])
                .build()
                .unwrap(),
        );
        let expected_result = Some(vec![SortingColumn {
            column_idx: 0,
            descending: false,
            nulls_first: true,
        }]);
        let props = Arc::new(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![KeyValue::new(
                    "key".to_string(),
                    "value".to_string(),
                )]))
                .set_sorting_columns(expected_result.clone())
                .build(),
        );
        let mut writer =
            SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().expect("get row group writer");

        // Both columns are closed empty; only the metadata matters here
        let col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer.close().unwrap();

        let col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer.close().unwrap();

        row_group_writer.close().unwrap();
        writer.close().unwrap();

        let reader = SerializedFileReader::new(file).unwrap();
        let result: Vec<Option<&Vec<SortingColumn>>> = reader
            .metadata()
            .row_groups()
            .iter()
            .map(|f| f.sorting_columns())
            .collect();
        // validate the sorting column read match the one written above
        assert_eq!(expected_result.as_ref(), result[0]);
    }
1341
    #[test]
    fn test_file_writer_empty_row_groups() {
        // Roundtrip with zero row groups
        let file = tempfile::tempfile().unwrap();
        test_file_roundtrip(file, vec![]);
    }
1347
    #[test]
    fn test_file_writer_single_row_group() {
        // Roundtrip with one small row group
        let file = tempfile::tempfile().unwrap();
        test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
    }
1353
    #[test]
    fn test_file_writer_multiple_row_groups() {
        // Roundtrip with several row groups of varying sizes
        let file = tempfile::tempfile().unwrap();
        test_file_roundtrip(
            file,
            vec![
                vec![1, 2, 3, 4, 5],
                vec![1, 2, 3],
                vec![1],
                vec![1, 2, 3, 4, 5, 6],
            ],
        );
    }
1367
    #[test]
    fn test_file_writer_multiple_large_row_groups() {
        // Roundtrip with large row groups, including an empty trailing one
        let file = tempfile::tempfile().unwrap();
        test_file_roundtrip(
            file,
            vec![vec![123; 1024], vec![124; 1000], vec![125; 15], vec![]],
        );
    }
1376
    #[test]
    fn test_page_writer_data_pages() {
        // Roundtrip v1 and v2 data pages, both compressed and uncompressed
        let pages = vec![
            Page::DataPage {
                buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
                num_values: 10,
                encoding: Encoding::DELTA_BINARY_PACKED,
                def_level_encoding: Encoding::RLE,
                rep_level_encoding: Encoding::RLE,
                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
            },
            Page::DataPageV2 {
                buf: Bytes::from(vec![4; 128]),
                num_values: 10,
                encoding: Encoding::DELTA_BINARY_PACKED,
                num_nulls: 2,
                num_rows: 12,
                def_levels_byte_len: 24,
                rep_levels_byte_len: 32,
                is_compressed: false,
                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
            },
        ];

        test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
        test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
    }
1404
    #[test]
    fn test_page_writer_dict_pages() {
        // Roundtrip a dictionary page followed by v1/v2 data pages
        let pages = vec![
            Page::DictionaryPage {
                buf: Bytes::from(vec![1, 2, 3, 4, 5]),
                num_values: 5,
                encoding: Encoding::RLE_DICTIONARY,
                is_sorted: false,
            },
            Page::DataPage {
                buf: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]),
                num_values: 10,
                encoding: Encoding::DELTA_BINARY_PACKED,
                def_level_encoding: Encoding::RLE,
                rep_level_encoding: Encoding::RLE,
                statistics: Some(Statistics::int32(Some(1), Some(3), None, Some(7), true)),
            },
            Page::DataPageV2 {
                buf: Bytes::from(vec![4; 128]),
                num_values: 10,
                encoding: Encoding::DELTA_BINARY_PACKED,
                num_nulls: 2,
                num_rows: 12,
                def_levels_byte_len: 24,
                rep_levels_byte_len: 32,
                is_compressed: false,
                statistics: None,
            },
        ];

        test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
        test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
    }
1438
    /// Tests writing and reading pages.
    /// Physical type is for statistics only, should match any defined statistics type in
    /// pages.
    fn test_page_roundtrip(pages: &[Page], codec: Compression, physical_type: Type) {
        let mut compressed_pages = vec![];
        let mut total_num_values = 0i64;
        let codec_options = CodecOptionsBuilder::default()
            .set_backward_compatible_lz4(false)
            .build();
        let mut compressor = create_codec(codec, &codec_options).unwrap();

        // Phase 1: compress each input page with `codec`, preserving all
        // other page fields (statistics are roundtripped through thrift)
        for page in pages {
            let uncompressed_len = page.buffer().len();

            let compressed_page = match *page {
                Page::DataPage {
                    ref buf,
                    num_values,
                    encoding,
                    def_level_encoding,
                    rep_level_encoding,
                    ref statistics,
                } => {
                    total_num_values += num_values as i64;
                    let output_buf = compress_helper(compressor.as_mut(), buf);

                    Page::DataPage {
                        buf: Bytes::from(output_buf),
                        num_values,
                        encoding,
                        def_level_encoding,
                        rep_level_encoding,
                        statistics: from_thrift(physical_type, to_thrift(statistics.as_ref()))
                            .unwrap(),
                    }
                }
                Page::DataPageV2 {
                    ref buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    ref statistics,
                    ..
                } => {
                    total_num_values += num_values as i64;
                    // V2 pages keep def/rep levels uncompressed; only the
                    // values section after them is compressed
                    let offset = (def_levels_byte_len + rep_levels_byte_len) as usize;
                    let cmp_buf = compress_helper(compressor.as_mut(), &buf[offset..]);
                    let mut output_buf = Vec::from(&buf[..offset]);
                    output_buf.extend_from_slice(&cmp_buf[..]);

                    Page::DataPageV2 {
                        buf: Bytes::from(output_buf),
                        num_values,
                        encoding,
                        num_nulls,
                        num_rows,
                        def_levels_byte_len,
                        rep_levels_byte_len,
                        is_compressed: compressor.is_some(),
                        statistics: from_thrift(physical_type, to_thrift(statistics.as_ref()))
                            .unwrap(),
                    }
                }
                Page::DictionaryPage {
                    ref buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    let output_buf = compress_helper(compressor.as_mut(), buf);

                    Page::DictionaryPage {
                        buf: Bytes::from(output_buf),
                        num_values,
                        encoding,
                        is_sorted,
                    }
                }
            };

            let compressed_page = CompressedPage::new(compressed_page, uncompressed_len);
            compressed_pages.push(compressed_page);
        }

        let mut buffer: Vec<u8> = vec![];
        let mut result_pages: Vec<Page> = vec![];
        // Phase 2: write all compressed pages into an in-memory buffer
        {
            let mut writer = TrackedWrite::new(&mut buffer);
            let mut page_writer = SerializedPageWriter::new(&mut writer);

            for page in compressed_pages {
                page_writer.write_page(page).unwrap();
            }
            page_writer.close().unwrap();
        }
        // Phase 3: read the pages back with a SerializedPageReader
        {
            let reader = bytes::Bytes::from(buffer);

            let t = types::Type::primitive_type_builder("t", physical_type)
                .build()
                .unwrap();

            let desc = ColumnDescriptor::new(Arc::new(t), 0, 0, ColumnPath::new(vec![]));
            let meta = ColumnChunkMetaData::builder(Arc::new(desc))
                .set_compression(codec)
                .set_total_compressed_size(reader.len() as i64)
                .set_num_values(total_num_values)
                .build()
                .unwrap();

            let props = ReaderProperties::builder()
                .set_backward_compatible_lz4(false)
                .build();
            let mut page_reader = SerializedPageReader::new_with_properties(
                Arc::new(reader),
                &meta,
                total_num_values as usize,
                None,
                Arc::new(props),
            )
            .unwrap();

            while let Some(page) = page_reader.get_next_page().unwrap() {
                result_pages.push(page);
            }
        }

        // The decompressed pages must match the originals field-by-field
        assert_eq!(result_pages.len(), pages.len());
        for i in 0..result_pages.len() {
            assert_page(&result_pages[i], &pages[i]);
        }
    }
1574
1575    /// Helper function to compress a slice
1576    fn compress_helper(compressor: Option<&mut Box<dyn Codec>>, data: &[u8]) -> Vec<u8> {
1577        let mut output_buf = vec![];
1578        if let Some(cmpr) = compressor {
1579            cmpr.compress(data, &mut output_buf).unwrap();
1580        } else {
1581            output_buf.extend_from_slice(data);
1582        }
1583        output_buf
1584    }
1585
1586    /// Check if pages match.
1587    fn assert_page(left: &Page, right: &Page) {
1588        assert_eq!(left.page_type(), right.page_type());
1589        assert_eq!(&left.buffer(), &right.buffer());
1590        assert_eq!(left.num_values(), right.num_values());
1591        assert_eq!(left.encoding(), right.encoding());
1592        assert_eq!(to_thrift(left.statistics()), to_thrift(right.statistics()));
1593    }
1594
    /// Tests roundtrip of i32 data written using `W` and read using `R`
    ///
    /// Thin wrapper over [`test_roundtrip`] that fixes the physical type to
    /// `Int32Type` and extracts values from column 0 of each row.
    fn test_roundtrip_i32<W, R>(
        file: W,
        data: Vec<Vec<i32>>,
        compression: Compression,
    ) -> crate::format::FileMetaData
    where
        W: Write + Send,
        R: ChunkReader + From<W> + 'static,
    {
        test_roundtrip::<W, R, Int32Type, _>(file, data, |r| r.get_int(0).unwrap(), compression)
    }
1607
    /// Tests roundtrip of data of type `D` written using `W` and read using `R`
    /// and the provided `values` function
    ///
    /// Each inner `Vec` in `data` is written as its own row group into a
    /// single required column `col1`. After closing the writer, the file is
    /// re-read with a `SerializedFileReader` and the recovered values, row
    /// counts and per-row-group byte sizes are validated. Returns the thrift
    /// file metadata for further inspection by callers.
    fn test_roundtrip<W, R, D, F>(
        mut file: W,
        data: Vec<Vec<D::T>>,
        value: F,
        compression: Compression,
    ) -> crate::format::FileMetaData
    where
        W: Write + Send,
        R: ChunkReader + From<W> + 'static,
        D: DataType,
        F: Fn(Row) -> D::T,
    {
        // Schema with a single REQUIRED primitive column of D's physical type
        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![Arc::new(
                    types::Type::primitive_type_builder("col1", D::get_physical_type())
                        .with_repetition(Repetition::REQUIRED)
                        .build()
                        .unwrap(),
                )])
                .build()
                .unwrap(),
        );
        let props = Arc::new(
            WriterProperties::builder()
                .set_compression(compression)
                .build(),
        );
        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
        let mut rows: i64 = 0;

        for (idx, subset) in data.iter().enumerate() {
            // Byte offset where this row group starts (test has access to the
            // writer's private `buf`); compared below against the file_offset
            // recorded in the row group metadata
            let row_group_file_offset = file_writer.buf.bytes_written();
            let mut row_group_writer = file_writer.next_row_group().unwrap();
            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
                rows += writer
                    .typed::<D>()
                    .write_batch(&subset[..], None, None)
                    .unwrap() as i64;
                writer.close().unwrap();
            }
            let last_group = row_group_writer.close().unwrap();
            // Metadata for every closed row group must be visible immediately
            // through flushed_row_groups, with correct ordinal and file offset
            let flushed = file_writer.flushed_row_groups();
            assert_eq!(flushed.len(), idx + 1);
            assert_eq!(Some(idx as i16), last_group.ordinal());
            assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset());
            assert_eq!(&flushed[idx], last_group.as_ref());
        }
        let file_metadata = file_writer.close().unwrap();

        // Read the file back and verify row count, sizes and values
        let reader = SerializedFileReader::new(R::from(file)).unwrap();
        assert_eq!(reader.num_row_groups(), data.len());
        assert_eq!(
            reader.metadata().file_metadata().num_rows(),
            rows,
            "row count in metadata not equal to number of rows written"
        );
        for (i, item) in data.iter().enumerate().take(reader.num_row_groups()) {
            let row_group_reader = reader.get_row_group(i).unwrap();
            let iter = row_group_reader.get_row_iter(None).unwrap();
            let res: Vec<_> = iter.map(|row| row.unwrap()).map(&value).collect();
            // total_byte_size should equal the sum of the per-column
            // uncompressed sizes for the row group
            let row_group_size = row_group_reader.metadata().total_byte_size();
            let uncompressed_size: i64 = row_group_reader
                .metadata()
                .columns()
                .iter()
                .map(|v| v.uncompressed_size())
                .sum();
            assert_eq!(row_group_size, uncompressed_size);
            assert_eq!(res, *item);
        }
        file_metadata
    }
1683
    /// File write-read roundtrip.
    /// `data` consists of arrays of values for each row group.
    ///
    /// Convenience wrapper over [`test_roundtrip_i32`] using an on-disk
    /// `File` for both writing and reading, without compression.
    fn test_file_roundtrip(file: File, data: Vec<Vec<i32>>) -> crate::format::FileMetaData {
        test_roundtrip_i32::<File, File>(file, data, Compression::UNCOMPRESSED)
    }
1689
    /// A file written with zero row groups must still roundtrip successfully
    #[test]
    fn test_bytes_writer_empty_row_groups() {
        test_bytes_roundtrip(vec![], Compression::UNCOMPRESSED);
    }
1694
    /// Roundtrip of a single uncompressed row group through an in-memory buffer
    #[test]
    fn test_bytes_writer_single_row_group() {
        test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::UNCOMPRESSED);
    }
1699
    /// Roundtrip of multiple uncompressed row groups of differing sizes
    #[test]
    fn test_bytes_writer_multiple_row_groups() {
        test_bytes_roundtrip(
            vec![
                vec![1, 2, 3, 4, 5],
                vec![1, 2, 3],
                vec![1],
                vec![1, 2, 3, 4, 5, 6],
            ],
            Compression::UNCOMPRESSED,
        );
    }
1712
    /// Roundtrip of a single SNAPPY-compressed row group
    #[test]
    fn test_bytes_writer_single_row_group_compressed() {
        test_bytes_roundtrip(vec![vec![1, 2, 3, 4, 5]], Compression::SNAPPY);
    }
1717
    /// Roundtrip of multiple SNAPPY-compressed row groups of differing sizes
    #[test]
    fn test_bytes_writer_multiple_row_groups_compressed() {
        test_bytes_roundtrip(
            vec![
                vec![1, 2, 3, 4, 5],
                vec![1, 2, 3],
                vec![1],
                vec![1, 2, 3, 4, 5, 6],
            ],
            Compression::SNAPPY,
        );
    }
1730
    /// Roundtrips `data` through an in-memory writer (`Vec<u8>`) and an
    /// in-memory reader (`Bytes`), with the given compression
    fn test_bytes_roundtrip(data: Vec<Vec<i32>>, compression: Compression) {
        test_roundtrip_i32::<Vec<u8>, Bytes>(Vec::with_capacity(1024), data, compression);
    }
1734
1735    #[test]
1736    fn test_boolean_roundtrip() {
1737        let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
1738        test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
1739            Vec::with_capacity(1024),
1740            vec![my_bool_values],
1741            |r| r.get_bool(0).unwrap(),
1742            Compression::UNCOMPRESSED,
1743        );
1744    }
1745
    /// Roundtrips 2049 alternating boolean values (enough to span multiple
    /// data pages) with SNAPPY compression
    #[test]
    fn test_boolean_compressed_roundtrip() {
        let my_bool_values: Vec<_> = (0..2049).map(|idx| idx % 2 == 0).collect();
        test_roundtrip::<Vec<u8>, Bytes, BoolType, _>(
            Vec::with_capacity(1024),
            vec![my_bool_values],
            |r| r.get_bool(0).unwrap(),
            Compression::SNAPPY,
        );
    }
1756
1757    #[test]
1758    fn test_column_offset_index_file() {
1759        let file = tempfile::tempfile().unwrap();
1760        let file_metadata = test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
1761        file_metadata.row_groups.iter().for_each(|row_group| {
1762            row_group.columns.iter().for_each(|column_chunk| {
1763                assert_ne!(None, column_chunk.column_index_offset);
1764                assert_ne!(None, column_chunk.column_index_length);
1765
1766                assert_ne!(None, column_chunk.offset_index_offset);
1767                assert_ne!(None, column_chunk.offset_index_length);
1768            })
1769        });
1770    }
1771
    /// Roundtrips key/value metadata through a file: `initial_kv` is supplied
    /// via `WriterProperties`, `final_kv` is appended one entry at a time via
    /// `append_key_value_metadata` after the row group has been written. The
    /// reader must observe the initial entries followed by the appended ones,
    /// or `None` when the combination yields no entries.
    fn test_kv_metadata(initial_kv: Option<Vec<KeyValue>>, final_kv: Option<Vec<KeyValue>>) {
        let schema = Arc::new(
            types::Type::group_type_builder("schema")
                .with_fields(vec![Arc::new(
                    types::Type::primitive_type_builder("col1", Type::INT32)
                        .with_repetition(Repetition::REQUIRED)
                        .build()
                        .unwrap(),
                )])
                .build()
                .unwrap(),
        );
        let mut out = Vec::with_capacity(1024);
        let props = Arc::new(
            WriterProperties::builder()
                .set_key_value_metadata(initial_kv.clone())
                .build(),
        );
        let mut writer = SerializedFileWriter::new(&mut out, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();
        // One empty column is enough; the test is about file-level metadata
        let column = row_group_writer.next_column().unwrap().unwrap();
        column.close().unwrap();
        row_group_writer.close().unwrap();
        if let Some(kvs) = &final_kv {
            for kv in kvs {
                writer.append_key_value_metadata(kv.clone())
            }
        }
        writer.close().unwrap();

        let reader = SerializedFileReader::new(Bytes::from(out)).unwrap();
        let metadata = reader.metadata().file_metadata();
        let keys = metadata.key_value_metadata();

        match (initial_kv, final_kv) {
            // Both present: appended entries must follow the initial ones
            (Some(a), Some(b)) => {
                let keys = keys.unwrap();
                assert_eq!(keys.len(), a.len() + b.len());
                assert_eq!(&keys[..a.len()], a.as_slice());
                assert_eq!(&keys[a.len()..], b.as_slice());
            }
            (Some(v), None) => assert_eq!(keys.unwrap(), &v),
            (None, Some(v)) if !v.is_empty() => assert_eq!(keys.unwrap(), &v),
            // No entries at all (including None + empty vec) => no metadata
            _ => assert!(keys.is_none()),
        }
    }
1818
    /// Exercises [`test_kv_metadata`] over the full matrix of initial /
    /// appended key-value metadata combinations: absent, empty and non-empty
    #[test]
    fn test_append_metadata() {
        let kv1 = KeyValue::new("cupcakes".to_string(), "awesome".to_string());
        let kv2 = KeyValue::new("bingo".to_string(), "bongo".to_string());

        test_kv_metadata(None, None);
        test_kv_metadata(Some(vec![kv1.clone()]), None);
        test_kv_metadata(None, Some(vec![kv2.clone()]));
        test_kv_metadata(Some(vec![kv1.clone()]), Some(vec![kv2.clone()]));
        test_kv_metadata(Some(vec![]), Some(vec![kv2]));
        test_kv_metadata(Some(vec![]), Some(vec![]));
        test_kv_metadata(Some(vec![kv1]), Some(vec![]));
        test_kv_metadata(None, Some(vec![]));
    }
1833
    /// Verifies handling of the deprecated `min`/`max` statistics fields:
    /// they are written for the decimal and signed-int columns but omitted
    /// for the unsigned column, while the newer `min_value`/`max_value`
    /// fields are populated for all three columns.
    #[test]
    fn test_backwards_compatible_statistics() {
        let message_type = "
            message test_schema {
                REQUIRED INT32 decimal1 (DECIMAL(8,2));
                REQUIRED INT32 i32 (INTEGER(32,true));
                REQUIRED INT32 u32 (INTEGER(32,false));
            }
        ";

        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let props = Default::default();
        let mut writer = SerializedFileWriter::new(vec![], schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        // Write the same values into each of the three columns
        for _ in 0..3 {
            let mut writer = row_group_writer.next_column().unwrap().unwrap();
            writer
                .typed::<Int32Type>()
                .write_batch(&[1, 2, 3], None, None)
                .unwrap();
            writer.close().unwrap();
        }
        let metadata = row_group_writer.close().unwrap();
        writer.close().unwrap();

        // Inspect the thrift-encoded statistics directly
        let thrift = metadata.to_thrift();
        let encoded_stats: Vec<_> = thrift
            .columns
            .into_iter()
            .map(|x| x.meta_data.unwrap().statistics.unwrap())
            .collect();

        // decimal: legacy min/max present alongside min_value/max_value
        let s = &encoded_stats[0];
        assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
        assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));

        // i32: legacy min/max present alongside min_value/max_value
        let s = &encoded_stats[1];
        assert_eq!(s.min.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
        assert_eq!(s.max.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));

        // u32: legacy min/max omitted; only min_value/max_value written
        let s = &encoded_stats[2];
        assert_eq!(s.min.as_deref(), None);
        assert_eq!(s.max.as_deref(), None);
        assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
        assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
    }
1888
    /// Exercises the column "splicing" API: each column is encoded
    /// independently into its own in-memory buffer via `SerializedPageWriter`,
    /// and the resulting bytes are then appended into a row group with
    /// `append_column` instead of going through `next_column`
    #[test]
    fn test_spliced_write() {
        let message_type = "
            message test_schema {
                REQUIRED INT32 i32 (INTEGER(32,true));
                REQUIRED INT32 u32 (INTEGER(32,false));
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let props = Arc::new(WriterProperties::builder().build());

        let mut file = Vec::with_capacity(1024);
        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props.clone()).unwrap();

        // One (buffer, close-result) pair per column; the close result is
        // filled in by the on-close callback below
        let columns = file_writer.descr.columns();
        let mut column_state: Vec<(_, Option<ColumnCloseResult>)> = columns
            .iter()
            .map(|_| (TrackedWrite::new(Vec::with_capacity(1024)), None))
            .collect();

        // split_first_mut hands each iteration a disjoint mutable borrow of
        // its own (buf, out) pair, so the closures below don't alias
        let mut column_state_slice = column_state.as_mut_slice();
        let mut column_writers = Vec::with_capacity(columns.len());
        for c in columns {
            let ((buf, out), tail) = column_state_slice.split_first_mut().unwrap();
            column_state_slice = tail;

            let page_writer = Box::new(SerializedPageWriter::new(buf));
            let col_writer = get_column_writer(c.clone(), props.clone(), page_writer);
            column_writers.push(SerializedColumnWriter::new(
                col_writer,
                Some(Box::new(|on_close| {
                    *out = Some(on_close);
                    Ok(())
                })),
            ));
        }

        let column_data = [[1, 2, 3, 4], [7, 3, 7, 3]];

        // Interleaved writing to the column writers
        for (writer, batch) in column_writers.iter_mut().zip(column_data) {
            let writer = writer.typed::<Int32Type>();
            writer.write_batch(&batch, None, None).unwrap();
        }

        // Close the column writers
        for writer in column_writers {
            writer.close().unwrap()
        }

        // Splice column data into a row group
        let mut row_group_writer = file_writer.next_row_group().unwrap();
        for (write, close) in column_state {
            let buf = Bytes::from(write.into_inner().unwrap());
            row_group_writer
                .append_column(&buf, close.unwrap())
                .unwrap();
        }
        row_group_writer.close().unwrap();
        file_writer.close().unwrap();

        // Check data was written correctly
        let file = Bytes::from(file);
        let test_read = |reader: SerializedFileReader<Bytes>| {
            let row_group = reader.get_row_group(0).unwrap();

            let mut out = Vec::with_capacity(4);
            let c1 = row_group.get_column_reader(0).unwrap();
            let mut c1 = get_typed_column_reader::<Int32Type>(c1);
            c1.read_records(4, None, None, &mut out).unwrap();
            assert_eq!(out, column_data[0]);

            out.clear();

            let c2 = row_group.get_column_reader(1).unwrap();
            let mut c2 = get_typed_column_reader::<Int32Type>(c2);
            c2.read_records(4, None, None, &mut out).unwrap();
            assert_eq!(out, column_data[1]);
        };

        // Read back both without and with the page index enabled
        let reader = SerializedFileReader::new(file.clone()).unwrap();
        test_read(reader);

        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
        test_read(reader);
    }
1976
    /// Verifies per-column statistics overrides: statistics are disabled
    /// globally but re-enabled at `Page` level for column "a". Column "a"
    /// gets both a column index and an offset index; column "b" keeps an
    /// offset index but no column index. Also checks that `next_row_group`
    /// fails after `finish`.
    #[test]
    fn test_disabled_statistics() {
        let message_type = "
            message test_schema {
                REQUIRED INT32 a;
                REQUIRED INT32 b;
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
            .set_offset_index_disabled(true) // this should be ignored because of the line above
            .build();
        let mut file = Vec::with_capacity(1024);
        let mut file_writer =
            SerializedFileWriter::new(&mut file, schema, Arc::new(props)).unwrap();

        let mut row_group_writer = file_writer.next_row_group().unwrap();
        let mut a_writer = row_group_writer.next_column().unwrap().unwrap();
        let col_writer = a_writer.typed::<Int32Type>();
        col_writer.write_batch(&[1, 2, 3], None, None).unwrap();
        a_writer.close().unwrap();

        let mut b_writer = row_group_writer.next_column().unwrap().unwrap();
        let col_writer = b_writer.typed::<Int32Type>();
        col_writer.write_batch(&[4, 5, 6], None, None).unwrap();
        b_writer.close().unwrap();
        row_group_writer.close().unwrap();

        // finish (rather than close) keeps the writer around so the
        // "already finished" error path can be exercised below
        let metadata = file_writer.finish().unwrap();
        assert_eq!(metadata.row_groups.len(), 1);
        let row_group = &metadata.row_groups[0];
        assert_eq!(row_group.columns.len(), 2);
        // Column "a" has both offset and column index, as requested
        assert!(row_group.columns[0].offset_index_offset.is_some());
        assert!(row_group.columns[0].column_index_offset.is_some());
        // Column "b" should only have offset index
        assert!(row_group.columns[1].offset_index_offset.is_some());
        assert!(row_group.columns[1].column_index_offset.is_none());

        let err = file_writer.next_row_group().err().unwrap().to_string();
        assert_eq!(err, "Parquet error: SerializedFileWriter already finished");

        drop(file_writer);

        // Re-read with the page index enabled and verify the same shape
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(Bytes::from(file), options).unwrap();

        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1); // 1 row group
        assert_eq!(offset_index[0].len(), 2); // 2 columns

        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1); // 1 row group
        assert_eq!(column_index[0].len(), 2); // 2 column

        // Column "a" has a real INT32 index, column "b" has a NONE placeholder
        let a_idx = &column_index[0][0];
        assert!(matches!(a_idx, Index::INT32(_)), "{a_idx:?}");
        let b_idx = &column_index[0][1];
        assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
    }
2039
    /// Verifies size statistics for a nullable, non-repeated BYTE_ARRAY
    /// column: `unencoded_byte_array_data_bytes` and the definition level
    /// histogram are written (and readable from the chunk metadata, column
    /// index and offset index), while the repetition level histogram is
    /// absent since the column is not repeated.
    #[test]
    fn test_byte_array_size_statistics() {
        let message_type = "
            message test_schema {
                OPTIONAL BYTE_ARRAY a (UTF8);
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let data = ByteArrayType::gen_vec(32, 7);
        // 7 non-null values and 3 nulls -> def histogram [3, 7]
        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
        let unenc_size: i64 = data.iter().map(|x| x.len() as i64).sum();
        let file: File = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        );

        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<ByteArrayType>()
            .write_batch(&data, Some(&def_levels), None)
            .unwrap();
        col_writer.close().unwrap();
        row_group_writer.close().unwrap();
        let file_metadata = writer.close().unwrap();

        assert_eq!(file_metadata.row_groups.len(), 1);
        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());

        // Shared check for the expected definition level histogram
        let check_def_hist = |def_hist: &[i64]| {
            assert_eq!(def_hist.len(), 2);
            assert_eq!(def_hist[0], 3);
            assert_eq!(def_hist[1], 7);
        };

        // Check the size statistics written into the thrift chunk metadata
        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
        let meta_data = file_metadata.row_groups[0].columns[0]
            .meta_data
            .as_ref()
            .unwrap();
        assert!(meta_data.size_statistics.is_some());
        let size_stats = meta_data.size_statistics.as_ref().unwrap();

        assert!(size_stats.repetition_level_histogram.is_none());
        assert!(size_stats.definition_level_histogram.is_some());
        assert!(size_stats.unencoded_byte_array_data_bytes.is_some());
        assert_eq!(
            unenc_size,
            size_stats.unencoded_byte_array_data_bytes.unwrap()
        );
        check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());

        // check that the read metadata is also correct
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rfile_metadata = reader.metadata().file_metadata();
        assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
        assert_eq!(reader.num_row_groups(), 1);
        let rowgroup = reader.get_row_group(0).unwrap();
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.definition_level_histogram().is_some());
        assert!(column.repetition_level_histogram().is_none());
        assert!(column.unencoded_byte_array_data_bytes().is_some());
        check_def_hist(column.definition_level_histogram().unwrap().values());
        assert_eq!(
            unenc_size,
            column.unencoded_byte_array_data_bytes().unwrap()
        );

        // check histogram in column index as well
        assert!(reader.metadata().column_index().is_some());
        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1);
        assert_eq!(column_index[0].len(), 1);
        let col_idx = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
            assert_eq!(index.indexes.len(), 1);
            &index.indexes[0]
        } else {
            unreachable!()
        };

        assert!(col_idx.repetition_level_histogram().is_none());
        assert!(col_idx.definition_level_histogram().is_some());
        check_def_hist(col_idx.definition_level_histogram().unwrap().values());

        // The per-page unencoded sizes in the offset index should sum to the
        // same total (only one page is written here)
        assert!(reader.metadata().offset_index().is_some());
        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1);
        assert_eq!(offset_index[0].len(), 1);
        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_some());
        let page_sizes = offset_index[0][0]
            .unencoded_byte_array_data_bytes
            .as_ref()
            .unwrap();
        assert_eq!(page_sizes.len(), 1);
        assert_eq!(page_sizes[0], unenc_size);
    }
2144
    /// Verifies the writer rejects creating more row groups than the format
    /// allows (the row group ordinal is an i16, so at most 32767 = 0x7FFF)
    #[test]
    fn test_too_many_rowgroups() {
        let message_type = "
            message test_schema {
                REQUIRED BYTE_ARRAY a (UTF8);
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let file: File = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::None)
                .set_max_row_group_size(1)
                .build(),
        );
        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();

        // Attempt to create 0x8001 (32769) empty row groups; the first 0x8000
        // succeed and creation must fail exactly at i == 0x8000 (32768), the
        // first ordinal that no longer fits in an i16.
        for i in 0..0x8001 {
            match writer.next_row_group() {
                Ok(mut row_group_writer) => {
                    assert_ne!(i, 0x8000);
                    let col_writer = row_group_writer.next_column().unwrap().unwrap();
                    col_writer.close().unwrap();
                    row_group_writer.close().unwrap();
                }
                Err(e) => {
                    assert_eq!(i, 0x8000);
                    assert_eq!(
                        e.to_string(),
                        "Parquet error: Parquet does not support more than 32767 row groups per file (currently: 32768)"
                    );
                }
            }
        }
        writer.close().unwrap();
    }
2182
    /// Verifies size statistics for a repeated, nullable INT32 list column:
    /// both definition and repetition level histograms are written (and
    /// readable from the chunk metadata and column index), while
    /// `unencoded_byte_array_data_bytes` is absent for a non-byte-array type.
    #[test]
    fn test_size_statistics_with_repetition_and_nulls() {
        let message_type = "
            message test_schema {
                OPTIONAL group i32_list (LIST) {
                    REPEATED group list {
                        OPTIONAL INT32 element;
                    }
                }
            }
        ";
        // column is:
        // row 0: [1, 2]
        // row 1: NULL
        // row 2: [4, NULL]
        // row 3: []
        // row 4: [7, 8, 9, 10]
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let data = [1, 2, 4, 7, 8, 9, 10];
        let def_levels = [3, 3, 0, 3, 2, 1, 3, 3, 3, 3];
        let rep_levels = [0, 1, 0, 0, 1, 0, 0, 1, 1, 1];
        let file = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        );
        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&data, Some(&def_levels), Some(&rep_levels))
            .unwrap();
        col_writer.close().unwrap();
        row_group_writer.close().unwrap();
        let file_metadata = writer.close().unwrap();

        assert_eq!(file_metadata.row_groups.len(), 1);
        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());

        // def levels: one 0 (null list), one 1 (empty list), one 2 (null
        // element), seven 3s (non-null elements)
        let check_def_hist = |def_hist: &[i64]| {
            assert_eq!(def_hist.len(), 4);
            assert_eq!(def_hist[0], 1);
            assert_eq!(def_hist[1], 1);
            assert_eq!(def_hist[2], 1);
            assert_eq!(def_hist[3], 7);
        };

        // rep levels: five 0s (row starts), five 1s (list continuations)
        let check_rep_hist = |rep_hist: &[i64]| {
            assert_eq!(rep_hist.len(), 2);
            assert_eq!(rep_hist[0], 5);
            assert_eq!(rep_hist[1], 5);
        };

        // check that histograms are set properly in the write and read metadata
        // also check that unencoded_byte_array_data_bytes is not set
        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
        let meta_data = file_metadata.row_groups[0].columns[0]
            .meta_data
            .as_ref()
            .unwrap();
        assert!(meta_data.size_statistics.is_some());
        let size_stats = meta_data.size_statistics.as_ref().unwrap();
        assert!(size_stats.repetition_level_histogram.is_some());
        assert!(size_stats.definition_level_histogram.is_some());
        assert!(size_stats.unencoded_byte_array_data_bytes.is_none());
        check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());
        check_rep_hist(size_stats.repetition_level_histogram.as_ref().unwrap());

        // check that the read metadata is also correct
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options).unwrap();

        let rfile_metadata = reader.metadata().file_metadata();
        assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
        assert_eq!(reader.num_row_groups(), 1);
        let rowgroup = reader.get_row_group(0).unwrap();
        assert_eq!(rowgroup.num_columns(), 1);
        let column = rowgroup.metadata().column(0);
        assert!(column.definition_level_histogram().is_some());
        assert!(column.repetition_level_histogram().is_some());
        assert!(column.unencoded_byte_array_data_bytes().is_none());
        check_def_hist(column.definition_level_histogram().unwrap().values());
        check_rep_hist(column.repetition_level_histogram().unwrap().values());

        // check histogram in column index as well
        assert!(reader.metadata().column_index().is_some());
        let column_index = reader.metadata().column_index().unwrap();
        assert_eq!(column_index.len(), 1);
        assert_eq!(column_index[0].len(), 1);
        let col_idx = if let Index::INT32(index) = &column_index[0][0] {
            assert_eq!(index.indexes.len(), 1);
            &index.indexes[0]
        } else {
            unreachable!()
        };

        check_def_hist(col_idx.definition_level_histogram().unwrap().values());
        check_rep_hist(col_idx.repetition_level_histogram().unwrap().values());

        // Offset index exists but carries no byte-array sizes for INT32
        assert!(reader.metadata().offset_index().is_some());
        let offset_index = reader.metadata().offset_index().unwrap();
        assert_eq!(offset_index.len(), 1);
        assert_eq!(offset_index[0].len(), 1);
        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_none());
    }
2292
2293    #[test]
2294    #[cfg(feature = "arrow")]
2295    fn test_byte_stream_split_extended_roundtrip() {
2296        let path = format!(
2297            "{}/byte_stream_split_extended.gzip.parquet",
2298            arrow::util::test_util::parquet_test_data(),
2299        );
2300        let file = File::open(path).unwrap();
2301
2302        // Read in test file and rewrite to tmp
2303        let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)
2304            .expect("parquet open")
2305            .build()
2306            .expect("parquet open");
2307
2308        let file = tempfile::tempfile().unwrap();
2309        let props = WriterProperties::builder()
2310            .set_dictionary_enabled(false)
2311            .set_column_encoding(
2312                ColumnPath::from("float16_byte_stream_split"),
2313                Encoding::BYTE_STREAM_SPLIT,
2314            )
2315            .set_column_encoding(
2316                ColumnPath::from("float_byte_stream_split"),
2317                Encoding::BYTE_STREAM_SPLIT,
2318            )
2319            .set_column_encoding(
2320                ColumnPath::from("double_byte_stream_split"),
2321                Encoding::BYTE_STREAM_SPLIT,
2322            )
2323            .set_column_encoding(
2324                ColumnPath::from("int32_byte_stream_split"),
2325                Encoding::BYTE_STREAM_SPLIT,
2326            )
2327            .set_column_encoding(
2328                ColumnPath::from("int64_byte_stream_split"),
2329                Encoding::BYTE_STREAM_SPLIT,
2330            )
2331            .set_column_encoding(
2332                ColumnPath::from("flba5_byte_stream_split"),
2333                Encoding::BYTE_STREAM_SPLIT,
2334            )
2335            .set_column_encoding(
2336                ColumnPath::from("decimal_byte_stream_split"),
2337                Encoding::BYTE_STREAM_SPLIT,
2338            )
2339            .build();
2340
2341        let mut parquet_writer = ArrowWriter::try_new(
2342            file.try_clone().expect("cannot open file"),
2343            parquet_reader.schema(),
2344            Some(props),
2345        )
2346        .expect("create arrow writer");
2347
2348        for maybe_batch in parquet_reader {
2349            let batch = maybe_batch.expect("reading batch");
2350            parquet_writer.write(&batch).expect("writing data");
2351        }
2352
2353        parquet_writer.close().expect("finalizing file");
2354
2355        let reader = SerializedFileReader::new(file).expect("Failed to create reader");
2356        let filemeta = reader.metadata();
2357
2358        // Make sure byte_stream_split encoding was used
2359        let check_encoding = |x: usize, filemeta: &ParquetMetaData| {
2360            assert!(filemeta
2361                .row_group(0)
2362                .column(x)
2363                .encodings()
2364                .contains(&Encoding::BYTE_STREAM_SPLIT));
2365        };
2366
2367        check_encoding(1, filemeta);
2368        check_encoding(3, filemeta);
2369        check_encoding(5, filemeta);
2370        check_encoding(7, filemeta);
2371        check_encoding(9, filemeta);
2372        check_encoding(11, filemeta);
2373        check_encoding(13, filemeta);
2374
2375        // Read back tmpfile and make sure all values are correct
2376        let mut iter = reader
2377            .get_row_iter(None)
2378            .expect("Failed to create row iterator");
2379
2380        let mut start = 0;
2381        let end = reader.metadata().file_metadata().num_rows();
2382
2383        let check_row = |row: Result<Row, ParquetError>| {
2384            assert!(row.is_ok());
2385            let r = row.unwrap();
2386            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2387            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2388            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2389            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2390            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2391            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2392            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2393        };
2394
2395        while start < end {
2396            match iter.next() {
2397                Some(row) => check_row(row),
2398                None => break,
2399            };
2400            start += 1;
2401        }
2402    }
2403}