Skip to content

Commit

Permalink
Merge pull request #19233 from AwesomePatrol/add-compaction-to-kubern…
Browse files Browse the repository at this point in the history
…etess-robustness-tests

Add compaction to kubernetes robustness tests
  • Loading branch information
serathius authored Jan 21, 2025
2 parents 0dcd015 + 845a330 commit 7c56ef2
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 35 deletions.
32 changes: 31 additions & 1 deletion tests/robustness/traffic/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"math/rand"
"time"

"golang.org/x/time/rate"

Expand Down Expand Up @@ -108,7 +109,7 @@ func (t etcdTraffic) Name() string {
return "Etcd"
}

func (t etcdTraffic) Run(ctx context.Context, c *client.RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) {
func (t etcdTraffic) RunTrafficLoop(ctx context.Context, c *client.RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) {
lastOperationSucceeded := true
var lastRev int64
var requestType etcdRequestType
Expand Down Expand Up @@ -155,6 +156,35 @@ func (t etcdTraffic) Run(ctx context.Context, c *client.RecordingClient, limiter
}
}

func (t etcdTraffic) RunCompactLoop(ctx context.Context, c *client.RecordingClient, period time.Duration, finish <-chan struct{}) {
var lastRev int64 = 2
ticker := time.NewTicker(period)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-finish:
return
case <-ticker.C:
}
statusCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
resp, err := c.Status(statusCtx, c.Endpoints()[0])
cancel()
if err != nil {
continue
}

// Range allows for both revision has been compacted and future revision errors
compactRev := random.RandRange(lastRev, resp.Header.Revision+5)
_, err = c.Compact(ctx, compactRev)
if err != nil {
continue
}
lastRev = compactRev
}
}

func filterOutNonUniqueEtcdWrites(choices []random.ChoiceWeight[etcdRequestType]) (resp []random.ChoiceWeight[etcdRequestType]) {
for _, choice := range choices {
if choice.Choice != Delete && choice.Choice != LeaseRevoke {
Expand Down
60 changes: 59 additions & 1 deletion tests/robustness/traffic/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"errors"
"fmt"
"math/rand"
"strconv"
"sync"
"time"

"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
Expand Down Expand Up @@ -56,7 +58,7 @@ func (t kubernetesTraffic) ExpectUniqueRevision() bool {
return true
}

func (t kubernetesTraffic) Run(ctx context.Context, c *client.RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) {
func (t kubernetesTraffic) RunTrafficLoop(ctx context.Context, c *client.RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) {
kc := kubernetes.Client{Client: &clientv3.Client{KV: c}}
s := newStorage()
keyPrefix := "/registry/" + t.resource + "/"
Expand Down Expand Up @@ -205,6 +207,62 @@ func (t kubernetesTraffic) generateKey() string {
return fmt.Sprintf("/registry/%s/%s/%s", t.resource, t.namespace, stringutil.RandString(5))
}

func (t kubernetesTraffic) RunCompactLoop(ctx context.Context, c *client.RecordingClient, interval time.Duration, finish <-chan struct{}) {
// Based on https://github.com/kubernetes/apiserver/blob/7dd4904f1896e11244ba3c5a59797697709de6b6/pkg/storage/etcd3/compact.go#L112-L127
var compactTime int64
var rev int64
var err error
for {
select {
case <-time.After(interval):
case <-ctx.Done():
return
case <-finish:
return
}

compactTime, rev, err = compact(ctx, c, compactTime, rev)
if err != nil {
continue
}
}
}

// Based on https://github.com/kubernetes/apiserver/blob/7dd4904f1896e11244ba3c5a59797697709de6b6/pkg/storage/etcd3/compact.go#L30
const (
compactRevKey = "compact_rev_key"
)

func compact(ctx context.Context, client *client.RecordingClient, t, rev int64) (int64, int64, error) {
// Based on https://github.com/kubernetes/apiserver/blob/7dd4904f1896e11244ba3c5a59797697709de6b6/pkg/storage/etcd3/compact.go#L133-L162
// TODO: Use Version and not ModRevision when model supports key versioning.
resp, err := client.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(compactRevKey), "=", t)).
Then(clientv3.OpPut(compactRevKey, strconv.FormatInt(rev, 10))).
Else(clientv3.OpGet(compactRevKey)).
Commit()
if err != nil {
return t, rev, err
}

curRev := resp.Header.Revision

if !resp.Succeeded {
// TODO: Use Version and not ModRevision when model supports key versioning.
curTime := resp.Responses[0].GetResponseRange().Kvs[0].ModRevision
return curTime, curRev, nil
}
curTime := t + 1

if rev == 0 {
return curTime, curRev, nil
}
if _, err = client.Compact(ctx, rev); err != nil {
return curTime, curRev, err
}
return curTime, curRev, nil
}

type KubernetesRequestType string

const (
Expand Down
38 changes: 5 additions & 33 deletions tests/robustness/traffic/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"go.etcd.io/etcd/tests/v3/robustness/client"
"go.etcd.io/etcd/tests/v3/robustness/identity"
"go.etcd.io/etcd/tests/v3/robustness/model"
"go.etcd.io/etcd/tests/v3/robustness/random"
"go.etcd.io/etcd/tests/v3/robustness/report"
)

Expand Down Expand Up @@ -81,7 +80,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
defer wg.Done()
defer c.Close()

traffic.Run(ctx, c, limiter, ids, lm, nonUniqueWriteLimiter, finish)
traffic.RunTrafficLoop(ctx, c, limiter, ids, lm, nonUniqueWriteLimiter, finish)
mux.Lock()
reports = append(reports, c.Report())
mux.Unlock()
Expand All @@ -101,7 +100,8 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
if profile.CompactPeriod != time.Duration(0) {
compactionPeriod = profile.CompactPeriod
}
RunCompactLoop(ctx, c, compactionPeriod, finish)

traffic.RunCompactLoop(ctx, c, compactionPeriod, finish)
mux.Lock()
reports = append(reports, c.Report())
mux.Unlock()
Expand Down Expand Up @@ -195,35 +195,7 @@ func (p Profile) WithCompactionPeriod(cp time.Duration) Profile {
}

type Traffic interface {
Run(ctx context.Context, c *client.RecordingClient, qpsLimiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{})
RunTrafficLoop(ctx context.Context, c *client.RecordingClient, qpsLimiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{})
RunCompactLoop(ctx context.Context, c *client.RecordingClient, period time.Duration, finish <-chan struct{})
ExpectUniqueRevision() bool
}

func RunCompactLoop(ctx context.Context, c *client.RecordingClient, period time.Duration, finish <-chan struct{}) {
var lastRev int64 = 2
timer := time.NewTimer(period)
for {
timer.Reset(period)
select {
case <-ctx.Done():
return
case <-finish:
return
case <-timer.C:
}
statusCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
resp, err := c.Status(statusCtx, c.Endpoints()[0])
cancel()
if err != nil {
continue
}

// Range allows for both revision has been compacted and future revision errors
compactRev := random.RandRange(lastRev, resp.Header.Revision+5)
_, err = c.Compact(ctx, compactRev)
if err != nil {
continue
}
lastRev = compactRev
}
}

0 comments on commit 7c56ef2

Please sign in to comment.