数据结构-布隆过滤器和可逆布隆过滤器
在解决缓存穿透问题时,往往会用到一种高效的数据结构-布隆过滤器,其能够快速过滤掉不存在的非法请求,但其也存在一定的误差,即少量不存在的请求也会被放过去。本文对布隆过滤器家族进行介绍,除了常见的普通布隆过滤器(Bloom Filters),还有计数布隆过滤器(Couting Bloom Filters)和可逆布隆过滤器(Invertible Bloom Filters)。
普通布隆过滤器
基本原理
普通的布隆过滤器原理就是基于位图和哈希函数,通过多个哈希函数依次将元素哈希到位图的某个位图的某个位置,然后将该位置的值赋值为1。如果要判断一个元素是否存在,也是一个哈希函数一个哈希函数来,然后看位图中的对应位置是否为1,如果为零,则说明该元素一定不存在,如果各个哈希函数对应位图中的位置都是1,只能说明该元素可能存在。可以看到,布隆过滤器可以过滤掉一定不存在的元素,但是对于去重任务存在一定的局限性。从概率上可以缓解这种局限性,一方面是增大位图的大小,另一方面是选用好的哈希函数,能够均匀地将各个元素哈希到位图的不同位置。此外,普通的布隆过滤器不能支持删除操作,也无法还原出原来的元素。
代码实现
首先定义一个过滤器的接口1,显然,对于普通的布隆过滤器无法支持后面两种操作。
type Filter[T Hashable] interface {
Add(elem T) // 增加一个元素
Search(elem T) bool // 查找一个元素是否存在
Delete(elem T) (bool, error) // 删除一个元素,成功则返回true
List() ([]T, error) // 列出当前所有剩余元素
}
接着来看普通布隆过滤器实现代码
type BloomFilter[T Hashable] struct {
bitMap []uint8
size uint64
}
func NewBloomFilter[T Hashable](size uint64) Filter[T] {
count := size >> 3
if (size & 7) != 0 {
count++
}
return &BloomFilter[T]{size: count << 3}
}
func (f *BloomFilter[T]) Add(elem T) {
if f.bitMap == nil {
f.bitMap = make([]uint8, f.size>>3)
}
elemHash := elem.Hash()
for _, hash := range hashFuncs {
hashBytes := hash(elemHash)
hashVal := LE2Int64(hashBytes[:8])
pos := hashVal % f.size
f.bitMap[(pos >> 3)] |= (1 << (pos & 7))
}
}
func (f *BloomFilter[T]) Search(elem T) bool {
if f.bitMap == nil {
return false
}
elemHash := elem.Hash()
for _, hash := range hashFuncs {
hashBytes := hash(elemHash)
hashVal := LE2Int64(hashBytes[:8])
pos := hashVal % f.size
if (f.bitMap[(pos>>3)] & (1 << (pos & 7))) == 0 {
return false
}
}
return true
}
func (f *BloomFilter[T]) Delete(elem T) (bool, error) {
return false, ErrFunctionUnSupported
}
func (f *BloomFilter[T]) List() ([]T, error) {
return nil, ErrFunctionUnSupported
}
单元测试代码如下
func TestBloomFilter(t *testing.T) {
filter := NewBloomFilter[String](128)
filter.Add(StringPacket("Alice"))
filter.Add(StringPacket("Bob"))
filter.Add(StringPacket("Clone"))
filter.Add(StringPacket("Dawe"))
filter.Add(StringPacket("Eleps"))
ret := filter.Search(StringPacket("Dawe"))
Assert(t, true, ret, "BloomFilter(Dawe)")
ret = filter.Search(StringPacket("Helen"))
Assert(t, false, ret, "BloomFilter(Helen)")
}
测试截图如下:
计数布隆过滤器
基本原理
为了解决普通布隆过滤器无法解决删除元素的问题,计数布隆过滤器被提出来。正如其名,这个时候每一个位置上不再是一个比特位,而是一个计数器,用一个整数来记录哈希到这个位置的次数。这样,在删除的时候,也可以用每一个哈希函数依次哈希,将对应位置减一即可,在增加一个元素的时候,则是将哈希到的位置的值加一。这样就支持了删除操作,但其实还有误删的情况,因为没有办法确定一个元素是否在布隆过滤器中。而且这种方法有点背离了布隆过滤器的初衷——以极少的内存空间判断海量数据是否存在的问题。
代码实现
首先看计数布隆过滤器的实现
type CountBloomFilter[T Hashable] struct {
bitMap []uint64
size uint64
}
func NewCountBloomFilter[T Hashable](size uint64) Filter[T] {
return &CountBloomFilter[T]{size: size}
}
func (f *CountBloomFilter[T]) Add(elem T) {
if f.bitMap == nil {
f.bitMap = make([]uint64, f.size)
}
elemHash := elem.Hash()
for _, hash := range hashFuncs {
hashBytes := hash(elemHash)
hashVal := LE2Int64(hashBytes[:8])
pos := hashVal % f.size
f.bitMap[pos]++
}
}
func (f *CountBloomFilter[T]) Search(elem T) bool {
if f.bitMap == nil {
return false
}
elemHash := elem.Hash()
for _, hash := range hashFuncs {
hashBytes := hash(elemHash)
hashVal := LE2Int64(hashBytes[:8])
pos := hashVal % f.size
if (f.bitMap[pos]) == 0 {
return false
}
}
return true
}
func (f *CountBloomFilter[T]) Delete(elem T) (bool, error) {
if !f.Search(elem) {
return false, nil
}
elemHash := elem.Hash()
for _, hash := range hashFuncs {
hashBytes := hash(elemHash)
hashVal := LE2Int64(hashBytes[:8])
pos := hashVal % f.size
f.bitMap[pos]--
}
return true, nil
}
func (f *CountBloomFilter[T]) List() ([]T, error) {
return nil, ErrFunctionUnSupported
}
单元测试代码实现如下:
func TestCountBloomFilter(t *testing.T) {
filter := NewCountBloomFilter[String](128)
filter.Add(StringPacket("Alice"))
filter.Add(StringPacket("Bob"))
filter.Add(StringPacket("Clone"))
filter.Add(StringPacket("Dawe"))
filter.Add(StringPacket("Eleps"))
ret := filter.Search(StringPacket("Dawe"))
Assert(t, true, ret, "BloomFilter(Search)")
ret, err := filter.Delete(StringPacket("Dawe"))
Assert(t, true, ret, "BloomFilter(Delete Dawe)")
AssertNil(t, err, "BloomFilter(Delete Dawe)")
ret = filter.Search(StringPacket("Dawe"))
Assert(t, false, ret, "BloomFilter(Search)")
}
测试截图如下
可逆布隆过滤器
基本原理
而可逆布隆过滤器则是在计数布隆过滤器的基础上,引入更多的数据结构来实现可逆,即能够从过滤器中还原出原来的数据。具体的思路如下,定义一个数据范围0~n,一个哈希函数g用于将[0, n]的数据放缩到[0, n^2],定义另外两个哈希函数f1和f2,用于将[0, n]的数据哈希到范围[0, m],其中m为过滤器底层数组的大小,然后除了定义m大小的count外,还定义一个数组idSum,对应每个位置是已加入且哈希落到该点所有元素之和,以及数组hashSum,类似于idSum,只不过取得是每一个元素经过g哈希的结果之和。此外,还要维护一组这样的数据,这不过其哈希函数使用上文定义的f1和f2。具体可以参考代码实现。在列出所有原元素时,是通过不断寻找纯cell的方式,纯cell就是数组中的一个点,且只有一个元素通过哈希会落到这一点,这样,就可以删除这个元素,然后继续找纯cell。如何去找纯cell,用这样的判断条件_g(int(tab[i].idSum / tab[i].count), N) == tab[i].hashSum / tab[i].count
,首先如果一个cell是纯的,一定会满足这个等式,由于哈希函数g的随机性,所以从概率上讲,不是纯cell但是满足这个等式的概率是比较小的。
代码实现
type InvertibleBloomFilter struct {
tabB []filterCell
tabC []filterCell
size uint64
count uint64
}
func NewInvertibleBloomFilter(size uint64) Filter[Integer] {
return &InvertibleBloomFilter{size: size}
}
func (f *InvertibleBloomFilter) Add(elem Integer) {
if f.tabB == nil {
f.tabB = make([]filterCell, f.size)
f.tabC = make([]filterCell, f.size)
}
elemHash := elem.Hash()
for _, hash := range hashFuncs {
hashBytes := hash(elemHash)
hashVal := LE2Int64(hashBytes[:8])
pos := hashVal % f.size
f.tabB[pos].count++
f.tabB[pos].idSum += uint64(elem.Depacket())
f.tabB[pos].hashSum += _g(elem.Depacket(), N)
}
poses := make([]uint, 2)
poses[0] = _f1(elem.Depacket(), uint(f.size))
poses[1] = _f2(elem.Depacket(), uint(f.size))
for _, pos := range poses {
f.tabC[pos].count++
f.tabC[pos].idSum += uint64(elem.Depacket())
f.tabC[pos].hashSum += _g(elem.Depacket(), N)
}
f.count++
}
func (f *InvertibleBloomFilter) Search(elem Integer) bool {
if f.tabB == nil {
return false
}
elemHash := elem.Hash()
for _, hash := range hashFuncs {
hashBytes := hash(elemHash)
hashVal := LE2Int64(hashBytes[:8])
pos := hashVal % f.size
if (f.tabB[pos].count) == 0 {
return false
}
}
return true
}
func (f *InvertibleBloomFilter) Delete(elem Integer) (bool, error) {
if !f.Search(elem) {
return false, nil
}
elemHash := elem.Hash()
for _, hash := range hashFuncs {
hashBytes := hash(elemHash)
hashVal := LE2Int64(hashBytes[:8])
pos := hashVal % f.size
f.tabB[pos].count--
f.tabB[pos].idSum -= uint64(elem.Depacket())
f.tabB[pos].hashSum -= _g(elem.Depacket(), N)
}
poses := make([]uint, 2)
poses[0] = _f1(elem.Depacket(), uint(f.size))
poses[1] = _f2(elem.Depacket(), uint(f.size))
for _, pos := range poses {
f.tabC[pos].count--
f.tabC[pos].idSum -= uint64(elem.Depacket())
f.tabC[pos].hashSum -= _g(elem.Depacket(), N)
}
f.count--
return true, nil
}
func (f *InvertibleBloomFilter) List() ([]Integer, error) {
res := make([]Integer, 0)
var loopAndFind func(tab []filterCell) = func(tab []filterCell) {
for {
flag := false
for i := uint64(0); i < f.size; i++ {
if tab[i].count != 0 && _g(int(tab[i].idSum / tab[i].count), N) == tab[i].hashSum / tab[i].count {
x := IntegerPacket(int(tab[i].idSum / tab[i].count))
for j := uint64(0); j < tab[i].count; i++ {
f.Delete(x)
}
flag = true
res = append(res, x)
break
}
}
if flag {
break
}
}
}
loopAndFind(f.tabB)
if f.count != 0 {
loopAndFind(f.tabC)
}
for _, item := range res {
f.Add(item)
}
return res, nil
}
单元测试代码如下:
func TestInvertibleBloomFilter(t *testing.T) {
filter := NewInvertibleBloomFilter(128)
filter.Add(IntegerPacket(123))
filter.Add(IntegerPacket(89))
filter.Add(IntegerPacket(34))
filter.Add(IntegerPacket(123))
ret := filter.Search(IntegerPacket(123))
Assert(t, true, ret, "TestInvertibleBloomFilter(search 123)")
ret, err := filter.Delete(IntegerPacket(89))
Assert(t, true, ret, "TestInvertibleBloomFilter(delete 89)")
AssertNil(t, err, "TestInvertibleBloomFilter(delete 89)")
ret = filter.Search(IntegerPacket(89))
Assert(t, false, ret, "TestInvertibleBloomFilter(search 89)")
rets, err := filter.List()
Assert(t, 3, len(rets), "TestInvertibleBloomFilter(List)")
AssertNil(t, err, "TestInvertibleBloomFilter(List)")
t.Logf("filter.List()=%+v", rets)
}
测试截图如下:
参考
完整代码可以参考:gulc: golang语言实用工具库 ↩︎
原文地址:https://blog.csdn.net/rtffcggh/article/details/143088396
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!