use arrow_array::*;
use arrow_cast::display::*;
use arrow_schema::*;
use csv::ByteRecord;
use std::io::Write;
use crate::map_csv_error;
/// Text written for a null value when no custom null representation has been configured.
const DEFAULT_NULL_VALUE: &str = "";
/// CSV writer that serializes [`RecordBatch`]es into a [`Write`] sink.
///
/// Values of this type are produced by [`Writer::new`] or [`WriterBuilder::build`].
#[derive(Debug)]
pub struct Writer<W: Write> {
/// Underlying `csv` crate writer that receives the encoded records.
writer: csv::Writer<W>,
/// Whether a row of field names is written ahead of the first batch.
has_headers: bool,
/// Format string handed to `FormatOptions::with_date_format`; `None` passes no override.
date_format: Option<String>,
/// Format string handed to `FormatOptions::with_datetime_format`; `None` passes no override.
datetime_format: Option<String>,
/// Format string handed to `FormatOptions::with_timestamp_format`; `None` passes no override.
timestamp_format: Option<String>,
/// Format string handed to `FormatOptions::with_timestamp_tz_format`; `None` passes no override.
timestamp_tz_format: Option<String>,
/// Format string handed to `FormatOptions::with_time_format`; `None` passes no override.
time_format: Option<String>,
/// Starts as `true` and is cleared by the first `write` call; the header row is only
/// considered while it is still set.
beginning: bool,
/// Text written for null values; `DEFAULT_NULL_VALUE` is used when this is `None`.
null_value: Option<String>,
}
impl<W: Write> Writer<W> {
    /// Creates a CSV writer over `writer` using the default [`WriterBuilder`]
    /// configuration with a comma as the field delimiter.
    pub fn new(writer: W) -> Self {
        WriterBuilder::new().with_delimiter(b',').build(writer)
    }

    /// Encodes every row of `batch` as one CSV record and then flushes the underlying writer.
    ///
    /// On the first call, and only when headers are enabled, a row containing the
    /// schema's field names is written before any data.
    ///
    /// # Errors
    ///
    /// Returns [`ArrowError::CsvError`] when the batch contains a nested column or when a
    /// value cannot be formatted (the message carries the 1-based row and column within
    /// this batch). Failures reported by the `csv` writer are converted with
    /// `map_csv_error`, and a failed flush is propagated as well.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        // The header decision is taken exactly once, while handling the first batch.
        if self.beginning {
            if self.has_headers {
                let schema = batch.schema();
                let names = schema.fields().iter().map(|field| field.name());
                self.writer.write_record(names).map_err(map_csv_error)?;
            }
            self.beginning = false;
        }

        let null_text = self.null_value.as_deref().unwrap_or(DEFAULT_NULL_VALUE);
        let options = FormatOptions::default()
            .with_null(null_text)
            .with_date_format(self.date_format.as_deref())
            .with_datetime_format(self.datetime_format.as_deref())
            .with_timestamp_format(self.timestamp_format.as_deref())
            .with_timestamp_tz_format(self.timestamp_tz_format.as_deref())
            .with_time_format(self.time_format.as_deref());

        // One formatter per column; the first nested or unsupported column aborts the write
        // before any row of this batch is emitted.
        let mut converters = Vec::with_capacity(batch.num_columns());
        for column in batch.columns() {
            if column.data_type().is_nested() {
                return Err(ArrowError::CsvError(format!(
                    "Nested type {} is not supported in CSV",
                    column.data_type()
                )));
            }
            converters.push(ArrayFormatter::try_new(column.as_ref(), &options)?);
        }

        // Both scratch buffers are reused for every field and row to avoid reallocating.
        let mut field_text = String::with_capacity(1024);
        let mut record = ByteRecord::with_capacity(1024, converters.len());
        for row in 0..batch.num_rows() {
            record.clear();
            for (col, formatter) in converters.iter().enumerate() {
                field_text.clear();
                formatter.value(row).write(&mut field_text).map_err(|e| {
                    ArrowError::CsvError(format!(
                        "Error processing row {}, col {}: {e}",
                        row + 1,
                        col + 1
                    ))
                })?;
                record.push_field(field_text.as_bytes());
            }
            self.writer
                .write_byte_record(&record)
                .map_err(map_csv_error)?;
        }

