Skip to content

Commit

Permalink
Add support for resource syncer to use a shared informer
Browse files Browse the repository at this point in the history
Some users may want to configure a shared informer rather than use
the internal dedicated informer for efficiency if there's multiple
consumers of a resource type.

Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis committed Apr 29, 2024
1 parent 4c6e959 commit bf265b8
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 20 deletions.
92 changes: 73 additions & 19 deletions pkg/syncer/resource_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ type ResourceSyncerConfig struct {

type resourceSyncer struct {
workQueue workqueue.Interface
hasSynced func() bool
informer cache.Controller
store cache.Store
config ResourceSyncerConfig
Expand All @@ -199,6 +200,56 @@ type resourceSyncer struct {
}

func NewResourceSyncer(config *ResourceSyncerConfig) (Interface, error) {
syncer := newResourceSyncer(config)

rawType, gvr, err := util.ToUnstructuredResource(config.ResourceType, config.RestMapper)
if err != nil {
return nil, err //nolint:wrapcheck // OK to return the error as is.
}

resourceClient := config.SourceClient.Resource(*gvr).Namespace(config.SourceNamespace)

syncer.store, syncer.informer = cache.NewTransformingInformer(&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = config.SourceLabelSelector
options.FieldSelector = config.SourceFieldSelector
return resourceClient.List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = config.SourceLabelSelector
options.FieldSelector = config.SourceFieldSelector
return resourceClient.Watch(context.TODO(), options)
},
}, rawType, config.ResyncPeriod, cache.ResourceEventHandlerFuncs{
AddFunc: syncer.onCreate,
UpdateFunc: syncer.onUpdate,
DeleteFunc: syncer.onDelete,
}, resourceUtil.TrimManagedFields)

syncer.hasSynced = syncer.informer.HasSynced

return syncer, nil
}

func NewResourceSyncerWithSharedInformer(config *ResourceSyncerConfig, informer cache.SharedInformer) (Interface, error) {
syncer := newResourceSyncer(config)
syncer.store = informer.GetStore()

reg, err := informer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
AddFunc: syncer.onCreate,
UpdateFunc: syncer.onUpdate,
DeleteFunc: syncer.onDelete,
}, config.ResyncPeriod)
if err != nil {
return nil, errors.Wrapf(err, "error registering even handler")
}

syncer.hasSynced = reg.HasSynced

return syncer, nil
}

func newResourceSyncer(config *ResourceSyncerConfig) *resourceSyncer {
syncer := &resourceSyncer{
config: *config,
stopped: make(chan struct{}),
Expand All @@ -218,11 +269,6 @@ func NewResourceSyncer(config *ResourceSyncerConfig) (Interface, error) {
syncer.config.WaitForCacheSync = &wait
}

rawType, gvr, err := util.ToUnstructuredResource(config.ResourceType, config.RestMapper)
if err != nil {
return nil, err //nolint:wrapcheck // OK to return the error as is.
}

if syncer.config.SyncCounter != nil {
syncer.syncCounter = syncer.config.SyncCounter
} else if syncer.config.SyncCounterOpts != nil {
Expand All @@ -239,26 +285,30 @@ func NewResourceSyncer(config *ResourceSyncerConfig) (Interface, error) {

syncer.workQueue = workqueue.New(config.Name)

return syncer
}

func NewSharedInformer(config *ResourceSyncerConfig) (cache.SharedInformer, error) {
rawType, gvr, err := util.ToUnstructuredResource(config.ResourceType, config.RestMapper)
if err != nil {
return nil, err //nolint:wrapcheck // OK to return the error as is.
}

resourceClient := config.SourceClient.Resource(*gvr).Namespace(config.SourceNamespace)

syncer.store, syncer.informer = cache.NewTransformingInformer(&cache.ListWatch{
informer := cache.NewSharedIndexInformerWithOptions(&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = config.SourceLabelSelector
options.FieldSelector = config.SourceFieldSelector
return resourceClient.List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = config.SourceLabelSelector
options.FieldSelector = config.SourceFieldSelector
return resourceClient.Watch(context.TODO(), options)
},
}, rawType, config.ResyncPeriod, cache.ResourceEventHandlerFuncs{
AddFunc: syncer.onCreate,
UpdateFunc: syncer.onUpdate,
DeleteFunc: syncer.onDelete,
}, resourceUtil.TrimManagedFields)
}, rawType, cache.SharedIndexInformerOptions{
ResyncPeriod: config.ResyncPeriod,
})

return syncer, nil
//nolint:wrapcheck // OK to return the error as is.
return informer, informer.SetTransform(resourceUtil.TrimManagedFields)
}

func (r *resourceSyncer) Start(stopCh <-chan struct{}) error {
Expand All @@ -277,13 +327,17 @@ func (r *resourceSyncer) Start(stopCh <-chan struct{}) error {
}()
defer r.workQueue.ShutDownWithDrain()

r.informer.Run(stopCh)
if r.informer != nil {
r.informer.Run(stopCh)
} else {
<-r.stopCh
}
}()

if *r.config.WaitForCacheSync {
r.log.V(log.LIBDEBUG).Infof("Syncer %q waiting for informer cache to sync", r.config.Name)

_ = cache.WaitForCacheSync(stopCh, r.informer.HasSynced)
_ = cache.WaitForCacheSync(stopCh, r.hasSynced)
}

r.workQueue.Run(stopCh, r.processNextWorkItem)
Expand Down Expand Up @@ -412,7 +466,7 @@ func (r *resourceSyncer) doReconcile(resourceLister func() []runtime.Object) {
}

func (r *resourceSyncer) runIfCacheSynced(defaultReturn any, run func() any) any {
if ok := cache.WaitForCacheSync(r.stopCh, r.informer.HasSynced); !ok {
if ok := cache.WaitForCacheSync(r.stopCh, r.hasSynced); !ok {
// This means the cache was stopped.
r.log.Warningf("Syncer %q failed to wait for informer cache to sync", r.config.Name)

Expand Down
33 changes: 32 additions & 1 deletion pkg/syncer/resource_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/dynamic"
fakeClient "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/ptr"
)

Expand All @@ -71,6 +72,7 @@ var _ = Describe("Resource Syncer", func() {
Context(fmt.Sprintf("Direction: %s", syncer.None), testReconcileNoDirection)
})
Describe("Trim Resource Fields", testTrimResourceFields)
Describe("With SharedInformer", testWithSharedInformer)
})

func testReconcileLocalToRemote() {
Expand Down Expand Up @@ -1216,6 +1218,18 @@ func testTrimResourceFields() {
})
}

