// parquet/arrow/async_reader/store.rs

use std::{ops::Range, sync::Arc};

use crate::arrow::arrow_reader::ArrowReaderOptions;
use crate::arrow::async_reader::AsyncFileReader;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt, TryFutureExt};
use object_store::{path::Path, ObjectMeta, ObjectStore};
use tokio::runtime::Handle;
/// Reads Parquet files from an [`ObjectStore`], implementing [`AsyncFileReader`]
/// on top of byte-range requests against a single object.
#[derive(Clone, Debug)]
pub struct ParquetObjectReader {
    /// Store used to issue `get_range`/`get_ranges` requests.
    store: Arc<dyn ObjectStore>,
    /// Location and size of the target object; `meta.size` is passed to the
    /// metadata reader as the file length when decoding the footer.
    meta: ObjectMeta,
    /// Optional prefetch hint forwarded to [`ParquetMetaDataReader`] when
    /// loading the footer metadata.
    metadata_size_hint: Option<usize>,
    /// Whether metadata loading should also fetch the column index.
    preload_column_index: bool,
    /// Whether metadata loading should also fetch the offset index.
    preload_offset_index: bool,
    /// If set, I/O futures are spawned onto this runtime rather than being
    /// polled directly on the caller's executor (see `spawn`).
    runtime: Option<Handle>,
}
62
63impl ParquetObjectReader {
64 pub fn new(store: Arc<dyn ObjectStore>, meta: ObjectMeta) -> Self {
68 Self {
69 store,
70 meta,
71 metadata_size_hint: None,
72 preload_column_index: false,
73 preload_offset_index: false,
74 runtime: None,
75 }
76 }
77
78 pub fn with_footer_size_hint(self, hint: usize) -> Self {
81 Self {
82 metadata_size_hint: Some(hint),
83 ..self
84 }
85 }
86
87 pub fn with_preload_column_index(self, preload_column_index: bool) -> Self {
89 Self {
90 preload_column_index,
91 ..self
92 }
93 }
94
95 pub fn with_preload_offset_index(self, preload_offset_index: bool) -> Self {
97 Self {
98 preload_offset_index,
99 ..self
100 }
101 }
102
103 pub fn with_runtime(self, handle: Handle) -> Self {
112 Self {
113 runtime: Some(handle),
114 ..self
115 }
116 }
117
118 fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O>>
119 where
120 F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, Result<O, E>>
121 + Send
122 + 'static,
123 O: Send + 'static,
124 E: Into<ParquetError> + Send + 'static,
125 {
126 match &self.runtime {
127 Some(handle) => {
128 let path = self.meta.location.clone();
129 let store = Arc::clone(&self.store);
130 handle
131 .spawn(async move { f(&store, &path).await })
132 .map_ok_or_else(
133 |e| match e.try_into_panic() {
134 Err(e) => Err(ParquetError::External(Box::new(e))),
135 Ok(p) => std::panic::resume_unwind(p),
136 },
137 |res| res.map_err(|e| e.into()),
138 )
139 .boxed()
140 }
141 None => f(&self.store, &self.meta.location)
142 .map_err(|e| e.into())
143 .boxed(),
144 }
145 }
146}
147
148impl AsyncFileReader for ParquetObjectReader {
149 fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
150 self.spawn(|store, path| store.get_range(path, range))
151 }
152
153 fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
154 where
155 Self: Send,
156 {
157 self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed())
158 }
159
160 fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
167 Box::pin(async move {
168 let file_size = self.meta.size;
169 let metadata = ParquetMetaDataReader::new()
170 .with_column_indexes(self.preload_column_index)
171 .with_offset_indexes(self.preload_offset_index)
172 .with_prefetch_hint(self.metadata_size_hint)
173 .load_and_finish(self, file_size)
174 .await?;
175 Ok(Arc::new(metadata))
176 })
177 }
178
179 fn get_metadata_with_options<'a>(
180 &'a mut self,
181 options: &'a ArrowReaderOptions,
182 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
183 Box::pin(async move {
184 let file_size = self.meta.size;
185 let metadata = ParquetMetaDataReader::new()
186 .with_column_indexes(self.preload_column_index)
187 .with_offset_indexes(self.preload_offset_index)
188 .with_prefetch_hint(self.metadata_size_hint);
189
190 #[cfg(feature = "encryption")]
191 let metadata =
192 metadata.with_decryption_properties(options.file_decryption_properties.as_ref());
193
194 let metadata = metadata.load_and_finish(self, file_size).await?;
195
196 Ok(Arc::new(metadata))
197 })
198 }
199}
200
#[cfg(test)]
mod tests {
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    use futures::TryStreamExt;

