parquet/arrow/arrow_reader/filter.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::arrow::ProjectionMask;
19use arrow_array::{BooleanArray, RecordBatch};
20use arrow_schema::ArrowError;
21use std::fmt::{Debug, Formatter};
22
23/// A predicate operating on [`RecordBatch`]
24///
25/// See also:
26/// * [`RowFilter`] for more information on applying filters during the
27/// Parquet decoding process.
28/// * [`ArrowPredicateFn`] for a concrete implementation based on a function
29pub trait ArrowPredicate: Send + 'static {
30 /// Returns the [`ProjectionMask`] that describes the columns required
31 /// to evaluate this predicate.
32 ///
33 /// All projected columns will be provided in the `batch` passed to
34 /// [`evaluate`](Self::evaluate). The projection mask should be as small as
35 /// possible because any columns needed for the overall projection mask are
36 /// decoded again after a predicate is applied.
37 fn projection(&self) -> &ProjectionMask;
38
39 /// Evaluate this predicate for the given [`RecordBatch`] containing the columns
40 /// identified by [`Self::projection`]
41 ///
42 /// Must return a [`BooleanArray`] that has the same length as the input
43 /// `batch` where each row indicates whether the row should be returned:
44 /// * `true`:the row should be returned
45 /// * `false` or `null`: the row should not be returned
46 fn evaluate(&mut self, batch: RecordBatch) -> Result<BooleanArray, ArrowError>;
47}
48
49/// An [`ArrowPredicate`] created from an [`FnMut`] and a [`ProjectionMask`]
50///
51/// See [`RowFilter`] for more information on applying filters during the
52/// Parquet decoding process.
53///
54/// The function is passed `RecordBatch`es with only the columns specified in
55/// the [`ProjectionMask`].
56///
57/// The function must return a [`BooleanArray`] that has the same length as the
58/// input `batch` where each row indicates whether the row should be returned:
59/// * `true`: the row should be returned
60/// * `false` or `null`: the row should not be returned
61///
62/// # Example:
63///
64/// Given an input schema: `"a:int64", "b:int64"`, you can create a predicate that
65/// evaluates `b > 0` like this:
66///
67/// ```
68/// # use std::sync::Arc;
69/// # use arrow::compute::kernels::cmp::gt;
70/// # use arrow_array::{BooleanArray, Int64Array, RecordBatch};
71/// # use arrow_array::cast::AsArray;
72/// # use arrow_array::types::Int64Type;
73/// # use parquet::arrow::arrow_reader::ArrowPredicateFn;
74/// # use parquet::arrow::ProjectionMask;
75/// # use parquet::schema::types::{SchemaDescriptor, Type};
76/// # use parquet::basic; // note there are two `Type`s that are different
77/// # // Schema for a table with one columns: "a" (int64) and "b" (int64)
78/// # let descriptor = SchemaDescriptor::new(
79/// # Arc::new(
80/// # Type::group_type_builder("my_schema")
81/// # .with_fields(vec![
82/// # Arc::new(
83/// # Type::primitive_type_builder("a", basic::Type::INT64)
84/// # .build().unwrap()
85/// # ),
86/// # Arc::new(
87/// # Type::primitive_type_builder("b", basic::Type::INT64)
88/// # .build().unwrap()
89/// # ),
90/// # ])
91/// # .build().unwrap()
92/// # )
93/// # );
94/// // Create a mask for selecting only the second column "b" (index 1)
95/// let projection_mask = ProjectionMask::leaves(&descriptor, [1]);
96/// // Closure that evaluates "b > 0"
97/// let predicate = |batch: RecordBatch| {
98/// let scalar_0 = Int64Array::new_scalar(0);
99/// let column = batch.column(0).as_primitive::<Int64Type>();
100/// // call the gt kernel to compute `>` which returns a BooleanArray
101/// gt(column, &scalar_0)
102/// };
103/// // Create ArrowPredicateFn that can be passed to RowFilter
104/// let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate);
105/// ```
106pub struct ArrowPredicateFn<F> {
107 f: F,
108 projection: ProjectionMask,
109}
110
111impl<F> ArrowPredicateFn<F>
112where
113 F: FnMut(RecordBatch) -> Result<BooleanArray, ArrowError> + Send + 'static,
114{
115 /// Create a new [`ArrowPredicateFn`] that invokes `f` on the columns
116 /// specified in `projection`.
117 pub fn new(projection: ProjectionMask, f: F) -> Self {
118 Self { f, projection }
119 }
120}
121
122impl<F> ArrowPredicate for ArrowPredicateFn<F>
123where
124 F: FnMut(RecordBatch) -> Result<BooleanArray, ArrowError> + Send + 'static,
125{
126 fn projection(&self) -> &ProjectionMask {
127 &self.projection
128 }
129
130 fn evaluate(&mut self, batch: RecordBatch) -> Result<BooleanArray, ArrowError> {
131 (self.f)(batch)
132 }
133}
134
135/// Filter applied *during* the parquet read process
136///
137/// [`RowFilter`] applies predicates in order, after decoding only the columns
138/// required. As predicates eliminate rows, fewer rows from subsequent columns
139/// may be required, thus potentially reducing IO and decode.
140///
141/// A `RowFilter` consists of a list of [`ArrowPredicate`]s. Only the rows for which
142/// all the predicates evaluate to `true` will be returned.
143/// Any [`RowSelection`] provided to the reader will be applied prior
144/// to the first predicate, and each predicate in turn will then be used to compute
145/// a more refined [`RowSelection`] used when evaluating the subsequent predicates.
146///
147/// Once all predicates have been evaluated, the final [`RowSelection`] is applied
148/// to the top-level [`ProjectionMask`] to produce the final output [`RecordBatch`].
149///
150/// This design has a couple of implications:
151///
152/// * [`RowFilter`] can be used to skip entire pages, and thus IO, in addition to CPU decode overheads
153/// * Columns may be decoded multiple times if they appear in multiple [`ProjectionMask`]
154/// * IO will be deferred until needed by a [`ProjectionMask`]
155///
156/// As such there is a trade-off between a single large predicate, or multiple predicates,
157/// that will depend on the shape of the data. Whilst multiple smaller predicates may
158/// minimise the amount of data scanned/decoded, it may not be faster overall.
159///
160/// For example, if a predicate that needs a single column of data filters out all but
161/// 1% of the rows, applying it as one of the early `ArrowPredicateFn` will likely significantly
162/// improve performance.
163///
164/// As a counter example, if a predicate needs several columns of data to evaluate but
165/// leaves 99% of the rows, it may be better to not filter the data from parquet and
166/// apply the filter after the RecordBatch has been fully decoded.
167///
168/// Additionally, even if a predicate eliminates a moderate number of rows, it may still be faster
169/// to filter the data after the RecordBatch has been fully decoded, if the eliminated rows are
170/// not contiguous.
171///
172/// [`RowSelection`]: crate::arrow::arrow_reader::RowSelection
173pub struct RowFilter {
174 /// A list of [`ArrowPredicate`]
175 pub(crate) predicates: Vec<Box<dyn ArrowPredicate>>,
176}
177
178impl Debug for RowFilter {
179 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
180 write!(f, "RowFilter {{ {} predicates: }}", self.predicates.len())
181 }
182}
183
184impl RowFilter {
185 /// Create a new [`RowFilter`] from an array of [`ArrowPredicate`]
186 pub fn new(predicates: Vec<Box<dyn ArrowPredicate>>) -> Self {
187 Self { predicates }
188 }
189}