| 1 | """ |
|---|
| 2 | Ported to Python 3. |
|---|
| 3 | """ |
|---|
| 4 | from __future__ import annotations |
|---|
| 5 | |
|---|
| 6 | import os |
|---|
| 7 | from struct import ( |
|---|
| 8 | pack, |
|---|
| 9 | ) |
|---|
| 10 | from functools import ( |
|---|
| 11 | partial, |
|---|
| 12 | ) |
|---|
| 13 | |
|---|
| 14 | import attr |
|---|
| 15 | |
|---|
| 16 | from twisted.internet import defer |
|---|
| 17 | from twisted.trial import unittest |
|---|
| 18 | from twisted.application import service |
|---|
| 19 | |
|---|
| 20 | from foolscap.api import Tub, fireEventually, flushEventualQueue |
|---|
| 21 | |
|---|
| 22 | from eliot.twisted import ( |
|---|
| 23 | inline_callbacks, |
|---|
| 24 | ) |
|---|
| 25 | |
|---|
| 26 | from allmydata.introducer.client import IntroducerClient |
|---|
| 27 | from allmydata.crypto import aes |
|---|
| 28 | from allmydata.storage.server import ( |
|---|
| 29 | si_b2a, |
|---|
| 30 | StorageServer, |
|---|
| 31 | FoolscapStorageServer, |
|---|
| 32 | ) |
|---|
| 33 | from allmydata.storage_client import StorageFarmBroker |
|---|
| 34 | from allmydata.immutable.layout import ( |
|---|
| 35 | make_write_bucket_proxy, |
|---|
| 36 | ) |
|---|
| 37 | from allmydata.immutable import offloaded, upload |
|---|
| 38 | from allmydata import uri, client |
|---|
| 39 | from allmydata.util import hashutil, fileutil, mathutil, dictutil |
|---|
| 40 | |
|---|
| 41 | from .no_network import ( |
|---|
| 42 | NoNetworkServer, |
|---|
| 43 | LocalWrapper, |
|---|
| 44 | fireNow, |
|---|
| 45 | ) |
|---|
| 46 | from .common import ( |
|---|
| 47 | EMPTY_CLIENT_CONFIG, |
|---|
| 48 | SyncTestCase, |
|---|
| 49 | ) |
|---|
| 50 | |
|---|
| 51 | from testtools.matchers import ( |
|---|
| 52 | Equals, |
|---|
| 53 | MatchesListwise, |
|---|
| 54 | IsInstance, |
|---|
| 55 | ) |
|---|
| 56 | from testtools.twistedsupport import ( |
|---|
| 57 | succeeded, |
|---|
| 58 | ) |
|---|
| 59 | |
|---|
# Number of bytes in one mebibyte.
MiB = 1 << 20

# Sample plaintext uploaded by the tests in this module.
DATA = b"I need help\n" * 1000
|---|
| 63 | |
|---|
class CHKUploadHelper_fake(offloaded.CHKUploadHelper):
    """
    A ``CHKUploadHelper`` which never pushes anything: it only asks the
    uploadable for its size and encoding parameters and then reports
    fabricated upload results.
    """
    def start_encrypted(self, eu):
        """
        Pretend to upload ``eu``.

        :param eu: An object providing ``get_size`` and
            ``get_all_encoding_parameters``, each returning a ``Deferred``.

        :return: A ``Deferred`` that fires with the fabricated
            ``upload.UploadResults`` (which is also recorded on
            ``self._upload_status``).
        """
        def _fabricate_results(parms, size):
            # just pretend we did the upload
            needed_shares, happy, total_shares, segsize = parms
            verifycap = uri.CHKFileVerifierURI(
                self._storage_index,
                b"x"*32,
                needed_shares,
                total_shares,
                size,
            )
            fake_results = upload.UploadResults(
                file_size=size,
                ciphertext_fetched=0,
                preexisting_shares=0,
                pushed_shares=total_shares,
                sharemap={},
                servermap={},
                timings={},
                uri_extension_data={
                    "needed_shares": needed_shares,
                    "total_shares": total_shares,
                    "segment_size": segsize,
                    "size": size,
                },
                uri_extension_hash=b"fake",
                verifycapstr=verifycap.to_string(),
            )
            self._upload_status.set_results(fake_results)
            return fake_results

        def _fetch_parameters(size):
            # Carry the size along as an extra callback argument so the
            # final step has both pieces of information.
            getting_parms = eu.get_all_encoding_parameters()
            getting_parms.addCallback(_fabricate_results, size)
            return getting_parms

        getting_size = eu.get_size()
        getting_size.addCallback(_fetch_parameters)
        return getting_size
