1use crate::data_type::AsBytes;
76use crate::errors::ParquetError;
77use crate::file::metadata::ColumnChunkMetaData;
78use crate::file::reader::ChunkReader;
79use crate::format::{
80 BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
81 SplitBlockAlgorithm, Uncompressed, XxHash,
82};
83use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
84use bytes::Bytes;
85use std::io::Write;
86use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol};
87use twox_hash::XxHash64;
88
/// Per-word salt multipliers for the split-block Bloom filter mask function,
/// as fixed by the Parquet Bloom filter specification.
const SALT: [u32; 8] = [
    0x47b6137b_u32,
    0x44974d91_u32,
    0x8824ad5b_u32,
    0xa2b7289d_u32,
    0x705495c7_u32,
    0x2df1424b_u32,
    0x9efc4947_u32,
    0x5c6bfb31_u32,
];

/// A single 256-bit block of the split-block Bloom filter, stored as eight
/// 32-bit words. `repr(transparent)` guarantees the layout is exactly that of
/// `[u32; 8]`, which the little-endian serialization path relies on.
#[derive(Debug, Copy, Clone)]
#[repr(transparent)]
struct Block([u32; 8]);
impl Block {
    /// A block with no bits set.
    const ZERO: Block = Block([0; 8]);

    /// Compute the mask for `x`: each of the eight words gets exactly one bit
    /// set, chosen by the top five bits of the wrapping product `x * SALT[i]`.
    fn mask(x: u32) -> Self {
        let mut words = [0_u32; 8];
        for (word, salt) in words.iter_mut().zip(SALT) {
            // High 5 bits of the salted product select a bit index in 0..32.
            *word = 1 << (x.wrapping_mul(salt) >> 27);
        }
        Self(words)
    }

    /// Serialize the block to little-endian bytes on big-endian targets.
    #[inline]
    #[cfg(not(target_endian = "little"))]
    fn to_le_bytes(self) -> [u8; 32] {
        self.swap_bytes().to_ne_bytes()
    }

    /// Lay the eight words out as 32 bytes in native byte order.
    ///
    /// NOTE(review): this method was missing, so `to_le_bytes` (and therefore
    /// any big-endian build) failed to compile; added a safe implementation.
    #[inline]
    #[cfg(not(target_endian = "little"))]
    fn to_ne_bytes(self) -> [u8; 32] {
        let mut bytes = [0_u8; 32];
        for (dst, word) in bytes.chunks_exact_mut(4).zip(self.0) {
            dst.copy_from_slice(&word.to_ne_bytes());
        }
        bytes
    }

    /// Byte-swap every word, used to produce little-endian output on
    /// big-endian targets.
    #[inline]
    #[cfg(not(target_endian = "little"))]
    fn swap_bytes(mut self) -> Self {
        self.0.iter_mut().for_each(|x| *x = x.swap_bytes());
        self
    }

    /// Set all mask bits for `hash` in this block.
    fn insert(&mut self, hash: u32) {
        let mask = Self::mask(hash);
        for (word, m) in self.0.iter_mut().zip(mask.0) {
            *word |= m;
        }
    }

    /// Return `true` if every mask bit for `hash` is set (the hash *may* have
    /// been inserted); `false` means it definitely was not.
    fn check(&self, hash: u32) -> bool {
        let mask = Self::mask(hash);
        self.0.iter().zip(mask.0).all(|(word, m)| word & m != 0)
    }
}

impl std::ops::Index<usize> for Block {
    type Output = u32;

    #[inline]
    fn index(&self, index: usize) -> &Self::Output {
        self.0.index(index)
    }
}

impl std::ops::IndexMut<usize> for Block {
    #[inline]
    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
        self.0.index_mut(index)
    }
}
170
/// A split-block Bloom filter (SBBF): a vector of 256-bit [`Block`]s that is
/// queried and updated with 64-bit XXH64 hashes of the inserted values.
#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);

