Skip to content

Commit

Permalink
add watch to health check service
Browse files Browse the repository at this point in the history
  • Loading branch information
William Parsley committed Nov 17, 2023
1 parent db0f9a6 commit 16ff755
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 6 deletions.
2 changes: 2 additions & 0 deletions protos/feast/third_party/grpc/health/v1/HealthService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
SERVICE_UNKNOWN = 3;
}

message HealthCheckResponse {
Expand All @@ -21,4 +22,5 @@ message HealthCheckResponse {

service Health {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}
147 changes: 141 additions & 6 deletions sdk/python/feast/transformation_server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging
import collections
import sys
import threading
from concurrent import futures

import grpc
Expand Down Expand Up @@ -31,17 +33,150 @@

log = logging.getLogger(__name__)

class _Watcher:
def __init__(self):
self._condition = threading.Condition()
self._responses = collections.deque()
self._open = True

def __iter__(self):
return self

def _next(self):
with self._condition:
while not self._responses and self._open:
self._condition.wait()
if self._responses:
return self._responses.popleft()
else:
raise StopIteration()

def next(self):
return self._next()

def __next__(self):
return self._next()

def add(self, response):
with self._condition:
self._responses.append(response)
self._condition.notify()

def close(self):
with self._condition:
self._open = False
self._condition.notify()
def _watcher_to_send_response_callback_adapter(watcher):
def send_response_callback(response):
if response is None:
watcher.close()
else:
watcher.add(response)

return send_response_callback

class HealthServer(HealthServicer):
def __init__(self) -> None:
super().__init__()
"""Servicer handling RPCs for service statuses."""

def __init__(
self, experimental_non_blocking=True, experimental_thread_pool=None
):
self._lock = threading.RLock()
self._server_status = {"": ServingStatus.SERVING}
self._send_response_callbacks = {}
self.Watch.__func__.experimental_non_blocking = (
experimental_non_blocking
)
self.Watch.__func__.experimental_thread_pool = experimental_thread_pool
self._gracefully_shutting_down = False

def Check(self, request, context):
response = HealthCheckResponse(status=ServingStatus.SERVING)
return response
def _on_close_callback(self, send_response_callback, service):
def callback():
with self._lock:
self._send_response_callbacks[service].remove(
send_response_callback
)
send_response_callback(None)

return callback

def Check(self, request, context):
with self._lock:
status = self._server_status.get(request.service)
if status is None:
context.set_code(grpc.StatusCode.NOT_FOUND)
return HealthCheckResponse()
else:
return HealthCheckResponse(status=status)

# pylint: disable=arguments-differ
def Watch(self, request, context, send_response_callback=None):
send_response_callback(HealthCheckResponse(status=ServingStatus.SERVING))
blocking_watcher = None
if send_response_callback is None:
# The server does not support the experimental_non_blocking
# parameter. For backwards compatibility, return a blocking response
# generator.
blocking_watcher = _Watcher()
send_response_callback = _watcher_to_send_response_callback_adapter(
blocking_watcher
)
service = request.service
with self._lock:
status = self._server_status.get(service)
if status is None:
status = (
ServingStatus.SERVICE_UNKNOWN
) # pylint: disable=no-member
send_response_callback(
HealthCheckResponse(status=status)
)
if service not in self._send_response_callbacks:
self._send_response_callbacks[service] = set()
self._send_response_callbacks[service].add(send_response_callback)
context.add_callback(
self._on_close_callback(send_response_callback, service)
)
return blocking_watcher

def set(self, service, status):
"""Sets the status of a service.
Args:
service: string, the name of the service.
status: HealthCheckResponse.status enum value indicating the status of
the service
"""
with self._lock:
if self._gracefully_shutting_down:
return
else:
self._server_status[service] = status
if service in self._send_response_callbacks:
for send_response_callback in self._send_response_callbacks[
service
]:
send_response_callback(
HealthCheckResponse(status=status)
)

def enter_graceful_shutdown(self):
"""Permanently sets the status of all services to NOT_SERVING.
This should be invoked when the server is entering a graceful shutdown
period. After this method is invoked, future attempts to set the status
of a service will be ignored.
This is an EXPERIMENTAL API.
"""
with self._lock:
if self._gracefully_shutting_down:
return
else:
for service in self._server_status:
self.set(
service, ServingStatus.NOT_SERVING
) # pylint: disable=no-member
self._gracefully_shutting_down = True

class TransformationServer(TransformationServiceServicer):
def __init__(self, fs: FeatureStore) -> None:
Expand Down

0 comments on commit 16ff755

Please sign in to comment.