|---|
| 97 | |
|---|
@attr.s
class FakeCHKCheckerAndUEBFetcher:
    """
    Stand-in for ``CHKCheckerAndUEBFetcher`` whose ``check`` result is fixed
    at construction time rather than discovered from any server.
    """
    # Stored but never consulted by this fake.
    peer_getter = attr.ib()
    storage_index = attr.ib()
    logparent = attr.ib()

    # The canned values reported by ``check``.
    _sharemap = attr.ib()
    _ueb_data = attr.ib()

    @property
    def _ueb_hash(self):
        """
        The URI extension hash computed from the packed form of the canned
        URI extension data.
        """
        packed = uri.pack_extension(self._ueb_data)
        return hashutil.uri_extension_hash(packed)

    def check(self):
        """
        :return: An already-fired ``Deferred`` carrying the three-tuple
            ``(sharemap, ueb_data, ueb_hash)``.
        """
        canned = (self._sharemap, self._ueb_data, self._ueb_hash)
        return defer.succeed(canned)
|---|
| 122 | |
|---|
class FakeClient(service.MultiService):
    """
    A bare-bones service that exposes encoding parameters and a storage
    broker through accessor methods.

    ``storage_broker`` is not set here; it must be assigned on the instance
    before ``get_storage_broker`` is called.
    """
    introducer_clients : list[IntroducerClient] = []
    DEFAULT_ENCODING_PARAMETERS = dict(
        k=25,
        happy=75,
        n=100,
        max_segment_size=MiB,
    )

    def get_encoding_parameters(self):
        """
        :return: The class-wide default encoding parameters dictionary.
        """
        return self.DEFAULT_ENCODING_PARAMETERS

    def get_storage_broker(self):
        """
        :return: Whatever was assigned to the ``storage_broker`` attribute.
        """
        return self.storage_broker
|---|
| 135 | |
|---|
def flush_but_dont_ignore(res):
    """
    Flush the eventual-send queue, then hand back ``res`` unchanged.

    :param res: Any value to be passed through once the flush completes.

    :return: The ``Deferred`` from ``flushEventualQueue`` with a callback
        added that replaces its result with ``res``.
    """
    flushing = flushEventualQueue()
    flushing.addCallback(lambda _flushed: res)
    return flushing
|---|
| 142 | |
|---|
def wait_a_few_turns(ignored=None):
    """
    Chain six ``fireEventually`` calls back to back.

    :param ignored: Unused; accepted so this can be used as a callback.

    :return: A ``Deferred`` that fires after the whole chain has run.
    """
    waiting = fireEventually()
    # One initial call above plus five chained ones.
    for _turn in range(5):
        waiting.addCallback(fireEventually)
    return waiting
|---|
| 151 | |
|---|
def upload_data(uploader, data, convergence):
    """
    Wrap ``data`` in an ``upload.Data`` uploadable and pass it to
    ``uploader.upload``.

    :param uploader: An object with an ``upload`` method.
    :param data: The content to upload.
    :param convergence: The convergence value given to ``upload.Data``.

    :return: Whatever ``uploader.upload`` returns.
    """
    return uploader.upload(upload.Data(data, convergence=convergence))
|---|
| 155 | |
|---|
| 156 | |
|---|
def make_uploader(helper_furl, parent, override_name=None):
    """
    Create an ``upload.Uploader`` that talks to the given helper and attach
    it to the given service parent.

    :param bytes helper_furl: The Foolscap URL of the upload helper.

    :param IServiceCollection parent: The service that becomes the parent of
        the new uploader.

    :param str override_name: When not ``None``, the name to give the
        uploader service before attaching it.  Multiple services cannot
        coexist with the same name.

    :return: The new uploader, already attached to ``parent``.
    """
    uploader = upload.Uploader(helper_furl)
    # The name has to be changed before attaching to the parent.
    if override_name is not None:
        uploader.name = override_name
    uploader.setServiceParent(parent)
    return uploader
