Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Option to set max tasks per instance per datastream #677

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
*/
package com.linkedin.datastream.server.assignment;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;

import org.slf4j.Logger;
Expand All @@ -22,6 +24,7 @@
import com.linkedin.datastream.server.api.strategy.AssignmentStrategy;

import static com.linkedin.datastream.server.assignment.BroadcastStrategyFactory.CFG_MAX_TASKS;
import static com.linkedin.datastream.server.assignment.BroadcastStrategyFactory.CFG_MAX_TASKS_PER_INSTANCE;


/**
Expand All @@ -30,13 +33,17 @@
* number of instances, so each instance could process multiple tasks for the same Datastream. If "maxTasks" is not
* provided, the strategy will broadcast one task to each of the instances in the cluster.
*
* The "maxTasksPerInstance" setting limits the number of tasks per datastream.
* If "maxTasksPerInstance" is not provided, the strategy will not limit the number of tasks per instance
*
* All the tasks are redistributed across all the instances equally.
*/
public class BroadcastStrategy implements AssignmentStrategy {

private static final Logger LOG = LoggerFactory.getLogger(BroadcastStrategy.class.getName());

private final Optional<Integer> _maxTasks;
private final Optional<Integer> _maxTasksPerInstance;

/**
* Constructor for BroadcastStrategy
Expand All @@ -47,6 +54,21 @@ public class BroadcastStrategy implements AssignmentStrategy {
*/
public BroadcastStrategy(Optional<Integer> maxTasks) {
_maxTasks = maxTasks;
_maxTasksPerInstance = Optional.empty();
}

/**
* Constructor for BroadcastStrategy
* @param maxTasks Maximum number of {@link DatastreamTask}s to create out
* of any {@link com.linkedin.datastream.common.Datastream}
* if no value is specified for the "maxTasks" config property
* at an individual datastream level.
* @param maxTasksPerInstance Maximum number of {@link DatastreamTask}s per instance to create out
* of any {@link com.linkedin.datastream.common.Datastream}
*/
public BroadcastStrategy(Optional<Integer> maxTasks, Optional<Integer> maxTasksPerInstance) {
_maxTasks = maxTasks;
_maxTasksPerInstance = maxTasksPerInstance;
}

@Override
Expand All @@ -55,9 +77,15 @@ public Map<String, Set<DatastreamTask>> assign(List<DatastreamGroup> datastreams

int totalAssignedTasks = currentAssignment.values().stream().mapToInt(Set::size).sum();
LOG.info("Assigning {} datastreams to {} instances with {} tasks", datastreams.size(), instances.size(),
totalAssignedTasks);
totalAssignedTasks);

// if there are no live instances, return empty assignment
if (instances.isEmpty()) {
return new HashMap<>();
}

Map<String, Set<DatastreamTask>> newAssignment = new HashMap<>();
Integer[] assignmentCountForDatastream = new Integer[instances.size()];

// Make a copy of the current assignment, since the strategy modifies it during calculation
Map<String, Set<DatastreamTask>> currentAssignmentCopy = new HashMap<>(currentAssignment.size());
Expand All @@ -71,6 +99,11 @@ public Map<String, Set<DatastreamTask>> assign(List<DatastreamGroup> datastreams
int instancePos = 0;
for (DatastreamGroup dg : datastreams) {
int numTasks = getNumTasks(dg, instances.size());
Optional<Integer> maxTasksPerInstance = getMaxNumTasksPerInstance(dg);

// initialize the assignment counts on each datastream
Arrays.fill(assignmentCountForDatastream, 0);

for (int taskPos = 0; taskPos < numTasks; taskPos++) {
String instance = instances.get(instancePos);

Expand All @@ -82,9 +115,15 @@ public Map<String, Set<DatastreamTask>> assign(List<DatastreamGroup> datastreams

currentAssignmentCopy.get(instance).remove(foundDatastreamTask);
newAssignment.get(instance).add(foundDatastreamTask);

// Move to the next instance
instancePos = (instancePos + 1) % instances.size();
assignmentCountForDatastream[instancePos]++;

// Move to the next instance or the next datastream if there is no available capacity
Optional<Integer> nextPos = getNextInstanceWithCapacity(instances, assignmentCountForDatastream, maxTasksPerInstance, instancePos);
if (nextPos.isPresent()) {
instancePos = nextPos.get();
} else {
break;
}
}
}

Expand All @@ -93,6 +132,21 @@ public Map<String, Set<DatastreamTask>> assign(List<DatastreamGroup> datastreams
return newAssignment;
}

private Optional<Integer> getNextInstanceWithCapacity(List<String> instances, Integer[] assignmentCountForDatastream,
Optional<Integer> maxTasksPerInstance, int prevPos) {
int pos = (prevPos + 1) % instances.size();

if (!maxTasksPerInstance.isPresent()) {
return Optional.of(pos);
}

while (assignmentCountForDatastream[pos] >= maxTasksPerInstance.get() && pos != prevPos) {
pos = (pos + 1) % instances.size();
}

return assignmentCountForDatastream[pos] < maxTasksPerInstance.get() ? Optional.of(pos) : Optional.empty();
}

private int getNumTasks(DatastreamGroup dg, int numInstances) {
// Look for an override in any of the datastream. In the case of multiple overrides, select the largest.
// If no override is present then use the default "_maxTasks" from config.
Expand All @@ -105,4 +159,16 @@ private int getNumTasks(DatastreamGroup dg, int numInstances) {
.max()
.orElse(_maxTasks.orElse(numInstances));
}

private Optional<Integer> getMaxNumTasksPerInstance(DatastreamGroup dg) {
// Look for an override in any of the datastream. In the case of multiple overrides, select the largest.
// If no override is present then use the default "_maxTasksPerInstance" from config.
OptionalInt overrideValue = dg.getDatastreams()
.stream()
.map(ds -> ds.getMetadata().get(CFG_MAX_TASKS_PER_INSTANCE))
.filter(Objects::nonNull)
.mapToInt(Integer::valueOf)
.max();
return overrideValue.isPresent() ? Optional.of(overrideValue.getAsInt()) : _maxTasksPerInstance;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@
public class BroadcastStrategyFactory implements AssignmentStrategyFactory {
// the number of datastream tasks to create for a datastream
public static final String CFG_MAX_TASKS = "maxTasks";
public static final String CFG_MAX_TASKS_PER_INSTANCE = "maxTasksPerInstance";

@Override
public AssignmentStrategy createStrategy(Properties assignmentStrategyProperties) {
VerifiableProperties props = new VerifiableProperties(assignmentStrategyProperties);
int cfgMaxTasks = props.getInt(CFG_MAX_TASKS, Integer.MIN_VALUE);
Optional<Integer> maxTasks = cfgMaxTasks > 0 ? Optional.of(cfgMaxTasks) : Optional.empty();
return new BroadcastStrategy(maxTasks);

int cfgMaxTasksPerInstance = props.getInt(CFG_MAX_TASKS_PER_INSTANCE, Integer.MIN_VALUE);
Optional<Integer> maxTasksPerInstance = cfgMaxTasksPerInstance > 0 ? Optional.of(cfgMaxTasksPerInstance) : Optional.empty();

return maxTasksPerInstance.isPresent() ? new BroadcastStrategy(maxTasks, maxTasksPerInstance) : new BroadcastStrategy(maxTasks);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.linkedin.datastream.testutil.DatastreamTestUtils;

import static com.linkedin.datastream.server.assignment.BroadcastStrategyFactory.CFG_MAX_TASKS;
import static com.linkedin.datastream.server.assignment.BroadcastStrategyFactory.CFG_MAX_TASKS_PER_INSTANCE;


/**
Expand Down Expand Up @@ -106,6 +107,42 @@ private void doTestMaxTasks(BroadcastStrategy strategy, int numInstances, int ex
Assert.assertEquals(totalTasks, expectedTotalTasks);
}

@Test
public void testMaxTasksPerInstance() {
int numDatastreams = 10;
int numInstances = 20;
int maxTasks = 400;
int maxTasksPerInstance = 2;
int expectedTotalTasks = numInstances * maxTasksPerInstance * numDatastreams;
List<DatastreamGroup> datastreams = generateDatastreams("ds", numDatastreams);
doTestMaxTasksPerInstance(new BroadcastStrategy(Optional.of(maxTasks), Optional.of(maxTasksPerInstance)), numInstances, expectedTotalTasks, datastreams);
}

@Test
public void testMaxTasksPerInstanceDatastreamOverride() {
int numDatastreams = 25;
int numInstances = 4;
int maxTasks = 400;
int maxTasksPerInstance = 2;
List<DatastreamGroup> datastreams = generateDatastreams("ds", numDatastreams);
datastreams.get(0).getDatastreams().get(0).getMetadata().put(CFG_MAX_TASKS_PER_INSTANCE, "4");
int expectedTotalTasks = (numInstances * maxTasksPerInstance * (numDatastreams - 1)) + (numInstances * 4);
doTestMaxTasksPerInstance(new BroadcastStrategy(Optional.of(maxTasks), Optional.of(maxTasksPerInstance)), numInstances, expectedTotalTasks, datastreams);
}

private void doTestMaxTasksPerInstance(BroadcastStrategy strategy, int numInstances, int expectedTotalTasks,
List<DatastreamGroup> datastreams) {
String[] instances = IntStream.range(0, numInstances).mapToObj(x -> "instance" + x).toArray(String[]::new);
Map<String, Set<DatastreamTask>> assignment =
strategy.assign(datastreams, Arrays.asList(instances), new HashMap<>());

int totalTasks = 0;
for (String instance : instances) {
totalTasks += assignment.get(instance).size();
}
Assert.assertEquals(totalTasks, expectedTotalTasks);
}

private List<DatastreamGroup> generateDatastreams(String namePrefix, int numberOfDatastreams) {
List<DatastreamGroup> datastreams = new ArrayList<>();
String type = DummyConnector.CONNECTOR_TYPE;
Expand Down