1 源码之外

key 的类型要求

map 中,key 的数据类型必须为可比较的类型,slice、map、func不可比较

  • 注意,chan可比较,是个指针
  • array 可比较,元素一模一样就一样

1
delete(myMap,5)

倘若 key 不存在或 map 未初始化,则方法直接结束,不会产生显式提示.

并发冲突

map 不是并发安全的数据结构,倘若存在并发读写行为,会抛出 fatal error.

具体规则是:

(1)并发读没有问题;

(2)并发读写中的“写”是广义上的,包含写入、更新、删除等操作;

(3)读的时候发现其他 goroutine 在并发写,抛出 fatal error;

(4)写的时候发现其他 goroutine 在并发写,抛出 fatal error.

1
2
fatal("concurrent map read and map write")
fatal("concurrent map writes")

需要关注,此处并发读写会引发 fatal error,是一种比 panic 更严重的错误,无法使用 recover 操作捕获.

2 核心原理

桶数组

map 中,会通过长度为 2 的整数次幂的桶数组进行 key-value 对的存储:

(1)每个桶固定可以存放 8 个 key-value 对;

(2)倘若超过 8 个 key-value 对打到桶数组的同一个索引当中,此时会通过创建桶链表的方式来化解这一问题.

解决 hash 冲突

方法 优点
拉链法 简单常用;无需预先为元素分配内存.
开放寻址法 无需额外的指针用于链接元素;内存地址完全连续,可以基于局部性原理,充分利用 CPU 高速缓存.

在 map 解决 hash /分桶 冲突问题时,实际上结合了拉链法和开放寻址法两种思路. 以 map 的插入写流程为例,进行思路阐述:

(1)桶数组中的每个桶,严格意义上是一个单向桶链表,以桶为节点进行串联;

(2)每个桶固定可以存放 8 个 key-value 对;

(3)当 key 命中一个桶时,首先根据开放寻址法,在桶的 8 个位置中寻找空位进行插入;

(4)倘若桶的 8 个位置都已被占满,则基于桶的溢出桶指针,找到下一个桶,重复第(3)步;

(5)倘若遍历到链表尾部,仍未找到空位,则基于拉链法,在桶链表尾部续接新桶,并插入 key-value 对.

扩容优化性能

倘若 map 的桶数组长度固定不变,那么随着 key-value 对数量的增长,当一个桶下挂载的 key-value 达到一定的量级,此时操作的时间复杂度会趋于线性,无法满足诉求.

因此在实现上,map 桶数组的长度会随着 key-value 对数量的变化而实时调整,以保证每个桶内的 key-value 对数量始终控制在常量级别,满足各项操作为 O(1) 时间复杂度的要求.

map 扩容机制的核心点包括:

(1)扩容分为增量扩容和等量扩容;

(2)当桶内 key-value 总数/桶数组长度 > 6.5 时发生增量扩容,桶数组长度增长为原值的两倍;

(3)当桶内溢出桶数量大于等于 2^B 时( B 为桶数组长度的指数,B 最大取 15),发生等量扩容,桶的长度保持为原值;

(4)采用渐进扩容的方式,当桶被实际操作到时,由使用者负责完成数据迁移,避免因为一次性的全量数据迁移引发性能抖动.

3 数据结构

3.1 hmap

1
2
3
4
5
6
7
8
9
10
11
type hmap struct {
count int map //中的 key-value 总数;
flags uint8 map //用每一位做状态标识,可以标识出 map 是否被 goroutine 并发读写、是不是正在扩容等. 全文搜索 h.flags可以看到用法很好理解。
B uint8 //桶数组长度的指数,桶数组长度为 2^B;
noverflow uint16 //map 中溢出桶的数量;
hash0 uint32 //hash 随机因子,生成 key 的 hash 值时会使用到;
buckets unsafe.Pointer //桶数组;
oldbuckets unsafe.Pointer //扩容过程中老的桶数组;
nevacuate uintptr //扩容时的进度标识,index 小于 nevacuate 的桶都已经由老桶转移到新桶中;
extra *mapextra //预申请的溢出桶.
}

3.2 mapextra

1
2
3
4
5
6
7
type mapextra struct {
overflow *[]*bmap 供桶数组 buckets 使用的溢出桶;
oldoverflow *[]*bmap 扩容流程中,供老桶数组 oldBuckets 使用的溢出桶;


nextOverflow *bmap 下一个可用的溢出桶.
}

在 map 初始化时,倘若容量过大,会提前申请好一批溢出桶,以供后续使用,这部分溢出桶存放在 hmap.mapextra 当中

3.3 bmap

1
2
3
4
5
6
7
8
9
const bucketCnt = 8

type bmap struct {
// 下面是三个都是定长 8 的数组。tophash\k\v三个也是一组。
tophash [bucketCnt]uint8
keys [bucketCnt]T
values [bucketCnt]T
overflow uint8 //下一个桶指针
}

tophash 是干啥呢?

  • 一个桶里不是有 8 个元素吗?这 8 个元素是 hash 冲突了的。hash 冲突一定是低 x 位相等。比如%4=3的所有数一定是最后两位都是 11。但是其 top8 位不一定会相等。通过高 8 位判断可以快速桶里寻找。
  • 还可以用于做一些特殊的标识。比如某个 key 被删了。可以通过 tophash 标识

4 构造方法

创建 map 时,实际上会调用 runtime/map.go 文件中的 makemap 方法,下面对源码展开分析:

4.1 makemap

