Skip to content

Commit

Permalink
Return WriteResponse from WriteAsync (#117)
Browse files Browse the repository at this point in the history
  • Loading branch information
xqrzd authored Jun 25, 2020
1 parent cb06b8e commit 95223c6
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 9 deletions.
19 changes: 17 additions & 2 deletions src/Knet.Kudu.Client/KuduClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ public async Task<KuduTable> OpenTableAsync(
return new KuduTable(response);
}

public async Task WriteAsync(
public async Task<WriteResponse> WriteAsync(
IEnumerable<KuduOperation> operations,
ExternalConsistencyMode externalConsistencyMode = ExternalConsistencyMode.ClientPropagated,
CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -428,15 +428,28 @@ public async Task WriteAsync(
}

var results = await Task.WhenAll(tasks).ConfigureAwait(false);

return CreateWriteResponse(results);
}

private WriteResponse CreateWriteResponse(WriteResponsePB[] results)
{
long writeTimestamp = NoTimestamp;
List<KuduStatus> rowErrors = null;

foreach (var result in results)
{
var timestamp = (long)result.Timestamp;
var perRowErrors = result.PerRowErrors;

if (writeTimestamp == NoTimestamp || writeTimestamp < timestamp)
{
writeTimestamp = timestamp;
}

if (perRowErrors.Count > 0)
{
rowErrors = rowErrors ?? new List<KuduStatus>();
rowErrors ??= new List<KuduStatus>();

foreach (var rowError in perRowErrors)
{
Expand All @@ -450,6 +463,8 @@ public async Task WriteAsync(
{
throw new KuduWriteException(rowErrors);
}

return new WriteResponse(writeTimestamp);
}

private async Task<WriteResponsePB> WriteAsync(
Expand Down
15 changes: 15 additions & 0 deletions src/Knet.Kudu.Client/WriteResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace Knet.Kudu.Client
{
public class WriteResponse
{
/// <summary>
/// The HybridTime-encoded write timestamp.
/// </summary>
public long WriteTimestampRaw { get; }

public WriteResponse(long writeTimestampRaw)
{
WriteTimestampRaw = writeTimestampRaw;
}
}
}
11 changes: 4 additions & 7 deletions test/Knet.Kudu.Client.FunctionalTests/ClientStressTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ public class ClientStressTests : IAsyncLifetime
private KuduClient _client;
private TestConnectionFactory _testConnectionFactory;
private KuduTable _table;
private long _sharedWriteTimestamp;

public async Task InitializeAsync()
{
Expand Down Expand Up @@ -154,8 +153,6 @@ ValueTask HandleSessionExceptionAsync(SessionExceptionContext context)
flush = ThreadSafeRandom.Instance.NextBool();

currentRowKey++;

Volatile.Write(ref _sharedWriteTimestamp, _client.LastPropagatedTimestamp);
}

await session.FlushAsync();
Expand All @@ -171,9 +168,9 @@ private async Task ScanAsync(CancellationToken cancellationToken)

while (!cancellationToken.IsCancellationRequested)
{
var sharedWriteTimestamp = Volatile.Read(ref _sharedWriteTimestamp);
var timestamp = _client.LastPropagatedTimestamp;

if (sharedWriteTimestamp == 0)
if (timestamp == 0)
{
// Nothing has been written yet.
}
Expand Down Expand Up @@ -235,11 +232,11 @@ void AccumulateResults(ResultSet resultSet)

private KuduScannerBuilder GetScannerBuilder()
{
var sharedWriteTimestamp = Volatile.Read(ref _sharedWriteTimestamp);
var timestamp = _client.LastPropagatedTimestamp;

return _client.NewScanBuilder(_table)
.SetReadMode(ReadMode.ReadAtSnapshot)
.SnapshotTimestampRaw(sharedWriteTimestamp)
.SnapshotTimestampRaw(timestamp)
.SetFaultTolerant(true);
}

Expand Down

0 comments on commit 95223c6

Please sign in to comment.