Skip to content

Commit

Permalink
Improve interval too short warnings (#93)
Browse files Browse the repository at this point in the history
  • Loading branch information
pederhan authored Jan 14, 2025
1 parent 18349a7 commit 597f736
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 51 deletions.
64 changes: 51 additions & 13 deletions tests/test_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import time

import pytest
from inline_snapshot import snapshot

from zabbix_auto_config.exceptions import ZACException
from zabbix_auto_config.processing import BaseProcess
Expand Down Expand Up @@ -144,13 +145,19 @@ def test_state_asdict_ok(use_manager: bool) -> None:
else:
state = State()
state.set_ok()
assert state.asdict() == {
"ok": True,
"error": None,
"error_type": None,
"error_count": 0,
"error_time": None,
}
assert state.asdict() == snapshot(
{
"ok": True,
"error": None,
"error_type": None,
"error_time": None,
"error_count": 0,
"execution_count": 0,
"total_duration": datetime.timedelta(0),
"max_duration": datetime.timedelta(0),
"last_duration_warning": None,
}
)


class CustomException(Exception):
Expand All @@ -174,9 +181,40 @@ def test_state_asdict_error(use_manager: bool) -> None:
assert post >= d["error_time"] >= pre
d.pop("error_time")

assert d == {
"ok": False,
"error": "Test error",
"error_type": "CustomException",
"error_count": 1,
}
assert d == snapshot(
{
"ok": False,
"error": "Test error",
"error_type": "CustomException",
"error_count": 1,
"execution_count": 0,
"total_duration": datetime.timedelta(0),
"max_duration": datetime.timedelta(0),
"last_duration_warning": None,
}
)


def test_state_record_execution() -> None:
    """Verify the running metrics kept by ``State.record_execution``.

    Three executions (1s, 2s, 1s) are recorded one after another. After
    each one the execution count, cumulative duration, longest duration
    and average duration are compared against hand-computed values.
    """
    state = State()

    # (recorded seconds, expected count, expected total seconds,
    #  expected max seconds, expected average seconds)
    steps = [
        (1, 1, 1, 1, 1 / 1),
        (2, 2, 3, 2, 3 / 2),
        (1, 3, 4, 2, 4 / 3),
    ]

    for recorded, count, total, longest, average in steps:
        state.record_execution(datetime.timedelta(seconds=recorded))
        assert state.execution_count == count
        assert state.total_duration == datetime.timedelta(seconds=total)
        assert state.max_duration == datetime.timedelta(seconds=longest)
        assert state.avg_duration == datetime.timedelta(seconds=average)
34 changes: 34 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
from datetime import timedelta
from ipaddress import IPv4Address
from ipaddress import IPv6Address
from pathlib import Path
Expand Down Expand Up @@ -235,3 +236,36 @@ def test_mapping_values_with_prefix_no_prefix_separator(
)
assert res == {"[email protected]": ["Foouser1-primary", "Foouser1-secondary"]}
assert caplog.text.count("WARNING") == 2


@pytest.mark.parametrize(
    "inp,expected",
    [
        # Normal cases
        (timedelta(hours=1, minutes=30, seconds=45), "01:30:45"),
        (timedelta(hours=0, minutes=5, seconds=30), "00:05:30"),
        (timedelta(hours=24, minutes=0, seconds=0), "24:00:00"),
        # Zero and near-zero cases
        (timedelta(seconds=0), "00:00:00"),
        (timedelta(microseconds=999999), "00:00:00"),  # Should truncate to zero
        # Negative durations
        (timedelta(hours=-1, minutes=-30, seconds=-45), "-01:30:45"),
        (timedelta(hours=-2), "-02:00:00"),
        # Large durations (multiple days)
        (timedelta(days=1, hours=2, minutes=3, seconds=4), "26:03:04"),
        (timedelta(days=2), "48:00:00"),
        # Fractional components
        (timedelta(hours=1, minutes=30, seconds=45, microseconds=500000), "01:30:45"),
        # None case
        (None, "00:00:00"),
        # Edge cases around zero
        (timedelta(microseconds=1), "00:00:00"),
        (timedelta(microseconds=-1), "00:00:00"),
        # Mixed positive/negative components
        (timedelta(hours=1, minutes=-30), "00:30:00"),  # Python normalizes this
        # Large negative durations
        (timedelta(days=-2, hours=-3), "-51:00:00"),
    ],
)
def test_format_timedelta(inp: timedelta | None, expected: str) -> None:
    """Check that ``utils.format_timedelta`` renders ``inp`` as ``expected``.

    The cases expect an ``HH:MM:SS`` string in which whole days roll into
    the hour field, sub-second parts are dropped, negative durations get a
    leading ``-`` and ``None`` is rendered like a zero duration.
    """
    # ``inp`` is annotated as optional because the table contains a None case.
    assert utils.format_timedelta(inp) == expected
52 changes: 35 additions & 17 deletions zabbix_auto_config/processing.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import datetime
import itertools
import logging
import multiprocessing
Expand All @@ -15,6 +14,8 @@
import time
from collections import Counter
from collections import defaultdict
from datetime import datetime
from datetime import timedelta
from enum import Enum
from typing import TYPE_CHECKING
from typing import Any
Expand Down Expand Up @@ -71,7 +72,7 @@ def __init__(self, name: str, state: State) -> None:
self.state = state

self.update_interval = 1
self.next_update = datetime.datetime.now()
self.next_update = datetime.now()

self.state.set_ok()
self.stop_event = multiprocessing.Event()
Expand All @@ -87,17 +88,17 @@ def run(self) -> None:
self.stop()
break

if self.next_update > datetime.datetime.now():
if self.next_update > datetime.now():
time.sleep(1)
continue

self.next_update = datetime.datetime.now() + datetime.timedelta(
start_time = datetime.now()
self.next_update = datetime.now() + timedelta(
seconds=self.update_interval
)

try:
self.work()
self.state.set_ok()
except Exception as e:
# These are the error types we handle ourselves then continue
if isinstance(e, httpx.TimeoutException):
Expand All @@ -116,16 +117,35 @@ def run(self) -> None:
else:
raise e # all other exceptions are fatal
self.state.set_error(e)
else:
self.state.set_ok()

work_duration = datetime.now() - start_time
self.state.record_execution(work_duration)

# Only warn about long-running tasks if:
# 1. Interval is non-zero (not continuous processing)
# 2. Work took longer than the interval
# 3. Haven't warned in last hour
if (
self.update_interval > 1
and self.next_update < datetime.datetime.now()
self.update_interval > 0
and work_duration.total_seconds() > self.update_interval
and (
not self.state.last_duration_warning
or datetime.now() - self.state.last_duration_warning
> timedelta(hours=1)
)
):
# Only log warning when update_interval is actually changed from default
logging.warning(
"Next update is in the past. Interval too short? Lagging behind? Next update was: %s",
self.next_update.isoformat(timespec="seconds"),
"Work duration (%s) exceeded update interval (%s). "
"Stats - Avg duration: %s, Max duration: %s, Updates: %d",
utils.format_timedelta(work_duration),
utils.format_timedelta(timedelta(seconds=self.update_interval)),
utils.format_timedelta(self.state.avg_duration),
utils.format_timedelta(self.state.max_duration),
self.state.execution_count,
)
self.state.last_duration_warning = datetime.now()

logging.info("Process exiting")

Expand Down Expand Up @@ -185,7 +205,7 @@ def __init__(

# Repeated errors will disable the source
self.disabled = False
self.disabled_until = datetime.datetime.now()
self.disabled_until = datetime.now()
self.error_counter = RollingErrorCounter(
duration=self.config.error_duration,
tolerance=self.config.error_tolerance,
Expand All @@ -196,10 +216,10 @@ def work(self) -> None:
# If not, we raise a ZACException, so that the state of the process
# is marked as not ok.
if self.disabled:
if self.disabled_until > datetime.datetime.now():
time_left = self.disabled_until - datetime.datetime.now()
if self.disabled_until > datetime.now():
time_left = self.disabled_until - datetime.now()
raise ZACException(
f"Source is disabled for {utils.timedelta_to_str(time_left)}"
f"Source is disabled for {utils.format_timedelta(time_left)}"
)
else:
logging.info("Reactivating source")
Expand Down Expand Up @@ -282,9 +302,7 @@ def disable(self) -> None:
logging.info(
"Disabling source '%s' for %s seconds", self.name, disable_duration
)
self.disabled_until = datetime.datetime.now() + datetime.timedelta(
seconds=disable_duration
)
self.disabled_until = datetime.now() + timedelta(seconds=disable_duration)
# Reset the error counter so that previous errors don't count towards
# the error counter in the next run in case the disable duration is short
self.error_counter.reset()
Expand Down
71 changes: 54 additions & 17 deletions zabbix_auto_config/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import time
import types
from dataclasses import asdict
from dataclasses import field
from datetime import datetime
from datetime import timedelta
from multiprocessing.managers import BaseManager
from multiprocessing.managers import NamespaceProxy # type: ignore # why unexported?
from typing import Any
Expand All @@ -14,24 +17,35 @@

@dataclass
class State:
"""Health state of a process."""

"""Health state and performance metrics of a process.
This class tracks both error states and execution statistics for a process,
providing a comprehensive view of the process's health and performance.
Attributes:
ok: Status of the process. False if an error occurred in the most recent run.
error: Error message from most recent error, if any.
error_type: Error type name from most recent error, if any.
error_time: Timestamp of the most recent error, if any.
error_count: Total number of errors encountered since process start.
execution_count: Total number of executions since process start.
total_duration: Cumulative execution time of all runs.
max_duration: Longest execution time observed.
last_duration_warning: When the last warning about long execution was logged.
"""

# Error tracking
ok: bool = True
"""Status of the process. False if an error has occurred in the most recent run."""

# BELOW: Only applicable if ok is False

error: Optional[str] = None
"""Error message for most recent error."""

error_type: Optional[str] = None
"""Error type name for most recent error"""

error_time: Optional[float] = None
"""Timestamp of the most recent error."""

error_count: int = 0
"""Number of errors the process has encountered since starting."""

# Execution metrics
execution_count: int = 0
total_duration: timedelta = field(default_factory=timedelta)
max_duration: timedelta = field(default_factory=timedelta)
last_duration_warning: Optional[datetime] = None

def asdict(self) -> Dict[str, Any]:
"""Return dict representation of the State object."""
Expand All @@ -41,23 +55,46 @@ def asdict(self) -> Dict[str, Any]:
def set_ok(self) -> None:
    """Mark the state as OK and clear the details of the last error.

    NOTE: The error count and the execution metrics are left untouched.
    """
    self.ok = True
    # Wipe every field that describes the most recent error.
    for cleared in ("error", "error_type", "error_time"):
        setattr(self, cleared, None)

def set_error(self, exc: Exception) -> None:
    """Mark the state as failed and store details about the failure.

    Args:
        exc: The exception that caused the error state. Its message and
            class name are stored, and the error count is incremented.
    """
    self.ok = False
    self.error, self.error_type = str(exc), type(exc).__name__
    # Wall-clock timestamp (seconds since the epoch) of this error.
    self.error_time = time.time()
    self.error_count = self.error_count + 1

def record_execution(self, duration: timedelta) -> None:
    """Fold one finished execution into the running metrics.

    Args:
        duration: How long the execution that just completed took.
    """
    self.execution_count = self.execution_count + 1
    self.total_duration = self.total_duration + duration
    # Keep the previous maximum unless this run was strictly longer.
    longest_so_far = self.max_duration
    self.max_duration = duration if duration > longest_so_far else longest_so_far

@property
def avg_duration(self) -> Optional[timedelta]:
    """Mean duration over all recorded executions.

    Returns:
        The cumulative duration divided by the execution count, or None
        when no execution has been recorded yet.
    """
    runs = self.execution_count
    return None if runs == 0 else self.total_duration / runs


class Manager(BaseManager):
pass
Expand Down
Loading

0 comments on commit 597f736

Please sign in to comment.