    use crate::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
    use crate::arrow::ParquetRecordBatchStreamBuilder;
    use crate::errors::ParquetError;
    use arrow::util::test_util::parquet_test_data;
    use futures::FutureExt;
    use object_store::local::LocalFileSystem;
    use object_store::path::Path;
    use object_store::{ObjectMeta, ObjectStore};

    /// Builds a `LocalFileSystem` store rooted at the parquet test data
    /// directory and returns the `ObjectMeta` for `alltypes_plain.parquet`.
    async fn get_meta_store() -> (ObjectMeta, Arc<dyn ObjectStore>) {
        let res = parquet_test_data();
        let store = LocalFileSystem::new_with_prefix(res).unwrap();

        let meta = store
            .head(&Path::from("alltypes_plain.parquet"))
            .await
            .unwrap();

        (meta, Arc::new(store) as Arc<dyn ObjectStore>)
    }

    /// End-to-end read of a known file through the object store reader.
    #[tokio::test]
    async fn test_simple() {
        let (meta, store) = get_meta_store().await;
        let object_reader = ParquetObjectReader::new(store, meta);

        let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
            .await
            .unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);
    }

    /// A missing object surfaces the underlying OS "not found" error.
    #[tokio::test]
    async fn test_not_found() {
        let (mut meta, store) = get_meta_store().await;
        meta.location = Path::from("I don't exist.parquet");

        let object_reader = ParquetObjectReader::new(store, meta);
        match ParquetRecordBatchStreamBuilder::new(object_reader).await {
            Ok(_) => panic!("expected failure"),
            Err(e) => {
                let err = e.to_string();
                assert!(
                    err.contains("not found: No such file or directory (os error 2)"),
                    "{err}",
                );
            }
        }
    }

    /// Verifies that `with_runtime` actually moves I/O onto the provided
    /// runtime by counting thread park/unpark events on its workers.
    #[tokio::test]
    async fn test_runtime_is_used() {
        let num_actions = Arc::new(AtomicUsize::new(0));

        let (a1, a2) = (num_actions.clone(), num_actions.clone());
        let rt = tokio::runtime::Builder::new_multi_thread()
            .on_thread_park(move || {
                a1.fetch_add(1, Ordering::Relaxed);
            })
            .on_thread_unpark(move || {
                a2.fetch_add(1, Ordering::Relaxed);
            })
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let initial_actions = num_actions.load(Ordering::Relaxed);

        let reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone());

        let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);

        // The dedicated runtime's workers must have parked/unparked at least
        // once, proving the I/O actually ran there.
        assert!(num_actions.load(Ordering::Relaxed) - initial_actions > 0);

        // Dropping a runtime inside an async context panics; hand it to a
        // blocking thread instead.
        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
    }

    /// With a dedicated single-worker runtime, `spawn`ed work must execute on
    /// a thread other than the caller's.
    #[tokio::test]
    async fn test_runtime_thread_id_different() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1)
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone());

        let current_id = std::thread::current().id();

        let other_id = reader
            .spawn(|_, _| async move { Ok::<_, ParquetError>(std::thread::current().id()) }.boxed())
            .await
            .unwrap();

        assert_ne!(current_id, other_id);

        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
    }

    /// Shutting down the dedicated runtime cancels in-flight tasks; the
    /// cancellation must surface as an error, not a panic.
    #[tokio::test]
    async fn io_fails_on_shutdown_runtime() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1)
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let mut reader = ParquetObjectReader::new(store, meta).with_runtime(rt.handle().clone());

        rt.shutdown_background();

        let err = reader.get_bytes(0..1).await.unwrap_err().to_string();

        // `err` is already a `String`; the original called `.to_string()` on
        // it a second time redundantly.
        assert!(err.contains("was cancelled"), "{err}");
    }
}