方法主干源码一览:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func makemap(t *maptype, hint int, h *hmap) *hmap {
hint 为 map 拟分配的容量;在分配前,会提前对拟分配的内存大小进行判断,倘若超限,会将 hint 置为零;
mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
if overflow || mem > maxAlloc {
hint = 0
}


if h == nil {
h = new(hmap)
}
h.hash0 = fastrand() 调用 fastrand,构造 hash 因子:hmap.hash0;


B := uint8(0)
for overLoadFactor(hint, B) {
大致上基于 log2(B) >= hint 的思路计算桶数组的容量 B;overLoadFactor具体内容下面马上贴
B++
}
h.B = B


if h.B != 0 {
var nextOverflow *bmap
h.buckets, nextOverflow = makeBucketArray(t, h.B, nil) 调用 makeBucketArray 方法,初始化桶数组 hmap.buckets;
if nextOverflow != nil {
倘若 map 容量较大,会提前申请一批溢出桶 hmap.extra.
h.extra = new(mapextra)
h.extra.nextOverflow = nextOverflow
}
}


return

4.2 overLoadFactor

通过 overLoadFactor 方法,对 map 预分配容量和桶数组长度指数进行判断,决定是否仍需要增长 B 的数值:

1
2
3
4
5
6
7
8
9
10
11
12
13
const loadFactorNum = 13
const loadFactorDen = 2
const goarch.PtrSize = 8
const bucketCnt = 8


func overLoadFactor(count int, B uint8) bool {
return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}


func bucketShift(b uint8) uintptr {
return uintptr(1) << (b & (goarch.PtrSize*8 - 1))

(1)倘若 map 预分配容量小于等于 8,B 取 0,桶的个数为 1;

(2)保证 map 预分配容量小于等于桶数组长度 * 6.5.

map 预分配容量、桶数组长度指数、桶数组长度之间的关系如下表:

kv 对数量 B 桶数组长度 2^B
0 ~ 8 0 1
9 ~ 13 1 2
14 ~ 26 2 4
27 ~ 52 3 8
2^(B-1) * 6.5+1 ~ 2^B*6.5 B 2^B

4.3 makeBucketArray

makeBucketArray 方法会进行桶数组的初始化,并根据桶的数量决定是否需要提前作溢出桶的初始化. 方法主干代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
makeBucketArray 会为 map 的桶数组申请内存,在桶数组的指数 b >= 4时(桶数组的容量 >= 52 ),会需要提前创建溢出桶.
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
base := bucketShift(b) 记录桶数组的长度,不包含溢出桶;
nbuckets := base 记录累加上溢出桶后,桶数组的总长度.
if b >= 4 {
nbuckets += bucketShift(b - 4)
}

buckets = newarray(t.bucket, int(nbuckets)) 为桶数组申请内存空间,连带着需要初始化的溢出桶:

if base != nbuckets {
//说明需要创建溢出桶,会基于地址偏移的方式,通过 nextOverflow 指向首个溢出桶的地址.
nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
last.setoverflow(t, (*bmap)(buckets)) //setoverflow下面贴
}
return buckets, nextOverflow
}

// 倘若需要创建溢出桶,会在将最后一个溢出桶的 overflow 指针指向 buckets 数组,以此来标识申请的溢出桶已经用完.
func (b *bmap) setoverflow(t *maptype, ovf *bmap) {
*(**bmap)(add(unsafe.Pointer(b), uintptr(t.bucketsize)-goarch.PtrSize)) = ovf
}

5 读流程

5.1 读流程梳理

map 读流程主要分为以下几步:

(1)根据 key 取 hash 值;

(2)根据 hash 值对桶数组取模,确定所在的桶;

(3)沿着桶链表依次遍历各个桶内的 key-value 对;

(4)命中相同的 key,则返回 value;倘若 key 不存在,则返回零值.

map 读操作最终会走进 runtime/map.go 的 mapaccess 方法中,下面开始阅读源码:

5.2 mapaccess 方法源码走读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
if h == nil || h.count == 0 {
// 倘若 map 未初始化,或此时存在 key-value 对数量为 0,直接返回零值;
return unsafe.Pointer(&zeroVal[0])
}
if h.flags&hashWriting != 0 {
// const hashWriting = 4, 与4 就是 &100
// 倘若发现存在其他 goroutine 在写 map,直接抛出并发读写的 fatal error;其中,并发写标记,位于 hmap.flags 的第 3 个 bit 位;
fatal("concurrent map read and map write")
}

// 通过 maptype.hasher() 方法计算得到 key 的 hash 值,并对桶数组长度取模,取得对应的桶. 关于 hash 方法的内部实现,golang 并未暴露.
hash := t.hasher(key, uintptr(h.hash0))
m := bucketMask(h.B)

// 通过 hash&m 可以知道桶的位置b。其实就是取余运算。
x % 4 => x & 341003011
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))


if c := h.oldbuckets; c != nil {
// 在取老桶前,会先判断 map 的扩容流程是否是增量扩容,倘若是的话,说明老桶数组的长度是新桶数组的一半,需要将桶长度值 m 除以 2.
if !h.sameSizeGrow() {
m >>= 1 // 除以 2
}
// 如果是在渐进式 hash,则要看看是不是找老 b
oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
if !evacuated(oldb) { // evacuated 会告诉你有没有迁移完
b = oldb // 没迁移完,就从老 b 里找
}
}

top := tophash(hash) //取出高 8 位. tophash具体下面会贴
bucketloop:
for ; b != nil; b = b.overflow(t) {
// 外层for基于桶链表 依次遍历首个桶和后续的每个溢出桶
for i := uintptr(0); i < bucketCnt; i++ {
// 内层依次遍历一个桶内的 key-value 对.
if b.tophash[i] != top {
// 高 8 位不等于 tophash 值,可以跳过,但 go 还做了一个 tophash 判断是否删除的设计。
if b.tophash[i] == emptyRest {
// 倘若不匹配且当前位置 tophash 值为 0,说明桶的后续位置都未放入过元素,当前 key 在 map 中不存在,可以直接打破循环,返回零值. 这就是上面提到的 tophash 特殊标识的用法
break bucketloop
}
continue
}
// 高 8 位相等。进一步比较 key 是不是一致
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
if t.indirectkey() {
k = *((*unsafe.Pointer)(k))
}
if t.key.equal(key, k) {
// 倘若找到了相等的 key,则通过地址偏移的方式取到 value
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))// 其中 dataOffset 为一个桶中 tophash 数组所占用的空间大小. 这个数字每次都是固定的。就是 x*8
if t.indirectelem() {
e = *((*unsafe.Pointer)(e))
}
return e //返回
}
}
}
// 找不到,反 0 值
return unsafe.Pointer(&zeroVal[0])
}

