parquet-rewrite.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Binary file to rewrite parquet files.
//!
//! # Install
//!
//! `parquet-rewrite` can be installed using `cargo`:
//! ```
//! cargo install parquet --features=cli
//! ```
//! After this `parquet-rewrite` should be available:
//! ```
//! parquet-rewrite -i XYZ.parquet -o XYZ2.parquet
//! ```
//!
//! The binary can also be built from the source code and run as follows:
//! ```
//! cargo run --features=cli --bin parquet-rewrite -- -i XYZ.parquet -o XYZ2.parquet
//! ```
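//!
//! For example, to rewrite a file with zstd compression and page-level statistics
//! (illustrative only; the option and value names assume clap's default kebab-case
//! mapping of the arguments defined below):
//! ```
//! parquet-rewrite -i XYZ.parquet -o XYZ2.parquet --compression zstd --statistics-enabled page
//! ```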

use std::fs::File;

use arrow_array::RecordBatchReader;
use clap::{builder::PossibleValue, Parser, ValueEnum};
use parquet::{
    arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter},
    basic::{Compression, Encoding},
    file::{
        properties::{BloomFilterPosition, EnabledStatistics, WriterProperties, WriterVersion},
        reader::FileReader,
        serialized_reader::SerializedFileReader,
    },
};

#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum CompressionArgs {
    /// No compression.
    None,

    /// Snappy
    Snappy,

    /// GZip
    Gzip,

    /// LZO
    Lzo,

    /// Brotli
    Brotli,

    /// LZ4
    Lz4,

    /// Zstd
    Zstd,

    /// LZ4 Raw
    Lz4Raw,
}

impl From<CompressionArgs> for Compression {
    fn from(value: CompressionArgs) -> Self {
        match value {
            CompressionArgs::None => Self::UNCOMPRESSED,
            CompressionArgs::Snappy => Self::SNAPPY,
            CompressionArgs::Gzip => Self::GZIP(Default::default()),
            CompressionArgs::Lzo => Self::LZO,
            CompressionArgs::Brotli => Self::BROTLI(Default::default()),
            CompressionArgs::Lz4 => Self::LZ4,
            CompressionArgs::Zstd => Self::ZSTD(Default::default()),
            CompressionArgs::Lz4Raw => Self::LZ4_RAW,
        }
    }
}

#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum EncodingArgs {
    /// Default byte encoding.
    Plain,

    /// **Deprecated** dictionary encoding.
    PlainDictionary,

    /// Group packed run length encoding.
    Rle,

    /// **Deprecated** Bit-packed encoding.
    BitPacked,

    /// Delta encoding for integers, either INT32 or INT64.
    DeltaBinaryPacked,

    /// Encoding for byte arrays to separate the length values and the data.
    DeltaLengthByteArray,

    /// Incremental encoding for byte arrays.
    DeltaByteArray,

    /// Dictionary encoding.
    RleDictionary,

    /// Encoding for fixed-width data.
    ByteStreamSplit,
}

#[allow(deprecated)]
impl From<EncodingArgs> for Encoding {
    fn from(value: EncodingArgs) -> Self {
        match value {
            EncodingArgs::Plain => Self::PLAIN,
            EncodingArgs::PlainDictionary => Self::PLAIN_DICTIONARY,
            EncodingArgs::Rle => Self::RLE,
            EncodingArgs::BitPacked => Self::BIT_PACKED,
            EncodingArgs::DeltaBinaryPacked => Self::DELTA_BINARY_PACKED,
            EncodingArgs::DeltaLengthByteArray => Self::DELTA_LENGTH_BYTE_ARRAY,
            EncodingArgs::DeltaByteArray => Self::DELTA_BYTE_ARRAY,
            EncodingArgs::RleDictionary => Self::RLE_DICTIONARY,
            EncodingArgs::ByteStreamSplit => Self::BYTE_STREAM_SPLIT,
        }
    }
}

#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum EnabledStatisticsArgs {
    /// Compute no statistics
    None,

    /// Compute chunk-level statistics but not page-level
    Chunk,

    /// Compute page-level and chunk-level statistics
    Page,
}

impl From<EnabledStatisticsArgs> for EnabledStatistics {
    fn from(value: EnabledStatisticsArgs) -> Self {
        match value {
            EnabledStatisticsArgs::None => Self::None,
            EnabledStatisticsArgs::Chunk => Self::Chunk,
            EnabledStatisticsArgs::Page => Self::Page,
        }
    }
}

#[derive(Clone, Copy, Debug)]
enum WriterVersionArgs {
    Parquet1_0,
    Parquet2_0,
}

impl ValueEnum for WriterVersionArgs {
    fn value_variants<'a>() -> &'a [Self] {
        &[Self::Parquet1_0, Self::Parquet2_0]
    }

    fn to_possible_value(&self) -> Option<PossibleValue> {
        match self {
            WriterVersionArgs::Parquet1_0 => Some(PossibleValue::new("1.0")),
            WriterVersionArgs::Parquet2_0 => Some(PossibleValue::new("2.0")),
        }
    }
}

impl From<WriterVersionArgs> for WriterVersion {
    fn from(value: WriterVersionArgs) -> Self {
        match value {
            WriterVersionArgs::Parquet1_0 => Self::PARQUET_1_0,
            WriterVersionArgs::Parquet2_0 => Self::PARQUET_2_0,
        }
    }
}

#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum BloomFilterPositionArgs {
    /// Write Bloom Filters of each row group right after the row group
    AfterRowGroup,

    /// Write Bloom Filters at the end of the file
    End,
}

impl From<BloomFilterPositionArgs> for BloomFilterPosition {
    fn from(value: BloomFilterPositionArgs) -> Self {
        match value {
            BloomFilterPositionArgs::AfterRowGroup => Self::AfterRowGroup,
            BloomFilterPositionArgs::End => Self::End,
        }
    }
}

#[derive(Debug, Parser)]
#[clap(author, version, about("Read and write a Parquet file with potentially different settings"), long_about = None)]
struct Args {
    /// Path to input parquet file.
    #[clap(short, long)]
    input: String,

    /// Path to output parquet file.
    #[clap(short, long)]
    output: String,

    /// Compression used for all columns.
    #[clap(long, value_enum)]
    compression: Option<CompressionArgs>,

    /// Encoding used for all columns, if dictionary encoding is not enabled.
    #[clap(long, value_enum)]
    encoding: Option<EncodingArgs>,

    /// Sets flag to enable/disable dictionary encoding for all columns.
    #[clap(long)]
    dictionary_enabled: Option<bool>,

    /// Sets best effort maximum dictionary page size, in bytes.
    #[clap(long)]
    dictionary_page_size_limit: Option<usize>,

    /// Sets maximum number of rows in a row group.
    #[clap(long)]
    max_row_group_size: Option<usize>,

    /// Sets best effort maximum number of rows in a data page.
    #[clap(long)]
    data_page_row_count_limit: Option<usize>,

    /// Sets best effort maximum size of a data page in bytes.
    #[clap(long)]
    data_page_size_limit: Option<usize>,

    /// Sets the max length of min/max statistics in row group and data page
    /// header statistics for all columns.
    ///
    /// Applicable only if statistics are enabled.
    #[clap(long)]
    statistics_truncate_length: Option<usize>,

    /// Sets the max length of min/max statistics in the column index.
    ///
    /// Applicable only if statistics are enabled.
    #[clap(long)]
    column_index_truncate_length: Option<usize>,

    /// Write statistics to the data page headers?
    ///
    /// Setting this to true will also enable page-level statistics.
    #[clap(long)]
    write_page_header_statistics: Option<bool>,

    /// Sets whether the bloom filter is enabled for all columns.
    #[clap(long)]
    bloom_filter_enabled: Option<bool>,

    /// Sets bloom filter false positive probability (fpp) for all columns.
    #[clap(long)]
    bloom_filter_fpp: Option<f64>,

    /// Sets number of distinct values (ndv) for bloom filter for all columns.
    #[clap(long)]
    bloom_filter_ndv: Option<u64>,

    /// Sets the position of the bloom filter.
    #[clap(long)]
    bloom_filter_position: Option<BloomFilterPositionArgs>,

    /// Sets flag to enable/disable statistics for all columns.
    #[clap(long)]
    statistics_enabled: Option<EnabledStatisticsArgs>,

    /// Sets writer version.
    #[clap(long)]
    writer_version: Option<WriterVersionArgs>,

    /// Sets whether to coerce Arrow types to match the Parquet specification.
    #[clap(long)]
    coerce_types: Option<bool>,
}

fn main() {
    let args = Args::parse();

    // read key-value metadata
    let parquet_reader =
        SerializedFileReader::new(File::open(&args.input).expect("Unable to open input file"))
            .expect("Failed to create reader");
    let kv_md = parquet_reader
        .metadata()
        .file_metadata()
        .key_value_metadata()
        .cloned();

    // create actual parquet reader
    let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(
        File::open(args.input).expect("Unable to open input file"),
    )
    .expect("parquet open")
    .build()
    .expect("parquet open");

    let mut writer_properties_builder = WriterProperties::builder().set_key_value_metadata(kv_md);
    if let Some(value) = args.compression {
        writer_properties_builder = writer_properties_builder.set_compression(value.into());
    }

    // setup encoding
    if let Some(value) = args.encoding {
        writer_properties_builder = writer_properties_builder.set_encoding(value.into());
    }
    if let Some(value) = args.dictionary_enabled {
        writer_properties_builder = writer_properties_builder.set_dictionary_enabled(value);
    }
    if let Some(value) = args.dictionary_page_size_limit {
        writer_properties_builder = writer_properties_builder.set_dictionary_page_size_limit(value);
    }

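    // row group and data page sizing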
    if let Some(value) = args.max_row_group_size {
        writer_properties_builder = writer_properties_builder.set_max_row_group_size(value);
    }
    if let Some(value) = args.data_page_row_count_limit {
        writer_properties_builder = writer_properties_builder.set_data_page_row_count_limit(value);
    }
    if let Some(value) = args.data_page_size_limit {
        writer_properties_builder = writer_properties_builder.set_data_page_size_limit(value);
    }
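    // truncation of min/max statistics in headers and the column index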
    if let Some(value) = args.statistics_truncate_length {
        writer_properties_builder =
            writer_properties_builder.set_statistics_truncate_length(Some(value));
    }
    if let Some(value) = args.column_index_truncate_length {
        writer_properties_builder =
            writer_properties_builder.set_column_index_truncate_length(Some(value));
    }
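    // bloom filter configuration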
    if let Some(value) = args.bloom_filter_enabled {
        writer_properties_builder = writer_properties_builder.set_bloom_filter_enabled(value);

        if value {
            if let Some(value) = args.bloom_filter_fpp {
                writer_properties_builder = writer_properties_builder.set_bloom_filter_fpp(value);
            }
            if let Some(value) = args.bloom_filter_ndv {
                writer_properties_builder = writer_properties_builder.set_bloom_filter_ndv(value);
            }
            if let Some(value) = args.bloom_filter_position {
                writer_properties_builder =
                    writer_properties_builder.set_bloom_filter_position(value.into());
            }
        }
    }
    if let Some(value) = args.statistics_enabled {
        writer_properties_builder = writer_properties_builder.set_statistics_enabled(value.into());
    }
    // set this after statistics_enabled: enabling page header statistics also
    // forces page-level statistics
    if let Some(value) = args.write_page_header_statistics {
        writer_properties_builder =
            writer_properties_builder.set_write_page_header_statistics(value);
        if value {
            writer_properties_builder =
                writer_properties_builder.set_statistics_enabled(EnabledStatistics::Page);
        }
    }
    if let Some(value) = args.writer_version {
        writer_properties_builder = writer_properties_builder.set_writer_version(value.into());
    }
    if let Some(value) = args.coerce_types {
        writer_properties_builder = writer_properties_builder.set_coerce_types(value);
    }
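    // build the writer properties and create an Arrow writer for the output file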
    let writer_properties = writer_properties_builder.build();
    let mut parquet_writer = ArrowWriter::try_new(
        File::create(&args.output).expect("Unable to open output file"),
        parquet_reader.schema(),
        Some(writer_properties),
    )
    .expect("create arrow writer");

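    // copy all record batches from the input reader to the output writer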
    for maybe_batch in parquet_reader {
        let batch = maybe_batch.expect("reading batch");
        parquet_writer.write(&batch).expect("writing data");
    }

    parquet_writer.close().expect("finalizing file");
}