// parquet/arrow/arrow_reader/filter.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::arrow::ProjectionMask;
19use arrow_array::{BooleanArray, RecordBatch};
20use arrow_schema::ArrowError;
21
/// A predicate operating on [`RecordBatch`]
///
/// See also:
/// * [`RowFilter`] for more information on applying filters during the
/// Parquet decoding process.
/// * [`ArrowPredicateFn`] for a concrete implementation based on a function
pub trait ArrowPredicate: Send + 'static {
    /// Returns the [`ProjectionMask`] that describes the columns required
    /// to evaluate this predicate.
    ///
    /// All projected columns will be provided in the `batch` passed to
    /// [`evaluate`](Self::evaluate). The projection mask should be as small as
    /// possible because any columns needed for the overall projection mask are
    /// decoded again after a predicate is applied.
    fn projection(&self) -> &ProjectionMask;

    /// Evaluate this predicate for the given [`RecordBatch`] containing the columns
    /// identified by [`Self::projection`]
    ///
    /// Must return a [`BooleanArray`] that has the same length as the input
    /// `batch` where each row indicates whether the row should be returned:
    /// * `true`: the row should be returned
    /// * `false` or `null`: the row should not be returned
    fn evaluate(&mut self, batch: RecordBatch) -> Result<BooleanArray, ArrowError>;
}
47
/// An [`ArrowPredicate`] created from an [`FnMut`] and a [`ProjectionMask`]
///
/// See [`RowFilter`] for more information on applying filters during the
/// Parquet decoding process.
///
/// The function is passed `RecordBatch`es with only the columns specified in
/// the [`ProjectionMask`].
///
/// The function must return a [`BooleanArray`] that has the same length as the
/// input `batch` where each row indicates whether the row should be returned:
/// * `true`: the row should be returned
/// * `false` or `null`: the row should not be returned
///
/// # Example:
///
/// Given an input schema: `"a:int64", "b:int64"`, you can create a predicate that
/// evaluates `b > 0` like this:
///
/// ```
/// # use std::sync::Arc;
/// # use arrow::compute::kernels::cmp::gt;
/// # use arrow_array::{BooleanArray, Int64Array, RecordBatch};
/// # use arrow_array::cast::AsArray;
/// # use arrow_array::types::Int64Type;
/// # use parquet::arrow::arrow_reader::ArrowPredicateFn;
/// # use parquet::arrow::ProjectionMask;
/// # use parquet::schema::types::{SchemaDescriptor, Type};
/// # use parquet::basic; // note there are two `Type`s that are different
/// # // Schema for a table with two columns: "a" (int64) and "b" (int64)
/// # let descriptor = SchemaDescriptor::new(
/// #  Arc::new(
/// #    Type::group_type_builder("my_schema")
/// #      .with_fields(vec![
/// #        Arc::new(
/// #         Type::primitive_type_builder("a", basic::Type::INT64)
/// #          .build().unwrap()
/// #        ),
/// #        Arc::new(
/// #         Type::primitive_type_builder("b", basic::Type::INT64)
/// #          .build().unwrap()
/// #        ),
/// #      ])
/// #      .build().unwrap()
/// #  )
/// # );
/// // Create a mask for selecting only the second column "b" (index 1)
/// let projection_mask = ProjectionMask::leaves(&descriptor, [1]);
/// // Closure that evaluates "b > 0"
/// let predicate = |batch: RecordBatch| {
///    let scalar_0 = Int64Array::new_scalar(0);
///    let column = batch.column(0).as_primitive::<Int64Type>();
///    // call the gt kernel to compute `>` which returns a BooleanArray
///    gt(column, &scalar_0)
/// };
/// // Create ArrowPredicateFn that can be passed to RowFilter
/// let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate);
/// ```
pub struct ArrowPredicateFn<F> {
    // Closure invoked on each projected `RecordBatch` to produce the filter mask
    f: F,
    // Columns that must be decoded and passed to `f`
    projection: ProjectionMask,
}
109
110impl<F> ArrowPredicateFn<F>
111where
112 F: FnMut(RecordBatch) -> Result<BooleanArray, ArrowError> + Send + 'static,
113{
114 /// Create a new [`ArrowPredicateFn`] that invokes `f` on the columns
115 /// specified in `projection`.
116 pub fn new(projection: ProjectionMask, f: F) -> Self {
117 Self { f, projection }
118 }
119}
120
121impl<F> ArrowPredicate for ArrowPredicateFn<F>
122where
123 F: FnMut(RecordBatch) -> Result<BooleanArray, ArrowError> + Send + 'static,
124{
125 fn projection(&self) -> &ProjectionMask {
126 &self.projection
127 }
128
129 fn evaluate(&mut self, batch: RecordBatch) -> Result<BooleanArray, ArrowError> {
130 (self.f)(batch)
131 }
132}
133
/// Filter applied *during* the parquet read process
///
/// [`RowFilter`] applies predicates in order, after decoding only the columns
/// required. As predicates eliminate rows, fewer rows from subsequent columns
/// may be required, thus potentially reducing IO and decode.
///
/// A `RowFilter` consists of a list of [`ArrowPredicate`]s. Only the rows for which
/// all the predicates evaluate to `true` will be returned.
/// Any [`RowSelection`] provided to the reader will be applied prior
/// to the first predicate, and each predicate in turn will then be used to compute
/// a more refined [`RowSelection`] used when evaluating the subsequent predicates.
///
/// Once all predicates have been evaluated, the final [`RowSelection`] is applied
/// to the top-level [`ProjectionMask`] to produce the final output [`RecordBatch`].
///
/// This design has a couple of implications:
///
/// * [`RowFilter`] can be used to skip entire pages, and thus IO, in addition to CPU decode overheads
/// * Columns may be decoded multiple times if they appear in multiple [`ProjectionMask`]
/// * IO will be deferred until needed by a [`ProjectionMask`]
///
/// As such there is a trade-off between a single large predicate, or multiple predicates,
/// that will depend on the shape of the data. Whilst multiple smaller predicates may
/// minimise the amount of data scanned/decoded, it may not be faster overall.
///
/// For example, if a predicate that needs a single column of data filters out all but
/// 1% of the rows, applying it as one of the early `ArrowPredicateFn` will likely significantly
/// improve performance.
///
/// As a counter example, if a predicate needs several columns of data to evaluate but
/// leaves 99% of the rows, it may be better to not filter the data from parquet and
/// apply the filter after the RecordBatch has been fully decoded.
///
/// Additionally, even if a predicate eliminates a moderate number of rows, it may still be faster
/// to filter the data after the RecordBatch has been fully decoded, if the eliminated rows are
/// not contiguous.
///
/// [`RowSelection`]: crate::arrow::arrow_reader::RowSelection
pub struct RowFilter {
    /// A list of [`ArrowPredicate`], applied in order; a row is returned only
    /// if every predicate evaluates to `true` for it
    pub(crate) predicates: Vec<Box<dyn ArrowPredicate>>,
}
176
177impl RowFilter {
178 /// Create a new [`RowFilter`] from an array of [`ArrowPredicate`]
179 pub fn new(predicates: Vec<Box<dyn ArrowPredicate>>) -> Self {
180 Self { predicates }
181 }
182}