From d07c2d16def6a41c9df7f0f361e30717a33893bb Mon Sep 17 00:00:00 2001 From: xqrzd <4950876+xqrzd@users.noreply.github.com> Date: Mon, 14 Sep 2020 20:41:12 -0400 Subject: [PATCH] Add KuduPartitioner (#133) The API allows a client to determine which partition a row falls into without actually writing that row. --- src/Knet.Kudu.Client/KuduClient.cs | 49 ++++- src/Knet.Kudu.Client/KuduPartitioner.cs | 205 ++++++++++++++++++ src/Knet.Kudu.Client/KuduScanTokenBuilder.cs | 2 +- src/Knet.Kudu.Client/Tablet/RemoteTablet.cs | 22 +- .../KuduPartitionerTests.cs | 129 +++++++++++ 5 files changed, 388 insertions(+), 19 deletions(-) create mode 100644 src/Knet.Kudu.Client/KuduPartitioner.cs create mode 100644 test/Knet.Kudu.Client.FunctionalTests/KuduPartitionerTests.cs diff --git a/src/Knet.Kudu.Client/KuduClient.cs b/src/Knet.Kudu.Client/KuduClient.cs index 6e4f7ee2..8b8a8822 100644 --- a/src/Knet.Kudu.Client/KuduClient.cs +++ b/src/Knet.Kudu.Client/KuduClient.cs @@ -532,6 +532,19 @@ public IKuduSession NewSession(KuduSessionOptions options) return new KuduSession(this, options, _loggerFactory); } + public async ValueTask CreatePartitionerAsync( + KuduTable table, CancellationToken cancellationToken = default) + { + var tablets = await LoopLocateTableAsync( + table.TableId, + null, + null, + 1000, + cancellationToken).ConfigureAwait(false); + + return new KuduPartitioner(table, tablets); + } + private async Task OpenTableAsync( TableIdentifierPB tableIdentifier, CancellationToken cancellationToken = default) { @@ -723,8 +736,28 @@ private TableLocationsCache GetTableLocationsCache(string tableId) #endif } - private async Task LoopLocateTableAsync( - KuduTable table, + public async ValueTask> LoopLocateTableAsync( + string tableId, + byte[] startPartitionKey, + byte[] endPartitionKey, + int fetchBatchSize, + CancellationToken cancellationToken = default) + { + var tablets = new List(); + + await LoopLocateTableAsync( + tableId, + startPartitionKey, + endPartitionKey, + 
fetchBatchSize, + tablets, + cancellationToken).ConfigureAwait(false); + + return tablets; + } + + private async ValueTask LoopLocateTableAsync( + string tableId, byte[] startPartitionKey, byte[] endPartitionKey, int fetchBatchSize, @@ -741,7 +774,6 @@ private async Task LoopLocateTableAsync( // The next partition key to look up. If null, then it represents // the minimum partition key, If empty, it represents the maximum key. byte[] partitionKey = startPartitionKey; - string tableId = table.TableId; // Continue while the partition key is the minimum, or it is not the maximum // and it is less than the end partition key. @@ -771,7 +803,7 @@ await GetTableLocationsAsync(tableId, key, fetchBatchSize, cancellationToken) .ConfigureAwait(false); await LoopLocateTableAsync( - table, + tableId, lookupKey, endPartitionKey, fetchBatchSize, @@ -820,7 +852,7 @@ private Task> GetTabletKeyRangesAsync( } internal async ValueTask> GetTableKeyRangesAsync( - KuduTable table, + string tableId, byte[] startPrimaryKey, byte[] endPrimaryKey, byte[] startPartitionKey, @@ -829,14 +861,11 @@ internal async ValueTask> GetTableKeyRangesAsync( long splitSizeBytes, CancellationToken cancellationToken = default) { - var tablets = new List(); - - await LoopLocateTableAsync( - table, + var tablets = await LoopLocateTableAsync( + tableId, startPartitionKey, endPartitionKey, fetchBatchSize, - tablets, cancellationToken).ConfigureAwait(false); var keyRanges = new List(tablets.Count); diff --git a/src/Knet.Kudu.Client/KuduPartitioner.cs b/src/Knet.Kudu.Client/KuduPartitioner.cs new file mode 100644 index 00000000..b249f4af --- /dev/null +++ b/src/Knet.Kudu.Client/KuduPartitioner.cs @@ -0,0 +1,205 @@ +using System; +using System.Collections.Generic; +using Knet.Kudu.Client.Exceptions; +using Knet.Kudu.Client.Internal; +using Knet.Kudu.Client.Tablet; +using Knet.Kudu.Client.Util; + +namespace Knet.Kudu.Client +{ + /// <summary> + /// <para> + /// A <see cref="KuduPartitioner"/> allows clients to determine the target + /// partition of a row
without actually performing a write. The set of + /// partitions is eagerly fetched when the KuduPartitioner is constructed + /// so that the actual partitioning step can be performed synchronously + /// without any network trips. + /// </para> + /// + /// <para> + /// NOTE: Because this operates on a metadata snapshot retrieved at + /// construction time, it will not reflect any metadata changes to the + /// table that have occurred since its creation. + /// </para> + /// </summary> + public class KuduPartitioner + { + private readonly PartitionSchema _partitionSchema; + private readonly AvlTree _cache; + + public int NumPartitions { get; } + + public KuduPartitioner(KuduTable table, List tablets) + { + _partitionSchema = table.PartitionSchema; + _cache = new AvlTree(); + + NumPartitions = tablets.Count; + InitializeCache(tablets); + } + + /// <summary> + /// Determine if the given row falls into a valid partition. + /// </summary> + /// <param name="row">The row to check.</param> + public bool IsCovered(PartialRow row) + { + var entry = GetCacheEntry(row); + return entry.IsCoveredRange; + } + + public PartitionResult GetRowTablet(PartialRow row) + { + var entry = GetCacheEntry(row); + + if (entry.IsNonCoveredRange) + { + throw new NonCoveredRangeException( + entry.LowerBoundPartitionKey, + entry.UpperBoundPartitionKey); + } + + return new PartitionResult(entry.Tablet, entry.PartitionIndex); + } + + private PartitionerLocationEntry GetCacheEntry(PartialRow row) + { + var partitionSchema = _partitionSchema; + int maxSize = KeyEncoder.CalculateMaxPartitionKeySize(row, partitionSchema); + Span buffer = stackalloc byte[maxSize]; + + KeyEncoder.EncodePartitionKey( + row, + partitionSchema, + buffer, + out int bytesWritten); + + var partitionKey = buffer.Slice(0, bytesWritten); + + return (PartitionerLocationEntry)_cache.FloorEntry(partitionKey); + } + + private void InitializeCache(List tablets) + { + var newEntries = new List(); + int partitionIndex = 0; + + if (tablets.Count == 0) + { + // If there are no tablets in the response, then the table
is empty. If + // there were any tablets in the table they would have been returned, since + // the master guarantees that if the partition key falls in a non-covered + // range, the previous tablet will be returned, and we did not set an upper + // bound partition key on the request. + + newEntries.Add(PartitionerLocationEntry.NewNonCoveredRange2( + Array.Empty(), + Array.Empty())); + } + else + { + // The comments below will reference the following diagram: + // + // +---+ +---+---+ + // | | | | | + // A | B | C | D | E | F + // | | | | | + // +---+ +---+---+ + // + // It depicts a tablet locations response from the master containing three + // tablets: B, D and E. Three non-covered ranges are present: A, C, and F. + // An RPC response containing B, D and E could occur if the lookup partition + // key falls in A, B, or C, although the existence of A as an initial + // non-covered range can only be inferred if the lookup partition key falls + // in A. + + byte[] firstLowerBound = tablets[0].Partition.PartitionKeyStart; + + if (firstLowerBound.Length > 0) + { + // There is an initial non-covered range, such as A. + newEntries.Add(PartitionerLocationEntry.NewNonCoveredRange2( + Array.Empty(), firstLowerBound)); + } + + // lastUpperBound tracks the upper bound of the previously processed + // entry, so that we can determine when we have found a non-covered range. + byte[] lastUpperBound = firstLowerBound; + + foreach (var tablet in tablets) + { + byte[] tabletLowerBound = tablet.Partition.PartitionKeyStart; + byte[] tabletUpperBound = tablet.Partition.PartitionKeyEnd; + + if (lastUpperBound.SequenceCompareTo(tabletLowerBound) < 0) + { + // There is a non-covered range between the previous tablet and this tablet. + // This will discover C while processing the tablet location for D. 
+ newEntries.Add(PartitionerLocationEntry.NewNonCoveredRange2( + lastUpperBound, tabletLowerBound)); + } + + lastUpperBound = tabletUpperBound; + + // Now add the tablet itself (such as B, D, or E). + newEntries.Add(PartitionerLocationEntry.NewTablet2(tablet, partitionIndex++)); + } + + if (lastUpperBound.Length > 0) + { + // There is a non-covered range between the last tablet and the end + // of the partition key space, such as F. + newEntries.Add(PartitionerLocationEntry.NewNonCoveredRange2( + lastUpperBound, Array.Empty())); + } + } + + foreach (var entry in newEntries) + _cache.Insert(entry); + } + + private class PartitionerLocationEntry : TableLocationEntry + { + public int PartitionIndex { get; } + + public PartitionerLocationEntry( + RemoteTablet tablet, + byte[] lowerBoundPartitionKey, + byte[] upperBoundPartitionKey, + int partitionIndex) : base(tablet, lowerBoundPartitionKey, upperBoundPartitionKey, -1) + { + PartitionIndex = partitionIndex; + } + + public static PartitionerLocationEntry NewNonCoveredRange2( + byte[] lowerBoundPartitionKey, + byte[] upperBoundPartitionKey) + { + return new PartitionerLocationEntry( + null, + lowerBoundPartitionKey, + upperBoundPartitionKey, + -1); + } + + public static PartitionerLocationEntry NewTablet2( + RemoteTablet tablet, int partitionIndex) + { + return new PartitionerLocationEntry(tablet, null, null, partitionIndex); + } + } + } + + public readonly struct PartitionResult + { + public RemoteTablet Tablet { get; } + + public int PartitionIndex { get; } + + public PartitionResult(RemoteTablet tablet, int partitionIndex) + { + Tablet = tablet; + PartitionIndex = partitionIndex; + } + } +} diff --git a/src/Knet.Kudu.Client/KuduScanTokenBuilder.cs b/src/Knet.Kudu.Client/KuduScanTokenBuilder.cs index ed45f472..ea3df1cd 100644 --- a/src/Knet.Kudu.Client/KuduScanTokenBuilder.cs +++ b/src/Knet.Kudu.Client/KuduScanTokenBuilder.cs @@ -161,7 +161,7 @@ public async ValueTask> BuildAsync( var partitionRange = 
pruner.NextPartitionKeyRange; var newKeyRanges = await Client.GetTableKeyRangesAsync( - Table, + Table.TableId, LowerBoundPrimaryKey, UpperBoundPrimaryKey, partitionRange.Lower.Length == 0 ? null : partitionRange.Lower, diff --git a/src/Knet.Kudu.Client/Tablet/RemoteTablet.cs b/src/Knet.Kudu.Client/Tablet/RemoteTablet.cs index 05ceb0d8..0d1cfa44 100644 --- a/src/Knet.Kudu.Client/Tablet/RemoteTablet.cs +++ b/src/Knet.Kudu.Client/Tablet/RemoteTablet.cs @@ -6,20 +6,26 @@ namespace Knet.Kudu.Client.Tablet { /// <summary> + /// <para> /// This class encapsulates the information regarding a tablet and its locations. - /// + /// </para> + /// + /// <para> /// RemoteTablet's main function is to keep track of where the leader for this /// tablet is. For example, an RPC might call GetServerInfo, contact that TS, find - /// it's not the leader anymore, and then re-fetch the tablet locations. - /// + /// it's not the leader anymore, and then re-fetch the tablet locations. This + /// class is immutable. + /// </para> + /// + /// <para> /// A RemoteTablet's life is expected to be long in a cluster where roles aren't /// changing often, and short when they do since the Kudu client will replace the /// RemoteTablet it caches with new ones after getting tablet locations from the master.
+ /// </para> /// </summary> public class RemoteTablet : IEquatable { private readonly ServerInfoCache _cache; - private readonly List _replicas; public string TableId { get; } @@ -27,22 +33,22 @@ public class RemoteTablet : IEquatable public Partition Partition { get; } + public IReadOnlyList Replicas { get; } + public RemoteTablet( string tableId, string tabletId, Partition partition, ServerInfoCache cache, - List replicas) + IReadOnlyList replicas) { TableId = tableId; TabletId = tabletId; Partition = partition; + Replicas = replicas; _cache = cache; - _replicas = replicas; } - public IReadOnlyList Replicas => _replicas; - public ServerInfo GetServerInfo( ReplicaSelection replicaSelection, string location = null) { diff --git a/test/Knet.Kudu.Client.FunctionalTests/KuduPartitionerTests.cs b/test/Knet.Kudu.Client.FunctionalTests/KuduPartitionerTests.cs new file mode 100644 index 00000000..7de0d6dd --- /dev/null +++ b/test/Knet.Kudu.Client.FunctionalTests/KuduPartitionerTests.cs @@ -0,0 +1,129 @@ +using System.Threading.Tasks; +using Knet.Kudu.Client.Exceptions; +using Knet.Kudu.Client.FunctionalTests.MiniCluster; +using Knet.Kudu.Client.FunctionalTests.Util; +using McMaster.Extensions.Xunit; +using Xunit; + +namespace Knet.Kudu.Client.FunctionalTests +{ + [MiniKuduClusterTest] + public class KuduPartitionerTests + { + [SkippableFact] + public async Task TestPartitioner() + { + await using var miniCluster = await new MiniKuduClusterBuilder().BuildAsync(); + await using var client = miniCluster.CreateClient(); + + // Create a table with the following 9 partitions: + // + // hash bucket + // key 0 1 2 + // ----------------- + // <3333 x x x + // 3333-6666 x x x + // >=6666 x x x + + int numRanges = 3; + int numHashPartitions = 3; + var splits = new int[] { 3333, 6666 }; + + var builder = ClientTestUtil.GetBasicSchema() + .SetTableName(nameof(TestPartitioner)) + .AddHashPartitions(numHashPartitions, "key") + .SetRangePartitionColumns("key"); + + foreach (var split in splits) + { +
builder.AddSplitRow(row => row.SetInt32("key", split)); + } + + var table = await client.CreateTableAsync(builder); + + var partitioner = await client.CreatePartitionerAsync(table); + int numPartitions = partitioner.NumPartitions; + + Assert.Equal(numRanges * numHashPartitions, numPartitions); + + // Partition a bunch of rows, counting how many fall into each partition. + int numRowsToPartition = 10000; + var countsByPartition = new int[numPartitions]; + for (int i = 0; i < numRowsToPartition; i++) + { + var row = table.NewInsert(); + row.SetInt32("key", i); + var tabletInfo = partitioner.GetRowTablet(row); + countsByPartition[tabletInfo.PartitionIndex]++; + } + + // We don't expect a completely even division of rows into partitions, but + // we should be within 10% of that. + int expectedPerPartition = numRowsToPartition / numPartitions; + int fuzziness = expectedPerPartition / 10; + int minPerPartition = expectedPerPartition - fuzziness; + int maxPerPartition = expectedPerPartition + fuzziness; + for (int i = 0; i < numPartitions; i++) + { + Assert.True(minPerPartition <= countsByPartition[i]); + Assert.True(maxPerPartition >= countsByPartition[i]); + } + + // Drop the first and third range partition. + await client.AlterTableAsync(new AlterTableBuilder(table) + .DropRangePartition((lower, upper) => upper.SetInt32("key", splits[0])) + .DropRangePartition((lower, upper) => lower.SetInt32("key", splits[1]))); + + // The existing partitioner should still return results based on the table + // state at the time it was created, and successfully return partitions + // for rows in the now-dropped range. + Assert.Equal(numRanges * numHashPartitions, partitioner.NumPartitions); + var row2 = table.NewInsert(); + row2.SetInt32("key", 1000); + Assert.Equal(0, partitioner.GetRowTablet(row2).PartitionIndex); + + // If we recreate the partitioner, it should get the new partitioning info. 
+ partitioner = await client.CreatePartitionerAsync(table); + Assert.Equal(numHashPartitions, partitioner.NumPartitions); + } + + [SkippableFact] + public async Task TestPartitionerNonCoveredRange() + { + await using var miniCluster = await new MiniKuduClusterBuilder().BuildAsync(); + await using var client = miniCluster.CreateClient(); + + int numHashPartitions = 3; + + var builder = ClientTestUtil.GetBasicSchema() + .SetTableName(nameof(TestPartitionerNonCoveredRange)) + .AddHashPartitions(numHashPartitions, "key") + .SetRangePartitionColumns("key"); + + // Cover a range where 1000 <= key < 2000 + builder.AddRangePartition((lower, upper) => + { + lower.SetInt32("key", 1000); + upper.SetInt32("key", 2000); + }); + + var table = await client.CreateTableAsync(builder); + + var partitioner = await client.CreatePartitionerAsync(table); + + Assert.Throws(() => + { + var under = table.NewInsert(); + under.SetInt32("key", 999); + partitioner.GetRowTablet(under); + }); + + Assert.Throws(() => + { + var over = table.NewInsert(); + over.SetInt32("key", 2000); + partitioner.GetRowTablet(over); + }); + } + } +}