diff --git a/.gitignore b/.gitignore index 7375139..efd14fc 100644 --- a/.gitignore +++ b/.gitignore @@ -58,4 +58,8 @@ target/ coverage -.pytest_cache \ No newline at end of file +.pytest_cache/ +.mypy_cache/ + +# pyenv +.python-version diff --git a/.travis.yml b/.travis.yml index 39d1a4c..a9eecc5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,6 +3,8 @@ language: python python: - "3.5" - "3.6" + - "3.7" + - "3.8" install: diff --git a/CHANGES.rst b/CHANGES.rst index d85624e..dc1967a 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,6 +1,11 @@ Changes ======= +to be released +-------------- + +- Remove explicit loop arguments and forbid creating queues outside event loops #246 + 0.4.0 (2018-07-28) ------------------ diff --git a/README.rst b/README.rst index 97bb295..8527649 100644 --- a/README.rst +++ b/README.rst @@ -26,8 +26,43 @@ Synchronous is fully compatible with `standard queue follows `asyncio queue design `_. -Usage example -============= +Usage example (Python 3.7+) +=========================== + +.. code:: python + + import asyncio + import janus + + + def threaded(sync_q): + for i in range(100): + sync_q.put(i) + sync_q.join() + + + async def async_coro(async_q): + for i in range(100): + val = await async_q.get() + assert val == i + async_q.task_done() + + + async def main(): + queue = janus.Queue() + loop = asyncio.get_running_loop() + fut = loop.run_in_executor(None, threaded, queue.sync_q) + await async_coro(queue.async_q) + await fut + queue.close() + await queue.wait_closed() + + + asyncio.run(main()) + + +Usage example (Python 3.5 and 3.6) +================================== .. code:: python @@ -35,7 +70,6 @@ Usage example import janus loop = asyncio.get_event_loop() - queue = janus.Queue(loop=loop) def threaded(sync_q): @@ -51,9 +85,18 @@ Usage example async_q.task_done() - fut = loop.run_in_executor(None, threaded, queue.sync_q) - loop.run_until_complete(async_coro(queue.async_q)) - loop.run_until_complete(fut) + async def main(): + queue = janus.Queue() + fut = loop.run_in_executor(None, threaded, queue.sync_q) + await async_coro(queue.async_q) + await fut + queue.close() + await queue.wait_closed() + + try: + loop.run_until_complete(main()) + finally: + loop.close() Communication channels diff --git a/janus/__init__.py b/janus/__init__.py index e6e4c44..047d2f8 100644 --- a/janus/__init__.py +++ b/janus/__init__.py @@ -16,16 +16,17 @@ T = TypeVar('T') -OptLoop = Optional[asyncio.AbstractEventLoop] OptInt = Optional[int] -class Queue(Generic[T]): - def __init__(self, maxsize: int = 0, *, loop: OptLoop = None) -> None: - if loop is None: - loop = asyncio.get_event_loop() +current_loop = getattr(asyncio, 'get_running_loop', None) +if current_loop is None: + current_loop = asyncio.get_event_loop + - self._loop = loop # type: asyncio.AbstractEventLoop +class Queue(Generic[T]): + def __init__(self, maxsize: int = 0) -> None: + self._loop = current_loop() self._maxsize = maxsize self._init(maxsize) @@ -37,12 +38,10 @@ def __init__(self, maxsize: int = 0, *, loop: OptLoop = None) -> None: self._sync_not_full = threading.Condition(self._sync_mutex) self._all_tasks_done = threading.Condition(self._sync_mutex) - self._async_mutex = asyncio.Lock(loop=self._loop) - self._async_not_empty = asyncio.Condition( - self._async_mutex, loop=self._loop) - self._async_not_full = asyncio.Condition( - self._async_mutex, loop=self._loop) - self._finished = asyncio.Event(loop=self._loop) + self._async_mutex = asyncio.Lock() + self._async_not_empty = asyncio.Condition(self._async_mutex) + self._async_not_full = asyncio.Condition(self._async_mutex) + self._finished = asyncio.Event() self._finished.set() self._closing = False @@ -78,9 +77,14 @@ async def wait_closed(self) -> None: # so lock acquiring is not required if not self._closing: raise RuntimeError("Waiting for non-closed queue") + # give execution chances for the task-done callbacks + # of async tasks created inside + # _notify_async_not_empty, _notify_async_not_full + # methods. + await asyncio.sleep(0) if not self._pending: return - await asyncio.wait(self._pending, loop=self._loop) + await asyncio.wait(self._pending) @property def closed(self) -> bool: @@ -143,7 +147,7 @@ async def f() -> None: self._async_not_empty.notify() def task_maker() -> None: - task = asyncio.ensure_future(f(), loop=self._loop) + task = self._loop.create_task(f()) task.add_done_callback(self._pending.discard) self._pending.add(task) @@ -158,7 +162,7 @@ async def f() -> None: self._async_not_full.notify() def task_maker() -> None: - task = asyncio.ensure_future(f(), loop=self._loop) + task = self._loop.create_task(f()) task.add_done_callback(self._pending.discard) self._pending.add(task) diff --git a/requirements-dev.txt b/requirements-dev.txt index 03ca1bc..7f87225 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -7,5 +7,6 @@ mypy==0.770 pyroma==2.6 pytest-cov==2.8.1 pytest==5.4.1 +pytest-asyncio==0.10.0 tox==3.14.6 wheel==0.34.2 diff --git a/setup.py b/setup.py index 1b3f07a..f4a852c 100644 --- a/setup.py +++ b/setup.py @@ -6,9 +6,6 @@ from setuptools.command.test import test as TestCommand -PY_33 = sys.version_info < (3, 4) -PY_35 = sys.version_info >= (3, 5) - class PyTest(TestCommand): user_options = [('pytest-args=', 'a', "Arguments to pass to py.test")] @@ -43,13 +40,10 @@ def read(f): install_requires = [] -if PY_33: - install_requires.append('asyncio') - -# if not PY_35: -# install_requires.append('typing') - -tests_require = install_requires + ['pytest'] +tests_require = install_requires + [ + 'pytest>=5.4', + 'pytest-asyncio>=0.10.0', +] extras_require = {} @@ -66,6 +60,7 @@ def read(f): 'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: 3.8', 'Topic :: Software Development :: Libraries', 'Framework :: AsyncIO', ], diff --git a/tests/test_async.py b/tests/test_async.py index 505b577..9f2a1ae 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -1,66 +1,56 @@ """Tests for queues.py""" import asyncio -import concurrent.futures import unittest import janus +import pytest -class _QueueTestBase(unittest.TestCase): - def setUp(self): - self.loop = asyncio.new_event_loop() - self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=5) - self.loop.set_default_executor(self.executor) - asyncio.set_event_loop(None) +class QueueBasicTests(unittest.TestCase): - def tearDown(self): - self.executor.shutdown() - self.loop.close() - - -class QueueBasicTests(_QueueTestBase): - def _test_repr_or_str(self, fn, expect_id): + async def _test_repr_or_str(self, fn, expect_id): """Test Queue's repr or str. fn is repr or str. expect_id is True if we expect the Queue's id to appear in fn(Queue()). """ - _q = janus.Queue(loop=self.loop) + _q = janus.Queue() q = _q.async_q self.assertTrue(fn(q).startswith('