func bucketMask(b uint8) uintptr {
根据 B 求得桶数组长度 - 1 的值,用于后续的 & 运算,实现取模的效果
return bucketShift(b) - 1
}

func (h *hmap) sameSizeGrow() bool {
return h.flags&sameSizeGrow != 0 //const sameSizeGrow = 8
// 与 8 就是 => & 1000 ,其实就是看第 4 位是不是 1,flags 的每一位都有其标记
}


// 取老桶时,会调用 evacuated 方法判断数据是否已经迁移到新桶. 判断的方式是,取桶中首个 tophash 值,倘若该值为 2,3,4 中的一个,都代表数据已经完成迁移.
const emptyOne = 1
const evacuatedX = 2
const evacuatedY = 3
const evacuatedEmpty = 4
const minTopHash = 5
func evacuated(b *bmap) bool {
h := b.tophash[0]
return h > emptyOne && h < minTopHash
}

// 取 key hash 值的高 8 位值 top. 倘若该值 < 5,会累加 5,以避开 0 ~ 4 的取值. 因为这几个值会用于枚举,具有一些特殊的含义.
func tophash(hash uintptr) uint8 {
top := uint8(hash >> (goarch.PtrSize*8 - 8))
if top < minTopHash { //const minTopHash = 5
top += minTopHash
}
return top
}

6 写流程

6.1 写流程梳理

map 写流程主要分为以下几步:

(1)根据 key 取 hash 值;

(2)根据 hash 值对桶数组取模,确定所在的桶;

(3)倘若 map 处于扩容,则迁移命中的桶,帮助推进渐进式扩容;

(4)沿着桶链表依次遍历各个桶内的 key-value 对;

(5)倘若命中相同的 key,则对 value 中进行更新;

(6)倘若 key 不存在,则插入 key-value 对;

(7)倘若发现 map 达成扩容条件,则会开启扩容模式,并重新返回第(2)步.

map 写操作最终会走进 runtime/map.gomapassign 方法中,下面开始阅读源码:

6.2 mapassign

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
const emptyRest = 0  // 包括当前位置在内,此后的位置都为空
const emptyOne = 1 // 当前位置未放入元素


func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
if h == nil {
// 倘若 map 未初始化,直接 panic
panic(plainError("assignment to entry in nil map"))
}
if h.flags&hashWriting != 0 {
// 其他 goroutine 在进行写或删操作,抛出并发写 fatal error
fatal("concurrent map writes")
}
hash := t.hasher(key, uintptr(h.hash0)) //求得 key 对应的 hash 值;


h.flags ^= hashWriting // 异或,将 map.flags 的第 3 个 bit 位置为 1,添加写标记;


if h.buckets == nil {
// 还没有桶,初始化一下
h.buckets = newobject(t.bucket)
}


again:
bucket := hash & bucketMask(h.B) // 找到当前 key 对应的桶索引 bucket;bucketMask上面讲过。(模4就是&3那逻辑)
if h.growing() {
// 发现当前 map 正处于扩容过程,则帮助其渐进扩容。具体怎么扩容文章最后涉及
growWork(t, h, bucket)
}

// 从 map 的桶数组 buckets 出发,结合桶索引和桶容量大小,进行地址偏移,获得对应桶 b;
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
top := tophash(hash) //取得 key 的高 8 位 tophash:


var inserti *uint8 //tophash 拟插入位置
var insertk unsafe.Pointer //key 拟插入位置
var elem unsafe.Pointer //val 拟插入位置
bucketloop:
// 2 层循环找位置入桶
for {
// 这层 for 的终止条件是内层的后面的 if ovf == nil
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash[i] != top {
if isEmpty(b.tophash[i]) && inserti == nil { // isEmpty下面会贴。就是判断是不是<=1
// <=1 说明已经找不到了。可以 break

// 尝试将 inserti、insertk elem 调整指向首个空位,用于后续的插入操作.
inserti = &b.tophash[i]
insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
}
// 能走到这,说明要么后面的元素被删了(0),要么是空位(1)。
// 我们还需要找到1。如果是 0 就继续continue
if b.tophash[i] == emptyRest {
break bucketloop// 找到空位,这里直接 break 的是 bucketloop,注意,都不是外层 for
}
continue
}
// 能走到这,可能找到了相等的 key
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
if t.indirectkey() {
k = *((*unsafe.Pointer)(k))
}
if !t.key.equal(key, k) {
// 不等
continue
}
// 更新操作
if t.needkeyupdate() {
typedmemmove(t.key, k, key)
}
elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
goto done
}

// 内层循环没找到,b指向下一个桶overflow,继续下一路外层for
ovf := b.overflow(t)
if ovf == nil {
break // 终止外层 for
}
b = ovf
}


// 倘若没找到相等的 key,会在执行插入操作前,判断 map 是否需要开启扩容模式.
// 倘若需要扩容,会在开启扩容模式后,跳转回 again 标志位,重新开始桶的定位以及遍历流程.
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
goto again
}


// 倘若遍历完桶链表,都没有为当前待插入的 key-value 对找到空位,则会创建一个新的溢出桶,挂载在桶链表的尾部,并将 inserti、insertk、elem 指向溢出桶的首个空位:
if inserti == nil {
newb := h.newoverflow(t, b) // newoverflow 具体后面会贴
inserti = &newb.tophash[0]
insertk = add(unsafe.Pointer(newb), dataOffset)
elem = add(insertk, bucketCnt*uintptr(t.keysize))
}


