Skip to content

Commit

Permalink
Refactor health file (#95)
Browse files Browse the repository at this point in the history
* Refactor health file

* Remove unused fixture

* Remove flaky snapshot test in CI

* Refactor forward refs

* Use Python 3.8 typing semantics

* Rewrite State as BaseModel
  • Loading branch information
pederhan authored Jan 16, 2025
1 parent 4ce9b1c commit 198dd01
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 77 deletions.
38 changes: 38 additions & 0 deletions tests/test_health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from __future__ import annotations

import datetime

import pytest

from zabbix_auto_config.health import HealthFile
from zabbix_auto_config.health import ProcessInfo
from zabbix_auto_config.state import State
from zabbix_auto_config.state import get_manager


@pytest.mark.parametrize("use_manager", [True, False])
def test_healthfile_to_json(use_manager: bool) -> None:
# Test with and without proxied classes
if use_manager:
man = get_manager()
s = man.State()
else:
s = State()

health_file = HealthFile(
date=datetime.datetime(2021, 1, 1, 0, 0, 0),
cwd="/path/to/zac",
pid=1234,
failsafe=123,
processes=[
ProcessInfo(
name="test_process",
pid=1235,
alive=True,
state=s,
)
],
)

# Check that we can call to_json() without errors
assert health_file.to_json()
8 changes: 4 additions & 4 deletions tests/test_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ def test_state_asdict_ok(use_manager: bool) -> None:
"error_time": None,
"error_count": 0,
"execution_count": 0,
"total_duration": datetime.timedelta(0),
"max_duration": datetime.timedelta(0),
"total_duration": 0.0,
"max_duration": 0.0,
"last_duration_warning": None,
}
)
Expand Down Expand Up @@ -188,8 +188,8 @@ def test_state_asdict_error(use_manager: bool) -> None:
"error_type": "CustomException",
"error_count": 1,
"execution_count": 0,
"total_duration": datetime.timedelta(0),
"max_duration": datetime.timedelta(0),
"total_duration": 0.0,
"max_duration": 0.0,
"last_duration_warning": None,
}
)
Expand Down
46 changes: 1 addition & 45 deletions zabbix_auto_config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@
import datetime
import importlib
import importlib.metadata
import json
import logging
import multiprocessing
import os
import os.path
import sys
import time
from pathlib import Path
from typing import List

import multiprocessing_logging
Expand All @@ -19,11 +17,11 @@
from zabbix_auto_config import models
from zabbix_auto_config import processing
from zabbix_auto_config.__about__ import __version__
from zabbix_auto_config._types import HealthDict
from zabbix_auto_config._types import HostModifier
from zabbix_auto_config._types import HostModifierModule
from zabbix_auto_config._types import SourceCollector
from zabbix_auto_config._types import SourceCollectorModule
from zabbix_auto_config.health import write_health
from zabbix_auto_config.state import get_manager


Expand Down Expand Up @@ -105,48 +103,6 @@ def get_config() -> models.Settings:
return config


def write_health(
health_file: Path,
processes: List[processing.BaseProcess],
queues: List[multiprocessing.Queue],
failsafe: int,
) -> None:
now = datetime.datetime.now()
health: HealthDict = {
"date": now.isoformat(timespec="seconds"),
"date_unixtime": int(now.timestamp()),
"pid": os.getpid(),
"cwd": os.getcwd(),
"all_ok": all(p.state.ok for p in processes),
"processes": [],
"queues": [],
"failsafe": failsafe,
}

for process in processes:
health["processes"].append(
{
"name": process.name,
"pid": process.pid,
"alive": process.is_alive(),
**process.state.asdict(),
}
)

for queue in queues:
health["queues"].append(
{
"size": queue.qsize(),
}
)

try:
with open(health_file, "w") as f:
f.write(json.dumps(health))
except Exception as e:
logging.error("Unable to write health file %s: %s", health_file, e)


def log_process_status(processes: List[processing.BaseProcess]) -> None:
process_statuses = []

