1use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
76use crate::data_type::AsBytes;
77use crate::errors::{ParquetError, Result};
78use crate::file::metadata::ColumnChunkMetaData;
79use crate::file::reader::ChunkReader;
80use crate::parquet_thrift::{
81 ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol, ThriftCompactOutputProtocol,
82 ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
83};
84use crate::thrift_struct;
85use bytes::Bytes;
86use std::io::Write;
87use twox_hash::XxHash64;
88
89const SALT: [u32; 8] = [
91 0x47b6137b_u32,
92 0x44974d91_u32,
93 0x8824ad5b_u32,
94 0xa2b7289d_u32,
95 0x705495c7_u32,
96 0x2df1424b_u32,
97 0x9efc4947_u32,
98 0x5c6bfb31_u32,
99];
100
101thrift_struct!(
102pub struct BloomFilterHeader {
106 1: required i32 num_bytes;
108 2: required BloomFilterAlgorithm algorithm;
110 3: required BloomFilterHash hash;
112 4: required BloomFilterCompression compression;
114}
115);
116
117#[derive(Debug, Copy, Clone)]
120#[repr(transparent)]
121struct Block([u32; 8]);
122impl Block {
123 const ZERO: Block = Block([0; 8]);
124
125 fn mask(x: u32) -> Self {
128 let mut result = [0_u32; 8];
129 for i in 0..8 {
130 let y = x.wrapping_mul(SALT[i]);
132 let y = y >> 27;
133 result[i] = 1 << y;
134 }
135 Self(result)
136 }
137
138 #[inline]
139 #[cfg(not(target_endian = "little"))]
140 fn to_ne_bytes(self) -> [u8; 32] {
141 unsafe { std::mem::transmute(self.0) }
143 }
144
145 #[inline]
146 #[cfg(not(target_endian = "little"))]
147 fn to_le_bytes(self) -> [u8; 32] {
148 self.swap_bytes().to_ne_bytes()
149 }
150
151 #[inline]
152 #[cfg(not(target_endian = "little"))]
153 fn swap_bytes(mut self) -> Self {
154 self.0.iter_mut().for_each(|x| *x = x.swap_bytes());
155 self
156 }
157
158 fn insert(&mut self, hash: u32) {
160 let mask = Self::mask(hash);
161 for i in 0..8 {
162 self[i] |= mask[i];
163 }
164 }
165
166 fn check(&self, hash: u32) -> bool {
168 let mask = Self::mask(hash);
169 for i in 0..8 {
170 if self[i] & mask[i] == 0 {
171 return false;
172 }
173 }
174 true
175 }
176}
177
178impl std::ops::Index<usize> for Block {
179 type Output = u32;
180
181 #[inline]
182 fn index(&self, index: usize) -> &Self::Output {
183 self.0.index(index)
184 }
185}
186
187impl std::ops::IndexMut<usize> for Block {
188 #[inline]
189 fn index_mut(&mut self, index: usize) -> &mut Self::Output {
190 self.0.index_mut(index)
191 }
192}
193
194#[derive(Debug, Clone)]
199pub struct Sbbf(Vec<Block>);
200
201pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
202
203pub(crate) fn chunk_read_bloom_filter_header_and_offset(
206 offset: u64,
207 buffer: Bytes,
208) -> Result<(BloomFilterHeader, u64), ParquetError> {
209 let (header, length) = read_bloom_filter_header_and_length(buffer)?;
210 Ok((header, offset + length))
211}
212
213#[inline]
216pub(crate) fn read_bloom_filter_header_and_length(
217 buffer: Bytes,
218) -> Result<(BloomFilterHeader, u64), ParquetError> {
219 let total_length = buffer.len();
220 let mut prot = ThriftSliceInputProtocol::new(buffer.as_ref());
221 let header = BloomFilterHeader::read_thrift(&mut prot)
222 .map_err(|e| ParquetError::General(format!("Could not read bloom filter header: {e}")))?;
223 Ok((header, (total_length - prot.as_slice().len()) as u64))
224}
225
226pub(crate) const BITSET_MIN_LENGTH: usize = 32;
227pub(crate) const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;
228
229#[inline]
230fn optimal_num_of_bytes(num_bytes: usize) -> usize {
231 let num_bytes = num_bytes.min(BITSET_MAX_LENGTH);
232 let num_bytes = num_bytes.max(BITSET_MIN_LENGTH);
233 num_bytes.next_power_of_two()
234}
235
236#[inline]
241fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
242 let num_bits = -8.0 * ndv as f64 / (1.0 - fpp.powf(1.0 / 8.0)).ln();
243 num_bits as usize
244}
245
246impl Sbbf {
247 pub(crate) fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self, ParquetError> {
250 if !(0.0..1.0).contains(&fpp) {
251 return Err(ParquetError::General(format!(
252 "False positive probability must be between 0.0 and 1.0, got {fpp}"
253 )));
254 }
255 let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
256 Ok(Self::new_with_num_of_bytes(num_bits / 8))
257 }
258
259 pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
262 let num_bytes = optimal_num_of_bytes(num_bytes);
263 assert_eq!(num_bytes % size_of::<Block>(), 0);
264 let num_blocks = num_bytes / size_of::<Block>();
265 let bitset = vec![Block::ZERO; num_blocks];
266 Self(bitset)
267 }
268
269 pub(crate) fn new(bitset: &[u8]) -> Self {
270 let data = bitset
271 .chunks_exact(4 * 8)
272 .map(|chunk| {
273 let mut block = Block::ZERO;
274 for (i, word) in chunk.chunks_exact(4).enumerate() {
275 block[i] = u32::from_le_bytes(word.try_into().unwrap());
276 }
277 block
278 })
279 .collect::<Vec<Block>>();
280 Self(data)
281 }
282
283 pub(crate) fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
287 let mut protocol = ThriftCompactOutputProtocol::new(&mut writer);
288 self.header().write_thrift(&mut protocol).map_err(|e| {
289 ParquetError::General(format!("Could not write bloom filter header: {e}"))
290 })?;
291 self.write_bitset(&mut writer)?;
292 Ok(())
293 }
294
295 #[cfg(not(target_endian = "little"))]
297 fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
298 for block in &self.0 {
299 writer
300 .write_all(block.to_le_bytes().as_slice())
301 .map_err(|e| {
302 ParquetError::General(format!("Could not write bloom filter bit set: {e}"))
303 })?;
304 }
305 Ok(())
306 }
307
308 #[cfg(target_endian = "little")]
310 fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
311 let slice = unsafe {
313 std::slice::from_raw_parts(
314 self.0.as_ptr() as *const u8,
315 self.0.len() * size_of::<Block>(),
316 )
317 };
318 writer.write_all(slice).map_err(|e| {
319 ParquetError::General(format!("Could not write bloom filter bit set: {e}"))
320 })?;
321 Ok(())
322 }
323
324 fn header(&self) -> BloomFilterHeader {
326 BloomFilterHeader {
327 num_bytes: self.0.len() as i32 * 4 * 8,
329 algorithm: BloomFilterAlgorithm::BLOCK,
330 hash: BloomFilterHash::XXHASH,
331 compression: BloomFilterCompression::UNCOMPRESSED,
332 }
333 }
334
335 pub(crate) fn read_from_column_chunk<R: ChunkReader>(
337 column_metadata: &ColumnChunkMetaData,
338 reader: &R,
339 ) -> Result<Option<Self>, ParquetError> {
340 let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
341 offset
342 .try_into()
343 .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
344 } else {
345 return Ok(None);
346 };
347
348 let buffer = match column_metadata.bloom_filter_length() {
349 Some(length) => reader.get_bytes(offset, length as usize),
350 None => reader.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
351 }?;
352
353 let (header, bitset_offset) =
354 chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
355
356 match header.algorithm {
357 BloomFilterAlgorithm::BLOCK => {
358 }
360 }
361 match header.compression {
362 BloomFilterCompression::UNCOMPRESSED => {
363 }
365 }
366 match header.hash {
367 BloomFilterHash::XXHASH => {
368 }
370 }
371
372 let bitset = match column_metadata.bloom_filter_length() {
373 Some(_) => buffer.slice((bitset_offset - offset) as usize..),
374 None => {
375 let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
376 ParquetError::General("Bloom filter length is invalid".to_string())
377 })?;
378 reader.get_bytes(bitset_offset, bitset_length)?
379 }
380 };
381
382 Ok(Some(Self::new(&bitset)))
383 }
384
385 #[inline]
386 fn hash_to_block_index(&self, hash: u64) -> usize {
387 (((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
390 }
391
392 pub fn insert<T: AsBytes + ?Sized>(&mut self, value: &T) {
394 self.insert_hash(hash_as_bytes(value));
395 }
396
397 fn insert_hash(&mut self, hash: u64) {
399 let block_index = self.hash_to_block_index(hash);
400 self.0[block_index].insert(hash as u32)
401 }
402
403 pub fn check<T: AsBytes>(&self, value: &T) -> bool {
405 self.check_hash(hash_as_bytes(value))
406 }
407
408 fn check_hash(&self, hash: u64) -> bool {
412 let block_index = self.hash_to_block_index(hash);
413 self.0[block_index].check(hash as u32)
414 }
415
416 pub(crate) fn estimated_memory_size(&self) -> usize {
418 self.0.capacity() * std::mem::size_of::<Block>()
419 }
420}
421
422const SEED: u64 = 0;
424
425#[inline]
426fn hash_as_bytes<A: AsBytes + ?Sized>(value: &A) -> u64 {
427 XxHash64::oneshot(SEED, value.as_bytes())
428}
429
430#[cfg(test)]
431mod tests {
432 use super::*;
433
434 #[test]
435 fn test_hash_bytes() {
436 assert_eq!(hash_as_bytes(""), 17241709254077376921);
437 }
438
439 #[test]
440 fn test_mask_set_quick_check() {
441 for i in 0..1_000_000 {
442 let result = Block::mask(i);
443 assert!(result.0.iter().all(|&x| x.is_power_of_two()));
444 }
445 }
446
447 #[test]
448 fn test_block_insert_and_check() {
449 for i in 0..1_000_000 {
450 let mut block = Block::ZERO;
451 block.insert(i);
452 assert!(block.check(i));
453 }
454 }
455
456 #[test]
457 fn test_sbbf_insert_and_check() {
458 let mut sbbf = Sbbf(vec![Block::ZERO; 1_000]);
459 for i in 0..1_000_000 {
460 sbbf.insert(&i);
461 assert!(sbbf.check(&i));
462 }
463 }
464
465 #[test]
466 fn test_with_fixture() {
467 let bitset: &[u8] = &[
469 200, 1, 80, 20, 64, 68, 8, 109, 6, 37, 4, 67, 144, 80, 96, 32, 8, 132, 43, 33, 0, 5,
470 99, 65, 2, 0, 224, 44, 64, 78, 96, 4,
471 ];
472 let sbbf = Sbbf::new(bitset);
473 for a in 0..10i64 {
474 let value = format!("a{a}");
475 assert!(sbbf.check(&value.as_str()));
476 }
477 }
478
479 #[test]
483 fn test_bloom_filter_header_size_assumption() {
484 let buffer: &[u8; 16] = &[21, 64, 28, 28, 0, 0, 28, 28, 0, 0, 28, 28, 0, 0, 0, 99];
485 let (
486 BloomFilterHeader {
487 algorithm,
488 compression,
489 hash,
490 num_bytes,
491 },
492 read_length,
493 ) = read_bloom_filter_header_and_length(Bytes::copy_from_slice(buffer)).unwrap();
494 assert_eq!(read_length, 15);
495 assert_eq!(algorithm, BloomFilterAlgorithm::BLOCK);
496 assert_eq!(compression, BloomFilterCompression::UNCOMPRESSED);
497 assert_eq!(hash, BloomFilterHash::XXHASH);
498 assert_eq!(num_bytes, 32_i32);
499 assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE);
500 }
501
502 #[test]
503 fn test_optimal_num_of_bytes() {
504 for (input, expected) in &[
505 (0, 32),
506 (9, 32),
507 (31, 32),
508 (32, 32),
509 (33, 64),
510 (99, 128),
511 (1024, 1024),
512 (999_000_000, 128 * 1024 * 1024),
513 ] {
514 assert_eq!(*expected, optimal_num_of_bytes(*input));
515 }
516 }
517
518 #[test]
519 fn test_num_of_bits_from_ndv_fpp() {
520 for (fpp, ndv, num_bits) in &[
521 (0.1, 10, 57),
522 (0.01, 10, 96),
523 (0.001, 10, 146),
524 (0.1, 100, 577),
525 (0.01, 100, 968),
526 (0.001, 100, 1460),
527 (0.1, 1000, 5772),
528 (0.01, 1000, 9681),
529 (0.001, 1000, 14607),
530 (0.1, 10000, 57725),
531 (0.01, 10000, 96815),
532 (0.001, 10000, 146076),
533 (0.1, 100000, 577254),
534 (0.01, 100000, 968152),
535 (0.001, 100000, 1460769),
536 (0.1, 1000000, 5772541),
537 (0.01, 1000000, 9681526),
538 (0.001, 1000000, 14607697),
539 (1e-50, 1_000_000_000_000, 14226231280773240832),
540 ] {
541 assert_eq!(*num_bits, num_of_bits_from_ndv_fpp(*ndv, *fpp) as u64);
542 }
543 }
544}