        self.writer.flush()?;
        Ok(())
    }

    /// Consumes this writer and hands back the wrapped sink.
    ///
    /// # Panics
    ///
    /// Panics if `csv::Writer::into_inner` reports an error.
    pub fn into_inner(self) -> W {
        self.writer.into_inner().unwrap()
    }
}
impl<W: Write> RecordBatchWriter for Writer<W> {
fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
self.write(batch)
}
fn close(self) -> Result<(), ArrowError> {
Ok(())
}
}
/// Builder that collects CSV output settings and produces a [`Writer`] via
/// [`WriterBuilder::build`].
#[derive(Clone, Debug)]
pub struct WriterBuilder {
/// Field delimiter byte passed to the `csv` writer builder.
delimiter: u8,
/// Whether the built writer emits a header row of field names.
has_header: bool,
/// Quote byte passed to the `csv` writer builder.
quote: u8,
/// Escape byte passed to the `csv` writer builder.
escape: u8,
/// Value passed to the `csv` writer builder's `double_quote` setting.
double_quote: bool,
/// Optional format string for dates, copied into the built writer.
date_format: Option<String>,
/// Optional format string for datetimes, copied into the built writer.
datetime_format: Option<String>,
/// Optional format string for timestamps, copied into the built writer.
timestamp_format: Option<String>,
/// Optional format string for timestamps with a time zone, copied into the built writer.
timestamp_tz_format: Option<String>,
/// Optional format string for times, copied into the built writer.
time_format: Option<String>,
/// Optional text written for null values; `DEFAULT_NULL_VALUE` applies when `None`.
null_value: Option<String>,
}
impl Default for WriterBuilder {
fn default() -> Self {
WriterBuilder {
delimiter: b',',
has_header: true,
quote: b'"',
escape: b'\\',
double_quote: true,
date_format: None,
datetime_format: None,
timestamp_format: None,
timestamp_tz_format: None,
time_format: None,
null_value: None,
}
}
}
impl WriterBuilder {
    /// Creates a builder holding the [`Default`] configuration.
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets whether the built writer emits a header row of field names.
    pub fn with_header(self, header: bool) -> Self {
        Self {
            has_header: header,
            ..self
        }
    }

    /// Returns whether a header row will be written.
    pub fn header(&self) -> bool {
        self.has_header
    }

    /// Sets the field delimiter byte.
    pub fn with_delimiter(self, delimiter: u8) -> Self {
        Self { delimiter, ..self }
    }

    /// Returns the field delimiter byte.
    pub fn delimiter(&self) -> u8 {
        self.delimiter
    }

    /// Sets the quote byte.
    pub fn with_quote(self, quote: u8) -> Self {
        Self { quote, ..self }
    }

    /// Returns the quote byte.
    pub fn quote(&self) -> u8 {
        self.quote
    }

    /// Sets the escape byte.
    pub fn with_escape(self, escape: u8) -> Self {
        Self { escape, ..self }
    }

    /// Returns the escape byte.
    pub fn escape(&self) -> u8 {
        self.escape
    }

    /// Sets the `double_quote` flag forwarded to the `csv` writer builder.
    pub fn with_double_quote(self, double_quote: bool) -> Self {
        Self {
            double_quote,
            ..self
        }
    }

    /// Returns the `double_quote` flag.
    pub fn double_quote(&self) -> bool {
        self.double_quote
    }

    /// Sets the format string used for dates.
    pub fn with_date_format(self, format: String) -> Self {
        Self {
            date_format: Some(format),
            ..self
        }
    }

    /// Returns the date format string, if one was set.
    pub fn date_format(&self) -> Option<&str> {
        self.date_format.as_deref()
    }

    /// Sets the format string used for datetimes.
    pub fn with_datetime_format(self, format: String) -> Self {
        Self {
            datetime_format: Some(format),
            ..self
        }
    }

    /// Returns the datetime format string, if one was set.
    pub fn datetime_format(&self) -> Option<&str> {
        self.datetime_format.as_deref()
    }

    /// Sets the format string used for times.
    pub fn with_time_format(self, format: String) -> Self {
        Self {
            time_format: Some(format),
            ..self
        }
    }

    /// Returns the time format string, if one was set.
    pub fn time_format(&self) -> Option<&str> {
        self.time_format.as_deref()
    }

    /// Sets the format string used for timestamps.
    pub fn with_timestamp_format(self, format: String) -> Self {
        Self {
            timestamp_format: Some(format),
            ..self
        }
    }

    /// Returns the timestamp format string, if one was set.
    pub fn timestamp_format(&self) -> Option<&str> {
        self.timestamp_format.as_deref()
    }

