// parquet/arrow/async_reader/store.rs

use std::{ops::Range, sync::Arc};
19
20use crate::arrow::arrow_reader::ArrowReaderOptions;
21use crate::arrow::async_reader::{AsyncFileReader, MetadataSuffixFetch};
22use crate::errors::{ParquetError, Result};
23use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
24use bytes::Bytes;
25use futures::{FutureExt, TryFutureExt, future::BoxFuture};
26use object_store::{GetOptions, GetRange};
27use object_store::{ObjectStore, path::Path};
28use tokio::runtime::Handle;
29
/// An [`AsyncFileReader`] that reads a Parquet file from an [`ObjectStore`],
/// bridging `object_store` I/O into the async Parquet reader machinery.
#[derive(Clone, Debug)]
pub struct ParquetObjectReader {
    // The object store used for all byte-range and metadata fetches.
    store: Arc<dyn ObjectStore>,
    // Location of the Parquet file within `store`.
    path: Path,
    // Known file size in bytes. When `None`, metadata is loaded via suffix
    // requests (`load_via_suffix_and_finish`) instead of absolute ranges.
    file_size: Option<u64>,
    // Prefetch hint (in bytes) passed to the metadata reader for the footer read.
    metadata_size_hint: Option<usize>,
    // Whether to eagerly load the column index while reading metadata.
    preload_column_index: bool,
    // Whether to eagerly load the offset index while reading metadata.
    preload_offset_index: bool,
    // Optional tokio runtime handle; when set, all store I/O is spawned onto
    // this runtime instead of running on the caller's task.
    runtime: Option<Handle>,
}
64
65impl ParquetObjectReader {
66 pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
68 Self {
69 store,
70 path,
71 file_size: None,
72 metadata_size_hint: None,
73 preload_column_index: false,
74 preload_offset_index: false,
75 runtime: None,
76 }
77 }
78
79 pub fn with_footer_size_hint(self, hint: usize) -> Self {
82 Self {
83 metadata_size_hint: Some(hint),
84 ..self
85 }
86 }
87
88 pub fn with_file_size(self, file_size: u64) -> Self {
98 Self {
99 file_size: Some(file_size),
100 ..self
101 }
102 }
103
104 pub fn with_preload_column_index(self, preload_column_index: bool) -> Self {
106 Self {
107 preload_column_index,
108 ..self
109 }
110 }
111
112 pub fn with_preload_offset_index(self, preload_offset_index: bool) -> Self {
114 Self {
115 preload_offset_index,
116 ..self
117 }
118 }
119
120 pub fn with_runtime(self, handle: Handle) -> Self {
129 Self {
130 runtime: Some(handle),
131 ..self
132 }
133 }
134
135 fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O>>
136 where
137 F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, Result<O, E>>
138 + Send
139 + 'static,
140 O: Send + 'static,
141 E: Into<ParquetError> + Send + 'static,
142 {
143 match &self.runtime {
144 Some(handle) => {
145 let path = self.path.clone();
146 let store = Arc::clone(&self.store);
147 handle
148 .spawn(async move { f(&store, &path).await })
149 .map_ok_or_else(
150 |e| match e.try_into_panic() {
151 Err(e) => Err(ParquetError::External(Box::new(e))),
152 Ok(p) => std::panic::resume_unwind(p),
153 },
154 |res| res.map_err(|e| e.into()),
155 )
156 .boxed()
157 }
158 None => f(&self.store, &self.path).map_err(|e| e.into()).boxed(),
159 }
160 }
161}
162
163impl MetadataSuffixFetch for &mut ParquetObjectReader {
164 fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
165 let options = GetOptions {
166 range: Some(GetRange::Suffix(suffix as u64)),
167 ..Default::default()
168 };
169 self.spawn(|store, path| {
170 async move {
171 let resp = store.get_opts(path, options).await?;
172 Ok::<_, ParquetError>(resp.bytes().await?)
173 }
174 .boxed()
175 })
176 }
177}
178
179impl AsyncFileReader for ParquetObjectReader {
180 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
181 self.spawn(|store, path| store.get_range(path, range))
182 }
183
184 fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
185 where
186 Self: Send,
187 {
188 self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed())
189 }
190
191 fn get_metadata<'a>(
198 &'a mut self,
199 options: Option<&'a ArrowReaderOptions>,
200 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
201 Box::pin(async move {
202 let mut metadata = ParquetMetaDataReader::new()
203 .with_column_index_policy(PageIndexPolicy::from(self.preload_column_index))
204 .with_offset_index_policy(PageIndexPolicy::from(self.preload_offset_index))
205 .with_prefetch_hint(self.metadata_size_hint);
206
207 #[cfg(feature = "encryption")]
208 if let Some(options) = options {
209 metadata = metadata.with_decryption_properties(
210 options.file_decryption_properties.as_ref().map(Arc::clone),
211 );
212 }
213
214 let metadata = if let Some(file_size) = self.file_size {
215 metadata.load_and_finish(self, file_size).await?
216 } else {
217 metadata.load_via_suffix_and_finish(self).await?
218 };
219
220 Ok(Arc::new(metadata))
221 })
222 }
223}
224
#[cfg(test)]
mod tests {
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };

    use futures::TryStreamExt;

    use crate::arrow::ParquetRecordBatchStreamBuilder;
    use crate::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
    use crate::errors::ParquetError;
    use arrow::util::test_util::parquet_test_data;
    use futures::FutureExt;
    use object_store::local::LocalFileSystem;
    use object_store::path::Path;
    use object_store::{ObjectMeta, ObjectStore};

    /// Returns the [`ObjectMeta`] and a local-filesystem store for
    /// `alltypes_plain.parquet` in the parquet test data directory.
    async fn get_meta_store() -> (ObjectMeta, Arc<dyn ObjectStore>) {
        let res = parquet_test_data();
        let store = LocalFileSystem::new_with_prefix(res).unwrap();

        let meta = store
            .head(&Path::from("alltypes_plain.parquet"))
            .await
            .unwrap();

        (meta, Arc::new(store) as Arc<dyn ObjectStore>)
    }

    /// Reads the whole file with a known file size (absolute-range metadata load).
    #[tokio::test]
    async fn test_simple() {
        let (meta, store) = get_meta_store().await;
        let object_reader =
            ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);

        let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
            .await
            .unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);
    }

    /// Reads the whole file without a known size, exercising the
    /// suffix-request metadata path.
    #[tokio::test]
    async fn test_simple_without_file_length() {
        let (meta, store) = get_meta_store().await;
        let object_reader = ParquetObjectReader::new(store, meta.location);

        let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
            .await
            .unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);
    }

    /// A missing object should surface as a "not found" error, not a panic.
    #[tokio::test]
    async fn test_not_found() {
        let (mut meta, store) = get_meta_store().await;
        meta.location = Path::from("I don't exist.parquet");

        let object_reader =
            ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
        match ParquetRecordBatchStreamBuilder::new(object_reader).await {
            Ok(_) => panic!("expected failure"),
            Err(e) => {
                let err = e.to_string();
                assert!(err.contains("I don't exist.parquet not found:"), "{err}",);
            }
        }
    }

    /// Verifies that `with_runtime` actually routes I/O through the provided
    /// runtime, by counting thread park/unpark events on it.
    #[tokio::test]
    async fn test_runtime_is_used() {
        let num_actions = Arc::new(AtomicUsize::new(0));

        let (a1, a2) = (num_actions.clone(), num_actions.clone());
        let rt = tokio::runtime::Builder::new_multi_thread()
            .on_thread_park(move || {
                a1.fetch_add(1, Ordering::Relaxed);
            })
            .on_thread_unpark(move || {
                a2.fetch_add(1, Ordering::Relaxed);
            })
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let initial_actions = num_actions.load(Ordering::Relaxed);

        let reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_runtime(rt.handle().clone());

        let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);

        // At least one park/unpark must have happened on the I/O runtime.
        assert!(num_actions.load(Ordering::Relaxed) - initial_actions > 0);

        // Dropping a runtime inside an async context panics; move it to a
        // blocking thread.
        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
    }

    /// Work spawned on the dedicated runtime must run on its worker thread,
    /// not on the caller's thread.
    #[tokio::test]
    async fn test_runtime_thread_id_different() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1)
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_runtime(rt.handle().clone());

        let current_id = std::thread::current().id();

        let other_id = reader
            .spawn(|_, _| async move { Ok::<_, ParquetError>(std::thread::current().id()) }.boxed())
            .await
            .unwrap();

        assert_ne!(current_id, other_id);

        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
    }

    /// I/O against a shut-down dedicated runtime should fail with a
    /// cancellation error rather than hang or panic.
    #[tokio::test]
    async fn io_fails_on_shutdown_runtime() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1)
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let mut reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_runtime(rt.handle().clone());

        rt.shutdown_background();

        let err = reader.get_bytes(0..1).await.unwrap_err().to_string();

        // `err` is already a String; no second `.to_string()` needed.
        assert!(err.contains("was cancelled"));
    }
}