Zap DatabaseAdapter

Rule of 3

julik committed Feb 28, 2024
1 parent 585576c commit 99ab0f8
Showing 4 changed files with 72 additions and 65 deletions.
1 change: 0 additions & 1 deletion lib/pecorino.rb
@@ -13,7 +13,6 @@ module Pecorino

module Adapters
autoload :MemoryAdapter, "pecorino/adapters/memory_adapter"
-    autoload :DatabaseAdapter, "pecorino/adapters/database_adapter"
autoload :PostgresAdapter, "pecorino/adapters/postgres_adapter"
autoload :SqliteAdapter, "pecorino/adapters/sqlite_adapter"
autoload :RedisAdapter, "pecorino/adapters/redis_adapter"
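With the shared DatabaseAdapter base class gone, each SQL adapter now carries its own @model_class plumbing and is constructed directly with the ActiveRecord model class whose connection it should use. A minimal usage sketch (ActiveRecord::Base as the model class is an assumption for illustration, not something this commit prescribes):

# Illustrative only: any class responding to .connection and
# .sanitize_sql_array (such as ActiveRecord::Base) will do.
adapter = Pecorino::Adapters::PostgresAdapter.new(ActiveRecord::Base)

# The adapter API itself is unchanged by this commit:
level, full = adapter.state(key: "api-client-123", capacity: 30.0, leak_rate: 2.0)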
36 changes: 0 additions & 36 deletions lib/pecorino/adapters/database_adapter.rb

This file was deleted.

48 changes: 35 additions & 13 deletions lib/pecorino/adapters/postgres_adapter.rb
@@ -1,6 +1,10 @@
# frozen_string_literal: true

-class Pecorino::Adapters::PostgresAdapter < Pecorino::Adapters::DatabaseAdapter
+class Pecorino::Adapters::PostgresAdapter
+  def initialize(model_class)
+    @model_class = model_class
+  end

def state(key:, capacity:, leak_rate:)
query_params = {
key: key.to_s,
@@ -10,7 +14,7 @@ def state(key:, capacity:, leak_rate:)
# The `level` of the bucket is what got stored at `last_touched_at` time, and we can
# extrapolate from it to see how many tokens have leaked out since `last_touched_at` -
# we don't need to UPDATE the value in the bucket here
-    sql = model_class.sanitize_sql_array([<<~SQL, query_params])
+    sql = @model_class.sanitize_sql_array([<<~SQL, query_params])
SELECT
GREATEST(
0.0, LEAST(
@@ -26,7 +30,7 @@ def state(key:, capacity:, leak_rate:)

# If the return value of the query is a NULL it means no such bucket exists,
# so we assume the bucket is empty
-    current_level = model_class.connection.uncached { model_class.connection.select_value(sql) } || 0.0
+    current_level = @model_class.connection.uncached { @model_class.connection.select_value(sql) } || 0.0
[current_level, capacity - current_level.abs < 0.01]
end

@@ -45,7 +49,7 @@ def add_tokens(key:, capacity:, leak_rate:, n_tokens:)
fillup: n_tokens.to_f
}

-    sql = model_class.sanitize_sql_array([<<~SQL, query_params])
+    sql = @model_class.sanitize_sql_array([<<~SQL, query_params])
INSERT INTO pecorino_leaky_buckets AS t
(key, last_touched_at, may_be_deleted_after, level)
VALUES
@@ -79,7 +83,7 @@ def add_tokens(key:, capacity:, leak_rate:, n_tokens:)
# query as a repeat (since we use "select_one" for the RETURNING bit) and will not call into Postgres
# correctly, thus the clock_timestamp() value would be frozen between calls. We don't want that here.
# See https://stackoverflow.com/questions/73184531/why-would-postgres-clock-timestamp-freeze-inside-a-rails-unit-test
-    upserted = model_class.connection.uncached { model_class.connection.select_one(sql) }
+    upserted = @model_class.connection.uncached { @model_class.connection.select_one(sql) }
capped_level_after_fillup, at_capacity = upserted.fetch("level"), upserted.fetch("at_capacity")
[capped_level_after_fillup, at_capacity]
end
@@ -99,7 +103,7 @@ def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:)
fillup: n_tokens.to_f
}

-    sql = model_class.sanitize_sql_array([<<~SQL, query_params])
+    sql = @model_class.sanitize_sql_array([<<~SQL, query_params])
WITH pre AS MATERIALIZED (
SELECT
-- Note the double clamping here. First we clamp the "current level - leak" to not go below zero,
@@ -137,7 +141,7 @@ def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:)
level AS level_after
SQL

-    upserted = model_class.connection.uncached { model_class.connection.select_one(sql) }
+    upserted = @model_class.connection.uncached { @model_class.connection.select_one(sql) }
level_after = upserted.fetch("level_after")
level_before = upserted.fetch("level_before")
[level_after, level_after >= capacity, level_after != level_before]
@@ -146,7 +150,7 @@ def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:)
def set_block(key:, block_for:)
raise ArgumentError, "block_for must be positive" unless block_for > 0
query_params = {key: key.to_s, block_for: block_for.to_f}
-    block_set_query = model_class.sanitize_sql_array([<<~SQL, query_params])
+    block_set_query = @model_class.sanitize_sql_array([<<~SQL, query_params])
INSERT INTO pecorino_blocks AS t
(key, blocked_until)
VALUES
@@ -155,18 +159,36 @@ def set_block(key:, block_for:)
blocked_until = GREATEST(EXCLUDED.blocked_until, t.blocked_until)
RETURNING blocked_until
SQL
-    model_class.connection.uncached { model_class.connection.select_value(block_set_query) }
+    @model_class.connection.uncached { @model_class.connection.select_value(block_set_query) }
end

def blocked_until(key:)
-    block_check_query = model_class.sanitize_sql_array([<<~SQL, key])
+    block_check_query = @model_class.sanitize_sql_array([<<~SQL, key])
SELECT blocked_until FROM pecorino_blocks WHERE key = ? AND blocked_until >= clock_timestamp() LIMIT 1
SQL
-    model_class.connection.uncached { model_class.connection.select_value(block_check_query) }
+    @model_class.connection.uncached { @model_class.connection.select_value(block_check_query) }
end

def prune
-    model_class.connection.execute("DELETE FROM pecorino_blocks WHERE blocked_until < NOW()")
-    model_class.connection.execute("DELETE FROM pecorino_leaky_buckets WHERE may_be_deleted_after < NOW()")
+    @model_class.connection.execute("DELETE FROM pecorino_blocks WHERE blocked_until < NOW()")
+    @model_class.connection.execute("DELETE FROM pecorino_leaky_buckets WHERE may_be_deleted_after < NOW()")
end

+  def create_tables(active_record_schema)
+    active_record_schema.create_table :pecorino_leaky_buckets, id: :uuid do |t|
+      t.string :key, null: false
+      t.float :level, null: false
+      t.datetime :last_touched_at, null: false
+      t.datetime :may_be_deleted_after, null: false
+    end
+    active_record_schema.add_index :pecorino_leaky_buckets, [:key], unique: true
+    active_record_schema.add_index :pecorino_leaky_buckets, [:may_be_deleted_after]
+
+    active_record_schema.create_table :pecorino_blocks, id: :uuid do |t|
+      t.string :key, null: false
+      t.datetime :blocked_until, null: false
+    end
+    active_record_schema.add_index :pecorino_blocks, [:key], unique: true
+    active_record_schema.add_index :pecorino_blocks, [:blocked_until]
+  end
end
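Both adapters gain a create_tables hook that receives the migration or schema object and defines the two tables plus their indexes, as shown in the diff above. A hypothetical migration wiring it up (the migration class name and the use of ActiveRecord::Base are assumptions for illustration, not part of this commit):

# Hypothetical migration; create_tables receives the schema/migration
# object ("self" inside the migration) and defines both tables.
class CreatePecorinoTables < ActiveRecord::Migration[7.1]
  def up
    Pecorino::Adapters::PostgresAdapter.new(ActiveRecord::Base).create_tables(self)
  end

  def down
    drop_table :pecorino_leaky_buckets
    drop_table :pecorino_blocks
  end
end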
52 changes: 37 additions & 15 deletions lib/pecorino/adapters/sqlite_adapter.rb
@@ -1,6 +1,10 @@
# frozen_string_literal: true

-class Pecorino::Adapters::SqliteAdapter < Pecorino::Adapters::DatabaseAdapter
+class Pecorino::Adapters::SqliteAdapter
+  def initialize(model_class)
+    @model_class = model_class
+  end

def state(key:, capacity:, leak_rate:)
# With a server database, it is really important to use the clock of the database itself so
# that concurrent requests will see consistent bucket level calculations. Since SQLite is
@@ -17,7 +21,7 @@ def state(key:, capacity:, leak_rate:)
# The `level` of the bucket is what got stored at `last_touched_at` time, and we can
# extrapolate from it to see how many tokens have leaked out since `last_touched_at` -
# we don't need to UPDATE the value in the bucket here
-    sql = model_class.sanitize_sql_array([<<~SQL, query_params])
+    sql = @model_class.sanitize_sql_array([<<~SQL, query_params])
SELECT
MAX(
0.0, MIN(
@@ -33,7 +37,7 @@ def state(key:, capacity:, leak_rate:)

# If the return value of the query is a NULL it means no such bucket exists,
# so we assume the bucket is empty
-    current_level = model_class.connection.uncached { model_class.connection.select_value(sql) } || 0.0
+    current_level = @model_class.connection.uncached { @model_class.connection.select_value(sql) } || 0.0
[current_level, capacity - current_level.abs < 0.01]
end

@@ -54,7 +58,7 @@ def add_tokens(key:, capacity:, leak_rate:, n_tokens:)
id: SecureRandom.uuid # SQLite3 does not autogenerate UUIDs
}

-    sql = model_class.sanitize_sql_array([<<~SQL, query_params])
+    sql = @model_class.sanitize_sql_array([<<~SQL, query_params])
INSERT INTO pecorino_leaky_buckets AS t
(id, key, last_touched_at, may_be_deleted_after, level)
VALUES
@@ -89,7 +93,7 @@ def add_tokens(key:, capacity:, leak_rate:, n_tokens:)
# query as a repeat (since we use "select_one" for the RETURNING bit) and will not call into Postgres
# correctly, thus the clock_timestamp() value would be frozen between calls. We don't want that here.
# See https://stackoverflow.com/questions/73184531/why-would-postgres-clock-timestamp-freeze-inside-a-rails-unit-test
-    upserted = model_class.connection.uncached { model_class.connection.select_one(sql) }
+    upserted = @model_class.connection.uncached { @model_class.connection.select_one(sql) }
capped_level_after_fillup, one_if_did_overflow = upserted.fetch("level"), upserted.fetch("did_overflow")
[capped_level_after_fillup, one_if_did_overflow == 1]
end
@@ -114,7 +118,7 @@ def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:)
# Sadly with SQLite we need to do an INSERT first, because otherwise the inserted row is visible
# to the WITH clause, so we cannot combine the initial fillup and the update into one statement.
    # This should be fine however since we will suppress the INSERT on a key conflict
-    insert_sql = model_class.sanitize_sql_array([<<~SQL, query_params])
+    insert_sql = @model_class.sanitize_sql_array([<<~SQL, query_params])
INSERT INTO pecorino_leaky_buckets AS t
(id, key, last_touched_at, may_be_deleted_after, level)
VALUES
@@ -130,9 +134,9 @@ def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:)
-- so that it can't be deleted between our INSERT and our UPDATE
may_be_deleted_after = EXCLUDED.may_be_deleted_after
SQL
-    model_class.connection.execute(insert_sql)
+    @model_class.connection.execute(insert_sql)

-    sql = model_class.sanitize_sql_array([<<~SQL, query_params])
+    sql = @model_class.sanitize_sql_array([<<~SQL, query_params])
-- With SQLite MATERIALIZED has to be used so that level_post is calculated before the UPDATE takes effect
WITH pre(level_post_with_uncapped_fillup, level_post) AS MATERIALIZED (
SELECT
Expand All @@ -156,7 +160,7 @@ def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:)
level AS level_after
SQL

-    upserted = model_class.connection.uncached { model_class.connection.select_one(sql) }
+    upserted = @model_class.connection.uncached { @model_class.connection.select_one(sql) }
level_after = upserted.fetch("level_after")
level_before = upserted.fetch("level_before")
[level_after, level_after >= capacity, level_after != level_before]
@@ -165,7 +169,7 @@ def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:)
def set_block(key:, block_for:)
raise ArgumentError, "block_for must be positive" unless block_for > 0
query_params = {id: SecureRandom.uuid, key: key.to_s, block_for: block_for.to_f, now_s: Time.now.to_f}
-    block_set_query = model_class.sanitize_sql_array([<<~SQL, query_params])
+    block_set_query = @model_class.sanitize_sql_array([<<~SQL, query_params])
INSERT INTO pecorino_blocks AS t
(id, key, blocked_until)
VALUES
@@ -174,27 +178,45 @@ def set_block(key:, block_for:)
blocked_until = MAX(EXCLUDED.blocked_until, t.blocked_until)
RETURNING blocked_until;
SQL
-    blocked_until_s = model_class.connection.uncached { model_class.connection.select_value(block_set_query) }
+    blocked_until_s = @model_class.connection.uncached { @model_class.connection.select_value(block_set_query) }
Time.at(blocked_until_s)
end

def blocked_until(key:)
now_s = Time.now.to_f
-    block_check_query = model_class.sanitize_sql_array([<<~SQL, {now_s: now_s, key: key}])
+    block_check_query = @model_class.sanitize_sql_array([<<~SQL, {now_s: now_s, key: key}])
SELECT
blocked_until
FROM
pecorino_blocks
WHERE
key = :key AND blocked_until >= :now_s LIMIT 1
SQL
-    blocked_until_s = model_class.connection.uncached { model_class.connection.select_value(block_check_query) }
+    blocked_until_s = @model_class.connection.uncached { @model_class.connection.select_value(block_check_query) }
blocked_until_s && Time.at(blocked_until_s)
end

def prune
now_s = Time.now.to_f
-    model_class.connection.execute("DELETE FROM pecorino_blocks WHERE blocked_until < ?", now_s)
-    model_class.connection.execute("DELETE FROM pecorino_leaky_buckets WHERE may_be_deleted_after < ?", now_s)
+    @model_class.connection.execute("DELETE FROM pecorino_blocks WHERE blocked_until < ?", now_s)
+    @model_class.connection.execute("DELETE FROM pecorino_leaky_buckets WHERE may_be_deleted_after < ?", now_s)
end

+  def create_tables(active_record_schema)
+    active_record_schema.create_table :pecorino_leaky_buckets, id: :uuid do |t|
+      t.string :key, null: false
+      t.float :level, null: false
+      t.datetime :last_touched_at, null: false
+      t.datetime :may_be_deleted_after, null: false
+    end
+    active_record_schema.add_index :pecorino_leaky_buckets, [:key], unique: true
+    active_record_schema.add_index :pecorino_leaky_buckets, [:may_be_deleted_after]
+
+    active_record_schema.create_table :pecorino_blocks, id: :uuid do |t|
+      t.string :key, null: false
+      t.datetime :blocked_until, null: false
+    end
+    active_record_schema.add_index :pecorino_blocks, [:key], unique: true
+    active_record_schema.add_index :pecorino_blocks, [:blocked_until]
+  end
end
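For reference, both adapters perform the same leak extrapolation in SQL: the stored level is decayed by leak_rate multiplied by the seconds elapsed since last_touched_at, then clamped between 0 and capacity (GREATEST/LEAST in Postgres, MAX/MIN in SQLite). The same arithmetic in plain Ruby (illustrative only, not code from this commit):

# Illustrative only: the clamp both SQL queries perform, expressed in Ruby.
# level and last_touched_at come from the stored bucket row.
def extrapolated_level(level, last_touched_at, capacity:, leak_rate:, now: Time.now)
  leaked = leak_rate * (now - last_touched_at)
  [[level - leaked, capacity].min, 0.0].max
end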