    /// Sets the format string used for timestamps that carry a time zone.
    pub fn with_timestamp_tz_format(self, tz_format: String) -> Self {
        Self {
            timestamp_tz_format: Some(tz_format),
            ..self
        }
    }

    /// Returns the time-zone-aware timestamp format string, if one was set.
    pub fn timestamp_tz_format(&self) -> Option<&str> {
        self.timestamp_tz_format.as_deref()
    }

    /// Sets the text written in place of null values.
    pub fn with_null(self, null_value: String) -> Self {
        Self {
            null_value: Some(null_value),
            ..self
        }
    }

    /// Returns the text written for null values, falling back to the default empty string
    /// when none was configured.
    pub fn null(&self) -> &str {
        match &self.null_value {
            Some(text) => text.as_str(),
            None => DEFAULT_NULL_VALUE,
        }
    }

    /// Consumes the builder and creates a [`Writer`] that emits CSV into `writer`.
    ///
    /// The delimiter, quote, double-quote and escape settings configure the underlying
    /// `csv` writer; the remaining settings are moved into the returned [`Writer`].
    pub fn build<W: Write>(self, writer: W) -> Writer<W> {
        let Self {
            delimiter,
            has_header,
            quote,
            escape,
            double_quote,
            date_format,
            datetime_format,
            timestamp_format,
            timestamp_tz_format,
            time_format,
            null_value,
        } = self;

        let mut csv_builder = csv::WriterBuilder::new();
        csv_builder
            .delimiter(delimiter)
            .quote(quote)
            .double_quote(double_quote)
            .escape(escape);

        Writer {
            writer: csv_builder.from_writer(writer),
            // A freshly built writer has not emitted anything yet.
            beginning: true,
            has_headers: has_header,
            date_format,
            datetime_format,
            time_format,
            timestamp_format,
            timestamp_tz_format,
            null_value,
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ReaderBuilder;
use arrow_array::builder::{
BinaryBuilder, Decimal128Builder, Decimal256Builder, FixedSizeBinaryBuilder,
LargeBinaryBuilder,
};
use arrow_array::types::*;
use arrow_buffer::i256;
use core::str;
use std::io::{Cursor, Read, Seek};
use std::sync::Arc;
/// Writes the same seven-column batch twice through a default [`Writer`] and checks that
/// the header appears once, followed by both copies of the rows.
#[test]
fn test_write_csv() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Utf8, false),
        Field::new("c2", DataType::Float64, true),
        Field::new("c3", DataType::UInt32, false),
        Field::new("c4", DataType::Boolean, true),
        Field::new("c5", DataType::Timestamp(TimeUnit::Millisecond, None), true),
        Field::new("c6", DataType::Time32(TimeUnit::Second), false),
        Field::new_dictionary("c7", DataType::Int32, DataType::Utf8, false),
    ]));

    let text = StringArray::from(vec![
        "Lorem ipsum dolor sit amet",
        "consectetur adipiscing elit",
        "sed do eiusmod tempor",
    ]);
    let floats = Float64Array::from(vec![Some(123.564532), None, Some(-556132.25)]);
    let uints = UInt32Array::from(vec![3, 2, 1]);
    let flags = BooleanArray::from(vec![Some(true), Some(false), None]);
    let timestamps =
        TimestampMillisecondArray::from(vec![None, Some(1555584887378), Some(1555555555555)]);
    let times = Time32SecondArray::from(vec![1234, 24680, 85563]);
    let dict: DictionaryArray<Int32Type> =
        vec!["cupcakes", "cupcakes", "foo"].into_iter().collect();

    let columns: Vec<ArrayRef> = vec![
        Arc::new(text),
        Arc::new(floats),
        Arc::new(uints),
        Arc::new(flags),
        Arc::new(timestamps),
        Arc::new(times),
        Arc::new(dict),
    ];
    let batch = RecordBatch::try_new(schema, columns).unwrap();

    let mut file = tempfile::tempfile().unwrap();
    {
        // The writer borrows the file mutably, so it is scoped to release the borrow
        // before the file is read back.
        let mut writer = Writer::new(&mut file);
        for _ in 0..2 {
            writer.write(&batch).unwrap();
        }
    }

    file.rewind().unwrap();
    let mut written = Vec::new();
    file.read_to_end(&mut written).unwrap();

    let expected = r#"c1,c2,c3,c4,c5,c6,c7
