parquet/arrow/async_writer/mod.rs

#[cfg(feature = "object_store")]
mod store;
#[cfg(feature = "object_store")]
pub use store::*;

use crate::{
    arrow::arrow_writer::{ArrowColumnChunk, ArrowColumnWriter, ArrowWriterOptions},
    arrow::ArrowWriter,
    errors::{ParquetError, Result},
    file::{metadata::RowGroupMetaData, properties::WriterProperties},
    format::{FileMetaData, KeyValue},
};
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
use std::mem;
use tokio::io::{AsyncWrite, AsyncWriteExt};

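/// The asynchronous interface used by [`AsyncArrowWriter`] to write parquet files.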
pub trait AsyncFileWriter: Send {
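    /// Write the provided bytes to the underlying writer.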
    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>>;

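    /// Flush any buffered data and finish the writer; no further writes should
    /// be issued after this returns.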
    fn complete(&mut self) -> BoxFuture<'_, Result<()>>;
}

impl AsyncFileWriter for Box<dyn AsyncFileWriter + '_> {
    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
        self.as_mut().write(bs)
    }

    fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
        self.as_mut().complete()
    }
}

impl<T: AsyncWrite + Unpin + Send> AsyncFileWriter for T {
    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
        async move {
            self.write_all(&bs).await?;
            Ok(())
        }
        .boxed()
    }

    fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
        async move {
            self.flush().await?;
            self.shutdown().await?;
            Ok(())
        }
        .boxed()
    }
}

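/// Encapsulates an [`ArrowWriter`] for asynchronous writing.
///
/// Record batches are encoded by an in-memory [`ArrowWriter`], and the
/// buffered bytes are shipped to the underlying [`AsyncFileWriter`] each time
/// a row group completes.
///
/// ## Example
///
/// A minimal usage sketch (assumes the crate's `async` feature and a `tokio`
/// runtime with the `fs` feature; the file name is illustrative):
///
/// ```no_run
/// # async fn example() {
/// use std::sync::Arc;
/// use arrow_array::{ArrayRef, Int64Array, RecordBatch};
/// use parquet::arrow::AsyncArrowWriter;
///
/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
/// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
///
/// // Any `tokio::io::AsyncWrite` works as a sink via the blanket impl above.
/// let file = tokio::fs::File::create("example.parquet").await.unwrap();
/// let mut writer = AsyncArrowWriter::try_new(file, batch.schema(), None).unwrap();
/// writer.write(&batch).await.unwrap();
/// writer.close().await.unwrap();
/// # }
/// ```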
pub struct AsyncArrowWriter<W> {
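    /// The inner sync [`ArrowWriter`], encoding row groups into an in-memory buffer.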
    sync_writer: ArrowWriter<Vec<u8>>,

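    /// The async writer that buffered bytes are flushed to.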
    async_writer: W,
}

impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
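    /// Try to create a new async arrow writer, falling back to the default
    /// [`WriterProperties`] if `props` is `None`.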
    pub fn try_new(
        writer: W,
        arrow_schema: SchemaRef,
        props: Option<WriterProperties>,
    ) -> Result<Self> {
        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
        Self::try_new_with_options(writer, arrow_schema, options)
    }

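    /// Try to create a new async arrow writer with the provided [`ArrowWriterOptions`].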
    pub fn try_new_with_options(
        writer: W,
        arrow_schema: SchemaRef,
        options: ArrowWriterOptions,
    ) -> Result<Self> {
        let sync_writer = ArrowWriter::try_new_with_options(Vec::new(), arrow_schema, options)?;

        Ok(Self {
            sync_writer,
            async_writer: writer,
        })
    }

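    /// Returns the [`RowGroupMetaData`] for all row groups written so far.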
    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        self.sync_writer.flushed_row_groups()
    }

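    /// Returns the estimated total memory usage of this writer, in bytes.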
    pub fn memory_size(&self) -> usize {
        self.sync_writer.memory_size()
    }

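    /// Returns the anticipated encoded size of the in-progress row group.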
    pub fn in_progress_size(&self) -> usize {
        self.sync_writer.in_progress_size()
    }

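    /// Returns the number of rows buffered in the in-progress row group.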
    pub fn in_progress_rows(&self) -> usize {
        self.sync_writer.in_progress_rows()
    }

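    /// Returns the number of bytes written by this writer so far.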
    pub fn bytes_written(&self) -> usize {
        self.sync_writer.bytes_written()
    }

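    /// Enqueues the provided [`RecordBatch`] for encoding.
    ///
    /// If writing the batch completes a row group, the buffered bytes are
    /// flushed to the underlying async writer.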
    pub async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        let before = self.sync_writer.flushed_row_groups().len();
        self.sync_writer.write(batch)?;
        if before != self.sync_writer.flushed_row_groups().len() {
            self.do_write().await?;
        }
        Ok(())
    }

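    /// Flushes all buffered rows into a new row group and writes the buffered
    /// bytes to the underlying async writer.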
    pub async fn flush(&mut self) -> Result<()> {
        self.sync_writer.flush()?;
        self.do_write().await?;

        Ok(())
    }

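    /// Appends [`KeyValue`] metadata to be written when the file footer is flushed.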
    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
        self.sync_writer.append_key_value_metadata(kv_metadata);
    }

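    /// Flushes any remaining data, writes the file footer, and completes the
    /// underlying async writer, returning the resulting [`FileMetaData`].
    ///
    /// Unlike [`Self::close`], this does not consume `self`, so metrics such
    /// as [`Self::bytes_written`] can still be inspected afterwards.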
    pub async fn finish(&mut self) -> Result<FileMetaData> {
        let metadata = self.sync_writer.finish()?;

        self.do_write().await?;
        self.async_writer.complete().await?;

        Ok(metadata)
    }

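    /// Closes the writer; equivalent to [`Self::finish`] except that it
    /// consumes `self`.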
    pub async fn close(mut self) -> Result<FileMetaData> {
        self.finish().await
    }

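    /// Consumes the writer and returns the underlying async writer without
    /// completing it; any bytes still buffered in the internal sync writer are
    /// discarded.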
    pub fn into_inner(self) -> W {
        self.async_writer
    }

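    /// Flushes the bytes buffered by the sync writer into the async writer.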
    async fn do_write(&mut self) -> Result<()> {
        let buffer = mem::take(self.sync_writer.inner_mut());

        self.async_writer
            .write(Bytes::from(buffer))
            .await
            .map_err(|e| ParquetError::External(Box::new(e)))?;

        Ok(())
    }

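    /// Returns the [`ArrowColumnWriter`]s for encoding the columns of the next
    /// row group directly, e.g. from multiple tasks in parallel. If obtaining
    /// the writers flushed a buffered row group, its bytes are written to the
    /// async writer first.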
    pub async fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
        let before = self.sync_writer.flushed_row_groups().len();
        let writers = self.sync_writer.get_column_writers()?;
        if before != self.sync_writer.flushed_row_groups().len() {
            self.do_write().await?;
        }
        Ok(writers)
    }

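    /// Appends the provided [`ArrowColumnChunk`]s as a new row group and
    /// flushes the encoded bytes to the async writer; the chunks are typically
    /// obtained by closing the writers from [`Self::get_column_writers`].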
    pub async fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
        self.sync_writer.append_row_group(chunks)?;
        self.do_write().await
    }
}

#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow_array::{ArrayRef, BinaryArray, Int32Array, Int64Array, RecordBatchReader};
    use bytes::Bytes;
    use std::sync::Arc;

    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
    use crate::arrow::arrow_writer::compute_leaves;

    use super::*;

    fn get_test_reader() -> ParquetRecordBatchReader {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let original_data = Bytes::from(std::fs::read(path).unwrap());
        ParquetRecordBatchReaderBuilder::try_new(original_data)
            .unwrap()
            .build()
            .unwrap()
    }

    #[tokio::test]
    async fn test_async_writer() {
        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();

        let mut buffer = Vec::new();
        let mut writer = AsyncArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
        writer.write(&to_write).await.unwrap();
        writer.close().await.unwrap();

        let buffer = Bytes::from(buffer);
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
            .unwrap()
            .build()
            .unwrap();
        let read = reader.next().unwrap().unwrap();

        assert_eq!(to_write, read);
    }

    #[tokio::test]
    async fn test_async_arrow_group_writer() {
        let col = Arc::new(Int64Array::from_iter_values([4, 5, 6])) as ArrayRef;
        let to_write_record = RecordBatch::try_from_iter([("col", col)]).unwrap();

        let mut buffer = Vec::new();
        let mut writer =
            AsyncArrowWriter::try_new(&mut buffer, to_write_record.schema(), None).unwrap();

        writer.write(&to_write_record).await.unwrap();

        let mut writers = writer.get_column_writers().await.unwrap();
        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let to_write_arrow_group = RecordBatch::try_from_iter([("col", col)]).unwrap();

        for (field, column) in to_write_arrow_group
            .schema()
            .fields()
            .iter()
            .zip(to_write_arrow_group.columns())
        {
            for leaf in compute_leaves(field.as_ref(), column).unwrap() {
                writers[0].write(&leaf).unwrap();
            }
        }

        let columns: Vec<_> = writers.into_iter().map(|w| w.close().unwrap()).collect();
        writer.append_row_group(columns).await.unwrap();
        writer.close().await.unwrap();

        let buffer = Bytes::from(buffer);
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
            .unwrap()
            .build()
            .unwrap();

        let col = Arc::new(Int64Array::from_iter_values([4, 5, 6, 1, 2, 3])) as ArrayRef;
        let expected = RecordBatch::try_from_iter([("col", col)]).unwrap();

        let read = reader.next().unwrap().unwrap();
        assert_eq!(expected, read);
    }

    #[tokio::test]
    async fn test_async_writer_with_sync_writer() {
        let reader = get_test_reader();

        let write_props = WriterProperties::builder()
            .set_max_row_group_size(64)
            .build();

        let mut async_buffer = Vec::new();
        let mut async_writer = AsyncArrowWriter::try_new(
            &mut async_buffer,
            reader.schema(),
            Some(write_props.clone()),
        )
        .unwrap();

        let mut sync_buffer = Vec::new();
        let mut sync_writer =
            ArrowWriter::try_new(&mut sync_buffer, reader.schema(), Some(write_props)).unwrap();
        for record_batch in reader {
            let record_batch = record_batch.unwrap();
            async_writer.write(&record_batch).await.unwrap();
            sync_writer.write(&record_batch).unwrap();
        }
        sync_writer.close().unwrap();
        async_writer.close().await.unwrap();

        assert_eq!(sync_buffer, async_buffer);
    }

    #[tokio::test]
    async fn test_async_writer_bytes_written() {
        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();

        let temp = tempfile::tempfile().unwrap();

        let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
        let mut writer =
            AsyncArrowWriter::try_new(file.try_clone().await.unwrap(), to_write.schema(), None)
                .unwrap();
        writer.write(&to_write).await.unwrap();
        let _metadata = writer.finish().await.unwrap();
        let reported = writer.bytes_written();

        let actual = file.metadata().await.unwrap().len() as usize;

        assert_eq!(reported, actual);
    }

    #[tokio::test]
    async fn test_async_writer_file() {
        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let col2 = Arc::new(BinaryArray::from_iter_values(vec![
            vec![0; 500000],
            vec![0; 500000],
            vec![0; 500000],
        ])) as ArrayRef;
        let to_write = RecordBatch::try_from_iter([("col", col), ("col2", col2)]).unwrap();

        let temp = tempfile::tempfile().unwrap();

        let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
        let mut writer = AsyncArrowWriter::try_new(file, to_write.schema(), None).unwrap();
        writer.write(&to_write).await.unwrap();
        writer.close().await.unwrap();

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(temp)
            .unwrap()
            .build()
            .unwrap();
        let read = reader.next().unwrap().unwrap();

        assert_eq!(to_write, read);
    }

    #[tokio::test]
    async fn in_progress_accounting() {
        // define schema
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

        // create some data
        let a = Int32Array::from_value(0_i32, 512);

        // build a record batch
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        let temp = tempfile::tempfile().unwrap();
        let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
        let mut writer = AsyncArrowWriter::try_new(file, batch.schema(), None).unwrap();

        assert_eq!(writer.in_progress_size(), 0);
        assert_eq!(writer.in_progress_rows(), 0);
        assert_eq!(writer.bytes_written(), 4); // the initial 4-byte "PAR1" magic
        writer.write(&batch).await.unwrap();

        let initial_size = writer.in_progress_size();
        assert!(initial_size > 0);
        assert_eq!(writer.in_progress_rows(), batch.num_rows());
        let initial_memory = writer.memory_size();
        assert!(
            initial_size <= initial_memory,
            "{initial_size} <= {initial_memory}"
        );

        writer.write(&batch).await.unwrap();
        assert!(writer.in_progress_size() > initial_size);
        assert_eq!(writer.in_progress_rows(), batch.num_rows() * 2);
        assert!(writer.memory_size() > initial_memory);
        assert!(
            writer.in_progress_size() <= writer.memory_size(),
            "in_progress_size {} <= memory_size {}",
            writer.in_progress_size(),
            writer.memory_size()
        );

        let pre_flush_bytes_written = writer.bytes_written();
        writer.flush().await.unwrap();
        assert_eq!(writer.in_progress_size(), 0);
        assert_eq!(writer.memory_size(), 0);
        assert_eq!(writer.in_progress_rows(), 0);
        assert!(writer.bytes_written() > pre_flush_bytes_written);

        writer.close().await.unwrap();
    }
}