parquet/arrow/arrow_reader/
filter.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
use crate::arrow::ProjectionMask;
use arrow_array::{BooleanArray, RecordBatch};
use arrow_schema::ArrowError;
use std::fmt;
21
22/// A predicate operating on [`RecordBatch`]
23///
24/// See [`RowFilter`] for more information on the use of this trait.
25pub trait ArrowPredicate: Send + 'static {
26    /// Returns the [`ProjectionMask`] that describes the columns required
27    /// to evaluate this predicate. All projected columns will be provided in the `batch`
28    /// passed to [`evaluate`](Self::evaluate)
29    fn projection(&self) -> &ProjectionMask;
30
31    /// Evaluate this predicate for the given [`RecordBatch`] containing the columns
32    /// identified by [`Self::projection`]
33    ///
34    /// Must return a [`BooleanArray`] that has the same length as the input
35    /// `batch` where each row indicates whether the row should be returned:
36    /// * `true`:the row should be returned
37    /// * `false` or `null`: the row should not be returned
38    fn evaluate(&mut self, batch: RecordBatch) -> Result<BooleanArray, ArrowError>;
39}
40
41/// An [`ArrowPredicate`] created from an [`FnMut`]
42pub struct ArrowPredicateFn<F> {
43    f: F,
44    projection: ProjectionMask,
45}
46
47impl<F> ArrowPredicateFn<F>
48where
49    F: FnMut(RecordBatch) -> Result<BooleanArray, ArrowError> + Send + 'static,
50{
51    /// Create a new [`ArrowPredicateFn`]. `f` will be passed batches
52    /// that contains the columns specified in `projection`
53    /// and returns a [`BooleanArray`] that describes which rows should
54    /// be passed along
55    pub fn new(projection: ProjectionMask, f: F) -> Self {
56        Self { f, projection }
57    }
58}
59
60impl<F> ArrowPredicate for ArrowPredicateFn<F>
61where
62    F: FnMut(RecordBatch) -> Result<BooleanArray, ArrowError> + Send + 'static,
63{
64    fn projection(&self) -> &ProjectionMask {
65        &self.projection
66    }
67
68    fn evaluate(&mut self, batch: RecordBatch) -> Result<BooleanArray, ArrowError> {
69        (self.f)(batch)
70    }
71}
72
73/// Filter applied *during* the parquet read process
74///
75/// [`RowFilter`] applies predicates in order, after decoding only the columns
76/// required. As predicates eliminate rows, fewer rows from subsequent columns
77/// may be required, thus potentially reducing IO and decode.
78///
79/// A `RowFilter` consists of a list of [`ArrowPredicate`]s. Only the rows for which
80/// all the predicates evaluate to `true` will be returned.
81/// Any [`RowSelection`] provided to the reader will be applied prior
82/// to the first predicate, and each predicate in turn will then be used to compute
83/// a more refined [`RowSelection`] used when evaluating the subsequent predicates.
84///
85/// Once all predicates have been evaluated, the final [`RowSelection`] is applied
86/// to the top-level [`ProjectionMask`] to produce the final output [`RecordBatch`].
87///
88/// This design has a couple of implications:
89///
90/// * [`RowFilter`] can be used to skip entire pages, and thus IO, in addition to CPU decode overheads
91/// * Columns may be decoded multiple times if they appear in multiple [`ProjectionMask`]
92/// * IO will be deferred until needed by a [`ProjectionMask`]
93///
94/// As such there is a trade-off between a single large predicate, or multiple predicates,
95/// that will depend on the shape of the data. Whilst multiple smaller predicates may
96/// minimise the amount of data scanned/decoded, it may not be faster overall.
97///
98/// For example, if a predicate that needs a single column of data filters out all but
99/// 1% of the rows, applying it as one of the early `ArrowPredicateFn` will likely significantly
100/// improve performance.
101///
102/// As a counter example, if a predicate needs several columns of data to evaluate but
103/// leaves 99% of the rows, it may be better to not filter the data from parquet and
104/// apply the filter after the RecordBatch has been fully decoded.
105///
106/// Additionally, even if a predicate eliminates a moderate number of rows, it may still be faster
107/// to filter the data after the RecordBatch has been fully decoded, if the eliminated rows are
108/// not contiguous.
109///
110/// [`RowSelection`]: crate::arrow::arrow_reader::RowSelection
111pub struct RowFilter {
112    /// A list of [`ArrowPredicate`]
113    pub(crate) predicates: Vec<Box<dyn ArrowPredicate>>,
114}
115
116impl RowFilter {
117    /// Create a new [`RowFilter`] from an array of [`ArrowPredicate`]
118    pub fn new(predicates: Vec<Box<dyn ArrowPredicate>>) -> Self {
119        Self { predicates }
120    }
121}