parquet/bloom_filter/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Bloom filter implementation specific to Parquet, as described
//! in the [spec][parquet-bf-spec].
//!
//! # Bloom Filter Size
//!
//! Parquet uses the [Split Block Bloom Filter][sbbf-paper] (SBBF) as its bloom filter
//! implementation. For each column with bloom filters enabled, the offset and length of an SBBF
//! are stored in the metadata for each row group in the parquet file. The size of each filter is
//! initialized using a calculation based on the desired number of distinct values (NDV) and false
//! positive probability (FPP). The FPP for an SBBF can be approximated as<sup>[1][bf-formulae]</sup>:
//!
//! ```text
//! f = (1 - e^(-k * n / m))^k
//! ```
//!
//! Where `f` is the FPP, `k` the number of hash functions, `n` the NDV, and `m` the total number
//! of bits in the bloom filter. This can be rearranged to determine the total number of bits
//! required to achieve a given FPP and NDV:
//!
//! ```text
//! m = -k * n / ln(1 - f^(1/k))
//! ```
//!
//! SBBFs use eight hash functions to cleanly fit in SIMD lanes<sup>[2][sbbf-paper]</sup>, therefore
//! `k` is set to 8. The SBBF will spread those `m` bits across a set of `b` blocks that
//! are each 256 bits, i.e., 32 bytes, in size. The number of blocks is chosen as:
//!
//! ```text
//! b = NP2(m/8) / 32
//! ```
//!
//! Where `NP2` denotes *the next power of two*, and `m` is divided by 8 to be represented as bytes.
//!
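//! For example, with NDV = 10,000 and FPP = 0.01 (a sketch of the arithmetic above, computed
//! directly rather than through this module's private helpers):
//!
//! ```
//! let m = -8.0 * 10_000.0_f64 / (1.0 - 0.01_f64.powf(1.0 / 8.0)).ln(); // ≈ 96,815 bits
//! let b = ((m / 8.0) as usize).next_power_of_two() / 32; // NP2(12,102 bytes) = 16,384 bytes
//! assert_eq!(b, 512); // 512 blocks * 32 bytes = 16 KB, matching the table below
//! ```
//!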
//! Here is a table of calculated sizes for various FPP and NDV:
//!
//! | NDV       | FPP       | b       | Size (KB) |
//! |-----------|-----------|---------|-----------|
//! | 10,000    | 0.1       | 256     | 8         |
//! | 10,000    | 0.01      | 512     | 16        |
//! | 10,000    | 0.001     | 1,024   | 32        |
//! | 10,000    | 0.0001    | 1,024   | 32        |
//! | 100,000   | 0.1       | 4,096   | 128       |
//! | 100,000   | 0.01      | 4,096   | 128       |
//! | 100,000   | 0.001     | 8,192   | 256       |
//! | 100,000   | 0.0001    | 16,384  | 512       |
//! | 100,000   | 0.00001   | 16,384  | 512       |
//! | 1,000,000 | 0.1       | 32,768  | 1,024     |
//! | 1,000,000 | 0.01      | 65,536  | 2,048     |
//! | 1,000,000 | 0.001     | 65,536  | 2,048     |
//! | 1,000,000 | 0.0001    | 131,072 | 4,096     |
//! | 1,000,000 | 0.00001   | 131,072 | 4,096     |
//! | 1,000,000 | 0.000001  | 262,144 | 8,192     |
//!
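//! # Hashing
//!
//! Values are hashed with xxHash64 (seed 0) as required by the spec. The upper 32 bits of the
//! hash select a block via a multiply-shift reduction, and the lower 32 bits drive the salted
//! mask that sets one bit in each of the block's eight words. A minimal sketch of the index
//! math (mirroring this module's private helpers; not a public API):
//!
//! ```
//! use twox_hash::XxHash64;
//!
//! let hash = XxHash64::oneshot(0, b"some value"); // seed = 0 per the spec
//! let num_blocks = 512_u64;
//! // map the upper 32 bits uniformly onto [0, num_blocks)
//! let block_index = (((hash >> 32) * num_blocks) >> 32) as usize;
//! // the lower 32 bits feed the salted mask within the selected 256-bit block
//! let key = hash as u32;
//! assert!(block_index < 512);
//! # let _ = key;
//! ```
//!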
//! [parquet-bf-spec]: https://github.com/apache/parquet-format/blob/master/BloomFilter.md
//! [sbbf-paper]: https://arxiv.org/pdf/2101.01719
//! [bf-formulae]: http://tfk.mit.edu/pdf/bloom.pdf

use crate::data_type::AsBytes;
use crate::errors::ParquetError;
use crate::file::metadata::ColumnChunkMetaData;
use crate::file::reader::ChunkReader;
use crate::format::{
    BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
    SplitBlockAlgorithm, Uncompressed, XxHash,
};
use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
use bytes::Bytes;
use std::io::Write;
use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol};
use twox_hash::XxHash64;

/// Salt as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach).
const SALT: [u32; 8] = [
    0x47b6137b_u32,
    0x44974d91_u32,
    0x8824ad5b_u32,
    0xa2b7289d_u32,
    0x705495c7_u32,
    0x2df1424b_u32,
    0x9efc4947_u32,
    0x5c6bfb31_u32,
];

/// Each block is 256 bits, broken up into eight contiguous "words", each consisting of 32 bits.
/// Each word is thought of as an array of bits; each bit is either "set" or "not set".
#[derive(Debug, Copy, Clone)]
#[repr(transparent)]
struct Block([u32; 8]);
impl Block {
    const ZERO: Block = Block([0; 8]);

    /// Takes a single unsigned 32-bit integer and returns a block in which each
    /// word has exactly one bit set.
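    /// Concretely, word `i` sets bit `(x.wrapping_mul(SALT[i])) >> 27`: the top five bits of
    /// the salted product select one of the 32 bit positions within that word.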
    fn mask(x: u32) -> Self {
        let mut result = [0_u32; 8];
        for i in 0..8 {
            // wrapping instead of checking for overflow
            let y = x.wrapping_mul(SALT[i]);
            let y = y >> 27;
            result[i] = 1 << y;
        }
        Self(result)
    }

    #[inline]
    #[cfg(not(target_endian = "little"))]
    fn to_le_bytes(self) -> [u8; 32] {
        self.swap_bytes().to_ne_bytes()
    }

    #[inline]
    #[cfg(not(target_endian = "little"))]
    fn swap_bytes(mut self) -> Self {
        self.0.iter_mut().for_each(|x| *x = x.swap_bytes());
        self
    }

    /// Returns the native-endian bytes of the block.
    #[inline]
    #[cfg(not(target_endian = "little"))]
    fn to_ne_bytes(self) -> [u8; 32] {
        // Safety: Block is repr(transparent) over [u32; 8], which is exactly 32 bytes.
        unsafe { std::mem::transmute(self) }
    }

    /// Sets every bit in the block that is also set in the result of `mask`.
    fn insert(&mut self, hash: u32) {
        let mask = Self::mask(hash);
        for i in 0..8 {
            self[i] |= mask[i];
        }
    }

    /// Returns true when every bit that is set in the result of `mask` is also set in the block.
    fn check(&self, hash: u32) -> bool {
        let mask = Self::mask(hash);
        for i in 0..8 {
            if self[i] & mask[i] == 0 {
                return false;
            }
        }
        true
    }
}
154
155impl std::ops::Index<usize> for Block {
156    type Output = u32;
157
158    #[inline]
159    fn index(&self, index: usize) -> &Self::Output {
160        self.0.index(index)
161    }
162}
163
164impl std::ops::IndexMut<usize> for Block {
165    #[inline]
166    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
167        self.0.index_mut(index)
168    }
169}
170
/// A split block Bloom filter.
///
/// The creation of this structure is based on the [`crate::file::properties::BloomFilterProperties`]
/// struct set via [`crate::file::properties::WriterProperties`] and is thus hidden by default.
#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);

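// The serialized header is four thrift-compact fields, each encoding to at most five bytes,
// so 20 bytes is a safe upper bound (see test_bloom_filter_header_size_assumption below).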
pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;

/// Given an initial offset and a byte buffer, try to read out a bloom filter header; returns
/// both the header and the offset just past it (where the bitset begins).
pub(crate) fn chunk_read_bloom_filter_header_and_offset(
    offset: u64,
    buffer: Bytes,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
    let (header, length) = read_bloom_filter_header_and_length(buffer)?;
    Ok((header, offset + length))
}

/// Given a [Bytes] buffer, try to read out a bloom filter header; returns both the header
/// and the length of the serialized header.
#[inline]
pub(crate) fn read_bloom_filter_header_and_length(
    buffer: Bytes,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
    let total_length = buffer.len();
    let mut prot = TCompactSliceInputProtocol::new(buffer.as_ref());
    let header = BloomFilterHeader::read_from_in_protocol(&mut prot)
        .map_err(|e| ParquetError::General(format!("Could not read bloom filter header: {e}")))?;
    Ok((header, (total_length - prot.as_slice().len()) as u64))
}

pub(crate) const BITSET_MIN_LENGTH: usize = 32;
pub(crate) const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;

#[inline]
fn optimal_num_of_bytes(num_bytes: usize) -> usize {
    let num_bytes = num_bytes.min(BITSET_MAX_LENGTH);
    let num_bytes = num_bytes.max(BITSET_MIN_LENGTH);
    num_bytes.next_power_of_two()
}

// see http://algo2.iti.kit.edu/documents/cacheefficientbloomfilters-jea.pdf
// given fpp = (1 - e^(-k * n / m)) ^ k
// we have m = - k * n / ln(1 - fpp ^ (1 / k))
// where k = number of hash functions, m = number of bits, n = number of distinct values
#[inline]
fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
    let num_bits = -8.0 * ndv as f64 / (1.0 - fpp.powf(1.0 / 8.0)).ln();
    num_bits as usize
}

impl Sbbf {
    /// Create a new [Sbbf] with given number of distinct values and false positive probability.
    /// Will return an error if `fpp` is greater than or equal to 1.0 or less than 0.0.
    pub(crate) fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self, ParquetError> {
        if !(0.0..1.0).contains(&fpp) {
            return Err(ParquetError::General(format!(
                "False positive probability must be between 0.0 and 1.0, got {fpp}"
            )));
        }
        let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
        Ok(Self::new_with_num_of_bytes(num_bits / 8))
    }

    /// Create a new [Sbbf] with the given number of bytes; the exact number of bytes will be
    /// adjusted to the next power of two bounded by [BITSET_MIN_LENGTH] and [BITSET_MAX_LENGTH].
    pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
        let num_bytes = optimal_num_of_bytes(num_bytes);
        assert_eq!(num_bytes % size_of::<Block>(), 0);
        let num_blocks = num_bytes / size_of::<Block>();
        let bitset = vec![Block::ZERO; num_blocks];
        Self(bitset)
    }

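    /// Create a new [Sbbf] from an existing bitset. Each 32-byte chunk is decoded as one
    /// [Block] of eight little-endian u32 words; trailing bytes that do not fill a whole
    /// block are ignored.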
    pub(crate) fn new(bitset: &[u8]) -> Self {
        let data = bitset
            .chunks_exact(4 * 8)
            .map(|chunk| {
                let mut block = Block::ZERO;
                for (i, word) in chunk.chunks_exact(4).enumerate() {
                    block[i] = u32::from_le_bytes(word.try_into().unwrap());
                }
                block
            })
            .collect::<Vec<Block>>();
        Self(data)
    }

    /// Write the bloom filter data (header and then bitset) to the output. This doesn't
    /// flush the writer in order to boost performance of bulk writing all blocks. Caller
    /// must remember to flush the writer.
    pub(crate) fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        let mut protocol = TCompactOutputProtocol::new(&mut writer);
        let header = self.header();
        header.write_to_out_protocol(&mut protocol).map_err(|e| {
            ParquetError::General(format!("Could not write bloom filter header: {e}"))
        })?;
        protocol.flush()?;
        self.write_bitset(&mut writer)?;
        Ok(())
    }

    /// Write the bitset in serialized form to the writer.
    #[cfg(not(target_endian = "little"))]
    fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        for block in &self.0 {
            writer
                .write_all(block.to_le_bytes().as_slice())
                .map_err(|e| {
                    ParquetError::General(format!("Could not write bloom filter bit set: {e}"))
                })?;
        }
        Ok(())
    }

    /// Write the bitset in serialized form to the writer.
    #[cfg(target_endian = "little")]
    fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        // Safety: Block is repr(transparent) and [u32; 8] can be reinterpreted as [u8; 32].
        let slice = unsafe {
            std::slice::from_raw_parts(
                self.0.as_ptr() as *const u8,
                self.0.len() * size_of::<Block>(),
            )
        };
        writer.write_all(slice).map_err(|e| {
            ParquetError::General(format!("Could not write bloom filter bit set: {e}"))
        })?;
        Ok(())
    }

    /// Create and populate a [`BloomFilterHeader`] from this bitset for writing to serialized form.
    fn header(&self) -> BloomFilterHeader {
        BloomFilterHeader {
            // 8 i32 words per block, 4 bytes per word
            num_bytes: self.0.len() as i32 * 4 * 8,
            algorithm: BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}),
            hash: BloomFilterHash::XXHASH(XxHash {}),
            compression: BloomFilterCompression::UNCOMPRESSED(Uncompressed {}),
        }
    }

    /// Read a new bloom filter from the given offset in the given reader.
    pub(crate) fn read_from_column_chunk<R: ChunkReader>(
        column_metadata: &ColumnChunkMetaData,
        reader: &R,
    ) -> Result<Option<Self>, ParquetError> {
        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

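        // If the metadata records the filter's length, the header and bitset can be fetched
        // in a single read; otherwise read enough for the header now and fetch the bitset
        // in a second read once its size is known.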
        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => reader.get_bytes(offset, length as usize),
            None => reader.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        match header.algorithm {
            BloomFilterAlgorithm::BLOCK(_) => {
                // this match exists to future-proof the singleton algorithm enum
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED(_) => {
                // this match exists to future-proof the singleton compression enum
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH(_) => {
                // this match exists to future-proof the singleton hash enum
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice((bitset_offset - offset) as usize..),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                reader.get_bytes(bitset_offset, bitset_length)?
            }
        };

        Ok(Some(Self::new(&bitset)))
    }

    #[inline]
    fn hash_to_block_index(&self, hash: u64) -> usize {
        // Multiply-shift maps the upper 32 bits of the hash uniformly onto [0, num_blocks).
        // unchecked_mul is unstable, so saturating_mul is used instead; the product always
        // fits in 64 bits here, so it will never actually saturate.
        (((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
    }

    /// Insert an [AsBytes] value into the filter
    pub fn insert<T: AsBytes + ?Sized>(&mut self, value: &T) {
        self.insert_hash(hash_as_bytes(value));
    }

    /// Insert a hash into the filter
    fn insert_hash(&mut self, hash: u64) {
        let block_index = self.hash_to_block_index(hash);
        self.0[block_index].insert(hash as u32)
    }

    /// Check if an [AsBytes] value is probably present or definitely absent in the filter
    pub fn check<T: AsBytes>(&self, value: &T) -> bool {
        self.check_hash(hash_as_bytes(value))
    }

    /// Check if a hash is in the filter. May return
    /// true for a hash that was never inserted (a "false positive"),
    /// but never returns false for a hash that was inserted (no false negatives).
    fn check_hash(&self, hash: u64) -> bool {
        let block_index = self.hash_to_block_index(hash);
        self.0[block_index].check(hash as u32)
    }

    /// Return the total in-memory size of this bloom filter, in bytes
    pub(crate) fn estimated_memory_size(&self) -> usize {
        self.0.capacity() * std::mem::size_of::<Block>()
    }
}

// per spec we use xxHash with seed=0
const SEED: u64 = 0;

#[inline]
fn hash_as_bytes<A: AsBytes + ?Sized>(value: &A) -> u64 {
    XxHash64::oneshot(SEED, value.as_bytes())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_hash_bytes() {
        assert_eq!(hash_as_bytes(""), 17241709254077376921);
    }

    #[test]
    fn test_mask_set_quick_check() {
        for i in 0..1_000_000 {
            let result = Block::mask(i);
            assert!(result.0.iter().all(|&x| x.is_power_of_two()));
        }
    }

    #[test]
    fn test_block_insert_and_check() {
        for i in 0..1_000_000 {
            let mut block = Block::ZERO;
            block.insert(i);
            assert!(block.check(i));
        }
    }

    #[test]
    fn test_sbbf_insert_and_check() {
        let mut sbbf = Sbbf(vec![Block::ZERO; 1_000]);
        for i in 0..1_000_000 {
            sbbf.insert(&i);
            assert!(sbbf.check(&i));
        }
    }

    #[test]
    fn test_with_fixture() {
        // bloom filter produced by parquet-mr/spark for a column of strings f"a{i}" for i in 0..10
        let bitset: &[u8] = &[
            200, 1, 80, 20, 64, 68, 8, 109, 6, 37, 4, 67, 144, 80, 96, 32, 8, 132, 43, 33, 0, 5,
            99, 65, 2, 0, 224, 44, 64, 78, 96, 4,
        ];
        let sbbf = Sbbf::new(bitset);
        for a in 0..10i64 {
            let value = format!("a{a}");
            assert!(sbbf.check(&value.as_str()));
        }
    }

    /// Test the assumption that the bloom filter header size does not exceed
    /// SBBF_HEADER_SIZE_ESTIMATE: the header is packed into four thrift-compact fields,
    /// each encoding to 1-5 bytes, so it is at most 20 bytes altogether.
    #[test]
    fn test_bloom_filter_header_size_assumption() {
        let buffer: &[u8; 16] = &[21, 64, 28, 28, 0, 0, 28, 28, 0, 0, 28, 28, 0, 0, 0, 99];
        let (
            BloomFilterHeader {
                algorithm,
                compression,
                hash,
                num_bytes,
            },
            read_length,
        ) = read_bloom_filter_header_and_length(Bytes::copy_from_slice(buffer)).unwrap();
        assert_eq!(read_length, 15);
        assert_eq!(
            algorithm,
            BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {})
        );
        assert_eq!(
            compression,
            BloomFilterCompression::UNCOMPRESSED(Uncompressed {})
        );
        assert_eq!(hash, BloomFilterHash::XXHASH(XxHash {}));
        assert_eq!(num_bytes, 32_i32);
        assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE);
    }

    #[test]
    fn test_optimal_num_of_bytes() {
        for (input, expected) in &[
            (0, 32),
            (9, 32),
            (31, 32),
            (32, 32),
            (33, 64),
            (99, 128),
            (1024, 1024),
            (999_000_000, 128 * 1024 * 1024),
        ] {
            assert_eq!(*expected, optimal_num_of_bytes(*input));
        }
    }

    #[test]
    fn test_num_of_bits_from_ndv_fpp() {
        for (fpp, ndv, num_bits) in &[
            (0.1, 10, 57),
            (0.01, 10, 96),
            (0.001, 10, 146),
            (0.1, 100, 577),
            (0.01, 100, 968),
            (0.001, 100, 1460),
            (0.1, 1000, 5772),
            (0.01, 1000, 9681),
            (0.001, 1000, 14607),
            (0.1, 10000, 57725),
            (0.01, 10000, 96815),
            (0.001, 10000, 146076),
            (0.1, 100000, 577254),
            (0.01, 100000, 968152),
            (0.001, 100000, 1460769),
            (0.1, 1000000, 5772541),
            (0.01, 1000000, 9681526),
            (0.001, 1000000, 14607697),
            (1e-50, 1_000_000_000_000, 14226231280773240832),
        ] {
            assert_eq!(*num_bits, num_of_bits_from_ndv_fpp(*ndv, *fpp) as u64);
        }
    }
}
529}