1use crate::data_type::AsBytes;
76use crate::errors::ParquetError;
77use crate::file::metadata::ColumnChunkMetaData;
78use crate::file::reader::ChunkReader;
79use crate::format::{
80 BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
81 SplitBlockAlgorithm, Uncompressed, XxHash,
82};
83use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
84use bytes::Bytes;
85use std::io::Write;
86use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol};
87use twox_hash::XxHash64;
88
/// Per-word salt constants used to derive the eight bit positions of a block
/// mask (split-block bloom filter constants from the Parquet specification).
const SALT: [u32; 8] = [
    0x47b6137b_u32,
    0x44974d91_u32,
    0x8824ad5b_u32,
    0xa2b7289d_u32,
    0x705495c7_u32,
    0x2df1424b_u32,
    0x9efc4947_u32,
    0x5c6bfb31_u32,
];

/// A 256-bit block: eight 32-bit words, each of which receives exactly one
/// set bit per inserted hash (see [`Block::mask`]).
#[derive(Debug, Copy, Clone)]
struct Block([u32; 8]);

impl Block {
    /// A block with no bits set.
    const ZERO: Block = Block([0; 8]);

    /// Derive the mask for `x`: for each word `i`, multiply `x` by `SALT[i]`
    /// and use the top five bits of the product as the index of the single
    /// bit to set in that word.
    fn mask(x: u32) -> Self {
        let mut result = [0_u32; 8];
        for i in 0..8 {
            // High 5 bits of the product give a bit index in 0..32.
            let y = x.wrapping_mul(SALT[i]) >> 27;
            result[i] = 1 << y;
        }
        Self(result)
    }

    /// Serialize the block as 32 little-endian bytes (the on-disk layout).
    ///
    /// Implemented with `u32::to_le_bytes` per word, so it is correct on any
    /// host endianness without `unsafe` or `cfg(target_endian)` duplication.
    #[inline]
    fn to_le_bytes(self) -> [u8; 32] {
        let mut bytes = [0_u8; 32];
        for (chunk, word) in bytes.chunks_exact_mut(4).zip(self.0) {
            chunk.copy_from_slice(&word.to_le_bytes());
        }
        bytes
    }

    /// Set the bits selected by `hash` in this block.
    fn insert(&mut self, hash: u32) {
        let mask = Self::mask(hash);
        for i in 0..8 {
            self[i] |= mask[i];
        }
    }

    /// Return `true` iff every bit selected by `hash` is set in this block.
    fn check(&self, hash: u32) -> bool {
        let mask = Self::mask(hash);
        for i in 0..8 {
            if self[i] & mask[i] == 0 {
                return false;
            }
        }
        true
    }
}

impl std::ops::Index<usize> for Block {
    type Output = u32;

    #[inline]
    fn index(&self, index: usize) -> &Self::Output {
        self.0.index(index)
    }
}

impl std::ops::IndexMut<usize> for Block {
    #[inline]
    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
        self.0.index_mut(index)
    }
}
181
/// A split-block bloom filter: a vector of 256-bit [`Block`]s.
///
/// Each inserted hash maps to exactly one block (high 32 bits of the hash)
/// and sets one bit in each of that block's eight words (low 32 bits).
#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);

// Number of bytes to read when decoding a bloom filter header whose exact
// length is not recorded in the column metadata; the test
// `test_bloom_filter_header_size_assumption` pins this estimate.
pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
190
191pub(crate) fn chunk_read_bloom_filter_header_and_offset(
194 offset: u64,
195 buffer: Bytes,
196) -> Result<(BloomFilterHeader, u64), ParquetError> {
197 let (header, length) = read_bloom_filter_header_and_length(buffer)?;
198 Ok((header, offset + length))
199}
200
201#[inline]
204pub(crate) fn read_bloom_filter_header_and_length(
205 buffer: Bytes,
206) -> Result<(BloomFilterHeader, u64), ParquetError> {
207 let total_length = buffer.len();
208 let mut prot = TCompactSliceInputProtocol::new(buffer.as_ref());
209 let header = BloomFilterHeader::read_from_in_protocol(&mut prot)
210 .map_err(|e| ParquetError::General(format!("Could not read bloom filter header: {e}")))?;
211 Ok((header, (total_length - prot.as_slice().len()) as u64))
212}
213
// Lower and upper bounds (in bytes) for the bitset size.
pub(crate) const BITSET_MIN_LENGTH: usize = 32;
pub(crate) const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;

