1use crate::data_type::AsBytes;
76use crate::errors::ParquetError;
77use crate::file::metadata::ColumnChunkMetaData;
78use crate::file::reader::ChunkReader;
79use crate::format::{
80 BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
81 SplitBlockAlgorithm, Uncompressed, XxHash,
82};
83use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
84use bytes::Bytes;
85use std::hash::Hasher;
86use std::io::Write;
87use std::sync::Arc;
88use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol};
89use twox_hash::XxHash64;
90
/// Per-word salt constants for the split-block bloom filter mask function.
/// Each word of a block uses one salt to derive a single bit position from a
/// 32-bit hash, per the Parquet format's SBBF definition.
const SALT: [u32; 8] = [
    0x47b6137b_u32,
    0x44974d91_u32,
    0x8824ad5b_u32,
    0xa2b7289d_u32,
    0x705495c7_u32,
    0x2df1424b_u32,
    0x9efc4947_u32,
    0x5c6bfb31_u32,
];

/// One 256-bit block of a split-block bloom filter: eight 32-bit words, each
/// of which receives exactly one bit per inserted hash.
#[derive(Debug, Copy, Clone)]
struct Block([u32; 8]);
impl Block {
    /// The empty (all bits clear) block.
    const ZERO: Block = Block([0; 8]);

    /// Returns a block in which every word has exactly one bit set, selected
    /// from the hash `x`: the top 5 bits of `x * SALT[i]` index a bit in
    /// word `i`.
    fn mask(x: u32) -> Self {
        let mut result = [0_u32; 8];
        for i in 0..8 {
            // Wrapping multiply by the per-word salt, then keep the 5 most
            // significant bits as a bit index in 0..32.
            let y = x.wrapping_mul(SALT[i]) >> 27;
            result[i] = 1 << y;
        }
        Self(result)
    }

    /// Serializes the block to its 32-byte on-disk layout: the eight words in
    /// order, each encoded little-endian. Safe and endian-independent, so no
    /// `unsafe` transmute or `cfg(target_endian)` duplication is needed.
    #[inline]
    fn to_le_bytes(self) -> [u8; 32] {
        let mut bytes = [0_u8; 32];
        for (chunk, word) in bytes.chunks_exact_mut(4).zip(self.0.iter()) {
            chunk.copy_from_slice(&word.to_le_bytes());
        }
        bytes
    }

    /// Sets, in each word, the single bit selected by `hash`.
    fn insert(&mut self, hash: u32) {
        let mask = Self::mask(hash);
        for i in 0..8 {
            self[i] |= mask[i];
        }
    }

    /// Returns true if, in every word, the bit selected by `hash` is set.
    /// `false` is definitive; `true` may be a false positive.
    fn check(&self, hash: u32) -> bool {
        let mask = Self::mask(hash);
        for i in 0..8 {
            if self[i] & mask[i] == 0 {
                return false;
            }
        }
        true
    }
}

impl std::ops::Index<usize> for Block {
    type Output = u32;

    #[inline]
    fn index(&self, index: usize) -> &Self::Output {
        self.0.index(index)
    }
}

impl std::ops::IndexMut<usize> for Block {
    #[inline]
    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
        self.0.index_mut(index)
    }
}
183
/// A split-block bloom filter (SBBF): a vector of 256-bit [`Block`]s, as used
/// by the Parquet bloom filter format (see the imported `SplitBlockAlgorithm`
/// thrift type).
#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);

