// parquet/file/metadata/writer.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::errors::Result;
19use crate::file::metadata::{KeyValue, ParquetMetaData};
20use crate::file::page_index::index::Index;
21use crate::file::writer::TrackedWrite;
22use crate::file::PARQUET_MAGIC;
23use crate::format::{ColumnIndex, OffsetIndex, RowGroup};
24use crate::schema::types;
25use crate::schema::types::{SchemaDescPtr, SchemaDescriptor, TypePtr};
26use crate::thrift::TSerializable;
27use std::io::Write;
28use std::sync::Arc;
29use thrift::protocol::TCompactOutputProtocol;
30
/// Writes `crate::file::metadata` structures to a thrift encoded byte stream
///
/// See [`ParquetMetaDataWriter`] for background and example.
pub(crate) struct ThriftMetadataWriter<'a, W: Write> {
    /// Output stream; tracks the absolute number of bytes written so far,
    /// which is used to compute the embedded index offsets
    buf: &'a mut TrackedWrite<W>,
    /// Parquet schema serialized into the footer's `schema` field
    schema: &'a TypePtr,
    /// Descriptor for `schema`; used to count leaf columns when emitting column orders
    schema_descr: &'a SchemaDescPtr,
    /// Thrift row group metadata; index offsets/lengths are patched in place
    /// as the indexes are serialized
    row_groups: Vec<RowGroup>,
    /// Optional column indexes: one (optional) entry per column, per row group
    column_indexes: Option<&'a [Vec<Option<ColumnIndex>>]>,
    /// Optional offset indexes: one (optional) entry per column, per row group
    offset_indexes: Option<&'a [Vec<Option<OffsetIndex>>]>,
    /// Optional application-defined key/value metadata for the footer
    key_value_metadata: Option<Vec<KeyValue>>,
    /// Optional `created_by` string embedded in the footer
    created_by: Option<String>,
    /// Format version written to the footer's `version` field
    writer_version: i32,
}
45
46impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
47 /// Serialize all the offset indexes to `self.buf`,
48 ///
49 /// Note: also updates the `ColumnChunk::offset_index_offset` and
50 /// `ColumnChunk::offset_index_length` to reflect the position and length
51 /// of the serialized offset indexes.
52 fn write_offset_indexes(&mut self, offset_indexes: &[Vec<Option<OffsetIndex>>]) -> Result<()> {
53 // iter row group
54 // iter each column
55 // write offset index to the file
56 for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
57 for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
58 if let Some(offset_index) = &offset_indexes[row_group_idx][column_idx] {
59 let start_offset = self.buf.bytes_written();
60 let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
61 offset_index.write_to_out_protocol(&mut protocol)?;
62 let end_offset = self.buf.bytes_written();
63 // set offset and index for offset index
64 column_metadata.offset_index_offset = Some(start_offset as i64);
65 column_metadata.offset_index_length = Some((end_offset - start_offset) as i32);
66 }
67 }
68 }
69 Ok(())
70 }
71
72 /// Serialize all the column indexes to the `self.buf`
73 ///
74 /// Note: also updates the `ColumnChunk::column_index_offset` and
75 /// `ColumnChunk::column_index_length` to reflect the position and length
76 /// of the serialized column indexes.
77 fn write_column_indexes(&mut self, column_indexes: &[Vec<Option<ColumnIndex>>]) -> Result<()> {
78 // iter row group
79 // iter each column
80 // write column index to the file
81 for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
82 for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
83 if let Some(column_index) = &column_indexes[row_group_idx][column_idx] {
84 let start_offset = self.buf.bytes_written();
85 let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
86 column_index.write_to_out_protocol(&mut protocol)?;
87 let end_offset = self.buf.bytes_written();
88 // set offset and index for offset index
89 column_metadata.column_index_offset = Some(start_offset as i64);
90 column_metadata.column_index_length = Some((end_offset - start_offset) as i32);
91 }
92 }
93 }
94 Ok(())
95 }
96
97 /// Assembles and writes the final metadata to self.buf
98 pub fn finish(mut self) -> Result<crate::format::FileMetaData> {
99 let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum();
100
101 // Write column indexes and offset indexes
102 if let Some(column_indexes) = self.column_indexes {
103 self.write_column_indexes(column_indexes)?;
104 }
105 if let Some(offset_indexes) = self.offset_indexes {
106 self.write_offset_indexes(offset_indexes)?;
107 }
108
109 // We only include ColumnOrder for leaf nodes.
110 // Currently only supported ColumnOrder is TypeDefinedOrder so we set this
111 // for all leaf nodes.
112 // Even if the column has an undefined sort order, such as INTERVAL, this
113 // is still technically the defined TYPEORDER so it should still be set.
114 let column_orders = (0..self.schema_descr.num_columns())
115 .map(|_| crate::format::ColumnOrder::TYPEORDER(crate::format::TypeDefinedOrder {}))
116 .collect();
117 // This field is optional, perhaps in cases where no min/max fields are set
118 // in any Statistics or ColumnIndex object in the whole file.
119 // But for simplicity we always set this field.
120 let column_orders = Some(column_orders);
121
122 let file_metadata = crate::format::FileMetaData {
123 num_rows,
124 row_groups: self.row_groups,
125 key_value_metadata: self.key_value_metadata.clone(),
126 version: self.writer_version,
127 schema: types::to_thrift(self.schema.as_ref())?,
128 created_by: self.created_by.clone(),
129 column_orders,
130 encryption_algorithm: None,
131 footer_signing_key_metadata: None,
132 };
133
134 // Write file metadata
135 let start_pos = self.buf.bytes_written();
136 {
137 let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
138 file_metadata.write_to_out_protocol(&mut protocol)?;
139 }
140 let end_pos = self.buf.bytes_written();
141
142 // Write footer
143 let metadata_len = (end_pos - start_pos) as u32;
144
145 self.buf.write_all(&metadata_len.to_le_bytes())?;
146 self.buf.write_all(&PARQUET_MAGIC)?;
147 Ok(file_metadata)
148 }
149
150 pub fn new(
151 buf: &'a mut TrackedWrite<W>,
152 schema: &'a TypePtr,
153 schema_descr: &'a SchemaDescPtr,
154 row_groups: Vec<RowGroup>,
155 created_by: Option<String>,
156 writer_version: i32,
157 ) -> Self {
158 Self {
159 buf,
160 schema,
161 schema_descr,
162 row_groups,
163 column_indexes: None,
164 offset_indexes: None,
165 key_value_metadata: None,
166 created_by,
167 writer_version,
168 }
169 }
170
171 pub fn with_column_indexes(mut self, column_indexes: &'a [Vec<Option<ColumnIndex>>]) -> Self {
172 self.column_indexes = Some(column_indexes);
173 self
174 }
175
176 pub fn with_offset_indexes(mut self, offset_indexes: &'a [Vec<Option<OffsetIndex>>]) -> Self {
177 self.offset_indexes = Some(offset_indexes);
178 self
179 }
180
181 pub fn with_key_value_metadata(mut self, key_value_metadata: Vec<KeyValue>) -> Self {
182 self.key_value_metadata = Some(key_value_metadata);
183 self
184 }
185}
186
187/// Writes [`ParquetMetaData`] to a byte stream
188///
189/// This structure handles the details of writing the various parts of Parquet
190/// metadata into a byte stream. It is used to write the metadata into a parquet
191/// file and can also write metadata into other locations (such as a store of
192/// bytes).
193///
194/// # Discussion
195///
196/// The process of writing Parquet metadata is tricky because the
197/// metadata is not stored as a single inline thrift structure. It can have
198/// several "out of band" structures such as the [`OffsetIndex`] and
199/// BloomFilters stored in separate structures whose locations are stored as
200/// offsets from the beginning of the file.
201///
202/// Note: this writer does not directly write BloomFilters. In order to write
203/// BloomFilters, write the bloom filters into the buffer before creating the
204/// metadata writer. Then set the corresponding `bloom_filter_offset` and
205/// `bloom_filter_length` on [`ColumnChunkMetaData`] passed to this writer.
206///
207/// # Output Format
208///
209/// The format of the metadata is as follows:
210///
211/// 1. Optional [`ColumnIndex`] (thrift encoded)
212/// 2. Optional [`OffsetIndex`] (thrift encoded)
213/// 3. [`FileMetaData`] (thrift encoded)
214/// 4. Length of encoded `FileMetaData` (4 bytes, little endian)
215/// 5. Parquet Magic Bytes (4 bytes)
216///
217/// [`FileMetaData`]: crate::format::FileMetaData
218/// [`ColumnChunkMetaData`]: crate::file::metadata::ColumnChunkMetaData
219///
220/// ```text
221/// ┌──────────────────────┐
222/// │ │
223/// │ ... │
224/// │ │
225/// │┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ │
226/// │ ColumnIndex ◀│─ ─ ─
227/// ││ (Optional) │ │ │
228/// │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
229/// │┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ │ │ FileMetadata
230/// │ OffsetIndex │ contains embedded
231/// ││ (Optional) │◀┼ ─ │ offsets to
232/// │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ │ ColumnIndex and
233/// │╔═══════════════════╗ │ │ OffsetIndex
234/// │║ ║ │ │
235/// │║ ║ ┼ ─ │
236/// │║ FileMetadata ║ │
237/// │║ ║ ┼ ─ ─ ┘
238/// │║ ║ │
239/// │╚═══════════════════╝ │
240/// │┌───────────────────┐ │
241/// ││ metadata length │ │ length of FileMetadata (only)
242/// │└───────────────────┘ │
243/// │┌───────────────────┐ │
244/// ││ 'PAR1' │ │ Parquet Magic Bytes
245/// │└───────────────────┘ │
246/// └──────────────────────┘
247/// Output Buffer
248/// ```
249///
250/// # Example
251/// ```no_run
252/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataWriter};
253/// # fn get_metadata() -> ParquetMetaData { unimplemented!(); }
254/// // write parquet metadata to an in-memory buffer
255/// let mut buffer = vec![];
256/// let metadata: ParquetMetaData = get_metadata();
257/// let writer = ParquetMetaDataWriter::new(&mut buffer, &metadata);
258/// // write the metadata to the buffer
259/// writer.finish().unwrap();
260/// assert!(!buffer.is_empty());
261/// ```
pub struct ParquetMetaDataWriter<'a, W: Write> {
    /// Output stream the metadata is written to
    buf: TrackedWrite<W>,
    /// The metadata to serialize
    metadata: &'a ParquetMetaData,
}
266
267impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
268 /// Create a new `ParquetMetaDataWriter` to write to `buf`
269 ///
270 /// Note any embedded offsets in the metadata will be written assuming the
271 /// metadata is at the start of the buffer. If the metadata is being written
272 /// to a location other than the start of the buffer, see [`Self::new_with_tracked`]
273 ///
274 /// See example on the struct level documentation
275 pub fn new(buf: W, metadata: &'a ParquetMetaData) -> Self {
276 Self::new_with_tracked(TrackedWrite::new(buf), metadata)
277 }
278
279 /// Create a new ParquetMetaDataWriter to write to `buf`
280 ///
281 /// This method is used when the metadata is being written to a location other
282 /// than the start of the buffer.
283 ///
284 /// See example on the struct level documentation
285 pub fn new_with_tracked(buf: TrackedWrite<W>, metadata: &'a ParquetMetaData) -> Self {
286 Self { buf, metadata }
287 }
288
289 /// Write the metadata to the buffer
290 pub fn finish(mut self) -> Result<()> {
291 let file_metadata = self.metadata.file_metadata();
292
293 let schema = Arc::new(file_metadata.schema().clone());
294 let schema_descr = Arc::new(SchemaDescriptor::new(schema.clone()));
295 let created_by = file_metadata.created_by().map(str::to_string);
296
297 let row_groups = self
298 .metadata
299 .row_groups()
300 .iter()
301 .map(|rg| rg.to_thrift())
302 .collect::<Vec<_>>();
303
304 let key_value_metadata = file_metadata.key_value_metadata().cloned();
305
306 let column_indexes = self.convert_column_indexes();
307 let offset_indexes = self.convert_offset_index();
308
309 let mut encoder = ThriftMetadataWriter::new(
310 &mut self.buf,
311 &schema,
312 &schema_descr,
313 row_groups,
314 created_by,
315 file_metadata.version(),
316 );
317 encoder = encoder.with_column_indexes(&column_indexes);
318 encoder = encoder.with_offset_indexes(&offset_indexes);
319 if let Some(key_value_metadata) = key_value_metadata {
320 encoder = encoder.with_key_value_metadata(key_value_metadata);
321 }
322 encoder.finish()?;
323
324 Ok(())
325 }
326
327 fn convert_column_indexes(&self) -> Vec<Vec<Option<ColumnIndex>>> {
328 if let Some(row_group_column_indexes) = self.metadata.column_index() {
329 (0..self.metadata.row_groups().len())
330 .map(|rg_idx| {
331 let column_indexes = &row_group_column_indexes[rg_idx];
332 column_indexes
333 .iter()
334 .map(|column_index| match column_index {
335 Index::NONE => None,
336 Index::BOOLEAN(column_index) => Some(column_index.to_thrift()),
337 Index::BYTE_ARRAY(column_index) => Some(column_index.to_thrift()),
338 Index::DOUBLE(column_index) => Some(column_index.to_thrift()),
339 Index::FIXED_LEN_BYTE_ARRAY(column_index) => {
340 Some(column_index.to_thrift())
341 }
342 Index::FLOAT(column_index) => Some(column_index.to_thrift()),
343 Index::INT32(column_index) => Some(column_index.to_thrift()),
344 Index::INT64(column_index) => Some(column_index.to_thrift()),
345 Index::INT96(column_index) => Some(column_index.to_thrift()),
346 })
347 .collect()
348 })
349 .collect()
350 } else {
351 // make a None for each row group, for each column
352 self.metadata
353 .row_groups()
354 .iter()
355 .map(|rg| std::iter::repeat(None).take(rg.columns().len()).collect())
356 .collect()
357 }
358 }
359
360 fn convert_offset_index(&self) -> Vec<Vec<Option<OffsetIndex>>> {
361 if let Some(row_group_offset_indexes) = self.metadata.offset_index() {
362 (0..self.metadata.row_groups().len())
363 .map(|rg_idx| {
364 let offset_indexes = &row_group_offset_indexes[rg_idx];
365 offset_indexes
366 .iter()
367 .map(|offset_index| Some(offset_index.to_thrift()))
368 .collect()
369 })
370 .collect()
371 } else {
372 // make a None for each row group, for each column
373 self.metadata
374 .row_groups()
375 .iter()
376 .map(|rg| std::iter::repeat(None).take(rg.columns().len()).collect())
377 .collect()
378 }
379 }
380}