|---|
| 174 | |
|---|
| 175 | |
|---|
class AssistedUpload(unittest.TestCase):
    """
    Tests for uploading through an ``offloaded.Helper`` which is registered
    on a local ``Tub`` and reached by an ``upload.Uploader``.
    """
    def setUp(self):
        """
        Start a ``FakeClient`` service that carries a storage broker, a
        secret holder and a ``Tub``.
        """
        self.tub = t = Tub()
        t.setOption("expose-remote-exception-types", False)
        self.s = FakeClient()
        self.s.storage_broker = StorageFarmBroker(
            True,
            lambda h: self.tub,
            EMPTY_CLIENT_CONFIG,
        )
        self.s.secret_holder = client.SecretHolder(b"lease secret", b"converge")
        self.s.startService()

        t.setServiceParent(self.s)
        self.s.tub = t
        # we never actually use this for network traffic, so it can use a
        # bogus host/port
        t.setLocation(b"bogus:1234")

    def setUpHelper(self, basedir, chk_upload=CHKUploadHelper_fake, chk_checker=None):
        """
        Create an ``offloaded.Helper`` rooted at ``basedir`` and register it
        with the tub, saving the result as ``self.helper_furl``.

        :param str basedir: Directory for the helper's working files; it is
            created if necessary.

        :param chk_upload: If not ``None``, replaces the helper's
            ``chk_upload`` attribute.

        :param chk_checker: If not ``None``, replaces the helper's
            ``chk_checker`` attribute.
        """
        fileutil.make_dirs(basedir)
        self.helper = offloaded.Helper(
            basedir,
            self.s.storage_broker,
            self.s.secret_holder,
            None,
            None,
        )
        if chk_upload is not None:
            self.helper.chk_upload = chk_upload
        if chk_checker is not None:
            self.helper.chk_checker = chk_checker
        self.helper_furl = self.tub.registerReference(self.helper)

    def tearDown(self):
        """
        Stop the client service and flush the eventual-send queue.
        """
        d = self.s.stopService()
        d.addCallback(fireEventually)
        d.addBoth(flush_but_dont_ignore)
        return d

    def _assert_has_helper(self, uploader):
        """
        Fail the test unless ``uploader`` holds a truthy helper reference.
        """
        self.assertTrue(
            uploader._helper,
            "Expected uploader to have a helper reference, had {} instead.".format(
                uploader._helper,
            ),
        )

    def _assert_no_intermediate_files(self):
        """
        Fail the test if anything is left in the helper's ``CHK_encoding``
        or ``CHK_incoming`` directories under ``self.basedir``.
        """
        for subdir in ("CHK_encoding", "CHK_incoming"):
            files = os.listdir(os.path.join(self.basedir, subdir))
            self.assertEqual(files, [])

    def test_one(self):
        """
        Some data that has never been uploaded before can be uploaded in CHK
        format using the ``RIHelper`` provider and ``Uploader.upload``.
        """
        self.basedir = "helper/AssistedUpload/test_one"
        self.setUpHelper(self.basedir)
        u = make_uploader(self.helper_furl, self.s)

        d = wait_a_few_turns()

        def _ready(res):
            self._assert_has_helper(u)
            return upload_data(u, DATA, convergence=b"some convergence string")
        d.addCallback(_ready)

        def _uploaded(results):
            the_uri = results.get_uri()
            self.assertIn(b"CHK", the_uri)
            self.assertNotEqual(
                results.get_pushed_shares(),
                0,
            )
        d.addCallback(_uploaded)

        def _check_empty(res):
            # Make sure the intermediate artifacts aren't left lying around.
            self._assert_no_intermediate_files()
        d.addCallback(_check_empty)

        return d

    @inline_callbacks
    def test_concurrent(self):
        """
        The same data can be uploaded by more than one ``Uploader`` at a time.
        """
        self.basedir = "helper/AssistedUpload/test_concurrent"
        self.setUpHelper(self.basedir)
        u1 = make_uploader(self.helper_furl, self.s, "u1")
        u2 = make_uploader(self.helper_furl, self.s, "u2")

        yield wait_a_few_turns()

        for u in [u1, u2]:
            self._assert_has_helper(u)

        uploads = [
            upload_data(u, DATA, convergence=b"some convergence string")
            for u
            in [u1, u2]
        ]

        result1, result2 = yield defer.gatherResults(uploads)

        self.assertEqual(
            result1.get_uri(),
            result2.get_uri(),
        )
        # It would be really cool to assert that result1.get_pushed_shares() +
        # result2.get_pushed_shares() == total_shares here.  However, we're
        # faking too much for that to be meaningful here.  Also it doesn't
        # hold because we don't actually push _anything_, we just lie about
        # having pushed stuff.

    def test_previous_upload_failed(self):
        """
        Ciphertext left behind in ``CHK_encoding/`` by an earlier failed
        upload does not prevent a later upload of the same data from
        succeeding, and is cleaned up afterwards.
        """
        self.basedir = "helper/AssistedUpload/test_previous_upload_failed"
        self.setUpHelper(self.basedir)

        # we want to make sure that an upload which fails (leaving the
        # ciphertext in the CHK_encoding/ directory) does not prevent a later
        # attempt to upload that file from working. We simulate this by
        # populating the directory manually. The hardest part is guessing the
        # storage index.

        k = FakeClient.DEFAULT_ENCODING_PARAMETERS["k"]
        n = FakeClient.DEFAULT_ENCODING_PARAMETERS["n"]
        max_segsize = FakeClient.DEFAULT_ENCODING_PARAMETERS["max_segment_size"]
        segsize = min(max_segsize, len(DATA))
        # this must be a multiple of 'required_shares'==k
        segsize = mathutil.next_multiple(segsize, k)

        key = hashutil.convergence_hash(k, n, segsize, DATA, b"test convergence string")
        self.assertEqual(len(key), 16)
        encryptor = aes.create_encryptor(key)
        SI = hashutil.storage_index_hash(key)
        SI_s = str(si_b2a(SI), "utf-8")
        encfile = os.path.join(self.basedir, "CHK_encoding", SI_s)
        # The context manager closes the file even if the write fails.
        with open(encfile, "wb") as f:
            f.write(aes.encrypt_data(encryptor, DATA))

        u = make_uploader(self.helper_furl, self.s)

        d = wait_a_few_turns()

        def _ready(res):
            self._assert_has_helper(u)
            return upload_data(u, DATA, convergence=b"test convergence string")
        d.addCallback(_ready)

        def _uploaded(results):
            the_uri = results.get_uri()
            self.assertIn(b"CHK", the_uri)
        d.addCallback(_uploaded)

        def _check_empty(res):
            self._assert_no_intermediate_files()
        d.addCallback(_check_empty)

        return d

    @inline_callbacks
    def test_already_uploaded(self):
        """
        If enough shares to satisfy the needed parameter already exist, the upload
        succeeds without pushing any shares.
        """
        params = FakeClient.DEFAULT_ENCODING_PARAMETERS
        chk_checker = partial(
            FakeCHKCheckerAndUEBFetcher,
            sharemap=dictutil.DictOfSets({
                0: {b"server0"},
                1: {b"server1"},
            }),
            ueb_data={
                "size": len(DATA),
                "segment_size": min(params["max_segment_size"], len(DATA)),
                "needed_shares": params["k"],
                "total_shares": params["n"],
            },
        )
        self.basedir = "helper/AssistedUpload/test_already_uploaded"
        self.setUpHelper(
            self.basedir,
            chk_checker=chk_checker,
        )
        u = make_uploader(self.helper_furl, self.s)

        yield wait_a_few_turns()

        self._assert_has_helper(u)

        results = yield upload_data(u, DATA, convergence=b"some convergence string")
        the_uri = results.get_uri()
        self.assertIn(b"CHK", the_uri)

        self._assert_no_intermediate_files()

        self.assertEqual(
            results.get_pushed_shares(),
            0,
        )
