| 1 | """ |
|---|
| 2 | Utilities for working with Twisted Deferreds. |
|---|
| 3 | """ |
|---|
| 4 | |
|---|
| 5 | from __future__ import annotations |
|---|
| 6 | |
|---|
| 7 | import time |
|---|
| 8 | from functools import wraps |
|---|
| 9 | |
|---|
| 10 | from typing import ( |
|---|
| 11 | Callable, |
|---|
| 12 | Any, |
|---|
| 13 | Sequence, |
|---|
| 14 | TypeVar, |
|---|
| 15 | Optional, |
|---|
| 16 | Coroutine, |
|---|
| 17 | Generator |
|---|
| 18 | ) |
|---|
| 19 | from typing_extensions import ParamSpec |
|---|
| 20 | |
|---|
| 21 | from foolscap.api import eventually |
|---|
| 22 | from eliot.twisted import ( |
|---|
| 23 | inline_callbacks, |
|---|
| 24 | ) |
|---|
| 25 | from twisted.internet import defer, reactor, error |
|---|
| 26 | from twisted.internet.defer import Deferred |
|---|
| 27 | from twisted.python.failure import Failure |
|---|
| 28 | |
|---|
| 29 | from allmydata.util import log |
|---|
| 30 | from allmydata.util.assertutil import _assert |
|---|
| 31 | from allmydata.util.pollmixin import PollMixin |
|---|
| 32 | |
|---|
| 33 | |
|---|
| 34 | class TimeoutError(Exception): |
|---|
| 35 | pass |
|---|
| 36 | |
|---|
| 37 | |
|---|
| 38 | def timeout_call(reactor, d, timeout): |
|---|
| 39 | """ |
|---|
| 40 | This returns the result of 'd', unless 'timeout' expires before |
|---|
| 41 | 'd' is completed in which case a TimeoutError is raised. |
|---|
| 42 | """ |
|---|
| 43 | timer_d = defer.Deferred() |
|---|
| 44 | |
|---|
| 45 | def _timed_out(): |
|---|
| 46 | timer_d.errback(Failure(TimeoutError())) |
|---|
| 47 | |
|---|
| 48 | def _got_result(x): |
|---|
| 49 | try: |
|---|
| 50 | timer.cancel() |
|---|
| 51 | timer_d.callback(x) |
|---|
| 52 | except (error.AlreadyCalled, defer.AlreadyCalledError): |
|---|
| 53 | pass |
|---|
| 54 | return None |
|---|
| 55 | |
|---|
| 56 | timer = reactor.callLater(timeout, _timed_out) |
|---|
| 57 | d.addBoth(_got_result) |
|---|
| 58 | return timer_d |
|---|
| 59 | |
|---|
| 60 | |
|---|
| 61 | |
|---|
| 62 | # utility wrapper for DeferredList |
|---|
| 63 | def _check_deferred_list(results): |
|---|
| 64 | # if any of the component Deferreds failed, return the first failure such |
|---|
| 65 | # that an addErrback() would fire. If all were ok, return a list of the |
|---|
| 66 | # results (without the success/failure booleans) |
|---|
| 67 | for success,f in results: |
|---|
| 68 | if not success: |
|---|
| 69 | return f |
|---|
| 70 | return [r[1] for r in results] |
|---|
| 71 | |
|---|
| 72 | def DeferredListShouldSucceed(dl): |
|---|
| 73 | d = defer.DeferredList(dl) |
|---|
| 74 | d.addCallback(_check_deferred_list) |
|---|
| 75 | return d |
|---|
| 76 | |
|---|
| 77 | def _parseDListResult(l): |
|---|
| 78 | return [x[1] for x in l] |
|---|
| 79 | |
|---|
| 80 | def _unwrapFirstError(f): |
|---|
| 81 | f.trap(defer.FirstError) |
|---|
| 82 | raise f.value.subFailure |
|---|
| 83 | |
|---|
| 84 | def gatherResults(deferredList): |
|---|
| 85 | """Returns list with result of given Deferreds. |
|---|
| 86 | |
|---|
| 87 | This builds on C{DeferredList} but is useful since you don't |
|---|
| 88 | need to parse the result for success/failure. |
|---|
| 89 | |
|---|
| 90 | @type deferredList: C{list} of L{Deferred}s |
|---|
| 91 | """ |
|---|
| 92 | d = defer.DeferredList(deferredList, fireOnOneErrback=True, consumeErrors=True) |
|---|
| 93 | d.addCallbacks(_parseDListResult, _unwrapFirstError) |
|---|
| 94 | return d |
|---|
| 95 | |
|---|
| 96 | |
|---|
| 97 | def _with_log(op, res): |
|---|
| 98 | """ |
|---|
| 99 | The default behaviour on firing an already-fired Deferred is unhelpful for |
|---|
| 100 | debugging, because the AlreadyCalledError can easily get lost or be raised |
|---|
| 101 | in a context that results in a different error. So make sure it is logged |
|---|
| 102 | (for the abstractions defined here). If we are in a test, log.err will cause |
|---|
| 103 | the test to fail. |
|---|
| 104 | """ |
|---|
| 105 | try: |
|---|
| 106 | op(res) |
|---|
| 107 | except defer.AlreadyCalledError as e: |
|---|
| 108 | log.err(e, op=repr(op), level=log.WEIRD) |
|---|
| 109 | |
|---|
| 110 | def eventually_callback(d): |
|---|
| 111 | def _callback(res): |
|---|
| 112 | eventually(_with_log, d.callback, res) |
|---|
| 113 | return res |
|---|
| 114 | return _callback |
|---|
| 115 | |
|---|
| 116 | def eventually_errback(d): |
|---|
| 117 | def _errback(res): |
|---|
| 118 | eventually(_with_log, d.errback, res) |
|---|
| 119 | return res |
|---|
| 120 | return _errback |
|---|
| 121 | |
|---|
| 122 | def eventual_chain(source, target): |
|---|
| 123 | source.addCallbacks(eventually_callback(target), eventually_errback(target)) |
|---|
| 124 | |
|---|
| 125 | |
|---|
| 126 | class HookMixin: |
|---|
| 127 | """ |
|---|
| 128 | I am a helper mixin that maintains a collection of named hooks, primarily |
|---|
| 129 | for use in tests. Each hook is set to an unfired Deferred using 'set_hook', |
|---|
| 130 | and can then be fired exactly once at the appropriate time by '_call_hook'. |
|---|
| 131 | If 'ignore_count' is given, that number of calls to '_call_hook' will be |
|---|
| 132 | ignored before firing the hook. |
|---|
| 133 | |
|---|
| 134 | I assume a '_hooks' attribute that should set by the class constructor to |
|---|
| 135 | a dict mapping each valid hook name to None. |
|---|
| 136 | """ |
|---|
| 137 | def set_hook(self, name, d=None, ignore_count=0): |
|---|
| 138 | """ |
|---|
| 139 | Called by the hook observer (e.g. by a test). |
|---|
| 140 | If d is not given, an unfired Deferred is created and returned. |
|---|
| 141 | The hook must not already be set. |
|---|
| 142 | """ |
|---|
| 143 | self._log("set_hook %r, ignore_count=%r" % (name, ignore_count)) |
|---|
| 144 | if d is None: |
|---|
| 145 | d = defer.Deferred() |
|---|
| 146 | _assert(ignore_count >= 0, ignore_count=ignore_count) |
|---|
| 147 | _assert(name in self._hooks, name=name) |
|---|
| 148 | _assert(self._hooks[name] is None, name=name, hook=self._hooks[name]) |
|---|
| 149 | _assert(isinstance(d, defer.Deferred), d=d) |
|---|
| 150 | |
|---|
| 151 | self._hooks[name] = (d, ignore_count) |
|---|
| 152 | return d |
|---|
| 153 | |
|---|
| 154 | def _call_hook(self, res, name, **kwargs): |
|---|
| 155 | """ |
|---|
| 156 | Called to trigger the hook, with argument 'res'. This is a no-op if |
|---|
| 157 | the hook is unset. If the hook's ignore_count is positive, it will be |
|---|
| 158 | decremented; if it was already zero, the hook will be unset, and then |
|---|
| 159 | its Deferred will be fired synchronously. |
|---|
| 160 | |
|---|
| 161 | The expected usage is "deferred.addBoth(self._call_hook, 'hookname')". |
|---|
| 162 | This ensures that if 'res' is a failure, the hook will be errbacked, |
|---|
| 163 | which will typically cause the test to also fail. |
|---|
| 164 | 'res' is returned so that the current result or failure will be passed |
|---|
| 165 | through. |
|---|
| 166 | |
|---|
| 167 | Accepts a single keyword argument, async, defaulting to False. |
|---|
| 168 | """ |
|---|
| 169 | async_ = kwargs.get("async", False) |
|---|
| 170 | hook = self._hooks[name] |
|---|
| 171 | if hook is None: |
|---|
| 172 | return res # pass on error/result |
|---|
| 173 | |
|---|
| 174 | (d, ignore_count) = hook |
|---|
| 175 | self._log("call_hook %r, ignore_count=%r" % (name, ignore_count)) |
|---|
| 176 | if ignore_count > 0: |
|---|
| 177 | self._hooks[name] = (d, ignore_count - 1) |
|---|
| 178 | else: |
|---|
| 179 | self._hooks[name] = None |
|---|
| 180 | if async_: |
|---|
| 181 | _with_log(eventually_callback(d), res) |
|---|
| 182 | else: |
|---|
| 183 | _with_log(d.callback, res) |
|---|
| 184 | return res |
|---|
| 185 | |
|---|
| 186 | def _log(self, msg): |
|---|
| 187 | log.msg(msg, level=log.NOISY) |
|---|
| 188 | |
|---|
| 189 | |
|---|
| 190 | class WaitForDelayedCallsMixin(PollMixin): |
|---|
| 191 | def _delayed_calls_done(self): |
|---|
| 192 | # We're done when the only remaining DelayedCalls fire after threshold. |
|---|
| 193 | # (These will be associated with the test timeout, or else they *should* |
|---|
| 194 | # cause an unclean reactor error because the test should have waited for |
|---|
| 195 | # them.) |
|---|
| 196 | threshold = time.time() + 10 |
|---|
| 197 | for delayed in reactor.getDelayedCalls(): |
|---|
| 198 | if delayed.getTime() < threshold: |
|---|
| 199 | return False |
|---|
| 200 | return True |
|---|
| 201 | |
|---|
| 202 | def wait_for_delayed_calls(self, res=None): |
|---|
| 203 | """ |
|---|
| 204 | Use like this at the end of a test: |
|---|
| 205 | d.addBoth(self.wait_for_delayed_calls) |
|---|
| 206 | """ |
|---|
| 207 | d = self.poll(self._delayed_calls_done) |
|---|
| 208 | d.addErrback(log.err, "error while waiting for delayed calls") |
|---|
| 209 | d.addBoth(lambda ign: res) |
|---|
| 210 | return d |
|---|
| 211 | |
|---|
| 212 | @inline_callbacks |
|---|
| 213 | def until( |
|---|
| 214 | action: Callable[[], defer.Deferred[Any]], |
|---|
| 215 | condition: Callable[[], bool], |
|---|
| 216 | ) -> Generator[Any, None, None]: |
|---|
| 217 | """ |
|---|
| 218 | Run a Deferred-returning function until a condition is true. |
|---|
| 219 | |
|---|
| 220 | :param action: The action to run. |
|---|
| 221 | :param condition: The predicate signaling stop. |
|---|
| 222 | |
|---|
| 223 | :return: A Deferred that fires after the condition signals stop. |
|---|
| 224 | """ |
|---|
| 225 | while True: |
|---|
| 226 | yield action() |
|---|
| 227 | if condition(): |
|---|
| 228 | break |
|---|
| 229 | |
|---|
| 230 | |
|---|
| 231 | P = ParamSpec("P") |
|---|
| 232 | R = TypeVar("R") |
|---|
| 233 | |
|---|
| 234 | |
|---|
| 235 | def async_to_deferred(f: Callable[P, Coroutine[defer.Deferred[R], None, R]]) -> Callable[P, Deferred[R]]: |
|---|
| 236 | """ |
|---|
| 237 | Wrap an async function to return a Deferred instead. |
|---|
| 238 | |
|---|
| 239 | Maybe solution to https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3886 |
|---|
| 240 | """ |
|---|
| 241 | |
|---|
| 242 | @wraps(f) |
|---|
| 243 | def not_async(*args: P.args, **kwargs: P.kwargs) -> Deferred[R]: |
|---|
| 244 | return defer.Deferred.fromCoroutine(f(*args, **kwargs)) |
|---|
| 245 | |
|---|
| 246 | return not_async |
|---|
| 247 | |
|---|
| 248 | |
|---|
| 249 | class MultiFailure(Exception): |
|---|
| 250 | """ |
|---|
| 251 | More than one failure occurred. |
|---|
| 252 | """ |
|---|
| 253 | |
|---|
| 254 | def __init__(self, failures: Sequence[Failure]) -> None: |
|---|
| 255 | super(MultiFailure, self).__init__() |
|---|
| 256 | self.failures = failures |
|---|
| 257 | |
|---|
| 258 | |
|---|
| 259 | _T = TypeVar("_T") |
|---|
| 260 | |
|---|
| 261 | # Eventually this should be in Twisted upstream: |
|---|
| 262 | # https://github.com/twisted/twisted/pull/11818 |
|---|
| 263 | def race(ds: Sequence[Deferred[_T]]) -> Deferred[tuple[int, _T]]: |
|---|
| 264 | """ |
|---|
| 265 | Select the first available result from the sequence of Deferreds and |
|---|
| 266 | cancel the rest. |
|---|
| 267 | @return: A cancellable L{Deferred} that fires with the index and output of |
|---|
| 268 | the element of C{ds} to have a success result first, or that fires |
|---|
| 269 | with L{MultiFailure} holding a list of their failures if they all |
|---|
| 270 | fail. |
|---|
| 271 | """ |
|---|
| 272 | # Keep track of the Deferred for the action which completed first. When |
|---|
| 273 | # it completes, all of the other Deferreds will get cancelled but this one |
|---|
| 274 | # shouldn't be. Even though it "completed" it isn't really done - the |
|---|
| 275 | # caller will still be using it for something. If we cancelled it, |
|---|
| 276 | # cancellation could propagate down to them. |
|---|
| 277 | winner: Optional[Deferred] = None |
|---|
| 278 | |
|---|
| 279 | # The cancellation function for the Deferred this function returns. |
|---|
| 280 | def cancel(result: Deferred) -> None: |
|---|
| 281 | # If it is cancelled then we cancel all of the Deferreds for the |
|---|
| 282 | # individual actions because there is no longer the possibility of |
|---|
| 283 | # delivering any of their results anywhere. We don't have to fire |
|---|
| 284 | # `result` because the Deferred will do that for us. |
|---|
| 285 | for d in to_cancel: |
|---|
| 286 | d.cancel() |
|---|
| 287 | |
|---|
| 288 | # The Deferred that this function will return. It will fire with the |
|---|
| 289 | # index and output of the action that completes first, or None if all of |
|---|
| 290 | # the actions fail. If it is cancelled, all of the actions will be |
|---|
| 291 | # cancelled. |
|---|
| 292 | final_result: Deferred[tuple[int, _T]] = Deferred(canceller=cancel) |
|---|
| 293 | |
|---|
| 294 | # A callback for an individual action. |
|---|
| 295 | def succeeded(this_output: _T, this_index: int) -> None: |
|---|
| 296 | # If it is the first action to succeed then it becomes the "winner", |
|---|
| 297 | # its index/output become the externally visible result, and the rest |
|---|
| 298 | # of the action Deferreds get cancelled. If it is not the first |
|---|
| 299 | # action to succeed (because some action did not support |
|---|
| 300 | # cancellation), just ignore the result. It is uncommon for this |
|---|
| 301 | # callback to be entered twice. The only way it can happen is if one |
|---|
| 302 | # of the input Deferreds has a cancellation function that fires the |
|---|
| 303 | # Deferred with a success result. |
|---|
| 304 | nonlocal winner |
|---|
| 305 | if winner is None: |
|---|
| 306 | # This is the first success. Act on it. |
|---|
| 307 | winner = to_cancel[this_index] |
|---|
| 308 | |
|---|
| 309 | # Cancel the rest. |
|---|
| 310 | for d in to_cancel: |
|---|
| 311 | if d is not winner: |
|---|
| 312 | d.cancel() |
|---|
| 313 | |
|---|
| 314 | # Fire our Deferred |
|---|
| 315 | final_result.callback((this_index, this_output)) |
|---|
| 316 | |
|---|
| 317 | # Keep track of how many actions have failed. If they all fail we need to |
|---|
| 318 | # deliver failure notification on our externally visible result. |
|---|
| 319 | failure_state = [] |
|---|
| 320 | |
|---|
| 321 | def failed(failure: Failure, this_index: int) -> None: |
|---|
| 322 | failure_state.append((this_index, failure)) |
|---|
| 323 | if len(failure_state) == len(to_cancel): |
|---|
| 324 | # Every operation failed. |
|---|
| 325 | failure_state.sort() |
|---|
| 326 | failures = [f for (ignored, f) in failure_state] |
|---|
| 327 | final_result.errback(MultiFailure(failures)) |
|---|
| 328 | |
|---|
| 329 | # Copy the sequence of Deferreds so we know it doesn't get mutated out |
|---|
| 330 | # from under us. |
|---|
| 331 | to_cancel = list(ds) |
|---|
| 332 | for index, d in enumerate(ds): |
|---|
| 333 | # Propagate the position of this action as well as the argument to f |
|---|
| 334 | # to the success callback so we can cancel the right Deferreds and |
|---|
| 335 | # propagate the result outwards. |
|---|
| 336 | d.addCallbacks(succeeded, failed, callbackArgs=(index,), errbackArgs=(index,)) |
|---|
| 337 | |
|---|
| 338 | return final_result |
|---|