-
Notifications
You must be signed in to change notification settings - Fork 175
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Circuit breakers #566
base: develop
Are you sure you want to change the base?
Circuit breakers #566
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
from baseplate.lib.circuit_breaker.errors import BreakerTrippedError | ||
from baseplate.lib.circuit_breaker.factory import breaker_box_from_config | ||
from baseplate.lib.circuit_breaker.factory import CircuitBreakerClientWrapperFactory | ||
|
||
|
||
__all__ = [ | ||
"breaker_box_from_config", | ||
"BreakerTrippedError", | ||
"CircuitBreakerClientWrapperFactory", | ||
] |
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,127 @@ | ||||||||
from collections import deque | ||||||||
from datetime import datetime | ||||||||
from datetime import timedelta | ||||||||
from enum import Enum | ||||||||
from math import ceil | ||||||||
from random import random | ||||||||
from typing import Deque | ||||||||
|
||||||||
|
||||||||
class BreakerState(Enum):
    """States a circuit breaker can be in.

    Each member's value is the lowercase form of its name.
    """

    # normal operation (circuit closed)
    WORKING = "working"
    # failing fast until the trip timeout passes (circuit open)
    TRIPPED = "tripped"
    # probing for recovery (circuit half open); a failure here trips
    # the breaker again immediately
    TESTING = "testing"
|
||||||||
|
||||||||
class Breaker:
    """Circuit breaker.

    The circuit breaker has 3 states:
    * WORKING (closed)
    * TRIPPED (open)
    * TESTING (half open)

    During normal operation the circuit breaker is in the WORKING state.

    Once ``samples`` results have been recorded, the breaker moves to the TRIPPED state as soon
    as the number of failures among the last ``samples`` results reaches the trip threshold
    (``ceil(trip_failure_ratio * samples)``). It stays in this state for the timeout period.

    After the timeout period passes the breaker moves to the TESTING state. If the next attempt
    is successful the breaker moves to the WORKING state. If the next attempt is a failure the
    breaker moves back to the TRIPPED state.

    :param name: full name/path of the circuit breaker
    :param samples: number of previous results used to calculate the trip failure ratio
    :param trip_failure_ratio: the minimum ratio of sampled failed results to trip the breaker
    :param trip_for: how long to remain tripped before moving to the TESTING state
    :param fuzz_ratio: how much to randomly add/subtract to the trip_for time, expressed as a
        fraction of trip_for
    :raises ValueError: if ``samples`` is less than 1
    """

    _state: BreakerState = BreakerState.WORKING
    _is_bucket_full: bool = False
    # Both of these are assigned by reset(), which __init__ always calls; they are declared
    # here so readers and type checkers can rely on the attributes existing.
    failures: int
    tripped_until: datetime

    def __init__(
        self,
        name: str,
        samples: int = 20,
        trip_failure_ratio: float = 0.5,
        trip_for: timedelta = timedelta(minutes=1),
        fuzz_ratio: float = 0.1,
    ):
        # An empty sample window cannot work: register_attempt() inspects the oldest
        # recorded result, which would not exist.
        if samples < 1:
            raise ValueError(f"samples must be at least 1, got {samples!r}")

        self.name = name
        self.samples = samples
        # bounded window of the most recent results (True = success, False = failure)
        self.results_bucket: Deque[bool] = deque(maxlen=samples)
        self.trip_threshold = ceil(trip_failure_ratio * samples)
        self.trip_for = trip_for
        self.fuzz_ratio = fuzz_ratio
        self.reset()

    @property
    def state(self) -> BreakerState:
        """Return the current state.

        Reading this property moves a TRIPPED breaker to TESTING once ``tripped_until``
        has passed.
        """
        if self._state == BreakerState.TRIPPED and (datetime.utcnow() >= self.tripped_until):
            self.set_state(BreakerState.TESTING)

        return self._state

    def register_attempt(self, success: bool) -> None:
        """Register a success or failure.

        This may cause the state to change.

        :param success: Whether the attempt was a success (not a failure).
        """
        # Read the state once: the property only ever transitions out of TRIPPED, so after
        # the check below it cannot change again within this call.
        state = self.state

        # This breaker has already tripped, so ignore the "late" registrations
        if state == BreakerState.TRIPPED:
            return

        if not success:
            self.failures += 1

        # The bucket is a bounded deque, so appending to a full bucket evicts the oldest
        # result. If the result about to be evicted was a failure it must stop counting
        # toward the failure total.
        if self._is_bucket_full and not self.results_bucket[0]:
            self.failures -= 1

        self.results_bucket.append(success)

        if len(self.results_bucket) == self.samples:
            self._is_bucket_full = True

        if state == BreakerState.TESTING:
            if success:
                self.reset()
            else:
                # failure in the TESTING state trips the breaker immediately
                self.trip()
            return

        # no need to check anything if we haven't recorded enough samples
        if self._is_bucket_full and self.failures >= self.trip_threshold:
            self.trip()

    def set_state(self, state: BreakerState) -> None:
        """Set the state directly, leaving the recorded results and timeout untouched."""
        self._state = state

    def trip(self) -> None:
        """Change state to TRIPPED and set the timeout after which state will change to TESTING."""
        multiplier = 1.0
        if self.fuzz_ratio > 0.0:
            # random() is in [0, 1), so the offset lies in [-fuzz_ratio, fuzz_ratio)
            multiplier += ((2 * random()) - 1.0) * self.fuzz_ratio

        self.tripped_until = datetime.utcnow() + (self.trip_for * multiplier)
        self.set_state(BreakerState.TRIPPED)

    def reset(self) -> None:
        """Reset to freshly initialized WORKING state."""
        self.results_bucket.clear()
        self.failures = 0
        self._is_bucket_full = False
        self.tripped_until = datetime.utcnow()
        self.set_state(BreakerState.WORKING)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
