| 1 | """ |
|---|
| 2 | Ported to Python 3. |
|---|
| 3 | """ |
|---|
| 4 | |
|---|
| 5 | |
|---|
| 6 | from io import BytesIO |
|---|
| 7 | import attr |
|---|
| 8 | from twisted.internet import defer, reactor |
|---|
| 9 | from foolscap.api import eventually, fireEventually |
|---|
| 10 | from allmydata import client |
|---|
| 11 | from allmydata.nodemaker import NodeMaker |
|---|
| 12 | from allmydata.interfaces import SDMF_VERSION, MDMF_VERSION |
|---|
| 13 | from allmydata.util import base32 |
|---|
| 14 | from allmydata.util.hashutil import tagged_hash |
|---|
| 15 | from allmydata.storage_client import StorageFarmBroker |
|---|
| 16 | from allmydata.mutable.layout import MDMFSlotReadProxy |
|---|
| 17 | from allmydata.mutable.publish import MutableData |
|---|
| 18 | from ..common import ( |
|---|
| 19 | EMPTY_CLIENT_CONFIG, |
|---|
| 20 | ) |
|---|
| 21 | |
|---|
def bchr(s):
    """Return a single-byte bytes object for the integer *s* (0..255)."""
    return s.to_bytes(1, "big")
|---|
| 24 | |
|---|
def eventuaaaaaly(res=None):
    """
    Return a Deferred that fires with *res* only after three chained
    eventual-send turns, simulating a result that arrives "later".
    """
    d = fireEventually(res)
    for _ in range(2):
        d.addCallback(fireEventually)
    return d
|---|
| 30 | |
|---|
| 31 | # this "FakeStorage" exists to put the share data in RAM and avoid using real |
|---|
| 32 | # network connections, both to speed up the tests and to reduce the amount of |
|---|
| 33 | # non-mutable.py code being exercised. |
|---|
| 34 | |
|---|
class FakeStorage:
    # this class replaces the collection of storage servers, allowing the
    # tests to examine and manipulate the published shares. It also lets us
    # control the order in which read queries are answered, to exercise more
    # of the error-handling code in Retrieve .
    #
    # Note that we ignore the storage index: this FakeStorage instance can
    # only be used for a single storage index.


    def __init__(self):
        # maps peerid -> {shnum: share bytes} for everything written so far
        self._peers = {}
        # _sequence is used to cause the responses to occur in a specific
        # order. If it is in use, then we will defer queries instead of
        # answering them right away, accumulating the Deferreds in a dict. We
        # don't know exactly how many queries we'll get, so exactly one
        # second after the first query arrives, we will release them all (in
        # order).
        self._sequence = None
        # peerid -> list of (Deferred, shares) awaiting release
        self._pending = {}
        # IDelayedCall for the pending release, or None
        self._pending_timer = None

    def read(self, peerid, storage_index):
        """
        Return a Deferred firing with the {shnum: data} dict for *peerid*.

        When ``self._sequence`` is None the answer is delivered after a few
        eventual-send turns; otherwise it is queued and released by
        ``_fire_readers`` in the configured order.
        """
        shares = self._peers.get(peerid, {})
        if self._sequence is None:
            return eventuaaaaaly(shares)
        d = defer.Deferred()
        if not self._pending:
            # first deferred query: schedule the release of everything that
            # accumulates over the next second
            self._pending_timer = reactor.callLater(1.0, self._fire_readers)
        if peerid not in self._pending:
            self._pending[peerid] = []
        self._pending[peerid].append( (d, shares) )
        return d

    def _fire_readers(self):
        """Release all queued read answers, honoring self._sequence order."""
        self._pending_timer = None
        pending = self._pending
        self._pending = {}
        # first answer the peers named in _sequence, in that order
        for peerid in self._sequence:
            if peerid in pending:
                for (d, shares) in pending.pop(peerid):
                    eventually(d.callback, shares)
        # then answer any remaining peers in arbitrary dict order
        for peerid in pending:
            for (d, shares) in pending[peerid]:
                eventually(d.callback, shares)

    def write(self, peerid, storage_index, shnum, offset, data):
        """
        Overwrite bytes of share *shnum* for *peerid* starting at *offset*.

        Writing past the current end of the share zero-fills the gap
        (BytesIO seek-past-end semantics).
        """
        if peerid not in self._peers:
            self._peers[peerid] = {}
        shares = self._peers[peerid]
        f = BytesIO()
        f.write(shares.get(shnum, b""))
        f.seek(offset)
        f.write(data)
        shares[shnum] = f.getvalue()