|---|
| 387 | |
|---|
| 388 | |
|---|
class CHKCheckerAndUEBFetcherTests(SyncTestCase):
    """
    Tests for ``CHKCheckerAndUEBFetcher``.
    """
    def _assert_check_result(self, storage_index, servers, matcher):
        """
        Run ``CHKCheckerAndUEBFetcher.check`` against ``servers`` and assert
        that the resulting ``Deferred`` has already fired with a value
        accepted by ``matcher``.
        """
        peers = {storage_index: servers}
        checker = offloaded.CHKCheckerAndUEBFetcher(
            peers.get,
            storage_index,
            None,
        )
        self.assertThat(checker.check(), succeeded(matcher))

    def test_check_no_peers(self):
        """
        When the "peer getter" supplies no peers at all,
        ``CHKCheckerAndUEBFetcher.check`` returns a ``Deferred`` that fires
        with ``False``.
        """
        self._assert_check_result(b"a" * 16, [], Equals(False))

    @inline_callbacks
    def test_check_ueb_unavailable(self):
        """
        When none of the peers supplied by the "peer getter" can provide the
        UEB, ``CHKCheckerAndUEBFetcher.check`` returns a ``Deferred`` that
        fires with ``False``.
        """
        storage_index = b"a" * 16
        serverid = b"b" * 20
        backend = FoolscapStorageServer(StorageServer(self.mktemp(), serverid))
        rref = LocalWrapper(backend, fireNow)
        yield write_bad_share(rref, storage_index)
        server = NoNetworkServer(serverid, rref)
        self._assert_check_result(storage_index, [server], Equals(False))

    @inline_callbacks
    def test_not_enough_shares(self):
        """
        When fewer shares are found than are needed to reassemble the data,
        ``CHKCheckerAndUEBFetcher.check`` returns a ``Deferred`` that fires
        with ``False``.
        """
        storage_index = b"a" * 16
        serverid = b"b" * 20
        backend = FoolscapStorageServer(StorageServer(self.mktemp(), serverid))
        rref = LocalWrapper(backend, fireNow)
        ueb = {
            "needed_shares": 2,
            "total_shares": 2,
            "segment_size": 128 * 1024,
            "size": 1024,
        }
        # Two shares are needed but only share 0 is written.
        yield write_good_share(rref, storage_index, ueb, [0])

        server = NoNetworkServer(serverid, rref)
        self._assert_check_result(storage_index, [server], Equals(False))

    @inline_callbacks
    def test_enough_shares(self):
        """
        When enough shares are found to reassemble the data,
        ``CHKCheckerAndUEBFetcher.check`` returns a ``Deferred`` that fires
        with share and share placement information.
        """
        storage_index = b"a" * 16
        serverids = [ch * 20 for ch in [b"b", b"c"]]
        backends = [
            FoolscapStorageServer(StorageServer(self.mktemp(), serverid))
            for serverid in serverids
        ]
        rrefs = [LocalWrapper(backend, fireNow) for backend in backends]
        ueb = {
            "needed_shares": len(serverids),
            "total_shares": len(serverids),
            "segment_size": 128 * 1024,
            "size": 1024,
        }
        # Each server receives exactly one distinct share number.
        for sharenum, rref in enumerate(rrefs):
            yield write_good_share(rref, storage_index, ueb, [sharenum])

        servers = [
            NoNetworkServer(serverid, rref)
            for (serverid, rref) in zip(serverids, rrefs)
        ]
        expected_sharemap = {
            sharenum: {serverid}
            for (sharenum, serverid) in enumerate(serverids)
        }
        self._assert_check_result(
            storage_index,
            servers,
            MatchesListwise([
                Equals(expected_sharemap),
                Equals(ueb),
                IsInstance(bytes),
            ]),
        )
