Skip to content

Commit

Permalink
Added the current_default_thread_limiter() function
Browse files Browse the repository at this point in the history
  • Loading branch information
agronholm committed Oct 6, 2019
1 parent f46d336 commit 9689554
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 4 deletions.
10 changes: 10 additions & 0 deletions anyio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,16 @@ def run_async_from_thread(func: Callable[..., Coroutine[Any, Any, T_Retval]], *a
return asynclib.run_async_from_thread(func, *args)


def current_default_thread_limiter() -> CapacityLimiter:
"""
Return the capacity limiter that is used by default to limit the number of concurrent threads.
:return: a capacity limiter object
"""
return _get_asynclib().current_default_thread_limiter()


#
# Async file I/O
#
Expand Down
4 changes: 4 additions & 0 deletions anyio/_backends/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,10 @@ async def release_on_behalf_of(self, borrower) -> None:
event.set()


def current_default_thread_limiter():
return _default_thread_limiter


_default_thread_limiter = CapacityLimiter(40)

abc.Lock.register(Lock)
Expand Down
4 changes: 4 additions & 0 deletions anyio/_backends/_curio.py
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,10 @@ async def release_on_behalf_of(self, borrower):
await event.set()


def current_default_thread_limiter():
return _default_thread_limiter


_default_thread_limiter = CapacityLimiter(40)

abc.Lock.register(Lock)
Expand Down
14 changes: 11 additions & 3 deletions anyio/_backends/_trio.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Callable, Optional, List
from typing import Callable, Optional, List, Union

import trio.hazmat
from async_generator import async_generator, yield_, asynccontextmanager, aclosing
Expand Down Expand Up @@ -285,8 +285,11 @@ async def __anext__(self):


class CapacityLimiter(abc.CapacityLimiter):
def __init__(self, total_tokens: int):
self._limiter = trio.CapacityLimiter(total_tokens)
def __init__(self, limiter_or_tokens: Union[float, trio.CapacityLimiter]):
if isinstance(limiter_or_tokens, trio.CapacityLimiter):
self._limiter = limiter_or_tokens
else:
self._limiter = trio.CapacityLimiter(limiter_or_tokens)

async def __aenter__(self) -> 'CapacityLimiter':
await self._limiter.__aenter__()
Expand Down Expand Up @@ -332,6 +335,11 @@ async def release_on_behalf_of(self, borrower):
self._limiter.release_on_behalf_of(borrower)


def current_default_thread_limiter():
native_limiter = trio.to_thread.current_default_thread_limiter()
return CapacityLimiter(native_limiter)


abc.Lock.register(Lock)
abc.Condition.register(Condition)
abc.Event.register(Event)
Expand Down
1 change: 1 addition & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Threads

.. autofunction:: anyio.run_in_thread
.. autofunction:: anyio.run_async_from_thread
.. autofunction:: anyio.current_default_thread_limiter

Async file I/O
--------------
Expand Down
1 change: 1 addition & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
- Added the possibility to parametrize regular pytest test functions against the selected list of
backends
- Added the ``set_total_tokens()`` method to ``CapacityLimiter``
- Added the ``anyio.current_default_thread_limiter()`` function
- Implemented the Happy Eyeballs (:rfc:`6555`) algorithm for ``anyio.connect_tcp()``
- Fixed ``KeyError`` on asyncio and curio where entering and exiting a cancel scope happens in
different tasks
Expand Down
9 changes: 8 additions & 1 deletion tests/test_synchronization.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

from anyio import (
create_lock, create_task_group, create_queue, create_event, create_semaphore, create_condition,
open_cancel_scope, wait_all_tasks_blocked, create_capacity_limiter)
open_cancel_scope, wait_all_tasks_blocked, create_capacity_limiter,
current_default_thread_limiter, CapacityLimiter)


class TestLock:
Expand Down Expand Up @@ -298,3 +299,9 @@ async def waiter():
await limiter.set_total_tokens(2)

assert event2.is_set()

@pytest.mark.anyio
async def test_current_default_thread_limiter(self):
limiter = current_default_thread_limiter()
assert isinstance(limiter, CapacityLimiter)
assert limiter.total_tokens == 40

0 comments on commit 9689554

Please sign in to comment.