1use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
76use crate::data_type::AsBytes;
77use crate::errors::{ParquetError, Result};
78use crate::file::metadata::ColumnChunkMetaData;
79use crate::file::reader::ChunkReader;
80use crate::parquet_thrift::{
81 ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol, ThriftCompactOutputProtocol,
82 ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
83};
84use crate::thrift_struct;
85use bytes::Bytes;
86use std::io::Write;
87use twox_hash::XxHash64;
88
/// Per-lane odd multipliers used by `Block::mask`: each product's top five
/// bits (`>> 27`) select a single bit within the corresponding 32-bit lane.
/// These are the salt constants from the Parquet split-block Bloom filter
/// specification.
const SALT: [u32; 8] = [
    0x47b6137b_u32,
    0x44974d91_u32,
    0x8824ad5b_u32,
    0xa2b7289d_u32,
    0x705495c7_u32,
    0x2df1424b_u32,
    0x9efc4947_u32,
    0x5c6bfb31_u32,
];
100
// Generates the thrift-serializable `BloomFilterHeader` struct from the
// inline IDL below. All four fields are required: the bitset byte count,
// the filter algorithm, the hash function, and the compression codec.
thrift_struct!(
pub struct BloomFilterHeader {
  1: required i32 num_bytes;
  2: required BloomFilterAlgorithm algorithm;
  3: required BloomFilterHash hash;
  4: required BloomFilterCompression compression;
}
);
116
/// One 256-bit block of the split-block Bloom filter: eight 32-bit lanes.
///
/// `#[repr(transparent)]` guarantees the same layout as `[u32; 8]`, which the
/// byte-level (de)serialization below relies on (`transmute` on big-endian
/// targets, raw-pointer reinterpretation of `Vec<Block>` on little-endian).
#[derive(Debug, Copy, Clone)]
#[repr(transparent)]
struct Block([u32; 8]);
122impl Block {
123 const ZERO: Block = Block([0; 8]);
124
125 fn mask(x: u32) -> Self {
128 let mut result = [0_u32; 8];
129 for i in 0..8 {
130 let y = x.wrapping_mul(SALT[i]);
132 let y = y >> 27;
133 result[i] = 1 << y;
134 }
135 Self(result)
136 }
137
138 #[inline]
139 #[cfg(not(target_endian = "little"))]
140 fn to_ne_bytes(self) -> [u8; 32] {
141 unsafe { std::mem::transmute(self.0) }
143 }
144
145 #[inline]
146 #[cfg(not(target_endian = "little"))]
147 fn to_le_bytes(self) -> [u8; 32] {
148 self.swap_bytes().to_ne_bytes()
149 }
150
151 #[inline]
152 #[cfg(not(target_endian = "little"))]
153 fn swap_bytes(mut self) -> Self {
154 self.0.iter_mut().for_each(|x| *x = x.swap_bytes());
155 self
156 }
157
158 fn insert(&mut self, hash: u32) {
160 let mask = Self::mask(hash);
161 for i in 0..8 {
162 self[i] |= mask[i];
163 }
164 }
165
166 fn check(&self, hash: u32) -> bool {
168 let mask = Self::mask(hash);
169 for i in 0..8 {
170 if self[i] & mask[i] == 0 {
171 return false;
172 }
173 }
174 true
175 }
176}
177
178impl std::ops::Index<usize> for Block {
179 type Output = u32;
180
181 #[inline]
182 fn index(&self, index: usize) -> &Self::Output {
183 self.0.index(index)
184 }
185}
186
187impl std::ops::IndexMut<usize> for Block {
188 #[inline]
189 fn index_mut(&mut self, index: usize) -> &mut Self::Output {
190 self.0.index_mut(index)
191 }
192}
193
/// A split-block Bloom filter (SBBF): a flat vector of 256-bit [`Block`]s.
/// The upper 32 bits of a value's hash select the block, the lower 32 bits
/// drive the per-block mask.
#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);
200
/// Number of bytes speculatively read to cover a serialized bloom filter
/// header when its exact length is unknown. A representative header decodes
/// from 15 bytes (see `test_bloom_filter_header_size_assumption`), so 20
/// leaves headroom.
pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
202
203pub(crate) fn chunk_read_bloom_filter_header_and_offset(
206 offset: u64,
207 buffer: Bytes,
208) -> Result<(BloomFilterHeader, u64), ParquetError> {
209 let (header, length) = read_bloom_filter_header_and_length(buffer)?;
210 Ok((header, offset + length))
211}
212
213#[inline]
216pub(crate) fn read_bloom_filter_header_and_length(
217 buffer: Bytes,
218) -> Result<(BloomFilterHeader, u64), ParquetError> {
219 read_bloom_filter_header_and_length_from_bytes(buffer.as_ref())
220}
221
222#[inline]
225fn read_bloom_filter_header_and_length_from_bytes(
226 buffer: &[u8],
227) -> Result<(BloomFilterHeader, u64), ParquetError> {
228 let total_length = buffer.len();
229 let mut prot = ThriftSliceInputProtocol::new(buffer);
230 let header = BloomFilterHeader::read_thrift(&mut prot)
231 .map_err(|e| ParquetError::General(format!("Could not read bloom filter header: {e}")))?;
232 Ok((header, (total_length - prot.as_slice().len()) as u64))
233}
234
/// Minimum bitset size in bytes (one 256-bit block).
pub const BITSET_MIN_LENGTH: usize = 32;
/// Maximum bitset size in bytes (128 MiB, itself a power of two).
pub const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;

