source: trunk/integration/grid.py

Last change on this file was 9758569, checked in by meejah <meejah@…>, at 2023-08-09T21:16:07Z

obsolete comment

  • Property mode set to 100644
File size: 15.7 KB
Line 
1"""
2Classes which directly represent various kinds of Tahoe processes
3that co-operate to for "a Grid".
4
5These methods and objects are used by conftest.py fixtures but may
6also be used as direct helpers for tests that don't want to (or can't)
7rely on 'the' global grid as provided by fixtures like 'alice' or
8'storage_servers'.
9"""
10
11from os import mkdir, listdir
12from os.path import join, exists
13from json import loads
14from tempfile import mktemp
15from time import sleep
16
17from eliot import (
18    log_call,
19)
20
21from foolscap.furl import (
22    decode_furl,
23)
24
25from twisted.python.procutils import which
26from twisted.internet.defer import (
27    inlineCallbacks,
28    returnValue,
29    Deferred,
30)
31from twisted.internet.task import (
32    deferLater,
33)
34from twisted.internet.interfaces import (
35    IProcessTransport,
36    IProcessProtocol,
37)
38from twisted.internet.error import ProcessTerminated
39
40from allmydata.util.attrs_provides import (
41    provides,
42)
43from allmydata.node import read_config
44from .util import (
45    _CollectOutputProtocol,
46    _MagicTextProtocol,
47    _DumpOutputProtocol,
48    _ProcessExitedProtocol,
49    _run_node,
50    _cleanup_tahoe_process,
51    _tahoe_runner_optional_coverage,
52    TahoeProcess,
53    await_client_ready,
54    generate_ssh_key,
55    cli,
56    reconfigure,
57    _create_node,
58)
59
60import attr
61import pytest_twisted
62
63
64# currently, we pass a "request" around a bunch but it seems to only
65# be for addfinalizer() calls.
66# - is "keeping" a request like that okay? What if it's a session-scoped one?
67#   (i.e. in Grid etc)
68# - maybe limit to "a callback to hang your cleanup off of" (instead of request)?
69
70
@attr.s
class FlogGatherer(object):
    """
    Flog Gatherer process.
    """
    # the IProcessTransport of the running 'twistd' gatherer process
    process = attr.ib(
        validator=provides(IProcessTransport)
    )
    # the IProcessProtocol attached to that process
    protocol = attr.ib(
        validator=provides(IProcessProtocol)
    )
    # the fURL (str) at which the gatherer listens; nodes are pointed
    # at this via their log_gatherer.furl config
    furl = attr.ib()
