| 1 | """ |
|---|
| 2 | Observer for Twisted code. |
|---|
| 3 | |
|---|
| 4 | Ported to Python 3. |
|---|
| 5 | """ |
|---|
| 6 | |
|---|
| 7 | import weakref |
|---|
| 8 | from twisted.internet import defer |
|---|
| 9 | from foolscap.api import eventually |
|---|
| 10 | from twisted.logger import ( |
|---|
| 11 | Logger, |
|---|
| 12 | ) |
|---|
| 13 | |
|---|
| 14 | """The idiom we use is for the observed object to offer a method named |
|---|
| 15 | 'when_something', which returns a deferred. That deferred will be fired when |
|---|
| 16 | something happens. The way this is typically implemented is that the observed |
|---|
| 17 | has an ObserverList whose when_fired method is called in the observed's |
|---|
| 18 | 'when_something'.""" |
|---|
| 19 | |
|---|
| 20 | class OneShotObserverList: |
|---|
| 21 | """A one-shot event distributor.""" |
|---|
| 22 | def __init__(self): |
|---|
| 23 | self._fired = False |
|---|
| 24 | self._result = None |
|---|
| 25 | self._watchers = [] |
|---|
| 26 | self.__repr__ = self._unfired_repr |
|---|
| 27 | |
|---|
| 28 | def __repr__(self): |
|---|
| 29 | """string representation of the OneshotObserverList""" |
|---|
| 30 | if self._fired: |
|---|
| 31 | return self._fired_repr() |
|---|
| 32 | return self._unfired_repr() |
|---|
| 33 | |
|---|
| 34 | def _unfired_repr(self): |
|---|
| 35 | return "<OneShotObserverList [%s]>" % (self._watchers, ) |
|---|
| 36 | |
|---|
| 37 | def _fired_repr(self): |
|---|
| 38 | return "<OneShotObserverList -> %s>" % (self._result, ) |
|---|
| 39 | |
|---|
| 40 | def _get_result(self): |
|---|
| 41 | return self._result |
|---|
| 42 | |
|---|
| 43 | def when_fired(self): |
|---|
| 44 | if self._fired: |
|---|
| 45 | return defer.succeed(self._get_result()) |
|---|
| 46 | d = defer.Deferred() |
|---|
| 47 | self._watchers.append(d) |
|---|
| 48 | return d |
|---|
| 49 | |
|---|
| 50 | def fire(self, result): |
|---|
| 51 | assert not self._fired |
|---|
| 52 | self._fired = True |
|---|
| 53 | self._result = result |
|---|
| 54 | self._fire(result) |
|---|
| 55 | |
|---|
| 56 | def _fire(self, result): |
|---|
| 57 | for w in self._watchers: |
|---|
| 58 | w.callback(result) |
|---|
| 59 | del self._watchers |
|---|
| 60 | self.__repr__ = self._fired_repr |
|---|
| 61 | |
|---|
| 62 | def fire_if_not_fired(self, result): |
|---|
| 63 | if not self._fired: |
|---|
| 64 | self.fire(result) |
|---|
| 65 | |
|---|
| 66 | class LazyOneShotObserverList(OneShotObserverList): |
|---|
| 67 | """ |
|---|
| 68 | a variant of OneShotObserverList which does not retain |
|---|
| 69 | the result it handles, but rather retains a callable() |
|---|
| 70 | through which is retrieves the data if and when needed. |
|---|
| 71 | """ |
|---|
| 72 | def __init__(self): |
|---|
| 73 | OneShotObserverList.__init__(self) |
|---|
| 74 | |
|---|
| 75 | def _get_result(self): |
|---|
| 76 | return self._result_producer() |
|---|
| 77 | |
|---|
| 78 | def fire(self, result_producer): |
|---|
| 79 | """ |
|---|
| 80 | @param result_producer: a no-arg callable which |
|---|
| 81 | returns the data which is to be considered the |
|---|
| 82 | 'result' for this observer list. note that this |
|---|
| 83 | function may be called multiple times - once |
|---|
| 84 | upon initial firing, and potentially once more |
|---|
| 85 | for each subsequent when_fired() deferred created |
|---|
| 86 | """ |
|---|
| 87 | assert not self._fired |
|---|
| 88 | self._fired = True |
|---|
| 89 | self._result_producer = result_producer |
|---|
| 90 | if self._watchers: # if not, don't call result_producer |
|---|
| 91 | self._fire(self._get_result()) |
|---|
| 92 | |
|---|
| 93 | class ObserverList: |
|---|
| 94 | """ |
|---|
| 95 | Immediately distribute events to a number of subscribers. |
|---|
| 96 | """ |
|---|
| 97 | _logger = Logger() |
|---|
| 98 | |
|---|
| 99 | def __init__(self): |
|---|
| 100 | self._watchers = [] |
|---|
| 101 | |
|---|
| 102 | def subscribe(self, observer): |
|---|
| 103 | self._watchers.append(observer) |
|---|
| 104 | |
|---|
| 105 | def unsubscribe(self, observer): |
|---|
| 106 | self._watchers.remove(observer) |
|---|
| 107 | |
|---|
| 108 | def notify(self, *args, **kwargs): |
|---|
| 109 | for o in self._watchers[:]: |
|---|
| 110 | try: |
|---|
| 111 | o(*args, **kwargs) |
|---|
| 112 | except Exception: |
|---|
| 113 | self._logger.failure("While notifying {o!r}", o=o) |
|---|
| 114 | |
|---|
| 115 | class EventStreamObserver: |
|---|
| 116 | """A simple class to distribute multiple events to a single subscriber. |
|---|
| 117 | It accepts arbitrary kwargs, but no posargs.""" |
|---|
| 118 | def __init__(self): |
|---|
| 119 | self._watcher = None |
|---|
| 120 | self._undelivered_results = [] |
|---|
| 121 | self._canceler = None |
|---|
| 122 | |
|---|
| 123 | def set_canceler(self, c, methname): |
|---|
| 124 | """I will call c.METHNAME(self) when somebody cancels me.""" |
|---|
| 125 | # we use a weakref to avoid creating a cycle between us and the thing |
|---|
| 126 | # we're observing: they'll be holding a reference to us to compare |
|---|
| 127 | # against the value we pass to their canceler function. However, |
|---|
| 128 | # since bound methods are first-class objects (and not kept alive by |
|---|
| 129 | # the object they're bound to), we can't just stash a weakref to the |
|---|
| 130 | # bound cancel method. Instead, we must hold a weakref to the actual |
|---|
| 131 | # object, and obtain its cancel method later. |
|---|
| 132 | # http://code.activestate.com/recipes/81253-weakmethod/ has an |
|---|
| 133 | # alternative. |
|---|
| 134 | self._canceler = (weakref.ref(c), methname) |
|---|
| 135 | |
|---|
| 136 | def subscribe(self, observer, **watcher_kwargs): |
|---|
| 137 | self._watcher = (observer, watcher_kwargs) |
|---|
| 138 | while self._undelivered_results: |
|---|
| 139 | self._notify(self._undelivered_results.pop(0)) |
|---|
| 140 | |
|---|
| 141 | def notify(self, **result_kwargs): |
|---|
| 142 | if self._watcher: |
|---|
| 143 | self._notify(result_kwargs) |
|---|
| 144 | else: |
|---|
| 145 | self._undelivered_results.append(result_kwargs) |
|---|
| 146 | |
|---|
| 147 | def _notify(self, result_kwargs): |
|---|
| 148 | o, watcher_kwargs = self._watcher |
|---|
| 149 | kwargs = dict(result_kwargs) |
|---|
| 150 | kwargs.update(watcher_kwargs) |
|---|
| 151 | eventually(o, **kwargs) |
|---|
| 152 | |
|---|
| 153 | def cancel(self): |
|---|
| 154 | wr,methname = self._canceler |
|---|
| 155 | o = wr() |
|---|
| 156 | if o: |
|---|
| 157 | getattr(o,methname)(self) |
|---|