source: trunk/src/allmydata/introducer/client.py

Last change on this file was 2243ce3, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-28T00:07:08Z

remove "from past.builtins import long"

  • Property mode set to 100644
File size: 15.0 KB
Line 
1"""
2Ported to Python 3.
3"""
4
5from six import ensure_text, ensure_str
6
7import time
8from zope.interface import implementer
9from twisted.application import service
10from foolscap.api import Referenceable
11from allmydata.interfaces import InsufficientVersionError
12from allmydata.introducer.interfaces import IIntroducerClient, \
13     RIIntroducerSubscriberClient_v2
14from allmydata.introducer.common import sign_to_foolscap, unsign_from_foolscap,\
15     get_tubid_string_from_ann
16from allmydata.util import log, yamlutil, connection_status
17from allmydata.util.rrefutil import add_version_to_remote_reference
18from allmydata.util.observer import (
19    ObserverList,
20)
21from allmydata.crypto.error import BadSignature
22from allmydata.util.assertutil import precondition
23
class InvalidCacheError(Exception):
    """The on-disk announcement cache does not have the expected structure."""


# Protocol identifier that must appear in the introducer's version
# dictionary before this client will publish or subscribe.
V2 = b"http://allmydata.org/tahoe/protocols/introducer/v2"
28
@implementer(RIIntroducerSubscriberClient_v2, IIntroducerClient)  # type: ignore[misc]
class IntroducerClient(service.Service, Referenceable):
    """
    Client side of the introducer protocol.

    Publishes this node's signed announcements to the introducer, subscribes
    to services on behalf of local subscribers, and hands inbound
    announcements to them. Every accepted announcement is also written to
    the cache file; the cache is replayed to subscribers when the initial
    connection attempt fails.
    """

    def __init__(self, tub, introducer_furl,
                 nickname, my_version, oldest_supported,
                 sequencer, cache_filepath):
        """
        :param tub: object whose ``connectTo`` and ``getReference`` are used
            to reach the introducer.
        :param introducer_furl: FURL of the introducer.
        :param nickname: this node's nickname; must be ``str``.
        :param my_version: value sent as "my-version".
        :param oldest_supported: value sent as "oldest-supported".
        :param sequencer: zero-argument callable returning
            ``(seqnum, nonce)``; called on every ``publish()``.
        :param cache_filepath: path object providing ``open()`` and
            ``setContent()``, used for the announcement cache.
        """
        self._tub = tub
        self.introducer_furl = introducer_furl

        assert isinstance(nickname, str)
        self._nickname = nickname
        self._my_version = my_version
        self._oldest_supported = oldest_supported
        self._sequencer = sequencer
        self._cache_filepath = cache_filepath

        self._my_subscriber_info = { b"version": 0,
                                     b"nickname": self._nickname,
                                     b"app-versions": [],
                                     b"my-version": self._my_version,
                                     b"oldest-supported": self._oldest_supported,
                                     }

        self._outbound_announcements = {} # not signed
        self._published_announcements = {} # signed
        self._canary = Referenceable()

        self._publisher = None
        self._since = None

        self._local_subscribers = {} # {servicename: ObserverList}
        self._subscriptions = set() # requests we've actually sent

        # _inbound_announcements remembers one announcement per
        # (servicename,serverid) pair. Anything that arrives with the same
        # pair will displace the previous one. This stores tuples of
        # (unpacked announcement dictionary, verifyingkey, rxtime). The ann
        # dicts can be compared for equality to distinguish re-announcement
        # from updates. It also provides memory for clients who subscribe
        # after startup.
        self._inbound_announcements = {}

        # hooks for unit tests
        self._debug_counts = {
            "inbound_message": 0,
            "inbound_announcement": 0,
            "wrong_service": 0,
            "duplicate_announcement": 0,
            "update": 0,
            "new_announcement": 0,
            "outbound_message": 0,
            }
        self._debug_outstanding = 0

    def _debug_retired(self, res):
        """
        Pass-through callback/errback: note that one outstanding remote
        call has finished and return ``res`` unchanged.
        """
        self._debug_outstanding -= 1
        return res

    def startService(self):
        """
        Start the service and begin connecting to the introducer.

        ``connectTo`` is given ``_got_introducer`` as its callback and its
        return value is kept for ``connection_status()``. A separate
        ``getReference`` is issued only to notice a failed first attempt,
        in which case the cached announcements are loaded.
        """
        service.Service.startService(self)
        self._introducer_error = None
        # Normalise once so both calls see the same native-str FURL.
        furl = ensure_str(self.introducer_furl)
        rc = self._tub.connectTo(furl, self._got_introducer)
        self._introducer_reconnector = rc
        def connect_failed(failure):
            self.log("Initial Introducer connection failed: perhaps it's down",
                     level=log.WEIRD, failure=failure, umid="c5MqUQ")
            self._load_announcements()
        d = self._tub.getReference(furl)
        d.addErrback(connect_failed)

    def _load_announcements(self):
        """
        Deliver the announcements stored in the cache file to subscribers.

        A missing/unreadable file is silently ignored. A cache that is not a
        list is logged and ignored. An entry that is not a dict, or lacks a
        usable "key_s" / "ann", is logged as ``InvalidCacheError`` and
        skipped so that the remaining entries are still delivered.
        """
        try:
            with self._cache_filepath.open() as f:
                servers = yamlutil.safe_load(f)
        except EnvironmentError:
            return # no cache file
        if not isinstance(servers, list):
            log.err(InvalidCacheError("not a list"), level=log.WEIRD)
            return
        self.log("Using server data from cache", level=log.UNUSUAL)
        for server_params in servers:
            if not isinstance(server_params, dict):
                log.err(InvalidCacheError("not a dict: %r" % (server_params,)),
                        level=log.WEIRD)
                continue
            try:
                # everything coming from yamlutil.safe_load is unicode
                key_s = server_params['key_s'].encode("ascii")
                ann = server_params['ann']
            except (KeyError, AttributeError, UnicodeError):
                log.err(InvalidCacheError("bad entry: %r" % (server_params,)),
                        level=log.WEIRD)
                continue
            # _deliver_announcements looks up ann["service-name"]
            if not isinstance(ann, dict) or "service-name" not in ann:
                log.err(InvalidCacheError("bad announcement: %r" % (ann,)),
                        level=log.WEIRD)
                continue
            self._deliver_announcements(key_s, ann)

    def _save_announcements(self):
        """
        Rewrite the cache file with every remembered inbound announcement,
        as a YAML list of ``{"ann": ..., "key_s": ...}`` dicts.
        """
        # On Python 2, bytes strings are encoded into YAML Unicode strings.
        # On Python 3, bytes are encoded as YAML bytes. To minimize
        # changes, Python 3 for now ensures the same is true.
        announcements = [
            {"ann": ann, "key_s": ensure_text(key_s)}
            for (ann, key_s, _rxtime) in self._inbound_announcements.values()
        ]
        announcement_cache_yaml = yamlutil.safe_dump(announcements)
        if isinstance(announcement_cache_yaml, str):
            announcement_cache_yaml = announcement_cache_yaml.encode("utf-8")
        self._cache_filepath.setContent(announcement_cache_yaml)

    def _got_introducer(self, publisher):
        """
        Connection callback: attach version information to ``publisher``
        (falling back to ``default`` below) and continue in
        ``_got_versioned_introducer``; failures go to ``_got_error``.
        """
        self.log("connected to introducer, getting versions")
        default = { b"http://allmydata.org/tahoe/protocols/introducer/v1":
                    { },
                    b"application-version": b"unknown: no get_version()",
                    }
        d = add_version_to_remote_reference(publisher, default)
        d.addCallback(self._got_versioned_introducer)
        d.addErrback(self._got_error)

    def _got_error(self, f):
        """Remember the failure ``f`` in ``_introducer_error``."""
        # TODO: for the introducer, perhaps this should halt the application
        self._introducer_error = f # polled by tests

    def _got_versioned_introducer(self, publisher):
        """
        Accept ``publisher`` as the live introducer connection, then publish
        and subscribe whatever is pending.

        :raises InsufficientVersionError: if ``V2`` is not among the keys of
            ``publisher.version``.
        """
        self.log("got introducer version: %s" % (publisher.version,))
        # we require an introducer that speaks at least V2
        assert all(isinstance(v, bytes) for v in publisher.version)
        if V2 not in publisher.version:
            raise InsufficientVersionError("V2", publisher.version)
        self._publisher = publisher
        self._since = int(time.time())
        publisher.notifyOnDisconnect(self._disconnected)
        self._maybe_publish()
        self._maybe_subscribe()

    def _disconnected(self):
        """
        Disconnect callback: drop the publisher, restamp ``_since`` and
        forget the sent subscriptions so they are sent again next time.
        """
        self.log("bummer, we've lost our connection to the introducer")
        self._publisher = None
        self._since = int(time.time())
        self._subscriptions.clear()

    def log(self, *args, **kwargs):
        """
        Call ``log.msg`` with the given arguments, defaulting ``facility``
        to "tahoe.introducer.client", and return its result.
        """
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.introducer.client"
        return log.msg(*args, **kwargs)

    def subscribe_to(self, service_name, callback, *args, **kwargs):
        """
        Register ``callback(key_s, ann, *args, **kwargs)`` for announcements
        of ``service_name``.

        The subscription is sent to the introducer if one is connected, and
        the observers for ``service_name`` are notified of every
        already-remembered announcement for that service.
        """
        obs = self._local_subscribers.setdefault(service_name, ObserverList())
        obs.subscribe(lambda key_s, ann: callback(key_s, ann, *args, **kwargs))
        self._maybe_subscribe()
        # iterate over a snapshot of the remembered announcements
        for index, (ann, key_s, _when) in list(self._inbound_announcements.items()):
            precondition(isinstance(key_s, bytes), key_s)
            servicename = index[0]
            if servicename == service_name:
                obs.notify(key_s, ann)

    def _maybe_subscribe(self):
        """
        Send "subscribe_v2" for every locally subscribed service that has
        not been sent yet. Does nothing (beyond logging) without a publisher.
        """
        if not self._publisher:
            self.log("want to subscribe, but no introducer yet",
                     level=log.NOISY)
            return
        for service_name in self._local_subscribers:
            if service_name in self._subscriptions:
                continue
            self._subscriptions.add(service_name)
            self._debug_outstanding += 1
            d = self._publisher.callRemote("subscribe_v2",
                                           self, service_name.encode("utf-8"),
                                           self._my_subscriber_info)
            d.addBoth(self._debug_retired)
            d.addErrback(log.err, facility="tahoe.introducer.client",
                         level=log.WEIRD, umid="2uMScQ")

    def create_announcement_dict(self, service_name, ann):
        """
        Return a new announcement dict for ``service_name``: this node's
        standard fields, overlaid with (and overridable by) ``ann``.
        """
        ann_d = { "version": 0,
                  # "seqnum" and "nonce" will be populated with new values in
                  # publish(), each time we make a change
                  "nickname": self._nickname,
                  "app-versions": [],
                  "my-version": self._my_version,
                  "oldest-supported": self._oldest_supported,

                  "service-name": service_name,
                  }
        ann_d.update(ann)
        return ann_d

    def publish(self, service_name, ann, signing_key):
        """
        Record ``ann`` as the outbound announcement for ``service_name`` and
        publish it.

        Note that *every* outbound announcement is given the fresh
        seqnum/nonce and re-signed with the ``signing_key`` passed here.
        """
        # we increment the seqnum every time we publish something new
        current_seqnum, current_nonce = self._sequencer()

        ann_d = self.create_announcement_dict(service_name, ann)
        self._outbound_announcements[service_name] = ann_d

        # publish all announcements with the new seqnum and nonce
        for name, outbound in list(self._outbound_announcements.items()):
            outbound["seqnum"] = current_seqnum
            outbound["nonce"] = current_nonce
            ann_t = sign_to_foolscap(outbound, signing_key)
            self._published_announcements[name] = ann_t
        self._maybe_publish()

    def _maybe_publish(self):
        """
        Send "publish_v2" for every signed announcement. Does nothing
        (beyond logging) without a publisher.
        """
        if not self._publisher:
            self.log("want to publish, but no introducer yet", level=log.NOISY)
            return
        # this re-publishes everything. The Introducer ignores duplicates
        for ann_t in list(self._published_announcements.values()):
            self._debug_counts["outbound_message"] += 1
            self._debug_outstanding += 1
            d = self._publisher.callRemote("publish_v2", ann_t, self._canary)
            d.addBoth(self._debug_retired)
            d.addErrback(log.err, ann_t=ann_t,
                         facility="tahoe.introducer.client",
                         level=log.WEIRD, umid="xs9pVQ")

    def remote_announce_v2(self, announcements):
        """Log the batch size and hand the batch to ``got_announcements``."""
        lp = self.log("received %d announcements (v2)" % len(announcements))
        return self.got_announcements(announcements, lp)

    def got_announcements(self, announcements, lp=None):
        """
        Unsign and process each announcement in ``announcements``.

        An announcement with a bad signature is logged (under parent ``lp``)
        and skipped; the others are still processed. Only ``BadSignature``
        is handled here, any other exception propagates.
        """
        self._debug_counts["inbound_message"] += 1
        for ann_t in announcements:
            try:
                # this might raise UnknownKeyError or bad-sig error
                ann, key_s = unsign_from_foolscap(ann_t)
                # key is "v0-base32abc123"
                precondition(isinstance(key_s, bytes), key_s)
            except BadSignature:
                self.log("bad signature on inbound announcement: %s" % (ann_t,),
                         parent=lp, level=log.WEIRD, umid="ZAU15Q")
                # process other announcements that arrived with the bad one
                continue

            self._process_announcement(ann, key_s)

    def _process_announcement(self, ann, key_s):
        """
        Decide whether ``ann`` (signed by ``key_s``) is new, an update, a
        duplicate, a stale replay, or for a service nobody subscribed to.

        New and updating announcements are remembered, written to the cache
        and delivered to subscribers; everything else is logged and dropped.
        """
        precondition(isinstance(key_s, bytes), key_s)
        self._debug_counts["inbound_announcement"] += 1
        service_name = str(ann["service-name"])
        if service_name not in self._local_subscribers:
            self.log("announcement for a service we don't care about [%s]"
                     % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
            self._debug_counts["wrong_service"] += 1
            return
        # for ASCII values, simplejson might give us unicode *or* bytes
        if "nickname" in ann and isinstance(ann["nickname"], bytes):
            # str(bytes) on Python 3 would yield the repr "b'...'", so
            # decode instead; "replace" keeps a bad nickname from raising.
            ann["nickname"] = ann["nickname"].decode("utf-8", "replace")
        nick_s = ann.get("nickname",u"").encode("utf-8")
        lp2 = self.log(format="announcement for nickname '%(nick)s', service=%(svc)s: %(ann)s",
                       nick=nick_s, svc=service_name, ann=ann, umid="BoKEag")

        # how do we describe this node in the logs?
        desc_bits = []
        assert key_s
        desc_bits.append(b"serverid=" + key_s[:20])
        if "anonymous-storage-FURL" in ann:
            tubid_s = get_tubid_string_from_ann(ann)
            desc_bits.append(b"tubid=" + tubid_s[:8])
        description = b"/".join(desc_bits)

        # the index is used to track duplicates
        index = (service_name, key_s)

        # is this announcement a duplicate?
        if (index in self._inbound_announcements
            and self._inbound_announcements[index][0] == ann):
            self.log(format="reannouncement for [%(service)s]:%(description)s, ignoring",
                     service=service_name, description=description,
                     parent=lp2, level=log.UNUSUAL, umid="B1MIdA")
            self._debug_counts["duplicate_announcement"] += 1
            return

        # does it update an existing one?
        if index in self._inbound_announcements:
            old,_,_ = self._inbound_announcements[index]
            if "seqnum" in old:
                # must beat previous sequence number to replace
                if ("seqnum" not in ann
                    or not isinstance(ann["seqnum"], int)):
                    self.log("not replacing old announcement, no valid seqnum: %s"
                             % (ann,),
                             parent=lp2, level=log.NOISY, umid="zFGH3Q")
                    return
                if ann["seqnum"] <= old["seqnum"]:
                    # note that exact replays are caught earlier, by
                    # comparing the entire signed announcement.
                    self.log("not replacing old announcement, "
                             "new seqnum is too old (%s <= %s) "
                             "(replay attack?): %s"
                             % (ann["seqnum"], old["seqnum"], ann),
                             parent=lp2, level=log.UNUSUAL, umid="JAAAoQ")
                    return
                # ok, seqnum is newer, allow replacement
            self._debug_counts["update"] += 1
            self.log("replacing old announcement: %s" % (ann,),
                     parent=lp2, level=log.NOISY, umid="wxwgIQ")
        else:
            self._debug_counts["new_announcement"] += 1
            self.log("new announcement[%s]" % service_name,
                     parent=lp2, level=log.NOISY)

        self._inbound_announcements[index] = (ann, key_s, time.time())
        self._save_announcements()
        # note: we never forget an index, but we might update its value

        self._deliver_announcements(key_s, ann)

    def _deliver_announcements(self, key_s, ann):
        """
        Notify the observers of ``ann["service-name"]``, if there are any,
        with ``(key_s, ann)``.
        """
        precondition(isinstance(key_s, bytes), key_s)
        service_name = str(ann["service-name"])
        obs = self._local_subscribers.get(service_name)
        if obs is not None:
            obs.notify(key_s, ann)

    def connection_status(self):
        """
        Return the status built by
        ``connection_status.from_foolscap_reconnector`` from the reconnector
        and the publisher's last-received time (``None`` when disconnected).
        Only valid while the service is running.
        """
        assert self.running # startService builds _introducer_reconnector
        irc = self._introducer_reconnector
        last_received = (self._publisher.getDataLastReceivedAt()
                         if self._publisher
                         else None)
        return connection_status.from_foolscap_reconnector(irc, last_received)

    def connected_to_introducer(self):
        """Return True while a publisher (introducer connection) is held."""
        return bool(self._publisher)

    def get_since(self):
        """
        Return the integer timestamp of the last connect or disconnect, or
        ``None`` if neither has happened yet.
        """
        return self._since
Note: See TracBrowser for help on using the repository browser.