// parquet/bloom_filter/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Bloom filter implementation specific to Parquet, as described
19//! in the [spec][parquet-bf-spec].
20//!
21//! # Bloom Filter Size
22//!
23//! Parquet uses the [Split Block Bloom Filter][sbbf-paper] (SBBF) as its bloom filter
24//! implementation. For each column upon which bloom filters are enabled, the offset and length of an SBBF
//! is stored in the metadata for each row group in the parquet file. The size of each filter is
26//! initialized using a calculation based on the desired number of distinct values (NDV) and false
27//! positive probability (FPP). The FPP for a SBBF can be approximated as<sup>[1][bf-formulae]</sup>:
28//!
29//! ```text
30//! f = (1 - e^(-k * n / m))^k
31//! ```
32//!
33//! Where, `f` is the FPP, `k` the number of hash functions, `n` the NDV, and `m` the total number
34//! of bits in the bloom filter. This can be re-arranged to determine the total number of bits
35//! required to achieve a given FPP and NDV:
36//!
37//! ```text
38//! m = -k * n / ln(1 - f^(1/k))
39//! ```
40//!
41//! SBBFs use eight hash functions to cleanly fit in SIMD lanes<sup>[2][sbbf-paper]</sup>, therefore
//! `k` is set to 8. The SBBF will spread those `m` bits across a set of `b` blocks that
43//! are each 256 bits, i.e., 32 bytes, in size. The number of blocks is chosen as:
44//!
45//! ```text
46//! b = NP2(m/8) / 32
47//! ```
48//!
49//! Where, `NP2` denotes *the next power of two*, and `m` is divided by 8 to be represented as bytes.
50//!
51//! Here is a table of calculated sizes for various FPP and NDV:
52//!
53//! | NDV       | FPP       | b       | Size (KB) |
54//! |-----------|-----------|---------|-----------|
55//! | 10,000    | 0.1       | 256     | 8         |
56//! | 10,000    | 0.01      | 512     | 16        |
57//! | 10,000    | 0.001     | 1,024   | 32        |
58//! | 10,000    | 0.0001    | 1,024   | 32        |
59//! | 100,000   | 0.1       | 4,096   | 128       |
60//! | 100,000   | 0.01      | 4,096   | 128       |
61//! | 100,000   | 0.001     | 8,192   | 256       |
62//! | 100,000   | 0.0001    | 16,384  | 512       |
63//! | 100,000   | 0.00001   | 16,384  | 512       |
64//! | 1,000,000 | 0.1       | 32,768  | 1,024     |
65//! | 1,000,000 | 0.01      | 65,536  | 2,048     |
66//! | 1,000,000 | 0.001     | 65,536  | 2,048     |
67//! | 1,000,000 | 0.0001    | 131,072 | 4,096     |
68//! | 1,000,000 | 0.00001   | 131,072 | 4,096     |
69//! | 1,000,000 | 0.000001  | 262,144 | 8,192     |
70//!
71//! [parquet-bf-spec]: https://github.com/apache/parquet-format/blob/master/BloomFilter.md
72//! [sbbf-paper]: https://arxiv.org/pdf/2101.01719
73//! [bf-formulae]: http://tfk.mit.edu/pdf/bloom.pdf
74
75use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
76use crate::data_type::AsBytes;
77use crate::errors::{ParquetError, Result};
78use crate::file::metadata::ColumnChunkMetaData;
79use crate::file::reader::ChunkReader;
80use crate::parquet_thrift::{
81    ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol, ThriftCompactOutputProtocol,
82    ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
83};
84use crate::thrift_struct;
85use bytes::Bytes;
86use std::io::Write;
87use twox_hash::XxHash64;
88
/// Salt as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach).
///
/// One salt per word of a [`Block`]: `Block::mask` multiplies the hash by the word's
/// salt (wrapping) and uses the top five bits of the product as the bit to set in
/// that word.
const SALT: [u32; 8] = [
    0x47b6137b_u32,
    0x44974d91_u32,
    0x8824ad5b_u32,
    0xa2b7289d_u32,
    0x705495c7_u32,
    0x2df1424b_u32,
    0x9efc4947_u32,
    0x5c6bfb31_u32,
];
100
// Generated via the crate's `thrift_struct!` macro, which produces the struct along
// with its thrift read/write implementations (`read_thrift` / `write_thrift`).
thrift_struct!(
/// Bloom filter header is stored at beginning of Bloom filter data of each column
/// and followed by its bitset.
///
pub struct BloomFilterHeader {
  /// The size of bitset in bytes
  1: required i32 num_bytes;
  /// The algorithm for setting bits.
  2: required BloomFilterAlgorithm algorithm;
  /// The hash function used for Bloom filter
  3: required BloomFilterHash hash;
  /// The compression used in the Bloom filter
  4: required BloomFilterCompression compression;
}
);
116
117/// Each block is 256 bits, broken up into eight contiguous "words", each consisting of 32 bits.
118/// Each word is thought of as an array of bits; each bit is either "set" or "not set".
119#[derive(Debug, Copy, Clone)]
120#[repr(transparent)]
121struct Block([u32; 8]);
122impl Block {
123    const ZERO: Block = Block([0; 8]);
124
125    /// takes as its argument a single unsigned 32-bit integer and returns a block in which each
126    /// word has exactly one bit set.
127    fn mask(x: u32) -> Self {
128        let mut result = [0_u32; 8];
129        for i in 0..8 {
130            // wrapping instead of checking for overflow
131            let y = x.wrapping_mul(SALT[i]);
132            let y = y >> 27;
133            result[i] = 1 << y;
134        }
135        Self(result)
136    }
137
138    #[inline]
139    #[cfg(not(target_endian = "little"))]
140    fn to_ne_bytes(self) -> [u8; 32] {
141        // SAFETY: [u32; 8] and [u8; 32] have the same size and neither has invalid bit patterns.
142        unsafe { std::mem::transmute(self.0) }
143    }
144
145    #[inline]
146    #[cfg(not(target_endian = "little"))]
147    fn to_le_bytes(self) -> [u8; 32] {
148        self.swap_bytes().to_ne_bytes()
149    }
150
151    #[inline]
152    #[cfg(not(target_endian = "little"))]
153    fn swap_bytes(mut self) -> Self {
154        self.0.iter_mut().for_each(|x| *x = x.swap_bytes());
155        self
156    }
157
158    /// setting every bit in the block that was also set in the result from mask
159    fn insert(&mut self, hash: u32) {
160        let mask = Self::mask(hash);
161        for i in 0..8 {
162            self[i] |= mask[i];
163        }
164    }
165
166    /// returns true when every bit that is set in the result of mask is also set in the block.
167    fn check(&self, hash: u32) -> bool {
168        let mask = Self::mask(hash);
169        for i in 0..8 {
170            if self[i] & mask[i] == 0 {
171                return false;
172            }
173        }
174        true
175    }
176}
177
178impl std::ops::Index<usize> for Block {
179    type Output = u32;
180
181    #[inline]
182    fn index(&self, index: usize) -> &Self::Output {
183        self.0.index(index)
184    }
185}
186
187impl std::ops::IndexMut<usize> for Block {
188    #[inline]
189    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
190        self.0.index_mut(index)
191    }
192}
193
/// A split block Bloom filter.
///
/// The single field is the bitset, stored as a vector of 256-bit [`Block`]s.
///
/// The creation of this structure is based on the [`crate::file::properties::BloomFilterProperties`]
/// struct set via [`crate::file::properties::WriterProperties`] and is thus hidden by default.
#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);
200
201pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
202
203/// given an initial offset, and a byte buffer, try to read out a bloom filter header and return
204/// both the header and the offset after it (for bitset).
205pub(crate) fn chunk_read_bloom_filter_header_and_offset(
206    offset: u64,
207    buffer: Bytes,
208) -> Result<(BloomFilterHeader, u64), ParquetError> {
209    let (header, length) = read_bloom_filter_header_and_length(buffer)?;
210    Ok((header, offset + length))
211}
212
213/// given a [Bytes] buffer, try to read out a bloom filter header and return both the header and
214/// length of the header.
215#[inline]
216pub(crate) fn read_bloom_filter_header_and_length(
217    buffer: Bytes,
218) -> Result<(BloomFilterHeader, u64), ParquetError> {
219    let total_length = buffer.len();
220    let mut prot = ThriftSliceInputProtocol::new(buffer.as_ref());
221    let header = BloomFilterHeader::read_thrift(&mut prot)
222        .map_err(|e| ParquetError::General(format!("Could not read bloom filter header: {e}")))?;
223    Ok((header, (total_length - prot.as_slice().len()) as u64))
224}
225
/// Minimum size of the bitset in bytes: one 32-byte block.
pub(crate) const BITSET_MIN_LENGTH: usize = 32;
/// Maximum size of the bitset in bytes: 128 MiB.
pub(crate) const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;

