Skip to content

Commit

Permalink
Exclude requests from health checkers based on User-Agent
Browse files Browse the repository at this point in the history
  • Loading branch information
itssimon committed Dec 12, 2024
1 parent acf1c36 commit 46506e1
Showing 1 changed file with 20 additions and 1 deletion.
21 changes: 20 additions & 1 deletion apitally/client/request_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@
r"/ready$",
r"/live$",
]
EXCLUDE_USER_AGENT_PATTERNS = [
r"health[_- ]?check",
r"microsoft-azure-application-lb",
r"googlehc",
r"kube-probe",
]
MASK_QUERY_PARAM_PATTERNS = [
r"auth",
r"api-?key",
Expand Down Expand Up @@ -162,7 +168,12 @@ def log_request(self, request: RequestDict, response: ResponseDict) -> None:
if not self.enabled or self.suspend_until is not None:
return
parsed_url = urlparse(request["url"])
if self._should_exclude_path(request["path"] or parsed_url.path) or self._should_exclude(request, response):
user_agent = self._get_user_agent(request["headers"])
if (
self._should_exclude_path(request["path"] or parsed_url.path)
or self._should_exclude_user_agent(user_agent)
or self._should_exclude(request, response)
):
return

query = self._mask_query_params(parsed_url.query) if self.config.log_query_params else ""
Expand Down Expand Up @@ -271,6 +282,10 @@ def _should_exclude_path(self, url_path: str) -> bool:
patterns = self.config.exclude_paths + EXCLUDE_PATH_PATTERNS
return self._match_patterns(url_path, patterns)

@lru_cache(maxsize=1000)
def _should_exclude_user_agent(self, user_agent: Optional[str]) -> bool:
return self._match_patterns(user_agent, EXCLUDE_USER_AGENT_PATTERNS) if user_agent is not None else False

def _mask_query_params(self, query: str) -> str:
query_params = parse_qsl(query)
masked_query_params = [(k, v if not self._should_mask_query_param(k) else MASKED) for k, v in query_params]
Expand Down Expand Up @@ -302,6 +317,10 @@ def _has_supported_content_type(headers: List[Tuple[str, str]]) -> bool:
content_type = next((v for k, v in headers if k.lower() == "content-type"), None)
return content_type is not None and any(content_type.startswith(t) for t in ALLOWED_CONTENT_TYPES)

@staticmethod
def _get_user_agent(headers: List[Tuple[str, str]]) -> Optional[str]:
return next((v for k, v in headers if k.lower() == "user-agent"), None)


def _check_writable_fs() -> bool:
try:
Expand Down

0 comments on commit 46506e1

Please sign in to comment.