83
84
@inlineCallbacks
def create_flog_gatherer(reactor, request, temp_dir, flog_binary):
    """
    Create (via 'flogtool create-gatherer') and start (via 'twistd') a
    Foolscap log-gatherer, returning a FlogGatherer instance.

    A finalizer is attached to ``request`` that stops the gatherer and
    dumps every collected .flog file into a single temporary text file.

    :param reactor: Twisted reactor used to spawn the processes
    :param request: pytest request; used only for addfinalizer()
    :param temp_dir: directory under which 'flog_gather' state lives
    :param flog_binary: path to the 'flogtool' executable
    """
    # run 'flogtool create-gatherer' and wait for it to exit
    out_protocol = _CollectOutputProtocol()
    gather_dir = join(temp_dir, 'flog_gather')
    reactor.spawnProcess(
        out_protocol,
        flog_binary,
        (
            'flogtool', 'create-gatherer',
            '--location', 'tcp:localhost:3117',
            '--port', '3117',
            gather_dir,
        )
    )
    yield out_protocol.done

    # run the gatherer itself under twistd; the "Gatherer waiting at"
    # text on stdout signals that it is up and listening
    twistd_protocol = _MagicTextProtocol("Gatherer waiting at", "gatherer")
    twistd_process = reactor.spawnProcess(
        twistd_protocol,
        which('twistd')[0],
        (
            'twistd', '--nodaemon', '--python',
            join(gather_dir, 'gatherer.tac'),
        ),
        path=gather_dir,
    )
    yield twistd_protocol.magic_seen

    def cleanup():
        # stop the twistd process, then dump the collected .flog files
        # into one human-readable temp file for post-mortem inspection
        _cleanup_tahoe_process(twistd_process, twistd_protocol.exited)

        flog_file = mktemp('.flog_dump')
        flog_protocol = _DumpOutputProtocol(open(flog_file, 'w'))
        flog_dir = join(temp_dir, 'flog_gather')
        flogs = [x for x in listdir(flog_dir) if x.endswith('.flog')]

        print("Dumping {} flogtool logfiles to '{}'".format(len(flogs), flog_file))
        # NOTE(review): flog_protocol is shared by every spawned
        # 'flogtool dump' process but its .done is awaited only once
        # below; with more than one .flog file some dumps might not be
        # finished when we proceed — confirm whether multiple flogs
        # ever occur in practice.
        for flog_path in flogs:
            reactor.spawnProcess(
                flog_protocol,
                flog_binary,
                (
                    'flogtool', 'dump', join(temp_dir, 'flog_gather', flog_path)
                ),
            )
        print("Waiting for flogtool to complete")
        try:
            pytest_twisted.blockon(flog_protocol.done)
        except ProcessTerminated as e:
            # best-effort: a failed dump should not fail the test run
            print("flogtool exited unexpectedly: {}".format(str(e)))
        print("Flogtool completed")

    request.addfinalizer(cleanup)

    # the gatherer wrote its fURL into the gather dir on startup
    with open(join(gather_dir, 'log_gatherer.furl'), 'r') as f:
        furl = f.read().strip()
    returnValue(
        FlogGatherer(
            protocol=twistd_protocol,
            process=twistd_process,
            furl=furl,
        )
    )
148
149
@attr.s
class StorageServer(object):
    """
    Represents a Tahoe Storage Server
    """

    # the TahoeProcess running this storage node
    process = attr.ib(
        validator=attr.validators.instance_of(TahoeProcess)
    )
    # the IProcessProtocol attached to the node's process
    protocol = attr.ib(
        validator=provides(IProcessProtocol)
    )

    @inlineCallbacks
    def restart(self, reactor, request):
        """
        re-start our underlying process by issuing a TERM, waiting and
        then running again. await_client_ready() will be done as well

        Note that self.process and self.protocol will be new instances
        after this.

        :param reactor: Twisted reactor used to re-launch the node
        :param request: pytest request, passed through to _run_node
        """
        self.process.transport.signalProcess('TERM')
        # wait for the old process to actually exit before re-launching
        yield self.protocol.exited
        self.process = yield _run_node(
            reactor, self.process.node_dir, request, None,
        )
        # the new transport carries the newly-attached protocol
        self.protocol = self.process.transport.proto
        yield await_client_ready(self.process)
179
180
@inlineCallbacks
def create_storage_server(reactor, request, temp_dir, introducer, flog_gatherer, name, web_port,
                          needed=2, happy=3, total=4):
    """
    Create a new storage server
    """
    process = yield _create_node(
        reactor, request, temp_dir, introducer.furl, flog_gatherer,
        name, web_port, storage=True, needed=needed, happy=happy, total=total,
    )
    # _create_node hands back a TahoeProcess; its transport is an
    # IProcessTransport (in practice a
    # twisted.internet._baseprocess.BaseProcess) which records its
    # process protocol on the 'proto' attribute.
    returnValue(
        StorageServer(
            process=process,
            protocol=process.transport.proto,
        )
    )
200
201
@attr.s
class Client(object):
    """
    Represents a Tahoe client
    """

    # the TahoeProcess running this client node
    process = attr.ib(
        validator=attr.validators.instance_of(TahoeProcess)
    )
    # the IProcessProtocol attached to the node's process
    protocol = attr.ib(
        validator=provides(IProcessProtocol)
    )
    request = attr.ib()  # original request, for addfinalizer()

