1use arrow_array::*;
177use arrow_cast::display::*;
178use arrow_schema::*;
179use csv::ByteRecord;
180use std::io::Write;
181
182use crate::map_csv_error;
/// Text written for null values when no explicit null representation has been configured
/// (the empty string).
const DEFAULT_NULL_VALUE: &str = "";
184
185pub use csv::QuoteStyle;
200
/// A CSV writer that serializes [`RecordBatch`]es into an underlying [`Write`] sink.
#[derive(Debug)]
pub struct Writer<W: Write> {
    /// The wrapped `csv` crate writer that receives every record.
    writer: csv::Writer<W>,
    /// Whether a header row of field names is written ahead of the first batch.
    has_headers: bool,
    /// Date format string handed to the value formatter; `None` keeps the formatter's default.
    date_format: Option<String>,
    /// Datetime format string handed to the value formatter; `None` keeps the formatter's default.
    datetime_format: Option<String>,
    /// Timestamp format string handed to the value formatter; `None` keeps the formatter's default.
    timestamp_format: Option<String>,
    /// Format string for timestamps carrying a time zone; `None` keeps the formatter's default.
    timestamp_tz_format: Option<String>,
    /// Time format string handed to the value formatter; `None` keeps the formatter's default.
    time_format: Option<String>,
    /// `true` until a `write` call has passed the header step; gates the one-time header row.
    beginning: bool,
    /// Text written for null values; `None` falls back to `DEFAULT_NULL_VALUE`.
    null_value: Option<String>,
    /// Strip leading whitespace from `Utf8` / `LargeUtf8` / `Utf8View` fields before writing.
    ignore_leading_whitespace: bool,
    /// Strip trailing whitespace from `Utf8` / `LargeUtf8` / `Utf8View` fields before writing.
    ignore_trailing_whitespace: bool,
}
229
230impl<W: Write> Writer<W> {
231 pub fn new(writer: W) -> Self {
236 let delimiter = b',';
237 WriterBuilder::new().with_delimiter(delimiter).build(writer)
238 }
239
240 pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
242 let num_columns = batch.num_columns();
243 if self.beginning {
244 if self.has_headers {
245 let mut headers: Vec<String> = Vec::with_capacity(num_columns);
246 batch
247 .schema()
248 .fields()
249 .iter()
250 .for_each(|field| headers.push(field.name().to_string()));
251 self.writer
252 .write_record(&headers[..])
253 .map_err(map_csv_error)?;
254 }
255 self.beginning = false;
256 }
257
258 let options = FormatOptions::default()
259 .with_null(self.null_value.as_deref().unwrap_or(DEFAULT_NULL_VALUE))
260 .with_date_format(self.date_format.as_deref())
261 .with_datetime_format(self.datetime_format.as_deref())
262 .with_timestamp_format(self.timestamp_format.as_deref())
263 .with_timestamp_tz_format(self.timestamp_tz_format.as_deref())
264 .with_time_format(self.time_format.as_deref());
265
266 let converters = batch
267 .columns()
268 .iter()
269 .map(|a| {
270 if a.data_type().is_nested() {
271 Err(ArrowError::CsvError(format!(
272 "Nested type {} is not supported in CSV",
273 a.data_type()
274 )))
275 } else {
276 ArrayFormatter::try_new(a.as_ref(), &options)
277 }
278 })
279 .collect::<Result<Vec<_>, ArrowError>>()?;
280
281 let mut buffer = String::with_capacity(1024);
282 let mut byte_record = ByteRecord::with_capacity(1024, converters.len());
283
284 for row_idx in 0..batch.num_rows() {
285 byte_record.clear();
286 for (col_idx, converter) in converters.iter().enumerate() {
287 buffer.clear();
288 converter.value(row_idx).write(&mut buffer).map_err(|e| {
289 ArrowError::CsvError(format!(
290 "Error processing row {}, col {}: {e}",
291 row_idx + 1,
292 col_idx + 1
293 ))
294 })?;
295
296 let field_bytes =
297 self.get_trimmed_field_bytes(&buffer, batch.column(col_idx).data_type());
298 byte_record.push_field(field_bytes);
299 }
300
301 self.writer
302 .write_byte_record(&byte_record)
303 .map_err(map_csv_error)?;
304 }
305 self.writer.flush()?;
306
307 Ok(())
308 }
309
310 fn get_trimmed_field_bytes<'a>(&self, buffer: &'a str, data_type: &DataType) -> &'a [u8] {
312 let should_trim = (self.ignore_leading_whitespace || self.ignore_trailing_whitespace)
314 && matches!(
315 data_type,
316 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
317 );
318
319 if !should_trim {
320 return buffer.as_bytes();
321 }
322
323 let mut trimmed = buffer;
324 if self.ignore_leading_whitespace {
325 trimmed = trimmed.trim_start();
326 }
327 if self.ignore_trailing_whitespace {
328 trimmed = trimmed.trim_end();
329 }
330 trimmed.as_bytes()
331 }
332
333 pub fn into_inner(self) -> W {
335 self.writer.into_inner().unwrap()
337 }
338}
339
340impl<W: Write> RecordBatchWriter for Writer<W> {
341 fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
342 self.write(batch)
343 }
344
345 fn close(self) -> Result<(), ArrowError> {
346 Ok(())
347 }
348}
349
/// A builder for configuring and creating a CSV [`Writer`].
#[derive(Clone, Debug)]
pub struct WriterBuilder {
    /// Field delimiter byte; the `Default` impl uses `b','`.
    delimiter: u8,
    /// Whether a header row is written; the `Default` impl uses `true`.
    has_header: bool,
    /// Quote byte; the `Default` impl uses `b'"'`.
    quote: u8,
    /// Escape byte; the `Default` impl uses `b'\\'`.
    escape: u8,
    /// Forwarded to the csv writer's `double_quote` setting; the `Default` impl uses `true`.
    double_quote: bool,
    /// Optional date format string; `None` by default.
    date_format: Option<String>,
    /// Optional datetime format string; `None` by default.
    datetime_format: Option<String>,
    /// Optional timestamp format string; `None` by default.
    timestamp_format: Option<String>,
    /// Optional format string for timestamps with a time zone; `None` by default.
    timestamp_tz_format: Option<String>,
    /// Optional time format string; `None` by default.
    time_format: Option<String>,
    /// Optional text used for null values; `None` falls back to `DEFAULT_NULL_VALUE`.
    null_value: Option<String>,
    /// Whether leading whitespace is stripped from string fields; `false` by default.
    ignore_leading_whitespace: bool,
    /// Whether trailing whitespace is stripped from string fields; `false` by default.
    ignore_trailing_whitespace: bool,
    /// Quoting policy forwarded to the csv writer; `QuoteStyle::default()` by default.
    quote_style: QuoteStyle,
}
382
383impl Default for WriterBuilder {
384 fn default() -> Self {
385 WriterBuilder {
386 delimiter: b',',
387 has_header: true,
388 quote: b'"',
389 escape: b'\\',
390 double_quote: true,
391 date_format: None,
392 datetime_format: None,
393 timestamp_format: None,
394 timestamp_tz_format: None,
395 time_format: None,
396 null_value: None,
397 ignore_leading_whitespace: false,
398 ignore_trailing_whitespace: false,
399 quote_style: QuoteStyle::default(),
400 }
401 }
402}
403
404impl WriterBuilder {
405 pub fn new() -> Self {
427 Self::default()
428 }
429
430 pub fn with_header(mut self, header: bool) -> Self {
432 self.has_header = header;
433 self
434 }
435
436 pub fn header(&self) -> bool {
438 self.has_header
439 }
440
441 pub fn with_delimiter(mut self, delimiter: u8) -> Self {
443 self.delimiter = delimiter;
444 self
445 }
446
447 pub fn delimiter(&self) -> u8 {
449 self.delimiter
450 }
451
452 pub fn with_quote(mut self, quote: u8) -> Self {
454 self.quote = quote;
455 self
456 }
457
458 pub fn quote(&self) -> u8 {
460 self.quote
461 }
462
463 pub fn with_escape(mut self, escape: u8) -> Self {
471 self.escape = escape;
472 self
473 }
474
475 pub fn escape(&self) -> u8 {
477 self.escape
478 }
479
480 pub fn with_double_quote(mut self, double_quote: bool) -> Self {
488 self.double_quote = double_quote;
489 self
490 }
491
492 pub fn double_quote(&self) -> bool {
494 self.double_quote
495 }
496
497 pub fn with_date_format(mut self, format: String) -> Self {
499 self.date_format = Some(format);
500 self
501 }
502
503 pub fn date_format(&self) -> Option<&str> {
505 self.date_format.as_deref()
506 }
507
508 pub fn with_datetime_format(mut self, format: String) -> Self {
510 self.datetime_format = Some(format);
511 self
512 }
513
514 pub fn datetime_format(&self) -> Option<&str> {
516 self.datetime_format.as_deref()
517 }
518
519 pub fn with_time_format(mut self, format: String) -> Self {
521 self.time_format = Some(format);
522 self
523 }
524
525 pub fn time_format(&self) -> Option<&str> {
527 self.time_format.as_deref()
528 }
529
530 pub fn with_timestamp_format(mut self, format: String) -> Self {
532 self.timestamp_format = Some(format);
533 self
534 }
535
536 pub fn timestamp_format(&self) -> Option<&str> {
538 self.timestamp_format.as_deref()
539 }
540
541 pub fn with_timestamp_tz_format(mut self, tz_format: String) -> Self {
543 self.timestamp_tz_format = Some(tz_format);
544 self
545 }
546
547 pub fn timestamp_tz_format(&self) -> Option<&str> {
549 self.timestamp_tz_format.as_deref()
550 }
551
552 pub fn with_null(mut self, null_value: String) -> Self {
554 self.null_value = Some(null_value);
555 self
556 }
557
558 pub fn null(&self) -> &str {
560 self.null_value.as_deref().unwrap_or(DEFAULT_NULL_VALUE)
561 }
562
563 pub fn with_ignore_leading_whitespace(mut self, ignore: bool) -> Self {
566 self.ignore_leading_whitespace = ignore;
567 self
568 }
569
570 pub fn ignore_leading_whitespace(&self) -> bool {
572 self.ignore_leading_whitespace
573 }
574
575 pub fn with_ignore_trailing_whitespace(mut self, ignore: bool) -> Self {
578 self.ignore_trailing_whitespace = ignore;
579 self
580 }
581
582 pub fn ignore_trailing_whitespace(&self) -> bool {
584 self.ignore_trailing_whitespace
585 }
586
587 pub fn with_quote_style(mut self, quote_style: QuoteStyle) -> Self {
603 self.quote_style = quote_style;
604 self
605 }
606
607 pub fn quote_style(&self) -> QuoteStyle {
609 self.quote_style
610 }
611
612 pub fn build<W: Write>(self, writer: W) -> Writer<W> {
614 let mut builder = csv::WriterBuilder::new();
615 let writer = builder
616 .delimiter(self.delimiter)
617 .quote(self.quote)
618 .quote_style(self.quote_style)
619 .double_quote(self.double_quote)
620 .escape(self.escape)
621 .from_writer(writer);
622 Writer {
623 writer,
624 beginning: true,
625 has_headers: self.has_header,
626 date_format: self.date_format,
627 datetime_format: self.datetime_format,
628 time_format: self.time_format,
629 timestamp_format: self.timestamp_format,
630 timestamp_tz_format: self.timestamp_tz_format,
631 null_value: self.null_value,
632 ignore_leading_whitespace: self.ignore_leading_whitespace,
633 ignore_trailing_whitespace: self.ignore_trailing_whitespace,
634 }
635 }
636}
637
638#[cfg(test)]
639mod tests {
640 use super::*;
641
642 use crate::ReaderBuilder;
643 use arrow_array::builder::{
644 BinaryBuilder, Decimal32Builder, Decimal64Builder, Decimal128Builder, Decimal256Builder,
645 FixedSizeBinaryBuilder, LargeBinaryBuilder,
646 };
647 use arrow_array::types::*;
648 use arrow_buffer::i256;
649 use core::str;
650 use std::io::{Cursor, Read, Seek};
651 use std::sync::Arc;
652
653 #[test]
654 fn test_write_csv() {
655 let schema = Schema::new(vec![
656 Field::new("c1", DataType::Utf8, false),
657 Field::new("c2", DataType::Float64, true),
658 Field::new("c3", DataType::UInt32, false),
659 Field::new("c4", DataType::Boolean, true),
660 Field::new("c5", DataType::Timestamp(TimeUnit::Millisecond, None), true),
661 Field::new("c6", DataType::Time32(TimeUnit::Second), false),
662 Field::new_dictionary("c7", DataType::Int32, DataType::Utf8, false),
663 ]);
664
665 let c1 = StringArray::from(vec![
666 "Lorem ipsum dolor sit amet",
667 "consectetur adipiscing elit",
668 "sed do eiusmod tempor",
669 ]);
670 let c2 =
671 PrimitiveArray::<Float64Type>::from(vec![Some(123.564532), None, Some(-556132.25)]);
672 let c3 = PrimitiveArray::<UInt32Type>::from(vec![3, 2, 1]);
673 let c4 = BooleanArray::from(vec![Some(true), Some(false), None]);
674 let c5 =
675 TimestampMillisecondArray::from(vec![None, Some(1555584887378), Some(1555555555555)]);
676 let c6 = Time32SecondArray::from(vec![1234, 24680, 85563]);
677 let c7: DictionaryArray<Int32Type> =
678 vec!["cupcakes", "cupcakes", "foo"].into_iter().collect();
679
680 let batch = RecordBatch::try_new(
681 Arc::new(schema),
682 vec![
683 Arc::new(c1),
684 Arc::new(c2),
685 Arc::new(c3),
686 Arc::new(c4),
687 Arc::new(c5),
688 Arc::new(c6),
689 Arc::new(c7),
690 ],
691 )
692 .unwrap();
693
694 let mut file = tempfile::tempfile().unwrap();
695
696 let mut writer = Writer::new(&mut file);
697 let batches = vec![&batch, &batch];
698 for batch in batches {
699 writer.write(batch).unwrap();
700 }
701 drop(writer);
702
703 file.rewind().unwrap();
705 let mut buffer: Vec<u8> = vec![];
706 file.read_to_end(&mut buffer).unwrap();
707
708 let expected = r#"c1,c2,c3,c4,c5,c6,c7
709Lorem ipsum dolor sit amet,123.564532,3,true,,00:20:34,cupcakes
710consectetur adipiscing elit,,2,false,2019-04-18T10:54:47.378,06:51:20,cupcakes
711sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555,23:46:03,foo
712Lorem ipsum dolor sit amet,123.564532,3,true,,00:20:34,cupcakes
713consectetur adipiscing elit,,2,false,2019-04-18T10:54:47.378,06:51:20,cupcakes
714sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555,23:46:03,foo
715"#;
716 assert_eq!(expected, str::from_utf8(&buffer).unwrap());
717 }
718
719 #[test]
720 fn test_write_csv_decimal() {
721 let schema = Schema::new(vec![
722 Field::new("c1", DataType::Decimal32(9, 6), true),
723 Field::new("c2", DataType::Decimal64(17, 6), true),
724 Field::new("c3", DataType::Decimal128(38, 6), true),
725 Field::new("c4", DataType::Decimal256(76, 6), true),
726 ]);
727
728 let mut c1_builder = Decimal32Builder::new().with_data_type(DataType::Decimal32(9, 6));
729 c1_builder.extend(vec![Some(-3335724), Some(2179404), None, Some(290472)]);
730 let c1 = c1_builder.finish();
731
732 let mut c2_builder = Decimal64Builder::new().with_data_type(DataType::Decimal64(17, 6));
733 c2_builder.extend(vec![Some(-3335724), Some(2179404), None, Some(290472)]);
734 let c2 = c2_builder.finish();
735
736 let mut c3_builder = Decimal128Builder::new().with_data_type(DataType::Decimal128(38, 6));
737 c3_builder.extend(vec![Some(-3335724), Some(2179404), None, Some(290472)]);
738 let c3 = c3_builder.finish();
739
740 let mut c4_builder = Decimal256Builder::new().with_data_type(DataType::Decimal256(76, 6));
741 c4_builder.extend(vec![
742 Some(i256::from_i128(-3335724)),
743 Some(i256::from_i128(2179404)),
744 None,
745 Some(i256::from_i128(290472)),
746 ]);
747 let c4 = c4_builder.finish();
748
749 let batch = RecordBatch::try_new(
750 Arc::new(schema),
751 vec![Arc::new(c1), Arc::new(c2), Arc::new(c3), Arc::new(c4)],
752 )
753 .unwrap();
754
755 let mut file = tempfile::tempfile().unwrap();
756
757 let mut writer = Writer::new(&mut file);
758 let batches = vec![&batch, &batch];
759 for batch in batches {
760 writer.write(batch).unwrap();
761 }
762 drop(writer);
763
764 file.rewind().unwrap();
766 let mut buffer: Vec<u8> = vec![];
767 file.read_to_end(&mut buffer).unwrap();
768
769 let expected = r#"c1,c2,c3,c4
770-3.335724,-3.335724,-3.335724,-3.335724
7712.179404,2.179404,2.179404,2.179404
772,,,
7730.290472,0.290472,0.290472,0.290472
774-3.335724,-3.335724,-3.335724,-3.335724
7752.179404,2.179404,2.179404,2.179404
776,,,
7770.290472,0.290472,0.290472,0.290472
778"#;
779 assert_eq!(expected, str::from_utf8(&buffer).unwrap());
780 }
781
782 #[test]
783 fn test_write_csv_custom_options() {
784 let schema = Schema::new(vec![
785 Field::new("c1", DataType::Utf8, false),
786 Field::new("c2", DataType::Float64, true),
787 Field::new("c3", DataType::UInt32, false),
788 Field::new("c4", DataType::Boolean, true),
789 Field::new("c6", DataType::Time32(TimeUnit::Second), false),
790 ]);
791
792 let c1 = StringArray::from(vec![
793 "Lorem ipsum \ndolor sit amet",
794 "consectetur \"adipiscing\" elit",
795 "sed do eiusmod tempor",
796 ]);
797 let c2 =
798 PrimitiveArray::<Float64Type>::from(vec![Some(123.564532), None, Some(-556132.25)]);
799 let c3 = PrimitiveArray::<UInt32Type>::from(vec![3, 2, 1]);
800 let c4 = BooleanArray::from(vec![Some(true), Some(false), None]);
801 let c6 = Time32SecondArray::from(vec![1234, 24680, 85563]);
802
803 let batch = RecordBatch::try_new(
804 Arc::new(schema),
805 vec![
806 Arc::new(c1),
807 Arc::new(c2),
808 Arc::new(c3),
809 Arc::new(c4),
810 Arc::new(c6),
811 ],
812 )
813 .unwrap();
814
815 let mut file = tempfile::tempfile().unwrap();
816
817 let builder = WriterBuilder::new()
818 .with_header(false)
819 .with_delimiter(b'|')
820 .with_quote(b'\'')
821 .with_null("NULL".to_string())
822 .with_time_format("%r".to_string());
823 let mut writer = builder.build(&mut file);
824 let batches = vec![&batch];
825 for batch in batches {
826 writer.write(batch).unwrap();
827 }
828 drop(writer);
829
830 file.rewind().unwrap();
832 let mut buffer: Vec<u8> = vec![];
833 file.read_to_end(&mut buffer).unwrap();
834
835 assert_eq!(
836 "'Lorem ipsum \ndolor sit amet'|123.564532|3|true|12:20:34 AM\nconsectetur \"adipiscing\" elit|NULL|2|false|06:51:20 AM\nsed do eiusmod tempor|-556132.25|1|NULL|11:46:03 PM\n"
837 .to_string(),
838 String::from_utf8(buffer).unwrap()
839 );
840
841 let mut file = tempfile::tempfile().unwrap();
842
843 let builder = WriterBuilder::new()
844 .with_header(true)
845 .with_double_quote(false)
846 .with_escape(b'$');
847 let mut writer = builder.build(&mut file);
848 let batches = vec![&batch];
849 for batch in batches {
850 writer.write(batch).unwrap();
851 }
852 drop(writer);
853
854 file.rewind().unwrap();
855 let mut buffer: Vec<u8> = vec![];
856 file.read_to_end(&mut buffer).unwrap();
857
858 assert_eq!(
859 "c1,c2,c3,c4,c6\n\"Lorem ipsum \ndolor sit amet\",123.564532,3,true,00:20:34\n\"consectetur $\"adipiscing$\" elit\",,2,false,06:51:20\nsed do eiusmod tempor,-556132.25,1,,23:46:03\n"
860 .to_string(),
861 String::from_utf8(buffer).unwrap()
862 );
863 }
864
865 #[test]
866 fn test_conversion_consistency() {
867 let schema = Schema::new(vec![
870 Field::new("c1", DataType::Date32, false),
871 Field::new("c2", DataType::Date64, false),
872 Field::new("c3", DataType::Timestamp(TimeUnit::Nanosecond, None), false),
873 ]);
874
875 let nanoseconds = vec![
876 1599566300000000000,
877 1599566200000000000,
878 1599566100000000000,
879 ];
880 let c1 = Date32Array::from(vec![3, 2, 1]);
881 let c2 = Date64Array::from(vec![3, 2, 1]);
882 let c3 = TimestampNanosecondArray::from(nanoseconds.clone());
883
884 let batch = RecordBatch::try_new(
885 Arc::new(schema.clone()),
886 vec![Arc::new(c1), Arc::new(c2), Arc::new(c3)],
887 )
888 .unwrap();
889
890 let builder = WriterBuilder::new().with_header(false);
891
892 let mut buf: Cursor<Vec<u8>> = Default::default();
893 {
895 let mut writer = builder.build(&mut buf);
896 writer.write(&batch).unwrap();
897 }
898 buf.set_position(0);
899
900 let mut reader = ReaderBuilder::new(Arc::new(schema))
901 .with_batch_size(3)
902 .build_buffered(buf)
903 .unwrap();
904
905 let rb = reader.next().unwrap().unwrap();
906 let c1 = rb.column(0).as_any().downcast_ref::<Date32Array>().unwrap();
907 let c2 = rb.column(1).as_any().downcast_ref::<Date64Array>().unwrap();
908 let c3 = rb
909 .column(2)
910 .as_any()
911 .downcast_ref::<TimestampNanosecondArray>()
912 .unwrap();
913
914 let actual = c1.into_iter().collect::<Vec<_>>();
915 let expected = vec![Some(3), Some(2), Some(1)];
916 assert_eq!(actual, expected);
917 let actual = c2.into_iter().collect::<Vec<_>>();
918 let expected = vec![Some(3), Some(2), Some(1)];
919 assert_eq!(actual, expected);
920 let actual = c3.into_iter().collect::<Vec<_>>();
921 let expected = nanoseconds.into_iter().map(Some).collect::<Vec<_>>();
922 assert_eq!(actual, expected);
923 }
924
925 #[test]
926 fn test_write_csv_invalid_cast() {
927 let schema = Schema::new(vec![
928 Field::new("c0", DataType::UInt32, false),
929 Field::new("c1", DataType::Date64, false),
930 ]);
931
932 let c0 = UInt32Array::from(vec![Some(123), Some(234)]);
933 let c1 = Date64Array::from(vec![Some(1926632005177), Some(1926632005177685347)]);
934 let batch =
935 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c0), Arc::new(c1)]).unwrap();
936
937 let mut file = tempfile::tempfile().unwrap();
938 let mut writer = Writer::new(&mut file);
939 let batches = vec![&batch, &batch];
940
941 for batch in batches {
942 let err = writer.write(batch).unwrap_err().to_string();
943 assert_eq!(
944 err,
945 "Csv error: Error processing row 2, col 2: Cast error: Failed to convert 1926632005177685347 to temporal for Date64"
946 )
947 }
948 drop(writer);
949 }
950
951 #[test]
952 fn test_write_csv_using_rfc3339() {
953 let schema = Schema::new(vec![
954 Field::new(
955 "c1",
956 DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
957 true,
958 ),
959 Field::new("c2", DataType::Timestamp(TimeUnit::Millisecond, None), true),
960 Field::new("c3", DataType::Date32, false),
961 Field::new("c4", DataType::Time32(TimeUnit::Second), false),
962 ]);
963
964 let c1 = TimestampMillisecondArray::from(vec![Some(1555584887378), Some(1635577147000)])
965 .with_timezone("+00:00".to_string());
966 let c2 = TimestampMillisecondArray::from(vec![Some(1555584887378), Some(1635577147000)]);
967 let c3 = Date32Array::from(vec![3, 2]);
968 let c4 = Time32SecondArray::from(vec![1234, 24680]);
969
970 let batch = RecordBatch::try_new(
971 Arc::new(schema),
972 vec![Arc::new(c1), Arc::new(c2), Arc::new(c3), Arc::new(c4)],
973 )
974 .unwrap();
975
976 let mut file = tempfile::tempfile().unwrap();
977
978 let builder = WriterBuilder::new();
979 let mut writer = builder.build(&mut file);
980 let batches = vec![&batch];
981 for batch in batches {
982 writer.write(batch).unwrap();
983 }
984 drop(writer);
985
986 file.rewind().unwrap();
987 let mut buffer: Vec<u8> = vec![];
988 file.read_to_end(&mut buffer).unwrap();
989
990 assert_eq!(
991 "c1,c2,c3,c4
9922019-04-18T10:54:47.378Z,2019-04-18T10:54:47.378,1970-01-04,00:20:34
9932021-10-30T06:59:07Z,2021-10-30T06:59:07,1970-01-03,06:51:20\n",
994 String::from_utf8(buffer).unwrap()
995 );
996 }
997
998 #[test]
999 fn test_write_csv_tz_format() {
1000 let schema = Schema::new(vec![
1001 Field::new(
1002 "c1",
1003 DataType::Timestamp(TimeUnit::Millisecond, Some("+02:00".into())),
1004 true,
1005 ),
1006 Field::new(
1007 "c2",
1008 DataType::Timestamp(TimeUnit::Second, Some("+04:00".into())),
1009 true,
1010 ),
1011 ]);
1012 let c1 = TimestampMillisecondArray::from(vec![Some(1_000), Some(2_000)])
1013 .with_timezone("+02:00".to_string());
1014 let c2 = TimestampSecondArray::from(vec![Some(1_000_000), None])
1015 .with_timezone("+04:00".to_string());
1016 let batch =
1017 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();
1018
1019 let mut file = tempfile::tempfile().unwrap();
1020 let mut writer = WriterBuilder::new()
1021 .with_timestamp_tz_format("%M:%H".to_string())
1022 .build(&mut file);
1023 writer.write(&batch).unwrap();
1024
1025 drop(writer);
1026 file.rewind().unwrap();
1027 let mut buffer: Vec<u8> = vec![];
1028 file.read_to_end(&mut buffer).unwrap();
1029
1030 assert_eq!(
1031 "c1,c2\n00:02,46:17\n00:02,\n",
1032 String::from_utf8(buffer).unwrap()
1033 );
1034 }
1035
1036 #[test]
1037 fn test_write_csv_binary() {
1038 let fixed_size = 8;
1039 let schema = SchemaRef::new(Schema::new(vec![
1040 Field::new("c1", DataType::Binary, true),
1041 Field::new("c2", DataType::FixedSizeBinary(fixed_size), true),
1042 Field::new("c3", DataType::LargeBinary, true),
1043 ]));
1044 let mut c1_builder = BinaryBuilder::new();
1045 c1_builder.append_value(b"Homer");
1046 c1_builder.append_value(b"Bart");
1047 c1_builder.append_null();
1048 c1_builder.append_value(b"Ned");
1049 let mut c2_builder = FixedSizeBinaryBuilder::new(fixed_size);
1050 c2_builder.append_value(b"Simpson ").unwrap();
1051 c2_builder.append_value(b"Simpson ").unwrap();
1052 c2_builder.append_null();
1053 c2_builder.append_value(b"Flanders").unwrap();
1054 let mut c3_builder = LargeBinaryBuilder::new();
1055 c3_builder.append_null();
1056 c3_builder.append_null();
1057 c3_builder.append_value(b"Comic Book Guy");
1058 c3_builder.append_null();
1059
1060 let batch = RecordBatch::try_new(
1061 schema,
1062 vec![
1063 Arc::new(c1_builder.finish()) as ArrayRef,
1064 Arc::new(c2_builder.finish()) as ArrayRef,
1065 Arc::new(c3_builder.finish()) as ArrayRef,
1066 ],
1067 )
1068 .unwrap();
1069
1070 let mut buf = Vec::new();
1071 let builder = WriterBuilder::new();
1072 let mut writer = builder.build(&mut buf);
1073 writer.write(&batch).unwrap();
1074 drop(writer);
1075 assert_eq!(
1076 "\
1077 c1,c2,c3\n\
1078 486f6d6572,53696d70736f6e20,\n\
1079 42617274,53696d70736f6e20,\n\
1080 ,,436f6d696320426f6f6b20477579\n\
1081 4e6564,466c616e64657273,\n\
1082 ",
1083 String::from_utf8(buf).unwrap()
1084 );
1085 }
1086
1087 #[test]
1088 fn test_write_csv_whitespace_handling() {
1089 let schema = Schema::new(vec![
1090 Field::new("c1", DataType::Utf8, false),
1091 Field::new("c2", DataType::Float64, true),
1092 Field::new("c3", DataType::Utf8, true),
1093 ]);
1094
1095 let c1 = StringArray::from(vec![
1096 " leading space",
1097 "trailing space ",
1098 " both spaces ",
1099 "no spaces",
1100 ]);
1101 let c2 = PrimitiveArray::<Float64Type>::from(vec![
1102 Some(123.45),
1103 Some(678.90),
1104 None,
1105 Some(111.22),
1106 ]);
1107 let c3 = StringArray::from(vec![
1108 Some(" test "),
1109 Some("value "),
1110 None,
1111 Some(" another"),
1112 ]);
1113
1114 let batch = RecordBatch::try_new(
1115 Arc::new(schema),
1116 vec![Arc::new(c1), Arc::new(c2), Arc::new(c3)],
1117 )
1118 .unwrap();
1119
1120 let mut buf = Vec::new();
1122 let builder = WriterBuilder::new();
1123 let mut writer = builder.build(&mut buf);
1124 writer.write(&batch).unwrap();
1125 drop(writer);
1126 assert_eq!(
1127 "c1,c2,c3\n leading space,123.45, test \ntrailing space ,678.9,value \n both spaces ,,\nno spaces,111.22, another\n",
1128 String::from_utf8(buf).unwrap()
1129 );
1130
1131 let mut buf = Vec::new();
1133 let builder = WriterBuilder::new().with_ignore_leading_whitespace(true);
1134 let mut writer = builder.build(&mut buf);
1135 writer.write(&batch).unwrap();
1136 drop(writer);
1137 assert_eq!(
1138 "c1,c2,c3\nleading space,123.45,test \ntrailing space ,678.9,value \nboth spaces ,,\nno spaces,111.22,another\n",
1139 String::from_utf8(buf).unwrap()
1140 );
1141
1142 let mut buf = Vec::new();
1144 let builder = WriterBuilder::new().with_ignore_trailing_whitespace(true);
1145 let mut writer = builder.build(&mut buf);
1146 writer.write(&batch).unwrap();
1147 drop(writer);
1148 assert_eq!(
1149 "c1,c2,c3\n leading space,123.45, test\ntrailing space,678.9,value\n both spaces,,\nno spaces,111.22, another\n",
1150 String::from_utf8(buf).unwrap()
1151 );
1152
1153 let mut buf = Vec::new();
1155 let builder = WriterBuilder::new()
1156 .with_ignore_leading_whitespace(true)
1157 .with_ignore_trailing_whitespace(true);
1158 let mut writer = builder.build(&mut buf);
1159 writer.write(&batch).unwrap();
1160 drop(writer);
1161 assert_eq!(
1162 "c1,c2,c3\nleading space,123.45,test\ntrailing space,678.9,value\nboth spaces,,\nno spaces,111.22,another\n",
1163 String::from_utf8(buf).unwrap()
1164 );
1165 }
1166
1167 #[test]
1168 fn test_write_csv_whitespace_with_special_chars() {
1169 let schema = Schema::new(vec![Field::new("c1", DataType::Utf8, false)]);
1170
1171 let c1 = StringArray::from(vec![
1172 " quoted \"value\" ",
1173 " new\nline ",
1174 " comma,value ",
1175 "\ttab\tvalue\t",
1176 ]);
1177
1178 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1)]).unwrap();
1179
1180 let mut buf = Vec::new();
1182 let builder = WriterBuilder::new()
1183 .with_ignore_leading_whitespace(true)
1184 .with_ignore_trailing_whitespace(true);
1185 let mut writer = builder.build(&mut buf);
1186 writer.write(&batch).unwrap();
1187 drop(writer);
1188
1189 assert_eq!(
1191 "c1\n\"quoted \"\"value\"\"\"\n\"new\nline\"\n\"comma,value\"\ntab\tvalue\n",
1192 String::from_utf8(buf).unwrap()
1193 );
1194 }
1195
1196 #[test]
1197 fn test_write_csv_whitespace_all_string_types() {
1198 use arrow_array::{LargeStringArray, StringViewArray};
1199
1200 let schema = Schema::new(vec![
1201 Field::new("utf8", DataType::Utf8, false),
1202 Field::new("large_utf8", DataType::LargeUtf8, false),
1203 Field::new("utf8_view", DataType::Utf8View, false),
1204 ]);
1205
1206 let utf8 = StringArray::from(vec![" leading", "trailing ", " both ", "no_spaces"]);
1207
1208 let large_utf8 =
1209 LargeStringArray::from(vec![" leading", "trailing ", " both ", "no_spaces"]);
1210
1211 let utf8_view =
1212 StringViewArray::from(vec![" leading", "trailing ", " both ", "no_spaces"]);
1213
1214 let batch = RecordBatch::try_new(
1215 Arc::new(schema),
1216 vec![Arc::new(utf8), Arc::new(large_utf8), Arc::new(utf8_view)],
1217 )
1218 .unwrap();
1219
1220 let mut buf = Vec::new();
1222 let builder = WriterBuilder::new();
1223 let mut writer = builder.build(&mut buf);
1224 writer.write(&batch).unwrap();
1225 drop(writer);
1226 assert_eq!(
1227 "utf8,large_utf8,utf8_view\n leading, leading, leading\ntrailing ,trailing ,trailing \n both , both , both \nno_spaces,no_spaces,no_spaces\n",
1228 String::from_utf8(buf).unwrap()
1229 );
1230
1231 let mut buf = Vec::new();
1233 let builder = WriterBuilder::new()
1234 .with_ignore_leading_whitespace(true)
1235 .with_ignore_trailing_whitespace(true);
1236 let mut writer = builder.build(&mut buf);
1237 writer.write(&batch).unwrap();
1238 drop(writer);
1239 assert_eq!(
1240 "utf8,large_utf8,utf8_view\nleading,leading,leading\ntrailing,trailing,trailing\nboth,both,both\nno_spaces,no_spaces,no_spaces\n",
1241 String::from_utf8(buf).unwrap()
1242 );
1243
1244 let mut buf = Vec::new();
1246 let builder = WriterBuilder::new().with_ignore_leading_whitespace(true);
1247 let mut writer = builder.build(&mut buf);
1248 writer.write(&batch).unwrap();
1249 drop(writer);
1250 assert_eq!(
1251 "utf8,large_utf8,utf8_view\nleading,leading,leading\ntrailing ,trailing ,trailing \nboth ,both ,both \nno_spaces,no_spaces,no_spaces\n",
1252 String::from_utf8(buf).unwrap()
1253 );
1254
1255 let mut buf = Vec::new();
1257 let builder = WriterBuilder::new().with_ignore_trailing_whitespace(true);
1258 let mut writer = builder.build(&mut buf);
1259 writer.write(&batch).unwrap();
1260 drop(writer);
1261 assert_eq!(
1262 "utf8,large_utf8,utf8_view\n leading, leading, leading\ntrailing,trailing,trailing\n both, both, both\nno_spaces,no_spaces,no_spaces\n",
1263 String::from_utf8(buf).unwrap()
1264 );
1265 }
1266
1267 fn write_quote_style(batch: &RecordBatch, quote_style: QuoteStyle) -> String {
1268 let mut buf = Vec::new();
1269 let mut writer = WriterBuilder::new()
1270 .with_quote_style(quote_style)
1271 .build(&mut buf);
1272 writer.write(batch).unwrap();
1273 drop(writer);
1274 String::from_utf8(buf).unwrap()
1275 }
1276
1277 fn write_quote_style_with_null(
1278 batch: &RecordBatch,
1279 quote_style: QuoteStyle,
1280 null_value: &str,
1281 ) -> String {
1282 let mut buf = Vec::new();
1283 let mut writer = WriterBuilder::new()
1284 .with_quote_style(quote_style)
1285 .with_null(null_value.to_string())
1286 .build(&mut buf);
1287 writer.write(batch).unwrap();
1288 drop(writer);
1289 String::from_utf8(buf).unwrap()
1290 }
1291
1292 #[test]
1293 fn test_write_csv_quote_style() {
1294 let schema = Schema::new(vec![
1295 Field::new("text", DataType::Utf8, false),
1296 Field::new("number", DataType::Int32, false),
1297 Field::new("float", DataType::Float64, false),
1298 ]);
1299
1300 let text = StringArray::from(vec!["hello", "world", "comma,value", "quote\"test"]);
1301 let number = Int32Array::from(vec![1, 2, 3, 4]);
1302 let float = Float64Array::from(vec![1.1, 2.2, 3.3, 4.4]);
1303
1304 let batch = RecordBatch::try_new(
1305 Arc::new(schema),
1306 vec![Arc::new(text), Arc::new(number), Arc::new(float)],
1307 )
1308 .unwrap();
1309
1310 assert_eq!(
1312 "text,number,float\nhello,1,1.1\nworld,2,2.2\n\"comma,value\",3,3.3\n\"quote\"\"test\",4,4.4\n",
1313 write_quote_style(&batch, QuoteStyle::Necessary)
1314 );
1315
1316 assert_eq!(
1318 "\"text\",\"number\",\"float\"\n\"hello\",\"1\",\"1.1\"\n\"world\",\"2\",\"2.2\"\n\"comma,value\",\"3\",\"3.3\"\n\"quote\"\"test\",\"4\",\"4.4\"\n",
1319 write_quote_style(&batch, QuoteStyle::Always)
1320 );
1321
1322 assert_eq!(
1324 "\"text\",\"number\",\"float\"\n\"hello\",1,1.1\n\"world\",2,2.2\n\"comma,value\",3,3.3\n\"quote\"\"test\",4,4.4\n",
1325 write_quote_style(&batch, QuoteStyle::NonNumeric)
1326 );
1327
1328 assert_eq!(
1331 "text,number,float\nhello,1,1.1\nworld,2,2.2\ncomma,value,3,3.3\nquote\"test,4,4.4\n",
1332 write_quote_style(&batch, QuoteStyle::Never)
1333 );
1334 }
1335
1336 #[test]
1337 fn test_write_csv_quote_style_with_nulls() {
1338 let schema = Schema::new(vec![
1339 Field::new("text", DataType::Utf8, true),
1340 Field::new("number", DataType::Int32, true),
1341 ]);
1342
1343 let text = StringArray::from(vec![Some("hello"), None, Some("world")]);
1344 let number = Int32Array::from(vec![Some(1), Some(2), None]);
1345
1346 let batch =
1347 RecordBatch::try_new(Arc::new(schema), vec![Arc::new(text), Arc::new(number)]).unwrap();
1348
1349 assert_eq!(
1351 "\"text\",\"number\"\n\"hello\",\"1\"\n\"\",\"2\"\n\"world\",\"\"\n",
1352 write_quote_style(&batch, QuoteStyle::Always)
1353 );
1354
1355 assert_eq!(
1357 "\"text\",\"number\"\n\"hello\",\"1\"\n\"NULL\",\"2\"\n\"world\",\"NULL\"\n",
1358 write_quote_style_with_null(&batch, QuoteStyle::Always, "NULL")
1359 );
1360 }
1361}