// 将 tophash、key、value 插入到取得空位中
if t.indirectkey() {
kmem := newobject(t.key)
*(*unsafe.Pointer)(insertk) = kmem
insertk = kmem
}
if t.indirectelem() {
vmem := newobject(t.elem)
*(*unsafe.Pointer)(elem) = vmem
}
typedmemmove(t.key, insertk, key)
*inserti = top
h.count++ // map 的 key-value 对计数器 count 值加 1;




done:
if h.flags&hashWriting == 0 {
// 收尾环节,再次校验是否有其他协程并发写,倘若有,则抛 fatal error.
fatal("concurrent map writes")
}

h.flags &^= hashWriting// 将 hmap.flags 中的写标记抹去
if t.indirectelem() {
elem = *((*unsafe.Pointer)(elem))
}
return //完成!
}

func isEmpty(x uint8) bool {
return x <= emptyOne
}

创建溢出桶时:

I 倘若 hmap.extra 中还有剩余可用的溢出桶,则直接获取 hmap.extra.nextOverflow,并将 nextOverflow 调整指向下一个空闲可用的溢出桶;

II 倘若 hmap 已经没有空闲溢出桶了,则创建一个新的溢出桶.

III hmap 的溢出桶数量 hmap.noverflow 累加 1;

IV 将新获得的溢出桶添加到原桶链表的尾部;

V 返回溢出桶.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (h *hmap) newoverflow(t *maptype, b *bmap) *bmap {
var ovf *bmap
if h.extra != nil && h.extra.nextOverflow != nil {
ovf = h.extra.nextOverflow
if ovf.overflow(t) == nil {
h.extra.nextOverflow = (*bmap)(add(unsafe.Pointer(ovf), uintptr(t.bucketsize)))
} else {
ovf.setoverflow(t, nil)
h.extra.nextOverflow = nil
}
} else {
ovf = (*bmap)(newobject(t.bucket))
}
h.incrnoverflow()
if t.bucket.ptrdata == 0 {
h.createOverflow()
*h.extra.overflow = append(*h.extra.overflow, ovf)
}
b.setoverflow(t, ovf)
return ovf
}

7 删流程

7.1 删除 kv 对流程梳理

map 删楚 kv 对流程主要分为以下几步:

(1)根据 key 取 hash 值;

(2)根据 hash 值对桶数组取模,确定所在的桶;

(3)倘若 map 处于扩容,则迁移命中的桶,帮助推进渐进式扩容;

(4)沿着桶链表依次遍历各个桶内的 key-value 对;

(5)倘若命中相同的 key,删除对应的 key-value 对;并将当前位置的 tophash 置为 emptyOne,表示为空;

(6)倘若当前位置为末位,或者下一个位置的 tophash 为 emptyRest,则沿当前位置向前遍历,将毗邻的 emptyOne 统一更新为 emptyRest.

map 删操作最终会走进 runtime/map.gomapdelete 方法中,下面开始阅读源码:

7.2 mapdelete

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
if h == nil || h.count == 0 {
// 倘若 map 未初始化或者内部 key-value 对数量为 0,删除时不会报错,直接返回;
return
}
if h.flags&hashWriting != 0 {
// 倘若存在其他 goroutine 在进行写或删操作,抛出并发写的 fatal error;
fatal("concurrent map writes")
}


// 通过 maptype.hasher() 方法求得 key 对应的 hash 值;
hash := t.hasher(key, uintptr(h.hash0))


h.flags ^= hashWriting //通过异或位运算,添加写标记;


bucket := hash & bucketMask(h.B) //找到当前 key 对应的桶索引 bucket;
if h.growing() {
// 倘若发现当前 map 正处于扩容过程,则帮助其渐进扩容
growWork(t, h, bucket)
}
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
// 从 map 的桶数组 buckets 出发,结合桶索引和桶容量大小,进行地址偏移,获得对应桶 b,并赋值给 bOrg
bOrig := b
top := tophash(hash) //取得 key 的高 8 位 tophash
search:
for ; b != nil; b = b.overflow(t) {
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash[i] != top {
if b.tophash[i] == emptyRest {
// 后头都没啥了。别找了
break search
}
continue
}

k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))// 地址偏移获得现在的 k
k2 := k
if t.indirectkey() {
k2 = *((*unsafe.Pointer)(k2))
}
if !t.key.equal(key, k2) {
// 不相等,则继续遍历
continue
}

// 则删除对应的 key-value 对
// Only clear key if there are pointers in it.
if t.indirectkey() {
*(*unsafe.Pointer)(k) = nil
} else if t.key.ptrdata != 0 {
memclrHasPointers(k, t.key.size)
}
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
if t.indirectelem() {
*(*unsafe.Pointer)(e) = nil
} else if t.elem.ptrdata != 0 {
memclrHasPointers(e, t.elem.size)
} else {
memclrNoHeapPointers(e, t.elem.size)
}
b.tophash[i] = emptyOne //将当前位置的 tophash 置为 emptyOne
if i == bucketCnt-1 {
if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest {
goto notLast
}
} else {
if b.tophash[i+1] != emptyRest {
// 倘若当前位置不位于最后一个桶的最后一个位置,或者当前位置的后置位 tophash 不为 emptyRest,则无需向前遍历更新 tophash 标识,直接跳转到 notLast 位置即可;
goto notLast
}
}
for {
// 向前遍历,将沿途的空位( tophash 为 emptyOne )的 tophash 都更新为 emptySet.
b.tophash[i] = emptyRest
if i == 0 {
// 更新到头了。往前找一个桶
if b == bOrig {
// 前面也不会有桶了
break
}
c := b
for b = bOrig; b.overflow(t) != c; b = b.overflow(t) {
} // 定位到当前的桶
i = bucketCnt - 1 //调整下标,从最后一个开始搞!
} else {
i--
}
if b.tophash[i] != emptyOne {
break //不是空才停止
}
}
notLast:
// 倘若成功从 map 中删除了一组 key-value 对,则将 hmap 的计数器 count 值减 1.
h.count--
if h.count == 0 {
// 倘若 map 中的元素全都被删除完了,会为 map 更换一个新的随机因子 hash0.
h.hash0 = fastrand()
}
break search
}
}


