Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RATIS-1746. Change the shared byte array in SegmentedRaftLogWorker to be configurable. #783

Open
wants to merge 1 commit into
base: branch-2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,16 @@ static void setWriteBufferSize(RaftProperties properties, SizeInBytes writeBuffe
setSizeInBytes(properties::set, WRITE_BUFFER_SIZE_KEY, writeBufferSize);
}

String BYTE_ARRAY_SHARE_KEY = PREFIX + ".byte-array.share";
boolean BYTE_ARRAY_SHARE_DEFAULT = true;
static boolean byteArrayShare(RaftProperties properties) {
return getBoolean(properties::getBoolean,
BYTE_ARRAY_SHARE_KEY, BYTE_ARRAY_SHARE_DEFAULT, getDefaultLog());
}
static void setByteArrayShare(RaftProperties properties, boolean isShared) {
setBoolean(properties::setBoolean, BYTE_ARRAY_SHARE_KEY, isShared);
}

String FORCE_SYNC_NUM_KEY = PREFIX + ".force.sync.num";
int FORCE_SYNC_NUM_DEFAULT = 128;
static int forceSyncNum(RaftProperties properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import java.nio.channels.FileChannel;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import java.util.function.IntFunction;
import java.util.zip.Checksum;

public class SegmentedRaftLogOutputStream implements Closeable {
Expand All @@ -53,7 +53,7 @@ public class SegmentedRaftLogOutputStream implements Closeable {
private final File file;
private final BufferedWriteChannel out; // buffered FileChannel for writing
private final Checksum checksum;
private final Supplier<byte[]> sharedBuffer;
private final IntFunction<byte[]> byteArrayGet;

private final long segmentMaxSize;
private final long preallocatedSize;
Expand All @@ -65,13 +65,13 @@ public SegmentedRaftLogOutputStream(File file, boolean append, long segmentMaxSi
}

SegmentedRaftLogOutputStream(File file, boolean append, long segmentMaxSize,
long preallocatedSize, ByteBuffer byteBuffer, Supplier<byte[]> sharedBuffer)
long preallocatedSize, ByteBuffer byteBuffer, IntFunction<byte[]> byteArrayGet)
throws IOException {
this.file = file;
this.checksum = new PureJavaCrc32C();
this.segmentMaxSize = segmentMaxSize;
this.preallocatedSize = preallocatedSize;
this.sharedBuffer = sharedBuffer;
this.byteArrayGet = byteArrayGet;
this.out = BufferedWriteChannel.open(file, append, byteBuffer);

if (!append) {
Expand All @@ -98,7 +98,7 @@ public void write(LogEntryProto entry) throws IOException {
final int serialized = entry.getSerializedSize();
final int proto = CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized;
final int total = proto + 4; // proto and 4-byte checksum
final byte[] buf = sharedBuffer != null? sharedBuffer.get(): new byte[total];
final byte[] buf = byteArrayGet != null? byteArrayGet.apply(total): new byte[total];
Preconditions.assertTrue(total <= buf.length, () -> "total = " + total + " > buf.length " + buf.length);
preallocateIfNecessary(total);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Timer;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
import org.apache.ratis.protocol.ClientInvocationId;
Expand Down Expand Up @@ -51,6 +50,7 @@
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

Expand Down Expand Up @@ -132,6 +132,53 @@ synchronized void updateIndex(long i) {
}
}

static class ByteArray {
private final boolean shared;
private final int sizeLimit;
private volatile byte[] bytes;

ByteArray(boolean shared, int sizeLimit) {
this.shared = shared;
this.sizeLimit = sizeLimit;
}

byte[] get(int size) {
final byte[] byteArray = getImpl(size);
Preconditions.assertNotNull(byteArray, "byteArray");
Preconditions.assertTrue(byteArray.length >= size);
return byteArray;
}

private byte[] getImpl(int size) {
if (!shared) {
return new byte[size];
}

final byte[] existing = bytes;
if (existing != null && existing.length >= size) {
return existing;
}

synchronized (this) {
if (bytes != null && bytes.length >= size) {
return bytes;
}
bytes = new byte[Math.min(sizeLimit, roundUpPowerOfTwo(size))];
return bytes;
}
}

static int roundUpPowerOfTwo(int n) {
final long highestOne = Integer.highestOneBit(n);
if (highestOne == n) {
return n; // n is a power of two.
}
return Math.toIntExact(highestOne << 1);
}
}

private static final AtomicInteger COUNT = new AtomicInteger();

private final Consumer<Object> infoIndexChange = s -> LOG.info("{}: {}", this, s);
private final Consumer<Object> traceIndexChange = s -> LOG.trace("{}: {}", this, s);

Expand All @@ -154,7 +201,7 @@ synchronized void updateIndex(long i) {
private final Timer raftLogEnqueueingDelayTimer;
private final SegmentedRaftLogMetrics raftLogMetrics;
private final ByteBuffer writeBuffer;
private final Supplier<byte[]> sharedBuffer;
private final ByteArray byteArray;

/**
* The number of entries that have been written into the SegmentedRaftLogOutputStream but
Expand Down Expand Up @@ -185,7 +232,7 @@ synchronized void updateIndex(long i) {
SegmentedRaftLogWorker(RaftGroupMemberId memberId, StateMachine stateMachine, Runnable submitUpdateCommitEvent,
RaftServer.Division server, RaftStorage storage, RaftProperties properties,
SegmentedRaftLogMetrics metricRegistry) {
this.name = memberId + "-" + JavaUtils.getClassSimpleName(getClass());
this.name = memberId + "-" + JavaUtils.getClassSimpleName(getClass()) + COUNT.getAndIncrement();
LOG.info("new {} for {}", name, storage);

this.submitUpdateCommitEvent = submitUpdateCommitEvent;
Expand Down Expand Up @@ -219,8 +266,9 @@ synchronized void updateIndex(long i) {
final int bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
this.writeBuffer = ByteBuffer.allocateDirect(bufferSize);
final int logEntryLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSizeInt();
final boolean share = RaftServerConfigKeys.Log.byteArrayShare(properties);
// 4 bytes (serialized size) + logEntryLimit + 4 bytes (checksum)
this.sharedBuffer = MemoizedSupplier.valueOf(() -> new byte[logEntryLimit + 8]);
this.byteArray = new ByteArray(share, logEntryLimit + 8);
this.unsafeFlush = RaftServerConfigKeys.Log.unsafeFlushEnabled(properties);
this.asyncFlush = RaftServerConfigKeys.Log.asyncFlushEnabled(properties);
if (asyncFlush && unsafeFlush) {
Expand Down Expand Up @@ -368,7 +416,6 @@ private boolean shouldFlush() {
return pendingFlushNum > 0 && queue.isEmpty();
}

@SuppressFBWarnings("NP_NULL_PARAM_DEREF")
private void flushIfNecessary() throws IOException {
if (shouldFlush()) {
raftLogMetrics.onRaftLogFlush();
Expand Down Expand Up @@ -754,6 +801,6 @@ private void freeSegmentedRaftLogOutputStream() {
private void allocateSegmentedRaftLogOutputStream(File file, boolean append) throws IOException {
Preconditions.assertTrue(out == null && writeBuffer.position() == 0);
out = new SegmentedRaftLogOutputStream(file, append, segmentMaxSize,
preallocatedSize, writeBuffer, sharedBuffer);
preallocatedSize, writeBuffer, byteArray::get);
}
}