arrow_csv/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! CSV Writer
19//!
20//! This CSV writer allows Arrow data (in record batches) to be written as CSV files.
//! The writer does not support nested types such as `ListArray` and `StructArray`;
//! writing a batch that contains a nested column returns an error.
22//!
23//! Example:
24//!
25//! ```
26//! # use arrow_array::*;
27//! # use arrow_array::types::*;
28//! # use arrow_csv::Writer;
29//! # use arrow_schema::*;
30//! # use std::sync::Arc;
31//!
32//! let schema = Schema::new(vec![
33//!     Field::new("c1", DataType::Utf8, false),
34//!     Field::new("c2", DataType::Float64, true),
35//!     Field::new("c3", DataType::UInt32, false),
36//!     Field::new("c4", DataType::Boolean, true),
37//! ]);
38//! let c1 = StringArray::from(vec![
39//!     "Lorem ipsum dolor sit amet",
40//!     "consectetur adipiscing elit",
41//!     "sed do eiusmod tempor",
42//! ]);
43//! let c2 = PrimitiveArray::<Float64Type>::from(vec![
44//!     Some(123.564532),
45//!     None,
46//!     Some(-556132.25),
47//! ]);
48//! let c3 = PrimitiveArray::<UInt32Type>::from(vec![3, 2, 1]);
49//! let c4 = BooleanArray::from(vec![Some(true), Some(false), None]);
50//!
51//! let batch = RecordBatch::try_new(
52//!     Arc::new(schema),
53//!     vec![Arc::new(c1), Arc::new(c2), Arc::new(c3), Arc::new(c4)],
54//! )
55//! .unwrap();
56//!
57//! let mut output = Vec::with_capacity(1024);
58//!
59//! let mut writer = Writer::new(&mut output);
60//! let batches = vec![&batch, &batch];
61//! for batch in batches {
62//!     writer.write(batch).unwrap();
63//! }
64//! ```
65
66use arrow_array::*;
67use arrow_cast::display::*;
68use arrow_schema::*;
69use csv::ByteRecord;
70use std::io::Write;
71
72use crate::map_csv_error;
/// Text written for null entries when no custom null value has been configured
/// (the empty string).
const DEFAULT_NULL_VALUE: &str = "";
74
/// A CSV writer that serializes Arrow [`RecordBatch`]es to any [`Write`] sink.
///
/// Create one with [`Writer::new`] for the default options, or configure it
/// through [`WriterBuilder`] and call [`WriterBuilder::build`].
#[derive(Debug)]
pub struct Writer<W: Write> {
    /// The `csv` writer wrapping the object to write to
    writer: csv::Writer<W>,
    /// Whether file should be written with headers, defaults to `true`
    has_headers: bool,
    /// The date format for date arrays, defaults to RFC3339
    date_format: Option<String>,
    /// The datetime format for datetime arrays, defaults to RFC3339
    datetime_format: Option<String>,
    /// The timestamp format for timestamp arrays, defaults to RFC3339
    timestamp_format: Option<String>,
    /// The timestamp format for timestamp (with timezone) arrays, defaults to RFC3339
    timestamp_tz_format: Option<String>,
    /// The time format for time arrays, defaults to RFC3339
    time_format: Option<String>,
    /// `true` until the first call to `write`; used so that the header row
    /// (when enabled) is emitted only once, before the first batch
    beginning: bool,
    /// The value to represent null entries, defaults to [`DEFAULT_NULL_VALUE`]
    null_value: Option<String>,
}
97
98impl<W: Write> Writer<W> {
99    /// Create a new CsvWriter from a writable object, with default options
100    pub fn new(writer: W) -> Self {
101        let delimiter = b',';
102        WriterBuilder::new().with_delimiter(delimiter).build(writer)
103    }
104
105    /// Write a vector of record batches to a writable object
106    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
107        let num_columns = batch.num_columns();
108        if self.beginning {
109            if self.has_headers {
110                let mut headers: Vec<String> = Vec::with_capacity(num_columns);
111                batch
112                    .schema()
113                    .fields()
114                    .iter()
115                    .for_each(|field| headers.push(field.name().to_string()));
116                self.writer
117                    .write_record(&headers[..])
118                    .map_err(map_csv_error)?;
119            }
120            self.beginning = false;
121        }
122
123        let options = FormatOptions::default()
124            .with_null(self.null_value.as_deref().unwrap_or(DEFAULT_NULL_VALUE))
125            .with_date_format(self.date_format.as_deref())
126            .with_datetime_format(self.datetime_format.as_deref())
127            .with_timestamp_format(self.timestamp_format.as_deref())
128            .with_timestamp_tz_format(self.timestamp_tz_format.as_deref())
129            .with_time_format(self.time_format.as_deref());
130
131        let converters = batch
132            .columns()
133            .iter()
134            .map(|a| {
135                if a.data_type().is_nested() {
136                    Err(ArrowError::CsvError(format!(
137                        "Nested type {} is not supported in CSV",
138                        a.data_type()
139                    )))
140                } else {
141                    ArrayFormatter::try_new(a.as_ref(), &options)
142                }
143            })
144            .collect::<Result<Vec<_>, ArrowError>>()?;
145
146        let mut buffer = String::with_capacity(1024);
147        let mut byte_record = ByteRecord::with_capacity(1024, converters.len());
148
149        for row_idx in 0..batch.num_rows() {
150            byte_record.clear();
151            for (col_idx, converter) in converters.iter().enumerate() {
152                buffer.clear();
153                converter.value(row_idx).write(&mut buffer).map_err(|e| {
154                    ArrowError::CsvError(format!(
155                        "Error processing row {}, col {}: {e}",
156                        row_idx + 1,
157                        col_idx + 1
158                    ))
159                })?;
160                byte_record.push_field(buffer.as_bytes());
161            }
162
163            self.writer
164                .write_byte_record(&byte_record)
165                .map_err(map_csv_error)?;
166        }
167        self.writer.flush()?;
168
169        Ok(())
170    }
171
172    /// Unwraps this `Writer<W>`, returning the underlying writer.
173    pub fn into_inner(self) -> W {
174        // Safe to call `unwrap` since `write` always flushes the writer.
175        self.writer.into_inner().unwrap()
176    }
177}
178
impl<W: Write> RecordBatchWriter for Writer<W> {
    /// Writes `batch` by delegating to the inherent `Writer::write` method.
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        // Inherent methods take precedence over trait methods during method
        // resolution, so this calls the inherent `Writer::write` and does not
        // recurse into this trait method.
        self.write(batch)
    }

    /// Consumes the writer; performs no additional work and always returns `Ok(())`.
    fn close(self) -> Result<(), ArrowError> {
        Ok(())
    }
}
188
/// A CSV writer builder
///
/// Collects the CSV dialect (delimiter, quoting, escaping) and the value
/// formatting options, then produces a [`Writer`] via [`WriterBuilder::build`].
#[derive(Clone, Debug)]
pub struct WriterBuilder {
    /// Column delimiter byte. Defaults to `b','`
    delimiter: u8,
    /// Whether to write column names as file headers. Defaults to `true`
    has_header: bool,
    /// Quote character byte. Defaults to `b'"'`
    quote: u8,
    /// Escape character byte. Defaults to `b'\\'`
    escape: u8,
    /// Enable double quote escapes. Defaults to `true`
    double_quote: bool,
    /// Optional date format for date arrays
    date_format: Option<String>,
    /// Optional datetime format for datetime arrays
    datetime_format: Option<String>,
    /// Optional timestamp format for timestamp arrays
    timestamp_format: Option<String>,
    /// Optional timestamp format for timestamp with timezone arrays
    timestamp_tz_format: Option<String>,
    /// Optional time format for time arrays
    time_format: Option<String>,
    /// Optional value to represent null; when unset, [`DEFAULT_NULL_VALUE`] is used
    null_value: Option<String>,
}
215
216impl Default for WriterBuilder {
217    fn default() -> Self {
218        WriterBuilder {
219            delimiter: b',',
220            has_header: true,
221            quote: b'"',
222            escape: b'\\',
223            double_quote: true,
224            date_format: None,
225            datetime_format: None,
226            timestamp_format: None,
227            timestamp_tz_format: None,
228            time_format: None,
229            null_value: None,
230        }
231    }
232}
233
234impl WriterBuilder {
235    /// Create a new builder for configuring CSV writing options.
236    ///
237    /// To convert a builder into a writer, call `WriterBuilder::build`
238    ///
239    /// # Example
240    ///
241    /// ```
242    /// # use arrow_csv::{Writer, WriterBuilder};
243    /// # use std::fs::File;
244    ///
245    /// fn example() -> Writer<File> {
246    ///     let file = File::create("target/out.csv").unwrap();
247    ///
248    ///     // create a builder that doesn't write headers
249    ///     let builder = WriterBuilder::new().with_header(false);
250    ///     let writer = builder.build(file);
251    ///
252    ///     writer
253    /// }
254    /// ```
255    pub fn new() -> Self {
256        Self::default()
257    }
258
259    /// Set whether to write the CSV file with a header
260    pub fn with_header(mut self, header: bool) -> Self {
261        self.has_header = header;
262        self
263    }
264
265    /// Returns `true` if this writer is configured to write a header
266    pub fn header(&self) -> bool {
267        self.has_header
268    }
269
270    /// Set the CSV file's column delimiter as a byte character
271    pub fn with_delimiter(mut self, delimiter: u8) -> Self {
272        self.delimiter = delimiter;
273        self
274    }
275
276    /// Get the CSV file's column delimiter as a byte character
277    pub fn delimiter(&self) -> u8 {
278        self.delimiter
279    }
280
281    /// Set the CSV file's quote character as a byte character
282    pub fn with_quote(mut self, quote: u8) -> Self {
283        self.quote = quote;
284        self
285    }
286
287    /// Get the CSV file's quote character as a byte character
288    pub fn quote(&self) -> u8 {
289        self.quote
290    }
291
292    /// Set the CSV file's escape character as a byte character
293    ///
294    /// In some variants of CSV, quotes are escaped using a special escape
295    /// character like `\` (instead of escaping quotes by doubling them).
296    ///
297    /// By default, writing these idiosyncratic escapes is disabled, and is
298    /// only used when `double_quote` is disabled.
299    pub fn with_escape(mut self, escape: u8) -> Self {
300        self.escape = escape;
301        self
302    }
303
304    /// Get the CSV file's escape character as a byte character
305    pub fn escape(&self) -> u8 {
306        self.escape
307    }
308
309    /// Set whether to enable double quote escapes
310    ///
311    /// When enabled (which is the default), quotes are escaped by doubling
312    /// them. e.g., `"` escapes to `""`.
313    ///
314    /// When disabled, quotes are escaped with the escape character (which
315    /// is `\\` by default).
316    pub fn with_double_quote(mut self, double_quote: bool) -> Self {
317        self.double_quote = double_quote;
318        self
319    }
320
321    /// Get whether double quote escapes are enabled
322    pub fn double_quote(&self) -> bool {
323        self.double_quote
324    }
325
326    /// Set the CSV file's date format
327    pub fn with_date_format(mut self, format: String) -> Self {
328        self.date_format = Some(format);
329        self
330    }
331
332    /// Get the CSV file's date format if set, defaults to RFC3339
333    pub fn date_format(&self) -> Option<&str> {
334        self.date_format.as_deref()
335    }
336
337    /// Set the CSV file's datetime format
338    pub fn with_datetime_format(mut self, format: String) -> Self {
339        self.datetime_format = Some(format);
340        self
341    }
342
343    /// Get the CSV file's datetime format if set, defaults to RFC3339
344    pub fn datetime_format(&self) -> Option<&str> {
345        self.datetime_format.as_deref()
346    }
347
348    /// Set the CSV file's time format
349    pub fn with_time_format(mut self, format: String) -> Self {
350        self.time_format = Some(format);
351        self
352    }
353
354    /// Get the CSV file's datetime time if set, defaults to RFC3339
355    pub fn time_format(&self) -> Option<&str> {
356        self.time_format.as_deref()
357    }
358
359    /// Set the CSV file's timestamp format
360    pub fn with_timestamp_format(mut self, format: String) -> Self {
361        self.timestamp_format = Some(format);
362        self
363    }
364
365    /// Get the CSV file's timestamp format if set, defaults to RFC3339
366    pub fn timestamp_format(&self) -> Option<&str> {
367        self.timestamp_format.as_deref()
368    }
369
370    /// Set the CSV file's timestamp tz format
371    pub fn with_timestamp_tz_format(mut self, tz_format: String) -> Self {
372        self.timestamp_tz_format = Some(tz_format);
373        self
374    }
375
376    /// Get the CSV file's timestamp tz format if set, defaults to RFC3339
377    pub fn timestamp_tz_format(&self) -> Option<&str> {
378        self.timestamp_tz_format.as_deref()
379    }
380
381    /// Set the value to represent null in output
382    pub fn with_null(mut self, null_value: String) -> Self {
383        self.null_value = Some(null_value);
384        self
385    }
386
387    /// Get the value to represent null in output
388    pub fn null(&self) -> &str {
389        self.null_value.as_deref().unwrap_or(DEFAULT_NULL_VALUE)
390    }
391
392    /// Create a new `Writer`
393    pub fn build<W: Write>(self, writer: W) -> Writer<W> {
394        let mut builder = csv::WriterBuilder::new();
395        let writer = builder
396            .delimiter(self.delimiter)
397            .quote(self.quote)
398            .double_quote(self.double_quote)
399            .escape(self.escape)
400            .from_writer(writer);
401        Writer {
402            writer,
403            beginning: true,
404            has_headers: self.has_header,
405            date_format: self.date_format,
406            datetime_format: self.datetime_format,
407            time_format: self.time_format,
408            timestamp_format: self.timestamp_format,
409            timestamp_tz_format: self.timestamp_tz_format,
410            null_value: self.null_value,
411        }
412    }
413}
414
415#[cfg(test)]
416mod tests {
417    use super::*;
418
419    use crate::ReaderBuilder;
420    use arrow_array::builder::{
421        BinaryBuilder, Decimal128Builder, Decimal256Builder, Decimal32Builder, Decimal64Builder,
422        FixedSizeBinaryBuilder, LargeBinaryBuilder,
423    };
424    use arrow_array::types::*;
425    use arrow_buffer::i256;
426    use core::str;
427    use std::io::{Cursor, Read, Seek};
428    use std::sync::Arc;
429
430    #[test]
431    fn test_write_csv() {
432        let schema = Schema::new(vec![
433            Field::new("c1", DataType::Utf8, false),
434            Field::new("c2", DataType::Float64, true),
435            Field::new("c3", DataType::UInt32, false),
436            Field::new("c4", DataType::Boolean, true),
437            Field::new("c5", DataType::Timestamp(TimeUnit::Millisecond, None), true),
438            Field::new("c6", DataType::Time32(TimeUnit::Second), false),
439            Field::new_dictionary("c7", DataType::Int32, DataType::Utf8, false),
440        ]);
441
442        let c1 = StringArray::from(vec![
443            "Lorem ipsum dolor sit amet",
444            "consectetur adipiscing elit",
445            "sed do eiusmod tempor",
446        ]);
447        let c2 =
448            PrimitiveArray::<Float64Type>::from(vec![Some(123.564532), None, Some(-556132.25)]);
449        let c3 = PrimitiveArray::<UInt32Type>::from(vec![3, 2, 1]);
450        let c4 = BooleanArray::from(vec![Some(true), Some(false), None]);
451        let c5 =
452            TimestampMillisecondArray::from(vec![None, Some(1555584887378), Some(1555555555555)]);
453        let c6 = Time32SecondArray::from(vec![1234, 24680, 85563]);
454        let c7: DictionaryArray<Int32Type> =
455            vec!["cupcakes", "cupcakes", "foo"].into_iter().collect();
456
457        let batch = RecordBatch::try_new(
458            Arc::new(schema),
459            vec![
460                Arc::new(c1),
461                Arc::new(c2),
462                Arc::new(c3),
463                Arc::new(c4),
464                Arc::new(c5),
465                Arc::new(c6),
466                Arc::new(c7),
467            ],
468        )
469        .unwrap();
470
471        let mut file = tempfile::tempfile().unwrap();
472
473        let mut writer = Writer::new(&mut file);
474        let batches = vec![&batch, &batch];
475        for batch in batches {
476            writer.write(batch).unwrap();
477        }
478        drop(writer);
479
480        // check that file was written successfully
481        file.rewind().unwrap();
482        let mut buffer: Vec<u8> = vec![];
483        file.read_to_end(&mut buffer).unwrap();
484
485        let expected = r#"c1,c2,c3,c4,c5,c6,c7
486Lorem ipsum dolor sit amet,123.564532,3,true,,00:20:34,cupcakes
487consectetur adipiscing elit,,2,false,2019-04-18T10:54:47.378,06:51:20,cupcakes
488sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555,23:46:03,foo
489Lorem ipsum dolor sit amet,123.564532,3,true,,00:20:34,cupcakes
490consectetur adipiscing elit,,2,false,2019-04-18T10:54:47.378,06:51:20,cupcakes
491sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555,23:46:03,foo
492"#;
493        assert_eq!(expected, str::from_utf8(&buffer).unwrap());
494    }
495
496    #[test]
497    fn test_write_csv_decimal() {
498        let schema = Schema::new(vec![
499            Field::new("c1", DataType::Decimal32(9, 6), true),
500            Field::new("c2", DataType::Decimal64(17, 6), true),
501            Field::new("c3", DataType::Decimal128(38, 6), true),
502            Field::new("c4", DataType::Decimal256(76, 6), true),
503        ]);
504
505        let mut c1_builder = Decimal32Builder::new().with_data_type(DataType::Decimal32(9, 6));
506        c1_builder.extend(vec![Some(-3335724), Some(2179404), None, Some(290472)]);
507        let c1 = c1_builder.finish();
508
509        let mut c2_builder = Decimal64Builder::new().with_data_type(DataType::Decimal64(17, 6));
510        c2_builder.extend(vec![Some(-3335724), Some(2179404), None, Some(290472)]);
511        let c2 = c2_builder.finish();
512
513        let mut c3_builder = Decimal128Builder::new().with_data_type(DataType::Decimal128(38, 6));
514        c3_builder.extend(vec![Some(-3335724), Some(2179404), None, Some(290472)]);
515        let c3 = c3_builder.finish();
516
517        let mut c4_builder = Decimal256Builder::new().with_data_type(DataType::Decimal256(76, 6));
518        c4_builder.extend(vec![
519            Some(i256::from_i128(-3335724)),
520            Some(i256::from_i128(2179404)),
521            None,
522            Some(i256::from_i128(290472)),
523        ]);
524        let c4 = c4_builder.finish();
525
526        let batch = RecordBatch::try_new(
527            Arc::new(schema),
528            vec![Arc::new(c1), Arc::new(c2), Arc::new(c3), Arc::new(c4)],
529        )
530        .unwrap();
531
532        let mut file = tempfile::tempfile().unwrap();
533
534        let mut writer = Writer::new(&mut file);
535        let batches = vec![&batch, &batch];
536        for batch in batches {
537            writer.write(batch).unwrap();
538        }
539        drop(writer);
540
541        // check that file was written successfully
542        file.rewind().unwrap();
543        let mut buffer: Vec<u8> = vec![];
544        file.read_to_end(&mut buffer).unwrap();
545
546        let expected = r#"c1,c2,c3,c4
547-3.335724,-3.335724,-3.335724,-3.335724
5482.179404,2.179404,2.179404,2.179404
549,,,
5500.290472,0.290472,0.290472,0.290472
551-3.335724,-3.335724,-3.335724,-3.335724
5522.179404,2.179404,2.179404,2.179404
553,,,
5540.290472,0.290472,0.290472,0.290472
555"#;
556        assert_eq!(expected, str::from_utf8(&buffer).unwrap());
557    }
558
559    #[test]
560    fn test_write_csv_custom_options() {
561        let schema = Schema::new(vec![
562            Field::new("c1", DataType::Utf8, false),
563            Field::new("c2", DataType::Float64, true),
564            Field::new("c3", DataType::UInt32, false),
565            Field::new("c4", DataType::Boolean, true),
566            Field::new("c6", DataType::Time32(TimeUnit::Second), false),
567        ]);
568
569        let c1 = StringArray::from(vec![
570            "Lorem ipsum \ndolor sit amet",
571            "consectetur \"adipiscing\" elit",
572            "sed do eiusmod tempor",
573        ]);
574        let c2 =
575            PrimitiveArray::<Float64Type>::from(vec![Some(123.564532), None, Some(-556132.25)]);
576        let c3 = PrimitiveArray::<UInt32Type>::from(vec![3, 2, 1]);
577        let c4 = BooleanArray::from(vec![Some(true), Some(false), None]);
578        let c6 = Time32SecondArray::from(vec![1234, 24680, 85563]);
579
580        let batch = RecordBatch::try_new(
581            Arc::new(schema),
582            vec![
583                Arc::new(c1),
584                Arc::new(c2),
585                Arc::new(c3),
586                Arc::new(c4),
587                Arc::new(c6),
588            ],
589        )
590        .unwrap();
591
592        let mut file = tempfile::tempfile().unwrap();
593
594        let builder = WriterBuilder::new()
595            .with_header(false)
596            .with_delimiter(b'|')
597            .with_quote(b'\'')
598            .with_null("NULL".to_string())
599            .with_time_format("%r".to_string());
600        let mut writer = builder.build(&mut file);
601        let batches = vec![&batch];
602        for batch in batches {
603            writer.write(batch).unwrap();
604        }
605        drop(writer);
606
607        // check that file was written successfully
608        file.rewind().unwrap();
609        let mut buffer: Vec<u8> = vec![];
610        file.read_to_end(&mut buffer).unwrap();
611
612        assert_eq!(
613            "'Lorem ipsum \ndolor sit amet'|123.564532|3|true|12:20:34 AM\nconsectetur \"adipiscing\" elit|NULL|2|false|06:51:20 AM\nsed do eiusmod tempor|-556132.25|1|NULL|11:46:03 PM\n"
614            .to_string(),
615            String::from_utf8(buffer).unwrap()
616        );
617
618        let mut file = tempfile::tempfile().unwrap();
619
620        let builder = WriterBuilder::new()
621            .with_header(true)
622            .with_double_quote(false)
623            .with_escape(b'$');
624        let mut writer = builder.build(&mut file);
625        let batches = vec![&batch];
626        for batch in batches {
627            writer.write(batch).unwrap();
628        }
629        drop(writer);
630
631        file.rewind().unwrap();
632        let mut buffer: Vec<u8> = vec![];
633        file.read_to_end(&mut buffer).unwrap();
634
635        assert_eq!(
636            "c1,c2,c3,c4,c6\n\"Lorem ipsum \ndolor sit amet\",123.564532,3,true,00:20:34\n\"consectetur $\"adipiscing$\" elit\",,2,false,06:51:20\nsed do eiusmod tempor,-556132.25,1,,23:46:03\n"
637            .to_string(),
638            String::from_utf8(buffer).unwrap()
639        );
640    }
641
642    #[test]
643    fn test_conversion_consistency() {
644        // test if we can serialize and deserialize whilst retaining the same type information/ precision
645
646        let schema = Schema::new(vec![
647            Field::new("c1", DataType::Date32, false),
648            Field::new("c2", DataType::Date64, false),
649            Field::new("c3", DataType::Timestamp(TimeUnit::Nanosecond, None), false),
650        ]);
651
652        let nanoseconds = vec![
653            1599566300000000000,
654            1599566200000000000,
655            1599566100000000000,
656        ];
657        let c1 = Date32Array::from(vec![3, 2, 1]);
658        let c2 = Date64Array::from(vec![3, 2, 1]);
659        let c3 = TimestampNanosecondArray::from(nanoseconds.clone());
660
661        let batch = RecordBatch::try_new(
662            Arc::new(schema.clone()),
663            vec![Arc::new(c1), Arc::new(c2), Arc::new(c3)],
664        )
665        .unwrap();
666
667        let builder = WriterBuilder::new().with_header(false);
668
669        let mut buf: Cursor<Vec<u8>> = Default::default();
670        // drop the writer early to release the borrow.
671        {
672            let mut writer = builder.build(&mut buf);
673            writer.write(&batch).unwrap();
674        }
675        buf.set_position(0);
676
677        let mut reader = ReaderBuilder::new(Arc::new(schema))
678            .with_batch_size(3)
679            .build_buffered(buf)
680            .unwrap();
681
682        let rb = reader.next().unwrap().unwrap();
683        let c1 = rb.column(0).as_any().downcast_ref::<Date32Array>().unwrap();
684        let c2 = rb.column(1).as_any().downcast_ref::<Date64Array>().unwrap();
685        let c3 = rb
686            .column(2)
687            .as_any()
688            .downcast_ref::<TimestampNanosecondArray>()
689            .unwrap();
690
691        let actual = c1.into_iter().collect::<Vec<_>>();
692        let expected = vec![Some(3), Some(2), Some(1)];
693        assert_eq!(actual, expected);
694        let actual = c2.into_iter().collect::<Vec<_>>();
695        let expected = vec![Some(3), Some(2), Some(1)];
696        assert_eq!(actual, expected);
697        let actual = c3.into_iter().collect::<Vec<_>>();
698        let expected = nanoseconds.into_iter().map(Some).collect::<Vec<_>>();
699        assert_eq!(actual, expected);
700    }
701
702    #[test]
703    fn test_write_csv_invalid_cast() {
704        let schema = Schema::new(vec![
705            Field::new("c0", DataType::UInt32, false),
706            Field::new("c1", DataType::Date64, false),
707        ]);
708
709        let c0 = UInt32Array::from(vec![Some(123), Some(234)]);
710        let c1 = Date64Array::from(vec![Some(1926632005177), Some(1926632005177685347)]);
711        let batch =
712            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c0), Arc::new(c1)]).unwrap();
713
714        let mut file = tempfile::tempfile().unwrap();
715        let mut writer = Writer::new(&mut file);
716        let batches = vec![&batch, &batch];
717
718        for batch in batches {
719            let err = writer.write(batch).unwrap_err().to_string();
720            assert_eq!(err, "Csv error: Error processing row 2, col 2: Cast error: Failed to convert 1926632005177685347 to temporal for Date64")
721        }
722        drop(writer);
723    }
724
725    #[test]
726    fn test_write_csv_using_rfc3339() {
727        let schema = Schema::new(vec![
728            Field::new(
729                "c1",
730                DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
731                true,
732            ),
733            Field::new("c2", DataType::Timestamp(TimeUnit::Millisecond, None), true),
734            Field::new("c3", DataType::Date32, false),
735            Field::new("c4", DataType::Time32(TimeUnit::Second), false),
736        ]);
737
738        let c1 = TimestampMillisecondArray::from(vec![Some(1555584887378), Some(1635577147000)])
739            .with_timezone("+00:00".to_string());
740        let c2 = TimestampMillisecondArray::from(vec![Some(1555584887378), Some(1635577147000)]);
741        let c3 = Date32Array::from(vec![3, 2]);
742        let c4 = Time32SecondArray::from(vec![1234, 24680]);
743
744        let batch = RecordBatch::try_new(
745            Arc::new(schema),
746            vec![Arc::new(c1), Arc::new(c2), Arc::new(c3), Arc::new(c4)],
747        )
748        .unwrap();
749
750        let mut file = tempfile::tempfile().unwrap();
751
752        let builder = WriterBuilder::new();
753        let mut writer = builder.build(&mut file);
754        let batches = vec![&batch];
755        for batch in batches {
756            writer.write(batch).unwrap();
757        }
758        drop(writer);
759
760        file.rewind().unwrap();
761        let mut buffer: Vec<u8> = vec![];
762        file.read_to_end(&mut buffer).unwrap();
763
764        assert_eq!(
765            "c1,c2,c3,c4
7662019-04-18T10:54:47.378Z,2019-04-18T10:54:47.378,1970-01-04,00:20:34
7672021-10-30T06:59:07Z,2021-10-30T06:59:07,1970-01-03,06:51:20\n",
768            String::from_utf8(buffer).unwrap()
769        );
770    }
771
772    #[test]
773    fn test_write_csv_tz_format() {
774        let schema = Schema::new(vec![
775            Field::new(
776                "c1",
777                DataType::Timestamp(TimeUnit::Millisecond, Some("+02:00".into())),
778                true,
779            ),
780            Field::new(
781                "c2",
782                DataType::Timestamp(TimeUnit::Second, Some("+04:00".into())),
783                true,
784            ),
785        ]);
786        let c1 = TimestampMillisecondArray::from(vec![Some(1_000), Some(2_000)])
787            .with_timezone("+02:00".to_string());
788        let c2 = TimestampSecondArray::from(vec![Some(1_000_000), None])
789            .with_timezone("+04:00".to_string());
790        let batch =
791            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();
792
793        let mut file = tempfile::tempfile().unwrap();
794        let mut writer = WriterBuilder::new()
795            .with_timestamp_tz_format("%M:%H".to_string())
796            .build(&mut file);
797        writer.write(&batch).unwrap();
798
799        drop(writer);
800        file.rewind().unwrap();
801        let mut buffer: Vec<u8> = vec![];
802        file.read_to_end(&mut buffer).unwrap();
803
804        assert_eq!(
805            "c1,c2\n00:02,46:17\n00:02,\n",
806            String::from_utf8(buffer).unwrap()
807        );
808    }
809
810    #[test]
811    fn test_write_csv_binary() {
812        let fixed_size = 8;
813        let schema = SchemaRef::new(Schema::new(vec![
814            Field::new("c1", DataType::Binary, true),
815            Field::new("c2", DataType::FixedSizeBinary(fixed_size), true),
816            Field::new("c3", DataType::LargeBinary, true),
817        ]));
818        let mut c1_builder = BinaryBuilder::new();
819        c1_builder.append_value(b"Homer");
820        c1_builder.append_value(b"Bart");
821        c1_builder.append_null();
822        c1_builder.append_value(b"Ned");
823        let mut c2_builder = FixedSizeBinaryBuilder::new(fixed_size);
824        c2_builder.append_value(b"Simpson ").unwrap();
825        c2_builder.append_value(b"Simpson ").unwrap();
826        c2_builder.append_null();
827        c2_builder.append_value(b"Flanders").unwrap();
828        let mut c3_builder = LargeBinaryBuilder::new();
829        c3_builder.append_null();
830        c3_builder.append_null();
831        c3_builder.append_value(b"Comic Book Guy");
832        c3_builder.append_null();
833
834        let batch = RecordBatch::try_new(
835            schema,
836            vec![
837                Arc::new(c1_builder.finish()) as ArrayRef,
838                Arc::new(c2_builder.finish()) as ArrayRef,
839                Arc::new(c3_builder.finish()) as ArrayRef,
840            ],
841        )
842        .unwrap();
843
844        let mut buf = Vec::new();
845        let builder = WriterBuilder::new();
846        let mut writer = builder.build(&mut buf);
847        writer.write(&batch).unwrap();
848        drop(writer);
849        assert_eq!(
850            "\
851            c1,c2,c3\n\
852            486f6d6572,53696d70736f6e20,\n\
853            42617274,53696d70736f6e20,\n\
854            ,,436f6d696320426f6f6b20477579\n\
855            4e6564,466c616e64657273,\n\
856            ",
857            String::from_utf8(buf).unwrap()
858        );
859    }
860}