if h.flags&hashWriting == 0 {
// 再次校验是否有其他协程并发写,倘若有,则抛 fatal error.
fatal("concurrent map writes")
}
h.flags &^= hashWritin //将 hmap.flags 中的写标记抹去,然后退出方法.

8 遍历流程

map 的遍历流程首先会走进 runtime/map.go 的 mapiterinit() 方法当中,初始化用于遍历的迭代器 hiter;接着会调用 runtime/map.gomapiternext() 方法开启遍历流程.

8.1 迭代器数据结构

hiter 是遍历 map 时用于存放临时数据的迭代器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type hiter struct {
key unsafe.Pointer 指向遍历得到 key 的指针;
elem unsafe.Pointer 指向遍历得到 value 的指针;
t *maptype map 类型,包含了 key、value 类型大小等信息;
h *hmap map 的指针;
buckets unsafe.Pointer map 的桶数组;
bptr *bmap 当前遍历到的桶;
overflow *[]*bmap 新老桶数组对应的溢出桶;
oldoverflow *[]*bmap
startBucket uintptr 遍历起始位置的桶索引;
offset uint8 遍历起始位置的 key-value 对索引;
wrapped bool 遍历是否穿越桶数组尾端回到头部了;
B uint8 桶数组的长度指数;
i uint8 当前遍历到的 key-value 对在桶中的索引;
bucket uintptr 当前遍历到的桶;
checkBucket uintptr 因为扩容流程的存在,需要额外检查的桶.
}

8.2 mapiterinit