## XXX convenience? or confusion?
#    @property
#    def node_dir(self):
#        return self.process.node_dir

    @inlineCallbacks
    def reconfigure_zfec(self, reactor, zfec_params, convergence=None, max_segment_size=None):
        """
        Reconfigure the ZFEC parameters for this node

        :param zfec_params: the new encoding parameters, passed through
            to util.reconfigure
        :param convergence: optional convergence secret
        :param max_segment_size: optional new segment size
        :returns: whatever util.reconfigure returns
        """
        # XXX this is a stop-gap to keep tests running "as is"
        # -> we should fix the tests so that they create a new client
        #    in the grid with the required parameters, instead of
        #    re-configuring Alice (or whomever)

        rtn = yield Deferred.fromCoroutine(
            reconfigure(reactor, self.request, self.process, zfec_params, convergence, max_segment_size)
        )
        return rtn

    @inlineCallbacks
    def restart(self, reactor, request, servers=1):
        """
        re-start our underlying process by issuing a TERM, waiting and
        then running again.

        :param int servers: number of server connections we will wait
            for before being 'ready'

        Note that self.process and self.protocol will be new instances
        after this.
        """
        # XXX similar to above, can we make this return a new instance
        # instead of mutating?
        self.process.transport.signalProcess('TERM')
        # wait for the old process to actually exit before re-launching
        yield self.protocol.exited
        process = yield _run_node(
            reactor, self.process.node_dir, request, None,
        )
        self.process = process
        self.protocol = self.process.transport.proto
        yield await_client_ready(self.process, minimum_number_of_servers=servers)

    @inlineCallbacks
    def add_sftp(self, reactor, request):
        """
        Enable the SFTP frontend on this node: create a fresh
        read-write directory-cap, append an [sftpd] section to
        tahoe.cfg, generate host and client SSH keys, write an accounts
        file granting the client key access to that cap, and restart
        the node so the new configuration takes effect.
        """
        # if other things need to add or change configuration, further
        # refactoring could be useful here (i.e. move reconfigure
        # parts to their own functions)

        # XXX why do we need an alias?
        # 1. Create a new RW directory cap:
        cli(self.process, "create-alias", "test")
        rwcap = loads(cli(self.process, "list-aliases", "--json"))["test"]["readwrite"]

        # 2. Enable SFTP on the node:
        host_ssh_key_path = join(self.process.node_dir, "private", "ssh_host_rsa_key")
        sftp_client_key_path = join(self.process.node_dir, "private", "ssh_client_rsa_key")
        accounts_path = join(self.process.node_dir, "private", "accounts")
        with open(join(self.process.node_dir, "tahoe.cfg"), "a") as f:
            f.write(
                ("\n\n[sftpd]\n"
                 "enabled = true\n"
                 "port = tcp:8022:interface=127.0.0.1\n"
                 "host_pubkey_file = {ssh_key_path}.pub\n"
                 "host_privkey_file = {ssh_key_path}\n"
                 "accounts.file = {accounts_path}\n").format(
                     ssh_key_path=host_ssh_key_path,
                     accounts_path=accounts_path,
                 )
            )
        generate_ssh_key(host_ssh_key_path)

        # 3. Add a SFTP access file with an SSH key for auth.
        generate_ssh_key(sftp_client_key_path)
        # Pub key format is "ssh-rsa <thekey> <username>". We want the key.
        with open(sftp_client_key_path + ".pub") as pubkey_file:
            ssh_public_key = pubkey_file.read().strip().split()[1]
        with open(accounts_path, "w") as f:
            f.write(
                "alice-key ssh-rsa {ssh_public_key} {rwcap}\n".format(
                    rwcap=rwcap,
                    ssh_public_key=ssh_public_key,
                )
            )

        # 4. Restart the node with new SFTP config.
        print("restarting for SFTP")
        yield self.restart(reactor, request)
        print("restart done")
        # XXX i think this is broken because we're "waiting for ready" during first bootstrap? or something?
