parquet-rewrite.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Binary file to rewrite parquet files.
//!
//! # Install
//!
//! `parquet-rewrite` can be installed using `cargo`:
//! ```
//! cargo install parquet --features=cli
//! ```
//! After this `parquet-rewrite` should be available:
//! ```
//! parquet-rewrite -i XYZ.parquet -o XYZ2.parquet
//! ```
//!
//! The binary can also be built from the source code and run as follows:
//! ```
//! cargo run --features=cli --bin parquet-rewrite -- -i XYZ.parquet -o XYZ2.parquet
//! ```
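//!
//! Writer settings such as compression, encoding, or row group sizing can be
//! changed via the command-line flags declared below (see `parquet-rewrite --help`
//! for the full list). For example, based on those options, recompressing a file
//! with Zstandard while targeting the 2.0 writer version should look roughly like:
//! ```
//! parquet-rewrite -i XYZ.parquet -o XYZ2.parquet --compression zstd --writer-version 2.0
//! ```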

use std::fs::File;

use arrow_array::RecordBatchReader;
use clap::{builder::PossibleValue, Parser, ValueEnum};
use parquet::{
    arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter},
    basic::{Compression, Encoding},
    file::{
        properties::{BloomFilterPosition, EnabledStatistics, WriterProperties, WriterVersion},
        reader::FileReader,
        serialized_reader::SerializedFileReader,
    },
};

#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum CompressionArgs {
    /// No compression.
    None,

    /// Snappy
    Snappy,

    /// GZip
    Gzip,

    /// LZO
    Lzo,

    /// Brotli
    Brotli,

    /// LZ4
    Lz4,

    /// Zstd
    Zstd,

    /// LZ4 Raw
    Lz4Raw,
}

impl From<CompressionArgs> for Compression {
    fn from(value: CompressionArgs) -> Self {
        match value {
            CompressionArgs::None => Self::UNCOMPRESSED,
            CompressionArgs::Snappy => Self::SNAPPY,
            CompressionArgs::Gzip => Self::GZIP(Default::default()),
            CompressionArgs::Lzo => Self::LZO,
            CompressionArgs::Brotli => Self::BROTLI(Default::default()),
            CompressionArgs::Lz4 => Self::LZ4,
            CompressionArgs::Zstd => Self::ZSTD(Default::default()),
            CompressionArgs::Lz4Raw => Self::LZ4_RAW,
        }
    }
}

#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum EncodingArgs {
    /// Default byte encoding.
    Plain,

    /// **Deprecated** dictionary encoding.
    PlainDictionary,

    /// Group packed run length encoding.
    Rle,

    /// **Deprecated** Bit-packed encoding.
    BitPacked,

    /// Delta encoding for integers, either INT32 or INT64.
    DeltaBinaryPacked,

    /// Encoding for byte arrays to separate the length values and the data.
    DeltaLengthByteArray,

    /// Incremental encoding for byte arrays.
    DeltaByteArray,

    /// Dictionary encoding.
    RleDictionary,

    /// Encoding for fixed-width data.
    ByteStreamSplit,
}

#[allow(deprecated)]
impl From<EncodingArgs> for Encoding {
    fn from(value: EncodingArgs) -> Self {
        match value {
            EncodingArgs::Plain => Self::PLAIN,
            EncodingArgs::PlainDictionary => Self::PLAIN_DICTIONARY,
            EncodingArgs::Rle => Self::RLE,
            EncodingArgs::BitPacked => Self::BIT_PACKED,
            EncodingArgs::DeltaBinaryPacked => Self::DELTA_BINARY_PACKED,
            EncodingArgs::DeltaLengthByteArray => Self::DELTA_LENGTH_BYTE_ARRAY,
            EncodingArgs::DeltaByteArray => Self::DELTA_BYTE_ARRAY,
            EncodingArgs::RleDictionary => Self::RLE_DICTIONARY,
            EncodingArgs::ByteStreamSplit => Self::BYTE_STREAM_SPLIT,
        }
    }
}

#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum EnabledStatisticsArgs {
    /// Compute no statistics
    None,

    /// Compute chunk-level statistics but not page-level
    Chunk,

    /// Compute page-level and chunk-level statistics
    Page,
}

impl From<EnabledStatisticsArgs> for EnabledStatistics {
    fn from(value: EnabledStatisticsArgs) -> Self {
        match value {
            EnabledStatisticsArgs::None => Self::None,
            EnabledStatisticsArgs::Chunk => Self::Chunk,
            EnabledStatisticsArgs::Page => Self::Page,
        }
    }
}

#[derive(Clone, Copy, Debug)]
enum WriterVersionArgs {
    Parquet1_0,
    Parquet2_0,
}

impl ValueEnum for WriterVersionArgs {
    fn value_variants<'a>() -> &'a [Self] {
        &[Self::Parquet1_0, Self::Parquet2_0]
    }

    fn to_possible_value(&self) -> Option<PossibleValue> {
        match self {
            WriterVersionArgs::Parquet1_0 => Some(PossibleValue::new("1.0")),
            WriterVersionArgs::Parquet2_0 => Some(PossibleValue::new("2.0")),
        }
    }
}

impl From<WriterVersionArgs> for WriterVersion {
    fn from(value: WriterVersionArgs) -> Self {
        match value {
            WriterVersionArgs::Parquet1_0 => Self::PARQUET_1_0,
            WriterVersionArgs::Parquet2_0 => Self::PARQUET_2_0,
        }
    }
}

#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum BloomFilterPositionArgs {
    /// Write Bloom Filters of each row group right after the row group
    AfterRowGroup,

    /// Write Bloom Filters at the end of the file
    End,
}

impl From<BloomFilterPositionArgs> for BloomFilterPosition {
    fn from(value: BloomFilterPositionArgs) -> Self {
        match value {
            BloomFilterPositionArgs::AfterRowGroup => Self::AfterRowGroup,
            BloomFilterPositionArgs::End => Self::End,
        }
    }
}

#[derive(Debug, Parser)]
#[clap(author, version, about("Read and write a parquet file with potentially different settings"), long_about = None)]
struct Args {
    /// Path to input parquet file.
    #[clap(short, long)]
    input: String,

    /// Path to output parquet file.
    #[clap(short, long)]
    output: String,

    /// Compression used for all columns.
    #[clap(long, value_enum)]
    compression: Option<CompressionArgs>,

    /// Encoding used for all columns, if dictionary encoding is not enabled.
    #[clap(long, value_enum)]
    encoding: Option<EncodingArgs>,

    /// Sets flag to enable/disable dictionary encoding for all columns.
    #[clap(long)]
    dictionary_enabled: Option<bool>,

    /// Sets best effort maximum dictionary page size, in bytes.
    #[clap(long)]
    dictionary_page_size_limit: Option<usize>,

    /// Sets maximum number of rows in a row group.
    #[clap(long)]
    max_row_group_size: Option<usize>,

    /// Sets best effort maximum number of rows in a data page.
    #[clap(long)]
    data_page_row_count_limit: Option<usize>,

    /// Sets best effort maximum size of a data page in bytes.
    #[clap(long)]
    data_page_size_limit: Option<usize>,

    /// Sets max statistics size for all columns.
    ///
    /// Applicable only if statistics are enabled.
    #[clap(long)]
    max_statistics_size: Option<usize>,

    /// Sets whether a bloom filter is written for all columns.
    #[clap(long)]
    bloom_filter_enabled: Option<bool>,

    /// Sets bloom filter false positive probability (fpp) for all columns.
    #[clap(long)]
    bloom_filter_fpp: Option<f64>,

    /// Sets number of distinct values (ndv) for bloom filter for all columns.
    #[clap(long)]
    bloom_filter_ndv: Option<u64>,

    /// Sets the position of the bloom filter in the output file.
    #[clap(long)]
    bloom_filter_position: Option<BloomFilterPositionArgs>,

    /// Sets the level of statistics (none, chunk, or page) computed for all columns.
    #[clap(long)]
    statistics_enabled: Option<EnabledStatisticsArgs>,

    /// Sets writer version.
    #[clap(long)]
    writer_version: Option<WriterVersionArgs>,

    /// Sets whether to coerce Arrow types to match the Parquet specification.
    #[clap(long)]
    coerce_types: Option<bool>,
}

fn main() {
    let args = Args::parse();

    // read the key-value metadata of the input file so it can be copied to the output
    let parquet_reader =
        SerializedFileReader::new(File::open(&args.input).expect("Unable to open input file"))
            .expect("Failed to create reader");
    let kv_md = parquet_reader
        .metadata()
        .file_metadata()
        .key_value_metadata()
        .cloned();

    // create the Arrow record batch reader that will stream the actual data
    let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(
        File::open(args.input).expect("Unable to open input file"),
    )
    .expect("parquet open")
    .build()
    .expect("parquet open");

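    // build writer properties: start from the defaults plus the input's key-value
    // metadata and override only the settings that were provided on the command line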
    let mut writer_properties_builder = WriterProperties::builder().set_key_value_metadata(kv_md);

    // setup compression
    if let Some(value) = args.compression {
        writer_properties_builder = writer_properties_builder.set_compression(value.into());
    }

    // setup encoding
    if let Some(value) = args.encoding {
        writer_properties_builder = writer_properties_builder.set_encoding(value.into());
    }
    if let Some(value) = args.dictionary_enabled {
        writer_properties_builder = writer_properties_builder.set_dictionary_enabled(value);
    }
    if let Some(value) = args.dictionary_page_size_limit {
        writer_properties_builder = writer_properties_builder.set_dictionary_page_size_limit(value);
    }

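    // setup row group, data page, and statistics size limits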
    if let Some(value) = args.max_row_group_size {
        writer_properties_builder = writer_properties_builder.set_max_row_group_size(value);
    }
    if let Some(value) = args.data_page_row_count_limit {
        writer_properties_builder = writer_properties_builder.set_data_page_row_count_limit(value);
    }
    if let Some(value) = args.data_page_size_limit {
        writer_properties_builder = writer_properties_builder.set_data_page_size_limit(value);
    }
    #[allow(deprecated)]
    if let Some(value) = args.max_statistics_size {
        writer_properties_builder = writer_properties_builder.set_max_statistics_size(value);
    }
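    // setup bloom filters; fpp, ndv, and position are only applied when the filter is enabled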
    if let Some(value) = args.bloom_filter_enabled {
        writer_properties_builder = writer_properties_builder.set_bloom_filter_enabled(value);

        if value {
            if let Some(value) = args.bloom_filter_fpp {
                writer_properties_builder = writer_properties_builder.set_bloom_filter_fpp(value);
            }
            if let Some(value) = args.bloom_filter_ndv {
                writer_properties_builder = writer_properties_builder.set_bloom_filter_ndv(value);
            }
            if let Some(value) = args.bloom_filter_position {
                writer_properties_builder =
                    writer_properties_builder.set_bloom_filter_position(value.into());
            }
        }
    }
    if let Some(value) = args.statistics_enabled {
        writer_properties_builder = writer_properties_builder.set_statistics_enabled(value.into());
    }
    if let Some(value) = args.writer_version {
        writer_properties_builder = writer_properties_builder.set_writer_version(value.into());
    }
    if let Some(value) = args.coerce_types {
        writer_properties_builder = writer_properties_builder.set_coerce_types(value);
    }
    let writer_properties = writer_properties_builder.build();
    let mut parquet_writer = ArrowWriter::try_new(
        File::create(&args.output).expect("Unable to open output file"),
        parquet_reader.schema(),
        Some(writer_properties),
    )
    .expect("create arrow writer");

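    // copy all record batches from the reader to the writer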
    for maybe_batch in parquet_reader {
        let batch = maybe_batch.expect("reading batch");
        parquet_writer.write(&batch).expect("writing data");
    }

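    // closing the writer flushes any buffered data and writes the parquet footer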
    parquet_writer.close().expect("finalizing file");
}