//! Command line tool that reads an existing Parquet file and rewrites it to a
//! new location, optionally overriding writer settings such as compression,
//! encoding, row group size, bloom filters, and statistics.

use std::fs::File;

use arrow_array::RecordBatchReader;
use clap::{builder::PossibleValue, Parser, ValueEnum};
use parquet::{
    arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter},
    basic::{Compression, Encoding},
    file::{
        properties::{BloomFilterPosition, EnabledStatistics, WriterProperties, WriterVersion},
        reader::FileReader,
        serialized_reader::SerializedFileReader,
    },
};

/// Compression codecs supported by the Parquet writer.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum CompressionArgs {
    None,
    Snappy,
    Gzip,
    Lzo,
    Brotli,
    Lz4,
    Zstd,
    Lz4Raw,
}

impl From<CompressionArgs> for Compression {
    fn from(value: CompressionArgs) -> Self {
        match value {
            CompressionArgs::None => Self::UNCOMPRESSED,
            CompressionArgs::Snappy => Self::SNAPPY,
            CompressionArgs::Gzip => Self::GZIP(Default::default()),
            CompressionArgs::Lzo => Self::LZO,
            CompressionArgs::Brotli => Self::BROTLI(Default::default()),
            CompressionArgs::Lz4 => Self::LZ4,
            CompressionArgs::Zstd => Self::ZSTD(Default::default()),
            CompressionArgs::Lz4Raw => Self::LZ4_RAW,
        }
    }
}

/// Encodings supported by the Parquet writer.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum EncodingArgs {
    Plain,
    PlainDictionary,
    Rle,
    BitPacked,
    DeltaBinaryPacked,
    DeltaLengthByteArray,
    DeltaByteArray,
    RleDictionary,
    ByteStreamSplit,
}

#[allow(deprecated)]
impl From<EncodingArgs> for Encoding {
    fn from(value: EncodingArgs) -> Self {
        match value {
            EncodingArgs::Plain => Self::PLAIN,
            EncodingArgs::PlainDictionary => Self::PLAIN_DICTIONARY,
            EncodingArgs::Rle => Self::RLE,
            EncodingArgs::BitPacked => Self::BIT_PACKED,
            EncodingArgs::DeltaBinaryPacked => Self::DELTA_BINARY_PACKED,
            EncodingArgs::DeltaLengthByteArray => Self::DELTA_LENGTH_BYTE_ARRAY,
            EncodingArgs::DeltaByteArray => Self::DELTA_BYTE_ARRAY,
            EncodingArgs::RleDictionary => Self::RLE_DICTIONARY,
            EncodingArgs::ByteStreamSplit => Self::BYTE_STREAM_SPLIT,
        }
    }
}

/// Level of statistics written for each column.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum EnabledStatisticsArgs {
    None,
    Chunk,
    Page,
}

impl From<EnabledStatisticsArgs> for EnabledStatistics {
    fn from(value: EnabledStatisticsArgs) -> Self {
        match value {
            EnabledStatisticsArgs::None => Self::None,
            EnabledStatisticsArgs::Chunk => Self::Chunk,
            EnabledStatisticsArgs::Page => Self::Page,
        }
    }
}

/// Parquet format version used by the writer.
#[derive(Clone, Copy, Debug)]
enum WriterVersionArgs {
    Parquet1_0,
    Parquet2_0,
}

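// `ValueEnum` is implemented by hand so that the command line accepts the
// literal strings "1.0" and "2.0", which cannot be derived from the variant
// identifiers.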
impl ValueEnum for WriterVersionArgs {
    fn value_variants<'a>() -> &'a [Self] {
        &[Self::Parquet1_0, Self::Parquet2_0]
    }

    fn to_possible_value(&self) -> Option<PossibleValue> {
        match self {
            WriterVersionArgs::Parquet1_0 => Some(PossibleValue::new("1.0")),
            WriterVersionArgs::Parquet2_0 => Some(PossibleValue::new("2.0")),
        }
    }
}

impl From<WriterVersionArgs> for WriterVersion {
    fn from(value: WriterVersionArgs) -> Self {
        match value {
            WriterVersionArgs::Parquet1_0 => Self::PARQUET_1_0,
            WriterVersionArgs::Parquet2_0 => Self::PARQUET_2_0,
        }
    }
}

/// Where in the file bloom filters are written.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum BloomFilterPositionArgs {
    AfterRowGroup,
    End,
}

impl From<BloomFilterPositionArgs> for BloomFilterPosition {
    fn from(value: BloomFilterPositionArgs) -> Self {
        match value {
            BloomFilterPositionArgs::AfterRowGroup => Self::AfterRowGroup,
            BloomFilterPositionArgs::End => Self::End,
        }
    }
}

#[derive(Debug, Parser)]
#[clap(author, version, about("Read and write parquet file with potentially different settings"), long_about = None)]
struct Args {
    /// Path to the input parquet file.
    #[clap(short, long)]
    input: String,

    /// Path to the output parquet file.
    #[clap(short, long)]
    output: String,

    /// Compression used for all columns.
    #[clap(long, value_enum)]
    compression: Option<CompressionArgs>,

    /// Encoding used for all columns.
    #[clap(long, value_enum)]
    encoding: Option<EncodingArgs>,

    /// Sets whether dictionary encoding is enabled for all columns.
    #[clap(long)]
    dictionary_enabled: Option<bool>,

    /// Sets best effort maximum dictionary page size, in bytes.
    #[clap(long)]
    dictionary_page_size_limit: Option<usize>,

    /// Sets maximum number of rows in a row group.
    #[clap(long)]
    max_row_group_size: Option<usize>,

    /// Sets best effort maximum number of rows in a data page.
    #[clap(long)]
    data_page_row_count_limit: Option<usize>,

    /// Sets best effort maximum size of a data page, in bytes.
    #[clap(long)]
    data_page_size_limit: Option<usize>,

    /// Sets the max statistics size for any column (deprecated).
    #[clap(long)]
    max_statistics_size: Option<usize>,

    /// Sets whether bloom filters are written for all columns.
    #[clap(long)]
    bloom_filter_enabled: Option<bool>,

    /// Sets the bloom filter false positive probability.
    #[clap(long)]
    bloom_filter_fpp: Option<f64>,

    /// Sets the assumed number of distinct values for bloom filters.
    #[clap(long)]
    bloom_filter_ndv: Option<u64>,

    /// Sets where in the file bloom filters are written.
    #[clap(long)]
    bloom_filter_position: Option<BloomFilterPositionArgs>,

    /// Sets the level of statistics written for all columns.
    #[clap(long)]
    statistics_enabled: Option<EnabledStatisticsArgs>,

    /// Sets the Parquet format version used when writing.
    #[clap(long)]
    writer_version: Option<WriterVersionArgs>,

    /// Sets whether to coerce Arrow types to match the Parquet specification.
    #[clap(long)]
    coerce_types: Option<bool>,
}

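// Illustrative invocation (assuming the binary is named `parquet-rewrite`; clap
// derives the flag and value names from the definitions above):
//
//   parquet-rewrite --input in.parquet --output out.parquet \
//       --compression zstd --writer-version 2.0 --max-row-group-size 100000
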
fn main() {
    let args = Args::parse();

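    // Read the key/value metadata from the input so it can be carried over to the output.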
    let parquet_reader =
        SerializedFileReader::new(File::open(&args.input).expect("Unable to open input file"))
            .expect("Failed to create reader");
    let kv_md = parquet_reader
        .metadata()
        .file_metadata()
        .key_value_metadata()
        .cloned();

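    // Re-open the input as an Arrow record batch reader; this drives the data copy.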
    let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(
        File::open(args.input).expect("Unable to open input file"),
    )
    .expect("parquet open")
    .build()
    .expect("parquet open");

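    // Start from default writer properties, preserve the original key/value metadata,
    // and apply any overrides supplied on the command line.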
    let mut writer_properties_builder = WriterProperties::builder().set_key_value_metadata(kv_md);
    if let Some(value) = args.compression {
        writer_properties_builder = writer_properties_builder.set_compression(value.into());
    }

    if let Some(value) = args.encoding {
        writer_properties_builder = writer_properties_builder.set_encoding(value.into());
    }
    if let Some(value) = args.dictionary_enabled {
        writer_properties_builder = writer_properties_builder.set_dictionary_enabled(value);
    }
    if let Some(value) = args.dictionary_page_size_limit {
        writer_properties_builder = writer_properties_builder.set_dictionary_page_size_limit(value);
    }

    if let Some(value) = args.max_row_group_size {
        writer_properties_builder = writer_properties_builder.set_max_row_group_size(value);
    }
    if let Some(value) = args.data_page_row_count_limit {
        writer_properties_builder = writer_properties_builder.set_data_page_row_count_limit(value);
    }
    if let Some(value) = args.data_page_size_limit {
        writer_properties_builder = writer_properties_builder.set_data_page_size_limit(value);
    }
    #[allow(deprecated)]
    if let Some(value) = args.max_statistics_size {
        writer_properties_builder = writer_properties_builder.set_max_statistics_size(value);
    }
    if let Some(value) = args.bloom_filter_enabled {
        writer_properties_builder = writer_properties_builder.set_bloom_filter_enabled(value);

        if value {
            if let Some(value) = args.bloom_filter_fpp {
                writer_properties_builder = writer_properties_builder.set_bloom_filter_fpp(value);
            }
            if let Some(value) = args.bloom_filter_ndv {
                writer_properties_builder = writer_properties_builder.set_bloom_filter_ndv(value);
            }
            if let Some(value) = args.bloom_filter_position {
                writer_properties_builder =
                    writer_properties_builder.set_bloom_filter_position(value.into());
            }
        }
    }
    if let Some(value) = args.statistics_enabled {
        writer_properties_builder = writer_properties_builder.set_statistics_enabled(value.into());
    }
    if let Some(value) = args.writer_version {
        writer_properties_builder = writer_properties_builder.set_writer_version(value.into());
    }
    if let Some(value) = args.coerce_types {
        writer_properties_builder = writer_properties_builder.set_coerce_types(value);
    }
    let writer_properties = writer_properties_builder.build();
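    // Create an Arrow writer for the output file using the assembled properties.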
    let mut parquet_writer = ArrowWriter::try_new(
        File::create(&args.output).expect("Unable to open output file"),
        parquet_reader.schema(),
        Some(writer_properties),
    )
    .expect("create arrow writer");

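    // Stream every record batch from the input reader to the output writer.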
    for maybe_batch in parquet_reader {
        let batch = maybe_batch.expect("reading batch");
        parquet_writer.write(&batch).expect("writing data");
    }

    parquet_writer.close().expect("finalizing file");
}