diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index f75ddae460..77088462a7 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -348,6 +348,16 @@ static void setWriteBufferSize(RaftProperties properties, SizeInBytes writeBuffe
       setSizeInBytes(properties::set, WRITE_BUFFER_SIZE_KEY, writeBufferSize);
     }
 
+    String BYTE_ARRAY_SHARE_KEY = PREFIX + ".byte-array.share";
+    boolean BYTE_ARRAY_SHARE_DEFAULT = true;
+    static boolean byteArrayShare(RaftProperties properties) {
+      return getBoolean(properties::getBoolean,
+          BYTE_ARRAY_SHARE_KEY, BYTE_ARRAY_SHARE_DEFAULT, getDefaultLog());
+    }
+    static void setByteArrayShare(RaftProperties properties, boolean isShared) {
+      setBoolean(properties::setBoolean, BYTE_ARRAY_SHARE_KEY, isShared);
+    }
+
     String FORCE_SYNC_NUM_KEY = PREFIX + ".force.sync.num";
     int FORCE_SYNC_NUM_DEFAULT = 128;
     static int forceSyncNum(RaftProperties properties) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
index e0fd41fbdf..4cb36392be 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
@@ -34,7 +34,7 @@
 import java.nio.channels.FileChannel;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.function.Supplier;