Lorem ipsum dolor sit amet,123.564532,3,true,,00:20:34,cupcakes
consectetur adipiscing elit,,2,false,2019-04-18T10:54:47.378,06:51:20,cupcakes
sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555,23:46:03,foo
Lorem ipsum dolor sit amet,123.564532,3,true,,00:20:34,cupcakes
consectetur adipiscing elit,,2,false,2019-04-18T10:54:47.378,06:51:20,cupcakes
sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555,23:46:03,foo
"#;
    assert_eq!(expected, str::from_utf8(&written).unwrap());
}
/// Writes 128-bit and 256-bit decimal columns (scale 6) twice and checks the rendered
/// decimal text, including empty fields for nulls.
#[test]
fn test_write_csv_decimal() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Decimal128(38, 6), true),
        Field::new("c2", DataType::Decimal256(76, 6), true),
    ]));

    let mut narrow = Decimal128Builder::new().with_data_type(DataType::Decimal128(38, 6));
    narrow.extend(vec![Some(-3335724), Some(2179404), None, Some(290472)]);
    let narrow = narrow.finish();

    let mut wide = Decimal256Builder::new().with_data_type(DataType::Decimal256(76, 6));
    wide.extend(vec![
        Some(i256::from_i128(-3335724)),
        Some(i256::from_i128(2179404)),
        None,
        Some(i256::from_i128(290472)),
    ]);
    let wide = wide.finish();

    let batch = RecordBatch::try_new(schema, vec![Arc::new(narrow), Arc::new(wide)]).unwrap();

    let mut file = tempfile::tempfile().unwrap();
    {
        let mut writer = Writer::new(&mut file);
        for _ in 0..2 {
            writer.write(&batch).unwrap();
        }
    }

    file.rewind().unwrap();
    let mut written = Vec::new();
    file.read_to_end(&mut written).unwrap();

    let expected = r#"c1,c2
