parquet/arrow/arrow_reader/filter.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::arrow::ProjectionMask;
19use arrow_array::{BooleanArray, RecordBatch};
20use arrow_schema::ArrowError;
21
22/// A predicate operating on [`RecordBatch`]
23///
24/// See [`RowFilter`] for more information on the use of this trait.
25pub trait ArrowPredicate: Send + 'static {
26 /// Returns the [`ProjectionMask`] that describes the columns required
27 /// to evaluate this predicate. All projected columns will be provided in the `batch`
28 /// passed to [`evaluate`](Self::evaluate)
29 fn projection(&self) -> &ProjectionMask;
30
31 /// Evaluate this predicate for the given [`RecordBatch`] containing the columns
32 /// identified by [`Self::projection`]
33 ///
34 /// Must return a [`BooleanArray`] that has the same length as the input
35 /// `batch` where each row indicates whether the row should be returned:
36 /// * `true`:the row should be returned
37 /// * `false` or `null`: the row should not be returned
38 fn evaluate(&mut self, batch: RecordBatch) -> Result<BooleanArray, ArrowError>;
39}
40
41/// An [`ArrowPredicate`] created from an [`FnMut`]
42pub struct ArrowPredicateFn<F> {
43 f: F,
44 projection: ProjectionMask,
45}
46
47impl<F> ArrowPredicateFn<F>
48where
49 F: FnMut(RecordBatch) -> Result<BooleanArray, ArrowError> + Send + 'static,
50{
51 /// Create a new [`ArrowPredicateFn`]. `f` will be passed batches
52 /// that contains the columns specified in `projection`
53 /// and returns a [`BooleanArray`] that describes which rows should
54 /// be passed along
55 pub fn new(projection: ProjectionMask, f: F) -> Self {
56 Self { f, projection }
57 }
58}
59
60impl<F> ArrowPredicate for ArrowPredicateFn<F>
61where
62 F: FnMut(RecordBatch) -> Result<BooleanArray, ArrowError> + Send + 'static,
63{
64 fn projection(&self) -> &ProjectionMask {
65 &self.projection
66 }
67
68 fn evaluate(&mut self, batch: RecordBatch) -> Result<BooleanArray, ArrowError> {
69 (self.f)(batch)
70 }
71}
72
73/// Filter applied *during* the parquet read process
74///
75/// [`RowFilter`] applies predicates in order, after decoding only the columns
76/// required. As predicates eliminate rows, fewer rows from subsequent columns
77/// may be required, thus potentially reducing IO and decode.
78///
79/// A `RowFilter` consists of a list of [`ArrowPredicate`]s. Only the rows for which
80/// all the predicates evaluate to `true` will be returned.
81/// Any [`RowSelection`] provided to the reader will be applied prior
82/// to the first predicate, and each predicate in turn will then be used to compute
83/// a more refined [`RowSelection`] used when evaluating the subsequent predicates.
84///
85/// Once all predicates have been evaluated, the final [`RowSelection`] is applied
86/// to the top-level [`ProjectionMask`] to produce the final output [`RecordBatch`].
87///
88/// This design has a couple of implications:
89///
90/// * [`RowFilter`] can be used to skip entire pages, and thus IO, in addition to CPU decode overheads
91/// * Columns may be decoded multiple times if they appear in multiple [`ProjectionMask`]
92/// * IO will be deferred until needed by a [`ProjectionMask`]
93///
94/// As such there is a trade-off between a single large predicate, or multiple predicates,
95/// that will depend on the shape of the data. Whilst multiple smaller predicates may
96/// minimise the amount of data scanned/decoded, it may not be faster overall.
97///
98/// For example, if a predicate that needs a single column of data filters out all but
99/// 1% of the rows, applying it as one of the early `ArrowPredicateFn` will likely significantly
100/// improve performance.
101///
102/// As a counter example, if a predicate needs several columns of data to evaluate but
103/// leaves 99% of the rows, it may be better to not filter the data from parquet and
104/// apply the filter after the RecordBatch has been fully decoded.
105///
106/// Additionally, even if a predicate eliminates a moderate number of rows, it may still be faster
107/// to filter the data after the RecordBatch has been fully decoded, if the eliminated rows are
108/// not contiguous.
109///
110/// [`RowSelection`]: crate::arrow::arrow_reader::RowSelection
111pub struct RowFilter {
112 /// A list of [`ArrowPredicate`]
113 pub(crate) predicates: Vec<Box<dyn ArrowPredicate>>,
114}
115
116impl RowFilter {
117 /// Create a new [`RowFilter`] from an array of [`ArrowPredicate`]
118 pub fn new(predicates: Vec<Box<dyn ArrowPredicate>>) -> Self {
119 Self { predicates }
120 }
121}