cachego分片、上报、GC与数据加载机制原理

cachego 分片、上报、GC与数据加载机制原理

cachego github地址

  • 为减小锁之间的竞争,cachego引入了分片机制,将key分散到多个sharding中,减低锁冲突概率加速索引。 sharding机制可与lru、lfu、standard模式丝滑结合,以实现cache中所有sharding内存模型都是指定的某种类型。
  • reporter 上报功能,支持上报hit次数、miss次数、gc次数、load次数(从下层存储load次数),同时通过 option function 模式提供灵活的上报函数注册满足业务定制化上报功能。

newCache操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func newCache(withReport bool, opts ...Option) (cache Cache, reporter *Reporter) {
conf := newDefaultConfig()
applyOptions(conf, opts) // option 模式赋值config

newCache, ok := newCaches[conf.cacheType] // 选择内存模型 lru/lfu/standard
if !ok {
panic("cachego: cache type doesn't exist")
}

if conf.shardings > 0 { // cache分片, 本文讨论重点
cache = newShardingCache(conf, newCache)
} else {
cache = newCache(conf)
}

if withReport { // 上报, 本文讨论重点
cache, reporter = report(conf, cache)
}

if conf.gcDuration > 0 { // gc
RunGCTask(cache, conf.gcDuration)
}

return cache, reporter
}
  • newCache函数通过option模式允许用户自定义配置
  • 根据用户配置决定是否创建sharding与reporter
  • 根据所传gc间隔时长制定gc回收计划

shardingCache实现原理

newShardingCache操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func newShardingCache(conf *config, newCache func(conf *config) Cache) Cache {
if conf.shardings <= 0 {
panic("cachego: shardings must be > 0.")
}

if bits.OnesCount(uint(conf.shardings)) > 1 { // 只允许设置2的幂个分片
panic("cachego: shardings must be the pow of 2 (such as 64).")
}

caches := make([]Cache, 0, conf.shardings)
for i := 0; i < conf.shardings; i++ { // 循环创建分片数的cache,每个分片都是一个cache(lru/lfu/standard)
caches = append(caches, newCache(conf))
}

cache := &shardingCache{
config: conf,
caches: caches,
}

return cache
}

FNV-1a算法

1
2
3
4
5
6
7
8
9
10
func hash(key string) int { // 高效的sharding分片寻址算法, FNV-1a算法 
hash := 1469598103934665603

for _, r := range key {
hash = (hash << 5) - hash + int(r&0xffff)
hash *= 1099511628211
}

return hash
}

算法高效原因

  • 简单的操作,
    • 左移操作:hash << 5
    • 加法和减法:(hash << 5) - hash 加法是哈希混合的一部分,有助于在不同字符之间产生良好的差异。
    • 乘法:hash *= 1099511628211 使用一个质数进行乘法引入更多的不可预测性。质数乘法有助于散列过程的均匀分布,因为质数避免了模式和重复的出现,从而减少碰撞
  • 通过 (hash << 5) - hash 和 hash *= 1099511628211,每个字符都会引入一个新的混合因子,使得前面的字符和后面的字符对最终哈希值的贡献是均匀的,这意味着输入数据中的每一个字符都能够“影响”哈希值的所有位,导致哈希值具有很好的散列性质
  • 复杂度是 O(n), 通过质数乘法和每个字符的逐步混合减少哈希碰撞

set操作

1
2
3
func (sc *shardingCache) Set(key string, value interface{}, ttl time.Duration) (oldValue interface{}) {
return sc.cacheOf(key).Set(key, value, ttl)
}
1
2
3
4
5
6
func (sc *shardingCache) cacheOf(key string) Cache { // sharding 寻址操作
hash := sc.hash(key) // FNV-1a算法
mask := len(sc.caches) - 1

return sc.caches[hash&mask] // 与sharding数与操作选定sharding
}
  • 采用高效的FNV-1a算法获取key的散列值,确保了key的均匀性与sharding寻址速度
  • 最终写入操作还是三种核心cache执行,key-value最终会被存入所选的lruCache、lfuCache或standardCache
  • 除此之外shardingCache的hash也是支持自定义的,在创建cache时可自定义实现WithHash()

get操作

get操作通过FNV-1a算法定位到sharding后执行对应cache的get获取

reportableCache实现原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func report(conf *config, cache Cache) (Cache, *Reporter) {
reporter := &Reporter{ // 构建reporter
conf: conf,
cache: cache,
hitCount: 0,
missedCount: 0,
gcCount: 0,
loadCount: 0,
}

cache = &reportableCache{
config: conf,
Reporter: reporter,
}

return cache, reporter
}

set操作

1
2
3
func (rc *reportableCache) Set(key string, value interface{}, ttl time.Duration) (evictedValue interface{}) {
return rc.cache.Set(key, value, ttl)
}

set操作核心是调用另外三种cache的set接口执行写入操作

