Skip to content

Commit

Permalink
Add option to re-queue from TransformFunc
Browse files Browse the repository at this point in the history
See #67 for use case.

Added a boolean return value from the TransformFunc to specify to
re-queue/retry.

Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis authored and mangelajo committed Jul 3, 2020
1 parent eeb8191 commit a03e294
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 38 deletions.
6 changes: 2 additions & 4 deletions pkg/federate/fake/federator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,24 @@ func New() *Federator {
}

func (f *Federator) Distribute(resource runtime.Object) error {
f.distribute <- resource

err := f.FailOnDistribute
if err != nil {
f.FailOnDistribute = nil
return err
}

f.distribute <- resource
return nil
}

func (f *Federator) Delete(resource runtime.Object) error {
f.delete <- resource

err := f.FailOnDelete
if err != nil {
f.FailOnDelete = nil
return err
}

f.delete <- resource
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/syncer/broker/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ var _ = Describe("Broker Syncer", func() {
When("a local transform function is specified", func() {
BeforeEach(func() {
transformed = test.NewPodWithImage(config.LocalNamespace, "transformed")
config.ResourceConfigs[0].LocalTransform = func(from runtime.Object) runtime.Object {
return transformed
config.ResourceConfigs[0].LocalTransform = func(from runtime.Object) (runtime.Object, bool) {
return transformed, false
}
})

Expand All @@ -146,8 +146,8 @@ var _ = Describe("Broker Syncer", func() {
When("a broker transform function is specified", func() {
BeforeEach(func() {
transformed = test.NewPodWithImage(config.LocalNamespace, "transformed")
config.ResourceConfigs[0].BrokerTransform = func(from runtime.Object) runtime.Object {
return transformed
config.ResourceConfigs[0].BrokerTransform = func(from runtime.Object) (runtime.Object, bool) {
return transformed, false
}
})

Expand Down
38 changes: 21 additions & 17 deletions pkg/syncer/resource_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ const (
)

// TransformFunc is invoked prior to syncing to transform the resource or evaluate if it should be synced.
// If nil is returned, the resource is not synced.
type TransformFunc func(from runtime.Object) runtime.Object
// If nil is returned, the resource is not synced and, if the second return value is true, the resource is re-queued
// to be retried later.
type TransformFunc func(from runtime.Object) (runtime.Object, bool)

type ResourceSyncerConfig struct {
// Name of this syncer used for logging.
Expand Down Expand Up @@ -148,9 +149,9 @@ func (r *resourceSyncer) processNextWorkItem(key, name, ns string) (bool, error)
klog.V(log.DEBUG).Infof("Syncer %q retrieved added or updated resource: %#v", r.config.Name, resource)

if r.shouldSync(resource) {
resource = r.transform(resource)
resource, requeue := r.transform(resource)
if resource == nil {
return false, nil
return requeue, nil
}

klog.V(log.DEBUG).Infof("Syncing resource: %#v", resource)
Expand All @@ -177,11 +178,14 @@ func (r *resourceSyncer) handleDeleted(key string) (bool, error) {

r.deleted.Delete(key)

resource := obj.(*unstructured.Unstructured)
if r.shouldSync(resource) {
resource = r.transform(resource)
deletedResource := obj.(*unstructured.Unstructured)
if r.shouldSync(deletedResource) {
resource, requeue := r.transform(deletedResource)
if resource == nil {
return false, nil
if requeue {
r.deleted.Store(key, deletedResource)
}
return requeue, nil
}

klog.V(log.DEBUG).Infof("Syncer %q deleting resource: %#v", r.config.Name, resource)
Expand All @@ -193,7 +197,7 @@ func (r *resourceSyncer) handleDeleted(key string) (bool, error) {
}

if err != nil {
r.deleted.Store(key, resource)
r.deleted.Store(key, deletedResource)
return true, err
}

Expand All @@ -203,9 +207,9 @@ func (r *resourceSyncer) handleDeleted(key string) (bool, error) {
return false, nil
}

func (r *resourceSyncer) transform(from *unstructured.Unstructured) *unstructured.Unstructured {
func (r *resourceSyncer) transform(from *unstructured.Unstructured) (*unstructured.Unstructured, bool) {
if r.config.Transform == nil {
return from
return from, false
}

clusterID, _ := getClusterIDLabel(from)
Expand All @@ -214,27 +218,27 @@ func (r *resourceSyncer) transform(from *unstructured.Unstructured) *unstructure
err := r.config.Scheme.Convert(from, converted, nil)
if err != nil {
klog.Errorf("Syncer %q: error converting %#v to %T: %v", r.config.Name, from, r.config.ResourceType, err)
return nil
return nil, false
}

transformed := r.config.Transform(converted)
transformed, requeue := r.config.Transform(converted)
if transformed == nil {
klog.V(log.DEBUG).Infof("Syncer %q: transform function returned nil - not syncing", r.config.Name)
return nil
klog.V(log.DEBUG).Infof("Syncer %q: transform function returned nil - not syncing - requeue: %v", r.config.Name, requeue)
return nil, requeue
}

result, err := util.ToUnstructured(transformed)
if err != nil {
klog.Errorf("Syncer %q: error converting transform function result: %v", r.config.Name, err)
return nil
return nil, false
}

// Preserve the cluster ID label
if clusterID != "" {
_ = unstructured.SetNestedField(result.Object, clusterID, util.MetadataField, util.LabelsField, federate.ClusterIDLabelKey)
}

return result
return result, false
}

func (r *resourceSyncer) onUpdate(old interface{}, new interface{}) {
Expand Down
84 changes: 74 additions & 10 deletions pkg/syncer/resource_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package syncer_test

import (
"errors"
"sync/atomic"
"time"

. "github.com/onsi/ginkgo"
Expand Down Expand Up @@ -137,11 +138,13 @@ func testTransformFunction() {

BeforeEach(func() {
transformed = test.NewPodWithImage(d.config.SourceNamespace, "transformed")
d.config.Transform = func(from runtime.Object) runtime.Object {
d.config.Transform = func(from runtime.Object) (runtime.Object, bool) {
defer GinkgoRecover()
pod, ok := from.(*corev1.Pod)
Expect(ok).To(BeTrue())
Expect(equality.Semantic.DeepDerivative(d.resource.Spec, pod.Spec)).To(BeTrue())
return transformed
Expect(ok).To(BeTrue(), "Expected a Pod object: %#v", from)
Expect(equality.Semantic.DeepDerivative(d.resource.Spec, pod.Spec)).To(BeTrue(),
"Expected:\n%#v\n to be equivalent to: \n%#v", pod.Spec, d.resource.Spec)
return transformed, false
}
})

Expand All @@ -166,7 +169,7 @@ func testTransformFunction() {
})
})

When("a resource is deleted in the datastore", func() {
When("a resource is deleted from the datastore", func() {
BeforeEach(func() {
d.addInitialResource(d.resource)
})
Expand All @@ -179,17 +182,36 @@ func testTransformFunction() {
})
})

When("the transform function returns nil", func() {
When("deletion of the transformed resource initially fails", func() {
BeforeEach(func() {
d.config.Transform = func(from runtime.Object) runtime.Object {
return nil
d.federator.FailOnDelete = errors.New("fake error")
d.addInitialResource(d.resource)
})

It("should retry until it succeeds", func() {
d.federator.VerifyDistribute(test.ToUnstructured(transformed))

Expect(d.sourceClient.Delete(d.resource.GetName(), nil)).To(Succeed())
d.federator.VerifyDelete(test.ToUnstructured(transformed))
})
})

When("the transform function returns nil with no re-queue", func() {
var count int32

BeforeEach(func() {
atomic.StoreInt32(&count, 0)
d.config.Transform = func(from runtime.Object) (runtime.Object, bool) {
atomic.AddInt32(&count, 1)
return nil, false
}
})

When("a resource is created in the datastore", func() {
It("should not distribute the resource", func() {
test.CreateResource(d.sourceClient, d.resource)
d.federator.VerifyNoDistribute()
Expect(int(atomic.LoadInt32(&count))).To(Equal(1))
})
})

Expand All @@ -200,9 +222,52 @@ func testTransformFunction() {

It("should not delete the resource", func() {
d.federator.VerifyNoDistribute()
atomic.StoreInt32(&count, 0)

Expect(d.sourceClient.Delete(d.resource.GetName(), nil)).To(Succeed())
d.federator.VerifyNoDelete()
Expect(int(atomic.LoadInt32(&count))).To(Equal(1))
})
})
})

When("the transform function initially returns nil with re-queue", func() {
nilResource := &corev1.Pod{}
var transformFuncRet *atomic.Value

BeforeEach(func() {
transformFuncRet = &atomic.Value{}
transformFuncRet.Store(nilResource)
d.config.Transform = func(from runtime.Object) (runtime.Object, bool) {
var ret runtime.Object
v := transformFuncRet.Load()
if v != nilResource {
ret = v.(runtime.Object)
}

transformFuncRet.Store(transformed)
return ret, true
}
})

When("a resource is created in the datastore", func() {
It("should eventually distribute the transformed resource", func() {
test.CreateResource(d.sourceClient, d.resource)
d.federator.VerifyDistribute(test.ToUnstructured(transformed))
})
})

When("a resource is deleted in the datastore", func() {
BeforeEach(func() {
d.addInitialResource(d.resource)
})

It("should eventually delete the resource", func() {
d.federator.VerifyDistribute(test.ToUnstructured(transformed))
transformFuncRet.Store(nilResource)

Expect(d.sourceClient.Delete(d.resource.GetName(), nil)).To(Succeed())
d.federator.VerifyDelete(test.ToUnstructured(transformed))
})
})
})
Expand Down Expand Up @@ -239,8 +304,8 @@ func testSyncErrors() {
d.federator.VerifyDistribute(expected)

Expect(d.sourceClient.Delete(d.resource.GetName(), nil)).To(Succeed())
d.federator.VerifyDelete(expected)
Eventually(d.handledError, 5).Should(Receive(ContainErrorSubstring(expectedErr)))
d.federator.VerifyDelete(expected)
})
})

Expand All @@ -255,7 +320,6 @@ func testSyncErrors() {
d.federator.VerifyDistribute(expected)

Expect(d.sourceClient.Delete(d.resource.GetName(), nil)).To(Succeed())
d.federator.VerifyDelete(expected)
Consistently(d.handledError, 300*time.Millisecond).ShouldNot(Receive(), "Error was unexpectedly logged")
})
})
Expand Down
6 changes: 3 additions & 3 deletions test/e2e/syncer/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ func testWithLocalTransform() {

BeforeEach(func() {
t.brokerResourceType = &testV1.ExportedToaster{}
t.localTransform = func(from runtime.Object) runtime.Object {
t.localTransform = func(from runtime.Object) (runtime.Object, bool) {
toaster := from.(*testV1.Toaster)
return &testV1.ExportedToaster{
ObjectMeta: metav1.ObjectMeta{
Name: toaster.GetName(),
},
Spec: toaster.Spec,
}
}, false
}
})

Expand All @@ -73,7 +73,7 @@ func testWithLocalTransform() {
type testDriver struct {
framework *framework.Framework
localSourceNamespace string
localTransform func(from runtime.Object) runtime.Object
localTransform func(from runtime.Object) (runtime.Object, bool)
brokerResourceType runtime.Object
clusterClients []dynamic.Interface
stopCh chan struct{}
Expand Down

0 comments on commit a03e294

Please sign in to comment.