parquet/bloom_filter/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Bloom filter implementation specific to Parquet, as described
//! in the [spec][parquet-bf-spec].
//!
//! # Bloom Filter Size
//!
//! Parquet uses the [Split Block Bloom Filter][sbbf-paper] (SBBF) as its bloom filter
//! implementation. For each column for which bloom filters are enabled, the offset and length
//! of an SBBF is stored in the metadata for each row group in the parquet file. The size of
//! each filter is initialized using a calculation based on the desired number of distinct
//! values (NDV) and false positive probability (FPP). The FPP for an SBBF can be approximated
//! as<sup>[1][bf-formulae]</sup>:
//!
//! ```text
//! f = (1 - e^(-k * n / m))^k
//! ```
//!
//! where `f` is the FPP, `k` the number of hash functions, `n` the NDV, and `m` the total number
//! of bits in the bloom filter. This can be re-arranged to determine the total number of bits
//! required to achieve a given FPP and NDV:
//!
//! ```text
//! m = -k * n / ln(1 - f^(1/k))
//! ```
//!
//! SBBFs use eight hash functions to cleanly fit in SIMD lanes<sup>[2][sbbf-paper]</sup>, so
//! `k` is set to 8. The SBBF spreads those `m` bits across a set of `b` blocks that
//! are each 256 bits, i.e., 32 bytes, in size. The number of blocks is chosen as:
//!
//! ```text
//! b = NP2(m/8) / 32
//! ```
//!
//! where `NP2` denotes *the next power of two*, and `m` is divided by 8 to convert bits to bytes.
//! Here is a table of calculated sizes for various FPP and NDV:
//!
//! | NDV       | FPP       | b       | Size (KB) |
//! |-----------|-----------|---------|-----------|
//! | 10,000    | 0.1       | 256     | 8         |
//! | 10,000    | 0.01      | 512     | 16        |
//! | 10,000    | 0.001     | 1,024   | 32        |
//! | 10,000    | 0.0001    | 1,024   | 32        |
//! | 100,000   | 0.1       | 4,096   | 128       |
//! | 100,000   | 0.01      | 4,096   | 128       |
//! | 100,000   | 0.001     | 8,192   | 256       |
//! | 100,000   | 0.0001    | 16,384  | 512       |
//! | 100,000   | 0.00001   | 16,384  | 512       |
//! | 1,000,000 | 0.1       | 32,768  | 1,024     |
//! | 1,000,000 | 0.01      | 65,536  | 2,048     |
//! | 1,000,000 | 0.001     | 65,536  | 2,048     |
//! | 1,000,000 | 0.0001    | 131,072 | 4,096     |
//! | 1,000,000 | 0.00001   | 131,072 | 4,096     |
//! | 1,000,000 | 0.000001  | 262,144 | 8,192     |
//!
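//! # Enabling Bloom Filters
//!
//! Bloom filters are configured per column through the writer properties. A minimal sketch,
//! assuming the `set_bloom_filter_*` methods on the
//! [`WriterProperties`](crate::file::properties::WriterProperties) builder:
//!
//! ```no_run
//! use parquet::file::properties::WriterProperties;
//!
//! // Size each filter for roughly 10,000 distinct values at a 1% false
//! // positive probability; per the table above this yields a 16 KB filter.
//! let props = WriterProperties::builder()
//!     .set_bloom_filter_enabled(true)
//!     .set_bloom_filter_ndv(10_000)
//!     .set_bloom_filter_fpp(0.01)
//!     .build();
//! ```
//!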
//! [parquet-bf-spec]: https://github.com/apache/parquet-format/blob/master/BloomFilter.md
//! [sbbf-paper]: https://arxiv.org/pdf/2101.01719
//! [bf-formulae]: http://tfk.mit.edu/pdf/bloom.pdf

use crate::data_type::AsBytes;
use crate::errors::ParquetError;
use crate::file::metadata::ColumnChunkMetaData;
use crate::file::reader::ChunkReader;
use crate::format::{
    BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
    SplitBlockAlgorithm, Uncompressed, XxHash,
};
use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
use bytes::Bytes;
use std::io::Write;
use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol};
use twox_hash::XxHash64;

/// Salt as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach).
const SALT: [u32; 8] = [
    0x47b6137b_u32,
    0x44974d91_u32,
    0x8824ad5b_u32,
    0xa2b7289d_u32,
    0x705495c7_u32,
    0x2df1424b_u32,
    0x9efc4947_u32,
    0x5c6bfb31_u32,
];

/// Each block is 256 bits, broken up into eight contiguous "words", each consisting of 32 bits.
/// Each word is thought of as an array of bits; each bit is either "set" or "not set".
#[derive(Debug, Copy, Clone)]
#[repr(transparent)]
struct Block([u32; 8]);
impl Block {
    const ZERO: Block = Block([0; 8]);

    /// Takes a single unsigned 32-bit integer as its argument and returns a block in which each
    /// word has exactly one bit set.
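    ///
    /// Word `i` sets the bit selected by the top five bits of `x.wrapping_mul(SALT[i])`,
    /// i.e. bit `(x * SALT[i]) >> 27` of that word.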
    fn mask(x: u32) -> Self {
        let mut result = [0_u32; 8];
        for i in 0..8 {
            // wrapping instead of checking for overflow
            let y = x.wrapping_mul(SALT[i]);
            let y = y >> 27;
            result[i] = 1 << y;
        }
        Self(result)
    }

    #[inline]
    #[cfg(not(target_endian = "little"))]
    fn to_ne_bytes(self) -> [u8; 32] {
        // SAFETY: [u32; 8] and [u8; 32] have the same size and neither has invalid bit patterns.
        unsafe { std::mem::transmute(self.0) }
    }

    #[inline]
    #[cfg(not(target_endian = "little"))]
    fn to_le_bytes(self) -> [u8; 32] {
        self.swap_bytes().to_ne_bytes()
    }

    #[inline]
    #[cfg(not(target_endian = "little"))]
    fn swap_bytes(mut self) -> Self {
        self.0.iter_mut().for_each(|x| *x = x.swap_bytes());
        self
    }

    /// Sets every bit in the block that is also set in the result of `mask(hash)`.
    fn insert(&mut self, hash: u32) {
        let mask = Self::mask(hash);
        for i in 0..8 {
            self[i] |= mask[i];
        }
    }

    /// Returns true when every bit that is set in the result of `mask(hash)` is also set in the block.
    fn check(&self, hash: u32) -> bool {
        let mask = Self::mask(hash);
        for i in 0..8 {
            if self[i] & mask[i] == 0 {
                return false;
            }
        }
        true
    }
}

impl std::ops::Index<usize> for Block {
    type Output = u32;

    #[inline]
    fn index(&self, index: usize) -> &Self::Output {
        self.0.index(index)
    }
}

impl std::ops::IndexMut<usize> for Block {
    #[inline]
    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
        self.0.index_mut(index)
    }
}

/// A split block Bloom filter.
///
/// This structure is created from the [`crate::file::properties::BloomFilterProperties`]
/// set via [`crate::file::properties::WriterProperties`]; its construction is therefore
/// not exposed directly.
#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);

pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;

/// Given an initial offset and a byte buffer, try to read a bloom filter header; returns
/// both the header and the offset right after it (where the bitset starts).
pub(crate) fn chunk_read_bloom_filter_header_and_offset(
    offset: u64,
    buffer: Bytes,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
    let (header, length) = read_bloom_filter_header_and_length(buffer)?;
    Ok((header, offset + length))
}

/// Given a [Bytes] buffer, try to read a bloom filter header; returns both the header and
/// the serialized length of the header.
#[inline]
pub(crate) fn read_bloom_filter_header_and_length(
    buffer: Bytes,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
    let total_length = buffer.len();
    let mut prot = TCompactSliceInputProtocol::new(buffer.as_ref());
    let header = BloomFilterHeader::read_from_in_protocol(&mut prot)
        .map_err(|e| ParquetError::General(format!("Could not read bloom filter header: {e}")))?;
    Ok((header, (total_length - prot.as_slice().len()) as u64))
}

pub(crate) const BITSET_MIN_LENGTH: usize = 32;
pub(crate) const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;

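/// Clamp the requested byte count to `[BITSET_MIN_LENGTH, BITSET_MAX_LENGTH]` and round it up
/// to the next power of two, implementing `NP2` from the module documentation.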
#[inline]
fn optimal_num_of_bytes(num_bytes: usize) -> usize {
    let num_bytes = num_bytes.min(BITSET_MAX_LENGTH);
    let num_bytes = num_bytes.max(BITSET_MIN_LENGTH);
    num_bytes.next_power_of_two()
}

// see http://algo2.iti.kit.edu/documents/cacheefficientbloomfilters-jea.pdf
// given fpp = (1 - e^(-k * n / m)) ^ k
// we have m = -k * n / ln(1 - fpp ^ (1 / k))
// where k = number of hash functions, m = number of bits, n = number of distinct values
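// e.g. ndv = 10,000 and fpp = 0.01 give m ≈ 96,815 bits ≈ 12,102 bytes, which
// optimal_num_of_bytes then rounds up to 16,384 bytes (16 KB), matching the table above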
#[inline]
fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
    let num_bits = -8.0 * ndv as f64 / (1.0 - fpp.powf(1.0 / 8.0)).ln();
    num_bits as usize
}

impl Sbbf {
    /// Create a new [Sbbf] with the given number of distinct values and false positive probability.
    /// Will return an error if `fpp` is greater than or equal to 1.0 or less than 0.0.
    pub(crate) fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self, ParquetError> {
        if !(0.0..1.0).contains(&fpp) {
            return Err(ParquetError::General(format!(
                "False positive probability must be between 0.0 and 1.0, got {fpp}"
            )));
        }
        let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
        Ok(Self::new_with_num_of_bytes(num_bits / 8))
    }

    /// Create a new [Sbbf] with the given number of bytes; the exact number of bytes will be
    /// rounded up to the next power of two bounded by [BITSET_MIN_LENGTH] and [BITSET_MAX_LENGTH].
    pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
        let num_bytes = optimal_num_of_bytes(num_bytes);
        assert_eq!(num_bytes % size_of::<Block>(), 0);
        let num_blocks = num_bytes / size_of::<Block>();
        let bitset = vec![Block::ZERO; num_blocks];
        Self(bitset)
    }

    pub(crate) fn new(bitset: &[u8]) -> Self {
        let data = bitset
            .chunks_exact(4 * 8)
            .map(|chunk| {
                let mut block = Block::ZERO;
                for (i, word) in chunk.chunks_exact(4).enumerate() {
                    block[i] = u32::from_le_bytes(word.try_into().unwrap());
                }
                block
            })
            .collect::<Vec<Block>>();
        Self(data)
    }

    /// Write the bloom filter data (header and then bitset) to the output. This doesn't
    /// flush the writer in order to boost performance of bulk writing all blocks. Caller
    /// must remember to flush the writer.
    pub(crate) fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        let mut protocol = TCompactOutputProtocol::new(&mut writer);
        let header = self.header();
        header.write_to_out_protocol(&mut protocol).map_err(|e| {
            ParquetError::General(format!("Could not write bloom filter header: {e}"))
        })?;
        protocol.flush()?;
        self.write_bitset(&mut writer)?;
        Ok(())
    }

    /// Write the bitset in serialized form to the writer.
    #[cfg(not(target_endian = "little"))]
    fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        for block in &self.0 {
            writer
                .write_all(block.to_le_bytes().as_slice())
                .map_err(|e| {
                    ParquetError::General(format!("Could not write bloom filter bit set: {e}"))
                })?;
        }
        Ok(())
    }

    /// Write the bitset in serialized form to the writer.
    #[cfg(target_endian = "little")]
    fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        // SAFETY: Block is repr(transparent) over [u32; 8], so &[Block] can be reinterpreted as
        // a byte slice of 32 bytes per block; on little-endian targets the in-memory layout
        // already matches the little-endian serialized form.
        let slice = unsafe {
            std::slice::from_raw_parts(
                self.0.as_ptr() as *const u8,
                self.0.len() * size_of::<Block>(),
            )
        };
        writer.write_all(slice).map_err(|e| {
            ParquetError::General(format!("Could not write bloom filter bit set: {e}"))
        })?;
        Ok(())
    }

    /// Create and populate a [`BloomFilterHeader`] from this bitset for writing to serialized form
    fn header(&self) -> BloomFilterHeader {
        BloomFilterHeader {
            // 8 u32 words per block, 4 bytes per word
            num_bytes: self.0.len() as i32 * 4 * 8,
            algorithm: BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}),
            hash: BloomFilterHash::XXHASH(XxHash {}),
            compression: BloomFilterCompression::UNCOMPRESSED(Uncompressed {}),
        }
    }

    /// Read this column's bloom filter, if any, from the given reader, using the offset and
    /// optional length recorded in the column chunk metadata.
    pub(crate) fn read_from_column_chunk<R: ChunkReader>(
        column_metadata: &ColumnChunkMetaData,
        reader: &R,
    ) -> Result<Option<Self>, ParquetError> {
        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => reader.get_bytes(offset, length as usize),
            None => reader.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        match header.algorithm {
            BloomFilterAlgorithm::BLOCK(_) => {
                // this match exists to future proof the singleton algorithm enum
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED(_) => {
                // this match exists to future proof the singleton compression enum
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH(_) => {
                // this match exists to future proof the singleton hash enum
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice((bitset_offset - offset) as usize..),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                reader.get_bytes(bitset_offset, bitset_length)?
            }
        };

        Ok(Some(Self::new(&bitset)))
    }

    #[inline]
    fn hash_to_block_index(&self, hash: u64) -> usize {
        // Map the upper 32 bits of the hash onto [0, number of blocks) without a modulo:
        // both operands are < 2^32, so the product fits in a u64 and the multiplication can
        // never actually saturate; `saturating_mul` just avoids the unstable `unchecked_mul`.
        (((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
    }

    /// Insert an [AsBytes] value into the filter
    pub fn insert<T: AsBytes + ?Sized>(&mut self, value: &T) {
        self.insert_hash(hash_as_bytes(value));
    }

    /// Insert a hash into the filter
    fn insert_hash(&mut self, hash: u64) {
        let block_index = self.hash_to_block_index(hash);
        self.0[block_index].insert(hash as u32)
    }

    /// Check if an [AsBytes] value is probably present or definitely absent in the filter
    pub fn check<T: AsBytes>(&self, value: &T) -> bool {
        self.check_hash(hash_as_bytes(value))
    }

    /// Check if a hash is in the filter. May return
    /// true for a hash that was never inserted (a "false positive"), but
    /// will never return false for a hash that was inserted (no false negatives).
    fn check_hash(&self, hash: u64) -> bool {
        let block_index = self.hash_to_block_index(hash);
        self.0[block_index].check(hash as u32)
    }

    /// Return the total in-memory size of this bloom filter, in bytes
    pub(crate) fn estimated_memory_size(&self) -> usize {
        self.0.capacity() * std::mem::size_of::<Block>()
    }
}

// per spec we use xxHash with seed=0
const SEED: u64 = 0;

#[inline]
fn hash_as_bytes<A: AsBytes + ?Sized>(value: &A) -> u64 {
    XxHash64::oneshot(SEED, value.as_bytes())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_hash_bytes() {
        assert_eq!(hash_as_bytes(""), 17241709254077376921);
    }

    #[test]
    fn test_mask_set_quick_check() {
        for i in 0..1_000_000 {
            let result = Block::mask(i);
            assert!(result.0.iter().all(|&x| x.is_power_of_two()));
        }
    }

    #[test]
    fn test_block_insert_and_check() {
        for i in 0..1_000_000 {
            let mut block = Block::ZERO;
            block.insert(i);
            assert!(block.check(i));
        }
    }

    #[test]
    fn test_sbbf_insert_and_check() {
        let mut sbbf = Sbbf(vec![Block::ZERO; 1_000]);
        for i in 0..1_000_000 {
            sbbf.insert(&i);
            assert!(sbbf.check(&i));
        }
    }
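
    /// A round-trip sketch (not part of the original suite): serialize a filter with `write`,
    /// strip the header back off with `read_bloom_filter_header_and_length`, and rebuild the
    /// bitset with `Sbbf::new`, checking that membership is preserved.
    #[test]
    fn test_write_read_round_trip() {
        let mut sbbf = Sbbf::new_with_num_of_bytes(1024);
        for i in 0..100_i64 {
            sbbf.insert(&i);
        }
        let mut buffer = Vec::new();
        sbbf.write(&mut buffer).unwrap();
        let (header, header_len) =
            read_bloom_filter_header_and_length(Bytes::from(buffer.clone())).unwrap();
        assert_eq!(header.num_bytes, 1024);
        let rebuilt = Sbbf::new(&buffer[header_len as usize..]);
        for i in 0..100_i64 {
            assert!(rebuilt.check(&i));
        }
    }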

    #[test]
    fn test_with_fixture() {
        // bloom filter produced by parquet-mr/spark for a column containing the strings f"a{i}" for i in 0..10
        let bitset: &[u8] = &[
            200, 1, 80, 20, 64, 68, 8, 109, 6, 37, 4, 67, 144, 80, 96, 32, 8, 132, 43, 33, 0, 5,
            99, 65, 2, 0, 224, 44, 64, 78, 96, 4,
        ];
        let sbbf = Sbbf::new(bitset);
        for a in 0..10i64 {
            let value = format!("a{a}");
            assert!(sbbf.check(&value.as_str()));
        }
    }

    /// Test the assumption that the bloom filter header size does not exceed
    /// SBBF_HEADER_SIZE_ESTIMATE. Essentially, we are testing that the serialized struct is
    /// packed with 4 i32 fields, each of which can take 1-5 bytes, so altogether it is at
    /// most 20 bytes.
    #[test]
    fn test_bloom_filter_header_size_assumption() {
        let buffer: &[u8; 16] = &[21, 64, 28, 28, 0, 0, 28, 28, 0, 0, 28, 28, 0, 0, 0, 99];
        let (
            BloomFilterHeader {
                algorithm,
                compression,
                hash,
                num_bytes,
            },
            read_length,
        ) = read_bloom_filter_header_and_length(Bytes::copy_from_slice(buffer)).unwrap();
        assert_eq!(read_length, 15);
        assert_eq!(
            algorithm,
            BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {})
        );
        assert_eq!(
            compression,
            BloomFilterCompression::UNCOMPRESSED(Uncompressed {})
        );
        assert_eq!(hash, BloomFilterHash::XXHASH(XxHash {}));
        assert_eq!(num_bytes, 32_i32);
        assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE);
    }

    #[test]
    fn test_optimal_num_of_bytes() {
        for (input, expected) in &[
            (0, 32),
            (9, 32),
            (31, 32),
            (32, 32),
            (33, 64),
            (99, 128),
            (1024, 1024),
            (999_000_000, 128 * 1024 * 1024),
        ] {
            assert_eq!(*expected, optimal_num_of_bytes(*input));
        }
    }

    #[test]
    fn test_num_of_bits_from_ndv_fpp() {
        for (fpp, ndv, num_bits) in &[
            (0.1, 10, 57),
            (0.01, 10, 96),
            (0.001, 10, 146),
            (0.1, 100, 577),
            (0.01, 100, 968),
            (0.001, 100, 1460),
            (0.1, 1000, 5772),
            (0.01, 1000, 9681),
            (0.001, 1000, 14607),
            (0.1, 10000, 57725),
            (0.01, 10000, 96815),
            (0.001, 10000, 146076),
            (0.1, 100000, 577254),
            (0.01, 100000, 968152),
            (0.001, 100000, 1460769),
            (0.1, 1000000, 5772541),
            (0.01, 1000000, 9681526),
            (0.001, 1000000, 14607697),
            (1e-50, 1_000_000_000_000, 14226231280773240832),
        ] {
            assert_eq!(*num_bits, num_of_bits_from_ndv_fpp(*ndv, *fpp) as u64);
        }
    }
}
536}