|---|
| 521 | |
|---|
| 522 | |
|---|
def write_bad_share(storage_rref, storage_index):
    """
    Write share number 0 consisting of 1024 zero bytes, so that it has no
    recoverable URI extension block.

    :return: The result of ``write_share``.
    """
    # Trash in the right bucket on this storage server: nothing in it can be
    # read back as a UEB.
    junk = b"\0" * 1024
    return write_share(storage_rref, storage_index, [0], junk)
|---|
| 530 | |
|---|
| 531 | |
|---|
def write_good_share(storage_rref, storage_index, ueb, sharenums):
    """
    Write a share that carries the given URI extension block at the offset
    a write bucket proxy would place it.

    :param storage_rref: The reference through which the share is written.
    :param storage_index: The storage index to write to.
    :param dict ueb: The URI extension data; ``"segment_size"`` and
        ``"size"`` are also used to size the share layout.
    :param sharenums: The share numbers passed on to ``write_share``.

    :return: The result of ``write_share``.
    """
    proxy = make_write_bucket_proxy(
        storage_rref,
        None,
        1024,
        ueb["segment_size"],
        1,
        1,
        ueb["size"],
    )
    # See allmydata/immutable/layout.py
    header = proxy._offset_data
    ueb_offset = proxy._offsets["uri_extension"]
    # Zero-fill everything between the offset table and the UEB.
    padding = b"\0" * (ueb_offset - len(header))
    packed_ueb = uri.pack_extension(ueb)
    length_field = pack(proxy.fieldstruct, len(packed_ueb))
    share = header + padding + length_field + packed_ueb
    return write_share(storage_rref, storage_index, sharenums, share)
|---|
| 556 | |
|---|
| 557 | |
|---|
@inline_callbacks
def write_share(storage_rref, storage_index, sharenums, sharedata):
    """
    Allocate a bucket through the given IStorageServer remote reference and
    write the given share data into it.

    :param foolscap.ipb.IRemoteReference storage_rref: A remote reference to
        an IStorageServer.

    :param bytes storage_index: The storage index to which to write the share
        data.

    :param [int] sharenums: The share numbers to allocate.  The allocation
        must yield exactly one writer, otherwise unpacking it raises
        ``ValueError``.

    :param bytes sharedata: The ciphertext to write as the share.
    """
    # The same placeholder value is used for both secret arguments.
    placeholder_secret = b"x" * 16
    _unused, writers = yield storage_rref.callRemote(
        "allocate_buckets",
        storage_index,
        placeholder_secret,
        placeholder_secret,
        sharenums,
        len(sharedata),
        LocalWrapper(None),
    )
    (writer,) = writers.values()
    yield writer.callRemote("write", 0, sharedata)
    yield writer.callRemote("close")
|---|