diff --git a/tests/test_state.py b/tests/test_state.py index af9b120..3f4f2c9 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -4,6 +4,7 @@ import time import pytest +from inline_snapshot import snapshot from zabbix_auto_config.exceptions import ZACException from zabbix_auto_config.processing import BaseProcess @@ -144,13 +145,19 @@ def test_state_asdict_ok(use_manager: bool) -> None: else: state = State() state.set_ok() - assert state.asdict() == { - "ok": True, - "error": None, - "error_type": None, - "error_count": 0, - "error_time": None, - } + assert state.asdict() == snapshot( + { + "ok": True, + "error": None, + "error_type": None, + "error_time": None, + "error_count": 0, + "execution_count": 0, + "total_duration": datetime.timedelta(0), + "max_duration": datetime.timedelta(0), + "last_duration_warning": None, + } + ) class CustomException(Exception): @@ -174,9 +181,40 @@ def test_state_asdict_error(use_manager: bool) -> None: assert post >= d["error_time"] >= pre d.pop("error_time") - assert d == { - "ok": False, - "error": "Test error", - "error_type": "CustomException", - "error_count": 1, - } + assert d == snapshot( + { + "ok": False, + "error": "Test error", + "error_type": "CustomException", + "error_count": 1, + "execution_count": 0, + "total_duration": datetime.timedelta(0), + "max_duration": datetime.timedelta(0), + "last_duration_warning": None, + } + ) + + +def test_state_record_execution() -> None: + state = State() + + # Add 1 second execution + state.record_execution(datetime.timedelta(seconds=1)) + assert state.execution_count == 1 + assert state.total_duration == datetime.timedelta(seconds=1) + assert state.max_duration == datetime.timedelta(seconds=1) + assert state.avg_duration == datetime.timedelta(seconds=1 / 1) + + # Add 2 second execution + state.record_execution(datetime.timedelta(seconds=2)) + assert state.execution_count == 2 + assert state.total_duration == datetime.timedelta(seconds=3) + assert state.max_duration == datetime.timedelta(seconds=2) + assert state.avg_duration == datetime.timedelta(seconds=3 / 2) + + # Add 1 second execution + state.record_execution(datetime.timedelta(seconds=1)) + assert state.execution_count == 3 + assert state.total_duration == datetime.timedelta(seconds=4) + assert state.max_duration == datetime.timedelta(seconds=2) + assert state.avg_duration == datetime.timedelta(seconds=4 / 3) diff --git a/tests/test_utils.py b/tests/test_utils.py index 0cce716..a916a74 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +from datetime import timedelta from ipaddress import IPv4Address from ipaddress import IPv6Address from pathlib import Path @@ -235,3 +236,36 @@ def test_mapping_values_with_prefix_no_prefix_separator( ) assert res == {"user1@example.com": ["Foouser1-primary", "Foouser1-secondary"]} assert caplog.text.count("WARNING") == 2 + + +@pytest.mark.parametrize( + "inp,expected", + [ + # Normal cases + (timedelta(hours=1, minutes=30, seconds=45), "01:30:45"), + (timedelta(hours=0, minutes=5, seconds=30), "00:05:30"), + (timedelta(hours=24, minutes=0, seconds=0), "24:00:00"), + # Zero and near-zero cases + (timedelta(seconds=0), "00:00:00"), + (timedelta(microseconds=999999), "00:00:00"), # Should truncate to zero + # Negative durations + (timedelta(hours=-1, minutes=-30, seconds=-45), "-01:30:45"), + (timedelta(hours=-2), "-02:00:00"), + # Large durations (multiple days) + (timedelta(days=1, hours=2, minutes=3, seconds=4), "26:03:04"), + (timedelta(days=2), "48:00:00"), + # Fractional components + (timedelta(hours=1, minutes=30, seconds=45, microseconds=500000), "01:30:45"), + # None case + (None, "00:00:00"), + # Edge cases around zero + (timedelta(microseconds=1), "00:00:00"), + (timedelta(microseconds=-1), "00:00:00"), + # Mixed positive/negative components + (timedelta(hours=1, minutes=-30), "00:30:00"), # Python normalizes this + # Large negative durations + (timedelta(days=-2, hours=-3), "-51:00:00"), + ], +) +def test_format_timedelta(inp: timedelta, expected: str): + assert utils.format_timedelta(inp) == expected diff --git a/zabbix_auto_config/processing.py b/zabbix_auto_config/processing.py index 12efa70..0f34601 100644 --- a/zabbix_auto_config/processing.py +++ b/zabbix_auto_config/processing.py @@ -1,6 +1,5 @@ from __future__ import annotations -import datetime import itertools import logging import multiprocessing @@ -15,6 +14,8 @@ import time from collections import Counter from collections import defaultdict +from datetime import datetime +from datetime import timedelta from enum import Enum from typing import TYPE_CHECKING from typing import Any @@ -71,7 +72,7 @@ def __init__(self, name: str, state: State) -> None: self.state = state self.update_interval = 1 - self.next_update = datetime.datetime.now() + self.next_update = datetime.now() self.state.set_ok() self.stop_event = multiprocessing.Event() @@ -87,17 +88,17 @@ def run(self) -> None: self.stop() break - if self.next_update > datetime.datetime.now(): + if self.next_update > datetime.now(): time.sleep(1) continue - self.next_update = datetime.datetime.now() + datetime.timedelta( + start_time = datetime.now() + self.next_update = datetime.now() + timedelta( seconds=self.update_interval ) try: self.work() - self.state.set_ok() except Exception as e: # These are the error types we handle ourselves then continue if isinstance(e, httpx.TimeoutException): @@ -116,16 +117,35 @@ def run(self) -> None: else: raise e # all other exceptions are fatal self.state.set_error(e) + else: + self.state.set_ok() + work_duration = datetime.now() - start_time + self.state.record_execution(work_duration) + + # Only warn about long-running tasks if: + # 1. Interval is non-zero (not continuous processing) + # 2. Work took longer than the interval + # 3. Haven't warned in last hour if ( - self.update_interval > 1 - and self.next_update < datetime.datetime.now() + self.update_interval > 0 + and work_duration.total_seconds() > self.update_interval + and ( + not self.state.last_duration_warning + or datetime.now() - self.state.last_duration_warning + > timedelta(hours=1) + ) ): - # Only log warning when update_interval is actually changed from default logging.warning( - "Next update is in the past. Interval too short? Lagging behind? Next update was: %s", - self.next_update.isoformat(timespec="seconds"), + "Work duration (%s) exceeded update interval (%s). " + "Stats - Avg duration: %s, Max duration: %s, Updates: %d", + utils.format_timedelta(work_duration), + utils.format_timedelta(timedelta(seconds=self.update_interval)), + utils.format_timedelta(self.state.avg_duration), + utils.format_timedelta(self.state.max_duration), + self.state.execution_count, ) + self.state.last_duration_warning = datetime.now() logging.info("Process exiting") @@ -185,7 +205,7 @@ def __init__( # Repeated errors will disable the source self.disabled = False - self.disabled_until = datetime.datetime.now() + self.disabled_until = datetime.now() self.error_counter = RollingErrorCounter( duration=self.config.error_duration, tolerance=self.config.error_tolerance, @@ -196,10 +216,10 @@ def work(self) -> None: # If not, we raise a ZACException, so that the state of the process # is marked as not ok. if self.disabled: - if self.disabled_until > datetime.datetime.now(): - time_left = self.disabled_until - datetime.datetime.now() + if self.disabled_until > datetime.now(): + time_left = self.disabled_until - datetime.now() raise ZACException( - f"Source is disabled for {utils.timedelta_to_str(time_left)}" + f"Source is disabled for {utils.format_timedelta(time_left)}" ) else: logging.info("Reactivating source") @@ -282,9 +302,7 @@ def disable(self) -> None: logging.info( "Disabling source '%s' for %s seconds", self.name, disable_duration ) - self.disabled_until = datetime.datetime.now() + datetime.timedelta( - seconds=disable_duration - ) + self.disabled_until = datetime.now() + timedelta(seconds=disable_duration) # Reset the error counter so that previous errors don't count towards # the error counter in the next run in case the disable duration is short self.error_counter.reset() diff --git a/zabbix_auto_config/state.py b/zabbix_auto_config/state.py index a6ef796..0ec1cdd 100644 --- a/zabbix_auto_config/state.py +++ b/zabbix_auto_config/state.py @@ -3,6 +3,9 @@ import time import types from dataclasses import asdict +from dataclasses import field +from datetime import datetime +from datetime import timedelta from multiprocessing.managers import BaseManager from multiprocessing.managers import NamespaceProxy # type: ignore # why unexported? from typing import Any @@ -14,24 +17,35 @@ @dataclass class State: - """Health state of a process.""" - + """Health state and performance metrics of a process. + + This class tracks both error states and execution statistics for a process, + providing a comprehensive view of the process's health and performance. + + Attributes: + ok: Status of the process. False if an error occurred in the most recent run. + error: Error message from most recent error, if any. + error_type: Error type name from most recent error, if any. + error_time: Timestamp of the most recent error, if any. + error_count: Total number of errors encountered since process start. + execution_count: Total number of executions since process start. + total_duration: Cumulative execution time of all runs. + max_duration: Longest execution time observed. + last_duration_warning: When the last warning about long execution was logged. + """ + + # Error tracking ok: bool = True - """Status of the process. False if an error has occurred in the most recent run.""" - - # BELOW: Only applicable if ok is False - error: Optional[str] = None - """Error message for most recent error.""" - error_type: Optional[str] = None - """Error type name for most recent error""" - error_time: Optional[float] = None - """Timestamp of the most recent error.""" - error_count: int = 0 - """Number of errors the process has encountered since starting.""" + + # Execution metrics + execution_count: int = 0 + total_duration: timedelta = field(default_factory=timedelta) + max_duration: timedelta = field(default_factory=timedelta) + last_duration_warning: Optional[datetime] = None def asdict(self) -> Dict[str, Any]: """Return dict representation of the State object.""" @@ -41,9 +55,7 @@ def asdict(self) -> Dict[str, Any]: def set_ok(self) -> None: """Set current state to OK, clear error information. - NOTE - ---- - Does not reset the error count. + NOTE: Does not reset the error count or execution metrics. """ self.ok = True self.error = None @@ -51,13 +63,38 @@ def set_ok(self) -> None: self.error_time = None def set_error(self, exc: Exception) -> None: - """Set current state to error and record error information.""" + """Set current state to error and record error information. + + Args: + exc (Exception): The exception that caused the error state. + """ self.ok = False self.error = str(exc) self.error_type = type(exc).__name__ self.error_time = time.time() self.error_count += 1 + def record_execution(self, duration: timedelta) -> None: + """Record metrics for a process execution. + + Args: + duration (timedelta): The duration of the execution that just completed. + """ + self.execution_count += 1 + self.total_duration += duration + self.max_duration = max(self.max_duration, duration) + + @property + def avg_duration(self) -> Optional[timedelta]: + """Calculate average execution duration. + + Returns: + Optional[timedelta]: Average duration of all executions, or None if no executions recorded. + """ + if self.execution_count == 0: + return None + return self.total_duration / self.execution_count + class Manager(BaseManager): pass diff --git a/zabbix_auto_config/utils.py b/zabbix_auto_config/utils.py index 99b81cb..ae45b3a 100644 --- a/zabbix_auto_config/utils.py +++ b/zabbix_auto_config/utils.py @@ -1,17 +1,18 @@ from __future__ import annotations import copy -import datetime import ipaddress import logging import multiprocessing import queue import re +from datetime import timedelta from pathlib import Path from typing import TYPE_CHECKING from typing import Dict from typing import List from typing import MutableMapping +from typing import Optional from typing import Union from zabbix_auto_config.pyzabbix.types import HostTag @@ -170,9 +171,28 @@ def drain_queue(q: multiprocessing.Queue) -> None: break -def timedelta_to_str(td: datetime.timedelta) -> str: - """Converts a timedelta to a string of the form HH:MM:SS.""" - return str(td).partition(".")[0] +def format_timedelta(td: Optional[timedelta] = None) -> str: + """Format a timedelta object showing only hours, minutes, and seconds. + + Args: + td: The timedelta object to format + + Returns: + A string representation in the format "HH:MM:SS" + """ + if td is None: + return "00:00:00" + + # Convert to total seconds and handle sign + total_seconds = int(td.total_seconds()) + sign = "-" if total_seconds < 0 else "" + total_seconds = abs(total_seconds) + + # Convert to hours, minutes, seconds + hours, remainder = divmod(total_seconds, 3600) + minutes, seconds = divmod(remainder, 60) + + return f"{sign}{hours:02d}:{minutes:02d}:{seconds:02d}" def write_file(path: Union[str, Path], content: str, end: str = "\n") -> None: