parquet/arrow/async_reader/
store.rs1use std::{ops::Range, sync::Arc};
19
20use crate::arrow::arrow_reader::ArrowReaderOptions;
21use crate::arrow::async_reader::{AsyncFileReader, MetadataSuffixFetch};
22use crate::errors::{ParquetError, Result};
23use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
24use bytes::Bytes;
25use futures::{FutureExt, TryFutureExt, future::BoxFuture};
26use object_store::{GetOptions, GetRange};
27use object_store::{ObjectStore, path::Path};
28use tokio::runtime::Handle;
29
30#[derive(Clone, Debug)]
55pub struct ParquetObjectReader {
56 store: Arc<dyn ObjectStore>,
57 path: Path,
58 file_size: Option<u64>,
59 metadata_size_hint: Option<usize>,
60 preload_column_index: bool,
61 preload_offset_index: bool,
62 runtime: Option<Handle>,
63}
64
65impl ParquetObjectReader {
66 pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
68 Self {
69 store,
70 path,
71 file_size: None,
72 metadata_size_hint: None,
73 preload_column_index: false,
74 preload_offset_index: false,
75 runtime: None,
76 }
77 }
78
79 pub fn with_footer_size_hint(self, hint: usize) -> Self {
82 Self {
83 metadata_size_hint: Some(hint),
84 ..self
85 }
86 }
87
88 pub fn with_file_size(self, file_size: u64) -> Self {
98 Self {
99 file_size: Some(file_size),
100 ..self
101 }
102 }
103
104 pub fn with_preload_column_index(self, preload_column_index: bool) -> Self {
110 Self {
111 preload_column_index,
112 ..self
113 }
114 }
115
116 pub fn with_preload_offset_index(self, preload_offset_index: bool) -> Self {
122 Self {
123 preload_offset_index,
124 ..self
125 }
126 }
127
128 pub fn with_runtime(self, handle: Handle) -> Self {
137 Self {
138 runtime: Some(handle),
139 ..self
140 }
141 }
142
143 fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O>>
144 where
145 F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, Result<O, E>>
146 + Send
147 + 'static,
148 O: Send + 'static,
149 E: Into<ParquetError> + Send + 'static,
150 {
151 match &self.runtime {
152 Some(handle) => {
153 let path = self.path.clone();
154 let store = Arc::clone(&self.store);
155 handle
156 .spawn(async move { f(&store, &path).await })
157 .map_ok_or_else(
158 |e| match e.try_into_panic() {
159 Err(e) => Err(ParquetError::External(Box::new(e))),
160 Ok(p) => std::panic::resume_unwind(p),
161 },
162 |res| res.map_err(|e| e.into()),
163 )
164 .boxed()
165 }
166 None => f(&self.store, &self.path).map_err(|e| e.into()).boxed(),
167 }
168 }
169}
170
171impl MetadataSuffixFetch for &mut ParquetObjectReader {
172 fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
173 let options = GetOptions {
174 range: Some(GetRange::Suffix(suffix as u64)),
175 ..Default::default()
176 };
177 self.spawn(|store, path| {
178 async move {
179 let resp = store.get_opts(path, options).await?;
180 Ok::<_, ParquetError>(resp.bytes().await?)
181 }
182 .boxed()
183 })
184 }
185}
186
187impl AsyncFileReader for ParquetObjectReader {
188 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
189 self.spawn(|store, path| store.get_range(path, range))
190 }
191
192 fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
193 where
194 Self: Send,
195 {
196 self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed())
197 }
198
199 fn get_metadata<'a>(
206 &'a mut self,
207 options: Option<&'a ArrowReaderOptions>,
208 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
209 Box::pin(async move {
210 let metadata_opts = options.map(|o| o.metadata_options().clone());
211 let mut metadata = ParquetMetaDataReader::new()
212 .with_metadata_options(metadata_opts)
213 .with_column_index_policy(PageIndexPolicy::from(self.preload_column_index))
214 .with_offset_index_policy(PageIndexPolicy::from(self.preload_offset_index))
215 .with_prefetch_hint(self.metadata_size_hint);
216
217 #[cfg(feature = "encryption")]
218 if let Some(options) = options {
219 metadata = metadata.with_decryption_properties(
220 options.file_decryption_properties.as_ref().map(Arc::clone),
221 );
222 }
223
224 if let Some(options) = options {
229 if options.page_index_policy != PageIndexPolicy::Skip {
230 metadata = metadata.with_page_index_policy(options.page_index_policy);
231 }
232 }
233
234 let metadata = if let Some(file_size) = self.file_size {
235 metadata.load_and_finish(self, file_size).await?
236 } else {
237 metadata.load_via_suffix_and_finish(self).await?
238 };
239
240 Ok(Arc::new(metadata))
241 })
242 }
243}
244
#[cfg(test)]
mod tests {
    use crate::arrow::async_reader::ArrowReaderOptions;
    use crate::file::metadata::PageIndexPolicy;
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };

    use futures::TryStreamExt;

    use crate::arrow::ParquetRecordBatchStreamBuilder;
    use crate::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
    use crate::errors::ParquetError;
    use arrow::util::test_util::parquet_test_data;
    use futures::FutureExt;
    use object_store::local::LocalFileSystem;
    use object_store::path::Path;
    use object_store::{ObjectMeta, ObjectStore};

    /// Opens the Parquet test-data directory as a local object store and returns the
    /// metadata of `file_name` together with that store.
    async fn get_meta_store_for(file_name: &str) -> (ObjectMeta, Arc<dyn ObjectStore>) {
        let res = parquet_test_data();
        let store = LocalFileSystem::new_with_prefix(res).unwrap();

        let meta = store.head(&Path::from(file_name)).await.unwrap();

        (meta, Arc::new(store) as Arc<dyn ObjectStore>)
    }

    /// Store and metadata for `alltypes_plain.parquet`.
    async fn get_meta_store() -> (ObjectMeta, Arc<dyn ObjectStore>) {
        get_meta_store_for("alltypes_plain.parquet").await
    }

    /// Store and metadata for `alltypes_tiny_pages_plain.parquet`, the file the page
    /// index tests below expect to carry page indexes.
    async fn get_meta_store_with_page_index() -> (ObjectMeta, Arc<dyn ObjectStore>) {
        get_meta_store_for("alltypes_tiny_pages_plain.parquet").await
    }

    /// Reading with a known file size yields the single 8-row batch.
    #[tokio::test]
    async fn test_simple() {
        let (meta, store) = get_meta_store().await;
        let object_reader =
            ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);

        let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
            .await
            .unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);
    }

    /// Reading without a file size (suffix-request path) yields the same data.
    #[tokio::test]
    async fn test_simple_without_file_length() {
        let (meta, store) = get_meta_store().await;
        let object_reader = ParquetObjectReader::new(store, meta.location);

        let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
            .await
            .unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);
    }

    /// A missing object surfaces the store's "not found" error.
    #[tokio::test]
    async fn test_not_found() {
        let (mut meta, store) = get_meta_store().await;
        meta.location = Path::from("I don't exist.parquet");

        let object_reader =
            ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
        match ParquetRecordBatchStreamBuilder::new(object_reader).await {
            Ok(_) => panic!("expected failure"),
            Err(e) => {
                let err = e.to_string();
                assert!(err.contains("I don't exist.parquet not found:"), "{err}",);
            }
        }
    }

    /// A reader configured with a runtime performs its I/O on that runtime.
    #[tokio::test]
    async fn test_runtime_is_used() {
        let num_actions = Arc::new(AtomicUsize::new(0));

        // Count every park/unpark of the dedicated runtime's threads; any activity on
        // that runtime bumps the counter.
        let (a1, a2) = (num_actions.clone(), num_actions.clone());
        let rt = tokio::runtime::Builder::new_multi_thread()
            .on_thread_park(move || {
                a1.fetch_add(1, Ordering::Relaxed);
            })
            .on_thread_unpark(move || {
                a2.fetch_add(1, Ordering::Relaxed);
            })
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let initial_actions = num_actions.load(Ordering::Relaxed);

        let reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_runtime(rt.handle().clone());

        let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);

        assert!(num_actions.load(Ordering::Relaxed) - initial_actions > 0);

        // Tokio panics when a runtime is dropped from an async context, so hand the
        // drop to a blocking thread.
        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
    }

    /// Work passed to `spawn` runs on a thread of the configured runtime, not on the
    /// test's own thread.
    #[tokio::test]
    async fn test_runtime_thread_id_different() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1)
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_runtime(rt.handle().clone());

        let current_id = std::thread::current().id();

        let other_id = reader
            .spawn(|_, _| async move { Ok::<_, ParquetError>(std::thread::current().id()) }.boxed())
            .await
            .unwrap();

        assert_ne!(current_id, other_id);

        // See `test_runtime_is_used`: the runtime must be dropped on a blocking thread.
        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
    }

    /// I/O issued after the configured runtime has shut down fails with a cancellation
    /// error instead of hanging or panicking.
    #[tokio::test]
    async fn io_fails_on_shutdown_runtime() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1)
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let mut reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_runtime(rt.handle().clone());

        rt.shutdown_background();

        let err = reader.get_bytes(0..1).await.unwrap_err().to_string();

        assert!(err.contains("was cancelled"), "{err}");
    }

    /// A `Skip` policy in the options does not override preload flags set to `true`.
    #[tokio::test]
    async fn test_page_index_policy_skip_uses_preload_true() {
        let (meta, store) = get_meta_store_with_page_index().await;

        let mut reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_preload_column_index(true)
            .with_preload_offset_index(true);

        let mut options = ArrowReaderOptions::new();
        options.page_index_policy = PageIndexPolicy::Skip;

        let metadata = reader.get_metadata(Some(&options)).await.unwrap();

        assert!(metadata.column_index().is_some());
    }

    /// An `Optional` policy in the options overrides preload flags set to `false`.
    #[tokio::test]
    async fn test_page_index_policy_optional_overrides_preload_false() {
        let (meta, store) = get_meta_store_with_page_index().await;

        let mut reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_preload_column_index(false)
            .with_preload_offset_index(false);

        let mut options = ArrowReaderOptions::new();
        options.page_index_policy = PageIndexPolicy::Optional;

        let metadata = reader.get_metadata(Some(&options)).await.unwrap();

        assert!(metadata.column_index().is_some());
    }

    /// With preloading disabled, `Skip` loads no column index while `Optional` does.
    #[tokio::test]
    async fn test_page_index_policy_optional_vs_skip() {
        let (meta, store) = get_meta_store_with_page_index().await;

        let mut reader1 = ParquetObjectReader::new(store.clone(), meta.location.clone())
            .with_file_size(meta.size)
            .with_preload_column_index(false)
            .with_preload_offset_index(false);

        let mut options1 = ArrowReaderOptions::new();
        options1.page_index_policy = PageIndexPolicy::Skip;
        let metadata1 = reader1.get_metadata(Some(&options1)).await.unwrap();

        let mut reader2 = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_preload_column_index(false)
            .with_preload_offset_index(false);

        let mut options2 = ArrowReaderOptions::new();
        options2.page_index_policy = PageIndexPolicy::Optional;
        let metadata2 = reader2.get_metadata(Some(&options2)).await.unwrap();

        assert!(metadata1.column_index().is_none());
        assert!(metadata2.column_index().is_some());
    }

    /// Without options, the preload flags alone decide which page indexes are loaded.
    #[tokio::test]
    async fn test_page_index_policy_no_options_uses_preload() {
        let (meta, store) = get_meta_store_with_page_index().await;

        let mut reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_preload_column_index(true)
            .with_preload_offset_index(true);

        let metadata = reader.get_metadata(None).await.unwrap();

        // Both preload flags are set, so both indexes must be present. The original
        // assertion checked the column index twice and never looked at the offset index.
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }
}