/// Clamp the requested bitset size into
/// `[BITSET_MIN_LENGTH, BITSET_MAX_LENGTH]` and round it up to the next
/// power of two (the maximum is itself a power of two, so the result stays
/// within bounds).
#[inline]
fn optimal_num_of_bytes(num_bytes: usize) -> usize {
    num_bytes
        .clamp(BITSET_MIN_LENGTH, BITSET_MAX_LENGTH)
        .next_power_of_two()
}
223
/// Number of bits required for a split-block bloom filter holding `ndv`
/// distinct values at false-positive probability `fpp`, using the formula
/// `-8 * ndv / ln(1 - fpp^(1/8))`.
#[inline]
fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
    let denominator = (1.0 - fpp.powf(1.0 / 8.0)).ln();
    let num_bits = -8.0 * ndv as f64 / denominator;
    num_bits as usize
}
233
impl Sbbf {
    /// Create a filter sized for `ndv` expected distinct values at target
    /// false-positive probability `fpp`.
    ///
    /// # Errors
    /// Returns `ParquetError::General` when `fpp` is outside `0.0..1.0`.
    pub(crate) fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result<Self, ParquetError> {
        if !(0.0..1.0).contains(&fpp) {
            return Err(ParquetError::General(format!(
                "False positive probability must be between 0.0 and 1.0, got {fpp}"
            )));
        }
        let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
        Ok(Self::new_with_num_of_bytes(num_bits / 8))
    }

