
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Binary file to rewrite parquet files.
//!
//! # Install
//!
//! `parquet-rewrite` can be installed using `cargo`:
//! ```
//! cargo install parquet --features=cli
//! ```
//! After this `parquet-rewrite` should be available:
//! ```
//! parquet-rewrite -i XYZ.parquet -o XYZ2.parquet
//! ```
//!
//! The binary can also be built from the source code and run as follows:
//! ```
//! cargo run --features=cli --bin parquet-rewrite -- -i XYZ.parquet -o XYZ2.parquet
//! ```

use std::fs::File;

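// `RecordBatchReader` is imported for its `schema()` method, used when
// constructing the writer in `main` below.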
use arrow_array::RecordBatchReader;
use clap::{builder::PossibleValue, Parser, ValueEnum};
use parquet::{
    arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter},
    basic::Compression,
    file::{
        properties::{EnabledStatistics, WriterProperties, WriterVersion},
        reader::FileReader,
        serialized_reader::SerializedFileReader,
    },
};

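/// Compression codecs selectable on the command line, mapped to
/// [`Compression`] via the `From` impl below.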
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum CompressionArgs {
    /// No compression.
    None,

    /// Snappy
    Snappy,

    /// GZip
    Gzip,

    /// LZO
    Lzo,

    /// Brotli
    Brotli,

    /// LZ4
    Lz4,

    /// Zstd
    Zstd,

    /// LZ4 Raw
    Lz4Raw,
}

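// Codecs with tunable levels (gzip, brotli, zstd) are written at their
// default compression levels; the CLI does not expose level selection.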
impl From<CompressionArgs> for Compression {
    fn from(value: CompressionArgs) -> Self {
        match value {
            CompressionArgs::None => Self::UNCOMPRESSED,
            CompressionArgs::Snappy => Self::SNAPPY,
            CompressionArgs::Gzip => Self::GZIP(Default::default()),
            CompressionArgs::Lzo => Self::LZO,
            CompressionArgs::Brotli => Self::BROTLI(Default::default()),
            CompressionArgs::Lz4 => Self::LZ4,
            CompressionArgs::Zstd => Self::ZSTD(Default::default()),
            CompressionArgs::Lz4Raw => Self::LZ4_RAW,
        }
    }
}

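/// Statistics granularity selectable on the command line, mirroring
/// [`EnabledStatistics`].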
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum EnabledStatisticsArgs {
    /// Compute no statistics
    None,

    /// Compute chunk-level statistics but not page-level
    Chunk,

    /// Compute page-level and chunk-level statistics
    Page,
}

impl From<EnabledStatisticsArgs> for EnabledStatistics {
    fn from(value: EnabledStatisticsArgs) -> Self {
        match value {
            EnabledStatisticsArgs::None => Self::None,
            EnabledStatisticsArgs::Chunk => Self::Chunk,
            EnabledStatisticsArgs::Page => Self::Page,
        }
    }
}

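// `ValueEnum` is implemented by hand (rather than derived) so that the CLI
// accepts the literal strings "1.0" and "2.0" instead of the variant names.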
#[derive(Clone, Copy, Debug)]
enum WriterVersionArgs {
    Parquet1_0,
    Parquet2_0,
}

impl ValueEnum for WriterVersionArgs {
    fn value_variants<'a>() -> &'a [Self] {
        &[Self::Parquet1_0, Self::Parquet2_0]
    }

    fn to_possible_value(&self) -> Option<PossibleValue> {
        match self {
            WriterVersionArgs::Parquet1_0 => Some(PossibleValue::new("1.0")),
            WriterVersionArgs::Parquet2_0 => Some(PossibleValue::new("2.0")),
        }
    }
}

impl From<WriterVersionArgs> for WriterVersion {
    fn from(value: WriterVersionArgs) -> Self {
        match value {
            WriterVersionArgs::Parquet1_0 => Self::PARQUET_1_0,
            WriterVersionArgs::Parquet2_0 => Self::PARQUET_2_0,
        }
    }
}

#[derive(Debug, Parser)]
#[clap(author, version, about("Read and write a Parquet file with potentially different settings"), long_about = None)]
struct Args {
    /// Path to input parquet file.
    #[clap(short, long)]
    input: String,

    /// Path to output parquet file.
    #[clap(short, long)]
    output: String,

    /// Compression used.
    #[clap(long, value_enum)]
    compression: Option<CompressionArgs>,

    /// Sets maximum number of rows in a row group.
    #[clap(long)]
    max_row_group_size: Option<usize>,

    /// Sets best effort maximum number of rows in a data page.
    #[clap(long)]
    data_page_row_count_limit: Option<usize>,

    /// Sets best effort maximum size of a data page in bytes.
    #[clap(long)]
    data_page_size_limit: Option<usize>,

    /// Sets max statistics size for any column.
    ///
    /// Applicable only if statistics are enabled.
    #[clap(long)]
    max_statistics_size: Option<usize>,

    /// Sets best effort maximum dictionary page size, in bytes.
    #[clap(long)]
    dictionary_page_size_limit: Option<usize>,

    /// Sets whether bloom filters are enabled for any column.
    #[clap(long)]
    bloom_filter_enabled: Option<bool>,

    /// Sets bloom filter false positive probability (fpp) for any column.
    #[clap(long)]
    bloom_filter_fpp: Option<f64>,

    /// Sets number of distinct values (ndv) for bloom filter for any column.
    #[clap(long)]
    bloom_filter_ndv: Option<u64>,

    /// Sets flag to enable/disable dictionary encoding for any column.
    #[clap(long)]
    dictionary_enabled: Option<bool>,

    /// Sets flag to enable/disable statistics for any column.
    #[clap(long)]
    statistics_enabled: Option<EnabledStatisticsArgs>,

    /// Sets writer version.
    #[clap(long)]
    writer_version: Option<WriterVersionArgs>,

    /// Sets whether to coerce Arrow types to match the Parquet specification.
    #[clap(long)]
    coerce_types: Option<bool>,
}

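// Round-trip: read the input file's key-value metadata and record batches,
// then write them back out with the requested writer properties.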
fn main() {
    let args = Args::parse();

    // read the existing key-value metadata via the low-level reader so it
    // can be carried over to the output file
    let parquet_reader =
        SerializedFileReader::new(File::open(&args.input).expect("Unable to open input file"))
            .expect("Failed to create reader");
    let kv_md = parquet_reader
        .metadata()
        .file_metadata()
        .key_value_metadata()
        .cloned();

    // create the Arrow reader that will stream the actual data
    let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(
        File::open(args.input).expect("Unable to open input file"),
    )
    .expect("parquet open")
    .build()
    .expect("parquet reader build");

    let mut writer_properties_builder = WriterProperties::builder().set_key_value_metadata(kv_md);
    if let Some(value) = args.compression {
        writer_properties_builder = writer_properties_builder.set_compression(value.into());
    }
    if let Some(value) = args.max_row_group_size {
        writer_properties_builder = writer_properties_builder.set_max_row_group_size(value);
    }
    if let Some(value) = args.data_page_row_count_limit {
        writer_properties_builder = writer_properties_builder.set_data_page_row_count_limit(value);
    }
    if let Some(value) = args.data_page_size_limit {
        writer_properties_builder = writer_properties_builder.set_data_page_size_limit(value);
    }
    if let Some(value) = args.dictionary_page_size_limit {
        writer_properties_builder = writer_properties_builder.set_dictionary_page_size_limit(value);
    }
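    // `set_max_statistics_size` is deprecated upstream but still honored, so
    // the lint is silenced for this one setting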
    #[allow(deprecated)]
    if let Some(value) = args.max_statistics_size {
        writer_properties_builder = writer_properties_builder.set_max_statistics_size(value);
    }
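    // fpp and ndv only take effect when bloom filters are enabled, so they
    // are applied inside the enabling branch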
    if let Some(value) = args.bloom_filter_enabled {
        writer_properties_builder = writer_properties_builder.set_bloom_filter_enabled(value);

        if value {
            if let Some(value) = args.bloom_filter_fpp {
                writer_properties_builder = writer_properties_builder.set_bloom_filter_fpp(value);
            }
            if let Some(value) = args.bloom_filter_ndv {
                writer_properties_builder = writer_properties_builder.set_bloom_filter_ndv(value);
            }
        }
    }
    if let Some(value) = args.dictionary_enabled {
        writer_properties_builder = writer_properties_builder.set_dictionary_enabled(value);
    }
    if let Some(value) = args.statistics_enabled {
        writer_properties_builder = writer_properties_builder.set_statistics_enabled(value.into());
    }
    if let Some(value) = args.writer_version {
        writer_properties_builder = writer_properties_builder.set_writer_version(value.into());
    }
    if let Some(value) = args.coerce_types {
        writer_properties_builder = writer_properties_builder.set_coerce_types(value);
    }
    let writer_properties = writer_properties_builder.build();
    let mut parquet_writer = ArrowWriter::try_new(
        File::create(&args.output).expect("Unable to create output file"),
        parquet_reader.schema(),
        Some(writer_properties),
    )
    .expect("create arrow writer");

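    // stream every record batch from the reader straight into the writer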
    for maybe_batch in parquet_reader {
        let batch = maybe_batch.expect("reading batch");
        parquet_writer.write(&batch).expect("writing data");
    }

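    // closing the writer flushes buffered rows and writes the Parquet footer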
    parquet_writer.close().expect("finalizing file");
}