+import java.util.function.IntFunction;
 import java.util.zip.Checksum;
 
 public class SegmentedRaftLogOutputStream implements Closeable {
@@ -53,7 +53,7 @@ public class SegmentedRaftLogOutputStream implements Closeable {
   private final File file;
   private final BufferedWriteChannel out; // buffered FileChannel for writing
   private final Checksum checksum;
-  private final Supplier<byte[]> sharedBuffer;
+  private final IntFunction<byte[]> byteArrayGet;
 
   private final long segmentMaxSize;
   private final long preallocatedSize;
@@ -65,13 +65,13 @@ public SegmentedRaftLogOutputStream(File file, boolean append, long segmentMaxSi
   }
 
   SegmentedRaftLogOutputStream(File file, boolean append, long segmentMaxSize,
-      long preallocatedSize, ByteBuffer byteBuffer, Supplier<byte[]> sharedBuffer)
+      long preallocatedSize, ByteBuffer byteBuffer, IntFunction<byte[]> byteArrayGet)
       throws IOException {
     this.file = file;
     this.checksum = new PureJavaCrc32C();
     this.segmentMaxSize = segmentMaxSize;
     this.preallocatedSize = preallocatedSize;
-    this.sharedBuffer = sharedBuffer;
+    this.byteArrayGet = byteArrayGet;
     this.out = BufferedWriteChannel.open(file, append, byteBuffer);
 
     if (!append) {
@@ -98,7 +98,7 @@ public void write(LogEntryProto entry) throws IOException {
     final int serialized = entry.getSerializedSize();
     final int proto = CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized;
     final int total = proto + 4; // proto and 4-byte checksum
-    final byte[] buf = sharedBuffer != null? sharedBuffer.get(): new byte[total];
+    final byte[] buf = byteArrayGet != null? byteArrayGet.apply(total): new byte[total];
     Preconditions.assertTrue(total <= buf.length, () -> "total = " + total + " > buf.length " + buf.length);
     preallocateIfNecessary(total);
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 7bc673a79a..3ff18afb9e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -19,7 +19,6 @@
 
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Timer;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
 import org.apache.ratis.protocol.ClientInvocationId;
@@ -51,6 +50,7 @@
 import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
@@ -132,6 +132,66 @@ synchronized void updateIndex(long i) {
     }
   }
 
+  /**
+   * Provides the byte arrays used for serializing log entries.
+   * If {@code shared} is false, each {@link #get(int)} call allocates a new array of the requested size.
+   * If {@code shared} is true, a single array is cached and returned again by later calls;
+   * it is replaced by a larger array, with length capped at {@code sizeLimit}, only when it is too small.
+   */
+  static class ByteArray {
+    private final boolean shared;
+    private final int sizeLimit;
+    private volatile byte[] bytes;
+
+    ByteArray(boolean shared, int sizeLimit) {
+      this.shared = shared;
+      this.sizeLimit = sizeLimit;
+    }
+
+    /** @return a non-null array of length at least the given size; the array may be reused in shared mode. */
+    byte[] get(int size) {
+      final byte[] byteArray = getImpl(size);
+      Preconditions.assertNotNull(byteArray, "byteArray");
+      Preconditions.assertTrue(byteArray.length >= size);
+      return byteArray;
+    }
+
+    private byte[] getImpl(int size) {
+      if (!shared) {
+        return new byte[size];
+      }
+
+      // Fast path: read the volatile field once without locking.
+      final byte[] existing = bytes;
+      if (existing != null && existing.length >= size) {
+        return existing;
+      }
+
+      synchronized (this) {
+        // Check again under the lock since another thread may have already replaced the array.
+        if (bytes != null && bytes.length >= size) {
+          return bytes;
+        }
+        bytes = new byte[Math.min(sizeLimit, roundUpPowerOfTwo(size))];
+        return bytes;
+      }
+    }
+
+    /**
+     * Round up the given non-negative n to a power of two; zero and powers of two are returned as-is.
+     * The result is capped at {@link Integer#MAX_VALUE} since 2^31 cannot be represented by an int.
+     */
+    static int roundUpPowerOfTwo(int n) {
+      final long highestOne = Integer.highestOneBit(n);
+      if (highestOne == n) {
+        return n; // n is a power of two.
+      }
+      return highestOne >= (1 << 30)? Integer.MAX_VALUE: Math.toIntExact(highestOne << 1);
+    }
+  }
+
+  private static final AtomicInteger COUNT = new AtomicInteger();
+
   private final Consumer infoIndexChange = s -> LOG.info("{}: {}", this, s);
   private final Consumer traceIndexChange = s -> LOG.trace("{}: {}", this, s);
 
@@ -154,7 +214,7 @@ synchronized void updateIndex(long i) {
   private final Timer raftLogEnqueueingDelayTimer;
   private final SegmentedRaftLogMetrics raftLogMetrics;
   private final ByteBuffer writeBuffer;
-  private final Supplier<byte[]> sharedBuffer;
+  private final ByteArray byteArray;
 
   /**
    * The number of entries that have been written into the SegmentedRaftLogOutputStream but
@@ -185,7 +245,7 @@ synchronized void updateIndex(long i) {
   SegmentedRaftLogWorker(RaftGroupMemberId memberId, StateMachine stateMachine, Runnable submitUpdateCommitEvent,
                          RaftServer.Division server, RaftStorage storage, RaftProperties properties,
                          SegmentedRaftLogMetrics metricRegistry) {
-    this.name = memberId + "-" + JavaUtils.getClassSimpleName(getClass());
+    this.name = memberId + "-" + JavaUtils.getClassSimpleName(getClass()) + COUNT.getAndIncrement();
     LOG.info("new {} for {}", name, storage);
 
     this.submitUpdateCommitEvent = submitUpdateCommitEvent;
@@ -219,8 +279,9 @@ synchronized void updateIndex(long i) {
     final int bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
     this.writeBuffer = ByteBuffer.allocateDirect(bufferSize);
     final int logEntryLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSizeInt();
+    final boolean share = RaftServerConfigKeys.Log.byteArrayShare(properties);
     // 4 bytes (serialized size) + logEntryLimit + 4 bytes (checksum)
-    this.sharedBuffer = MemoizedSupplier.valueOf(() -> new byte[logEntryLimit + 8]);
+    this.byteArray = new ByteArray(share, logEntryLimit + 8);
     this.unsafeFlush = RaftServerConfigKeys.Log.unsafeFlushEnabled(properties);
     this.asyncFlush = RaftServerConfigKeys.Log.asyncFlushEnabled(properties);
     if (asyncFlush && unsafeFlush) {
@@ -368,7 +429,6 @@ private boolean shouldFlush() {
     return pendingFlushNum > 0 && queue.isEmpty();
   }
 
-  @SuppressFBWarnings("NP_NULL_PARAM_DEREF")
   private void flushIfNecessary() throws IOException {
     if (shouldFlush()) {
       raftLogMetrics.onRaftLogFlush();
@@ -754,6 +814,6 @@ private void freeSegmentedRaftLogOutputStream() {
   private void allocateSegmentedRaftLogOutputStream(File file, boolean append) throws IOException {
     Preconditions.assertTrue(out == null && writeBuffer.position() == 0);
     out = new SegmentedRaftLogOutputStream(file, append, segmentMaxSize,
-        preallocatedSize, writeBuffer, sharedBuffer);
+        preallocatedSize, writeBuffer, byteArray::get);
   }
 }