Skip to content

Commit

Permalink
Optimize the merging of operator stats
Browse files Browse the repository at this point in the history
  • Loading branch information
shangm2 committed Jan 22, 2025
1 parent 07b9896 commit dfcc4ca
Showing 1 changed file with 8 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,8 @@ public static StageExecutionInfo create(
{
TaskStatsAggregator taskStatsAggregator = new TaskStatsAggregator();
taskStatsAggregator.increaseTotalTaskCount(taskInfos.size());

taskStatsAggregator.mergeRuntimeStats(stageRuntimeStats);

List<TaskStats> allTaskStats = new ArrayList<>();

for (TaskInfo taskInfo : taskInfos) {
TaskState taskState = taskInfo.getTaskStatus().getState();
if (taskState.isDone()) {
Expand All @@ -88,7 +85,6 @@ public static StageExecutionInfo create(
}

TaskStats taskStats = taskInfo.getStats();
allTaskStats.add(taskStats);

if (state == FINISHED && taskInfo.getTaskStatus().getState() == TaskState.FAILED) {
taskStatsAggregator.increaseRetriedCpuTime(taskStats.getTotalCpuTimeInNanos());
Expand All @@ -100,9 +96,6 @@ public static StageExecutionInfo create(
}

taskStatsAggregator.increaseBufferedDataSize(taskInfo.getOutputBuffers().getTotalBufferedBytes());
}

for (TaskStats taskStats : allTaskStats) {
taskStatsAggregator.processTaskStats(taskStats);
}

Expand Down Expand Up @@ -221,13 +214,13 @@ public static StageExecutionInfo unscheduledExecutionInfo(int stageId, boolean i
private static class TaskStatsAggregator
{
private int totalTasks;
int runningTasks = 0;
int completedTasks = 0;
long retriedCpuTime = 0;
long bufferedDataSize = 0;
private int runningTasks;
private int completedTasks;
private long retriedCpuTime;
private long bufferedDataSize;

boolean fullyBlocked = true;
Set<BlockedReason> blockedReasons = new HashSet<>();
private boolean fullyBlocked = true;
private final Set<BlockedReason> blockedReasons = new HashSet<>();

private int totalDrivers;
private int queuedDrivers;
Expand Down Expand Up @@ -263,8 +256,8 @@ private static class TaskStatsAggregator
private int maxFullGcSec;
private int totalFullGcSec;

Map<String, OperatorStats> operatorToStats = new HashMap<>();
RuntimeStats mergedRuntimeStats = new RuntimeStats();
private final Map<String, OperatorStats> operatorToStats = new HashMap<>();
private final RuntimeStats mergedRuntimeStats = new RuntimeStats();

public void processTaskStats(TaskStats taskStats)
{
Expand Down

0 comments on commit dfcc4ca

Please sign in to comment.