|---|
| 90 | |
|---|
| 91 | |
|---|
| 92 | # This doesn't actually implement the whole interface, but adding a commented |
|---|
| 93 | # interface implementation annotation for grepping purposes. |
|---|
| 94 | #@implementer(RIStorageServer) |
|---|
class FakeStorageServer:
    """
    A fake Foolscap remote object, implemented by overriding callRemote() to
    call local methods.
    """
    def __init__(self, peerid, storage):
        self.peerid = peerid
        self.storage = storage
        # count of remote invocations, so tests can assert on query traffic.
        # NOTE(review): callRemoteOnly() increments this and then delegates
        # to callRemote(), which increments it again, so each
        # callRemoteOnly() counts as two queries -- preserved as-is.
        self.queries = 0

    def callRemote(self, methname, *args, **kwargs):
        """
        Invoke local method *methname* on a later reactor turn, returning a
        Deferred for its result (mimicking a Foolscap remote call).
        """
        self.queries += 1
        def _call():
            meth = getattr(self, methname)
            return meth(*args, **kwargs)
        d = fireEventually()
        d.addCallback(lambda res: _call())
        return d

    def callRemoteOnly(self, methname, *args, **kwargs):
        """
        Fire-and-forget variant of callRemote(): the call is made, but the
        result (or any Failure) is discarded and None is returned.
        """
        self.queries += 1
        d = self.callRemote(methname, *args, **kwargs)
        # swallow both result and errors, like foolscap's callRemoteOnly
        d.addBoth(lambda ignore: None)

    def advise_corrupt_share(self, share_type, storage_index, shnum, reason):
        # corruption advice is simply ignored by the fake server
        pass

    def slot_readv(self, storage_index, shnums, readv):
        """
        Read vectors from shares.

        Returns a Deferred firing with {shnum: [data for each
        (offset, length) in readv]}, restricted to *shnums* when that
        collection is non-empty.
        """
        d = self.storage.read(self.peerid, storage_index)
        def _read(shares):
            response = {}
            for shnum in shares:
                if shnums and shnum not in shnums:
                    continue
                vector = response[shnum] = []
                for (offset, length) in readv:
                    assert isinstance(offset, int), offset
                    assert isinstance(length, int), length
                    vector.append(shares[shnum][offset:offset+length])
            return response
        d.addCallback(_read)
        return d

    def slot_testv_and_readv_and_writev(self, storage_index, secrets,
                                        tw_vectors, read_vector):
        # always-pass: parrot the test vectors back to them.
        readv = {}
        for shnum, (testv, writev, new_length) in list(tw_vectors.items()):
            for (offset, length, op, specimen) in testv:
                assert op == b"eq"
            # TODO: this isn't right, the read is controlled by read_vector,
            # not by testv
            readv[shnum] = [ specimen
                             for (offset, length, op, specimen)
                             in testv ]
            for (offset, data) in writev:
                self.storage.write(self.peerid, storage_index, shnum,
                                   offset, data)
        answer = (True, readv)
        return fireEventually(answer)
|---|
| 156 | |
|---|
| 157 | |
|---|
def flip_bit(original, byte_offset):
    """Return a copy of *original* with the low bit of one byte inverted."""
    mutated = bytearray(original)
    mutated[byte_offset] ^= 0x01
    return bytes(mutated)
|---|
| 162 | |
|---|
def add_two(original, byte_offset):
    # It isn't enough to simply flip the bit for the version number,
    # because 1 is a valid version number. So we add two instead.
    # (XOR with 0x02 is equivalent to adding two for the verbytes 0 and 1.)
    mutated = bytearray(original)
    mutated[byte_offset] ^= 0x02
    return bytes(mutated)