map 遍历流程开始时,首先会走进 runtime/map.go 的 mapiterinit() 方法当中,此时会对创建 map 迭代器 hiter,并且通过取随机数的方式,决定遍历的起始桶号,以及起始 key-value 对索引号.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func mapiterinit(t *maptype, h *hmap, it *hiter) {
it.t = t
if h == nil || h.count == 0 {
return
}


it.h = h


it.B = h.B
it.buckets = h.buckets
if t.bucket.ptrdata == 0 {
h.createOverflow()
it.overflow = h.extra.overflow
it.oldoverflow = h.extra.oldoverflow
}


// decide where to start
// 通过取随机数的方式,决定遍历时的起始桶,以及桶中起始 key-value 对的位置。一定注意。有两个随机。
var r uintptr
r = uintptr(fastrand())
it.startBucket = r & bucketMask(h.B) // startBucket 是随机的。(开始的桶位置
it.offset = uint8(r >> h.B & (bucketCnt - 1)) // offset 也是随机的。(每个桶开始的元素位置


// iterator state
it.bucket = it.startBucket


// Remember we have an iterator.
// Can run concurrently with another mapiterinit().
if old := h.flags; old&(iterator|oldIterator) != iterator|oldIterator {
atomic.Or8(&h.flags, iterator|oldIterator)
}

//完成迭代器 hiter 中各项参数的初始化后,进入 mapiternext 方法开启遍历.
mapiternext(

8.2 mapiternext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
func mapiternext(it *hiter) {
h := it.h
if h.flags&hashWriting != 0 {
fatal("concurrent map iteration and map write")
}
t := it.t
bucket := it.bucket
b := it.bptr
i := it.i
checkBucket := it.checkBucket


next:
if b == nil {
if bucket == it.startBucket && it.wrapped {
it.key = nil
it.elem = nil
return
}
if h.growing() && it.B == h.B {
oldbucket := bucket & it.h.oldbucketmask()
b = (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
if !evacuated(b) {
checkBucket = bucket
} else {
b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
checkBucket = noCheck
}
} else {
b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
checkBucket = noCheck
}
bucket++
if bucket == bucketShift(it.B) {
bucket = 0
it.wrapped = true
}
i = 0
}
for ; i < bucketCnt; i++ {
offi := (i + it.offset) & (bucketCnt - 1)
if isEmpty(b.tophash[offi]) || b.tophash[offi] == evacuatedEmpty {
continue
}
k := add(unsafe.Pointer(b), dataOffset+uintptr(offi)*uintptr(t.keysize))
if t.indirectkey() {
k = *((*unsafe.Pointer)(k))
}
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+uintptr(offi)*uintptr(t.elemsize))
if checkBucket != noCheck && !h.sameSizeGrow() {
if checkBucket>>(it.B-1) != uintptr(b.tophash[offi]&1) {
continue
}

}
if (b.tophash[offi] != evacuatedX && b.tophash[offi] != evacuatedY) ||
!(t.reflexivekey() || t.key.equal(k, k)) {

it.key = k
if t.indirectelem() {
e = *((*unsafe.Pointer)(e))
}
it.elem = e
} else {
rk, re := mapaccessK(t, h, k)
if rk == nil {
continue // key has been deleted
}
it.key = rk
it.elem = re
}
it.bucket = bucket
if it.bptr != b { // avoid unnecessary write barrier; see issue 14921
it.bptr = b
}
it.i = i + 1
it.checkBucket = checkBucket
return
}
b = b.overflow(t)
i = 0
goto next
}

(1)遍历时发现其他 goroutine 在并发写,直接抛出 fatal error:

1
2
3
if h.flags&hashWriting != 0 {
fatal("concurrent map iteration and map write")
}

(2)开启最外圈的循环,依次遍历桶数组中的每个桶链表,通过 next 和 goto next 关键字实现循环代码块;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
next:
if b == nil {
// ...
b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
//
bucket++
if bucket == bucketShift(it.B) {
bucket = 0
it.wrapped = true
}
i = 0
}
// ...
b = b.overflow(t)
// ...
goto next
}

(3)倘若已经遍历完所有的桶,重新回到起始桶为止,则直接结束方法;

1
2
3
4
5
if bucket == it.startBucket && it.wrapped {
it.key = nil
it.elem = nil
return
}

(4)倘若 map 处于扩容流程,取桶时兼容新老桶数组的逻辑. 倘若桶处于旧桶数组且未完成迁移,需要将 checkBucket 置为当前的桶号;

1
2
3
4
5
6
7
8
9
10
11
12
13
if h.growing() && it.B == h.B {
oldbucket := bucket & it.h.oldbucketmask()
b = (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
if !evacuated(b) {
checkBucket = bucket
} else {
b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
checkBucket = noCheck
}
} else {
b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
checkBucket = noCheck
}

(5)遍历的桶号加 1,倘若来到桶数组末尾,则将桶号置为 0. 将 key-value 对的遍历索引 i 置为 0.

1
2
3
4
5
6
bucket++
if bucket == bucketShift(it.B) {
bucket = 0
it.wrapped = true
}
i = 0

(6)依次遍历各个桶中每个 key-value 对:

1
2
3
4
for ; i < bucketCnt; i++ {
// ...
return
}

(7)倘若遍历到的桶属于旧桶数组未迁移完成的桶,需要按照其在新桶中的顺序完成遍历. 比如,增量扩容流程中,旧桶中的 key-value 对最终应该被分散迁移到新桶数组的 x、y 两个区域,则此时遍历时,哪怕 key-value 对仍存留在旧桶中未完成迁移,遍历时也应该严格按照其在新桶数组中的顺序来执行.

1
2
3
4
5
6
if checkBucket != noCheck && !h.sameSizeGrow() {

if checkBucket>>(it.B-1) != uintptr(b.tophash[offi]&1) {
continue
}
}

(8)执行 mapaccessK 方法,基于读流程方法获取 key-value 对,通过迭代 hiter 的 key、value 指针进行接收,用于对用户的遍历操作进行响应:

1
2
3
4
5
6
rk, re := mapaccessK(t, h, k)
if rk == nil {
continue // key has been deleted
}
it.key = rk
it.elem = re

9 扩容流程

9.1 扩容类型

图片

map 的扩容类型分为两类,一类叫做增量扩容,一类叫做等量扩容.

(1)增量扩容 biggerSizeGrow

表现:扩容后,桶数组的长度增长为原长度的 2 倍;

目的:降低每个桶中 key-value 对的数量,优化 map 操作的时间复杂度.

count/size > 6.5 (装载因子 :overLoadFactor), 避免读写效率降低。

扩容一倍,并渐进的在赋值和删除(mapassign和mapdelete)期间,

对每个桶重新分流到x(原来桶区间)和y(扩容后的增加的新桶区间)

这里overLoadFactor (count/size)是评估桶的平均装载数据能力,即map平均每个桶装载多少个k/v。

这个值太大,则桶不够用,会有太多溢出桶;太小,则分配了太多桶,浪费了空间。

6.5是测试后对map装载能力最大化的一个的选择。

(2)等量扩容 sameSizeGrow

表现:扩容后,桶数组的长度和之前保持一致;但是溢出桶的数量会下降.

目的:提高桶主体结构的数据填充率,减少溢出桶数量,避免发生内存泄漏.

过多的overflow使用,使用等大小的buckets重新整理,回收多余的overflow桶,提高map读写效率,减少溢出桶占用

这里借助hmap.noverflow来判断溢出桶是否过多

hmap.B<=15 时,判断是溢出桶是否多于桶数1<<hmap.B

否则只判断 溢出桶是否多于 1<<15

这也就是为啥hmap.noverflow,当其接近1<<15 - 1时为近似值, 只要可以评估是否溢出桶过多不合理就行了

9.2 何时扩容

(1)只有 map 的写流程可能开启扩容模式;

(2)写 map 新插入 key-value 对之前,会发起是否需要扩容的逻辑判断:

1
2
3
4
5
6
7
8
9
10
11
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
// ...

if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
goto again
}


// ...
}

会不会出现扩容过程中,又一次海量数据插入需要引发二次扩容?

不会的,map 不允许并发!

(3)根据 hmap 的 oldbuckets 是否空,可以判断 map 此前是否已开启扩容模式:

1
2
3
func (h *hmap) growing() bool {
return h.oldbuckets != nil
}

(4)倘若此前未进入扩容模式,且 map 中 key-value 对的数量超过 8 个,且大于桶数组长度的 6.5 倍,则进入增量扩容:

1
2
3
4
5
6
7
8
9
10
const(
loadFactorNum = 13
loadFactorDen = 2
bucketCnt = 8
)


func overLoadFactor(count int, B uint8) bool {
return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}

(5)倘若溢出桶的数量大于 2^B 个(即桶数组的长度;B 大于 15 时取15),则进入等量扩容:

1
2
3
4
5
6
func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {
if B > 15 {
B = 15
}
return noverflow >= uint16(1)<<(B&15)
}

9.3 如何开启扩容模式

开启扩容模式的方法位于 runtime/map.go 的 hashGrow 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func hashGrow(t *maptype, h *hmap) {
bigger := uint8(1)
if !overLoadFactor(h.count+1, h.B) {
bigger = 0
h.flags |= sameSizeGrow
}
oldbuckets := h.buckets
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)




flags := h.flags &^ (iterator | oldIterator)
if h.flags&iterator != 0 {
flags |= oldIterator
}
// commit the grow (atomic wrt gc)
h.B += bigger
h.flags = flags
h.oldbuckets = oldbuckets
h.buckets = newbuckets
h.nevacuate = 0
h.noverflow = 0


