cachego 分片、上报、GC与数据加载机制原理
cachego github地址
- 为减小锁之间的竞争,cachego引入了分片机制,将key分散到多个sharding中,减低锁冲突概率加速索引。 sharding机制可与lru、lfu、standard模式丝滑结合,以实现cache中所有sharding内存模型都是指定的某种类型。
- reporter 上报功能,支持上报hit次数、miss次数、gc次数、load次数(从下层存储load次数),同时通过 option function 模式提供灵活的上报函数注册满足业务定制化上报功能。
newCache操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| func newCache(withReport bool, opts ...Option) (cache Cache, reporter *Reporter) { conf := newDefaultConfig() applyOptions(conf, opts)
newCache, ok := newCaches[conf.cacheType] if !ok { panic("cachego: cache type doesn't exist") }
if conf.shardings > 0 { cache = newShardingCache(conf, newCache) } else { cache = newCache(conf) }
if withReport { cache, reporter = report(conf, cache) }
if conf.gcDuration > 0 { RunGCTask(cache, conf.gcDuration) }
return cache, reporter }
|
- newCache函数通过option模式允许用户自定义配置
- 根据用户配置决定是否创建sharding与reporter
- 根据所传gc间隔时长制定gc回收计划
shardingCache实现原理
newShardingCache操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| func newShardingCache(conf *config, newCache func(conf *config) Cache) Cache { if conf.shardings <= 0 { panic("cachego: shardings must be > 0.") }
if bits.OnesCount(uint(conf.shardings)) > 1 { panic("cachego: shardings must be the pow of 2 (such as 64).") }
caches := make([]Cache, 0, conf.shardings) for i := 0; i < conf.shardings; i++ { caches = append(caches, newCache(conf)) }
cache := &shardingCache{ config: conf, caches: caches, }
return cache }
|
FNV-1a算法
1 2 3 4 5 6 7 8 9 10
| func hash(key string) int { hash := 1469598103934665603
for _, r := range key { hash = (hash << 5) - hash + int(r&0xffff) hash *= 1099511628211 }
return hash }
|
算法高效原因
- 简单的操作,
- 左移操作:hash << 5
- 加法和减法:(hash << 5) - hash 加法是哈希混合的一部分,有助于在不同字符之间产生良好的差异。
- 乘法:hash *= 1099511628211 使用一个质数进行乘法引入更多的不可预测性。质数乘法有助于散列过程的均匀分布,因为质数避免了模式和重复的出现,从而减少碰撞
- 通过 (hash << 5) - hash 和 hash *= 1099511628211,每个字符都会引入一个新的混合因子,使得前面的字符和后面的字符对最终哈希值的贡献是均匀的,这意味着输入数据中的每一个字符都能够“影响”哈希值的所有位,导致哈希值具有很好的散列性质
- 复杂度是 O(n), 通过质数乘法和每个字符的逐步混合减少哈希碰撞
set操作
1 2 3
| func (sc *shardingCache) Set(key string, value interface{}, ttl time.Duration) (oldValue interface{}) { return sc.cacheOf(key).Set(key, value, ttl) }
|
1 2 3 4 5 6
| func (sc *shardingCache) cacheOf(key string) Cache { hash := sc.hash(key) mask := len(sc.caches) - 1
return sc.caches[hash&mask] }
|
- 采用高效的
FNV-1a
算法获取key的散列值,确保了key的均匀性与sharding寻址速度
- 最终写入操作还是三种核心cache执行,key-value最终会被存入所选的lruCache、lfuCache或standardCache
- 除此之外shardingCache的hash也是支持自定义的,在创建cache时可自定义实现
WithHash()
get操作
get操作通过FNV-1a算法定位到sharding后执行对应cache的get获取
reportableCache实现原理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| func report(conf *config, cache Cache) (Cache, *Reporter) { reporter := &Reporter{ conf: conf, cache: cache, hitCount: 0, missedCount: 0, gcCount: 0, loadCount: 0, }
cache = &reportableCache{ config: conf, Reporter: reporter, }
return cache, reporter }
|
set操作
1 2 3
| func (rc *reportableCache) Set(key string, value interface{}, ttl time.Duration) (evictedValue interface{}) { return rc.cache.Set(key, value, ttl) }
|
set操作核心是调用另外三种cache的set接口执行写入操作
get操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| func (rc *reportableCache) Get(key string) (value interface{}, found bool) { value, found = rc.cache.Get(key)
if found { if rc.recordHit { rc.increaseHitCount() }
if rc.reportHit != nil { rc.reportHit(rc.Reporter, key, value) } } else { if rc.recordMissed { rc.increaseMissedCount() }
if rc.reportMissed != nil { rc.reportMissed(rc.Reporter, key) } }
return value, found }
|
- get操作分为两步操作,先获取到值,再执行上报逻辑,其中hit与miss数均使用atomic库操作,保证原子性。
- 上报函数是由用户自定义,同时cachego也提供了HitRate与MissedRate等函数进行概率计算,详情可参考
./_examples/report.go
gc与load数据加载
垃圾清理机制
gc函数用于手动清理过期key。可在创建cache时手动传入WithGC()
指定定时清理时间间隔,或者手动调用GC()
函数执行
定时清理
在创建cache时如果传入WithGC()
,则会执行RunGCTask()
函数在后台定时扫描并清理过期数据,详情可参考newCache
函数
1 2 3 4 5 6 7 8 9 10 11
| func RunGCTask(cache Cache, duration time.Duration) (cancel func()) { fn := func(ctx context.Context) { cache.GC() }
ctx := context.Background() ctx, cancel = context.WithCancel(ctx)
go task.New(fn).Context(ctx).Duration(duration).Run() return cancel }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| func (t *Task) Run() { if t.fn == nil { return }
if t.before != nil { t.before(t.ctx) }
if t.after != nil { defer t.after(t.ctx) }
ticker := time.NewTicker(t.duration) defer ticker.Stop()
for { select { case <-t.ctx.Done(): return case <-ticker.C: t.fn(t.ctx) } } }
|
- 定时垃圾回收通过ticker+for循环定时执行,且返回一个cancel函数支持手动终止回收任务
- task包封装了任务执行过程,允许自定义函数前置和后置操作,类似AOP操作
- 自动清理相比于手动清理多了一个定时任务触发
手动清理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| func (lc *lruCache) gc() (cleans int) { now := lc.now() scans := 0
for _, element := range lc.elementMap { scans++
if entry := lc.unwrap(element); entry.expired(now) { lc.removeElement(element) cleans++ }
if lc.maxScans > 0 && scans >= lc.maxScans { break } }
return cleans }
|
手动清理支持指定扫描key数量,以免扫描key过多导致资源占用过多
如果cache类型是reportableCache,则在清理完成后还会刷新相关指标
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| func (rc *reportableCache) GC() (cleans int) { if rc.recordGC { rc.increaseGCCount() }
if rc.reportGC == nil { return rc.cache.GC() }
begin := rc.now() cleans = rc.cache.GC() end := rc.now()
cost := time.Duration(end - begin) rc.reportGC(rc.Reporter, cost, cleans)
return cleans }
|
load数据加载
当在cache获取到某个key不存在时,可以向下层存取加载,这是可以使用cachego的Load()
函数实现。
Load()
函数支持自定义加载函数,将从下层获取到的值再set会cachego中,且引入了singleflight
减少缓存穿透的伤害
1 2 3 4 5 6 7 8 9 10
| func (lc *lruCache) Load(key string, ttl time.Duration, load func() (value interface{}, err error)) (value interface{}, err error) { value, err = lc.loader.Load(key, ttl, load) if err != nil { return value, err }
lc.Set(key, value, ttl) return value, nil }
|
1 2 3 4 5 6 7 8 9 10 11
| func (l *loader) Load(key string, ttl time.Duration, load func() (value interface{}, err error)) (value interface{}, err error) { if load == nil { return nil, errors.New("cachego: load function is nil in loader") }
if l.group == nil { return load() }
return l.group.Call(key, load) }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| func (g *Group) Call(key string, fn func() (interface{}, error)) (interface{}, error) { g.lock.Lock()
if c, ok := g.calls[key]; ok { g.lock.Unlock()
c.wg.Wait() return c.result, c.err }
c := newCall(fn) c.wg.Add(1)
g.calls[key] = c g.lock.Unlock()
c.do() g.lock.Lock()
if !c.deleted { delete(g.calls, key) }
g.lock.Unlock() return c.result, c.err }
|