|---|
| 169 | |
|---|
def corrupt(res, s, offset, shnums_to_corrupt=None, offset_offset=0):
    """
    Corrupt one byte in each selected share held by FakeStorage *s*.

    *offset* may be an integer byte offset, a key into the share's offset
    table (looked up per share), or a (key, delta) tuple; *offset_offset*
    is an additional delta applied on top.  Returns a Deferred that fires
    with *res* once every share has been modified, so this function can be
    used directly as a callback in a Deferred chain.
    """
    # if shnums_to_corrupt is None, corrupt all shares. Otherwise it is a
    # list of shnums to corrupt.
    ds = []
    for peerid in s._peers:
        shares = s._peers[peerid]
        for shnum in shares:
            if (shnums_to_corrupt is not None
                and shnum not in shnums_to_corrupt):
                continue
            data = shares[shnum]
            # We're feeding the reader all of the share data, so it
            # won't need to use the rref that we didn't provide, nor the
            # storage index that we didn't provide. We do this because
            # the reader will work for both MDMF and SDMF.
            reader = MDMFSlotReadProxy(None, None, shnum, data)
            # We need to get the offsets for the next part.
            d = reader.get_verinfo()
            def _do_corruption(verinfo, data, shnum, shares):
                # note: `offset` and `offset_offset` are closed over from
                # the enclosing scope; per-share values are passed in as
                # explicit callback arguments to avoid late binding.
                (seqnum,
                 root_hash,
                 IV,
                 segsize,
                 datalen,
                 k, n, prefix, o) = verinfo
                if isinstance(offset, tuple):
                    offset1, offset2 = offset
                else:
                    offset1 = offset
                    offset2 = 0
                if offset1 == "pubkey" and IV:
                    # a non-empty IV indicates an SDMF share, whose pubkey
                    # lives at a fixed position -- presumably 107; TODO
                    # confirm against the SDMF layout definition
                    real_offset = 107
                elif offset1 in o:
                    # symbolic offset: resolve it via the share's offset table
                    real_offset = o[offset1]
                else:
                    real_offset = offset1
                real_offset = int(real_offset) + offset2 + offset_offset
                assert isinstance(real_offset, int), offset
                if offset1 == 0: # verbyte
                    # flipping a bit could yield another valid version
                    # number, so the verbyte is changed with add_two instead
                    f = add_two
                else:
                    f = flip_bit
                shares[shnum] = f(data, real_offset)
            d.addCallback(_do_corruption, data, shnum, shares)
            ds.append(d)
    dl = defer.DeferredList(ds)
    dl.addCallback(lambda ignored: res)
    return dl
|---|
| 218 | |
|---|
@attr.s
class Peer:
    """
    A fake storage peer: its identifier, the FakeStorageServer standing in
    for the remote reference, and the announcement used to introduce it to
    a StorageFarmBroker.
    """
    # peer identifier (bytes, base32-encoded as built by make_peer())
    peerid = attr.ib()
    # the FakeStorageServer instance for this peer
    storage_server = attr.ib()
    # introducer-style announcement dict for this server
    announcement = attr.ib()
|---|
| 224 | |
|---|
def make_peer(s, i):
    """
    Create a "peer" suitable for use with ``make_storagebroker_with_peers``
    or ``make_nodemaker_with_peers``.

    :param IServer s: The server with which to associate the peers.

    :param int i: A unique identifier for this peer within the whole group
        of peers to be used. For example, a sequence number. This is used
        to generate a unique peer id.

    :rtype: ``Peer``
    """
    # derive a deterministic 20-byte peerid from the sequence number
    peerid = base32.b2a(tagged_hash(b"peerid", b"%d" % i)[:20])
    server = FakeStorageServer(peerid, s)
    furl = "pb://%s@nowhere/fake" % (str(peerid, "utf-8"),)
    announcement = {
        "anonymous-storage-FURL": furl,
        "permutation-seed-base32": peerid,
    }
    return Peer(peerid=peerid, storage_server=server, announcement=announcement)
