
Commit

Merge branch 'main' into kddubey/autofix/retry-stream
kddubey committed Jan 9, 2025
2 parents 6940277 + 18749fc commit 255c0fa
Showing 7 changed files with 231 additions and 59 deletions.
90 changes: 44 additions & 46 deletions src/seer/anomaly_detection/accessors.py
@@ -93,67 +93,64 @@ def _hydrate_alert(
) -> DynamicAlert:
rand_offset = random.randint(0, 24)
timestamp_threshold = (datetime.now() - timedelta(days=28, hours=rand_offset)).timestamp()
num_old_points = 0
window_size = db_alert.anomaly_algo_data.get("window_size")
flags = []
scores = []
mp_suss = []
mp_fixed = []
ts = []
values = []
original_flags = []
use_suss = []

timeseries = db_alert.timeseries
n_points = len(timeseries)
ts = np.array([0.0] * n_points)
values = np.array([0.0] * n_points)
flags = [""] * n_points
scores = [0.0] * n_points
mp_suss = []
mp_fixed = []
original_flags = ["none"] * n_points
use_suss = [True] * n_points

# If the timeseries does not have both matrix profiles, then we only use the suss window
only_suss = False
if len(timeseries) > 0 and any(
only_suss = len(timeseries) > 0 and any(
point.anomaly_algo_data is not None
and "mp_suss" not in point.anomaly_algo_data
and "mp_fixed" not in point.anomaly_algo_data
for point in timeseries
):
only_suss = True
)

num_old_points = 0
window_size = db_alert.anomaly_algo_data.get("window_size")

for point in timeseries:
ts.append(point.timestamp.timestamp())
values.append(point.value)
flags.append(point.anomaly_type)
scores.append(point.anomaly_score)
for i, point in enumerate(timeseries):
ts[i] = point.timestamp.timestamp()
values[i] = point.value
flags[i] = point.anomaly_type
scores[i] = point.anomaly_score

if point.anomaly_algo_data is not None:
algo_data = MPTimeSeriesAnomalies.extract_algo_data(point.anomaly_algo_data)

if "mp_suss" in algo_data and algo_data["mp_suss"]:
mp_suss_data = [
algo_data["mp_suss"]["dist"],
algo_data["mp_suss"]["idx"],
algo_data["mp_suss"]["l_idx"],
algo_data["mp_suss"]["r_idx"],
]
mp_suss.append(mp_suss_data)
mp_suss.append(
[
algo_data["mp_suss"]["dist"],
algo_data["mp_suss"]["idx"],
algo_data["mp_suss"]["l_idx"],
algo_data["mp_suss"]["r_idx"],
]
)

if "mp_fixed" in algo_data and algo_data["mp_fixed"]:
mp_fixed_data = [
algo_data["mp_fixed"]["dist"],
algo_data["mp_fixed"]["idx"],
algo_data["mp_fixed"]["l_idx"],
algo_data["mp_fixed"]["r_idx"],
]
mp_fixed.append(mp_fixed_data)
use_suss.append(algo_data["use_suss"])
original_flags.append(algo_data["original_flag"])

if point.timestamp.timestamp() < timestamp_threshold:
num_old_points += 1
# Default value is "none" for original flags
if len(original_flags) < len(ts):
original_flags = ["none"] * (len(ts) - len(original_flags)) + original_flags
mp_fixed.append(
[
algo_data["mp_fixed"]["dist"],
algo_data["mp_fixed"]["idx"],
algo_data["mp_fixed"]["l_idx"],
algo_data["mp_fixed"]["r_idx"],
]
)

if i >= n_points - len(algo_data.get("original_flags", [])):
original_flags[i] = algo_data["original_flag"]
use_suss[i] = algo_data["use_suss"]

# Default value is True for use_suss
if len(use_suss) < len(ts):
use_suss = [True] * (len(ts) - len(use_suss)) + use_suss
if ts[i] < timestamp_threshold:
num_old_points += 1

anomalies = MPTimeSeriesAnomalies(
flags=flags,
@@ -175,14 +172,15 @@ def _hydrate_alert(
original_flags=original_flags,
use_suss=use_suss,
)

return DynamicAlert(
organization_id=db_alert.organization_id,
project_id=db_alert.project_id,
external_alert_id=db_alert.external_alert_id,
config=AnomalyDetectionConfig.model_validate(db_alert.config),
timeseries=MPTimeSeries(
timestamps=np.array(ts),
values=np.array(values),
timestamps=ts,
values=values,
),
anomalies=anomalies,
cleanup_config=CleanupConfig(
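The rewrite above replaces incremental list appends with arrays preallocated to the series length and filled by index. A minimal sketch of that pattern, using hypothetical stand-in records rather than db_alert.timeseries:

```python
import numpy as np

# Hypothetical per-point records; in accessors.py these come from
# db_alert.timeseries.
points = [(1_700_000_000.0, 10.0), (1_700_000_060.0, 11.5)]

n_points = len(points)
ts = np.zeros(n_points)        # preallocate once per series...
values = np.zeros(n_points)
flags = ["none"] * n_points

for i, (timestamp, value) in enumerate(points):
    ts[i] = timestamp          # ...then fill by index instead of appending
    values[i] = value
```

Preallocating also removes the final np.array(ts) / np.array(values) copies the old code paid when constructing MPTimeSeries, as the diff below shows.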
47 changes: 41 additions & 6 deletions src/seer/anomaly_detection/anomaly_detection.py
@@ -1,3 +1,4 @@
import datetime
import logging
from typing import List, Tuple

@@ -61,6 +62,7 @@ def _batch_detect(
config: AnomalyDetectionConfig,
window_size: int | None = None,
algo_config: AlgoConfig = injected,
time_budget_ms: int | None = None,
) -> Tuple[List[TimeSeriesPoint], MPTimeSeriesAnomalies]:
"""
Stateless batch anomaly detection on entire timeseries as provided. In batch mode, analysis of a
@@ -84,12 +86,18 @@ def _batch_detect(
config,
algo_config=algo_config,
window_size=window_size,
time_budget_ms=(
time_budget_ms // 2 if time_budget_ms else None
), # Time budget is split between the two detection calls
)
anomalies_fixed = batch_detector.detect(
convert_external_ts_to_internal(timeseries),
config,
algo_config=algo_config,
window_size=algo_config.mp_fixed_window_size,
time_budget_ms=(
time_budget_ms // 2 if time_budget_ms else None
), # Time budget is split between the two detection calls
)
anomalies = DbAlertDataAccessor().combine_anomalies(
anomalies_suss, anomalies_fixed, [True] * len(timeseries)
@@ -261,6 +269,7 @@ def _combo_detect(
self,
ts_with_history: TimeSeriesWithHistory,
config: AnomalyDetectionConfig,
time_budget_ms: int | None = None,
) -> Tuple[List[TimeSeriesPoint], MPTimeSeriesAnomalies]:
"""
Stateless online anomaly detection for a part of a time series. This function takes two parts of the time series -
@@ -308,8 +317,15 @@ def _combo_detect(

# Run batch detect on history data
batch_detector = MPBatchAnomalyDetector()
historic_anomalies_suss = batch_detector.detect(historic, config)
historic_anomalies_fixed = batch_detector.detect(historic, config, window_size=10)
historic_anomalies_suss = batch_detector.detect(
historic, config, time_budget_ms=time_budget_ms // 2 if time_budget_ms else None
)
historic_anomalies_fixed = batch_detector.detect(
historic,
config,
window_size=10,
time_budget_ms=time_budget_ms // 2 if time_budget_ms else None,
)

# Run stream detection on current data
# SuSS Window
@@ -395,15 +411,18 @@ def detect_anomalies(self, request: DetectAnomaliesRequest) -> DetectAnomaliesResponse:
sentry_sdk.set_tag(AnomalyDetectionTags.ALERT_ID, request.context.id)
ts, anomalies = self._online_detect(request.context, request.config)
elif isinstance(request.context, TimeSeriesWithHistory):
ts, anomalies = self._combo_detect(request.context, request.config)
ts, anomalies = self._combo_detect(request.context, request.config, time_budget_ms=4500)
else:
ts, anomalies = self._batch_detect(request.context, request.config)
ts, anomalies = self._batch_detect(request.context, request.config, time_budget_ms=4500)
self._update_anomalies(ts, anomalies)
return DetectAnomaliesResponse(success=True, timeseries=ts)

@inject
def store_data(
self, request: StoreDataRequest, alert_data_accessor: AlertDataAccessor = injected
self,
request: StoreDataRequest,
alert_data_accessor: AlertDataAccessor = injected,
time_budget_ms: int = 4500,  # Allocating 4.5 seconds as the alerting system times out after 5 seconds
) -> StoreDataResponse:
"""
Main entry point for storing time series data for an alert.
Expand Down Expand Up @@ -439,7 +458,23 @@ def store_data(
"config": request.config.model_dump(),
},
)
ts, anomalies = self._batch_detect(request.timeseries, request.config)
time_start = datetime.datetime.now()
ts, anomalies = self._batch_detect(
request.timeseries, request.config, time_budget_ms=time_budget_ms
)
time_elapsed = datetime.datetime.now() - time_start
time_allocated = datetime.timedelta(milliseconds=time_budget_ms)
if time_elapsed > time_allocated:
sentry_sdk.set_extra("time_taken_for_batch_detection", time_elapsed)
sentry_sdk.set_extra("time_allocated_for_batch_detection", time_allocated)
sentry_sdk.capture_message(
"batch_detection_took_too_long",
level="error",
)
raise ServerError(
"Batch detection took too long"
) # Abort without saving to avoid data going out of sync with alerting system.

alert_data_accessor.save_alert(
organization_id=request.organization_id,
project_id=request.project_id,
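The changes above thread a fixed deadline through both detection passes: the budget is halved across the SuSS-window and fixed-window calls, and an overrun in store_data aborts before saving so stored state cannot drift out of sync with the alerting system. A minimal self-contained sketch of the same guard; detect_fn and the other names are illustrative, not this repository's API:

```python
import datetime

class ServerError(Exception):
    pass

def detect_with_budget(detect_fn, data, time_budget_ms: int = 4500):
    # Split the budget between two passes, mirroring _batch_detect above.
    start = datetime.datetime.now()
    suss = detect_fn(data, time_budget_ms=time_budget_ms // 2)
    fixed = detect_fn(data, time_budget_ms=time_budget_ms // 2)
    elapsed = datetime.datetime.now() - start
    if elapsed > datetime.timedelta(milliseconds=time_budget_ms):
        # Abort without saving so stored state cannot drift out of sync
        # with the alerting system, which times out after 5 seconds.
        raise ServerError("Batch detection took too long")
    return suss, fixed
```

The 4500 ms default leaves roughly 500 ms of headroom under the alerting system's 5-second timeout for serialization and network overhead.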
14 changes: 12 additions & 2 deletions src/seer/anomaly_detection/detectors/anomaly_detectors.py
@@ -36,7 +36,11 @@ class AnomalyDetector(BaseModel, abc.ABC):

@abc.abstractmethod
def detect(
self, timeseries: TimeSeries, ad_config: AnomalyDetectionConfig, algo_config: AlgoConfig
self,
timeseries: TimeSeries,
ad_config: AnomalyDetectionConfig,
algo_config: AlgoConfig,
time_budget_ms: int | None = None,
) -> TimeSeriesAnomalies:
return NotImplemented

@@ -53,6 +57,7 @@ def detect(
timeseries: TimeSeries,
ad_config: AnomalyDetectionConfig,
algo_config: AlgoConfig = injected,
time_budget_ms: int | None = None,
window_size: int | None = None,
) -> MPTimeSeriesAnomaliesSingleWindow:
"""
@@ -68,7 +73,9 @@ def detect(
Returns:
The input timeseries with anomaly scores and flags added
"""
return self._compute_matrix_profile(timeseries, ad_config, algo_config, window_size)
return self._compute_matrix_profile(
timeseries, ad_config, algo_config, window_size, time_budget_ms=time_budget_ms
)

@inject
@sentry_sdk.trace
@@ -78,6 +85,7 @@ def _compute_matrix_profile(
ad_config: AnomalyDetectionConfig,
algo_config: AlgoConfig,
window_size: int | None = None,
time_budget_ms: int | None = None,
ws_selector: WindowSizeSelector = injected,
scorer: MPScorer = injected,
mp_utils: MPUtils = injected,
@@ -123,6 +131,7 @@ def _compute_matrix_profile(
mp_dist=mp_dist,
ad_config=ad_config,
window_size=window_size,
time_budget_ms=time_budget_ms,
)
if flags_and_scores is None:
raise ServerError("Failed to score the matrix profile distance")
@@ -175,6 +184,7 @@ def detect(
timeseries: TimeSeries,
ad_config: AnomalyDetectionConfig,
algo_config: AlgoConfig = injected,
time_budget_ms: int | None = None,
scorer: MPScorer = injected,
mp_utils: MPUtils = injected,
) -> MPTimeSeriesAnomaliesSingleWindow:
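This file mostly plumbs the new optional time_budget_ms parameter from the abstract detect signature down through each implementation to the scorer. A sketch of that plumbing pattern under hypothetical class names:

```python
import abc

class Detector(abc.ABC):
    @abc.abstractmethod
    def detect(self, data, time_budget_ms: int | None = None):
        """Every implementation accepts an optional deadline."""

class BatchDetector(Detector):
    def detect(self, data, time_budget_ms: int | None = None):
        # Forward the budget unchanged to the expensive inner step;
        # None means no deadline, matching the default.
        return self._score(data, time_budget_ms=time_budget_ms)

    def _score(self, data, time_budget_ms: int | None = None):
        return list(data)  # stand-in for matrix profile scoring
```

Defaulting to None keeps every existing call site working while letting the request entry points opt in to a deadline.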
30 changes: 28 additions & 2 deletions src/seer/anomaly_detection/detectors/mp_scorers.py
@@ -1,4 +1,5 @@
import abc
import datetime
import logging
from typing import Dict, List, Optional, Tuple

@@ -19,7 +20,7 @@
ThresholdType,
)
from seer.dependency_injection import inject, injected
from seer.exceptions import ClientError
from seer.exceptions import ClientError, ServerError
from seer.tags import AnomalyDetectionTags

logger = logging.getLogger(__name__)
@@ -49,6 +50,7 @@ def batch_score(
mp_dist: npt.NDArray[np.float64],
ad_config: AnomalyDetectionConfig,
window_size: int,
time_budget_ms: int | None = None,
algo_config: AlgoConfig = injected,
location_detector: LocationDetector = injected,
) -> Optional[FlagsAndScores]:
@@ -130,6 +132,7 @@ def batch_score(
mp_dist: npt.NDArray[np.float64],
ad_config: AnomalyDetectionConfig,
window_size: int,
time_budget_ms: int | None = None,
algo_config: AlgoConfig = injected,
location_detector: LocationDetector = injected,
) -> Optional[FlagsAndScores]:
@@ -222,6 +225,7 @@ def batch_score(
mp_dist: npt.NDArray[np.float64],
ad_config: AnomalyDetectionConfig,
window_size: int,
time_budget_ms: int | None = None,
algo_config: AlgoConfig = injected,
location_detector: LocationDetector = injected,
) -> FlagsAndScores:
@@ -243,16 +247,30 @@ def batch_score(
Configuration for anomaly detection
window_size: int
Size of the window used for matrix profile computation
time_budget_ms: int | None
    Optional time budget for the scoring loop, in milliseconds; None means no deadline
"""
scores: List[float] = []
flags: List[AnomalyFlags] = []
thresholds: List[List[Threshold]] = []
time_allocated = datetime.timedelta(milliseconds=time_budget_ms) if time_budget_ms else None
time_start = datetime.datetime.now()
# Compute score and anomaly flags
mp_dist_threshold = self._get_mp_dist_threshold(mp_dist, ad_config.sensitivity)
idx_to_detect_location_from = (
len(mp_dist) - algo_config.direction_detection_num_timesteps_in_batch_mode
)
batch_size = 10 if len(mp_dist) > 10 else 1
for i, val in enumerate(mp_dist):
if time_allocated is not None and i % batch_size == 0:
time_elapsed = datetime.datetime.now() - time_start
if time_allocated is not None and time_elapsed > time_allocated:
sentry_sdk.set_extra("time_taken_for_batch_detection", time_elapsed)
sentry_sdk.set_extra("time_allocated_for_batch_detection", time_allocated)
sentry_sdk.capture_message(
"batch_detection_took_too_long",
level="error",
)
raise ServerError("Batch detection took too long")
scores.append(0.0 if np.isnan(val) or np.isinf(val) else val - mp_dist_threshold)
cur_thresholds = [
Threshold(
@@ -438,12 +456,20 @@ def batch_score(
mp_dist: npt.NDArray[np.float64],
ad_config: AnomalyDetectionConfig,
window_size: int,
time_budget_ms: int | None = None,
algo_config: AlgoConfig = injected,
location_detector: LocationDetector = injected,
) -> Optional[FlagsAndScores]:
for scorer in self.scorers:
flags_and_scores = scorer.batch_score(
values, timestamps, mp_dist, ad_config, window_size, algo_config, location_detector
values,
timestamps,
mp_dist,
ad_config,
window_size,
time_budget_ms,
algo_config,
location_detector,
)
if flags_and_scores is not None:
return flags_and_scores
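The scoring loop above reads the clock only every batch_size iterations, amortizing the cost of datetime.now() across points. A minimal sketch of this amortized-deadline pattern; the score arithmetic is a stand-in, not the repository's scorer:

```python
import datetime

class ServerError(Exception):
    pass

def score_with_deadline(mp_dist, time_budget_ms: int | None = None):
    time_allocated = (
        datetime.timedelta(milliseconds=time_budget_ms) if time_budget_ms else None
    )
    time_start = datetime.datetime.now()
    batch_size = 10 if len(mp_dist) > 10 else 1  # check the clock sparingly
    scores = []
    for i, val in enumerate(mp_dist):
        if time_allocated is not None and i % batch_size == 0:
            if datetime.datetime.now() - time_start > time_allocated:
                raise ServerError("Batch detection took too long")
        scores.append(float(val) - 0.5)  # stand-in for per-point scoring
    return scores
```

Checking every tenth point keeps per-iteration overhead negligible while bounding any budget overrun to a single batch of work.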
