| 1 | """ |
|---|
| 2 | A global thread pool for CPU-intensive tasks. |
|---|
| 3 | |
|---|
| 4 | Motivation: |
|---|
| 5 | |
|---|
| 6 | * Certain tasks are blocking on CPU, and so should be run in a thread. |
|---|
| 7 | * The Twisted thread pool is used for operations that don't necessarily block |
|---|
| 8 | on CPU, like DNS lookups. CPU processing should not block DNS lookups! |
|---|
| 9 | * The number of threads should be fixed, and tied to the number of available |
|---|
| 10 | CPUs. |
|---|
| 11 | |
|---|
| 12 | As a first pass, this uses ``os.cpu_count()`` to determine the max number of |
|---|
| 13 | threads. This may create too many threads, as it doesn't cover things like |
|---|
| 14 | scheduler affinity or cgroups, but that's not the end of the world. |
|---|
| 15 | """ |
|---|
| 16 | |
|---|
| 17 | import os |
|---|
| 18 | from typing import TypeVar, Callable, cast |
|---|
| 19 | from functools import partial |
|---|
| 20 | import threading |
|---|
| 21 | from typing_extensions import ParamSpec |
|---|
| 22 | from unittest import TestCase |
|---|
| 23 | |
|---|
| 24 | from twisted.python.threadpool import ThreadPool |
|---|
| 25 | from twisted.internet.threads import deferToThreadPool |
|---|
| 26 | from twisted.internet import reactor |
|---|
| 27 | from twisted.internet.interfaces import IReactorFromThreads |
|---|
| 28 | |
|---|
# The single, process-wide pool used by defer_to_thread(). Its size is tied
# to os.cpu_count(); that call may return None, in which case we fall back to
# one thread.
_CPU_THREAD_POOL = ThreadPool(minthreads=0, maxthreads=os.cpu_count() or 1, name="TahoeCPU")
if hasattr(threading, "_register_atexit"):
    # threading._register_atexit() is a private API added in Python 3.9
    # (it was introduced for concurrent.futures' own thread-pool shutdown).
    # It registers a callback that runs during interpreter shutdown, which we
    # use to stop the pool. Since it's private, it might go away at any
    # point, so if it doesn't exist we still have a solution (below).
    threading._register_atexit(_CPU_THREAD_POOL.stop)  # type: ignore
else:
    # Daemon threads allow shutdown to happen without any explicit stopping of
    # threads. There are some bugs in old Python versions related to daemon
    # threads (fixed in subsequent CPython patch releases), but Python's own
    # thread pools use daemon threads in those versions so we're no worse off.
    # Wrapping the pool's thread factory makes every worker it creates a
    # daemon thread.
    _CPU_THREAD_POOL.threadFactory = partial(  # type: ignore
        _CPU_THREAD_POOL.threadFactory, daemon=True
    )
# Start the pool at import time so defer_to_thread() can submit work to it
# without any further setup.
_CPU_THREAD_POOL.start()
|---|
| 44 | |
|---|
| 45 | |
|---|
| 46 | P = ParamSpec("P") |
|---|
| 47 | R = TypeVar("R") |
|---|
| 48 | |
|---|
| 49 | # Is running in a thread pool disabled? Should only be true in synchronous unit |
|---|
| 50 | # tests. |
|---|
| 51 | _DISABLED = False |
|---|
| 52 | |
|---|
| 53 | |
|---|
async def defer_to_thread(f: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> R:
    """
    Execute ``f(*args, **kwargs)`` on the CPU thread pool and hand back its
    return value.

    While ``disable_thread_pool_for_test()`` is in effect, ``f`` is instead
    invoked directly in the calling thread.

    This is deliberately a coroutine function: callers have to ``await`` it,
    which makes it less likely that a synchronous test quietly depends on the
    call having already finished.
    """
    if not _DISABLED:
        # deferToThreadPool carries no type annotations, so the reactor is
        # cast to the interface it requires.
        threaded_reactor = cast(IReactorFromThreads, reactor)
        return await deferToThreadPool(
            threaded_reactor, _CPU_THREAD_POOL, f, *args, **kwargs
        )
    return f(*args, **kwargs)
|---|
| 70 | |
|---|
| 71 | |
|---|
def disable_thread_pool_for_test(test: TestCase) -> None:
    """
    For the duration of the test, calls to ``defer_to_thread()`` will actually
    run synchronously, which is useful for synchronous unit tests.

    A cleanup is registered on ``test`` that puts the flag back to whatever
    value it had *before* this call. Nested or repeated calls therefore unwind
    correctly, and a flag that was already set is not cleared by the cleanup.

    :param test: The running test case; only its ``addCleanup`` method is used.
    """
    global _DISABLED
    # Capture the current setting so the cleanup restores it rather than
    # assuming the pool was enabled beforehand.
    previous = _DISABLED

    def restore() -> None:
        global _DISABLED
        _DISABLED = previous

    # Register the cleanup before flipping the flag, so it is always undone.
    test.addCleanup(restore)

    _DISABLED = True
|---|
| 86 | |
|---|
| 87 | |
|---|
# Names exported by ``from ... import *``: the public API of this module.
__all__ = [
    "defer_to_thread",
    "disable_thread_pool_for_test",
]
|---|