parquet/arrow/async_writer/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! `async` API for writing [`RecordBatch`]es to Parquet files
//!
//! See the [crate-level documentation](crate) for more details.
//!
//! The `async` API for writing [`RecordBatch`]es is
//! similar to the [`sync` API](ArrowWriter), so please
//! read the documentation there before using this API.
//!
//! Here is an example of using [`AsyncArrowWriter`]:
//!
//! ```
//! # #[tokio::main(flavor="current_thread")]
//! # async fn main() {
//! #
//! # use std::sync::Arc;
//! # use arrow_array::{ArrayRef, Int64Array, RecordBatch, RecordBatchReader};
//! # use bytes::Bytes;
//! # use parquet::arrow::{AsyncArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder};
//! #
//! let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
//! let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
//!
//! let mut buffer = Vec::new();
//! let mut writer = AsyncArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
//! writer.write(&to_write).await.unwrap();
//! writer.close().await.unwrap();
//!
//! let buffer = Bytes::from(buffer);
//! let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer.clone())
//!     .unwrap()
//!     .build()
//!     .unwrap();
//! let read = reader.next().unwrap().unwrap();
//!
//! assert_eq!(to_write, read);
//! # }
//! ```
//!
//! [`object_store`] provides its native implementation of [`AsyncFileWriter`] via [`ParquetObjectWriter`].

#[cfg(feature = "object_store")]
mod store;
#[cfg(feature = "object_store")]
pub use store::*;

use crate::{
    arrow::ArrowWriter,
    arrow::arrow_writer::ArrowWriterOptions,
    errors::{ParquetError, Result},
    file::{
        metadata::{KeyValue, ParquetMetaData, RowGroupMetaData},
        properties::WriterProperties,
    },
};
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use bytes::Bytes;
use futures::FutureExt;
use futures::future::BoxFuture;
use std::mem;
use tokio::io::{AsyncWrite, AsyncWriteExt};

/// The asynchronous interface used by [`AsyncArrowWriter`] to write parquet files.
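///
/// Any type implementing [`tokio::io::AsyncWrite`] (plus `Unpin + Send`) gets an
/// implementation of this trait via the blanket impl below. As a minimal sketch,
/// the trait can be driven directly on a `Vec<u8>`:
///
/// ```
/// # #[tokio::main(flavor="current_thread")]
/// # async fn main() {
/// # use bytes::Bytes;
/// # use parquet::arrow::async_writer::AsyncFileWriter;
/// // `Vec<u8>` implements `tokio::io::AsyncWrite`, so it is an `AsyncFileWriter`
/// let mut writer: Vec<u8> = Vec::new();
/// writer.write(Bytes::from_static(b"hello")).await.unwrap();
/// writer.complete().await.unwrap();
/// assert_eq!(writer, b"hello".to_vec());
/// # }
/// ```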
pub trait AsyncFileWriter: Send {
    /// Write the provided bytes to the underlying writer
    ///
    /// The underlying writer CAN decide to buffer the data or write it immediately.
    /// This design allows the writer implementer to control buffering and I/O scheduling.
    ///
    /// The underlying writer MAY implement retry logic to avoid breaking the user's write process.
    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>>;

    /// Flush any buffered data to the underlying writer and finish the writing process.
    ///
    /// After `complete` returns `Ok(())`, the caller SHOULD NOT call `write` again.
    fn complete(&mut self) -> BoxFuture<'_, Result<()>>;
}

impl AsyncFileWriter for Box<dyn AsyncFileWriter + '_> {
    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
        self.as_mut().write(bs)
    }

    fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
        self.as_mut().complete()
    }
}

impl<T: AsyncWrite + Unpin + Send> AsyncFileWriter for T {
    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
        async move {
            self.write_all(&bs).await?;
            Ok(())
        }
        .boxed()
    }

    fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
        async move {
            self.flush().await?;
            self.shutdown().await?;
            Ok(())
        }
        .boxed()
    }
}

/// Encodes [`RecordBatch`] to parquet, outputting to an [`AsyncFileWriter`]
///
/// ## Memory Usage
///
/// This writer eagerly writes data as soon as possible to the underlying [`AsyncFileWriter`],
/// permitting fine-grained control over buffering and I/O scheduling. However, the columnar
/// nature of parquet forces data for an entire row group to be buffered in memory, before
/// it can be flushed. Depending on the data and the configured row group size, this buffering
/// may be substantial.
///
/// Memory usage can be limited by calling [`Self::flush`] to flush the in progress row group,
/// although this will likely increase overall file size and reduce query performance.
/// See [ArrowWriter] for more information.
///
/// ```no_run
/// # use tokio::fs::File;
/// # use arrow_array::RecordBatch;
/// # use parquet::arrow::AsyncArrowWriter;
/// # async fn test() {
/// let mut writer: AsyncArrowWriter<File> = todo!();
/// let batch: RecordBatch = todo!();
/// writer.write(&batch).await.unwrap();
/// // Trigger an early flush if buffered size exceeds 1_000_000
/// if writer.in_progress_size() > 1_000_000 {
///     writer.flush().await.unwrap()
/// }
/// # }
/// ```
pub struct AsyncArrowWriter<W> {
    /// Underlying sync writer
    sync_writer: ArrowWriter<Vec<u8>>,

    /// Async writer provided by caller
    async_writer: W,
}

impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
    /// Try to create a new Async Arrow Writer
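    ///
    /// A minimal sketch writing to a [`tokio::fs::File`] (`no_run`; the file path
    /// below is purely illustrative):
    ///
    /// ```no_run
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
    /// # use parquet::arrow::AsyncArrowWriter;
    /// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
    /// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
    ///
    /// // Any `tokio::io::AsyncWrite + Unpin + Send` works as the output here
    /// let file = tokio::fs::File::create("data.parquet").await.unwrap();
    /// let mut writer = AsyncArrowWriter::try_new(file, batch.schema(), None).unwrap();
    /// writer.write(&batch).await.unwrap();
    /// writer.close().await.unwrap();
    /// # }
    /// ```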
    pub fn try_new(
        writer: W,
        arrow_schema: SchemaRef,
        props: Option<WriterProperties>,
    ) -> Result<Self> {
        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
        Self::try_new_with_options(writer, arrow_schema, options)
    }

    /// Try to create a new Async Arrow Writer with [`ArrowWriterOptions`]
    pub fn try_new_with_options(
        writer: W,
        arrow_schema: SchemaRef,
        options: ArrowWriterOptions,
    ) -> Result<Self> {
        let sync_writer = ArrowWriter::try_new_with_options(Vec::new(), arrow_schema, options)?;

        Ok(Self {
            sync_writer,
            async_writer: writer,
        })
    }

    /// Returns metadata for any flushed row groups
    pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
        self.sync_writer.flushed_row_groups()
    }

    /// Estimated memory usage, in bytes, of this `AsyncArrowWriter`
    ///
    /// See [ArrowWriter::memory_size] for more information.
    pub fn memory_size(&self) -> usize {
        self.sync_writer.memory_size()
    }

    /// Anticipated encoded size of the in progress row group.
    ///
    /// See [ArrowWriter::in_progress_size] for more information.
    pub fn in_progress_size(&self) -> usize {
        self.sync_writer.in_progress_size()
    }

    /// Returns the number of rows buffered in the in progress row group
    pub fn in_progress_rows(&self) -> usize {
        self.sync_writer.in_progress_rows()
    }

    /// Returns the number of bytes written by this instance
    pub fn bytes_written(&self) -> usize {
        self.sync_writer.bytes_written()
    }

    /// Enqueues the provided `RecordBatch` to be written
    ///
    /// After each write, if the inner [ArrowWriter] has completed one or more row
    /// groups, the buffered bytes are flushed to the underlying [`AsyncFileWriter`]
    pub async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        let before = self.sync_writer.flushed_row_groups().len();
        self.sync_writer.write(batch)?;
        if before != self.sync_writer.flushed_row_groups().len() {
            self.do_write().await?;
        }
        Ok(())
    }

    /// Flushes all buffered rows into a new row group
    pub async fn flush(&mut self) -> Result<()> {
        self.sync_writer.flush()?;
        self.do_write().await?;

        Ok(())
    }

    /// Append [`KeyValue`] metadata in addition to those in [`WriterProperties`]
    ///
    /// This method allows appending metadata after [`RecordBatch`]es have been written.
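    ///
    /// For example, a minimal sketch (assuming `KeyValue::new(key, value)`; the
    /// `"modified_by"` key below is purely illustrative):
    ///
    /// ```
    /// # #[tokio::main(flavor="current_thread")]
    /// # async fn main() {
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
    /// # use parquet::arrow::AsyncArrowWriter;
    /// # use parquet::file::metadata::KeyValue;
    /// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
    /// let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
    ///
    /// let mut buffer = Vec::new();
    /// let mut writer = AsyncArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
    /// writer.write(&batch).await.unwrap();
    ///
    /// // Metadata can be appended after the batches have been written
    /// writer.append_key_value_metadata(KeyValue::new("modified_by".to_string(), "example".to_string()));
    /// writer.close().await.unwrap();
    /// # }
    /// ```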
    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
        self.sync_writer.append_key_value_metadata(kv_metadata);
    }

    /// Close and finalize the writer.
    ///
    /// All buffered data will be flushed to the underlying writer.
    ///
    /// Unlike [`Self::close`] this does not consume self
    ///
    /// Attempting to write after calling `finish` will result in an error
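    ///
    /// A minimal sketch of the intended call sequence (`no_run`; the writer and
    /// batch are placeholders):
    ///
    /// ```no_run
    /// # use tokio::fs::File;
    /// # use arrow_array::RecordBatch;
    /// # use parquet::arrow::AsyncArrowWriter;
    /// # async fn example() {
    /// let mut writer: AsyncArrowWriter<File> = todo!();
    /// let batch: RecordBatch = todo!();
    /// writer.write(&batch).await.unwrap();
    ///
    /// // Finalize the file, then recover the underlying writer if needed
    /// let _metadata = writer.finish().await.unwrap();
    /// let _file: File = writer.into_inner();
    /// # }
    /// ```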
    pub async fn finish(&mut self) -> Result<ParquetMetaData> {
        let metadata = self.sync_writer.finish()?;

        // Flush any remaining buffered data.
        self.do_write().await?;
        self.async_writer.complete().await?;

        Ok(metadata)
    }

    /// Close and finalize the writer.
    ///
    /// All buffered data will be flushed to the underlying writer.
    pub async fn close(mut self) -> Result<ParquetMetaData> {
        self.finish().await
    }

    /// Consumes the [`AsyncArrowWriter`] and returns the underlying [`AsyncFileWriter`]
    ///
    /// # Notes
    ///
    /// This method does **not** flush or finalize the writer, so buffered data
    /// will be lost if you have not called [`Self::finish`].
    pub fn into_inner(self) -> W {
        self.async_writer
    }

    /// Flush the data written by `sync_writer` into the `async_writer`
    ///
    /// # Notes
    ///
    /// This method will take the inner buffer from the `sync_writer` and write it into the
    /// async writer. After the write, the inner buffer will be empty.
    async fn do_write(&mut self) -> Result<()> {
        let buffer = mem::take(self.sync_writer.inner_mut());

        self.async_writer
            .write(Bytes::from(buffer))
            .await
            .map_err(|e| ParquetError::External(Box::new(e)))?;

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow_array::{ArrayRef, BinaryArray, Int32Array, Int64Array, RecordBatchReader};
    use bytes::Bytes;
    use std::sync::Arc;

    use super::*;

    fn get_test_reader() -> ParquetRecordBatchReader {
        let testdata = arrow::util::test_util::parquet_test_data();
        // This test file is large enough to generate multiple row groups.
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let original_data = Bytes::from(std::fs::read(path).unwrap());
        ParquetRecordBatchReaderBuilder::try_new(original_data)
            .unwrap()
            .build()
            .unwrap()
    }

    #[tokio::test]
    async fn test_async_writer() {
        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();

        let mut buffer = Vec::new();
        let mut writer = AsyncArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
        writer.write(&to_write).await.unwrap();
        writer.close().await.unwrap();

        let buffer = Bytes::from(buffer);
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
            .unwrap()
            .build()
            .unwrap();
        let read = reader.next().unwrap().unwrap();

        assert_eq!(to_write, read);
    }

    // Read the data from the test file, write it with both the async writer and the
    // sync writer, and then compare the two outputs.
    #[tokio::test]
    async fn test_async_writer_with_sync_writer() {
        let reader = get_test_reader();

        let write_props = WriterProperties::builder()
            .set_max_row_group_size(64)
            .build();

        let mut async_buffer = Vec::new();
        let mut async_writer = AsyncArrowWriter::try_new(
            &mut async_buffer,
            reader.schema(),
            Some(write_props.clone()),
        )
        .unwrap();

        let mut sync_buffer = Vec::new();
        let mut sync_writer =
            ArrowWriter::try_new(&mut sync_buffer, reader.schema(), Some(write_props)).unwrap();
        for record_batch in reader {
            let record_batch = record_batch.unwrap();
            async_writer.write(&record_batch).await.unwrap();
            sync_writer.write(&record_batch).unwrap();
        }
        sync_writer.close().unwrap();
        async_writer.close().await.unwrap();

        assert_eq!(sync_buffer, async_buffer);
    }

    #[tokio::test]
    async fn test_async_writer_bytes_written() {
        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();

        let temp = tempfile::tempfile().unwrap();

        let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
        let mut writer =
            AsyncArrowWriter::try_new(file.try_clone().await.unwrap(), to_write.schema(), None)
                .unwrap();
        writer.write(&to_write).await.unwrap();
        let _metadata = writer.finish().await.unwrap();
        // After `finish` this should include the metadata and footer
        let reported = writer.bytes_written();

        // Get actual size from file metadata
        let actual = file.metadata().await.unwrap().len() as usize;

        assert_eq!(reported, actual);
    }

    #[tokio::test]
    async fn test_async_writer_file() {
        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let col2 = Arc::new(BinaryArray::from_iter_values(vec![
            vec![0; 500000],
            vec![0; 500000],
            vec![0; 500000],
        ])) as ArrayRef;
        let to_write = RecordBatch::try_from_iter([("col", col), ("col2", col2)]).unwrap();

        let temp = tempfile::tempfile().unwrap();

        let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
        let mut writer = AsyncArrowWriter::try_new(file, to_write.schema(), None).unwrap();
        writer.write(&to_write).await.unwrap();
        writer.close().await.unwrap();

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(temp)
            .unwrap()
            .build()
            .unwrap();
        let read = reader.next().unwrap().unwrap();

        assert_eq!(to_write, read);
    }

    #[tokio::test]
    async fn in_progress_accounting() {
        // define schema
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

        // create some data
        let a = Int32Array::from_value(0_i32, 512);

        // build a record batch
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

        let temp = tempfile::tempfile().unwrap();
        let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
        let mut writer = AsyncArrowWriter::try_new(file, batch.schema(), None).unwrap();

        // starts empty
        assert_eq!(writer.in_progress_size(), 0);
        assert_eq!(writer.in_progress_rows(), 0);
        assert_eq!(writer.bytes_written(), 4); // Initial Parquet header
        writer.write(&batch).await.unwrap();

        // updated on write
        let initial_size = writer.in_progress_size();
        assert!(initial_size > 0);
        assert_eq!(writer.in_progress_rows(), batch.num_rows());
        let initial_memory = writer.memory_size();
        // memory estimate is larger than estimated encoded size
        assert!(
            initial_size <= initial_memory,
            "{initial_size} <= {initial_memory}"
        );

        // updated on second write
        writer.write(&batch).await.unwrap();
        assert!(writer.in_progress_size() > initial_size);
        assert_eq!(writer.in_progress_rows(), batch.num_rows() * 2);
        assert!(writer.memory_size() > initial_memory);
        assert!(
            writer.in_progress_size() <= writer.memory_size(),
            "in_progress_size {} <= memory_size {}",
            writer.in_progress_size(),
            writer.memory_size()
        );

        // in progress tracking is cleared, but the overall data written is updated
        let pre_flush_bytes_written = writer.bytes_written();
        writer.flush().await.unwrap();
        assert_eq!(writer.in_progress_size(), 0);
        assert_eq!(writer.memory_size(), 0);
        assert_eq!(writer.in_progress_rows(), 0);
        assert!(writer.bytes_written() > pre_flush_bytes_written);

        writer.close().await.unwrap();
    }
}