Expand Down
19 changes: 0 additions & 19 deletions zabbix_auto_config/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,3 @@ class SourceCollector(NamedTuple):
name: str
module: SourceCollectorModule
config: SourceCollectorSettings


class QueueDict(TypedDict):
"""Queue information for the health check dict."""

size: int


class HealthDict(TypedDict):
"""Application health dict used by `zabbix_auto_config.__init__.write_health`"""

date: str
date_unixtime: int
pid: int
cwd: str
all_ok: bool
processes: List[dict]
queues: List[QueueDict]
failsafe: int
102 changes: 102 additions & 0 deletions zabbix_auto_config/health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from __future__ import annotations

import logging
import multiprocessing
import os
from datetime import datetime
from pathlib import Path
from typing import Any
from typing import List
from typing import Optional

from pydantic import BaseModel
from pydantic import Field
from pydantic import computed_field
from pydantic import field_serializer
from pydantic import field_validator

from zabbix_auto_config import processing
from zabbix_auto_config.state import State
from zabbix_auto_config.state import StateProxy


class ProcessInfo(BaseModel):
name: str
pid: Optional[int]
alive: bool
state: State

@field_validator("state", mode="before")
@classmethod
def validate_state(cls, value: State) -> Any:
if isinstance(value, StateProxy):
return value._getvalue()
return value


class QueueInfo(BaseModel):
size: int


class HealthFile(BaseModel):
"""Health file for the application."""

date: datetime = Field(default_factory=datetime.now)
cwd: str
pid: int
processes: List[ProcessInfo] = []
queues: List[QueueInfo] = []
failsafe: int

@computed_field
@property
def date_unixtime(self) -> int:
return int(self.date.timestamp())

@computed_field
@property
def all_ok(self) -> bool:
return all(p.alive for p in self.processes)

@field_serializer("date", when_used="json")
def serialize_date(self, value: datetime) -> str:
return value.isoformat(timespec="seconds")

def to_json(self) -> str:
return self.model_dump_json(indent=2)


def write_health(
health_file: Path,
processes: list[processing.BaseProcess],
queues: list[multiprocessing.Queue],
failsafe: int,
) -> None:
health = HealthFile(
cwd=os.getcwd(),
pid=os.getpid(),
failsafe=failsafe,
)

for process in processes:
health.processes.append(
ProcessInfo(
name=process.name,
pid=process.pid,
alive=process.is_alive(),
state=process.state,
)
)

for queue in queues:
health.queues.append(
QueueInfo(
size=queue.qsize(),
)
)

try:
with open(health_file, "w") as f:
f.write(health.to_json())
except Exception as e:
logging.error("Unable to write health file %s: %s", health_file, e)
20 changes: 11 additions & 9 deletions zabbix_auto_config/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import time
import types
from dataclasses import asdict
from dataclasses import field
from datetime import datetime
from datetime import timedelta
from multiprocessing.managers import BaseManager
Expand All @@ -12,11 +10,12 @@
from typing import Dict
from typing import Optional

from pydantic.dataclasses import dataclass
from pydantic import BaseModel
from pydantic import Field
from pydantic import field_serializer


@dataclass
class State:
class State(BaseModel):
"""Health state and performance metrics of a process.
This class tracks both error states and execution statistics for a process,
Expand All @@ -43,14 +42,17 @@ class State:

# Execution metrics
execution_count: int = 0
total_duration: timedelta = field(default_factory=timedelta)
max_duration: timedelta = field(default_factory=timedelta)
total_duration: timedelta = Field(default_factory=timedelta)
max_duration: timedelta = Field(default_factory=timedelta)
last_duration_warning: Optional[datetime] = None

@field_serializer("total_duration", "max_duration", when_used="json")
def _serialize_timedelta(self, value: timedelta) -> float:
return value.total_seconds()

def asdict(self) -> Dict[str, Any]:
"""Return dict representation of the State object."""
# NOTE: just construct dict ourselves instead?
return asdict(self)
return self.model_dump(mode="json")

def set_ok(self) -> None:
"""Set current state to OK, clear error information.
Expand Down

0 comments on commit 198dd01

Please sign in to comment.