|---|
| 245 | |
|---|
| 246 | |
|---|
def make_storagebroker(s=None, num_peers=10):
    """
    Make a ``StorageFarmBroker`` connected to some number of fake storage
    servers.

    :param IServer s: The server with which to associate the fake storage
        servers.

    :param int num_peers: The number of fake storage servers to associate
        with the broker.
    """
    storage = s if s else FakeStorage()
    peers = [make_peer(storage, peer_num) for peer_num in range(num_peers)]
    return make_storagebroker_with_peers(peers)
|---|
| 264 | |
|---|
| 265 | |
|---|
def make_storagebroker_with_peers(peers):
    """
    Make a ``StorageFarmBroker`` connected to the given storage servers.

    :param list peers: The storage servers to associate with the storage
        broker.
    """
    broker = StorageFarmBroker(True, None, EMPTY_CLIENT_CONFIG)
    for peer in peers:
        # register each fake server directly, bypassing the introducer
        broker.test_add_rref(peer.peerid, peer.storage_server, peer.announcement)
    return broker
|---|
| 281 | |
|---|
| 282 | |
|---|
def make_nodemaker(s=None, num_peers=10):
    """
    Make a ``NodeMaker`` connected to some number of fake storage servers.

    :param IServer s: The server with which to associate the fake storage
        servers.

    :param int num_peers: The number of fake storage servers to associate
        with the node maker.
    """
    return make_nodemaker_with_storage_broker(make_storagebroker(s, num_peers))
|---|
| 295 | |
|---|
| 296 | |
|---|
def make_nodemaker_with_peers(peers):
    """
    Make a ``NodeMaker`` connected to the given storage servers.

    :param list peers: The storage servers to associate with the node maker.
    """
    return make_nodemaker_with_storage_broker(make_storagebroker_with_peers(peers))
|---|
| 305 | |
|---|
| 306 | |
|---|
def make_nodemaker_with_storage_broker(storage_broker):
    """
    Make a ``NodeMaker`` using the given storage broker.

    :param StorageFarmBroker storage_broker: The storage broker to use.
    """
    # fixed secrets are fine: everything here is in-memory and test-only
    sh = client.SecretHolder(b"lease secret", b"convergence secret")
    keygen = client.KeyGenerator()
    # 3-of-10 encoding, creating SDMF mutable files by default
    nodemaker = NodeMaker(storage_broker, sh, None,
                          None, None,
                          {"k": 3, "n": 10}, SDMF_VERSION, keygen)
    return nodemaker
