diff --git a/src/allmydata/immutable/filenode.py b/src/allmydata/immutable/filenode.py
index 44c8e95..ed3785b 100644
|
a
|
b
|
import binascii |
| 3 | 3 | import copy |
| 4 | 4 | import time |
| 5 | 5 | now = time.time |
| 6 | | from zope.interface import implements |
| | 6 | from zope.interface import implements, Interface |
| 7 | 7 | from twisted.internet import defer |
| 8 | 8 | from twisted.internet.interfaces import IConsumer |
| 9 | 9 | |
| … |
… |
from allmydata.immutable.repairer import Repairer |
| 19 | 19 | from allmydata.immutable.downloader.node import DownloadNode |
| 20 | 20 | from allmydata.immutable.downloader.status import DownloadStatus |
| 21 | 21 | |
| | 22 | class IDownloadStatusHandlingConsumer(Interface): |
| | 23 | def set_download_status_read_event(read_ev): |
| | 24 | """Record the DownloadStatus 'read event', to be updated with the |
| | 25 | time it takes to decrypt each chunk of data.""" |
| | 26 | |
| 22 | 27 | class CiphertextFileNode: |
| 23 | 28 | def __init__(self, verifycap, storage_broker, secret_holder, |
| 24 | | terminator, history, download_status=None): |
| | 29 | terminator, history): |
| 25 | 30 | assert isinstance(verifycap, uri.CHKFileVerifierURI) |
| 26 | 31 | self._verifycap = verifycap |
| 27 | 32 | self._storage_broker = storage_broker |
| 28 | 33 | self._secret_holder = secret_holder |
| 29 | | if download_status is None: |
| 30 | | ds = DownloadStatus(verifycap.storage_index, verifycap.size) |
| 31 | | if history: |
| 32 | | history.add_download(ds) |
| 33 | | download_status = ds |
| 34 | 34 | self._terminator = terminator |
| 35 | 35 | self._history = history |
| 36 | | self._download_status = download_status |
| | 36 | self._download_status = None |
| 37 | 37 | self._node = None # created lazily, on read() |
| 38 | 38 | |
| 39 | 39 | def _maybe_create_download_node(self): |
| | 40 | if not self._download_status: |
| | 41 | ds = DownloadStatus(self._verifycap.storage_index, |
| | 42 | self._verifycap.size) |
| | 43 | if self._history: |
| | 44 | self._history.add_download(ds) |
| | 45 | self._download_status = ds |
| 40 | 46 | if self._node is None: |
| 41 | 47 | self._node = DownloadNode(self._verifycap, self._storage_broker, |
| 42 | 48 | self._secret_holder, |
| 43 | 49 | self._terminator, |
| 44 | 50 | self._history, self._download_status) |
| 45 | 51 | |
| 46 | | def read(self, consumer, offset=0, size=None, read_ev=None): |
| | 52 | def read(self, consumer, offset=0, size=None): |
| 47 | 53 | """I am the main entry point, from which FileNode.read() can get |
| 48 | 54 | data. I feed the consumer with the desired range of ciphertext. I |
| 49 | 55 | return a Deferred that fires (with the consumer) when the read is |
| 50 | 56 | finished.""" |
| 51 | 57 | self._maybe_create_download_node() |
| | 58 | actual_size = size |
| | 59 | if actual_size is None: |
| | 60 | actual_size = self._verifycap.size - offset |
| | 61 | read_ev = self._download_status.add_read_event(offset, actual_size, |
| | 62 | now()) |
| | 63 | if IDownloadStatusHandlingConsumer.providedBy(consumer): |
| | 64 | consumer.set_download_status_read_event(read_ev) |
| 52 | 65 | return self._node.read(consumer, offset, size, read_ev) |
| 53 | 66 | |
| 54 | 67 | def get_segment(self, segnum): |
| … |
… |
class CiphertextFileNode: |
| 155 | 168 | monitor=monitor) |
| 156 | 169 | return v.start() |
| 157 | 170 | |
| 158 | | |
| 159 | 171 | class DecryptingConsumer: |
| 160 | 172 | """I sit between a CiphertextDownloader (which acts as a Producer) and |
| 161 | 173 | the real Consumer, decrypting everything that passes by. The real |
| 162 | 174 | Consumer sees the real Producer, but the Producer sees us instead of the |
| 163 | 175 | real consumer.""" |
| 164 | | implements(IConsumer) |
| | 176 | implements(IConsumer, IDownloadStatusHandlingConsumer) |
| 165 | 177 | |
| 166 | | def __init__(self, consumer, readkey, offset, read_event): |
| | 178 | def __init__(self, consumer, readkey, offset): |
| 167 | 179 | self._consumer = consumer |
| 168 | | self._read_event = read_event |
| | 180 | self._read_event = None |
| 169 | 181 | # TODO: pycryptopp CTR-mode needs random-access operations: I want |
| 170 | 182 | # either a=AES(readkey, offset) or better yet both of: |
| 171 | 183 | # a=AES(readkey, offset=0) |
| … |
… |
class DecryptingConsumer: |
| 177 | 189 | self._decryptor = AES(readkey, iv=iv) |
| 178 | 190 | self._decryptor.process("\x00"*offset_small) |
| 179 | 191 | |
| | 192 | def set_download_status_read_event(self, read_ev): |
| | 193 | self._read_event = read_ev |
| | 194 | |
| 180 | 195 | def registerProducer(self, producer, streaming): |
| 181 | 196 | # this passes through, so the real consumer can flow-control the real |
| 182 | 197 | # producer. Therefore we don't need to provide any IPushProducer |
| … |
… |
class DecryptingConsumer: |
| 188 | 203 | def write(self, ciphertext): |
| 189 | 204 | started = now() |
| 190 | 205 | plaintext = self._decryptor.process(ciphertext) |
| 191 | | elapsed = now() - started |
| 192 | | self._read_event.update(0, elapsed, 0) |
| | 206 | if self._read_event: |
| | 207 | elapsed = now() - started |
| | 208 | self._read_event.update(0, elapsed, 0) |
| 193 | 209 | self._consumer.write(plaintext) |
| 194 | 210 | |
| 195 | 211 | class ImmutableFileNode: |
| … |
… |
class ImmutableFileNode: |
| 200 | 216 | history): |
| 201 | 217 | assert isinstance(filecap, uri.CHKFileURI) |
| 202 | 218 | verifycap = filecap.get_verify_cap() |
| 203 | | ds = DownloadStatus(verifycap.storage_index, verifycap.size) |
| 204 | | if history: |
| 205 | | history.add_download(ds) |
| 206 | | self._download_status = ds |
| 207 | 219 | self._cnode = CiphertextFileNode(verifycap, storage_broker, |
| 208 | | secret_holder, terminator, history, ds) |
| | 220 | secret_holder, terminator, history) |
| 209 | 221 | assert isinstance(filecap, uri.CHKFileURI) |
| 210 | 222 | self.u = filecap |
| 211 | 223 | self._readkey = filecap.key |
| … |
… |
class ImmutableFileNode: |
| 226 | 238 | return True |
| 227 | 239 | |
| 228 | 240 | def read(self, consumer, offset=0, size=None): |
| 229 | | actual_size = size |
| 230 | | if actual_size == None: |
| 231 | | actual_size = self.u.size |
| 232 | | actual_size = actual_size - offset |
| 233 | | read_ev = self._download_status.add_read_event(offset,actual_size, |
| 234 | | now()) |
| 235 | | decryptor = DecryptingConsumer(consumer, self._readkey, offset, read_ev) |
| 236 | | d = self._cnode.read(decryptor, offset, size, read_ev) |
| | 241 | decryptor = DecryptingConsumer(consumer, self._readkey, offset) |
| | 242 | d = self._cnode.read(decryptor, offset, size) |
| 237 | 243 | d.addCallback(lambda dc: consumer) |
| 238 | 244 | return d |
| 239 | 245 | |