parquet-fromcsv.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Binary to convert CSV files to Parquet files
//!
//! # Install
//!
//! `parquet-fromcsv` can be installed using `cargo`:
//!
//! ```text
//! cargo install parquet --features=cli
//! ```
//!
//! After this `parquet-fromcsv` should be available:
//!
//! ```text
//! parquet-fromcsv --schema message_schema_for_parquet.txt input.csv output.parquet
//! ```
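//!
//! The file passed via `--schema` contains a Parquet message type definition,
//! as accepted by `parse_message_type`. A minimal example (the message and
//! field names are illustrative):
//!
//! ```text
//! message schema {
//!     optional int32 id;
//!     optional binary name (STRING);
//! }
//! ```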
//!
//! The binary can also be built from the source code and run as follows:
//!
//! ```text
//! cargo run --features=cli --bin parquet-fromcsv -- --schema message_schema_for_parquet.txt \
//!     input.csv output.parquet
//! ```
//!
//! # Options
//!
//! ```text
#![doc = include_str!("./parquet-fromcsv-help.txt")] // To update this file, run the test `test_command_help`
//! ```
//!
//! ## Parquet file options
//!
//! ```text
//! - `-b`, `--batch-size` : Batch size for Parquet
//! - `-c`, `--parquet-compression` : Compression option for Parquet, default is SNAPPY
//! - `-s`, `--schema` : Path to message schema for generated Parquet file
//! - `-o`, `--output-file` : Path to output Parquet file
//! - `-w`, `--writer-version` : Writer version
//! - `-m`, `--max-row-group-size` : Max row group size
//! -       `--enable-bloom-filter` : Enable bloom filter during writing
//! ```
//!
//! ## Input file options
//!
//! ```text
//! - `-i`, `--input-file` : Path to input CSV file
//! - `-f`, `--input-format` : Dialect for input file, `csv` or `tsv`
//! - `-C`, `--csv-compression` : Compression option for csv, default is UNCOMPRESSED
//! - `-d`, `--delimiter` : Field delimiter for CSV file, default depends on `--input-format`
//! - `-e`, `--escape` : Escape character for input file
//! - `-h`, `--has-header` : Input has header
//! - `-r`, `--record-terminator` : Record terminator character for input, default is CRLF
//! - `-q`, `--quote-char` : Input quoting character
//! ```
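//!
//! For example, a hypothetical invocation converting a gzip-compressed,
//! tab-separated input with a header row could combine these options:
//!
//! ```text
//! parquet-fromcsv --schema message_schema_for_parquet.txt \
//!     --input-format tsv --csv-compression gzip --has-header \
//!     --input-file input.tsv.gz --output-file output.parquet
//! ```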
//!

use std::{
    fmt::Display,
    fs::{read_to_string, File},
    io::Read,
    path::{Path, PathBuf},
    sync::Arc,
};

use arrow_csv::ReaderBuilder;
use arrow_schema::{ArrowError, Schema};
use clap::{Parser, ValueEnum};
use parquet::arrow::arrow_writer::ArrowWriterOptions;
use parquet::{
    arrow::{parquet_to_arrow_schema, ArrowWriter},
    basic::Compression,
    errors::ParquetError,
    file::properties::{WriterProperties, WriterVersion},
    schema::{parser::parse_message_type, types::SchemaDescriptor},
};

#[derive(Debug)]
enum ParquetFromCsvError {
    CommandLineParseError(clap::Error),
    IoError(std::io::Error),
    ArrowError(ArrowError),
    ParquetError(ParquetError),
    WithContext(String, Box<Self>),
}

impl From<std::io::Error> for ParquetFromCsvError {
    fn from(e: std::io::Error) -> Self {
        Self::IoError(e)
    }
}

impl From<ArrowError> for ParquetFromCsvError {
    fn from(e: ArrowError) -> Self {
        Self::ArrowError(e)
    }
}

impl From<ParquetError> for ParquetFromCsvError {
    fn from(e: ParquetError) -> Self {
        Self::ParquetError(e)
    }
}

impl From<clap::Error> for ParquetFromCsvError {
    fn from(e: clap::Error) -> Self {
        Self::CommandLineParseError(e)
    }
}

impl ParquetFromCsvError {
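    /// Wraps `inner_error` with a human-readable context message; the
    /// `Display` impl prints the underlying error followed by the context line.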
    pub fn with_context<E: Into<ParquetFromCsvError>>(
        inner_error: E,
        context: &str,
    ) -> ParquetFromCsvError {
        let inner = inner_error.into();
        ParquetFromCsvError::WithContext(context.to_string(), Box::new(inner))
    }
}

impl Display for ParquetFromCsvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ParquetFromCsvError::CommandLineParseError(e) => write!(f, "{e}"),
            ParquetFromCsvError::IoError(e) => write!(f, "{e}"),
            ParquetFromCsvError::ArrowError(e) => write!(f, "{e}"),
            ParquetFromCsvError::ParquetError(e) => write!(f, "{e}"),
            ParquetFromCsvError::WithContext(c, e) => {
                writeln!(f, "{e}")?;
                write!(f, "context: {c}")
            }
        }
    }
}

#[derive(Debug, Parser)]
#[clap(author, version, disable_help_flag=true, about("Binary to convert csv to Parquet"), long_about=None)]
struct Args {
    /// Path to a text file containing a parquet schema definition
    #[clap(short, long, help("message schema for output Parquet"))]
    schema: PathBuf,
    /// input CSV file path
    #[clap(short, long, help("input CSV file"))]
    input_file: PathBuf,
    /// output Parquet file path
    #[clap(short, long, help("output Parquet file"))]
    output_file: PathBuf,
    /// input file format
    #[clap(
        value_enum,
        short('f'),
        long,
        help("input file format"),
        default_value_t=CsvDialect::Csv
    )]
    input_format: CsvDialect,
    /// batch size
    #[clap(
        short,
        long,
        help("batch size"),
        default_value_t = 1000,
        env = "PARQUET_FROM_CSV_BATCHSIZE"
    )]
    batch_size: usize,
    /// has header line
    #[clap(short, long, help("has header"))]
    has_header: bool,
    /// field delimiter
    ///
    /// default value:
    ///  when input_format==CSV: ','
    ///  when input_format==TSV: 'TAB'
    #[clap(short, long, help("field delimiter"))]
    delimiter: Option<char>,
    #[clap(value_enum, short, long, help("record terminator"))]
    record_terminator: Option<RecordTerminator>,
    #[clap(short, long, help("escape character"))]
    escape_char: Option<char>,
    #[clap(short, long, help("quote character"))]
    quote_char: Option<char>,
    #[clap(short('D'), long, help("double quote"))]
    double_quote: Option<bool>,
    #[clap(short('C'), long, help("compression mode of csv"), default_value_t=Compression::UNCOMPRESSED)]
    #[clap(value_parser=compression_from_str)]
    csv_compression: Compression,
    #[clap(short('c'), long, help("compression mode of parquet"), default_value_t=Compression::SNAPPY)]
    #[clap(value_parser=compression_from_str)]
    parquet_compression: Compression,

    #[clap(short, long, help("writer version"))]
    #[clap(value_parser=writer_version_from_str)]
    writer_version: Option<WriterVersion>,
    #[clap(short, long, help("max row group size"))]
    max_row_group_size: Option<usize>,
    #[clap(long, help("whether to enable bloom filter writing"))]
    enable_bloom_filter: Option<bool>,

    #[clap(long, action=clap::ArgAction::Help, help("display usage help"))]
    help: Option<bool>,
}

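/// Parses a case-insensitive codec name into a [`Compression`] value, using
/// the default level for the codecs that take one (GZIP, BROTLI, ZSTD).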
fn compression_from_str(cmp: &str) -> Result<Compression, String> {
    match cmp.to_uppercase().as_str() {
        "UNCOMPRESSED" => Ok(Compression::UNCOMPRESSED),
        "SNAPPY" => Ok(Compression::SNAPPY),
        "GZIP" => Ok(Compression::GZIP(Default::default())),
        "LZO" => Ok(Compression::LZO),
        "BROTLI" => Ok(Compression::BROTLI(Default::default())),
        "LZ4" => Ok(Compression::LZ4),
        "ZSTD" => Ok(Compression::ZSTD(Default::default())),
        v => Err(
            format!("Unknown compression {v} : possible values UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD \n\nFor more information try --help")
        )
    }
}

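/// Parses "1" or "2" into the corresponding Parquet [`WriterVersion`].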
fn writer_version_from_str(cmp: &str) -> Result<WriterVersion, String> {
    match cmp.to_uppercase().as_str() {
        "1" => Ok(WriterVersion::PARQUET_1_0),
        "2" => Ok(WriterVersion::PARQUET_2_0),
        v => Err(format!("Unknown writer version {v} : possible values 1, 2")),
    }
}

impl Args {
    fn schema_path(&self) -> &Path {
        self.schema.as_path()
    }
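    /// Returns the field delimiter byte, defaulting to `,` for CSV and tab for TSV.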
    fn get_delimiter(&self) -> u8 {
        match self.delimiter {
            Some(ch) => ch as u8,
            None => match self.input_format {
                CsvDialect::Csv => b',',
                CsvDialect::Tsv => b'\t',
            },
        }
    }
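    /// Returns the record terminator byte; `None` keeps the reader's default (CRLF).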
    fn get_terminator(&self) -> Option<u8> {
        match self.record_terminator {
            Some(RecordTerminator::LF) => Some(0x0a),
            Some(RecordTerminator::CR) => Some(0x0d),
            Some(RecordTerminator::Crlf) => None,
            None => match self.input_format {
                CsvDialect::Csv => None,
                CsvDialect::Tsv => Some(0x0a),
            },
        }
    }
    fn get_escape(&self) -> Option<u8> {
        self.escape_char.map(|ch| ch as u8)
    }
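    /// Returns the quote byte, defaulting to `"` for CSV; TSV input is read unquoted.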
    fn get_quote(&self) -> Option<u8> {
        if self.quote_char.is_none() {
            match self.input_format {
                CsvDialect::Csv => Some(b'\"'),
                CsvDialect::Tsv => None,
            }
        } else {
            self.quote_char.map(|c| c as u8)
        }
    }
}

#[derive(Debug, Clone, Copy, ValueEnum, PartialEq)]
enum CsvDialect {
    Csv,
    Tsv,
}

#[derive(Debug, Clone, Copy, ValueEnum, PartialEq)]
enum RecordTerminator {
    LF,
    Crlf,
    CR,
}

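/// Builds Parquet [`WriterProperties`] from the command-line arguments;
/// writer version, max row group size, and bloom filter settings are applied
/// only when explicitly given.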
fn configure_writer_properties(args: &Args) -> WriterProperties {
    let mut properties_builder =
        WriterProperties::builder().set_compression(args.parquet_compression);
    if let Some(writer_version) = args.writer_version {
        properties_builder = properties_builder.set_writer_version(writer_version);
    }
    if let Some(max_row_group_size) = args.max_row_group_size {
        properties_builder = properties_builder.set_max_row_group_size(max_row_group_size);
    }
    if let Some(enable_bloom_filter) = args.enable_bloom_filter {
        properties_builder = properties_builder.set_bloom_filter_enabled(enable_bloom_filter);
    }
    properties_builder.build()
}

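/// Builds an arrow-csv [`ReaderBuilder`] for the given schema, applying the
/// optional terminator, escape, and quote settings only when present.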
fn configure_reader_builder(args: &Args, arrow_schema: Arc<Schema>) -> ReaderBuilder {
    fn configure_reader<T, F: Fn(ReaderBuilder, T) -> ReaderBuilder>(
        builder: ReaderBuilder,
        value: Option<T>,
        fun: F,
    ) -> ReaderBuilder {
        if let Some(val) = value {
            fun(builder, val)
        } else {
            builder
        }
    }

    let mut builder = ReaderBuilder::new(arrow_schema)
        .with_batch_size(args.batch_size)
        .with_header(args.has_header)
        .with_delimiter(args.get_delimiter());

    builder = configure_reader(
        builder,
        args.get_terminator(),
        ReaderBuilder::with_terminator,
    );
    builder = configure_reader(builder, args.get_escape(), ReaderBuilder::with_escape);
    builder = configure_reader(builder, args.get_quote(), ReaderBuilder::with_quote);

    builder
}

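/// Parses the schema file, then streams the (possibly compressed) CSV input
/// into the output Parquet file one record batch at a time.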
fn convert_csv_to_parquet(args: &Args) -> Result<(), ParquetFromCsvError> {
    let schema = read_to_string(args.schema_path()).map_err(|e| {
        ParquetFromCsvError::with_context(
            e,
            &format!("Failed to open schema file {:#?}", args.schema_path()),
        )
    })?;
    let parquet_schema = Arc::new(parse_message_type(&schema)?);
    let desc = SchemaDescriptor::new(parquet_schema);
    let arrow_schema = Arc::new(parquet_to_arrow_schema(&desc, None)?);

    // create output parquet writer
    let parquet_file = File::create(&args.output_file).map_err(|e| {
        ParquetFromCsvError::with_context(
            e,
            &format!("Failed to create output file {:#?}", &args.output_file),
        )
    })?;

    let options = ArrowWriterOptions::new()
        .with_properties(configure_writer_properties(args))
        .with_schema_root(desc.name().to_string());

    let mut arrow_writer =
        ArrowWriter::try_new_with_options(parquet_file, arrow_schema.clone(), options)
            .map_err(|e| ParquetFromCsvError::with_context(e, "Failed to create ArrowWriter"))?;

    // open input file
    let input_file = File::open(&args.input_file).map_err(|e| {
        ParquetFromCsvError::with_context(
            e,
            &format!("Failed to open input file {:#?}", &args.input_file),
        )
    })?;

    // open input file decoder
    let input_file_decoder = match args.csv_compression {
        Compression::UNCOMPRESSED => Box::new(input_file) as Box<dyn Read>,
        Compression::SNAPPY => Box::new(snap::read::FrameDecoder::new(input_file)) as Box<dyn Read>,
        Compression::GZIP(_) => {
            Box::new(flate2::read::MultiGzDecoder::new(input_file)) as Box<dyn Read>
        }
        Compression::BROTLI(_) => {
            Box::new(brotli::Decompressor::new(input_file, 0)) as Box<dyn Read>
        }
        Compression::LZ4 => {
            Box::new(lz4_flex::frame::FrameDecoder::new(input_file)) as Box<dyn Read>
        }
        Compression::ZSTD(_) => {
            Box::new(zstd::Decoder::new(input_file).map_err(|e| {
                ParquetFromCsvError::with_context(e, "Failed to create zstd::Decoder")
            })?) as Box<dyn Read>
        }
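        // no streaming decoder is wired up for the remaining codecs (e.g. LZO)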
        d => unimplemented!("compression type {d}"),
    };

    // create input csv reader
    let builder = configure_reader_builder(args, arrow_schema);
    let reader = builder.build(input_file_decoder)?;
    for batch_result in reader {
        let batch = batch_result.map_err(|e| {
            ParquetFromCsvError::with_context(e, "Failed to read RecordBatch from CSV")
        })?;
        arrow_writer.write(&batch).map_err(|e| {
            ParquetFromCsvError::with_context(e, "Failed to write RecordBatch to parquet")
        })?;
    }
    arrow_writer
        .close()
        .map_err(|e| ParquetFromCsvError::with_context(e, "Failed to close parquet"))?;
    Ok(())
}

fn main() -> Result<(), ParquetFromCsvError> {
    let args = Args::parse();
    convert_csv_to_parquet(&args)
}

#[cfg(test)]
mod tests {
    use std::{
        io::Write,
        path::{Path, PathBuf},
    };

    use super::*;
    use arrow::datatypes::{DataType, Field};
    use brotli::CompressorWriter;
    use clap::{CommandFactory, Parser};
    use flate2::write::GzEncoder;
    use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
    use parquet::file::reader::{FileReader, SerializedFileReader};
    use snap::write::FrameEncoder;
    use tempfile::NamedTempFile;

    #[test]
    fn test_command_help() {
        let mut cmd = Args::command();
        let dir = std::env::var("CARGO_MANIFEST_DIR").unwrap();
        let mut path_buf = PathBuf::from(dir);
        path_buf.push("src");
        path_buf.push("bin");
        path_buf.push("parquet-fromcsv-help.txt");
        let expected = std::fs::read_to_string(path_buf).unwrap();
        let mut buffer_vec = Vec::new();
        let mut buffer = std::io::Cursor::new(&mut buffer_vec);
        cmd.write_long_help(&mut buffer).unwrap();
        // Remove the Parquet version string from the help text
        let mut actual = String::from_utf8(buffer_vec).unwrap();
        let pos = actual.find('\n').unwrap() + 1;
        actual = actual[pos..].to_string();
        assert_eq!(
            expected, actual,
            "help text does not match. please update to \n---\n{actual}\n---\n"
        )
    }

    fn parse_args(mut extra_args: Vec<&str>) -> Result<Args, ParquetFromCsvError> {
        let mut args = vec![
            "test",
            "--schema",
            "test.schema",
            "--input-file",
            "infile.csv",
            "--output-file",
            "out.parquet",
        ];
        args.append(&mut extra_args);
        let args = Args::try_parse_from(args.iter())?;
        Ok(args)
    }

    #[test]
    fn test_parse_arg_minimum() -> Result<(), ParquetFromCsvError> {
        let args = parse_args(vec![])?;

        assert_eq!(args.schema, PathBuf::from(Path::new("test.schema")));
        assert_eq!(args.input_file, PathBuf::from(Path::new("infile.csv")));
        assert_eq!(args.output_file, PathBuf::from(Path::new("out.parquet")));
        // test default values
        assert_eq!(args.input_format, CsvDialect::Csv);
        assert_eq!(args.batch_size, 1000);
        assert!(!args.has_header);
        assert_eq!(args.delimiter, None);
        assert_eq!(args.get_delimiter(), b',');
        assert_eq!(args.record_terminator, None);
        assert_eq!(args.get_terminator(), None); // CRLF
        assert_eq!(args.quote_char, None);
        assert_eq!(args.get_quote(), Some(b'\"'));
        assert_eq!(args.double_quote, None);
        assert_eq!(args.parquet_compression, Compression::SNAPPY);
        Ok(())
    }

    #[test]
    fn test_parse_arg_format_variants() -> Result<(), ParquetFromCsvError> {
        let args = parse_args(vec!["--input-format", "csv"])?;
        assert_eq!(args.input_format, CsvDialect::Csv);
        assert_eq!(args.get_delimiter(), b',');
        assert_eq!(args.get_terminator(), None); // CRLF
        assert_eq!(args.get_quote(), Some(b'\"'));
        assert_eq!(args.get_escape(), None);
        let args = parse_args(vec!["--input-format", "tsv"])?;
        assert_eq!(args.input_format, CsvDialect::Tsv);
        assert_eq!(args.get_delimiter(), b'\t');
        assert_eq!(args.get_terminator(), Some(b'\x0a')); // LF
        assert_eq!(args.get_quote(), None); // quote none
        assert_eq!(args.get_escape(), None);

        let args = parse_args(vec!["--input-format", "csv", "--escape-char", "\\"])?;
        assert_eq!(args.input_format, CsvDialect::Csv);
        assert_eq!(args.get_delimiter(), b',');
        assert_eq!(args.get_terminator(), None); // CRLF
        assert_eq!(args.get_quote(), Some(b'\"'));
        assert_eq!(args.get_escape(), Some(b'\\'));

        let args = parse_args(vec!["--input-format", "tsv", "--delimiter", ":"])?;
        assert_eq!(args.input_format, CsvDialect::Tsv);
        assert_eq!(args.get_delimiter(), b':');
        assert_eq!(args.get_terminator(), Some(b'\x0a')); // LF
        assert_eq!(args.get_quote(), None); // quote none
        assert_eq!(args.get_escape(), None);

        Ok(())
    }

    #[test]
    #[should_panic]
    fn test_parse_arg_format_error() {
        parse_args(vec!["--input-format", "excel"]).unwrap();
    }

    #[test]
    fn test_parse_arg_compression_format() {
        let args = parse_args(vec!["--parquet-compression", "uncompressed"]).unwrap();
        assert_eq!(args.parquet_compression, Compression::UNCOMPRESSED);
        let args = parse_args(vec!["--parquet-compression", "snappy"]).unwrap();
        assert_eq!(args.parquet_compression, Compression::SNAPPY);
        let args = parse_args(vec!["--parquet-compression", "gzip"]).unwrap();
        assert_eq!(
            args.parquet_compression,
            Compression::GZIP(Default::default())
        );
        let args = parse_args(vec!["--parquet-compression", "lzo"]).unwrap();
        assert_eq!(args.parquet_compression, Compression::LZO);
        let args = parse_args(vec!["--parquet-compression", "lz4"]).unwrap();
        assert_eq!(args.parquet_compression, Compression::LZ4);
        let args = parse_args(vec!["--parquet-compression", "brotli"]).unwrap();
        assert_eq!(
            args.parquet_compression,
            Compression::BROTLI(Default::default())
        );
        let args = parse_args(vec!["--parquet-compression", "zstd"]).unwrap();
        assert_eq!(
            args.parquet_compression,
            Compression::ZSTD(Default::default())
        );
    }

    #[test]
    fn test_parse_arg_compression_format_fail() {
        match parse_args(vec!["--parquet-compression", "zip"]) {
            Ok(_) => panic!("unexpected success"),
            Err(e) => {
                let err = e.to_string();
                assert!(err.contains("error: invalid value 'zip' for '--parquet-compression <PARQUET_COMPRESSION>': Unknown compression ZIP : possible values UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD \n\nFor more information try --help"), "{err}")
            }
        }
    }

    fn assert_debug_text(debug_text: &str, name: &str, value: &str) {
        let pattern = format!(" {name}: {value}");
        assert!(
            debug_text.contains(&pattern),
            "\"{debug_text}\" does not contain \"{pattern}\""
        )
    }

    #[test]
    fn test_configure_reader_builder() {
        let args = Args {
            schema: PathBuf::from(Path::new("schema.arvo")),
            input_file: PathBuf::from(Path::new("test.csv")),
            output_file: PathBuf::from(Path::new("out.parquet")),
            batch_size: 1000,
            input_format: CsvDialect::Csv,
            has_header: false,
            delimiter: None,
            record_terminator: None,
            escape_char: None,
            quote_char: None,
            double_quote: None,
            csv_compression: Compression::UNCOMPRESSED,
            parquet_compression: Compression::SNAPPY,
            writer_version: None,
            max_row_group_size: None,
            enable_bloom_filter: None,
            help: None,
        };
        let arrow_schema = Arc::new(Schema::new(vec![
            Field::new("field1", DataType::Utf8, false),
            Field::new("field2", DataType::Utf8, false),
            Field::new("field3", DataType::Utf8, false),
            Field::new("field4", DataType::Utf8, false),
            Field::new("field5", DataType::Utf8, false),
        ]));

        let reader_builder = configure_reader_builder(&args, arrow_schema);
        let builder_debug = format!("{reader_builder:?}");
        assert_debug_text(&builder_debug, "header", "false");
        assert_debug_text(&builder_debug, "delimiter", "Some(44)");
        assert_debug_text(&builder_debug, "quote", "Some(34)");
        assert_debug_text(&builder_debug, "terminator", "None");
        assert_debug_text(&builder_debug, "batch_size", "1000");
        assert_debug_text(&builder_debug, "escape", "None");

        let args = Args {
            schema: PathBuf::from(Path::new("schema.arvo")),
            input_file: PathBuf::from(Path::new("test.csv")),
            output_file: PathBuf::from(Path::new("out.parquet")),
            batch_size: 2000,
            input_format: CsvDialect::Tsv,
            has_header: true,
            delimiter: None,
            record_terminator: None,
            escape_char: Some('\\'),
            quote_char: None,
            double_quote: None,
            csv_compression: Compression::UNCOMPRESSED,
            parquet_compression: Compression::SNAPPY,
            writer_version: None,
            max_row_group_size: None,
            enable_bloom_filter: None,
            help: None,
        };
        let arrow_schema = Arc::new(Schema::new(vec![
            Field::new("field1", DataType::Utf8, false),
            Field::new("field2", DataType::Utf8, false),
            Field::new("field3", DataType::Utf8, false),
            Field::new("field4", DataType::Utf8, false),
            Field::new("field5", DataType::Utf8, false),
        ]));
        let reader_builder = configure_reader_builder(&args, arrow_schema);
        let builder_debug = format!("{reader_builder:?}");
        assert_debug_text(&builder_debug, "header", "true");
        assert_debug_text(&builder_debug, "delimiter", "Some(9)");
        assert_debug_text(&builder_debug, "quote", "None");
        assert_debug_text(&builder_debug, "terminator", "Some(10)");
        assert_debug_text(&builder_debug, "batch_size", "2000");
        assert_debug_text(&builder_debug, "escape", "Some(92)");
    }

    fn test_convert_compressed_csv_to_parquet(csv_compression: Compression) {
        let schema = NamedTempFile::new().unwrap();
        let schema_text = r"message my_amazing_schema {
            optional int32 id;
            optional binary name (STRING);
        }";
        schema.as_file().write_all(schema_text.as_bytes()).unwrap();

        let mut input_file = NamedTempFile::new().unwrap();

        fn write_tmp_file<T: Write>(w: &mut T) {
            for index in 1..2000 {
                write!(w, "{index},\"name_{index}\"\r\n").unwrap();
            }
            w.flush().unwrap();
        }

        // make sure the input_file's lifetime is long enough
        input_file = match csv_compression {
            Compression::UNCOMPRESSED => {
                write_tmp_file(&mut input_file);
                input_file
            }
            Compression::SNAPPY => {
                let mut encoder = FrameEncoder::new(input_file);
                write_tmp_file(&mut encoder);
                encoder.into_inner().unwrap()
            }
            Compression::GZIP(level) => {
                let mut encoder = GzEncoder::new(
                    input_file,
                    flate2::Compression::new(level.compression_level()),
                );
                write_tmp_file(&mut encoder);
                encoder.finish().unwrap()
            }
            Compression::BROTLI(level) => {
                let mut encoder =
                    CompressorWriter::new(input_file, 0, level.compression_level(), 0);
                write_tmp_file(&mut encoder);
                encoder.into_inner()
            }
            Compression::LZ4 => {
                let mut encoder = lz4_flex::frame::FrameEncoder::new(input_file);
                write_tmp_file(&mut encoder);
                encoder.finish().unwrap()
            }

            Compression::ZSTD(level) => {
                let mut encoder = zstd::Encoder::new(input_file, level.compression_level())
                    .map_err(|e| {
                        ParquetFromCsvError::with_context(e, "Failed to create zstd::Encoder")
                    })
                    .unwrap();
                write_tmp_file(&mut encoder);
                encoder.finish().unwrap()
            }
            d => unimplemented!("compression type {d}"),
        };

        let output_parquet = NamedTempFile::new().unwrap();

        let args = Args {
            schema: PathBuf::from(schema.path()),
            input_file: PathBuf::from(input_file.path()),
            output_file: PathBuf::from(output_parquet.path()),
            batch_size: 1000,
            input_format: CsvDialect::Csv,
            has_header: false,
            delimiter: None,
            record_terminator: None,
            escape_char: None,
            quote_char: None,
            double_quote: None,
            csv_compression,
            parquet_compression: Compression::SNAPPY,
            writer_version: None,
            max_row_group_size: None,
            // by default we shall test bloom filter writing
            enable_bloom_filter: Some(true),
            help: None,
        };
        convert_csv_to_parquet(&args).unwrap();

        let file = SerializedFileReader::new(output_parquet.into_file()).unwrap();
        let schema_name = file.metadata().file_metadata().schema().name();
        assert_eq!(schema_name, "my_amazing_schema");
    }

    #[test]
    fn test_convert_csv_to_parquet() {
        test_convert_compressed_csv_to_parquet(Compression::UNCOMPRESSED);
        test_convert_compressed_csv_to_parquet(Compression::SNAPPY);
        test_convert_compressed_csv_to_parquet(Compression::GZIP(GzipLevel::try_new(1).unwrap()));
        test_convert_compressed_csv_to_parquet(Compression::BROTLI(
            BrotliLevel::try_new(2).unwrap(),
        ));
        test_convert_compressed_csv_to_parquet(Compression::LZ4);
        test_convert_compressed_csv_to_parquet(Compression::ZSTD(ZstdLevel::try_new(1).unwrap()));
    }
}