// parquet/arrow/async_reader/store.rs

use std::{ops::Range, sync::Arc};

use crate::arrow::arrow_reader::ArrowReaderOptions;
use crate::arrow::async_reader::{AsyncFileReader, MetadataSuffixFetch};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt, TryFutureExt};
use object_store::{path::Path, ObjectStore};
use object_store::{GetOptions, GetRange};
use tokio::runtime::Handle;

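/// An [`AsyncFileReader`] backed by an [`ObjectStore`].
///
/// A minimal usage sketch; the in-memory store and the `data.parquet` path are
/// illustrative assumptions, not part of this module:
///
/// ```no_run
/// # use std::sync::Arc;
/// # use futures::TryStreamExt;
/// # use object_store::memory::InMemory;
/// # use object_store::path::Path;
/// # use object_store::ObjectStore;
/// # use parquet::arrow::ParquetRecordBatchStreamBuilder;
/// # use parquet::arrow::async_reader::ParquetObjectReader;
/// # async fn example() -> parquet::errors::Result<()> {
/// // Hypothetical store and object, for illustration only
/// let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
/// let path = Path::from("data.parquet");
///
/// // Without a known file size, the metadata is located via suffix requests
/// let reader = ParquetObjectReader::new(store, path);
/// let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
/// let batches: Vec<_> = builder.build()?.try_collect().await?;
/// # Ok(())
/// # }
/// ```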
#[derive(Clone, Debug)]
pub struct ParquetObjectReader {
    store: Arc<dyn ObjectStore>,
    path: Path,
    file_size: Option<u64>,
    metadata_size_hint: Option<usize>,
    preload_column_index: bool,
    preload_offset_index: bool,
    runtime: Option<Handle>,
}

impl ParquetObjectReader {
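    /// Creates a new [`ParquetObjectReader`] for the object at `path` in `store`.
    ///
    /// No IO is performed at construction; the metadata is loaded lazily by
    /// [`Self::get_metadata`].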
    pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
        Self {
            store,
            path,
            file_size: None,
            metadata_size_hint: None,
            preload_column_index: false,
            preload_offset_index: false,
            runtime: None,
        }
    }

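    /// Provide a hint of the number of bytes to prefetch from the end of the
    /// file when loading the metadata.
    ///
    /// If the hint is smaller than the actual footer, additional fetches are
    /// issued; a good hint can reduce metadata loading to a single request.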
    pub fn with_footer_size_hint(self, hint: usize) -> Self {
        Self {
            metadata_size_hint: Some(hint),
            ..self
        }
    }

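    /// Provide the byte size of this file.
    ///
    /// When set, the metadata is located using bounded range requests based on
    /// the known size; otherwise [`Self::get_metadata`] falls back to suffix
    /// range requests, which not all object stores support.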
    pub fn with_file_size(self, file_size: u64) -> Self {
        Self {
            file_size: Some(file_size),
            ..self
        }
    }

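    /// Load the column index (page-level statistics) as part of
    /// [`Self::get_metadata`].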
    pub fn with_preload_column_index(self, preload_column_index: bool) -> Self {
        Self {
            preload_column_index,
            ..self
        }
    }

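    /// Load the offset index as part of [`Self::get_metadata`].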
    pub fn with_preload_offset_index(self, preload_offset_index: bool) -> Self {
        Self {
            preload_offset_index,
            ..self
        }
    }

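    /// Perform all IO on the provided tokio runtime [`Handle`].
    ///
    /// This can be used to isolate object store requests from CPU-bound work
    /// that might otherwise starve the IO tasks. If the runtime is shut down,
    /// in-flight and subsequent IO will return an error.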
    pub fn with_runtime(self, handle: Handle) -> Self {
        Self {
            runtime: Some(handle),
            ..self
        }
    }

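    /// Runs `f` on the configured IO runtime if one is set, otherwise inline.
    ///
    /// Panics in the spawned task are propagated to the caller; task
    /// cancellation (e.g. runtime shutdown) surfaces as
    /// [`ParquetError::External`].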
    fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O>>
    where
        F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, Result<O, E>>
            + Send
            + 'static,
        O: Send + 'static,
        E: Into<ParquetError> + Send + 'static,
    {
        match &self.runtime {
            Some(handle) => {
                let path = self.path.clone();
                let store = Arc::clone(&self.store);
                handle
                    .spawn(async move { f(&store, &path).await })
                    .map_ok_or_else(
                        |e| match e.try_into_panic() {
                            Err(e) => Err(ParquetError::External(Box::new(e))),
                            Ok(p) => std::panic::resume_unwind(p),
                        },
                        |res| res.map_err(|e| e.into()),
                    )
                    .boxed()
            }
            None => f(&self.store, &self.path).map_err(|e| e.into()).boxed(),
        }
    }
}

impl MetadataSuffixFetch for &mut ParquetObjectReader {
    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
        let options = GetOptions {
            range: Some(GetRange::Suffix(suffix as u64)),
            ..Default::default()
        };
        self.spawn(|store, path| {
            async move {
                let resp = store.get_opts(path, options).await?;
                Ok::<_, ParquetError>(resp.bytes().await?)
            }
            .boxed()
        })
    }
}

impl AsyncFileReader for ParquetObjectReader {
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
        self.spawn(|store, path| store.get_range(path, range))
    }

    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
    where
        Self: Send,
    {
        self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed())
    }

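    /// Loads the Parquet metadata, honoring the configured prefetch hint and
    /// index preloading options.
    ///
    /// If the file size is known (see [`Self::with_file_size`]) it is used to
    /// locate the footer directly; otherwise the footer is discovered through
    /// suffix range requests.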
    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
        Box::pin(async move {
            let mut metadata = ParquetMetaDataReader::new()
                .with_column_indexes(self.preload_column_index)
                .with_offset_indexes(self.preload_offset_index)
                .with_prefetch_hint(self.metadata_size_hint);

            #[cfg(feature = "encryption")]
            if let Some(options) = options {
                metadata = metadata
                    .with_decryption_properties(options.file_decryption_properties.as_ref());
            }

            let metadata = if let Some(file_size) = self.file_size {
                metadata.load_and_finish(self, file_size).await?
            } else {
                metadata.load_via_suffix_and_finish(self).await?
            };

            Ok(Arc::new(metadata))
        })
    }
}

#[cfg(test)]
mod tests {
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    use futures::TryStreamExt;

    use crate::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
    use crate::arrow::ParquetRecordBatchStreamBuilder;
    use crate::errors::ParquetError;
    use arrow::util::test_util::parquet_test_data;
    use futures::FutureExt;
    use object_store::local::LocalFileSystem;
    use object_store::path::Path;
    use object_store::{ObjectMeta, ObjectStore};

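    // Returns the ObjectMeta and a LocalFileSystem store for the
    // `alltypes_plain.parquet` file in the parquet test data directory.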
    async fn get_meta_store() -> (ObjectMeta, Arc<dyn ObjectStore>) {
        let res = parquet_test_data();
        let store = LocalFileSystem::new_with_prefix(res).unwrap();

        let meta = store
            .head(&Path::from("alltypes_plain.parquet"))
            .await
            .unwrap();

        (meta, Arc::new(store) as Arc<dyn ObjectStore>)
    }

    #[tokio::test]
    async fn test_simple() {
        let (meta, store) = get_meta_store().await;
        let object_reader =
            ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);

        let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
            .await
            .unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);
    }

    #[tokio::test]
    async fn test_simple_without_file_length() {
        let (meta, store) = get_meta_store().await;
        let object_reader = ParquetObjectReader::new(store, meta.location);

        let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
            .await
            .unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);
    }

    #[tokio::test]
    async fn test_not_found() {
        let (mut meta, store) = get_meta_store().await;
        meta.location = Path::from("I don't exist.parquet");

        let object_reader =
            ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
        match ParquetRecordBatchStreamBuilder::new(object_reader).await {
            Ok(_) => panic!("expected failure"),
            Err(e) => {
                let err = e.to_string();
                assert!(
                    err.contains("not found: No such file or directory (os error 2)"),
                    "{err}",
                );
            }
        }
    }

    #[tokio::test]
    async fn test_runtime_is_used() {
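        // Count thread park/unpark events on the dedicated runtime; IO spawned
        // there should register at least one such event.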
        let num_actions = Arc::new(AtomicUsize::new(0));

        let (a1, a2) = (num_actions.clone(), num_actions.clone());
        let rt = tokio::runtime::Builder::new_multi_thread()
            .on_thread_park(move || {
                a1.fetch_add(1, Ordering::Relaxed);
            })
            .on_thread_unpark(move || {
                a2.fetch_add(1, Ordering::Relaxed);
            })
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let initial_actions = num_actions.load(Ordering::Relaxed);

        let reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_runtime(rt.handle().clone());

        let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);

        assert!(num_actions.load(Ordering::Relaxed) - initial_actions > 0);

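        // A runtime cannot be dropped from within an async context, so hand it
        // to a blocking thread for teardown.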
        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
    }

    #[tokio::test]
    async fn test_runtime_thread_id_different() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1)
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_runtime(rt.handle().clone());

        let current_id = std::thread::current().id();

        let other_id = reader
            .spawn(|_, _| async move { Ok::<_, ParquetError>(std::thread::current().id()) }.boxed())
            .await
            .unwrap();

        assert_ne!(current_id, other_id);

        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
    }

    #[tokio::test]
    async fn io_fails_on_shutdown_runtime() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1)
            .build()
            .unwrap();

        let (meta, store) = get_meta_store().await;

        let mut reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_runtime(rt.handle().clone());

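        // Shut down the IO runtime before issuing a read: the spawned IO task
        // is cancelled and surfaces as an error instead of hanging.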
        rt.shutdown_background();

        let err = reader.get_bytes(0..1).await.unwrap_err().to_string();

        assert!(err.contains("was cancelled"));
    }
}