// parquet/arrow/async_writer/store.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use bytes::Bytes;
19use futures::future::BoxFuture;
20use std::sync::Arc;
21
22use crate::arrow::async_writer::AsyncFileWriter;
23use crate::errors::{ParquetError, Result};
24use object_store::ObjectStore;
25use object_store::buffered::BufWriter;
26use object_store::path::Path;
27use tokio::io::AsyncWriteExt;
28
/// [`ParquetObjectWriter`] for writing parquet files to an [`ObjectStore`]
30///
31/// ```
32/// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
33/// # use object_store::memory::InMemory;
34/// # use object_store::path::Path;
35/// # use object_store::{ObjectStore, ObjectStoreExt};
36/// # use std::sync::Arc;
37///
38/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
39/// # use parquet::arrow::async_writer::ParquetObjectWriter;
40/// # use parquet::arrow::AsyncArrowWriter;
41///
42/// # #[tokio::main(flavor="current_thread")]
43/// # async fn main() {
44/// let store = Arc::new(InMemory::new());
45///
46/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
47/// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
48///
49/// let object_store_writer = ParquetObjectWriter::new(store.clone(), Path::from("test"));
50/// let mut writer =
51/// AsyncArrowWriter::try_new(object_store_writer, to_write.schema(), None).unwrap();
52/// writer.write(&to_write).await.unwrap();
53/// writer.close().await.unwrap();
54///
55/// let buffer = store
56/// .get(&Path::from("test"))
57/// .await
58/// .unwrap()
59/// .bytes()
60/// .await
61/// .unwrap();
62/// let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
63/// .unwrap()
64/// .build()
65/// .unwrap();
66/// let read = reader.next().unwrap().unwrap();
67///
68/// assert_eq!(to_write, read);
69/// # }
70/// ```
#[derive(Debug)]
pub struct ParquetObjectWriter {
    // Buffered object-store writer that receives the encoded parquet bytes;
    // data is uploaded via `put` and finalized on `shutdown`.
    w: BufWriter,
}
75
76impl ParquetObjectWriter {
77 /// Create a new [`ParquetObjectWriter`] that writes to the specified path in the given store.
78 ///
79 /// To configure the writer behavior, please build [`BufWriter`] and then use [`Self::from_buf_writer`]
80 pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
81 Self::from_buf_writer(BufWriter::new(store, path))
82 }
83
84 /// Construct a new ParquetObjectWriter via a existing BufWriter.
85 pub fn from_buf_writer(w: BufWriter) -> Self {
86 Self { w }
87 }
88
89 /// Consume the writer and return the underlying BufWriter.
90 pub fn into_inner(self) -> BufWriter {
91 self.w
92 }
93}
94
95impl AsyncFileWriter for ParquetObjectWriter {
96 fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
97 Box::pin(async {
98 self.w
99 .put(bs)
100 .await
101 .map_err(|err| ParquetError::External(Box::new(err)))
102 })
103 }
104
105 fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
106 Box::pin(async {
107 self.w
108 .shutdown()
109 .await
110 .map_err(|err| ParquetError::External(Box::new(err)))
111 })
112 }
113}
114impl From<BufWriter> for ParquetObjectWriter {
115 fn from(w: BufWriter) -> Self {
116 Self::from_buf_writer(w)
117 }
118}
#[cfg(test)]
mod tests {
    use arrow_array::{ArrayRef, Int64Array, RecordBatch};
    use object_store::memory::InMemory;
    use std::sync::Arc;

    use super::*;
    use crate::arrow::AsyncArrowWriter;
    use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use object_store::ObjectStoreExt;

    /// Round-trip a record batch through [`ParquetObjectWriter`] and an
    /// in-memory object store, verifying the data survives unchanged.
    #[tokio::test]
    async fn test_async_writer() {
        // In-memory store so the round-trip needs no filesystem.
        let store = Arc::new(InMemory::new());
        let location = Path::from("test");

        // Single int64 column batch to write out.
        let values = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let expected = RecordBatch::try_from_iter([("col", values)]).unwrap();

        // Encode the batch to parquet through the async writer.
        let sink = ParquetObjectWriter::new(store.clone(), location.clone());
        let mut writer = AsyncArrowWriter::try_new(sink, expected.schema(), None).unwrap();
        writer.write(&expected).await.unwrap();
        writer.close().await.unwrap();

        // Fetch the stored bytes and decode them back into a batch.
        let buffer = store.get(&location).await.unwrap().bytes().await.unwrap();
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
            .unwrap()
            .build()
            .unwrap();
        let actual = reader.next().unwrap().unwrap();

        assert_eq!(expected, actual);
    }
}
158}