/// Clamp a requested byte count into `[BITSET_MIN_LENGTH, BITSET_MAX_LENGTH]`
/// and round it up to the next power of two, so the result is always a whole
/// number of 32-byte blocks.
#[inline]
fn optimal_num_of_bytes(num_bytes: usize) -> usize {
    num_bytes
        .clamp(BITSET_MIN_LENGTH, BITSET_MAX_LENGTH)
        .next_power_of_two()
}
246
/// Compute the filter size in bits for `ndv` distinct values at target false
/// positive probability `fpp`, using the split-block Bloom filter sizing
/// formula with 8 bits set per value.
///
/// The float expression keeps the original evaluation order
/// (`-8.0 * ndv` first, then the division) so rounding is unchanged.
#[inline]
fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
    let denominator = (1.0 - fpp.powf(1.0 / 8.0)).ln();
    let num_bits = -8.0 * ndv as f64 / denominator;
    num_bits as usize
}
256
impl Sbbf {
    /// Create a new filter sized for `ndv` distinct values at target false
    /// positive probability `fpp`.
    ///
    /// # Errors
    /// Returns [`ParquetError::General`] if `fpp` is outside `[0.0, 1.0)`
    /// (lower bound inclusive, upper bound exclusive).
    pub fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self, ParquetError> {
        if !(0.0..1.0).contains(&fpp) {
            return Err(ParquetError::General(format!(
                "False positive probability must be between 0.0 and 1.0, got {fpp}"
            )));
        }
        let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
        Ok(Self::new_with_num_of_bytes(num_bits / 8))
    }

    /// Create a new zeroed filter with approximately `num_bytes` of storage;
    /// the size is clamped and rounded up to a power of two by
    /// `optimal_num_of_bytes`.
    pub fn new_with_num_of_bytes(num_bytes: usize) -> Self {
        let num_bytes = optimal_num_of_bytes(num_bytes);
        // optimal_num_of_bytes returns a power of two >= 32, so this holds.
        assert_eq!(num_bytes % size_of::<Block>(), 0);
        let num_blocks = num_bytes / size_of::<Block>();
        let bitset = vec![Block::ZERO; num_blocks];
        Self(bitset)
    }

    /// Build a filter from a raw little-endian bitset: bytes are decoded as
    /// consecutive little-endian `u32` words, eight per block. Trailing bytes
    /// that do not fill a whole 32-byte block are dropped (`chunks_exact`
    /// ignores the remainder).
    pub fn new(bitset: &[u8]) -> Self {
        let data = bitset
            .chunks_exact(4 * 8)
            .map(|chunk| {
                let mut block = Block::ZERO;
                for (i, word) in chunk.chunks_exact(4).enumerate() {
                    block[i] = u32::from_le_bytes(word.try_into().unwrap());
                }
                block
            })
            .collect::<Vec<Block>>();
        Self(data)
    }

