在我的一个基于LSM-Tree结构的存储引擎项目中,由于LSM-Tree结构的查询操作高度依赖缓存,所以我需要一个高性能的本地缓存模块来提高查询效率,抱着学习的目的,我们来 重复造轮子 亲自实现一个拥有良好性能的缓存模块。
所谓缓存,本质上就是 映射+淘汰策略 ,如何在有限的空间存储最”有用“的数据,也就是如何设定淘汰策略往往是我们所需要关注的,在Redis中,我们很多时候都是用的TTL(Time To Live),以固定生存时间的角度控制数据的淘汰,但是对于一个存储引擎来说,显然不是一个良好的方案,而除了固定生存时间的角度就是访问频率的角度,基于访问的角度则指向了两种算法: LRU 和 LFU 。
LRU 和 LFU 算法
LRU 算法
LRU (Least Recently Used) 即最近最久未使用算法
LRU 算法的宗旨是,在定量空间的存储数据时,当新增数据后超出缓存设定的阈值,淘汰最长时间没有访问到数据。在 LRU 算法的思想下,如果一段数据在最近的时间内没有被访问到,那么它接下来被访问到的概率也很小,执行淘汰。
在某些情况下,LRU算法是简单且有效的,但是在有些情况下,它就并不是十分的合理,如下操作。
| 最早访问 | 中间访问 | 最晚访问 | |
|---|---|---|---|
| A | B | C | 初始数据 |
| B | C | A | 访问A |
| B | A | C | 访问C |
| B | A | C | 访问C |
| B | C | A | 访问A |
| B | A | C | 访问C |
| A | C | B | 访问B |
| C | B | D | 插入D |
经过多次访问后,最终插入数据时淘汰了数据A,而通过访问情况我们可以发现,数据A和C的访问频率都相比B要高,而淘汰策略却保留了B舍弃了A。在局部性显著的时候,LRU这种淘汰策略也是正确的,但是在其它局部性不显著的大量数据访问的情况下(如全量遍历),缓存就有可能被污染,导致查询性能下降。且LRU对热门数据的保护不强,不过这也使得LRU有用更强的访问模式适应能力
此处由于展示方式有限,数据量较少,可自行脑补大量数据时产生热门数据被淘汰的特殊情况
LFU 算法
LFU(Least Frequently Used)即最近最少使用算法。
LFU算法根据数据的历史访问频率来淘汰数据,其核心思想是“如果数据过去被访问多次,那么将来被访问的频率也更高”。
由于其淘汰策略所致,对于突发的稀疏流量,LFU的应对能力不如LRU,大量新数据可能不被缓存,但LFU所带来的好处就是其对于热点数据的缓存命中率会更高
优缺点对比
LRU
- 优点:实现简单,可以应对突变的访问模式
- 缺点:难以应对缓存污染,对于热数据的缓存命中率低于LFU
LFU
- 优点:拥有更高的热门数据命中率
- 缺点:难以应对突变的稀疏流量、可能存在旧数据长期不淘汰,且需要额外消耗来记录更新访问次数
Window-TinyLFU算法
在 Java 中有一个很出名的 Caffeine 的高性能本地缓存库,正是因为其实现的 Window-TinyLFU 的回收策略为它提供了良好的缓存命中率。
在《TinyLFU: A Highly E cient Cache Admission Policy》论文中,详细介绍了 TinyLFU 这种通过LRU实现类LFU功能的结构设计。
实现
对外暴露的结构体,其中包含了读写锁、window-lr、分段lru、布隆过滤器、cmSketch算法次数统计器、保险的阈值
type Cache struct {
// 读写锁
m sync.RWMutex
// window-lru
lru *windowLRU
// 分段lru
slru *segmentedLRU
// 布隆过滤器
bf *BloomFilter
// cmSketch 次数统计器
c *cmSketch
// total 总共的访问次数
t int32
// 保险设计的阈值
threshold int32
// 数据的实际存储
data map[uint64]*list.Element
}
window-lru
type windowLRU struct {
data map[uint64]*list.Element
cap int
list *list.List
}
type storeItem struct {
stage int
key uint64
conflict uint64
value interface{}
}
func newWindowLRU(size int, data map[uint64]*list.Element) *windowLRU {
return &windowLRU{
data: data,
cap: size,
list: list.New(),
}
}
func (lru *windowLRU) add(newItem storeItem) (eitem storeItem, evicted bool) {
// If part of window is not full, insert it directly
if lru.list.Len() < lru.cap {
lru.data[newItem.key] = lru.list.PushFront(&newItem)
return storeItem{}, false
}7
evictItem := lru.list.Back()
item := evictItem.Value.(*storeItem)
delete(lru.data, item.key)
eitem, *item = *item, newItem
lru.data[item.key] = evictItem
lru.list.MoveToFront(evictItem)
return eitem, true
}
func (lru *windowLRU) get(v *list.Element) {
lru.list.MoveToFront(v)
}
func (lru *windowLRU) String() (s string) {
for e := lru.list.Front(); e != nil; e = e.Next() {
s += fmt.Sprintf("%v,", e.Value.(*storeItem).value)
}
return s
}
segmented-lru
type segmentedLRU struct {
data map[uint64]*list.Element
stageOneCap, stageTwoCap int
stageOne, stageTwo *list.List
}
const (
STAGE_ONE = iota + 1
STAGE_TWO
)
func newS2LRU(data map[uint64]*list.Element, stageOneCap, stageTwoCap int) *segmentedLRU {
return &segmentedLRU{
data: data,
stageOneCap: stageOneCap,
stageTwoCap: stageTwoCap,
stageOne: list.New(),
stageTwo: list.New(),
}
}
func (s2lru *segmentedLRU) add(newItem storeItem) {
// 先进来的都放 stageOne
newItem.stage = STAGE_ONE
// 如果 stageOne 没满, 整个LFU区域也没满
if s2lru.stageOne.Len() < s2lru.stageOneCap || s2lru.Len() < s2lru.stageOneCap+s2lru.stageTwoCap {
s2lru.data[newItem.key] = s2lru.stageOne.PushFront(&newItem)
return
}
// 走到这里说明 stageOne 满了,或者整个LFU都满了
// 则需要载 stageOne 淘汰数据
e := s2lru.stageOne.Back()
item := e.Value.(*storeItem)
// 淘汰数据
delete(s2lru.data, item.key)
*item = newItem
s2lru.data[item.key] = e
s2lru.stageOne.MoveToFront(e)
}
func (s2lru *segmentedLRU) get(v *list.Element) {
item := v.Value.(*storeItem)
// 若访问的缓存数据已经载StageTwo,只需要按照LRU规则提前即可
if item.stage == STAGE_TWO {
s2lru.stageTwo.MoveToFront(v)
return
}
// 若访问的数据还在StageOne,那么两次被访问倒,就需要提升到StageTwo阶段了
if s2lru.stageTwo.Len() < s2lru.stageTwoCap {
s2lru.stageOne.Remove(v)
item.stage = STAGE_TWO
s2lru.data[item.key] = s2lru.stageTwo.PushFront(item)
return
}
// 新数据加入StageTwo, 需要淘汰旧数据
// StageTwo 中淘汰的数据不会丢失,会进入StageOne
// StageOne 中,访问频率低的数据,可能会被淘汰
// 将第二个链表和第一个链表中的数据进行交换
back := s2lru.stageTwo.Back()
bItem := back.Value.(*storeItem)
*bItem, *item = *item, *bItem
bItem.stage = STAGE_TWO
item.stage = STAGE_ONE
// 数据提前
s2lru.data[item.key] = v
s2lru.data[bItem.key] = back
s2lru.stageOne.MoveToFront(v)
s2lru.stageOne.MoveToFront(back)
}
func (s2lru *segmentedLRU) victim() *storeItem {
// 如果s2lru的容量未满,不需要淘汰
if s2lru.Len() < s2lru.stageOneCap+s2lru.stageTwoCap {
return nil
}
// 如果已经满了, 则需要从20%的区域淘汰数据,直接从末尾部拿最后一个数据即可
v := s2lru.stageOne.Back()
return v.Value.(*storeItem)
}
func (s2lru *segmentedLRU) String() (s string) {
for e := s2lru.stageTwo.Front(); e != nil; e = e.Next() {
s += fmt.Sprintf("%v,", e.Value.(*storeItem).value)
}
s += fmt.Sprintf(" | ")
for e := s2lru.stageOne.Front(); e != nil; e = e.Next() {
s += fmt.Sprintf("%v,", e.Value.(*storeItem).value)
}
return s
}
func (s2lru *segmentedLRU) Len() int {
return s2lru.stageOne.Len() + s2lru.stageTwo.Len()
}
cmSketch
const (
cmDepth = 4
)
type cmSketch struct {
rows [cmDepth]cmRow
seed [cmDepth]uint64
mask uint64
}
func newCmSketch(numCounters int64) *cmSketch {
if numCounters == 0 {
panic("cmSketch: invalid numCounters")
}
// 因为在位图的实际存储中2个Counters存放在一个byte中,所以numCounters为一定为偶数
numCounters = next2Power(numCounters)
// mask 为numcounter - 1 即一定是0111...111,用以保留后n位
sketch := &cmSketch{mask: uint64(numCounters - 1)}
source := rand.New(rand.NewSource(time.Now().UnixNano()))
// 假设预计cache 6条数据,初始化[4]rows如下
// 0000,0000|0000,0000|0000,0000
// 0000,0000|0000,0000|0000,0000
// 0000,0000|0000,0000|0000,0000
// 0000,0000|0000,0000|0000,0000
for i := 0; i < cmDepth; i++ {
sketch.seed[i] = source.Uint64()
sketch.rows[i] = newCmRow(numCounters)
}
return sketch
}
// 在计数器中增加某key的计数
func (s *cmSketch) Increment(hashed uint64) {
// 对于每行进行相同操作
for i := range s.rows {
s.rows[i].increment((hashed ^ s.seed[i]) & s.mask)
}
}
// 估算的访问次数
func (s *cmSketch) Estimate(hashed uint64) int64 {
min := byte(255)
for i := range s.rows {
val := s.rows[i].get((hashed ^ s.seed[i]) & s.mask)
if val < min {
min = val
}
}
return int64(min)
}
// 将所有计数器值减半,即保鲜机制
func (s *cmSketch) Reset() {
for _, r := range s.rows {
r.reset()
}
}
// 将所有计数器归零
func (s *cmSketch) Clear() {
for _, r := range s.rows {
r.clear()
}
}
// 快速计算大于 X,且最接近 X 的二次幂
func next2Power(x int64) int64 {
x--
x |= x >> 1
x |= x >> 2
x |= x >> 4
x |= x >> 8
x |= x >> 16
x |= x >> 32
x++
return x
}
// 计数器位图
type cmRow []byte
// 计数器的每个key的计数值(counter)占用4bit,每个byte为8bit,故cmRow的长度为计数总量的一半
func newCmRow(numCounters int64) cmRow {
return make(cmRow, numCounters/2)
}
func (r cmRow) get(n uint64) byte {
return r[n/2] >> ((n & 1) * 4) & 0x0f
}
func (r cmRow) increment(n uint64) {
// 定位到第i个couter
i := n / 2
// 右移距离,偶数为0,奇数为4
// 决定了取前4bit 还是后4bit
s := (n & 1) * 4
v := (r[i] >> s) & 0x0f
// 若没有超过最大计数,则计数+1
if v < 15 {
r[i] += 1 << s
}
}
// 保险机制
func (r cmRow) reset() {
// 给每个byte中的2个counter同时减半
for i := range r {
r[i] = (r[i] >> 1) & 0x77
}
}
// 清零
func (r cmRow) clear() {
for i := range r {
r[i] = 0
}
}
func (r cmRow) String() (s string) {
for i := uint64(0); i < uint64(len(r)*2); i++ {
s += fmt.Sprintf("%02d ", (r[(i/2)]>>((i&1)*4))&0x0f)
}
s = s[:len(s)-1]
return s
}
最终的封装
type Cache struct {
m sync.RWMutex
lru *windowLRU
slru *segmentedLRU
bf *BloomFilter
c *cmSketch
t int32
threshold int32
data map[uint64]*list.Element
}
type Options struct {
lruPct uint8
}
// NewCache size: 要缓存的数据数量
func NewCache(size int) *Cache {
// 定义window部分缓存所占百分比,这里定义为1%
const lruPct = 1
// 计算window部分的容量
lruSize := (lruPct * size) / 100
if lruSize < 1 {
lruSize = 1
}
// 计算LFU部分的缓存容量
slruSize := int(float64(size) * ((100 - lruPct) / 100.0))
if slruSize < 1 {
slruSize = 1
}
// LFU 分为两部分, stageOne部分占比20%
slru1 := int(0.2 * float64(slruSize))
if slru1 < 1 {
slru1 = 1
}
data := make(map[uint64]*list.Element, size)
return &Cache{
lru: newWindowLRU(lruSize, data),
slru: newS2LRU(data, slru1, slruSize-slru1),
bf: NewBloomFilter(size, 0.01),
c: newCmSketch(int64(size)),
data: data,
}
}
// Set
// todo Optimize this method by using generics
func (c *Cache) Set(key, value interface{}) bool {
c.m.Lock()
defer c.m.Lock()
return c.set(key, value)
}
func (c *Cache) set(key, value interface{}) bool {
// keyHash 用来快速定位, conflictHash 用来判断冲突
keyHash, conflictHash := c.key2Hash(key)
// 刚放进去的缓存都先放到window lru 中, 所以stage = 0
i := storeItem{
stage: 0,
key: keyHash,
conflict: conflictHash,
value: value,
}
// 如果window 已满, 返回被淘汰的数据
eitem, evicted := c.lru.add(i)
if !evicted {
return true
}
// 如果window中有被淘汰的数据,会走到这里
// 需要从LFU的stageOne 部分找到一个淘汰者
// 二者进行再次比较
victim := c.slru.victim()
// 如果LFU未满,那么window lru的淘汰数据,可以进入stageOne
if victim == nil {
c.slru.add(eitem)
return true
}
// 先在bloomfilter中查找
// 如果存在,说明访问频率 >= 2
if !c.bf.Allow(uint32(eitem.key)) {
return true
}
// 估算windowlru和LFU中淘汰数据, 历史访问频次
// 访问频率高的,更有资格留下
vcount := c.c.Estimate(victim.key)
ocount := c.c.Estimate(eitem.key)
if ocount < vcount {
return true
}
// 留下来的进入 stageOne
c.slru.add(eitem)
return true
}
func (c *Cache) Get(key interface{}) (interface{}, bool) {
c.m.RLock()
defer c.m.RUnlock()
return c.get(key)
}
func (c *Cache) get(key interface{}) (interface{}, bool) {
c.t++
if c.t == c.threshold {
c.c.Reset()
c.bf.Reset()
c.t = 0
}
keyHash, confilctHash := c.key2Hash(key)
val, ok := c.data[keyHash]
if !ok {
c.bf.Allow(uint32(keyHash))
c.c.Increment(keyHash)
return nil, false
}
item := val.Value.(*storeItem)
if item.conflict != confilctHash {
c.bf.Allow(uint32(keyHash))
c.c.Increment(keyHash)
return nil, false
}
c.bf.Allow(uint32(keyHash))
c.c.Increment(item.key)
v := item.value
if item.stage == 0 {
c.lru.get(val)
} else {
c.slru.get(val)
}
return v, true
}
func (c *Cache) Del(key interface{}) (interface{}, bool) {
c.m.Lock()
defer c.m.Unlock()
return c.del(key)
}
func (c *Cache) del(key interface{}) (interface{}, bool) {
keyHash, conflictHash := c.key2Hash(key)
val, ok := c.data[keyHash]
if !ok {
return 0, false
}
item := val.Value.(*storeItem)
if conflictHash != 0 && (conflictHash != item.conflict) {
return 0, false
}
delete(c.data, keyHash)
return item.conflict, true
}
func (c *Cache) key2Hash(key interface{}) (uint64, uint64) {
if key == nil {
return 0, 0
}
switch k := key.(type) {
case uint64:
return k, 0
case string:
return MemHashString(k), xxhash.Sum64String(k)
case []byte:
return MemHash(k), xxhash.Sum64(k)
case byte:
return uint64(k), 0
case int:
return uint64(k), 0
case int32:
return uint64(k), 0
case uint32:
return uint64(k), 0
case int64:
return uint64(k), 0
default:
panic("Key type not supported")
}
}
type stringStruct struct {
str unsafe.Pointer
len int
}
//go:noescape
//go:linkname memhash runtime.memhash
func memhash(p unsafe.Pointer, h, s uintptr) uintptr
func MemHashString(str string) uint64 {
ss := (*stringStruct)(unsafe.Pointer(&str))
return uint64(memhash(ss.str, 0, uintptr(ss.len)))
}
func MemHash(data []byte) uint64 {
ss := (*stringStruct)(unsafe.Pointer(&data))
return uint64(memhash(ss.str, 0, uintptr(ss.len)))
}
扩展阅读
[《TinyLFU: A Highly E cient Cache Admission Policy》]([1512.00727] TinyLFU: A Highly Efficient Cache Admission Policy (arxiv.org))