1 | """ |
---|
2 | Observer for Twisted code. |
---|
3 | |
---|
4 | Ported to Python 3. |
---|
5 | """ |
---|
6 | |
---|
7 | import weakref |
---|
8 | from twisted.internet import defer |
---|
9 | from foolscap.api import eventually |
---|
10 | from twisted.logger import ( |
---|
11 | Logger, |
---|
12 | ) |
---|
13 | |
---|
14 | """The idiom we use is for the observed object to offer a method named |
---|
15 | 'when_something', which returns a deferred. That deferred will be fired when |
---|
16 | something happens. The way this is typically implemented is that the observed |
---|
17 | has an ObserverList whose when_fired method is called in the observed's |
---|
18 | 'when_something'.""" |
---|
19 | |
---|
20 | class OneShotObserverList: |
---|
21 | """A one-shot event distributor.""" |
---|
22 | def __init__(self): |
---|
23 | self._fired = False |
---|
24 | self._result = None |
---|
25 | self._watchers = [] |
---|
26 | self.__repr__ = self._unfired_repr |
---|
27 | |
---|
28 | def __repr__(self): |
---|
29 | """string representation of the OneshotObserverList""" |
---|
30 | if self._fired: |
---|
31 | return self._fired_repr() |
---|
32 | return self._unfired_repr() |
---|
33 | |
---|
34 | def _unfired_repr(self): |
---|
35 | return "<OneShotObserverList [%s]>" % (self._watchers, ) |
---|
36 | |
---|
37 | def _fired_repr(self): |
---|
38 | return "<OneShotObserverList -> %s>" % (self._result, ) |
---|
39 | |
---|
40 | def _get_result(self): |
---|
41 | return self._result |
---|
42 | |
---|
43 | def when_fired(self): |
---|
44 | if self._fired: |
---|
45 | return defer.succeed(self._get_result()) |
---|
46 | d = defer.Deferred() |
---|
47 | self._watchers.append(d) |
---|
48 | return d |
---|
49 | |
---|
50 | def fire(self, result): |
---|
51 | assert not self._fired |
---|
52 | self._fired = True |
---|
53 | self._result = result |
---|
54 | self._fire(result) |
---|
55 | |
---|
56 | def _fire(self, result): |
---|
57 | for w in self._watchers: |
---|
58 | w.callback(result) |
---|
59 | del self._watchers |
---|
60 | self.__repr__ = self._fired_repr |
---|
61 | |
---|
62 | def fire_if_not_fired(self, result): |
---|
63 | if not self._fired: |
---|
64 | self.fire(result) |
---|
65 | |
---|
66 | class LazyOneShotObserverList(OneShotObserverList): |
---|
67 | """ |
---|
68 | a variant of OneShotObserverList which does not retain |
---|
69 | the result it handles, but rather retains a callable() |
---|
70 | through which is retrieves the data if and when needed. |
---|
71 | """ |
---|
72 | def __init__(self): |
---|
73 | OneShotObserverList.__init__(self) |
---|
74 | |
---|
75 | def _get_result(self): |
---|
76 | return self._result_producer() |
---|
77 | |
---|
78 | def fire(self, result_producer): |
---|
79 | """ |
---|
80 | @param result_producer: a no-arg callable which |
---|
81 | returns the data which is to be considered the |
---|
82 | 'result' for this observer list. note that this |
---|
83 | function may be called multiple times - once |
---|
84 | upon initial firing, and potentially once more |
---|
85 | for each subsequent when_fired() deferred created |
---|
86 | """ |
---|
87 | assert not self._fired |
---|
88 | self._fired = True |
---|
89 | self._result_producer = result_producer |
---|
90 | if self._watchers: # if not, don't call result_producer |
---|
91 | self._fire(self._get_result()) |
---|
92 | |
---|
93 | class ObserverList: |
---|
94 | """ |
---|
95 | Immediately distribute events to a number of subscribers. |
---|
96 | """ |
---|
97 | _logger = Logger() |
---|
98 | |
---|
99 | def __init__(self): |
---|
100 | self._watchers = [] |
---|
101 | |
---|
102 | def subscribe(self, observer): |
---|
103 | self._watchers.append(observer) |
---|
104 | |
---|
105 | def unsubscribe(self, observer): |
---|
106 | self._watchers.remove(observer) |
---|
107 | |
---|
108 | def notify(self, *args, **kwargs): |
---|
109 | for o in self._watchers[:]: |
---|
110 | try: |
---|
111 | o(*args, **kwargs) |
---|
112 | except Exception: |
---|
113 | self._logger.failure("While notifying {o!r}", o=o) |
---|
114 | |
---|
115 | class EventStreamObserver: |
---|
116 | """A simple class to distribute multiple events to a single subscriber. |
---|
117 | It accepts arbitrary kwargs, but no posargs.""" |
---|
118 | def __init__(self): |
---|
119 | self._watcher = None |
---|
120 | self._undelivered_results = [] |
---|
121 | self._canceler = None |
---|
122 | |
---|
123 | def set_canceler(self, c, methname): |
---|
124 | """I will call c.METHNAME(self) when somebody cancels me.""" |
---|
125 | # we use a weakref to avoid creating a cycle between us and the thing |
---|
126 | # we're observing: they'll be holding a reference to us to compare |
---|
127 | # against the value we pass to their canceler function. However, |
---|
128 | # since bound methods are first-class objects (and not kept alive by |
---|
129 | # the object they're bound to), we can't just stash a weakref to the |
---|
130 | # bound cancel method. Instead, we must hold a weakref to the actual |
---|
131 | # object, and obtain its cancel method later. |
---|
132 | # http://code.activestate.com/recipes/81253-weakmethod/ has an |
---|
133 | # alternative. |
---|
134 | self._canceler = (weakref.ref(c), methname) |
---|
135 | |
---|
136 | def subscribe(self, observer, **watcher_kwargs): |
---|
137 | self._watcher = (observer, watcher_kwargs) |
---|
138 | while self._undelivered_results: |
---|
139 | self._notify(self._undelivered_results.pop(0)) |
---|
140 | |
---|
141 | def notify(self, **result_kwargs): |
---|
142 | if self._watcher: |
---|
143 | self._notify(result_kwargs) |
---|
144 | else: |
---|
145 | self._undelivered_results.append(result_kwargs) |
---|
146 | |
---|
147 | def _notify(self, result_kwargs): |
---|
148 | o, watcher_kwargs = self._watcher |
---|
149 | kwargs = dict(result_kwargs) |
---|
150 | kwargs.update(watcher_kwargs) |
---|
151 | eventually(o, **kwargs) |
---|
152 | |
---|
153 | def cancel(self): |
---|
154 | wr,methname = self._canceler |
---|
155 | o = wr() |
---|
156 | if o: |
---|
157 | getattr(o,methname)(self) |
---|