    /// Serialize the filter to `writer`: thrift compact header first,
    /// immediately followed by the raw little-endian bitset.
    ///
    /// # Errors
    /// Returns [`ParquetError::General`] if writing either part fails.
    pub fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        let mut protocol = ThriftCompactOutputProtocol::new(&mut writer);
        self.header().write_thrift(&mut protocol).map_err(|e| {
            ParquetError::General(format!("Could not write bloom filter header: {e}"))
        })?;
        self.write_bitset(&mut writer)?;
        Ok(())
    }

    /// Write the bitset in little-endian byte order. Big-endian targets:
    /// each block is byte-swapped before being written out.
    #[cfg(not(target_endian = "little"))]
    pub fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        for block in &self.0 {
            writer
                .write_all(block.to_le_bytes().as_slice())
                .map_err(|e| {
                    ParquetError::General(format!("Could not write bloom filter bit set: {e}"))
                })?;
        }
        Ok(())
    }

    /// Write the bitset in little-endian byte order. Little-endian targets:
    /// the in-memory representation already matches the on-disk layout, so
    /// the block storage is written directly in one call.
    #[cfg(target_endian = "little")]
    pub fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        // SAFETY: `Block` is `#[repr(transparent)]` over `[u32; 8]`, so the
        // Vec's backing storage is a contiguous, fully initialized run of
        // `len * size_of::<Block>()` bytes.
        let slice = unsafe {
            std::slice::from_raw_parts(
                self.0.as_ptr() as *const u8,
                self.0.len() * size_of::<Block>(),
            )
        };
        writer.write_all(slice).map_err(|e| {
            ParquetError::General(format!("Could not write bloom filter bit set: {e}"))
        })?;
        Ok(())
    }

    /// Build the thrift header describing this filter: `num_bytes` is the
    /// block count times 32 bytes; algorithm, hash, and compression are the
    /// only variants this implementation produces.
    fn header(&self) -> BloomFilterHeader {
        BloomFilterHeader {
            num_bytes: self.0.len() as i32 * 4 * 8,
            algorithm: BloomFilterAlgorithm::BLOCK,
            hash: BloomFilterHash::XXHASH,
            compression: BloomFilterCompression::UNCOMPRESSED,
        }
    }

    /// Read the bloom filter for a column chunk from `reader`, returning
    /// `Ok(None)` when the column metadata records no bloom filter offset.
    ///
    /// # Errors
    /// Returns [`ParquetError::General`] if the offset or declared length is
    /// invalid, or if reading/decoding the header or bitset fails.
    pub fn read_from_column_chunk<R: ChunkReader>(
        column_metadata: &ColumnChunkMetaData,
        reader: &R,
    ) -> Result<Option<Self>, ParquetError> {
        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        // When the metadata records a total length, header + bitset can be
        // fetched in a single read; otherwise read just enough for the header
        // and fetch the bitset with a second read below.
        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => reader.get_bytes(offset, length as usize),
            None => reader.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        // Exhaustive matches over the currently defined variants: adding a
        // new variant to any of these enums forces a compile error here.
        match header.algorithm {
            BloomFilterAlgorithm::BLOCK => {
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED => {
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH => {
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            // Whole filter already in `buffer`: slice off the header bytes.
            Some(_) => buffer.slice((bitset_offset - offset) as usize..),
            // Otherwise issue a second read for exactly the declared bitset.
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                reader.get_bytes(bitset_offset, bitset_length)?
            }
        };

        Ok(Some(Self::new(&bitset)))
    }

    /// Map a 64-bit hash to a block index via multiply-shift on the upper 32
    /// bits, avoiding a modulo; `saturating_mul` guards the (theoretical)
    /// overflow for filters longer than `u32::MAX` blocks.
    #[inline]
    fn hash_to_block_index(&self, hash: u64) -> usize {
        (((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
    }

    /// Insert `value` (hashed via xxHash64) into the filter.
    pub fn insert<T: AsBytes + ?Sized>(&mut self, value: &T) {
        self.insert_hash(hash_as_bytes(value));
    }

    /// Insert a precomputed 64-bit hash: the upper 32 bits select the block,
    /// the lower 32 bits drive the per-block mask.
    fn insert_hash(&mut self, hash: u64) {
        let block_index = self.hash_to_block_index(hash);
        self.0[block_index].insert(hash as u32)
    }

    /// Check whether `value` may be present. False positives are possible;
    /// false negatives are not.
    pub fn check<T: AsBytes + ?Sized>(&self, value: &T) -> bool {
        self.check_hash(hash_as_bytes(value))
    }

    /// Check a precomputed 64-bit hash against the filter.
    fn check_hash(&self, hash: u64) -> bool {
        let block_index = self.hash_to_block_index(hash);
        self.0[block_index].check(hash as u32)
    }

    /// Approximate heap memory used by the filter. Based on the Vec's
    /// capacity rather than its length, so it reflects actual allocation.
    pub(crate) fn estimated_memory_size(&self) -> usize {
        self.0.capacity() * std::mem::size_of::<Block>()
    }

    /// Parse a complete serialized filter (header immediately followed by the
    /// bitset) from `bytes`.
    ///
    /// # Errors
    /// Returns [`ParquetError::General`] if the header is malformed, the
    /// declared bitset length is negative, or `header + bitset` does not
    /// match `bytes.len()` exactly (both truncated and over-long input are
    /// rejected by the equality check).
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, ParquetError> {
        let (header, header_len) = read_bloom_filter_header_and_length_from_bytes(bytes)?;

        let bitset_length: u64 = header
            .num_bytes
            .try_into()
            .map_err(|_| ParquetError::General("Bloom filter length is invalid".to_string()))?;

        if header_len + bitset_length != bytes.len() as u64 {
            return Err(ParquetError::General(format!(
                "Bloom filter data contains extra bytes: expected {} total bytes, got {}",
                header_len + bitset_length,
                bytes.len()
            )));
        }

        let start = header_len as usize;
        let end = (header_len + bitset_length) as usize;
        let bitset = bytes
            .get(start..end)
            .ok_or_else(|| ParquetError::General("Bloom filter bitset is invalid".to_string()))?;

        Ok(Self::new(bitset))
    }
}
482
483const SEED: u64 = 0;
485
486#[inline]
487fn hash_as_bytes<A: AsBytes + ?Sized>(value: &A) -> u64 {
488 XxHash64::oneshot(SEED, value.as_bytes())
489}
490
#[cfg(test)]
mod tests {
    use super::*;

    // Pins the xxHash64 (seed = 0) digest of the empty byte string.
    #[test]
    fn test_hash_bytes() {
        assert_eq!(hash_as_bytes(""), 17241709254077376921);
    }

    // Every lane of a mask must contain exactly one set bit.
    #[test]
    fn test_mask_set_quick_check() {
        for i in 0..1_000_000 {
            let result = Block::mask(i);
            assert!(result.0.iter().all(|&x| x.is_power_of_two()));
        }
    }

    // A hash inserted into a block must always be found again:
    // no false negatives at the block level.
    #[test]
    fn test_block_insert_and_check() {
        for i in 0..1_000_000 {
            let mut block = Block::ZERO;
            block.insert(i);
            assert!(block.check(i));
        }
    }

    // Same no-false-negatives property end-to-end through the Sbbf API.
    #[test]
    fn test_sbbf_insert_and_check() {
        let mut sbbf = Sbbf(vec![Block::ZERO; 1_000]);
        for i in 0..1_000_000 {
            sbbf.insert(&i);
            assert!(sbbf.check(&i));
        }
    }

    // Known-good single-block (32-byte) bitset fixture; it must report the
    // strings "a0" through "a9" as (possibly) present.
    #[test]
    fn test_with_fixture() {
        let bitset: &[u8] = &[
            200, 1, 80, 20, 64, 68, 8, 109, 6, 37, 4, 67, 144, 80, 96, 32, 8, 132, 43, 33, 0, 5,
            99, 65, 2, 0, 224, 44, 64, 78, 96, 4,
        ];
        let sbbf = Sbbf::new(bitset);
        for a in 0..10i64 {
            let value = format!("a{a}");
            assert!(sbbf.check(&value.as_str()));
        }
    }

    // Guards SBBF_HEADER_SIZE_ESTIMATE: a representative serialized header
    // decodes in 15 bytes, comfortably under the 20-byte estimate.
    #[test]
    fn test_bloom_filter_header_size_assumption() {
        let buffer: &[u8; 16] = &[21, 64, 28, 28, 0, 0, 28, 28, 0, 0, 28, 28, 0, 0, 0, 99];
        let (
            BloomFilterHeader {
                algorithm,
                compression,
                hash,
                num_bytes,
            },
            read_length,
        ) = read_bloom_filter_header_and_length(Bytes::copy_from_slice(buffer)).unwrap();
        assert_eq!(read_length, 15);
        assert_eq!(algorithm, BloomFilterAlgorithm::BLOCK);
        assert_eq!(compression, BloomFilterCompression::UNCOMPRESSED);
        assert_eq!(hash, BloomFilterHash::XXHASH);
        assert_eq!(num_bytes, 32_i32);
        assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE);
    }

    // Sizes are clamped to [32, 128 MiB] and rounded up to powers of two.
    #[test]
    fn test_optimal_num_of_bytes() {
        for (input, expected) in &[
            (0, 32),
            (9, 32),
            (31, 32),
            (32, 32),
            (33, 64),
            (99, 128),
            (1024, 1024),
            (999_000_000, 128 * 1024 * 1024),
        ] {
            assert_eq!(*expected, optimal_num_of_bytes(*input));
        }
    }

    // Truth table for the sizing formula, including an extreme fpp/ndv pair
    // that exercises very large bit counts.
    #[test]
    fn test_num_of_bits_from_ndv_fpp() {
        for (fpp, ndv, num_bits) in &[
            (0.1, 10, 57),
            (0.01, 10, 96),
            (0.001, 10, 146),
            (0.1, 100, 577),
            (0.01, 100, 968),
            (0.001, 100, 1460),
            (0.1, 1000, 5772),
            (0.01, 1000, 9681),
            (0.001, 1000, 14607),
            (0.1, 10000, 57725),
            (0.01, 10000, 96815),
            (0.001, 10000, 146076),
            (0.1, 100000, 577254),
            (0.01, 100000, 968152),
            (0.001, 100000, 1460769),
            (0.1, 1000000, 5772541),
            (0.01, 1000000, 9681526),
            (0.001, 1000000, 14607697),
            (1e-50, 1_000_000_000_000, 14226231280773240832),
        ] {
            assert_eq!(*num_bits, num_of_bits_from_ndv_fpp(*ndv, *fpp) as u64);
        }
    }

    // Serialize a filter, decode the header back, then re-parse the whole
    // byte stream with from_bytes and verify inserted values still check.
    #[test]
    fn test_sbbf_write_round_trip() {
        let bitset_bytes = vec![0u8; 32];
        let mut original = Sbbf::new(&bitset_bytes);

        let test_values = ["hello", "world", "rust", "parquet", "bloom", "filter"];
        for value in &test_values {
            original.insert(value);
        }

        let mut output = Vec::new();
        original.write(&mut output).unwrap();

        // The header must describe the same bitset size and fixed variants.
        let mut protocol = ThriftSliceInputProtocol::new(&output);
        let header = BloomFilterHeader::read_thrift(&mut protocol).unwrap();
        assert_eq!(header.num_bytes, bitset_bytes.len() as i32);
        assert_eq!(header.algorithm, BloomFilterAlgorithm::BLOCK);
        assert_eq!(header.hash, BloomFilterHash::XXHASH);
        assert_eq!(header.compression, BloomFilterCompression::UNCOMPRESSED);

        let reconstructed = Sbbf::from_bytes(&output).unwrap();

        for value in &test_values {
            assert!(
                reconstructed.check(value),
                "Value '{}' should be present after round-trip",
                value
            );
        }
    }
}