use std::fs::File;

use arrow_array::RecordBatchReader;
use clap::{builder::PossibleValue, Parser, ValueEnum};
use parquet::{
    arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter},
    basic::Compression,
    file::{
        properties::{EnabledStatistics, WriterProperties, WriterVersion},
        reader::FileReader,
        serialized_reader::SerializedFileReader,
    },
};
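
/// Compression codecs that can be requested on the command line; each maps to a
/// `parquet::basic::Compression` variant.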
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum CompressionArgs {
    None,
    Snappy,
    Gzip,
    Lzo,
    Brotli,
    Lz4,
    Zstd,
    Lz4Raw,
}

impl From<CompressionArgs> for Compression {
    fn from(value: CompressionArgs) -> Self {
        match value {
            CompressionArgs::None => Self::UNCOMPRESSED,
            CompressionArgs::Snappy => Self::SNAPPY,
            CompressionArgs::Gzip => Self::GZIP(Default::default()),
            CompressionArgs::Lzo => Self::LZO,
            CompressionArgs::Brotli => Self::BROTLI(Default::default()),
            CompressionArgs::Lz4 => Self::LZ4,
            CompressionArgs::Zstd => Self::ZSTD(Default::default()),
            CompressionArgs::Lz4Raw => Self::LZ4_RAW,
        }
    }
}
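
/// Granularity at which column statistics are written.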
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
enum EnabledStatisticsArgs {
    None,
    Chunk,
    Page,
}

impl From<EnabledStatisticsArgs> for EnabledStatistics {
    fn from(value: EnabledStatisticsArgs) -> Self {
        match value {
            EnabledStatisticsArgs::None => Self::None,
            EnabledStatisticsArgs::Chunk => Self::Chunk,
            EnabledStatisticsArgs::Page => Self::Page,
        }
    }
}
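
/// Parquet writer version to use for the output file.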
#[derive(Clone, Copy, Debug)]
enum WriterVersionArgs {
    Parquet1_0,
    Parquet2_0,
}
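
// Manual `ValueEnum` impl so the versions can be passed as "1.0" and "2.0" on the command line.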
impl ValueEnum for WriterVersionArgs {
    fn value_variants<'a>() -> &'a [Self] {
        &[Self::Parquet1_0, Self::Parquet2_0]
    }

    fn to_possible_value(&self) -> Option<PossibleValue> {
        match self {
            WriterVersionArgs::Parquet1_0 => Some(PossibleValue::new("1.0")),
            WriterVersionArgs::Parquet2_0 => Some(PossibleValue::new("2.0")),
        }
    }
}

impl From<WriterVersionArgs> for WriterVersion {
    fn from(value: WriterVersionArgs) -> Self {
        match value {
            WriterVersionArgs::Parquet1_0 => Self::PARQUET_1_0,
            WriterVersionArgs::Parquet2_0 => Self::PARQUET_2_0,
        }
    }
}
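
// Command line arguments. Any writer property that is not set keeps the parquet writer's default.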
#[derive(Debug, Parser)]
#[clap(author, version, about("Read and write parquet file with potentially different settings"), long_about = None)]
struct Args {
    /// Path to the input parquet file.
    #[clap(short, long)]
    input: String,

    /// Path to the output parquet file.
    #[clap(short, long)]
    output: String,

    /// Compression codec for the output file.
    #[clap(long, value_enum)]
    compression: Option<CompressionArgs>,

    /// Maximum number of rows per row group.
    #[clap(long)]
    max_row_group_size: Option<usize>,

    /// Best-effort maximum number of rows per data page.
    #[clap(long)]
    data_page_row_count_limit: Option<usize>,

    /// Best-effort maximum data page size, in bytes.
    #[clap(long)]
    data_page_size_limit: Option<usize>,

    /// Maximum size of column statistics, in bytes.
    #[clap(long)]
    max_statistics_size: Option<usize>,

    /// Best-effort maximum dictionary page size, in bytes.
    #[clap(long)]
    dictionary_page_size_limit: Option<usize>,

    /// Whether to write bloom filters.
    #[clap(long)]
    bloom_filter_enabled: Option<bool>,

    /// Bloom filter false-positive probability.
    #[clap(long)]
    bloom_filter_fpp: Option<f64>,

    /// Bloom filter number of distinct values.
    #[clap(long)]
    bloom_filter_ndv: Option<u64>,

    /// Whether to use dictionary encoding.
    #[clap(long)]
    dictionary_enabled: Option<bool>,

    /// Which statistics to write (none, chunk, or page level).
    #[clap(long)]
    statistics_enabled: Option<EnabledStatisticsArgs>,

    /// Parquet writer version (1.0 or 2.0).
    #[clap(long)]
    writer_version: Option<WriterVersionArgs>,
}

fn main() {
    let args = Args::parse();
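
    // Read the key-value metadata from the input file's footer so it can be copied to the output.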
    let parquet_reader =
        SerializedFileReader::new(File::open(&args.input).expect("Unable to open input file"))
            .expect("Failed to create reader");
    let kv_md = parquet_reader
        .metadata()
        .file_metadata()
        .key_value_metadata()
        .cloned();
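
    // Re-open the input as an Arrow record batch reader for the actual row data.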
    let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(
        File::open(args.input).expect("Unable to open input file"),
    )
    .expect("parquet open")
    .build()
    .expect("parquet open");
    let mut writer_properties_builder = WriterProperties::builder().set_key_value_metadata(kv_md);
    if let Some(value) = args.compression {
        writer_properties_builder = writer_properties_builder.set_compression(value.into());
    }
    if let Some(value) = args.max_row_group_size {
        writer_properties_builder = writer_properties_builder.set_max_row_group_size(value);
    }
    if let Some(value) = args.data_page_row_count_limit {
        writer_properties_builder = writer_properties_builder.set_data_page_row_count_limit(value);
    }
    if let Some(value) = args.data_page_size_limit {
        writer_properties_builder = writer_properties_builder.set_data_page_size_limit(value);
    }
    if let Some(value) = args.dictionary_page_size_limit {
        writer_properties_builder =
            writer_properties_builder.set_dictionary_page_size_limit(value);
    }
    if let Some(value) = args.max_statistics_size {
        writer_properties_builder = writer_properties_builder.set_max_statistics_size(value);
    }
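    // The false-positive probability and distinct-value count only take effect when the bloom
    // filter itself is enabled.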
    if let Some(value) = args.bloom_filter_enabled {
        writer_properties_builder = writer_properties_builder.set_bloom_filter_enabled(value);
        if value {
            if let Some(value) = args.bloom_filter_fpp {
                writer_properties_builder = writer_properties_builder.set_bloom_filter_fpp(value);
            }
            if let Some(value) = args.bloom_filter_ndv {
                writer_properties_builder = writer_properties_builder.set_bloom_filter_ndv(value);
            }
        }
    }
    if let Some(value) = args.dictionary_enabled {
        writer_properties_builder = writer_properties_builder.set_dictionary_enabled(value);
    }
    if let Some(value) = args.statistics_enabled {
        writer_properties_builder = writer_properties_builder.set_statistics_enabled(value.into());
    }
    if let Some(value) = args.writer_version {
        writer_properties_builder = writer_properties_builder.set_writer_version(value.into());
    }
    let writer_properties = writer_properties_builder.build();
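
    // Create the Arrow writer for the output file with the assembled properties; the schema is
    // taken from the reader.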
    let mut parquet_writer = ArrowWriter::try_new(
        File::create(&args.output).expect("Unable to open output file"),
        parquet_reader.schema(),
        Some(writer_properties),
    )
    .expect("create arrow writer");
    for maybe_batch in parquet_reader {
        let batch = maybe_batch.expect("reading batch");
        parquet_writer.write(&batch).expect("writing data");
    }
    parquet_writer.close().expect("finalizing file");
}