parquet/arrow/async_writer/store.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use bytes::Bytes;
19use futures::future::BoxFuture;
20use std::sync::Arc;
21
22use crate::arrow::async_writer::AsyncFileWriter;
23use crate::errors::{ParquetError, Result};
24use object_store::buffered::BufWriter;
25use object_store::path::Path;
26use object_store::ObjectStore;
27use tokio::io::AsyncWriteExt;
28
29/// [`ParquetObjectWriter`] for writing to parquet to [`ObjectStore`]
30///
31/// ```
32/// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
33/// # use object_store::memory::InMemory;
34/// # use object_store::path::Path;
35/// # use object_store::ObjectStore;
36/// # use std::sync::Arc;
37///
38/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
39/// # use parquet::arrow::async_writer::ParquetObjectWriter;
40/// # use parquet::arrow::AsyncArrowWriter;
41///
42/// # #[tokio::main(flavor="current_thread")]
43/// # async fn main() {
44/// let store = Arc::new(InMemory::new());
45///
46/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
47/// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
48///
49/// let object_store_writer = ParquetObjectWriter::new(store.clone(), Path::from("test"));
50/// let mut writer =
51/// AsyncArrowWriter::try_new(object_store_writer, to_write.schema(), None).unwrap();
52/// writer.write(&to_write).await.unwrap();
53/// writer.close().await.unwrap();
54///
55/// let buffer = store
56/// .get(&Path::from("test"))
57/// .await
58/// .unwrap()
59/// .bytes()
60/// .await
61/// .unwrap();
62/// let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
63/// .unwrap()
64/// .build()
65/// .unwrap();
66/// let read = reader.next().unwrap().unwrap();
67///
68/// assert_eq!(to_write, read);
69/// # }
70/// ```
#[derive(Debug)]
pub struct ParquetObjectWriter {
    // Buffered writer that accumulates bytes and streams them to the target
    // object in the backing `ObjectStore`.
    w: BufWriter,
}
75
76impl ParquetObjectWriter {
77 /// Create a new [`ParquetObjectWriter`] that writes to the specified path in the given store.
78 ///
79 /// To configure the writer behavior, please build [`BufWriter`] and then use [`Self::from_buf_writer`]
80 pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
81 Self::from_buf_writer(BufWriter::new(store, path))
82 }
83
84 /// Construct a new ParquetObjectWriter via a existing BufWriter.
85 pub fn from_buf_writer(w: BufWriter) -> Self {
86 Self { w }
87 }
88
89 /// Consume the writer and return the underlying BufWriter.
90 pub fn into_inner(self) -> BufWriter {
91 self.w
92 }
93}
94
95impl AsyncFileWriter for ParquetObjectWriter {
96 fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
97 Box::pin(async {
98 self.w
99 .put(bs)
100 .await
101 .map_err(|err| ParquetError::External(Box::new(err)))
102 })
103 }
104
105 fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
106 Box::pin(async {
107 self.w
108 .shutdown()
109 .await
110 .map_err(|err| ParquetError::External(Box::new(err)))
111 })
112 }
113}
114impl From<BufWriter> for ParquetObjectWriter {
115 fn from(w: BufWriter) -> Self {
116 Self::from_buf_writer(w)
117 }
118}
#[cfg(test)]
mod tests {
    use arrow_array::{ArrayRef, Int64Array, RecordBatch};
    use object_store::memory::InMemory;
    use std::sync::Arc;

    use super::*;
    use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use crate::arrow::AsyncArrowWriter;

    /// Round-trips a single record batch through an in-memory object store.
    #[tokio::test]
    async fn test_async_writer() {
        let store = Arc::new(InMemory::new());
        let path = Path::from("test");

        let values = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let expected = RecordBatch::try_from_iter([("col", values)]).unwrap();

        // Write the batch as parquet through the async object-store writer.
        let sink = ParquetObjectWriter::new(store.clone(), path.clone());
        let mut writer = AsyncArrowWriter::try_new(sink, expected.schema(), None).unwrap();
        writer.write(&expected).await.unwrap();
        writer.close().await.unwrap();

        // Fetch the stored bytes and decode them with the synchronous reader.
        let bytes = store.get(&path).await.unwrap().bytes().await.unwrap();
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(bytes)
            .unwrap()
            .build()
            .unwrap();
        let actual = reader.next().unwrap().unwrap();

        assert_eq!(expected, actual);
    }
}
157}