use crate::data_type::AsBytes;
use crate::errors::ParquetError;
use crate::file::metadata::ColumnChunkMetaData;
use crate::file::reader::ChunkReader;
use crate::format::{
    BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
    SplitBlockAlgorithm, Uncompressed, XxHash,
};
use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
use bytes::Bytes;
use std::io::Write;
use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol};
use twox_hash::XxHash64;

/// Salt values for the eight bit positions of a block, as defined by the Parquet
/// split block Bloom filter (SBBF) specification.
const SALT: [u32; 8] = [
    0x47b6137b_u32,
    0x44974d91_u32,
    0x8824ad5b_u32,
    0xa2b7289d_u32,
    0x705495c7_u32,
    0x2df1424b_u32,
    0x9efc4947_u32,
    0x5c6bfb31_u32,
];

/// A single 256-bit block of the split block Bloom filter: eight 32-bit words, each of
/// which receives exactly one bit per inserted value.
#[derive(Debug, Copy, Clone)]
#[repr(transparent)]
struct Block([u32; 8]);
impl Block {
    const ZERO: Block = Block([0; 8]);

    /// Takes a 32-bit hash and produces a mask with exactly one bit set in each of the
    /// eight words, selected by the top 5 bits of `x * SALT[i]`.
    fn mask(x: u32) -> Self {
        let mut result = [0_u32; 8];
        for i in 0..8 {
            let y = x.wrapping_mul(SALT[i]);
            // keep the most significant 5 bits to pick a bit position in [0, 32)
            let y = y >> 27;
            result[i] = 1 << y;
        }
        Self(result)
    }

    #[inline]
    #[cfg(not(target_endian = "little"))]
    fn to_ne_bytes(self) -> [u8; 32] {
        unsafe { std::mem::transmute(self.0) }
    }

    #[inline]
    #[cfg(not(target_endian = "little"))]
    fn to_le_bytes(self) -> [u8; 32] {
        self.swap_bytes().to_ne_bytes()
    }

    #[inline]
    #[cfg(not(target_endian = "little"))]
    fn swap_bytes(mut self) -> Self {
        self.0.iter_mut().for_each(|x| *x = x.swap_bytes());
        self
    }

    /// Sets the bits selected by `hash` in this block.
    fn insert(&mut self, hash: u32) {
        let mask = Self::mask(hash);
        for i in 0..8 {
            self[i] |= mask[i];
        }
    }

    /// Returns true if every bit selected by `hash` is set in this block.
    fn check(&self, hash: u32) -> bool {
        let mask = Self::mask(hash);
        for i in 0..8 {
            if self[i] & mask[i] == 0 {
                return false;
            }
        }
        true
    }
}

impl std::ops::Index<usize> for Block {
    type Output = u32;

    #[inline]
    fn index(&self, index: usize) -> &Self::Output {
        self.0.index(index)
    }
}

impl std::ops::IndexMut<usize> for Block {
    #[inline]
    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
        self.0.index_mut(index)
    }
}

/// A split block Bloom filter (SBBF): a vector of 256-bit blocks. A value hashes to
/// exactly one block, and within that block to one bit in each of the eight words.
#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);

/// Number of bytes to prefetch when only the bloom filter offset (and not its length) is
/// known; large enough to hold a serialized `BloomFilterHeader`.
pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;

/// Given an absolute file `offset` and a `buffer` starting at that offset, reads the bloom
/// filter header and returns it together with the absolute offset of the bitset.
pub(crate) fn chunk_read_bloom_filter_header_and_offset(
    offset: u64,
    buffer: Bytes,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
    let (header, length) = read_bloom_filter_header_and_length(buffer)?;
    Ok((header, offset + length))
}

/// Reads a bloom filter header from the front of `buffer` and returns it together with the
/// number of bytes the serialized header occupies.
#[inline]
pub(crate) fn read_bloom_filter_header_and_length(
    buffer: Bytes,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
    let total_length = buffer.len();
    let mut prot = TCompactSliceInputProtocol::new(buffer.as_ref());
    let header = BloomFilterHeader::read_from_in_protocol(&mut prot)
        .map_err(|e| ParquetError::General(format!("Could not read bloom filter header: {e}")))?;
    Ok((header, (total_length - prot.as_slice().len()) as u64))
}

/// Minimum bitset length in bytes.
pub(crate) const BITSET_MIN_LENGTH: usize = 32;
/// Maximum bitset length in bytes.
pub(crate) const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;

/// Clamps the requested byte count to `[BITSET_MIN_LENGTH, BITSET_MAX_LENGTH]` and rounds
/// it up to the next power of two.
#[inline]
fn optimal_num_of_bytes(num_bytes: usize) -> usize {
    let num_bytes = num_bytes.min(BITSET_MAX_LENGTH);
    let num_bytes = num_bytes.max(BITSET_MIN_LENGTH);
    num_bytes.next_power_of_two()
}
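// Worked example (hedged; the concrete values mirror `test_optimal_num_of_bytes` below):
// a request for 99 bytes is clamped into [32, 128 * 1024 * 1024] and rounded up to
// 128 bytes, while a request for 999_000_000 bytes is capped at the 128 MiB maximum.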

/// Given the number of distinct values `ndv` and the target false positive probability
/// `fpp`, returns the optimal number of bits for an SBBF with k = 8 hash functions:
/// m = -k * ndv / ln(1 - fpp^(1 / k)).
#[inline]
fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
    let num_bits = -8.0 * ndv as f64 / (1.0 - fpp.powf(1.0 / 8.0)).ln();
    num_bits as usize
}
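// Worked example (hedged; figures taken from `test_num_of_bits_from_ndv_fpp` below): for
// ndv = 1_000_000 and fpp = 0.01 the formula yields 9_681_526 bits (roughly 1.2 MB), which
// `Sbbf::new_with_ndv_fpp` divides by 8 and `optimal_num_of_bytes` then rounds up to the
// next power of two, 2 MiB.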

impl Sbbf {
    /// Creates a new filter sized for `ndv` distinct values at false positive probability
    /// `fpp`. Returns an error if `fpp` is not in `[0.0, 1.0)`.
    pub(crate) fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self, ParquetError> {
        if !(0.0..1.0).contains(&fpp) {
            return Err(ParquetError::General(format!(
                "False positive probability must be between 0.0 and 1.0, got {fpp}"
            )));
        }
        let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
        Ok(Self::new_with_num_of_bytes(num_bits / 8))
    }

    /// Creates a new filter with the given number of bytes, rounded to a valid bitset
    /// size by `optimal_num_of_bytes`.
    pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
        let num_bytes = optimal_num_of_bytes(num_bytes);
        assert_eq!(num_bytes % size_of::<Block>(), 0);
        let num_blocks = num_bytes / size_of::<Block>();
        let bitset = vec![Block::ZERO; num_blocks];
        Self(bitset)
    }

    /// Creates a filter from a raw little-endian bitset, as stored in a Parquet file.
    pub(crate) fn new(bitset: &[u8]) -> Self {
        let data = bitset
            .chunks_exact(4 * 8)
            .map(|chunk| {
                let mut block = Block::ZERO;
                for (i, word) in chunk.chunks_exact(4).enumerate() {
                    block[i] = u32::from_le_bytes(word.try_into().unwrap());
                }
                block
            })
            .collect::<Vec<Block>>();
        Self(data)
    }

    /// Writes the bloom filter to `writer`: a Thrift compact-protocol `BloomFilterHeader`
    /// followed by the raw little-endian bitset.
    pub(crate) fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        let mut protocol = TCompactOutputProtocol::new(&mut writer);
        let header = self.header();
        header.write_to_out_protocol(&mut protocol).map_err(|e| {
            ParquetError::General(format!("Could not write bloom filter header: {e}"))
        })?;
        protocol.flush()?;
        self.write_bitset(&mut writer)?;
        Ok(())
    }

    /// Writes the bitset to `writer` in its serialized (little-endian) form.
    #[cfg(not(target_endian = "little"))]
    fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        for block in &self.0 {
            writer
                .write_all(block.to_le_bytes().as_slice())
                .map_err(|e| {
                    ParquetError::General(format!("Could not write bloom filter bit set: {e}"))
                })?;
        }
        Ok(())
    }

    /// Writes the bitset to `writer`; on little-endian targets the in-memory layout is
    /// already the serialized layout, so the blocks are written out directly.
    #[cfg(target_endian = "little")]
    fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        // SAFETY: `Block` is `repr(transparent)` over `[u32; 8]`, so the Vec's backing
        // storage is a contiguous, initialized run of `len * size_of::<Block>()` bytes.
        let slice = unsafe {
            std::slice::from_raw_parts(
                self.0.as_ptr() as *const u8,
                self.0.len() * size_of::<Block>(),
            )
        };
        writer.write_all(slice).map_err(|e| {
            ParquetError::General(format!("Could not write bloom filter bit set: {e}"))
        })?;
        Ok(())
    }

    /// Builds the `BloomFilterHeader` describing this filter.
    fn header(&self) -> BloomFilterHeader {
        BloomFilterHeader {
            // 8 u32 words per block, 4 bytes per word
            num_bytes: self.0.len() as i32 * 4 * 8,
            algorithm: BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}),
            hash: BloomFilterHash::XXHASH(XxHash {}),
            compression: BloomFilterCompression::UNCOMPRESSED(Uncompressed {}),
        }
    }

    /// Reads a bloom filter for the given column chunk, returning `Ok(None)` if the column
    /// has no bloom filter.
    pub(crate) fn read_from_column_chunk<R: ChunkReader>(
        column_metadata: &ColumnChunkMetaData,
        reader: &R,
    ) -> Result<Option<Self>, ParquetError> {
        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => reader.get_bytes(offset, length as usize),
            None => reader.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        match header.algorithm {
            BloomFilterAlgorithm::BLOCK(_) => {
                // this match exists to future proof the singleton algorithm enum
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED(_) => {
                // this match exists to future proof the singleton compression enum
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH(_) => {
                // this match exists to future proof the singleton hash enum
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice((bitset_offset - offset) as usize..),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                reader.get_bytes(bitset_offset, bitset_length)?
            }
        };

        Ok(Some(Self::new(&bitset)))
    }

    /// Maps the upper 32 bits of `hash` to a block index in `[0, self.0.len())` using a
    /// multiply-then-shift reduction instead of a modulo.
    #[inline]
    fn hash_to_block_index(&self, hash: u64) -> usize {
        // The multiplication cannot overflow because both operands fit in 32 bits, so
        // `saturating_mul` never actually saturates here.
        (((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
    }

    /// Inserts a value into the filter.
    pub fn insert<T: AsBytes + ?Sized>(&mut self, value: &T) {
        self.insert_hash(hash_as_bytes(value));
    }

    /// Inserts an already-computed 64-bit hash into the filter.
    fn insert_hash(&mut self, hash: u64) {
        let block_index = self.hash_to_block_index(hash);
        self.0[block_index].insert(hash as u32)
    }

    /// Checks whether a value is probably present: `false` means definitely absent,
    /// `true` means possibly present.
    pub fn check<T: AsBytes>(&self, value: &T) -> bool {
        self.check_hash(hash_as_bytes(value))
    }

    /// Checks an already-computed 64-bit hash against the filter.
    fn check_hash(&self, hash: u64) -> bool {
        let block_index = self.hash_to_block_index(hash);
        self.0[block_index].check(hash as u32)
    }

    /// Returns the estimated heap memory used by the bitset, in bytes.
    pub(crate) fn estimated_memory_size(&self) -> usize {
        self.0.capacity() * std::mem::size_of::<Block>()
    }
}

/// Seed for the XxHash64 hash function; the Parquet bloom filter hashes a value's bytes
/// with XxHash64 and a seed of 0.
const SEED: u64 = 0;

#[inline]
fn hash_as_bytes<A: AsBytes + ?Sized>(value: &A) -> u64 {
    XxHash64::oneshot(SEED, value.as_bytes())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_hash_bytes() {
        assert_eq!(hash_as_bytes(""), 17241709254077376921);
    }

    #[test]
    fn test_mask_set_quick_check() {
        for i in 0..1_000_000 {
            let result = Block::mask(i);
            assert!(result.0.iter().all(|&x| x.is_power_of_two()));
        }
    }

    #[test]
    fn test_block_insert_and_check() {
        for i in 0..1_000_000 {
            let mut block = Block::ZERO;
            block.insert(i);
            assert!(block.check(i));
        }
    }

    #[test]
    fn test_sbbf_insert_and_check() {
        let mut sbbf = Sbbf(vec![Block::ZERO; 1_000]);
        for i in 0..1_000_000 {
            sbbf.insert(&i);
            assert!(sbbf.check(&i));
        }
    }
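
    // An end-to-end sketch (hedged, added for illustration): size the filter from an
    // (ndv, fpp) target via `new_with_ndv_fpp` instead of a raw block count, then verify
    // every inserted value is reported as present. The ndv/fpp values are illustrative only.
    #[test]
    fn test_sbbf_new_with_ndv_fpp_insert_and_check() {
        let mut sbbf = Sbbf::new_with_ndv_fpp(1_000, 0.01).unwrap();
        for i in 0..1_000 {
            sbbf.insert(&i);
            assert!(sbbf.check(&i));
        }
    }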

    #[test]
    fn test_with_fixture() {
        // bitset of a pre-built filter; the loop below asserts it contains "a0" through "a9"
        let bitset: &[u8] = &[
            200, 1, 80, 20, 64, 68, 8, 109, 6, 37, 4, 67, 144, 80, 96, 32, 8, 132, 43, 33, 0, 5,
            99, 65, 2, 0, 224, 44, 64, 78, 96, 4,
        ];
        let sbbf = Sbbf::new(bitset);
        for a in 0..10i64 {
            let value = format!("a{a}");
            assert!(sbbf.check(&value.as_str()));
        }
    }

    // Checks the assumption behind `SBBF_HEADER_SIZE_ESTIMATE`: a serialized bloom filter
    // header (15 bytes here) must fit within the 20-byte prefetch estimate.
    #[test]
    fn test_bloom_filter_header_size_assumption() {
        let buffer: &[u8; 16] = &[21, 64, 28, 28, 0, 0, 28, 28, 0, 0, 28, 28, 0, 0, 0, 99];
        let (
            BloomFilterHeader {
                algorithm,
                compression,
                hash,
                num_bytes,
            },
            read_length,
        ) = read_bloom_filter_header_and_length(Bytes::copy_from_slice(buffer)).unwrap();
        assert_eq!(read_length, 15);
        assert_eq!(
            algorithm,
            BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {})
        );
        assert_eq!(
            compression,
            BloomFilterCompression::UNCOMPRESSED(Uncompressed {})
        );
        assert_eq!(hash, BloomFilterHash::XXHASH(XxHash {}));
        assert_eq!(num_bytes, 32_i32);
        assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE);
    }

    #[test]
    fn test_optimal_num_of_bytes() {
        for (input, expected) in &[
            (0, 32),
            (9, 32),
            (31, 32),
            (32, 32),
            (33, 64),
            (99, 128),
            (1024, 1024),
            (999_000_000, 128 * 1024 * 1024),
        ] {
            assert_eq!(*expected, optimal_num_of_bytes(*input));
        }
    }

    #[test]
    fn test_num_of_bits_from_ndv_fpp() {
        for (fpp, ndv, num_bits) in &[
            (0.1, 10, 57),
            (0.01, 10, 96),
            (0.001, 10, 146),
            (0.1, 100, 577),
            (0.01, 100, 968),
            (0.001, 100, 1460),
            (0.1, 1000, 5772),
            (0.01, 1000, 9681),
            (0.001, 1000, 14607),
            (0.1, 10000, 57725),
            (0.01, 10000, 96815),
            (0.001, 10000, 146076),
            (0.1, 100000, 577254),
            (0.01, 100000, 968152),
            (0.001, 100000, 1460769),
            (0.1, 1000000, 5772541),
            (0.01, 1000000, 9681526),
            (0.001, 1000000, 14607697),
            (1e-50, 1_000_000_000_000, 14226231280773240832),
        ] {
            assert_eq!(*num_bits, num_of_bits_from_ndv_fpp(*ndv, *fpp) as u64);
        }
    }
}