// Upper-bound estimate of the thrift-encoded `BloomFilterHeader` size, used to
// size the initial read when column metadata does not record the filter
// length; validated against a real encoded header in the tests below.
pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
179
180pub(crate) fn chunk_read_bloom_filter_header_and_offset(
183 offset: u64,
184 buffer: Bytes,
185) -> Result<(BloomFilterHeader, u64), ParquetError> {
186 let (header, length) = read_bloom_filter_header_and_length(buffer)?;
187 Ok((header, offset + length))
188}
189
190#[inline]
193pub(crate) fn read_bloom_filter_header_and_length(
194 buffer: Bytes,
195) -> Result<(BloomFilterHeader, u64), ParquetError> {
196 let total_length = buffer.len();
197 let mut prot = TCompactSliceInputProtocol::new(buffer.as_ref());
198 let header = BloomFilterHeader::read_from_in_protocol(&mut prot)
199 .map_err(|e| ParquetError::General(format!("Could not read bloom filter header: {e}")))?;
200 Ok((header, (total_length - prot.as_slice().len()) as u64))
201}
202
/// Smallest allowed bitset size in bytes (one 256-bit block).
pub(crate) const BITSET_MIN_LENGTH: usize = 32;
/// Largest allowed bitset size in bytes (128 MiB).
pub(crate) const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;

/// Round a requested bitset size to a usable one: clamp into
/// `[BITSET_MIN_LENGTH, BITSET_MAX_LENGTH]`, then round up to a power of two.
#[inline]
fn optimal_num_of_bytes(num_bytes: usize) -> usize {
    // `clamp` replaces the previous `min`/`max` chain. Both bounds are powers
    // of two, so `next_power_of_two` can never exceed BITSET_MAX_LENGTH.
    num_bytes
        .clamp(BITSET_MIN_LENGTH, BITSET_MAX_LENGTH)
        .next_power_of_two()
}
212
/// Number of filter bits needed for `ndv` distinct values at false-positive
/// probability `fpp`, using the split-block Bloom filter sizing formula
/// `-8 * ndv / ln(1 - fpp^(1/8))`.
#[inline]
fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
    let distinct = ndv as f64;
    let denominator = (1.0 - fpp.powf(1.0 / 8.0)).ln();
    (-8.0 * distinct / denominator) as usize
}
222
impl Sbbf {
    /// Create a filter sized for `ndv` expected distinct values at false
    /// positive probability `fpp`.
    ///
    /// # Errors
    /// Returns [`ParquetError::General`] if `fpp` is outside `[0.0, 1.0)`.
    pub(crate) fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self, ParquetError> {
        if !(0.0..1.0).contains(&fpp) {
            return Err(ParquetError::General(format!(
                "False positive probability must be between 0.0 and 1.0, got {fpp}"
            )));
        }
        let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
        Ok(Self::new_with_num_of_bytes(num_bits / 8))
    }

