parquet/arrow/async_writer/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Contains an async writer that writes Arrow data into Parquet files.
//!
//! Provides an `async` API for writing [`RecordBatch`]es as parquet files. The API is
//! similar to the [`sync` API](crate::arrow::arrow_writer::ArrowWriter), so please
//! read the documentation there before using this API.
//!
//! Here is an example of using [`AsyncArrowWriter`]:
//!
//! ```
//! # #[tokio::main(flavor="current_thread")]
//! # async fn main() {
//! #
//! # use std::sync::Arc;
//! # use arrow_array::{ArrayRef, Int64Array, RecordBatch, RecordBatchReader};
//! # use bytes::Bytes;
//! # use parquet::arrow::{AsyncArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder};
//! #
//! let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
//! let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
//!
//! let mut buffer = Vec::new();
//! let mut writer = AsyncArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
//! writer.write(&to_write).await.unwrap();
//! writer.close().await.unwrap();
//!
//! let buffer = Bytes::from(buffer);
//! let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer.clone())
//!     .unwrap()
//!     .build()
//!     .unwrap();
//! let read = reader.next().unwrap().unwrap();
//!
//! assert_eq!(to_write, read);
//! # }
//! ```
//!
//! [`object_store`] provides its native implementation of [`AsyncFileWriter`] via [`ParquetObjectWriter`].
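//!
//! For example, a minimal sketch of writing to object storage through
//! [`ParquetObjectWriter`] (assuming the `object_store` feature is enabled; the
//! in-memory store below stands in for any `ObjectStore` implementation):
//!
//! ```ignore
//! # use std::sync::Arc;
//! # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
//! # use object_store::memory::InMemory;
//! # use object_store::path::Path;
//! # use parquet::arrow::AsyncArrowWriter;
//! # use parquet::arrow::async_writer::ParquetObjectWriter;
//! # async fn example() {
//! let store = Arc::new(InMemory::new());
//! // Bytes are uploaded to the object store as the writer flushes
//! let object_writer = ParquetObjectWriter::new(store, Path::from("example.parquet"));
//!
//! let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
//! let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
//!
//! let mut writer = AsyncArrowWriter::try_new(object_writer, batch.schema(), None).unwrap();
//! writer.write(&batch).await.unwrap();
//! writer.close().await.unwrap();
//! # }
//! ```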

#[cfg(feature = "object_store")]
mod store;
#[cfg(feature = "object_store")]
pub use store::*;

use crate::{
    arrow::arrow_writer::ArrowWriterOptions,
    arrow::ArrowWriter,
    errors::{ParquetError, Result},
    file::{metadata::RowGroupMetaData, properties::WriterProperties},
    format::{FileMetaData, KeyValue},
};
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
use std::mem;
use tokio::io::{AsyncWrite, AsyncWriteExt};

/// The asynchronous interface used by [`AsyncArrowWriter`] to write parquet files.
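///
/// # Example
///
/// A minimal sketch of a custom implementation that buffers bytes in memory
/// (`InMemoryWriter` is illustrative, not part of this crate):
///
/// ```
/// # use bytes::Bytes;
/// # use futures::future::BoxFuture;
/// # use futures::FutureExt;
/// # use parquet::arrow::async_writer::AsyncFileWriter;
/// # use parquet::errors::Result;
/// struct InMemoryWriter {
///     buffer: Vec<u8>,
/// }
///
/// impl AsyncFileWriter for InMemoryWriter {
///     fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
///         async move {
///             // Buffer everything; a real implementation might perform I/O here
///             self.buffer.extend_from_slice(&bs);
///             Ok(())
///         }
///         .boxed()
///     }
///
///     fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
///         // Nothing is buffered externally, so completing is a no-op
///         async move { Ok(()) }.boxed()
///     }
/// }
/// ```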
pub trait AsyncFileWriter: Send {
    /// Write the provided bytes to the underlying writer
    ///
    /// The underlying writer CAN decide to buffer the data or write it immediately.
    /// This design allows the writer implementer to control the buffering and I/O scheduling.
    ///
    /// The underlying writer MAY implement retry logic to avoid breaking the user's write process.
    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>>;

    /// Flush any buffered data to the underlying writer and finish the writing process.
    ///
    /// After `complete` returns `Ok(())`, the caller SHOULD NOT call `write` again.
    fn complete(&mut self) -> BoxFuture<'_, Result<()>>;
}

impl AsyncFileWriter for Box<dyn AsyncFileWriter + '_> {
    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
        self.as_mut().write(bs)
    }

    fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
        self.as_mut().complete()
    }
}

impl<T: AsyncWrite + Unpin + Send> AsyncFileWriter for T {
    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
        async move {
            self.write_all(&bs).await?;
            Ok(())
        }
        .boxed()
    }

    fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
        async move {
            self.flush().await?;
            self.shutdown().await?;
            Ok(())
        }
        .boxed()
    }
}

/// Encodes [`RecordBatch`] to parquet, outputting to an [`AsyncFileWriter`]
///
/// ## Memory Usage
///
/// This writer eagerly writes data as soon as possible to the underlying [`AsyncFileWriter`],
/// permitting fine-grained control over buffering and I/O scheduling. However, the columnar
/// nature of parquet forces data for an entire row group to be buffered in memory, before
/// it can be flushed. Depending on the data and the configured row group size, this buffering
/// may be substantial.
///
/// Memory usage can be limited by calling [`Self::flush`] to flush the in progress row group,
/// although this will likely increase overall file size and reduce query performance.
/// See [ArrowWriter] for more information.
///
/// ```no_run
/// # use tokio::fs::File;
/// # use arrow_array::RecordBatch;
/// # use parquet::arrow::AsyncArrowWriter;
/// # async fn test() {
/// let mut writer: AsyncArrowWriter<File> = todo!();
/// let batch: RecordBatch = todo!();
/// writer.write(&batch).await.unwrap();
/// // Trigger an early flush if buffered size exceeds 1_000_000
/// if writer.in_progress_size() > 1_000_000 {
///     writer.flush().await.unwrap()
/// }
/// # }
/// ```
pub struct AsyncArrowWriter<W> {
    /// Underlying sync writer
    sync_writer: ArrowWriter<Vec<u8>>,

    /// Async writer provided by caller
    async_writer: W,
}

impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
    /// Try to create a new Async Arrow Writer
    pub fn try_new(
        writer: W,
        arrow_schema: SchemaRef,
        props: Option<WriterProperties>,
    ) -> Result<Self> {
        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
        Self::try_new_with_options(writer, arrow_schema, options)
    }

    /// Try to create a new Async Arrow Writer with [`ArrowWriterOptions`]
    pub fn try_new_with_options(
        writer: W,
        arrow_schema: SchemaRef,
        options: ArrowWriterOptions,
    ) -> Result<Self> {
        let sync_writer = ArrowWriter::try_new_with_options(Vec::new(), arrow_schema, options)?;

        Ok(Self {
            sync_writer,
            async_writer: writer,
        })
    }

    /// Returns metadata for any flushed row groups
    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        self.sync_writer.flushed_row_groups()
    }

    /// Estimated memory usage, in bytes, of this `AsyncArrowWriter`
    ///
    /// See [ArrowWriter::memory_size] for more information.
    pub fn memory_size(&self) -> usize {
        self.sync_writer.memory_size()
    }

    /// Anticipated encoded size of the in progress row group.
    ///
    /// See [ArrowWriter::in_progress_size] for more information.
    pub fn in_progress_size(&self) -> usize {
        self.sync_writer.in_progress_size()
    }

    /// Returns the number of rows buffered in the in progress row group
    pub fn in_progress_rows(&self) -> usize {
        self.sync_writer.in_progress_rows()
    }

    /// Returns the number of bytes written by this instance
    pub fn bytes_written(&self) -> usize {
        self.sync_writer.bytes_written()
    }

    /// Enqueues the provided `RecordBatch` to be written
    ///
    /// If the write causes the inner [ArrowWriter] to flush a row group, the newly
    /// flushed bytes are written to the underlying [`AsyncFileWriter`]
    pub async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        let before = self.sync_writer.flushed_row_groups().len();
        self.sync_writer.write(batch)?;
        if before != self.sync_writer.flushed_row_groups().len() {
            self.do_write().await?;
        }
        Ok(())
    }

    /// Flushes all buffered rows into a new row group
    pub async fn flush(&mut self) -> Result<()> {
        self.sync_writer.flush()?;
        self.do_write().await?;

        Ok(())
    }

    /// Append [`KeyValue`] metadata in addition to those in [`WriterProperties`]
    ///
    /// This method allows appending metadata after [`RecordBatch`]es have been written.
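    ///
    /// A brief sketch of appending custom footer metadata (the key and value here
    /// are illustrative):
    ///
    /// ```
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
    /// # use parquet::arrow::AsyncArrowWriter;
    /// # use parquet::format::KeyValue;
    /// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
    /// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
    /// let mut buffer = Vec::new();
    /// let mut writer = AsyncArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
    /// writer.write(&batch).await.unwrap();
    /// // Appended after the batch was written; stored in the file footer on close
    /// writer.append_key_value_metadata(KeyValue::new("source".to_string(), "example".to_string()));
    /// writer.close().await.unwrap();
    /// # }
    /// ```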
    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
        self.sync_writer.append_key_value_metadata(kv_metadata);
    }

    /// Close and finalize the writer.
    ///
    /// All the data in the inner buffer will be force flushed.
    ///
    /// Unlike [`Self::close`], this does not consume `self`.
    ///
    /// Attempting to write after calling `finish` will result in an error.
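    ///
    /// A brief sketch of finalizing a file and then inspecting the writer:
    ///
    /// ```
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
    /// # use parquet::arrow::AsyncArrowWriter;
    /// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
    /// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
    /// let mut buffer = Vec::new();
    /// let mut writer = AsyncArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
    /// writer.write(&batch).await.unwrap();
    /// let _metadata = writer.finish().await.unwrap();
    /// // Unlike `close`, the writer remains available for inspection,
    /// // e.g. to read back the total number of bytes written
    /// assert!(writer.bytes_written() > 0);
    /// # }
    /// ```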
    pub async fn finish(&mut self) -> Result<FileMetaData> {
        let metadata = self.sync_writer.finish()?;

        // Force flush the remaining data.
        self.do_write().await?;
        self.async_writer.complete().await?;

        Ok(metadata)
    }

    /// Close and finalize the writer.
    ///
    /// All the data in the inner buffer will be force flushed.
    pub async fn close(mut self) -> Result<FileMetaData> {
        self.finish().await
    }

    /// Flush the data written by `sync_writer` into the `async_writer`
    ///
    /// # Notes
    ///
    /// This method will take the inner buffer from the `sync_writer` and write it into the
    /// async writer. After the write, the inner buffer will be empty.
    async fn do_write(&mut self) -> Result<()> {
        let buffer = mem::take(self.sync_writer.inner_mut());

        self.async_writer
            .write(Bytes::from(buffer))
            .await
            .map_err(|e| ParquetError::External(Box::new(e)))?;

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow_array::{ArrayRef, BinaryArray, Int32Array, Int64Array, RecordBatchReader};
    use bytes::Bytes;
    use std::sync::Arc;
    use tokio::pin;

    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};

    use super::*;

    fn get_test_reader() -> ParquetRecordBatchReader {
        let testdata = arrow::util::test_util::parquet_test_data();
        // This test file is large enough to generate multiple row groups.
        let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
        let original_data = Bytes::from(std::fs::read(path).unwrap());
        ParquetRecordBatchReaderBuilder::try_new(original_data)
            .unwrap()
            .build()
            .unwrap()
    }

    #[tokio::test]
    async fn test_async_writer() {
        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();

        let mut buffer = Vec::new();
        let mut writer = AsyncArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
        writer.write(&to_write).await.unwrap();
        writer.close().await.unwrap();

        let buffer = Bytes::from(buffer);
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
            .unwrap()
            .build()
            .unwrap();
        let read = reader.next().unwrap().unwrap();

        assert_eq!(to_write, read);
    }

    // Read the data from the test file, write it with both the async writer and
    // the sync writer, and then compare the results of the two writers.
    #[tokio::test]
    async fn test_async_writer_with_sync_writer() {
        let reader = get_test_reader();

        let write_props = WriterProperties::builder()
            .set_max_row_group_size(64)
            .build();

        let mut async_buffer = Vec::new();
        let mut async_writer = AsyncArrowWriter::try_new(
            &mut async_buffer,
            reader.schema(),
            Some(write_props.clone()),
        )
        .unwrap();

        let mut sync_buffer = Vec::new();
        let mut sync_writer =
            ArrowWriter::try_new(&mut sync_buffer, reader.schema(), Some(write_props)).unwrap();
        for record_batch in reader {
            let record_batch = record_batch.unwrap();
            async_writer.write(&record_batch).await.unwrap();
            sync_writer.write(&record_batch).unwrap();
        }
        sync_writer.close().unwrap();
        async_writer.close().await.unwrap();

        assert_eq!(sync_buffer, async_buffer);
    }


    struct TestAsyncSink {
        sink: Vec<u8>,
        min_accept_bytes: usize,
        expect_total_bytes: usize,
    }

    impl AsyncWrite for TestAsyncSink {
        fn poll_write(
            self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
            buf: &[u8],
        ) -> std::task::Poll<std::result::Result<usize, std::io::Error>> {
            let written_bytes = self.sink.len();
            if written_bytes + buf.len() < self.expect_total_bytes {
                assert!(buf.len() >= self.min_accept_bytes);
            } else {
                assert_eq!(written_bytes + buf.len(), self.expect_total_bytes);
            }

            let sink = &mut self.get_mut().sink;
            pin!(sink);
            sink.poll_write(cx, buf)
        }

        fn poll_flush(
            self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
            let sink = &mut self.get_mut().sink;
            pin!(sink);
            sink.poll_flush(cx)
        }

        fn poll_shutdown(
            self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
            let sink = &mut self.get_mut().sink;
            pin!(sink);
            sink.poll_shutdown(cx)
        }
    }

    #[tokio::test]
    async fn test_async_writer_bytes_written() {
        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();

        let temp = tempfile::tempfile().unwrap();

        let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
        let mut writer =
            AsyncArrowWriter::try_new(file.try_clone().await.unwrap(), to_write.schema(), None)
                .unwrap();
        writer.write(&to_write).await.unwrap();
        let _metadata = writer.finish().await.unwrap();
        // After `finish` this should include the metadata and footer
        let reported = writer.bytes_written();

        // Get actual size from file metadata
        let actual = file.metadata().await.unwrap().len() as usize;

        assert_eq!(reported, actual);
    }

    #[tokio::test]
    async fn test_async_writer_file() {
        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let col2 = Arc::new(BinaryArray::from_iter_values(vec![
            vec![0; 500000],
            vec![0; 500000],
            vec![0; 500000],
        ])) as ArrayRef;
        let to_write = RecordBatch::try_from_iter([("col", col), ("col2", col2)]).unwrap();

        let temp = tempfile::tempfile().unwrap();

        let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
        let mut writer = AsyncArrowWriter::try_new(file, to_write.schema(), None).unwrap();
        writer.write(&to_write).await.unwrap();
        writer.close().await.unwrap();

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(temp)
            .unwrap()
            .build()
            .unwrap();
        let read = reader.next().unwrap().unwrap();

        assert_eq!(to_write, read);
    }

    #[tokio::test]
    async fn in_progress_accounting() {
        // define schema
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

        // create some data
        let a = Int32Array::from_value(0_i32, 512);

        // build a record batch
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        let temp = tempfile::tempfile().unwrap();
        let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
        let mut writer = AsyncArrowWriter::try_new(file, batch.schema(), None).unwrap();

        // starts empty
        assert_eq!(writer.in_progress_size(), 0);
        assert_eq!(writer.in_progress_rows(), 0);
        assert_eq!(writer.bytes_written(), 4); // Initial Parquet header
        writer.write(&batch).await.unwrap();

        // updated on write
        let initial_size = writer.in_progress_size();
        assert!(initial_size > 0);
        assert_eq!(writer.in_progress_rows(), batch.num_rows());
        let initial_memory = writer.memory_size();
        // memory estimate is larger than estimated encoded size
        assert!(
            initial_size <= initial_memory,
            "{initial_size} <= {initial_memory}"
        );

        // updated on second write
        writer.write(&batch).await.unwrap();
        assert!(writer.in_progress_size() > initial_size);
        assert_eq!(writer.in_progress_rows(), batch.num_rows() * 2);
        assert!(writer.memory_size() > initial_memory);
        assert!(
            writer.in_progress_size() <= writer.memory_size(),
            "in_progress_size {} <= memory_size {}",
            writer.in_progress_size(),
            writer.memory_size()
        );

        // in progress tracking is cleared, but the overall data written is updated
        let pre_flush_bytes_written = writer.bytes_written();
        writer.flush().await.unwrap();
        assert_eq!(writer.in_progress_size(), 0);
        assert_eq!(writer.memory_size(), 0);
        assert_eq!(writer.in_progress_rows(), 0);
        assert!(writer.bytes_written() > pre_flush_bytes_written);

        writer.close().await.unwrap();
    }
}