// Number of bytes fetched when reading only the thrift-encoded bloom filter
// header (no length recorded in metadata); the size assumption is validated
// by `test_bloom_filter_header_size_assumption`.
pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
192
193pub(crate) fn chunk_read_bloom_filter_header_and_offset(
196 offset: u64,
197 buffer: Bytes,
198) -> Result<(BloomFilterHeader, u64), ParquetError> {
199 let (header, length) = read_bloom_filter_header_and_length(buffer)?;
200 Ok((header, offset + length))
201}
202
203#[inline]
206pub(crate) fn read_bloom_filter_header_and_length(
207 buffer: Bytes,
208) -> Result<(BloomFilterHeader, u64), ParquetError> {
209 let total_length = buffer.len();
210 let mut prot = TCompactSliceInputProtocol::new(buffer.as_ref());
211 let header = BloomFilterHeader::read_from_in_protocol(&mut prot)
212 .map_err(|e| ParquetError::General(format!("Could not read bloom filter header: {e}")))?;
213 Ok((header, (total_length - prot.as_slice().len()) as u64))
214}
215
// Smallest allowed bitset size in bytes (one 256-bit block).
pub(crate) const BITSET_MIN_LENGTH: usize = 32;
// Largest allowed bitset size in bytes (128 MiB).
pub(crate) const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;

/// Rounds a requested bitset size up to the next power of two, after clamping
/// it into `[BITSET_MIN_LENGTH, BITSET_MAX_LENGTH]`.
#[inline]
fn optimal_num_of_bytes(num_bytes: usize) -> usize {
    num_bytes
        .clamp(BITSET_MIN_LENGTH, BITSET_MAX_LENGTH)
        .next_power_of_two()
}
225
/// Computes the bloom filter size in bits from the expected number of
/// distinct values `ndv` and the target false-positive probability `fpp`,
/// using the standard split-block bloom filter sizing formula.
#[inline]
fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
    // bits = -8 * ndv / ln(1 - fpp^(1/8)); the `as usize` cast truncates
    // toward zero (and saturates for out-of-range values).
    let bits = -8.0 * ndv as f64 / (1.0 - fpp.powf(1.0 / 8.0)).ln();
    bits as usize
}
235
impl Sbbf {
    /// Creates a filter sized for `ndv` distinct values at the target
    /// false-positive probability `fpp`.
    ///
    /// Returns an error if `fpp` is not within `[0.0, 1.0)`.
    pub(crate) fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self, ParquetError> {
        if !(0.0..1.0).contains(&fpp) {
            return Err(ParquetError::General(format!(
                "False positive probability must be between 0.0 and 1.0, got {fpp}"
            )));
        }
        let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
        Ok(Self::new_with_num_of_bytes(num_bits / 8))
    }