class BreakerTrippedError(Exception):
    """Error indicating that a circuit breaker is tripped.

    :param message: human-readable description of the error; defaults to
        "Breaker tripped!"
    """

    def __init__(self, message: str = "Breaker tripped!") -> None:
        super().__init__(message)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
import logging | ||
|
||
from contextlib import contextmanager | ||
from datetime import timedelta | ||
from typing import Any | ||
from typing import Dict | ||
from typing import Iterator | ||
from typing import Tuple | ||
from typing import Type | ||
|
||
from baseplate import Span | ||
from baseplate.clients import ContextFactory | ||
from baseplate.lib import config | ||
from baseplate.lib.circuit_breaker.breaker import Breaker | ||
from baseplate.lib.circuit_breaker.breaker import BreakerState | ||
from baseplate.lib.circuit_breaker.errors import BreakerTrippedError | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It'd be nice to add a |
||
|
||
class CircuitBreakerClientWrapperFactory(ContextFactory):
    """Provide an object combining a client and circuit breaker for use with the client.

    The breaker box is created by the caller and passed in, and the same box is handed
    to every object this factory makes, so they all share the same breakers.

    When attached to the baseplate `RequestContext` can be used like::

        breakable_exceptions = (...)  # exceptions indicating the service is unhealthy
        with context.breaker_wrapped_client.breaker_context("identifier", breakable_exceptions) as svc:
            svc.get_something()

    :param client_factory: factory producing the underlying client to wrap
    :param breaker_box: container of the breakers guarding the client's operations
    """

    def __init__(self, client_factory: ContextFactory, breaker_box: "CircuitBreakerBox"):
        self.client_factory = client_factory
        self.breaker_box = breaker_box

    def report_runtime_metrics(self, batch: Any) -> None:
        """Forward runtime metrics reporting to the wrapped client factory.

        Without this the wrapper would hide whatever runtime metrics the wrapped
        factory reports.
        """
        self.client_factory.report_runtime_metrics(batch)

    def make_object_for_context(self, name: str, span: Span) -> Any:
        """Build the wrapped factory's client and pair it with the shared breaker box."""
        client = self.client_factory.make_object_for_context(name, span)
        return CircuitBreakerWrappedClient(span, self.breaker_box, client)
|
||
|
||
class CircuitBreakerWrappedClient:
    """A client paired with the breakers that guard its operations.

    Exactly one attempt is registered per ``breaker_context`` block, no matter how many
    requests the client makes inside it. If the client retries internally, only the
    exception that finally propagates out of the block is seen by the breaker.

    :param span: span used to record fail-fast and state-change counters
    :param breaker_box: container the per-operation breakers are fetched from
    :param client: the raw client yielded by ``breaker_context``
    """

    def __init__(self, span: Span, breaker_box: "CircuitBreakerBox", client: Any):
        self.span = span
        self.breaker_box = breaker_box
        self._client = client

    @property
    def client(self) -> Any:
        """Return the raw, undecorated client"""
        return self._client

    @contextmanager
    def breaker_context(
        self, operation: str, breakable_exceptions: Tuple[Type[Exception], ...]
    ) -> Iterator[Any]:
        """Get a context manager to perform client operations within.

        Yields the client to use within the breaker context.

        The context manager manages the Breaker's state and registers
        successes and failures. Only exceptions listed in
        `breakable_exceptions` are registered as failures; a block that
        exits normally or with any other exception is registered as a
        success. Exceptions are always re-raised to the caller.

        When the `Breaker` is in TRIPPED state all calls to this context
        manager will raise a `BreakerTrippedError` exception.

        :param operation: The operation name, used to get a specific `Breaker`.
        :param breakable_exceptions: Tuple of exceptions that count as failures
        :raises BreakerTrippedError: if the breaker for `operation` is tripped
        """
        breaker = self.breaker_box.get(operation)

        if breaker.state == BreakerState.TRIPPED:
            logger.debug("Circuit breaker '%s' tripped; request failed fast", breaker.name)
            self.span.incr_tag(f"breakers.{breaker.name}.request.fail_fast")
            raise BreakerTrippedError()

        success: bool = True
        try:
            # yield to the application code that will use
            # the client covered by this breaker. if this
            # raises an exception we will catch it here.
            yield self._client
        except breakable_exceptions:
            # only known exceptions in `breakable_exceptions` should trigger
            # opening the circuit. the client call may raise exceptions that
            # are a meaningful response, like defined thrift IDL exceptions.
            success = False
            raise
        finally:
            # compare the state before and after registering so that any
            # transition caused by this attempt is counted on the span
            prev = breaker.state
            breaker.register_attempt(success)
            final = breaker.state
            if prev != final:
                self.span.incr_tag(
                    f"breakers.{breaker.name}.state_change.{prev.value}.{final.value}"
                )
