From a3606016832f5d8081f616b9666b110f0441896d Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Fri, 13 Dec 2024 13:35:07 +0100 Subject: [PATCH] Implement .shutdown() for both sync and async APIs (#720) --- .github/workflows/ci.yml | 4 + CHANGES.rst | 14 +++ Makefile | 2 +- janus/__init__.py | 157 ++++++++++++++++++++++++++------- tests/test_async.py | 139 +++++++++++++++++++++++++++++ tests/test_mixed.py | 86 ++++++------------ tests/test_sync.py | 186 +++++++++++++++++++++++++++++++++++++++ 7 files changed, 497 insertions(+), 91 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d0efb1d9..5ff82492 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -34,6 +34,8 @@ jobs: uses: actions/checkout@v4 - name: Setup Python uses: actions/setup-python@v5 + with: + python-version: 3.13 - name: Cache PyPI uses: actions/cache@v4 with: @@ -157,6 +159,8 @@ jobs: uses: actions/checkout@v4 - name: Setup Python uses: actions/setup-python@v5 + with: + python-version: 3.13 - name: Install dependencies uses: py-actions/py-dependency-install@v4.1.0 with: diff --git a/CHANGES.rst b/CHANGES.rst index b4b01d9d..52402b2b 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -13,6 +13,20 @@ Changes .. towncrier release notes start +2.0.0 (2024-12-XX) +------------------ + +- Implement ``.shutdown(immediate=False)`` for both sync and async APIs #720 + + The change is not fully backward compatible: + + 1. If the queue is closed, ``janus.AsyncQueueShutDown`` and + ``janus.SyncQueueShutDown`` exceptions are raised instead of ``RuntimeError``. + + 2. Both sync and async ``.task_done()`` and ``.join()`` don't raise any exception + on queue shutdown/closing anymore; it is compatible with shutdown behavior + of stdlib sync and async queues. + 1.2.0 (2024-12-12) ------------------ diff --git a/Makefile b/Makefile index 64d3911a..4d3aa67c 100644 --- a/Makefile +++ b/Makefile @@ -16,7 +16,7 @@ fmt: cov: flake develop pytest --cov=janus --cov=tests --cov-report=term --cov-report=html - @echo "open file://`pwd`/htmlcov/index.html" + @echo "open file://`pwd`/coverage/index.html" checkrst: python setup.py check --restructuredtext diff --git a/janus/__init__.py b/janus/__init__.py index 54bf3d54..bc1d7b4b 100644 --- a/janus/__init__.py +++ b/janus/__init__.py @@ -8,7 +8,22 @@ from queue import Empty as SyncQueueEmpty from queue import Full as SyncQueueFull from time import monotonic -from typing import Any, Callable, Generic, Optional, Protocol, TypeVar +from typing import Callable, Generic, Optional, Protocol, TypeVar + +if sys.version_info >= (3, 13): + from asyncio import QueueShutDown as AsyncQueueShutDown + from queue import ShutDown as SyncQueueShutDown +else: + class QueueShutDown(Exception): + pass + + AsyncQueueShutDown = QueueShutDown + + class ShutDown(Exception): + pass + + SyncQueueShutDown = ShutDown + __version__ = "1.2.0" __all__ = ( @@ -18,9 +33,11 @@ "SyncQueue", "SyncQueueEmpty", "SyncQueueFull", + "SyncQueueShutDown", "AsyncQueue", "AsyncQueueEmpty", "AsyncQueueFull", + "AsyncQueueShutDown", "BaseQueue", ) @@ -51,6 +68,8 @@ def put_nowait(self, item: T) -> None: ... def get_nowait(self) -> T: ... + def shutdown(self, immediate: bool = False) -> None: ... + class SyncQueue(BaseQueue[T], Protocol[T]): @@ -77,6 +96,7 @@ def __init__(self, maxsize: int = 0) -> None: self._loop = asyncio.get_running_loop() self._maxsize = maxsize + self._is_shutdown = False self._init(maxsize) @@ -101,8 +121,7 @@ def __init__(self, maxsize: int = 0) -> None: self._async_tasks_done = asyncio.Condition(self._async_mutex) self._async_tasks_done_waiting = 0 - self._closing = False - self._pending: deque[asyncio.Future[Any]] = deque() + self._pending: deque[asyncio.Future[None]] = deque() self._sync_queue = _SyncQueueProxy(self) self._async_queue = _AsyncQueueProxy(self) @@ -119,39 +138,72 @@ def _get_loop(self) -> asyncio.AbstractEventLoop: raise RuntimeError(f"{self!r} is bound to a different event loop") return loop - def close(self) -> None: + def shutdown(self, immediate: bool = False) -> None: + """Shut-down the queue, making queue gets and puts raise an exception. + + By default, gets will only raise once the queue is empty. Set + 'immediate' to True to make gets raise immediately instead. + + All blocked callers of put() and get() will be unblocked. If + 'immediate', a task is marked as done for each item remaining in + the queue, which may unblock callers of join(). + + The raise exception is SyncQueueShutDown for sync api and AsyncQueueShutDown + for async one. + """ with self._sync_mutex: - self._closing = True - for fut in self._pending: - fut.cancel() - if self._async_tasks_done_waiting: - # unblocks all async_q.join() - self._notify_async(self._async_tasks_done.notify_all) - if self._sync_tasks_done_waiting: - self._sync_tasks_done.notify_all() # unblocks all sync_q.join() + self._is_shutdown = True + if immediate: + while self._qsize(): + self._get() + if self._unfinished_tasks > 0: + self._unfinished_tasks -= 1 + # release all blocked threads in `join()` + if self._sync_tasks_done_waiting: + self._sync_tasks_done.notify_all() + if self._async_tasks_done_waiting: + self._notify_async(self._async_tasks_done.notify_all) + # All getters need to re-check queue-empty to raise ShutDown + if self._sync_not_empty_waiting: + self._sync_not_empty.notify_all() + if self._sync_not_full_waiting: + self._sync_not_full.notify_all() + if self._async_not_empty_waiting: + self._notify_async(self._async_not_empty.notify_all) + if self._async_not_full_waiting: + self._notify_async(self._async_not_full.notify_all) + + def close(self) -> None: + """Close the queue. + + The method is a shortcut for .shutdown(immediate=True) + """ + self.shutdown(immediate=True) async def wait_closed(self) -> None: + """Wait for finishing all pending activities""" # should be called from loop after close(). # Nobody should put/get at this point, # so lock acquiring is not required - if not self._closing: + if not self._is_shutdown: raise RuntimeError("Waiting for non-closed queue") - # give execution chances for the task-done callbacks + # give a chance for the task-done callbacks # of async tasks created inside - # _make_async_not_empty_notifier, _make_async_not_full_notifier - # methods. + # _notify_async() + # methods to be executed. await asyncio.sleep(0) if not self._pending: return await asyncio.wait(self._pending) async def aclose(self) -> None: + """Shutdown the queue and wait for actual shutting down""" self.close() await self.wait_closed() @property def closed(self) -> bool: - return self._closing and not self._pending + return self._is_shutdown and not self._pending @property def maxsize(self) -> int: @@ -208,10 +260,6 @@ def _notify_async(self, method: Callable[[], None]) -> None: return loop.call_soon_threadsafe(self._setup_async_notifier, loop, method) - def _check_closing(self) -> None: - if self._closing: - raise RuntimeError("Operation on the closed queue is forbidden") - class _SyncQueueProxy(SyncQueue[T]): """Create a queue object with a given maximum size. @@ -245,7 +293,6 @@ def task_done(self) -> None: placed in the queue. """ parent = self._parent - parent._check_closing() with parent._sync_tasks_done: unfinished = parent._unfinished_tasks - 1 if unfinished <= 0: @@ -267,7 +314,6 @@ def join(self) -> None: When the count of unfinished tasks drops to zero, join() unblocks. """ parent = self._parent - parent._check_closing() with parent._sync_tasks_done: while parent._unfinished_tasks: parent._sync_tasks_done_waiting += 1 @@ -275,7 +321,6 @@ def join(self) -> None: parent._sync_tasks_done.wait() finally: parent._sync_tasks_done_waiting -= 1 - parent._check_closing() def qsize(self) -> int: """Return the approximate size of the queue (not reliable!).""" @@ -322,8 +367,9 @@ def put(self, item: T, block: bool = True, timeout: OptFloat = None) -> None: is ignored in that case). """ parent = self._parent - parent._check_closing() with parent._sync_not_full: + if parent._is_shutdown: + raise SyncQueueShutDown if parent._maxsize > 0: if not block: if parent._qsize() >= parent._maxsize: @@ -335,6 +381,8 @@ def put(self, item: T, block: bool = True, timeout: OptFloat = None) -> None: parent._sync_not_full.wait() finally: parent._sync_not_full_waiting -= 1 + if parent._is_shutdown: + raise SyncQueueShutDown elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: @@ -348,6 +396,8 @@ def put(self, item: T, block: bool = True, timeout: OptFloat = None) -> None: parent._sync_not_full.wait(remaining) finally: parent._sync_not_full_waiting -= 1 + if parent._is_shutdown: + raise SyncQueueShutDown parent._put_internal(item) if parent._sync_not_empty_waiting: parent._sync_not_empty.notify() @@ -366,8 +416,9 @@ def get(self, block: bool = True, timeout: OptFloat = None) -> T: in that case). """ parent = self._parent - parent._check_closing() with parent._sync_not_empty: + if parent._is_shutdown and not parent._qsize(): + raise SyncQueueShutDown if not block: if not parent._qsize(): raise SyncQueueEmpty @@ -378,6 +429,8 @@ def get(self, block: bool = True, timeout: OptFloat = None) -> T: parent._sync_not_empty.wait() finally: parent._sync_not_empty_waiting -= 1 + if parent._is_shutdown and not parent._qsize(): + raise SyncQueueShutDown elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: @@ -391,6 +444,8 @@ def get(self, block: bool = True, timeout: OptFloat = None) -> T: parent._sync_not_empty.wait(remaining) finally: parent._sync_not_empty_waiting -= 1 + if parent._is_shutdown and not parent._qsize(): + raise SyncQueueShutDown item = parent._get() if parent._sync_not_full_waiting: parent._sync_not_full.notify() @@ -414,6 +469,21 @@ def get_nowait(self) -> T: """ return self.get(block=False) + def shutdown(self, immediate: bool = False) -> None: + """Shut-down the queue, making queue gets and puts raise an exception. + + By default, gets will only raise once the queue is empty. Set + 'immediate' to True to make gets raise immediately instead. + + All blocked callers of put() and get() will be unblocked. If + 'immediate', a task is marked as done for each item remaining in + the queue, which may unblock callers of join(). + + The raise exception is SyncQueueShutDown for sync api and AsyncQueueShutDown + for async one. + """ + self._parent.shutdown(immediate) + class _AsyncQueueProxy(AsyncQueue[T]): """Create a queue object with a given maximum size. @@ -471,9 +541,10 @@ async def put(self, item: T) -> None: This method is a coroutine. """ parent = self._parent - parent._check_closing() async with parent._async_not_full: with parent._sync_mutex: + if parent._is_shutdown: + raise AsyncQueueShutDown parent._get_loop() # check the event loop while 0 < parent._maxsize <= parent._qsize(): parent._async_not_full_waiting += 1 @@ -483,6 +554,8 @@ async def put(self, item: T) -> None: finally: parent._sync_mutex.acquire() parent._async_not_full_waiting -= 1 + if parent._is_shutdown: + raise AsyncQueueShutDown parent._put_internal(item) if parent._async_not_empty_waiting: @@ -496,8 +569,10 @@ def put_nowait(self, item: T) -> None: If no free slot is immediately available, raise QueueFull. """ parent = self._parent - parent._check_closing() with parent._sync_mutex: + if parent._is_shutdown: + raise AsyncQueueShutDown + parent._get_loop() if 0 < parent._maxsize <= parent._qsize(): raise AsyncQueueFull @@ -516,9 +591,10 @@ async def get(self) -> T: This method is a coroutine. """ parent = self._parent - parent._check_closing() async with parent._async_not_empty: with parent._sync_mutex: + if parent._is_shutdown and not parent._qsize(): + raise AsyncQueueShutDown parent._get_loop() # check the event loop while not parent._qsize(): parent._async_not_empty_waiting += 1 @@ -528,6 +604,8 @@ async def get(self) -> T: finally: parent._sync_mutex.acquire() parent._async_not_empty_waiting -= 1 + if parent._is_shutdown and not parent._qsize(): + raise AsyncQueueShutDown item = parent._get() if parent._async_not_full_waiting: @@ -542,8 +620,9 @@ def get_nowait(self) -> T: Return an item if one is immediately available, else raise QueueEmpty. """ parent = self._parent - parent._check_closing() with parent._sync_mutex: + if parent._is_shutdown and not parent._qsize(): + raise AsyncQueueShutDown if not parent._qsize(): raise AsyncQueueEmpty @@ -570,7 +649,6 @@ def task_done(self) -> None: the queue. """ parent = self._parent - parent._check_closing() with parent._sync_tasks_done: if parent._unfinished_tasks <= 0: raise ValueError("task_done() called too many times") @@ -590,7 +668,6 @@ async def join(self) -> None: When the count of unfinished tasks drops to zero, join() unblocks. """ parent = self._parent - parent._check_closing() async with parent._async_tasks_done: with parent._sync_mutex: parent._get_loop() # check the event loop @@ -602,7 +679,21 @@ async def join(self) -> None: finally: parent._sync_mutex.acquire() parent._async_tasks_done_waiting -= 1 - parent._check_closing() + + def shutdown(self, immediate: bool = False) -> None: + """Shut-down the queue, making queue gets and puts raise an exception. + + By default, gets will only raise once the queue is empty. Set + 'immediate' to True to make gets raise immediately instead. + + All blocked callers of put() and get() will be unblocked. If + 'immediate', a task is marked as done for each item remaining in + the queue, which may unblock callers of join(). + + The raise exception is SyncQueueShutDown for sync api and AsyncQueueShutDown + for async one. + """ + self._parent.shutdown(immediate) class PriorityQueue(Queue[T]): diff --git a/tests/test_async.py b/tests/test_async.py index 093e05db..f13841c8 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -446,6 +446,145 @@ async def put(): await close(_q) +class TestQueueShutdown: + @pytest.mark.asyncio + async def test_shutdown_empty(self): + _q = janus.Queue() + q = _q.async_q + + q.shutdown() + with pytest.raises(janus.AsyncQueueShutDown): + await q.put("data") + with pytest.raises(janus.AsyncQueueShutDown): + await q.get() + with pytest.raises(janus.AsyncQueueShutDown): + q.get_nowait() + + @pytest.mark.asyncio + async def test_shutdown_nonempty(self): + _q = janus.Queue() + q = _q.async_q + + await q.put("data") + q.shutdown() + await q.get() + with pytest.raises(janus.AsyncQueueShutDown): + await q.get() + + @pytest.mark.asyncio + async def test_shutdown_nonempty_get_nowait(self): + _q = janus.Queue() + q = _q.async_q + + await q.put("data") + q.shutdown() + q.get_nowait() + with pytest.raises(janus.AsyncQueueShutDown): + q.get_nowait() + + @pytest.mark.asyncio + async def test_shutdown_immediate(self): + _q = janus.Queue() + q = _q.async_q + + await q.put("data") + q.shutdown(immediate=True) + with pytest.raises(janus.AsyncQueueShutDown): + await q.get() + with pytest.raises(janus.AsyncQueueShutDown): + q.get_nowait() + + @pytest.mark.asyncio + async def test_shutdown_immediate_with_undone_tasks(self): + _q = janus.Queue() + q = _q.async_q + + await q.put(1) + await q.put(2) + # artificial .task_done() without .get() for covering specific codeline + # in .shutdown(True) + q.task_done() + + q.shutdown(True) + await close(_q) + + @pytest.mark.asyncio + async def test_shutdown_putter(self): + _q = janus.Queue(maxsize=1) + q = _q.async_q + + await q.put(1) + + async def putter(): + await q.put(2) + + task = asyncio.create_task(putter()) + # wait for the task start + await asyncio.sleep(0.01) + + q.shutdown() + + with pytest.raises(janus.AsyncQueueShutDown): + await task + + await close(_q) + + @pytest.mark.asyncio + async def test_shutdown_many_putters(self): + _q = janus.Queue(maxsize=1) + q = _q.async_q + + await q.put(1) + + async def putter(n): + await q.put(n) + + tasks = [] + for i in range(2): + tasks.append(asyncio.create_task(putter(i))) + # wait for the task start + await asyncio.sleep(0.01) + + q.shutdown() + + for task in tasks: + with pytest.raises(janus.AsyncQueueShutDown): + await task + + await close(_q) + + @pytest.mark.asyncio + async def test_shutdown_getter(self): + _q = janus.Queue() + q = _q.async_q + + async def getter(): + await q.get() + + task = asyncio.create_task(getter()) + # wait for the task start + await asyncio.sleep(0.01) + + q.shutdown() + + with pytest.raises(janus.AsyncQueueShutDown): + await task + + await close(_q) + + @pytest.mark.asyncio + async def test_shutdown_early_getter(self): + _q = janus.Queue() + q = _q.async_q + + q.shutdown() + + with pytest.raises(janus.AsyncQueueShutDown): + await q.get() + + await close(_q) + + class TestLifoQueue: @pytest.mark.asyncio async def test_order(self): diff --git a/tests/test_mixed.py b/tests/test_mixed.py index cf4613c9..a0c02daa 100644 --- a/tests/test_mixed.py +++ b/tests/test_mixed.py @@ -1,6 +1,5 @@ import asyncio import sys -import threading from concurrent.futures import ThreadPoolExecutor @@ -210,40 +209,30 @@ async def test_modifying_forbidden_after_closing(self): q.close() with pytest.raises( - RuntimeError, match="Operation on the closed queue is forbidden" + janus.SyncQueueShutDown ): q.sync_q.put(5) with pytest.raises( - RuntimeError, match="Operation on the closed queue is forbidden" + janus.SyncQueueShutDown ): q.sync_q.get() with pytest.raises( - RuntimeError, match="Operation on the closed queue is forbidden" - ): - q.sync_q.task_done() - - with pytest.raises( - RuntimeError, match="Operation on the closed queue is forbidden" + janus.AsyncQueueShutDown ): await q.async_q.put(5) with pytest.raises( - RuntimeError, match="Operation on the closed queue is forbidden" + janus.AsyncQueueShutDown ): q.async_q.put_nowait(5) with pytest.raises( - RuntimeError, match="Operation on the closed queue is forbidden" + janus.AsyncQueueShutDown ): q.async_q.get_nowait() - with pytest.raises( - RuntimeError, match="Operation on the closed queue is forbidden" - ): - await q.sync_q.task_done() - await q.wait_closed() @pytest.mark.asyncio @@ -269,10 +258,7 @@ async def test_closed(self): async def test_async_join_after_closing(self): q = janus.Queue() q.close() - with pytest.raises( - RuntimeError, match="Operation on the closed queue is forbidden" - ): - await asyncio.wait_for(q.async_q.join(), timeout=0.1) + await asyncio.wait_for(q.async_q.join(), timeout=0.1) await q.wait_closed() @@ -285,63 +271,33 @@ async def test_close_after_async_join(self): await asyncio.sleep(0.01) # ensure tasks are blocking q.close() - with pytest.raises( - RuntimeError, match="Operation on the closed queue is forbidden" - ): - await asyncio.wait_for(task, timeout=0.1) + await asyncio.wait_for(task, timeout=0.1) await q.wait_closed() @pytest.mark.asyncio async def test_sync_join_after_closing(self): + loop = asyncio.get_running_loop() q = janus.Queue() q.sync_q.put(1) q.close() - - loop = asyncio.get_event_loop() - fut = asyncio.Future() - - def sync_join(): - try: - q.sync_q.join() - except Exception as exc: - loop.call_soon_threadsafe(fut.set_exception, exc) - - thr = threading.Thread(target=sync_join, daemon=True) - thr.start() - - with pytest.raises( - RuntimeError, match="Operation on the closed queue is forbidden" - ): - await asyncio.wait_for(fut, timeout=0.1) + await asyncio.wait_for(loop.run_in_executor(None, q.sync_q.join), timeout=0.1) await q.wait_closed() @pytest.mark.asyncio async def test_close_after_sync_join(self): + loop = asyncio.get_running_loop() q = janus.Queue() q.sync_q.put(1) - loop = asyncio.get_event_loop() - fut = asyncio.Future() - - def sync_join(): - try: - q.sync_q.join() - except Exception as exc: - loop.call_soon_threadsafe(fut.set_exception, exc) - - thr = threading.Thread(target=sync_join, daemon=True) - thr.start() - thr.join(0.1) # ensure tasks are blocking + fut = loop.run_in_executor(None, q.sync_q.join) + await asyncio.sleep(0.1) # ensure tasks are blocking q.close() - with pytest.raises( - RuntimeError, match="Operation on the closed queue is forbidden" - ): - await asyncio.wait_for(fut, timeout=0.1) + await asyncio.wait_for(fut, timeout=0.1) await q.wait_closed() @@ -426,3 +382,19 @@ async def test_get_notifies_async_not_full(self): await asyncio.gather(*tasks) assert q.sync_q.qsize() == 2 await q.aclose() + + @pytest.mark.asyncio + async def test_wait_closed_with_pending_tasks(self): + q = janus.Queue() + + async def getter(): + await q.async_q.get() + + task = asyncio.create_task(getter()) + await asyncio.sleep(0.01) + q.shutdown() + # q._pending is not empty now + await q.wait_closed() + + with pytest.raises(janus.AsyncQueueShutDown): + await task diff --git a/tests/test_sync.py b/tests/test_sync.py index 330e9f2d..161ab843 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -434,3 +434,189 @@ def test_sync_only_api(): q = janus.Queue() q.sync_q.put(1) assert q.sync_q.get() == 1 + + +class TestQueueShutdown: + @pytest.mark.asyncio + async def test_shutdown_empty(self): + _q = janus.Queue() + q = _q.sync_q + + q.shutdown() + with pytest.raises(janus.SyncQueueShutDown): + q.put("data") + with pytest.raises(janus.SyncQueueShutDown): + q.get() + with pytest.raises(janus.SyncQueueShutDown): + q.get_nowait() + + @pytest.mark.asyncio + async def test_shutdown_nonempty(self): + _q = janus.Queue() + q = _q.sync_q + + q.put("data") + q.shutdown() + q.get() + with pytest.raises(janus.SyncQueueShutDown): + q.get() + + @pytest.mark.asyncio + async def test_shutdown_nonempty_get_nowait(self): + _q = janus.Queue() + q = _q.sync_q + + q.put("data") + q.shutdown() + q.get_nowait() + with pytest.raises(janus.SyncQueueShutDown): + q.get_nowait() + + @pytest.mark.asyncio + async def test_shutdown_immediate(self): + _q = janus.Queue() + q = _q.sync_q + + q.put("data") + q.shutdown(immediate=True) + with pytest.raises(janus.SyncQueueShutDown): + q.get() + with pytest.raises(janus.SyncQueueShutDown): + q.get_nowait() + + @pytest.mark.asyncio + async def test_shutdown_immediate_with_undone_tasks(self): + _q = janus.Queue() + q = _q.sync_q + + q.put(1) + q.put(2) + # artificial .task_done() without .get() for covering specific codeline + # in .shutdown(True) + q.task_done() + + q.shutdown(True) + + @pytest.mark.asyncio + async def test_shutdown_putter(self): + loop = asyncio.get_running_loop() + _q = janus.Queue(maxsize=1) + q = _q.sync_q + + q.put(1) + + def putter(): + q.put(2) + + fut = loop.run_in_executor(None, putter) + # wait for the task start + await asyncio.sleep(0.01) + + q.shutdown() + + with pytest.raises(janus.SyncQueueShutDown): + await fut + + await _q.aclose() + + @pytest.mark.asyncio + async def test_shutdown_many_putters(self): + loop = asyncio.get_running_loop() + _q = janus.Queue(maxsize=1) + q = _q.sync_q + + q.put(1) + + def putter(n): + q.put(n) + + futs = [] + for i in range(2): + futs.append(loop.run_in_executor(None, putter, i)) + # wait for the task start + await asyncio.sleep(0.01) + + q.shutdown() + + for fut in futs: + with pytest.raises(janus.SyncQueueShutDown): + await fut + + await _q.aclose() + + @pytest.mark.asyncio + async def test_shutdown_many_putters_with_timeout(self): + loop = asyncio.get_running_loop() + _q = janus.Queue(maxsize=1) + q = _q.sync_q + + q.put(1) + + def putter(n): + q.put(n, timeout=60) + + futs = [] + for i in range(2): + futs.append(loop.run_in_executor(None, putter, i)) + # wait for the task start + await asyncio.sleep(0.01) + + q.shutdown() + + for fut in futs: + with pytest.raises(janus.SyncQueueShutDown): + await fut + + await _q.aclose() + + @pytest.mark.asyncio + async def test_shutdown_getter(self): + loop = asyncio.get_running_loop() + _q = janus.Queue() + q = _q.sync_q + + def getter(): + q.get() + + fut = loop.run_in_executor(None, getter) + # wait for the task start + await asyncio.sleep(0.01) + + q.shutdown() + + with pytest.raises(janus.SyncQueueShutDown): + await fut + + await _q.aclose() + + @pytest.mark.asyncio + async def test_shutdown_getter_with_timeout(self): + loop = asyncio.get_running_loop() + _q = janus.Queue() + q = _q.sync_q + + def getter(): + q.get(timeout=60) + + fut = loop.run_in_executor(None, getter) + # wait for the task start + await asyncio.sleep(0.01) + + q.shutdown() + + with pytest.raises(janus.SyncQueueShutDown): + await fut + + await _q.aclose() + + @pytest.mark.asyncio + async def test_shutdown_early_getter(self): + _q = janus.Queue() + q = _q.sync_q + + q.shutdown() + + with pytest.raises(janus.SyncQueueShutDown): + q.get() + + await _q.aclose()