#![doc = include_str!("./parquet-fromcsv-help.txt")]
use std::{
fmt::Display,
fs::{read_to_string, File},
io::Read,
path::{Path, PathBuf},
sync::Arc,
};
use arrow_csv::ReaderBuilder;
use arrow_schema::{ArrowError, Schema};
use clap::{Parser, ValueEnum};
use parquet::arrow::arrow_writer::ArrowWriterOptions;
use parquet::{
arrow::{parquet_to_arrow_schema, ArrowWriter},
basic::Compression,
errors::ParquetError,
file::properties::{WriterProperties, WriterVersion},
schema::{parser::parse_message_type, types::SchemaDescriptor},
};
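/// Errors that can arise while converting CSV to Parquet: argument parsing,
/// I/O, Arrow, and Parquet failures, optionally wrapped with a context message.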
#[derive(Debug)]
enum ParquetFromCsvError {
CommandLineParseError(clap::Error),
IoError(std::io::Error),
ArrowError(ArrowError),
ParquetError(ParquetError),
WithContext(String, Box<Self>),
}
impl From<std::io::Error> for ParquetFromCsvError {
fn from(e: std::io::Error) -> Self {
Self::IoError(e)
}
}
impl From<ArrowError> for ParquetFromCsvError {
fn from(e: ArrowError) -> Self {
Self::ArrowError(e)
}
}
impl From<ParquetError> for ParquetFromCsvError {
fn from(e: ParquetError) -> Self {
Self::ParquetError(e)
}
}
impl From<clap::Error> for ParquetFromCsvError {
fn from(e: clap::Error) -> Self {
Self::CommandLineParseError(e)
}
}
impl ParquetFromCsvError {
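/// Wraps any convertible error together with a human-readable context string.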
pub fn with_context<E: Into<ParquetFromCsvError>>(
inner_error: E,
context: &str,
) -> ParquetFromCsvError {
let inner = inner_error.into();
ParquetFromCsvError::WithContext(context.to_string(), Box::new(inner))
}
}
impl Display for ParquetFromCsvError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ParquetFromCsvError::CommandLineParseError(e) => write!(f, "{e}"),
ParquetFromCsvError::IoError(e) => write!(f, "{e}"),
ParquetFromCsvError::ArrowError(e) => write!(f, "{e}"),
ParquetFromCsvError::ParquetError(e) => write!(f, "{e}"),
ParquetFromCsvError::WithContext(c, e) => {
writeln!(f, "{e}")?;
write!(f, "context: {c}")
}
}
}
}
#[derive(Debug, Parser)]
#[clap(author, version, disable_help_flag=true, about("Binary to convert csv to Parquet"), long_about=None)]
struct Args {
#[clap(short, long, help("message schema for output Parquet"))]
schema: PathBuf,
#[clap(short, long, help("input CSV file"))]
input_file: PathBuf,
#[clap(short, long, help("output Parquet file"))]
output_file: PathBuf,
#[clap(
value_enum,
short('f'),
long,
help("input file format"),
default_value_t=CsvDialect::Csv
)]
input_format: CsvDialect,
#[clap(
short,
long,
help("batch size"),
default_value_t = 1000,
env = "PARQUET_FROM_CSV_BATCHSIZE"
)]
batch_size: usize,
#[clap(short, long, help("has header"))]
has_header: bool,
#[clap(short, long, help("field delimiter"))]
delimiter: Option<char>,
#[clap(value_enum, short, long, help("record terminator"))]
record_terminator: Option<RecordTerminator>,
#[clap(short, long, help("escape character"))]
escape_char: Option<char>,
#[clap(short, long, help("quote character"))]
quote_char: Option<char>,
#[clap(short('D'), long, help("double quote"))]
double_quote: Option<bool>,
#[clap(short('C'), long, help("compression mode of csv"), default_value_t=Compression::UNCOMPRESSED)]
#[clap(value_parser=compression_from_str)]
csv_compression: Compression,
#[clap(short('c'), long, help("compression mode of parquet"), default_value_t=Compression::SNAPPY)]
#[clap(value_parser=compression_from_str)]
parquet_compression: Compression,
#[clap(short, long, help("writer version"))]
#[clap(value_parser=writer_version_from_str)]
writer_version: Option<WriterVersion>,
#[clap(short, long, help("max row group size"))]
max_row_group_size: Option<usize>,
#[clap(long, help("whether to enable bloom filter writing"))]
enable_bloom_filter: Option<bool>,
#[clap(long, action=clap::ArgAction::Help, help("display usage help"))]
help: Option<bool>,
}
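/// Parses a compression name (case-insensitive) into a Parquet `Compression`,
/// using default levels for the codecs that take one (GZIP, BROTLI, ZSTD).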
fn compression_from_str(cmp: &str) -> Result<Compression, String> {
match cmp.to_uppercase().as_str() {
"UNCOMPRESSED" => Ok(Compression::UNCOMPRESSED),
"SNAPPY" => Ok(Compression::SNAPPY),
"GZIP" => Ok(Compression::GZIP(Default::default())),
"LZO" => Ok(Compression::LZO),
"BROTLI" => Ok(Compression::BROTLI(Default::default())),
"LZ4" => Ok(Compression::LZ4),
"ZSTD" => Ok(Compression::ZSTD(Default::default())),
v => Err(
format!("Unknown compression {v} : possible values UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD \n\nFor more information try --help")
)
}
}
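/// Parses "1" or "2" into the corresponding Parquet `WriterVersion`.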
fn writer_version_from_str(cmp: &str) -> Result<WriterVersion, String> {
match cmp.to_uppercase().as_str() {
"1" => Ok(WriterVersion::PARQUET_1_0),
"2" => Ok(WriterVersion::PARQUET_2_0),
v => Err(format!("Unknown writer version {v} : possible values 1, 2")),
}
}
impl Args {
fn schema_path(&self) -> &Path {
self.schema.as_path()
}
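/// Returns the field delimiter: the explicit `--delimiter` if given,
/// otherwise `,` for CSV and a tab for TSV.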
fn get_delimiter(&self) -> u8 {
match self.delimiter {
Some(ch) => ch as u8,
None => match self.input_format {
CsvDialect::Csv => b',',
CsvDialect::Tsv => b'\t',
},
}
}
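/// Returns the record terminator byte, if any. `None` leaves the reader's
/// default (CRLF-aware) handling in place; TSV defaults to a plain LF terminator.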
fn get_terminator(&self) -> Option<u8> {
match self.record_terminator {
Some(RecordTerminator::LF) => Some(0x0a),
Some(RecordTerminator::CR) => Some(0x0d),
Some(RecordTerminator::Crlf) => None,
None => match self.input_format {
CsvDialect::Csv => None,
CsvDialect::Tsv => Some(0x0a),
},
}
}
fn get_escape(&self) -> Option<u8> {
self.escape_char.map(|ch| ch as u8)
}
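/// Returns the quote character: the explicit `--quote-char` if given,
/// otherwise `"` for CSV and no quoting for TSV.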
fn get_quote(&self) -> Option<u8> {
if self.quote_char.is_none() {
match self.input_format {
CsvDialect::Csv => Some(b'\"'),
CsvDialect::Tsv => None,
}
} else {
self.quote_char.map(|c| c as u8)
}
}
}
#[derive(Debug, Clone, Copy, ValueEnum, PartialEq)]
enum CsvDialect {
Csv,
Tsv,
}
#[derive(Debug, Clone, Copy, ValueEnum, PartialEq)]
enum RecordTerminator {
LF,
Crlf,
CR,
}
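/// Builds Parquet `WriterProperties` from the command-line arguments
/// (compression, writer version, max row group size, bloom filter).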
fn configure_writer_properties(args: &Args) -> WriterProperties {
let mut properties_builder =
WriterProperties::builder().set_compression(args.parquet_compression);
if let Some(writer_version) = args.writer_version {
properties_builder = properties_builder.set_writer_version(writer_version);
}
if let Some(max_row_group_size) = args.max_row_group_size {
properties_builder = properties_builder.set_max_row_group_size(max_row_group_size);
}
if let Some(enable_bloom_filter) = args.enable_bloom_filter {
properties_builder = properties_builder.set_bloom_filter_enabled(enable_bloom_filter);
}
properties_builder.build()
}
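/// Builds an Arrow CSV `ReaderBuilder` for the given schema, applying the
/// batch size, header flag, delimiter, and any optional terminator, escape,
/// and quote settings.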
fn configure_reader_builder(args: &Args, arrow_schema: Arc<Schema>) -> ReaderBuilder {
fn configure_reader<T, F: Fn(ReaderBuilder, T) -> ReaderBuilder>(
builder: ReaderBuilder,
value: Option<T>,
fun: F,
) -> ReaderBuilder {
if let Some(val) = value {
fun(builder, val)
} else {
builder
}
}
let mut builder = ReaderBuilder::new(arrow_schema)
.with_batch_size(args.batch_size)
.with_header(args.has_header)
.with_delimiter(args.get_delimiter());
builder = configure_reader(
builder,
args.get_terminator(),
ReaderBuilder::with_terminator,
);
builder = configure_reader(builder, args.get_escape(), ReaderBuilder::with_escape);
builder = configure_reader(builder, args.get_quote(), ReaderBuilder::with_quote);
builder
}
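/// Reads the Parquet message schema, decompresses and parses the input CSV in
/// batches, and writes each `RecordBatch` to the output Parquet file.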
fn convert_csv_to_parquet(args: &Args) -> Result<(), ParquetFromCsvError> {
let schema = read_to_string(args.schema_path()).map_err(|e| {
ParquetFromCsvError::with_context(
e,
&format!("Failed to open schema file {:#?}", args.schema_path()),
)
})?;
let parquet_schema = Arc::new(parse_message_type(&schema)?);
let desc = SchemaDescriptor::new(parquet_schema);
let arrow_schema = Arc::new(parquet_to_arrow_schema(&desc, None)?);
let parquet_file = File::create(&args.output_file).map_err(|e| {
ParquetFromCsvError::with_context(
e,
&format!("Failed to create output file {:#?}", &args.output_file),
)
})?;
let options = ArrowWriterOptions::new()
.with_properties(configure_writer_properties(args))
.with_schema_root(desc.name().to_string());
let mut arrow_writer =
ArrowWriter::try_new_with_options(parquet_file, arrow_schema.clone(), options)
.map_err(|e| ParquetFromCsvError::with_context(e, "Failed to create ArrowWriter"))?;
let input_file = File::open(&args.input_file).map_err(|e| {
ParquetFromCsvError::with_context(
e,
&format!("Failed to open input file {:#?}", &args.input_file),
)
})?;
let input_file_decoder = match args.csv_compression {
Compression::UNCOMPRESSED => Box::new(input_file) as Box<dyn Read>,
Compression::SNAPPY => Box::new(snap::read::FrameDecoder::new(input_file)) as Box<dyn Read>,
Compression::GZIP(_) => {
Box::new(flate2::read::MultiGzDecoder::new(input_file)) as Box<dyn Read>
}
Compression::BROTLI(_) => {
Box::new(brotli::Decompressor::new(input_file, 0)) as Box<dyn Read>
}
Compression::LZ4 => {
Box::new(lz4_flex::frame::FrameDecoder::new(input_file)) as Box<dyn Read>
}
Compression::ZSTD(_) => {
Box::new(zstd::Decoder::new(input_file).map_err(|e| {
ParquetFromCsvError::with_context(e, "Failed to create zstd::Decoder")
})?) as Box<dyn Read>
}
d => unimplemented!("compression type {d}"),
};
let builder = configure_reader_builder(args, arrow_schema);
let reader = builder.build(input_file_decoder)?;
for batch_result in reader {
let batch = batch_result.map_err(|e| {
ParquetFromCsvError::with_context(e, "Failed to read RecordBatch from CSV")
})?;
arrow_writer.write(&batch).map_err(|e| {
ParquetFromCsvError::with_context(e, "Failed to write RecordBatch to parquet")
})?;
}
arrow_writer
.close()
.map_err(|e| ParquetFromCsvError::with_context(e, "Failed to close parquet"))?;
Ok(())
}
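// Example invocation (hypothetical file names):
//   parquet-fromcsv --schema message.schema --input-file input.csv --output-file out.parquet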
fn main() -> Result<(), ParquetFromCsvError> {
let args = Args::parse();
convert_csv_to_parquet(&args)
}
#[cfg(test)]
mod tests {
use std::{
io::Write,
path::{Path, PathBuf},
};
use super::*;
use arrow::datatypes::{DataType, Field};
use brotli::CompressorWriter;
use clap::{CommandFactory, Parser};
use flate2::write::GzEncoder;
use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
use parquet::file::reader::{FileReader, SerializedFileReader};
use snap::write::FrameEncoder;
use tempfile::NamedTempFile;
#[test]
fn test_command_help() {
let mut cmd = Args::command();
let dir = std::env::var("CARGO_MANIFEST_DIR").unwrap();
let mut path_buf = PathBuf::from(dir);
path_buf.push("src");
path_buf.push("bin");
path_buf.push("parquet-fromcsv-help.txt");
let expected = std::fs::read_to_string(path_buf).unwrap();
let mut buffer_vec = Vec::new();
let mut buffer = std::io::Cursor::new(&mut buffer_vec);
cmd.write_long_help(&mut buffer).unwrap();
let mut actual = String::from_utf8(buffer_vec).unwrap();
let pos = actual.find('\n').unwrap() + 1;
actual = actual[pos..].to_string();
assert_eq!(
expected, actual,
"help text not match. please update to \n---\n{actual}\n---\n"
)
}
fn parse_args(mut extra_args: Vec<&str>) -> Result<Args, ParquetFromCsvError> {
let mut args = vec![
"test",
"--schema",
"test.schema",
"--input-file",
"infile.csv",
"--output-file",
"out.parquet",
];
args.append(&mut extra_args);
let args = Args::try_parse_from(args.iter())?;
Ok(args)
}
#[test]
fn test_parse_arg_minimum() -> Result<(), ParquetFromCsvError> {
let args = parse_args(vec![])?;
assert_eq!(args.schema, PathBuf::from(Path::new("test.schema")));
assert_eq!(args.input_file, PathBuf::from(Path::new("infile.csv")));
assert_eq!(args.output_file, PathBuf::from(Path::new("out.parquet")));
assert_eq!(args.input_format, CsvDialect::Csv);
assert_eq!(args.batch_size, 1000);
assert!(!args.has_header);
assert_eq!(args.delimiter, None);
assert_eq!(args.get_delimiter(), b',');
assert_eq!(args.record_terminator, None);
assert_eq!(args.get_terminator(), None);
assert_eq!(args.quote_char, None);
assert_eq!(args.get_quote(), Some(b'\"'));
assert_eq!(args.double_quote, None);
assert_eq!(args.parquet_compression, Compression::SNAPPY);
Ok(())
}
#[test]
fn test_parse_arg_format_variants() -> Result<(), ParquetFromCsvError> {
let args = parse_args(vec!["--input-format", "csv"])?;
assert_eq!(args.input_format, CsvDialect::Csv);
assert_eq!(args.get_delimiter(), b',');
assert_eq!(args.get_terminator(), None);
assert_eq!(args.get_quote(), Some(b'\"'));
assert_eq!(args.get_escape(), None);
let args = parse_args(vec!["--input-format", "tsv"])?;
assert_eq!(args.input_format, CsvDialect::Tsv);
assert_eq!(args.get_delimiter(), b'\t');
assert_eq!(args.get_terminator(), Some(b'\x0a'));
assert_eq!(args.get_quote(), None);
assert_eq!(args.get_escape(), None);
let args = parse_args(vec!["--input-format", "csv", "--escape-char", "\\"])?;
assert_eq!(args.input_format, CsvDialect::Csv);
assert_eq!(args.get_delimiter(), b',');
assert_eq!(args.get_terminator(), None);
assert_eq!(args.get_quote(), Some(b'\"'));
assert_eq!(args.get_escape(), Some(b'\\'));
let args = parse_args(vec!["--input-format", "tsv", "--delimiter", ":"])?;
assert_eq!(args.input_format, CsvDialect::Tsv);
assert_eq!(args.get_delimiter(), b':');
assert_eq!(args.get_terminator(), Some(b'\x0a'));
assert_eq!(args.get_quote(), None);
assert_eq!(args.get_escape(), None);
Ok(())
}
#[test]
#[should_panic]
fn test_parse_arg_format_error() {
parse_args(vec!["--input-format", "excel"]).unwrap();
}
#[test]
fn test_parse_arg_compression_format() {
let args = parse_args(vec!["--parquet-compression", "uncompressed"]).unwrap();
assert_eq!(args.parquet_compression, Compression::UNCOMPRESSED);
let args = parse_args(vec!["--parquet-compression", "snappy"]).unwrap();
assert_eq!(args.parquet_compression, Compression::SNAPPY);
let args = parse_args(vec!["--parquet-compression", "gzip"]).unwrap();
assert_eq!(
args.parquet_compression,
Compression::GZIP(Default::default())
);
let args = parse_args(vec!["--parquet-compression", "lzo"]).unwrap();
assert_eq!(args.parquet_compression, Compression::LZO);
let args = parse_args(vec!["--parquet-compression", "lz4"]).unwrap();
assert_eq!(args.parquet_compression, Compression::LZ4);
let args = parse_args(vec!["--parquet-compression", "brotli"]).unwrap();
assert_eq!(
args.parquet_compression,
Compression::BROTLI(Default::default())
);
let args = parse_args(vec!["--parquet-compression", "zstd"]).unwrap();
assert_eq!(
args.parquet_compression,
Compression::ZSTD(Default::default())
);
}
#[test]
fn test_parse_arg_compression_format_fail() {
match parse_args(vec!["--parquet-compression", "zip"]) {
Ok(_) => panic!("unexpected success"),
Err(e) => {
let err = e.to_string();
assert!(err.contains("error: invalid value 'zip' for '--parquet-compression <PARQUET_COMPRESSION>': Unknown compression ZIP : possible values UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD \n\nFor more information try --help"), "{err}")
}
}
}
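/// Asserts that a builder's `Debug` output contains the given ` name: value` pair.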
fn assert_debug_text(debug_text: &str, name: &str, value: &str) {
let pattern = format!(" {name}: {value}");
assert!(
debug_text.contains(&pattern),
"\"{debug_text}\" not contains \"{pattern}\""
)
}
#[test]
fn test_configure_reader_builder() {
let args = Args {
schema: PathBuf::from(Path::new("schema.arvo")),
input_file: PathBuf::from(Path::new("test.csv")),
output_file: PathBuf::from(Path::new("out.parquet")),
batch_size: 1000,
input_format: CsvDialect::Csv,
has_header: false,
delimiter: None,
record_terminator: None,
escape_char: None,
quote_char: None,
double_quote: None,
csv_compression: Compression::UNCOMPRESSED,
parquet_compression: Compression::SNAPPY,
writer_version: None,
max_row_group_size: None,
enable_bloom_filter: None,
help: None,
};
let arrow_schema = Arc::new(Schema::new(vec![
Field::new("field1", DataType::Utf8, false),
Field::new("field2", DataType::Utf8, false),
Field::new("field3", DataType::Utf8, false),
Field::new("field4", DataType::Utf8, false),
Field::new("field5", DataType::Utf8, false),
]));
let reader_builder = configure_reader_builder(&args, arrow_schema);
let builder_debug = format!("{reader_builder:?}");
assert_debug_text(&builder_debug, "header", "false");
assert_debug_text(&builder_debug, "delimiter", "Some(44)");
assert_debug_text(&builder_debug, "quote", "Some(34)");
assert_debug_text(&builder_debug, "terminator", "None");
assert_debug_text(&builder_debug, "batch_size", "1000");
assert_debug_text(&builder_debug, "escape", "None");
let args = Args {
schema: PathBuf::from(Path::new("schema.arvo")),
input_file: PathBuf::from(Path::new("test.csv")),
output_file: PathBuf::from(Path::new("out.parquet")),
batch_size: 2000,
input_format: CsvDialect::Tsv,
has_header: true,
delimiter: None,
record_terminator: None,
escape_char: Some('\\'),
quote_char: None,
double_quote: None,
csv_compression: Compression::UNCOMPRESSED,
parquet_compression: Compression::SNAPPY,
writer_version: None,
max_row_group_size: None,
enable_bloom_filter: None,
help: None,
};
let arrow_schema = Arc::new(Schema::new(vec![
Field::new("field1", DataType::Utf8, false),
Field::new("field2", DataType::Utf8, false),
Field::new("field3", DataType::Utf8, false),
Field::new("field4", DataType::Utf8, false),
Field::new("field5", DataType::Utf8, false),
]));
let reader_builder = configure_reader_builder(&args, arrow_schema);
let builder_debug = format!("{reader_builder:?}");
assert_debug_text(&builder_debug, "header", "true");
assert_debug_text(&builder_debug, "delimiter", "Some(9)");
assert_debug_text(&builder_debug, "quote", "None");
assert_debug_text(&builder_debug, "terminator", "Some(10)");
assert_debug_text(&builder_debug, "batch_size", "2000");
assert_debug_text(&builder_debug, "escape", "Some(92)");
}
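/// Round-trip helper: writes a small CSV with the given compression, converts
/// it to Parquet, and checks the resulting file's schema name.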
fn test_convert_compressed_csv_to_parquet(csv_compression: Compression) {
let schema = NamedTempFile::new().unwrap();
let schema_text = r"message my_amazing_schema {
optional int32 id;
optional binary name (STRING);
}";
schema.as_file().write_all(schema_text.as_bytes()).unwrap();
let mut input_file = NamedTempFile::new().unwrap();
fn write_tmp_file<T: Write>(w: &mut T) {
for index in 1..2000 {
write!(w, "{index},\"name_{index}\"\r\n").unwrap();
}
w.flush().unwrap();
}
input_file = match csv_compression {
Compression::UNCOMPRESSED => {
write_tmp_file(&mut input_file);
input_file
}
Compression::SNAPPY => {
let mut encoder = FrameEncoder::new(input_file);
write_tmp_file(&mut encoder);
encoder.into_inner().unwrap()
}
Compression::GZIP(level) => {
let mut encoder = GzEncoder::new(
input_file,
flate2::Compression::new(level.compression_level()),
);
write_tmp_file(&mut encoder);
encoder.finish().unwrap()
}
Compression::BROTLI(level) => {
let mut encoder =
CompressorWriter::new(input_file, 0, level.compression_level(), 0);
write_tmp_file(&mut encoder);
encoder.into_inner()
}
Compression::LZ4 => {
let mut encoder = lz4_flex::frame::FrameEncoder::new(input_file);
write_tmp_file(&mut encoder);
encoder.finish().unwrap()
}
Compression::ZSTD(level) => {
let mut encoder = zstd::Encoder::new(input_file, level.compression_level())
.map_err(|e| {
ParquetFromCsvError::with_context(e, "Failed to create zstd::Encoder")
})
.unwrap();
write_tmp_file(&mut encoder);
encoder.finish().unwrap()
}
d => unimplemented!("compression type {d}"),
};
let output_parquet = NamedTempFile::new().unwrap();
let args = Args {
schema: PathBuf::from(schema.path()),
input_file: PathBuf::from(input_file.path()),
output_file: PathBuf::from(output_parquet.path()),
batch_size: 1000,
input_format: CsvDialect::Csv,
has_header: false,
delimiter: None,
record_terminator: None,
escape_char: None,
quote_char: None,
double_quote: None,
csv_compression,
parquet_compression: Compression::SNAPPY,
writer_version: None,
max_row_group_size: None,
enable_bloom_filter: Some(true),
help: None,
};
convert_csv_to_parquet(&args).unwrap();
let file = SerializedFileReader::new(output_parquet.into_file()).unwrap();
let schema_name = file.metadata().file_metadata().schema().name();
assert_eq!(schema_name, "my_amazing_schema");
}
#[test]
fn test_convert_csv_to_parquet() {
test_convert_compressed_csv_to_parquet(Compression::UNCOMPRESSED);
test_convert_compressed_csv_to_parquet(Compression::SNAPPY);
test_convert_compressed_csv_to_parquet(Compression::GZIP(GzipLevel::try_new(1).unwrap()));
test_convert_compressed_csv_to_parquet(Compression::BROTLI(
BrotliLevel::try_new(2).unwrap(),
));
test_convert_compressed_csv_to_parquet(Compression::LZ4);
test_convert_compressed_csv_to_parquet(Compression::ZSTD(ZstdLevel::try_new(1).unwrap()));
}
}