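//! Binary that reads a Parquet file and rewrites it to a new file, with
//! potentially different writer settings (compression, row-group size,
//! statistics level, page limits, ...).
//!
//! Example invocation (hypothetical file names; the long flags follow clap's
//! kebab-case convention for the `Args` fields below):
//!
//! ```text
//! parquet-rewrite --input in.parquet --output out.parquet \
//!     --compression zstd --writer-version 2.0
//! ```
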
use std::fs::File;

use arrow_array::RecordBatchReader;
use clap::{builder::PossibleValue, Parser, ValueEnum};
use parquet::{
    arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter},
    basic::Compression,
    file::{
        properties::{EnabledStatistics, WriterProperties, WriterVersion},
        reader::FileReader,
        serialized_reader::SerializedFileReader,
    },
};

#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum CompressionArgs {
    /// No compression.
    None,

    /// Snappy compression.
    Snappy,

    /// GZip compression.
    Gzip,

    /// LZO compression.
    Lzo,

    /// Brotli compression.
    Brotli,

    /// LZ4 compression.
    Lz4,

    /// Zstandard compression.
    Zstd,

    /// LZ4 raw compression (LZ4 block format, without framing).
    Lz4Raw,
}
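
// GZIP, BROTLI, and ZSTD carry a compression level parameter in the parquet
// crate; `Default::default()` below selects each codec's default level.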
impl From<CompressionArgs> for Compression {
    fn from(value: CompressionArgs) -> Self {
        match value {
            CompressionArgs::None => Self::UNCOMPRESSED,
            CompressionArgs::Snappy => Self::SNAPPY,
            CompressionArgs::Gzip => Self::GZIP(Default::default()),
            CompressionArgs::Lzo => Self::LZO,
            CompressionArgs::Brotli => Self::BROTLI(Default::default()),
            CompressionArgs::Lz4 => Self::LZ4,
            CompressionArgs::Zstd => Self::ZSTD(Default::default()),
            CompressionArgs::Lz4Raw => Self::LZ4_RAW,
        }
    }
}

#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum EnabledStatisticsArgs {
    /// No statistics.
    None,

    /// Row-group-level (chunk) statistics only.
    Chunk,

    /// Page-level and row-group-level statistics.
    Page,
}

impl From<EnabledStatisticsArgs> for EnabledStatistics {
    fn from(value: EnabledStatisticsArgs) -> Self {
        match value {
            EnabledStatisticsArgs::None => Self::None,
            EnabledStatisticsArgs::Chunk => Self::Chunk,
            EnabledStatisticsArgs::Page => Self::Page,
        }
    }
}

#[derive(Clone, Copy, Debug)]
enum WriterVersionArgs {
    Parquet1_0,
    Parquet2_0,
}
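
// `ValueEnum` is implemented by hand so that the accepted command-line values
// are the literal strings "1.0" and "2.0", which `#[derive(ValueEnum)]` would
// not generate from the variant names.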
impl ValueEnum for WriterVersionArgs {
    fn value_variants<'a>() -> &'a [Self] {
        &[Self::Parquet1_0, Self::Parquet2_0]
    }

    fn to_possible_value(&self) -> Option<PossibleValue> {
        match self {
            WriterVersionArgs::Parquet1_0 => Some(PossibleValue::new("1.0")),
            WriterVersionArgs::Parquet2_0 => Some(PossibleValue::new("2.0")),
        }
    }
}

impl From<WriterVersionArgs> for WriterVersion {
    fn from(value: WriterVersionArgs) -> Self {
        match value {
            WriterVersionArgs::Parquet1_0 => Self::PARQUET_1_0,
            WriterVersionArgs::Parquet2_0 => Self::PARQUET_2_0,
        }
    }
}

#[derive(Debug, Parser)]
#[clap(author, version, about("Read and write a parquet file with potentially different settings"), long_about = None)]
struct Args {
    /// Path to the input parquet file.
    #[clap(short, long)]
    input: String,

    /// Path to the output parquet file.
    #[clap(short, long)]
    output: String,

    /// Compression used for all columns.
    #[clap(long, value_enum)]
    compression: Option<CompressionArgs>,

    /// Maximum number of rows per row group.
    #[clap(long)]
    max_row_group_size: Option<usize>,

    /// Best-effort maximum number of rows per data page.
    #[clap(long)]
    data_page_row_count_limit: Option<usize>,

    /// Best-effort maximum size of a data page, in bytes.
    #[clap(long)]
    data_page_size_limit: Option<usize>,

    /// Maximum statistics size for any column; applies only when statistics
    /// are enabled.
    #[clap(long)]
    max_statistics_size: Option<usize>,

    /// Best-effort maximum size of a dictionary page, in bytes.
    #[clap(long)]
    dictionary_page_size_limit: Option<usize>,

    /// Whether bloom filters are written for all columns.
    #[clap(long)]
    bloom_filter_enabled: Option<bool>,

    /// Bloom filter false-positive probability.
    #[clap(long)]
    bloom_filter_fpp: Option<f64>,

    /// Bloom filter number of distinct values.
    #[clap(long)]
    bloom_filter_ndv: Option<u64>,

    /// Whether dictionary encoding is enabled for all columns.
    #[clap(long)]
    dictionary_enabled: Option<bool>,

    /// Which statistics are written for all columns.
    #[clap(long)]
    statistics_enabled: Option<EnabledStatisticsArgs>,

    /// Parquet writer version.
    #[clap(long)]
    writer_version: Option<WriterVersionArgs>,

    /// Whether to coerce Arrow types to match the Parquet specification.
    #[clap(long)]
    coerce_types: Option<bool>,
}

fn main() {
    let args = Args::parse();
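
    // First pass: open the input with the low-level reader just to read the
    // key-value metadata, so that it can be copied to the output file.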
    let parquet_reader =
        SerializedFileReader::new(File::open(&args.input).expect("Unable to open input file"))
            .expect("Failed to create reader");
    let kv_md = parquet_reader
        .metadata()
        .file_metadata()
        .key_value_metadata()
        .cloned();
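
    // Second pass: open the same file as an Arrow record batch reader to copy
    // the actual data.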
    let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(
        File::open(args.input).expect("Unable to open input file"),
    )
    .expect("parquet open")
    .build()
    .expect("building parquet reader");
    let mut writer_properties_builder = WriterProperties::builder().set_key_value_metadata(kv_md);
    if let Some(value) = args.compression {
        writer_properties_builder = writer_properties_builder.set_compression(value.into());
    }
    if let Some(value) = args.max_row_group_size {
        writer_properties_builder = writer_properties_builder.set_max_row_group_size(value);
    }
    if let Some(value) = args.data_page_row_count_limit {
        writer_properties_builder = writer_properties_builder.set_data_page_row_count_limit(value);
    }
    if let Some(value) = args.data_page_size_limit {
        writer_properties_builder = writer_properties_builder.set_data_page_size_limit(value);
    }
    if let Some(value) = args.dictionary_page_size_limit {
        writer_properties_builder = writer_properties_builder.set_dictionary_page_size_limit(value);
    }
    // `set_max_statistics_size` is deprecated in the parquet crate, hence the
    // `allow` on this statement.
    #[allow(deprecated)]
    if let Some(value) = args.max_statistics_size {
        writer_properties_builder = writer_properties_builder.set_max_statistics_size(value);
    }
    if let Some(value) = args.bloom_filter_enabled {
        writer_properties_builder = writer_properties_builder.set_bloom_filter_enabled(value);

        // FPP and NDV tuning only matter when bloom filters are enabled.
        if value {
            if let Some(value) = args.bloom_filter_fpp {
                writer_properties_builder = writer_properties_builder.set_bloom_filter_fpp(value);
            }
            if let Some(value) = args.bloom_filter_ndv {
                writer_properties_builder = writer_properties_builder.set_bloom_filter_ndv(value);
            }
        }
    }
    if let Some(value) = args.dictionary_enabled {
        writer_properties_builder = writer_properties_builder.set_dictionary_enabled(value);
    }
    if let Some(value) = args.statistics_enabled {
        writer_properties_builder = writer_properties_builder.set_statistics_enabled(value.into());
    }
    if let Some(value) = args.writer_version {
        writer_properties_builder = writer_properties_builder.set_writer_version(value.into());
    }
    if let Some(value) = args.coerce_types {
        writer_properties_builder = writer_properties_builder.set_coerce_types(value);
    }
    let writer_properties = writer_properties_builder.build();
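
    // Create an Arrow writer for the output file, reusing the reader's schema
    // and the properties assembled above.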
    let mut parquet_writer = ArrowWriter::try_new(
        File::create(&args.output).expect("Unable to open output file"),
        parquet_reader.schema(),
        Some(writer_properties),
    )
    .expect("create arrow writer");
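
    // Copy the data across, one record batch at a time.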
    for maybe_batch in parquet_reader {
        let batch = maybe_batch.expect("reading batch");
        parquet_writer.write(&batch).expect("writing data");
    }
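
    // `close` flushes any buffered data and writes the Parquet footer.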
    parquet_writer.close().expect("finalizing file");
}