
Golang Language Series: Hash Tables


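The hash table is one of the most commonly used data structures in every language. Many language runtimes, Java's HashMap for example, resolve hash collisions by separate chaining. The table has a power-of-two number of buckets, and the key's hash modulo the bucket count selects a bucket. If that bucket already holds entries, a new node is chained onto it.

Go's map (as of go1.21) also chains, but at bucket granularity. Each bucket has eight slots, and a full bucket is linked to overflow buckets through an overflow pointer. Of a key's 64-bit hash, the low B bits select the bucket. The high 8 bits are stored per slot as the tophash, which lets a lookup skip non-matching slots before comparing full keys; a new key simply takes the first free slot. This resembles Redis's quicklist, where each node of a linked list is itself an array, combining the strengths of arrays and linked lists to speed up lookups. Java instead mitigates long chains by converting them into red-black trees.

Growth is likewise driven by the load factor, the number of stored elements divided by the number of buckets. Java's threshold is 0.75, while Go's is 6.5 (average entries per bucket), and Go has one more trigger, described later. A load-factor grow doubles the number of buckets. Like Redis's rehashing, Go grows incrementally: the migration is spread across subsequent insert and delete operations, each of which evacuates at most two old buckets.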
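The split of one 64-bit hash into a bucket index (low B bits) and a tophash (high 8 bits) can be sketched as below. This is a minimal sketch assuming a 64-bit platform. The helpers bucketIndex and topHash are hypothetical, modeled on runtime.bucketMask and runtime.tophash, and minTopHash = 5 follows the go1.21 runtime constant.

```go
package main

const (
	minTopHash = 5  // tophash values below this are reserved for slot/evacuation states
	ptrBits    = 64 // assumption: 64-bit platform
)

// bucketIndex picks the bucket from the low B bits of the hash (hash % 2^B).
func bucketIndex(hash uint64, B uint8) uint64 {
	return hash & (1<<B - 1)
}

// topHash takes the high 8 bits of the hash and, like runtime.tophash,
// bumps values that collide with the reserved state markers.
func topHash(hash uint64) uint8 {
	top := uint8(hash >> (ptrBits - 8))
	if top < minTopHash {
		top += minTopHash
	}
	return top
}
```

For example, with B = 3 the hash 0xAB00000000000013 lands in bucket 3 with tophash 0xAB.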
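Like most built-in hash tables, Go's map is not safe for concurrent use. If one goroutine writes while another reads or writes, the runtime detects it on a best-effort basis. It then aborts with a fatal error such as "concurrent map read and map write", which, unlike an ordinary panic, cannot be recovered. Deleting entries while ranging over a map in a single goroutine is allowed by the language spec.

For concurrent access Go offers sync.Map, which suits read-mostly workloads. Internally it keeps two hash tables: a read map that is read without locking, and a dirty map that is accessed under a mutex. When the number of lookups that miss the read map and fall through to the dirty map reaches the dirty map's size, the dirty map is promoted to become the read map. The next time a new key must be stored, a fresh dirty map is built by copying the read map.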
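The single-goroutine case can be shown with a short sketch: deleting during a range loop is legal Go, because the spec says an entry removed before it is reached is simply not produced. The function name deleteEven is illustrative only. Concurrent access from several goroutines is a different matter and needs a lock or sync.Map.

```go
package main

// deleteEven removes every even key while ranging over the map.
// This is legal in a single goroutine: entries deleted before they are
// reached are simply skipped by the range loop.
func deleteEven(m map[int]string) {
	for k := range m {
		if k%2 == 0 {
			delete(m, k)
		}
	}
}
```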
This article covers both hash tables in detail, with source code analysis and experimental verification.

Source Code Analysis

The analysis is based on the Go source for go1.21.

The Built-in Map

The source is in runtime/map.go.

makemap

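First, here is the function that backs make for a map. Note that when a capacity is given, as in make(map[int]int, 8), makemap picks the smallest number of buckets for which inserting that many elements will not trigger a grow.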

func makemap(t *maptype, hint int, h *hmap) *hmap {
	mem, overflow := math.MulUintptr(uintptr(hint), t.Bucket.Size_)
	if overflow || mem > maxAlloc {
		hint = 0
	}

	// initialize Hmap
	if h == nil {
		h = new(hmap) // allocate the header with new
	}
	h.hash0 = fastrand()

	// Find the size parameter B which will hold the requested # of elements.
	// For hint < 0 overLoadFactor returns false since hint < bucketCnt.
	B := uint8(0)
	for overLoadFactor(hint, B) { // smallest B whose 2^B buckets hold hint elements without growing
		B++
	}
	h.B = B

	// allocate initial hash table
	// if B == 0, the buckets field is allocated lazily later (in mapassign)
	// If hint is large zeroing this memory could take a while.
	if h.B != 0 {
		var nextOverflow *bmap
		h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
		if nextOverflow != nil {
			h.extra = new(mapextra)
			h.extra.nextOverflow = nextOverflow
		}
	}

	return h
}
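The B-selection loop above can be reproduced outside the runtime as a minimal sketch. The helper sizeForHint is hypothetical. The constants assume the commonly cited load factor 13/2 = 6.5, so check loadFactorNum in your Go version's map.go for the exact values.

```go
package main

const (
	bucketCnt     = 8  // slots per bucket
	loadFactorNum = 13 // assumption: 13/2 = 6.5 average entries per bucket
	loadFactorDen = 2
)

// overLoadFactor mirrors the runtime check: more than one bucket's worth of
// elements and more than loadFactorNum/loadFactorDen per bucket on average.
func overLoadFactor(count int, B uint8) bool {
	return count > bucketCnt && uintptr(count) > loadFactorNum*((uintptr(1)<<B)/loadFactorDen)
}

// sizeForHint returns the smallest B whose 2^B buckets hold hint elements
// without triggering a grow, like the loop in makemap.
func sizeForHint(hint int) uint8 {
	B := uint8(0)
	for overLoadFactor(hint, B) {
		B++
	}
	return B
}
```

Under these constants, make(map[int]int, 8) gets a single bucket (B = 0), while a hint of 100 gets 16 buckets (B = 4).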

The overall map header structure is as follows:

// A header for a Go map.
type hmap struct {
	// Note: the format of the hmap is also encoded in cmd/compile/internal/reflectdata/reflect.go.
	// Make sure this stays in sync with the compiler's definition.
	count     int    // # live cells == size of map.  Must be first (used by len() builtin)
	flags     uint8  // state flags
	B         uint8  // log_2 of # of buckets (can hold up to loadFactor * 2^B items)  // there are 2^B buckets
	noverflow uint16 // approximate number of overflow buckets; see incrnoverflow for details
	hash0     uint32 // hash seed  // random seed mixed into every hash

	buckets    unsafe.Pointer // array of 2^B Buckets. may be nil if count==0.
	oldbuckets unsafe.Pointer // previous bucket array of half the size, non-nil only when growing  // used by incremental growth
	nevacuate  uintptr        // progress counter for evacuation (buckets less than this have been evacuated)  // incremental growth progress

	extra *mapextra // optional fields
}

Next, the bucket struct definition:

// A bucket for a Go map.
type bmap struct {
	// tophash generally contains the top byte of the hash value
	// for each key in this bucket. If tophash[0] < minTopHash,
	// tophash[0] is a bucket evacuation state instead.
	// A bucket has eight slots; more generally, any tophash value below minTopHash
	// is a state marker (empty slot or evacuation state), not a hash byte.
	tophash [bucketCnt]uint8
	// Followed by bucketCnt keys and then bucketCnt elems.
	// NOTE: packing all the keys together and then all the elems together makes the
	// code a bit more complicated than alternating key/elem/key/elem/... but it allows
	// us to eliminate padding which would be needed for, e.g., map[int64]int8.
	// Followed by an overflow pointer.
}

In fact, the compiler builds a new, concrete bucket struct for each map type at compile time.

The generated struct looks roughly like this:

type bmap struct {
    topbits  [8]uint8
    keys     [8]keytype
    values   [8]valuetype
    pad      uintptr
    overflow uintptr
}

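The tophash values, keys, and values of the eight slots are stored in separate arrays. As the source comment explains, for a type such as map[int64]int8 this avoids the alignment padding that alternating key/value pairs would need.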
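The padding argument can be checked with unsafe.Sizeof. This sketch assumes a 64-bit platform (int64 aligned to 8 bytes), and the types interleaved and grouped are hypothetical stand-ins for the key/value part of a bucket only. The real bucket also holds tophash and an overflow pointer.

```go
package main

import "unsafe"

// interleaved models a key/elem/key/elem layout for map[int64]int8:
// each pair is padded from 9 to 16 bytes.
type interleaved [8]struct {
	k int64
	v int8
}

// grouped models the runtime layout: all 8 keys, then all 8 elems.
type grouped struct {
	keys [8]int64
	vals [8]int8
}

// sizes returns the byte sizes of the two layouts.
func sizes() (uintptr, uintptr) {
	return unsafe.Sizeof(interleaved{}), unsafe.Sizeof(grouped{})
}
```

On amd64 or arm64 this gives 128 bytes for the interleaved layout versus 72 for the grouped one.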

mapaccess

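For lookups, the runtime defines two functions with these signatures: func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer and func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool). They implement the one-result form v := m[k] and the comma-ok form v, ok := m[k] respectively, and the compiler chooses between them based on the syntax used. For some key types, such as strings and 32/64-bit integers, it calls specialized fast variants instead. The code is as follows: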

// NOTE: the value is returned as a pointer; don't hold onto it for long, since it keeps the whole map live for the garbage collector.
func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool) {
	if raceenabled && h != nil {
		callerpc := getcallerpc()
		pc := abi.FuncPCABIInternal(mapaccess2)
		racereadpc(unsafe.Pointer(h), callerpc, pc)
		raceReadObjectPC(t.Key, key, callerpc, pc)
	}
	if msanenabled && h != nil {
		msanread(key, t.Key.Size_)
	}
	if asanenabled && h != nil {
		asanread(key, t.Key.Size_)
	}
	if h == nil || h.count == 0 {
		if t.HashMightPanic() {
			t.Hasher(key, 0) // see issue 23734
		}
		// return the zero value
		return unsafe.Pointer(&zeroVal[0]), false
	}
	// another goroutine is writing this map; maps are not safe for concurrent use
	if h.flags&hashWriting != 0 {
		fatal("concurrent map read and map write")
	}
	hash := t.Hasher(key, uintptr(h.hash0))
	m := bucketMask(h.B) // 1<<B - 1, so hash&m == hash % (m+1)
	// pointer arithmetic yields the start address of the key's bucket
	b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.BucketSize)))
	if c := h.oldbuckets; c != nil { // incremental growth in progress
		// There are two growth triggers and two kinds of growth:
		// 1. too many elements (load factor exceeded): double the bucket count
		// 2. too many overflow buckets (keys spread thin): same-size grow, which compacts the table

		// When doubling, the old array has half as many buckets, so shift the mask
		// right by one to locate the key's old bucket below.
		if !h.sameSizeGrow() {
			// There used to be half as many buckets; mask down one more power of two.
			m >>= 1
		}
		oldb := (*bmap)(add(c, (hash&m)*uintptr(t.BucketSize)))
		if !evacuated(oldb) { // not evacuated yet: read the old bucket
			b = oldb
		}
	}
	// tophash is the top 8 bits; values below minTopHash get minTopHash added,
	// because those values are reserved to mark slot and evacuation states
	top := tophash(hash)
bucketloop:
	// walk the bucket and any overflow buckets chained after it
	for ; b != nil; b = b.overflow(t) {
		// check each slot, comparing tophash first
		for i := uintptr(0); i < bucketCnt; i++ {
			if b.tophash[i] != top {
				if b.tophash[i] == emptyRest {
					break bucketloop
				}
				continue
			}
			k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.KeySize)) // address of the key whose tophash matched
			if t.IndirectKey() {
				k = *((*unsafe.Pointer)(k))
			}
			if t.Key.Equal(key, k) { // the full keys must also be equal
				// address of the corresponding value
				e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.KeySize)+i*uintptr(t.ValueSize))
				if t.IndirectElem() {
					e = *((*unsafe.Pointer)(e))
				}
				return e, true
			}
		}
	}
	return unsafe.Pointer(&zeroVal[0]), false
}

// returns both key and elem. Used by map iterator.
func mapaccessK(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, unsafe.Pointer) {
	if h == nil || h.count == 0 {
		return nil, nil
	}
	hash := t.Hasher(key, uintptr(h.hash0))
	m := bucketMask(h.B)
	b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.BucketSize)))
	if c := h.oldbuckets; c != nil {
		if !h.sameSizeGrow() {
			// There used to be half as many buckets; mask down one more power of two.
			m >>= 1
		}
		oldb := (*bmap)(add(c, (hash&m)*uintptr(t.BucketSize)))
		if !evacuated(oldb) {
			b = oldb
		}
	}
	top := tophash(hash)
bucketloop: // label lets break exit the outer loop
	for ; b != nil; b = b.overflow(t) {
		for i := uintptr(0); i < bucketCnt; i++ {
			if b.tophash[i] != top {
				if b.tophash[i] == emptyRest {
					break bucketloop
				}
				continue
			}
			k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.KeySize))
			if t.IndirectKey() {
				k = *((*unsafe.Pointer)(k))
			}
			if t.Key.Equal(key, k) {
				e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.KeySize)+i*uintptr(t.ValueSize))
				if t.IndirectElem() {
					e = *((*unsafe.Pointer)(e))
				}
				return k, e
			}
		}
	}
	return nil, nil
}
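At the language level, the two lookup forms look like the sketch below. The helper lookup is illustrative only. Which runtime function the compiler actually calls depends on the key type (string keys, for instance, go through a faststr variant). The point is the observable difference: only the comma-ok form can tell a stored zero value from a missing key.

```go
package main

// lookup uses both lookup forms on the same key.
func lookup(m map[string]int, k string) (int, int, bool) {
	v1 := m[k]     // one-result form: zero value when k is absent
	v2, ok := m[k] // comma-ok form: also reports whether k is present
	return v1, v2, ok
}
```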

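To summarize the lookup, the key's hash first locates its bucket via pointer arithmetic. If a grow is in progress, the corresponding old bucket is located too. If that old bucket has not been evacuated yet, the lookup reads it; otherwise it reads the new bucket. It then walks the bucket and its overflow chain slot by slot, comparing each slot's tophash with the key's tophash. On a match it compares the full key, and if the keys are equal it returns the value. An emptyRest marker ends the search early, since no later slot in the chain is occupied.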
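The bucket walk described above can be modeled with a toy structure. This is a minimal sketch, not runtime code: toyBucket and lookup are hypothetical, keys are uint64, tophash 0 simply means "empty", and the emptyRest shortcut and old-bucket handling are omitted.

```go
package main

const slots = 8

// toyBucket is a simplified stand-in for bmap: tophashes, keys, and values
// in separate arrays plus an overflow pointer.
type toyBucket struct {
	tophash  [slots]uint8 // 0 means an empty slot in this toy
	keys     [slots]uint64
	vals     [slots]string
	overflow *toyBucket
}

// lookup walks the bucket and its overflow chain, comparing the cheap
// tophash first and the full key only when the tophash matches.
func lookup(b *toyBucket, key uint64, top uint8) (string, bool) {
	for ; b != nil; b = b.overflow {
		for i := 0; i < slots; i++ {
			if b.tophash[i] != top {
				continue
			}
			if b.keys[i] == key {
				return b.vals[i], true
			}
		}
	}
	return "", false
}
```

Two different keys can share a tophash, which is why the full key comparison is still needed after a tophash match.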

mapassign

Next, the set operation, implemented in mapassign:

// Like mapaccess, but allocates a slot for the key if it is not present in the map.
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
	if h == nil { // assigning to a nil map panics
		panic(plainError("assignment to entry in nil map"))
	}
	if raceenabled {
		callerpc := getcallerpc()
		pc := abi.FuncPCABIInternal(mapassign)
		racewritepc(unsafe.Pointer(h), callerpc, pc)
		raceReadObjectPC(t.Key, key, callerpc, pc)
	}
	if msanenabled {
		msanread(key, t.Key.Size_)
	}
	if asanenabled {
		asanread(key, t.Key.Size_)
	}
	if h.flags&hashWriting != 0 {
		fatal("concurrent map writes")
	}
	hash := t.Hasher(key, uintptr(h.hash0))

	// Set hashWriting after calling t.hasher, since t.hasher may panic,
	// in which case we have not actually done a write.
	h.flags ^= hashWriting // mark a write in progress

	if h.buckets == nil { // buckets not allocated yet: allocate a single bucket
		h.buckets = newobject(t.Bucket) // newarray(t.Bucket, 1)
	}

again:
	bucket := hash & bucketMask(h.B) // index of the key's bucket
	if h.growing() { // incremental growth in progress
		// Evacuate the matching old bucket first, so this write lands in the new bucket;
		// later reads of this key also go to the new bucket, since its old bucket is evacuated.
		growWork(t, h, bucket)
	}
	// locate the bucket
	b := (*bmap)(add(h.buckets, bucket*uintptr(t.BucketSize)))
	top := tophash(hash)

	var inserti *uint8
	var insertk unsafe.Pointer
	var elem unsafe.Pointer
bucketloop:
	for {
		for i := uintptr(0); i < bucketCnt; i++ { // check each slot
			if b.tophash[i] != top {
				// remember the first free slot
				if isEmpty(b.tophash[i]) && inserti == nil {
					inserti = &b.tophash[i]
					insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.KeySize))
					elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.KeySize)+i*uintptr(t.ValueSize))
				}
				if b.tophash[i] == emptyRest {
					break bucketloop
				}
				continue
			}
			k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.KeySize))
			if t.IndirectKey() {
				k = *((*unsafe.Pointer)(k))
			}
			if !t.Key.Equal(key, k) { // tophash matched but the key differs
				continue
			}
			// already have a mapping for key. Update it.
			if t.NeedKeyUpdate() {
				typedmemmove(t.Key, k, key)
			}
			// the key exists: return the value's address so the caller can overwrite it
			elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.KeySize)+i*uintptr(t.ValueSize))
			goto done
		}
		ovf := b.overflow(t) // move on to the overflow bucket
		if ovf == nil {
			break
		}
		b = ovf
	}

	// Did not find mapping for key. Allocate new cell & add entry.

	// If we hit the max load factor or we have too many overflow buckets,
	// and we're not already in the middle of growing, start growing.
	// A grow starts only if none is in progress and either:
	// 1. there are too many elements for the load factor, or
	// 2. there are too many overflow buckets
	if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
		hashGrow(t, h) // set up the grow; no data is moved yet
		goto again     // Growing the table invalidates everything, so try again
	}
	// the key is absent and no free slot exists: allocate a new overflow bucket
	if inserti == nil {
		// The current bucket and all the overflow buckets connected to it are full, allocate a new one.
		newb := h.newoverflow(t, b)
		inserti = &newb.tophash[0]
		insertk = add(unsafe.Pointer(newb), dataOffset)
		elem = add(insertk, bucketCnt*uintptr(t.KeySize))
	}

	// store new key/elem at insert position
	if t.IndirectKey() {
		kmem := newobject(t.Key)
		*(*unsafe.Pointer)(insertk) = kmem
		insertk = kmem
	}
	if t.IndirectElem() {
		vmem := newobject(t.Elem)
		*(*unsafe.Pointer)(elem) = vmem
	}
	typedmemmove(t.Key, insertk, key)
	*inserti = top // write the key's tophash into the slot
	h.count++

done:
	if h.flags&hashWriting == 0 {
		fatal("concurrent map writes")
	}
	h.flags &^= hashWriting // clear the writing flag
	if t.IndirectElem() {
		elem = *((*unsafe.Pointer)(elem))
	}
	return elem
}

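To summarize, mapassign returns the address of the value slot, and the caller writes the value there. It first locates the bucket from the hash. If a grow is in progress, it evacuates the corresponding old bucket into the new array. It then walks the bucket and its overflow buckets slot by slot, remembering the first free slot it sees. If a slot's tophash and key both match, the assignment is an update, and it returns that value's address directly.

Otherwise a new element must be added. mapassign then checks whether to grow, which happens only if no grow is in progress and one of the two conditions holds, and after starting a grow it searches the new table again. If no free slot was found, it allocates an overflow bucket and uses that bucket's first slot. Finally, it returns the address of the free slot's value.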
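The insert-or-update walk can be modeled with the same kind of toy bucket. This is a minimal sketch under stated assumptions: toyBucket and assign are hypothetical, tophash 0 means "empty", the head bucket must be non-nil, and growth and write flags are left out. It keeps the two behaviors described above: an existing key is updated in place, and otherwise the first free slot is used, with an overflow bucket chained on when the whole chain is full.

```go
package main

const slots = 8

type toyBucket struct {
	tophash  [slots]uint8 // 0 marks an empty slot in this toy
	keys     [slots]uint64
	vals     [slots]string
	overflow *toyBucket
}

// assign updates key in place if present; otherwise it writes into the first
// free slot seen, appending an overflow bucket if the chain is full.
// It reports whether a new entry was added.
func assign(b *toyBucket, key uint64, top uint8, val string) bool {
	var ib *toyBucket // bucket holding the first free slot
	ii := -1          // index of that slot
	last := b
	for ; b != nil; b = b.overflow {
		last = b
		for i := 0; i < slots; i++ {
			if b.tophash[i] == 0 {
				if ib == nil {
					ib, ii = b, i
				}
				continue
			}
			if b.tophash[i] == top && b.keys[i] == key {
				b.vals[i] = val // existing key: plain update
				return false
			}
		}
	}
	if ib == nil { // chain is full: chain a new overflow bucket
		ib = &toyBucket{}
		last.overflow = ib
		ii = 0
	}
	ib.tophash[ii], ib.keys[ii], ib.vals[ii] = top, key, val
	return true
}
```

Inserting nine keys with the same tophash fills the head bucket and spills the ninth into a new overflow bucket.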

Growth and evacuation

A grow is triggered only when no grow is already in progress and one of the following conditions holds:

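  • There are too many elements for the load factor.
  • There are too many overflow buckets (for example after many inserts and deletes). The keys are then spread thinly across long chains, which makes lookups slow.

The checks for the two conditions are: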
// overLoadFactor reports whether count elements in 2^B buckets exceed the load factor.
func overLoadFactor(count int, B uint8) bool {
	// bucketCnt=8, bucketShift(B)=1 << B, loadFactorDen=2; loadFactorNum as read from go1.21 is 12
	// i.e. count > (loadFactorNum/loadFactorDen) * (1 << B); the load factor is usually quoted as 6.5
	return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}

func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {
	// If the threshold is too low, we do extraneous work.
	// If the threshold is too high, maps that grow and shrink can hold on to lots of unused memory.
	// "too many" means (approximately) as many overflow buckets as regular buckets.
	// See incrnoverflow for more details.
	if B > 15 {
		B = 15
	}
	// The compiler doesn't see here that B < 16; mask B to generate shorter shift code.
	return noverflow >= uint16(1)<<(B&15)
}
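Putting the two checks together gives the decision that mapassign and hashGrow make before inserting a new key. This is a minimal sketch. growKind is a hypothetical helper, loadFactorNum = 13 is an assumption (the 6.5 figure), and the "no grow already in progress" precondition is left to the caller.

```go
package main

const (
	bucketCnt     = 8
	loadFactorNum = 13 // assumption: 13/2 = 6.5
	loadFactorDen = 2
)

func overLoadFactor(count int, B uint8) bool {
	return count > bucketCnt && uintptr(count) > loadFactorNum*((uintptr(1)<<B)/loadFactorDen)
}

func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {
	if B > 15 {
		B = 15
	}
	return noverflow >= uint16(1)<<(B&15)
}

// growKind reports what would happen before inserting one more key into a map
// with count elements, 2^B buckets, and noverflow overflow buckets:
// "double" for a load-factor grow, "same" for a same-size grow, "" for none.
// Like hashGrow, the load-factor check takes priority.
func growKind(count int, B uint8, noverflow uint16) string {
	switch {
	case overLoadFactor(count+1, B):
		return "double"
	case tooManyOverflowBuckets(noverflow, B):
		return "same"
	default:
		return ""
	}
}
```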

The function that sets up a grow is hashGrow:

func hashGrow(t *maptype, h *hmap) {
	// If we've hit the load factor, get bigger.
	// Otherwise, there are too many overflow buckets,
	// so keep the same number of buckets and "grow" laterally.
	bigger := uint8(1)
	// too many overflow buckets: same-size grow, bucket count unchanged
	if !overLoadFactor(h.count+1, h.B) {
		bigger = 0
		h.flags |= sameSizeGrow
	}
	oldbuckets := h.buckets
	newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)

	flags := h.flags &^ (iterator | oldIterator)
	if h.flags&iterator != 0 {
		flags |= oldIterator
	}
	// commit the grow (atomic wrt gc)
	// The current buckets become the old buckets; the new array starts empty,
	// and the evacuation progress and overflow count are reset to zero.
	h.B += bigger
	h.flags = flags
	h.oldbuckets = oldbuckets
	h.buckets = newbuckets
	h.nevacuate = 0
	h.noverflow = 0

	if h.extra != nil && h.extra.overflow != nil {
		// Promote current overflow buckets to the old generation.
		if h.extra.oldoverflow != nil {
			throw("oldoverflow is not nil")
		}
		h.extra.oldoverflow = h.extra.overflow
		h.extra.overflow = nil
	}
	if nextOverflow != nil {
		if h.extra == nil {
			h.extra = new(mapextra)
		}
		h.extra.nextOverflow = nextOverflow
	}

	// the actual copying of the hash table data is done incrementally
	// by growWork() and evacuate().
}

Finally, the most important part, the evacuation logic:

func growWork(t *maptype, h *hmap, bucket uintptr) {
	// make sure we evacuate the oldbucket corresponding
	// to the bucket we're about to use
	// (the old bucket that maps to this bucket)
	evacuate(t, h, bucket&h.oldbucketmask())

	// evacuate one more oldbucket to make progress on growing
	// (an extra bucket per call speeds up the grow)
	if h.growing() {
		evacuate(t, h, h.nevacuate)
	}
}

The actual work is done by evacuate:

func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
	// start address of the old bucket
	b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.BucketSize)))
	newbit := h.noldbuckets() // number of old buckets
	if !evacuated(b) {
		// TODO: reuse overflow buckets instead of using new ones, if there
		// is no iterator using the old buckets.  (If !oldIterator.)

		// xy contains the x and y (low and high) evacuation destinations.
		// When doubling, one old bucket's entries are split across two new buckets.
		// x is the lower one, which exists for both kinds of grow.
		var xy [2]evacDst
		x := &xy[0]
		x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.BucketSize)))
		// k and e track the key and value addresses of the next free slot
		x.k = add(unsafe.Pointer(x.b), dataOffset)
		x.e = add(x.k, bucketCnt*uintptr(t.KeySize))

		if !h.sameSizeGrow() { // when doubling, y's index is the old index plus the number of old buckets
			// Only calculate y pointers if we're growing bigger.
			// Otherwise GC can see bad pointers.
			y := &xy[1]
			y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.BucketSize)))
			y.k = add(unsafe.Pointer(y.b), dataOffset)
			y.e = add(y.k, bucketCnt*uintptr(t.KeySize))
		}
		// walk the bucket and its overflow buckets
		for ; b != nil; b = b.overflow(t) {
			k := add(unsafe.Pointer(b), dataOffset)
			e := add(k, bucketCnt*uintptr(t.KeySize))
			// walk each slot
			for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.KeySize)), add(e, uintptr(t.ValueSize)) {
				top := b.tophash[i]
				if isEmpty(top) {
					b.tophash[i] = evacuatedEmpty
					continue
				}
				if top < minTopHash {
					throw("bad map state")
				}
				k2 := k
				if t.IndirectKey() {
					k2 = *((*unsafe.Pointer)(k2))
				}
				var useY uint8 // whether this entry goes to y
				if !h.sameSizeGrow() {
					// Compute hash to make our evacuation decision (whether we need
					// to send this key/elem to bucket x or bucket y).
					hash := t.Hasher(k2, uintptr(h.hash0))
					if h.flags&iterator != 0 && !t.ReflexiveKey() && !t.Key.Equal(k2, k2) {
						// If key != key (NaNs), then the hash could be (and probably
						// will be) entirely different from the old hash. Moreover,
						// it isn't reproducible. Reproducibility is required in the
						// presence of iterators, as our evacuation decision must
						// match whatever decision the iterator made.
						// Fortunately, we have the freedom to send these keys either
						// way. Also, tophash is meaningless for these kinds of keys.
						// We let the low bit of tophash drive the evacuation decision.
						// We recompute a new random tophash for the next level so
						// these keys will get evenly distributed across all buckets
						// after multiple grows.
						// (NaN hashes differ on every call, so the low bit of the old
						// tophash decides the destination to keep iteration reproducible.)
						useY = top & 1
						top = tophash(hash)
					} else {
						if hash&newbit != 0 {
							useY = 1
						}
					}
				}

				if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {
					throw("bad evacuatedN")
				}

				b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
				dst := &xy[useY]                 // evacuation destination

				if dst.i == bucketCnt { // destination bucket is full: chain a new overflow bucket
					dst.b = h.newoverflow(t, dst.b)
					dst.i = 0
					dst.k = add(unsafe.Pointer(dst.b), dataOffset)
					dst.e = add(dst.k, bucketCnt*uintptr(t.KeySize))
				}
				// copy the entry
				dst.b.tophash[dst.i&(bucketCnt-1)] = top // mask dst.i as an optimization, to avoid a bounds check
				if t.IndirectKey() {
					*(*unsafe.Pointer)(dst.k) = k2 // copy pointer
				} else {
					typedmemmove(t.Key, dst.k, k) // copy elem
				}
				if t.IndirectElem() {
					*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
				} else {
					typedmemmove(t.Elem, dst.e, e)
				}
				dst.i++
				// These updates might push these pointers past the end of the
				// key or elem arrays.  That's ok, as we have the overflow pointer
				// at the end of the bucket to protect against pointing past the
				// end of the bucket.
				dst.k = add(dst.k, uintptr(t.KeySize))
				dst.e = add(dst.e, uintptr(t.ValueSize))
			}
		}
		// Unlink the overflow buckets & clear key/elem to help GC.
		// (clears the old bucket's overflow links, keys, and values)
		if h.flags&oldIterator == 0 && t.Bucket.PtrBytes != 0 {
			b := add(h.oldbuckets, oldbucket*uintptr(t.BucketSize))
			// Preserve b.tophash because the evacuation
			// state is maintained there.
			ptr := add(b, dataOffset)
			n := uintptr(t.BucketSize) - dataOffset
			memclrHasPointers(ptr, n)
		}
	}

	// oldbucket is now evacuated. nevacuate means every old bucket below it is done,
	// so advance it if this was the bucket it pointed at.
	if oldbucket == h.nevacuate {
		advanceEvacuationMark(h, t, newbit)
	}
}

func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {
	h.nevacuate++
	// Experiments suggest that 1024 is overkill by at least an order of magnitude.
	// Put it in there as a safeguard anyway, to ensure O(1) behavior.
	stop := h.nevacuate + 1024
	if stop > newbit {
		stop = newbit
	}
	// advance the evacuation progress past buckets already evacuated
	for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
		h.nevacuate++
	}
	if h.nevacuate == newbit { // newbit == # of oldbuckets; evacuation is complete
		// Growing is all done. Free old main bucket array.
		h.oldbuckets = nil
		// Can discard old overflow buckets as well.
		// If they are still referenced by an iterator,
		// then the iterator holds a pointers to the slice.
		if h.extra != nil {
			h.extra.oldoverflow = nil
		}
		h.flags &^= sameSizeGrow
	}
}

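So each call to evacuate moves the contents of a single old bucket. It walks that bucket and its overflow buckets slot by slot and copies every non-empty slot into the new array, and one old bucket's entries land in at most two new buckets. Note the progress counter nevacuate: every old bucket with an index below it has already been evacuated. As growWork shows, inserts and deletes each trigger evacuation of the old bucket they touch. To speed up the grow, they additionally evacuate the old bucket numbered nevacuate.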
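The x/y split can be checked with plain arithmetic. In this minimal sketch, evacDest is a hypothetical helper and oldN plays the role of newbit (the number of old buckets). An entry in old bucket i goes to new bucket i (x) or i + oldN (y), depending on the single hash bit hash & oldN. That is exactly the bit the doubled mask adds, so the destination always equals hash & (2*oldN - 1).

```go
package main

// evacDest returns the new bucket index for an entry of old bucket oldbucket
// when the table doubles from oldN buckets to 2*oldN.
func evacDest(hash, oldbucket, oldN uint64) uint64 {
	if hash&oldN != 0 {
		return oldbucket + oldN // y: the upper half
	}
	return oldbucket // x: same index as before
}
```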

mapdelete

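The code is below, and the logic resembles the operations above. After clearing a slot, mapdelete marks it emptyOne. If no occupied slot follows it in the chain, the trailing run of emptyOne markers is converted to emptyRest so that later searches can stop early.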

func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
	if raceenabled && h != nil {
		callerpc := getcallerpc()
		pc := abi.FuncPCABIInternal(mapdelete)
		racewritepc(unsafe.Pointer(h), callerpc, pc)
		raceReadObjectPC(t.Key, key, callerpc, pc)
	}
	if msanenabled && h != nil {
		msanread(key, t.Key.Size_)
	}
	if asanenabled && h != nil {
		asanread(key, t.Key.Size_)
	}
	if h == nil || h.count == 0 {
		if t.HashMightPanic() {
			t.Hasher(key, 0) // see issue 23734
		}
		return
	}
	if h.flags&hashWriting != 0 {
		fatal("concurrent map writes")
	}

	hash := t.Hasher(key, uintptr(h.hash0))

	// Set hashWriting after calling t.hasher, since t.hasher may panic,
	// in which case we have not actually done a write (delete).
	h.flags ^= hashWriting

	bucket := hash & bucketMask(h.B)
	if h.growing() { // evacuate while growing
		growWork(t, h, bucket)
	}
	b := (*bmap)(add(h.buckets, bucket*uintptr(t.BucketSize)))
	bOrig := b
	top := tophash(hash)
search:
	for ; b != nil; b = b.overflow(t) {
		for i := uintptr(0); i < bucketCnt; i++ {
			if b.tophash[i] != top {
				if b.tophash[i] == emptyRest {
					break search
				}
				continue
			}
			k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.KeySize))
			k2 := k
			if t.IndirectKey() {
				k2 = *((*unsafe.Pointer)(k2))
			}
			if !t.Key.Equal(key, k2) { // are the full keys equal?
				continue
			}
			// Only clear key if there are pointers in it.
			if t.IndirectKey() {
				*(*unsafe.Pointer)(k) = nil
			} else if t.Key.PtrBytes != 0 {
				memclrHasPointers(k, t.Key.Size_)
			}
			// found the key: clear the value it maps to
			e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.KeySize)+i*uintptr(t.ValueSize))
			if t.IndirectElem() {
				*(*unsafe.Pointer)(e) = nil
			} else if t.Elem.PtrBytes != 0 {
				memclrHasPointers(e, t.Elem.Size_)
			} else {
				memclrNoHeapPointers(e, t.Elem.Size_)
			}
			b.tophash[i] = emptyOne // mark the slot empty
			// If the bucket now ends in a bunch of emptyOne states,
			// change those to emptyRest states.
			// It would be nice to make this a separate function, but
			// for loops are not currently inlineable.
			if i == bucketCnt-1 {
				if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest {
					goto notLast
				}
			} else {
				if b.tophash[i+1] != emptyRest {
					goto notLast
				}
			}
			for {
				b.tophash[i] = emptyRest
				if i == 0 {
					if b == bOrig {
						break // beginning of initial bucket, we're done.
					}
					// Find previous bucket, continue at its last entry.
					c := b
					for b = bOrig; b.overflow(t) != c; b = b.overflow(t) {
					}
					i = bucketCnt - 1
				} else {
					i--
				}
				if b.tophash[i] != emptyOne {
					break
				}
			}
		notLast:
			h.count--
			// Reset the hash seed to make it more difficult for attackers to
			// repeatedly trigger hash collisions. See issue 25237.
			// (The map is now empty, so changing the seed is safe.)
			if h.count == 0 {
				h.hash0 = fastrand()
			}
			break search
		}
	}

	if h.flags&hashWriting == 0 {
		fatal("concurrent map writes")
	}
	h.flags &^= hashWriting
}

Iteration

First, the iterator struct:

type hiter struct {
	key         unsafe.Pointer // Must be in first position.  Write nil to indicate iteration end (see cmd/compile/internal/walk/range.go).
	elem        unsafe.Pointer // Must be in second position (see cmd/compile/internal/walk/range.go).
	t           *maptype
	h           *hmap
	buckets     unsafe.Pointer // bucket ptr at hash_iter initialization time
	bptr        *bmap          // current bucket
	overflow    *[]*bmap       // keeps overflow buckets of hmap.buckets alive
	oldoverflow *[]*bmap       // keeps overflow buckets of hmap.oldbuckets alive
	startBucket uintptr        // bucket iteration started at
	offset      uint8          // intra-bucket offset to start from during iteration (should be big enough to hold bucketCnt-1)
	wrapped     bool           // already wrapped around from end of bucket array to beginning
	B           uint8
	i           uint8
	bucket      uintptr
	checkBucket uintptr
}

Disassembling code that contains a loop such as for k, v := range m shows the iterator's two key operations. The first is the initializer mapiterinit:

func mapiterinit(t *maptype, h *hmap, it *hiter) {
	if raceenabled && h != nil {
		callerpc := getcallerpc()
		racereadpc(unsafe.Pointer(h), callerpc, abi.FuncPCABIInternal(mapiterinit))
	}

	it.t = t
	// sanity checks
	if h == nil || h.count == 0 {
		return
	}

	if unsafe.Sizeof(hiter{})/goarch.PtrSize != 12 {
		throw("hash_iter size incorrect") // see cmd/compile/internal/reflectdata/reflect.go
	}
	it.h = h

	// grab snapshot of bucket state
	it.B = h.B
	it.buckets = h.buckets
	if t.Bucket.PtrBytes == 0 {
		// Allocate the current slice and remember pointers to both current and old.
		// This preserves all relevant overflow buckets alive even if
		// the table grows and/or overflow buckets are added to the table
		// while we are iterating.
		h.createOverflow()
		it.overflow = h.extra.overflow
		it.oldoverflow = h.extra.oldoverflow
	}

	// decide where to start
	var r uintptr
	if h.B > 31-bucketCntBits {
		r = uintptr(fastrand64())
	} else {
		r = uintptr(fastrand())
	}
	// start from a randomly chosen bucket and a random slot offset
	it.startBucket = r & bucketMask(h.B)
	it.offset = uint8(r >> h.B & (bucketCnt - 1))

	// iterator state
	it.bucket = it.startBucket

	// Remember we have an iterator.
	// Can run concurrently with another mapiterinit().
	// (set the flags recording that an iterator is active)
	if old := h.flags; old&(iterator|oldIterator) != iterator|oldIterator {
		atomic.Or8(&h.flags, iterator|oldIterator)
	}

	mapiternext(it)
}

The second is the step function mapiternext:

func mapiternext(it *hiter) {
	h := it.h
	if raceenabled {
		callerpc := getcallerpc()
		racereadpc(unsafe.Pointer(h), callerpc, abi.FuncPCABIInternal(mapiternext))
	}
	if h.flags&hashWriting != 0 {
		fatal("concurrent map iteration and map write")
	}
	t := it.t
	bucket := it.bucket
	b := it.bptr
	i := it.i
	checkBucket := it.checkBucket

next:
	if b == nil {
		if bucket == it.startBucket && it.wrapped {
			// end of iteration
			it.key = nil
			it.elem = nil
			return
		}
		if h.growing() && it.B == h.B { // the iterator started during a grow that is not finished yet
			// Iterator was started in the middle of a grow, and the grow isn't done yet.
			// If the bucket we're looking at hasn't been filled in yet (i.e. the old
			// bucket hasn't been evacuated) then we need to iterate through the old
			// bucket and only return the ones that will be migrated to this bucket.
			oldbucket := bucket & it.h.oldbucketmask()
			b = (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.BucketSize)))
			if !evacuated(b) {
				checkBucket = bucket // iterate the old bucket, filtering its entries
			} else {
				b = (*bmap)(add(it.buckets, bucket*uintptr(t.BucketSize)))
				checkBucket = noCheck // already evacuated: read the new bucket, no filtering needed
			}
		} else { // otherwise use the bucket array snapshotted at iterator init
			b = (*bmap)(add(it.buckets, bucket*uintptr(t.BucketSize)))
			checkBucket = noCheck
		}
		// advance to the next bucket, wrapping around to 0 after the last one
		bucket++
		if bucket == bucketShift(it.B) {
			bucket = 0
			it.wrapped = true
		}
		i = 0
	}
	for ; i < bucketCnt; i++ {
		// slots are also visited starting from a random offset
		offi := (i + it.offset) & (bucketCnt - 1)
		if isEmpty(b.tophash[offi]) || b.tophash[offi] == evacuatedEmpty {
			// TODO: emptyRest is hard to use here, as we start iterating
			// in the middle of a bucket. It's feasible, just tricky.
			continue
		}
		k := add(unsafe.Pointer(b), dataOffset+uintptr(offi)*uintptr(t.KeySize))
		if t.IndirectKey() {
			k = *((*unsafe.Pointer)(k))
		}
		e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.KeySize)+uintptr(offi)*uintptr(t.ValueSize))
		// entries of an unevacuated old bucket need filtering
		if checkBucket != noCheck && !h.sameSizeGrow() {
			// Special case: iterator was started during a grow to a larger size
			// and the grow is not done yet. We're working on a bucket whose
			// oldbucket has not been evacuated yet. Or at least, it wasn't
			// evacuated when we started the bucket. So we're iterating
			// through the oldbucket, skipping any keys that will go
			// to the other new bucket (each oldbucket expands to two
			// buckets during a grow).
			if t.ReflexiveKey() || t.Key.Equal(k, k) {
				// If the item in the oldbucket is not destined for
				// the current new bucket in the iteration, skip it.
				hash := t.Hasher(k, uintptr(h.hash0))
				if hash&bucketMask(it.B) != checkBucket {
					continue
				}
			} else { // keys such as NaN, whose hash differs on every call
				// Hash isn't repeatable if k != k (NaNs).  We need a
				// repeatable and randomish choice of which direction
				// to send NaNs during evacuation. We'll use the low
				// bit of tophash to decide which way NaNs go.
				// NOTE: this case is why we need two evacuate tophash
				// values, evacuatedX and evacuatedY, that differ in
				// their low bit.
				if checkBucket>>(it.B-1) != uintptr(b.tophash[offi]&1) {
					continue
				}
			}
		}
		if (b.tophash[offi] != evacuatedX && b.tophash[offi] != evacuatedY) ||
			!(t.ReflexiveKey() || t.Key.Equal(k, k)) {
			// This is the golden data, we can return it.
			// OR
			// key!=key, so the entry can't be deleted or updated, so we can just return it.
			// That's lucky for us because when key!=key we can't look it up successfully.
			it.key = k
			if t.IndirectElem() {
				e = *((*unsafe.Pointer)(e))
			}
			it.elem = e
		} else {
			// The hash table has grown since the iterator was started.
			// The golden data for this key is now somewhere else.
			// Check the current hash table for the data.
			// This code handles the case where the key
			// has been deleted, updated, or deleted and reinserted.
			// NOTE: we need to regrab the key as it has potentially been
			// updated to an equal() but not identical key (e.g. +0.0 vs -0.0).
			rk, re := mapaccessK(t, h, k)
			if rk == nil {
				continue // key has been deleted
			}
			it.key = rk
			it.elem = re
		}
		it.bucket = bucket
		if it.bptr != b { // avoid unnecessary write barrier; see issue 14921
			it.bptr = b
		}
		it.i = i + 1
		it.checkBucket = checkBucket
		return
	}
	b = b.overflow(t)
	i = 0
	goto next
}

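This code explains why key order differs on every range over a map. mapiterinit picks a random starting bucket and then walks all buckets cyclically from there, and within each bucket it also starts from a random slot offset.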
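Because of this randomization, code that needs a stable order has to impose one itself. A common approach is to collect the keys and sort them, as in this minimal sketch (sortedKeys is an illustrative helper; sort.Strings is from the standard library).

```go
package main

import "sort"

// sortedKeys returns the map's keys in ascending order, giving a
// deterministic iteration order that range itself does not provide.
func sortedKeys(m map[string]int) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}
```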

sync.Map

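sync.Map is the concurrency-safe hash table in Go's standard library. It suits read-mostly workloads. Under write-heavy workloads it degrades to roughly a mutex plus a map and performs poorly, which is where it falls short of Java's highly optimized ConcurrentHashMap.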
The source is in sync/map.go. First, the struct definitions:

type Map struct {
	mu Mutex // mutex guarding dirty and misses

	// read contains the portion of the map's contents that are safe for
	// concurrent access (with or without mu held).
	//
	// The read field itself is always safe to load, but must only be stored with
	// mu held.
	//
	// Entries stored in read may be updated concurrently without mu, but updating
	// a previously-expunged entry requires that the entry be copied to the dirty
	// map and unexpunged with mu held.
	// (atomic.Pointer[T] holds a *T that can be loaded, stored, swapped, and CASed atomically)
	read atomic.Pointer[readOnly]

	// dirty contains the portion of the map's contents that require mu to be
	// held. To ensure that the dirty map can be promoted to the read map quickly,
	// it also includes all of the non-expunged entries in the read map.
	//
	// Expunged entries are not stored in the dirty map. An expunged entry in the
	// clean map must be unexpunged and added to the dirty map before a new value
	// can be stored to it.
	//
	// If the dirty map is nil, the next write to the map will initialize it by
	// making a shallow copy of the clean map, omitting stale entries.
	dirty map[any]*entry // read and written only with mu held

	// misses counts the number of loads since the read map was last updated that
	// needed to lock mu to determine whether the key was present.
	//
	// Once enough misses have occurred to cover the cost of copying the dirty
	// map, the dirty map will be promoted to the read map (in the unamended
	// state) and the next store to the map will make a new dirty copy.
	// (number of lookups that missed the read map)
	misses int
}

// readOnly is an immutable struct stored atomically in the Map.read field.
type readOnly struct {
	m map[any]*entry
	// whether dirty holds any key missing from m, i.e. whether dirty has changes
	amended bool // true if the dirty map contains some key not in m.
}

// An entry is a slot in the map corresponding to a particular key.
type entry struct {
	// p points to the interface{} value stored for the entry.
	//
	// If p == nil, the entry has been deleted, and either m.dirty == nil or
	// m.dirty[key] is e.
	//
	// If p == expunged, the entry has been deleted, m.dirty != nil, and the entry
	// is missing from m.dirty.
	//
	// Otherwise, the entry is valid and recorded in m.read.m[key] and, if m.dirty
	// != nil, in m.dirty[key].
	//
	// An entry can be deleted by atomic replacement with nil: when m.dirty is
	// next created, it will atomically replace nil with expunged and leave
	// m.dirty[key] unset.
	//
	// An entry's associated value can be updated by atomic replacement, provided
	// p != expunged. If p == expunged, an entry's associated value can be updated
	// only after first setting m.dirty[key] = e so that lookups using the dirty
	// map find the entry.
	p atomic.Pointer[any]
}

Load (read)

The source:

func (m *Map) Load(key any) (value any, ok bool) {
	read := m.loadReadOnly() // try the read map first
	e, ok := read.m[key]     // the read map is immutable, so no lock is needed
	if !ok && read.amended { // not in read, and dirty holds keys read lacks
		m.mu.Lock()
		// Avoid reporting a spurious miss if m.dirty got promoted while we were
		// blocked on m.mu. (If further loads of the same key will not miss, it's
		// not worth copying the dirty map for this key.)
		// Double-check: another goroutine may have promoted dirty to read while we waited for the lock.
		read = m.loadReadOnly()
		e, ok = read.m[key]
		if !ok && read.amended {
			e, ok = m.dirty[key]
			// Regardless of whether the entry was present, record a miss: this key
			// will take the slow path until the dirty map is promoted to the read
			// map.
			m.missLocked()
		}
		m.mu.Unlock()
	}
	if !ok {
		return nil, false
	}
	return e.load()
}

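Load prefers the read map. It locks and consults the dirty map only when the key is absent from the read map and the dirty map holds keys the read map lacks. Whether or not that lookup hits, the misses counter is incremented. Once misses reaches the number of entries in the dirty map, missLocked promotes the dirty map to the read map and sets dirty to nil. Here is missLocked: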

func (m *Map) missLocked() {
	m.misses++
	if m.misses < len(m.dirty) { // promote only once misses >= len(dirty)
		return
	}
	m.read.Store(&readOnly{m: m.dirty})
	m.dirty = nil
	m.misses = 0
}
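To make the read/dirty/misses interplay concrete, here is a deliberately simplified model, not sync.Map itself. The type ReadMostlyMap and its methods are hypothetical. Values are int64 held in shared *atomic.Int64 entries, standing in for entry.p, and deletion and the expunged state are omitted. It needs Go 1.19+ for atomic.Pointer and atomic.Int64.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// readOnly is an immutable snapshot; entries are shared pointers.
type readOnly struct {
	m       map[string]*atomic.Int64
	amended bool // true if dirty holds keys that m lacks
}

// ReadMostlyMap keeps only the read/dirty/misses mechanics of sync.Map.
type ReadMostlyMap struct {
	mu     sync.Mutex
	read   atomic.Pointer[readOnly]
	dirty  map[string]*atomic.Int64
	misses int
}

func (m *ReadMostlyMap) loadRead() readOnly {
	if p := m.read.Load(); p != nil {
		return *p
	}
	return readOnly{}
}

func (m *ReadMostlyMap) Load(k string) (int64, bool) {
	r := m.loadRead()
	e, ok := r.m[k] // lock-free fast path
	if !ok && r.amended {
		m.mu.Lock()
		r = m.loadRead() // double-check: dirty may have been promoted while we waited
		e, ok = r.m[k]
		if !ok && r.amended {
			e, ok = m.dirty[k]
			m.missLocked() // count the miss whether or not the key was found
		}
		m.mu.Unlock()
	}
	if !ok {
		return 0, false
	}
	return e.Load(), true
}

// missLocked promotes dirty to read once misses reach len(dirty).
func (m *ReadMostlyMap) missLocked() {
	m.misses++
	if m.misses < len(m.dirty) {
		return
	}
	m.read.Store(&readOnly{m: m.dirty})
	m.dirty = nil
	m.misses = 0
}

func (m *ReadMostlyMap) Store(k string, v int64) {
	if e, ok := m.loadRead().m[k]; ok {
		e.Store(v) // existing key: atomic update, no lock
		return
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	r := m.loadRead()
	if e, ok := r.m[k]; ok {
		e.Store(v)
		return
	}
	if e, ok := m.dirty[k]; ok {
		e.Store(v)
		return
	}
	if !r.amended { // first new key since the last promotion: rebuild dirty from read
		m.dirty = make(map[string]*atomic.Int64, len(r.m)+1)
		for kk, e := range r.m {
			m.dirty[kk] = e
		}
		m.read.Store(&readOnly{m: r.m, amended: true})
	}
	e := new(atomic.Int64)
	e.Store(v)
	m.dirty[k] = e
}
```

Read and dirty share the same entry pointers, so updating an existing key can skip the lock, and promotion is just swapping in the whole dirty map.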

Store

// Store sets the value for a key.
func (m *Map) Store(key, value any) {
	_, _ = m.Swap(key, value)
}

// Store is implemented on top of Swap.
func (m *Map) Swap(key, value any) (previous any, loaded bool) {
	// Check the read map first: if the key is present and the CAS succeeds, return directly.
	read := m.loadReadOnly()
	if e, ok := read.m[key]; ok {
		if v, ok := e.trySwap(&value); ok {
			if v == nil {
				return nil, false
			}
			return *v, true
		}
	}
	// slow path: take the lock and consult the dirty map
	m.mu.Lock()
	read = m.loadReadOnly()
	if e, ok := read.m[key]; ok { // key is in the read map
		if e.unexpungeLocked() {
			// The entry was previously expunged, which implies that there is a
			// non-nil dirty map and this entry is not in it.
			// (so it must be added back to dirty before its value is set)
			m.dirty[key] = e
		}
		if v := e.swapLocked(&value); v != nil {
			loaded = true
			previous = *v
		}
	} else if e, ok := m.dirty[key]; ok { // key is only in the dirty map
		if v := e.swapLocked(&value); v != nil {
			loaded = true
			previous = *v
		}
	} else {
		if !read.amended { // dirty is nil: no new keys since the last promotion
			// We're adding the first new key to the dirty map.
			// Make sure it is allocated and mark the read-only map as incomplete.
			m.dirtyLocked()
			m.read.Store(&readOnly{m: read.m, amended: true})
		}
		m.dirty[key] = newEntry(value)
	}
	m.mu.Unlock()
	return previous, loaded
}

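The flow again starts with the read map: if the key is there, the value is updated with a CAS-based atomic operation. Otherwise Swap locks the mutex, double-checks the read map, and atomically updates the entry if it is found in either the read map or the dirty map. If the key is in neither, Swap checks whether a dirty map must be created (read.amended is false). If so, dirtyLocked copies the read map's non-expunged entries into a new dirty map. The new key/value pair is then inserted into dirty. The double-check guards against another goroutine having run missLocked while this one was blocked on the lock.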

Delete

// LoadAndDelete deletes the value for a key, returning the previous value if any.
// The loaded result reports whether the key was present.
func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
	read := m.loadReadOnly()
	e, ok := read.m[key]
	if !ok && read.amended { // not in read, and dirty holds keys read lacks
		m.mu.Lock()
		read = m.loadReadOnly()
		e, ok = read.m[key]
		if !ok && read.amended { // double-check
			e, ok = m.dirty[key]
			delete(m.dirty, key)
			// Regardless of whether the entry was present, record a miss: this key
			// will take the slow path until the dirty map is promoted to the read
			// map.
			m.missLocked()
		}
		m.mu.Unlock()
	}
	if ok { // found in read or dirty: mark the entry deleted
		return e.delete()
	}
	return nil, false
}

// Delete deletes the value for a key.
func (m *Map) Delete(key any) {
	m.LoadAndDelete(key)
}

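This is similar to Store, except that if the key is still absent from the read map after the double-check, the call also records a miss via missLocked.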

Range

func (m *Map) Range(f func(key, value any) bool) {
	// We need to be able to iterate over all of the keys that were already
	// present at the start of the call to Range.
	// If read.amended is false, then read.m satisfies that property without
	// requiring us to hold m.mu for a long time.
	read := m.loadReadOnly()
	if read.amended { // dirty holds keys read lacks
		// m.dirty contains keys not in read.m. Fortunately, Range is already O(N)
		// (assuming the caller does not break out early), so a call to Range
		// amortizes an entire copy of the map: we can promote the dirty copy
		// immediately!
		m.mu.Lock()
		read = m.loadReadOnly()
		if read.amended { // promote immediately, as missLocked would
			read = readOnly{m: m.dirty}
			m.read.Store(&read)
			m.dirty = nil
			m.misses = 0
		}
		m.mu.Unlock()
	}

	for k, e := range read.m { // iterate the read map without locking
		v, ok := e.load()
		if !ok {
			continue
		}
		if !f(k, v) {
			break
		}
	}
}

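If the dirty map holds keys the read map lacks, Range promotes it immediately, as missLocked does, so that the read map holds the full current data set. It can then iterate the read map without a lock.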

Other operations

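Besides the methods above, sync.Map exposes three more:

  • LoadOrStore returns the existing value if the key is present, and otherwise stores the given pair. It resembles Redis's SETNX command, which is used for distributed locks.
  • CompareAndSwap is a CAS operation: it replaces the value with the new one only if the current value equals the given old value.
  • CompareAndDelete deletes the entry only if its current value equals the given old value.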
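A short usage sketch of these methods on the real sync.Map type follows. CompareAndSwap and CompareAndDelete require Go 1.20+, and the helper demo is illustrative only. The comments trace the state after each call.

```go
package main

import "sync"

// demo exercises LoadOrStore, CompareAndSwap, CompareAndDelete, and Range
// and returns what it observed.
func demo() (first any, loaded bool, swapped, deleted bool, n int) {
	var m sync.Map
	m.Store("k", 1)
	first, loaded = m.LoadOrStore("k", 2) // key exists: returns 1, true; nothing stored
	swapped = m.CompareAndSwap("k", 1, 3) // current value is 1, so it becomes 3
	deleted = m.CompareAndDelete("k", 1)  // current value is 3, not 1: no delete
	m.Store("j", 4)
	m.Range(func(_, _ any) bool { n++; return true }) // count entries
	return
}
```

Calling demo() yields first == 1, loaded == true, swapped == true, deleted == false, and n == 2.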

Summary

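sync.Map is built on the built-in map and suits read-mostly workloads; it trades space for time and adapts at run time. It consists of two tables, read and dirty. The read table is referenced through an atomic.Pointer and can be accessed concurrently without locking, while the dirty table requires the mutex. In both tables the values are *entry, whose field p is an atomic.Pointer, so the values themselves can also be updated atomically.

Reads, writes, deletes, and iteration all try the read table first. They lock and access the dirty table only when the read table misses, or, for Range, when it does not hold the full data set. When the dirty table has been consulted too often, missLocked promotes it to the read table. Later, when a new key must be written to dirty, dirtyLocked copies the read table into a fresh dirty table.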



Original article: https://blog.csdn.net/rtffcggh/article/details/143647669
