"""
Ported to Python 3.
"""

import os, time
from io import BytesIO
from itertools import count
from zope.interface import implementer
from twisted.internet import defer
from twisted.python import failure

from allmydata.crypto import aes
from allmydata.crypto import rsa
from allmydata.interfaces import IPublishStatus, SDMF_VERSION, MDMF_VERSION, \
                                 IMutableUploadable
from allmydata.util import base32, hashutil, mathutil, log
from allmydata.util.dictutil import DictOfSets
from allmydata.util.deferredutil import async_to_deferred
from allmydata.util.cputhreadpool import defer_to_thread
from allmydata import hashtree, codec
from allmydata.storage.server import si_b2a
from foolscap.api import eventually, fireEventually
from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, MODE_REPAIR, \
     UncoordinatedWriteError, NotEnoughServersError
from allmydata.mutable.servermap import ServerMap
from allmydata.mutable.layout import get_version_from_checkstring, \
                                     unpack_mdmf_checkstring, \
                                     unpack_sdmf_checkstring, \
                                     MDMFSlotWriteProxy, \
                                     SDMFSlotWriteProxy

from eliot import (
    Message,
    start_action,
)

KiB = 1024
DEFAULT_MUTABLE_MAX_SEGMENT_SIZE = 128 * KiB
PUSHING_BLOCKS_STATE = 0
PUSHING_EVERYTHING_ELSE_STATE = 1
DONE_STATE = 2
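
# These three constants are the states of the publish state machine driven by
# Publish._push(): segments are encrypted, encoded, and uploaded one at a time
# in PUSHING_BLOCKS_STATE, then the encrypted private key, hash trees, root
# hash, and signature are written in PUSHING_EVERYTHING_ELSE_STATE, and the
# process finishes (firing done_deferred) in DONE_STATE.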

@implementer(IPublishStatus)
class PublishStatus:
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["send_per_server"] = {}
        self.timings["encrypt"] = 0.0
        self.timings["encode"] = 0.0
        self.servermap = None
        self._problems = {}
        self.active = True
        self.storage_index = None
        self.helper = False
        self.encoding = ("?", "?")
        self.size = None
        self.status = "Not started"
        self.progress = 0.0
        self.counter = next(self.statusid_counter)
        self.started = time.time()

    def add_per_server_time(self, server, elapsed):
        if server not in self.timings["send_per_server"]:
            self.timings["send_per_server"][server] = []
        self.timings["send_per_server"][server].append(elapsed)
    def accumulate_encode_time(self, elapsed):
        self.timings["encode"] += elapsed
    def accumulate_encrypt_time(self, elapsed):
        self.timings["encrypt"] += elapsed

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_encoding(self):
        return self.encoding
    def using_helper(self):
        return self.helper
    def get_servermap(self):
        return self.servermap
    def get_size(self):
        return self.size
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter
    def get_problems(self):
        return self._problems

    def set_storage_index(self, si):
        self.storage_index = si
    def set_helper(self, helper):
        self.helper = helper
    def set_servermap(self, servermap):
        self.servermap = servermap
    def set_encoding(self, k, n):
        self.encoding = (k, n)
    def set_size(self, size):
        self.size = size
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value

class LoopLimitExceededError(Exception):
    pass

class Publish:
    """I represent a single act of publishing the mutable file to the grid. I
    will only publish my data if the servermap I am using still represents
    the current state of the world.

    To make the initial publish, set servermap to None.
    """

    def __init__(self, filenode, storage_broker, servermap):
        self._node = filenode
        self._storage_broker = storage_broker
        self._servermap = servermap
        self._storage_index = self._node.get_storage_index()
        self._log_prefix = prefix = si_b2a(self._storage_index)[:5]
        num = self.log("Publish(%r): starting" % prefix, parent=None)
        self._log_number = num
        self._running = True
        self._first_write_error = None
        self._last_failure = None

        self._status = PublishStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_helper(False)
        self._status.set_progress(0.0)
        self._status.set_active(True)
        self._version = self._node.get_version()
        assert self._version in (SDMF_VERSION, MDMF_VERSION)


    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if 'parent' not in kwargs:
            kwargs['parent'] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.publish"
        return log.msg(*args, **kwargs)


    def update(self, data, offset, blockhashes, version):
        """
        I replace the contents of this file with the contents of data,
        starting at offset. I return a Deferred that fires with None
        when the replacement has been completed, or with an error if
        something went wrong during the process.

        Note that this process will not upload new shares. If the file
        being updated is in need of repair, callers will have to repair
        it on their own.
        """
        # How this works:
        # 1: Make server assignments. We'll assign each share that we know
        #    about on the grid to the server that currently holds that
        #    share, and will not place any new shares.
        # 2: Setup encoding parameters. Most of these will stay the same
        #    -- datalength will change, as will some of the offsets.
        # 3. Upload the new segments.
        # 4. Be done.
        assert IMutableUploadable.providedBy(data)

        self.data = data

        # XXX: Use the MutableFileVersion instead.
        self.datalength = self._node.get_size()
        if data.get_size() > self.datalength:
            self.datalength = data.get_size()

        self.log("starting update")
        self.log("adding new data of length %d at offset %d" % \
                 (data.get_size(), offset))
        self.log("new data length is %d" % self.datalength)
        self._status.set_size(self.datalength)
        self._status.set_status("Started")
        self._started = time.time()

        self.done_deferred = defer.Deferred()

        self._writekey = self._node.get_writekey()
        assert self._writekey, "need write capability to publish"

        # first, which servers will we publish to? We require that the
        # servermap was updated in MODE_WRITE, so we can depend upon the
        # serverlist computed by that process instead of computing our own.
        assert self._servermap
        assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK, MODE_REPAIR)
        # we will push a version that is one larger than anything present
        # in the grid, according to the servermap.
        self._new_seqnum = self._servermap.highest_seqnum() + 1
        self._status.set_servermap(self._servermap)

        self.log(format="new seqnum will be %(seqnum)d",
                 seqnum=self._new_seqnum, level=log.NOISY)

        # We're updating an existing file, so all of the following
        # should be available.
        self.readkey = self._node.get_readkey()
        self.required_shares = self._node.get_required_shares()
        assert self.required_shares is not None
        self.total_shares = self._node.get_total_shares()
        assert self.total_shares is not None
        self._status.set_encoding(self.required_shares, self.total_shares)

        self._pubkey = self._node.get_pubkey()
        assert self._pubkey
        self._privkey = self._node.get_privkey()
        assert self._privkey
        self._encprivkey = self._node.get_encprivkey()

        sb = self._storage_broker
        full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
        self.full_serverlist = full_serverlist # for use later, immutable
        self.bad_servers = set() # servers who have errbacked/refused requests

        # This will set self.segment_size, self.num_segments, and
        # self.fec. TODO: Does it know how to do the offset? Probably
        # not. So do that part next.
        self.setup_encoding_parameters(offset=offset)

        # if we experience any surprises (writes which were rejected because
        # our test vector did not match, or shares which we didn't expect to
        # see), we set this flag and report an UncoordinatedWriteError at the
        # end of the publish process.
        self.surprised = False

        # we keep track of three tables. The first is our goal: which share
        # we want to see on which servers. This is initially populated by the
        # existing servermap.
        self.goal = set() # pairs of (server, shnum) tuples

        # the second is the number of outstanding queries: those that are in
        # flight and may or may not be delivered, accepted, or acknowledged.
        # This is incremented when a query is sent, and decremented when the
        # response returns or errbacks.
        self.num_outstanding = 0

        # the third is a table of successes: shares which have actually been
        # placed. These are populated when responses come back with success.
        # When self.placed == self.goal, we're done.
        self.placed = set() # (server, shnum) tuples

        self.bad_share_checkstrings = {}

        # This is set at the last step of the publishing process.
        self.versioninfo = ""

        # we use the servermap to populate the initial goal: this way we will
        # try to update each existing share in place. Since we're
        # updating, we ignore damaged and missing shares -- callers must
        # do a repair to recreate these.
        self.goal = set(self._servermap.get_known_shares())

        # shnum -> set of IMutableSlotWriter
        self.writers = DictOfSets()

        # SDMF files are updated differently.
        self._version = MDMF_VERSION
        writer_class = MDMFSlotWriteProxy

        # For each (server, shnum) in self.goal, we make a
        # write proxy for that server. We'll use this to write
        # shares to the server.
        for (server,shnum) in self.goal:
            write_enabler = self._node.get_write_enabler(server)
            renew_secret = self._node.get_renewal_secret(server)
            cancel_secret = self._node.get_cancel_secret(server)
            secrets = (write_enabler, renew_secret, cancel_secret)

            writer = writer_class(shnum,
                                  server.get_storage_server(),
                                  self._storage_index,
                                  secrets,
                                  self._new_seqnum,
                                  self.required_shares,
                                  self.total_shares,
                                  self.segment_size,
                                  self.datalength)

            self.writers.add(shnum, writer)
            writer.server = server
            known_shares = self._servermap.get_known_shares()
            assert (server, shnum) in known_shares
            old_versionid, old_timestamp = known_shares[(server,shnum)]
            (old_seqnum, old_root_hash, old_salt, old_segsize,
             old_datalength, old_k, old_N, old_prefix,
             old_offsets_tuple) = old_versionid
            writer.set_checkstring(old_seqnum,
                                   old_root_hash,
                                   old_salt)

        # Our remote shares will not have a complete checkstring until
        # after we are done writing share data and have started to write
        # blocks. In the meantime, we need to know what to look for when
        # writing, so that we can detect UncoordinatedWriteErrors.
        self._checkstring = self._get_some_writer().get_checkstring()

        # Now, we start pushing shares.
        self._status.timings["setup"] = time.time() - self._started
        # First, we encrypt, encode, and publish the shares that we need
        # to encrypt, encode, and publish.

        # Our update process fetched these for us. We need to update
        # them in place as publishing happens.
        self.blockhashes = {} # (shnum, [blockhashes])
        for (i, bht) in list(blockhashes.items()):
            # We need to extract the leaves from our old hash tree.
            old_segcount = mathutil.div_ceil(version[4],
                                             version[3])
            h = hashtree.IncompleteHashTree(old_segcount)
            bht = dict(enumerate(bht))
            h.set_hashes(bht)
            leaves = h[h.get_leaf_index(0):]
            for j in range(self.num_segments - len(leaves)):
                leaves.append(None)

            assert len(leaves) >= self.num_segments
            self.blockhashes[i] = leaves
            # This list will now be the leaves that were set during the
            # initial upload + enough empty hashes to make it a
            # power-of-two. If we exceed a power of two boundary, we
            # should be encoding the file over again, and should not be
            # here. So, we have
            #assert len(self.blockhashes[i]) == \
            #    hashtree.roundup_pow2(self.num_segments), \
            #        len(self.blockhashes[i])
            # XXX: Except this doesn't work. Figure out why.

        # These are filled in later, after we've modified the block hash
        # tree suitably.
        self.sharehash_leaves = None # eventually [sharehashes]
        self.sharehashes = {} # shnum -> [sharehash leaves necessary to
                              #  validate the share]

        self.log("Starting push")

        self._state = PUSHING_BLOCKS_STATE
        self._push()

        return self.done_deferred

    def publish(self, newdata):
        """Publish the filenode's current contents. Returns a Deferred that
        fires (with None) when the publish has done as much work as it's ever
        going to do, or errbacks with ConsistencyError if it detects a
        simultaneous write.
        """

        # 0. Setup encoding parameters, encoder, and other such things.
        # 1. Encrypt, encode, and publish segments.
        assert IMutableUploadable.providedBy(newdata)

        self.data = newdata
        self.datalength = newdata.get_size()
        #if self.datalength >= DEFAULT_MUTABLE_MAX_SEGMENT_SIZE:
        #    self._version = MDMF_VERSION
        #else:
        #    self._version = SDMF_VERSION

        self.log("starting publish, datalen is %s" % self.datalength)
        self._status.set_size(self.datalength)
        self._status.set_status("Started")
        self._started = time.time()

        self.done_deferred = defer.Deferred()

        self._writekey = self._node.get_writekey()
        assert self._writekey, "need write capability to publish"

        # first, which servers will we publish to? We require that the
        # servermap was updated in MODE_WRITE, so we can depend upon the
        # serverlist computed by that process instead of computing our own.
        if self._servermap:
            assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK, MODE_REPAIR)
            # we will push a version that is one larger than anything present
            # in the grid, according to the servermap.
            self._new_seqnum = self._servermap.highest_seqnum() + 1
        else:
            # If we don't have a servermap, that's because we're doing the
            # initial publish
            self._new_seqnum = 1
            self._servermap = ServerMap()
        self._status.set_servermap(self._servermap)

        self.log(format="new seqnum will be %(seqnum)d",
                 seqnum=self._new_seqnum, level=log.NOISY)

        # having an up-to-date servermap (or using a filenode that was just
        # created for the first time) also guarantees that the following
        # fields are available
        self.readkey = self._node.get_readkey()
        self.required_shares = self._node.get_required_shares()
        assert self.required_shares is not None
        self.total_shares = self._node.get_total_shares()
        assert self.total_shares is not None
        self._status.set_encoding(self.required_shares, self.total_shares)

        self._pubkey = self._node.get_pubkey()
        assert self._pubkey
        self._privkey = self._node.get_privkey()
        assert self._privkey
        self._encprivkey = self._node.get_encprivkey()

        sb = self._storage_broker
        full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
        self.full_serverlist = full_serverlist # for use later, immutable
        self.bad_servers = set() # servers who have errbacked/refused requests

        # This will set self.segment_size, self.num_segments, and
        # self.fec.
        self.setup_encoding_parameters()

        # if we experience any surprises (writes which were rejected because
        # our test vector did not match, or shares which we didn't expect to
        # see), we set this flag and report an UncoordinatedWriteError at the
        # end of the publish process.
        self.surprised = False

        # we keep track of three tables. The first is our goal: which share
        # we want to see on which servers. This is initially populated by the
        # existing servermap.
        self.goal = set() # pairs of (server, shnum) tuples

        # the second is the number of outstanding queries: those that are in
        # flight and may or may not be delivered, accepted, or acknowledged.
        # This is incremented when a query is sent, and decremented when the
        # response returns or errbacks.
        self.num_outstanding = 0

        # the third is a table of successes: shares which have actually been
        # placed. These are populated when responses come back with success.
        # When self.placed == self.goal, we're done.
        self.placed = set() # (server, shnum) tuples

        self.bad_share_checkstrings = {}

        # This is set at the last step of the publishing process.
        self.versioninfo = ""

        # we use the servermap to populate the initial goal: this way we will
        # try to update each existing share in place.
        self.goal = set(self._servermap.get_known_shares())

        # then we add in all the shares that were bad (corrupted, bad
        # signatures, etc). We want to replace these.
        for key, old_checkstring in list(self._servermap.get_bad_shares().items()):
            (server, shnum) = key
            self.goal.add( (server,shnum) )
            self.bad_share_checkstrings[(server,shnum)] = old_checkstring

        # TODO: Make this part do server selection.
        self.update_goal()

        # shnum -> set of IMutableSlotWriter
        self.writers = DictOfSets()

        if self._version == MDMF_VERSION:
            writer_class = MDMFSlotWriteProxy
        else:
            writer_class = SDMFSlotWriteProxy

        # For each (server, shnum) in self.goal, we make a
        # write proxy for that server. We'll use this to write
        # shares to the server.
        for (server,shnum) in self.goal:
            write_enabler = self._node.get_write_enabler(server)
            renew_secret = self._node.get_renewal_secret(server)
            cancel_secret = self._node.get_cancel_secret(server)
            secrets = (write_enabler, renew_secret, cancel_secret)

            writer = writer_class(shnum,
                                  server.get_storage_server(),
                                  self._storage_index,
                                  secrets,
                                  self._new_seqnum,
                                  self.required_shares,
                                  self.total_shares,
                                  self.segment_size,
                                  self.datalength)
            self.writers.add(shnum, writer)
            writer.server = server
            known_shares = self._servermap.get_known_shares()
            if (server, shnum) in known_shares:
                old_versionid, old_timestamp = known_shares[(server,shnum)]
                (old_seqnum, old_root_hash, old_salt, old_segsize,
                 old_datalength, old_k, old_N, old_prefix,
                 old_offsets_tuple) = old_versionid
                writer.set_checkstring(old_seqnum,
                                       old_root_hash,
                                       old_salt)
            elif (server, shnum) in self.bad_share_checkstrings:
                old_checkstring = self.bad_share_checkstrings[(server, shnum)]
                writer.set_checkstring(old_checkstring)

        # Our remote shares will not have a complete checkstring until
        # after we are done writing share data and have started to write
        # blocks. In the meantime, we need to know what to look for when
        # writing, so that we can detect UncoordinatedWriteErrors.
        self._checkstring = self._get_some_writer().get_checkstring()

        # Now, we start pushing shares.
        self._status.timings["setup"] = time.time() - self._started
        # First, we encrypt, encode, and publish the shares that we need
        # to encrypt, encode, and publish.

        # This will eventually hold the block hash chain for each share
        # that we publish. We define it this way so that empty publishes
        # will still have something to write to the remote slot.
        self.blockhashes = dict([(i, []) for i in range(self.total_shares)])
        for i in range(self.total_shares):
            blocks = self.blockhashes[i]
            for j in range(self.num_segments):
                blocks.append(None)
        self.sharehash_leaves = None # eventually [sharehashes]
        self.sharehashes = {} # shnum -> [sharehash leaves necessary to
                              #  validate the share]

        self.log("Starting push")

        self._state = PUSHING_BLOCKS_STATE
        self._push()

        return self.done_deferred

    def _get_some_writer(self):
        return list(list(self.writers.values())[0])[0]

    def _update_status(self):
        self._status.set_status("Sending Shares: %d placed out of %d, "
                                "%d messages outstanding" %
                                (len(self.placed),
                                 len(self.goal),
                                 self.num_outstanding))
        self._status.set_progress(1.0 * len(self.placed) / len(self.goal))

    def setup_encoding_parameters(self, offset=0):
        if self._version == MDMF_VERSION:
            segment_size = DEFAULT_MUTABLE_MAX_SEGMENT_SIZE # 128 KiB by default
        else:
            segment_size = self.datalength # SDMF is only one segment
        # this must be a multiple of self.required_shares
        segment_size = mathutil.next_multiple(segment_size,
                                              self.required_shares)
        self.segment_size = segment_size

        # Calculate the starting segment for the upload.
        if segment_size:
            # We use div_ceil instead of integer division here because
            # it is semantically correct.
            # If datalength isn't an even multiple of segment_size, but
            # is larger than segment_size, datalength // segment_size
            # will be the largest number such that num <= datalength and
            # num % segment_size == 0. But that's not what we want,
            # because it ignores the extra data. div_ceil will give us
            # the right number of segments for the data that we're
            # given.
            self.num_segments = mathutil.div_ceil(self.datalength,
                                                  segment_size)

            self.starting_segment = offset // segment_size

        else:
            self.num_segments = 0
            self.starting_segment = 0


        self.log("building encoding parameters for file")
        self.log("got segsize %d" % self.segment_size)
        self.log("got %d segments" % self.num_segments)

        if self._version == SDMF_VERSION:
            assert self.num_segments in (0, 1) # SDMF
        # calculate the tail segment size.

        if segment_size and self.datalength:
            self.tail_segment_size = self.datalength % segment_size
            self.log("got tail segment size %d" % self.tail_segment_size)
        else:
            self.tail_segment_size = 0

        if self.tail_segment_size == 0 and segment_size:
            # The tail segment is the same size as the other segments.
            self.tail_segment_size = segment_size

        # Make FEC encoders
        fec = codec.CRSEncoder()
        fec.set_params(self.segment_size,
                       self.required_shares, self.total_shares)
        self.piece_size = fec.get_block_size()
        self.fec = fec

        if self.tail_segment_size == self.segment_size:
            self.tail_fec = self.fec
        else:
            tail_fec = codec.CRSEncoder()
            tail_fec.set_params(self.tail_segment_size,
                                self.required_shares,
                                self.total_shares)
            self.tail_fec = tail_fec

        self._current_segment = self.starting_segment
        self.end_segment = self.num_segments - 1
        # Now figure out where the last segment should be.
        if self.data.get_size() != self.datalength:
            # We're updating a few segments in the middle of a mutable
            # file, so we don't want to republish the whole thing.
            # (we don't have enough data to do that even if we wanted
            # to)
            end = self.data.get_size()
            self.end_segment = end // segment_size
            if end % segment_size == 0:
                self.end_segment -= 1

        self.log("got start segment %d" % self.starting_segment)
        self.log("got end segment %d" % self.end_segment)

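    # Worked example of the arithmetic above (hypothetical numbers, not taken
    # from any real upload): with required_shares k=3, an MDMF file of
    # datalength 300000 bytes gets segment_size = next_multiple(131072, 3)
    # = 131073, num_segments = div_ceil(300000, 131073) = 3, and
    # tail_segment_size = 300000 % 131073 = 37854. An update() starting at
    # offset 200000 would use starting_segment = 200000 // 131073 = 1.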

    def _push(self, ignored=None):
        """
        I manage state transitions. In particular, I see that we still
        have a good enough number of writers to complete the upload
        successfully.
        """
        # Can we still successfully publish this file?
        # TODO: Keep track of outstanding queries before aborting the
        # process.
        num_shnums = len(self.writers)
        if num_shnums < self.required_shares or self.surprised:
            return self._failure()

        # Figure out what we need to do next. Each of these needs to
        # return a deferred so that we don't block execution when this
        # is first called in the upload method.
        if self._state == PUSHING_BLOCKS_STATE:
            return self.push_segment(self._current_segment)

        elif self._state == PUSHING_EVERYTHING_ELSE_STATE:
            return self.push_everything_else()

        # If we make it to this point, we were successful in placing the
        # file.
        return self._done()


    def push_segment(self, segnum):
        if self.num_segments == 0 and self._version == SDMF_VERSION:
            self._add_dummy_salts()

        if segnum > self.end_segment:
            # We don't have any more segments to push.
            self._state = PUSHING_EVERYTHING_ELSE_STATE
            return self._push()

        d = self._encode_segment(segnum)
        d.addCallback(self._push_segment, segnum)
        def _increment_segnum(ign):
            self._current_segment += 1
        # XXX: I don't think we need to do addBoth here -- any errBacks
        # should be handled within push_segment.
        d.addCallback(_increment_segnum)
        d.addCallback(self._turn_barrier)
        d.addCallback(self._push)
        d.addErrback(self._failure)


    def _turn_barrier(self, result):
        """
        I help the publish process avoid the recursion limit issues
        described in #237.
        """
        return fireEventually(result)


    def _add_dummy_salts(self):
        """
        SDMF files need a salt even if they're empty, or the signature
        won't make sense. This method adds a dummy salt to each of our
        SDMF writers so that they can write the signature later.
        """
        salt = os.urandom(16)
        assert self._version == SDMF_VERSION

        for shnum, writers in self.writers.items():
            for writer in writers:
                writer.put_salt(salt)


    @async_to_deferred
    async def _encode_segment(self, segnum):
        """
        I encrypt and encode the segment segnum.
        """
        started = time.time()

        if segnum + 1 == self.num_segments:
            segsize = self.tail_segment_size
        else:
            segsize = self.segment_size


        self.log("Pushing segment %d of %d" % (segnum + 1, self.num_segments))
        data = self.data.read(segsize)
        if not isinstance(data, bytes):
            # XXX: Why does this return a list?
            data = b"".join(data)

        assert len(data) == segsize, len(data)

        self._status.set_status("Encrypting")

        def encrypt(readkey):
            salt = os.urandom(16)
            key = hashutil.ssk_readkey_data_hash(salt, readkey)
            encryptor = aes.create_encryptor(key)
            crypttext = aes.encrypt_data(encryptor, data)
            assert len(crypttext) == len(data)
            return salt, crypttext

        salt, crypttext = await defer_to_thread(encrypt, self.readkey)

        now = time.time()
        self._status.accumulate_encrypt_time(now - started)
        started = now

        # now apply FEC
        if segnum + 1 == self.num_segments:
            fec = self.tail_fec
        else:
            fec = self.fec

        self._status.set_status("Encoding")
        crypttext_pieces = [None] * self.required_shares
        piece_size = fec.get_block_size()
        for i in range(len(crypttext_pieces)):
            offset = i * piece_size
            piece = crypttext[offset:offset+piece_size]
            piece = piece + b"\x00"*(piece_size - len(piece)) # padding
            crypttext_pieces[i] = piece
            assert len(piece) == piece_size

        res = await fec.encode(crypttext_pieces)
        elapsed = time.time() - started
        self._status.accumulate_encode_time(elapsed)
        return (res, salt)

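    # A note on the padding loop in _encode_segment() above: each segment's
    # crypttext is cut into required_shares pieces of piece_size bytes, where
    # piece_size comes from fec.get_block_size() (roughly segsize divided by
    # required_shares, rounded up). A short final piece -- possible for the
    # tail segment, whose size need not be a multiple of required_shares --
    # is zero-padded up to piece_size before being handed to fec.encode().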
    @async_to_deferred
    async def _push_segment(self, encoded_and_salt, segnum):
        """
        I push (data, salt) as segment number segnum.
        """
        results, salt = encoded_and_salt
        shares, shareids = results
        self._status.set_status("Pushing segment")
        for i in range(len(shares)):
            sharedata = shares[i]
            shareid = shareids[i]
            if self._version == MDMF_VERSION:
                hashed = salt + sharedata
            else:
                hashed = sharedata
            block_hash = await defer_to_thread(hashutil.block_hash, hashed)
            self.blockhashes[shareid][segnum] = block_hash
            # find the writer for this share
            writers = self.writers[shareid]
            for writer in writers:
                writer.put_block(sharedata, segnum, salt)


    def push_everything_else(self):
        """
        I put everything else associated with a share.
        """
        self._pack_started = time.time()
        self.push_encprivkey()
        self.push_blockhashes()
        self.push_sharehashes()
        self.push_toplevel_hashes_and_signature()
        d = self.finish_publishing()
        def _change_state(ignored):
            self._state = DONE_STATE
        d.addCallback(_change_state)
        d.addCallback(self._push)
        return d


    def push_encprivkey(self):
        encprivkey = self._encprivkey
        self._status.set_status("Pushing encrypted private key")
        for shnum, writers in self.writers.items():
            for writer in writers:
                writer.put_encprivkey(encprivkey)


    def push_blockhashes(self):
        self.sharehash_leaves = [None] * len(self.blockhashes)
        self._status.set_status("Building and pushing block hash tree")
        for shnum, blockhashes in list(self.blockhashes.items()):
            t = hashtree.HashTree(blockhashes)
            self.blockhashes[shnum] = list(t)
            # set the leaf for future use.
            self.sharehash_leaves[shnum] = t[0]

            writers = self.writers[shnum]
            for writer in writers:
                writer.put_blockhashes(self.blockhashes[shnum])

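    # The hash structure being assembled here: push_blockhashes() (above)
    # builds one block hash tree per share and records each tree's root as
    # that share's leaf in self.sharehash_leaves; push_sharehashes() (below)
    # builds the share hash tree over those leaves, hands each writer the
    # chain of hashes needed to validate its share, and stores the tree's
    # root as self.root_hash, which is then written to every share and
    # covered by the signature made in _make_and_place_signature().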

    def push_sharehashes(self):
        self._status.set_status("Building and pushing share hash chain")
        share_hash_tree = hashtree.HashTree(self.sharehash_leaves)
        for shnum in range(len(self.sharehash_leaves)):
            needed_indices = share_hash_tree.needed_hashes(shnum)
            self.sharehashes[shnum] = dict( [ (i, share_hash_tree[i])
                                              for i in needed_indices] )
            writers = self.writers[shnum]
            for writer in writers:
                writer.put_sharehashes(self.sharehashes[shnum])
        self.root_hash = share_hash_tree[0]


    def push_toplevel_hashes_and_signature(self):
        # We need to do three things here:
        #   - Push the root hash and salt hash
        #   - Get the checkstring of the resulting layout; sign that.
        #   - Push the signature
        self._status.set_status("Pushing root hashes and signature")
        for shnum in range(self.total_shares):
            writers = self.writers[shnum]
            for writer in writers:
                writer.put_root_hash(self.root_hash)
        self._update_checkstring()
        self._make_and_place_signature()


    def _update_checkstring(self):
        """
        After putting the root hash, MDMF files will have the
        checkstring written to the storage server. This means that we
        can update our copy of the checkstring so we can detect
        uncoordinated writes. SDMF files will have the same checkstring,
        so we need not do anything.
        """
        self._checkstring = self._get_some_writer().get_checkstring()


    def _make_and_place_signature(self):
        """
        I create and place the signature.
        """
        started = time.time()
        self._status.set_status("Signing prefix")
        signable = self._get_some_writer().get_signable()
        self.signature = rsa.sign_data(self._privkey, signable)

        for (shnum, writers) in self.writers.items():
            for writer in writers:
                writer.put_signature(self.signature)
        self._status.timings['sign'] = time.time() - started


    def finish_publishing(self):
        # We're almost done -- we just need to put the verification key
        # and the offsets
        started = time.time()
        self._status.set_status("Pushing shares")
        self._started_pushing = started
        ds = []
        verification_key = rsa.der_string_from_verifying_key(self._pubkey)

        for (shnum, writers) in list(self.writers.copy().items()):
            for writer in writers:
                writer.put_verification_key(verification_key)
                self.num_outstanding += 1
                def _no_longer_outstanding(res):
                    self.num_outstanding -= 1
                    return res

                d = writer.finish_publishing()
                d.addBoth(_no_longer_outstanding)
                d.addErrback(self._connection_problem, writer)
                d.addCallback(self._got_write_answer, writer, started)
                ds.append(d)
        self._record_verinfo()
        self._status.timings['pack'] = time.time() - started
        return defer.DeferredList(ds)


    def _record_verinfo(self):
        self.versioninfo = self._get_some_writer().get_verinfo()


    def _connection_problem(self, f, writer):
        """
        We ran into a connection problem while working with writer, and
        need to deal with that.
        """
        self.log("found problem: %s" % str(f))
        self._last_failure = f
        self.writers.discard(writer.shnum, writer)


    def log_goal(self, goal, message=""):
        logmsg = [message]
        for (shnum, server) in sorted([(s,p) for (p,s) in goal], key=lambda t: (id(t[0]), id(t[1]))):
            logmsg.append("sh%d to [%r]" % (shnum, server.get_name()))
        self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
        self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
                 level=log.NOISY)

    def update_goal(self):
        self.log_goal(self.goal, "before update: ")

        # first, remove any bad servers from our goal
        self.goal = set([ (server, shnum)
                          for (server, shnum) in self.goal
                          if server not in self.bad_servers ])

        # find the homeless shares:
        homefull_shares = set([shnum for (server, shnum) in self.goal])
        homeless_shares = set(range(self.total_shares)) - homefull_shares
        homeless_shares = sorted(list(homeless_shares))
        # place them somewhere. We prefer unused servers at the beginning of
        # the available server list.

        if not homeless_shares:
            return

        # if an old share X is on a node, put the new share X there too.
        # TODO: 1: redistribute shares to achieve one-per-server, by copying
        #       shares from existing servers to new (less-crowded) ones. The
        #       old shares must still be updated.
        # TODO: 2: move those shares instead of copying them, to reduce future
        #       update work

        # this is a bit CPU intensive but easy to analyze. We create a sort
        # order for each server. If the server is marked as bad, we don't
        # even put them in the list. Then we care about the number of shares
        # which have already been assigned to them. After that we care about
        # their permutation order.
        old_assignments = DictOfSets()
        for (server, shnum) in self.goal:
            old_assignments.add(server, shnum)

        serverlist = []

        action = start_action(
            action_type=u"mutable:upload:update_goal",
            homeless_shares=len(homeless_shares),
        )
        with action:
            for i, server in enumerate(self.full_serverlist):
                serverid = server.get_serverid()
                if server in self.bad_servers:
                    Message.log(
                        message_type=u"mutable:upload:bad-server",
                        server_id=serverid,
                    )
                    continue
                # if we have >= 1 grid-managers, this checks that we have
                # a valid certificate for this server
                if not server.upload_permitted():
                    Message.log(
                        message_type=u"mutable:upload:no-gm-certs",
                        server_id=serverid,
                    )
                    continue

                entry = (len(old_assignments.get(server, [])), i, serverid, server)
                serverlist.append(entry)
        serverlist.sort()

        if not serverlist:
            raise NotEnoughServersError("Ran out of non-bad servers, "
                                        "first_error=%s" %
                                        str(self._first_write_error),
                                        self._first_write_error)

        # we then index this serverlist with an integer, because we may have
        # to wrap. We update the goal as we go.
        i = 0
        for shnum in homeless_shares:
            (ignored1, ignored2, ignored3, server) = serverlist[i]
            # if we are forced to send a share to a server that already has
            # one, we may have two write requests in flight, and the
            # servermap (which was computed before either request was sent)
            # won't reflect the new shares, so the second response will be
            # surprising. There is code in _got_write_answer() to tolerate
            # this, otherwise it would cause the publish to fail with an
            # UncoordinatedWriteError. See #546 for details of the trouble
            # this used to cause.
            self.goal.add( (server, shnum) )
            i += 1
            if i >= len(serverlist):
                i = 0
        self.log_goal(self.goal, "after update: ")


    def _got_write_answer(self, answer, writer, started):
        if not answer:
            # SDMF writers only pretend to write when readers set their
            # blocks, salts, and so on -- they actually just write once,
            # at the end of the upload process. In fake writes, they
            # return defer.succeed(None). If we see that, we shouldn't
            # bother checking it.
            return

        server = writer.server
        lp = self.log("_got_write_answer from %r, share %d" %
                      (server.get_name(), writer.shnum))

        now = time.time()
        elapsed = now - started

        self._status.add_per_server_time(server, elapsed)

        wrote, read_data = answer

        surprise_shares = set(read_data.keys()) - set([writer.shnum])

        # We need to remove from surprise_shares any shares that we are
        # knowingly also writing to that server from other writers.

        # TODO: Precompute this.
        shares = []
        for shnum, writers in self.writers.items():
            shares.extend([x.shnum for x in writers if x.server == server])
        known_shnums = set(shares)
        surprise_shares -= known_shnums
        self.log("found the following surprise shares: %s" %
                 str(surprise_shares))

        # Now surprise_shares contains all of the shares that we did not
        # expect to be there.

        surprised = False
        for shnum in surprise_shares:
            # read_data is a dict mapping shnum to checkstring (SIGNED_PREFIX)
            checkstring = read_data[shnum][0]
            # What we want to do here is to see if their (seqnum,
            # roothash, salt) is the same as our (seqnum, roothash,
            # salt), or the equivalent for MDMF. The best way to do this
            # is to store a packed representation of our checkstring
            # somewhere, then not bother unpacking the other
            # checkstring.
            if checkstring == self._checkstring:
                # they have the right share, somehow

                if (server,shnum) in self.goal:
                    # and we want them to have it, so we probably sent them a
                    # copy in an earlier write. This is ok, and avoids the
                    # #546 problem.
                    continue

                # They aren't in our goal, but they are still for the right
                # version. Somebody else wrote them, and it's a convergent
                # uncoordinated write. Pretend this is ok (don't be
                # surprised), since I suspect there's a decent chance that
                # we'll hit this in normal operation.
                continue

            else:
                # the new shares are of a different version
                if server in self._servermap.get_reachable_servers():
                    # we asked them about their shares, so we had knowledge
                    # of what they used to have. Any surprising shares must
                    # have come from someone else, so UCW.
                    surprised = True
                else:
                    # we didn't ask them, and now we've discovered that they
                    # have a share we didn't know about. This indicates that
                    # mapupdate should have worked harder and asked more
                    # servers before concluding that it knew about them all.

                    # signal UCW, but make sure to ask this server next time,
                    # so we'll remember to update it if/when we retry.
                    surprised = True
                    # TODO: ask this server next time. I don't yet have a good
                    # way to do this. Two insufficient possibilities are:
                    #
                    # self._servermap.add_new_share(server, shnum, verinfo, now)
                    #  but that requires fetching/validating/parsing the whole
                    #  version string, and all we have is the checkstring
                    # self._servermap.mark_bad_share(server, shnum, checkstring)
                    #  that will make publish overwrite the share next time,
                    #  but it won't re-query the server, and it won't make
                    #  mapupdate search further

                    # TODO later: when publish starts, do
                    # servermap.get_best_version(), extract the seqnum,
                    # subtract one, and store as highest-replaceable-seqnum.
                    # Then, if this surprise-because-we-didn't-ask share is
                    # of highest-replaceable-seqnum or lower, we're allowed
                    # to replace it: send out a new writev (or rather add it
                    # to self.goal and loop).

                surprised = True

        if surprised:
            self.log("they had shares %s that we didn't know about" %
                     (list(surprise_shares),),
                     parent=lp, level=log.WEIRD, umid="un9CSQ")
            self.surprised = True

        if not wrote:
            # TODO: there are two possibilities. The first is that the server
            # is full (or just doesn't want to give us any room), which means
            # we shouldn't ask them again, but is *not* an indication of an
            # uncoordinated write. The second is that our testv failed, which
            # *does* indicate an uncoordinated write. We currently don't have
            # a way to tell these two apart (in fact, the storage server code
            # doesn't have the option of refusing our share).
            #
            # If the server is full, mark the server as bad (so we don't ask
            # them again), but don't set self.surprised. The loop() will find
            # a new server.
            #
            # If the testv failed, log it, set self.surprised, but don't
            # bother adding to self.bad_servers .

            self.log("our testv failed, so the write did not happen",
                     parent=lp, level=log.WEIRD, umid="8sc26g")
            self.surprised = True
            self.bad_servers.add(server) # don't ask them again
            # use the checkstring to add information to the log message
            unknown_format = False
            for (shnum,readv) in list(read_data.items()):
                checkstring = readv[0]
                version = get_version_from_checkstring(checkstring)
                if version == MDMF_VERSION:
                    (other_seqnum,
                     other_roothash) = unpack_mdmf_checkstring(checkstring)
                elif version == SDMF_VERSION:
                    (other_seqnum,
                     other_roothash,
                     other_IV) = unpack_sdmf_checkstring(checkstring)
                else:
                    unknown_format = True
                expected_version = self._servermap.version_on_server(server,
                                                                     shnum)
                if expected_version:
                    (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
                     offsets_tuple) = expected_version
                    msg = ("somebody modified the share on us:"
                           " shnum=%d: I thought they had #%d:R=%r," %
                           (shnum,
                            seqnum, base32.b2a(root_hash)[:4]))
                    if unknown_format:
                        msg += (" but I don't know how to read share"
                                " format %d" % version)
                    else:
                        msg += " but testv reported #%d:R=%r" % \
                               (other_seqnum, base32.b2a(other_roothash)[:4])
                    self.log(msg, parent=lp, level=log.NOISY)
                # if expected_version==None, then we didn't expect to see a
                # share on that server, and the 'surprise_shares' clause
                # above will have logged it.
            return

        # and update the servermap
        # self.versioninfo is set during the last phase of publishing.
        # If we get there, we know that responses correspond to placed
        # shares, and can safely execute these statements.
        if self.versioninfo:
            self.log("wrote successfully: adding new share to servermap")
            self._servermap.add_new_share(server, writer.shnum,
                                          self.versioninfo, started)
            self.placed.add( (server, writer.shnum) )
            self._update_status()
        # the next method in the deferred chain will check to see if
        # we're done and successful.
        return

    def _done(self):
        if not self._running:
            return
        self._running = False
        now = time.time()
        self._status.timings["total"] = now - self._started

        elapsed = now - self._started_pushing
        self._status.timings['push'] = elapsed

        self._status.set_active(False)
        self.log("Publish done, success")
        self._status.set_status("Finished")
        self._status.set_progress(1.0)
        # Get k and segsize, then give them to the caller.
        hints = {}
        hints['segsize'] = self.segment_size
        hints['k'] = self.required_shares
        self._node.set_downloader_hints(hints)
        eventually(self.done_deferred.callback, None)

    def _failure(self, f=None):
        if f:
            self._last_failure = f

        if not self.surprised:
            # We ran out of servers
            msg = "Publish ran out of good servers"
            if self._last_failure:
                msg += ", last failure was: %s" % str(self._last_failure)
            self.log(msg)
            e = NotEnoughServersError(msg)

        else:
            # We ran into shares that we didn't recognize, which means
            # that we need to return an UncoordinatedWriteError.
            self.log("Publish failed with UncoordinatedWriteError")
            e = UncoordinatedWriteError()
        f = failure.Failure(e)
        eventually(self.done_deferred.callback, f)


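# The following is an illustrative sketch, not part of the original module:
# it shows how a caller might tell apart the two outcomes that _failure()
# can put on done_deferred. The function name and return values are
# invented for demonstration; UncoordinatedWriteError and
# NotEnoughServersError are the module-level imports used above.
def _example_publish_errback(f):
    if f.check(UncoordinatedWriteError):
        # Somebody else apparently wrote to the same slot while we were
        # publishing; the usual recovery is to refresh the servermap and
        # retry the write.
        return "uncoordinated-write"
    if f.check(NotEnoughServersError):
        # We simply ran out of writable servers; retrying immediately is
        # unlikely to help.
        return "not-enough-servers"
    return f  # let any other failure keep propagating

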
@implementer(IMutableUploadable)
class MutableFileHandle:
    """
    I am a mutable uploadable built around a filehandle-like object,
    usually either a BytesIO instance or a handle to an actual file.
    """

    def __init__(self, filehandle):
        # The filehandle is defined as a generally file-like object that
        # has these two methods. We don't care beyond that.
        assert hasattr(filehandle, "read")
        assert hasattr(filehandle, "close")

        self._filehandle = filehandle
        # We must start reading at the beginning of the file, or we risk
        # encountering errors when the data read does not match the size
        # reported to the uploader.
        self._filehandle.seek(0)

        # We have not yet read anything, so our position is 0.
        self._marker = 0


    def get_size(self):
        """
        I return the amount of data in my filehandle.
        """
        if not hasattr(self, "_size"):
            old_position = self._filehandle.tell()
            # Seek to the end of the file by seeking 0 bytes from the
            # file's end
            self._filehandle.seek(0, os.SEEK_END)
            self._size = self._filehandle.tell()
            # Restore the previous position, in case this was called
            # after a read.
            self._filehandle.seek(old_position)
            assert self._filehandle.tell() == old_position

        assert hasattr(self, "_size")
        return self._size


    def pos(self):
        """
        I return the position of my read marker -- i.e., how much data I
        have already read and returned to callers.
        """
        return self._marker


    def read(self, length):
        """
        I return some data (up to length bytes) from my filehandle,
        wrapped in a single-element list, since callers expect read() to
        return a list of byte strings.

        In most cases that data is length bytes long, but sometimes it
        won't be -- for example, if I am asked to read beyond the end of
        a file, or an error occurs.
        """
        results = self._filehandle.read(length)
        self._marker += len(results)
        return [results]


    def close(self):
        """
        I close the underlying filehandle. Any further operations on the
        filehandle fail at this point.
        """
        self._filehandle.close()


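# Illustrative sketch, not part of the original module: how an
# IMutableUploadable such as MutableFileHandle is typically consumed by
# a caller. The helper name and the chunk size are invented for
# demonstration.
def _example_drain_uploadable(chunk_size=8):
    u = MutableFileHandle(BytesIO(b"some mutable contents"))
    pieces = []
    while u.pos() < u.get_size():
        # read() hands back a list of byte strings, so join them.
        pieces.append(b"".join(u.read(chunk_size)))
    u.close()
    return b"".join(pieces)  # == b"some mutable contents"

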
class MutableData(MutableFileHandle):
    """
    I am a mutable uploadable built around a byte string, which I wrap
    in a BytesIO and treat as a filehandle.
    """

    def __init__(self, s):
        # Take a byte string and return a file-like uploadable.
        assert isinstance(s, bytes)

        MutableFileHandle.__init__(self, BytesIO(s))


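# Illustrative sketch, not part of the original module: MutableData is
# just MutableFileHandle with the BytesIO construction folded in, so the
# two spellings below behave the same. The helper name is invented.
def _example_mutabledata_is_a_filehandle():
    a = MutableData(b"new contents")
    b = MutableFileHandle(BytesIO(b"new contents"))
    assert a.get_size() == b.get_size() == len(b"new contents")
    assert b"".join(a.read(a.get_size())) == b"".join(b.read(b.get_size()))
    a.close()
    b.close()

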
@implementer(IMutableUploadable)
class TransformingUploadable:
    """
    I am an IMutableUploadable that wraps another IMutableUploadable,
    and some segments that are already on the grid. When I am called to
    read, I handle merging of boundary segments.
    """


    def __init__(self, data, offset, segment_size, start, end):
        assert IMutableUploadable.providedBy(data)

        self._newdata = data
        self._offset = offset
        self._segment_size = segment_size
        self._start = start
        self._end = end

        self._read_marker = 0

        self._first_segment_offset = offset % segment_size

        num = self.log("TransformingUploadable: starting", parent=None)
        self._log_number = num
        self.log("got fso: %d" % self._first_segment_offset)
        self.log("got offset: %d" % self._offset)


    def log(self, *args, **kwargs):
        if 'parent' not in kwargs:
            kwargs['parent'] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.transforminguploadable"
        return log.msg(*args, **kwargs)


    def get_size(self):
        return self._offset + self._newdata.get_size()


    def read(self, length):
        # We can get data from three sources here:
        #  1. the old data at the start of the first segment we're
        #     modifying,
        #  2. the new data that we're writing, and
        #  3. the old data at the end of the last segment we're
        #     modifying.
        self.log("reading %d bytes" % length)

        # Source 1: old data from the start of the first segment, if we
        # haven't read past it yet.
        old_start_data = b""
        old_data_length = self._first_segment_offset - self._read_marker
        if old_data_length > 0:
            if old_data_length > length:
                old_data_length = length
            self.log("returning %d bytes of old start data" % old_data_length)

            old_data_end = old_data_length + self._read_marker
            old_start_data = self._start[self._read_marker:old_data_end]
            length -= old_data_length
        else:
            # otherwise the offset calculation below gets thrown off.
            old_data_length = 0

        # Is there enough new data to satisfy this read? If not, we need
        # to pad the end of the data with data from our last segment.
        old_end_length = length - \
            (self._newdata.get_size() - self._newdata.pos())
        old_end_data = b""
        if old_end_length > 0:
            self.log("reading %d bytes of old end data" % old_end_length)

            # TODO: We're not explicitly checking for tail segment size
            # here. Is that a problem?
            old_data_offset = (length - old_end_length + \
                               old_data_length) % self._segment_size
            self.log("reading at offset %d" % old_data_offset)
            old_end = old_data_offset + old_end_length
            old_end_data = self._end[old_data_offset:old_end]
            length -= old_end_length
            assert length == self._newdata.get_size() - self._newdata.pos()

        self.log("reading %d bytes of new data" % length)
        new_data = self._newdata.read(length)
        new_data = b"".join(new_data)

        self._read_marker += len(old_start_data + new_data + old_end_data)

        return old_start_data + new_data + old_end_data

    def close(self):
        pass
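

# Illustrative sketch, not part of the original module: a worked example
# of the boundary merging that TransformingUploadable.read() performs.
# The segment size (8), the old segment contents, and the 3-byte
# overwrite at offset 10 are invented for demonstration.
def _example_boundary_merge():
    # The old file's second segment (bytes 8-15) is b"abcdefgh". We
    # overwrite bytes 10-12 with b"XYZ". The write starts and ends inside
    # that one segment, so it is both the first and the last segment
    # touched, and is passed as both 'start' and 'end'.
    old_segment = b"abcdefgh"
    u = TransformingUploadable(MutableData(b"XYZ"),
                               offset=10, segment_size=8,
                               start=old_segment, end=old_segment)
    # Reading one full segment returns the old prefix, the new data, and
    # the old suffix, already merged together.
    assert u.read(8) == b"ab" + b"XYZ" + b"fgh"
    u.close()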