Skip to content

Commit

Permalink
Add operation param to syncer TransformFunc (#83)
Browse files Browse the repository at this point in the history
Lighthouse has a use case to have the operation - Create, Update,
or Delete - passed into the TransformFunc.

Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis authored Jul 8, 2020
1 parent d52c64c commit b9748da
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 14 deletions.
5 changes: 3 additions & 2 deletions pkg/syncer/broker/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/submariner-io/admiral/pkg/fake"
sync "github.com/submariner-io/admiral/pkg/syncer"
"github.com/submariner-io/admiral/pkg/syncer/test"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -128,7 +129,7 @@ var _ = Describe("Broker Syncer", func() {
When("a local transform function is specified", func() {
BeforeEach(func() {
transformed = test.NewPodWithImage(config.LocalNamespace, "transformed")
config.ResourceConfigs[0].LocalTransform = func(from runtime.Object) (runtime.Object, bool) {
config.ResourceConfigs[0].LocalTransform = func(from runtime.Object, op sync.Operation) (runtime.Object, bool) {
return transformed, false
}
})
Expand All @@ -146,7 +147,7 @@ var _ = Describe("Broker Syncer", func() {
When("a broker transform function is specified", func() {
BeforeEach(func() {
transformed = test.NewPodWithImage(config.LocalNamespace, "transformed")
config.ResourceConfigs[0].BrokerTransform = func(from runtime.Object) (runtime.Object, bool) {
config.ResourceConfigs[0].BrokerTransform = func(from runtime.Object, op sync.Operation) (runtime.Object, bool) {
return transformed, false
}
})
Expand Down
51 changes: 44 additions & 7 deletions pkg/syncer/resource_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,29 @@ const (
RemoteToLocal
)

type Operation int

const (
Create Operation = iota
Update
Delete
)

func (o Operation) String() string {
switch o {
case Create:
return "create"
case Update:
return "update"
default:
return "delete"
}
}

// TransformFunc is invoked prior to syncing to transform the resource or evaluate if it should be synced.
// If nil is returned, the resource is not synced and, if the second return value is true, the resource is re-queued
// to be retried later.
type TransformFunc func(from runtime.Object) (runtime.Object, bool)
type TransformFunc func(from runtime.Object, op Operation) (runtime.Object, bool)

type ResourceSyncerConfig struct {
// Name of this syncer used for logging.
Expand Down Expand Up @@ -80,6 +99,7 @@ type resourceSyncer struct {
store cache.Store
config ResourceSyncerConfig
deleted sync.Map
created sync.Map
}

func NewResourceSyncer(config *ResourceSyncerConfig) (Interface, error) {
Expand Down Expand Up @@ -107,7 +127,7 @@ func NewResourceSyncer(config *ResourceSyncerConfig) (Interface, error) {
return resourceClient.Watch(options)
},
}, &unstructured.Unstructured{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: syncer.workQueue.Enqueue,
AddFunc: syncer.onCreate,
UpdateFunc: syncer.onUpdate,
DeleteFunc: syncer.onDelete,
})
Expand Down Expand Up @@ -146,11 +166,20 @@ func (r *resourceSyncer) processNextWorkItem(key, name, ns string) (bool, error)

resource := obj.(*unstructured.Unstructured)

klog.V(log.DEBUG).Infof("Syncer %q retrieved added or updated resource: %#v", r.config.Name, resource)
op := Update
_, found := r.created.Load(key)
if found {
op = Create
}

klog.V(log.DEBUG).Infof("Syncer %q retrieved %sd resource: %#v", r.config.Name, op, resource)

if r.shouldSync(resource) {
resource, requeue := r.transform(resource)
resource, requeue := r.transform(resource, op)
if resource == nil {
if !requeue {
r.created.Delete(key)
}
return requeue, nil
}

Expand All @@ -164,6 +193,7 @@ func (r *resourceSyncer) processNextWorkItem(key, name, ns string) (bool, error)
klog.V(log.DEBUG).Infof("Syncer %q successfully synced %q", r.config.Name, key)
}

r.created.Delete(key)
return false, nil
}

Expand All @@ -180,7 +210,7 @@ func (r *resourceSyncer) handleDeleted(key string) (bool, error) {

deletedResource := obj.(*unstructured.Unstructured)
if r.shouldSync(deletedResource) {
resource, requeue := r.transform(deletedResource)
resource, requeue := r.transform(deletedResource, Delete)
if resource == nil {
if requeue {
r.deleted.Store(key, deletedResource)
Expand All @@ -207,7 +237,7 @@ func (r *resourceSyncer) handleDeleted(key string) (bool, error) {
return false, nil
}

func (r *resourceSyncer) transform(from *unstructured.Unstructured) (*unstructured.Unstructured, bool) {
func (r *resourceSyncer) transform(from *unstructured.Unstructured, op Operation) (*unstructured.Unstructured, bool) {
if r.config.Transform == nil {
return from, false
}
Expand All @@ -221,7 +251,7 @@ func (r *resourceSyncer) transform(from *unstructured.Unstructured) (*unstructur
return nil, false
}

transformed, requeue := r.config.Transform(converted)
transformed, requeue := r.config.Transform(converted, op)
if transformed == nil {
klog.V(log.DEBUG).Infof("Syncer %q: transform function returned nil - not syncing - requeue: %v", r.config.Name, requeue)
return nil, requeue
Expand All @@ -241,6 +271,13 @@ func (r *resourceSyncer) transform(from *unstructured.Unstructured) (*unstructur
return result, false
}

func (r *resourceSyncer) onCreate(resource interface{}) {
key, _ := cache.MetaNamespaceKeyFunc(resource)
v := true
r.created.Store(key, &v)
r.workQueue.Enqueue(resource)
}

func (r *resourceSyncer) onUpdate(old interface{}, new interface{}) {
oldObj := old.(*unstructured.Unstructured)
newObj := new.(*unstructured.Unstructured)
Expand Down
38 changes: 35 additions & 3 deletions pkg/syncer/resource_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,18 @@ func testTransformFunction() {
d := newTestDiver(test.LocalNamespace, "", syncer.LocalToRemote)

var transformed *corev1.Pod
var expOperation chan syncer.Operation

BeforeEach(func() {
expOperation = make(chan syncer.Operation, 20)
transformed = test.NewPodWithImage(d.config.SourceNamespace, "transformed")
d.config.Transform = func(from runtime.Object) (runtime.Object, bool) {
d.config.Transform = func(from runtime.Object, op syncer.Operation) (runtime.Object, bool) {
defer GinkgoRecover()
pod, ok := from.(*corev1.Pod)
Expect(ok).To(BeTrue(), "Expected a Pod object: %#v", from)
Expect(equality.Semantic.DeepDerivative(d.resource.Spec, pod.Spec)).To(BeTrue(),
"Expected:\n%#v\n to be equivalent to: \n%#v", pod.Spec, d.resource.Spec)
expOperation <- op
return transformed, false
}
})
Expand All @@ -152,6 +155,7 @@ func testTransformFunction() {
It("should distribute the transformed resource", func() {
test.CreateResource(d.sourceClient, d.resource)
d.federator.VerifyDistribute(test.ToUnstructured(transformed))
Eventually(expOperation).Should(Receive(Equal(syncer.Create)))
})
})

Expand All @@ -162,10 +166,12 @@ func testTransformFunction() {

It("should distribute the transformed resource", func() {
d.federator.VerifyDistribute(test.ToUnstructured(transformed))
Eventually(expOperation).Should(Receive(Equal(syncer.Create)))

d.resource = test.NewPodWithImage(d.config.SourceNamespace, "updated")
test.UpdateResource(d.sourceClient, test.NewPodWithImage(d.config.SourceNamespace, "updated"))
d.federator.VerifyDistribute(test.ToUnstructured(transformed))
Eventually(expOperation).Should(Receive(Equal(syncer.Update)))
})
})

Expand All @@ -176,9 +182,11 @@ func testTransformFunction() {

It("should delete the transformed resource", func() {
d.federator.VerifyDistribute(test.ToUnstructured(transformed))
Eventually(expOperation).Should(Receive(Equal(syncer.Create)))

Expect(d.sourceClient.Delete(d.resource.GetName(), nil)).To(Succeed())
d.federator.VerifyDelete(test.ToUnstructured(transformed))
Eventually(expOperation).Should(Receive(Equal(syncer.Delete)))
})
})

Expand All @@ -190,9 +198,25 @@ func testTransformFunction() {

It("should retry until it succeeds", func() {
d.federator.VerifyDistribute(test.ToUnstructured(transformed))
Eventually(expOperation).Should(Receive(Equal(syncer.Create)))

Expect(d.sourceClient.Delete(d.resource.GetName(), nil)).To(Succeed())
d.federator.VerifyDelete(test.ToUnstructured(transformed))
Eventually(expOperation).Should(Receive(Equal(syncer.Delete)))
Eventually(expOperation).Should(Receive(Equal(syncer.Delete)))
})
})

When("distribute for the transformed resource initially fails", func() {
JustBeforeEach(func() {
d.federator.FailOnDistribute = errors.New("fake error")
})

It("retry until it succeeds", func() {
test.CreateResource(d.sourceClient, d.resource)
d.federator.VerifyDistribute(test.ToUnstructured(transformed))
Eventually(expOperation).Should(Receive(Equal(syncer.Create)))
Eventually(expOperation).Should(Receive(Equal(syncer.Create)))
})
})

Expand All @@ -201,8 +225,9 @@ func testTransformFunction() {

BeforeEach(func() {
atomic.StoreInt32(&count, 0)
d.config.Transform = func(from runtime.Object) (runtime.Object, bool) {
d.config.Transform = func(from runtime.Object, op syncer.Operation) (runtime.Object, bool) {
atomic.AddInt32(&count, 1)
expOperation <- op
return nil, false
}
})
Expand All @@ -212,6 +237,7 @@ func testTransformFunction() {
test.CreateResource(d.sourceClient, d.resource)
d.federator.VerifyNoDistribute()
Expect(int(atomic.LoadInt32(&count))).To(Equal(1))
Eventually(expOperation).Should(Receive(Equal(syncer.Create)))
})
})

Expand All @@ -223,10 +249,12 @@ func testTransformFunction() {
It("should not delete the resource", func() {
d.federator.VerifyNoDistribute()
atomic.StoreInt32(&count, 0)
Eventually(expOperation).Should(Receive(Equal(syncer.Create)))

Expect(d.sourceClient.Delete(d.resource.GetName(), nil)).To(Succeed())
d.federator.VerifyNoDelete()
Expect(int(atomic.LoadInt32(&count))).To(Equal(1))
Eventually(expOperation).Should(Receive(Equal(syncer.Delete)))
})
})
})
Expand All @@ -238,14 +266,15 @@ func testTransformFunction() {
BeforeEach(func() {
transformFuncRet = &atomic.Value{}
transformFuncRet.Store(nilResource)
d.config.Transform = func(from runtime.Object) (runtime.Object, bool) {
d.config.Transform = func(from runtime.Object, op syncer.Operation) (runtime.Object, bool) {
var ret runtime.Object
v := transformFuncRet.Load()
if v != nilResource {
ret = v.(runtime.Object)
}

transformFuncRet.Store(transformed)
expOperation <- op
return ret, true
}
})
Expand All @@ -254,6 +283,7 @@ func testTransformFunction() {
It("should eventually distribute the transformed resource", func() {
test.CreateResource(d.sourceClient, d.resource)
d.federator.VerifyDistribute(test.ToUnstructured(transformed))
Eventually(expOperation).Should(Receive(Equal(syncer.Create)))
})
})

Expand All @@ -264,10 +294,12 @@ func testTransformFunction() {

It("should eventually delete the resource", func() {
d.federator.VerifyDistribute(test.ToUnstructured(transformed))
Eventually(expOperation).Should(Receive(Equal(syncer.Create)))
transformFuncRet.Store(nilResource)

Expect(d.sourceClient.Delete(d.resource.GetName(), nil)).To(Succeed())
d.federator.VerifyDelete(test.ToUnstructured(transformed))
Eventually(expOperation).Should(Receive(Equal(syncer.Delete)))
})
})
})
Expand Down
5 changes: 3 additions & 2 deletions test/e2e/syncer/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/submariner-io/admiral/pkg/syncer"
"github.com/submariner-io/admiral/pkg/syncer/broker"
"github.com/submariner-io/admiral/pkg/syncer/test"
testV1 "github.com/submariner-io/admiral/test/apis/admiral.submariner.io/v1"
Expand Down Expand Up @@ -46,7 +47,7 @@ func testWithLocalTransform() {

BeforeEach(func() {
t.brokerResourceType = &testV1.ExportedToaster{}
t.localTransform = func(from runtime.Object) (runtime.Object, bool) {
t.localTransform = func(from runtime.Object, op syncer.Operation) (runtime.Object, bool) {
toaster := from.(*testV1.Toaster)
return &testV1.ExportedToaster{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -73,7 +74,7 @@ func testWithLocalTransform() {
type testDriver struct {
framework *framework.Framework
localSourceNamespace string
localTransform func(from runtime.Object) (runtime.Object, bool)
localTransform func(from runtime.Object, op syncer.Operation) (runtime.Object, bool)
brokerResourceType runtime.Object
clusterClients []dynamic.Interface
stopCh chan struct{}
Expand Down

0 comments on commit b9748da

Please sign in to comment.