Skip to content

Commit

Permalink
Api review (#93)
Browse files Browse the repository at this point in the history
API adjustments for the first preview.
  • Loading branch information
xqrzd authored May 24, 2020
1 parent 43ce686 commit 43b6be1
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 12 deletions.
40 changes: 29 additions & 11 deletions src/Knet.Kudu.Client/KuduClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -287,28 +287,46 @@ public async Task DeleteTableAsync(
await SendRpcAsync(rpc, cancellationToken).ConfigureAwait(false);
}

public async Task<List<ListTablesResponsePB.TableInfo>> GetTablesAsync(
public async Task<List<TableInfo>> GetTablesAsync(
string nameFilter = null, CancellationToken cancellationToken = default)
{
var rpc = new ListTablesRequest(nameFilter);
var response = await SendRpcAsync(rpc, cancellationToken).ConfigureAwait(false);
var response = await SendRpcAsync(rpc, cancellationToken)
.ConfigureAwait(false);

var tables = response.Tables;
var results = new List<TableInfo>(tables.Count);

return response.Tables;
foreach (var tablePb in tables)
{
var tableId = tablePb.Id.ToStringUtf8();
var table = new TableInfo(tablePb.Name, tableId);
results.Add(table);
}

return results;
}

/// <summary>
/// Get the list of running tablet servers.
/// </summary>
/// <param name="cancellationToken">The cancellation token.</param>
public async Task<List<ListTabletServersResponsePB.Entry>> GetTabletServersAsync(
public async Task<List<TabletServerInfo>> GetTabletServersAsync(
CancellationToken cancellationToken = default)
{
var rpc = new ListTabletServersRequest();
var response = await SendRpcAsync(rpc, cancellationToken)
.ConfigureAwait(false);

// TODO: Create managed wrapper for this response.
return response.Servers;
var servers = response.Servers;
var results = new List<TabletServerInfo>(servers.Count);
foreach (var serverPb in servers)
{
var serverInfo = serverPb.ToTabletServerInfo();
results.Add(serverInfo);
}

return results;
}

/// <summary>
Expand All @@ -327,15 +345,15 @@ public async Task<KuduTableStatistics> GetTableStatisticsAsync(
return new KuduTableStatistics(response.OnDiskSize, response.LiveRowCount);
}

public Task<List<RemoteTablet>> GetTableLocationsAsync(
internal Task<List<RemoteTablet>> GetTableLocationsAsync(
string tableId, byte[] partitionKeyStart, int fetchBatchSize,
CancellationToken cancellationToken = default)
{
return GetTableLocationsAsync(
tableId, partitionKeyStart, null, fetchBatchSize, cancellationToken);
}

public async Task<List<RemoteTablet>> GetTableLocationsAsync(
internal async Task<List<RemoteTablet>> GetTableLocationsAsync(
string tableId, byte[] partitionKeyStart, byte[] partitionKeyEnd,
int fetchBatchSize, CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -664,7 +682,7 @@ private TableLocationsCache GetTableLocationsCache(string tableId)
#endif
}

public async Task LoopLocateTableAsync(
private async Task LoopLocateTableAsync(
KuduTable table,
byte[] startPartitionKey,
byte[] endPartitionKey,
Expand Down Expand Up @@ -760,7 +778,7 @@ private Task<List<KeyRangePB>> GetTabletKeyRangesAsync(
return SendRpcAsync(rpc, cancellationToken);
}

public async ValueTask<List<KeyRange>> GetTableKeyRangesAsync(
internal async ValueTask<List<KeyRange>> GetTableKeyRangesAsync(
KuduTable table,
byte[] startPrimaryKey,
byte[] endPrimaryKey,
Expand Down Expand Up @@ -1000,7 +1018,7 @@ private bool TryGetConnectResponse(
return true;
}

public async Task<T> SendRpcAsync<T>(
internal async Task<T> SendRpcAsync<T>(
KuduRpc<T> rpc, CancellationToken cancellationToken = default)
{
using var cts = new CancellationTokenSource(_defaultOperationTimeoutMs);
Expand Down
23 changes: 23 additions & 0 deletions src/Knet.Kudu.Client/TableInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace Knet.Kudu.Client
{
public class TableInfo
{
/// <summary>
/// The table name.
/// </summary>
public string TableName { get; }

/// <summary>
/// The table Id.
/// </summary>
public string TableId { get; }

public TableInfo(string tableName, string tableId)
{
TableName = tableName;
TableId = tableId;
}

public override string ToString() => TableName;
}
}
67 changes: 67 additions & 0 deletions src/Knet.Kudu.Client/TabletServerInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
using System;
using System.Collections.Generic;
using Knet.Kudu.Client.Connection;
using Knet.Kudu.Client.Protocol.Master;

namespace Knet.Kudu.Client
{
public class TabletServerInfo
{
public string TsUuid { get; }

public int MillisSinceHeartbeat { get; }

public string Location { get; }

public TabletServerState State { get; }

public IReadOnlyList<HostAndPort> RpcAddresses { get; }

public IReadOnlyList<HostAndPort> HttpAddresses { get; }

public string SoftwareVersion { get; }

public bool HttpsEnabled { get; }

public DateTimeOffset StartTime { get; }

public TabletServerInfo(
string tsUuid,
int millisSinceHeartbeat,
string location,
TabletServerState state,
IReadOnlyList<HostAndPort> rpcAddresses,
IReadOnlyList<HostAndPort> httpAddresses,
string softwareVersion,
bool httpsEnabled,
DateTimeOffset startTime)
{
TsUuid = tsUuid;
MillisSinceHeartbeat = millisSinceHeartbeat;
Location = location;
State = state;
RpcAddresses = rpcAddresses;
HttpAddresses = httpAddresses;
SoftwareVersion = softwareVersion;
HttpsEnabled = httpsEnabled;
StartTime = startTime;
}
}

public enum TabletServerState
{
/// <summary>
/// Default value for backwards compatibility.
/// </summary>
Unknown = TServerStatePB.UnknownState,
/// <summary>
/// No state for the tserver.
/// </summary>
None = TServerStatePB.None,
/// <summary>
/// New replicas are not added to the tserver, and failed replicas on
/// the tserver are not re-replicated.
/// </summary>
MaintenanceMode = TServerStatePB.MaintenanceMode,
}
}
39 changes: 38 additions & 1 deletion src/Knet.Kudu.Client/Util/ProtobufHelper.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
using System.Buffers;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using Knet.Kudu.Client.Connection;
using Knet.Kudu.Client.Protocol;
using Knet.Kudu.Client.Protocol.Master;
using Knet.Kudu.Client.Protocol.Rpc;
using ProtoBuf;

Expand Down Expand Up @@ -109,6 +112,40 @@ public static ColumnSchemaPB ToColumnSchemaPb(this ColumnSchema columnSchema)
};
}

public static TabletServerInfo ToTabletServerInfo(
this ListTabletServersResponsePB.Entry entry)
{
var tsUuid = entry.InstanceId.PermanentUuid.ToStringUtf8();
var registration = entry.Registration;

var rpcAddresses = new List<HostAndPort>(registration.RpcAddresses.Count);
foreach (var rpcAddress in registration.RpcAddresses)
{
var hostPort = rpcAddress.ToHostAndPort();
rpcAddresses.Add(hostPort);
}

var httpAddresses = new List<HostAndPort>(registration.HttpAddresses.Count);
foreach (var rpcAddress in registration.HttpAddresses)
{
var hostPort = rpcAddress.ToHostAndPort();
httpAddresses.Add(hostPort);
}

var startTime = DateTimeOffset.FromUnixTimeSeconds(registration.StartTime);

return new TabletServerInfo(
tsUuid,
entry.MillisSinceHeartbeat,
entry.Location,
(TabletServerState)entry.State,
rpcAddresses,
httpAddresses,
registration.SoftwareVersion,
registration.HttpsEnabled,
startTime.ToLocalTime());
}

public static PartitionSchema CreatePartitionSchema(
PartitionSchemaPB partitionSchemaPb, KuduSchema schema)
{
Expand Down

0 comments on commit 43b6be1

Please sign in to comment.