-
Notifications
You must be signed in to change notification settings - Fork 237
improve: filter only own updates for read-after-write-consistency #3399
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
16e693e
8998d62
560ce1b
474d2db
8b5de53
3c1f963
4b18e65
cf0d0d3
1ec53fb
cf14ec3
b83d1fc
9c6a777
c35b51c
e74fbf1
b8279c1
fb9af1a
ac7bb83
1f4fd4f
7a101b4
c5d582c
f2d3f6e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,26 +24,32 @@ | |
| import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; | ||
|
|
||
| /** Used only for resource event filtering. */ | ||
| public class ExtendedResourceEvent extends ResourceEvent { | ||
| public class GenericResourceEvent extends ResourceEvent { | ||
|
|
||
| private final HasMetadata previousResource; | ||
| private final Boolean lastStateUnknow; | ||
|
|
||
| public ExtendedResourceEvent( | ||
| public GenericResourceEvent( | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We shouldn't do this kind of renames of public classes in minors
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is strictly internal class. Maybe we should explicitly annotate public APIs |
||
| ResourceAction action, | ||
| ResourceID resourceID, | ||
| HasMetadata latestResource, | ||
| HasMetadata previousResource) { | ||
| super(action, resourceID, latestResource); | ||
| HasMetadata previousResource, | ||
| Boolean lastStateUnknow) { | ||
| super(action, ResourceID.fromResource(latestResource), latestResource); | ||
| this.previousResource = previousResource; | ||
| this.lastStateUnknow = lastStateUnknow; | ||
| } | ||
|
|
||
| public Optional<HasMetadata> getPreviousResource() { | ||
| return Optional.ofNullable(previousResource); | ||
| } | ||
|
|
||
| public Boolean getLastStateUnknow() { | ||
| return lastStateUnknow; | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return "ExtendedResourceEvent{" | ||
| return "GenericResourceEvent{" | ||
| + getPreviousResource() | ||
| .map(r -> "previousResourceVersion=" + r.getMetadata().getResourceVersion()) | ||
| .orElse("") | ||
|
|
@@ -61,7 +67,7 @@ public String toString() { | |
| public boolean equals(Object o) { | ||
| if (o == null || getClass() != o.getClass()) return false; | ||
| if (!super.equals(o)) return false; | ||
| ExtendedResourceEvent that = (ExtendedResourceEvent) o; | ||
| GenericResourceEvent that = (GenericResourceEvent) o; | ||
| return Objects.equals(previousResource, that.previousResource); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -33,7 +33,6 @@ | |||||
| import io.javaoperatorsdk.operator.processing.event.ResourceID; | ||||||
| import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper; | ||||||
| import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; | ||||||
| import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling; | ||||||
|
|
||||||
| /** | ||||||
| * Wraps informer(s) so they are connected to the eventing system of the framework. Note that since | ||||||
|
|
@@ -123,8 +122,15 @@ public synchronized void onDelete(R resource, boolean deletedFinalStateUnknown) | |||||
| log.debug( | ||||||
| "On delete event received. deletedFinalStateUnknown: {}", deletedFinalStateUnknown); | ||||||
| } | ||||||
| var resultEvent = | ||||||
| temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown); | ||||||
| if (resultEvent.isEmpty()) { | ||||||
| return; | ||||||
| } | ||||||
| if (resultEvent.orElseThrow().getAction() != ResourceAction.DELETED) { | ||||||
| log.warn("Non delete event received on onDelete handling. This should not happen."); | ||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe we should also log the real event received
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't want to log the whole event, that might even contain sensitive enformation (think of k8s Secrets). We already logging metadata with MDC. |
||||||
| } | ||||||
| primaryToSecondaryIndex.onDelete(resource); | ||||||
| temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown); | ||||||
| if (acceptedByDeleteFilters(resource, deletedFinalStateUnknown)) { | ||||||
| propagateEvent(resource); | ||||||
| } | ||||||
|
|
@@ -152,22 +158,26 @@ private synchronized void onAddOrUpdate(ResourceAction action, R newObject, R ol | |||||
| primaryToSecondaryIndex.onAddOrUpdate(newObject); | ||||||
| var resourceID = ResourceID.fromResource(newObject); | ||||||
|
|
||||||
| var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(action, newObject, oldObject); | ||||||
| var resultEvent = temporaryResourceCache.onAddOrUpdateEvent(action, newObject, oldObject); | ||||||
|
|
||||||
| if (eventHandling != EventHandling.NEW) { | ||||||
| log.debug( | ||||||
| "{} event propagation", eventHandling == EventHandling.DEFER ? "Deferring" : "Skipping"); | ||||||
| if (resultEvent.isEmpty()) { | ||||||
| log.debug("Deferring event propagation"); | ||||||
| } else if (eventAcceptedByFilter(action, newObject, oldObject)) { | ||||||
| log.debug( | ||||||
| "Propagating event for {}, resource with same version not result of a reconciliation.", | ||||||
| "Propagating event for {}, resource with same version not result of a our update.", | ||||||
| action); | ||||||
|
csviri marked this conversation as resolved.
Comment on lines
165
to
168
|
||||||
| propagateEvent(newObject); | ||||||
| var event = resultEvent.get(); | ||||||
| handleEvent( | ||||||
| event.getAction(), | ||||||
| (R) event.getResource().orElseThrow(), | ||||||
| (R) event.getPreviousResource().orElse(null), | ||||||
| event.getLastStateUnknow()); | ||||||
| } else { | ||||||
| log.debug("Event filtered out for operation: {}, resourceID: {}", action, resourceID); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| private void propagateEvent(R object) { | ||||||
| protected void propagateEvent(R object) { | ||||||
| var primaryResourceIdSet = | ||||||
| configuration().getSecondaryToPrimaryMapper().toPrimaryResourceIDs(object); | ||||||
| if (primaryResourceIdSet.isEmpty()) { | ||||||
|
|
||||||
Uh oh!
There was an error while loading. Please reload this page.