parquet/bloom_filter/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Bloom filter implementation specific to Parquet, as described
//! in the [spec][parquet-bf-spec].
//!
//! # Bloom Filter Size
//!
//! Parquet uses the [Split Block Bloom Filter][sbbf-paper] (SBBF) as its bloom filter
//! implementation. For each column upon which bloom filters are enabled, the offset and length of an SBBF
//! are stored in the metadata for each row group in the parquet file. The size of each filter is
//! initialized using a calculation based on the desired number of distinct values (NDV) and false
//! positive probability (FPP). The FPP for an SBBF can be approximated as<sup>[1][bf-formulae]</sup>:
//!
//! ```text
//! f = (1 - e^(-k * n / m))^k
//! ```
//!
//! where `f` is the FPP, `k` the number of hash functions, `n` the NDV, and `m` the total number
//! of bits in the bloom filter. This can be re-arranged to determine the total number of bits
//! required to achieve a given FPP and NDV:
//!
//! ```text
//! m = -k * n / ln(1 - f^(1/k))
//! ```
//!
//! SBBFs use eight hash functions to cleanly fit in SIMD lanes<sup>[2][sbbf-paper]</sup>, therefore
//! `k` is set to 8. The SBBF will spread those `m` bits across a set of `b` blocks that
//! are each 256 bits, i.e., 32 bytes, in size. The number of blocks is chosen as:
//!
//! ```text
//! b = NP2(m/8) / 32
//! ```
//!
//! where `NP2` denotes *the next power of two*, and `m` is divided by 8 to convert bits to bytes.
//!
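//! For example, for one million distinct values and an FPP of 1% (the bit count is taken
//! from `test_num_of_bits_from_ndv_fpp` below):
//!
//! ```text
//! m = -8 * 1,000,000 / ln(1 - 0.01^(1/8)) ≈ 9,681,526 bits ≈ 1,210,190 bytes
//! b = NP2(1,210,190) / 32 = 2,097,152 / 32 = 65,536 blocks, i.e. 2,048 KB
//! ```
//!
//! matching the corresponding row in the table below.
//!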
//! Here is a table of calculated sizes for various FPP and NDV:
//!
//! | NDV       | FPP       | b       | Size (KB) |
//! |-----------|-----------|---------|-----------|
//! | 10,000    | 0.1       | 256     | 8         |
//! | 10,000    | 0.01      | 512     | 16        |
//! | 10,000    | 0.001     | 1,024   | 32        |
//! | 10,000    | 0.0001    | 1,024   | 32        |
//! | 100,000   | 0.1       | 4,096   | 128       |
//! | 100,000   | 0.01      | 4,096   | 128       |
//! | 100,000   | 0.001     | 8,192   | 256       |
//! | 100,000   | 0.0001    | 16,384  | 512       |
//! | 100,000   | 0.00001   | 16,384  | 512       |
//! | 1,000,000 | 0.1       | 32,768  | 1,024     |
//! | 1,000,000 | 0.01      | 65,536  | 2,048     |
//! | 1,000,000 | 0.001     | 65,536  | 2,048     |
//! | 1,000,000 | 0.0001    | 131,072 | 4,096     |
//! | 1,000,000 | 0.00001   | 131,072 | 4,096     |
//! | 1,000,000 | 0.000001  | 262,144 | 8,192     |
//!
//! [parquet-bf-spec]: https://github.com/apache/parquet-format/blob/master/BloomFilter.md
//! [sbbf-paper]: https://arxiv.org/pdf/2101.01719
//! [bf-formulae]: http://tfk.mit.edu/pdf/bloom.pdf

use crate::data_type::AsBytes;
use crate::errors::ParquetError;
use crate::file::metadata::ColumnChunkMetaData;
use crate::file::reader::ChunkReader;
use crate::format::{
    BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
    SplitBlockAlgorithm, Uncompressed, XxHash,
};
use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
use bytes::Bytes;
use std::io::Write;
use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol};
use twox_hash::XxHash64;

/// Salt as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach).
const SALT: [u32; 8] = [
    0x47b6137b_u32,
    0x44974d91_u32,
    0x8824ad5b_u32,
    0xa2b7289d_u32,
    0x705495c7_u32,
    0x2df1424b_u32,
    0x9efc4947_u32,
    0x5c6bfb31_u32,
];

/// Each block is 256 bits, broken up into eight contiguous "words", each consisting of 32 bits.
/// Each word is thought of as an array of bits; each bit is either "set" or "not set".
#[derive(Debug, Copy, Clone)]
struct Block([u32; 8]);
impl Block {
    const ZERO: Block = Block([0; 8]);

    /// Takes a single unsigned 32-bit integer and returns a block in which each
    /// word has exactly one bit set.
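    ///
    /// Bit `x.wrapping_mul(SALT[i]) >> 27` of word `i` is the bit that gets set; for example,
    /// `mask(0)` sets bit zero of every word, since `0 * SALT[i] == 0`.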
    fn mask(x: u32) -> Self {
        let mut result = [0_u32; 8];
        for i in 0..8 {
            // wrapping instead of checking for overflow
            let y = x.wrapping_mul(SALT[i]);
            let y = y >> 27;
            result[i] = 1 << y;
        }
        Self(result)
    }

    #[inline]
    #[cfg(target_endian = "little")]
    fn to_le_bytes(self) -> [u8; 32] {
        self.to_ne_bytes()
    }

    #[inline]
    #[cfg(not(target_endian = "little"))]
    fn to_le_bytes(self) -> [u8; 32] {
        self.swap_bytes().to_ne_bytes()
    }

    #[inline]
    fn to_ne_bytes(self) -> [u8; 32] {
        // SAFETY: [u32; 8] and [u8; 32] have the same size and neither has invalid bit patterns.
        unsafe { std::mem::transmute(self.0) }
    }

    #[inline]
    #[cfg(not(target_endian = "little"))]
    fn swap_bytes(mut self) -> Self {
        self.0.iter_mut().for_each(|x| *x = x.swap_bytes());
        self
    }

    /// Sets every bit in the block that is also set in the result of `mask`.
    fn insert(&mut self, hash: u32) {
        let mask = Self::mask(hash);
        for i in 0..8 {
            self[i] |= mask[i];
        }
    }

    /// Returns `true` when every bit that is set in the result of `mask` is also set in the block.
    fn check(&self, hash: u32) -> bool {
        let mask = Self::mask(hash);
        for i in 0..8 {
            if self[i] & mask[i] == 0 {
                return false;
            }
        }
        true
    }
}

impl std::ops::Index<usize> for Block {
    type Output = u32;

    #[inline]
    fn index(&self, index: usize) -> &Self::Output {
        self.0.index(index)
    }
}

impl std::ops::IndexMut<usize> for Block {
    #[inline]
    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
        self.0.index_mut(index)
    }
}

/// A split block Bloom filter.
///
/// Creation is driven by the [`crate::file::properties::BloomFilterProperties`]
/// struct set via [`crate::file::properties::WriterProperties`]; the constructors are
/// therefore hidden (crate-private) by default.
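///
/// # Example
///
/// A minimal sketch of probing a filter read back from a file. This assumes `data.parquet`
/// was written with bloom filters enabled and that the reader is configured to read them;
/// `get_column_bloom_filter` returns `None` when no filter is available for the column:
///
/// ```no_run
/// use parquet::file::reader::{FileReader, SerializedFileReader};
/// use std::fs::File;
///
/// let file = File::open("data.parquet").unwrap();
/// let reader = SerializedFileReader::new(file).unwrap();
/// let row_group = reader.get_row_group(0).unwrap();
/// if let Some(sbbf) = row_group.get_column_bloom_filter(0) {
///     // a `true` result may be a false positive; `false` is definitive
///     let _maybe_present = sbbf.check(&"some_value");
/// }
/// ```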
#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);

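// The header is four thrift compact-encoded fields, each taking at most 5 bytes
// (see `test_bloom_filter_header_size_assumption` below).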
pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;

/// Given an initial offset and a byte buffer, try to read a bloom filter header; returns
/// both the header and the offset just past it (i.e. the start of the bitset).
pub(crate) fn chunk_read_bloom_filter_header_and_offset(
    offset: u64,
    buffer: Bytes,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
    let (header, length) = read_bloom_filter_header_and_length(buffer)?;
    Ok((header, offset + length))
}

/// Given a [Bytes] buffer, try to read a bloom filter header; returns both the header and
/// the number of bytes it occupies in the buffer.
#[inline]
pub(crate) fn read_bloom_filter_header_and_length(
    buffer: Bytes,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
    let total_length = buffer.len();
    let mut prot = TCompactSliceInputProtocol::new(buffer.as_ref());
    let header = BloomFilterHeader::read_from_in_protocol(&mut prot)
        .map_err(|e| ParquetError::General(format!("Could not read bloom filter header: {e}")))?;
    Ok((header, (total_length - prot.as_slice().len()) as u64))
}

pub(crate) const BITSET_MIN_LENGTH: usize = 32;
pub(crate) const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;

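// Clamp the requested bitset size to [BITSET_MIN_LENGTH, BITSET_MAX_LENGTH], then round it
// up to the next power of two, e.g. 99 -> 128 and 999_000_000 -> BITSET_MAX_LENGTH.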
#[inline]
fn optimal_num_of_bytes(num_bytes: usize) -> usize {
    let num_bytes = num_bytes.min(BITSET_MAX_LENGTH);
    let num_bytes = num_bytes.max(BITSET_MIN_LENGTH);
    num_bytes.next_power_of_two()
}

// see http://algo2.iti.kit.edu/documents/cacheefficientbloomfilters-jea.pdf
// given fpp = (1 - e^(-k * n / m)) ^ k
// we have m = - k * n / ln(1 - fpp ^ (1 / k))
// where k = number of hash functions, m = number of bits, n = number of distinct values
#[inline]
fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
    let num_bits = -8.0 * ndv as f64 / (1.0 - fpp.powf(1.0 / 8.0)).ln();
    num_bits as usize
}

impl Sbbf {
    /// Create a new [Sbbf] with the given number of distinct values and false positive
    /// probability. Returns an error unless `fpp` is in the range `0.0..1.0`.
    pub(crate) fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self, ParquetError> {
        if !(0.0..1.0).contains(&fpp) {
            return Err(ParquetError::General(format!(
                "False positive probability must be between 0.0 and 1.0, got {fpp}"
            )));
        }
        let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
        Ok(Self::new_with_num_of_bytes(num_bits / 8))
    }

    /// Create a new [Sbbf] with the given number of bytes; the exact number will be rounded
    /// up to the next power of two, bounded by [BITSET_MIN_LENGTH] and [BITSET_MAX_LENGTH].
    pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
        let num_bytes = optimal_num_of_bytes(num_bytes);
        let bitset = vec![0_u8; num_bytes];
        Self::new(&bitset)
    }

    /// Create a new [Sbbf] from a raw little-endian bitset, e.g. one read back from a file.
    pub(crate) fn new(bitset: &[u8]) -> Self {
        let data = bitset
            // 32 bytes per block: 8 words of 4 bytes each
            .chunks_exact(4 * 8)
            .map(|chunk| {
                let mut block = Block::ZERO;
                for (i, word) in chunk.chunks_exact(4).enumerate() {
                    block[i] = u32::from_le_bytes(word.try_into().unwrap());
                }
                block
            })
            .collect::<Vec<Block>>();
        Self(data)
    }

    /// Write the bloom filter data (header and then bitset) to the output. This doesn't
    /// flush the writer in order to boost performance of bulk writing all blocks. Caller
    /// must remember to flush the writer.
    pub(crate) fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        let mut protocol = TCompactOutputProtocol::new(&mut writer);
        let header = self.header();
        header.write_to_out_protocol(&mut protocol).map_err(|e| {
            ParquetError::General(format!("Could not write bloom filter header: {e}"))
        })?;
        protocol.flush()?;
        self.write_bitset(&mut writer)?;
        Ok(())
    }

    /// Write the bitset in serialized form to the writer.
    fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        for block in &self.0 {
            writer
                .write_all(block.to_le_bytes().as_slice())
                .map_err(|e| {
                    ParquetError::General(format!("Could not write bloom filter bit set: {e}"))
                })?;
        }
        Ok(())
    }

    /// Create and populate [`BloomFilterHeader`] from this bitset for writing to serialized form
    fn header(&self) -> BloomFilterHeader {
        BloomFilterHeader {
            // 8 i32 per block, 4 bytes per i32
            num_bytes: self.0.len() as i32 * 4 * 8,
            algorithm: BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}),
            hash: BloomFilterHash::XXHASH(XxHash {}),
            compression: BloomFilterCompression::UNCOMPRESSED(Uncompressed {}),
        }
    }

    /// Read the bloom filter, if any, for the given column chunk from the reader. Returns
    /// `Ok(None)` when the column metadata records no bloom filter offset.
    pub(crate) fn read_from_column_chunk<R: ChunkReader>(
        column_metadata: &ColumnChunkMetaData,
        reader: &R,
    ) -> Result<Option<Self>, ParquetError> {
        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => reader.get_bytes(offset, length as usize),
            None => reader.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        match header.algorithm {
            BloomFilterAlgorithm::BLOCK(_) => {
                // this match exists to future proof the singleton algorithm enum
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED(_) => {
                // this match exists to future proof the singleton compression enum
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH(_) => {
                // this match exists to future proof the singleton hash enum
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice((bitset_offset - offset) as usize..),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                reader.get_bytes(bitset_offset, bitset_length)?
            }
        };

        Ok(Some(Self::new(&bitset)))
    }

    #[inline]
    fn hash_to_block_index(&self, hash: u64) -> usize {
        // Multiply-shift: map the top 32 bits of the hash onto `[0, self.0.len())`.
        // `unchecked_mul` is unstable, so use `saturating_mul`; both operands fit in
        // 32 bits, so the product fits in 64 bits and never actually saturates.
        (((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
    }

    /// Insert an [AsBytes] value into the filter
    pub fn insert<T: AsBytes + ?Sized>(&mut self, value: &T) {
        self.insert_hash(hash_as_bytes(value));
    }

    /// Insert a hash into the filter
    fn insert_hash(&mut self, hash: u64) {
        let block_index = self.hash_to_block_index(hash);
        self.0[block_index].insert(hash as u32)
    }

    /// Check if an [AsBytes] value is probably present or definitely absent in the filter
    pub fn check<T: AsBytes>(&self, value: &T) -> bool {
        self.check_hash(hash_as_bytes(value))
    }

    /// Check if a hash is in the filter. May return `true` for a hash that was never
    /// inserted (a false positive), but never returns `false` for a hash that was
    /// inserted (no false negatives).
    fn check_hash(&self, hash: u64) -> bool {
        let block_index = self.hash_to_block_index(hash);
        self.0[block_index].check(hash as u32)
    }

    /// Return the total in-memory size of this bloom filter in bytes
    pub(crate) fn estimated_memory_size(&self) -> usize {
        self.0.capacity() * std::mem::size_of::<Block>()
    }
}

// per spec we use xxHash with seed=0
const SEED: u64 = 0;

#[inline]
fn hash_as_bytes<A: AsBytes + ?Sized>(value: &A) -> u64 {
    XxHash64::oneshot(SEED, value.as_bytes())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_hash_bytes() {
        assert_eq!(hash_as_bytes(""), 17241709254077376921);
    }

    #[test]
    fn test_mask_set_quick_check() {
        for i in 0..1_000_000 {
            let result = Block::mask(i);
            assert!(result.0.iter().all(|&x| x.is_power_of_two()));
        }
    }

    #[test]
    fn test_block_insert_and_check() {
        for i in 0..1_000_000 {
            let mut block = Block::ZERO;
            block.insert(i);
            assert!(block.check(i));
        }
    }

    #[test]
    fn test_sbbf_insert_and_check() {
        let mut sbbf = Sbbf(vec![Block::ZERO; 1_000]);
        for i in 0..1_000_000 {
            sbbf.insert(&i);
            assert!(sbbf.check(&i));
        }
    }

    #[test]
    fn test_with_fixture() {
        // bloom filter produced by parquet-mr/spark for a column of i64 f"a{i}" for i in 0..10
        let bitset: &[u8] = &[
            200, 1, 80, 20, 64, 68, 8, 109, 6, 37, 4, 67, 144, 80, 96, 32, 8, 132, 43, 33, 0, 5,
            99, 65, 2, 0, 224, 44, 64, 78, 96, 4,
        ];
        let sbbf = Sbbf::new(bitset);
        for a in 0..10i64 {
            let value = format!("a{a}");
            assert!(sbbf.check(&value.as_str()));
        }
    }

    /// Test the assumption that the bloom filter header size does not exceed
    /// SBBF_HEADER_SIZE_ESTIMATE: the header is encoded as four thrift compact fields,
    /// each taking 1-5 bytes, so it is at most 20 bytes altogether.
    #[test]
    fn test_bloom_filter_header_size_assumption() {
        let buffer: &[u8; 16] = &[21, 64, 28, 28, 0, 0, 28, 28, 0, 0, 28, 28, 0, 0, 0, 99];
        let (
            BloomFilterHeader {
                algorithm,
                compression,
                hash,
                num_bytes,
            },
            read_length,
        ) = read_bloom_filter_header_and_length(Bytes::copy_from_slice(buffer)).unwrap();
        assert_eq!(read_length, 15);
        assert_eq!(
            algorithm,
            BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {})
        );
        assert_eq!(
            compression,
            BloomFilterCompression::UNCOMPRESSED(Uncompressed {})
        );
        assert_eq!(hash, BloomFilterHash::XXHASH(XxHash {}));
        assert_eq!(num_bytes, 32_i32);
        assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE);
    }

    #[test]
    fn test_optimal_num_of_bytes() {
        for (input, expected) in &[
            (0, 32),
            (9, 32),
            (31, 32),
            (32, 32),
            (33, 64),
            (99, 128),
            (1024, 1024),
            (999_000_000, 128 * 1024 * 1024),
        ] {
            assert_eq!(*expected, optimal_num_of_bytes(*input));
        }
    }

    #[test]
    fn test_num_of_bits_from_ndv_fpp() {
        for (fpp, ndv, num_bits) in &[
            (0.1, 10, 57),
            (0.01, 10, 96),
            (0.001, 10, 146),
            (0.1, 100, 577),
            (0.01, 100, 968),
            (0.001, 100, 1460),
            (0.1, 1000, 5772),
            (0.01, 1000, 9681),
            (0.001, 1000, 14607),
            (0.1, 10000, 57725),
            (0.01, 10000, 96815),
            (0.001, 10000, 146076),
            (0.1, 100000, 577254),
            (0.01, 100000, 968152),
            (0.001, 100000, 1460769),
            (0.1, 1000000, 5772541),
            (0.01, 1000000, 9681526),
            (0.001, 1000000, 14607697),
            (1e-50, 1_000_000_000_000, 14226231280773240832),
        ] {
            assert_eq!(*num_bits, num_of_bits_from_ndv_fpp(*ndv, *fpp) as u64);
        }
    }
}
521}