/// Returns the bitset size to allocate for a requested size in bytes: the request is
/// clamped to `[BITSET_MIN_LENGTH, BITSET_MAX_LENGTH]` and rounded up to the next power
/// of two. Both bounds are themselves powers of two, so the result stays within range.
#[inline]
fn optimal_num_of_bytes(num_bytes: usize) -> usize {
    // `clamp` replaces the previous `min`/`max` chain with the idiomatic equivalent.
    num_bytes
        .clamp(BITSET_MIN_LENGTH, BITSET_MAX_LENGTH)
        .next_power_of_two()
}
235
// see http://algo2.iti.kit.edu/documents/cacheefficientbloomfilters-jea.pdf
// given fpp = (1 - e^(-k * n / m)) ^ k
// we have m = - k * n / ln(1 - fpp ^ (1 / k))
// where k = number of hash functions, m = number of bits, n = number of distinct values
#[inline]
fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
    // SBBF fixes the number of hash functions at eight.
    const K: f64 = 8.0;
    let denominator = (1.0 - fpp.powf(1.0 / K)).ln();
    (-K * ndv as f64 / denominator) as usize
}
245
impl Sbbf {
    /// Create a new [Sbbf] with given number of distinct values and false positive probability.
    /// Will return an error if `fpp` is greater than or equal to 1.0 or less than 0.0.
    pub(crate) fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self, ParquetError> {
        if !(0.0..1.0).contains(&fpp) {
            return Err(ParquetError::General(format!(
                "False positive probability must be between 0.0 and 1.0, got {fpp}"
            )));
        }
        let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
        // Bits -> bytes; `new_with_num_of_bytes` bounds the size and rounds it up to a
        // power of two.
        Ok(Self::new_with_num_of_bytes(num_bits / 8))
    }