if h.extra != nil && h.extra.overflow != nil {
// Promote current overflow buckets to the old generation.
if h.extra.oldoverflow != nil {
throw("oldoverflow is not nil")
}
h.extra.oldoverflow = h.extra.overflow
h.extra.overflow = nil
}
if nextOverflow != nil {
if h.extra == nil {
h.extra = new(mapextra)
}
h.extra.nextOverflow = nextOverflow
}

(1)倘若是增量扩容,bigger 值取 1;倘若是等量扩容,bigger 值取 0,并将 hmap.flags 的第 4 个 bit 位置为 1,标识当前处于等量扩容流程.

1
2
3
4
5
6
7
8
const sameSizeGrow = 8


bigger := uint8(1)
if !overLoadFactor(h.count+1, h.B) {
bigger = 0
h.flags |= sameSizeGrow
}

(2)将原桶数组赋值给 oldBuckets,并创建新的桶数组和一批新的溢出桶.

此处会通过变量 bigger,实现不同扩容模式下,新桶数组长度的区别处理.

1
2
oldbuckets := h.buckets
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)

(3)更新 hmap 的桶数组长度指数 B,flag 标识,并将新、老桶数组赋值给 hmap.oldBuckets 和 hmap.buckets;扩容迁移进度 hmap.nevacuate 标记为 0;新桶数组的溢出桶数量 hmap.noverflow 置为 0.

1
2
3
4
5
6
7
8
9
10
11
flags := h.flags &^ (iterator | oldIterator)
if h.flags&iterator != 0 {
flags |= oldIterator
}
// commit the grow (atomic wrt gc)
h.B += bigger
h.flags = flags
h.oldbuckets = oldbuckets
h.buckets = newbuckets
h.nevacuate = 0
h.noverflow = 0

(4)将原本存量可用的溢出桶赋给 hmap.extra.oldoverflow;倘若存在下一个可用的溢出桶,赋给 hmap.extra.nextOverflow.

1
2
3
4
5
6
7
8
9
10
 if h.extra != nil && h.extra.overflow != nil {
h.extra.oldoverflow = h.extra.overflow
h.extra.overflow = nil
}
if nextOverflow != nil {
if h.extra == nil {
h.extra = new(mapextra)
}
h.extra.nextOverflow = nextOverflow
}

9.4 扩容迁移规则

图片

(1)在等量扩容中,新桶数组长度与原桶数组相同;

(2)key-value 对在新桶数组和老桶数组的中的索引号保持一致;

(3)在增量扩容中,新桶数组长度为原桶数组的两倍;

(4)把新桶数组中桶号对应于老桶数组的区域称为 x 区域,新扩展的区域称为 y 区域.

(5)实际上,一个 key 属于哪个桶,取决于其 hash 值对桶数组长度取模得到的结果,因此依赖于其低位的 hash 值结果.;

(6)在增量扩容流程中,新桶数组的长度会扩展一位,假定 key 原本从属的桶号为 i,则在新桶数组中从属的桶号只可能是 i (x 区域)或者 i + 老桶数组长度(y 区域);

(7)当 key 低位 hash 值向左扩展一位的 bit 位为 0,则应该迁往 x 区域的 i 位置;倘若该 bit 位为 1,应该迁往 y 区域对应的 i + 老桶数组长度的位置.

9.5 渐进式扩容

map 采用的是渐进扩容的方式,避免因为一次性的全量数据迁移引发性能抖动.

当每次触发写、删操作时,会为处于扩容流程中的 map 完成两组桶的数据迁移:

(1)一组桶是当前写、删操作所命中的桶;

(2)另一组桶是,当前未迁移的桶中,索引最小的那个桶.

1
2
3
4
5
6
7
8
9
10
11
func growWork(t *maptype, h *hmap, bucket uintptr) {
// make sure we evacuate the oldbucket corresponding
// to the bucket we're about to use
evacuate(t, h, bucket&h.oldbucketmask())


// evacuate one more oldbucket to make progress on growing
if h.growing() {
evacuate(t, h, h.nevacuate)
}
}

图片

数据迁移的逻辑位于 runtime/map.go 的 evacuate 方法当中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
// 入参中,oldbucket 为当前要迁移的桶在旧桶数组中的索引
// 获取到待迁移桶的内存地址 b
b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
// 获取到旧桶数组的容量 newbit
newbit := h.noldbuckets()
// evacuated 方法判断出桶 b 是否已经迁移过了,未迁移过,才进入此 if 分支进行迁移处理
if !evacuated(b) {
// 通过一个二元数组 xy 指向当前桶可能迁移到的目的桶
// x = xy[0],代表新桶数组中索引和旧桶数组一致的桶
// y = xy[1],代表新桶数组中,索引为原索引加上旧桶容量的桶,只在增量扩容中会使用到
var xy [2]evacDst
x := &xy[0]
x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
x.k = add(unsafe.Pointer(x.b), dataOffset)
x.e = add(x.k, bucketCnt*uintptr(t.keysize))


// 只有进入增量扩容的分支,才需要对 y 进行初始化
if !h.sameSizeGrow() {
// Only calculate y pointers if we're growing bigger.
// Otherwise GC can see bad pointers.
y := &xy[1]
y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
y.k = add(unsafe.Pointer(y.b), dataOffset)
y.e = add(y.k, bucketCnt*uintptr(t.keysize))
}


// 外层 for 循环,遍历桶 b 和对应的溢出桶
for ; b != nil; b = b.overflow(t) {
// k,e 分别记录遍历桶时,当前的 key 和 value 的指针
k := add(unsafe.Pointer(b), dataOffset)
e := add(k, bucketCnt*uintptr(t.keysize))
// 遍历桶内的 key-value 对
for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
top := b.tophash[i]
if isEmpty(top) {
b.tophash[i] = evacuatedEmpty
continue
}
if top < minTopHash {
throw("bad map state")
}
k2 := k
if t.indirectkey() {
k2 = *((*unsafe.Pointer)(k2))
}
var useY uint8
if !h.sameSizeGrow() {
// Compute hash to make our evacuation decision (whether we need
// to send this key/elem to bucket x or bucket y).
hash := t.hasher(k2, uintptr(h.hash0))
if hash&newbit != 0 {
useY = 1
}
}
b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
dst := &xy[useY] // evacuation destination
if dst.i == bucketCnt {
dst.b = h.newoverflow(t, dst.b)
dst.i = 0
dst.k = add(unsafe.Pointer(dst.b), dataOffset)
dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
}
dst.b.tophash[dst.i&(bucketCnt-1)] = top // mask dst.i as an optimization, to avoid a bounds check
if t.indirectkey() {
*(*unsafe.Pointer)(dst.k) = k2 // copy pointer
} else {
typedmemmove(t.key, dst.k, k) // copy elem
}
if t.indirectelem() {
*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
} else {
typedmemmove(t.elem, dst.e, e)
}
dst.i++
dst.k = add(dst.k, uintptr(t.keysize))
dst.e = add(dst.e, uintptr(t.elemsize))
}
}
// Unlink the overflow buckets & clear key/elem to help GC.
if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {
b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
// Preserve b.tophash because the evacuation
// state is maintained there.
ptr := add(b, dataOffset)
n := uintptr(t.bucketsize) - dataOffset
memclrHasPointers(ptr, n)
}
}