get操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (rc *reportableCache) Get(key string) (value interface{}, found bool) {
value, found = rc.cache.Get(key) // get

if found {
if rc.recordHit { // hit数自增, atomic库操作
rc.increaseHitCount()
}

if rc.reportHit != nil { // 上报
rc.reportHit(rc.Reporter, key, value) // reportHit 自定义上报函数
}
} else {
if rc.recordMissed { // miss数自增, atomic库操作
rc.increaseMissedCount()
}

if rc.reportMissed != nil { // 上报
rc.reportMissed(rc.Reporter, key) // reportMissed 自定义上报函数
}
}

return value, found
}
  • get操作分为两步操作,先获取到值,再执行上报逻辑,其中hit与miss数均使用atomic库操作,保证原子性。
  • 上报函数是由用户自定义,同时cachego也提供了HitRate与MissedRate等函数进行概率计算,详情可参考 ./_examples/report.go

gc与load数据加载

垃圾清理机制

gc函数用于手动清理过期key。可在创建cache时手动传入WithGC()指定定时清理时间间隔,或者手动调用GC()函数执行

定时清理

在创建cache时如果传入WithGC(),则会执行RunGCTask()函数在后台定时扫描并清理过期数据,详情可参考newCache函数

1
2
3
4
5
6
7
8
9
10
11
func RunGCTask(cache Cache, duration time.Duration) (cancel func()) {
fn := func(ctx context.Context) {
cache.GC() // 调用对应的cache只执行gc
}

ctx := context.Background()
ctx, cancel = context.WithCancel(ctx)

go task.New(fn).Context(ctx).Duration(duration).Run()
return cancel
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (t *Task) Run() {
if t.fn == nil {
return
}

if t.before != nil { // AOP前置操作
t.before(t.ctx)
}

if t.after != nil { // AOP后置操作
defer t.after(t.ctx)
}

ticker := time.NewTicker(t.duration)
defer ticker.Stop()

for { // ticker定时执行
select {
case <-t.ctx.Done():
return
case <-ticker.C:
t.fn(t.ctx) // 执行自定义清理函数
}
}
}
  • 定时垃圾回收通过ticker+for循环定时执行,且返回一个cancel函数支持手动终止回收任务
  • task包封装了任务执行过程,允许自定义函数前置和后置操作,类似AOP操作
  • 自动清理相比于手动清理多了一个定时任务触发

手动清理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (lc *lruCache) gc() (cleans int) {
now := lc.now()
scans := 0

for _, element := range lc.elementMap {
scans++

if entry := lc.unwrap(element); entry.expired(now) {
lc.removeElement(element)
cleans++
}

if lc.maxScans > 0 && scans >= lc.maxScans { // 控制扫描key的数量以实现部分扫描
break
}
}

return cleans // return被清理的数量
}

手动清理支持指定扫描key数量,以免扫描key过多导致资源占用过多
如果cache类型是reportableCache,则在清理完成后还会刷新相关指标

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (rc *reportableCache) GC() (cleans int) {
if rc.recordGC { // gc次数自增
rc.increaseGCCount()
}

if rc.reportGC == nil {
return rc.cache.GC()
}

begin := rc.now()
cleans = rc.cache.GC()
end := rc.now()

cost := time.Duration(end - begin)
rc.reportGC(rc.Reporter, cost, cleans) // 上报耗时、被清理key的数量

return cleans
}

load数据加载

当在cache获取到某个key不存在时,可以向下层存取加载,这是可以使用cachego的Load()函数实现。
Load()函数支持自定义加载函数,将从下层获取到的值再set会cachego中,且引入了singleflight减少缓存穿透的伤害

1
2
3
4
5
6
7
8
9
10
func (lc *lruCache) Load(key string, ttl time.Duration, load func() (value interface{}, err error)) (value interface{}, err error) {
value, err = lc.loader.Load(key, ttl, load)
if err != nil {
return value, err
}

lc.Set(key, value, ttl)
return value, nil
}

1
2
3
4
5
6
7
8
9
10
11
func (l *loader) Load(key string, ttl time.Duration, load func() (value interface{}, err error)) (value interface{}, err error) {
if load == nil {
return nil, errors.New("cachego: load function is nil in loader")
}

if l.group == nil {
return load()
}

return l.group.Call(key, load)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Call calls fn in singleflight mode and returns its result and error.
func (g *Group) Call(key string, fn func() (interface{}, error)) (interface{}, error) { // 避免同一时间多次调用同一个函数
g.lock.Lock()

if c, ok := g.calls[key]; ok { // 如果已经存在调用,则直接wait等待结果返回即可
g.lock.Unlock()

// Waiting...
c.wg.Wait()
return c.result, c.err
}

c := newCall(fn)
c.wg.Add(1)

g.calls[key] = c
g.lock.Unlock()

c.do()
g.lock.Lock()

if !c.deleted { // 完成调用后,删除
delete(g.calls, key)
}

g.lock.Unlock()
return c.result, c.err
}
  • singleflight是什么

    • singleflight 避免重复的网络请求:对于重复的网络请求(如缓存查询),来避免多个 goroutine 发起重复的请求。
    • 避免重复的数据库查询:类似于网络请求,多个并发查询可以避免重复的数据库访问。
    • 共享计算结果:如果某个计算过程非常复杂且耗时,多个并发任务可以共享这个计算结果,而不需要重新计算
  • load函数核心为从下层取数据,再写入当前内存缓存中

  • 为避免同一key短时间内从下层存储中多次load,默认singleflight是开启的,如果不想用的话可以在新建 cachego传入WithDisableSingleflight()以关闭