parquet/arrow/async_reader/
store.rs1use std::{ops::Range, sync::Arc};
19
20use bytes::Bytes;
21use futures::{future::BoxFuture, FutureExt, TryFutureExt};
22use object_store::{path::Path, ObjectMeta, ObjectStore};
23use tokio::runtime::Handle;
24
25use crate::arrow::async_reader::AsyncFileReader;
26use crate::errors::{ParquetError, Result};
27use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
28
29#[derive(Clone, Debug)]
54pub struct ParquetObjectReader {
55 store: Arc<dyn ObjectStore>,
56 meta: ObjectMeta,
57 metadata_size_hint: Option<usize>,
58 preload_column_index: bool,
59 preload_offset_index: bool,
60 runtime: Option<Handle>,
61}
62
63impl ParquetObjectReader {
64 pub fn new(store: Arc<dyn ObjectStore>, meta: ObjectMeta) -> Self {
68 Self {
69 store,
70 meta,
71 metadata_size_hint: None,
72 preload_column_index: false,
73 preload_offset_index: false,
74 runtime: None,
75 }
76 }
77
78 pub fn with_footer_size_hint(self, hint: usize) -> Self {
81 Self {
82 metadata_size_hint: Some(hint),
83 ..self
84 }
85 }
86
87 pub fn with_preload_column_index(self, preload_column_index: bool) -> Self {
89 Self {
90 preload_column_index,
91 ..self
92 }
93 }
94
95 pub fn with_preload_offset_index(self, preload_offset_index: bool) -> Self {
97 Self {
98 preload_offset_index,
99 ..self
100 }
101 }
102
103 pub fn with_runtime(self, handle: Handle) -> Self {
112 Self {
113 runtime: Some(handle),
114 ..self
115 }
116 }
117
118 fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O>>
119 where
120 F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, Result<O, E>>
121 + Send
122 + 'static,
123 O: Send + 'static,
124 E: Into<ParquetError> + Send + 'static,
125 {
126 match &self.runtime {
127 Some(handle) => {
128 let path = self.meta.location.clone();
129 let store = Arc::clone(&self.store);
130 handle
131 .spawn(async move { f(&store, &path).await })
132 .map_ok_or_else(
133 |e| match e.try_into_panic() {
134 Err(e) => Err(ParquetError::External(Box::new(e))),
135 Ok(p) => std::panic::resume_unwind(p),
136 },
137 |res| res.map_err(|e| e.into()),
138 )
139 .boxed()
140 }
141 None => f(&self.store, &self.meta.location)
142 .map_err(|e| e.into())
143 .boxed(),
144 }
145 }
146}
147
148impl AsyncFileReader for ParquetObjectReader {
149 fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
150 self.spawn(|store, path| store.get_range(path, range))
151 }
152
153 fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
154 where
155 Self: Send,
156 {
157 self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed())
158 }
159
160 fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
167 Box::pin(async move {
168 let file_size = self.meta.size;
169 let metadata = ParquetMetaDataReader::new()
170 .with_column_indexes(self.preload_column_index)
171 .with_offset_indexes(self.preload_offset_index)
172 .with_prefetch_hint(self.metadata_size_hint)
173 .load_and_finish(self, file_size)
174 .await?;
175 Ok(Arc::new(metadata))
176 })
177 }
178}
179
#[cfg(test)]
mod tests {
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    use arrow::util::test_util::parquet_test_data;
    use futures::{FutureExt, TryStreamExt};
    use object_store::local::LocalFileSystem;
    use object_store::path::Path;
    use object_store::{ObjectMeta, ObjectStore};

    use crate::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
    use crate::arrow::ParquetRecordBatchStreamBuilder;
    use crate::errors::ParquetError;

    /// Returns the [`ObjectMeta`] of `alltypes_plain.parquet` together with a local
    /// filesystem store rooted at the parquet test-data directory.
    async fn get_meta_store() -> (ObjectMeta, Arc<dyn ObjectStore>) {
        let res = parquet_test_data();
        let store = LocalFileSystem::new_with_prefix(res).unwrap();

        let meta = store
            .head(&Path::from("alltypes_plain.parquet"))
            .await
            .unwrap();

        (meta, Arc::new(store) as Arc<dyn ObjectStore>)
    }

    #[tokio::test]
    async fn test_simple() {
        let (meta, store) = get_meta_store().await;
        let object_reader = ParquetObjectReader::new(store, meta);

        let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
            .await
            .unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);
    }

    #[tokio::test]
    async fn test_not_found() {
        let (mut meta, store) = get_meta_store().await;
        meta.location = Path::from("I don't exist.parquet");

        let object_reader = ParquetObjectReader::new(store, meta);
        // Building the stream reads the footer, which must fail for a missing object.
        match ParquetRecordBatchStreamBuilder::new(object_reader).await {
            Ok(_) => panic!("expected failure"),
            Err(e) => {
                let err = e.to_string();
                assert!(err.contains("not found"), "{err}");
                // The text of the underlying OS error is platform specific (Windows
                // words ENOENT differently), so the exact message is only checked on Unix.
                #[cfg(unix)]
                assert!(
                    err.contains("not found: No such file or directory (os error 2)"),
                    "{err}",
                );
            }
        }
    }

    #[tokio::test]
    async fn test_runtime_is_used() {
        let num_actions = Arc::new(AtomicUsize::new(0));

        let (a1, a2) = (num_actions.clone(), num_actions.clone());
        let rt = tokio::runtime::Builder::new_multi_thread()
            .on_thread_park(move || {
                a1.fetch_add(1, Ordering::Relaxed);
            })
            .on_thread_unpark(move || {
                a2.fetch_add(1, Ordering::Relaxed);
            })
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let initial_actions = num_actions.load(Ordering::Relaxed);

        let reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone());

        let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        // Same expectations as `test_simple`.
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);

        // The counter only ever grows, so activity on the dedicated runtime shows up as
        // a strictly larger value (compared directly to avoid a `usize` subtraction).
        assert!(num_actions.load(Ordering::Relaxed) > initial_actions);

        // A runtime may not be dropped from within an async context, so drop it on a
        // blocking thread instead.
        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
    }

    /// `ParquetObjectReader::spawn` must run the closure on the configured runtime,
    /// i.e. on a different thread from the test's own.
    #[tokio::test]
    async fn test_runtime_thread_id_different() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1)
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone());

        let current_id = std::thread::current().id();

        let other_id = reader
            .spawn(|_, _| async move { Ok::<_, ParquetError>(std::thread::current().id()) }.boxed())
            .await
            .unwrap();

        assert_ne!(current_id, other_id);

        // See `test_runtime_is_used`: runtimes must be dropped in a blocking context.
        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
    }

    /// I/O routed to a runtime that has already shut down must fail with a
    /// cancellation error instead of hanging.
    #[tokio::test]
    async fn io_fails_on_shutdown_runtime() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1)
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let mut reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone());

        rt.shutdown_background();

        let err = reader.get_bytes(0..1).await.unwrap_err().to_string();

        assert!(err.contains("was cancelled"), "{err}");
    }
}