parquet/arrow/async_writer/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! `async` API for writing [`RecordBatch`]es to Parquet files
//!
//! See the [crate-level documentation](crate) for more details.
//!
//! The `async` API for writing [`RecordBatch`]es is
//! similar to the [`sync` API](ArrowWriter), so please
//! read the documentation there before using this API.
//!
//! Here is an example for using [`AsyncArrowWriter`]:
//!
//! ```
//! # #[tokio::main(flavor="current_thread")]
//! # async fn main() {
//! #
//! # use std::sync::Arc;
//! # use arrow_array::{ArrayRef, Int64Array, RecordBatch, RecordBatchReader};
//! # use bytes::Bytes;
//! # use parquet::arrow::{AsyncArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder};
//! #
//! let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
//! let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
//!
//! let mut buffer = Vec::new();
//! let mut writer = AsyncArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
//! writer.write(&to_write).await.unwrap();
//! writer.close().await.unwrap();
//!
//! let buffer = Bytes::from(buffer);
//! let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer.clone())
//!     .unwrap()
//!     .build()
//!     .unwrap();
//! let read = reader.next().unwrap().unwrap();
//!
//! assert_eq!(to_write, read);
//! # }
//! ```
//!
//! [`object_store`] provides its own implementation of [`AsyncFileWriter`] via [`ParquetObjectWriter`].
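//!
//! For writing to object storage, a sketch along these lines may be used; the
//! constructor shown and the in-memory store are illustrative assumptions and
//! require the `object_store` feature:
//!
//! ```ignore
//! use std::sync::Arc;
//! use object_store::{memory::InMemory, path::Path, ObjectStore};
//! use parquet::arrow::async_writer::{AsyncArrowWriter, ParquetObjectWriter};
//!
//! let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
//! let object_writer = ParquetObjectWriter::new(store, Path::from("data/example.parquet"));
//! // `schema` is the Arrow schema of the batches to be written
//! let mut writer = AsyncArrowWriter::try_new(object_writer, schema, None)?;
//! ```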

#[cfg(feature = "object_store")]
mod store;
#[cfg(feature = "object_store")]
pub use store::*;

use crate::{
    arrow::arrow_writer::{ArrowColumnChunk, ArrowColumnWriter, ArrowWriterOptions},
    arrow::ArrowWriter,
    errors::{ParquetError, Result},
    file::{metadata::RowGroupMetaData, properties::WriterProperties},
    format::{FileMetaData, KeyValue},
};
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
use std::mem;
use tokio::io::{AsyncWrite, AsyncWriteExt};

/// The asynchronous interface used by [`AsyncArrowWriter`] to write parquet files.
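///
/// # Example
///
/// A minimal sketch of a custom implementation that buffers everything in
/// memory (illustrative only; for [`AsyncWrite`] types the blanket
/// implementation below already applies):
///
/// ```
/// # use bytes::Bytes;
/// # use futures::future::BoxFuture;
/// # use futures::FutureExt;
/// # use parquet::arrow::async_writer::AsyncFileWriter;
/// # use parquet::errors::Result;
/// struct InMemoryFileWriter {
///     buffer: Vec<u8>,
/// }
///
/// impl AsyncFileWriter for InMemoryFileWriter {
///     fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
///         // Buffer the bytes; a real implementation might start an I/O request here
///         self.buffer.extend_from_slice(&bs);
///         async { Ok(()) }.boxed()
///     }
///
///     fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
///         // Nothing left to flush for an in-memory buffer
///         async { Ok(()) }.boxed()
///     }
/// }
/// ```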
pub trait AsyncFileWriter: Send {
    /// Write the provided bytes to the underlying writer
    ///
    /// The underlying writer CAN decide to buffer the data or write it immediately.
    /// This design allows the writer implementer to control buffering and I/O scheduling.
    ///
    /// The underlying writer MAY implement retry logic to avoid interrupting the user's write process.
    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>>;

    /// Flush any buffered data to the underlying writer and finish the writing process.
    ///
    /// After `complete` returns `Ok(())`, the caller SHOULD NOT call `write` again.
    fn complete(&mut self) -> BoxFuture<'_, Result<()>>;
}

impl AsyncFileWriter for Box<dyn AsyncFileWriter + '_> {
    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
        self.as_mut().write(bs)
    }

    fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
        self.as_mut().complete()
    }
}

impl<T: AsyncWrite + Unpin + Send> AsyncFileWriter for T {
    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
        async move {
            self.write_all(&bs).await?;
            Ok(())
        }
        .boxed()
    }

    fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
        async move {
            self.flush().await?;
            self.shutdown().await?;
            Ok(())
        }
        .boxed()
    }
}

/// Encodes [`RecordBatch`] to parquet, outputting to an [`AsyncFileWriter`]
///
/// ## Memory Usage
///
/// This writer eagerly writes data as soon as possible to the underlying [`AsyncFileWriter`],
/// permitting fine-grained control over buffering and I/O scheduling. However, the columnar
/// nature of parquet forces data for an entire row group to be buffered in memory, before
/// it can be flushed. Depending on the data and the configured row group size, this buffering
/// may be substantial.
///
/// Memory usage can be limited by calling [`Self::flush`] to flush the in progress row group,
/// although this will likely increase overall file size and reduce query performance.
/// See [ArrowWriter] for more information.
///
/// ```no_run
/// # use tokio::fs::File;
/// # use arrow_array::RecordBatch;
/// # use parquet::arrow::AsyncArrowWriter;
/// # async fn test() {
/// let mut writer: AsyncArrowWriter<File> = todo!();
/// let batch: RecordBatch = todo!();
/// writer.write(&batch).await.unwrap();
/// // Trigger an early flush if buffered size exceeds 1_000_000
/// if writer.in_progress_size() > 1_000_000 {
///     writer.flush().await.unwrap()
/// }
/// # }
/// ```
pub struct AsyncArrowWriter<W> {
    /// Underlying sync writer
    sync_writer: ArrowWriter<Vec<u8>>,

    /// Async writer provided by caller
    async_writer: W,
}

impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
    /// Try to create a new Async Arrow Writer
    pub fn try_new(
        writer: W,
        arrow_schema: SchemaRef,
        props: Option<WriterProperties>,
    ) -> Result<Self> {
        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
        Self::try_new_with_options(writer, arrow_schema, options)
    }

    /// Try to create a new Async Arrow Writer with [`ArrowWriterOptions`]
    pub fn try_new_with_options(
        writer: W,
        arrow_schema: SchemaRef,
        options: ArrowWriterOptions,
    ) -> Result<Self> {
        let sync_writer = ArrowWriter::try_new_with_options(Vec::new(), arrow_schema, options)?;

        Ok(Self {
            sync_writer,
            async_writer: writer,
        })
    }

    /// Returns metadata for any flushed row groups
    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        self.sync_writer.flushed_row_groups()
    }

    /// Estimated memory usage, in bytes, of this `AsyncArrowWriter`
    ///
    /// See [ArrowWriter::memory_size] for more information.
    pub fn memory_size(&self) -> usize {
        self.sync_writer.memory_size()
    }

    /// Anticipated encoded size of the in progress row group.
    ///
    /// See [ArrowWriter::in_progress_size] for more information.
    pub fn in_progress_size(&self) -> usize {
        self.sync_writer.in_progress_size()
    }

    /// Returns the number of rows buffered in the in progress row group
    pub fn in_progress_rows(&self) -> usize {
        self.sync_writer.in_progress_rows()
    }

    /// Returns the number of bytes written by this instance
    pub fn bytes_written(&self) -> usize {
        self.sync_writer.bytes_written()
    }

    /// Enqueues the provided `RecordBatch` to be written
    ///
    /// If this write causes the inner [`ArrowWriter`] to flush a row group, the
    /// buffered data is written out to the underlying [`AsyncFileWriter`]
    pub async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        let before = self.sync_writer.flushed_row_groups().len();
        self.sync_writer.write(batch)?;
        if before != self.sync_writer.flushed_row_groups().len() {
            self.do_write().await?;
        }
        Ok(())
    }

    /// Flushes all buffered rows into a new row group
    pub async fn flush(&mut self) -> Result<()> {
        self.sync_writer.flush()?;
        self.do_write().await?;

        Ok(())
    }

    /// Append [`KeyValue`] metadata in addition to those in [`WriterProperties`]
    ///
    /// This method allows appending metadata after [`RecordBatch`]es have been written.
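    ///
    /// A minimal sketch (writer construction elided):
    ///
    /// ```no_run
    /// # use tokio::fs::File;
    /// # use parquet::arrow::AsyncArrowWriter;
    /// # use parquet::format::KeyValue;
    /// # async fn example() {
    /// # let mut writer: AsyncArrowWriter<File> = todo!();
    /// // ... write RecordBatches ...
    /// writer.append_key_value_metadata(KeyValue {
    ///     key: "writer.note".to_string(),
    ///     value: Some("written asynchronously".to_string()),
    /// });
    /// # writer.close().await.unwrap();
    /// # }
    /// ```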
    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
        self.sync_writer.append_key_value_metadata(kv_metadata);
    }

    /// Close and finalize the writer.
    ///
    /// Any remaining buffered data will be flushed.
    ///
    /// Unlike [`Self::close`], this does not consume `self`.
    ///
    /// Attempting to write after calling `finish` will result in an error.
    pub async fn finish(&mut self) -> Result<FileMetaData> {
        let metadata = self.sync_writer.finish()?;

        // Force flush any remaining buffered data.
        self.do_write().await?;
        self.async_writer.complete().await?;

        Ok(metadata)
    }

    /// Close and finalize the writer.
    ///
    /// Any remaining buffered data will be flushed.
    pub async fn close(mut self) -> Result<FileMetaData> {
        self.finish().await
    }

    /// Consumes the [`AsyncArrowWriter`] and returns the underlying [`AsyncFileWriter`]
    ///
    /// # Notes
    ///
    /// This method does **not** flush or finalize the writer, so buffered data
    /// will be lost if you have not called [`Self::finish`].
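    ///
    /// A minimal sketch showing the intended call order:
    ///
    /// ```no_run
    /// # use tokio::fs::File;
    /// # use parquet::arrow::AsyncArrowWriter;
    /// # async fn example() {
    /// # let mut writer: AsyncArrowWriter<File> = todo!();
    /// // Finalize the file first so no buffered data is lost
    /// writer.finish().await.unwrap();
    /// // Then recover the underlying AsyncFileWriter
    /// let _file: File = writer.into_inner();
    /// # }
    /// ```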
    pub fn into_inner(self) -> W {
        self.async_writer
    }

    /// Flush the data written by `sync_writer` into the `async_writer`
    ///
    /// # Notes
    ///
    /// This method will take the inner buffer from the `sync_writer` and write it into the
    /// async writer. After the write, the inner buffer will be empty.
    async fn do_write(&mut self) -> Result<()> {
        let buffer = mem::take(self.sync_writer.inner_mut());

        self.async_writer
            .write(Bytes::from(buffer))
            .await
            .map_err(|e| ParquetError::External(Box::new(e)))?;

        Ok(())
    }

    /// Create a new row group writer and return its column writers.
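    ///
    /// # Example
    ///
    /// A sketch of encoding a batch column-by-column and appending the result via
    /// [`Self::append_row_group`]. It assumes a flat, single-column schema, that
    /// `writer` and `batch` already exist, and elides error handling:
    ///
    /// ```ignore
    /// use parquet::arrow::arrow_writer::compute_leaves;
    ///
    /// let mut col_writers = writer.get_column_writers().await?;
    /// for (field, column) in batch.schema().fields().iter().zip(batch.columns()) {
    ///     for leaf in compute_leaves(field.as_ref(), column)? {
    ///         // With a single-column schema there is exactly one leaf writer
    ///         col_writers[0].write(&leaf)?;
    ///     }
    /// }
    /// let chunks = col_writers
    ///     .into_iter()
    ///     .map(|w| w.close())
    ///     .collect::<Result<Vec<_>, _>>()?;
    /// writer.append_row_group(chunks).await?;
    /// ```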
    pub async fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
        let before = self.sync_writer.flushed_row_groups().len();
        let writers = self.sync_writer.get_column_writers()?;
        if before != self.sync_writer.flushed_row_groups().len() {
            self.do_write().await?;
        }
        Ok(writers)
    }

    /// Append the given column chunks to the file as a new row group.
    pub async fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
        self.sync_writer.append_row_group(chunks)?;
        self.do_write().await
    }
}

#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow_array::{ArrayRef, BinaryArray, Int32Array, Int64Array, RecordBatchReader};
    use bytes::Bytes;
    use std::sync::Arc;

    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
    use crate::arrow::arrow_writer::compute_leaves;

    use super::*;

    fn get_test_reader() -> ParquetRecordBatchReader {
        let testdata = arrow::util::test_util::parquet_test_data();
        // This test file is large enough to generate multiple row groups.
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let original_data = Bytes::from(std::fs::read(path).unwrap());
        ParquetRecordBatchReaderBuilder::try_new(original_data)
            .unwrap()
            .build()
            .unwrap()
    }

    #[tokio::test]
    async fn test_async_writer() {
        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();

        let mut buffer = Vec::new();
        let mut writer = AsyncArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
        writer.write(&to_write).await.unwrap();
        writer.close().await.unwrap();

        let buffer = Bytes::from(buffer);
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
            .unwrap()
            .build()
            .unwrap();
        let read = reader.next().unwrap().unwrap();

        assert_eq!(to_write, read);
    }

    #[tokio::test]
    async fn test_async_arrow_group_writer() {
        let col = Arc::new(Int64Array::from_iter_values([4, 5, 6])) as ArrayRef;
        let to_write_record = RecordBatch::try_from_iter([("col", col)]).unwrap();

        let mut buffer = Vec::new();
        let mut writer =
            AsyncArrowWriter::try_new(&mut buffer, to_write_record.schema(), None).unwrap();

        // Use classic API
        writer.write(&to_write_record).await.unwrap();

        let mut writers = writer.get_column_writers().await.unwrap();
        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let to_write_arrow_group = RecordBatch::try_from_iter([("col", col)]).unwrap();

        for (field, column) in to_write_arrow_group
            .schema()
            .fields()
            .iter()
            .zip(to_write_arrow_group.columns())
        {
            for leaf in compute_leaves(field.as_ref(), column).unwrap() {
                writers[0].write(&leaf).unwrap();
            }
        }

        let columns: Vec<_> = writers.into_iter().map(|w| w.close().unwrap()).collect();
        // Append the arrow group as a new row group; this flushes the in-progress data
        writer.append_row_group(columns).await.unwrap();
        writer.close().await.unwrap();

        let buffer = Bytes::from(buffer);
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
            .unwrap()
            .build()
            .unwrap();

        let col = Arc::new(Int64Array::from_iter_values([4, 5, 6, 1, 2, 3])) as ArrayRef;
        let expected = RecordBatch::try_from_iter([("col", col)]).unwrap();

        let read = reader.next().unwrap().unwrap();
        assert_eq!(expected, read);
    }

    // Read the data from the test file, write it with both the async writer and the sync
    // writer, and then compare the two outputs.
    #[tokio::test]
    async fn test_async_writer_with_sync_writer() {
        let reader = get_test_reader();

        let write_props = WriterProperties::builder()
            .set_max_row_group_size(64)
            .build();

        let mut async_buffer = Vec::new();
        let mut async_writer = AsyncArrowWriter::try_new(
            &mut async_buffer,
            reader.schema(),
            Some(write_props.clone()),
        )
        .unwrap();

        let mut sync_buffer = Vec::new();
        let mut sync_writer =
            ArrowWriter::try_new(&mut sync_buffer, reader.schema(), Some(write_props)).unwrap();
        for record_batch in reader {
            let record_batch = record_batch.unwrap();
            async_writer.write(&record_batch).await.unwrap();
            sync_writer.write(&record_batch).unwrap();
        }
        sync_writer.close().unwrap();
        async_writer.close().await.unwrap();

        assert_eq!(sync_buffer, async_buffer);
    }

    #[tokio::test]
    async fn test_async_writer_bytes_written() {
        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();

        let temp = tempfile::tempfile().unwrap();

        let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
        let mut writer =
            AsyncArrowWriter::try_new(file.try_clone().await.unwrap(), to_write.schema(), None)
                .unwrap();
        writer.write(&to_write).await.unwrap();
        let _metadata = writer.finish().await.unwrap();
        // After `finish` this should include the metadata and footer
        let reported = writer.bytes_written();

        // Get actual size from file metadata
        let actual = file.metadata().await.unwrap().len() as usize;

        assert_eq!(reported, actual);
    }

    #[tokio::test]
    async fn test_async_writer_file() {
        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let col2 = Arc::new(BinaryArray::from_iter_values(vec![
            vec![0; 500000],
            vec![0; 500000],
            vec![0; 500000],
        ])) as ArrayRef;
        let to_write = RecordBatch::try_from_iter([("col", col), ("col2", col2)]).unwrap();

        let temp = tempfile::tempfile().unwrap();

        let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
        let mut writer = AsyncArrowWriter::try_new(file, to_write.schema(), None).unwrap();
        writer.write(&to_write).await.unwrap();
        writer.close().await.unwrap();

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(temp)
            .unwrap()
            .build()
            .unwrap();
        let read = reader.next().unwrap().unwrap();

        assert_eq!(to_write, read);
    }

    #[tokio::test]
    async fn in_progress_accounting() {
        // define schema
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

        // create some data
        let a = Int32Array::from_value(0_i32, 512);

        // build a record batch
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        let temp = tempfile::tempfile().unwrap();
        let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
        let mut writer = AsyncArrowWriter::try_new(file, batch.schema(), None).unwrap();

        // starts empty
        assert_eq!(writer.in_progress_size(), 0);
        assert_eq!(writer.in_progress_rows(), 0);
        assert_eq!(writer.bytes_written(), 4); // Initial Parquet header
        writer.write(&batch).await.unwrap();

        // updated on write
        let initial_size = writer.in_progress_size();
        assert!(initial_size > 0);
        assert_eq!(writer.in_progress_rows(), batch.num_rows());
        let initial_memory = writer.memory_size();
        // memory estimate is larger than estimated encoded size
        assert!(
            initial_size <= initial_memory,
            "{initial_size} <= {initial_memory}"
        );

        // updated on second write
        writer.write(&batch).await.unwrap();
        assert!(writer.in_progress_size() > initial_size);
        assert_eq!(writer.in_progress_rows(), batch.num_rows() * 2);
        assert!(writer.memory_size() > initial_memory);
        assert!(
            writer.in_progress_size() <= writer.memory_size(),
            "in_progress_size {} <= memory_size {}",
            writer.in_progress_size(),
            writer.memory_size()
        );

        // in progress tracking is cleared, but the overall data written is updated
        let pre_flush_bytes_written = writer.bytes_written();
        writer.flush().await.unwrap();
        assert_eq!(writer.in_progress_size(), 0);
        assert_eq!(writer.memory_size(), 0);
        assert_eq!(writer.in_progress_rows(), 0);
        assert!(writer.bytes_written() > pre_flush_bytes_written);

        writer.close().await.unwrap();
    }
}