| 1 | """ |
|---|
| 2 | Ported to Python 3. |
|---|
| 3 | """ |
|---|
| 4 | |
|---|
| 5 | from io import StringIO |
|---|
| 6 | from ..common import AsyncTestCase |
|---|
| 7 | from testtools.matchers import Equals, HasLength, Contains |
|---|
| 8 | from twisted.internet import defer |
|---|
| 9 | |
|---|
| 10 | from allmydata.util import base32, consumer |
|---|
| 11 | from allmydata.interfaces import NotEnoughSharesError |
|---|
| 12 | from allmydata.monitor import Monitor |
|---|
| 13 | from allmydata.mutable.common import MODE_READ, UnrecoverableFileError |
|---|
| 14 | from allmydata.mutable.servermap import ServerMap, ServermapUpdater |
|---|
| 15 | from allmydata.mutable.retrieve import Retrieve |
|---|
| 16 | from .util import PublishMixin, make_storagebroker, corrupt |
|---|
| 17 | from .. import common_util as testutil |
|---|
| 18 | |
|---|
class Roundtrip(AsyncTestCase, testutil.ShouldFailMixin, PublishMixin):
    """Round-trip tests for mutable files.

    ``setUp`` publishes one version of a mutable file; each test then
    builds a servermap, downloads, and checks behavior when shares
    vanish or are corrupted at various offsets.
    """

    def setUp(self):
        super(Roundtrip, self).setUp()
        # publish_one() (from PublishMixin) is expected to set up
        # self._fn, self._storage, self._storage_broker and record the
        # plaintext in self.CONTENTS -- the tests below read all of these.
        return self.publish_one()

    def make_servermap(self, mode=MODE_READ, oldmap=None, sb=None):
        """Return a Deferred that fires with an updated ServerMap for self._fn.

        :param mode: a MODE_* constant (defaults to MODE_READ)
        :param oldmap: existing ServerMap to update; a fresh one is made if None
        :param sb: storage broker to query; defaults to the publish-time broker
        """
        if oldmap is None:
            oldmap = ServerMap()
        if sb is None:
            sb = self._storage_broker
        smu = ServermapUpdater(self._fn, sb, Monitor(), oldmap, mode)
        d = smu.update()
        return d

    def abbrev_verinfo(self, verinfo):
        """Abbreviate a verinfo tuple to "seqnum-roothashprefix" for display."""
        if verinfo is None:
            return None
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return "%d-%s" % (seqnum, base32.b2a(root_hash)[:4])

    def abbrev_verinfo_dict(self, verinfo_d):
        """Return a copy of verinfo_d whose verinfo keys are abbreviated."""
        output = {}
        for verinfo,value in list(verinfo_d.items()):
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            output["%d-%s" % (seqnum, base32.b2a(root_hash)[:4])] = value
        return output

    def dump_servermap(self, servermap):
        # Debugging helper: print a human-readable summary of a servermap.
        print("SERVERMAP", servermap)
        print("RECOVERABLE", [self.abbrev_verinfo(v)
                              for v in servermap.recoverable_versions()])
        print("BEST", self.abbrev_verinfo(servermap.best_recoverable_version()))
        print("available", self.abbrev_verinfo_dict(servermap.shares_available()))

    def do_download(self, servermap, version=None):
        """Retrieve `version` (default: best recoverable) via `servermap`.

        Returns a Deferred that fires with the downloaded plaintext bytes.
        """
        if version is None:
            version = servermap.best_recoverable_version()
        r = Retrieve(self._fn, self._storage_broker, servermap, version)
        c = consumer.MemoryConsumer()
        d = r.download(consumer=c)
        # MemoryConsumer collects the plaintext as a list of chunks.
        d.addCallback(lambda mc: b"".join(mc.chunks))
        return d


    def test_basic(self):
        # map, download, then re-use the same servermap (with and without
        # refreshing it), and finally force a pubkey re-fetch.
        d = self.make_servermap()
        def _do_retrieve(servermap):
            self._smap = servermap
            #self.dump_servermap(servermap)
            self.assertThat(servermap.recoverable_versions(), HasLength(1))
            return self.do_download(servermap)
        d.addCallback(_do_retrieve)
        def _retrieved(new_contents):
            self.assertThat(new_contents, Equals(self.CONTENTS))
        d.addCallback(_retrieved)
        # we should be able to re-use the same servermap, both with and
        # without updating it.
        d.addCallback(lambda res: self.do_download(self._smap))
        d.addCallback(_retrieved)
        d.addCallback(lambda res: self.make_servermap(oldmap=self._smap))
        d.addCallback(lambda res: self.do_download(self._smap))
        d.addCallback(_retrieved)
        # clobbering the pubkey should make the servermap updater re-fetch it
        def _clobber_pubkey(res):
            self._fn._pubkey = None
        d.addCallback(_clobber_pubkey)
        d.addCallback(lambda res: self.make_servermap(oldmap=self._smap))
        d.addCallback(lambda res: self.do_download(self._smap))
        d.addCallback(_retrieved)
        return d

    def test_all_shares_vanished(self):
        # delete every share after mapping; the download must fail with
        # NotEnoughSharesError rather than something more obscure.
        d = self.make_servermap()
        def _remove_shares(servermap):
            for shares in list(self._storage._peers.values()):
                shares.clear()
            d1 = self.shouldFail(NotEnoughSharesError,
                                 "test_all_shares_vanished",
                                 "ran out of servers",
                                 self.do_download, servermap)
            return d1
        d.addCallback(_remove_shares)
        return d

    def test_all_but_two_shares_vanished_updated_servermap(self):
        # tests error reporting for ticket #1742
        d = self.make_servermap()
        def _remove_shares(servermap):
            self._version = servermap.best_recoverable_version()
            # clear shares on every peer except the first two
            for shares in list(self._storage._peers.values())[2:]:
                shares.clear()
            # NOTE(review): `servermap` is passed positionally and so binds
            # to make_servermap's `mode` parameter, not `oldmap` -- this
            # looks unintentional; confirm whether `oldmap=servermap` was
            # meant before changing it, since this test pins the #1742
            # error-reporting path.
            return self.make_servermap(servermap)
        d.addCallback(_remove_shares)
        def _check(updated_servermap):
            d1 = self.shouldFail(NotEnoughSharesError,
                                 "test_all_but_two_shares_vanished_updated_servermap",
                                 "ran out of servers",
                                 self.do_download, updated_servermap, version=self._version)
            return d1
        d.addCallback(_check)
        return d

    def test_no_servers(self):
        sb2 = make_storagebroker(num_peers=0)
        # if there are no servers, then a MODE_READ servermap should come
        # back empty
        d = self.make_servermap(sb=sb2)
        def _check_servermap(servermap):
            self.assertThat(servermap.best_recoverable_version(), Equals(None))
            self.assertFalse(servermap.recoverable_versions())
            self.assertFalse(servermap.unrecoverable_versions())
            self.assertFalse(servermap.all_servers())
        d.addCallback(_check_servermap)
        return d

    def test_no_servers_download(self):
        # downloading with zero connected servers must raise
        # UnrecoverableFileError, and must not poison later downloads.
        sb2 = make_storagebroker(num_peers=0)
        self._fn._storage_broker = sb2
        d = self.shouldFail(UnrecoverableFileError,
                            "test_no_servers_download",
                            "no recoverable versions",
                            self._fn.download_best_version)
        def _restore(res):
            # a failed download that occurs while we aren't connected to
            # anybody should not prevent a subsequent download from working.
            # This isn't quite the webapi-driven test that #463 wants, but it
            # should be close enough.
            self._fn._storage_broker = self._storage_broker
            return self._fn.download_best_version()
        def _retrieved(new_contents):
            self.assertThat(new_contents, Equals(self.CONTENTS))
        d.addCallback(_restore)
        d.addCallback(_retrieved)
        return d


    def _test_corrupt_all(self, offset, substring,
                          should_succeed=False,
                          corrupt_early=True,
                          failure_checker=None,
                          fetch_privkey=False):
        """Corrupt every share at `offset` and verify the outcome.

        :param offset: byte offset or named field passed to corrupt()
        :param substring: text expected in the failure (or servermap
            problems) when the download is expected to fail; may be None
        :param should_succeed: when True, the download must still return
            self.CONTENTS despite the corruption
        :param corrupt_early: corrupt before (True) or after (False) the
            servermap update, to exercise different detection paths
        :param failure_checker: optional callback run on the failure result
        :param fetch_privkey: forwarded to download_version()
        :returns: Deferred firing with the servermap (possibly via d1)
        """
        d = defer.succeed(None)
        if corrupt_early:
            d.addCallback(corrupt, self._storage, offset)
        d.addCallback(lambda res: self.make_servermap())
        if not corrupt_early:
            d.addCallback(corrupt, self._storage, offset)
        def _do_retrieve(servermap):
            ver = servermap.best_recoverable_version()
            if ver is None and not should_succeed:
                # no recoverable versions == not succeeding. The problem
                # should be noted in the servermap's list of problems.
                if substring:
                    allproblems = [str(f) for f in servermap.get_problems()]
                    self.assertThat("".join(allproblems), Contains(substring))
                return servermap
            if should_succeed:
                d1 = self._fn.download_version(servermap, ver,
                                               fetch_privkey)
                d1.addCallback(lambda new_contents:
                               self.assertThat(new_contents, Equals(self.CONTENTS)))
            else:
                d1 = self.shouldFail(NotEnoughSharesError,
                                     "_corrupt_all(offset=%s)" % (offset,),
                                     substring,
                                     self._fn.download_version, servermap,
                                     ver,
                                     fetch_privkey)
            if failure_checker:
                d1.addCallback(failure_checker)
            d1.addCallback(lambda res: servermap)
            return d1
        d.addCallback(_do_retrieve)
        return d

    def test_corrupt_all_verbyte(self):
        # when the version byte is not 0 or 1, we hit an UnknownVersionError
        # error in unpack_share().
        d = self._test_corrupt_all(0, "UnknownVersionError")
        def _check_servermap(servermap):
            # and the dump should mention the problems
            s = StringIO()
            # dump() apparently returns the file object it wrote to --
            # TODO(review) confirm against ServerMap.dump's signature
            dump = servermap.dump(s).getvalue()
            self.assertTrue("30 PROBLEMS" in dump, msg=dump)
        d.addCallback(_check_servermap)
        return d

    def test_corrupt_all_seqnum(self):
        # a corrupt sequence number will trigger a bad signature
        return self._test_corrupt_all(1, "signature is invalid")

    def test_corrupt_all_R(self):
        # a corrupt root hash will trigger a bad signature
        return self._test_corrupt_all(9, "signature is invalid")

    def test_corrupt_all_IV(self):
        # a corrupt salt/IV will trigger a bad signature
        return self._test_corrupt_all(41, "signature is invalid")

    def test_corrupt_all_k(self):
        # a corrupt 'k' will trigger a bad signature
        return self._test_corrupt_all(57, "signature is invalid")

    def test_corrupt_all_N(self):
        # a corrupt 'N' will trigger a bad signature
        return self._test_corrupt_all(58, "signature is invalid")

    def test_corrupt_all_segsize(self):
        # a corrupt segsize will trigger a bad signature
        return self._test_corrupt_all(59, "signature is invalid")

    def test_corrupt_all_datalen(self):
        # a corrupt data length will trigger a bad signature
        return self._test_corrupt_all(67, "signature is invalid")

    def test_corrupt_all_pubkey(self):
        # a corrupt pubkey won't match the URI's fingerprint. We need to
        # remove the pubkey from the filenode, or else it won't bother trying
        # to update it.
        self._fn._pubkey = None
        return self._test_corrupt_all("pubkey",
                                      "pubkey doesn't match fingerprint")

    def test_corrupt_all_sig(self):
        # a corrupt signature is a bad one
        # the signature runs from about [543:799], depending upon the length
        # of the pubkey
        return self._test_corrupt_all("signature", "signature is invalid")

    def test_corrupt_all_share_hash_chain_number(self):
        # a corrupt share hash chain entry will show up as a bad hash. If we
        # mangle the first byte, that will look like a bad hash number,
        # causing an IndexError
        return self._test_corrupt_all("share_hash_chain", "corrupt hashes")

    def test_corrupt_all_share_hash_chain_hash(self):
        # a corrupt share hash chain entry will show up as a bad hash. If we
        # mangle a few bytes in, that will look like a bad hash.
        return self._test_corrupt_all(("share_hash_chain",4), "corrupt hashes")

    def test_corrupt_all_block_hash_tree(self):
        return self._test_corrupt_all("block_hash_tree",
                                      "block hash tree failure")

    def test_corrupt_all_block(self):
        return self._test_corrupt_all("share_data", "block hash tree failure")

    def test_corrupt_all_encprivkey(self):
        # a corrupted privkey won't even be noticed by the reader, only by a
        # writer.
        return self._test_corrupt_all("enc_privkey", None, should_succeed=True)


    def test_corrupt_all_encprivkey_late(self):
        # this should work for the same reason as above, but we corrupt
        # after the servermap update to exercise the error handling
        # code.
        # We need to remove the privkey from the node, or the retrieve
        # process won't know to update it.
        self._fn._privkey = None
        return self._test_corrupt_all("enc_privkey",
                                      None, # this shouldn't fail
                                      should_succeed=True,
                                      corrupt_early=False,
                                      fetch_privkey=True)


    # disabled until retrieve tests checkstring on each blockfetch. I didn't
    # just use a .todo because the failing-but-ignored test emits about 30kB
    # of noise.
    def OFF_test_corrupt_all_seqnum_late(self):
        # corrupting the seqnum between mapupdate and retrieve should result
        # in NotEnoughSharesError, since each share will look invalid
        def _check(res):
            f = res[0]
            self.assertThat(f.check(NotEnoughSharesError), HasLength(1))
            self.assertThat("uncoordinated write" in str(f), Equals(True))
        return self._test_corrupt_all(1, "ran out of servers",
                                      corrupt_early=False,
                                      failure_checker=_check)


    def test_corrupt_all_block_late(self):
        # corrupt the share data after the servermap update; the failure
        # result must carry NotEnoughSharesError.
        def _check(res):
            f = res[0]
            self.assertTrue(f.check(NotEnoughSharesError))
        return self._test_corrupt_all("share_data", "block hash tree failure",
                                      corrupt_early=False,
                                      failure_checker=_check)


    def test_basic_pubkey_at_end(self):
        # we corrupt the pubkey in all but the last 'k' shares, allowing the
        # download to succeed but forcing a bunch of retries first. Note that
        # this is rather pessimistic: our Retrieve process will throw away
        # the whole share if the pubkey is bad, even though the rest of the
        # share might be good.

        self._fn._pubkey = None
        k = self._fn.get_required_shares()
        N = self._fn.get_total_shares()
        d = defer.succeed(None)
        d.addCallback(corrupt, self._storage, "pubkey",
                      shnums_to_corrupt=list(range(0, N-k)))
        d.addCallback(lambda res: self.make_servermap())
        def _do_retrieve(servermap):
            self.assertTrue(servermap.get_problems())
            self.assertThat("pubkey doesn't match fingerprint"
                            in str(servermap.get_problems()[0]), Equals(True))
            ver = servermap.best_recoverable_version()
            r = Retrieve(self._fn, self._storage_broker, servermap, ver)
            c = consumer.MemoryConsumer()
            return r.download(c)
        d.addCallback(_do_retrieve)
        d.addCallback(lambda mc: b"".join(mc.chunks))
        d.addCallback(lambda new_contents:
                      self.assertThat(new_contents, Equals(self.CONTENTS)))
        return d


    def _test_corrupt_some(self, offset, mdmf=False):
        """Corrupt shares 0-4 at `offset`; download must still succeed.

        :param mdmf: when True, republish as MDMF first (setUp published SDMF)
        """
        if mdmf:
            d = self.publish_mdmf()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ignored:
            corrupt(None, self._storage, offset, list(range(5))))
        d.addCallback(lambda ignored:
            self.make_servermap())
        def _do_retrieve(servermap):
            ver = servermap.best_recoverable_version()
            self.assertTrue(ver)
            return self._fn.download_best_version()
        d.addCallback(_do_retrieve)
        d.addCallback(lambda new_contents:
                      self.assertThat(new_contents, Equals(self.CONTENTS)))
        return d


    def test_corrupt_some(self):
        # corrupt the data of first five shares (so the servermap thinks
        # they're good but retrieve marks them as bad), so that the
        # MODE_READ set of 6 will be insufficient, forcing node.download to
        # retry with more servers.
        return self._test_corrupt_some("share_data")


    def test_download_fails(self):
        # with every signature corrupted, nothing is recoverable at all.
        d = corrupt(None, self._storage, "signature")
        d.addCallback(lambda ignored:
            self.shouldFail(UnrecoverableFileError, "test_download_anyway",
                            "no recoverable versions",
                            self._fn.download_best_version))
        return d



    def test_corrupt_mdmf_block_hash_tree(self):
        d = self.publish_mdmf()
        d.addCallback(lambda ignored:
            self._test_corrupt_all(("block_hash_tree", 12 * 32),
                                   "block hash tree failure",
                                   corrupt_early=True,
                                   should_succeed=False))
        return d


    def test_corrupt_mdmf_block_hash_tree_late(self):
        # Note - there is no SDMF counterpart to this test, as the SDMF
        # files are guaranteed to have exactly one block, and therefore
        # the block hash tree fits within the initial read (#1240).
        d = self.publish_mdmf()
        d.addCallback(lambda ignored:
            self._test_corrupt_all(("block_hash_tree", 12 * 32),
                                   "block hash tree failure",
                                   corrupt_early=False,
                                   should_succeed=False))
        return d


    def test_corrupt_mdmf_share_data(self):
        d = self.publish_mdmf()
        d.addCallback(lambda ignored:
            # TODO: Find out what the block size is and corrupt a
            # specific block, rather than just guessing.
            self._test_corrupt_all(("share_data", 12 * 40),
                                   "block hash tree failure",
                                   corrupt_early=True,
                                   should_succeed=False))
        return d


    def test_corrupt_some_mdmf(self):
        return self._test_corrupt_some(("share_data", 12 * 40),
                                       mdmf=True)
|---|