parquet/arrow/async_writer/store.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use bytes::Bytes;
use futures::future::BoxFuture;
use std::sync::Arc;

use crate::arrow::async_writer::AsyncFileWriter;
use crate::errors::{ParquetError, Result};
use object_store::buffered::BufWriter;
use object_store::path::Path;
use object_store::ObjectStore;
use tokio::io::AsyncWriteExt;

/// [`ParquetObjectWriter`] for writing Parquet files to [`ObjectStore`]
///
/// ```
/// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
/// # use object_store::memory::InMemory;
/// # use object_store::path::Path;
/// # use object_store::ObjectStore;
/// # use std::sync::Arc;
///
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
/// # use parquet::arrow::async_writer::ParquetObjectWriter;
/// # use parquet::arrow::AsyncArrowWriter;
///
/// # #[tokio::main(flavor="current_thread")]
/// # async fn main() {
///     let store = Arc::new(InMemory::new());
///
///     let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
///     let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
///
///     let object_store_writer = ParquetObjectWriter::new(store.clone(), Path::from("test"));
///     let mut writer =
///         AsyncArrowWriter::try_new(object_store_writer, to_write.schema(), None).unwrap();
///     writer.write(&to_write).await.unwrap();
///     writer.close().await.unwrap();
///
///     let buffer = store
///         .get(&Path::from("test"))
///         .await
///         .unwrap()
///         .bytes()
///         .await
///         .unwrap();
///     let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
///         .unwrap()
///         .build()
///         .unwrap();
///     let read = reader.next().unwrap().unwrap();
///
///     assert_eq!(to_write, read);
/// # }
/// ```
#[derive(Debug)]
pub struct ParquetObjectWriter {
    w: BufWriter,
}

impl ParquetObjectWriter {
    /// Create a new [`ParquetObjectWriter`] that writes to the specified path in the given store.
    ///
    /// To configure the writer's behavior, build a [`BufWriter`] yourself and pass it to
    /// [`Self::from_buf_writer`], as shown in the example below.
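    ///
    /// A minimal sketch of that pattern, assuming the `object_store` version in use
    /// provides `BufWriter::with_capacity` (the path and capacity below are illustrative):
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use object_store::buffered::BufWriter;
    /// # use object_store::memory::InMemory;
    /// # use object_store::path::Path;
    /// # use parquet::arrow::async_writer::ParquetObjectWriter;
    /// let store = Arc::new(InMemory::new());
    /// // Configure the underlying BufWriter (here: a 16 MiB in-memory buffer) and wrap it.
    /// let buf_writer = BufWriter::with_capacity(store, Path::from("data.parquet"), 16 * 1024 * 1024);
    /// let writer = ParquetObjectWriter::from_buf_writer(buf_writer);
    /// ```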
    pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
        Self::from_buf_writer(BufWriter::new(store, path))
    }

    /// Construct a new [`ParquetObjectWriter`] from an existing [`BufWriter`].
    pub fn from_buf_writer(w: BufWriter) -> Self {
        Self { w }
    }

    /// Consume the writer and return the underlying [`BufWriter`].
    pub fn into_inner(self) -> BufWriter {
        self.w
    }
}

impl AsyncFileWriter for ParquetObjectWriter {
    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
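        // Delegate to `BufWriter::put`, which buffers the bytes and writes them to
        // the object store on our behalf.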
        Box::pin(async {
            self.w
                .put(bs)
                .await
                .map_err(|err| ParquetError::External(Box::new(err)))
        })
    }

    fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
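        // `shutdown` flushes any remaining buffered bytes and finalizes the write;
        // no further writes are expected after this point.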
        Box::pin(async {
            self.w
                .shutdown()
                .await
                .map_err(|err| ParquetError::External(Box::new(err)))
        })
    }
}

impl From<BufWriter> for ParquetObjectWriter {
    fn from(w: BufWriter) -> Self {
        Self::from_buf_writer(w)
    }
}

#[cfg(test)]
mod tests {
    use arrow_array::{ArrayRef, Int64Array, RecordBatch};
    use object_store::memory::InMemory;
    use std::sync::Arc;

    use super::*;
    use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use crate::arrow::AsyncArrowWriter;

    #[tokio::test]
    async fn test_async_writer() {
        let store = Arc::new(InMemory::new());

        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();

        let object_store_writer = ParquetObjectWriter::new(store.clone(), Path::from("test"));
        let mut writer =
            AsyncArrowWriter::try_new(object_store_writer, to_write.schema(), None).unwrap();
        writer.write(&to_write).await.unwrap();
        writer.close().await.unwrap();

        let buffer = store
            .get(&Path::from("test"))
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
            .unwrap()
            .build()
            .unwrap();
        let read = reader.next().unwrap().unwrap();

        assert_eq!(to_write, read);
    }
}