feat: add ability to clear cache #54

Open · wants to merge 2 commits into base: master
CHANGELOG: 4 changes (2 additions, 2 deletions)
@@ -12,12 +12,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [2.3.0] - 2022-01-06
 ### Added
-* Added Group.Set() to allow users to explicity set values in the cache.
+* Added Group.Set() to allow users to explicitly set values in the cache.
 
 ## [2.2.1] - 2021-01-13
 ### Changes
 * Now uses the much faster fnv1
-* Now md5 hashs the keys to help distribute hosts more evenly in some
+* Now md5 hashes the keys to help distribute hosts more evenly in some
   cases.
 
 ## [2.2.0] - 2019-07-09
README.md: 5 changes (4 additions, 1 deletion)
@@ -156,4 +156,7 @@ func ExampleUsage() {
 
 ```
 ### Note
-The call to `groupcache.NewHTTPPoolOpts()` is a bit misleading. `NewHTTPPoolOpts()` creates a new pool internally within the `groupcache` package where it is uitilized by any groups created. The `pool` returned is only a pointer to the internallly registered pool so the caller can update the peers in the pool as needed.
+The call to `groupcache.NewHTTPPoolOpts()` is a bit misleading. `NewHTTPPoolOpts()`
+creates a new pool internally within the `groupcache` package where it is utilized
+by any groups created. The `pool` returned is only a pointer to the internally
+registered pool so the caller can update the peers in the pool as needed.
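
A minimal sketch of the pattern the note describes, assuming the `github.com/mailgun/groupcache/v2` import path and placeholder peer URLs:

```go
package main

import "github.com/mailgun/groupcache/v2"

func main() {
	// NewHTTPPoolOpts registers a pool inside the groupcache package and
	// returns a pointer to that internally registered pool.
	pool := groupcache.NewHTTPPoolOpts("http://192.168.1.1:8080", &groupcache.HTTPPoolOptions{})

	// Because the handle points at the registered pool, updating peers
	// through it is visible to every group that uses the pool.
	pool.Set("http://192.168.1.1:8080", "http://192.168.1.2:8080")
}
```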
byteview.go: 13 changes (10 additions, 3 deletions)
@@ -31,17 +31,24 @@ import (
 
 // A ByteView is meant to be used as a value type, not
 // a pointer (like a time.Time).
 type ByteView struct {
-	e time.Time
-	s string
 	// If b is non-nil, b is used, else s is used.
 	b []byte
+	s string
+	e time.Time
+	// generation is an optional field, used only in certain operations
+	g int64
 }
 
-// Returns the expire time associated with this view
+// Expire returns the expire time associated with this view
 func (v ByteView) Expire() time.Time {
 	return v.e
 }
 
+// Generation returns the generation associated with this cache view
+func (v ByteView) Generation() int64 {
+	return v.g
+}
+
 // Len returns the view's length.
 func (v ByteView) Len() int {
 	if v.b != nil {
example_test.go: 3 changes (1 addition, 2 deletions)
@@ -35,7 +35,6 @@ func ExampleUsage() {
 	// Create a new group cache with a max cache size of 3MB
 	group := groupcache.NewGroup("users", 3000000, groupcache.GetterFunc(
 		func(ctx context.Context, id string, dest groupcache.Sink) error {
-
 			// In a real scenario we might fetch the value from a database.
 			/*if user, err := fetchUserFromMongo(ctx, id); err != nil {
 				return err
@@ -58,7 +57,7 @@ func ExampleUsage() {
 
 	var user User
 
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
 
 	if err := group.Get(ctx, "12345", groupcache.ProtoSink(&user)); err != nil {
groupcache.go: 139 changes (116 additions, 23 deletions)
@@ -274,19 +274,20 @@ func (g *Group) Set(ctx context.Context, key string, value []byte, expire time.T
 	_, err := g.setGroup.Do(key, func() (interface{}, error) {
 		// If remote peer owns this key
 		owner, ok := g.peers.PickPeer(key)
+		generation := g.mainCache.generation()
 		if ok {
-			if err := g.setFromPeer(ctx, owner, key, value, expire); err != nil {
+			if err := g.setFromPeer(ctx, owner, key, value, expire, generation); err != nil {
 				return nil, err
 			}
 			// TODO(thrawn01): Not sure if this is useful outside of tests...
 			// maybe we should ALWAYS update the local cache?
 			if hotCache {
-				g.localSet(key, value, expire, &g.hotCache)
+				g.localSet(key, value, expire, generation, &g.hotCache)
 			}
 			return nil, nil
 		}
 		// We own this key
-		g.localSet(key, value, expire, &g.mainCache)
+		g.localSet(key, value, expire, generation, &g.mainCache)
 		return nil, nil
 	})
 	return err
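
For orientation, a hedged sketch of how a caller reaches the path above; `group` is assumed to be the "users" group from `example_test.go`, and the key, value, and import path are placeholder assumptions:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/mailgun/groupcache/v2"
)

func main() {
	group := groupcache.GetGroup("users") // assumes the group was created elsewhere

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Explicitly set a value with a one-minute TTL. The final argument is
	// the hotCache flag seen in the diff above; with this PR the value is
	// also stamped with the main cache's current generation.
	if err := group.Set(ctx, "12345", []byte("value"), time.Now().Add(time.Minute), false); err != nil {
		log.Fatal(err)
	}
}
```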
@@ -298,7 +299,6 @@ func (g *Group) Remove(ctx context.Context, key string) error {
 	g.peersOnce.Do(g.initPeers)
 
 	_, err := g.removeGroup.Do(key, func() (interface{}, error) {
-
 		// Remove from key owner first
 		owner, ok := g.peers.PickPeer(key)
 		if ok {
@@ -341,6 +341,41 @@
 	return err
 }
 
+// Clear purges our cache then forwards the clear request to all peers.
+func (g *Group) Clear(ctx context.Context) error {
+	g.peersOnce.Do(g.initPeers)
+
+	_, err := g.removeGroup.Do("", func() (interface{}, error) {
+		// Clear our cache first
+		g.localClear()
+		wg := sync.WaitGroup{}
+		errs := make(chan error)
+
+		// Asynchronously clear all caches of peers
+		for _, peer := range g.peers.GetAll() {
+			wg.Add(1)
+			go func(peer ProtoGetter) {
+				errs <- g.clearFromPeer(ctx, peer)
+				wg.Done()
+			}(peer)
+		}
+		go func() {
+			wg.Wait()
+			close(errs)
+		}()
+
+		// TODO(thrawn01): Should we report all errors? Reporting context
+		// cancelled error for each peer doesn't make much sense.
+		var err error
+		for e := range errs {
+			err = e
+		}
+
+		return nil, err
+	})
+	return err
+}
+
 // load loads key either by invoking the getter locally or by sending it to another machine.
 func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
 	g.Stats.Loads.Add(1)
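
Continuing the sketch above, clearing the group might look like the following. Two design choices are visible in the hunk: concurrent clears are collapsed into one flight via `removeGroup.Do("")`, and the error loop keeps only the last peer error, so a failure may under-report how many peers actually failed:

```go
// Purge the local main and hot caches first, then fan the clear request
// out to every peer (a usage sketch; `group` as in the previous example).
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if err := group.Clear(ctx); err != nil {
	log.Printf("clear failed on at least one peer: %v", err)
}
```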
@@ -461,23 +496,29 @@ func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) (
 		}
 	}
 
-	value := ByteView{b: res.Value, e: expire}
+	var generation int64
+	if res.Generation != nil {
+		generation = *res.Generation
+	}
+
+	value := ByteView{b: res.Value, e: expire, g: generation}
 
 	// Always populate the hot cache
 	g.populateCache(key, value, &g.hotCache)
 	return value, nil
 }
 
-func (g *Group) setFromPeer(ctx context.Context, peer ProtoGetter, k string, v []byte, e time.Time) error {
+func (g *Group) setFromPeer(ctx context.Context, peer ProtoGetter, k string, v []byte, e time.Time, gen int64) error {
 	var expire int64
 	if !e.IsZero() {
 		expire = e.UnixNano()
 	}
 	req := &pb.SetRequest{
-		Expire: &expire,
-		Group:  &g.name,
-		Key:    &k,
-		Value:  v,
+		Expire:     &expire,
+		Group:      &g.name,
+		Key:        &k,
+		Value:      v,
+		Generation: &gen,
 	}
 	return peer.Set(ctx, req)
 }
@@ -490,6 +531,13 @@ func (g *Group) removeFromPeer(ctx context.Context, peer ProtoGetter, key string
 	return peer.Remove(ctx, req)
 }
 
+func (g *Group) clearFromPeer(ctx context.Context, peer ProtoGetter) error {
+	req := &pb.GetRequest{
+		Group: &g.name,
+	}
+	return peer.Clear(ctx, req)
+}
+
 func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
 	if g.cacheBytes <= 0 {
 		return
@@ -502,14 +550,15 @@ func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
 	return
 }
 
-func (g *Group) localSet(key string, value []byte, expire time.Time, cache *cache) {
+func (g *Group) localSet(key string, value []byte, expire time.Time, generation int64, cache *cache) {
 	if g.cacheBytes <= 0 {
 		return
 	}
 
 	bv := ByteView{
 		b: value,
 		e: expire,
+		g: generation,
 	}
 
 	// Ensure no requests are in flight
@@ -531,6 +580,19 @@ func (g *Group) localRemove(key string) {
 	})
 }
 
+func (g *Group) localClear() {
+	// Clear our local cache
+	if g.cacheBytes <= 0 {
+		return
+	}
+
+	// Ensure no requests are in flight
+	g.loadGroup.Lock(func() {
+		g.hotCache.clear()
+		g.mainCache.clear()
+	})
+}
+
 func (g *Group) populateCache(key string, value ByteView, cache *cache) {
 	if g.cacheBytes <= 0 {
 		return
@@ -592,21 +654,23 @@ var NowFunc lru.NowFunc = time.Now
 // values.
 type cache struct {
 	mu         sync.RWMutex
-	nbytes     int64 // of all keys and values
 	lru        *lru.Cache
+	nbytes     int64 // of all keys and values
 	nhit, nget int64
 	nevict     int64 // number of evictions
+	gen        int64
 }
 
 func (c *cache) stats() CacheStats {
 	c.mu.RLock()
 	defer c.mu.RUnlock()
 	return CacheStats{
-		Bytes:     c.nbytes,
-		Items:     c.itemsLocked(),
-		Gets:      c.nget,
-		Hits:      c.nhit,
-		Evictions: c.nevict,
+		Bytes:      c.nbytes,
+		Items:      c.itemsLocked(),
+		Gets:       c.nget,
+		Hits:       c.nhit,
+		Evictions:  c.nevict,
+		Generation: c.gen,
 	}
 }
 
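Since `CacheStats` now carries the generation (see the `CacheStats` struct change at the end of this diff), it can be observed through the existing stats accessor; a short sketch, continuing the assumptions above and using the existing `groupcache.MainCache` constant:

```go
stats := group.CacheStats(groupcache.MainCache)
fmt.Printf("bytes=%d items=%d evictions=%d generation=%d\n",
	stats.Bytes, stats.Items, stats.Evictions, stats.Generation)
```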
@@ -623,6 +687,16 @@ func (c *cache) add(key string, value ByteView) {
 			},
 		}
 	}
+	if c.gen != value.g {
+		if logger != nil {
+			logger.Error().WithFields(map[string]interface{}{
+				"got":  value.g,
+				"have": c.gen,
+				"key":  key,
+			}).Printf("generation mismatch")
+		}
+		return
+	}
 	c.lru.Add(key, value, value.Expire())
 	c.nbytes += int64(len(key)) + int64(value.Len())
 }
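
This guard is the point of the generation scheme: each value is stamped with the generation of the cache that produced it, and `add` drops values minted under an older generation, so loads and sets that were already in flight when the cache was cleared cannot repopulate it with stale data. A standalone toy illustration of the idea (hypothetical sketch, not code from this PR; the generation increment itself is not visible in the hunks shown):

```go
package main

import "fmt"

// genCache is a toy stand-in for the cache type above.
type genCache struct {
	gen   int64
	items map[string]string
}

// add mirrors cache.add: values stamped with an older generation are dropped.
func (c *genCache) add(key, val string, gen int64) {
	if gen != c.gen {
		fmt.Printf("dropping %q: got generation %d, have %d\n", key, gen, c.gen)
		return
	}
	c.items[key] = val
}

// clear empties the cache and bumps the generation so in-flight writes
// stamped with the old generation are rejected by add.
func (c *genCache) clear() {
	c.items = map[string]string{}
	c.gen++
}

func main() {
	c := &genCache{items: map[string]string{}}

	gen := c.gen                  // a slow write captures the current generation...
	c.clear()                     // ...the cache is cleared while it is in flight...
	c.add("user:1", "stale", gen) // ...so the stale write is dropped

	c.add("user:2", "fresh", c.gen)
	fmt.Println(c.items) // map[user:2:fresh]
}
```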
@@ -639,7 +713,10 @@ func (c *cache) get(key string) (value ByteView, ok bool) {
 		return
 	}
 	c.nhit++
-	return vi.(ByteView), true
+
+	bv := vi.(ByteView)
+	bv.g = c.gen
+	return bv, true
 }
 
 func (c *cache) remove(key string) {
@@ -651,6 +728,15 @@
 	c.lru.Remove(key)
 }
 
+func (c *cache) clear() {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.lru == nil {
+		return
+	}
+	c.lru.Clear()
+}
+
 func (c *cache) removeOldest() {
 	c.mu.Lock()
 	defer c.mu.Unlock()
@@ -678,6 +764,12 @@ func (c *cache) itemsLocked() int64 {
 	return int64(c.lru.Len())
 }
 
+func (c *cache) generation() int64 {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	return c.gen
+}
+
 // An AtomicInt is an int64 to be accessed atomically.
 type AtomicInt int64
 
@@ -702,9 +794,10 @@ func (i *AtomicInt) String() string {
 
 // CacheStats are returned by stats accessors on Group.
 type CacheStats struct {
-	Bytes     int64
-	Items     int64
-	Gets      int64
-	Hits      int64
-	Evictions int64
+	Bytes      int64
+	Items      int64
+	Gets       int64
+	Hits       int64
+	Evictions  int64
+	Generation int64
 }