From bc2a2c4b3f885ac20a4cb66f5075c91cccf1130d Mon Sep 17 00:00:00 2001 From: aliehsaeedii Date: Tue, 21 Jan 2025 09:20:21 +0100 Subject: [PATCH] mv sticky TA --- .../group/streams/assignor/GroupSpec.java | 6 + .../group/streams/assignor/GroupSpecImpl.java | 3 + .../group/streams/assignor/ProcessState.java | 144 +++ .../streams/assignor/StickyTaskAssignor.java | 401 +++++++ .../streams/assignor/GroupSpecImplTest.java | 2 + .../streams/assignor/MockAssignorTest.java | 7 + .../assignor/StickyTaskAssignorTest.java | 1037 +++++++++++++++++ 7 files changed, 1600 insertions(+) create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java create mode 100644 group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpec.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpec.java index 1a8e7edc01c63..b1581377e024e 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpec.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpec.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.coordinator.group.streams.assignor; +import java.util.List; import java.util.Map; /** @@ -28,6 +29,11 @@ public interface GroupSpec { */ Map members(); + /** + * @return The list of subtopologies. + */ + List subtopologies(); + /** * @return Any configurations passed to the assignor. 
*/ diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpecImpl.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpecImpl.java index caa82ed2cb21c..6faea5387a016 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpecImpl.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpecImpl.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.coordinator.group.streams.assignor; +import java.util.List; import java.util.Map; import java.util.Objects; @@ -26,10 +27,12 @@ * @param assignmentConfigs Any configurations passed to the assignor. */ public record GroupSpecImpl(Map members, + List subtopologies, Map assignmentConfigs) implements GroupSpec { public GroupSpecImpl { Objects.requireNonNull(members); + Objects.requireNonNull(subtopologies); Objects.requireNonNull(assignmentConfigs); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java new file mode 100644 index 0000000000000..6b64ba87e7087 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.assignor; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static java.util.Collections.unmodifiableSet; +import static org.apache.kafka.common.utils.Utils.union; + +public class ProcessState { + private final String processId; + // number of members + private int capacity; + private double load; + private final Map memberToTaskCounts; + private final Map> assignedActiveTasks; + private final Map> assignedStandbyTasks; + + + ProcessState(final String processId) { + this.processId = processId; + this.capacity = 0; + this.load = Double.MAX_VALUE; + this.assignedActiveTasks = new HashMap<>(); + this.assignedStandbyTasks = new HashMap<>(); + this.memberToTaskCounts = new HashMap<>(); + } + + + public String processId() { + return processId; + } + + public int capacity() { + return capacity; + } + + public int totalTaskCount() { + return assignedStandbyTasks().size() + assignedActiveTasks().size(); + } + + public double load() { + return load; + } + + public Map memberToTaskCounts() { + return memberToTaskCounts; + } + + public Set assignedActiveTasks() { + return assignedActiveTasks.values().stream() + .flatMap(Set::stream) + .collect(Collectors.toSet()); + } + + public Map> assignedActiveTasksByMember() { + return assignedActiveTasks; + } + + public Set assignedStandbyTasks() { + return assignedStandbyTasks.values().stream() + .flatMap(Set::stream) + .collect(Collectors.toSet()); + } + + public Map> 
assignedStandbyTasksByMember() { + return assignedStandbyTasks; + } + + public void addTask(final String memberId, final TaskId taskId, final boolean isActive) { + if (isActive) { + assignedActiveTasks.putIfAbsent(memberId, new HashSet<>()); + assignedActiveTasks.get(memberId).add(taskId); + } else { + assignedStandbyTasks.putIfAbsent(memberId, new HashSet<>()); + assignedStandbyTasks.get(memberId).add(taskId); + } + memberToTaskCounts.merge(memberId, 1, Integer::sum); // merge() also covers a member that was never registered through addMember() + computeLoad(); + } + + private void incrementCapacity() { + capacity++; + computeLoad(); + } + public void computeLoad() { + if (capacity <= 0) { + this.load = -1; + } else { + this.load = (double) totalTaskCount() / capacity; + } + } + + public void addMember(final String member) { + this.memberToTaskCounts.put(member, 0); + incrementCapacity(); + } + + public boolean hasCapacity() { + return totalTaskCount() < capacity; + } + + public int compareTo(final ProcessState other) { + int loadCompare = Double.compare(this.load, other.load()); + if (loadCompare == 0) { + return Integer.compare(other.capacity, this.capacity); + } + return loadCompare; + } + + public boolean hasTask(final TaskId taskId) { + return assignedActiveTasks().contains(taskId) || assignedStandbyTasks().contains(taskId); } + + + Set assignedTasks() { + final Set assignedActiveTaskIds = assignedActiveTasks(); + final Set assignedStandbyTaskIds = assignedStandbyTasks(); + return unmodifiableSet( + union( + () -> new HashSet<>(assignedActiveTaskIds.size() + assignedStandbyTaskIds.size()), + assignedActiveTaskIds, + assignedStandbyTaskIds + ) + ); + } +} \ No newline at end of file diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java new file mode 100644 index 0000000000000..345f9ca6e6aa8 --- /dev/null +++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group.streams.assignor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +public class StickyTaskAssignor implements TaskAssignor { + + public static final String STICKY_ASSIGNOR_NAME = "sticky"; + private static final Logger log = LoggerFactory.getLogger(StickyTaskAssignor.class); + // helper data structures: + private TaskPairs taskPairs; + Map activeTaskToPrevMember; + Map> standbyTaskToPrevMember; + Map processIdToState; + int allTasks; + int totalCapacity; + int tasksPerMember; + + @Override + public String name() { + return STICKY_ASSIGNOR_NAME; + } + + @Override + public GroupAssignment assign(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) throws TaskAssignorException { + initialize(groupSpec, topologyDescriber); 
+ //active + Set activeTasks = toTaskIds(groupSpec, topologyDescriber, true); + assignActive(activeTasks); + //standby (a missing "numStandbyReplicas" entry means no standby replicas) + final int numStandbyReplicas = + groupSpec.assignmentConfigs().isEmpty() ? 0 + : Integer.parseInt(groupSpec.assignmentConfigs().getOrDefault("numStandbyReplicas", "0")); + if (numStandbyReplicas > 0) { + Set statefulTasks = toTaskIds(groupSpec, topologyDescriber, false); + assignStandby(statefulTasks, numStandbyReplicas); + } + return buildGroupAssignment(groupSpec.members().keySet()); + } + + private Set toTaskIds(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber, final boolean isActive) { + Set ret = new HashSet<>(); + for (String subtopology : groupSpec.subtopologies()) { + if (isActive || topologyDescriber.isStateful(subtopology)) { + int numberOfPartitions = topologyDescriber.numTasks(subtopology); + for (int i = 0; i < numberOfPartitions; i++) { + ret.add(new TaskId(subtopology, i)); + } + } + } + return ret; + } + + private void initialize(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) { + allTasks = 0; + for (String subtopology : groupSpec.subtopologies()) { + int numberOfPartitions = topologyDescriber.numTasks(subtopology); + allTasks += numberOfPartitions; + } + totalCapacity = groupSpec.members().size(); + tasksPerMember = computeTasksPerMember(allTasks, totalCapacity); + taskPairs = new TaskPairs(allTasks * (allTasks - 1) / 2); + processIdToState = new HashMap<>(); + activeTaskToPrevMember = new HashMap<>(); + standbyTaskToPrevMember = new HashMap<>(); + for (Map.Entry memberEntry : groupSpec.members().entrySet()) { + final String memberId = memberEntry.getKey(); + final String processId = memberEntry.getValue().processId(); + final Member member = new Member(processId, memberId); + final AssignmentMemberSpec memberSpec = memberEntry.getValue(); + processIdToState.putIfAbsent(processId, new ProcessState(processId)); + processIdToState.get(processId).addMember(memberId); + // prev active tasks + for 
(Map.Entry> entry : memberSpec.activeTasks().entrySet()) { + Set partitionNoSet = entry.getValue(); + for (int partitionNo : partitionNoSet) { + activeTaskToPrevMember.put(new TaskId(entry.getKey(), partitionNo), member); + } + } + // prev standby tasks + for (Map.Entry> entry : memberSpec.standbyTasks().entrySet()) { + Set partitionNoSet = entry.getValue(); + for (int partitionNo : partitionNoSet) { + TaskId taskId = new TaskId(entry.getKey(), partitionNo); + standbyTaskToPrevMember.putIfAbsent(taskId, new HashSet<>()); + standbyTaskToPrevMember.get(taskId).add(member); + } + } + } + } + + private GroupAssignment buildGroupAssignment(final Set members) { + final Map memberAssignments = new HashMap<>(); + final Map> activeTasksAssignments = activeTasksAssignments(); + final Map> standbyTasksAssignments = standbyTasksAssignments(); + for (String memberId : members) { + Map> activeTasks = new HashMap<>(); + if (activeTasksAssignments.containsKey(memberId)) { + activeTasks = toCompactedTaskIds(activeTasksAssignments.get(memberId)); + } + Map> standByTasks = new HashMap<>(); + if (standbyTasksAssignments.containsKey(memberId)) { + standByTasks = toCompactedTaskIds(standbyTasksAssignments.get(memberId)); + } + memberAssignments.put(memberId, new MemberAssignment(activeTasks, standByTasks, new HashMap<>())); + } + return new GroupAssignment(memberAssignments); + } + + private Map> standbyTasksAssignments() { + return processIdToState.entrySet().stream() + .flatMap(entry -> entry.getValue().assignedStandbyTasksByMember().entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (set1, set2) -> { + set1.addAll(set2); + return set1; + })); + } + + private Map> activeTasksAssignments() { + return processIdToState.entrySet().stream() + .flatMap(entry -> entry.getValue().assignedActiveTasksByMember().entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (set1, set2) -> { + set1.addAll(set2); + return set1; + 
})); + } + + private Map> toCompactedTaskIds(final Set taskIds) { + Map> ret = new HashMap<>(); + for (TaskId taskId : taskIds) { + ret.putIfAbsent(taskId.subtopologyId(), new HashSet<>()); + ret.get(taskId.subtopologyId()).add(taskId.partition()); + } + return ret; + } + + private void assignActive(final Set activeTasks) { + // 1. re-assigning existing active tasks to clients that previously had the same active tasks + for (Iterator it = activeTasks.iterator(); it.hasNext();) { + final TaskId task = it.next(); + final Member prevMember = activeTaskToPrevMember.get(task); + if (prevMember != null && hasUnfulfilledQuota(prevMember)) { + processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true); + updateHelpers(prevMember, task, true); + it.remove(); + } + } + // 2. re-assigning tasks to clients that previously have seen the same task (as standby task) + for (Iterator it = activeTasks.iterator(); it.hasNext();) { + final TaskId task = it.next(); + final Set prevMembers = standbyTaskToPrevMember.get(task); + if (prevMembers != null && !prevMembers.isEmpty()) { + final Member prevMember = findMemberWithLeastLoad(prevMembers, task, true); + if (prevMember != null && hasUnfulfilledQuota(prevMember)) { + processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true); + updateHelpers(prevMember, task, true); + it.remove(); + } + } + } + // 3. 
assign any remaining unassigned tasks + for (Iterator it = activeTasks.iterator(); it.hasNext();) { + final TaskId task = it.next(); + final Member member = findMemberWithLeastLoad(task); + if (member != null) { + processIdToState.get(member.processId).addTask(member.memberId, task, true); + it.remove(); + updateHelpers(member, task, true); + } + } + } + + private void maybeUpdateTasksPerMember(final int activeTasksNo) { + if (activeTasksNo == tasksPerMember) { + totalCapacity--; + allTasks -= activeTasksNo; + tasksPerMember = computeTasksPerMember(allTasks, totalCapacity); + } + } + + private Member findMemberWithLeastLoad(final Set members, TaskId taskId, final boolean returnSameMember) { + Set rightPairs = members.stream() + .filter(member -> taskPairs.hasNewPair(taskId, processIdToState.get(member.processId).assignedTasks())) + .collect(Collectors.toSet()); + if (rightPairs.isEmpty()) { + rightPairs = members; + } + Optional processWithLeastLoad = rightPairs.stream() + .map(member -> processIdToState.get(member.processId)) + .min(Comparator.comparingDouble(ProcessState::load)); + if (processWithLeastLoad.isEmpty()) { return null; } // no candidate members (e.g. the caller filtered every previous owner out); callers null-check the result + // if the same exact former member is needed + if (returnSameMember) { + return standbyTaskToPrevMember.get(taskId).stream() + .filter(standby -> standby.processId.equals(processWithLeastLoad.get().processId())) + .findFirst() + .orElseGet(() -> memberWithLeastLoad(processWithLeastLoad.get())); + } + return memberWithLeastLoad(processWithLeastLoad.get()); + } + + private Member findMemberWithLeastLoad(final TaskId taskId) { + Set allMembers = processIdToState.entrySet().stream() + .flatMap(entry -> entry.getValue().memberToTaskCounts().keySet().stream() + .map(memberId -> new Member(entry.getKey(), memberId))) + .collect(Collectors.toSet()); + return findMemberWithLeastLoad(allMembers, taskId, false); + } + + private Member findMemberWithLeastLoad(final TaskId taskId, final Set processes) { + Set allMembers = processes.stream() + 
.flatMap(processId -> processIdToState.get(processId).memberToTaskCounts().keySet().stream() + .map(memberId -> new Member(processId, memberId))) + .collect(Collectors.toSet()); + return findMemberWithLeastLoad(allMembers, taskId, false); + } + + private Member memberWithLeastLoad(final ProcessState processWithLeastLoad) { + Optional memberWithLeastLoad = processWithLeastLoad.memberToTaskCounts().entrySet().stream() + .min(Map.Entry.comparingByValue()) + .map(Map.Entry::getKey); + return memberWithLeastLoad.map(memberId -> new Member(processWithLeastLoad.processId(), memberId)).orElse(null); + } + + private boolean hasUnfulfilledQuota(final Member member) { + return processIdToState.get(member.processId).memberToTaskCounts().get(member.memberId) < tasksPerMember; + } + + private void assignStandby(final Set standbyTasks, final int numStandbyReplicas) { + for (TaskId task : standbyTasks) { + for (int i = 0; i < numStandbyReplicas; i++) { + final Set availableProcesses = findAllowedProcesses(task); + if (availableProcesses.isEmpty()) { + log.warn("Unable to assign " + (numStandbyReplicas - i) + + " of " + numStandbyReplicas + " standby tasks for task [" + task + "]. " + + "There is not enough available capacity. 
You should " + + "increase the number of threads and/or application instances " + + "to maintain the requested number of standby replicas."); + break; + } + Member standby = null; + // prev active task + Member prevMember = activeTaskToPrevMember.get(task); + if (prevMember != null && availableProcesses.contains(prevMember.processId) && isLoadBalanced(prevMember.processId) + && taskPairs.hasNewPair(task, processIdToState.get(prevMember.processId).assignedTasks())) { + standby = prevMember; + } + // prev standby tasks + if (standby == null) { + final Set prevMembers = standbyTaskToPrevMember.get(task); + if (prevMembers != null && !prevMembers.isEmpty()) { + prevMembers.removeIf(member -> !availableProcesses.contains(member.processId)); + prevMember = findMemberWithLeastLoad(prevMembers, task, true); + if (prevMember != null && isLoadBalanced(prevMember.processId)) { + standby = prevMember; + } + } + } + // others + if (standby == null) { + standby = findMemberWithLeastLoad(task, availableProcesses); + } + processIdToState.get(standby.processId).addTask(standby.memberId, task, false); + updateHelpers(standby, task, false); + } + } + } + + private boolean isLoadBalanced(final String processId) { + final ProcessState process = processIdToState.get(processId); + return process.hasCapacity() || isLeastLoadedProcess(process.load()); + } + + private boolean isLeastLoadedProcess(final double load) { + return processIdToState.values().stream() + .allMatch(process -> process.load() >= load); + } + + private Set findAllowedProcesses(final TaskId taskId) { + return processIdToState.values().stream() + .filter(process -> !process.hasTask(taskId)) + .map(ProcessState::processId) + .collect(Collectors.toSet()); + } + + private void updateHelpers(final Member member, final TaskId taskId, final boolean isActive) { + // add all pair combinations: update taskPairs + taskPairs.addPairs(taskId, processIdToState.get(member.processId).assignedTasks()); + if (isActive) { + // update task 
per process + maybeUpdateTasksPerMember(processIdToState.get(member.processId).assignedActiveTasks().size()); + } + } + + private static int computeTasksPerMember(final int numberOfTasks, final int numberOfMembers) { + if (numberOfMembers == 0) { + return 0; + } + int tasksPerMember = numberOfTasks / numberOfMembers; + if (numberOfTasks % numberOfMembers > 0) { + tasksPerMember++; + } + return tasksPerMember; + } + + private static class TaskPairs { + private final Set pairs; + private final int maxPairs; + TaskPairs(final int maxPairs) { + this.maxPairs = maxPairs; + this.pairs = new HashSet<>(); // grow on demand: maxPairs is quadratic in the task count and turns negative if the caller's int arithmetic overflows + } + boolean hasNewPair(final TaskId task1, + final Set taskIds) { + if (pairs.size() == maxPairs) { + return false; + } + if (taskIds.isEmpty()) { + return true; + } + for (final TaskId taskId : taskIds) { + if (!pairs.contains(pair(task1, taskId))) { + return true; + } + } + return false; + } + void addPairs(final TaskId taskId, final Set assigned) { + for (final TaskId id : assigned) { + if (!id.equals(taskId)) + pairs.add(pair(id, taskId)); + } + } + Pair pair(final TaskId task1, final TaskId task2) { + if (task1.compareTo(task2) < 0) { + return new Pair(task1, task2); + } + return new Pair(task2, task1); + } + private static class Pair { + private final TaskId task1; + private final TaskId task2; + Pair(final TaskId task1, final TaskId task2) { + this.task1 = task1; + this.task2 = task2; + } + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final Pair pair = (Pair) o; + return Objects.equals(task1, pair.task1) && + Objects.equals(task2, pair.task2); + } + @Override + public int hashCode() { + return Objects.hash(task1, task2); + } + } + } + + static class Member { + private final String processId; + private final String memberId; + public Member(final String processId, final String memberId) { + this.processId = processId; + this.memberId = 
memberId; + } + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpecImplTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpecImplTest.java index 5deccb9717f17..3bbececac83e9 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpecImplTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpecImplTest.java @@ -19,6 +19,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -50,6 +51,7 @@ void setUp() { groupSpec = new GroupSpecImpl( members, + new ArrayList<>(), new HashMap<>() ); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignorTest.java index 25dada072df13..b46ae1bf95931 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignorTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignorTest.java @@ -18,6 +18,7 @@ import org.junit.jupiter.api.Test; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -44,6 +45,7 @@ public void testZeroMembers() { TaskAssignorException ex = assertThrows(TaskAssignorException.class, () -> assignor.assign( new GroupSpecImpl( Collections.emptyMap(), + Collections.emptyList(), new HashMap<>() ), new TopologyDescriberImpl(5, Collections.singletonList("test-subtopology")) @@ -82,6 +84,7 @@ public void testDoubleAssignment() { TaskAssignorException ex = assertThrows(TaskAssignorException.class, () -> assignor.assign( new GroupSpecImpl( Map.of("member1", memberSpec1, 
"member2", memberSpec2), + new ArrayList<>(), new HashMap<>() ), new TopologyDescriberImpl(5, Collections.singletonList("test-subtopology")) @@ -96,6 +99,7 @@ public void testBasicScenario() { final GroupAssignment result = assignor.assign( new GroupSpecImpl( Collections.emptyMap(), + new ArrayList<>(), new HashMap<>() ), new TopologyDescriberImpl(5, Collections.emptyList()) @@ -123,6 +127,7 @@ public void testSingleMember() { final GroupAssignment result = assignor.assign( new GroupSpecImpl( Collections.singletonMap("test_member", memberSpec), + new ArrayList<>(), new HashMap<>() ), new TopologyDescriberImpl(4, List.of("test-subtopology")) @@ -167,6 +172,7 @@ public void testTwoMembersTwoSubtopologies() { final GroupAssignment result = assignor.assign( new GroupSpecImpl( mkMap(mkEntry("test_member1", memberSpec1), mkEntry("test_member2", memberSpec2)), + new ArrayList<>(), new HashMap<>() ), new TopologyDescriberImpl(4, List.of("test-subtopology1", "test-subtopology2")) @@ -225,6 +231,7 @@ public void testTwoMembersTwoSubtopologiesStickiness() { final GroupAssignment result = assignor.assign( new GroupSpecImpl( mkMap(mkEntry("test_member1", memberSpec1), mkEntry("test_member2", memberSpec2)), + new ArrayList<>(), new HashMap<>() ), new TopologyDescriberImpl(4, List.of("test-subtopology1", "test-subtopology2")) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java new file mode 100644 index 0000000000000..fd0812431041d --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java @@ -0,0 +1,1037 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group.streams.assignor; + +import org.junit.jupiter.api.Test; +import org.mockito.internal.util.collections.Sets; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class StickyTaskAssignorTest { + + private final StickyTaskAssignor assignor = new StickyTaskAssignor(); + + @Test + public void shouldAssignOneActiveTaskToEachProcessWhenTaskCountSameAsProcessCount() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1"); + final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2"); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3"); + final GroupAssignment result = assignor.assign( + new GroupSpecImpl( + mkMap(mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3)), + Collections.singletonList("test-subtopology"), + new HashMap<>() + ), + new TopologyDescriberImpl(3, false) + ); + assertEquals(3, result.members().size()); + Set actualActiveTasks = new HashSet<>(); + for (int i = 0; i < 3; i++) { + final MemberAssignment testMember = result.members().get("member" + (i + 1)); + assertNotNull(testMember); + assertEquals(1, testMember.activeTasks().size()); + actualActiveTasks.addAll(testMember.activeTasks().get("test-subtopology")); + } + assertEquals(Sets.newSet(0, 1, 2), actualActiveTasks); + } + + @Test + public void shouldAssignTopicGroupIdEvenlyAcrossClientsWithNoStandByTasks() { + final AssignmentMemberSpec memberSpec11 = createAssignmentMemberSpec("process1"); + final AssignmentMemberSpec memberSpec12 = createAssignmentMemberSpec("process1"); + final AssignmentMemberSpec memberSpec21 = createAssignmentMemberSpec("process2"); + final AssignmentMemberSpec memberSpec22 = createAssignmentMemberSpec("process2"); + final AssignmentMemberSpec memberSpec31 = createAssignmentMemberSpec("process3"); + final AssignmentMemberSpec memberSpec32 = createAssignmentMemberSpec("process3"); + final Map members = mkMap(mkEntry("member1_1", memberSpec11), mkEntry("member1_2", memberSpec12), + mkEntry("member2_1", memberSpec21), mkEntry("member2_2", memberSpec22), + mkEntry("member3_1", memberSpec31), mkEntry("member3_2", memberSpec32)); + final GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Arrays.asList("test-subtopology1", "test-subtopology2"), new HashMap<>()), + new TopologyDescriberImpl(3, false) + ); + assertEquals(1, getAllActiveTaskCount(result, "member1_1")); + assertEquals(1, getAllActiveTaskCount(result, 
"member1_2")); + assertEquals(1, getAllActiveTaskCount(result, "member2_1")); + assertEquals(1, getAllActiveTaskCount(result, "member2_2")); + assertEquals(1, getAllActiveTaskCount(result, "member3_1")); + assertEquals(1, getAllActiveTaskCount(result, "member3_2")); + assertEquals(mkMap(mkEntry("test-subtopology1", Sets.newSet(0, 1, 2)), mkEntry("test-subtopology2", Sets.newSet(0, 1, 2))), + mergeAllActiveTasks(result, "member1_1", "member1_2", "member2_1", "member2_2", "member3_1", "member3_2")); + } + + @Test + public void shouldAssignTopicGroupIdEvenlyAcrossClientsWithStandByTasks() { + final Map> tasks = mkMap(mkEntry("test-subtopology1", Sets.newSet(0, 1, 2)), mkEntry("test-subtopology2", Sets.newSet(0, 1, 2))); + final AssignmentMemberSpec memberSpec11 = createAssignmentMemberSpec("process1"); + final AssignmentMemberSpec memberSpec12 = createAssignmentMemberSpec("process1"); + final AssignmentMemberSpec memberSpec21 = createAssignmentMemberSpec("process2"); + final AssignmentMemberSpec memberSpec22 = createAssignmentMemberSpec("process2"); + final AssignmentMemberSpec memberSpec31 = createAssignmentMemberSpec("process3"); + final AssignmentMemberSpec memberSpec32 = createAssignmentMemberSpec("process3"); + final Map members = mkMap(mkEntry("member1_1", memberSpec11), mkEntry("member1_2", memberSpec12), + mkEntry("member2_1", memberSpec21), mkEntry("member2_2", memberSpec22), + mkEntry("member3_1", memberSpec31), mkEntry("member3_2", memberSpec32)); + final GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, + Arrays.asList("test-subtopology1", "test-subtopology2"), + mkMap(mkEntry("numStandbyReplicas", "1"))), + new TopologyDescriberImpl(3, true) + ); + // active tasks + assertEquals(1, getAllActiveTaskCount(result, "member1_1")); + assertEquals(1, getAllActiveTaskCount(result, "member1_2")); + assertEquals(1, getAllActiveTaskCount(result, "member2_1")); + assertEquals(1, getAllActiveTaskCount(result, "member2_2")); + assertEquals(1, 
getAllActiveTaskCount(result, "member3_1")); + assertEquals(1, getAllActiveTaskCount(result, "member3_2")); + assertEquals(tasks, + mergeAllActiveTasks(result, "member1_1", "member1_2", "member2_1", "member2_2", "member3_1", "member3_2")); + } + + /** Runs two assignments with two single-member processes and TopologyDescriberImpl(3, false). In each round every member reports one previously active task and must still hold it afterwards, and the two members together must hold three active tasks. The second round uses member2/process2 (task 1) and member3/process1 (task 2). */ @Test + public void shouldNotMigrateActiveTaskToOtherProcess() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Collections.singleton(0))), Collections.emptyMap()); + AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Collections.singleton(1))), Collections.emptyMap()); + Map members = mkMap(mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2)); + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(3, false) + ); + MemberAssignment testMember1 = result.members().get("member1"); + assertNotNull(testMember1); + assertTrue(testMember1.activeTasks().get("test-subtopology").contains(0)); + MemberAssignment testMember2 = result.members().get("member2"); + assertNotNull(testMember2); + assertTrue(testMember2.activeTasks().get("test-subtopology").contains(1)); + assertEquals(3, + testMember1.activeTasks().get("test-subtopology").size() + testMember2.activeTasks().get("test-subtopology").size()); + // flip the previous active tasks assignment around. 
+ memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Collections.singleton(1))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Collections.singleton(2))), Collections.emptyMap()); + members = mkMap(mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3)); + result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(3, false) + ); + testMember2 = result.members().get("member2"); + assertNotNull(testMember2); + assertTrue(testMember2.activeTasks().get("test-subtopology").contains(1)); + MemberAssignment testMember3 = result.members().get("member3"); + assertNotNull(testMember3); + assertTrue(testMember3.activeTasks().get("test-subtopology").contains(2)); + assertEquals(3, + testMember2.activeTasks().get("test-subtopology").size() + testMember3.activeTasks().get("test-subtopology").size()); + } + + /** member1 previously owned tasks 0 and 2, member2 owned task 1, and member3 (process3) joins with no previous tasks. Asserts that member2 still holds task 1, that member1 and member3 each hold task 0 or task 2, and that exactly three active tasks are assigned across the three members. */ @Test + public void shouldMigrateActiveTasksToNewProcessWithoutChangingAllAssignments() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Sets.newSet(0, 2))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Collections.singleton(1))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3"); + Map members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3)); + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(3, false) + ); + MemberAssignment testMember1 = result.members().get("member1"); + assertNotNull(testMember1); + 
assertTrue(testMember1.activeTasks().get("test-subtopology").contains(0) || testMember1.activeTasks().get("test-subtopology").contains(2)); + MemberAssignment testMember2 = result.members().get("member2"); + assertNotNull(testMember2); + assertTrue(testMember2.activeTasks().get("test-subtopology").contains(1)); + MemberAssignment testMember3 = result.members().get("member3"); + assertNotNull(testMember3); + assertTrue(testMember3.activeTasks().get("test-subtopology").contains(2) || testMember3.activeTasks().get("test-subtopology").contains(0)); + assertEquals(3, + testMember1.activeTasks().get("test-subtopology").size() + + testMember2.activeTasks().get("test-subtopology").size() + + testMember3.activeTasks().get("test-subtopology").size()); + } + + /** process1 has one member and process2 has two, none with previous tasks. Asserts that each of the three members receives exactly one active task of test-subtopology. */ @Test + public void shouldAssignBasedOnCapacity() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1"); + final AssignmentMemberSpec memberSpec21 = createAssignmentMemberSpec("process2"); + final AssignmentMemberSpec memberSpec22 = createAssignmentMemberSpec("process2"); + Map members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2_1", memberSpec21), mkEntry("member2_2", memberSpec22)); + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(3, false) + ); + MemberAssignment testMember1 = result.members().get("member1"); + assertNotNull(testMember1); + assertEquals(1, testMember1.activeTasks().get("test-subtopology").size()); + MemberAssignment testMember21 = result.members().get("member2_1"); + assertNotNull(testMember21); + assertEquals(1, testMember21.activeTasks().get("test-subtopology").size()); + MemberAssignment testMember22 = result.members().get("member2_2"); + assertNotNull(testMember22); + assertEquals(1, testMember22.activeTasks().get("test-subtopology").size()); + } + + /** member1 previously owned every task (0-5 of test-subtopology1 and 0 of test-subtopology2) and member2 has none; the topology comes from TopologyDescriberImpl2. Asserts that member1 ends with four tasks and member2 with three across both subtopologies, and that the union of both assignments equals the original task map. */ @Test + public void shouldAssignTasksEvenlyWithUnequalTopicGroupSizes() { + final Map> 
activeTasks = mkMap( + mkEntry("test-subtopology1", Sets.newSet(0, 1, 2, 3, 4, 5)), + mkEntry("test-subtopology2", Sets.newSet(0))); + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", activeTasks, Collections.emptyMap()); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2"); + Map members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2)); + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Arrays.asList("test-subtopology1", "test-subtopology2"), new HashMap<>()), + new TopologyDescriberImpl2() + ); + MemberAssignment testMember1 = result.members().get("member1"); + assertNotNull(testMember1); + final Set member1Topology1 = testMember1.activeTasks().get("test-subtopology1"); + final Set member1Topology2 = testMember1.activeTasks().getOrDefault("test-subtopology2", new HashSet<>()); + assertEquals(4, member1Topology1.size() + member1Topology2.size()); + MemberAssignment testMember2 = result.members().get("member2"); + assertNotNull(testMember2); + final Set member2Topology1 = testMember2.activeTasks().get("test-subtopology1"); + final Set member2Topology2 = testMember2.activeTasks().getOrDefault("test-subtopology2", new HashSet<>()); + assertEquals(3, member2Topology1.size() + member2Topology2.size()); + assertEquals(activeTasks, mkMap( + mkEntry("test-subtopology1", Stream.concat(member1Topology1.stream(), member2Topology1.stream()).collect(Collectors.toSet())), + mkEntry("test-subtopology2", Stream.concat(member1Topology2.stream(), member2Topology2.stream()).collect(Collectors.toSet())))); + } + + /** Five single-member processes with TopologyDescriberImpl(3, false), exercised in two rounds. In each round the three members that report a previously active task must hold exactly that task, and the two members without previous tasks must have no "test-subtopology" entry in their active tasks (checked with assertNull). */ @Test + public void shouldKeepActiveTaskStickinessWhenMoreClientThanActiveTasks() { + AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Collections.singleton(0))), Collections.emptyMap()); + AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", 
Collections.singleton(2))), Collections.emptyMap()); + AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", Collections.singleton(1))), Collections.emptyMap()); + AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4"); + AssignmentMemberSpec memberSpec5 = createAssignmentMemberSpec("process5"); + Map members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), + mkEntry("member3", memberSpec3), mkEntry("member4", memberSpec4), mkEntry("member5", memberSpec5)); + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(3, false) + ); + MemberAssignment testMember1 = result.members().get("member1"); + assertNotNull(testMember1); + assertEquals(1, testMember1.activeTasks().get("test-subtopology").size()); + assertEquals(Collections.singleton(0), testMember1.activeTasks().get("test-subtopology")); + MemberAssignment testMember2 = result.members().get("member2"); + assertNotNull(testMember2); + assertEquals(1, testMember2.activeTasks().get("test-subtopology").size()); + assertEquals(Collections.singleton(2), testMember2.activeTasks().get("test-subtopology")); + MemberAssignment testMember3 = result.members().get("member3"); + assertNotNull(testMember3); + assertEquals(1, testMember3.activeTasks().get("test-subtopology").size()); + assertEquals(Collections.singleton(1), testMember3.activeTasks().get("test-subtopology")); + MemberAssignment testMember4 = result.members().get("member4"); + assertNotNull(testMember4); + assertNull(testMember4.activeTasks().get("test-subtopology")); + MemberAssignment testMember5 = result.members().get("member5"); + assertNotNull(testMember5); + assertNull(testMember5.activeTasks().get("test-subtopology")); + // change up the assignment and make sure it is still sticky + memberSpec1 = createAssignmentMemberSpec("process1"); + memberSpec2 = 
createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Collections.singleton(0))), Collections.emptyMap()); + memberSpec3 = createAssignmentMemberSpec("process3"); + memberSpec4 = createAssignmentMemberSpec("process4", mkMap(mkEntry("test-subtopology", Collections.singleton(2))), Collections.emptyMap()); + memberSpec5 = createAssignmentMemberSpec("process5", mkMap(mkEntry("test-subtopology", Collections.singleton(1))), Collections.emptyMap()); + members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), + mkEntry("member3", memberSpec3), mkEntry("member4", memberSpec4), mkEntry("member5", memberSpec5)); + result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(3, false) + ); + testMember1 = result.members().get("member1"); + assertNotNull(testMember1); + assertNull(testMember1.activeTasks().get("test-subtopology")); + testMember2 = result.members().get("member2"); + assertNotNull(testMember2); + assertEquals(1, testMember2.activeTasks().get("test-subtopology").size()); + assertEquals(Collections.singleton(0), testMember2.activeTasks().get("test-subtopology")); + testMember3 = result.members().get("member3"); + assertNotNull(testMember3); + assertNull(testMember3.activeTasks().get("test-subtopology")); + testMember4 = result.members().get("member4"); + assertNotNull(testMember4); + assertEquals(1, testMember4.activeTasks().get("test-subtopology").size()); + assertEquals(Collections.singleton(2), testMember4.activeTasks().get("test-subtopology")); + testMember5 = result.members().get("member5"); + assertNotNull(testMember5); + assertEquals(1, testMember5.activeTasks().get("test-subtopology").size()); + assertEquals(Collections.singleton(1), testMember5.activeTasks().get("test-subtopology")); + } + + /** No member reports previously active tasks; member1, member2 and member3 previously held standby tasks 2, 1 and 0 respectively. Asserts that each member is given, as its only active task, the task it previously held as a standby. */ @Test + public void shouldAssignTasksToClientWithPreviousStandbyTasks() { + final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1", Collections.emptyMap(), mkMap(mkEntry("test-subtopology", Collections.singleton(2)))); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", Collections.emptyMap(), mkMap(mkEntry("test-subtopology", Collections.singleton(1)))); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", Collections.emptyMap(), mkMap(mkEntry("test-subtopology", Collections.singleton(0)))); + Map members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3)); + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(3, false) + ); + MemberAssignment testMember1 = result.members().get("member1"); + assertNotNull(testMember1); + assertEquals(1, testMember1.activeTasks().get("test-subtopology").size()); + assertEquals(Collections.singleton(2), testMember1.activeTasks().get("test-subtopology")); + MemberAssignment testMember2 = result.members().get("member2"); + assertNotNull(testMember2); + assertEquals(1, testMember2.activeTasks().get("test-subtopology").size()); + assertEquals(Collections.singleton(1), testMember2.activeTasks().get("test-subtopology")); + MemberAssignment testMember3 = result.members().get("member3"); + assertNotNull(testMember3); + assertEquals(1, testMember3.activeTasks().get("test-subtopology").size()); + assertEquals(Collections.singleton(0), testMember3.activeTasks().get("test-subtopology")); + } + + /** member1 (process1) previously had active task 0 and standby task 1; member2_1 (process2) had active task 2 and standby task 1; member2_2 (process2) had nothing. Asserts that member1 ends with {0}, member2_1 with {2}, and that task 1 is assigned to member2_2. */ @Test + public void shouldAssignBasedOnCapacityWhenMultipleClientHaveStandbyTasks() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", + mkMap(mkEntry("test-subtopology", Collections.singleton(0))), + mkMap(mkEntry("test-subtopology", Collections.singleton(1)))); + final AssignmentMemberSpec memberSpec21 = createAssignmentMemberSpec("process2", + mkMap(mkEntry("test-subtopology", 
Collections.singleton(2))), + mkMap(mkEntry("test-subtopology", Collections.singleton(1)))); + final AssignmentMemberSpec memberSpec22 = createAssignmentMemberSpec("process2", + Collections.emptyMap(), Collections.emptyMap()); + Map members = mkMap( + mkEntry("member1", memberSpec1), + mkEntry("member2_1", memberSpec21), mkEntry("member2_2", memberSpec22)); + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(3, false) + ); + MemberAssignment testMember1 = result.members().get("member1"); + assertNotNull(testMember1); + assertEquals(1, testMember1.activeTasks().get("test-subtopology").size()); + assertEquals(Collections.singleton(0), testMember1.activeTasks().get("test-subtopology")); + MemberAssignment testMember21 = result.members().get("member2_1"); + assertNotNull(testMember21); + assertEquals(1, testMember21.activeTasks().get("test-subtopology").size()); + assertEquals(Collections.singleton(2), testMember21.activeTasks().get("test-subtopology")); + MemberAssignment testMember22 = result.members().get("member2_2"); + assertNotNull(testMember22); + assertEquals(1, testMember22.activeTasks().get("test-subtopology").size()); + assertEquals(Collections.singleton(1), testMember22.activeTasks().get("test-subtopology")); + } + + /** Four single-member processes each previously own one of tasks 0-3, with numStandbyReplicas set to "1". Asserts that no member's standby list contains the task it previously owned as active, that no member has more than two standby tasks, that at least three members have a standby task, and that the standby tasks of all members together cover tasks 0-3. */ @Test + public void shouldAssignStandbyTasksToDifferentClientThanCorrespondingActiveTaskIsAssignedTo() { + final Map> tasks = mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1, 2, 3))); + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Collections.singleton(0))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Collections.singleton(1))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", 
Collections.singleton(2))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4", mkMap(mkEntry("test-subtopology", Collections.singleton(3))), Collections.emptyMap()); + Map members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), + mkEntry("member3", memberSpec3), mkEntry("member4", memberSpec4)); + final GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, + Collections.singletonList("test-subtopology"), + mkMap(mkEntry("numStandbyReplicas", "1"))), + new TopologyDescriberImpl(4, true) + ); + final List member1TaskIds = getAllStandbyTaskIds(result, "member1"); + assertFalse(member1TaskIds.contains(0)); + assertTrue(member1TaskIds.size() <= 2); + final List member2TaskIds = getAllStandbyTaskIds(result, "member2"); + assertFalse(member2TaskIds.contains(1)); + assertTrue(member2TaskIds.size() <= 2); + final List member3TaskIds = getAllStandbyTaskIds(result, "member3"); + assertFalse(member3TaskIds.contains(2)); + assertTrue(member3TaskIds.size() <= 2); + final List member4TaskIds = getAllStandbyTaskIds(result, "member4"); + assertFalse(member4TaskIds.contains(3)); + assertTrue(member4TaskIds.size() <= 2); + int nonEmptyStandbyTaskCount = 0; + nonEmptyStandbyTaskCount += member1TaskIds.isEmpty() ? 0 : 1; + nonEmptyStandbyTaskCount += member2TaskIds.isEmpty() ? 0 : 1; + nonEmptyStandbyTaskCount += member3TaskIds.isEmpty() ? 0 : 1; + nonEmptyStandbyTaskCount += member4TaskIds.isEmpty() ? 
0 : 1; + assertTrue(nonEmptyStandbyTaskCount >= 3); + assertEquals(tasks, mergeAllStandbyTasks(result)); + } + + /** Three members each previously own one of tasks 0, 1 and 2, with numStandbyReplicas set to "2". Asserts that each member's standby tasks are exactly the two tasks it did not previously own as active. */ @Test + public void shouldAssignMultipleReplicasOfStandbyTask() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Collections.singleton(0))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Collections.singleton(1))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", Collections.singleton(2))), Collections.emptyMap()); + Map members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), + mkEntry("member3", memberSpec3)); + final GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, + Collections.singletonList("test-subtopology"), + mkMap(mkEntry("numStandbyReplicas", "2"))), + new TopologyDescriberImpl(3, true) + ); + assertEquals(Sets.newSet(1, 2), new HashSet<>(getAllStandbyTaskIds(result, "member1"))); + assertEquals(Sets.newSet(0, 2), new HashSet<>(getAllStandbyTaskIds(result, "member2"))); + assertEquals(Sets.newSet(0, 1), new HashSet<>(getAllStandbyTaskIds(result, "member3"))); + } + + /** A single member with numStandbyReplicas set to "1" and TopologyDescriberImpl(1, true): asserts that getAllStandbyTasks for that member is empty. */ @Test + public void shouldNotAssignStandbyTaskReplicasWhenNoClientAvailableWithoutHavingTheTaskAssigned() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1"); + Map members = mkMap( + mkEntry("member1", memberSpec1)); + final GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, + Collections.singletonList("test-subtopology"), + mkMap(mkEntry("numStandbyReplicas", "1"))), + new TopologyDescriberImpl(1, true) + ); + assertTrue(getAllStandbyTasks(result, "member1").isEmpty()); + } + + /** Three members without previous tasks and numStandbyReplicas set to "1": asserts that both the active task ids and the standby task ids collected over the whole result are {0, 1, 2}. */ @Test + public void shouldAssignActiveAndStandbyTasks() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1"); + final 
AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2"); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3"); + Map members = mkMap( + mkEntry("member1", memberSpec1), + mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3)); + final GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, + Collections.singletonList("test-subtopology"), + mkMap(mkEntry("numStandbyReplicas", "1"))), + new TopologyDescriberImpl(3, true) + ); + assertEquals(Sets.newSet(0, 1, 2), new HashSet<>(getAllActiveTaskIds(result))); + assertEquals(Sets.newSet(0, 1, 2), new HashSet<>(getAllStandbyTaskIds(result))); + } + + /** process1 has three members while process2 and process3 have one each, none with previous tasks. Asserts that the three process1 members together hold exactly one active task and that member2 and member3 hold one each. */ @Test + public void shouldAssignAtLeastOneTaskToEachClientIfPossible() { + final AssignmentMemberSpec memberSpec11 = createAssignmentMemberSpec("process1"); + final AssignmentMemberSpec memberSpec12 = createAssignmentMemberSpec("process1"); + final AssignmentMemberSpec memberSpec13 = createAssignmentMemberSpec("process1"); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2"); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3"); + Map members = mkMap( + mkEntry("member1_1", memberSpec11), mkEntry("member1_2", memberSpec12), mkEntry("member1_3", memberSpec13), + mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3)); + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(3, false) + ); + assertEquals(1, getAllActiveTaskIds(result, "member1_1", "member1_2", "member1_3").size()); + assertEquals(1, getAllActiveTaskIds(result, "member2").size()); + assertEquals(1, getAllActiveTaskIds(result, "member3").size()); + } + + /** Six single-member processes without previous tasks. Asserts that exactly three active task ids are assigned across all members and that the active tasks of test-subtopology are {0, 1, 2}. */ @Test + public void shouldAssignEachActiveTaskToOneClientWhenMoreClientsThanTasks() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1"); + final AssignmentMemberSpec memberSpec2 = 
createAssignmentMemberSpec("process2"); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3"); + final AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4"); + final AssignmentMemberSpec memberSpec5 = createAssignmentMemberSpec("process5"); + final AssignmentMemberSpec memberSpec6 = createAssignmentMemberSpec("process6"); + Map members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3), + mkEntry("member4", memberSpec4), mkEntry("member5", memberSpec5), mkEntry("member6", memberSpec6)); + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(3, false) + ); + assertEquals(3, getAllActiveTaskIds(result, "member1", "member2", "member3", "member4", "member5", "member6").size()); + assertEquals(Sets.newSet(0, 1, 2), getActiveTasks(result, "test-subtopology", "member1", "member2", "member3", "member4", "member5", "member6")); + } + + /** Six single-member processes with numStandbyReplicas set to "1" and TopologyDescriberImpl(3, true). Asserts that for every member getAllStandbyTasks(...).size() plus getAllActiveTaskIds(...).size() equals 1. NOTE(review): the standby side calls getAllStandbyTasks rather than getAllStandbyTaskIds; if that helper returns a per-subtopology map, its size counts subtopologies rather than tasks - confirm this is intended. */ @Test + public void shouldBalanceActiveAndStandbyTasksAcrossAvailableClients() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1"); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2"); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3"); + final AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4"); + final AssignmentMemberSpec memberSpec5 = createAssignmentMemberSpec("process5"); + final AssignmentMemberSpec memberSpec6 = createAssignmentMemberSpec("process6"); + Map members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3), + mkEntry("member4", memberSpec4), mkEntry("member5", memberSpec5), mkEntry("member6", memberSpec6)); + final GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, + Collections.singletonList("test-subtopology"), + 
mkMap(mkEntry("numStandbyReplicas", "1"))), + new TopologyDescriberImpl(3, true) + ); + for (String memberId : result.members().keySet()) { + assertEquals(1, getAllStandbyTasks(result, memberId).size() + getAllActiveTaskIds(result, memberId).size()); + } + } + + /** process1 has one member, process2 has two, and the group lists four subtopologies with TopologyDescriberImpl(3, false). Asserts that the two process2 members together receive 8 active tasks and member1 receives 4. */ @Test + public void shouldAssignMoreTasksToClientWithMoreCapacity() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1"); + final AssignmentMemberSpec memberSpec21 = createAssignmentMemberSpec("process2"); + final AssignmentMemberSpec memberSpec22 = createAssignmentMemberSpec("process2"); + Map members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2_1", memberSpec21), mkEntry("member2_2", memberSpec22)); + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Arrays.asList("test-subtopology0", "test-subtopology1", "test-subtopology2", "test-subtopology3"), new HashMap<>()), + new TopologyDescriberImpl(3, false) + ); + assertEquals(8, getAllActiveTaskCount(result, "member2_1", "member2_2")); + assertEquals(4, getAllActiveTaskCount(result, "member1")); + } + + /** Four single-member processes without previous tasks, numStandbyReplicas set to "1" and TopologyDescriberImpl(4, true). Asserts that getAllTaskIds differs for every pair of distinct members. */ @Test + public void shouldNotHaveSameAssignmentOnAnyTwoHosts() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1"); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2"); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3"); + final AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4"); + final List allMemberIds = asList("member1", "member2", "member3", "member4"); + Map members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3), mkEntry("member4", memberSpec4)); + final GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, + Collections.singletonList("test-subtopology"), + mkMap(mkEntry("numStandbyReplicas", "1"))), + new TopologyDescriberImpl(4, true) + ); + for (final String memberId : allMemberIds) { + final 
List taskIds = getAllTaskIds(result, memberId); + for (final String otherMemberId : allMemberIds) { + if (!memberId.equals(otherMemberId)) { + assertNotEquals(taskIds, getAllTaskIds(result, otherMemberId)); + } + } + } + } + + /** Pairwise-distinct assignment check (getAllTaskIds must differ for every two distinct members) with numStandbyReplicas "1", starting from previous active tasks {1, 2} on member1, {3} on member2 and {0} on member3; member4 has no previous tasks. */ @Test + public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousActiveTasks() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Sets.newSet(1, 2))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Sets.newSet(3))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", Sets.newSet(0))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4"); + final List allMemberIds = asList("member1", "member2", "member3", "member4"); + Map members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3), mkEntry("member4", memberSpec4)); + final GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, + Collections.singletonList("test-subtopology"), + mkMap(mkEntry("numStandbyReplicas", "1"))), + new TopologyDescriberImpl(4, true) + ); + for (final String memberId : allMemberIds) { + final List taskIds = getAllTaskIds(result, memberId); + for (final String otherMemberId : allMemberIds) { + if (!memberId.equals(otherMemberId)) { + assertNotEquals(taskIds, getAllTaskIds(result, otherMemberId)); + } + } + } + } + + /** Pairwise-distinct assignment check (getAllTaskIds must differ for every two distinct members) with numStandbyReplicas "1", starting from member1 with previous active {1, 2} and standby {3, 0}, and member2 with previous active {3, 0} and standby {1, 2}; member3 and member4 have no previous tasks. */ @Test + public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousStandbyTasks() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", + mkMap(mkEntry("test-subtopology", Sets.newSet(1, 2))), mkMap(mkEntry("test-subtopology", Sets.newSet(3, 0)))); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", + 
mkMap(mkEntry("test-subtopology", Sets.newSet(3, 0))), mkMap(mkEntry("test-subtopology", Sets.newSet(1, 2)))); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3"); + final AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4"); + final List allMemberIds = asList("member1", "member2", "member3", "member4"); + Map members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3), mkEntry("member4", memberSpec4)); + final GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, + Collections.singletonList("test-subtopology"), + mkMap(mkEntry("numStandbyReplicas", "1"))), + new TopologyDescriberImpl(4, true) + ); + for (final String memberId : allMemberIds) { + final List taskIds = getAllTaskIds(result, memberId); + for (final String otherMemberId : allMemberIds) { + if (!memberId.equals(otherMemberId)) { + assertNotEquals(taskIds, getAllTaskIds(result, otherMemberId)); + } + } + } + } + + /** member3 previously owned tasks 0-3 and the three other single-member processes have no previous tasks. Asserts that each of the four members ends with exactly one active task. */ @Test + public void shouldReBalanceTasksAcrossAllClientsWhenCapacityAndTaskCountTheSame() { + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1, 2, 3))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1"); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2"); + final AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4"); + Map members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3), mkEntry("member4", memberSpec4)); + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(4, false) + ); + assertEquals(1, getAllActiveTaskCount(result, "member1")); + assertEquals(1, getAllActiveTaskCount(result, "member2")); + 
assertEquals(1, getAllActiveTaskCount(result, "member3")); + assertEquals(1, getAllActiveTaskCount(result, "member4")); + } + + /** member3 previously owned tasks 0-3 and is joined by member1 and member2, which have no previous tasks. Asserts that member1 and member2 end with one active task each and member3 ends with two. */ @Test + public void shouldReBalanceTasksAcrossClientsWhenCapacityLessThanTaskCount() { + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1, 2, 3))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1"); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2"); + Map members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3)); + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(4, false) + ); + assertEquals(1, getAllActiveTaskCount(result, "member1")); + assertEquals(1, getAllActiveTaskCount(result, "member2")); + assertEquals(2, getAllActiveTaskCount(result, "member3")); + } + + /** member2 (process2) previously owned tasks 0, 3 and 2, while process3 contributes two members without previous tasks. Asserts that member2 ends with one active task and the two process3 members with two in total. */ @Test + public void shouldRebalanceTasksToClientsBasedOnCapacity() { + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Sets.newSet(0, 3, 2))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec31 = createAssignmentMemberSpec("process3"); + final AssignmentMemberSpec memberSpec32 = createAssignmentMemberSpec("process3"); + Map members = mkMap( + mkEntry("member2", memberSpec2), mkEntry("member3_1", memberSpec31), mkEntry("member3_2", memberSpec32)); + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(3, false) + ); + assertEquals(1, getAllActiveTaskCount(result, "member2")); + assertEquals(2, getAllActiveTaskCount(result, "member3_1", "member3_2")); + } + + /** member1 previously owned {0, 2}, member2 owned {1, 3}, and member3 joins with no previous tasks. Asserts that member3 receives exactly one active task and that the old member it was not taken from still has exactly its previous tasks. removeAll mutates p1PrevTasks, which is only compared afterwards in the branch where nothing was removed from it. */ @Test + public void 
shouldMoveMinimalNumberOfTasksWhenPreviouslyAboveCapacityAndNewClientAdded() { + final Set p1PrevTasks = Sets.newSet(0, 2); + final Set p2PrevTasks = Sets.newSet(1, 3); + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", p1PrevTasks)), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", p2PrevTasks)), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3"); + final Map members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3)); + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(4, false) + ); + assertEquals(1, getAllActiveTaskCount(result, "member3")); + final List p3ActiveTasks = getAllActiveTaskIds(result, "member3"); + if (p1PrevTasks.removeAll(p3ActiveTasks)) { + assertEquals(p2PrevTasks, new HashSet<>(getAllActiveTaskIds(result, "member2"))); + } else { + assertEquals(p1PrevTasks, new HashSet<>(getAllActiveTaskIds(result, "member1"))); + } + } + + /** member1 previously owned {0, 1} and member2 owned {2, 3}; the topology describer is now built with (6, false). Asserts that both members still hold their previous tasks; where the additional tasks go is not asserted. */ @Test + public void shouldNotMoveAnyTasksWhenNewTasksAdded() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Sets.newSet(2, 3))), Collections.emptyMap()); + final Map members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2)); + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(6, false) + ); + final List mem1Tasks = getAllActiveTaskIds(result, "member1"); + 
assertTrue(mem1Tasks.contains(0)); + assertTrue(mem1Tasks.contains(1)); + final List mem2Tasks = getAllActiveTaskIds(result, "member2"); + assertTrue(mem2Tasks.contains(2)); + assertTrue(mem2Tasks.contains(3)); + } + + @Test + public void shouldAssignNewTasksToNewClientWhenPreviousTasksAssignedToOldClients() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Sets.newSet(2, 1))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Sets.newSet(0, 3))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3"); + final Map members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3)); + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(6, false) + ); + final List mem1Tasks = getAllActiveTaskIds(result, "member1"); + assertTrue(mem1Tasks.contains(2)); + assertTrue(mem1Tasks.contains(1)); + final List mem2Tasks = getAllActiveTaskIds(result, "member2"); + assertTrue(mem2Tasks.contains(0)); + assertTrue(mem2Tasks.contains(3)); + final List mem3Tasks = getAllActiveTaskIds(result, "member3"); + assertTrue(mem3Tasks.contains(4)); + assertTrue(mem3Tasks.contains(5)); + } + + @Test + public void shouldAssignTasksNotPreviouslyActiveToNewClient() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", + mkMap(mkEntry("test-subtopology0", Sets.newSet(1)), mkEntry("test-subtopology1", Sets.newSet(2, 3))), + mkMap(mkEntry("test-subtopology0", Sets.newSet(0)), mkEntry("test-subtopology1", Sets.newSet(1)), mkEntry("test-subtopology2", Sets.newSet(0, 1, 3)))); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", + mkMap(mkEntry("test-subtopology0", 
Sets.newSet(0)), mkEntry("test-subtopology1", Sets.newSet(1)), mkEntry("test-subtopology2", Sets.newSet(2))), + mkMap(mkEntry("test-subtopology0", Sets.newSet(1, 2, 3)), mkEntry("test-subtopology1", Sets.newSet(0, 2, 3)), mkEntry("test-subtopology2", Sets.newSet(0, 1, 3)))); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", + mkMap(mkEntry("test-subtopology2", Sets.newSet(0, 1, 3))), + mkMap(mkEntry("test-subtopology0", Sets.newSet(2)), mkEntry("test-subtopology1", Sets.newSet(2)))); + final AssignmentMemberSpec newMemberSpec = createAssignmentMemberSpec("process4", + Collections.emptyMap(), + mkMap(mkEntry("test-subtopology0", Sets.newSet(0, 1, 2, 3)), mkEntry("test-subtopology1", Sets.newSet(0, 1, 2, 3)), mkEntry("test-subtopology2", Sets.newSet(0, 1, 2, 3)))); + final Map members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3), mkEntry("newMember", newMemberSpec)); + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Arrays.asList("test-subtopology0", "test-subtopology1", "test-subtopology2"), new HashMap<>()), + new TopologyDescriberImpl(4, false) + ); + assertEquals(mkMap(mkEntry("test-subtopology0", Sets.newSet(1)), mkEntry("test-subtopology1", Sets.newSet(2, 3))), + getAllActiveTasks(result, "member1")); + assertEquals(mkMap(mkEntry("test-subtopology0", Sets.newSet(0)), mkEntry("test-subtopology1", Sets.newSet(1)), mkEntry("test-subtopology2", Sets.newSet(2))), + getAllActiveTasks(result, "member2")); + assertEquals(mkMap(mkEntry("test-subtopology2", Sets.newSet(0, 1, 3))), + getAllActiveTasks(result, "member3")); + assertEquals(mkMap(mkEntry("test-subtopology0", Sets.newSet(2, 3)), mkEntry("test-subtopology1", Sets.newSet(0))), + getAllActiveTasks(result, "newMember")); + } + + @Test + public void shouldAssignTasksNotPreviouslyActiveToMultipleNewClients() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", + 
mkMap(mkEntry("test-subtopology0", Sets.newSet(1)), mkEntry("test-subtopology1", Sets.newSet(2, 3))),
+            mkMap(mkEntry("test-subtopology0", Sets.newSet(0)), mkEntry("test-subtopology1", Sets.newSet(1)), mkEntry("test-subtopology2", Sets.newSet(0, 1, 3))));
+        final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2",
+            mkMap(mkEntry("test-subtopology0", Sets.newSet(0)), mkEntry("test-subtopology1", Sets.newSet(1)), mkEntry("test-subtopology2", Sets.newSet(2))),
+            mkMap(mkEntry("test-subtopology0", Sets.newSet(1, 2, 3)), mkEntry("test-subtopology1", Sets.newSet(0, 2, 3)), mkEntry("test-subtopology2", Sets.newSet(0, 1, 3))));
+        // Both "bounce" members have no previous active tasks, only previous standby tasks.
+        final AssignmentMemberSpec bounce1 = createAssignmentMemberSpec("bounce1",
+            Collections.emptyMap(),
+            mkMap(mkEntry("test-subtopology2", Sets.newSet(0, 1, 3))));
+        final AssignmentMemberSpec bounce2 = createAssignmentMemberSpec("bounce2",
+            Collections.emptyMap(),
+            mkMap(mkEntry("test-subtopology0", Sets.newSet(2, 3)), mkEntry("test-subtopology1", Sets.newSet(0))));
+        final Map<String, AssignmentMemberSpec> members = mkMap(
+            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("bounce_member1", bounce1), mkEntry("bounce_member2", bounce2));
+        GroupAssignment result = assignor.assign(
+            new GroupSpecImpl(members, Arrays.asList("test-subtopology0", "test-subtopology1", "test-subtopology2"), new HashMap<>()),
+            new TopologyDescriberImpl(4, false)
+        );
+        // Existing members are expected to keep exactly their previous active tasks.
+        assertEquals(mkMap(mkEntry("test-subtopology0", Sets.newSet(1)), mkEntry("test-subtopology1", Sets.newSet(2, 3))),
+            getAllActiveTasks(result, "member1"));
+        assertEquals(mkMap(mkEntry("test-subtopology0", Sets.newSet(0)), mkEntry("test-subtopology1", Sets.newSet(1)), mkEntry("test-subtopology2", Sets.newSet(2))),
+            getAllActiveTasks(result, "member2"));
+        // Each bounce member is expected to become active on exactly its previous standby tasks.
+        assertEquals(mkMap(mkEntry("test-subtopology2", Sets.newSet(0, 1, 3))),
+            getAllActiveTasks(result, "bounce_member1"));
+        assertEquals(mkMap(mkEntry("test-subtopology0", Sets.newSet(2, 3)), mkEntry("test-subtopology1",
Sets.newSet(0))), + getAllActiveTasks(result, "bounce_member2")); + } + + @Test + public void shouldAssignTasksToNewClient() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Sets.newSet(1, 2))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2"); + final Map members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2)); + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(2, false) + ); + assertEquals(1, getAllActiveTaskCount(result, "member1")); + } + + @Test + public void shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingClients() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1, 2))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Sets.newSet(3, 4, 5))), Collections.emptyMap()); + final AssignmentMemberSpec newMemberSpec = createAssignmentMemberSpec("process3"); + final Map members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("newMember", newMemberSpec)); + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(6, false) + ); + final List mem1Tasks = getAllActiveTaskIds(result, "member1"); + assertFalse(mem1Tasks.contains(3)); + assertFalse(mem1Tasks.contains(4)); + assertFalse(mem1Tasks.contains(5)); + assertEquals(2, mem1Tasks.size()); + final List mem2Tasks = getAllActiveTaskIds(result, "member2"); + assertFalse(mem2Tasks.contains(0)); + assertFalse(mem2Tasks.contains(1)); + assertFalse(mem2Tasks.contains(2)); + assertEquals(2, mem2Tasks.size()); + 
assertEquals(2, getAllActiveTaskIds(result, "newMember").size()); + } + + @Test + public void shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingAndBouncedClients() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1, 2, 6))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", Collections.emptyMap(), mkMap(mkEntry("test-subtopology", Sets.newSet(3, 4, 5)))); + final AssignmentMemberSpec newMemberSpec = createAssignmentMemberSpec("newProcess"); + final Map members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("newMember", newMemberSpec)); + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(7, false) + ); + final List mem1Tasks = getAllActiveTaskIds(result, "member1"); + assertFalse(mem1Tasks.contains(3)); + assertFalse(mem1Tasks.contains(4)); + assertFalse(mem1Tasks.contains(5)); + assertEquals(3, mem1Tasks.size()); + final List mem2Tasks = getAllActiveTaskIds(result, "member2"); + assertFalse(mem2Tasks.contains(0)); + assertFalse(mem2Tasks.contains(1)); + assertFalse(mem2Tasks.contains(2)); + assertEquals(2, mem2Tasks.size()); + assertEquals(2, getAllActiveTaskIds(result, "newMember").size()); + } + + private int getAllActiveTaskCount(GroupAssignment result, String... memberIds) { + int size = 0; + for (String memberId : memberIds) { + final MemberAssignment testMember = result.members().get(memberId); + assertNotNull(testMember); + assertNotNull(testMember.activeTasks()); + if (testMember.activeTasks().size() != 0) { + for (Map.Entry> entry : testMember.activeTasks().entrySet()) { + size += entry.getValue().size(); + } + } + } + return size; + } + + private Set getActiveTasks(GroupAssignment result, final String topologyId, String... 
memberIds) {
+        // Union of the active task ids the given members hold for the single subtopology topologyId.
+        Set<Integer> res = new HashSet<>();
+        for (String memberId : memberIds) {
+            final MemberAssignment testMember = result.members().get(memberId);
+            assertNotNull(testMember);
+            assertNotNull(testMember.activeTasks());
+            final Set<Integer> taskIds = testMember.activeTasks().get(topologyId);
+            if (taskIds != null) {
+                res.addAll(taskIds);
+            }
+        }
+        return res;
+    }
+
+    /**
+     * Collects the active task ids of the given members across all subtopologies.
+     * Ids are not de-duplicated, so the same id may appear once per subtopology and member.
+     */
+    private List<Integer> getAllActiveTaskIds(GroupAssignment result, String... memberIds) {
+        List<Integer> res = new ArrayList<>();
+        for (String memberId : memberIds) {
+            final MemberAssignment testMember = result.members().get(memberId);
+            assertNotNull(testMember);
+            assertNotNull(testMember.activeTasks());
+            for (Set<Integer> taskIds : testMember.activeTasks().values()) {
+                res.addAll(taskIds);
+            }
+        }
+        return res;
+    }
+
+    /** Collects the active task ids of every member in the assignment. */
+    private List<Integer> getAllActiveTaskIds(GroupAssignment result) {
+        return getAllActiveTaskIds(result, result.members().keySet().toArray(new String[0]));
+    }
+
+    /**
+     * Returns the active tasks of one member keyed by subtopology id, or a new empty map
+     * when the member has no active tasks.
+     */
+    private Map<String, Set<Integer>> getAllActiveTasks(GroupAssignment result, String memberId) {
+        final MemberAssignment testMember = result.members().get(memberId);
+        assertNotNull(testMember);
+        assertNotNull(testMember.activeTasks());
+        if (!testMember.activeTasks().isEmpty()) {
+            return testMember.activeTasks();
+        }
+        return new HashMap<>();
+    }
+
+    /**
+     * Returns the standby tasks of one member keyed by subtopology id, or a new empty map
+     * when the member has no standby tasks.
+     */
+    private Map<String, Set<Integer>> getAllStandbyTasks(GroupAssignment result, String memberId) {
+        final MemberAssignment testMember = result.members().get(memberId);
+        assertNotNull(testMember);
+        assertNotNull(testMember.standbyTasks());
+        if (!testMember.standbyTasks().isEmpty()) {
+            return testMember.standbyTasks();
+        }
+        return new HashMap<>();
+    }
+
+    private List<Integer> getAllStandbyTaskIds(GroupAssignment result, String...
memberIds) {
+        // Standby task ids of the given members across all subtopologies; ids are not de-duplicated.
+        List<Integer> res = new ArrayList<>();
+        for (String memberId : memberIds) {
+            final MemberAssignment testMember = result.members().get(memberId);
+            assertNotNull(testMember);
+            assertNotNull(testMember.standbyTasks());
+            for (Set<Integer> taskIds : testMember.standbyTasks().values()) {
+                res.addAll(taskIds);
+            }
+        }
+        return res;
+    }
+
+    /** Collects the standby task ids of every member in the assignment. */
+    private List<Integer> getAllStandbyTaskIds(GroupAssignment result) {
+        return getAllStandbyTaskIds(result, result.members().keySet().toArray(new String[0]));
+    }
+
+    /**
+     * Merges the active tasks of the given members into one map keyed by subtopology id,
+     * taking the union of task ids when several members hold tasks of the same subtopology.
+     */
+    private Map<String, Set<Integer>> mergeAllActiveTasks(GroupAssignment result, String... memberIds) {
+        Map<String, Set<Integer>> res = new HashMap<>();
+        for (String memberId : memberIds) {
+            Map<String, Set<Integer>> memberActiveTasks = getAllActiveTasks(result, memberId);
+            res = Stream.of(res, memberActiveTasks)
+                .flatMap(map -> map.entrySet().stream())
+                .collect(Collectors.toMap(
+                    Map.Entry::getKey,
+                    Map.Entry::getValue,
+                    (v1, v2) -> {
+                        // v1 may be a set owned by the assignment under test, so merge into a
+                        // fresh set instead of mutating it in place.
+                        final Set<Integer> merged = new HashSet<>(v1);
+                        merged.addAll(v2);
+                        return merged;
+                    }));
+        }
+        return res;
+    }
+
+    /** Collects the active task ids followed by the standby task ids of the given members. */
+    private List<Integer> getAllTaskIds(GroupAssignment result, String... memberIds) {
+        List<Integer> res = new ArrayList<>(getAllActiveTaskIds(result, memberIds));
+        res.addAll(getAllStandbyTaskIds(result, memberIds));
+        return res;
+    }
+
+    private Map<String, Set<Integer>> mergeAllStandbyTasks(GroupAssignment result, String...
memberIds) { + Map> res = new HashMap<>(); + for (String memberId : memberIds) { + Map> memberStandbyTasks = getAllStandbyTasks(result, memberId); + res = Stream.of(res, memberStandbyTasks) + .flatMap(map -> map.entrySet().stream()) + .collect(Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue, + (v1, v2) -> { + v1.addAll(v2); + return new HashSet<>(v1); + })); + } + return res; + } + + private Map> mergeAllStandbyTasks(GroupAssignment result) { + String[] memberIds = new String[result.members().size()]; + return mergeAllStandbyTasks(result, result.members().keySet().toArray(memberIds)); + } + + private AssignmentMemberSpec createAssignmentMemberSpec(final String processId) { + return new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + processId, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap()); + } + + private AssignmentMemberSpec createAssignmentMemberSpec(final String processId, final Map> prevActiveTasks, + final Map> prevStandbyTasks) { + return new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + prevActiveTasks, + prevStandbyTasks, + Collections.emptyMap(), + processId, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap()); + } + + static class TopologyDescriberImpl implements TopologyDescriber { + final int numTasks; + final boolean isStateful; + + TopologyDescriberImpl(int numTasks, boolean isStateful) { + this.numTasks = numTasks; + this.isStateful = isStateful; + } + + @Override + public List subtopologies() { + return List.of(); + } + + @Override + public int numTasks(String subtopologyId) throws NoSuchElementException { + return numTasks; + } + + @Override + public boolean isStateful(String subtopologyId) { + return isStateful; + } + } + + static class TopologyDescriberImpl2 implements TopologyDescriber { + + @Override + public List subtopologies() { + return List.of(); + } + + 
@Override
+        public int numTasks(String subtopologyId) throws NoSuchElementException {
+            // "test-subtopology1" reports six tasks; every other subtopology id reports one.
+            return subtopologyId.equals("test-subtopology1") ? 6 : 1;
+        }
+
+        @Override
+        public boolean isStateful(String subtopologyId) {
+            // Every subtopology is reported as stateless.
+            return false;
+        }
+    }
+}