#![doc = include_str!("./parquet-fromcsv-help.txt")]

use std::{
    fmt::Display,
    fs::{File, read_to_string},
    io::Read,
    path::{Path, PathBuf},
    sync::Arc,
};

use arrow_csv::ReaderBuilder;
use arrow_schema::{ArrowError, Schema};
use clap::{Parser, ValueEnum};
use parquet::arrow::arrow_writer::ArrowWriterOptions;
use parquet::{
    arrow::{ArrowWriter, parquet_to_arrow_schema},
    basic::Compression,
    errors::ParquetError,
    file::properties::{WriterProperties, WriterVersion},
    schema::{parser::parse_message_type, types::SchemaDescriptor},
};

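/// Error returned by the conversion, wrapping the underlying clap, I/O,
/// Arrow, and Parquet errors, optionally annotated with a context message.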
#[derive(Debug)]
enum ParquetFromCsvError {
    CommandLineParseError(clap::Error),
    IoError(std::io::Error),
    ArrowError(ArrowError),
    ParquetError(ParquetError),
    WithContext(String, Box<Self>),
}

impl From<std::io::Error> for ParquetFromCsvError {
    fn from(e: std::io::Error) -> Self {
        Self::IoError(e)
    }
}

impl From<ArrowError> for ParquetFromCsvError {
    fn from(e: ArrowError) -> Self {
        Self::ArrowError(e)
    }
}

impl From<ParquetError> for ParquetFromCsvError {
    fn from(e: ParquetError) -> Self {
        Self::ParquetError(e)
    }
}

impl From<clap::Error> for ParquetFromCsvError {
    fn from(e: clap::Error) -> Self {
        Self::CommandLineParseError(e)
    }
}

impl ParquetFromCsvError {
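    /// Wrap an error in [`ParquetFromCsvError::WithContext`], attaching a
    /// message describing what was being attempted when it failed.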
    pub fn with_context<E: Into<ParquetFromCsvError>>(
        inner_error: E,
        context: &str,
    ) -> ParquetFromCsvError {
        let inner = inner_error.into();
        ParquetFromCsvError::WithContext(context.to_string(), Box::new(inner))
    }
}

impl Display for ParquetFromCsvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ParquetFromCsvError::CommandLineParseError(e) => write!(f, "{e}"),
            ParquetFromCsvError::IoError(e) => write!(f, "{e}"),
            ParquetFromCsvError::ArrowError(e) => write!(f, "{e}"),
            ParquetFromCsvError::ParquetError(e) => write!(f, "{e}"),
            ParquetFromCsvError::WithContext(c, e) => {
                writeln!(f, "{e}")?;
                write!(f, "context: {c}")
            }
        }
    }
}

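/// Command line arguments, parsed by clap. A typical invocation looks like
/// `parquet-fromcsv --schema message.schema --input-file data.csv --output-file out.parquet`
/// (see `parquet-fromcsv-help.txt` for the full usage text).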
#[derive(Debug, Parser)]
#[clap(author, version, disable_help_flag=true, about("Binary to convert csv to Parquet"), long_about=None)]
struct Args {
    #[clap(short, long, help("message schema for output Parquet"))]
    schema: PathBuf,
    #[clap(short, long, help("input CSV file"))]
    input_file: PathBuf,
    #[clap(short, long, help("output Parquet file"))]
    output_file: PathBuf,
    #[clap(
        value_enum,
        short('f'),
        long,
        help("input file format"),
        default_value_t=CsvDialect::Csv
    )]
    input_format: CsvDialect,
    #[clap(
        short,
        long,
        help("batch size"),
        default_value_t = 1000,
        env = "PARQUET_FROM_CSV_BATCHSIZE"
    )]
    batch_size: usize,
    #[clap(short, long, help("has header"))]
    has_header: bool,
    #[clap(short, long, help("field delimiter"))]
    delimiter: Option<char>,
    #[clap(value_enum, short, long, help("record terminator"))]
    record_terminator: Option<RecordTerminator>,
    #[clap(short, long, help("escape character"))]
    escape_char: Option<char>,
    #[clap(short, long, help("quote character"))]
    quote_char: Option<char>,
    #[clap(short('D'), long, help("double quote"))]
    double_quote: Option<bool>,
    #[clap(short('C'), long, help("compression mode of csv"), default_value_t=Compression::UNCOMPRESSED)]
    #[clap(value_parser=compression_from_str)]
    csv_compression: Compression,
    #[clap(short('c'), long, help("compression mode of parquet"), default_value_t=Compression::SNAPPY)]
    #[clap(value_parser=compression_from_str)]
    parquet_compression: Compression,

    #[clap(short, long, help("writer version"))]
    #[clap(value_parser=writer_version_from_str)]
    writer_version: Option<WriterVersion>,
    #[clap(short, long, help("max row group size"))]
    max_row_group_size: Option<usize>,
    #[clap(long, help("whether to enable bloom filter writing"))]
    enable_bloom_filter: Option<bool>,

    #[clap(long, action=clap::ArgAction::Help, help("display usage help"))]
    help: Option<bool>,
}

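/// clap value parser: map a case-insensitive codec name such as "snappy" or
/// "ZSTD" to a [`Compression`] mode, using default levels for leveled codecs.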
fn compression_from_str(cmp: &str) -> Result<Compression, String> {
    match cmp.to_uppercase().as_str() {
        "UNCOMPRESSED" => Ok(Compression::UNCOMPRESSED),
        "SNAPPY" => Ok(Compression::SNAPPY),
        "GZIP" => Ok(Compression::GZIP(Default::default())),
        "LZO" => Ok(Compression::LZO),
        "BROTLI" => Ok(Compression::BROTLI(Default::default())),
        "LZ4" => Ok(Compression::LZ4),
        "ZSTD" => Ok(Compression::ZSTD(Default::default())),
        v => Err(format!(
            "Unknown compression {v} : possible values UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD \n\nFor more information try --help"
        )),
    }
}

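/// clap value parser: map "1" or "2" to the corresponding Parquet writer version.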
fn writer_version_from_str(cmp: &str) -> Result<WriterVersion, String> {
    match cmp.to_uppercase().as_str() {
        "1" => Ok(WriterVersion::PARQUET_1_0),
        "2" => Ok(WriterVersion::PARQUET_2_0),
        v => Err(format!("Unknown writer version {v} : possible values 1, 2")),
    }
}

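// Dialect defaults applied by the getters below when no explicit option is given:
//   CSV: delimiter b',', reader-default terminator (which accepts CRLF), quote b'"'
//   TSV: delimiter b'\t', LF terminator, no quoting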
impl Args {
    fn schema_path(&self) -> &Path {
        self.schema.as_path()
    }
    fn get_delimiter(&self) -> u8 {
        match self.delimiter {
            Some(ch) => ch as u8,
            None => match self.input_format {
                CsvDialect::Csv => b',',
                CsvDialect::Tsv => b'\t',
            },
        }
    }
    fn get_terminator(&self) -> Option<u8> {
        match self.record_terminator {
            Some(RecordTerminator::LF) => Some(0x0a),
            Some(RecordTerminator::CR) => Some(0x0d),
            // None leaves the reader on its default terminator handling
            Some(RecordTerminator::Crlf) => None,
            None => match self.input_format {
                CsvDialect::Csv => None,
                CsvDialect::Tsv => Some(0x0a),
            },
        }
    }
    fn get_escape(&self) -> Option<u8> {
        self.escape_char.map(|ch| ch as u8)
    }
    fn get_quote(&self) -> Option<u8> {
        if self.quote_char.is_none() {
            match self.input_format {
                CsvDialect::Csv => Some(b'\"'),
                CsvDialect::Tsv => None,
            }
        } else {
            self.quote_char.map(|c| c as u8)
        }
    }
}

#[derive(Debug, Clone, Copy, ValueEnum, PartialEq)]
enum CsvDialect {
    Csv,
    Tsv,
}

#[derive(Debug, Clone, Copy, ValueEnum, PartialEq)]
enum RecordTerminator {
    LF,
    Crlf,
    CR,
}

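/// Build the Parquet [`WriterProperties`] requested on the command line;
/// options left unset keep the builder defaults.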
fn configure_writer_properties(args: &Args) -> WriterProperties {
    let mut properties_builder =
        WriterProperties::builder().set_compression(args.parquet_compression);
    if let Some(writer_version) = args.writer_version {
        properties_builder = properties_builder.set_writer_version(writer_version);
    }
    if let Some(max_row_group_size) = args.max_row_group_size {
        properties_builder =
            properties_builder.set_max_row_group_row_count(Some(max_row_group_size));
    }
    if let Some(enable_bloom_filter) = args.enable_bloom_filter {
        properties_builder = properties_builder.set_bloom_filter_enabled(enable_bloom_filter);
    }
    properties_builder.build()
}

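/// Build an Arrow CSV [`ReaderBuilder`] from the command line arguments;
/// terminator, escape, and quote are applied only when a value is available.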
fn configure_reader_builder(args: &Args, arrow_schema: Arc<Schema>) -> ReaderBuilder {
    fn configure_reader<T, F: Fn(ReaderBuilder, T) -> ReaderBuilder>(
        builder: ReaderBuilder,
        value: Option<T>,
        fun: F,
    ) -> ReaderBuilder {
        if let Some(val) = value {
            fun(builder, val)
        } else {
            builder
        }
    }

    let mut builder = ReaderBuilder::new(arrow_schema)
        .with_batch_size(args.batch_size)
        .with_header(args.has_header)
        .with_delimiter(args.get_delimiter());

    builder = configure_reader(
        builder,
        args.get_terminator(),
        ReaderBuilder::with_terminator,
    );
    builder = configure_reader(builder, args.get_escape(), ReaderBuilder::with_escape);
    builder = configure_reader(builder, args.get_quote(), ReaderBuilder::with_quote);

    builder
}

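/// Run the conversion: parse the Parquet message type from the schema file,
/// derive the matching Arrow schema, then stream record batches from the
/// (possibly compressed) CSV input into an [`ArrowWriter`].
///
/// The schema file holds a Parquet message type, for example:
///
/// ```text
/// message my_amazing_schema {
///     optional int32 id;
///     optional binary name (STRING);
/// }
/// ```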
fn convert_csv_to_parquet(args: &Args) -> Result<(), ParquetFromCsvError> {
    let schema = read_to_string(args.schema_path()).map_err(|e| {
        ParquetFromCsvError::with_context(
            e,
            &format!("Failed to open schema file {:#?}", args.schema_path()),
        )
    })?;
    let parquet_schema = Arc::new(parse_message_type(&schema)?);
    let desc = SchemaDescriptor::new(parquet_schema);
    let arrow_schema = Arc::new(parquet_to_arrow_schema(&desc, None)?);

    let parquet_file = File::create(&args.output_file).map_err(|e| {
        ParquetFromCsvError::with_context(
            e,
            &format!("Failed to create output file {:#?}", &args.output_file),
        )
    })?;

    let options = ArrowWriterOptions::new()
        .with_properties(configure_writer_properties(args))
        .with_schema_root(desc.name().to_string());

    let mut arrow_writer =
        ArrowWriter::try_new_with_options(parquet_file, arrow_schema.clone(), options)
            .map_err(|e| ParquetFromCsvError::with_context(e, "Failed to create ArrowWriter"))?;

    let input_file = File::open(&args.input_file).map_err(|e| {
        ParquetFromCsvError::with_context(
            e,
            &format!("Failed to open input file {:#?}", &args.input_file),
        )
    })?;

    // Wrap the input in a streaming decoder matching --csv-compression.
    let input_file_decoder = match args.csv_compression {
        Compression::UNCOMPRESSED => Box::new(input_file) as Box<dyn Read>,
        Compression::SNAPPY => Box::new(snap::read::FrameDecoder::new(input_file)) as Box<dyn Read>,
        Compression::GZIP(_) => {
            Box::new(flate2::read::MultiGzDecoder::new(input_file)) as Box<dyn Read>
        }
        Compression::BROTLI(_) => {
            Box::new(brotli::Decompressor::new(input_file, 0)) as Box<dyn Read>
        }
        Compression::LZ4 => {
            Box::new(lz4_flex::frame::FrameDecoder::new(input_file)) as Box<dyn Read>
        }
        Compression::ZSTD(_) => {
            Box::new(zstd::Decoder::new(input_file).map_err(|e| {
                ParquetFromCsvError::with_context(e, "Failed to create zstd::Decoder")
            })?) as Box<dyn Read>
        }
        // Remaining codecs (e.g. LZO) have no decoder wired up here.
        d => unimplemented!("compression type {d}"),
    };

    let builder = configure_reader_builder(args, arrow_schema);
    let reader = builder.build(input_file_decoder)?;
    for batch_result in reader {
        let batch = batch_result.map_err(|e| {
            ParquetFromCsvError::with_context(e, "Failed to read RecordBatch from CSV")
        })?;
        arrow_writer.write(&batch).map_err(|e| {
            ParquetFromCsvError::with_context(e, "Failed to write RecordBatch to parquet")
        })?;
    }
    arrow_writer
        .close()
        .map_err(|e| ParquetFromCsvError::with_context(e, "Failed to close parquet"))?;
    Ok(())
}

fn main() -> Result<(), ParquetFromCsvError> {
    let args = Args::parse();
    convert_csv_to_parquet(&args)
}

#[cfg(test)]
mod tests {
    use std::{
        io::Write,
        path::{Path, PathBuf},
    };

    use super::*;
    use arrow::datatypes::{DataType, Field};
    use brotli::CompressorWriter;
    use clap::{CommandFactory, Parser};
    use flate2::write::GzEncoder;
    use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
    use parquet::file::reader::{FileReader, SerializedFileReader};
    use snap::write::FrameEncoder;
    use tempfile::NamedTempFile;

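    // Keeps src/bin/parquet-fromcsv-help.txt in sync with clap's generated
    // long help (the leading name/version line is stripped before comparing).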
    #[test]
    fn test_command_help() {
        let mut cmd = Args::command();
        let dir = std::env::var("CARGO_MANIFEST_DIR").unwrap();
        let mut path_buf = PathBuf::from(dir);
        path_buf.push("src");
        path_buf.push("bin");
        path_buf.push("parquet-fromcsv-help.txt");
        let expected = std::fs::read_to_string(path_buf).unwrap();
        let mut buffer_vec = Vec::new();
        let mut buffer = std::io::Cursor::new(&mut buffer_vec);
        cmd.write_long_help(&mut buffer).unwrap();
        // Drop the first line (binary name and version), which changes per release.
        let mut actual = String::from_utf8(buffer_vec).unwrap();
        let pos = actual.find('\n').unwrap() + 1;
        actual = actual[pos..].to_string();
        assert_eq!(
            expected, actual,
            "help text does not match, please update to \n---\n{actual}\n---\n"
        )
    }

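    /// Parse `Args` from the three required options plus `extra_args`.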
    fn parse_args(mut extra_args: Vec<&str>) -> Result<Args, ParquetFromCsvError> {
        let mut args = vec![
            "test",
            "--schema",
            "test.schema",
            "--input-file",
            "infile.csv",
            "--output-file",
            "out.parquet",
        ];
        args.append(&mut extra_args);
        let args = Args::try_parse_from(args.iter())?;
        Ok(args)
    }

    #[test]
    fn test_parse_arg_minimum() -> Result<(), ParquetFromCsvError> {
        let args = parse_args(vec![])?;

        assert_eq!(args.schema, PathBuf::from(Path::new("test.schema")));
        assert_eq!(args.input_file, PathBuf::from(Path::new("infile.csv")));
        assert_eq!(args.output_file, PathBuf::from(Path::new("out.parquet")));
        assert_eq!(args.input_format, CsvDialect::Csv);
        assert_eq!(args.batch_size, 1000);
        assert!(!args.has_header);
        assert_eq!(args.delimiter, None);
        assert_eq!(args.get_delimiter(), b',');
        assert_eq!(args.record_terminator, None);
        assert_eq!(args.get_terminator(), None);
        assert_eq!(args.quote_char, None);
        assert_eq!(args.get_quote(), Some(b'\"'));
        assert_eq!(args.double_quote, None);
        assert_eq!(args.parquet_compression, Compression::SNAPPY);
        Ok(())
    }

    #[test]
    fn test_parse_arg_format_variants() -> Result<(), ParquetFromCsvError> {
        let args = parse_args(vec!["--input-format", "csv"])?;
        assert_eq!(args.input_format, CsvDialect::Csv);
        assert_eq!(args.get_delimiter(), b',');
        assert_eq!(args.get_terminator(), None);
        assert_eq!(args.get_quote(), Some(b'\"'));
        assert_eq!(args.get_escape(), None);
        let args = parse_args(vec!["--input-format", "tsv"])?;
        assert_eq!(args.input_format, CsvDialect::Tsv);
        assert_eq!(args.get_delimiter(), b'\t');
        assert_eq!(args.get_terminator(), Some(b'\x0a'));
        assert_eq!(args.get_quote(), None);
        assert_eq!(args.get_escape(), None);

        let args = parse_args(vec!["--input-format", "csv", "--escape-char", "\\"])?;
        assert_eq!(args.input_format, CsvDialect::Csv);
        assert_eq!(args.get_delimiter(), b',');
        assert_eq!(args.get_terminator(), None);
        assert_eq!(args.get_quote(), Some(b'\"'));
        assert_eq!(args.get_escape(), Some(b'\\'));

        let args = parse_args(vec!["--input-format", "tsv", "--delimiter", ":"])?;
        assert_eq!(args.input_format, CsvDialect::Tsv);
        assert_eq!(args.get_delimiter(), b':');
        assert_eq!(args.get_terminator(), Some(b'\x0a'));
        assert_eq!(args.get_quote(), None);
        assert_eq!(args.get_escape(), None);

        Ok(())
    }

    #[test]
    #[should_panic]
    fn test_parse_arg_format_error() {
        parse_args(vec!["--input-format", "excel"]).unwrap();
    }

    #[test]
    fn test_parse_arg_compression_format() {
        let args = parse_args(vec!["--parquet-compression", "uncompressed"]).unwrap();
        assert_eq!(args.parquet_compression, Compression::UNCOMPRESSED);
        let args = parse_args(vec!["--parquet-compression", "snappy"]).unwrap();
        assert_eq!(args.parquet_compression, Compression::SNAPPY);
        let args = parse_args(vec!["--parquet-compression", "gzip"]).unwrap();
        assert_eq!(
            args.parquet_compression,
            Compression::GZIP(Default::default())
        );
        let args = parse_args(vec!["--parquet-compression", "lzo"]).unwrap();
        assert_eq!(args.parquet_compression, Compression::LZO);
        let args = parse_args(vec!["--parquet-compression", "lz4"]).unwrap();
        assert_eq!(args.parquet_compression, Compression::LZ4);
        let args = parse_args(vec!["--parquet-compression", "brotli"]).unwrap();
        assert_eq!(
            args.parquet_compression,
            Compression::BROTLI(Default::default())
        );
        let args = parse_args(vec!["--parquet-compression", "zstd"]).unwrap();
        assert_eq!(
            args.parquet_compression,
            Compression::ZSTD(Default::default())
        );
    }

    #[test]
    fn test_parse_arg_compression_format_fail() {
        match parse_args(vec!["--parquet-compression", "zip"]) {
            Ok(_) => panic!("unexpected success"),
            Err(e) => {
                let err = e.to_string();
                assert!(err.contains("error: invalid value 'zip' for '--parquet-compression <PARQUET_COMPRESSION>': Unknown compression ZIP : possible values UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD \n\nFor more information try --help"), "{err}")
            }
        }
    }

    fn assert_debug_text(debug_text: &str, name: &str, value: &str) {
        let pattern = format!(" {name}: {value}");
        assert!(
            debug_text.contains(&pattern),
            "\"{debug_text}\" does not contain \"{pattern}\""
        )
    }

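    // ReaderBuilder exposes no getters, so assert on its Debug output; the
    // byte values are ASCII codes (44 = ',', 9 = TAB, 34 = '"', 10 = LF, 92 = '\').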
    #[test]
    fn test_configure_reader_builder() {
        let args = Args {
            schema: PathBuf::from(Path::new("schema.avro")),
            input_file: PathBuf::from(Path::new("test.csv")),
            output_file: PathBuf::from(Path::new("out.parquet")),
            batch_size: 1000,
            input_format: CsvDialect::Csv,
            has_header: false,
            delimiter: None,
            record_terminator: None,
            escape_char: None,
            quote_char: None,
            double_quote: None,
            csv_compression: Compression::UNCOMPRESSED,
            parquet_compression: Compression::SNAPPY,
            writer_version: None,
            max_row_group_size: None,
            enable_bloom_filter: None,
            help: None,
        };
        let arrow_schema = Arc::new(Schema::new(vec![
            Field::new("field1", DataType::Utf8, false),
            Field::new("field2", DataType::Utf8, false),
            Field::new("field3", DataType::Utf8, false),
            Field::new("field4", DataType::Utf8, false),
            Field::new("field5", DataType::Utf8, false),
        ]));

        let reader_builder = configure_reader_builder(&args, arrow_schema);
        let builder_debug = format!("{reader_builder:?}");
        assert_debug_text(&builder_debug, "header", "false");
        assert_debug_text(&builder_debug, "delimiter", "Some(44)");
        assert_debug_text(&builder_debug, "quote", "Some(34)");
        assert_debug_text(&builder_debug, "terminator", "None");
        assert_debug_text(&builder_debug, "batch_size", "1000");
        assert_debug_text(&builder_debug, "escape", "None");

        let args = Args {
            schema: PathBuf::from(Path::new("schema.avro")),
            input_file: PathBuf::from(Path::new("test.csv")),
            output_file: PathBuf::from(Path::new("out.parquet")),
            batch_size: 2000,
            input_format: CsvDialect::Tsv,
            has_header: true,
            delimiter: None,
            record_terminator: None,
            escape_char: Some('\\'),
            quote_char: None,
            double_quote: None,
            csv_compression: Compression::UNCOMPRESSED,
            parquet_compression: Compression::SNAPPY,
            writer_version: None,
            max_row_group_size: None,
            enable_bloom_filter: None,
            help: None,
        };
        let arrow_schema = Arc::new(Schema::new(vec![
            Field::new("field1", DataType::Utf8, false),
            Field::new("field2", DataType::Utf8, false),
            Field::new("field3", DataType::Utf8, false),
            Field::new("field4", DataType::Utf8, false),
            Field::new("field5", DataType::Utf8, false),
        ]));
        let reader_builder = configure_reader_builder(&args, arrow_schema);
        let builder_debug = format!("{reader_builder:?}");
        assert_debug_text(&builder_debug, "header", "true");
        assert_debug_text(&builder_debug, "delimiter", "Some(9)");
        assert_debug_text(&builder_debug, "quote", "None");
        assert_debug_text(&builder_debug, "terminator", "Some(10)");
        assert_debug_text(&builder_debug, "batch_size", "2000");
        assert_debug_text(&builder_debug, "escape", "Some(92)");
    }

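    // Round-trip: write a small CSV (ids 1..2000), optionally compressed,
    // convert it, and verify the output Parquet file's schema root name.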
    fn test_convert_compressed_csv_to_parquet(csv_compression: Compression) {
        let schema = NamedTempFile::new().unwrap();
        let schema_text = r"message my_amazing_schema {
            optional int32 id;
            optional binary name (STRING);
        }";
        schema.as_file().write_all(schema_text.as_bytes()).unwrap();

        let mut input_file = NamedTempFile::new().unwrap();

        fn write_tmp_file<T: Write>(w: &mut T) {
            for index in 1..2000 {
                write!(w, "{index},\"name_{index}\"\r\n").unwrap();
            }
            w.flush().unwrap();
        }

        // Rebind so the temp file handle returned by each encoder stays
        // alive for the rest of the test.
        input_file = match csv_compression {
            Compression::UNCOMPRESSED => {
                write_tmp_file(&mut input_file);
                input_file
            }
            Compression::SNAPPY => {
                let mut encoder = FrameEncoder::new(input_file);
                write_tmp_file(&mut encoder);
                encoder.into_inner().unwrap()
            }
            Compression::GZIP(level) => {
                let mut encoder = GzEncoder::new(
                    input_file,
                    flate2::Compression::new(level.compression_level()),
                );
                write_tmp_file(&mut encoder);
                encoder.finish().unwrap()
            }
            Compression::BROTLI(level) => {
                let mut encoder =
                    CompressorWriter::new(input_file, 0, level.compression_level(), 0);
                write_tmp_file(&mut encoder);
                encoder.into_inner()
            }
            Compression::LZ4 => {
                let mut encoder = lz4_flex::frame::FrameEncoder::new(input_file);
                write_tmp_file(&mut encoder);
                encoder.finish().unwrap()
            }
            Compression::ZSTD(level) => {
                let mut encoder = zstd::Encoder::new(input_file, level.compression_level())
                    .map_err(|e| {
                        ParquetFromCsvError::with_context(e, "Failed to create zstd::Encoder")
                    })
                    .unwrap();
                write_tmp_file(&mut encoder);
                encoder.finish().unwrap()
            }
            d => unimplemented!("compression type {d}"),
        };

        let output_parquet = NamedTempFile::new().unwrap();

        let args = Args {
            schema: PathBuf::from(schema.path()),
            input_file: PathBuf::from(input_file.path()),
            output_file: PathBuf::from(output_parquet.path()),
            batch_size: 1000,
            input_format: CsvDialect::Csv,
            has_header: false,
            delimiter: None,
            record_terminator: None,
            escape_char: None,
            quote_char: None,
            double_quote: None,
            csv_compression,
            parquet_compression: Compression::SNAPPY,
            writer_version: None,
            max_row_group_size: None,
            enable_bloom_filter: Some(true),
            help: None,
        };
        convert_csv_to_parquet(&args).unwrap();

        let file = SerializedFileReader::new(output_parquet.into_file()).unwrap();
        let schema_name = file.metadata().file_metadata().schema().name();
        assert_eq!(schema_name, "my_amazing_schema");
    }

    #[test]
    fn test_convert_csv_to_parquet() {
        test_convert_compressed_csv_to_parquet(Compression::UNCOMPRESSED);
        test_convert_compressed_csv_to_parquet(Compression::SNAPPY);
        test_convert_compressed_csv_to_parquet(Compression::GZIP(GzipLevel::try_new(1).unwrap()));
        test_convert_compressed_csv_to_parquet(Compression::BROTLI(
            BrotliLevel::try_new(2).unwrap(),
        ));
        test_convert_compressed_csv_to_parquet(Compression::LZ4);
        test_convert_compressed_csv_to_parquet(Compression::ZSTD(ZstdLevel::try_new(1).unwrap()));
    }
}