|---|
| 319 | |
|---|
| 320 | |
|---|
class PublishMixin:
    # Helpers that publish mutable files into a fresh FakeStorage, leaving
    # the resulting node(s), nodemaker, and storage as instance attributes
    # for later inspection and manipulation by tests.

    def publish_one(self):
        # publish a file and create shares, which can then be manipulated
        # later.
        self.CONTENTS = b"New contents go here" * 1000
        self.uploadable = MutableData(self.CONTENTS)
        self._storage = FakeStorage()
        self._nodemaker = make_nodemaker(self._storage)
        self._storage_broker = self._nodemaker.storage_broker
        d = self._nodemaker.create_mutable_file(self.uploadable)
        def _created(node):
            self._fn = node
            # a second filenode for the same cap, giving tests an
            # independent view of the same file
            self._fn2 = self._nodemaker.create_from_cap(node.get_uri())
        d.addCallback(_created)
        return d

    def publish_mdmf(self, data=None):
        # like publish_one, except that the result is guaranteed to be
        # an MDMF file.
        # self.CONTENTS should have more than one segment.
        if data is None:
            data = b"This is an MDMF file" * 100000
        self.CONTENTS = data
        self.uploadable = MutableData(self.CONTENTS)
        self._storage = FakeStorage()
        self._nodemaker = make_nodemaker(self._storage)
        self._storage_broker = self._nodemaker.storage_broker
        d = self._nodemaker.create_mutable_file(self.uploadable, version=MDMF_VERSION)
        def _created(node):
            self._fn = node
            self._fn2 = self._nodemaker.create_from_cap(node.get_uri())
        d.addCallback(_created)
        return d


    def publish_sdmf(self, data=None):
        # like publish_one, except that the result is guaranteed to be
        # an SDMF file
        if data is None:
            data = b"This is an SDMF file" * 1000
        self.CONTENTS = data
        self.uploadable = MutableData(self.CONTENTS)
        self._storage = FakeStorage()
        self._nodemaker = make_nodemaker(self._storage)
        self._storage_broker = self._nodemaker.storage_broker
        d = self._nodemaker.create_mutable_file(self.uploadable, version=SDMF_VERSION)
        def _created(node):
            self._fn = node
            self._fn2 = self._nodemaker.create_from_cap(node.get_uri())
        d.addCallback(_created)
        return d


    def publish_multiple(self, version=0):
        # publish several successive versions of one file, snapshotting the
        # shares after each upload so that tests can mix versions later via
        # _set_versions().
        self.CONTENTS = [b"Contents 0",
                         b"Contents 1",
                         b"Contents 2",
                         b"Contents 3a",
                         b"Contents 3b"]
        self.uploadables = [MutableData(d) for d in self.CONTENTS]
        self._copied_shares = {}
        self._storage = FakeStorage()
        self._nodemaker = make_nodemaker(self._storage)
        d = self._nodemaker.create_mutable_file(self.uploadables[0], version=version) # seqnum=1
        def _created(node):
            self._fn = node
            # now create multiple versions of the same file, and accumulate
            # their shares, so we can mix and match them later.
            d = defer.succeed(None)
            d.addCallback(self._copy_shares, 0)
            d.addCallback(lambda res: node.overwrite(self.uploadables[1])) #s2
            d.addCallback(self._copy_shares, 1)
            d.addCallback(lambda res: node.overwrite(self.uploadables[2])) #s3
            d.addCallback(self._copy_shares, 2)
            d.addCallback(lambda res: node.overwrite(self.uploadables[3])) #s4a
            d.addCallback(self._copy_shares, 3)
            # now we replace all the shares with version s3, and upload a new
            # version to get s4b.
            rollback = dict([(i,2) for i in range(10)])
            d.addCallback(lambda res: self._set_versions(rollback))
            d.addCallback(lambda res: node.overwrite(self.uploadables[4])) #s4b
            d.addCallback(self._copy_shares, 4)
            # we leave the storage in state 4
            return d
        d.addCallback(_created)
        return d


    def _copy_shares(self, ignored, index):
        # snapshot the current share state into self._copied_shares[index]
        shares = self._storage._peers
        # we need a deep copy
        new_shares = {}
        for peerid in shares:
            new_shares[peerid] = {}
            for shnum in shares[peerid]:
                new_shares[peerid][shnum] = shares[peerid][shnum]
        self._copied_shares[index] = new_shares

    def _set_versions(self, versionmap):
        # versionmap maps shnums to which version (0,1,2,3,4) we want the
        # share to be at. Any shnum which is left out of the map will stay at
        # its current version.
        shares = self._storage._peers
        oldshares = self._copied_shares
        for peerid in shares:
            for shnum in shares[peerid]:
                if shnum in versionmap:
                    index = versionmap[shnum]
                    shares[peerid][shnum] = oldshares[index][peerid][shnum]
|---|
| 430 | |
|---|
class CheckerMixin:
    """Assertion helpers for checker results; mixed into test cases."""

    def check_good(self, r, where):
        """Assert that checker results *r* report a healthy file."""
        self.failUnless(r.is_healthy(), where)
        return r

    def check_bad(self, r, where):
        """Assert that checker results *r* report an unhealthy file."""
        self.failIf(r.is_healthy(), where)
        return r

    def check_expected_failure(self, r, expected_exception, substring, where):
        """
        Assert that *r* recorded a share problem of *expected_exception*
        whose text contains *substring*.
        """
        for (peerid, storage_index, shnum, f) in r.get_share_problems():
            if not f.check(expected_exception):
                continue
            self.failUnless(substring in str(f),
                            "%s: substring '%s' not in '%s'" %
                            (where, substring, str(f)))
            return
        self.fail("%s: didn't see expected exception %s in problems %s" %
                  (where, expected_exception, r.get_share_problems()))
|---|