Skip to content

Commit

Permalink
Add typing to acquire
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielNoord committed Nov 4, 2024
1 parent e9bb695 commit bc9b9ab
Showing 1 changed file with 14 additions and 9 deletions.
23 changes: 14 additions & 9 deletions asyncpg/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable
from collections.abc import Awaitable, Callable, Iterator
import functools
import inspect
import logging
Expand Down Expand Up @@ -405,7 +405,7 @@ def __init__(self, *connect_args,
self._holders = []
self._initialized = False
self._initializing = False
self._queue = None
self._queue: Optional[asyncio.LifoQueue[PoolConnectionHolder]] = None

self._connection_class = connection_class
self._record_class = record_class
Expand Down Expand Up @@ -838,7 +838,11 @@ async def copy_records_to_table(
where=where
)

def acquire(self, *, timeout=None):
def acquire(
self,
*,
timeout: Optional[float] = None,
) -> PoolAcquireContext:
"""Acquire a database connection from the pool.
:param float timeout: A timeout for acquiring a Connection.
Expand All @@ -863,11 +867,12 @@ def acquire(self, *, timeout=None):
"""
return PoolAcquireContext(self, timeout)

async def _acquire(self, timeout):
async def _acquire_impl():
ch = await self._queue.get() # type: PoolConnectionHolder
async def _acquire(self, timeout: Optional[float]) -> PoolConnectionProxy:
async def _acquire_impl() -> PoolConnectionProxy:
assert self._queue is not None
ch = await self._queue.get()
try:
proxy = await ch.acquire() # type: PoolConnectionProxy
proxy = await ch.acquire()
except (Exception, asyncio.CancelledError):
self._queue.put_nowait(ch)
raise
Expand Down Expand Up @@ -1039,7 +1044,7 @@ def __init__(self, pool: Pool, timeout: Optional[float]) -> None:
self.connection = None
self.done = False

async def __aenter__(self):
async def __aenter__(self) -> PoolConnectionProxy:
if self.connection is not None or self.done:
raise exceptions.InterfaceError('a connection is already acquired')
self.connection = await self.pool._acquire(self.timeout)
Expand All @@ -1056,7 +1061,7 @@ async def __aexit__(
self.connection = None
await self.pool.release(con)

def __await__(self):
def __await__(self) -> Iterator[PoolConnectionProxy]:
self.done = True
return self.pool._acquire(self.timeout).__await__()

Expand Down

0 comments on commit bc9b9ab

Please sign in to comment.