    /// Create a new [Sbbf] with given number of bytes, the exact number of bytes will be adjusted
    /// to the next power of two bounded by [BITSET_MIN_LENGTH] and [BITSET_MAX_LENGTH].
    pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
        let num_bytes = optimal_num_of_bytes(num_bytes);
        // `optimal_num_of_bytes` returns a power of two >= 32, so this always holds.
        assert_eq!(num_bytes % size_of::<Block>(), 0);
        let num_blocks = num_bytes / size_of::<Block>();
        let bitset = vec![Block::ZERO; num_blocks];
        Self(bitset)
    }

    /// Create a new [Sbbf] from a serialized bitset of little-endian 32-bit words.
    /// Trailing bytes that do not fill a complete 32-byte block are ignored
    /// (`chunks_exact` drops the remainder).
    pub(crate) fn new(bitset: &[u8]) -> Self {
        let data = bitset
            .chunks_exact(4 * 8)
            .map(|chunk| {
                let mut block = Block::ZERO;
                for (i, word) in chunk.chunks_exact(4).enumerate() {
                    block[i] = u32::from_le_bytes(word.try_into().unwrap());
                }
                block
            })
            .collect::<Vec<Block>>();
        Self(data)
    }

    /// Write the bloom filter data (header and then bitset) to the output. This doesn't
    /// flush the writer in order to boost performance of bulk writing all blocks. Caller
    /// must remember to flush the writer.
    pub(crate) fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        let mut protocol = ThriftCompactOutputProtocol::new(&mut writer);
        self.header().write_thrift(&mut protocol).map_err(|e| {
            ParquetError::General(format!("Could not write bloom filter header: {e}"))
        })?;
        self.write_bitset(&mut writer)?;
        Ok(())
    }

    /// Write the bitset in serialized form to the writer.
    ///
    /// Big-endian variant: each word is byte-swapped to the on-disk little-endian
    /// layout before writing.
    #[cfg(not(target_endian = "little"))]
    fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        for block in &self.0 {
            writer
                .write_all(block.to_le_bytes().as_slice())
                .map_err(|e| {
                    ParquetError::General(format!("Could not write bloom filter bit set: {e}"))
                })?;
        }
        Ok(())
    }

    /// Write the bitset in serialized form to the writer.
    ///
    /// Little-endian variant: the in-memory representation already matches the on-disk
    /// layout, so the whole bitset is written with a single `write_all`.
    #[cfg(target_endian = "little")]
    fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        // Safety: Block is repr(transparent) and [u32; 8] can be reinterpreted as [u8; 32].
        let slice = unsafe {
            std::slice::from_raw_parts(
                self.0.as_ptr() as *const u8,
                self.0.len() * size_of::<Block>(),
            )
        };
        writer.write_all(slice).map_err(|e| {
            ParquetError::General(format!("Could not write bloom filter bit set: {e}"))
        })?;
        Ok(())
    }

    /// Create and populate [`BloomFilterHeader`] from this bitset for writing to serialized form
    fn header(&self) -> BloomFilterHeader {
        BloomFilterHeader {
            // 8 i32 per block, 4 bytes per i32
            num_bytes: self.0.len() as i32 * 4 * 8,
            algorithm: BloomFilterAlgorithm::BLOCK,
            hash: BloomFilterHash::XXHASH,
            compression: BloomFilterCompression::UNCOMPRESSED,
        }
    }

    /// Read a new bloom filter from the given offset in the given reader.
    ///
    /// Returns `Ok(None)` when the column chunk has no bloom filter offset recorded.
    pub(crate) fn read_from_column_chunk<R: ChunkReader>(
        column_metadata: &ColumnChunkMetaData,
        reader: &R,
    ) -> Result<Option<Self>, ParquetError> {
        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        // If the metadata records the total filter length, read header + bitset in one
        // request; otherwise read just enough for the header and fetch the bitset after
        // its length is known.
        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => reader.get_bytes(offset, length as usize),
            None => reader.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        match header.algorithm {
            BloomFilterAlgorithm::BLOCK => {
                // this match exists to future proof the singleton algorithm enum
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED => {
                // this match exists to future proof the singleton compression enum
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH => {
                // this match exists to future proof the singleton hash enum
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            // Full filter is already in `buffer`; the bitset starts right after the header.
            Some(_) => buffer.slice((bitset_offset - offset) as usize..),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                reader.get_bytes(bitset_offset, bitset_length)?
            }
        };

        Ok(Some(Self::new(&bitset)))
    }

    /// Map a 64-bit hash to a block index: scales the top 32 bits of the hash by the
    /// number of blocks and keeps the high 32 bits of the product, yielding an index
    /// in `[0, self.0.len())`.
    #[inline]
    fn hash_to_block_index(&self, hash: u64) -> usize {
        // unchecked_mul is unstable, but in reality this is safe, we'd just use saturating mul
        // but it will not saturate
        (((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
    }

    /// Insert an [AsBytes] value into the filter
    pub fn insert<T: AsBytes + ?Sized>(&mut self, value: &T) {
        self.insert_hash(hash_as_bytes(value));
    }

    /// Insert a hash into the filter
    fn insert_hash(&mut self, hash: u64) {
        // The top 32 bits select the block, the low 32 bits select the bits within it.
        let block_index = self.hash_to_block_index(hash);
        self.0[block_index].insert(hash as u32)
    }

    /// Check if an [AsBytes] value is probably present or definitely absent in the filter
    pub fn check<T: AsBytes>(&self, value: &T) -> bool {
        self.check_hash(hash_as_bytes(value))
    }

    /// Check if a hash is in the filter. May return
    /// true for values that were never inserted ("false positive")
    /// but will always return false if a hash has not been inserted.
    fn check_hash(&self, hash: u64) -> bool {
        let block_index = self.hash_to_block_index(hash);
        self.0[block_index].check(hash as u32)
    }

    /// Return the total in memory size of this bloom filter in bytes
    pub(crate) fn estimated_memory_size(&self) -> usize {
        // Uses capacity (not len) to account for all memory actually allocated.
        self.0.capacity() * std::mem::size_of::<Block>()
    }
}
421
422// per spec we use xxHash with seed=0
423const SEED: u64 = 0;
424
425#[inline]
426fn hash_as_bytes<A: AsBytes + ?Sized>(value: &A) -> u64 {
427    XxHash64::oneshot(SEED, value.as_bytes())
428}
429
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_hash_bytes() {
        // xxHash64 of the empty byte string with seed 0.
        assert_eq!(hash_as_bytes(""), 17241709254077376921);
    }

    #[test]
    fn test_mask_set_quick_check() {
        // Every word of a mask must have exactly one bit set (i.e. be a power of two).
        for i in 0..1_000_000 {
            let result = Block::mask(i);
            assert!(result.0.iter().all(|&x| x.is_power_of_two()));
        }
    }

    #[test]
    fn test_block_insert_and_check() {
        // No false negatives at the block level.
        for i in 0..1_000_000 {
            let mut block = Block::ZERO;
            block.insert(i);
            assert!(block.check(i));
        }
    }

    #[test]
    fn test_sbbf_insert_and_check() {
        // No false negatives at the filter level: everything inserted is reported present.
        let mut sbbf = Sbbf(vec![Block::ZERO; 1_000]);
        for i in 0..1_000_000 {
            sbbf.insert(&i);
            assert!(sbbf.check(&i));
        }
    }

    #[test]
    fn test_with_fixture() {
        // bloom filter produced by parquet-mr/spark for a column of i64 f"a{i}" for i in 0..10
        let bitset: &[u8] = &[
            200, 1, 80, 20, 64, 68, 8, 109, 6, 37, 4, 67, 144, 80, 96, 32, 8, 132, 43, 33, 0, 5,
            99, 65, 2, 0, 224, 44, 64, 78, 96, 4,
        ];
        let sbbf = Sbbf::new(bitset);
        for a in 0..10i64 {
            let value = format!("a{a}");
            assert!(sbbf.check(&value.as_str()));
        }
    }

    /// test the assumption that bloom filter header size should not exceed SBBF_HEADER_SIZE_ESTIMATE
    /// essentially we are testing that the struct is packed with 4 i32 fields, each can be 1-5 bytes
    /// so altogether it'll be 20 bytes at most.
    #[test]
    fn test_bloom_filter_header_size_assumption() {
        // Thrift compact encoding of a header with num_bytes = 32, followed by one junk byte.
        let buffer: &[u8; 16] = &[21, 64, 28, 28, 0, 0, 28, 28, 0, 0, 28, 28, 0, 0, 0, 99];
        let (
            BloomFilterHeader {
                algorithm,
                compression,
                hash,
                num_bytes,
            },
            read_length,
        ) = read_bloom_filter_header_and_length(Bytes::copy_from_slice(buffer)).unwrap();
        assert_eq!(read_length, 15);
        assert_eq!(algorithm, BloomFilterAlgorithm::BLOCK);
        assert_eq!(compression, BloomFilterCompression::UNCOMPRESSED);
        assert_eq!(hash, BloomFilterHash::XXHASH);
        assert_eq!(num_bytes, 32_i32);
        assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE);
    }

    #[test]
    fn test_optimal_num_of_bytes() {
        // (requested bytes, expected allocated bytes) — clamped then rounded up to a power of two.
        for (input, expected) in &[
            (0, 32),
            (9, 32),
            (31, 32),
            (32, 32),
            (33, 64),
            (99, 128),
            (1024, 1024),
            (999_000_000, 128 * 1024 * 1024),
        ] {
            assert_eq!(*expected, optimal_num_of_bytes(*input));
        }
    }

    #[test]
    fn test_num_of_bits_from_ndv_fpp() {
        // (fpp, ndv, expected number of bits) per the sizing formula in the module docs.
        for (fpp, ndv, num_bits) in &[
            (0.1, 10, 57),
            (0.01, 10, 96),
            (0.001, 10, 146),
            (0.1, 100, 577),
            (0.01, 100, 968),
            (0.001, 100, 1460),
            (0.1, 1000, 5772),
            (0.01, 1000, 9681),
            (0.001, 1000, 14607),
            (0.1, 10000, 57725),
            (0.01, 10000, 96815),
            (0.001, 10000, 146076),
            (0.1, 100000, 577254),
            (0.01, 100000, 968152),
            (0.001, 100000, 1460769),
            (0.1, 1000000, 5772541),
            (0.01, 1000000, 9681526),
            (0.001, 1000000, 14607697),
            (1e-50, 1_000_000_000_000, 14226231280773240832),
        ] {
            assert_eq!(*num_bits, num_of_bits_from_ndv_fpp(*ndv, *fpp) as u64);
        }
    }
}