308
309
@inlineCallbacks
def create_client(reactor, request, temp_dir, introducer, flog_gatherer, name, web_port,
                  needed=2, happy=3, total=4):
    """
    Create a new client node (a Tahoe node with storage disabled) and
    return it wrapped in a Client instance.

    :param introducer: the Introducer whose fURL the new node joins
    :param flog_gatherer: the FlogGatherer the new node logs to
    :param name: the node's directory name under ``temp_dir``
    :param web_port: Twisted endpoint-string for the node's web port
    :param needed: ZFEC shares needed ('k')
    :param happy: servers-of-happiness
    :param total: ZFEC total shares ('N')
    """
    # note: _create_node is imported at module level; the redundant
    # function-local re-import that used to live here was removed
    node_process = yield _create_node(
        reactor, request, temp_dir, introducer.furl, flog_gatherer,
        name, web_port, storage=False, needed=needed, happy=happy, total=total,
    )
    returnValue(
        Client(
            process=node_process,
            protocol=node_process.transport.proto,
            request=request,
        )
    )
328
329
@attr.s
class Introducer(object):
    """
    Represents a running introducer
    """

    # the TahoeProcess running this introducer node
    process = attr.ib(
        validator=attr.validators.instance_of(TahoeProcess)
    )
    # the IProcessProtocol attached to the node's process
    protocol = attr.ib(
        validator=provides(IProcessProtocol)
    )
    # the introducer's fURL (str); clients and servers are configured
    # with this to join the grid
    furl = attr.ib()
343
344
def _validate_furl(furl_fname):
    """
    Opens and validates a fURL, ensuring location hints.

    Busy-waits (blocking sleep) until ``furl_fname`` exists, then reads
    and decodes it.

    :returns: the furl
    :raises: ValueError if no location hints
    """
    while not exists(furl_fname):
        print("Don't see {} yet".format(furl_fname))
        sleep(.1)
    # use a context manager so the file handle is closed promptly
    # (the original open(...).read() leaked the handle)
    with open(furl_fname, 'r') as f:
        furl = f.read()
    tubID, location_hints, name = decode_furl(furl)
    if not location_hints:
        # If there are no location hints then nothing can ever possibly
        # connect to it and the only thing that can happen next is something
        # will hang or time out.  So just give up right now.
        raise ValueError(
            "Introducer ({!r}) fURL has no location hints!".format(
                furl,
            ),
        )
    return furl
366
367
@inlineCallbacks
@log_call(
    action_type=u"integration:introducer",
    include_args=["temp_dir", "flog_gatherer"],
    include_result=False,
)
def create_introducer(reactor, request, temp_dir, flog_gatherer, port):
    """
    Run a new Introducer and return an Introducer instance.

    :param reactor: Twisted reactor used to spawn the node process
    :param request: pytest request; used only for addfinalizer()
    :param temp_dir: directory under which the introducer's dir lives
    :param flog_gatherer: FlogGatherer whose fURL the node logs to
    :param port: web port for the node; also part of its nickname
    """
    intro_dir = join(temp_dir, 'introducer{}'.format(port))

    # only create the node directory once; re-use it on later runs
    if not exists(intro_dir):
        mkdir(intro_dir)
        done_proto = _ProcessExitedProtocol()
        _tahoe_runner_optional_coverage(
            done_proto,
            reactor,
            request,
            (
                'create-introducer',
                '--listen=tcp',
                '--hostname=localhost',
                intro_dir,
            ),
        )
        yield done_proto.done

    # (re)write the node's config before launching it
    config = read_config(intro_dir, "tub.port")
    config.set_config("node", "nickname", f"introducer-{port}")
    config.set_config("node", "web.port", f"{port}")
    config.set_config("node", "log_gatherer.furl", flog_gatherer.furl)

    # on windows, "tahoe start" means: run forever in the foreground,
    # but on linux it means daemonize. "tahoe run" is consistent
    # between platforms.
    protocol = _MagicTextProtocol('introducer running', "introducer")
    transport = _tahoe_runner_optional_coverage(
        protocol,
        reactor,
        request,
        (
            'run',
            intro_dir,
        ),
    )

    def clean():
        return _cleanup_tahoe_process(transport, protocol.exited)
    request.addfinalizer(clean)

    # wait until the node prints its "running" banner
    yield protocol.magic_seen

    # poll (non-blocking, via deferLater) for the fURL file to appear
    # NOTE(review): _validate_furl repeats this wait with a blocking
    # sleep loop; by this point the file exists so that loop is a
    # no-op, but the duplication could be cleaned up.
    furl_fname = join(intro_dir, 'private', 'introducer.furl')
    while not exists(furl_fname):
        print("Don't see {} yet".format(furl_fname))
        yield deferLater(reactor, .1, lambda: None)
    furl = _validate_furl(furl_fname)

    returnValue(
        Introducer(
            process=TahoeProcess(transport, intro_dir),
            protocol=protocol,
            furl=furl,
        )
    )