func testWithSharedInformer() {
d := newTestDriver(test.LocalNamespace, "", syncer.LocalToRemote)

BeforeEach(func() {
d.useSharedInformer = true
})

When("a resource is created in the local datastore", func() {
d.verifyDistributeOnCreateTest("")
})
}

func assertResourceList(actual []runtime.Object, expected ...*corev1.Pod) {
expSpecs := map[string]*corev1.PodSpec{}
for i := range expected {
Expand All @@ -1235,6 +1249,7 @@ func assertResourceList(actual []runtime.Object, expected ...*corev1.Pod) {

type testDriver struct {
config syncer.ResourceSyncerConfig
useSharedInformer bool
syncer syncer.Interface
sourceClient dynamic.ResourceInterface
federator *fake.Federator
Expand Down Expand Up @@ -1270,6 +1285,7 @@ func newTestDriver(sourceNamespace, localClusterID string, syncDirection syncer.
d.config.OnSuccessfulSync = nil
d.config.ResourcesEquivalent = nil
d.config.ResyncPeriod = 0
d.useSharedInformer = false

err := corev1.AddToScheme(d.config.Scheme)
Expect(err).To(Succeed())
Expand All @@ -1286,7 +1302,22 @@ func newTestDriver(sourceNamespace, localClusterID string, syncDirection syncer.
d.sourceClient = d.config.SourceClient.Resource(*gvr).Namespace(d.config.SourceNamespace)

var err error
d.syncer, err = syncer.NewResourceSyncer(&d.config)

if d.useSharedInformer {
var sharedInformer cache.SharedInformer

sharedInformer, err = syncer.NewSharedInformer(&d.config)
Expect(err).To(Succeed())

d.syncer, err = syncer.NewResourceSyncerWithSharedInformer(&d.config, sharedInformer)

go func() {
sharedInformer.Run(d.stopCh)
}()
} else {
d.syncer, err = syncer.NewResourceSyncer(&d.config)
}

Expect(err).To(Succeed())

utilruntime.ErrorHandlers = append(utilruntime.ErrorHandlers, func(err error) {
Expand Down
5 changes: 5 additions & 0 deletions pkg/syncer/syncer_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
package syncer_test

import (
"flag"
"testing"

. "github.com/onsi/ginkgo/v2"
Expand All @@ -26,6 +27,10 @@ import (
)

func init() {
flags := flag.NewFlagSet("kzerolog", flag.ExitOnError)
kzerolog.AddFlags(flags)
_ = flags.Parse([]string{"-v=1"})

kzerolog.AddFlags(nil)
}

Expand Down

0 comments on commit bf265b8

Please sign in to comment.