Skip to content

Commit

Permalink
Fix up Python 3.8 loop argument warnings (#246)
Browse files Browse the repository at this point in the history
* Fix up Python 3.8 loop argument warnings

* Remove all "loop=self.loop" expressions.

* Rely on the currently running loop in the constructor of Queue.

* Assuming that janus.Queue objects are created in the functions or
  coroutines called by the event loop, rewrite most test cases to
  be async.

  - No longer manage the event loop lifecycles by ourselves.

  - Adopt pytest-asyncio to seamlessly run test cases in an event loop.

* Add missing .close() / .wait_closed() calls to the end of many test
  cases to ensure proper termination of the queues.

* Insert asyncio.sleep(0) in the wait_closed() method so that all
  task-done callbacks for tasks spawned by _notify_async_not_empty(),
  _notify_async_not_full() internal methods are properly awaited.
  This eliminates hundreds of resource warnings after finishing the test
  suite.

* Ensure dropping of Python 3.3/3.4 in CI configs.

* Add Python 3.7 and 3.8 to CI configs.

* Oops

* Let tox install pytest-asyncio

* Remove PY_33/PY_35 conditional branches as we no longer support Python
  3.4 or older versions.

* Update gitignore

* Let requirements-dev.txt to include pytest-asyncio

* Fix up errors for Python 3.5 and 3.6

* Fix too-long-line error

* Add changelog and update README
  • Loading branch information
achimnol authored Apr 23, 2020
1 parent 2543af6 commit ec8592b
Show file tree
Hide file tree
Showing 11 changed files with 427 additions and 301 deletions.
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,8 @@ target/

coverage

.pytest_cache
.pytest_cache/
.mypy_cache/

# pyenv
.python-version
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ language: python
python:
- "3.5"
- "3.6"
- "3.7"
- "3.8"


install:
Expand Down
5 changes: 5 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
Changes
=======

to be released
--------------

- Remove explicit loop arguments and forbid creating queues outside event loops #246

0.4.0 (2018-07-28)
------------------

Expand Down
55 changes: 49 additions & 6 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,50 @@ Synchronous is fully compatible with `standard queue
follows `asyncio queue design
<https://docs.python.org/3/library/asyncio-queue.html>`_.

Usage example
=============
Usage example (Python 3.7+)
===========================

.. code:: python
import asyncio
import janus
def threaded(sync_q):
for i in range(100):
sync_q.put(i)
sync_q.join()
async def async_coro(async_q):
for i in range(100):
val = await async_q.get()
assert val == i
async_q.task_done()
async def main():
queue = janus.Queue()
loop = asyncio.get_running_loop()
fut = loop.run_in_executor(None, threaded, queue.sync_q)
await async_coro(queue.async_q)
await fut
queue.close()
await queue.wait_closed()
asyncio.run(main())
Usage example (Python 3.5 and 3.6)
==================================

.. code:: python
import asyncio
import janus
loop = asyncio.get_event_loop()
queue = janus.Queue(loop=loop)
def threaded(sync_q):
Expand All @@ -51,9 +85,18 @@ Usage example
async_q.task_done()
fut = loop.run_in_executor(None, threaded, queue.sync_q)
loop.run_until_complete(async_coro(queue.async_q))
loop.run_until_complete(fut)
async def main():
queue = janus.Queue()
fut = loop.run_in_executor(None, threaded, queue.sync_q)
await async_coro(queue.async_q)
await fut
queue.close()
await queue.wait_closed()
try:
loop.run_until_complete(main())
finally:
loop.close()
Communication channels
Expand Down
34 changes: 19 additions & 15 deletions janus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@


T = TypeVar('T')
OptLoop = Optional[asyncio.AbstractEventLoop]
OptInt = Optional[int]


class Queue(Generic[T]):
def __init__(self, maxsize: int = 0, *, loop: OptLoop = None) -> None:
if loop is None:
loop = asyncio.get_event_loop()
current_loop = getattr(asyncio, 'get_running_loop', None)
if current_loop is None:
current_loop = asyncio.get_event_loop


self._loop = loop # type: asyncio.AbstractEventLoop
class Queue(Generic[T]):
def __init__(self, maxsize: int = 0) -> None:
self._loop = current_loop()
self._maxsize = maxsize

self._init(maxsize)
Expand All @@ -37,12 +38,10 @@ def __init__(self, maxsize: int = 0, *, loop: OptLoop = None) -> None:
self._sync_not_full = threading.Condition(self._sync_mutex)
self._all_tasks_done = threading.Condition(self._sync_mutex)

self._async_mutex = asyncio.Lock(loop=self._loop)
self._async_not_empty = asyncio.Condition(
self._async_mutex, loop=self._loop)
self._async_not_full = asyncio.Condition(
self._async_mutex, loop=self._loop)
self._finished = asyncio.Event(loop=self._loop)
self._async_mutex = asyncio.Lock()
self._async_not_empty = asyncio.Condition(self._async_mutex)
self._async_not_full = asyncio.Condition(self._async_mutex)
self._finished = asyncio.Event()
self._finished.set()

self._closing = False
Expand Down Expand Up @@ -78,9 +77,14 @@ async def wait_closed(self) -> None:
# so lock acquiring is not required
if not self._closing:
raise RuntimeError("Waiting for non-closed queue")
# give execution chances for the task-done callbacks
# of async tasks created inside
# _notify_async_not_empty, _notify_async_not_full
# methods.
await asyncio.sleep(0)
if not self._pending:
return
await asyncio.wait(self._pending, loop=self._loop)
await asyncio.wait(self._pending)

@property
def closed(self) -> bool:
Expand Down Expand Up @@ -143,7 +147,7 @@ async def f() -> None:
self._async_not_empty.notify()

def task_maker() -> None:
task = asyncio.ensure_future(f(), loop=self._loop)
task = self._loop.create_task(f())
task.add_done_callback(self._pending.discard)
self._pending.add(task)

Expand All @@ -158,7 +162,7 @@ async def f() -> None:
self._async_not_full.notify()

def task_maker() -> None:
task = asyncio.ensure_future(f(), loop=self._loop)
task = self._loop.create_task(f())
task.add_done_callback(self._pending.discard)
self._pending.add(task)

Expand Down
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ mypy==0.770
pyroma==2.6
pytest-cov==2.8.1
pytest==5.4.1
pytest-asyncio==0.10.0
tox==3.14.6
wheel==0.34.2
15 changes: 5 additions & 10 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@

from setuptools.command.test import test as TestCommand

PY_33 = sys.version_info < (3, 4)
PY_35 = sys.version_info >= (3, 5)


class PyTest(TestCommand):
user_options = [('pytest-args=', 'a', "Arguments to pass to py.test")]
Expand Down Expand Up @@ -43,13 +40,10 @@ def read(f):

install_requires = []

if PY_33:
install_requires.append('asyncio')

# if not PY_35:
# install_requires.append('typing')

tests_require = install_requires + ['pytest']
tests_require = install_requires + [
'pytest>=5.4',
'pytest-asyncio>=0.10.0',
]
extras_require = {}


Expand All @@ -66,6 +60,7 @@ def read(f):
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Topic :: Software Development :: Libraries',
'Framework :: AsyncIO',
],
Expand Down
Loading

0 comments on commit ec8592b

Please sign in to comment.