Use of LongAdder in StatisticsCollector #140

Open
dfa1 opened this issue Mar 10, 2024 · 3 comments

@dfa1
Contributor

dfa1 commented Mar 10, 2024

According to the LongAdder Javadoc:

This class is usually preferable to AtomicLong when multiple threads update a common sum that is used for purposes such as collecting statistics, not for fine-grained synchronization control. Under low update contention, the two classes have similar characteristics. But under high contention, expected throughput of this class is significantly higher, at the expense of higher space consumption.

I tried to update the StatisticsCollector to use LongAdder, but it is not really possible to make it very efficient,
because StatisticsCollector uses an increment-and-get pattern.

Example:

    @Override
    public <K> long incrementLoadCount(IncrementLoadCountStatisticsContext<K> context) {
        return loadCount.incrementAndGet();
    }

With LongAdder it would be something like:

    @Override
    public <K> long incrementLoadCount(IncrementLoadCountStatisticsContext<K> context) {
        loadCount.increment();
        return loadCount.sum(); // this could be more expensive
    }

sum() runs on every call, which could slow things down a bit because it has to add up all of the adder's internal cells. Luckily the return value is never used, at least internally.
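Besides the cost, there is a semantic difference worth keeping in mind: `increment()` followed by `sum()` is not a single atomic step, so the value returned may already include increments from other threads. The sketch below illustrates this with plain JDK classes only; `IncrementThenSumDemo` and `observe` are hypothetical names made up for this example and are not part of java-dataloader.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.LongSupplier;

// Hypothetical demo class, not part of java-dataloader.
final class IncrementThenSumDemo {

    /**
     * Starts {@code threads} workers that each call {@code incrementAndRead}
     * {@code perThread} times, and returns the distinct values they observed.
     */
    static Set<Long> observe(LongSupplier incrementAndRead, int threads, int perThread) {
        Set<Long> seen = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    seen.add(incrementAndRead.getAsLong());
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        int threads = 4;
        int perThread = 50_000;
        int total = threads * perThread;

        // incrementAndGet() is atomic: every call returns a unique value.
        AtomicLong atomic = new AtomicLong();
        Set<Long> atomicSeen = observe(atomic::incrementAndGet, threads, perThread);

        // increment() + sum() are two separate steps: returned values may
        // repeat or skip, although the final total is still exact.
        LongAdder adder = new LongAdder();
        Set<Long> adderSeen = observe(() -> {
            adder.increment();
            return adder.sum();
        }, threads, perThread);

        System.out.println("AtomicLong distinct values: " + atomicSeen.size() + " of " + total);
        System.out.println("LongAdder distinct values:  " + adderSeen.size() + " of " + total);
        System.out.println("final totals: " + atomic.get() + " / " + adder.sum());
    }
}
```

With AtomicLong the number of distinct values always equals the number of calls. With LongAdder it is typically lower under contention, which should be harmless here as long as callers ignore the return value.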

I'm aware that changing all methods to return void is a breaking change, but in theory it could greatly improve performance under high contention (i.e. many threads updating statistics).

In case you're interested, here is the implementation:

/**
 * This simple collector uses {@link java.util.concurrent.atomic.LongAdder}s to collect
 * statistics
 *
 * @see org.dataloader.stats.StatisticsCollector
 */
public class ScalableStatisticsCollector implements StatisticsCollector {
    private final LongAdder loadCount = new LongAdder();
    private final LongAdder batchInvokeCount = new LongAdder();
    private final LongAdder batchLoadCount = new LongAdder();
    private final LongAdder cacheHitCount = new LongAdder();
    private final LongAdder batchLoadExceptionCount = new LongAdder();
    private final LongAdder loadErrorCount = new LongAdder();

    @Override
    public <K> long incrementLoadCount(IncrementLoadCountStatisticsContext<K> context) {
        loadCount.increment();
        return loadCount.sum();
    }

    @Deprecated
    @Override
    public long incrementLoadCount() {
        return incrementLoadCount(null);
    }

    @Override
    public <K> long incrementLoadErrorCount(IncrementLoadErrorCountStatisticsContext<K> context) {
        loadErrorCount.increment();
        return loadErrorCount.sum();
    }

    @Deprecated
    @Override
    public long incrementLoadErrorCount() {
        return incrementLoadErrorCount(null);
    }

    @Override
    public <K> long incrementBatchLoadCountBy(long delta, IncrementBatchLoadCountByStatisticsContext<K> context) {
        batchInvokeCount.increment();
        batchLoadCount.add(delta);
        return batchLoadCount.sum();
    }

    @Deprecated
    @Override
    public long incrementBatchLoadCountBy(long delta) {
        return incrementBatchLoadCountBy(delta, null);
    }

    @Override
    public <K> long incrementBatchLoadExceptionCount(IncrementBatchLoadExceptionCountStatisticsContext<K> context) {
        batchLoadExceptionCount.increment();
        return batchLoadExceptionCount.sum();
    }

    @Deprecated
    @Override
    public long incrementBatchLoadExceptionCount() {
        return incrementBatchLoadExceptionCount(null);
    }

    @Override
    public <K> long incrementCacheHitCount(IncrementCacheHitCountStatisticsContext<K> context) {
        cacheHitCount.increment();
        return cacheHitCount.sum();
    }

    @Deprecated
    @Override
    public long incrementCacheHitCount() {
        return incrementCacheHitCount(null);
    }

    @Override
    public Statistics getStatistics() {
        return new Statistics(loadCount.sum(), loadErrorCount.sum(), batchInvokeCount.sum(), batchLoadCount.sum(), batchLoadExceptionCount.sum(), cacheHitCount.sum());
    }

    @Override
    public String toString() {
        return getStatistics().toString();
    }
}

@dfa1
Contributor Author

dfa1 commented Mar 17, 2024

Attaching a simple benchmark that tries to reproduce a very busy thread pool used for batching:

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

public class AtomicVsAdder {

    private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool();

    public static void main(final String[] args) throws ExecutionException, InterruptedException {
        // knobs
        final Strategy strategy = new LongAdderStrategy();
        final long iterations = 100L * 1000 * 1000;

        // test
        System.out.println("testing with #cpu=" + Runtime.getRuntime().availableProcessors());
        for (final int nThreads : List.of(1, 2, 4, 8, 16)) {
            // fresh list per round, so futures completed in earlier rounds are not awaited again
            final List<Future<?>> futures = new ArrayList<>();
            System.out.println("start test with " + nThreads + " thread");
            final long start = System.nanoTime();

            for (int i = 0; i < nThreads; i++) {
                Future<?> submit = EXECUTOR.submit(() -> concurrentWork(strategy, iterations));
                futures.add(submit);
            }
            for (final Future<?> future : futures) {
                future.get(); // wait for all
            }
            final long end = System.nanoTime();
            System.out.println("done in " + Duration.ofNanos(end - start).toMillis() + "ms => result " + strategy.get());
            strategy.reset();
        }
        System.out.println("the end");
        EXECUTOR.shutdownNow();
    }

    @SuppressWarnings("SameParameterValue")
    private static void concurrentWork(final Strategy strategy, final long iterations) {
        long work = iterations;
        while (work-- > 0) {
            strategy.increment();
        }
    }

    interface Strategy {
        void increment();

        long get();

        void reset();
    }

    static class LongAdderStrategy implements Strategy {

        private LongAdder longAdder = new LongAdder();

        @Override
        public void increment() {
            longAdder.increment();
        }

        @Override
        public long get() {
            return longAdder.sum();
        }

        @Override
        public void reset() {
            longAdder = new LongAdder();
        }
    }

    static class AtomicLongStrategy implements Strategy {

        private final AtomicLong atomicLong = new AtomicLong(0);

        @Override
        public void increment() {
            atomicLong.incrementAndGet();
        }

        @Override
        public long get() {
            return atomicLong.get();
        }

        @Override
        public void reset() {
            atomicLong.set(0);
        }
    }

}

On my workstation I get the following numbers for AtomicLong:

testing with #cpu=4
start test with 1 thread
done in 702ms => result 100000000
start test with 2 thread
done in 3819ms => result 200000000
start test with 4 thread
done in 7603ms => result 400000000
start test with 8 thread
done in 15245ms => result 800000000
start test with 16 thread
done in 30556ms => result 1600000000
the end

and the following for LongAdder:

testing with #cpu=4
start test with 1 thread
done in 1105ms => result 100000000
start test with 2 thread
done in 1066ms => result 200000000
start test with 4 thread
done in 1095ms => result 400000000
start test with 8 thread
done in 2190ms => result 800000000
start test with 16 thread
done in 4443ms => result 1600000000
the end

Basically, LongAdder outperforms AtomicLong when nThreads >= 2 (with a single thread, AtomicLong is still faster: 702ms vs 1105ms). It would be nice to have another StatisticsCollector class using LongAdder to be used out of the box.
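To make the comparison easier to read, the timings reported above can be turned into ratios (AtomicLong time divided by LongAdder time, so values above 1 mean LongAdder was faster). This is only arithmetic on the numbers already posted from a single 4-CPU workstation; `SpeedupTable` is a hypothetical helper name used for this sketch.

```java
import java.util.Locale;

// Hypothetical helper: only does arithmetic on the timings posted above.
final class SpeedupTable {

    static final int[] THREADS = {1, 2, 4, 8, 16};
    // Milliseconds copied from the two benchmark runs above.
    static final long[] ATOMIC_LONG_MS = {702, 3819, 7603, 15245, 30556};
    static final long[] LONG_ADDER_MS = {1105, 1066, 1095, 2190, 4443};

    /** Ratio AtomicLong time / LongAdder time for the i-th thread count. */
    static double speedup(int i) {
        return (double) ATOMIC_LONG_MS[i] / LONG_ADDER_MS[i];
    }

    public static void main(String[] args) {
        for (int i = 0; i < THREADS.length; i++) {
            System.out.printf(Locale.ROOT,
                    "%2d thread(s): AtomicLong %6d ms, LongAdder %5d ms, ratio %.2f%n",
                    THREADS[i], ATOMIC_LONG_MS[i], LONG_ADDER_MS[i], speedup(i));
        }
    }
}
```

On these numbers the ratio is below 1 for a single thread, roughly 3.6 at 2 threads, and close to 7 from 4 threads upwards. A harness such as JMH would likely give more trustworthy figures than a hand-rolled loop, so these ratios are best read as indicative only.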

@bbakerman @dondonz what do you think?

@bbakerman
Member

It would be nice to have another StatisticsCollector class using LongAdder to be used out of the box.

By all means add a new version that breaks the contract and does not return the value.

By the way, NoOpStatisticsCollector is used by default, right? So people have to opt into any of this anyway.
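For illustration, a collector that drops the return values could look roughly like the sketch below. The interface `VoidStatisticsCollector` and the class `LongAdderVoidCollector` are hypothetical names invented for this example. They are not part of java-dataloader, and the context parameters of the real StatisticsCollector methods are left out to keep the sketch short.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical write-only contract; NOT the real StatisticsCollector interface. */
interface VoidStatisticsCollector {
    void incrementLoadCount();

    void incrementBatchLoadCountBy(long delta);

    long loadCount();

    long batchInvokeCount();

    long batchLoadCount();
}

/** Hypothetical LongAdder-backed implementation of the contract above. */
final class LongAdderVoidCollector implements VoidStatisticsCollector {
    private final LongAdder loadCount = new LongAdder();
    private final LongAdder batchInvokeCount = new LongAdder();
    private final LongAdder batchLoadCount = new LongAdder();

    @Override
    public void incrementLoadCount() {
        // No sum() on the hot path: the write never has to read all cells.
        loadCount.increment();
    }

    @Override
    public void incrementBatchLoadCountBy(long delta) {
        // Mirrors the posted implementation: one invocation, delta loads.
        batchInvokeCount.increment();
        batchLoadCount.add(delta);
    }

    // sum() is only paid when somebody actually asks for the statistics.
    @Override
    public long loadCount() {
        return loadCount.sum();
    }

    @Override
    public long batchInvokeCount() {
        return batchInvokeCount.sum();
    }

    @Override
    public long batchLoadCount() {
        return batchLoadCount.sum();
    }
}
```

The design choice is simply to move the cost of `sum()` from every increment to the (presumably much rarer) moment when statistics are read.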

github-actions bot commented Jan 2, 2025

Hello, this issue has been inactive for 60 days, so we're marking it as stale. If you would like to continue this discussion, please comment within the next 30 days or we'll close the issue.

@github-actions github-actions bot added the Stale label Jan 2, 2025