
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Binary file to rewrite parquet files.
//!
//! # Install
//!
//! `parquet-rewrite` can be installed using `cargo`:
//! ```
//! cargo install parquet --features=cli
//! ```
//! After this `parquet-rewrite` should be available:
//! ```
//! parquet-rewrite -i XYZ.parquet -o XYZ2.parquet
//! ```
//!
//! The binary can also be built from the source code and run as follows:
//! ```
//! cargo run --features=cli --bin parquet-rewrite -- -i XYZ.parquet -o XYZ2.parquet
//! ```
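//!
//! Additional command line flags (defined by the `Args` struct below) control how the
//! output is written; for example, recompressing all columns with zstd might look like:
//! ```
//! parquet-rewrite -i XYZ.parquet -o XYZ2.parquet --compression zstd --compression-level 6
//! ```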

use std::fs::File;

use arrow_array::RecordBatchReader;
use clap::{Parser, ValueEnum, builder::PossibleValue};
use parquet::{
    arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
    basic::{BrotliLevel, Compression, Encoding, GzipLevel, ZstdLevel},
    file::{
        properties::{BloomFilterPosition, EnabledStatistics, WriterProperties, WriterVersion},
        reader::FileReader,
        serialized_reader::SerializedFileReader,
    },
};

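/// Compression codec to apply to all columns of the output file.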
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum CompressionArgs {
    /// No compression.
    None,

    /// Snappy
    Snappy,

    /// GZip
    Gzip,

    /// LZO
    Lzo,

    /// Brotli
    Brotli,

    /// LZ4
    Lz4,

    /// Zstd
    Zstd,

    /// LZ4 Raw
    Lz4Raw,
}

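/// Converts the CLI codec choice and optional level into a parquet [`Compression`] value.
///
/// Panics if the level is not valid for the chosen codec; codecs without a configurable
/// level ignore `level`.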
fn compression_from_args(codec: CompressionArgs, level: Option<u32>) -> Compression {
    match codec {
        CompressionArgs::None => Compression::UNCOMPRESSED,
        CompressionArgs::Snappy => Compression::SNAPPY,
        CompressionArgs::Gzip => match level {
            Some(lvl) => {
                Compression::GZIP(GzipLevel::try_new(lvl).expect("invalid gzip compression level"))
            }
            None => Compression::GZIP(Default::default()),
        },
        CompressionArgs::Lzo => Compression::LZO,
        CompressionArgs::Brotli => match level {
            Some(lvl) => Compression::BROTLI(
                BrotliLevel::try_new(lvl).expect("invalid brotli compression level"),
            ),
            None => Compression::BROTLI(Default::default()),
        },
        CompressionArgs::Lz4 => Compression::LZ4,
        CompressionArgs::Zstd => match level {
            Some(lvl) => Compression::ZSTD(
                ZstdLevel::try_new(lvl as i32).expect("invalid zstd compression level"),
            ),
            None => Compression::ZSTD(Default::default()),
        },
        CompressionArgs::Lz4Raw => Compression::LZ4_RAW,
    }
}

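/// Page encoding to use for all columns of the output file.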
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum EncodingArgs {
    /// Default byte encoding.
    Plain,

    /// **Deprecated** dictionary encoding.
    PlainDictionary,

    /// Group packed run length encoding.
    Rle,

    /// **Deprecated** Bit-packed encoding.
    BitPacked,

    /// Delta encoding for integers, either INT32 or INT64.
    DeltaBinaryPacked,

    /// Encoding for byte arrays to separate the length values and the data.
    DeltaLengthByteArray,

    /// Incremental encoding for byte arrays.
    DeltaByteArray,

    /// Dictionary encoding.
    RleDictionary,

    /// Encoding for fixed-width data.
    ByteStreamSplit,
}

#[allow(deprecated)]
impl From<EncodingArgs> for Encoding {
    fn from(value: EncodingArgs) -> Self {
        match value {
            EncodingArgs::Plain => Self::PLAIN,
            EncodingArgs::PlainDictionary => Self::PLAIN_DICTIONARY,
            EncodingArgs::Rle => Self::RLE,
            EncodingArgs::BitPacked => Self::BIT_PACKED,
            EncodingArgs::DeltaBinaryPacked => Self::DELTA_BINARY_PACKED,
            EncodingArgs::DeltaLengthByteArray => Self::DELTA_LENGTH_BYTE_ARRAY,
            EncodingArgs::DeltaByteArray => Self::DELTA_BYTE_ARRAY,
            EncodingArgs::RleDictionary => Self::RLE_DICTIONARY,
            EncodingArgs::ByteStreamSplit => Self::BYTE_STREAM_SPLIT,
        }
    }
}

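/// Level of statistics to write for all columns of the output file.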
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum EnabledStatisticsArgs {
    /// Compute no statistics
    None,

    /// Compute chunk-level statistics but not page-level
    Chunk,

    /// Compute page-level and chunk-level statistics
    Page,
}

impl From<EnabledStatisticsArgs> for EnabledStatistics {
    fn from(value: EnabledStatisticsArgs) -> Self {
        match value {
            EnabledStatisticsArgs::None => Self::None,
            EnabledStatisticsArgs::Chunk => Self::Chunk,
            EnabledStatisticsArgs::Page => Self::Page,
        }
    }
}

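/// Parquet writer version, selected on the command line as `1.0` or `2.0`.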
#[derive(Clone, Copy, Debug)]
enum WriterVersionArgs {
    Parquet1_0,
    Parquet2_0,
}

impl ValueEnum for WriterVersionArgs {
    fn value_variants<'a>() -> &'a [Self] {
        &[Self::Parquet1_0, Self::Parquet2_0]
    }

    fn to_possible_value(&self) -> Option<PossibleValue> {
        match self {
            WriterVersionArgs::Parquet1_0 => Some(PossibleValue::new("1.0")),
            WriterVersionArgs::Parquet2_0 => Some(PossibleValue::new("2.0")),
        }
    }
}

impl From<WriterVersionArgs> for WriterVersion {
    fn from(value: WriterVersionArgs) -> Self {
        match value {
            WriterVersionArgs::Parquet1_0 => Self::PARQUET_1_0,
            WriterVersionArgs::Parquet2_0 => Self::PARQUET_2_0,
        }
    }
}

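/// Where bloom filters are written in the output file.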
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum BloomFilterPositionArgs {
    /// Write Bloom Filters of each row group right after the row group
    AfterRowGroup,

    /// Write Bloom Filters at the end of the file
    End,
}

impl From<BloomFilterPositionArgs> for BloomFilterPosition {
    fn from(value: BloomFilterPositionArgs) -> Self {
        match value {
            BloomFilterPositionArgs::AfterRowGroup => Self::AfterRowGroup,
            BloomFilterPositionArgs::End => Self::End,
        }
    }
}

#[derive(Debug, Parser)]
#[clap(author, version, about("Read and write a parquet file with potentially different settings"), long_about = None)]
struct Args {
    /// Path to input parquet file.
    #[clap(short, long)]
    input: String,

    /// Path to output parquet file.
    #[clap(short, long)]
    output: String,

    /// Compression used for all columns.
    #[clap(long, value_enum)]
    compression: Option<CompressionArgs>,

    /// Compression level for gzip/brotli/zstd.
    #[clap(long)]
    compression_level: Option<u32>,

    /// Encoding used for all columns, if dictionary is not enabled.
    #[clap(long, value_enum)]
    encoding: Option<EncodingArgs>,

    /// Sets flag to enable/disable dictionary encoding for all columns.
    #[clap(long)]
    dictionary_enabled: Option<bool>,

    /// Sets best effort maximum dictionary page size, in bytes.
    #[clap(long)]
    dictionary_page_size_limit: Option<usize>,

    /// Sets maximum number of rows in a row group.
    #[clap(long)]
    max_row_group_size: Option<usize>,

    /// Sets best effort maximum number of rows in a data page.
    #[clap(long)]
    data_page_row_count_limit: Option<usize>,

    /// Sets best effort maximum size of a data page in bytes.
    #[clap(long)]
    data_page_size_limit: Option<usize>,

    /// Sets the max length of min/max statistics in row group and data page
    /// header statistics for all columns.
    ///
    /// Applicable only if statistics are enabled.
    #[clap(long)]
    statistics_truncate_length: Option<usize>,

    /// Sets the max length of min/max statistics in the column index.
    ///
    /// Applicable only if statistics are enabled.
    #[clap(long)]
    column_index_truncate_length: Option<usize>,

    /// Write statistics to the data page headers?
    ///
    /// Setting this to true will also enable page-level statistics.
    #[clap(long)]
    write_page_header_statistics: Option<bool>,

    /// Sets whether bloom filter is enabled for all columns.
    #[clap(long)]
    bloom_filter_enabled: Option<bool>,

    /// Sets bloom filter false positive probability (fpp) for all columns.
    #[clap(long)]
    bloom_filter_fpp: Option<f64>,

    /// Sets number of distinct values (ndv) for bloom filter for all columns.
    #[clap(long)]
    bloom_filter_ndv: Option<u64>,

    /// Sets the position of bloom filter
    #[clap(long)]
    bloom_filter_position: Option<BloomFilterPositionArgs>,

    /// Sets flag to enable/disable statistics for all columns.
    #[clap(long)]
    statistics_enabled: Option<EnabledStatisticsArgs>,

    /// Sets writer version.
    #[clap(long)]
    writer_version: Option<WriterVersionArgs>,

    /// Sets write batch size.
    #[clap(long)]
    write_batch_size: Option<usize>,

    /// Sets whether to coerce Arrow types to match Parquet specification
    #[clap(long)]
    coerce_types: Option<bool>,
}

fn main() {
    let args = Args::parse();

    // read key-value metadata
    let parquet_reader =
        SerializedFileReader::new(File::open(&args.input).expect("Unable to open input file"))
            .expect("Failed to create reader");
    let kv_md = parquet_reader
        .metadata()
        .file_metadata()
        .key_value_metadata()
        .cloned();

    // create actual parquet reader
    let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(
        File::open(args.input).expect("Unable to open input file"),
    )
    .expect("parquet open")
    .build()
    .expect("parquet open");

    let mut writer_properties_builder = WriterProperties::builder().set_key_value_metadata(kv_md);

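    // setup compression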
    if let Some(value) = args.compression {
        let compression = compression_from_args(value, args.compression_level);
        writer_properties_builder = writer_properties_builder.set_compression(compression);
    }

    // setup encoding
    if let Some(value) = args.encoding {
        writer_properties_builder = writer_properties_builder.set_encoding(value.into());
    }
    if let Some(value) = args.dictionary_enabled {
        writer_properties_builder = writer_properties_builder.set_dictionary_enabled(value);
    }
    if let Some(value) = args.dictionary_page_size_limit {
        writer_properties_builder = writer_properties_builder.set_dictionary_page_size_limit(value);
    }

    if let Some(value) = args.max_row_group_size {
        writer_properties_builder = writer_properties_builder.set_max_row_group_size(value);
    }
    if let Some(value) = args.data_page_row_count_limit {
        writer_properties_builder = writer_properties_builder.set_data_page_row_count_limit(value);
    }
    if let Some(value) = args.data_page_size_limit {
        writer_properties_builder = writer_properties_builder.set_data_page_size_limit(value);
    }
    if let Some(value) = args.statistics_truncate_length {
        writer_properties_builder =
            writer_properties_builder.set_statistics_truncate_length(Some(value));
    }
    if let Some(value) = args.column_index_truncate_length {
        writer_properties_builder =
            writer_properties_builder.set_column_index_truncate_length(Some(value));
    }
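    // setup bloom filters; fpp, ndv, and position only take effect when bloom filters are enabled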
    if let Some(value) = args.bloom_filter_enabled {
        writer_properties_builder = writer_properties_builder.set_bloom_filter_enabled(value);

        if value {
            if let Some(value) = args.bloom_filter_fpp {
                writer_properties_builder = writer_properties_builder.set_bloom_filter_fpp(value);
            }
            if let Some(value) = args.bloom_filter_ndv {
                writer_properties_builder = writer_properties_builder.set_bloom_filter_ndv(value);
            }
            if let Some(value) = args.bloom_filter_position {
                writer_properties_builder =
                    writer_properties_builder.set_bloom_filter_position(value.into());
            }
        }
    }
    if let Some(value) = args.statistics_enabled {
        writer_properties_builder = writer_properties_builder.set_statistics_enabled(value.into());
    }
    // set this after statistics_enabled
    if let Some(value) = args.write_page_header_statistics {
        writer_properties_builder =
            writer_properties_builder.set_write_page_header_statistics(value);
        if value {
            writer_properties_builder =
                writer_properties_builder.set_statistics_enabled(EnabledStatistics::Page);
        }
    }
    if let Some(value) = args.writer_version {
        writer_properties_builder = writer_properties_builder.set_writer_version(value.into());
    }
    if let Some(value) = args.coerce_types {
        writer_properties_builder = writer_properties_builder.set_coerce_types(value);
    }
    if let Some(value) = args.write_batch_size {
        writer_properties_builder = writer_properties_builder.set_write_batch_size(value);
    }
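    // build the writer properties and create an Arrow writer for the output file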
    let writer_properties = writer_properties_builder.build();
    let mut parquet_writer = ArrowWriter::try_new(
        File::create(&args.output).expect("Unable to open output file"),
        parquet_reader.schema(),
        Some(writer_properties),
    )
    .expect("create arrow writer");

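    // copy all record batches from the input to the output, re-encoding them with the new properties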
    for maybe_batch in parquet_reader {
        let batch = maybe_batch.expect("reading batch");
        parquet_writer.write(&batch).expect("writing data");
    }

    parquet_writer.close().expect("finalizing file");
}