    /// Creates a filter with a bitset of approximately `num_bytes` bytes; the
    /// size is rounded up to a power of two and clamped to the allowed range
    /// by `optimal_num_of_bytes` before allocation.
    pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
        let num_bytes = optimal_num_of_bytes(num_bytes);
        let bitset = vec![0_u8; num_bytes];
        Self::new(&bitset)
    }

    /// Creates a filter from a raw little-endian bitset (32 bytes per block).
    ///
    /// NOTE(review): `chunks_exact(4 * 8)` silently drops trailing bytes if
    /// `bitset.len()` is not a multiple of 32 — presumably callers always
    /// supply block-aligned bitsets; confirm against call sites.
    pub(crate) fn new(bitset: &[u8]) -> Self {
        let data = bitset
            .chunks_exact(4 * 8)
            .map(|chunk| {
                let mut block = Block::ZERO;
                for (i, word) in chunk.chunks_exact(4).enumerate() {
                    block[i] = u32::from_le_bytes(word.try_into().unwrap());
                }
                block
            })
            .collect::<Vec<Block>>();
        Self(data)
    }

    /// Writes the thrift-compact-encoded header followed by the raw bitset
    /// to `writer`.
    pub(crate) fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        let mut protocol = TCompactOutputProtocol::new(&mut writer);
        let header = self.header();
        header.write_to_out_protocol(&mut protocol).map_err(|e| {
            ParquetError::General(format!("Could not write bloom filter header: {e}"))
        })?;
        // Flush the thrift protocol before writing raw bytes through the
        // same underlying writer.
        protocol.flush()?;
        self.write_bitset(&mut writer)?;
        Ok(())
    }

    /// Writes the bitset block by block, each block as 32 little-endian bytes.
    fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        for block in &self.0 {
            writer
                .write_all(block.to_le_bytes().as_slice())
                .map_err(|e| {
                    ParquetError::General(format!("Could not write bloom filter bit set: {e}"))
                })?;
        }
        Ok(())
    }

    /// Builds the header describing this filter: split-block algorithm,
    /// XxHash hashing, no compression.
    fn header(&self) -> BloomFilterHeader {
        BloomFilterHeader {
            // 4 bytes per word * 8 words per block.
            num_bytes: self.0.len() as i32 * 4 * 8,
            algorithm: BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}),
            hash: BloomFilterHash::XXHASH(XxHash {}),
            compression: BloomFilterCompression::UNCOMPRESSED(Uncompressed {}),
        }
    }

    /// Reads this column chunk's bloom filter from `reader`, if the column
    /// metadata records a bloom filter offset; returns `Ok(None)` otherwise.
    pub(crate) fn read_from_column_chunk<R: ChunkReader>(
        column_metadata: &ColumnChunkMetaData,
        reader: Arc<R>,
    ) -> Result<Option<Self>, ParquetError> {
        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        // When the metadata records the total filter length, fetch header and
        // bitset in one request; otherwise fetch only enough for the header.
        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => reader.get_bytes(offset, length as usize),
            None => reader.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        // Single-variant matches: these enums currently have exactly one
        // supported variant each, so matching is a compile-time assertion
        // that no unsupported variant slips through unhandled.
        match header.algorithm {
            BloomFilterAlgorithm::BLOCK(_) => {
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED(_) => {
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH(_) => {
            }
        }

        // The bitset either follows the header inside the buffer already read,
        // or must be fetched with a second request sized from the header.
        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice((bitset_offset - offset) as usize..),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                reader.get_bytes(bitset_offset, bitset_length)?
            }
        };

        Ok(Some(Self::new(&bitset)))
    }

    /// Maps a 64-bit hash to a block index via a multiply-shift on the high
    /// 32 bits — avoids a modulo and works for any block count.
    #[inline]
    fn hash_to_block_index(&self, hash: u64) -> usize {
        (((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
    }

    /// Inserts `value` (via its byte representation) into the filter.
    pub fn insert<T: AsBytes + ?Sized>(&mut self, value: &T) {
        self.insert_hash(hash_as_bytes(value));
    }

    /// Inserts a precomputed 64-bit hash: the high half selects the block,
    /// the low half selects the bits within it.
    fn insert_hash(&mut self, hash: u64) {
        let block_index = self.hash_to_block_index(hash);
        self.0[block_index].insert(hash as u32)
    }

    /// Checks whether `value` may be present; `false` means definitely absent.
    pub fn check<T: AsBytes>(&self, value: &T) -> bool {
        self.check_hash(hash_as_bytes(value))
    }

    /// Checks a precomputed 64-bit hash against the filter.
    fn check_hash(&self, hash: u64) -> bool {
        let block_index = self.hash_to_block_index(hash);
        self.0[block_index].check(hash as u32)
    }

    /// Returns the heap memory used by the filter's block vector.
    pub(crate) fn estimated_memory_size(&self) -> usize {
        self.0.capacity() * std::mem::size_of::<Block>()
    }
}
394
395const SEED: u64 = 0;
397
398#[inline]
399fn hash_as_bytes<A: AsBytes + ?Sized>(value: &A) -> u64 {
400 let mut hasher = XxHash64::with_seed(SEED);
401 hasher.write(value.as_bytes());
402 hasher.finish()
403}
404
#[cfg(test)]
mod tests {
    use super::*;

    // Pins the XxHash64(seed = 0) digest of the empty input.
    #[test]
    fn test_hash_bytes() {
        assert_eq!(hash_as_bytes(""), 17241709254077376921);
    }

    // Every word of a mask must have exactly one bit set.
    #[test]
    fn test_mask_set_quick_check() {
        for i in 0..1_000_000 {
            let result = Block::mask(i);
            assert!(result.0.iter().all(|&x| x.is_power_of_two()));
        }
    }

    // A hash inserted into a block must always be found in that block.
    #[test]
    fn test_block_insert_and_check() {
        for i in 0..1_000_000 {
            let mut block = Block::ZERO;
            block.insert(i);
            assert!(block.check(i));
        }
    }

    // A value inserted into the filter must always be found (no false
    // negatives).
    #[test]
    fn test_sbbf_insert_and_check() {
        let mut sbbf = Sbbf(vec![Block::ZERO; 1_000]);
        for i in 0..1_000_000 {
            sbbf.insert(&i);
            assert!(sbbf.check(&i));
        }
    }

    // Fixture bitset expected to contain the strings "a0".."a9".
    // NOTE(review): presumably produced by a reference implementation —
    // provenance not visible in this file.
    #[test]
    fn test_with_fixture() {
        let bitset: &[u8] = &[
            200, 1, 80, 20, 64, 68, 8, 109, 6, 37, 4, 67, 144, 80, 96, 32, 8, 132, 43, 33, 0, 5,
            99, 65, 2, 0, 224, 44, 64, 78, 96, 4,
        ];
        let sbbf = Sbbf::new(bitset);
        for a in 0..10i64 {
            let value = format!("a{a}");
            assert!(sbbf.check(&value.as_str()));
        }
    }

    // Verifies a thrift-encoded header fits within SBBF_HEADER_SIZE_ESTIMATE
    // bytes, and that decoding reports the right consumed length.
    #[test]
    fn test_bloom_filter_header_size_assumption() {
        let buffer: &[u8; 16] = &[21, 64, 28, 28, 0, 0, 28, 28, 0, 0, 28, 28, 0, 0, 0, 99];
        let (
            BloomFilterHeader {
                algorithm,
                compression,
                hash,
                num_bytes,
            },
            read_length,
        ) = read_bloom_filter_header_and_length(Bytes::copy_from_slice(buffer)).unwrap();
        assert_eq!(read_length, 15);
        assert_eq!(
            algorithm,
            BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {})
        );
        assert_eq!(
            compression,
            BloomFilterCompression::UNCOMPRESSED(Uncompressed {})
        );
        assert_eq!(hash, BloomFilterHash::XXHASH(XxHash {}));
        assert_eq!(num_bytes, 32_i32);
        assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE);
    }

    // Clamp-and-round behavior of the bitset sizing helper.
    #[test]
    fn test_optimal_num_of_bytes() {
        for (input, expected) in &[
            (0, 32),
            (9, 32),
            (31, 32),
            (32, 32),
            (33, 64),
            (99, 128),
            (1024, 1024),
            (999_000_000, 128 * 1024 * 1024),
        ] {
            assert_eq!(*expected, optimal_num_of_bytes(*input));
        }
    }

    // Known (fpp, ndv) -> bits values for the SBBF sizing formula.
    #[test]
    fn test_num_of_bits_from_ndv_fpp() {
        for (fpp, ndv, num_bits) in &[
            (0.1, 10, 57),
            (0.01, 10, 96),
            (0.001, 10, 146),
            (0.1, 100, 577),
            (0.01, 100, 968),
            (0.001, 100, 1460),
            (0.1, 1000, 5772),
            (0.01, 1000, 9681),
            (0.001, 1000, 14607),
            (0.1, 10000, 57725),
            (0.01, 10000, 96815),
            (0.001, 10000, 146076),
            (0.1, 100000, 577254),
            (0.01, 100000, 968152),
            (0.001, 100000, 1460769),
            (0.1, 1000000, 5772541),
            (0.01, 1000000, 9681526),
            (0.001, 1000000, 14607697),
            (1e-50, 1_000_000_000_000, 14226231280773240832),
        ] {
            assert_eq!(*num_bits, num_of_bits_from_ndv_fpp(*ndv, *fpp) as u64);
        }
    }
}