1use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
76use crate::data_type::AsBytes;
77use crate::errors::{ParquetError, Result};
78use crate::file::metadata::ColumnChunkMetaData;
79use crate::file::reader::ChunkReader;
80use crate::parquet_thrift::{
81 ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol, ThriftCompactOutputProtocol,
82 ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
83};
84use crate::thrift_struct;
85use bytes::Bytes;
86use std::io::Write;
87use twox_hash::XxHash64;
88
/// Per-lane salt constants for the split-block Bloom filter: in `Block::mask`
/// each 32-bit hash is multiplied by one salt per word, and the top five bits
/// of each product select a single bit in the corresponding word.
const SALT: [u32; 8] = [
    0x47b6137b_u32,
    0x44974d91_u32,
    0x8824ad5b_u32,
    0xa2b7289d_u32,
    0x705495c7_u32,
    0x2df1424b_u32,
    0x9efc4947_u32,
    0x5c6bfb31_u32,
];
100
// Thrift-encoded header that precedes the bitset in the serialized bloom
// filter. `from_bytes`/`read_from_column_chunk` below rely on `num_bytes` to
// size the bitset, and `Sbbf::header` always emits BLOCK / XXHASH /
// UNCOMPRESSED. (Plain `//` comments are dropped by the lexer, so they never
// reach the macro.)
thrift_struct!(
pub struct BloomFilterHeader {
  // The size of the bitset in bytes.
  1: required i32 num_bytes;
  // The algorithm used to build the filter (split block bloom filter).
  2: required BloomFilterAlgorithm algorithm;
  // The hash function applied to values (xxHash).
  3: required BloomFilterHash hash;
  // The compression applied to the bitset, if any.
  4: required BloomFilterCompression compression;
}
);
116
117#[derive(Debug, Copy, Clone)]
120#[repr(transparent)]
121struct Block([u32; 8]);
122impl Block {
123 const ZERO: Block = Block([0; 8]);
124
125 fn mask(x: u32) -> Self {
128 let mut result = [0_u32; 8];
129 for i in 0..8 {
130 let y = x.wrapping_mul(SALT[i]);
132 let y = y >> 27;
133 result[i] = 1 << y;
134 }
135 Self(result)
136 }
137
138 #[inline]
139 #[cfg(not(target_endian = "little"))]
140 fn to_ne_bytes(self) -> [u8; 32] {
141 unsafe { std::mem::transmute(self.0) }
143 }
144
145 #[inline]
146 #[cfg(not(target_endian = "little"))]
147 fn to_le_bytes(self) -> [u8; 32] {
148 self.swap_bytes().to_ne_bytes()
149 }
150
151 #[inline]
152 #[cfg(not(target_endian = "little"))]
153 fn swap_bytes(mut self) -> Self {
154 self.0.iter_mut().for_each(|x| *x = x.swap_bytes());
155 self
156 }
157
158 fn insert(&mut self, hash: u32) {
160 let mask = Self::mask(hash);
161 for i in 0..8 {
162 self[i] |= mask[i];
163 }
164 }
165
166 fn check(&self, hash: u32) -> bool {
168 let mask = Self::mask(hash);
169 for i in 0..8 {
170 if self[i] & mask[i] == 0 {
171 return false;
172 }
173 }
174 true
175 }
176}
177
178impl std::ops::Index<usize> for Block {
179 type Output = u32;
180
181 #[inline]
182 fn index(&self, index: usize) -> &Self::Output {
183 self.0.index(index)
184 }
185}
186
187impl std::ops::IndexMut<usize> for Block {
188 #[inline]
189 fn index_mut(&mut self, index: usize) -> &mut Self::Output {
190 self.0.index_mut(index)
191 }
192}
193
/// A split-block Bloom filter (SBBF): a flat vector of 256-bit [`Block`]s.
/// The upper 32 bits of an item's xxHash64 select the block, the lower 32
/// bits select eight bits inside it. False positives are possible; false
/// negatives are not.
#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);
200
201pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
202
203pub(crate) fn chunk_read_bloom_filter_header_and_offset(
206 offset: u64,
207 buffer: Bytes,
208) -> Result<(BloomFilterHeader, u64), ParquetError> {
209 let (header, length) = read_bloom_filter_header_and_length(buffer)?;
210 Ok((header, offset + length))
211}
212
213#[inline]
216pub(crate) fn read_bloom_filter_header_and_length(
217 buffer: Bytes,
218) -> Result<(BloomFilterHeader, u64), ParquetError> {
219 read_bloom_filter_header_and_length_from_bytes(buffer.as_ref())
220}
221
222#[inline]
225fn read_bloom_filter_header_and_length_from_bytes(
226 buffer: &[u8],
227) -> Result<(BloomFilterHeader, u64), ParquetError> {
228 let total_length = buffer.len();
229 let mut prot = ThriftSliceInputProtocol::new(buffer);
230 let header = BloomFilterHeader::read_thrift(&mut prot)
231 .map_err(|e| ParquetError::General(format!("Could not read bloom filter header: {e}")))?;
232 Ok((header, (total_length - prot.as_slice().len()) as u64))
233}
234
/// Smallest allowed bitset size in bytes (one block).
pub(crate) const BITSET_MIN_LENGTH: usize = 32;
/// Largest allowed bitset size in bytes (128 MiB, itself a power of two).
pub(crate) const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;