    /// Create a filter with roughly `num_bytes` of bitset; the size is
    /// clamped and rounded to a power of two by `optimal_num_of_bytes`.
    pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
        let num_bytes = optimal_num_of_bytes(num_bytes);
        // All-zero bitset: an empty filter that matches nothing yet.
        let bitset = vec![0_u8; num_bytes];
        Self::new(&bitset)
    }

    /// Build a filter from a raw little-endian bitset: each 32-byte chunk
    /// becomes one `Block` of eight little-endian `u32` words. Trailing bytes
    /// that do not fill a whole block are dropped by `chunks_exact`.
    pub(crate) fn new(bitset: &[u8]) -> Self {
        let data = bitset
            .chunks_exact(4 * 8)
            .map(|chunk| {
                let mut block = Block::ZERO;
                for (i, word) in chunk.chunks_exact(4).enumerate() {
                    block[i] = u32::from_le_bytes(word.try_into().unwrap());
                }
                block
            })
            .collect::<Vec<Block>>();
        Self(data)
    }

    /// Serialize the filter to `writer`: a Thrift compact-protocol
    /// `BloomFilterHeader` followed immediately by the raw bitset bytes.
    pub(crate) fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        let mut protocol = TCompactOutputProtocol::new(&mut writer);
        let header = self.header();
        header.write_to_out_protocol(&mut protocol).map_err(|e| {
            ParquetError::General(format!("Could not write bloom filter header: {e}"))
        })?;
        // Flush protocol-buffered header bytes before writing raw bytes to
        // the same underlying writer.
        protocol.flush()?;
        self.write_bitset(&mut writer)?;
        Ok(())
    }

    /// Write the raw bitset, block by block, in little-endian byte order.
    fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
        for block in &self.0 {
            writer
                .write_all(block.to_le_bytes().as_slice())
                .map_err(|e| {
                    ParquetError::General(format!("Could not write bloom filter bit set: {e}"))
                })?;
        }
        Ok(())
    }

    /// Header describing this filter: total bitset byte length plus the only
    /// algorithm/hash/compression variants this implementation produces.
    fn header(&self) -> BloomFilterHeader {
        BloomFilterHeader {
            // 4 bytes per word, 8 words per block.
            num_bytes: self.0.len() as i32 * 4 * 8,
            algorithm: BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}),
            hash: BloomFilterHash::XXHASH(XxHash {}),
            compression: BloomFilterCompression::UNCOMPRESSED(Uncompressed {}),
        }
    }

    /// Read the bloom filter for a column chunk, if one exists.
    ///
    /// Returns `Ok(None)` when the metadata records no bloom filter offset.
    /// When the metadata also records the filter length, header and bitset
    /// are fetched in a single read; otherwise the header is read via the
    /// `SBBF_HEADER_SIZE_ESTIMATE`-sized probe and the bitset with a second
    /// read sized from the decoded header.
    pub(crate) fn read_from_column_chunk<R: ChunkReader>(
        column_metadata: &ColumnChunkMetaData,
        reader: &R,
    ) -> Result<Option<Self>, ParquetError> {
        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => reader.get_bytes(offset, length as usize),
            None => reader.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        // `Bytes::clone` is a cheap reference-count bump, not a data copy.
        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        // Exhaustive single-variant matches: no-ops for the supported
        // variants, but they will fail to compile if the format ever adds
        // new algorithm/compression/hash variants, forcing a review here.
        match header.algorithm {
            BloomFilterAlgorithm::BLOCK(_) => {
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED(_) => {
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH(_) => {
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            // Whole filter already in `buffer`: slice off the header bytes.
            Some(_) => buffer.slice((bitset_offset - offset) as usize..),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                reader.get_bytes(bitset_offset, bitset_length)?
            }
        };

        Ok(Some(Self::new(&bitset)))
    }

    /// Map a 64-bit hash to a block index: multiply the high 32 bits by the
    /// block count and take the high 32 bits of the product — an unbiased
    /// range reduction that avoids a modulo.
    #[inline]
    fn hash_to_block_index(&self, hash: u64) -> usize {
        (((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
    }

    /// Insert `value` (via its byte representation) into the filter.
    pub fn insert<T: AsBytes + ?Sized>(&mut self, value: &T) {
        self.insert_hash(hash_as_bytes(value));
    }

    /// Insert a precomputed 64-bit hash: the high half selects the block,
    /// the low half sets the bits within it.
    fn insert_hash(&mut self, hash: u64) {
        let block_index = self.hash_to_block_index(hash);
        self.0[block_index].insert(hash as u32)
    }

    /// Check whether `value` may be present: `false` means definitely
    /// absent, `true` means possibly present (false positives allowed).
    pub fn check<T: AsBytes>(&self, value: &T) -> bool {
        self.check_hash(hash_as_bytes(value))
    }

    /// Check a precomputed 64-bit hash against the filter.
    fn check_hash(&self, hash: u64) -> bool {
        let block_index = self.hash_to_block_index(hash);
        self.0[block_index].check(hash as u32)
    }

    /// Estimated heap memory used by the bitset, based on the vector's
    /// capacity (which may exceed its length).
    pub(crate) fn estimated_memory_size(&self) -> usize {
        self.0.capacity() * std::mem::size_of::<Block>()
    }
}
392
393const SEED: u64 = 0;
395
396#[inline]
397fn hash_as_bytes<A: AsBytes + ?Sized>(value: &A) -> u64 {
398 XxHash64::oneshot(SEED, value.as_bytes())
399}
400
#[cfg(test)]
mod tests {
    use super::*;

    // Known-answer test: seeded xxHash64 of the empty byte string.
    #[test]
    fn test_hash_bytes() {
        assert_eq!(hash_as_bytes(""), 17241709254077376921);
    }

    // Every word of a block mask must contain exactly one set bit.
    #[test]
    fn test_mask_set_quick_check() {
        for i in 0..1_000_000 {
            let result = Block::mask(i);
            assert!(result.0.iter().all(|&x| x.is_power_of_two()));
        }
    }

    // Inserting a hash into a block must make `check` report it present.
    #[test]
    fn test_block_insert_and_check() {
        for i in 0..1_000_000 {
            let mut block = Block::ZERO;
            block.insert(i);
            assert!(block.check(i));
        }
    }

    // Same insert/check round-trip property at the whole-filter level.
    #[test]
    fn test_sbbf_insert_and_check() {
        let mut sbbf = Sbbf(vec![Block::ZERO; 1_000]);
        for i in 0..1_000_000 {
            sbbf.insert(&i);
            assert!(sbbf.check(&i));
        }
    }

    // Fixture bitset for which the strings "a0".."a9" must all check
    // positive (presumably generated by inserting those strings — the
    // fixture's provenance is not recorded here).
    #[test]
    fn test_with_fixture() {
        let bitset: &[u8] = &[
            200, 1, 80, 20, 64, 68, 8, 109, 6, 37, 4, 67, 144, 80, 96, 32, 8, 132, 43, 33, 0, 5,
            99, 65, 2, 0, 224, 44, 64, 78, 96, 4,
        ];
        let sbbf = Sbbf::new(bitset);
        for a in 0..10i64 {
            let value = format!("a{a}");
            assert!(sbbf.check(&value.as_str()));
        }
    }

    // Decodes a serialized header and pins the 20-byte
    // SBBF_HEADER_SIZE_ESTIMATE assumption (the fixture header occupies
    // 15 bytes, comfortably under the estimate).
    #[test]
    fn test_bloom_filter_header_size_assumption() {
        let buffer: &[u8; 16] = &[21, 64, 28, 28, 0, 0, 28, 28, 0, 0, 28, 28, 0, 0, 0, 99];
        let (
            BloomFilterHeader {
                algorithm,
                compression,
                hash,
                num_bytes,
            },
            read_length,
        ) = read_bloom_filter_header_and_length(Bytes::copy_from_slice(buffer)).unwrap();
        assert_eq!(read_length, 15);
        assert_eq!(
            algorithm,
            BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {})
        );
        assert_eq!(
            compression,
            BloomFilterCompression::UNCOMPRESSED(Uncompressed {})
        );
        assert_eq!(hash, BloomFilterHash::XXHASH(XxHash {}));
        assert_eq!(num_bytes, 32_i32);
        assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE);
    }

    // Clamping and power-of-two rounding of the bitset size.
    #[test]
    fn test_optimal_num_of_bytes() {
        for (input, expected) in &[
            (0, 32),
            (9, 32),
            (31, 32),
            (32, 32),
            (33, 64),
            (99, 128),
            (1024, 1024),
            (999_000_000, 128 * 1024 * 1024),
        ] {
            assert_eq!(*expected, optimal_num_of_bytes(*input));
        }
    }

    // Expected bit counts for (fpp, ndv) pairs, including an extreme case
    // that exercises the saturating float-to-int cast.
    #[test]
    fn test_num_of_bits_from_ndv_fpp() {
        for (fpp, ndv, num_bits) in &[
            (0.1, 10, 57),
            (0.01, 10, 96),
            (0.001, 10, 146),
            (0.1, 100, 577),
            (0.01, 100, 968),
            (0.001, 100, 1460),
            (0.1, 1000, 5772),
            (0.01, 1000, 9681),
            (0.001, 1000, 14607),
            (0.1, 10000, 57725),
            (0.01, 10000, 96815),
            (0.001, 10000, 146076),
            (0.1, 100000, 577254),
            (0.01, 100000, 968152),
            (0.001, 100000, 1460769),
            (0.1, 1000000, 5772541),
            (0.01, 1000000, 9681526),
            (0.001, 1000000, 14607697),
            (1e-50, 1_000_000_000_000, 14226231280773240832),
        ] {
            assert_eq!(*num_bits, num_of_bits_from_ndv_fpp(*ndv, *fpp) as u64);
        }
    }
}