Skip to content

Commit

Permalink
feat: add ability to clear cache
Browse files Browse the repository at this point in the history
While the LRU package has the ability to purge all items from cache,
this functionality was not available to `ProtoGetter`, making it
imposibile to clear the cache without restarting all peers. This change
adds a `Clear()` method to `ProtoGetter`, that enables clearing the
cache with no downtime.
  • Loading branch information
ct16k committed Dec 17, 2022
1 parent df53951 commit 8e3936a
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 17 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [2.3.0] - 2022-01-06
### Added
* Added Group.Set() to allow users to explicity set values in the cache.
* Added Group.Set() to allow users to explicitly set values in the cache.

## [2.2.1] - 2021-01-13
### Changes
* Now uses the much faster fnv1
* Now md5 hashs the keys to help distribute hosts more evenly in some
* Now md5 hashes the keys to help distribute hosts more evenly in some
cases.

## [2.2.0] - 2019-07-09
Expand Down
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,7 @@ func ExampleUsage() {

```
### Note
The call to `groupcache.NewHTTPPoolOpts()` is a bit misleading. `NewHTTPPoolOpts()` creates a new pool internally within the `groupcache` package where it is uitilized by any groups created. The `pool` returned is only a pointer to the internallly registered pool so the caller can update the peers in the pool as needed.
The call to `groupcache.NewHTTPPoolOpts()` is a bit misleading. `NewHTTPPoolOpts()`
creates a new pool internally within the `groupcache` package where it is utilized
by any groups created. The `pool` returned is only a pointer to the internally
registered pool so the caller can update the peers in the pool as needed.
3 changes: 1 addition & 2 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ func ExampleUsage() {
// Create a new group cache with a max cache size of 3MB
group := groupcache.NewGroup("users", 3000000, groupcache.GetterFunc(
func(ctx context.Context, id string, dest groupcache.Sink) error {

// In a real scenario we might fetch the value from a database.
/*if user, err := fetchUserFromMongo(ctx, id); err != nil {
return err
Expand All @@ -58,7 +57,7 @@ func ExampleUsage() {

var user User

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if err := group.Get(ctx, "12345", groupcache.ProtoSink(&user)); err != nil {
Expand Down
65 changes: 64 additions & 1 deletion groupcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,6 @@ func (g *Group) Remove(ctx context.Context, key string) error {
g.peersOnce.Do(g.initPeers)

_, err := g.removeGroup.Do(key, func() (interface{}, error) {

// Remove from key owner first
owner, ok := g.peers.PickPeer(key)
if ok {
Expand Down Expand Up @@ -341,6 +340,41 @@ func (g *Group) Remove(ctx context.Context, key string) error {
return err
}

// Clear purges our cache then forwards the clear request to all peers.
func (g *Group) Clear(ctx context.Context) error {
g.peersOnce.Do(g.initPeers)

_, err := g.removeGroup.Do("", func() (interface{}, error) {
// Clear our cache first
g.localClear()
wg := sync.WaitGroup{}
errs := make(chan error)

// Asynchronously clear all caches of peers
for _, peer := range g.peers.GetAll() {
wg.Add(1)
go func(peer ProtoGetter) {
errs <- g.clearFromPeer(ctx, peer)
wg.Done()
}(peer)
}
go func() {
wg.Wait()
close(errs)
}()

// TODO(thrawn01): Should we report all errors? Reporting context
// cancelled error for each peer doesn't make much sense.
var err error
for e := range errs {
err = e
}

return nil, err
})
return err
}

// load loads key either by invoking the getter locally or by sending it to another machine.
func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
g.Stats.Loads.Add(1)
Expand Down Expand Up @@ -490,6 +524,13 @@ func (g *Group) removeFromPeer(ctx context.Context, peer ProtoGetter, key string
return peer.Remove(ctx, req)
}

func (g *Group) clearFromPeer(ctx context.Context, peer ProtoGetter) error {
req := &pb.GetRequest{
Group: &g.name,
}
return peer.Clear(ctx, req)
}

func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
if g.cacheBytes <= 0 {
return
Expand Down Expand Up @@ -531,6 +572,19 @@ func (g *Group) localRemove(key string) {
})
}

func (g *Group) localClear() {
// Clear our local cache
if g.cacheBytes <= 0 {
return
}

// Ensure no requests are in flight
g.loadGroup.Lock(func() {
g.hotCache.clear()
g.mainCache.clear()
})
}

func (g *Group) populateCache(key string, value ByteView, cache *cache) {
if g.cacheBytes <= 0 {
return
Expand Down Expand Up @@ -645,6 +699,15 @@ func (c *cache) remove(key string) {
c.lru.Remove(key)
}

func (c *cache) clear() {
c.mu.Lock()
defer c.mu.Unlock()
if c.lru == nil {
return
}
c.lru.Clear()
}

func (c *cache) removeOldest() {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down
53 changes: 53 additions & 0 deletions groupcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,51 @@ func TestCacheEviction(t *testing.T) {
}
}

func TestCachePurging(t *testing.T) {
once.Do(testSetup)
testKey1 := "TestCachePurging-key1"
getTestKey1 := func() {
var res string
for i := 0; i < 10; i++ {
if err := stringGroup.Get(dummyCtx, testKey1, StringSink(&res)); err != nil {
t.Fatal(err)
}
}
}
fills := countFills(getTestKey1)
if fills != 1 {
t.Fatalf("expected 1 cache fill; got %d", fills)
}

testKey2 := "TestCachePurging-key2"
getTestKey2 := func() {
var res string
for i := 0; i < 10; i++ {
if err := stringGroup.Get(dummyCtx, testKey2, StringSink(&res)); err != nil {
t.Fatal(err)
}
}
}
fills = countFills(getTestKey2)
if fills != 1 {
t.Fatalf("expected 1 cache fill; got %d", fills)
}

g := stringGroup.(*Group)
// Clear the cache
g.Clear(dummyCtx)

// Test that the keys are gone.
fills = countFills(getTestKey1)
if fills != 1 {
t.Fatalf("expected 1 cache fill after cache purging; got %d", fills)
}
fills = countFills(getTestKey2)
if fills != 1 {
t.Fatalf("expected 1 cache fill after cache purging; got %d", fills)
}
}

type fakePeer struct {
hits int
fail bool
Expand Down Expand Up @@ -279,6 +324,14 @@ func (p *fakePeer) Remove(_ context.Context, in *pb.GetRequest) error {
return nil
}

func (p *fakePeer) Clear(_ context.Context, in *pb.GetRequest) error {
p.hits++
if p.fail {
return errors.New("simulated error from peer")
}
return nil
}

func (p *fakePeer) GetURL() string {
return "fakePeer"
}
Expand Down
50 changes: 42 additions & 8 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,13 @@ func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
panic("HTTPPool serving unexpected path: " + r.URL.Path)
}
parts := strings.SplitN(r.URL.Path[len(p.opts.BasePath):], "/", 2)
if len(parts) != 2 {
lenParts := len(parts)

if (lenParts != 2) || ((lenParts == 1) && (r.Method != http.MethodDelete)) {
http.Error(w, "bad request", http.StatusBadRequest)
return
}
groupName := parts[0]
key := parts[1]

// Fetch the value for this group/key.
group := GetGroup(groupName)
Expand All @@ -186,6 +187,13 @@ func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {

group.Stats.ServerRequests.Add(1)

if (lenParts == 1) && (r.Method == http.MethodDelete) {
group.localRemove("")
return
}

key := parts[1]

// Delete the key and return 200
if r.Method == http.MethodDelete {
group.localRemove(key)
Expand Down Expand Up @@ -268,12 +276,21 @@ type request interface {
}

func (h *httpGetter) makeRequest(ctx context.Context, m string, in request, b io.Reader, out *http.Response) error {
u := fmt.Sprintf(
"%v%v/%v",
h.baseURL,
url.QueryEscape(in.GetGroup()),
url.QueryEscape(in.GetKey()),
)
var u string
if key := in.GetKey(); key != "" {
u = fmt.Sprintf(
"%v%v/%v",
h.baseURL,
url.QueryEscape(in.GetGroup()),
url.QueryEscape(key),
)
} else {
u = fmt.Sprintf(
"%v%v",
h.baseURL,
url.QueryEscape(in.GetGroup()),
)
}
req, err := http.NewRequestWithContext(ctx, m, u, b)
if err != nil {
return err
Expand Down Expand Up @@ -353,3 +370,20 @@ func (h *httpGetter) Remove(ctx context.Context, in *pb.GetRequest) error {
}
return nil
}

func (h *httpGetter) Clear(ctx context.Context, in *pb.GetRequest) error {
var res http.Response
if err := h.makeRequest(ctx, http.MethodDelete, in, nil, &res); err != nil {
return err
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return fmt.Errorf("while reading body response: %v", res.Status)
}
return fmt.Errorf("server returned status %d: %s", res.StatusCode, body)
}
return nil
}
5 changes: 2 additions & 3 deletions peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type ProtoGetter interface {
Get(context context.Context, in *pb.GetRequest, out *pb.GetResponse) error
Remove(context context.Context, in *pb.GetRequest) error
Set(context context.Context, in *pb.SetRequest) error
Clear(context context.Context, in *pb.GetRequest) error
// GetURL returns the peer URL
GetURL() string
}
Expand All @@ -50,9 +51,7 @@ type NoPeers struct{}
func (NoPeers) PickPeer(key string) (peer ProtoGetter, ok bool) { return }
func (NoPeers) GetAll() []ProtoGetter { return []ProtoGetter{} }

var (
portPicker func(groupName string) PeerPicker
)
var portPicker func(groupName string) PeerPicker

// RegisterPeerPicker registers the peer initialization function.
// It is called once, when the first group is created.
Expand Down

0 comments on commit 8e3936a

Please sign in to comment.