KAFKA-17125: Streams Sticky Task Assignor #18652
base: trunk
Thanks for the PR @aliehsaeedii - I made a pass and have a few comments.
public class ProcessState {
    private final String processId;
    // number of members
    private int capacity;
nit: why not just name it numMembersCapacity? Although it's fine as is.
    this.capacity = 0;
    this.load = Double.MAX_VALUE;
    this.assignedActiveTasks = new HashMap<>();
    this.assignedStandbyTasks = new HashMap<>();
So we're not going to consider warm-up tasks as part of the load?
}

private Set<TaskId> toTaskIds(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber, final boolean isActive) {
    Set<TaskId> ret = new HashSet<>();
nit: maybe a clearer name like extractedTaskIds or taskIds - I realize I'm being a bit picky given the TaskId in the generics.
private GroupAssignment buildGroupAssignment(final Set<String> members) {
    final Map<String, MemberAssignment> memberAssignments = new HashMap<>();
    final Map<String, Set<TaskId>> activeTasksAssignments = activeTasksAssignments();
nit: could make these parameters to the method, but it comes down to personal choice so I'll leave it to you.
processIdToState.putIfAbsent(processId, new ProcessState(processId));
processIdToState.get(processId).addMember(memberId);
// prev active tasks
for (Map.Entry<String, Set<Integer>> entry : memberSpec.activeTasks().entrySet()) {
A more general question comes to mind: at this point, how will we handle members that have previous assignments but are now fenced (inactive, non-responsive, or dropped out of the group) before the AssignmentMemberSpec is updated?
Optional<ProcessState> processWithLeastLoad = rightPairs.stream()
    .map(member -> processIdToState.get(member.processId))
    .min(Comparator.comparingDouble(ProcessState::load));
assert processWithLeastLoad.isPresent();
I thought assert has to be explicitly enabled - maybe use processWithLeastLoad.orElseThrow() instead? I could be missing something here though.
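For reference, Java assertions are indeed disabled by default and only run when the JVM is started with -ea (or -enableassertions), whereas the no-argument Optional.orElseThrow() (Java 10+) always throws NoSuchElementException on an empty Optional. A minimal, standalone sketch of the suggested change follows; the Proc class and leastLoaded method are hypothetical stand-ins and are not taken from the PR.

```java
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;

class LeastLoadSketch {
    // Hypothetical stand-in for the PR's ProcessState; only the load matters here.
    static final class Proc {
        final String id;
        final double load;

        Proc(final String id, final double load) {
            this.id = id;
            this.load = load;
        }
    }

    // Returns the candidate with the smallest load, or throws if there is none.
    static Proc leastLoaded(final List<Proc> candidates) {
        final Optional<Proc> min = candidates.stream()
            .min(Comparator.comparingDouble((Proc p) -> p.load));
        // Unlike `assert min.isPresent()`, this check runs whether or not -ea is set.
        return min.orElseThrow();
    }

    public static void main(final String[] args) {
        final Proc chosen = leastLoaded(List.of(new Proc("a", 2.0), new Proc("b", 0.5)));
        System.out.println(chosen.id);
        try {
            leastLoaded(List.of());
        } catch (final NoSuchElementException e) {
            System.out.println("empty candidate list rejected");
        }
    }
}
```

With this shape the empty case fails loudly in production as well as in tests, instead of reaching a later get() on an empty Optional.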
    return memberWithLeastLoad(processWithLeastLoad.get());
}

private Member findMemberWithLeastLoad(final TaskId taskId) {
I think I get what's going on here, but I'm finding it a little challenging to follow exactly what the flow is with all these overloaded findMemberWithLeastLoad methods. Perhaps a short one-line comment for each?
for (int i = 0; i < numStandbyReplicas; i++) {
    final Set<String> availableProcesses = findAllowedProcesses(task);
    if (availableProcesses.isEmpty()) {
        log.warn("Unable to assign " + (numStandbyReplicas - i) +
Should we throw here instead of just a warning?
    return process.hasCapacity() || isLeastLoadedProcess(process.load());
}

private boolean isLeastLoadedProcess(final double load) {
Another more general question - when it comes to calculating load, will we consider stateful vs. stateless tasks?
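To make the question concrete, here is one way a load figure could distinguish stateful from stateless tasks. This is only an illustrative sketch: the excerpts above do not show how the PR computes load, and the class name, method signature, and weights below are all hypothetical. The only detail borrowed from the PR is that a process with no members starts at Double.MAX_VALUE.

```java
class WeightedLoadSketch {
    // Hypothetical weights, chosen purely for illustration.
    static final double STATEFUL_WEIGHT = 2.0;
    static final double STATELESS_WEIGHT = 1.0;

    // Weighted task count divided by the number of members (capacity) on the process.
    static double load(final int statefulTasks, final int statelessTasks, final int capacity) {
        if (capacity <= 0) {
            // Mirrors the constructor in the PR, where an empty process starts at Double.MAX_VALUE.
            return Double.MAX_VALUE;
        }
        return (STATEFUL_WEIGHT * statefulTasks + STATELESS_WEIGHT * statelessTasks) / capacity;
    }

    public static void main(final String[] args) {
        // One stateful and two stateless tasks across two members.
        System.out.println(load(1, 2, 2));
    }
}
```

Under these assumed weights, a process with one stateful task compares as equally loaded to a process with two stateless tasks at the same capacity. Whether such a weighting is desirable is exactly the open question in the comment above.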
No description provided.