parquet/file/metadata/writer.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::errors::Result;
use crate::file::metadata::{KeyValue, ParquetMetaData};
use crate::file::page_index::index::Index;
use crate::file::writer::TrackedWrite;
use crate::file::PARQUET_MAGIC;
use crate::format::{ColumnIndex, OffsetIndex, RowGroup};
use crate::schema::types;
use crate::schema::types::{SchemaDescPtr, SchemaDescriptor, TypePtr};
use crate::thrift::TSerializable;
use std::io::Write;
use std::sync::Arc;
use thrift::protocol::TCompactOutputProtocol;

/// Writes `crate::file::metadata` structures to a thrift encoded byte stream
///
/// See [`ParquetMetaDataWriter`] for background and example.
pub(crate) struct ThriftMetadataWriter<'a, W: Write> {
    buf: &'a mut TrackedWrite<W>,
    schema: &'a TypePtr,
    schema_descr: &'a SchemaDescPtr,
    row_groups: Vec<RowGroup>,
    column_indexes: Option<&'a [Vec<Option<ColumnIndex>>]>,
    offset_indexes: Option<&'a [Vec<Option<OffsetIndex>>]>,
    key_value_metadata: Option<Vec<KeyValue>>,
    created_by: Option<String>,
    writer_version: i32,
}

impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
    /// Serialize all the offset indexes to `self.buf`.
    ///
    /// Note: also updates the `ColumnChunk::offset_index_offset` and
    /// `ColumnChunk::offset_index_length` to reflect the position and length
    /// of the serialized offset indexes.
    fn write_offset_indexes(&mut self, offset_indexes: &[Vec<Option<OffsetIndex>>]) -> Result<()> {
        // iterate over each column of each row group and write its offset
        // index (if any) to the file
        for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
                if let Some(offset_index) = &offset_indexes[row_group_idx][column_idx] {
                    let start_offset = self.buf.bytes_written();
                    let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
                    offset_index.write_to_out_protocol(&mut protocol)?;
                    let end_offset = self.buf.bytes_written();
                    // record the offset and length of the serialized offset index
                    column_metadata.offset_index_offset = Some(start_offset as i64);
                    column_metadata.offset_index_length = Some((end_offset - start_offset) as i32);
                }
            }
        }
        Ok(())
    }

    /// Serialize all the column indexes to `self.buf`.
    ///
    /// Note: also updates the `ColumnChunk::column_index_offset` and
    /// `ColumnChunk::column_index_length` to reflect the position and length
    /// of the serialized column indexes.
    fn write_column_indexes(&mut self, column_indexes: &[Vec<Option<ColumnIndex>>]) -> Result<()> {
        // iterate over each column of each row group and write its column
        // index (if any) to the file
        for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
                if let Some(column_index) = &column_indexes[row_group_idx][column_idx] {
                    let start_offset = self.buf.bytes_written();
                    let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
                    column_index.write_to_out_protocol(&mut protocol)?;
                    let end_offset = self.buf.bytes_written();
                    // record the offset and length of the serialized column index
                    column_metadata.column_index_offset = Some(start_offset as i64);
                    column_metadata.column_index_length = Some((end_offset - start_offset) as i32);
                }
            }
        }
        Ok(())
    }

    /// Assembles and writes the final metadata to `self.buf`
    pub fn finish(mut self) -> Result<crate::format::FileMetaData> {
        let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum();

        // Write column indexes and offset indexes
        if let Some(column_indexes) = self.column_indexes {
            self.write_column_indexes(column_indexes)?;
        }
        if let Some(offset_indexes) = self.offset_indexes {
            self.write_offset_indexes(offset_indexes)?;
        }

        // We only include ColumnOrder for leaf nodes.
        // Currently the only supported ColumnOrder is TypeDefinedOrder, so we
        // set this for all leaf nodes.
        // Even if the column has an undefined sort order, such as INTERVAL,
        // this is still technically the defined TYPEORDER, so it should still
        // be set.
        let column_orders = (0..self.schema_descr.num_columns())
            .map(|_| crate::format::ColumnOrder::TYPEORDER(crate::format::TypeDefinedOrder {}))
            .collect();
        // This field is optional and could be omitted, e.g. when no min/max
        // fields are set in any Statistics or ColumnIndex object in the whole
        // file, but for simplicity we always set it.
        let column_orders = Some(column_orders);

        let file_metadata = crate::format::FileMetaData {
            num_rows,
            row_groups: self.row_groups,
            key_value_metadata: self.key_value_metadata.clone(),
            version: self.writer_version,
            schema: types::to_thrift(self.schema.as_ref())?,
            created_by: self.created_by.clone(),
            column_orders,
            encryption_algorithm: None,
            footer_signing_key_metadata: None,
        };

        // Write file metadata
        let start_pos = self.buf.bytes_written();
        {
            let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
            file_metadata.write_to_out_protocol(&mut protocol)?;
        }
        let end_pos = self.buf.bytes_written();

        // Write the footer: the length of the encoded FileMetaData
        // (4 bytes, little endian) followed by the PAR1 magic bytes
        let metadata_len = (end_pos - start_pos) as u32;

        self.buf.write_all(&metadata_len.to_le_bytes())?;
        self.buf.write_all(&PARQUET_MAGIC)?;
        Ok(file_metadata)
    }

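    /// Creates a new `ThriftMetadataWriter` that writes to `buf`, using the
    /// given schema and already-converted thrift `RowGroup`s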
    pub fn new(
        buf: &'a mut TrackedWrite<W>,
        schema: &'a TypePtr,
        schema_descr: &'a SchemaDescPtr,
        row_groups: Vec<RowGroup>,
        created_by: Option<String>,
        writer_version: i32,
    ) -> Self {
        Self {
            buf,
            schema,
            schema_descr,
            row_groups,
            column_indexes: None,
            offset_indexes: None,
            key_value_metadata: None,
            created_by,
            writer_version,
        }
    }

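    /// Sets the column indexes (one `Option<ColumnIndex>` per column, per row
    /// group) to serialize before the file metadata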
    pub fn with_column_indexes(mut self, column_indexes: &'a [Vec<Option<ColumnIndex>>]) -> Self {
        self.column_indexes = Some(column_indexes);
        self
    }

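    /// Sets the offset indexes (one `Option<OffsetIndex>` per column, per row
    /// group) to serialize before the file metadata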
    pub fn with_offset_indexes(mut self, offset_indexes: &'a [Vec<Option<OffsetIndex>>]) -> Self {
        self.offset_indexes = Some(offset_indexes);
        self
    }

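    /// Sets the key/value metadata to embed in the file metadata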
    pub fn with_key_value_metadata(mut self, key_value_metadata: Vec<KeyValue>) -> Self {
        self.key_value_metadata = Some(key_value_metadata);
        self
    }
}

/// Writes [`ParquetMetaData`] to a byte stream
///
/// This structure handles the details of writing the various parts of Parquet
/// metadata into a byte stream. It is used to write the metadata into a
/// Parquet file and can also write metadata into other locations (such as a
/// store of bytes).
///
/// # Discussion
///
/// The process of writing Parquet metadata is tricky because the metadata is
/// not stored as a single inline thrift structure. It can have several
/// "out of band" structures, such as the [`ColumnIndex`], [`OffsetIndex`], and
/// BloomFilters, which are serialized separately and located via offsets from
/// the beginning of the file.
///
/// Note: this writer does not directly write BloomFilters. In order to write
/// BloomFilters, write the bloom filters into the buffer before creating the
/// metadata writer. Then set the corresponding `bloom_filter_offset` and
/// `bloom_filter_length` on the [`ColumnChunkMetaData`] passed to this writer.
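///
/// For example, a minimal sketch of recording a bloom filter's location on a
/// column chunk (assuming the filter bytes have already been written to the
/// buffer at `offset` with length `len`; the function and variable names here
/// are purely illustrative):
///
/// ```no_run
/// # use parquet::file::metadata::ColumnChunkMetaData;
/// # use parquet::schema::types::ColumnDescPtr;
/// # fn record_bloom_filter(
/// #     column_descr: ColumnDescPtr,
/// #     offset: i64,
/// #     len: i32,
/// # ) -> parquet::errors::Result<ColumnChunkMetaData> {
/// // record where the already-written bloom filter lives in the buffer
/// let column_chunk = ColumnChunkMetaData::builder(column_descr)
///     .set_bloom_filter_offset(Some(offset))
///     .set_bloom_filter_length(Some(len))
///     .build()?;
/// # Ok(column_chunk)
/// # }
/// ```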
///
/// # Output Format
///
/// The format of the metadata is as follows:
///
/// 1. Optional [`ColumnIndex`] (thrift encoded)
/// 2. Optional [`OffsetIndex`] (thrift encoded)
/// 3. [`FileMetaData`] (thrift encoded)
/// 4. Length of encoded `FileMetaData` (4 bytes, little endian)
/// 5. Parquet Magic Bytes (4 bytes)
///
/// [`FileMetaData`]: crate::format::FileMetaData
/// [`ColumnChunkMetaData`]: crate::file::metadata::ColumnChunkMetaData
///
/// ```text
/// ┌──────────────────────┐
/// │                      │
/// │         ...          │
/// │                      │
/// │┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ │
/// │     ColumnIndex     ◀│─ ─ ─
/// ││    (Optional)     │ │     │
/// │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  │
/// │┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ │     │ FileMetadata
/// │     OffsetIndex      │       contains embedded
/// ││    (Optional)     │◀┼ ─   │ offsets to
/// │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  │  │    ColumnIndex and
/// │╔═══════════════════╗ │     │ OffsetIndex
/// │║                   ║ │  │
/// │║                   ║ ┼ ─   │
/// │║   FileMetadata    ║ │
/// │║                   ║ ┼ ─ ─ ┘
/// │║                   ║ │
/// │╚═══════════════════╝ │
/// │┌───────────────────┐ │
/// ││  metadata length  │ │ length of FileMetadata  (only)
/// │└───────────────────┘ │
/// │┌───────────────────┐ │
/// ││      'PAR1'       │ │ Parquet Magic Bytes
/// │└───────────────────┘ │
/// └──────────────────────┘
///      Output Buffer
/// ```
///
/// # Example
/// ```no_run
/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataWriter};
/// # fn get_metadata() -> ParquetMetaData { unimplemented!(); }
/// // write parquet metadata to an in-memory buffer
/// let mut buffer = vec![];
/// let metadata: ParquetMetaData = get_metadata();
/// let writer = ParquetMetaDataWriter::new(&mut buffer, &metadata);
/// // write the metadata to the buffer
/// writer.finish().unwrap();
/// assert!(!buffer.is_empty());
/// ```
pub struct ParquetMetaDataWriter<'a, W: Write> {
    buf: TrackedWrite<W>,
    metadata: &'a ParquetMetaData,
}

impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
    /// Create a new `ParquetMetaDataWriter` to write to `buf`
    ///
    /// Note: any embedded offsets in the metadata will be written assuming the
    /// metadata is at the start of the buffer. If the metadata is being written
    /// to a location other than the start of the buffer, see
    /// [`Self::new_with_tracked`].
    ///
    /// See example on the struct level documentation
    pub fn new(buf: W, metadata: &'a ParquetMetaData) -> Self {
        Self::new_with_tracked(TrackedWrite::new(buf), metadata)
    }

    /// Create a new `ParquetMetaDataWriter` to write to `buf`
    ///
    /// This method is used when the metadata is being written to a location other
    /// than the start of the buffer.
    ///
    /// See example on the struct level documentation
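    ///
    /// A minimal sketch of writing the metadata after existing content,
    /// assuming earlier bytes are written through the same [`TrackedWrite`]
    /// (the placeholder bytes below are purely illustrative):
    ///
    /// ```no_run
    /// # use std::io::Write;
    /// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataWriter};
    /// # use parquet::file::writer::TrackedWrite;
    /// # fn get_metadata() -> ParquetMetaData { unimplemented!(); }
    /// let mut buffer = vec![];
    /// let mut track = TrackedWrite::new(&mut buffer);
    /// // bytes written before the metadata keep the embedded offsets
    /// // consistent with the metadata's position in the stream
    /// track.write_all(b"...previously written data...").unwrap();
    /// let metadata: ParquetMetaData = get_metadata();
    /// let writer = ParquetMetaDataWriter::new_with_tracked(track, &metadata);
    /// writer.finish().unwrap();
    /// ```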
    pub fn new_with_tracked(buf: TrackedWrite<W>, metadata: &'a ParquetMetaData) -> Self {
        Self { buf, metadata }
    }

    /// Write the metadata to the buffer
    pub fn finish(mut self) -> Result<()> {
        let file_metadata = self.metadata.file_metadata();

        let schema = Arc::new(file_metadata.schema().clone());
        let schema_descr = Arc::new(SchemaDescriptor::new(schema.clone()));
        let created_by = file_metadata.created_by().map(str::to_string);

        let row_groups = self
            .metadata
            .row_groups()
            .iter()
            .map(|rg| rg.to_thrift())
            .collect::<Vec<_>>();

        let key_value_metadata = file_metadata.key_value_metadata().cloned();

        let column_indexes = self.convert_column_indexes();
        let offset_indexes = self.convert_offset_index();

        let mut encoder = ThriftMetadataWriter::new(
            &mut self.buf,
            &schema,
            &schema_descr,
            row_groups,
            created_by,
            file_metadata.version(),
        );
        encoder = encoder.with_column_indexes(&column_indexes);
        encoder = encoder.with_offset_indexes(&offset_indexes);
        if let Some(key_value_metadata) = key_value_metadata {
            encoder = encoder.with_key_value_metadata(key_value_metadata);
        }
        encoder.finish()?;

        Ok(())
    }

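    /// Converts the in-memory page indexes ([`Index`]) into thrift
    /// [`ColumnIndex`] structures, one `Option` per column per row group
    /// (all `None` if the metadata has no column indexes)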
    fn convert_column_indexes(&self) -> Vec<Vec<Option<ColumnIndex>>> {
        if let Some(row_group_column_indexes) = self.metadata.column_index() {
            (0..self.metadata.row_groups().len())
                .map(|rg_idx| {
                    let column_indexes = &row_group_column_indexes[rg_idx];
                    column_indexes
                        .iter()
                        .map(|column_index| match column_index {
                            Index::NONE => None,
                            Index::BOOLEAN(column_index) => Some(column_index.to_thrift()),
                            Index::BYTE_ARRAY(column_index) => Some(column_index.to_thrift()),
                            Index::DOUBLE(column_index) => Some(column_index.to_thrift()),
                            Index::FIXED_LEN_BYTE_ARRAY(column_index) => {
                                Some(column_index.to_thrift())
                            }
                            Index::FLOAT(column_index) => Some(column_index.to_thrift()),
                            Index::INT32(column_index) => Some(column_index.to_thrift()),
                            Index::INT64(column_index) => Some(column_index.to_thrift()),
                            Index::INT96(column_index) => Some(column_index.to_thrift()),
                        })
                        .collect()
                })
                .collect()
        } else {
            // no column indexes: emit a `None` for each column in each row group
            self.metadata
                .row_groups()
                .iter()
                .map(|rg| std::iter::repeat(None).take(rg.columns().len()).collect())
                .collect()
        }
    }

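    /// Converts the in-memory offset indexes into thrift [`OffsetIndex`]
    /// structures, one `Option` per column per row group (all `None` if the
    /// metadata has no offset indexes)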
    fn convert_offset_index(&self) -> Vec<Vec<Option<OffsetIndex>>> {
        if let Some(row_group_offset_indexes) = self.metadata.offset_index() {
            (0..self.metadata.row_groups().len())
                .map(|rg_idx| {
                    let offset_indexes = &row_group_offset_indexes[rg_idx];
                    offset_indexes
                        .iter()
                        .map(|offset_index| Some(offset_index.to_thrift()))
                        .collect()
                })
                .collect()
        } else {
            // no offset indexes: emit a `None` for each column in each row group
            self.metadata
                .row_groups()
                .iter()
                .map(|rg| std::iter::repeat(None).take(rg.columns().len()).collect())
                .collect()
        }
    }
}