if oldbucket == h.nevacuate {
advanceEvacuationMark(h, t, newbit)
}
}


func (h *hmap) noldbuckets() uintptr {
oldB := h.B
if !h.sameSizeGrow() {
oldB--
}
return bucketShift(oldB)

(1)从老桶数组中获取到待迁移的桶 b;

1
b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))

(2)获取到老桶数组的长度 newbit;

1
newbit := h.noldbuckets()

(3)倘若当前桶已经完成了迁移,则无需处理;

(4)创建一个二元数组 xy,分别承载 x 区域和 y 区域(含义定义见 9.4 小节)中的新桶位置,用于接受来自老桶数组的迁移数组;只有在增量扩容的流程中,才存在 y 区域,因此才需要对 xy 中的 y 进行定义;

1
2
3
4
5
6
7
8
9
10
11
12
13
var xy [2]evacDst
x := &xy[0]
x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
x.k = add(unsafe.Pointer(x.b), dataOffset)
x.e = add(x.k, bucketCnt*uintptr(t.keysize))


if !h.sameSizeGrow() {
y := &xy[1]
y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
y.k = add(unsafe.Pointer(y.b), dataOffset)
y.e = add(y.k, bucketCnt*uintptr(t.keysize))
}

(5)开启两层 for 循环,外层遍历桶链表,内层遍历每个桶中的 key-value 对:

1
2
3
4
5
6
7
8
for ; b != nil; b = b.overflow(t) {
k := add(unsafe.Pointer(b), dataOffset)
e := add(k, bucketCnt*uintptr(t.keysize))
for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
// ...
}
}

(6)取每个位置的 tophash 值进行判断,倘若当前是个空位,则将当前位置 tophash 值置为 evacuatedEmpty,开始遍历下一个位置:

1
2
3
4
5
top := b.tophash[i]
if isEmpty(top) {
b.tophash[i] = evacuatedEmpty
continue
}

(7)基于 9.4 的规则,寻找到迁移的目的桶;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
const evacuatedX = 2
const evacuatedY = 3


k2 := k
var useY uint8
if !h.sameSizeGrow() {
hash := t.hasher(k2, uintptr(h.hash0))
if hash&newbit != 0 {
useY = 1
}
}
b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
dst := &xy[useY]

其中目的桶的类型定义如下:

1
2
3
4
5
6
type evacDst struct {
b *bmap // current destination bucket
i int // key/elem index into b
k unsafe.Pointer // pointer to current key storage
e unsafe.Pointer // pointer to current elem storage
}

I evacDst.b:目的地的所在桶;

II evacDst.i:即将入桶的 key-value 对在桶中的索引;

III evacDst.k:入桶 key 的存储指针;

IV evacDst.e:入桶 value 的存储指针.

(8)将 key-value 对迁移到目的桶中,并且更新目的桶结构内几个指针的指向:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
if dst.i == bucketCnt {
dst.b = h.newoverflow(t, dst.b)
dst.i = 0
dst.k = add(unsafe.Pointer(dst.b), dataOffset)
dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
}
dst.b.tophash[dst.i&(bucketCnt-1)] = top // mask dst.i as an optimization, to avoid a bounds check
if t.indirectkey() {
*(*unsafe.Pointer)(dst.k) = k2 // copy pointer
} else {
typedmemmove(t.key, dst.k, k) // copy elem
}
if t.indirectelem() {
*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
} else {
typedmemmove(t.elem, dst.e, e)
}
dst.i++
dst.k = add(dst.k, uintptr(t.keysize))
dst.e = add(dst.e, uintptr(t.elemsize

(9)倘若当前迁移的桶是旧桶数组未迁移的桶中索引最小的一个,则 hmap.nevacuate 累加 1.

倘若已经迁移完所有的旧桶,则会确保 hmap.flags 中,等量扩容的标识位被置为 0.

1
2
3
if oldbucket == h.nevacuate {
advanceEvacuationMark(h, t, newbit)
}
1
2
3
4
5
6
7
8
9
10
11
func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {
h.nevacuate++
// ...
if h.nevacuate == newbit { // newbit == ## of oldbuckets
h.oldbuckets = nil
if h.extra != nil {
h.extra.oldoverflow = nil
}
h.flags &^= sameSizeGrow
}
}