1use arrow_array::*;
177use arrow_cast::display::*;
178use arrow_schema::*;
179use csv::ByteRecord;
180use std::io::Write;
181
182use crate::map_csv_error;
183const DEFAULT_NULL_VALUE: &str = "";
184
185pub use csv::QuoteStyle;
200
/// A CSV writer that serializes [`RecordBatch`]es into an underlying [`Write`] sink.
///
/// Instances are configured and created through [`WriterBuilder`].
#[derive(Debug)]
pub struct Writer<W: Write> {
    /// The wrapped `csv` crate writer that records are pushed into
    writer: csv::Writer<W>,
    /// Whether a header row of field names is emitted before the first batch
    has_headers: bool,
    /// Optional format string handed to the formatter for date values
    date_format: Option<String>,
    /// Optional format string handed to the formatter for datetime values
    datetime_format: Option<String>,
    /// Optional format string handed to the formatter for timestamps
    timestamp_format: Option<String>,
    /// Optional format string handed to the formatter for timestamps with a time zone
    timestamp_tz_format: Option<String>,
    /// Optional format string handed to the formatter for time values
    time_format: Option<String>,
    /// `true` until the first call to `write`; gates the header row
    beginning: bool,
    /// Text written for null values; the empty string is used when `None`
    null_value: Option<String>,
    /// Whether leading whitespace is stripped from string-typed values
    ignore_leading_whitespace: bool,
    /// Whether trailing whitespace is stripped from string-typed values
    ignore_trailing_whitespace: bool,
}
229
230impl<W: Write> Writer<W> {
231 pub fn new(writer: W) -> Self {
236 let delimiter = b',';
237 WriterBuilder::new().with_delimiter(delimiter).build(writer)
238 }
239
240 pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
242 let num_columns = batch.num_columns();
243 if self.beginning {
244 if self.has_headers {
245 let mut headers: Vec<String> = Vec::with_capacity(num_columns);
246 batch
247 .schema()
248 .fields()
249 .iter()
250 .for_each(|field| headers.push(field.name().to_string()));
251 self.writer
252 .write_record(&headers[..])
253 .map_err(map_csv_error)?;
254 }
255 self.beginning = false;
256 }
257
258 let options = FormatOptions::default()
259 .with_null(self.null_value.as_deref().unwrap_or(DEFAULT_NULL_VALUE))
260 .with_date_format(self.date_format.as_deref())
261 .with_datetime_format(self.datetime_format.as_deref())
262 .with_timestamp_format(self.timestamp_format.as_deref())
263 .with_timestamp_tz_format(self.timestamp_tz_format.as_deref())
264 .with_time_format(self.time_format.as_deref());
265
266 let converters = batch
267 .columns()
268 .iter()
269 .map(|a| {
270 if a.data_type().is_nested() {
271 Err(ArrowError::CsvError(format!(
272 "Nested type {} is not supported in CSV",
273 a.data_type()
274 )))
275 } else {
276 ArrayFormatter::try_new(a.as_ref(), &options)
277 }
278 })
279 .collect::<Result<Vec<_>, ArrowError>>()?;
280
281 let mut buffer = String::with_capacity(1024);
282 let mut byte_record = ByteRecord::with_capacity(1024, converters.len());
283
284 for row_idx in 0..batch.num_rows() {
285 byte_record.clear();
286 for (col_idx, converter) in converters.iter().enumerate() {
287 buffer.clear();
288 converter.value(row_idx).write(&mut buffer).map_err(|e| {
289 ArrowError::CsvError(format!(
290 "Error processing row {}, col {}: {e}",
291 row_idx + 1,
292 col_idx + 1
293 ))
294 })?;
295
296 let field_bytes =
297 self.get_trimmed_field_bytes(&buffer, batch.column(col_idx).data_type());
298 byte_record.push_field(field_bytes);
299 }
300
301 self.writer
302 .write_byte_record(&byte_record)
303 .map_err(map_csv_error)?;
304 }
305 self.writer.flush()?;
306
307 Ok(())
308 }
309
310 fn get_trimmed_field_bytes<'a>(&self, buffer: &'a str, data_type: &DataType) -> &'a [u8] {
312 let should_trim = (self.ignore_leading_whitespace || self.ignore_trailing_whitespace)
314 && matches!(
315 data_type,
316 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
317 );
318
319 if !should_trim {
320 return buffer.as_bytes();
321 }
322
323 let mut trimmed = buffer;
324 if self.ignore_leading_whitespace {
325 trimmed = trimmed.trim_start();
326 }
327 if self.ignore_trailing_whitespace {
328 trimmed = trimmed.trim_end();
329 }
330 trimmed.as_bytes()
331 }
332
333 pub fn into_inner(self) -> W {
335 self.writer.into_inner().unwrap()
337 }
338}
339
340impl<W: Write> RecordBatchWriter for Writer<W> {
341 fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
342 self.write(batch)
343 }
344
345 fn close(self) -> Result<(), ArrowError> {
346 Ok(())
347 }
348}
349
/// A builder that collects CSV output options and produces a [`Writer`].
#[derive(Clone, Debug)]
pub struct WriterBuilder {
    /// Field delimiter byte; defaults to `,`
    delimiter: u8,
    /// Whether a header row is written; defaults to `true`
    has_header: bool,
    /// Quote byte; defaults to `"`
    quote: u8,
    /// Escape byte; defaults to `\`
    escape: u8,
    /// Record terminator; defaults to `Terminator::Any(b'\n')`
    terminator: Terminator,
    /// Whether the csv writer's double-quote option is enabled; defaults to `true`
    double_quote: bool,
    /// Optional date format string; defaults to `None`
    date_format: Option<String>,
    /// Optional datetime format string; defaults to `None`
    datetime_format: Option<String>,
    /// Optional timestamp format string; defaults to `None`
    timestamp_format: Option<String>,
    /// Optional format string for timestamps with a time zone; defaults to `None`
    timestamp_tz_format: Option<String>,
    /// Optional time format string; defaults to `None`
    time_format: Option<String>,
    /// Text written for nulls; `None` means the empty string
    null_value: Option<String>,
    /// Whether leading whitespace is stripped from string values; defaults to `false`
    ignore_leading_whitespace: bool,
    /// Whether trailing whitespace is stripped from string values; defaults to `false`
    ignore_trailing_whitespace: bool,
    /// Quoting strategy forwarded to the csv writer; defaults to `QuoteStyle::default()`
    quote_style: QuoteStyle,
}
384
/// The record terminator used when writing CSV output.
#[derive(Clone, Debug)]
pub enum Terminator {
    /// Mapped to `csv::Terminator::CRLF` when the writer is built
    CRLF,
    /// Mapped to `csv::Terminator::Any` with the given byte when the writer is built
    Any(u8),
}
393
394impl Default for WriterBuilder {
395 fn default() -> Self {
396 WriterBuilder {
397 delimiter: b',',
398 has_header: true,
399 quote: b'"',
400 escape: b'\\',
401 terminator: Terminator::Any(b'\n'),
402 double_quote: true,
403 date_format: None,
404 datetime_format: None,
405 timestamp_format: None,
406 timestamp_tz_format: None,
407 time_format: None,
408 null_value: None,
409 ignore_leading_whitespace: false,
410 ignore_trailing_whitespace: false,
411 quote_style: QuoteStyle::default(),
412 }
413 }
414}
415
416impl WriterBuilder {
417 pub fn new() -> Self {
439 Self::default()
440 }
441
442 pub fn with_header(mut self, header: bool) -> Self {
444 self.has_header = header;
445 self
446 }
447
448 pub fn header(&self) -> bool {
450 self.has_header
451 }
452
453 pub fn with_delimiter(mut self, delimiter: u8) -> Self {
455 self.delimiter = delimiter;
456 self
457 }
458
459 pub fn delimiter(&self) -> u8 {
461 self.delimiter
462 }
463
464 pub fn with_quote(mut self, quote: u8) -> Self {
466 self.quote = quote;
467 self
468 }
469
470 pub fn quote(&self) -> u8 {
472 self.quote
473 }
474
475 pub fn with_escape(mut self, escape: u8) -> Self {
483 self.escape = escape;
484 self
485 }
486
487 pub fn escape(&self) -> u8 {
489 self.escape
490 }
491
492 pub fn with_double_quote(mut self, double_quote: bool) -> Self {
500 self.double_quote = double_quote;
501 self
502 }
503
504 pub fn double_quote(&self) -> bool {
506 self.double_quote
507 }
508
509 pub fn with_date_format(mut self, format: String) -> Self {
511 self.date_format = Some(format);
512 self
513 }
514
515 pub fn date_format(&self) -> Option<&str> {
517 self.date_format.as_deref()
518 }
519
520 pub fn with_datetime_format(mut self, format: String) -> Self {
522 self.datetime_format = Some(format);
523 self
524 }
525
526 pub fn datetime_format(&self) -> Option<&str> {
528 self.datetime_format.as_deref()
529 }
530
531 pub fn with_time_format(mut self, format: String) -> Self {
533 self.time_format = Some(format);
534 self
535 }
536
537 pub fn time_format(&self) -> Option<&str> {
539 self.time_format.as_deref()
540 }
541
542 pub fn with_timestamp_format(mut self, format: String) -> Self {
544 self.timestamp_format = Some(format);
545 self
546 }
547
548 pub fn timestamp_format(&self) -> Option<&str> {
550 self.timestamp_format.as_deref()
551 }
552
553 pub fn with_timestamp_tz_format(mut self, tz_format: String) -> Self {
555 self.timestamp_tz_format = Some(tz_format);
556 self
557 }
558
559 pub fn timestamp_tz_format(&self) -> Option<&str> {
561 self.timestamp_tz_format.as_deref()
562 }
563
564 pub fn with_null(mut self, null_value: String) -> Self {
566 self.null_value = Some(null_value);
567 self
568 }
569
570 pub fn null(&self) -> &str {
572 self.null_value.as_deref().unwrap_or(DEFAULT_NULL_VALUE)
573 }
574
575 pub fn with_ignore_leading_whitespace(mut self, ignore: bool) -> Self {
578 self.ignore_leading_whitespace = ignore;
579 self
580 }
581
582 pub fn ignore_leading_whitespace(&self) -> bool {
584 self.ignore_leading_whitespace
585 }
586
587 pub fn with_ignore_trailing_whitespace(mut self, ignore: bool) -> Self {
590 self.ignore_trailing_whitespace = ignore;
591 self
592 }
593
594 pub fn ignore_trailing_whitespace(&self) -> bool {
596 self.ignore_trailing_whitespace
597 }
598
599 pub fn with_quote_style(mut self, quote_style: QuoteStyle) -> Self {
615 self.quote_style = quote_style;
616 self
617 }
618
619 pub fn quote_style(&self) -> QuoteStyle {
621 self.quote_style
622 }
623
624 pub fn with_line_terminator(mut self, terminator: Terminator) -> Self {
626 self.terminator = terminator;
627 self
628 }
629
630 pub fn line_terminator(&self) -> &Terminator {
632 &self.terminator
633 }
634
635 pub fn build<W: Write>(self, writer: W) -> Writer<W> {
637 let mut builder = csv::WriterBuilder::new();
638
639 let terminator = match self.terminator {
640 Terminator::CRLF => csv::Terminator::CRLF,
641 Terminator::Any(byte) => csv::Terminator::Any(byte),
642 };
643
644 let writer = builder
645 .delimiter(self.delimiter)
646 .quote(self.quote)
647 .quote_style(self.quote_style)
648 .double_quote(self.double_quote)
649 .escape(self.escape)
650 .terminator(terminator)
651 .from_writer(writer);
652 Writer {
653 writer,
654 beginning: true,
655 has_headers: self.has_header,
656 date_format: self.date_format,
657 datetime_format: self.datetime_format,
658 time_format: self.time_format,
659 timestamp_format: self.timestamp_format,
660 timestamp_tz_format: self.timestamp_tz_format,
661 null_value: self.null_value,
662 ignore_leading_whitespace: self.ignore_leading_whitespace,
663 ignore_trailing_whitespace: self.ignore_trailing_whitespace,
664 }
665 }
666}
667
668#[cfg(test)]
669mod tests {
670 use super::*;
671
672 use crate::ReaderBuilder;
673 use arrow_array::builder::{
674 BinaryBuilder, Decimal32Builder, Decimal64Builder, Decimal128Builder, Decimal256Builder,
675 FixedSizeBinaryBuilder, LargeBinaryBuilder,
676 };
677 use arrow_array::types::*;
678 use arrow_buffer::i256;
679 use core::str;
680 use std::io::{Cursor, Read, Seek};
681 use std::sync::Arc;
682
683 #[test]
684 fn test_write_csv() {
685 let schema = Schema::new(vec![
686 Field::new("c1", DataType::Utf8, false),
687 Field::new("c2", DataType::Float64, true),
688 Field::new("c3", DataType::UInt32, false),
689 Field::new("c4", DataType::Boolean, true),
690 Field::new("c5", DataType::Timestamp(TimeUnit::Millisecond, None), true),
691 Field::new("c6", DataType::Time32(TimeUnit::Second), false),
692 Field::new_dictionary("c7", DataType::Int32, DataType::Utf8, false),
693 ]);
694
695 let c1 = StringArray::from(vec![
696 "Lorem ipsum dolor sit amet",
697 "consectetur adipiscing elit",
698 "sed do eiusmod tempor",
699 ]);
700 let c2 =
701 PrimitiveArray::<Float64Type>::from(vec![Some(123.564532), None, Some(-556132.25)]);
702 let c3 = PrimitiveArray::<UInt32Type>::from(vec![3, 2, 1]);
703 let c4 = BooleanArray::from(vec![Some(true), Some(false), None]);
704 let c5 =
705 TimestampMillisecondArray::from(vec![None, Some(1555584887378), Some(1555555555555)]);
706 let c6 = Time32SecondArray::from(vec![1234, 24680, 85563]);
707 let c7: DictionaryArray<Int32Type> =
708 vec!["cupcakes", "cupcakes", "foo"].into_iter().collect();
709
710 let batch = RecordBatch::try_new(
711 Arc::new(schema),
712 vec![
713 Arc::new(c1),
714 Arc::new(c2),
715 Arc::new(c3),
716 Arc::new(c4),
717 Arc::new(c5),
718 Arc::new(c6),
719 Arc::new(c7),
720 ],
721 )
722 .unwrap();
723
724 let mut file = tempfile::tempfile().unwrap();
725
726 let mut writer = Writer::new(&mut file);
727 let batches = vec![&batch, &batch];
728 for batch in batches {
729 writer.write(batch).unwrap();
730 }
731 drop(writer);
732
733 file.rewind().unwrap();
735 let mut buffer: Vec<u8> = vec![];
736 file.read_to_end(&mut buffer).unwrap();
737
738 let expected = r#"c1,c2,c3,c4,c5,c6,c7
739Lorem ipsum dolor sit amet,123.564532,3,true,,00:20:34,cupcakes
740consectetur adipiscing elit,,2,false,2019-04-18T10:54:47.378,06:51:20,cupcakes
741sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555,23:46:03,foo
742Lorem ipsum dolor sit amet,123.564532,3,true,,00:20:34,cupcakes
743consectetur adipiscing elit,,2,false,2019-04-18T10:54:47.378,06:51:20,cupcakes
744sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555,23:46:03,foo
745"#;
746 assert_eq!(expected, str::from_utf8(&buffer).unwrap());
747 }
748
749 #[test]
750 fn test_write_csv_decimal() {
751 let schema = Schema::new(vec![
752 Field::new("c1", DataType::Decimal32(9, 6), true),
753 Field::new("c2", DataType::Decimal64(17, 6), true),
754 Field::new("c3", DataType::Decimal128(38, 6), true),
755 Field::new("c4", DataType::Decimal256(76, 6), true),
756 ]);
757
758 let mut c1_builder = Decimal32Builder::new().with_data_type(DataType::Decimal32(9, 6));
759 c1_builder.extend(vec![Some(-3335724), Some(2179404), None, Some(290472)]);
760 let c1 = c1_builder.finish();
761
762 let mut c2_builder = Decimal64Builder::new().with_data_type(DataType::Decimal64(17, 6));
763 c2_builder.extend(vec![Some(-3335724), Some(2179404), None, Some(290472)]);
764 let c2 = c2_builder.finish();
765
766 let mut c3_builder = Decimal128Builder::new().with_data_type(DataType::Decimal128(38, 6));
767 c3_builder.extend(vec![Some(-3335724), Some(2179404), None, Some(290472)]);
768 let c3 = c3_builder.finish();
769
770 let mut c4_builder = Decimal256Builder::new().with_data_type(DataType::Decimal256(76, 6));
771 c4_builder.extend(vec![
772 Some(i256::from_i128(-3335724)),
773 Some(i256::from_i128(2179404)),
774 None,
775 Some(i256::from_i128(290472)),
776 ]);
777 let c4 = c4_builder.finish();
778
779 let batch = RecordBatch::try_new(
780 Arc::new(schema),
781 vec![Arc::new(c1), Arc::new(c2), Arc::new(c3), Arc::new(c4)],
782 )
783 .unwrap();
784
785 let mut file = tempfile::tempfile().unwrap();
786
787 let mut writer = Writer::new(&mut file);
788 let batches = vec![&batch, &batch];
789 for batch in batches {
790 writer.write(batch).unwrap();
791 }
792 drop(writer);
793
794 file.rewind().unwrap();
796 let mut buffer: Vec<u8> = vec![];
797 file.read_to_end(&mut buffer).unwrap();
798
799 let expected = r#"c1,c2,c3,c4
800-3.335724,-3.335724,-3.335724,-3.335724
8012.179404,2.179404,2.179404,2.179404
802,,,
8030.290472,0.290472,0.290472,0.290472
804-3.335724,-3.335724,-3.335724,-3.335724
8052.179404,2.179404,2.179404,2.179404
806,,,
8070.290472,0.290472,0.290472,0.290472
808"#;
809 assert_eq!(expected, str::from_utf8(&buffer).unwrap());
810 }
811
812 #[test]
813 fn test_write_csv_custom_options() {
814 let schema = Schema::new(vec![
815 Field::new("c1", DataType::Utf8, false),
816 Field::new("c2", DataType::Float64, true),
817 Field::new("c3", DataType::UInt32, false),
818 Field::new("c4", DataType::Boolean, true),
819 Field::new("c6", DataType::Time32(TimeUnit::Second), false),
820 ]);
821
822 let c1 = StringArray::from(vec![
823 "Lorem ipsum \ndolor sit amet",
824 "consectetur \"adipiscing\" elit",
825 "sed do eiusmod tempor",
826 ]);
827 let c2 =
828 PrimitiveArray::<Float64Type>::from(vec![Some(123.564532), None, Some(-556132.25)]);
829 let c3 = PrimitiveArray::<UInt32Type>::from(vec![3, 2, 1]);
830 let c4 = BooleanArray::from(vec![Some(true), Some(false), None]);
831 let c6 = Time32SecondArray::from(vec![1234, 24680, 85563]);
832
833 let batch = RecordBatch::try_new(
834 Arc::new(schema),
835 vec![
836 Arc::new(c1),
837 Arc::new(c2),
838 Arc::new(c3),
839 Arc::new(c4),
840 Arc::new(c6),
841 ],
842 )
843 .unwrap();
844
845 let mut file = tempfile::tempfile().unwrap();
846
847 let builder = WriterBuilder::new()
848 .with_header(false)
849 .with_delimiter(b'|')
850 .with_quote(b'\'')
851 .with_null("NULL".to_string())
852 .with_time_format("%r".to_string());
853 let mut writer = builder.build(&mut file);
854 let batches = vec![&batch];
855 for batch in batches {
856 writer.write(batch).unwrap();
857 }
858 drop(writer);
859
860 file.rewind().unwrap();
862 let mut buffer: Vec<u8> = vec![];
863 file.read_to_end(&mut buffer).unwrap();
864
865 assert_eq!(
866 "'Lorem ipsum \ndolor sit amet'|123.564532|3|true|12:20:34 AM\nconsectetur \"adipiscing\" elit|NULL|2|false|06:51:20 AM\nsed do eiusmod tempor|-556132.25|1|NULL|11:46:03 PM\n"
867 .to_string(),
868 String::from_utf8(buffer).unwrap()
869 );
870
871 let mut file = tempfile::tempfile().unwrap();
872
873 let builder = WriterBuilder::new()
874 .with_header(true)
875 .with_double_quote(false)
876 .with_escape(b'$');
877 let mut writer = builder.build(&mut file);
878 let batches = vec![&batch];
879 for batch in batches {
880 writer.write(batch).unwrap();
881 }
882 drop(writer);
883
884 file.rewind().unwrap();
885 let mut buffer: Vec<u8> = vec![];
886 file.read_to_end(&mut buffer).unwrap();
887
888 assert_eq!(
889 "c1,c2,c3,c4,c6\n\"Lorem ipsum \ndolor sit amet\",123.564532,3,true,00:20:34\n\"consectetur $\"adipiscing$\" elit\",,2,false,06:51:20\nsed do eiusmod tempor,-556132.25,1,,23:46:03\n"
890 .to_string(),
891 String::from_utf8(buffer).unwrap()
892 );
893 }
894
895 #[test]
896 fn test_conversion_consistency() {
897 let schema = Schema::new(vec![
900 Field::new("c1", DataType::Date32, false),
901 Field::new("c2", DataType::Date64, false),
902 Field::new("c3", DataType::Timestamp(TimeUnit::Nanosecond, None), false),
903 ]);
904
905 let nanoseconds = vec![
906 1599566300000000000,
907 1599566200000000000,
908 1599566100000000000,
909 ];
910 let c1 = Date32Array::from(vec![3, 2, 1]);
911 let c2 = Date64Array::from(vec![3, 2, 1]);
912 let c3 = TimestampNanosecondArray::from(nanoseconds.clone());
913
914 let batch = RecordBatch::try_new(
915 Arc::new(schema.clone()),
916 vec![Arc::new(c1), Arc::new(c2), Arc::new(c3)],
917 )
918 .unwrap();
919
920 let builder = WriterBuilder::new().with_header(false);
921
922 let mut buf: Cursor<Vec<u8>> = Default::default();
923 {
925 let mut writer = builder.build(&mut buf);
926 writer.write(&batch).unwrap();
927 }
928 buf.set_position(0);
929
930 let mut reader = ReaderBuilder::new(Arc::new(schema))
931 .with_batch_size(3)
932 .build_buffered(buf)
933 .unwrap();
934
935 let rb = reader.next().unwrap().unwrap();
936 let c1 = rb.column(0).as_any().downcast_ref::<Date32Array>().unwrap();
937 let c2 = rb.column(1).as_any().downcast_ref::<Date64Array>().unwrap();
938 let c3 = rb
939 .column(2)
940 .as_any()
941 .downcast_ref::<TimestampNanosecondArray>()
942 .unwrap();
943
944 let actual = c1.into_iter().collect::<Vec<_>>();
945 let expected = vec![Some(3), Some(2), Some(1)];
946 assert_eq!(actual, expected);
947 let actual = c2.into_iter().collect::<Vec<_>>();
948 let expected = vec![Some(3), Some(2), Some(1)];
949 assert_eq!(actual, expected);
950 let actual = c3.into_iter().collect::<Vec<_>>();
951 let expected = nanoseconds.into_iter().map(Some).collect::<Vec<_>>();
952 assert_eq!(actual, expected);
953 }
954
955 #[test]
956 fn test_write_csv_invalid_cast() {
957 let schema = Schema::new(vec![
958 Field::new("c0", DataType::UInt32, false),
959 Field::new("c1", DataType::Date64, false),
960 ]);
961
962 let c0 = UInt32Array::from(vec![Some(123), Some(234)]);
963 let c1 = Date64Array::from(vec![Some(1926632005177), Some(1926632005177685347)]);
964 let batch =
965 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c0), Arc::new(c1)]).unwrap();
966
967 let mut file = tempfile::tempfile().unwrap();
968 let mut writer = Writer::new(&mut file);
969 let batches = vec![&batch, &batch];
970
971 for batch in batches {
972 let err = writer.write(batch).unwrap_err().to_string();
973 assert_eq!(
974 err,
975 "Csv error: Error processing row 2, col 2: Cast error: Failed to convert 1926632005177685347 to temporal for Date64"
976 )
977 }
978 drop(writer);
979 }
980
981 #[test]
982 fn test_write_csv_using_rfc3339() {
983 let schema = Schema::new(vec![
984 Field::new(
985 "c1",
986 DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
987 true,
988 ),
989 Field::new("c2", DataType::Timestamp(TimeUnit::Millisecond, None), true),
990 Field::new("c3", DataType::Date32, false),
991 Field::new("c4", DataType::Time32(TimeUnit::Second), false),
992 ]);
993
994 let c1 = TimestampMillisecondArray::from(vec![Some(1555584887378), Some(1635577147000)])
995 .with_timezone("+00:00".to_string());
996 let c2 = TimestampMillisecondArray::from(vec![Some(1555584887378), Some(1635577147000)]);
997 let c3 = Date32Array::from(vec![3, 2]);
998 let c4 = Time32SecondArray::from(vec![1234, 24680]);
999
1000 let batch = RecordBatch::try_new(
1001 Arc::new(schema),
1002 vec![Arc::new(c1), Arc::new(c2), Arc::new(c3), Arc::new(c4)],
1003 )
1004 .unwrap();
1005
1006 let mut file = tempfile::tempfile().unwrap();
1007
1008 let builder = WriterBuilder::new();
1009 let mut writer = builder.build(&mut file);
1010 let batches = vec![&batch];
1011 for batch in batches {
1012 writer.write(batch).unwrap();
1013 }
1014 drop(writer);
1015
1016 file.rewind().unwrap();
1017 let mut buffer: Vec<u8> = vec![];
1018 file.read_to_end(&mut buffer).unwrap();
1019
1020 assert_eq!(
1021 "c1,c2,c3,c4
10222019-04-18T10:54:47.378Z,2019-04-18T10:54:47.378,1970-01-04,00:20:34
10232021-10-30T06:59:07Z,2021-10-30T06:59:07,1970-01-03,06:51:20\n",
1024 String::from_utf8(buffer).unwrap()
1025 );
1026 }
1027
1028 #[test]
1029 fn test_write_csv_tz_format() {
1030 let schema = Schema::new(vec![
1031 Field::new(
1032 "c1",
1033 DataType::Timestamp(TimeUnit::Millisecond, Some("+02:00".into())),
1034 true,
1035 ),
1036 Field::new(
1037 "c2",
1038 DataType::Timestamp(TimeUnit::Second, Some("+04:00".into())),
1039 true,
1040 ),
1041 ]);
1042 let c1 = TimestampMillisecondArray::from(vec![Some(1_000), Some(2_000)])
1043 .with_timezone("+02:00".to_string());
1044 let c2 = TimestampSecondArray::from(vec![Some(1_000_000), None])
1045 .with_timezone("+04:00".to_string());
1046 let batch =
1047 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();
1048
1049 let mut file = tempfile::tempfile().unwrap();
1050 let mut writer = WriterBuilder::new()
1051 .with_timestamp_tz_format("%M:%H".to_string())
1052 .build(&mut file);
1053 writer.write(&batch).unwrap();
1054
1055 drop(writer);
1056 file.rewind().unwrap();
1057 let mut buffer: Vec<u8> = vec![];
1058 file.read_to_end(&mut buffer).unwrap();
1059
1060 let output = String::from_utf8(buffer).unwrap();
1061 assert_eq!(output, "c1,c2\n00:02,46:17\n00:02,\n");
1062 }
1063
1064 #[test]
1065 fn test_write_csv_with_lf_terminator() {
1066 let output = write_batch_with_terminator(Terminator::Any(b'\n'));
1067 assert_eq!(output, "c1,c2\nhello,1\nworld,2\n");
1068 }
1069
1070 #[test]
1071 fn test_write_csv_with_crlf_terminator() {
1072 let output = write_batch_with_terminator(Terminator::CRLF);
1073 assert_eq!(output, "c1,c2\r\nhello,1\r\nworld,2\r\n");
1074 }
1075
1076 #[test]
1077 fn test_write_csv_with_any_terminator() {
1078 let output = write_batch_with_terminator(Terminator::Any(b'|'));
1079 assert_eq!(output, "c1,c2|hello,1|world,2|");
1080 }
1081
1082 fn write_batch_with_terminator(terminator: Terminator) -> String {
1083 let schema = Schema::new(vec![
1084 Field::new("c1", DataType::Utf8, false),
1085 Field::new("c2", DataType::UInt32, false),
1086 ]);
1087
1088 let c1 = StringArray::from(vec!["hello", "world"]);
1089 let c2 = PrimitiveArray::<UInt32Type>::from(vec![1, 2]);
1090
1091 let batch =
1092 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();
1093
1094 let mut buf = Vec::new();
1095 let mut writer = WriterBuilder::new()
1096 .with_line_terminator(terminator)
1097 .build(&mut buf);
1098 writer.write(&batch).unwrap();
1099 drop(writer);
1100
1101 String::from_utf8(buf).unwrap()
1102 }
1103
1104 #[test]
1105 fn test_write_csv_binary() {
1106 let fixed_size = 8;
1107 let schema = SchemaRef::new(Schema::new(vec![
1108 Field::new("c1", DataType::Binary, true),
1109 Field::new("c2", DataType::FixedSizeBinary(fixed_size), true),
1110 Field::new("c3", DataType::LargeBinary, true),
1111 ]));
1112 let mut c1_builder = BinaryBuilder::new();
1113 c1_builder.append_value(b"Homer");
1114 c1_builder.append_value(b"Bart");
1115 c1_builder.append_null();
1116 c1_builder.append_value(b"Ned");
1117 let mut c2_builder = FixedSizeBinaryBuilder::new(fixed_size);
1118 c2_builder.append_value(b"Simpson ").unwrap();
1119 c2_builder.append_value(b"Simpson ").unwrap();
1120 c2_builder.append_null();
1121 c2_builder.append_value(b"Flanders").unwrap();
1122 let mut c3_builder = LargeBinaryBuilder::new();
1123 c3_builder.append_null();
1124 c3_builder.append_null();
1125 c3_builder.append_value(b"Comic Book Guy");
1126 c3_builder.append_null();
1127
1128 let batch = RecordBatch::try_new(
1129 schema,
1130 vec![
1131 Arc::new(c1_builder.finish()) as ArrayRef,
1132 Arc::new(c2_builder.finish()) as ArrayRef,
1133 Arc::new(c3_builder.finish()) as ArrayRef,
1134 ],
1135 )
1136 .unwrap();
1137
1138 let mut buf = Vec::new();
1139 let builder = WriterBuilder::new();
1140 let mut writer = builder.build(&mut buf);
1141 writer.write(&batch).unwrap();
1142 drop(writer);
1143 assert_eq!(
1144 "\
1145 c1,c2,c3\n\
1146 486f6d6572,53696d70736f6e20,\n\
1147 42617274,53696d70736f6e20,\n\
1148 ,,436f6d696320426f6f6b20477579\n\
1149 4e6564,466c616e64657273,\n\
1150 ",
1151 String::from_utf8(buf).unwrap()
1152 );
1153 }
1154
1155 #[test]
1156 fn test_write_csv_whitespace_handling() {
1157 let schema = Schema::new(vec![
1158 Field::new("c1", DataType::Utf8, false),
1159 Field::new("c2", DataType::Float64, true),
1160 Field::new("c3", DataType::Utf8, true),
1161 ]);
1162
1163 let c1 = StringArray::from(vec![
1164 " leading space",
1165 "trailing space ",
1166 " both spaces ",
1167 "no spaces",
1168 ]);
1169 let c2 = PrimitiveArray::<Float64Type>::from(vec![
1170 Some(123.45),
1171 Some(678.90),
1172 None,
1173 Some(111.22),
1174 ]);
1175 let c3 = StringArray::from(vec![
1176 Some(" test "),
1177 Some("value "),
1178 None,
1179 Some(" another"),
1180 ]);
1181
1182 let batch = RecordBatch::try_new(
1183 Arc::new(schema),
1184 vec![Arc::new(c1), Arc::new(c2), Arc::new(c3)],
1185 )
1186 .unwrap();
1187
1188 let mut buf = Vec::new();
1190 let builder = WriterBuilder::new();
1191 let mut writer = builder.build(&mut buf);
1192 writer.write(&batch).unwrap();
1193 drop(writer);
1194 assert_eq!(
1195 "c1,c2,c3\n leading space,123.45, test \ntrailing space ,678.9,value \n both spaces ,,\nno spaces,111.22, another\n",
1196 String::from_utf8(buf).unwrap()
1197 );
1198
1199 let mut buf = Vec::new();
1201 let builder = WriterBuilder::new().with_ignore_leading_whitespace(true);
1202 let mut writer = builder.build(&mut buf);
1203 writer.write(&batch).unwrap();
1204 drop(writer);
1205 assert_eq!(
1206 "c1,c2,c3\nleading space,123.45,test \ntrailing space ,678.9,value \nboth spaces ,,\nno spaces,111.22,another\n",
1207 String::from_utf8(buf).unwrap()
1208 );
1209
1210 let mut buf = Vec::new();
1212 let builder = WriterBuilder::new().with_ignore_trailing_whitespace(true);
1213 let mut writer = builder.build(&mut buf);
1214 writer.write(&batch).unwrap();
1215 drop(writer);
1216 assert_eq!(
1217 "c1,c2,c3\n leading space,123.45, test\ntrailing space,678.9,value\n both spaces,,\nno spaces,111.22, another\n",
1218 String::from_utf8(buf).unwrap()
1219 );
1220
1221 let mut buf = Vec::new();
1223 let builder = WriterBuilder::new()
1224 .with_ignore_leading_whitespace(true)
1225 .with_ignore_trailing_whitespace(true);
1226 let mut writer = builder.build(&mut buf);
1227 writer.write(&batch).unwrap();
1228 drop(writer);
1229 assert_eq!(
1230 "c1,c2,c3\nleading space,123.45,test\ntrailing space,678.9,value\nboth spaces,,\nno spaces,111.22,another\n",
1231 String::from_utf8(buf).unwrap()
1232 );
1233 }
1234
1235 #[test]
1236 fn test_write_csv_whitespace_with_special_chars() {
1237 let schema = Schema::new(vec![Field::new("c1", DataType::Utf8, false)]);
1238
1239 let c1 = StringArray::from(vec![
1240 " quoted \"value\" ",
1241 " new\nline ",
1242 " comma,value ",
1243 "\ttab\tvalue\t",
1244 ]);
1245
1246 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1)]).unwrap();
1247
1248 let mut buf = Vec::new();
1250 let builder = WriterBuilder::new()
1251 .with_ignore_leading_whitespace(true)
1252 .with_ignore_trailing_whitespace(true);
1253 let mut writer = builder.build(&mut buf);
1254 writer.write(&batch).unwrap();
1255 drop(writer);
1256
1257 assert_eq!(
1259 "c1\n\"quoted \"\"value\"\"\"\n\"new\nline\"\n\"comma,value\"\ntab\tvalue\n",
1260 String::from_utf8(buf).unwrap()
1261 );
1262 }
1263
1264 #[test]
1265 fn test_write_csv_whitespace_all_string_types() {
1266 use arrow_array::{LargeStringArray, StringViewArray};
1267
1268 let schema = Schema::new(vec![
1269 Field::new("utf8", DataType::Utf8, false),
1270 Field::new("large_utf8", DataType::LargeUtf8, false),
1271 Field::new("utf8_view", DataType::Utf8View, false),
1272 ]);
1273
1274 let utf8 = StringArray::from(vec![" leading", "trailing ", " both ", "no_spaces"]);
1275
1276 let large_utf8 =
1277 LargeStringArray::from(vec![" leading", "trailing ", " both ", "no_spaces"]);
1278
1279 let utf8_view =
1280 StringViewArray::from(vec![" leading", "trailing ", " both ", "no_spaces"]);
1281
1282 let batch = RecordBatch::try_new(
1283 Arc::new(schema),
1284 vec![Arc::new(utf8), Arc::new(large_utf8), Arc::new(utf8_view)],
1285 )
1286 .unwrap();
1287
1288 let mut buf = Vec::new();
1290 let builder = WriterBuilder::new();
1291 let mut writer = builder.build(&mut buf);
1292 writer.write(&batch).unwrap();
1293 drop(writer);
1294 assert_eq!(
1295 "utf8,large_utf8,utf8_view\n leading, leading, leading\ntrailing ,trailing ,trailing \n both , both , both \nno_spaces,no_spaces,no_spaces\n",
1296 String::from_utf8(buf).unwrap()
1297 );
1298
1299 let mut buf = Vec::new();
1301 let builder = WriterBuilder::new()
1302 .with_ignore_leading_whitespace(true)
1303 .with_ignore_trailing_whitespace(true);
1304 let mut writer = builder.build(&mut buf);
1305 writer.write(&batch).unwrap();
1306 drop(writer);
1307 assert_eq!(
1308 "utf8,large_utf8,utf8_view\nleading,leading,leading\ntrailing,trailing,trailing\nboth,both,both\nno_spaces,no_spaces,no_spaces\n",
1309 String::from_utf8(buf).unwrap()
1310 );
1311
1312 let mut buf = Vec::new();
1314 let builder = WriterBuilder::new().with_ignore_leading_whitespace(true);
1315 let mut writer = builder.build(&mut buf);
1316 writer.write(&batch).unwrap();
1317 drop(writer);
1318 assert_eq!(
1319 "utf8,large_utf8,utf8_view\nleading,leading,leading\ntrailing ,trailing ,trailing \nboth ,both ,both \nno_spaces,no_spaces,no_spaces\n",
1320 String::from_utf8(buf).unwrap()
1321 );
1322
1323 let mut buf = Vec::new();
1325 let builder = WriterBuilder::new().with_ignore_trailing_whitespace(true);
1326 let mut writer = builder.build(&mut buf);
1327 writer.write(&batch).unwrap();
1328 drop(writer);
1329 assert_eq!(
1330 "utf8,large_utf8,utf8_view\n leading, leading, leading\ntrailing,trailing,trailing\n both, both, both\nno_spaces,no_spaces,no_spaces\n",
1331 String::from_utf8(buf).unwrap()
1332 );
1333 }
1334
1335 fn write_quote_style(batch: &RecordBatch, quote_style: QuoteStyle) -> String {
1336 let mut buf = Vec::new();
1337 let mut writer = WriterBuilder::new()
1338 .with_quote_style(quote_style)
1339 .build(&mut buf);
1340 writer.write(batch).unwrap();
1341 drop(writer);
1342 String::from_utf8(buf).unwrap()
1343 }
1344
1345 fn write_quote_style_with_null(
1346 batch: &RecordBatch,
1347 quote_style: QuoteStyle,
1348 null_value: &str,
1349 ) -> String {
1350 let mut buf = Vec::new();
1351 let mut writer = WriterBuilder::new()
1352 .with_quote_style(quote_style)
1353 .with_null(null_value.to_string())
1354 .build(&mut buf);
1355 writer.write(batch).unwrap();
1356 drop(writer);
1357 String::from_utf8(buf).unwrap()
1358 }
1359
1360 #[test]
1361 fn test_write_csv_quote_style() {
1362 let schema = Schema::new(vec![
1363 Field::new("text", DataType::Utf8, false),
1364 Field::new("number", DataType::Int32, false),
1365 Field::new("float", DataType::Float64, false),
1366 ]);
1367
1368 let text = StringArray::from(vec!["hello", "world", "comma,value", "quote\"test"]);
1369 let number = Int32Array::from(vec![1, 2, 3, 4]);
1370 let float = Float64Array::from(vec![1.1, 2.2, 3.3, 4.4]);
1371
1372 let batch = RecordBatch::try_new(
1373 Arc::new(schema),
1374 vec![Arc::new(text), Arc::new(number), Arc::new(float)],
1375 )
1376 .unwrap();
1377
1378 assert_eq!(
1380 "text,number,float\nhello,1,1.1\nworld,2,2.2\n\"comma,value\",3,3.3\n\"quote\"\"test\",4,4.4\n",
1381 write_quote_style(&batch, QuoteStyle::Necessary)
1382 );
1383
1384 assert_eq!(
1386 "\"text\",\"number\",\"float\"\n\"hello\",\"1\",\"1.1\"\n\"world\",\"2\",\"2.2\"\n\"comma,value\",\"3\",\"3.3\"\n\"quote\"\"test\",\"4\",\"4.4\"\n",
1387 write_quote_style(&batch, QuoteStyle::Always)
1388 );
1389
1390 assert_eq!(
1392 "\"text\",\"number\",\"float\"\n\"hello\",1,1.1\n\"world\",2,2.2\n\"comma,value\",3,3.3\n\"quote\"\"test\",4,4.4\n",
1393 write_quote_style(&batch, QuoteStyle::NonNumeric)
1394 );
1395
1396 assert_eq!(
1399 "text,number,float\nhello,1,1.1\nworld,2,2.2\ncomma,value,3,3.3\nquote\"test,4,4.4\n",
1400 write_quote_style(&batch, QuoteStyle::Never)
1401 );
1402 }
1403
1404 #[test]
1405 fn test_write_csv_quote_style_with_nulls() {
1406 let schema = Schema::new(vec![
1407 Field::new("text", DataType::Utf8, true),
1408 Field::new("number", DataType::Int32, true),
1409 ]);
1410
1411 let text = StringArray::from(vec![Some("hello"), None, Some("world")]);
1412 let number = Int32Array::from(vec![Some(1), Some(2), None]);
1413
1414 let batch =
1415 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(text), Arc::new(number)]).unwrap();
1416
1417 assert_eq!(
1419 "\"text\",\"number\"\n\"hello\",\"1\"\n\"\",\"2\"\n\"world\",\"\"\n",
1420 write_quote_style(&batch, QuoteStyle::Always)
1421 );
1422
1423 assert_eq!(
1425 "\"text\",\"number\"\n\"hello\",\"1\"\n\"NULL\",\"2\"\n\"world\",\"NULL\"\n",
1426 write_quote_style_with_null(&batch, QuoteStyle::Always, "NULL")
1427 );
1428 }
1429}