/// Clamp a requested bitset size into `[BITSET_MIN_LENGTH, BITSET_MAX_LENGTH]`
/// and round it up to the next power of two (the max is already 2^27, so the
/// result never exceeds it).
#[inline]
fn optimal_num_of_bytes(num_bytes: usize) -> usize {
    num_bytes
        .clamp(BITSET_MIN_LENGTH, BITSET_MAX_LENGTH)
        .next_power_of_two()
}
244
/// Size a filter from a number of distinct values (`ndv`) and a target false
/// positive probability (`fpp`), using the standard SBBF sizing formula
/// `-8 * ndv / ln(1 - fpp^(1/8))`. The float result is truncated (and
/// saturated) into `usize`.
#[inline]
fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
    let denominator = (1.0 - fpp.powf(1.0 / 8.0)).ln();
    let num_bits = -8.0 * ndv as f64 / denominator;
    num_bits as usize
}
254
impl Sbbf {
    /// Create a new all-zero filter sized for `ndv` distinct values at false
    /// positive probability `fpp`.
    ///
    /// # Errors
    /// Returns [`ParquetError::General`] if `fpp` is outside `0.0..1.0`.
    /// NOTE(review): the half-open range accepts `fpp == 0.0`, which makes
    /// `num_of_bits_from_ndv_fpp` divide by `ln(1) == 0` — confirm this is
    /// intentional (the resulting cast yields 0 bits, i.e. a minimum filter).
    pub(crate) fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self, ParquetError> {
        if !(0.0..1.0).contains(&fpp) {
            return Err(ParquetError::General(format!(
                "False positive probability must be between 0.0 and 1.0, got {fpp}"
            )));
        }
        let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
        Ok(Self::new_with_num_of_bytes(num_bits / 8))
    }