|
||
|
||
class CircuitBreakerBox:
    """Container for a client's `Breaker`s.

    Will lazily create `Breaker`s for each operation as needed. There
    is no global coordination across operations--each `Breaker` is
    isolated and does not consider the state or failure rates in other
    `Breaker`s.

    Any of the `Breaker` options may be passed as ``None`` (for example
    when the value was not present in configuration); such options are
    left out when a `Breaker` is created so that `Breaker`'s own default
    applies.

    :param name: The base `Breaker` name. The full name is like "name.operation".
    :param samples: See `Breaker`
    :param trip_failure_ratio: See `Breaker`
    :param trip_for: See `Breaker`
    :param fuzz_ratio: See `Breaker`
    """

    def __init__(
        self,
        name: str,
        samples: int,
        trip_failure_ratio: float,
        trip_for: timedelta,
        fuzz_ratio: float,
    ):
        self.name = name
        self.samples = samples
        self.trip_failure_ratio = trip_failure_ratio
        self.trip_for = trip_for
        self.fuzz_ratio = fuzz_ratio
        # breakers created so far, keyed by operation name
        self.breaker_box: Dict[str, Breaker] = {}

    def get(self, operation: str) -> Breaker:
        """Return the `Breaker` for an operation, creating it on first use.

        :param operation: The operation name; the breaker is named "name.operation".
        """
        try:
            return self.breaker_box[operation]
        except KeyError:
            pass

        configured: Dict[str, Any] = {
            "samples": self.samples,
            "trip_failure_ratio": self.trip_failure_ratio,
            "trip_for": self.trip_for,
            "fuzz_ratio": self.fuzz_ratio,
        }
        # Passing None explicitly would override the defaults declared by Breaker,
        # so unset options are dropped instead.
        options = {key: value for key, value in configured.items() if value is not None}

        breaker = Breaker(name=f"{self.name}.{operation}", **options)
        self.breaker_box[operation] = breaker
        return breaker
|
||
|
||
def breaker_box_from_config(
    app_config: config.RawConfig, name: str, prefix: str = "breaker.",
) -> CircuitBreakerBox:
    """Make a CircuitBreakerBox from a configuration dictionary.

    The keys read from the configuration are prefixed with ``prefix``
    (``breaker.`` by default). Supported keys:

    ``samples`` (optional)
        Integer; defaults to 20.
    ``trip_failure_ratio`` (optional)
        Float; defaults to 0.5.
    ``trip_for`` (optional)
        Timespan; defaults to 1 minute.
    ``fuzz_ratio`` (optional)
        Float; defaults to 0.1.

    See `Breaker` for the meaning of each option.

    :param app_config: the raw application configuration
    :param name: the base name given to the box and its breakers
    :param prefix: prefix of the configuration keys; must end with "."
    """
    assert prefix.endswith(".")
    # An Optional without a default parses a missing key to None, and passing None
    # on would override the defaults declared by Breaker. The defaults below
    # mirror Breaker's own and must be kept in sync with them.
    parser = config.SpecParser(
        {
            "samples": config.Optional(config.Integer, default=20),
            "trip_failure_ratio": config.Optional(config.Float, default=0.5),
            "trip_for": config.Optional(config.Timespan, default=timedelta(minutes=1)),
            "fuzz_ratio": config.Optional(config.Float, default=0.1),
        }
    )
    options = parser.parse(prefix[:-1], app_config)
    return CircuitBreakerBox(
        name, options.samples, options.trip_failure_ratio, options.trip_for, options.fuzz_ratio
    )
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given the sequence of states described in the `Breaker` docstring below, it's a bit odd that `TESTING` comes after `TRIPPED` in this enum.