parquet/bloom_filter/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Bloom filter implementation specific to Parquet, as described
//! in the [spec][parquet-bf-spec].
//!
//! # Bloom Filter Size
//!
//! Parquet uses the [Split Block Bloom Filter][sbbf-paper] (SBBF) as its bloom filter
//! implementation. For each column upon which bloom filters are enabled, the offset and length of
//! an SBBF are stored in the metadata for each row group in the parquet file. The size of each
//! filter is determined by a calculation based on the desired number of distinct values (NDV) and
//! false positive probability (FPP). The FPP for an SBBF can be approximated as<sup>[1][bf-formulae]</sup>:
//!
//! ```text
//! f = (1 - e^(-k * n / m))^k
//! ```
//!
//! where `f` is the FPP, `k` the number of hash functions, `n` the NDV, and `m` the total number
//! of bits in the bloom filter. This can be re-arranged to determine the total number of bits
//! required to achieve a given FPP and NDV:
//!
//! ```text
//! m = -k * n / ln(1 - f^(1/k))
//! ```
//!
//! SBBFs use eight hash functions to cleanly fit in SIMD lanes<sup>[2][sbbf-paper]</sup>, therefore
//! `k` is set to 8. The SBBF will spread those `m` bits across a set of `b` blocks that
//! are each 256 bits, i.e., 32 bytes, in size. The number of blocks is chosen as:
//!
//! ```text
//! b = NP2(m/8) / 32
//! ```
//!
//! where `NP2` denotes *the next power of two*, and `m` is divided by 8 to convert bits to bytes.
//!
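//! For example, a worked instance of the formulae above (matching the second row of the table
//! below), with `n = 10,000` and `f = 0.01`:
//!
//! ```text
//! m = -8 * 10,000 / ln(1 - 0.01^(1/8)) ≈ 96,815 bits ≈ 12,102 bytes
//! b = NP2(12,102) / 32 = 16,384 / 32 = 512 blocks, i.e., a 16 KB filter
//! ```
//!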
//! Here is a table of calculated sizes for various FPP and NDV:
//!
//! | NDV       | FPP       | b       | Size (KB) |
//! |-----------|-----------|---------|-----------|
//! | 10,000    | 0.1       | 256     | 8         |
//! | 10,000    | 0.01      | 512     | 16        |
//! | 10,000    | 0.001     | 1,024   | 32        |
//! | 10,000    | 0.0001    | 1,024   | 32        |
//! | 100,000   | 0.1       | 4,096   | 128       |
//! | 100,000   | 0.01      | 4,096   | 128       |
//! | 100,000   | 0.001     | 8,192   | 256       |
//! | 100,000   | 0.0001    | 16,384  | 512       |
//! | 100,000   | 0.00001   | 16,384  | 512       |
//! | 1,000,000 | 0.1       | 32,768  | 1,024     |
//! | 1,000,000 | 0.01      | 65,536  | 2,048     |
//! | 1,000,000 | 0.001     | 65,536  | 2,048     |
//! | 1,000,000 | 0.0001    | 131,072 | 4,096     |
//! | 1,000,000 | 0.00001   | 131,072 | 4,096     |
//! | 1,000,000 | 0.000001  | 262,144 | 8,192     |
//!
//! [parquet-bf-spec]: https://github.com/apache/parquet-format/blob/master/BloomFilter.md
//! [sbbf-paper]: https://arxiv.org/pdf/2101.01719
//! [bf-formulae]: http://tfk.mit.edu/pdf/bloom.pdf

use crate::data_type::AsBytes;
use crate::errors::ParquetError;
use crate::file::metadata::ColumnChunkMetaData;
use crate::file::reader::ChunkReader;
use crate::format::{
    BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
    SplitBlockAlgorithm, Uncompressed, XxHash,
};
use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
use bytes::Bytes;
use std::hash::Hasher;
use std::io::Write;
use std::sync::Arc;
use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol};
use twox_hash::XxHash64;

/// Salt as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach).
const SALT: [u32; 8] = [
    0x47b6137b_u32,
    0x44974d91_u32,
    0x8824ad5b_u32,
    0xa2b7289d_u32,
    0x705495c7_u32,
    0x2df1424b_u32,
    0x9efc4947_u32,
    0x5c6bfb31_u32,
];

/// Each block is 256 bits, broken up into eight contiguous "words", each consisting of 32 bits.
/// Each word is thought of as an array of bits; each bit is either "set" or "not set".
#[derive(Debug, Copy, Clone)]
struct Block([u32; 8]);
impl Block {
    const ZERO: Block = Block([0; 8]);

    /// Takes a single unsigned 32-bit integer and returns a block in which each
    /// word has exactly one bit set.
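    /// For example, `mask(0)` multiplies zero by every salt, so each `y` is 0 and each
    /// word of the returned block is `1 << 0 == 1`.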
    fn mask(x: u32) -> Self {
        let mut result = [0_u32; 8];
        for i in 0..8 {
            // wrapping multiply instead of checking for overflow
            let y = x.wrapping_mul(SALT[i]);
            // take the top 5 bits to select one of the 32 bit positions in the word
            let y = y >> 27;
            result[i] = 1 << y;
        }
        Self(result)
    }

    #[inline]
    #[cfg(target_endian = "little")]
    fn to_le_bytes(self) -> [u8; 32] {
        self.to_ne_bytes()
    }

    #[inline]
    #[cfg(not(target_endian = "little"))]
    fn to_le_bytes(self) -> [u8; 32] {
        self.swap_bytes().to_ne_bytes()
    }

    #[inline]
    fn to_ne_bytes(self) -> [u8; 32] {
        // SAFETY: [u32; 8] and [u8; 32] have the same size and neither has invalid bit patterns.
        unsafe { std::mem::transmute(self.0) }
    }

    #[inline]
    #[cfg(not(target_endian = "little"))]
    fn swap_bytes(mut self) -> Self {
        self.0.iter_mut().for_each(|x| *x = x.swap_bytes());
        self
    }

    /// Sets every bit in the block that is also set in the result of `mask`.
    fn insert(&mut self, hash: u32) {
        let mask = Self::mask(hash);
        for i in 0..8 {
            self[i] |= mask[i];
        }
    }

    /// Returns true when every bit that is set in the result of `mask` is also set in the block.
    fn check(&self, hash: u32) -> bool {
        let mask = Self::mask(hash);
        for i in 0..8 {
            if self[i] & mask[i] == 0 {
                return false;
            }
        }
        true
    }
}

impl std::ops::Index<usize> for Block {
    type Output = u32;

    #[inline]
    fn index(&self, index: usize) -> &Self::Output {
        self.0.index(index)
    }
}

impl std::ops::IndexMut<usize> for Block {
    #[inline]
    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
        self.0.index_mut(index)
    }
}

/// A split block Bloom filter.
///
/// The creation of this structure is based on the [`crate::file::properties::BloomFilterProperties`]
/// struct set via [`crate::file::properties::WriterProperties`] and is thus hidden by default.
#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);

/// Maximum expected size of a serialized [`BloomFilterHeader`]: four fields, each a
/// compact-thrift-encoded value of at most 5 bytes.
pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;

/// Given an initial offset and a byte buffer, try to read out a bloom filter header, returning
/// the header along with the offset just past it (where the bitset starts).
pub(crate) fn chunk_read_bloom_filter_header_and_offset(
    offset: u64,
    buffer: Bytes,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
    let (header, length) = read_bloom_filter_header_and_length(buffer)?;
    Ok((header, offset + length))
}

/// Given a [Bytes] buffer, try to read out a bloom filter header, returning both the header and
/// its encoded length.
#[inline]
pub(crate) fn read_bloom_filter_header_and_length(
    buffer: Bytes,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
    let total_length = buffer.len();
    let mut prot = TCompactSliceInputProtocol::new(buffer.as_ref());
    let header = BloomFilterHeader::read_from_in_protocol(&mut prot)
        .map_err(|e| ParquetError::General(format!("Could not read bloom filter header: {e}")))?;
    Ok((header, (total_length - prot.as_slice().len()) as u64))
}

pub(crate) const BITSET_MIN_LENGTH: usize = 32;
pub(crate) const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;

/// Clamp the requested bitset size to `[BITSET_MIN_LENGTH, BITSET_MAX_LENGTH]` and round it up
/// to the next power of two.
#[inline]
fn optimal_num_of_bytes(num_bytes: usize) -> usize {
    let num_bytes = num_bytes.min(BITSET_MAX_LENGTH);
    let num_bytes = num_bytes.max(BITSET_MIN_LENGTH);
    num_bytes.next_power_of_two()
}

// see http://algo2.iti.kit.edu/documents/cacheefficientbloomfilters-jea.pdf
// given fpp = (1 - e^(-k * n / m)) ^ k
// we have m = -k * n / ln(1 - fpp ^ (1 / k))
// where k = number of hash functions, m = number of bits, n = number of distinct values
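// e.g. ndv = 10_000 with fpp = 0.01 gives m ≈ 96_815 bits (see the test table at the
// bottom of this file)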
#[inline]
fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
    let num_bits = -8.0 * ndv as f64 / (1.0 - fpp.powf(1.0 / 8.0)).ln();
    num_bits as usize
}

impl Sbbf {
    /// Create a new [Sbbf] with the given number of distinct values and false positive probability.
    /// Will return an error if `fpp` is greater than or equal to 1.0 or less than 0.0.
    pub(crate) fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self, ParquetError> {
        if !(0.0..1.0).contains(&fpp) {
            return Err(ParquetError::General(format!(
                "False positive probability must be between 0.0 and 1.0, got {fpp}"
            )));
        }
        let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
        Ok(Self::new_with_num_of_bytes(num_bits / 8))
    }

    /// Create a new [Sbbf] with the given number of bytes; the exact number will be adjusted
    /// to the next power of two, bounded by [BITSET_MIN_LENGTH] and [BITSET_MAX_LENGTH].
    pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
        let num_bytes = optimal_num_of_bytes(num_bytes);
        let bitset = vec![0_u8; num_bytes];
        Self::new(&bitset)
    }

    /// Create a new [Sbbf] from a serialized bitset, interpreting each 32-byte chunk as a
    /// block of eight little-endian u32 words.
    pub(crate) fn new(bitset: &[u8]) -> Self {
        let data = bitset
            .chunks_exact(4 * 8)
            .map(|chunk| {
                let mut block = Block::ZERO;
                for (i, word) in chunk.chunks_exact(4).enumerate() {
                    block[i] = u32::from_le_bytes(word.try_into().unwrap());
                }
                block
            })
            .collect::<Vec<Block>>();
        Self(data)
    }

    /// Write the bloom filter data (header and then bitset) to the output. This doesn't
    /// flush the writer in order to boost performance when bulk writing all blocks. The
    /// caller must remember to flush the writer.
    pub(crate) fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        let mut protocol = TCompactOutputProtocol::new(&mut writer);
        let header = self.header();
        header.write_to_out_protocol(&mut protocol).map_err(|e| {
            ParquetError::General(format!("Could not write bloom filter header: {e}"))
        })?;
        protocol.flush()?;
        self.write_bitset(&mut writer)?;
        Ok(())
    }

    /// Write the bitset in serialized form to the writer.
    fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        for block in &self.0 {
            writer
                .write_all(block.to_le_bytes().as_slice())
                .map_err(|e| {
                    ParquetError::General(format!("Could not write bloom filter bit set: {e}"))
                })?;
        }
        Ok(())
    }

    /// Create and populate a [`BloomFilterHeader`] from this bitset for writing to serialized form.
    fn header(&self) -> BloomFilterHeader {
        BloomFilterHeader {
            // 32 bytes per block: 8 words per block, 4 bytes per word
            num_bytes: self.0.len() as i32 * 4 * 8,
            algorithm: BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}),
            hash: BloomFilterHash::XXHASH(XxHash {}),
            compression: BloomFilterCompression::UNCOMPRESSED(Uncompressed {}),
        }
    }

    /// Read a new bloom filter from the given offset in the given reader.
    pub(crate) fn read_from_column_chunk<R: ChunkReader>(
        column_metadata: &ColumnChunkMetaData,
        reader: Arc<R>,
    ) -> Result<Option<Self>, ParquetError> {
        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => reader.get_bytes(offset, length as usize),
            None => reader.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        match header.algorithm {
            BloomFilterAlgorithm::BLOCK(_) => {
                // this match exists to future-proof the singleton algorithm enum
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED(_) => {
                // this match exists to future-proof the singleton compression enum
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH(_) => {
                // this match exists to future-proof the singleton hash enum
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice((bitset_offset - offset) as usize..),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                reader.get_bytes(bitset_offset, bitset_length)?
            }
        };

        Ok(Some(Self::new(&bitset)))
    }

    #[inline]
    fn hash_to_block_index(&self, hash: u64) -> usize {
        // Map the upper 32 bits of the hash onto [0, number of blocks) by multiplying and
        // taking the upper 32 bits of the product. Both operands fit in 32 bits (the block
        // count is bounded by BITSET_MAX_LENGTH), so the product fits in 64 bits and
        // saturating_mul never actually saturates (unchecked_mul is unstable).
        (((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
    }

    /// Insert an [AsBytes] value into the filter
    pub fn insert<T: AsBytes + ?Sized>(&mut self, value: &T) {
        self.insert_hash(hash_as_bytes(value));
    }

    /// Insert a hash into the filter
    fn insert_hash(&mut self, hash: u64) {
        let block_index = self.hash_to_block_index(hash);
        self.0[block_index].insert(hash as u32)
    }

    /// Check if an [AsBytes] value is probably present or definitely absent in the filter
    pub fn check<T: AsBytes>(&self, value: &T) -> bool {
        self.check_hash(hash_as_bytes(value))
    }

    /// Check if a hash is in the filter. May return true for a value that was never
    /// inserted (a "false positive"), but will never return false for a hash that
    /// has been inserted (no false negatives).
    fn check_hash(&self, hash: u64) -> bool {
        let block_index = self.hash_to_block_index(hash);
        self.0[block_index].check(hash as u32)
    }

    /// Return the total in-memory size of this bloom filter in bytes
    pub(crate) fn estimated_memory_size(&self) -> usize {
        self.0.capacity() * std::mem::size_of::<Block>()
    }
}

// per the spec we use xxHash with seed=0
const SEED: u64 = 0;

#[inline]
fn hash_as_bytes<A: AsBytes + ?Sized>(value: &A) -> u64 {
    let mut hasher = XxHash64::with_seed(SEED);
    hasher.write(value.as_bytes());
    hasher.finish()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_hash_bytes() {
        assert_eq!(hash_as_bytes(""), 17241709254077376921);
    }

    #[test]
    fn test_mask_set_quick_check() {
        for i in 0..1_000_000 {
            let result = Block::mask(i);
            assert!(result.0.iter().all(|&x| x.is_power_of_two()));
        }
    }

    #[test]
    fn test_block_insert_and_check() {
        for i in 0..1_000_000 {
            let mut block = Block::ZERO;
            block.insert(i);
            assert!(block.check(i));
        }
    }

    #[test]
    fn test_sbbf_insert_and_check() {
        let mut sbbf = Sbbf(vec![Block::ZERO; 1_000]);
        for i in 0..1_000_000 {
            sbbf.insert(&i);
            assert!(sbbf.check(&i));
        }
    }
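
    /// A minimal write/read round-trip sketch (an added illustration, not part of the original
    /// test suite): serialize a filter with `write`, parse the header back with
    /// `read_bloom_filter_header_and_length`, and rebuild the filter from the bitset bytes.
    #[test]
    fn test_write_read_roundtrip() {
        let mut sbbf = Sbbf::new_with_num_of_bytes(64);
        sbbf.insert(&"hello");
        let mut out = Vec::new();
        sbbf.write(&mut out).unwrap();
        let (header, header_len) =
            read_bloom_filter_header_and_length(Bytes::copy_from_slice(&out)).unwrap();
        // 64 bytes is already a power of two within bounds, so it is used as-is
        assert_eq!(header.num_bytes, 64);
        // everything after the header is the serialized bitset
        assert_eq!(out.len() as u64 - header_len, 64);
        // rebuilding from the bitset bytes preserves membership
        let sbbf2 = Sbbf::new(&out[header_len as usize..]);
        assert!(sbbf2.check(&"hello"));
    }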

    #[test]
    fn test_with_fixture() {
        // bloom filter produced by parquet-mr/spark for a column of i64 f"a{i}" for i in 0..10
        let bitset: &[u8] = &[
            200, 1, 80, 20, 64, 68, 8, 109, 6, 37, 4, 67, 144, 80, 96, 32, 8, 132, 43, 33, 0, 5,
            99, 65, 2, 0, 224, 44, 64, 78, 96, 4,
        ];
        let sbbf = Sbbf::new(bitset);
        for a in 0..10i64 {
            let value = format!("a{a}");
            assert!(sbbf.check(&value.as_str()));
        }
    }

    /// Test the assumption that the bloom filter header size does not exceed
    /// SBBF_HEADER_SIZE_ESTIMATE: essentially we are testing that the struct is packed with
    /// 4 i32 fields, each encoded in 1-5 bytes, so altogether it'll be 20 bytes at most.
    #[test]
    fn test_bloom_filter_header_size_assumption() {
        let buffer: &[u8; 16] = &[21, 64, 28, 28, 0, 0, 28, 28, 0, 0, 28, 28, 0, 0, 0, 99];
        let (
            BloomFilterHeader {
                algorithm,
                compression,
                hash,
                num_bytes,
            },
            read_length,
        ) = read_bloom_filter_header_and_length(Bytes::copy_from_slice(buffer)).unwrap();
        assert_eq!(read_length, 15);
        assert_eq!(
            algorithm,
            BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {})
        );
        assert_eq!(
            compression,
            BloomFilterCompression::UNCOMPRESSED(Uncompressed {})
        );
        assert_eq!(hash, BloomFilterHash::XXHASH(XxHash {}));
        assert_eq!(num_bytes, 32_i32);
        assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE);
    }

    #[test]
    fn test_optimal_num_of_bytes() {
        for (input, expected) in &[
            (0, 32),
            (9, 32),
            (31, 32),
            (32, 32),
            (33, 64),
            (99, 128),
            (1024, 1024),
            (999_000_000, 128 * 1024 * 1024),
        ] {
            assert_eq!(*expected, optimal_num_of_bytes(*input));
        }
    }

    #[test]
    fn test_num_of_bits_from_ndv_fpp() {
        for (fpp, ndv, num_bits) in &[
            (0.1, 10, 57),
            (0.01, 10, 96),
            (0.001, 10, 146),
            (0.1, 100, 577),
            (0.01, 100, 968),
            (0.001, 100, 1460),
            (0.1, 1000, 5772),
            (0.01, 1000, 9681),
            (0.001, 1000, 14607),
            (0.1, 10000, 57725),
            (0.01, 10000, 96815),
            (0.001, 10000, 146076),
            (0.1, 100000, 577254),
            (0.01, 100000, 968152),
            (0.001, 100000, 1460769),
            (0.1, 1000000, 5772541),
            (0.01, 1000000, 9681526),
            (0.001, 1000000, 14607697),
            (1e-50, 1_000_000_000_000, 14226231280773240832),
        ] {
            assert_eq!(*num_bits, num_of_bits_from_ndv_fpp(*ndv, *fpp) as u64);
        }
    }
}