Merge branch 'main' into release
denniswittich committed Oct 31, 2024
2 parents 62a27e0 + 3cbbd59 commit 25b7c59
Showing 4 changed files with 33 additions and 9 deletions.
11 changes: 7 additions & 4 deletions learning_loop_node/loop_communication.py
@@ -1,5 +1,6 @@
 import asyncio
 import logging
+import time
 from typing import Awaitable, Callable, List, Optional
 
 import httpx
@@ -33,7 +34,6 @@ def __init__(self) -> None:
                 base_url=self.base_url, timeout=Timeout(60.0), verify=self.ssl_cert_path)
         else:
             self.async_client = httpx.AsyncClient(base_url=self.base_url, timeout=Timeout(60.0))
-        self.async_client.cookies.clear()
 
         logging.info(f'Loop interface initialized with base_url: {self.base_url} / user: {self.username}')
 
@@ -68,16 +68,19 @@ async def shutdown(self):
         if self.async_client is not None and not self.async_client.is_closed:
             await self.async_client.aclose()
 
-    async def backend_ready(self) -> bool:
+    async def backend_ready(self, timeout: Optional[int] = None) -> bool:
         """Wait until the backend is ready"""
+        start_time = time.time()
         while True:
             try:
                 logging.info('Checking if backend is ready')
                 response = await self.get('/status', requires_login=False)
                 if response.status_code == 200:
                     return True
-            except Exception as e:
-                logging.info(f'backend not ready: {e}')
+            except Exception:
+                logging.info('backend not ready yet.')
+            if timeout is not None and time.time() + 10 - start_time > timeout:
+                raise TimeoutError('Backend not ready within timeout')
             await asyncio.sleep(10)
 
     async def retry_on_401(self, func: Callable[..., Awaitable[httpx.Response]], *args, **kwargs) -> httpx.Response:
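
The new `timeout` parameter turns `backend_ready()` from an endless poll into one that can give up: the method records `start_time` and, after each failed attempt, raises `TimeoutError` if waiting through the next 10-second sleep would push past the budget. A minimal, self-contained sketch of the same pattern (`wait_until_ready` and `check_status` are illustrative stand-ins for the method and its `self.get('/status', requires_login=False)` call, not part of the library):

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional


async def wait_until_ready(check_status: Callable[[], Awaitable[bool]],
                           timeout: Optional[int] = None,
                           interval: float = 10.0) -> bool:
    """Poll `check_status` until it returns True, or give up after `timeout` seconds."""
    start_time = time.time()
    while True:
        try:
            if await check_status():
                return True
        except Exception:
            pass  # any error counts as "not ready yet", as in the original loop
        # Stop if sleeping through the next interval would exceed the budget.
        if timeout is not None and time.time() + interval - start_time > timeout:
            raise TimeoutError('Backend not ready within timeout')
        await asyncio.sleep(interval)
```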
14 changes: 11 additions & 3 deletions learning_loop_node/node.py
@@ -126,6 +126,8 @@ async def repeat_loop(self) -> None:
                 await self.on_repeat()
             except asyncio.CancelledError:
                 return
+            except TimeoutError:
+                self.log.debug('Backend not ready within timeout, skipping repeat loop')
             except Exception:
                 self.log.exception('error in repeat loop')
 
@@ -140,6 +142,7 @@ async def _ensure_sio_connection(self):
     async def reconnect_to_loop(self):
         """Initialize the loop communicator, log in if needed and reconnect to the loop via socket.io."""
         self.init_loop_communicator()
+        await self.loop_communicator.backend_ready(timeout=5)
         if self.needs_login:
             await self.loop_communicator.ensure_login(relogin=True)
         try:
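
Because the poll interval is 10 seconds, the 5-second budget passed here means a single failed `/status` check already trips the timeout; the resulting `TimeoutError` is caught by `repeat_loop()` before its generic `except Exception` handler and is only logged at debug level instead of producing a full error trace every cycle. A hypothetical caller sketch (the `try_reconnect` helper is illustrative, not part of the library):

```python
async def try_reconnect(node) -> bool:
    """Reconnect to the Loop; report False instead of failing when the backend is down."""
    try:
        await node.reconnect_to_loop()
        return True
    except TimeoutError:
        node.log.debug('Loop backend not reachable, will retry on the next repeat cycle')
        return False
```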
@@ -162,7 +165,8 @@ async def _reconnect_socketio(self):
         The current client is disconnected and deleted if it already exists."""
 
         self.log.debug('-------------- Connecting to loop via socket.io -------------------')
-        self.log.debug('HTTP Cookies: %s\n', self.loop_communicator.get_cookies())
+        cookies = self.loop_communicator.get_cookies()
+        self.log.debug('HTTP Cookies: %s\n', cookies)
 
         if self._sio_client is not None:
             try:
@@ -185,8 +189,12 @@ async def _reconnect_socketio(self):
             ssl_context.verify_mode = ssl.CERT_REQUIRED
             connector = TCPConnector(ssl=ssl_context)
 
-        self._sio_client = AsyncClient(request_timeout=20, http_session=aiohttp.ClientSession(
-            cookies=self.loop_communicator.get_cookies(), connector=connector))
+        if self.needs_login:
+            self._sio_client = AsyncClient(request_timeout=20, http_session=aiohttp.ClientSession(
+                cookies=cookies, connector=connector))
+        else:
+            self._sio_client = AsyncClient(request_timeout=20, http_session=aiohttp.ClientSession(
+                connector=connector))
 
         # pylint: disable=protected-access
         self._sio_client._trigger_event = ensure_socket_response(self._sio_client._trigger_event)
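
The socket.io client is now built differently depending on `needs_login`: only authenticated nodes attach the HTTP cookies from the loop communicator to the underlying `aiohttp.ClientSession`, while nodes that do not log in get a plain session. A hedged sketch of that construction (the `make_sio_client` helper is illustrative; `AsyncClient` here is python-socketio's asynchronous client):

```python
from typing import Optional

import aiohttp
from socketio import AsyncClient


async def make_sio_client(cookies: dict, needs_login: bool,
                          connector: Optional[aiohttp.TCPConnector] = None) -> AsyncClient:
    # Only authenticated nodes pass their Loop session cookies into the handshake.
    if needs_login:
        session = aiohttp.ClientSession(cookies=cookies, connector=connector)
    else:
        session = aiohttp.ClientSession(connector=connector)
    return AsyncClient(request_timeout=20, http_session=session)
```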
12 changes: 11 additions & 1 deletion learning_loop_node/trainer/exceptions.py
@@ -1,2 +1,12 @@
 class CriticalError(Exception):
-    pass
+    '''
+    CriticalError is raised when the training cannot be continued.
+    In this case the trainer jumps to the TrainerState.ReadyForCleanup and tries to upload the latest model.
+    '''
+
+
+class NodeNeedsRestartError(Exception):
+    '''
+    NodeNeedsRestartError is raised when the node needs to be restarted.
+    This is e.g. the case when the GPU is not available anymore.
+    '''
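
Both exception types are intended to be raised from concrete trainer code; the state machine decides what happens next. An illustrative sketch (the GPU check via `torch` and both helper functions are assumptions, only the exception semantics come from the docstrings above):

```python
import torch

from learning_loop_node.trainer.exceptions import CriticalError, NodeNeedsRestartError


def assert_gpu_available() -> None:
    if not torch.cuda.is_available():
        # The GPU disappeared at runtime, so the whole node has to restart.
        raise NodeNeedsRestartError('CUDA device is no longer available')


def assert_training_data(image_count: int) -> None:
    if image_count == 0:
        # Training cannot continue; the trainer moves to ReadyForCleanup.
        raise CriticalError('No images available for training')
```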
5 changes: 4 additions & 1 deletion learning_loop_node/trainer/trainer_logic_generic.py
@@ -14,7 +14,7 @@
                            TrainingOut, TrainingStateData)
 from ..helpers.misc import create_project_folder, delete_all_training_folders, generate_training, is_valid_uuid4
 from .downloader import TrainingsDownloader
-from .exceptions import CriticalError
+from .exceptions import CriticalError, NodeNeedsRestartError
 from .io_helpers import ActiveTrainingIO, EnvironmentVars, LastTrainingIO
 
 if TYPE_CHECKING:
@@ -294,6 +294,9 @@ async def _perform_state(self, error_key: str, state_during: TrainerState, state
             logger.error('CriticalError in %s - Exception: %s', state_during, e)
             self.errors.set(error_key, str(e))
             self.training.training_state = TrainerState.ReadyForCleanup
+        except NodeNeedsRestartError:
+            logger.error('Node Restart Requested')
+            sys.exit(0)
         except Exception as e:
             self.errors.set(error_key, str(e))
             logger.exception('Error in %s - Exception: %s', state_during, e)
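
The three except branches now map each failure mode to a different outcome: `CriticalError` records the error and jumps to `TrainerState.ReadyForCleanup`, `NodeNeedsRestartError` terminates the process with `sys.exit(0)`, and any other exception is recorded and handled by the existing error logic. Note that exiting only restarts the node if something outside the process brings it back up; relying on a supervisor such as a Docker restart policy is a deployment assumption, not something stated in this diff. A condensed sketch of the mapping (`run_step`, `on_critical` and `on_error` are illustrative stand-ins for the surrounding state machine):

```python
import sys

from learning_loop_node.trainer.exceptions import CriticalError, NodeNeedsRestartError


async def run_guarded_step(run_step, on_critical, on_error) -> None:
    try:
        await run_step()
    except CriticalError as e:
        on_critical(e)      # record the error and jump to TrainerState.ReadyForCleanup
    except NodeNeedsRestartError:
        sys.exit(0)         # stop the process; an external supervisor must restart the node
    except Exception as e:  # pylint: disable=broad-except
        on_error(e)         # record the error and let the state machine continue
```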
