//! Binary that reads data from a Parquet file and writes it back out to another
//! Parquet file with potentially different settings.

use std::fs::File;

use arrow_array::RecordBatchReader;
use clap::{Parser, ValueEnum, builder::PossibleValue};
use parquet::{
    arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
    basic::{BrotliLevel, Compression, Encoding, GzipLevel, ZstdLevel},
    file::{
        properties::{BloomFilterPosition, EnabledStatistics, WriterProperties, WriterVersion},
        reader::FileReader,
        serialized_reader::SerializedFileReader,
    },
};

#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum CompressionArgs {
    /// No compression
    None,

    /// Snappy
    Snappy,

    /// GZip
    Gzip,

    /// LZO
    Lzo,

    /// Brotli
    Brotli,

    /// LZ4
    Lz4,

    /// Zstd
    Zstd,

    /// LZ4 Raw
    Lz4Raw,
}

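/// Maps the CLI codec choice and optional level onto a parquet `Compression`.
/// Codecs without a configurable level ignore `level`; an invalid level for
/// gzip, brotli, or zstd panics via `expect`.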
fn compression_from_args(codec: CompressionArgs, level: Option<u32>) -> Compression {
    match codec {
        CompressionArgs::None => Compression::UNCOMPRESSED,
        CompressionArgs::Snappy => Compression::SNAPPY,
        CompressionArgs::Gzip => match level {
            Some(lvl) => {
                Compression::GZIP(GzipLevel::try_new(lvl).expect("invalid gzip compression level"))
            }
            None => Compression::GZIP(Default::default()),
        },
        CompressionArgs::Lzo => Compression::LZO,
        CompressionArgs::Brotli => match level {
            Some(lvl) => Compression::BROTLI(
                BrotliLevel::try_new(lvl).expect("invalid brotli compression level"),
            ),
            None => Compression::BROTLI(Default::default()),
        },
        CompressionArgs::Lz4 => Compression::LZ4,
        CompressionArgs::Zstd => match level {
            Some(lvl) => Compression::ZSTD(
                ZstdLevel::try_new(lvl as i32).expect("invalid zstd compression level"),
            ),
            None => Compression::ZSTD(Default::default()),
        },
        CompressionArgs::Lz4Raw => Compression::LZ4_RAW,
    }
}

#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum EncodingArgs {
    /// Plain
    Plain,

    /// Plain dictionary
    PlainDictionary,

    /// RLE
    Rle,

    /// Bit packed
    BitPacked,

    /// Delta binary packed
    DeltaBinaryPacked,

    /// Delta length byte array
    DeltaLengthByteArray,

    /// Delta byte array
    DeltaByteArray,

    /// RLE dictionary
    RleDictionary,

    /// Byte stream split
    ByteStreamSplit,
}

// `Encoding` contains deprecated variants (e.g. BIT_PACKED), which this mapping
// still needs to name.
#[allow(deprecated)]
impl From<EncodingArgs> for Encoding {
    fn from(value: EncodingArgs) -> Self {
        match value {
            EncodingArgs::Plain => Self::PLAIN,
            EncodingArgs::PlainDictionary => Self::PLAIN_DICTIONARY,
            EncodingArgs::Rle => Self::RLE,
            EncodingArgs::BitPacked => Self::BIT_PACKED,
            EncodingArgs::DeltaBinaryPacked => Self::DELTA_BINARY_PACKED,
            EncodingArgs::DeltaLengthByteArray => Self::DELTA_LENGTH_BYTE_ARRAY,
            EncodingArgs::DeltaByteArray => Self::DELTA_BYTE_ARRAY,
            EncodingArgs::RleDictionary => Self::RLE_DICTIONARY,
            EncodingArgs::ByteStreamSplit => Self::BYTE_STREAM_SPLIT,
        }
    }
}

#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum EnabledStatisticsArgs {
    /// Compute no statistics
    None,

    /// Compute column chunk-level statistics
    Chunk,

    /// Compute page-level statistics
    Page,
}

impl From<EnabledStatisticsArgs> for EnabledStatistics {
    fn from(value: EnabledStatisticsArgs) -> Self {
        match value {
            EnabledStatisticsArgs::None => Self::None,
            EnabledStatisticsArgs::Chunk => Self::Chunk,
            EnabledStatisticsArgs::Page => Self::Page,
        }
    }
}

#[derive(Clone, Copy, Debug)]
enum WriterVersionArgs {
    Parquet1_0,
    Parquet2_0,
}

// `ValueEnum` is implemented by hand so that the flag accepts the literal
// values "1.0" and "2.0".
impl ValueEnum for WriterVersionArgs {
    fn value_variants<'a>() -> &'a [Self] {
        &[Self::Parquet1_0, Self::Parquet2_0]
    }

    fn to_possible_value(&self) -> Option<PossibleValue> {
        match self {
            WriterVersionArgs::Parquet1_0 => Some(PossibleValue::new("1.0")),
            WriterVersionArgs::Parquet2_0 => Some(PossibleValue::new("2.0")),
        }
    }
}

impl From<WriterVersionArgs> for WriterVersion {
    fn from(value: WriterVersionArgs) -> Self {
        match value {
            WriterVersionArgs::Parquet1_0 => Self::PARQUET_1_0,
            WriterVersionArgs::Parquet2_0 => Self::PARQUET_2_0,
        }
    }
}

#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum BloomFilterPositionArgs {
    /// Write bloom filters right after each row group
    AfterRowGroup,

    /// Write all bloom filters at the end of the file
    End,
}

impl From<BloomFilterPositionArgs> for BloomFilterPosition {
    fn from(value: BloomFilterPositionArgs) -> Self {
        match value {
            BloomFilterPositionArgs::AfterRowGroup => Self::AfterRowGroup,
            BloomFilterPositionArgs::End => Self::End,
        }
    }
}

#[derive(Debug, Parser)]
#[clap(author, version, about("Read and write a parquet file with potentially different settings"), long_about = None)]
struct Args {
    /// Path to the input parquet file
    #[clap(short, long)]
    input: String,

    /// Path to the output parquet file
    #[clap(short, long)]
    output: String,

    /// Compression codec used for all columns
    #[clap(long, value_enum)]
    compression: Option<CompressionArgs>,

    /// Compression level; only used by gzip, brotli, and zstd
    #[clap(long)]
    compression_level: Option<u32>,

    /// Encoding used for all columns, if any
    #[clap(long, value_enum)]
    encoding: Option<EncodingArgs>,

    /// Enable or disable dictionary encoding for all columns
    #[clap(long)]
    dictionary_enabled: Option<bool>,

    /// Best-effort maximum dictionary page size, in bytes
    #[clap(long)]
    dictionary_page_size_limit: Option<usize>,

    /// Maximum number of rows per row group
    #[clap(long)]
    max_row_group_size: Option<usize>,

    /// Best-effort maximum number of rows per data page
    #[clap(long)]
    data_page_row_count_limit: Option<usize>,

    /// Best-effort maximum data page size, in bytes
    #[clap(long)]
    data_page_size_limit: Option<usize>,

    /// Maximum length of truncated min/max values in statistics
    #[clap(long)]
    statistics_truncate_length: Option<usize>,

    /// Length to which min/max values in the column index are truncated
    #[clap(long)]
    column_index_truncate_length: Option<usize>,

    /// Write statistics into the data page headers (forces page-level statistics)
    #[clap(long)]
    write_page_header_statistics: Option<bool>,

    /// Enable or disable bloom filters for all columns
    #[clap(long)]
    bloom_filter_enabled: Option<bool>,

    /// Bloom filter false-positive probability
    #[clap(long)]
    bloom_filter_fpp: Option<f64>,

    /// Bloom filter number of distinct values
    #[clap(long)]
    bloom_filter_ndv: Option<u64>,

    /// Where in the file to write bloom filters
    #[clap(long)]
    bloom_filter_position: Option<BloomFilterPositionArgs>,

    /// Which level of statistics to compute
    #[clap(long)]
    statistics_enabled: Option<EnabledStatisticsArgs>,

    /// Writer version
    #[clap(long)]
    writer_version: Option<WriterVersionArgs>,

    /// Number of rows written in each internal batch when writing columns
    #[clap(long)]
    write_batch_size: Option<usize>,

    /// Coerce Arrow types to ones compatible with the Parquet specification
    #[clap(long)]
    coerce_types: Option<bool>,
}

fn main() {
    let args = Args::parse();

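    // Read the input's key-value metadata through the low-level reader so it can
    // be carried over to the rewritten file verbatim.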
    let parquet_reader =
        SerializedFileReader::new(File::open(&args.input).expect("Unable to open input file"))
            .expect("Failed to create reader");
    let kv_md = parquet_reader
        .metadata()
        .file_metadata()
        .key_value_metadata()
        .cloned();

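    // Re-open the input as an Arrow record batch reader to stream the actual data.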
    let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(
        File::open(args.input).expect("Unable to open input file"),
    )
    .expect("parquet open")
    .build()
    .expect("parquet open");

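    // Start from default writer properties that carry over the original key-value
    // metadata, then layer on whatever settings were passed on the command line.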
    let mut writer_properties_builder = WriterProperties::builder().set_key_value_metadata(kv_md);

    if let Some(value) = args.compression {
        let compression = compression_from_args(value, args.compression_level);
        writer_properties_builder = writer_properties_builder.set_compression(compression);
    }

    if let Some(value) = args.encoding {
        writer_properties_builder = writer_properties_builder.set_encoding(value.into());
    }
    if let Some(value) = args.dictionary_enabled {
        writer_properties_builder = writer_properties_builder.set_dictionary_enabled(value);
    }
    if let Some(value) = args.dictionary_page_size_limit {
        writer_properties_builder =
            writer_properties_builder.set_dictionary_page_size_limit(value);
    }

    if let Some(value) = args.max_row_group_size {
        writer_properties_builder = writer_properties_builder.set_max_row_group_size(value);
    }
    if let Some(value) = args.data_page_row_count_limit {
        writer_properties_builder = writer_properties_builder.set_data_page_row_count_limit(value);
    }
    if let Some(value) = args.data_page_size_limit {
        writer_properties_builder = writer_properties_builder.set_data_page_size_limit(value);
    }
    if let Some(value) = args.statistics_truncate_length {
        writer_properties_builder =
            writer_properties_builder.set_statistics_truncate_length(Some(value));
    }
    if let Some(value) = args.column_index_truncate_length {
        writer_properties_builder =
            writer_properties_builder.set_column_index_truncate_length(Some(value));
    }
    if let Some(value) = args.bloom_filter_enabled {
        writer_properties_builder = writer_properties_builder.set_bloom_filter_enabled(value);

        // FPP, NDV, and position only have an effect when the bloom filter is enabled.
        if value {
            if let Some(value) = args.bloom_filter_fpp {
                writer_properties_builder = writer_properties_builder.set_bloom_filter_fpp(value);
            }
            if let Some(value) = args.bloom_filter_ndv {
                writer_properties_builder = writer_properties_builder.set_bloom_filter_ndv(value);
            }
            if let Some(value) = args.bloom_filter_position {
                writer_properties_builder =
                    writer_properties_builder.set_bloom_filter_position(value.into());
            }
        }
    }
    if let Some(value) = args.statistics_enabled {
        writer_properties_builder = writer_properties_builder.set_statistics_enabled(value.into());
    }
    if let Some(value) = args.write_page_header_statistics {
        writer_properties_builder =
            writer_properties_builder.set_write_page_header_statistics(value);
        // Page header statistics require page-level statistics to be computed.
        if value {
            writer_properties_builder =
                writer_properties_builder.set_statistics_enabled(EnabledStatistics::Page);
        }
    }
    if let Some(value) = args.writer_version {
        writer_properties_builder = writer_properties_builder.set_writer_version(value.into());
    }
    if let Some(value) = args.coerce_types {
        writer_properties_builder = writer_properties_builder.set_coerce_types(value);
    }
    if let Some(value) = args.write_batch_size {
        writer_properties_builder = writer_properties_builder.set_write_batch_size(value);
    }
    let writer_properties = writer_properties_builder.build();
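    // The output writer reuses the input's Arrow schema, so the data round-trips
    // with only the writer settings changed.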
    let mut parquet_writer = ArrowWriter::try_new(
        File::create(&args.output).expect("Unable to open output file"),
        parquet_reader.schema(),
        Some(writer_properties),
    )
    .expect("create arrow writer");

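    // Copy every record batch from the reader to the writer, then finalize the
    // parquet footer.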
    for maybe_batch in parquet_reader {
        let batch = maybe_batch.expect("reading batch");
        parquet_writer.write(&batch).expect("writing data");
    }

    parquet_writer.close().expect("finalizing file");
}