// parquet/arrow/async_writer/mod.rs

#[cfg(feature = "object_store")]
mod store;
#[cfg(feature = "object_store")]
pub use store::*;

use crate::{
    arrow::arrow_writer::ArrowWriterOptions,
    arrow::ArrowWriter,
    errors::{ParquetError, Result},
    file::{metadata::RowGroupMetaData, properties::WriterProperties},
    format::{FileMetaData, KeyValue},
};
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
use std::mem;
use tokio::io::{AsyncWrite, AsyncWriteExt};

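/// An asynchronous destination for encoded Parquet bytes.
///
/// `write` hands a buffer of bytes to the sink, and `complete` flushes and
/// finalizes it once the file footer has been written.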
pub trait AsyncFileWriter: Send {
    /// Write the provided bytes to the underlying sink.
    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>>;

    /// Flush any buffered data and finalize the sink; no further writes
    /// should follow.
    fn complete(&mut self) -> BoxFuture<'_, Result<()>>;
}

impl AsyncFileWriter for Box<dyn AsyncFileWriter + '_> {
    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
        self.as_mut().write(bs)
    }

    fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
        self.as_mut().complete()
    }
}

impl<T: AsyncWrite + Unpin + Send> AsyncFileWriter for T {
    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
        async move {
            self.write_all(&bs).await?;
            Ok(())
        }
        .boxed()
    }

    fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
        async move {
            self.flush().await?;
            self.shutdown().await?;
            Ok(())
        }
        .boxed()
    }
}

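/// Async wrapper around [`ArrowWriter`]: record batches are encoded into an
/// in-memory buffer by the synchronous writer, and each completed row group is
/// forwarded to the wrapped [`AsyncFileWriter`].
///
/// A minimal usage sketch, assuming a tokio runtime; the output path and the
/// `batch` variable are illustrative only:
///
/// ```ignore
/// let file = tokio::fs::File::create("data.parquet").await?;
/// let mut writer = AsyncArrowWriter::try_new(file, batch.schema(), None)?;
/// writer.write(&batch).await?;
/// writer.close().await?;
/// ```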
pub struct AsyncArrowWriter<W> {
    /// Synchronous writer that encodes batches into an in-memory `Vec<u8>`
    sync_writer: ArrowWriter<Vec<u8>>,

    /// Async destination the buffered bytes are flushed to
    async_writer: W,
}

impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
    /// Try to create a new writer from the given [`WriterProperties`]
    /// (defaults are used if `props` is `None`).
    pub fn try_new(
        writer: W,
        arrow_schema: SchemaRef,
        props: Option<WriterProperties>,
    ) -> Result<Self> {
        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
        Self::try_new_with_options(writer, arrow_schema, options)
    }

    /// Try to create a new writer with the given [`ArrowWriterOptions`].
    pub fn try_new_with_options(
        writer: W,
        arrow_schema: SchemaRef,
        options: ArrowWriterOptions,
    ) -> Result<Self> {
        let sync_writer = ArrowWriter::try_new_with_options(Vec::new(), arrow_schema, options)?;

        Ok(Self {
            sync_writer,
            async_writer: writer,
        })
    }

    /// Returns metadata of the row groups flushed so far.
    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        self.sync_writer.flushed_row_groups()
    }

    /// Estimated memory currently held by this writer.
    pub fn memory_size(&self) -> usize {
        self.sync_writer.memory_size()
    }

    /// Estimated encoded size of the in-progress row group.
    pub fn in_progress_size(&self) -> usize {
        self.sync_writer.in_progress_size()
    }

    /// Number of rows buffered in the in-progress row group.
    pub fn in_progress_rows(&self) -> usize {
        self.sync_writer.in_progress_rows()
    }

    /// Number of bytes produced by the underlying writer so far.
    pub fn bytes_written(&self) -> usize {
        self.sync_writer.bytes_written()
    }

    /// Enqueue `batch` for writing, flushing buffered bytes to the async
    /// writer whenever the synchronous writer completes a row group.
    pub async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        let before = self.sync_writer.flushed_row_groups().len();
        self.sync_writer.write(batch)?;
        if before != self.sync_writer.flushed_row_groups().len() {
            self.do_write().await?;
        }
        Ok(())
    }

    /// Flush the in-progress row group and forward it to the async writer.
    pub async fn flush(&mut self) -> Result<()> {
        self.sync_writer.flush()?;
        self.do_write().await?;

        Ok(())
    }

    /// Append key-value metadata to the file footer.
    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
        self.sync_writer.append_key_value_metadata(kv_metadata);
    }

    /// Finish the file: encode the footer, flush the remaining bytes, and
    /// complete the async writer, returning the resulting [`FileMetaData`].
    pub async fn finish(&mut self) -> Result<FileMetaData> {
        let metadata = self.sync_writer.finish()?;

        self.do_write().await?;
        self.async_writer.complete().await?;

        Ok(metadata)
    }

    /// Consume the writer and finish the file (equivalent to [`Self::finish`]).
    pub async fn close(mut self) -> Result<FileMetaData> {
        self.finish().await
    }

    /// Move the accumulated bytes out of the synchronous writer's buffer and
    /// hand them to the async writer.
    async fn do_write(&mut self) -> Result<()> {
        let buffer = mem::take(self.sync_writer.inner_mut());

        self.async_writer
            .write(Bytes::from(buffer))
            .await
            .map_err(|e| ParquetError::External(Box::new(e)))?;

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow_array::{ArrayRef, BinaryArray, Int32Array, Int64Array, RecordBatchReader};
    use bytes::Bytes;
    use std::sync::Arc;
    use tokio::pin;

    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};

    use super::*;

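    // Builds a reader over a small file from the Arrow test data set, used as
    // input for the round-trip tests below.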
    fn get_test_reader() -> ParquetRecordBatchReader {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
        let original_data = Bytes::from(std::fs::read(path).unwrap());
        ParquetRecordBatchReaderBuilder::try_new(original_data)
            .unwrap()
            .build()
            .unwrap()
    }

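    // Round-trips a single batch through an in-memory buffer and reads it back.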
    #[tokio::test]
    async fn test_async_writer() {
        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();

        let mut buffer = Vec::new();
        let mut writer = AsyncArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
        writer.write(&to_write).await.unwrap();
        writer.close().await.unwrap();

        let buffer = Bytes::from(buffer);
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
            .unwrap()
            .build()
            .unwrap();
        let read = reader.next().unwrap().unwrap();

        assert_eq!(to_write, read);
    }

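    // The async writer should produce output byte-identical to the synchronous
    // ArrowWriter when given the same input and properties.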
    #[tokio::test]
    async fn test_async_writer_with_sync_writer() {
        let reader = get_test_reader();

        let write_props = WriterProperties::builder()
            .set_max_row_group_size(64)
            .build();

        let mut async_buffer = Vec::new();
        let mut async_writer = AsyncArrowWriter::try_new(
            &mut async_buffer,
            reader.schema(),
            Some(write_props.clone()),
        )
        .unwrap();

        let mut sync_buffer = Vec::new();
        let mut sync_writer =
            ArrowWriter::try_new(&mut sync_buffer, reader.schema(), Some(write_props)).unwrap();
        for record_batch in reader {
            let record_batch = record_batch.unwrap();
            async_writer.write(&record_batch).await.unwrap();
            sync_writer.write(&record_batch).unwrap();
        }
        sync_writer.close().unwrap();
        async_writer.close().await.unwrap();

        assert_eq!(sync_buffer, async_buffer);
    }

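    // AsyncWrite sink that asserts every write (except possibly the last) is at
    // least `min_accept_bytes` long, i.e. data arrives in reasonably large chunks.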
    struct TestAsyncSink {
        sink: Vec<u8>,
        min_accept_bytes: usize,
        expect_total_bytes: usize,
    }

    impl AsyncWrite for TestAsyncSink {
        fn poll_write(
            self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
            buf: &[u8],
        ) -> std::task::Poll<std::result::Result<usize, std::io::Error>> {
            let written_bytes = self.sink.len();
            if written_bytes + buf.len() < self.expect_total_bytes {
                assert!(buf.len() >= self.min_accept_bytes);
            } else {
                assert_eq!(written_bytes + buf.len(), self.expect_total_bytes);
            }

            let sink = &mut self.get_mut().sink;
            pin!(sink);
            sink.poll_write(cx, buf)
        }

        fn poll_flush(
            self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
            let sink = &mut self.get_mut().sink;
            pin!(sink);
            sink.poll_flush(cx)
        }

        fn poll_shutdown(
            self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
            let sink = &mut self.get_mut().sink;
            pin!(sink);
            sink.poll_shutdown(cx)
        }
    }

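    // `bytes_written` should report the same size as the file on disk once the
    // writer has finished.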
    #[tokio::test]
    async fn test_async_writer_bytes_written() {
        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();

        let temp = tempfile::tempfile().unwrap();

        let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
        let mut writer =
            AsyncArrowWriter::try_new(file.try_clone().await.unwrap(), to_write.schema(), None)
                .unwrap();
        writer.write(&to_write).await.unwrap();
        let _metadata = writer.finish().await.unwrap();
        let reported = writer.bytes_written();

        let actual = file.metadata().await.unwrap().len() as usize;

        assert_eq!(reported, actual);
    }

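    // Round-trips a batch with large binary values through a real temporary file.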
    #[tokio::test]
    async fn test_async_writer_file() {
        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let col2 = Arc::new(BinaryArray::from_iter_values(vec![
            vec![0; 500000],
            vec![0; 500000],
            vec![0; 500000],
        ])) as ArrayRef;
        let to_write = RecordBatch::try_from_iter([("col", col), ("col2", col2)]).unwrap();

        let temp = tempfile::tempfile().unwrap();

        let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
        let mut writer = AsyncArrowWriter::try_new(file, to_write.schema(), None).unwrap();
        writer.write(&to_write).await.unwrap();
        writer.close().await.unwrap();

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(temp)
            .unwrap()
            .build()
            .unwrap();
        let read = reader.next().unwrap().unwrap();

        assert_eq!(to_write, read);
    }

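    // Exercises the size-accounting APIs (in_progress_size, in_progress_rows,
    // memory_size, bytes_written) as data is buffered and flushed.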
    #[tokio::test]
    async fn in_progress_accounting() {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

        let a = Int32Array::from_value(0_i32, 512);

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        let temp = tempfile::tempfile().unwrap();
        let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
        let mut writer = AsyncArrowWriter::try_new(file, batch.schema(), None).unwrap();

        assert_eq!(writer.in_progress_size(), 0);
        assert_eq!(writer.in_progress_rows(), 0);
        assert_eq!(writer.bytes_written(), 4); // only the 4-byte "PAR1" magic has been written so far

        writer.write(&batch).await.unwrap();

        let initial_size = writer.in_progress_size();
        assert!(initial_size > 0);
        assert_eq!(writer.in_progress_rows(), batch.num_rows());
        let initial_memory = writer.memory_size();
        assert!(
            initial_size <= initial_memory,
            "{initial_size} <= {initial_memory}"
        );

        writer.write(&batch).await.unwrap();
        assert!(writer.in_progress_size() > initial_size);
        assert_eq!(writer.in_progress_rows(), batch.num_rows() * 2);
        assert!(writer.memory_size() > initial_memory);
        assert!(
            writer.in_progress_size() <= writer.memory_size(),
            "in_progress_size {} <= memory_size {}",
            writer.in_progress_size(),
            writer.memory_size()
        );

        let pre_flush_bytes_written = writer.bytes_written();
        writer.flush().await.unwrap();
        assert_eq!(writer.in_progress_size(), 0);
        assert_eq!(writer.memory_size(), 0);
        assert_eq!(writer.in_progress_rows(), 0);
        assert!(writer.bytes_written() > pre_flush_bytes_written);

        writer.close().await.unwrap();
    }
}