parquet/arrow/async_reader/
store.rs1use std::{ops::Range, sync::Arc};
19
20use crate::arrow::arrow_reader::ArrowReaderOptions;
21use crate::arrow::async_reader::{AsyncFileReader, MetadataSuffixFetch};
22use crate::errors::{ParquetError, Result};
23use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
24use bytes::Bytes;
25use futures::{FutureExt, TryFutureExt, future::BoxFuture};
26use object_store::{GetOptions, GetRange};
27use object_store::{ObjectStore, path::Path};
28use tokio::runtime::Handle;
29
30#[derive(Clone, Debug)]
55pub struct ParquetObjectReader {
56 store: Arc<dyn ObjectStore>,
57 path: Path,
58 file_size: Option<u64>,
59 metadata_size_hint: Option<usize>,
60 preload_column_index: bool,
61 preload_offset_index: bool,
62 runtime: Option<Handle>,
63}
64
65impl ParquetObjectReader {
66 pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
68 Self {
69 store,
70 path,
71 file_size: None,
72 metadata_size_hint: None,
73 preload_column_index: false,
74 preload_offset_index: false,
75 runtime: None,
76 }
77 }
78
79 pub fn with_footer_size_hint(self, hint: usize) -> Self {
82 Self {
83 metadata_size_hint: Some(hint),
84 ..self
85 }
86 }
87
88 pub fn with_file_size(self, file_size: u64) -> Self {
98 Self {
99 file_size: Some(file_size),
100 ..self
101 }
102 }
103
104 pub fn with_preload_column_index(self, preload_column_index: bool) -> Self {
110 Self {
111 preload_column_index,
112 ..self
113 }
114 }
115
116 pub fn with_preload_offset_index(self, preload_offset_index: bool) -> Self {
122 Self {
123 preload_offset_index,
124 ..self
125 }
126 }
127
128 pub fn with_runtime(self, handle: Handle) -> Self {
137 Self {
138 runtime: Some(handle),
139 ..self
140 }
141 }
142
143 fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O>>
144 where
145 F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, Result<O, E>>
146 + Send
147 + 'static,
148 O: Send + 'static,
149 E: Into<ParquetError> + Send + 'static,
150 {
151 match &self.runtime {
152 Some(handle) => {
153 let path = self.path.clone();
154 let store = Arc::clone(&self.store);
155 handle
156 .spawn(async move { f(&store, &path).await })
157 .map_ok_or_else(
158 |e| match e.try_into_panic() {
159 Err(e) => Err(ParquetError::External(Box::new(e))),
160 Ok(p) => std::panic::resume_unwind(p),
161 },
162 |res| res.map_err(|e| e.into()),
163 )
164 .boxed()
165 }
166 None => f(&self.store, &self.path).map_err(|e| e.into()).boxed(),
167 }
168 }
169}
170
171impl MetadataSuffixFetch for &mut ParquetObjectReader {
172 fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
173 let options = GetOptions {
174 range: Some(GetRange::Suffix(suffix as u64)),
175 ..Default::default()
176 };
177 self.spawn(|store, path| {
178 async move {
179 let resp = store.get_opts(path, options).await?;
180 Ok::<_, ParquetError>(resp.bytes().await?)
181 }
182 .boxed()
183 })
184 }
185}
186
187impl AsyncFileReader for ParquetObjectReader {
188 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
189 self.spawn(|store, path| store.get_range(path, range))
190 }
191
192 fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
193 where
194 Self: Send,
195 {
196 self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed())
197 }
198
199 fn get_metadata<'a>(
206 &'a mut self,
207 options: Option<&'a ArrowReaderOptions>,
208 ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
209 Box::pin(async move {
210 let metadata_opts = options.map(|o| o.metadata_options().clone());
211 let mut metadata = ParquetMetaDataReader::new()
212 .with_metadata_options(metadata_opts)
213 .with_column_index_policy(PageIndexPolicy::from(self.preload_column_index))
214 .with_offset_index_policy(PageIndexPolicy::from(self.preload_offset_index))
215 .with_prefetch_hint(self.metadata_size_hint);
216
217 #[cfg(feature = "encryption")]
218 if let Some(options) = options {
219 metadata = metadata.with_decryption_properties(
220 options.file_decryption_properties.as_ref().map(Arc::clone),
221 );
222 }
223
224 if let Some(options) = options {
229 if options.column_index_policy() != PageIndexPolicy::Skip
230 || options.offset_index_policy() != PageIndexPolicy::Skip
231 {
232 metadata = metadata
233 .with_column_index_policy(options.column_index_policy())
234 .with_offset_index_policy(options.offset_index_policy());
235 }
236 }
237
238 let metadata = if let Some(file_size) = self.file_size {
239 metadata.load_and_finish(self, file_size).await?
240 } else {
241 metadata.load_via_suffix_and_finish(self).await?
242 };
243
244 Ok(Arc::new(metadata))
245 })
246 }
247}
248
#[cfg(test)]
mod tests {
    use crate::arrow::async_reader::ArrowReaderOptions;
    use crate::file::metadata::PageIndexPolicy;
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };

    use futures::TryStreamExt;

    use crate::arrow::ParquetRecordBatchStreamBuilder;
    use crate::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
    use crate::errors::ParquetError;
    use arrow::util::test_util::parquet_test_data;
    use futures::FutureExt;
    use object_store::local::LocalFileSystem;
    use object_store::path::Path;
    use object_store::{ObjectMeta, ObjectStore};

    /// Opens a local store rooted at the parquet test data directory and
    /// returns the metadata of `file_name` together with the store.
    async fn open_test_file(file_name: &str) -> (ObjectMeta, Arc<dyn ObjectStore>) {
        let res = parquet_test_data();
        let store = LocalFileSystem::new_with_prefix(res).unwrap();

        let meta = store.head(&Path::from(file_name)).await.unwrap();

        (meta, Arc::new(store) as Arc<dyn ObjectStore>)
    }

    /// Returns the object metadata and store for `alltypes_plain.parquet`.
    async fn get_meta_store() -> (ObjectMeta, Arc<dyn ObjectStore>) {
        open_test_file("alltypes_plain.parquet").await
    }

    /// Returns the object metadata and store for
    /// `alltypes_tiny_pages_plain.parquet`, which the page index tests read.
    async fn get_meta_store_with_page_index() -> (ObjectMeta, Arc<dyn ObjectStore>) {
        open_test_file("alltypes_tiny_pages_plain.parquet").await
    }

    /// Reads the whole file with a known file size.
    #[tokio::test]
    async fn test_simple() {
        let (meta, store) = get_meta_store().await;
        let object_reader =
            ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);

        let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
            .await
            .unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);
    }

    /// Reads the whole file without telling the reader the file size, which
    /// exercises the suffix-fetch metadata path.
    #[tokio::test]
    async fn test_simple_without_file_length() {
        let (meta, store) = get_meta_store().await;
        let object_reader = ParquetObjectReader::new(store, meta.location);

        let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
            .await
            .unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);
    }

    /// A missing object must surface as an error that names the path.
    #[tokio::test]
    async fn test_not_found() {
        let (mut meta, store) = get_meta_store().await;
        meta.location = Path::from("I don't exist.parquet");

        let object_reader =
            ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
        match ParquetRecordBatchStreamBuilder::new(object_reader).await {
            Ok(_) => panic!("expected failure"),
            Err(e) => {
                let err = e.to_string();
                assert!(err.contains("I don't exist.parquet not found:"), "{err}",);
            }
        }
    }

    /// The runtime passed to `with_runtime` must actually do work: its
    /// park/unpark callbacks are counted and the count must grow.
    #[tokio::test]
    async fn test_runtime_is_used() {
        let num_actions = Arc::new(AtomicUsize::new(0));

        let (a1, a2) = (num_actions.clone(), num_actions.clone());
        let rt = tokio::runtime::Builder::new_multi_thread()
            .on_thread_park(move || {
                a1.fetch_add(1, Ordering::Relaxed);
            })
            .on_thread_unpark(move || {
                a2.fetch_add(1, Ordering::Relaxed);
            })
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let initial_actions = num_actions.load(Ordering::Relaxed);

        let reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_runtime(rt.handle().clone());

        let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);

        assert!(num_actions.load(Ordering::Relaxed) - initial_actions > 0);

        // Drop the runtime on a blocking thread: tokio panics when a runtime
        // is dropped from within an asynchronous context.
        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
    }

    /// Work submitted through `spawn` must run on a thread of the dedicated
    /// runtime, not on the thread driving the test.
    #[tokio::test]
    async fn test_runtime_thread_id_different() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1)
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_runtime(rt.handle().clone());

        let current_id = std::thread::current().id();

        let other_id = reader
            .spawn(|_, _| async move { Ok::<_, ParquetError>(std::thread::current().id()) }.boxed())
            .await
            .unwrap();

        assert_ne!(current_id, other_id);

        // Drop the runtime on a blocking thread: tokio panics when a runtime
        // is dropped from within an asynchronous context.
        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
    }

    /// Once the dedicated runtime has been shut down, I/O must fail with a
    /// cancellation error instead of hanging or panicking.
    #[tokio::test]
    async fn io_fails_on_shutdown_runtime() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1)
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let mut reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_runtime(rt.handle().clone());

        rt.shutdown_background();

        let err = reader.get_bytes(0..1).await.unwrap_err().to_string();

        assert!(err.contains("was cancelled"), "{err}");
    }

    /// `PageIndexPolicy::Skip` in the options must not disable the reader's
    /// own preload flags.
    #[tokio::test]
    async fn test_page_index_policy_skip_uses_preload_true() {
        let (meta, store) = get_meta_store_with_page_index().await;

        let mut reader = ParquetObjectReader::new(store.clone(), meta.location.clone())
            .with_file_size(meta.size)
            .with_preload_column_index(true)
            .with_preload_offset_index(true);

        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Skip);

        let metadata = reader.get_metadata(Some(&options)).await.unwrap();

        assert!(metadata.column_index().is_some());
    }

    /// A non-`Skip` policy in the options must win over preload flags that
    /// are set to `false`.
    #[tokio::test]
    async fn test_page_index_policy_optional_overrides_preload_false() {
        let (meta, store) = get_meta_store_with_page_index().await;

        let mut reader = ParquetObjectReader::new(store.clone(), meta.location.clone())
            .with_file_size(meta.size)
            .with_preload_column_index(false)
            .with_preload_offset_index(false);

        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Optional);

        let metadata = reader.get_metadata(Some(&options)).await.unwrap();

        assert!(metadata.column_index().is_some());
    }

    /// With preloading disabled, `Skip` yields no column index while
    /// `Optional` loads it.
    #[tokio::test]
    async fn test_page_index_policy_optional_vs_skip() {
        let (meta, store) = get_meta_store_with_page_index().await;

        let mut reader1 = ParquetObjectReader::new(store.clone(), meta.location.clone())
            .with_file_size(meta.size)
            .with_preload_column_index(false)
            .with_preload_offset_index(false);

        let options1 = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Skip);
        let metadata1 = reader1.get_metadata(Some(&options1)).await.unwrap();

        let mut reader2 = ParquetObjectReader::new(store.clone(), meta.location.clone())
            .with_file_size(meta.size)
            .with_preload_column_index(false)
            .with_preload_offset_index(false);

        let options2 = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Optional);
        let metadata2 = reader2.get_metadata(Some(&options2)).await.unwrap();

        assert!(metadata1.column_index().is_none());
        assert!(metadata2.column_index().is_some());
    }

    /// Without any options, the reader's preload flags alone decide which
    /// page indexes are loaded.
    #[tokio::test]
    async fn test_page_index_policy_no_options_uses_preload() {
        let (meta, store) = get_meta_store_with_page_index().await;

        let mut reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_preload_column_index(true)
            .with_preload_offset_index(true);

        let metadata = reader.get_metadata(None).await.unwrap();

        // Both preload flags are set, so both indexes must be present.
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }
}