-3.335724,-3.335724
2.179404,2.179404
,
0.290472,0.290472
-3.335724,-3.335724
2.179404,2.179404
,
0.290472,0.290472
"#;
    assert_eq!(expected, str::from_utf8(&written).unwrap());
}
/// Exercises two non-default builder configurations against the same batch:
/// first a headerless, pipe-delimited, single-quote-quoted output with a custom null
/// text and time format, then an output with quote doubling disabled and `$` as escape.
#[test]
fn test_write_csv_custom_options() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Utf8, false),
        Field::new("c2", DataType::Float64, true),
        Field::new("c3", DataType::UInt32, false),
        Field::new("c4", DataType::Boolean, true),
        Field::new("c6", DataType::Time32(TimeUnit::Second), false),
    ]));

    // The text column contains a line break and double quotes to trigger quoting.
    let text = StringArray::from(vec![
        "Lorem ipsum \ndolor sit amet",
        "consectetur \"adipiscing\" elit",
        "sed do eiusmod tempor",
    ]);
    let floats = Float64Array::from(vec![Some(123.564532), None, Some(-556132.25)]);
    let uints = UInt32Array::from(vec![3, 2, 1]);
    let flags = BooleanArray::from(vec![Some(true), Some(false), None]);
    let times = Time32SecondArray::from(vec![1234, 24680, 85563]);

    let columns: Vec<ArrayRef> = vec![
        Arc::new(text),
        Arc::new(floats),
        Arc::new(uints),
        Arc::new(flags),
        Arc::new(times),
    ];
    let batch = RecordBatch::try_new(schema, columns).unwrap();

    // First configuration: no header, '|' delimiter, '\'' quote, "NULL" for nulls, "%r" times.
    let mut file = tempfile::tempfile().unwrap();
    {
        let mut writer = WriterBuilder::new()
            .with_header(false)
            .with_delimiter(b'|')
            .with_quote(b'\'')
            .with_null("NULL".to_string())
            .with_time_format("%r".to_string())
            .build(&mut file);
        writer.write(&batch).unwrap();
    }
    file.rewind().unwrap();
    let mut written = Vec::new();
    file.read_to_end(&mut written).unwrap();
    assert_eq!(
        "'Lorem ipsum \ndolor sit amet'|123.564532|3|true|12:20:34 AM\nconsectetur \"adipiscing\" elit|NULL|2|false|06:51:20 AM\nsed do eiusmod tempor|-556132.25|1|NULL|11:46:03 PM\n"
            .to_string(),
        String::from_utf8(written).unwrap()
    );

    // Second configuration: header on, quotes escaped with '$' instead of being doubled.
    let mut file = tempfile::tempfile().unwrap();
    {
        let mut writer = WriterBuilder::new()
            .with_header(true)
            .with_double_quote(false)
            .with_escape(b'$')
            .build(&mut file);
        writer.write(&batch).unwrap();
    }
    file.rewind().unwrap();
    let mut written = Vec::new();
    file.read_to_end(&mut written).unwrap();
    assert_eq!(
        "c1,c2,c3,c4,c6\n\"Lorem ipsum \ndolor sit amet\",123.564532,3,true,00:20:34\n\"consectetur $\"adipiscing$\" elit\",,2,false,06:51:20\nsed do eiusmod tempor,-556132.25,1,,23:46:03\n"
            .to_string(),
        String::from_utf8(written).unwrap()
    );
}
/// Round-trips date and nanosecond-timestamp columns through the CSV writer and reader
/// and checks that the values read back equal the values written.
#[test]
fn test_conversion_consistency() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Date32, false),
        Field::new("c2", DataType::Date64, false),
        Field::new("c3", DataType::Timestamp(TimeUnit::Nanosecond, None), false),
    ]));

    let nanoseconds = vec![
        1599566300000000000,
        1599566200000000000,
        1599566100000000000,
    ];

    let days = Date32Array::from(vec![3, 2, 1]);
    let millis = Date64Array::from(vec![3, 2, 1]);
    let stamps = TimestampNanosecondArray::from(nanoseconds.clone());
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(days), Arc::new(millis), Arc::new(stamps)],
    )
    .unwrap();

    // Write without a header so the reader sees data rows only.
    let mut cursor: Cursor<Vec<u8>> = Cursor::new(Vec::new());
    {
        let mut writer = WriterBuilder::new().with_header(false).build(&mut cursor);
        writer.write(&batch).unwrap();
    }
    cursor.set_position(0);

    let mut reader = ReaderBuilder::new(schema)
        .with_batch_size(3)
        .build_buffered(cursor)
        .unwrap();
    let decoded = reader.next().unwrap().unwrap();

    let days = decoded
        .column(0)
        .as_any()
        .downcast_ref::<Date32Array>()
        .unwrap();
    let millis = decoded
        .column(1)
        .as_any()
        .downcast_ref::<Date64Array>()
        .unwrap();
    let stamps = decoded
        .column(2)
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .unwrap();

    let actual = days.iter().collect::<Vec<_>>();
    let expected = vec![Some(3), Some(2), Some(1)];
    assert_eq!(actual, expected);

    let actual = millis.iter().collect::<Vec<_>>();
    let expected = vec![Some(3), Some(2), Some(1)];
    assert_eq!(actual, expected);

    let actual = stamps.iter().collect::<Vec<_>>();
    let expected = nanoseconds.into_iter().map(Some).collect::<Vec<_>>();
    assert_eq!(actual, expected);
}
/// Checks that a `Date64` value which cannot be converted to a temporal value makes
/// `write` fail with an error naming the offending row and column, on every attempt.
#[test]
fn test_write_csv_invalid_cast() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("c0", DataType::UInt32, false),
        Field::new("c1", DataType::Date64, false),
    ]));

    let ids = UInt32Array::from(vec![Some(123), Some(234)]);
    // The second entry is the one the expected error message reports as unconvertible.
    let dates = Date64Array::from(vec![Some(1926632005177), Some(1926632005177685347)]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(ids), Arc::new(dates)]).unwrap();

    let mut file = tempfile::tempfile().unwrap();
    let mut writer = Writer::new(&mut file);
    for _ in 0..2 {
        let message = writer.write(&batch).unwrap_err().to_string();
        assert_eq!(message, "Csv error: Error processing row 2, col 2: Cast error: Failed to convert 1926632005177685347 to temporal for Date64");
    }
    drop(writer);
}
/// Writes timestamp (with and without time zone), date and time columns with the default
/// builder and checks the default textual form of each.
#[test]
fn test_write_csv_using_rfc3339() {
    let schema = Arc::new(Schema::new(vec![
        Field::new(
            "c1",
            DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
            true,
        ),
        Field::new("c2", DataType::Timestamp(TimeUnit::Millisecond, None), true),
        Field::new("c3", DataType::Date32, false),
        Field::new("c4", DataType::Time32(TimeUnit::Second), false),
    ]));

    let zoned = TimestampMillisecondArray::from(vec![Some(1555584887378), Some(1635577147000)])
        .with_timezone("+00:00".to_string());
    let naive = TimestampMillisecondArray::from(vec![Some(1555584887378), Some(1635577147000)]);
    let days = Date32Array::from(vec![3, 2]);
    let times = Time32SecondArray::from(vec![1234, 24680]);

    let columns: Vec<ArrayRef> = vec![
        Arc::new(zoned),
        Arc::new(naive),
        Arc::new(days),
        Arc::new(times),
    ];
    let batch = RecordBatch::try_new(schema, columns).unwrap();

    let mut file = tempfile::tempfile().unwrap();
    {
        let mut writer = WriterBuilder::new().build(&mut file);
        writer.write(&batch).unwrap();
    }

    file.rewind().unwrap();
    let mut written = Vec::new();
    file.read_to_end(&mut written).unwrap();

    assert_eq!(
        "c1,c2,c3,c4
2019-04-18T10:54:47.378Z,2019-04-18T10:54:47.378,1970-01-04,00:20:34
2021-10-30T06:59:07Z,2021-10-30T06:59:07,1970-01-03,06:51:20\n",
        String::from_utf8(written).unwrap()
    );
}
/// Checks that a custom time-zone-aware timestamp format (`%M:%H`) is applied to
/// timestamp columns that carry a time zone, and that nulls stay empty.
#[test]
fn test_write_csv_tz_format() {
    let schema = Arc::new(Schema::new(vec![
        Field::new(
            "c1",
            DataType::Timestamp(TimeUnit::Millisecond, Some("+02:00".into())),
            true,
        ),
        Field::new(
            "c2",
            DataType::Timestamp(TimeUnit::Second, Some("+04:00".into())),
            true,
        ),
    ]));

    let millis = TimestampMillisecondArray::from(vec![Some(1_000), Some(2_000)])
        .with_timezone("+02:00".to_string());
    let seconds = TimestampSecondArray::from(vec![Some(1_000_000), None])
        .with_timezone("+04:00".to_string());
    let batch = RecordBatch::try_new(schema, vec![Arc::new(millis), Arc::new(seconds)]).unwrap();

    let mut file = tempfile::tempfile().unwrap();
    {
        let mut writer = WriterBuilder::new()
            .with_timestamp_tz_format("%M:%H".to_string())
            .build(&mut file);
        writer.write(&batch).unwrap();
    }

    file.rewind().unwrap();
    let mut written = Vec::new();
    file.read_to_end(&mut written).unwrap();

    assert_eq!(
        "c1,c2\n00:02,46:17\n00:02,\n",
        String::from_utf8(written).unwrap()
    );
}
/// Writes variable-size, fixed-size and large binary columns and checks that values are
/// rendered as lowercase hexadecimal text, with empty fields for nulls.
#[test]
fn test_write_csv_binary() {
    let fixed_size = 8;
    let schema = SchemaRef::new(Schema::new(vec![
        Field::new("c1", DataType::Binary, true),
        Field::new("c2", DataType::FixedSizeBinary(fixed_size), true),
        Field::new("c3", DataType::LargeBinary, true),
    ]));

    let mut first_names = BinaryBuilder::new();
    first_names.append_value(b"Homer");
    first_names.append_value(b"Bart");
    first_names.append_null();
    first_names.append_value(b"Ned");

    // Every non-null entry here is exactly `fixed_size` bytes long.
    let mut last_names = FixedSizeBinaryBuilder::new(fixed_size);
    last_names.append_value(b"Simpson ").unwrap();
    last_names.append_value(b"Simpson ").unwrap();
    last_names.append_null();
    last_names.append_value(b"Flanders").unwrap();

    let mut extras = LargeBinaryBuilder::new();
    extras.append_null();
    extras.append_null();
    extras.append_value(b"Comic Book Guy");
    extras.append_null();

    let columns = vec![
        Arc::new(first_names.finish()) as ArrayRef,
        Arc::new(last_names.finish()) as ArrayRef,
        Arc::new(extras.finish()) as ArrayRef,
    ];
    let batch = RecordBatch::try_new(schema, columns).unwrap();

    let mut buf = Vec::new();
    {
        let mut writer = WriterBuilder::new().build(&mut buf);
        writer.write(&batch).unwrap();
    }

    assert_eq!(
        "\
        c1,c2,c3\n\
        486f6d6572,53696d70736f6e20,\n\
        42617274,53696d70736f6e20,\n\
        ,,436f6d696320426f6f6b20477579\n\
        4e6564,466c616e64657273,\n\
        ",
        String::from_utf8(buf).unwrap()
    );
}
}