    /// Create an empty filter of (approximately) `num_bytes` bytes; the size
    /// is clamped and rounded up to a power of two by `optimal_num_of_bytes`,
    /// so it is always a whole number of 32-byte blocks.
    pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
        let num_bytes = optimal_num_of_bytes(num_bytes);
        assert_eq!(num_bytes % size_of::<Block>(), 0);
        let num_blocks = num_bytes / size_of::<Block>();
        let bitset = vec![Block::ZERO; num_blocks];
        Self(bitset)
    }

    /// Build a filter from a serialized bitset: consecutive 32-byte chunks,
    /// each holding eight little-endian `u32` words. Trailing bytes that do
    /// not fill a whole block are silently dropped by `chunks_exact`.
    pub(crate) fn new(bitset: &[u8]) -> Self {
        let data = bitset
            .chunks_exact(4 * 8)
            .map(|chunk| {
                let mut block = Block::ZERO;
                for (i, word) in chunk.chunks_exact(4).enumerate() {
                    block[i] = u32::from_le_bytes(word.try_into().unwrap());
                }
                block
            })
            .collect::<Vec<Block>>();
        Self(data)
    }

    /// Write the thrift-compact header followed by the raw bitset to `writer`.
    ///
    /// # Errors
    /// Returns [`ParquetError::General`] if either the header or the bitset
    /// cannot be written.
    pub(crate) fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        let mut protocol = TCompactOutputProtocol::new(&mut writer);
        let header = self.header();
        header.write_to_out_protocol(&mut protocol).map_err(|e| {
            ParquetError::General(format!("Could not write bloom filter header: {e}"))
        })?;
        protocol.flush()?;
        self.write_bitset(&mut writer)?;
        Ok(())
    }

    /// Write the bitset as little-endian bytes (big-endian targets: each
    /// block is byte-swapped via `to_le_bytes` before writing).
    #[cfg(not(target_endian = "little"))]
    fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        for block in &self.0 {
            writer
                .write_all(block.to_le_bytes().as_slice())
                .map_err(|e| {
                    ParquetError::General(format!("Could not write bloom filter bit set: {e}"))
                })?;
        }
        Ok(())
    }

    /// Write the bitset as little-endian bytes. On little-endian targets the
    /// in-memory representation already is the wire format, so the block
    /// vector is written out directly without copying.
    #[cfg(target_endian = "little")]
    fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        // SAFETY: `Block` is `repr(transparent)` over `[u32; 8]`, so the Vec's
        // contents are plain, fully-initialized bytes with no padding, and the
        // slice covers exactly `len * size_of::<Block>()` of them.
        let slice = unsafe {
            std::slice::from_raw_parts(
                self.0.as_ptr() as *const u8,
                self.0.len() * size_of::<Block>(),
            )
        };
        writer.write_all(slice).map_err(|e| {
            ParquetError::General(format!("Could not write bloom filter bit set: {e}"))
        })?;
        Ok(())
    }

    /// Header describing this filter: byte length (blocks * 32), split-block
    /// algorithm, XXH64 hash, no compression.
    fn header(&self) -> BloomFilterHeader {
        BloomFilterHeader {
            // 8 u32 words of 4 bytes each per block.
            num_bytes: self.0.len() as i32 * 4 * 8,
            algorithm: BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}),
            hash: BloomFilterHash::XXHASH(XxHash {}),
            compression: BloomFilterCompression::UNCOMPRESSED(Uncompressed {}),
        }
    }

    /// Read the bloom filter for a column chunk, if one is recorded in its
    /// metadata. Returns `Ok(None)` when no bloom filter offset is present.
    ///
    /// When the metadata carries a `bloom_filter_length`, header and bitset
    /// are fetched in a single read; otherwise the header is read with a
    /// size-estimated probe and the bitset with a second read.
    pub(crate) fn read_from_column_chunk<R: ChunkReader>(
        column_metadata: &ColumnChunkMetaData,
        reader: &R,
    ) -> Result<Option<Self>, ParquetError> {
        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => reader.get_bytes(offset, length as usize),
            None => reader.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        // These matches are exhaustive over single-variant enums: they merely
        // assert at compile time that only the supported algorithm,
        // compression, and hash variants exist.
        match header.algorithm {
            BloomFilterAlgorithm::BLOCK(_) => {
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED(_) => {
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH(_) => {
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            // Single-read path: the bitset is the remainder of the buffer
            // after the header bytes.
            Some(_) => buffer.slice((bitset_offset - offset) as usize..),
            // Two-read path: fetch the bitset separately using the byte count
            // declared in the decoded header.
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                reader.get_bytes(bitset_offset, bitset_length)?
            }
        };

        Ok(Some(Self::new(&bitset)))
    }

    /// Map a 64-bit hash to a block index in `0..self.0.len()` using the
    /// upper 32 bits in a 32.32 fixed-point multiply (avoids a modulo).
    #[inline]
    fn hash_to_block_index(&self, hash: u64) -> usize {
        (((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
    }

    /// Insert `value` into the filter (hashes its byte representation).
    pub fn insert<T: AsBytes + ?Sized>(&mut self, value: &T) {
        self.insert_hash(hash_as_bytes(value));
    }

    /// Insert a precomputed 64-bit hash: upper bits pick the block, lower 32
    /// bits drive the in-block mask.
    fn insert_hash(&mut self, hash: u64) {
        let block_index = self.hash_to_block_index(hash);
        self.0[block_index].insert(hash as u32)
    }

    /// Check whether `value` may be present; `false` means definitely absent.
    pub fn check<T: AsBytes>(&self, value: &T) -> bool {
        self.check_hash(hash_as_bytes(value))
    }

    /// Check a precomputed 64-bit hash against its block.
    fn check_hash(&self, hash: u64) -> bool {
        let block_index = self.hash_to_block_index(hash);
        self.0[block_index].check(hash as u32)
    }

    /// Heap memory held by the filter, based on the Vec's capacity (which may
    /// exceed its length).
    pub(crate) fn estimated_memory_size(&self) -> usize {
        self.0.capacity() * std::mem::size_of::<Block>()
    }
}
400
// Fixed XXH64 seed; the value 0 is pinned by `test_hash_bytes` below and
// matches the hash declared in the filter header (`BloomFilterHash::XXHASH`).
const SEED: u64 = 0;

/// Hash `value`'s byte representation with XXH64 using the fixed seed.
#[inline]
fn hash_as_bytes<A: AsBytes + ?Sized>(value: &A) -> u64 {
    XxHash64::oneshot(SEED, value.as_bytes())
}
408
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the XXH64 (seed 0) hash of the empty byte string.
    #[test]
    fn test_hash_bytes() {
        assert_eq!(hash_as_bytes(""), 17241709254077376921);
    }

    /// Every word of every mask must have exactly one bit set.
    #[test]
    fn test_mask_set_quick_check() {
        for i in 0..1_000_000 {
            let result = Block::mask(i);
            assert!(result.0.iter().all(|&x| x.is_power_of_two()));
        }
    }

    /// A hash inserted into a block must always be found in that block
    /// (no false negatives at the block level).
    #[test]
    fn test_block_insert_and_check() {
        for i in 0..1_000_000 {
            let mut block = Block::ZERO;
            block.insert(i);
            assert!(block.check(i));
        }
    }

    /// No false negatives across a multi-block filter.
    #[test]
    fn test_sbbf_insert_and_check() {
        let mut sbbf = Sbbf(vec![Block::ZERO; 1_000]);
        for i in 0..1_000_000 {
            sbbf.insert(&i);
            assert!(sbbf.check(&i));
        }
    }

    /// Fixture bitset known to contain the strings "a0" through "a9"; all
    /// ten must be reported as (possibly) present.
    #[test]
    fn test_with_fixture() {
        let bitset: &[u8] = &[
            200, 1, 80, 20, 64, 68, 8, 109, 6, 37, 4, 67, 144, 80, 96, 32, 8, 132, 43, 33, 0, 5,
            99, 65, 2, 0, 224, 44, 64, 78, 96, 4,
        ];
        let sbbf = Sbbf::new(bitset);
        for a in 0..10i64 {
            let value = format!("a{a}");
            assert!(sbbf.check(&value.as_str()));
        }
    }

    /// Decodes a real thrift-compact-encoded header and verifies that
    /// `SBBF_HEADER_SIZE_ESTIMATE` (20) is large enough to cover its
    /// encoded size (15 bytes here).
    #[test]
    fn test_bloom_filter_header_size_assumption() {
        let buffer: &[u8; 16] = &[21, 64, 28, 28, 0, 0, 28, 28, 0, 0, 28, 28, 0, 0, 0, 99];
        let (
            BloomFilterHeader {
                algorithm,
                compression,
                hash,
                num_bytes,
            },
            read_length,
        ) = read_bloom_filter_header_and_length(Bytes::copy_from_slice(buffer)).unwrap();
        assert_eq!(read_length, 15);
        assert_eq!(
            algorithm,
            BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {})
        );
        assert_eq!(
            compression,
            BloomFilterCompression::UNCOMPRESSED(Uncompressed {})
        );
        assert_eq!(hash, BloomFilterHash::XXHASH(XxHash {}));
        assert_eq!(num_bytes, 32_i32);
        assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE);
    }

    /// Clamp-and-round behavior of `optimal_num_of_bytes` at and around the
    /// minimum, power-of-two boundaries, and the maximum.
    #[test]
    fn test_optimal_num_of_bytes() {
        for (input, expected) in &[
            (0, 32),
            (9, 32),
            (31, 32),
            (32, 32),
            (33, 64),
            (99, 128),
            (1024, 1024),
            (999_000_000, 128 * 1024 * 1024),
        ] {
            assert_eq!(*expected, optimal_num_of_bytes(*input));
        }
    }

    /// Pins the sizing formula over a grid of (fpp, ndv) pairs, including an
    /// extreme case exercising very large bit counts.
    #[test]
    fn test_num_of_bits_from_ndv_fpp() {
        for (fpp, ndv, num_bits) in &[
            (0.1, 10, 57),
            (0.01, 10, 96),
            (0.001, 10, 146),
            (0.1, 100, 577),
            (0.01, 100, 968),
            (0.001, 100, 1460),
            (0.1, 1000, 5772),
            (0.01, 1000, 9681),
            (0.001, 1000, 14607),
            (0.1, 10000, 57725),
            (0.01, 10000, 96815),
            (0.001, 10000, 146076),
            (0.1, 100000, 577254),
            (0.01, 100000, 968152),
            (0.001, 100000, 1460769),
            (0.1, 1000000, 5772541),
            (0.01, 1000000, 9681526),
            (0.001, 1000000, 14607697),
            (1e-50, 1_000_000_000_000, 14226231280773240832),
        ] {
            assert_eq!(*num_bits, num_of_bits_from_ndv_fpp(*ndv, *fpp) as u64);
        }
    }
}