1
1
extern crate bloomfilter;
2
2
#[ macro_use]
3
3
extern crate rustler;
4
+ extern crate siphasher;
4
5
5
6
use bloomfilter:: Bloom ;
6
- use rustler:: { Encoder , Env , NifResult , Term , OwnedBinary , Binary } ;
7
7
use rustler:: resource:: ResourceArc ;
8
- use std:: sync:: RwLock ;
8
+ use rustler:: { Binary , Encoder , Env , NifResult , OwnedBinary , Term } ;
9
+ use siphasher:: sip:: SipHasher13 ;
10
+ use std:: hash:: Hash ;
11
+ use std:: hash:: Hasher ;
9
12
use std:: io:: Write ;
13
+ use std:: sync:: RwLock ;
10
14
11
15
mod atoms {
12
16
rustler_atoms ! {
@@ -15,7 +19,7 @@ mod atoms {
15
19
}
16
20
17
21
struct FilterResource {
18
- filter : RwLock < Bloom < Vec < u8 > > >
22
+ filter : RwLock < Bloom < [ u8 ] > > ,
19
23
}
20
24
21
25
rustler_export_nifs ! (
@@ -26,7 +30,8 @@ rustler_export_nifs!(
26
30
( "serialize" , 1 , serialize) ,
27
31
( "deserialize" , 7 , deserialize) ,
28
32
( "set" , 2 , set) ,
29
- ( "check" , 2 , check) ,
33
+ ( "check_nif" , 2 , check) ,
34
+ ( "check_nif" , 8 , check_ro) ,
30
35
( "check_and_set" , 2 , check_and_set) ,
31
36
( "clear" , 1 , clear) ,
32
37
] ,
@@ -43,9 +48,7 @@ fn new<'a>(env: Env<'a>, args: &[Term<'a>]) -> NifResult<Term<'a>> {
43
48
let items_count: i64 = args[ 1 ] . decode ( ) ?;
44
49
45
50
let resource = ResourceArc :: new ( FilterResource {
46
- filter : RwLock :: new (
47
- Bloom :: new ( bitmap_size as usize , items_count as usize )
48
- )
51
+ filter : RwLock :: new ( Bloom :: new ( bitmap_size as usize , items_count as usize ) ) ,
49
52
} ) ;
50
53
51
54
Ok ( ( atoms:: ok ( ) , resource. encode ( env) ) . encode ( env) )
@@ -56,15 +59,12 @@ fn new_for_fp_rate<'a>(env: Env<'a>, args: &[Term<'a>]) -> NifResult<Term<'a>> {
56
59
let fp_p: f64 = args[ 1 ] . decode ( ) ?;
57
60
58
61
let resource = ResourceArc :: new ( FilterResource {
59
- filter : RwLock :: new (
60
- Bloom :: new_for_fp_rate ( items_count as usize , fp_p)
61
- )
62
+ filter : RwLock :: new ( Bloom :: new_for_fp_rate ( items_count as usize , fp_p) ) ,
62
63
} ) ;
63
64
64
65
Ok ( ( atoms:: ok ( ) , resource) . encode ( env) )
65
66
}
66
67
67
-
68
68
fn serialize < ' a > ( env : Env < ' a > , args : & [ Term < ' a > ] ) -> NifResult < Term < ' a > > {
69
69
let resource: ResourceArc < FilterResource > = args[ 0 ] . decode ( ) ?;
70
70
@@ -74,11 +74,17 @@ fn serialize<'a>(env: Env<'a>, args: &[Term<'a>]) -> NifResult<Term<'a>> {
74
74
let mut binary = OwnedBinary :: new ( bitmap. len ( ) ) . unwrap ( ) ;
75
75
binary. as_mut_slice ( ) . write_all ( & bitmap) . unwrap ( ) ;
76
76
77
- Ok ( ( atoms:: ok ( ) , ( Binary :: from_owned ( binary, env) ,
78
- filter. number_of_bits ( ) ,
79
- filter. number_of_hash_functions ( ) ,
80
- sips[ 0 ] ,
81
- sips[ 1 ] ) ) . encode ( env) )
77
+ Ok ( (
78
+ atoms:: ok ( ) ,
79
+ (
80
+ Binary :: from_owned ( binary, env) ,
81
+ filter. number_of_bits ( ) ,
82
+ filter. number_of_hash_functions ( ) ,
83
+ sips[ 0 ] ,
84
+ sips[ 1 ] ,
85
+ ) ,
86
+ )
87
+ . encode ( env) )
82
88
}
83
89
84
90
fn deserialize < ' a > ( env : Env < ' a > , args : & [ Term < ' a > ] ) -> NifResult < Term < ' a > > {
@@ -96,15 +102,19 @@ fn deserialize<'a>(env: Env<'a>, args: &[Term<'a>]) -> NifResult<Term<'a>> {
96
102
num_bits,
97
103
num_funs,
98
104
[ ( sip00, sip01) , ( sip10, sip11) ] ,
99
- ) )
105
+ ) ) ,
100
106
} ) ;
101
107
102
108
Ok ( ( atoms:: ok ( ) , resource) . encode ( env) )
103
109
}
104
110
105
111
fn set < ' a > ( env : Env < ' a > , args : & [ Term < ' a > ] ) -> NifResult < Term < ' a > > {
106
112
let resource: ResourceArc < FilterResource > = args[ 0 ] . decode ( ) ?;
107
- let key: Vec < u8 > = args[ 1 ] . decode ( ) ?;
113
+ let key: Binary = if args[ 1 ] . is_binary ( ) {
114
+ args[ 1 ] . decode ( ) ?
115
+ } else {
116
+ Binary :: from_owned ( args[ 1 ] . to_binary ( ) , env)
117
+ } ;
108
118
109
119
let mut filter = resource. filter . write ( ) . unwrap ( ) ;
110
120
( * filter) . set ( & key) ;
@@ -114,16 +124,72 @@ fn set<'a>(env: Env<'a>, args: &[Term<'a>]) -> NifResult<Term<'a>> {
114
124
115
125
fn check < ' a > ( env : Env < ' a > , args : & [ Term < ' a > ] ) -> NifResult < Term < ' a > > {
116
126
let resource: ResourceArc < FilterResource > = args[ 0 ] . decode ( ) ?;
117
- let key: Vec < u8 > = args[ 1 ] . decode ( ) ?;
127
+ let key: Binary = if args[ 1 ] . is_binary ( ) {
128
+ args[ 1 ] . decode ( ) ?
129
+ } else {
130
+ Binary :: from_owned ( args[ 1 ] . to_binary ( ) , env)
131
+ } ;
118
132
119
133
let filter = resource. filter . read ( ) . unwrap ( ) ;
120
134
121
135
Ok ( filter. check ( & key) . encode ( env) )
122
136
}
123
137
138
+ // check a serialized bloom for key membership without fully deserializing the bloom
139
+ // specifically we want to avoid the very slow bitvec deserialization and simply compute
140
+ // the hash keys manually and check them inside the Erlang binary by hand
141
+ // for a 50mb bloom, this improves checking a serialized bloom from 25 seconds to 35 microseconds
142
+ fn check_ro < ' a > ( env : Env < ' a > , args : & [ Term < ' a > ] ) -> NifResult < Term < ' a > > {
143
+ let bitmap: Binary = args[ 0 ] . decode ( ) ?;
144
+ let num_bits: u64 = args[ 1 ] . decode ( ) ?;
145
+ let num_funs: u32 = args[ 2 ] . decode ( ) ?;
146
+ let sip00: u64 = args[ 3 ] . decode ( ) ?;
147
+ let sip01: u64 = args[ 4 ] . decode ( ) ?;
148
+ let sip10: u64 = args[ 5 ] . decode ( ) ?;
149
+ let sip11: u64 = args[ 6 ] . decode ( ) ?;
150
+ let key: Binary = if args[ 7 ] . is_binary ( ) {
151
+ args[ 7 ] . decode ( ) ?
152
+ } else {
153
+ Binary :: from_owned ( args[ 7 ] . to_binary ( ) , env)
154
+ } ;
155
+
156
+ let sips = [
157
+ SipHasher13 :: new_with_keys ( sip00, sip01) ,
158
+ SipHasher13 :: new_with_keys ( sip10, sip11) ,
159
+ ] ;
160
+
161
+ let mut hashes = [ 0u64 , 0u64 ] ;
162
+ for k_i in 0 ..num_funs {
163
+ let bit_offset = ( bloom_hash ( & mut hashes, & key, k_i, & sips) % num_bits) as usize ;
164
+ let byte_offset = bit_offset / 8 ;
165
+ let bit = 7 - ( bit_offset % 8 ) ;
166
+ if ( bitmap[ byte_offset] >> bit) & 1 != 1 {
167
+ return Ok ( false . encode ( env) ) ;
168
+ }
169
+ }
170
+ Ok ( true . encode ( env) )
171
+ }
172
+
173
+ // helper for check_ro, extracted from the bloom crate source code
174
+ fn bloom_hash ( hashes : & mut [ u64 ; 2 ] , item : & [ u8 ] , k_i : u32 , sips : & [ SipHasher13 ; 2 ] ) -> u64 {
175
+ if k_i < 2 {
176
+ let mut sip = sips[ k_i as usize ] ;
177
+ item. hash ( & mut sip) ;
178
+ let hash = sip. finish ( ) ;
179
+ hashes[ k_i as usize ] = hash;
180
+ hash
181
+ } else {
182
+ hashes[ 0 ] . wrapping_add ( ( u64:: from ( k_i) ) . wrapping_mul ( hashes[ 1 ] ) % 0xffff_ffff_ffff_ffc5 )
183
+ }
184
+ }
185
+
124
186
fn check_and_set < ' a > ( env : Env < ' a > , args : & [ Term < ' a > ] ) -> NifResult < Term < ' a > > {
125
187
let resource: ResourceArc < FilterResource > = args[ 0 ] . decode ( ) ?;
126
- let key: Vec < u8 > = args[ 1 ] . decode ( ) ?;
188
+ let key: Binary = if args[ 1 ] . is_binary ( ) {
189
+ args[ 1 ] . decode ( ) ?
190
+ } else {
191
+ Binary :: from_owned ( args[ 1 ] . to_binary ( ) , env)
192
+ } ;
127
193
128
194
let mut filter = resource. filter . write ( ) . unwrap ( ) ;
129
195
0 commit comments