434
435
@attr.s
class Grid(object):
    """
    Represents an entire Tahoe Grid setup

    A Grid includes an Introducer, Flog Gatherer and some number of
    Storage Servers. Optionally includes Clients.
    """

    # NOTE: attribute order matters — create_grid() constructs Grid
    # with positional arguments in exactly this order.
    _reactor = attr.ib()
    _request = attr.ib()          # pytest request, for finalizers
    _temp_dir = attr.ib()         # base directory for node dirs
    _port_allocator = attr.ib()   # async callable yielding a free port
    introducer = attr.ib()        # Introducer instance
    flog_gatherer = attr.ib()     # FlogGatherer instance
    storage_servers = attr.ib(factory=list)   # list of StorageServer
    clients = attr.ib(factory=dict)           # name -> Client

    @storage_servers.validator
    def check(self, attribute, value):
        # enforce that only StorageServer instances are ever stored
        for server in value:
            if not isinstance(server, StorageServer):
                raise ValueError(
                    "storage_servers must be StorageServer"
                )

    @inlineCallbacks
    def add_storage_node(self):
        """
        Creates a new storage node, returns a StorageServer instance
        (which will already be added to our .storage_servers list)
        """
        port = yield self._port_allocator()
        print("make {}".format(port))
        name = 'node{}'.format(port)
        web_port = 'tcp:{}:interface=localhost'.format(port)
        server = yield create_storage_server(
            self._reactor,
            self._request,
            self._temp_dir,
            self.introducer,
            self.flog_gatherer,
            name,
            web_port,
        )
        self.storage_servers.append(server)
        returnValue(server)

    @inlineCallbacks
    def add_client(self, name, needed=2, happy=3, total=4):
        """
        Create a new client node

        :param name: node-directory name (also the key in .clients)
        :param needed: ZFEC shares needed ('k')
        :param happy: servers-of-happiness
        :param total: ZFEC total shares ('N')
        :returns: the new Client (also stored in .clients[name])
        """
        port = yield self._port_allocator()
        web_port = 'tcp:{}:interface=localhost'.format(port)
        client = yield create_client(
            self._reactor,
            self._request,
            self._temp_dir,
            self.introducer,
            self.flog_gatherer,
            name,
            web_port,
            needed=needed,
            happy=happy,
            total=total,
        )
        self.clients[name] = client
        # don't hand the client back until it's actually connected
        yield await_client_ready(client.process)
        returnValue(client)
506
507
508# A grid is now forever tied to its original 'request' which is where
509# it must hang finalizers off of. The "main" one is a session-level
510# fixture so it'll live the life of the tests but it could be
511# per-function Grid too.
@inlineCallbacks
def create_grid(reactor, request, temp_dir, flog_gatherer, port_allocator):
    """
    Create a new grid. This will have one Introducer but zero
    storage-servers or clients; those must be added by a test or
    subsequent fixtures.
    """
    # allocate a port for the introducer, launch it, then wrap
    # everything in a Grid (positional args must match Grid's
    # attribute order)
    port = yield port_allocator()
    intro = yield create_introducer(reactor, request, temp_dir, flog_gatherer, port)
    returnValue(
        Grid(
            reactor,
            request,
            temp_dir,
            port_allocator,
            intro,
            flog_gatherer,
        )
    )
Note: See TracBrowser for help on using the repository browser.