1#![doc = include_str!("./parquet-fromcsv-help.txt")] use std::{
74 fmt::Display,
75 fs::{read_to_string, File},
76 io::Read,
77 path::{Path, PathBuf},
78 sync::Arc,
79};
80
81use arrow_csv::ReaderBuilder;
82use arrow_schema::{ArrowError, Schema};
83use clap::{Parser, ValueEnum};
84use parquet::arrow::arrow_writer::ArrowWriterOptions;
85use parquet::{
86 arrow::{parquet_to_arrow_schema, ArrowWriter},
87 basic::Compression,
88 errors::ParquetError,
89 file::properties::{WriterProperties, WriterVersion},
90 schema::{parser::parse_message_type, types::SchemaDescriptor},
91};
92
/// Errors that can occur while converting a CSV file to Parquet.
///
/// Each of the first four variants wraps the error type of one underlying
/// library; `WithContext` layers a human-readable note on top of another
/// `ParquetFromCsvError`.
#[derive(Debug)]
enum ParquetFromCsvError {
    /// The command line could not be parsed by clap.
    CommandLineParseError(clap::Error),
    /// An I/O operation failed.
    IoError(std::io::Error),
    /// An error reported through Arrow's `ArrowError`.
    ArrowError(ArrowError),
    /// An error reported through Parquet's `ParquetError`.
    ParquetError(ParquetError),
    /// A context message (first field) attached to the error that caused it
    /// (second field).
    WithContext(String, Box<Self>),
}
101
102impl From<std::io::Error> for ParquetFromCsvError {
103 fn from(e: std::io::Error) -> Self {
104 Self::IoError(e)
105 }
106}
107
108impl From<ArrowError> for ParquetFromCsvError {
109 fn from(e: ArrowError) -> Self {
110 Self::ArrowError(e)
111 }
112}
113
114impl From<ParquetError> for ParquetFromCsvError {
115 fn from(e: ParquetError) -> Self {
116 Self::ParquetError(e)
117 }
118}
119
120impl From<clap::Error> for ParquetFromCsvError {
121 fn from(e: clap::Error) -> Self {
122 Self::CommandLineParseError(e)
123 }
124}
125
126impl ParquetFromCsvError {
127 pub fn with_context<E: Into<ParquetFromCsvError>>(
128 inner_error: E,
129 context: &str,
130 ) -> ParquetFromCsvError {
131 let inner = inner_error.into();
132 ParquetFromCsvError::WithContext(context.to_string(), Box::new(inner))
133 }
134}
135
136impl Display for ParquetFromCsvError {
137 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138 match self {
139 ParquetFromCsvError::CommandLineParseError(e) => write!(f, "{e}"),
140 ParquetFromCsvError::IoError(e) => write!(f, "{e}"),
141 ParquetFromCsvError::ArrowError(e) => write!(f, "{e}"),
142 ParquetFromCsvError::ParquetError(e) => write!(f, "{e}"),
143 ParquetFromCsvError::WithContext(c, e) => {
144 writeln!(f, "{e}")?;
145 write!(f, "context: {c}")
146 }
147 }
148 }
149}
150
// Command-line arguments of the converter, parsed with clap's derive API.
//
// NOTE: the comments in this struct are plain `//` comments on purpose. clap's
// derive macro can pick up `///` doc comments as help text, and the rendered
// help is compared verbatim against `parquet-fromcsv-help.txt` by the
// `test_command_help` test.
//
// The built-in help flag is disabled (`disable_help_flag=true`) and replaced by
// the explicit `help` field at the bottom of the struct.
#[derive(Debug, Parser)]
#[clap(author, version, disable_help_flag=true, about("Binary to convert csv to Parquet"), long_about=None)]
struct Args {
    // Path of the text file holding the Parquet message schema; it is read
    // with `read_to_string` and parsed with `parse_message_type`.
    #[clap(short, long, help("message schema for output Parquet"))]
    schema: PathBuf,
    // Path of the CSV file to read.
    #[clap(short, long, help("input CSV file"))]
    input_file: PathBuf,
    // Path of the Parquet file to create.
    #[clap(short, long, help("output Parquet file"))]
    output_file: PathBuf,
    // Input dialect; selects the default delimiter, record terminator and
    // quote character (see the `get_*` methods on `Args`).
    #[clap(
        value_enum,
        short('f'),
        long,
        help("input file format"),
        default_value_t=CsvDialect::Csv
    )]
    input_format: CsvDialect,
    // Batch size handed to the CSV reader builder; can also be supplied via
    // the `PARQUET_FROM_CSV_BATCHSIZE` environment variable.
    #[clap(
        short,
        long,
        help("batch size"),
        default_value_t = 1000,
        env = "PARQUET_FROM_CSV_BATCHSIZE"
    )]
    batch_size: usize,
    // Header flag handed to the CSV reader builder (`with_header`).
    #[clap(short, long, help("has header"))]
    has_header: bool,
    // Explicit field delimiter; when absent the dialect default is used.
    #[clap(short, long, help("field delimiter"))]
    delimiter: Option<char>,
    // Explicit record terminator; when absent the dialect default is used.
    #[clap(value_enum, short, long, help("record terminator"))]
    record_terminator: Option<RecordTerminator>,
    // Escape character; no escape character is configured when absent.
    #[clap(short, long, help("escape character"))]
    escape_char: Option<char>,
    // Explicit quote character; when absent the dialect default is used.
    #[clap(short, long, help("quote character"))]
    quote_char: Option<char>,
    // NOTE(review): parsed but not consumed by any code visible in this file.
    #[clap(short('D'), long, help("double quote"))]
    double_quote: Option<bool>,
    // Compression of the *input* CSV file; selects the decoder wrapped around
    // the input in `convert_csv_to_parquet`.
    #[clap(short('C'), long, help("compression mode of csv"), default_value_t=Compression::UNCOMPRESSED)]
    #[clap(value_parser=compression_from_str)]
    csv_compression: Compression,
    // Compression applied to the *output* Parquet file.
    #[clap(short('c'), long, help("compression mode of parquet"), default_value_t=Compression::SNAPPY)]
    #[clap(value_parser=compression_from_str)]
    parquet_compression: Compression,

    // Parquet writer version ("1" or "2", see `writer_version_from_str`);
    // the writer properties builder's own setting is kept when absent.
    #[clap(short, long, help("writer version"))]
    #[clap(value_parser=writer_version_from_str)]
    writer_version: Option<WriterVersion>,
    // Maximum row group size; the builder's own setting is kept when absent.
    #[clap(short, long, help("max row group size"))]
    max_row_group_size: Option<usize>,
    // Bloom filter switch; the builder's own setting is kept when absent.
    #[clap(long, help("whether to enable bloom filter writing"))]
    enable_bloom_filter: Option<bool>,

    // Long-only `--help` flag using clap's `Help` action.
    // NOTE(review): presumably long-only because the short `-h` is derived
    // for `has_header` — confirm against the rendered help.
    #[clap(long, action=clap::ArgAction::Help, help("display usage help"))]
    help: Option<bool>,
}
217
218fn compression_from_str(cmp: &str) -> Result<Compression, String> {
219 match cmp.to_uppercase().as_str() {
220 "UNCOMPRESSED" => Ok(Compression::UNCOMPRESSED),
221 "SNAPPY" => Ok(Compression::SNAPPY),
222 "GZIP" => Ok(Compression::GZIP(Default::default())),
223 "LZO" => Ok(Compression::LZO),
224 "BROTLI" => Ok(Compression::BROTLI(Default::default())),
225 "LZ4" => Ok(Compression::LZ4),
226 "ZSTD" => Ok(Compression::ZSTD(Default::default())),
227 v => Err(
228 format!("Unknown compression {v} : possible values UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD \n\nFor more information try --help")
229 )
230 }
231}
232
233fn writer_version_from_str(cmp: &str) -> Result<WriterVersion, String> {
234 match cmp.to_uppercase().as_str() {
235 "1" => Ok(WriterVersion::PARQUET_1_0),
236 "2" => Ok(WriterVersion::PARQUET_2_0),
237 v => Err(format!("Unknown writer version {v} : possible values 1, 2")),
238 }
239}
240
241impl Args {
242 fn schema_path(&self) -> &Path {
243 self.schema.as_path()
244 }
245 fn get_delimiter(&self) -> u8 {
246 match self.delimiter {
247 Some(ch) => ch as u8,
248 None => match self.input_format {
249 CsvDialect::Csv => b',',
250 CsvDialect::Tsv => b'\t',
251 },
252 }
253 }
254 fn get_terminator(&self) -> Option<u8> {
255 match self.record_terminator {
256 Some(RecordTerminator::LF) => Some(0x0a),
257 Some(RecordTerminator::CR) => Some(0x0d),
258 Some(RecordTerminator::Crlf) => None,
259 None => match self.input_format {
260 CsvDialect::Csv => None,
261 CsvDialect::Tsv => Some(0x0a),
262 },
263 }
264 }
265 fn get_escape(&self) -> Option<u8> {
266 self.escape_char.map(|ch| ch as u8)
267 }
268 fn get_quote(&self) -> Option<u8> {
269 if self.quote_char.is_none() {
270 match self.input_format {
271 CsvDialect::Csv => Some(b'\"'),
272 CsvDialect::Tsv => None,
273 }
274 } else {
275 self.quote_char.map(|c| c as u8)
276 }
277 }
278}
279
// Input file dialect selectable with `--input-format`.
//
// The dialect only supplies defaults (see `Args::get_delimiter`,
// `Args::get_terminator` and `Args::get_quote`); explicit options override
// them. Plain `//` comments are used so that nothing here can leak into the
// clap-generated help text checked by `test_command_help`.
#[derive(Debug, Clone, Copy, ValueEnum, PartialEq)]
enum CsvDialect {
    // Comma-delimited, `"` as quote character, no explicit terminator.
    Csv,
    // Tab-delimited, no quote character, `0x0a` as terminator.
    Tsv,
}
285
// Record terminator selectable with `--record-terminator`.
//
// `Args::get_terminator` translates the variants into the byte handed to the
// CSV reader builder. Plain `//` comments are used so that nothing here can
// leak into the clap-generated help text checked by `test_command_help`.
#[derive(Debug, Clone, Copy, ValueEnum, PartialEq)]
enum RecordTerminator {
    // Mapped to the byte `0x0a`.
    LF,
    // Mapped to `None`, i.e. no explicit terminator is set on the reader.
    Crlf,
    // Mapped to the byte `0x0d`.
    CR,
}
292
293fn configure_writer_properties(args: &Args) -> WriterProperties {
294 let mut properties_builder =
295 WriterProperties::builder().set_compression(args.parquet_compression);
296 if let Some(writer_version) = args.writer_version {
297 properties_builder = properties_builder.set_writer_version(writer_version);
298 }
299 if let Some(max_row_group_size) = args.max_row_group_size {
300 properties_builder = properties_builder.set_max_row_group_size(max_row_group_size);
301 }
302 if let Some(enable_bloom_filter) = args.enable_bloom_filter {
303 properties_builder = properties_builder.set_bloom_filter_enabled(enable_bloom_filter);
304 }
305 properties_builder.build()
306}
307
308fn configure_reader_builder(args: &Args, arrow_schema: Arc<Schema>) -> ReaderBuilder {
309 fn configure_reader<T, F: Fn(ReaderBuilder, T) -> ReaderBuilder>(
310 builder: ReaderBuilder,
311 value: Option<T>,
312 fun: F,
313 ) -> ReaderBuilder {
314 if let Some(val) = value {
315 fun(builder, val)
316 } else {
317 builder
318 }
319 }
320
321 let mut builder = ReaderBuilder::new(arrow_schema)
322 .with_batch_size(args.batch_size)
323 .with_header(args.has_header)
324 .with_delimiter(args.get_delimiter());
325
326 builder = configure_reader(
327 builder,
328 args.get_terminator(),
329 ReaderBuilder::with_terminator,
330 );
331 builder = configure_reader(builder, args.get_escape(), ReaderBuilder::with_escape);
332 builder = configure_reader(builder, args.get_quote(), ReaderBuilder::with_quote);
333
334 builder
335}
336
337fn convert_csv_to_parquet(args: &Args) -> Result<(), ParquetFromCsvError> {
338 let schema = read_to_string(args.schema_path()).map_err(|e| {
339 ParquetFromCsvError::with_context(
340 e,
341 &format!("Failed to open schema file {:#?}", args.schema_path()),
342 )
343 })?;
344 let parquet_schema = Arc::new(parse_message_type(&schema)?);
345 let desc = SchemaDescriptor::new(parquet_schema);
346 let arrow_schema = Arc::new(parquet_to_arrow_schema(&desc, None)?);
347
348 let parquet_file = File::create(&args.output_file).map_err(|e| {
350 ParquetFromCsvError::with_context(
351 e,
352 &format!("Failed to create output file {:#?}", &args.output_file),
353 )
354 })?;
355
356 let options = ArrowWriterOptions::new()
357 .with_properties(configure_writer_properties(args))
358 .with_schema_root(desc.name().to_string());
359
360 let mut arrow_writer =
361 ArrowWriter::try_new_with_options(parquet_file, arrow_schema.clone(), options)
362 .map_err(|e| ParquetFromCsvError::with_context(e, "Failed to create ArrowWriter"))?;
363
364 let input_file = File::open(&args.input_file).map_err(|e| {
366 ParquetFromCsvError::with_context(
367 e,
368 &format!("Failed to open input file {:#?}", &args.input_file),
369 )
370 })?;
371
372 let input_file_decoder = match args.csv_compression {
374 Compression::UNCOMPRESSED => Box::new(input_file) as Box<dyn Read>,
375 Compression::SNAPPY => Box::new(snap::read::FrameDecoder::new(input_file)) as Box<dyn Read>,
376 Compression::GZIP(_) => {
377 Box::new(flate2::read::MultiGzDecoder::new(input_file)) as Box<dyn Read>
378 }
379 Compression::BROTLI(_) => {
380 Box::new(brotli::Decompressor::new(input_file, 0)) as Box<dyn Read>
381 }
382 Compression::LZ4 => {
383 Box::new(lz4_flex::frame::FrameDecoder::new(input_file)) as Box<dyn Read>
384 }
385 Compression::ZSTD(_) => {
386 Box::new(zstd::Decoder::new(input_file).map_err(|e| {
387 ParquetFromCsvError::with_context(e, "Failed to create zstd::Decoder")
388 })?) as Box<dyn Read>
389 }
390 d => unimplemented!("compression type {d}"),
391 };
392
393 let builder = configure_reader_builder(args, arrow_schema);
395 let reader = builder.build(input_file_decoder)?;
396 for batch_result in reader {
397 let batch = batch_result.map_err(|e| {
398 ParquetFromCsvError::with_context(e, "Failed to read RecordBatch from CSV")
399 })?;
400 arrow_writer.write(&batch).map_err(|e| {
401 ParquetFromCsvError::with_context(e, "Failed to write RecordBatch to parquet")
402 })?;
403 }
404 arrow_writer
405 .close()
406 .map_err(|e| ParquetFromCsvError::with_context(e, "Failed to close parquet"))?;
407 Ok(())
408}
409
410fn main() -> Result<(), ParquetFromCsvError> {
411 let args = Args::parse();
412 convert_csv_to_parquet(&args)
413}
414
415#[cfg(test)]
416mod tests {
417 use std::{
418 io::Write,
419 path::{Path, PathBuf},
420 };
421
422 use super::*;
423 use arrow::datatypes::{DataType, Field};
424 use brotli::CompressorWriter;
425 use clap::{CommandFactory, Parser};
426 use flate2::write::GzEncoder;
427 use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
428 use parquet::file::reader::{FileReader, SerializedFileReader};
429 use snap::write::FrameEncoder;
430 use tempfile::NamedTempFile;
431
432 #[test]
433 fn test_command_help() {
434 let mut cmd = Args::command();
435 let dir = std::env::var("CARGO_MANIFEST_DIR").unwrap();
436 let mut path_buf = PathBuf::from(dir);
437 path_buf.push("src");
438 path_buf.push("bin");
439 path_buf.push("parquet-fromcsv-help.txt");
440 let expected = std::fs::read_to_string(path_buf).unwrap();
441 let mut buffer_vec = Vec::new();
442 let mut buffer = std::io::Cursor::new(&mut buffer_vec);
443 cmd.write_long_help(&mut buffer).unwrap();
444 let mut actual = String::from_utf8(buffer_vec).unwrap();
446 let pos = actual.find('\n').unwrap() + 1;
447 actual = actual[pos..].to_string();
448 assert_eq!(
449 expected, actual,
450 "help text not match. please update to \n---\n{actual}\n---\n"
451 )
452 }
453
454 fn parse_args(mut extra_args: Vec<&str>) -> Result<Args, ParquetFromCsvError> {
455 let mut args = vec![
456 "test",
457 "--schema",
458 "test.schema",
459 "--input-file",
460 "infile.csv",
461 "--output-file",
462 "out.parquet",
463 ];
464 args.append(&mut extra_args);
465 let args = Args::try_parse_from(args.iter())?;
466 Ok(args)
467 }
468
469 #[test]
470 fn test_parse_arg_minimum() -> Result<(), ParquetFromCsvError> {
471 let args = parse_args(vec![])?;
472
473 assert_eq!(args.schema, PathBuf::from(Path::new("test.schema")));
474 assert_eq!(args.input_file, PathBuf::from(Path::new("infile.csv")));
475 assert_eq!(args.output_file, PathBuf::from(Path::new("out.parquet")));
476 assert_eq!(args.input_format, CsvDialect::Csv);
478 assert_eq!(args.batch_size, 1000);
479 assert!(!args.has_header);
480 assert_eq!(args.delimiter, None);
481 assert_eq!(args.get_delimiter(), b',');
482 assert_eq!(args.record_terminator, None);
483 assert_eq!(args.get_terminator(), None); assert_eq!(args.quote_char, None);
485 assert_eq!(args.get_quote(), Some(b'\"'));
486 assert_eq!(args.double_quote, None);
487 assert_eq!(args.parquet_compression, Compression::SNAPPY);
488 Ok(())
489 }
490
491 #[test]
492 fn test_parse_arg_format_variants() -> Result<(), ParquetFromCsvError> {
493 let args = parse_args(vec!["--input-format", "csv"])?;
494 assert_eq!(args.input_format, CsvDialect::Csv);
495 assert_eq!(args.get_delimiter(), b',');
496 assert_eq!(args.get_terminator(), None); assert_eq!(args.get_quote(), Some(b'\"'));
498 assert_eq!(args.get_escape(), None);
499 let args = parse_args(vec!["--input-format", "tsv"])?;
500 assert_eq!(args.input_format, CsvDialect::Tsv);
501 assert_eq!(args.get_delimiter(), b'\t');
502 assert_eq!(args.get_terminator(), Some(b'\x0a')); assert_eq!(args.get_quote(), None); assert_eq!(args.get_escape(), None);
505
506 let args = parse_args(vec!["--input-format", "csv", "--escape-char", "\\"])?;
507 assert_eq!(args.input_format, CsvDialect::Csv);
508 assert_eq!(args.get_delimiter(), b',');
509 assert_eq!(args.get_terminator(), None); assert_eq!(args.get_quote(), Some(b'\"'));
511 assert_eq!(args.get_escape(), Some(b'\\'));
512
513 let args = parse_args(vec!["--input-format", "tsv", "--delimiter", ":"])?;
514 assert_eq!(args.input_format, CsvDialect::Tsv);
515 assert_eq!(args.get_delimiter(), b':');
516 assert_eq!(args.get_terminator(), Some(b'\x0a')); assert_eq!(args.get_quote(), None); assert_eq!(args.get_escape(), None);
519
520 Ok(())
521 }
522
523 #[test]
524 #[should_panic]
525 fn test_parse_arg_format_error() {
526 parse_args(vec!["--input-format", "excel"]).unwrap();
527 }
528
529 #[test]
530 fn test_parse_arg_compression_format() {
531 let args = parse_args(vec!["--parquet-compression", "uncompressed"]).unwrap();
532 assert_eq!(args.parquet_compression, Compression::UNCOMPRESSED);
533 let args = parse_args(vec!["--parquet-compression", "snappy"]).unwrap();
534 assert_eq!(args.parquet_compression, Compression::SNAPPY);
535 let args = parse_args(vec!["--parquet-compression", "gzip"]).unwrap();
536 assert_eq!(
537 args.parquet_compression,
538 Compression::GZIP(Default::default())
539 );
540 let args = parse_args(vec!["--parquet-compression", "lzo"]).unwrap();
541 assert_eq!(args.parquet_compression, Compression::LZO);
542 let args = parse_args(vec!["--parquet-compression", "lz4"]).unwrap();
543 assert_eq!(args.parquet_compression, Compression::LZ4);
544 let args = parse_args(vec!["--parquet-compression", "brotli"]).unwrap();
545 assert_eq!(
546 args.parquet_compression,
547 Compression::BROTLI(Default::default())
548 );
549 let args = parse_args(vec!["--parquet-compression", "zstd"]).unwrap();
550 assert_eq!(
551 args.parquet_compression,
552 Compression::ZSTD(Default::default())
553 );
554 }
555
556 #[test]
557 fn test_parse_arg_compression_format_fail() {
558 match parse_args(vec!["--parquet-compression", "zip"]) {
559 Ok(_) => panic!("unexpected success"),
560 Err(e) => {
561 let err = e.to_string();
562 assert!(err.contains("error: invalid value 'zip' for '--parquet-compression <PARQUET_COMPRESSION>': Unknown compression ZIP : possible values UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD \n\nFor more information try --help"), "{err}")
563 }
564 }
565 }
566
567 fn assert_debug_text(debug_text: &str, name: &str, value: &str) {
568 let pattern = format!(" {name}: {value}");
569 assert!(
570 debug_text.contains(&pattern),
571 "\"{debug_text}\" not contains \"{pattern}\""
572 )
573 }
574
575 #[test]
576 fn test_configure_reader_builder() {
577 let args = Args {
578 schema: PathBuf::from(Path::new("schema.arvo")),
579 input_file: PathBuf::from(Path::new("test.csv")),
580 output_file: PathBuf::from(Path::new("out.parquet")),
581 batch_size: 1000,
582 input_format: CsvDialect::Csv,
583 has_header: false,
584 delimiter: None,
585 record_terminator: None,
586 escape_char: None,
587 quote_char: None,
588 double_quote: None,
589 csv_compression: Compression::UNCOMPRESSED,
590 parquet_compression: Compression::SNAPPY,
591 writer_version: None,
592 max_row_group_size: None,
593 enable_bloom_filter: None,
594 help: None,
595 };
596 let arrow_schema = Arc::new(Schema::new(vec![
597 Field::new("field1", DataType::Utf8, false),
598 Field::new("field2", DataType::Utf8, false),
599 Field::new("field3", DataType::Utf8, false),
600 Field::new("field4", DataType::Utf8, false),
601 Field::new("field5", DataType::Utf8, false),
602 ]));
603
604 let reader_builder = configure_reader_builder(&args, arrow_schema);
605 let builder_debug = format!("{reader_builder:?}");
606 assert_debug_text(&builder_debug, "header", "false");
607 assert_debug_text(&builder_debug, "delimiter", "Some(44)");
608 assert_debug_text(&builder_debug, "quote", "Some(34)");
609 assert_debug_text(&builder_debug, "terminator", "None");
610 assert_debug_text(&builder_debug, "batch_size", "1000");
611 assert_debug_text(&builder_debug, "escape", "None");
612
613 let args = Args {
614 schema: PathBuf::from(Path::new("schema.arvo")),
615 input_file: PathBuf::from(Path::new("test.csv")),
616 output_file: PathBuf::from(Path::new("out.parquet")),
617 batch_size: 2000,
618 input_format: CsvDialect::Tsv,
619 has_header: true,
620 delimiter: None,
621 record_terminator: None,
622 escape_char: Some('\\'),
623 quote_char: None,
624 double_quote: None,
625 csv_compression: Compression::UNCOMPRESSED,
626 parquet_compression: Compression::SNAPPY,
627 writer_version: None,
628 max_row_group_size: None,
629 enable_bloom_filter: None,
630 help: None,
631 };
632 let arrow_schema = Arc::new(Schema::new(vec![
633 Field::new("field1", DataType::Utf8, false),
634 Field::new("field2", DataType::Utf8, false),
635 Field::new("field3", DataType::Utf8, false),
636 Field::new("field4", DataType::Utf8, false),
637 Field::new("field5", DataType::Utf8, false),
638 ]));
639 let reader_builder = configure_reader_builder(&args, arrow_schema);
640 let builder_debug = format!("{reader_builder:?}");
641 assert_debug_text(&builder_debug, "header", "true");
642 assert_debug_text(&builder_debug, "delimiter", "Some(9)");
643 assert_debug_text(&builder_debug, "quote", "None");
644 assert_debug_text(&builder_debug, "terminator", "Some(10)");
645 assert_debug_text(&builder_debug, "batch_size", "2000");
646 assert_debug_text(&builder_debug, "escape", "Some(92)");
647 }
648
649 fn test_convert_compressed_csv_to_parquet(csv_compression: Compression) {
650 let schema = NamedTempFile::new().unwrap();
651 let schema_text = r"message my_amazing_schema {
652 optional int32 id;
653 optional binary name (STRING);
654 }";
655 schema.as_file().write_all(schema_text.as_bytes()).unwrap();
656
657 let mut input_file = NamedTempFile::new().unwrap();
658
659 fn write_tmp_file<T: Write>(w: &mut T) {
660 for index in 1..2000 {
661 write!(w, "{index},\"name_{index}\"\r\n").unwrap();
662 }
663 w.flush().unwrap();
664 }
665
666 input_file = match csv_compression {
668 Compression::UNCOMPRESSED => {
669 write_tmp_file(&mut input_file);
670 input_file
671 }
672 Compression::SNAPPY => {
673 let mut encoder = FrameEncoder::new(input_file);
674 write_tmp_file(&mut encoder);
675 encoder.into_inner().unwrap()
676 }
677 Compression::GZIP(level) => {
678 let mut encoder = GzEncoder::new(
679 input_file,
680 flate2::Compression::new(level.compression_level()),
681 );
682 write_tmp_file(&mut encoder);
683 encoder.finish().unwrap()
684 }
685 Compression::BROTLI(level) => {
686 let mut encoder =
687 CompressorWriter::new(input_file, 0, level.compression_level(), 0);
688 write_tmp_file(&mut encoder);
689 encoder.into_inner()
690 }
691 Compression::LZ4 => {
692 let mut encoder = lz4_flex::frame::FrameEncoder::new(input_file);
693 write_tmp_file(&mut encoder);
694 encoder.finish().unwrap()
695 }
696
697 Compression::ZSTD(level) => {
698 let mut encoder = zstd::Encoder::new(input_file, level.compression_level())
699 .map_err(|e| {
700 ParquetFromCsvError::with_context(e, "Failed to create zstd::Encoder")
701 })
702 .unwrap();
703 write_tmp_file(&mut encoder);
704 encoder.finish().unwrap()
705 }
706 d => unimplemented!("compression type {d}"),
707 };
708
709 let output_parquet = NamedTempFile::new().unwrap();
710
711 let args = Args {
712 schema: PathBuf::from(schema.path()),
713 input_file: PathBuf::from(input_file.path()),
714 output_file: PathBuf::from(output_parquet.path()),
715 batch_size: 1000,
716 input_format: CsvDialect::Csv,
717 has_header: false,
718 delimiter: None,
719 record_terminator: None,
720 escape_char: None,
721 quote_char: None,
722 double_quote: None,
723 csv_compression,
724 parquet_compression: Compression::SNAPPY,
725 writer_version: None,
726 max_row_group_size: None,
727 enable_bloom_filter: Some(true),
729 help: None,
730 };
731 convert_csv_to_parquet(&args).unwrap();
732
733 let file = SerializedFileReader::new(output_parquet.into_file()).unwrap();
734 let schema_name = file.metadata().file_metadata().schema().name();
735 assert_eq!(schema_name, "my_amazing_schema");
736 }
737
738 #[test]
739 fn test_convert_csv_to_parquet() {
740 test_convert_compressed_csv_to_parquet(Compression::UNCOMPRESSED);
741 test_convert_compressed_csv_to_parquet(Compression::SNAPPY);
742 test_convert_compressed_csv_to_parquet(Compression::GZIP(GzipLevel::try_new(1).unwrap()));
743 test_convert_compressed_csv_to_parquet(Compression::BROTLI(
744 BrotliLevel::try_new(2).unwrap(),
745 ));
746 test_convert_compressed_csv_to_parquet(Compression::LZ4);
747 test_convert_compressed_csv_to_parquet(Compression::ZSTD(ZstdLevel::try_new(1).unwrap()));
748 }
749}