    /// Create a new all-zero filter of (approximately) `num_bytes` bytes; the
    /// size is clamped and rounded to a power of two by `optimal_num_of_bytes`.
    pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
        let num_bytes = optimal_num_of_bytes(num_bytes);
        // A power of two >= 32 is always a multiple of the 32-byte block size.
        assert_eq!(num_bytes % size_of::<Block>(), 0);
        let num_blocks = num_bytes / size_of::<Block>();
        let bitset = vec![Block::ZERO; num_blocks];
        Self(bitset)
    }

    /// Build a filter from a raw little-endian bitset. Trailing bytes that do
    /// not fill a whole 32-byte block are silently dropped by `chunks_exact`.
    pub(crate) fn new(bitset: &[u8]) -> Self {
        let data = bitset
            .chunks_exact(4 * 8)
            .map(|chunk| {
                let mut block = Block::ZERO;
                for (i, word) in chunk.chunks_exact(4).enumerate() {
                    block[i] = u32::from_le_bytes(word.try_into().unwrap());
                }
                block
            })
            .collect::<Vec<Block>>();
        Self(data)
    }

    /// Write the thrift-encoded header followed by the little-endian bitset
    /// to `writer`.
    ///
    /// # Errors
    /// Returns [`ParquetError::General`] if either the header or the bitset
    /// fails to serialize.
    pub fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        let mut protocol = ThriftCompactOutputProtocol::new(&mut writer);
        self.header().write_thrift(&mut protocol).map_err(|e| {
            ParquetError::General(format!("Could not write bloom filter header: {e}"))
        })?;
        self.write_bitset(&mut writer)?;
        Ok(())
    }

    /// Write the bitset block by block, byte-swapping each word to little
    /// endian first (big-endian hosts only).
    #[cfg(not(target_endian = "little"))]
    fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        for block in &self.0 {
            writer
                .write_all(block.to_le_bytes().as_slice())
                .map_err(|e| {
                    ParquetError::General(format!("Could not write bloom filter bit set: {e}"))
                })?;
        }
        Ok(())
    }

    /// Write the bitset in a single call: on little-endian hosts the in-memory
    /// layout already matches the on-disk little-endian layout.
    #[cfg(target_endian = "little")]
    fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        // SAFETY: `Block` is `#[repr(transparent)]` over `[u32; 8]`, so the
        // Vec's storage is a contiguous, fully initialized run of
        // `len * size_of::<Block>()` bytes, valid for the slice's lifetime.
        let slice = unsafe {
            std::slice::from_raw_parts(
                self.0.as_ptr() as *const u8,
                self.0.len() * size_of::<Block>(),
            )
        };
        writer.write_all(slice).map_err(|e| {
            ParquetError::General(format!("Could not write bloom filter bit set: {e}"))
        })?;
        Ok(())
    }

    /// Header describing this filter: block algorithm, xxHash, uncompressed.
    fn header(&self) -> BloomFilterHeader {
        BloomFilterHeader {
            // 4 bytes per word * 8 words per block
            num_bytes: self.0.len() as i32 * 4 * 8,
            algorithm: BloomFilterAlgorithm::BLOCK,
            hash: BloomFilterHash::XXHASH,
            compression: BloomFilterCompression::UNCOMPRESSED,
        }
    }

    /// Read this column chunk's bloom filter from `reader`, returning
    /// `Ok(None)` when the column metadata records no bloom filter offset.
    ///
    /// # Errors
    /// Returns [`ParquetError::General`] for invalid offsets/lengths or an
    /// undecodable header.
    pub fn read_from_column_chunk<R: ChunkReader>(
        column_metadata: &ColumnChunkMetaData,
        reader: &R,
    ) -> Result<Option<Self>, ParquetError> {
        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        // If the metadata records the total length, fetch header + bitset in
        // one read; otherwise fetch just enough for the header now and read
        // the bitset separately below.
        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => reader.get_bytes(offset, length as usize),
            None => reader.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        // These single-variant matches are compile-time exhaustiveness checks:
        // adding a new algorithm/compression/hash variant forces this code to
        // be revisited.
        match header.algorithm {
            BloomFilterAlgorithm::BLOCK => {
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED => {
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH => {
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            // Whole filter already in `buffer`; skip past the header bytes.
            Some(_) => buffer.slice((bitset_offset - offset) as usize..),
            // Length unknown: issue a second read sized by the header.
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                reader.get_bytes(bitset_offset, bitset_length)?
            }
        };

        Ok(Some(Self::new(&bitset)))
    }

    /// Map a 64-bit hash onto a block index in `0..self.0.len()` via the
    /// multiply-shift trick on the upper 32 bits (avoids a modulo; cannot
    /// overflow for any bitset within `BITSET_MAX_LENGTH`).
    #[inline]
    fn hash_to_block_index(&self, hash: u64) -> usize {
        (((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
    }

    /// Insert `value` into the filter.
    pub fn insert<T: AsBytes + ?Sized>(&mut self, value: &T) {
        self.insert_hash(hash_as_bytes(value));
    }

    /// Insert a precomputed 64-bit hash: the upper half selects the block,
    /// the lower half sets eight bits inside it.
    fn insert_hash(&mut self, hash: u64) {
        let block_index = self.hash_to_block_index(hash);
        self.0[block_index].insert(hash as u32)
    }

    /// Check whether `value` may be present. False positives are possible;
    /// false negatives are not.
    pub fn check<T: AsBytes>(&self, value: &T) -> bool {
        self.check_hash(hash_as_bytes(value))
    }

    /// Check a precomputed 64-bit hash against the filter.
    fn check_hash(&self, hash: u64) -> bool {
        let block_index = self.hash_to_block_index(hash);
        self.0[block_index].check(hash as u32)
    }

    /// Approximate heap usage of the bitset in bytes (based on capacity, not
    /// length, to reflect actual allocation).
    pub(crate) fn estimated_memory_size(&self) -> usize {
        self.0.capacity() * std::mem::size_of::<Block>()
    }

    /// Parse a filter from `bytes` containing exactly one thrift-encoded
    /// header immediately followed by its bitset.
    ///
    /// # Errors
    /// Returns [`ParquetError::General`] if the header cannot be decoded, the
    /// recorded length is negative, or `bytes` is not exactly header + bitset
    /// long (note: the "extra bytes" message also fires when bytes are
    /// missing).
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, ParquetError> {
        let (header, header_len) = read_bloom_filter_header_and_length_from_bytes(bytes)?;

        let bitset_length: u64 = header
            .num_bytes
            .try_into()
            .map_err(|_| ParquetError::General("Bloom filter length is invalid".to_string()))?;

        if header_len + bitset_length != bytes.len() as u64 {
            return Err(ParquetError::General(format!(
                "Bloom filter data contains extra bytes: expected {} total bytes, got {}",
                header_len + bitset_length,
                bytes.len()
            )));
        }

        let start = header_len as usize;
        let end = (header_len + bitset_length) as usize;
        let bitset = bytes
            .get(start..end)
            .ok_or_else(|| ParquetError::General("Bloom filter bitset is invalid".to_string()))?;

        Ok(Self::new(bitset))
    }
}
479
480const SEED: u64 = 0;
482
483#[inline]
484fn hash_as_bytes<A: AsBytes + ?Sized>(value: &A) -> u64 {
485 XxHash64::oneshot(SEED, value.as_bytes())
486}
487
#[cfg(test)]
mod tests {
    use super::*;

    // Pins the xxHash64 (seed 0) digest of the empty input.
    #[test]
    fn test_hash_bytes() {
        assert_eq!(hash_as_bytes(""), 17241709254077376921);
    }

    // Every word of a mask must contain exactly one set bit.
    #[test]
    fn test_mask_set_quick_check() {
        for i in 0..1_000_000 {
            let result = Block::mask(i);
            assert!(result.0.iter().all(|&x| x.is_power_of_two()));
        }
    }

    // A hash inserted into a block must always be found again (no false
    // negatives at the block level).
    #[test]
    fn test_block_insert_and_check() {
        for i in 0..1_000_000 {
            let mut block = Block::ZERO;
            block.insert(i);
            assert!(block.check(i));
        }
    }

    // Same no-false-negatives property at the whole-filter level.
    #[test]
    fn test_sbbf_insert_and_check() {
        let mut sbbf = Sbbf(vec![Block::ZERO; 1_000]);
        for i in 0..1_000_000 {
            sbbf.insert(&i);
            assert!(sbbf.check(&i));
        }
    }

    // Fixture bitset containing "a0".."a9"; checks cross-implementation
    // compatibility of the on-disk layout. Do not alter the bytes.
    #[test]
    fn test_with_fixture() {
        let bitset: &[u8] = &[
            200, 1, 80, 20, 64, 68, 8, 109, 6, 37, 4, 67, 144, 80, 96, 32, 8, 132, 43, 33, 0, 5,
            99, 65, 2, 0, 224, 44, 64, 78, 96, 4,
        ];
        let sbbf = Sbbf::new(bitset);
        for a in 0..10i64 {
            let value = format!("a{a}");
            assert!(sbbf.check(&value.as_str()));
        }
    }

    // A serialized header decodes to the expected fields and fits within
    // SBBF_HEADER_SIZE_ESTIMATE (15 bytes used, 20 estimated).
    #[test]
    fn test_bloom_filter_header_size_assumption() {
        let buffer: &[u8; 16] = &[21, 64, 28, 28, 0, 0, 28, 28, 0, 0, 28, 28, 0, 0, 0, 99];
        let (
            BloomFilterHeader {
                algorithm,
                compression,
                hash,
                num_bytes,
            },
            read_length,
        ) = read_bloom_filter_header_and_length(Bytes::copy_from_slice(buffer)).unwrap();
        assert_eq!(read_length, 15);
        assert_eq!(algorithm, BloomFilterAlgorithm::BLOCK);
        assert_eq!(compression, BloomFilterCompression::UNCOMPRESSED);
        assert_eq!(hash, BloomFilterHash::XXHASH);
        assert_eq!(num_bytes, 32_i32);
        assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE);
    }

    // Clamp-then-round-to-power-of-two behavior across the boundaries.
    #[test]
    fn test_optimal_num_of_bytes() {
        for (input, expected) in &[
            (0, 32),
            (9, 32),
            (31, 32),
            (32, 32),
            (33, 64),
            (99, 128),
            (1024, 1024),
            (999_000_000, 128 * 1024 * 1024),
        ] {
            assert_eq!(*expected, optimal_num_of_bytes(*input));
        }
    }

    // Pins the sizing formula for (fpp, ndv) pairs, including an extreme
    // case that exercises the float-to-integer saturation path.
    #[test]
    fn test_num_of_bits_from_ndv_fpp() {
        for (fpp, ndv, num_bits) in &[
            (0.1, 10, 57),
            (0.01, 10, 96),
            (0.001, 10, 146),
            (0.1, 100, 577),
            (0.01, 100, 968),
            (0.001, 100, 1460),
            (0.1, 1000, 5772),
            (0.01, 1000, 9681),
            (0.001, 1000, 14607),
            (0.1, 10000, 57725),
            (0.01, 10000, 96815),
            (0.001, 10000, 146076),
            (0.1, 100000, 577254),
            (0.01, 100000, 968152),
            (0.001, 100000, 1460769),
            (0.1, 1000000, 5772541),
            (0.01, 1000000, 9681526),
            (0.001, 1000000, 14607697),
            (1e-50, 1_000_000_000_000, 14226231280773240832),
        ] {
            assert_eq!(*num_bits, num_of_bits_from_ndv_fpp(*ndv, *fpp) as u64);
        }
    }

    // write() then from_bytes() must round-trip both the header fields and
    // the membership of every inserted value.
    #[test]
    fn test_sbbf_write_round_trip() {
        let bitset_bytes = vec![0u8; 32];
        let mut original = Sbbf::new(&bitset_bytes);

        let test_values = ["hello", "world", "rust", "parquet", "bloom", "filter"];
        for value in &test_values {
            original.insert(value);
        }

        let mut output = Vec::new();
        original.write(&mut output).unwrap();

        let mut protocol = ThriftSliceInputProtocol::new(&output);
        let header = BloomFilterHeader::read_thrift(&mut protocol).unwrap();
        assert_eq!(header.num_bytes, bitset_bytes.len() as i32);
        assert_eq!(header.algorithm, BloomFilterAlgorithm::BLOCK);
        assert_eq!(header.hash, BloomFilterHash::XXHASH);
        assert_eq!(header.compression, BloomFilterCompression::UNCOMPRESSED);

        let reconstructed = Sbbf::from_bytes(&output).unwrap();

        for value in &test_values {
            assert!(
                reconstructed.check(value),
                "Value '{}' should be present after round-trip",
                value
            );
        }
    }
}